協調フィルタリング

協調フィルタリング

協調フィルタリングは、レコメンダーシステムによく使用されます。これらの技術は、ユーザーとアイテムの関連行列の欠損エントリを埋めることを目的としています。spark.mlは現在、モデルベースの協調フィルタリングをサポートしており、ユーザーと製品は、欠落しているエントリを予測するために使用できる少数の潜在因子によって記述されます。spark.mlは、これらの潜在因子を学習するために、交互最小二乗法(ALS)アルゴリズムを使用します。spark.mlの実装には、以下のパラメータがあります。

注: ALS の DataFrame ベースの API は、現在、ユーザー ID とアイテム ID に整数のみをサポートしています。ユーザー ID 列とアイテム ID 列には他の数値型もサポートされていますが、ID は整数値の範囲内である必要があります。

明示的フィードバック vs. 暗黙的フィードバック

行列分解ベースの協調フィルタリングへの標準的なアプローチでは、ユーザーとアイテムの行列のエントリを、ユーザーからアイテムに与えられた明示的な好み(たとえば、ユーザーが映画に評価を与えるなど)として扱います。

多くの現実世界のユースケースでは、暗黙的フィードバック(例:閲覧数、クリック数、購入数、いいね、共有など)にのみアクセスできるのが一般的です。spark.mlでこのようなデータを処理するために使用されるアプローチは、暗黙的フィードバックデータセットの協調フィルタリングから採用されています。基本的に、このアプローチは、評価の行列を直接モデル化しようとするのではなく、データがユーザーのアクションの観測における強さを表す数値(クリック数や、誰かが映画を見た累積時間など)として扱います。これらの数値は、アイテムに与えられた明示的な評価ではなく、観測されたユーザーの好みの信頼度レベルに関連付けられます。次に、モデルは、ユーザーのアイテムに対する予想される好みを予測するために使用できる潜在的な因子を見つけようとします。

正則化パラメータのスケーリング

ユーザー因子を更新する際にユーザーが生成した評価の数、または製品因子を更新する際に製品が受け取った評価の数によって、各最小二乗問題を解く際に正則化パラメータregParamをスケーリングします。このアプローチは「ALS-WR」と呼ばれ、「Netflix Prize のための大規模並列協調フィルタリング」という論文で議論されています。これにより、regParamはデータセットのスケールに依存しなくなるため、サンプリングされたサブセットから学習した最適なパラメータをフルデータセットに適用して、同様のパフォーマンスを期待できます。

コールドスタート戦略

ALSModelを使用して予測を行う場合、モデルのトレーニング中に存在しなかったテストデータセットのユーザーやアイテムに遭遇することがよくあります。これは通常、次の 2 つのシナリオで発生します。

  1. 本番環境では、評価履歴がなく、モデルがトレーニングされていない新しいユーザーまたはアイテムの場合(これが「コールドスタート問題」です)。
  2. クロスバリデーション中は、データはトレーニングセットと評価セットに分割されます。Spark のCrossValidatorTrainValidationSplitのように単純なランダム分割を使用する場合、トレーニングセットにない評価セットでユーザーやアイテムに遭遇することは非常に一般的です。

デフォルトでは、ユーザーまたはアイテムの要素がモデルに存在しない場合、Spark はALSModel.transform中にNaN予測を割り当てます。これは、新しいユーザーまたはアイテムを示すため、本番システムで役立つ場合があります。したがって、システムは予測として使用するフォールバックについて決定できます。

ただし、NaNで予測された値は評価指標(たとえば、RegressionEvaluatorを使用する場合など)でNaN結果になるため、クロスバリデーション中は望ましくありません。これにより、モデル選択が不可能になります。

Spark では、ユーザーがcoldStartStrategyパラメータを「drop」に設定して、NaN値を含む予測のDataFrame内の行を削除できます。次に、評価指標はNaNでないデータで計算され、有効になります。このパラメータの使用法については、以下の例で説明します。

注:現在サポートされているコールドスタート戦略は、「nan」(上記のデフォルトの動作)と「drop」です。将来的には、他の戦略がサポートされる可能性があります。

次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、タイムスタンプで構成されています。次に、デフォルトでは評価が明示的であると想定する ALS モデルをトレーニングします(implicitPrefsFalseです)。評価予測の二乗平均平方根誤差を測定して、推奨モデルを評価します。

