次元削減 - RDDベースAPI
次元削減とは、考慮する変数の数を減らすプロセスです。生データやノイズの多い特徴量から潜在的な特徴を抽出したり、構造を維持しながらデータを圧縮したりするために使用できます。spark.mllib は、RowMatrix クラスで次元削減をサポートしています。
特異値分解 (SVD)
特異値分解 (SVD) は、行列を3つの行列 $U$、$\Sigma$、$V$ に分解します。
\[ A = U \Sigma V^T, \]
ここで
- $U$ は正規直交行列であり、その列は左特異ベクトルと呼ばれます。
- $\Sigma$ は非負の対角成分を降順に並べた対角行列であり、その対角成分は特異値と呼ばれます。
- $V$ は正規直交行列であり、その列は右特異ベクトルと呼ばれます。
大規模な行列の場合、通常は完全な分解を必要とせず、上位の特異値とその関連する特異ベクトルのみが必要です。これにより、ストレージを節約し、ノイズを除去し、行列の低ランク構造を復元できます。
上位 $k$ 個の特異値を保持する場合、結果として得られる低ランク行列の次元は次のようになります。
$U$:$m \times k$,$\Sigma$:$k \times k$,$V$:$n \times k$.
パフォーマンス
$n$ は $m$ より小さいと仮定します。特異値と右特異ベクトルは、Gram行列 $A^T A$ の固有値と固有ベクトルから導出されます。左特異ベクトル $U$ を格納する行列は、$U = A (V S^{-1})$ として行列乗算によって計算されます(ユーザーが computeU パラメータで要求した場合)。実際の計算方法は、計算コストに基づいて自動的に決定されます。
- $n$ が小さい場合 ($n < 100$) または $k$ が $n$ に比べて大きい場合 ($k > n / 2$) は、まずGram行列を計算し、次にその上位固有値と固有ベクトルをドライバーでローカルに計算します。これには、各エグゼキューターとドライバーで $O(n^2)$ のストレージを必要とする1回のパスと、ドライバーで $O(n^2 k)$ の時間が必要です。
- それ以外の場合は、$(A^T A) v$ を分散方式で計算し、ARPACK に送信して、ドライバーノードで $(A^T A)$ の上位固有値と固有ベクトルを計算します。これには、$O(k)$ 回のパス、各エグゼキューターで $O(n)$ のストレージ、ドライバーで $O(n k)$ のストレージが必要です。
SVDの例
spark.mllib は、行指向の行列(RowMatrix クラスで提供)に対してSVD機能を提供します。
APIの詳細については、SingularValueDecomposition Python ドキュメントを参照してください。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 5 singular values and corresponding singular vectors.
svd = mat.computeSVD(5, computeU=True)
U = svd.U # The U factor is a RowMatrix.
s = svd.s # The singular values are stored in a local dense vector.
V = svd.V # The V factor is a local dense matrix.U が IndexedRowMatrix として定義されている場合、IndexedRowMatrix にも同じコードが適用されます。
APIの詳細については、SingularValueDecomposition Scala ドキュメントを参照してください。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(immutable.ArraySeq.unsafeWrapArray(data))
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 5 singular values and corresponding singular vectors.
val svd: SingularValueDecomposition[RowMatrix, Matrix] = mat.computeSVD(5, computeU = true)
val U: RowMatrix = svd.U // The U factor is a RowMatrix.
val s: Vector = svd.s // The singular values are stored in a local dense vector.
val V: Matrix = svd.V // The V factor is a local dense matrix.U が IndexedRowMatrix として定義されている場合、IndexedRowMatrix にも同じコードが適用されます。
APIの詳細については、SingularValueDecomposition Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 5 singular values and corresponding singular vectors.
SingularValueDecomposition<RowMatrix, Matrix> svd = mat.computeSVD(5, true, 1.0E-9d);
RowMatrix U = svd.U(); // The U factor is a RowMatrix.
Vector s = svd.s(); // The singular values are stored in a local dense vector.
Matrix V = svd.V(); // The V factor is a local dense matrix.U が IndexedRowMatrix として定義されている場合、IndexedRowMatrix にも同じコードが適用されます。
主成分分析 (PCA)
主成分分析 (PCA) は、最初の座標が可能な限り最大の分散を持つように回転を見つけ、各後続の座標が順に可能な限り最大の分散を持つようにする統計的手法です。回転行列の列は主成分と呼ばれます。PCAは次元削減に広く使用されています。
spark.mllib は、行指向形式で格納された背の高い細い行列と、任意のVectorのPCAをサポートしています。
次のコードは、RowMatrix で主成分を計算し、それらを使用してベクトルを低次元空間に射影する方法を示しています。
APIの詳細については、RowMatrix Python ドキュメントを参照してください。
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
rows = sc.parallelize([
Vectors.sparse(5, {1: 1.0, 3: 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
])
mat = RowMatrix(rows)
# Compute the top 4 principal components.
# Principal components are stored in a local dense matrix.
pc = mat.computePrincipalComponents(4)
# Project the rows to the linear space spanned by the top 4 principal components.
projected = mat.multiply(pc)次のコードは、RowMatrix で主成分を計算し、それらを使用してベクトルを低次元空間に射影する方法を示しています。
APIの詳細については、RowMatrix Scala ドキュメントを参照してください。
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
val rows = sc.parallelize(immutable.ArraySeq.unsafeWrapArray(data))
val mat: RowMatrix = new RowMatrix(rows)
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
val pc: Matrix = mat.computePrincipalComponents(4)
// Project the rows to the linear space spanned by the top 4 principal components.
val projected: RowMatrix = mat.multiply(pc)次のコードは、ソースベクトルで主成分を計算し、それらを使用して関連付けられたラベルを保持しながらベクトルを低次元空間に射影する方法を示しています。
APIの詳細については、PCA Scala ドキュメントを参照してください。
import org.apache.spark.mllib.feature.PCA
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
val data: RDD[LabeledPoint] = sc.parallelize(Seq(
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 1)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 1, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0)),
new LabeledPoint(0, Vectors.dense(1, 0, 0, 0, 0)),
new LabeledPoint(1, Vectors.dense(1, 1, 0, 0, 0))))
// Compute the top 5 principal components.
val pca = new PCA(5).fit(data.map(_.features))
// Project vectors to the linear space spanned by the top 5 principal
// components, keeping the label
val projected = data.map(p => p.copy(features = pca.transform(p.features)))次のコードは、RowMatrix で主成分を計算し、それらを使用してベクトルを低次元空間に射影する方法を示しています。
APIの詳細については、RowMatrix Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
List<Vector> data = Arrays.asList(
Vectors.sparse(5, new int[] {1, 3}, new double[] {1.0, 7.0}),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
);
JavaRDD<Vector> rows = jsc.parallelize(data);
// Create a RowMatrix from JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());
// Compute the top 4 principal components.
// Principal components are stored in a local dense matrix.
Matrix pc = mat.computePrincipalComponents(4);
// Project the rows to the linear space spanned by the top 4 principal components.
RowMatrix projected = mat.multiply(pc);