SparkR (Spark 上の R)

SparkR は Apache Spark 4.0.0 から非推奨となり、将来のバージョンで削除される予定です。

概要

SparkR は、R から Apache Spark を使用するための軽量なフロントエンドを提供する R パッケージです。Spark 4.0.0 では、SparkR は、選択、フィルタリング、集計などの操作をサポートする分散データフレーム実装を提供します(R のデータフレーム、dplyr と同様)が、大規模データセットに対して実行できます。SparkR は MLlib を使用した分散機械学習もサポートしています。

SparkDataFrame

SparkDataFrame は、名前付き列に編成されたデータの分散コレクションです。概念的にはリレーショナルデータベースのテーブルまたは R のデータフレームと同等ですが、内部的に豊富な最適化が施されています。SparkDataFrame は、構造化データファイル、Hive のテーブル、外部データベース、または既存のローカル R データフレームなど、幅広いソースから構築できます。

このページで説明するすべての例は、R または Spark ディストリビューションに含まれるサンプルデータを使用しており、./bin/sparkR シェルを使用して実行できます。

起動: SparkSession

SparkR へのエントリポイントは SparkSession であり、R プログラムを Spark クラスターに接続します。sparkR.session を使用して SparkSession を作成し、アプリケーション名、依存する Spark パッケージなどのオプションを渡すことができます。さらに、SparkSession を介して SparkDataFrame を操作することもできます。もし sparkR シェルから作業している場合、SparkSession はすでに作成されているため、sparkR.session を呼び出す必要はありません。

sparkR.session()

RStudio からの起動

RStudio から SparkR を起動することもできます。RStudio、R シェル、Rscript、またはその他の R IDE から R プログラムを Spark クラスターに接続できます。開始するには、環境変数 SPARK_HOME が設定されていることを確認し(Sys.getenv で確認できます)、SparkR パッケージをロードし、以下のように sparkR.session を呼び出します。Spark のインストールがチェックされ、見つからなければ自動的にダウンロードおよびキャッシュされます。または、install.spark を手動で実行することもできます。

sparkR.session を呼び出すことに加えて、特定の Spark ドライバープロパティを指定することもできます。通常、これらの アプリケーションプロパティ実行時環境 は、ドライバー JVM プロセスがすでに起動しているため、プログラムで設定することはできません。この場合、SparkR がこれを担当します。設定するには、sparkR.session()sparkConfig 引数に、他の設定プロパティと同様に渡します。

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

RStudio からの sparkR.sessionsparkConfig に設定できる Spark ドライバープロパティは以下の通りです。

プロパティ名プロパティグループspark-submit 相当
spark.master アプリケーションプロパティ --master
spark.kerberos.keytab アプリケーションプロパティ --keytab
spark.kerberos.principal アプリケーションプロパティ --principal
spark.driver.memory アプリケーションプロパティ --driver-memory
spark.driver.extraClassPath 実行時環境 --driver-class-path
spark.driver.extraJavaOptions 実行時環境 --driver-java-options
spark.driver.extraLibraryPath 実行時環境 --driver-library-path

SparkDataFrame の作成

SparkSession を使用すると、アプリケーションはローカル R データフレーム、Hive テーブル、またはその他のデータソースから SparkDataFrame を作成できます。

ローカルデータフレームから

データフレームを作成する最も簡単な方法は、ローカル R データフレームを SparkDataFrame に変換することです。具体的には、as.DataFrame または createDataFrame を使用し、ローカル R データフレームを渡して SparkDataFrame を作成できます。例として、以下は R の faithful データセットに基づいて SparkDataFrame を作成するものです。

df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

データソースから

SparkR は、SparkDataFrame インターフェースを介して、さまざまなデータソースに対する操作をサポートしています。このセクションでは、データソースを使用したデータの読み込みと保存の一般的な方法について説明します。組み込みデータソースで利用可能な特定のオプションについては、Spark SQL プログラミングガイドを参照してください。

