《Streaming Systems》阅读记录系列
现在,我们将从变成模型及接口的讨论,转向对实现框架的分析。模型和接口可以让用户去描述计算,但实际计算过程则需要运行在系统,通常是分布式系统中。
在此章中,我们将着重笔描述如何实现Beam Model系统,以产生准确结果。流式系统经常会谈论exactly-once的处理,即是说,保证记录被精确无误地处理一次。我们会阐释这个观点,并解释它是如何实现的。
我们将以Google Cloud Dataflow为具体例子,关注其在保证高效的exactly-once处理中使用到的技术。在本章的末尾,我们也会介绍其它流行批处理框架的exactly-once保证机制。
Why Exactly Once Matters
对于许多用户而言,在数据处理过程中丢失记录或数据,从而产生的风险不言自明,并不可接受。即使这样,由于各种历史原因,许多通用流式处理框架依然并不具有数据处理保证机制——仅能尽力完成而已。另一些系统则提供at-least-once保证,即数据流经系统,总是保证至少会得到一次处理,这却会带来重复处理问题(导致在结果聚合中的不准确),在实际运用中,许多这种at-least-once系统会在内存中进行聚合操作,使得当机器崩溃的时候依然会产生数据丢失。这类系统常用于低延迟的、推导式的结果,但是一般来说对结果的正确性并不具有任何保证。
正如第一章所指出的,流式处理框架的这种问题,导致了Lambda架构的产生——部署一套流式处理系统,以获得低延迟,但不准确的结果,随后(通常在一天的数据进入系统后),用批处理系统来获得准确的结果。Lambda架构只能在数据流可重放时有用,幸运的是,有足够多的数据源可以通过这种策略得到支持。然而,大量的用户在实践Lambda架构时,会经历这样一些问题:
不准确
用户往往会低估失败的影响。他们通常会假设只有非常少的记录会丢失或重复(基于其运行经验),很难相信有一天这种故障率能达到10%或者更多。从某种意义上说,这类系统只提供“一半”的保证,如果不能提供完全的保证,任何异常都有可能产生。不一致
Lambda中的批处理计算经常会与流失计算的逻辑产生差异。因此想要让两个系统产生一致的结果实际要比最初设想的要难。复杂性
按照定义,Lambda架构要求使用者编写并维护两套不同的代码,维护两套不同的分布式复杂系统,各自又有不同的故障问题。这种考虑了所有情况的处理程序,缺少简洁性迅速成为了其崩溃的引子。不可预测
在很多使用案例中,数据使用者在任何时间查看流式程序的结果时,都会获得一个与日报数据(由批处理程序产生)有随机偏差的结果。这种情况下,用户将逐渐丧失对流式处理程序结果的信任,而等待每天的批处理程序结果,从而使得获取低延迟计算结果变得毫无意义。延迟
有些业务场景需要低延迟且准确的计算结果,而Lambda在设计之初即满足不了这种需要幸运的是,许多Beam的引擎在此问题上处理的更优秀。在本章中,我们将解释exactly-once流式处理是如何通过一套统一的编程接口,帮用户做准确的计算,避免数据丢失风险的。考虑到很多人会将一些影响处理结果的因素错误的归类到exactly-once中,我们首先会简单的解释,在Beam的处理环境中,那些exactly-once所包含及未包含的问题。
准确性与完整度
在Beam程序处理记录的过程中,我们都希望任意时刻都不会发生丢失或重复。但是流式程序的天然特点决定了记录可能会晚于其时间处理窗口。Beam的SDK可以允许用户配置系统等待延迟数据的最晚时间,当且仅当数据晚于截止时间时会被删除。这个特性使得系统具有完整性,而不是准确性:所有的记录及时到达的,会被准确的处理一次,而延迟的数据,则会被明确地删除。
尽管延迟数据经常在streaming的语义中被讨论,但是需要注意,批处理中也有相似的完整性问题。比如,一个常见的例子,批处理程序在凌晨2点运行任务,来处理前一天的所有数据。但是如果前一天的数据在任务运行前还收集到,则数据就不会被批处理程序处理。因此,批处理程序也是只提供精确,但不总是完整的数据。
副作用
Beam和Dataflow系统的一个特征是,用户可以插入自定义的代码来作为整体流程的一部分来执行。无论是流式处理还是批处理系统,Dataflow并不保证这些代码会被严格的执行一次。特定的记录可能被用户代码转换多次,或者,同一个记录可能同时被多个worker处理,这在保证at-least-once是必要的,因为可能存在执行worker的故障。最终只有允许产生一个结果并输出到下游。
因此,非幂等操作无法保证数据exactly-once语义。如果你的代码相对于处理程序有外部副作用,比如访问外部服务,这些副作用对于某些记录可能生效不止一次。这种情况通常无法避免,毕竟没有什么办法能让Dataflow自动提交处理结果到外部服务上。处理流程最终需要将处理结果输出到外部,这种输出可能是非幂等的。在本章余下内容中可以看到,这些sink通常可以添加一些额外的阶段来重建服务调用以允许幂等操作。
问题界定
以上的几个例子都不属于我们讨论的问题范围。那么到底什么才是我们所说的exactly-once处理?我们以下面的流式处理程序为例,来阐述一下。
1 | Pipeline p = Pipeline.create(options); |
这个程序会在两个不同的窗口上进行聚合。第一个计算每个独立用户的每分钟事件数,第二个计算每分钟所有用户的事件数和。每个聚合结果都会被写入到一个不确定的sink上。
Dataflow中会在许多不同worker中并行执行程序,在每个GroupByKey
操作后(Count
操作底层使用了GroupByKey
),所有包含相同key的记录会被同一个worker处理,这个过程称为shuffle。Dataflow的worker在彼此间shuffle时会调用RPCs,保证某个key的所有记录最终会汇聚到同一个worker中。
下图表示Dataflow为例5-1创建的shuffle过程。Count.perKey
shuffle会将每个用户的数据传输到对应worker上,而Count.globally
则将所有worker中计算得到的部分统计结果shuffle到一个worker上来计算和。
为了让Dataflow能精确的处理数据,shuffle过程需要保证每个记录被准确的shuffle一次,你会发现,分布式系统的shuffle特点会让这种需求变得很有挑战。
由于程序兼具从外部读写数据,Dataflow必须保证这个交互过程不会引入任何的不准确。好在Dataflow自始至终都支持这种要求,无论是source还是sink,在任何时刻都是如此,这也是Apach Spark和Apach Flink中所谓的端到端的exactly-once。
本章中,我们将讨论以下三个主题:
- Shuffle:Dataflow如何保证每个记录被shuffle exactly-once
- Source:Dataflow如何保证每个记录在souce中被处理exactly-once
- Sinks:Dataflow如何保证每个sink产生精确输出
Shuffle中的Exactly-once
上文提到,Dataflow中shuffle使用RPCs,任何时候只要存在两个worker进行RPC通讯,都需要时刻考虑数据的完整性。首先,RPCs可能由于各种原因调用失败,比如网络故障,RPC在完成调用前产生超时,或接受服务器可能主动拒绝服务。为了保证记录不会在shuffle中丢失,Dataflow提供了上游的数据备份,简单来说,发送方会重试RPCs直至接收到接收放的ack确认,即使发送方发生故障。这种机制保证Dataflow中的数据一定会至少提交一次。
但是,另一个问题出现了,多次的重试可能导致数据的重复。大多数的RPC框架,包括Dataflow所使用的,都会提供给发送方一个状态码以表示成功或失败。但是,需要清楚的是,在分布式的系统中,有时RPCs会出现表现失败实际成功的状态。有很多原因可能造成这种情况,比如出现RPC超时的竞争条件,服务端成功的ack传输失败,等等。只有服务方接收到的成功状态才是确定的调用成功。
RPC调用时返回的错误状态,表明调用处于成功或失败的模糊结果。尽管有些错误码表明的是确切的错误发生,很多常见的RPC调用,比如超时,都是模棱两可的。在流式处理中shuffle中,重试成功的RPC意味着对记录的重复提交,Dataflow需要一些其它机制来检测和消除这种重复。
在较高的层次上,算法用于解决这种问题的手段是比较直观的(如下图):每个消息传输时都被赋予一个唯一标记。接收者会保存所有已经发送和处理的标记的记录表。每次记录被接受到时,其标记会与记录表做对比,重复的则删除。Dataflow的记录表是建立在一个伸缩性的键-值存储上的,这个存储结构可以用于保存一个不重复的记录表。
原因探讨
然而,使这个策略变成现实需要考虑诸多问题。其中一个直接问题是,Beam Model允许用户代码产生不确定的输出,意味着像ParDo
操作可能在同一个记录上执行两次(由于重试),但是在每次执行时会产生不同输出。需要仅其中一次会被提交给下游过程。但是不确定输出的引入,导致无法保证两次输出产生相同的确定性ID。更麻烦的是,Pardo
甚至可能产生多个输出,也就是说,多次重试时每次重试产生的输出数量也是不确定的。
那么,为何我们不直接要求所有用户代码的处理结果是确定的呢?实际使用经验表明,许多处理过程确实需要不确定的转换,而且大多数情况下,开发者们并没有意识到他们的代码是不确定的。比如,通过查询不断增长的Bigtable中的数据以补充输入数据,这是种不确定的转换任务,因为外部数据在多次重试中会发生变化;任何依赖当前时间的代码也是不确定的;或者某些转换需要依赖某些随机数生成器,即使我们的代码是完全确定的,任何允许延迟到达的基于事件时间的聚合都可能具有不确定的输入。
Dataflow通过使用checkpoint来高效的将不确定任务转换成确定过程。每次处理的数据都会连同其唯一ID保存到一个稳定的存储中,然后发送给下个处理过程。每次在shuffle传输过程中进行重试时都会将已存档的输出重放,而用户的不确定性代码不会再重试。换句话说,用户的代码可能会运行多次来竞争输出,但只有一次能最终“获胜”。此外,Dataflow提供一个一致性机制来阻止重复的数据写入到稳定存储。
性能
为了实现extactly-once shuffle的递交,每个接收者都会保存一份接收数据的ID字典,Dataflow通过对照每次接收到的数据与字典,来确定数据是否重复。在每个过程中产生的输出都会被保存到存储,这保证了生成的记录的ID都是稳定的。
然而,除非实现的非常精巧,否则这个过程中在读写上的巨大开销可能会严重的影响处理程序的性能。因此,为了让extractly-once处理具有可行性,Dataflow的I/O开销必须降低,通常典型的做法是,防止每个记录都进行I/O操作。
Dataflow使用了两个技术来达到降低I/O的目的:图优化及布隆过滤器
图优化
Dataflow在执行任务之前,会针对任务生成的图进行一系列优化。比如说,聚合优化,可将多个逻辑步骤合并成一个执行步骤。如下图描述了一些典型的聚合优化的例子
这些被合并的过程会被作为一个整体在执行单元内运行,因此无需对每个过程的结果保存extraly-once数据。在大量的例子中,聚合可以将整个任务的图精简成几个实际执行过程,大大减少了数据的传输(以及数据状态的保存等)。
Dataflow还优化了关联和交换的组合操作(比如Count和Sum),通过在节点上进行局部的合并,然后传输到全局合并过程(如下图),也可以大大减少消息的传输数量,从而减少读写。
布隆过滤器
上述所提到的技术主要目的都并非为了提高exactly-once的性能,无心插柳而已。而布隆过滤器则是确切的面向提高extractly-once处理性能的技术。
在一个正常的处理过程中,大多数到达的记录都不会重复。因此,我们可以使用这种特点,利用布隆过滤器(布隆过滤器是种压缩的数据结构,提供快速的集合查询操作)来极大的提高性能。布隆过滤器对我们而言很有个很有意思的特点:返回假阳性(FP),不返回假阴性(FN),即对存在值误报不漏报,对于任何一次查询,布隆过滤器返回两个结果:查询可能在集合中,查询一定不在集合中。使用布隆过滤器能帮我们快速过滤大量负查询,对于目前的需求来说,是非常合适的。
Dataflow中对于布隆过滤器的实现是这样的:每个工作节点为每个处理过的ID保存一个布隆过滤条件,每次处理新的记录ID时,先查询布隆过滤器,如果布隆过滤器返回false,则记录不重复,工作节点可以跳过昂贵的持久存储查询过程;只有当布隆过滤器返回true时,才会触发第二次查询,但,只要过滤器的假阳率足够低,则可以提供非常可观的优化效果。
然而,布隆过滤器的过滤池会随着时间流逝逐渐填满,假阳率会逐渐增加。同时,当worker重启的时候,还需要根据存储介质中的ID数据,来重建布隆过滤器。Dataflow在每个记录中会附带一个系统时间戳,服务会每隔10分钟会单独创建一个布隆过滤器,而不是维持全局的过滤器。这个过程能防止过滤池饱和,随着时间推移,过滤器会被垃圾回收处理,同时还能有效限制启动时需要扫描的数据量。
下图描述的即是这个过程:记录到达时,会根据其系统时间将其分配给所属时间片的布隆过滤器,没有命中任何过滤器过滤条件的为不重复的。比如记录R1是第二次被分配,可以确定其确实是重复的,但是由于布隆过滤器的特点(FP而不是FN),需要查找ID目录确定是否真的不重复,R4、R6也同理。而R8是不重复的。
垃圾回收
每个Dataflow的worker都会保存一份其已经接收的记录ID的列表目录,对于整个Dataflow的状态和一致性模型来说,即是保存每个以此ID标记的已处理消息的目录。当然我们没有办法以有限的介质,来存储所有的这些记录,因此需要一种内存回收机制,来对ID目录所占用的空间进行处理。
其中一个有效的策略是,让发送者为每个消息标记一个单调递增的序列号,用于追踪还在发送中的最早序列号(也就是还未被确认的发送消息)。所有在列表目录中的早于此序列号的ID,都可被垃圾回收处理掉(显然他们都已经被确认过了)。
我们还有一个更好的策略,正如前面所讲,Dataflow已经对每个接收的消息标记了系统时间戳,用于布隆过滤锁的高性能实现。所以,可以基于这个系统时间(其实也就是处理时间)来计算用于垃圾回收的时间水位,而不是使用单调递增序列号。这样做还有一个优点是,可以通过对某个物理stage的实际等待时间来计算watermark,从而反映出整个流程中的瓶颈。这个元数据也是Dataflow的WebUI中系统延迟测量的基础。
如果到达的记录的系统时间晚于已被垃圾回收处理的watermark,又会怎样?(这可能由于我们所说的网络残余所造成,即旧的消息在网络中无限期卡住,然后突然又出现。)但是,用于触发垃圾回收的低水位并不会移动,直到所有的消息都被确认,因此我们确认这种记录(到达的记录的系统时间晚于已被垃圾回收处理的watermark)已经被成功处理过了,所以网络残余并不会影响Shuffle的exactly-once语义。
Sources中的Exactly-once
Beam提供用于读入数据到Dataflow程序中的数据源(source) API。在数据处理失败时,Dataflow可能会重新从source中读取,因此也需要某种机制来保证source产生的每条独立记录被准确的exactly-once处理。
大多数Dataflow的Source处理都是透明的,也是确定性的。比如,用于从文件系统中读入数据的source,记录总是以确定的顺序,在确定的字节位上存储于文件中,而不管读取多少次。文件名及字节位可以唯一地确定一个记录,因此服务可以自动的为这些消息创建一个唯一ID。另一个相似的例子是Apache Kafka,每个Kafka topic会被分成一个静态的分片集合,分片中的数据也总是具有一个确定的顺序。这种确定的数据源,可以在Dataflow中无缝的对接,而不会有重复产生。
然而,并不是所有的数据源都是如此简单。举例来说,Google Cloud订阅发布系统就是一个常用的但是却并不确定的Dataflow系统的数据源:多个订阅者可以从订阅发布系统的topic中拉取数据,但是拉取到确定数据的订阅者却是不确定的,当消息处理失败时,订阅发布系统会重新发送消息,但消息的接收者(worker)可能不是上次处理的worker。对于数据源的不确定性,Dataflow需要提供用于检测重复的机制,但很不幸,服务并没有办法对于每个消息提供记录ID,来保证稳定重试(我们将在稍后介绍发布订阅系统时做进一步讨论)。
对于不确定的数据源,Dataflow需要数据源提供额外的数据,来生成记录ID,Dataflow的数据源API提供UnboundedReader.getCurrentRecordId
接口,用于提供记录ID的生成,来防止记录重复的读取。
Sinks中的Exactly-once
通常,每个流式程序都需要输出数据到外部,sink简单理解,即是这样的一类组件。 需要明白,将数据发送到外部,是有副作用的,我们之前在界定问题的时候提过,Dataflow不保证有副作用的程序的Exactly-once语义。那么,Sink是如何来保证数据被发送一次的呢?
Beam的SDK中提供了一系列内置的Sink组件,这些组件都被谨慎的设计过,以保证不产生重复输出,即使被执行多次。因此,开发者应尽量优先考虑使用内置的Sink。
但是,有时候内置的Sink并不能满足自定义需求,最好的办法就是能保证这种有副作用的操作是幂等的,从而保证重放的正确性。然而有些使用DoFn
并不是确定性的,重放的时候输出可能会发生变化。在窗口合并时,窗口中的记录集合可能就是不确定的。
比如,窗口可能尝试触发e0,e1,e2,但是在提交处理结果前worker崩溃了(但不先于发送副作用操作),当worker重启时,window会被重新触发,但是此时数据e3延迟到达,由于e3早于窗口提交,此时e3对于新触发的窗口,不视作延迟数据。DoFn
会以e0,e1,e2,e3数据集合调用,最终产生作用。幂等性此时就没有多大作用了,因为逻辑数据集不一样了。
除此之外,还有其它的不确定性引入。标准的定位此风险问题的方法可以基于Dataflow目前保证仅有一个版本的DnFn
的操作结果会产生Shuffle边界。可以通过内置的Reshuffle转换算子来使用这种保证,以下代码提供了一个范例,来保证有副作用的操作总是接受确定性记录来产生输出。
1 | c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1)))) |
代码将Sink分解为两个部分:PrepareOutputData
和WriteToSideEffect
,PrepareOutputData
对相关记录输出产生幂等性写出。如果只是简单的将算子一一串联,整个处理过程会在失败后重放,但是PrepareOutputData
可能会产生不同的输出,继而都写出为不同结果。因此我们在两者之间加入Reshuffle
操作,Dataflow保证此这种异常不会发生。
当然,Dataflow仍然会将WriteToSideEffect
算子执行多次,副作用操作仍然需要具备幂等性,否则sink会产生重复输出。比如,在设置或覆盖数据到数据仓库时,操作是幂等性的,即使运行多次,产生的结果也是正确的。而对列表的追加操作是非幂等的,如果操作运行多次,每次的结果都会被追加到列表中。
Reshuffle
提供一个简单的机制,通过GroupByKey,来为DoFn
提供稳定输出。目前有一项提议,以删除对GroupByKey的依赖,代之以对WriteToSideEffect
的特殊注解@RequiresStableInput
。
其它系统
我们已经介绍了Dataflow的Exactly-once的机制,我们可以对比一下其它主流的流处理框架简要设计,每种系统会以不同的方式实现Exactly-once保证,并因此有不同的权衡。
Apache Spark Streaming
Spark Streaming是一种基于微批处理架构的持续数据处理框架,逻辑上,用户处理的是个流对象,然而,在底层,Spark将流对象表示为一系列连续的RDD。每个RDD启动一个批处理,Spark依赖于批处理的Exactly-once的特性来保证准确性——而用于保证批处理流程准确的技术已经出现有一段时间了。他和中架构可能会导致输出的延迟加长——尤其是对于过程比较长以及数据输入量比较大的处理来说——为了获得理想的延迟,需要经过比较细致的调试过程。
Spark假设所有的操作都是幂等的,可以在DAG链的位置上向上回溯进行重放,Spark最初就提供了Checkpoint,但是这样会使RDD被持久化,来保证此RDD之前的历史不会被重放。Checkpoint最初是为了性能设计的(防止高性能消耗的过程重复执行),但是同样可以使用来解决非幂等的副作用。
Apache Flink
Apache Flink同样提供Exactly-once的流式处理,但是采用了一种与Dataflow及Spark截然不同的方式。Flink程序定期会计算一个一致性的快照,表示快照点上整个程序中的一致性状态。Flink的快照是渐进计算的,因此不用在计算过程中暂停整个程序,从而允许记录在快照生成过程中持续流经系统,而不用面临Spark Streaming那样的延迟问题。
Flink通过在源自Source的数据流中插入特殊的数字标记的快照生成器。每个算子接收到这个快照生成器后,执行一个特殊算法来将其状态保存到外部存储,并产生一个新的快照生成器,传给下游算子。当所有算子都执行过快照算法后,一个完整的快照就生成了。任何worker的错误,都会造成整个程序回滚到上一个完整的快照上。处于发送中的消息不需要保存在快照中。所有经过Flink发布的消息,都是通过一个有序的基于TCP的通道。所有的连接失败,都可以通过最后一个正确的序列号来恢复,与Dataflow有所不同,Flink的任务是静态的分配给worker的,因此,它可以假设连接可以从同一个发送者,同一个下游恢复。
由于Flink可能会在任意时刻回滚到前一个快照点,所以任何未保存到快照中的状态修改都被认为是临时的。所有Sink在发送数据到外部时,都必须等待快照生成,然后将快照中的数据发送出去。Flink提供notifySnapshotComplete
回调接口,在快照生成后,通知sink,然后发送数据。这个过程会引入延迟,但此延迟仅在Sink中,因此在实际使用中,Flink有更小的端对端延迟,相比于Spark,其延迟发生在程序的所有Stage中。
Flink分布式快照在处理流式程序的一致性上提供了一个很优雅的策略,但是其对于程序的环境做了很多的假设。比如,它假设错误是极少发生的,因为处理故障时的回滚影响时非常大的;它假设快照完成过程是非常快的,以维持输出的低延迟。至于其能否在大规模集群上正常输出,还需要看一看,毕竟,此时故障发生的几率会显著增加,同时快照的同步也需要些时间。
另外,它还通过动态分配任务给worker(至少在一段时间内),来简化整个系统设计,这样可以很简单的实现worker间Exactly-once的数据传输:当连接失败时,相同的数据可以由相同的worker拉取。相比之下,Dataflow中的任务总是被平衡的分配到不同worker中(worker也可以在这个过程中动态的增加或删除),所以无法给予这种假设,从而必须实现一种更加复杂的传输层模型,来提供Exactly-once的处理。