一、Kafka概述
Kafka是一个分布式、可横向扩展的消息队列,是一种高吞吐量的分布式发布订阅消息系统。Kafka的设计目标是:将发布者与订阅者解耦,同时提高消息处理速度。Kafka是用Scala编写的,但是它支持多种编程语言。
二、Kafka架构
Kafka的架构中有四个角色:生产者,消费者,Kafka Broker和ZooKeeper。
1. 生产者
Kafka生产者将消息发布到Kafka Broker。生产者将消息发送到指定的Topic。Topic是写入消息的主题,“生产者”发布的每条消息都属于某个主题。生产者必须知道应该将消息发送到哪个Topic中。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class KafkaProducerExample {
public static void main(String[] args) throws Exception{
String topicName = "my-topic";
String key = "my-key";
String value = "my-value";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer
producer = new KafkaProducer
(props);
ProducerRecord
record = new ProducerRecord
(topicName, key, value);
producer.send(record);
producer.close();
}
}
2. 消费者
消费者可以订阅一个或多个Topic,并消费其中的消息。Kafka消费者将分区中的消息读取出来,按照顺序消费。Kafka消费者在处理消息时是有状态的,需要记录已消费消息的偏移量。消费者可以设置偏移量的提交方式,是同步式还是异步式。
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) throws Exception{
String topicName = "my-topic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer
consumer = new KafkaConsumer
(props);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords
records = consumer.poll(100);
for (ConsumerRecord
record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
3. Kafka Broker
Kafka Broker是Kafka集群中的一个,是一个Kafka Server实例。Broker接收生产者发布的消息,并将消息写入磁盘上的一个或多个分区中。Broker还提供了消费者可以从其订阅的Partition中读取消息的服务。
4. ZooKeeper
Kafka依赖ZooKeeper来完成元数据管理、Leader选举、分区管理等任务。ZooKeeper还支持监视Kafka Broker和Consumer的状态。
三、Kafka分区和副本
Kafka中的Topic分为多个Partition,每个Partition可以分配多个Replica(副本)。Partition和Replica的数量可以在创建Topic时进行配置。消息Producer将消息通过轮询算法发送到一个Partition中的某个Replica。
1. Partition
Partition用于较大的Topic。Kafka中所有消息都必须属于某个Topic,但是如果一个Topic的消息量很大,需要很高的处理能力来处理每个Message,那么就可以采用Partition的方式来划分Message。Partition将一个Topic中的所有消息分为若干个区,每一个区中的消息互不干扰。可以将每个Partition存储在一个文件中。
2. Replica
副本可以提高读写的性能,副本的作用就是让数据更可靠。Kafka中的Replica(副本)是指能够复制Partition内容的节点,同一个Topic中,不同Partition的Replica可以配置成不同的个数(即Replication Factor)。一个Partition的所有副本被称为一个副本集。如果在一个Broker中存储了多个Partition的多个副本,同一个Broker中相同Partition的副本不能保存在同一个磁盘路径下。
四、Kafka的可靠性
Kafka的可靠性分为两个方面:Producer的可靠性和Consumer的可靠性。
1. Producer的可靠性
Kafka保证消息不会丢失是通过副本策略实现的。Producer向Broker中发送消息,Broker接收到消息后将消息写入磁盘,并同时往一个或多个其它Broker的Replica中也写入消息。只有将消息发送给所有的Replica后才返回ACK,表示消息已经保存成功,这时Producer才能确定消息已经被保存下来。
2. Consumer的可靠性
Kafka提供了两种Commit的方式:自动Commit和手动Commit。
- 自动Commit:Consumer在poll方法的过程中可以选择自动提交偏移量,但是这种方式可能会出现宕机丢失数据的情况。
- 手动Commit:Consumer在处理消息后,可以手动提交偏移量。如果偏移量提交失败的话,Consumer将会等待一段时间后再次提交,直到提交成功。