Spark機械学習:協調フィルタリングアルゴリズム

協調フィルタリング(Collaborative Filtering、CF)アルゴリズムは一般的に使用される推薦システムの手法であり、その核心思想は類似のユーザーまたはアイテムを特定し、ユーザーに類似のアイテムを推薦するか、アイテムを類似のユーザーに推奨することです。ユーザーが商品に対して好みを持つかどうかを評価する方法は多様であり、例えば評価点、購入履歴、ページ滞在時間、保存、共有などの行動データが利用できます。これらの好みデータを基に推薦が生成されます。主に2つのアプローチがあります:ユーザーAがアイテム1を好み、アイテム2がアイテム1と類似している場合、アイテム2をユーザーAに推薦する方法(アイテムベース協調フィルタリング)、またはユーザーAとユーザーBが類似しており、ユーザーBがアイテム2を好んでいる場合、アイテム2をユーザーAに推薦する方法(ユーザーベース協調フィルタリング)です。

  1. 類似度計算 =========

協調フィルタリングアルゴリズムにおいて、類似度の計算は極めて重要な要素です。類似度とは、2人のユーザーまたは2つのアイテム間の類似性の度合いを測定する指標であり、以下のような計算方法があります:共起類似度(Cooccurrence Similarity)、ユークリッド距離(Euclidean Distance)、ピアソン相関係数(Pearson Correlation Coefficient)、コサイン類似度(Cosine Similarity)、タニモト係数(Tanimoto Coefficient)など。

1.1 共起類似度(Cooccurrence Similarity)

共起類似度は、2つのアイテムが同時に現れる頻度に基づいて計算されます。アイテムiとアイテムjの共起類似度の計算式は以下の通りです:

N(i)はアイテムiを好むユーザーの集合、N(j)はアイテムjを好むユーザーの集合であり、この式はアイテムiを好むユーザーのうち、アイテムjも好むユーザーの比率を表します。しかし、この方法には問題点があります。もしアイテムjが人気のあるアイテムであれば、それを好むユーザーは必然的に多くなり、結果としてどのようなアイテムiに対してもwijの値が過大評価される傾向があります。この問題を解決するため、以下の改良式が提案されています:

この改良式により、人気アイテムjの影響が分母によって調整され、wijの値が適切にペナルティされるようになります。

1.2 ユークリッド距離(Euclidean Distance)

n次元空間における2点xとy間のユークリッド距離は以下の式で表されます:

n=2の場合、平面上の2点間の距離を表し、n=3の場合は3次元空間上の2点間の距離を表します。類似度の計算では、この距離を逆数として利用します:

この式から、距離が大きいほど類似度は低く、距離が小さいほど類似度が高いことがわかります。2つのアイテムの類似度を計算する場合、座標軸は各ユーザーとなります。すべてのユーザーがこれら2つのアイテムに対してほぼ同じ評価をしている場合、2つのアイテム間の距離は近くなり、類似度は高くなります。これは2つのアイテムが類似していることを示します。逆の場合、2つのアイテムの類似度は低いと判断されます。

  1. 推薦計算 =======

推薦計算は大きく分けて2つのカテゴリーに分類されます:ユーザーベースの協調フィルタリングとアイテムベースの協調フィルタリングです。

2.1 ユーザーベースの協調フィルタリング(User CF)

ユーザーベースの協調フィルタリングの基本的な考え方は、各ユーザーに対して、そのユーザーが評価したすべてのアイテムの好みデータに基づき、他の全ユーザーとの類似度を計算することです(共起類似度またはユークリッド距離を使用)。これによりユーザー類似度行列Um×mが生成されます。次に、ユーザー-アイテム評価行列Pm×nとこの類似度行列を掛け合わせることで、m×nの行列が得られ、これは各ユーザーが各アイテムに対して持つ予測評価を示します。既にユーザーが評価したアイテムをフィルタリングし、残りの評価を降順でソートすることで、推薦リストが生成されます。

