決定木 - RDDベースAPI
決定木とそのアンサンブルは、分類および回帰の機械学習タスクで人気のある手法です。決定木は、解釈が容易で、カテゴリカル特徴量を処理でき、マルチクラス分類設定に拡張でき、特徴量スケーリングを必要とせず、非線形性や特徴量間の相互作用を捉えることができるため、広く使用されています。ランダムフォレストやブースティングなどのツリーアンサンブルアルゴリズムは、分類および回帰タスクでトップクラスのパフォーマンスを発揮します。
spark.mllib は、バイナリおよびマルチクラス分類、回帰に対して、連続特徴量とカテゴリカル特徴量の両方を使用した決定木をサポートします。この実装では、データを行ごとにパーティション分割することで、数百万インスタンスでの分散トレーニングが可能になります。
ツリーのアンサンブル(ランダムフォレストおよび勾配ブースティングツリー)については、アンサンブルガイドで説明しています。
基本アルゴリズム
決定木は、特徴空間の再帰的な二項分割を実行する貪欲なアルゴリズムです。ツリーは、各最下位(リーフ)パーティションに対して同じラベルを予測します。各パーティションは、ツリーノードでの情報利得を最大化するように、可能な分割のセットから**最適な分割**を選択することによって貪欲に選択されます。言い換えると、各ツリーノードで選択される分割は、分割 $s$ がデータセット $D$ に適用されたときの情報利得を $IG(D,s)$ とすると、 $\underset{s}{\operatorname{argmax}} IG(D,s)$ のセットから選択されます。
ノードの不純度と情報利得
**ノードの不純度**は、ノードでのラベルの均一性の尺度です。現在の実装では、分類用に2つの不純度尺度(ジニ不純度とエントロピー)、回帰用に1つの不純度尺度(分散)が提供されています。
| 不純度 | タスク | 式 | 説明 |
|---|---|---|---|
| ジニ不純度 | 分類 | $\sum_{i=1}^{C} f_i(1-f_i)$ | $f_i$ はノードでのラベル $i$ の頻度、$C$ は一意のラベルの数です。 |
| エントロピー | 分類 | $\sum_{i=1}^{C} -f_ilog(f_i)$ | $f_i$ はノードでのラベル $i$ の頻度、$C$ は一意のラベルの数です。 |
| 分散 | 回帰 | $\frac{1}{N} \sum_{i=1}^{N} (y_i - \mu)^2$ | $y_i$ はインスタンスのラベル、$N$ はインスタンスの数、$\mu$ は $\frac{1}{N} \sum_{i=1}^N y_i$ によって与えられる平均です。 |
**情報利得**は、親ノードの不純度と、2つの子ノードの不純度の重み付き合計との差です。分割 $s$ がサイズ $N$ のデータセット $D$ を、それぞれサイズ $N_{left}$ および $N_{right}$ の2つのデータセット $D_{left}$ および $D_{right}$ に分割すると仮定すると、情報利得は次のようになります。
$IG(D,s) = Impurity(D) - \frac{N_{left}}{N} Impurity(D_{left}) - \frac{N_{right}}{N} Impurity(D_{right})$
分割候補
連続特徴量
単一マシン実装の小規模データセットの場合、各連続特徴量の分割候補は通常、その特徴量のユニークな値です。一部の実装では、特徴量値をソートし、ソートされたユニークな値を分割候補として使用することで、ツリー計算を高速化します。
大規模分散データセットの場合、特徴量値のソートはコストがかかります。この実装では、データのサンプリングされた一部に対して量子化計算を実行することで、分割候補の近似セットを計算します。ソートされた分割は「ビン」を作成し、このようなビンの最大数は maxBins パラメータを使用して指定できます。
ビンの数はインスタンス数 $N$ を超えることはできません(デフォルトの maxBins 値は32であるため、これはまれなケースです)。ツリーアルゴリズムは、条件が満たされない場合、ビンの数を自動的に減らします。
カテゴリカル特徴量
$M$ 個の可能な値(カテゴリ)を持つカテゴリカル特徴量の場合、$2^{M-1}-1$ 個の分割候補が考えられます。バイナリ(0/1)分類および回帰では、カテゴリカル特徴量値を平均ラベルでソートすることにより、分割候補の数を $M-1$ に減らすことができます。(詳細は、Elements of Statistical Machine Learning のセクション9.2.4を参照してください。)たとえば、3つのカテゴリA、B、Cを持ち、対応するラベル1の割合が0.2、0.6、0.4であるバイナリ分類問題では、カテゴリカル特徴量はA、C、Bの順にソートされます。2つの分割候補は A | C, B および A , C | B です。ここで | は分割を示します。
マルチクラス分類では、可能なすべての $2^{M-1}-1$ 個の分割が可能な限り使用されます。 $2^{M-1}-1$ が maxBins パラメータより大きい場合、(ヒューリスティックな)方法がバイナリ分類および回帰に使用される方法と同様に使用されます。 $M$ 個のカテゴリカル特徴量値は不純度でソートされ、結果として得られる $M-1$ 個の分割候補が検討されます。
停止ルール
再帰的なツリー構築は、以下のいずれかの条件が満たされたときにノードで停止します。
- ノードの深さが
maxDepthトレーニングパラメータに等しい。 - 情報利得が
minInfoGainより大きい分割候補がない。 - 情報利得が
minInfoGainより大きい分割候補がない。
使用上のヒント
いくつかのガイドラインを、さまざまなパラメータを議論することで決定木の使用について示します。パラメータは、重要度の降順でおおよそリストされています。新規ユーザーは、主に「問題仕様パラメータ」セクションと maxDepth パラメータを検討してください。
問題仕様パラメータ
これらのパラメータは、解決したい問題とデータセットを記述します。これらは指定する必要があり、チューニングは必要ありません。
-
algo:決定木のタイプ。ClassificationまたはRegressionです。 -
numClasses:(Classificationのみ)クラスの数。 -
categoricalFeaturesInfo:どの特徴量がカテゴリカルであるか、およびそれらの各特徴量が取りうるカテゴリカル値の数を示します。これは、特徴量インデックスから特徴量のアリティ(カテゴリ数)へのマップとして与えられます。このマップに含まれていない特徴量は、連続として扱われます。- たとえば、
Map(0 -> 2, 4 -> 10)は、特徴量0がバイナリ(値0または1を取る)であり、特徴量4が10個のカテゴリ(値{0, 1, ..., 9})を持つことを示します。特徴量インデックスは0から始まります。特徴量0と4は、インスタンスの特徴量ベクトルの1番目と5番目の要素です。 categoricalFeaturesInfoを指定する必要はないことに注意してください。アルゴリズムはそれでも実行され、合理的な結果が得られる可能性があります。ただし、カテゴリカル特徴量が適切に指定されている場合、パフォーマンスは向上するはずです。
- たとえば、
停止基準
これらのパラメータは、ツリーの構築(新しいノードの追加)をいつ停止するかを決定します。これらのパラメータをチューニングする際は、過学習を避けるために、分離されたテストデータで検証するように注意してください。
-
maxDepth:ツリーの最大深度。深いツリーはより表現力豊か(潜在的に精度が高くなる可能性がある)ですが、トレーニングコストが高く、過学習しやすくなります。 -
minInstancesPerNode:ノードをさらに分割するためには、その子ノードごとに少なくともこの数のトレーニングインスタンスを受け取る必要があります。これは、RandomForest でよく使用されます。なぜなら、それらは個々のツリーよりも深くトレーニングされることが多いためです。 -
minInfoGain:ノードをさらに分割するためには、分割によって(情報利得の観点から)少なくともこの量だけ改善する必要があります。
調整可能なパラメータ
これらのパラメータはチューニング可能です。過学習を避けるために、チューニング時には分離されたテストデータで検証するように注意してください。
maxBins:連続特徴量を離散化する際に使用されるビンの数。maxBinsを増やすと、アルゴリズムはより多くの分割候補を検討し、きめ細かな分割決定を行うことができます。ただし、計算と通信も増加します。maxBinsパラメータは、任意のカテゴリカル特徴量に対して、カテゴリの最大数 $M$ 以上である必要があることに注意してください。
maxMemoryInMB:統計情報を収集するために使用されるメモリ量。- デフォルト値は、決定アルゴリズムがほとんどのシナリオで機能するように、保守的に256 MiBに設定されています。
maxMemoryInMBを増やすと、より少ないデータパスを許可することで、トレーニングが高速になる可能性があります(メモリが利用可能な場合)。ただし、各イテレーションでの通信量がmaxMemoryInMBに比例する可能性があるため、maxMemoryInMBが大きくなるにつれて収穫逓減が生じる可能性があります。 - 実装上の詳細:高速化のため、決定木アルゴリズムは、1ノードずつではなく、分割対象のノードのグループに関する統計情報を収集します。1つのグループで処理できるノードの数は、メモリ要件(特徴量によって異なります)によって決まります。
maxMemoryInMBパラメータは、各ワーカーがこれらの統計情報に使用できるメガバイト単位のメモリ制限を指定します。
- デフォルト値は、決定アルゴリズムがほとんどのシナリオで機能するように、保守的に256 MiBに設定されています。
-
subsamplingRate:決定木学習に使用されるトレーニングデータの割合。このパラメータは、ツリーのアンサンブル(RandomForestおよびGradientBoostedTreesを使用)のトレーニングに最も関連性があり、元のデータをサブサンプリングするのに役立つ場合があります。単一の決定木をトレーニングする場合、トレーニングインスタンスの数が通常主な制約ではないため、このパラメータの有用性は低くなります。 impurity:候補分割の選択に使用される不純度尺度(上記で説明)。この尺度は、algoパラメータと一致する必要があります。
キャッシングとチェックポインティング
MLlib 1.2 では、より大規模な(より深い)ツリーおよびツリーアンサンブルへのスケーリングのためのいくつかの機能が追加されました。maxDepth が大きく設定されている場合、ノードIDキャッシュとチェックポインティングを有効にすると役立ちます。これらのパラメータは、 numTrees が大きく設定されている場合の RandomForest にも役立ちます。
useNodeIdCache:これが true に設定されている場合、アルゴリズムは各イテレーションで現在のモデル(ツリーまたはツリー群)をエグゼキュータに渡すことを回避します。- これは、深いツリー(ワーカーでの計算を高速化)や大規模なランダムフォレスト(各イテレーションでの通信を削減)で役立つ場合があります。
- 実装上の詳細:デフォルトでは、アルゴリズムは現在のモデルをエグゼキュータに通信して、エグゼキュータがトレーニングインスタンスとツリーノードを一致させられるようにします。この設定が有効になっている場合、アルゴリズムは代わりにこの情報をキャッシュします。
ノードIDキャッシングは、RDDのシーケンス(イテレーションごとに1つ)を生成します。この長い lineage はパフォーマンスの問題を引き起こす可能性がありますが、中間RDDをチェックポイントすることでこれらの問題を軽減できます。チェックポインティングは、 useNodeIdCache が true に設定されている場合にのみ適用されることに注意してください。
-
checkpointDir:ノードIDキャッシュRDDをチェックポイントするためのディレクトリ。 -
checkpointInterval:ノードIDキャッシュRDDのチェックポイント間隔。これを低く設定しすぎると、HDFSへの書き込みによるオーバーヘッドが増加します。高く設定しすぎると、エグゼキュータが失敗した場合にRDDを再計算する必要がある場合に問題が発生する可能性があります。
スケーリング
計算は、トレーニングインスタンスの数、特徴量の数、および maxBins パラメータに対してほぼ線形にスケールします。通信は、特徴量の数と maxBins に対してほぼ線形にスケールします。
実装されたアルゴリズムは、スパースデータと密データの両方を読み取ります。ただし、スパース入力用に最適化されていません。
例
分類
以下の例は、 LIBSVMデータファイルをロードし、それを LabeledPoint のRDDとして解析し、ジニ不純度を不純度尺度とし、最大ツリー深度を5として、決定木を使用して分類を実行する方法を示しています。テストエラーは、アルゴリズムの精度を測定するために計算されます。
APIの詳細については、 DecisionTree Python ドキュメントおよび DecisionTreeModel Python ドキュメントを参照してください。
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
impurity='gini', maxDepth=5, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")APIの詳細については、 DecisionTree Scala ドキュメントおよび DecisionTreeModel Scala ドキュメントを参照してください。
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a DecisionTree model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification tree model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")APIの詳細については、 DecisionTree Java ドキュメントおよび DecisionTreeModel Java ドキュメントを参照してください。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;
// Train a DecisionTree model for classification.
DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification tree model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
DecisionTreeModel sameModel = DecisionTreeModel
.load(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");回帰
以下の例は、 LIBSVMデータファイルをロードし、それを LabeledPoint のRDDとして解析し、分散を不純度尺度とし、最大ツリー深度を5として、決定木を使用して回帰を実行する方法を示しています。最後に平均二乗誤差(MSE)を計算して、適合度を評価します。
APIの詳細については、 DecisionTree Python ドキュメントおよび DecisionTreeModel Python ドキュメントを参照してください。
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
impurity='variance', maxDepth=5, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression tree model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")APIの詳細については、 DecisionTree Scala ドキュメントおよび DecisionTreeModel Scala ドキュメントを参照してください。
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a DecisionTree model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "variance"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity,
maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case (v, p) => math.pow(v - p, 2) }.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression tree model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")APIの詳細については、 DecisionTree Java ドキュメントおよび DecisionTreeModel Java ドキュメントを参照してください。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "variance";
int maxDepth = 5;
int maxBins = 32;
// Train a DecisionTree model.
DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
double diff = pl._1() - pl._2();
return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression tree model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
DecisionTreeModel sameModel = DecisionTreeModel
.load(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");