SparkR (R on Spark)
- 概要
- SparkDataFrame
- 機械学習
- RとSpark間のデータ型マッピング
- 構造化ストリーミング
- SparkRでのApache Arrow
- R関数名の競合
- 移行ガイド
概要
SparkRは、RからApache Sparkを使用するための軽量フロントエンドを提供するRパッケージです。Spark 3.5.1では、SparkRは選択、フィルタリング、集計などの操作(Rデータフレーム、dplyrと同様)をサポートする分散データフレームの実装を提供しますが、大規模なデータセットを対象としています。SparkRは、MLlibを使用した分散機械学習もサポートしています。
SparkDataFrame
SparkDataFrameは、名前付き列に編成されたデータの分散コレクションです。これは概念的にはリレーショナルデータベースのテーブルまたはRのデータフレームと同等ですが、舞台裏ではより豊富な最適化が行われています。SparkDataFrameは、構造化データファイル、Hiveのテーブル、外部データベース、既存のローカルRデータフレームなど、幅広いソースから構築できます。
このページのすべての例では、RまたはSparkディストリビューションに含まれているサンプルデータを使用しており、./bin/sparkR
シェルを使用して実行できます。
起動: SparkSession
SparkRへのエントリポイントは、RプログラムをSparkクラスターに接続するSparkSession
です。sparkR.session
を使用してSparkSession
を作成し、アプリケーション名、依存するSparkパッケージなどのオプションを渡すことができます。さらに、SparkSession
を介してSparkDataFrameを操作することもできます。sparkR
シェルから作業している場合、SparkSession
はすでに作成されているはずであるため、sparkR.session
を呼び出す必要はありません。
sparkR.session()
RStudioからの起動
RStudioからSparkRを起動することもできます。RStudio、Rシェル、Rscript、またはその他のR IDEからRプログラムをSparkクラスターに接続できます。開始するには、SPARK_HOMEが環境で設定されていることを確認し(Sys.getenvで確認できます)、SparkRパッケージをロードし、以下のようにsparkR.session
を呼び出します。Sparkのインストールがチェックされ、見つからない場合は、自動的にダウンロードおよびキャッシュされます。または、install.spark
を手動で実行することもできます。
sparkR.session
の呼び出しに加えて、特定のSparkドライバープロパティを指定することもできます。通常、これらのアプリケーションプロパティおよびランタイム環境は、ドライバーJVMプロセスが開始されているため、プログラムで設定することはできません。この場合、SparkRがこれを行います。それらを設定するには、sparkR.session()
へのsparkConfig
引数で、他の構成プロパティと同様にそれらを渡します。
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
RStudioからsparkR.session
でsparkConfig
に設定できる、次のSparkドライバープロパティがあります。
プロパティ名 | プロパティグループ | spark-submit 相当 |
---|---|---|
spark.master |
アプリケーションプロパティ | --master |
spark.kerberos.keytab |
アプリケーションプロパティ | --keytab |
spark.kerberos.principal |
アプリケーションプロパティ | --principal |
spark.driver.memory |
アプリケーションプロパティ | --driver-memory |
spark.driver.extraClassPath |
ランタイム環境 | --driver-class-path |
spark.driver.extraJavaOptions |
ランタイム環境 | --driver-java-options |
spark.driver.extraLibraryPath |
ランタイム環境 | --driver-library-path |
SparkDataFrameの作成
SparkSession
を使用すると、アプリケーションはローカルRデータフレーム、Hiveテーブル、または他のデータソースからSparkDataFrame
を作成できます。
ローカルデータフレームから
データフレームを作成する最も簡単な方法は、ローカルRデータフレームをSparkDataFrameに変換することです。具体的には、as.DataFrame
またはcreateDataFrame
を使用し、ローカルRデータフレームを渡してSparkDataFrameを作成できます。例として、以下はRのfaithful
データセットを使用してSparkDataFrame
を作成します。
df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
## eruptions waiting
##1 3.600 79
##2 1.800 54
##3 3.333 74
データソースから
SparkRは、SparkDataFrame
インターフェイスを介してさまざまなデータソースでの操作をサポートしています。このセクションでは、データソースを使用したデータのロードと保存の一般的な方法について説明します。Spark SQLプログラミングガイドで、組み込みデータソースで使用できる特定のオプションを確認できます。
データソースからSparkDataFrameを作成するための一般的な方法は、read.df
です。このメソッドは、ロードするファイルのパスとデータソースのタイプを受け取り、現在アクティブなSparkSessionが自動的に使用されます。SparkRは、JSON、CSV、Parquetファイルの読み取りをネイティブにサポートしており、サードパーティプロジェクトなどのソースから入手できるパッケージを使用することで、Avroのような一般的なファイル形式のデータソースコネクタを見つけることができます。これらのパッケージは、spark-submit
またはsparkR
コマンドで--packages
を指定するか、インタラクティブRシェルまたはRStudioでSparkSessionを初期化する際にsparkPackages
パラメータを指定することで追加できます。
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.5.1")
JSON入力ファイルを使用してデータソースを使用する方法を見てみましょう。ここで使用されるファイルは、典型的なJSONファイルではないことに注意してください。ファイル内の各行には、個別の、自己完結型の有効なJSONオブジェクトが含まれている必要があります。詳細については、JSON Linesテキスト形式 (改行区切りJSONとも呼ばれる)を参照してください。その結果、通常の複数行のJSONファイルはほとんどの場合失敗します。
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
## age name
##1 NA Michael
##2 30 Andy
##3 19 Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
# |-- age: long (nullable = true)
# |-- name: string (nullable = true)
# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))
データソースAPIは、CSV形式の入力ファイルをネイティブにサポートしています。詳細については、SparkRのread.df APIドキュメントを参照してください。
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
データソースAPIを使用して、SparkDataFrameを複数のファイル形式で保存することもできます。たとえば、前の例のSparkDataFrameをwrite.df
を使用してParquetファイルに保存できます。
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
Hiveテーブルから
HiveテーブルからSparkDataFrameを作成することもできます。これを行うには、Hive MetaStoreのテーブルにアクセスできるHiveサポート付きのSparkSessionを作成する必要があります。SparkがHiveサポート付きでビルドされている必要があり、詳細についてはSQLプログラミングガイドに記載されています。SparkRでは、デフォルトで、Hiveサポートが有効な(enableHiveSupport = TRUE
)SparkSessionを作成しようとします。
sparkR.session()
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)
## key value
## 1 238 val_238
## 2 86 val_86
## 3 311 val_311
SparkDataFrameの操作
SparkDataFrameは、構造化データ処理を行うための多数の関数をサポートしています。ここでは、基本的な例をいくつか示します。APIドキュメントで完全なリストを確認できます
行、列の選択
# Create the SparkDataFrame
df <- as.DataFrame(faithful)
# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]
# Select only the "eruptions" column
head(select(df, df$eruptions))
## eruptions
##1 3.600
##2 1.800
##3 3.333
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
## eruptions waiting
##1 1.750 47
##2 1.750 47
##3 1.867 48
グループ化、集計
SparkRデータフレームは、グループ化後のデータを集計するために一般的に使用される多くの関数をサポートしています。たとえば、以下に示すように、faithful
データセットでのwaiting
時間のヒストグラムを計算できます
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
## waiting count
##1 70 4
##2 67 1
##3 69 2
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
## waiting count
##1 78 15
##2 83 14
##3 81 13
標準的な集計に加えて、SparkRはOLAPキューブ演算子cube
をサポートしています
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 NA 140.8 4 22.8
##2 4 75.7 4 30.4
##3 8 400.0 3 19.2
##4 8 318.0 3 15.5
##5 NA 351.0 NA 15.8
##6 NA 275.8 NA 16.3
およびrollup
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 4 75.7 4 30.4
##2 8 400.0 3 19.2
##3 8 318.0 3 15.5
##4 4 78.7 NA 32.4
##5 8 304.0 3 15.2
##6 4 79.0 NA 27.3
列の操作
SparkRは、データ処理および集計中に列に直接適用できる多数の関数も提供します。以下の例は、基本的な算術関数の使用法を示しています。
# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
ユーザー定義関数の適用
SparkRでは、いくつかの種類のユーザー定義関数をサポートしています
dapply
または dapplyCollect
を使用して大規模データセットで特定の関数を実行
dapply
SparkDataFrame
の各パーティションに関数を適用します。SparkDataFrame
の各パーティションに適用される関数は、パラメータを1つだけ持つ必要があり、各パーティションに対応するdata.frame
が渡されます。関数の出力はdata.frame
である必要があります。スキーマは、結果として得られるSparkDataFrame
の行形式を指定します。データ型の戻り値に一致する必要があります。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
##4 2.283 62 3720
##5 4.533 85 5100
##6 2.883 55 3300
dapplyCollect
dapply
と同様に、SparkDataFrame
の各パーティションに関数を適用し、結果を回収します。関数の出力はdata.frame
である必要があります。ただし、スキーマを渡す必要はありません。dapplyCollect
は、すべてのパーティションで実行されたUDFの出力がドライバにプルできず、ドライバのメモリに収まらない場合に失敗する可能性があることに注意してください。
# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
## eruptions waiting waiting_secs
##1 3.600 79 4740
##2 1.800 54 3240
##3 3.333 74 4440
入力列でグループ化し、gapply
または gapplyCollect
を使用して大規模データセットで特定の関数を実行
gapply
SparkDataFrame
の各グループに関数を適用します。この関数は、SparkDataFrame
の各グループに適用され、グループ化キーと、そのキーに対応するRのdata.frame
の2つのパラメータのみを持つ必要があります。グループは、SparkDataFrame
の列から選択されます。関数の出力はdata.frame
である必要があります。スキーマは、結果のSparkDataFrame
の行形式を指定します。これは、Sparkのデータ型に基づいて、R関数の出力スキーマを表す必要があります。返されるdata.frame
の列名はユーザーによって設定されます。
# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
gapplyCollect
gapply
と同様に、SparkDataFrame
の各パーティションに関数を適用し、結果をRのdata.frameに回収します。関数の出力はdata.frame
である必要があります。ただし、スキーマを渡す必要はありません。gapplyCollect
は、すべてのパーティションで実行されたUDFの出力がドライバにプルできず、ドライバのメモリに収まらない場合に失敗する可能性があることに注意してください。
# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
## waiting max_eruption
##1 64 5.100
##2 69 5.067
##3 71 5.033
##4 87 5.000
##5 63 4.933
##6 89 4.900
spark.lapply
を使用して分散されたローカルR関数を実行
spark.lapply
ネイティブRのlapply
と同様に、spark.lapply
は、要素のリストに対して関数を実行し、Sparkで計算を分散します。doParallel
またはlapply
と同様の方法で、リストの要素に関数を適用します。すべての計算の結果は、単一のマシンに収まる必要があります。そうでない場合は、df <- createDataFrame(list)
のようにしてから、dapply
を使用できます。
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# Print the summary of each model
print(model.summaries)
即時実行
即時実行が有効になっている場合、SparkDataFrame
が作成されると、データはすぐにRクライアントに返されます。デフォルトでは、即時実行は有効になっていません。有効にするには、SparkSession
が起動されたときに、構成プロパティspark.sql.repl.eagerEval.enabled
をtrue
に設定します。
表示するデータの最大行数と列ごとの最大文字数は、それぞれspark.sql.repl.eagerEval.maxNumRows
およびspark.sql.repl.eagerEval.truncate
構成プロパティで制御できます。これらのプロパティは、即時実行が有効になっている場合にのみ有効です。これらのプロパティが明示的に設定されていない場合、デフォルトでは最大20行、列ごとに最大20文字のデータが表示されます。
# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))
# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")
# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2
##+-------+-----+
##|waiting|count|
##+-------+-----+
##| 43.0| 1|
##| 45.0| 3|
##| 46.0| 5|
##| 47.0| 4|
##| 48.0| 3|
##| 49.0| 5|
##| 50.0| 5|
##| 51.0| 6|
##| 52.0| 5|
##| 53.0| 7|
##+-------+-----+
##only showing top 10 rows
sparkR
シェルで即時実行を有効にするには、--conf
オプションにspark.sql.repl.eagerEval.enabled=true
構成プロパティを追加してください。
SparkRからのSQLクエリの実行
SparkDataFrameは、Spark SQLで一時ビューとして登録することもでき、そのデータに対してSQLクエリを実行できます。sql
関数を使用すると、アプリケーションはプログラムでSQLクエリを実行でき、結果がSparkDataFrame
として返されます。
# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")
# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")
# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
##1 Justin
機械学習
アルゴリズム
SparkRは、現在、次の機械学習アルゴリズムをサポートしています。
分類
spark.logit
:ロジスティック回帰
spark.mlp
:多層パーセプトロン(MLP)
spark.naiveBayes
:ナイーブベイズ
spark.svmLinear
:線形サポートベクターマシン
spark.fmClassifier
:因子分解マシン分類器
回帰
spark.survreg
:加速故障時間(AFT)生存モデル
spark.glm
またはglm
:一般化線形モデル(GLM)
spark.isoreg
:等調回帰
spark.lm
:線形回帰
spark.fmRegressor
:因子分解マシン回帰器
木
spark.decisionTree
:決定木
(回帰
および分類
)spark.gbt
:勾配ブースティング木
(回帰
および分類
)spark.randomForest
:ランダムフォレスト
(回帰
および分類
)
クラスタリング
spark.bisectingKmeans
:二分k平均法
spark.gaussianMixture
:ガウス混合モデル(GMM)
spark.kmeans
:K平均法
spark.lda
:潜在的ディリクレ配分(LDA)
spark.powerIterationClustering (PIC)
:べき乗反復クラスタリング(PIC)
協調フィルタリング
頻出パターンマイニング
統計
spark.kstest
:コルモゴロフ-スミルノフ検定
内部的には、SparkRはMLlibを使用してモデルをトレーニングします。コード例については、MLlibユーザーガイドの対応するセクションを参照してください。ユーザーは、summary
を呼び出して、適合したモデルの要約を出力したり、predictを使用して新しいデータの予測を行ったり、write.ml/read.mlを使用して適合したモデルを保存/ロードしたりできます。SparkRは、モデルフィッティングのために、利用可能なRの数式演算子(’~’, ‘.’, ‘:’, ‘+’, ‘-‘など)のサブセットをサポートしています。
モデルの永続化
次の例は、SparkRでMLlibモデルを保存/ロードする方法を示しています。
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)
unlink(modelPath)
RとSpark間のデータ型マッピング
R | Spark |
---|---|
byte | byte |
integer | integer |
float | float |
double | double |
numeric | double |
character | string |
string | string |
binary | binary |
raw | binary |
logical | boolean |
POSIXct | timestamp |
POSIXlt | timestamp |
Date | date |
array | array |
list | array |
env | map |
構造化ストリーミング
SparkRは、構造化ストリーミングAPIをサポートしています。構造化ストリーミングは、Spark SQLエンジン上に構築された、スケーラブルでフォールトトレラントなストリーム処理エンジンです。詳細については、構造化ストリーミングプログラミングガイドのR APIを参照してください。
SparkRでのApache Arrow
Apache Arrowは、JVMとRプロセス間でデータを効率的に転送するためにSparkで使用される、インメモリの列指向データ形式です。PySparkの最適化(Apache Arrowを使用したPandas向けのPySpark使用ガイド)も参照してください。このガイドでは、SparkRでArrow最適化を使用する方法といくつかの重要なポイントについて説明します。
Arrowがインストールされていることを確認
Arrow RライブラリはCRANで入手でき、以下のようにインストールできます。
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
詳細については、Apache Arrowの公式ドキュメントを参照してください。
すべてのクラスターノードにArrow Rパッケージがインストールされ、利用可能であることを確認する必要があります。現在サポートされている最小バージョンは1.0.0です。ただし、SparkRでのArrow最適化は実験的なものであるため、マイナーリリース間で変更される可能性があります。
R DataFrameとの間の変換、dapply
および gapply
の有効化
Arrow最適化は、collect(spark_df)
の呼び出しを使用してSpark DataFrameをR DataFrameに変換するとき、createDataFrame(r_df)
を使用してR DataFrameからSpark DataFrameを作成するとき、dapply(...)
を介して各パーティションにRネイティブ関数を適用するとき、およびgapply(...)
を介してグループ化されたデータにRネイティブ関数を適用するときに使用できます。これらを実行するときにArrowを使用するには、最初にSpark構成 'spark.sql.execution.arrow.sparkr.enabled'を 'true' に設定する必要があります。これはデフォルトでは無効になっています。
最適化が有効かどうかに関わらず、SparkR は同じ結果を生成します。さらに、Spark DataFrame と R DataFrame 間の変換は、実際の計算前に何らかの理由で最適化が失敗した場合、自動的に Arrow 最適化ではない実装にフォールバックします。
# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
# Converts Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)
# Converts Spark DataFrame to an R DataFrame
collect(spark_df)
# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
# Apply an R native function to grouped data.
collect(gapply(spark_df,
"gear",
function(key, group) {
data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
},
structType("gear double, disp boolean")))
Arrow を使用する場合でも、collect(spark_df)
は DataFrame 内のすべてのレコードをドライバプログラムに収集するため、データの小さなサブセットに対してのみ実行する必要があります。さらに、gapply(...)
および dapply(...)
で指定された出力スキーマは、指定された関数によって返される R DataFrame のスキーマと一致する必要があります。
サポートされているSQL型
現在、FloatType
、BinaryType
、ArrayType
、StructType
、および MapType
を除く、すべての Spark SQL データ型が Arrow ベースの変換でサポートされています。
R関数名の競合
R で新しいパッケージをロードしてアタッチする場合、関数が別の関数をマスクする名前の競合が発生する可能性があります。
以下の関数は SparkR パッケージによってマスクされています。
マスクされた関数 | アクセス方法 |
---|---|
package:stats 内の cov |
|
package:stats 内の filter |
|
package:base 内の sample |
base::sample(x, size, replace = FALSE, prob = NULL) |
SparkR の一部は dplyr
パッケージに基づいてモデル化されているため、SparkR の特定の関数は dplyr
の関数と同じ名前を共有しています。2 つのパッケージのロード順序に応じて、最初にロードされたパッケージの一部の関数は、後でロードされたパッケージの関数によってマスクされます。このような場合は、例えば SparkR::cume_dist(x)
または dplyr::cume_dist(x)
のように、パッケージ名をプレフィックスとして付与して呼び出してください。
R での検索パスは、search()
で確認できます。
移行ガイド
移行ガイドは、このページでアーカイブされました。