分類と回帰

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

このページでは、分類と回帰のためのアルゴリズムについて説明します。また、線形手法、ツリー、アンサンブルなどの特定のアルゴリズムクラスについても説明します。

目次

分類

ロジスティック回帰

ロジスティック回帰は、カテゴリカルな応答を予測するための一般的な手法です。これは、一般化線形モデルの特殊なケースであり、結果の確率を予測します。spark.mlでは、二項ロジスティック回帰を使用して二項応答を予測したり、多項ロジスティック回帰を使用して多クラス応答を予測したりできます。family パラメータを使用してこれら2つのアルゴリズムのいずれかを選択するか、未設定のままにしておくと、Sparkが正しいバリアントを推測します。

多項ロジスティック回帰は、family パラメータを「multinomial」に設定することで、二項分類に使用できます。これにより、2組の係数と2つの切片が生成されます。

切片なしでLogisticRegressionModelを定数非ゼロ列を持つデータセットで適合させる場合、Spark MLlibは定数非ゼロ列に対してゼロ係数を出力します。この動作は、Rのglmnetと同じですが、LIBSVMとは異なります。

二項ロジスティック回帰

二項ロジスティック回帰の実装に関する背景情報と詳細については、spark.mllib のロジスティック回帰のドキュメントを参照してください。

以下の例は、弾性ネット正則化を使用した二項および多項ロジスティック回帰モデルのトレーニング方法を示しています。elasticNetParam は $\alpha$ に対応し、regParam は $\lambda$ に対応します。

パラメータの詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/logistic_regression_with_elastic_net.py」にあります。

パラメータの詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.classification.LogisticRegression

// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for logistic regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// We can also use the multinomial family for binary classification
val mlr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
  .setFamily("multinomial")

val mlrModel = mlr.fit(training)

// Print the coefficients and intercepts for logistic regression with multinomial family
println(s"Multinomial coefficients: ${mlrModel.coefficientMatrix}")
println(s"Multinomial intercepts: ${mlrModel.interceptVector}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionWithElasticNetExample.scala」にあります。

パラメータの詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for logistic regression
System.out.println("Coefficients: "
  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

// We can also use the multinomial family for binary classification
LogisticRegression mlr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8)
        .setFamily("multinomial");

// Fit the model
LogisticRegressionModel mlrModel = mlr.fit(training);

// Print the coefficients and intercepts for logistic regression with multinomial family
System.out.println("Multinomial coefficients: " + lrModel.coefficientMatrix()
  + "\nMultinomial intercepts: " + mlrModel.interceptVector());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionWithElasticNetExample.java」にあります。

パラメータの詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit an binomial logistic regression model with spark.logit
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/logit.R」にあります。

ロジスティック回帰のspark.ml実装は、トレーニングセット上のモデルの概要を抽出することもサポートしています。LogisticRegressionSummaryのDataFrameとして保存されている予測とメトリックは@transientとして注釈付けされているため、ドライバーでのみ利用可能であることに注意してください。

LogisticRegressionTrainingSummary は、LogisticRegressionModel の概要を提供します。二項分類の場合、ROC曲線など、特定の追加メトリックが利用可能です。BinaryLogisticRegressionTrainingSummary を参照してください。

前述の例の続き

from pyspark.ml.classification import LogisticRegression

# Extract the summary from the returned LogisticRegressionModel instance trained
# in the earlier example
trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
trainingSummary.roc.show()
print("areaUnderROC: " + str(trainingSummary.areaUnderROC))

# Set the model threshold to maximize F-Measure
fMeasure = trainingSummary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']) \
    .select('threshold').head()['threshold']
lr.setThreshold(bestThreshold)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/logistic_regression_summary_example.py」にあります。

LogisticRegressionTrainingSummary は、LogisticRegressionModel の概要を提供します。二項分類の場合、ROC曲線など、特定の追加メトリックが利用可能です。二項概要はbinarySummaryメソッドを介してアクセスできます。BinaryLogisticRegressionTrainingSummary を参照してください。

前述の例の続き

import org.apache.spark.ml.classification.LogisticRegression

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
val trainingSummary = lrModel.binarySummary

// Obtain the objective per iteration.
val objectiveHistory = trainingSummary.objectiveHistory
println("objectiveHistory:")
objectiveHistory.foreach(loss => println(loss))

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
val roc = trainingSummary.roc
roc.show()
println(s"areaUnderROC: ${trainingSummary.areaUnderROC}")

// Set the model threshold to maximize F-Measure
val fMeasure = trainingSummary.fMeasureByThreshold
val maxFMeasure = fMeasure.select(max("F-Measure")).head().getDouble(0)
val bestThreshold = fMeasure.where($"F-Measure" === maxFMeasure)
  .select("threshold").head().getDouble(0)
lrModel.setThreshold(bestThreshold)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/LogisticRegressionSummaryExample.scala」にあります。

LogisticRegressionTrainingSummary は、LogisticRegressionModel の概要を提供します。二項分類の場合、ROC曲線など、特定の追加メトリックが利用可能です。二項概要はbinarySummaryメソッドを介してアクセスできます。BinaryLogisticRegressionTrainingSummary を参照してください。

前述の例の続き

import org.apache.spark.ml.classification.BinaryLogisticRegressionTrainingSummary;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;

// Extract the summary from the returned LogisticRegressionModel instance trained in the earlier
// example
BinaryLogisticRegressionTrainingSummary trainingSummary = lrModel.binarySummary();

// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
  System.out.println(lossPerIteration);
}

// Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
Dataset<Row> roc = trainingSummary.roc();
roc.show();
roc.select("FPR").show();
System.out.println(trainingSummary.areaUnderROC());

// Get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with
// this selected threshold.
Dataset<Row> fMeasure = trainingSummary.fMeasureByThreshold();
double maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);
double bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure))
  .select("threshold").head().getDouble(0);
lrModel.setThreshold(bestThreshold);
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaLogisticRegressionSummaryExample.java」にあります。

多項ロジスティック回帰

多クラス分類は、多項ロジスティック (ソフトマックス) 回帰を介してサポートされます。多項ロジスティック回帰では、アルゴリズムは $K$ 組の係数、または $K \times J$ の次元の行列を生成します。ここで、$K$ はクラスの数、$J$ は特徴量の数です。アルゴリズムが切片項で適合される場合、長さ $K$ の切片ベクトルが利用可能になります。

多項係数はcoefficientMatrixとして、切片はinterceptVectorとして利用可能です。

多項ファミリーでトレーニングされたロジスティック回帰モデルのcoefficientsおよびinterceptメソッドはサポートされていません。代わりにcoefficientMatrixおよびinterceptVectorを使用してください。

結果のクラス $k \in {1, 2, …, K}$ の条件付き確率は、ソフトマックス関数を使用してモデル化されます。

\[ P(Y=k|\mathbf{X}, \boldsymbol{\beta}_k, \beta_{0k}) = \frac{e^{\boldsymbol{\beta}_k \cdot \mathbf{X} + \beta_{0k}}}{\sum_{k'=0}^{K-1} e^{\boldsymbol{\beta}_{k'} \cdot \mathbf{X} + \beta_{0k'}}} \]

過学習を制御するために、多項応答モデルと弾性ネットペナルティを使用して、重み付けされた負の対数尤度を最小化します。

\[ \min_{\beta, \beta_0} -\left[\sum_{i=1}^L w_i \cdot \log P(Y = y_i|\mathbf{x}_i)\right] + \lambda \left[\frac{1}{2}\left(1 - \alpha\right)||\boldsymbol{\beta}||_2^2 + \alpha ||\boldsymbol{\beta}||_1\right] \]

詳細な導出については、こちらを参照してください。

以下の例は、弾性ネット正則化を使用した多クラスロジスティック回帰モデルのトレーニング方法、およびモデル評価のための多クラストレーニング概要の抽出方法を示しています。

from pyspark.ml.classification import LogisticRegression

# Load training data
training = spark \
    .read \
    .format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lrModel.coefficientMatrix))
print("Intercept: " + str(lrModel.interceptVector))

trainingSummary = lrModel.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/multiclass_logistic_regression_with_elastic_net.py」にあります。
import org.apache.spark.ml.classification.LogisticRegression

// Load training data
val training = spark
  .read
  .format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for multinomial logistic regression
println(s"Coefficients: \n${lrModel.coefficientMatrix}")
println(s"Intercepts: \n${lrModel.interceptVector}")

val trainingSummary = lrModel.summary

// Obtain the objective per iteration
val objectiveHistory = trainingSummary.objectiveHistory
println("objectiveHistory:")
objectiveHistory.foreach(println)

// for multiclass, we can inspect metrics on a per-label basis
println("False positive rate by label:")
trainingSummary.falsePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
  println(s"label $label: $rate")
}

