特徴量の抽出、変換、選択

このセクションでは、特徴量を扱うアルゴリズムを、大まかに次のグループに分けて説明します。

目次

特徴量抽出器

TF-IDF

Term frequency-inverse document frequency (TF-IDF) は、テキストマイニングで広く使用されている特徴量ベクトル化手法で、コーパス内のドキュメントに対する用語の重要性を反映します。用語を $t$、ドキュメントを $d$、コーパスを $D$ で表します。用語頻度 $TF(t, d)$ は、用語 $t$ がドキュメント $d$ に出現する回数であり、文書頻度 $DF(t, D)$ は、用語 $t$ を含むドキュメントの数です。用語頻度だけを使用して重要性を測定すると、頻繁に出現するがドキュメントに関する情報が少ない用語(「a」、「the」、「of」など)を過度に強調しやすくなります。ある用語がコーパス全体で非常に頻繁に出現する場合、その用語は特定のドキュメントに関する特別な情報を持たないことを意味します。逆文書頻度は、用語が提供する情報の量を示す数値尺度です。 \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] ここで、$|D|$ はコーパス内のドキュメントの総数です。対数が使用されているため、用語がすべてのドキュメントに出現する場合、その IDF 値は 0 になります。コーパス外の用語で 0 除算を回避するために、平滑化項が適用されることに注意してください。TF-IDF 尺度は、単に TF と IDF の積です。 \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] 用語頻度と文書頻度の定義にはいくつかのバリアントがあります。MLlib では、柔軟性を高めるために TF と IDF を分離しています。

TF: HashingTFCountVectorizer の両方を使用して、用語頻度ベクトルを生成できます。

HashingTF は、用語のセットを受け取り、それらのセットを固定長のフィーチャベクトルに変換する Transformer です。テキスト処理では、「用語のセット」は bag of words である可能性があります。HashingTFハッシュトリックを利用します。生のフィーチャは、ハッシュ関数を適用することにより、インデックス(用語)にマッピングされます。ここで使用されるハッシュ関数は、MurmurHash 3 です。次に、マッピングされたインデックスに基づいて、用語頻度が計算されます。このアプローチでは、大規模なコーパスではコストのかかるグローバルな用語からインデックスへのマップを計算する必要がなくなりますが、ハッシュ衝突の可能性があり、異なる生のフィーチャがハッシュ後に同じ用語になる可能性があります。衝突の可能性を減らすために、ターゲットフィーチャの次元、つまりハッシュテーブルのバケット数を増やすことができます。ハッシュ値に対する単純なモジュロを使用してベクトルインデックスが決定されるため、フィーチャ次元として 2 のべき乗を使用することをお勧めします。そうしないと、フィーチャがベクトルインデックスに均等にマッピングされません。デフォルトのフィーチャ次元は $2^{18} = 262,144$ です。オプションのバイナリトグルパラメータは、用語頻度カウントを制御します。true に設定すると、ゼロ以外の頻度カウントはすべて 1 に設定されます。これは、整数カウントではなくバイナリカウントをモデル化する離散確率モデルに特に役立ちます。

CountVectorizer は、テキストドキュメントを用語カウントのベクトルに変換します。詳細については、CountVectorizerを参照してください。

IDF: IDF は、データセットに適合し、IDFModel を生成する Estimator です。IDFModel は、フィーチャベクトル(通常は HashingTF または CountVectorizer から作成)を取得し、各フィーチャをスケーリングします。直感的には、コーパスに頻繁に出現するフィーチャの重みを下げます。

注意: spark.ml は、テキストセグメンテーション用のツールを提供していません。Stanford NLP Groupscalanlp/chalk を参照してください。

次のコードセグメントでは、まず文のセットから始めます。Tokenizer を使用して、各文を単語に分割します。各文 (bag of words) について、HashingTF を使用して、文をフィーチャベクトルにハッシュします。IDF を使用して、フィーチャベクトルのスケールを変更します。これにより、通常、テキストをフィーチャとして使用する場合にパフォーマンスが向上します。フィーチャベクトルは、学習アルゴリズムに渡すことができます。

API の詳細については、HashingTF Python ドキュメントIDF Python ドキュメントを参照してください。

from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/tf_idf_example.py" にあります。

API の詳細については、HashingTF Scala ドキュメントIDF Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val sentenceData = spark.createDataFrame(Seq(
  (0.0, "Hi I heard about Spark"),
  (0.0, "I wish Java could use case classes"),
  (1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)

val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)

val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)

val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/TfIdfExample.scala" にあります。

API の詳細については、HashingTF Java ドキュメントIDF Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, "Hi I heard about Spark"),
  RowFactory.create(0.0, "I wish Java could use case classes"),
  RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);

int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("rawFeatures")
  .setNumFeatures(numFeatures);

Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors

IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);

Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaTfIdfExample.java" にあります。

Word2Vec

Word2Vec は、ドキュメントを表す単語のシーケンスを受け取り、Word2VecModel をトレーニングする Estimator です。モデルは、各単語を一意の固定サイズのベクトルにマッピングします。Word2VecModel は、ドキュメント内のすべての単語の平均を使用して、各ドキュメントをベクトルに変換します。このベクトルは、予測、ドキュメントの類似性計算などのフィーチャとして使用できます。詳細については、Word2Vec に関する MLlib ユーザーガイドを参照してください。

次のコードセグメントでは、まず、それぞれが単語のシーケンスとして表されるドキュメントのセットから始めます。各ドキュメントについて、それをフィーチャベクトルに変換します。このフィーチャベクトルは、学習アルゴリズムに渡すことができます。

API の詳細については、Word2Vec Python ドキュメントを参照してください。

from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/word2vec_example.py" にあります。

API の詳細については、Word2Vec Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
val model = word2Vec.fit(documentDF)

val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
  println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/Word2VecExample.scala" にあります。

API の詳細については、Word2Vec Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
  RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
  RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);

// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0);

Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);

for (Row row : result.collectAsList()) {
  List<String> text = row.getList(0);
  Vector vector = (Vector) row.get(1);
  System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaWord2VecExample.java" にあります。

CountVectorizer

CountVectorizerCountVectorizerModel は、テキストドキュメントのコレクションをトークンカウントのベクトルに変換するのに役立つことを目的としています。アプリオリな辞書が利用できない場合、CountVectorizerEstimator として使用して、語彙を抽出し、CountVectorizerModel を生成できます。モデルは、語彙全体でドキュメントのスパース表現を生成し、それらを LDA などの他のアルゴリズムに渡すことができます。

フィッティング処理中、CountVectorizer は、コーパス全体での出現頻度順に上位 vocabSize 個の単語を選択します。オプションのパラメータ minDF は、語彙に含めるために単語が出現しなければならないドキュメントの最小数(または < 1.0 の場合は割合)を指定することで、フィッティング処理にも影響を与えます。別のオプションのバイナリ切り替えパラメータは、出力ベクトルを制御します。true に設定すると、すべての非ゼロのカウントが 1 に設定されます。これは、整数カウントではなく、バイナリをモデル化する離散確率モデルに特に役立ちます。

以下のような idtexts 列を持つ DataFrame があると仮定します。

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")

texts の各行は、Array[String] 型のドキュメントです。CountVectorizer の fit を呼び出すと、語彙 (a, b, c) を持つ CountVectorizerModel が生成されます。その後、変換後の出力列 "vector" には以下が含まれます。

 id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])

各ベクトルは、語彙に対するドキュメントのトークン数を表します。

API の詳細については、CountVectorizer Python ドキュメントCountVectorizerModel Python ドキュメントを参照してください。

from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/count_vectorizer_example.py" にあります。

API の詳細については、CountVectorizer Scala ドキュメントCountVectorizerModel Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(df).show(false)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala" にあります。

API の詳細については、CountVectorizer Java ドキュメントCountVectorizerModel Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("a", "b", "c")),
  RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
  new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
  .setInputCol("text")
  .setOutputCol("feature")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df);

// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
  .setInputCol("text")
  .setOutputCol("feature");

cvModel.transform(df).show(false);
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaCountVectorizerExample.java" にあります。

FeatureHasher

フィーチャーハッシングは、カテゴリまたは数値のフィーチャーのセットを指定された次元(通常は元のフィーチャースペースよりもかなり小さい)のフィーチャーベクトルに投影します。これは、ハッシュトリックを使用して、フィーチャーをフィーチャーベクトルのインデックスにマッピングすることで行われます。

