join

测试数据
user.txt (用户id,用户名)
1 用户1
2 用户2
3 用户3

post.txt (用户id,帖子id,标题)
1 1 贴子1
1 2 贴子2
2 3 帖子3
4 4 贴子4
5 5 贴子5
5 6 贴子6
5 7 贴子7

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package com.qr.mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Reduce-side join of two comma-separated text inputs:
 * <ul>
 *   <li>user.txt &ndash; user table (user id, user name)</li>
 *   <li>post.txt &ndash; post table (user id, post id, title)</li>
 * </ul>
 *
 * <p>Each mapper keys a record by its first comma-separated field (the user id) and prefixes the
 * whole record with a source tag: {@code "U,"} for users, {@code "P,"} for posts. For one user
 * id, the reducer separates the tagged values back into the two tables. It then emits
 * {@code (userRecord, postRecord)} rows according to the {@code joinType} configuration value:
 * {@code innerJoin}, {@code leftOuter}, {@code rightOuter}, {@code allOuter} (full outer) or
 * {@code anti} (only records that have no match on the other side).
 *
 * <p>A missing side is padded with empty columns. {@code ","} stands in for a missing user
 * (2 columns) and {@code ",,"} for a missing post (3 columns).
 */
public class Join {

    /** Configuration key that selects the join semantics used by {@link Reduce}. */
    private static final String JOIN_TYPE_KEY = "joinType";

    private static final String INNER_JOIN = "innerJoin";
    private static final String LEFT_OUTER = "leftOuter";
    private static final String RIGHT_OUTER = "rightOuter";
    private static final String FULL_OUTER = "allOuter";
    private static final String ANTI = "anti";

    // Source tags prepended to every map output value: U = user record, P = post record.
    private static final String USER_TAG = "U,";
    private static final String POST_TAG = "P,";

    // Empty-column placeholders for the side that has no matching record.
    private static final String MISSING_USER = ",";
    private static final String MISSING_POST = ",,";

    // Defaults used by main() when the corresponding command-line argument is omitted.
    private static final String DEFAULT_JOIN_TYPE = FULL_OUTER;
    private static final String DEFAULT_USER_PATH =
            "/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/join/in/user.txt";
    private static final String DEFAULT_POST_PATH =
            "/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/join/in/post.txt";
    private static final String DEFAULT_OUTPUT_PATH =
            "/Users/lifei/qrproject/hiveudf/src/main/java/com/qr/mr/join/out";

