博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
大数据基础面试题1
阅读量:3967 次
发布时间:2019-05-24

本文共 9273 字,大约阅读时间需要 30 分钟。

Kafka分区分配策略

在 Kafka内部存在两种默认的分区分配策略:Range和 RoundRobin。

Range是默认策略。Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
例如:我们有10个分区,两个消费者(C1,C2),3个消费者线程,10 / 3 = 3而且除不尽。
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C2-1 将消费 7, 8, 9 分区
将所有主题分区组成TopicAndPartition列表,然后对TopicAndPartition列表按照hashCode进行排序,最后按照轮询的方式发给每一个消费线程。
在这里插入图片描述

Kafka数据重复

幂等性+ack-1+事务

Kafka数据重复,可以再下一级:SparkStreaming、redis或者hive中dwd层去重,去重的手段:分组、按照id开窗只取第一个值;
Kafka消费者有两个配置参数:

max.poll.interval.ms

两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。

两次poll超过此时间间隔,Kafka服务端会进行rebalance操作,导致客户端连接失效,无法提交offset信息,从而引发重复消费。

max.poll.records

一次poll操作获取的消息数量。默认值50。

如果每条消息处理时间超过60秒,那么一批消息处理时间将超过5分钟,从而引发poll超时,最终导致重复消费。

配置

1)分析业务代码逻辑,进行性能优化,确保每条消息处理时间控制在合理范围

每条消息的处理时间不要超过5分钟。

如果超过5分钟,则考虑进行架构上的优化。

比如A线程消费消息,放到进程内部队列中,提交offset;其他线程从内部队列取消息,并处理业务逻辑。为防止内部队列消息积压,A线程需要监控队列中消息数量,超过一定量时进入等待。

2)适当增大max.poll.interval.ms的值

SpringBoot没有提供可调节此数值的参数。如果修改此数值,需要自己封装方法创建Kafka客户端:

org.apache.kafka.clients.consumer.KafkaConsumer.KafkaConsumer<String,String>(Properties)

3)适当减小max.poll.records的值

可通过SpringBoot配置参数"spring.kafka.consumer.max-poll-records"调整。

flume kafka channel哪个版本产生的?

flume1.6 版本产生=》并没有火;因为有bug

topic-start 数据内容true 和false 很遗憾,都不起作用。
增加了额外清洗的工作量。
flume1.7解决了这个问题,开始火了。

spark Streaming消费kafka数据有两种方式

1)receiver方式(zk管理偏移量)

  1. 需要使用单独的Receiver线程来异步获取Kafka数据。
  2. Receiver底层实现中使用了Kafka高级消费者API,因此,不需要自己管理Offset,只需指定Zookeeper和消费者组GroupID,系统便会自行管理。
  3. 执行过程: Spark
    Streaming启动时,会在Executor中同时启动Receiver异步线程用于从Kafka持续获取数据,获取的数据先存储在Receiver中(存储方式由StorageLevel决定),后续,当Batch
    Job触发后,这些数据会被转移到剩下的Executor中被处理。处理完毕后,Receiver会自动更新Zookeeper中的Offset。
  4. 默认情况下,程序失败或Executor宕掉后可能会丢失数据,为避免数据丢失,可启用预写日志(Write Ahead
    Log,WAL)。将Receiver收到的数据再备份一份到更可靠的系统如HDFS分布式文件中,以冗余的数据来换取数据不丢失。
  5. 生产下,为保证数据完全不丢失,一般需要启用WAL。启用WAL,在数据量较大,网络不好情况下,会严重降低性能。

2)Kafka Direct方式(自己管理偏移量)

不需要使用单独的Receiver线程从Kafka获取数据。

  1. 使用Kafka简单消费者API,不需要ZooKeeper参与,直接从Kafka Broker获取数据。
  2. 执行过程:Spark Streaming Batch
    Job触发时,Driver端确定要读取的Topic-Partition的OffsetRange,然后由Executor并行从Kafka各Partition读取数据并计算。
  3. 为保证整个应用EOS, Offset管理一般需要借助外部存储实现。如Mysql、HBase等。
  4. 由于不需要WAL,且Spark Streaming会创建和Kafka Topic Partition一样多的RDD
    Partition,且一一对应,这样,就可以并行读取,大大提高了性能。
  5. Spark
    Streaming应用启动后,自己通过内部currentOffsets变量跟踪Offset,避免了基于Receiver的方式中Spark
    Streaming和Zookeeper中的Offset不一致问题。