FeatureHasher トランスフォーマーは、複数の列で動作します。各列には、数値またはカテゴリのフィーチャーのいずれかを含めることができます。列のデータ型の動作と処理は次のとおりです。

Null(欠損)値は無視されます(結果のフィーチャーベクトルでは暗黙的にゼロになります)。

ここで使用するハッシュ関数は、HashingTF で使用されている MurmurHash 3 と同じです。ハッシュ値の単純な剰余を使用してベクトルインデックスを決定するため、numFeatures パラメータには 2 の累乗を使用することをお勧めします。そうしないと、フィーチャーはベクトルインデックスに均等にマッピングされません。

4つの入力列 real, bool, stringNum, string を持つ DataFrame があると仮定します。これらの異なるデータ型を入力として、フィーチャーベクトルの列を生成するための変換の動作を示します。

real| bool|stringNum|string
----|-----|---------|------
 2.2| true|        1|   foo
 3.3|false|        2|   bar
 4.4|false|        3|   baz
 5.5|false|        4|   foo

この DataFrame で FeatureHasher.transform の出力は次のようになります。

real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1        |foo   |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2        |bar   |(262144,[6031,  80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3        |baz   |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4        |foo   |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])

結果のフィーチャーベクトルは、学習アルゴリズムに渡すことができます。

API の詳細については、FeatureHasher Python ドキュメントを参照してください。

from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/feature_hasher_example.py" にあります。

API の詳細については、FeatureHasher Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.FeatureHasher

val dataset = spark.createDataFrame(Seq(
  (2.2, true, "1", "foo"),
  (3.3, false, "2", "bar"),
  (4.4, false, "3", "baz"),
  (5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")

val hasher = new FeatureHasher()
  .setInputCols("real", "bool", "stringNum", "string")
  .setOutputCol("features")

val featurized = hasher.transform(dataset)
featurized.show(false)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/FeatureHasherExample.scala" にあります。

API の詳細については、FeatureHasher Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(2.2, true, "1", "foo"),
  RowFactory.create(3.3, false, "2", "bar"),
  RowFactory.create(4.4, false, "3", "baz"),
  RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
  new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
  new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
  new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);

FeatureHasher hasher = new FeatureHasher()
  .setInputCols(new String[]{"real", "bool", "stringNum", "string"})
  .setOutputCol("features");

Dataset<Row> featurized = hasher.transform(dataset);

featurized.show(false);
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaFeatureHasherExample.java" にあります。

特徴量変換器

Tokenizer

トークン化は、テキスト(文など)を個々の用語(通常は単語)に分割するプロセスです。単純な Tokenizer クラスがこの機能を提供します。次の例は、文を単語のシーケンスに分割する方法を示しています。

RegexTokenizer を使用すると、正規表現(regex)マッチングに基づくより高度なトークン化が可能です。デフォルトでは、パラメータ “pattern”(正規表現、デフォルト:"\\s+")は、入力テキストを分割する区切り文字として使用されます。あるいは、ユーザーはパラメータ “gaps” を false に設定して、正規表現 “pattern” が分割ギャップではなく「トークン」を表すことを示し、トークン化の結果として一致するすべての出現箇所を見つけることができます。

API の詳細については、Tokenizer Python ドキュメントRegexTokenizer Python ドキュメントを参照してください。

from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/tokenizer_example.py" にあります。

API の詳細については、Tokenizer Scala ドキュメントRegexTokenizer Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val countTokens = udf { (words: Seq[String]) => words.length }

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)

val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala" にあります。

API の詳細については、Tokenizer Java ドキュメントRegexTokenizer Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import scala.collection.mutable.Seq;

import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");

RegexTokenizer regexTokenizer = new RegexTokenizer()
    .setInputCol("sentence")
    .setOutputCol("words")
    .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

spark.udf().register(
  "countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);

Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);

Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
    .withColumn("tokens", call_udf("countTokens", col("words")))
    .show(false);
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java" にあります。

StopWordsRemover

ストップワードは、通常、単語が頻繁に出現し、あまり意味を持たないため、入力から除外する必要がある単語です。

StopWordsRemover は、文字列のシーケンス(たとえば、Tokenizer の出力)を入力として受け取り、入力シーケンスからすべてのストップワードを削除します。ストップワードのリストは、stopWords パラメータで指定します。一部の言語のデフォルトのストップワードは、StopWordsRemover.loadDefaultStopWords(language) を呼び出すことでアクセスできます。利用可能なオプションは、「danish」、「dutch」、「english」、「finnish」、「french」、「german」、「hungarian」、「italian」、「norwegian」、「portuguese」、「russian」、「spanish」、「swedish」、「turkish」です。ブール値パラメータ caseSensitive は、一致を大文字と小文字を区別するかどうかを示します(デフォルトでは false)。

以下のような idraw 列を持つ DataFrame があると仮定します。

 id | raw
----|----------
 0  | [I, saw, the, red, balloon]
 1  | [Mary, had, a, little, lamb]

raw を入力列、filtered を出力列として StopWordsRemover を適用すると、次のようになります。

 id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, balloon]  |  [saw, red, balloon]
 1  | [Mary, had, a, little, lamb]|[Mary, little, lamb]

filtered では、ストップワード “I”, “the”, “had”, “a” がフィルタリングされています。

API の詳細については、StopWordsRemover Python ドキュメントを参照してください。

from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/stopwords_remover_example.py" にあります。

API の詳細については、StopWordsRemover Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "balloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show(false)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/StopWordsRemoverExample.scala" にあります。

API の詳細については、StopWordsRemover Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaStopWordsRemoverExample.java" にあります。

$n$-gram

n-gram は、整数 $n$ に対して $n$ 個のトークン(通常は単語)のシーケンスです。NGram クラスを使用して、入力フィーチャーを $n$-gram に変換できます。

NGram は、文字列のシーケンス(たとえば、Tokenizer の出力)を入力として受け取ります。パラメータ n は、各 $n$-gram の用語数を決定するために使用されます。出力は、$n$-gram のシーケンスで構成され、各 $n$-gram は $n$ 個の連続した単語のスペース区切りの文字列で表されます。入力シーケンスに n 個未満の文字列が含まれている場合、出力は生成されません。

API の詳細については、NGram Python ドキュメントを参照してください。

from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/n_gram_example.py" にあります。

API の詳細については、NGram Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")

val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/NGramExample.scala" にあります。

API の詳細については、NGram Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);

NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");

Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaNGramExample.java" にあります。

Binarizer

二値化は、数値フィーチャーを二値(0/1)フィーチャーに閾値処理するプロセスです。

Binarizer は、共通パラメータ inputColoutputCol に加えて、二値化の threshold を受け取ります。閾値よりも大きいフィーチャー値は 1.0 に二値化され、閾値以下の値は 0.0 に二値化されます。inputCol では、Vector と Double の両方の型がサポートされています。

API の詳細については、Binarizer Python ドキュメントを参照してください。

from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/binarizer_example.py" にあります。

APIの詳細については、Binarizer Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/BinarizerExample.scala" にあります。

APIの詳細については、Binarizer Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);

Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);

Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);

System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaBinarizerExample.java" にあります。

PCA

主成分分析 (PCA) は、直交変換を使用して、相関する可能性のある変数の観測値を、主成分と呼ばれる線形に相関のない値のセットに変換する統計的手法です。PCA クラスは、PCA を使用してベクトルを低次元空間に投影するモデルをトレーニングします。以下の例では、5 次元の特徴ベクトルを 3 次元の主成分に投影する方法を示します。

APIの詳細については、PCA Pythonドキュメントを参照してください。

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/pca_example.py" にあります。

APIの詳細については、PCA Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

val result = pca.transform(df).select("pcaFeatures")
result.show(false)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PCAExample.scala" にあります。

APIの詳細については、PCA Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPCAExample.java" にあります。

PolynomialExpansion

多項式展開は、元の次元のn次結合によって定式化される多項式空間に特徴を拡張するプロセスです。PolynomialExpansion クラスは、この機能を提供します。以下の例では、特徴を 3 次多項式空間に拡張する方法を示します。

APIの詳細については、PolynomialExpansion Pythonドキュメントを参照してください。

from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/polynomial_expansion_example.py" にあります。

APIの詳細については、PolynomialExpansion Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(2.0, 1.0),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

val polyDF = polyExpansion.transform(df)
polyDF.show(false)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/PolynomialExpansionExample.scala" にあります。

APIの詳細については、PolynomialExpansion Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(2.0, 1.0)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaPolynomialExpansionExample.java" にあります。

