输入文件 sort.txt 内容为
40 20
40 10
40 30
40 5
30 30
30 20
30 10
30 40
50 20
50 50
50 10
50 60
输出文件的内容(从小到大排序)如下
30 10
30 20
30 30
30 40
--------
40 5
40 10
40 20
40 30
--------
50 10
50 20
50 50
50 60
从输出的结果可以看出Key实现了从小到大的排序,同时相同Key的Value也实现了从小到大的排序,这就是二次排序的结果
在本例中要比较两次:先按照第一字段排序,再对第一字段相同的记录按照第二字段排序。根据这一点,我们可以构造一个复合类 IntPair,它有两个字段:先利用自定义分区保证第一字段相同的记录进入同一个分区(即同一个 Reducer),再由 Key 的比较方法(compareTo)依次按第一字段、第二字段排序,最后用分组比较器把第一字段相同的 Key 归入同一组。二次排序的流程分为以下几步。
1、自定义 key
2、自定义分区
3、Key的比较类
4、定义分组类函数
package com.qr.mr.secondarysort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
public class SecondarySort {
// 1、自定义 key
/**
 * Composite key made of two ints.
 *
 * <p>The natural ordering is ascending by {@code first}; ties are broken ascending by
 * {@code second}. The wire format is simply {@code first} followed by {@code second},
 * each written with {@link DataOutput#writeInt(int)} and no offset or other encoding.
 */
public static class IntPair implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    /**
     * Overwrites both components of this pair.
     *
     * @param left  new value of the first component
     * @param right new value of the second component
     */
    public void set(int left, int right) {
        this.first = left;
        this.second = right;
    }

    /** @return the first component */
    public int getFirst() {
        return this.first;
    }

    /** @return the second component */
    public int getSecond() {
        return this.second;
    }

    /** Writes {@code first} and then {@code second} as raw ints. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.first);
        out.writeInt(this.second);
    }

    /** Reads {@code first} and then {@code second}, mirroring {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readInt();
        this.second = in.readInt();
    }

    /**
     * Orders by {@code first}, then by {@code second}.
     *
     * @return -1, 0 or 1
     */
    @Override
    public int compareTo(IntPair o) {
        if (this.first == o.first) {
            if (this.second == o.second) {
                return 0;
            }
            return this.second < o.second ? -1 : 1;
        }
        return this.first < o.first ? -1 : 1;
    }

    /** Two pairs are equal when both components match. */
    @Override
    public boolean equals(Object right) {
        if (!(right instanceof IntPair)) {
            return false;
        }
        IntPair other = (IntPair) right;
        return this.first == other.first && this.second == other.second;
    }

    @Override
    public int hashCode() {
        return 157 * this.first + this.second;
    }
}
// 2、自定义分区
/**
 * Chooses a partition from the first component of the key only, so every key that
 * shares a first component lands in the same partition.
 */
public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
    /**
     * @param key           map-output key; only {@code getFirst()} is consulted
     * @param value         unused
     * @param numPartitions total number of partitions
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) {
        int hash = key.getFirst() * 127;
        // Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, and the product above
        // wraps to exactly that value when first == Integer.MIN_VALUE. Without this guard
        // the modulo could be negative, which is not a valid partition index.
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }
}
// 3、Key的比较类
/**
 * Compares {@code IntPair} keys by their first component only, so keys that differ
 * solely in the second component compare as equal.
 *
 * <p>Both overloads use signed int ordering and therefore agree with each other.
 */
public static class FirstGroupingComparator implements RawComparator<IntPair> {
    /**
     * Compares two serialized keys by decoding the leading int of each.
     *
     * <p>The first component is the first int written by {@code IntPair.write}, so it
     * starts at offsets {@code s1} and {@code s2}. A plain unsigned byte comparison
     * would order negative values after positive ones, hence the decode step.
     *
     * @param l1 unused; only the leading int is read
     * @param l2 unused; only the leading int is read
     * @return negative, zero or positive as the first component of the left key is
     *         less than, equal to or greater than that of the right key
     */
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int left = WritableComparator.readInt(b1, s1);
        int right = WritableComparator.readInt(b2, s2);
        return Integer.compare(left, right);
    }

    /**
     * Compares two deserialized keys by first component.
     *
     * @return negative, zero or positive following signed int order of {@code getFirst()}
     */
    @Override
    public int compare(IntPair o1, IntPair o2) {
        return Integer.compare(o1.getFirst(), o2.getFirst());
    }
}
/**
 * Turns each input line into an {@code (IntPair, IntWritable)} record.
 *
 * <p>The line is split with {@link StringTokenizer}'s default delimiters. The first token
 * becomes the pair's first component. The second token, or 0 when there is none, becomes
 * both the pair's second component and the emitted value. Lines with no tokens produce no
 * output, and tokens after the second are ignored.
 */
public static class MapClass extends Mapper<Object, Text, IntPair, IntWritable> {
    // Reused across calls to avoid allocating a new key/value per record.
    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();

    /**
     * @param inKey   input key, not used
     * @param inValue one line of input text
     * @param context sink for the emitted record
     * @throws NumberFormatException if either of the first two tokens is not an int
     */
    @Override
    public void map(Object inKey, Text inValue, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(inValue.toString());
        if (!tokens.hasMoreTokens()) {
            return;
        }
        int left = Integer.parseInt(tokens.nextToken());
        int right = tokens.hasMoreTokens() ? Integer.parseInt(tokens.nextToken()) : 0;

        key.set(left, right);
        value.set(right);
        context.write(key, value);
    }
}
public static class Reduce extends Reducer<IntPair,IntWritable,Text,IntWritable>{
private static final Text SEPARATOR =
new Text("------------------------------------------------");
private final Text first = new Text();
public void reduce(IntPair key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
context.write(SEPARATOR, null);
first.set(Integer.toString(key.getFirst()));
for(IntWritable value: values) {
context.write(first, value);
}
}
}
/**
 * Configures and runs the secondary-sort job, then exits with status 0 on success
 * and 1 on failure.
 *
 * @param args optional: {@code args[0]} is the input path and {@code args[1]} the output
 *             path. Any argument that is missing falls back to the built-in default
 *             location, so running with no arguments behaves as before.
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    int argCount = (args == null) ? 0 : args.length;
    String inputPath = argCount > 0
            ? args[0]
            : "/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/secondarysort/sort.txt";
    String outputPath = argCount > 1
            ? args[1]
            : "/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/secondarysort/out";

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // Partition and group by the first int of the pair.
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // Map output is (IntPair, IntWritable).
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Reduce output is (Text, IntWritable).
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}