    /** Emits each user record keyed by user id and tagged with {@code "U,"}. */
    public static class UserMap extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            emitTagged(USER_TAG, value, context);
        }
    }

    /** Emits each post record keyed by user id and tagged with {@code "P,"}. */
    public static class PostMap extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            emitTagged(POST_TAG, value, context);
        }
    }

    /**
     * Writes {@code (joinKey, tag + line)} for one input line. The join key is everything before
     * the first comma, or the whole line if it has no comma. Blank lines are skipped.
     */
    private static void emitTagged(String tag, Text value, Mapper<Object, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.trim().length() == 0) {
            // Blank lines would otherwise all be grouped under the empty key.
            return;
        }
        // indexOf instead of split(",")[0]: split on a line made only of commas (e.g. ",")
        // returns an empty array, and [0] would then throw.
        int comma = line.indexOf(',');
        String joinKey = comma < 0 ? line : line.substring(0, comma);
        context.write(new Text(joinKey), new Text(tag + line));
    }

    /**
     * Returns {@code joinType} unchanged if it names a supported join.
     *
     * @throws IllegalArgumentException if {@code joinType} is null or not a supported join type
     */
    private static String requireSupportedJoinType(String joinType) {
        if (!INNER_JOIN.equals(joinType)
                && !LEFT_OUTER.equals(joinType)
                && !RIGHT_OUTER.equals(joinType)
                && !FULL_OUTER.equals(joinType)
                && !ANTI.equals(joinType)) {
            throw new IllegalArgumentException("Unsupported " + JOIN_TYPE_KEY + " '" + joinType
                    + "'; expected one of " + INNER_JOIN + ", " + LEFT_OUTER + ", " + RIGHT_OUTER
                    + ", " + FULL_OUTER + ", " + ANTI);
        }
        return joinType;
    }

    /**
     * Joins the user and post records that share one user id, following the configured
     * {@code joinType}. The output key is the user record (without tag); the output value is the
     * post record (without tag).
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        // Per-key buffers, reused across reduce() calls and cleared at the start of each one.
        private final List<String> users = new ArrayList<String>();
        private final List<String> posts = new ArrayList<String>();
        // Reused output holders (same pattern as Hadoop's WordCount example).
        private final Text outKey = new Text();
        private final Text outValue = new Text();
        private String joinType;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            // Fall back to innerJoin when unset, and reject unknown values up front. Otherwise
            // reduce() would throw a NullPointerException or silently emit nothing.
            joinType = requireSupportedJoinType(context.getConfiguration().get(JOIN_TYPE_KEY, INNER_JOIN));
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            users.clear();
            posts.clear();

            for (Text v : values) {
                String tagged = v.toString();
                // startsWith, not contains: a post whose fields happen to contain "U," must not be
                // mistaken for a user record.
                if (tagged.startsWith(USER_TAG)) {
                    users.add(tagged.substring(USER_TAG.length()));
                } else {
                    posts.add(tagged.substring(POST_TAG.length()));
                }
            }

            boolean hasUsers = !users.isEmpty();
            boolean hasPosts = !posts.isEmpty();

            if (INNER_JOIN.equals(joinType)) {
                writeUserMajorMatches(context);
            } else if (LEFT_OUTER.equals(joinType)) {
                if (hasPosts) {
                    writeUserMajorMatches(context);
                } else {
                    writeUsersWithoutPosts(context);
                }
            } else if (RIGHT_OUTER.equals(joinType)) {
                if (hasUsers) {
                    writePostMajorMatches(context);
                } else {
                    writePostsWithoutUsers(context);
                }
            } else if (FULL_OUTER.equals(joinType)) {
                if (hasUsers && hasPosts) {
                    writeUserMajorMatches(context);
                } else {
                    // At most one of the two lists is non-empty here.
                    writeUsersWithoutPosts(context);
                    writePostsWithoutUsers(context);
                }
            } else if (ANTI.equals(joinType)) {
                if (!hasUsers || !hasPosts) {
                    writeUsersWithoutPosts(context);
                    writePostsWithoutUsers(context);
                }
            }
        }

        /** Writes every (user, post) pair, iterating users in the outer loop. */
        private void writeUserMajorMatches(Context context) throws IOException, InterruptedException {
            for (String user : users) {
                for (String post : posts) {
                    write(user, post, context);
                }
            }
        }

        /** Writes every (user, post) pair, iterating posts in the outer loop. */
        private void writePostMajorMatches(Context context) throws IOException, InterruptedException {
            for (String post : posts) {
                for (String user : users) {
                    write(user, post, context);
                }
            }
        }

        /** Writes each buffered user padded with an empty post. */
        private void writeUsersWithoutPosts(Context context) throws IOException, InterruptedException {
            for (String user : users) {
                write(user, MISSING_POST, context);
            }
        }

        /** Writes each buffered post padded with an empty user. */
        private void writePostsWithoutUsers(Context context) throws IOException, InterruptedException {
            for (String post : posts) {
                write(MISSING_USER, post, context);
            }
        }

        private void write(String user, String post, Context context) throws IOException, InterruptedException {
            outKey.set(user);
            outValue.set(post);
            context.write(outKey, outValue);
        }
    }

    /**
     * Configures and runs the join job.
     *
     * <p>Optional positional arguments: {@code [joinType] [userPath] [postPath] [outputPath]}.
     * An omitted argument falls back to its default ({@code allOuter} and the built-in local
     * paths). The output directory is deleted first if it already exists.
     *
     * @throws IllegalArgumentException if the join type is not supported
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String joinType = requireSupportedJoinType(args.length > 0 ? args[0] : DEFAULT_JOIN_TYPE);
        Path userPath = new Path(args.length > 1 ? args[1] : DEFAULT_USER_PATH);
        Path postPath = new Path(args.length > 2 ? args[2] : DEFAULT_POST_PATH);
        Path outPath = new Path(args.length > 3 ? args[3] : DEFAULT_OUTPUT_PATH);

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Join");
        job.setJarByClass(Join.class);
        // Join type: innerJoin, leftOuter, rightOuter, allOuter, anti.
        job.getConfiguration().set(JOIN_TYPE_KEY, joinType);

        // No job.setMapperClass(): MultipleInputs binds a separate mapper to each input path.
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        MultipleInputs.addInputPath(job, userPath, TextInputFormat.class, UserMap.class);
        MultipleInputs.addInputPath(job, postPath, TextInputFormat.class, PostMap.class);

        // Remove previous output: Hadoop fails the job if the output directory already exists.
        FileSystem fs = outPath.getFileSystem(job.getConfiguration());
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}