離散コサイン変換 (DCT)

離散コサイン変換は、時間領域の長さ $N$ の実数値シーケンスを、周波数領域の別の長さ $N$ の実数値シーケンスに変換します。DCT クラスは、DCT-II を実装し、変換を表す行列がユニタリになるように結果を $1/\sqrt{2}$ でスケーリングすることにより、この機能を提供します。変換されたシーケンスにはシフトは適用されません (例: 変換されたシーケンスの $0$ 番目の要素は $0$ 番目の DCT 係数であり、$N/2$ 番目ではない)。

APIの詳細については、DCT Pythonドキュメントを参照してください。

from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/dct_example.py" にあります。

APIの詳細については、DCT Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/DCTExample.scala" にあります。

APIの詳細については、DCT Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);

DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);

Dataset<Row> dctDf = dct.transform(df);

dctDf.select("featuresDCT").show(false);
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaDCTExample.java" にあります。

StringIndexer

StringIndexer は、ラベルの文字列列をラベルインデックスの列にエンコードします。StringIndexer は複数の列をエンコードできます。インデックスは [0, numLabels) にあり、次の 4 つの順序付けオプションがサポートされています。"frequencyDesc": ラベル頻度による降順 (最も頻度の高いラベルに 0 が割り当てられる)、"frequencyAsc": ラベル頻度による昇順 (最も頻度の低いラベルに 0 が割り当てられる)、"alphabetDesc": アルファベット順の降順、"alphabetAsc": アルファベット順の昇順 (デフォルト = "frequencyDesc")。なお、"frequencyDesc"/"frequencyAsc" で頻度が同じ場合は、文字列がアルファベット順にソートされます。

ユーザーが保持するように選択した場合、表示されないラベルは numLabels のインデックスに配置されます。入力列が数値の場合、文字列にキャストして文字列値をインデックス化します。 EstimatorTransformer などのダウンストリームのパイプラインコンポーネントが、この文字列でインデックス化されたラベルを使用する場合、コンポーネントの入力列をこの文字列でインデックス化された列名に設定する必要があります。多くの場合、setInputCol で入力列を設定できます。

idcategory を持つ次の DataFrame があると仮定します。

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category は、「a」、「b」、「c」の 3 つのラベルを持つ文字列列です。StringIndexer を、入力列として category、出力列として categoryIndex を使用して適用すると、次のようになります。

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

「a」は最も頻度が高いためインデックス 0 が割り当てられ、次に「c」がインデックス 1、「b」がインデックス 2 が割り当てられます。

さらに、StringIndexer が、あるデータセットで StringIndexer を適合させた後、別のデータセットを変換するために使用する場合に、表示されないラベルを処理する方法について、3 つの戦略があります。

前の例に戻りますが、今回は以前に定義した StringIndexer を次のデータセットで再利用します。

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

StringIndexer が表示されないラベルをどのように処理するかを設定していない場合、または「error」に設定した場合、例外がスローされます。ただし、setHandleInvalid("skip") を呼び出した場合、次のデータセットが生成されます。

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

「d」または「e」を含む行が表示されないことに注意してください。

setHandleInvalid("keep") を呼び出すと、次のデータセットが生成されます。

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0

「d」または「e」を含む行がインデックス「3.0」にマップされていることに注意してください。

APIの詳細については、StringIndexer Pythonドキュメントを参照してください。

from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/string_indexer_example.py" にあります。

APIの詳細については、StringIndexer Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala" にあります。

APIの詳細については、StringIndexer Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");

Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaStringIndexerExample.java" にあります。

IndexToString

StringIndexer と対称的に、IndexToString は、ラベルインデックスの列を、元のラベルを文字列として含む列にマッピングし直します。一般的なユースケースは、StringIndexer でラベルからインデックスを生成し、それらのインデックスでモデルをトレーニングし、IndexToString で予測されたインデックスの列から元のラベルを取得することです。ただし、独自のラベルを自由に指定できます。

StringIndexer の例に基づいて、列 idcategoryIndex を持つ次の DataFrame があると仮定します。

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

IndexToString を、入力列として categoryIndex、出力列として originalCategory を使用して適用すると、元のラベルを取得できます (列のメタデータから推論されます)。

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

APIの詳細については、IndexToString Pythonドキュメントを参照してください。

from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/index_to_string_example.py" にあります。

APIの詳細については、IndexToString Scalaドキュメントを参照してください。

import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

println(s"Transformed string column '${indexer.getInputCol}' " +
    s"to indexed column '${indexer.getOutputCol}'")
indexed.show()

val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
    s"${Attribute.fromStructField(inputColSchema).toString}\n")

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)

println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
    s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/IndexToStringExample.scala" にあります。

APIの詳細については、IndexToString Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
    "to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();

StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
    Attribute.fromStructField(inputColSchema).toString() + "\n");

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);

System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
    "original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaIndexToStringExample.java" にあります。

OneHotEncoder

One-hot エンコーディングは、ラベルインデックスとして表現されたカテゴリ特徴を、すべての特徴値のセットから特定の特徴値の存在を示す最大 1 つの値を持つバイナリベクトルにマッピングします。このエンコーディングにより、ロジスティック回帰など、連続した特徴を期待するアルゴリズムでカテゴリ特徴を使用できるようになります。文字列型の入力データの場合、StringIndexer を使用して最初にカテゴリ特徴をエンコードするのが一般的です。

OneHotEncoder は、複数の列を変換でき、入力列ごとに one-hot エンコードされた出力ベクトル列を返します。これらのベクトルを VectorAssembler を使用して単一の特徴ベクトルにマージするのが一般的です。

OneHotEncoder は、データの変換中に無効な入力をどのように処理するかを選択する handleInvalid パラメーターをサポートしています。使用可能なオプションには、「keep」(無効な入力は追加のカテゴリインデックスに割り当てられる) と「error」(エラーをスローする) があります。

APIの詳細については、OneHotEncoder Pythonドキュメントを参照してください。

from pyspark.ml.feature import OneHotEncoder

df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/onehot_encoder_example.py" にあります。

APIの詳細については、OneHotEncoder Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.OneHotEncoder

val df = spark.createDataFrame(Seq(
  (0.0, 1.0),
  (1.0, 0.0),
  (2.0, 1.0),
  (0.0, 2.0),
  (0.0, 1.0),
  (2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")

val encoder = new OneHotEncoder()
  .setInputCols(Array("categoryIndex1", "categoryIndex2"))
  .setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)

val encoded = model.transform(df)
encoded.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala"にあります。

APIの詳細については、OneHotEncoder Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, 1.0),
  RowFactory.create(1.0, 0.0),
  RowFactory.create(2.0, 1.0),
  RowFactory.create(0.0, 2.0),
  RowFactory.create(0.0, 1.0),
  RowFactory.create(2.0, 0.0)
);

StructType schema = new StructType(new StructField[]{
  new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
  .setOutputCols(new String[] {"categoryVec1", "categoryVec2"});

OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaOneHotEncoderExample.java"にあります。

VectorIndexer

VectorIndexerは、Vectorのデータセット内のカテゴリ特徴のインデックス付けを支援します。どの特徴がカテゴリであるかを自動的に判断し、元の値をカテゴリインデックスに変換することができます。具体的には、以下の処理を行います。

  1. Vector型の入力列とパラメータmaxCategoriesを受け取ります。
  2. 異なる値の数に基づいて、どの特徴をカテゴリとするかを決定します。最大でmaxCategories個の異なる値を持つ特徴はカテゴリであると宣言されます。
  3. 各カテゴリ特徴に対して、0ベースのカテゴリインデックスを計算します。
  4. カテゴリ特徴にインデックスを付け、元の特徴値をインデックスに変換します。

カテゴリ特徴にインデックスを付けることで、決定木やツリーアンサンブルなどのアルゴリズムがカテゴリ特徴を適切に扱うことができるようになり、パフォーマンスが向上します。

以下の例では、ラベル付きポイントのデータセットを読み込み、VectorIndexerを使用して、どの特徴をカテゴリとして扱うべきかを決定します。カテゴリ特徴の値をインデックスに変換します。この変換されたデータは、カテゴリ特徴を処理するDecisionTreeRegressorなどのアルゴリズムに渡すことができます。

APIの詳細については、VectorIndexer Pythonドキュメントを参照してください。

from pyspark.ml.feature import VectorIndexer

data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
      (len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))

# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/vector_indexer_example.py"にあります。

APIの詳細については、VectorIndexer Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
  s"categorical features: ${categoricalFeatures.mkString(", ")}")

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/VectorIndexerExample.scala"にあります。

