2024-06-03 · 更新 2026-03-03约 9 分钟 · 2431 字大数据杂文记
00 目录
kafk复习
采用kafka作为消息队列原因
kafka的基础架构分析
生产者发送数据流程
broker的初始化流程
消费者初始化流程
消费者的消费流程
生产者的分区器分类
消费者的分区分配策略
kafka集群规模
数据量的估算
如何预估topic的分区数
kafka的数据一致性问题
kafka的数据有序性保证
kafka如何解决数据积压
kafka高效的原因
kafka的优化(提升kafka的吞吐)
kafk复习
采用kafka作为消息队列原因
- 出于后期对打造实时分析平台和现在的离线数据分析,需要兼顾,故采用
- 数据采集速率和数据处理速率不同,kafka可动态提高消费能力,避免数据积压
- 应用间的解耦,当后期出现不同的应用需要原始数据,kafka可以作为统一数据源
kafka的基础架构分析
生产者发送数据流程

- 生产者发送数据主要包含两个线程,main线程和sender线程,mian线程中,消息通过封装后,通过调
sender方法发送给broker
- main线程中,消息依次通过拦截器,序列化器,分区器后进入缓冲,缓冲大小默认32M,每个分区的数据进
对应的双端队列,通过batch.size 和 linger.ms参数控制数据发送给sender的时机
- sender线程中,会创建网络客户端,与broker建立联系,默认每个broker只能缓存5个请求,向broker
送数据后,根据配置参数acks,broker会汇报数据的落盘情况
broker的初始化流程

- broker初始化时,如果使用zookeeper作为注册中心,先向zookeeper注册的会当选为集群的controller
- controller负责副本leader的选举,broker的上下线等,当数据的leader出现故障,controller根据
zookeeper的选举机制,负责选举新的broker作为副本leader,同时更新zookeeper节点的信息,其他的存
活follower从zookeeper读取最新的动态
- AR队列:所有副本的总称(ISR 和 OSR),ISR(In-Sync Replicas):同步的副本节点,包括leader节
点 ; OSR(Out-of-Sync Replicas): 与leader失去联系的副本节点,不包括leader节点
消费者初始化流程

- 消费者的初始化流程主要包括消费者组中消费计划的制定
- 消费者建立时,会选择其中的一个消费者作为leader,制定分区消费策略,并汇报给cordinator,并由
cordinator分发计划给其他的消费者
消费者的消费流程
消费时,每个消费者按照分配的消费策略,通过网络客户端对指定分区的数据发送请求数据,并将从broker抓取的
数据放在队列中,再从队列中拉取
如何保证消费的顺序?
kafak只能保证单分区内的数据有序性,具体顺序需求需要结合场景自定义实现
生产者的分区器分类
- 默认的分区分配策略 Sticky Partitioner
- 是否指定分区?指定了,直接发往指定的分区
- 消息是否拥有key?拥有,对key算法取模,决定发往的分区
- 否则使用黏性分区

