協調フィルタリング
協調フィルタリング
協調フィルタリングは、レコメンダーシステムで一般的に使用されます。これらの技術は、ユーザーとアイテムの関連付け行列の欠落しているエントリを埋めることを目的としています。spark.ml は現在、モデルベースの協調フィルタリングをサポートしており、ユーザーと製品は、欠落しているエントリを予測するために使用できる少数の潜在因子によって記述されます。spark.ml は、これらの潜在因子を学習するために 交互最小二乗法 (ALS) アルゴリズムを使用します。spark.ml の実装には、次のパラメータがあります。
- numBlocks は、計算を並列化するためにユーザーとアイテムを分割するブロックの数です (デフォルトは 10)。
- rank は、モデルにおける潜在因子の数です (デフォルトは 10)。
- maxIter は、実行する最大反復回数です (デフォルトは 10)。
- regParam は、ALS における正則化パラメータを指定します (デフォルトは 1.0)。
- implicitPrefs は、明示的フィードバック ALS バリアントを使用するか、暗黙的フィードバック データ用に調整されたものを使用するかを指定します (デフォルトは
falseで、明示的フィードバック を使用することを意味します)。 - alpha は、ALS の暗黙的フィードバックバリアントに適用されるパラメータであり、観測された嗜好のベースライン信頼度を制御します (デフォルトは 1.0)。
- nonnegative は、最小二乗法に非負制約を使用するかどうかを指定します (デフォルトは
false)。
注意: ALS の DataFrame ベース API は、現在、ユーザーおよびアイテム ID に整数のみをサポートしています。ユーザーおよびアイテム ID 列には他の数値型もサポートされていますが、ID は整数値の範囲内である必要があります。
明示的フィードバックと暗黙的フィードバック
協調フィルタリングに基づく行列分解の標準的なアプローチでは、ユーザー-アイテム行列のエントリを、ユーザーがアイテムに対して与えた明示的な嗜好として扱います。たとえば、ユーザーが映画に評価を与える場合などです。
多くの実世界のユースケースでは、暗黙的フィードバック (例: 表示、クリック、購入、いいね、共有など) にしかアクセスできないことがよくあります。spark.ml でそのようなデータを処理するために使用されるアプローチは、「Collaborative Filtering for Implicit Feedback Datasets」から取られています。本質的に、評価行列を直接モデル化しようとするのではなく、このアプローチでは、データをユーザーアクションの観測における強さを表す数値 (クリック数や、誰かが映画を視聴した累積時間など) として扱います。これらの数値は、アイテムに与えられた明示的な評価ではなく、観測されたユーザーの嗜好における信頼のレベルに関連付けられます。次に、モデルは、ユーザーがアイテムに対して期待する嗜好を予測するために使用できる潜在因子を見つけようとします。
正則化パラメータのスケーリング
ユーザー因子を更新する際のユーザーが生成した評価の数、または製品因子を更新する際の製品が受け取った評価の数で、各最小二乗問題の解を割ることにより、正則化パラメータ regParam をスケーリングします。このアプローチは「ALS-WR」と名付けられ、「Large-Scale Parallel Collaborative Filtering for the Netflix Prize」という論文で議論されています。これにより、regParam はデータセットのスケールへの依存度が低くなり、サンプリングされたサブセットから学習した最適なパラメータをフルデータセットに適用して同等のパフォーマンスを期待できます。
コールドスタート戦略
ALSModel を使用して予測を行う場合、テストデータセットに、モデルのトレーニング中に存在しなかったユーザーやアイテムが存在することがよくあります。これは通常、2 つのシナリオで発生します。
- 本番環境では、評価履歴がなく、モデルがトレーニングされていない新しいユーザーまたはアイテムの場合 (これが「コールドスタート問題」です)。
- クロスバリデーション中、データはトレーニングセットと評価セットに分割されます。Spark の
CrossValidatorまたはTrainValidationSplitのような単純なランダム分割を使用すると、実際には、評価セットにトレーニングセットに存在しないユーザーやアイテムが存在することがよくあります。
デフォルトでは、Spark は、モデルにユーザーまたはアイテムの因子が存在しない場合に、ALSModel.transform 中に NaN 予測を割り当てます。これは本番システムでは役立つ場合があります。新しいユーザーまたはアイテムであることを示し、システムが予測として使用するフォールバックを決定できるためです。
しかし、これはクロスバリデーション中には望ましくありません。予測値に NaN が含まれていると、評価指標の結果 (たとえば、RegressionEvaluator を使用する場合) が NaN になり、モデル選択が不可能になります。
Spark では、ユーザーは coldStartStrategy パラメータを「drop」に設定して、NaN 値を含む予測の DataFrame からそのような行を削除できます。評価指標は、非 NaN データに対して計算され、有効になります。このパラメータの使用方法は、以下の例で示されています。
注意: 現在サポートされているコールドスタート戦略は、「nan」(上記のデフォルトの動作)と「drop」です。将来的に他の戦略がサポートされる可能性があります。
例
次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、およびタイムスタンプで構成されます。次に、デフォルトで評価が明示的である (implicitPrefs が False) と想定する ALS モデルをトレーニングします。評価は、評価予測の二乗平均平方根誤差を測定することによって行います。
API の詳細については、ALS Python ドキュメントを参照してください。
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)評価行列が他の情報源から派生している場合 (つまり、他のシグナルから推測される場合) は、implicitPrefs を True に設定すると、より良い結果が得られます。
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
userCol="userId", itemCol="movieId", ratingCol="rating")次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、およびタイムスタンプで構成されます。次に、デフォルトで評価が明示的である (implicitPrefs が false) と想定する ALS モデルをトレーニングします。評価は、評価予測の二乗平均平方根誤差を測定することによって行います。
API の詳細については、ALS Scala ドキュメントを参照してください。
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)評価行列が他の情報源から派生している場合 (つまり、他のシグナルから推測される場合) は、implicitPrefs を true に設定すると、より良い結果が得られます。
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、およびタイムスタンプで構成されます。次に、デフォルトで評価が明示的である (implicitPrefs が false) と想定する ALS モデルをトレーニングします。評価は、評価予測の二乗平均平方根誤差を測定することによって行います。
API の詳細については、ALS Java ドキュメントを参照してください。
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
public static class Rating implements Serializable {
private int userId;
private int movieId;
private float rating;
private long timestamp;
public Rating() {}
public Rating(int userId, int movieId, float rating, long timestamp) {
this.userId = userId;
this.movieId = movieId;
this.rating = rating;
this.timestamp = timestamp;
}
public int getUserId() {
return userId;
}
public int getMovieId() {
return movieId;
}
public float getRating() {
return rating;
}
public long getTimestamp() {
return timestamp;
}
public static Rating parseRating(String str) {
String[] fields = str.split("::");
if (fields.length != 4) {
throw new IllegalArgumentException("Each line must contain 4 fields");
}
int userId = Integer.parseInt(fields[0]);
int movieId = Integer.parseInt(fields[1]);
float rating = Float.parseFloat(fields[2]);
long timestamp = Long.parseLong(fields[3]);
return new Rating(userId, movieId, rating, timestamp);
}
}
JavaRDD<Rating> ratingsRDD = spark
.read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
.map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];
// Build the recommendation model using ALS on the training data
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");
ALSModel model = als.fit(training);
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);
RegressionEvaluator evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);
// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);
// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);評価行列が他の情報源から派生している場合 (つまり、他のシグナルから推測される場合) は、implicitPrefs を true に設定すると、より良い結果が得られます。
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");詳細については、R API ドキュメントを参照してください。
# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df
# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
itemCol = "movieId", ratingCol = "rating")
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)