您的位置:

CentOS 7安装Kafka

一、安装Java

在CentOS 7系统中安装Kafka需要先安装Java,可以选择OpenJDK或者Oracle JDK,本文以安装OpenJDK为例。

sudo yum install java-1.8.0-openjdk -y
java -version

执行上述命令即可完成安装,通过java -version确认Java是否安装成功。

二、安装Kafka

1.下载Kafka

从官网下载最新的Kafka发布版本:https://kafka.apache.org/downloads,本文以kafka_2.12-2.7.0.tgz为例进行示范。

wget https://dlcdn.apache.org/kafka/2.7.0/kafka_2.12-2.7.0.tgz

2.解压Kafka并设置环境变量

tar -xzf kafka_2.12-2.7.0.tgz
sudo mv kafka_2.12-2.7.0 /usr/local/kafka
echo "export KAFKA_HOME=/usr/local/kafka" >> ~/.bashrc
echo "export PATH=$PATH:$KAFKA_HOME/bin" >> ~/.bashrc
source ~/.bashrc

执行上述命令,将Kafka解压到/usr/local/kafka目录,并配置环境变量。

三、启动和测试Kafka

1.启动Kafka服务

使用以下命令启动Kafka服务:

cd $KAFKA_HOME
bin/zookeeper-server-start.sh config/zookeeper.properties

等待ZooKeeper启动后,新开一个终端启动Kafka服务:

cd $KAFKA_HOME
bin/kafka-server-start.sh config/server.properties

2.创建Topic

使用以下命令创建一个Topic:

bin/kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092

执行以上命令即可创建一个Topic,名为my-topic。

3.发布和消费消息

使用以下命令在Topic中发布消息:

bin/kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092

然后在控制台中输入需要发布的消息。使用以下命令消费消息:

bin/kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092

执行以上命令即可消费消息。

四、配置Kafka

1.修改Kafka配置

编辑Kafka配置文件config/server.properties:

sudo vi $KAFKA_HOME/config/server.properties

可以修改默认的Kafka配置,例如修改broker的id、日志目录等,也可以添加自定义的配置。

2.配置单机多节点Kafka集群

在多台机器上安装Kafka并配置其为一个集群,需要在每个Kafka节点上修改配置。

比如在两台机器上搭建Kafka集群:zk01(192.168.1.1)、zk02(192.168.1.2),配置方式如下:

在zk01上修改配置文件

sudo vi $KAFKA_HOME/config/server.properties

修改以下属性,其他属性根据需要进行配置:

broker.id=0
listeners=PLAINTEXT://192.168.1.1:9092
advertised.listeners=PLAINTEXT://192.168.1.1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01:2181,zk02:2181
zookeeper.connection.timeout.ms=6000

在zk02上修改配置文件

sudo vi $KAFKA_HOME/config/server.properties

修改以下属性,其他属性根据需要进行配置:

broker.id=1
listeners=PLAINTEXT://192.168.1.2:9092
advertised.listeners=PLAINTEXT://192.168.1.2:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zk01:2181,zk02:2181
zookeeper.connection.timeout.ms=6000

修改完成后,分别在两台机器上启动Kafka服务即可搭建Kafka集群。