本文目录一览:
- 怎么使用java连接kafka
- [Kafka简介+Kafka Tool使用简介+使用实例](#Kafka简介+Kafka Tool使用简介+使用实例)
- kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!
- [Kafka系列之(4)——Kafka Producer流程解析](#Kafka系列之(4)——Kafka Producer流程解析)
- [3分钟带你彻底搞懂 Kafka](#3分钟带你彻底搞懂 Kafka)
- [使用java实现kafka consumer时报错](#使用java实现kafka consumer时报错)
怎么使用java连接kafka
把你要传递的数据转换成json字符串返回接口,然后手机端调用接口就可以获取到你要传递是值了。
Kafka简介+Kafka Tool使用简介+使用实例
详细安装访问: macOS 可以用homebrew快速安装,访问地址: 原文链接: 查看topic列表: 创建topic:
--create
:创建命令;--topic
:后面指定topic名称;--replication-factor
:后面指定副本数;--partitions
:指定分区数,根据broker的数量决定;--zookeeper
:后面指定zookeeper.connect
的zk链接 查看某个topic: Kafka 作为消息系统的一种,当然可以像其他消息中间件一样作为消息数据中转的平台。下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。
1、引入依赖
2、消息生产者
示例中用 KafkaProducer
类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。
bootstrap.servers
表示 Kafka 集群。如果集群中有多台物理服务器,则服务器地址之间用逗号分隔,比如"192.168.1.1:9092,192.168.1.2:9092"
。localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。key.serializer
和value.serializer
表示消息的序列化类型。Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的消息序列化为二进制类型,示例中是发送文本消息到服务器,所以使用的是StringSerializer
。key.deserializer
和value.deserializer
表示消息的反序列化类型。把来自 Kafka 集群的二进制消息反序列化为指定的类型,因为序列化用的是String类型,所以用StringDeserializer
来反序列化。zk.connect
用于指定 Kafka 连接 ZooKeeper 的 URL,提供了基于 ZooKeeper 的集群服务器自动感知功能,可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。 有了消息生产者之后,就可以调用send
方法发送消息了。该方法的入参是ProducerRecord
类型对象,ProducerRecord
类提供了多种构造函数形参,常见的有如下三种:ProducerRecord(topic, partition, key, value)
ProducerRecord(topic, key, value)
ProducerRecord(topic, value)
其中topic
和value
是必填的,partition
和key
是可选的。如果指定了partition
,那么消息会被发送至指定的partition
;如果没指定partition
但指定了key
,那么消息会按照hash(key)
发送到对应的partition
;如果既没指定partition
也没指定key
,那么消息会按照round-robin
模式发送(即以轮询的方式依次发送)到每一个partition
。示例中将向test-topic
主题发送三条消息。
3、消息消费者
和消息生产者类似,这里用 KafkaConsumer
类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。
bootstrap.servers
和生产者一样,表示 Kafka 集群。group.id
表示消费者的分组 ID。enable.auto.commit
表示 Consumer 的 offset 是否自动提交。auto.commit.interval.ms
用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。key.deserializer
和value.deserializer
表示用字符串来反序列化消息数据。 消息消费者使用subscribe
方法订阅了 Topic 为test-topic
的消息。Consumer 调用poll
方法来轮询 Kafka 集群的消息,一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止。如果读取到消息,则打印出消息记录的partition
,offset
,key
等。
kafka集群测试正常,但是Java连接kafka出现异常,急求大神解答!!!!!!!!!!!
首先你在链接时候检查是否代码里的IP 和端口是不是对的,端口是 broker 端口,默认 9092;
其次查看代码是生产者,看 Kafka 集群里这个主题是否存在(如果不存在,默认是配置可以自动创建,看是否将该配置修改);然后检测防火墙,相应端口是否开放(防火墙直接关也可以);检测 server.properties
文件的 listeners
是否配置,若没有将其配置好。
Kafka系列之(4)——Kafka Producer流程解析
Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer。
注:
ProducerRecord
允许用户在创建消息对象的时候就直接指定要发送的分区,这样producer后续发送该消息时可以直接发送到指定分区,而不用先通过Partitioner
计算目标分区了。另外,我们还可以直接指定消息的时间戳——但一定要慎重使用这个功能,因为它有可能会令时间戳索引机制失效。
流程描述:
用户首先构建待发送的消息对象 ProducerRecord
,然后调用 KafkaProducer#send
方法进行发送。KafkaProducer
接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给 partitioner
去确定目标分区,最后追加写入到内存中的消息缓冲池 (accumulator
)。此时 KafkaProducer#send
方法成功返回。同时,KafkaProducer
中还有一个专门的 Sender
IO线程负责将缓冲池中的消息分批次发送给对应的 broker,完成真正的消息发送逻辑。
新版本的 producer 从设计上来说具有以下几个特点:
- 总共创建两个线程:执行
KafkaProducer#send
逻辑的线程——我们称之为“用户主线程”;执行发送逻辑的IO线程——我们称之为“Sender线程”。 - 不同于 Scala 老版本的 producer,新版本 producer 完全异步发送消息,并提供了回调机制 (callback) 供用户判断消息是否成功发送。
batching
机制——“分批发送”机制。每个批次 (batch) 中包含了若干个 PRODUCE 请求,因此具有更高的吞吐量。- 更加合理的默认分区策略:对于无 key 消息而言,Scala 版本分区策略是一段时间内 (默认是10分钟) 将消息发往固定的目标分区,这容易造成消息分布的不均匀,而新版本的 producer 采用轮询的方式均匀地将消息分发到不同的分区。
- 底层统一使用基于 Selector 的网络客户端实现,结合 Java 提供的 Future 实现完整地提供了更加健壮和优雅的生命周期管理。
关键参数
batch.size
:控制一个 batch 的大小,默认是 16KB。acks
:关乎到消息持久性 (durability) 的一个参数。高吞吐量和高持久性很多时候是相矛盾的,需要先明确我们的目标是什么?高吞吐量?高持久性?亦或是中等?因此该参数也有对应的三个取值:0,-1 和 1。linger.ms
:减少网络IO,节省带宽之用。原理就是把原本需要多次发送的小 batch,通过引入延时的方式合并成大 batch 发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时。compression.type
:producer 所使用的压缩器,目前支持 gzip, snappy 和 lz4。压缩是在用户主线程完成的,通常都需要花费大量的 CPU 时间,但对于减少网络 IO 来说确实利器。生产环境中可以结合压力测试进行适当配置。max.in.flight.requests.per.connection
:关乎消息乱序的一个配置参数。它指定了 Sender 线程在单个 Socket 连接上能够发送未应答 PRODUCE 请求的最大请求数。适当增加此值通常会增大吞吐量,从而整体上提升 producer 的性能。不过笔者始终觉得其效果不如调节batch.size
来得明显,所以请谨慎使用。另外如果开启了重试机制,配置该参数大于 1 可能造成消息发送的乱序 (先发送 A,然后发送 B,但 B 却先行被 broker 接收)。retries
:重试机制,对于瞬时失败的消息发送,开启重试后 KafkaProducer 会尝试再次发送消息。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。 当用户调用KafkaProducer.send(ProducerRecord, Callback)
时 Kafka 内部流程分析: 这是KafkaProducer#send
逻辑的第一步,即为待发送消息进行序列化并计算目标分区。 如上图所示,一条所属 topic 是 "test",消息体是 "message" 的消息被序列化之后结合 KafkaProducer 缓存的元数据 (比如该 topic 分区数信息等) 共同传给后面的Partitioner
实现类进行目标分区的计算。 producer 创建时会创建一个默认 32MB (由buffer.memory
参数指定) 的accumulator
缓冲区,专门保存待发送的消息。除了之前在“关键参数”段落中提到的linger.ms
和batch.size
等参数之外,该数据结构中还包含了一个特别重要的集合信息:消息批次信息 (batches)。该集合本质上是一个 HashMap,里面分别保存了每个 topic 分区下的 batch 队列,即前面说的批次是按照 topic 分区进行分组的。这样发往不同分区的消息保存在对应分区下的 batch 队列中。举个简单的例子,假设消息 M1, M2 被发送到 test 的 0 分区但属于不同的 batch,M3 分送到 test 的 1 分区,那么 batches 中包含的信息就是:{"test-0" - [batch1, batch2], "test-1" - [batch3]}。 单个 topic 分区下的 batch 队列中保存的是若干个消息批次。每个 batch 中最重要的 3 个组件包括:compressor
:负责执行追加写入操作batch
缓冲区:由batch.size
参数控制,消息被真正追加写入到的地方thunks
:保存消息回调逻辑的集合 这一步的目的就是将待发送的消息写入消息缓冲池中,具体流程如下图所示: 这一步执行完毕之后理论上讲KafkaProducer.send
方法就执行完毕了,用户主线程所做的事情就是等待Sender
线程发送消息并执行返回结果了。 此时,该Sender
线程登场了。严格来说,Sender
线程自KafkaProducer
创建后就一直都在运行着。它的工作流程基本上是这样的:- 不断轮询缓冲区寻找已做好发送准备的分区;
- 将轮询获得的各个 batch 按照目标分区所在的 leader broker 进行分组;
- 将分组后的 batch 通过底层创建的 Socket 连接发送给各个 broker;
- 等待服务器端发送 response 回来。
为了说明上的方便,我还是基于图的方式来解释
Sender
线程的工作原理: 上图中Sender
线程会发送 PRODUCE 请求给对应的 broker,broker 处理完毕之后发送对应的 PRODUCE response。一旦Sender
线程接收到 response 将依次 (按照消息发送顺序) 调用 batch 中的回调方法。
3分钟带你彻底搞懂 Kafka
Kafka 到底是个啥?用来干嘛的? 官方定义如下: 翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠! 实时数据处理,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。 这些中间件,最大的特点主要有两个: 在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。 但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。 随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。 采用这种模型,本质就是将要推送的数据,不在存放在当前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。 消息中间件:主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。 应用程序:只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可! 引入消息中间件之后,整个服务开发会变得更加简单,各负其责。 Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。 LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统,Kafka 由此诞生。 在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。 先来看一张图,下面这张图就是 kafka 生产与消费的核心架构模型! 如果你看不懂这些概念没关系,我会带着大家一起梳理一遍! 简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:
- 与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个分区 Partition 的概念。 这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分配到 5 个分区中,其中每个分区存储 200 条数据。 这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快! 这是 kafka 与其他的消息系统最大的不同! 和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower,即使主分区挂了,也不会影响服务的正常运行。 那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则: 与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。 这里我们需要重点了解一个名词:消费组! 考虑到多个消费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题! 但是不同的组,可以消费同一个分区的数据! 你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。 但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。 如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。 因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致! 光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。 kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。 zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk 下载zookeeper,并解压文件包 创建数据、日志目录 配置zookeeper 重新配置 dataDir 和 dataLogDir 的存储路径 最后,启动 Zookeeper 服务 到官网下载想要的版本,我这里下载是最新稳定版 2.8.0。 按需修改配置文件 server.properties(可选) server.properties 文件内容如下: 其中有四个重要的参数: 可根据自己需求修改对应的配置! 启动 kafka 服务 创建一个名为 testTopic 的主题,它只包含一个分区,只有一个副本: 运行 list topic 命令,可以看到该主题。 输出内容: Kafka 附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。 运行生产者,然后在控制台中键入一些消息以发送到服务器。 输入两条内容并回车: Kafka 还有一个命令行使用者,它会将消息转储到标准输出。 输出结果如下: 本文主要围绕 kafka 的架构模型和安装环境做了一些初步的介绍,难免会有理解不对的地方,欢迎网友批评、吐槽。 由于篇幅原因,会在下期文章中详细介绍 java 环境下 kafka 应用场景!
使用java实现kafka consumer时报错
public static void consumer(){
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, Integer> map = new HashMap<>();
map.put("fans", 1);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
Map<String, List<KafkaStream<Message>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
List<KafkaStream<Message>> streams = topicMessageStreams.get("fans");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStream<Message> stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIterator<Message> it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}
});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}