ML Pipelines

\[ \newcommand{\R}{\mathbb{R}} \newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} \newcommand{\av}{\mathbf{\alpha}} \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} \newcommand{\ind}{\mathbf{1}} \newcommand{\0}{\mathbf{0}} \newcommand{\unit}{\mathbf{e}} \newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]

このセクションでは、ML Pipelines の概念を紹介します。ML Pipelinesは、DataFrames 上に構築された統一された高レベルAPIセットを提供し、ユーザーが実用的な機械学習パイプラインを作成・調整するのを支援します。

目次

パイプラインの主要概念

MLlibは機械学習アルゴリズムのAPIを標準化しており、複数のアルゴリズムを単一のパイプラインまたはワークフローに簡単に組み合わせることができます。このセクションでは、Pipelines APIによって導入された主要な概念について説明します。パイプラインの概念は主にscikit-learnプロジェクトに触発されています。

DataFrame

機械学習は、ベクトル、テキスト、画像、構造化データなど、さまざまなデータ型に適用できます。このAPIは、さまざまなデータ型をサポートするために、Spark SQLのDataFrameを採用しています。

DataFrameは多くの基本型および構造化型をサポートしています。サポートされている型のリストについては、Spark SQLデータ型リファレンスを参照してください。Spark SQLガイドに記載されている型に加えて、DataFrameはMLのVector型を使用できます。

通常のRDDから暗黙的または明示的にDataFrameを作成できます。コード例と、Spark SQLプログラミングガイドの例を参照してください。

DataFrameの列には名前が付けられています。以下のコード例では、「text」、「features」、「label」などの名前を使用しています。

パイプラインコンポーネント

Transformer (変換器)

Transformerは、特徴量変換器および学習済みモデルを含む抽象化です。技術的には、Transformertransform()メソッドを実装しており、これは1つのDataFrameを別のDataFrameに変換します。一般的には、1つ以上の列を追加します。例として、

Estimator (推定器)

Estimatorは、学習アルゴリズムまたはデータに適合または学習する任意のアルゴリズムの概念を抽象化します。技術的には、Estimatorfit()メソッドを実装しており、これはDataFrameを受け入れてTransformerであるModelを生成します。例えば、LogisticRegressionのような学習アルゴリズムはEstimatorであり、fit()を呼び出すとTransformerであるModelLogisticRegressionModelがトレーニングされます。

パイプラインコンポーネントのプロパティ

Transformer.transform()Estimator.fit()はどちらもステートレスです。将来的には、代替概念を介してステートフルなアルゴリズムがサポートされる可能性があります。

TransformerまたはEstimatorのインスタンスには一意のIDがあり、これはパラメータの指定(後述)に役立ちます。

Pipeline (パイプライン)

機械学習では、データを処理し学習するために一連のアルゴリズムを実行することが一般的です。例えば、単純なテキストドキュメント処理ワークフローにはいくつかの段階が含まれる場合があります。

MLlibは、このようなワークフローをPipelineとして表現します。これは、特定の順序で実行されるPipelineStageTransformerおよびEstimator)のシーケンスで構成されます。この単純なワークフローを、このセクションで継続的な例として使用します。

動作方法

Pipelineはステージのシーケンスとして指定され、各ステージはTransformerまたはEstimatorです。これらのステージは順序通りに実行され、入力DataFrameは各ステージを通過する際に変換されます。Transformerステージの場合、transform()メソッドがDataFrameに対して呼び出されます。Estimatorステージの場合、fit()メソッドが呼び出されてTransformerPipelineModel、または適合済みPipelineの一部となる)が生成され、そのTransformertransform()メソッドがDataFrameに対して呼び出されます。

単純なテキストドキュメントワークフローについてこれを説明します。下の図は、Pipelineトレーニング時の使用法を示しています。

ML Pipeline Example

上記では、上段は3つのステージを持つPipelineを表しています。最初の2つ(TokenizerHashingTF)はTransformer(青)であり、3番目(LogisticRegression)はEstimator(赤)です。下段はパイプラインを流れるデータを表しており、円筒はDataFrameを示しています。Pipeline.fit()メソッドは、生のテキストドキュメントとラベルを持つ元のDataFrameに対して呼び出されます。Tokenizer.transform()メソッドは、生のテキストドキュメントを単語に分割し、単語を含む新しい列をDataFrameに追加します。HashingTF.transform()メソッドは、単語列を特徴量ベクトルに変換し、それらのベクトルを含む新しい列をDataFrameに追加します。ここで、LogisticRegressionEstimatorであるため、PipelineはまずLogisticRegression.fit()を呼び出してLogisticRegressionModelを生成します。もしPipelineにさらにEstimatorがあれば、LogisticRegressionModeltransform()メソッドをDataFrameに対して呼び出し、それを次のステージに渡します。

