richfunction 传递参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.examples.infoworld.helloword;
/**
*RichFuction除了提供原来MapFuction的方法之外,还提供open, close, getRuntimeContext 和setRuntimeContext方法,
* 这些功能可用于参数化函数(传递参数),创建和完成本地状态,访问广播变量以及访问运行时信息以及有关迭代中的信息
*
*/

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class RichfunctionTest {
// ①传递参数
public static void main(String[] args) throws Exception {
//基于DataSet而非DataStream
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> ds = env.fromElements(1, 2, 3);

Configuration conf = new Configuration();
conf.setInteger("limit",2);

DataSet<Integer> ds1 = ds.filter(new RichFilterFunction<Integer>() {
//获取limit

private int limit;

// rich function新增方法
@Override
public void open(Configuration conf) throws Exception {
limit = conf.getInteger("limit", 0);
}

//map operator默认方法
@Override
public boolean filter(Integer integer) throws Exception {
return integer > limit;
}
}).withParameters(conf);//可以将Configuration中的limit参数的值传递进RichFuction里面,通过后面withParameters方法传递进去

ds1.print();//从configuration中获取了limit的值,并设定了fliter的阈值是2,从而过滤了1,2

}
}