データ型 - RDDベースAPI

MLlibは、単一マシンに格納されるローカルベクトルとローカル行列、および1つ以上のRDDを基盤とする分散行列をサポートしています。ローカルベクトルとローカル行列は、公開インターフェースとして機能するシンプルなデータモデルです。基盤となる線形代数演算は、Breezeによって提供されます。教師あり学習で使用される学習例は、MLlibでは「ラベル付きポイント」と呼ばれます。

ローカルベクトル

ローカルベクトルは、整数型の0ベースのインデックスとdouble型の値を持っており、単一マシンに格納されます。MLlibは、密ベクトルと疎ベクトルの2種類のローカルベクトルをサポートしています。密ベクトルは、その要素値を表すdouble配列を基盤とし、疎ベクトルは、2つの並列配列(インデックスと値)を基盤とします。たとえば、ベクトル (1.0, 0.0, 3.0) は、密形式では [1.0, 0.0, 3.0]、疎形式では (3, [0, 2], [1.0, 3.0]) として表現できます。ここで、3 はベクトルのサイズです。

MLlibは、以下の型を密ベクトルとして認識します。

  • NumPyの array
  • Pythonのリスト、例: [1, 2, 3]

そして、以下の型を疎ベクトルとして認識します。

効率のために、リストよりもNumPy配列を使用することをお勧めします。また、疎ベクトルを作成するには、Vectorsに実装されているファクトリメソッドを使用することをお勧めします。

APIの詳細については、Vectors Pythonドキュメントを参照してください。

import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import Vectors

# Use a NumPy array as a dense vector.
dv1 = np.array([1.0, 0.0, 3.0])
# Use a Python list as a dense vector.
dv2 = [1.0, 0.0, 3.0]
# Create a SparseVector.
sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0])
# Use a single-column SciPy csc_matrix as a sparse vector.
sv2 = sps.csc_matrix((np.array([1.0, 3.0]), np.array([0, 2]), np.array([0, 2])), shape=(3, 1))

ローカルベクトルの基本クラスは Vector であり、2つの実装、DenseVectorSparseVector を提供しています。ローカルベクトルを作成するには、Vectorsに実装されているファクトリメソッドを使用することをお勧めします。

APIの詳細については、Vector ScalaドキュメントおよびVectors Scalaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

注意: Scalaではデフォルトで scala.collection.immutable.Vector がインポートされるため、MLlibの Vector を使用するには、org.apache.spark.mllib.linalg.Vector を明示的にインポートする必要があります。

ローカルベクトルの基本クラスは Vector であり、2つの実装、DenseVectorSparseVector を提供しています。ローカルベクトルを作成するには、Vectorsに実装されているファクトリメソッドを使用することをお勧めします。

APIの詳細については、Vector JavaドキュメントおよびVectors Javaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

// Create a dense vector (1.0, 0.0, 3.0).
Vector dv = Vectors.dense(1.0, 0.0, 3.0);
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
Vector sv = Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0});

ラベル付きポイント

ラベル付きポイントは、密または疎のローカルベクトルで、ラベル/応答に関連付けられています。MLlibでは、ラベル付きポイントは教師あり学習アルゴリズムで使用されます。ラベルを格納するためにdouble型を使用するため、回帰と分類の両方でラベル付きポイントを使用できます。二項分類の場合、ラベルは 0(負)または 1(正)のいずれかである必要があります。多クラス分類の場合、ラベルは0から始まるクラスインデックスである必要があります: 0, 1, 2, ...

ラベル付きポイントは LabeledPoint によって表されます。

APIの詳細については、LabeledPoint Pythonドキュメントを参照してください。

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))

ラベル付きポイントは、ケースクラス LabeledPoint によって表されます。

APIの詳細については、LabeledPoint Scalaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))

ラベル付きポイントは LabeledPoint によって表されます。

APIの詳細については、LabeledPoint Javaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

// Create a labeled point with a positive label and a dense feature vector.
LabeledPoint pos = new LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0));

// Create a labeled point with a negative label and a sparse feature vector.
LabeledPoint neg = new LabeledPoint(0.0, Vectors.sparse(3, new int[] {0, 2}, new double[] {1.0, 3.0}));

スパースデータ

実際には、スパースな学習データを持つことが非常に一般的です。MLlibは、LIBSVMおよびLIBLINEARによってデフォルトで使用されるテキスト形式であるLIBSVM形式で格納された学習例の読み込みをサポートしています。これは、各行が以下の形式でラベル付きスパース特徴ベクトルを表すテキスト形式です。

label index1:value1 index2:value2 ...