PipelineEstimatorです。したがって、Pipelinefit()メソッドが実行された後、TransformerであるPipelineModelが生成されます。このPipelineModelテスト時に使用されます。下の図は、この使用法を示しています。

ML PipelineModel Example

上記の図では、PipelineModelは元のPipelineと同じ数のステージを持っていますが、元のPipeline内のすべてのEstimatorTransformerになっています。テストデータセットに対してPipelineModeltransform()メソッドが呼び出されると、データは適合されたパイプラインを順に通過します。各ステージのtransform()メソッドはデータセットを更新し、次のステージに渡します。

PipelinePipelineModelは、トレーニングデータとテストデータが同一の特徴量処理ステップを通過することを保証するのに役立ちます。

詳細

DAGパイプライン: Pipelineのステージは、順序付けられた配列として指定されます。ここで示す例はすべて線形Pipeline用です。つまり、各ステージが前のステージによって生成されたデータを使用するPipelineです。データフローグラフが有向非巡回グラフ(DAG)を形成する限り、非線形Pipelineを作成することが可能です。このグラフは現在、各ステージの入力および出力列名(一般的にはパラメータとして指定される)に基づいて暗黙的に指定されます。PipelineがDAGを形成する場合、ステージはトポロジカル順序で指定する必要があります。

実行時チェック: Pipelineはさまざまな型のDataFrameで操作できるため、コンパイル時型チェックを使用できません。PipelinePipelineModelは、実際にPipelineを実行する前に実行時チェックを行います。この型チェックは、DataFrameの列のデータ型の説明であるDataFrameスキーマを使用して行われます。

一意のパイプラインステージ: Pipelineのステージは一意のインスタンスである必要があります。例えば、同じインスタンスmyHashingTFPipelineに2回挿入すべきではありません。なぜなら、Pipelineステージは一意のIDを持つ必要があるからです。ただし、異なるインスタンスmyHashingTF1myHashingTF2(どちらもHashingTF型)は、異なるIDで作成されるため、同じPipelineに入れることができます。

パラメータ

MLlibのEstimatorTransformerは、パラメータを指定するための統一APIを使用します。

Paramは、自己完結型のドキュメントを持つ名前付きパラメータです。ParamMapは、(パラメータ、値)ペアのセットです。

アルゴリズムにパラメータを渡す主な方法は2つあります。

  1. インスタンスのパラメータを設定します。例えば、lrLogisticRegressionのインスタンスである場合、lr.setMaxIter(10)を呼び出すことで、lr.fit()が最大10回の反復を使用するようにできます。このAPIは、spark.mllibパッケージで使用されるAPIに似ています。
  2. ParamMapfit()またはtransform()に渡します。ParamMap内のパラメータは、セッターメソッドを介して以前に指定されたパラメータを上書きします。

パラメータは、EstimatorおよびTransformerの特定のインスタンスに属します。例えば、2つのLogisticRegressionインスタンスlr1lr2がある場合、両方のmaxIterパラメータを指定したParamMapを構築できます: ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)。これは、Pipeline内にmaxIterパラメータを持つ2つのアルゴリズムがある場合に便利です。

ML永続化: パイプラインの保存と読み込み

モデルまたはパイプラインをディスクに保存して後で使用することは、しばしば価値があります。Spark 1.6では、Pipeline APIにモデルのインポート/エクスポート機能が追加されました。Spark 2.3現在、spark.mlおよびpyspark.mlのDataFrameベースAPIは完全なカバレッジを持っています。

ML永続化はScala、Java、Python間で機能します。ただし、Rは現在変更された形式を使用しているため、Rで保存されたモデルはRでしか読み込めません。これは将来的に修正される予定であり、SPARK-15572で追跡されています。

ML永続化の後方互換性

一般的に、MLlibはML永続化の後方互換性を維持します。つまり、Sparkの1つのバージョンでMLモデルまたはPipelineを保存した場合、将来のバージョンのSparkでそれを読み込んで使用できるはずです。ただし、まれな例外があり、以下に説明します。

モデル永続化: Spark ML永続化を使用してSparkバージョンXで保存されたモデルまたはPipelineは、SparkバージョンYでロード可能ですか?

モデルの動作: SparkバージョンXのモデルまたはPipelineは、SparkバージョンYで同一に動作しますか?

モデルの永続化とモデルの動作の両方において、マイナーバージョンまたはパッチバージョンをまたぐ破壊的な変更は、Sparkバージョンリリースノートで報告されます。リリースノートで破壊的な変更が報告されていない場合は、修正すべきバグとして扱われるべきです。

コード例

このセクションでは、前述の機能を示すコード例を提供します。詳細については、APIドキュメント(PythonScala、およびJava)を参照してください。

例: Estimator、Transformer、およびParam

