特徴量の抽出、変換、選択
このセクションでは、特徴量に関するアルゴリズムを、おおよそ以下のグループに分けて説明します。
- 抽出: 「生」データから特徴量を抽出する
- 変換: 特徴量のスケーリング、変換、または変更
- 選択: より大きな特徴量のセットからサブセットを選択する
- Locality Sensitive Hashing (LSH): このアルゴリズム群は、特徴量変換と他のアルゴリズムの側面を組み合わせています。
目次
- 特徴量抽出器
- 特徴量変換器
- Tokenizer
- StopWordsRemover
- $n$-gram
- Binarizer
- PCA
- PolynomialExpansion
- 離散コサイン変換 (DCT)
- StringIndexer
- IndexToString
- OneHotEncoder
- TargetEncoder
- VectorIndexer
- Interaction
- Normalizer
- StandardScaler
- RobustScaler
- MinMaxScaler
- MaxAbsScaler
- Bucketizer
- ElementwiseProduct
- SQLTransformer
- VectorAssembler
- VectorSizeHint
- QuantileDiscretizer
- Imputer
- 特徴量選択器
- Locality Sensitive Hashing
特徴量抽出器
TF-IDF
Term frequency-inverse document frequency (TF-IDF) は、コーパスにおける文書に対する単語の重要性を反映するために、テキストマイニングで広く使用される特徴量ベクトル化手法です。単語を $t$、文書を $d$、コーパスを $D$ とします。単語頻度 $TF(t, d)$ は、単語 $t$ が文書 $d$ に出現する回数ですが、文書頻度 $DF(t, D)$ は、単語 $t$ を含む文書の数です。単語頻度のみを重要度の測定に使用すると、頻繁に出現するが文書に関する情報がほとんどない単語(例:「a」、「the」、「of」)を過度に強調しやすくなります。単語がコーパス全体で頻繁に出現する場合、それは特定の文書に関する特別な情報を持っていないことを意味します。逆文書頻度は、単語がどれだけの情報を提供するかを示す数値尺度です。\[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \] ここで $|D|$ はコーパス内の文書の総数です。対数が使用されているため、単語がすべての文書に出現する場合、その IDF 値は 0 になります。コーパス外の単語のゼロ除算を避けるために、平滑化項が適用されていることに注意してください。TF-IDF 尺度は、TF と IDF の積にすぎません。\[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \] 単語頻度と文書頻度の定義にはいくつかのバリエーションがあります。MLlib では、柔軟性を持たせるために TF と IDF を分離しています。
TF: HashingTF と CountVectorizer の両方を使用して、単語頻度ベクトルを生成できます。
HashingTF は、単語のセットを受け取り、それらのセットを固定長のフィーチャベクトルに変換する Transformer です。テキスト処理では、「単語のセット」は単語のバッグ(bag of words)である場合があります。HashingTF は、ハッシュトリックを利用します。生のフィーチャは、ハッシュ関数を適用することによってインデックス(単語)にマッピングされます。ここで使用されるハッシュ関数は MurmurHash 3 です。次に、単語頻度は、マッピングされたインデックスに基づいて計算されます。このアプローチは、大規模コーパスでは高価になる可能性のあるグローバルな単語からインデックスへのマップの計算を不要にしますが、ハッシュ衝突の可能性という欠点があります。つまり、異なる生のフィーチャがハッシュ後に同じ単語になる可能性があります。衝突の可能性を減らすには、ターゲットフィーチャの次元、つまりハッシュテーブルのバケット数を増やすことができます。ハッシュ値に対する単純な modulo 演算がベクトルインデックスを決定するために使用されるため、フィーチャ次元として 2 のべき乗を使用することが推奨されます。そうしないと、フィーチャはベクトルインデックスに均等にマッピングされません。デフォルトのフィーチャ次元は $2^{18} = 262,144$ です。オプションのバイナリートグル パラメータは、単語頻度カウントを制御します。true に設定すると、すべての非ゼロ頻度カウントが 1 に設定されます。これは、整数カウントではなくバイナリカウントをモデル化する離散確率モデルに特に役立ちます。
CountVectorizer は、テキスト文書を単語カウントのベクトルに変換します。詳細については、CountVectorizer を参照してください。
IDF: IDF は、データセットで適合され、IDFModel を生成する Estimator です。IDFModel は、フィーチャベクトル(一般的には HashingTF または CountVectorizer から作成されたもの)を受け取り、各フィーチャをスケーリングします。直感的には、コーパスで頻繁に出現するフィーチャの重みを減らします。
注意: spark.ml はテキストセグメンテーションのためのツールを提供していません。ユーザーは Stanford NLP Group および scalanlp/chalk を参照してください。
例
以下のコードセグメントでは、一連の文から始めます。Tokenizer を使用して各文を単語に分割します。各文(単語のバッグ)について、HashingTF を使用して文をフィーチャベクトルにハッシュします。IDF を使用してフィーチャベクトルを再スケーリングします。これは、テキストを特徴量として使用する場合、通常パフォーマンスを向上させます。これらのフィーチャベクトルは、その後、学習アルゴリズムに渡すことができます。
API の詳細については、HashingTF Python ドキュメントおよびIDF Python ドキュメントを参照してください。
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()API の詳細については、HashingTF Scala ドキュメントおよびIDF Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
val sentenceData = spark.createDataFrame(Seq(
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()API の詳細については、HashingTF Java ドキュメントおよびIDF Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, "Hi I heard about Spark"),
RowFactory.create(0.0, "I wish Java could use case classes"),
RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);
int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(numFeatures);
Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();Word2Vec
Word2Vec は、文書を表す単語のシーケンスを受け取り、Word2VecModel をトレーニングする Estimator です。このモデルは、各単語をユニークな固定サイズのベクトルにマッピングします。Word2VecModel は、文書内のすべての単語の平均を使用して各文書をベクトルに変換します。このベクトルは、予測、文書類似性計算などのための特徴量として使用できます。詳細については、Word2Vec に関する MLlib ユーザーガイドを参照してください。
例
以下のコードセグメントでは、各文書が単語のシーケンスとして表される一連の文書から始めます。各文書について、それをフィーチャベクトルに変換します。このフィーチャベクトルは、その後、学習アルゴリズムに渡すことができます。
API の詳細については、Word2Vec Python ドキュメントを参照してください。
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))API の詳細については、Word2Vec Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }API の詳細については、Word2Vec Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);
// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);
for (Row row : result.collectAsList()) {
List<String> text = row.getList(0);
Vector vector = (Vector) row.get(1);
System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}CountVectorizer
CountVectorizer および CountVectorizerModel は、テキスト文書のコレクションを単語カウントのベクトルに変換するのに役立ちます。事前定義された辞書がない場合、CountVectorizer は Estimator として使用して語彙を抽出し、CountVectorizerModel を生成できます。このモデルは、語彙に対して文書のスパース表現を生成し、これを LDA などの他のアルゴリズムに渡すことができます。
適合プロセス中に、CountVectorizer は、コーパス全体での単語頻度で並べられた上位 vocabSize 個の単語を選択します。オプションのパラメータ minDF は、語彙に含めるために単語が出現する必要のある文書の最小数(または 1.0 未満の場合はその割合)を指定することによっても、適合プロセスに影響を与えます。別のオプションのバイナリートグル パラメータは、出力ベクトルを制御します。true に設定すると、すべての非ゼロカウントが 1 に設定されます。これは、整数カウントではなくバイナリカウントをモデル化する離散確率モデルに特に役立ちます。
例
以下のような DataFrame が、id と texts の列を持つと仮定します。
id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
texts の各行は Array[String] 型の文書です。CountVectorizer の fit を呼び出すと、語彙(a, b, c)を持つ CountVectorizerModel が生成されます。その後、変換後の出力列「vector」には以下が含まれます。
id | texts | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
各ベクトルは、文書の単語カウントを語彙上で表します。
API の詳細については、CountVectorizer Python ドキュメントおよびCountVectorizerModel Python ドキュメントを参照してください。
from pyspark.ml.feature import CountVectorizer
# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)API の詳細については、CountVectorizer Scala ドキュメントおよびCountVectorizerModel Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("words")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
.setInputCol("words")
.setOutputCol("features")
cvModel.transform(df).show(false)API の詳細については、CountVectorizer Java ドキュメントおよびCountVectorizerModel Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("a", "b", "c")),
RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
.setInputCol("text")
.setOutputCol("feature")
.setVocabSize(3)
.setMinDF(2)
.fit(df);
// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
.setInputCol("text")
.setOutputCol("feature");
cvModel.transform(df).show(false);FeatureHasher
特徴量ハッシュは、カテゴリカルまたは数値の特徴量を指定された次元の特徴量ベクトルに投影します(通常、元の特徴空間よりも大幅に小さい)。これは、ハッシュトリックを使用して特徴量を特徴量ベクトル内のインデックスにマッピングすることによって行われます。
FeatureHasher トランスフォーマーは、複数の列を操作します。各列には、数値特徴量またはカテゴリカル特徴量のいずれかを含めることができます。列のデータ型とその処理は次のとおりです。
- 数値列: 数値特徴量の場合、列名のハッシュ値を使用して、特徴量値を特徴量ベクトル内のインデックスにマッピングします。デフォルトでは、数値特徴量はカテゴリカルとは見なされません(整数であっても)。それらをカテゴリカルと見なすには、
categoricalColsパラメータを使用して関連する列を指定します。 - 文字列列: カテゴリカル特徴量の場合、「列名=値」という文字列のハッシュ値がベクトルインデックスにマッピングされ、インジケーター値は $1.0$ になります。したがって、カテゴリカル特徴量は「ワンホット」エンコードされます(
dropLast=falseを使用した OneHotEncoder と同様)。 - ブール列: ブール値は、文字列列と同様の方法で扱われます。つまり、ブール特徴量は「列名=true」または「列名=false」として表され、インジケーター値は $1.0$ になります。
NULL(欠損)値は無視されます(結果の特徴量ベクトルでは暗黙的にゼロ)。
ここで使用されるハッシュ関数も、HashingTF で使用される MurmurHash 3 です。ハッシュ値に対する単純な modulo 演算がベクトルインデックスを決定するために使用されるため、フィーチャ次元として 2 のべき乗を使用することが推奨されます。そうしないと、フィーチャはベクトルインデックスに均等にマッピングされません。
例
real、bool、stringNum、string の 4 つの入力列を持つ DataFrame があると仮定します。これらの異なるデータ型を入力として使用すると、特徴量ベクトルの列を生成するための transform の動作が例示されます。
real| bool|stringNum|string
----|-----|---------|------
2.2| true| 1| foo
3.3|false| 2| bar
4.4|false| 3| baz
5.5|false| 4| foo
その後、この DataFrame に対する FeatureHasher.transform の出力は次のようになります。
real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1 |foo |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2 |bar |(262144,[6031, 80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3 |baz |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4 |foo |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])
結果の特徴量ベクトルは、その後、学習アルゴリズムに渡すことができます。
API の詳細については、FeatureHasher Python ドキュメントを参照してください。
from pyspark.ml.feature import FeatureHasher
dataset = spark.createDataFrame([
(2.2, True, "1", "foo"),
(3.3, False, "2", "bar"),
(4.4, False, "3", "baz"),
(5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
outputCol="features")
featurized = hasher.transform(dataset)
featurized.show(truncate=False)API の詳細については、FeatureHasher Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.FeatureHasher
val dataset = spark.createDataFrame(Seq(
(2.2, true, "1", "foo"),
(3.3, false, "2", "bar"),
(4.4, false, "3", "baz"),
(5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")
val hasher = new FeatureHasher()
.setInputCols("real", "bool", "stringNum", "string")
.setOutputCol("features")
val featurized = hasher.transform(dataset)
featurized.show(false)API の詳細については、FeatureHasher Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(2.2, true, "1", "foo"),
RowFactory.create(3.3, false, "2", "bar"),
RowFactory.create(4.4, false, "3", "baz"),
RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
FeatureHasher hasher = new FeatureHasher()
.setInputCols(new String[]{"real", "bool", "stringNum", "string"})
.setOutputCol("features");
Dataset<Row> featurized = hasher.transform(dataset);
featurized.show(false);特徴量変換器
Tokenizer
トークン化は、テキスト(文など)を受け取り、個々の単語(通常は単語)に分割するプロセスです。単純な Tokenizer クラスは、この機能を提供します。以下の例は、文を単語のシーケンスに分割する方法を示しています。
RegexTokenizer は、正規表現(regex)マッチングに基づいた、より高度なトークン化を可能にします。デフォルトでは、「pattern」(正規表現、デフォルト: "\\s+")パラメータが、入力テキストを分割するための区切り文字として使用されます。あるいは、ユーザーは「gaps」パラメータを false に設定して、正規表現「pattern」が区切りギャップではなく「トークン」を示すことを意味し、一致するすべての出現箇所をトークン化結果として見つけることができます。
例
API の詳細については、Tokenizer Python ドキュメントおよびRegexTokenizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)API の詳細については、Tokenizer Scala ドキュメントおよびRegexTokenizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val sentenceDataFrame = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)
val countTokens = udf { (words: Seq[String]) => words.length }
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)API の詳細については、Tokenizer Java ドキュメントおよびRegexTokenizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import scala.collection.mutable.Seq;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;
List<Row> data = Arrays.asList(
RowFactory.create(0, "Hi I heard about Spark"),
RowFactory.create(1, "I wish Java could use case classes"),
RowFactory.create(2, "Logistic,regression,models,are,neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
spark.udf().register(
"countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);
Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);StopWordsRemover
ストップワードは、通常、頻繁に出現し、それほど意味を持たない単語であるため、入力から除外されるべき単語です。
StopWordsRemover は、文字列のシーケンス(例: Tokenizer の出力)を入力として受け取り、入力シーケンスからすべてのストップワードを削除します。ストップワードのリストは、stopWords パラメータで指定されます。一部の言語のデフォルトのストップワードは、StopWordsRemover.loadDefaultStopWords(language) を呼び出すことでアクセスできます。利用可能なオプションは、「danish」、「dutch」、「english」、「finnish」、「french」、「german」、「hungarian」、「italian」、「norwegian」、「portuguese」、「russian」、「spanish」、「swedish」、「turkish」です。ブールパラメータ caseSensitive は、マッチングが大文字と小文字を区別するかどうかを示します(デフォルトは false)。
例
以下のような DataFrame が、id と raw の列を持つと仮定します。
id | raw
----|----------
0 | [I, saw, the, red, balloon]
1 | [Mary, had, a, little, lamb]
入力列として raw、出力列として filtered を指定して StopWordsRemover を適用すると、次のようになります。
id | raw | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, balloon] | [saw, red, balloon]
1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]
filtered では、ストップワード「I」、「the」、「had」、「a」がフィルタリングされています。
API の詳細については、StopWordsRemover Python ドキュメントを参照してください。
from pyspark.ml.feature import StopWordsRemover
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)API の詳細については、StopWordsRemover Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered")
val dataSet = spark.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "balloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show(false)API の詳細については、StopWordsRemover Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StopWordsRemover remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered");
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);
StructType schema = new StructType(new StructField[]{
new StructField(
"raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);$n$-gram
$n$-gram は、$n$ 個のトークン(通常は単語)のシーケンスであり、$n$ は整数です。NGram クラスは、入力特徴量を $n$-gram に変換するために使用できます。
NGram は、文字列のシーケンス(例: Tokenizer の出力)を入力として受け取ります。パラメータ n は、各 $n$-gram の単語数を決定するために使用されます。出力は、$n$-gram のシーケンスで構成され、各 $n$-gram は $n$ 個の連続する単語のスペース区切り文字列として表されます。入力シーケンスに $n$ 個未満の文字列が含まれている場合、出力は生成されません。
例
API の詳細については、NGram Python ドキュメントを参照してください。
from pyspark.ml.feature import NGram
wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)API の詳細については、NGram Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.NGram
val wordDataFrame = spark.createDataFrame(Seq(
(0, Array("Hi", "I", "heard", "about", "Spark")),
(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
(2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")
val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)API の詳細については、NGram Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");
Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);Binarizer
二値化は、数値特徴量を二値(0/1)特徴量に閾値処理するプロセスです。
Binarizer は、共通のパラメータ inputCol および outputCol、さらに二値化の threshold を取ります。閾値より大きい特徴量値は 1.0 に二値化され、閾値以下の値は 0.0 に二値化されます。inputCol には、Vector 型と Double 型の両方がサポートされています。
例
API の詳細については、Binarizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()API の詳細については、Binarizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Binarizer
val data = immutable.ArraySeq.unsafeWrapArray(Array((0, 0.1), (1, 0.8), (2, 0.2)))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame)
println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()API の詳細については、Binarizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 0.1),
RowFactory.create(1, 0.8),
RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);
Binarizer binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5);
Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);
System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();PCA
PCA は、相関する可能性のある変数の観測値のセットを、主成分と呼ばれる線形に無相関な変数の値のセットに変換するために直交変換を使用する統計的手順です。PCA クラスは、PCA を使用してベクトルを低次元空間に投影するモデルをトレーニングします。以下の例は、5 次元の特徴量ベクトルを 3 次元の主成分に投影する方法を示しています。
例
API の詳細については、PCA Python ドキュメントを参照してください。
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)API の詳細については、PCA Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply)))
.toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df)
val result = pca.transform(df).select("pcaFeatures")
result.show(false)API の詳細については、PCA Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PCAModel pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df);
Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);PolynomialExpansion
多項式展開は、元の次元の n 次の組み合わせによって定式化される多項式空間に特徴量を展開するプロセスです。PolynomialExpansion クラスは、この機能を提供します。以下の例は、特徴量を 3 次の多項式空間に展開する方法を示しています。
例
API の詳細については、PolynomialExpansion Python ドキュメントを参照してください。
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)API の詳細については、PolynomialExpansion Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.dense(2.0, 1.0),
Vectors.dense(0.0, 0.0),
Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply)))
.toDF("features")
val polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polyExpansion.transform(df)
polyDF.show(false)API の詳細については、PolynomialExpansion Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
PolynomialExpansion polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(2.0, 1.0)),
RowFactory.create(Vectors.dense(0.0, 0.0)),
RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);離散コサイン変換 (DCT)
離散コサイン変換(DCT)は、時間領域の長さ $N$ の実数シーケンスを、周波数領域の別の長さ $N$ の実数シーケンスに変換します。DCT クラスは、この機能を提供し、DCT-II を実装し、$1/\sqrt{2}$ で結果をスケーリングして、変換の表現行列がユニタリになるようにします。変換されたシーケンスにはシフトは適用されません(例: 変換されたシーケンスの 0 番目の要素は、0 番目の DCT 係数であり、$N/2$ 番目の要素ではありません)。
例
API の詳細については、DCT Python ドキュメントを参照してください。
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(truncate=False)API の詳細については、DCT Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)
val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)API の詳細については、DCT Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
DCT dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false);
Dataset<Row> dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(false);StringIndexer
StringIndexer は、文字列ラベルの列をラベルインデックスの列にエンコードします。StringIndexer は複数の列をエンコードできます。インデックスは $[0, \text{numLabels})$ の範囲にあり、4 つの順序オプションがサポートされています:「frequencyDesc」(ラベル頻度の降順、最も頻繁なラベルに 0 を割り当てる)、「frequencyAsc」(ラベル頻度の昇順、最も頻度の低いラベルに 0 を割り当てる)、「alphabetDesc」(降順アルファベット順)、「alphabetAsc」(昇順アルファベット順)(デフォルト =「frequencyDesc」)。「frequencyDesc」/「frequencyAsc」の下で頻度が等しい場合、文字列はさらにアルファベット順にソートされることに注意してください。
ユーザーがラベルを保持することを選択した場合、未知のラベルは numLabels のインデックスに配置されます。入力列が数値の場合、それを文字列にキャストし、文字列値をインデックス化します。後続のパイプラインコンポーネント(Estimator または Transformer など)がこの文字列インデックス付けされたラベルを使用する場合、コンポーネントの入力列をこの文字列インデックス付けされた列名に設定する必要があります。多くの場合、setInputCol で入力列を設定できます。
例
以下のような DataFrame が、id と category の列を持つと仮定します。
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
category は 3 つのラベル(「a」、「b」、「c」)を持つ文字列列です。category を入力列、categoryIndex を出力列として StringIndexer を適用すると、次のようになります。
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
「a」は最も頻繁であるためインデックス 0 を取得し、次に頻度 1 の「c」、インデックス 2 の「b」が続きます。
さらに、1 つのデータセットで StringIndexer を適合させ、それを別のデータセットに変換する場合に、StringIndexer が未知のラベルをどのように処理するかについて、3 つの戦略があります。
- 例外をスローする(デフォルト)
- 未知のラベルを含む行を完全にスキップする
- 未知のラベルを特別な追加バケット(インデックス numLabels)に配置する
例
前の例に戻り、以前に定義した StringIndexer を次のデータセットで再利用してみましょう。
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
4 | e
StringIndexer が未知のラベルの処理方法を設定していないか、「error」に設定している場合、例外がスローされます。ただし、setHandleInvalid("skip") を呼び出していた場合、次のデータセットが生成されます。
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
「d」または「e」を含む行が表示されないことに注意してください。
setHandleInvalid("keep") を呼び出すと、次のデータセットが生成されます。
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | d | 3.0
4 | e | 3.0
「d」または「e」を含む行がインデックス「3.0」にマッピングされていることに注意してください。
API の詳細については、StringIndexer Python ドキュメントを参照してください。
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()API の詳細については、StringIndexer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame(
Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show()API の詳細については、StringIndexer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexer indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex");
Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();IndexToString
StringIndexer と対称的に、IndexToString は、ラベルインデックスの列を元のラベルを文字列として含む列にマッピングします。一般的なユースケースは、StringIndexer でラベルからインデックスを生成し、それらのインデックスでモデルをトレーニングし、IndexToString で予測インデックスの列から元のラベルを取得することです。ただし、独自のラベルを提供することも自由です。
例
StringIndexer の例を拡張して、id と categoryIndex の列を持つ次の DataFrame があると仮定します。
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
入力列として categoryIndex、出力列として originalCategory を指定して IndexToString を適用すると、元のラベルを(列のメタデータから推測されます)取得できます。
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
API の詳細については、IndexToString Python ドキュメントを参照してください。
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)
print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()
print("StringIndexer will store labels in output column metadata\n")
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()API の詳細については、IndexToString Scala ドキュメントを参照してください。
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
println(s"Transformed string column '${indexer.getInputCol}' " +
s"to indexed column '${indexer.getOutputCol}'")
indexed.show()
val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
s"${Attribute.fromStructField(inputColSchema).toString}\n")
val converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
val converted = converter.transform(indexed)
println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()API の詳細については、IndexToString Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
Dataset<Row> indexed = indexer.transform(df);
System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
"to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();
StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
Attribute.fromStructField(inputColSchema).toString() + "\n");
IndexToString converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);
System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
"original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();OneHotEncoder
One-hotエンコーディングは、ラベルインデックスとして表されるカテゴリカル特徴量を、最大で 1 つの 1 を持つバイナリベクトルにマッピングし、すべての特徴量値のセットの中から特定のフィーチャ値の存在を示します。このエンコーディングにより、ロジスティック回帰や決定木などの連続特徴量を期待するアルゴリズムで、カテゴリカル特徴量を使用できるようになります。文字列型の入力データの場合、まず StringIndexer を使用してカテゴリカル特徴量をエンコードするのが一般的です。
OneHotEncoder は複数の列を変換でき、各入力列に対して 1 つのワンホットエンコードされた出力ベクトル列を返します。これらのベクトルを VectorAssembler を使用して単一の特徴量ベクトルにマージするのが一般的です。
OneHotEncoder は、変換中に無効な入力を処理する方法を選択するために handleInvalid パラメータをサポートしています。利用可能なオプションは、「keep」(無効な入力は追加のカテゴリカルインデックスに割り当てられる)と「error」(エラーをスローする)です。
例
API の詳細については、OneHotEncoder Python ドキュメントを参照してください。
from pyspark.ml.feature import OneHotEncoder
df = spark.createDataFrame([
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()API の詳細については、OneHotEncoder Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.OneHotEncoder
val df = spark.createDataFrame(Seq(
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")
val encoder = new OneHotEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)
val encoded = model.transform(df)
encoded.show()API の詳細については、OneHotEncoder Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, 1.0),
RowFactory.create(1.0, 0.0),
RowFactory.create(2.0, 1.0),
RowFactory.create(0.0, 2.0),
RowFactory.create(0.0, 1.0),
RowFactory.create(2.0, 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
OneHotEncoder encoder = new OneHotEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryVec1", "categoryVec2"});
OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();TargetEncoder
ターゲットエンコーディングは、高カーディナリティのカテゴリカル特徴量を、回帰タイプのモデルで使用するのに適した準連続スカラー属性に変換するデータ前処理技術です。このパラダイムは、独立した特徴量の個々の値を、従属属性の何らかの推定値(つまり、ターゲットに関して類似した統計を示すカテゴリ値が類似した表現を持つ)を表すスカラーにマッピングします。
カテゴリカル特徴量とターゲット変数との関係を活用することで、ターゲットエンコーディングは通常ワンホットエンコーディングよりも優れたパフォーマンスを発揮し、最終的なバイナリベクトルエンコーディングを必要としないため、データセットの全体的な次元が削減されます。
ユーザーは、単一列のユースケースのために inputCol および outputCol を設定するか、複数列のユースケースのために inputCols および outputCols を設定することで、入力および出力列名を指定できます(両方の配列は同じサイズである必要があります)。これらの列は、カテゴリカルインデックス(正の整数)を含むことが期待され、欠損値(null)は別のカテゴリとして扱われます。データ型は、‘NumericType’ のサブクラスである必要があります。文字列型の入力データの場合、まず StringIndexer を使用してカテゴリカル特徴量をエンコードするのが一般的です。
ユーザーは、エンコーディングが導出されるグランドトゥルース ラベルを含む列である label を設定することで、ターゲット列名を指定できます。欠損ラベル(null)を持つ観測値は、推定値の計算には考慮されません。データ型は、‘NumericType’ のサブクラスである必要があります。
TargetEncoder は、トレーニング時に見られなかったカテゴリを含む無効な入力を新しいデータにエンコードする際に、無効な入力を処理する方法を選択するために handleInvalid パラメータをサポートしています。利用可能なオプションは、「keep」(無効な入力は追加のカテゴリカルインデックスに割り当てられる)と「error」(例外をスローする)です。
TargetEncoder は、データセットを適合させる際にラベルのタイプを選択し、推定値の計算方法に影響を与える targetType パラメータをサポートしています。利用可能なオプションは、「binary」と「continuous」です。
「binary」に設定すると、ターゲット属性 $Y$ はバイナリ、$Y \in \{0, 1\}$ であることが期待されます。変換は、個々の値 $X_i$ を、$X=X_i$ であるという条件付き確率にマッピングします: $S_i = P(Y \mid X = X_i)$。このアプローチは bin-counting としても知られています。
「continuous」に設定すると、ターゲット属性 $Y$ は連続、$Y \in \mathbb{Q}$ であることが期待されます。変換は、個々の値 $X_i$ を、$X=X_i$ であるという条件付き期待値にマッピングします: $S_i = E[Y \mid X = X_i]$。このアプローチは mean-encoding としても知られています。
TargetEncoder は、カテゴリ内の統計と全体的な統計をどのようにブレンドするかを調整するために smoothing パラメータをサポートしています。高カーディナリティのカテゴリカル特徴量は、通常、$X$ のすべての可能な値にわたって均等に分布していません。したがって、$S_i$ をカテゴリ内の統計のみに基づいて計算すると、これらの推定値は非常に信頼性が低くなり、めったに見られないカテゴリは過学習を引き起こす可能性が非常に高くなります。
平滑化は、データセット全体における特定クラスの相対的なサイズに応じて、カテゴリ内の推定値と全体的な推定値を重み付けすることで、この動作を防ぎます。
$\;\;\; S_i = \lambda(n_i) \, P(Y \mid X = X_i) + (1 - \lambda(n_i)) \, P(Y)$(バイナリの場合)
$\;\;\; S_i = \lambda(n_i) \, E[Y \mid X = X_i] + (1 - \lambda(n_i)) \, E[Y]$(連続の場合)
ここで、$\lambda(n_i)$ は $n_i$ について単調増加し、0 と 1 の間に制限される関数です。
通常、$\lambda(n_i)$ はパラメータ関数 $\lambda(n_i) = \frac{n_i}{n_i + m}$ として実装されます。ここで $m$ は平滑化係数であり、TargetEncoder の smoothing パラメータによって表されます。
例
TargetEncoder の例を拡張して、feature と target(バイナリおよび連続)の列を持つ次の DataFrame があると仮定します。
feature | target | target
| (bin) | (cont)
--------|--------|--------
1 | 0 | 1.3
1 | 1 | 2.5
1 | 0 | 1.6
2 | 1 | 1.8
2 | 0 | 2.4
3 | 1 | 3.2
ターゲットタイプを「binary」、入力列を feature、ラベル列を target (bin)、出力列を encoded として TargetEncoder を適用すると、データにモデルを適合させてエンコーディングを学習し、これらのマッピングに従ってデータを変換できます。
feature | target | encoded
| (bin) |
--------|--------|--------
1 | 0 | 0.333
1 | 1 | 0.333
1 | 0 | 0.333
2 | 1 | 0.5
2 | 0 | 0.5
3 | 1 | 1.0
ターゲットタイプを「continuous」、入力列を feature、ラベル列を target (cont)、出力列を encoded として TargetEncoder を適用すると、データにモデルを適合させてエンコーディングを学習し、これらのマッピングに従ってデータを変換できます。
feature | target | encoded
| (cont) |
--------|--------|--------
1 | 1.3 | 1.8
1 | 2.5 | 1.8
1 | 1.6 | 1.8
2 | 1.8 | 2.1
2 | 2.4 | 2.1
3 | 3.2 | 3.2
API の詳細については、TargetEncoder Python ドキュメントを参照してください。
from pyspark.ml.feature import TargetEncoder
df = spark.createDataFrame(
[
(0.0, 1.0, 0, 10.0),
(1.0, 0.0, 1, 20.0),
(2.0, 1.0, 0, 30.0),
(0.0, 2.0, 1, 40.0),
(0.0, 1.0, 0, 50.0),
(2.0, 0.0, 1, 60.0),
],
["categoryIndex1", "categoryIndex2", "binaryLabel", "continuousLabel"],
)
# binary target
encoder = TargetEncoder(
inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryIndex1Target", "categoryIndex2Target"],
labelCol="binaryLabel",
targetType="binary"
)
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
# continuous target
encoder = TargetEncoder(
inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryIndex1Target", "categoryIndex2Target"],
labelCol="continuousLabel",
targetType="continuous"
)
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()API の詳細については、TargetEncoder Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.TargetEncoder
val df = spark.createDataFrame(Seq(
(0.0, 1.0, 0, 10.0),
(1.0, 0.0, 1, 20.0),
(2.0, 1.0, 0, 30.0),
(0.0, 2.0, 1, 40.0),
(0.0, 1.0, 0, 50.0),
(2.0, 0.0, 1, 60.0)
)).toDF("categoryIndex1", "categoryIndex2",
"binaryLabel", "continuousLabel")
// binary target
val bin_encoder = new TargetEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryIndex1Target", "categoryIndex2Target"))
.setLabelCol("binaryLabel")
.setTargetType("binary");
val bin_model = bin_encoder.fit(df)
val bin_encoded = bin_model.transform(df)
bin_encoded.show()
// continuous target
val cont_encoder = new TargetEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryIndex1Target", "categoryIndex2Target"))
.setLabelCol("continuousLabel")
.setTargetType("continuous");
val cont_model = cont_encoder.fit(df)
val cont_encoded = cont_model.transform(df)
cont_encoded.show()API の詳細については、TargetEncoder Java ドキュメントを参照してください。
import org.apache.spark.ml.feature.TargetEncoder;
import org.apache.spark.ml.feature.TargetEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.List;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, 1.0, 0, 10.0),
RowFactory.create(1.0, 0.0, 1, 20.0),
RowFactory.create(2.0, 1.0, 0, 30.0),
RowFactory.create(0.0, 2.0, 1, 40.0),
RowFactory.create(0.0, 1.0, 0, 50.0),
RowFactory.create(2.0, 0.0, 1, 60.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("binaryLabel", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("continuousLabel", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
// binary target
TargetEncoder bin_encoder = new TargetEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryIndex1Target", "categoryIndex2Target"})
.setLabelCol("binaryLabel")
.setTargetType("binary");
TargetEncoderModel bin_model = bin_encoder.fit(df);
Dataset<Row> bin_encoded = bin_model.transform(df);
bin_encoded.show();
// continuous target
TargetEncoder cont_encoder = new TargetEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryIndex1Target", "categoryIndex2Target"})
.setLabelCol("continuousLabel")
.setTargetType("continuous");
TargetEncoderModel cont_model = cont_encoder.fit(df);
Dataset<Row> cont_encoded = cont_model.transform(df);
cont_encoded.show();VectorIndexer
VectorIndexer は、Vector のデータセットでカテゴリカル特徴量をインデックス化するのに役立ちます。これは、どちらの特徴量をカテゴリカルと見なすべきかを自動的に判断し、元の値をカテゴリインデックスに変換できます。具体的には、次のことを行います。
Vector型の入力列とmaxCategoriesパラメータを取ります。- 異なる値の数に基づいて、どの特徴量をカテゴリカルと見なすべきかを決定します。
maxCategories以下の特徴量はカテゴリカルと宣言されます。 - 各カテゴリカル特徴量に対して 0 ベースのカテゴリインデックスを計算します。
- カテゴリカル特徴量をインデックス化し、元の特徴量値をインデックスに変換します。
カテゴリカル特徴量をインデックス化することで、決定木やツリーアンサンブルなどのアルゴリズムがカテゴリカル特徴量を適切に扱えるようになり、パフォーマンスが向上します。
例
以下の例では、ラベル付きポイントのデータセットを読み込み、VectorIndexer を使用してどの特徴量をカテゴリカルとして扱うべきかを判断します。カテゴリカル特徴量値をそのインデックスに変換します。この変換されたデータは、カテゴリカル特徴量を扱う DecisionTreeRegressor などのアルゴリズムに渡すことができます。
API の詳細については、VectorIndexer Python ドキュメントを参照してください。
from pyspark.ml.feature import VectorIndexer
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()API の詳細については、VectorIndexer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.VectorIndexer
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
s"categorical features: ${categoricalFeatures.mkString(", ")}")
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()API の詳細については、VectorIndexer Java ドキュメントを参照してください。
import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
VectorIndexer indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);
Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");
for (Integer feature : categoryMaps.keySet()) {
System.out.print(" " + feature);
}
System.out.println();
// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();Interaction
Interaction は、ベクトルまたは倍精度の値を持つ列を受け取り、各入力列から 1 つの値のすべての組み合わせの積を含む単一のベクトル列を生成する Transformer です。
たとえば、3 次元を持つ 2 つのベクトル型列が入力列としてある場合、9 次元のベクトルが出力列として生成されます。
例
「id1」、「vec1」、「vec2」の列を持つ次の DataFrame があると仮定します。
id1|vec1 |vec2
---|--------------|--------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0]
これらの入力列で Interaction を適用し、interactedCol を出力列として指定すると、次のようになります。
id1|vec1 |vec2 |interactedCol
---|--------------|--------------|------------------------------------------------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]
API の詳細については、Interaction Python ドキュメントを参照してください。
from pyspark.ml.feature import Interaction, VectorAssembler
df = spark.createDataFrame(
[(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)],
["id1", "id2", "id3", "id4", "id5", "id6", "id7"])
assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")
assembled1 = assembler1.transform(df)
assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")
assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")
interacted = interaction.transform(assembled2)
interacted.show(truncate=False)API の詳細については、Interaction Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler
val df = spark.createDataFrame(Seq(
(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")
val assembler1 = new VectorAssembler().
setInputCols(Array("id2", "id3", "id4")).
setOutputCol("vec1")
val assembled1 = assembler1.transform(df)
val assembler2 = new VectorAssembler().
setInputCols(Array("id5", "id6", "id7")).
setOutputCol("vec2")
val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
val interaction = new Interaction()
.setInputCols(Array("id1", "vec1", "vec2"))
.setOutputCol("interactedCol")
val interacted = interaction.transform(assembled2)
interacted.show(truncate = false)API の詳細については、Interaction Java ドキュメントを参照してください。
List<Row> data = Arrays.asList(
RowFactory.create(1, 1, 2, 3, 8, 4, 5),
RowFactory.create(2, 4, 3, 8, 7, 9, 8),
RowFactory.create(3, 6, 1, 9, 2, 3, 6),
RowFactory.create(4, 10, 8, 6, 9, 4, 5),
RowFactory.create(5, 9, 2, 7, 10, 7, 3),
RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);
StructType schema = new StructType(new StructField[]{
new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VectorAssembler assembler1 = new VectorAssembler()
.setInputCols(new String[]{"id2", "id3", "id4"})
.setOutputCol("vec1");
Dataset<Row> assembled1 = assembler1.transform(df);
VectorAssembler assembler2 = new VectorAssembler()
.setInputCols(new String[]{"id5", "id6", "id7"})
.setOutputCol("vec2");
Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");
Interaction interaction = new Interaction()
.setInputCols(new String[]{"id1","vec1","vec2"})
.setOutputCol("interactedCol");
Dataset<Row> interacted = interaction.transform(assembled2);
interacted.show(false);Normalizer
Normalizer は、Vector 行のデータセットを変換し、各 Vector を単位ノルムにするように正規化する Transformer です。パラメータ p を取り、正規化に使用される p-ノルムを指定します。(デフォルトは $p = 2$)。この正規化は、入力データを標準化し、学習アルゴリズムの動作を改善するのに役立ちます。
例
以下の例は、libsvm 形式でデータセットをロードし、各行を単位 $L^1$ ノルムおよび単位 $L^\infty$ ノルムに正規化する方法を示しています。
API の詳細については、Normalizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()API の詳細については、Normalizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()
// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()API の詳細については、Normalizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0);
Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();
// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();StandardScaler
StandardScaler は、Vector 行のデータセットを変換し、各特徴量を単位標準偏差および/またはゼロ平均になるように正規化します。パラメータを取ります。
withStd: デフォルトは true。データを単位標準偏差にスケーリングします。withMean: デフォルトは false。スケーリング前に平均でデータを中央揃えします。密な出力を構築するため、スパース入力に適用する際は注意してください。
StandardScaler は、データセットで fit して StandardScalerModel を生成できる Estimator です。これは、要約統計量を計算することに相当します。このモデルはその後、Vector 列を、単位標準偏差および/またはゼロ平均特徴量を持つデータセットに変換できます。
特徴量の標準偏差がゼロの場合、その特徴量に対して Vector にデフォルトの 0.0 値が返されることに注意してください。
例
以下の例は、libsvm 形式でデータセットをロードし、各特徴量を単位標準偏差に正規化する方法を示しています。
API の詳細については、StandardScaler Python ドキュメントを参照してください。
from pyspark.ml.feature import StandardScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()API の詳細については、StandardScaler Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()API の詳細については、StandardScaler Java ドキュメントを参照してください。
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
StandardScaler scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false);
// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();RobustScaler
RobustScaler は、Vector 行のデータセットを変換し、中央値を除去し、特定の四分位数範囲(デフォルトでは IQR: 四分位範囲、第 1 四分位数と第 3 四分位数の間の四分位数範囲)に従ってデータをスケーリングします。その動作は StandardScaler と非常に似ていますが、平均と標準偏差の代わりに中央値と四分位数範囲が使用されるため、外れ値に対してロバストです。パラメータを取ります。
lower: デフォルトは 0.25。四分位数範囲を計算するための下限。すべての特徴量で共有されます。upper: デフォルトは 0.75。四分位数範囲を計算するための上限。すべての特徴量で共有されます。withScaling: デフォルトは true。データを四分位数範囲にスケーリングします。withCentering: デフォルトは false。スケーリング前に中央値でデータを中央揃えします。密な出力を構築するため、スパース入力に適用する際は注意してください。
RobustScaler は、データセットで fit して RobustScalerModel を生成できる Estimator です。これは、四分位数統計量を計算することに相当します。このモデルはその後、Vector 列を、単位四分位数範囲および/またはゼロ中央値特徴量を持つデータセットに変換できます。
特徴量の四分位数範囲がゼロの場合、その特徴量に対して Vector にデフォルトの 0.0 値が返されることに注意してください。
例
以下の例は、libsvm 形式でデータセットをロードし、各特徴量を単位四分位数範囲に正規化する方法を示しています。
API の詳細については、RobustScaler Python ドキュメントを参照してください。
from pyspark.ml.feature import RobustScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
withScaling=True, withCentering=False,
lower=0.25, upper=0.75)
# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)
# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()API の詳細については、RobustScaler Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.RobustScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75)
// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)
// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()API の詳細については、RobustScaler Java ドキュメントを参照してください。
import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
RobustScaler scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75);
// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);
// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();MinMaxScaler
MinMaxScaler は、Vector 行のデータセットを変換し、各特徴量を特定の範囲(多くの場合 [0, 1])に再スケーリングします。パラメータを取ります。
min: デフォルトは 0.0。変換後の下限。すべての特徴量で共有されます。max: デフォルトは 1.0。変換後の上限。すべての特徴量で共有されます。
MinMaxScaler は、データセットの要約統計量を計算し、MinMaxScalerModel を生成します。このモデルはその後、各特徴量を個別に変換して、指定された範囲に収まるようにできます。
特徴量 E の再スケーリングされた値は、次のように計算されます。\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation} $E_{max} == E_{min}$ の場合、$Rescaled(e_i) = 0.5 * (max + min)$
ゼロ値は非ゼロ値に変換される可能性が高いため、トランスフォーマーの出力は、スパース入力であっても DenseVector になることに注意してください。
例
以下の例は、libsvm 形式でデータセットをロードし、各特徴量を [0, 1] に再スケーリングする方法を示しています。
API の詳細については、MinMaxScaler Python ドキュメントおよびMinMaxScalerModel Python ドキュメントを参照してください。
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()API の詳細については、MinMaxScaler Scala ドキュメントおよびMinMaxScalerModel Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(1, Vectors.dense(2.0, 1.1, 1.0)),
(2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()API の詳細については、MinMaxScaler Java ドキュメントおよびMinMaxScalerModel Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MinMaxScaler scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
+ scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();MaxAbsScaler
MaxAbsScaler は、Vector 行のデータセットを変換し、各特徴量を最大絶対値で割ることによって、各特徴量を範囲 [-1, 1] に再スケーリングします。データをシフト/中央揃えしないため、スパース性を破壊しません。
MaxAbsScaler は、データセットの要約統計量を計算し、MaxAbsScalerModel を生成します。このモデルはその後、各特徴量を個別に変換して、範囲 [-1, 1] に収まるようにできます。
例
以下の例は、libsvm 形式でデータセットをロードし、各特徴量を [-1, 1] に再スケーリングする方法を示しています。
API の詳細については、MaxAbsScaler Python ドキュメントおよびMaxAbsScalerModel Python ドキュメントを参照してください。
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()API の詳細については、MaxAbsScaler Scala ドキュメントおよびMaxAbsScalerModel Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -8.0)),
(1, Vectors.dense(2.0, 1.0, -4.0)),
(2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()API の詳細については、MaxAbsScaler Java ドキュメントおよびMaxAbsScalerModel Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MaxAbsScaler scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();Bucketizer
Bucketizer は、連続特徴量の列を、ユーザーが指定したバケットによる特徴量バケットの列に変換します。パラメータを取ります。
splits: 連続特徴量をバケットにマッピングするためのパラメータ。n+1 個の分割で、n 個のバケットがあります。分割 x, y で定義されるバケットは [x, y) の範囲の値を含みますが、最後のバケットは y も含みます。分割は厳密に増加する必要があります。-inf、inf の値は、すべての Double 値をカバーするために明示的に提供する必要があります。それ以外の場合、指定された分割外の値はエラーとして扱われます。splitsの 2 つの例は、Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity)およびArray(0.0, 1.0, 2.0)です。
ターゲット列の上限と下限が不明な場合は、Double.NegativeInfinity と Double.PositiveInfinity を分割の境界として追加して、Bucketizer の境界外例外の可能性を防ぐ必要があることに注意してください。
また、指定した分割は厳密に増加順でなければならないことにも注意してください。つまり、$s_0 < s_1 < s_2 < \dots < s_n$ です。
詳細については、Bucketizer の API ドキュメントを参照してください。
例
以下の例は、Double の列を別のインデックス化された列にバケット化する方法を示しています。
API の詳細については、Bucketizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()API の詳細については、Bucketizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(
immutable.ArraySeq.unsafeWrapArray(data.map(Tuple1.apply))).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()
val splitsArray = Array(
Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))
val data2 = Array(
(-999.9, -999.9),
(-0.5, -0.2),
(-0.3, -0.1),
(0.0, 0.0),
(0.2, 0.4),
(999.9, 999.9))
val dataFrame2 = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data2))
.toDF("features1", "features2")
val bucketizer2 = new Bucketizer()
.setInputCols(Array("features1", "features2"))
.setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
.setSplitsArray(splitsArray)
// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)
println(s"Bucketizer output with [" +
s"${bucketizer2.getSplitsArray(0).length-1}, " +
s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()API の詳細については、Bucketizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};
List<Row> data = Arrays.asList(
RowFactory.create(-999.9),
RowFactory.create(-0.5),
RowFactory.create(-0.3),
RowFactory.create(0.0),
RowFactory.create(0.2),
RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Bucketizer bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits);
// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();
// Bucketize multiple columns at one pass.
double[][] splitsArray = {
{Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
{Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};
List<Row> data2 = Arrays.asList(
RowFactory.create(-999.9, -999.9),
RowFactory.create(-0.5, -0.2),
RowFactory.create(-0.3, -0.1),
RowFactory.create(0.0, 0.0),
RowFactory.create(0.2, 0.4),
RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);
Bucketizer bucketizer2 = new Bucketizer()
.setInputCols(new String[] {"features1", "features2"})
.setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
.setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);
System.out.println("Bucketizer output with [" +
(bucketizer2.getSplitsArray()[0].length-1) + ", " +
(bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();ElementwiseProduct
ElementwiseProduct は、指定された「重み」ベクトルを使用して、各入力ベクトルに要素ごとの乗算を行います。言い換えると、データセットの各列をスカラー乗数でスケーリングします。これは、入力ベクトル $v$ と変換ベクトル $w$ の間の アダマール積 を表し、結果ベクトルを生成します。
\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
例
以下の例は、変換ベクトル値を使用してベクトルを変換する方法を示しています。
API の詳細については、ElementwiseProduct Python ドキュメントを参照してください。
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()API の詳細については、ElementwiseProduct Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()API の詳細については、ElementwiseProduct Java ドキュメントを参照してください。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);
List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector");
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();SQLTransformer
SQLTransformer は、SQL ステートメントによって定義される変換を実装します。現在、"SELECT ... FROM __THIS__ ... " のような SQL 構文のみをサポートしています。ここで、"__THIS__" は入力データセットの基になるテーブルを表します。SELECT 句は、出力に表示するフィールド、定数、および式を指定し、Spark SQL がサポートする任意の SELECT 句にすることができます。ユーザーは、Spark SQL の組み込み関数や UDF を使用して、これらの選択された列を操作することもできます。たとえば、SQLTransformer は次のようなステートメントをサポートしています。
SELECT a, a + b AS a_b FROM __THIS__SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
例
以下のような DataFrame が、id、v1、v2 の列を持つと仮定します。
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
これは、ステートメント "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__" を使用した SQLTransformer の出力です。
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
API の詳細については、SQLTransformer Python ドキュメントを参照してください。
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()API の詳細については、SQLTransformer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()API の詳細については、SQLTransformer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, 1.0, 3.0),
RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
SQLTransformer sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
sqlTrans.transform(df).show();VectorAssembler
VectorAssembler は、指定された列のリストを単一のベクトル列に結合するトランスフォーマーです。これは、生のフィーチャと異なるフィーチャトランスフォーマーによって生成されたフィーチャを単一のフィーチャベクトルに結合し、ロジスティック回帰や決定木などの ML モデルをトレーニングするのに役立ちます。VectorAssembler は、次の入力列タイプを受け入れます:すべての数値タイプ、ブールタイプ、およびベクトルタイプ。各行で、入力列の値は指定された順序でベクトルに連結されます。
例
id、hour、mobile、userFeatures、clicked の列を持つ DataFrame があると仮定します。
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures は、3 つのユーザー特徴量を含むベクトル列です。hour、mobile、userFeatures を単一のフィーチャベクトル features に結合し、それを clicked を予測するために使用したいと考えています。VectorAssembler の入力列を hour、mobile、userFeatures に、出力列を features に設定すると、変換後に次の DataFrame が得られます。
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
API の詳細については、VectorAssembler Python ドキュメントを参照してください。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)API の詳細については、VectorAssembler Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)API の詳細については、VectorAssembler Java ドキュメントを参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);VectorSizeHint
VectorType の列のベクトルのサイズを明示的に指定することが役立つ場合があります。たとえば、VectorAssembler は、入力列のサイズ情報を使用して、出力列のサイズ情報とメタデータを生成します。これには、列の内容を検査することによって取得できる場合もありますが、ストリーミングデータフレームでは、ストリームが開始されるまで内容は利用できません。VectorSizeHint を使用すると、ユーザーは列のベクトルサイズを明示的に指定できるため、VectorAssembler や、ベクトルサイズを知る必要がある可能性のある他のトランスフォーマーが、その列を入力として使用できます。
VectorSizeHint を使用するには、ユーザーは inputCol および size パラメータを設定する必要があります。このトランスフォーマーをデータフレームに適用すると、inputCol のメタデータが更新された新しいデータフレームが生成され、ベクトルサイズが指定されます。結果のデータフレームに対する下流の操作は、メタデータを使用してこのサイズを取得できます。
VectorSizeHint は、オプションの handleInvalid パラメータも取ることができます。これは、ベクトル列に NULL または不正なサイズのベクトルが含まれる場合の動作を制御します。デフォルトでは handleInvalid は「error」に設定されており、例外をスローすることを示します。このパラメータは、「skip」(無効な値を含む行は結果のデータフレームからフィルタリングされることを示す)または「optimistic」(列を無効な値に対してチェックせず、すべての行を保持することを示す)に設定することもできます。「optimistic」の使用は、結果のデータフレームが不整合な状態になる可能性があることに注意してください。つまり、VectorSizeHint が適用された列のメタデータが、その列の内容と一致しません。ユーザーはこの種の不整合な状態を避けるように注意する必要があります。
API の詳細については、VectorSizeHint Python ドキュメントを参照してください。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
(0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
sizeHint = VectorSizeHint(
inputCol="userFeatures",
handleInvalid="skip",
size=3)
datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)API の詳細については、VectorSizeHint Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq(
(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3)
val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)API の詳細については、VectorSizeHint Java ドキュメントを参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);
VectorSizeHint sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3);
Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);QuantileDiscretizer
QuantileDiscretizer は、連続特徴量を持つ列を受け取り、ビン化されたカテゴリカル特徴量を持つ列を出力します。ビンの数は numBuckets パラメータで設定されます。例えば、入力の異なる値が少なすぎて十分な異なる分位数を生成できない場合など、使用されるビンの数がこの値よりも少なくなる可能性があります。
NaN 値: NaN 値は QuantileDiscretizer の適合中に列から削除されます。これにより、予測を行うための Bucketizer モデルが生成されます。変換中、Bucketizer はデータセットに NaN 値が見つかった場合にエラーを発生させますが、ユーザーは handleInvalid を設定することによって NaN 値を保持するか削除するかを選択することもできます。ユーザーが NaN 値を保持することを選択した場合、それらは特別な方法で処理され、独自のバケットに配置されます。たとえば、4 つのバケットが使用されている場合、非 NaN データはバケット [0-3] に配置されますが、NaN は特別なバケット [4] にカウントされます。
アルゴリズム: ビン範囲は、近似アルゴリズムを使用して選択されます(詳細については、approxQuantile のドキュメントを参照してください)。近似の精度は relativeError パラメータで制御できます。ゼロに設定すると、正確な分位数が計算されます(注意: 正確な分位数を計算することはコストのかかる操作です)。下部と上部のビン境界は、すべての実数値をカバーする -Infinity と +Infinity になります。
例
id、hour の列を持つ DataFrame があると仮定します。
id | hour
----|------
0 | 18.0
----|------
1 | 19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour は、Double 型の連続特徴量です。連続特徴量をカテゴリカルなものに変換したいと考えています。numBuckets = 3 が与えられた場合、次の DataFrame が得られるはずです。
id | hour | result
----|------|------
0 | 18.0 | 2.0
----|------|------
1 | 19.0 | 2.0
----|------|------
2 | 8.0 | 1.0
----|------|------
3 | 5.0 | 1.0
----|------|------
4 | 2.2 | 0.0
API の詳細については、QuantileDiscretizer Python ドキュメントを参照してください。
from pyspark.ml.feature import QuantileDiscretizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()API の詳細については、QuantileDiscretizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(immutable.ArraySeq.unsafeWrapArray(data)).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show(false)API の詳細については、QuantileDiscretizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 18.0),
RowFactory.create(1, 19.0),
RowFactory.create(2, 8.0),
RowFactory.create(3, 5.0),
RowFactory.create(4, 2.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
QuantileDiscretizer discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3);
Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);Imputer
Imputer estimator は、欠損値が存在する列の平均、中央値、または最頻値を使用して、データセットの欠損値を補完します。入力列は数値型である必要があります。現在、Imputer はカテゴリカル特徴量をサポートしておらず、カテゴリカル特徴量を含む列に対して誤った値を生成する可能性があります。.setMissingValue(custom_value) を使用すると、Imputer は 'NaN' 以外のカスタム値を補完できます。たとえば、.setMissingValue(0) は、(0) のすべての出現を補完します。
注意入力列のnull値はすべて欠損値として扱われ、補完されます。
例
列aとbを持つDataFrameがあると仮定します。
a | b
------------|-----------
1.0 | Double.NaN
2.0 | Double.NaN
Double.NaN | 3.0
4.0 | 4.0
5.0 | 5.0
この例では、ImputerはDouble.NaN(欠損値のデフォルト)のすべての出現箇所を、対応する列の他の値から計算された平均(デフォルトの補完戦略)で置き換えます。この例では、列aとbの代理値はそれぞれ3.0と4.0です。変換後、出力列の欠損値は、関連する列の代理値で置き換えられます。
a | b | out_a | out_b
------------|------------|-------|-------
1.0 | Double.NaN | 1.0 | 4.0
2.0 | Double.NaN | 2.0 | 4.0
Double.NaN | 3.0 | 3.0 | 3.0
4.0 | 4.0 | 4.0 | 4.0
5.0 | 5.0 | 5.0 | 5.0
APIの詳細については、Imputer Pythonドキュメントを参照してください。
from pyspark.ml.feature import Imputer
df = spark.createDataFrame([
(1.0, float("nan")),
(2.0, float("nan")),
(float("nan"), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ["a", "b"])
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()APIの詳細については、Imputer Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.Imputer
val df = spark.createDataFrame(Seq(
(1.0, Double.NaN),
(2.0, Double.NaN),
(Double.NaN, 3.0),
(4.0, 4.0),
(5.0, 5.0)
)).toDF("a", "b")
val imputer = new Imputer()
.setInputCols(Array("a", "b"))
.setOutputCols(Array("out_a", "out_b"))
val model = imputer.fit(df)
model.transform(df).show()APIの詳細については、Imputer Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1.0, Double.NaN),
RowFactory.create(2.0, Double.NaN),
RowFactory.create(Double.NaN, 3.0),
RowFactory.create(4.0, 4.0),
RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
createStructField("a", DoubleType, false),
createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Imputer imputer = new Imputer()
.setInputCols(new String[]{"a", "b"})
.setOutputCols(new String[]{"out_a", "out_b"});
ImputerModel model = imputer.fit(df);
model.transform(df).show();特徴量選択器
VectorSlicer
VectorSlicerは、特徴ベクトルを受け取り、元の特徴のサブ配列を持つ新しい特徴ベクトルを出力するトランスフォーマーです。ベクトル列から特徴を抽出するのに役立ちます。
VectorSlicerは、指定されたインデックスを持つベクトル列を受け入れ、そのインデックスによって選択された値を持つ新しいベクトル列を出力します。インデックスには2種類あります。
-
ベクトルへのインデックスを表す整数インデックス(
setIndices())。 -
ベクトルへの特徴の名前を表す文字列インデックス(
setNames())。これは、実装がAttributeのnameフィールドに一致するため、ベクトル列にAttributeGroupが必要であることを意味します。
整数と文字列による指定はどちらも可能です。さらに、整数インデックスと文字列名を同時に使用できます。少なくとも1つの特徴を選択する必要があります。重複する特徴は許可されないため、選択されたインデックスと名前の間には重複はありません。名前で特徴を選択した場合、空の入力属性が検出されると例外がスローされることに注意してください。
出力ベクトルは、選択されたインデックス(指定された順序)を持つ特徴を最初に並べ、次に選択された名前(指定された順序)を並べます。
例
列userFeaturesを持つDataFrameがあると仮定します。
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeaturesは3つのユーザー特徴を含むベクトル列です。userFeaturesの最初の列がすべてゼロであると仮定すると、それを削除して最後の2つの列のみを選択したいとします。VectorSlicerはsetIndices(1, 2)で最後の2つの要素を選択し、featuresという名前の新しいベクトル列を生成します。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
また、userFeaturesの候補となる入力属性(例:["f1", "f2", "f3"])があると仮定すると、setNames("f2", "f3")を使用してそれらを選択できます。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
APIの詳細については、VectorSlicer Pythonドキュメントを参照してください。
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
df = spark.createDataFrame([
Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])
output = slicer.transform(df)
output.select("userFeatures", "features").show()APIの詳細については、VectorSlicer Scalaドキュメントを参照してください。
import java.util.Arrays
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
val data = Arrays.asList(
Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
Row(Vectors.dense(-2.0, 2.3, 0.0))
)
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))
val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))
val output = slicer.transform(dataset)
output.show(false)APIの詳細については、VectorSlicer Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
Attribute[] attrs = {
NumericAttribute.defaultAttr().withName("f1"),
NumericAttribute.defaultAttr().withName("f2"),
NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);
Dataset<Row> dataset =
spark.createDataFrame(data, (new StructType()).add(group.toStructField()));
VectorSlicer vectorSlicer = new VectorSlicer()
.setInputCol("userFeatures").setOutputCol("features");
vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})
Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);RFormula
RFormulaは、Rモデルの数式で指定された列を選択します。現在、サポートされているR演算子のサブセットには、「~」、「.」、「:」、「+」、「-」が含まれます。基本的な演算子は次のとおりです。
~ターゲットと項を分離します。+項を連結します。「+ 0」は切片の削除を意味します。-項を削除します。「- 1」は切片の削除を意味します。:相互作用(数値値の場合は乗算、または二値化されたカテゴリ値)。.ターゲットを除くすべての列。
aとbがdouble列であると仮定し、RFormulaの効果を説明するために簡単な例を以下に示します。
y ~ a + bは、y ~ w0 + w1 * a + w2 * bをモデル化することを意味します。ここで、w0は切片、w1、w2は係数です。y ~ a + b + a:b - 1は、y ~ w1 * a + w2 * b + w3 * a * bをモデル化することを意味します。ここで、w1、w2、w3は係数です。
RFormulaは、特徴のベクトル列とラベルのdoubleまたはstring列を生成します。Rで線形回帰の数式が使用される場合と同様に、数値列はdoubleにキャストされます。文字列入力列については、stringOrderTypeによって決定される順序を使用してStringIndexerで変換され、順序付け後の最後のカテゴリがドロップされ、次にdoubleがワンホットエンコーディングされます。
値{'b', 'a', 'b', 'a', 'c', 'b'}を含む文字列特徴列があると仮定し、エンコーディングを制御するためにstringOrderTypeを設定します。
stringOrderType | Category mapped to 0 by StringIndexer | Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b') | least frequent category ('c')
'frequencyAsc' | least frequent category ('c') | most frequent category ('b')
'alphabetDesc' | last alphabetical category ('c') | first alphabetical category ('a')
'alphabetAsc' | first alphabetical category ('a') | last alphabetical category ('c')
ラベル列がstring型の場合、frequencyDesc順序を使用してStringIndexerで最初にdoubleに変換されます。ラベル列がDataFrameに存在しない場合、出力ラベル列は数式で指定された応答変数から作成されます。
注意:順序オプションstringOrderTypeはラベル列には使用されません。ラベル列がインデックス付けされる場合、StringIndexerのデフォルトの降順頻度順序が使用されます。
例
列id、country、hour、clickedを持つDataFrameがあると仮定します。
id | country | hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
clicked ~ country + hourという数式文字列でRFormulaを使用した場合、これはcountryとhourに基づいてclickedを予測したいことを示しています。変換後、次のDataFrameが得られるはずです。
id | country | hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
APIの詳細については、RFormula Pythonドキュメントを参照してください。
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()APIの詳細については、RFormula Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.RFormula
val dataset = spark.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()APIの詳細については、RFormula Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("country", StringType, false),
createStructField("hour", IntegerType, false),
createStructField("clicked", DoubleType, false)
});
List<Row> data = Arrays.asList(
RowFactory.create(7, "US", 18, 1.0),
RowFactory.create(8, "CA", 12, 0.0),
RowFactory.create(9, "NZ", 15, 0.0)
);
Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();ChiSqSelector
ChiSqSelectorは、カイ二乗特徴選択を表します。これは、ラベル付きデータとカテゴリ特徴で機能します。ChiSqSelectorは、カイ二乗独立性検定を使用して、どの特徴を選択するかを決定します。5つの選択方法をサポートしています:numTopFeatures、percentile、fpr、fdr、fwe。
numTopFeaturesは、カイ二乗検定に従って固定数の上位特徴を選択します。これは、最も予測力の高い特徴を生成するのと似ています。percentileはnumTopFeaturesに似ていますが、固定数ではなく、特徴の割合を選択します。fprは、p値がしきい値未満の特徴をすべて選択し、選択の偽陽性率を制御します。fdrは、Benjamini-Hochberg手順を使用して、偽発見率がしきい値未満の特徴をすべて選択します。fweは、p値がしきい値未満の特徴をすべて選択します。しきい値は1/numFeaturesでスケーリングされ、選択のファミリーごとの誤り率を制御します。デフォルトでは、選択方法はnumTopFeaturesであり、デフォルトの上位特徴数は50に設定されています。ユーザーはsetSelectorTypeを使用して選択方法を選択できます。
例
列id、features、clickedを持つDataFrameがあると仮定します。このclickedは予測対象のターゲットとして使用されます。
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
numTopFeatures = 1でChiSqSelectorを使用した場合、ラベルclickedに従って、featuresの最後の列が最も有用な特徴として選択されます。
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
APIの詳細については、ChiSqSelector Pythonドキュメントを参照してください。
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)
print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()APIの詳細については、ChiSqSelector Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataset(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()APIの詳細については、ChiSqSelector Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
ChiSqSelector selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
+ " features selected");
result.show();UnivariateFeatureSelector
UnivariateFeatureSelectorは、カテゴリ/連続ラベルとカテゴリ/連続特徴で機能します。ユーザーはfeatureTypeとlabelTypeを設定でき、Sparkは指定されたfeatureTypeとlabelTypeに基づいて使用するスコアリング関数を選択します。
featureType | labelType |score function
------------|------------|--------------
categorical |categorical | chi-squared (chi2)
continuous |categorical | ANOVATest (f_classif)
continuous |continuous | F-value (f_regression)
5つの選択モードをサポートしています:numTopFeatures、percentile、fpr、fdr、fwe。
numTopFeaturesは、固定数の上位特徴を選択します。percentileはnumTopFeaturesに似ていますが、固定数ではなく、特徴の割合を選択します。fprは、p値がしきい値未満の特徴をすべて選択し、選択の偽陽性率を制御します。fdrは、Benjamini-Hochberg手順を使用して、偽発見率がしきい値未満の特徴をすべて選択します。fweは、p値がしきい値未満の特徴をすべて選択します。しきい値は1/numFeaturesでスケーリングされ、選択のファミリーごとの誤り率を制御します。
デフォルトでは、選択モードはnumTopFeaturesであり、デフォルトのselectionThresholdは50に設定されています。
例
列id、features、labelを持つDataFrameがあると仮定します。このlabelは予測対象のターゲットとして使用されます。
id | features | label
---|--------------------------------|---------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0
featureTypeをcontinuous、labelTypeをcategoricalに設定し、numTopFeatures = 1とした場合、featuresの最後の列が最も有用な特徴として選択されます。
id | features | label | selectedFeatures
---|--------------------------------|---------|------------------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0 | [2.3]
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0 | [4.1]
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0 | [2.5]
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0 | [3.8]
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0 | [3.0]
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0 | [2.1]
APIの詳細については、UnivariateFeatureSelector Pythonドキュメントを参照してください。
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
(2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
(3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
(4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
(5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
(6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)
result = selector.fit(df).transform(df)
print("UnivariateFeatureSelector output with top %d features selected using f_classif"
% selector.getSelectionThreshold())
result.show()APIの詳細については、UnivariateFeatureSelector Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)
val df = spark.createDataset(data).toDF("id", "features", "label")
val selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
s" features selected using f_classif")
result.show()APIの詳細については、UnivariateFeatureSelector Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("UnivariateFeatureSelector output with top "
+ selector.getSelectionThreshold() + " features selected using f_classif");
result.show();VarianceThresholdSelector
VarianceThresholdSelectorは、低分散特徴を削除するセレクターです。varianceThresholdを超えない(標本)分散を持つ特徴は削除されます。設定されていない場合、varianceThresholdはデフォルトで0になり、これは分散が0の特徴(つまり、すべてのサンプルで同じ値を持つ特徴)のみが削除されることを意味します。
例
列idとfeaturesを持つDataFrameがあると仮定します。このfeaturesは予測対象のターゲットとして使用されます。
id | features
---|--------------------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]
6つの特徴の標本分散は、それぞれ16.67、0.67、8.17、10.17、5.07、11.47です。varianceThreshold = 8.0でVarianceThresholdSelectorを使用した場合、分散が8.0以下の特徴が削除されます。
id | features | selectedFeatures
---|--------------------------------|-------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]
APIの詳細については、VarianceThresholdSelector Pythonドキュメントを参照してください。
from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
(2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
(3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
(4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
(5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
(6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])
selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")
result = selector.fit(df).transform(df)
print("Output: Features with variance lower than %f are removed." %
selector.getVarianceThreshold())
result.show()APIの詳細については、VarianceThresholdSelector Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)
val df = spark.createDataset(data).toDF("id", "features")
val selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"Output: Features with variance lower than" +
s" ${selector.getVarianceThreshold} are removed.")
result.show()APIの詳細については、VarianceThresholdSelector Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VarianceThresholdSelector selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("Output: Features with variance lower than "
+ selector.getVarianceThreshold() + " are removed.");
result.show();Locality Sensitive Hashing
Locality Sensitive Hashing (LSH)は、大規模データセットでのクラスタリング、近似最近傍探索、外れ値検出に一般的に使用されるハッシュ技術の重要なクラスです。
LSHの一般的な考え方は、関数ファミリー(「LSHファミリー」)を使用してデータポイントをバケットにハッシュすることです。これにより、互いに近いデータポイントは高い確率で同じバケットに入り、互いに遠いデータポイントは非常に高い確率で異なるバケットに入ります。LSHファミリーは次のように形式的に定義されます。
距離関数dを持つ計量空間(M, d)において、Mは集合、dはM上の距離関数とします。LSHファミリーは、次のプロパティを満たす関数hのファミリーです。\[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \]このLSHファミリーは(r1, r2, p1, p2)-sensitiveと呼ばれます。
Sparkでは、異なるLSHファミリーは個別のクラス(例:MinHash)に実装されており、各クラスで特徴変換、近似類似性結合、近似最近傍のAPIが提供されています。
LSHでは、偽陽性を、同じバケットにハッシュされた遠い入力特徴のペア($d(p,q) \geq r2$)と定義し、偽陰性を、異なるバケットにハッシュされた近い特徴のペア($d(p,q) \leq r1$)と定義します。
LSH 操作
LSHが使用できる主要な操作の種類を説明します。学習済みのLSHモデルには、これらの各操作のメソッドがあります。
特徴量変換
特徴変換は、ハッシュ値を新しい列として追加するための基本的な機能です。これは次元削減に役立ちます。ユーザーはinputColとoutputColを設定することで、入力列と出力列名を指定できます。
LSHは複数のLSHハッシュテーブルもサポートしています。ユーザーはnumHashTablesを設定することでハッシュテーブルの数を指定できます。これは、近似類似性結合および近似最近傍におけるOR-amplificationにも使用されます。ハッシュテーブルの数を増やすと精度が向上しますが、通信コストと実行時間も増加します。
outputColの型はSeq[Vector]であり、配列の次元はnumHashTablesに等しく、ベクトルの次元は現在1に設定されています。将来のリリースでは、AND-amplificationを実装して、ユーザーがこれらのベクトルの次元を指定できるようにします。
近似類似性結合
近似類似性結合は、2つのデータセットを受け取り、ユーザー定義のしきい値より小さい距離を持つデータセット内の行のペアを近似的に返します。近似類似性結合は、2つの異なるデータセットの結合と自己結合の両方をサポートしています。自己結合はいくつかの重複ペアを生成します。
近似類似性結合は、変換済みおよび未変換のデータセットの両方を入力として受け入れます。未変換のデータセットが使用される場合、自動的に変換されます。この場合、ハッシュシグネチャはoutputColとして作成されます。
結合されたデータセットでは、元のデータセットはdatasetAとdatasetBでクエリできます。出力データセットには、返された各行ペアの真の距離を示す距離列が追加されます。
近似最近傍探索
近似最近傍探索は、データセット(特徴ベクトル)とキー(単一の特徴ベクトル)を受け取り、データセット内のベクトルに最も近い指定された数の行を近似的に返します。
近似最近傍探索は、変換済みおよび未変換のデータセットの両方を入力として受け入れます。未変換のデータセットが使用される場合、自動的に変換されます。この場合、ハッシュシグネチャはoutputColとして作成されます。
出力データセットには、各出力行と検索されたキーとの真の距離を示す距離列が追加されます。
注意:ハッシュバケットに十分な候補がない場合、近似最近傍探索はk未満の行を返します。
LSH アルゴリズム
ユークリッド距離のためのバケット化ランダム投影
Bucketed Random Projectionは、ユークリッド距離のためのLSHファミリーです。ユークリッド距離は次のように定義されます。\[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \]そのLSHファミリーは、特徴ベクトル$\mathbf{x}$をランダムな単位ベクトル$\mathbf{v}$に投影し、投影された結果をハッシュバケットに分割します。\[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \]ここで、rはユーザー定義のバケット長です。バケット長は、ハッシュバケットの平均サイズ(したがってバケット数)を制御するために使用できます。バケット長が大きいほど(つまり、バケット数が少ないほど)、特徴が同じバケットにハッシュされる確率が高くなり(真陽性と偽陽性の数が増加します)。
Bucketed Random Projectionは、任意のベクトルを入力特徴として受け入れ、スパースベクトルと密ベクトルを両方サポートします。
APIの詳細については、BucketedRandomProjectionLSH Pythonドキュメントを参照してください。
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.dense([1.0, 1.0]),),
(1, Vectors.dense([1.0, -1.0]),),
(2, Vectors.dense([-1.0, -1.0]),),
(3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(4, Vectors.dense([1.0, 0.0]),),
(5, Vectors.dense([-1.0, 0.0]),),
(6, Vectors.dense([0.0, 1.0]),),
(7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.dense([1.0, 0.0])
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
numHashTables=3)
model = brp.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()APIの詳細については、BucketedRandomProjectionLSH Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 1.0)),
(1, Vectors.dense(1.0, -1.0)),
(2, Vectors.dense(-1.0, -1.0)),
(3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(4, Vectors.dense(1.0, 0.0)),
(5, Vectors.dense(-1.0, 0.0)),
(6, Vectors.dense(0.0, 1.0)),
(7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")
val key = Vectors.dense(1.0, 0.0)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
val model = brp.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()APIの詳細については、BucketedRandomProjectionLSH Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 1.0)),
RowFactory.create(1, Vectors.dense(1.0, -1.0)),
RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(4, Vectors.dense(1.0, 0.0)),
RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
RowFactory.create(6, Vectors.dense(0.0, 1.0)),
RowFactory.create(7, Vectors.dense(0.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
Vector key = Vectors.dense(1.0, 0.0);
BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes");
BucketedRandomProjectionLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();ジャカード距離のための MinHash
MinHashは、入力特徴が自然数の集合である場合のJaccard距離のためのLSHファミリーです。2つの集合のJaccard距離は、それらの共通部分と和集合の濃度によって定義されます。\[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \]MinHashはランダムなハッシュ関数gを集合の各要素に適用し、すべてのハッシュ値の最小値を取ります。\[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]
MinHashの入力集合はバイナリベクトルで表され、ベクトルのインデックスが要素自体を表し、ベクトル内の非ゼロ値が集合内のその要素の存在を表します。密ベクトルと疎ベクトルの両方がサポートされていますが、通常は効率のために疎ベクトルが推奨されます。たとえば、Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)])は、空間に10個の要素があることを意味します。この集合には要素2、要素3、要素5が含まれます。すべての非ゼロ値はバイナリ「1」値として扱われます。
注意:空の集合はMinHashで変換できません。つまり、すべての入力ベクトルには少なくとも1つの非ゼロエントリが必要です。
APIの詳細については、MinHashLSH Pythonドキュメントを参照してください。
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
(1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
(2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
(4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
(5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()APIの詳細については、MinHashLSH Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
(1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
(2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
(4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
(5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
val mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes")
val model = mh.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()APIの詳細については、MinHashLSH Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);
MinHashLSH mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes");
MinHashLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();