一、Flink概述
Apache Flink是一个分布式计算引擎,能够处理有限数据流和无限数据流。 它可以为Spark,Hadoop和Storm等现有框架提供替代方案,并提供更好的性能和可扩展性。
Flink提供了一种称为流处理的新型计算模式,这种计算模式可以处理事件流,并支持非常低的延迟和高的吞吐量。 Flink还提供了存储和处理批处理数据的能力。 因此,Flink可以处理所有类型的数据流,从简单的请求响应到数据挖掘和复杂的事件处理。
二、WordCount的概念
WordCount是一个可以统计文档中每个单词出现次数的程序。 它是 分布式系统的"Hello, World!"。 在分布式系统中,您需要处理文件或流,并将其划分为片段,以在多个节点上并行处理。 在WordCount示例中,文件被划分为“行”(每行文本),并且每个节点处理一部分数据(各行文本的单词计数)。
三、Flink实现WordCount
1、环境设置
首先,在您的本地机器上安装Java,然后下载并安装Flink。 flinkwordcount程序可以在本地或分布式集群上运行。
2、创建Flink项目
使用Eclipse或IntelliJ等任何一个Java IDE,创建一个Maven项目,该项目包含所需的Flink依赖项。 您可以使用以下依赖关系在POM.xml中获取所需的库:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
<version>1.14.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
</dependency>
3、实现Flink WordCount代码
下面展示了Flink WordCount的代码:
package com.flink.example;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class FlinkWordCount {
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
DataStream
text;
final StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
text = see.readTextFile(params.get("input"));
DataStream
counts = text
.flatMap(new FlatMapFunction
() {
@Override
public void flatMap(String value, Collector
out) {
for (String word : value.split("\\s")) {
out.collect(new WordCount(word, 1));
}
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.sum("count");
counts.writeAsText(params.get("output"), WriteMode.OVERWRITE);
see.execute();
}
public static class WordCount {
public String word;
public int count;
public WordCount() {}
public WordCount(String word, int count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return "WordCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}
4、代码解析
首先,我们从参数工具创建带有ParameterTool的ExecutionEnvironment对象。 然后,我们读取指定的输入文件并使用flatMap函数将其转换为键值对。 我们使用keyBy函数对WordCount(word,count)键入过的流"word"进行分组,并使累加器累加值。 该代码使用窗口运算符,它将一定量的数据视为一个数据集,并在整个数据集上应用函数计算。
5、编译和运行
编译和运行flinkwordcount程序,使用以下命令:
./bin/flink run /path/to/flinkwordcount-0.0.1.jar --input /path/to/input/file --output /path/to/output/directory
6、结果输出
运行flinkwordcount程序后,它将在指定的输出目录中生成一个文本文件,并在其中提供WordCount结果。以下是输出文件的示例:
(hello, 1)
(world, 1)
(hello, 1)
四、总结
在本文中,我们学习了Apache Flink,并看到如何使用它来实现分布式系统中的WordCount程序。 Flink提供了一种称为流处理的新型计算模式,该计算模式可以处理事件流,并支持非常低的延迟和高的吞吐量。 我们看到了如何使用flinkwordcount程序来计算文档中每个单词的出现次数。希望这篇文章对您有所帮助。