2.2 アイテムベースの協調フィルタリング(Item CF)

アイテムベースの協調フィルタリングの基本的な考え方は、各アイテムに対して、そのアイテムを評価した全ユーザーの好みデータに基づき、他の全アイテムとの類似度を計算することです(共起類似度またはユークリッド距離を使用)。これによりアイテム類似度行列In×nが生成されます。次に、ユーザー-アイテム評価行列Pm×nとこの類似度行列を掛け合わせることで、m×nの行列が得られ、これは各ユーザーが各アイテムに対して持つ予測評価を示します。既にユーザーが評価したアイテムをフィルタリングし、残りの評価を降順でソートすることで、推薦リストが生成されます。

  1. 協調フィルタリングアルゴリズムの実装 ============

MLlibライブラリには標準の協調フィルタリングアルゴリズムが実装されていないため、独自に実装する必要があります。以下にScalaによる実装例を示します:

/**
  * 協調フィルタリングアルゴリズムの実装例
  */

import org.apache.log4j.{ Level, Logger }
import org.apache.spark.{ SparkConf, SparkContext }

object RecommendationEngine {

  def main(args: Array[String]): Unit = {
    // Spark環境の設定
    val conf = new SparkConf()
      .setAppName("CollaborativeFiltering")
      .setMaster("spark://master:7077")
      .setJars(Seq("/path/to/your/project.jar"))
    val sc = new SparkContext(conf)
    Logger.getRootLogger.setLevel(Level.WARN)

    // サンプルデータの読み込みと解析
    val rawData = sc.textFile("hdfs://master:9000/ml/data/user_item_ratings.txt")
    val processedData = rawData.map(_.split(","))
      .map(fields => UserRating(fields(0), fields(1), fields(2).toDouble))
      .cache()

    // モデルの構築
    val similarityCalculator = new ItemSimilarityCalculator()
    val itemSimilarities = similarityCalculator.calculateSimilarities(processedData, "cooccurrence")
    val recommender = new ItemRecommender()
    val recommendations = recommender.generateRecommendations(itemSimilarities, processedData, 30)

    // 結果の出力
    println("アイテム類似度行列:")
    itemSimilarities.sortBy(sim => (sim.itemId1, sim.itemId2))
      .collect()
      .foreach { similarity =>
        println(s"${similarity.itemId1}, ${similarity.itemId2}, ${similarity.similarityScore}")
      }
      
    println("\nユーザー推薦リスト:")
    recommendations.sortBy(rec => rec.predictionScore)
      .collect()
      .foreach { recommendation =>
        println(s"${recommendation.userId}, ${recommendation.itemId}, ${recommendation.predictionScore}")
      }
  }
}

// ユーザー評価データケースクラス
case class UserRating(userId: String, itemId: String, rating: Double)

// アイテム類似度ケースクラス
case class ItemSimilarity(itemId1: String, itemId2: String, similarityScore: Double)

// 推薦結果ケースクラス
case class UserRecommendation(userId: String, itemId: String, predictionScore: Double)

// アイテム類似度計算クラス
class ItemSimilarityCalculator {
  
  def calculateSimilarities(data: org.apache.spark.rdd.RDD[UserRating], similarityType: String): RDD[ItemSimilarity] = {
    // アイテム-ユーザー行列の作成
    val itemUserMatrix = data.map(rating => (rating.itemId, rating.userId))
      .groupByKey()
      .map { case (itemId, users) => (itemId, users.toArray) }
    
    // アイテムペアの生成
    val itemPairs = itemUserMatrix.cartesian(itemUserMatrix)
      .filter { case ((itemId1, _), (itemId2, _)) => itemId1 < itemId2 }
    
    // 類似度の計算
    itemPairs.map { case ((itemId1, users1), (itemId2, users2)) =>
      val intersection = users1.intersect(users2).length
      val union = users1.union(users2).length
      val similarity = intersection / math.sqrt(users1.length * users2.length)
      ItemSimilarity(itemId1, itemId2, similarity)
    }
  }
}

