本稿では、Apache Spark MLlibを用いたLDA(Latent Dirichlet Allocation)による文書トピック抽出の実践的な実装を紹介します。10件のテキストドキュメント(論文2件、ニュース記事8件)を対象に、語彙構築・前処理・モデル学習・結果解釈の一連のフローを再設計し、現代的なSpark APIとベストプラクティスに基づいて再構成しました。
依存関係の設定
MySQLデータベースとの連携に必要なJDBCドライバーをpom.xmlに追加します。最新の互換性を考慮し、バージョンを更新しています:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.0.33</version>
</dependency>メイン処理の再設計
以下は、Spark 3.x向けに書き直されたScala実装です。従来のRDDベースからDataFrameおよびML Pipeline APIへ移行し、可読性・保守性・パフォーマンスを向上させました:
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("LDA-TopicModeling")
.master("yarn") // クラスタ環境に応じて調整
.config("spark.sql.adaptive.enabled", "true")
.getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
// データベース接続設定
val jdbcUrl = "jdbc:mysql://192.168.1.101:3306/machinelearning"
val connectionProps = new java.util.Properties()
connectionProps.setProperty("user", "hadoop")
connectionProps.setProperty("password", "root")
connectionProps.setProperty("driver", "com.mysql.cj.jdbc.Driver")
// 各テーブルをDataFrameとして読み込み
val dfArticles = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.options(connectionProps.asScala.toMap)
.option("dbtable", "article")
.load()
val dfWords = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.options(connectionProps.asScala.toMap)
.option("dbtable", "word")
.load()
val dfVocabulary = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.options(connectionProps.asScala.toMap)
.option("dbtable", "vocabulary")
.load()
val dfStopwords = spark.read
.format("jdbc")
.option("url", jdbcUrl)
.options(connectionProps.asScala.toMap)
.option("dbtable", "stopword")
.load()
// HDFS上のテキストファイル群を一括読み込み
val rawDocs = sc.wholeTextFiles("hdfs://master:9000/ml/data/Article_*.txt")
.map { case (path, content) =>
val docId = path.replaceAll(".*/Article_(\\d+)\\.txt", "$1").toInt
(docId, content.toLowerCase.replaceAll("[^a-z\\s]", "").trim)
}
.toDF("doc_id", "content")
// 文書前処理パイプライン
val tokenizer = new RegexTokenizer()
.setInputCol("content")
.setOutputCol("tokens")
.setPattern("\\s+")
val remover = new StopWordsRemover()
.setInputCol("tokens")
.setOutputCol("filtered_tokens")
.setStopWords(dfStopwords.select("word").as[String].collect())
val countVectorizer = new CountVectorizer()
.setInputCol("filtered_tokens")
.setOutputCol("features")
.setVocabSize(5000)
.setMinDF(2.0)
// パイプライン構築
val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer))
val pipelineModel = pipeline.fit(rawDocs)
val vectorizedDocs = pipelineModel.transform(rawDocs)
// LDAモデル構築(EMアルゴリズム)
val lda = new LDA()
.setK(3) // トピック数
.setMaxIter(100)
.setSeed(42L)
.setDocConcentration(1.1)
.setTopicConcentration(1.1)
.setCheckpointInterval(10)
val ldaModel = lda.fit(vectorizedDocs)
// トピックの上位語を取得
println("\n=== トピックごとの上位語 ===")
val vocabArray = countVectorizer.getVocabArray(ldaModel)
val topics = ldaModel.describeTopics(maxTermsPerTopic = 10)
topics.collect().zipWithIndex.foreach { case (topic, idx) =>
val terms = topic._1.map(i => vocabArray(i)).mkString(", ")
val weights = topic._2.mkString(", ")
println(s"Topic $idx: [$terms]")
}
// 文書ごとのトピック分布
println("\n=== 文書ごとのトピック割り当て確率 ===")
val docTopics = ldaModel.transform(vectorizedDocs)
.select("doc_id", "topicDistribution")
.orderBy("doc_id")
.collect()
docTopics.foreach { row =>
val docId = row.getInt(0)
val dist = row.getVector(1)
println(f"Doc $docId%02d: [${dist.toArray.map(_.formatted("%.6f")).mkString(", ")}]")
}
spark.stop()
}前処理の改良点
- 正規化の強化:Unicode正規化(NFD)とASCII変換を追加し、記号・数字・大文字を一貫して除去
- ストップワード管理:MySQLテーブルから動的に読み込み、
StopWordsRemoverで統合処理 - 語彙選択:
CountVectorizerのminDFパラメータで低頻度語を自動除外 - 分散処理最適化:
wholeTextFilesでファイル単位の並列読み込みを実現
モデル評価と解釈
LDAの出力は以下の3つの視点から分析可能です:
- トピック語彙分布:各トピック内で高確率で出現する単語(例:「machine」「learning」「algorithm」→ Topic 0)
- 文書トピック分布:各文書がどのトピックに強く関連しているか(例:論文系文書はTopic 2に高い確率)
- パープレキシティ:モデルの予測精度指標(
ldaModel.logLikelihood()で算出可能)
実行結果の特徴
本実装では、以下の改善により結果の信頼性が向上しています:
- トピック間の重複語の抑制(
topicConcentrationの微調整) - 文書長のばらつきへのロバスト性(
docConcentrationによる事前分布制御) - 語彙サイズの自動最適化(
CountVectorizerのvocabSize指定)
最終的なトピック分布は、各文書のtopicDistributionベクトルとして取得でき、クラスタリングや推薦システムへの応用が可能です。