GroupComparator原理

分析

最近看dadoop中关于辅助排序(SecondarySort)的实现,说到了三个东西要设置:1. partioner;2. Key Comparator;3. Group Comparator。前两个都比较容易理解,但是关于group的概念我一直理解不了:
一,有了partioner,所有的key已经放到一个分区了,每个分区对应一个reducer,而且key也可以排序了,那么不是实现了整个数据集的全排序了吗?
第二,mapper产生的中间结果经过shuffle和sort后,每个key整合成一个记录(集合),每次reduce方法调用处理这个记录(集合),但是group的目的是让一次reduce调用处理多条记录(将该集合进行内部分组),这不是矛盾吗,找了好久一直都没找到这个问题的清晰解释。

后来找到一本书,《Pro Hadoop》,里面有一部分内容详细解释了这个问题,看后终于明白了,和大家分享一下。reduce方法每次是读一条记录(集合),读到相应的key,但是处理value集合时,处理完当前记录的value后,还会判断下一条记录是不是和当前的key是不是同一个组,如果是的话,会继续读取这些记录的值,而这个记录也会被认为已经处理了,直到记录不是当前组,这次reduce调用才结束,这样一次reduce调用就会处理掉一个组中的所有记录,而不仅仅是一条完整的记录(集合)了。

这个有什么用呢?如果不用分组,那么同一组的记录就要在多次reduce方法中独立处理(所有的数据都在同一组中),那么有些状态数据就要传递了,就会增加复杂度,在一次调用中处理的话,这些状态只要用方法内的变量就可以的。比如查找最大值,只要读第一个值就可以了。

参考:
https://blog.csdn.net/qq_20641565/article/details/52770872

源码分析

目标:弄明白,我们配置的GroupComparator是如何对进入reduce函数中的key Iterable 进行影响。
如下是一个配置了GroupComparator 的reduce 函数。具体影响是我们可以在自定义的GroupComparator 中确定哪儿些value组成一组,进入一个reduce函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static class DividendGrowthReducer extends Reducer<Stock, DoubleWritable, NullWritable, DividendChange> {
private NullWritable outputKey = NullWritable.get();
private DividendChange outputValue = new DividendChange();

@Override
protected void reduce(Stock key, Iterable<DoubleWritable> values, Context context)
throws IOException, InterruptedException {
double previousDividend = 0.0;
for(DoubleWritable dividend : values) {
double currentDividend = dividend.get();
double growth = currentDividend - previousDividend;
if(Math.abs(growth) > 0.000001) {
outputValue.setSymbol(key.getSymbol());
outputValue.setDate(key.getDate());
outputValue.setChange(growth);
context.write(outputKey, outputValue);
previousDividend = currentDividend;
}
}
}
}

着先我们找到向上找,是谁调用了我们写的这个reduce函数。 Reducer类的run 方法。通过如下代码,可以看到是在run方法中,对于每个key,调用一次reduce函数。
此处传入reduce函数的都是对象引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
.....
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
.....
}
.....
}
}

结合我们写的reduce函数,key是在遍历value的时候会对应变化。
那我们继续跟踪context.getValues 得到的迭代器的next方法。context 此处是ReduceContext.java (接口). 对应的实现类为ReduceContextImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected class ValueIterable implements Iterable<VALUEIN> {
private ValueIterator iterator = new ValueIterator();
@Override
public Iterator<VALUEIN> iterator() {
return iterator;
}
}

/**
* Iterate through the values for the current key, reusing the same value
* object, which is stored in the context.
* @return the series of values associated with the current key. All of the
* objects returned directly and indirectly from this method are reused.
*/
public
Iterable<VALUEIN> getValues() throws IOException, InterruptedException {
return iterable;
}

直接返回了一个iterable。继续跟踪ValueIterable 类型的iterable。那明白了,在reduce 函数中进行Iterable的遍历,其实调用的是ValueIterable的next方法。下面看一下next的实现。

1
2
3
4
5
6
7
@Override
public VALUEIN next() {
………………
nextKeyValue();
return value;
………………
}

再继续跟踪nextKeyValue()方法。终于找了一个comparator。 这个就是我们配置的GroupingComparator.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
……………………………………
if (hasMore) {
nextKey = input.getKey();
nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0,
currentRawKey.getLength(),
nextKey.getData(),
nextKey.getPosition(),
nextKey.getLength() - nextKey.getPosition()
) == 0;
} else {
nextKeyIsSame = false;
}
inputValueCounter.increment(1);
return true;
}

为了证明这个就是我们配置的GroupingComparator。 跟踪ReduceContextImpl的构造调用者。 ReduceTask的run方法。

1
2
3
4
5
6
7
8
@Override
@SuppressWarnings("unchecked")
public void run(JobConf job, final TaskUmbilicalProtocol umbilical){
………………………………
RawComparator comparator = job.getOutputValueGroupingComparator();
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}

下面把runNewReducer 的代码也贴出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) {

org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputKeyCounter,
reduceInputValueCounter,
trackedRW,
committer,
reporter, comparator, keyClass,
valueClass);

好吧,关于自定义GroupingComparator如何起做用的代码分析,就到此吧。
参考:
http://blog.itpub.net/30066956/viewspace-2095520/