一、Kafka消息传输保障机制概述
Kafka 是一个分布式发布/订阅消息系统,其最大不同点是基于 pull 的消息传输机制,在保证高性能的同时实现了数据的可靠性。Kafka 的消息传输保障机制主要包括 3 种模式:
1、At most once:最多一次,消息发送者无论消息是否成功投递,都不会对消息进行重试,卡夫卡集群最终不一定会收到该消息。该模式的优点在于消息的延迟最小,性能最高,但是会出现消息丢失的情况,适合那些对于消息的可靠性要求不高的业务场景。在实际中,最多一次这种模式会被用来传送一些临时的状态消息,比如说心跳确认等。
2、At least once:最少一次,消息会被重试直到成功将其发送到 Kafka 集群。但是,在发送重试期间,同一条消息可能会被写入多次,会产生数据冗余,适合那些对于数据的一致性要求比较高,但是允许数据防止重复的业务场景,比如说电商平台中的订单提交。
3、Exactly once:恰好一次,这是目前 Kafka 队列的默认传输保障模式,它同时具备 At most once 和 At least once 两种模式的优点,并且没有它们的缺点。它保证了数据的一致性和可靠性,适用于金融、医疗、物流等对数据可靠性要求极高的行业。在实际中,使用此模式需要使得发送者以幂等(idempotent)的方式向 Kafka 进行数据发送,即同一条消息不会被重复投递。
二、Kafka消息传输保障机制实现
实现Kafka的消息传输保障机制需要两个关键组件:生产者 API 和消费者 API。
为了保证每一条消息都能按照想要的模式投递到 Kafka 集群,生产者 API 具备了重试机制。在发送消息时,如果网络不稳定或者 Kafka 集群出现宕机等突发状况,生产者会在配置的时间间隔内进行重试。生产者 API 还允许配置幂等性(idempotent)保证消息不重复,减少对数据的冗余写入。
消费者 API 需要做到消费的消息是通过已经成功提交的 offset 来标识的。在 Kafka 中,offset 是用来标识每一条消息,在消费数据时,使用 offset 来记录消费的位置。
三、Kafka消息传输保障机制代码示例
以下为 Kafka 的 Java API 的代码示例:
public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close(); } } public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("my-topic")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } }
四、结论
本文详细分析了 Kafka 消息传输保障机制,从三个方面探讨了 Kafka 的消息传输保障机制概述、实现和示例代码。Kafka 采用 pull 模式的消息拉取机制,在保证高性能的同时也确保了消息的可靠性。阐述了 At most once、At least once 和 Exactly once 这三种不同的消息传输保障机制,并且从生产者 API 和消费者 API 这两个关键组件分别进行了阐述。最后,通过示例代码演示了如何在实际项目中使用 Kafka 的消息传输保障机制。