データソースから SparkDataFrame を作成する一般的な方法は read.df です。このメソッドは、読み込むファイルのパスとデータソースのタイプを受け取り、現在アクティブな SparkSession が自動的に使用されます。SparkR は、JSON、CSV、Parquet ファイルをネイティブで読み込むことをサポートしており、サードパーティプロジェクトなどのソースから利用可能なパッケージを介して、Avro のような一般的なファイル形式のデータソースコネクタを見つけることができます。これらのパッケージは、spark-submit または sparkR コマンドで --packages を指定して追加するか、インタラクティブな R シェルまたは RStudio から SparkSession を初期化する際に sparkPackages パラメータを渡すことによって追加できます。

sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.13:4.0.0")

例の JSON 入力ファイルを使用してデータソースの使用方法を見てみましょう。ここで使用されているファイルは、典型的な JSON ファイルではないことに注意してください。ファイルの各行は、個別の自己完結型の有効な JSON オブジェクトを含む必要があります。詳細については、JSON Lines テキスト形式(改行区切り JSON とも呼ばれます)を参照してください。その結果、通常の複数行 JSON ファイルはほとんどの場合失敗します。

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

データソース API は、CSV 形式の入力ファイルをネイティブでサポートしています。詳細については、SparkR の read.df API ドキュメントを参照してください。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

データソース API は、SparkDataFrame を複数のファイル形式に保存するためにも使用できます。たとえば、前の例の SparkDataFrame を write.df を使用して Parquet ファイルに保存できます。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

Hive テーブルから

Hive テーブルから SparkDataFrame を作成することもできます。これを行うには、Hive MetaStore のテーブルにアクセスできる Hive サポート付きの SparkSession を作成する必要があります。Spark は Hive サポートでビルドされている必要があり、詳細については SQL プログラミングガイドで確認できます。SparkR では、デフォルトで Hive サポートが有効な SparkSession (enableHiveSupport = TRUE)を作成しようとします。

sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrame の操作

SparkDataFrame は、構造化データ処理を行うための多数の関数をサポートしています。ここでは基本的な例をいくつか示し、完全なリストは API ドキュメントで見つけることができます。

行、列の選択

# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

グループ化、集計

SparkR データフレームは、グループ化後にデータを集計するためによく使用される関数を多数サポートしています。たとえば、以下に示すように、faithful データセットの waiting 時間のヒストグラムを計算できます。

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

標準的な集計に加えて、SparkR は OLAP cube 演算子である cube

head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1  NA 140.8    4     22.8
##2   4  75.7    4     30.4
##3   8 400.0    3     19.2
##4   8 318.0    3     15.5
##5  NA 351.0   NA     15.8
##6  NA 275.8   NA     16.3

rollup

head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1   4  75.7    4     30.4
##2   8 400.0    3     19.2
##3   8 318.0    3     15.5
##4   4  78.7   NA     32.4
##5   8 304.0    3     15.2
##6   4  79.0   NA     27.3

列に対する操作

もサポートしています。SparkR には、データ処理や集計中に列に直接適用できる多くの関数も用意されています。以下の例は、基本的な算術関数の使用方法を示しています。

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

ユーザー定義関数の適用

SparkR では、いくつかの種類のユーザー定義関数をサポートしています。

dapply または dapplyCollect を使用して、指定された関数を大規模データセットに実行する

dapply

SparkDataFrame の各パーティションに関数を適用します。適用される関数は、各パーティションに対応する data.frame が渡される単一のパラメータを持つ必要があります。関数の出力は data.frame である必要があります。Schema は、結果の SparkDataFrame の行形式を指定します。データ型を返す値のスキーマと一致させる必要があります。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
dapplyCollect

dapply と同様に、SparkDataFrame の各パーティションに関数を適用し、結果を収集します。関数の出力は data.frame である必要があります。ただし、Schema の指定は不要です。注意点として、dapplyCollect は、すべてのパーティションで実行された UDF の出力がドライバーに転送され、ドライバーメモリに収まらない場合に失敗する可能性があります。

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

