コンテンツへスキップ

SparkR は Apache Spark 4.0.0 で非推奨となり、将来のバージョンで削除される予定です。

概要

SparkR は、R から Apache Spark を使用するための軽量なフロントエンドを提供する R パッケージです。Spark 4.0.0 では、SparkR は、選択、フィルタリング、集計などのデータ処理操作をサポートする分散データフレーム実装と、MLlib を使用した分散機械学習を提供します。

はじめに

ローカルマシンで実行する例から始め、SparkR の使用方法の概要(データ取り込み、データ処理、機械学習)を説明します。

まず、パッケージをロードしてアタッチしましょう。

SparkSession は SparkR へのエントリポイントであり、R プログラムを Spark クラスタに接続します。sparkR.session を使用して SparkSession を作成し、アプリケーション名、依存する Spark パッケージなどのオプションを渡すことができます。

ローカルモードで実行するデフォルト設定を使用します。以前のインストールが見つからない場合は、Spark パッケージをバックグラウンドで自動ダウンロードします。セットアップの詳細については、「Spark Session」を参照してください。

## Java ref type org.apache.spark.sql.classic.SparkSession id 1

SparkR の操作は、SparkDataFrame という R クラスを中心に展開されます。これは、名前付き列に編成されたデータの分散コレクションであり、概念的にはリレーショナルデータベースのテーブルまたは R のデータフレームと同等ですが、内部でより豊富な最適化が行われています。

SparkDataFrame は、構造化データファイル、Hive のテーブル、外部データベース、または既存のローカル R データフレームなど、さまざまなソースから構築できます。たとえば、ローカル R データフレームから SparkDataFrame を作成します。

cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

head または showDF 関数を使用して、SparkDataFrame の最初の数行を表示できます。

head(carsDF)
##               model  mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1         Mazda RX4 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
## 2     Mazda RX4 Wag 21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
## 3        Datsun 710 22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
## 4    Hornet 4 Drive 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
## 5 Hornet Sportabout 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
## 6           Valiant 18.1   6  225 105 2.76 3.460 20.22  1  0    3    1

filterselect などの一般的なデータ処理操作は、SparkDataFrame でサポートされています。

carsSubDF <- select(carsDF, "model", "mpg", "hp")
carsSubDF <- filter(carsSubDF, carsSubDF$hp >= 200)
head(carsSubDF)
##                 model  mpg  hp
## 1          Duster 360 14.3 245
## 2  Cadillac Fleetwood 10.4 205
## 3 Lincoln Continental 10.4 215
## 4   Chrysler Imperial 14.7 230
## 5          Camaro Z28 13.3 245
## 6      Ford Pantera L 15.8 264

SparkR は、グループ化後に多くの一般的な集計関数を使用できます。

carsGPDF <- summarize(groupBy(carsDF, carsDF$gear), count = n(carsDF$gear))
head(carsGPDF)
##   gear count
## 1    4    12
## 2    3    15
## 3    5     5

結果の carsDF および carsSubDFSparkDataFrame オブジェクトです。R の data.frame に戻すには、collect を使用できます。注意: collect() は、Spark ドライバーとして機能するクライアントにすべての分散 DataFrame をフェッチするため、インタラクティブ環境のメモリが不足する可能性があります。

carsGP <- collect(carsGPDF)
class(carsGP)
## [1] "data.frame"

SparkR は、一般的に使用される機械学習アルゴリズムのいくつかをサポートしています。内部では、SparkR は MLlib を使用してモデルをトレーニングします。ユーザーは summary を呼び出して適合モデルの概要を表示したり、predict を呼び出して新しいデータで予測を作成したり、write.ml/read.ml を呼び出して適合モデルを保存/ロードしたりできます。

SparkR は、R の式演算子の一部(「〜」、「.」、「:」、「+」、「-」)をモデルフィッティングでサポートしています。線形回帰を例として使用します。

model <- spark.glm(carsDF, mpg ~ wt + cyl)

結果は、carsDF の対応する data.frame mtcars に適用された R の glm 関数によって返される結果と一致します。実際、一般化線形モデルの場合、SparkDataFrame 用に glm を明示的に公開しているため、上記は model <- glm(mpg ~ wt + cyl, data = carsDF) と同等です。

summary(model)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -4.2893  -1.7085  -0.4713   1.5729   6.1004  
## 
## Coefficients:
##              Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)   39.6863     1.71498  23.1409  0.00000000
## wt            -3.1910     0.75691  -4.2158  0.00022202
## cyl           -1.5078     0.41469  -3.6360  0.00106428
## 
## (Dispersion parameter for gaussian family taken to be 6.592137)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  191.17  on 29  degrees of freedom
## AIC: 156
## 
## Number of Fisher Scoring iterations: 1

モデルは write.ml で保存し、read.ml でロードし直すことができます。

write.ml(model, path = "/HOME/tmp/mlModel/glmModel")

最後に、Spark セッションを停止するには、次を実行します。

セットアップ

インストール

他の多くの R パッケージとは異なり、SparkR を使用するには、Apache Spark の追加インストールが必要です。Spark のインストールは、SparkR プログラムをコンパイルおよび実行するバックエンドプロセスを実行するために使用されます。

SparkR パッケージをインストールした後、前のセクションで説明したように sparkR.session を呼び出して開始すると、Spark のインストールがチェックされます。インタラクティブシェル(例: R、RStudio)から SparkR を操作している場合、Spark が見つからない場合は自動的にダウンロードおよびキャッシュされます。または、使いやすい関数 install.spark を提供しており、これを手動で実行できます。コンピュータに Spark がインストールされていない場合は、Apache Spark Web サイトからダウンロードできます。

すでに Spark がインストールされている場合は、再度インストールする必要はありません。sparkHome 引数を sparkR.session に渡して、既存の Spark インストール場所を SparkR に通知できます。

sparkR.session(sparkHome = "/HOME/spark")

Spark Session

sparkHome に加えて、sparkR.session で他の多くのオプションを指定できます。完全なリストについては、「起動: SparkSession」および「SparkR API ドキュメント」を参照してください。

特に、以下の Spark ドライバープロパティを sparkConfig で設定できます。

プロパティ名 プロパティグループ spark-submit 相当
spark.driver.memory アプリケーションプロパティ --driver-memory
spark.driver.extraClassPath 実行時環境 --driver-class-path
spark.driver.extraJavaOptions 実行時環境 --driver-java-options
spark.driver.extraLibraryPath 実行時環境 --driver-library-path
spark.kerberos.keytab アプリケーションプロパティ --keytab
spark.kerberos.principal アプリケーションプロパティ --principal

Windows ユーザーの場合: オペレーティングシステム間でファイルプレフィックスが異なるため、誤ったプレフィックスの可能性による問題を回避するには、SparkSession を開始するときに spark.sql.warehouse.dir を指定することが現在の回避策です。

spark_warehouse_path <- file.path(path.expand('~'), "spark-warehouse")
sparkR.session(spark.sql.warehouse.dir = spark_warehouse_path)

クラスタモード

SparkR はリモート Spark クラスタに接続できます。「クラスタモードの概要」は、さまざまな Spark クラスタモードの優れた入門書です。

SparkR をリモート Spark クラスタに接続する際は、ローカルマシン上の Spark のバージョンと Hadoop のバージョンがクラスタ上の対応するバージョンと一致していることを確認してください。現在の SparkR パッケージは、

## [1] "Spark 4.0.0"

ローカルコンピュータとリモートクラスタの両方で使用する必要があります。

接続するには、マスターノードの URL を sparkR.session に渡します。完全なリストは、「Spark マスター URL」で確認できます。たとえば、ローカルスタンドアロン Spark マスターに接続するには、次のように呼び出すことができます。

sparkR.session(master = "spark://local:7077")

YARN クラスタの場合、SparkR はクライアントモードをサポートしており、マスターは「yarn」に設定されています。

sparkR.session(master = "yarn")

YARN クラスタモードは、現在のバージョンではサポートされていません。

データインポート

ローカルデータフレーム

最も簡単な方法は、ローカル R データフレームを SparkDataFrame に変換することです。具体的には、as.DataFrame または createDataFrame を使用し、ローカル R データフレームを渡して SparkDataFrame を作成できます。例として、以下は R の faithful データセットに基づいて SparkDataFrame を作成します。

df <- as.DataFrame(faithful)
head(df)
##   eruptions waiting
## 1     3.600      79
## 2     1.800      54
## 3     3.333      74
## 4     2.283      62
## 5     4.533      85
## 6     2.883      55

データソース

SparkR は、SparkDataFrame インターフェースを通じてさまざまなデータソースの操作をサポートしています。組み込みデータソースで利用可能な詳細なオプションについては、Spark SQL プログラミングガイドを参照してください。

データソースから SparkDataFrame を作成する一般的な方法は read.df です。このメソッドは、ロードするファイルのパスとデータソースのタイプを受け取り、現在アクティブな Spark セッションが自動的に使用されます。SparkR は、CSV、JSON、Parquet ファイルをネイティブに読み取ることをサポートしており、Spark Packages を通じて Avro のような一般的なファイル形式のデータソースコネクタを見つけることができます。これらのパッケージは、sparkR.session を使用して SparkSession を初期化する際に sparkPackages パラメータで追加できます。

sparkR.session(sparkPackages = "com.databricks:spark-avro_2.12:3.0.0")

CSV 入力ファイルを使用したデータソースの使用方法を見てみましょう。詳細については、SparkR の read.df API ドキュメントを参照してください。

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

データソース API は、JSON 形式の入力ファイルをネイティブでサポートしています。ここで使用されているファイルは、典型的な JSON ファイルではないことに注意してください。ファイル内の各行には、個別の、自己完結型の有効な JSON オブジェクトが含まれている必要があります。その結果、通常の複数行 JSON ファイルはほとんどの場合失敗します。

