頻出パターンマイニング - RDDベースのAPI
頻出アイテム、アイテムセット、サブシーケンス、またはその他の部分構造のマイニングは大規模データセットを分析する最初のステップの1つであり、長年にわたりデータマイニングの活発な研究トピックとなっています。詳細については、Wikipediaの相関ルール学習を参照してください。 spark.mllib
は、頻出アイテムセットをマイニングするための一般的なアルゴリズムであるFP-growthの並列実装を提供します。
FP-growth
FP-growthアルゴリズムは、Han et al., Mining frequent patterns without candidate generationの論文で説明されています。「FP」は頻出パターンを表します。トランザクションのデータセットが与えられた場合、FP-growthの最初のステップは、アイテム頻度を計算し、頻出アイテムを特定することです。同じ目的のために設計されたAprioriのようなアルゴリズムとは異なり、FP-growthの2番目のステップでは、接尾辞ツリー(FP-tree)構造を使用して、通常は生成にコストがかかる候補セットを明示的に生成することなく、トランザクションをエンコードします。2番目のステップの後、頻出アイテムセットをFP-treeから抽出できます。 spark.mllib
では、Li et al., PFP: Parallel FP-growth for query recommendationで説明されているPFPと呼ばれるFP-growthの並列バージョンを実装しました。PFPは、トランザクションの接尾辞に基づいてFP-treeを成長させる作業を分散するため、単一マシンの実装よりもスケーラブルです。詳細については、論文を参照してください。
spark.mllib
のFP-growth実装は、次の(ハイパー)パラメータを取ります
minSupport
: アイテムセットが頻出として識別されるための最小サポート。たとえば、アイテムが5つのトランザクションのうち3つに現れる場合、そのサポートは3/5=0.6です。numPartitions
: 作業を分散するために使用されるパーティションの数。
例
FPGrowth
はFP-growthアルゴリズムを実装しています。各トランザクションがジェネリック型のアイテムのList
であるトランザクションのRDD
を取ります。FPGrowth.train
をトランザクションで呼び出すと、頻出アイテムセットとその頻度を格納するFPGrowthModel
が返されます。
APIの詳細については、FPGrowth
Pythonドキュメントを参照してください。
from pyspark.mllib.fpm import FPGrowth
data = sc.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
print(fi)
FPGrowth
はFP-growthアルゴリズムを実装しています。各トランザクションがジェネリック型のアイテムのArray
であるトランザクションのRDD
を取ります。FPGrowth.run
をトランザクションで呼び出すと、頻出アイテムセットとその頻度を格納するFPGrowthModel
が返されます。次の例は、transactions
から頻出アイテムセットと相関ルール(詳細は相関ルールを参照)をマイニングする方法を示しています。
APIの詳細については、FPGrowth
Scalaドキュメントを参照してください。
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val data = sc.textFile("data/mllib/sample_fpgrowth.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
val fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10)
val model = fpg.run(transactions)
model.freqItemsets.collect().foreach { itemset =>
println(s"${itemset.items.mkString("[", ",", "]")},${itemset.freq}")
}
val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
println(s"${rule.antecedent.mkString("[", ",", "]")}=> " +
s"${rule.consequent .mkString("[", ",", "]")},${rule.confidence}")
}
FPGrowth
はFP-growthアルゴリズムを実装しています。各トランザクションがジェネリック型のアイテムのIterable
であるトランザクションのJavaRDD
を取ります。FPGrowth.run
をトランザクションで呼び出すと、頻出アイテムセットとその頻度を格納するFPGrowthModel
が返されます。次の例は、transactions
から頻出アイテムセットと相関ルール(詳細は相関ルールを参照)をマイニングする方法を示しています。
APIの詳細については、FPGrowth
Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;
JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");
JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));
FPGrowth fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);
for (FPGrowth.FreqItemset<String> itemset: model.freqItemsets().toJavaRDD().collect()) {
System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}
double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
: model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
相関ルール
AssociationRulesは、単一アイテムを結論とするルールの構築のための並列ルール生成アルゴリズムを実装しています。
APIの詳細については、AssociationRules
Scalaドキュメントを参照してください。
import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
val freqItemsets = sc.parallelize(Seq(
new FreqItemset(Array("a"), 15L),
new FreqItemset(Array("b"), 35L),
new FreqItemset(Array("a", "b"), 12L)
))
val ar = new AssociationRules()
.setMinConfidence(0.8)
val results = ar.run(freqItemsets)
results.collect().foreach { rule =>
println(s"[${rule.antecedent.mkString(",")}=>${rule.consequent.mkString(",")} ]" +
s" ${rule.confidence}")
}
AssociationRulesは、単一アイテムを結論とするルールの構築のための並列ルール生成アルゴリズムを実装しています。
APIの詳細については、AssociationRules
Javaドキュメントを参照してください。
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset;
JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
new FreqItemset<>(new String[] {"a"}, 15L),
new FreqItemset<>(new String[] {"b"}, 35L),
new FreqItemset<>(new String[] {"a", "b"}, 12L)
));
AssociationRules arules = new AssociationRules()
.setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);
for (AssociationRules.Rule<String> rule : results.collect()) {
System.out.println(
rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
PrefixSpan
PrefixSpanは、Pei et al., Mining Sequential Patterns by Pattern-Growth: The PrefixSpan Approachで説明されているシーケンシャルパターンマイニングアルゴリズムです。シーケンシャルパターンマイニング問題の公式化については、参照論文を参照してください。
spark.mllib
のPrefixSpan実装は、次のパラメータを取ります
minSupport
: 頻出シーケンシャルパターンと見なされるために必要な最小サポート。maxPatternLength
: 頻出シーケンシャルパターンの最大長。この長さを超える頻出パターンは結果に含まれません。maxLocalProjDBSize
: 接頭辞射影データベースのローカル反復処理が開始される前に、接頭辞射影データベースに許可される最大アイテム数。このパラメータは、エグゼキュータのサイズに合わせて調整する必要があります。
例
次の例は、シーケンス上で実行されているPrefixSpanを示しています(Pei et alと同じ表記法を使用)。
<(12)3>
<1(32)(12)>
<(12)5>
<6>
PrefixSpan
はPrefixSpanアルゴリズムを実装しています。PrefixSpan.run
を呼び出すと、頻出シーケンスとその頻度を格納するPrefixSpanModel
が返されます。
APIの詳細については、PrefixSpan
ScalaドキュメントとPrefixSpanModel
Scalaドキュメントを参照してください。
import org.apache.spark.mllib.fpm.PrefixSpan
val sequences = sc.parallelize(Seq(
Array(Array(1, 2), Array(3)),
Array(Array(1), Array(3, 2), Array(1, 2)),
Array(Array(1, 2), Array(5)),
Array(Array(6))
), 2).cache()
val prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5)
val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { freqSequence =>
println(
s"${freqSequence.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]")}," +
s" ${freqSequence.freq}")
}
PrefixSpan
はPrefixSpanアルゴリズムを実装しています。PrefixSpan.run
を呼び出すと、頻出シーケンスとその頻度を格納するPrefixSpanModel
が返されます。
APIの詳細については、PrefixSpan
JavaドキュメントとPrefixSpanModel
Javaドキュメントを参照してください。
import java.util.Arrays;
import java.util.List;
import org.apache.spark.mllib.fpm.PrefixSpan;
import org.apache.spark.mllib.fpm.PrefixSpanModel;
JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
Arrays.asList(Arrays.asList(6))
), 2);
PrefixSpan prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5);
PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
for (PrefixSpan.FreqSequence<Integer> freqSeq: model.freqSequences().toJavaRDD().collect()) {
System.out.println(freqSeq.javaSequence() + ", " + freqSeq.freq());
}