题意
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma
只是这次是假设一个没有重复,也就是不会出现Tom Lucy-Lucy Tom这样的好友关系表。
任务是求其其中的二度人脉、潜在好友,也就是如下图:
比如I认识C、G、H,但C不认识G,那么C-G就是一对潜在好友,但G-H早就认识了,因此不算为潜在好友。
最终得出的是,如下的一个,没有重复的,首字母接近A再前的,也就Tom Mary-MaryTom只输出Mary Tom,因为M比T更接近A:
Alice Jesse
Alice Jone
Alice Mark
Alice Philip
Alice Tom
Alma Terry
Ben Jone
Ben Mary
Ben Tom
Jack Lucy
Jack Terry
Jesse Jone
Jesse Mark
Jesse Philip
Jesse Tom
Jone Mary
Jone Tom
Mark Philip
Mary Tom
思路如下
首先,我们进行第一个MapReduce:
1、一个输入行,产生一对互逆的关系,压入context
例如Tom Lucy这个输入行就在Map阶段搞出Tom Lucy-Lucy Tom这样的互逆关系。
2、之后Map-reduce会自动对context中相同的key合并在一起。
例如由于存在Tom Lucy、Tom Jack,显然会产生一个Tom:{Lucy,Jack}
这是Reduce阶段以开始的键值对。
3、这个键值对相当于Tom所认识的人。先进行如下的输出,1代表Tom的一度人脉
Tom Lucy 1
Tom Jack 1
潜在好友显然会在{Lucy,Jack}这个Tom所认识的人产生,对这个数组做笛卡尔乘积,形成关系:{<Lucy,Lucy>,<Jack,Jack>,<Lucy,Jack>,<Jack,Lucy>}
将存在自反性,前项首字母大于后项剔除,也就是<Lucy,Lucy>这类无意义的剔除,<Lucy,Jack>,<Jack,Lucy>认定为一个关系,将剩余关系进行如下的输出,其中2代表Tom的二度人脉,也就是所谓的潜在好友:
Lucy Jack 2
此时,第一个MapReduce,输出如下:
Alice Jack 1
Alice Terry 1
Jack Terry 2
Alma Philip 1
Alma Mark 1
Mark Philip 2
Ben Lucy 1
Jack Jesse 1
Jack Tom 1
Jack Jone 1
Alice Jack 1
Jesse Tom 2
Jesse Jone 2
Jone Tom 2
Alice Jesse 2
Alice Tom 2
Alice Jone 2
Jesse Terry 1
Jack Jesse 1
Jack Terry 2
Jack Jone 1
Jone Lucy 1
Jack Lucy 2
Ben Lucy 1
Jone Lucy 1
Lucy Tom 1
Lucy Mary 1
Ben Jone 2
Ben Tom 2
Ben Mary 2
Jone Tom 2
Jone Mary 2
Mary Tom 2
Alma Mark 1
Mark Terry 1
Alma Terry 2
Lucy Mary 1
Philip Terry 1
Alma Philip 1
Alma Terry 2
Philip Terry 1
Alice Terry 1
Mark Terry 1
Jesse Terry 1
Alice Philip 2
Alice Mark 2
Alice Jesse 2
Mark Philip 2
Jesse Philip 2
Jesse Mark 2
Jack Tom 1
Lucy Tom 1
Jack Lucy 2
这时,形式已经很明朗了
再进行第二个Mapreduce
任务是剔除本身就存在的关系,也就是在潜在好友中剔除本身就认识的关系。
将上述第一个Mapreduce的输出,关系作为key,后面的X度人脉这个1、2值作为value,进行Mapreduce的处理。
那么例如
将被认定为二度人脉的关系输出,就得到最终结果。
代码
package com.qr.mr.deg2friend;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
//https://www.cnblogs.com/hxsyl/p/6127843.html
public class Deg2Friend {
public static class Map extends Mapper<Object,Text,Text,Text> {
public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
String[] line = value.toString().split(",");
context.write(new Text(line[0]),new Text(line[1]));
context.write(new Text(line[1]),new Text(line[0]));
}
}
public static class Reduce extends Reducer<Text,Text,Text,Text>{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
List<String> potentialFriends = new ArrayList<>();
//这里一定要用string存,而不是用Text,Text只是一个类似指针,一个MapReduce的引用
//换成用Text来存,你会惊讶,为何我存的值都一模一样的?
for (Text v:values) {
potentialFriends.add(v.toString());
if (key.toString().compareTo(v.toString()) < 0){// 确保首字母大者再前,如Tom Alice则输出Alice Tom
context.write(new Text(key + "\t" + v),new Text("1"));
} else {
context.write(new Text(v + "\t" + key),new Text("1"));
}
}
for (String p1:potentialFriends) {
for (String p2:potentialFriends) {
if (p1.compareTo(p2) < 0){//todo 将存在自反性,前项首字母大于后项的关系剔除
context.write(new Text(p1 +"\t" + p2),new Text("2"));
}
}
}
}
}
public static class Map2 extends Mapper<Object,Text,Text,Text>{
public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
String[] line = value.toString().split("\t");//输入文件,键值对的分隔符为\t
context.write(new Text(line[0] + "\t" + line[1]), new Text(line[2]));//关系作为key,后面的X度人脉这个1、2值作为value
}
}
public static class Reduce2 extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
//检查合并之后是否存在任意一对一度人脉
boolean isPotentialFriend = true;
for (Text v:values) {
if (v.toString().equals("1")){
isPotentialFriend = false;
break;
}
}
if (isPotentialFriend){
String[] potentialFriends = key.toString().split("\t");
context.write(new Text(potentialFriends[0]),new Text(potentialFriends[1]));
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// 判断output文件夹是否存在,如果存在则删除
Path outPath = new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/deg2friend/out");
FileSystem fs = outPath.getFileSystem(conf);
if (fs.exists(outPath)){
fs.delete(outPath,true);// true的意思是,就算output有东西,也一带删除
}
Job job = Job.getInstance(conf,"deg2friend");
job.setJarByClass(Deg2Friend.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);//todo keyClass
job.setOutputValueClass(Text.class);//todo valueClass
// 原来当mapper与reducer的输出类型一致时可以用 job.setOutputKeyClass(theClass)与job.setOutputValueClass
//(theClass)这两个进行配置就行,但是当mapper用于reducer两个的输出类型不一致的时候就需要分别进行配置了。
// job.setMapOutputKeyClass(Text.class); //todo 测试只有map的情况
// job.setMapOutputValueClass(Text.class);
FileInputFormat.addInputPath(job,new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/deg2friend/in"));
Path tempDir = new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/deg2friend/out/"
+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE))
);
FileOutputFormat.setOutputPath(job, tempDir);
if (job.waitForCompletion(true)){
Job job2 = Job.getInstance(conf,"job2");
job2.setMapperClass(Map2.class);
job2.setReducerClass(Reduce2.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job2,tempDir);
FileOutputFormat.setOutputPath(job2,new Path("/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/deg2friend/out/final"));
System.exit(job2.waitForCompletion(true) ? 0 : 1);
}
}
}