決定木 - RDDベースのAPI
決定木とそのアンサンブルは、分類と回帰の機械学習タスクで人気のある手法です。決定木は解釈が容易で、カテゴリカルな特徴量を処理し、多クラス分類の設定に拡張でき、特徴量のスケーリングを必要とせず、非線形性と特徴量の相互作用を捉えることができるため、広く使用されています。ランダムフォレストやブースティングなどのツリーアンサンブルアルゴリズムは、分類と回帰タスクでトップレベルのパフォーマンスを示すものの1つです。
spark.mllib
は、連続変数とカテゴリカルな特徴量の両方を使用して、2値分類、多クラス分類、回帰のための決定木をサポートしています。この実装は行ごとにデータを分割することで、数百万件のインスタンスを用いた分散トレーニングを可能にしています。
ツリーのアンサンブル(ランダムフォレストと勾配ブースティングツリー)については、アンサンブルガイドで説明しています。
基本アルゴリズム
決定木は、特徴空間を再帰的に2分木に分割する貪欲アルゴリズムです。ツリーは、最下部(葉)の各パーティションに対して同じラベルを予測します。各パーティションは、ツリーノードの情報利得を最大化するために、可能な分割のセットから最適な分割を選択することによって貪欲的に選択されます。言い換えれば、各ツリーノードで選択される分割は、$\underset{s}{\operatorname{argmax}} IG(D,s)$
のセットから選択され、ここで$IG(D,s)$
は、分割$s$
をデータセット$D$
に適用した場合の情報利得です。
ノードの不純度と情報利得
ノードの不純度は、ノードにおけるラベルの均質性の尺度です。現在の実装では、分類には2つの不純度尺度(ジニ不純度とエントロピー)、回帰には1つの不純度尺度(分散)を提供しています。
不純度 | タスク | 数式 | 説明 |
---|---|---|---|
ジニ不純度 | 分類 | $\sum_{i=1}^{C} f_i(1-f_i)$ | $f_i$はノードにおけるラベル$i$の頻度、$C$は一意なラベルの数です。 |
エントロピー | 分類 | $\sum_{i=1}^{C} -f_ilog(f_i)$ | $f_i$はノードにおけるラベル$i$の頻度、$C$は一意なラベルの数です。 |
分散 | 回帰 | $\frac{1}{N} \sum_{i=1}^{N} (y_i - \mu)^2$ | $y_i$はインスタンスのラベル、$N$はインスタンスの数、$\mu$は$\frac{1}{N} \sum_{i=1}^N y_i$で与えられる平均です。 |
情報利得は、親ノードの不純度と、2つの子ノードの不純度の重み付き和の差です。分割$s$がサイズ$N$
のデータセット$D$
をサイズ$N_{left}$
と$N_{right}$
の2つのデータセット$D_{left}$
と$D_{right}$
に分割すると仮定すると、情報利得は
$IG(D,s) = Impurity(D) - \frac{N_{left}}{N} Impurity(D_{left}) - \frac{N_{right}}{N} Impurity(D_{right})$
分割候補
連続変数
単一マシン実装における小規模データセットの場合、各連続変数の分割候補は、通常、その特徴の固有値です。一部の実装では、特徴値をソートし、ソートされた固有値を分割候補として使用して、ツリー計算を高速化します。
大規模な分散データセットでは、特徴値のソートはコストがかかります。この実装では、データのサンプルされた一部に対して分位数計算を実行することで、近似的な分割候補のセットを計算します。ソートされた分割は「ビン」を作成し、そのようなビンの最大数はmaxBins
パラメータを使用して指定できます。
ビンの数はインスタンス数$N$
を超えることはできません(デフォルトのmaxBins
値は32であるため、まれなシナリオです)。ツリーアルゴリズムは、この条件が満たされない場合、ビンの数を自動的に減らします。
カテゴリカルな特徴量
$M$
個の可能な値(カテゴリ)を持つカテゴリカルな特徴量の場合、$2^{M-1}-1$
個の分割候補を考案できます。2値(0/1)分類と回帰では、平均ラベルによってカテゴリカルな特徴量値を並べ替えることで、分割候補の数を$M-1$
に減らすことができます。(詳細は、Elements of Statistical Machine Learningの9.2.4節を参照してください)。たとえば、ラベル1の割合が0.2、0.6、0.4である3つのカテゴリA、B、Cを持つカテゴリカルな特徴量1つを持つ2値分類問題では、カテゴリカルな特徴量はA、C、Bの順に並べ替えられます。2つの分割候補はA | C、BとA、C | Bです。ここで|は分割を表します。
多クラス分類では、可能な限り常に$2^{M-1}-1$
個の可能な分割が使用されます。$2^{M-1}-1$
がmaxBins
パラメータより大きい場合、2値分類と回帰で使用される方法と同様の(ヒューリスティックな)方法を使用します。$M$
個のカテゴリカルな特徴量値は不純度によって並べ替えられ、結果として得られる$M-1$
個の分割候補が考慮されます。
停止規則
次の条件のいずれかが満たされると、再帰的なツリー構築はノードで停止します。
- ノードの深さがトレーニングパラメータ
maxDepth
と等しくなります。 minInfoGain
より大きい情報利得をもたらす分割候補がありません。- それぞれ少なくとも
minInstancesPerNode
個のトレーニングインスタンスを持つ子ノードを生成する分割候補がありません。
使用上のヒント
さまざまなパラメータについて説明することで、決定木の使用に関するいくつかのガイドラインを示します。パラメータは、重要度の高い順に概ねリストされています。新しいユーザーは、主に「問題特定パラメータ」セクションとmaxDepth
パラメータを検討する必要があります。
問題特定パラメータ
これらのパラメータは、解決する問題とデータセットを記述します。それらは指定する必要があり、チューニングは必要ありません。
-
algo
: 決定木のタイプ。Classification
またはRegression
のいずれか。 -
numClasses
: クラスの数(Classification
のみ)。 -
categoricalFeaturesInfo
: どの特徴量がカテゴリカルであり、それらの特徴量のそれぞれがいくつカテゴリカルな値を取ることができるかを指定します。これは、特徴量のインデックスから特徴量のアーリティ(カテゴリの数)へのマップとして与えられます。このマップにない特徴は、連続変数として扱われます。- たとえば、
Map(0 -> 2, 4 -> 10)
は、特徴量0
が2値(0
または1
の値を取る)であり、特徴量4
が10個のカテゴリ(値{0, 1, ..., 9}
)を持つことを指定します。特徴量のインデックスは0ベースであることに注意してください。特徴量0
と4
は、インスタンスの特徴ベクトルの1番目と5番目の要素です。 categoricalFeaturesInfo
を指定する必要はありません。アルゴリズムは依然として実行され、妥当な結果が得られる可能性があります。ただし、カテゴリカルな特徴量が適切に指定されている場合は、パフォーマンスが向上するはずです。
- たとえば、
停止基準
これらのパラメータは、ツリーの構築(新しいノードの追加)がいつ停止するかを決定します。これらのパラメータを調整する場合は、過剰適合を避けるために、保持されたテストデータで検証するように注意してください。
-
maxDepth
: ツリーの最大深さ。より深いツリーは表現力が高くなります(高い精度が得られる可能性があります)が、トレーニングのコストも高く、過剰適合する可能性も高くなります。 -
minInstancesPerNode
: ノードをさらに分割するには、その子ノードのそれぞれが少なくともこの数のトレーニングインスタンスを受け取る必要があります。これは、個々のツリーよりも深くトレーニングされることが多いRandomForestとよく一緒に使用されます。 -
minInfoGain
: ノードをさらに分割するには、分割によって情報利得が少なくともこの値だけ向上する必要があります。
調整可能なパラメータ
これらのパラメーターは調整できます。過学習を避けるために、調整時は必ず保持されたテストデータで検証してください。
maxBins
: 連続特徴を離散化する際に使用するビンの数。maxBins
を増やすと、アルゴリズムはより多くの分割候補を考慮し、より細かい分割の決定を行うことができます。ただし、計算量と通信量も増加します。maxBins
パラメーターは、カテゴリ型特徴のカテゴリ数の最大値$M$
以上である必要があります。
maxMemoryInMB
: 十分な統計情報を収集するために使用するメモリの量。- デフォルト値は、ほとんどのシナリオで決定アルゴリズムが動作するように、保守的に 256 MiB に設定されています。
maxMemoryInMB
を増やすと、データのスキャン回数を減らすことでトレーニングを高速化できます(メモリが利用可能な場合)。ただし、maxMemoryInMB
が増加するにつれて、効果は減少する可能性があります。これは、各反復での通信量がmaxMemoryInMB
に比例する可能性があるためです。 - 実装の詳細: 処理速度を上げるため、決定木アルゴリズムは(一度に1つのノードではなく)分割するノードのグループに関する統計情報を収集します。1つのグループで処理できるノードの数は、メモリ要件(特徴ごとに異なります)によって決まります。
maxMemoryInMB
パラメーターは、各ワーカーがこれらの統計情報に使用できるメモリ制限をメガバイト単位で指定します。
- デフォルト値は、ほとんどのシナリオで決定アルゴリズムが動作するように、保守的に 256 MiB に設定されています。
-
subsamplingRate
: 決定木の学習に使用されるトレーニングデータの割合。このパラメーターは、ツリーのアンサンブル(RandomForest
とGradientBoostedTrees
を使用)のトレーニングにおいて最も関連性が高く、元のデータをサブサンプリングするのに役立ちます。単一の決定木のトレーニングの場合、トレーニングインスタンスの数が一般的に主要な制約ではないため、このパラメーターはあまり役に立ちません。 impurity
: 候補分割の選択に使用される不純度尺度(上記で説明)。この尺度は、algo
パラメーターと一致する必要があります。
キャッシングとチェックポイント
MLlib 1.2 では、より大規模(より深い)なツリーとツリーアンサンブルへのスケールアップのためのいくつかの機能が追加されました。maxDepth
を大きく設定する場合、ノードIDキャッシングとチェックポイントを有効にすることが役立つ場合があります。これらのパラメーターは、numTrees
が大きく設定されている場合の RandomForest にも役立ちます。
useNodeIdCache
: true に設定されている場合、アルゴリズムは各反復で現在のモデル(ツリーまたはツリーの集合)を実行プログラムに渡すことを回避します。- これは、深いツリー(ワーカーでの計算の高速化)や大規模なランダムフォレスト(各反復での通信量の削減)に役立ちます。
- 実装の詳細: デフォルトでは、アルゴリズムは現在のモデルを実行プログラムに送信し、実行プログラムがトレーニングインスタンスとツリーノードを一致させることができます。この設定が有効になっている場合、アルゴリズムは代わりにこの情報をキャッシュします。
ノードIDキャッシングは、RDDのシーケンス(反復ごとに1つ)を生成します。この長い系譜はパフォーマンスの問題を引き起こす可能性がありますが、中間RDDのチェックポイント化によってこれらの問題を軽減できます。チェックポイント化は、useNodeIdCache
が true に設定されている場合にのみ適用可能であることに注意してください。
-
checkpointDir
: ノードIDキャッシュRDDのチェックポイント化のためのディレクトリ。 -
checkpointInterval
: ノードIDキャッシュRDDのチェックポイント化の頻度。これを低すぎる値に設定すると、HDFSへの書き込みによるオーバーヘッドが発生します。高すぎる値に設定すると、実行プログラムが失敗し、RDDの再計算が必要になった場合に問題が発生する可能性があります。
スケーリング
計算量は、トレーニングインスタンスの数、特徴の数、maxBins
パラメーターにほぼ線形に比例してスケールします。通信量は、特徴の数と maxBins
にほぼ線形に比例してスケールします。
実装されたアルゴリズムは、スパースデータとデンスデータの両方を読み取ります。ただし、スパース入力に対して最適化されていません。
例
分類
以下の例は、LIBSVMデータファイル を読み込み、LabeledPoint
のRDDとして解析し、ジニ不純度を不純度尺度として、最大ツリー深さを5として決定木を使用して分類を実行する方法を示しています。テストエラーを計算して、アルゴリズムの精度を測定します。
APIの詳細については、DecisionTree
Python ドキュメント と DecisionTreeModel
Python ドキュメント を参照してください。
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainClassifier(trainingData, numClasses=2, categoricalFeaturesInfo={},
impurity='gini', maxDepth=5, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testErr = labelsAndPredictions.filter(
lambda lp: lp[0] != lp[1]).count() / float(testData.count())
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
APIの詳細については、DecisionTree
Scala ドキュメント と DecisionTreeModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a DecisionTree model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainClassifier(trainingData, numClasses, categoricalFeaturesInfo,
impurity, maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelAndPreds = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testErr = labelAndPreds.filter(r => r._1 != r._2).count().toDouble / testData.count()
println(s"Test Error = $testErr")
println(s"Learned classification tree model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myDecisionTreeClassificationModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeClassificationModel")
APIの詳細については、DecisionTree
Java ドキュメント と DecisionTreeModel
Java ドキュメント を参照してください。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeClassificationExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
int numClasses = 2;
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "gini";
int maxDepth = 5;
int maxBins = 32;
// Train a DecisionTree model for classification.
DecisionTreeModel model = DecisionTree.trainClassifier(trainingData, numClasses,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testErr =
predictionAndLabel.filter(pl -> !pl._1().equals(pl._2())).count() / (double) testData.count();
System.out.println("Test Error: " + testErr);
System.out.println("Learned classification tree model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
DecisionTreeModel sameModel = DecisionTreeModel
.load(jsc.sc(), "target/tmp/myDecisionTreeClassificationModel");
回帰
以下の例は、LIBSVMデータファイル を読み込み、LabeledPoint
のRDDとして解析し、分散を不純度尺度として、最大ツリー深さを5として決定木を使用して回帰を実行する方法を示しています。最後に平均二乗誤差(MSE)を計算して、適合度の良さ を評価します。
APIの詳細については、DecisionTree
Python ドキュメント と DecisionTreeModel
Python ドキュメント を参照してください。
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.mllib.util import MLUtils
# Load and parse the data file into an RDD of LabeledPoint.
data = MLUtils.loadLibSVMFile(sc, 'data/mllib/sample_libsvm_data.txt')
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])
# Train a DecisionTree model.
# Empty categoricalFeaturesInfo indicates all features are continuous.
model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo={},
impurity='variance', maxDepth=5, maxBins=32)
# Evaluate model on test instances and compute test error
predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
testMSE = labelsAndPredictions.map(lambda lp: (lp[0] - lp[1]) * (lp[0] - lp[1])).sum() /\
float(testData.count())
print('Test Mean Squared Error = ' + str(testMSE))
print('Learned regression tree model:')
print(model.toDebugString())
# Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
APIの詳細については、DecisionTree
Scala ドキュメント と DecisionTreeModel
Scala ドキュメント を参照してください。
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))
// Train a DecisionTree model.
// Empty categoricalFeaturesInfo indicates all features are continuous.
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "variance"
val maxDepth = 5
val maxBins = 32
val model = DecisionTree.trainRegressor(trainingData, categoricalFeaturesInfo, impurity,
maxDepth, maxBins)
// Evaluate model on test instances and compute test error
val labelsAndPredictions = testData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val testMSE = labelsAndPredictions.map{ case (v, p) => math.pow(v - p, 2) }.mean()
println(s"Test Mean Squared Error = $testMSE")
println(s"Learned regression tree model:\n ${model.toDebugString}")
// Save and load model
model.save(sc, "target/tmp/myDecisionTreeRegressionModel")
val sameModel = DecisionTreeModel.load(sc, "target/tmp/myDecisionTreeRegressionModel")
APIの詳細については、DecisionTree
Java ドキュメント と DecisionTreeModel
Java ドキュメント を参照してください。
import java.util.HashMap;
import java.util.Map;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import org.apache.spark.mllib.util.MLUtils;
SparkConf sparkConf = new SparkConf().setAppName("JavaDecisionTreeRegressionExample");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
// Load and parse the data file.
String datapath = "data/mllib/sample_libsvm_data.txt";
JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(jsc.sc(), datapath).toJavaRDD();
// Split the data into training and test sets (30% held out for testing)
JavaRDD<LabeledPoint>[] splits = data.randomSplit(new double[]{0.7, 0.3});
JavaRDD<LabeledPoint> trainingData = splits[0];
JavaRDD<LabeledPoint> testData = splits[1];
// Set parameters.
// Empty categoricalFeaturesInfo indicates all features are continuous.
Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>();
String impurity = "variance";
int maxDepth = 5;
int maxBins = 32;
// Train a DecisionTree model.
DecisionTreeModel model = DecisionTree.trainRegressor(trainingData,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);
// Evaluate model on test instances and compute test error
JavaPairRDD<Double, Double> predictionAndLabel =
testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
double testMSE = predictionAndLabel.mapToDouble(pl -> {
double diff = pl._1() - pl._2();
return diff * diff;
}).mean();
System.out.println("Test Mean Squared Error: " + testMSE);
System.out.println("Learned regression tree model:\n" + model.toDebugString());
// Save and load model
model.save(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");
DecisionTreeModel sameModel = DecisionTreeModel
.load(jsc.sc(), "target/tmp/myDecisionTreeRegressionModel");