一、kafka概述
Kafka是一个开源的消息系统,最初由LinkedIn公司开发并贡献给Apache基金会。Kafka的设计目标是使得能够处理大规模的实时数据流,以及向多个客户端提供高吞吐量的数据流。Kafka除了作为消息系统使用,还可以用作存储系统,即将Kafka作为数据存储工具使用,这是由于Kafka的持久性、可扩展性和快速的读写性能。
Kafka的核心是由一个或多个broker和ZooKeeper协调器组成的集群。生产者将消息发送到Kafka的topic,消费者从topic消费消息。每个broker在本地存储消息,同时通过复制和分区机制实现高可靠性、扩展性和吞吐量。
二、kafka的消息模型
Kafka的消息模型是基于发布/订阅模式的,在Kafka中被称为topic。生产者将消息发送到topic,消费者从topic消费消息。一个topic可以有多个订阅者,而生产者在将消息发送到topic时不必知道哪些订阅者将会接收到这些消息。Kafka还可以支持动态创建topic。
Kafka的topic可以由一个或多个partition组成,每个partition在存储上是一个独立的分片,也就是说一个topic可以跨越多个broker。每个partition内的消息是有序的,并且在单个broker上具有高吞吐量。同时,由于partition的数量可以随意增加,因此Kafka可以轻松地扩展存储和吞吐量。
对于一个topic,Kafka提供了多个消费者group(消费者组),一个消费者组内可以有多个消费者。对于同一个消费者组内的多个消费者来说,它们会从不同的partition中消费消息,消费进度是独立的。
三、kafka的核心设计
1、分布式的存储与分区机制
public class Partition {
private int partitionId;
private Broker leader;
private List
replicas;
public Partition(int partitionId, Broker leader, List
replicas) {
this.partitionId = partitionId;
this.leader = leader;
this.replicas = replicas;
}
}
Kafka的消息通过partition进行分区,每个partition可以由多个broker进行复制,对于每个partition而言,只有其中一个broker是leader,其余的broker都是follower。leader负责接收生产者的消息,并将消息写入到本地存储中,同时将消息发送给follower以进行备份。如果leader宕机了,Kafka会自动将其中一个follower升级为leader。
2、高吞吐量的消息读写
public class Producer {
private KafkaClient kafkaClient;
public Producer(KafkaClient kafkaClient) {
this.kafkaClient = kafkaClient;
}
public void send(String topic, String message) {
Message messageToSend = new Message(message);
kafkaClient.send(messageToSend, topic);
}
}
Kafka的生产者将消息发送到topic,消费者从topic消费消息。Kafka的重要设计之一是零拷贝(Zero Copy),在网络传输和磁盘IO中尽可能地避免数据的复制,从而提升IO效率。
3、多副本与可扩展性
public class Broker {
private int brokerId;
private boolean isLeader;
public Broker(int brokerId, boolean isLeader) {
this.brokerId = brokerId;
this.isLeader = isLeader;
}
public boolean isLeader() {
return isLeader;
}
}
Kafka的吞吐量可以通过添加更多的broker来进行水平扩展。每个broker存储一个或多个topic的partition数据。Kafka的副本机制保证了消息的可靠性,分布式系统中,单个broker的故障不会影响整个系统的可用性。
四、kafka实战
1、安装和配置Kafka
# 安装Kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.2/kafka_2.13-2.6.2.tgz
tar -xzvf kafka_2.13-2.6.2.tgz
cd kafka_2.13-2.6.2
# 配置Kafka
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
# 修改配置
vi config/server-1.properties
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs-1
vi config/server-2.properties
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs-2
2、创建Topic
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test
3、启动Kafka
# 以后台方式启动Kafka
./bin/kafka-server-start.sh -daemon config/server.properties
# 启动server-1
./bin/kafka-server-start.sh -daemon config/server-1.properties
# 启动server-2
./bin/kafka-server-start.sh -daemon config/server-2.properties
4、生产者和消费者
# 启动控制台生产者
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# 启动控制台消费者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
五、总结
通过本文对Kafka的深入解析,我们了解了Kafka的概念、消息模型、核心设计和实战操作。Kafka具有高可靠性、高扩展性和高吞吐量等优点,可以帮助我们处理实时数据流,从而在数据处理和存储领域发挥重要作用。