Kafka消费者角度考虑是拉取数据还是推送数据

拉取数据

flume 拦截器可以不用吗?

可以不用;需要在下一级hive的dwd层和sparksteaming里面处理

优势:只处理一次,轻度处理;
劣势:影响性能,不适合做实时推荐这种对实时要求比较高的场景。

如何保证数据不丢失? ACK机制?

acks:

0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当broker故障时有可能丢失数据;
1:producer等待broker的ack,partition的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
在这里插入图片描述
-1(all):producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
在这里插入图片描述

flume taildir挂了怎么办?

不会丢数:断点续传

flume 的file channel /memory channel/kafka channel

  1. file channel
    数据存储于磁盘,优势:可靠性高;劣势:传输速度低
    默认容量:100万event 注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
  2. memory channel
    数据存储于内存,优势:传输速度快;劣势:可靠性差
    默认容量:100个event
  3. kafka channel
    数据存储于Kafka,基于磁盘;
    优势:可靠性高;
    传输速度快 kafka channel》memory channel+kafka sink 原因省去了sink阶段

Kafka的硬盘大小

每天的数据量100g2个副本3天/70%

kafka副本数设定

一般我们设置成2个或3个,很多企业设置为2个。

副本的优势:提高可靠性;
副本劣势:增加了网络IO传输

flume 自定义拦截器步骤

(1)实现 Interceptor

(2)重写四个方法
initialize 初始化
public Event intercept(Event event) 处理单个Event
public List intercept(List events) 处理多个Event,在这个方法中调用Event intercept(Event event)
close 方法
(3)静态内部类,实现Interceptor.Builder

Kafka中数据量计算

每天总数据量100g,每天产生1亿条日志, 10000万/24/60/60=1150条/每秒钟

平均每秒钟:1150条
低谷每秒钟:50条
高峰每秒钟:1150条*(2-20倍)=2300条-23000条
每条日志大小:0.5k-2k(取1k)
每秒多少数据量:2.0M-20MB

kafka 组件?

1)Producer :消息生产者,就是向kafka broker发消息的客户端;2)Consumer :消息消费者,向kafka broker取消息的客户端;3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。

Kafka挂掉

1)Flume记录

2)日志有记录
3)短期没事

Kafka单条日志传输大小

kafka对于消息体的大小默认为单条最大值是1M但是在我们应用场景中, 常常会出现一条消息大于1M,如果不对kafka进行配置。则会出现生产者无法将消息推送到kafka或消费者无法去消费kafka里面的数据, 这时我们就要对kafka进行以下配置:server.properties

replica.fetch.max.bytes: 1048576 broker可复制的消息的最大字节数, 默认为1M
message.max.bytes: 1000012 kafka 会接收单个消息size的最大限制, 默认为1M左右
注意:message.max.bytes必须小于等于replica.fetch.max.bytes,否则就会导致replica之间数据同步失败。

kafka 的消息包含那些?

CRC32:4个字节,消息的校验码。

magic:1字节,魔数标识,与消息格式有关,取值为0或1。
attributes: 1字节,消息的属性。
timestamp: 时间戳,其含义由attributes的第3位确定。
key length:消息key的长度。
key:消息的key。
value length:消息的value长度。
value:消息的内容

谈谈Flume监控器

1)采用Ganglia监控器,监控到flume尝试提交的次数远远大于最终成功的次数,说明flume运行比较差。

2)解决办法?
(1)自身:增加内存flume-env.sh 4-6g
-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
(2)找朋友:增加服务器台数
搞活动 618 =》增加服务器=》用完在退出
日志服务器配置:8-16g内存、磁盘8T

Flume采集数据会丢失吗?(防止数据丢失的机制)

如果是FileChannel不会,Channel存储可以存储在File中,数据传输自身有事务。

如果是MemoryChannel有可能丢。

