回帰 - RDDベースAPI

単調回帰

「等張回帰」(Isotonic regression) は、回帰アルゴリズムのファミリに属します。形式的には、等張回帰は、観測された応答を表す実数 `{y_1, y_2, ..., y_n}` の有限集合と、適合させる未知の応答値 `{x_1, x_2, ..., x_n}` が与えられた場合に、以下の式を最小化する関数を見つける問題です。

\begin{equation} f(x) = \sum_{i=1}^n w_i (y_i - x_i)^2 \end{equation}

ただし、これは完全順序付けのもとで、`$x_1\le x_2\le ...\le x_n$` の制約のもとで行われ、`$w_i$` は正の重みです。結果として得られる関数は等張回帰と呼ばれ、一意に定まります。これは順序制約下の最小二乗問題と見なすことができます。本質的に、等張回帰は元のデータ点に最もよく適合する「単調関数」(monotonic function) です。

spark.mllib は、「隣接違反者統合アルゴリズム」(pool adjacent violators algorithm) をサポートしており、これは「等張回帰の並列化」(parallelizing isotonic regression) のアプローチを使用します。トレーニング入力は、ラベル、特徴量、および重みをこの順序で表す3つのdouble値のタプルのRDDです。同じ特徴量を持つ複数のタプルが存在する場合、これらのタプルは次のように単一のタプルに集約されます。

さらに、IsotonicRegressionアルゴリズムには、デフォルトでtrueに設定されている`$isotonic$`という名前のオプションパラメータが1つあります。この引数は、等張回帰が単調増加(isotonic)か単調減少(antitonic)かを指定します。

トレーニングは、既知および未知の特徴量に対するラベルを予測するために使用できる`IsotonicRegressionModel`を返します。等張回帰の結果は、区分的線形関数として扱われます。したがって、予測のルールは次のようになります。

データは、各行が `label,feature` の形式(例: `4710.28,500.00`)であるファイルから読み込まれます。データはトレーニングセットとテストセットに分割されます。モデルはトレーニングセットを使用して作成され、テストセットの予測ラベルと実際のラベルから平均二乗誤差が計算されます。

APIの詳細については、IsotonicRegression Python ドキュメント および IsotonicRegressionModel Python ドキュメント を参照してください。

import math
from pyspark.mllib.regression import IsotonicRegression, IsotonicRegressionModel
from pyspark.mllib.util import MLUtils

# Load and parse the data
def parsePoint(labeledData):
    return (labeledData.label, labeledData.features[0], 1.0)

data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_isotonic_regression_libsvm_data.txt")

# Create label, feature, weight tuples from input data with weight set to default value 1.0.
parsedData = data.map(parsePoint)

# Split data into training (60%) and test (40%) sets.
training, test = parsedData.randomSplit([0.6, 0.4], 11)

# Create isotonic regression model from training data.
# Isotonic parameter defaults to true so it is only shown for demonstration
model = IsotonicRegression.train(training)

# Create tuples of predicted and real labels.
predictionAndLabel = test.map(lambda p: (model.predict(p[1]), p[0]))

# Calculate mean squared error between predicted and real labels.
meanSquaredError = predictionAndLabel.map(lambda pl: math.pow((pl[0] - pl[1]), 2)).mean()
print("Mean Squared Error = " + str(meanSquaredError))

# Save and load model
model.save(sc, "target/tmp/myIsotonicRegressionModel")
sameModel = IsotonicRegressionModel.load(sc, "target/tmp/myIsotonicRegressionModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/mllib/isotonic_regression_example.py」にあります。

データは、各行が `label,feature` の形式(例: `4710.28,500.00`)であるファイルから読み込まれます。データはトレーニングセットとテストセットに分割されます。モデルはトレーニングセットを使用して作成され、テストセットの予測ラベルと実際のラベルから平均二乗誤差が計算されます。

APIの詳細については、IsotonicRegression Scala ドキュメント および IsotonicRegressionModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel}
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc,
  "data/mllib/sample_isotonic_regression_libsvm_data.txt").cache()

// Create label, feature, weight tuples from input data with weight set to default value 1.0.
val parsedData = data.map { labeledPoint =>
  (labeledPoint.label, labeledPoint.features(0), 1.0)
}

// Split data into training (60%) and test (40%) sets.
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
val test = splits(1)

// Create isotonic regression model from training data.
// Isotonic parameter defaults to true so it is only shown for demonstration
val model = new IsotonicRegression().setIsotonic(true).run(training)

// Create tuples of predicted and real labels.
val predictionAndLabel = test.map { point =>
  val predictedLabel = model.predict(point._2)
  (predictedLabel, point._1)
}

// Calculate mean squared error between predicted and real labels.
val meanSquaredError = predictionAndLabel.map { case (p, l) => math.pow((p - l), 2) }.mean()
println(s"Mean Squared Error = $meanSquaredError")

// Save and load model
model.save(sc, "target/tmp/myIsotonicRegressionModel")
val sameModel = IsotonicRegressionModel.load(sc, "target/tmp/myIsotonicRegressionModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/IsotonicRegressionExample.scala」にあります。

データは、各行が `label,feature` の形式(例: `4710.28,500.00`)であるファイルから読み込まれます。データはトレーニングセットとテストセットに分割されます。モデルはトレーニングセットを使用して作成され、テストセットの予測ラベルと実際のラベルから平均二乗誤差が計算されます。

APIの詳細については、IsotonicRegression Java ドキュメント および IsotonicRegressionModel Java ドキュメント を参照してください。

import scala.Tuple2;
import scala.Tuple3;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.regression.IsotonicRegression;
import org.apache.spark.mllib.regression.IsotonicRegressionModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(
  jsc.sc(), "data/mllib/sample_isotonic_regression_libsvm_data.txt").toJavaRDD();

// Create label, feature, weight tuples from input data with weight set to default value 1.0.
JavaRDD<Tuple3<Double, Double, Double>> parsedData = data.map(point ->
  new Tuple3<>(point.label(), point.features().apply(0), 1.0));

// Split data into training (60%) and test (40%) sets.
JavaRDD<Tuple3<Double, Double, Double>>[] splits =
  parsedData.randomSplit(new double[]{0.6, 0.4}, 11L);
JavaRDD<Tuple3<Double, Double, Double>> training = splits[0];
JavaRDD<Tuple3<Double, Double, Double>> test = splits[1];

// Create isotonic regression model from training data.
// Isotonic parameter defaults to true so it is only shown for demonstration
IsotonicRegressionModel model = new IsotonicRegression().setIsotonic(true).run(training);

// Create tuples of predicted and real labels.
JavaPairRDD<Double, Double> predictionAndLabel = test.mapToPair(point ->
  new Tuple2<>(model.predict(point._2()), point._1()));

// Calculate mean squared error between predicted and real labels.
double meanSquaredError = predictionAndLabel.mapToDouble(pl -> {
  double diff = pl._1() - pl._2();
  return diff * diff;
}).mean();
System.out.println("Mean Squared Error = " + meanSquaredError);

// Save and load model
model.save(jsc.sc(), "target/tmp/myIsotonicRegressionModel");
IsotonicRegressionModel sameModel =
  IsotonicRegressionModel.load(jsc.sc(), "target/tmp/myIsotonicRegressionModel");
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaIsotonicRegressionExample.java」にあります。