richfunction 传递参数 发表于 2017-02-21 | 分类于 flink 1234567891011121314151617181920212223242526272829303132333435363738394041424344package com.examples.infoworld.helloword;/** *RichFuction除了提供原来MapFuction的方法之外,还提供open, close, getRuntimeContext 和setRuntimeContext方法, * 这些功能可用于参数化函数(传递参数),创建和完成本地状态,访问广播变量以及访问运行时信息以及有关迭代中的信息 * */import org.apache.flink.api.common.functions.RichFilterFunction;import org.apache.flink.api.java.DataSet;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.configuration.Configuration;public class RichfunctionTest {// ①传递参数 public static void main(String[] args) throws Exception { //基于DataSet而非DataStream ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSet<Integer> ds = env.fromElements(1, 2, 3); Configuration conf = new Configuration(); conf.setInteger("limit",2); DataSet<Integer> ds1 = ds.filter(new RichFilterFunction<Integer>() { //获取limit private int limit; // rich function新增方法 @Override public void open(Configuration conf) throws Exception { limit = conf.getInteger("limit", 0); } //map operator默认方法 @Override public boolean filter(Integer integer) throws Exception { return integer > limit; } }).withParameters(conf);//可以将Configuration中的limit参数的值传递进RichFuction里面,通过后面withParameters方法传递进去 ds1.print();//从configuration中获取了limit的值,并设定了fliter的阈值是2,从而过滤了1,2 }}