RDD プログラミングガイド

概要

全体として、すべての Spark アプリケーションは、ユーザーの main 関数を実行し、クラスター上で様々な 並列操作 を実行する ドライバープログラム で構成されます。Spark が提供する主要な抽象化は、クラスターのノード間でパーティション化された要素のコレクションであり、並列に操作できる 耐障害性分散データセット (RDD) です。RDD は、Hadoop ファイルシステム(またはその他の Hadoop 対応ファイルシステム)のファイル、またはドライバープログラム内の既存の Scala コレクションから開始し、それを変換することによって作成されます。ユーザーは、RDD をメモリに 永続化 するように Spark に依頼することもでき、これにより並列操作全体で効率的に再利用できます。最後に、RDD はノード障害から自動的に回復します。

Spark の 2 番目の抽象化は、並列操作で使用できる 共有変数 です。デフォルトでは、Spark は関数を異なるノード上のタスクのセットとして並列に実行する際に、関数で使用される各変数のコピーを各タスクに送信します。場合によっては、変数はタスク間、またはタスクとドライバープログラム間で共有される必要があります。Spark は 2 種類の共有変数をサポートしています。ブロードキャスト変数 は、すべてのノードのメモリに値をキャッシュするために使用でき、アキュムレータ は、カウンターや合計など、"加算" のみが行われる変数です。

このガイドでは、Spark がサポートする各言語でのこれらの機能について説明します。Spark の対話型シェルを起動すると、最も簡単に追従できます。Scala シェルの場合は bin/spark-shell、Python シェルの場合は bin/pyspark です。

Spark との連携

Spark 4.0.0 は、Python 3.9+ で動作します。標準の CPython インタープリタを使用できるため、NumPy などの C ライブラリを使用できます。PyPy 7.3.6+ でも動作します。

Python の Spark アプリケーションは、実行時に Spark を含んでいる bin/spark-submit スクリプトで実行するか、または setup.py に以下のように含めることで実行できます。

    install_requires=[
        'pyspark==4.0.0'
    ]

pip で PySpark をインストールせずに Python で Spark アプリケーションを実行するには、Spark ディレクトリにある bin/spark-submit スクリプトを使用します。このスクリプトは Spark の Java/Scala ライブラリをロードし、クラスターにアプリケーションを送信できます。対話型 Python シェルを起動するには bin/pyspark を使用することもできます。

HDFS データにアクセスしたい場合は、HDFS のバージョンにリンクされた PySpark のビルドを使用する必要があります。事前ビルドされたパッケージ も Spark ホームページで一般的な HDFS バージョン向けに提供されています。

最後に、Spark クラスの一部をプログラムにインポートする必要があります。次の行を追加します。

from pyspark import SparkContext, SparkConf

PySpark は、ドライバーとワーカーの両方で同じマイナーバージョンの Python を必要とします。PATH のデフォルトの Python バージョンを使用します。PYSPARK_PYTHON を設定することで、使用したい Python のバージョンを指定できます。例:

$ PYSPARK_PYTHON=python3.8 bin/pyspark
$ PYSPARK_PYTHON=/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py

Spark 4.0.0 は、デフォルトで Scala 2.13 用にビルドおよび配布されています。(Spark は他の Scala バージョンでもビルドできます。) Scala でアプリケーションを記述するには、互換性のある Scala バージョン(例:2.13.X)を使用する必要があります。

Spark アプリケーションを記述するには、Spark への Maven 依存関係を追加する必要があります。Spark は Maven Central で入手できます。

groupId = org.apache.spark
artifactId = spark-core_2.13
version = 4.0.0

さらに、HDFS クラスターにアクセスしたい場合は、HDFS のバージョン用の hadoop-client への依存関係を追加する必要があります。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後に、Spark クラスの一部をプログラムにインポートする必要があります。次の行を追加します。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(Spark 1.3.0 より前は、不可欠な暗黙の変換を有効にするために、明示的に import org.apache.spark.SparkContext._ をインポートする必要がありました。)

Spark 4.0.0 は、関数を簡潔に記述するための ラムダ式 をサポートしています。それ以外の場合は、org.apache.spark.api.java.function パッケージのクラスを使用できます。

注意:Spark 2.2.0 で Java 7 のサポートが削除されました。

Java で Spark アプリケーションを記述するには、Spark への依存関係を追加する必要があります。Spark は Maven Central で入手できます。

groupId = org.apache.spark
artifactId = spark-core_2.13
version = 4.0.0

さらに、HDFS クラスターにアクセスしたい場合は、HDFS のバージョン用の hadoop-client への依存関係を追加する必要があります。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後に、Spark クラスの一部をプログラムにインポートする必要があります。次の行を追加します。

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

Spark の初期化

Spark プログラムが最初に行うべきことは、SparkContext オブジェクトを作成することです。これは、Spark がクラスターにどのようにアクセスするかを伝えます。SparkContext を作成するには、まずアプリケーションに関する情報を含む SparkConf オブジェクトを構築する必要があります。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

Spark プログラムが最初に行うべきことは、SparkContext オブジェクトを作成することです。これは、Spark がクラスターにどのようにアクセスするかを伝えます。SparkContext を作成するには、まずアプリケーションに関する情報を含む SparkConf オブジェクトを構築する必要があります。

JVM ごとにアクティブな SparkContext は 1 つだけである必要があります。新しい SparkContext を作成する前に、アクティブな SparkContext を stop() する必要があります。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

Spark プログラムが最初に行うべきことは、JavaSparkContext オブジェクトを作成することです。これは、Spark がクラスターにどのようにアクセスするかを伝えます。SparkContext を作成するには、まずアプリケーションに関する情報を含む SparkConf オブジェクトを構築する必要があります。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appName パラメータは、クラスター UI に表示されるアプリケーションの名前です。master は、Spark または YARN クラスターの URL、またはローカルモードで実行するための特別な "local" 文字列です。実際には、クラスターで実行する場合、プログラムに master をハードコーディングしたくはありません。代わりに、spark-submit でアプリケーションを起動 し、そこで受け取るようにします。ただし、ローカルテストや単体テストでは、「local」を渡して Spark をインプロセスで実行できます。

シェルを使用する

PySpark シェルでは、sc という名前の特別なインタープリタ対応 SparkContext がすでに作成されています。独自の SparkContext を作成しても機能しません。--master 引数を使用してコンテキストが接続するマスターを設定でき、--py-files にカンマ区切りのリストを渡すことで、実行時のパスに Python .zip、.egg、または .py ファイルを追加できます。サードパーティの Python 依存関係については、Python パッケージ管理 を参照してください。--packages 引数に Maven 座標のカンマ区切りのリストを提供することで、シェルセッションに依存関係(例:Spark Packages)を追加することもできます。依存関係が存在する可能性のある追加のリポジトリ(例:Sonatype)は、--repositories 引数に渡すことができます。たとえば、4 つのコアで bin/pyspark を実行するには、次のようにします。

$ ./bin/pyspark --master "local[4]"

または、code.py を検索パスに追加するには (後で import code できるようにするため)、次のようにします。

$ ./bin/pyspark --master "local[4]" --py-files code.py

オプションの完全なリストについては、pyspark --help を実行してください。内部的には、pyspark はより一般的な spark-submit スクリプト を呼び出します。

IPython、拡張 Python インタープリタで PySpark シェルを起動することも可能です。PySpark は IPython 1.0.0 以降で動作します。IPython を使用するには、bin/pyspark を実行する際に PYSPARK_DRIVER_PYTHON 変数を ipython に設定します。

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

Jupyter Notebook (旧称 IPython Notebook) を使用するには、

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

PYSPARK_DRIVER_PYTHON_OPTS を設定することで、ipython または jupyter コマンドをカスタマイズできます。

