Kafka是一个基于 Scala 语言开发的分布式消息队列(Message Queue),也称作分布式事件流平台(Event Streaming Platform)。
它支持如下两种模式:
主要的功能如下:
Kafka的基础架构图如下:
x1# 1. 安装JDK
2https://www.oracle.com/java/technologies/javase/jdk11-archive-downloads.html
3
4# 2. 下载Kafka
5http://kafka.apache.org/downloads.html
6
7# 3. 上传和解压安装包
8mkdir /usr/local/kafka
9tar -zxvf kafka_2.12-3.9.0.tgz
10
11# 4. 创建logs目录
12mkdir /usr/local/kafka/kafka_2.12-3.9.0/logs
13
14# 5. 修改Broker配置文件
15vi config/server.properties
16
17# 6. 配置环境变量
18vim /etc/profile
19export KAFKA_HOME=/usr/local/kafka/kafka_2.12-3.9.0
20export PATH=$PATH:$KAFKA_HOME/bin
21source /etc/profile
22
331# 【重要】broker编号(不同服务器编号必须不同)
2broker.id=0
3
4# 【重要】监听端口
5listeners=PLAINTEXT://主机名:9092
6
7# 支持删除主题
8delete.topic.enable=true
9
10# 处理网络请求的线程数量
11num.network.threads=3
12# 用来处理磁盘 IO 的线程数量
13num.io.threads=8
14# 发送套接字的缓冲区大小
15socket.send.buffer.bytes=102400
16# 接收套接字的缓冲区大小
17socket.receive.buffer.bytes=102400
18# 请求套接字的缓冲区大小
19socket.request.max.bytes=104857600
20
21# 【重要】运行日志存放的路径
22log.dirs=/usr/local/kafka/kafka_2.12-3.9.0/logs
23
24# 主题在当前机器上的分区个数
25num.partitions=1
26# 用来恢复和清理 data 下数据的线程数量
27num.recovery.threads.per.data.dir=1
28# segment 文件保留的最长时间,超时将被删除
29log.retention.hours=168
30
31# 【重要】ZK地址(以逗号分隔)
32zookeeper.connect=127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181
33
131# 授权
2cd /usr/local/kafka/kafka_2.12-3.9.0/bin
3chmod +x *.sh
4
5# ZK启动(可能需要)
6./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
7
8# Kafka启动
9./bin/kafka-server-start.sh -daemon ./config/server.properties
10
11# 关闭
12./bin/kafka-server-stop.sh stop
13
新建vim /etc/systemd/system/kafka.service
文件,内容如下:
151[Unit]
2Description=Apache Kafka Server
3After=network.target
4
5[Service]
6Type=simple
7User=kafka
8Group=kafka
9ExecStart=/usr/local/kafka/kafka_2.12-3.9.0/bin/kafka-server-start.sh /usr/local/kafka/kafka_2.12-3.9.0/server.properties
10ExecStop=/usr/local/kafka/kafka_2.12-3.9.0/bin/kafka-server-stop.sh /usr/local/kafka/kafka_2.12-3.9.0/server.properties
11Restart=on-failure
12
13[Install]
14WantedBy=multi-user.target
15
设置开机自启:
91# 开启服务
2sudo systemctl start kafka
3
4# 开机自启
5sudo systemctl enable kafka
6
7# 查看服务状态
8sudo systemctl status kafka
9
251# 查看主题操作文档
2# --bootstrap-server ip/host:port 连接到哪个Broker
3# --list 查看所有主题
4# --describe/create/delete/alter/ 查看/创建/删除/修改主题
5# --topic topic_name 操作的主题
6# --partitions partition_num 设置分区数
7# --replication-factor replication_num 设置分区副本数
8# --config name=value 修改系统默认的配置
9bin/kafka-topics.sh
10
11# 查看所有主题
12bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
13
14# 查看主题详情
15bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
16
17# 创建主题
18bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
19
20# 删除主题
21bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
22
23# 增加主题分区数(注意:分区数只能增加,不能减少)
24bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
25
注意:
- 在低版本中,查看所有主题需使用
./kafka-topics.sh --zookeeper <ZOOKEEPER_ADDRESS> --list
命令,其它同理。
91# 查看生产者操作文档
2# --bootstrap-server ip/host:port 连接到哪个Broker
3# --topic topic_name 操作的主题
4bin/kafka-console-producer.sh
5
6# 向指定主题发送消息
7bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
8>hello world
9
131# 查看消费者操作文档
2# --bootstrap-server ip/host:port 连接到哪个Broker
3# --topic topic_name 操作的主题
4# --from-beginning 从头开始消费
5# --group consumer_group_id 指定消费者组名称
6bin/kafka-console-consumer.sh
7
8# 消费指定主题中的数据
9bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
10
11# 从头消费指定主题中的数据(包括之前已消费过的历史数据)
12bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
13
251# 增加副本因子
2vim increase-replication-factor.json
3输入:
4{
5 "version":1,
6 "partitions":[
7 {"topic":"four","partition":0,"replicas":[0,1,2]},
8 {"topic":"four","partition":1,"replicas":[0,1,2]},
9 {"topic":"four","partition":2,"replicas":[0,1,2]}]
10}
11bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
12
13# 调整分区副本存储
14vim increase-replication-factor.json
15输入:
16{
17 "version":1,
18 "partitions":[{"topic":"three","partition":0,"replicas":[0,1]},
19 {"topic":"three","partition":1,"replicas":[0,1]},
20 {"topic":"three","partition":2,"replicas":[1,0]},
21 {"topic":"three","partition":3,"replicas":[1,0]}]
22}
23bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --execute
24bin/kafka-reassign-partitions.sh --bootstrap-server hadoop102:9092 --reassignment-json-file increase-replication-factor.json --verify
25
171# 生产者压测
2bin/kafka-producer-perf-test.sh
3 --topic test
4 --record-size 1024 # 一条信息有多大,单位是字节,本次测试设置为 1k
5 --num-records 1000000 # 总共发送多少条信息,本次测试设置为 100 万条
6 --throughput 10000 # 每秒多少条信息,设成-1,表示不限流,尽可能快的生产数据
7 --producer-props # 生产者参数设置
8 bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
9 batch.size=16384 linger.ms=0
10
11# 消费者压测
12bin/kafka-consumer-perf-test.sh
13 --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092
14 --topic test
15 --messages 1000000 # 要消费的消息个数
16 --consumer.config config/consumer.properties
17
生产者采用推(push)模式将消息发送到 broker ,具体存储于哪个分区可由生产者指定(指定分区或指定Hash Key)或轮询选出。
注意:
- 分区数据始终先由 Leader 写入磁盘,然后复制给其它 follower 副本。
参数名称 | 参数说明 |
---|---|
bootstrap.servers | 建立连接用到的Broker列表,以逗号分隔,如:10.201.65.21:9092,10.201.65.22:9092,10.201.65.23:9092; |
key.serializer | 发送消息时 Key 的序列化器(全类名形式) |
value.serializer | 发送消息时 Value 的序列化器(全类名形式) |
buffer.memory | 生产者消息队列(RecordAccumulator)总大小,默认32m |
batch.size | 消息发送缓冲区大小,默认 16k,提高该值可增加吞吐量,但会增加传输时延 |
linger.ms | 消息发送最多缓冲时间,默认 0ms,即立即发送,生产建议 5-100ms 之间 |
acks | 0:发送后无需等待数据落盘应答,一般不使用; 1:发送后需等待Leader应答,传输日志数据等; -1/all:等待 isr 队列中的所有节点(可同步副本)应答,默认值,传输核心业务数据; |
max.in.flight. requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5次,开启幂等性时要保证该值是 1-5 。 |
retries | 发送失败后的重试次数,默认为2147483647; |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性 |
compression.type | 消息压缩格式,默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
71<dependencies>
2 <dependency>
3 <groupId>org.apache.kafka</groupId>
4 <artifactId>kafka-clients</artifactId>
5 <version>3.0.0</version>
6 </dependency>
7</dependencies>
331public class DemoProducerBySync {
2 public static void main(String[] args) throws ExecutionException, InterruptedException {
3 // 1. 连接配置
4 Properties properties = new Properties();
5 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.202.82.87:9092");
6 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
7 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
8
9 // 2. 创建生产者
10 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
11
12 // 3. 发送数据
13 for (int i = 0; i < 5; i++) {
14 // 发送
15 Future<RecordMetadata> future = kafkaProducer.send(new ProducerRecord<>("topic_name", "data" + i));
16
17 try {
18 // 同步阻塞获取发送结果
19 RecordMetadata metadata = future.get();
20
21 // 消息发送成功
22 System.out.println(" 主 题 : " + metadata.topic() + "->" + "分区:" + metadata.partition());
23 } catch (ExecutionException exception) {
24 // 消息发送异常
25 exception.printStackTrace();
26 }
27
28 }
29
30 // 4. 关闭生产者
31 kafkaProducer.close();
32 }
33}
751// 异步发送消息
2public class DemoProducer {
3 public static void main(String[] args) throws InterruptedException {
4 // 1. 连接配置
5 Properties properties = new Properties();
6 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.202.82.87:9092");
7 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
8 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
9
10 // 自定义分区器
11 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.xxx.xxx.DemoPartitioner");
12
13 // 平衡吞吐量和传输延时
14 properties.put(ProducerConfig.ACKS_CONFIG, "1"); // 只等待Leader应答
15 properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区大小
16 properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批次大小
17 properties.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 最大缓冲时间(ms)
18 properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 压缩格式
19
20 // 2. 创建生产者
21 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
22
23 // 3. 发送数据
24 for (int i = 0; i < 5; i++) {
25 kafkaProducer.send(new ProducerRecord<>("topic_name", "data" + i),
26 // 回调函数(可选):在收到 ack 时触发
27 new Callback() {
28
29 public void onCompletion(RecordMetadata metadata, Exception exception) {
30
31 // 消息发送异常
32 if (exception != null) {
33 exception.printStackTrace();
34 return;
35 }
36
37 // 消息发送成功
38 System.out.println(" 主 题 : " + metadata.topic() + "->" + "分区:" + metadata.partition());
39 }
40 });
41
42 // 模拟业务延时
43 Thread.sleep(2);
44 }
45
46 // 4. 关闭生产者
47 kafkaProducer.close();
48 }
49}
50
51// 自定义分区器
52public class DemoPartitioner implements Partitioner {
53
54 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
55
56 // 获取消息值
57 String msgValues = value.toString();
58
59 // 将vip开头的消息发送到 1 分区
60 if (msgValues != null && msgValues.startsWith("vip:")) {
61 return 1;
62 }
63
64 return 0;
65 }
66
67
68 public void close() {
69 }
70
71
72 public void configure(Map<String, ?> configs) {
73 }
74}
75
注意:
- 除了自定义分区器外,
ProducerRecord
类可以直接指定消息发送的分区或消息key。- 默认的
DefaultPartitioner
会优先使用 消息Key 的 hash 值对主题分区数进行取余发送。- 如果既未设置自定义分区器,又未设置消息 Key,则会使用随机粘性分区器(随机分区,但会优先填满目前分区的当前批次)。
381public class DemoProducerByTranaction {
2 public static void main(String[] args) {
3 // 1. 连接配置
4 Properties properties = new Properties();
5 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
6 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
7 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
8 properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01"); // 【重要】指定事务id
9
10 // 2. 创建生产者
11 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
12
13 // 3. 开启事务
14 kafkaProducer.initTransactions();
15 kafkaProducer.beginTransaction();
16
17 try {
18 // 4. 发送数据
19 for (int i = 0; i < 5; i++) {
20 kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i));
21 }
22
23 // 模拟业务异常
24 int i = 1 / 0;
25
26 // 5. 提交事务
27 kafkaProducer.commitTransaction();
28 } catch (Exception e) {
29
30 // 5. 回滚事务
31 kafkaProducer.abortTransaction();
32 } finally {
33
34 // 6. 关闭生产者
35 kafkaProducer.close();
36 }
37 }
38}
注意:
- 进行事务控制时, 必须开启幂等性,即
enable.idempotence
设置为true
。- 必须定义一个唯一的
transactional.id
,这样即使生产者客户端挂掉了,重启后也能继续处理未完成的事务。
41<dependency>
2 <groupId>org.springframework.kafka</groupId>
3 <artifactId>spring-kafka</artifactId>
4</dependency>
91# application.propeties
2
3# 指定 kafka 的地址
4spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
5
6# 【生产者】指定 key 和 value 的序列化器
7spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
8spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
9
131
2public class ProducerController {
3 // Kafka 模板用来向 kafka 发送数据
4
5 KafkaTemplate<String, String> kafka;
6
7 "/atguigu") (
8 public String data(String msg) {
9 kafka.send("first", msg);
10 return "ok";
11 }
12}
13
注意:
- 消费者在 zk 中注册,但生产者不在 zk 中注册。
参数名称 | 参数说明 |
---|---|
replica.lag.time.max.ms | Follower 向 Leader 发送通信请求/数据的最大间隔,默认30s,超时则会被踢出 ISR 队列 |
auto.leader.rebalance.enable | 自动 Leader Partition 平衡,默认为true |
leader.imbalance.per.broker.percentage | 每个 broker 允许的不平衡 leader 的比率,默认为10%,超过后将会触发 leader 的平衡 |
leader.imbalance.check.interval.seconds | 检查 leader 负载是否平衡的间隔时间,默认300s |
log.segment.bytes | 日志块大小,默认1G |
log.index.interval.bytes | 日志索引间隔,默认4kb,即每写4kb日志文件,则在* index 文件*记录一个索引 |
log.retention.hours | 日志保存的时间,小时级别,默认 7 天 |
log.retention.minutes | 日志保存的时间,分钟级别,默认关闭 |
log.retention.ms | 日志保存的时间,毫秒级别,默认关闭 |
log.retention.check.interval.ms | 检查日志是否过期的间隔,默认是 5 分钟 |
log.retention.bytes | 日志保存的大小,默认-1,表示无穷大 |
log.cleanup.policy | 日志清理策略,默认是 delete,还可设置为 compact,使用压缩策略。 |
num.io.threads | 写磁盘的线程数,默认为8。这个参数值要占总核数的 50%。 |
num.replica.fetchers | 副本拉取线程数,这个参数占总核数的 50% 的 1/3 |
num.network.threads | 数据传输线程数,默认为3,这个参数占总核数的50%的 2/3 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。 |
log.flush.interval.ms | 每隔多久刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。 |
Controller Leader 负责管理集群 broker的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。
提示:
- 使用 kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index 可以查看日志和索引文件。
- Broker相关参数可设置文件清理策略,按时间清理或按日志大小清理等。
消费者以消费者组为单位采用 拉(pull)模式从 broker 中读取数据,每个分区在同一时间只能由组中的一个消费者进行读取,但是不同组可以同时消费这个分区。
注意:
- 拉模式可以由消费者控制消息消费的速率,但是消费者可能会在等待消息的“长轮询”中被阻塞。
参数名称 | 参数说明 |
---|---|
bootstrap.servers | 建立连接用到的Broker列表,以逗号分隔,如:10.201.65.21:9092,10.201.65.22:9092,10.201.65.23:9092; |
key.deserializer | 接收消息时 Key 的序列化器(全类名形式) |
value.deserializer | 接收消息时 Value 的序列化器(全类名形式) |
group.id | 标记消费者所属的消费者组 |
enable.auto.commit | 消费者是否向服务器提交偏移量,默认值为 true |
auto.commit.interval.ms | 消费者向服务器提交偏移量的频率,默认 5s |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在的处理方式: earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets 的分区数,默认是 50 个分区 |
heartbeat.interval.ms | 消费者和 coordinator 之间的心跳时间,默认 3s,建议小于 session.timeout.ms 的 1/3 |
session.timeout.ms | 消费者和 coordinator 之间连接超时时间,默认45s,超过则移除该消费者,消费者组执行再平衡 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟,超过则移除该消费者,消费者组执行再平衡 |
fetch.min.bytes | 消费者提取消息最小字节数,默认 1 个字节 |
fetch.max.bytes | 消费者提取消息最大字节数,默认52428800(50 m) |
fetch.max.wait.ms | 消费者提取消息最长等待时间,默认500ms |
max.poll.records | 一次 poll拉取数据返回消息的最大条数,默认是 500 条。 |
partition.assignment.strategy | 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。 Kafka 可以同时使用多个分区分配策略,可选:Range、RoundRobin、Sticky、CooperativeSticky。 |
71<dependencies>
2 <dependency>
3 <groupId>org.apache.kafka</groupId>
4 <artifactId>kafka-clients</artifactId>
5 <version>3.0.0</version>
6 </dependency>
7</dependencies>
421public class DemoConsumer {
2 public static void main(String[] args) {
3
4 // 1. 连接配置
5 Properties properties = new Properties();
6 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
7 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
8 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
9
10 // 消费者组ID(必须配置)
11 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test5");
12
13 // 设置分区分配策略
14 properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor");
15
16 // 手动提交offset
17 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
18
19 // 2. 创建消费者
20 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
21
22 // 3. 订阅主题
23 kafkaConsumer.subscribe(new ArrayList<String>() {{
24 add("first");
25 }});
26
27 // 4. 消费数据
28 while (true) {
29 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
30
31 // 执行业务
32 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
33 System.out.println(consumerRecord);
34 }
35
36 // 异步提交offset
37 kafkaConsumer.commitAsync(); // 可能会失败
38 // kafkaConsumer.commitSync(); // 同步提交,失败重试
39 }
40 }
41}
42
331public class DemoConsumerPartition {
2 public static void main(String[] args) {
3 // 1. 连接配置
4 Properties properties = new Properties();
5 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
6 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
7 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
8
9 // 消费者组ID(必须配置)
10 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
11
12 // 自动提交offset,默认为true
13 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
14 properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 自动提交 offset 时间间隔,默认5s
15
16 // 2. 创建消费者
17 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
18
19 // 3. 订阅主题指定分区
20 kafkaConsumer.assign(new ArrayList<TopicPartition>() {{
21 add(new TopicPartition("first", 0)); // first主题的 0 分区
22 }});
23
24 // 4. 消费数据
25 while (true) {
26 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
27 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
28 System.out.println(consumerRecord);
29 }
30 }
31 }
32}
33
431
2public class DemoConsumerSeek {
3 public static void main(String[] args) {
4
5 // 1. 连接配置
6 Properties properties = new Properties();
7 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
8 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
9 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
10
11 // 消费者组ID(必须配置)
12 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
13
14 // 2. 创建消费者
15 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
16
17 // 3. 订阅主题
18 kafkaConsumer.subscribe(new ArrayList<String>() {{
19 add("first");
20 }});
21
22 // 4. 查询消费者分区分配信息(有了分区分配信息才能开始消费)
23 Set<TopicPartition> assignment = kafkaConsumer.assignment();
24 while (assignment.size() == 0) {
25 kafkaConsumer.poll(Duration.ofSeconds(1));
26 assignment = kafkaConsumer.assignment();
27 }
28
29 // 5. 设置消从指定 offset 开始消费(对所有分区设置)
30 for (TopicPartition topicPartition : assignment) {
31 kafkaConsumer.seek(topicPartition, 600);
32 }
33
34 // 6. 消费数据
35 while (true) {
36 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
37 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
38 System.out.println(consumerRecord);
39 }
40 }
41 }
42}
43
521public class DemoConsumerSeekTime {
2 public static void main(String[] args) {
3 // 1. 连接配置
4 Properties properties = new Properties();
5 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
6 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
7 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
8
9 // 消费者组ID(必须配置)
10 properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
11
12 // 2. 创建消费者
13 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
14
15 // 3. 订阅主题
16 kafkaConsumer.subscribe(new ArrayList<String>() {{
17 add("first");
18 }});
19
20 // 4. 查询消费者分区分配信息(有了分区分配信息才能开始消费)
21 Set<TopicPartition> assignment = kafkaConsumer.assignment();
22 while (assignment.size() == 0) {
23 kafkaConsumer.poll(Duration.ofSeconds(1));
24 assignment = kafkaConsumer.assignment();
25 }
26
27 // 5. 查询各分区 1 天前的 offset
28 HashMap<TopicPartition, Long> timeMap = new HashMap<>();
29 for (TopicPartition topicPartition : assignment) {
30 timeMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); // 每个分区都从前 1 天开始消费
31 }
32 Map<TopicPartition, OffsetAndTimestamp> offsetMap = kafkaConsumer.offsetsForTimes(timeMap); // 获取每个分区前 1 天时的 offset
33
34 // 6. 设置消从指定 offset 开始消费(对所有分区设置)
35 for (TopicPartition topicPartition : assignment) {
36 // 获取 1 天前的分区 offset 并设置
37 OffsetAndTimestamp offsetAndTimestamp = offsetMap.get(topicPartition);
38 if (offsetAndTimestamp != null) {
39 kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
40 }
41 }
42
43 // 7. 消费数据
44 while (true) {
45 ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
46 for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
47 System.out.println(consumerRecord);
48 }
49 }
50 }
51}
52
131// 自动提交offset,默认为true
2properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
3properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); // 自动提交 offset 时间间隔,默认5s
4
5// 手动提交offset
6properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
7
8// 异步提交offset
9// 没有失败重试机制,故有可能提交失败
10kafkaConsumer.commitAsync();
11// 同步提交offset
12// 同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败)
13kafkaConsumer.commitSync();
提示:
- 消费 offset 保存在内置主题__consumer_offsets中,key 是 group.id+topic+分区号,value 就是当前 offset 的值。
41<dependency>
2 <groupId>org.springframework.kafka</groupId>
3 <artifactId>spring-kafka</artifactId>
4</dependency>
111# application.propeties
2
3# 指定 kafka 的地址
4spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
5
6# 【消费者】指定 key 和 value 的反序列化器
7spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
8spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
9
10# 【消费者】指定消费者组的 group_id
11spring.kafka.consumer.group-id=atguigu
81
2public class KafkaConsumer {
3 // 指定要监听的 topic
4 topics = "first") (
5 public void consumeTopic(String msg) {
6 System.out.println(" 收到的信息: " + msg);
7 }
8}
Kafka-Eagle 框架(https://www.kafka-eagle.org/)可以监控 Kafka 集群的整体运行情况,在生产环境中经常使用。
在 Kafka 现有架构中(左图),元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。
而在 kraft 模式架构中(右图,实验性),不再依赖 zookeeper 集群,而是用三台 controller 节点 代替 zookeeper,元数据保存在controller 中,由 controller 直接进行 Kafka集群管理。 这样做的好处有以下几个: