hive join倾斜

hive执行引擎会将HQL“翻译”成为map-reduce任务,如果多张表使用同一列做join则将被翻译成一个reduce,否则将被翻译成多个map-reduce任务。
一般来说(map side join除外),map过程负责分发数据,具体的join操作在reduce完成,因此,如果多表基于不同的列做join,则无法在一轮map-reduce任务中将所有相关数据shuffle到统一个reducer
对于多表join,hive会将前面的表缓存在reducer的内存中,然后后面的表会流式的进入reducer和reducer内存中其它的表做join.
为了防止数据量过大导致oom,将数据量最大的表放到最后,或者通过“STREAMTABLE”显示指定reducer流式读入的表

common join

1
SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
  • Map阶段
    读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key;
    Map输出的value为join之后所关心的(select或者where中需要用到的)列;同时在value中还会包含表的Tag信息,用于标明此value对应哪个表upload successful
    该图中tag在value中
    按照key进行排序

  • Shuffle阶段
    根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中

  • Reduce阶段
    reduce阶段主要看它是如何把它合并起来了,从图上可以直观的看到,其实就是把tag=1的内容,都加到tag=0的后面,就是这么简单。

upload successful
该图中tag在key.key这里后面的数字是tag,后面在reduce阶段用来区分来自于那个表的数据。tag是附属在key后面的。那为什么会把a(0)和a(1)汇集在一起了呢,是因为对先对a求了hashcode,设在了HiveKey上,所以同一个key还是在一起的。该方式虽然可以解释通过,但不是真的具体实现。

1
2
3
4
5
6
7
8
9
10
在reduce阶段join。 
map阶段标记数据来自哪个文件,比如来自file1标记tag=1,来自file2标记tag=2。

reduce阶段把key相同的file1的数据和file2的数据通过笛卡尔乘积join在一起。

个人理解:举个例子
file1 有{1:'a', 2:'b', 3:'c'}
file2 有{1:'A', 2:'B'}

可以join成{1:['a','A'], 2:['b', 'B']}

operator

upload successful

explain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
hive> explain SELECT a.uid,a.name,b.age FROM logs a JOIN users b ON (a.uid=b.uid);
OK

//语法树
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME logs) a) (TOK_TABREF (TOK_TABNAME users) b) (= (. (TOK_TABLE_OR_COL a) uid) (. (TOK_TABLE_OR_COL b) uid)))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) uid)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL a) name)) (TOK_SELEXPR (. (TOK_TABLE_OR_COL b) age)))))

//阶段
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 is a root stage

STAGE PLANS:
Stage: Stage-1
Map Reduce
Alias -> Map Operator Tree: //mapper阶段
a
TableScan //扫描表, 就只是一行一行的传递下去而已
alias: a
Reduce Output Operator //输出给reduce的内容
key expressions: // key啦,这里的key是uid,就是我们写在ON子句那个,你可以试试加多几个条件
expr: uid
type: string
sort order: + //排序
Map-reduce partition columns://分区字段,貌似是和key一样的
expr: uid
type: string
tag: 0 //用来区分这个key是来自哪个表的
value expressions: //reduce用到的value字段
expr: uid
type: string
expr: name
type: string
b
TableScan //扫描表, 就只是一行一行的传递下去而已
alias: b
Reduce Output Operator //输出给reduce的内容
key expressions: //key
expr: uid
type: string
sort order: +
Map-reduce partition columns: //分区字段
expr: uid
type: string
tag: 1 //用来区分这个key是来自哪个表的
value expressions: //值
expr: age
type: int
Reduce Operator Tree: // reduce阶段
Join Operator // JOIN的Operator
condition map:
Inner Join 0 to 1 // 内连接0和1表
condition expressions: // 第0个表有两个字段,分别是uid和name, 第1个表有一个字段age
0 {VALUE._col0} {VALUE._col1}
1 {VALUE._col1}
handleSkewJoin: false //是否处理倾斜join,如果是,会分为两个MR任务
outputColumnNames: _col0, _col1, _col6 //输出字段
Select Operator //列裁剪(我们sql写的select字段)
expressions:
expr: _col0
type: string
expr: _col1
type: string
expr: _col6
type: int
outputColumnNames: _col0, _col1, _col2
File Output Operator //把结果输出到文件
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat

Stage: Stage-0
Fetch Operator
limit: -1

可以看到里面都是一个个Operator顺序的执行下来

map join

MapJoin通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。
Hive0.7之前,需要使用hint提示 /+ mapjoin(table) /才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数hive.auto.convert.join来控制,默认为true.
假设a表为一张大表,b为小表,并且hive.auto.convert.join=true,那么Hive在执行时候会自动转化为MapJoin。

upload successful
如图中的流程,首先是Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中,该HashTable的数据结构可以抽象为:

upload successful
图中红框圈出了执行Local Task的信息。

接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。
由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。
总的来说,因为小表的存在,可以在Map阶段直接完成Join的操作,为了优化小表的查找速度,将其转化为HashTable的结构,并加载进分布式缓存中。

Auto Map Join

upload successful
还记得原理中提到的物理优化器?Physical Optimizer么?它的其中一个功能就是把Join优化成Auto Map Join

图上左边是优化前的,右边是优化后的

优化过程是把Join作业前面加上一个条件选择器ConditionalTask和一个分支。左边的分支是MapJoin,右边的分支是Common Join(Reduce Join)

看看左边的分支是不是和我们上上一张图很像?

这个时候,我们在执行的时候,就由这个Conditional Task 进行实时路径选择,遇到小于25兆走左边,大于25兆走右边。所谓,男的走左边,女的走右边,人妖走中间。

在比较新版的Hive中,Auto Mapjoin是默认开启的。如果没有开启,可以使用一个开关, set hive.auto.convert.join=true 开启。
当然,Join也会遇到和上面的Group By一样的倾斜问题。

Hive 也可以通过像Group By一样两道作业的模式单独处理一行或者多行倾斜的数据。即采用下面的方式。

semi join半连接

这个是要改进reduce side join。建立一个小表file3,把file1的所有要参加join的数据的key复制进去,然后把file3复制到每一个map task中去,然后找出不在file2中的key,过滤掉这些数据后再进行reduce side join,减少跨机器数据传输。

个人理解:举个例子
file1 有{1:’a’, 2:’b’, 3:’c’}
file2 有{1:’A’, 2:’B’}

建立一个小表file3,所有要参加join的数据的key复制进去也就是
[1, 2, 3],然后发现file2中没有key=3的,所以可以过滤掉key=3的数据后再进行reduce join,来减少跨机器数据传输。
加入bloom filter
继续改进3。引入bloom filter。这种数据结构的特点是,存在false positive。如果使用它判断一个元素在集合中(positive),那其实有可能不在(false)。但是如果使用它判断一个元素不在集合中,那这个元素就真的不在这个集合中了。(没有false negative)

利用这个特点可以怎么改进3呢?在3中的file3我们可以用bloom filter来实现,要判断file2的key是否存在于file3中的时候直接使用bloom filter来判断。这样,如果判断说file2的某个key存在于file3中(positive),但是实际不在(false),那也无所谓,只是少过滤了一些key而已,还是可以正确地join。但是bloom filter可以保证没有false negative,如果判断file2的某个key不在file3中,那就真的不在file3中,这样可以保证join的正确性(不会少join了一些数据)。

hive.optimize.skewjoin

hive 中设定

1
2
set hive.optimize.skewjoin = true; 
set hive.skewjoin.key = skew_key_threshold (default = 100000)

可以就按官方默认的1个reduce 只处理1G 的算法,那么skew_key_threshold= 1G/平均行长.或者默认直接设成250000000 (差不多算平均行长4个字节)

其原理是就在Reduce Join过程,把超过十万条的倾斜键的行写到文件里,回头再起一道Join单行的Map Join作业来单独收拾它们。最后把结果取并集就是了

upload successful
对full outer join无效。

其他方法

  1. null或者某个无效字符太多导致数据倾斜。
    null=null结果是null,即false,在join时关联不上,join之前去掉不影响结果;
    ‘’关联得上,但是不需要时产生不必要的脏数据,可以在join之前把key为null/‘’的值去掉。
    因为null关联不上如果null有用不能去掉,可以用下面两种方法。
    办法(1)用union all
    1
    2
    3
    4
    5
    Select * From log a 
       Join users b
    On a.user_id is not null And a.user_id = b.user_id
      Union all
    Select * from log a where a.user_id is null;

