您的位置:

FlinkState在Flink流式计算中的应用

一、FlinkState简介

FlinkState 是 Apache Flink 中,用与表示和处理状态(state)的一个核心组件。在流式计算中,状态是处理逐步发展的关键。在传统计算模型中,每个计算任务都有自己的状态,而在 Flink 流式计算框架中,所有的计算任务共享一个状态。因此,FlinkState 能够轻松的应对大规模、高并发、低延迟、容错性的计算需求。

二、FlinkState的核心特点

1、分布式:FlinkState允许分布式地存储和访问状态,避免了单个节点故障导致状态丢失的问题。

2、高可用:在分布式存储的基础上,FlinkState 提供了高可用性的保证。当存在节点故障时,FlinkState 能够使用备份节点快速恢复状态。

3、容错性:FlinkState 具有自动的快照机制,能够在接受到故障恢复请求时,快速恢复计算任务的状态。

4、高性能:FlinkState 提供了快速的数据读写能力,能够保证高并发、低延迟的计算需求。

三、FlinkState在Flink流式计算中的应用

1、FlinkState的模式

FlinkState 模式有 4 种:

ValueState
   
ListState
    
MapState
     
ReducingState
      

      
     
    
   

其中:

ValueState :保存单个Java对象(类型为T)的状态。

ListState :保存一个Java对象(类型为T)列表的状态。

MapState :保存键值对的状态,键是一个Java对象(类型为K),值是另一个Java对象(类型为V)。

ReducingState :状态类型为T的集合进行固定操作的结果。

2、简化计算

使用 FlinkState 可以简化一些计算任务。例如,我们要在流中筛选出不同的用户数据,然后计算用户的平均值。在传统的计算模型中,我们需要维护两个状态:用户数量及其对应的总值。而在 Flink 流式计算中,我们可以定义一个 Sum 状态,在 Sum 状态中,保存当前流的总和即可。这样可以避免在计算过程中不断判定用户数据的状态,大大简化计算。以下是实现代码示例:

public static class AvgFunction extends RichFlatMapFunction
   
    , Tuple2
     > {

    private transient ValueState
      
       
        > sum; @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor
        
         
          > descriptor = new ValueStateDescriptor<>("sum", TypeInformation.of(new TypeHint
          
           
            >() {})); sum = getRuntimeContext().getState(descriptor); } @Override public void flatMap(Tuple2
            
             value, Collector
             
              
               > out) throws Exception { Tuple2
               
                currentSum = sum.value(); if (currentSum == null) { currentSum = Tuple2.of(0, 0); } currentSum.f0 += 1; currentSum.f1 += value.f1; sum.update(currentSum); if (currentSum.f0 >= 3) { double avg = (double) currentSum.f1 / currentSum.f0; out.collect(Tuple2.of(value.f0, avg)); sum.clear(); } } }
               
              
             
            
           
          
         
        
       
      
     
    
   

3、统计任务

在一些统计任务中,需要维护某些 Key 的状态,记录它们的经过时间后出现的次数。比如我们可以用 FlinkState 实现一个简单的登录任务,记录某个用户在几小时内登录了几次,以下是实现代码示例:

public static class LoginCount extends RichFlatMapFunction
   
    , Tuple2
     > {

    private static final long HOUR_MS = 60 * 60 * 1000;
    private static final long SECOND_MS = 1000;

    private transient MapState
       countMap;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        MapStateDescriptor
       
        countDesc = new MapStateDescriptor<>("count", Long.class, Integer.class); countMap = getRuntimeContext().getMapState(countDesc); } @Override public void flatMap(Tuple2
        
         event, Collector
         
          
           > out) throws Exception { long hour = event.f1 / HOUR_MS; int count = 1; Integer oldCount = countMap.get(hour); if (oldCount != null) { count += oldCount; } countMap.put(hour, count); int sum = 0; for (Integer integer : countMap.values()) { sum += integer; } out.collect(Tuple2.of(event.f0, sum)); } }
          
         
        
       
      
     
    
   

4、跨任务状态共享

在 Flink 流式计算中,多个任务可能需要共享一些状态,例如,在一个事件流系统中,多个流都需要同时接受数据。在这种情况下,我们可以使用 Flink 的 Broadcast State 来共享状态。以下是实现代码示例:

public static final MapStateDescriptor
    BC_DESC = new MapStateDescriptor<>("broadcast", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

public static class SplitStream extends ProcessFunction
     {

    private transient MapState
      broadcastState;

    @Override
    public void open(Configuration parameters) throws Exception {
        broadcastState = getRuntimeContext().getMapState(BC_DESC);
    }

    @Override
    public void processElement(String value, Context ctx, Collector
       out) throws Exception {
        //从广播状态中获取特定信息做相应处理
        String bcInfo = broadcastState.get("bcInfo");
        if (StringUtils.isNotBlank(bcInfo)) {
            out.collect(value + " " + bcInfo);
        }
    }
}

public static class BroadcastStream extends RichMapFunction
       
        
         , Tuple2
         
          > { private transient MapState
          
           broadcastState; @Override public void open(Configuration parameters) throws Exception { broadcastState = getRuntimeContext().getMapState(BC_DESC); //将bcInfo信息放入广播状态中 broadcastState.put("bcInfo", "broadcastInfo"); } @Override public Tuple2
           
            map(Tuple2
            
             value) throws Exception { return Tuple2.of(value.f1, value.f0.toString()); } }
            
           
          
         
        
       
      
     
    
   

总结

本文详细介绍了 FlinkState 的特点、优点、常见模式以及应用场景,以及精简计算、统计任务、跨任务状态共享案例撰写的实现代码。