ここで使用されている生の JSON ファイルの最初の 2 行を見てみましょう。

filePath <- paste0(sparkR.conf("spark.home"),
                         "/examples/src/main/resources/people.json")
readLines(filePath, n = 2L)
## [1] "{\"name\":\"Michael\"}"          "{\"name\":\"Andy\", \"age\":30}"

read.df を使用して、それを SparkDataFrame に読み込みます。

people <- read.df(filePath, "json")
count(people)
## [1] 3
head(people)
##   age    name
## 1  NA Michael
## 2  30    Andy
## 3  19  Justin

SparkR は JSON ファイルからスキーマを自動的に推論します。

printSchema(people)
## root
##  |-- age: long (nullable = true)
##  |-- name: string (nullable = true)

複数の JSON ファイルを読み込みたい場合は、read.json を使用できます。

people <- read.json(paste0(Sys.getenv("SPARK_HOME"),
                           c("/examples/src/main/resources/people.json",
                             "/examples/src/main/resources/people.json")))
count(people)
## [1] 6

データソース API は、SparkDataFrames を複数のファイル形式に保存するためにも使用できます。たとえば、前の例の SparkDataFramewrite.df を使用して Parquet ファイルに保存できます。

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

Hive テーブル

Hive テーブルから SparkDataFrames を作成することもできます。これを行うには、Hive MetaStore のテーブルにアクセスできる Hive サポート付きの SparkSession を作成する必要があります。Spark が Hive サポート付きでビルドされていることを確認してください。詳細については、SQL プログラミングガイドを参照してください。SparkR では、デフォルトで Hive サポートが有効な SparkSession (enableHiveSupport = TRUE) を作成しようとします。

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

txtPath <- paste0(sparkR.conf("spark.home"), "/examples/src/main/resources/kv1.txt")
sqlCMD <- sprintf("LOAD DATA LOCAL INPATH '%s' INTO TABLE src", txtPath)
sql(sqlCMD)

results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)

データ処理

dplyr ユーザー向け: SparkR はデータ処理で dplyr と同様のインターフェースを持っています。しかし、最初に言及する価値のあるいくつかの顕著な違いがあります。ここでは、dfSparkDataFramecol を列名として使用します。

  1. 列を示します。SparkR は、列名を表す文字列または $ で構築された Column オブジェクトのいずれかを使用して列を示します。たとえば、dfcol を選択するには、select(df, "col") または select(df, df$col) と記述できます。

  2. 条件を記述します。SparkR では、Column オブジェクト表現を条件に直接挿入することも、SparkDataFrame を参照せずに、文字列を使用して条件を記述することもできます。たとえば、値が 1 より大きい行を選択するには、filter(df, df$col > 1) または filter(df, "col > 1") と記述できます。

より具体的な例を以下に示します。

dplyr SparkR
select(mtcars, mpg, hp) select(carsDF, "mpg", "hp")
filter(mtcars, mpg > 20, hp > 100) filter(carsDF, carsDF$mpg > 20, carsDF$hp > 100)

その他の違いについては、特定のメソッドで言及します。

上記で作成した SparkDataFrame carsDF を使用します。SparkDataFrame に関する基本的な情報を取得できます。

carsDF
## SparkDataFrame[model:string, mpg:double, cyl:double, disp:double, hp:double, drat:double, wt:double, qsec:double, vs:double, am:double, gear:double, carb:double]

スキーマをツリー形式で出力します。

printSchema(carsDF)
## root
##  |-- model: string (nullable = true)
##  |-- mpg: double (nullable = true)
##  |-- cyl: double (nullable = true)
##  |-- disp: double (nullable = true)
##  |-- hp: double (nullable = true)
##  |-- drat: double (nullable = true)
##  |-- wt: double (nullable = true)
##  |-- qsec: double (nullable = true)
##  |-- vs: double (nullable = true)
##  |-- am: double (nullable = true)
##  |-- gear: double (nullable = true)
##  |-- carb: double (nullable = true)

SparkDataFrame 操作

行、列の選択

SparkDataFrames は、構造化データ処理を行うための多くの関数をサポートしています。ここでは基本的な例をいくつか紹介します。完全なリストは API ドキュメントにあります。

列名を文字列として渡すこともできます。

head(select(carsDF, "mpg"))
##    mpg
## 1 21.0
## 2 21.0
## 3 22.8
## 4 21.4
## 5 18.7
## 6 18.1

mpg が 20 マイル/ガロン未満の行のみを保持するように SparkDataFrame をフィルタリングします。

head(filter(carsDF, carsDF$mpg < 20))
##               model  mpg cyl  disp  hp drat   wt  qsec vs am gear carb
## 1 Hornet Sportabout 18.7   8 360.0 175 3.15 3.44 17.02  0  0    3    2
## 2           Valiant 18.1   6 225.0 105 2.76 3.46 20.22  1  0    3    1
## 3        Duster 360 14.3   8 360.0 245 3.21 3.57 15.84  0  0    3    4
## 4          Merc 280 19.2   6 167.6 123 3.92 3.44 18.30  1  0    4    4
## 5         Merc 280C 17.8   6 167.6 123 3.92 3.44 18.90  1  0    4    4
## 6        Merc 450SE 16.4   8 275.8 180 3.07 4.07 17.40  0  0    3    3

グループ化、集計

グループ化と集計の一般的なフローは次のとおりです。

  1. groupBy または group_by をいくつかのグループ化変数に対して使用して、GroupedData オブジェクトを作成します。

  2. GroupedData オブジェクトを、各グループ内で数値を計算するために提供される集計関数とともに、agg または summarize 関数に渡します。

グループ化後のデータ集計には、avgcount_distinctcountfirstkurtosislastmaxmeanminsdskewnessstddev_popstddev_sampsum_distinctsumvar_popvar_sampvar を含む、広く使用されている多くの関数がサポートされています。リンクされている集計関数の API ドキュメントを参照してください。

たとえば、mtcars データセットのシリンダー数のヒストグラムを以下のように計算できます。

numCyl <- summarize(groupBy(carsDF, carsDF$cyl), count = n(carsDF$cyl))
head(numCyl)
##   cyl count
## 1   8    14
## 2   4    11
## 3   6     7

cube または rollup を使用して、複数次元にわたる小計を計算します。

mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]

{ (cyl, gear, am), (cyl, gear), (cyl), () } のグループ化を生成し、

mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
## SparkDataFrame[cyl:double, gear:double, am:double, avg(mpg):double]

グループ化列のすべての可能な組み合わせのグループ化を生成します。

列の操作

SparkR は、データ処理や集計中に直接列に適用できる多くの関数も提供しています。以下の例は、基本的な算術関数の使用を示しています。

carsDF_km <- carsDF
carsDF_km$kmpg <- carsDF_km$mpg * 1.61
head(select(carsDF_km, "model", "mpg", "kmpg"))
##               model  mpg   kmpg
## 1         Mazda RX4 21.0 33.810
## 2     Mazda RX4 Wag 21.0 33.810
## 3        Datsun 710 22.8 36.708
## 4    Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6           Valiant 18.1 29.141

ウィンドウ関数

ウィンドウ関数は集計関数の一種です。簡単に言うと、

  • 集計関数: n から 1 へのマッピング - エントリのグループに対して 1 つの値を返します。例として、sumcountmax があります。

  • ウィンドウ関数: n から n へのマッピング - グループ内の各エントリに対して 1 つの値を返しますが、その値は *グループ* のすべてのエントリに依存する場合があります。例として、rankleadlag があります。

正式には、上記で言及した *グループ* は *フレーム* と呼ばれます。各入力行には一意のフレームが関連付けられ、その行に対するウィンドウ関数の出力は、そのフレームに含まれる行に基づいています。

ウィンドウ関数は、次の関数と組み合わせて使用されることがよくあります: windowPartitionBywindowOrderBypartitionByorderByover。これを説明するために、次に例を見てみましょう。

mtcars データセットを引き続き使用します。対応する SparkDataFramecarsDF です。シリンダーの数ごとに、グループ内の mpg における各車のランクを計算したいとします。

carsSubDF <- select(carsDF, "model", "mpg", "cyl")
ws <- orderBy(windowPartitionBy("cyl"), "mpg")
carsRank <- withColumn(carsSubDF, "rank", over(rank(), ws))
head(carsRank, n = 20L)
##                  model  mpg cyl rank
## 1           Volvo 142E 21.4   4    1
## 2        Toyota Corona 21.5   4    2
## 3           Datsun 710 22.8   4    3
## 4             Merc 230 22.8   4    3
## 5            Merc 240D 24.4   4    5
## 6        Porsche 914-2 26.0   4    6
## 7            Fiat X1-9 27.3   4    7
## 8          Honda Civic 30.4   4    8
## 9         Lotus Europa 30.4   4    8
## 10            Fiat 128 32.4   4   10
## 11      Toyota Corolla 33.9   4   11
## 12           Merc 280C 17.8   6    1
## 13             Valiant 18.1   6    2
## 14            Merc 280 19.2   6    3
## 15        Ferrari Dino 19.7   6    4
## 16           Mazda RX4 21.0   6    5
## 17       Mazda RX4 Wag 21.0   6    5
## 18      Hornet 4 Drive 21.4   6    7
## 19  Cadillac Fleetwood 10.4   8    1
## 20 Lincoln Continental 10.4   8    1

上記のステップを詳細に説明します。

  • windowPartitionBy は、パーティションを定義するウィンドウ仕様オブジェクト WindowSpec を作成します。これは、どの行が指定された行と同じパーティションに入るかを制御します。この場合、cyl の値が同じ行が同じパーティションに入れられます。orderBy はさらに順序を定義します - 指定された行がパーティション内のどの位置にあるか。結果の WindowSpecws として返されます。

