K-Meansアルゴリズムは距離ベースのクラスタリング手法であり、反復処理を用いてK個のクラスタ中心を計算し、データポイントをK個のクラスに分類します。
MLlibにおけるK-Meansアルゴリズムの実装原理は、複数のK-Means実行(各実行をrunと呼びます)を行い、最も優れたクラスタリング結果を中心として返します。初期のクラスタ中心はランダムに設定されるか、KMean++アルゴリズムによって決定されます。指定された回数の反復処理が完了するか、すべての実行が収束した時点でアルゴリズムは終了します。
Sparkを用いてK-Meansアルゴリズムを実装するには、まずpomファイルを修正して機械学習ライブラリのMLlibパッケージを導入する必要があります:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>2.1.0</version>
</dependency>
実装コードは以下の通りです:
import org.apache.log4j.{Level,Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
object KmeansClustering {
def main(args:Array[String]): Unit = {
// ログ出力を抑制
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
// 実行環境の設定
val conf = new SparkConf().setAppName("K-Means-Clustering").setMaster("spark://master:7077")
.setJars(Seq("/path/to/your/project.jar"))
val sc = new SparkContext(conf)
// データセットの読み込み
val rawData = sc.textFile("hdfs://master:9000/clustering_data.txt", 1)
val vectorData = rawData.map(line => Vectors.dense(line.split(" ").map(_.toDouble)))
// クラスタリングの実行(2クラスタ、30回の反復)
val clusterCount = 2
val maxIterations = 30
val clusteringModel = KMeans.train(vectorData, clusterCount, maxIterations)
// クラスタ中心の出力
println("クラスタ中心点:")
for(center <- clusteringModel.clusterCenters) {
println(" " + center.toString)
}
// 誤差二乗和によるモデル評価
val errorSum = clusteringModel.computeCost(vectorData)
println("クラスタ内誤差二乗和 = " + errorSum)
// 単一データポイントのクラスタ判定
println("ベクトル [7.3, 1.5, 10.9] のクラスタ: " +
clusteringModel.predict(Vectors.dense("7.3 1.5 10.9".split(" ").map(_.toDouble))))
println("ベクトル [4.2, 11.2, 2.7] のクラスタ: " +
clusteringModel.predict(Vectors.dense("4.2 11.2 2.7".split(" ").map(_.toDouble))))
println("ベクトル [18.0, 4.5, 3.8] のクラスタ: " +
clusteringModel.predict(Vectors.dense("1.0 14.5 73.8".split(" ").map(_.toDouble))))
// 全データセットのクラスタリング結果
val clusteringResults = rawData.map {
dataLine =>
val dataVector = Vectors.dense(dataLine.split(" ").map(_.toDouble))
val predictedCluster = clusteringModel.predict(dataVector)
dataLine + " " + predictedCluster
}.collect.foreach(println)
sc.stop
}
}
textFile()メソッドでデータセットを読み込み、RDDを取得した後、KMeans.train()メソッドを用いてRDD、クラスタ数K、反復回数を指定してKMeansモデルを生成します。KMeansモデルが得られたら、Vectors.dense()メソッドでベクトルを作成し、KMeans.predict()メソッドを用いてデータがどのクラスタに属するか判定できます。
実行結果は以下の通りです:
クラスタ中心点:
[6.062499999999999,6.7124999999999995,11.5]
[3.5,12.2,60.0]
クラスタ内誤差二乗和 = 943.2074999999998
ベクトル [7.3, 1.5, 10.9] のクラスタ: 0
ベクトル [4.2, 11.2, 2.7] のクラスタ: 0
ベクトル [18.0, 4.5, 3.8] のクラスタ: 1
0.0 0.0 5.0 0
0.1 10.1 0.1 0
1.2 5.2 13.5 0
9.5 9.0 9.0 0
9.1 9.1 9.1 0
19.2 9.4 29.2 0
5.8 3.0 18.0 0
3.5 12.2 60.0 1
3.6 7.9 8.1 0