ここで、インデックスは1ベースで昇順です。読み込み後、特徴インデックスは0ベースに変換されます。

MLUtils.loadLibSVMFile は、LIBSVM形式で格納された学習例を読み込みます。

APIの詳細については、MLUtils Pythonドキュメントを参照してください。

from pyspark.mllib.util import MLUtils

examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

MLUtils.loadLibSVMFile は、LIBSVM形式で格納された学習例を読み込みます。

APIの詳細については、MLUtils Scalaドキュメントを参照してください。

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD

val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

MLUtils.loadLibSVMFile は、LIBSVM形式で格納された学習例を読み込みます。

APIの詳細については、MLUtils Javaドキュメントを参照してください。

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<LabeledPoint> examples = 
  MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();

ローカル行列

ローカル行列は、整数型の行と列のインデックス、およびdouble型の値を持っており、単一マシンに格納されます。MLlibは、密行列(要素値が列優先順で単一のdouble配列に格納される)と疎行列(非ゼロ要素値が列優先順でCompressed Sparse Column (CSC)形式で格納される)をサポートしています。たとえば、以下の密行列 \[ \begin{pmatrix} 1.0 & 2.0 \\ 3.0 & 4.0 \\ 5.0 & 6.0 \end{pmatrix} \] は、行列サイズ (3, 2) で、1次元配列 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] として格納されます。

ローカル行列の基本クラスは Matrix であり、2つの実装、DenseMatrix、および SparseMatrix を提供しています。ローカル行列を作成するには、Matricesに実装されているファクトリメソッドを使用することをお勧めします。MLlibのローカル行列は列優先順で格納されることに注意してください。

APIの詳細については、Matrix PythonドキュメントおよびMatrices Pythonドキュメントを参照してください。

from pyspark.mllib.linalg import Matrix, Matrices

# Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6])

# Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])

ローカル行列の基本クラスは Matrix であり、2つの実装、DenseMatrix、および SparseMatrix を提供しています。ローカル行列を作成するには、Matricesに実装されているファクトリメソッドを使用することをお勧めします。MLlibのローカル行列は列優先順で格納されることに注意してください。

APIの詳細については、Matrix ScalaドキュメントおよびMatrices Scalaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.{Matrix, Matrices}

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))

ローカル行列の基本クラスは Matrix であり、2つの実装、DenseMatrix、および SparseMatrix を提供しています。ローカル行列を作成するには、Matricesに実装されているファクトリメソッドを使用することをお勧めします。MLlibのローカル行列は列優先順で格納されることに注意してください。

APIの詳細については、Matrix JavaドキュメントおよびMatrices Javaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Matrices;

// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
Matrix dm = Matrices.dense(3, 2, new double[] {1.0, 3.0, 5.0, 2.0, 4.0, 6.0});

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
Matrix sm = Matrices.sparse(3, 2, new int[] {0, 1, 3}, new int[] {0, 2, 1}, new double[] {9, 6, 8});

分散行列

分散行列は、long型の行と列のインデックス、およびdouble型の値を持っており、1つ以上のRDDに分散して格納されます。大規模で分散された行列を格納するための適切な形式を選択することは非常に重要です。分散行列を異なる形式に変換するには、グローバルシャッフルが必要になる場合があり、これは非常にコストがかかります。これまでに4種類の分散行列が実装されています。

基本型はRowMatrixと呼ばれます。RowMatrixは、意味のある行インデックスを持たない行指向の分散行列です(例:特徴ベクトルのコレクション)。これは、行のRDDを基盤とし、各行はローカルベクトルです。RowMatrixでは、列数がそれほど大きくないため、単一のローカルベクトルがドライバーに合理的に通信でき、単一ノードで格納/操作できると想定しています。IndexedRowMatrixRowMatrixに似ていますが、行インデックスがあり、行の識別や結合の実行に使用できます。CoordinateMatrixは、座標リスト (COO)形式で格納された分散行列で、エントリのRDDを基盤とします。BlockMatrixは、MatrixBlock(Int, Int, Matrix)のタプル)のRDDを基盤とする分散行列です。

注意

分散行列の基盤となるRDDは、行列サイズをキャッシュするため、決定論的でなければなりません。一般的に、非決定論的なRDDの使用はエラーにつながる可能性があります。

RowMatrix

a RowMatrix は、意味のある行インデックスを持たない行指向の分散行列であり、各行がローカルベクトルである行のRDDを基盤とします。各行はローカルベクトルで表されるため、列数は整数範囲によって制限されますが、実際にはそれよりもはるかに小さいはずです。

a RowMatrix は、ベクトルの RDD から作成できます。