println("True positive rate by label:")
trainingSummary.truePositiveRateByLabel.zipWithIndex.foreach { case (rate, label) =>
  println(s"label $label: $rate")
}

println("Precision by label:")
trainingSummary.precisionByLabel.zipWithIndex.foreach { case (prec, label) =>
  println(s"label $label: $prec")
}

println("Recall by label:")
trainingSummary.recallByLabel.zipWithIndex.foreach { case (rec, label) =>
  println(s"label $label: $rec")
}


println("F-measure by label:")
trainingSummary.fMeasureByLabel.zipWithIndex.foreach { case (f, label) =>
  println(s"label $label: $f")
}

val accuracy = trainingSummary.accuracy
val falsePositiveRate = trainingSummary.weightedFalsePositiveRate
val truePositiveRate = trainingSummary.weightedTruePositiveRate
val fMeasure = trainingSummary.weightedFMeasure
val precision = trainingSummary.weightedPrecision
val recall = trainingSummary.weightedRecall
println(s"Accuracy: $accuracy\nFPR: $falsePositiveRate\nTPR: $truePositiveRate\n" +
  s"F-measure: $fMeasure\nPrecision: $precision\nRecall: $recall")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/MulticlassLogisticRegressionWithElasticNetExample.scala」にあります。
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.classification.LogisticRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
        .load("data/mllib/sample_multiclass_classification_data.txt");

LogisticRegression lr = new LogisticRegression()
        .setMaxIter(10)
        .setRegParam(0.3)
        .setElasticNetParam(0.8);

// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for multinomial logistic regression
System.out.println("Coefficients: \n"
        + lrModel.coefficientMatrix() + " \nIntercept: " + lrModel.interceptVector());
LogisticRegressionTrainingSummary trainingSummary = lrModel.summary();

// Obtain the loss per iteration.
double[] objectiveHistory = trainingSummary.objectiveHistory();
for (double lossPerIteration : objectiveHistory) {
    System.out.println(lossPerIteration);
}

// for multiclass, we can inspect metrics on a per-label basis
System.out.println("False positive rate by label:");
int i = 0;
double[] fprLabel = trainingSummary.falsePositiveRateByLabel();
for (double fpr : fprLabel) {
    System.out.println("label " + i + ": " + fpr);
    i++;
}

System.out.println("True positive rate by label:");
i = 0;
double[] tprLabel = trainingSummary.truePositiveRateByLabel();
for (double tpr : tprLabel) {
    System.out.println("label " + i + ": " + tpr);
    i++;
}

System.out.println("Precision by label:");
i = 0;
double[] precLabel = trainingSummary.precisionByLabel();
for (double prec : precLabel) {
    System.out.println("label " + i + ": " + prec);
    i++;
}

System.out.println("Recall by label:");
i = 0;
double[] recLabel = trainingSummary.recallByLabel();
for (double rec : recLabel) {
    System.out.println("label " + i + ": " + rec);
    i++;
}

System.out.println("F-measure by label:");
i = 0;
double[] fLabel = trainingSummary.fMeasureByLabel();
for (double f : fLabel) {
    System.out.println("label " + i + ": " + f);
    i++;
}

double accuracy = trainingSummary.accuracy();
double falsePositiveRate = trainingSummary.weightedFalsePositiveRate();
double truePositiveRate = trainingSummary.weightedTruePositiveRate();
double fMeasure = trainingSummary.weightedFMeasure();
double precision = trainingSummary.weightedPrecision();
double recall = trainingSummary.weightedRecall();
System.out.println("Accuracy: " + accuracy);
System.out.println("FPR: " + falsePositiveRate);
System.out.println("TPR: " + truePositiveRate);
System.out.println("F-measure: " + fMeasure);
System.out.println("Precision: " + precision);
System.out.println("Recall: " + recall);
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaMulticlassLogisticRegressionWithElasticNetExample.java」にあります。

パラメータの詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a multinomial logistic regression model with spark.logit
model <- spark.logit(training, label ~ features, maxIter = 10, regParam = 0.3, elasticNetParam = 0.8)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/logit.R」にあります。

決定木分類器

決定木は、分類および回帰手法の一般的なファミリです。spark.ml 実装に関する詳細については、決定木に関するセクションで確認できます。

以下の例では、LibSVM形式のデータセットをロードし、トレーニングセットとテストセットに分割し、最初のデータセットでトレーニングしてから、保持されたテストセットで評価します。データ準備には2つの特徴量トランスフォーマーを使用します。これらは、ラベルとカテゴリ特徴量のカテゴリをインデックス付けするのに役立ち、決定木アルゴリズムが認識できるメタデータをDataFrameに追加します。

パラメータの詳細については、Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

treeModel = model.stages[2]
# summary only
print(treeModel)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/decision_tree_classification_example.py」にあります。

パラメータの詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and tree in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeClassificationExample.scala」にあります。

パラメータの詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.DecisionTreeClassifier;
import org.apache.spark.ml.classification.DecisionTreeClassificationModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load the data stored in LIBSVM format as a DataFrame.
Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);

// Automatically identify categorical features, and index them.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a DecisionTree model.
DecisionTreeClassifier dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[]{labelIndexer, featureIndexer, dt, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

DecisionTreeClassificationModel treeModel =
  (DecisionTreeClassificationModel) (model.stages()[2]);
System.out.println("Learned classification tree model:\n" + treeModel.toDebugString());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeClassificationExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a DecisionTree classification model with spark.decisionTree
model <- spark.decisionTree(training, label ~ features, "classification")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/decisionTree.R」にあります。

ランダムフォレスト分類器

ランダムフォレストは、分類および回帰手法の一般的なファミリです。spark.ml 実装に関する詳細については、ランダムフォレストに関するセクションで確認できます。

以下の例では、LibSVM形式のデータセットをロードし、トレーニングセットとテストセットに分割し、最初のデータセットでトレーニングしてから、保持されたテストセットで評価します。データ準備には2つの特徴量トランスフォーマーを使用します。これらは、ラベルとカテゴリ特徴量のカテゴリをインデックス付けするのに役立ち、ツリーベースのアルゴリズムが認識できるメタデータをDataFrameに追加します。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                               labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/random_forest_classifier_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setNumTrees(10)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${(1.0 - accuracy)}")

val rfModel = model.stages(2).asInstanceOf[RandomForestClassificationModel]
println(s"Learned classification forest model:\n ${rfModel.toDebugString}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/RandomForestClassifierExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.RandomForestClassificationModel;
import org.apache.spark.ml.classification.RandomForestClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a RandomForest model.
RandomForestClassifier rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures");

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, rf, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

RandomForestClassificationModel rfModel = (RandomForestClassificationModel)(model.stages()[2]);
System.out.println("Learned classification forest model:\n" + rfModel.toDebugString());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestClassifierExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a random forest classification model with spark.randomForest
model <- spark.randomForest(training, label ~ features, "classification", numTrees = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/randomForest.R」にあります。

勾配ブースティング木分類器

勾配ブースティング木 (GBT) は、決定木のアンサンブルを使用した一般的な分類および回帰手法です。spark.ml 実装に関する詳細については、GBT に関するセクションで確認できます。

以下の例では、LibSVM形式のデータセットをロードし、トレーニングセットとテストセットに分割し、最初のデータセットでトレーニングしてから、保持されたテストセットで評価します。データ準備には2つの特徴量トランスフォーマーを使用します。これらは、ラベルとカテゴリ特徴量のカテゴリをインデックス付けするのに役立ち、ツリーベースのアルゴリズムが認識できるメタデータをDataFrameに追加します。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)

# Chain indexers and GBT in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

gbtModel = model.stages[2]
print(gbtModel)  # summary only
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/gradient_boosted_tree_classifier_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)
  .setFeatureSubsetStrategy("auto")

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Chain indexers and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))

// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1.0 - accuracy}")

val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel]
println(s"Learned classification GBT model:\n ${gbtModel.toDebugString}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeClassifierExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassificationModel;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark
  .read()
  .format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data);
// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTClassifier gbt = new GBTClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray()[0]);

// Chain indexers and GBT in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {labelIndexer, featureIndexer, gbt, labelConverter});

// Train model. This also runs the indexers.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1.0 - accuracy));

