線形メソッド - RDDベースのAPI

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

数式

多くの標準的な_機械学習_メソッドは、凸最適化問題、つまり、$d$個のエントリを持つ変数ベクトル$\wv$ (コードではweightsと呼ばれる) に依存する凸関数$f$の最小値を見つけるタスクとして定式化できます。正式には、この最適化問題を$\min_{\wv \in\R^d} \; f(\wv)$と書くことができます。ここで、目的関数は\begin{equation} f(\wv) := \lambda\, R(\wv) + \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i) \label{eq:regPrimal} \ . \end{equation} の形式です。ここで、ベクトル$\x_i\in\R^d$は、$1\le i\le n$のトレーニングデータ例であり、$y_i\in\R$は、予測したい対応するラベルです。 $L(\wv; \x, y)$が$\wv^T x$と$y$の関数として表現できる場合、このメソッドを_線形_と呼びます。 spark.mllibの分類および回帰アルゴリズムのいくつかはこのカテゴリに属し、ここで説明します。

目的関数$f$は2つの部分で構成されています。モデルの複雑さを制御する正則化項と、トレーニングデータにおけるモデルの誤差を測定する損失です。損失関数$L(\wv;.)$は、通常、$\wv$の凸関数です。固定正則化パラメータ$\lambda \ge 0$ (コードではregParam) は、損失 (つまり、トレーニングエラー) を最小化するという2つの目標と、モデルの複雑さを最小化するという目標 (つまり、過剰適合を回避するため) の間のトレードオフを定義します。

損失関数

次の表は、spark.mllibがサポートするメソッドの損失関数とその勾配または劣勾配をまとめたものです。

損失関数 $L(\wv; \x, y)$勾配または劣勾配
ヒンジ損失$\max \{0, 1-y \wv^T \x \}, \quad y \in \{-1, +1\}$ $\begin{cases}-y \cdot \x & \text{if $y \wv^T \x <1$}, \\ 0 & \text{otherwise}.\end{cases}$
ロジスティック損失$\log(1+\exp( -y \wv^T \x)), \quad y \in \{-1, +1\}$ $-y \left(1-\frac1{1+\exp(-y \wv^T \x)} \right) \cdot \x$
二乗損失$\frac{1}{2} (\wv^T \x - y)^2, \quad y \in \R$ $(\wv^T \x - y) \cdot \x$

上記の数学的定式化では、2値ラベル$y$は、定式化に便利な$+1$ (正) または$-1$ (負) のいずれかとして表されます。 _ただし_、spark.mllibでは、負のラベルは、多クラスラベル付けとの整合性のために、$-1$ではなく$0$で表されます。

正則化

正則化項の目的は、単純なモデルを奨励し、過剰適合を回避することです。 spark.mllibでは、以下の正則化項をサポートしています。

正則化項 $R(\wv)$勾配または劣勾配
ゼロ (正則化なし)0$\0$
L2$\frac{1}{2}\|\wv\|_2^2$$\wv$
L1$\|\wv\|_1$$\mathrm{sign}(\wv)$
Elastic Net$\alpha \|\wv\|_1 + (1-\alpha)\frac{1}{2}\|\wv\|_2^2$$\alpha \mathrm{sign}(\wv) + (1-\alpha) \wv$

ここで、$\mathrm{sign}(\wv)$は、$\wv$のすべてのエントリの符号 ($\pm1$) で構成されるベクトルです。

L2正則化問題は、滑らかさのために、一般的にL1正則化問題よりも解きやすいです。ただし、L1正則化は、重みのスパース性を促進するのに役立ち、より小さく、より解釈しやすいモデルにつながります。後者は、特徴量選択に役立ちます。 Elastic Netは、L1とL2正則化の組み合わせです。特にトレーニング例が少ない場合は、正則化なしでモデルをトレーニングすることはお勧めしません。

最適化

内部的には、線形メソッドは凸最適化メソッドを使用して目的関数を最適化します。 spark.mllibは、最適化のセクションで説明されているSGDとL-BFGSの2つのメソッドを使用します。現在、ほとんどのアルゴリズムAPIは確率的勾配降下法 (SGD) をサポートしており、いくつかはL-BFGSをサポートしています。最適化メソッドの選択に関するガイドラインについては、この最適化セクションを参照してください。

分類

