決定木 - RDDベースAPI

決定木とそのアンサンブルは、分類および回帰の機械学習タスクで人気のある手法です。決定木は、解釈が容易で、カテゴリカル特徴量を処理でき、マルチクラス分類設定に拡張でき、特徴量スケーリングを必要とせず、非線形性や特徴量間の相互作用を捉えることができるため、広く使用されています。ランダムフォレストやブースティングなどのツリーアンサンブルアルゴリズムは、分類および回帰タスクでトップクラスのパフォーマンスを発揮します。

spark.mllib は、バイナリおよびマルチクラス分類、回帰に対して、連続特徴量とカテゴリカル特徴量の両方を使用した決定木をサポートします。この実装では、データを行ごとにパーティション分割することで、数百万インスタンスでの分散トレーニングが可能になります。

ツリーのアンサンブル(ランダムフォレストおよび勾配ブースティングツリー)については、アンサンブルガイドで説明しています。

基本アルゴリズム

決定木は、特徴空間の再帰的な二項分割を実行する貪欲なアルゴリズムです。ツリーは、各最下位(リーフ)パーティションに対して同じラベルを予測します。各パーティションは、ツリーノードでの情報利得を最大化するように、可能な分割のセットから**最適な分割**を選択することによって貪欲に選択されます。言い換えると、各ツリーノードで選択される分割は、分割 $s$ がデータセット $D$ に適用されたときの情報利得を $IG(D,s)$ とすると、 $\underset{s}{\operatorname{argmax}} IG(D,s)$ のセットから選択されます。

ノードの不純度と情報利得

**ノードの不純度**は、ノードでのラベルの均一性の尺度です。現在の実装では、分類用に2つの不純度尺度(ジニ不純度とエントロピー)、回帰用に1つの不純度尺度(分散)が提供されています。

不純度タスク説明
ジニ不純度 分類 $\sum_{i=1}^{C} f_i(1-f_i)$$f_i$ はノードでのラベル $i$ の頻度、$C$ は一意のラベルの数です。
エントロピー 分類 $\sum_{i=1}^{C} -f_ilog(f_i)$$f_i$ はノードでのラベル $i$ の頻度、$C$ は一意のラベルの数です。
分散 回帰 $\frac{1}{N} \sum_{i=1}^{N} (y_i - \mu)^2$$y_i$ はインスタンスのラベル、$N$ はインスタンスの数、$\mu$ は $\frac{1}{N} \sum_{i=1}^N y_i$ によって与えられる平均です。

**情報利得**は、親ノードの不純度と、2つの子ノードの不純度の重み付き合計との差です。分割 $s$ がサイズ $N$ のデータセット $D$ を、それぞれサイズ $N_{left}$ および $N_{right}$ の2つのデータセット $D_{left}$ および $D_{right}$ に分割すると仮定すると、情報利得は次のようになります。

$IG(D,s) = Impurity(D) - \frac{N_{left}}{N} Impurity(D_{left}) - \frac{N_{right}}{N} Impurity(D_{right})$

分割候補

連続特徴量

単一マシン実装の小規模データセットの場合、各連続特徴量の分割候補は通常、その特徴量のユニークな値です。一部の実装では、特徴量値をソートし、ソートされたユニークな値を分割候補として使用することで、ツリー計算を高速化します。

大規模分散データセットの場合、特徴量値のソートはコストがかかります。この実装では、データのサンプリングされた一部に対して量子化計算を実行することで、分割候補の近似セットを計算します。ソートされた分割は「ビン」を作成し、このようなビンの最大数は maxBins パラメータを使用して指定できます。

ビンの数はインスタンス数 $N$ を超えることはできません(デフォルトの maxBins 値は32であるため、これはまれなケースです)。ツリーアルゴリズムは、条件が満たされない場合、ビンの数を自動的に減らします。

カテゴリカル特徴量

$M$ 個の可能な値(カテゴリ)を持つカテゴリカル特徴量の場合、$2^{M-1}-1$ 個の分割候補が考えられます。バイナリ(0/1)分類および回帰では、カテゴリカル特徴量値を平均ラベルでソートすることにより、分割候補の数を $M-1$ に減らすことができます。(詳細は、Elements of Statistical Machine Learning のセクション9.2.4を参照してください。)たとえば、3つのカテゴリA、B、Cを持ち、対応するラベル1の割合が0.2、0.6、0.4であるバイナリ分類問題では、カテゴリカル特徴量はA、C、Bの順にソートされます。2つの分割候補は A | C, B および A , C | B です。ここで | は分割を示します。

マルチクラス分類では、可能なすべての $2^{M-1}-1$ 個の分割が可能な限り使用されます。 $2^{M-1}-1$ が maxBins パラメータより大きい場合、(ヒューリスティックな)方法がバイナリ分類および回帰に使用される方法と同様に使用されます。 $M$ 個のカテゴリカル特徴量値は不純度でソートされ、結果として得られる $M-1$ 個の分割候補が検討されます。

停止ルール

