GraphX プログラミングガイド
概要
GraphX は、グラフとグラフ並列計算のための Spark の新しいコンポーネントです。概要レベルでは、GraphX は Spark RDD を拡張し、新しい グラフ 抽象化を導入します。これは、各頂点と辺にプロパティが添付された有向マルチグラフです。グラフ計算をサポートするために、GraphX は一連の基本演算子 (例: 部分グラフ、joinVertices、aggregateMessages) と、Pregel API の最適化されたバリアントを公開します。さらに、GraphX には、グラフ分析タスクを簡素化するためのグラフ アルゴリズム と ビルダー のコレクションが増え続けています。
はじめに
使い始めるには、まず Spark と GraphX をプロジェクトにインポートする必要があります。次のようにします。
import org.apache.spark._
import org.apache.spark.graphx._
// To make some of the examples work we will also need RDD
import org.apache.spark.rdd.RDD
Spark シェルを使用していない場合は、SparkContext
も必要になります。Spark の使用方法の詳細については、Spark クイックスタートガイド を参照してください。
プロパティグラフ
プロパティグラフ は、各頂点と辺にユーザー定義オブジェクトが添付された有向マルチグラフです。有向マルチグラフは、同じソース頂点と宛先頂点を共有する複数の並列辺を持つ可能性のある有向グラフです。並列辺をサポートできるため、同じ頂点間に複数の関係 (例: 同僚と友人) が存在する可能性のあるモデリングシナリオが簡素化されます。各頂点は、*一意の* 64 ビット long 識別子 (VertexId
) によってキーが設定されます。GraphX は、頂点識別子に順序制約を課しません。同様に、辺には対応するソース頂点識別子と宛先頂点識別子があります。
プロパティグラフは、頂点 (VD
) と辺 (ED
) の型をパラメータとして使用します。これらは、それぞれ各頂点と辺に関連付けられたオブジェクトの型です。
GraphX は、頂点と辺の型がプリミティブデータ型 (例: int、double など) の場合、それらを特殊な配列に格納することにより、メモリフットプリントを削減して表現を最適化します。
場合によっては、同じグラフ内に異なるプロパティ型を持つ頂点を持つことが望ましい場合があります。これは、継承によって実現できます。たとえば、ユーザーと製品を二部グラフとしてモデル化するには、次のようにします。
class VertexProperty()
case class UserProperty(val name: String) extends VertexProperty
case class ProductProperty(val name: String, val price: Double) extends VertexProperty
// The graph might then have the type:
var graph: Graph[VertexProperty, String] = null
RDD と同様に、プロパティグラフは不変、分散、耐障害性があります。グラフの値または構造の変更は、目的の変更を加えた新しいグラフを作成することによって実現されます。元のグラフのかなりの部分 (つまり、影響を受けない構造、属性、およびインデックス) は新しいグラフで再利用され、この本質的に関数型データ構造のコストが削減されることに注意してください。グラフは、頂点パーティション分割ヒューリスティックの範囲を使用して、エグゼキュータ全体にパーティション分割されます。RDD と同様に、グラフの各パーティションは、障害が発生した場合に別のマシンで再作成できます。
論理的には、プロパティグラフは、各頂点と辺のプロパティをエンコードする一対の型付きコレクション (RDD) に対応します。結果として、グラフクラスには、グラフの頂点と辺にアクセスするためのメンバーが含まれています。
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
}
VertexRDD[VD]
クラスと EdgeRDD[ED]
クラスは、それぞれ RDD[(VertexId, VD)]
と RDD[Edge[ED]]
を拡張および最適化したバージョンです。VertexRDD[VD]
と EdgeRDD[ED]
はどちらも、グラフ計算を中心に構築された追加機能を提供し、内部最適化を活用します。頂点と辺の RDD のセクションで VertexRDD
VertexRDD と EdgeRDD
EdgeRDD API について詳しく説明しますが、ここでは、単に RDD[(VertexId, VD)]
と RDD[Edge[ED]]
形式の RDD と考えることができます。
プロパティグラフの例
GraphX プロジェクトのさまざまな共同作業者で構成されるプロパティグラフを構築したいとします。頂点プロパティには、ユーザー名と職業が含まれている場合があります。共同作業者間の関係を説明する文字列で辺に注釈を付けることができます。
結果のグラフは、次の型シグネチャを持ちます。
val userGraph: Graph[(String, String), String]
生のファイル、RDD、さらには合成ジェネレータからプロパティグラフを構築する方法は多数あり、これらについては グラフビルダー のセクションで詳しく説明します。おそらく最も一般的な方法は、Graph オブジェクト を使用することです。たとえば、次のコードは RDD のコレクションからグラフを構築します。
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
上記の例では、Edge
ケースクラスを使用しています。辺には、ソース頂点識別子と宛先頂点識別子に対応する srcId
と dstId
があります。さらに、Edge
クラスには、辺プロパティを格納する attr
メンバーがあります。
それぞれ graph.vertices
メンバーと graph.edges
メンバーを使用して、グラフをそれぞれの頂点ビューと辺ビューに分解できます。
val graph: Graph[(String, String), String] // Constructed from above
// Count all users which are postdocs
graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count
// Count all the edges where src > dst
graph.edges.filter(e => e.srcId > e.dstId).count
graph.vertices
はRDD[(VertexId, (String, String))]
を拡張するVertexRDD[(String, String)]
を返すため、scalacase
式を使用してタプルを分解することに注意してください。一方、graph.edges
はEdge[String]
オブジェクトを含むEdgeRDD
を返します。次のようにケースクラスタイプコンストラクタを使用することもできました。
graph.edges.filter { case Edge(src, dst, prop) => src > dst }.count
プロパティグラフの頂点ビューと辺ビューに加えて、GraphX はトリプレットビューも公開します。トリプレットビューは、頂点と辺のプロパティを論理的に結合し、EdgeTriplet
クラスのインスタンスを含む RDD[EdgeTriplet[VD, ED]]
を生成します。この *結合* は、次の SQL 式で表すことができます。
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
または、グラフィカルに次のように表すことができます。
EdgeTriplet
クラスは、それぞれソースプロパティと宛先プロパティを含む srcAttr
メンバーと dstAttr
メンバーを追加することにより、Edge
クラスを拡張します。グラフのトリプレットビューを使用して、ユーザー間の関係を説明する文字列のコレクションをレンダリングできます。
val graph: Graph[(String, String), String] // Constructed from above
// Use the triplets view to create an RDD of facts.
val facts: RDD[String] =
graph.triplets.map(triplet =>
triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println(_))
グラフ演算子
RDD に map
、filter
、reduceByKey
などの基本的な操作があるのと同様に、プロパティグラフにも、ユーザー定義関数を受け取り、変換されたプロパティと構造を持つ新しいグラフを生成する基本的な演算子のコレクションがあります。最適化された実装を持つコア演算子は Graph
で定義され、コア演算子の合成として表される便利な演算子は GraphOps
で定義されます。ただし、Scala の暗黙的変換のおかげで、GraphOps
の演算子は Graph
のメンバーとして自動的に使用できます。たとえば、各頂点の次数 ( in-degree ) ( GraphOps
で定義) は、次のように計算できます。
val graph: Graph[(String, String), String]
// Use the implicit GraphOps.inDegrees operator
val inDegrees: VertexRDD[Int] = graph.inDegrees
コアグラフ操作と GraphOps
を区別する理由は、将来、異なるグラフ表現をサポートできるようにするためです。各グラフ表現は、コア操作の実装を提供し、GraphOps
で定義されている多くの便利な操作を再利用する必要があります。
演算子の概要リスト
以下は、Graph
と GraphOps
の両方で定義されている機能の簡単な要約ですが、簡潔にするために Graph のメンバーとして提示されています。一部の関数シグネチャは簡略化されており (例: デフォルトの引数と型の制約が削除されている)、一部の高度な機能は削除されているため、操作の公式リストについては API ドキュメントを参照してください。
/** Summary of the functionality in the property graph */
class Graph[VD, ED] {
// Information about the Graph ===================================================================
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
// Views of the graph as collections =============================================================
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
// Functions for caching graphs ==================================================================
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def cache(): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = false): Graph[VD, ED]
// Change the partitioning heuristic ============================================================
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
// Transform vertex and edge attributes ==========================================================
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2])
: Graph[VD, ED2]
// Modify the graph structure ====================================================================
def reverse: Graph[VD, ED]
def subgraph(
epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true))
: Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join RDDs with the graph ======================================================================
def joinVertices[U](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U, VD2](other: RDD[(VertexId, U)])
(mapFunc: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
// Aggregate information about adjacent triplets =================================================
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[A]
// Iterative graph-parallel computation ==========================================================
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
// Basic graph algorithms ========================================================================
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def connectedComponents(): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
}
プロパティ演算子
RDD map
演算子と同様に、プロパティグラフには次が含まれています。
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
これらの各演算子は、ユーザー定義の map
関数によって頂点または辺のプロパティが変更された新しいグラフを生成します。
いずれの場合も、グラフ構造は影響を受けないことに注意してください。これは、これらの演算子の重要な機能であり、結果のグラフが元のグラフの構造インデックスを再利用できるようにします。次のスニペットは論理的には同等ですが、最初のスニペットは構造インデックスを保持せず、GraphX システムの最適化の恩恵を受けません。
val newVertices = graph.vertices.map { case (id, attr) => (id, mapUdf(id, attr)) }
val newGraph = Graph(newVertices, graph.edges)
代わりに、インデックスを保持するために
mapVertices
を使用してください。
val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr))
これらの演算子は、特定の計算のためにグラフを初期化したり、不要なプロパティを除去したりするためによく使用されます。たとえば、頂点プロパティとして出力次数を持つグラフ(このようなグラフの構築方法については後述します)が与えられた場合、PageRankのためにそれを初期化します。
// Given a graph where the vertex property is the out degree
val inputGraph: Graph[Int, String] =
graph.outerJoinVertices(graph.outDegrees)((vid, _, degOpt) => degOpt.getOrElse(0))
// Construct a graph where each edge contains the weight
// and each vertex is the initial PageRank
val outputGraph: Graph[Double, Double] =
inputGraph.mapTriplets(triplet => 1.0 / triplet.srcAttr).mapVertices((id, _) => 1.0)
構造演算子
現在、GraphXは一般的に使用される構造演算子の単純なセットのみをサポートしており、将来的にはさらに追加する予定です。以下は、基本的な構造演算子のリストです。
class Graph[VD, ED] {
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}
reverse
演算子は、すべての辺の方向が逆になった新しいグラフを返します。これは、たとえば、逆PageRankを計算しようとする場合に役立ちます。逆演算は、頂点または辺のプロパティを変更したり、辺の数を変更したりしないため、データの移動や複製なしで効率的に実装できます。
subgraph
演算子は、頂点と辺の述語を取り、頂点述語を満たす(trueと評価される)頂点と、辺述語を満たし*、頂点述語を満たす頂点を接続する*辺のみを含むグラフを返します。subgraph
演算子は、多くの状況で使用して、グラフを対象の頂点と辺に制限したり、壊れたリンクを排除したりできます。たとえば、次のコードでは、壊れたリンクを削除します。
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
(5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
// Notice that there is a user 0 (for which we have no information) connected to users
// 4 (peter) and 5 (franklin).
graph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// The valid subgraph will disconnect users 4 and 5 by removing user 0
validGraph.vertices.collect.foreach(println(_))
validGraph.triplets.map(
triplet => triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1
).collect.foreach(println(_))
上記の例では、頂点述語のみが提供されていることに注意してください。頂点または辺の述語が提供されない場合、
subgraph
演算子はデフォルトでtrue
になります。
mask
演算子は、入力グラフにも見つかる頂点と辺を含むグラフを返すことによって、部分グラフを構築します。これは、subgraph
演算子と組み合わせて使用して、別の関連グラフのプロパティに基づいてグラフを制限できます。たとえば、頂点のないグラフを使用して連結成分を実行し、結果を有効な部分グラフに制限することができます。
// Run Connected Components
val ccGraph = graph.connectedComponents() // No longer contains missing field
// Remove missing vertices as well as the edges to connected to them
val validGraph = graph.subgraph(vpred = (id, attr) => attr._2 != "Missing")
// Restrict the answer to the valid subgraph
val validCCGraph = ccGraph.mask(validGraph)
groupEdges
演算子は、マルチグラフ内の並列エッジ(つまり、頂点のペア間の重複エッジ)をマージします。多くの数値アプリケーションでは、並列エッジを単一のエッジに*追加*(重みを組み合わせる)して、グラフのサイズを縮小できます。
結合演算子
多くの場合、外部コレクション(RDD)のデータをグラフに結合する必要があります。たとえば、既存のグラフにマージしたい追加のユーザープロパティがある場合や、あるグラフから別のグラフに頂点プロパティを取得したい場合があります。これらのタスクは、*結合*演算子を使用して実行できます。以下に、主要な結合演算子をリストします。
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
: Graph[VD, ED]
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
joinVertices
演算子は、頂点を入力RDDと結合し、結合された頂点の結果にユーザー定義のmap
関数を適用することによって取得された頂点プロパティを持つ新しいグラフを返します。RDDに対応する値のない頂点は、元の値を保持します。
RDDに特定の頂点に対して複数の値が含まれている場合は、1つだけが使用されることに注意してください。したがって、以下を使用して入力RDDを一意にすることをお勧めします。これにより、結果の値が*事前にインデックス付け*され、後続の結合が大幅に高速化されます。
val nonUniqueCosts: RDD[(VertexId, Double)]
val uniqueCosts: VertexRDD[Double] =
graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b)
val joinedGraph = graph.joinVertices(uniqueCosts)(
(id, oldCost, extraCost) => oldCost + extraCost)
より一般的なouterJoinVertices
は、ユーザー定義のmap
関数がすべての頂点に適用され、頂点プロパティのタイプを変更できることを除いて、joinVertices
と同様に動作します。すべての頂点が入力RDDに対応する値を持っているとは限らないため、map
関数はOption
タイプを取ります。たとえば、頂点プロパティをoutDegree
で初期化することにより、PageRankのグラフを設定できます。
val outDegrees: VertexRDD[Int] = graph.outDegrees
val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) =>
outDegOpt match {
case Some(outDeg) => outDeg
case None => 0 // No outDegree means zero outDegree
}
}
上記の例で使用されている複数の引数リスト(例:
f(a)(b)
)カリー化関数パターンに気付いたかもしれません。f(a)(b)
をf(a,b)
と書くこともできましたが、これはb
の型推論がa
に依存しないことを意味します。結果として、ユーザーはユーザー定義関数に型注釈を提供する必要があります。
val joinedGraph = graph.joinVertices(uniqueCosts,
(id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost)
近傍集約
多くのグラフ分析タスクの重要なステップは、各頂点の近傍に関する情報を集約することです。たとえば、各ユーザーのフォロワーの数や、各ユーザーのフォロワーの平均年齢を知りたい場合があります。多くの反復グラフアルゴリズム(例:PageRank、最短パス、および連結成分)は、隣接する頂点のプロパティ(例:現在のPageRank値、ソースへの最短パス、および到達可能な最小頂点ID)を繰り返し集約します。
パフォーマンスを向上させるために、プライマリアグリゲーション演算子が
graph.mapReduceTriplets
から新しいgraph.AggregateMessages
に変更されました。APIの変更は比較的小さいですが、以下に遷移ガイドを提供します。
メッセージの集約 (aggregateMessages)
GraphXのコア集約操作は、aggregateMessages
です。この演算子は、ユーザー定義のsendMsg
関数をグラフ内の各*エッジトリプレット*に適用し、mergeMsg
関数を使用して、それらのメッセージを宛先頂点で集約します。
class Graph[VD, ED] {
def aggregateMessages[Msg: ClassTag](
sendMsg: EdgeContext[VD, ED, Msg] => Unit,
mergeMsg: (Msg, Msg) => Msg,
tripletFields: TripletFields = TripletFields.All)
: VertexRDD[Msg]
}
ユーザー定義のsendMsg
関数は、EdgeContext
を取り、ソース属性と宛先属性、エッジ属性、およびソース属性と宛先属性にメッセージを送信する関数(sendToSrc
、およびsendToDst
)を公開します。sendMsg
をマップリデュースの*マップ*関数と考えてください。ユーザー定義のmergeMsg
関数は、同じ頂点宛ての2つのメッセージを取り、単一のメッセージを生成します。mergeMsg
をマップリデュースの*リデュース*関数と考えてください。aggregateMessages
演算子は、各頂点宛ての集約メッセージ(Msg
型)を含むVertexRDD[Msg]
を返します。メッセージを受信しなかった頂点は、返されたVertexRDD
VertexRDDには含まれません。
さらに、aggregateMessages
は、オプションのtripletsFields
を取り、EdgeContext
でどのデータにアクセスするかを示します(つまり、ソース頂点属性にアクセスしますが、宛先頂点属性にはアクセスしません)。tripletsFields
の可能なオプションは、TripletFields
で定義されており、デフォルト値はTripletFields.All
です。これは、ユーザー定義のsendMsg
関数がEdgeContext
の任意のフィールドにアクセスできることを示します。tripletFields
引数を使用して、EdgeContext
の一部のみが必要であることをGraphXに通知することで、GraphXが最適化された結合戦略を選択できるようにすることができます。たとえば、各ユーザーのフォロワーの平均年齢を計算する場合、ソースフィールドのみが必要になるため、TripletFields.Src
を使用して、ソースフィールドのみが必要であることを示します。
GraphXの以前のバージョンでは、バイトコード検査を使用して
TripletFields
を推測していましたが、バイトコード検査はわずかに信頼性が低いことがわかったので、より明示的なユーザー制御を選択しました。
次の例では、aggregateMessages
演算子を使用して、各ユーザーのより上位のフォロワーの平均年齢を計算します。
import org.apache.spark.graphx.{Graph, VertexRDD}
import org.apache.spark.graphx.util.GraphGenerators
// Create a graph with "age" as the vertex property.
// Here we use a random graph for simplicity.
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
// Compute the number of older followers and their total age
val olderFollowers: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
triplet => { // Map Function
if (triplet.srcAttr > triplet.dstAttr) {
// Send message to destination vertex containing counter and age
triplet.sendToDst((1, triplet.srcAttr))
}
},
// Add counter and age
(a, b) => (a._1 + b._1, a._2 + b._2) // Reduce Function
)
// Divide total age by number of older followers to get average age of older followers
val avgAgeOfOlderFollowers: VertexRDD[Double] =
olderFollowers.mapValues( (id, value) =>
value match { case (count, totalAge) => totalAge / count } )
// Display the results
avgAgeOfOlderFollowers.collect.foreach(println(_))
aggregateMessages
操作は、メッセージ(およびメッセージの合計)のサイズが一定の場合(例:リストと連結ではなく、浮動小数点数と加算)に最適に実行されます。
Map Reduce Triplets 移行ガイド (レガシー)
GraphXの以前のバージョンでは、近傍集約はmapReduceTriplets
演算子を使用して実行されていました。
class Graph[VD, ED] {
def mapReduceTriplets[Msg](
map: EdgeTriplet[VD, ED] => Iterator[(VertexId, Msg)],
reduce: (Msg, Msg) => Msg)
: VertexRDD[Msg]
}
mapReduceTriplets
演算子は、各トリプレットに適用されるユーザー定義のマップ関数を取り、ユーザー定義のreduce
関数を使用して集約される*メッセージ*を生成できます。ただし、返されたイテレータのユーザーは費用がかかり、追加の最適化(例:ローカル頂点の番号付け直し)を適用する能力が阻害されることがわかりました。aggregateMessages
では、トリプレットフィールドと、ソース頂点と宛先頂点に明示的にメッセージを送信する関数を公開するEdgeContextを導入しました。さらに、バイトコード検査を削除し、代わりにユーザーがトリプレットのどのフィールドが実際に必要かを示す必要があります。
mapReduceTriplets
を使用した次のコードブロック
val graph: Graph[Int, Float] = ...
def msgFun(triplet: Triplet[Int, Float]): Iterator[(Int, String)] = {
Iterator((triplet.dstId, "Hi"))
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.mapReduceTriplets[String](msgFun, reduceFun)
は、aggregateMessages
を使用して次のように書き直すことができます。
val graph: Graph[Int, Float] = ...
def msgFun(triplet: EdgeContext[Int, Float, String]) {
triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b
val result = graph.aggregateMessages[String](msgFun, reduceFun)
次数情報の計算
一般的な集約タスクは、各頂点の次数、つまり各頂点に隣接する辺の数を計算することです。有向グラフのコンテキストでは、多くの場合、各頂点の入力次数、出力次数、および合計次数を知る必要があります。GraphOps
クラスには、各頂点の次数を計算するための一連の演算子が含まれています。たとえば、以下では、最大入力次数、出力次数、および合計次数を計算します。
// Define a reduce operation to compute the highest degree vertex
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// Compute the max degrees
val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
近傍の収集
場合によっては、各頂点で隣接する頂点とその属性を収集することによって計算を表す方が簡単な場合があります。これは、collectNeighborIds
演算子とcollectNeighbors
演算子を使用して簡単に実行できます。
class GraphOps[VD, ED] {
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ]
}
これらの演算子は、情報を複製し、大量の通信を必要とするため、非常にコストがかかる可能性があります。 可能であれば、
aggregateMessages
演算子を直接使用して、同じ計算を表現してみてください。
キャッシングとキャッシュ解除
Sparkでは、RDDはデフォルトでメモリに永続化されません。 再計算を避けるために、複数回使用する場合には明示的にキャッシュする必要があります(Sparkプログラミングガイドを参照)。 GraphXのグラフも同じように動作します。 グラフを複数回使用する場合は、最初にGraph.cache()
を呼び出すようにしてください。
反復計算では、最高のパフォーマンスを得るために、*アンキャッシュ*が必要になる場合もあります。 デフォルトでは、キャッシュされたRDDとグラフは、メモリ圧力によってLRU順に削除されるまでメモリに残ります。 反復計算では、以前の反復の中間結果がキャッシュをいっぱいにします。 最終的には削除されますが、メモリに保存されている不要なデータはガベージコレクションを遅くします。 中間結果が必要なくなったらすぐにアンキャッシュする方が効率的です。 これには、グラフまたはRDDを毎回マテリアライズ(キャッシュして強制)し、他のすべてのデータセットをアンキャッシュし、将来の反復ではマテリアライズされたデータセットのみを使用することが含まれます。 ただし、グラフは複数のRDDで構成されているため、正しく永続化解除するのが難しい場合があります。 反復計算には、中間結果を正しく永続化解除するPregel APIを使用することをお勧めします。
Pregel API
グラフは、頂点のプロパティが隣接する頂点のプロパティに依存し、さらに*それらの*隣接する頂点のプロパティに依存するため、本質的に再帰的なデータ構造です。 その結果、多くの重要なグラフアルゴリズムは、固定小数点条件に達するまで、各頂点のプロパティを反復的に再計算します。 これらの反復アルゴリズムを表現するために、さまざまなグラフ並列抽象化が提案されています。 GraphXは、Pregel APIのバリアントを公開しています。
高レベルでは、GraphXのPregel演算子は、*グラフのトポロジーに制約された*バルク同期並列メッセージング抽象化です。 Pregel演算子は、一連の superstep で実行されます。 頂点は前の superstep からの受信メッセージの*合計*を受け取り、頂点プロパティの新しい値を計算し、次の superstep で隣接する頂点にメッセージを送信します。 Pregelとは異なり、メッセージはエッジトリプレットの関数として並列に計算され、メッセージ計算はソースとデスティネーションの両方の頂点属性にアクセスできます。 メッセージを受信しない頂点は、superstep 内でスキップされます。 Pregel演算子は、残りのメッセージがない場合に反復を終了し、最終的なグラフを返します。
より標準的なPregel実装とは異なり、GraphXの頂点は隣接する頂点にのみメッセージを送信でき、メッセージ構築はユーザー定義のメッセージング関数を使用して並列に行われます。 これらの制約により、GraphX内で追加の最適化が可能になります。
以下は、Pregel演算子の型シグネチャと、その実装の*スケッチ*です(注:長い系統チェーンによるstackOverflowErrorを回避するために、pregelは「spark.graphx.pregel.checkpointInterval」を正の数(たとえば10)に設定することにより、グラフとメッセージを定期的にチェックポイントします。 また、SparkContext.setCheckpointDir(directory:String)を使用してチェックポイントディレクトリも設定します)
class GraphOps[VD, ED] {
def pregel[A]
(initialMsg: A,
maxIter: Int = Int.MaxValue,
activeDir: EdgeDirection = EdgeDirection.Out)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED] = {
// Receive the initial message at each vertex
var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
// compute the messages
var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)
var activeMessages = messages.count()
// Loop until no messages remain or maxIterations is achieved
var i = 0
while (activeMessages > 0 && i < maxIterations) {
// Receive the messages and update the vertices.
g = g.joinVertices(messages)(vprog).cache()
val oldMessages = messages
// Send new messages, skipping edges where neither side received a message. We must cache
// messages so it can be materialized on the next line, allowing us to uncache the previous
// iteration.
messages = GraphXUtils.mapReduceTriplets(
g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
activeMessages = messages.count()
i += 1
}
g
}
}
Pregelは2つの引数リスト(つまり、graph.pregel(list1)(list2)
)を受け取ることに注意してください。 最初の引数リストには、初期メッセージ、最大反復回数、メッセージを送信するエッジの方向(デフォルトでは出力エッジに沿って)などの構成パラメータが含まれています。 2番目の引数リストには、メッセージを受信するためのユーザー定義関数(頂点プログラムvprog
)、メッセージの計算(sendMsg
)、およびメッセージの結合mergeMsg
が含まれています。
次の例のように、Pregel演算子を使用して、単一ソースの最短パスなどの計算を表現できます。
import org.apache.spark.graphx.{Graph, VertexId}
import org.apache.spark.graphx.util.GraphGenerators
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
グラフビルダー
GraphXは、RDDまたはディスク上の頂点とエッジのコレクションからグラフを構築するいくつかの方法を提供します。 デフォルトでは、グラフビルダーはいずれもグラフのエッジを再パーティション化しません。 代わりに、エッジはデフォルトのパーティション(HDFSの元のブロックなど)に残されます。 Graph.groupEdges
は、同じエッジが同じパーティションに配置されると想定しているため、グラフの再パーティション化が必要です。そのため、groupEdges
を呼び出す前にGraph.partitionBy
を呼び出す必要があります。
object GraphLoader {
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
: Graph[Int, Int]
}
GraphLoader.edgeListFile
は、ディスク上のエッジのリストからグラフを読み込む方法を提供します。 #
で始まるコメント行をスキップして、次の形式の(ソース頂点ID、宛先頂点ID)ペアの隣接リストを解析します
# This is a comment
2 1
4 1
1 2
指定されたエッジからGraph
を作成し、エッジによって言及された頂点を自動的に作成します。 すべての頂点とエッジの属性はデフォルトで1です。 canonicalOrientation
引数を使用すると、正の方向(srcId < dstId
)にエッジを方向転換できます。これは、連結成分アルゴリズムで必要です。 minEdgePartitions
引数は、生成するエッジパーティションの最小数を指定します。 たとえば、HDFSファイルにブロックが多い場合など、指定された数よりも多くのエッジパーティションが存在する場合があります。
object Graph {
def apply[VD, ED](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null)
: Graph[VD, ED]
def fromEdges[VD, ED](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED]
def fromEdgeTuples[VD](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
}
Graph.apply
を使用すると、頂点とエッジのRDDからグラフを作成できます。 重複する頂点は任意に選択され、エッジRDDには見つかるが頂点RDDには見つからない頂点には、デフォルトの属性が割り当てられます。
Graph.fromEdges
を使用すると、エッジのRDDのみからグラフを作成し、エッジによって言及された頂点を自動的に作成し、それらにデフォルト値を割り当てることができます。
Graph.fromEdgeTuples
を使用すると、エッジタプルのRDDのみからグラフを作成し、エッジに値1を割り当て、エッジによって言及された頂点を自動的に作成し、それらにデフォルト値を割り当てることができます。 また、エッジの重複除去もサポートしています。 重複除去するには、PartitionStrategy
のSome
をuniqueEdges
パラメータとして渡します(たとえば、uniqueEdges = Some(PartitionStrategy.RandomVertexCut)
)。 同じエッジを同じパーティションに配置して重複除去できるようにするには、パーティション戦略が必要です。
頂点および辺 RDD
GraphXは、グラフ内に格納されている頂点とエッジのRDD
ビューを公開します。 ただし、GraphXは頂点とエッジを最適化されたデータ構造に保持し、これらのデータ構造は追加の機能を提供するため、頂点とエッジはそれぞれVertexRDD
VertexRDDおよびEdgeRDD
EdgeRDDとして返されます。 このセクションでは、これらのタイプに追加された便利な機能のいくつかを確認します。 これは不完全なリストにすぎません。操作の公式リストについては、APIドキュメントを参照してください。
VertexRDD
VertexRDD[A]
はRDD[(VertexId, A)]
を拡張し、各VertexId
が*1回だけ*出現するという追加の制約を追加します。 さらに、VertexRDD[A]
は、それぞれがタイプA
の属性を持つ頂点の*セット*を表します。 内部的には、これは頂点属性を再利用可能なハッシュマップデータ構造に格納することによって実現されます。 その結果、2つのVertexRDD
が同じベースのVertexRDD
VertexRDDから派生している場合(たとえば、filter
またはmapValues
による)、ハッシュ評価なしで一定時間で結合できます。 このインデックス付きデータ構造を活用するために、VertexRDD
VertexRDDは次の追加機能を公開します
class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// Filter the vertex set but preserves the internal index
def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
// Transform the values without changing the ids (preserves the internal index)
def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
// Show only vertices unique to this set based on their VertexId's
def minus(other: RDD[(VertexId, VD)])
// Remove vertices from this set that appear in the other set
def diff(other: VertexRDD[VD]): VertexRDD[VD]
// Join operators that take advantage of the internal indexing to accelerate joins (substantially)
def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD.
def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
たとえば、filter
演算子がVertexRDD
VertexRDDを返す方法に注目してください。 フィルターは実際にはBitSet
を使用して実装されているため、インデックスを再利用し、他のVertexRDD
との高速結合を行うことができます。 同様に、mapValues
演算子は、map
関数がVertexId
を変更することを許可しないため、同じHashMap
データ構造を再利用できます。 leftJoin
とinnerJoin
はどちらも、同じHashMap
から派生した2つのVertexRDD
を結合するときに識別し、コストのかかるポイントルックアップではなく線形スキャンによって結合を実装できます。
aggregateUsingIndex
演算子は、RDD[(VertexId, A)]
から新しいVertexRDD
VertexRDDを効率的に構築するのに役立ちます。 概念的には、頂点のセット上にVertexRDD[B]
を構築した場合、*それがいくつかのRDD[(VertexId, A)]
の頂点のスーパーセットである*場合、インデックスを再利用してRDD[(VertexId, A)]
を集約し、その後インデックスを作成できます。 例えば
val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0)))
// There should be 200 entries in rddB
rddB.count
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)
// There should be 100 entries in setB
setB.count
// Joining A and B should now be fast!
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
EdgeRDD
RDD[Edge[ED]]
を拡張するEdgeRDD[ED]
は、PartitionStrategy
で定義されているさまざまなパーティション戦略のいずれかを使用してパーティション化されたブロックにエッジを編成します。 各パーティション内では、エッジ属性と隣接構造は個別に格納され、属性値を変更する際の再利用が最大化されます。
EdgeRDD
EdgeRDDによって公開される3つの追加機能は次のとおりです。
// Transform the edge attributes while preserving the structure
def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2]
// Reverse the edges reusing both attributes and structure
def reverse: EdgeRDD[ED]
// Join two `EdgeRDD`s partitioned using the same partitioning strategy.
def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
ほとんどのアプリケーションでは、EdgeRDD
EdgeRDDに対する操作はグラフ演算子によって実行されるか、基本RDD
クラスで定義されている操作に依存していることがわかりました。
最適化された表現
分散グラフのGraphX表現で使用される最適化の詳細な説明はこのガイドの範囲外ですが、高レベルの理解は、スケーラブルなアルゴリズムの設計とAPIの最適な使用に役立つ場合があります。 GraphXは、分散グラフパーティション化に頂点カットアプローチを採用しています
GraphXは、エッジに沿ってグラフを分割するのではなく、頂点に沿ってグラフを分割します。これにより、通信とストレージのオーバーヘッドの両方を削減できます。 論理的には、これはエッジをマシンに割り当て、頂点が複数のマシンにまたがることを許可することに対応します。 エッジを割り当てる正確な方法は、PartitionStrategy
に依存し、さまざまなヒューリスティックにはいくつかのトレードオフがあります。 ユーザーは、Graph.partitionBy
演算子を使用してグラフを再パーティション化することにより、異なる戦略を選択できます。 デフォルトのパーティション戦略は、グラフ構築で提供されるように、エッジの初期パーティションを使用することです。 ただし、ユーザーはGraphXに含まれる2Dパーティション化または他のヒューリスティックに簡単に切り替えることができます。
エッジが分割された後、効率的なグラフ並列計算の重要な課題は、頂点属性とエッジを効率的に結合することです。実際のグラフは通常、頂点よりも多くのエッジを持つため、頂点属性をエッジに移動します。すべてのパーティションにすべての頂点に隣接するエッジが含まれているわけではないため、triplets
や aggregateMessages
などの操作に必要な結合を実装する際に、頂点をどこにブロードキャストするかを識別するルーティングテーブルを内部的に保持します。
グラフアルゴリズム
GraphX には、分析タスクを簡素化するための一連のグラフアルゴリズムが含まれています。アルゴリズムは org.apache.spark.graphx.lib
パッケージに含まれており、GraphOps
を介して Graph
のメソッドとして直接アクセスできます。このセクションでは、アルゴリズムとその使用方法について説明します。
PageRank
PageRank は、u から v へのエッジが u による v の重要性の支持を表すことを前提として、グラフ内の各頂点の重要度を測定します。たとえば、Twitter ユーザーが多くの他のユーザーにフォローされている場合、そのユーザーは高くランク付けされます。
GraphX には、PageRank
オブジェクトのメソッドとして、PageRank の静的および動的な実装が付属しています。静的 PageRank は固定回数の反復を実行しますが、動的 PageRank はランクが収束するまで(つまり、指定された許容値以上変化しなくなるまで)実行されます。GraphOps
を使用すると、これらのアルゴリズムを Graph
のメソッドとして直接呼び出すことができます。
GraphX には、PageRank を実行できるソーシャルネットワークデータセットの例も含まれています。ユーザーのセットは data/graphx/users.txt
に、ユーザー間の関係のセットは data/graphx/followers.txt
に示されています。各ユーザーの PageRank は次のように計算します。
import org.apache.spark.graphx.GraphLoader
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
連結成分
連結成分アルゴリズムは、グラフの各連結成分に、番号が最も小さい頂点の ID でラベルを付けます。たとえば、ソーシャルネットワークでは、連結成分はクラスタを近似できます。GraphX には、ConnectedComponents
オブジェクト にアルゴリズムの実装が含まれており、PageRank セクション のソーシャルネットワークデータセットの例の連結成分は次のように計算します。
import org.apache.spark.graphx.GraphLoader
// Load the graph as in the PageRank example
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Find the connected components
val cc = graph.connectedComponents().vertices
// Join the connected components with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ccByUsername = users.join(cc).map {
case (id, (username, cc)) => (username, cc)
}
// Print the result
println(ccByUsername.collect().mkString("\n"))
三角形カウント
頂点は、それらの間にエッジを持つ 2 つの隣接頂点がある場合、三角形の一部です。GraphX は、TriangleCount
オブジェクト に三角形カウントアルゴリズムを実装しており、各頂点を通過する三角形の数を決定し、クラスタリングの尺度を提供します。PageRank セクション のソーシャルネットワークデータセットの三角形カウントを計算します。TriangleCount
は、エッジが標準方向(srcId < dstId
)であり、グラフが Graph.partitionBy
を使用して分割されている必要があります。
import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
// Load the edges in canonical order and partition the graph for triangle count
val graph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt", true)
.partitionBy(PartitionStrategy.RandomVertexCut)
// Find the triangle count for each vertex
val triCounts = graph.triangleCount().vertices
// Join the triangle counts with the usernames
val users = sc.textFile("data/graphx/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) =>
(username, tc)
}
// Print the result
println(triCountByUsername.collect().mkString("\n"))
例
いくつかのテキストファイルからグラフを作成し、重要な関係とユーザーにグラフを制限し、サブグラフでページランクを実行し、最後に上位ユーザーに関連付けられた属性を返す必要があるとします。GraphX を使用すると、これらすべてをわずか数行のコードで実行できます。
import org.apache.spark.graphx.GraphLoader
// Load my user data and parse into tuples of user id and attribute list
val users = (sc.textFile("data/graphx/users.txt")
.map(line => line.split(",")).map( parts => (parts.head.toLong, parts.tail) ))
// Parse the edge data which is already in userId -> userId format
val followerGraph = GraphLoader.edgeListFile(sc, "data/graphx/followers.txt")
// Attach the user attributes
val graph = followerGraph.outerJoinVertices(users) {
case (uid, deg, Some(attrList)) => attrList
// Some users may not have attributes so we set them as empty
case (uid, deg, None) => Array.empty[String]
}
// Restrict the graph to users with usernames and names
val subgraph = graph.subgraph(vpred = (vid, attr) => attr.size == 2)
// Compute the PageRank
val pagerankGraph = subgraph.pageRank(0.001)
// Get the attributes of the top pagerank users
val userInfoWithPageRank = subgraph.outerJoinVertices(pagerankGraph.vertices) {
case (uid, attrList, Some(pr)) => (pr, attrList.toList)
case (uid, attrList, None) => (0.0, attrList.toList)
}
println(userInfoWithPageRank.vertices.top(5)(Ordering.by(_._2._1)).mkString("\n"))