kafka-clients
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka-clients.version}</version>
</dependency>
Producer
生产者基础参数
bootstrap.servers
指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个
key.serializer
broker接收消息必须以字节数组byte[]形式存在,KafkaProducer<K,V>和ProducerRecord<K,V>中的泛型就是key和value的类型
可以使用kafka自带的序列化工具
org.apache.kafka.common.serialization.StringSerializer
value.seriablizer
broker接收消息必须以字节数组byte[]形式存在,KafkaProducer<K,V>和ProducerRecord<K,V>中的泛型就是key和value的类型
可以使用kafka自带的序列化工具
org.apache.kafka.common.serialization.StringSerializer
生产者核心参数
acks
0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当 broker故障时有可能丢失数据;
1:producer等待broker的ack,分区的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all):producer等待broker的ack,分区的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。
max.request.size
用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB,一般不建议改
retries
生产者重试次数,默认值0
buffer.memory
生产端消息缓冲池或缓冲区的大小,默认值 32 * 1024 * 1024L
batch.size
发送到缓冲区中的消息会被分为一个一个的batch,分批次的发送到broker端,这个参数就表示batch批次大小,默认值为16384,即16KB。因此减小batch大小有利于降低消息延时,增加batch大小有利于提升吞吐量。
linger.ms
用来控制batch最大的空闲时间,超过该时间的batch也会被发送到broker端
request.timeout.ms
表示生产端发送请求后等待broker端响应的最长时间,默认值为30000,即30s,超时生产端可能会选择重试(如果配置了retries)
Consumer
消费者基础参数
bootstrap.servers
参考生产者
key.deserializer
value.deserializer
消费者核心参数
group.id
组id,标识一个consumer组的名称
enable.auto.commit
boolean 类型,配置是否开启自动提交消费位移的功能,默认开启
auto.commit.interval.ms
当enbale.auto.commit参数设置为 true 时才生效,表示开启自动提交消费位移功能时自动提交`消费位移`的时间间隔
auto.offset.reset
当消费者读取偏移量无效的情况下,需要重置消费起始位置,可选值:"latest", "earliest", "none"
latest: 从消费者启动后生成的记录 (默认值)
earliest: 将从有效的最小位移位置开始消费
max.poll.records
消费者每次poll最大数据量,默认500
消费者消息手动提交
手动提交-sync
-- 同步提交,当前线程会阻塞直到offset提交成功
org.apache.kafka.clients.consumer.KafkaConsumer#commitSync()
手动提交-async
-- 异步没有失败重试机制,故有可能提交失败
org.apache.kafka.clients.consumer.KafkaConsumer#commitAsync()
kafka-springboot
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
配置文件
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
# 生产者配置
producer:
retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
batch-size: 16384 #16K
buffer-memory: 33554432 #32M
acks: all
# 指定消息key和消息体的编解码方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者配置
consumer:
group-id: springboot-gid-001
enable-auto-commit: true
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
concurrency: 3