分類は、項目をカテゴリに分割することを目的としています。最も一般的な分類タイプは、2値分類で、通常は正と負と呼ばれる2つのカテゴリがあります。3つ以上のカテゴリがある場合は、多クラス分類と呼ばれます。 spark.mllibは、分類のための2つの線形メソッド、線形サポートベクターマシン (SVM) とロジスティック回帰をサポートしています。線形SVMは2値分類のみをサポートしますが、ロジスティック回帰は2値と多クラスの両方の分類問題をサポートします。どちらのメソッドについても、spark.mllibはL1およびL2正則化バリアントをサポートしています。トレーニングデータセットは、MLlibではLabeledPointのRDDで表され、ラベルはゼロから始まるクラスインデックスです: $0, 1, 2, \ldots$。

線形サポートベクターマシン (SVM)

線形SVMは、大規模な分類タスクの標準的な方法です。これは、式$\eqref{eq:regPrimal}$で前述したように線形メソッドであり、定式化における損失関数はヒンジ損失によって与えられます。

\[ L(\wv;\x,y) := \max \{0, 1-y \wv^T \x \}. \] デフォルトでは、線形SVMはL2正則化でトレーニングされます。代替のL1正則化もサポートしています。この場合、問題は線形計画問題になります。

線形SVMアルゴリズムは、SVMモデルを出力します。 $\x$で表される新しいデータポイントが与えられると、モデルは$\wv^T \x$の値に基づいて予測を行います。デフォルトでは、$\wv^T \x \geq 0$の場合、結果は正になり、そうでない場合は負になります。

次の例は、サンプルデータセットを読み込み、SVMモデルを構築し、結果のモデルを使用して予測を行い、トレーニングエラーを計算する方法を示しています。

APIの詳細については、SVMWithSGD PythonドキュメントおよびSVMModel Pythonドキュメントを参照してください。

from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonSVMWithSGDModel")
sameModel = SVMModel.load(sc, "target/tmp/pythonSVMWithSGDModel")
Sparkリポジトリの "examples/src/main/python/mllib/svm_with_sgd_example.py" に完全なサンプルコードがあります。

次のコードスニペットは、サンプルデータセットを読み込み、アルゴリズムオブジェクトの静的メソッドを使用してこのトレーニングデータでトレーニングアルゴリズムを実行し、結果のモデルで予測を行ってトレーニングエラーを計算する方法を示しています。

APIの詳細については、SVMWithSGD ScalaドキュメントおよびSVMModel Scalaドキュメントを参照してください。

import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val numIterations = 100
val model = SVMWithSGD.train(training, numIterations)

// Clear the default threshold.
model.clearThreshold()

// Compute raw scores on the test set.
val scoreAndLabels = test.map { point =>
  val score = model.predict(point.features)
  (score, point.label)
}

// Get evaluation metrics.
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()

println(s"Area under ROC = $auROC")

// Save and load model
model.save(sc, "target/tmp/scalaSVMWithSGDModel")
val sameModel = SVMModel.load(sc, "target/tmp/scalaSVMWithSGDModel")
Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/SVMWithSGDExample.scala" に完全なサンプルコードがあります。

SVMWithSGD.train()メソッドは、デフォルトでは、正則化パラメータを1.0に設定してL2正則化を実行します。このアルゴリズムを設定する場合、新しいオブジェクトを直接作成し、セッターメソッドを呼び出すことによって、SVMWithSGDをさらにカスタマイズできます。他のすべてのspark.mllibアルゴリズムも、このようにカスタマイズをサポートしています。たとえば、次のコードは、正則化パラメータを0.1に設定したSVMのL1正則化バリアントを生成し、トレーニングアルゴリズムを200イテレーション実行します。

import org.apache.spark.mllib.optimization.L1Updater

val svmAlg = new SVMWithSGD()
svmAlg.optimizer
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater)
val modelL1 = svmAlg.run(training)

MLlibのすべてのメソッドは、Javaフレンドリな型を使用しているため、Scalaと同じ方法でJavaにインポートして呼び出すことができます。唯一の注意点は、メソッドがScala RDDオブジェクトを受け取る一方、Spark Java APIは別のJavaRDDクラスを使用することです。 JavaRDDオブジェクトで.rdd()を呼び出すことにより、Java RDDをScala RDDに変換できます。Scalaで提供されている例と同等の自己完結型アプリケーションの例を以下に示します。

APIの詳細については、SVMWithSGD JavaドキュメントおよびSVMModel Javaドキュメントを参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
training.cache();
JavaRDD<LabeledPoint> test = data.subtract(training);

// Run training algorithm to build the model.
int numIterations = 100;
SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);

// Clear the default threshold.
model.clearThreshold();

// Compute raw scores on the test set.
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test.map(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
BinaryClassificationMetrics metrics =
  new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
double auROC = metrics.areaUnderROC();

System.out.println("Area under ROC = " + auROC);

// Save and load model
model.save(sc, "target/tmp/javaSVMWithSGDModel");
SVMModel sameModel = SVMModel.load(sc, "target/tmp/javaSVMWithSGDModel");
Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaSVMWithSGDExample.java" に完全なサンプルコードがあります。

SVMWithSGD.train()メソッドは、デフォルトでは、正則化パラメータを1.0に設定してL2正則化を実行します。このアルゴリズムを設定する場合、新しいオブジェクトを直接作成し、セッターメソッドを呼び出すことによって、SVMWithSGDをさらにカスタマイズできます。他のすべてのspark.mllibアルゴリズムも、このようにカスタマイズをサポートしています。たとえば、次のコードは、正則化パラメータを0.1に設定したSVMのL1正則化バリアントを生成し、トレーニングアルゴリズムを200イテレーション実行します。

import org.apache.spark.mllib.optimization.L1Updater;

SVMWithSGD svmAlg = new SVMWithSGD();
svmAlg.optimizer()
  .setNumIterations(200)
  .setRegParam(0.1)
  .setUpdater(new L1Updater());
SVMModel modelL1 = svmAlg.run(training.rdd());

上記のアプリケーションを実行するには、Sparkクイックスタートガイドの自己完結型アプリケーションセクションに記載されている手順に従ってください。また、ビルドファイルにspark-mllibを依存関係として含めるようにしてください。

ロジスティック回帰

ロジスティック回帰は、二項応答を予測するために広く使用されています。これは、上記の式$\eqref{eq:regPrimal}$で説明されている線形手法であり、定式化における損失関数は、以下のロジスティック損失によって与えられます。\[ L(\wv;\x,y) := \log(1+\exp( -y \wv^T \x)). \]

二値分類問題の場合、アルゴリズムは二項ロジスティック回帰モデルを出力します。$\x$で表される新しいデータポイントが与えられると、モデルはロジスティック関数\[ \mathrm{f}(z) = \frac{1}{1 + e^{-z}} \]を適用して予測を行います。ここで、$z = \wv^T \x$です。デフォルトでは、$\mathrm{f}(\wv^T x) > 0.5$の場合、結果は正になり、そうでない場合は負になります。ただし、線形SVMとは異なり、ロジスティック回帰モデルの生の出力$\mathrm{f}(z)$には確率的解釈(つまり、$\x$が正である確率)があります。

二項ロジスティック回帰は、多項ロジスティック回帰に一般化して、多クラス分類問題をトレーニングおよび予測することができます。たとえば、$K$個の可能な結果に対して、結果の1つを「ピボット」として選択し、他の$K - 1$個の結果をピボット結果に対して個別に回帰させることができます。spark.mllibでは、最初のクラス$0$が「ピボット」クラスとして選択されます。参考文献については、統計的学習の基礎のセクション4.4を参照してください。 詳細な数学的導出はこちらです。

多クラス分類問題の場合、アルゴリズムは多項ロジスティック回帰モデルを出力します。これは、最初のクラスに対して回帰された$K - 1$個の二項ロジスティック回帰モデルを含みます。新しいデータポイントが与えられると、$K - 1$個のモデルが実行され、確率が最も高いクラスが予測クラスとして選択されます。

ロジスティック回帰を解くために、ミニバッチ勾配降下法とL-BFGSの2つのアルゴリズムを実装しました。高速な収束のため、ミニバッチ勾配降下法よりもL-BFGSをお勧めします。

次の例は、サンプルデータセットを読み込み、ロジスティック回帰モデルを構築し、結果のモデルを使用して予測を行い、トレーニングエラーを計算する方法を示しています。

Python APIは、まだ多クラス分類とモデルの保存/読み込みをサポートしていませんが、将来的にはサポートする予定です。

APIの詳細については、LogisticRegressionWithLBFGS PythonドキュメントLogisticRegressionModel Pythonドキュメントを参照してください。

from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)

