一、Kafka幂等性原理
Kafka幂等性保证了消息在发送和消费的过程中不会重复或丢失,其核心原理是基于唯一消息标识符以及多个产生者实例或消费者实例之间的协调。
Kafka中的每一个消息都有一个全局唯一的消息标识符,称之为消息的唯一标识符(Message UUID)或者消息的Sequence ID。这个标识符是Kafka Broker自动创建的,可以保证消息的顺序性和唯一性。
当开启Kafka幂等性时,每个生产者都会带上自己的唯一ID并向Kafka Broker发送消息,Broker会根据唯一标识符判断是否重复;当消费者消费消息时,会自动提交消息的位移偏移量,如果出现重复消费,消费者会自动回滚到上一次提交的位移。
二、Kafka幂等性开启
Kafka幂等性默认是关闭的,可以通过在Producer配置中添加enable.idempotence=true来开启,如下所示:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("enable.idempotence", "true"); // 开启幂等性
props.put("retries", "3");
props.put("batch.size", "16384");
props.put("linger.ms", "1");
props.put("buffer.memory", "33554432");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer
producer = new KafkaProducer<>(props);
Kafka幂等性只在acks=all的情况下生效,因为只有在确认所有副本均已写入消息后,才会返回成功响应。
三、Kafka幂等性参数
Kafka幂等性需要额外的参数支持,如下所示:
max.in.flight.requests.per.connection:每个连接的最大并发请求数,默认是5,如果设置为1,则每次只发送一条消息。
retries:重试次数,默认为Integer.MAX_VALUE。
delivery.timeout.ms:允许消息传输的最长时间,默认为2分钟。
四、Kafka幂等性消费
当消费者接收消息时,需要正确地处理幂等性,不仅仅需要考虑消费端的幂等性,还需要考虑生产端的幂等性,如下所示:
public void onMessage(List
> records, Acknowledgment acknowledgment) {
Map
offsetsToCommit = new HashMap<>();
for (ConsumerRecord
record : records) {
if (record.headers().lastHeader("kafka_correlation_id") == null) { // 判断是否是第一次消费消息
processRecord(record);
offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)); // 提交消费位移
} else {
logger.info("Duplicated message received: {}", record.value());
}
}
acknowledge.acknowledge(); // 手动提交位移
if (!offsetsToCommit.isEmpty()) {
kafkaConsumer.commitSync(offsetsToCommit); // 提交位移
}
}
在消费端,通过判断消息的headers中是否有惟一id,来判断该消息是否已经被消费,如果已经被消费,则不进行消费操作;否则,处理该消息并提交消费位移。
五、Kafka幂等性配置
除了上述用于开启幂等性的配置参数,Kafka还有一些其他的配置参数可以帮助我们更好地控制和管理Kafka幂等性,如下所示:
unclean.leader.election.enable:是否允许脏的Leader选举,默认为false。
min.insync.replicas:多少个副本需要写入消息才算成功,默认是1。
transactional.id:若要开启Kafka事务,则需要用到这个参数。
六、Kafka幂等性作用
Kafka幂等性通过去除重复消息,从而降低了系统中数据被重复消费或发生重复操作的概率,避免了一些潜在的并发问题。
此外,Kafka幂等性还可以确保数据的顺序性和完整性,从而保证数据一致性。
七、Kafka幂等性写入
在进行写入操作时,需要注意如下几点:
1、为了确保唯一标识符的可用性,可以通过实现自定义序列化器或者使用字符串作为Key。
2、为了实现幂等性,对于重复消息需要进行忽略以及失败的重新尝试。
3、为了防止消息丢失,消费者应该尽量快的完成消费并提交位移。
八、Kafka幂等性面试题
1、什么是Kafka幂等性?
2、Kafka幂等性的原理是什么?
3、如何在Kafka中开启幂等性?
4、Kafka幂等性可以保证什么?有什么作用?
5、Kafka幂等性有哪些应用场景?
6、Kafka幂等性可以通过哪些参数进行配置?
九、Kafka幂等性和事务
Kafka幂等性与事务是可以一起使用的。通过事务的支持,可以确保原子性、隔离性和持久性。
在使用事务时,需要在Producer的配置中添加transactional.id参数,并将Kafka幂等性的配置参数与事务相关参数进行配合使用,如下所示:
props.put("transaction.id", "transaction-id");
props.put("max.in.flight.requests.per.connection", 1); // 和幂等性相关的配置
props.put("retries", 3); // 和幂等性相关的配置
props.put("enable.idempotence", true); // 开启幂等性
KafkaProducer
producer = new KafkaProducer
(props);
十、Kafka幂等性跨分区选取
Kafka并不保证同一组消费者中的每个消费者都可以消费到每个分区中的每个消息,这就需要我们手动为每个消费者分配特定的分区进行消费。在Kafka中,有两种方式可以对分区进行操作,分别是手动分配和自动分配,手动分配更加灵活,但自动分配更加简便易行。
对于Kafka幂等性来说,需要注意两点:
1、在使用手动分配方式时,需要注意确保同一组消费者中,至少一个消费者消费到每个分区。
2、在使用自动分配方式时,需要在启动消费者时设置partition.assignment.strategy参数为RoundRobinAssignor。
代码示例
开启Kafka幂等性:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("enable.idempotence", "true"); // 开启幂等性
props.put("retries", "3");
props.put("batch.size", "16384");
props.put("linger.ms", "1");
props.put("buffer.memory", "33554432");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer
producer = new KafkaProducer<>(props);
消费者监听Kafka消息,并实现幂等性处理:
public void onMessage(List
> records, Acknowledgment acknowledgment) {
Map
offsetsToCommit = new HashMap<>();
for (ConsumerRecord
record : records) {
if (record.headers().lastHeader("kafka_correlation_id") == null) { // 判断是否是第一次消费消息
processRecord(record);
offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)); // 提交消费位移
} else {
logger.info("Duplicated message received: {}", record.value());
}
}
acknowledgment.acknowledge(); // 手动提交位移
if (!offsetsToCommit.isEmpty()) {
kafkaConsumer.commitSync(offsetsToCommit); // 提交位移
}
}
参考资料
1、https://kafka.apache.org/documentation/#producerconfigs_idempotence
2、https://kafka.apache.org/documentation/#consumerconfigs_enable.idempotence
3、https://blog.csdn.net/qq_34707550/article/details/80205655
4、https://www.jianshu.com/p/66b10a39353f