その他のウィンドウ仕様メソッドには、値によってフレームの境界を定義できる rangeBetween、行インデックスによって境界を定義できる rowsBetween があります。

  • withColumn は、rank という名前の列を SparkDataFrame に追加します。over はウィンドウ処理列を返します。最初の引数は通常、rank()lead(carsDF$wt) などのウィンドウ関数によって返される列です。これは、パーティション化および順序付けされたテーブルに従って対応する値を計算します。

ユーザー定義関数

SparkR では、いくつかの種類のユーザー定義関数 (UDF) をサポートしています。

パーティションごとの適用

dapply は、関数を SparkDataFrame の各パーティションに適用できます。SparkDataFrame の各パーティションに適用される関数は、1 つのパラメータ(パーティションに対応する data.frame)のみを持つ必要があり、出力も data.frame である必要があります。Schema は、結果の SparkDataFrame の行形式を指定します。返される値のデータ型と一致している必要があります。R と Spark のマッピングについては、こちらを参照してください。

mpgkmpg (キロメートル/ガロン) に変換します。carsSubDF は、carsDF の列のサブセットを持つ SparkDataFrame です。

carsSubDF <- select(carsDF, "model", "mpg")
schema <- "model STRING, mpg DOUBLE, kmpg DOUBLE"
out <- dapply(carsSubDF, function(x) { x <- cbind(x, x$mpg * 1.61) }, schema)
head(collect(out))
##               model  mpg   kmpg
## 1         Mazda RX4 21.0 33.810
## 2     Mazda RX4 Wag 21.0 33.810
## 3        Datsun 710 22.8 36.708
## 4    Hornet 4 Drive 21.4 34.454
## 5 Hornet Sportabout 18.7 30.107
## 6           Valiant 18.1 29.141

dapply と同様に、dapplyCollect は、関数を SparkDataFrame の各パーティションに適用し、結果を収集できます。関数の出力は data.frame である必要がありますが、この場合はスキーマは必要ありません。dapplyCollect は、UDF のすべてのパーティションでの出力がドライバーのメモリに読み込めない場合に失敗する可能性があることに注意してください。

out <- dapplyCollect(
         carsSubDF,
         function(x) {
           x <- cbind(x, "kmpg" = x$mpg * 1.61)
         })
head(out, 3)
##           model  mpg   kmpg
## 1     Mazda RX4 21.0 33.810
## 2 Mazda RX4 Wag 21.0 33.810
## 3    Datsun 710 22.8 36.708

グループごとの適用

gapply は、関数を SparkDataFrame の各グループに適用できます。関数は SparkDataFrame の各グループに適用され、2 つのパラメータ(グループキーと、そのキーに対応する R data.frame)のみを持つ必要があります。グループは SparkDataFrames の列から選択されます。関数の出力は data.frame である必要があります。Schema は、結果の SparkDataFrame の行形式を指定します。これは、Spark のデータ型に基づいて R 関数の出力スキーマを表す必要があります。返される data.frame の列名はユーザーによって設定されます。R と Spark のマッピングについては、こちらを参照してください。

schema <- structType(structField("cyl", "double"), structField("max_mpg", "double"))
result <- gapply(
    carsDF,
    "cyl",
    function(key, x) {
        y <- data.frame(key, max(x$mpg))
    },
    schema)
head(arrange(result, "max_mpg", decreasing = TRUE))
##   cyl max_mpg
## 1   4    33.9
## 2   6    21.4
## 3   8    19.2

gapply と同様に、gapplyCollect は、関数を SparkDataFrame の各パーティションに適用し、結果を R data.frame に収集できます。関数の出力は data.frame である必要がありますが、この場合はスキーマは必要ありません。gapplyCollect は、UDF のすべてのパーティションでの出力がドライバーのメモリに読み込めない場合に失敗する可能性があることに注意してください。

result <- gapplyCollect(
    carsDF,
    "cyl",
    function(key, x) {
         y <- data.frame(key, max(x$mpg))
        colnames(y) <- c("cyl", "max_mpg")
        y
    })
head(result[order(result$max_mpg, decreasing = TRUE), ])
##   cyl max_mpg
## 1   4    33.9
## 2   6    21.4
## 3   8    19.2

ローカル関数の分散

ネイティブ R の lapply と同様に、spark.lapply はリストの要素に対して関数を実行し、Spark で計算を分散します。spark.lapply は、リストの要素に対する doParallel または lapply と同様の方法で機能します。すべての計算の結果は、単一のマシンに収まる必要があります。そうでない場合は、df <- createDataFrame(list) のようにして、dapply を使用できます。

パッケージ e1071svm を例として使用します。制約違反のコストを変更すること以外は、すべてのデフォルト設定を使用します。spark.lapply は、これらの異なるモデルを並列でトレーニングできます。

costs <- exp(seq(from = log(1), to = log(1000), length.out = 5))
train <- function(cost) {
  stopifnot(requireNamespace("e1071", quietly = TRUE))
  model <- e1071::svm(Species ~ ., data = iris, cost = cost)
  summary(model)
}

モデルの概要のリストを返します。

model.summaries <- spark.lapply(costs, train)
class(model.summaries)
## [1] "list"

長すぎる表示を避けるため、2 番目の適合モデルの部分的な結果のみを示します。他のモデルも自由に検査できます。

print(model.summaries[[2]])
## $call
## svm(formula = Species ~ ., data = iris, cost = cost)
## 
## $type
## [1] 0
## 
## $kernel
## [1] 2
## 
## $cost
## [1] 5.623413
## 
## $degree
## [1] 3
## 
## $gamma
## [1] 0.25
## 
## $coef0
## [1] 0
## 
## $nu
## [1] 0.5
## 
## $epsilon
## [1] 0.1
## 
## $sparse
## [1] FALSE
## 
## $scaled
## [1] TRUE TRUE TRUE TRUE
## 
## $x.scale
## $x.scale$`scaled:center`
## Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##     5.843333     3.057333     3.758000     1.199333 
## 
## $x.scale$`scaled:scale`
## Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##    0.8280661    0.4358663    1.7652982    0.7622377 
## 
## 
## $y.scale
## NULL
## 
## $nclasses
## [1] 3
## 
## $levels
## [1] "setosa"     "versicolor" "virginica" 
## 
## $tot.nSV
## [1] 35
## 
## $nSV
## [1]  6 15 14
## 
## $labels
## [1] 1 2 3
## 
## $SV
##     Sepal.Length Sepal.Width Petal.Length Petal.Width
## 14   -1.86378030 -0.13153881   -1.5056946  -1.4422448
## 16   -0.17309407  3.08045544   -1.2791040  -1.0486668
## 21   -0.53538397  0.78617383   -1.1658087  -1.3110521
## 23   -1.50149039  1.24503015   -1.5623422  -1.3110521
## 24   -0.89767388  0.55674567   -1.1658087  -0.9174741
## 42   -1.62225369 -1.73753594   -1.3923993  -1.1798595
## 51    1.39682886  0.32731751    0.5336209   0.2632600
## 53    1.27606556  0.09788935    0.6469162   0.3944526
## 54   -0.41462067 -1.73753594    0.1370873   0.1320673
## 55    0.79301235 -0.59039513    0.4769732   0.3944526
##  [ reached getOption("max.print") -- omitted 25 rows ]
## 
## $index
##  [1]  14  16  21  23  24  42  51  53  54  55  58  61  69  71  73  78  79  84  85
## [20]  86  99 107 111 119 120 124 127 128 130 132 134 135 139 149 150
## 
## $rho
## [1] -0.10346530  0.12160294 -0.09540346
## 
## $compprob
## [1] FALSE
## 
## $probA
## NULL
## 
## $probB
## NULL
## 
## $sigma
## NULL
## 
## $coefs
##              [,1]        [,2]
##  [1,]  0.00000000  0.06561739
##  [2,]  0.76813720  0.93378721
##  [3,]  0.00000000  0.12123270
##  [4,]  0.00000000  0.31170741
##  [5,]  1.11614066  0.46397392
##  [6,]  1.88141600  1.10392128
##  [7,] -0.55872622  0.00000000
##  [8,]  0.00000000  5.62341325
##  [9,]  0.00000000  0.27711792
## [10,]  0.00000000  5.28440007
## [11,] -1.06596713  0.00000000
## [12,] -0.57076709  1.09019756
## [13,] -0.03365904  5.62341325
## [14,]  0.00000000  5.62341325
## [15,]  0.00000000  5.62341325
## [16,]  0.00000000  5.62341325
## [17,]  0.00000000  4.70398738
## [18,]  0.00000000  5.62341325
## [19,]  0.00000000  4.97981371
## [20,] -0.77497987  0.00000000
##  [ reached getOption("max.print") -- omitted 15 rows ]
## 
## $na.action
## NULL
## 
## $xlevels
## named list()
## 
## $fitted
##      1      2      3      4      5      6      7      8      9     10     11 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     12     13     14     15     16     17     18     19     20     21     22 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     23     24     25     26     27     28     29     30     31     32     33 
## setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa setosa 
##     34     35     36     37     38     39     40 
## setosa setosa setosa setosa setosa setosa setosa 
##  [ reached getOption("max.print") -- omitted 110 entries ]
## Levels: setosa versicolor virginica
## 
## $decision.values
##     setosa/versicolor setosa/virginica versicolor/virginica
## 1           1.1911739        1.0908424            1.1275805
## 2           1.1336557        1.0619543            1.3260964
## 3           1.2085065        1.0698101            1.0511345
## 4           1.1646153        1.0505915            1.0806874
## 5           1.1880814        1.0950348            0.9542815
## 6           1.0990761        1.0984626            0.9326361
## 7           1.1573474        1.0343287            0.9726843
## 8           1.1851598        1.0815750            1.2206802
## 9           1.1673499        1.0406734            0.8837945
## 10          1.1629911        1.0560925            1.2430067
## 11          1.1339282        1.0803946            1.0338357
## 12          1.1724182        1.0641469            1.1190423
## 13          1.1827355        1.0667956            1.1414844
##  [ reached getOption("max.print") -- omitted 137 rows ]
## 
## $terms
## Species ~ Sepal.Length + Sepal.Width + Petal.Length + Petal.Width
## attr(,"variables")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"factors")
##              Sepal.Length Sepal.Width Petal.Length Petal.Width
## Species                 0           0            0           0
## Sepal.Length            1           0            0           0
## Sepal.Width             0           1            0           0
## Petal.Length            0           0            1           0
## Petal.Width             0           0            0           1
## attr(,"term.labels")
## [1] "Sepal.Length" "Sepal.Width"  "Petal.Length" "Petal.Width" 
## attr(,"order")
## [1] 1 1 1 1
## attr(,"intercept")
## [1] 0
## attr(,"response")
## [1] 1
## attr(,".Environment")
## <environment: 0x5580e90fd1d8>
## attr(,"predvars")
## list(Species, Sepal.Length, Sepal.Width, Petal.Length, Petal.Width)
## attr(,"dataClasses")
##      Species Sepal.Length  Sepal.Width Petal.Length  Petal.Width 
##     "factor"    "numeric"    "numeric"    "numeric"    "numeric" 
## 
## attr(,"class")
## [1] "summary.svm"