怎么保证kafka数据全局有序?

1)保证生产有充、消费有序、存储有序

2)只能有一个生产者,一个partition,一个consumer
ps:但是这违背分布式系统初衷,因此这是一个伪命题

Kafka监控

公司自己开发的监控器;

开源的监控器:KafkaManager、KafkaMonitor、KafkaEagle

Kafka高效读写数据

1)Kafka本身是分布式集群,同时采用分区技术,并发度高。

2)顺序写磁盘
Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。
3)零复制技术

Kafka参数优化

1)Broker参数配置(server.properties)

1、日志保留策略配置
#保留三天,也可以更短 (log.cleaner.delete.retention.ms)
log.retention.hours=72

2、Replica相关配置

default.replication.factor:1 默认副本1个

3、网络通信延时

replica.socket.timeout.ms:30000 #当集群之间网络不稳定时,调大该参数
replica.lag.time.max.ms= 600000# 如果网络不好,或者kafka集群压力较大,会出现副本丢失,然后会频繁复制副本,导致集群压力更大,此时可以调大该参数
2)Producer优化(producer.properties)
compression.type:none
#默认发送不进行压缩,推荐配置一种适合的压缩算法,可以大幅度的减缓网络压力和Broker的存储压力。
3)Kafka内存调整(kafka-server-start.sh)
默认内存1个G,生产环境尽量不要超过6个G。
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"

flume HDFS sink 设置

(1)时间(1小时-2小时) or 大小128m、event个数(0禁止)

具体参数:hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0

kafka 多少个Topic

通常情况:多少个日志类型就多少个Topic。也有对日志类型进行合并的。

生产环境如何选择 flume channel

如果下一级是kafka,优先选择kafka channel

如果是金融、对钱要求准确的公司,选择file channel
如果就是普通的日志,通常可以选择memory channel
每天丢几百万数据 pb级 亿万富翁,掉1块钱会捡?

Kafka丢不丢数据

Ack=0,相当于异步发送,消息发送完毕即offset增加,继续生产。

Ack=1,leader收到leader replica 对一个消息的接受ack才增加offset,然后继续生产。
Ack=-1/ all,leader收到所有replica 对一个消息的接受ack才增加offset,然后继续生产。

flume 事务

put事务步骤:

doput :先将批数据写入临时缓冲区putlist里面
docommit:去检查channel里面有没有空位置,如果有就传入数据,如果没有那么dorollback就把数据回滚到putlist里面。

take事务步骤:

dotake:将数据读取到临时缓冲区takelist,并将数据传到hdfs上。
docommit :去判断数据发送是否成功,若成功那么清除临时缓冲区takelist
若不成功(比如hdfs系统服务器崩溃等)那么dorollback将数据回滚到channel里面。

Kafka压测

Kafka官方自带压力测试脚本(kafka-consumer-perf-test.sh、kafka-producer-perf-test.sh)。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

Kafka的ISR副本同步队列

ISR(In-Sync Replicas),副本同步队列。ISR中包括Leader和Follower。如果Leader进程挂掉,会在ISR队列中选择一个服务作为新的Leader。有replica.lag.max.messages(延迟条数)和replica.lag.time.max.ms(延迟时间)两个参数决定一台服务是否可以加入ISR副本队列,在0.10版本移除了replica.lag.max.messages参数,防止服务频繁的进去队列。

任意一个维度超过阈值都会把Follower剔除出ISR,存入OSR(Outof-Sync Replicas)列表,新加入的Follower也会先存放在OSR中。

Kafka日志保存时间

默认保存7天;生产环境建议3天

Kafka中的数据是有序的吗

单分区内有序;多分区,分区与分区间无序;

Kafka过期数据清理

保证数据没有被引用(没人消费他)

日志清理保存的策略只有delete和compact两种
log.cleanup.policy=delete启用删除策略
log.cleanup.policy=compact启用压缩策略

flume taildir source 是否支持递归遍历文件夹读取文件?

不支持。

自定义 递归遍历文件夹 +读取文件

flume taildir source 怎么处理重复数据?

不处理:

