MLパイプライン
\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]
このセクションでは、MLパイプラインの概念を紹介します。MLパイプラインは、DataFrame上に構築された、ユーザーが実用的な機械学習パイプラインを作成および調整するのに役立つ、統一された一連の高レベルAPIを提供します。
目次
パイプラインの主要な概念
MLlibは、機械学習アルゴリズムのAPIを標準化し、複数のアルゴリズムを単一のパイプライン、つまりワークフローに簡単に組み合わせることができるようにします。このセクションでは、パイプラインAPIによって導入された主要な概念について説明します。パイプラインの概念は、主にscikit-learnプロジェクトに触発されています。
-
DataFrame
: このML APIは、Spark SQLのDataFrame
をMLデータセットとして使用します。これは、さまざまなデータ型を保持できます。たとえば、DataFrame
は、テキスト、特徴量ベクトル、真のラベル、および予測を格納する異なる列を持つことができます。 -
Transformer
:Transformer
は、1つのDataFrame
を別のDataFrame
に変換できるアルゴリズムです。たとえば、MLモデルは、特徴量を持つDataFrame
を、予測を持つDataFrame
に変換するTransformer
です。 -
Estimator
:Estimator
は、DataFrame
に適合させてTransformer
を生成できるアルゴリズムです。たとえば、LogisticRegression
などの学習アルゴリズムはEstimator
であり、fit()
を呼び出すと、Model
であるLogisticRegressionModel
がトレーニングされます。これはTransformer
でもあります。 -
Pipeline
:Pipeline
は、複数のTransformer
とEstimator
をチェーンして、MLワークフローを指定します。 -
Parameter
: すべてのTransformer
とEstimator
は、パラメータを指定するための共通APIを共有するようになりました。
DataFrame
機械学習は、ベクトル、テキスト、画像、構造化データなど、さまざまなデータ型に適用できます。このAPIは、さまざまなデータ型をサポートするために、Spark SQLのDataFrame
を採用しています。
DataFrame
は、多くの基本型と構造化型をサポートしています。サポートされている型のリストについては、Spark SQLデータ型リファレンスを参照してください。Spark SQLガイドにリストされている型に加えて、DataFrame
はML Vector
型を使用できます。
DataFrame
は、通常のRDD
から暗黙的または明示的に作成できます。以下のコード例と、Spark SQLプログラミングガイドの例を参照してください。
DataFrame
の列には名前が付けられています。以下のコード例では、「text」、「features」、「label」などの名前を使用しています。
パイプラインコンポーネント
Transformer
Transformer
は、特徴量変換器と学習済みモデルを含む抽象化です。技術的には、Transformer
はtransform()
メソッドを実装しており、これは1つのDataFrame
を別のDataFrame
に変換します。通常は、1つ以上の列を追加することによって変換します。例えば
- 特徴量変換器は、
DataFrame
を取得し、列(例:テキスト)を読み取り、それを新しい列(例:特徴量ベクトル)にマッピングし、マッピングされた列が追加された新しいDataFrame
を出力する可能性があります。 - 学習モデルは、
DataFrame
を取得し、特徴量ベクトルを含む列を読み取り、各特徴量ベクトルのラベルを予測し、予測されたラベルが列として追加された新しいDataFrame
を出力する可能性があります。
Estimator
Estimator
は、学習アルゴリズム、またはデータに適合またはトレーニングするアルゴリズムの概念を抽象化したものです。技術的には、Estimator
はfit()
メソッドを実装しており、これはDataFrame
を受け取り、Transformer
であるModel
を生成します。たとえば、LogisticRegression
などの学習アルゴリズムはEstimator
であり、fit()
を呼び出すと、Model
であるLogisticRegressionModel
がトレーニングされます。これはTransformer
でもあります。
パイプラインコンポーネントのプロパティ
Transformer.transform()
とEstimator.fit()
はどちらもステートレスです。将来的には、代替概念を介してステートフルアルゴリズムがサポートされる可能性があります。
Transformer
またはEstimator
の各インスタンスには、パラメータの指定に役立つ一意のIDがあります(以下で説明)。
パイプライン
機械学習では、一連のアルゴリズムを実行してデータを処理し、データから学習するのが一般的です。たとえば、単純なテキストドキュメント処理ワークフローには、いくつかの段階が含まれる場合があります
- 各ドキュメントのテキストを単語に分割します。
- 各ドキュメントの単語を数値特徴量ベクトルに変換します。
- 特徴量ベクトルとラベルを使用して予測モデルを学習します。
MLlibは、このようなワークフローをPipeline
として表します。これは、特定の順序で実行される一連のPipelineStage
(Transformer
とEstimator
)で構成されます。このセクションでは、この単純なワークフローを実行例として使用します。
仕組み
Pipeline
は、一連のステージとして指定され、各ステージはTransformer
またはEstimator
のいずれかです。これらのステージは順番に実行され、入力DataFrame
は各ステージを通過する際に変換されます。Transformer
ステージの場合、DataFrame
でtransform()
メソッドが呼び出されます。Estimator
ステージの場合、fit()
メソッドが呼び出されてTransformer
(PipelineModel
、つまり適合したPipeline
の一部になります)が生成され、そのTransformer
のtransform()
メソッドがDataFrame
で呼び出されます。
単純なテキストドキュメントワークフローについて、これを説明します。以下の図は、Pipeline
の*トレーニング時*の使用法を示しています。
上記の最上段は、3つのステージを持つPipeline
を表しています。最初の2つ(Tokenizer
とHashingTF
)はTransformer
(青色)であり、3つ目(LogisticRegression
)はEstimator
(赤色)です。下段はパイプラインを流れるデータを表しており、円柱はDataFrame
を示しています。Pipeline.fit()
メソッドは、生のテキスト文書とラベルを持つ元のDataFrame
に対して呼び出されます。Tokenizer.transform()
メソッドは、生のテキスト文書を単語に分割し、単語を含む新しい列をDataFrame
に追加します。HashingTF.transform()
メソッドは、単語の列を特徴ベクトルに変換し、それらのベクトルを含む新しい列をDataFrame
に追加します。ここで、LogisticRegression
はEstimator
であるため、Pipeline
は最初にLogisticRegression.fit()
を呼び出してLogisticRegressionModel
を生成します。Pipeline
にさらにEstimator
がある場合、DataFrame
を次のステージに渡す前に、LogisticRegressionModel
のtransform()
メソッドをDataFrame
に対して呼び出します。
Pipeline
はEstimator
です。したがって、Pipeline
のfit()
メソッドが実行された後、Transformer
であるPipelineModel
が生成されます。このPipelineModel
は*テスト時*に使用されます。以下の図はこの使用方法を示しています。
上の図では、PipelineModel
は元のPipeline
と同じ数のステージを持っていますが、元のPipeline
のすべてのEstimator
はTransformer
になっています。PipelineModel
のtransform()
メソッドがテストデータセットに対して呼び出されると、データは適合されたパイプラインを順番に通過します。各ステージのtransform()
メソッドはデータセットを更新し、次のステージに渡します。
Pipeline
とPipelineModel
は、トレーニングデータとテストデータが同じ特徴量処理手順を経ることを保証するのに役立ちます。
詳細
DAG Pipeline
:Pipeline
のステージは、順序付けられた配列として指定されます。ここで示した例はすべて線形Pipeline
、つまり各ステージが前のステージによって生成されたデータを使用するPipeline
です。データフローグラフが有向非巡回グラフ(DAG)を形成する限り、非線形Pipeline
を作成することが可能です。このグラフは現在、各ステージの入力および出力列名(一般的にパラメータとして指定されます)に基づいて暗黙的に指定されます。Pipeline
がDAGを形成する場合、ステージはトポロジカル順に指定する必要があります。
実行時チェック:Pipeline
はさまざまなタイプのDataFrame
を操作できるため、コンパイル時型チェックを使用できません。Pipeline
とPipelineModel
は、代わりにPipeline
を実際に実行する前に実行時チェックを行います。この型チェックは、DataFrame
の*スキーマ*、つまりDataFrame
の列のデータ型の記述を使用して行われます。
一意のPipelineステージ:Pipeline
のステージは一意のインスタンスである必要があります。たとえば、Pipeline
ステージは一意のIDを持つ必要があるため、同じインスタンスmyHashingTF
をPipeline
に2回挿入しないでください。ただし、異なるインスタンスmyHashingTF1
とmyHashingTF2
(どちらもHashingTF
型)は、異なるインスタンスが異なるIDで作成されるため、同じPipeline
に入れることができます。
パラメータ
MLlibのEstimator
とTransformer
は、パラメータを指定するための統一APIを使用します。
Param
は、自己完結型のドキュメントを持つ名前付きパラメータです。ParamMap
は、(パラメータ、値)のペアのセットです。
アルゴリズムにパラメータを渡すには、主に2つの方法があります。
- インスタンスのパラメータを設定します。たとえば、
lr
がLogisticRegression
のインスタンスである場合、lr.setMaxIter(10)
を呼び出して、lr.fit()
が最大10回の反復を使用するようにすることができます。このAPIは、spark.mllib
パッケージで使用されるAPIに似ています。 ParamMap
をfit()
またはtransform()
に渡します。ParamMap
内のパラメータは、セッターメソッドを介して以前に指定されたパラメータをオーバーライドします。
パラメータは、Estimator
とTransformer
の特定のインスタンスに属します。たとえば、2つのLogisticRegression
インスタンスlr1
とlr2
がある場合、両方のmaxIter
パラメータが指定されたParamMap
を構築できます:ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
。これは、Pipeline
にmaxIter
パラメータを持つアルゴリズムが2つある場合に便利です。
ML永続性: パイプラインの保存と読み込み
多くの場合、後で使用するためにモデルまたはパイプラインをディスクに保存する価値があります。Spark 1.6では、モデルのインポート/エクスポート機能がPipeline APIに追加されました。Spark 2.3以降、spark.ml
およびpyspark.ml
のDataFrameベースのAPIは完全に網羅されています。
MLの永続性は、Scala、Java、Pythonで機能します。ただし、Rは現在変更された形式を使用しているため、Rに保存されたモデルはRでのみロードできます。これは将来修正される予定であり、SPARK-15572で追跡されています。
ML永続性の下位互換性
一般的に、MLlibはMLの永続性について後方互換性を維持します。つまり、SparkのあるバージョンでMLモデルまたはPipelineを保存した場合、将来のバージョンのSparkでそれをロードして使用できるはずです。ただし、まれに例外があり、以下に説明します。
モデルの永続性:SparkバージョンXでApache Spark ML永続性を使用して保存されたモデルまたはPipelineは、SparkバージョンYでロードできますか?
- メジャーバージョン:保証はありませんが、ベストエフォートです。
- マイナーバージョンとパッチバージョン:はい。これらは後方互換性があります。
- 形式に関する注意:安定した永続性形式の保証はありませんが、モデルのロード自体は後方互換性を持つように設計されています。
モデルの動作:SparkバージョンXのモデルまたはPipelineは、SparkバージョンYで同じように動作しますか?
- メジャーバージョン:保証はありませんが、ベストエフォートです。
- マイナーバージョンとパッチバージョン:バグ修正を除いて、同じ動作です。
モデルの永続性とモデルの動作の両方について、マイナーバージョンまたはパッチバージョン間の破壊的な変更は、Sparkバージョンのリリースノートに報告されています。リリースノートに破損が報告されていない場合、修正されるべきバグとして扱う必要があります。
コード例
このセクションでは、上記の機能を示すコード例を示します。詳細については、APIドキュメント(Scala、Java、Python)を参照してください。
例: Estimator、Transformer、およびParam
この例では、Estimator
、Transformer
、Param
の概念について説明します。
APIの詳細については、Estimator
Pythonドキュメント、Transformer
Pythonドキュメント、Params
Pythonドキュメントを参照してください。
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)
# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30 # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55}) # type: ignore
# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2) # type: ignore
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())
# Prepare test data
test = spark.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
.collect()
for row in result:
print("features=%s, label=%s -> prob=%s, prediction=%s"
% (row.features, row.label, row.myProbability, row.prediction))
APIの詳細については、Estimator
Scalaドキュメント、Transformer
Scalaドキュメント、Params
Scalaドキュメントを参照してください。
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
.put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.
val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")
// Prepare test data.
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
APIの詳細については、Estimator
Javaドキュメント、Transformer
Javaドキュメント、Params
Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Prepare training data.
List<Row> dataTraining = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);
// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");
// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);
// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
.put(lr.maxIter().w(20)) // Specify 1 Param.
.put(lr.maxIter(), 30) // This overwrites the original maxIter.
.put(lr.regParam().w(0.1), lr.threshold().w(0.55)); // Specify multiple Params.
// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
.put(lr.probabilityCol().w("myProbability")); // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
// Prepare test documents.
List<Row> dataTest = Arrays.asList(
RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
例: パイプライン
この例では、上記の図に示されている単純なテキストドキュメントPipeline
に従います。
APIの詳細については、Pipeline
Pythonドキュメントを参照してください。
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print(
"(%d, %s) --> prob=%s, prediction=%f" % (
rid, text, str(prob), prediction # type: ignore
)
)
APIの詳細については、Pipeline
Scalaドキュメントを参照してください。
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training)
// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")
// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")
// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")
// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")
// Make predictions on test documents.
model.transform(test)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
APIの詳細については、Pipeline
Javaドキュメントを参照してください。
import java.util.Arrays;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
new JavaLabeledDocument(1L, "b d", 0.0),
new JavaLabeledDocument(2L, "spark f g h", 1.0),
new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);
// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
new JavaDocument(4L, "spark i j k"),
new JavaDocument(5L, "l m n"),
new JavaDocument(6L, "spark hadoop spark"),
new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);
// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
モデル選択 (ハイパーパラメータチューニング)
MLパイプラインを使う大きなメリットとして、ハイパーパラメータの最適化が挙げられます。自動モデル選択について詳しくは、MLチューニングガイドをご覧ください。