// Reads one integer per line from a local socket and prints the running total.
// `env` is the StreamExecutionEnvironment created earlier in the surrounding program.
DataStream<String> text = env.socketTextStream("localhost", 9999);

DataStream<Integer> mapResult = text
    .map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            // Surrounding whitespace is tolerated; a non-numeric line still raises
            // NumberFormatException and fails the job.
            return Integer.parseInt(value.trim());
        }
    });

// Integer is not a tuple type, so positional keys (keyBy(0)) and sum(1) are rejected by Flink.
// Route every record to one constant key and aggregate field 0, which for a basic type
// means the value itself.
DataStream<Integer> sum = mapResult
    .keyBy(value -> 0)
    .sum(0);

sum.print();

env.execute("Flink Streaming Java API Skeleton");
DataStream> input = env.fromElements( Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("a", 3), Tuple2.of("b", 4), Tuple2.of("b", 5), Tuple2.of("b", 6) ); input .keyBy(0) .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) .sum(1) .print(); env.execute();
public class Average implements MapFunction, Tuple2 >> { @Override public Tuple2 map(Tuple2 value) throws Exception { Double avg = (double) (value.f1 / value.f0); return Tuple2.of(value.f0, avg); } } DataStream > input = env.fromElements( Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 6L) ); input.keyBy(0) .map(new Average()) .keyBy(0) .map(new RichMapFunction , Tuple2 >>() { private transient ValueState > valueState; @Override public Tuple2 > map(Tuple2 input) throws Exception { Tuple2 currentSum = valueState.value(); if (currentSum == null) { currentSum = Tuple2.of(0.0, 0); } currentSum.f0 += input.f1; currentSum.f1 += 1; valueState.update(currentSum); return Tuple2.of(input.f0, currentSum); } @Override public void open(Configuration config) throws Exception { ValueStateDescriptor > descriptor = new ValueStateDescriptor<>("average", Types.TUPLE(Types.DOUBLE, Types.INT)); valueState = getRuntimeContext().getState(descriptor); } }) .print(); env.execute();
public class TimestampWithFailures { private long timestamp; private final boolean isNormal; public TimestampWithFailures(long timestamp, boolean isNormal) { this.timestamp = timestamp; this.isNormal = isNormal; } public long getTimestamp() { return timestamp; } public void setTimestamp(long timestamp) { this.timestamp = timestamp; } public boolean isNormal() { return isNormal; } } public class WatermarkExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); WatermarkStrategystrategy = WatermarkStrategy . forBoundedOutOfOrderness(Duration.ofMillis(500)) .withTimestampAssigner((event, timestamp) -> event.getTimestamp()); DataStream stream = env.addSource(new SourceFunction () { private Random rand = new Random(); @Override public void run(SourceContext ctx) throws Exception { int maxTimestamp = 102000; int numFailures = 0; while (!Thread.interrupted()) { long timestamp = System.currentTimeMillis() - rand.nextInt(maxTimestamp); if (rand.nextBoolean() || rand.nextBoolean()) { ctx.collectWithTimestamp(new TimestampWithFailures(timestamp, true), timestamp); } else { numFailures++; // only emit watermark if there were normal events emitted since last watermark if (numFailures % 3 == 0) { ctx.emitWatermark(new Watermark(timestamp - 500)); } // emit event with a bit delay, but still within the bounds of the assigned watermark ctx.collectWithTimestamp(new TimestampWithFailures(timestamp + 250, false), timestamp + 250); } } } @Override public void cancel() { } }); stream .assignTimestampsAndWatermarks(strategy) .keyBy(event -> event.isNormal()) .countWindow(10) .sum(1) .print(); env.execute("Stress test with Watermarks"); } }
在上面的例子中,我们模拟了一个随机生成数据的源:大约四分之三的事件被标记为处理成功(正常事件),其余被标记为失败。失败事件的时间戳会延后250毫秒再发出;每逢第三个失败事件,源还会在发出该事件之前手动发出一个水印(基准时间戳减去500毫秒),因此这个事件仍然晚于刚发出的水印。在这个例子中,我们还使用了forBoundedOutOfOrderness WatermarkStrategy,该策略允许事件最多乱序500毫秒,即生成的水印比目前观察到的最大事件时间戳滞后500毫秒。最后,数据按事件是否正常进行分组,累加器在一个10个元素的计数窗口上进行计算。
五、Flink SQL
Flink SQL是Flink的一种高级抽象层,允许用户使用SQL查询语言对数据流和表进行操作。Flink SQL支持标准的SQL语法,并提供了一些Flink特定的扩展。下面是一个使用Flink SQL查询语言的例子:
/**
 * Registers a stream of orders as the table {@code Orders}, aggregates the amount per product
 * over 5-second tumbling windows with SQL, and prints the result rows.
 */
public class FlinkSqlExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // fromDataStream and toAppendStream are declared on StreamTableEnvironment, not on the
        // TableEnvironment base interface, so the variable must use the stream-specific type.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // The element type is whatever ExampleData.OrderSource emits; that class is not part
        // of this listing, hence the wildcard.
        DataStream<?> orderSource = env.addSource(new ExampleData.OrderSource());

        // NOTE(review): "rowtime.rowtime" declares an event-time attribute, which presumably
        // requires the source to provide timestamps and watermarks - confirm in OrderSource.
        Table orderTable = tableEnv.fromDataStream(orderSource, "user, product, amount, rowtime.rowtime");
        tableEnv.createTemporaryView("Orders", orderTable);

        Table result = tableEnv.sqlQuery(
            "SELECT TUMBLE_START(rowtime, INTERVAL '5' SECOND), product, sum(amount) as total_amount "
                + "FROM Orders "
                + "GROUP BY TUMBLE(rowtime, INTERVAL '5' SECOND), product");

        DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
        resultSet.print();

        env.execute();
    }
}