RDDプログラミングガイド

概要

大まかに言うと、すべてのSparkアプリケーションは、ユーザーのmain関数を実行し、クラスタ上でさまざまな並列操作を実行するドライバプログラムで構成されます。Sparkが提供する主な抽象化は、Resilient Distributed Dataset (RDD)です。これは、並列に操作できるクラスタのノード間で分割された要素のコレクションです。RDDは、Hadoopファイルシステム(またはその他のHadoop対応ファイルシステム)内のファイル、またはドライバプログラム内の既存のScalaコレクションから開始し、それを変換することによって作成されます。ユーザーは、並列操作間で効率的に再利用できるように、RDDをメモリに永続化するようにSparkに要求することもできます。最後に、RDDはノード障害から自動的に回復します。

Sparkにおける2番目の抽象化は、並列操作で使用できる共有変数です。デフォルトでは、Sparkが異なるノード上でタスクのセットとして関数を並列に実行する場合、関数で使用される各変数のコピーを各タスクに送信します。場合によっては、変数をタスク間、またはタスクとドライバプログラム間で共有する必要があります。Sparkは、2種類の共有変数をサポートしています。ブロードキャスト変数は、すべてのノードのメモリに値をキャッシュするために使用でき、アキュムレータは、カウンターや合計など、 "追加"のみを行う変数です。

このガイドでは、Sparkがサポートする各言語でこれらの各機能を示します。Sparkのインタラクティブシェルを起動すると、理解しやすくなります。Scalaシェルの場合はbin/spark-shell、Pythonシェルの場合はbin/pysparkを使用してください。

Sparkとのリンク

Spark 3.5.1はPython 3.8以降で動作します。標準のCPythonインタープリターを使用できるため、NumPyのようなCライブラリを使用できます。また、PyPy 7.3.6以降でも動作します。

PythonでのSparkアプリケーションは、ランタイムにSparkを含むbin/spark-submitスクリプトを使用して実行するか、setup.pyに含めることで実行できます。

    install_requires=[
        'pyspark=={site.SPARK_VERSION}'
    ]

pipを使用してPySparkをインストールせずにPythonでSparkアプリケーションを実行するには、Sparkディレクトリにあるbin/spark-submitスクリプトを使用します。このスクリプトは、SparkのJava / Scalaライブラリをロードし、クラスタにアプリケーションをサブミットできるようにします。また、bin/pysparkを使用してインタラクティブなPythonシェルを起動することもできます。

HDFSデータにアクセスする場合は、HDFSのバージョンにリンクするPySparkのビルドを使用する必要があります。プリビルドパッケージは、一般的なHDFSバージョン向けにSparkのホームページでも入手できます。

最後に、いくつかのSparkクラスをプログラムにインポートする必要があります。次の行を追加してください。

from pyspark import SparkContext, SparkConf

PySparkでは、ドライバとワーカーの両方で同じマイナーバージョンのPythonが必要です。PATHのデフォルトのPythonバージョンを使用しますが、PYSPARK_PYTHONでどのバージョンのPythonを使用するかを指定できます。たとえば、

$ PYSPARK_PYTHON=python3.8 bin/pyspark
$ PYSPARK_PYTHON=/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py

Spark 3.5.1は、デフォルトでScala 2.12と連携するようにビルドおよび配布されています。(Sparkは、他のバージョンのScalaと連携するようにビルドすることもできます。)Scalaでアプリケーションを作成するには、互換性のあるScalaバージョン(例:2.12.X)を使用する必要があります。

Sparkアプリケーションを作成するには、SparkへのMaven依存関係を追加する必要があります。Sparkは、Maven Centralから入手できます。

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.1

さらに、HDFSクラスターにアクセスする場合は、HDFSのバージョンに対応するhadoop-clientへの依存関係を追加する必要があります。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後に、いくつかのSparkクラスをプログラムにインポートする必要があります。次の行を追加してください。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

(Spark 1.3.0より前のバージョンでは、必須の暗黙的な変換を有効にするために、明示的にimport org.apache.spark.SparkContext._を行う必要があります。)

Spark 3.5.1は、関数を簡潔に記述するためのラムダ式をサポートしています。そうでない場合は、org.apache.spark.api.java.functionパッケージのクラスを使用できます。

Java 7のサポートは、Spark 2.2.0で削除されたことに注意してください。

JavaでSparkアプリケーションを作成するには、Sparkへの依存関係を追加する必要があります。Sparkは、Maven Centralから入手できます。

groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.1

さらに、HDFSクラスターにアクセスする場合は、HDFSのバージョンに対応するhadoop-clientへの依存関係を追加する必要があります。

groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

最後に、いくつかのSparkクラスをプログラムにインポートする必要があります。次の行を追加してください。

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;

Sparkの初期化

Sparkプログラムが最初に行う必要があるのは、SparkContextオブジェクトを作成することです。これは、SparkがクラスタにどのようにアクセスするかをSparkに伝えます。SparkContextを作成するには、最初にアプリケーションに関する情報を含むSparkConfオブジェクトをビルドする必要があります。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

Sparkプログラムが最初に行う必要があるのは、SparkContextオブジェクトを作成することです。これは、SparkがクラスタにどのようにアクセスするかをSparkに伝えます。SparkContextを作成するには、最初にアプリケーションに関する情報を含むSparkConfオブジェクトをビルドする必要があります。

JVMごとにアクティブなSparkContextは1つだけである必要があります。新しいSparkContextを作成する前に、アクティブなSparkContextをstop()する必要があります。

val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)

Sparkプログラムが最初に行う必要があるのは、JavaSparkContextオブジェクトを作成することです。これは、SparkがクラスタにどのようにアクセスするかをSparkに伝えます。SparkContextを作成するには、最初にアプリケーションに関する情報を含むSparkConfオブジェクトをビルドする必要があります。

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);

appNameパラメーターは、クラスタUIに表示するアプリケーションの名前です。masterは、Spark、Mesos、またはYARNクラスターのURL、またはローカルモードで実行するための特別な「local」文字列です。実際には、クラスタ上で実行する場合、プログラムでmasterをハードコーディングするのではなく、spark-submitでアプリケーションを起動し、そこで受け取るようにします。ただし、ローカルテストと単体テストの場合は、「local」を渡してSparkをインプロセスで実行できます。

シェルを使用する

PySparkシェルでは、特別なインタープリター対応のSparkContextが、scという変数に既に作成されています。独自のSparkContextを作成しても機能しません。コンテキストが接続するマスターは、--master引数を使用して設定できます。また、コンマ区切りのリストを--py-filesに渡すことで、Pythonの.zip、.egg、または.pyファイルをランタイムパスに追加できます。サードパーティのPython依存関係については、Pythonパッケージ管理を参照してください。また、コンマ区切りのMaven座標のリストを--packages引数に指定することにより、シェルセッションに依存関係(例:Sparkパッケージ)を追加することもできます。依存関係が存在する可能性のある追加のリポジトリ(例:Sonatype)は、--repositories引数に渡すことができます。たとえば、正確に4つのコアでbin/pysparkを実行するには、次のように使用します。

$ ./bin/pyspark --master local[4]

または、検索パスにcode.pyも追加するには(後でimport codeできるように)、次のように使用します。

$ ./bin/pyspark --master local[4] --py-files code.py

オプションの完全なリストについては、pyspark --helpを実行してください。舞台裏では、pysparkは、より一般的なspark-submitスクリプトを呼び出します。

IPython(強化されたPythonインタープリター)でPySparkシェルを起動することも可能です。PySparkはIPython 1.0.0以降で動作します。IPythonを使用するには、bin/pysparkを実行するときに、PYSPARK_DRIVER_PYTHON変数をipythonに設定します。

$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

Jupyterノートブック(以前はIPythonノートブックとして知られていました)を使用するには、

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

ipython または jupyter コマンドをカスタマイズするには、PYSPARK_DRIVER_PYTHON_OPTS を設定します。

