SparkR (R on Spark)

概要

SparkRは、RからApache Sparkを使用するための軽量フロントエンドを提供するRパッケージです。Spark 3.5.1では、SparkRは選択、フィルタリング、集計などの操作(Rデータフレーム、dplyrと同様)をサポートする分散データフレームの実装を提供しますが、大規模なデータセットを対象としています。SparkRは、MLlibを使用した分散機械学習もサポートしています。

SparkDataFrame

SparkDataFrameは、名前付き列に編成されたデータの分散コレクションです。これは概念的にはリレーショナルデータベースのテーブルまたはRのデータフレームと同等ですが、舞台裏ではより豊富な最適化が行われています。SparkDataFrameは、構造化データファイル、Hiveのテーブル、外部データベース、既存のローカルRデータフレームなど、幅広いソースから構築できます。

このページのすべての例では、RまたはSparkディストリビューションに含まれているサンプルデータを使用しており、./bin/sparkRシェルを使用して実行できます。

起動: SparkSession

SparkRへのエントリポイントは、RプログラムをSparkクラスターに接続するSparkSessionです。sparkR.sessionを使用してSparkSessionを作成し、アプリケーション名、依存するSparkパッケージなどのオプションを渡すことができます。さらに、SparkSessionを介してSparkDataFrameを操作することもできます。sparkRシェルから作業している場合、SparkSessionはすでに作成されているはずであるため、sparkR.sessionを呼び出す必要はありません。

sparkR.session()

RStudioからの起動

RStudioからSparkRを起動することもできます。RStudio、Rシェル、Rscript、またはその他のR IDEからRプログラムをSparkクラスターに接続できます。開始するには、SPARK_HOMEが環境で設定されていることを確認し(Sys.getenvで確認できます)、SparkRパッケージをロードし、以下のようにsparkR.sessionを呼び出します。Sparkのインストールがチェックされ、見つからない場合は、自動的にダウンロードおよびキャッシュされます。または、install.sparkを手動で実行することもできます。

sparkR.sessionの呼び出しに加えて、特定のSparkドライバープロパティを指定することもできます。通常、これらのアプリケーションプロパティおよびランタイム環境は、ドライバーJVMプロセスが開始されているため、プログラムで設定することはできません。この場合、SparkRがこれを行います。それらを設定するには、sparkR.session()へのsparkConfig引数で、他の構成プロパティと同様にそれらを渡します。

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

RStudioからsparkR.sessionsparkConfigに設定できる、次のSparkドライバープロパティがあります。

プロパティ名プロパティグループspark-submit相当
spark.master アプリケーションプロパティ --master
spark.kerberos.keytab アプリケーションプロパティ --keytab
spark.kerberos.principal アプリケーションプロパティ --principal
spark.driver.memory アプリケーションプロパティ --driver-memory
spark.driver.extraClassPath ランタイム環境 --driver-class-path
spark.driver.extraJavaOptions ランタイム環境 --driver-java-options
spark.driver.extraLibraryPath ランタイム環境 --driver-library-path

SparkDataFrameの作成

SparkSessionを使用すると、アプリケーションはローカルRデータフレーム、Hiveテーブル、または他のデータソースからSparkDataFrameを作成できます。

ローカルデータフレームから

データフレームを作成する最も簡単な方法は、ローカルRデータフレームをSparkDataFrameに変換することです。具体的には、as.DataFrameまたはcreateDataFrameを使用し、ローカルRデータフレームを渡してSparkDataFrameを作成できます。例として、以下はRのfaithfulデータセットを使用してSparkDataFrameを作成します。

df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

データソースから

SparkRは、SparkDataFrameインターフェイスを介してさまざまなデータソースでの操作をサポートしています。このセクションでは、データソースを使用したデータのロードと保存の一般的な方法について説明します。Spark SQLプログラミングガイドで、組み込みデータソースで使用できる特定のオプションを確認できます。

データソースからSparkDataFrameを作成するための一般的な方法は、read.dfです。このメソッドは、ロードするファイルのパスとデータソースのタイプを受け取り、現在アクティブなSparkSessionが自動的に使用されます。SparkRは、JSON、CSV、Parquetファイルの読み取りをネイティブにサポートしており、サードパーティプロジェクトなどのソースから入手できるパッケージを使用することで、Avroのような一般的なファイル形式のデータソースコネクタを見つけることができます。これらのパッケージは、spark-submitまたはsparkRコマンドで--packagesを指定するか、インタラクティブRシェルまたはRStudioでSparkSessionを初期化する際にsparkPackagesパラメータを指定することで追加できます。

sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.1")

JSON入力ファイルを使用してデータソースを使用する方法を見てみましょう。ここで使用されるファイルは、典型的なJSONファイルではないことに注意してください。ファイル内の各行には、個別の、自己完結型の有効なJSONオブジェクトが含まれている必要があります。詳細については、JSON Linesテキスト形式 (改行区切りJSONとも呼ばれる)を参照してください。その結果、通常の複数行のJSONファイルはほとんどの場合失敗します。

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

データソースAPIは、CSV形式の入力ファイルをネイティブにサポートしています。詳細については、SparkRのread.df APIドキュメントを参照してください。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

データソースAPIを使用して、SparkDataFrameを複数のファイル形式で保存することもできます。たとえば、前の例のSparkDataFrameをwrite.dfを使用してParquetファイルに保存できます。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

Hiveテーブルから

HiveテーブルからSparkDataFrameを作成することもできます。これを行うには、Hive MetaStoreのテーブルにアクセスできるHiveサポート付きのSparkSessionを作成する必要があります。SparkがHiveサポート付きでビルドされている必要があり、詳細についてはSQLプログラミングガイドに記載されています。SparkRでは、デフォルトで、Hiveサポートが有効な(enableHiveSupport = TRUE)SparkSessionを作成しようとします。

sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrameの操作

SparkDataFrameは、構造化データ処理を行うための多数の関数をサポートしています。ここでは、基本的な例をいくつか示します。APIドキュメントで完全なリストを確認できます

行、列の選択

# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

グループ化、集計

SparkRデータフレームは、グループ化後のデータを集計するために一般的に使用される多くの関数をサポートしています。たとえば、以下に示すように、faithfulデータセットでのwaiting時間のヒストグラムを計算できます

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

標準的な集計に加えて、SparkRはOLAPキューブ演算子cubeをサポートしています

head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1  NA 140.8    4     22.8
##2   4  75.7    4     30.4
##3   8 400.0    3     19.2
##4   8 318.0    3     15.5
##5  NA 351.0   NA     15.8
##6  NA 275.8   NA     16.3

およびrollup

head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1   4  75.7    4     30.4
##2   8 400.0    3     19.2
##3   8 318.0    3     15.5
##4   4  78.7   NA     32.4
##5   8 304.0    3     15.2
##6   4  79.0   NA     27.3

列の操作

SparkRは、データ処理および集計中に列に直接適用できる多数の関数も提供します。以下の例は、基本的な算術関数の使用法を示しています。

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

ユーザー定義関数の適用

SparkRでは、いくつかの種類のユーザー定義関数をサポートしています

dapply または dapplyCollect を使用して大規模データセットで特定の関数を実行

dapply

SparkDataFrameの各パーティションに関数を適用します。SparkDataFrameの各パーティションに適用される関数は、パラメータを1つだけ持つ必要があり、各パーティションに対応するdata.frameが渡されます。関数の出力はdata.frameである必要があります。スキーマは、結果として得られるSparkDataFrameの行形式を指定します。データ型の戻り値に一致する必要があります。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
dapplyCollect

dapplyと同様に、SparkDataFrameの各パーティションに関数を適用し、結果を回収します。関数の出力はdata.frameである必要があります。ただし、スキーマを渡す必要はありません。dapplyCollectは、すべてのパーティションで実行されたUDFの出力がドライバにプルできず、ドライバのメモリに収まらない場合に失敗する可能性があることに注意してください。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

入力列でグループ化し、gapply または gapplyCollect を使用して大規模データセットで特定の関数を実行

gapply

SparkDataFrameの各グループに関数を適用します。この関数は、SparkDataFrameの各グループに適用され、グループ化キーと、そのキーに対応するRのdata.frameの2つのパラメータのみを持つ必要があります。グループは、SparkDataFrameの列から選択されます。関数の出力はdata.frameである必要があります。スキーマは、結果のSparkDataFrameの行形式を指定します。これは、Sparkのデータ型に基づいて、R関数の出力スキーマを表す必要があります。返されるdata.frameの列名はユーザーによって設定されます。

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
gapplyCollect

gapplyと同様に、SparkDataFrameの各パーティションに関数を適用し、結果をRのdata.frameに回収します。関数の出力はdata.frameである必要があります。ただし、スキーマを渡す必要はありません。gapplyCollectは、すべてのパーティションで実行されたUDFの出力がドライバにプルできず、ドライバのメモリに収まらない場合に失敗する可能性があることに注意してください。

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

spark.lapply を使用して分散されたローカルR関数を実行

spark.lapply

ネイティブRのlapplyと同様に、spark.lapplyは、要素のリストに対して関数を実行し、Sparkで計算を分散します。doParallelまたはlapplyと同様の方法で、リストの要素に関数を適用します。すべての計算の結果は、単一のマシンに収まる必要があります。そうでない場合は、df <- createDataFrame(list)のようにしてから、dapplyを使用できます。

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

即時実行

即時実行が有効になっている場合、SparkDataFrameが作成されると、データはすぐにRクライアントに返されます。デフォルトでは、即時実行は有効になっていません。有効にするには、SparkSessionが起動されたときに、構成プロパティspark.sql.repl.eagerEval.enabledtrueに設定します。

表示するデータの最大行数と列ごとの最大文字数は、それぞれspark.sql.repl.eagerEval.maxNumRowsおよびspark.sql.repl.eagerEval.truncate構成プロパティで制御できます。これらのプロパティは、即時実行が有効になっている場合にのみ有効です。これらのプロパティが明示的に設定されていない場合、デフォルトでは最大20行、列ごとに最大20文字のデータが表示されます。

# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
                                  spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))

# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")

# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2

##+-------+-----+
##|waiting|count|
##+-------+-----+
##|   43.0|    1|
##|   45.0|    3|
##|   46.0|    5|
##|   47.0|    4|
##|   48.0|    3|
##|   49.0|    5|
##|   50.0|    5|
##|   51.0|    6|
##|   52.0|    5|
##|   53.0|    7|
##+-------+-----+
##only showing top 10 rows

sparkRシェルで即時実行を有効にするには、--confオプションにspark.sql.repl.eagerEval.enabled=true構成プロパティを追加してください。

SparkRからのSQLクエリの実行

SparkDataFrameは、Spark SQLで一時ビューとして登録することもでき、そのデータに対してSQLクエリを実行できます。sql関数を使用すると、アプリケーションはプログラムでSQLクエリを実行でき、結果がSparkDataFrameとして返されます。

# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

機械学習

アルゴリズム

SparkRは、現在、次の機械学習アルゴリズムをサポートしています。

分類

回帰

クラスタリング

協調フィルタリング

頻出パターンマイニング

統計

内部的には、SparkRはMLlibを使用してモデルをトレーニングします。コード例については、MLlibユーザーガイドの対応するセクションを参照してください。ユーザーは、summaryを呼び出して、適合したモデルの要約を出力したり、predictを使用して新しいデータの予測を行ったり、write.ml/read.mlを使用して適合したモデルを保存/ロードしたりできます。SparkRは、モデルフィッティングのために、利用可能なRの数式演算子(’~’, ‘.’, ‘:’, ‘+’, ‘-‘など)のサブセットをサポートしています。

モデルの永続化

次の例は、SparkRでMLlibモデルを保存/ロードする方法を示しています。

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

unlink(modelPath)
完全なコード例は、Sparkリポジトリの「examples/src/main/r/ml/ml.R」にあります。

RとSpark間のデータ型マッピング

RSpark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
Date date
array array
list array
env map

構造化ストリーミング

SparkRは、構造化ストリーミングAPIをサポートしています。構造化ストリーミングは、Spark SQLエンジン上に構築された、スケーラブルでフォールトトレラントなストリーム処理エンジンです。詳細については、構造化ストリーミングプログラミングガイドのR APIを参照してください。

SparkRでのApache Arrow

Apache Arrowは、JVMとRプロセス間でデータを効率的に転送するためにSparkで使用される、インメモリの列指向データ形式です。PySparkの最適化(Apache Arrowを使用したPandas向けのPySpark使用ガイド)も参照してください。このガイドでは、SparkRでArrow最適化を使用する方法といくつかの重要なポイントについて説明します。

Arrowがインストールされていることを確認

Arrow RライブラリはCRANで入手でき、以下のようにインストールできます。

Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'

詳細については、Apache Arrowの公式ドキュメントを参照してください。

すべてのクラスターノードにArrow Rパッケージがインストールされ、利用可能であることを確認する必要があります。現在サポートされている最小バージョンは1.0.0です。ただし、SparkRでのArrow最適化は実験的なものであるため、マイナーリリース間で変更される可能性があります。

R DataFrameとの間の変換、dapply および gapply の有効化

Arrow最適化は、collect(spark_df)の呼び出しを使用してSpark DataFrameをR DataFrameに変換するとき、createDataFrame(r_df)を使用してR DataFrameからSpark DataFrameを作成するとき、dapply(...)を介して各パーティションにRネイティブ関数を適用するとき、およびgapply(...)を介してグループ化されたデータにRネイティブ関数を適用するときに使用できます。これらを実行するときにArrowを使用するには、最初にSpark構成 'spark.sql.execution.arrow.sparkr.enabled'を 'true' に設定する必要があります。これはデフォルトでは無効になっています。

最適化が有効かどうかに関わらず、SparkR は同じ結果を生成します。さらに、Spark DataFrame と R DataFrame 間の変換は、実際の計算前に何らかの理由で最適化が失敗した場合、自動的に Arrow 最適化ではない実装にフォールバックします。

# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)

# Converts Spark DataFrame to an R DataFrame
collect(spark_df)

# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))

# Apply an R native function to grouped data.
collect(gapply(spark_df,
               "gear",
               function(key, group) {
                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
               },
               structType("gear double, disp boolean")))

Arrow を使用する場合でも、collect(spark_df) は DataFrame 内のすべてのレコードをドライバプログラムに収集するため、データの小さなサブセットに対してのみ実行する必要があります。さらに、gapply(...) および dapply(...) で指定された出力スキーマは、指定された関数によって返される R DataFrame のスキーマと一致する必要があります。

サポートされているSQL型

現在、FloatTypeBinaryTypeArrayTypeStructType、および MapType を除く、すべての Spark SQL データ型が Arrow ベースの変換でサポートされています。

R関数名の競合

R で新しいパッケージをロードしてアタッチする場合、関数が別の関数をマスクする名前の競合が発生する可能性があります。

以下の関数は SparkR パッケージによってマスクされています。

マスクされた関数アクセス方法
package:stats 内の cov
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
package:stats 内の filter
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
package:base 内の sample base::sample(x, size, replace = FALSE, prob = NULL)

SparkR の一部は dplyr パッケージに基づいてモデル化されているため、SparkR の特定の関数は dplyr の関数と同じ名前を共有しています。2 つのパッケージのロード順序に応じて、最初にロードされたパッケージの一部の関数は、後でロードされたパッケージの関数によってマスクされます。このような場合は、例えば SparkR::cume_dist(x) または dplyr::cume_dist(x) のように、パッケージ名をプレフィックスとして付与して呼び出してください。

R での検索パスは、search() で確認できます。

移行ガイド

移行ガイドは、このページでアーカイブされました。