決定木 - RDDベースのAPI

決定木とそのアンサンブルは、分類と回帰の機械学習タスクで人気のある手法です。決定木は解釈が容易で、カテゴリカルな特徴量を処理し、多クラス分類の設定に拡張でき、特徴量のスケーリングを必要とせず、非線形性と特徴量の相互作用を捉えることができるため、広く使用されています。ランダムフォレストやブースティングなどのツリーアンサンブルアルゴリズムは、分類と回帰タスクでトップレベルのパフォーマンスを示すものの1つです。

spark.mllibは、連続変数とカテゴリカルな特徴量の両方を使用して、2値分類、多クラス分類、回帰のための決定木をサポートしています。この実装は行ごとにデータを分割することで、数百万件のインスタンスを用いた分散トレーニングを可能にしています。

ツリーのアンサンブル(ランダムフォレストと勾配ブースティングツリー)については、アンサンブルガイドで説明しています。

基本アルゴリズム

決定木は、特徴空間を再帰的に2分木に分割する貪欲アルゴリズムです。ツリーは、最下部(葉)の各パーティションに対して同じラベルを予測します。各パーティションは、ツリーノードの情報利得を最大化するために、可能な分割のセットから最適な分割を選択することによって貪欲的に選択されます。言い換えれば、各ツリーノードで選択される分割は、$\underset{s}{\operatorname{argmax}} IG(D,s)$のセットから選択され、ここで$IG(D,s)$は、分割$s$をデータセット$D$に適用した場合の情報利得です。

ノードの不純度と情報利得

ノードの不純度は、ノードにおけるラベルの均質性の尺度です。現在の実装では、分類には2つの不純度尺度(ジニ不純度とエントロピー)、回帰には1つの不純度尺度(分散)を提供しています。

不純度タスク数式説明
ジニ不純度 分類 $\sum_{i=1}^{C} f_i(1-f_i)$$f_i$はノードにおけるラベル$i$の頻度、$C$は一意なラベルの数です。
エントロピー 分類 $\sum_{i=1}^{C} -f_ilog(f_i)$$f_i$はノードにおけるラベル$i$の頻度、$C$は一意なラベルの数です。
分散 回帰 $\frac{1}{N} \sum_{i=1}^{N} (y_i - \mu)^2$$y_i$はインスタンスのラベル、$N$はインスタンスの数、$\mu$は$\frac{1}{N} \sum_{i=1}^N y_i$で与えられる平均です。

情報利得は、親ノードの不純度と、2つの子ノードの不純度の重み付き和の差です。分割$s$がサイズ$N$のデータセット$D$をサイズ$N_{left}$$N_{right}$の2つのデータセット$D_{left}$$D_{right}$に分割すると仮定すると、情報利得は

$IG(D,s) = Impurity(D) - \frac{N_{left}}{N} Impurity(D_{left}) - \frac{N_{right}}{N} Impurity(D_{right})$

分割候補

連続変数

単一マシン実装における小規模データセットの場合、各連続変数の分割候補は、通常、その特徴の固有値です。一部の実装では、特徴値をソートし、ソートされた固有値を分割候補として使用して、ツリー計算を高速化します。

大規模な分散データセットでは、特徴値のソートはコストがかかります。この実装では、データのサンプルされた一部に対して分位数計算を実行することで、近似的な分割候補のセットを計算します。ソートされた分割は「ビン」を作成し、そのようなビンの最大数はmaxBinsパラメータを使用して指定できます。

ビンの数はインスタンス数$N$を超えることはできません(デフォルトのmaxBins値は32であるため、まれなシナリオです)。ツリーアルゴリズムは、この条件が満たされない場合、ビンの数を自動的に減らします。

カテゴリカルな特徴量

$M$個の可能な値(カテゴリ)を持つカテゴリカルな特徴量の場合、$2^{M-1}-1$個の分割候補を考案できます。2値(0/1)分類と回帰では、平均ラベルによってカテゴリカルな特徴量値を並べ替えることで、分割候補の数を$M-1$に減らすことができます。(詳細は、Elements of Statistical Machine Learningの9.2.4節を参照してください)。たとえば、ラベル1の割合が0.2、0.6、0.4である3つのカテゴリA、B、Cを持つカテゴリカルな特徴量1つを持つ2値分類問題では、カテゴリカルな特徴量はA、C、Bの順に並べ替えられます。2つの分割候補はA | C、BとA、C | Bです。ここで|は分割を表します。

多クラス分類では、可能な限り常に$2^{M-1}-1$個の可能な分割が使用されます。$2^{M-1}-1$maxBinsパラメータより大きい場合、2値分類と回帰で使用される方法と同様の(ヒューリスティックな)方法を使用します。$M$個のカテゴリカルな特徴量値は不純度によって並べ替えられ、結果として得られる$M-1$個の分割候補が考慮されます。

停止規則

次の条件のいずれかが満たされると、再帰的なツリー構築はノードで停止します。

  1. ノードの深さがトレーニングパラメータmaxDepthと等しくなります。
  2. minInfoGainより大きい情報利得をもたらす分割候補がありません。
  3. それぞれ少なくともminInstancesPerNode個のトレーニングインスタンスを持つ子ノードを生成する分割候補がありません。

