クラスタリング - RDDベースAPI

クラスタリングは、類似性の概念に基づいてエンティティのサブセットを互いにグループ化する非教師あり学習問題です。クラスタリングは、探索的分析や、階層的な教師あり学習パイプライン(各クラスタに対して異なる分類器または回帰モデルがトレーニングされる)のコンポーネントとしてよく使用されます。

spark.mllibパッケージは、次のモデルをサポートしています。

K-means

K-meansは、データ点を事前に定義された数のクラスタにクラスタリングする最も一般的に使用されるクラスタリングアルゴリズムの1つです。spark.mllibの実装には、k-means++メソッドの並列化されたバリアントであるkmeans||が含まれています。spark.mllibの実装には、次のパラメータがあります。

次の例は、PySparkシェルでテストできます。

次の例では、データのロードと解析後、KMeansオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的のクラスタ数はアルゴリズムに渡されます。次に、集合内平方和誤差(WSSSE)を計算します。kを増やすことでこの誤差を減らすことができます。実際、最適なkは通常、WSSSEグラフに「肘」がある点です。

APIの詳細については、KMeans PythonドキュメントKMeansModel Pythonドキュメントを参照してください。

from numpy import array
from math import sqrt

from pyspark.mllib.clustering import KMeans, KMeansModel

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
    center = clusters.centers[clusters.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))

# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
完全な例コードは、Sparkリポジトリの"examples/src/main/python/mllib/k_means_example.py"にあります。

次のコードスニペットはspark-shellで実行できます。

次の例では、データのロードと解析後、KMeansオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的のクラスタ数はアルゴリズムに渡されます。次に、集合内平方和誤差(WSSSE)を計算します。kを増やすことでこの誤差を減らすことができます。実際、最適なkは通常、WSSSEグラフに「肘」がある点です。

APIの詳細については、KMeans ScalaドキュメントKMeansModel Scalaドキュメントを参照してください。

import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")

// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
完全な例コードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala"にあります。

MLlibのすべてのメソッドはJavaフレンドリーな型を使用しているため、Scalaと同じようにJavaでインポートして呼び出すことができます。唯一の注意点として、メソッドはScalaのRDDオブジェクトを受け取るのに対し、Spark Java APIは別のJavaRDDクラスを使用します。JavaRDDオブジェクトで.rdd()を呼び出すことで、Java RDDをScala RDDに変換できます。Scalaで提供されている例と同等の、独立したアプリケーション例を以下に示します。

APIの詳細については、KMeans JavaドキュメントKMeansModel Javaドキュメントを参照してください。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
parsedData.cache();

// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
  System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);

// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);

// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
  "target/org/apache/spark/JavaKMeansExample/KMeansModel");
完全な例コードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeansExample.java"にあります。

ガウス混合モデル

ガウス混合モデルは、点がそれぞれ独自の確率を持つk個のガウス部分分布のいずれかから描画される複合分布を表します。spark.mllibの実装は、期待値最大化アルゴリズムを使用して、サンプルのセットが与えられた場合の最尤モデルを誘導します。この実装には、次のパラメータがあります。

次の例では、データのロードと解析後、GaussianMixtureオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的のクラスタ数はアルゴリズムに渡されます。次に、混合モデルのパラメータを出力します。

APIの詳細については、GaussianMixture PythonドキュメントGaussianMixtureModel Pythonドキュメントを参照してください。

from numpy import array

from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel

# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))

# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)

# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
    .load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")

# output parameters of model
for i in range(2):
    print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
          "sigma = ", gmm.gaussians[i].sigma.toArray())
完全な例コードは、Sparkリポジトリの"examples/src/main/python/mllib/gaussian_mixture_example.py"にあります。

次の例では、データのロードと解析後、GaussianMixtureオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的のクラスタ数はアルゴリズムに渡されます。次に、混合モデルのパラメータを出力します。

APIの詳細については、GaussianMixture ScalaドキュメントGaussianMixtureModel Scalaドキュメントを参照してください。

import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()

// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)

// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
  "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")

// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
  println("weight=%f\nmu=%s\nsigma=\n%s\n" format
    (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
完全な例コードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala"にあります。

MLlibのすべてのメソッドはJavaフレンドリーな型を使用しているため、Scalaと同じようにJavaでインポートして呼び出すことができます。唯一の注意点として、メソッドはScalaのRDDオブジェクトを受け取るのに対し、Spark Java APIは別のJavaRDDクラスを使用します。JavaRDDオブジェクトで.rdd()を呼び出すことで、Java RDDをScala RDDに変換できます。Scalaで提供されている例と同等の、独立したアプリケーション例を以下に示します。

APIの詳細については、GaussianMixture JavaドキュメントGaussianMixtureModel Javaドキュメントを参照してください。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.trim().split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
parsedData.cache();

// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());

// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
  "target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");

// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
  System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
    gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
完全な例コードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/mllib/JavaGaussianMixtureExample.java"にあります。

べき乗法クラスタリング (PIC)

べき乗法クラスタリング (PIC) は、Lin and Cohen, Power Iteration Clusteringで説明されているように、エッジプロパティとしてペアワイズ類似性が与えられたグラフの頂点をクラスタリングするためのスケーラブルで効率的なアルゴリズムです。べき乗法を介してグラフの正規化されたアフィニティ行列の擬似固有ベクトルを計算し、それを用いて頂点をクラスタリングします。spark.mllibには、バックエンドとしてGraphXを使用したPICの実装が含まれています。(srcId, dstId, similarity)タプルのRDDを受け取り、クラスタリング割り当てを含むモデルを出力します。類似度は非負でなければなりません。PICは、類似度尺度が対称であると仮定します。順序に関係なく、ペア(srcId, dstId)は入力データに最大1回しか表示されません。ペアが入力にない場合、その類似度は0として扱われます。spark.mllibのPIC実装は、次の(ハイパー)パラメータを受け取ります。

以下では、spark.mllibでPICを使用する方法を示すコードスニペットを示します。

PowerIterationClusteringはPICアルゴリズムを実装します。アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)タプルのRDDを取ります。PowerIterationClustering.runを呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModelが返されます。

APIの詳細については、PowerIterationClustering Python ドキュメントPowerIterationClusteringModel Python ドキュメントを参照してください。

from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel

# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))

# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)

model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))

# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
    .load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/mllib/power_iteration_clustering_example.py"にあります。

PowerIterationClusteringはPICアルゴリズムを実装します。アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)タプルのRDDを取ります。PowerIterationClustering.runを呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModelが返されます。

APIの詳細については、PowerIterationClustering Scala ドキュメントPowerIterationClusteringModel Scala ドキュメントを参照してください。

import org.apache.spark.mllib.clustering.PowerIterationClustering

val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
  .setK(params.k)
  .setMaxIterations(params.maxIterations)
  .setInitializationMode("degree")
  .run(circlesRdd)

val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
  .map { case (k, v) =>
    s"$k -> ${v.sorted.mkString("[", ",", "]")}"
  }.mkString(", ")
val sizesStr = assignments.map {
  _._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala"にあります。

PowerIterationClusteringはPICアルゴリズムを実装します。アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)タプルのJavaRDDを取ります。PowerIterationClustering.runを呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModelが返されます。

APIの詳細については、PowerIterationClustering Java ドキュメントPowerIterationClusteringModel Java ドキュメントを参照してください。

import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;

JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
  new Tuple3<>(0L, 1L, 0.9),
  new Tuple3<>(1L, 2L, 0.9),
  new Tuple3<>(2L, 3L, 0.9),
  new Tuple3<>(3L, 4L, 0.1),
  new Tuple3<>(4L, 5L, 0.9)));

PowerIterationClustering pic = new PowerIterationClustering()
  .setK(2)
  .setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);

for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
  System.out.println(a.id() + " -> " + a.cluster());
}
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaPowerIterationClusteringExample.java"にあります。

潜在ディリクレ配分 (LDA)

