首页标签分类
05Kafka
2024-06-03 · 更新 2026-03-03约 9 分钟 · 2431 字
大数据杂文记
000

目录

kafk复习
采用kafka作为消息队列原因
kafka的基础架构分析
生产者发送数据流程
broker的初始化流程
消费者初始化流程
消费者的消费流程
生产者的分区器分类
消费者的分区分配策略
kafka集群规模
数据量的估算
如何预估topic的分区数
kafka的数据一致性问题
kafka的数据有序性保证
kafka如何解决数据积压
kafka高效的原因
kafka的优化(提升kafka的吞吐)

kafk复习

采用kafka作为消息队列原因

  1. 出于后期对打造实时分析平台和现在的离线数据分析,需要兼顾,故采用
  2. 数据采集速率和数据处理速率不同,kafka可动态提高消费能力,避免数据积压
  3. 应用间的解耦,当后期出现不同的应用需要原始数据,kafka可以作为统一数据源

kafka的基础架构分析

生产者发送数据流程

  1. 生产者发送数据主要包含两个线程,main线程和sender线程,mian线程中,消息通过封装后,通过调
    sender方法发送给broker
  2. main线程中,消息依次通过拦截器,序列化器,分区器后进入缓冲,缓冲大小默认32M,每个分区的数据进
    对应的双端队列,通过batch.size 和 linger.ms参数控制数据发送给sender的时机
  3. sender线程中,会创建网络客户端,与broker建立联系,默认每个broker只能缓存5个请求,向broker
    送数据后,根据配置参数acks,broker会汇报数据的落盘情况

broker的初始化流程

  1. broker初始化时,如果使用zookeeper作为注册中心,先向zookeeper注册的会当选为集群的controller
  2. controller负责副本leader的选举,broker的上下线等,当数据的leader出现故障,controller根据
    zookeeper的选举机制,负责选举新的broker作为副本leader,同时更新zookeeper节点的信息,其他的存
    活follower从zookeeper读取最新的动态
  3. AR队列:所有副本的总称(ISR 和 OSR),ISR(In-Sync Replicas):同步的副本节点,包括leader节
    点 ; OSR(Out-of-Sync Replicas): 与leader失去联系的副本节点,不包括leader节点

消费者初始化流程

  1. 消费者的初始化流程主要包括消费者组中消费计划的制定
  2. 消费者建立时,会选择其中的一个消费者作为leader,制定分区消费策略,并汇报给cordinator,并由
    cordinator分发计划给其他的消费者

消费者的消费流程

消费时,每个消费者按照分配的消费策略,通过网络客户端对指定分区的数据发送请求数据,并将从broker抓取的
数据放在队列中,再从队列中拉取

如何保证消费的顺序?
kafak只能保证单分区内的数据有序性,具体顺序需求需要结合场景自定义实现

生产者的分区器分类

  1. 默认的分区分配策略 Sticky Partitioner
  • 是否指定分区?指定了,直接发往指定的分区
  • 消息是否拥有key?拥有,对key算法取模,决定发往的分区
  • 否则使用黏性分区
  1. 纯粹的黏性分区 Unifromed Sticky Partitioner
    不管消息是否有key,直接使用黏性分区策略
  2. Round Robin 轮询分区策略
    消息对topic的分区进行轮询分配

消费者的分区分配策略

默认策略:Range + CooperativeSticky

  1. Range 对每个topic,分区数量 / 消费者数 决定每个消费者将要消费的平均分区数, 当消费的topic数量
    多可能出现数据倾斜
  2. Round Robin 所有的topic的分区和consumer排序,按照hashcode排序,轮询分配,没有考虑到
    rebalance发生时带来的重分区开销
  3. 黏性消费,和Range策略大致相同,但当消费组内出现了rebalance,,尽量遵守两个规则,第一分区分配均
    衡,第二尽量会复用rebalance前的消费策略,如果两个规则冲突,会以第一条为准。

kafka集群规模

从吞吐的要求来看,考虑磁盘的吞吐
写入吞吐/s = 写入速率 * 副本数
读取吞吐/s = 读取速率 * (副本数 -1 + 消费者组数量)
实际的数据速率高峰 = 20M/s ,期望能支撑 100M/s的写入速率
写吞吐 = 200M/s
读吞吐 = 200M/s
一般单台磁盘的io速率 100+M/s ===》 至少2台,给3台 监控器
Kafka Eagle

数据量的估算

