クラスタリング - RDDベースAPI
クラスタリングは、類似性の概念に基づいてエンティティのサブセットを互いにグループ化する非教師あり学習問題です。クラスタリングは、探索的分析や、階層的な教師あり学習パイプライン(各クラスタに対して異なる分類器または回帰モデルがトレーニングされる)のコンポーネントとしてよく使用されます。
spark.mllib
パッケージは、次のモデルをサポートしています。
K-means
K-meansは、データ点を事前に定義された数のクラスタにクラスタリングする最も一般的に使用されるクラスタリングアルゴリズムの1つです。spark.mllib
の実装には、k-means++メソッドの並列化されたバリアントであるkmeans||が含まれています。spark.mllib
の実装には、次のパラメータがあります。
- k は、目的のクラスタ数です。kよりも少ないクラスタが返される可能性があります(例:クラスタリングするkよりも少ない異なる点が存在する場合)。
- maxIterations は、実行する最大反復回数です。
- initializationMode は、ランダム初期化またはk-means||による初期化を指定します。
- runs このパラメータは、Spark 2.0.0以降は効果がありません。
- initializationSteps は、k-means||アルゴリズムの手順数を決定します。
- epsilon は、k-meansが収束したとみなす距離のしきい値を決定します。
- initialModel は、初期化に使用されるクラスタ中心のオプションセットです。このパラメータが指定されている場合、実行は1回のみ行われます。
例
次の例は、PySparkシェルでテストできます。
次の例では、データのロードと解析後、KMeansオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的のクラスタ数はアルゴリズムに渡されます。次に、集合内平方和誤差(WSSSE)を計算します。kを増やすことでこの誤差を減らすことができます。実際、最適なkは通常、WSSSEグラフに「肘」がある点です。
APIの詳細については、KMeans
PythonドキュメントとKMeansModel
Pythonドキュメントを参照してください。
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):
center = clusters.centers[clusters.predict(point)]
return sqrt(sum([x**2 for x in (point - center)]))
WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Within Set Sum of Squared Error = " + str(WSSSE))
# Save and load model
clusters.save(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
sameModel = KMeansModel.load(sc, "target/org/apache/spark/PythonKMeansExample/KMeansModel")
次のコードスニペットはspark-shell
で実行できます。
次の例では、データのロードと解析後、KMeans
オブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的のクラスタ数はアルゴリズムに渡されます。次に、集合内平方和誤差(WSSSE)を計算します。kを増やすことでこの誤差を減らすことができます。実際、最適なkは通常、WSSSEグラフに「肘」がある点です。
APIの詳細については、KMeans
ScalaドキュメントとKMeansModel
Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println(s"Within Set Sum of Squared Errors = $WSSSE")
// Save and load model
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
MLlibのすべてのメソッドはJavaフレンドリーな型を使用しているため、Scalaと同じようにJavaでインポートして呼び出すことができます。唯一の注意点として、メソッドはScalaのRDDオブジェクトを受け取るのに対し、Spark Java APIは別のJavaRDD
クラスを使用します。JavaRDD
オブジェクトで.rdd()
を呼び出すことで、Java RDDをScala RDDに変換できます。Scalaで提供されている例と同等の、独立したアプリケーション例を以下に示します。
APIの詳細については、KMeans
JavaドキュメントとKMeansModel
Javaドキュメントを参照してください。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/kmeans_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using KMeans
int numClusters = 2;
int numIterations = 20;
KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);
System.out.println("Cluster centers:");
for (Vector center: clusters.clusterCenters()) {
System.out.println(" " + center);
}
double cost = clusters.computeCost(parsedData.rdd());
System.out.println("Cost: " + cost);
// Evaluate clustering by computing Within Set Sum of Squared Errors
double WSSSE = clusters.computeCost(parsedData.rdd());
System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
// Save and load model
clusters.save(jsc.sc(), "target/org/apache/spark/JavaKMeansExample/KMeansModel");
KMeansModel sameModel = KMeansModel.load(jsc.sc(),
"target/org/apache/spark/JavaKMeansExample/KMeansModel");
ガウス混合モデル
ガウス混合モデルは、点がそれぞれ独自の確率を持つk個のガウス部分分布のいずれかから描画される複合分布を表します。spark.mllib
の実装は、期待値最大化アルゴリズムを使用して、サンプルのセットが与えられた場合の最尤モデルを誘導します。この実装には、次のパラメータがあります。
- k は、目的のクラスタ数です。
- convergenceTol は、収束が達成されたとみなす対数尤度の最大変化量です。
- maxIterations は、収束に達することなく実行する最大反復回数です。
- initialModel は、EMアルゴリズムを開始するオプションの開始点です。このパラメータが省略された場合、データからランダムな開始点が構築されます。
例
次の例では、データのロードと解析後、GaussianMixtureオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的のクラスタ数はアルゴリズムに渡されます。次に、混合モデルのパラメータを出力します。
APIの詳細については、GaussianMixture
PythonドキュメントとGaussianMixtureModel
Pythonドキュメントを参照してください。
from numpy import array
from pyspark.mllib.clustering import GaussianMixture, GaussianMixtureModel
# Load and parse the data
data = sc.textFile("data/mllib/gmm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.strip().split(' ')]))
# Build the model (cluster the data)
gmm = GaussianMixture.train(parsedData, 2)
# Save and load model
gmm.save(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
sameModel = GaussianMixtureModel\
.load(sc, "target/org/apache/spark/PythonGaussianMixtureExample/GaussianMixtureModel")
# output parameters of model
for i in range(2):
print("weight = ", gmm.weights[i], "mu = ", gmm.gaussians[i].mu,
"sigma = ", gmm.gaussians[i].sigma.toArray())
次の例では、データのロードと解析後、GaussianMixtureオブジェクトを使用してデータを2つのクラスタにクラスタリングします。目的のクラスタ数はアルゴリズムに渡されます。次に、混合モデルのパラメータを出力します。
APIの詳細については、GaussianMixture
ScalaドキュメントとGaussianMixtureModel
Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()
// Cluster the data into two classes using GaussianMixture
val gmm = new GaussianMixture().setK(2).run(parsedData)
// Save and load model
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
"target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
// output parameters of max-likelihood model
for (i <- 0 until gmm.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
(gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}
MLlibのすべてのメソッドはJavaフレンドリーな型を使用しているため、Scalaと同じようにJavaでインポートして呼び出すことができます。唯一の注意点として、メソッドはScalaのRDDオブジェクトを受け取るのに対し、Spark Java APIは別のJavaRDD
クラスを使用します。JavaRDD
オブジェクトで.rdd()
を呼び出すことで、Java RDDをScala RDDに変換できます。Scalaで提供されている例と同等の、独立したアプリケーション例を以下に示します。
APIの詳細については、GaussianMixture
JavaドキュメントとGaussianMixtureModel
Javaドキュメントを参照してください。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse data
String path = "data/mllib/gmm_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
parsedData.cache();
// Cluster the data into two classes using GaussianMixture
GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());
// Save and load GaussianMixtureModel
gmm.save(jsc.sc(), "target/org/apache/spark/JavaGaussianMixtureExample/GaussianMixtureModel");
GaussianMixtureModel sameModel = GaussianMixtureModel.load(jsc.sc(),
"target/org.apache.spark.JavaGaussianMixtureExample/GaussianMixtureModel");
// Output the parameters of the mixture model
for (int j = 0; j < gmm.k(); j++) {
System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
}
べき乗法クラスタリング (PIC)
べき乗法クラスタリング (PIC) は、Lin and Cohen, Power Iteration Clusteringで説明されているように、エッジプロパティとしてペアワイズ類似性が与えられたグラフの頂点をクラスタリングするためのスケーラブルで効率的なアルゴリズムです。べき乗法を介してグラフの正規化されたアフィニティ行列の擬似固有ベクトルを計算し、それを用いて頂点をクラスタリングします。spark.mllib
には、バックエンドとしてGraphXを使用したPICの実装が含まれています。(srcId, dstId, similarity)
タプルのRDD
を受け取り、クラスタリング割り当てを含むモデルを出力します。類似度は非負でなければなりません。PICは、類似度尺度が対称であると仮定します。順序に関係なく、ペア(srcId, dstId)
は入力データに最大1回しか表示されません。ペアが入力にない場合、その類似度は0として扱われます。spark.mllib
のPIC実装は、次の(ハイパー)パラメータを受け取ります。
k
:クラスタ数maxIterations
:べき乗法の最大反復回数initializationMode
:初期化モデル。これは、デフォルトの「random」(ランダムベクトルを頂点プロパティとして使用する)か、「degree」(正規化された合計類似度を使用する)のいずれかになります。
例
以下では、spark.mllib
でPICを使用する方法を示すコードスニペットを示します。
PowerIterationClustering
はPICアルゴリズムを実装します。アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)
タプルのRDD
を取ります。PowerIterationClustering.run
を呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModel
が返されます。
APIの詳細については、PowerIterationClustering
Python ドキュメントとPowerIterationClusteringModel
Python ドキュメントを参照してください。
from pyspark.mllib.clustering import PowerIterationClustering, PowerIterationClusteringModel
# Load and parse the data
data = sc.textFile("data/mllib/pic_data.txt")
similarities = data.map(lambda line: tuple([float(x) for x in line.split(' ')]))
# Cluster the data into two classes using PowerIterationClustering
model = PowerIterationClustering.train(similarities, 2, 10)
model.assignments().foreach(lambda x: print(str(x.id) + " -> " + str(x.cluster)))
# Save and load model
model.save(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
sameModel = PowerIterationClusteringModel\
.load(sc, "target/org/apache/spark/PythonPowerIterationClusteringExample/PICModel")
PowerIterationClustering
はPICアルゴリズムを実装します。アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)
タプルのRDD
を取ります。PowerIterationClustering.run
を呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModel
が返されます。
APIの詳細については、PowerIterationClustering
Scala ドキュメントとPowerIterationClusteringModel
Scala ドキュメントを参照してください。
import org.apache.spark.mllib.clustering.PowerIterationClustering
val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setInitializationMode("degree")
.run(circlesRdd)
val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
.map { case (k, v) =>
s"$k -> ${v.sorted.mkString("[", ",", "]")}"
}.mkString(", ")
val sizesStr = assignments.map {
_._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
PowerIterationClustering
はPICアルゴリズムを実装します。アフィニティ行列を表す(srcId: Long, dstId: Long, similarity: Double)
タプルのJavaRDD
を取ります。PowerIterationClustering.run
を呼び出すと、計算されたクラスタリング割り当てを含むPowerIterationClusteringModel
が返されます。
APIの詳細については、PowerIterationClustering
Java ドキュメントとPowerIterationClusteringModel
Java ドキュメントを参照してください。
import org.apache.spark.mllib.clustering.PowerIterationClustering;
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel;
JavaRDD<Tuple3<Long, Long, Double>> similarities = sc.parallelize(Arrays.asList(
new Tuple3<>(0L, 1L, 0.9),
new Tuple3<>(1L, 2L, 0.9),
new Tuple3<>(2L, 3L, 0.9),
new Tuple3<>(3L, 4L, 0.1),
new Tuple3<>(4L, 5L, 0.9)));
PowerIterationClustering pic = new PowerIterationClustering()
.setK(2)
.setMaxIterations(10);
PowerIterationClusteringModel model = pic.run(similarities);
for (PowerIterationClustering.Assignment a: model.assignments().toJavaRDD().collect()) {
System.out.println(a.id() + " -> " + a.cluster());
}
潜在ディリクレ配分 (LDA)
潜在的ディリクレ配分(LDA)は、テキスト文書のコレクションからトピックを推論するトピックモデルです。LDAは、次のようにクラスタリングアルゴリズムと考えることができます。
- トピックはクラスタの中心に、文書はデータセットの例(行)に対応します。
- トピックと文書はどちらも特徴空間内に存在し、特徴ベクトルは単語数のベクトル(単語のバッグ)です。
- 従来の距離を使用してクラスタリングを推定するのではなく、LDAはテキスト文書がどのように生成されるかの統計モデルに基づいた関数を使用します。
LDAはsetOptimizer
関数によって異なる推論アルゴリズムをサポートしています。EMLDAOptimizer
は尤度関数に対して期待値最大化を使用してクラスタリングを学習し、包括的な結果を提供しますが、OnlineLDAOptimizer
はオンライン変分推論のための反復ミニバッチサンプリングを使用し、一般的にメモリ効率が良いです。
LDAは、単語数のベクトルとしての文書のコレクションと、以下のパラメータ(ビルダーパターンを使用して設定)を取ります。
k
:トピックの数(つまり、クラスタの中心)optimizer
:LDAモデルの学習に使用する最適化アルゴリズム。EMLDAOptimizer
またはOnlineLDAOptimizer
のいずれか。docConcentration
:トピックに対する文書の分布の事前分布に対するディリクレパラメータ。値が大きいほど、よりスムーズな推論分布が促進されます。topicConcentration
:用語(単語)に対するトピックの分布の事前分布に対するディリクレパラメータ。値が大きいほど、よりスムーズな推論分布が促進されます。maxIterations
:反復回数の制限。checkpointInterval
:チェックポイントを使用する場合(Sparkの設定で設定)、このパラメータはチェックポイントが作成される頻度を指定します。maxIterations
が大きい場合、チェックポイントを使用すると、ディスク上のシャッフルファイルのサイズを削減し、障害からの復旧に役立ちます。
spark.mllib
のすべてのLDAモデルは、以下をサポートしています。
describeTopics
:最も重要な用語とその重みの配列としてトピックを返します。topicsMatrix
:各列がトピックであるvocabSize
×k
行列を返します。
注記:LDAはまだ開発中の実験的機能です。その結果、特定の機能は、2つの最適化アルゴリズム/最適化アルゴリズムによって生成されたモデルのいずれか1つでのみ使用できます。現在、分散型モデルはローカルモデルに変換できますが、その逆はできません。
以降では、各最適化アルゴリズム/モデルのペアについて個別に説明します。
期待値最大化
EMLDAOptimizer
とDistributedLDAModel
で実装されています。
LDA
に提供されるパラメータの場合
docConcentration
:対称事前分布のみがサポートされるため、提供されたk
次元ベクトルのすべての値は同一である必要があります。すべての値は$ > 1.0$である必要があります。Vector(-1)
を提供すると、デフォルトの動作(値$(50 / k) + 1$を持つ均一なk
次元ベクトル)になります。topicConcentration
:対称事前分布のみがサポートされます。値は$ > 1.0$である必要があります。-1
を提供すると、値0.1 + 1にデフォルト設定されます。maxIterations
:EM反復の最大回数。
注記:十分な反復を行うことが重要です。初期の反復では、EMはしばしば役に立たないトピックを持ちますが、それらのトピックはより多くの反復の後で劇的に改善されます。データセットによっては、少なくとも20回、場合によっては50〜100回の反復を行うのが妥当なことが多いです。
EMLDAOptimizer
はDistributedLDAModel
を生成します。これは、推論されたトピックだけでなく、トレーニングコーパス全体とトレーニングコーパス内の各文書のトピック分布も格納します。DistributedLDAModel
は以下をサポートします。
topTopicsPerDocument
:トレーニングコーパス内の各文書の上位トピックとその重み。topDocumentsPerTopic
:各トピックの上位文書と、文書内のトピックの対応する重み。logPrior
:ハイパーパラメータdocConcentration
とtopicConcentration
を考慮した、推定されたトピックと文書-トピック分布の対数確率。logLikelihood
:推論されたトピックと文書-トピック分布を考慮した、トレーニングコーパスの対数尤度。
オンライン変分ベイズ
OnlineLDAOptimizer
とLocalLDAModel
で実装されています。
LDA
に提供されるパラメータの場合
docConcentration
:k
次元のそれぞれでディリクレパラメータに等しい値を持つベクトルを渡すことで、非対称事前分布を使用できます。値は$ >= 0$である必要があります。Vector(-1)
を提供すると、デフォルトの動作(値$(1.0 / k)$を持つ均一なk
次元ベクトル)になります。topicConcentration
:対称事前分布のみがサポートされます。値は$ >= 0$である必要があります。-1
を提供すると、値$(1.0 / k)$にデフォルト設定されます。maxIterations
:送信するミニバッチの最大数。
さらに、OnlineLDAOptimizer
は次のパラメータを受け入れます。
miniBatchFraction
:各反復でサンプリングされ使用されるコーパスの割合。optimizeDocConcentration
:trueに設定すると、各ミニバッチの後にハイパーパラメータdocConcentration
(別名alpha
)の最尤推定を行い、返されるLocalLDAModel
で最適化されたdocConcentration
を設定します。tau0
とkappa
:学習率の減衰に使用されます。これは$(\tau_0 + iter)^{-\kappa}$(ここで$iter$は現在の反復回数)によって計算されます。
OnlineLDAOptimizer
はLocalLDAModel
を生成します。これは、推論されたトピックのみを格納します。LocalLDAModel
は以下をサポートします。
logLikelihood(documents)
:推論されたトピックを考慮した、提供されたdocuments
の下限を計算します。logPerplexity(documents)
:推論されたトピックを考慮した、提供されたdocuments
のパープレキシティの上限を計算します。
例
以下の例では、文書コーパスを表す単語カウントベクトルを読み込みます。次に、LDAを使用して、文書から3つのトピックを推測します。目的のクラスタ数はアルゴリズムに渡されます。その後、単語に対する確率分布として表されるトピックを出力します。
APIの詳細については、LDA
PythonドキュメントとLDAModel
Pythonドキュメントを参照してください。
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
.load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
APIの詳細については、LDA
ScalaドキュメントとDistributedLDAModel
Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// Index documents with unique IDs
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
// Cluster the documents into three topics using LDA
val ldaModel = new LDA().setK(3).run(corpus)
// Output topics. Each is a distribution over words (matching word count vectors)
println(s"Learned topics (as distributions over vocab of ${ldaModel.vocabSize} words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
print(s"Topic $topic :")
for (word <- Range(0, ldaModel.vocabSize)) {
print(s"${topics(word, topic)}")
}
println()
}
// Save and load model.
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
"target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
APIの詳細については、LDA
JavaドキュメントとDistributedLDAModel
Javaドキュメントを参照してください。
import scala.Tuple2;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.clustering.LDAModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Load and parse the data
String path = "data/mllib/sample_lda_data.txt";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Vector> parsedData = data.map(s -> {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus =
JavaPairRDD.fromJavaRDD(parsedData.zipWithIndex().map(Tuple2::swap));
corpus.cache();
// Cluster the documents into three topics using LDA
LDAModel ldaModel = new LDA().setK(3).run(corpus);
// Output topics. Each is a distribution over words (matching word count vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize()
+ " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
ldaModel.save(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
DistributedLDAModel sameModel = DistributedLDAModel.load(jsc.sc(),
"target/org/apache/spark/JavaLatentDirichletAllocationExample/LDAModel");
二分k-means
二分K平均は、通常のK平均よりもはるかに高速になることがありますが、一般的に異なるクラスタリングが生成されます。
二分k平均は、階層的クラスタリングの一種です。階層的クラスタリングは、クラスタ分析で最も一般的に使用される方法の1つであり、クラスタの階層を構築することを目指しています。階層的クラスタリングの戦略は、一般的に2つのタイプに分類されます。
- 凝集型: これは「ボトムアップ」アプローチです。各観測値は独自のクラスタで始まり、階層を上に移動するにつれてクラスタのペアがマージされます。
- 分割型: これは「トップダウン」アプローチです。すべての観測値は1つのクラスタで始まり、階層を下に移動するにつれて分割が再帰的に実行されます。
二分k平均アルゴリズムは、分割型アルゴリズムの一種です。MLlibの実装には、以下のパラメータがあります。
- k: 目標とするリーフクラスタの数(デフォルト: 4)。分割可能なリーフクラスタがない場合は、実際の数値が小さくなる可能性があります。
- maxIterations: クラスタを分割するためのk平均反復の最大数(デフォルト: 20)
- minDivisibleClusterSize: 分割可能なクラスタの最小点数(>= 1.0の場合)または最小割合(< 1.0の場合)(デフォルト: 1)
- seed: ランダムシード(デフォルト: クラス名のハッシュ値)
例
APIの詳細については、BisectingKMeans
PythonドキュメントとBisectingKMeansModel
Pythonドキュメントを参照してください。
from numpy import array
from pyspark.mllib.clustering import BisectingKMeans
# Load and parse the data
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
model = BisectingKMeans.train(parsedData, 2, maxIterations=5)
# Evaluate clustering
cost = model.computeCost(parsedData)
print("Bisecting K-means Cost = " + str(cost))
APIの詳細については、BisectingKMeans
ScalaドキュメントとBisectingKMeansModel
Scalaドキュメントを参照してください。
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Loads and parses data
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()
// Clustering the data into 6 clusters by BisectingKMeans.
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)
// Show the compute cost and the cluster centers
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
println(s"Cluster Center ${idx}: ${center}")
}
APIの詳細については、BisectingKMeans
JavaドキュメントとBisectingKMeansModel
Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.clustering.BisectingKMeans;
import org.apache.spark.mllib.clustering.BisectingKMeansModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
List<Vector> localData = Arrays.asList(
Vectors.dense(0.1, 0.1), Vectors.dense(0.3, 0.3),
Vectors.dense(10.1, 10.1), Vectors.dense(10.3, 10.3),
Vectors.dense(20.1, 20.1), Vectors.dense(20.3, 20.3),
Vectors.dense(30.1, 30.1), Vectors.dense(30.3, 30.3)
);
JavaRDD<Vector> data = sc.parallelize(localData, 2);
BisectingKMeans bkm = new BisectingKMeans()
.setK(4);
BisectingKMeansModel model = bkm.run(data);
System.out.println("Compute Cost: " + model.computeCost(data));
Vector[] clusterCenters = model.clusterCenters();
for (int i = 0; i < clusterCenters.length; i++) {
Vector clusterCenter = clusterCenters[i];
System.out.println("Cluster Center " + i + ": " + clusterCenter);
}
ストリーミングk-means
データがストリームで到着する場合、新しいデータが到着するにつれて更新される動的なクラスタを推定したい場合があります。spark.mllib
は、推定値の減衰(または「忘却」)を制御するパラメータを使用して、ストリーミングk平均クラスタリングをサポートしています。このアルゴリズムは、ミニバッチk平均更新ルールの一般化を使用します。各バッチのデータについて、すべての点を最も近いクラスタに割り当て、新しいクラスタの中心を計算し、次を使用して各クラスタを更新します。
\begin{equation} c_{t+1} = \frac{c_tn_t\alpha + x_tm_t}{n_t\alpha+m_t} \end{equation}
\begin{equation} n_{t+1} = n_t + m_t \end{equation}
ここで、$c_t$
はクラスタの以前の中心、$n_t$
はこれまでクラスタに割り当てられた点の数、$x_t$
は現在のバッチからの新しいクラスタの中心、$m_t$
は現在のバッチでクラスタに追加された点の数です。減衰係数$\alpha$
を使用して過去を無視できます。 $\alpha$=1
の場合、最初からすべてのデータが使用されます。 $\alpha$=0
の場合、最新のデータのみが使用されます。これは、指数加重移動平均に似ています。
減衰はhalfLife
パラメータを使用して指定できます。これは、時間t
に取得されたデータについて、時間t + halfLife
までにその寄与が0.5に低下するように、正しい減衰係数a
を決定します。時間の単位はbatches
またはpoints
として指定でき、更新ルールはそれに応じて調整されます。
例
この例は、ストリーミングデータ上のクラスタを推定する方法を示しています。
APIの詳細については、StreamingKMeans
Pythonドキュメントを参照してください。また、Spark StreamingプログラミングガイドでStreamingContextの詳細を参照してください。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.clustering import StreamingKMeans
# we make an input stream of vectors for training,
# as well as a stream of vectors for testing
def parse(lp):
label = float(lp[lp.find('(') + 1: lp.find(')')])
vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
return LabeledPoint(label, vec)
trainingData = sc.textFile("data/mllib/kmeans_data.txt")\
.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
testingData = sc.textFile("data/mllib/streaming_kmeans_data_test.txt").map(parse)
trainingQueue = [trainingData]
testingQueue = [testingData]
trainingStream = ssc.queueStream(trainingQueue)
testingStream = ssc.queueStream(testingQueue)
# We create a model with random clusters and specify the number of clusters to find
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
# Now register the streams for training and testing and start the job,
# printing the predicted cluster assignments on new data points as they arrive.
model.trainOn(trainingStream)
result = model.predictOnValues(testingStream.map(lambda lp: (lp.label, lp.features)))
result.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
APIの詳細については、StreamingKMeans
Scalaドキュメントを参照してください。また、Spark StreamingプログラミングガイドでStreamingContextの詳細を参照してください。
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)
val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt, 0.0)
model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
ssc.start()
ssc.awaitTermination()
データを含む新しいテキストファイルを追加すると、クラスタの中心が更新されます。各トレーニングポイントは[x1, x2, x3]
としてフォーマットする必要があり、各テストデータポイントは(y, [x1, x2, x3])
としてフォーマットする必要があります。ここで、y
は、役立つラベルまたは識別子(例:真のカテゴリの割り当て)です。/training/data/dir
にテキストファイルが配置されると、モデルが更新されます。/testing/data/dir
にテキストファイルが配置されると、予測が表示されます。新しいデータを使用すると、クラスタの中心が変化します!