您的位置:

Hive数据倾斜问题详解

一、数据倾斜的概念

数据倾斜是指在某些任务执行过程中,数据被分配到不同的处理节点上,但是某些节点上的数据过多,造成这些节点的负载压力过大,其他节点的负载并不高,造成了任务执行效率低下的情况。

我们常见的大数据处理技术,如Hive、Spark等,当数据倾斜发生时,任务的执行时间将会大大增加,因为数据倾斜产生的节点将会成为任务瓶颈,导致任务难以完成。

在Hive运行时发生数据倾斜的主要原因有:数据分布不均、Join操作中分桶列不相同、分桶列数据分布不均等。因此,我们需要针对这些原因进行改进和优化,才能有效避免数据倾斜带来的问题。

二、数据倾斜的解决方案

1. 数据分布不均

数据倾斜最常见的情况就是数据分布不均,一个或几个分区的数据量过大导致负载严重不平衡。我们可以采取以下措施:

(1)把存储和计算分离,先对数据进行采样,然后把采样的结果放到一个中间表中,再进行计算操作。

示例代码:

    create table sample_table as
    select * from original_table tablesample (10 percent);
    create temporary table temp_table as
    select /*+ mapjoin(a) */
    a.*, b.xxx
    from sample_table a
    left join big_table b
    on a.id=b.id;
    insert into result_table
    select /*+ mapjoin(a) */
    a.*, b.xxx
    from original_table a
    left join temp_table b
    on a.id=b.id;

(2)通过调整分区和使用动态分区将数据均匀分散到各个节点上。

示例代码:

  insert overwrite table target_table partition(date)
  select * where date='2021-01-01';

2. Join操作中分桶列不相同

当两个表通过Join操作进行连接时,若连接的列不是分桶列,会导致数据倾斜。若连接的列是分桶列但分桶列不相同,同样会导致数据倾斜。我们可以通过以下措施解决:

(1)让连接的列也成为相同的分桶列。

示例代码:

  set hive.optimize.bucketmapjoin=true;
  set hive.optimize.bucketmapjoin.sortedmerge=true;
  set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
  
  create table raw_table(...)
  clustered by (id) into 3 buckets;
  
  create table result_table(...)
  clustered by (id) into 3 buckets;
  
  insert overwrite table result_table
  select /*+ mapjoin(a) */
  a.*, b.xxx
  from raw_table a
  join raw_table b
  on a.id=b.id and a.dt=b.dt;

(2)使用Map Join或者Sort Merge Join。

示例代码:

  set hive.optimize.bucketmapjoin=true;
  set hive.optimize.bucketmapjoin.sortedmerge=true;
  
  create temporary table temp_table as
  select /*+ mapjoin(a) */
  a.*, b.xxx
  from table_a a
  join table_b b
  on a.id=b.id;
  
  insert overwrite table result_table
  select /*+ mapjoin(a) */
  a.*, b.xxx
  from table_a a
  left join temp_table b
  on a.id=b.id;

3. 分桶列数据分布不均

如果分桶列的数据分布不均,同样会导致数据倾斜。我们可以使用以下方法解决:

(1)增加分桶数。

示例代码:

  set hive.enforce.bucketing=true;
  set hive.exec.dynamic.partition=true;
  set hive.exec.dynamic.partition.mode=nonstrict;
  set hive.optimize.bucketmapjoin=true;
  
  create table test_a(...)
  clustered by (id) into 10 buckets;
  
  create table result_table(...)
  clustered by (id) into 10 buckets;
  
  insert overwrite table result_table
  select /*+ mapjoin(a) */
  a.*, b.xxx
  from test_a a
  left join table_b b
  on a.id=b.id;

(2)对分桶列进行求模操作。

示例代码:

  create table tab1 (key int, value string)
  partitioned by (ds string, dt string)
  clustered by (key) into 10 buckets
  stored as orc;

  create table tab2 (key int, value string)
  partitioned by (ds string, dt string)
  clustered by (key) into 10 buckets
  stored as orc;
  
  insert into table tab1 partition (ds='2019-10-01', dt='2019-10-01') values(10,'a');
  insert into table tab1 partition (ds='2019-10-01', dt='2019-10-01') values(11,'b');
  insert into table tab1 partition (ds='2019-10-02', dt='2019-10-02') values(12,'c');
  insert into table tab1 partition (ds='2019-10-02', dt='2019-10-02') values(13,'d');
  insert into table tab1 partition (ds='2019-10-03', dt='2019-10-03') values(14,'e');
  insert into table tab1 partition (ds='2019-10-03', dt='2019-10-03') values(15,'f');
 
  insert into table tab2 partition (ds='2019-10-01', dt='2019-10-01') values(23,'x');
  insert into table tab2 partition (ds='2019-10-02', dt='2019-10-02') values(25,'y');
  insert into table tab2 partition (ds='2019-10-03', dt='2019-10-03') values(27,'z');
 
  set mapred.reduce.tasks=10;
  select * from tab1 a join tab2 b
  on a.key%10=b.key%10 
  and a.ds=b.ds 
  and a.dt=b.dt;

三、总结

数据倾斜问题一直是大数据处理领域的难点和痛点,解决数据倾斜问题关乎整个大数据技术的发展和应用。在Hive运行中,可以采取对数据分布、Join操作、分桶列等方面的优化来解决数据倾斜问题。我们需要对各种优化方法进行不断的总结和实践,以期达到更优秀的处理效果。