java调用kafka生产者,java调用kafka接口发送数据

发布时间:2022-11-20

本文目录一览:

  1. 怎么使用java连接kafka
  2. [Kafka简介+Kafka Tool使用简介+使用实例](#Kafka简介+Kafka Tool使用简介+使用实例)
  3. kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!
  4. [Kafka系列之(4)——Kafka Producer流程解析](#Kafka系列之(4)——Kafka Producer流程解析)
  5. [3分钟带你彻底搞懂 Kafka](#3分钟带你彻底搞懂 Kafka)
  6. [使用java实现kafka consumer时报错](#使用java实现kafka consumer时报错)

怎么使用java连接kafka

把你要传递的数据转换成json字符串返回接口,然后手机端调用接口就可以获取到你要传递是值了。

Kafka简介+Kafka Tool使用简介+使用实例

详细安装访问: macOS 可以用homebrew快速安装,访问地址: 原文链接: 查看topic列表: 创建topic:

  • --create:创建命令;
  • --topic:后面指定topic名称;
  • --replication-factor:后面指定副本数;
  • --partitions:指定分区数,根据broker的数量决定;
  • --zookeeper:后面指定 zookeeper.connect 的zk链接 查看某个topic: Kafka 作为消息系统的一种,当然可以像其他消息中间件一样作为消息数据中转的平台。下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。

1、引入依赖

2、消息生产者

示例中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。

  • bootstrap.servers 表示 Kafka 集群。如果集群中有多台物理服务器,则服务器地址之间用逗号分隔,比如 "192.168.1.1:9092,192.168.1.2:9092"。localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。
  • key.serializervalue.serializer 表示消息的序列化类型。Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的消息序列化为二进制类型,示例中是发送文本消息到服务器,所以使用的是 StringSerializer
  • key.deserializervalue.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消息反序列化为指定的类型,因为序列化用的是String类型,所以用 StringDeserializer 来反序列化。
  • zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL,提供了基于 ZooKeeper 的集群服务器自动感知功能,可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。 有了消息生产者之后,就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord 类型对象,ProducerRecord 类提供了多种构造函数形参,常见的有如下三种:
  • ProducerRecord(topic, partition, key, value)
  • ProducerRecord(topic, key, value)
  • ProducerRecord(topic, value) 其中 topicvalue 是必填的,partitionkey 是可选的。如果指定了 partition,那么消息会被发送至指定的 partition;如果没指定 partition 但指定了 key,那么消息会按照 hash(key) 发送到对应的 partition;如果既没指定 partition 也没指定 key,那么消息会按照 round-robin 模式发送(即以轮询的方式依次发送)到每一个 partition。示例中将向 test-topic 主题发送三条消息。

3、消息消费者

和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。

  • bootstrap.servers 和生产者一样,表示 Kafka 集群。
  • group.id 表示消费者的分组 ID。
  • enable.auto.commit 表示 Consumer 的 offset 是否自动提交。
  • auto.commit.interval.ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。
  • key.deserializervalue.deserializer 表示用字符串来反序列化消息数据。 消息消费者使用 subscribe 方法订阅了 Topic 为 test-topic 的消息。Consumer 调用 poll 方法来轮询 Kafka 集群的消息,一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止。如果读取到消息,则打印出消息记录的 partition, offset, key 等。

kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!

首先你在链接时候检查是否代码里的IP 和端口是不是对的,端口是 broker 端口,默认 9092; 其次查看代码是生产者,看 Kafka 集群里这个主题是否存在(如果不存在,默认是配置可以自动创建,看是否将该配置修改);然后检测防火墙,相应端口是否开放(防火墙直接关也可以);检测 server.properties 文件的 listeners 是否配置,若没有将其配置好。

Kafka系列之(4)——Kafka Producer流程解析

Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer。

注:ProducerRecord允许用户在创建消息对象的时候就直接指定要发送的分区,这样producer后续发送该消息时可以直接发送到指定分区,而不用先通过Partitioner计算目标分区了。另外,我们还可以直接指定消息的时间戳——但一定要慎重使用这个功能,因为它有可能会令时间戳索引机制失效。

流程描述:

用户首先构建待发送的消息对象 ProducerRecord,然后调用 KafkaProducer#send 方法进行发送。KafkaProducer 接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给 partitioner 去确定目标分区,最后追加写入到内存中的消息缓冲池 (accumulator)。此时 KafkaProducer#send 方法成功返回。同时,KafkaProducer 中还有一个专门的 Sender IO线程负责将缓冲池中的消息分批次发送给对应的 broker,完成真正的消息发送逻辑。 新版本的 producer 从设计上来说具有以下几个特点:

  • 总共创建两个线程:执行 KafkaProducer#send 逻辑的线程——我们称之为“用户主线程”;执行发送逻辑的IO线程——我们称之为“Sender线程”。
  • 不同于 Scala 老版本的 producer,新版本 producer 完全异步发送消息,并提供了回调机制 (callback) 供用户判断消息是否成功发送。
  • batching 机制——“分批发送”机制。每个批次 (batch) 中包含了若干个 PRODUCE 请求,因此具有更高的吞吐量。
  • 更加合理的默认分区策略:对于无 key 消息而言,Scala 版本分区策略是一段时间内 (默认是10分钟) 将消息发往固定的目标分区,这容易造成消息分布的不均匀,而新版本的 producer 采用轮询的方式均匀地将消息分发到不同的分区。
  • 底层统一使用基于 Selector 的网络客户端实现,结合 Java 提供的 Future 实现完整地提供了更加健壮和优雅的生命周期管理。

关键参数

  • batch.size:控制一个 batch 的大小,默认是 16KB。
  • acks:关乎到消息持久性 (durability) 的一个参数。高吞吐量和高持久性很多时候是相矛盾的,需要先明确我们的目标是什么?高吞吐量?高持久性?亦或是中等?因此该参数也有对应的三个取值:0,-1 和 1。
  • linger.ms:减少网络IO,节省带宽之用。原理就是把原本需要多次发送的小 batch,通过引入延时的方式合并成大 batch 发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时。
  • compression.type:producer 所使用的压缩器,目前支持 gzip, snappy 和 lz4。压缩是在用户主线程完成的,通常都需要花费大量的 CPU 时间,但对于减少网络 IO 来说确实利器。生产环境中可以结合压力测试进行适当配置。
  • max.in.flight.requests.per.connection:关乎消息乱序的一个配置参数。它指定了 Sender 线程在单个 Socket 连接上能够发送未应答 PRODUCE 请求的最大请求数。适当增加此值通常会增大吞吐量,从而整体上提升 producer 的性能。不过笔者始终觉得其效果不如调节 batch.size 来得明显,所以请谨慎使用。另外如果开启了重试机制,配置该参数大于 1 可能造成消息发送的乱序 (先发送 A,然后发送 B,但 B 却先行被 broker 接收)。
  • retries:重试机制,对于瞬时失败的消息发送,开启重试后 KafkaProducer 会尝试再次发送消息。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。 当用户调用 KafkaProducer.send(ProducerRecord, Callback) 时 Kafka 内部流程分析: 这是 KafkaProducer#send 逻辑的第一步,即为待发送消息进行序列化并计算目标分区。 如上图所示,一条所属 topic 是 "test",消息体是 "message" 的消息被序列化之后结合 KafkaProducer 缓存的元数据 (比如该 topic 分区数信息等) 共同传给后面的 Partitioner 实现类进行目标分区的计算。 producer 创建时会创建一个默认 32MB (由 buffer.memory 参数指定) 的 accumulator 缓冲区,专门保存待发送的消息。除了之前在“关键参数”段落中提到的 linger.msbatch.size 等参数之外,该数据结构中还包含了一个特别重要的集合信息:消息批次信息 (batches)。该集合本质上是一个 HashMap,里面分别保存了每个 topic 分区下的 batch 队列,即前面说的批次是按照 topic 分区进行分组的。这样发往不同分区的消息保存在对应分区下的 batch 队列中。举个简单的例子,假设消息 M1, M2 被发送到 test 的 0 分区但属于不同的 batch,M3 分送到 test 的 1 分区,那么 batches 中包含的信息就是:{"test-0" - [batch1, batch2], "test-1" - [batch3]}。 单个 topic 分区下的 batch 队列中保存的是若干个消息批次。每个 batch 中最重要的 3 个组件包括:
  • compressor:负责执行追加写入操作
  • batch 缓冲区:由 batch.size 参数控制,消息被真正追加写入到的地方
  • thunks:保存消息回调逻辑的集合 这一步的目的就是将待发送的消息写入消息缓冲池中,具体流程如下图所示: 这一步执行完毕之后理论上讲 KafkaProducer.send 方法就执行完毕了,用户主线程所做的事情就是等待 Sender 线程发送消息并执行返回结果了。 此时,该 Sender 线程登场了。严格来说,Sender 线程自 KafkaProducer 创建后就一直都在运行着。它的工作流程基本上是这样的:
  • 不断轮询缓冲区寻找已做好发送准备的分区;
  • 将轮询获得的各个 batch 按照目标分区所在的 leader broker 进行分组;
  • 将分组后的 batch 通过底层创建的 Socket 连接发送给各个 broker;
  • 等待服务器端发送 response 回来。 为了说明上的方便,我还是基于图的方式来解释 Sender 线程的工作原理: 上图中 Sender 线程会发送 PRODUCE 请求给对应的 broker,broker 处理完毕之后发送对应的 PRODUCE response。一旦 Sender 线程接收到 response 将依次 (按照消息发送顺序) 调用 batch 中的回调方法。

3分钟带你彻底搞懂 Kafka

Kafka 到底是个啥?用来干嘛的? 官方定义如下: 翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠! 实时数据处理,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。 这些中间件,最大的特点主要有两个: 在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。 但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。 随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。 采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。 消息中间件:主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。 应用程序:只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可! 引入消息中间件之后,整个服务开发会变得更加简单,各负其责。 Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。 LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统,Kafka 由此诞生。 在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。 先来看一张图,下面这张图就是 kafka 生产与消费的核心架构模型! 如果你看不懂这些概念没关系,我会带着大家一起梳理一遍! 简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:

  • 与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个分区 Partition 的概念。 这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。 这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快! 这是 kafka 与其他的消息系统最大的不同! 和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower,即使主分区挂了,也不会影响服务的正常运行。 那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则: 与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。 这里我们需要重点了解一个名词:消费组! 考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题! 但是不同的组,可以消费同一个分区的数据! 你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。 但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。 如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。 因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致! 光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。 kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。 zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk 下载zookeeper,并解压文件包 创建数据、日志目录 配置zookeeper 重新配置 dataDir 和 dataLogDir 的存储路径 最后,启动 Zookeeper 服务 到官网下载想要的版本,我这里下载是最新稳定版 2.8.0。 按需修改配置文件 server.properties(可选) server.properties 文件内容如下: 其中有四个重要的参数: 可根据自己需求修改对应的配置! 启动 kafka 服务 创建一个名为 testTopic 的主题,它只包含一个分区,只有一个副本: 运行 list topic 命令,可以看到该主题。 输出内容: Kafka 附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。 运行生产者,然后在控制台中键入一些消息以发送到服务器。 输入两条内容并回车: Kafka 还有一个命令行使用者,它会将消息转储到标准输出。 输出结果如下: 本文主要围绕 kafka 的架构模型和安装环境做了一些初步的介绍,难免会有理解不对的地方,欢迎网友批评、吐槽。 由于篇幅原因,会在下期文章中详细介绍 java 环境下 kafka 应用场景!

使用java实现kafka consumer时报错

public static void consumer(){
    Properties props = new Properties();
    props.put("zk.connect", "hadoop-2:2181");
    props.put("zk.connectiontimeout.ms", "1000000");
    props.put("groupid", "fans_group");
    // Create the connection to the cluster
    ConsumerConfig consumerConfig = new ConsumerConfig(props);
    ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
    Map<String, Integer> map = new HashMap<>();
    map.put("fans", 1);
    // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
    Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
    List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");
    // create list of 4 threads to consume from each of the partitions
    ExecutorService executor = Executors.newFixedThreadPool(1);
    long startTime = System.currentTimeMillis();
    // consume the messages in the threads
    for(final KafkaStream<Message> stream: streams) {
        executor.submit(new Runnable() {
            public void run() {
                ConsumerIterator<Message> it = stream.iterator();
                while (it.hasNext()){
                    log.debug(byteBufferToString(it.next().message().payload()));
                }
            }
        });
        log.debug("use time="+(System.currentTimeMillis()-startTime));
    }
}