RichFunction 广播变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.examples.infoworld.helloword;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

import java.util.Collection;

/**
 * Small Flink DataSet example showing how a rich function reads a broadcast variable.
 *
 * <p>The integer data set {@code 1, 2, 3} is registered as a broadcast set under the name
 * {@code "ds1"} on a map operator that runs over the string data set {@code "a", "b"}.
 * The mapper prints every broadcast element for each record it processes and passes the
 * record through unchanged.
 */
public class RichfunctionTest2 {

    /** Name under which the integer data set is registered and later looked up. */
    private static final String BROADCAST_NAME = "ds1";

    /**
     * Builds the two data sets, attaches the broadcast set to the mapper and prints the result.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Integer> numbers = env.fromElements(1, 2, 3);
        DataSet<String> letters = env.fromElements("a", "b");

        // The name passed here must match the one the mapper uses in open().
        DataSet<String> mapped = letters
                .map(new BroadcastPrintingMapper())
                .withBroadcastSet(numbers, BROADCAST_NAME);

        mapped.print();
    }

    /**
     * Identity mapper that prints the whole broadcast collection for every input record.
     */
    private static final class BroadcastPrintingMapper extends RichMapFunction<String, String> {

        /** Broadcast elements, fetched from the runtime context in {@link #open}. */
        private Collection<Integer> broadcastSet;

        /**
         * Looks up the broadcast variable registered under {@code "ds1"}.
         *
         * @param parameters operator configuration (not used)
         * @throws Exception if the broadcast variable cannot be obtained
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            broadcastSet = getRuntimeContext().getBroadcastVariable(BROADCAST_NAME);
        }

        /**
         * Prints each broadcast element to standard output, then returns the input as is.
         *
         * @param value the incoming record
         * @return {@code value}, unchanged
         */
        @Override
        public String map(String value) throws Exception {
            broadcastSet.forEach(System.out::println);
            return value;
        }
    }
}