API の詳細については、ALS Python ドキュメントを参照してください。

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/python/ml/als_example.py" にあります。

評価行列が別の情報源から導出された場合(つまり、他の信号から推測された場合)、implicitPrefsTrueに設定して、より良い結果を得ることができます

als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
          userCol="userId", itemCol="movieId", ratingCol="rating")

次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、タイムスタンプで構成されています。次に、デフォルトでは評価が明示的であると想定する ALS モデルをトレーニングします(implicitPrefsfalseです)。評価予測の二乗平均平方根誤差を測定して、推奨モデルを評価します。

API の詳細については、ALS Scala ドキュメントを参照してください。

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
  .map(parseRating)
  .toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Build the recommendation model using ALS on the training data
val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
val model = als.fit(training)

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop")
val predictions = model.transform(test)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction")
val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")

// Generate top 10 movie recommendations for each user
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)

// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/ml/ALSExample.scala" にあります。

評価行列が別の情報源から導出された場合(つまり、他の信号から推測された場合)、implicitPrefstrueに設定して、より良い結果を得ることができます

val als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")

次の例では、MovieLens データセットから評価データをロードします。各行は、ユーザー、映画、評価、タイムスタンプで構成されています。次に、デフォルトでは評価が明示的であると想定する ALS モデルをトレーニングします(implicitPrefsfalseです)。評価予測の二乗平均平方根誤差を測定して、推奨モデルを評価します。

API の詳細については、ALS Java ドキュメントを参照してください。

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.recommendation.ALS;
import org.apache.spark.ml.recommendation.ALSModel;

public static class Rating implements Serializable {
  private int userId;
  private int movieId;
  private float rating;
  private long timestamp;

  public Rating() {}

  public Rating(int userId, int movieId, float rating, long timestamp) {
    this.userId = userId;
    this.movieId = movieId;
    this.rating = rating;
    this.timestamp = timestamp;
  }

  public int getUserId() {
    return userId;
  }

  public int getMovieId() {
    return movieId;
  }

  public float getRating() {
    return rating;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public static Rating parseRating(String str) {
    String[] fields = str.split("::");
    if (fields.length != 4) {
      throw new IllegalArgumentException("Each line must contain 4 fields");
    }
    int userId = Integer.parseInt(fields[0]);
    int movieId = Integer.parseInt(fields[1]);
    float rating = Float.parseFloat(fields[2]);
    long timestamp = Long.parseLong(fields[3]);
    return new Rating(userId, movieId, rating, timestamp);
  }
}

JavaRDD<Rating> ratingsRDD = spark
  .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
  .map(Rating::parseRating);
Dataset<Row> ratings = spark.createDataFrame(ratingsRDD, Rating.class);
Dataset<Row>[] splits = ratings.randomSplit(new double[]{0.8, 0.2});
Dataset<Row> training = splits[0];
Dataset<Row> test = splits[1];

// Build the recommendation model using ALS on the training data
ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");
ALSModel model = als.fit(training);

// Evaluate the model by computing the RMSE on the test data
// Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
model.setColdStartStrategy("drop");
Dataset<Row> predictions = model.transform(test);

RegressionEvaluator evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating")
  .setPredictionCol("prediction");
double rmse = evaluator.evaluate(predictions);
System.out.println("Root-mean-square error = " + rmse);

// Generate top 10 movie recommendations for each user
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);

// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
完全なサンプルコードは、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/ml/JavaALSExample.java" にあります。

評価行列が別の情報源から導出された場合(つまり、他の信号から推測された場合)、implicitPrefstrueに設定して、より良い結果を得ることができます

ALS als = new ALS()
  .setMaxIter(5)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating");

詳細については、R API ドキュメントを参照してください。

# Load training data
data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0),
             list(1, 2, 4.0), list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(data, c("userId", "movieId", "rating"))
training <- df
test <- df

# Fit a recommendation model using ALS with spark.als
model <- spark.als(training, maxIter = 5, regParam = 0.01, userCol = "userId",
                   itemCol = "movieId", ratingCol = "rating")

# Model summary
summary(model)

# Prediction
predictions <- predict(model, test)
head(predictions)
完全なサンプルコードは、Spark リポジトリの "examples/src/main/r/ml/als.R" にあります。