GBTClassificationModel gbtModel = (GBTClassificationModel)(model.stages()[2]);
System.out.println("Learned classification GBT model:\n" + gbtModel.toDebugString());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeClassifierExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a GBT classification model with spark.gbt
model <- spark.gbt(training, label ~ features, "classification", maxIter = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/gbt.R」にあります。

多層パーセプトロン分類器

多層パーセプトロン分類器 (MLPC) は、フィードフォワード人工ニューラルネットワークに基づいた分類器です。MLPCは複数のノード層で構成されます。各層はネットワークの次の層と完全に接続されています。入力層のノードは入力データを表します。他のすべてのノードは、ノードの重み $\wv$ とバイアス $\bv$ との線形結合によって入力を出力にマッピングし、活性化関数を適用します。これは、$K+1$ 層のMLPCの場合、行列形式で次のように表すことができます:\[ \mathrm{y}(\x) = \mathrm{f_K}(...\mathrm{f_2}(\wv_2^T\mathrm{f_1}(\wv_1^T \x+b_1)+b_2)...+b_K) \] 中間層のノードはシグモイド (ロジスティック) 関数を使用します:\[ \mathrm{f}(z_i) = \frac{1}{1 + e^{-z_i}} \] 出力層のノードはソフトマックス関数を使用します:\[ \mathrm{f}(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}} \] 出力層のノード数 $N$ はクラス数に対応します。

MLPCは、モデル学習のためにバックプロパゲーションを採用しています。最適化にはロジスティック損失関数を、最適化ルーチンにはL-BFGSを使用します。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm")\
    .load("data/mllib/sample_multiclass_classification_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]

# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)

# train the model
model = trainer.fit(train)

# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/multilayer_perceptron_classification.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

// Split the data into train and test
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)

// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

// train the model
val model = trainer.fit(train)

// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")

println(s"Test set accuracy = ${evaluator.evaluate(predictionAndLabels)}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/MultilayerPerceptronClassifierExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel;
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;

// Load training data
String path = "data/mllib/sample_multiclass_classification_data.txt";
Dataset<Row> dataFrame = spark.read().format("libsvm").load(path);

// Split the data into train and test
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];

// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
int[] layers = new int[] {4, 5, 4, 3};

// create the trainer and set its parameters
MultilayerPerceptronClassifier trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100);

// train the model
MultilayerPerceptronClassificationModel model = trainer.fit(train);

// compute accuracy on the test set
Dataset<Row> result = model.transform(test);
Dataset<Row> predictionAndLabels = result.select("prediction", "label");
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy");

System.out.println("Test set accuracy = " + evaluator.evaluate(predictionAndLabels));
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaMultilayerPerceptronClassifierExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training <- df
test <- df

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = c(4, 5, 4, 3)

# Fit a multi-layer perceptron neural network model with spark.mlp
model <- spark.mlp(training, label ~ features, maxIter = 100,
                   layers = layers, blockSize = 128, seed = 1234)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/mlp.R」にあります。

線形サポートベクターマシン

サポートベクターマシンは、高次元または無限次元空間に超平面または超平面のセットを構築します。これは、分類、回帰、またはその他のタスクに使用できます。直感的には、適切な分離は、一般にマージンが大きいほど分類器の汎化誤差が低くなるため、いずれかのクラスの最も近いトレーニングデータポイントからの距離 (いわゆる機能マージン) が最大である超平面によって達成されます。Spark MLのLinearSVCは、線形SVMを使用した二項分類をサポートしています。内部的には、OWLQNオプティマイザを使用してヒンジ損失を最適化します。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.classification import LinearSVC

# Load training data
training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

lsvc = LinearSVC(maxIter=10, regParam=0.1)

# Fit the model
lsvcModel = lsvc.fit(training)

# Print the coefficients and intercept for linear SVC
print("Coefficients: " + str(lsvcModel.coefficients))
print("Intercept: " + str(lsvcModel.intercept))
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/linearsvc.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.classification.LinearSVC

// Load training data
val training = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)

// Fit the model
val lsvcModel = lsvc.fit(training)

// Print the coefficients and intercept for linear svc
println(s"Coefficients: ${lsvcModel.coefficients} Intercept: ${lsvcModel.intercept}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/LinearSVCExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.classification.LinearSVC;
import org.apache.spark.ml.classification.LinearSVCModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

LinearSVC lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1);

// Fit the model
LinearSVCModel lsvcModel = lsvc.fit(training);

// Print the coefficients and intercept for LinearSVC
System.out.println("Coefficients: "
  + lsvcModel.coefficients() + " Intercept: " + lsvcModel.intercept());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaLinearSVCExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# load training data
t <- as.data.frame(Titanic)
training <- createDataFrame(t)

# fit Linear SVM model
model <- spark.svmLinear(training,  Survived ~ ., regParam = 0.01, maxIter = 10)

# Model summary
summary(model)

# Prediction
prediction <- predict(model, training)
showDF(prediction)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/svmLinear.R」にあります。

One-vs-Rest 分類器 (別名 One-vs-All)

OneVsRest は、効率的に二項分類を実行できるベース分類器が与えられた場合に、多クラス分類を実行するための機械学習リダクションの例です。「One-vs-All」としても知られています。

OneVsRestEstimator として実装されています。ベース分類器として Classifier のインスタンスを受け取り、$k$ 個のクラスそれぞれについて二項分類問題を作成します。クラス $i$ の分類器は、ラベルが $i$ であるかどうかを予測するようにトレーニングされ、クラス $i$ を他のすべてのクラスから区別します。

予測は、各二項分類器を評価し、最も信頼性の高い分類器のインデックスをラベルとして出力することによって行われます。

以下の例は、Iris データセットをロードし、DataFrameとして解析し、OneVsRest を使用して多クラス分類を実行する方法を示しています。テストエラーは、アルゴリズムの精度を測定するために計算されます。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# load data file.
inputData = spark.read.format("libsvm") \
    .load("data/mllib/sample_multiclass_classification_data.txt")

# generate the train/test split.
(train, test) = inputData.randomSplit([0.8, 0.2])

# instantiate the base classifier.
lr = LogisticRegression(maxIter=10, tol=1E-6, fitIntercept=True)

# instantiate the One Vs Rest Classifier.
ovr = OneVsRest(classifier=lr)

# train the multiclass model.
ovrModel = ovr.fit(train)

# score the model on test data.
predictions = ovrModel.transform(test)

# obtain evaluator.
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")

# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/one_vs_rest_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.classification.{LogisticRegression, OneVsRest}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// load data file.
val inputData = spark.read.format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt")

// generate the train/test split.
val Array(train, test) = inputData.randomSplit(Array(0.8, 0.2))

// instantiate the base classifier
val classifier = new LogisticRegression()
  .setMaxIter(10)
  .setTol(1E-6)
  .setFitIntercept(true)

// instantiate the One Vs Rest Classifier.
val ovr = new OneVsRest().setClassifier(classifier)

// train the multiclass model.
val ovrModel = ovr.fit(train)

// score the model on test data.
val predictions = ovrModel.transform(test)

// obtain evaluator.
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("accuracy")

// compute the classification error on test data.
val accuracy = evaluator.evaluate(predictions)
println(s"Test Error = ${1 - accuracy}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/OneVsRestExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.OneVsRest;
import org.apache.spark.ml.classification.OneVsRestModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// load data file.
Dataset<Row> inputData = spark.read().format("libsvm")
  .load("data/mllib/sample_multiclass_classification_data.txt");

// generate the train/test split.
Dataset<Row>[] tmp = inputData.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> train = tmp[0];
Dataset<Row> test = tmp[1];

// configure the base classifier.
LogisticRegression classifier = new LogisticRegression()
  .setMaxIter(10)
  .setTol(1E-6)
  .setFitIntercept(true);

// instantiate the One Vs Rest Classifier.
OneVsRest ovr = new OneVsRest().setClassifier(classifier);

// train the multiclass model.
OneVsRestModel ovrModel = ovr.fit(train);

// score the model on test data.
Dataset<Row> predictions = ovrModel.transform(test)
  .select("prediction", "label");

// obtain evaluator.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
        .setMetricName("accuracy");

// compute the classification error on test data.
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Error = " + (1 - accuracy));
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaOneVsRestExample.java」にあります。

ナイーブベイズ

ナイーブベイズ分類器は、すべての特徴量のペア間に強い (ナイーブな) 独立性の仮定に基づいて、単純な確率的、多クラス分類器のファミリです。

ナイーブベイズは非常に効率的にトレーニングできます。トレーニングデータセットを1回スキャンするだけで、各ラベルが与えられた各特徴量の条件付き確率分布を計算します。予測には、ベイズの定理を適用して、観測値が与えられた各ラベルの条件付き確率分布を計算します。

MLlibは、多項ナイーブベイズ補完ナイーブベイズベルヌーイナイーブベイズ、およびガウスナイーブベイズをサポートしています。

入力データ: これらの多項、補完、ベルヌーイモデルは、通常、ドキュメント分類に使用されます。その文脈では、各観測値はドキュメントであり、各特徴量は用語を表します。特徴量の値は、用語の頻度 (多項または補完ナイーブベイズ) または用語がドキュメントに含まれているかどうかを示す 0 または 1 (ベルヌーイナイーブベイズ) です。多項モデルとベルヌーイモデルの特徴量値は非負である必要があります。モデルタイプは、オプションパラメータ「multinomial」、「complement」、「bernoulli」、「gaussian」で選択され、デフォルトは「multinomial」です。ドキュメント分類の場合、入力特徴量ベクトルは通常スパースベクトルである必要があります。トレーニングデータは一度しか使用されないため、キャッシュする必要はありません。

$\lambda$ (デフォルト 1.0) パラメータを設定することで、加算スムージングを使用できます。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load training data
data = spark.read.format("libsvm") \
    .load("data/mllib/sample_libsvm_data.txt")

# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]

# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

# train the model
model = nb.fit(train)

# select example rows to display.
predictions = model.transform(test)
predictions.show()

# compute accuracy on the test set
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction",
                                              metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/naive_bayes_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Split the data into training and test sets (30% held out for testing)
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 1234L)

