クラスタリング

このページでは、MLlib のクラスタリングアルゴリズムについて説明します。 RDD ベースの API におけるクラスタリングのガイド にも、これらのアルゴリズムに関する関連情報が記載されています。

目次

K-means

k-means は、データポイントを定義済みのクラスタ数に分類する最も一般的に使用されるクラスタリングアルゴリズムの 1 つです。 MLlib の実装には、k-means++ メソッドの並列化されたバリエーションである kmeans|| が含まれます。

KMeansEstimator として実装されており、基礎モデルとして KMeansModel を生成します。

入力列

パラメータ名 タイプ 既定 説明
featuresCol ベクター "features" 特徴ベクター

出力列

パラメータ名 タイプ 既定 説明
predictionCol 整数 "prediction" 予測クラスタ中心

詳細については、Python API ドキュメント を参照してください。

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/kmeans_example.py" にあります。

詳細については、Scala API ドキュメント を参照してください。

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala" にあります。

Java API ドキュメントを参照して詳細をご覧ください。

import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a k-means model.
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center: centers) {
  System.out.println(center);
}
Spark リポジトリ内の「examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java」で完全なコード例を確認してください。

R API ドキュメントを参照して詳細をご覧ください。

# Fit a k-means model with spark.kmeans
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
                            k = 3)

# Model summary
summary(kmeansModel)

# Get fitted result from the k-means model
head(fitted(kmeansModel))

# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
Spark リポジトリ内の「examples/src/main/r/ml/kmeans.R」で完全なコード例を確認してください。

潜在的ディリクレ配分 (LDA)

LDA は、EMLDAOptimizerOnlineLDAOptimizer の両方をサポートする Estimator として実装され、ベースモデルとして LDAModel を生成します。エキスパートユーザーは必要に応じて、EMLDAOptimizer によって生成された LDAModelDistributedLDAModel にキャストできます。

Python API ドキュメントを参照して詳細をご覧ください。

from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
Spark リポジトリ内の「examples/src/main/python/ml/lda_example.py」で完全なコード例を確認してください。

Scala API ドキュメントを参照して詳細をご覧ください。

import org.apache.spark.ml.clustering.LDA

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt")

// Trains a LDA model.
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)

val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")

// Describe topics.
val topics = model.describeTopics(3)
println("The topics described by their top-weighted terms:")
topics.show(false)

// Shows the result.
val transformed = model.transform(dataset)
transformed.show(false)
Spark リポジトリ内の「examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala」で完全なコード例を確認してください。

Java API ドキュメントを参照して詳細をご覧ください。

import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt");

// Trains a LDA model.
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);

double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
System.out.println("The upper bound on perplexity: " + lp);

// Describe topics.
Dataset<Row> topics = model.describeTopics(3);
System.out.println("The topics described by their top-weighted terms:");
topics.show(false);

// Shows the result.
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
Spark リポジトリ内の「examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java」で完全なコード例を確認してください。

R API ドキュメントを参照して詳細をご覧ください。

# Load training data
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a latent dirichlet allocation model with spark.lda
model <- spark.lda(training, k = 10, maxIter = 10)

# Model summary
summary(model)

# Posterior probabilities
posterior <- spark.posterior(model, test)
head(posterior)

# The log perplexity of the LDA model
logPerplexity <- spark.perplexity(model, test)
print(paste0("The upper bound bound on perplexity: ", logPerplexity))
Spark リポジトリ内の「examples/src/main/r/ml/lda.R」で完全なコード例を確認してください。

2 分割 k-means

二分 k-means は、分割(または「トップダウン」)アプローチを使用した一種の 階層的クラスタリングです。すべての観測は 1 つのクラスタから開始し、階層を下るにつれて再帰的に分割が実行されます。

二分 K-means は通常の K-means よりもはるかに高速になることがありますが、一般的には異なるクラスタリングが生成されます。

BisectingKMeansEstimator として実装され、ベースモデルとして BisectingKMeansModel を生成します。

Python API ドキュメントを参照して詳細をご覧ください。

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)
Spark リポジトリ内の「examples/src/main/python/ml/bisecting_k_means_example.py」で完全なコード例を確認してください。

Scala API ドキュメントを参照して詳細をご覧ください。

import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
val centers = model.clusterCenters
centers.foreach(println)
Spark リポジトリ内の「examples/src/main/scala/org/apache/spark/examples/ml/BisectingKMeansExample.scala」で完全なコード例を確認してください。

Java API ドキュメントを参照して詳細をご覧ください。

import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a bisecting k-means model.
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
System.out.println("Cluster Centers: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
  System.out.println(center);
}
Spark リポジトリ内の「examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java」で完全なコード例を確認してください。

R API ドキュメントを参照して詳細をご覧ください。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# Fit bisecting k-means model with four centers
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)

# get fitted result from a bisecting k-means model
fitted.model <- fitted(model, "centers")