APIの詳細については、VectorIndexer Javaドキュメントを参照してください。

import java.util.Map;

import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

VectorIndexer indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);

Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");

for (Integer feature : categoryMaps.keySet()) {
  System.out.print(" " + feature);
}
System.out.println();

// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaVectorIndexerExample.java"にあります。

Interaction

Interactionは、ベクトルまたはdouble値の列を受け取り、各入力列から1つの値のすべての組み合わせの積を含む単一のベクトル列を生成するTransformerです。

たとえば、入力列としてそれぞれ3次元を持つ2つのベクトル型列がある場合、出力列として9次元ベクトルが得られます。

次のDataFrameに "id1"、"vec1"、"vec2" の列があると仮定します。

  id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]     

これらの入力列に対してInteractionを適用すると、出力列としてのinteractedColには以下が含まれます。

  id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|------------------------------------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]       

APIの詳細については、Interaction Pythonドキュメントを参照してください。

from pyspark.ml.feature import Interaction, VectorAssembler

df = spark.createDataFrame(
    [(1, 1, 2, 3, 8, 4, 5),
     (2, 4, 3, 8, 7, 9, 8),
     (3, 6, 1, 9, 2, 3, 6),
     (4, 10, 8, 6, 9, 4, 5),
     (5, 9, 2, 7, 10, 7, 3),
     (6, 1, 1, 4, 2, 8, 4)],
    ["id1", "id2", "id3", "id4", "id5", "id6", "id7"])

assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")

assembled1 = assembler1.transform(df)

assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")

assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")

interacted = interaction.transform(assembled2)

interacted.show(truncate=False)
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/interaction_example.py"にあります。

APIの詳細については、Interaction Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 10, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 10, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")

val assembler1 = new VectorAssembler().
  setInputCols(Array("id2", "id3", "id4")).
  setOutputCol("vec1")

val assembled1 = assembler1.transform(df)

val assembler2 = new VectorAssembler().
  setInputCols(Array("id5", "id6", "id7")).
  setOutputCol("vec2")

val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

val interaction = new Interaction()
  .setInputCols(Array("id1", "vec1", "vec2"))
  .setOutputCol("interactedCol")

val interacted = interaction.transform(assembled2)

interacted.show(truncate = false)
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/InteractionExample.scala"にあります。

APIの詳細については、Interaction Javaドキュメントを参照してください。

List<Row> data = Arrays.asList(
  RowFactory.create(1, 1, 2, 3, 8, 4, 5),
  RowFactory.create(2, 4, 3, 8, 7, 9, 8),
  RowFactory.create(3, 6, 1, 9, 2, 3, 6),
  RowFactory.create(4, 10, 8, 6, 9, 4, 5),
  RowFactory.create(5, 9, 2, 7, 10, 7, 3),
  RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VectorAssembler assembler1 = new VectorAssembler()
        .setInputCols(new String[]{"id2", "id3", "id4"})
        .setOutputCol("vec1");

Dataset<Row> assembled1 = assembler1.transform(df);

VectorAssembler assembler2 = new VectorAssembler()
        .setInputCols(new String[]{"id5", "id6", "id7"})
        .setOutputCol("vec2");

Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");

Interaction interaction = new Interaction()
        .setInputCols(new String[]{"id1","vec1","vec2"})
        .setOutputCol("interactedCol");

Dataset<Row> interacted = interaction.transform(assembled2);

interacted.show(false);
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaInteractionExample.java"にあります。

Normalizer

Normalizerは、Vector行のデータセットを変換し、各Vectorを単位ノルムにするTransformerです。正規化に使用されるp-ノルムを指定するパラメータpを受け取ります(デフォルトは$p = 2$です)。この正規化は、入力データを標準化し、学習アルゴリズムの動作を改善するのに役立ちます。

次の例は、libsvm形式でデータセットをロードし、各行を単位$L^1$ノルムおよび単位$L^\infty$ノルムにする方法を示しています。

APIの詳細については、Normalizer Pythonドキュメントを参照してください。

from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/normalizer_example.py"にあります。

APIの詳細については、Normalizer Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/NormalizerExample.scala"にあります。

APIの詳細については、Normalizer Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0);

Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();

// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
  normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaNormalizerExample.java"にあります。

StandardScaler

StandardScalerは、Vector行のデータセットを変換し、各特徴を単位標準偏差および/またはゼロ平均になるように正規化します。パラメータを受け取ります。

StandardScalerは、データセットにfitしてStandardScalerModelを生成できるEstimatorです。これは、要約統計量を計算することに相当します。モデルは、データセットのVector列を変換して、単位標準偏差および/またはゼロ平均の特徴を持つようにすることができます。

特徴の標準偏差がゼロの場合、その特徴のVectorにデフォルト値0.0が返されることに注意してください。

次の例は、libsvm形式でデータセットをロードし、各特徴を単位標準偏差にする方法を示しています。

APIの詳細については、StandardScaler Pythonドキュメントを参照してください。

from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)

# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/standard_scaler_example.py"にあります。

APIの詳細については、StandardScaler Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/StandardScalerExample.scala"にあります。

APIの詳細については、StandardScaler Javaドキュメントを参照してください。

import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

StandardScaler scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false);

// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);

// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaStandardScalerExample.java"にあります。

RobustScaler

RobustScalerは、Vector行のデータセットを変換し、中央値を削除し、特定の分位範囲(デフォルトではIQR:四分位範囲、第1四分位数と第3四分位数の間の分位範囲)に従ってデータをスケーリングします。その動作はStandardScalerとよく似ていますが、平均と標準偏差の代わりに中央値と分位範囲が使用されるため、外れ値に対してロバストです。パラメータを受け取ります。

RobustScalerは、データセットにfitしてRobustScalerModelを生成できるEstimatorです。これは、分位統計量を計算することに相当します。モデルは、データセットのVector列を変換して、単位分位範囲および/またはゼロ中央値の特徴を持つようにすることができます。

特徴の分位範囲がゼロの場合、その特徴のVectorにデフォルト値0.0が返されることに注意してください。

次の例は、libsvm形式でデータセットをロードし、各特徴を単位分位範囲にする方法を示しています。

APIの詳細については、RobustScaler Pythonドキュメントを参照してください。

from pyspark.ml.feature import RobustScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
                      withScaling=True, withCentering=False,
                      lower=0.25, upper=0.75)

# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)

# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/robust_scaler_example.py"にあります。

APIの詳細については、RobustScaler Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.RobustScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75)

// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)

// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/RobustScalerExample.scala"にあります。

APIの詳細については、RobustScaler Javaドキュメントを参照してください。

import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> dataFrame =
  spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");

RobustScaler scaler = new RobustScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithScaling(true)
  .setWithCentering(false)
  .setLower(0.25)
  .setUpper(0.75);

// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);

// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaRobustScalerExample.java"にあります。

MinMaxScaler

MinMaxScalerは、Vector行のデータセットを変換し、各特徴を特定の範囲(多くの場合[0、1])に再スケーリングします。パラメータを受け取ります。

MinMaxScalerは、データセットの要約統計量を計算し、MinMaxScalerModelを生成します。モデルは、各特徴が指定された範囲内になるように個別に変換できます。

特徴Eの再スケーリングされた値は、\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation}として計算されます。$E_{max} == E_{min}$の場合、$Rescaled(e_i) = 0.5 * (max + min)$となります。

ゼロ値が非ゼロ値に変換される可能性があるため、トランスフォーマーの出力はスパース入力の場合でもDenseVectorになることに注意してください。

次の例は、libsvm形式でデータセットをロードし、各特徴を[0、1]に再スケーリングする方法を示しています。

APIの詳細については、MinMaxScaler PythonドキュメントおよびMinMaxScalerModel Pythonドキュメントを参照してください。

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/min_max_scaler_example.py"にあります。

APIの詳細については、MinMaxScaler ScalaドキュメントMinMaxScalerModel Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/MinMaxScalerExample.scala"にあります。

APIの詳細については、MinMaxScaler JavaドキュメントMinMaxScalerModel Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
    RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MinMaxScaler scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
    + scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaMinMaxScalerExample.java"にあります。

MaxAbsScaler

MaxAbsScalerは、各特徴量をその特徴量の絶対値の最大値で割ることにより、Vector行のデータセットを変換し、各特徴量の範囲を[-1, 1]にリスケールします。データのシフト/センタリングは行わないため、スパース性が失われることはありません。