Jupyter Notebookサーバーが起動したら、「Files」タブから新しいノートブックを作成できます。ノートブック内で、JupyterノートブックからSparkを試す前に、コマンド %pylab inline をノートブックの一部として入力できます。

Sparkシェルでは、特別なインタープリター対応のSparkContextがすでに変数 sc に作成されています。独自のSparkContextを作成しても機能しません。コンテキストが接続するマスターは、--master 引数を使用して設定できます。また、コンマ区切りのリストを --jars 引数に渡すことで、JARをクラスパスに追加できます。依存関係(例えばSparkパッケージ)をシェルセッションに追加するには、Maven座標のコンマ区切りのリストを --packages 引数に渡します。依存関係が存在する可能性のある追加のリポジトリ(例えばSonatype)は、--repositories 引数に渡すことができます。例えば、正確に4つのコアで bin/spark-shell を実行するには、次のように使用します。

$ ./bin/spark-shell --master local[4]

または、code.jar をクラスパスに追加するには、次のように使用します。

$ ./bin/spark-shell --master local[4] --jars code.jar

Maven座標を使用して依存関係を含めるには

$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"

すべてのオプションのリストを表示するには、spark-shell --help を実行します。内部的には、spark-shell はより一般的な spark-submit スクリプトを呼び出します。

Resilient Distributed Datasets (RDDs)

Sparkは、並列に操作できるフォールトトレラントな要素のコレクションであるresilient distributed dataset(RDD)の概念を中心に展開しています。RDDを作成する方法は2つあります。ドライバープログラムで既存のコレクションを並列化するか、共有ファイルシステム、HDFS、HBase、またはHadoop InputFormatを提供する任意のデータソースなどの外部ストレージシステムのデータセットを参照します。

並列化されたコレクション

並列化されたコレクションは、ドライバープログラムの既存のイテラブルまたはコレクションに対して SparkContextparallelize メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。例えば、1から5までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

作成後、分散データセット(distData)は並列に操作できます。例えば、リストの要素を加算するために distData.reduce(lambda a, b: a + b) を呼び出すことができます。分散データセットの操作については後で説明します。

並列化されたコレクションは、ドライバープログラム(ScalaのSeq)の既存のコレクションに対して SparkContextparallelize メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。例えば、1から5までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

作成後、分散データセット(distData)は並列に操作できます。例えば、配列の要素を加算するために distData.reduce((a, b) => a + b) を呼び出すことができます。分散データセットの操作については後で説明します。

並列化されたコレクションは、ドライバープログラムの既存の Collection に対して JavaSparkContextparallelize メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。例えば、1から5までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);

作成後、分散データセット(distData)は並列に操作できます。例えば、リストの要素を加算するために distData.reduce((a, b) -> a + b) を呼び出すことができます。分散データセットの操作については後で説明します。

並列コレクションの重要なパラメーターの1つは、データセットを分割するパーティションの数です。Sparkは、クラスタの各パーティションに対して1つのタスクを実行します。通常、クラスタ内の各CPUに対して2〜4つのパーティションが必要になります。通常、Sparkはクラスタに基づいてパーティション数を自動的に設定しようとします。ただし、parallelize に2番目のパラメーターとして渡すことで、手動で設定することもできます(例えば、sc.parallelize(data, 10))。注:コードの一部の場所では、後方互換性を維持するために、スライス(パーティションの同義語)という用語を使用しています。

外部データセット

PySparkは、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3など、Hadoopでサポートされている任意のストレージソースから分散データセットを作成できます。Sparkは、テキストファイル、SequenceFiles、およびその他のHadoop InputFormatをサポートしています。

テキストファイルRDDは、SparkContexttextFile メソッドを使用して作成できます。このメソッドは、ファイルへのURI(マシンのローカルパス、または hdfs://s3a:// などのURI)を受け取り、行のコレクションとして読み取ります。以下に呼び出しの例を示します。

>>> distFile = sc.textFile("data.txt")

作成後、distFile はデータセット操作によって操作できます。例えば、次のように map および reduce 操作を使用して、すべての行のサイズを加算できます。distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)

Sparkでファイルを読み取る際の注意点

  • ローカルファイルシステムのパスを使用している場合、ファイルはワーカーノード上の同じパスからもアクセスできる必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークマウントされた共有ファイルシステムを使用します。

  • textFile を含む、Sparkのすべてのファイルベースの入力メソッドは、ディレクトリ、圧縮ファイル、およびワイルドカードでの実行をサポートしています。例えば、textFile("/my/directory")textFile("/my/directory/*.txt")、および textFile("/my/directory/*.gz") を使用できます。

  • textFile メソッドは、ファイルのパーティション数を制御するためのオプションの2番目の引数も受け取ります。デフォルトでは、Sparkはファイルの各ブロック(HDFSではデフォルトで128MBのブロック)に対して1つのパーティションを作成しますが、より大きな値を渡すことで、より多くのパーティションを要求することもできます。ブロック数よりも少ないパーティションを持つことはできないことに注意してください。

テキストファイル以外に、SparkのPython APIは他のいくつかのデータ形式もサポートしています。

  • SparkContext.wholeTextFiles を使用すると、複数の小さなテキストファイルを含むディレクトリを読み取ることができ、それらの各ファイルを(ファイル名、コンテンツ)のペアとして返します。これは、各ファイルの行ごとに1つのレコードを返す textFile とは対照的です。

  • RDD.saveAsPickleFile および SparkContext.pickleFile は、pickle化されたPythonオブジェクトで構成されるシンプルな形式でRDDを保存することをサポートします。バッチ処理は、デフォルトのバッチサイズ10でpickleシリアル化に使用されます。

  • SequenceFileおよびHadoop Input/Output形式

この機能は現在 Experimental とマークされており、上級ユーザー向けです。将来的には、Spark SQLに基づいた読み取り/書き込みサポートに置き換えられる可能性があります。その場合、Spark SQLが推奨されるアプローチです。

Writableサポート

PySpark SequenceFileサポートは、Java内のキーと値のペアのRDDをロードし、Writablesを基本のJava型に変換し、pickle を使用して結果のJavaオブジェクトをpickle化します。キーと値のペアのRDDをSequenceFileに保存する場合、PySparkは逆のことを行います。PythonオブジェクトをJavaオブジェクトにアンピックルしてから、Writablesに変換します。次のWritablesは自動的に変換されます。

Writable型Python型
Textstr
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableNone
MapWritabledict

配列はそのままでは処理されません。ユーザーは、読み取りまたは書き込み時にカスタムの ArrayWritable サブタイプを指定する必要があります。書き込み時、ユーザーは配列をカスタムの ArrayWritable サブタイプに変換するカスタムコンバーターも指定する必要があります。読み取り時、デフォルトのコンバーターは、カスタムの ArrayWritable サブタイプをJavaの Object[] に変換し、その後Pythonのタプルにpickle化されます。プリミティブ型の配列に対してPythonの array.array を取得するには、ユーザーはカスタムコンバーターを指定する必要があります。

SequenceFileの保存とロード

テキストファイルと同様に、SequenceFileはパスを指定することで保存およびロードできます。キーと値のクラスを指定できますが、標準のWritablesにはこれは必要ありません。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

その他のHadoop Input/Output形式の保存とロード

PySparkは、「新しい」および「古い」Hadoop MapReduce APIの両方について、任意のHadoop InputFormatを読み取り、任意のHadoop OutputFormatを書き込むこともできます。必要に応じて、Hadoop構成をPython dictとして渡すことができます。以下に、Elasticsearch ESInputFormatを使用する例を示します。

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})

InputFormatがHadoop構成および/または入力パスにのみ依存しており、キーと値のクラスが上記の表に従って簡単に変換できる場合、このアプローチはそのような場合にうまく機能することに注意してください。

(Cassandra/HBaseからデータをロードするなど)カスタムのシリアル化されたバイナリデータがある場合は、最初にScala/Java側でそのデータをpickleのpicklerで処理できるものに変換する必要があります。このために Converter トレイトが提供されています。このトレイトを拡張し、convert メソッドに変換コードを実装するだけです。このクラスと、InputFormat にアクセスするために必要な依存関係が、Sparkジョブjarにパッケージ化され、PySparkクラスパスに含まれていることを確認してください。

Cassandra/HBase InputFormatOutputFormat をカスタムコンバーターで使用する例については、Pythonの例Converterの例を参照してください。

Sparkは、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3など、Hadoopでサポートされている任意のストレージソースから分散データセットを作成できます。Sparkは、テキストファイル、SequenceFiles、およびその他のHadoop InputFormatをサポートしています。

テキストファイルRDDは、SparkContexttextFile メソッドを使用して作成できます。このメソッドは、ファイルへのURI(マシンのローカルパス、または hdfs://s3a:// などのURI)を受け取り、行のコレクションとして読み取ります。以下に呼び出しの例を示します。

scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

作成後、distFile はデータセット操作によって操作できます。例えば、次のように map および reduce 操作を使用して、すべての行のサイズを加算できます。distFile.map(s => s.length).reduce((a, b) => a + b)

Sparkでファイルを読み取る際の注意点

  • ローカルファイルシステムのパスを使用している場合、ファイルはワーカーノード上の同じパスからもアクセスできる必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークマウントされた共有ファイルシステムを使用します。

  • Sparkのファイルベースのすべての入力メソッド(textFileを含む)は、ディレクトリ、圧縮ファイル、ワイルドカードでの実行をサポートしています。たとえば、textFile("/my/directory")textFile("/my/directory/*.txt")、およびtextFile("/my/directory/*.gz")を使用できます。複数のファイルを読み込む場合、パーティションの順序は、ファイルシステムから返されるファイルの順序によって異なります。例えば、ファイルのパスの辞書順に従う場合も、そうでない場合もあります。パーティション内では、要素は基になるファイルでの順序に従って順序付けられます。

  • textFile メソッドは、ファイルのパーティション数を制御するためのオプションの2番目の引数も受け取ります。デフォルトでは、Sparkはファイルの各ブロック(HDFSではデフォルトで128MBのブロック)に対して1つのパーティションを作成しますが、より大きな値を渡すことで、より多くのパーティションを要求することもできます。ブロック数よりも少ないパーティションを持つことはできないことに注意してください。

テキストファイル以外にも、SparkのScala APIは他のいくつかのデータ形式をサポートしています。

  • SparkContext.wholeTextFilesを使用すると、複数の小さなテキストファイルを含むディレクトリを読み取り、それらをそれぞれ(ファイル名、内容)のペアとして返します。これは、各ファイルの行ごとに1つのレコードを返すtextFileとは対照的です。パーティショニングはデータの局所性によって決定されます。これにより、場合によってはパーティションが少なすぎる可能性があります。そのような場合、wholeTextFilesは、最小パーティション数を制御するためのオプションの2番目の引数を提供します。

  • SequenceFileの場合、SparkContextのsequenceFile[K, V]メソッドを使用します。ここで、KVは、ファイル内のキーと値の型です。これらは、IntWritableTextなどのHadoopのWritableインターフェースのサブクラスである必要があります。さらに、Sparkでは、いくつかの一般的なWritableに対してネイティブ型を指定できます。たとえば、sequenceFile[Int, String]はIntWritableとTextを自動的に読み込みます。

  • 他のHadoop InputFormatの場合は、SparkContext.hadoopRDDメソッドを使用できます。このメソッドは、任意のJobConfと入力形式クラス、キー・クラス、値クラスを取ります。これらは、入力ソースを持つHadoopジョブと同じ方法で設定します。また、「新しい」MapReduce API(org.apache.hadoop.mapreduce)に基づくInputFormatには、SparkContext.newAPIHadoopRDDを使用できます。

  • RDD.saveAsObjectFileSparkContext.objectFileは、シリアライズされたJavaオブジェクトで構成されるシンプルな形式でRDDを保存することをサポートします。これはAvroのような特殊な形式ほど効率的ではありませんが、任意のRDDを保存する簡単な方法を提供します。

Sparkは、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3など、Hadoopでサポートされている任意のストレージソースから分散データセットを作成できます。Sparkは、テキストファイル、SequenceFiles、およびその他のHadoop InputFormatをサポートしています。

テキストファイルRDDは、SparkContexttextFile メソッドを使用して作成できます。このメソッドは、ファイルへのURI(マシンのローカルパス、または hdfs://s3a:// などのURI)を受け取り、行のコレクションとして読み取ります。以下に呼び出しの例を示します。

JavaRDD<String> distFile = sc.textFile("data.txt");

作成されたdistFileは、データセット操作によって操作できます。例えば、以下のように、mapreduce操作を使用して、すべての行のサイズを合計できます。distFile.map(s -> s.length()).reduce((a, b) -> a + b)

Sparkでファイルを読み取る際の注意点

  • ローカルファイルシステムのパスを使用している場合、ファイルはワーカーノード上の同じパスからもアクセスできる必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークマウントされた共有ファイルシステムを使用します。

  • textFile を含む、Sparkのすべてのファイルベースの入力メソッドは、ディレクトリ、圧縮ファイル、およびワイルドカードでの実行をサポートしています。例えば、textFile("/my/directory")textFile("/my/directory/*.txt")、および textFile("/my/directory/*.gz") を使用できます。

  • textFile メソッドは、ファイルのパーティション数を制御するためのオプションの2番目の引数も受け取ります。デフォルトでは、Sparkはファイルの各ブロック(HDFSではデフォルトで128MBのブロック)に対して1つのパーティションを作成しますが、より大きな値を渡すことで、より多くのパーティションを要求することもできます。ブロック数よりも少ないパーティションを持つことはできないことに注意してください。

テキストファイル以外にも、SparkのJava APIは他のいくつかのデータ形式をサポートしています。

  • JavaSparkContext.wholeTextFilesを使用すると、複数の小さなテキストファイルを含むディレクトリを読み取り、それらをそれぞれ(ファイル名、内容)のペアとして返します。これは、各ファイルの行ごとに1つのレコードを返すtextFileとは対照的です。

  • SequenceFileの場合、SparkContextのsequenceFile[K, V]メソッドを使用します。ここで、KVは、ファイル内のキーと値の型です。これらは、IntWritableTextなどのHadoopのWritableインターフェースのサブクラスである必要があります。

  • 他のHadoop InputFormatの場合は、JavaSparkContext.hadoopRDDメソッドを使用できます。このメソッドは、任意のJobConfと入力形式クラス、キー・クラス、値クラスを取ります。これらは、入力ソースを持つHadoopジョブと同じ方法で設定します。また、「新しい」MapReduce API(org.apache.hadoop.mapreduce)に基づくInputFormatには、JavaSparkContext.newAPIHadoopRDDを使用できます。

  • JavaRDD.saveAsObjectFileJavaSparkContext.objectFileは、シリアライズされたJavaオブジェクトで構成されるシンプルな形式でRDDを保存することをサポートします。これはAvroのような特殊な形式ほど効率的ではありませんが、任意のRDDを保存する簡単な方法を提供します。

RDD操作

RDDは2種類の操作をサポートしています。既存のデータセットから新しいデータセットを作成する変換と、データセットで計算を実行した後、ドライバプログラムに値を返すアクションです。たとえば、mapは、各データセット要素を関数に通し、結果を表す新しいRDDを返す変換です。一方、reduceは、いくつかの関数を使用してRDDのすべての要素を集約し、最終結果をドライバプログラムに返すアクションです(ただし、分散データセットを返す並列reduceByKeyもあります)。

Sparkのすべての変換は、結果をすぐに計算するのではなく、遅延しています。代わりに、それらは、何らかの基本データセット(例えば、ファイル)に適用された変換を記憶するだけです。変換は、アクションがドライバプログラムに返される結果を必要とするときにのみ計算されます。この設計により、Sparkはより効率的に実行できます。例えば、mapによって作成されたデータセットがreduceで使用され、大きなマップされたデータセットではなく、reduceの結果のみをドライバに返すことを実現できます。

デフォルトでは、変換された各RDDは、アクションを実行するたびに再計算される可能性があります。ただし、persist(またはcache)メソッドを使用して、RDDをメモリに永続化することもできます。その場合、Sparkは、次にクエリを実行するときに、はるかに高速にアクセスできるように、要素をクラスタ上に保持します。ディスクにRDDを永続化したり、複数のノードにレプリケートしたりするためのサポートもあります。

基本

RDDの基本を説明するために、以下の簡単なプログラムを検討してください。

lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)

最初の行は、外部ファイルから基本RDDを定義します。このデータセットは、メモリにロードされたり、その他の操作が実行されたりすることはありません。linesは単なるファイルへのポインタです。2行目は、map変換の結果としてlineLengthsを定義します。ここでも、遅延のため、lineLengthsはすぐには計算されません。最後に、アクションであるreduceを実行します。この時点で、Sparkは計算を個別のマシンで実行するタスクに分割し、各マシンはマップの一部とローカル削減の両方を実行し、その答えのみをドライバプログラムに返します。

後でlineLengthsを再度使用したい場合は、

lineLengths.persist()

reduceの前に、lineLengthsが最初に計算された後でメモリに保存されるように追加できます。

RDDの基本を説明するために、以下の簡単なプログラムを検討してください。

val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)