// Train a NaiveBayes model.
val model = new NaiveBayes()
  .fit(trainingData)

// Select example rows to display.
val predictions = model.transform(testData)
predictions.show()

// Select (prediction, true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/NaiveBayesExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.classification.NaiveBayes;
import org.apache.spark.ml.classification.NaiveBayesModel;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data
Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
// Split the data into train and test
Dataset<Row>[] splits = dataFrame.randomSplit(new double[]{0.6, 0.4}, 1234L);
Dataset<Row> train = splits[0];
Dataset<Row> test = splits[1];

// create the trainer and set its parameters
NaiveBayes nb = new NaiveBayes();

// train the model
NaiveBayesModel model = nb.fit(train);

// Select example rows to display.
Dataset<Row> predictions = model.transform(test);
predictions.show();

// compute accuracy on the test set
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test set accuracy = " + accuracy);
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaNaiveBayesExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Fit a Bernoulli naive Bayes model with spark.naiveBayes
titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
nbDF <- titanicDF
nbTestDF <- titanicDF
nbModel <- spark.naiveBayes(nbDF, Survived ~ Class + Sex + Age)

# Model summary
summary(nbModel)

# Prediction
nbPredictions <- predict(nbModel, nbTestDF)
head(nbPredictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/naiveBayes.R」にあります。

Factorization Machines 分類器

Factorization Machines の実装に関する背景情報と詳細については、Factorization Machines セクションを参照してください。

以下の例では、LibSVM形式のデータセットをロードし、トレーニングセットとテストセットに分割し、最初のデータセットでトレーニングしてから、保持されたテストセットで評価します。爆発的な勾配の問題を防ぐために、特徴量を0から1の間にスケーリングします。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.classification import FMClassifier
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a FM model.
fm = FMClassifier(labelCol="indexedLabel", featuresCol="scaledFeatures", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[labelIndexer, featureScaler, fm])

# Train model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test accuracy
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = %g" % accuracy)

fmModel = model.stages[2]
print("Factors: " + str(fmModel.factors))  # type: ignore
print("Linear: " + str(fmModel.linear))  # type: ignore
print("Intercept: " + str(fmModel.intercept))  # type: ignore
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/fm_classifier_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.{FMClassificationModel, FMClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, MinMaxScaler, StringIndexer}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Scale features.
val featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a FM model.
val fm = new FMClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labelsArray(0))

// Create a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureScaler, fm, labelConverter))

// Train model.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)

// Select (prediction, true label) and compute test accuracy.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy = $accuracy")

val fmModel = model.stages(2).asInstanceOf[FMClassificationModel]
println(s"Factors: ${fmModel.factors} Linear: ${fmModel.linear} " +
  s"Intercept: ${fmModel.intercept}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/FMClassifierExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.FMClassificationModel;
import org.apache.spark.ml.classification.FMClassifier;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.*;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark
    .read()
    .format("libsvm")
    .load("data/mllib/sample_libsvm_data.txt");

// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
StringIndexerModel labelIndexer = new StringIndexer()
    .setInputCol("label")
    .setOutputCol("indexedLabel")
    .fit(data);
// Scale features.
MinMaxScalerModel featureScaler = new MinMaxScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a FM model.
FMClassifier fm = new FMClassifier()
    .setLabelCol("indexedLabel")
    .setFeaturesCol("scaledFeatures")
    .setStepSize(0.001);

// Convert indexed labels back to original labels.
IndexToString labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labelsArray()[0]);

// Create a Pipeline.
Pipeline pipeline = new Pipeline()
    .setStages(new PipelineStage[] {labelIndexer, featureScaler, fm, labelConverter});

// Train model.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5);

// Select (prediction, true label) and compute test accuracy.
MulticlassClassificationEvaluator evaluator = new MulticlassClassificationEvaluator()
    .setLabelCol("indexedLabel")
    .setPredictionCol("prediction")
    .setMetricName("accuracy");
double accuracy = evaluator.evaluate(predictions);
System.out.println("Test Accuracy = " + accuracy);

FMClassificationModel fmModel = (FMClassificationModel)(model.stages()[2]);
System.out.println("Factors: " + fmModel.factors());
System.out.println("Linear: " + fmModel.linear());
System.out.println("Intercept: " + fmModel.intercept());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaFMClassifierExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

注意: 現在、SparkRは特徴量スケーリングをサポートしていません。

# Load training data
df <- read.df("data/mllib/sample_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a FM classification model
model <- spark.fmClassifier(training, label ~ features)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/fmClassifier.R」にあります。

回帰

線形回帰

線形回帰モデルとモデル概要を扱うインターフェースは、ロジスティック回帰の場合と同様です。

「l-bfgs」ソルバーで切片なしのLinearRegressionModelを定数非ゼロ列を持つデータセットで適合させる場合、Spark MLlibは定数非ゼロ列に対してゼロ係数を出力します。この動作は、Rのglmnetと同じですが、LIBSVMとは異なります。

以下の例は、弾性ネット正則化線形回帰モデルのトレーニングと、モデル概要統計の抽出方法を示しています。

パラメータの詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.regression import LinearRegression

# Load training data
training = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")

lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for linear regression
print("Coefficients: %s" % str(lrModel.coefficients))
print("Intercept: %s" % str(lrModel.intercept))

# Summarize the model over the training set and print out some metrics
trainingSummary = lrModel.summary
print("numIterations: %d" % trainingSummary.totalIterations)
print("objectiveHistory: %s" % str(trainingSummary.objectiveHistory))
trainingSummary.residuals.show()
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/linear_regression_with_elastic_net.py」にあります。

パラメータの詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.regression.LinearRegression

// Load training data
val training = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")

val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the coefficients and intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

// Summarize the model over the training set and print out some metrics
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/LinearRegressionWithElasticNetExample.scala」にあります。

パラメータの詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.regression.LinearRegression;
import org.apache.spark.ml.regression.LinearRegressionModel;
import org.apache.spark.ml.regression.LinearRegressionTrainingSummary;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load training data.
Dataset<Row> training = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

LinearRegression lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model.
LinearRegressionModel lrModel = lr.fit(training);

// Print the coefficients and intercept for linear regression.
System.out.println("Coefficients: "
  + lrModel.coefficients() + " Intercept: " + lrModel.intercept());

// Summarize the model over the training set and print out some metrics.
LinearRegressionTrainingSummary trainingSummary = lrModel.summary();
System.out.println("numIterations: " + trainingSummary.totalIterations());
System.out.println("objectiveHistory: " + Vectors.dense(trainingSummary.objectiveHistory()));
trainingSummary.residuals().show();
System.out.println("RMSE: " + trainingSummary.rootMeanSquaredError());
System.out.println("r2: " + trainingSummary.r2());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaLinearRegressionWithElasticNetExample.java」にあります。

パラメータの詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a linear regression model
model <- spark.lm(training, label ~ features, regParam = 0.3, elasticNetParam = 0.8)

# Prediction
predictions <- predict(model, test)
head(predictions)

# Summarize
summary(model)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/lm_with_elastic_net.R」にあります。

一般化線形回帰

線形回帰では出力が正規分布に従うと仮定するのに対し、一般化線形モデル (GLM) は、応答変数 $Y_i$ が指数型分布族のいずれかの分布に従う線形モデルの仕様です。SparkのGeneralizedLinearRegressionインターフェースにより、線形回帰、ポアソン回帰、ロジスティック回帰など、さまざまな種類の予測問題に使用できるGLMを柔軟に指定できます。現在、spark.mlでは、指数型分布族の分布のサブセットのみがサポートされており、それらは以下にリストされています。

注意: Sparkは現在、GeneralizedLinearRegressionインターフェースを通じて最大4096の特徴量しかサポートしておらず、この制約を超えると例外がスローされます。詳細については、高度なセクションを参照してください。それでも、線形回帰およびロジスティック回帰については、特徴量数が増加したモデルはLinearRegressionおよびLogisticRegression推定器を使用してトレーニングできます。

GLMは、その「正規」または「自然」形式、別名自然指数型分布で記述できる指数型分布族を必要とします。自然指数型分布の形式は次のように与えられます。

\[f_Y(y|\theta, \tau) = h(y, \tau)\exp{\left( \frac{\theta \cdot y - A(\theta)}{d(\tau)} \right)}\]

ここで、$\theta$ は関心のあるパラメータ、$ \tau $ は分散パラメータです。GLMでは、応答変数 $Y_i$ は自然指数型分布から抽出されると仮定されます。

\[Y_i \sim f\left(\cdot|\theta_i, \tau \right)\]

ここで、関心のあるパラメータ $\theta_i$ は、応答変数の期待値 $\mu_i$ と次の関係があります。

\[\mu_i = A'(\theta_i)\]

ここでは、$A’(\theta_i)$ は選択された分布の形式によって定義されます。GLMはリンク関数も指定できます。これは、応答変数の期待値 $\mu_i$ といわゆる線形予測子 $\eta_i$ の関係を定義します。

\[g(\mu_i) = \eta_i = \vec{x_i}^T \cdot \vec{\beta}\]

多くの場合、リンク関数は $A’ = g^{-1}$ となるように選択されます。これにより、関心のあるパラメータ $\theta$ と線形予測子 $\eta$ の間の関係が単純化されます。この場合、リンク関数 $g(\mu)$ は「正規」リンク関数と呼ばれます。

\[\theta_i = A'^{-1}(\mu_i) = g(g^{-1}(\eta_i)) = \eta_i\]

GLMは、尤度関数を最大化する回帰係数 $\vec{\beta}$ を見つけます。

\[\max_{\vec{\beta}} \mathcal{L}(\vec{\theta}|\vec{y},X) = \prod_{i=1}^{N} h(y_i, \tau) \exp{\left(\frac{y_i\theta_i - A(\theta_i)}{d(\tau)}\right)}\]

ここで、関心のあるパラメータ $\theta_i$ は、回帰係数 $\vec{\beta}$ と次の関係があります。

\[\theta_i = A'^{-1}(g^{-1}(\vec{x_i} \cdot \vec{\beta}))\]

Sparkの一般化線形回帰インターフェースは、残差、p値、逸脱度、赤池情報量基準など、GLMモデルの適合を診断するための要約統計量も提供します。

こちらで、GLMとその応用に関するより包括的なレビューを参照してください。

利用可能なファミリー

ファミリー 応答タイプ サポートされているリンク
Gaussian (正規) 連続 Identity*、Log、Inverse
Binomial (二項) 二値 Logit*、Probit、CLogLog
Poisson (ポアソン) カウント Log*、Identity、Sqrt
Gamma 連続 Inverse*、Identity、Log
Tweedie (ツイーディー) ゼロインフレート連続 Power リンク関数
* 正規リンク

以下の例は、ガウス応答とアイデンティティリンク関数を持つGLMのトレーニングと、モデル概要統計の抽出方法を示しています。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.regression import GeneralizedLinearRegression

# Load training data
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_linear_regression_data.txt")

glr = GeneralizedLinearRegression(family="gaussian", link="identity", maxIter=10, regParam=0.3)

# Fit the model
model = glr.fit(dataset)

# Print the coefficients and intercept for generalized linear regression model
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))

# Summarize the model over the training set and print out some metrics
summary = model.summary
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))
print("Dispersion: " + str(summary.dispersion))
print("Null Deviance: " + str(summary.nullDeviance))
print("Residual Degree Of Freedom Null: " + str(summary.residualDegreeOfFreedomNull))
print("Deviance: " + str(summary.deviance))
print("Residual Degree Of Freedom: " + str(summary.residualDegreeOfFreedom))
print("AIC: " + str(summary.aic))
print("Deviance Residuals: ")
summary.residuals().show()
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/generalized_linear_regression_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.regression.GeneralizedLinearRegression

