第02篇_Kafka

第01章_Kafka快速入门

第一节 Kafka简介

1. 什么是Kafka?

Kafka是一个基于 Scala 语言开发的分布式消息队列(Message Queue),也称作分布式事件流平台(Event Streaming Platform)。

它支持如下两种模式:

主要的功能如下:

 

2. 基础架构

Kafka基础架构图如下:

image-20241128154659268

 

 

第二节 安装部署

1. 集群部署

 

2. 配置 server.properties

 

3. 启停命令

 

4. 配置开机自启

新建vim /etc/systemd/system/kafka.service文件,内容如下:

设置开机自启:

 

5. 常用命令行操作

1) 主题相关操作

注意:

  1. 在低版本中,查看所有主题需使用 ./kafka-topics.sh --zookeeper <ZOOKEEPER_ADDRESS> --list命令,其它同理。

 

2) 生产者相关操作

 

3) 消费者相关操作

 

4) 副本相关操作

 

5) Kafka压测操作

 

 

第三节 生产者

1. 消息发送流程

生产者采用推(push)模式将消息发送到 broker ,具体存储于哪个分区可由生产者指定(指定分区或指定Hash Key)或轮询选出

image-20241128193826525

注意:

  1. 分区数据始终先由 Leader 写入磁盘,然后复制给其它 follower 副本。

 

2. 生产者参数配置

参数名称参数说明
bootstrap.servers建立连接用到的Broker列表,以逗号分隔,如:10.201.65.21:9092,10.201.65.22:9092,10.201.65.23:9092;
key.serializer发送消息时 Key 的序列化器(全类名形式)
value.serializer发送消息时 Value 的序列化器(全类名形式)
buffer.memory生产者消息队列(RecordAccumulator)总大小,默认32m
batch.size消息发送缓冲区大小,默认 16k,提高该值可增加吞吐量,但会增加传输时延
linger.ms消息发送最多缓冲时间,默认 0ms,即立即发送,生产建议 5-100ms 之间
acks0:发送后无需等待数据落盘应答,一般不使用;
1:发送后需等待Leader应答,传输日志数据等;
-1/all:等待 isr 队列中的所有节点(可同步副本)应答,默认值,传输核心业务数据
max.in.flight.
requests.per.connection
允许最多没有返回 ack 的次数,默认为 5次,开启幂等性时要保证该值是 1-5 。
retries发送失败后的重试次数,默认为2147483647
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms
enable.idempotence是否开启幂等性,默认 true,开启幂等性
compression.type消息压缩格式,默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

 

3. Java API

1) 导入依赖

 

2) 同步发送

 

3) 异步发送

注意:

  1. 除了自定义分区器外,ProducerRecord类可以直接指定消息发送的分区消息key
  2. 默认的DefaultPartitioner 会优先使用 消息Key 的 hash 值对主题分区数进行取余发送。
  3. 如果既未设置自定义分区器,又未设置消息 Key,则会使用随机粘性分区器(随机分区,但会优先填满目前分区的当前批次)

 

4) 事务控制

注意:

  1. 进行事务控制时, 必须开启幂等性,即 enable.idempotence 设置为 true
  2. 必须定义一个唯一的 transactional.id,这样即使生产者客户端挂掉了,重启后也能继续处理未完成的事务

 

4. Spring Boot API

1) 导入依赖

 

2) 生产者配置

 

3) 发送消息

 

 

第四节 Broker

1. ZK节点信息

image-20241212132329909

注意:

  1. 消费者在 zk 中注册,但生产者不在 zk 中注册。

 

2. Broker参数配置

参数名称参数说明
replica.lag.time.max.msFollower 向 Leader 发送通信请求/数据的最大间隔,默认30s,超时则会被踢出 ISR 队列
auto.leader.rebalance.enable自动 Leader Partition 平衡,默认为true
leader.imbalance.per.broker.percentage每个 broker 允许的不平衡 leader 的比率,默认为10%,超过后将会触发 leader 的平衡
leader.imbalance.check.interval.seconds检查 leader 负载是否平衡的间隔时间,默认300s
log.segment.bytes日志块大小,默认1G
log.index.interval.bytes日志索引间隔,默认4kb,即每写4kb日志文件,则在* index 文件*记录一个索引
log.retention.hours日志保存的时间,小时级别,默认 7 天
log.retention.minutes日志保存的时间,分钟级别,默认关闭
log.retention.ms日志保存的时间,毫秒级别,默认关闭
log.retention.check.interval.ms检查日志是否过期的间隔,默认是 5 分钟
log.retention.bytes日志保存的大小,默认-1,表示无穷大
log.cleanup.policy日志清理策略,默认是 delete,还可设置为 compact,使用压缩策略。
num.io.threads写磁盘的线程数,默认为8。这个参数值要占总核数的 50%
num.replica.fetchers副本拉取线程数,这个参数占总核数的 50% 的 1/3
num.network.threads数据传输线程数,默认为3,这个参数占总核数的50%的 2/3
log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。
log.flush.interval.ms每隔多久刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

 