SQL クエリ

SparkDataFrame は Spark SQL の一時ビューとして登録することもできるため、そのデータに対して SQL クエリを実行できます。sql 関数により、アプリケーションは SQL クエリをプログラムで実行し、結果を SparkDataFrame として返すことができます。

people <- read.df(paste0(sparkR.conf("spark.home"),
                         "/examples/src/main/resources/people.json"), "json")

この SparkDataFrame を一時ビューとして登録します。

createOrReplaceTempView(people, "people")

sql メソッドを使用して SQL ステートメントを実行できます。

teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##     name
## 1 Justin

機械学習

SparkR は、次の機械学習モデルとアルゴリズムをサポートしています。

分類

  • 線形サポートベクターマシン (SVM) 分類器

  • ロジスティック回帰

  • 多層パーセプトロン (MLP)

  • ナイーブベイズ

  • 因子機械 (FM) 分類器

回帰

  • 加速故障時間 (AFT) 生存モデル

  • 一般化線形モデル (GLM)

  • 単調回帰

  • 線形回帰

  • 因子機械 (FM) 回帰子

ツリー - 分類と回帰

  • 決定木

  • 勾配ブースティングツリー (GBT)

  • ランダムフォレスト

クラスタリング

  • 二分kk-means

  • ガウス混合モデル (GMM)

  • kk-means クラスタリング

  • 潜在ディリクレ配分法 (LDA)

  • べき乗反復クラスタリング (PIC)

協調フィルタリング

  • 交互最小二乗法 (ALS)

頻出パターンマイニング

  • FP-growth
  • PrefixSpan

統計

  • コルモゴロフ・スミルノフ検定

R 式

上記のほとんどについて、SparkR はモデルフィッティングのために、R 式演算子(「〜」、「.」、「:」、「+」、「-」を含む)をサポートしています。これにより、R 関数を使用するのと同様の体験が得られます。

トレーニングセットとテストセット

randomSplit 関数を使用して、SparkDataFrame をランダムなトレーニングセットとテストセットに簡単に分割できます。これは、提供された weights で分割された SparkDataFrames のリストを返します。carsDF を例として使用し、約7070%トレーニングデータと3030%テストデータ。

splitDF_list <- randomSplit(carsDF, c(0.7, 0.3), seed = 0)
carsDF_train <- splitDF_list[[1]]
carsDF_test <- splitDF_list[[2]]
count(carsDF_train)
## [1] 24
head(carsDF_train)
##                model  mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1 Cadillac Fleetwood 10.4   8  472 205 2.93 5.250 17.98  0  0    3    4
## 2         Camaro Z28 13.3   8  350 245 3.73 3.840 15.41  0  0    3    4
## 3  Chrysler Imperial 14.7   8  440 230 3.23 5.345 17.42  0  0    3    4
## 4   Dodge Challenger 15.5   8  318 150 2.76 3.520 16.87  0  0    3    2
## 5         Duster 360 14.3   8  360 245 3.21 3.570 15.84  0  0    3    4
## 6       Ferrari Dino 19.7   6  145 175 3.62 2.770 15.50  0  1    5    6
count(carsDF_test)
## [1] 8
head(carsDF_test)
##            model  mpg cyl  disp  hp drat    wt  qsec vs am gear carb
## 1    AMC Javelin 15.2   8 304.0 150 3.15 3.435 17.30  0  0    3    2
## 2     Datsun 710 22.8   4 108.0  93 3.85 2.320 18.61  1  1    4    1
## 3       Fiat 128 32.4   4  78.7  66 4.08 2.200 19.47  1  1    4    1
## 4      Merc 240D 24.4   4 146.7  62 3.69 3.190 20.00  1  0    4    2
## 5       Merc 280 19.2   6 167.6 123 3.92 3.440 18.30  1  0    4    4
## 6 Toyota Corolla 33.9   4  71.1  65 4.22 1.835 19.90  1  1    4    1

モデルとアルゴリズム

線形サポートベクターマシン (SVM) 分類器

線形サポートベクターマシン (SVM) 分類器は、線形カーネルを使用した SVM 分類器です。これは二項分類器です。簡単な例を使用して、二項分類に spark.svmLinear を使用する方法を示します。

# load training data and create a DataFrame
t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Linear SVM classifier model
model <- spark.svmLinear(training,  Survived ~ ., regParam = 0.01, maxIter = 10)
summary(model)
## $coefficients
##                 Estimate
## (Intercept)  0.993131388
## Class_1st   -0.386500359
## Class_2nd   -0.622627816
## Class_3rd   -0.204446602
## Sex_Female  -0.589950309
## Age_Adult    0.741676902
## Freq        -0.006582887
## 
## $numClasses
## [1] 2
## 
## $numFeatures
## [1] 6

トレーニングデータでの値の予測

prediction <- predict(model, training)
head(select(prediction, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No        Yes
## 2   2nd   Male Child    0       No        Yes
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No        Yes
## 5   1st Female Child    0       No        Yes
## 6   2nd Female Child    0       No         No

ロジスティック回帰

ロジスティック回帰は、応答がカテゴリカルな場合に広く使用されるモデルです。一般化線形予測モデルの特殊なケースと見なすことができます。glmnet と同様に、弾性ネット正則化と特徴標準化をサポートするロジスティック回帰をサポートするために、spark.glm の上に spark.logit を提供します。

簡単な例を使用して spark.logit の使用法を実証します。一般に、spark.logit を使用するには 3 つのステップがあります: 1)。適切なデータソースからデータフレームを作成します。2)。適切なパラメータ設定で spark.logit を使用してロジスティック回帰モデルを適合させます。3)。summary を使用して適合モデルの係数行列を取得し、predict を使用してモデルで予測を行います。

二項ロジスティック回帰

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.logit(training, Survived ~ ., regParam = 0.04741301)
summary(model)
## $coefficients
##                  Estimate
## (Intercept)  0.2255014282
## Class_1st   -0.1338856652
## Class_2nd   -0.1479826947
## Class_3rd    0.0005674937
## Sex_Female  -0.2011183871
## Age_Adult    0.3263186885
## Freq        -0.0033111157

トレーニングデータでの値の予測

