協調フィルタリング - RDDベースAPI

協調フィルタリング

協調フィルタリング は、レコメンデーションシステムで一般的に使用されます。これらの手法は、ユーザーとアイテムの関連付け行列の欠損値を埋めることを目的としています。spark.mllib は現在、ユーザーと製品を、欠損値の予測に使用できる少数の潜在因子で記述する、モデルベースの協調フィルタリングをサポートしています。spark.mllib は、これらの潜在因子を学習するために交互最小二乗法 (ALS) アルゴリズムを使用しています。spark.mllib の実装には、次のパラメータがあります。

明示的フィードバック対暗黙的フィードバック

行列分解ベースの協調フィルタリングの標準的なアプローチでは、ユーザーとアイテムの行列のエントリを、ユーザーがアイテムに与えた明示的な選好(たとえば、ユーザーが映画に与えた評価)として扱います。

多くの現実世界のユースケースでは、暗黙的フィードバック(例:閲覧、クリック、購入、いいね、共有など)のみにアクセスできることが一般的です。このようなデータに対処するためにspark.mllib で使用されるアプローチは、Collaborative Filtering for Implicit Feedback Datasets から取られています。基本的に、評価行列を直接モデル化しようとするのではなく、このアプローチは、ユーザーアクションの観測における強度(クリック数や、ユーザーが映画の視聴に費やした累積時間など)を表す数値としてデータを扱います。これらの数値は、アイテムに与えられた明示的な評価ではなく、観測されたユーザー選好の信頼レベルに関連付けられます。その後、モデルは、ユーザーのアイテムに対する予想される選好を予測するために使用できる潜在因子を見つけようとします。

正則化パラメータのスケーリング

v1.1 以降、ユーザー因子の更新におけるユーザーが生成した評価の数、または製品因子の更新における製品が受け取った評価の数によって、各最小二乗問題を解く際の正則化パラメータlambda をスケールしています。このアプローチは「ALS-WR」と呼ばれ、「Large-Scale Parallel Collaborative Filtering for the Netflix Prize」で説明されています。lambda をデータセットの規模に依存しにくくするため、サンプリングされたサブセットから学習した最適なパラメータを完全なデータセットに適用し、同様のパフォーマンスを期待できます。

次の例では、評価データを読み込みます。各行は、ユーザー、製品、評価で構成されています。評価は明示的であると仮定するデフォルトの ALS.train() メソッドを使用します。評価予測の平均二乗誤差を測定することにより、レコメンデーションを評価します。

API の詳細については、ALS Python ドキュメント を参照してください。

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# Load and parse the data
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(','))\
    .map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/mllib/recommendation_example.py" にあります。

評価行列が他の情報源から導出されている場合(つまり、他のシグナルから推論されている場合)、trainImplicit メソッドを使用してより良い結果を得ることができます。

# Build the recommendation model using Alternating Least Squares based on implicit ratings
model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)

次の例では、評価データを読み込みます。各行は、ユーザー、製品、評価で構成されています。評価は明示的であると仮定するデフォルトのALS.train()メソッドを使用します。評価予測の平均二乗誤差を測定することにより、レコメンデーションモデルを評価します。

API の詳細については、ALS Scala ドキュメント を参照してください。

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println(s"Mean Squared Error = $MSE")

// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/mllib/RecommendationExample.scala" にあります。

評価行列が他の情報源から導出されている場合(つまり、他のシグナルから推論されている場合)、trainImplicit メソッドを使用してより良い結果を得ることができます。

val alpha = 0.01
val lambda = 0.01
val model = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

MLlib のすべてのメソッドは Java フレンドリーな型を使用しているため、Scala と同じように Java でインポートして呼び出すことができます。唯一の違いは、メソッドが Scala RDD オブジェクトを受け取るのに対し、Spark Java API は別のJavaRDDクラスを使用することです。JavaRDDオブジェクトで.rdd()を呼び出すことで、Java RDD を Scala RDD に変換できます。Scala の例と同等の、自己完結型のアプリケーション例を以下に示します。

API の詳細については、ALS Java ドキュメント を参照してください。

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf().setAppName("Java Collaborative Filtering Example");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Load and parse the data
String path = "data/mllib/als/test.data";
JavaRDD<String> data = jsc.textFile(path);
JavaRDD<Rating> ratings = data.map(s -> {
  String[] sarray = s.split(",");
  return new Rating(Integer.parseInt(sarray[0]),
    Integer.parseInt(sarray[1]),
    Double.parseDouble(sarray[2]));
});

// Build the recommendation model using ALS
int rank = 10;
int numIterations = 10;
MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), rank, numIterations, 0.01);

// Evaluate the model on rating data
JavaRDD<Tuple2<Object, Object>> userProducts =
  ratings.map(r -> new Tuple2<>(r.user(), r.product()));
JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(
  model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD()
      .map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating()))
);
JavaRDD<Tuple2<Double, Double>> ratesAndPreds = JavaPairRDD.fromJavaRDD(
    ratings.map(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())))
  .join(predictions).values();
double MSE = ratesAndPreds.mapToDouble(pair -> {
  double err = pair._1() - pair._2();
  return err * err;
}).mean();
System.out.println("Mean Squared Error = " + MSE);

// Save and load model
model.save(jsc.sc(), "target/tmp/myCollaborativeFilter");
MatrixFactorizationModel sameModel = MatrixFactorizationModel.load(jsc.sc(),
  "target/tmp/myCollaborativeFilter");
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/mllib/JavaRecommendationExample.java" にあります。

上記のアプリケーションを実行するには、Spark クイックスタートガイドの自己完結型アプリケーションセクションに記載されている手順に従ってください。また、依存関係としてビルドファイルにspark-mllibを含めるようにしてください。

チュートリアル

トレーニング演習(Spark Summit 2014)には、spark.mllibを使用したパーソナライズされた映画レコメンデーションに関する実践的なチュートリアルが含まれています。