MaxAbsScalerはデータセットの要約統計量を計算し、MaxAbsScalerModelを生成します。このモデルは、各特徴量を個別に[-1, 1]の範囲に変換できます。

次の例では、libsvm形式のデータセットをロードし、各特徴量を[-1, 1]にリスケールする方法を示します。

APIの詳細については、MaxAbsScaler PythonドキュメントMaxAbsScalerModel Pythonドキュメントを参照してください。

from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)

# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)

scaledData.select("features", "scaledFeatures").show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/max_abs_scaler_example.py"にあります。

APIの詳細については、MaxAbsScaler ScalaドキュメントMaxAbsScalerModel Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/MaxAbsScalerExample.scala"にあります。

APIの詳細については、MaxAbsScaler JavaドキュメントMaxAbsScalerModel Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
    RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
    RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
    RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

MaxAbsScaler scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures");

// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);

// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaMaxAbsScalerExample.java"にあります。

Bucketizer

Bucketizerは、連続特徴量の列を、ユーザーが指定したバケットで指定された特徴量バケットの列に変換します。パラメータを取ります

対象の列の上限と下限がわからない場合は、Bucketizerの範囲外になる可能性のある例外を防ぐために、スプリットの境界としてDouble.NegativeInfinityDouble.PositiveInfinityを追加する必要があることに注意してください。

また、提供したスプリットは、s0 < s1 < s2 < ... < snのように、厳密に増加する順序である必要があることにも注意してください。

詳細については、BucketizerのAPIドキュメントを参照してください。

次の例では、Doubleの列を別のインデックス化された列にバケット化する方法を示します。

APIの詳細については、Bucketizer Pythonドキュメントを参照してください。

from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/bucketizer_example.py"にあります。

APIの詳細については、Bucketizer Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)

println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()

val splitsArray = Array(
  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
  Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))

val data2 = Array(
  (-999.9, -999.9),
  (-0.5, -0.2),
  (-0.3, -0.1),
  (0.0, 0.0),
  (0.2, 0.4),
  (999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")

val bucketizer2 = new Bucketizer()
  .setInputCols(Array("features1", "features2"))
  .setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
  .setSplitsArray(splitsArray)

// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)

println(s"Bucketizer output with [" +
  s"${bucketizer2.getSplitsArray(0).length-1}, " +
  s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala"にあります。

APIの詳細については、Bucketizer Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};

List<Row> data = Arrays.asList(
  RowFactory.create(-999.9),
  RowFactory.create(-0.5),
  RowFactory.create(-0.3),
  RowFactory.create(0.0),
  RowFactory.create(0.2),
  RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
  new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Bucketizer bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits);

// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);

System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();

// Bucketize multiple columns at one pass.
double[][] splitsArray = {
  {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
  {Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};

List<Row> data2 = Arrays.asList(
  RowFactory.create(-999.9, -999.9),
  RowFactory.create(-0.5, -0.2),
  RowFactory.create(-0.3, -0.1),
  RowFactory.create(0.0, 0.0),
  RowFactory.create(0.2, 0.4),
  RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
  new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);

Bucketizer bucketizer2 = new Bucketizer()
  .setInputCols(new String[] {"features1", "features2"})
  .setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
  .setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);

System.out.println("Bucketizer output with [" +
  (bucketizer2.getSplitsArray()[0].length-1) + ", " +
  (bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java"にあります。

ElementwiseProduct

ElementwiseProductは、要素ごとの乗算を使用して、各入力ベクトルに提供された「重み」ベクトルを乗算します。言い換えれば、データセットの各列をスカラー乗数でスケーリングします。これは、入力ベクトルvと変換ベクトルwとの間のアダマール積を表し、結果ベクトルを生成します。

\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]

以下の例では、変換ベクトル値を使用してベクトルを変換する方法を示します。

APIの詳細については、ElementwiseProduct Pythonドキュメントを参照してください。

from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/elementwise_product_example.py"にあります。

APIの詳細については、ElementwiseProduct Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/ElementwiseProductExample.scala"にあります。

APIの詳細については、ElementwiseProduct Javaドキュメントを参照してください。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
  RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
  RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);

List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));

StructType schema = DataTypes.createStructType(fields);

Dataset<Row> dataFrame = spark.createDataFrame(data, schema);

Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);

ElementwiseProduct transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector");

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaElementwiseProductExample.java"にあります。

SQLTransformer

SQLTransformerは、SQLステートメントで定義された変換を実装します。現在、"SELECT ... FROM __THIS__ ..."のようなSQL構文のみをサポートしています。ここで、"__THIS__"は入力データセットの基になるテーブルを表します。select句は、出力に表示するフィールド、定数、および式を指定し、Spark SQLがサポートする任意のselect句を使用できます。ユーザーは、Spark SQL組み込み関数およびUDFを使用して、選択したこれらの列を操作することもできます。たとえば、SQLTransformerは次のようなステートメントをサポートします。

idv1v2を持つ次のDataFrameがあると仮定します。

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0

これは、ステートメント"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"を使用したSQLTransformerの出力です。

 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0

APIの詳細については、SQLTransformer Pythonドキュメントを参照してください。

from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/sql_transformer.py"にあります。

APIの詳細については、SQLTransformer Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/SQLTransformerExample.scala"にあります。

APIの詳細については、SQLTransformer Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 1.0, 3.0),
  RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);

SQLTransformer sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");

sqlTrans.transform(df).show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaSQLTransformerExample.java"にあります。

VectorAssembler

VectorAssemblerは、指定された列のリストを単一のベクトル列に結合するトランスフォーマーです。これは、ロジスティック回帰や決定木などのMLモデルをトレーニングするために、生のフィーチャと異なるフィーチャトランスフォーマーによって生成されたフィーチャを単一のフィーチャベクトルに結合する場合に役立ちます。VectorAssemblerは、次の入力列タイプを受け入れます:すべての数値型、ブール型、およびベクトル型。各行で、入力列の値は、指定された順序でベクトルに連結されます。

idhourmobileuserFeatures、およびclickedを持つDataFrameがあると仮定します。

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0

userFeaturesは、3つのユーザーフィーチャを含むベクトル列です。hourmobile、およびuserFeaturesfeaturesと呼ばれる単一のフィーチャベクトルに結合し、clickedを予測するために使用します。VectorAssemblerの入力列をhourmobile、およびuserFeaturesに設定し、出力列をfeaturesに設定すると、変換後、次のDataFrameが得られます。

 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

APIの詳細については、VectorAssembler Pythonドキュメントを参照してください。

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/vector_assembler_example.py"にあります。

APIの詳細については、VectorAssembler Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/VectorAssemblerExample.scala"にあります。

APIの詳細については、VectorAssembler Javaドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaVectorAssemblerExample.java"にあります。

VectorSizeHint

VectorTypeの列のベクトルのサイズを明示的に指定すると便利な場合があります。たとえば、VectorAssemblerは、入力列からのサイズ情報を使用して、出力列のサイズ情報とメタデータを生成します。場合によっては、この情報を列の内容を調べることで取得できますが、ストリーミングデータフレームでは、ストリームが開始されるまで内容は利用できません。VectorSizeHintを使用すると、ユーザーは列のベクトルサイズを明示的に指定できるため、VectorAssembler、またはベクトルサイズを知る必要がある可能性のある他のトランスフォーマーは、その列を入力として使用できます。

VectorSizeHintを使用するには、ユーザーはinputColパラメーターとsizeパラメーターを設定する必要があります。このトランスフォーマーをデータフレームに適用すると、ベクトルサイズを指定するinputColのメタデータが更新された新しいデータフレームが生成されます。結果のデータフレームに対するダウンストリーム操作は、メタデータを使用してこのサイズを取得できます。

VectorSizeHint は、オプションの handleInvalid パラメータも受け取ることができ、これは、ベクトル列にnullが含まれる場合や、サイズが間違ったベクトルが含まれる場合の動作を制御します。デフォルトでは、handleInvalid は "error" に設定されており、例外がスローされることを示します。このパラメータは、"skip" に設定することもでき、これは無効な値を含む行を結果のデータフレームからフィルタリングすることを示します。また、"optimistic" に設定することもでき、これは列が無効な値についてチェックされず、すべての行が保持されることを示します。"optimistic" の使用は、結果のデータフレームを矛盾した状態にする可能性があることに注意してください。つまり、VectorSizeHint が適用された列のメタデータが、その列の内容と一致しないことを意味します。ユーザーは、このような矛盾した状態を避けるように注意する必要があります。

APIの詳細については、VectorSizeHint Python ドキュメントを参照してください。

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
     (0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

sizeHint = VectorSizeHint(
    inputCol="userFeatures",
    handleInvalid="skip",
    size=3)

datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/vector_size_hint_example.py" にあります。

APIの詳細については、VectorSizeHint Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq(
    (0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
    (0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3)

val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VectorSizeHintExample.scala" にあります。

APIの詳細については、VectorSizeHint Java ドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("hour", IntegerType, false),
  createStructField("mobile", DoubleType, false),
  createStructField("userFeatures", new VectorUDT(), false),
  createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);

VectorSizeHint sizeHint = new VectorSizeHint()
  .setInputCol("userFeatures")
  .setHandleInvalid("skip")
  .setSize(3);

Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);

VectorAssembler assembler = new VectorAssembler()
  .setInputCols(new String[]{"hour", "mobile", "userFeatures"})
  .setOutputCol("features");

// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
    "'features'");
output.select("features", "clicked").show(false);
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSizeHintExample.java" にあります。

QuantileDiscretizer

QuantileDiscretizer は、連続的な特徴を持つ列を受け取り、ビン分割されたカテゴリ特徴を持つ列を出力します。ビンの数は numBuckets パラメータで設定します。使用されるバケットの数がこの値よりも小さくなる可能性があり、例えば、十分な個別の分位数を作成するための入力の個別の値が少なすぎる場合などがあります。

NaN値:NaN値は、QuantileDiscretizer のフィッティング中に列から削除されます。これにより、予測を行うための Bucketizer モデルが生成されます。変換中、Bucketizer はデータセット内でNaN値を見つけたときにエラーを発生させますが、ユーザーは handleInvalid を設定することで、データセット内のNaN値を保持または削除することを選択することもできます。ユーザーがNaN値を保持することを選択した場合、それらは特別に処理され、独自のバケットに配置されます。例えば、4つのバケットが使用される場合、NaN以外のデータはバケット[0-3]に入れられますが、NaNは特別なバケット[4]に数えられます。

アルゴリズム:ビンの範囲は、近似アルゴリズムを使用して選択されます(詳細については、approxQuantile のドキュメントを参照してください)。近似の精度は、relativeError パラメータで制御できます。ゼロに設定すると、正確な分位数が計算されます(注:正確な分位数の計算はコストのかかる操作です)。下限と上限のビンの境界は、すべての実数値をカバーする -Infinity+Infinity になります。

idhour を持つ DataFrame があると仮定します。

 id | hour
----|------
 0  | 18.0
----|------
 1  | 19.0
----|------
 2  | 8.0
----|------
 3  | 5.0
----|------
 4  | 2.2

hourDouble 型の連続的な特徴です。この連続的な特徴をカテゴリカルな特徴に変換したいと考えています。numBuckets = 3 が与えられた場合、次の DataFrame が得られるはずです。

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
----|------|------
 1  | 19.0 | 2.0
----|------|------
 2  | 8.0  | 1.0
----|------|------
 3  | 5.0  | 1.0
----|------|------
 4  | 2.2  | 0.0

APIの詳細については、QuantileDiscretizer Python ドキュメントを参照してください。

from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/quantile_discretizer_example.py" にあります。

APIの詳細については、QuantileDiscretizer Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show(false)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala" にあります。

APIの詳細については、QuantileDiscretizer Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 18.0),
  RowFactory.create(1, 19.0),
  RowFactory.create(2, 8.0),
  RowFactory.create(3, 5.0),
  RowFactory.create(4, 2.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

QuantileDiscretizer discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3);

Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaQuantileDiscretizerExample.java" にあります。

Imputer

Imputer 推定器は、欠損値が位置する列の平均、中央値、または最頻値を使用して、データセット内の欠損値を補完します。入力列は数値型である必要があります。現在、Imputer はカテゴリカルな特徴をサポートしておらず、カテゴリカルな特徴を含む列に対して誤った値を生成する可能性があります。Imputer は、.setMissingValue(custom_value) によって 'NaN' 以外のカスタム値を補完できます。たとえば、.setMissingValue(0) は(0)のすべての出現を補完します。

注意 入力列のすべての null 値は欠損値として扱われ、同様に補完されます。

a および b を持つ DataFrame があると仮定します。

      a     |      b      
------------|-----------
     1.0    | Double.NaN
     2.0    | Double.NaN
 Double.NaN |     3.0   
     4.0    |     4.0   
     5.0    |     5.0   

この例では、Imputerは、Double.NaN (欠損値のデフォルト)のすべての出現を、対応する列の他の値から計算された平均(デフォルトの補完戦略)に置き換えます。この例では、列 a および b の代替値はそれぞれ 3.0 と 4.0 です。変換後、出力列の欠損値は、関連する列の代替値に置き換えられます。

      a     |      b     | out_a | out_b   
------------|------------|-------|-------
     1.0    | Double.NaN |  1.0  |  4.0 
     2.0    | Double.NaN |  2.0  |  4.0 
 Double.NaN |     3.0    |  3.0  |  3.0 
     4.0    |     4.0    |  4.0  |  4.0
     5.0    |     5.0    |  5.0  |  5.0 

APIの詳細については、Imputer Python ドキュメントを参照してください。

from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/imputer_example.py" にあります。

APIの詳細については、Imputer Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.Imputer

val df = spark.createDataFrame(Seq(
  (1.0, Double.NaN),
  (2.0, Double.NaN),
  (Double.NaN, 3.0),
  (4.0, 4.0),
  (5.0, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
  .setInputCols(Array("a", "b"))
  .setOutputCols(Array("out_a", "out_b"))

val model = imputer.fit(df)
model.transform(df).show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ImputerExample.scala" にあります。

APIの詳細については、Imputer Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1.0, Double.NaN),
  RowFactory.create(2.0, Double.NaN),
  RowFactory.create(Double.NaN, 3.0),
  RowFactory.create(4.0, 4.0),
  RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
  createStructField("a", DoubleType, false),
  createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Imputer imputer = new Imputer()
  .setInputCols(new String[]{"a", "b"})
  .setOutputCols(new String[]{"out_a", "out_b"});

ImputerModel model = imputer.fit(df);
model.transform(df).show();
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaImputerExample.java" にあります。

特徴量セレクター

VectorSlicer

VectorSlicer は、特徴ベクトルを受け取り、元の特徴のサブ配列を持つ新しい特徴ベクトルを出力するトランスフォーマーです。ベクトル列から特徴を抽出するのに役立ちます。

VectorSlicer は、指定されたインデックスを持つベクトル列を受け入れ、それらのインデックスを介して値が選択される新しいベクトル列を出力します。インデックスには2つのタイプがあります。

  1. ベクトルへのインデックスを表す整数インデックス、setIndices()

  2. ベクトルの特徴の名前を表す文字列インデックス、setNames()これは、実装が Attribute のnameフィールドで一致するため、ベクトル列が AttributeGroup を持つことを必要とします。

整数と文字列による指定はどちらも受け入れ可能です。さらに、整数のインデックスと文字列の名前を同時に使用できます。少なくとも1つの特徴を選択する必要があります。重複する特徴は許可されないため、選択されたインデックスと名前の間に重複があってはなりません。特徴の名前が選択された場合、空の入力属性が発生すると例外がスローされることに注意してください。

出力ベクトルは、最初に選択されたインデックス(指定された順序)で特徴を並べ、その後に選択された名前(指定された順序)を続けます。

userFeatures を持つ DataFrame があると仮定します。

 userFeatures
------------------
 [0.0, 10.0, 0.5]

userFeatures は、3つのユーザー特徴を含むベクトル列です。userFeatures の最初の列はすべてゼロであると仮定し、それを削除して最後の2つの列のみを選択したいと考えています。VectorSlicer は、setIndices(1, 2) を使用して最後の2つの要素を選択し、features という名前の新しいベクトル列を生成します。

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]

また、userFeatures の潜在的な入力属性、つまり ["f1", "f2", "f3"] があるとすると、setNames("f2", "f3") を使用してそれらを選択できます。

 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]

APIの詳細については、VectorSlicer Python ドキュメントを参照してください。

from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/python/ml/vector_slicer_example.py" にあります。

APIの詳細については、VectorSlicer Scala ドキュメントを参照してください。

import java.util.Arrays

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