APIの詳細については、RowMatrix Pythonドキュメントを参照してください。

from pyspark.mllib.linalg.distributed import RowMatrix

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows

a RowMatrix は、RDD[Vector] インスタンスから作成できます。その後、列の要約統計量と分解を計算できます。QR分解は A = QR という形式で、Q は直交行列、R は上三角行列です。特異値分解 (SVD) および 主成分分析 (PCA) については、「次元削減」を参照してください。

APIの詳細については、RowMatrix Scalaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// QR decomposition 
val qrResult = mat.tallSkinnyQR(true)

a RowMatrix は、JavaRDD<Vector> インスタンスから作成できます。その後、列の要約統計量を計算できます。

APIの詳細については、RowMatrix Javaドキュメントを参照してください。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<Vector> rows = ... // a JavaRDD of local vectors
// Create a RowMatrix from a JavaRDD<Vector>.
RowMatrix mat = new RowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// QR decomposition 
QRDecomposition<RowMatrix, Matrix> result = mat.tallSkinnyQR(true);

IndexedRowMatrix

a IndexedRowMatrix は、意味のある行インデックスを持つ RowMatrix と似ています。これは、インデックス付き行のRDDを基盤とするため、各行はインデックス(long型)とローカルベクトルによって表されます。

a IndexedRowMatrix は、IndexedRowRDD から作成できます。ここで、IndexedRow(long, vector) のラッパーです。IndexedRowMatrix は、行インデックスを削除することで RowMatrix に変換できます。

APIの詳細については、IndexedRowMatrix Pythonドキュメントを参照してください。

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# Create an RDD of indexed rows.
#   - This can be done explicitly with the IndexedRow class:
indexedRows = sc.parallelize([IndexedRow(0, [1, 2, 3]),
                              IndexedRow(1, [4, 5, 6]),
                              IndexedRow(2, [7, 8, 9]),
                              IndexedRow(3, [10, 11, 12])])
#   - or by using (long, vector) tuples:
indexedRows = sc.parallelize([(0, [1, 2, 3]), (1, [4, 5, 6]),
                              (2, [7, 8, 9]), (3, [10, 11, 12])])

# Create an IndexedRowMatrix from an RDD of IndexedRows.
mat = IndexedRowMatrix(indexedRows)

# Get its size.
m = mat.numRows()  # 4
n = mat.numCols()  # 3

# Get the rows as an RDD of IndexedRows.
rowsRDD = mat.rows

# Convert to a RowMatrix by dropping the row indices.
rowMat = mat.toRowMatrix()

a IndexedRowMatrix は、RDD[IndexedRow] インスタンスから作成できます。ここで、IndexedRow(Long, Vector) のラッパーです。IndexedRowMatrix は、行インデックスを削除することで RowMatrix に変換できます。

APIの詳細については、IndexedRowMatrix Scalaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}

val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Drop its row indices.
val rowMat: RowMatrix = mat.toRowMatrix()

a IndexedRowMatrix は、JavaRDD<IndexedRow> インスタンスから作成できます。ここで、IndexedRow(long, Vector) のラッパーです。IndexedRowMatrix は、行インデックスを削除することで RowMatrix に変換できます。

APIの詳細については、IndexedRowMatrix Javaドキュメントを参照してください。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

JavaRDD<IndexedRow> rows = ... // a JavaRDD of indexed rows
// Create an IndexedRowMatrix from a JavaRDD<IndexedRow>.
IndexedRowMatrix mat = new IndexedRowMatrix(rows.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Drop its row indices.
RowMatrix rowMat = mat.toRowMatrix();

CoordinateMatrix

a CoordinateMatrix は、エントリのRDDを基盤とする分散行列です。各エントリは (i: Long, j: Long, value: Double) のタプルで、ここで i は行インデックス、j は列インデックス、value はエントリの値です。CoordinateMatrix は、行列の両方の次元が非常に大きく、行列が非常にスパースな場合にのみ使用する必要があります。

a CoordinateMatrix は、MatrixEntry エントリの RDD から作成できます。ここで、MatrixEntry(long, long, float) のラッパーです。CoordinateMatrix は、toRowMatrix を呼び出して RowMatrix に変換するか、toIndexedRowMatrix を呼び出してスパースな行を持つ IndexedRowMatrix に変換できます。

APIの詳細については、CoordinateMatrix Pythonドキュメントを参照してください。

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Create an RDD of coordinate entries.
#   - This can be done explicitly with the MatrixEntry class:
entries = sc.parallelize([MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)])
#   - or using (long, long, float) tuples:
entries = sc.parallelize([(0, 0, 1.2), (1, 0, 2.1), (2, 1, 3.7)])