# Build the model
model = LogisticRegressionWithLBFGS.train(parsedData)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

# Save and load model
model.save(sc, "target/tmp/pythonLogisticRegressionWithLBFGSModel")
sameModel = LogisticRegressionModel.load(sc,
                                         "target/tmp/pythonLogisticRegressionWithLBFGSModel")
Sparkリポジトリの "examples/src/main/python/mllib/logistic_regression_with_lbfgs_example.py" に完全なサンプルコードがあります。

次のコードは、サンプルの多クラスデータセットを読み込み、トレーニングとテストに分割し、LogisticRegressionWithLBFGSを使用してロジスティック回帰モデルを適合させる方法を示しています。次に、モデルはテストデータセットに対して評価され、ディスクに保存されます。

APIの詳細については、LogisticRegressionWithLBFGS ScalaドキュメントLogisticRegressionModel Scalaドキュメントを参照してください。

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithLBFGS}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

// Load training data in LIBSVM format.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")

// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)

// Run training algorithm to build the model
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training)

// Compute raw scores on the test set.
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
  val prediction = model.predict(features)
  (prediction, label)
}

// Get evaluation metrics.
val metrics = new MulticlassMetrics(predictionAndLabels)
val accuracy = metrics.accuracy
println(s"Accuracy = $accuracy")

// Save and load model
model.save(sc, "target/tmp/scalaLogisticRegressionWithLBFGSModel")
val sameModel = LogisticRegressionModel.load(sc,
  "target/tmp/scalaLogisticRegressionWithLBFGSModel")
Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/LogisticRegressionWithLBFGSExample.scala" に完全なサンプルコードがあります。

次のコードは、サンプルの多クラスデータセットを読み込み、トレーニングとテストに分割し、LogisticRegressionWithLBFGSを使用してロジスティック回帰モデルを適合させる方法を示しています。次に、モデルはテストデータセットに対して評価され、ディスクに保存されます。

APIの詳細については、LogisticRegressionWithLBFGS JavaドキュメントLogisticRegressionModel Javaドキュメントを参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

String path = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

// Split initial RDD into two... [60% training data, 40% testing data].
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[] {0.6, 0.4}, 11L);
JavaRDD<LabeledPoint> training = splits[0].cache();
JavaRDD<LabeledPoint> test = splits[1];

// Run training algorithm to build the model.
LogisticRegressionModel model = new LogisticRegressionWithLBFGS()
  .setNumClasses(10)
  .run(training.rdd());

// Compute raw scores on the test set.
JavaPairRDD<Object, Object> predictionAndLabels = test.mapToPair(p ->
  new Tuple2<>(model.predict(p.features()), p.label()));

// Get evaluation metrics.
MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
double accuracy = metrics.accuracy();
System.out.println("Accuracy = " + accuracy);

// Save and load model
model.save(sc, "target/tmp/javaLogisticRegressionWithLBFGSModel");
LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
  "target/tmp/javaLogisticRegressionWithLBFGSModel");
Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaLogisticRegressionWithLBFGSExample.java" に完全なサンプルコードがあります。

回帰

線形最小二乗法、Lasso、リッジ回帰

線形最小二乗法は、回帰問題の最も一般的な定式化です。これは、上記の式$\eqref{eq:regPrimal}$で説明されている線形手法であり、定式化における損失関数は、以下の二乗損失によって与えられます。\[ L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2. \]

さまざまな関連回帰手法は、異なるタイプの正則化を使用することによって導き出されます。通常最小二乗法または線形最小二乗法は正則化を使用しません。リッジ回帰はL2正則化を使用します。そしてLassoはL1正則化を使用します。これらのすべてのモデルについて、平均損失またはトレーニングエラー$\frac{1}{n} \sum_{i=1}^n (\wv^T x_i - y_i)^2$は、平均二乗誤差と呼ばれます。

ストリーミング線形回帰

データがストリーミング形式で到着する場合、新しいデータが到着するにつれてモデルのパラメータを更新するオンライン回帰モデルを適合させることが有用です。spark.mllibは現在、通常最小二乗法を使用したストリーミング線形回帰をサポートしています。フィッティングはオフラインで実行されるものと似ていますが、フィッティングはデータの各バッチで発生するため、モデルはストリームからのデータを反映するように継続的に更新されます。

