線形手法 - RDDベースAPI

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

数学的定式化

多くの標準的な*機械学習*手法は、凸最適化問題として定式化できます。つまり、変数ベクトル $\wv$(コードではweightsと呼ばれる)に依存する凸関数 $f$ の最小値を見つけるタスクであり、このベクトルは $d$ 個のエントリを持ちます。形式的には、これを最適化問題 $\min_{\wv \in\R^d} \; f(\wv)$ として記述できます。ここで、目的関数は $f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ .$ の形式です。ここで、ベクトル $\x_i\in\R^d$ は学習データ例であり、$1\le i\le n$、そして $y_i\in\R$ はそれに対応するラベルであり、予測したい値です。 $L(\wv; \x, y)$ が $\wv^T x$ と $y$ の関数として表現できる場合、その手法を*線形*と呼びます。 spark.mllib の分類および回帰アルゴリズムのいくつかは、このカテゴリに属し、ここで説明します。

目的関数 $f$ は2つの部分からなります。モデルの複雑さを制御する正則化項と、学習データにおけるモデルのエラーを測定する損失です。損失関数 $L(\wv;.)$ は通常、$\wv$ に関する凸関数です。固定の正則化パラメータ $\lambda \ge 0$ (コードではregParam)は、損失の最小化(つまり、学習誤差)とモデル複雑さの最小化(つまり、過学習の回避)という2つの目標間のトレードオフを定義します。

損失関数

spark.mllib がサポートする手法について、損失関数とその勾配または劣勾配をまとめた以下の表を示します。

損失関数 $L(\wv; \x, y)$勾配または劣勾配
ヒンジ損失$\max \{0, 1-y \wv^T \x \}, \quad y \in \{-1, +1\}$ $\begin{cases}-y \cdot \x & \text{if $y \wv^T \x <1$}, \\ 0 & \text{otherwise}.\end{cases}$
ロジスティック損失$\log(1+\exp( -y \wv^T \x)), \quad y \in \{-1, +1\}$ $-y \left(1-\frac1{1+\exp(-y \wv^T \x)} \right) \cdot \x$
二乗損失$\frac{1}{2} (\wv^T \x - y)^2, \quad y \in \R$ $(\wv^T \x - y) \cdot \x$

上記の数学的定式化では、二値ラベル $y$ は、便宜上 $+1$(陽性)または $-1$(陰性)として表されています。*しかし*、多クラスラベリングとの一貫性を保つため、spark.mllib では陰性ラベルは $-1$ の代わりに $0$ で表されます。

正則化項

正則化項の目的は、単純なモデルを奨励し、過学習を回避することです。spark.mllib でサポートしている正則化項は以下の通りです。

正則化項 $R(\wv)$勾配または劣勾配
ゼロ(正則化なし)0$\0$
L2$\frac{1}{2}\|\wv\|_2^2$$\wv$
L1$\|\wv\|_1$$\mathrm{sign}(\wv)$
Elastic net$\alpha \|\wv\|_1 + (1-\alpha)\frac{1}{2}\|\wv\|_2^2$$\alpha \mathrm{sign}(\wv) + (1-\alpha) \wv$

ここで、$\mathrm{sign}(\wv)$ は $\wv$ のすべてのエントリの符号($\pm1$)からなるベクトルです。

L2正則化された問題は、滑らかさのためにL1正則化された問題よりも一般的に解きやすいです。しかし、L1正則化は重みのスパース性を促進するのに役立ち、より小さく解釈しやすいモデルにつながります。後者は特徴選択に役立つ可能性があります。Elastic netはL1およびL2正則化の組み合わせです。特に学習例の数が少ない場合は、正則化なしでモデルを学習することは推奨されません。

最適化

内部的には、線形手法は凸最適化手法を使用して目的関数を最適化します。spark.mllib は、最適化セクションで説明されているSGDとL-BFGSの2つの手法を使用します。現在、ほとんどのアルゴリズムAPIは確率的勾配降下法(SGD)をサポートしており、一部はL-BFGSをサポートしています。最適化手法の選択に関するガイドラインについては、この最適化セクションを参照してください。

分類

分類は、項目をカテゴリに分割することを目的としています。最も一般的な分類タイプは二値分類であり、2つのカテゴリがあり、通常は陽性および陰性と呼ばれます。カテゴリが2つより多い場合は、多クラス分類と呼ばれます。spark.mllib は、分類のための2つの線形手法をサポートしています。線形サポートベクターマシン(SVM)とロジスティック回帰です。線形SVMは二値分類のみをサポートしますが、ロジスティック回帰は二値および多クラス分類問題の両方をサポートします。両方の手法について、spark.mllib はL1およびL2正則化されたバリアントをサポートしています。学習データセットは、MLlibのLabeledPointのRDDで表され、ラベルは0から始まるクラスインデックスです:$0, 1, 2, \ldots$。