潜在的ディリクレ配分(LDA)は、テキスト文書のコレクションからトピックを推論するトピックモデルです。LDAは、次のようにクラスタリングアルゴリズムと考えることができます。

LDAはsetOptimizer関数によって異なる推論アルゴリズムをサポートしています。EMLDAOptimizerは尤度関数に対して期待値最大化を使用してクラスタリングを学習し、包括的な結果を提供しますが、OnlineLDAOptimizerオンライン変分推論のための反復ミニバッチサンプリングを使用し、一般的にメモリ効率が良いです。

LDAは、単語数のベクトルとしての文書のコレクションと、以下のパラメータ(ビルダーパターンを使用して設定)を取ります。

spark.mllibのすべてのLDAモデルは、以下をサポートしています。

注記:LDAはまだ開発中の実験的機能です。その結果、特定の機能は、2つの最適化アルゴリズム/最適化アルゴリズムによって生成されたモデルのいずれか1つでのみ使用できます。現在、分散型モデルはローカルモデルに変換できますが、その逆はできません。

以降では、各最適化アルゴリズム/モデルのペアについて個別に説明します。

期待値最大化

EMLDAOptimizerDistributedLDAModelで実装されています。

LDAに提供されるパラメータの場合

注記:十分な反復を行うことが重要です。初期の反復では、EMはしばしば役に立たないトピックを持ちますが、それらのトピックはより多くの反復の後で劇的に改善されます。データセットによっては、少なくとも20回、場合によっては50〜100回の反復を行うのが妥当なことが多いです。

EMLDAOptimizerDistributedLDAModelを生成します。これは、推論されたトピックだけでなく、トレーニングコーパス全体とトレーニングコーパス内の各文書のトピック分布も格納します。DistributedLDAModelは以下をサポートします。

オンライン変分ベイズ

OnlineLDAOptimizerLocalLDAModelで実装されています。

LDAに提供されるパラメータの場合

さらに、OnlineLDAOptimizerは次のパラメータを受け入れます。

OnlineLDAOptimizerLocalLDAModelを生成します。これは、推論されたトピックのみを格納します。LocalLDAModelは以下をサポートします。

以下の例では、文書コーパスを表す単語カウントベクトルを読み込みます。次に、LDAを使用して、文書から3つのトピックを推測します。目的のクラスタ数はアルゴリズムに渡されます。その後、単語に対する確率分布として表されるトピックを出力します。

APIの詳細については、LDA PythonドキュメントLDAModel Pythonドキュメントを参照してください。

from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)

# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
    .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/mllib/latent_dirichlet_allocation_example.py"にあります。

APIの詳細については、LDA ScalaドキュメントDistributedLDAModel Scalaドキュメントを参照してください。

import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors

// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)

// Output topics. Each is a distribution over words (matching word count vectors)
println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
  print(s"Topic $topic :")
  for (word <- Range(0, ldaModel.vocabSize)) {
    print(s"${topics(word, topic)}")
  }
  println()
}

// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
  "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala"にあります。

APIの詳細については、LDA JavaドキュメントDistributedLDAModel Javaドキュメントを参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
  String[] sarray = s.trim().split(" ");
  double[] values = new double[sarray.length];
  for (int i = 0; i < sarray.length; i++) {
    values[i] = Double.parseDouble(sarray[i]);
  }
  return Vectors.dense(values);
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
  JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();

// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);

// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
  + " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
  System.out.print("Topic " + topic + ":");
  for (int word = 0; word < ldaModel.vocabSize(); word++) {
    System.out.print(" " + topics.apply(word, topic));
  }
  System.out.println();
}

ldaModel.save(jsc.sc(),
  "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
  "target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/mllib/JavaLatentDirichletAllocationExample.java"にあります。

二分k-means

二分K平均は、通常のK平均よりもはるかに高速になることがありますが、一般的に異なるクラスタリングが生成されます。

二分k平均は、階層的クラスタリングの一種です。階層的クラスタリングは、クラスタ分析で最も一般的に使用される方法の1つであり、クラスタの階層を構築することを目指しています。階層的クラスタリングの戦略は、一般的に2つのタイプに分類されます。

二分k平均アルゴリズムは、分割型アルゴリズムの一種です。MLlibの実装には、以下のパラメータがあります。

APIの詳細については、BisectingKMeans PythonドキュメントBisectingKMeansModel Pythonドキュメントを参照してください。

from numpy import array

from pyspark.mllib.clustering import BisectingKMeans

# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))

# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)

# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/mllib/bisecting_k_means_example.py"にあります。

APIの詳細については、BisectingKMeans ScalaドキュメントBisectingKMeansModel Scalaドキュメントを参照してください。

import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()

// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)

// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster Center ${idx}: ${center}")
}
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala"にあります。

APIの詳細については、BisectingKMeans JavaドキュメントBisectingKMeansModel Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

List<Vector> localData = Arrays.asList(
  Vectors.dense(0.1, 0.1),   Vectors.dense(0.3, 0.3),
  Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
  Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
  Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);

BisectingKMeans bkm = new BisectingKMeans()
  .setK(4);
BisectingKMeansModel model = bkm.run(data);

System.out.println("Compute Cost: " + model.computeCost(data));

Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
  Vector clusterCenter = clusterCenters[i];
  System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/mllib/JavaBisectingKMeansExample.java"にあります。

ストリーミングk-means

データがストリームで到着する場合、新しいデータが到着するにつれて更新される動的なクラスタを推定したい場合があります。spark.mllibは、推定値の減衰(または「忘却」)を制御するパラメータを使用して、ストリーミングk平均クラスタリングをサポートしています。このアルゴリズムは、ミニバッチk平均更新ルールの一般化を使用します。各バッチのデータについて、すべての点を最も近いクラスタに割り当て、新しいクラスタの中心を計算し、次を使用して各クラスタを更新します。

\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation} \begin{equation} n_{t+1} = n_t + m_t \end{equation}

ここで、$c_t$はクラスタの以前の中心、$n_t$はこれまでクラスタに割り当てられた点の数、$x_t$は現在のバッチからの新しいクラスタの中心、$m_t$は現在のバッチでクラスタに追加された点の数です。減衰係数$\alpha$を使用して過去を無視できます。 $\alpha$=1の場合、最初からすべてのデータが使用されます。 $\alpha$=0の場合、最新のデータのみが使用されます。これは、指数加重移動平均に似ています。

減衰はhalfLifeパラメータを使用して指定できます。これは、時間tに取得されたデータについて、時間t + halfLifeまでにその寄与が0.5に低下するように、正しい減衰係数aを決定します。時間の単位はbatchesまたはpointsとして指定でき、更新ルールはそれに応じて調整されます。

この例は、ストリーミングデータ上のクラスタを推定する方法を示しています。

APIの詳細については、StreamingKMeans Pythonドキュメントを参照してください。また、Spark StreamingプログラミングガイドでStreamingContextの詳細を参照してください。

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans

# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(')')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))

    return LabeledPoint(label, vec)

trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
    .map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))

testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)

trainingQueue = [trainingData]
testingQueue = [testingData]

trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)

# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)

# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)

result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()

ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/mllib/streaming_k_means_example.py"にあります。

APIの詳細については、StreamingKMeans Scalaドキュメントを参照してください。また、Spark StreamingプログラミングガイドでStreamingContextの詳細を参照してください。

import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val model = new StreamingKMeans()
  .setK(args(3).toInt)
  .setDecayFactor(1.0)
  .setRandomCenters(args(4).toInt, 0.0)

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala"にあります。

データを含む新しいテキストファイルを追加すると、クラスタの中心が更新されます。各トレーニングポイントは[x1, x2, x3]としてフォーマットする必要があり、各テストデータポイントは(y, [x1, x2, x3])としてフォーマットする必要があります。ここで、yは、役立つラベルまたは識別子(例:真のカテゴリの割り当て)です。/training/data/dirにテキストファイルが配置されると、モデルが更新されます。/testing/data/dirにテキストファイルが配置されると、予測が表示されます。新しいデータを使用すると、クラスタの中心が変化します!