この例では、EstimatorTransformer、およびParamの概念を扱います。

APIの詳細については、Estimator PythonドキュメントTransformer Pythonドキュメント、およびParams Pythonドキュメントを参照してください。

from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

# Prepare training data from a list of (label, features) tuples.
training = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.0, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)

# Since model1 is a Model (i.e., a transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30  # Specify 1 Param, overwriting the original maxIter.
# Specify multiple Params.
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})  # type: ignore

# You can combine paramMaps, which are python dictionaries.
# Change output column name
paramMap2 = {lr.probabilityCol: "myProbability"}
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)  # type: ignore

# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# Prepare test data
test = spark.createDataFrame([
    (1.0, Vectors.dense([-1.0, 1.5, 1.3])),
    (0.0, Vectors.dense([3.0, 2.0, -0.1])),
    (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# Make predictions on test data using the Transformer.transform() method.
# LogisticRegression.transform will only use the 'features' column.
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction") \
    .collect()

for row in result:
    print("features=%s, label=%s -> prob=%s, prediction=%s"
          % (row.features, row.label, row.myProbability, row.prediction))
完全な例コードは、Sparkリポジトリの「examples/src/main/python/ml/estimator_transformer_param_example.py」で見つけることができます。

APIの詳細については、Estimator ScalaドキュメントTransformer Scalaドキュメント、およびParams Scalaドキュメントを参照してください。

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap()}")

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap()}")

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
完全な例コードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala」で見つけることができます。

APIの詳細については、Estimator JavaドキュメントTransformer Javaドキュメント、およびParams Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Prepare training data.
List<Row> dataTraining = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(0.0, 1.1, 0.1)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.0, -1.0)),
    RowFactory.create(0.0, Vectors.dense(2.0, 1.3, 1.0)),
    RowFactory.create(1.0, Vectors.dense(0.0, 1.2, -0.5))
);
StructType schema = new StructType(new StructField[]{
    new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> training = spark.createDataFrame(dataTraining, schema);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");

// We may set parameters using setter methods.
lr.setMaxIter(10).setRegParam(0.01);

// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());

// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap()
  .put(lr.maxIter().w(20))  // Specify 1 Param.
  .put(lr.maxIter(), 30)  // This overwrites the original maxIter.
  .put(lr.regParam().w(0.1), lr.threshold().w(0.55));  // Specify multiple Params.

// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap()
  .put(lr.probabilityCol().w("myProbability"));  // Change output column name
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());

// Prepare test documents.
List<Row> dataTest = Arrays.asList(
    RowFactory.create(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
    RowFactory.create(0.0, Vectors.dense(3.0, 2.0, -0.1)),
    RowFactory.create(1.0, Vectors.dense(0.0, 2.2, -1.5))
);
Dataset<Row> test = spark.createDataFrame(dataTest, schema);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
完全な例コードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaEstimatorTransformerParamExample.java」で見つけることができます。

例: Pipeline

この例は、上記の図で示された単純なテキストドキュメントPipelineに従っています。

APIの詳細については、Pipeline Pythonドキュメントを参照してください。

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop")
], ["id", "text"])

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print(
        "(%d, %s) --> prob=%s, prediction=%f" % (
            rid, text, str(prob), prediction   # type: ignore
        )
    )
完全な例コードは、Sparkリポジトリの「examples/src/main/python/ml/pipeline_example.py」で見つけることができます。

APIの詳細については、Pipeline Scalaドキュメントを参照してください。

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }
完全な例コードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala」で見つけることができます。

APIの詳細については、Pipeline Javaドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Prepare training documents, which are labeled.
Dataset<Row> training = spark.createDataFrame(Arrays.asList(
  new JavaLabeledDocument(0L, "a b c d e spark", 1.0),
  new JavaLabeledDocument(1L, "b d", 0.0),
  new JavaLabeledDocument(2L, "spark f g h", 1.0),
  new JavaLabeledDocument(3L, "hadoop mapreduce", 0.0)
), JavaLabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words");
HashingTF hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol())
  .setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001);
Pipeline pipeline = new Pipeline()
  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);

// Prepare test documents, which are unlabeled.
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
  new JavaDocument(4L, "spark i j k"),
  new JavaDocument(5L, "l m n"),
  new JavaDocument(6L, "spark hadoop spark"),
  new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);

// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r : predictions.select("id", "text", "probability", "prediction").collectAsList()) {
  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
    + ", prediction=" + r.get(3));
}
完全な例コードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/ml/JavaPipelineExample.java」で見つけることができます。

モデル選択(ハイパーパラメータチューニング)

ML Pipelinesを使用する大きな利点は、ハイパーパラメータ最適化です。自動モデル選択の詳細については、MLチューニングガイドを参照してください。