hive使用和优化

Map数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
mapred.map.taskes=10;
(1)默认map个数
default_num = total_size / block_size;
(2)期望大小
goal_num = mapred.map.casks;
(3)设置处理的文件大小
split_size = max(mapred.min.split.size,block_size);
split_num = total_size / split_size;
(4)计算的map个数
compute_map_num = min(split_num,max(default_num,goal_num));

经过以上的分析,在设置map个数的时候,可以简单的总结为一下的几点:
(1)如果增大map个数,则设置mapred.map.tasks为一个较大的值
(2)如果想减小map个数,则设置mapred.min.split.size为一个较大的值。

情况1:输入文件size巨大,但不是小文件)
增大mapred.min.split.size的值
情况2:输入文件数量巨大,且都是小文件,就是单个文件的size小于blockSize。(造成网络负载大)
这种情况通过增大mapred.min.split.size不可行,需要使用CombineFileInputFormat将多个input path合并成一个InputSplit送个mapper处理,从而减少mapper的数量

使用complex type array 和 map 类型

创建表时指定类型和分隔符

1
2
3
4
5
6
create table test1  
(f1 array<int>,
f2 map<string,int>)
ROW FORMAT DELIMITED
COLLECTION ITEMS TERMINATED BY '|'
MAP KEYS TERMINATED BY ':';

文件中按照表定义的分隔符来分割,字段的分隔符默认为^A,可以在建表时通过 FIELDS TERMINATED BY ‘^A’ 指定

1
2
3
4
cat /home/hadoop/hivetest1.txt   

1|2|3^Aa:1|b:2
11|12|13^Aaa:1|bb:2

把文件 load 到表中

load data local inpath '/home/hadoop/hivetest1.txt' into table test1;

在查询中使用array 和 map 类型

select f1[0],f2['aa'] from test1; select * from test1 where f1[0]=11 and f2['aa']=1;

处理json字符串

create table test(json_str string);
test表的json_str字段的内容为:

1
2
3
4
5
6
7
{"a":"1","b":"2","c":["aa","11",{"aaa":"111","bbb":"222"}]}   



select get_json_object(json_str,'$.b') from test;
select get_json_object(json_str,'$.c[0]') from test;
select get_json_object(json_str,'$.c[2].aaa') from test;

动态分区

1
2
set hive.exec.dynamic.partition=true;   
set hive.exec.dynamic.partition.mode=nonstrict;

另外,如果默认值不够大,根据需要调大以下三个参数,否则会报错

1
2
3
set hive.exec.max.dynamic.partitions=2000  
set hive.exec.max.dynamic.partitions.pernode=1000
set hive.exec.max.created.files=110000;

这样,从另外一个表把数据导入到目标表时,就会自动按照指定字段分区。
例如:

insert overwrite table A partition(reportdate) select * from B;
表A要先创建,并且以reportdate作为分区的字段。在上面sql中不用指定reportdate的值(如果是非动态分区则需要指定)。表B最后一个字段作为分区的字段,会自动根据最后一个字段的值自动分区。

map join 优化

将hive.auto.convert.join设置为true

set hive.auto.convert.join=true;
设置进行mapjoin的小表的阈值(即当小表的大小 少于 一定该阈值时 hive 会做 mapjoin 优化,如果不设置,则采用默认值)

1
2
set hive.mapjoin.smalltable.filesize=25000000;  
hive.mapjoin.bucket.cache.size=100

每一个key有多少个value值缓存在内存中
hive.mapjoin.cache.numrows=25000
有多少行cache在内存中

insert into/overwrite多表插入的优化

通过对原表扫描一遍,插入到不同的目的表中

1
2
3
4
from table  
insert overwrite table dest1 select * where col=condition ;
insert overwrite table dest2 select * where col=condition ;
insert overwrite table dest3 select * where col=condition ;

对limit语句的优化