fitted <- predict(model, training)
head(select(fitted, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No        Yes
## 2   2nd   Male Child    0       No        Yes
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No        Yes
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

3 クラスに対する多項ロジスティック回帰

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# Note in this case, Spark infers it is multinomial logistic regression, so family = "multinomial" is optional.
model <- spark.logit(training, Class ~ ., regParam = 0.07815179)
summary(model)
## $coefficients
##                      1st          2nd          3rd         Crew
## (Intercept)  0.051662845  0.062998145 -0.039083689 -0.075577300
## Sex_Female  -0.088030587 -0.102528148  0.059233106  0.131325629
## Age_Adult    0.141935316  0.169492058 -0.102562719 -0.208864654
## Survived_No  0.052721020  0.057980057 -0.029408423 -0.081292653
## Freq        -0.001555912 -0.001970377  0.001303836  0.002222453

多層パーセプトロン

多層パーセプトロン分類器 (MLPC) は、フィードフォワード人工ニューラルネットワークに基づく分類器です。MLPC は、ノードの複数の層で構成されています。各層は次の層に完全に接続されています。入力層のノードは入力データを表します。他のすべてのノードは、ノードの重みwwおよびバイアスbbの線形結合の入力を出力にマッピングし、活性化関数を適用します。これは、K+1K+1層の場合、行列形式で次のように記述できます。y(x)=fK(f2(w2Tf1(w1Tx+b1)+b2)+bK).y(x)=f_K(\ldots f_2(w_2^T f_1(w_1^T x + b_1) + b_2) \ldots + b_K).

中間層のノードはシグモイド (ロジスティック) 関数を使用します。f(zi)=11+ezi.f(z_i) = \frac{1}{1+e^{-z_i}}.

出力層のノードはソフトマックス関数を使用します。f(zi)=ezik=1Nezk.f(z_i) = \frac{e^{z_i}}{\sum_{k=1}^N e^{z_k}}.

ノード数NN出力層の数はクラスの数に対応します。

MLPC は、モデルを学習するためにバックプロパゲーションを使用します。最適化にはロジスティック損失関数を、最適化ルーチンには L-BFGS を使用します。

spark.mlp には、data に少なくとも 2 つの列が必要です: 1 つは "label"、もう 1 つは "features" です。"features" 列は libSVM 形式である必要があります。

タイタニックデータセットを使用して、分類で spark.mlp を使用する方法を示します。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
# fit a Multilayer Perceptron Classification Model
model <- spark.mlp(training, Survived ~ Age + Sex, blockSize = 128, layers = c(2, 2), solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1, initialWeights = c( 0, 0, 5, 5, 9, 9))

長すぎる表示を避けるため、モデル概要の部分的な結果のみを示します。完全な結果は sparkR シェルから確認できます。

# check the summary of the fitted model
summary(model)
## $numOfInputs
## [1] 2
## 
## $numOfOutputs
## [1] 2
## 
## $layers
## [1] 2 2
## 
## $weights
## $weights[[1]]
## [1] 0
## 
## $weights[[2]]
## [1] 0
## 
## $weights[[3]]
## [1] 5
## 
## $weights[[4]]
## [1] 5
## 
## $weights[[5]]
## [1] 9
## 
## $weights[[6]]
## [1] 9
# make predictions use the fitted model
predictions <- predict(model, training)
head(select(predictions, predictions$prediction))
##   prediction
## 1         No
## 2         No
## 3         No
## 4         No
## 5         No
## 6         No

ナイーブベイズ

ナイーブベイズモデルは、特徴の独立性を仮定します。spark.naiveBayes は、SparkDataFrame に対して ベルヌーイナイーブベイズモデルを適合させます。データはすべてカテゴリカルである必要があります。これらのモデルは、文書分類によく使用されます。

titanic <- as.data.frame(Titanic)
titanicDF <- createDataFrame(titanic[titanic$Freq > 0, -5])
naiveBayesModel <- spark.naiveBayes(titanicDF, Survived ~ Class + Sex + Age)
summary(naiveBayesModel)
## $apriori
##            Yes        No
## [1,] 0.5769231 0.4230769
## 
## $tables
##     Class_3rd Class_1st Class_2nd Sex_Female Age_Adult
## Yes 0.3125    0.3125    0.3125    0.5        0.5625   
## No  0.4166667 0.25      0.25      0.5        0.75
naiveBayesPrediction <- predict(naiveBayesModel, titanicDF)
head(select(naiveBayesPrediction, "Class", "Sex", "Age", "Survived", "prediction"))
##   Class    Sex   Age Survived prediction
## 1   3rd   Male Child       No        Yes
## 2   3rd Female Child       No        Yes
## 3   1st   Male Adult       No        Yes
## 4   2nd   Male Adult       No        Yes
## 5   3rd   Male Adult       No         No
## 6  Crew   Male Adult       No        Yes

因子機械分類器

分類問題のための因子機械。

因子機械の実装の背景と詳細については、「因子機械」セクションを参照してください。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)

model <- spark.fmClassifier(training, Survived ~ Age + Sex)
summary(model)
## $coefficients
##                 Estimate
## (Intercept) 0.0064275991
## Age_Adult   0.0001294448
## Sex_Female  0.0001294448
## 
## $factors
##            [,1]        [,2]      [,3]       [,4]        [,5]        [,6]
## [1,] -0.3256224  0.11912568 0.1460235  0.1620567  0.13153516  0.06403695
## [2,] -0.1382155 -0.03658261 0.1717808 -0.1602241 -0.08446129 -0.19287098
##             [,7]        [,8]
## [1,] -0.03292446 -0.05166818
## [2,]  0.19252571  0.06237194
## 
## $numClasses
## [1] 2
## 
## $numFeatures
## [1] 2
## 
## $factorSize
## [1] 8
predictions <- predict(model, training)
head(select(predictions, predictions$prediction))
##   prediction
## 1        Yes
## 2        Yes
## 3        Yes
## 4        Yes
## 5        Yes
## 6        Yes

加速故障時間生存モデル

生存分析は、イベントが発生するまでの時間の期待期間を研究し、多くの場合、被験者に対するリスク因子または治療との関係を研究します。標準的な回帰分析とは異なり、生存モデリングは、非負の生存時間や打ち切りなどのデータの特殊な特性を扱う必要があります。

加速故障時間 (AFT) モデルは、共変量の効果が定数によってイベントのライフコースを加速または減速させると仮定する、打ち切りデータのためのパラメトリック生存モデルです。詳細については、Wikipedia ページのAFT モデルとその中の参考文献を参照してください。同じ目的で設計された比例ハザードモデルとは異なり、AFT モデルは、各インスタンスが目的関数に独立して寄与するため、並列化が容易です。

library(survival)
ovarianDF <- createDataFrame(ovarian)
aftModel <- spark.survreg(ovarianDF, Surv(futime, fustat) ~ ecog_ps + rx)
summary(aftModel)
## $coefficients
##                  Value
## (Intercept)  6.8966910
## ecog_ps     -0.3850414
## rx           0.5286455
## Log(scale)  -0.1234429
aftPredictions <- predict(aftModel, ovarianDF)
head(aftPredictions)
##   futime fustat     age resid_ds rx ecog_ps label prediction
## 1     59      1 72.3315        2  1       1    59   1141.724
## 2    115      1 74.4932        2  1       1   115   1141.724
## 3    156      1 66.4658        2  1       2   156    776.855
## 4    421      0 53.3644        2  2       1   421   1937.087
## 5    431      1 50.3397        2  1       1   431   1141.724
## 6    448      0 56.4301        1  1       2   448    776.855

一般化線形モデル

主な関数は spark.glm です。以下のファミリーとリンク関数がサポートされています。デフォルトはガウスです。

ファミリー リンク関数
gaussian identity, log, inverse
binomial logit, probit, cloglog (補数対数対数)
poisson log, identity, sqrt
gamma inverse, identity, log
tweedie power リンク関数

family 引数を指定するには 3 つの方法があります。

  • 文字列としてのファミリー名、例: family = "gaussian"

  • ファミリー関数、例: family = binomial

  • ファミリー関数によって返される結果、例: family = poisson(link = log)

  • ツイーディーファミリーを指定するには 2 つの方法があります。

    1. family = "tweedie" を設定し、var.power および link.power を指定します。
    2. パッケージ statmod がロードされている場合、ツイーディーファミリーは、そこで定義されているファミリー定義、つまり tweedie() を使用して指定されます。

ファミリーとそのリンク関数に関する詳細については、Wikipedia ページの一般化線形モデルを参照してください。

mtcars データセットを例として使用します。対応する SparkDataFramecarsDF です。モデルを適合させた後、概要を表示し、元のデータセットで予測を作成して適合値を確認します。同じスキーマを持つ新しい SparkDataFrame を渡して新しいデータで予測することもできます。

gaussianGLM <- spark.glm(carsDF, mpg ~ wt + hp)
summary(gaussianGLM)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -3.9410  -1.6499  -0.3267   1.0373   5.8538  
## 
## Coefficients:
##               Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)  37.227270   1.5987875  23.2847  0.0000e+00
## wt           -3.877831   0.6327335  -6.1287  1.1196e-06
## hp           -0.031773   0.0090297  -3.5187  1.4512e-03
## 
## (Dispersion parameter for gaussian family taken to be 6.725785)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  195.05  on 29  degrees of freedom
## AIC: 156.7
## 
## Number of Fisher Scoring iterations: 1

予測を行う際、prediction という名前の新しい列が追加されます。ここでは列のサブセットのみを表示します。

gaussianFitted <- predict(gaussianGLM, carsDF)
head(select(gaussianFitted, "model", "prediction", "mpg", "wt", "hp"))
##               model prediction  mpg    wt  hp
## 1         Mazda RX4   23.57233 21.0 2.620 110
## 2     Mazda RX4 Wag   22.58348 21.0 2.875 110
## 3        Datsun 710   25.27582 22.8 2.320  93
## 4    Hornet 4 Drive   21.26502 21.4 3.215 110
## 5 Hornet Sportabout   18.32727 18.7 3.440 175
## 6           Valiant   20.47382 18.1 3.460 105

以下は、ツイーディーファミリーを使用した同じ適合です。

tweedieGLM1 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", var.power = 0.0)
summary(tweedieGLM1)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##     Min       1Q   Median       3Q      Max  
## -3.9410  -1.6499  -0.3267   1.0373   5.8538  
## 
## Coefficients:
##               Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)  37.227270   1.5987875  23.2847  0.0000e+00
## wt           -3.877831   0.6327335  -6.1287  1.1196e-06
## hp           -0.031773   0.0090297  -3.5187  1.4512e-03
## 
## (Dispersion parameter for tweedie family taken to be 6.725785)
## 
##     Null deviance: 1126.05  on 31  degrees of freedom
## Residual deviance:  195.05  on 29  degrees of freedom
## AIC: 156.7
## 
## Number of Fisher Scoring iterations: 1

ツイーディーファミリーの他の分布を試すことができます。たとえば、対数リンクを持つ複合ポアソン分布です。

tweedieGLM2 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie",
                         var.power = 1.2, link.power = 0.0)
summary(tweedieGLM2)
## 
## Deviance Residuals: 
## (Note: These are approximate quantiles with relative error <= 0.01)
##      Min        1Q    Median        3Q       Max  
## -0.58074  -0.25335  -0.09892   0.18608   0.82717  
## 
## Coefficients:
##                Estimate  Std. Error  t value    Pr(>|t|)
## (Intercept)   3.8500849  0.06698272  57.4788  0.0000e+00
## wt           -0.2018426  0.02897283  -6.9666  1.1691e-07
## hp           -0.0016248  0.00041603  -3.9054  5.1697e-04
## 
## (Dispersion parameter for tweedie family taken to be 0.1340111)
## 
##     Null deviance: 29.8820  on 31  degrees of freedom
## Residual deviance:  3.7739  on 29  degrees of freedom
## AIC: NA
## 
## Number of Fisher Scoring iterations: 4

単調回帰

