Inverted Index

Problem

[Figure: three input files (left) and the expected result file (right)]
There are three files on HDFS; their contents appear in the left box of the figure above, and the right box shows the result file after processing.
An inverted index (also called a reverse index or postings file) is an indexing method that stores, for full-text search, a mapping from each word to its locations in a document or set of documents. It is the most common data structure in document-retrieval systems: given a word, it quickly yields the list of documents that contain it. An inverted index has two main parts: the term dictionary and the postings (inverted) file.
What distinguishes this task from the classic inverted-index task is that the output also carries the word's frequency within each file.
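
As a concrete illustration (the file names and contents here are hypothetical, standing in for the figure above): given

a.txt: MapReduce is simple
b.txt: MapReduce is powerful is simple

the result file would contain, for the word "is", a line of the form

is	{(a.txt,1),(b.txt,2)}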

Approach

First, note that the result contains file names. There are two ways to obtain them:
1. Define a custom InputFormat whose custom RecordReader reads the Path, and hence the file name, directly from the InputSplit;
2. In the Mapper, fetch the split from the context and take the file name from its path. This task uses the second approach; a minimal sketch of the first follows.
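
A minimal sketch of approach 1 (the class name FileNameRecordReader is hypothetical; it would be returned from a custom FileInputFormat's createRecordReader). It delegates line reading to LineRecordReader and exposes the file name as the key:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

import java.io.IOException;

// Hypothetical reader: emits (fileName, line) pairs by delegating to LineRecordReader.
public class FileNameRecordReader extends RecordReader<Text, Text> {
    private final LineRecordReader lineReader = new LineRecordReader();
    private Text fileName;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // A file-based split is a FileSplit; its Path carries the file name.
        fileName = new Text(((FileSplit) split).getPath().getName());
        lineReader.initialize(split, context);
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return lineReader.nextKeyValue();
    }

    @Override
    public Text getCurrentKey() {
        return fileName;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return lineReader.getCurrentValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineReader.getProgress();
    }

    @Override
    public void close() throws IOException {
        lineReader.close();
    }
}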
In the mapper, the file name and the word are packed into a custom key class; the value is an IntWritable, and map() emits a 1 for every word occurrence.
The reduce-side grouping must then be controlled so that all records with the same word enter a single reduce() call, which requires a custom GroupingComparator.

Code

package com.qr.mr.invertedsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedHashMap;

/**
 * http://blog.itpub.net/30066956/viewspace-2120238/
 */
public class InvertedSort {

    /** Composite map-output key: (word, fileName). */
    static class WordKey implements WritableComparable<WordKey> {

        private String fileName;
        private String word;

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(fileName);
            out.writeUTF(word);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.fileName = in.readUTF();
            this.word = in.readUTF();
        }

        // Sort by word first, then by file name, so records for the same
        // word arrive at the reducer ordered by file.
        @Override
        public int compareTo(WordKey key) {
            int r = word.compareTo(key.word);
            if (r == 0)
                r = fileName.compareTo(key.fileName);
            return r;
        }

        // Hash on the word only, so the default HashPartitioner sends all
        // records for a word to the same reducer. Without this override the
        // identity hash code would be used, and with more than one reducer
        // the grouping comparator could not see all files of a word.
        @Override
        public int hashCode() {
            return word.hashCode();
        }

        public String getFileName() {
            return fileName;
        }

        public void setFileName(String fileName) {
            this.fileName = fileName;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }
    }

    public static class IndexInvertedMapper
            extends Mapper<LongWritable, Text, WordKey, IntWritable> {

        private final WordKey newKey = new WordKey();
        private final IntWritable ONE = new IntWritable(1);
        private String fileName;

        // Approach 2: read the file name once per split from the context.
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            fileName = inputSplit.getPath().getName();
        }

        // Emit ((word, fileName), 1) for every word occurrence.
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            newKey.setFileName(fileName);
            String[] words = value.toString().split(" ");
            for (String w : words) {
                newKey.setWord(w);
                context.write(newKey, ONE);
            }
        }
    }

    public static class IndexInvertedReducer
            extends Reducer<WordKey, IntWritable, Text, Text> {

        private final Text outputKey = new Text();

        @Override
        protected void reduce(WordKey key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            outputKey.set(key.getWord());
            // Accumulate per-file counts. Hadoop refills the key instance as
            // the values iterator advances, so key.getFileName() inside the
            // loop reflects the file of the *current* record even though all
            // records with the same word share this one reduce() call.
            LinkedHashMap<String, Integer> map = new LinkedHashMap<String, Integer>();
            for (IntWritable v : values) {
                if (map.containsKey(key.getFileName())) {
                    map.put(key.getFileName(), map.get(key.getFileName()) + v.get());
                } else {
                    map.put(key.getFileName(), v.get());
                }
            }
            // Render as {(file1,n1),(file2,n2),...}
            StringBuilder sb = new StringBuilder();
            sb.append("{");
            for (String k : map.keySet()) {
                sb.append("(").append(k).append(",").append(map.get(k)).append(")").append(",");
            }
            sb.deleteCharAt(sb.length() - 1).append("}");
            context.write(outputKey, new Text(sb.toString()));
        }
    }

    // Group reduce input by word only, ignoring the file name, so that one
    // reduce() call sees a word's counts across all files.
    public static class IndexInvertedGroupingComparator extends WritableComparator {

        public IndexInvertedGroupingComparator() {
            super(WordKey.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            WordKey key1 = (WordKey) a;
            WordKey key2 = (WordKey) b;
            return key1.getWord().compareTo(key2.getWord());
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "IndexInvertedJob");
        job.setJarByClass(InvertedSort.class);

        // Hard-coded local paths; the output directory is deleted first so
        // the job can be rerun.
        Path in = new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/invertedsort/in");
        Path out = new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/invertedsort/out");
        FileSystem.get(conf).delete(out, true);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(IndexInvertedMapper.class);
        job.setMapOutputKeyClass(WordKey.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(IndexInvertedReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setGroupingComparatorClass(IndexInvertedGroupingComparator.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
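
Two points worth noting about the listing. First, sorting by (word, fileName) while grouping by word alone is the standard secondary-sort pattern: the sort brings a word's records into the reducer ordered by file, and the grouping comparator folds them into one reduce() call. Second, assuming the hypothetical a.txt/b.txt inputs above, the output file part-r-00000 would contain one line per word, for example:

MapReduce	{(a.txt,1),(b.txt,1)}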

Reference:
http://blog.itpub.net/30066956/viewspace-2120238/