Kafka-Java客户端


kafka-clients

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka-clients.version}</version>
</dependency>

Producer

生产者基础参数

bootstrap.servers

指定生产者客户端连接kafka集群所需的broker地址列表,格式为host1:port1,host2:port2,可以设置一个或多个

key.serializer

broker接收消息必须以字节数组byte[]形式存在,KafkaProducer<K,V>和ProducerRecord<K,V>中的泛型就是key和value的类型
可以使用kafka自带的序列化工具
org.apache.kafka.common.serialization.StringSerializer

value.seriablizer

broker接收消息必须以字节数组byte[]形式存在,KafkaProducer<K,V>和ProducerRecord<K,V>中的泛型就是key和value的类型
可以使用kafka自带的序列化工具

org.apache.kafka.common.serialization.StringSerializer

生产者核心参数

acks

0:producer不等待broker的ack,这一操作提供了一个最低的延迟,broker一接收到还没有写入磁盘就已经返回,当 broker故障时有可能丢失数据;
1:producer等待broker的ack,分区的leader落盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据;
-1(all):producer等待broker的ack,分区的leader和follower全部落盘成功后才返回ack。但是如果在follower同步完成后,broker发送ack之前,leader发生故障,那么会造成数据重复。

max.request.size

用来限制生产者客户端能发送的消息的最大值,默认值为1048576B,即1MB,一般不建议改

retries

生产者重试次数,默认值0

buffer.memory

生产端消息缓冲池或缓冲区的大小,默认值 32 * 1024 * 1024L

batch.size

发送到缓冲区中的消息会被分为一个一个的batch,分批次的发送到broker端,这个参数就表示batch批次大小,默认值为16384,即16KB。因此减小batch大小有利于降低消息延时,增加batch大小有利于提升吞吐量。

linger.ms

用来控制batch最大的空闲时间,超过该时间的batch也会被发送到broker端

request.timeout.ms

表示生产端发送请求后等待broker端响应的最长时间,默认值为30000,即30s,超时生产端可能会选择重试(如果配置了retries)

Consumer

消费者基础参数

bootstrap.servers

参考生产者

key.deserializer

value.deserializer

消费者核心参数

group.id

组id,标识一个consumer组的名称

enable.auto.commit

boolean 类型,配置是否开启自动提交消费位移的功能,默认开启

auto.commit.interval.ms

当enbale.auto.commit参数设置为 true 时才生效,表示开启自动提交消费位移功能时自动提交`消费位移`的时间间隔

auto.offset.reset

当消费者读取偏移量无效的情况下,需要重置消费起始位置,可选值:"latest", "earliest", "none"
latest: 从消费者启动后生成的记录 (默认值)
earliest: 将从有效的最小位移位置开始消费

max.poll.records

消费者每次poll最大数据量,默认500

消费者消息手动提交

手动提交-sync

-- 同步提交,当前线程会阻塞直到offset提交成功
org.apache.kafka.clients.consumer.KafkaConsumer#commitSync()

手动提交-async

-- 异步没有失败重试机制,故有可能提交失败
org.apache.kafka.clients.consumer.KafkaConsumer#commitAsync()

kafka-springboot

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>${spring-kafka.version}</version>
    </dependency>

配置文件

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    # 生产者配置
    producer:
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384 #16K
      buffer-memory: 33554432 #32M
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      group-id: springboot-gid-001
      enable-auto-commit: true
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 3

声明:微默网|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - Kafka-Java客户端


不以物喜,不以己悲! 不忘初心,方得始终!