spark.isoreg は、SparkDataFrame に対して単調回帰モデルを適合させます。これは、完全な順序制約の下で、重み付き単変量回帰問題を解きます。具体的には、実測応答のセットy1,,yny_1, \ldots, y_n対応する実数特徴量x1,,xnx_1, \ldots, x_nおよびオプションで正の重みw1,,wnw_1, \ldots, w_n単調(区分線形)関数を見つけたいffを最小化します。(f)=i=1nwi(yif(xi))2.\ell(f) = \sum_{i=1}^n w_i (y_i - f(x_i))^2.

有用な引数がいくつかあります。

  • weightCol: 重み列を指定する文字列。

  • isotonic: 出力シーケンスが単調増加(TRUE)または単調減少(FALSE)であるかどうかを示す論理値。

  • featureIndex: 式がベクトル列の場合の右辺の特徴量のインデックス(デフォルト: 0)。それ以外の場合は効果なし。

人工的な例を使用して使用法を示します。

y <- c(3.0, 6.0, 8.0, 5.0, 7.0)
x <- c(1.0, 2.0, 3.5, 3.0, 4.0)
w <- rep(1.0, 5)
data <- data.frame(y = y, x = x, w = w)
df <- createDataFrame(data)
isoregModel <- spark.isoreg(df, y ~ x, weightCol = "w")
isoregFitted <- predict(isoregModel, df)
head(select(isoregFitted, "x", "y", "prediction"))
##     x y prediction
## 1 1.0 3        3.0
## 2 2.0 6        5.5
## 3 3.5 8        7.5
## 4 3.0 5        5.5
## 5 4.0 7        7.5

予測段階では、適合した単調区分線形関数に基づいて、ルールは次のようになります。

  • 予測入力がトレーニング特徴量と正確に一致する場合、関連付けられた予測が返されます。同じ特徴量の予測が複数ある場合、そのうちの 1 つが返されます。どちらが返されるかは未定義です。

  • 予測入力がすべてのトレーニング特徴量よりも小さいまたは大きい場合、それぞれ最小または最大のトレーニング特徴量を持つ予測が返されます。同じ特徴量の予測が複数ある場合、それぞれ以前のポイントと同じルールが使用されます。

  • 予測入力が 2 つのトレーニング特徴量の間にある場合、予測は区分線形関数として扱われ、補間値は最も近い 2 つの特徴量での予測から計算されます。同じ特徴量の値が複数ある場合、前のポイントと同じルールが使用されます。

たとえば、入力が3.23.2で、最も近い 2 つの特徴量値が3.03.0and3.53.5の場合、予測値は、3.03.0and3.53.5.

newDF <- createDataFrame(data.frame(x = c(1.5, 3.2)))
head(predict(isoregModel, newDF))
##     x prediction
## 1 1.5       4.25
## 2 3.2       6.30

線形回帰

線形回帰モデル。

model <- spark.lm(carsDF, mpg ~ wt + hp)

summary(model)
## $coefficients
##                Estimate
## (Intercept) 37.22727012
## wt          -3.87783074
## hp          -0.03177295
## 
## $numFeatures
## [1] 2
predictions <- predict(model, carsDF)
head(select(predictions, predictions$prediction))
##   prediction
## 1   23.57233
## 2   22.58348
## 3   25.27582
## 4   21.26502
## 5   18.32727
## 6   20.47382

因子機械回帰子

回帰問題のための因子機械。

因子機械の実装の背景と詳細については、「因子機械」セクションを参照してください。

model <- spark.fmRegressor(carsDF, mpg ~ wt + hp)
summary(model)
## $coefficients
##              Estimate
## (Intercept) 0.1518559
## wt          3.6472555
## hp          2.8026828
## 
## $factors
##            [,1]       [,2]       [,3]       [,4]      [,5]       [,6]
## [1,]  0.1424420 -0.1178110 -0.3970272 -0.4696695  0.400288  0.3690930
## [2,] -0.1626185  0.1512138  0.3690435  0.4076975 -0.625752 -0.3715109
##             [,7]       [,8]
## [1,]  0.03472468 -0.1703219
## [2,] -0.02109148 -0.2006249
## 
## $numFeatures
## [1] 2
## 
## $factorSize
## [1] 8
predictions <- predict(model, carsDF)
head(select(predictions, predictions$prediction))
##   prediction
## 1  106.70996
## 2   87.07526
## 3  111.07931
## 4   60.89565
## 5   61.81374
## 6   40.70095

決定木

spark.decisionTree は、SparkDataFrame に対して決定木分類または回帰モデルを適合させます。ユーザーは summary を呼び出して適合モデルの概要を取得したり、predict を呼び出して予測を作成したり、write.ml/read.ml を呼び出して適合モデルを保存/ロードしたりできます。