- 纯粹的黏性分区 Unifromed Sticky Partitioner
不管消息是否有key,直接使用黏性分区策略
- Round Robin 轮询分区策略
消息对topic的分区进行轮询分配
消费者的分区分配策略
默认策略:Range + CooperativeSticky
- Range 对每个topic,分区数量 / 消费者数 决定每个消费者将要消费的平均分区数, 当消费的topic数量
多可能出现数据倾斜
- Round Robin 所有的topic的分区和consumer排序,按照hashcode排序,轮询分配,没有考虑到
rebalance发生时带来的重分区开销
- 黏性消费,和Range策略大致相同,但当消费组内出现了rebalance,,尽量遵守两个规则,第一分区分配均
衡,第二尽量会复用rebalance前的消费策略,如果两个规则冲突,会以第一条为准。
kafka集群规模
从吞吐的要求来看,考虑磁盘的吞吐
写入吞吐/s = 写入速率 * 副本数
读取吞吐/s = 读取速率 * (副本数 -1 + 消费者组数量)
实际的数据速率高峰 = 20M/s ,期望能支撑 100M/s的写入速率
写吞吐 = 200M/s
读吞吐 = 200M/s
一般单台磁盘的io速率 100+M/s ===》 至少2台,给3台
监控器
Kafka Eagle
数据量的估算
100万日活,每人每天平均产生100条数据,每条数据0.5-2k,平均1k
每天条数 = 100万 * 100条 = 1亿条
每天的数据大小 = 1亿条 * 1k = 100G(每1万日活,对应1G数据,对应100万条)
平均条数 = 1亿条 / (24 * 60 * 60 ) = 约 1150条/s (说估算值,大概1000多条/s)
平均速率 = 1150条/s * 1k = 约 1M/s
高峰时段 = 晚上下班 7-12点左右
高峰条数 = 平均 20倍 = 23000条/s (大概值,大概2万条/s左右)
高峰速率 = 约 20 M/s
低谷时段 = 半夜 3-4点
低谷条数 = 几十条/s
低谷速率 = 几十k/s
如何预估topic的分区数
压测:官方压测脚本
单分区生产峰值速率 Tp = 30多M/s
单分区消费峰值速率 Tc = 50M/s
期望支撑的吞吐高峰 Tt= 100M/s
估算的分区数 = Tt / min(Tp,Tc) = 3个分区
kafka的数据一致性问题
- 数据是否重复
生产者开启
幂等性:保证单分区单会话不重
<PID,partition,SequenceNum>
PID 生产者id
partition:发往的分区号
SequenceNum:生产者给数据打上的自增序列
消费者重复消费引起的重复
可能出现重复消费的场景
- 消费者消费了数据,但是并未提交offset
- 消费者使用自动提交offset,但当还没有提交的时候,消费出现了rebalance,新的消费者重新消费解决办法:
- 下游数据消费支持幂等写入
- 下游数据消费使用事务写入
- 数据是否丢失(producer写入时丢失)
producer端:
acks参数设置为-1
0 不用等待数据落盘,只管不断地生产数据
1 等待leader响应落盘
-1 等待leader和follower都落盘后响应
broker端:
数据副本和最小同步副本数(min.insync.replicas)都至少为2
- 数据副本为2保证降低数据丢失风险,最小同步副本数据为2保证何时才算数据同步完成
kafka的数据有序性保证
1.单分区有序
- 开启幂等性
新来数据的SeqNum = broker保存的最大SeqNum + 1 ,才接收如果 >1, 打回重新排序,按照一个并发
发送
- 并发设为1
没有开启幂等性的时候,max.in.flight.requests.per.connection
2.多区间无序:由架构决定的
方案一: 使用单分区的topic
方案二: 特定场景, 保证同一张表的数据有序
指定key = 库名+表名(项目中使用的方式),通过配置maxwell的producer_partition_by=table
保证一张表的数据就是一个分区。
方案三: 数据消费之后,再攒批全局排序
kafka如何解决数据积压
- 如何发现数据积压?
通过kafka的监控工具kafka eagle

- 数据积压的危害
- 数据的失效性变低,解决积压问题后,再次消费数据导致滞后消费,时效性减低
- 数据积压时间可能超过kafka设置数据清理最大时长,导致数据丢失
- 解决办法
- 提高消费者的消费能力
同比增加消费者数和topic的分区数据,提高消费吞吐提高单个消费者的消费能力,比如增大每次从broker
抓取数据的数据的大小fetch.min.bytes,从默认的100M提高到200M;
提高消费每次从队列中拉取数据的条数,从默认的500条提高到1000条 max.poll.records
- 提高消费者的处理能力
比如kafak的下游消费为flink,可以针对flink的反压进行优化
kafka高效的原因
1.读写操作
- kafka本身分布式,采用数据分区写
- 使用内核的页缓存技术(读和写),减少磁盘IO,如果Kafka producer的生产速率与consumer的消费速率相
差不大,那么就能几乎只靠对broker page cache的读写完成整个生产-消费过程
2.写数据:主要针对生产者
- 顺序写磁盘,使得kafka使用普通磁盘和固态硬盘写数据速率差不多
3.读数据:主要针对消费者
- 每个分区内数据数据逻辑分片并区内建立稀疏索引,可以快速定位消费的数据
稀疏索引每隔4kB的数据大小建立一个(offset,文件中的绝对位置)索引,除此.timeindex文件也采用了
稀疏索引(timestamp,offset),故kafka可以根据具体的offset或者时间戳指定消费
- 零拷贝技术,不用在读取数据的时候重复在内核和应用间拷贝将要消费的数据
普通的读数据流程:

kafka使用零拷贝

kafka的优化(提升kafka的吞吐)
- producer端
- batch.size 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
- linger.ms 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据
- buffer.memory RecordAccumulator缓冲区总大小,默认32m
- compress.type 生产者发送的所有数据的压缩方式。默认是none,不压缩
- broker端
- consumer端
- fetch.min.bytes 默认1个字节。消费者获取服务器端一批消息最小的字节数。
- fetch.max.bytes 消费者获取服务器端一批消息最大的字节数
- max.poll.records 一次poll拉取数据返回消息的最大条数,默认是500条
本文作者:hedeoer
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA
许可协议。转载请注明出处!