SparkR (Spark 上の R)
- 概要
- SparkDataFrame
- 機械学習
- R と Spark のデータ型マッピング
- Structured Streaming
- SparkR の Apache Arrow
- R 関数名の競合
- 移行ガイド
SparkR は Apache Spark 4.0.0 から非推奨となり、将来のバージョンで削除される予定です。
概要
SparkR は、R から Apache Spark を使用するための軽量なフロントエンドを提供する R パッケージです。Spark 4.0.0 では、SparkR は、選択、フィルタリング、集計などの操作をサポートする分散データフレーム実装を提供します(R のデータフレーム、dplyr と同様)が、大規模データセットに対して実行できます。SparkR は MLlib を使用した分散機械学習もサポートしています。
SparkDataFrame
SparkDataFrame は、名前付き列に編成されたデータの分散コレクションです。概念的にはリレーショナルデータベースのテーブルまたは R のデータフレームと同等ですが、内部的に豊富な最適化が施されています。SparkDataFrame は、構造化データファイル、Hive のテーブル、外部データベース、または既存のローカル R データフレームなど、幅広いソースから構築できます。
このページで説明するすべての例は、R または Spark ディストリビューションに含まれるサンプルデータを使用しており、./bin/sparkR シェルを使用して実行できます。
起動: SparkSession
SparkR へのエントリポイントは SparkSession であり、R プログラムを Spark クラスターに接続します。sparkR.session を使用して SparkSession を作成し、アプリケーション名、依存する Spark パッケージなどのオプションを渡すことができます。さらに、SparkSession を介して SparkDataFrame を操作することもできます。もし sparkR シェルから作業している場合、SparkSession はすでに作成されているため、sparkR.session を呼び出す必要はありません。
sparkR.session()RStudio からの起動
RStudio から SparkR を起動することもできます。RStudio、R シェル、Rscript、またはその他の R IDE から R プログラムを Spark クラスターに接続できます。開始するには、環境変数 SPARK_HOME が設定されていることを確認し(Sys.getenv で確認できます)、SparkR パッケージをロードし、以下のように sparkR.session を呼び出します。Spark のインストールがチェックされ、見つからなければ自動的にダウンロードおよびキャッシュされます。または、install.spark を手動で実行することもできます。
sparkR.session を呼び出すことに加えて、特定の Spark ドライバープロパティを指定することもできます。通常、これらの アプリケーションプロパティ と 実行時環境 は、ドライバー JVM プロセスがすでに起動しているため、プログラムで設定することはできません。この場合、SparkR がこれを担当します。設定するには、sparkR.session() の sparkConfig 引数に、他の設定プロパティと同様に渡します。
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))RStudio からの sparkR.session で sparkConfig に設定できる Spark ドライバープロパティは以下の通りです。
| プロパティ名 | プロパティグループ | spark-submit 相当 |
|---|---|---|
spark.master |
アプリケーションプロパティ | --master |
spark.kerberos.keytab |
アプリケーションプロパティ | --keytab |
spark.kerberos.principal |
アプリケーションプロパティ | --principal |
spark.driver.memory |
アプリケーションプロパティ | --driver-memory |
spark.driver.extraClassPath |
実行時環境 | --driver-class-path |
spark.driver.extraJavaOptions |
実行時環境 | --driver-java-options |
spark.driver.extraLibraryPath |
実行時環境 | --driver-library-path |
SparkDataFrame の作成
SparkSession を使用すると、アプリケーションはローカル R データフレーム、Hive テーブル、またはその他のデータソースから SparkDataFrame を作成できます。
ローカルデータフレームから
データフレームを作成する最も簡単な方法は、ローカル R データフレームを SparkDataFrame に変換することです。具体的には、as.DataFrame または createDataFrame を使用し、ローカル R データフレームを渡して SparkDataFrame を作成できます。例として、以下は R の faithful データセットに基づいて SparkDataFrame を作成するものです。
df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74データソースから
SparkR は、SparkDataFrame インターフェースを介して、さまざまなデータソースに対する操作をサポートしています。このセクションでは、データソースを使用したデータの読み込みと保存の一般的な方法について説明します。組み込みデータソースで利用可能な特定のオプションについては、Spark SQL プログラミングガイドを参照してください。
データソースから SparkDataFrame を作成する一般的な方法は read.df です。このメソッドは、読み込むファイルのパスとデータソースのタイプを受け取り、現在アクティブな SparkSession が自動的に使用されます。SparkR は、JSON、CSV、Parquet ファイルをネイティブで読み込むことをサポートしており、サードパーティプロジェクトなどのソースから利用可能なパッケージを介して、Avro のような一般的なファイル形式のデータソースコネクタを見つけることができます。これらのパッケージは、spark-submit または sparkR コマンドで --packages を指定して追加するか、インタラクティブな R シェルまたは RStudio から SparkSession を初期化する際に sparkPackages パラメータを渡すことによって追加できます。
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.13:4.0.0")例の JSON 入力ファイルを使用してデータソースの使用方法を見てみましょう。ここで使用されているファイルは、典型的な JSON ファイルではないことに注意してください。ファイルの各行は、個別の自己完結型の有効な JSON オブジェクトを含む必要があります。詳細については、JSON Lines テキスト形式(改行区切り JSON とも呼ばれます)を参照してください。その結果、通常の複数行 JSON ファイルはほとんどの場合失敗します。
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
## age name
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))データソース API は、CSV 形式の入力ファイルをネイティブでサポートしています。詳細については、SparkR の read.df API ドキュメントを参照してください。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")データソース API は、SparkDataFrame を複数のファイル形式に保存するためにも使用できます。たとえば、前の例の SparkDataFrame を write.df を使用して Parquet ファイルに保存できます。
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")Hive テーブルから
Hive テーブルから SparkDataFrame を作成することもできます。これを行うには、Hive MetaStore のテーブルにアクセスできる Hive サポート付きの SparkSession を作成する必要があります。Spark は Hive サポートでビルドされている必要があり、詳細については SQL プログラミングガイドで確認できます。SparkR では、デフォルトで Hive サポートが有効な SparkSession (enableHiveSupport = TRUE)を作成しようとします。
sparkR.session()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311SparkDataFrame の操作
SparkDataFrame は、構造化データ処理を行うための多数の関数をサポートしています。ここでは基本的な例をいくつか示し、完全なリストは API ドキュメントで見つけることができます。
行、列の選択
# Create the SparkDataFrame
df <- as.DataFrame(faithful)
# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]
# Select only the "eruptions" column
head(select(df, df$eruptions))
## eruptions
##1 3.600
##2 1.800
##3 3.333
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
## eruptions waiting
##1 1.750 47
##2 1.750 47
##3 1.867 48グループ化、集計
SparkR データフレームは、グループ化後にデータを集計するためによく使用される関数を多数サポートしています。たとえば、以下に示すように、faithful データセットの waiting 時間のヒストグラムを計算できます。
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
## waiting count
##1 70 4
##2 67 1
##3 69 2
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
## waiting count
##1 78 15
##2 83 14
##3 81 13標準的な集計に加えて、SparkR は OLAP cube 演算子である cube
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 NA 140.8 4 22.8
##2 4 75.7 4 30.4
##3 8 400.0 3 19.2
##4 8 318.0 3 15.5
##5 NA 351.0 NA 15.8
##6 NA 275.8 NA 16.3と rollup
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 4 75.7 4 30.4
##2 8 400.0 3 19.2
##3 8 318.0 3 15.5
##4 4 78.7 NA 32.4
##5 8 304.0 3 15.2
##6 4 79.0 NA 27.3列に対する操作
もサポートしています。SparkR には、データ処理や集計中に列に直接適用できる多くの関数も用意されています。以下の例は、基本的な算術関数の使用方法を示しています。
# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440ユーザー定義関数の適用
SparkR では、いくつかの種類のユーザー定義関数をサポートしています。
dapply または dapplyCollect を使用して、指定された関数を大規模データセットに実行する
dapply
SparkDataFrame の各パーティションに関数を適用します。適用される関数は、各パーティションに対応する data.frame が渡される単一のパラメータを持つ必要があります。関数の出力は data.frame である必要があります。Schema は、結果の SparkDataFrame の行形式を指定します。データ型を返す値のスキーマと一致させる必要があります。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
##4 2.283 62 3720
##5 4.533 85 5100
##6 2.883 55 3300dapplyCollect
dapply と同様に、SparkDataFrame の各パーティションに関数を適用し、結果を収集します。関数の出力は data.frame である必要があります。ただし、Schema の指定は不要です。注意点として、dapplyCollect は、すべてのパーティションで実行された UDF の出力がドライバーに転送され、ドライバーメモリに収まらない場合に失敗する可能性があります。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440入力列でグループ化し、gapply または gapplyCollect を使用して、指定された関数を大規模データセットに実行する
gapply
SparkDataFrame の各グループに関数を適用します。適用される関数は、SparkDataFrame の各グループに適用され、グループキーと、そのキーに対応する R data.frame の 2 つのパラメータを持つ必要があります。グループは SparkDataFrame の列から選択されます。関数の出力は data.frame である必要があります。Schema は、結果の SparkDataFrame の行形式を指定します。これは、Spark のデータ型に基づいて R 関数の出力スキーマを表す必要があります。返される data.frame の列名はユーザーによって設定されます。
# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900gapplyCollect
gapply と同様に、SparkDataFrame の各パーティションに関数を適用し、結果を R data.frame に収集します。関数の出力は data.frame である必要があります。ただし、スキーマの指定は不要です。注意点として、gapplyCollect は、すべてのパーティションで実行された UDF の出力がドライバーに転送され、ドライバーメモリに収まらない場合に失敗する可能性があります。
# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900spark.lapply を使用して、ローカル R 関数を分散実行する
spark.lapply
ネイティブ R の lapply と同様に、spark.lapply はリストの要素に関数を実行し、Spark を使用して計算を分散します。リストの要素に関数を適用します。これは doParallel または lapply と同様の方法で行われます。すべての計算結果は 1 台のマシンに収まる必要があります。そうでない場合は、df <- createDataFrame(list) のような処理を行い、その後 dapply を使用できます。
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# Print the summary of each model
print(model.summaries)即時実行
即時実行が有効になっている場合、SparkDataFrame が作成されると、データは R クライアントに即座に返されます。デフォルトでは、即時実行は有効になっておらず、SparkSession の起動時に設定プロパティ spark.sql.repl.eagerEval.enabled を true に設定することで有効にできます。
表示されるデータの最大行数と各列の最大文字数は、それぞれ spark.sql.repl.eagerEval.maxNumRows および spark.sql.repl.eagerEval.truncate の設定プロパティで制御できます。これらのプロパティは、即時実行が有効な場合にのみ有効です。これらのプロパティが明示的に設定されていない場合、デフォルトでは、最大 20 行、各列最大 20 文字までのデータが表示されます。
# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))
# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")
# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2
##+-------+-----+
##|waiting|count|
##+-------+-----+
##| 43.0| 1|
##| 45.0| 3|
##| 46.0| 5|
##| 47.0| 4|
##| 48.0| 3|
##| 49.0| 5|
##| 50.0| 5|
##| 51.0| 6|
##| 52.0| 5|
##| 53.0| 7|
##+-------+-----+
##only showing top 10 rowssparkR シェルで即時実行を有効にするには、--conf オプションに spark.sql.repl.eagerEval.enabled=true 設定プロパティを追加してください。
SparkR からの SQL クエリの実行
SparkDataFrame は、Spark SQL で一時ビューとして登録することもでき、そのデータに対する SQL クエリを実行できます。sql 関数により、アプリケーションは SQL クエリをプログラムで実行し、結果を SparkDataFrame として返すことができます。
# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")
# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
##1 Justin機械学習
アルゴリズム
SparkR は現在、以下の機械学習アルゴリズムをサポートしています。
分類
spark.logit:ロジスティック回帰spark.mlp:多層パーセプトロン (MLP)spark.naiveBayes:ナイーブベイズspark.svmLinear:線形サポートベクターマシンspark.fmClassifier:因子分解マシン分類器
回帰
spark.survreg:加速故障時間 (AFT) 生存モデルspark.glmまたはglm:一般化線形モデル (GLM)spark.isoreg:等otonic 回帰spark.lm:線形回帰spark.fmRegressor:因子分解マシン回帰器
ツリー
spark.decisionTree:回帰および分類のための決定木spark.gbt:回帰および分類のための勾配ブーストツリーspark.randomForest:回帰および分類のためのランダムフォレスト
クラスタリング
spark.bisectingKmeans:二分割 k-meansspark.gaussianMixture:ガウス混合モデル (GMM)spark.kmeans:k-meansspark.lda:潜在ディリクレ配分法 (LDA)spark.powerIterationClustering (PIC):べき乗法クラスタリング (PIC)
協調フィルタリング
頻出パターンマイニング
統計
spark.kstest:コルモゴロフ・スミルノフ検定
内部的には、SparkR は MLlib を使用してモデルをトレーニングします。例コードについては、MLlib ユーザーガイドの対応するセクションを参照してください。ユーザーは、summary を呼び出して適合モデルの概要を表示したり、predict を呼び出して新しいデータで予測を行ったり、write.ml/read.ml を呼び出して適合モデルを保存/ロードしたりできます。SparkR は、モデルフィッティングのために利用可能な R の数式演算子の一部をサポートしており、これには '~'、'.'、':'、'+'、'-' が含まれます。
モデルの永続化
以下の例は、SparkR を使用して MLlib モデルを保存/ロードする方法を示しています。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)R と Spark のデータ型マッピング
| R | Spark |
|---|---|
| byte | byte |
| integer | integer |
| float | float |
| double | double |
| numeric | double |
| character | string |
| string | string |
| binary | binary |
| raw | binary |
| logical | boolean |
| POSIXct | timestamp |
| POSIXlt | timestamp |
| 日付 | date |
| array | array |
| list | array |
| env | map |
Structured Streaming
SparkR は、Structured Streaming API をサポートしています。Structured Streaming は、Spark SQL エンジン上に構築されたスケーラブルで耐障害性の高いストリーム処理エンジンです。詳細については、Structured Streaming プログラミングガイドの R API を参照してください。
SparkR の Apache Arrow
Apache Arrow は、Spark が JVM と R プロセスの間で効率的にデータを転送するために使用するインメモリ列指向データ形式です。PySpark の最適化についても参照してください。Apache Arrow を使用した Pandas の PySpark 使用ガイド。このガイドは、いくつかの重要なポイントとともに、SparkR で Arrow 最適化を使用する方法を説明することを目的としています。
Arrow のインストールを確認する
Arrow R ライブラリは CRAN で利用可能であり、以下のようにインストールできます。
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
詳細については、Apache Arrow の公式ドキュメントを参照してください。
注意点として、Arrow R パッケージがインストールされており、すべてのクラスターノードで利用可能であることを確認する必要があります。現在のサポートされている最小バージョンは 1.0.0 ですが、SparkR の Arrow 最適化は実験的であるため、マイナーリリース間で変更される可能性があります。
R DataFrame との変換、dapply、gapply のための有効化
Arrow 最適化は、collect(spark_df) を使用して Spark DataFrame を R DataFrame に変換する場合、createDataFrame(r_df) を使用して R DataFrame から Spark DataFrame を作成する場合、dapply(...) を介して R ネイティブ関数を各パーティションに適用する場合、および gapply(...) を介してグループ化されたデータに R ネイティブ関数を適用する場合に利用できます。これらの操作を実行する際に Arrow を使用するには、ユーザーはまず Spark 設定 'spark.sql.execution.arrow.sparkr.enabled' を 'true' に設定する必要があります。これはデフォルトでは無効になっています。
最適化が有効になっているかどうかにかかわらず、SparkR は同じ結果を生成します。さらに、Spark DataFrame と R DataFrame の間の変換は、実際の計算の前に最適化が何らかの理由で失敗した場合、自動的に非 Arrow 最適化実装にフォールバックします。
# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)
# Converts Spark DataFrame to an R DataFrame
collect(spark_df)
# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
# Apply an R native function to grouped data.
collect(gapply(spark_df,
"gear",
function(key, group) {
data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
},
structType("gear double, disp boolean")))注意点として、Arrow を使用している場合でも、collect(spark_df) は DataFrame のすべてのレコードをドライバープログラムに収集することになるため、データの小さなサブセットに対してのみ実行する必要があります。さらに、gapply(...) および dapply(...) で指定された出力スキーマは、指定された関数によって返される R DataFrame のスキーマと一致している必要があります。
サポートされている SQL 型
現在、Arrow ベースの変換では、FloatType、BinaryType、ArrayType、StructType、MapType を除くすべての Spark SQL データ型がサポートされています。
R 関数名の競合
R で新しいパッケージをロードおよびアタッチする際に、関数が別の関数をマスクする名前の競合が発生する可能性があります。
SparkR パッケージによってマスクされる関数は以下の通りです。
| マスクされた関数 | アクセス方法 |
|---|---|
package:stats の cov |
|
package:stats の filter |
|
package:base の sample |
base::sample(x, size, replace = FALSE, prob = NULL) |
SparkR の一部は dplyr パッケージをモデルとしているため、SparkR の一部の関数は dplyr の関数と同じ名前を共有しています。2 つのパッケージのロード順序によっては、最初にロードされたパッケージの関数が後からロードされたパッケージの関数によってマスクされる場合があります。その場合、そのような呼び出しはパッケージ名でプレフィックスを付けます。たとえば、SparkR::cume_dist(x) または dplyr::cume_dist(x) のようになります。
R の検索パスは、search() で確認できます。
移行ガイド
移行ガイドは、このページにアーカイブされています。