クラスタリング

このページでは、MLlib のクラスタリングアルゴリズムについて説明します。 RDD ベース API のクラスタリングガイドにも、これらのアルゴリズムに関する関連情報があります。

目次

K-means

k-means は、データポイントを定義済みの数のクラスタにクラスタリングする、最も一般的に使用されるクラスタリングアルゴリズムの 1 つです。MLlib の実装には、kmeans|| と呼ばれる k-means++ メソッドの並列化されたバリアントが含まれています。

KMeansEstimator として実装されており、ベースモデルとして KMeansModel を生成します。

入力列

パラメータ名 タイプ デフォルト 説明
featuresCol Vector "features" 特徴量ベクトル

出力列

パラメータ名 タイプ デフォルト 説明
predictionCol Int "prediction" 予測されたクラスタ中心

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/kmeans_example.py" で確認できます。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a k-means model.
val kmeans = new KMeans().setK(2).setSeed(1L)
val model = kmeans.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
model.clusterCenters.foreach(println)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/KMeansExample.scala" で確認できます。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.KMeansModel;
import org.apache.spark.ml.clustering.KMeans;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a k-means model.
KMeans kmeans = new KMeans().setK(2).setSeed(1L);
KMeansModel model = kmeans.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
Vector[] centers = model.clusterCenters();
System.out.println("Cluster Centers: ");
for (Vector center: centers) {
  System.out.println(center);
}
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaKMeansExample.java" で確認できます。

詳細については、R API ドキュメントを参照してください。

# Fit a k-means model with spark.kmeans
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
df_list <- randomSplit(training, c(7,3), 2)
kmeansDF <- df_list[[1]]
kmeansTestDF <- df_list[[2]]
kmeansModel <- spark.kmeans(kmeansDF, ~ Class + Sex + Age + Freq,
                            k = 3)

# Model summary
summary(kmeansModel)

# Get fitted result from the k-means model
head(fitted(kmeansModel))

# Prediction
kmeansPredictions <- predict(kmeansModel, kmeansTestDF)
head(kmeansPredictions)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/r/ml/kmeans.R" で確認できます。

Latent Dirichlet allocation (LDA)

LDA は、EMLDAOptimizerOnlineLDAOptimizer の両方をサポートする Estimator として実装されており、ベースモデルとして LDAModel を生成します。専門ユーザーは、必要に応じて、EMLDAOptimizer によって生成された LDAModelDistributedLDAModel にキャストできます。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_lda_libsvm_data.txt")

# Trains a LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/lda_example.py" で確認できます。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.LDA

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt")

// Trains a LDA model.
val lda = new LDA().setK(10).setMaxIter(10)
val model = lda.fit(dataset)

val ll = model.logLikelihood(dataset)
val lp = model.logPerplexity(dataset)
println(s"The lower bound on the log likelihood of the entire corpus: $ll")
println(s"The upper bound on perplexity: $lp")

// Describe topics.
val topics = model.describeTopics(3)
println("The topics described by their top-weighted terms:")
topics.show(false)

// Shows the result.
val transformed = model.transform(dataset)
transformed.show(false)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/LDAExample.scala" で確認できます。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.LDA;
import org.apache.spark.ml.clustering.LDAModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_lda_libsvm_data.txt");

// Trains a LDA model.
LDA lda = new LDA().setK(10).setMaxIter(10);
LDAModel model = lda.fit(dataset);

double ll = model.logLikelihood(dataset);
double lp = model.logPerplexity(dataset);
System.out.println("The lower bound on the log likelihood of the entire corpus: " + ll);
System.out.println("The upper bound on perplexity: " + lp);

// Describe topics.
Dataset<Row> topics = model.describeTopics(3);
System.out.println("The topics described by their top-weighted terms:");
topics.show(false);

// Shows the result.
Dataset<Row> transformed = model.transform(dataset);
transformed.show(false);
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaLDAExample.java" で確認できます。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a latent dirichlet allocation model with spark.lda
model <- spark.lda(training, k = 10, maxIter = 10)

# Model summary
summary(model)

# Posterior probabilities
posterior <- spark.posterior(model, test)
head(posterior)

# The log perplexity of the LDA model
logPerplexity <- spark.perplexity(model, test)
print(paste0("The upper bound bound on perplexity: ", logPerplexity))
完全なサンプルコードは、Spark リポジトリの "examples/src/main/r/ml/lda.R" で確認できます。

Bisecting k-means

Bisecting k-means は、分割的(または「トップダウン」)アプローチを使用する一種の 階層的クラスタリング です。すべての観測値は 1 つのクラスタから始まり、階層を下るにつれて再帰的に分割が行われます。

Bisecting K-means は、通常の K-means よりもはるかに高速であることがよくありますが、一般的には異なるクラスタリング結果になります。

BisectingKMeansEstimator として実装されており、ベースモデルとして BisectingKMeansModel を生成します。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Loads data.
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# Trains a bisecting k-means model.
bkm = BisectingKMeans().setK(2).setSeed(1)
model = bkm.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/bisecting_k_means_example.py" で確認できます。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.BisectingKMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator

// Loads data.
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains a bisecting k-means model.
val bkm = new BisectingKMeans().setK(2).setSeed(1)
val model = bkm.fit(dataset)

// Make predictions
val predictions = model.transform(dataset)

// Evaluate clustering by computing Silhouette score
val evaluator = new ClusteringEvaluator()

val silhouette = evaluator.evaluate(predictions)
println(s"Silhouette with squared euclidean distance = $silhouette")

// Shows the result.
println("Cluster Centers: ")
val centers = model.clusterCenters
centers.foreach(println)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/BisectingKMeansExample.scala" で確認できます。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.BisectingKMeans;
import org.apache.spark.ml.clustering.BisectingKMeansModel;
import org.apache.spark.ml.evaluation.ClusteringEvaluator;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a bisecting k-means model.
BisectingKMeans bkm = new BisectingKMeans().setK(2).setSeed(1);
BisectingKMeansModel model = bkm.fit(dataset);

// Make predictions
Dataset<Row> predictions = model.transform(dataset);

// Evaluate clustering by computing Silhouette score
ClusteringEvaluator evaluator = new ClusteringEvaluator();

double silhouette = evaluator.evaluate(predictions);
System.out.println("Silhouette with squared euclidean distance = " + silhouette);

// Shows the result.
System.out.println("Cluster Centers: ");
Vector[] centers = model.clusterCenters();
for (Vector center : centers) {
  System.out.println(center);
}
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaBisectingKMeansExample.java" で確認できます。

詳細については、R API ドキュメントを参照してください。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# Fit bisecting k-means model with four centers
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)

# get fitted result from a bisecting k-means model
fitted.model <- fitted(model, "centers")

# Model summary
head(summary(fitted.model))

# fitted values on training data
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
完全なサンプルコードは、Spark リポジトリの "examples/src/main/r/ml/bisectingKmeans.R" で確認できます。

Gaussian Mixture Model (GMM)

ガウス混合モデル は、各々が独自の確率を持つ k 個のガウスサブ分布のいずれかから点が描画される複合分布を表します。 spark.ml の実装では、期待値最大化アルゴリズムを使用して、サンプルセットが与えられた場合の最尤モデルを誘導します。

GaussianMixtureEstimator として実装されており、ベースモデルとして GaussianMixtureModel を生成します。

入力列

パラメータ名 タイプ デフォルト 説明
featuresCol Vector "features" 特徴量ベクトル

出力列

パラメータ名 タイプ デフォルト 説明
predictionCol Int "prediction" 予測されたクラスタ中心
probabilityCol Vector "probability" 各クラスタの確率

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.clustering import GaussianMixture

# loads data
dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

gmm = GaussianMixture().setK(2).setSeed(538009335)
model = gmm.fit(dataset)

print("Gaussians shown as a DataFrame: ")
model.gaussiansDF.show(truncate=False)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/gaussian_mixture_example.py" で確認できます。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.GaussianMixture

// Loads data
val dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

// Trains Gaussian Mixture Model
val gmm = new GaussianMixture()
  .setK(2)
