協調フィルタリング
協調フィルタリング
協調フィルタリングは、レコメンダーシステムによく使用されます。これらの技術は、ユーザーとアイテムの関連行列の欠損エントリを埋めることを目的としています。spark.ml
は現在、モデルベースの協調フィルタリングをサポートしており、ユーザーと製品は、欠落しているエントリを予測するために使用できる少数の潜在因子によって記述されます。spark.ml
は、これらの潜在因子を学習するために、交互最小二乗法(ALS)アルゴリズムを使用します。spark.ml
の実装には、以下のパラメータがあります。
- numBlocksは、計算を並列化するためにユーザーとアイテムが分割されるブロックの数です(デフォルトは10)。
- rankは、モデル内の潜在因子の数です(デフォルトは10)。
- maxIterは、実行する最大イテレーション数です(デフォルトは10)。
- regParamは、ALS の正則化パラメータを指定します(デフォルトは 1.0)。
- implicitPrefsは、明示的フィードバックALS バリアントを使用するか、暗黙的フィードバックデータに適応したバリアントを使用するかを指定します(デフォルトは
false
。これは、明示的フィードバックを使用することを意味します)。 - alphaは、ALS の暗黙的フィードバックバリアントに適用可能なパラメータであり、優先度の観測におけるベースラインの信頼性を制御します(デフォルトは 1.0)。
- nonnegativeは、最小二乗法に非負制約を使用するかどうかを指定します(デフォルトは
false
)。
注: ALS の DataFrame ベースの API は、現在、ユーザー ID とアイテム ID に整数のみをサポートしています。ユーザー ID 列とアイテム ID 列には他の数値型もサポートされていますが、ID は整数値の範囲内である必要があります。
明示的フィードバック vs. 暗黙的フィードバック
行列分解ベースの協調フィルタリングへの標準的なアプローチでは、ユーザーとアイテムの行列のエントリを、ユーザーからアイテムに与えられた明示的な好み(たとえば、ユーザーが映画に評価を与えるなど)として扱います。
多くの現実世界のユースケースでは、暗黙的フィードバック(例:閲覧数、クリック数、購入数、いいね、共有など)にのみアクセスできるのが一般的です。spark.ml
でこのようなデータを処理するために使用されるアプローチは、暗黙的フィードバックデータセットの協調フィルタリングから採用されています。基本的に、このアプローチは、評価の行列を直接モデル化しようとするのではなく、データがユーザーのアクションの観測における強さを表す数値(クリック数や、誰かが映画を見た累積時間など)として扱います。これらの数値は、アイテムに与えられた明示的な評価ではなく、観測されたユーザーの好みの信頼度レベルに関連付けられます。次に、モデルは、ユーザーのアイテムに対する予想される好みを予測するために使用できる潜在的な因子を見つけようとします。
正則化パラメータのスケーリング
ユーザー因子を更新する際にユーザーが生成した評価の数、または製品因子を更新する際に製品が受け取った評価の数によって、各最小二乗問題を解く際に正則化パラメータregParam
をスケーリングします。このアプローチは「ALS-WR」と呼ばれ、「Netflix Prize のための大規模並列協調フィルタリング」という論文で議論されています。これにより、regParam
はデータセットのスケールに依存しなくなるため、サンプリングされたサブセットから学習した最適なパラメータをフルデータセットに適用して、同様のパフォーマンスを期待できます。
コールドスタート戦略
ALSModel
を使用して予測を行う場合、モデルのトレーニング中に存在しなかったテストデータセットのユーザーやアイテムに遭遇することがよくあります。これは通常、次の 2 つのシナリオで発生します。
- 本番環境では、評価履歴がなく、モデルがトレーニングされていない新しいユーザーまたはアイテムの場合(これが「コールドスタート問題」です)。
- クロスバリデーション中は、データはトレーニングセットと評価セットに分割されます。Spark の
CrossValidator
やTrainValidationSplit
のように単純なランダム分割を使用する場合、トレーニングセットにない評価セットでユーザーやアイテムに遭遇することは非常に一般的です。
デフォルトでは、ユーザーまたはアイテムの要素がモデルに存在しない場合、Spark はALSModel.transform
中にNaN
予測を割り当てます。これは、新しいユーザーまたはアイテムを示すため、本番システムで役立つ場合があります。したがって、システムは予測として使用するフォールバックについて決定できます。
ただし、NaN
で予測された値は評価指標(たとえば、RegressionEvaluator
を使用する場合など)でNaN
結果になるため、クロスバリデーション中は望ましくありません。これにより、モデル選択が不可能になります。
Spark では、ユーザーがcoldStartStrategy
パラメータを「drop」に設定して、NaN
値を含む予測のDataFrame
内の行を削除できます。次に、評価指標はNaN
でないデータで計算され、有効になります。このパラメータの使用法については、以下の例で説明します。
注:現在サポートされているコールドスタート戦略は、「nan」(上記のデフォルトの動作)と「drop」です。将来的には、他の戦略がサポートされる可能性があります。
例
次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、タイムスタンプで構成されています。次に、デフォルトでは評価が明示的であると想定する ALS モデルをトレーニングします(implicitPrefs
はFalse
です)。評価予測の二乗平均平方根誤差を測定して、推奨モデルを評価します。
API の詳細については、ALS
Python ドキュメントを参照してください。
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])
# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
coldStartStrategy="drop")
model = als.fit(training)
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
評価行列が別の情報源から導出された場合(つまり、他の信号から推測された場合)、implicitPrefs
をTrue
に設定して、より良い結果を得ることができます
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
userCol="userId", itemCol="movieId", ratingCol="rating")
次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、タイムスタンプで構成されています。次に、デフォルトでは評価が明示的であると想定する ALS モデルをトレーニングします(implicitPrefs
はfalse
です)。評価予測の二乗平均平方根誤差を測定して、推奨モデルを評価します。
API の詳細については、ALS
Scala ドキュメントを参照してください。
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
val fields = str.split("::")
assert(fields.size == 4)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}
val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))
// Build the recommendation model using ALS on the training data
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
val model = als.fit(training)
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")
// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
評価行列が別の情報源から導出された場合(つまり、他の信号から推測された場合)、implicitPrefs
をtrue
に設定して、より良い結果を得ることができます
val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")
次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、タイムスタンプで構成されています。次に、デフォルトでは評価が明示的であると想定する ALS モデルをトレーニングします(implicitPrefs
はfalse
です)。評価予測の二乗平均平方根誤差を測定して、推奨モデルを評価します。
API の詳細については、ALS
Java ドキュメントを参照してください。
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;
public static class Rating implements Serializable {
private int userId;
private int movieId;
private float rating;
private long timestamp;
public Rating() {}
public Rating(int userId, int movieId, float rating, long timestamp) {
this.userId = userId;
this.movieId = movieId;
this.rating = rating;
this.timestamp = timestamp;
}
public int getUserId() {
return userId;
}
public int getMovieId() {
return movieId;
}
public float getRating() {
return rating;
}
public long getTimestamp() {
return timestamp;
}
public static Rating parseRating(String str) {
String[] fields = str.split("::");
if (fields.length != 4) {
throw new IllegalArgumentException("Each line must contain 4 fields");
}
int userId = Integer.parseInt(fields[0]);
int movieId = Integer.parseInt(fields[1]);
float rating = Float.parseFloat(fields[2]);
long timestamp = Long.parseLong(fields[3]);
return new Rating(userId, movieId, rating, timestamp);
}
}
JavaRDD<Rating> ratingsRDD = spark
.read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
.map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];
// Build the recommendation model using ALS on the training data
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");
ALSModel model = als.fit(training);
// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);
RegressionEvaluator evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);
// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);
// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
評価行列が別の情報源から導出された場合(つまり、他の信号から推測された場合)、implicitPrefs
をtrue
に設定して、より良い結果を得ることができます
ALS als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating");
詳細については、R API ドキュメントを参照してください。
# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df
# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
itemCol = "movieId", ratingCol = "rating")
# Model summary
summary(model)
# Prediction
predictions <- predict(model, test)
head(predictions)