タイタニックデータセットを使用して決定木をトレーニングし、予測を行います。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
dtModel <- spark.decisionTree(df, Survived ~ ., type = "classification", maxDepth = 2)
summary(dtModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[5],[1.0])
## Max Depth:  2
##  DecisionTreeClassificationModel: uid=dtc_3b470a52c3a2, depth=2, numNodes=5, numClasses=2, numFeatures=6
##   If (feature 5 <= 4.5)
##    Predict: 0.0
##   Else (feature 5 > 4.5)
##    If (feature 5 <= 84.5)
##     Predict: 1.0
##    Else (feature 5 > 84.5)
##     Predict: 0.0
## 
predictions <- predict(dtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

勾配ブースティングツリー

spark.gbt は、SparkDataFrame に対して勾配ブースティングツリー分類または回帰モデルを適合させます。ユーザーは summary を呼び出して適合モデルの概要を取得したり、predict を呼び出して予測を作成したり、write.ml/read.ml を呼び出して適合モデルを保存/ロードしたりできます。

タイタニックデータセットを使用して勾配ブースティングツリーをトレーニングし、予測を行います。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
gbtModel <- spark.gbt(df, Survived ~ ., type = "classification", maxDepth = 2, maxIter = 2)
summary(gbtModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[1,2,5],[0.03336902858878361,0.16099525743106016,0.8056357139801562])
## Max Depth:  2
## Number of trees:  2
## Tree weights:  1 0.1
##  GBTClassificationModel: uid = gbtc_474511659869, numTrees=2, numClasses=2, numFeatures=6
##   Tree 0 (weight 1.0):
##     If (feature 5 <= 4.5)
##      If (feature 1 in {1.0})
##       Predict: -1.0
##      Else (feature 1 not in {1.0})
##       Predict: -0.3333333333333333
##     Else (feature 5 > 4.5)
##      If (feature 5 <= 84.5)
##       Predict: 0.5714285714285714
##      Else (feature 5 > 84.5)
##       Predict: -0.42857142857142855
##   Tree 1 (weight 0.1):
##     If (feature 2 in {1.0})
##      If (feature 5 <= 15.5)
##       Predict: 0.9671846896296403
##      Else (feature 5 > 15.5)
##       Predict: -1.0857923804083338
##     Else (feature 2 not in {1.0})
##      If (feature 5 <= 13.5)
##       Predict: -0.08651035613926407
##      Else (feature 5 > 13.5)
##       Predict: 0.6566673506774614
## 
predictions <- predict(gbtModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

ランダムフォレスト

spark.randomForest は、SparkDataFrame に対してランダムフォレスト分類または回帰モデルを適合させます。ユーザーは summary を呼び出して適合モデルの概要を取得したり、predict を呼び出して予測を作成したり、write.ml/read.ml を呼び出して適合モデルを保存/ロードしたりできます。

次の例では、タイタニックデータセットを使用してランダムフォレストをトレーニングし、予測を行います。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
rfModel <- spark.randomForest(df, Survived ~ ., type = "classification", maxDepth = 2, numTrees = 2)
summary(rfModel)
## Formula:  Survived ~ .
## Number of features:  6
## Features:  Class_1st Class_2nd Class_3rd Sex_Female Age_Adult Freq
## Feature importances:  (6,[3,4,5],[0.09849498327759101,0.401505016722409,0.5])
## Max Depth:  2
## Number of trees:  2
## Tree weights:  1 1
##  RandomForestClassificationModel: uid=rfc_d8fbf7a89562, numTrees=2, numClasses=2, numFeatures=6
##   Tree 0 (weight 1.0):
##     If (feature 4 in {0.0})
##      Predict: 0.0
##     Else (feature 4 not in {0.0})
##      If (feature 3 in {1.0})
##       Predict: 0.0
##      Else (feature 3 not in {1.0})
##       Predict: 1.0
##   Tree 1 (weight 1.0):
##     If (feature 5 <= 84.5)
##      If (feature 5 <= 4.5)
##       Predict: 0.0
##      Else (feature 5 > 4.5)
##       Predict: 1.0
##     Else (feature 5 > 84.5)
##      Predict: 0.0
## 
predictions <- predict(rfModel, df)
head(select(predictions, "Class", "Sex", "Age", "Freq", "Survived", "prediction"))
##   Class    Sex   Age Freq Survived prediction
## 1   1st   Male Child    0       No         No
## 2   2nd   Male Child    0       No         No
## 3   3rd   Male Child   35       No        Yes
## 4  Crew   Male Child    0       No         No
## 5   1st Female Child    0       No         No
## 6   2nd Female Child    0       No         No

二分 k-means

spark.bisectingKmeans は、すべての観測値が 1 つのクラスタから始まり、階層を下降するにつれて再帰的に分割が実行される、分割(または「トップダウン」)アプローチを使用した階層クラスタリングの一種です。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
model <- spark.bisectingKmeans(training, Class ~ Survived, k = 4)
summary(model)
## $k
## [1] 4
## 
## $coefficients
##   Survived_No
## 1 0          
## 2 1          
## 3 0          
## 4 1          
## 
## $size
## $size[[1]]
## [1] 16
## 
## $size[[2]]
## [1] 16
## 
## $size[[3]]
## [1] 0
## 
## $size[[4]]
## [1] 0
## 
## 
## $cluster
## SparkDataFrame[prediction:int]
## 
## $is.loaded
## [1] FALSE
fitted <- predict(model, training)
head(select(fitted, "Class", "prediction"))
##   Class prediction
## 1   1st          1
## 2   2nd          1
## 3   3rd          1
## 4  Crew          1
## 5   1st          1
## 6   2nd          1

ガウス混合モデル

spark.gaussianMixture は、SparkDataFrame に対して多変量ガウス混合モデル (GMM) を適合させます。期待値最大化 (EM) アルゴリズムを使用して、モデルの最尤推定値 (MLE) を近似します。

シミュレートされた例を使用して使用法を実証します。

X1 <- data.frame(V1 = rnorm(4), V2 = rnorm(4))
X2 <- data.frame(V1 = rnorm(6, 3), V2 = rnorm(6, 4))
data <- rbind(X1, X2)
df <- createDataFrame(data)
gmmModel <- spark.gaussianMixture(df, ~ V1 + V2, k = 2)
summary(gmmModel)
## $lambda
## [1] 0.408879 0.591121
## 
## $mu
## $mu[[1]]
## [1] -0.8254152973  0.0009888204
## 
## $mu[[2]]
## [1] 3.006119 3.620325
## 
## 
## $sigma
## $sigma[[1]]
##      [,1]     [,2]    
## [1,] 1.377944 1.092401
## [2,] 1.092401 1.477489
## 
## $sigma[[2]]
##      [,1]      [,2]     
## [1,] 1.317545  0.5716919
## [2,] 0.5716919 0.7335671
## 
## 
## $loglik
## [1] -32.8689
## 
## $posterior
## SparkDataFrame[posterior:array<double>]
## 
## $is.loaded
## [1] FALSE
gmmFitted <- predict(gmmModel, df)
head(select(gmmFitted, "V1", "V2", "prediction"))
##             V1         V2 prediction
## 1 -1.400043517  0.6215527          0
## 2  0.255317055  1.1484116          0
## 3 -2.437263611 -1.8218177          0
## 4 -0.005571287 -0.2473253          0
## 5  2.755800393  4.5124269          1
## 6  2.717294551  2.1369885          1

k-Means クラスタリング

spark.kmeans は、SparkDataFrame に対してkk-means クラスタリングモデルを適合させます。教師なし学習法であるため、応答変数は必要ありません。したがって、R 式の左辺は空のままにする必要があります。クラスタリングは、右辺の変数のみに基づいています。

kmeansModel <- spark.kmeans(carsDF, ~ mpg + hp + wt, k = 3)
summary(kmeansModel)
## $k
## [1] 3
## 
## $coefficients
##        mpg        hp       wt
## 1 24.22353  93.52941 2.599588
## 2 15.80000 178.50000 3.926400
## 3 14.62000 263.80000 3.899000
## 
## $size
## $size[[1]]
## [1] 17
## 
## $size[[2]]
## [1] 10
## 
## $size[[3]]
## [1] 5
## 
## 
## $cluster
## SparkDataFrame[prediction:int]
## 
## $is.loaded
## [1] FALSE
## 
## $clusterSize
## [1] 3
kmeansPredictions <- predict(kmeansModel, carsDF)
head(select(kmeansPredictions, "model", "mpg", "hp", "wt", "prediction"), n = 20L)
##                  model  mpg  hp    wt prediction
## 1            Mazda RX4 21.0 110 2.620          0
## 2        Mazda RX4 Wag 21.0 110 2.875          0
## 3           Datsun 710 22.8  93 2.320          0
## 4       Hornet 4 Drive 21.4 110 3.215          0
## 5    Hornet Sportabout 18.7 175 3.440          1
## 6              Valiant 18.1 105 3.460          0
## 7           Duster 360 14.3 245 3.570          2
## 8            Merc 240D 24.4  62 3.190          0
## 9             Merc 230 22.8  95 3.150          0
## 10            Merc 280 19.2 123 3.440          0
## 11           Merc 280C 17.8 123 3.440          0
## 12          Merc 450SE 16.4 180 4.070          1
## 13          Merc 450SL 17.3 180 3.730          1
## 14         Merc 450SLC 15.2 180 3.780          1
## 15  Cadillac Fleetwood 10.4 205 5.250          1
## 16 Lincoln Continental 10.4 215 5.424          1
## 17   Chrysler Imperial 14.7 230 5.345          2
## 18            Fiat 128 32.4  66 2.200          0
## 19         Honda Civic 30.4  52 1.615          0
## 20      Toyota Corolla 33.9  65 1.835          0

潜在ディリクレ配分法

spark.lda は、SparkDataFrame に対して潜在ディリクレ配分法モデルを適合させます。これは、テキストドキュメントのコレクションからトピックが推測されるトピックモデリングでよく使用されます。LDA は、次のようにクラスタリングアルゴリズムと見なすことができます。

  • トピックはクラスタ中心に対応し、ドキュメントはデータセット内の例(行)に対応します。

  • トピックとドキュメントは両方とも特徴空間に存在し、特徴ベクトルは単語数のベクトル(ボキャブラリー)です。

  • 従来の距離を使用してクラスタリングするのではなく、LDA は、テキストドキュメントがどのように生成されるかの統計モデルに基づいた関数を使用します。

LDA を使用するには、各エントリがドキュメントを表す datafeatures 列を指定する必要があります。列には 2 つのオプションがあります。

  • 文字列: これは、ドキュメント全体を表す文字列にすることができます。自動的に解析されます。customizedStopWords に追加のストップワードを追加できます。

  • libSVM: 各エントリは単語のコレクションであり、直接処理されます。

適合モデルには、さらに 2 つの関数が用意されています。

  • spark.posterior は、"topicDistribution" という名前の posterior 確率ベクトル列を含む SparkDataFrame を返します。

  • spark.perplexity は、指定された SparkDataFrame の対数パープレキシティ、または data 引数が欠落している場合はトレーニングデータの対数パープレキシティを返します。

詳細については、ヘルプドキュメント ?spark.lda を参照してください。

人工的な例を見てみましょう。

corpus <- data.frame(features = c(
  "1 2 6 0 2 3 1 1 0 0 3",
  "1 3 0 1 3 0 0 2 0 0 1",
  "1 4 1 0 0 4 9 0 1 2 0",
  "2 1 0 3 0 0 5 0 2 3 9",
  "3 1 1 9 3 0 2 0 0 1 3",
  "4 2 0 3 4 5 1 1 1 4 0",
  "2 1 0 3 0 0 5 0 2 2 9",
  "1 1 1 9 2 1 2 0 0 1 3",
  "4 4 0 3 4 2 1 3 0 0 0",
  "2 8 2 0 3 0 2 0 2 7 2",
  "1 1 1 9 0 2 2 0 0 3 3",
  "4 1 0 0 4 5 1 3 0 1 0"))
corpusDF <- createDataFrame(corpus)
model <- spark.lda(data = corpusDF, k = 5, optimizer = "em")
summary(model)
## $docConcentration
## [1] 11 11 11 11 11
## 
## $topicConcentration
## [1] 1.1
## 
## $logLikelihood
## [1] -353.2948
## 
## $logPerplexity
## [1] 2.676476
## 
## $isDistributed
## [1] TRUE
## 
## $vocabSize
## [1] 10
## 
## $topics
## SparkDataFrame[topic:int, term:array<string>, termWeights:array<double>]
## 
## $vocabulary
##  [1] "0" "1" "2" "3" "4" "9" "5" "8" "7" "6"
## 
## $trainingLogLikelihood
## [1] -239.5629
## 
## $logPrior
## [1] -980.2974
posterior <- spark.posterior(model, corpusDF)
head(posterior)
##                features                                     topicDistribution
## 1 1 2 6 0 2 3 1 1 0 0 3 0.1972183, 0.1986639, 0.2022022, 0.2006584, 0.2012571
## 2 1 3 0 1 3 0 0 2 0 0 1 0.1989991, 0.1988732, 0.2015973, 0.2006385, 0.1998919
## 3 1 4 1 0 0 4 9 0 1 2 0 0.2020625, 0.2026072, 0.1968842, 0.1987298, 0.1997163
## 4 2 1 0 3 0 0 5 0 2 3 9 0.2004088, 0.1981928, 0.2013018, 0.2006318, 0.1994647
## 5 3 1 1 9 3 0 2 0 0 1 3 0.1971488, 0.1983950, 0.2023585, 0.2011592, 0.2009385
## 6 4 2 0 3 4 5 1 1 1 4 0 0.2020255, 0.2041807, 0.1955397, 0.1997236, 0.1985306
perplexity <- spark.perplexity(model, corpusDF)
perplexity
## [1] 2.676476

交互最小二乗法

spark.als は、協調フィルタリングにおける潜在因子を交互最小二乗法を介して学習します。

spark.als では、rankregnonnegative を含む複数のオプションを設定できます。完全なリストについては、ヘルプファイルを参照してください。

ratings <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0),
                list(2, 1, 1.0), list(2, 2, 5.0))
df <- createDataFrame(ratings, c("user", "item", "rating"))
model <- spark.als(df, "rating", "user", "item", rank = 10, reg = 0.1, nonnegative = TRUE)

潜在因子を抽出します。

stats <- summary(model)
userFactors <- stats$userFactors
itemFactors <- stats$itemFactors
head(userFactors)
head(itemFactors)

予測を行います。

predicted <- predict(model, df)
head(predicted)

べき乗反復クラスタリング

べき乗反復クラスタリング (PIC) は、スケーラブルなグラフクラスタリングアルゴリズムです。spark.assignClusters メソッドは PIC アルゴリズムを実行し、各入力頂点のクラスタ割り当てを返します。

df <- createDataFrame(list(list(0L, 1L, 1.0), list(0L, 2L, 1.0),
                           list(1L, 2L, 1.0), list(3L, 4L, 1.0),
                           list(4L, 0L, 0.1)),
                      schema = c("src", "dst", "weight"))