3. 核心机制

1) Leader 选举机制

Controller Leader 负责管理集群 broker的上下线,所有 topic 的分区副本分配Leader 选举等工作。

image-20241212195311434

2) Follower故障处理

image-20241212200435237

 

3) Leader故障处理

image-20241212200512208

 

4) 文件存储机制

image-20241212201321488

提示:

  1. 使用 kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index 可以查看日志和索引文件。
  2. Broker相关参数可设置文件清理策略,按时间清理或按日志大小清理等。

 

 

第五节 消费者

1. 消费流程

消费者消费者组为单位采用 拉(pull)模式broker 中读取数据,每个分区在同一时间只能由组中的一个消费者进行读取,但是不同组可以同时消费这个分区。

image-20241128194553991

注意:

  1. 拉模式可以由消费者控制消息消费的速率,但是消费者可能会在等待消息的“长轮询”中被阻塞

 

2. 消费者参数配置

参数名称参数说明
bootstrap.servers建立连接用到的Broker列表,以逗号分隔,如:10.201.65.21:9092,10.201.65.22:9092,10.201.65.23:9092;
key.deserializer接收消息时 Key 的序列化器(全类名形式)
value.deserializer接收消息时 Value 的序列化器(全类名形式)
group.id标记消费者所属的消费者组
enable.auto.commit消费者是否向服务器提交偏移量,默认值为 true
auto.commit.interval.ms消费者向服务器提交偏移量的频率,默认 5s
auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在的处理方式:
earliest:自动重置偏移量到最早的偏移量。
latest:默认,自动重置偏移量为最新的偏移量
none:如果消费组原来的偏移量不存在,则向消费者抛异常。
anything:向消费者抛异常。
offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区
heartbeat.interval.ms消费者和 coordinator 之间的心跳时间,默认 3s,建议小于 session.timeout.ms 的 1/3
session.timeout.ms消费者和 coordinator 之间连接超时时间,默认45s,超过则移除该消费者,消费者组执行再平衡
max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟,超过则移除该消费者,消费者组执行再平衡
fetch.min.bytes消费者提取消息最小字节数,默认 1 个字节
fetch.max.bytes消费者提取消息最大字节数,默认52428800(50 m)
fetch.max.wait.ms消费者提取消息最长等待时间,默认500ms
max.poll.records一次 poll拉取数据返回消息的最大条数,默认是 500 条
partition.assignment.strategy消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky
Kafka 可以同时使用多个分区分配策略,可选:Range、RoundRobin、Sticky、CooperativeSticky。

 

3. Java API

1) 导入依赖

 

2) 从指定主题消费消息

 

3) 从指定分区消费消息

 

4) 从指定offset消费消息

 

5) 从指定时间消费消息

 

6) 事务控制

提示:

  1. 消费 offset 保存在内置主题__consumer_offsets中,key 是 group.id+topic+分区号,value 就是当前 offset 的值。

 

4. Spring Boot API

1) 导入依赖

 

2) 消费者配置

 

3) 消费消息

 

 

第六节 扩展知识

1. 生产经验

1) 生产者精确发送一次

 

2) 保证分区内数据有序

 

3) 消费者精确消费一次

 

4) 如何提高Kafka吞吐量

 

 

2. Kafka-Eagle 监控

Kafka-Eagle 框架(https://www.kafka-eagle.org/)可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。

image-20241211222552406

 

 

3. Kafka-Kraft 模式

Kafka 现有架构中(左图),元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。

而在 kraft 模式架构中(右图,实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点 代替 zookeeper,元数据保存在controller 中,由 controller 直接进行 Kafka集群管理。 这样做的好处有以下几个:

image-20241211222901717