一、FlinkState简介
FlinkState 是 Apache Flink 中,用与表示和处理状态(state)的一个核心组件。在流式计算中,状态是处理逐步发展的关键。在传统计算模型中,每个计算任务都有自己的状态,而在 Flink 流式计算框架中,所有的计算任务共享一个状态。因此,FlinkState 能够轻松的应对大规模、高并发、低延迟、容错性的计算需求。
二、FlinkState的核心特点
1、分布式:FlinkState允许分布式地存储和访问状态,避免了单个节点故障导致状态丢失的问题。
2、高可用:在分布式存储的基础上,FlinkState 提供了高可用性的保证。当存在节点故障时,FlinkState 能够使用备份节点快速恢复状态。
3、容错性:FlinkState 具有自动的快照机制,能够在接受到故障恢复请求时,快速恢复计算任务的状态。
4、高性能:FlinkState 提供了快速的数据读写能力,能够保证高并发、低延迟的计算需求。
三、FlinkState在Flink流式计算中的应用
1、FlinkState的模式
FlinkState 模式有 4 种:
ValueState
ListState
MapState
ReducingState
其中:
ValueState
ListState
MapState
ReducingState
2、简化计算
使用 FlinkState 可以简化一些计算任务。例如,我们要在流中筛选出不同的用户数据,然后计算用户的平均值。在传统的计算模型中,我们需要维护两个状态:用户数量及其对应的总值。而在 Flink 流式计算中,我们可以定义一个 Sum 状态,在 Sum 状态中,保存当前流的总和即可。这样可以避免在计算过程中不断判定用户数据的状态,大大简化计算。以下是实现代码示例:
public static class AvgFunction extends RichFlatMapFunction
, Tuple2
> {
private transient ValueState
> sum; @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor
> descriptor = new ValueStateDescriptor<>("sum", TypeInformation.of(new TypeHint
>() {})); sum = getRuntimeContext().getState(descriptor); } @Override public void flatMap(Tuple2
value, Collector
> out) throws Exception { Tuple2
currentSum = sum.value(); if (currentSum == null) { currentSum = Tuple2.of(0, 0); } currentSum.f0 += 1; currentSum.f1 += value.f1; sum.update(currentSum); if (currentSum.f0 >= 3) { double avg = (double) currentSum.f1 / currentSum.f0; out.collect(Tuple2.of(value.f0, avg)); sum.clear(); } } }
3、统计任务
在一些统计任务中,需要维护某些 Key 的状态,记录它们的经过时间后出现的次数。比如我们可以用 FlinkState 实现一个简单的登录任务,记录某个用户在几小时内登录了几次,以下是实现代码示例:
public static class LoginCount extends RichFlatMapFunction
, Tuple2
> {
private static final long HOUR_MS = 60 * 60 * 1000;
private static final long SECOND_MS = 1000;
private transient MapState
countMap;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
MapStateDescriptor
countDesc = new MapStateDescriptor<>("count", Long.class, Integer.class); countMap = getRuntimeContext().getMapState(countDesc); } @Override public void flatMap(Tuple2
event, Collector
> out) throws Exception { long hour = event.f1 / HOUR_MS; int count = 1; Integer oldCount = countMap.get(hour); if (oldCount != null) { count += oldCount; } countMap.put(hour, count); int sum = 0; for (Integer integer : countMap.values()) { sum += integer; } out.collect(Tuple2.of(event.f0, sum)); } }
4、跨任务状态共享
在 Flink 流式计算中,多个任务可能需要共享一些状态,例如,在一个事件流系统中,多个流都需要同时接受数据。在这种情况下,我们可以使用 Flink 的 Broadcast State 来共享状态。以下是实现代码示例:
public static final MapStateDescriptor
BC_DESC = new MapStateDescriptor<>("broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
public static class SplitStream extends ProcessFunction
{
private transient MapState
broadcastState;
@Override
public void open(Configuration parameters) throws Exception {
broadcastState = getRuntimeContext().getMapState(BC_DESC);
}
@Override
public void processElement(String value, Context ctx, Collector
out) throws Exception {
//从广播状态中获取特定信息做相应处理
String bcInfo = broadcastState.get("bcInfo");
if (StringUtils.isNotBlank(bcInfo)) {
out.collect(value + " " + bcInfo);
}
}
}
public static class BroadcastStream extends RichMapFunction
, Tuple2
> { private transient MapState
broadcastState; @Override public void open(Configuration parameters) throws Exception { broadcastState = getRuntimeContext().getMapState(BC_DESC); //将bcInfo信息放入广播状态中 broadcastState.put("bcInfo", "broadcastInfo"); } @Override public Tuple2
map(Tuple2
value) throws Exception { return Tuple2.of(value.f1, value.f0.toString()); } }
总结
本文详细介绍了 FlinkState 的特点、优点、常见模式以及应用场景,以及精简计算、统计任务、跨任务状态共享案例撰写的实现代码。