一、简介
Flink Kafka Consumer是Flink中针对Kafka数据源编写的一个控制台消费程序。其主要作用是从Kafka中消费数据,将消费的数据转换成Flink中的DataStream数据流,然后通过Flink的各种算子进行数据的处理和分析。
二、使用方式
使用Flink Kafka Consumer非常简单,只需要在Flink任务中先引入flink-connector-kafka_2.12
依赖,然后使用下面的方式创建一个KafkaConsumer:
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("topic-name", new SimpleStringSchema(), props);
其中,topic-name
是要消费的Kafka Topic名称,SimpleStringSchema
是指这个Kafka Topic的数据是以字符串的形式进行编码,props
是一个Properties对象,用于设置KafkaConsumer的一些配置信息,例如消费者组名称、Kafka Broker地址等等。
一旦创建好FlinkKafkaConsumer,我们就可以使用Flink的DataStream API从Kafka中消费数据了:
DataStream<String> stream = env.addSource(kafkaConsumer);
这样,我们就可以通过stream
对消费到的Kafka Topic数据进行各种分析和处理了。
三、常用参数详解
1. properties文件的配置
在Flink任务中使用FlinkKafkaConsumer消费Kafka Topic时,需要通过设置Properties对象来配置Kafka Consumer的各种参数。下面是一些常用的参数配置:
bootstrap.servers
:指定Kafka Broker地址,格式是host:port,多个Broker地址用逗号隔开。group.id
:指定Consumer Group的名称。auto.offset.reset
:指定Consumer在没有offset的情况下,从何处开始消费,可以选择earliest或latest。enable.auto.commit
:指定是否启用Auto Commit功能。fetch.max.bytes
:指定每次从Kafka Broker拉取数据的最大字节数。
2. 反序列化器的配置
FlinkKafkaConsumer需要将Kafka Topic中的数据解码成Flink中的java对象,这个过程可以使用Kafka提供的反序列化器来完成。Flink提供了各种反序列化器,例如SimpleStringSchema
、JSONDeserializationSchema
等等,常用的反序列化器配置如下:
SimpleStringSchema
:用于将字符串数据解析成String类型。JSONDeserializationSchema
:用于将JSON数据解析成POJO对象。AvroDeserializationSchema
:用于将Avro数据解析成POJO对象。
3. 消费位置的配置
FlinkKafkaConsumer支持多种不同的消费位置,例如从最早的Offset开始消费、从最新的Offset开始消费、从指定的Offset开始消费等等。我们可以通过配置KafkaConsumer的一个属性来指定消费位置。
auto.offset.reset
:当第一次启动一个Consumer时,如果没有指定一个初始的消费位置,那么Consumer会自动根据这个属性来设置消费位置。assign
和subscribe
:通过手动指定Topic Partition的Offset来设置消费位置。seek
:在运行时动态修改Consumer的消费位置。
四、完整代码示例
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class FlinkKafkaConsumerDemo {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
DataStream<String> stream = env.addSource(consumer);
// do some processing on the stream
stream.print();
env.execute("Flink Kafka Consumer Demo");
}
}