一、背景介绍
在现今互联网时代,获取和处理实时数据已经成为数据处理领域的重要研究方向之一。对于实时数据的处理,流式计算框架成为了关键技术之一。而 Apache Flink 作为开源的流式计算框架,被广泛使用于实时数据的处理,这里加入Kafka,将Flink作为实时处理引擎,与Kafka集成,在Kafka中实时获取数据。
二、FlinkKafka整合介绍
整合Flink与Kafka的方式可分为两种,一种是使用 Flink 官方提供的Kafka Consumer API 直连Kafka,一种是使用 FlinkKafkaConnector 接入Kafka,下面将逐一介绍两种方式的具体实现。
三、使用 Flink 官方提供的 Kafka Consumer API 直连 Kafka
在此之前,需要确保读者对 Kafka 的基本概念以及Flink的 DataSet 和 DataStream 有一定的了解。如果您还不熟悉这两个东西,建议先去了解一下。 使用 Flink 的 Java 调用 Kafka 的 Consumer API 直连 Kafka,它需要我们手动实现一个 FlinkKafkaConsumer010 的一个匿名内部类,来指定我们要读取的 Kafka topic、消息反序列化的类以及 Kafka broker 的信息。具体代码实现如下:
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import java.util.Properties; public class FlinkKafkaConsumerDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); DataStreamSourcekafkaSource = env.addSource(new FlinkKafkaConsumer010 ( "mytopic", //topic new SimpleStringSchema(), // String 序列化 properties)); // Kafka 连接参数 kafkaSource.print(); env.execute("Flink Streaming Java API Skeleton"); } }
四、使用 FlinkKafkaConnector 接入 Kafka
如果不想手动实现 Kafka Consumer API,也可以使用 Flink 提供的 FlinkKafkaConnector 来实现接入 Kafka。 下面是一个使用 FlinkKafkaConnector读取 Kafka 数据的例子,需要引用FlinkKafkaConnector依赖包和Kafka的依赖包:
import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import java.util.Properties; public class KafkaSource { private static final String brokerList = "localhost:9092"; private static final String topic = "test"; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties props = new Properties(); props.setProperty("bootstrap.servers", brokerList); props.setProperty("group.id", "test"); DataStreamSourcekafkaSource = env.addSource(new FlinkKafkaConsumer011 (topic, new SimpleStringSchema(), props)); kafkaSource.print(); env.execute("KafkaSource"); } }
五、Kafka 数据写入到 Flink
在 Streaming 处理过程中,除了从 Kafka 中读取数据之外,如果我们想把 Flink 中处理过的结果返回到 Kafka 中,那么就需要了解 Flink 内置的 Kafka Producer API 的使用。 这里我们使用 FlinkKafkaProducer010 来将结果写入到 Kafka 中。以下实例代码是将Flink 数据写入到 Kafka 中。
import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import java.util.Properties; public class KafkaSink { private static final String brokerList = "localhost:9092"; private static final String topic = "test"; public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", brokerList); FlinkKafkaProducer010kafkaProducer = new FlinkKafkaProducer010<>(topic,new SimpleStringSchema(),properties); DataStream input = env.socketTextStream("localhost", 9000, "\n"); input.addSink(kafkaProducer); env.execute("KafkaSink"); } }
六、总结
本文主要对 FlinkKafka 整合的两种方式进行了详细的介绍。作为一种高效实时流处理解决方案,FlinkKafka已被广泛应用于大数据领域。本文介绍的两种整合方式均使用简单、效率高、易于控制,可以有效满足多种实时计算使用场景。