生产环境通常不处理,因为会影响传输效率
处理:自身:
在taildirsource里面增加自定义事务
找兄弟:
下一级处理(hive dwd sparkstreaming flink布隆)、去重手段(groupby、开窗取窗口第一条、redis)

flume的source,channel,sink分别有哪些

source有avro、spooldir、exec、syslog tcp、kafka、自定义

channel有memory 、file、jdbc
sink有kafka、hdfs、hbase、file、avro、logger、自定义等

Kafka可以按照时间消费数据

Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = KafkaUtil.fetchOffsetsWithTimestamp(topic, sTime, kafkaProp);

Kafka的机器数量

Kafka机器数量=2*(峰值生产速度*副本数/100)+ 1

flume没有断点续传功能时怎么做的?

设计思路:

  1. 在sink往kafka发送完后,记录原始日志文件及行号信息到record文件(记录文件内容只有几个字节,内容为:debug.log,2016-05-18-15,265。第二个字段为时间槽或者文件的最后修改的long型毫秒时间)
  2. 每次重启时,在source中检查record文件是否存在,存在的话读取上次最后一条日志的文件名和行号
  3. 如果第2步的record文件记录和当前最新的需要抽取日志文件是同一个,且行数差距在1000以内(具体可以配置),直接tail -f -n +${上次抽完的行号+1} --pid ${flume的进程号} debug.log
  4. 如果第2步的record文件记录和当前最新的需要抽取日志文件是同一个且行数差距在1000以上;或者record文件记录里原始日志文件名和当前的不是同一个,直接tail -f -n +${文件的最新行号+1} debug.log;另外记录这段落下的日志信息到快照文件snapshot,快照文件内容为:debug.log,2016-05-18-14,5432,debug.log,2016-05-18-15,1234
  5. 在source中启动tail命令后,检查是否有快照文件snapshot,如果有则开启一个线程把快照文件里记录的日志读取直接写到kafka。
    部分细节
  6. record文件的写入,采用ByteBuffer,避免频繁IO,在指定时间间隔后flush到磁盘
  7. 重启过程中,不要使用kill -9结束flume,使用kill -15,在接收到停止信号时,所有的source和sink快速停止操作,等待flume框架来调用每个source和sink的stop方法,在stop方法中刷新ByteBuffer内容到磁盘。
  8. 为什么要在sink里记录行号, 主要是保证记录往kafka里写入成功的行号,另外不用tail -F 也是为了在日志文件名切换时重新从1开始计数行号
  9. 采用fileChannel的,相应的在source写入channel成功后,记录行号即可
    实际测试结果,在一个每秒产生300条日志的单个日志文件,做flume重启,没有丢失数据

Kafka消息数据积压,Kafka消费能力不足怎么处理?

1)如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

2)如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

flume 拦截器注意事项

项目中自定义了:ETL拦截器。

采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些

Kakfa分区数

1)创建一个只有1个分区的topic

2)测试这个topic的producer吞吐量和consumer吞吐量。
3)假设他们的值分别是Tp和Tc,单位可以是MB/s。
4)然后假设总的目标吞吐量是Tt,那么分区数=Tt / min(Tp,Tc)
例如:producer吞吐量=20m/s;consumer吞吐量=50m/s,期望吞吐量100m/s;
分区数=100 / 20 =5分区
分区数一般设置为:3-10个

转载地址:http://gwgzi.baihongyu.com/

你可能感兴趣的文章
awk 控制结构
查看>>
awk 格式化输出
查看>>
awk 正则表达式
查看>>
awk 函数
查看>>
awk 向命令传递参数
查看>>
awk I/O
查看>>
grep 精萃
查看>>
java switch语句
查看>>
java try-with-resources 语句
查看>>
DB2 行转列
查看>>
DB2 认证路线图
查看>>
一个类似行转列的问题
查看>>
遇到问题该如何解决
查看>>
美国金融体系
查看>>
DB CHNGPGS_THRES 参数
查看>>
DB2 特殊寄存器(Special Registers)
查看>>
在ORDER BY 子句中加入主键或唯一键
查看>>
DB2 UPDATE 语句
查看>>
SQL PL 精萃
查看>>
GROUPING SETS、ROLLUP、CUBE
查看>>