最初の行は、外部ファイルから基本RDDを定義します。このデータセットは、メモリにロードされたり、その他の操作が実行されたりすることはありません。linesは単なるファイルへのポインタです。2行目は、map変換の結果としてlineLengthsを定義します。ここでも、遅延のため、lineLengthsはすぐには計算されません。最後に、アクションであるreduceを実行します。この時点で、Sparkは計算を個別のマシンで実行するタスクに分割し、各マシンはマップの一部とローカル削減の両方を実行し、その答えのみをドライバプログラムに返します。

後でlineLengthsを再度使用したい場合は、

lineLengths.persist()

reduceの前に、lineLengthsが最初に計算された後でメモリに保存されるように追加できます。

RDDの基本を説明するために、以下の簡単なプログラムを検討してください。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);

最初の行は、外部ファイルから基本RDDを定義します。このデータセットは、メモリにロードされたり、その他の操作が実行されたりすることはありません。linesは単なるファイルへのポインタです。2行目は、map変換の結果としてlineLengthsを定義します。ここでも、遅延のため、lineLengthsはすぐには計算されません。最後に、アクションであるreduceを実行します。この時点で、Sparkは計算を個別のマシンで実行するタスクに分割し、各マシンはマップの一部とローカル削減の両方を実行し、その答えのみをドライバプログラムに返します。

後でlineLengthsを再度使用したい場合は、

lineLengths.persist(StorageLevel.MEMORY_ONLY());

reduceの前に、lineLengthsが最初に計算された後でメモリに保存されるように追加できます。

Sparkへの関数の受け渡し

SparkのAPIは、クラスタ上で実行するためにドライバプログラムで関数を渡すことに大きく依存しています。これを行うには、次の3つの推奨される方法があります。

  • ラムダ式。式として記述できる単純な関数に使用します。(ラムダは、複数ステートメント関数や値を返さないステートメントをサポートしていません。)
  • Sparkを呼び出す関数内のローカルdef。長いコードの場合。
  • モジュール内のトップレベル関数。

例えば、lambdaを使用してサポートできるよりも長い関数を渡すには、以下のコードを検討してください。

