協調フィルタリング - RDDベースAPI

協調フィルタリング

協調フィルタリングは、レコメンデーションシステムで一般的に使用されます。これらの手法は、ユーザーとアイテムの関連付け行列の欠損エントリを埋めることを目的としています。spark.mllibは現在、モデルベースの協調フィルタリングをサポートしており、ユーザーと製品は少数の潜在因子によって記述され、それらを使用して欠損エントリを予測できます。spark.mllibは、これらの潜在因子を学習するために交互最小二乗法 (ALS)アルゴリズムを使用します。spark.mllibの実装には、次のパラメータがあります。

明示的フィードバック vs 暗黙的フィードバック

行列分解ベースの協調フィルタリングの標準的なアプローチでは、ユーザーとアイテムの行列のエントリを、ユーザーがアイテムに対して与えた明示的な嗜好として扱います。たとえば、ユーザーが映画に評価を与える場合などです。

多くの実世界のユースケースでは、暗黙的フィードバック(例:閲覧、クリック、購入、いいね、共有など)にしかアクセスできないことがよくあります。spark.mllibでそのようなデータを処理するために使用されるアプローチは、「Collaborative Filtering for Implicit Feedback Datasets」から取られています。本質的に、このアプローチは、評価行列を直接モデル化しようとするのではなく、ユーザーアクションの観測における強さを表す数値(クリック数や、映画の視聴に費やした累積時間など)としてデータを扱います。これらの数値は、アイテムに与えられた明示的な評価ではなく、観測されたユーザーの嗜好の信頼度レベルに関連付けられます。その後、モデルは、ユーザーがアイテムに対して持つであろう期待される嗜好を予測するために使用できる潜在因子を見つけようとします。

正則化パラメータのスケーリング

v1.1以降、最小二乗問題の解決において、ユーザー因子を更新する際のユーザーが生成した評価の数、または製品因子を更新する際の製品が受け取った評価の数で、正則化パラメータlambdaをスケーリングします。このアプローチは「ALS-WR」と呼ばれ、「Large-Scale Parallel Collaborative Filtering for the Netflix Prize」という論文で議論されています。これにより、lambdaはデータセットのスケールに依存しにくくなるため、サンプリングされたサブセットから学習した最適なパラメータをフルデータセットに適用しても、同様のパフォーマンスが期待できます。

次の例では、評価データをロードします。各行は、ユーザー、製品、および評価で構成されます。デフォルトのALS.train()メソッドを使用しており、これは評価が明示的であることを前提としています。評価予測の平均二乗誤差を測定することによって、レコメンデーションを評価します。

APIの詳細については、ALS Pythonドキュメントを参照してください。

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/mllib/recommendation_example.py」にあります。

評価行列が他の情報源から派生している場合(つまり、他のシグナルから推測されている場合)、trainImplicitメソッドを使用してより良い結果を得ることができます。

# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)

次の例では、評価データをロードします。各行は、ユーザー、製品、および評価で構成されます。デフォルトのALS.train()メソッドを使用しており、これは評価が明示的であることを前提としています。評価予測の平均二乗誤差を測定することによって、レコメンデーションモデルを評価します。

APIの詳細については、ALS Scalaドキュメントを参照してください。

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println(s"Mean Squared Error = $MSE")

// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/mllib/RecommendationExample.scala」にあります。

評価行列が別の情報源から派生している場合(つまり、他のシグナルから推測されている場合)、trainImplicitメソッドを使用してより良い結果を得ることができます。

val alpha = 0.01
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

MLlibのすべてのメソッドはJavaフレンドリーな型を使用しているため、Scalaでインポートして呼び出すのと同じ方法でJavaでインポートして呼び出すことができます。唯一の注意点は、メソッドがScala RDDオブジェクトを受け取るのに対し、Spark Java APIは別のJavaRDDクラスを使用することです。Java RDDをScala RDDに変換するには、JavaRDDオブジェクトで.rdd()を呼び出します。Scalaで提供される例と同等のスタンドアロンアプリケーションの例を以下に示します。

APIの詳細については、ALS Javaドキュメントを参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Load and parse the data
String path = "data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Rating> ratings = data.map(s -> {
  String[] sarray = s.split(",");
  return new Rating(Integer.parseInt(sarray[0]),
    Integer.parseInt(sarray[1]),
    Double.parseDouble(sarray[2]));
});

// Build the recommendation model using ALS
int rank = 10;
int numIterations = 10;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts =
  ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
  model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
      .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
);
JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
    ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
  .join(predictions).values();
double MSE = ratesAndPreds.mapToDouble(pair -> {
  double err = pair._1() - pair._2();
  return err * err;
}).mean();
System.out.println("Mean Squared Error = " + MSE);

// Save and load model
model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
  "target/tmp/myCollaborativeFilter");
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java」にあります。

上記のアプリケーションを実行するには、Sparkクイックスタートガイドの「スタンドアロンアプリケーション」セクションに記載されている手順に従ってください。ビルドファイルに依存関係としてspark-mllibを含めることも忘れないでください。

チュートリアル

Spark Summit 2014のトレーニング演習には、spark.mllibを使用したパーソナライズされた映画レコメンデーションのハンズオンチュートリアルが含まれています。