您的位置:

Flink实现WordCount

一、Flink概述

Apache Flink是一个分布式计算引擎,能够处理有限数据流和无限数据流。 它可以为Spark,Hadoop和Storm等现有框架提供替代方案,并提供更好的性能和可扩展性。

Flink提供了一种称为流处理的新型计算模式,这种计算模式可以处理事件流,并支持非常低的延迟和高的吞吐量。 Flink还提供了存储和处理批处理数据的能力。 因此,Flink可以处理所有类型的数据流,从简单的请求响应到数据挖掘和复杂的事件处理。

二、WordCount的概念

WordCount是一个可以统计文档中每个单词出现次数的程序。 它是 分布式系统的"Hello, World!"。 在分布式系统中,您需要处理文件或流,并将其划分为片段,以在多个节点上并行处理。 在WordCount示例中,文件被划分为“行”(每行文本),并且每个节点处理一部分数据(各行文本的单词计数)。

三、Flink实现WordCount

1、环境设置

首先,在您的本地机器上安装Java,然后下载并安装Flink。 flinkwordcount程序可以在本地或分布式集群上运行。

2、创建Flink项目

使用Eclipse或IntelliJ等任何一个Java IDE,创建一个Maven项目,该项目包含所需的Flink依赖项。 您可以使用以下依赖关系在POM.xml中获取所需的库:


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-parent</artifactId>
    <version>1.14.2</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
</dependency>

3、实现Flink WordCount代码

下面展示了Flink WordCount的代码:


package com.flink.example;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class FlinkWordCount {

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        DataStream
    text;
        final StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();

        text = see.readTextFile(params.get("input"));
        DataStream
     counts = text
                .flatMap(new FlatMapFunction
     () {
                    @Override
                    public void flatMap(String value, Collector
       out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordCount(word, 1));
                        }
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .sum("count");

        counts.writeAsText(params.get("output"), WriteMode.OVERWRITE);
        see.execute();
    }

    public static class WordCount {
        public String word;
        public int count;

        public WordCount() {}

        public WordCount(String word, int count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return "WordCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

      
     
    
   

4、代码解析

首先,我们从参数工具创建带有ParameterTool的ExecutionEnvironment对象。 然后,我们读取指定的输入文件并使用flatMap函数将其转换为键值对。 我们使用keyBy函数对WordCount(word,count)键入过的流"word"进行分组,并使累加器累加值。 该代码使用窗口运算符,它将一定量的数据视为一个数据集,并在整个数据集上应用函数计算。

5、编译和运行

编译和运行flinkwordcount程序,使用以下命令:


./bin/flink run /path/to/flinkwordcount-0.0.1.jar --input /path/to/input/file --output /path/to/output/directory

6、结果输出

运行flinkwordcount程序后,它将在指定的输出目录中生成一个文本文件,并在其中提供WordCount结果。以下是输出文件的示例:


(hello, 1)
(world, 1)
(hello, 1)

四、总结

在本文中,我们学习了Apache Flink,并看到如何使用它来实现分布式系统中的WordCount程序。 Flink提供了一种称为流处理的新型计算模式,该计算模式可以处理事件流,并支持非常低的延迟和高的吞吐量。 我们看到了如何使用flinkwordcount程序来计算文档中每个单词的出现次数。希望这篇文章对您有所帮助。