入力列でグループ化し、gapply または gapplyCollect を使用して、指定された関数を大規模データセットに実行する

gapply

SparkDataFrame の各グループに関数を適用します。適用される関数は、SparkDataFrame の各グループに適用され、グループキーと、そのキーに対応する R data.frame の 2 つのパラメータを持つ必要があります。グループは SparkDataFrame の列から選択されます。関数の出力は data.frame である必要があります。Schema は、結果の SparkDataFrame の行形式を指定します。これは、Spark のデータ型に基づいて R 関数の出力スキーマを表す必要があります。返される data.frame の列名はユーザーによって設定されます。

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
gapplyCollect

gapply と同様に、SparkDataFrame の各パーティションに関数を適用し、結果を R data.frame に収集します。関数の出力は data.frame である必要があります。ただし、スキーマの指定は不要です。注意点として、gapplyCollect は、すべてのパーティションで実行された UDF の出力がドライバーに転送され、ドライバーメモリに収まらない場合に失敗する可能性があります。

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

spark.lapply を使用して、ローカル R 関数を分散実行する

spark.lapply

ネイティブ R の lapply と同様に、spark.lapply はリストの要素に関数を実行し、Spark を使用して計算を分散します。リストの要素に関数を適用します。これは doParallel または lapply と同様の方法で行われます。すべての計算結果は 1 台のマシンに収まる必要があります。そうでない場合は、df <- createDataFrame(list) のような処理を行い、その後 dapply を使用できます。

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

即時実行

即時実行が有効になっている場合、SparkDataFrame が作成されると、データは R クライアントに即座に返されます。デフォルトでは、即時実行は有効になっておらず、SparkSession の起動時に設定プロパティ spark.sql.repl.eagerEval.enabledtrue に設定することで有効にできます。

表示されるデータの最大行数と各列の最大文字数は、それぞれ spark.sql.repl.eagerEval.maxNumRows および spark.sql.repl.eagerEval.truncate の設定プロパティで制御できます。これらのプロパティは、即時実行が有効な場合にのみ有効です。これらのプロパティが明示的に設定されていない場合、デフォルトでは、最大 20 行、各列最大 20 文字までのデータが表示されます。

# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
                                  spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))

# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")

# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2

##+-------+-----+
##|waiting|count|
##+-------+-----+
##|   43.0|    1|
##|   45.0|    3|
##|   46.0|    5|
##|   47.0|    4|
##|   48.0|    3|
##|   49.0|    5|
##|   50.0|    5|
##|   51.0|    6|
##|   52.0|    5|
##|   53.0|    7|
##+-------+-----+
##only showing top 10 rows

sparkR シェルで即時実行を有効にするには、--conf オプションに spark.sql.repl.eagerEval.enabled=true 設定プロパティを追加してください。

SparkR からの SQL クエリの実行

SparkDataFrame は、Spark SQL で一時ビューとして登録することもでき、そのデータに対する SQL クエリを実行できます。sql 関数により、アプリケーションは SQL クエリをプログラムで実行し、結果を SparkDataFrame として返すことができます。

# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

機械学習

アルゴリズム

SparkR は現在、以下の機械学習アルゴリズムをサポートしています。

分類

回帰

ツリー

クラスタリング

協調フィルタリング

頻出パターンマイニング

統計

内部的には、SparkR は MLlib を使用してモデルをトレーニングします。例コードについては、MLlib ユーザーガイドの対応するセクションを参照してください。ユーザーは、summary を呼び出して適合モデルの概要を表示したり、predict を呼び出して新しいデータで予測を行ったり、write.ml/read.ml を呼び出して適合モデルを保存/ロードしたりできます。SparkR は、モデルフィッティングのために利用可能な R の数式演算子の一部をサポートしており、これには '~'、'.'、':'、'+'、'-' が含まれます。

モデルの永続化

以下の例は、SparkR を使用して MLlib モデルを保存/ロードする方法を示しています。

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

unlink(modelPath)
完全な例コードは、Spark リポジトリの "examples/src/main/r/ml/ml.R" で見つけることができます。

