本文目录一览:
- 1、用Kafka和Java搭建的项目,Kafka管理中心在什么情况下会重复发送消息?消费端的程序接收到消息,进入方法
- 2、java工程kafka传递自定义对象,消费端获取到的是null
- 3、kafka消费者java版本读取不到消息怎么办
- 4、java客户端使用kafka时什么情况下使用kafka client和spring kafka?
- 5、使用java实现kafka consumer时报错
用Kafka和Java搭建的项目,Kafka管理中心在什么情况下会重复发送消息?消费端的程序接收到消息,进入方法
非手动提交offset
消费者只要读取到数据,就会修改offset,不需要方法体执行完
手动提交
需要手动提交代码执行完毕
针对你的问题,情况有很多种可能。
你是否开启手动提交offset
你的消费者,有几个?是否是同一个组?
java工程kafka传递自定义对象,消费端获取到的是null
3. 启服务
3.1 启zookeeper
启zk两种式第种使用kafka自带zk
bin/zookeeper-server-start.sh config/zookeeper.properties
另种使用其zookeeper位于本机位于其址种情况需要修改config面sercer.properties面zookeeper址
例zookeeper.connect=10.202.4.179:2181
3.2 启 kafka
bin/kafka-server-start.sh config/server.properties
4.创建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
创建名testtopic副本区
通list命令查看刚刚创建topic
bin/kafka-topics.sh -list -zookeeper 10.202.4.179:2181
5.启producer并发送消息启producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启发送消息
比
test
hello boy
按Ctrl+C退发送消息
6.启consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
启consumerconsole看producer发送消息
启两终端发送消息接受消息
都行查看zookeeper进程kafkatopic步步排查原吧
kafka消费者java版本读取不到消息怎么办
Kafka的生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。
java客户端使用kafka时什么情况下使用kafka client和spring kafka?
spring-kafka 是基于 java版的 kafka client与spring的集成,提供了 KafkaTemplate,封装了各种方法,方便操作
所以你使用spring的情况下,可以用spring-kafka,当然直接用kafka client也行
使用java实现kafka consumer时报错
public static void consumer(){
Properties props = new Properties();
props.put("zk.connect", "hadoop-2:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "fans_group");
// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
MapString, Integer map = new HashMapString, Integer();
map.put("fans", 1);
// create 4 partitions of the stream for topic “test”, to allow 4 threads to consume
MapString, ListKafkaStreamMessage topicMessageStreams = consumerConnector.createMessageStreams(map);
ListKafkaStreamMessage streams = topicMessageStreams.get("fans");
// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(1);
long startTime = System.currentTimeMillis();
// consume the messages in the threads
for(final KafkaStreamMessage stream: streams) {
executor.submit(new Runnable() {
public void run() {
ConsumerIteratorMessage it = stream.iterator();
while (it.hasNext()){
log.debug(byteBufferToString(it.next().message().payload()));
}
}
});
log.debug("use time="+(System.currentTimeMillis()-startTime));
}
}