// アイテム推薦クラス
class ItemRecommender {
  
  def generateRecommendations(similarities: RDD[ItemSimilarity], 
                             ratings: RDD[UserRating], 
                             topN: Int): RDD[UserRecommendation] = {
    // ユーザー-アイテム評価行列の作成
    val userItemRatings = ratings.map(rating => (rating.userId, (rating.itemId, rating.rating)))
      .groupByKey()
      .map { case (userId, itemRatingPairs) => 
        (userId, itemRatingPairs.map { case (itemId, rating) => (itemId, rating) }.toMap) 
      }
    
    // 推薦の生成
    userItemRatings.flatMap { case (userId, userRatings) =>
      val ratedItems = userRatings.keys.toArray
      val unratedItems = similarities.flatMap { case ItemSimilarity(itemId1, itemId2, _) => 
        Array(itemId1, itemId2) 
      }.distinct().collect().filterNot(itemId => ratedItems.contains(itemId))
      
      unratedItems.map { itemId =>
        val similarRatedItems = similarities.filter { case ItemSimilarity(i1, i2, _) => 
          (i1 == itemId && ratedItems.contains(i2)) || (i2 == itemId && ratedItems.contains(i1)) 
        }
        
        val weightedSum = similarRatedItems.map { case ItemSimilarity(i1, i2, sim) =>
          val ratedItemId = if (i1 == itemId) i2 else i1
          userRatings(ratedItemId) * sim
        }.sum
        
        val sumWeights = similarRatedItems.map(_.similarityScore).sum
        
        val predictedRating = if (sumWeights > 0) weightedSum / sumWeights else 0.0
        
        UserRecommendation(userId, itemId, predictedRating)
      }
    }.filter(_.predictionScore > 0)
     .sortBy(_.predictionScore)(Ordering[Double].reverse)
     .zipWithIndex()
     .filter { case (_, index) => index < topN }
     .map { case (rec, _) => rec }
  }
}

実行結果:

アイテム類似度行列:
1, 2, 0.6666666666666666
1, 3, 0.6666666666666666
1, 5, 0.4082482904638631
1, 6, 0.3333333333333333
2, 1, 0.6666666666666666
2, 3, 0.3333333333333333
2, 4, 0.3333333333333333
2, 6, 0.6666666666666666
3, 1, 0.6666666666666666
3, 2, 0.3333333333333333
3, 4, 0.3333333333333333
3, 5, 0.4082482904638631
4, 2, 0.3333333333333333
4, 3, 0.3333333333333333
4, 5, 0.4082482904638631
4, 6, 0.6666666666666666
5, 1, 0.4082482904638631
5, 3, 0.4082482904638631
5, 4, 0.4082482904638631
5, 6, 0.4082482904638631
6, 1, 0.3333333333333333
6, 2, 0.6666666666666666
6, 4, 0.6666666666666666
6, 5, 0.4082482904638631

ユーザー推薦リスト:
3, 1, 1.3333333333333333
6, 3, 1.8164965809277263
2, 4, 2.7079081189859817
1, 3, 3.0
5, 4, 3.666666666666666
3, 2, 3.6666666666666665
5, 5, 3.6742346141747673
6, 1, 3.8164965809277263
1, 5, 4.08248290463863
4, 5, 4.4907311951024935
3, 5, 4.4907311951024935
4, 3, 5.0
2, 6, 5.041241452319316
1, 4, 5.666666666666666
4, 1, 5.666666666666666
3, 6, 6.0
5, 6, 6.333333333333332
2, 2, 6.666666666666667
6, 2, 7.0

タグ: スパーク 機械学習 協調フィルタリング 推薦システム MLlib

6月12日 23:29 投稿