R と Spark のデータ型マッピング

RSpark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
日付 date
array array
list array
env map

Structured Streaming

SparkR は、Structured Streaming API をサポートしています。Structured Streaming は、Spark SQL エンジン上に構築されたスケーラブルで耐障害性の高いストリーム処理エンジンです。詳細については、Structured Streaming プログラミングガイドの R API を参照してください。

SparkR の Apache Arrow

Apache Arrow は、Spark が JVM と R プロセスの間で効率的にデータを転送するために使用するインメモリ列指向データ形式です。PySpark の最適化についても参照してください。Apache Arrow を使用した Pandas の PySpark 使用ガイド。このガイドは、いくつかの重要なポイントとともに、SparkR で Arrow 最適化を使用する方法を説明することを目的としています。

Arrow のインストールを確認する

Arrow R ライブラリは CRAN で利用可能であり、以下のようにインストールできます。

Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'

詳細については、Apache Arrow の公式ドキュメントを参照してください。

注意点として、Arrow R パッケージがインストールされており、すべてのクラスターノードで利用可能であることを確認する必要があります。現在のサポートされている最小バージョンは 1.0.0 ですが、SparkR の Arrow 最適化は実験的であるため、マイナーリリース間で変更される可能性があります。

R DataFrame との変換、dapplygapply のための有効化

Arrow 最適化は、collect(spark_df) を使用して Spark DataFrame を R DataFrame に変換する場合、createDataFrame(r_df) を使用して R DataFrame から Spark DataFrame を作成する場合、dapply(...) を介して R ネイティブ関数を各パーティションに適用する場合、および gapply(...) を介してグループ化されたデータに R ネイティブ関数を適用する場合に利用できます。これらの操作を実行する際に Arrow を使用するには、ユーザーはまず Spark 設定 'spark.sql.execution.arrow.sparkr.enabled' を 'true' に設定する必要があります。これはデフォルトでは無効になっています。

最適化が有効になっているかどうかにかかわらず、SparkR は同じ結果を生成します。さらに、Spark DataFrame と R DataFrame の間の変換は、実際の計算の前に最適化が何らかの理由で失敗した場合、自動的に非 Arrow 最適化実装にフォールバックします。

# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)

# Converts Spark DataFrame to an R DataFrame
collect(spark_df)

# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))

# Apply an R native function to grouped data.
collect(gapply(spark_df,
               "gear",
               function(key, group) {
                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
               },
               structType("gear double, disp boolean")))

注意点として、Arrow を使用している場合でも、collect(spark_df) は DataFrame のすべてのレコードをドライバープログラムに収集することになるため、データの小さなサブセットに対してのみ実行する必要があります。さらに、gapply(...) および dapply(...) で指定された出力スキーマは、指定された関数によって返される R DataFrame のスキーマと一致している必要があります。

サポートされている SQL 型

現在、Arrow ベースの変換では、FloatTypeBinaryTypeArrayTypeStructTypeMapType を除くすべての Spark SQL データ型がサポートされています。

R 関数名の競合

R で新しいパッケージをロードおよびアタッチする際に、関数が別の関数をマスクする名前の競合が発生する可能性があります。

SparkR パッケージによってマスクされる関数は以下の通りです。

マスクされた関数アクセス方法
package:statscov
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
package:statsfilter
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
package:basesample base::sample(x, size, replace = FALSE, prob = NULL)

SparkR の一部は dplyr パッケージをモデルとしているため、SparkR の一部の関数は dplyr の関数と同じ名前を共有しています。2 つのパッケージのロード順序によっては、最初にロードされたパッケージの関数が後からロードされたパッケージの関数によってマスクされる場合があります。その場合、そのような呼び出しはパッケージ名でプレフィックスを付けます。たとえば、SparkR::cume_dist(x) または dplyr::cume_dist(x) のようになります。

R の検索パスは、search() で確認できます。

移行ガイド

移行ガイドは、このページにアーカイブされています。