線形サポートベクターマシン(SVM)

線形SVMは、大規模な分類タスクの標準的な手法です。これは、上記の式$\eqref{eq:regPrimal}$で説明されている線形手法であり、定式化における損失関数はヒンジ損失によって与えられます。

\[ L(\wv;\x,y) := \max \{0, 1-y \wv^T \x \}. \] デフォルトでは、線形SVMはL2正則化で学習されます。L1正則化もサポートしています。この場合、問題は線形計画法になります。

線形SVMアルゴリズムはSVMモデルを出力します。新しいデータポイント $\x$ が与えられた場合、モデルは $\wv^T \x$ の値に基づいて予測を行います。デフォルトでは、$\wv^T \x \geq 0$ の場合、結果は陽性であり、それ以外の場合は陰性です。

以下の例は、サンプルデータセットのロード方法、SVMモデルの構築方法、および結果のモデルで予測を行い学習誤差を計算する方法を示しています。

APIの詳細については、SVMWithSGD Python ドキュメントおよびSVMModel Python ドキュメントを参照してください。

from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonSVMWithSGDModel")
sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/mllib/svm_with_sgd_example.py」にあります。

以下のコードスニペットは、サンプルデータセットのロード方法、アルゴリズムオブジェクトの静的メソッドを使用してこの学習データで学習アルゴリズムを実行する方法、および結果のモデルで予測を行い学習誤差を計算する方法を示しています。

APIの詳細については、SVMWithSGD Scala ドキュメントおよびSVMModel Scala ドキュメントを参照してください。

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println(s"Area under ROC = $auROC")

// Save and load model
model.save(sc, "target/tmp/scalaSVMWithSGDModel")
val sameModel = SVMModel.load(sc, "target/tmp/scalaSVMWithSGDModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/SVMWithSGDExample.scala」にあります。

SVMWithSGD.train() メソッドは、デフォルトで正則化パラメータを1.0に設定してL2正則化を実行します。このアルゴリズムを設定したい場合は、新しいオブジェクトを直接作成し、セッターメソッドを呼び出すことで SVMWithSGD をさらにカスタマイズできます。spark.mllib の他のすべてのアルゴリズムも同様の方法でカスタマイズをサポートしています。たとえば、以下のコードは、正則化パラメータを0.1に設定したL1正則化バリアントのSVMを生成し、学習アルゴリズムを200イテレーション実行します。

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)

MLlibのすべてのメソッドはJavaフレンドリーな型を使用しているため、Scalaでインポートして呼び出すのと同じ方法でJavaでインポートして呼び出すことができます。唯一の注意点は、メソッドがScala RDDオブジェクトを受け取るのに対し、Spark Java APIは別のJavaRDDクラスを使用することです。Java RDDをScala RDDに変換するには、JavaRDDオブジェクトで.rdd()を呼び出します。Scalaで提供される例と同等のスタンドアロンアプリケーションの例を以下に示します。

APIの詳細については、SVMWithSGD Java ドキュメントおよびSVMModel Java ドキュメントを参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
training.cache();
JavaRDD<LabeledPoint> test = data.subtract(training);

// Run training algorithm to build the model.
int numIterations = 100;
SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
double auROC = metrics.areaUnderROC();

System.out.println("Area under ROC = " + auROC);

// Save and load model
model.save(sc, "target/tmp/javaSVMWithSGDModel");
SVMModel sameModel = SVMModel.load(sc, "target/tmp/javaSVMWithSGDModel");
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java」にあります。

SVMWithSGD.train() メソッドは、デフォルトで正則化パラメータを1.0に設定してL2正則化を実行します。このアルゴリズムを設定したい場合は、新しいオブジェクトを直接作成し、セッターメソッドを呼び出すことで SVMWithSGD をさらにカスタマイズできます。spark.mllib の他のすべてのアルゴリズムも同様の方法でカスタマイズをサポートしています。たとえば、以下のコードは、正則化パラメータを0.1に設定したL1正則化バリアントのSVMを生成し、学習アルゴリズムを200イテレーション実行します。

import org.apache.spark.mllib.optimization.L1Updater;

SVMWithSGD svmAlg = new SVMWithSGD();
svmAlg.optimizer()
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater());
SVMModel modelL1 = svmAlg.run(training.rdd());