"""MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)

クラスインスタンス(シングルトンオブジェクトとは対照的)のメソッドへの参照を渡すことも可能ですが、これには、メソッドと一緒にそのクラスを含むオブジェクトを送信する必要があります。例えば、以下を検討してください。

class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)

ここで、new MyClassを作成し、それに対してdoStuffを呼び出すと、内部のmapは、*そのMyClassインスタンス*のfuncメソッドを参照するため、オブジェクト全体をクラスタに送信する必要があります。

同様に、外側のオブジェクトのフィールドにアクセスすると、オブジェクト全体が参照されます。

class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)

この問題を回避する最も簡単な方法は、外部からアクセスする代わりに、fieldをローカル変数にコピーすることです。

def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)

SparkのAPIは、クラスタ上で実行するためにドライバプログラムで関数を渡すことに大きく依存しています。これを行うには、次の2つの推奨される方法があります。

  • 無名関数構文。短いコードで使用できます。
  • グローバルシングルトンオブジェクト内の静的メソッド。例えば、object MyFunctionsを定義し、以下のようにMyFunctions.func1を渡すことができます。
object MyFunctions {
  def func1(s: String): String = { ... }
}

myRdd.map(MyFunctions.func1)

クラスインスタンス(シングルトンオブジェクトとは対照的)のメソッドへの参照を渡すことも可能ですが、これには、メソッドと一緒にそのクラスを含むオブジェクトを送信する必要があります。例えば、以下を検討してください。

class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}

ここで、新しいMyClassインスタンスを作成し、それに対してdoStuffを呼び出すと、内部のmapは、*そのMyClassインスタンス*のfunc1メソッドを参照するため、オブジェクト全体をクラスタに送信する必要があります。これは、rdd.map(x => this.func1(x))を記述するのと似ています。

同様に、外側のオブジェクトのフィールドにアクセスすると、オブジェクト全体が参照されます。

class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}

rdd.map(x => this.field + x)を記述するのと同じで、thisすべてを参照します。この問題を回避する最も簡単な方法は、外部からアクセスする代わりに、fieldをローカル変数にコピーすることです。

def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}

SparkのAPIは、クラスタ上で実行するためにドライバプログラムで関数を渡すことに大きく依存しています。Javaでは、関数はorg.apache.spark.api.java.functionパッケージのインターフェースを実装するクラスによって表されます。このような関数を作成するには、次の2つの方法があります。

  • 自分のクラスでFunctionインターフェースを実装し、無名インナークラスまたは名前付きクラスとして、そのインスタンスをSparkに渡します。
  • ラムダ式を使用して、実装を簡潔に定義します。

このガイドの多くでは簡潔にするためにラムダ構文を使用していますが、長形式ですべて同じAPIを使用するのは簡単です。たとえば、上記のコードを次のように記述できたはずです。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});

または、関数をインラインで記述するのが面倒な場合。

class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}

JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());

Javaの無名インナークラスは、finalでマークされている限り、外側のスコープの変数にもアクセスできることに注意してください。Sparkは、他の言語と同様に、これらの変数のコピーを各ワーカーノードに送信します。

クロージャの理解

Sparkの難しい点の1つは、クラスタ全体でコードを実行するときの変数とメソッドのスコープとライフサイクルを理解することです。スコープ外の変数を変更するRDD操作は、混乱を招くことが多い原因になります。以下の例では、foreach()を使用してカウンタをインクリメントするコードを見ていきますが、他の操作でも同様の問題が発生する可能性があります。

以下に示す単純なRDD要素の合計は、実行が同じJVM内で行われるか否かによって異なる動作をする可能性があります。この一般的な例は、Sparkをlocalモード(--master = local[n])で実行する場合と、Sparkアプリケーションをクラスターにデプロイする場合(例えば、spark-submitを使用してYARNにデプロイする場合)です。

counter = 0
rdd = sc.parallelize(data)

# Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
rdd.foreach(increment_counter)

print("Counter value: ", counter)
var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);

// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);

println("Counter value: " + counter);

ローカルモードとクラスタモード

上記のコードの動作は未定義であり、意図したとおりに動作しない可能性があります。ジョブを実行するために、SparkはRDD操作の処理をタスクに分割し、各タスクはexecutorによって実行されます。実行前に、Sparkはタスクのクロージャを計算します。クロージャとは、executorがRDD上で計算を実行するために可視でなければならない変数とメソッドのことです(この場合はforeach())。このクロージャはシリアライズされ、各executorに送信されます。

各executorに送信されるクロージャ内の変数はコピーとなるため、foreach関数内でcounterを参照した場合、もはやドライバーノード上のcounterではありません。ドライバーノードのメモリにはまだcounterが存在しますが、これはexecutorからは見えなくなっています!executorはシリアライズされたクロージャからのコピーのみを参照します。したがって、counterの最終値はゼロのままになります。なぜなら、counterに対するすべての操作がシリアライズされたクロージャ内の値を参照していたからです。

ローカルモードでは、場合によってはforeach関数がドライバーと同じJVM内で実際に実行され、同じ元のcounterを参照して、実際に更新する可能性があります。

このようなシナリオで明確に定義された動作を保証するには、Accumulatorを使用する必要があります。Sparkのアキュムレータは、クラスター内のワーカーノード間で実行が分割された場合に、変数を安全に更新するためのメカニズムを提供するために特に使用されます。このガイドのアキュムレータのセクションでは、これらについてさらに詳しく説明します。

一般的に、ループやローカルで定義されたメソッドのようなクロージャは、グローバルな状態を変更するために使用すべきではありません。Sparkは、クロージャの外部から参照されるオブジェクトの変更の動作を定義または保証しません。これを行う一部のコードはローカルモードでは動作するかもしれませんが、それは単なる偶然であり、このようなコードは分散モードでは期待どおりに動作しません。グローバルな集計が必要な場合は、代わりにアキュムレータを使用してください。

RDDの要素の表示

もう1つの一般的なイディオムは、rdd.foreach(println)またはrdd.map(println)を使用してRDDの要素を出力しようとすることです。単一のマシンでは、これは期待される出力を生成し、RDDのすべての要素を出力します。ただし、clusterモードでは、executorによって呼び出されるstdoutへの出力は、ドライバーのstdoutではなく、executorのstdoutに書き込まれるため、ドライバーのstdoutにはこれらは表示されません。ドライバーのすべての要素を出力するには、最初にcollect()メソッドを使用してRDDをドライバーノードに取り込むことができます。例:rdd.collect().foreach(println)。ただし、collect()はRDD全体を単一のマシンにフェッチするため、これによりドライバーがメモリ不足になる可能性があります。RDDの少数の要素のみを出力する必要がある場合は、take()を使用する方が安全です。例:rdd.take(100).foreach(println)

キーと値のペアの操作

ほとんどのSpark操作は任意の型のオブジェクトを含むRDDで動作しますが、いくつかの特別な操作はキーと値のペアのRDDでのみ利用可能です。最も一般的なものは、キーで要素をグループ化または集計するような分散型「シャッフル」操作です。

Pythonでは、これらの操作は(1, 2)のような組み込みのPythonタプルを含むRDDで動作します。このようなタプルを作成してから、目的の操作を呼び出すだけです。

たとえば、次のコードでは、キーと値のペアに対してreduceByKey操作を使用して、ファイル内の各テキスト行が何回発生するかをカウントします。

lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

また、たとえば、counts.sortByKey()を使用してペアをアルファベット順にソートし、最後にcounts.collect()を使用してオブジェクトのリストとしてドライバープログラムに戻すこともできます。

ほとんどのSpark操作は任意の型のオブジェクトを含むRDDで動作しますが、いくつかの特別な操作はキーと値のペアのRDDでのみ利用可能です。最も一般的なものは、キーで要素をグループ化または集計するような分散型「シャッフル」操作です。

Scalaでは、これらの操作はTuple2オブジェクト((a, b)と記述するだけで作成できる言語の組み込みタプル)を含むRDDで自動的に利用できます。キーと値のペア操作は、タプルのRDDを自動的にラップするPairRDDFunctionsクラスで利用できます。

たとえば、次のコードでは、キーと値のペアに対してreduceByKey操作を使用して、ファイル内の各テキスト行が何回発生するかをカウントします。

val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)

また、たとえば、counts.sortByKey()を使用してペアをアルファベット順にソートし、最後にcounts.collect()を使用してオブジェクトの配列としてドライバープログラムに戻すこともできます。

注意: カスタムオブジェクトをキーと値のペア操作のキーとして使用する場合は、カスタムのequals()メソッドに一致するhashCode()メソッドを必ず含める必要があります。詳細については、Object.hashCode()ドキュメントに記載されているコントラクトを参照してください。

ほとんどのSpark操作は任意の型のオブジェクトを含むRDDで動作しますが、いくつかの特別な操作はキーと値のペアのRDDでのみ利用可能です。最も一般的なものは、キーで要素をグループ化または集計するような分散型「シャッフル」操作です。

Javaでは、キーと値のペアはScala標準ライブラリのscala.Tuple2クラスを使用して表されます。new Tuple2(a, b)を呼び出してタプルを作成し、後でtuple._1()およびtuple._2()でそのフィールドにアクセスできます。

キーと値のペアのRDDは、JavaPairRDDクラスで表されます。JavaRDDからJavaPairRDDを構築するには、mapToPairflatMapToPairのようなmap操作の特別なバージョンを使用します。JavaPairRDDには、標準のRDD関数と特別なキーと値の関数があります。

たとえば、次のコードでは、キーと値のペアに対してreduceByKey操作を使用して、ファイル内の各テキスト行が何回発生するかをカウントします。

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

また、たとえば、counts.sortByKey()を使用してペアをアルファベット順にソートし、最後にcounts.collect()を使用してオブジェクトの配列としてドライバープログラムに戻すこともできます。

注意: カスタムオブジェクトをキーと値のペア操作のキーとして使用する場合は、カスタムのequals()メソッドに一致するhashCode()メソッドを必ず含める必要があります。詳細については、Object.hashCode()ドキュメントに記載されているコントラクトを参照してください。

変換

次の表に、Sparkでサポートされている一般的な変換の一部を示します。詳細については、RDD APIドキュメント(ScalaJavaPythonR)およびペアRDD関数ドキュメント(ScalaJava)を参照してください。

変換意味
map(func) ソースの各要素に関数funcを適用して形成された新しい分散データセットを返します。
filter(func) funcがtrueを返すソースの要素を選択して形成された新しいデータセットを返します。
flatMap(func) mapに似ていますが、各入力アイテムを0個以上の出力アイテムにマップできます(したがって、funcは単一のアイテムではなくSeqを返す必要があります)。
mapPartitions(func) mapに似ていますが、RDDの各パーティション(ブロック)で個別に実行されるため、型TのRDDで実行する場合は、funcはIterator<T> => Iterator<U>の型である必要があります。
mapPartitionsWithIndex(func) mapPartitionsに似ていますが、パーティションのインデックスを表す整数値もfuncに提供するため、型TのRDDで実行する場合は、funcは(Int, Iterator<T>) => Iterator<U>の型である必要があります。
sample(withReplacement, fraction, seed) 指定された乱数ジェネレーターのシードを使用して、置換の有無にかかわらず、データのごく一部であるfractionをサンプリングします。
union(otherDataset) ソースデータセットと引数の要素の和集合を含む新しいデータセットを返します。
intersection(otherDataset) ソースデータセットと引数の要素の交差部分を含む新しいRDDを返します。
distinct([numPartitions])) ソースデータセットの重複しない要素を含む新しいデータセットを返します。
groupByKey([numPartitions]) (K, V)ペアのデータセットで呼び出された場合、(K, Iterable<V>)ペアのデータセットを返します。
注意: 各キーに対する集計(合計や平均など)を実行するためにグループ化する場合は、reduceByKeyまたはaggregateByKeyを使用すると、はるかに優れたパフォーマンスが得られます。
注意: デフォルトでは、出力の並列処理のレベルは、親RDDのパーティションの数に依存します。オプションのnumPartitions引数を渡して、タスクの数を変更できます。
reduceByKey(func, [numPartitions]) (K, V)ペアのデータセットで呼び出された場合、各キーの値が、(V,V) => Vの型である必要のある指定されたreduce関数funcを使用して集計された(K, V)ペアのデータセットを返します。groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数で構成できます。
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) (K, V)ペアのデータセットで呼び出された場合、各キーの値が、指定された結合関数と中立な「ゼロ」の値を使用して集計された(K, U)ペアのデータセットを返します。入力値の型とは異なる集計値の型を許可し、不要な割り当てを回避します。groupByKeyと同様に、reduceタスクの数は、オプションの2番目の引数で構成できます。
sortByKey([ascending], [numPartitions]) KがOrderedを実装する(K, V)ペアのデータセットで呼び出された場合、ブール値のascending引数で指定されたように、昇順または降順でキーでソートされた(K, V)ペアのデータセットを返します。
join(otherDataset, [numPartitions]) (K, V)と(K, W)の型のデータセットで呼び出された場合、各キーのすべての要素のペアを含む(K, (V, W))ペアのデータセットを返します。外部結合は、leftOuterJoinrightOuterJoin、およびfullOuterJoinを通じてサポートされています。
cogroup(otherDataset, [numPartitions]) (K, V)と(K, W)の型のデータセットで呼び出された場合、(K, (Iterable<V>, Iterable<W>))タプルのデータセットを返します。この操作はgroupWithとも呼ばれます。
cartesian(otherDataset) 型TとUのデータセットで呼び出された場合、(T, U)ペアのデータセット(すべての要素のペア)を返します。
pipe(command, [envVars]) RDDの各パーティションをシェルコマンド(たとえば、Perlまたはbashスクリプト)に通します。RDD要素はプロセスのstdinに書き込まれ、stdoutに出力された行は文字列のRDDとして返されます。
coalesce(numPartitions) RDDのパーティション数をnumPartitionsに減らします。大規模なデータセットをフィルタリングした後、より効率的に処理を実行するのに役立ちます。
repartition(numPartitions) RDD内のデータをランダムにシャッフルし、パーティション数を増減させたり、パーティション間でバランスを取ったりします。この操作では常にすべてのデータがネットワーク経由でシャッフルされます。
repartitionAndSortWithinPartitions(partitioner) 与えられたpartitionerに従ってRDDを再パーティション化し、結果の各パーティション内でレコードをキーでソートします。これは、シャッフル機構にソートをプッシュダウンできるため、repartitionを呼び出してから各パーティション内でソートするよりも効率的です。

アクション

次の表は、Sparkでサポートされている一般的なアクションの一部を示しています。詳細については、RDD APIドキュメント(ScalaJavaPythonR

とペアRDD関数ドキュメント(ScalaJava)を参照してください。

アクション意味
reduce(func) データセットの要素を関数func(2つの引数を取り、1つを返す)を使用して集約します。関数は、並列で正しく計算できるように、可換かつ結合的である必要があります。
collect() データセットのすべての要素を、ドライバープログラムで配列として返します。これは通常、フィルタリングや、データの十分に小さなサブセットを返す他の操作の後で役立ちます。
count() データセット内の要素の数を返します。
first() データセットの最初の要素を返します(take(1)と同様)。
take(n) データセットの最初のn個の要素を含む配列を返します。
takeSample(withReplacement, num, [seed]) データセットからランダムにnum個の要素のサンプルを、置換ありまたは置換なしで返し、必要に応じて乱数ジェネレーターのシードを事前指定します。
takeOrdered(n, [ordering]) RDDの最初のn個の要素を、自然順序またはカスタムコンパレータを使用して返します。
saveAsTextFile(path) データセットの要素を、ローカルファイルシステム、HDFS、またはその他のHadoop対応ファイルシステム内の指定されたディレクトリにテキストファイル(またはテキストファイルのセット)として書き込みます。Sparkは、各要素に対してtoStringを呼び出して、ファイルのテキスト行に変換します。
saveAsSequenceFile(path)
(JavaおよびScala)
データセットの要素を、ローカルファイルシステム、HDFS、またはその他のHadoop対応ファイルシステム内の指定されたパスにHadoop SequenceFileとして書き込みます。これは、HadoopのWritableインターフェイスを実装するキーと値のペアのRDDで使用できます。 Scalaでは、Writableに暗黙的に変換可能な型(Int、Double、Stringなどの基本型に対する変換がSparkに含まれています)でも利用できます。
saveAsObjectFile(path)
(JavaおよびScala)
Javaシリアライズを使用して、データセットの要素を単純な形式で書き込みます。これは、SparkContext.objectFile()を使用してロードできます。
countByKey() (K, V)型のRDDでのみ使用できます。各キーのカウントを持つ(K, Int)ペアのハッシュマップを返します。
foreach(func) データセットの各要素に対して関数funcを実行します。これは通常、アキュムレータの更新や外部ストレージシステムとのやり取りなどの副作用のために行われます。
foreach()の外でアキュムレータ以外の変数を変更すると、未定義の動作になる可能性があります。詳細については、クロージャの理解を参照してください。

Spark RDD APIは、foreachforeachAsyncなど、いくつかのアクションの非同期バージョンも公開しており、アクションの完了をブロックする代わりに、呼び出し元にFutureActionをすぐに返します。これは、アクションの非同期実行を管理または待機するために使用できます。

シャッフル操作

Spark内の一部の操作は、シャッフルと呼ばれるイベントをトリガーします。シャッフルは、パーティション間で異なるグループにデータが再配布されるためのSparkのメカニズムです。これは通常、エグゼキュータとマシン間でのデータのコピーを伴うため、シャッフルは複雑でコストのかかる操作になります。

背景

シャッフル中に何が起こるかを理解するために、reduceByKey操作の例を考えてみましょう。reduceByKey操作は、単一のキーのすべての値がタプルに結合される新しいRDDを生成します。これは、キーと、そのキーに関連付けられたすべての値に対してreduce関数を実行した結果です。課題は、単一のキーのすべての値が必ずしも同じパーティション、さらには同じマシンに存在しているとは限らないが、結果を計算するために同じ場所に配置する必要があることです。

Sparkでは、データは通常、特定の操作に必要な場所にパーティション間で分散されていません。計算中、単一のタスクは単一のパーティションで動作します。したがって、単一のreduceByKey reduceタスクが実行するためにすべてのデータを整理するには、Sparkはオールツーオール操作を実行する必要があります。すべてのキーのすべての値を見つけるためにすべてのパーティションから読み取り、パーティションを越えて値をまとめて、各キーの最終結果を計算する必要があります。これがシャッフルと呼ばれます。

新しくシャッフルされたデータの各パーティション内の要素のセットは決定論的であり、パーティション自体の順序も決定論的ですが、これらの要素の順序はそうではありません。シャッフル後に予測可能な順序付けられたデータが必要な場合は、次のものを使用できます。

シャッフルを引き起こす可能性のある操作には、repartitioncoalesceなどの再パーティション操作、groupByKeyreduceByKeyなどの‘ByKey操作(カウントを除く)、およびcogroupjoinなどの結合操作が含まれます。

パフォーマンスへの影響

シャッフルは、ディスクI/O、データシリアライゼーション、ネットワークI/Oを伴うため、コストのかかる操作です。シャッフルのためにデータを整理するために、Sparkはタスクのセットを生成します。データを整理するマップタスクと、それを集約するリデュースタスクのセットです。この命名法はMapReduceに由来しており、Sparkのmapおよびreduce操作に直接関係するものではありません。

内部的には、個々のマップタスクの結果は、収まらなくなるまでメモリに保持されます。次に、これらはターゲットパーティションに基づいてソートされ、単一のファイルに書き込まれます。リデュース側では、タスクは関連するソートされたブロックを読み取ります。

一部のシャッフル操作は、転送前または転送後にレコードを整理するためにインメモリデータ構造を使用するため、大量のヒープメモリを消費する可能性があります。具体的には、reduceByKeyaggregateByKeyはマップ側でこれらの構造を作成し、'ByKey操作はリデュース側でこれらを生成します。データがメモリに収まらない場合、Sparkはこれらのテーブルをディスクにスピルし、追加のディスクI/Oのオーバーヘッドとガベージコレクションの増加が発生します。

シャッフルは、ディスク上に大量の中間ファイルも生成します。Spark 1.3以降、これらのファイルは、対応するRDDが使用されなくなり、ガベージコレクションされるまで保持されます。これは、リネージが再計算された場合にシャッフルファイルを再作成する必要がないようにするためです。アプリケーションがこれらのRDDへの参照を保持している場合、またはGCが頻繁に開始されない場合、ガベージコレクションは長い時間が経過した後にのみ発生する可能性があります。これは、長期間実行されるSparkジョブが大量のディスク領域を消費する可能性があることを意味します。一時ストレージディレクトリは、Sparkコンテキストを構成するときにspark.local.dir構成パラメーターによって指定されます。

シャッフル動作は、さまざまな構成パラメーターを調整することで調整できます。「シャッフル動作」セクション内のSpark構成ガイドを参照してください。

RDDの永続化

Sparkの最も重要な機能の1つは、操作間でデータセットをメモリに永続化(またはキャッシュ)することです。RDDを永続化すると、各ノードは、計算したパーティションをメモリに保存し、そのデータセット(またはそこから派生したデータセット)に対する他のアクションで再利用します。これにより、今後のアクションがはるかに高速になります(多くの場合、10倍以上)。キャッシングは、反復アルゴリズムと高速なインタラクティブ使用のための重要なツールです。

RDDを永続化するようにマークするには、RDDのpersist()またはcache()メソッドを使用します。アクションで最初に計算されるとき、ノードのメモリに保持されます。Sparkのキャッシュはフォールトトレラントです。RDDのパーティションが失われた場合、最初に作成した変換を使用して自動的に再計算されます。

さらに、永続化された各RDDは、異なるストレージレベルを使用して保存できます。たとえば、データセットをディスクに永続化したり、メモリにシリアライズされたJavaオブジェクトとして永続化したり(スペースを節約するため)、ノード間でレプリケートしたりできます。これらのレベルは、StorageLevelオブジェクト(ScalaJavaPython)をpersist()に渡すことで設定されます。cache()メソッドは、デフォルトのストレージレベルであるStorageLevel.MEMORY_ONLY(メモリに逆シリアライズされたオブジェクトを格納)を使用するための省略形です。ストレージレベルの完全なセットは次のとおりです。

ストレージレベル意味
MEMORY_ONLY RDDをJVM内の逆シリアライズされたJavaオブジェクトとして保存します。RDDがメモリに収まらない場合、一部のパーティションはキャッシュされず、必要なときに毎回その場で再計算されます。これがデフォルトレベルです。
MEMORY_AND_DISK RDDをJVM内の逆シリアライズされたJavaオブジェクトとして保存します。RDDがメモリに収まらない場合は、収まらないパーティションをディスクに保存し、必要なときにそこから読み取ります。
MEMORY_ONLY_SER
(JavaおよびScala)
RDDをシリアライズされたJavaオブジェクト(パーティションごとに1つのバイト配列)として保存します。これは通常、逆シリアライズされたオブジェクトよりもスペース効率が優れており、特に高速シリアライザーを使用する場合にそうですが、読み取るのにCPU負荷が高くなります。
MEMORY_AND_DISK_SER
(JavaおよびScala)
MEMORY_ONLY_SERと同様ですが、メモリに収まらないパーティションは、必要なときに毎回その場で再計算する代わりに、ディスクにスピルします。
DISK_ONLY RDDパーティションをディスクのみに保存します。
MEMORY_ONLY_2、MEMORY_AND_DISK_2など。 上記のレベルと同じですが、各パーティションを2つのクラスターノードに複製します。
OFF_HEAP(実験的) MEMORY_ONLY_SERと同様ですが、データをオフヒープメモリに保存します。これには、オフヒープメモリを有効にする必要があります。

注:Pythonでは、保存されたオブジェクトは常にPickleライブラリでシリアライズされるため、シリアライズされたレベルを選択するかどうかは問題ではありません。Pythonで使用可能なストレージレベルには、MEMORY_ONLYMEMORY_ONLY_2MEMORY_AND_DISKMEMORY_AND_DISK_2DISK_ONLYDISK_ONLY_2、およびDISK_ONLY_3が含まれます。

Sparkは、ユーザーがpersistを呼び出さなくても、シャッフル操作(例:reduceByKey)で中間データを自動的に永続化します。これは、シャッフル中にノードが故障した場合に入力全体を再計算することを避けるためです。結果のRDDを再利用する場合は、ユーザーがpersistを呼び出すことをお勧めします。

どのストレージレベルを選択すべきか?

Sparkのストレージレベルは、メモリ使用量とCPU効率の間で異なるトレードオフを提供することを目的としています。いずれかを選択するには、次のプロセスを経ることをお勧めします。

データの削除

Sparkは、各ノードでのキャッシュ使用状況を自動的に監視し、最近最も使用されていない(LRU)方式で古いデータパーティションを削除します。キャッシュから削除されるのを待つのではなく、RDDを手動で削除したい場合は、RDD.unpersist()メソッドを使用します。このメソッドはデフォルトではブロックしないことに注意してください。リソースが解放されるまでブロックするには、このメソッドを呼び出すときにblocking=trueを指定します。

共有変数

通常、Sparkの操作(mapreduceなど)に渡された関数がリモートクラスタノードで実行されるとき、関数で使用されるすべての変数の個別のコピーで動作します。これらの変数は各マシンにコピーされ、リモートマシンでの変数への更新はドライバプログラムに伝播されません。タスク間で汎用的な読み取り/書き込み共有変数をサポートすると効率が悪くなります。ただし、Sparkは、2つの一般的な使用パターンに対して、2つの制限されたタイプの共有変数を提供します。それは、ブロードキャスト変数とアキュムレータです。

ブロードキャスト変数

ブロードキャスト変数を使用すると、プログラマは、タスクと一緒にコピーを送信するのではなく、各マシンで読み取り専用変数をキャッシュに保持できます。たとえば、すべてのノードに大きな入力データセットのコピーを効率的な方法で提供するために使用できます。Sparkは、通信コストを削減するために、効率的なブロードキャストアルゴリズムを使用してブロードキャスト変数を配布しようともします。

Sparkアクションは、分散型の「シャッフル」操作で区切られた一連のステージを通じて実行されます。Sparkは、各ステージ内のタスクに必要な共通データを自動的にブロードキャストします。この方法でブロードキャストされたデータは、シリアライズされた形式でキャッシュされ、各タスクを実行する前にデシリアライズされます。つまり、ブロードキャスト変数を明示的に作成することは、複数のステージにわたるタスクが同じデータを必要とする場合、またはデシリアライズされた形式でデータをキャッシュすることが重要な場合にのみ役立ちます。

ブロードキャスト変数は、SparkContext.broadcast(v)を呼び出すことにより、変数vから作成されます。ブロードキャスト変数はvのラッパーであり、その値はvalueメソッドを呼び出すことでアクセスできます。以下のコードはこれを示しています。

>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});

broadcastVar.value();
// returns [1, 2, 3]

ブロードキャスト変数が作成されたら、vがノードに複数回送信されないように、クラスタで実行される関数では値vの代わりにブロードキャスト変数を使用する必要があります。さらに、すべてのノードがブロードキャスト変数の同じ値を取得できるように(たとえば、変数が後で新しいノードに送信される場合)、ブロードキャスト後にオブジェクトvを変更しないでください。

ブロードキャスト変数がエクゼキューターにコピーしたリソースを解放するには、.unpersist()を呼び出します。ブロードキャストが後で再び使用される場合は、再ブロードキャストされます。ブロードキャスト変数で使用されるすべてのリソースを完全に解放するには、.destroy()を呼び出します。その後、ブロードキャスト変数を使用することはできません。これらのメソッドはデフォルトではブロックしないことに注意してください。リソースが解放されるまでブロックするには、それらを呼び出すときにblocking=trueを指定します。

アキュムレータ

アキュムレータは、連想的で可換的な操作を通じてのみ「追加」される変数であるため、並列で効率的にサポートできます。これらは、カウンター(MapReduceの場合のように)または合計を実装するために使用できます。Sparkは、数値型のアキュムレータをネイティブにサポートしており、プログラマは新しい型のサポートを追加できます。

ユーザーは、名前付きまたは名前なしのアキュムレータを作成できます。下の図に示すように、名前付きアキュムレータ(このインスタンスではcounter)は、そのアキュムレータを変更するステージのWeb UIに表示されます。Sparkは、「タスク」テーブルで、タスクによって変更された各アキュムレータの値を表示します。

Accumulators in the Spark UI

UIでアキュムレータを追跡することは、実行中のステージの進捗状況を理解するのに役立ちます(注:これはPythonではまだサポートされていません)。

アキュムレータは、SparkContext.accumulator(v)を呼び出すことにより、初期値vから作成されます。クラスタで実行されているタスクは、addメソッドまたは+=演算子を使用してそれに追加できます。ただし、その値を読み取ることはできません。ドライバプログラムのみが、valueメソッドを使用してアキュムレータの値を読み取ることができます。

以下のコードは、配列の要素を合計するためにアキュムレータを使用する方法を示しています。

>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

>>> accum.value
10

このコードでは、Int型のアキュムレータの組み込みサポートを使用しましたが、プログラマはAccumulatorParamをサブクラス化して独自の型を作成することもできます。AccumulatorParamインターフェースには、データ型の「ゼロ値」を提供するためのzeroと、2つの値を加算するためのaddInPlaceの2つのメソッドがあります。たとえば、数学ベクトルを表すVectorクラスがある場合、次のように記述できます。

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

数値アキュムレータは、それぞれLong型またはDouble型の値を累積するために、SparkContext.longAccumulator()またはSparkContext.doubleAccumulator()を呼び出すことで作成できます。クラスタで実行されているタスクは、addメソッドを使用してそれに追加できます。ただし、その値を読み取ることはできません。ドライバプログラムのみが、valueメソッドを使用してアキュムレータの値を読み取ることができます。

以下のコードは、配列の要素を合計するためにアキュムレータを使用する方法を示しています。

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

このコードでは、Long型のアキュムレータの組み込みサポートを使用しましたが、プログラマはAccumulatorV2をサブクラス化して独自の型を作成することもできます。AccumulatorV2抽象クラスには、オーバーライドする必要のあるいくつかのメソッドがあります。アキュムレータをゼロにリセットするためのreset、アキュムレータに別の値を追加するためのadd、別の同じ型のアキュムレータをこれにマージするためのmergeです。オーバーライドする必要があるその他のメソッドは、APIドキュメントに含まれています。たとえば、数学ベクトルを表すMyVectorクラスがある場合、次のように記述できます。

class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

プログラマが独自のAccumulatorV2型を定義する場合、結果の型が追加された要素の型と異なる場合があることに注意してください。

数値アキュムレータは、それぞれLong型またはDouble型の値を累積するために、SparkContext.longAccumulator()またはSparkContext.doubleAccumulator()を呼び出すことで作成できます。クラスタで実行されているタスクは、addメソッドを使用してそれに追加できます。ただし、その値を読み取ることはできません。ドライバプログラムのみが、valueメソッドを使用してアキュムレータの値を読み取ることができます。

以下のコードは、配列の要素を合計するためにアキュムレータを使用する方法を示しています。

LongAccumulator accum = jsc.sc().longAccumulator();

sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

accum.value();
// returns 10

このコードでは、Long型のアキュムレータの組み込みサポートを使用しましたが、プログラマはAccumulatorV2をサブクラス化して独自の型を作成することもできます。AccumulatorV2抽象クラスには、オーバーライドする必要のあるいくつかのメソッドがあります。アキュムレータをゼロにリセットするためのreset、アキュムレータに別の値を追加するためのadd、別の同じ型のアキュムレータをこれにマージするためのmergeです。オーバーライドする必要があるその他のメソッドは、APIドキュメントに含まれています。たとえば、数学ベクトルを表すMyVectorクラスがある場合、次のように記述できます。

class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}

// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");

プログラマが独自のAccumulatorV2型を定義する場合、結果の型が追加された要素の型と異なる場合があることに注意してください。

警告:Sparkタスクが完了すると、Sparkはこのタスクで累積された更新をアキュムレータにマージしようとします。失敗した場合、Sparkは失敗を無視し、タスクを成功とマークして他のタスクの実行を継続します。したがって、バグのあるアキュムレータはSparkジョブに影響を与えませんが、Sparkジョブが成功しても正しく更新されない可能性があります。

アクション内でのみ実行されるアキュムレータの更新の場合、Sparkは、各タスクのアキュムレータへの更新が1回だけ適用されることを保証します。つまり、再起動されたタスクは値を更新しません。変換では、タスクまたはジョブステージが再実行された場合、各タスクの更新が複数回適用される可能性があることに注意する必要があります。

アキュムレータは、Sparkの遅延評価モデルを変更しません。RDDでの操作内で更新されている場合、その値は、RDDがアクションの一部として計算された場合にのみ更新されます。したがって、アキュムレータの更新は、map()のような遅延変換内で実行された場合、実行が保証されません。以下のコードフラグメントは、このプロパティを示しています。

accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.

クラスタへのデプロイ

アプリケーション送信ガイドでは、クラスタにアプリケーションを送信する方法について説明しています。簡単に言うと、アプリケーションをJAR(Java/Scalaの場合)または.pyまたは.zipファイル(Pythonの場合)にパッケージ化したら、bin/spark-submitスクリプトを使用して、サポートされている任意のクラスタマネージャーに送信できます。

Java / ScalaからのSparkジョブの起動

org.apache.spark.launcherパッケージは、シンプルなJava APIを使用して、子プロセスとしてSparkジョブを起動するためのクラスを提供します。

単体テスト

Sparkは、一般的なユニットテストフレームワークでのユニットテストに適しています。マスターURLがlocalに設定されたSparkContextをテストで作成し、操作を実行してから、SparkContext.stop()を呼び出して破棄するだけです。Sparkは同じプログラムで同時に実行される2つのコンテキストをサポートしていないため、finallyブロックまたはテストフレームワークのtearDownメソッド内でコンテキストを停止してください。

次のステップ

Spark WebサイトでいくつかのSparkプログラムの例を見ることができます。また、Sparkには、examplesディレクトリにいくつかのサンプルが含まれています(Scala, Java, Python, R)。JavaおよびScalaの例を実行するには、クラス名をSparkのbin/run-exampleスクリプトに渡します。たとえば、次のようになります。

./bin/run-example SparkPi

Pythonの例では、代わりにspark-submitを使用します。

./bin/spark-submit examples/src/main/python/pi.py

Rの例では、代わりにspark-submitを使用します。

./bin/spark-submit examples/src/main/r/dataframe.R

プログラムの最適化については、構成およびチューニングガイドに、ベストプラクティスに関する情報が記載されています。これらは、データが効率的な形式でメモリに格納されていることを確認するために特に重要です。デプロイについては、クラスタモードの概要で、分散操作に関与するコンポーネントとサポートされているクラスタマネージャーについて説明しています。

最後に、完全なAPIドキュメントは、ScalaJavaPythonおよびRで入手できます。