Flink 双数据流转换为单数据流操作的运算有cogroup, join和coflatmap
- Join:只输出条件匹配的元素对。
- CoGroup: 除了输出匹配的元素对以外,未能匹配的元素也会输出。
- CoFlatMap:没有匹配条件,不进行匹配,分别处理两个流的元素。在此基础上完全可以实现join和cogroup的功能,比他们使用上更加自由。
Join
1 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
- 创建一个socket stream。本机9000端口。输入的字符串以空格为界分割成Array[String]。然后再取出其中前两个元素组成(String, String)类型的tuple。
- 同上。端口变为9001。
- join条件为两个流中的数据((String, String)类型)第一个元素相同。
- 为测试方便,这里使用session window。只有两个元素到来时间前后相差不大于30秒之时才会被匹配。(Session window的特点为,没有固定的开始和结束时间,只要两个元素之间的时间间隔不大于设定值,就会分配到同一个window中,否则后来的元素会进入新的window)。
- 将window默认的trigger修改为count trigger。这里的含义为每到来一个元素,都会立刻触发计算。
- 处理匹配到的两个数据,例如到来的数据为(1, “a”)和(1, “b”),输出到下游则为”a<=>b”
下面我们测试下程序。
打开两个terminal,分别输入 nc -lk 127.0.0.1 9000 和 nc -lk 127.0.0.1 9001。
在terminal1中输入,1 a,然后在terminal2中输入2 b。观察程序console,发现没有输出。这两条数据不满足匹配条件,因此没有输出。
在30秒之内输入1 c,发现程序控制台输出了结果a<=>c。再输入1 d,控制台输出a<=>c和a<=>d两个结果。
等待30秒之后,在terminal2中输入1 e,发现控制台无输出。由于session window的效果,该数据和之前stream1中的数据不在同一个window中。因此没有匹配结果,控制台不会有输出。
综上我们得出结论:
- join只返回匹配到的数据对。若在window中没有能够与之匹配的数据,则不会有输出。
- join会输出window中所有的匹配数据对。
- 不在window内的数据不会被匹配到。
CoGroup
1 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
经过同样的测试我们得出结论:
CoGroup的作用和join基本相同,但有一点不一样的是,如果未能找到新到来的数据与另一个流在window中存在的匹配数据,仍会将其输出。
CoFlatMap
1 | public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable { |
简单理解就是当stream1数据到来时,会调用flatMap1方法,stream2收到数据之时,会调用flatMap2方法。1
2
3
4
5
6
7
8
9stream1.connect(stream2).flatMap(new CoFlatMapFunction[(String, String), (String, String), String] {
override def flatMap1(value: (String, String), out: Collector[String]): Unit = {
println("stream1 value: " + value)
}
override def flatMap2(value: (String, String), out: Collector[String]): Unit = {
println("stream2 value: " + value)
}
}).print()
总结
Join、CoGroup和CoFlatMap这三个运算符都能够将双数据流转换为单个数据流。Join和CoGroup会根据指定的条件进行数据配对操作,不同的是Join只输出匹配成功的数据对,CoGroup无论是否有匹配都会输出。CoFlatMap没有匹配操作,只是分别去接收两个流的输入。大家可以根据具体的业务需求,选择不同的双流操作。
参考:
https://blog.csdn.net/lmalds/article/details/51743038
http://www.imooc.com/article/264552
https://www.jianshu.com/p/aa7a0828a300
https://forum.huawei.com/enterprise/zh/thread-452555.html
https://www.cnblogs.com/yunqishequ/p/10027909.html