特徴量の抽出、変換、選択
このセクションでは、特徴量を扱うアルゴリズムを、大まかに次のグループに分けて説明します。
- 抽出: 「生の」データから特徴量を抽出する
- 変換: 特徴量のスケーリング、変換、または変更
- 選択: より大きな特徴量セットからサブセットを選択する
- Locality Sensitive Hashing (LSH): このクラスのアルゴリズムは、特徴量変換と他のアルゴリズムの側面を組み合わせたものです。
目次
- 特徴量抽出器
- 特徴量変換器
- Tokenizer
- StopWordsRemover
- $n$-gram
- Binarizer
- PCA
- PolynomialExpansion
- 離散コサイン変換 (DCT)
- StringIndexer
- IndexToString
- OneHotEncoder
- VectorIndexer
- Interaction
- Normalizer
- StandardScaler
- RobustScaler
- MinMaxScaler
- MaxAbsScaler
- Bucketizer
- ElementwiseProduct
- SQLTransformer
- VectorAssembler
- VectorSizeHint
- QuantileDiscretizer
- Imputer
- 特徴量セレクター
- Locality Sensitive Hashing
特徴量抽出器
TF-IDF
Term frequency-inverse document frequency (TF-IDF) は、テキストマイニングで広く使用されている特徴量ベクトル化手法で、コーパス内のドキュメントに対する用語の重要性を反映します。用語を $t$
、ドキュメントを $d$
、コーパスを $D$
で表します。用語頻度 $TF(t, d)$
は、用語 $t$
がドキュメント $d$
に出現する回数であり、文書頻度 $DF(t, D)$
は、用語 $t$
を含むドキュメントの数です。用語頻度だけを使用して重要性を測定すると、頻繁に出現するがドキュメントに関する情報が少ない用語(「a」、「the」、「of」など)を過度に強調しやすくなります。ある用語がコーパス全体で非常に頻繁に出現する場合、その用語は特定のドキュメントに関する特別な情報を持たないことを意味します。逆文書頻度は、用語が提供する情報の量を示す数値尺度です。 \[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]
ここで、$|D|$
はコーパス内のドキュメントの総数です。対数が使用されているため、用語がすべてのドキュメントに出現する場合、その IDF 値は 0 になります。コーパス外の用語で 0 除算を回避するために、平滑化項が適用されることに注意してください。TF-IDF 尺度は、単に TF と IDF の積です。 \[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]
用語頻度と文書頻度の定義にはいくつかのバリアントがあります。MLlib では、柔軟性を高めるために TF と IDF を分離しています。
TF: HashingTF
と CountVectorizer
の両方を使用して、用語頻度ベクトルを生成できます。
HashingTF
は、用語のセットを受け取り、それらのセットを固定長のフィーチャベクトルに変換する Transformer
です。テキスト処理では、「用語のセット」は bag of words である可能性があります。HashingTF
は ハッシュトリックを利用します。生のフィーチャは、ハッシュ関数を適用することにより、インデックス(用語)にマッピングされます。ここで使用されるハッシュ関数は、MurmurHash 3 です。次に、マッピングされたインデックスに基づいて、用語頻度が計算されます。このアプローチでは、大規模なコーパスではコストのかかるグローバルな用語からインデックスへのマップを計算する必要がなくなりますが、ハッシュ衝突の可能性があり、異なる生のフィーチャがハッシュ後に同じ用語になる可能性があります。衝突の可能性を減らすために、ターゲットフィーチャの次元、つまりハッシュテーブルのバケット数を増やすことができます。ハッシュ値に対する単純なモジュロを使用してベクトルインデックスが決定されるため、フィーチャ次元として 2 のべき乗を使用することをお勧めします。そうしないと、フィーチャがベクトルインデックスに均等にマッピングされません。デフォルトのフィーチャ次元は $2^{18} = 262,144$
です。オプションのバイナリトグルパラメータは、用語頻度カウントを制御します。true に設定すると、ゼロ以外の頻度カウントはすべて 1 に設定されます。これは、整数カウントではなくバイナリカウントをモデル化する離散確率モデルに特に役立ちます。
CountVectorizer
は、テキストドキュメントを用語カウントのベクトルに変換します。詳細については、CountVectorizerを参照してください。
IDF: IDF
は、データセットに適合し、IDFModel
を生成する Estimator
です。IDFModel
は、フィーチャベクトル(通常は HashingTF
または CountVectorizer
から作成)を取得し、各フィーチャをスケーリングします。直感的には、コーパスに頻繁に出現するフィーチャの重みを下げます。
注意: spark.ml
は、テキストセグメンテーション用のツールを提供していません。Stanford NLP Group と scalanlp/chalk を参照してください。
例
次のコードセグメントでは、まず文のセットから始めます。Tokenizer
を使用して、各文を単語に分割します。各文 (bag of words) について、HashingTF
を使用して、文をフィーチャベクトルにハッシュします。IDF
を使用して、フィーチャベクトルのスケールを変更します。これにより、通常、テキストをフィーチャとして使用する場合にパフォーマンスが向上します。フィーチャベクトルは、学習アルゴリズムに渡すことができます。
API の詳細については、HashingTF Python ドキュメントとIDF Python ドキュメントを参照してください。
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
API の詳細については、HashingTF Scala ドキュメントとIDF Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
val sentenceData = spark.createDataFrame(Seq(
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()
API の詳細については、HashingTF Java ドキュメントとIDF Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, "Hi I heard about Spark"),
RowFactory.create(0.0, "I wish Java could use case classes"),
RowFactory.create(1.0, "Logistic regression models are neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceData = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsData = tokenizer.transform(sentenceData);
int numFeatures = 20;
HashingTF hashingTF = new HashingTF()
.setInputCol("words")
.setOutputCol("rawFeatures")
.setNumFeatures(numFeatures);
Dataset<Row> featurizedData = hashingTF.transform(wordsData);
// alternatively, CountVectorizer can also be used to get term frequency vectors
IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
IDFModel idfModel = idf.fit(featurizedData);
Dataset<Row> rescaledData = idfModel.transform(featurizedData);
rescaledData.select("label", "features").show();
Word2Vec
Word2Vec
は、ドキュメントを表す単語のシーケンスを受け取り、Word2VecModel
をトレーニングする Estimator
です。モデルは、各単語を一意の固定サイズのベクトルにマッピングします。Word2VecModel
は、ドキュメント内のすべての単語の平均を使用して、各ドキュメントをベクトルに変換します。このベクトルは、予測、ドキュメントの類似性計算などのフィーチャとして使用できます。詳細については、Word2Vec に関する MLlib ユーザーガイドを参照してください。
例
次のコードセグメントでは、まず、それぞれが単語のシーケンスとして表されるドキュメントのセットから始めます。各ドキュメントについて、それをフィーチャベクトルに変換します。このフィーチャベクトルは、学習アルゴリズムに渡すことができます。
API の詳細については、Word2Vec Python ドキュメントを参照してください。
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
("Hi I heard about Spark".split(" "), ),
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
text, vector = row
print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))
API の詳細については、Word2Vec Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
"Hi I heard about Spark".split(" "),
"I wish Java could use case classes".split(" "),
"Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")
// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }
API の詳細については、Word2Vec Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.ml.feature.Word2VecModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("Hi I heard about Spark".split(" "))),
RowFactory.create(Arrays.asList("I wish Java could use case classes".split(" "))),
RowFactory.create(Arrays.asList("Logistic regression models are neat".split(" ")))
);
StructType schema = new StructType(new StructField[]{
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> documentDF = spark.createDataFrame(data, schema);
// Learn a mapping from words to Vectors.
Word2Vec word2Vec = new Word2Vec()
.setInputCol("text")
.setOutputCol("result")
.setVectorSize(3)
.setMinCount(0);
Word2VecModel model = word2Vec.fit(documentDF);
Dataset<Row> result = model.transform(documentDF);
for (Row row : result.collectAsList()) {
List<String> text = row.getList(0);
Vector vector = (Vector) row.get(1);
System.out.println("Text: " + text + " => \nVector: " + vector + "\n");
}
CountVectorizer
CountVectorizer
と CountVectorizerModel
は、テキストドキュメントのコレクションをトークンカウントのベクトルに変換するのに役立つことを目的としています。アプリオリな辞書が利用できない場合、CountVectorizer
を Estimator
として使用して、語彙を抽出し、CountVectorizerModel
を生成できます。モデルは、語彙全体でドキュメントのスパース表現を生成し、それらを LDA などの他のアルゴリズムに渡すことができます。
フィッティング処理中、CountVectorizer
は、コーパス全体での出現頻度順に上位 vocabSize
個の単語を選択します。オプションのパラメータ minDF
は、語彙に含めるために単語が出現しなければならないドキュメントの最小数(または < 1.0 の場合は割合)を指定することで、フィッティング処理にも影響を与えます。別のオプションのバイナリ切り替えパラメータは、出力ベクトルを制御します。true に設定すると、すべての非ゼロのカウントが 1 に設定されます。これは、整数カウントではなく、バイナリをモデル化する離散確率モデルに特に役立ちます。
例
以下のような id
と texts
列を持つ DataFrame があると仮定します。
id | texts
----|----------
0 | Array("a", "b", "c")
1 | Array("a", "b", "b", "c", "a")
texts
の各行は、Array[String] 型のドキュメントです。CountVectorizer
の fit を呼び出すと、語彙 (a, b, c) を持つ CountVectorizerModel
が生成されます。その後、変換後の出力列 "vector" には以下が含まれます。
id | texts | vector
----|---------------------------------|---------------
0 | Array("a", "b", "c") | (3,[0,1,2],[1.0,1.0,1.0])
1 | Array("a", "b", "b", "c", "a") | (3,[0,1,2],[2.0,2.0,1.0])
各ベクトルは、語彙に対するドキュメントのトークン数を表します。
API の詳細については、CountVectorizer Python ドキュメントとCountVectorizerModel Python ドキュメントを参照してください。
from pyspark.ml.feature import CountVectorizer
# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])
# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
API の詳細については、CountVectorizer Scala ドキュメントとCountVectorizerModel Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
(0, Array("a", "b", "c")),
(1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
.setInputCol("words")
.setOutputCol("features")
.setVocabSize(3)
.setMinDF(2)
.fit(df)
// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
.setInputCol("words")
.setOutputCol("features")
cvModel.transform(df).show(false)
API の詳細については、CountVectorizer Java ドキュメントとCountVectorizerModel Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// Input data: Each row is a bag of words from a sentence or document.
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("a", "b", "c")),
RowFactory.create(Arrays.asList("a", "b", "b", "c", "a"))
);
StructType schema = new StructType(new StructField [] {
new StructField("text", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
// fit a CountVectorizerModel from the corpus
CountVectorizerModel cvModel = new CountVectorizer()
.setInputCol("text")
.setOutputCol("feature")
.setVocabSize(3)
.setMinDF(2)
.fit(df);
// alternatively, define CountVectorizerModel with a-priori vocabulary
CountVectorizerModel cvm = new CountVectorizerModel(new String[]{"a", "b", "c"})
.setInputCol("text")
.setOutputCol("feature");
cvModel.transform(df).show(false);
FeatureHasher
フィーチャーハッシングは、カテゴリまたは数値のフィーチャーのセットを指定された次元(通常は元のフィーチャースペースよりもかなり小さい)のフィーチャーベクトルに投影します。これは、ハッシュトリックを使用して、フィーチャーをフィーチャーベクトルのインデックスにマッピングすることで行われます。
FeatureHasher
トランスフォーマーは、複数の列で動作します。各列には、数値またはカテゴリのフィーチャーのいずれかを含めることができます。列のデータ型の動作と処理は次のとおりです。
- 数値列:数値フィーチャーの場合、列名のハッシュ値を使用して、フィーチャー値をフィーチャーベクトル内のインデックスにマッピングします。デフォルトでは、数値フィーチャーはカテゴリとして扱われません(整数であっても)。カテゴリとして扱うには、
categoricalCols
パラメータを使用して、関連する列を指定します。 - 文字列列:カテゴリフィーチャーの場合、文字列 “column_name=value” のハッシュ値を使用して、ベクトルインデックスにマッピングし、指標値を
1.0
とします。したがって、カテゴリフィーチャーは「ワンホット」エンコードされます(OneHotEncoder をdropLast=false
で使用するのと同様です)。 - ブール列:ブール値は、文字列列と同じ方法で処理されます。つまり、ブールフィーチャーは “column_name=true” または “column_name=false” として表現され、指標値は
1.0
です。
Null(欠損)値は無視されます(結果のフィーチャーベクトルでは暗黙的にゼロになります)。
ここで使用するハッシュ関数は、HashingTF で使用されている MurmurHash 3 と同じです。ハッシュ値の単純な剰余を使用してベクトルインデックスを決定するため、numFeatures パラメータには 2 の累乗を使用することをお勧めします。そうしないと、フィーチャーはベクトルインデックスに均等にマッピングされません。
例
4つの入力列 real
, bool
, stringNum
, string
を持つ DataFrame があると仮定します。これらの異なるデータ型を入力として、フィーチャーベクトルの列を生成するための変換の動作を示します。
real| bool|stringNum|string
----|-----|---------|------
2.2| true| 1| foo
3.3|false| 2| bar
4.4|false| 3| baz
5.5|false| 4| foo
この DataFrame で FeatureHasher.transform
の出力は次のようになります。
real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1 |foo |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2 |bar |(262144,[6031, 80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3 |baz |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4 |foo |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])
結果のフィーチャーベクトルは、学習アルゴリズムに渡すことができます。
API の詳細については、FeatureHasher Python ドキュメントを参照してください。
from pyspark.ml.feature import FeatureHasher
dataset = spark.createDataFrame([
(2.2, True, "1", "foo"),
(3.3, False, "2", "bar"),
(4.4, False, "3", "baz"),
(5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])
hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
outputCol="features")
featurized = hasher.transform(dataset)
featurized.show(truncate=False)
API の詳細については、FeatureHasher Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.FeatureHasher
val dataset = spark.createDataFrame(Seq(
(2.2, true, "1", "foo"),
(3.3, false, "2", "bar"),
(4.4, false, "3", "baz"),
(5.5, false, "4", "foo")
)).toDF("real", "bool", "stringNum", "string")
val hasher = new FeatureHasher()
.setInputCols("real", "bool", "stringNum", "string")
.setOutputCol("features")
val featurized = hasher.transform(dataset)
featurized.show(false)
API の詳細については、FeatureHasher Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.FeatureHasher;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(2.2, true, "1", "foo"),
RowFactory.create(3.3, false, "2", "bar"),
RowFactory.create(4.4, false, "3", "baz"),
RowFactory.create(5.5, false, "4", "foo")
);
StructType schema = new StructType(new StructField[]{
new StructField("real", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("bool", DataTypes.BooleanType, false, Metadata.empty()),
new StructField("stringNum", DataTypes.StringType, false, Metadata.empty()),
new StructField("string", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
FeatureHasher hasher = new FeatureHasher()
.setInputCols(new String[]{"real", "bool", "stringNum", "string"})
.setOutputCol("features");
Dataset<Row> featurized = hasher.transform(dataset);
featurized.show(false);
特徴量変換器
Tokenizer
トークン化は、テキスト(文など)を個々の用語(通常は単語)に分割するプロセスです。単純な Tokenizer クラスがこの機能を提供します。次の例は、文を単語のシーケンスに分割する方法を示しています。
RegexTokenizer を使用すると、正規表現(regex)マッチングに基づくより高度なトークン化が可能です。デフォルトでは、パラメータ “pattern”(正規表現、デフォルト:"\\s+"
)は、入力テキストを分割する区切り文字として使用されます。あるいは、ユーザーはパラメータ “gaps” を false に設定して、正規表現 “pattern” が分割ギャップではなく「トークン」を表すことを示し、トークン化の結果として一致するすべての出現箇所を見つけることができます。
例
API の詳細については、Tokenizer Python ドキュメントとRegexTokenizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
countTokens = udf(lambda words: len(words), IntegerType())
tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
API の詳細については、Tokenizer Scala ドキュメントとRegexTokenizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val sentenceDataFrame = spark.createDataFrame(Seq(
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)
val countTokens = udf { (words: Seq[String]) => words.length }
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
.withColumn("tokens", countTokens(col("words"))).show(false)
API の詳細については、Tokenizer Java ドキュメントとRegexTokenizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import scala.collection.mutable.Seq;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.call_udf;
import static org.apache.spark.sql.functions.col;
List<Row> data = Arrays.asList(
RowFactory.create(0, "Hi I heard about Spark"),
RowFactory.create(1, "I wish Java could use case classes"),
RowFactory.create(2, "Logistic,regression,models,are,neat")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
spark.udf().register(
"countTokens", (Seq<?> words) -> words.size(), DataTypes.IntegerType);
Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
regexTokenized.select("sentence", "words")
.withColumn("tokens", call_udf("countTokens", col("words")))
.show(false);
StopWordsRemover
ストップワードは、通常、単語が頻繁に出現し、あまり意味を持たないため、入力から除外する必要がある単語です。
StopWordsRemover
は、文字列のシーケンス(たとえば、Tokenizer の出力)を入力として受け取り、入力シーケンスからすべてのストップワードを削除します。ストップワードのリストは、stopWords
パラメータで指定します。一部の言語のデフォルトのストップワードは、StopWordsRemover.loadDefaultStopWords(language)
を呼び出すことでアクセスできます。利用可能なオプションは、「danish」、「dutch」、「english」、「finnish」、「french」、「german」、「hungarian」、「italian」、「norwegian」、「portuguese」、「russian」、「spanish」、「swedish」、「turkish」です。ブール値パラメータ caseSensitive
は、一致を大文字と小文字を区別するかどうかを示します(デフォルトでは false)。
例
以下のような id
と raw
列を持つ DataFrame があると仮定します。
id | raw
----|----------
0 | [I, saw, the, red, balloon]
1 | [Mary, had, a, little, lamb]
raw
を入力列、filtered
を出力列として StopWordsRemover
を適用すると、次のようになります。
id | raw | filtered
----|-----------------------------|--------------------
0 | [I, saw, the, red, balloon] | [saw, red, balloon]
1 | [Mary, had, a, little, lamb]|[Mary, little, lamb]
filtered
では、ストップワード “I”, “the”, “had”, “a” がフィルタリングされています。
API の詳細については、StopWordsRemover Python ドキュメントを参照してください。
from pyspark.ml.feature import StopWordsRemover
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])
remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
API の詳細については、StopWordsRemover Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.StopWordsRemover
val remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered")
val dataSet = spark.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "balloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")
remover.transform(dataSet).show(false)
API の詳細については、StopWordsRemover Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
StopWordsRemover remover = new StopWordsRemover()
.setInputCol("raw")
.setOutputCol("filtered");
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);
StructType schema = new StructType(new StructField[]{
new StructField(
"raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show(false);
$n$-gram
n-gram は、整数 $n$ に対して $n$ 個のトークン(通常は単語)のシーケンスです。NGram
クラスを使用して、入力フィーチャーを $n$-gram に変換できます。
NGram
は、文字列のシーケンス(たとえば、Tokenizer の出力)を入力として受け取ります。パラメータ n
は、各 $n$-gram の用語数を決定するために使用されます。出力は、$n$-gram のシーケンスで構成され、各 $n$-gram は $n$ 個の連続した単語のスペース区切りの文字列で表されます。入力シーケンスに n
個未満の文字列が含まれている場合、出力は生成されません。
例
API の詳細については、NGram Python ドキュメントを参照してください。
from pyspark.ml.feature import NGram
wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)
API の詳細については、NGram Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.NGram
val wordDataFrame = spark.createDataFrame(Seq(
(0, Array("Hi", "I", "heard", "about", "Spark")),
(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
(2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")
val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false)
API の詳細については、NGram Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
RowFactory.create(1, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
RowFactory.create(2, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField(
"words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
NGram ngramTransformer = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams");
Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
ngramDataFrame.select("ngrams").show(false);
Binarizer
二値化は、数値フィーチャーを二値(0/1)フィーチャーに閾値処理するプロセスです。
Binarizer
は、共通パラメータ inputCol
と outputCol
に加えて、二値化の threshold
を受け取ります。閾値よりも大きいフィーチャー値は 1.0 に二値化され、閾値以下の値は 0.0 に二値化されます。inputCol
では、Vector と Double の両方の型がサポートされています。
例
API の詳細については、Binarizer Python ドキュメントを参照してください。
from pyspark.ml.feature import Binarizer
continuousDataFrame = spark.createDataFrame([
(0, 0.1),
(1, 0.8),
(2, 0.2)
], ["id", "feature"])
binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
APIの詳細については、Binarizer Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.Binarizer
val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")
val binarizer: Binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5)
val binarizedDataFrame = binarizer.transform(dataFrame)
println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()
APIの詳細については、Binarizer Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 0.1),
RowFactory.create(1, 0.8),
RowFactory.create(2, 0.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);
Binarizer binarizer = new Binarizer()
.setInputCol("feature")
.setOutputCol("binarized_feature")
.setThreshold(0.5);
Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);
System.out.println("Binarizer output with Threshold = " + binarizer.getThreshold());
binarizedDataFrame.show();
PCA
主成分分析 (PCA) は、直交変換を使用して、相関する可能性のある変数の観測値を、主成分と呼ばれる線形に相関のない値のセットに変換する統計的手法です。PCA クラスは、PCA を使用してベクトルを低次元空間に投影するモデルをトレーニングします。以下の例では、5 次元の特徴ベクトルを 3 次元の主成分に投影する方法を示します。
例
APIの詳細については、PCA Pythonドキュメントを参照してください。
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
APIの詳細については、PCA Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df)
val result = pca.transform(df).select("pcaFeatures")
result.show(false)
APIの詳細については、PCA Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
PCAModel pca = new PCA()
.setInputCol("features")
.setOutputCol("pcaFeatures")
.setK(3)
.fit(df);
Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show(false);
PolynomialExpansion
多項式展開は、元の次元のn次結合によって定式化される多項式空間に特徴を拡張するプロセスです。PolynomialExpansion クラスは、この機能を提供します。以下の例では、特徴を 3 次多項式空間に拡張する方法を示します。
例
APIの詳細については、PolynomialExpansion Pythonドキュメントを参照してください。
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])
polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)
polyDF.show(truncate=False)
APIの詳細については、PolynomialExpansion Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors
val data = Array(
Vectors.dense(2.0, 1.0),
Vectors.dense(0.0, 0.0),
Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3)
val polyDF = polyExpansion.transform(df)
polyDF.show(false)
APIの詳細については、PolynomialExpansion Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
PolynomialExpansion polyExpansion = new PolynomialExpansion()
.setInputCol("features")
.setOutputCol("polyFeatures")
.setDegree(3);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(2.0, 1.0)),
RowFactory.create(Vectors.dense(0.0, 0.0)),
RowFactory.create(Vectors.dense(3.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> polyDF = polyExpansion.transform(df);
polyDF.show(false);
離散コサイン変換 (DCT)
離散コサイン変換は、時間領域の長さ $N$ の実数値シーケンスを、周波数領域の別の長さ $N$ の実数値シーケンスに変換します。DCT クラスは、DCT-II を実装し、変換を表す行列がユニタリになるように結果を $1/\sqrt{2}$ でスケーリングすることにより、この機能を提供します。変換されたシーケンスにはシフトは適用されません (例: 変換されたシーケンスの $0$ 番目の要素は $0$ 番目の DCT 係数であり、$N/2$ 番目ではない)。
例
APIの詳細については、DCT Pythonドキュメントを参照してください。
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
(Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
(Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])
dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(truncate=False)
APIの詳細については、DCT Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
Vectors.dense(0.0, 1.0, -2.0, 3.0),
Vectors.dense(-1.0, 2.0, 4.0, -7.0),
Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false)
val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)
APIの詳細については、DCT Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("features", new VectorUDT(), false, Metadata.empty()),
});
Dataset<Row> df = spark.createDataFrame(data, schema);
DCT dct = new DCT()
.setInputCol("features")
.setOutputCol("featuresDCT")
.setInverse(false);
Dataset<Row> dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(false);
StringIndexer
StringIndexer
は、ラベルの文字列列をラベルインデックスの列にエンコードします。StringIndexer
は複数の列をエンコードできます。インデックスは [0, numLabels)
にあり、次の 4 つの順序付けオプションがサポートされています。"frequencyDesc": ラベル頻度による降順 (最も頻度の高いラベルに 0 が割り当てられる)、"frequencyAsc": ラベル頻度による昇順 (最も頻度の低いラベルに 0 が割り当てられる)、"alphabetDesc": アルファベット順の降順、"alphabetAsc": アルファベット順の昇順 (デフォルト = "frequencyDesc")。なお、"frequencyDesc"/"frequencyAsc" で頻度が同じ場合は、文字列がアルファベット順にソートされます。
ユーザーが保持するように選択した場合、表示されないラベルは numLabels のインデックスに配置されます。入力列が数値の場合、文字列にキャストして文字列値をインデックス化します。 Estimator
や Transformer
などのダウンストリームのパイプラインコンポーネントが、この文字列でインデックス化されたラベルを使用する場合、コンポーネントの入力列をこの文字列でインデックス化された列名に設定する必要があります。多くの場合、setInputCol
で入力列を設定できます。
例
列 id
と category
を持つ次の DataFrame があると仮定します。
id | category
----|----------
0 | a
1 | b
2 | c
3 | a
4 | a
5 | c
category
は、「a」、「b」、「c」の 3 つのラベルを持つ文字列列です。StringIndexer
を、入力列として category
、出力列として categoryIndex
を使用して適用すると、次のようになります。
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | a | 0.0
4 | a | 0.0
5 | c | 1.0
「a」は最も頻度が高いためインデックス 0
が割り当てられ、次に「c」がインデックス 1
、「b」がインデックス 2
が割り当てられます。
さらに、StringIndexer
が、あるデータセットで StringIndexer
を適合させた後、別のデータセットを変換するために使用する場合に、表示されないラベルを処理する方法について、3 つの戦略があります。
- 例外をスローする (デフォルト)
- 表示されないラベルを含む行を完全にスキップする
- 表示されないラベルを特別な追加バケットに、インデックス numLabels で配置する
例
前の例に戻りますが、今回は以前に定義した StringIndexer
を次のデータセットで再利用します。
id | category
----|----------
0 | a
1 | b
2 | c
3 | d
4 | e
StringIndexer
が表示されないラベルをどのように処理するかを設定していない場合、または「error」に設定した場合、例外がスローされます。ただし、setHandleInvalid("skip")
を呼び出した場合、次のデータセットが生成されます。
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
「d」または「e」を含む行が表示されないことに注意してください。
setHandleInvalid("keep")
を呼び出すと、次のデータセットが生成されます。
id | category | categoryIndex
----|----------|---------------
0 | a | 0.0
1 | b | 2.0
2 | c | 1.0
3 | d | 3.0
4 | e | 3.0
「d」または「e」を含む行がインデックス「3.0」にマップされていることに注意してください。
APIの詳細については、StringIndexer Pythonドキュメントを参照してください。
from pyspark.ml.feature import StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()
APIの詳細については、StringIndexer Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.StringIndexer
val df = spark.createDataFrame(
Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
val indexed = indexer.fit(df).transform(df)
indexed.show()
APIの詳細については、StringIndexer Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("category", StringType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexer indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex");
Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();
IndexToString
StringIndexer
と対称的に、IndexToString
は、ラベルインデックスの列を、元のラベルを文字列として含む列にマッピングし直します。一般的なユースケースは、StringIndexer
でラベルからインデックスを生成し、それらのインデックスでモデルをトレーニングし、IndexToString
で予測されたインデックスの列から元のラベルを取得することです。ただし、独自のラベルを自由に指定できます。
例
StringIndexer
の例に基づいて、列 id
と categoryIndex
を持つ次の DataFrame があると仮定します。
id | categoryIndex
----|---------------
0 | 0.0
1 | 2.0
2 | 1.0
3 | 0.0
4 | 0.0
5 | 1.0
IndexToString
を、入力列として categoryIndex
、出力列として originalCategory
を使用して適用すると、元のラベルを取得できます (列のメタデータから推論されます)。
id | categoryIndex | originalCategory
----|---------------|-----------------
0 | 0.0 | a
1 | 2.0 | b
2 | 1.0 | c
3 | 0.0 | a
4 | 0.0 | a
5 | 1.0 | c
APIの詳細については、IndexToString Pythonドキュメントを参照してください。
from pyspark.ml.feature import IndexToString, StringIndexer
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)
print("Transformed string column '%s' to indexed column '%s'"
% (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()
print("StringIndexer will store labels in output column metadata\n")
converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
print("Transformed indexed column '%s' back to original string column '%s' using "
"labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()
APIの詳細については、IndexToString Scalaドキュメントを参照してください。
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}
val df = spark.createDataFrame(Seq(
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
)).toDF("id", "category")
val indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df)
val indexed = indexer.transform(df)
println(s"Transformed string column '${indexer.getInputCol}' " +
s"to indexed column '${indexer.getOutputCol}'")
indexed.show()
val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
s"${Attribute.fromStructField(inputColSchema).toString}\n")
val converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory")
val converted = converter.transform(indexed)
println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()
APIの詳細については、IndexToString Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, "a"),
RowFactory.create(1, "b"),
RowFactory.create(2, "c"),
RowFactory.create(3, "a"),
RowFactory.create(4, "a"),
RowFactory.create(5, "c")
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("category", DataTypes.StringType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
StringIndexerModel indexer = new StringIndexer()
.setInputCol("category")
.setOutputCol("categoryIndex")
.fit(df);
Dataset<Row> indexed = indexer.transform(df);
System.out.println("Transformed string column '" + indexer.getInputCol() + "' " +
"to indexed column '" + indexer.getOutputCol() + "'");
indexed.show();
StructField inputColSchema = indexed.schema().apply(indexer.getOutputCol());
System.out.println("StringIndexer will store labels in output column metadata: " +
Attribute.fromStructField(inputColSchema).toString() + "\n");
IndexToString converter = new IndexToString()
.setInputCol("categoryIndex")
.setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);
System.out.println("Transformed indexed column '" + converter.getInputCol() + "' back to " +
"original string column '" + converter.getOutputCol() + "' using labels in metadata");
converted.select("id", "categoryIndex", "originalCategory").show();
OneHotEncoder
One-hot エンコーディングは、ラベルインデックスとして表現されたカテゴリ特徴を、すべての特徴値のセットから特定の特徴値の存在を示す最大 1 つの値を持つバイナリベクトルにマッピングします。このエンコーディングにより、ロジスティック回帰など、連続した特徴を期待するアルゴリズムでカテゴリ特徴を使用できるようになります。文字列型の入力データの場合、StringIndexer を使用して最初にカテゴリ特徴をエンコードするのが一般的です。
OneHotEncoder
は、複数の列を変換でき、入力列ごとに one-hot エンコードされた出力ベクトル列を返します。これらのベクトルを VectorAssembler を使用して単一の特徴ベクトルにマージするのが一般的です。
OneHotEncoder
は、データの変換中に無効な入力をどのように処理するかを選択する handleInvalid
パラメーターをサポートしています。使用可能なオプションには、「keep」(無効な入力は追加のカテゴリインデックスに割り当てられる) と「error」(エラーをスローする) があります。
例
APIの詳細については、OneHotEncoder Pythonドキュメントを参照してください。
from pyspark.ml.feature import OneHotEncoder
df = spark.createDataFrame([
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])
encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()
APIの詳細については、OneHotEncoder Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.OneHotEncoder
val df = spark.createDataFrame(Seq(
(0.0, 1.0),
(1.0, 0.0),
(2.0, 1.0),
(0.0, 2.0),
(0.0, 1.0),
(2.0, 0.0)
)).toDF("categoryIndex1", "categoryIndex2")
val encoder = new OneHotEncoder()
.setInputCols(Array("categoryIndex1", "categoryIndex2"))
.setOutputCols(Array("categoryVec1", "categoryVec2"))
val model = encoder.fit(df)
val encoded = model.transform(df)
encoded.show()
APIの詳細については、OneHotEncoder Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.OneHotEncoderModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0.0, 1.0),
RowFactory.create(1.0, 0.0),
RowFactory.create(2.0, 1.0),
RowFactory.create(0.0, 2.0),
RowFactory.create(0.0, 1.0),
RowFactory.create(2.0, 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("categoryIndex1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("categoryIndex2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
OneHotEncoder encoder = new OneHotEncoder()
.setInputCols(new String[] {"categoryIndex1", "categoryIndex2"})
.setOutputCols(new String[] {"categoryVec1", "categoryVec2"});
OneHotEncoderModel model = encoder.fit(df);
Dataset<Row> encoded = model.transform(df);
encoded.show();
VectorIndexer
VectorIndexer
は、Vector
のデータセット内のカテゴリ特徴のインデックス付けを支援します。どの特徴がカテゴリであるかを自動的に判断し、元の値をカテゴリインデックスに変換することができます。具体的には、以下の処理を行います。
- Vector型の入力列とパラメータ
maxCategories
を受け取ります。 - 異なる値の数に基づいて、どの特徴をカテゴリとするかを決定します。最大で
maxCategories
個の異なる値を持つ特徴はカテゴリであると宣言されます。 - 各カテゴリ特徴に対して、0ベースのカテゴリインデックスを計算します。
- カテゴリ特徴にインデックスを付け、元の特徴値をインデックスに変換します。
カテゴリ特徴にインデックスを付けることで、決定木やツリーアンサンブルなどのアルゴリズムがカテゴリ特徴を適切に扱うことができるようになり、パフォーマンスが向上します。
例
以下の例では、ラベル付きポイントのデータセットを読み込み、VectorIndexer
を使用して、どの特徴をカテゴリとして扱うべきかを決定します。カテゴリ特徴の値をインデックスに変換します。この変換されたデータは、カテゴリ特徴を処理するDecisionTreeRegressor
などのアルゴリズムに渡すことができます。
APIの詳細については、VectorIndexer Pythonドキュメントを参照してください。
from pyspark.ml.feature import VectorIndexer
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)
categoricalFeatures = indexerModel.categoryMaps
print("Chose %d categorical features: %s" %
(len(categoricalFeatures), ", ".join(str(k) for k in categoricalFeatures.keys())))
# Create new column "indexed" with categorical values transformed to indices
indexedData = indexerModel.transform(data)
indexedData.show()
APIの詳細については、VectorIndexer Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.VectorIndexer
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10)
val indexerModel = indexer.fit(data)
val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} " +
s"categorical features: ${categoricalFeatures.mkString(", ")}")
// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()
APIの詳細については、VectorIndexer Javaドキュメントを参照してください。
import java.util.Map;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> data = spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
VectorIndexer indexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexed")
.setMaxCategories(10);
VectorIndexerModel indexerModel = indexer.fit(data);
Map<Integer, Map<Double, Integer>> categoryMaps = indexerModel.javaCategoryMaps();
System.out.print("Chose " + categoryMaps.size() + " categorical features:");
for (Integer feature : categoryMaps.keySet()) {
System.out.print(" " + feature);
}
System.out.println();
// Create new column "indexed" with categorical values transformed to indices
Dataset<Row> indexedData = indexerModel.transform(data);
indexedData.show();
Interaction
Interaction
は、ベクトルまたはdouble値の列を受け取り、各入力列から1つの値のすべての組み合わせの積を含む単一のベクトル列を生成するTransformer
です。
たとえば、入力列としてそれぞれ3次元を持つ2つのベクトル型列がある場合、出力列として9次元ベクトルが得られます。
例
次のDataFrameに "id1"、"vec1"、"vec2" の列があると仮定します。
id1|vec1 |vec2
---|--------------|--------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0]
これらの入力列に対してInteraction
を適用すると、出力列としてのinteractedCol
には以下が含まれます。
id1|vec1 |vec2 |interactedCol
---|--------------|--------------|------------------------------------------------------
1 |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]
2 |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]
3 |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]
4 |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
5 |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0]
6 |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]
APIの詳細については、Interaction Pythonドキュメントを参照してください。
from pyspark.ml.feature import Interaction, VectorAssembler
df = spark.createDataFrame(
[(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)],
["id1", "id2", "id3", "id4", "id5", "id6", "id7"])
assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")
assembled1 = assembler1.transform(df)
assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")
assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")
interacted = interaction.transform(assembled2)
interacted.show(truncate=False)
APIの詳細については、Interaction Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler
val df = spark.createDataFrame(Seq(
(1, 1, 2, 3, 8, 4, 5),
(2, 4, 3, 8, 7, 9, 8),
(3, 6, 1, 9, 2, 3, 6),
(4, 10, 8, 6, 9, 4, 5),
(5, 9, 2, 7, 10, 7, 3),
(6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")
val assembler1 = new VectorAssembler().
setInputCols(Array("id2", "id3", "id4")).
setOutputCol("vec1")
val assembled1 = assembler1.transform(df)
val assembler2 = new VectorAssembler().
setInputCols(Array("id5", "id6", "id7")).
setOutputCol("vec2")
val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")
val interaction = new Interaction()
.setInputCols(Array("id1", "vec1", "vec2"))
.setOutputCol("interactedCol")
val interacted = interaction.transform(assembled2)
interacted.show(truncate = false)
APIの詳細については、Interaction Javaドキュメントを参照してください。
List<Row> data = Arrays.asList(
RowFactory.create(1, 1, 2, 3, 8, 4, 5),
RowFactory.create(2, 4, 3, 8, 7, 9, 8),
RowFactory.create(3, 6, 1, 9, 2, 3, 6),
RowFactory.create(4, 10, 8, 6, 9, 4, 5),
RowFactory.create(5, 9, 2, 7, 10, 7, 3),
RowFactory.create(6, 1, 1, 4, 2, 8, 4)
);
StructType schema = new StructType(new StructField[]{
new StructField("id1", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id2", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id3", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id4", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id5", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id6", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("id7", DataTypes.IntegerType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VectorAssembler assembler1 = new VectorAssembler()
.setInputCols(new String[]{"id2", "id3", "id4"})
.setOutputCol("vec1");
Dataset<Row> assembled1 = assembler1.transform(df);
VectorAssembler assembler2 = new VectorAssembler()
.setInputCols(new String[]{"id5", "id6", "id7"})
.setOutputCol("vec2");
Dataset<Row> assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2");
Interaction interaction = new Interaction()
.setInputCols(new String[]{"id1","vec1","vec2"})
.setOutputCol("interactedCol");
Dataset<Row> interacted = interaction.transform(assembled2);
interacted.show(false);
Normalizer
Normalizer
は、Vector
行のデータセットを変換し、各Vector
を単位ノルムにするTransformer
です。正規化に使用されるp-ノルムを指定するパラメータp
を受け取ります(デフォルトは$p = 2$です)。この正規化は、入力データを標準化し、学習アルゴリズムの動作を改善するのに役立ちます。
例
次の例は、libsvm形式でデータセットをロードし、各行を単位$L^1$ノルムおよび単位$L^\infty$ノルムにする方法を示しています。
APIの詳細については、Normalizer Pythonドキュメントを参照してください。
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.5, -1.0]),),
(1, Vectors.dense([2.0, 1.0, 1.0]),),
(2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()
# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()
APIの詳細については、Normalizer Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.5, -1.0)),
(1, Vectors.dense(2.0, 1.0, 1.0)),
(2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")
// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0)
val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()
// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()
APIの詳細については、Normalizer Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Normalizer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
// Normalize each Vector using $L^1$ norm.
Normalizer normalizer = new Normalizer()
.setInputCol("features")
.setOutputCol("normFeatures")
.setP(1.0);
Dataset<Row> l1NormData = normalizer.transform(dataFrame);
l1NormData.show();
// Normalize each Vector using $L^\infty$ norm.
Dataset<Row> lInfNormData =
normalizer.transform(dataFrame, normalizer.p().w(Double.POSITIVE_INFINITY));
lInfNormData.show();
StandardScaler
StandardScaler
は、Vector
行のデータセットを変換し、各特徴を単位標準偏差および/またはゼロ平均になるように正規化します。パラメータを受け取ります。
withStd
: デフォルトはTrueです。データを単位標準偏差にスケールします。withMean
: デフォルトはFalseです。スケーリングする前に、平均でデータを中央寄せします。密な出力が生成されるため、スパース入力に適用する場合は注意してください。
StandardScaler
は、データセットにfit
してStandardScalerModel
を生成できるEstimator
です。これは、要約統計量を計算することに相当します。モデルは、データセットのVector
列を変換して、単位標準偏差および/またはゼロ平均の特徴を持つようにすることができます。
特徴の標準偏差がゼロの場合、その特徴のVector
にデフォルト値0.0
が返されることに注意してください。
例
次の例は、libsvm形式でデータセットをロードし、各特徴を単位標準偏差にする方法を示しています。
APIの詳細については、StandardScaler Pythonドキュメントを参照してください。
from pyspark.ml.feature import StandardScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)
# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(dataFrame)
# Normalize each feature to have unit standard deviation.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIの詳細については、StandardScaler Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.StandardScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)
// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIの詳細については、StandardScaler Javaドキュメントを参照してください。
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.StandardScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
StandardScaler scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false);
// Compute summary statistics by fitting the StandardScaler
StandardScalerModel scalerModel = scaler.fit(dataFrame);
// Normalize each feature to have unit standard deviation.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
RobustScaler
RobustScaler
は、Vector
行のデータセットを変換し、中央値を削除し、特定の分位範囲(デフォルトではIQR:四分位範囲、第1四分位数と第3四分位数の間の分位範囲)に従ってデータをスケーリングします。その動作はStandardScaler
とよく似ていますが、平均と標準偏差の代わりに中央値と分位範囲が使用されるため、外れ値に対してロバストです。パラメータを受け取ります。
lower
: デフォルトは0.25です。分位範囲を計算するための下位分位で、すべての特徴で共有されます。upper
: デフォルトは0.75です。分位範囲を計算するための上位分位で、すべての特徴で共有されます。withScaling
: デフォルトはTrueです。データを分位範囲にスケーリングします。withCentering
: デフォルトはFalseです。スケーリングする前に、中央値でデータを中央寄せします。密な出力が生成されるため、スパース入力に適用する場合は注意してください。
RobustScaler
は、データセットにfit
してRobustScalerModel
を生成できるEstimator
です。これは、分位統計量を計算することに相当します。モデルは、データセットのVector
列を変換して、単位分位範囲および/またはゼロ中央値の特徴を持つようにすることができます。
特徴の分位範囲がゼロの場合、その特徴のVector
にデフォルト値0.0
が返されることに注意してください。
例
次の例は、libsvm形式でデータセットをロードし、各特徴を単位分位範囲にする方法を示しています。
APIの詳細については、RobustScaler Pythonドキュメントを参照してください。
from pyspark.ml.feature import RobustScaler
dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
withScaling=True, withCentering=False,
lower=0.25, upper=0.75)
# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)
# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIの詳細については、RobustScaler Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.RobustScaler
val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
val scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75)
// Compute summary statistics by fitting the RobustScaler.
val scalerModel = scaler.fit(dataFrame)
// Transform each feature to have unit quantile range.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()
APIの詳細については、RobustScaler Javaドキュメントを参照してください。
import org.apache.spark.ml.feature.RobustScaler;
import org.apache.spark.ml.feature.RobustScalerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> dataFrame =
spark.read().format("libsvm").load("data/mllib/sample_libsvm_data.txt");
RobustScaler scaler = new RobustScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithScaling(true)
.setWithCentering(false)
.setLower(0.25)
.setUpper(0.75);
// Compute summary statistics by fitting the RobustScaler
RobustScalerModel scalerModel = scaler.fit(dataFrame);
// Transform each feature to have unit quantile range.
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.show();
MinMaxScaler
MinMaxScaler
は、Vector
行のデータセットを変換し、各特徴を特定の範囲(多くの場合[0、1])に再スケーリングします。パラメータを受け取ります。
min
: デフォルトは0.0です。変換後の下限で、すべての特徴で共有されます。max
: デフォルトは1.0です。変換後の上限で、すべての特徴で共有されます。
MinMaxScaler
は、データセットの要約統計量を計算し、MinMaxScalerModel
を生成します。モデルは、各特徴が指定された範囲内になるように個別に変換できます。
特徴Eの再スケーリングされた値は、\begin{equation} Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min \end{equation}
として計算されます。$E_{max} == E_{min}$
の場合、$Rescaled(e_i) = 0.5 * (max + min)$
となります。
ゼロ値が非ゼロ値に変換される可能性があるため、トランスフォーマーの出力はスパース入力の場合でもDenseVector
になることに注意してください。
例
次の例は、libsvm形式でデータセットをロードし、各特徴を[0、1]に再スケーリングする方法を示しています。
APIの詳細については、MinMaxScaler PythonドキュメントおよびMinMaxScalerModel Pythonドキュメントを参照してください。
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -1.0]),),
(1, Vectors.dense([2.0, 1.1, 1.0]),),
(2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [min, max].
scaledData = scalerModel.transform(dataFrame)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show()
APIの詳細については、MinMaxScaler ScalaドキュメントとMinMaxScalerModel Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -1.0)),
(1, Vectors.dense(2.0, 1.1, 1.0)),
(2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")
val scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()
APIの詳細については、MinMaxScaler JavaドキュメントとMinMaxScalerModel Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinMaxScaler;
import org.apache.spark.ml.feature.MinMaxScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -1.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.1, 1.0)),
RowFactory.create(2, Vectors.dense(3.0, 10.1, 3.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MinMaxScaler scaler = new MinMaxScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MinMaxScalerModel
MinMaxScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [min, max].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
System.out.println("Features scaled to range: [" + scaler.getMin() + ", "
+ scaler.getMax() + "]");
scaledData.select("features", "scaledFeatures").show();
MaxAbsScaler
MaxAbsScaler
は、各特徴量をその特徴量の絶対値の最大値で割ることにより、Vector
行のデータセットを変換し、各特徴量の範囲を[-1, 1]にリスケールします。データのシフト/センタリングは行わないため、スパース性が失われることはありません。
MaxAbsScaler
はデータセットの要約統計量を計算し、MaxAbsScalerModel
を生成します。このモデルは、各特徴量を個別に[-1, 1]の範囲に変換できます。
例
次の例では、libsvm形式のデータセットをロードし、各特徴量を[-1, 1]にリスケールする方法を示します。
APIの詳細については、MaxAbsScaler PythonドキュメントとMaxAbsScalerModel Pythonドキュメントを参照してください。
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors
dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])
scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
# Compute summary statistics and generate MaxAbsScalerModel
scalerModel = scaler.fit(dataFrame)
# rescale each feature to range [-1, 1].
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
APIの詳細については、MaxAbsScaler ScalaドキュメントとMaxAbsScalerModel Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors
val dataFrame = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 0.1, -8.0)),
(1, Vectors.dense(2.0, 1.0, -4.0)),
(2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")
val scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)
// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
APIの詳細については、MaxAbsScaler JavaドキュメントとMaxAbsScalerModel Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MaxAbsScaler;
import org.apache.spark.ml.feature.MaxAbsScalerModel;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 0.1, -8.0)),
RowFactory.create(1, Vectors.dense(2.0, 1.0, -4.0)),
RowFactory.create(2, Vectors.dense(4.0, 10.0, 8.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
MaxAbsScaler scaler = new MaxAbsScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures");
// Compute summary statistics and generate MaxAbsScalerModel
MaxAbsScalerModel scalerModel = scaler.fit(dataFrame);
// rescale each feature to range [-1, 1].
Dataset<Row> scaledData = scalerModel.transform(dataFrame);
scaledData.select("features", "scaledFeatures").show();
Bucketizer
Bucketizer
は、連続特徴量の列を、ユーザーが指定したバケットで指定された特徴量バケットの列に変換します。パラメータを取ります
splits
: 連続特徴量をバケットにマッピングするためのパラメータ。n+1個のスプリットがある場合、n個のバケットがあります。スプリットx,yで定義されたバケットは、最後のバケットを除き[x,y)の範囲の値を保持し、最後のバケットはyも含まれます。スプリットは厳密に増加している必要があります。すべてのDouble値をカバーするために、-inf、infの値は明示的に指定する必要があります。そうしないと、指定されたスプリット外の値はエラーとして扱われます。splits
の2つの例は、Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity)
とArray(0.0, 1.0, 2.0)
です。
対象の列の上限と下限がわからない場合は、Bucketizerの範囲外になる可能性のある例外を防ぐために、スプリットの境界としてDouble.NegativeInfinity
とDouble.PositiveInfinity
を追加する必要があることに注意してください。
また、提供したスプリットは、s0 < s1 < s2 < ... < sn
のように、厳密に増加する順序である必要があることにも注意してください。
詳細については、BucketizerのAPIドキュメントを参照してください。
例
次の例では、Double
の列を別のインデックス化された列にバケット化する方法を示します。
APIの詳細については、Bucketizer Pythonドキュメントを参照してください。
from pyspark.ml.feature import Bucketizer
splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])
bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")
# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)
print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits()) - 1))
bucketedData.show()
APIの詳細については、Bucketizer Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.Bucketizer
val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)
val data = Array(-999.9, -0.5, -0.3, 0.0, 0.2, 999.9)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")
val bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits)
// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
println(s"Bucketizer output with ${bucketizer.getSplits.length-1} buckets")
bucketedData.show()
val splitsArray = Array(
Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
Array(Double.NegativeInfinity, -0.3, 0.0, 0.3, Double.PositiveInfinity))
val data2 = Array(
(-999.9, -999.9),
(-0.5, -0.2),
(-0.3, -0.1),
(0.0, 0.0),
(0.2, 0.4),
(999.9, 999.9))
val dataFrame2 = spark.createDataFrame(data2).toDF("features1", "features2")
val bucketizer2 = new Bucketizer()
.setInputCols(Array("features1", "features2"))
.setOutputCols(Array("bucketedFeatures1", "bucketedFeatures2"))
.setSplitsArray(splitsArray)
// Transform original data into its bucket index.
val bucketedData2 = bucketizer2.transform(dataFrame2)
println(s"Bucketizer output with [" +
s"${bucketizer2.getSplitsArray(0).length-1}, " +
s"${bucketizer2.getSplitsArray(1).length-1}] buckets for each input column")
bucketedData2.show()
APIの詳細については、Bucketizer Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Bucketizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
double[] splits = {Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY};
List<Row> data = Arrays.asList(
RowFactory.create(-999.9),
RowFactory.create(-0.5),
RowFactory.create(-0.3),
RowFactory.create(0.0),
RowFactory.create(0.2),
RowFactory.create(999.9)
);
StructType schema = new StructType(new StructField[]{
new StructField("features", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Bucketizer bucketizer = new Bucketizer()
.setInputCol("features")
.setOutputCol("bucketedFeatures")
.setSplits(splits);
// Transform original data into its bucket index.
Dataset<Row> bucketedData = bucketizer.transform(dataFrame);
System.out.println("Bucketizer output with " + (bucketizer.getSplits().length-1) + " buckets");
bucketedData.show();
// Bucketize multiple columns at one pass.
double[][] splitsArray = {
{Double.NEGATIVE_INFINITY, -0.5, 0.0, 0.5, Double.POSITIVE_INFINITY},
{Double.NEGATIVE_INFINITY, -0.3, 0.0, 0.3, Double.POSITIVE_INFINITY}
};
List<Row> data2 = Arrays.asList(
RowFactory.create(-999.9, -999.9),
RowFactory.create(-0.5, -0.2),
RowFactory.create(-0.3, -0.1),
RowFactory.create(0.0, 0.0),
RowFactory.create(0.2, 0.4),
RowFactory.create(999.9, 999.9)
);
StructType schema2 = new StructType(new StructField[]{
new StructField("features1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("features2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> dataFrame2 = spark.createDataFrame(data2, schema2);
Bucketizer bucketizer2 = new Bucketizer()
.setInputCols(new String[] {"features1", "features2"})
.setOutputCols(new String[] {"bucketedFeatures1", "bucketedFeatures2"})
.setSplitsArray(splitsArray);
// Transform original data into its bucket index.
Dataset<Row> bucketedData2 = bucketizer2.transform(dataFrame2);
System.out.println("Bucketizer output with [" +
(bucketizer2.getSplitsArray()[0].length-1) + ", " +
(bucketizer2.getSplitsArray()[1].length-1) + "] buckets for each input column");
bucketedData2.show();
ElementwiseProduct
ElementwiseProductは、要素ごとの乗算を使用して、各入力ベクトルに提供された「重み」ベクトルを乗算します。言い換えれば、データセットの各列をスカラー乗数でスケーリングします。これは、入力ベクトルv
と変換ベクトルw
との間のアダマール積を表し、結果ベクトルを生成します。
\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
例
以下の例では、変換ベクトル値を使用してベクトルを変換する方法を示します。
APIの詳細については、ElementwiseProduct Pythonドキュメントを参照してください。
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()
APIの詳細については、ElementwiseProduct Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors
// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
("a", Vectors.dense(1.0, 2.0, 3.0)),
("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector")
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()
APIの詳細については、ElementwiseProduct Javaドキュメントを参照してください。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ElementwiseProduct;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
// Create some vector data; also works for sparse vectors
List<Row> data = Arrays.asList(
RowFactory.create("a", Vectors.dense(1.0, 2.0, 3.0)),
RowFactory.create("b", Vectors.dense(4.0, 5.0, 6.0))
);
List<StructField> fields = new ArrayList<>(2);
fields.add(DataTypes.createStructField("id", DataTypes.StringType, false));
fields.add(DataTypes.createStructField("vector", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
Dataset<Row> dataFrame = spark.createDataFrame(data, schema);
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct()
.setScalingVec(transformingVector)
.setInputCol("vector")
.setOutputCol("transformedVector");
// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show();
SQLTransformer
SQLTransformer
は、SQLステートメントで定義された変換を実装します。現在、"SELECT ... FROM __THIS__ ..."
のようなSQL構文のみをサポートしています。ここで、"__THIS__"
は入力データセットの基になるテーブルを表します。select句は、出力に表示するフィールド、定数、および式を指定し、Spark SQLがサポートする任意のselect句を使用できます。ユーザーは、Spark SQL組み込み関数およびUDFを使用して、選択したこれらの列を操作することもできます。たとえば、SQLTransformer
は次のようなステートメントをサポートします。
SELECT a, a + b AS a_b FROM __THIS__
SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
例
列id
、v1
、v2
を持つ次のDataFrameがあると仮定します。
id | v1 | v2
----|-----|-----
0 | 1.0 | 3.0
2 | 2.0 | 5.0
これは、ステートメント"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__"
を使用したSQLTransformer
の出力です。
id | v1 | v2 | v3 | v4
----|-----|-----|-----|-----
0 | 1.0 | 3.0 | 4.0 | 3.0
2 | 2.0 | 5.0 | 7.0 |10.0
APIの詳細については、SQLTransformer Pythonドキュメントを参照してください。
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
APIの詳細については、SQLTransformer Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.SQLTransformer
val df = spark.createDataFrame(
Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")
val sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
APIの詳細については、SQLTransformer Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.SQLTransformer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(0, 1.0, 3.0),
RowFactory.create(2, 2.0, 5.0)
);
StructType schema = new StructType(new StructField [] {
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("v1", DataTypes.DoubleType, false, Metadata.empty()),
new StructField("v2", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
SQLTransformer sqlTrans = new SQLTransformer().setStatement(
"SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__");
sqlTrans.transform(df).show();
VectorAssembler
VectorAssembler
は、指定された列のリストを単一のベクトル列に結合するトランスフォーマーです。これは、ロジスティック回帰や決定木などのMLモデルをトレーニングするために、生のフィーチャと異なるフィーチャトランスフォーマーによって生成されたフィーチャを単一のフィーチャベクトルに結合する場合に役立ちます。VectorAssembler
は、次の入力列タイプを受け入れます:すべての数値型、ブール型、およびベクトル型。各行で、入力列の値は、指定された順序でベクトルに連結されます。
例
列id
、hour
、mobile
、userFeatures
、およびclicked
を持つDataFrameがあると仮定します。
id | hour | mobile | userFeatures | clicked
----|------|--------|------------------|---------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0
userFeatures
は、3つのユーザーフィーチャを含むベクトル列です。hour
、mobile
、およびuserFeatures
をfeatures
と呼ばれる単一のフィーチャベクトルに結合し、clicked
を予測するために使用します。VectorAssembler
の入力列をhour
、mobile
、およびuserFeatures
に設定し、出力列をfeatures
に設定すると、変換後、次のDataFrameが得られます。
id | hour | mobile | userFeatures | clicked | features
----|------|--------|------------------|---------|-----------------------------
0 | 18 | 1.0 | [0.0, 10.0, 0.5] | 1.0 | [18.0, 1.0, 0.0, 10.0, 0.5]
APIの詳細については、VectorAssembler Pythonドキュメントを参照してください。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
APIの詳細については、VectorAssembler Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
val output = assembler.transform(dataset)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
APIの詳細については、VectorAssembler Javaドキュメントを参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row), schema);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
Dataset<Row> output = assembler.transform(dataset);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);
VectorSizeHint
VectorType
の列のベクトルのサイズを明示的に指定すると便利な場合があります。たとえば、VectorAssembler
は、入力列からのサイズ情報を使用して、出力列のサイズ情報とメタデータを生成します。場合によっては、この情報を列の内容を調べることで取得できますが、ストリーミングデータフレームでは、ストリームが開始されるまで内容は利用できません。VectorSizeHint
を使用すると、ユーザーは列のベクトルサイズを明示的に指定できるため、VectorAssembler
、またはベクトルサイズを知る必要がある可能性のある他のトランスフォーマーは、その列を入力として使用できます。
VectorSizeHint
を使用するには、ユーザーはinputCol
パラメーターとsize
パラメーターを設定する必要があります。このトランスフォーマーをデータフレームに適用すると、ベクトルサイズを指定するinputCol
のメタデータが更新された新しいデータフレームが生成されます。結果のデータフレームに対するダウンストリーム操作は、メタデータを使用してこのサイズを取得できます。
VectorSizeHint
は、オプションの handleInvalid
パラメータも受け取ることができ、これは、ベクトル列にnullが含まれる場合や、サイズが間違ったベクトルが含まれる場合の動作を制御します。デフォルトでは、handleInvalid
は "error" に設定されており、例外がスローされることを示します。このパラメータは、"skip" に設定することもでき、これは無効な値を含む行を結果のデータフレームからフィルタリングすることを示します。また、"optimistic" に設定することもでき、これは列が無効な値についてチェックされず、すべての行が保持されることを示します。"optimistic" の使用は、結果のデータフレームを矛盾した状態にする可能性があることに注意してください。つまり、VectorSizeHint
が適用された列のメタデータが、その列の内容と一致しないことを意味します。ユーザーは、このような矛盾した状態を避けるように注意する必要があります。
APIの詳細については、VectorSizeHint Python ドキュメントを参照してください。
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
(0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
sizeHint = VectorSizeHint(
inputCol="userFeatures",
handleInvalid="skip",
size=3)
datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)
APIの詳細については、VectorSizeHint Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}
import org.apache.spark.ml.linalg.Vectors
val dataset = spark.createDataFrame(
Seq(
(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0),
(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")
val sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3)
val datasetWithSize = sizeHint.transform(dataset)
println("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(false)
val assembler = new VectorAssembler()
.setInputCols(Array("hour", "mobile", "userFeatures"))
.setOutputCol("features")
// This dataframe can be used by downstream transformers as before
val output = assembler.transform(datasetWithSize)
println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(false)
APIの詳細については、VectorSizeHint Java ドキュメントを参照してください。
import java.util.Arrays;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("hour", IntegerType, false),
createStructField("mobile", DoubleType, false),
createStructField("userFeatures", new VectorUDT(), false),
createStructField("clicked", DoubleType, false)
});
Row row0 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0);
Row row1 = RowFactory.create(0, 18, 1.0, Vectors.dense(0.0, 10.0), 0.0);
Dataset<Row> dataset = spark.createDataFrame(Arrays.asList(row0, row1), schema);
VectorSizeHint sizeHint = new VectorSizeHint()
.setInputCol("userFeatures")
.setHandleInvalid("skip")
.setSize(3);
Dataset<Row> datasetWithSize = sizeHint.transform(dataset);
System.out.println("Rows where 'userFeatures' is not the right size are filtered out");
datasetWithSize.show(false);
VectorAssembler assembler = new VectorAssembler()
.setInputCols(new String[]{"hour", "mobile", "userFeatures"})
.setOutputCol("features");
// This dataframe can be used by downstream transformers as before
Dataset<Row> output = assembler.transform(datasetWithSize);
System.out.println("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column " +
"'features'");
output.select("features", "clicked").show(false);
QuantileDiscretizer
QuantileDiscretizer
は、連続的な特徴を持つ列を受け取り、ビン分割されたカテゴリ特徴を持つ列を出力します。ビンの数は numBuckets
パラメータで設定します。使用されるバケットの数がこの値よりも小さくなる可能性があり、例えば、十分な個別の分位数を作成するための入力の個別の値が少なすぎる場合などがあります。
NaN値:NaN値は、QuantileDiscretizer
のフィッティング中に列から削除されます。これにより、予測を行うための Bucketizer
モデルが生成されます。変換中、Bucketizer
はデータセット内でNaN値を見つけたときにエラーを発生させますが、ユーザーは handleInvalid
を設定することで、データセット内のNaN値を保持または削除することを選択することもできます。ユーザーがNaN値を保持することを選択した場合、それらは特別に処理され、独自のバケットに配置されます。例えば、4つのバケットが使用される場合、NaN以外のデータはバケット[0-3]に入れられますが、NaNは特別なバケット[4]に数えられます。
アルゴリズム:ビンの範囲は、近似アルゴリズムを使用して選択されます(詳細については、approxQuantile のドキュメントを参照してください)。近似の精度は、relativeError
パラメータで制御できます。ゼロに設定すると、正確な分位数が計算されます(注:正確な分位数の計算はコストのかかる操作です)。下限と上限のビンの境界は、すべての実数値をカバーする -Infinity
と +Infinity
になります。
例
列 id
、hour
を持つ DataFrame があると仮定します。
id | hour
----|------
0 | 18.0
----|------
1 | 19.0
----|------
2 | 8.0
----|------
3 | 5.0
----|------
4 | 2.2
hour
は Double
型の連続的な特徴です。この連続的な特徴をカテゴリカルな特徴に変換したいと考えています。numBuckets = 3
が与えられた場合、次の DataFrame が得られるはずです。
id | hour | result
----|------|------
0 | 18.0 | 2.0
----|------|------
1 | 19.0 | 2.0
----|------|------
2 | 8.0 | 1.0
----|------|------
3 | 5.0 | 1.0
----|------|------
4 | 2.2 | 0.0
APIの詳細については、QuantileDiscretizer Python ドキュメントを参照してください。
from pyspark.ml.feature import QuantileDiscretizer
data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
result = discretizer.fit(df).transform(df)
result.show()
APIの詳細については、QuantileDiscretizer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show(false)
APIの詳細については、QuantileDiscretizer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.QuantileDiscretizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(0, 18.0),
RowFactory.create(1, 19.0),
RowFactory.create(2, 8.0),
RowFactory.create(3, 5.0),
RowFactory.create(4, 2.2)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("hour", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
QuantileDiscretizer discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3);
Dataset<Row> result = discretizer.fit(df).transform(df);
result.show(false);
Imputer
Imputer
推定器は、欠損値が位置する列の平均、中央値、または最頻値を使用して、データセット内の欠損値を補完します。入力列は数値型である必要があります。現在、Imputer
はカテゴリカルな特徴をサポートしておらず、カテゴリカルな特徴を含む列に対して誤った値を生成する可能性があります。Imputer は、.setMissingValue(custom_value)
によって 'NaN' 以外のカスタム値を補完できます。たとえば、.setMissingValue(0)
は(0)のすべての出現を補完します。
注意 入力列のすべての null
値は欠損値として扱われ、同様に補完されます。
例
列 a
および b
を持つ DataFrame があると仮定します。
a | b
------------|-----------
1.0 | Double.NaN
2.0 | Double.NaN
Double.NaN | 3.0
4.0 | 4.0
5.0 | 5.0
この例では、Imputerは、Double.NaN
(欠損値のデフォルト)のすべての出現を、対応する列の他の値から計算された平均(デフォルトの補完戦略)に置き換えます。この例では、列 a
および b
の代替値はそれぞれ 3.0 と 4.0 です。変換後、出力列の欠損値は、関連する列の代替値に置き換えられます。
a | b | out_a | out_b
------------|------------|-------|-------
1.0 | Double.NaN | 1.0 | 4.0
2.0 | Double.NaN | 2.0 | 4.0
Double.NaN | 3.0 | 3.0 | 3.0
4.0 | 4.0 | 4.0 | 4.0
5.0 | 5.0 | 5.0 | 5.0
APIの詳細については、Imputer Python ドキュメントを参照してください。
from pyspark.ml.feature import Imputer
df = spark.createDataFrame([
(1.0, float("nan")),
(2.0, float("nan")),
(float("nan"), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ["a", "b"])
imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)
model.transform(df).show()
APIの詳細については、Imputer Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.Imputer
val df = spark.createDataFrame(Seq(
(1.0, Double.NaN),
(2.0, Double.NaN),
(Double.NaN, 3.0),
(4.0, 4.0),
(5.0, 5.0)
)).toDF("a", "b")
val imputer = new Imputer()
.setInputCols(Array("a", "b"))
.setOutputCols(Array("out_a", "out_b"))
val model = imputer.fit(df)
model.transform(df).show()
APIの詳細については、Imputer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1.0, Double.NaN),
RowFactory.create(2.0, Double.NaN),
RowFactory.create(Double.NaN, 3.0),
RowFactory.create(4.0, 4.0),
RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
createStructField("a", DoubleType, false),
createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);
Imputer imputer = new Imputer()
.setInputCols(new String[]{"a", "b"})
.setOutputCols(new String[]{"out_a", "out_b"});
ImputerModel model = imputer.fit(df);
model.transform(df).show();
特徴量セレクター
VectorSlicer
VectorSlicer
は、特徴ベクトルを受け取り、元の特徴のサブ配列を持つ新しい特徴ベクトルを出力するトランスフォーマーです。ベクトル列から特徴を抽出するのに役立ちます。
VectorSlicer
は、指定されたインデックスを持つベクトル列を受け入れ、それらのインデックスを介して値が選択される新しいベクトル列を出力します。インデックスには2つのタイプがあります。
-
ベクトルへのインデックスを表す整数インデックス、
setIndices()
。 -
ベクトルの特徴の名前を表す文字列インデックス、
setNames()
。これは、実装がAttribute
のnameフィールドで一致するため、ベクトル列がAttributeGroup
を持つことを必要とします。
整数と文字列による指定はどちらも受け入れ可能です。さらに、整数のインデックスと文字列の名前を同時に使用できます。少なくとも1つの特徴を選択する必要があります。重複する特徴は許可されないため、選択されたインデックスと名前の間に重複があってはなりません。特徴の名前が選択された場合、空の入力属性が発生すると例外がスローされることに注意してください。
出力ベクトルは、最初に選択されたインデックス(指定された順序)で特徴を並べ、その後に選択された名前(指定された順序)を続けます。
例
列 userFeatures
を持つ DataFrame があると仮定します。
userFeatures
------------------
[0.0, 10.0, 0.5]
userFeatures
は、3つのユーザー特徴を含むベクトル列です。userFeatures
の最初の列はすべてゼロであると仮定し、それを削除して最後の2つの列のみを選択したいと考えています。VectorSlicer
は、setIndices(1, 2)
を使用して最後の2つの要素を選択し、features
という名前の新しいベクトル列を生成します。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
また、userFeatures
の潜在的な入力属性、つまり ["f1", "f2", "f3"]
があるとすると、setNames("f2", "f3")
を使用してそれらを選択できます。
userFeatures | features
------------------|-----------------------------
[0.0, 10.0, 0.5] | [10.0, 0.5]
["f1", "f2", "f3"] | ["f2", "f3"]
APIの詳細については、VectorSlicer Python ドキュメントを参照してください。
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row
df = spark.createDataFrame([
Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])
slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])
output = slicer.transform(df)
output.select("userFeatures", "features").show()
APIの詳細については、VectorSlicer Scala ドキュメントを参照してください。
import java.util.Arrays
import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType
val data = Arrays.asList(
Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
Row(Vectors.dense(-2.0, 2.3, 0.0))
)
val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))
val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")
slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))
val output = slicer.transform(dataset)
output.show(false)
APIの詳細については、VectorSlicer Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.attribute.Attribute;
import org.apache.spark.ml.attribute.AttributeGroup;
import org.apache.spark.ml.attribute.NumericAttribute;
import org.apache.spark.ml.feature.VectorSlicer;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
Attribute[] attrs = {
NumericAttribute.defaultAttr().withName("f1"),
NumericAttribute.defaultAttr().withName("f2"),
NumericAttribute.defaultAttr().withName("f3")
};
AttributeGroup group = new AttributeGroup("userFeatures", attrs);
List<Row> data = Arrays.asList(
RowFactory.create(Vectors.sparse(3, new int[]{0, 1}, new double[]{-2.0, 2.3})),
RowFactory.create(Vectors.dense(-2.0, 2.3, 0.0))
);
Dataset<Row> dataset =
spark.createDataFrame(data, (new StructType()).add(group.toStructField()));
VectorSlicer vectorSlicer = new VectorSlicer()
.setInputCol("userFeatures").setOutputCol("features");
vectorSlicer.setIndices(new int[]{1}).setNames(new String[]{"f3"});
// or slicer.setIndices(new int[]{1, 2}), or slicer.setNames(new String[]{"f2", "f3"})
Dataset<Row> output = vectorSlicer.transform(dataset);
output.show(false);
RFormula
RFormula
は、Rモデルの式で指定された列を選択します。現在、'~'、'.'、':'、'+'、'-'を含むR演算子の限られたサブセットをサポートしています。基本的な演算子は次のとおりです。
~
ターゲットと項を分離します+
項を連結します。「+ 0」は切片の削除を意味します-
項を削除します。「- 1」は切片の削除を意味します:
相互作用(数値の場合は乗算、またはバイナライズされたカテゴリ値).
ターゲットを除くすべての列
a
と b
が double 列であると仮定し、RFormula
の効果を説明するために、次の簡単な例を使用します。
y ~ a + b
は、y ~ w0 + w1 * a + w2 * b
というモデルを意味します。ここで、w0
は切片であり、w1, w2
は係数です。y ~ a + b + a:b - 1
は、y ~ w1 * a + w2 * b + w3 * a * b
というモデルを意味します。ここで、w1, w2, w3
は係数です。
RFormula
は、特徴のベクトル列と、double または string のラベル列を生成します。線形回帰のためにRで式が使用される場合と同様に、数値列はdoubleにキャストされます。文字列の入力列については、最初に StringIndexer によって stringOrderType
によって決定された順序を使用して変換され、順序付け後の最後のカテゴリは削除され、doubleはワンホットエンコードされます。
値 {'b', 'a', 'b', 'a', 'c', 'b'}
を含む文字列特徴列があると仮定し、エンコードを制御するために stringOrderType
を設定します。
stringOrderType | Category mapped to 0 by StringIndexer | Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b') | least frequent category ('c')
'frequencyAsc' | least frequent category ('c') | most frequent category ('b')
'alphabetDesc' | last alphabetical category ('c') | first alphabetical category ('a')
'alphabetAsc' | first alphabetical category ('a') | last alphabetical category ('c')
ラベル列が文字列型の場合、StringIndexer によって frequencyDesc
の順序を使用して最初に double に変換されます。ラベル列が DataFrame に存在しない場合、出力ラベル列は式で指定された応答変数から作成されます。
注:順序付けオプション stringOrderType
はラベル列には使用されません。ラベル列がインデックス化されると、StringIndexer
のデフォルトの降順頻度順序を使用します。
例
列 id
、country
、hour
、clicked
を持つ DataFrame があると仮定します。
id | country | hour | clicked
---|---------|------|---------
7 | "US" | 18 | 1.0
8 | "CA" | 12 | 0.0
9 | "NZ" | 15 | 0.0
もし、clicked ~ country + hour
という数式文字列を持つ RFormula
を使用する場合、これは country
と hour
に基づいて clicked
を予測したいということを示しています。変換後、次の DataFrame が得られるはずです。
id | country | hour | clicked | features | label
---|---------|------|---------|------------------|-------
7 | "US" | 18 | 1.0 | [0.0, 0.0, 18.0] | 1.0
8 | "CA" | 12 | 0.0 | [0.0, 1.0, 12.0] | 0.0
9 | "NZ" | 15 | 0.0 | [1.0, 0.0, 15.0] | 0.0
API の詳細については、RFormula Python ドキュメントを参照してください。
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
API の詳細については、RFormula Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.RFormula
val dataset = spark.createDataFrame(Seq(
(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
API の詳細については、RFormula Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.RFormula;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;
StructType schema = createStructType(new StructField[]{
createStructField("id", IntegerType, false),
createStructField("country", StringType, false),
createStructField("hour", IntegerType, false),
createStructField("clicked", DoubleType, false)
});
List<Row> data = Arrays.asList(
RowFactory.create(7, "US", 18, 1.0),
RowFactory.create(8, "CA", 12, 0.0),
RowFactory.create(9, "NZ", 15, 0.0)
);
Dataset<Row> dataset = spark.createDataFrame(data, schema);
RFormula formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label");
Dataset<Row> output = formula.fit(dataset).transform(dataset);
output.select("features", "label").show();
ChiSqSelector
ChiSqSelector
は、カイ二乗特徴選択を表します。これは、カテゴリ特徴を持つラベル付きデータに対して動作します。ChiSqSelector
は、どの特徴を選択するかを決定するために、独立性のカイ二乗検定を使用します。これは、次の 5 つの選択方法をサポートしています: numTopFeatures
、percentile
、fpr
、fdr
、fwe
numTopFeatures
は、カイ二乗検定に基づいて、上位の特徴を固定数選択します。これは、最も予測能力の高い特徴を生成することに似ています。percentile
はnumTopFeatures
に似ていますが、固定数ではなく、すべての特徴の割合を選択します。fpr
は、p 値が閾値以下のすべての特徴を選択し、選択の偽陽性率を制御します。fdr
は、Benjamini-Hochberg 手順を使用して、偽発見率が閾値以下のすべての特徴を選択します。fwe
は、p 値が閾値以下のすべての特徴を選択します。閾値は 1/numFeatures でスケーリングされ、選択のファミリーワイズエラー率を制御します。デフォルトでは、選択方法はnumTopFeatures
で、上位の特徴のデフォルト数は 50 に設定されています。ユーザーは、setSelectorType
を使用して選択方法を選択できます。
例
予測対象として使用される id
、features
、および clicked
の列を持つ DataFrame があると仮定します。
id | features | clicked
---|-----------------------|---------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0
8 | [0.0, 1.0, 12.0, 0.0] | 0.0
9 | [1.0, 0.0, 15.0, 0.1] | 0.0
numTopFeatures = 1
で ChiSqSelector
を使用する場合、ラベル clicked
に基づいて、features
の最後の列が最も有用な特徴として選択されます。
id | features | clicked | selectedFeatures
---|-----------------------|---------|------------------
7 | [0.0, 0.0, 18.0, 1.0] | 1.0 | [1.0]
8 | [0.0, 1.0, 12.0, 0.0] | 0.0 | [0.0]
9 | [1.0, 0.0, 15.0, 0.1] | 0.0 | [0.1]
API の詳細については、ChiSqSelector Python ドキュメントを参照してください。
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
(8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
(9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
outputCol="selectedFeatures", labelCol="clicked")
result = selector.fit(df).transform(df)
print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()
API の詳細については、ChiSqSelector Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)
val df = spark.createDataset(data).toDF("id", "features", "clicked")
val selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()
API の詳細については、ChiSqSelector Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.ChiSqSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
List<Row> data = Arrays.asList(
RowFactory.create(7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
RowFactory.create(8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
RowFactory.create(9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("clicked", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
ChiSqSelector selector = new ChiSqSelector()
.setNumTopFeatures(1)
.setFeaturesCol("features")
.setLabelCol("clicked")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("ChiSqSelector output with top " + selector.getNumTopFeatures()
+ " features selected");
result.show();
UnivariateFeatureSelector
UnivariateFeatureSelector
は、カテゴリ/連続特徴を持つカテゴリ/連続ラベルに対して動作します。ユーザーは featureType
および labelType
を設定でき、Spark は指定された featureType
および labelType
に基づいて使用するスコア関数を選択します。
featureType | labelType |score function
------------|------------|--------------
categorical |categorical | chi-squared (chi2)
continuous |categorical | ANOVATest (f_classif)
continuous |continuous | F-value (f_regression)
これは、次の 5 つの選択モードをサポートしています: numTopFeatures
、percentile
、fpr
、fdr
、fwe
numTopFeatures
は、上位の特徴を固定数選択します。percentile
はnumTopFeatures
に似ていますが、固定数ではなく、すべての特徴の割合を選択します。fpr
は、p 値が閾値以下のすべての特徴を選択し、選択の偽陽性率を制御します。fdr
は、Benjamini-Hochberg 手順を使用して、偽発見率が閾値以下のすべての特徴を選択します。fwe
は、p 値が閾値以下のすべての特徴を選択します。閾値は 1/numFeatures でスケーリングされ、選択のファミリーワイズエラー率を制御します。
デフォルトでは、選択モードは numTopFeatures
で、デフォルトの selectionThreshold は 50 に設定されています。
例
予測対象として使用される id
、features
、および label
の列を持つ DataFrame があると仮定します。
id | features | label
---|--------------------------------|---------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0
featureType
を continuous
に、labelType
を categorical
に設定し、numTopFeatures = 1
とした場合、features
の最後の列が最も有用な特徴として選択されます。
id | features | label | selectedFeatures
---|--------------------------------|---------|------------------
1 | [1.7, 4.4, 7.6, 5.8, 9.6, 2.3] | 3.0 | [2.3]
2 | [8.8, 7.3, 5.7, 7.3, 2.2, 4.1] | 2.0 | [4.1]
3 | [1.2, 9.5, 2.5, 3.1, 8.7, 2.5] | 3.0 | [2.5]
4 | [3.7, 9.2, 6.1, 4.1, 7.5, 3.8] | 2.0 | [3.8]
5 | [8.9, 5.2, 7.8, 8.3, 5.2, 3.0] | 4.0 | [3.0]
6 | [7.9, 8.5, 9.2, 4.0, 9.4, 2.1] | 4.0 | [2.1]
API の詳細については、UnivariateFeatureSelector Python ドキュメントを参照してください。
from pyspark.ml.feature import UnivariateFeatureSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([1.7, 4.4, 7.6, 5.8, 9.6, 2.3]), 3.0,),
(2, Vectors.dense([8.8, 7.3, 5.7, 7.3, 2.2, 4.1]), 2.0,),
(3, Vectors.dense([1.2, 9.5, 2.5, 3.1, 8.7, 2.5]), 3.0,),
(4, Vectors.dense([3.7, 9.2, 6.1, 4.1, 7.5, 3.8]), 2.0,),
(5, Vectors.dense([8.9, 5.2, 7.8, 8.3, 5.2, 3.0]), 4.0,),
(6, Vectors.dense([7.9, 8.5, 9.2, 4.0, 9.4, 2.1]), 4.0,)], ["id", "features", "label"])
selector = UnivariateFeatureSelector(featuresCol="features", outputCol="selectedFeatures",
labelCol="label", selectionMode="numTopFeatures")
selector.setFeatureType("continuous").setLabelType("categorical").setSelectionThreshold(1)
result = selector.fit(df).transform(df)
print("UnivariateFeatureSelector output with top %d features selected using f_classif"
% selector.getSelectionThreshold())
result.show()
API の詳細については、UnivariateFeatureSelector Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.UnivariateFeatureSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
)
val df = spark.createDataset(data).toDF("id", "features", "label")
val selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"UnivariateFeatureSelector output with top ${selector.getSelectionThreshold}" +
s" features selected using f_classif")
result.show()
API の詳細については、UnivariateFeatureSelector Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.UnivariateFeatureSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(1.7, 4.4, 7.6, 5.8, 9.6, 2.3), 3.0),
RowFactory.create(2, Vectors.dense(8.8, 7.3, 5.7, 7.3, 2.2, 4.1), 2.0),
RowFactory.create(3, Vectors.dense(1.2, 9.5, 2.5, 3.1, 8.7, 2.5), 3.0),
RowFactory.create(4, Vectors.dense(3.7, 9.2, 6.1, 4.1, 7.5, 3.8), 2.0),
RowFactory.create(5, Vectors.dense(8.9, 5.2, 7.8, 8.3, 5.2, 3.0), 4.0),
RowFactory.create(6, Vectors.dense(7.9, 8.5, 9.2, 4.0, 9.4, 2.1), 4.0)
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty()),
new StructField("label", DataTypes.DoubleType, false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
UnivariateFeatureSelector selector = new UnivariateFeatureSelector()
.setFeatureType("continuous")
.setLabelType("categorical")
.setSelectionMode("numTopFeatures")
.setSelectionThreshold(1)
.setFeaturesCol("features")
.setLabelCol("label")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("UnivariateFeatureSelector output with top "
+ selector.getSelectionThreshold() + " features selected using f_classif");
result.show();
VarianceThresholdSelector
VarianceThresholdSelector
は、分散の低い特徴を削除するセレクターです。varianceThreshold
より大きくない(サンプル)分散を持つ特徴は削除されます。設定されていない場合、varianceThreshold
はデフォルトで 0 に設定されます。これは、分散が 0 の特徴(つまり、すべてのサンプルで同じ値を持つ特徴)のみが削除されることを意味します。
例
予測対象として使用される id
および features
の列を持つ DataFrame があると仮定します。
id | features
---|--------------------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0]
6 つの特徴のサンプル分散は、それぞれ 16.67、0.67、8.17、10.17、5.07、11.47 です。varianceThreshold = 8.0
で VarianceThresholdSelector
を使用する場合、分散が 8.0 以下の特徴は削除されます。
id | features | selectedFeatures
---|--------------------------------|-------------------
1 | [6.0, 7.0, 0.0, 7.0, 6.0, 0.0] | [6.0,0.0,7.0,0.0]
2 | [0.0, 9.0, 6.0, 0.0, 5.0, 9.0] | [0.0,6.0,0.0,9.0]
3 | [0.0, 9.0, 3.0, 0.0, 5.0, 5.0] | [0.0,3.0,0.0,5.0]
4 | [0.0, 9.0, 8.0, 5.0, 6.0, 4.0] | [0.0,8.0,5.0,4.0]
5 | [8.0, 9.0, 6.0, 5.0, 4.0, 4.0] | [8.0,6.0,5.0,4.0]
6 | [8.0, 9.0, 6.0, 0.0, 0.0, 0.0] | [8.0,6.0,0.0,0.0]
API の詳細については、VarianceThresholdSelector Python ドキュメントを参照してください。
from pyspark.ml.feature import VarianceThresholdSelector
from pyspark.ml.linalg import Vectors
df = spark.createDataFrame([
(1, Vectors.dense([6.0, 7.0, 0.0, 7.0, 6.0, 0.0])),
(2, Vectors.dense([0.0, 9.0, 6.0, 0.0, 5.0, 9.0])),
(3, Vectors.dense([0.0, 9.0, 3.0, 0.0, 5.0, 5.0])),
(4, Vectors.dense([0.0, 9.0, 8.0, 5.0, 6.0, 4.0])),
(5, Vectors.dense([8.0, 9.0, 6.0, 5.0, 4.0, 4.0])),
(6, Vectors.dense([8.0, 9.0, 6.0, 0.0, 0.0, 0.0]))], ["id", "features"])
selector = VarianceThresholdSelector(varianceThreshold=8.0, outputCol="selectedFeatures")
result = selector.fit(df).transform(df)
print("Output: Features with variance lower than %f are removed." %
selector.getVarianceThreshold())
result.show()
API の詳細については、VarianceThresholdSelector Scala ドキュメントを参照してください。
import org.apache.spark.ml.feature.VarianceThresholdSelector
import org.apache.spark.ml.linalg.Vectors
val data = Seq(
(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
)
val df = spark.createDataset(data).toDF("id", "features")
val selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures")
val result = selector.fit(df).transform(df)
println(s"Output: Features with variance lower than" +
s" ${selector.getVarianceThreshold} are removed.")
result.show()
API の詳細については、VarianceThresholdSelector Java ドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.VarianceThresholdSelector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.*;
List<Row> data = Arrays.asList(
RowFactory.create(1, Vectors.dense(6.0, 7.0, 0.0, 7.0, 6.0, 0.0)),
RowFactory.create(2, Vectors.dense(0.0, 9.0, 6.0, 0.0, 5.0, 9.0)),
RowFactory.create(3, Vectors.dense(0.0, 9.0, 3.0, 0.0, 5.0, 5.0)),
RowFactory.create(4, Vectors.dense(0.0, 9.0, 8.0, 5.0, 6.0, 4.0)),
RowFactory.create(5, Vectors.dense(8.0, 9.0, 6.0, 5.0, 4.0, 4.0)),
RowFactory.create(6, Vectors.dense(8.0, 9.0, 6.0, 0.0, 0.0, 0.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> df = spark.createDataFrame(data, schema);
VarianceThresholdSelector selector = new VarianceThresholdSelector()
.setVarianceThreshold(8.0)
.setFeaturesCol("features")
.setOutputCol("selectedFeatures");
Dataset<Row> result = selector.fit(df).transform(df);
System.out.println("Output: Features with variance lower than "
+ selector.getVarianceThreshold() + " are removed.");
result.show();
Locality Sensitive Hashing
局所性鋭敏型ハッシュ法 (LSH) は、大規模なデータセットを使用したクラスタリング、近似最近傍探索、および外れ値検出で一般的に使用される、重要なハッシュ技術のクラスです。
LSH の一般的な考え方は、データポイントをバケットにハッシュするために、関数のファミリー(「LSH ファミリー」)を使用することです。これにより、互いに近いデータポイントは高い確率で同じバケットに入り、互いに離れたデータポイントは異なるバケットに入る可能性が非常に高くなります。LSH ファミリーは、次のように正式に定義されます。
メトリック空間 (M, d)
において、M
は集合であり、d
は M
上の距離関数である場合、LSH ファミリーとは、次のプロパティを満たす関数のファミリー h
です: \[ \forall p, q \in M,\\ d(p,q) \leq r1 \Rightarrow Pr(h(p)=h(q)) \geq p1\\ d(p,q) \geq r2 \Rightarrow Pr(h(p)=h(q)) \leq p2 \]
この LSH ファミリーは、(r1, r2, p1, p2)
-敏感型と呼ばれます。
Spark では、異なる LSH ファミリーは個別のクラス(例: MinHash
)で実装され、各クラスで特徴変換、近似類似性結合、および近似最近傍探索のための API が提供されています。
LSH では、同じバケットにハッシュされる遠い入力特徴のペア($d(p,q) \geq r2$
)を偽陽性と定義し、異なるバケットにハッシュされる近い特徴のペア($d(p,q) \leq r1$
)を偽陰性と定義します。
LSH 操作
LSH が使用できる主な操作の種類について説明します。適合した LSH モデルには、これらの各操作のためのメソッドがあります。
特徴量変換
特徴変換は、ハッシュされた値を新しい列として追加する基本的な機能です。これは、次元削減に役立つ可能性があります。ユーザーは、inputCol
および outputCol
を設定して、入力および出力列名を指定できます。
LSH は、複数の LSH ハッシュテーブルもサポートしています。ユーザーは、numHashTables
を設定して、ハッシュテーブルの数を指定できます。これは、近似類似性結合と近似最近傍探索における OR 増幅にも使用されます。ハッシュテーブルの数を増やすと精度は向上しますが、通信コストと実行時間も増加します。
outputCol
の型は Seq[Vector]
であり、配列の次元は numHashTables
と等しく、ベクトルの次元は現在 1 に設定されています。将来のリリースでは、ユーザーがこれらのベクトルの次元を指定できるように、AND 増幅を実装する予定です。
近似類似結合
近似類似性結合は、2 つのデータセットを受け取り、データセット内の距離がユーザー定義の閾値よりも小さい行のペアを近似的に返します。近似類似性結合は、2 つの異なるデータセットの結合と自己結合の両方をサポートしています。自己結合は、重複するペアを生成します。
近似類似性結合は、変換されたデータセットと変換されていないデータセットの両方を入力として受け入れます。変換されていないデータセットが使用される場合、自動的に変換されます。この場合、ハッシュ署名は outputCol
として作成されます。
結合されたデータセットでは、元のデータセットはdatasetA
とdatasetB
でクエリできます。出力データセットには、返された各行のペア間の真の距離を示す距離列が追加されます。
近似最近傍探索
近似最近傍探索は、(特徴ベクトルの)データセットとキー(単一の特徴ベクトル)を受け取り、そのキーに最も近いデータセット内の指定された数の行を近似的に返します。
近似最近傍探索は、変換されたデータセットと未変換のデータセットの両方を入力として受け入れます。未変換のデータセットが使用された場合、自動的に変換されます。この場合、ハッシュ署名はoutputCol
として作成されます。
出力データセットには、各出力行と検索されたキーとの間の真の距離を示す距離列が追加されます。
注意: 近似最近傍探索では、ハッシュバケット内に十分な候補がない場合、k
行よりも少ない行が返されます。
LSH アルゴリズム
ユークリッド距離に対するバケット化ランダム射影
バケット化ランダム射影は、ユークリッド距離のためのLSHファミリです。ユークリッド距離は次のように定義されます: \[ d(\mathbf{x}, \mathbf{y}) = \sqrt{\sum_i (x_i - y_i)^2} \]
そのLSHファミリは、特徴ベクトル$\mathbf{x}$
をランダムな単位ベクトル$\mathbf{v}$
に射影し、射影された結果をハッシュバケットに分割します: \[ h(\mathbf{x}) = \Big\lfloor \frac{\mathbf{x} \cdot \mathbf{v}}{r} \Big\rfloor \]
ここで、r
はユーザー定義のバケット長です。バケット長は、ハッシュバケットの平均サイズ(つまり、バケット数)を制御するために使用できます。バケット長が大きいほど(つまり、バケットが少ないほど)、特徴が同じバケットにハッシュされる確率が高くなります(真陽性および偽陽性の数が増加します)。
バケット化ランダム射影は、任意ベクトルを入力特徴として受け入れ、スパースベクトルと密ベクトルの両方をサポートします。
APIの詳細については、BucketedRandomProjectionLSH Pythonドキュメントを参照してください。
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.dense([1.0, 1.0]),),
(1, Vectors.dense([1.0, -1.0]),),
(2, Vectors.dense([-1.0, -1.0]),),
(3, Vectors.dense([-1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(4, Vectors.dense([1.0, 0.0]),),
(5, Vectors.dense([-1.0, 0.0]),),
(6, Vectors.dense([0.0, 1.0]),),
(7, Vectors.dense([0.0, -1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.dense([1.0, 0.0])
brp = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,
numHashTables=3)
model = brp.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
print("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, distCol="EuclideanDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
APIの詳細については、BucketedRandomProjectionLSH Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.dense(1.0, 1.0)),
(1, Vectors.dense(1.0, -1.0)),
(2, Vectors.dense(-1.0, -1.0)),
(3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(4, Vectors.dense(1.0, 0.0)),
(5, Vectors.dense(-1.0, 0.0)),
(6, Vectors.dense(0.0, 1.0)),
(7, Vectors.dense(0.0, -1.0))
)).toDF("id", "features")
val key = Vectors.dense(1.0, 0.0)
val brp = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes")
val model = brp.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
println("Approximately joining dfA and dfB on Euclidean distance smaller than 1.5:")
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
APIの詳細については、BucketedRandomProjectionLSH Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH;
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.dense(1.0, 1.0)),
RowFactory.create(1, Vectors.dense(1.0, -1.0)),
RowFactory.create(2, Vectors.dense(-1.0, -1.0)),
RowFactory.create(3, Vectors.dense(-1.0, 1.0))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(4, Vectors.dense(1.0, 0.0)),
RowFactory.create(5, Vectors.dense(-1.0, 0.0)),
RowFactory.create(6, Vectors.dense(0.0, 1.0)),
RowFactory.create(7, Vectors.dense(0.0, -1.0))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
Vector key = Vectors.dense(1.0, 0.0);
BucketedRandomProjectionLSH mh = new BucketedRandomProjectionLSH()
.setBucketLength(2.0)
.setNumHashTables(3)
.setInputCol("features")
.setOutputCol("hashes");
BucketedRandomProjectionLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 1.5)`
System.out.println("Approximately joining dfA and dfB on distance smaller than 1.5:");
model.approxSimilarityJoin(dfA, dfB, 1.5, "EuclideanDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("EuclideanDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();
ジャカード距離に対する MinHash
MinHashは、入力特徴が自然数の集合である場合のジャカード距離のLSHファミリです。2つの集合のジャカード距離は、それらの交差と結合のカーディナリティによって定義されます: \[ d(\mathbf{A}, \mathbf{B}) = 1 - \frac{|\mathbf{A} \cap \mathbf{B}|}{|\mathbf{A} \cup \mathbf{B}|} \]
MinHashは、ランダムなハッシュ関数g
を集合内の各要素に適用し、すべてのハッシュ値の最小値を取ります: \[ h(\mathbf{A}) = \min_{a \in \mathbf{A}}(g(a)) \]
MinHashの入力集合は、バイナリベクトルとして表現されます。ここで、ベクトルのインデックスは要素自体を表し、ベクトルの非ゼロ値は、その要素が集合内に存在することを表します。密ベクトルとスパースベクトルの両方がサポートされていますが、通常は効率のためにスパースベクトルが推奨されます。たとえば、Vectors.sparse(10, Array[(2, 1.0), (3, 1.0), (5, 1.0)])
は、空間内に10個の要素があることを意味します。このセットには、要素2、要素3、要素5が含まれています。すべての非ゼロ値は、バイナリの「1」値として扱われます。
注意: 空のセットはMinHashによって変換できません。つまり、入力ベクトルには少なくとも1つの非ゼロエントリが必要です。
APIの詳細については、MinHashLSH Pythonドキュメントを参照してください。
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
dataA = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
(1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
(2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),)]
dfA = spark.createDataFrame(dataA, ["id", "features"])
dataB = [(3, Vectors.sparse(6, [1, 3, 5], [1.0, 1.0, 1.0]),),
(4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
(5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
dfB = spark.createDataFrame(dataB, ["id", "features"])
key = Vectors.sparse(6, [1, 3], [1.0, 1.0])
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5)
model = mh.fit(dfA)
# Feature Transformation
print("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate
# similarity join.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
print("Approximately joining dfA and dfB on distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, distCol="JaccardDistance")\
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
# Compute the locality sensitive hashes for the input rows, then perform approximate nearest
# neighbor search.
# We could avoid computing hashes by passing in the already-transformed dataset, e.g.
# `model.approxNearestNeighbors(transformedA, key, 2)`
# It may return less than 2 rows when not enough approximate near-neighbor candidates are
# found.
print("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
APIの詳細については、MinHashLSH Scalaドキュメントを参照してください。
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
val dfA = spark.createDataFrame(Seq(
(0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
(1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
(2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val dfB = spark.createDataFrame(Seq(
(3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
(4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
(5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "features")
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
val mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes")
val model = mh.fit(dfA)
// Feature Transformation
println("The hashed dataset where hashed values are stored in the column 'hashes':")
model.transform(dfA).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:")
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show()
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
println("Approximately searching dfA for 2 nearest neighbors of the key:")
model.approxNearestNeighbors(dfA, key, 2).show()
APIの詳細については、MinHashLSH Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.feature.MinHashLSH;
import org.apache.spark.ml.feature.MinHashLSHModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
List<Row> dataA = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{0, 1, 2}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{0, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
List<Row> dataB = Arrays.asList(
RowFactory.create(0, Vectors.sparse(6, new int[]{1, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(1, Vectors.sparse(6, new int[]{2, 3, 5}, new double[]{1.0, 1.0, 1.0})),
RowFactory.create(2, Vectors.sparse(6, new int[]{1, 2, 4}, new double[]{1.0, 1.0, 1.0}))
);
StructType schema = new StructType(new StructField[]{
new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("features", new VectorUDT(), false, Metadata.empty())
});
Dataset<Row> dfA = spark.createDataFrame(dataA, schema);
Dataset<Row> dfB = spark.createDataFrame(dataB, schema);
int[] indices = {1, 3};
double[] values = {1.0, 1.0};
Vector key = Vectors.sparse(6, indices, values);
MinHashLSH mh = new MinHashLSH()
.setNumHashTables(5)
.setInputCol("features")
.setOutputCol("hashes");
MinHashLSHModel model = mh.fit(dfA);
// Feature Transformation
System.out.println("The hashed dataset where hashed values are stored in the column 'hashes':");
model.transform(dfA).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate
// similarity join.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxSimilarityJoin(transformedA, transformedB, 0.6)`
System.out.println("Approximately joining dfA and dfB on Jaccard distance smaller than 0.6:");
model.approxSimilarityJoin(dfA, dfB, 0.6, "JaccardDistance")
.select(col("datasetA.id").alias("idA"),
col("datasetB.id").alias("idB"),
col("JaccardDistance")).show();
// Compute the locality sensitive hashes for the input rows, then perform approximate nearest
// neighbor search.
// We could avoid computing hashes by passing in the already-transformed dataset, e.g.
// `model.approxNearestNeighbors(transformedA, key, 2)`
// It may return less than 2 rows when not enough approximate near-neighbor candidates are
// found.
System.out.println("Approximately searching dfA for 2 nearest neighbors of the key:");
model.approxNearestNeighbors(dfA, key, 2).show();