一、kafkak8s介绍
Kafka是一种分布式的基于发布/订阅模式的消息队列,用Scala语言编写,由Apache软件基金会开发。Kafka是一种快速的、可扩展的、设计内部极具容错性的消息队列系统,最初由LinkedIn公司开发,后来成为Apache项目的一部分。
Kafka8s是将Kafka与Kubernetes编排系统完美融合的产物。它利用Kubernetes的强大特性,轻松管理Kafka的部署、伸缩和升级过程,提高Kafka在容器时代的适用性。
Kafka8s架构如下图所示:
二、部署Kafka集群
使用Kafka8s部署Kafka集群非常简单。只需要编写一个Kafka集群的配置文件,然后使用Kubernetes API将其部署到Kubernetes集群即可。
以下是一个Kafka集群的配置文件示例:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
name: my-kafka
spec:
kafka:
version: 2.7.0
replicas: 3
listeners:
plain: {}
tls: {}
config:
auto.create.topics.enable: "true"
offsets.topic.replication.factor: 3
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 100Gi
deleteClaim: false
- id: 1
type: persistent-claim
size: 100Gi
deleteClaim: false
jvmOptions:
"-Xms": "2g"
"-Xmx": "2g"
zookeeper:
replicas: 3
storage:
type: persistent-claim
size: 100Gi
deleteClaim: false
有了以上文件,只需要使用以下命令即可将Kafka集群部署到Kubernetes集群中:
kubectl create -f kafka-cluster.yaml
使用kubectl命令查看Kafka集群部署状态:
kubectl get kafka
三、消息发送和消费
使用Java编写一个简单的生产者代码向Kafka集群发送消息:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class MyProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
producer.send(record);
producer.close();
}
}
使用Java编写一个简单的消费者代码从Kafka集群接收消息:
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singleton("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
四、Kafka监控
使用Prometheus和Grafana可以实现对Kafka的监控。以下是一个Prometheus的配置文件的示例:
global:
scrape_interval: 10s
evaluation_interval: 10s
scrape_configs:
- job_name: kafka
metrics_path: "/metrics"
scheme: "http"
static_configs:
- targets: ["my-cluster-kafka-bootstrap:8080"]
relabel_configs:
- source_labels: [__address__]
regex: (.*)
target_label: instance
replacement: ${1}
使用以下命令启动Prometheus:
docker run -p 9090:9090 -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
使用以下命令下载并安装Grafana:
curl https://grafana.com/api/plugins/grafana-piechart-panel/versions/1.6.0/download -k > /tmp/grafana-piechart-panel.zip
unzip /tmp/grafana-piechart-panel.zip -d /var/lib/grafana/plugins/
在Grafana中导入以下Dashboard模板即可:
{
"id": null,
"title": "Kafka Metrics",
"panels": [
{
"title": "Kafka Broker Metrics",
"type": "graph",
"span": 12,
"targets": [
{
"expr": "kafka_server_brokertopicmetrics_bytesinpersec[1m]",
"interval": "10s",
"legendFormat": "{{kafka_exporter_broker}}",
"refId": "A"
}
]
},
{
"title": "Kafka Topic Metrics",
"type": "table",
"span": 12,
"pageSize": null,
"columns": [],
"scroll": true,
"showHeader": true,
"showFullscreenControl": true,
"dataSource": {
"name": "Prometheus",
"type": "prometheus",
"url": "http://localhost:9090",
"access": "proxy",
"basicAuth": false
},
"targets": [
{
"expr": "kafka_topic_partition_current_offset{topic=\"my-topic\"}",
"interval": "10s",
"legendFormat": "",
"refId": "A"
},
{
"expr": "kafka_topic_partition_end_offset{topic=\"my-topic\"}",
"interval": "10s",
"legendFormat": "",
"refId": "B"
},
{
"expr": "rate(kafka_server_brokertopicmetrics_bytesinpersec{topic=\"my-topic\"}[1m])",
"interval": "10s",
"legendFormat": "",
"refId": "C"
}
]
}
],
"schemaVersion": 22,
"version": 0
}
五、Kafka集群伸缩
使用Kafka8s可以轻松地进行Kafka集群的伸缩。只需要修改Kafka集群配置文件中的replicas值,然后使用以下命令即可更新Kafka集群:
kubectl apply -f kafka-cluster.yaml
六、总结
Kafka8s是一个非常好用的分布式消息系统的部署方案,在容器时代具有很高的适用性。使用Kafka8s,我们可以轻松地部署和管理Kafka集群,同时也能够方便地进行监控和伸缩操作。