再帰的なツリー構築は、以下のいずれかの条件が満たされたときにノードで停止します。

  1. ノードの深さが maxDepth トレーニングパラメータに等しい。
  2. 情報利得が minInfoGain より大きい分割候補がない。
  3. 情報利得が minInfoGain より大きい分割候補がない。

使用上のヒント

いくつかのガイドラインを、さまざまなパラメータを議論することで決定木の使用について示します。パラメータは、重要度の降順でおおよそリストされています。新規ユーザーは、主に「問題仕様パラメータ」セクションと maxDepth パラメータを検討してください。

問題仕様パラメータ

これらのパラメータは、解決したい問題とデータセットを記述します。これらは指定する必要があり、チューニングは必要ありません。

停止基準

これらのパラメータは、ツリーの構築(新しいノードの追加)をいつ停止するかを決定します。これらのパラメータをチューニングする際は、過学習を避けるために、分離されたテストデータで検証するように注意してください。

調整可能なパラメータ

これらのパラメータはチューニング可能です。過学習を避けるために、チューニング時には分離されたテストデータで検証するように注意してください。

キャッシングとチェックポインティング

MLlib 1.2 では、より大規模な(より深い)ツリーおよびツリーアンサンブルへのスケーリングのためのいくつかの機能が追加されました。maxDepth が大きく設定されている場合、ノードIDキャッシュとチェックポインティングを有効にすると役立ちます。これらのパラメータは、 numTrees が大きく設定されている場合の RandomForest にも役立ちます。

ノードIDキャッシングは、RDDのシーケンス(イテレーションごとに1つ)を生成します。この長い lineage はパフォーマンスの問題を引き起こす可能性がありますが、中間RDDをチェックポイントすることでこれらの問題を軽減できます。チェックポインティングは、 useNodeIdCache が true に設定されている場合にのみ適用されることに注意してください。

スケーリング

計算は、トレーニングインスタンスの数、特徴量の数、および maxBins パラメータに対してほぼ線形にスケールします。通信は、特徴量の数と maxBins に対してほぼ線形にスケールします。

実装されたアルゴリズムは、スパースデータと密データの両方を読み取ります。ただし、スパース入力用に最適化されていません。

分類

以下の例は、 LIBSVMデータファイルをロードし、それを LabeledPoint のRDDとして解析し、ジニ不純度を不純度尺度とし、最大ツリー深度を5として、決定木を使用して分類を実行する方法を示しています。テストエラーは、アルゴリズムの精度を測定するために計算されます。

APIの詳細については、 DecisionTree Python ドキュメントおよび DecisionTreeModel Python ドキュメントを参照してください。

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

# Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/mllib/decision_tree_classification_example.py」にあります。

APIの詳細については、 DecisionTree Scala ドキュメントおよび DecisionTreeModel Scala ドキュメントを参照してください。

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32

val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
  impurity, maxDepth, maxBins)

// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification tree model:\n ${model.toDebugString}")

// Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeClassificationExample.scala」にあります。

APIの詳細については、 DecisionTree Java ドキュメントおよび DecisionTreeModel Java ドキュメントを参照してください。

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;

SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];

// Set parameters.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;

// Train a DecisionTree model for classification.
DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
  categoricalFeaturesInfo, impurity, maxDepth, maxBins);

// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
  predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();

System.out.println("Test Error: " + testErr);
System.out.println("Learned classification tree model:\n" + model.toDebugString());

// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
DecisionTreeModel sameModel = DecisionTreeModel
  .load(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java」にあります。

回帰

以下の例は、 LIBSVMデータファイルをロードし、それを LabeledPoint のRDDとして解析し、分散を不純度尺度とし、最大ツリー深度を5として、決定木を使用して回帰を実行する方法を示しています。最後に平均二乗誤差(MSE)を計算して、適合度を評価します。

APIの詳細については、 DecisionTree Python ドキュメントおよび DecisionTreeModel Python ドキュメントを参照してください。

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    impurity='variance', maxDepth=5, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression tree model:')
print(model.toDebugString())

# Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/mllib/decision_tree_regression_example.py」にあります。

APIの詳細については、 DecisionTree Scala ドキュメントおよび DecisionTreeModel Scala ドキュメントを参照してください。

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "variance"
val maxDepth = 5
val maxBins = 32

val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity,
  maxDepth, maxBins)

// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case (v, p) => math.pow(v - p, 2) }.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression tree model:\n ${model.toDebugString}")

// Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRegressionExample.scala」にあります。

APIの詳細については、 DecisionTree Java ドキュメントおよび DecisionTreeModel Java ドキュメントを参照してください。

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;

SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];

// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "variance";
int maxDepth = 5;
int maxBins = 32;

// Train a DecisionTree model.
DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
  categoricalFeaturesInfo, impurity, maxDepth, maxBins);

// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
  double diff = pl._1() - pl._2();
  return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression tree model:\n" + model.toDebugString());

// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
DecisionTreeModel sameModel = DecisionTreeModel
  .load(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java」にあります。