一、简介
Flink Kafka Consumer是Flink中针对Kafka数据源编写的一个控制台消费程序。其主要作用是从Kafka中消费数据,将消费的数据转换成Flink中的DataStream数据流,然后通过Flink的各种算子进行数据的处理和分析。
二、使用方式
使用Flink Kafka Consumer非常简单,只需要在Flink任务中先引入flink-connector-kafka_2.12依赖,然后使用下面的方式创建一个KafkaConsumer:
FlinkKafkaConsumerkafkaConsumer = new FlinkKafkaConsumer<>("topic-name", new SimpleStringSchema(), props);
其中,topic-name是要消费的Kafka Topic名称, SimpleStringSchema是指这个Kafka Topic的数据是以字符串的形式进行编码,props是一个Properties对象,用于设置KafkaConsumer的一些配置信息,例如消费者组名称、Kafka Broker地址等等。
一旦创建好FlinkKafkaConsumer,我们就可以使用Flink的DataStream API从Kafka中消费数据了:
DataStreamstream = env.addSource(kafkaConsumer);
这样,我们就可以通过stream对消费到的Kafka Topic数据进行各种分析和处理了。
三、常用参数详解
1. properties文件的配置
在Flink任务中使用FlinkKafkaConsumer消费Kafka Topic时,需要通过设置Properties对象来配置Kafka Consumer的各种参数。下面是一些常用的参数配置:
- bootstrap.servers:指定Kafka Broker地址,格式是host:port,多个Broker地址用逗号隔开。
- group.id:指定Consumer Group的名称。
- auto.offset.reset:指定Consumer在没有offset的情况下,从何处开始消费,可以选择earliest或latest。
- enable.auto.commit:指定是否启用Auto Commit功能。
- fetch.max.bytes:指定每次从Kafka Broker拉取数据的最大字节数。
2. 反序列化器的配置
FlinkKafkaConsumer需要将Kafka Topic中的数据解码成Flink中的java对象,这个过程可以使用Kafka提供的反序列化器来完成。Flink提供了各种反序列化器,例如SimpleStringSchema、JSONDeserializationSchema等等,常用的反序列化器配置如下:
- SimpleStringSchema:用于将字符串数据解析成String类型。
- JSONDeserializationSchema:用于将JSON数据解析成POJO对象。
- AvroDeserializationSchema:用于将Avro数据解析成POJO对象。
3. 消费位置的配置
FlinkKafkaConsumer支持多种不同的消费位置,例如从最早的Offset开始消费、从最新的Offset开始消费、从指定的Offset开始消费等等。我们可以通过配置KafkaConsumer的一个属性来指定消费位置。
- auto.offset.reset:当第一次启动一个Consumer时,如果没有指定一个初始的消费位置,那么Consumer会自动根据这个属性来设置消费位置。
- assign和subscribe:通过手动指定Topic Partition的Offset来设置消费位置。
- seek:在运行时动态修改Consumer的消费位置。
四、完整代码示例
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class FlinkKafkaConsumerDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("group.id", "test"); FlinkKafkaConsumerconsumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props); DataStream stream = env.addSource(consumer); // do some processing on the stream stream.print(); env.execute("Flink Kafka Consumer Demo"); } }