1
2
3
4
hive.limit.optimize.enable=false  
hive.limit.row.max.size=100000
hive.limit.optimize.limit.file=10
hive.limit.optimize.fetch.max=50000

对reduce个数的设置

hive默认是根据输入的大小来设置,每个reducer处理的数据量是1G。如果有10G的输入数据,则hive自动生成10个reducer。
默认情况下一个reducer处理1G的数据,这样一个reducer处理的数据量太大,可以改小一些。

1
2
3
hiive.exec.reducers.bytes.per.reducer=1G  
hive.exec.reducers.max=999
mapred.reduce.tasks

数据倾斜优化

有编译器的优化和运行期的优化

1
2
3
4
5
6
7
8
9
10
11
12
hive.groupby.skewindata=false  
hive.optimize.skewjoin.compiletime=false
hive.optimize.skewjoin=false
hive.skewjoin.key=100000

默认值:false
hive.skewjoin.key //倾斜键数目阈值,超过此值则判定为一个倾斜的 Join 查询。
默认值: 1000000
hive.skewjoin.mapjoin.map.tasks //处理数据倾斜的 Map Join 的 Map 数上限。
默认值: 10000
hive.skewjoin.mapjoin.min.split //处理数据倾斜的 Map Join 的最小数据切分大小,以字节为单位,默认为32M。
默认值:33554432

文件压缩

1
2
3
4
5
6
7
8
9
10
11
中间压缩就是处理hive查询的多个job之间的数据,对于中间压缩,最好选择一个节省CPU耗时的压缩方式   
hive.exec.compress.intermediate=true;//决定查询的中间 map/reduce job (中间 stage)的输出是否为压缩格式

hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec; //中间 map/reduce job 的压缩编解码器的类名(一个压缩编解码器可能包含多种压缩类型),该值可能在程序中被自动设置。

hive.intermediate.compression.type=BLOCK (压缩单元为块压缩) //中间 map/reduce job 的压缩类型,如 "BLOCK""RECORD"

hive查询最终的输出也可以压缩
hive.exec.compress.output=true; //决定查询中最后一个 map/reduce job 的输出是否为压缩格式
mapred.output.compression.codec=orgapache.hadoop.io.compress.GzipCodec; // 压缩格式
mapred.output.compression.type=BLOCK //压缩类型

并行执行

一个hive sql语句编译成多个MR作业,没有依赖关系的作业可以并行执行。

1
2
hive.exec.parallel=false  
hive.exec.parallel.thread.number=8

合并结果的小文件

对于hive的结果中是小文件的,会再起一个MR作业合并小文件。

1
2
3
4
hive.merge.mapfiles=true  
hive.merge.mapredfiles=false
hive.merge.size.per.task=256000000
hive.merge.smallfiles.avgsize=16000000

推断执行

1
hive.mapred.reduce.tasks.speculative.execution=true

设置reduce个数

显示设置reduce的个数,或者每个reduce处理的数据的大小(默认1G的值很多时候有些大,可以设置小一些,同时reduce的内存也要相应小一些,提高并行度)。

1
2
3
mapred.reduce.tasks=-1  
hive.exec.reducers.bytes.per.reducer=1000000000(1G)
hive.exec.reducers.max=999

map端聚合

hive.map.aggr=true
group by语句时现在map聚合一次,减少传输到reduce的数据,就是mapreduce的combiner。默认是打开的。

列剪裁

列剪裁优化,只对需要的列进行处理,忽略其他的列,减少数据的处理量。默认是打开的。

hive.optimize.cp=true

本地模式

一些sql处理的数据量比较少,或者计算量比较少,可以在本地运行而不是MR作业运行,这样性能会更好些,也节约hadoop集群的资源。

1
2
hive.exec.mode.local.auto=false  
hive.fetch.task.conversion=minimal