val model = gmm.fit(dataset)

// output parameters of mixture model model
for (i <- 0 until model.getK) {
  println(s"Gaussian $i:\nweight=${model.weights(i)}\n" +
      s"mu=${model.gaussians(i).mean}\nsigma=\n${model.gaussians(i).cov}\n")
}
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/GaussianMixtureExample.scala" で確認できます。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.GaussianMixture;
import org.apache.spark.ml.clustering.GaussianMixtureModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data
Dataset<Row> dataset = spark.read().format("libsvm").load("data/mllib/sample_kmeans_data.txt");

// Trains a GaussianMixture model
GaussianMixture gmm = new GaussianMixture()
  .setK(2);
GaussianMixtureModel model = gmm.fit(dataset);

// Output the parameters of the mixture model
for (int i = 0; i < model.getK(); i++) {
  System.out.printf("Gaussian %d:\nweight=%f\nmu=%s\nsigma=\n%s\n\n",
          i, model.weights()[i], model.gaussians()[i].mean(), model.gaussians()[i].cov());
}
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaGaussianMixtureExample.java" で確認できます。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_kmeans_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a gaussian mixture clustering model with spark.gaussianMixture
model <- spark.gaussianMixture(training, ~ features, k = 2)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/r/ml/gaussianMixture.R" で確認できます。

Power Iteration Clustering (PIC)

Power Iteration Clustering (PIC) は、Lin and Cohen によって開発されたスケーラブルなグラフクラスタリングアルゴリズムです。要旨から:PIC は、データの正規化されたペアワイズ類似度行列に対する截断べき反復を使用して、データセットの非常に低次元の埋め込みを見つけます。

spark.ml の PowerIterationClustering 実装は、次のパラメータを受け取ります。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.clustering import PowerIterationClustering

df = spark.createDataFrame([
    (0, 1, 1.0),
    (0, 2, 1.0),
    (1, 2, 1.0),
    (3, 4, 1.0),
    (4, 0, 0.1)
], ["src", "dst", "weight"])

pic = PowerIterationClustering(k=2, maxIter=20, initMode="degree", weightCol="weight")

# Shows the cluster assignment
pic.assignClusters(df).show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/power_iteration_clustering_example.py" で確認できます。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.clustering.PowerIterationClustering

val dataset = spark.createDataFrame(Seq(
  (0L, 1L, 1.0),
  (0L, 2L, 1.0),
  (1L, 2L, 1.0),
  (3L, 4L, 1.0),
  (4L, 0L, 0.1)
)).toDF("src", "dst", "weight")

val model = new PowerIterationClustering().
  setK(2).
  setMaxIter(20).
  setInitMode("degree").
  setWeightCol("weight")

val prediction = model.assignClusters(dataset).select("id", "cluster")

//  Shows the cluster assignment
prediction.show(false)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PowerIterationClusteringExample.scala" で確認できます。

詳細については、Java API ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.clustering.PowerIterationClustering;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0L, 1L, 1.0),
  RowFactory.create(0L, 2L, 1.0),
  RowFactory.create(1L, 2L, 1.0),
  RowFactory.create(3L, 4L, 1.0),
  RowFactory.create(4L, 0L, 0.1)
);

StructType schema = new StructType(new StructField[]{
  new StructField("src", DataTypes.LongType, false, Metadata.empty()),
  new StructField("dst", DataTypes.LongType, false, Metadata.empty()),
  new StructField("weight", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PowerIterationClustering model = new PowerIterationClustering()
  .setK(2)
  .setMaxIter(10)
  .setInitMode("degree")
  .setWeightCol("weight");

Dataset<Row> result = model.assignClusters(df);
result.show(false);
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPowerIterationClusteringExample.java" で確認できます。

詳細については、R API ドキュメントを参照してください。

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
# assign clusters
clusters <- spark.assignClusters(df, k = 2L, maxIter = 20L,
                                 initMode = "degree", weightCol = "weight")

showDF(arrange(clusters, clusters$id))
完全なサンプルコードは、Spark リポジトリの "examples/src/main/r/ml/powerIterationClustering.R" で確認できます。