一、数据倾斜的概念
数据倾斜是指在某些任务执行过程中,数据被分配到不同的处理节点上,但是某些节点上的数据过多,造成这些节点的负载压力过大,其他节点的负载并不高,造成了任务执行效率低下的情况。
我们常见的大数据处理技术,如Hive、Spark等,当数据倾斜发生时,任务的执行时间将会大大增加,因为数据倾斜产生的节点将会成为任务瓶颈,导致任务难以完成。
在Hive运行时发生数据倾斜的主要原因有:数据分布不均、Join操作中分桶列不相同、分桶列数据分布不均等。因此,我们需要针对这些原因进行改进和优化,才能有效避免数据倾斜带来的问题。
二、数据倾斜的解决方案
1. 数据分布不均
数据倾斜最常见的情况就是数据分布不均,一个或几个分区的数据量过大导致负载严重不平衡。我们可以采取以下措施:
(1)把存储和计算分离,先对数据进行采样,然后把采样的结果放到一个中间表中,再进行计算操作。
示例代码:
create table sample_table as select * from original_table tablesample (10 percent); create temporary table temp_table as select /*+ mapjoin(a) */ a.*, b.xxx from sample_table a left join big_table b on a.id=b.id; insert into result_table select /*+ mapjoin(a) */ a.*, b.xxx from original_table a left join temp_table b on a.id=b.id;
(2)通过调整分区和使用动态分区将数据均匀分散到各个节点上。
示例代码:
insert overwrite table target_table partition(date) select * where date='2021-01-01';
2. Join操作中分桶列不相同
当两个表通过Join操作进行连接时,若连接的列不是分桶列,会导致数据倾斜。若连接的列是分桶列但分桶列不相同,同样会导致数据倾斜。我们可以通过以下措施解决:
(1)让连接的列也成为相同的分桶列。
示例代码:
set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true; set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat; create table raw_table(...) clustered by (id) into 3 buckets; create table result_table(...) clustered by (id) into 3 buckets; insert overwrite table result_table select /*+ mapjoin(a) */ a.*, b.xxx from raw_table a join raw_table b on a.id=b.id and a.dt=b.dt;
(2)使用Map Join或者Sort Merge Join。
示例代码:
set hive.optimize.bucketmapjoin=true; set hive.optimize.bucketmapjoin.sortedmerge=true; create temporary table temp_table as select /*+ mapjoin(a) */ a.*, b.xxx from table_a a join table_b b on a.id=b.id; insert overwrite table result_table select /*+ mapjoin(a) */ a.*, b.xxx from table_a a left join temp_table b on a.id=b.id;
3. 分桶列数据分布不均
如果分桶列的数据分布不均,同样会导致数据倾斜。我们可以使用以下方法解决:
(1)增加分桶数。
示例代码:
set hive.enforce.bucketing=true; set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nonstrict; set hive.optimize.bucketmapjoin=true; create table test_a(...) clustered by (id) into 10 buckets; create table result_table(...) clustered by (id) into 10 buckets; insert overwrite table result_table select /*+ mapjoin(a) */ a.*, b.xxx from test_a a left join table_b b on a.id=b.id;
(2)对分桶列进行求模操作。
示例代码:
create table tab1 (key int, value string) partitioned by (ds string, dt string) clustered by (key) into 10 buckets stored as orc; create table tab2 (key int, value string) partitioned by (ds string, dt string) clustered by (key) into 10 buckets stored as orc; insert into table tab1 partition (ds='2019-10-01', dt='2019-10-01') values(10,'a'); insert into table tab1 partition (ds='2019-10-01', dt='2019-10-01') values(11,'b'); insert into table tab1 partition (ds='2019-10-02', dt='2019-10-02') values(12,'c'); insert into table tab1 partition (ds='2019-10-02', dt='2019-10-02') values(13,'d'); insert into table tab1 partition (ds='2019-10-03', dt='2019-10-03') values(14,'e'); insert into table tab1 partition (ds='2019-10-03', dt='2019-10-03') values(15,'f'); insert into table tab2 partition (ds='2019-10-01', dt='2019-10-01') values(23,'x'); insert into table tab2 partition (ds='2019-10-02', dt='2019-10-02') values(25,'y'); insert into table tab2 partition (ds='2019-10-03', dt='2019-10-03') values(27,'z'); set mapred.reduce.tasks=10; select * from tab1 a join tab2 b on a.key%10=b.key%10 and a.ds=b.ds and a.dt=b.dt;
三、总结
数据倾斜问题一直是大数据处理领域的难点和痛点,解决数据倾斜问题关乎整个大数据技术的发展和应用。在Hive运行中,可以采取对数据分布、Join操作、分桶列等方面的优化来解决数据倾斜问题。我们需要对各种优化方法进行不断的总结和实践,以期达到更优秀的处理效果。