wc

java v1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package com.examples.infoworld.helloword;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Streaming word count over a text socket.
 *
 * <p>Each incoming line is split on commas, every piece becomes a {@link WC}
 * record with a count of 1, and the counts are summed per word over a sliding
 * window that is 4 seconds long and advances every 2 seconds.
 */
public class wc {

    /**
     * Word/count record used as the stream element type.
     *
     * <p>The pipeline addresses the fields by name ("word", "count"), so the
     * class keeps a public no-argument constructor and a getter/setter pair
     * for each field.
     */
    public static class WC {
        private String word;
        private Long count;

        public WC() {
        }

        public WC(String word, Long count) {
            this.word = word;
            this.count = count;
        }

        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public Long getCount() {
            return count;
        }

        public void setCount(Long count) {
            this.count = count;
        }

        @Override
        public String toString() {
            return "WC{word='" + word + "', count=" + count + "}";
        }
    }

    /** Splits a comma-separated line and emits one {@link WC} with count 1 per piece. */
    private static class CommaSplitter implements FlatMapFunction<String, WC> {
        @Override
        public void flatMap(String line, Collector<WC> out) throws Exception {
            String[] pieces = line.split(",");
            for (int i = 0; i < pieces.length; i++) {
                out.collect(new WC(pieces[i], 1L));
            }
        }
    }

    /**
     * Builds the job and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: text lines read from localhost:9999
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9999);

        // Transformation: one WC record per comma-separated piece
        SingleOutputStreamOperator<WC> words = lines.flatMap(new CommaSplitter());

        // Sum the "count" field per "word" over a 4 s window sliding every 2 s
        SingleOutputStreamOperator<WC> windowedCounts = words
                .keyBy("word")
                .timeWindow(Time.seconds(4), Time.seconds(2))
                .sum("count");

        // Sink: print with a single parallel instance
        windowedCounts.print().setParallelism(1);

        env.execute("wc");
    }
}

注意:引包时要区分 flink-streaming-java 和 flink-streaming-scala,此工程为 Java 工程,所以只需要 flink-streaming-java。运行时先启动 `nc -lk 9999`,然后再启动工程。

java v2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.examples.infoworld.helloword;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import scala.Tuple2;

public class wc2 {

public static void main(String[] args) {
ParameterTool tool = ParameterTool.fromArgs(args);
int port = tool.getInt("port",9999);
String hostname = tool.get("host","localhost");

//环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//source
DataStreamSource<String> lines = env.socketTextStream(hostname, port);
//代替wcl类,Tuple1表示一个参数,Tuple2表示两个参数,。。。。
lines.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word:s.split(",")){
out.collect(new Tuple2<String,Integer>(word,1));
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(4),Time.seconds(1))
.sum(1);
}
}

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.test.scala

//隐式转换
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time


/** Streaming word count over a text socket, written against the Flink Scala API.
  *
  * Lines are split on commas, each piece becomes a [[ScalaWc.wc]] record with a
  * count of 1, and counts are summed per word over a sliding window that is
  * 4 seconds long and advances every 2 seconds.
  */
object ScalaWc {

  /** Word/count record; the pipeline refers to its fields by name. */
  case class wc(word: String, count: Long)

  /** Builds the job and runs it.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    // Step 1: execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Step 2: source, text lines read from localhost:9999
    val socketLines = env.socketTextStream("localhost", 9999)

    // Step 3: transformations
    val tokens = socketLines.flatMap(line => line.split(","))
    val records = tokens.map(token => wc(token, 1L))
    val windowedCounts = records
      .keyBy("word")
      .timeWindow(Time.seconds(4), Time.seconds(2))
      .sum("count")

    // Step 4: sink, print with a single parallel instance
    windowedCounts.print().setParallelism(1)

    // Step 5: submit the job
    env.execute("ScalaWc")
  }
}