Spark GraphX是Apache Spark的图形计算框架,它可以用于处理大规模图形和图形分析。它是一个高性能的分布式图形处理系统,可以在集群上运行,并且可以使用Spark的内存管理和数据并行性来加快处理速度。GraphX使用RDDs(可分区数据集)来表示图形和关系,并提供了一套高性能的API来执行各种图形转换和分析。
GraphX API包含了一些有用的函数,可以帮助开发者快速实现各种图形分析。例如:PageRank、Triangle Counting、Connected Components、Strongly Connected Components、Label Propagation、SVD++ 等。此外,GraphX还允许开发者使用Pregel API来定义自己的图形分析函数。
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt") val ranks = graph.pageRank(0.0001).vertices ranks.collect.foreach(println)
上面代码中,我们使用GraphLoader加载一个文本文件中的节点关系数据作为一个Graph对象。然后我们使用pageRank()方法来执行PageRank分析(d=0.0001)并返回所有顶点的PageRank值。最后我们遍历打印出所有顶点的PageRank值。
属性图是一个有向多重图,它带有连接到每个顶点和边的用户定义的对象。有向多重图中多个并行(parallel)的边共享相同的源和目的地顶点。支持并行边的能力简化了建模场景,这个场景中,相同的顶点存在多种关系(例如co-worker和friend)。每个顶点由一个唯一的64位长的标识符(VertexID)作为key。GraphX并没有对顶点标识强加任何排序。同样,顶点拥有相应的源和目的顶点标识符。
属性图通过vertex(VD)和edge(ED)类型参数化,这些类型是分别与每个顶点和边相关联的对象的类型。
在某些情况下,在相同的图形中,可能希望顶点拥有不同的属性类型。这可以通过继承完成。例如,将用户和产品建模成一个二分图,我们可以用如下方式
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
和RDD一样,属性图是不可变的、分布式的、容错的。图的值或者结构的改变需要按期望的生成一个新的图来实现。注意,原始图的大部分都可以在新图中重用,用来减少这种固有的功能数据结构的成本。执行者使用一系列顶点分区试探法来对图进行分区。如RDD一样,图中的每个分区可以在发生故障的情况下被重新创建在不同的机器上。
逻辑上的属性图对应于一对类型化的集合(RDD),这个集合编码了每一个顶点和边的属性。因此,图类包含访问图中顶点和边的成员。
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
VertexRDD[VD]
和EdgeRDD[ED]
类分别继承和优化自RDD[(VertexID, VD)]
和RDD[Edge[ED]]
。VertexRDD[VD]
和EdgeRDD[ED]
都支持额外的功能来建立在图计算和利用内部优化。
在GraphX项目中,假设我们想构造一个包括不同合作者的属性图。顶点属性可能包含用户名和职业。我们可以用描述合作者之间关系的字符串标注边缘。
所得的图形将具有类型签名
val userGraph: Graph[(String, String), String]
有很多方式从一个原始文件、RDD构造一个属性图。最一般的方法是利用Graph object。下面的代码从RDD集合生成属性图。
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
在上面的例子中,我们用到了Edge样本类。边有一个srcId
和dstId
分别对应于源和目标顶点的标示符。另外,Edge
类有一个attr
成员用来存储边属性。
我们可以分别用graph.vertices
和graph.edges
成员将一个图解构为相应的顶点和边。
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
注意,graph.vertices返回一个VertexRDD[(String, String)],它继承于 RDD[(VertexID, (String, String))]。所以我们可以用scala的case表达式解构这个元组。另一方面,
graph.edges返回一个包含Edge[String]对象的EdgeRDD。我们也可以用到case类的类型构造器,如下例所示。
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
除了属性图的顶点和边视图,GraphX也包含了一个三元组视图,三元视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD, ED]]
,它包含EdgeTriplet类的实例。可以通过下面的Sql表达式表示这个连接。
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
或者通过下面的图来表示。
EdgeTriplet
类继承于Edge
类,并且加入了srcAttr
和dstAttr
成员,这两个成员分别包含源和目的的属性。我们可以用一个三元组视图渲染字符串集合用来描述用户之间的关系。
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
graph.triplets.map(triplet =>
triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
Spark StreamingCheckpointing一个流应用程序必须全天候运行,所有必须能够解决应用程序逻辑无关的故障(如系统错误,JVM崩溃等...
概论在高层中,每个 Spark 应用程序都由一个驱动程序(driver programe)构成,驱动程序在集群上运行用户的 main 函数来执行各...
使用非 JVM 语言开发有时候你可能想使用不是基于 JVM 的语言开发一个 Storm 工程,你可能更喜欢使用别的语言或者想使用用某种语...
Neo4j使用CQL DELETE子句删除节点。删除节点及相关节点和关系。我们将在本章中讨论如何删除一个节点。 我们将在下一章讨论如何删...
Neo4j CQL ORDER BY子句 Neo4j CQL在MATCH命令中提供了“ORDER BY”子句,对MATCH查询返回的结果进行排序。我们可以按升序或降序...
Neo4j CQL RETURN子句用于 - 检索节点的某些属性检索节点的所有属性检索节点和关联关系的某些属性检索节点和关联关系的所有属性R...
在本章中,我们将讨论如何开发一个 Spring 框架项目来使用 Neo4j 数据库。Spring DATA Neo4j 模块注释我们将使用以下 Spring F...
Map-Reduce是一种计算模型,简单的说就是将大批量的工作(数据)分解(MAP)执行,然后再将结果合并成最终结果(REDUCE)。Mongo...
语法MongoDB 删除数据库的语法格式如下:db.dropDatabase()删除当前数据库,默认为 test,你可以使用 db 命令查看当前数据库名。...