val data = Arrays.asList(
  Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
  Row(Vectors.dense(-2.0, 2.3, 0.0))
)

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))

val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")

slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))

val output = slicer.transform(dataset)
output.show(false)
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VectorSlicerExample.scala" にあります。

APIの詳細については、VectorSlicer Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

Attribute[] attrs = {
  NumericAttribute.defaultAttr().withName("f1"),
  NumericAttribute.defaultAttr().withName("f2"),
  NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
  RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);

Dataset<Row> dataset =
  spark.createDataFrame(data, (new StructType()).add(group.toStructField()));

VectorSlicer vectorSlicer = new VectorSlicer()
  .setInputCol("userFeatures").setOutputCol("features");

vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})

Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
完全なサンプルコードは、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVectorSlicerExample.java" にあります。

RFormula

RFormula は、Rモデルの式で指定された列を選択します。現在、'~'、'.'、':'、'+'、'-'を含むR演算子の限られたサブセットをサポートしています。基本的な演算子は次のとおりです。

ab が double 列であると仮定し、RFormula の効果を説明するために、次の簡単な例を使用します。

RFormula は、特徴のベクトル列と、double または string のラベル列を生成します。線形回帰のためにRで式が使用される場合と同様に、数値列はdoubleにキャストされます。文字列の入力列については、最初に StringIndexer によって stringOrderType によって決定された順序を使用して変換され、順序付け後の最後のカテゴリは削除され、doubleはワンホットエンコードされます。

{'b', 'a', 'b', 'a', 'c', 'b'} を含む文字列特徴列があると仮定し、エンコードを制御するために stringOrderType を設定します。

stringOrderType | Category mapped to 0 by StringIndexer |  Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b')          | least frequent category ('c')
'frequencyAsc'  | least frequent category ('c')         | most frequent category ('b')
'alphabetDesc'  | last alphabetical category ('c')      | first alphabetical category ('a')
'alphabetAsc'   | first alphabetical category ('a')     | last alphabetical category ('c')

ラベル列が文字列型の場合、StringIndexer によって frequencyDesc の順序を使用して最初に double に変換されます。ラベル列が DataFrame に存在しない場合、出力ラベル列は式で指定された応答変数から作成されます。

注:順序付けオプション stringOrderType はラベル列には使用されません。ラベル列がインデックス化されると、StringIndexer のデフォルトの降順頻度順序を使用します。

idcountryhourclicked を持つ DataFrame があると仮定します。

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0

もし、clicked ~ country + hour という数式文字列を持つ RFormula を使用する場合、これは countryhour に基づいて clicked を予測したいということを示しています。変換後、次の DataFrame が得られるはずです。

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0

API の詳細については、RFormula Python ドキュメントを参照してください。

from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/rformula_example.py" にあります。

API の詳細については、RFormula Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.RFormula

val dataset = spark.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")

val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")

val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/RFormulaExample.scala" にあります。

API の詳細については、RFormula Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.types.DataTypes.*;

StructType schema = createStructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("country", StringType, false),
  createStructField("hour", IntegerType, false),
  createStructField("clicked", DoubleType, false)
});

List<Row> data = Arrays.asList(
  RowFactory.create(7, "US", 18, 1.0),
  RowFactory.create(8, "CA", 12, 0.0),
  RowFactory.create(9, "NZ", 15, 0.0)
);

Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaRFormulaExample.java" にあります。

ChiSqSelector

ChiSqSelector は、カイ二乗特徴選択を表します。これは、カテゴリ特徴を持つラベル付きデータに対して動作します。ChiSqSelector は、どの特徴を選択するかを決定するために、独立性のカイ二乗検定を使用します。これは、次の 5 つの選択方法をサポートしています: numTopFeaturespercentilefprfdrfwe

予測対象として使用される idfeatures、および clicked の列を持つ DataFrame があると仮定します。

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0

numTopFeatures = 1ChiSqSelector を使用する場合、ラベル clicked に基づいて、features の最後の列が最も有用な特徴として選択されます。

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]

API の詳細については、ChiSqSelector Python ドキュメントを参照してください。

from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/chisq_selector_example.py" にあります。

API の詳細については、ChiSqSelector Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = spark.createDataset(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala" にあります。

API の詳細については、ChiSqSelector Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

ChiSqSelector selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
    + " features selected");
result.show();
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaChiSqSelectorExample.java" にあります。

UnivariateFeatureSelector

UnivariateFeatureSelector は、カテゴリ/連続特徴を持つカテゴリ/連続ラベルに対して動作します。ユーザーは featureType および labelType を設定でき、Spark は指定された featureType および labelType に基づいて使用するスコア関数を選択します。

featureType |  labelType |score function
------------|------------|--------------
categorical |categorical | chi-squared (chi2)
continuous  |categorical | ANOVATest (f_classif)
continuous  |continuous  | F-value (f_regression)

これは、次の 5 つの選択モードをサポートしています: numTopFeaturespercentilefprfdrfwe

デフォルトでは、選択モードは numTopFeatures で、デフォルトの selectionThreshold は 50 に設定されています。

予測対象として使用される idfeatures、および label の列を持つ DataFrame があると仮定します。

id | features                       | label
---|--------------------------------|---------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0

featureTypecontinuous に、labelTypecategorical に設定し、numTopFeatures = 1 とした場合、features の最後の列が最も有用な特徴として選択されます。

id | features                       | label   | selectedFeatures
---|--------------------------------|---------|------------------
 1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0     | [2.3]
 2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0     | [4.1]
 3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0     | [2.5]
 4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0     | [3.8]
 5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0     | [3.0]
 6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0     | [2.1]

API の詳細については、UnivariateFeatureSelector Python ドキュメントを参照してください。

