頻出パターンマイニング - RDDベースAPI

頻出アイテム、アイテムセット、サブシーケンス、またはその他の部分構造のマイニングは、通常、大規模データセットの分析の最初のステップの1つであり、長年にわたってデータマイニングにおける活発な研究トピックとなっています。詳細については、Wikipediaの相関ルール学習を参照してください。spark.mllibは、頻出アイテムセットのマイニングで一般的なアルゴリズムであるFP-growthの並列実装を提供します。

FP-growth

FP-growthアルゴリズムは、論文「Han et al., Mining frequent patterns without candidate generation」で説明されており、「FP」は頻出パターンを表します。トランザクションのデータセットが与えられた場合、FP-growthの最初のステップはアイテムの頻度を計算し、頻出アイテムを特定することです。同じ目的のために設計されたAprioriライクなアルゴリズムとは異なり、FP-growthの2番目のステップは、候補セットを明示的に生成することなくトランザクションをエンコードするためにサフィックスツリー(FP-tree)構造を使用します。候補セットの生成は通常コストがかかります。2番目のステップの後、FP-treeから頻出アイテムセットを抽出できます。spark.mllibでは、Li et al., PFP: Parallel FP-growth for query recommendationで説明されているPFPというFP-growthの並列バージョンを実装しました。PFPは、トランザクションのサフィックスに基づいてFP-treeの成長作業を分散するため、単一マシン実装よりもスケーラブルです。詳細については、各論文を参照してください。

spark.mllibのFP-growth実装は、次の(ハイパー)パラメータを取ります。

FPGrowthはFP-growthアルゴリズムを実装しています。これは、各トランザクションがジェネリック型のアイテムのListであるトランザクションのRDDを取ります。トランザクションでFPGrowth.trainを呼び出すと、頻出アイテムセットとその頻度を格納するFPGrowthModelが返されます。

APIの詳細については、FPGrowth Pythonドキュメントを参照してください。

from pyspark.mllib.fpm import FPGrowth

data = sc.textFile("data/mllib/sample_fpgrowth.txt")
transactions = data.map(lambda line: line.strip().split(' '))
model = FPGrowth.train(transactions, minSupport=0.2, numPartitions=10)
result = model.freqItemsets().collect()
for fi in result:
    print(fi)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/mllib/fpgrowth_example.py」にあります。

FPGrowthはFP-growthアルゴリズムを実装しています。これは、各トランザクションがジェネリック型のアイテムのArrayであるトランザクションのRDDを取ります。トランザクションでFPGrowth.runを呼び出すと、頻出アイテムセットとその頻度を格納するFPGrowthModelが返されます。次の例は、トランザクションから頻出アイテムセットと相関ルール(詳細については相関ルールを参照)をマイニングする方法を示しています。

APIの詳細については、FPGrowth Scalaドキュメントを参照してください。

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val data = sc.textFile("data/mllib/sample_fpgrowth.txt")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

val fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(s"${itemset.items.mkString("[", ",", "]")},${itemset.freq}")
}

val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(s"${rule.antecedent.mkString("[", ",", "]")}=> " +
    s"${rule.consequent .mkString("[", ",", "]")},${rule.confidence}")
}
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/SimpleFPGrowth.scala」にあります。

FPGrowthはFP-growthアルゴリズムを実装しています。これは、各トランザクションがジェネリック型のアイテムのIterableであるトランザクションのJavaRDDを取ります。トランザクションでFPGrowth.runを呼び出すと、頻出アイテムセットとその頻度を格納するFPGrowthModelが返されます。次の例は、トランザクションから頻出アイテムセットと相関ルール(詳細については相関ルールを参照)をマイニングする方法を示しています。

APIの詳細については、FPGrowth Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

JavaRDD<String> data = sc.textFile("data/mllib/sample_fpgrowth.txt");

JavaRDD<List<String>> transactions = data.map(line -> Arrays.asList(line.split(" ")));

FPGrowth fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10);
FPGrowthModel<String> model = fpg.run(transactions);

for (FPGrowth.FreqItemset<String> itemset: model.freqItemsets().toJavaRDD().collect()) {
  System.out.println("[" + itemset.javaItems() + "], " + itemset.freq());
}

