您的位置:

Spark.speculation详解

在大数据处理场景中,Spark一直是一个被广泛使用的框架。对于Spark性能优化的探索也一直不停止。Spark.speculation是其中一个性能优化的重要手段之一。本文将围绕着Spark.speculation来进行展开。

一、启用Spark.speculation=true

首先,让我们来看一下如何启用Spark.speculation。通过设置SparkConf中的spark.speculation参数来启用:

SparkConf sparkConf =  new SparkConf().setAppName(appName)
.set("spark.speculation", "true");

这可能是最简单的启用Spark.speculation的方法。如果您需要更多的Spark.speculation定制参数,请参阅下一节。

二、Spark.speculation参数说明

1. spark.speculation.interval

spark.speculation.interval表示两次检查确认残留任务的间隔时间。默认情况下,此参数设置为100毫秒。

值得注意的是,如果您设置此值过高,则可能会减缓Spark中的任务完成。因此,在特定环境下,您可能需要将此参数视为最佳设置。

SparkConf sparkConf = new SparkConf().setAppName(appName)
.set("spark.speculation", "true")
.set("spark.speculation.interval", "50ms");

2. spark.speculation.multiplier

spark.speculation.multiplier 用于计算任务是否被认为是慢任务。该参数的默认值为1.5。因此,如果计算任务已经超过该任务的平均时间的1.5倍,则认为该任务是一个慢任务。

如果您认为设置1.5的倍增因子不足以检测到某些特定运行缓慢的任务,那么可以适当增加此倍增因子。

SparkConf sparkConf = new SparkConf().setAppName(appName)
.set("spark.speculation", "true")
.set("spark.speculation.multiplier", "2.0");

3. spark.speculation.quantile(已弃用)

该参数原本存放了任务完成时间的百分位数,以便确定任务的执行时间。但是,从Spark 2.0.0版本开始,该参数已经被弃用并被spark.speculation.multiplier取代。

三、实战应用Spark.speculation

1. 按 Spark.speculation默认配置运行任务

在示例代码中,我们将使用如下Spark任务:

JavaRDD
    rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9))
...
JavaPairRDD<Integer, Integer> pairRDD = rdd.mapToPair(i -> new Tuple2(i, i * 2));
...
JavaPairRDD<Integer, Integer> resultRDD = pairRDD.reduceByKey((x,y)->x+y);
resultRDD.foreach(x -> System.out.println(x._1() + ':' + x._2()));
   

在默认情况下,Spark不会启用Spark speculation。因此,任务完成时间可能会非常长,如下图所示:

在本例中,*任务8* 可能是我们需要解决的问题。在Spark的任务日志中,我们可以看到,任务8的执行时间是6819毫秒,这远远高于任务的平均执行时间。这表明任务8正在缓慢运行。

2. 启用Spark speculations

为了启用Spark speculations,在代码中设置SparkConf对象即可。

SparkConf sparkConf =  new SparkConf().setAppName(appName)
.set("spark.speculation", "true");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

设置Spark.speculation以后,再次运行任务:

JavaRDD
    rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9))
...
JavaPairRDD<Integer, Integer> pairRDD = rdd.mapToPair(i -> new Tuple2(i, i * 2));
...
JavaPairRDD<Integer, Integer> resultRDD = pairRDD.reduceByKey((x,y)->x+y);
resultRDD.foreach(x -> System.out.println(x._1() + ':' + x._2()));
   

此时,在那些运行缓慢的任务上计算的机器将启用Spark speculation进一步计算结果并验证结果正确性

最终结果如下所示:

我们看到任务 8 的执行时间大大缩短,这意味着Spark.speculation在该任务上有效工作。

四、总结

通过Spark.speculation,您可以轻松地检测缓慢运行的任务并重新计算以提高任务执行时间和Spark作业的整体执行效率。通过本文的介绍,相信您能更好的了解Spark.speculation的原理和实际应用。