当一个job满足如下条件会使用本地模式:
①job的输入数据大小必须小于:
hive.exec.mode.local.auto.inputbytes.max (默认128)
②job的map数必须小于参数:
hive.exec.mode.local.auto.tasks.max (默认4)
③job的reduce数必须为0或者1

测试模式

1
2
3
4
hive.test.mode=false  
hive.test.mode.prefix=test_
hive.test.mode.samplefreq=32
hive.test.mode.nosamplelist=table1,table2,table3

用于测试,通过采样减少输入的数据,结果表前面加前缀“test_”

严格模式

1
hive.mapred.mode=nonstrict

通过严格模式,使一些不严格的用法不通过,防止潜在的错误。
No partition being picked up for a query.
Comparing bigints and strings.
Comparing bigints and doubles.
Orderby without limit.

jvm重用

JVM重用是hadoop调优参数的内容,对hive的性能具有非常大的 影响,特别是对于很难避免小文件的场景或者task特别多的场景,这类场景大多数执行时间都很短。hadoop默认配置是使用派生JVM来执行map和 reduce任务的,这是jvm的启动过程可能会造成相当大的开销,尤其是执行的job包含有成千上万个task任务的情况。

JVM重用可以使得JVM实例在同一个JOB中重新使用N次,N的值可以在Hadoop的mapre-site.xml文件中进行设置

1
mapred.job.reuse.jvm.num.tasks

也可在hive的执行设置:

1
set  mapred.job.reuse.jvm.num.tasks=10;

JVM的一个缺点是,开启JVM重用将会一直占用使用到的task插槽,以便进行重用,直到任务完成后才能释放。如果某个“不平衡“的job中有几个 reduce task 执行的时间要比其他reduce task消耗的时间多得多的话,那么保留的插槽就会一直空闲着却无法被其他的job使用,直到所有的task都结束了才会释放。

hive参数优化之默认启用本地模式

启动hive本地模式参数,一般建议将其设置为true,即时刻启用:

1
2
hive (chavin)> set hive.exec.mode.local.auto;    
hive.exec.mode.local.auto=false

推测执行相关配置

1
2
3
4
5
6
7
8
9
10
11
hive (default)> set mapred.map.tasks.speculative.execution;

mapred.map.tasks.speculative.execution=true

hive (default)> set mapred.reduce.tasks.speculative.execution;

mapred.reduce.tasks.speculative.execution=true

hive (default)> set hive.mapred.reduce.tasks.speculative.execution;

hive.mapred.reduce.tasks.speculative.execution=true

单个mapreduce中运行多个group by

参数hive.multigroupby.singlemr控制师徒将查询中的多个group by组装到单个mapreduce任务中。如果启用这个优化,那么需要一组常用的group by键:

1
2
3
4
5
6
7
8
9
10
11
12
13
例子:

select Provice,city,county,count(rainfall) from area where data="2018-09-02" group by provice,city,count
select Provice,count(rainfall) from area where data="2018-09-02" group by provice

 

#使用multi group by
from area
 insert overwrite table temp1
  select Provice,city,county,count(rainfall) from area where data="2018-09-02" group by provice,city,count
 insert overwrite table temp2
  select Provice,count(rainfall) from area where data="2018-09-02" group by provice

聚合优化:

启用参数:hive.map.aggr=true默认开启

参数hive.fetch.task.conversion的调优:

默认值:hive.fetch.task.conversion=minimal

建议值:set hive.fetch.task.conversion=more;
原理:
对于简单的不需要聚合的类似 SELECT from

LIMIT n 语句,不需要起 MapReduce job ,直接通过 Fetch task 获取数据
https://blog.csdn.net/liyaohhh/article/details/50675267

内表转外表

1
ALTER TABLE xxx SET TBLPROPERTIES ('EXTERNAL'='TRUE');

参考:
https://www.cnblogs.com/duanxingxing/p/4535842.html
https://blog.csdn.net/z_l_l_m/article/details/8773505
http://www.cnblogs.com/yshb/p/3147710.html