上記のアプリケーションを実行するには、Sparkクイックスタートガイドの自己完結型アプリケーションセクションに記載されている手順に従ってください。ビルドファイルにspark-mllibを依存関係として含めることも忘れないでください。

ロジスティック回帰

\[ L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x)). \]

二値分類問題の場合、アルゴリズムは二値ロジスティック回帰モデルを出力します。新しいデータポイント $\x$ が与えられた場合、モデルはロジスティック関数\[ \mathrm{f}(z) = \frac{1}{1 + e^{-z}} \] を適用して予測を行います。ここで、$z = \wv^T \x$ です。デフォルトでは、$\mathrm{f}(\wv^T x) > 0.5$ の場合、結果は陽性であり、それ以外の場合は陰性です。ただし、線形SVMとは異なり、ロジスティック回帰モデルの生の出力 $\mathrm{f}(z)$ は、確率的解釈(つまり、$\x$ が陽性である確率)を持ちます。

二値ロジスティック回帰は、多項ロジスティック回帰に一般化して、多クラス分類問題を学習および予測できます。たとえば、$K$ 個の可能な結果がある場合、結果の1つを「ピボット」として選択し、他の $K - 1$ 個の結果をピボット結果に対して個別に回帰させることができます。spark.mllib では、最初のクラス $0$ が「ピボット」クラスとして選択されます。参照については、The Elements of Statistical Learningのセクション4.4を参照してください。以下に詳細な数学的導出を示します。

多クラス分類問題の場合、アルゴリズムは多項ロジスティック回帰モデルを出力します。これには、$K - 1$ 個の二値ロジスティック回帰モデルが含まれ、これらは最初のクラスに対して回帰されます。新しいデータポイントが与えられると、$K - 1$ 個のモデルが実行され、最大の確率を持つクラスが予測クラスとして選択されます。

ロジスティック回帰を解くために、ミニバッチ勾配降下法とL-BFGSの2つのアルゴリズムを実装しました。収束が速いため、ミニバッチ勾配降下法よりもL-BFGSをお勧めします。

以下の例は、サンプルデータセットのロード方法、ロジスティック回帰モデルの構築方法、および結果のモデルで予測を行い学習誤差を計算する方法を示しています。

Python APIは、現時点では多クラス分類およびモデルの保存/ロードをサポートしていませんが、将来的にはサポートする予定です。

APIの詳細については、LogisticRegressionWithLBFGS Python ドキュメントおよびLogisticRegressionModel Python ドキュメントを参照してください。

from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
                                         "target/tmp/pythonLogisticRegressionWithLBFGSModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/mllib/logistic_regression_with_lbfgs_example.py」にあります。

以下のコードは、サンプル多クラスデータセットのロード方法、それを学習用とテスト用に分割する方法、およびLogisticRegressionWithLBFGSを使用してロジスティック回帰モデルを適合させる方法を示しています。その後、モデルはテストデータセットに対して評価され、ディスクに保存されます。

APIの詳細については、LogisticRegressionWithLBFGS Scala ドキュメントおよびLogisticRegressionModel Scala ドキュメントを参照してください。

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training)

// Compute raw scores on the test set.
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
  val prediction = model.predict(features)
  (prediction, label)
}

// Get evaluation metrics.
val metrics = new MulticlassMetrics(predictionAndLabels)
val accuracy = metrics.accuracy
println(s"Accuracy = $accuracy")

// Save and load model
model.save(sc, "target/tmp/scalaLogisticRegressionWithLBFGSModel")
val sameModel = LogisticRegressionModel.load(sc,
  "target/tmp/scalaLogisticRegressionWithLBFGSModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/LogisticRegressionWithLBFGSExample.scala」にあります。

以下のコードは、サンプル多クラスデータセットのロード方法、それを学習用とテスト用に分割する方法、およびLogisticRegressionWithLBFGSを使用してロジスティック回帰モデルを適合させる方法を示しています。その後、モデルはテストデータセットに対して評価され、ディスクに保存されます。

APIの詳細については、LogisticRegressionWithLBFGS Java ドキュメントおよびLogisticRegressionModel Java ドキュメントを参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];

// Run training algorithm to build the model.
LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training.rdd());

// Compute raw scores on the test set.
JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
double accuracy = metrics.accuracy();
System.out.println("Accuracy = " + accuracy);

// Save and load model
model.save(sc, "target/tmp/javaLogisticRegressionWithLBFGSModel");
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
  "target/tmp/javaLogisticRegressionWithLBFGSModel");
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java」にあります。

回帰

線形最小二乗法、Lasso、リッジ回帰

\[ L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2. \]

