Spark Streaming(以下简写SS)
Exactly once语义(以下简写EO)
首先EO表示可以精准控制到某一条记录,但由于SS是基于rdd和batch的,所以SS的EO可以认为是针对一个批次的的精准控制(控制各个批次间是否重复和漏读)。
涉及到三部分都保证 exactly once 的语义。
1、(数据源)上游是否EO到SS
2、(数据处理)SS作为整体是否保证了EO
3、(数据存储)SS是否将数据EO地写出到了下游 涉及到
上游是否EO到SS
对于 接收数据,主要取决于上游数据源的特性。例如,
从 HDFS 这类支持容错的文件系统中读取文件,能够直接支持 Exactly-once 语义。
如果上游消息系统支持 ACK(如RabbitMQ),我们就可以结合 Spark 的 Write Ahead Log 特性来实现 At-least-once 语义。
对于非可靠的数据接收器(如 socketTextStream),当 Worker 或 Driver 节点发生故障时就会产生数据丢失,提供的语义也是未知的。
而 Kafka 消息系统是基于偏移量(Offset)的,它的 Direct API 可以提供 Exactly-once 语义。
官方在创建 DirectKafkaInputStream(Kafka direct api)时只需要输入消费 Kafka 的 From Offset,然后其自行获取本次消费的 End Offset,也就是当前最新的 Offset。保存的 Offset 是本批次的 End Offset,下次消费从上次的 End Offset 开始消费。
当程序宕机或重启任务后,这其中存在一些问题。如果在数据处理完成前存储 Offset,则可能存在作业处理数据失败与作业宕机等情况,重启后会无法追溯上次处理的数据导致数据出现丢失。如果在数据处理完成后存储 Offset,但是存储 Offset 过程中发生失败或作业宕机等情况,则在重启后会重复消费上次已经消费过的数据。
而且此时又无法保证重启后消费的数据与宕机前的数据量相同数据相当,这又会引入另外一个问题,如果是基于聚合统计指标作更新操作,这会带来无法判断上次数据是否已经更新成功。
参考文章中给出的解决方案是,保证在创建 DirectKafkaInputStream 可以同时输入 From Offset 与 End Offset,并且我们在存储 Kafka Offset 的时候保存了每个批次的起始Offset 与结束 Offset。这样的设计使得后面用户在后面对于第一个批次的数据处理非常灵活可变:
1、如果用户直接忽略第一个批次的数据,那此时保证的是 at most once 的语义,因为我们无法获知重启前的最后一个批次数据操作是否有成功完成。
2、如果用户依照原有逻辑处理第一个批次的数据,不对其做去重操作,那此时保证的是 at least once 的语义,最终结果中可能存在重复数据;
3、最后如果用户想要实现 exactly once,muise spark core 提供了根据topic、partition 与 offset 生成 UID 的功能。只要确保两个批次消费的 Offset 相同,则最终生成的 UID 也相同,用户可以根据此 UID 作为判断上个批次数据是否有存储成功的依据。下面简单的给出了重启后第一个批次操作的行为。
参考:
http://www.10tiao.com/html/522/201809/2651425155/1.html
实际上如果是聚合操作,完全可以引入状态计算,而不需要修改源码。
或者将Offset,存储到redis中,每次存redis中读取,见其他文章https://blog.csdn.net/qq_32252917/article/details/78827126 。本文只对EO做讨论,状态计算在其他文章做介绍。EO只要明确保证能拿到上次成功结束的offset就可以了(保证数据零丢失),至于后面是否会被重复计算部分,可以根据业务做不同的处理(根据输出做幂等设计)。
控制offset实际上是解决数据丢失如下的主要场景:
SS在使用Receiver收到数据时(非Kafka direct api),通过Driver的调度,Executor开始计算数据的时候如果Driver突然奔溃(导致Executor会被Kill掉),此时Executor会被Kill掉,那么Executor中的数据就会丢失,此时就必须通过例如WAL机制让所有的数据通过类似HDFS的方式进行安全性容错处理,从而解决Executor被Kill掉后导致数据丢失可以通过WAL机制恢复回来。此时数据可以零丢失,但并不能保证Exactly Once,如果Receiver接收且保存起来后没来得及更新updateOffsets时,就会导致数据被重复处理。所以本文讨论的EO是用户自己能精准控制offset而非交给框架去处理。
SS处理数据是否保证了EO
在使用 Spark RDD 对数据进行 转换或汇总 时,我们可以天然获得 Exactly-once 语义,因为 RDD 本身就是一种具备容错性、不变性、以及计算确定性的数据结构。只要数据来源是可用的,且处理过程中没有副作用(Side effect),我们就能一直得到相同的计算结果。
SS内部的实现机制是spark core基于RDD模型的,RDD为保证计算过程中数据不丢失使用了checkpoint机制,也就是说其计算逻辑是RDD的变换过程,也就是DAG,可以在计算过程中的任何一个阶段(也就是这个阶段的RDD)上使用checkpoint方法,就可以保证当后续计算失败,可以从这个checkpoint重新算起,使得计算延续下去。当Spark Streaming场景下,其天然会进行batch操作,也就是说kafka过来的数据,每秒(一个固定batch的时间周期)会对当前kafka中的数据产生一个RDD,那么后续计算就是在这个RDD上进行的。只需要在kafkaRDD这个位置合理使用了checkpoint(这一点在前面已经讲过,可以保证)就能保证SS内部的Exactly once。
SS是否将数据EO地写出到了下游
结果输出 默认符合 At-least-once 语义,因为 foreachRDD 方法可能会因为 Worker 节点失效而执行多次,从而重复写入外部存储。我们有两种方式解决这一问题,幂等更新和事务更新。下面我们将深入探讨这两种方式。
参考:https://blog.csdn.net/qq_32252917/article/details/78827126
首先输出操作是具有At-least Once语义的,也就是说SS可以保证需要输出的数据一定会输出出去,只不过由于失败等原因可能会输出多次。那么如何保证Exactly once?
第一种“鸵鸟做法”,就是期望下游(数据)具有幂等特性。
比如相同数据写 hdfs 同一个文件,这本身就是幂等操作,保证了多次操作最终获取的值还是相同;HBase、ElasticSearch 与 redis 等都能够实现幂等操作。对于关系型数据库的操作一般都是能够支持事务性操作。
第二种使用事务更新,简要代码如下:1
2
3
4
5
6
7dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
这样保证同一个partition要么一起更新成功,要么一起失败,通过uniqueId来标识这一次的更新,这就要求下游支持事务机制。
如果不采用幂等或者事务。可以采用如下方案,除了数据主动(重启服务)出错外,还会遇到如下问题。
关于Spark Streaming数据输出多次重写及解决方案:
为什么会有这个问题,因为SparkStreaming在计算的时候基于SparkCore,SparkCore天生会做以下事情导致SparkStreaming的结果(部分)重复输出:
1.Task重试;
2.慢任务推测;
3.Stage重复;
4.Job重试;
会导致数据的丢失。
对应的解决方案:
1.一个任务失败就是job 失败,设置spark.task.maxFailures次数为1;
2.设置spark.speculation为关闭状态(因为慢任务推测其实非常消耗性能,所以关闭后可以显著的提高Spark Streaming处理性能)
3.Spark streaming on kafka的话,假如job失败后可以设置kafka的auto.offset.reset为largest的方式会自动恢复job的执行。
参考:
https://zybuluo.com/marlin/note/486917
http://spark.apache.org/docs/latest/streaming-programming-guide.html#semantics-of-output-operations
https://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/