# Create a CoordinateMatrix from an RDD of MatrixEntries.
mat = CoordinateMatrix(entries)

# Get its size.
m = mat.numRows()  # 3
n = mat.numCols()  # 2

# Get the entries as an RDD of MatrixEntries.
entriesRDD = mat.entries

# Convert to a RowMatrix.
rowMat = mat.toRowMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Convert to a BlockMatrix.
blockMat = mat.toBlockMatrix()

a CoordinateMatrix は、RDD[MatrixEntry] インスタンスから作成できます。ここで、MatrixEntry(Long, Long, Double) のラッパーです。CoordinateMatrix は、toIndexedRowMatrix を呼び出してスパースな行を持つ IndexedRowMatrix に変換できます。CoordinateMatrix のその他の計算は、現在サポートされていません。

APIの詳細については、CoordinateMatrix Scalaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)

// Get its size.
val m = mat.numRows()
val n = mat.numCols()

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()

a CoordinateMatrix は、JavaRDD<MatrixEntry> インスタンスから作成できます。ここで、MatrixEntry(long, long, double) のラッパーです。CoordinateMatrix は、toIndexedRowMatrix を呼び出してスパースな行を持つ IndexedRowMatrix に変換できます。CoordinateMatrix のその他の計算は、現在サポートされていません。

APIの詳細については、CoordinateMatrix Javaドキュメントを参照してください。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of matrix entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix mat = new CoordinateMatrix(entries.rdd());

// Get its size.
long m = mat.numRows();
long n = mat.numCols();

// Convert it to an IndexRowMatrix whose rows are sparse vectors.
IndexedRowMatrix indexedRowMatrix = mat.toIndexedRowMatrix();

BlockMatrix

a BlockMatrix は、MatrixBlock のRDDを基盤とする分散行列です。MatrixBlock((Int, Int), Matrix) のタプルで、ここで (Int, Int) はブロックのインデックス、Matrix は指定されたインデックスにある rowsPerBlock x colsPerBlock サイズのサブ行列です。BlockMatrix は、別の BlockMatrix との add および multiply などのメソッドをサポートしています。BlockMatrix には、BlockMatrix が正しく設定されているかを確認するために使用できるヘルパー関数 validate もあります。

a BlockMatrix は、サブ行列ブロックの RDD から作成できます。ここで、サブ行列ブロックは ((blockRowIndex, blockColIndex), sub-matrix) のタプルです。

APIの詳細については、BlockMatrix Pythonドキュメントを参照してください。

from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])),
                         ((1, 0), Matrices.dense(3, 2, [7, 8, 9, 10, 11, 12]))])

# Create a BlockMatrix from an RDD of sub-matrix blocks.
mat = BlockMatrix(blocks, 3, 2)

# Get its size.
m = mat.numRows()  # 6
n = mat.numCols()  # 2

# Get the blocks as an RDD of sub-matrix blocks.
blocksRDD = mat.blocks

# Convert to a LocalMatrix.
localMat = mat.toLocalMatrix()

# Convert to an IndexedRowMatrix.
indexedRowMat = mat.toIndexedRowMatrix()

# Convert to a CoordinateMatrix.
coordinateMat = mat.toCoordinateMatrix()

a BlockMatrix は、toBlockMatrix を呼び出すことにより、IndexedRowMatrix または CoordinateMatrix から最も簡単に作成できます。toBlockMatrix はデフォルトで 1024 x 1024 のサイズでブロックを作成します。ユーザーは、toBlockMatrix(rowsPerBlock, colsPerBlock) を介して値を指定することで、ブロックサイズを変更できます。

APIの詳細については、BlockMatrix Scalaドキュメントを参照してください。

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)

a BlockMatrix は、toBlockMatrix を呼び出すことにより、IndexedRowMatrix または CoordinateMatrix から最も簡単に作成できます。toBlockMatrix はデフォルトで 1024 x 1024 のサイズでブロックを作成します。ユーザーは、toBlockMatrix(rowsPerBlock, colsPerBlock) を介して値を指定することで、ブロックサイズを変更できます。

APIの詳細については、BlockMatrix Javaドキュメントを参照してください。

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;

JavaRDD<MatrixEntry> entries = ... // a JavaRDD of (i, j, v) Matrix Entries
// Create a CoordinateMatrix from a JavaRDD<MatrixEntry>.
CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// Transform the CoordinateMatrix to a BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();

// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate();

// Calculate A^T A.
BlockMatrix ata = matA.transpose().multiply(matA);