一、安装Java
在CentOS 7系统中安装Kafka需要先安装Java,可以选择OpenJDK或者Oracle JDK,本文以安装OpenJDK为例。
sudo yum install java-1.8.0-openjdk -y
java -version
执行上述命令即可完成安装,通过java -version确认Java是否安装成功。
二、安装Kafka
1.下载Kafka
从官网下载最新的Kafka发布版本:https://kafka.apache.org/downloads,本文以kafka_2.12-2.7.0.tgz为例进行示范。
wget https://dlcdn.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz
2.解压Kafka并设置环境变量
tar -xzf kafka_2.12-2.7.0.tgz
sudo mv kafka_2.12-2.7.0 /usr/local/kafka
echo "export KAFKA_HOME=/usr/local/kafka" >> ~/.bashrc
echo "export PATH=$PATH:$KAFKA_HOME/bin" >> ~/.bashrc
source ~/.bashrc
执行上述命令,将Kafka解压到/usr/local/kafka目录,并配置环境变量。
三、启动和测试Kafka
1.启动Kafka服务
使用以下命令启动Kafka服务:
cd $KAFKA_HOME
bin/zookeeper-server-start.sh config/zookeeper.properties
等待ZooKeeper启动后,新开一个终端启动Kafka服务:
cd $KAFKA_HOME
bin/kafka-server-start.sh config/server.properties
2.创建Topic
使用以下命令创建一个Topic:
bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092
执行以上命令即可创建一个Topic,名为my-topic。
3.发布和消费消息
使用以下命令在Topic中发布消息:
bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
然后在控制台中输入需要发布的消息。使用以下命令消费消息:
bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092
执行以上命令即可消费消息。
四、配置Kafka
1.修改Kafka配置
编辑Kafka配置文件config/server.properties:
sudo vi $KAFKA_HOME/config/server.properties
可以修改默认的Kafka配置,例如修改broker的id、日志目录等,也可以添加自定义的配置。
2.配置单机多节点Kafka集群
在多台机器上安装Kafka并配置其为一个集群,需要在每个Kafka节点上修改配置。
比如在两台机器上搭建Kafka集群:zk01(192.168.1.1)、zk02(192.168.1.2),配置方式如下:
在zk01上修改配置文件
sudo vi $KAFKA_HOME/config/server.properties
修改以下属性,其他属性根据需要进行配置:
broker.id=0
listeners=PLAINTEXT://192.168.1.1:9092
advertised.listeners=PLAINTEXT://192.168.1.1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01:2181,zk02:2181
zookeeper.connection.timeout.ms=6000
在zk02上修改配置文件
sudo vi $KAFKA_HOME/config/server.properties
修改以下属性,其他属性根据需要进行配置:
broker.id=1
listeners=PLAINTEXT://192.168.1.2:9092
advertised.listeners=PLAINTEXT://192.168.1.2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01:2181,zk02:2181
zookeeper.connection.timeout.ms=6000
修改完成后,分别在两台机器上启动Kafka服务即可搭建Kafka集群。