グラフ計算フレームワークの基本概念
分散処理エンジンApache Sparkが提供するGraphFramesは、大規模ネットワークデータの解析に特化した拡張機能です。Pythonインターフェースを通じてグラフアルゴリズムを実行可能で、SNS分析や通信トラフィック検証などに応用できます。本稿では実装手順を解説します。
実行環境の構築
GraphFramesを動作させるには、事前に以下のコンポーネントを準備する必要があります。
基本パッケージのインストール
pip install pyspark graphframes
実行環境の設定
Spark本体のバイナリパッケージをダウンロード後、環境変数を設定します。Windows環境ではHadoop互換ツールの配置が必須です。
import os
os.environ["SPARK_HOME"] = "/opt/spark-3.5.0"
os.environ["HADOOP_HOME"] = "/opt/spark-3.5.0"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
グラフデータの構築と解析
以下の手順でネットワーク構造をモデル化します。
頂点データの定義
vertices = spark.createDataFrame([
("node_101", "User_A", 28),
("node_102", "User_B", 35),
("node_103", "User_C", 42)
], ["id", "account", "age"])
辺データの定義
edges = spark.createDataFrame([
("node_101", "node_102", "direct"),
("node_102", "node_103", "indirect"),
("node_103", "node_101", "indirect")
], ["src", "dst", "connection_type"])
グラフオブジェクトの生成
from graphframes import GraphFrame
network = GraphFrame(vertices, edges)
ネットワーク特性の計測
基本的なトポロジー分析手法を実装します。
接続度数の算出
inflow = network.inDegrees
outflow = network.outDegrees
inflow.filter("inDegree > 1").show()
outflow.orderBy("outDegree", ascending=False).show(1)
ページランクアルゴリズムの実行
pagerank_results = network.pageRank(
resetProbability=0.15,
maxIter=10
)
pagerank_results.vertices.select("id", "pagerank").show()
実データへの適用方法
実際のネットワークログを処理する際の実装パターンです。
# TSV形式のエッジリストを読み込み
edge_data = spark.read.csv(
"network_edges.tsv",
sep="\t",
header=False
).toDF("src", "dst")
# 頂点リストの自動生成
vertex_data = edge_data.selectExpr("src as id").union(
edge_data.selectExpr("dst as id")
).distinct()
# グラフ構築と分析
traffic_graph = GraphFrame(vertex_data, edge_data)
traffic_graph.triangleCount().run().show()