Jupyter Notebook サーバーが起動したら、「Files」タブから新しいノートブックを作成できます。ノートブック内では、Jupyter Notebook から Spark を試す前に、%pylab inline というコマンドをノートブックの一部として入力できます。

Spark シェルでは、sc という名前の特別なインタープリタ対応 SparkContext がすでに作成されています。独自の SparkContext を作成しても機能しません。--master 引数を使用してコンテキストが接続するマスターを設定でき、--jars にカンマ区切りのリストを渡すことで、JAR をクラスパスに追加できます。--packages 引数に Maven 座標のカンマ区切りのリストを提供することで、シェルセッションに依存関係(例:Spark Packages)を追加することもできます。依存関係が存在する可能性のある追加のリポジトリ(例:Sonatype)は、--repositories 引数に渡すことができます。たとえば、4 つのコアで bin/spark-shell を実行するには、次のようにします。

$ ./bin/spark-shell --master "local[4]"

または、code.jar をクラスパスに追加するには、次のようにします。

$ ./bin/spark-shell --master "local[4]" --jars code.jar

Maven 座標を使用して依存関係を含めるには

$ ./bin/spark-shell --master "local[4]" --packages "org.example:example:0.1"

オプションの完全なリストについては、spark-shell --help を実行してください。内部的には、spark-shell はより一般的な spark-submit スクリプト を呼び出します。

Resilient Distributed Datasets (RDD)

Spark は、耐障害性分散データセット (RDD) という概念を中心に展開されます。これは、並列に操作できる要素の耐障害性のあるコレクションです。RDD を作成するには 2 つの方法があります。ドライバープログラム内の既存のコレクションを 並列化 するか、共有ファイルシステム、HDFS、HBase などの外部ストレージシステムにあるデータセットを参照するか、Hadoop InputFormat を提供するデータソースを参照します。

並列コレクション

並列化されたコレクションは、ドライバープログラムの既存のイテラブルまたはコレクションに対して SparkContextparallelize メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。たとえば、1 から 5 までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

作成後、分散データセット (distData) は並列に操作できます。たとえば、リストの要素を合計するために distData.reduce(lambda a, b: a + b) を呼び出すことができます。分散データセットの操作については後で説明します。

並列化されたコレクションは、ドライバープログラムの既存のコレクション (Scala Seq) に対して SparkContextparallelize メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。たとえば、1 から 5 までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

作成後、分散データセット (distData) は並列に操作できます。たとえば、配列の要素を合計するために distData.reduce((a, b) => a + b) を呼び出すことができます。分散データセットの操作については後で説明します。

並列化されたコレクションは、ドライバープログラムの既存の Collection に対して JavaSparkContextparallelize メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。たとえば、1 から 5 までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

作成後、分散データセット (distData) は並列に操作できます。たとえば、リストの要素を合計するために distData.reduce((a, b) -> a + b) を呼び出すことができます。分散データセットの操作については後で説明します。

並列コレクションの重要なパラメータの 1 つは、データセットを分割する パーティション の数です。Spark は各パーティションに対して 1 つのタスクを実行します。通常、クラスターの各 CPU に対して 2〜4 個のパーティションが望ましいです。通常、Spark はクラスターに基づいてパーティション数を自動的に設定しようとします。ただし、parallelize に 2 番目のパラメータとして渡すことで手動で設定することもできます (例:sc.parallelize(data, 10))。注意: コードのいくつかの場所では、後方互換性を維持するためにスライス (パーティションの同義語) という用語が使用されています。

外部データセット

PySpark は、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3 など、Hadoop がサポートするあらゆるストレージソースから分散データセットを作成できます。Spark はテキストファイル、SequenceFiles、およびその他の Hadoop InputFormat をサポートしています。

