クラスタリング - RDDベースAPI
クラスタリングは、何らかの類似性の概念に基づいて、エンティティのサブセットを互いにグループ化することを目指す教師なし学習の問題です。クラスタリングは、探索的分析や、階層的な教師あり学習パイプラインのコンポーネント(各クラスタに対して個別の分類器または回帰モデルがトレーニングされる)としてよく使用されます。
spark.mllibパッケージは、以下のモデルをサポートしています。
- K-means
- ガウス混合モデル
- Power Iteration Clustering (PIC)
- Latent Dirichlet Allocation (LDA)
- Bisecting k-means
- Streaming k-means
K-means
K-meansは、データポイントを事前に定義された数のクラスタにクラスタリングする、最も一般的に使用されるクラスタリングアルゴリズムの1つです。spark.mllibの実装には、kmeans||と呼ばれるk-means++メソッドの並列化されたバリアントが含まれています。spark.mllibの実装には、以下のパラメータがあります。
- k:目的とするクラスタの数。例えば、クラスタリングする固有のポイントがk未満の場合、k未満のクラスタが返される可能性があることに注意してください。
- maxIterations:実行される最大イテレーション数。
- initializationMode:ランダム初期化またはk-means||による初期化を指定します。
- runs:このパラメータはSpark 2.0.0以降では効果がありません。
- initializationSteps:k-means||アルゴリズムのステップ数を決定します。
- epsilon:k-meansが収束したと見なされる距離しきい値を決定します。
- initialModel:初期化に使用されるクラスタ中心のオプションセット。このパラメータが指定されている場合、実行は1回のみ行われます。
例
次の例は、PySparkシェルでテストできます。
次の例では、データのロードと解析の後、KMeansオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的とするクラスタの数はアルゴリズムに渡されます。次に、Within Set Sum of Squared Error (WSSSE)を計算します。kを増やすことで、このエラー尺度を減らすことができます。実際、最適なkは通常、WSSSEグラフに「エルボー」がある点です。
APIの詳細については、KMeans PythonドキュメントおよびKMeansModel Pythonドキュメントを参照してください。
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")次のコードスニペットは、spark-shellで実行できます。
次の例では、データのロードと解析の後、KMeansオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的とするクラスタの数はアルゴリズムに渡されます。次に、Within Set Sum of Squared Error (WSSSE)を計算します。kを増やすことで、このエラー尺度を減らすことができます。実際、最適なkは通常、WSSSEグラフに「エルボー」がある点です。
APIの詳細については、KMeans ScalaドキュメントおよびKMeansModel Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")
// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")MLlibのすべてのメソッドはJavaフレンドリーな型を使用しているため、Scalaと同様の方法でインポートして呼び出すことができます。唯一の注意点は、メソッドがScala RDDオブジェクトを取るのに対し、Spark Java APIは別のJavaRDDクラスを使用することです。JavaRDDオブジェクトで.rdd()を呼び出すことで、Java RDDをScala RDDに変換できます。提供されているScalaの例と同等の自己完結型アプリケーションの例を以下に示します。
APIの詳細については、KMeans JavaドキュメントおよびKMeansModel Javaドキュメントを参照してください。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
"target/org/apache/spark/JavaKMeansExample/KMeansModel");ガウス混合モデル
a ガウス混合モデルは、各々が固有の確率を持つk個のガウスサブ分布のいずれかからポイントが描画される複合分布を表します。spark.mllibの実装は、サンプルセットが与えられた場合に最大尤度モデルを誘導するために期待値最大化アルゴリズムを使用します。実装には以下のパラメータがあります。
- k:目的とするクラスタの数。
- convergenceTol:収束が達成されたと見なされる対数尤度の最大変化量。
- maxIterations:収束に達せずに実行する最大イテレーション数。
- initialModel:EMアルゴリズムを開始するためのオプションの開始点。このパラメータが省略された場合、データからランダムな開始点が構築されます。
例
次の例では、データのロードと解析の後、GaussianMixtureオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的とするクラスタの数はアルゴリズムに渡されます。次に、混合モデルのパラメータを出力します。
APIの詳細については、GaussianMixture PythonドキュメントおよびGaussianMixtureModel Pythonドキュメントを参照してください。
from numpy import array
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)
# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
.load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
# output parameters of model
for i in range(2):
print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())次の例では、データのロードと解析の後、GaussianMixtureオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的とするクラスタの数はアルゴリズムに渡されます。次に、混合モデルのパラメータを出力します。
APIの詳細については、GaussianMixture ScalaドキュメントおよびGaussianMixtureModel Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
"target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}MLlibのすべてのメソッドはJavaフレンドリーな型を使用しているため、Scalaと同様の方法でインポートして呼び出すことができます。唯一の注意点は、メソッドがScala RDDオブジェクトを取るのに対し、Spark Java APIは別のJavaRDDクラスを使用することです。JavaRDDオブジェクトで.rdd()を呼び出すことで、Java RDDをScala RDDに変換できます。提供されているScalaの例と同等の自己完結型アプリケーションの例を以下に示します。
APIの詳細については、GaussianMixture JavaドキュメントおよびGaussianMixtureModel Javaドキュメントを参照してください。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
"target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}Power Iteration Clustering (PIC)
Power iteration clustering (PIC)は、Lin and Cohen, Power Iteration Clusteringで説明されているように、エッジプロパティとしてのペアワイズ類似性が与えられたグラフの頂点のクラスタリングのためのスケーラブルで効率的なアルゴリズムです。spark.mllibは、GraphXをバックエンドとして使用したPICの実装を含んでいます。これは、(srcId, dstId, similarity)タプルのRDDを受け取り、クラスタリング割り当てを含むモデルを出力します。類似度は非負である必要があります。PICは、類似性尺度が対称であると仮定します。順序に関係なく、ペア(srcId, dstId)は入力データに最大1回出現する必要があります。ペアが入力に欠落している場合、その類似度はゼロとして扱われます。spark.mllibのPIC実装は、次の(ハイパー)パラメータを取ります。
k:クラスタ数maxIterations:最大パワージテレーション数initializationMode:初期化モデル。デフォルトの「random」(頂点プロパティとしてランダムベクトルを使用)または「degree」(正規化された類似度の合計を使用)のいずれかになります。
例
以下に、spark.mllibでPICを使用する方法を示すコードスニペットを示します。
PowerIterationClusteringはPICアルゴリズムを実装しています。これは、アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)タプルのRDDを受け取ります。PowerIterationClustering.runを呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModelが返されます。
APIの詳細については、PowerIterationClustering PythonドキュメントおよびPowerIterationClusteringModel Pythonドキュメントを参照してください。
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
.load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")PowerIterationClusteringはPICアルゴリズムを実装しています。これは、アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)タプルのRDDを受け取ります。PowerIterationClustering.runを呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModelが返されます。
APIの詳細については、PowerIterationClustering ScalaドキュメントおよびPowerIterationClusteringModel Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.PowerIterationClustering
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)
val clusters = model.assignments.collect().groupBy(_.cluster).transform((_, v) => v.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(", ")
val sizesStr = assignments.map {
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")PowerIterationClusteringはPICアルゴリズムを実装しています。これは、アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)タプルのJavaRDDを受け取ります。PowerIterationClustering.runを呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModelが返されます。
APIの詳細については、PowerIterationClustering JavaドキュメントおよびPowerIterationClusteringModel Javaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
new Tuple3<>(0L, 1L, 0.9),
new Tuple3<>(1L, 2L, 0.9),
new Tuple3<>(2L, 3L, 0.9),
new Tuple3<>(3L, 4L, 0.1),
new Tuple3<>(4L, 5L, 0.9)));
PowerIterationClustering pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}Latent Dirichlet Allocation (LDA)
潜在ディリクレ配分法 (LDA)は、テキストドキュメントのコレクションからトピックを推論するトピックモデルです。LDAは次のようにクラスタリングアルゴリズムと見なすことができます。
- トピックはクラスタ中心に対応し、ドキュメントはデータセット内の例(行)に対応します。
- トピックとドキュメントは両方とも特徴空間に存在し、特徴ベクトルは単語数のベクトル(単語の袋)です。
- 従来の距離を使用するクラスタリングを推定するのではなく、LDAはテキストドキュメントがどのように生成されるかの統計モデルに基づいた関数を使用します。
LDAは、setOptimizer関数を介してさまざまな推論アルゴリズムをサポートします。EMLDAOptimizerは、尤度関数に対する期待値最大化を使用してクラスタリングを学習し、包括的な結果をもたらします。一方、OnlineLDAOptimizerは、オンライン変分推論のための反復ミニバッチサンプリングを使用し、一般的にメモリ効率が良いです。
LDAは、単語数のベクトルとしてのドキュメントのコレクションと以下のパラメータ(ビルダーパターンを使用して設定)を受け取ります。
k:トピック数(つまり、クラスタ中心数)optimizer:LDAモデルの学習に使用するオプティマイザ。EMLDAOptimizerまたはOnlineLDAOptimizer。docConcentration:ドキュメントのトピック分布に対する事前分布のディリクレパラメータ。値が大きいほど、推論された分布がより滑らかになります。topicConcentration:単語(ターム)に対するトピック分布に対する事前分布のディリクレパラメータ。値が大きいほど、推論された分布がより滑らかになります。maxIterations:イテレーション数の上限。checkpointInterval:チェックポインティング(Spark構成で設定)を使用する場合、このパラメータはチェックポイントが作成される頻度を指定します。maxIterationsが大きい場合、チェックポインティングの使用は、ディスク上のシャッフルファイルサイズを削減し、障害からの回復に役立ちます。
spark.mllibのすべてのLDAモデルは、以下をサポートしています。
describeTopics:最も重要なタームとタームの重みとしてのトピックを配列で返します。topicsMatrix:各列がトピックである、vocabSizexk行列を返します。
注意:LDAはまだ活発な開発中の実験的な機能です。その結果、一部の機能は2つのオプティマイザ/モデルのいずれかでのみ利用可能です。現在、分散モデルはローカルモデルに変換できますが、その逆はできません。
以下の議論では、各オプティマイザ/モデルのペアを個別に説明します。
期待値最大化
EMLDAOptimizerおよびDistributedLDAModelで実装されています。
LDAに渡されるパラメータについては
docConcentration:対称事前分布のみがサポートされているため、提供されたk次元ベクトルのすべての値は同一でなければなりません。すべての値も $> 1.0$ である必要があります。Vector(-1)を提供すると、デフォルトの動作(値が$(50 / k) + 1$の均一なk次元ベクトル)になります。topicConcentration:対称事前分布のみがサポートされています。値は $> 1.0$ でなければなりません。-1を提供すると、値は$0.1 + 1$にデフォルト設定されます。maxIterations:最大EMイテレーション数。
注意:十分なイテレーションを行うことが重要です。初期のイテレーションでは、EMはしばしば有用でないトピックを持ちますが、それらのトピックはより多くのイテレーション後に劇的に改善します。データセットに応じて、少なくとも20回、場合によっては50〜100回のイテレーションを使用するのが妥当です。
EMLDAOptimizerはDistributedLDAModelを生成します。これは、推論されたトピックだけでなく、トレーニングコーパス全体とトレーニングコーパスの各ドキュメントのトピック分布も格納します。DistributedLDAModelは以下をサポートします。
topTopicsPerDocument:トレーニングコーパスの各ドキュメントに対する上位トピックとその重みtopDocumentsPerTopic:各トピックに対する上位ドキュメントと、ドキュメントにおけるトピックの対応する重み。logPrior:ハイパーパラメータdocConcentrationおよびtopicConcentrationを考慮した、推論されたトピックとドキュメント-トピック分布の対数確率logLikelihood:推論されたトピックとドキュメント-トピック分布を考慮した、トレーニングコーパスの対数尤度
オンライン変分ベイズ
OnlineLDAOptimizerおよびLocalLDAModelで実装されています。
LDAに渡されるパラメータについては
docConcentration:k次元の各次元のディリクレパラメータに等しい値を持つベクトルを渡すことで、非対称事前分布を使用できます。値は $>= 0$ である必要があります。Vector(-1)を提供すると、デフォルトの動作(値が$(1.0 / k)$の均一なk次元ベクトル)になります。topicConcentration:対称事前分布のみがサポートされています。値は $>= 0$ である必要があります。-1を提供すると、値は$(1.0 / k)$にデフォルト設定されます。maxIterations:送信されるミニバッチの最大数。
さらに、OnlineLDAOptimizerは以下のパラメータを受け付けます。
miniBatchFraction:各イテレーションで使用されるコーパスの割合。optimizeDocConcentration:trueに設定されている場合、各ミニバッチ後にハイパーパラメータdocConcentration(別名alpha)の最尤推定を実行し、返されるLocalLDAModelに最適化されたdocConcentrationを設定します。tau0およびkappa:学習率減衰に使用され、$(\tau_0 + iter)^{-\kappa}$で計算されます。ここで、$iter$は現在のイテレーション数です。
OnlineLDAOptimizerは、推論されたトピックのみを格納するLocalLDAModelを生成します。LocalLDAModelは以下をサポートします。
logLikelihood(documents):推論されたトピックを考慮して、提供されたdocumentsの下限を計算します。logPerplexity(documents):推論されたトピックを考慮して、提供されたdocumentsのパープレキシティの上限を計算します。
例
次の例では、ドキュメントのコーパスを表す単語カウントベクトルをロードします。次に、LDAを使用してドキュメントから3つのトピックを推論します。目的とするクラスタの数はアルゴリズムに渡されます。次に、単語に対する確率分布として表されるトピックを出力します。
APIの詳細については、LDA PythonドキュメントおよびLDAModel Pythonドキュメントを参照してください。
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
.load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")APIの詳細については、LDA ScalaドキュメントおよびDistributedLDAModel Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex().map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print(s"Topic $topic :")
for (word <- Range(0, ldaModel.vocabSize)) {
print(s"${topics(word, topic)}")
}
println()
}
// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
"target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")APIの詳細については、LDA JavaドキュメントおよびDistributedLDAModel Javaドキュメントを参照してください。
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();
// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");Bisecting k-means
Bisecting K-meansは、通常のK-meansよりもはるかに高速な場合がありますが、一般的には異なるクラスタリングを生成します。
Bisecting k-meansは、階層的クラスタリングの一種です。階層的クラスタリングは、クラスタの階層を構築しようとするクラスタ分析で最も一般的に使用される手法の1つです。階層的クラスタリングの戦略は、一般的に2つのタイプに分類されます。
- 凝集型:これは「ボトムアップ」アプローチです。各観測値は独自のクラスタとして開始され、階層を上に移動するにつれてクラスタのペアがマージされます。
- 分割型:これは「トップダウン」アプローチです。すべての観測値は1つのクラスタとして開始され、階層を下に移動するにつれて再帰的に分割が行われます。
Bisecting k-meansアルゴリズムは、分割型アルゴリズムの一種です。MLlibの実装には、以下のパラメータがあります。
- k:目的とするリーフクラスタの数(デフォルト:4)。分割可能なリーフクラスタがない場合、実際の数はそれより小さくなる可能性があります。
- maxIterations:クラスタを分割するための最大k-meansイテレーション数(デフォルト:20)。
- minDivisibleClusterSize:分割可能なクラスタの最小ポイント数(1.0以上の場合)または最小ポイントの割合(1.0未満の場合)(デフォルト:1)。
- seed:ランダムシード(デフォルト:クラス名のハッシュ値)。
例
APIの詳細については、BisectingKMeans PythonドキュメントおよびBisectingKMeansModel Pythonドキュメントを参照してください。
from numpy import array
from pyspark.mllib.clustering import BisectingKMeans
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)
# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))APIの詳細については、BisectingKMeans ScalaドキュメントおよびBisectingKMeansModel Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
println(s"Cluster Center ${idx}: ${center}")
}APIの詳細については、BisectingKMeans JavaドキュメントおよびBisectingKMeansModel Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
List<Vector> localData = Arrays.asList(
Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);
BisectingKMeans bkm = new BisectingKMeans()
.setK(4);
BisectingKMeansModel model = bkm.run(data);
System.out.println("Compute Cost: " + model.computeCost(data));
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
System.out.println("Cluster Center " + i + ": " + clusterCenter);
}Streaming k-means
ストリームにデータが到着した場合、新しいデータが到着するたびにクラスタを動的に推定し、更新したい場合があります。spark.mllibは、推定の減衰(または「忘却」)を制御するためのパラメータを備えた、ストリーミングk-meansクラスタリングのサポートを提供します。このアルゴリズムは、ミニバッチk-means更新ルールの一般化を使用します。各データバッチについて、すべてのポイントを最も近いクラスタに割り当て、新しいクラスタ中心を計算し、次に各クラスタを以下のように更新します。
\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation} \begin{equation} n_{t+1} = n_t + m_t \end{equation}
ここで、$c_t$はクラスタの以前の中心、$n_t$はこれまでにクラスタに割り当てられたポイントの数、$x_t$は現在のバッチからの新しいクラスタ中心、$m_t$は現在のバッチでクラスタに追加されたポイントの数です。減衰係数$\alpha$は過去を無視するために使用できます。$\alpha$=1の場合、すべてのデータが最初から使用されます。$\alpha$=0の場合、最新のデータのみが使用されます。これは指数移動平均に類似しています。
減衰はhalfLifeパラメータを使用して指定できます。これは、時刻tで取得されたデータの寄与が、時刻t + halfLifeまでに0.5に低下するような適切な減衰係数aを決定します。時間の単位は、batchesまたはpointsのいずれかで指定でき、更新ルールはそれに応じて調整されます。
例
この例は、ストリーミングデータでクラスタを推定する方法を示しています。
APIの詳細については、StreamingKMeans Pythonドキュメントを参照してください。また、StreamingContextの詳細については、Spark Streaming Programming Guideを参照してください。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(')')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)APIの詳細については、StreamingKMeans Scalaドキュメントを参照してください。また、StreamingContextの詳細については、Spark Streaming Programming Guideを参照してください。
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt, 0.0)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()新しいテキストファイルにデータが追加されると、クラスタ中心が更新されます。各トレーニングポイントは[x1, x2, x3]形式で、各テストデータポイントは(y, [x1, x2, x3])形式であるべきです。ここで、yは有用なラベルまたは識別子(例:実際のカテゴリ割り当て)です。/training/data/dirにテキストファイルが配置されるたびに、モデルが更新されます。/testing/data/dirにテキストファイルが配置されるたびに、予測が表示されます。新しいデータとともに、クラスタ中心は変化します!