# Model summary
head(summary(fitted.model))

# fitted values on training data
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
Spark リポジトリ内の「examples/src/main/r/ml/bisectingKmeans.R」で完全なコード例を確認してください。

ガウス混合モデル (GMM)

ガウス混合モデルは、k 個のガウスサブ分布のいずれかからポイントが描かれ、それぞれに確率がある混合分布を表します。spark.ml の実装では、期待最大化 アルゴリズムを使用して、一連のサンプルを与えたときの最尤モデルを誘導します。

GaussianMixtureEstimator として実装されており、基本となるモデルとしてGaussianMixtureModel を生成します。

入力列

パラメータ名 タイプ 既定 説明
featuresCol ベクター "features" 特徴ベクター

出力列

パラメータ名 タイプ 既定 説明
predictionCol 整数 "prediction" 予測クラスタ中心
probabilityCol ベクター "probability" 各クラスタの確率

詳細については、Python API ドキュメント を参照してください。

from pyspark.ml.clustering import GaussianMixture

# loads data
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)

print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
Spark リポジトリの "examples/src/main/python/ml/gaussian_mixture_example.py" で完全なサンプル コードを確認してください。

詳細については、Scala API ドキュメント を参照してください。

import org.apache.spark.ml.clustering.GaussianMixture

// Loads data
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(2)
val model = gmm.fit(dataset)

// output parameters of mixture model model
for (i <- 0 until model.getK) {
  println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
      s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala" で完全なサンプル コードを確認してください。

詳細については、Java API ドキュメント を参照してください。

import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a GaussianMixture model
GaussianMixture gmm = new GaussianMixture()
  .setK(2);
GaussianMixtureModel model = gmm.fit(dataset);

// Output the parameters of the mixture model
for (int i = 0; i < model.getK(); i++) {
  System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
          i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java" で完全なサンプル コードを確認してください。

詳細については、R API ドキュメント を参照してください。

# Load training data
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a gaussian mixture clustering model with spark.gaussianMixture
model <- spark.gaussianMixture(training, ~ features, k = 2)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
Spark リポジトリの "examples/src/main/r/ml/gaussianMixture.R" で完全なサンプル コードを確認してください。

パワー反復クラスタリング (PIC)

パワー イテレーション クラスタリング (PIC) は、Lin と Cohen によって開発されたスケーラブルなグラフ クラスタリング アルゴリズムです。要約から、PIC はデータの正規化されたペアワイズ類似行列に対する切り捨てパワー イテレーションを使用して、データセットの非常に低次元の埋め込みを見つけます。

spark.ml の PowerIterationClustering 実装では、次のパラメータを使用できます。

詳細については、Python API ドキュメント を参照してください。

from pyspark.ml.clustering import PowerIterationClustering

df = spark.createDataFrame([
    (0, 1, 1.0),
    (0, 2, 1.0),
    (1, 2, 1.0),
    (3, 4, 1.0),
    (4, 0, 0.1)
], ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")

# Shows the cluster assignment
pic.assignClusters(df).show()
Spark リポジトリの "examples/src/main/python/ml/power_iteration_clustering_example.py" で完全なサンプル コードを確認してください。

詳細については、Scala API ドキュメント を参照してください。

import org.apache.spark.ml.clustering.PowerIterationClustering

val dataset = spark.createDataFrame(Seq(
  (0L, 1L, 1.0),
  (0L, 2L, 1.0),
  (1L, 2L, 1.0),
  (3L, 4L, 1.0),
  (4L, 0L, 0.1)
)).toDF("src", "dst", "weight")

val model = new PowerIterationClustering().
  setK(2).
  setMaxIter(20).
  setInitMode("degree").
  setWeightCol("weight")

val prediction = model.assignClusters(dataset).select("id", "cluster")

//  Shows the cluster assignment
prediction.show(false)
Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala" で完全なサンプル コードを確認してください。

詳細については、Java API ドキュメント を参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0L, 1L, 1.0),
  RowFactory.create(0L, 2L, 1.0),
  RowFactory.create(1L, 2L, 1.0),
  RowFactory.create(3L, 4L, 1.0),
  RowFactory.create(4L, 0L, 0.1)
);

StructType schema = new StructType(new StructField[]{
  new StructField("src", DataTypes.LongType, false, Metadata.empty()),
  new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
  new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PowerIterationClustering model = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(10)
  .setInitMode("degree")
  .setWeightCol("weight");

Dataset<Row> result = model.assignClusters(df);
result.show(false);
Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java" で完全なサンプル コードを確認してください。

詳細については、R API ドキュメント を参照してください。

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
# assign clusters
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
                                 initMode = "degree", weightCol = "weight")

showDF(arrange(clusters, clusters$id))
Spark リポジトリの "examples/src/main/r/ml/powerIterationClustering.R" で完全なサンプル コードを確認してください。