// Load training data
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")

val glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")
  .setMaxIter(10)
  .setRegParam(0.3)

// Fit the model
val model = glr.fit(dataset)

// Print the coefficients and intercept for generalized linear regression model
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")

// Summarize the model over the training set and print out some metrics
val summary = model.summary
println(s"Coefficient Standard Errors: ${summary.coefficientStandardErrors.mkString(",")}")
println(s"T Values: ${summary.tValues.mkString(",")}")
println(s"P Values: ${summary.pValues.mkString(",")}")
println(s"Dispersion: ${summary.dispersion}")
println(s"Null Deviance: ${summary.nullDeviance}")
println(s"Residual Degree Of Freedom Null: ${summary.residualDegreeOfFreedomNull}")
println(s"Deviance: ${summary.deviance}")
println(s"Residual Degree Of Freedom: ${summary.residualDegreeOfFreedom}")
println(s"AIC: ${summary.aic}")
println("Deviance Residuals: ")
summary.residuals().show()
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/GeneralizedLinearRegressionExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.ml.regression.GeneralizedLinearRegression;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionModel;
import org.apache.spark.ml.regression.GeneralizedLinearRegressionTrainingSummary;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Load training data
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt");

GeneralizedLinearRegression glr = new GeneralizedLinearRegression()
  .setFamily("gaussian")
  .setLink("identity")
  .setMaxIter(10)
  .setRegParam(0.3);

// Fit the model
GeneralizedLinearRegressionModel model = glr.fit(dataset);

// Print the coefficients and intercept for generalized linear regression model
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());

// Summarize the model over the training set and print out some metrics
GeneralizedLinearRegressionTrainingSummary summary = model.summary();
System.out.println("Coefficient Standard Errors: "
  + Arrays.toString(summary.coefficientStandardErrors()));
System.out.println("T Values: " + Arrays.toString(summary.tValues()));
System.out.println("P Values: " + Arrays.toString(summary.pValues()));
System.out.println("Dispersion: " + summary.dispersion());
System.out.println("Null Deviance: " + summary.nullDeviance());
System.out.println("Residual Degree Of Freedom Null: " + summary.residualDegreeOfFreedomNull());
System.out.println("Deviance: " + summary.deviance());
System.out.println("Residual Degree Of Freedom: " + summary.residualDegreeOfFreedom());
System.out.println("AIC: " + summary.aic());
System.out.println("Deviance Residuals: ");
summary.residuals().show();
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaGeneralizedLinearRegressionExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Model summary
summary(gaussianGLM)

# Prediction
gaussianPredictions <- predict(gaussianGLM, gaussianTestDF)
head(gaussianPredictions)

# Fit a generalized linear model with glm (R-compliant)
gaussianGLM2 <- glm(label ~ features, gaussianDF, family = "gaussian")
summary(gaussianGLM2)

# Fit a generalized linear model of family "binomial" with spark.glm
training2 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
training2 <- transform(training2, label = cast(training2$label > 1, "integer"))
df_list2 <- randomSplit(training2, c(7, 3), 2)
binomialDF <- df_list2[[1]]
binomialTestDF <- df_list2[[2]]
binomialGLM <- spark.glm(binomialDF, label ~ features, family = "binomial")

# Model summary
summary(binomialGLM)

# Prediction
binomialPredictions <- predict(binomialGLM, binomialTestDF)
head(binomialPredictions)

# Fit a generalized linear model of family "tweedie" with spark.glm
training3 <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
tweedieDF <- transform(training3, label = training3$label * exp(randn(10)))
tweedieGLM <- spark.glm(tweedieDF, label ~ features, family = "tweedie",
                        var.power = 1.2, link.power = 0)

# Model summary
summary(tweedieGLM)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/glm.R」にあります。

決定木回帰

決定木は、分類および回帰手法の一般的なファミリです。spark.ml 実装に関する詳細については、決定木に関するセクションで確認できます。

以下の例では、LibSVM形式のデータセットをロードし、トレーニングセットとテストセットに分割し、最初のデータセットでトレーニングしてから、保持されたテストセットで評価します。カテゴリ特徴量をインデックス付けする特徴量トランスフォーマーを使用し、決定木アルゴリズムが認識できるメタデータをDataFrameに追加します。

パラメータの詳細については、Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
dt = DecisionTreeRegressor(featuresCol="indexedFeatures")

# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, dt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

treeModel = model.stages[1]
# summary only
print(treeModel)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/decision_tree_regression_example.py」にあります。

パラメータの詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor

// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Here, we treat features with > 4 distinct values as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a DecisionTree model.
val dt = new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and tree in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, dt))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val treeModel = model.stages(1).asInstanceOf[DecisionTreeRegressionModel]
println(s"Learned regression tree model:\n ${treeModel.toDebugString}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/DecisionTreeRegressionExample.scala」にあります。

パラメータの詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.DecisionTreeRegressionModel;
import org.apache.spark.ml.regression.DecisionTreeRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load the data stored in LIBSVM format as a DataFrame.
Dataset<Row> data = spark.read().format("libsvm")
  .load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[]{0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a DecisionTree model.
DecisionTreeRegressor dt = new DecisionTreeRegressor()
  .setFeaturesCol("indexedFeatures");

// Chain indexer and tree in a Pipeline.
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[]{featureIndexer, dt});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

DecisionTreeRegressionModel treeModel =
  (DecisionTreeRegressionModel) (model.stages()[1]);
System.out.println("Learned regression tree model:\n" + treeModel.toDebugString());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaDecisionTreeRegressionExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a DecisionTree regression model with spark.decisionTree
model <- spark.decisionTree(training, label ~ features, "regression")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/decisionTree.R」にあります。

ランダムフォレスト回帰

ランダムフォレストは、分類および回帰手法の一般的なファミリです。spark.ml 実装に関する詳細については、ランダムフォレストに関するセクションで確認できます。

以下の例では、LibSVM形式のデータセットをロードし、トレーニングセットとテストセットに分割し、最初のデータセットでトレーニングしてから、保持されたテストセットで評価します。カテゴリ特徴量をインデックス付けする特徴量トランスフォーマーを使用し、ツリーベースのアルゴリズムが認識できるメタデータをDataFrameに追加します。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures")

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/random_forest_regressor_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a RandomForest model.
val rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")

// Chain indexer and forest in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, rf))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val rfModel = model.stages(1).asInstanceOf[RandomForestRegressionModel]
println(s"Learned regression forest model:\n ${rfModel.toDebugString}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/RandomForestRegressorExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.RandomForestRegressionModel;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing)
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a RandomForest model.
RandomForestRegressor rf = new RandomForestRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures");

// Chain indexer and forest in a Pipeline
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {featureIndexer, rf});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

RandomForestRegressionModel rfModel = (RandomForestRegressionModel)(model.stages()[1]);
System.out.println("Learned regression forest model:\n" + rfModel.toDebugString());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a random forest regression model with spark.randomForest
model <- spark.randomForest(training, label ~ features, "regression", numTrees = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/randomForest.R」にあります。

勾配ブースティング木回帰

勾配ブースティング木 (GBT) は、決定木のアンサンブルを使用した一般的な回帰手法です。spark.ml 実装に関する詳細については、GBT に関するセクションで確認できます。

注意: このサンプルデータセットの場合、GBTRegressor は実際には1回のイテレーションしか必要としませんが、一般的にはそうではありません。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model.
gbt = GBTRegressor(featuresCol="indexedFeatures", maxIter=10)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model.  This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

gbtModel = model.stages[1]
print(gbtModel)  # summary only
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/gradient_boosted_tree_regressor_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorIndexer
import org.apache.spark.ml.regression.{GBTRegressionModel, GBTRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a GBT model.
val gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10)

// Chain indexer and GBT in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureIndexer, gbt))

// Train model. This also runs the indexer.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val gbtModel = model.stages(1).asInstanceOf[GBTRegressionModel]
println(s"Learned regression GBT model:\n ${gbtModel.toDebugString}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/GradientBoostedTreeRegressorExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.ml.regression.GBTRegressionModel;
import org.apache.spark.ml.regression.GBTRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Automatically identify categorical features, and index them.
// Set maxCategories so features with > 4 distinct values are treated as continuous.
VectorIndexerModel featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4)
  .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a GBT model.
GBTRegressor gbt = new GBTRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxIter(10);

// Chain indexer and GBT in a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureIndexer, gbt});

// Train model. This also runs the indexer.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

GBTRegressionModel gbtModel = (GBTRegressionModel)(model.stages()[1]);
System.out.println("Learned regression GBT model:\n" + gbtModel.toDebugString());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaGradientBoostedTreeRegressorExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training <- df
test <- df

# Fit a GBT regression model with spark.gbt
model <- spark.gbt(training, label ~ features, "regression", maxIter = 10)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/gbt.R」にあります。

生存時間回帰

spark.mlでは、加速故障時間 (AFT) モデルを実装しています。これは、打ち切りデータのためのパラメータ生存回帰モデルです。これは生存時間分析のための対数線形モデルと呼ばれることもあります。同様の目的で設計された比例ハザードモデルとは異なり、AFTモデルは、各インスタンスが目的関数に独立して寄与するため、並列化が容易です。

共変量 $x^{‘}$ の値が与えられた場合、被験者 $i = 1, …, n$ のランダム生存時間 $t_{i}$ (右側打ち切りが発生する可能性あり) の下でのAFTモデルの尤度関数は次のように与えられます。\[ L(\beta,\sigma)=\prod_{i=1}^n[\frac{1}{\sigma}f_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})]^{\delta_{i}}S_{0}(\frac{\log{t_{i}}-x^{'}\beta}{\sigma})^{1-\delta_{i}} \] ここで、$\delta_{i}$ はイベントが発生した (つまり、打ち切られていないか) かどうかを示す指標です。 $\epsilon_{i}=\frac{\log{t_{i}}-x^{‘}\beta}{\sigma}$ を使用すると、対数尤度関数は次の形式になります。\[ \iota(\beta,\sigma)=\sum_{i=1}^{n}[-\delta_{i}\log\sigma+\delta_{i}\log{f_{0}}(\epsilon_{i})+(1-\delta_{i})\log{S_{0}(\epsilon_{i})}] \] ここで、$S_{0}(\epsilon_{i})$ はベースライン生存関数、$f_{0}(\epsilon_{i})$ は対応する密度関数です。

最も一般的に使用されるAFTモデルは、ワイブル分布の生存時間に基づいています。生存時間のワイブル分布は、生存時間の対数に対する極値分布に対応し、$S_{0}(\epsilon)$ 関数は次のようになります。\[ S_{0}(\epsilon_{i})=\exp(-e^{\epsilon_{i}}) \] $f_{0}(\epsilon_{i})$ 関数は次のようになります。\[ f_{0}(\epsilon_{i})=e^{\epsilon_{i}}\exp(-e^{\epsilon_{i}}) \] ワイブル分布の生存時間を持つAFTモデルの対数尤度関数は次のようになります。\[ \iota(\beta,\sigma)= -\sum_{i=1}^n[\delta_{i}\log\sigma-\delta_{i}\epsilon_{i}+e^{\epsilon_{i}}] \] 負の対数尤度を最小化することは、最大事後確率に相当するため、最適化に使用する損失関数は $-\iota(\beta,\sigma)$ です。 $\beta$ および $\log\sigma$ の勾配関数はそれぞれ次のようになります。\[ \frac{\partial (-\iota)}{\partial \beta}=\sum_{1=1}^{n}[\delta_{i}-e^{\epsilon_{i}}]\frac{x_{i}}{\sigma} \] \[ \frac{\partial (-\iota)}{\partial (\log\sigma)}=\sum_{i=1}^{n}[\delta_{i}+(\delta_{i}-e^{\epsilon_{i}})\epsilon_{i}] \]

AFTモデルは、凸最適化問題として定式化できます。つまり、係数ベクトル $\beta$ とスケールパラメータ $\log\sigma$ に依存する凸関数 $-\iota(\beta,\sigma)$ の最小化を見つけるタスクです。実装の基盤となる最適化アルゴリズムはL-BFGSです。この実装は、Rの生存関数survregの結果と一致します。

切片なしのAFTSurvivalRegressionModelを定数非ゼロ列を持つデータセットで適合させる場合、Spark MLlibは定数非ゼロ列に対してゼロ係数を出力します。この動作は、Rのsurvival::survregとは異なります。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml.regression import AFTSurvivalRegression
from pyspark.ml.linalg import Vectors

training = spark.createDataFrame([
    (1.218, 1.0, Vectors.dense(1.560, -0.605)),
    (2.949, 0.0, Vectors.dense(0.346, 2.158)),
    (3.627, 0.0, Vectors.dense(1.380, 0.231)),
    (0.273, 1.0, Vectors.dense(0.520, 1.151)),
    (4.199, 0.0, Vectors.dense(0.795, -0.226))], ["label", "censor", "features"])
quantileProbabilities = [0.3, 0.6]
aft = AFTSurvivalRegression(quantileProbabilities=quantileProbabilities,
                            quantilesCol="quantiles")

model = aft.fit(training)

# Print the coefficients, intercept and scale parameter for AFT survival regression
print("Coefficients: " + str(model.coefficients))
print("Intercept: " + str(model.intercept))
print("Scale: " + str(model.scale))
model.transform(training).show(truncate=False)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/aft_survival_regression.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.regression.AFTSurvivalRegression

val training = spark.createDataFrame(Seq(
  (1.218, 1.0, Vectors.dense(1.560, -0.605)),
  (2.949, 0.0, Vectors.dense(0.346, 2.158)),
  (3.627, 0.0, Vectors.dense(1.380, 0.231)),
  (0.273, 1.0, Vectors.dense(0.520, 1.151)),
  (4.199, 0.0, Vectors.dense(0.795, -0.226))
)).toDF("label", "censor", "features")
val quantileProbabilities = Array(0.3, 0.6)
val aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles")

val model = aft.fit(training)

// Print the coefficients, intercept and scale parameter for AFT survival regression
println(s"Coefficients: ${model.coefficients}")
println(s"Intercept: ${model.intercept}")
println(s"Scale: ${model.scale}")
model.transform(training).show(false)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/AFTSurvivalRegressionExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.regression.AFTSurvivalRegression;
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(1.218, 1.0, Vectors.dense(1.560, -0.605)),
  RowFactory.create(2.949, 0.0, Vectors.dense(0.346, 2.158)),
  RowFactory.create(3.627, 0.0, Vectors.dense(1.380, 0.231)),
  RowFactory.create(0.273, 1.0, Vectors.dense(0.520, 1.151)),
  RowFactory.create(4.199, 0.0, Vectors.dense(0.795, -0.226))
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("censor", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(data, schema);
double[] quantileProbabilities = new double[]{0.3, 0.6};
AFTSurvivalRegression aft = new AFTSurvivalRegression()
  .setQuantileProbabilities(quantileProbabilities)
  .setQuantilesCol("quantiles");

AFTSurvivalRegressionModel model = aft.fit(training);

// Print the coefficients, intercept and scale parameter for AFT survival regression
System.out.println("Coefficients: " + model.coefficients());
System.out.println("Intercept: " + model.intercept());
System.out.println("Scale: " + model.scale());
model.transform(training).show(false);
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaAFTSurvivalRegressionExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

# Use the ovarian dataset available in R survival package
library(survival)

# Fit an accelerated failure time (AFT) survival regression model with spark.survreg
ovarianDF <- suppressWarnings(createDataFrame(ovarian))
aftDF <- ovarianDF
aftTestDF <- ovarianDF
aftModel <- spark.survreg(aftDF, Surv(futime, fustat) ~ ecog_ps + rx)

# Model summary
summary(aftModel)

# Prediction
aftPredictions <- predict(aftModel, aftTestDF)
head(aftPredictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/survreg.R」にあります。

単調回帰

等張性回帰は、回帰アルゴリズムのファミリに属します。形式的には、等張性回帰は、観測された応答を表す有限個の実数 $Y = {y_1, y_2, ..., y_n}$ と、適合される未知の応答値 $X = {x_1, x_2, ..., x_n}$ が与えられた場合に、関数を見つける問題です。

\begin{equation} f(x) = \sum_{i=1}^n w_i (y_i - x_i)^2 \end{equation}

完全順序 $x_1\le x_2\le ...\le x_n$ で、$w_i$ は正の重みです。結果の関数は等張性回帰と呼ばれ、一意です。これは順序制約の下での最小二乗問題と見なすことができます。本質的に、等張性回帰は元のデータポイントに最もよく適合する単調関数です。

隣接違反者プールアルゴリズムを実装しており、これは等張性回帰の並列化のアプローチを使用しています。トレーニング入力は、ラベル、特徴量、および重みの3つの列を含むDataFrameです。さらに、IsotonicRegressionアルゴリズムには、$isotonic$ という名前のオプションパラメータが1つあり、デフォルトはtrueです。この引数は、等張性回帰が等張性 (単調増加) であるか、または逆等張性 (単調減少) であるかを指定します。

トレーニングにより、既知および未知の特徴量の両方に対してラベルを予測するために使用できるIsotonicRegressionModelが返されます。等張性回帰の結果は、区分線形関数として扱われます。したがって、予測のルールは次のようになります。

APIに関する詳細については、IsotonicRegression Python ドキュメントを参照してください。

from pyspark.ml.regression import IsotonicRegression

# Loads data.
dataset = spark.read.format("libsvm")\
    .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")

# Trains an isotonic regression model.
model = IsotonicRegression().fit(dataset)
print("Boundaries in increasing order: %s\n" % str(model.boundaries))
print("Predictions associated with the boundaries: %s\n" % str(model.predictions))

# Makes predictions.
model.transform(dataset).show()
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/isotonic_regression_example.py」にあります。

APIに関する詳細については、IsotonicRegression Scala ドキュメントを参照してください。

import org.apache.spark.ml.regression.IsotonicRegression

// Loads data.
val dataset = spark.read.format("libsvm")
  .load("data/mllib/sample_isotonic_regression_libsvm_data.txt")

// Trains an isotonic regression model.
val ir = new IsotonicRegression()
val model = ir.fit(dataset)

println(s"Boundaries in increasing order: ${model.boundaries}\n")
println(s"Predictions associated with the boundaries: ${model.predictions}\n")

// Makes predictions.
model.transform(dataset).show()
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/IsotonicRegressionExample.scala」にあります。

APIに関する詳細については、IsotonicRegression Java ドキュメントを参照してください。

import org.apache.spark.ml.regression.IsotonicRegression;
import org.apache.spark.ml.regression.IsotonicRegressionModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Loads data.
Dataset<Row> dataset = spark.read().format("libsvm")
  .load("data/mllib/sample_isotonic_regression_libsvm_data.txt");

// Trains an isotonic regression model.
IsotonicRegression ir = new IsotonicRegression();
IsotonicRegressionModel model = ir.fit(dataset);

System.out.println("Boundaries in increasing order: " + model.boundaries() + "\n");
System.out.println("Predictions associated with the boundaries: " + model.predictions() + "\n");

// Makes predictions.
model.transform(dataset).show();
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaIsotonicRegressionExample.java」にあります。

APIに関する詳細については、IsotonicRegression R API ドキュメントを参照してください。

# Load training data
df <- read.df("data/mllib/sample_isotonic_regression_libsvm_data.txt", source = "libsvm")
training <- df
test <- df

# Fit an isotonic regression model with spark.isoreg
model <- spark.isoreg(training, label ~ features, isotonic = FALSE)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/isoreg.R」にあります。

Factorization Machines 回帰

Factorization Machines の実装に関する背景情報と詳細については、Factorization Machines セクションを参照してください。

以下の例では、LibSVM形式のデータセットをロードし、トレーニングセットとテストセットに分割し、最初のデータセットでトレーニングしてから、保持されたテストセットで評価します。爆発的な勾配の問題を防ぐために、特徴量を0から1の間にスケーリングします。

詳細については、Python API ドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.regression import FMRegressor
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Scale features.
featureScaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures").fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a FM model.
fm = FMRegressor(featuresCol="scaledFeatures", stepSize=0.001)

# Create a Pipeline.
pipeline = Pipeline(stages=[featureScaler, fm])

# Train model.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

fmModel = model.stages[1]
print("Factors: " + str(fmModel.factors))  # type: ignore
print("Linear: " + str(fmModel.linear))  # type: ignore
print("Intercept: " + str(fmModel.intercept))  # type: ignore
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/ml/fm_regressor_example.py」にあります。

詳細については、Scala API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.regression.{FMRegressionModel, FMRegressor}

// Load and parse the data file, converting it to a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

// Scale features.
val featureScaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .fit(data)

// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

// Train a FM model.
val fm = new FMRegressor()
  .setLabelCol("label")
  .setFeaturesCol("scaledFeatures")
  .setStepSize(0.001)

// Create a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(featureScaler, fm))

// Train model.
val model = pipeline.fit(trainingData)

// Make predictions.
val predictions = model.transform(testData)

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5)

// Select (prediction, true label) and compute test error.
val evaluator = new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse")
val rmse = evaluator.evaluate(predictions)
println(s"Root Mean Squared Error (RMSE) on test data = $rmse")

val fmModel = model.stages(1).asInstanceOf[FMRegressionModel]
println(s"Factors: ${fmModel.factors} Linear: ${fmModel.linear} " +
  s"Intercept: ${fmModel.intercept}")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/FMRegressorExample.scala」にあります。

詳細については、Java API ドキュメントを参照してください。

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.regression.FMRegressionModel;
import org.apache.spark.ml.regression.FMRegressor;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Load and parse the data file, converting it to a DataFrame.
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

// Scale features.
MinMaxScalerModel featureScaler = new MinMaxScaler()
    .setInputCol("features")
    .setOutputCol("scaledFeatures")
    .fit(data);

// Split the data into training and test sets (30% held out for testing).
Dataset<Row>[] splits = data.randomSplit(new double[] {0.7, 0.3});
Dataset<Row> trainingData = splits[0];
Dataset<Row> testData = splits[1];

// Train a FM model.
FMRegressor fm = new FMRegressor()
    .setLabelCol("label")
    .setFeaturesCol("scaledFeatures")
    .setStepSize(0.001);

// Create a Pipeline.
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] {featureScaler, fm});

// Train model.
PipelineModel model = pipeline.fit(trainingData);

// Make predictions.
Dataset<Row> predictions = model.transform(testData);

// Select example rows to display.
predictions.select("prediction", "label", "features").show(5);

// Select (prediction, true label) and compute test error.
RegressionEvaluator evaluator = new RegressionEvaluator()
    .setLabelCol("label")
    .setPredictionCol("prediction")
    .setMetricName("rmse");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root Mean Squared Error (RMSE) on test data = " + rmse);

FMRegressionModel fmModel = (FMRegressionModel)(model.stages()[1]);
System.out.println("Factors: " + fmModel.factors());
System.out.println("Linear: " + fmModel.linear());
System.out.println("Intercept: " + fmModel.intercept());
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaFMRegressorExample.java」にあります。

詳細については、R API ドキュメントを参照してください。

注意: 現在、SparkRは特徴量スケーリングをサポートしていません。

# Load training data
df <- read.df("data/mllib/sample_linear_regression_data.txt", source = "libsvm")
training_test <- randomSplit(df, c(0.7, 0.3))
training <- training_test[[1]]
test <- training_test[[2]]

# Fit a FM regression model
model <- spark.fmRegressor(training, label ~ features)

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/ml/fmRegressor.R」にあります。

線形手法

ロジスティック回帰や線形最小二乗法などの一般的な線形手法を、$L_1$ または $L_2$ 正則化とともに実装しています。実装とチューニングの詳細については、RDDベースAPIの線形手法ガイドを参照してください。この情報は依然として関連性があります。

Elastic net、つまり $L_1$ 正則化と $L_2$ 正則化のハイブリッドのDataFrame APIも含まれています。これは、Zou et al, Regularization and variable selection via the elastic netで提案されたものです。数学的には、 $L_1$ および $L_2$ 正則化項の凸結合として定義されます:\[ \alpha \left( \lambda \|\wv\|_1 \right) + (1-\alpha) \left( \frac{\lambda}{2}\|\wv\|_2^2 \right) , \alpha \in [0, 1], \lambda \geq 0 \] $\alpha$ を適切に設定することで、弾性ネットは $L_1$ および $L_2$ 正則化の両方を特殊なケースとして含みます。たとえば、線形回帰モデルが弾性ネットパラメータ $\alpha$ を $1$ に設定してトレーニングされた場合、Lassoモデルと同等です。一方、$\alpha$ が $0$ に設定された場合、トレーニングされたモデルはリッジ回帰モデルに還元されます。弾性ネット正則化を使用した線形回帰とロジスティック回帰の両方に対して、パイプラインAPIを実装しています。

Factorization Machines

Factorization Machinesは、広告やレコメンデーションシステムのような非常にスパースな問題でも、特徴量間の相互作用を推定できます。spark.ml 実装は、二項分類と回帰のためにFactorization Machinesをサポートしています。

FMの式は次のとおりです。

\[\hat{y} = w_0 + \sum\limits^n_{i-1} w_i x_i + \sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j\]

最初の2つの項は切片と線形項 (線形回帰と同様) を表し、最後の項はペアワイズ相互作用項を表します。 \(v_i\) は、k個の因子を持つi番目の変数を記述します。

FMは回帰に使用でき、最適化基準は平均二乗誤差です。FMは、シグモイド関数を介して二項分類にも使用できます。最適化基準はロジスティック損失です。

ペアワイズ相互作用は次のように書き換えることができます。

\[\sum\limits^n_{i=1} \sum\limits^n_{j=i+1} \langle v_i, v_j \rangle x_i x_j = \frac{1}{2}\sum\limits^k_{f=1} \left(\left( \sum\limits^n_{i=1}v_{i,f}x_i \right)^2 - \sum\limits^n_{i=1}v_{i,f}^2x_i^2 \right)\]

この方程式は、$k$ と $n$ の両方で線形複雑性のみを持ちます。つまり、計算は \(O(kn)\) です。

一般的に、爆発的な勾配の問題を防ぐために、連続特徴量を0から1の間にスケーリングするか、連続特徴量をビンに分割してワンホットエンコーディングすることが最善です。

決定木

決定木とそのアンサンブルは、分類と回帰の機械学習タスクで一般的な手法です。決定木は、解釈が容易で、カテゴリ特徴量を処理でき、多クラス分類設定に拡張でき、特徴量スケーリングを必要とせず、非線形性や特徴量間の相互作用を捉えることができるため、広く使用されています。ランダムフォレストやブースティングなどのツリーアンサンブルアルゴリズムは、分類および回帰タスクでトップパフォーマーの1つです。

spark.ml実装は、連続特徴量とカテゴリ特徴量の両方を使用して、二項および多クラス分類、および回帰のための決定木をサポートしています。この実装はデータを行ごとにパーティション分割するため、数百万または数十億のインスタンスでの分散トレーニングが可能です。

ユーザーは、決定木アルゴリズムに関する詳細をMLlib決定木ガイドで確認できます。このAPIと元のMLlib決定木APIの主な違いは次のとおりです。

決定木のためのパイプラインAPIは、元のAPIよりもわずかに多くの機能を提供します。特に、分類の場合は、ユーザーは各クラスの予測確率 (別名クラス条件付き確率) を取得できます。回帰の場合は、ユーザーは予測のバイアス付きサンプル分散を取得できます。

ツリーのアンサンブル (ランダムフォレストと勾配ブースティング木) については、以下にあるツリーアンサンブルセクションで説明します。

入力と出力

ここでは、入力および出力 (予測) 列のタイプをリストします。すべての出力列はオプションです。出力列を除外するには、対応するParamを空の文字列に設定します。

入力列

Param名 タイプ デフォルト 説明
labelCol Double "label" 予測するラベル
featuresCol Vector "features" 特徴量ベクトル

出力列

Param名 タイプ デフォルト 説明 注記
predictionCol Double "prediction" 予測されたラベル
rawPredictionCol Vector "rawPrediction" 予測を行うツリーノードにおけるトレーニングインスタンスラベルのカウントを含む、クラス数長のベクトル 分類のみ
probabilityCol Vector "probability" rawPredictionを多項分布に正規化したクラス数長のベクトル 分類のみ
varianceCol Double 予測のバイアス付きサンプル分散 回帰のみ

ツリーアンサンブル

DataFrame APIは、2つの主要なツリーアンサンブルアルゴリズムをサポートしています。ランダムフォレスト勾配ブースティング木 (GBT)です。どちらもベースモデルとしてspark.ml決定木を使用します。

アンサンブルアルゴリズムに関する詳細については、MLlibアンサンブルガイドで確認できます。ここでは、アンサンブルのDataFrame APIを示します。

このAPIと元のMLlibアンサンブルAPIの主な違いは次のとおりです。

ランダムフォレスト

ランダムフォレストは、決定木のアンサンブルです。ランダムフォレストは、過学習のリスクを軽減するために多数の決定木を組み合わせます。spark.ml実装は、連続特徴量とカテゴリ特徴量の両方を使用して、二項および多クラス分類、および回帰のためのランダムフォレストをサポートしています。

アルゴリズム自体の詳細については、spark.mllib のランダムフォレストに関するドキュメントを参照してください。

入力と出力

ここでは、入力および出力 (予測) 列のタイプをリストします。すべての出力列はオプションです。出力列を除外するには、対応するParamを空の文字列に設定します。

入力列

Param名 タイプ デフォルト 説明
labelCol Double "label" 予測するラベル
featuresCol Vector "features" 特徴量ベクトル

出力列 (予測)

Param名 タイプ デフォルト 説明 注記
predictionCol Double "prediction" 予測されたラベル
rawPredictionCol Vector "rawPrediction" 予測を行うツリーノードにおけるトレーニングインスタンスラベルのカウントを含む、クラス数長のベクトル 分類のみ
probabilityCol Vector "probability" rawPredictionを多項分布に正規化したクラス数長のベクトル 分類のみ

勾配ブースティング木 (GBT)

勾配ブースティング木 (GBT)は、決定木のアンサンブルです。GBTは、損失関数を最小化するために、決定木を反復的にトレーニングします。spark.ml実装は、連続特徴量とカテゴリ特徴量の両方を使用して、二項分類と回帰のためのGBTをサポートしています。

アルゴリズム自体の詳細については、spark.mllib の GBT に関するドキュメントを参照してください。

入力と出力

ここでは、入力および出力 (予測) 列のタイプをリストします。すべての出力列はオプションです。出力列を除外するには、対応するParamを空の文字列に設定します。

入力列

Param名 タイプ デフォルト 説明
labelCol Double "label" 予測するラベル
featuresCol Vector "features" 特徴量ベクトル

現在、GBTClassifier は二項ラベルのみをサポートしていることに注意してください。

出力列 (予測)

Param名 タイプ デフォルト 説明 注記
predictionCol Double "prediction" 予測されたラベル

将来的には、GBTClassifierRandomForestClassifier と同様に、rawPrediction および probability の列を出力するようになります。