您的位置:

Kafka如何保证不重复消费又不丢失数据

Kafka是一个高吞吐量分布式消息系统,被广泛应用于数据传输中。在Kafka中,一些场景下需要保证消息不丢失也不被重复消费,本文将从多个方面对这个问题进行阐述。

一、消息持久化

Kafka将所有消息进行持久化存储,每个partition有相应的replication factor,即副本数量。当Producer发送消息到Kafka,它会被复制到该Partition所有副本机器上,只有在副本写入到磁盘时才会通知Producer写入成功。这意味着即使一个broker宕机,也会有其他副本来保证消息的可用性和一致性。 对于消费者而言,它只需简单地向Kafka请求下一批消息。Kafka仅在Broker端记录每个Consumer最后读取的消息Offset,这个Offset是Consumer自己记录的,这样即使Consumer下线或者重启后,可以在Offset的位置继续读取消息。 实例: ``` ProducerConfig props = new ProducerConfig(getProducerProperties()); Producer producer = new Producer (props); KeyedMessage message = new KeyedMessage ("topic","key", "value"); producer.send(message); producer.close(); ConsumerConfig props = new ConsumerConfig(getConsumerProperties()); ConsumerConnector consumer = Consumer.createJavaConsumerConnector(props); String topic = "topic"; Map topicCount = new HashMap (); topicCount.put(topic, 1); Map >> consumerStreams = consumer.createMessageStreams(topicCount); List > streams = consumerStreams.get(topic); for (KafkaStream stream : streams) { ConsumerIterator it = stream.iterator(); while (it.hasNext()) { System.out.println(new String(it.next().message())); } } ```

二、幂等性

Kafka 0.11后加入了幂等性保证,此功能可被Producer使用,以确保相同的消息能够被重复生产,但在消息分配时,每个消息只会被处理一次。这将确保即使重复发送消息,也不会导致数据损坏或消息重复,从而达到不丢失数据和不重复消费的目标。 实例: ```java Properties producerProps = new Properties(); producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); producerProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 32_768); producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); Producer producer = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()); producer.initTransactions(); try { producer.beginTransaction(); for (String data : dataList) { producer.send(new ProducerRecord<>("topic", key, data)); } producer.commitTransaction(); } catch (Exception e) { producer.abortTransaction(); } producer.flush(); producer.close(); ```

三、使用Consumer Group

Kafka提供了Consumer Group概念,一个Consumer Group由一组Consumer组成,共同消费一个或者一组Partition。当一个Consumer Group进行消息消费时,Partition中的每个消息只能被组中的一个Consumer消费,其他Consumer则不能再消费该Partition中的任何消息。当Consumer Group中的任何一个Consumer宕机或加入时,该Consumer Group都不会影响另外Consumer Group中的消费情况。 实例: ```java Properties props = new Properties(); props.put("group.id", "test"); props.put("bootstrap.servers", "localhost:9092"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } ```

四、使用Batch Consumer

Batch Consumer每次消费多个消息,以提高消息处理效率和优化网络IO。它可以一次消费多条消息,然后一次性提交Offset,从而能够提高效率,减少IO操作次数。 实例: ```java import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class BatchConsumerExampler { public static void main(String[] args) { final Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "false"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); final KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); try { while (true) { final ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(100)); if (consumerRecords.count() == 0) { continue; } int count = 0; for (ConsumerRecord consumerRecord : consumerRecords) { System.out.printf("ConsumerRecord:(%d, %s, %d, %d, %s)\n", consumerRecord.key(), consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp()); count++; } System.out.println("succeed to consume " + count + " records"); consumer.commitAsync(); } } finally { consumer.close(); } } } ```

五、总结

本文从消息持久化、幂等性、Consumer Group、Batch Consumer等方面对Kafka如何保证数据不重复消费又不丢失进行了详细的阐述。相信在实际工作中,针对不同场景,读者能够合理地使用这些技术,保障数据的安全性和处理效率。