Total order sorting

Incorrect approach

package com.qr.mr.datasort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

/**
 * http://www.aboutyun.com/thread-7046-1-1.html
 *
 * This example only asks us to sort the input data. Readers familiar with the MapReduce
 * process will quickly recall that MapReduce already sorts records during the shuffle:
 * can we rely on that default sort instead of implementing the sorting ourselves?
 * The answer is yes.
 *
 * Before using it, though, we need to understand the default sort rule: records are sorted
 * by key. If the key is an IntWritable (wrapping an int), MapReduce sorts the keys
 * numerically; if the key is a Text (wrapping a String), MapReduce sorts the keys
 * lexicographically.
 *
 * Knowing this, we should use IntWritable keys: in map(), convert each input line into an
 * IntWritable and emit it as the key (the value can be anything). When reduce() receives a
 * <key, value-list> pair, it emits the input key as the value, once per element of the
 * value-list. The emitted key (count in the code below) is a running counter that records
 * the rank of the current key.
 *
 * Note that no Combiner is configured, i.e. this MapReduce job runs without a Combiner,
 * simply because map and reduce alone are enough to finish the task.
 *
 * https://blog.csdn.net/evo_steven/article/details/17139123
 */
public class DataSort {

    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        // private static IntWritable data = null; // note: do not write it this way, it causes an error
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            context.write(new IntWritable(Integer.parseInt(line)), new Text(""));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, IntWritable> {
        // count is only "global" within a single reduce task, not across the cluster:
        // as soon as job.setNumReduceTasks(2) is added there are two output files and the
        // result is wrong (see the sketch after this listing).
        private static IntWritable count = new IntWritable(1);
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                context.write(count, key);
                count = new IntWritable(count.get() + 1);
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "DataSort");
        job.setJarByClass(DataSort.class);
        job.setNumReduceTasks(2); // note: with more than one reducer this job produces wrong results in a real environment

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/datasort/in"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/datasort/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
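
The sketch below shows why two reducers break this version. It is a hypothetical standalone demo (HashPartitionDemo is not part of the original project): with the default HashPartitioner, keys are assigned to reducers by hash rather than by range, so each output file is only sorted within itself.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

// Hypothetical demo: prints which reducer each key is sent to when
// job.setNumReduceTasks(2) is combined with the default HashPartitioner.
public class HashPartitionDemo {
    public static void main(String[] args) {
        HashPartitioner<IntWritable, Text> partitioner = new HashPartitioner<>();
        for (int v : new int[]{1, 2, 3, 4, 5, 6}) {
            int reducer = partitioner.getPartition(new IntWritable(v), new Text(""), 2);
            // IntWritable.hashCode() is the wrapped int itself, so odd keys land on reducer 1
            // and even keys on reducer 0: neither output file covers a contiguous key range,
            // and the per-reducer count variable restarts at 1 in each file.
            System.out.println("key " + v + " -> reducer " + reducer);
        }
    }
}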

This approach is problematic because it never calls setPartitionerClass.
The total-order implementation in the MapReduce design patterns uses two jobs: the first job builds the partitions and the second does the sorting; it likewise relies on setPartitionerClass, and the reducer only writes the output.
Besides quickly locating the reducer a given key belongs to, the more important role of the Partitioner is that it controls whether the sort is globally ordered! A hand-written range partitioner makes this concrete (see the sketch below).
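
The following is a minimal sketch of such a range partitioner. It is a hypothetical class that assumes every key falls in [0, 10000): each reducer owns a contiguous slice of that range, so concatenating part-r-00000, part-r-00001, ... in order yields a globally sorted result.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical range partitioner: assumes all keys fall in [0, MAX_KEY).
public class RangePartitioner extends Partitioner<IntWritable, Text> {
    private static final int MAX_KEY = 10000;

    @Override
    public int getPartition(IntWritable key, Text value, int numReduceTasks) {
        // Give every reducer a contiguous slice of the key range; this is what keeps
        // the concatenated reducer outputs globally ordered.
        int slice = MAX_KEY / numReduceTasks;
        return Math.min(key.get() / slice, numReduceTasks - 1);
    }
}

Registering it with job.setPartitionerClass(RangePartitioner.class) would already fix the two-reducer case above, but only when the key range is known up front. The TotalOrderPartitioner version below removes that assumption by sampling the input to pick the boundaries.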

Correct approach

package com.qr.mr.datasort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

import java.io.IOException;

public class DataSortNew {

    public static class Map extends Mapper<Text, Text, Text, IntWritable> {
        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            // the key stays a Text; the value is the same number parsed as an int
            context.write(key, new IntWritable(Integer.parseInt(key.toString())));
        }
    }

    // the reducer's input key type must also be changed to Text to match the map output
    public static class Reduce extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                context.write(val, NullWritable.get());
            }
        }
    }

    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            int v1 = Integer.parseInt(w1.toString());
            int v2 = Integer.parseInt(w2.toString());
            return v1 - v2;
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // make TotalOrderPartitioner use the job's sort comparator instead of raw byte order
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "DataSortNew");

        job.setJarByClass(DataSortNew.class);
        FileInputFormat.addInputPath(job, new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/datasort/in"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/datasort/out"));

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setSortComparatorClass(KeyComparator.class); // sort the Text keys numerically instead of lexicographically

        job.setNumReduceTasks(100); // multiple reducers are now safe because the partitioner keeps the output globally ordered
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // Newer Hadoop versions ship three samplers: SplitSampler, RandomSampler and IntervalSampler.
        // All three are static inner classes of InputSampler and implement its Sampler interface
        // (see the sketch after this listing).
        // https://flyingdutchman.iteye.com/blog/1878962
        //
        // RandomSampler arguments:
        //   0.01 -- probability of picking each record
        //   1000 -- maximum number of samples to collect
        //   100  -- maximum number of input splits to sample
        String partitionPath = "/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/datasort/ss/sampler";
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(partitionPath));
        InputSampler.RandomSampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);
        InputSampler.writePartitionFile(job, sampler);

        job.setPartitionerClass(TotalOrderPartitioner.class); // partition by the sampled boundaries so the output is totally ordered
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
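
As the comment in main() notes, RandomSampler is only one of the three built-in samplers. Below is a minimal sketch of the other two, with illustrative argument values (they are not taken from the original code); either one can be passed to InputSampler.writePartitionFile on the same job object in place of RandomSampler.

// SplitSampler takes the first numSamples records from up to maxSplitsSampled splits
// (cheap, but biased when the input is already ordered); IntervalSampler walks the
// splits and keeps records at a fixed frequency.
InputSampler.Sampler<Text, Text> splitSampler =
        new InputSampler.SplitSampler<>(1000, 10);       // numSamples, maxSplitsSampled
InputSampler.Sampler<Text, Text> intervalSampler =
        new InputSampler.IntervalSampler<>(0.01, 10);    // freq, maxSplitsSampled
InputSampler.writePartitionFile(job, splitSampler);      // or intervalSampler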

References:
https://www.iteblog.com/archives/2146.html
https://www.iteblog.com/archives/2147.html
https://flyingdutchman.iteye.com/blog/1878962