使用上のヒント

さまざまなパラメータについて説明することで、決定木の使用に関するいくつかのガイドラインを示します。パラメータは、重要度の高い順に概ねリストされています。新しいユーザーは、主に「問題特定パラメータ」セクションとmaxDepthパラメータを検討する必要があります。

問題特定パラメータ

これらのパラメータは、解決する問題とデータセットを記述します。それらは指定する必要があり、チューニングは必要ありません。

停止基準

これらのパラメータは、ツリーの構築(新しいノードの追加)がいつ停止するかを決定します。これらのパラメータを調整する場合は、過剰適合を避けるために、保持されたテストデータで検証するように注意してください。

調整可能なパラメータ

これらのパラメーターは調整できます。過学習を避けるために、調整時は必ず保持されたテストデータで検証してください。

キャッシングとチェックポイント

MLlib 1.2 では、より大規模(より深い)なツリーとツリーアンサンブルへのスケールアップのためのいくつかの機能が追加されました。maxDepth を大きく設定する場合、ノードIDキャッシングとチェックポイントを有効にすることが役立つ場合があります。これらのパラメーターは、numTrees が大きく設定されている場合の RandomForest にも役立ちます。

ノードIDキャッシングは、RDDのシーケンス(反復ごとに1つ)を生成します。この長い系譜はパフォーマンスの問題を引き起こす可能性がありますが、中間RDDのチェックポイント化によってこれらの問題を軽減できます。チェックポイント化は、useNodeIdCache が true に設定されている場合にのみ適用可能であることに注意してください。

スケーリング

計算量は、トレーニングインスタンスの数、特徴の数、maxBins パラメーターにほぼ線形に比例してスケールします。通信量は、特徴の数と maxBins にほぼ線形に比例してスケールします。

実装されたアルゴリズムは、スパースデータとデンスデータの両方を読み取ります。ただし、スパース入力に対して最適化されていません。

分類

以下の例は、LIBSVMデータファイル を読み込み、LabeledPoint のRDDとして解析し、ジニ不純度を不純度尺度として、最大ツリー深さを5として決定木を使用して分類を実行する方法を示しています。テストエラーを計算して、アルゴリズムの精度を測定します。

APIの詳細については、DecisionTree Python ドキュメントDecisionTreeModel Python ドキュメント を参照してください。

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
                                     impurity='gini', maxDepth=5, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

# Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/mllib/decision_tree_classification_example.py" にあります。

APIの詳細については、DecisionTree Scala ドキュメントDecisionTreeModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32

val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
  impurity, maxDepth, maxBins)

// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification tree model:\n ${model.toDebugString}")

// Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeClassificationExample.scala" にあります。

APIの詳細については、DecisionTree Java ドキュメントDecisionTreeModel Java ドキュメント を参照してください。

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;

SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];

// Set parameters.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;

// Train a DecisionTree model for classification.
DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
  categoricalFeaturesInfo, impurity, maxDepth, maxBins);

// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
  predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();

System.out.println("Test Error: " + testErr);
System.out.println("Learned classification tree model:\n" + model.toDebugString());

// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
DecisionTreeModel sameModel = DecisionTreeModel
  .load(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeClassificationExample.java" にあります。

回帰

以下の例は、LIBSVMデータファイル を読み込み、LabeledPoint のRDDとして解析し、分散を不純度尺度として、最大ツリー深さを5として決定木を使用して回帰を実行する方法を示しています。最後に平均二乗誤差(MSE)を計算して、適合度の良さ を評価します。

APIの詳細については、DecisionTree Python ドキュメントDecisionTreeModel Python ドキュメント を参照してください。

from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils

# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a DecisionTree model.
#  Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
                                    impurity='variance', maxDepth=5, maxBins=32)

# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
    float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression tree model:')
print(model.toDebugString())

# Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/mllib/decision_tree_regression_example.py" にあります。

APIの詳細については、DecisionTree Scala ドキュメントDecisionTreeModel Scala ドキュメント を参照してください。

import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "variance"
val maxDepth = 5
val maxBins = 32

val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity,
  maxDepth, maxBins)

// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case (v, p) => math.pow(v - p, 2) }.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression tree model:\n ${model.toDebugString}")

// Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/DecisionTreeRegressionExample.scala" にあります。

APIの詳細については、DecisionTree Java ドキュメントDecisionTreeModel Java ドキュメント を参照してください。

import java.util.HashMap;
import java.util.Map;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;

SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);

// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];

// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "variance";
int maxDepth = 5;
int maxBins = 32;

// Train a DecisionTree model.
DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
  categoricalFeaturesInfo, impurity, maxDepth, maxBins);

// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
  testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
  double diff = pl._1() - pl._2();
  return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression tree model:\n" + model.toDebugString());

// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
DecisionTreeModel sameModel = DecisionTreeModel
  .load(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaDecisionTreeRegressionExample.java" にあります。