一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等)。为了使这成为可能,Spark Streaming需要checkpoint足够的信息到容错存储系统中,以使系统从故障中恢复。
Metadata checkpointing:保存流计算的定义信息到容错存储系统如HDFS中。这用来恢复应用程序中运行worker的节点的故障。元数据包括
Incomplete batches:操作存在队列中的未完成的批
元数据checkpoint主要是为了从driver故障中恢复数据。如果transformation操作被用到了,数据checkpoint即使在简单的操作中都是必须的。
应用程序在下面两种情况下必须开启checkpoint
updateStateByKey
或者reduceByKeyAndWindow
,checkpoint目录必需提供用以定期checkpoint RDD。注意,没有前述的有状态的transformation的简单流应用程序在运行时可以不开启checkpoint。在这种情况下,从driver故障的恢复将是部分恢复(接收到了但是还没有处理的数据将会丢失)。这通常是可以接受的,许多运行的Spark Streaming应用程序都是这种方式。
在容错、可靠的文件系统(HDFS、s3等)中设置一个目录用于保存checkpoint信息。着可以通过streamingContext.checkpoint(checkpointDirectory)
方法来做。这运行你用之前介绍的有状态transformation。另外,如果你想从driver故障中恢复,你应该以下面的方式重写你的Streaming应用程序。
start()
方法// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
如果checkpointDirectory
存在,上下文将会利用checkpoint数据重新创建。如果这个目录不存在,将会调用functionToCreateContext
函数创建一个新的上下文,建立DStreams。请看RecoverableNetworkWordCount例子。
除了使用getOrCreate
,开发者必须保证在故障发生时,driver处理自动重启。只能通过部署运行应用程序的基础设施来达到该目的。在部署章节将有更进一步的讨论。
注意,RDD的checkpointing有存储成本。这会导致批数据(包含的RDD被checkpoint)的处理时间增加。因此,需要小心的设置批处理的时间间隔。在最小的批容量(包含1秒的数据)情况下,checkpoint每批数据会显著的减少操作的吞吐量。相反,checkpointing太少会导致谱系以及任务大小增大,这会产生有害的影响。因为有状态的transformation需要RDD checkpoint。默认的间隔时间是批间隔时间的倍数,最少10秒。它可以通过dstream.checkpoint
来设置。典型的情况下,设置checkpoint间隔是DStream的滑动间隔的5-10大小是一个好的尝试。
概论在高层中,每个 Spark 应用程序都由一个驱动程序(driver programe)构成,驱动程序在集群上运行用户的 main 函数来执行各...
使用非 JVM 语言开发有时候你可能想使用不是基于 JVM 的语言开发一个 Storm 工程,你可能更喜欢使用别的语言或者想使用用某种语...
Neo4j使用CQL DELETE子句删除节点。删除节点及相关节点和关系。我们将在本章中讨论如何删除一个节点。 我们将在下一章讨论如何删...
Neo4j CQL ORDER BY子句 Neo4j CQL在MATCH命令中提供了“ORDER BY”子句,对MATCH查询返回的结果进行排序。我们可以按升序或降序...
Neo4j CQL RETURN子句用于 - 检索节点的某些属性检索节点的所有属性检索节点和关联关系的某些属性检索节点和关联关系的所有属性R...
在本章中,我们将讨论如何开发一个 Spring 框架项目来使用 Neo4j 数据库。Spring DATA Neo4j 模块注释我们将使用以下 Spring F...
Map-Reduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE)。Mongo...
语法MongoDB 删除数据库的语法格式如下:db.dropDatabase()删除当前数据库,默认为 test,你可以使用 db 命令查看当前数据库名。...
MongoDB中聚合(aggregate)主要用于处理数据(诸如统计平均值,求和等),并返回计算后的数据结果。有点类似sql语句中的 count(*)。 ...