テキストファイル RDD は、SparkContexttextFile メソッドを使用して作成できます。このメソッドは、ファイルの URI (ローカルパスまたは hdfs://s3a:// などの URI) を受け取り、それを行のコレクションとして読み取ります。例:

>>> distFile = sc.textFile("data.txt")

作成後、distFile はデータセット操作で処理できます。たとえば、map および reduce 操作を使用して、すべての行のサイズを合計できます。例:distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

Spark でファイルを読み取ることに関する注意点

  • ローカルファイルシステム上のパスを使用する場合、そのファイルはワーカーノードでも同じパスでアクセス可能である必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークでマウントされた共有ファイルシステムを使用してください。

  • textFile を含む Spark のすべてのファイルベース入力メソッドは、ディレクトリ、圧縮ファイル、およびワイルドカードもサポートしています。たとえば、textFile("/my/directory")textFile("/my/directory/*.txt")、および textFile("/my/directory/*.gz") を使用できます。

  • textFile メソッドは、ファイルのパーティション数を制御するためのオプションの 2 番目の引数も取ります。デフォルトでは、Spark はファイルごとに 1 つのパーティションを作成します (HDFS ではデフォルトで 128MB)。ただし、より大きな値を渡して、より多くのパーティションを要求することもできます。注意:ブロック数より少ないパーティションを持つことはできません。

テキストファイル以外に、Spark の Python API は他のいくつかのデータ形式もサポートしています。

  • SparkContext.wholeTextFiles は、複数の小さなテキストファイルを含むディレクトリを読み込み、各ファイルを (ファイル名、コンテンツ) のペアとして返します。これは、textFile が各ファイルの各行を 1 つのレコードとして返すのとは対照的です。

  • RDD.saveAsPickleFile および SparkContext.pickleFile は、ピクル化された Python オブジェクトの単純な形式で RDD を保存することをサポートしています。ピクルシリアライズではバッチ処理が使用され、デフォルトのバッチサイズは 10 です。

  • SequenceFile および Hadoop Input/Output Formats

注意 この機能は現在 Experimental としてマークされており、高度なユーザーを対象としています。将来的に Spark SQL に基づいた読み取り/書き込みサポートに置き換えられる可能性があり、その場合は Spark SQL が推奨されるアプローチです。

Writable サポート

PySpark SequenceFile サポートは、Java 内のキー・バリューペアの RDD をロードし、Writable を基本 Java 型に変換し、結果の Java オブジェクトを pickle を使用してピクル化します。キー・バリューペアの RDD を SequenceFile に保存する場合、PySpark は逆を行います。Python オブジェクトを Java オブジェクトにアンピクル化し、それを Writable に変換します。以下の Writable は自動的に変換されます。

Writable タイプPython タイプ
Textstr
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableなし
MapWritabledict

配列は標準では扱われません。ユーザーは、読み取りまたは書き込み時にカスタム ArrayWritable サブタイプを指定する必要があります。書き込み時には、ユーザーは配列をカスタム ArrayWritable サブタイプに変換するカスタムコンバータも指定する必要があります。読み取り時には、デフォルトのコンバータはカスタム ArrayWritable サブタイプを Java Object[] に変換し、それが Python タプルにピクル化されます。プリミティブ型の配列の Python array.array を取得するには、ユーザーがカスタムコンバータを指定する必要があります。

SequenceFiles の保存と読み込み

テキストファイルと同様に、SequenceFile はパスを指定して保存および読み込むことができます。キーと値のクラスを指定できますが、標準の Writable では不要です。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

その他の Hadoop Input/Output Format の保存と読み込み

PySpark は、新しい Hadoop MapReduce API および古い Hadoop MapReduce API の両方について、任意の Hadoop InputFormat を読み取ったり、任意の Hadoop OutputFormat を書き込んだりすることもできます。必要に応じて、Python dict として Hadoop 設定を渡すことができます。Elasticsearch ESInputFormat を使用した例を次に示します。

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

注意:InputFormat が Hadoop 設定および/または入力パスにのみ依存しており、キーと値のクラスが上記の表に従って簡単に変換できる場合、このアプローチはこのようなケースでうまく機能するはずです。

カスタムシリアライズされたバイナリデータ (Cassandra/HBase からのデータ読み込みなど) がある場合は、まず Scala/Java 側で pickle の pickler で処理できるものにデータを変換する必要があります。Converter トレイトが提供されています。このトレイトを拡張し、convert メソッドに変換コードを実装するだけです。このクラスと InputFormat にアクセスするために必要な依存関係が Spark ジョブ jar にパッケージ化され、PySpark クラスパスに含まれていることを確認してください。

Python の例Converter の例 を参照して、Cassandra / HBase InputFormat および OutputFormat をカスタムコンバータと共に使用する例を確認してください。

Spark は、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3 など、Hadoop がサポートするあらゆるストレージソースから分散データセットを作成できます。Spark はテキストファイル、SequenceFiles、およびその他の Hadoop InputFormat をサポートしています。

テキストファイル RDD は、SparkContexttextFile メソッドを使用して作成できます。このメソッドは、ファイルの URI (ローカルパスまたは hdfs://s3a:// などの URI) を受け取り、それを行のコレクションとして読み取ります。例:

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

作成後、distFile はデータセット操作で処理できます。たとえば、map および reduce 操作を使用して、すべての行のサイズを合計できます。例:distFile.map(s => s.length).reduce((a, b) => a + b)

Spark でファイルを読み取ることに関する注意点

  • ローカルファイルシステム上のパスを使用する場合、そのファイルはワーカーノードでも同じパスでアクセス可能である必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークでマウントされた共有ファイルシステムを使用してください。

  • textFile を含む Spark のすべてのファイルベース入力メソッドは、ディレクトリ、圧縮ファイル、およびワイルドカードもサポートしています。たとえば、textFile("/my/directory")textFile("/my/directory/*.txt")、および textFile("/my/directory/*.gz") を使用できます。複数のファイルが読み取られる場合、パーティションの順序はファイルシステムから返される順序によって決まります。たとえば、パスによるファイルの辞書順ソートに従う場合もあれば、そうでない場合もあります。パーティション内では、要素は基になるファイル内の順序に従って並べられます。

  • textFile メソッドは、ファイルのパーティション数を制御するためのオプションの 2 番目の引数も取ります。デフォルトでは、Spark はファイルごとに 1 つのパーティションを作成します (HDFS ではデフォルトで 128MB)。ただし、より大きな値を渡して、より多くのパーティションを要求することもできます。注意:ブロック数より少ないパーティションを持つことはできません。

テキストファイル以外に、Spark の Scala API は他のいくつかのデータ形式もサポートしています。

  • SparkContext.wholeTextFiles は、複数の小さなテキストファイルを含むディレクトリを読み込み、各ファイルを (ファイル名、コンテンツ) のペアとして返します。これは、textFile が各ファイルの各行を 1 つのレコードとして返すのとは対照的です。パーティショニングはデータローカリティによって決定されるため、場合によってはパーティションが少なすぎる可能性があります。そのような場合、wholeTextFiles は最小パーティション数を制御するためのオプションの 2 番目の引数を提供します。

  • SequenceFiles の場合、SparkContext の sequenceFile[K, V] メソッドを使用します。ここで KV はファイル内のキーと値の型です。これらは、Hadoop の Writable インターフェースのサブクラスである必要があります。たとえば、IntWritableText です。さらに、Spark は一般的な Writable のいくつかに対してネイティブ型を指定できます。たとえば、sequenceFile[Int, String] は IntWritable と Text を自動的に読み取ります。

  • その他の Hadoop InputFormats については、SparkContext.hadoopRDD メソッドを使用できます。これは、任意の JobConf と入力フォーマットクラス、キークラス、値クラスを取ります。これらは、入力ソースを持つ Hadoop ジョブの場合と同様に設定します。また、「新しい」MapReduce API (org.apache.hadoop.mapreduce) に基づく InputFormats の場合は SparkContext.newAPIHadoopRDD を使用することもできます。

  • RDD.saveAsObjectFile および SparkContext.objectFile は、Java オブジェクトのシリアライズされた単純な形式で RDD を保存することをサポートしています。これは Avro のような専門化された形式ほど効率的ではありませんが、任意の RDD を簡単に保存できます。

Spark は、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3 など、Hadoop がサポートするあらゆるストレージソースから分散データセットを作成できます。Spark はテキストファイル、SequenceFiles、およびその他の Hadoop InputFormat をサポートしています。

テキストファイル RDD は、SparkContexttextFile メソッドを使用して作成できます。このメソッドは、ファイルの URI (ローカルパスまたは hdfs://s3a:// などの URI) を受け取り、それを行のコレクションとして読み取ります。例:

JavaRDD<String> distFile = sc.textFile("data.txt");

作成後、distFile はデータセット操作で処理できます。たとえば、map および reduce 操作を使用して、すべての行のサイズを合計できます。例:distFile.map(s -> s.length()).reduce((a, b) -> a + b)

Spark でファイルを読み取ることに関する注意点

  • ローカルファイルシステム上のパスを使用する場合、そのファイルはワーカーノードでも同じパスでアクセス可能である必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークでマウントされた共有ファイルシステムを使用してください。

  • textFile を含む Spark のすべてのファイルベース入力メソッドは、ディレクトリ、圧縮ファイル、およびワイルドカードもサポートしています。たとえば、textFile("/my/directory")textFile("/my/directory/*.txt")、および textFile("/my/directory/*.gz") を使用できます。

  • textFile メソッドは、ファイルのパーティション数を制御するためのオプションの 2 番目の引数も取ります。デフォルトでは、Spark はファイルごとに 1 つのパーティションを作成します (HDFS ではデフォルトで 128MB)。ただし、より大きな値を渡して、より多くのパーティションを要求することもできます。注意:ブロック数より少ないパーティションを持つことはできません。

テキストファイル以外に、Spark の Java API は他のいくつかのデータ形式もサポートしています。

  • JavaSparkContext.wholeTextFiles は、複数の小さなテキストファイルを含むディレクトリを読み込み、各ファイルを (ファイル名、コンテンツ) のペアとして返します。これは、textFile が各ファイルの各行を 1 つのレコードとして返すのとは対照的です。

  • SequenceFiles の場合、SparkContext の sequenceFile[K, V] メソッドを使用します。ここで KV はファイル内のキーと値の型です。これらは、Hadoop の Writable インターフェースのサブクラスである必要があります。たとえば、IntWritableText です。

  • その他の Hadoop InputFormats については、JavaSparkContext.hadoopRDD メソッドを使用できます。これは、任意の JobConf と入力フォーマットクラス、キークラス、値クラスを取ります。これらは、入力ソースを持つ Hadoop ジョブの場合と同様に設定します。また、「新しい」MapReduce API (org.apache.hadoop.mapreduce) に基づく InputFormats の場合は JavaSparkContext.newAPIHadoopRDD を使用することもできます。

  • JavaRDD.saveAsObjectFile および JavaSparkContext.objectFile は、Java オブジェクトのシリアライズされた単純な形式で RDD を保存することをサポートしています。これは Avro のような専門化された形式ほど効率的ではありませんが、任意の RDD を簡単に保存できます。

RDD 操作

RDD は 2 種類の操作をサポートしています。変換 (既存のデータセットから新しいデータセットを作成する) と アクション (データセットに対する計算を実行した後、ドライバープログラムに値を返す) です。たとえば、map は、各データセット要素を関数で処理し、結果を表す新しい RDD を返す変換です。一方、reduce は、一部の関数を使用して RDD のすべての要素を集約し、最終結果をドライバープログラムに返すアクションです (ただし、分散データセットを返す並列 reduceByKey もあります)。

Spark のすべての変換は 遅延 です。つまり、結果をすぐに計算しません。代わりに、基になるデータセット (例: ファイル) に適用された変換を記憶するだけです。変換は、アクションがドライバープログラムに結果を返す必要がある場合にのみ計算されます。この設計により、Spark はより効率的に実行できます。たとえば、map を通じて作成されたデータセットが reduce で使用されることを認識し、より大きなマップされたデータセットではなく、reduce の結果のみをドライバーに返すことができます。

デフォルトでは、各変換された RDD は、アクションを実行するたびに再計算される可能性があります。ただし、persist (または cache) メソッドを使用して RDD をメモリに 永続化 することもできます。この場合、Spark はクラスター上の要素を保持し、次回クエリを実行するときに効率的にアクセスできます。RDD をディスクに永続化したり、複数のノードに複製したりすることもサポートされています。

基本

RDD の基本を説明するために、次の簡単なプログラムを検討してください。

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

最初の行は、外部ファイルから基になる RDD を定義します。このデータセットはメモリにロードされたり、その他の方法で処理されたりしません。lines は単にファイルへのポインタです。2 番目の行は、map 変換の結果として lineLengths を定義します。ここでも、遅延のため、lineLengths はすぐに計算されません。最後に、アクションである reduce を実行します。この時点で Spark は、計算を別々のマシンで実行されるタスクに分割し、各マシンはマップとその部分のローカル削減の両方を実行し、その結果のみをドライバープログラムに返します。

後で lineLengths を再度使用したい場合は、次を追加できます。

lineLengths.persist()

reduce の前に、これにより lineLengths が初めて計算された後にメモリに保存されます。

RDD の基本を説明するために、次の簡単なプログラムを検討してください。

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

最初の行は、外部ファイルから基になる RDD を定義します。このデータセットはメモリにロードされたり、その他の方法で処理されたりしません。lines は単にファイルへのポインタです。2 番目の行は、map 変換の結果として lineLengths を定義します。ここでも、遅延のため、lineLengths はすぐに計算されません。最後に、アクションである reduce を実行します。この時点で Spark は、計算を別々のマシンで実行されるタスクに分割し、各マシンはマップとその部分のローカル削減の両方を実行し、その結果のみをドライバープログラムに返します。

後で lineLengths を再度使用したい場合は、次を追加できます。

lineLengths.persist()

reduce の前に、これにより lineLengths が初めて計算された後にメモリに保存されます。

RDD の基本を説明するために、次の簡単なプログラムを検討してください。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

最初の行は、外部ファイルから基になる RDD を定義します。このデータセットはメモリにロードされたり、その他の方法で処理されたりしません。lines は単にファイルへのポインタです。2 番目の行は、map 変換の結果として lineLengths を定義します。ここでも、遅延のため、lineLengths はすぐに計算されません。最後に、アクションである reduce を実行します。この時点で Spark は、計算を別々のマシンで実行されるタスクに分割し、各マシンはマップとその部分のローカル削減の両方を実行し、その結果のみをドライバープログラムに返します。

後で lineLengths を再度使用したい場合は、次を追加できます。

lineLengths.persist(StorageLevel.MEMORY_ONLY());

reduce の前に、これにより lineLengths が初めて計算された後にメモリに保存されます。

Spark への関数の引き渡し

Spark の API は、クラスターで実行するためにドライバープログラムに関数を渡すことに大きく依存しています。これを行うための 3 つの推奨される方法があります。

  • ラムダ式: 式として記述できる簡単な関数の場合。(ラムダは複数ステートメントの関数や値を返さないステートメントはサポートしません。)
  • Spark を呼び出す関数内のローカル def: より長いコードの場合。
  • モジュールのトップレベル関数。

たとえば、lambda ではサポートできないより長い関数を渡すには、次のコードを検討してください。

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

メソッドへの参照をクラスインスタンス (シングルトンオブジェクトとは対照的) に渡すことも可能ですが、そのメソッドを含むオブジェクトも一緒に送信する必要があります。たとえば、次を検討してください。

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

ここで、new MyClass を作成し、それに対して doStuff を呼び出すと、その中の mapその MyClass インスタンスの func メソッドを参照するため、オブジェクト全体をクラスターに送信する必要があります。

同様に、外側のオブジェクトのフィールドにアクセスすると、オブジェクト全体が参照されます。

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

この問題を回避するには、最も簡単な方法は、外部からアクセスするのではなく、field をローカル変数にコピーすることです。

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

Spark の API は、クラスターで実行するためにドライバープログラムに関数を渡すことに大きく依存しています。これを行うための 2 つの推奨される方法があります。

  • 匿名関数構文: 短いコード片に使用できます。
  • グローバルシングルトンオブジェクト内の静的メソッド。たとえば、MyFunctions オブジェクトを定義してから、MyFunctions.func1 を次のように渡すことができます。
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

メソッドへの参照をクラスインスタンス (シングルトンオブジェクトとは対照的) に渡すことも可能ですが、そのメソッドを含むオブジェクトも一緒に送信する必要があります。たとえば、次を検討してください。

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

ここで、新しい MyClass インスタンスを作成し、それに対して doStuff を呼び出すと、その中の mapその MyClass インスタンスの func1 メソッドを参照するため、オブジェクト全体をクラスターに送信する必要があります。これは、rdd.map(x => this.func1(x)) と書くのと似ています。

同様に、外側のオブジェクトのフィールドにアクセスすると、オブジェクト全体が参照されます。

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

は、rdd.map(x => this.field + x) と書くのと同じであり、これは this のすべてを参照します。この問題を回避するには、外部からアクセスするのではなく、field をローカル変数にコピーするのが最も簡単な方法です。

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

Spark の API は、クラスターで実行するためにドライバープログラムに関数を渡すことに大きく依存しています。Java では、関数は org.apache.spark.api.java.function パッケージのインターフェースを実装するクラスによって表されます。これらの関数を作成するには 2 つの方法があります。

  • 独自のクラスで Function インターフェースを実装します (匿名内部クラスまたは名前付きクラスのいずれか)。そして、そのインスタンスを Spark に渡します。
  • ラムダ式 を使用して、実装を簡潔に定義します。

このガイドの多くは簡潔さのためにラムダ構文を使用していますが、すべての API を長形式で簡単に使用できます。たとえば、上記のコードを次のように記述できます。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

または、関数をインラインで記述するのが煩雑な場合。

class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

Java の匿名内部クラスは、final としてマークされている限り、エンクロージングスコープの変数にアクセスすることもできることに注意してください。Spark は、他の言語と同様に、これらの変数のコピーを各ワーカーノードに送信します。

クロージャの理解

Spark に関する難しいことの 1 つは、クラスター全体でコードを実行する際の変数とメソッドのスコープとライフサイクルを理解することです。スコープ外の変数を変更する RDD 操作は、混乱の原因となることがよくあります。次の例では、foreach() を使用してカウンターをインクリメントするコードを取り上げますが、同様の問題は他の操作でも発生する可能性があります。

単純な RDD 要素の合計を次に示します。これは、実行が同じ JVM 内で行われるかどうかに応じて異なる動作をする可能性があります。この一般的な例は、Spark を local モード (--master = "local[n]") で実行する場合と、Spark アプリケーションをクラスターにデプロイする場合 (例:YARN への spark-submit 経由) です。

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

ローカルモード vs. クラスターモード

上記のコードの動作は未定義であり、意図したとおりに動作しない可能性があります。ジョブを実行するために、Spark は RDD 操作の処理をタスクに分割し、各タスクはエグゼキュータによって実行されます。実行前に、Spark はタスクの クロージャ を計算します。クロージャとは、エグゼキュータが RDD に対して計算を実行するために可視である必要がある変数とメソッド (この場合は foreach()) です。このクロージャはシリアライズされ、各エグゼキュータに送信されます。

各エグゼキュータに送信されたクロージャ内の変数はコピーされるため、foreach 関数内で counter が参照されると、ドライバーノードの counter ではなくなります。ドライバーノードのメモリにはまだ counter がありますが、これはエグゼキュータからはもう見えません!エグゼキュータは、シリアライズされたクロージャからのコピーのみを参照します。したがって、counter の最終値は 0 のままになります。これは、counter に対するすべての操作がシリアライズされたクロージャ内の値を参照していたためです。

ローカルモードでは、状況によっては、foreach 関数がドライバーと同じ JVM 内で実行され、同じ元の counter を参照し、実際に更新する可能性があります。

これらのシナリオで定義済みの動作を確保するには、Accumulator を使用する必要があります。Spark のアキュムレータは、クラスター内のワーカーノード間で実行が分割されている場合に安全に変数を更新するためのメカニズムを提供する目的で使用されます。このガイドのアキュムレータのセクションで、これらについて詳しく説明します。

一般的に、クロージャ (ループやローカルで定義されたメソッドなどの構造) は、グローバル状態を変更するために使用すべきではありません。Spark は、クロージャの外側から参照されるオブジェクトへの変更の動作を定義または保証しません。これを行う一部のコードはローカルモードで機能するかもしれませんが、それは偶然であり、そのようなコードは分散モードでは意図したとおりに動作しません。グローバル集計が必要な場合は、代わりにアキュムレータを使用してください。

RDD の要素の表示

もう 1 つの一般的なイディオムは、rdd.foreach(println) または rdd.map(println) を使用して RDD の要素を印刷しようとすることです。単一のマシンでは、期待される出力が生成され、RDD のすべての要素が印刷されます。ただし、cluster モードでは、エグゼキュータによって呼び出される stdout への出力は、ドライバーの stdout ではなく、エグゼキュータの stdout に書き込まれるため、ドライバーの stdout には表示されません!ドライバーにすべての要素を印刷するには、collect() メソッドを使用して RDD をドライバーノードに取得します。例:rdd.collect().foreach(println)。ただし、collect() は RDD 全体を 1 台のマシンにフェッチするため、ドライバーがメモリ不足になる可能性があります。少数の要素のみを印刷する必要がある場合は、より安全なアプローチは take() を使用することです。例:rdd.take(100).foreach(println)

キー・バリューペアの操作

Spark のほとんどの操作は任意のオブジェクト型の RDD で動作しますが、いくつかの特別な操作はキー・バリューペアの RDD でのみ利用可能です。最も一般的なものは、キーによる要素のグループ化や集約などの分散 "シャッフル" 操作です。

Python では、これらの操作は (1, 2) のような組み込み Python タプルを含む RDD で機能します。これらのタプルを作成し、目的の操作を呼び出すだけです。

たとえば、次のコードはキー・バリューペアの reduceByKey 操作を使用して、ファイル内の各行が何回出現するかをカウントします。

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

たとえば、counts.sortByKey() を使用してペアをアルファベット順にソートし、最後に counts.collect() を使用してオブジェクトのリストとしてドライバープログラムに戻すこともできます。

Spark のほとんどの操作は任意のオブジェクト型の RDD で動作しますが、いくつかの特別な操作はキー・バリューペアの RDD でのみ利用可能です。最も一般的なものは、キーによる要素のグループ化や集約などの分散 "シャッフル" 操作です。

Scala では、これらの操作は Tuple2 オブジェクト (言語の組み込みタプル、単に (a, b) と記述して作成) を含む RDD で自動的に利用可能になります。キー・バリューペア操作は、PairRDDFunctions クラスで利用可能であり、これはタプルの RDD を自動的にラップします。

たとえば、次のコードはキー・バリューペアの reduceByKey 操作を使用して、ファイル内の各行が何回出現するかをカウントします。

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

たとえば、counts.sortByKey() を使用してペアをアルファベット順にソートし、最後に counts.collect() を使用してオブジェクトの配列としてドライバープログラムに戻すこともできます。

注意: キー・バリューペア操作でカスタムオブジェクトをキーとして使用する場合、カスタム equals() メソッドに一致する hashCode() メソッドが伴っていることを確認する必要があります。詳細については、Object.hashCode() ドキュメント に概説されている契約を参照してください。

Spark のほとんどの操作は任意のオブジェクト型の RDD で動作しますが、いくつかの特別な操作はキー・バリューペアの RDD でのみ利用可能です。最も一般的なものは、キーによる要素のグループ化や集約などの分散 "シャッフル" 操作です。

Java では、キー・バリューペアは Scala 標準ライブラリの scala.Tuple2 クラスを使用して表現されます。タプルを作成するには new Tuple2(a, b) を呼び出すだけで、後で tuple._1()tuple._2() でフィールドにアクセスできます。

キー・バリューペアの RDD は、JavaPairRDD クラスによって表されます。mapToPairflatMapToPair のような map 操作の特別なバージョンを使用して、JavaRDD から JavaPairRDD を構築できます。JavaPairRDD には、標準の RDD 関数と特別なキー・バリュー関数があります。

たとえば、次のコードはキー・バリューペアの reduceByKey 操作を使用して、ファイル内の各行が何回出現するかをカウントします。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

たとえば、counts.sortByKey() を使用してペアをアルファベット順にソートし、最後に counts.collect() を使用してオブジェクトの配列としてドライバープログラムに戻すこともできます。

注意: キー・バリューペア操作でカスタムオブジェクトをキーとして使用する場合、カスタム equals() メソッドに一致する hashCode() メソッドが伴っていることを確認する必要があります。詳細については、Object.hashCode() ドキュメント に概説されている契約を参照してください。

変換 (Transformations)

次の表は、Spark でサポートされている一般的な変換の一部を示しています。詳細については、RDD API ドキュメント (PythonScalaJavaR) およびペア RDD 関数ドキュメント (ScalaJava) を参照してください。

変換 (Transformation)意味
map(func) ソースの各要素を関数 func で処理して形成された新しい分散データセットを返します。
filter(func) ソースの要素のうち、func が true を返すものだけを選択して形成された新しいデータセットを返します。
flatMap(func) map と似ていますが、各入力項目は 0 個以上の出力項目にマッピングできます (したがって、func は単一の項目ではなく Seq を返す必要があります)。
mapPartitions(func) map と似ていますが、RDD の各パーティション (ブロック) で個別に実行されます。したがって、T 型の RDD で実行する場合、func は Iterator<T> => Iterator<U> の型である必要があります。
mapPartitionsWithIndex(func) mapPartitions と似ていますが、パーティションのインデックスを表す整数値も func に提供します。したがって、T 型の RDD で実行する場合、func は (Int, Iterator<T>) => Iterator<U> の型である必要があります。
sample(withReplacement, fraction, seed) 指定された乱数生成器シードを使用して、置換ありまたはなしで、データの一部 fraction をサンプリングします。
union(otherDataset) ソースデータセットと引数の要素の和集合を含む新しいデータセットを返します。
intersection(otherDataset) ソースデータセットと引数の要素の共通部分を含む新しい RDD を返します。
distinct([numPartitions])) ソースデータセットの重複しない要素を含む新しいデータセットを返します。
groupByKey([numPartitions]) (K, V) ペアのデータセットで呼び出されると、(K, Iterable<V>) ペアのデータセットを返します。
注意: 各キーの集計を実行するためにグループ化している場合は、reduceByKey または aggregateByKey を使用するとパフォーマンスが大幅に向上します。
注意: デフォルトでは、出力の並列度合いは親 RDD のパーティション数に依存します。オプションの numPartitions 引数を渡して、異なるタスク数を設定できます。
reduceByKey(func, [numPartitions]) (K, V) ペアのデータセットで呼び出されると、各キーの値が指定された削減関数 func (型は (V,V) => V である必要があります) を使用して集計された (K, V) ペアのデータセットを返します。groupByKey と同様に、削減タスクの数はオプションの 2 番目の引数で設定可能です。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) (K, V) ペアのデータセットで呼び出されると、各キーの値が指定された結合関数と中立的な "ゼロ" 値を使用して集計された (K, U) ペアのデータセットを返します。不要な割り当てを回避しながら、入力値型とは異なる集計値型を許可します。groupByKey と同様に、削減タスクの数はオプションの 2 番目の引数で設定可能です。
sortByKey([ascending], [numPartitions]) K が Ordered を実装する (K, V) ペアのデータセットで呼び出されると、ブール値 ascending 引数で指定された昇順または降順でキーによってソートされた (K, V) ペアのデータセットを返します。
join(otherDataset, [numPartitions]) (K, V) および (K, W) 型のデータセットで呼び出されると、各キーのすべてのペアの (K, (V, W)) ペアのデータセットを返します。外部結合は、leftOuterJoinrightOuterJoinfullOuterJoin を通じてサポートされます。
cogroup(otherDataset, [numPartitions]) (K, V) および (K, W) 型のデータセットで呼び出されると、(K, (Iterable<V>, Iterable<W>)) タプルのデータセットを返します。この操作は groupWith とも呼ばれます。
cartesian(otherDataset) T 型と U 型のデータセットで呼び出されると、(T, U) ペアのデータセット (すべての要素のペア) を返します。
pipe(command, [envVars]) RDD の各パーティションをシェルコマンド (例: Perl または bash スクリプト) にパイプします。RDD 要素はプロセスの標準入力に書き込まれ、その標準出力への行は文字列の RDD として返されます。
coalesce(numPartitions) RDD のパーティション数を numPartitions に減らします。大量のデータセットをフィルタリングした後に操作をより効率的に実行する場合に便利です。
repartition(numPartitions) RDD のデータをランダムにシャッフルして、パーティションを増減させ、それらの間でバランスを取ります。これは常にすべてのデータをネットワーク全体でシャッフルします。
repartitionAndSortWithinPartitions(partitioner) 指定されたパーティショナーに従って RDD を再パーティション化し、結果の各パーティション内でキーでレコードをソートします。これは、repartition を呼び出して各パーティション内でソートするよりも効率的です。なぜなら、ソートをシャッフル機構にプッシュできるからです。

アクション (Actions)

次の表は、Spark でサポートされている一般的なアクションの一部を示しています。詳細については、RDD API ドキュメント (PythonScalaJavaR)

およびペア RDD 関数ドキュメント (ScalaJava) を参照してください。

アクション (Action)意味
reduce(func) データセットの要素を関数 func (2 つの引数を取り 1 つを返す) を使用して集約します。関数は可換かつ結合的である必要があり、並列に正しく計算できるようにします。
collect() データセットのすべての要素をドライバープログラムの配列として返します。これは通常、フィルタリング後や、データのごく一部を返すその他の操作の後で役立ちます。
count() データセットの要素数を返します。
first() データセットの最初の要素を返します (take(1) に似ています)。
take(n) データセットの最初の n 個の要素を含む配列を返します。
takeSample(withReplacement, num, [seed]) 置換ありまたはなしで、データセットから num 個の要素のランダムなサンプルを含む配列を返します。オプションで乱数生成器シードを事前に指定できます。
takeOrdered(n, [ordering]) 自然順序またはカスタムコンパレータを使用して、RDD の最初の n 個の要素を返します。
saveAsTextFile(path) ローカルファイルシステム、HDFS、またはその他の Hadoop 対応ファイルシステムの指定されたディレクトリに、データセットの要素をテキストファイル (またはテキストファイルのセット) として書き込みます。Spark は各要素に toString を呼び出して、ファイルに行として変換します。
saveAsSequenceFile(path)
(Java および Scala)
ローカルファイルシステム、HDFS、またはその他の Hadoop 対応ファイルシステムの指定されたパスに、データセットの要素を Hadoop SequenceFile として書き込みます。これは、Hadoop の Writable インターフェースを実装するキー・バリューペアの RDD で利用できます。Scala では、Writable に暗黙的に変換可能な型でも利用できます (Spark は Int、Double、String などの基本型への変換を含みます)。
saveAsObjectFile(path)
(Java および Scala)
データセットの要素を Java シリアライズを使用した単純な形式で書き込みます。これは SparkContext.objectFile() を使用して読み込むことができます。
countByKey() (K, V) 型の RDD のみで利用可能です。各キーのカウントを含む (K, Int) ペアのハッシュマップを返します。
foreach(func) データセットの各要素で関数 func を実行します。これは通常、Accumulator の更新や外部ストレージシステムとの対話などの副作用のために行われます。
注意: foreach() の外部にある変数への変更は、未定義の動作を引き起こす可能性があります。詳細については、クロージャの理解 を参照してください。

Spark RDD API は、foreachforeachAsync のようなアクションの非同期バージョンも公開しており、アクションの完了を待機するのではなく、呼び出し元に FutureAction をすぐに返します。これは、アクションの非同期実行を管理または待機するために使用できます。

シャッフル操作

Spark 内の特定の操作は、シャッフルとして知られるイベントをトリガーします。シャッフルは、パーティション間で異なるようにグループ化されるようにデータを再分散するための Spark のメカニズムです。これには通常、エグゼキュータとマシン全体でのデータコピーが含まれるため、シャッフルは複雑でコストのかかる操作になります。

背景

シャッフル中に何が起こるかを理解するために、reduceByKey 操作の例を検討できます。reduceByKey 操作は、単一キーのすべての値がキーと、そのキーに関連付けられたすべての値に対して実行された削減関数の結果とのタプルに結合される新しい RDD を生成します。課題は、単一キーのすべての値が必ずしも同じパーティション、さらには同じマシン上にあるとは限らないことですが、結果を計算するには共存させる必要があります。

Spark では、データは一般的に、特定の操作に必要な場所に配置されるようにパーティション全体に分散されていません。計算中、単一のタスクは単一のパーティションを処理します。したがって、reduceByKey 削減タスクのすべてのデータを整理して実行するために、Spark はすべて対すべて (all-to-all) の操作を実行する必要があります。すべてのパーティションから読み取ってすべてのキーのすべての値を見つけ、次にパーティション間で値をまとめて各キーの最終結果を計算する必要があります。これはシャッフルと呼ばれます。

新しくシャッフルされたデータの各パーティションの要素のセットは決定的であり、パーティション自体の順序も同様ですが、これらの要素の順序はそうではありません。シャッフルの後に予測可能な順序のデータが必要な場合は、次を使用できます。

シャッフルを引き起こす可能性のある操作には、repartition および coalesce のような 再パーティション化 操作、count 以外の 'ByKey 操作 (groupByKeyreduceByKey など)、および cogroupjoin のような 結合 操作が含まれます。

パフォーマンスへの影響

シャッフル は、ディスク I/O、データシリアライズ、ネットワーク I/O を伴うため、高コストな操作です。シャッフルのためのデータを整理するために、Spark はタスクのセットを生成します。つまり、データを整理するための マップ タスクと、それを集約するための 削減 タスクのセットです。この命名法は MapReduce に由来しており、Spark の map および reduce 操作とは直接関係ありません。

内部的には、個々のマップタスクの結果は、それらが収まらないまでメモリに保持されます。その後、ターゲットパーティションに基づいてソートされ、単一のファイルに書き込まれます。削減側では、タスクは関連するソート済みブロックを読み取ります。

一部のシャッフル操作は、転送前または転送後にレコードを整理するためにインメモリデータ構造を使用するため、大量のヒープメモリを消費する可能性があります。具体的には、reduceByKey および aggregateByKey はマップ側でこれらの構造を作成し、'ByKey 操作は削減側でこれらの構造を生成します。データがメモリに収まらない場合、Spark はこれらのテーブルをディスクにスピルし、ディスク I/O の追加オーバーヘッドとガベージコレクションの増加を招きます。

シャッフルはディスクに多数の中間ファイルを生成します。Spark 1.3 以降、これらのファイルは、対応する RDD がもはや使用されなくなり、ガベージコレクションされるまで保持されます。これは、ラインエージが再計算された場合にシャッフルファイルを再作成する必要がないようにするためです。ガベージコレクションは、アプリケーションがこれらの RDD への参照を保持している場合、または GC が頻繁に発生しない場合、長期間経過してからのみ発生する可能性があります。これは、長期間実行される Spark ジョブが大量のディスクスペースを消費する可能性があることを意味します。一時ストレージディレクトリは、Spark コンテキストを構成する際に spark.local.dir 設定パラメータによって指定されます。

シャッフルの動作は、様々な設定パラメータを調整することで調整できます。Spark 設定ガイドの「シャッフル動作」セクションを参照してください。

RDD の永続化

Spark の最も重要な機能の 1 つは、操作全体でデータセットをメモリに 永続化 (または キャッシュ) することです。RDD を永続化すると、各ノードは計算した RDD のパーティションをメモリに格納し、そのデータセット (またはそれから派生したデータセット) に対する他の操作で再利用します。これにより、将来のアクションがはるかに高速になります (多くの場合 10 倍以上)。キャッシングは、反復アルゴリズムと高速な対話型使用のための重要なツールです。

RDD に persist() または cache() メソッドを使用して永続化マークを付けることができます。アクションで初めて計算されると、ノードのメモリに保持されます。Spark のキャッシュは耐障害性があります。RDD のパーティションが失われた場合、元の作成元となった変換を使用して自動的に再計算されます。

さらに、各永続化された RDD は異なる ストレージレベル を使用して格納できます。これにより、たとえば、データセットをディスクに永続化したり、メモリに永続化するがシリアライズされた Java オブジェクトとして (スペースを節約するため) したり、ノード間で複製したりできます。これらのレベルは、StorageLevel オブジェクト (PythonScalaJava) を persist() に渡すことによって設定されます。cache() メソッドは、デフォルトのストレージレベルである StorageLevel.MEMORY_ONLY (デシリアライズされたオブジェクトをメモリに格納) を使用するショートカットです。ストレージレベルの完全なセットは次のとおりです。

ストレージレベル意味
MEMORY_ONLY RDD をデシリアライズされた Java オブジェクトとして JVM に格納します。RDD がメモリに収まらない場合、一部のパーティションはキャッシュされず、必要になるたびにオンザフライで再計算されます。これがデフォルトレベルです。
MEMORY_AND_DISK RDD をデシリアライズされた Java オブジェクトとして JVM に格納します。RDD がメモリに収まらない場合、収まらないパーティションをディスクにスピルし、必要になったときにそこから読み取ります。
MEMORY_ONLY_SER
(Java および Scala)
RDD を シリアライズされた Java オブジェクト (パーティションごとに 1 つのバイト配列) として格納します。これは、特に 高速シリアライザー を使用する場合、デシリアライズされたオブジェクトよりも一般的にスペース効率が高くなりますが、読み取りには CPU 負荷が高くなります。
MEMORY_AND_DISK_SER
(Java および Scala)
MEMORY_ONLY_SER と似ていますが、メモリに収まらないパーティションは、必要になるたびにオンザフライで再計算するのではなく、ディスクにスピルします。
DISK_ONLY RDD パーティションをディスクにのみ格納します。
MEMORY_ONLY_2, MEMORY_AND_DISK_2 など。 上記レベルと同じですが、各パーティションを 2 つのクラスターノードに複製します。
OFF_HEAP (実験的) MEMORY_ONLY_SER と似ていますが、データを オフヒープメモリ に格納します。これにはオフヒープメモリを有効にする必要があります。

注意: Python では、格納されるオブジェクトは常に Pickle ライブラリを使用してシリアライズされるため、シリアライズされたレベルを選択するかどうかは関係ありません。Python で利用可能なストレージレベルには、MEMORY_ONLYMEMORY_ONLY_2MEMORY_AND_DISKMEMORY_AND_DISK_2DISK_ONLYDISK_ONLY_2、および DISK_ONLY_3 が含まれます。

Spark はシャッフル操作 (例:reduceByKey) の一部の中間データも自動的に永続化します。これは、シャッフル中にノードが障害を起こした場合に、入力全体を再計算するのを避けるためです。それでも、結果の RDD を再利用する予定がある場合は、persist を呼び出すことをお勧めします。

どのストレージレベルを選択するか?

Spark のストレージレベルは、メモリ使用量と CPU 効率の間で異なるトレードオフを提供するように設計されています。次のプロセスを経て選択することをお勧めします。

データの削除

Spark は各ノードでのキャッシュ使用状況を自動的に監視し、古いデータパーティションを最も最近使用されていない (LRU) 方式でドロップします。キャッシュから外れるのを待つのではなく、手動で RDD を削除したい場合は、RDD.unpersist() メソッドを使用します。このメソッドはデフォルトではブロックしないことに注意してください。リソースが解放されるまでブロックするには、このメソッドを呼び出すときに blocking=true を指定します。

共有変数

通常、Spark 操作 (mapreduce など) に渡される関数がリモートクラスターノードで実行される場合、関数で使用されるすべての変数の個別のコピーで動作します。これらの変数は各マシンにコピーされ、リモートマシン上の変数への更新はドライバープログラムに伝播されません。タスク全体での汎用的な読み書き共有変数のサポートは非効率的になります。ただし、Spark は 2 つの一般的な使用パターンに対して、2 つの限定的な種類の 共有変数 を提供します。ブロードキャスト変数とアキュムレータです。

ブロードキャスト変数

ブロードキャスト変数を使用すると、プログラマはタスクごとに変数のコピーを送信するのではなく、読み取り専用変数を各マシンにキャッシュしておくことができます。たとえば、効率的な方法で各ノードに大規模な入力データセットのコピーを提供するために使用できます。Spark は、通信コストを削減するために、効率的なブロードキャストアルゴリズムを使用してブロードキャスト変数を配布しようとします。

Spark アクションは、分散 "シャッフル" 操作によって区切られた一連のステージを通じて実行されます。Spark は、各ステージ内のタスクが必要とする共通データを自動的にブロードキャストします。このようにブロードキャストされたデータはシリアライズされた形式でキャッシュされ、各タスクを実行する前にデシリアライズされます。これは、ブロードキャスト変数を明示的に作成することが、複数ステージにわたるタスクが同じデータを必要とする場合、またはデータをデシリアライズされた形式でキャッシュすることが重要な場合にのみ役立つことを意味します。

ブロードキャスト変数は、変数 v から SparkContext.broadcast(v) を呼び出すことによって作成されます。ブロードキャスト変数は v のラッパーであり、その値は value メソッドを呼び出すことでアクセスできます。次のコードはこれを例示しています。

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.core.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

ブロードキャスト変数が作成された後、クラスターで実行される関数では、v がノードに 1 回以上送信されないように、値 v の代わりにそれを使用する必要があります。さらに、すべてのノードがブロードキャスト変数の同じ値を取得するように、ブロードキャスト後にオブジェクト v を変更しないでください (例:後で新しいノードに送信される場合)。

ブロードキャスト変数がエグゼキュータにコピーしたリソースを解放するには、.unpersist() を呼び出します。その後ブロードキャストが再度使用された場合、再ブロードキャストされます。ブロードキャスト変数が使用するすべてリソースを永久に解放するには、.destroy() を呼び出します。その後、ブロードキャスト変数は使用できません。これらのメソッドはデフォルトではブロックしないことに注意してください。リソースが解放されるまでブロックするには、それらを呼び出すときに blocking=true を指定します。

アキュムレータ

アキュムレータは、結合的かつ可換な操作を通じてのみ "追加" される変数であり、したがって効率的に並列サポートできます。これらは、(MapReduce のように) カウンターや合計を実装するために使用できます。Spark は数値型のアキュムレータをネイティブでサポートしており、プログラマは新しい型をサポートに追加できます。

ユーザーは、名前付きまたは名前なしのアキュムレータを作成できます。下の画像に示すように、名前付きアキュムレータ (この場合は counter) は、そのアキュムレータを変更するステージの Web UI に表示されます。「タスク」テーブルに、タスクによって変更された各アキュムレータの値が表示されます。

Accumulators in the Spark UI

UI でアキュムレータを追跡することは、実行中のステージの進行状況を理解するのに役立ちます (注意: これは Python ではまだサポートされていません)。

アキュムレータは、初期値 v から SparkContext.accumulator(v) を呼び出すことによって作成されます。クラスターで実行されるタスクは、add メソッドまたは += 演算子を使用してそれに加算できます。ただし、その値は読み取れません。ドライバープログラムのみが、value メソッドを使用してアキュムレータの値にアクセスできます。

次のコードは、配列の要素を合計するためにアキュムレータが使用されていることを示しています。

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

このコードは Int 型のアキュムレータの組み込みサポートを使用しましたが、プログラマは AccumulatorParam をサブクラス化することによって独自の型を作成することもできます。AccumulatorParam インターフェースには 2 つのメソッドがあります。データ型の「ゼロ値」を提供する zero と、2 つの値を結合する addInPlace です。たとえば、数学ベクトルを表す Vector クラスがある場合、次のように記述できます。

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

数値アキュムレータは、SparkContext.longAccumulator() または SparkContext.doubleAccumulator() を呼び出すことで作成できます。これは、それぞれ Long 型または Double 型の値を集計します。クラスターで実行されるタスクは、add メソッドを使用してそれに加算できます。ただし、その値は読み取れません。ドライバープログラムのみが、value メソッドを使用してアキュムレータの値にアクセスできます。

次のコードは、配列の要素を合計するためにアキュムレータが使用されていることを示しています。

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

このコードでは、組み込みのLong型のアキュムレータサポートを使用しましたが、プログラマはAccumulatorV2をサブクラス化することで独自の型を作成することもできます。AccumulatorV2抽象クラスには、オーバーライドする必要があるいくつかのメソッドがあります。アキュムレータをゼロにリセットするためのreset、アキュムレータに別の値を追加するためのadd、同じ型の別のアキュムレータをこのアキュムレータにマージするためのmergeです。オーバーライドする必要があるその他のメソッドは、APIドキュメントに含まれています。たとえば、数学的なベクトルを表すMyVectorクラスがあると仮定すると、次のように記述できます。

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

注意: プログラマが独自のAccumulatorV2型を定義する場合、結果の型は追加される要素の型と異なる場合があります。

数値アキュムレータは、SparkContext.longAccumulator() または SparkContext.doubleAccumulator() を呼び出すことで作成できます。これは、それぞれ Long 型または Double 型の値を集計します。クラスターで実行されるタスクは、add メソッドを使用してそれに加算できます。ただし、その値は読み取れません。ドライバープログラムのみが、value メソッドを使用してアキュムレータの値にアクセスできます。

次のコードは、配列の要素を合計するためにアキュムレータが使用されていることを示しています。

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

このコードでは、組み込みのLong型のアキュムレータサポートを使用しましたが、プログラマはAccumulatorV2をサブクラス化することで独自の型を作成することもできます。AccumulatorV2抽象クラスには、オーバーライドする必要があるいくつかのメソッドがあります。アキュムレータをゼロにリセットするためのreset、アキュムレータに別の値を追加するためのadd、同じ型の別のアキュムレータをこのアキュムレータにマージするためのmergeです。オーバーライドする必要があるその他のメソッドは、APIドキュメントに含まれています。たとえば、数学的なベクトルを表すMyVectorクラスがあると仮定すると、次のように記述できます。

class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");

注意: プログラマが独自のAccumulatorV2型を定義する場合、結果の型は追加される要素の型と異なる場合があります。

警告: Sparkタスクが終了すると、Sparkはそのタスクでの累積された更新をアキュムレータにマージしようとします。失敗した場合、Sparkは失敗を無視し、タスクを成功とマークして他のタスクの実行を続行します。したがって、バグのあるアキュムレータはSparkジョブに影響を与えませんが、Sparkジョブが成功しても正しく更新されない可能性があります。

アクションのみで実行されるアキュムレータの更新について、Sparkは、各タスクのアキュムレータへの更新が一度だけ適用されることを保証します。つまり、再起動されたタスクは値を更新しません。変換では、タスクまたはジョブステージが再実行された場合、各タスクの更新が複数回適用される可能性があることに注意してください。

アキュムレータはSparkの遅延評価モデルを変更しません。RDDの操作内で更新されている場合、その値はアクションの一部としてそのRDDが計算されたときにのみ更新されます。したがって、map()のような遅延変換内でアキュムレータの更新が実行されることは保証されません。以下のコードフラグメントは、このプロパティを示しています。

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

クラスターへのデプロイ

アプリケーション提出ガイド」では、クラスターにアプリケーションを提出する方法について説明しています。要するに、アプリケーションをJAR(Java/Scalaの場合)または一連の.pyまたは.zipファイル(Pythonの場合)にパッケージ化したら、bin/spark-submitスクリプトを使用して、サポートされている任意のクラスターマネージャーに提出できます。

Java / Scala から Spark ジョブの起動

org.apache.spark.launcherパッケージには、単純なJava APIを使用してSparkジョブを子プロセスとして起動するためのクラスが用意されています。

単体テスト

Sparkは、一般的な単体テストフレームワークと組み合わせて単体テストを行うのに適しています。テストで、マスターURLをlocalに設定したSparkContextを作成し、操作を実行してから、SparkContext.stop()を呼び出して破棄します。Sparkは同じプログラムで2つのコンテキストを同時に実行することをサポートしていないため、コンテキストは必ずfinallyブロックまたはテストフレームワークのtearDownメソッド内で停止してください。

次へ進む

SparkのWebサイトで、いくつかのSparkのサンプルプログラムをご覧いただけます。さらに、Sparkにはexamplesディレクトリにいくつかのサンプルが含まれています(PythonScalaJavaR)。JavaおよびScalaの例は、クラス名をSparkのbin/run-exampleスクリプトに渡すことで実行できます。たとえば、次のようになります。

./bin/run-example SparkPi

Pythonの例については、代わりにspark-submitを使用してください。

./bin/spark-submit examples/src/main/python/pi.py

Rの例については、代わりにspark-submitを使用してください。

./bin/spark-submit examples/src/main/r/dataframe.R

プログラムの最適化については、「設定」および「チューニング」ガイドでベストプラクティスに関する情報を提供しています。これらは、データがメモリに効率的な形式で保存されていることを確認するために特に重要です。デプロイメントについては、「クラスターモードの概要」で、分散操作に関係するコンポーネントとサポートされているクラスターマネージャーについて説明しています。

最後に、完全なAPIドキュメントは、PythonScalaJava、およびRで利用できます。