The Execution Flow of Flink's Core Framework
Preface
Flink is an open-source, distributed, high-performance stream processing framework that has recently become very popular in the big data field; it can process data with millisecond-level latency. Starting from the WordCount example on the official website, this article walks through Flink's core architecture and execution flow, hoping to help readers gain a deeper understanding of how Flink works.
This article skips some basic concepts; if you find them confusing, please refer to the official documentation. Also, while this article was being written, Flink released its 1.5 version; the parts completed after that release are organized according to the 1.5 implementation.
1. Starting from Hello World: WordCount
First, let's look at the WordCount example again:
45 public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {
if (args.length != 2){
System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
return;
}
String hostName = args[0];
Integer port = Integer.parseInt(args[1]);
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
// get input data
DataStream<String> text = env.socketTextStream(hostName, port);
text.flatMap(new LineSplitter()).setParallelism(1)
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.sum(1).setParallelism(1)
.print();
// execute program
env.execute("Java WordCount from SocketTextStream Example");
}
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
*/
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
The program first reads the remote socket's hostname and port from the command line, then sets up an execution environment, reads data from the socket, splits it into a stream of individual words, keeps a running count per word, and finally prints the result. Anyone who has touched big data computing or functional programming should be able to follow this example, so I won't explain it further.
1.1 The Flink execution environment
The program starts from this line:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

This line returns a usable execution environment. The execution environment is the context in which the whole Flink program runs: it records the relevant configuration (such as parallelism) and provides a set of methods, for example the methods for reading input streams, and the execute method that actually starts running the whole job. For a distributed stream processing program, the flatMap, keyBy and similar operations we define in the code are essentially declarations that tell the program which operators we use; the code that actually starts the computation is not here. Since we run the Flink program locally, this line returns a LocalStreamEnvironment, and in the end we call its execute method to kick off the real job. Let's keep reading.
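As an aside, if you don't want getExecutionEnvironment() to decide for you, the environment can also be created explicitly. A minimal sketch (the host name, port and jar path below are placeholders, not values from this article):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentDemo {
    public static void main(String[] args) {
        // Explicitly create a local environment (what getExecutionEnvironment() gives us
        // when the program runs inside the IDE); 2 is the desired parallelism.
        StreamExecutionEnvironment local =
                StreamExecutionEnvironment.createLocalEnvironment(2);

        // Explicitly create a remote environment; host, port and jar path are placeholders.
        StreamExecutionEnvironment remote =
                StreamExecutionEnvironment.createRemoteEnvironment(
                        "jobmanager-host", 6123, "/path/to/wordcount.jar");

        // Configuration such as parallelism is only recorded in the environment here;
        // nothing runs until execute() is called.
        local.setParallelism(1);
    }
}
```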
### 1.2 Registering (declaring) operators
Take flatMap as an example. Tracing into ```text.flatMap(new LineSplitter())``` leads to this:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
This does two things: first, it uses reflection to obtain the output type of the flatMap operator; second, it creates an Operator. The core idea of Flink's streaming computation is to pass data from the input stream through a chain of Operators, one record at a time, and finally hand it to the output stream. Each processing step on the data is logically called an operator, and for the sake of local processing efficiency, operators can also be chained together and processed as one unit (the chain-of-responsibility pattern may help you understand this). The figure below shows how Flink views a user's processing pipeline: it is abstracted into a series of operators, starting with a source and ending with a sink; what the operators in between do is called a transform, and several of them can be chained together for execution.
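To make the "declaration only" nature concrete, here is a small sketch (reusing the LineSplitter from section 1 and assuming it is on the classpath; host and port are placeholders) showing that nothing runs until execute() is called:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LazyDeclarationDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)                 // source: only declared
           .flatMap(new SocketTextStreamWordCount.LineSplitter()) // recorded as a transformation
           .keyBy(0)
           .sum(1)
           .print();                                            // sink: still just a declaration

        // Nothing has been computed so far; we can even print the declared topology:
        System.out.println(env.getExecutionPlan());

        // Only this call builds the graphs and actually starts the job:
        env.execute("lazy declaration demo");
    }
}
```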
When the job is submitted to a remote cluster, a classloader for the user code is built first:

ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(jarFiles, globalClasspaths, getClass().getClassLoader());

A set of configurations is then created and handed to a Client object. The word Client is telling: as soon as you see it, you know this is the client that talks to the remote cluster.
ClusterClient client;
try {
client = new StandaloneClusterClient(configuration);
client.setPrintStatusDuringExecution(getConfig().isSysoutLoggingEnabled());
}
catch (Exception e) {
	......
}
try {
	return client.run(streamGraph, jarFiles, globalClasspaths, usercodeClassLoader).getJobExecutionResult();
}
finally {
	......
}
Following client.run() further in, the job is submitted and the client blocks waiting for the result:
JobListeningContext jobListeningContext = submitJob(
actorSystem,config,highAvailabilityServices,jobGraph,timeout,sysoutLogUpdates, classLoader);
return awaitJobResult(jobListeningContext);
As you can see, submitJob returns a JobListeningContext, and the method then blocks on awaitJobResult; through this context we can obtain the running program's status and result.
The actual start of the job is this call:

env.execute("Java WordCount from SocketTextStream Example");

Remote mode differs slightly from local mode; let's walk through it in local mode first.
Following the source, in local (debugging) mode a MiniCluster is started and the job then runs:
// LocalStreamEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception {
//build the various graph structures
......
try {
//start the cluster, including starting the JobMaster, leader election, and so on
miniCluster.start();
configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
//submit the job to the JobMaster
return miniCluster.executeJobBlocking(jobGraph);
}
finally {
transformations.clear();
miniCluster.close();
}
}
//MiniCluster.java
public JobExecutionResult executeJobBlocking(JobGraph job) throws JobExecutionException, InterruptedException {
checkNotNull(job, "job is null");
//here the job is finally submitted to the JobMaster
final CompletableFuture<JobSubmissionResult> submissionFuture = submitJob(job);
final CompletableFuture<JobResult> jobResultFuture = submissionFuture.thenCompose(
(JobSubmissionResult ignored) -> requestJobResult(job.getJobID()));
......
}
As I wrote in the comment, the core logic of this code is the call to that ```submitJob``` method. So let's look at that method next:
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
final DispatcherGateway dispatcherGateway;
try {
dispatcherGateway = getDispatcherGateway();
} catch (LeaderRetrievalException | InterruptedException e) {
ExceptionUtils.checkInterrupted(e);
return FutureUtils.completedExceptionally(e);
}
// we have to allow queued scheduling in Flip-6 mode because we need to request slots
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);
final CompletableFuture<Void> jarUploadFuture = uploadAndSetJarFiles(dispatcherGateway, jobGraph);
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture.thenCompose(
//the real submit happens here
(Void ack) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout));
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
The DispatcherGateway's submitJob call ends up in the Dispatcher, which creates a JobManagerRunner for this job; once the runner wins leadership it starts the JobMaster:
//jobManagerRunner.java
private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) throws Exception {
......
final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);
......
}
Then, after a pile of nested method calls, the JobMaster ends up here:
private void scheduleExecutionGraph() {
checkState(jobStatusListener == null);
// register self as job status change listener
jobStatusListener = new JobManagerJobStatusListener();
executionGraph.registerJobStatusListener(jobStatusListener);
try {
//here the ExecutionGraph's scheduling method is invoked
executionGraph.scheduleForExecution();
}
catch (Throwable t) {
executionGraph.failGlobal(t);
}
}
As we know, Flink's framework has three layers of graph structures, and the ExecutionGraph is the layer that actually gets executed, so at this point we have walked the whole path from job submission to actual execution. The remote-submission flow differs mainly in that the ClusterClient we saw earlier ships the JobGraph to a remote cluster instead of to a local MiniCluster.
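For reference, the first two of these three layers can be inspected from the client side. A small sketch, assuming a trivial pipeline and the 1.5-era API (host/port are placeholders):

```java
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class GraphLayersDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("localhost", 9999).print();   // declare a trivial pipeline

        StreamGraph streamGraph = env.getStreamGraph();    // layer 1: StreamGraph (client side)
        JobGraph jobGraph = streamGraph.getJobGraph();     // layer 2: JobGraph (operators chained)

        System.out.println(streamGraph.getStreamingPlanAsJSON());
        System.out.println("job id: " + jobGraph.getJobID());
        // Layer 3, the ExecutionGraph, is only built on the JobManager side when the
        // JobGraph is submitted, so it cannot be inspected from the client like this.
    }
}
```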
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
return new StreamGraphGenerator(env).generateInternal(transformations);
}
//note: StreamGraph generation starts from the sinks
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
for (StreamTransformation<?> transformation: transformations) {
transform(transformation);
}
return streamGraph;
}
//the core logic of this method is to determine which kind of transformation was passed in and act accordingly; see the big if-else block below
private Collection<Integer> transform(StreamTransformation<?> transform) {
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
Collection<Integer> transformedIds;
//determine the transformation's type and dispatch to the corresponding handler; in short, the core of the handling is to recursively add this node and its upstream nodes to the graph
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
//note: this mirrors the check at the start of the method; in a directed graph we must take care not to create cycles
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
if (transform.getBufferTimeout() > 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
return transformedIds;
}
Since common operations such as map and filter are one-input transformations, let's look at the ```transformOneInputTransform((OneInputTransformation<?, ?>) transform)``` method.
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
Collection<Integer> inputIds = transform(transform.getInput());
// while recursively processing nodes, this node may already have been transformed via another node, so skip it
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
//determine the slotSharingGroup: it defines which other operators the operator we are handling may be placed into the same slot with
//because sometimes we may not be happy with the grouping Flink chooses for us
//a slot is the basic container in which a task is executed
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
//add this operator to the graph
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getOperator(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());
//for a keyed stream, we also record its keySelector
//Flink does not actually store a key for each keyed stream; instead it recomputes the key with the keySelector whenever it is needed
//therefore a user-defined keySelector must be deterministic (idempotent)
//we will come back to this point when key groups are introduced later
if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
streamGraph.setParallelism(transform.getId(), transform.getParallelism());
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
//create edges between this node and the nodes it depends on
//here you can see how the logical nodes mentioned earlier (select, union, partition, ...) get folded into edges
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(upStreamVertexID,
downStreamVertexID,
typeNumber,
null,
new ArrayList<String>(),
null);
}
//the implementation of addEdge; it folds in some logical (virtual) nodes
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag) {
//if the upstream node is a virtual side-output node, use its input as this node's input and recurse
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
//if the upstream node is a virtual select node, use its input as this node's input
} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
//if it is a virtual partition node
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else {
//normal edge handling
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
// If no partitioner was specified and the parallelism of upstream and downstream
// operator matches use forward partitioning, use rebalance otherwise.
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
if (partitioner instanceof ForwardPartitioner) {
if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
throw new UnsupportedOperationException("Forward partitioning does not allow " +
"change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
" You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
}
}
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
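To see where those virtual partition nodes come from on the API side, here is a small sketch (host/port are placeholders): each partitioning call below only inserts a PartitionTransformation, which addEdgeInternal later folds into the StreamEdge as the corresponding partitioner.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitioningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        text.rebalance().print();                     // round-robin -> RebalancePartitioner
        text.broadcast().print();                     // every downstream subtask sees every record
        text.shuffle().print();                       // random partitioning
        text.forward().print().setParallelism(1);     // forward requires equal up/downstream
                                                      // parallelism, as enforced in addEdgeInternal
        env.execute("partitioning demo");
    }
}
```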
#### 2.2.3 The StreamGraph of the WordCount program
private JobGraph createJobGraph() {
// set the schedule mode so that all vertices are started right away (EAGER)
jobGraph.setScheduleMode(ScheduleMode.EAGER);
// generate a hash id for every node
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// legacy hashes, created for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
//create the JobVertices and chain operators together
//roughly: walk the nodes one by one; if a node is the head of a chain, create a JobVertex for it; if not, merge its configuration into the head node and connect the head node to this node's outgoing edges; a node that cannot be chained is simply treated as a chain head of its own
setChaining(hashes, legacyHashes, chainedOperatorHashes);
//set up the physical input edges
setPhysicalEdges();
//set up the slot sharing groups
setSlotSharing();
//configure checkpointing
configureCheckpointing();
// if distributed-cache files were configured, register them again
for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
}
// pass on the ExecutionConfig
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
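For completeness, chaining and slot sharing can also be influenced from the user API, which is what setChaining() and setSlotSharing() above eventually honour. A small sketch, again reusing the LineSplitter from section 1 (host/port and the group name are placeholders):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();  // would turn chaining off for the whole job

        env.socketTextStream("localhost", 9999)
           .flatMap(new SocketTextStreamWordCount.LineSplitter())
           .startNewChain()              // force this operator to start a new chain
           .keyBy(0)
           .sum(1)
           .disableChaining()            // never chain this operator with its neighbours
           .slotSharingGroup("agg")      // and put it into its own slot sharing group
           .print();

        env.execute("chaining demo");
    }
}
```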
#### 2.3.2 The logic of operator chains
This method is more than 200 lines long, and more than half of it is checkpoint-related logic; we will skip that for now and go straight to the core part.
Since the ExecutionGraph essentially only transforms each node of the JobGraph without changing the overall topology, the code simply iterates over the JobVertices and processes them one by one:
for (JobVertex jobVertex : topologiallySorted) {
if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
this.isStoppable = false;
}
//here each node of the ExecutionGraph is created
//first a bunch of assignments: hand the task information to the new graph node, set the parallelism, and so on
//then create this node's IntermediateResults; how many depends on the number of downstream nodes
//finally create the ExecutionVertices that will execute the tasks, according to the configured parallelism
//if the job has input splits configured, they are assigned here as well
ExecutionJobVertex ejv = new ExecutionJobVertex(
this,
jobVertex,
1,
rpcCallTimeout,
globalModVersion,
createTimestamp);
//now handle all the JobEdges
//for each edge, look up the corresponding IntermediateResult and record it as an input of this node
//finally, associate each ExecutionVertex with the corresponding IntermediateResult partitions
ejv.connectToPredecessors(this.intermediateResults);
ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
if (previousTask != null) {
throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
jobVertex.getID(), ejv, previousTask));
}
for (IntermediateResult res : ejv.getProducedDataSets()) {
IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
if (previousDataSet != null) {
throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
res.getId(), res, previousDataSet));
}
}
this.verticesInCreationOrder.add(ejv);
this.numVerticesTotal += ejv.getParallelism();
newExecJobVertices.add(ejv);
}
At this point, the ExecutionGraph has been created.
def runJobManager(
configuration: Configuration,
executionMode: JobManagerMode,
listeningAddress: String,
listeningPort: Int)
: Unit = {
val numberProcessors = Hardware.getNumberCPUCores()
val futureExecutor = Executors.newScheduledThreadPool(
numberProcessors,
new ExecutorThreadFactory("jobmanager-future"))
val ioExecutor = Executors.newFixedThreadPool(
numberProcessors,
new ExecutorThreadFactory("jobmanager-io"))
val timeout = AkkaUtils.getTimeout(configuration)
// we have to first start the JobManager ActorSystem because this determines the port if 0
// was chosen before. The method startActorSystem will update the configuration correspondingly.
val jobManagerSystem = startActorSystem(
configuration,
listeningAddress,
listeningPort)
val highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
configuration,
ioExecutor,
AddressResolution.NO_ADDRESS_RESOLUTION)
val metricRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(configuration))
metricRegistry.startQueryService(jobManagerSystem, null)
val (_, _, webMonitorOption, _) = try {
startJobManagerActors(
jobManagerSystem,
configuration,
executionMode,
listeningAddress,
futureExecutor,
ioExecutor,
highAvailabilityServices,
metricRegistry,
classOf[JobManager],
classOf[MemoryArchivist],
Option(classOf[StandaloneResourceManager])
)
} catch {
case t: Throwable =>
futureExecutor.shutdownNow()
ioExecutor.shutdownNow()
throw t
}
// block until everything is shut down
jobManagerSystem.awaitTermination()
.......
}
The main steps are:
- Configure Akka, create the ActorSystem, and start the JobManager
- Start the HA and metrics services
- In ```startJobManagerActors()```, start the JobManager actors, as well as the web server, the TaskManager actor, the ResourceManager, and so on
- Block and wait for termination
- The cluster elects the JobManager leader via the leader election service
#### 3.2.3 The JobManager launches the task
The JobManager is an actor and drives its core logic through various messages:
override def handleMessage: Receive = {
case GrantLeadership(newLeaderSessionID) =>
log.info(s"JobManager $getAddress was granted leadership with leader session ID " +
s"$newLeaderSessionID.")
leaderSessionID = newLeaderSessionID
.......
Several messages matter here; the one we care about is the job submission, which is handled by the submitJob method:
private def submitJob(jobGraph: JobGraph, jobInfo: JobInfo, isRecovery: Boolean = false): Unit = {
......
executionGraph = ExecutionGraphBuilder.buildGraph(
executionGraph,
jobGraph,
flinkConfiguration,
futureExecutor,
ioExecutor,
scheduler,
userCodeLoader,
checkpointRecoveryFactory,
Time.of(timeout.length, timeout.unit),
restartStrategy,
jobMetrics,
numSlots,
blobServer,
log.logger)
......
if (leaderElectionService.hasLeadership) {
log.info(s"Scheduling job $jobId ($jobName).")
executionGraph.scheduleForExecution()
} else {
self ! decorateMessage(RemoveJob(jobId, removeJobFromStateBackend = false))
log.warn(s"Submitted job $jobId, but not leader. The other leader needs to recover " +
"this. I am not scheduling the job for execution.")
......
}
It first does some preparation, then builds an ExecutionGraph, checks whether this is a recovered job, stores the job, and notifies the client that the submission succeeded locally. Finally, if this JobManager is confirmed to be the leader, it calls ```executionGraph.scheduleForExecution()```; through a chain of calls, this method hands each ExecutionVertex to the deploy method of the Execution class:
public void deploy() throws JobException {
......
try {
// good, we are allowed to deploy
if (!slot.setExecutedVertex(this)) {
throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
}
// race double check, did we fail/cancel and do we need to release the slot?
if (this.state != DEPLOYING) {
slot.releaseSlot();
return;
}
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(),
attemptNumber, getAssignedResourceLocation().getHostname()));
}
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
taskState,
attemptNumber);
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final CompletableFuture<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
......
}
catch (Throwable t) {
markFailed(t);
ExceptionUtils.rethrow(t);
}
}
We first create a TaskDeploymentDescriptor and hand it to ```taskManagerGateway.submitTask()``` for execution. What happens next belongs to the TaskManager.
val task = new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId,
tdd.getAllocationId,
tdd.getSubtaskIndex,
tdd.getAttemptNumber,
tdd.getProducedPartitions,
tdd.getInputGates,
tdd.getTargetSlotNumber,
tdd.getTaskStateHandles,
memoryManager,
ioManager,
network,
bcVarManager,
taskManagerConnection,
inputSplitProvider,
checkpointResponder,
blobCache,
libCache,
fileCache,
config,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
context.dispatcher)

If you have been reading this blog from the beginning, the purpose of most of these objects should already be clear (except bcVarManager, which manages broadcast variables, a kind of shared variable distributed to every task). The next step is to start this task and then report that it has been started:
// all good, we kick off the task, which performs its own initialization
task.startTaskThread()
sender ! decorateMessage(Acknowledge.get())

#### 3.3.2.1 Creating the Task object
When new Task() executes, the first step is to assign these constructor arguments to the task's fields.
Next, the ResultPartitions and InputGates are initialized; these two classes describe the task's output data and input data respectively.
for (ResultPartitionDeploymentDescriptor desc: resultPartitionDeploymentDescriptors) {
ResultPartitionID partitionId = new ResultPartitionID(desc.getPartitionId(), executionId);
this.producedPartitions[counter] = new ResultPartition(
taskNameWithSubtaskAndId,
this,
jobId,
partitionId,
desc.getPartitionType(),
desc.getNumberOfSubpartitions(),
desc.getMaxParallelism(),
networkEnvironment.getResultPartitionManager(),
resultPartitionConsumableNotifier,
ioManager,
desc.sendScheduleOrUpdateConsumersMessage());
//create a writer for each produced partition
writers[counter] = new ResultPartitionWriter(producedPartitions[counter]);
++counter;
}
// Consumed intermediate result partitions
this.inputGates = new SingleInputGate[inputGateDeploymentDescriptors.size()];
this.inputGatesById = new HashMap<>();
counter = 0;
for (InputGateDeploymentDescriptor inputGateDeploymentDescriptor: inputGateDeploymentDescriptors) {
SingleInputGate gate = SingleInputGate.create(
taskNameWithSubtaskAndId,
jobId,
executionId,
inputGateDeploymentDescriptor,
networkEnvironment,
this,
metricGroup.getIOMetricGroup());
inputGates[counter] = gate;
inputGatesById.put(gate.getConsumedResultId(), gate);
++counter;
}

Finally, a Thread object is created with the task itself as its target, so that at run time the task holds a reference to its own thread.
#### 3.3.2.2 Running the Task object
The Task object itself is a Runnable, so its run method defines the execution logic.
The first step is to switch the Task's state:
while (true) {
ExecutionState current = this.executionState;
//if the current execution state is CREATED, switch it to DEPLOYING
if (current == ExecutionState.CREATED) {
if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
// success, we can start our work
break;
}
}
//if the current state is FAILED, send the notification and leave run()
else if (current == ExecutionState.FAILED) {
// we were immediately failed. tell the TaskManager that we reached our final state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
//if the current state is CANCELING, switch it to CANCELED and leave run()
else if (current == ExecutionState.CANCELING) {
if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
// we were immediately canceled. tell the TaskManager that we reached our final state
notifyFinalState();
if (metrics != null) {
metrics.close();
}
return;
}
}
//otherwise something went wrong
else {
if (metrics != null) {
metrics.close();
}
throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
}
}
One thing worth noting here: Flink makes heavy use of this while(true) plus compare-and-set style to change and check state.
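For readers unfamiliar with the pattern, the sketch below (plain Java, not Flink's actual Task code) shows the same compare-and-set state-transition loop in miniature:

```java
import java.util.concurrent.atomic.AtomicReference;

enum DemoState { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FAILED }

class StateMachineDemo {
    private final AtomicReference<DemoState> state = new AtomicReference<>(DemoState.CREATED);

    /** Mirrors the spirit of Task.transitionState(): atomic compare-and-set. */
    boolean transitionState(DemoState expected, DemoState next) {
        return state.compareAndSet(expected, next);
    }

    void start() {
        while (true) {
            DemoState current = state.get();
            if (current == DemoState.CREATED) {
                if (transitionState(DemoState.CREATED, DemoState.DEPLOYING)) {
                    break;   // we won the race, continue with deployment
                }
                // CAS failed: another thread changed the state, loop and re-read it
            } else if (current == DemoState.FAILED || current == DemoState.CANCELING) {
                return;      // give up immediately
            } else {
                throw new IllegalStateException("Unexpected state " + current);
            }
        }
    }
}
```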
Then comes the most important call:

invokable.invoke();

Why the most important? Because this method is the real entry point where user code gets executed: the logic of that new MapFunction() we wrote eventually runs from here. A word about this invokable: it is an abstract class that provides the basic abstraction of an object the TaskManager can execute.
The information for this invokable is produced when the JobGraph is built, and here it is turned into an actually executable object:
// now load the task's invokable code
//instantiate the object via reflection
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);

![image_1cbkaa8r9182i18ct1kfu8g829m9.png-29.9kB][17]
The figure above shows the invokable Task types Flink provides. Their roles are clear from their names, so I won't go into them here.
Next comes the invoke method. Since our WordCount example uses the streaming API, we'll use StreamTask's invoke method as the example.
#### 3.3.2.3 The execution logic of StreamTask
Here is part of the core code first:
public final void invoke() throws Exception {
boolean disposed = false;
try {
// -------- Initialize ---------
//some assignments first
......
// if the clock is not already set, then assign a default TimeServiceProvider
//handle the timer service
if (timerService == null) {
ThreadFactory timerThreadFactory =
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
}
//materialize the operator chain that the JobGraph strung together
operatorChain = new OperatorChain<>(this);
headOperator = operatorChain.getHeadOperator();
// task specific initialization
//the name of this init is oddly chosen: it mainly deals with operators that use their own checkpoint/input machinery, yet it got a very generic-looking name
init();
// save the work of reloading state, etc, if the task is already canceled
if (canceled) {
throw new CancelTaskException();
}
// -------- Invoke --------
LOG.debug("Invoking {}", getName());
// we need to make sure that any triggers scheduled in open() cannot be
// executed before all operators are opened
synchronized (lock) {
// both the following operations are protected by the lock
// so that we avoid race conditions in the case that initializeState()
// registers a timer, that fires before the open() is called.
//initialize operator state (mainly the various kinds of state)
initializeState();
//for rich operators, call their open() method
openAllOperators();
}
// final check to exit early before starting to run
if (canceled) {
throw new CancelTaskException();
}
// let the task do its work
//the code that actually starts running
isRunning = true;
run();
The first thing worth mentioning in StreamTask.invoke() is the ```TimerService```. When Flink decided to add a timer service to the StreamTask class back in 2015, the idea was to give each task its own service for scheduling time-based actions; this is what processing-time timers and timeouts are built on.
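From the user's perspective, this timer service is what processing-time timers in the DataStream API are ultimately scheduled on. A hypothetical sketch using a ProcessFunction on a keyed stream (the surrounding pipeline is assumed to produce the Tuple2 counts from our WordCount):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class TimerExample {
    public static void attach(DataStream<Tuple2<String, Integer>> counts) {
        counts
            .keyBy(0) // timers are only available on keyed streams
            .process(new ProcessFunction<Tuple2<String, Integer>, String>() {
                @Override
                public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) {
                    // register a processing-time timer 10 seconds from now;
                    // it is backed by the StreamTask's timer service described above
                    long fireAt = ctx.timerService().currentProcessingTime() + 10_000L;
                    ctx.timerService().registerProcessingTimeTimer(fireAt);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
                    out.collect("timer fired at " + timestamp);
                }
            })
            .print();
    }
}
```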
protected void run() throws Exception {
// cache processor reference on the stack, to make the code more JIT friendly
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
while (running && inputProcessor.processInput()) {
// all the work happens in the "processInput" method
}
}
All it does is hand the work to the inputProcessor by calling its processInput method. This is an instance of ```StreamInputProcessor```, whose job is to process the incoming data: user records, watermarks, checkpoint barriers and so on. Let's first see where this processor comes from:
public void init() throws Exception {
StreamConfig configuration = getConfiguration();
TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
int numberOfInputs = configuration.getNumberOfInputs();
if (numberOfInputs > 0) {
InputGate[] inputGates = getEnvironment().getAllInputGates();
inputProcessor = new StreamInputProcessor<>(
inputGates,
inSerializer,
this,
configuration.getCheckpointMode(),
getCheckpointLock(),
getEnvironment().getIOManager(),
getEnvironment().getTaskManagerInfo().getConfiguration(),
getStreamStatusMaintainer(),
this.headOperator);
// make sure that stream tasks report their I/O statistics
inputProcessor.setMetricGroup(getEnvironment().getMetricGroup().getIOMetricGroup());
}
}
This is OneInputStreamTask's init method: it reads the StreamOperator information from the config and builds its own inputProcessor. So how does the inputProcessor handle data? Let's keep following the source:
public boolean processInput() throws Exception {
if (isFinished) {
return false;
}
if (numRecordsIn == null) {
numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
}
//this while loop processes a single element (don't assume it loops over many elements)
while (true) {
//note: step 1 is further down
//2. next, use this deserializer to get the next record, figure out what it is (a user record, a watermark, ...), and act accordingly
if (currentRecordDeserializer != null) {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
currentRecordDeserializer.getCurrentBuffer().recycle();
currentRecordDeserializer = null;
}
if (result.isFullRecord()) {
StreamElement recordOrMark = deserializationDelegate.getInstance();
//if the element is a watermark, prepare to update the current channel's watermark (not a simple assignment, because of out-of-order events)
if (recordOrMark.isWatermark()) {
// handle watermark
statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel);
continue;
} else if (recordOrMark.isStreamStatus()) {
//if the element is a StreamStatus, handle it accordingly. Think of it as a flag marking that the stream is about to have no more input for a while (idle), or is about to switch from idle back to active. StreamStatus also affects how watermarks are handled; by sending it, an upstream operator can conveniently tell downstream operators the current state of the data flow.
// handle stream status
statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel);
continue;
} else if (recordOrMark.isLatencyMarker()) {
//a LatencyMarker is used to measure latency: it is created at the source with a timestamp, and by the time it reaches the sink we know how long the trip took
// handle latency marker
synchronized (lock) {
streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
}
continue;
} else {
//this is where the user's code is finally about to be executed; getting here from chapter 1 took tens of thousands of words, a bit of a long march
// now we can do the actual processing
StreamRecord<IN> record = recordOrMark.asRecord();
synchronized (lock) {
numRecordsIn.inc();
streamOperator.setKeyContextElement1(record);
streamOperator.processElement(record);
}
return true;
}
}
}
//1. first, fetch the next buffer
//this part serves Flink's fault-tolerance mechanism, which I'll cover later; for now just note that it tries to get a buffer and hands it to the current deserializer
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
}
}
else {
isFinished = true;
if (!barrierHandler.isEmpty()) {
throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
}
return false;
}
}
}
That, then, is all the preparatory work the Flink framework does from program startup until just before user code runs. Next, let's see how user code is actually driven, starting from the source operator, StreamSource:
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
		extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
......
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
run(lockingObject, streamStatusMaintainer, output);
}
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector) throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
LatencyMarksEmitter latencyEmitter = null;
if (getExecutionConfig().isLatencyTrackingEnabled()) {
latencyEmitter = new LatencyMarksEmitter<>(
getProcessingTimeService(),
collector,
getExecutionConfig().getLatencyTrackingInterval(),
getOperatorConfig().getVertexID(),
getRuntimeContext().getIndexOfThisSubtask());
}
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
try {
userFunction.run(ctx);
// if we get here, then the user function either exited after being done (finite source)
// or the function was canceled or stopped. For the finite source case, we should emit
// a final watermark that indicates that we reached the end of event-time
if (!isCanceledOrStopped()) {
ctx.emitWatermark(Watermark.MAX_WATERMARK);
}
} finally {
// make sure that the context is closed in any case
ctx.close();
if (latencyEmitter != null) {
latencyEmitter.close();
}
}
}
......
private static class LatencyMarksEmitter<OUT> {
private final ScheduledFuture<?> latencyMarkTimer;
public LatencyMarksEmitter(
final ProcessingTimeService processingTimeService,
final Output<StreamRecord<OUT>> output,
long latencyTrackingInterval,
final int vertexID,
final int subtaskIndex) {
latencyMarkTimer = processingTimeService.scheduleAtFixedRate(
new ProcessingTimeCallback() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
try {
// ProcessingTimeService callbacks are executed under the checkpointing lock
output.emitLatencyMarker(new LatencyMarker(timestamp, vertexID, subtaskIndex));
} catch (Throwable t) {
// we catch the Throwables here so that we don't trigger the processing
// timer services async exception handler
LOG.warn("Error while emitting latency marker.", t);
}
}
},
0L,
latencyTrackingInterval);
}
public void close() {
latencyMarkTimer.cancel(true);
}
}
}
After StreamSource creates the source context, it hands the context to the SourceFunction to run:

userFunction.run(ctx);

SourceFunction is the function abstraction for sources, just like MapFunction or a KeySelector: the user implements these functions, and the Flink framework then uses them to carry out the computation and realize the user's logic.
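To make that concrete, here is a minimal, hypothetical SourceFunction of our own; the framework drives it in exactly the same way it drives the SocketTextStreamFunction shown below:

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/** Emits an increasing counter until cancelled. */
public class CounterSource implements SourceFunction<Long> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (isRunning) {
            // emit under the checkpoint lock so records and state snapshots don't interleave
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
// usage: env.addSource(new CounterSource()).print();
```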
Our WordCount program uses a ```SocketTextStreamFunction``` provided by Flink. Looking at its implementation gives a basic sense of how a source runs:
public void run(SourceContext<String> ctx) throws Exception {
final StringBuilder buffer = new StringBuilder();
long attempt = 0;
while (isRunning) {
try (Socket socket = new Socket()) {
currentSocket = socket;
LOG.info("Connecting to server socket " + hostname + ':' + port);
socket.connect(new InetSocketAddress(hostname, port), CONNECTION_TIMEOUT_TIME);
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] cbuf = new char[8192];
int bytesRead;
//the core logic: keep reading from the socket and hand what was read to collect
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
buffer.append(cbuf, 0, bytesRead);
int delimPos;
while (buffer.length() >= delimiter.length() && (delimPos = buffer.indexOf(delimiter)) != -1) {
String record = buffer.substring(0, delimPos);
// truncate trailing carriage return
if (delimiter.equals("\n") && record.endsWith("\r")) {
record = record.substring(0, record.length() - 1);
}
//once data is read it is handed to collect, which delivers it to the right place (e.g. publishes it as a broadcast variable, passes it to the next operator, or sends it out over the network)
ctx.collect(record);
buffer.delete(0, delimPos + delimiter.length());
}
}
}
// if we dropped out of this loop due to an EOF, sleep and retry
if (isRunning) {
attempt++;
if (maxNumRetries == -1 || attempt < maxNumRetries) {
LOG.warn("Lost connection to server socket. Retrying in " + delayBetweenRetries + " msecs...");
Thread.sleep(delayBetweenRetries);
}
else {
// this should probably be here, but some examples expect simple exists of the stream source
// throw new EOFException("Reached end of stream and reconnects are not enabled.");
break;
}
}
}
// collect trailing data
if (buffer.length() > 0) {
ctx.collect(buffer.toString());
}
}
In this whole piece of code only the collect method has any real complexity; we'll come back to it when discussing Flink's object model. For now it's enough to know that collect gathers the result and sends it to the receiver. In our WordCount, the receiver of this operator is the flatMap operator chained to it; if you don't remember the example program, go back to chapter 1. Operators that consume a single input stream implement the OneInputStreamOperator interface:
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
/**
* Processes one element that arrived at this operator.
* This method is guaranteed to not be called concurrently with other methods of the operator.
*/
void processElement(StreamRecord<IN> element) throws Exception;
/**
* Processes a {@link Watermark}.
* This method is guaranteed to not be called concurrently with other methods of the operator.
*
* @see org.apache.flink.streaming.api.watermark.Watermark
*/
void processWatermark(Watermark mark) throws Exception;
void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}

The StreamFlatMap operator that implements this interface is also very simple; there's not much to say about it:
public class StreamFlatMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
private transient TimestampedCollector<OUT> collector;
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
userFunction.flatMap(element.getValue(), collector);
}
}

As the class diagram shows, Flink provides a base class for operators, ```AbstractUdfStreamOperator```, which supplies common functionality such as giving the operator its context and saving snapshots. The two methods most people know are these:
@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, new Configuration());
}
@Override
public void close() throws Exception {
super.close();
functionsClosed = true;
FunctionUtils.closeFunction(userFunction);
}
These are exactly the places where the open and close methods of Flink's ```Rich***Function``` family get executed.
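As a user-side illustration, a hypothetical RichFlatMapFunction whose open()/close() are invoked from exactly these two spots might look like this:

```java
import java.util.regex.Pattern;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RichLineSplitter extends RichFlatMapFunction<String, String> {

    private transient Pattern pattern;

    @Override
    public void open(Configuration parameters) throws Exception {
        // called once per parallel instance, before any element is processed
        pattern = Pattern.compile("\\W+");
        System.out.println("subtask " + getRuntimeContext().getIndexOfThisSubtask() + " opened");
    }

    @Override
    public void flatMap(String value, Collector<String> out) {
        for (String token : pattern.split(value.toLowerCase())) {
            if (!token.isEmpty()) {
                out.collect(token);
            }
        }
    }

    @Override
    public void close() throws Exception {
        // called once when the operator shuts down
        System.out.println("subtask closed");
    }
}
```

At the very end of the chain sits the sink operator, whose processElement simply hands each record to the user's SinkFunction: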
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
sinkContext.element = element;
userFunction.invoke(element.getValue(), sinkContext);
}
@Override
protected void reportOrForwardLatencyMarker(LatencyMarker maker) {
// all operators are tracking latencies
this.latencyGauge.reportLatency(maker, true);
// sinks don't forward latency markers
}
Here, ```processElement``` is the method inherited from OneInputStreamOperator, while ```reportOrForwardLatencyMarker``` is used to compute latency: as mentioned earlier, StreamSource emits LatencyMarkers that record when the data was created, and this is where the elapsed time is actually computed.
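These latency markers are only emitted if latency tracking is enabled in the ExecutionConfig; a one-line sketch (the 5-second interval is an arbitrary example value):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// emit a LatencyMarker from every source every 5 seconds (interval in milliseconds);
// latency metrics derived from these markers then appear in the metric system / web UI
env.getConfig().setLatencyTrackingInterval(5000L);
```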
public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
// make sure all prior timers are cancelled
stopCheckpointScheduler();
periodicScheduling = true;
currentPeriodicTrigger = timer.scheduleAtFixedRate(
new ScheduledTrigger(),
baseInterval, baseInterval, TimeUnit.MILLISECONDS);
}
}
private final class ScheduledTrigger implements Runnable {
@Override
public void run() {
try {
triggerCheckpoint(System.currentTimeMillis(), true);
}
catch (Exception e) {
LOG.error("Exception while triggering checkpoint.", e);
}
}
}
Once started, the scheduler invokes the ```triggerCheckpoint()``` method at the configured interval. That method is quite long, so I'll only summarize it; in the end it sends a trigger message to every execution that should start the checkpoint:
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}

This calls Execution.triggerCheckpoint; an Execution is the actual executor of an ExecutionVertex. Let's look at that method:
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
final LogicalSlot slot = assignedResource;
if (slot != null) {
//the TaskManagerGateway is the component used to talk to the TaskManager
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
"no longer running.");
}
}
Through the TaskManagerGateway this becomes an RPC to the TaskManager, which locates the Task and calls its triggerCheckpointBarrier method:
public void triggerCheckpointBarrier(
final long checkpointID,
long checkpointTimestamp,
final CheckpointOptions checkpointOptions) {
......
Runnable runnable = new Runnable() {
@Override
public void run() {
// set safety net from the task's context for checkpointing thread
LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
try {
boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
if (!success) {
checkpointResponder.declineCheckpoint(
getJobID(), getExecutionId(), checkpointID,
new CheckpointDeclineTaskNotReadyException(taskName));
}
}
......
}
};
executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
}
}
The invokable in the code above is in fact our StreamTask. The Task class delegates the checkpoint to a more specific class, and StreamTask delegates further still, all the way down to the business code. Let's continue with StreamTask's performCheckpoint method:
private boolean performCheckpoint(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
synchronized (lock) {
if (isRunning) {
operatorChain.broadcastCheckpointBarrier(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions);
checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
return true;
}
else {
......
}
}
}
After ```broadcastCheckpointBarrier``` completes, StreamTask does a lot of other work in the ```checkpointState()``` method:
public void executeCheckpointing() throws Exception {
......
try {
//this is the entry point where each StreamOperator is asked to snapshot its state
for (StreamOperator<?> op : allOperators) {
checkpointStreamOperator(op);
}
// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit
AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
//a Runnable is registered here that, after the checkpoint finishes, sends the completed-checkpoint message to the JobManager; this is part of the two-phase commit used for fault tolerance
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
......
}
}
When we think of checkpoints, the most intuitive picture is the state of aggregating operators being saved, such as the running total of a sum or the value of a count. That is indeed what the StreamOperator part triggers to be saved. But as you can see, besides this intuitive operator state, Flink's checkpoint does a great deal of other work.
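On the user side, a function can hook into exactly this snapshot step by implementing CheckpointedFunction. A minimal, hypothetical sketch:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

/** Counts elements and stores the counter as operator state on every checkpoint. */
public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;
    private long count;

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // invoked when the checkpoint reaches this operator (via checkpointStreamOperator above)
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedCount = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));
        for (Long c : checkpointedCount.get()) {
            count += c;   // restore after a failure
        }
    }
}
```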
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
try {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
catch (InterruptedException e) {
throw new IOException("Interrupted while broadcasting checkpoint barrier");
}
}
When a downstream operator receives this barrier, it triggers its own checkpoint. As an example of what a source function saves, here is the snapshotState method of the Kafka consumer:
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
unionOffsetStates.clear();
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
}
} else {
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
unionOffsetStates.add(
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
}
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// truncate the map of pending offsets to commit, to prevent infinite growth
while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingOffsetsToCommit.remove(0);
}
}
}
}
The Kafka snapshot logic simply records the offsets currently consumed, turns them into (partition, offset) tuples and puts them into a ```StateBackend```. The StateBackend is the interface Flink abstracts out for storing state.
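Which state backend a job uses can be chosen in user code; a small sketch (the checkpoint URI is a placeholder, and the RocksDB variant needs the flink-statebackend-rocksdb dependency):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Default is the memory state backend (snapshots live in the JobManager's heap).
        // Store snapshots on a distributed file system instead:
        env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Or keep working state in RocksDB and snapshot it to the DFS:
        // env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
    }
}
```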
//back in the snapshotState method of AbstractStreamOperator.java
if (null != operatorStateBackend) {
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
}
So how does this operatorStateBackend actually save the state?
public SnapshotResult<OperatorStateHandle> performOperation() throws Exception {
long asyncStartTime = System.currentTimeMillis();
CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
// get the registered operator state infos ...
List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> operatorMetaInfoSnapshots =
new ArrayList<>(registeredOperatorStatesDeepCopies.size());
for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStatesDeepCopies.entrySet()) {
operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
}
// ... write them all in the checkpoint stream ...
DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
OperatorBackendSerializationProxy backendSerializationProxy =
new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
backendSerializationProxy.write(dov);
......
}
So the operator state backend copies the registered states and writes their metadata and contents into the checkpoint stream. That covers producing a snapshot; next, let's see how a checkpoint barrier is handled on the receiving side. Recall the buffer-fetching part of processInput we saw earlier:
//every element goes through this logic; if the next piece of data is a buffer, the outer while loop goes on to handle user data; the barrier handling happens quietly inside this method
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();
if (bufferOrEvent != null) {
if (bufferOrEvent.isBuffer()) {
currentChannel = bufferOrEvent.getChannelIndex();
currentRecordDeserializer = recordDeserializers[currentChannel];
currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
else {
// Event received
final AbstractEvent event = bufferOrEvent.getEvent();
if (event.getClass() != EndOfPartitionEvent.class) {
throw new IOException("Unexpected event: " + event);
}
}
}
The barrierHandler here is a CheckpointBarrierHandler (a BarrierBuffer in exactly-once mode), and this is where barriers and cancellation markers are picked out of the stream:
//the BarrierBuffer.getNextNonBlocked() method
else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
if (!endOfStream) {
// process barriers only if there is a chance of the checkpoint completing
processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
}
}
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());
}
The core of barrier alignment lives in processBarrier:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel cases
if (totalNumberOfInputChannels == 1) {
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
notifyCheckpoint(receivedBarrier);
}
return;
}
// -- general code path for multiple input channels --
if (numBarriersReceived > 0) {
// this is only true if some alignment is already progress and was not canceled
if (barrierId == currentCheckpointId) {
// regular case
onBarrier(channelIndex);
}
else if (barrierId > currentCheckpointId) {
// we did not complete the current checkpoint, another started before
LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.", barrierId, currentCheckpointId);
// let the task know we are not completing this
notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));
// abort the current checkpoint
releaseBlocksAndResetBarriers();
// begin the new checkpoint
beginNewAlignment(barrierId, channelIndex);
}
else {
// ignore trailing barrier from an earlier checkpoint (obsolete now)
return;
}
}
else if (barrierId > currentCheckpointId) {
// first barrier of a new checkpoint
beginNewAlignment(barrierId, channelIndex);
}
else {
// either the current checkpoint was canceled (numBarriers == 0) or
// this barrier is from an old subsumed checkpoint
return;
}
// check if we have all barriers - since canceled checkpoints always have zero barriers
// this can only happen on a non canceled checkpoint
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug("Received all barriers, triggering checkpoint {} at {}",
receivedBarrier.getId(), receivedBarrier.getTimestamp());
}
releaseBlocksAndResetBarriers();
notifyCheckpoint(receivedBarrier);
}
}
Once all barriers have arrived and the state snapshot has been written, the task acknowledges the checkpoint to the JobManager through the CheckpointResponder:
checkpointResponder.acknowledgeCheckpoint(
jobId,
executionAttemptID,
checkpointId,
checkpointMetrics,
acknowledgedState);

As this class also shows, the report is made by remotely invoking the JobManager's methods over RPC, which underneath is again implemented with Akka.
So who answers this RPC call? The JobMaster of this job.
//JobMaster.java
public void acknowledgeCheckpoint(
final JobID jobID,
final ExecutionAttemptID executionAttemptID,
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState);
if (checkpointCoordinator != null) {
getRpcService().execute(() -> {
try {
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
} catch (Throwable t) {
log.warn("Error while processing checkpoint acknowledgement message");
}
});
} else {
log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator",
jobGraph.getJobID());
}
}
The CheckpointCoordinator collects these acknowledgements, and when the checkpoint is complete it notifies the tasks, which lands in StreamTask.notifyCheckpointComplete():
public void notifyCheckpointComplete(long checkpointId) throws Exception {
synchronized (lock) {
if (isRunning) {
LOG.debug("Notification of complete checkpoint for task {}", getName());
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
if (operator != null) {
operator.notifyCheckpointComplete(checkpointId);
}
}
}
else {
LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
}
}
}
From here it is simply a matter of notifying the corresponding operators, layer by layer, so they can react.
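A user function can also be notified at this point by implementing CheckpointListener; a minimal, hypothetical sketch:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.runtime.state.CheckpointListener;

/** Receives the notifyCheckpointComplete() callback described above, e.g. to commit side effects. */
public class CommitOnCheckpoint implements MapFunction<String, String>, CheckpointListener {

    @Override
    public String map(String value) {
        return value;
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // called after the JobManager has collected all acknowledgements for this checkpoint;
        // the Kafka consumer shown earlier uses the same hook to commit offsets back to Kafka
        System.out.println("checkpoint " + checkpointId + " is complete");
    }
}
```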
public abstract class MemorySegment {
public abstract byte get(int index);
public abstract void put(int index, byte b);
public int size() ;
public abstract ByteBuffer wrap(int offset, int length);
......
}

As you can see, besides the many methods for operating on memory directly, it also provides a ```wrap()``` method that wraps (part of) itself into a ByteBuffer; we'll talk about that ByteBuffer shortly.
Flink provides two implementations of MemorySegment: ```HeapMemorySegment``` and ```HybridMemorySegment```. The difference is that the former can only be backed by heap memory, while the latter can be backed by both heap and off-heap memory. In fact, the Flink framework only uses the latter. Why is that?
It might seem more natural for HybridMemorySegment to handle only off-heap memory. But in the JVM world, if a method is virtual, then on every call the JVM has to spend time figuring out which subclass's implementation to invoke (method overriding; see the JVM's invokevirtual instruction if this is unfamiliar), which means a method-table lookup on every call. If, however, a method is virtual but only one implementation exists in the whole JVM (that is, only one subclass has been loaded), the JVM is smart enough to de-virtualize it, so the method-table lookup is skipped, which greatly improves performance. Since a segment that can only allocate heap memory, or only off-heap memory, does not cover all of Flink's needs, HybridMemorySegment was designed to support both kinds, so that it can remain the single loaded implementation.
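Before diving into the constructors, here is a small usage sketch of MemorySegment via MemorySegmentFactory (the exact set of factory methods varies a little between Flink versions):

```java
import java.nio.ByteBuffer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;

public class SegmentDemo {
    public static void main(String[] args) {
        // heap-backed segment: wraps an ordinary byte[]
        MemorySegment heapSegment = MemorySegmentFactory.wrap(new byte[64]);
        heapSegment.putInt(0, 42);
        System.out.println(heapSegment.getInt(0));   // 42

        // expose a slice of the segment as a ByteBuffer (the wrap() method mentioned above)
        ByteBuffer view = heapSegment.wrap(0, 8);
        System.out.println(view.remaining());        // 8

        // an unpooled segment allocated by the factory
        MemorySegment unpooled = MemorySegmentFactory.allocateUnpooledSegment(128);
        System.out.println(unpooled.size());         // 128
        // off-heap segments are built from direct ByteBuffers, as the NetworkBufferPool
        // constructor further below does with MemorySegmentFactory.wrapPooledOffHeapMemory(...)
    }
}
```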
Let's look at HybridMemorySegment's constructors:
HybridMemorySegment(ByteBuffer buffer, Object owner) {
super(checkBufferAndGetAddress(buffer), buffer.capacity(), owner);
this.offHeapBuffer = buffer;
}
HybridMemorySegment(byte[] buffer, Object owner) {
super(buffer, owner);
this.offHeapBuffer = null;
}
In the first constructor, ```checkBufferAndGetAddress()``` obtains the memory address of the direct buffer, which is what makes operating on off-heap memory possible.
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private volatile int refCnt = 1;
......
}

To make ```NetworkBuffer``` easier to manage, Flink provides a ```BufferPoolFactory``` with a single implementation, ```NetworkBufferPool```; this is an application of the factory pattern.
There is only one NetworkBufferPool per TaskManager, and it manages the memory of all subtasks on it. When instantiated, it immediately tries to grab all the memory it will ever manage (for heap memory, grabbing everything up front so it ends up in the old generation, while user objects live and die in the young generation, greatly reduces full GCs). Let's look at its constructor:
public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
......
try {
this.availableMemorySegments = new ArrayBlockingQueue<>(numberOfSegmentsToAllocate);
}
catch (OutOfMemoryError err) {
throw new OutOfMemoryError("Could not allocate buffer queue of length "
+ numberOfSegmentsToAllocate + " - " + err.getMessage());
}
try {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
}
}
......
long allocatedMb = (sizeInLong * availableMemorySegments.size()) >> 20;
LOG.info("Allocated {} MB for network buffer pool (number of memory segments: {}, bytes per segment: {}).",
allocatedMb, availableMemorySegments.size(), segmentSize);
}
Besides handing out segments, the NetworkBufferPool also rebalances the available segments among the LocalBufferPools created from it, in redistributeBuffers():
private void redistributeBuffers() throws IOException {
assert Thread.holdsLock(factoryLock);
// All buffers, which are not among the required ones
final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
for (LocalBufferPool bufferPool : allBufferPools) {
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments());
}
return;
}
long totalCapacity = 0; // long to avoid int overflow
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
totalCapacity += Math.min(numAvailableMemorySegment, excessMax);
}
// no capacity to receive additional buffers?
if (totalCapacity == 0) {
return; // necessary to avoid div by zero when nothing to re-distribute
}
final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
Math.min(numAvailableMemorySegment, totalCapacity));
long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
// shortcut
if (excessMax == 0) {
continue;
}
totalPartsUsed += Math.min(numAvailableMemorySegment, excessMax);
final int mySize = MathUtils.checkedDownCast(
memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);
numDistributedMemorySegment += mySize;
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
assert (totalPartsUsed == totalCapacity);
assert (numDistributedMemorySegment == memorySegmentsToDistribute);
}
Each task obtains its buffers through a LocalBufferPool; its key fields are:
/** the minimum number of memory segments this pool needs */
private final int numberOfRequiredMemorySegments;
/**
 * among the segments currently obtained, the empty ones into which no data has been written yet
 */
private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
/**
 * all registered listeners that watch for buffer availability
 */
private final ArrayDeque<BufferListener> registeredListeners = new ArrayDeque<>();
/** the maximum number of segments that can be allocated to this pool */
private final int maxNumberOfMemorySegments;
/** the current pool size */
private int currentPoolSize;
/**
 * the number of segments allocated via the NetworkBufferPool that this pool currently references (not directly obtained)
 */
private int numberOfRequestedMemorySegments;
Following on from NetworkBufferPool's redistribution method, let's look at LocalBufferPool's ```setNumBuffers()``` method; the code is short and the logic is simple, so I won't expand on it:
public void setNumBuffers(int numBuffers) throws IOException {
synchronized (availableMemorySegments) {
checkArgument(numBuffers >= numberOfRequiredMemorySegments,
"Buffer pool needs at least %s buffers, but tried to set to %s",
numberOfRequiredMemorySegments, numBuffers);
if (numBuffers > maxNumberOfMemorySegments) {
currentPoolSize = maxNumberOfMemorySegments;
} else {
currentPoolSize = numBuffers;
}
returnExcessMemorySegments();
// If there is a registered owner and we have still requested more buffers than our
// size, trigger a recycle via the owner.
if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
owner.releaseMemory(numberOfRequestedMemorySegments - numBuffers);
}
}
}
On the sending side, records reach the RecordWriter, whose sendToTarget() serializes them into buffers taken from these pools:
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
// pipeline).
if (result.isFullRecord()) {
break;
}
}
BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
}
- When data becomes available, the downstream operator wakes up from blocking, takes the buffer out of the InputChannel, deserializes it back into records, and hands them to the operator so the user code can run

Those are the steps by which data is passed between operators on different machines.
With the steps in mind, let's look at some of the key code.
First, the record is handed to the RecordWriter:

//RecordWriterOutput.java
@Override
public void collect(StreamRecord<OUT> record) {
if (this.outputTag != null) {
// we are only responsible for emitting to the main input
return;
}
//here you can see the record being handed to the recordWriter
pushToRecordWriter(record);
}
The RecordWriter then sends the record to the appropriate channel:

//RecordWriter.java
public void emit(T record) throws IOException, InterruptedException {
//here the ChannelSelector comes into play
for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
sendToTarget(record, targetChannel);
}
}
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
//pick the serializer and serialize the record
RecordSerializer<T> serializer = serializers[targetChannel];
SerializationResult result = serializer.addRecord(record);
while (result.isFullBuffer()) {
if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
// pipeline).
if (result.isFullRecord()) {
break;
}
}
BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
//写入channel
result = serializer.continueWritingWithNextBufferBuilder(bufferBuilder);
}
checkState(!serializer.hasSerializedData(), "All data should be written at once");
if (flushAlways) {
targetPartition.flush(targetChannel);
}
}
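A quick aside on the ```channelSelector``` used in ```emit()``` above: it decides which downstream channel(s) a record is routed to. The snippet below is only a simplified sketch of the two most common routing strategies, not Flink's real implementations (those are the stream partitioners such as KeyGroupStreamPartitioner and RebalancePartitioner):
//ChannelSelection.java (illustrative sketch, not Flink source)
public final class ChannelSelection {
    /** keyBy-style routing: the same key always lands on the same channel. */
    public static int byKeyHash(Object key, int numChannels) {
        return (key.hashCode() & 0x7fffffff) % numChannels;
    }
    /** rebalance-style routing: plain round-robin over all channels. */
    public static int roundRobin(int previousChannel, int numChannels) {
        return (previousChannel + 1) % numChannels;
    }
    public static void main(String[] args) {
        int channels = 4;
        System.out.println("\"flink\" -> channel " + byKeyHash("flink", channels));
        System.out.println("\"flink\" -> channel " + byKeyHash("flink", channels)); // same channel again
        System.out.println("after channel 3, round-robin picks: " + roundRobin(3, channels)); // 0
    }
}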
Next, the data is pushed down to the underlying transport (Netty):
//ResultPartition.java
@Override
public void flushAll() {
for (ResultSubpartition subpartition : subpartitions) {
subpartition.flush();
}
}
//PartitionRequestQueue.java
void notifyReaderNonEmpty(final NetworkSequenceViewReader reader) {
//这里交给了netty server线程去推
ctx.executor().execute(new Runnable() {
@Override
public void run() {
ctx.pipeline().fireUserEventTriggered(reader);
}
});
}
The Netty-related part:
//AbstractChannelHandlerContext.java
public ChannelHandlerContext fireUserEventTriggered(final Object event) {
if (event == null) {
throw new NullPointerException("event");
} else {
final AbstractChannelHandlerContext next = this.findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeUserEventTriggered(event);
} else {
executor.execute(new OneTimeTask() {
public void run() {
next.invokeUserEventTriggered(event);
}
});
}
return this;
}
}
Finally, the actual write:
//PartitionRequestQueue.java
private void enqueueAvailableReader(final NetworkSequenceViewReader reader) throws Exception {
if (reader.isRegisteredAsAvailable() || !reader.isAvailable()) {
return;
}
// Queue an available reader for consumption. If the queue is empty,
// we try trigger the actual write. Otherwise this will be handled by
// the writeAndFlushNextMessageIfPossible calls.
boolean triggerWrite = availableReaders.isEmpty();
registerAvailableReader(reader);
if (triggerWrite) {
writeAndFlushNextMessageIfPossible(ctx.channel());
}
}
private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
......
next = reader.getNextBuffer();
if (next == null) {
if (!reader.isReleased()) {
continue;
}
markAsReleased(reader.getReceiverId());
Throwable cause = reader.getFailureCause();
if (cause != null) {
ErrorResponse msg = new ErrorResponse(
new ProducerFailedException(cause),
reader.getReceiverId());
ctx.writeAndFlush(msg);
}
} else {
// This channel was now removed from the available reader queue.
// We re-add it into the queue if it is still available
if (next.moreAvailable()) {
registerAvailableReader(reader);
}
BufferResponse msg = new BufferResponse(
next.buffer(),
reader.getSequenceNumber(),
reader.getReceiverId(),
next.buffersInBacklog());
if (isEndOfPartitionEvent(next.buffer())) {
reader.notifySubpartitionConsumed();
reader.releaseAllResources();
markAsReleased(reader.getReceiverId());
}
// Write and flush and wait until this is done before
// trying to continue with the next buffer.
channel.writeAndFlush(msg).addListener(writeListener);
return;
}
......
}
The ```channel.writeAndFlush(msg)``` call in the second method above is where the data is actually handed to the Netty channel. Where there is a write there must be a read: the other end of the NIO channel has to read the buffer back in. The code is as follows:
//CreditBasedPartitionRequestClientHandler.java
private void decodeMsg(Object msg) throws Throwable {
final Class<?> msgClazz = msg.getClass();
// ---- Buffer --------------------------------------------------------
if (msgClazz == NettyMessage.BufferResponse.class) {
NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;
RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
if (inputChannel == null) {
bufferOrEvent.releaseBuffer();
cancelRequestFor(bufferOrEvent.receiverId);
return;
}
decodeBufferOrEvent(inputChannel, bufferOrEvent);
}
......
}
After ```decodeBufferOrEvent()``` hands the buffer to the input channel, its bytes are eventually deserialized back into a StreamElement (a record, a watermark, and so on) by the StreamElementSerializer:
//StreamElementSerializer.java
public StreamElement deserialize(DataInputView source) throws IOException {
int tag = source.readByte();
if (tag == TAG_REC_WITH_TIMESTAMP) {
long timestamp = source.readLong();
return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
}
else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
return new StreamRecord<T>(typeSerializer.deserialize(source));
}
else if (tag == TAG_WATERMARK) {
return new Watermark(source.readLong());
}
else if (tag == TAG_STREAM_STATUS) {
return new StreamStatus(source.readInt());
}
else if (tag == TAG_LATENCY_MARKER) {
return new LatencyMarker(source.readLong(), new OperatorID(source.readLong(), source.readLong()), source.readInt());
}
else {
throw new IOException("Corrupt stream, found tag: " + tag);
}
}
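The ```deserialize()``` method above implies the wire format: one tag byte followed by the payload. Purely as an illustration, here is a self-contained sketch of the symmetric write side; the tag values are placeholders, and the real logic lives in ```StreamElementSerializer.serialize()```:
//StreamElementFormatSketch.java (illustrative sketch, not Flink source)
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class StreamElementFormatSketch {
    // Placeholder tag values; the real constants are private to StreamElementSerializer.
    private static final byte TAG_REC_WITHOUT_TIMESTAMP = 0;
    private static final byte TAG_REC_WITH_TIMESTAMP = 1;
    private static final byte TAG_WATERMARK = 2;

    /** Encodes a record: tag byte, optional timestamp, then the value's serialized bytes. */
    public static byte[] encodeRecord(byte[] serializedValue, Long timestampOrNull) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        if (timestampOrNull != null) {
            out.writeByte(TAG_REC_WITH_TIMESTAMP);
            out.writeLong(timestampOrNull);
        } else {
            out.writeByte(TAG_REC_WITHOUT_TIMESTAMP);
        }
        out.write(serializedValue); // payload produced by the record's TypeSerializer
        return bytes.toByteArray();
    }

    /** Encodes a watermark: tag byte plus its timestamp. */
    public static byte[] encodeWatermark(long timestamp) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(TAG_WATERMARK);
        out.writeLong(timestamp);
        return bytes.toByteArray();
    }
}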
The deserialized records are then picked up and processed in the ```StreamInputProcessor.processInput()``` loop. This completes the walkthrough of how data flows between operators across JVM boundaries.
6.3 Credit漫谈
While reading the code in the previous part, you may have noticed a small detail: the sending side lives in PartitionRequestQueue.java, while the receiving side has a seemingly unrelated name, CreditBasedPartitionRequestClientHandler.java. Why the CreditBased prefix?
6.3.1 背压问题
In the streaming model we would like data to flow through the engine as smoothly as water, but reality is rarely that kind. For all sorts of reasons the upstream data volume can surge far beyond what the downstream can handle at that moment (recall the great flood of 1998), and the system collapses.
How should the framework respond? Much as humans deal with such disasters by building the Three Gorges Dam and impounding the flood water behind it, Flink places buffer pools at both the sending and the receiving end of the data to buffer it, together with a gate that stops data from rushing further downstream.
So how exactly does Flink handle backpressure? The answer is: through these buffer pools.
![image_1cfksrl5cd4m1lbqqqgvc811349.png-43.1kB][30]
This diagram roughly shows how Flink produces and consumes data. Both the ResultPartition (on the output side) and the InputGate (on the input side) request MemorySegments from the NetworkBufferPool to serve as their buffers.
What happens next is very similar to a classic producer-consumer setup. When too much data is sent and the downstream cannot keep up, the InputChannel fills up first, then the memory the InputChannel is allowed to request reaches its cap, so the downstream stops reading data. The Netty server responsible for sending on the upstream side notices this and stops reading buffers from the ResultSubpartition; the ResultPartition in turn soon fills up with data that cannot be consumed, and the data-producing logic blocks on acquiring a new buffer. Backpressure emerges entirely naturally.
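To see why bounded buffer pools alone are enough to produce this effect, here is a minimal plain-Java sketch (not Flink code): a bounded queue plays the role of the buffer pool, and once it is full the fast producer blocks on ```put()``` and is automatically throttled to the slow consumer's pace.
//BoundedBufferBackpressureDemo.java (illustrative sketch, not Flink source)
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BoundedBufferBackpressureDemo {
    public static void main(String[] args) throws Exception {
        // Plays the role of the bounded buffer pool between two tasks.
        BlockingQueue<Integer> buffers = new ArrayBlockingQueue<>(8);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    buffers.put(i); // blocks as soon as all "buffers" are in use
                    System.out.println("produced " + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 100; i++) {
                    Integer record = buffers.take();
                    TimeUnit.MILLISECONDS.sleep(10); // deliberately slow consumer
                    System.out.println("consumed " + record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}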
Flink自己做了个试验用以说明这个机制的效果:
![image_1cfkta54rkdd1od4aau1e3n7nhm.png-240.6kB][31]
In the experiment, the producer is first limited to 60% of its maximum send rate, and the downstream operator processes data at the same rate. The processing speed of the downstream operator is then lowered to 30%, and the producer's output curve can be seen dropping almost in lockstep with the consumer's. Once the limit is removed, the whole pipeline immediately speeds back up to 100%.
6.3.2 使用Credit实现ATM网络流控
As mentioned above, a naive idea for flow control is to build a dam on the link to hold traffic back (much like building the Three Gorges Dam on the Yangtze), as shown below:
![image_1cfku114lf7hpqf3lmcl0116c13.png-22.7kB][32]
基于Credit的流控就是这样一种建立在信用(消费数据的能力)上的,面向每个虚链路(而非端到端的)流模型,如下图所示:
![image_1cfku4g4g174d7gb5ecbfcib71g.png-22.5kB][33]
首先,下游会向上游发送一条credit message,用以通知其目前的信用(可联想信用卡的可用额度),然后上游会根据这个信用消息来决定向下游发送多少数据。当上游把数据发送给下游时,它就从下游的信用卡上划走相应的额度(credit balance):
![image_1cfkug5sm1v4l15pbgj4jntc7q1t.png-12.9kB][34]
下游总共获得的credit数目是Buf_Alloc,已经消费的数据是Fwd_Cnt,上游发送出来的数据是Tx_Cnt,那么剩下的那部分就是Crd_Bal:
Crd_Bal = Buf_Alloc - ( Tx_Cnt - Fwd_Cnt )
上面这个式子应该很好理解。
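To make the bookkeeping concrete, here is a tiny Java sketch with made-up names (it is not Flink's API): with Buf_Alloc = 4 and nothing consumed yet, the fifth send is refused because Crd_Bal = 4 - (4 - 0) = 0.
//CreditAccounting.java (illustrative sketch, not Flink source)
public class CreditAccounting {
    private final int bufAlloc; // Buf_Alloc: buffers granted by the receiver
    private int txCnt;          // Tx_Cnt:    buffers the sender has sent
    private int fwdCnt;         // Fwd_Cnt:   buffers the receiver has consumed/forwarded

    public CreditAccounting(int bufAlloc) {
        this.bufAlloc = bufAlloc;
    }

    /** Crd_Bal = Buf_Alloc - (Tx_Cnt - Fwd_Cnt) */
    public int creditBalance() {
        return bufAlloc - (txCnt - fwdCnt);
    }

    /** Sender side: only send while there is credit left. */
    public boolean trySend() {
        if (creditBalance() <= 0) {
            return false; // no credit: the sender must wait for the next credit message
        }
        txCnt++;
        return true;
    }

    /** Receiver side: consuming a buffer frees it and eventually announces new credit. */
    public void acknowledgeConsumed() {
        fwdCnt++;
    }

    public static void main(String[] args) {
        CreditAccounting link = new CreditAccounting(4); // Buf_Alloc = 4
        for (int i = 1; i <= 6; i++) {
            System.out.println("send #" + i + " -> " + link.trySend()
                    + ", Crd_Bal = " + link.creditBalance());
        }
        // After 4 successful sends the balance is 0, so sends 5 and 6 are rejected.
    }
}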
可以看到,Credit Based Flow Control的关键是buffer分配。这种分配可以在数据的发送端完成,也可以在接收端完成。对于下游可能有多个上游节点的情况(比如Flink),使用接收端的credit分配更加合理:
![image_1cfkvpmlh1gl31ef41cvh1c903a19.png-13.1kB][35]
In the figure above, the receiver can observe the bandwidth of each upstream connection, whereas an upstream node such as Snd1 cannot easily know the bandwidth of another sender (Snd2) feeding the same downstream node. Controlling the flow on the upstream side would therefore be difficult, while controlling it on the downstream side is easy. That is why Flink has a credit-based handler on the receiving side rather than some "credit server" on the sending side.
最后,再讲一下Credit的面向虚链路的流设计和端到端的流设计的区别:
![image_1cfl05d2f1ub879c1lc5qsq14n9m.png-13.4kB][36]
如上图所示,a是面向连接的流设计,b是端到端的流设计。其中,a的设计使得当下游节点3因某些情况必须缓存数据暂缓处理时,每个上游节点(1和2)都可以利用其缓存保存数据;而端到端的设计b里,只有节点3的缓存才可以用于保存数据(读者可以从如何实现上想想为什么)。
对流控制感兴趣的读者,可以看这篇文章:[Traffic Management For High-Speed Networks][37]。
7.其他核心概念
截至第六章,和执行过程相关的部分就全部讲完,告一段落了。第七章主要讲一点杂七杂八的内容,有时间就不定期更新。
7.1 EventTime时间模型
flink有三种时间模型:ProcessingTime,EventTime和IngestionTime。
关于时间模型看这张图:
![image_1cdbotdcmoe11q961st5lbn1j4n9.png-38.4kB][38]
从这张图里可以很清楚的看到三种Time模型的区别。
- EventTime是数据被生产出来的时间,可以是比如传感器发出信号的时间等(此时数据还没有被传输给flink)。
- IngestionTime是数据进入flink的时间,也就是从Source进入flink流的时间(此时数据刚刚被传给flink)
- ProcessingTime是针对当前算子的系统时间,是指该数据已经进入某个operator时,operator所在系统的当前时间
例如,我在写这段话的时间是2018年5月13日03点47分,但是我引用的这张EventTime的图片,是2015年画出来的,那么这张图的EventTime是2015年,而ProcessingTime是现在。
The official Flink documentation explains timestamps and watermarks in great detail.
Flink's implementation of the EventTime model relies on objects called watermarks. A watermark is an object carrying a timestamp that is inserted into the data stream as the program requires; it signals that no elements with timestamps earlier than the watermark are expected to follow.
我再做一点简短的说明,还是以官网的图为例:
![image_1cdbt8v5jl2ujn91uu1joh1p4gm.png-11.3kB][39]
For data that arrives in order, suppose we insert a watermark with timestamp 11 right after the element whose timestamp is 11; when the downstream operator receives that watermark, it may assume that every element earlier than 11 has already arrived. This is the ideal case.
![image_1cdbtcc5c1a6i1tuaadb1rd5136913.png-11.6kB][40]
In real life, however, data often arrives out of order. Here, even though the element with timestamp 11 shows up right after the element with timestamp 7, we wait until element 12 has arrived before inserting the watermark for time 11. Compared with the figure above, if we had still inserted watermark 11 immediately after element 11, element 9 would have been discarded and data would have been lost; by inserting it after element 12, we guarantee that element 9 is still processed by the next operator. Of course, we cannot wait for late elements forever, so where to insert the watermark is a trade-off that has to be made for the actual scenario.
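As a concrete illustration of how a job opts into event time and decides how long to wait for late elements, here is a minimal sketch against the 1.5-era DataStream API; the (word, timestamp) schema and the 3-second bound are assumptions made up for the example. Timestamps are taken from the records themselves, and the generated watermarks trail the largest timestamp seen so far by the allowed out-of-orderness.
//EventTimeExample.java (illustrative sketch)
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Switch the job from the default ProcessingTime model to EventTime.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Assume events of the form (word, eventTimestampMillis).
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1000L), Tuple2.of("b", 4000L), Tuple2.of("c", 2000L));

        // Extract the event timestamp from each record and emit watermarks that
        // trail the largest seen timestamp by 3 seconds, i.e. tolerate elements
        // arriving up to 3 seconds out of order.
        DataStream<Tuple2<String, Long>> withTimestamps = events
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(3)) {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element) {
                                return element.f1;
                            }
                        });

        withTimestamps.print();
        env.execute("event time example");
    }
}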
对于来自多个数据源的watermark,可以看这张图:
![image_1cdbufp4a1opmsit5n61mial4520.png-72kB][41]
As the figure shows, when an operator receives watermarks from multiple inputs, it follows the minimum (that is, earliest) principle: the operator's current watermark is the minimum of the watermarks last received on its input channels. This is what lets out-of-order data coming from different sources be tolerated.
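A simplified sketch of that bookkeeping, assuming one watermark value tracked per input channel (Flink's real logic lives in StatusWatermarkValve, which additionally handles idle channels and stream status):
//MinWatermarkTracker.java (illustrative sketch, not Flink source)
public class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public MinWatermarkTracker(int numberOfChannels) {
        channelWatermarks = new long[numberOfChannels];
        java.util.Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the operator's watermark after seeing a watermark on one channel. */
    public long onWatermark(int channel, long watermark) {
        // A channel's watermark never goes backwards.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator's watermark is the minimum across all input channels.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > currentWatermark) {
            currentWatermark = min; // only now may the watermark be forwarded downstream
        }
        return currentWatermark;
    }
}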
关于事件时间模型,更多内容可以参考Stream 101 和谷歌的这篇论文:Dataflow Model paper
7.2 FLIP-6 部署及处理模型演进
就在老白写这篇blog的时候,Flink发布了其1.5 RELEASE版本,号称实现了其部署及处理模型(也就是FLIP-6),所以打算简略地说一下FLIP-6的主要内容。
7.2.1 现有模型不足
1.5之前的Flink模型有很多不足,包括:
- 只能静态分配计算资源
- 在YARN上所有的资源分配都是一碗水端平的
- 与Docker/k8s的集成非常之蠢,颇有脱裤子放屁的神韵
- JobManager没有任务调度逻辑
- 任务在YARN上执行结束后web dashboard就不可用
- 集群的session模式和per job模式混淆难以理解
就我个人而言,我觉得Flink有一个这里完全没提到的不足才是最应该修改的:针对任务的完全的资源隔离。尤其是如果用Standalone集群,一个用户的task跑挂了TaskManager,然后拖垮了整个集群的情况简直不要太多。
7.2.2 核心变更
Single Job JobManager
最重要的变更是一个JobManager只处理一个job。当我们生成JobGraph时就顺便起一个JobManager,这显然更加自然。
ResourceManager
其职责包括获取新的TM和slot,通知失败,释放资源以及缓存TM以用于重用等。重要的是,这个组件要能做到挂掉时不要搞垮正在运行的好好的任务。其职责和与JobManager、TaskManager的交互图如下:
![image_1cfl9453k1gld4acr1m13j3195sg.png-23.9kB][42]
TaskManager
TM要与上面的两个组件交互。与JobManager交互时,要能提供slot,要能与所有给出slot的JM交互。丢失与JM的连接时要能试图把本TM上的slot的情况通告给新JM,如果这一步失败,就要能重新分配slot。
与ResourceManager交互时,要通知RM自己的资源和当前的Job分配情况,能按照RM的要求分配资源或者关闭自身。
JobManager Slot Pool
这个pool要持有所有分配给当前job的slot资源,并且能在RM挂掉的情况下管理当前已经持有的slot。
Dispatcher
需要一个Job的分发器的主要原因是在有的集群环境下我们可能需要一个统一的提交和监控点,以及替代之前的Standalone模式下的JobManager。将来对分发器的期望可能包括权限控制等。
![image_1cfl9ju2617bh1s191mar1jsp12vot.png-31.4kB][43]
7.2.3 Cluster Manager的架构
YARN
新的基于YARN的架构主要包括不再需要先在容器里启动集群,然后提交任务;用户代码不再使用动态ClassLoader加载;不用的资源可以释放;可以按需分配不同大小的容器等。其执行过程如下:
无Dispatcher时
![image_1cfla0n7u1lg21n3o36uu0c1o5h1a.png-46.2kB][44]
有Dispatcher时
![image_1cfla15os15i3qcsu6c4p4clk1n.png-50.7kB][45]
Mesos
与基于YARN的模式很像,但是只有带Dispatcher模式,因为只有这样才能在Mesos集群里跑其RM。
![image_1cfla4tka101n18bf1mno4npu9s24.png-49.2kB][46]
Mesos的Fault Tolerance是类似这样的:
![image_1cfla6eka1ph71mu1pll1q0mgqq2h.png-12.1kB][47]
必须用类似Marathon之类的技术保证Dispatcher的HA。
Standalone
其实没啥可说的,把以前的JobManager的职责换成现在的Dispatcher就行了。
![image_1cflaaim2ih2v54umsmq01lqc2u.png-36.8kB][48]
将来可能会实现一个类似于轻量级Yarn的模式。
Docker/k8s
用户定义好容器,至少有一个是job specific的(不然怎么启动任务);还有用于启动TM的,可以不是job specific的。启动过程如下
![image_1cflafs2o1trgicjmdbndn1bdq3b.png-24.2kB][49]
7.2.4 组件设计及细节
分配slot相关细节
从新的TM取slot过程:
![image_1cflakoadvjm8pf6nt1k331qj33o.png-77.2kB][50]
从Cached TM取slot过程:
![image_1cflambu91ufi5fl1cg9gimdff45.png-63.4kB][51]
Failure handling
TM failure
The RM must detect the failure, update its own state, notify the JM, and start a replacement TM; the JM must detect the failure, remove the lost slots from its state, mark that TM's tasks as failed, and scale the job down if there are no longer enough slots to continue; the lost tasks must be recoverable from their checkpoints.
RM failure
The TM must detect the failure, prepare to register itself with the new RM, and report its own resources to it; the JM must detect the failure, wait for the new RM to become available, and re-request the resources it needs; the lost bookkeeping data must be recoverable from the containers, TMs, and so on.
JM failure
The TM releases all of that job's tasks and registers its resources with the new JM; if that fails, it reports those resources to the RM as available for reallocation; the RM simply waits; the new JM recovers the lost data from persistent storage, restores completed checkpoints from HA storage, restarts the tasks from the most recent checkpoint, and requests resources again.
JM & RM failure
The TM tries for a period of time to hand its resources over to the newly elected JM; if that fails, it hands them to the new RM.
TM & RM failure
If the JM is in the middle of requesting resources, it has to wait until the new RM is up before it can get them; the JM may also need to rescale the job, since it has lost the failed TM's slots.
8.后记
Flink is an excellent framework in today's stream-processing landscape, and its design and implementation embody the accumulated wisdom of many contributors. This write-up took a long time and grew rather long, yet it still cannot cover every aspect of Flink, and it surely contains mistakes; corrections and criticism are very welcome! Chinese-language material in the Flink ecosystem is indeed scarce; readers interested in the Flink source code can continue their journey with [VinoYang的专栏][52].