次の例は、2つの異なるテキストファイルの入力ストリームからトレーニングデータとテストデータを読み込み、ストリームをラベル付きポイントとして解析し、最初のストリームにオンライン線形回帰モデルを適合させ、2番目のストリームで予測を行う方法を示しています。

まず、入力データを解析し、モデルを作成するために必要なクラスをインポートします。

次に、トレーニングデータとテストデータの入力ストリームを作成します。StreamingContext ssc は既に作成されていると仮定します。詳細については、Spark Streamingプログラミングガイドを参照してください。この例では、トレーニングストリームとテストストリームでラベル付きポイントを使用しますが、実際にはテストデータにラベルのないベクトルを使用することがあります。

重みを0に初期化することにより、モデルを作成します。

次に、トレーニングとテストのストリームを登録し、ジョブを開始します。

これで、トレーニングフォルダーまたはテストフォルダーにデータを含むテキストファイルを保存できます。各行は、(y,[x1,x2,x3])の形式のデータポイントである必要があります。ここで、yはラベル、x1,x2,x3は特徴です。テキストファイルがsys.argv[1]に配置されるたびに、モデルが更新されます。テキストファイルがsys.argv[2]に配置されるたびに、予測が表示されます。トレーニングディレクトリにデータを追加するほど、予測の精度が向上します。

完全な例を以下に示します。

import sys

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

def parse(lp):
    label = float(lp[lp.find('(') + 1: lp.find(',')])
    vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
    return LabeledPoint(label, vec)

trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
testData = ssc.textFileStream(sys.argv[2]).map(parse)

numFeatures = 3
model = StreamingLinearRegressionWithSGD()
model.setInitialWeights([0.0, 0.0, 0.0])

model.trainOn(trainingData)
print(model.predictOnValues(testData.map(lambda lp: (lp.label, lp.features))))

ssc.start()
ssc.awaitTermination()
Sparkリポジトリの "examples/src/main/python/mllib/streaming_linear_regression_example.py" に完全なサンプルコードがあります。

まず、入力データを解析し、モデルを作成するために必要なクラスをインポートします。

次に、トレーニングデータとテストデータの入力ストリームを作成します。StreamingContext ssc は既に作成されていると仮定します。詳細については、Spark Streamingプログラミングガイドを参照してください。この例では、トレーニングストリームとテストストリームでラベル付きポイントを使用しますが、実際にはテストデータにラベルのないベクトルを使用することがあります。

重みをゼロに初期化することによりモデルを作成し、トレーニングとテストのストリームを登録してジョブを開始します。真のラベルと一緒に予測を出力することで、結果を簡単に確認できます。

最後に、トレーニングフォルダーまたはテストフォルダーにデータを含むテキストファイルを保存できます。各行は、(y,[x1,x2,x3])の形式のデータポイントである必要があります。ここで、yはラベル、x1,x2,x3は特徴です。テキストファイルがargs(0)に配置されるたびに、モデルが更新されます。テキストファイルがargs(1)に配置されるたびに、予測が表示されます。トレーニングディレクトリにデータを追加するほど、予測の精度が向上します。

完全な例を以下に示します。

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse).cache()
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val numFeatures = 3
val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()
Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegressionExample.scala" に完全なサンプルコードがあります。

実装 (開発者向け)

背後では、spark.mllibは、基礎となる勾配降下プリミティブ(最適化セクションで説明)に基づいて、確率的勾配降下法(SGD)の単純な分散バージョンを実装しています。提供されるすべてのアルゴリズムは、正則化パラメータ(regParam)と、確率的勾配降下法に関連するさまざまなパラメータ(stepSizenumIterationsminiBatchFraction)を入力として受け取ります。それぞれについて、3つの可能な正則化(なし、L1、またはL2)すべてをサポートしています。

ロジスティック回帰の場合、L-BFGSバージョンはLogisticRegressionWithLBFGSの下に実装されており、このバージョンは2値ロジスティック回帰と多項ロジスティック回帰の両方をサポートしていますが、SGDバージョンは2値ロジスティック回帰のみをサポートしています。ただし、L-BFGSバージョンはL1正則化をサポートしていませんが、SGDはL1正則化をサポートしています。L1正則化が不要な場合は、L-BFGSバージョンを強くお勧めします。これは、準ニュートン法を使用して逆ヘッセ行列を近似することにより、SGDと比較して高速かつ正確に収束するためです。

アルゴリズムはすべてScalaで実装されています