from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
    (2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
    (3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
    (4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
    (5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
    (6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])

selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
                                     labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)

result = selector.fit(df).transform(df)

print("UnivariateFeatureSelector output with top %d features selected using f_classif"
      % selector.getSelectionThreshold())
result.show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/univariate_feature_selector_example.py" にあります。

API の詳細については、UnivariateFeatureSelector Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  (2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  (3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  (4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  (5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  (6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)

val df = spark.createDataset(data).toDF("id", "features", "label")

val selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
  s" features selected using f_classif")
result.show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/UnivariateFeatureSelectorExample.scala" にあります。

API の詳細については、UnivariateFeatureSelector Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
  RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
  RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
  RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
  RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
  RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
  .setFeatureType("continuous")
  .setLabelType("categorical")
  .setSelectionMode("numTopFeatures")
  .setSelectionThreshold(1)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("UnivariateFeatureSelector output with top "
    + selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaUnivariateFeatureSelectorExample.java" にあります。

VarianceThresholdSelector

VarianceThresholdSelector は、分散の低い特徴を削除するセレクターです。varianceThreshold より大きくない(サンプル)分散を持つ特徴は削除されます。設定されていない場合、varianceThreshold はデフォルトで 0 に設定されます。これは、分散が 0 の特徴(つまり、すべてのサンプルで同じ値を持つ特徴)のみが削除されることを意味します。

予測対象として使用される id および features の列を持つ DataFrame があると仮定します。

id | features
---|--------------------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]

6 つの特徴のサンプル分散は、それぞれ 16.67、0.67、8.17、10.17、5.07、11.47 です。varianceThreshold = 8.0VarianceThresholdSelector を使用する場合、分散が 8.0 以下の特徴は削除されます。

id | features                       | selectedFeatures
---|--------------------------------|-------------------
 1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
 2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
 3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
 4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
 5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
 6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]

API の詳細については、VarianceThresholdSelector Python ドキュメントを参照してください。

from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
    (2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
    (3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
    (4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
    (5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
    (6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])

selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")

result = selector.fit(df).transform(df)

print("Output: Features with variance lower than %f are removed." %
      selector.getVarianceThreshold())
result.show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/variance_threshold_selector_example.py" にあります。

API の詳細については、VarianceThresholdSelector Scala ドキュメントを参照してください。

import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  (2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  (3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  (4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  (5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  (6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)

val df = spark.createDataset(data).toDF("id", "features")

val selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"Output: Features with variance lower than" +
  s" ${selector.getVarianceThreshold} are removed.")
result.show()
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/VarianceThresholdSelectorExample.scala" にあります。

API の詳細については、VarianceThresholdSelector Java ドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;

List<Row> data = Arrays.asList(
  RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
  RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
  RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
  RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
  RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
  RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

VarianceThresholdSelector selector = new VarianceThresholdSelector()
  .setVarianceThreshold(8.0)
  .setFeaturesCol("features")
  .setOutputCol("selectedFeatures");

Dataset<Row> result = selector.fit(df).transform(df);

System.out.println("Output: Features with variance lower than "
    + selector.getVarianceThreshold() + " are removed.");
result.show();
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaVarianceThresholdSelectorExample.java" にあります。

Locality Sensitive Hashing

局所性鋭敏型ハッシュ法 (LSH) は、大規模なデータセットを使用したクラスタリング、近似最近傍探索、および外れ値検出で一般的に使用される、重要なハッシュ技術のクラスです。

LSH の一般的な考え方は、データポイントをバケットにハッシュするために、関数のファミリー(「LSH ファミリー」)を使用することです。これにより、互いに近いデータポイントは高い確率で同じバケットに入り、互いに離れたデータポイントは異なるバケットに入る可能性が非常に高くなります。LSH ファミリーは、次のように正式に定義されます。

メトリック空間 (M, d) において、M は集合であり、dM 上の距離関数である場合、LSH ファミリーとは、次のプロパティを満たす関数のファミリー h です: \[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \] この LSH ファミリーは、(r1, r2, p1, p2)-敏感型と呼ばれます。

Spark では、異なる LSH ファミリーは個別のクラス(例: MinHash)で実装され、各クラスで特徴変換、近似類似性結合、および近似最近傍探索のための API が提供されています。

LSH では、同じバケットにハッシュされる遠い入力特徴のペア($d(p,q) \geq r2$)を偽陽性と定義し、異なるバケットにハッシュされる近い特徴のペア($d(p,q) \leq r1$)を偽陰性と定義します。

LSH 操作

LSH が使用できる主な操作の種類について説明します。適合した LSH モデルには、これらの各操作のためのメソッドがあります。

特徴量変換

特徴変換は、ハッシュされた値を新しい列として追加する基本的な機能です。これは、次元削減に役立つ可能性があります。ユーザーは、inputCol および outputCol を設定して、入力および出力列名を指定できます。

LSH は、複数の LSH ハッシュテーブルもサポートしています。ユーザーは、numHashTables を設定して、ハッシュテーブルの数を指定できます。これは、近似類似性結合と近似最近傍探索における OR 増幅にも使用されます。ハッシュテーブルの数を増やすと精度は向上しますが、通信コストと実行時間も増加します。

outputCol の型は Seq[Vector] であり、配列の次元は numHashTables と等しく、ベクトルの次元は現在 1 に設定されています。将来のリリースでは、ユーザーがこれらのベクトルの次元を指定できるように、AND 増幅を実装する予定です。

近似類似結合

近似類似性結合は、2 つのデータセットを受け取り、データセット内の距離がユーザー定義の閾値よりも小さい行のペアを近似的に返します。近似類似性結合は、2 つの異なるデータセットの結合と自己結合の両方をサポートしています。自己結合は、重複するペアを生成します。

近似類似性結合は、変換されたデータセットと変換されていないデータセットの両方を入力として受け入れます。変換されていないデータセットが使用される場合、自動的に変換されます。この場合、ハッシュ署名は outputCol として作成されます。

結合されたデータセットでは、元のデータセットはdatasetAdatasetBでクエリできます。出力データセットには、返された各行のペア間の真の距離を示す距離列が追加されます。

近似最近傍探索は、(特徴ベクトルの)データセットとキー(単一の特徴ベクトル)を受け取り、そのキーに最も近いデータセット内の指定された数の行を近似的に返します。

近似最近傍探索は、変換されたデータセットと未変換のデータセットの両方を入力として受け入れます。未変換のデータセットが使用された場合、自動的に変換されます。この場合、ハッシュ署名はoutputColとして作成されます。

出力データセットには、各出力行と検索されたキーとの間の真の距離を示す距離列が追加されます。

注意: 近似最近傍探索では、ハッシュバケット内に十分な候補がない場合、k行よりも少ない行が返されます。

LSH アルゴリズム

ユークリッド距離に対するバケット化ランダム射影

バケット化ランダム射影は、ユークリッド距離のためのLSHファミリです。ユークリッド距離は次のように定義されます: \[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \] そのLSHファミリは、特徴ベクトル$\mathbf{x}$をランダムな単位ベクトル$\mathbf{v}$に射影し、射影された結果をハッシュバケットに分割します: \[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \] ここで、rはユーザー定義のバケット長です。バケット長は、ハッシュバケットの平均サイズ(つまり、バケット数)を制御するために使用できます。バケット長が大きいほど(つまり、バケットが少ないほど)、特徴が同じバケットにハッシュされる確率が高くなります(真陽性および偽陽性の数が増加します)。

バケット化ランダム射影は、任意ベクトルを入力特徴として受け入れ、スパースベクトルと密ベクトルの両方をサポートします。

APIの詳細については、BucketedRandomProjectionLSH Pythonドキュメントを参照してください。

from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.dense([1.0, 1.0]),),
         (1, Vectors.dense([1.0, -1.0]),),
         (2, Vectors.dense([-1.0, -1.0]),),
         (3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(4, Vectors.dense([1.0, 0.0]),),
         (5, Vectors.dense([-1.0, 0.0]),),
         (6, Vectors.dense([0.0, 1.0]),),
         (7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.dense([1.0, 0.0])

brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
                                  numHashTables=3)
model = brp.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("EuclideanDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/bucketed_random_projection_lsh_example.py"にあります。

APIの詳細については、BucketedRandomProjectionLSH Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0)),
  (3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (4, Vectors.dense(1.0, 0.0)),
  (5, Vectors.dense(-1.0, 0.0)),
  (6, Vectors.dense(0.0, 1.0)),
  (7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")

val key = Vectors.dense(1.0, 0.0)

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = brp.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala"にあります。

APIの詳細については、BucketedRandomProjectionLSH Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.dense(1.0, 1.0)),
  RowFactory.create(1, Vectors.dense(1.0, -1.0)),
  RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
  RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);

List<Row> dataB = Arrays.asList(
    RowFactory.create(4, Vectors.dense(1.0, 0.0)),
    RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
    RowFactory.create(6, Vectors.dense(0.0, 1.0)),
    RowFactory.create(7, Vectors.dense(0.0, -1.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

Vector key = Vectors.dense(1.0, 0.0);

BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes");

BucketedRandomProjectionLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("EuclideanDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaBucketedRandomProjectionLSHExample.java"にあります。

ジャカード距離に対する MinHash

MinHashは、入力特徴が自然数の集合である場合のジャカード距離のLSHファミリです。2つの集合のジャカード距離は、それらの交差と結合のカーディナリティによって定義されます: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \] MinHashは、ランダムなハッシュ関数gを集合内の各要素に適用し、すべてのハッシュ値の最小値を取ります: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]

MinHashの入力集合は、バイナリベクトルとして表現されます。ここで、ベクトルのインデックスは要素自体を表し、ベクトルの非ゼロ値は、その要素が集合内に存在することを表します。密ベクトルとスパースベクトルの両方がサポートされていますが、通常は効率のためにスパースベクトルが推奨されます。たとえば、Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)])は、空間内に10個の要素があることを意味します。このセットには、要素2、要素3、要素5が含まれています。すべての非ゼロ値は、バイナリの「1」値として扱われます。

注意: 空のセットはMinHashによって変換できません。つまり、入力ベクトルには少なくとも1つの非ゼロエントリが必要です。

APIの詳細については、MinHashLSH Pythonドキュメントを参照してください。

from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col

dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
         (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
         (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])

dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
         (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
         (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])

key = Vectors.sparse(6, [1, 3], [1.0, 1.0])

mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)

# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
    .select(col("datasetA.id").alias("idA"),
            col("datasetB.id").alias("idB"),
            col("JaccardDistance")).show()

# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/python/ml/min_hash_lsh_example.py"にあります。

APIの詳細については、MinHashLSH Scalaドキュメントを参照してください。

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")

val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

val mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = mh.fit(dfA)

// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show()

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala"にあります。

APIの詳細については、MinHashLSH Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;

List<Row> dataA = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

List<Row> dataB = Arrays.asList(
  RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
  RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);

int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);

MinHashLSH mh = new MinHashLSH()
  .setNumHashTables(5)
  .setInputCol("features")
  .setOutputCol("hashes");

MinHashLSHModel model = mh.fit(dfA);

// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
  .select(col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")).show();

// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
完全なサンプルコードは、Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashLSHExample.java"にあります。