double minConfidence = 0.8;
for (AssociationRules.Rule<String> rule
  : model.generateAssociationRules(minConfidence).toJavaRDD().collect()) {
  System.out.println(
    rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaSimpleFPGrowth.java」にあります。

相関ルール

AssociationRulesは、後件が単一のアイテムであるルールを構築するための並列ルール生成アルゴリズムを実装しています。

APIの詳細については、AssociationRules Scalaドキュメントを参照してください。

import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset

val freqItemsets = sc.parallelize(Seq(
  new FreqItemset(Array("a"), 15L),
  new FreqItemset(Array("b"), 35L),
  new FreqItemset(Array("a", "b"), 12L)
))

val ar = new AssociationRules()
  .setMinConfidence(0.8)
val results = ar.run(freqItemsets)

results.collect().foreach { rule =>
println(s"[${rule.antecedent.mkString(",")}=>${rule.consequent.mkString(",")} ]" +
    s" ${rule.confidence}")
}
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/AssociationRulesExample.scala」にあります。

AssociationRulesは、後件が単一のアイテムであるルールを構築するための並列ルール生成アルゴリズムを実装しています。

APIの詳細については、AssociationRules Javaドキュメントを参照してください。

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset;

JavaRDD<FPGrowth.FreqItemset<String>> freqItemsets = sc.parallelize(Arrays.asList(
  new FreqItemset<>(new String[] {"a"}, 15L),
  new FreqItemset<>(new String[] {"b"}, 35L),
  new FreqItemset<>(new String[] {"a", "b"}, 12L)
));

AssociationRules arules = new AssociationRules()
  .setMinConfidence(0.8);
JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);

for (AssociationRules.Rule<String> rule : results.collect()) {
  System.out.println(
    rule.javaAntecedent() + " => " + rule.javaConsequent() + ", " + rule.confidence());
}
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaAssociationRulesExample.java」にあります。

PrefixSpan

PrefixSpanは、論文「Pei et al., Mining Sequential Patterns by Pattern-Growth: The PrefixSpan Approach」で説明されているシーケンシャルパターンマイニングアルゴリズムです。シーケンシャルパターンマイニング問題の形式化については、参照論文を参照してください。

spark.mllibのPrefixSpan実装は、次のパラメータを取ります。

次の例は、シーケンス(Pei et al.と同じ表記法を使用)でPrefixSpanを実行する方法を示しています。

  <(12)3>
  <1(32)(12)>
  <(12)5>
  <6>

PrefixSpanはPrefixSpanアルゴリズムを実装しています。PrefixSpan.runを呼び出すと、頻出シーケンスとその頻度を格納するPrefixSpanModelが返されます。

APIの詳細については、PrefixSpan ScalaドキュメントおよびPrefixSpanModel Scalaドキュメントを参照してください。

import org.apache.spark.mllib.fpm.PrefixSpan

val sequences = sc.parallelize(Seq(
  Array(Array(1, 2), Array(3)),
  Array(Array(1), Array(3, 2), Array(1, 2)),
  Array(Array(1, 2), Array(5)),
  Array(Array(6))
), 2).cache()
val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)
val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { freqSequence =>
  println(
    s"${freqSequence.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]")}," +
      s" ${freqSequence.freq}")
}
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/PrefixSpanExample.scala」にあります。

PrefixSpanはPrefixSpanアルゴリズムを実装しています。PrefixSpan.runを呼び出すと、頻出シーケンスとその頻度を格納するPrefixSpanModelが返されます。

APIの詳細については、PrefixSpan JavaドキュメントおよびPrefixSpanModel Javaドキュメントを参照してください。

import java.util.Arrays;
import java.util.List;

import org.apache.spark.mllib.fpm.PrefixSpan;
import org.apache.spark.mllib.fpm.PrefixSpanModel;

JavaRDD<List<List<Integer>>> sequences = sc.parallelize(Arrays.asList(
  Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3)),
  Arrays.asList(Arrays.asList(1), Arrays.asList(3, 2), Arrays.asList(1, 2)),
  Arrays.asList(Arrays.asList(1, 2), Arrays.asList(5)),
  Arrays.asList(Arrays.asList(6))
), 2);
PrefixSpan prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5);
PrefixSpanModel<Integer> model = prefixSpan.run(sequences);
for (PrefixSpan.FreqSequence<Integer> freqSeq: model.freqSequences().toJavaRDD().collect()) {
  System.out.println(freqSeq.javaSequence() + ", " + freqSeq.freq());
}
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaPrefixSpanExample.java」にあります。