办法(2)赋予null值新的随机值

1
2
3
4
5
6
Select * 
from log a
   left Join
bmw_users b
on case when a.user_id is null then concat('dp_hive',rand())
   else a.user_id end = b.user_id;

方法1的log读取两次,jobs是2。方法2的job数是1。这个优化适合无效id(比如-99,’’,null等)产生的倾斜问题。
把空值的key变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上 ,解决数据倾斜问题。

  1. 不同数据类型关联也会产生数据倾斜。
    场景:一张表s8的日志,每个商品一条记录,要和商品表关联。但关联却碰到倾斜的问题,s8的日志中有字符串商品id,也有数字的商品id,类型是string的,但商品中的数字id是bigint的。
    问题原因:把s8的商品id转成数字id做hash来分配reduce,所以字符串id的s8日志,都到一个reduce上了,解决的方法验证了这个猜测。
    解决方法:把数字类型转换成字符串类型

    1
    2
    3
    Select * from s8_log a
     Left outer join r_auction_auctions b
      On a.auction_id = cast(b.auction_id as string);
  2. 大表Join的数据偏斜
    MapReduce编程模型下开发代码需要考虑数据偏斜的问题,Hive代码也是一样。数据偏斜的原因包括以下两点:
      1. Map输出key数量极少,导致reduce端退化为单机作业。
      2. Map输出key分布不均,少量key对应大量value,导致reduce端单机瓶颈。
    Hive中我们使用MapJoin解决数据偏斜的问题,即将其中的某个小表(全量)分发到所有Map端的内存进行Join,从而避免了reduce。这要求分发的表可以被全量载入内存。
    极限情况下,Join两边的表都是大表,就无法使用MapJoin。这种问题最为棘手,目前已知的解决思路有两种:

    1. 如果是上述情况1,考虑先对Join中的一个表去重,以此结果过滤无用信息。
      这样一般会将其中一个大表转化为小表,再使用MapJoin 。一个实例是广告投放效果分析,
         例如将广告投放者信息表i中的信息填充到广告曝光日志表w中,使用投放者id关联。因为实际广告投放者数量很少(但是投放者信息表i很大),因此可以考虑先在w表中去重查询所有实际广告投放者id列表,以此Join过滤表i,这一结果必然是一个小表,就可以使用MapJoin

      1
      2
      3
      4
      5
        select /*+mapjoin(x)*/* from log a left outer join (
        select /*+mapjoin(c)*/d.*
        from ( select distinct user_id from log ) c
        join users d on c.user_id = d.user_id
      ) x on a.user_id = b.user_id;
    2. 如果是上述情况2,考虑切分Join中的一个表为多片,以便将切片全部载入内存,然后采用多次MapJoin得到结果。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      select * from  
      (
      select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat
      from w left outer join i sampletable(1 out of 2 on id) i1
      )
      union all
      (
      select w.id, w.time, w.amount, i2.name, i2.loc, i2.cat
      from w left outer join i sampletable(1 out of 2 on id) i2
      )
      );

以下语句实现了left outer join逻辑:

1
2
3
4
5
6
7
8
select t1.id, t1.time, t1.amount,  
coalease(t1.name, t2.name),
coalease(t1.loc, t2.loc),
coalease(t1.cat, t2.cat)
from (
select w.id, w.time, w.amount, i1.name, i1.loc, i1.cat
from w left outer join i sampletable(1 out of 2 on id) i1
) t1 left outer join i sampletable(2 out of 2 on id) t2;

上述语句使用Hive的sample table特性对表做切分。

  1. 如果A表关联B表在某个key上倾斜,B是码表,那么可以将B表放大1000倍,然后对A表的key字段加上一个1000以内hash后缀,然后和放大后的b表关联,可以解决问题。

参考:
https://blog.csdn.net/lw_ghy/article/details/51469753
https://www.cnblogs.com/skyl/p/4855099.html
https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization
https://blog.csdn.net/u013668852/article/details/79768266