Spark GraphXを用いたグラフ処理の基本的な実装例

Spark GraphXは分散環境でのグラフ計算を実現するためのフレームワークです。SNSやソーシャルネットワークではユーザー間の複雑な関係性が存在し、WeChatやQQ、Weiboなどのプラットフォームにおける友人関係やフォロー関係などは巨大なグラフ構造を形成します。このような大規模なデータは単一マシンでは処理が困難であるため、分散型グラフ処理フレームワークが必要となります。

  1. プロジェクト設定 ========

Mavenプロジェクトの依存関係にGraphXライブラリを追加します:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
  1. 実行環境の準備 =========
    val configuration = new SparkConf()
      .setAppName("GraphX Basic Example")
      .setMaster("local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
    val sparkContext = new SparkContext(configuration)
  1. グラフ構築プロセス =======

グラフはノードとエッジで構成されるため、まずはこれらの要素を定義します:

    val nodesData = Array(
      (1L, ("Yamada", 25)),
      (2L, ("Tanaka", 30)),
      (3L, ("Suzuki", 45)),
      (4L, ("Watanabe", 28)),
      (5L, ("Sato", 33)),
      (6L, ("Ito", 22))
    )

    val connections = Array(
      Edge(1L, 2L, 4),
      Edge(2L, 3L, 6),
      Edge(3L, 4L, 2),
      Edge(4L, 5L, 5),
      Edge(5L, 1L, 3),
      Edge(6L, 2L, 7),
      Edge(6L, 4L, 1),
      Edge(3L, 6L, 8)
    )

次にRDD形式に変換します:

    val nodeRDD: RDD[(Long, (String, Int))] = sparkContext.parallelize(nodesData)
    val connectionRDD: RDD[Edge[Int]] = sparkContext.parallelize(connections)

これらを利用してグラフを作成します:

    val sampleGraph: Graph[(String, Int), Int] = Graph(nodeRDD, connectionRDD)
  1. グラフ特性の操作 =========

GraphXにおけるグラフの主要特性:

(1) vertices:すべてのノード情報

(2) edges:すべての接続情報

(3) triplets:ソースノード、ターゲットノード、およびそれらを結ぶエッジの組み合わせ

(4) degrees:各ノードの総次数

(5) inDegrees:各ノードの入次数

(6) outDegrees:各ノードの出次数

これらの特性を利用した処理の実装:

    println("=============================================================")
    println("グラフ特性の検証")
    println("=============================================================")
    
    println("年齢が25歳を超えるノードの表示(パターン1):")
    sampleGraph.vertices.filter { case (id, (name, age)) => 
      age > 25 
    }.collect.foreach { 
      case (id, (name, age)) => 
        println(s"ユーザ名: $name, 年齢: $age")
    }

    println("年齢が25歳を超えるノードの表示(パターン2):")
    sampleGraph.vertices.filter(node => node._2._2 > 25).collect.foreach { 
      node => 
        println(s"ユーザ名: ${node._2._1}, 年齢: ${node._2._2}")
    }

    println("重みが4より大きいエッジの抽出:")
    sampleGraph.edges.filter(connection => connection.attr > 4).collect.foreach { 
      connection => 
        println(s"開始ID: ${connection.srcId} → 終了ID: ${connection.dstId}, 重み: ${connection.attr}")
    }

    println("\n全トリプレットの表示:")
    sampleGraph.triplets.collect.foreach { triplet =>
      println(s"${triplet.srcAttr._1} から ${triplet.dstAttr._1} への関係")
    }

    println("\n重みが4より大きいトリプレット:")
    sampleGraph.triplets.filter(trip => trip.attr > 4).collect.foreach { triplet =>
      println(s"${triplet.srcAttr._1} から ${triplet.dstAttr._1} への強関係")
    }

    println("\n最大次数の計算:")
    def findMaximum(first: (VertexId, Int), second: (VertexId, Int)): (VertexId, Int) = {
      if (first._2 > second._2) first else second
    }
    
    println(s"最大出次数: ${sampleGraph.outDegrees.reduce(findMaximum)}")
    println(s"最大入次数: ${sampleGraph.inDegrees.reduce(findMaximum)}")
    println(s"最大総次数: ${sampleGraph.degrees.reduce(findMaximum)}")

実行結果:

INFO SparkContext: Running with master=local[*]
INFO GraphX Basic Example: グラフ特性の検証
=============================================================
年齢が25歳を超えるノードの表示(パターン1):
ユーザ名: Tanaka, 年齢: 30
ユーザ名: Suzuki, 年齢: 45
ユーザ名: Sato, 年齢: 33
年齢が25歳を超えるノードの表示(パターン2):
ユーザ名: Tanaka, 年齢: 30
ユーザ名: Suzuki, 年齢: 45
ユーザ名: Sato, 年齢: 33
重みが4より大きいエッジの抽出:
開始ID: 2 → 終了ID: 3, 重み: 6
開始ID: 4 → 終了ID: 5, 重み: 5
開始ID: 6 → 終了ID: 2, 重み: 7
開始ID: 3 → 終了ID: 6, 重み: 8

全トリプレットの表示:
Yamada から Tanaka への関係
Tanaka から Suzuki への関係
Suzuki から Watanabe への関係
Watanabe から Sato への関係
Sato から Yamada への関係
Ito から Tanaka への関係
Ito から Watanabe への関係
Suzuki から Ito への関係

重みが4より大きいトリプレット:
Tanaka から Suzuki への強関係
Watanabe から Sato への強関係
Ito から Tanaka への強関係
Suzuki から Ito への強関係

最大次数の計算:
最大出次数: (3,2)
最大入次数: (2,3)
最大総次数: (2,4)

タグ: Spark graphx Scala distributed-computing graph-processing

6月15日 16:51 投稿