100万日活,每人每天平均产生100条数据,每条数据0.5-2k,平均1k 每天条数 = 100万 * 100条 = 1亿条
每天的数据大小 = 1亿条 * 1k = 100G(每1万日活,对应1G数据,对应100万条) 平均条数 = 1亿条 / (24 * 60 * 60 ) = 约 1150条/s (说估算值,大概1000多条/s)
平均速率 = 1150条/s * 1k = 约 1M/s 高峰时段 = 晚上下班 7-12点左右
高峰条数 = 平均 20倍 = 23000条/s (大概值,大概2万条/s左右)
高峰速率 = 约 20 M/s 低谷时段 = 半夜 3-4点
低谷条数 = 几十条/s
低谷速率 = 几十k/s

如何预估topic的分区数

压测:官方压测脚本
单分区生产峰值速率 Tp = 30多M/s
单分区消费峰值速率 Tc = 50M/s
期望支撑的吞吐高峰 Tt= 100M/s
估算的分区数 = Tt / min(Tp,Tc) = 3个分区

kafka的数据一致性问题

  1. 数据是否重复 生产者开启幂等性:保证单分区单会话不重
    <PID,partition,SequenceNum>
    PID
    生产者id
    partition:发往的分区号
    SequenceNum:生产者给数据打上的自增序列 消费者重复消费引起的重复
    可能出现重复消费的场景
  • 消费者消费了数据,但是并未提交offset
  • 消费者使用自动提交offset,但当还没有提交的时候,消费出现了rebalance,新的消费者重新消费解决办法:
    • 下游数据消费支持幂等写入
    • 下游数据消费使用事务写入
  1. 数据是否丢失(producer写入时丢失) producer端:
    acks参数设置为-1

0 不用等待数据落盘,只管不断地生产数据
1 等待leader响应落盘
-1 等待leader和follower都落盘后响应

broker端:

  • 数据副本和最小同步副本数(min.insync.replicas)都至少为2
  • 数据副本为2保证降低数据丢失风险,最小同步副本数据为2保证何时才算数据同步完成

kafka的数据有序性保证

1.单分区有序

  • 开启幂等性
    新来数据的SeqNum = broker保存的最大SeqNum + 1 ,才接收如果 >1, 打回重新排序,按照一个并发
    发送
  • 并发设为1
    没有开启幂等性的时候,max.in.flight.requests.per.connection 2.多区间无序:由架构决定的 方案一: 使用单分区的topic 方案二: 特定场景, 保证同一张表的数据有序
    指定key = 库名+表名(项目中使用的方式),通过配置maxwell的producer_partition_by=table
    保证一张表的数据就是一个分区。 方案三: 数据消费之后,再攒批全局排序

kafka如何解决数据积压

  1. 如何发现数据积压?
    通过kafka的监控工具kafka eagle
  2. 数据积压的危害
    • 数据的失效性变低,解决积压问题后,再次消费数据导致滞后消费,时效性减低
    • 数据积压时间可能超过kafka设置数据清理最大时长,导致数据丢失
  3. 解决办法
    • 提高消费者的消费能力
      同比增加消费者数和topic的分区数据,提高消费吞吐提高单个消费者的消费能力,比如增大每次从broker
      抓取数据的数据的大小fetch.min.bytes,从默认的100M提高到200M;
      提高消费每次从队列中拉取数据的条数,从默认的500条提高到1000条 max.poll.records
    • 提高消费者的处理能力
      比如kafak的下游消费为flink,可以针对flink的反压进行优化

kafka高效的原因

1.读写操作

  • kafka本身分布式,采用数据分区写
  • 使用内核的页缓存技术(读和写),减少磁盘IO,如果Kafka producer的生产速率与consumer的消费速率相
    差不大,那么就能几乎只靠对broker page cache的读写完成整个生产-消费过程
    2.写数据:主要针对生产者
  • 顺序写磁盘,使得kafka使用普通磁盘和固态硬盘写数据速率差不多 3.读数据:主要针对消费者
  • 每个分区内数据数据逻辑分片并区内建立稀疏索引,可以快速定位消费的数据
    稀疏索引每隔4kB的数据大小建立一个(offset,文件中的绝对位置)索引,除此.timeindex文件也采用了
    稀疏索引(timestamp,offset),故kafka可以根据具体的offset或者时间戳指定消费
  • 零拷贝技术,不用在读取数据的时候重复在内核和应用间拷贝将要消费的数据
    普通的读数据流程:

    kafka使用零拷贝

kafka的优化(提升kafka的吞吐)

  1. producer端
    • batch.size 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
    • linger.ms 如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据
    • buffer.memory RecordAccumulator缓冲区总大小,默认32m
    • compress.type 生产者发送的所有数据的压缩方式。默认是none,不压缩
  2. broker端
    • 增加分区
  3. consumer端
    • fetch.min.bytes 默认1个字节。消费者获取服务器端一批消息最小的字节数。
    • fetch.max.bytes 消费者获取服务器端一批消息最大的字节数
    • max.poll.records 一次poll拉取数据返回消息的最大条数,默认是500条

本文作者:hedeoer

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!