
Building a Distributed Messaging System Centered on Kafka8s

1. Introduction to Kafka8s


Kafka is a distributed, publish/subscribe-based message queue written in Scala and maintained by the Apache Software Foundation. It is a fast, scalable messaging system designed with strong internal fault tolerance, originally developed at LinkedIn before becoming part of the Apache project.

Kafka8s integrates Kafka with the Kubernetes orchestration system. It leverages Kubernetes features to manage the deployment, scaling, and upgrade of Kafka clusters, making Kafka a better fit for the container era.

The Kafka8s architecture is shown in the figure below.

[Figure: Kafka8s architecture diagram]

2. Deploying a Kafka Cluster

Deploying a Kafka cluster with Kafka8s is straightforward: write a configuration file describing the cluster, then apply it to the Kubernetes cluster through the Kubernetes API.

The following is an example Kafka cluster configuration file (a Strimzi `Kafka` custom resource):


apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.7.0
    replicas: 3
    listeners:
      plain: {}
      tls: {}
    config:
      auto.create.topics.enable: "true"
      offsets.topic.replication.factor: 3
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 1
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
    jvmOptions:
      "-Xms": "2g"
      "-Xmx": "2g"
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false

With this file in place, a single command deploys the Kafka cluster into Kubernetes:


kubectl create -f kafka-cluster.yaml

Check the deployment status of the Kafka cluster with kubectl:


kubectl get kafka

3. Producing and Consuming Messages

The following is a simple Java producer that sends a message to the Kafka cluster:


import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap service exposed by the Kafka cluster inside Kubernetes
        props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
        props.put("acks", "all");          // wait for all in-sync replicas to acknowledge
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
        producer.send(record);
        producer.close();                  // flushes buffered records before exiting
    }
}
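The record above carries a key, and Kafka's default partitioner routes all records with the same key to the same partition by hashing that key (the real client hashes the serialized key bytes with murmur2). The idea can be sketched in plain Java; this is a simplified stand-in, not the actual Kafka algorithm:

```java
public class PartitionSketch {
    // Simplified stand-in for the default partitioner: same key -> same partition.
    // Kafka itself applies murmur2 to the serialized key bytes, not String.hashCode().
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Identical keys always map to the same partition, so per-key ordering is preserved
        System.out.println(partitionFor("key", 3) == partitionFor("key", 3));
    }
}
```

This per-key determinism is what gives Kafka its ordering guarantee within a partition.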

The following is a simple Java consumer that receives messages from the Kafka cluster:


import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "my-cluster-kafka-bootstrap:9092");
        props.put("group.id", "my-group");             // consumer group used for offset tracking
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");  // commit offsets every second
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("my-topic"));
        while (true) {
            // Block for up to 100 ms waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

4. Monitoring Kafka

Prometheus and Grafana can be used to monitor Kafka. The following is an example Prometheus configuration file:


global:
  scrape_interval: 10s
  evaluation_interval: 10s
scrape_configs:
  - job_name: kafka
    metrics_path: "/metrics"
    scheme: "http"
    static_configs:
      - targets: ["my-cluster-kafka-bootstrap:8080"]
    relabel_configs:
      - source_labels: [__address__]
        regex: (.*)
        target_label: instance
        replacement: ${1}

Start Prometheus with the following command:


docker run -p 9090:9090 -v /path/to/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus

With Grafana already installed, use the following commands to download and install the Grafana pie chart panel plugin:


curl https://grafana.com/api/plugins/grafana-piechart-panel/versions/1.6.0/download -k > /tmp/grafana-piechart-panel.zip
unzip /tmp/grafana-piechart-panel.zip -d /var/lib/grafana/plugins/

Then import the following dashboard template into Grafana:


{
  "id": null,
  "title": "Kafka Metrics",
  "panels": [
    {
      "title": "Kafka Broker Metrics",
      "type": "graph",
      "span": 12,
      "targets": [
        {
          "expr": "rate(kafka_server_brokertopicmetrics_bytesinpersec[1m])",
          "interval": "10s",
          "legendFormat": "{{kafka_exporter_broker}}",
          "refId": "A"
        }
      ]
    },
    {
      "title": "Kafka Topic Metrics",
      "type": "table",
      "span": 12,
      "pageSize": null,
      "columns": [],
      "scroll": true,
      "showHeader": true,
      "showFullscreenControl": true,
      "dataSource": {
        "name": "Prometheus",
        "type": "prometheus",
        "url": "http://localhost:9090",
        "access": "proxy",
        "basicAuth": false
      },
      "targets": [
        {
          "expr": "kafka_topic_partition_current_offset{topic=\"my-topic\"}",
          "interval": "10s",
          "legendFormat": "",
          "refId": "A"
        },
        {
          "expr": "kafka_topic_partition_end_offset{topic=\"my-topic\"}",
          "interval": "10s",
          "legendFormat": "",
          "refId": "B"
        },
        {
          "expr": "rate(kafka_server_brokertopicmetrics_bytesinpersec{topic=\"my-topic\"}[1m])",
          "interval": "10s",
          "legendFormat": "",
          "refId": "C"
        }
      ]
    }
  ],
  "schemaVersion": 22,
  "version": 0
}
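The table panel above juxtaposes each partition's current (committed) offset with its end offset; the difference between the two is the consumer lag, the key health signal for a consumer group. The arithmetic behind that panel, with hypothetical offset values, is just:

```java
import java.util.HashMap;
import java.util.Map;

public class LagCalculator {
    // Lag per partition = log end offset - last committed consumer offset.
    static Map<Integer, Long> lag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long consumed = committed.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), e.getValue() - consumed);
        }
        return result;
    }

    public static void main(String[] args) {
        // Partition 0 is 20 records behind; partition 1 is fully caught up
        Map<Integer, Long> end = Map.of(0, 120L, 1, 300L);
        Map<Integer, Long> cur = Map.of(0, 100L, 1, 300L);
        System.out.println(lag(end, cur));
    }
}
```

A steadily growing lag on a partition means the consumer group cannot keep up with the producers and is the usual trigger for scaling out consumers.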

5. Scaling the Kafka Cluster

Kafka8s makes scaling the Kafka cluster easy: modify the replicas value in the cluster configuration file, then update the cluster with the following command:


kubectl apply -f kafka-cluster.yaml
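For example, to grow the cluster from three brokers to five, only the replicas field under spec.kafka in the earlier configuration file changes:

```yaml
spec:
  kafka:
    replicas: 5   # was 3; the operator creates two additional broker pods
```

Note that adding brokers does not automatically rebalance existing partitions onto them; partition reassignment is a separate step.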

6. Summary

Kafka8s is a practical way to deploy a distributed messaging system and is well suited to the container era. With it, we can easily deploy and manage Kafka clusters, and monitoring and scaling are just as convenient.