SparkによるK-Meansクラスタリングの実装

K-Meansアルゴリズムは距離ベースのクラスタリング手法であり、反復処理を用いてK個のクラスタ中心を計算し、データポイントをK個のクラスに分類します。

MLlibにおけるK-Meansアルゴリズムの実装原理は、複数のK-Means実行(各実行をrunと呼びます)を行い、最も優れたクラスタリング結果を中心として返します。初期のクラスタ中心はランダムに設定されるか、KMean++アルゴリズムによって決定されます。指定された回数の反復処理が完了するか、すべての実行が収束した時点でアルゴリズムは終了します。

Sparkを用いてK-Meansアルゴリズムを実装するには、まずpomファイルを修正して機械学習ライブラリのMLlibパッケージを導入する必要があります:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

実装コードは以下の通りです:

import org.apache.log4j.{Level,Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

object KmeansClustering {
  def main(args:Array[String]): Unit = {
    // ログ出力を抑制
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // 実行環境の設定
    val conf = new SparkConf().setAppName("K-Means-Clustering").setMaster("spark://master:7077")
      .setJars(Seq("/path/to/your/project.jar"))
    val sc = new SparkContext(conf)

    // データセットの読み込み
    val rawData = sc.textFile("hdfs://master:9000/clustering_data.txt", 1)
    val vectorData = rawData.map(line => Vectors.dense(line.split(" ").map(_.toDouble)))

    // クラスタリングの実行(2クラスタ、30回の反復)
    val clusterCount = 2
    val maxIterations = 30
    val clusteringModel = KMeans.train(vectorData, clusterCount, maxIterations)

    // クラスタ中心の出力
    println("クラスタ中心点:")
    for(center <- clusteringModel.clusterCenters) {
      println("  " + center.toString)
    }

    // 誤差二乗和によるモデル評価
    val errorSum = clusteringModel.computeCost(vectorData)
    println("クラスタ内誤差二乗和 = " + errorSum)

    // 単一データポイントのクラスタ判定
    println("ベクトル [7.3, 1.5, 10.9] のクラスタ: " + 
      clusteringModel.predict(Vectors.dense("7.3 1.5 10.9".split(" ").map(_.toDouble))))
    println("ベクトル [4.2, 11.2, 2.7] のクラスタ: " + 
      clusteringModel.predict(Vectors.dense("4.2 11.2 2.7".split(" ").map(_.toDouble))))
    println("ベクトル [18.0, 4.5, 3.8] のクラスタ: " + 
      clusteringModel.predict(Vectors.dense("1.0 14.5 73.8".split(" ").map(_.toDouble))))

    // 全データセットのクラスタリング結果
    val clusteringResults = rawData.map {
      dataLine =>
        val dataVector = Vectors.dense(dataLine.split(" ").map(_.toDouble))
        val predictedCluster = clusteringModel.predict(dataVector)
        dataLine + " " + predictedCluster
    }.collect.foreach(println)

    sc.stop
  }
}

textFile()メソッドでデータセットを読み込み、RDDを取得した後、KMeans.train()メソッドを用いてRDD、クラスタ数K、反復回数を指定してKMeansモデルを生成します。KMeansモデルが得られたら、Vectors.dense()メソッドでベクトルを作成し、KMeans.predict()メソッドを用いてデータがどのクラスタに属するか判定できます。

実行結果は以下の通りです:

クラスタ中心点:
  [6.062499999999999,6.7124999999999995,11.5]
  [3.5,12.2,60.0]
クラスタ内誤差二乗和 = 943.2074999999998
ベクトル [7.3, 1.5, 10.9] のクラスタ: 0
ベクトル [4.2, 11.2, 2.7] のクラスタ: 0
ベクトル [18.0, 4.5, 3.8] のクラスタ: 1
0.0 0.0 5.0 0
0.1 10.1 0.1 0
1.2 5.2 13.5 0
9.5 9.0 9.0 0
9.1 9.1 9.1 0
19.2 9.4 29.2 0
5.8 3.0 18.0 0
3.5 12.2 60.0 1
3.6 7.9 8.1 0

タグ: Spark MLlib K-Means クラスタリング 機械学習

5月25日 02:57 投稿