特徴抽出と変換 - RDDベースAPI
TF-IDF
注意: DataFrameベースのAPIの使用を推奨します。詳細はMLユーザーガイドのTF-IDFを参照してください。
TF-IDF(Term Frequency-Inverse Document Frequency)は、コーパス内の文書に対する単語の重要度を反映するために、テキストマイニングで広く使用されている特徴ベクトル化手法です。単語を$t$、文書を$d$、コーパスを$D$とします。単語頻度$TF(t, d)$は、単語$t$が文書$d$に出現する回数であり、文書頻度$DF(t, D)$は、単語$t$を含む文書の数です。重要度を測るために単語頻度のみを使用すると、「a」、「the」、「of」のように、頻繁に出現するが文書に関する情報がほとんどない単語を過度に強調してしまう可能性があります。単語がコーパス全体で頻繁に出現する場合、それは特定の文書に関する特別な情報を持っていないことを意味します。逆文書頻度は、単語がどれだけの情報を提供するかを示す数値尺度です。\[ IDF(t, D) = \log \frac{|D| + 1}{DF(t, D) + 1}, \]ここで$|D|$はコーパス内の文書の総数です。対数が使用されているため、単語がすべての文書に出現する場合、そのIDF値は0になります。コーパス外の単語のゼロ除算を避けるために平滑化項が適用されていることに注意してください。TF-IDF尺度は、TFとIDFの積にすぎません。\[ TFIDF(t, d, D) = TF(t, d) \cdot IDF(t, D). \]単語頻度と文書頻度の定義にはいくつかのバリエーションがあります。spark.mllibでは、TFとIDFを分離して柔軟性を持たせています。
単語頻度の実装では、ハッシュトリックを利用しています。生のフィーチャーは、ハッシュ関数を適用することによってインデックス(単語)にマッピングされます。その後、マッピングされたインデックスに基づいて単語頻度が計算されます。このアプローチは、大規模なコーパスでは高価になる可能性のあるグローバルな単語-インデックスマップの計算を回避しますが、ハッシュ衝突の可能性があり、異なる生のフィーチャーがハッシュ後に同じ単語になる可能性があります。衝突の可能性を減らすために、ターゲットフィーチャーの次元、つまりハッシュテーブルのバケット数を増やすことができます。デフォルトのフィーチャー次元は$2^{20} = 1,048,576$です。
注意: spark.mllibはテキストセグメンテーションツールを提供していません。Stanford NLP Groupおよびscalanlp/chalkを参照してください。
TFとIDFは、HashingTFとIDFで実装されています。HashingTFは、リストのRDDを入力として受け取ります。各レコードは、文字列またはその他の型のイテラブルである場合があります。
HashingTF Pythonドキュメントを参照して、APIの詳細を確認してください。
from pyspark.mllib.feature import HashingTF, IDF
# Load documents (one per line).
documents = sc.textFile("data/mllib/kmeans_data.txt").map(lambda line: line.split(" "))
hashingTF = HashingTF()
tf = hashingTF.transform(documents)
# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
# spark.mllib's IDF implementation provides an option for ignoring terms
# which occur in less than a minimum number of documents.
# In such cases, the IDF for these terms is set to 0.
# This feature can be used by passing the minDocFreq value to the IDF constructor.
idfIgnore = IDF(minDocFreq=2).fit(tf)
tfidfIgnore = idfIgnore.transform(tf)TFとIDFは、HashingTFとIDFで実装されています。HashingTFは、RDD[Iterable[_]]を入力として受け取ります。各レコードは、文字列またはその他の型のイテラブルである場合があります。
HashingTF Scalaドキュメントを参照して、APIの詳細を確認してください。
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
// Load documents (one per line).
val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt")
.map(_.split(" ").toSeq)
val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documents)
// While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
// First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)
// spark.mllib IDF implementation provides an option for ignoring terms which occur in less than
// a minimum number of documents. In such cases, the IDF for these terms is set to 0.
// This feature can be used by passing the minDocFreq value to the IDF constructor.
val idfIgnore = new IDF(minDocFreq = 2).fit(tf)
val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)Word2Vec
Word2Vecは、単語の分散ベクトル表現を計算します。分散表現の主な利点は、類似した単語がベクトル空間で近くに配置されることで、新しいパターンへの一般化が容易になり、モデル推定がより堅牢になります。分散ベクトル表現は、固有表現認識、曖昧性解消、解析、タグ付け、機械翻訳など、多くの自然言語処理アプリケーションで有用であることが示されています。
モデル
Word2Vecの実装では、スキップグラムモデルを使用しています。スキップグラムの学習目的は、同じ文脈内での予測に優れた単語ベクトル表現を学習することです。数学的には、学習単語のシーケンス$w_1, w_2, \dots, w_T$が与えられた場合、スキップグラムモデルの目的は、平均対数尤度を最大化することです。\[ \frac{1}{T} \sum_{t = 1}^{T}\sum_{j=-k}^{j=k} \log p(w_{t+j} | w_t) \]ここで、$k$は学習ウィンドウのサイズです。
スキップグラムモデルでは、各単語$w$は、単語としての$w$と文脈としての$w$のベクトル表現である2つのベクトル$u_w$と$v_w$に関連付けられています。単語$w_j$が与えられたときに単語$w_i$を正しく予測する確率は、ソフトマックスモデルによって決定されます。\[ p(w_i | w_j ) = \frac{\exp(u_{w_i}^{\top}v_{w_j})}{\sum_{l=1}^{V} \exp(u_l^{\top}v_{w_j})} \]ここで$V$は語彙サイズです。
ソフトマックスを用いたスキップグラムモデルは、$\log p(w_i | w_j)$の計算コストが$V$に比例するため高価ですが、$V$は数百万オーダーになる可能性があります。Word2Vecの学習を高速化するために、階層的ソフトマックスを使用し、$\log p(w_i | w_j)$の計算複雑度を$O(\log(V))$に削減しました。
例
以下の例では、テキストファイルをロードし、それをSeq[String]のRDDとして解析し、Word2Vecインスタンスを構築してから、入力データでWord2VecModelをフィットさせる方法を示しています。最後に、指定された単語のトップ40の同義語を表示します。例を実行するには、まずtext8データをダウンロードし、お好みのディレクトリに展開してください。ここでは、展開されたファイルがtext8であり、spark shellを実行するのと同じディレクトリにあると仮定します。
Word2Vec Pythonドキュメントを参照して、APIの詳細を確認してください。
from pyspark.mllib.feature import Word2Vec
inp = sc.textFile("data/mllib/sample_lda_data.txt").map(lambda row: row.split(" "))
word2vec = Word2Vec()
model = word2vec.fit(inp)
synonyms = model.findSynonyms('1', 5)
for word, cosine_distance in synonyms:
print("{}: {}".format(word, cosine_distance))Word2Vec Scalaドキュメントを参照して、APIの詳細を確認してください。
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel}
val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq)
val word2vec = new Word2Vec()
val model = word2vec.fit(input)
val synonyms = model.findSynonyms("1", 5)
for ((synonym, cosineSimilarity) <- synonyms) {
println(s"$synonym $cosineSimilarity")
}
// Save and load model
model.save(sc, "myModelPath")
val sameModel = Word2VecModel.load(sc, "myModelPath")StandardScaler
学習セットのサンプルに関する列の要約統計量を使用して、単位分散へのスケーリングや平均の除去を行うことで特徴を標準化します。これは非常に一般的な前処理ステップです。
例えば、サポートベクターマシン(SVM)のRBFカーネルや、L1およびL2正則化線形モデルは、通常、すべて特徴が単位分散および/またはゼロ平均を持つ場合にうまく機能します。
標準化は、最適化プロセス中の収束率を改善し、また、非常に大きな分散を持つ特徴がモデル学習中に過度に大きな影響力を持つことを防ぎます。
モデルのフィッティング
StandardScalerは、コンストラクタに以下のパラメータがあります。
withMeanデフォルトはFalseです。スケーリング前にデータを平均で中心化します。密な出力を構築するため、スパース入力に適用する際は注意が必要です。withStdデフォルトはTrueです。データを単位標準偏差にスケーリングします。
StandardScalerにはfitメソッドが用意されており、RDD[Vector]の入力を受け取り、要約統計量を学習し、その後、StandardScalerの設定に応じて、入力データセットを単位標準偏差および/またはゼロ平均の特徴に変換できるモデルを返します。
このモデルはVectorTransformerを実装しており、Vectorに標準化を適用して変換されたVectorを生成するか、RDD[Vector]に適用して変換されたRDD[Vector]を生成できます。
特徴の分散がゼロの場合、その特徴のVectorにはデフォルトで0.0の値が返されることに注意してください。
例
以下の例は、libsvm形式のデータセットをロードし、新しい特徴が単位標準偏差および/またはゼロ平均を持つように特徴を標準化する方法を示しています。
StandardScaler Pythonドキュメントを参照して、APIの詳細を確認してください。
from pyspark.mllib.feature import StandardScaler
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
label = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
scaler1 = StandardScaler().fit(features)
scaler2 = StandardScaler(withMean=True, withStd=True).fit(features)
# data1 will be unit variance.
data1 = label.zip(scaler1.transform(features))
# data2 will be unit variance and zero mean.
data2 = label.zip(scaler2.transform(features.map(lambda x: Vectors.dense(x.toArray()))))StandardScaler Scalaドキュメントを参照して、APIの詳細を確認してください。
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val scaler1 = new StandardScaler().fit(data.map(x => x.features))
val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features))
// scaler3 is an identical model to scaler2, and will produce identical transformations
val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean)
// data1 will be unit variance.
val data1 = data.map(x => (x.label, scaler1.transform(x.features)))
// data2 will be unit variance and zero mean.
val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))Normalizer
Normalizerは、個々のサンプルを単位$L^p$ノルムを持つようにスケーリングします。これは、テキスト分類やクラスタリングにおいて一般的な操作です。例えば、2つの$L^2$正規化されたTF-IDFベクトルの内積は、ベクトル間のコサイン類似度になります。
Normalizerは、コンストラクタに以下のパラメータがあります。
p$L^p$空間での正規化、$p = 2$(デフォルト)です。
NormalizerはVectorTransformerを実装しており、Vectorに正規化を適用して変換されたVectorを生成するか、RDD[Vector]に適用して変換されたRDD[Vector]を生成できます。
入力のノルムがゼロの場合、入力ベクトルが返されることに注意してください。
例
以下の例は、libsvm形式のデータセットをロードし、$L^2$ノルムおよび$L^\infty$ノルムで特徴を正規化する方法を示しています。
Normalizer Pythonドキュメントを参照して、APIの詳細を確認してください。
from pyspark.mllib.feature import Normalizer
from pyspark.mllib.util import MLUtils
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
labels = data.map(lambda x: x.label)
features = data.map(lambda x: x.features)
normalizer1 = Normalizer()
normalizer2 = Normalizer(p=float("inf"))
# Each sample in data1 will be normalized using $L^2$ norm.
data1 = labels.zip(normalizer1.transform(features))
# Each sample in data2 will be normalized using $L^\infty$ norm.
data2 = labels.zip(normalizer2.transform(features))Normalizer Scalaドキュメントを参照して、APIの詳細を確認してください。
import org.apache.spark.mllib.feature.Normalizer
import org.apache.spark.mllib.util.MLUtils
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val normalizer1 = new Normalizer()
val normalizer2 = new Normalizer(p = Double.PositiveInfinity)
// Each sample in data1 will be normalized using $L^2$ norm.
val data1 = data.map(x => (x.label, normalizer1.transform(x.features)))
// Each sample in data2 will be normalized using $L^\infty$ norm.
val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))ChiSqSelector
特徴選択は、モデル構築に使用する関連特徴を特定しようとします。特徴空間のサイズを削減することで、速度と統計学習の両方の動作を改善できます。
ChiSqSelectorは、カイ二乗特徴選択を実装しています。これは、カテゴリカル特徴を持つラベル付きデータで動作します。ChiSqSelectorは、選択する特徴を決定するためにカイ二乗独立性検定を使用します。5つの選択方法をサポートしています:numTopFeatures、percentile、fpr、fdr、fwe。
numTopFeaturesは、カイ二乗検定に従って固定数のトップ特徴を選択します。これは、最も予測力の高い特徴を生成することに似ています。percentileはnumTopFeaturesに似ていますが、固定数ではなく、すべての特徴の割合を選択します。fprは、p値がしきい値未満のすべて特徴を選択し、選択の偽陽性率を制御します。fdrは、Benjamini-Hochberg手順を使用して、偽発見率がしきい値未満のすべて特徴を選択します。fweは、p値がしきい値未満のすべて特徴を選択します。しきい値は1/numFeaturesでスケーリングされ、選択のファミリーワイズエラー率を制御します。
デフォルトでは、選択方法はnumTopFeaturesであり、デフォルトのトップ特徴数は50に設定されています。ユーザーはsetSelectorTypeを使用して選択方法を選択できます。
選択する特徴の数は、保持された検証セットを使用して調整できます。
モデルのフィッティング
fitメソッドは、カテゴリカル特徴を持つRDD[LabeledPoint]の入力を受け取り、要約統計量を学習し、入力データセットを削減された特徴空間に変換できるChiSqSelectorModelを返します。ChiSqSelectorModelは、Vectorに適用して削減されたVectorを生成するか、RDD[Vector]に適用して削減されたRDD[Vector]を生成できます。
ユーザーは、手動で選択された特徴インデックスの配列(昇順である必要があります)を提供することによって、ChiSqSelectorModelを構築することもできることに注意してください。
例
以下の例は、ChiSqSelectorの基本的な使用方法を示しています。使用されているデータセットには、特徴ごとに0から255まで変化するグレースケール値からなる特徴行列があります。
ChiSqSelector Scalaドキュメントを参照して、APIの詳細を確認してください。
import org.apache.spark.mllib.feature.ChiSqSelector
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
// Load some data in libsvm format
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Even though features are doubles, the ChiSqSelector treats each unique value as a category
val discretizedData = data.map { lp =>
LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor }))
}
// Create ChiSqSelector that will select top 50 of 692 features
val selector = new ChiSqSelector(50)
// Create ChiSqSelector model (selecting features)
val transformer = selector.fit(discretizedData)
// Filter the top 50 features from each feature vector
val filteredData = discretizedData.map { lp =>
LabeledPoint(lp.label, transformer.transform(lp.features))
}ChiSqSelector Javaドキュメントを参照して、APIの詳細を確認してください。
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ChiSqSelector;
import org.apache.spark.mllib.feature.ChiSqSelectorModel;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
JavaRDD<LabeledPoint> points = MLUtils.loadLibSVMFile(jsc.sc(),
"data/mllib/sample_libsvm_data.txt").toJavaRDD().cache();
// Discretize data in 16 equal bins since ChiSqSelector requires categorical features
// Although features are doubles, the ChiSqSelector treats each unique value as a category
JavaRDD<LabeledPoint> discretizedData = points.map(lp -> {
double[] discretizedFeatures = new double[lp.features().size()];
for (int i = 0; i < lp.features().size(); ++i) {
discretizedFeatures[i] = Math.floor(lp.features().apply(i) / 16);
}
return new LabeledPoint(lp.label(), Vectors.dense(discretizedFeatures));
});
// Create ChiSqSelector that will select top 50 of 692 features
ChiSqSelector selector = new ChiSqSelector(50);
// Create ChiSqSelector model (selecting features)
ChiSqSelectorModel transformer = selector.fit(discretizedData.rdd());
// Filter the top 50 features from each feature vector
JavaRDD<LabeledPoint> filteredData = discretizedData.map(lp ->
new LabeledPoint(lp.label(), transformer.transform(lp.features())));ElementwiseProduct
ElementwiseProductは、提供された「重み」ベクトルを使用して、要素ごとの積で各入力ベクトルを乗算します。つまり、データセットの各列をスカラー乗数でスケーリングします。これは、入力ベクトルvと変換ベクトルscalingVecの間のアダマール積を表し、結果ベクトルを生成します。
scalingVecを「w」と表すと、この変換は次のように記述できます。
\[ \begin{pmatrix} v_1 \\ \vdots \\ v_N \end{pmatrix} \circ \begin{pmatrix} w_1 \\ \vdots \\ w_N \end{pmatrix} = \begin{pmatrix} v_1 w_1 \\ \vdots \\ v_N w_N \end{pmatrix} \]
ElementwiseProductは、コンストラクタに以下のパラメータがあります。
scalingVec: 変換ベクトル。
ElementwiseProductはVectorTransformerを実装しており、Vectorに重み付けを適用して変換されたVectorを生成するか、RDD[Vector]に適用して変換されたRDD[Vector]を生成できます。
例
以下の例は、変換ベクトル値を使用してベクトルを変換する方法を示しています。
ElementwiseProduct Pythonドキュメントを参照して、APIの詳細を確認してください。
from pyspark.mllib.feature import ElementwiseProduct
from pyspark.mllib.linalg import Vectors
data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda x: [float(t) for t in x.split(" ")])
# Create weight vector.
transformingVector = Vectors.dense([0.0, 1.0, 2.0])
transformer = ElementwiseProduct(transformingVector)
# Batch transform
transformedData = transformer.transform(parsedData)
# Single-row transform
transformedData2 = transformer.transform(parsedData.first())ElementwiseProduct Scalaドキュメントを参照して、APIの詳細を確認してください。
import org.apache.spark.mllib.feature.ElementwiseProduct
import org.apache.spark.mllib.linalg.Vectors
// Create some vector data; also works for sparse vectors
val data = sc.parallelize(Seq(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)))
val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct(transformingVector)
// Batch transform and per-row transform give the same results:
val transformedData = transformer.transform(data)
val transformedData2 = data.map(x => transformer.transform(x))ElementwiseProduct Javaドキュメントを参照して、APIの詳細を確認してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.feature.ElementwiseProduct;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
// Create some vector data; also works for sparse vectors
JavaRDD<Vector> data = jsc.parallelize(Arrays.asList(
Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0)));
Vector transformingVector = Vectors.dense(0.0, 1.0, 2.0);
ElementwiseProduct transformer = new ElementwiseProduct(transformingVector);
// Batch transform and per-row transform give the same results:
JavaRDD<Vector> transformedData = transformer.transform(data);
JavaRDD<Vector> transformedData2 = data.map(transformer::transform);PCA
PCAを使用してベクトルを低次元空間に射影する特徴変換器です。詳細は次元削減を参照してください。