さまざまな関連回帰手法は、異なる種類の正則化を使用することによって導出されます。Ordinary least squaresまたはlinear least squaresは正則化を使用しません。ridge regressionはL2正則化を使用し、LassoはL1正則化を使用します。これらのモデルすべてにおいて、平均損失または学習誤差 $\frac{1}{n} \sum_{i=1}^n (\wv^T x_i - y_i)^2$ は平均二乗誤差として知られています。

ストリーミング線形回帰

データがストリーミング形式で到着する場合、回帰モデルをオンラインで適合させ、新しいデータが到着するたびにモデルのパラメータを更新することが便利です。spark.mllib は現在、通常の最小二乗法を使用したストリーミング線形回帰をサポートしています。適合はオフラインで行われる適合と同様ですが、適合は各データバッチで発生するため、モデルはストリームからのデータを反映するように継続的に更新されます。

以下の例は、2つの異なるテキストファイルストリームから学習用とテスト用データをロードする方法、ストリームをラベル付きポイントとして解析する方法、最初のストリームに線形回帰モデルをオンラインで適合させる方法、および2番目のストリームで予測を行う方法を示しています。

まず、入力データを解析し、モデルを作成するために必要なクラスをインポートします。

次に、学習用とテスト用の入力ストリームを作成します。ここでは、StreamingContext ssc が既に作成されていると仮定します。詳細については、Spark Streaming Programming Guideを参照してください。この例では、学習およびテストストリームでラベル付きポイントを使用しますが、実際にはテストデータにはラベルなしベクトルを使用したい場合が多いでしょう。

重みを0に初期化してモデルを作成します。

次に、学習用とテスト用のストリームを登録し、ジョブを開始します。

これで、学習用またはテスト用のフォルダにテキストファイルを保存できます。各行は、(y,[x1,x2,x3]) の形式のデータポイントである必要があります。ここで、y はラベルであり、x1,x2,x3 は特徴です。sys.argv[1] にテキストファイルが配置されるたびに、モデルが更新されます。sys.argv[2] にテキストファイルが配置されるたびに、予測が表示されます。学習ディレクトリにデータが追加されるにつれて、予測は改善されます。

ここに完全な例を示します。

import sys

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
testData = ssc.textFileStream(sys.argv[2]).map(parse)

numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0, 0.0, 0.0])

model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))

ssc.start()
ssc.awaitTermination()
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/mllib/streaming_linear_regression_example.py」にあります。

まず、入力データを解析し、モデルを作成するために必要なクラスをインポートします。

次に、学習用とテスト用の入力ストリームを作成します。ここでは、StreamingContext ssc が既に作成されていると仮定します。詳細については、Spark Streaming Programming Guideを参照してください。この例では、学習およびテストストリームでラベル付きポイントを使用しますが、実際にはテストデータにはラベルなしベクトルを使用したい場合が多いでしょう。

重みをゼロに初期化してモデルを作成し、学習用とテスト用のストリームを登録してからジョブを開始します。真のラベルと一緒に予測を表示することで、結果を簡単に確認できます。

最後に、学習用またはテスト用のフォルダにテキストファイルを保存できます。各行は、(y,[x1,x2,x3]) の形式のデータポイントである必要があります。ここで、y はラベルであり、x1,x2,x3 は特徴です。args(0) にテキストファイルが配置されるたびに、モデルが更新されます。args(1) にテキストファイルが配置されるたびに、予測が表示されます。学習ディレクトリにデータが追加されるにつれて、予測は改善されます。

ここに完全な例を示します。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegressionExample.scala」にあります。

実装(開発者向け)

内部的には、spark.mllib は、基盤となる勾配降下法プリミティブ(最適化セクションで説明)に基づいて、確率的勾配降下法(SGD)の単純な分散バージョンを実装しています。提供されているすべてのアルゴリズムは、正則化パラメータ(regParam)と、確率的勾配降下法に関連するさまざまなパラメータ(stepSizenumIterationsminiBatchFraction)を入力として受け取ります。それぞれについて、3つの可能な正則化(なし、L1、またはL2)すべてをサポートしています。

ロジスティック回帰の場合、L-BFGSバージョンはLogisticRegressionWithLBFGSの下に実装されており、このバージョンは二値および多項ロジスティック回帰の両方をサポートしますが、SGDバージョンは二値ロジスティック回帰のみをサポートします。ただし、L-BFGSバージョンはL1正則化をサポートしませんが、SGDバージョンはL1正則化をサポートします。L1正則化が不要な場合、L-BFGSバージョンは、準ニュートン法を使用して逆ヘッセ行列を近似することにより、SGDよりも収束が速く、より正確であるため、強く推奨されます。

アルゴリズムはすべてScalaで実装されています。