Apache Spark を用いた FP-Growth アルゴリズムによる頻出パターン発掘

データ分析において、アソシエーションルール学習は取引データベース内の隠れた相関関係を抽出するための重要な手法です。その応用例として最も知られているのがマーケティング分野での市場バスケット分析であり、顧客の購買履歴から特定の組み合わせで購入されやすい商品を特定し、陳列配置の改善や推奨システムの構築に役立てます。

基礎概念の定義

このアルゴリズムを理解するためには、以下の基本的な指標を把握する必要があります。

  • アソシエーションルール: データセット内部で発見される事象間の因果や傾向を表現します。一般に X が発生した場合、Y も発生するという形式 (X → Y) で表されます。
  • サポート (Support): アイテムセット {X, Y} が全トランザクション中に出現する割合を示します。
  • コンフィデンス (Confidence): 条件 X が満たされている際に、結果 Y も同時に満たされる確率を表します。

従来の Apriori 手法との比較

頻出パターン探索においては、従来 Apriori 算法が主流でした。これは候補となるアイテムセットを階層的に生成・篩い落とすプロセスに基づきます。具体的には、単一のアイテムから開始し、サポート閾値を満たさないものを削除しつつ、k-1 アイテムセットから k アイテムセットへと連鎖的に拡張していく手順を踏みます。

しかし、この手法は計算コストが課題となります。各反復サイクルで候補セットの検証のために元のデータを再読み込みする必要があるため、I/O オーバーヘッドが大きく、大規模データに対する処理効率が低下する傾向がありました。これを克服するために開発されたのが、データベースへのアクセス回数を最小限に抑える FP-Growth (Frequent Pattern Growth) アルゴリズムです。

FP-Growth アルゴリズムの仕組み

本アプローチでは、効率的な検索のために専用のデータ構造を利用します。主な要素としては項頭表、FP-Tree(頻出パターンツリー)、およびノード連結リストが含まれます。

項頭表の構成 (Header Table)

まず、データを一度スキャンして頻出する単一アイテムを特定し、サポート閾値を下回る項目をフィルタリングします。その後、残りのアイテムをサポート数降順で並べ替えて項頭表を作成します。続いて二度目のデータ走査により、非頻出項目を除去し、項頭表に登録された順序に従ってデータを整理します。

FP-Tree の構築

FP-Tree は根ノード (Null) から始まる木構造です。整理済みのデータを一つのトランザクションずつ取り出し、項頭表の順序に合わせてツリー上に挿入します。既存のパスに重複するノードがある場合、カウント値が増加し、分岐が発生する場合は新しいノードが作成されます。

パターン発掘プロセス

構築されたツリーからは以下のステップで頻出パターンを取得できます。

  1. 条件パターンのベース (Condition Pattern Base) 作成: 対象とするアイテムを含むすべての前向きパスを抽出します。
  2. 条件 FP-Tree の生成: 抽出した条件パターンを集計し、再び閾値フィルタを行った後、条件付きの木構造を作成します。
  3. 再帰的探索: 生成された条件 FP-Tree を対象に同様の操作を再帰的に実行し、最終的な頻出パターンを導出します。

これにより、元データを多次読み込む必要がなくなり、大幅な性能向上が期待できます。

Spark MLlib での実装例

Apache Spark の機械学習ライブラリである MLlib では、FPGrowth クラスがこのアルゴリズムを実装しています。以下に Scala を使用した実装コードを示します。

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.mllib.fpm.FPGrowth

object FrequentPatternMining {
  
  def main(args: Array[String]): Unit = {
    // スパークコンテキストの初期化
    val sparkConf = new SparkConf().setAppName("FP_Growth_Analysis")
    val sc = new SparkContext(sparkConf)

    // テキストファイルからのデータ読み込みとクリーニング
    // トランザクションごとのアイテムをスペース区切りで保持想定
    val transactionPath = "hdfs:///user/spark/data/fpgrowth_sample.txt"
    val rawTransactions = sc.textFile(transactionPath)
    
    // パーティションごとの分割処理
    val parsedTransactions = rawTransactions
      .map(line => line.split(" ").toSeq)
      .cache()

    // モデル設定: サポート閾値 0.3 に調整
    // numPartition は並列処理のためのパーティション数
    val thresholdSupport = 0.3
    val parallelism = 4
    
    // FPGrowth モデルのトレーニング
    val frequentPatterns = new FPGrowth()
      .setMinSupport(thresholdSupport)
      .setNumPartitions(parallelism)
      .run(parsedTransactions)

    // 結果の確認
    println(s"発見された頻出アイテムセットの数:${frequentPatterns.freqItemsets.count()}")
    
    // コンソール出力
    frequentPatterns.freqItemsets.collect().foreach { itemset =>
      val items = itemset.items.mkString("[", ", ", "]")
      val frequency = itemset.freq
      println(s"Items: $items -> Frequency: $frequency")
    }

    sc.stop()
  }
}

上記のスクリプトでは、入力データを RDD として準備し、FPGrowth ライブラリを直接呼び出してモデルを生成しています。サンプルデータとしては、「D E」「A B C」のようにスペース区切りの文字列が一行あたり一つのトランザクションを表す形式が必要です。

D E
A B C
A B C E
B E
C D E
A B C
A B C E
B E
F G
D F

これらの手順により、指定された最小サポート数以上のアイテムの組合せが自動的に抽出されます。

タグ: Apache-Spark fp-growth MLlib association-rules Scala

6月27日 22:22 投稿