Hive Small File Merging

Input Merging

-- Since hive-0.8.0 the default is CombineHiveInputFormat, which can combine multiple small files into one split and can also divide a large file into N splits (i.e. the input of N map tasks)
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-- Maximum split size; if not set, the whole input is handled by a single map task
mapreduce.input.fileinputformat.split.maxsize
-- Minimum split size when blocks on the same node are combined into a split
mapreduce.input.fileinputformat.split.minsize.per.node
-- Minimum split size when blocks on the same rack are combined into a split
mapreduce.input.fileinputformat.split.minsize.per.rack

Size constraint:

  • maxSplitSize >= minSplitSizeRack >= minSplitSizeNode
  • Otherwise an IOException is thrown

To verify: does setting minSplitSizeNode larger than minSplitSizeRack actually throw an IOException?

// Excerpt from CombineFileInputFormat.java: minsize.per.node must be <= minsize.per.rack, otherwise an exception is thrown
} else if (minSizeRack != 0L && minSizeNode > minSizeRack) {
    throw new IOException("Minimum split size per node " + minSizeNode
        + " cannot be larger than minimum split size per rack " + minSizeRack);
} else {

// Verified on hive-1.2.1:

hive> set mapreduce.input.fileinputformat.split.minsize.per.node=2;
hive> set mapreduce.input.fileinputformat.split.minsize.per.rack=1;
Job Submission failed with exception 'java.io.IOException(Minimum split size per node 2 cannot be larger than minimum split size per rack 1)'
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. Minimum split size per node 2 cannot be larger than minimum split size per rack 1

mapreduce.input.fileinputformat.split.maxsize
mapreduce.input.fileinputformat.split.minsize.per.node
mapreduce.input.fileinputformat.split.minsize.per.rack
If the three parameters above are 0, then when CombineFileInputFormat.getSplits() is called, they are filled in with the values of the following three parameters, respectively:
mapred.max.split.size
mapred.min.split.size.per.node
mapred.min.split.size.per.rack

  • The default values of these parameters still need to be checked in Hive
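
A minimal Java sketch of the fallback just described (illustration only, not the actual Hadoop source; the class SplitSizeFallback and method resolve are invented names, and note that in Hadoop 2.x the old mapred.* keys are generally also treated as deprecated aliases of the mapreduce.* names):

import org.apache.hadoop.conf.Configuration;

// Hypothetical helper: if the mapreduce.* value is 0/unset, fall back to the older mapred.* key.
public class SplitSizeFallback {

    static long resolve(Configuration conf, String newKey, String oldKey) {
        long v = conf.getLong(newKey, 0L);
        return v != 0L ? v : conf.getLong(oldKey, 0L);
    }

    static long[] resolveSplitSizes(Configuration conf) {
        long maxSize = resolve(conf,
                "mapreduce.input.fileinputformat.split.maxsize",
                "mapred.max.split.size");
        long minSizeNode = resolve(conf,
                "mapreduce.input.fileinputformat.split.minsize.per.node",
                "mapred.min.split.size.per.node");
        long minSizeRack = resolve(conf,
                "mapreduce.input.fileinputformat.split.minsize.per.rack",
                "mapred.min.split.size.per.rack");
        return new long[] { maxSize, minSizeNode, minSizeRack };
    }
}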

How splits are formed:
The following blog post can be used as a reference:
https://blog.csdn.net/hellojoy/article/details/104866468

  1. Iterate over the node list and form splits node by node, in units of data blocks (node-local splits):
    1. If maxSplitSize == 0, all of the blocks on the node form one split.
    2. If maxSplitSize != 0, iterate over the node's blocks and accumulate their sizes; whenever the accumulated size >= maxSplitSize, those blocks form one split. Repeat until the accumulated size of the remaining blocks < maxSplitSize, then go to the next step.
    3. If the accumulated size of the remaining blocks >= minSplitSizeNode, those remaining blocks form one split; otherwise they are set aside for later processing.
  2. Iterate over the rack list and form splits rack by rack, in units of data blocks (rack-level splits):
    1. Iterate over all nodes of the rack and accumulate their blocks (i.e. the blocks left over from the previous step); whenever the accumulated size >= maxSplitSize, those blocks form one split. Repeat until the accumulated size of the remaining blocks < maxSplitSize, then go to the next step.
    2. If the accumulated size of the remaining blocks >= minSplitSizeRack, those remaining blocks form one split; if it is < minSplitSizeRack, they are set aside for later processing.
  3. Iterate over the blocks left over from all racks and accumulate their sizes; whenever the accumulated size >= maxSplitSize, those blocks form one split. Repeat until the accumulated size of the remaining blocks < maxSplitSize, then go to the next step.
  4. All blocks that still remain form one final split.

The key point: when automatic combining is enabled, avoid the situation where the files are very small but the parameters are so large that the small files can only be merged in step 4; as far as possible they should be combined by the earlier steps (a simplified sketch of the procedure follows).
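
The sketch below is a minimal, simplified Java rendering of the four steps above (the class CombineSplitSketch and method formSplits are invented for illustration; this is not the actual CombineFileInputFormat source, which additionally handles block locations, replication and split pools):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineSplitSketch {

    // Returns the sizes of the logical splits the procedure would produce.
    // Blocks are modelled only by their sizes, grouped per node and per rack.
    static List<Long> formSplits(Map<String, List<Long>> blocksPerNode,
                                 Map<String, List<String>> nodesPerRack,
                                 long maxSize, long minSizeNode, long minSizeRack) {
        List<Long> splits = new ArrayList<>();
        Map<String, Long> nodeLeftover = new HashMap<>();

        // Step 1: node-local splits
        for (Map.Entry<String, List<Long>> node : blocksPerNode.entrySet()) {
            long cur = 0;
            for (long block : node.getValue()) {
                cur += block;
                if (maxSize != 0 && cur >= maxSize) {            // step 1.2: cut a split at maxSize
                    splits.add(cur);
                    cur = 0;
                }
            }
            if (maxSize == 0 && cur > 0) {                       // step 1.1: whole node as one split
                splits.add(cur);
                cur = 0;
            } else if (minSizeNode != 0 && cur >= minSizeNode) { // step 1.3
                splits.add(cur);
                cur = 0;
            }
            nodeLeftover.put(node.getKey(), cur);                // remainder goes to the rack stage
        }

        // Step 2: rack-level splits over the node leftovers
        List<Long> rackLeftover = new ArrayList<>();
        for (List<String> rackNodes : nodesPerRack.values()) {
            long cur = 0;
            for (String n : rackNodes) {
                cur += nodeLeftover.getOrDefault(n, 0L);
                if (maxSize != 0 && cur >= maxSize) {            // step 2.1
                    splits.add(cur);
                    cur = 0;
                }
            }
            if (minSizeRack != 0 && cur >= minSizeRack) {        // step 2.2
                splits.add(cur);
            } else if (cur > 0) {
                rackLeftover.add(cur);                           // kept for steps 3 and 4
            }
        }

        // Steps 3 and 4: accumulate the leftovers across racks, cut a split whenever
        // maxSize is reached, and put whatever finally remains into one last split
        long cur = 0;
        for (long left : rackLeftover) {
            cur += left;
            if (maxSize != 0 && cur >= maxSize) {
                splits.add(cur);
                cur = 0;
            }
        }
        if (cur > 0) {
            splits.add(cur);
        }
        return splits;
    }
}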

The following analyses how the number of map tasks is calculated when the Hive input format is set to CombineHiveInputFormat.

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

The number of map tasks equals the number of logical splits. The main factors that determine it are listed below (a rough worked illustration follows the list):

  1. The number of input files of the tables or partitions involved
  2. The size of the input files
  3. The distribution of the input files across nodes and racks
  4. set mapred.max.split.size; the maximum split size
  5. set mapred.min.split.size.per.node; the minimum split size on a node
  6. set mapred.min.split.size.per.rack; the minimum split size on a rack
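
As a rough worked illustration, the hypothetical CombineSplitSketch.formSplits sketch above can be run against an invented layout that mimics the 159 tiny files (about 2 KB each) of the test table shown later, spread over 3 nodes on a single rack:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineSplitSketchDemo {
    public static void main(String[] args) {
        // Invented layout: 159 files of ~2 KB each on 3 nodes of one rack.
        Map<String, List<Long>> blocksPerNode = new HashMap<>();
        for (int n = 0; n < 3; n++) {
            List<Long> blocks = new ArrayList<>();
            for (int i = 0; i < 53; i++) {
                blocks.add(2_000L);
            }
            blocksPerNode.put("node" + n, blocks);
        }
        Map<String, List<String>> nodesPerRack = new HashMap<>();
        nodesPerRack.put("rack0", new ArrayList<>(blocksPerNode.keySet()));

        // per.node = per.rack = 1 byte: each node's blocks become one split -> 3 splits, i.e. 3 mappers
        System.out.println(CombineSplitSketch.formSplits(
                blocksPerNode, nodesPerRack, 256_000_000L, 1L, 1L));

        // per.node = per.rack = 0: nothing is cut at the node or rack stage, so everything
        // falls through to the final step -> 1 split, i.e. 1 mapper
        System.out.println(CombineSplitSketch.formSplits(
                blocksPerNode, nodesPerRack, 256_000_000L, 0L, 0L));
    }
}

Under this toy model the map count is driven mainly by how many nodes hold blocks once per.node and per.rack are tiny, which is consistent with the 29 mappers observed in the test below (per.node = per.rack = 1, presumably one split per datanode holding blocks) and with the single mapper observed when both parameters are 0.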

Output Merging

For example:

1. per.node and per.rack must not be set to 0; otherwise the input is still combined into a single mapper.

hive> set mapreduce.input.fileinputformat.split.minsize.per.rack=0;
hive> set mapreduce.input.fileinputformat.split.minsize.per.node=0;
...
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2019-05-28 15:42:09,538 Stage-1 map = 0%, reduce = 0%

set mapreduce.input.fileinputformat.split.minsize.per.node=0
-- Must not be set to 0; otherwise the input is still combined into a single mapper (presumably because with per.node=0 and per.rack=0 no split is cut at the node or rack stage, so all blocks fall through to the final step and end up in one split).

2. Test how the per.node and per.rack values of the second (merge) job change after the output small-file merge parameters are enabled

set mapreduce.input.fileinputformat.split.maxsize=4096000000;
set hive.merge.mapfiles=false;
set mapreduce.input.fileinputformat.split.minsize.per.node=2;
set mapreduce.input.fileinputformat.split.minsize.per.rack=1;
set mapreduce.job.queuename=titan_low;
create table titan_mass.tmp_052815_02 row format delimited fields terminated by '\t' stored as orc tblproperties ('orc.compress'='SNAPPY') as select * from simba.dw_evt where usernum='15295558956' and import_time>='2021032800' and import_time<'2021032900';

File count and size of the table:

[root@web04.simba.bd.com ~]# hdfs dfs -count -h -v /apps/hive/warehouse/titan_mass.db/tmp_052815_02
DIR_COUNT FILE_COUNT CONTENT_SIZE PATHNAME
1 159 336.2 K /apps/hive/warehouse/titan_mass.db/tmp_052815_02
-- Test how the per.node and per.rack values of the second (merge) job change after the output small-file merge parameters are enabled
hive> set hive.merge.mapfiles=true;
hive> set hive.merge.mapredfiles=true;
hive> set mapreduce.input.fileinputformat.split.maxsize=4096000000;
hive> set hive.merge.size.per.task=256000000;
hive> set hive.merge.smallfiles.avgsize=16000000;
hive> set mapreduce.input.fileinputformat.split.minsize.per.node=1;
hive> set mapreduce.input.fileinputformat.split.minsize.per.rack=1;
hive> set mapreduce.job.queuename=titan_low;
hive> create table test.tmp_smallf_04021417 row format delimited fields terminated by '\t' stored as orc tblproperties ('orc.compress'='SNAPPY') as select * from titan_mass.tmp_052815_02;
Query ID = root_20210402142118_d7bd5e86-1772-4cd3-863b-812db5214b9c
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1615708732706_61993, Tracking URL = http://ambari01.simba.bd.com:8088/proxy/application_1615708732706_61993/
Kill Command = /usr/hdp/2.6.1.0-129/hadoop/bin/hadoop job -kill job_1615708732706_61993
Hadoop job information for Stage-1: number of mappers: 29; number of reducers: 0
2021-04-02 14:21:30,821 Stage-1 map = 0%, reduce = 0%
2021-04-02 14:21:37,394 Stage-1 map = 34%, reduce = 0%, Cumulative CPU 18.55 sec
2021-04-02 14:21:38,447 Stage-1 map = 76%, reduce = 0%, Cumulative CPU 94.1 sec
2021-04-02 14:21:39,502 Stage-1 map = 90%, reduce = 0%, Cumulative CPU 110.59 sec
2021-04-02 14:21:40,552 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 124.24 sec
MapReduce Total cumulative CPU time: 2 minutes 4 seconds 240 msec
Ended Job = job_1615708732706_61993
Stage-4 is filtered out by condition resolver.
Stage-3 is selected by condition resolver.
Stage-5 is filtered out by condition resolver.
Starting Job = job_1615708732706_61996, Tracking URL = http://ambari01.simba.bd.com:8088/proxy/application_1615708732706_61996/
Kill Command = /usr/hdp/2.6.1.0-129/hadoop/bin/hadoop job -kill job_1615708732706_61996
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
2021-04-02 14:21:49,260 Stage-3 map = 0%, reduce = 0%
2021-04-02 14:21:55,635 Stage-3 map = 100%, reduce = 0%, Cumulative CPU 5.09 sec
MapReduce Total cumulative CPU time: 5 seconds 90 msec
Ended Job = job_1615708732706_61996
Moving data to directory hdfs://ambari01/apps/hive/warehouse/test.db/tmp_smallf_04021417
Table test.tmp_smallf_04021417 stats: [numFiles=1, numRows=146, totalSize=90414, rawDataSize=446513]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 29 Cumulative CPU: 124.24 sec HDFS Read: 742866 HDFS Write: 127642 SUCCESS
Stage-Stage-3: Map: 1 Cumulative CPU: 5.09 sec HDFS Read: 338055 HDFS Write: 90414 SUCCESS
Total MapReduce CPU Time Spent: 2 minutes 9 seconds 330 msec
OK
Time taken: 39.689 seconds

Result:

Configuration of application_1615708732706_61993

mapreduce.input.fileinputformat.split.maxsize 4096000000 job.xml ⬅ programatically
hive.merge.size.per.task 256000000 job.xml ⬅ programatically
hive.merge.smallfiles.avgsize 16000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize.per.node 1 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize.per.rack 1 job.xml ⬅ programatically

The job launched automatically after the merge parameters are enabled
Configuration of application_1615708732706_61996

mapreduce.input.fileinputformat.split.maxsize 256000000 job.xml ⬅ programatically
hive.merge.size.per.task 256000000 job.xml ⬅ programatically
hive.merge.smallfiles.avgsize 16000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize.per.node 256000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize.per.rack 256000000 job.xml ⬅ programatically

3. With per.node and per.rack both set to 1 at the session level, the following jobs are all merge map jobs

http://115.15.3.203:19888/jobhistory/conf/job_1558144373688_14700

hive.merge.size.per.task 256000000 job.xml ⬅ file:/data/bigdata/tbds/etc/hive/conf.server/hive-site.xml
hive.merge.smallfiles.avgsize 16000000 job.xml ⬅ file:/data/bigdata/tbds/etc/hive/conf.server/hive-site.xml
mapreduce.input.fileinputformat.split.maxsize 256000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize 4096000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize.per.node 256000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize.per.rack 256000000 job.xml ⬅ programatically
http://115.15.3.203:19888/jobhistory/conf/job_1558144373688_13167

hive.merge.size.per.task 2000000 job.xml ⬅ programatically
hive.merge.smallfiles.avgsize 16000000 job.xml ⬅ file:/data/bigdata/tbds/etc/hive/conf.server/hive-site.xml
mapreduce.input.fileinputformat.split.maxsize 16000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize 4096000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize.per.node 16000000 job.xml ⬅ programatically
mapreduce.input.fileinputformat.split.minsize.per.rack 16000000 job.xml ⬅ programatically

From the results of 2 and 3 above, in the job launched automatically after the merge parameters are enabled, the three parameters that determine split generation are driven by whichever of hive.merge.size.per.task and hive.merge.smallfiles.avgsize has the larger value (a sketch of this inferred rule follows).
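
A minimal sketch of that inferred rule (deduced purely from the job configurations shown above, not taken from the Hive source; the class MergeJobSplitSizes and method applyObservedRule are invented names):

import org.apache.hadoop.conf.Configuration;

public class MergeJobSplitSizes {

    // Sets the merge job's split parameters the way the observed configurations suggest:
    // all three are driven by max(hive.merge.size.per.task, hive.merge.smallfiles.avgsize).
    static void applyObservedRule(Configuration conf) {
        long mergeSizePerTask  = conf.getLong("hive.merge.size.per.task", 256_000_000L);
        long smallfilesAvgSize = conf.getLong("hive.merge.smallfiles.avgsize", 16_000_000L);
        long targetSplitSize   = Math.max(mergeSizePerTask, smallfilesAvgSize);

        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", targetSplitSize);
        conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", targetSplitSize);
        conf.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", targetSplitSize);
    }
}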

Conclusion for handling output merging

So our strategy is not to enable the merge job, but to launch a separate task that re-inserts the data once with INSERT OVERWRITE, mainly to prevent the merge job from starting only one map task and loading the data too slowly.