head(spark.assignClusters(df, initMode = "degree", weightCol = "weight"))
##   id cluster
## 1  4       1
## 2  0       0
## 3  1       0
## 4  3       1
## 5  2       0

FP-growth

spark.fpGrowth は、FP-growth アルゴリズムを実行して、SparkDataFrame 上の頻出アイテムセットをマイニングします。itemsCol は値の配列である必要があります。

df <- selectExpr(createDataFrame(data.frame(rawItems = c(
  "T,R,U", "T,S", "V,R", "R,U,T,V", "R,S", "V,S,U", "U,R", "S,T", "V,R", "V,U,S",
  "T,V,U", "R,V", "T,S", "T,S", "S,T", "S,U", "T,R", "V,R", "S,V", "T,S,U"
))), "split(rawItems, ',') AS items")

fpm <- spark.fpGrowth(df, minSupport = 0.2, minConfidence = 0.5)

spark.freqItemsets メソッドを使用して、頻出アイテムセットを含む SparkDataFrame を取得できます。

##   items freq
## 1     S   11
## 2     T   10
## 3  T, S    6
## 4     R    9
## 5     V    9
## 6  V, R    5

spark.associationRules は、関連ルールを含む SparkDataFrame を返します。

##   antecedent consequent confidence      lift support
## 1          V          R  0.5555556 1.2345679    0.25
## 2          S          T  0.5454545 1.0909091    0.30
## 3          T          S  0.6000000 1.0909091    0.30
## 4          R          V  0.5555556 1.2345679    0.25
## 5          U          S  0.5000000 0.9090909    0.20
## 6          U          T  0.5000000 1.0000000    0.20

antecedent に基づいて予測を行うことができます。

head(predict(fpm, df))
##        items prediction
## 1    T, R, U       S, V
## 2       T, S       NULL
## 3       V, R       NULL
## 4 R, U, T, V          S
## 5       R, S       T, V
## 6    V, S, U       R, T

PrefixSpan

spark.findFrequentSequentialPatterns メソッドを使用して、入力シーケンスのアイテムセット内の頻出シーケンスパターンの完全なセットを見つけることができます。

df <- createDataFrame(list(list(list(list(1L, 2L), list(3L))),
                           list(list(list(1L), list(3L, 2L), list(1L, 2L))),
                           list(list(list(1L, 2L), list(5L))),
                           list(list(list(6L)))),
                      schema = c("sequence"))
head(spark.findFrequentSequentialPatterns(df, minSupport = 0.5, maxPatternLength = 5L))
##   sequence freq
## 1        1    3
## 2        3    2
## 3        2    3
## 4     1, 2    3
## 5     1, 3    2

コルモゴロフ・スミルノフ検定

spark.kstest は、両側 1 サンプル コルモゴロフ・スミルノフ (KS) 検定を実行します。SparkDataFrame が与えられると、この検定は、testCol 列の連続データを、nullHypothesis パラメータで指定された理論的分布と比較します。ユーザーは summary を呼び出して検定結果の概要を取得できます。

次の例では、タイタニックデータセットの Freq 列が正規分布に従うかどうかをテストします。正規分布のパラメータを、サンプルの平均と標準偏差を使用して設定します。

t <- as.data.frame(Titanic)
df <- createDataFrame(t)
freqStats <- head(select(df, mean(df$Freq), sd(df$Freq)))
freqMean <- freqStats[1]
freqStd <- freqStats[2]

test <- spark.kstest(df, "Freq", "norm", c(freqMean, freqStd))
testSummary <- summary(test)
testSummary
## Kolmogorov-Smirnov test summary:
## degrees of freedom = 0 
## statistic = 0.3065126710255011 
## pValue = 0.0036336792155329256 
## Very strong presumption against null hypothesis: Sample follows theoretical distribution.

モデルの永続化

次の例は、SparkR で ML モデルを保存/ロードする方法を示しています。

t <- as.data.frame(Titanic)
training <- createDataFrame(t)
gaussianGLM <- spark.glm(training, Freq ~ Sex + Age, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)
## 
## Saved-loaded model does not support output 'Deviance Residuals'.
## 
## Coefficients:
##              Estimate  Std. Error  t value   Pr(>|t|)
## (Intercept)    46.219      35.994   1.2841  0.2092846
## Sex_Female    -78.812      41.562  -1.8962  0.0679311
## Age_Adult     123.938      41.562   2.9820  0.0057522
## 
## (Dispersion parameter for gaussian family taken to be 13819.52)
## 
##     Null deviance: 573341  on 31  degrees of freedom
## Residual deviance: 400766  on 29  degrees of freedom
## AIC: 400.7
## 
## Number of Fisher Scoring iterations: 1
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, training)
head(gaussianPredictions)
##   Class    Sex   Age Survived Freq label prediction
## 1   1st   Male Child       No    0     0   46.21875
## 2   2nd   Male Child       No    0     0   46.21875
## 3   3rd   Male Child       No   35    35   46.21875
## 4  Crew   Male Child       No    0     0   46.21875
## 5   1st Female Child       No    0     0  -32.59375
## 6   2nd Female Child       No    0     0  -32.59375
unlink(modelPath)

Structured Streaming

SparkR は Structured Streaming API をサポートしています。

Structured Streaming プログラミングガイドで、プログラミングモデルと基本概念の紹介を確認できます。

シンプルなソースとシンク

Spark には、いくつかの組み込み入力ソースがあります。例として、ソケットソースを使用してテキストを単語に読み込み、計算された単語数を表示してテストします。

# Create DataFrame representing the stream of input lines from connection
lines <- read.stream("socket", host = hostname, port = port)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(groupBy(words, "word"))

# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

Kafka ソース

Kafka からデータを読み取るのは簡単です。詳細については、Structured Streaming でサポートされている入力ソースを参照してください。

topic <- read.stream("kafka",
                     kafka.bootstrap.servers = "host1:port1,host2:port2",
                     subscribe = "topic1")
keyvalue <- selectExpr(topic, "CAST(key AS STRING)", "CAST(value AS STRING)")

操作とシンク

SparkDataFrame のほとんどの一般的な操作はストリーミングでサポートされています。選択、射影、集計などです。最終結果を定義したら、ストリーミング計算を開始するために、シンクと outputMode を設定した write.stream メソッドを呼び出します。

ストリーミング SparkDataFrame は、デバッグのためにコンソール、一時的なインメモリテーブル、またはフォールトトレラントな方法でさまざまな形式のファイルシンクに書き込むことができます。

noAggDF <- select(where(deviceDataStreamingDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# Aggregate
aggDF <- count(groupBy(noAggDF, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

head(sql("select * from aggregates"))

高度なトピック

SparkR オブジェクトクラス

SparkR には、作業する可能性のある 3 つの主要なオブジェクトクラスがあります。

  • SparkDataFrame: SparkR の中心的なコンポーネントです。これは、名前付き列に編成されたデータの分散コレクションを表す S4 クラスであり、概念的にはリレーショナルデータベースのテーブルまたは R のデータフレームと同等です。sdfenv の 2 つのスロットがあります。

    • sdf は、Spark JVM バックエンドの対応する Spark Dataset への参照を格納します。
    • env は、isCached などのオブジェクトのメタ情報を保存します。

    データインポートメソッドによって、または既存の SparkDataFrame を変換することによって作成できます。多数のデータ処理関数で SparkDataFrame を操作し、それを機械学習アルゴリズムにフィードできます。

  • Column: SparkDataFrame の列を表す S4 クラスです。jc スロットは、Spark JVM バックエンドの対応する Column オブジェクトへの参照を保存します。

    $ 演算子、例: df$col を使用して SparkDataFrame から取得できます。より頻繁には、select と組み合わせて特定の列を選択したり、filter と構築された条件で行を選択したり、集計関数と組み合わせてグループごとの集計統計を計算したりするために使用されます。

  • GroupedData: groupBy によって、または他の GroupedData を変換することによって作成されたグループ化されたデータを表す S4 クラスです。その sgd スロットは、バックエンドの RelationalGroupedDataset オブジェクトへの参照を保存します。

    これは、グループ情報を持つ中間オブジェクトであり、集計操作がそれに続きます。

アーキテクチャ

アーキテクチャの完全な説明は、参考文献、特に論文 *SparkR: Scaling R Programs with Spark* で確認できます。

SparkR の内部では Spark SQL エンジンが使用されています。これにより、解釈された R コードを実行する際のオーバーヘッドが回避され、Spark の最適化された SQL 実行エンジンは、データと計算フローの構造情報を使用して多くの最適化を実行し、計算を高速化します。

実際の計算の主要なメソッド呼び出しは、ドライバーの Spark JVM で行われます。R から JVM 上の関数を呼び出すことができるソケットベースの SparkR API があります。Netty ベースのソケットサーバーでリッスンする SparkR JVM バックエンドを使用します。

SparkR JVM バックエンドでは、メソッド呼び出しと新しいオブジェクトの作成の 2 種類の RPC がサポートされています。メソッド呼び出しは 2 つの方法で行うことができます。

  • sparkR.callJMethod は、既存の Java オブジェクトへの参照と、メソッドに渡される引数のリストを受け取ります。

  • sparkR.callJStatic は、静的メソッドのクラス名と、メソッドに渡される引数のリストを受け取ります。

引数はカスタムワイヤーフォーマットを使用してシリアル化され、JVM 側でデシリアル化されます。その後、Java リフレクションを使用して適切なメソッドを呼び出します。

オブジェクトを作成するには、sparkR.newJObject を使用し、同様に適切なコンストラクタを、提供された引数で呼び出します。

最後に、バックエンドに存在する Java オブジェクトを参照する新しい R クラス jobj を使用します。これらの参照は Java 側で追跡され、R 側でスコープ外になると自動的にガベージコレクションされます。

付録

R と Spark のデータ型

R Spark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
日付 date
array array
list array
env map

参考文献