RDDプログラミングガイド
- 概要
- Sparkとのリンク
- Sparkの初期化
- Resilient Distributed Datasets (RDDs)
- 共有変数
- クラスタへのデプロイ
- Java / ScalaからのSparkジョブの起動
- 単体テスト
- 次のステップ
概要
大まかに言うと、すべてのSparkアプリケーションは、ユーザーのmain
関数を実行し、クラスタ上でさまざまな並列操作を実行するドライバプログラムで構成されます。Sparkが提供する主な抽象化は、Resilient Distributed Dataset (RDD)です。これは、並列に操作できるクラスタのノード間で分割された要素のコレクションです。RDDは、Hadoopファイルシステム(またはその他のHadoop対応ファイルシステム)内のファイル、またはドライバプログラム内の既存のScalaコレクションから開始し、それを変換することによって作成されます。ユーザーは、並列操作間で効率的に再利用できるように、RDDをメモリに永続化するようにSparkに要求することもできます。最後に、RDDはノード障害から自動的に回復します。
Sparkにおける2番目の抽象化は、並列操作で使用できる共有変数です。デフォルトでは、Sparkが異なるノード上でタスクのセットとして関数を並列に実行する場合、関数で使用される各変数のコピーを各タスクに送信します。場合によっては、変数をタスク間、またはタスクとドライバプログラム間で共有する必要があります。Sparkは、2種類の共有変数をサポートしています。ブロードキャスト変数は、すべてのノードのメモリに値をキャッシュするために使用でき、アキュムレータは、カウンターや合計など、 "追加"のみを行う変数です。
このガイドでは、Sparkがサポートする各言語でこれらの各機能を示します。Sparkのインタラクティブシェルを起動すると、理解しやすくなります。Scalaシェルの場合はbin/spark-shell
、Pythonシェルの場合はbin/pyspark
を使用してください。
Sparkとのリンク
Spark 3.5.1はPython 3.8以降で動作します。標準のCPythonインタープリターを使用できるため、NumPyのようなCライブラリを使用できます。また、PyPy 7.3.6以降でも動作します。
PythonでのSparkアプリケーションは、ランタイムにSparkを含むbin/spark-submit
スクリプトを使用して実行するか、setup.pyに含めることで実行できます。
install_requires=[
'pyspark=={site.SPARK_VERSION}'
]
pipを使用してPySparkをインストールせずにPythonでSparkアプリケーションを実行するには、Sparkディレクトリにあるbin/spark-submit
スクリプトを使用します。このスクリプトは、SparkのJava / Scalaライブラリをロードし、クラスタにアプリケーションをサブミットできるようにします。また、bin/pyspark
を使用してインタラクティブなPythonシェルを起動することもできます。
HDFSデータにアクセスする場合は、HDFSのバージョンにリンクするPySparkのビルドを使用する必要があります。プリビルドパッケージは、一般的なHDFSバージョン向けにSparkのホームページでも入手できます。
最後に、いくつかのSparkクラスをプログラムにインポートする必要があります。次の行を追加してください。
from pyspark import SparkContext, SparkConf
PySparkでは、ドライバとワーカーの両方で同じマイナーバージョンのPythonが必要です。PATHのデフォルトのPythonバージョンを使用しますが、PYSPARK_PYTHON
でどのバージョンのPythonを使用するかを指定できます。たとえば、
$ PYSPARK_PYTHON=python3.8 bin/pyspark
$ PYSPARK_PYTHON=/path-to-your-pypy/pypy bin/spark-submit examples/src/main/python/pi.py
Spark 3.5.1は、デフォルトでScala 2.12と連携するようにビルドおよび配布されています。(Sparkは、他のバージョンのScalaと連携するようにビルドすることもできます。)Scalaでアプリケーションを作成するには、互換性のあるScalaバージョン(例:2.12.X)を使用する必要があります。
Sparkアプリケーションを作成するには、SparkへのMaven依存関係を追加する必要があります。Sparkは、Maven Centralから入手できます。
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.1
さらに、HDFSクラスターにアクセスする場合は、HDFSのバージョンに対応するhadoop-client
への依存関係を追加する必要があります。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最後に、いくつかのSparkクラスをプログラムにインポートする必要があります。次の行を追加してください。
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
(Spark 1.3.0より前のバージョンでは、必須の暗黙的な変換を有効にするために、明示的にimport org.apache.spark.SparkContext._
を行う必要があります。)
Spark 3.5.1は、関数を簡潔に記述するためのラムダ式をサポートしています。そうでない場合は、org.apache.spark.api.java.functionパッケージのクラスを使用できます。
Java 7のサポートは、Spark 2.2.0で削除されたことに注意してください。
JavaでSparkアプリケーションを作成するには、Sparkへの依存関係を追加する必要があります。Sparkは、Maven Centralから入手できます。
groupId = org.apache.spark
artifactId = spark-core_2.12
version = 3.5.1
さらに、HDFSクラスターにアクセスする場合は、HDFSのバージョンに対応するhadoop-client
への依存関係を追加する必要があります。
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>
最後に、いくつかのSparkクラスをプログラムにインポートする必要があります。次の行を追加してください。
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.SparkConf;
Sparkの初期化
Sparkプログラムが最初に行う必要があるのは、SparkContextオブジェクトを作成することです。これは、SparkがクラスタにどのようにアクセスするかをSparkに伝えます。SparkContext
を作成するには、最初にアプリケーションに関する情報を含むSparkConfオブジェクトをビルドする必要があります。
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
Sparkプログラムが最初に行う必要があるのは、SparkContextオブジェクトを作成することです。これは、SparkがクラスタにどのようにアクセスするかをSparkに伝えます。SparkContext
を作成するには、最初にアプリケーションに関する情報を含むSparkConfオブジェクトをビルドする必要があります。
JVMごとにアクティブなSparkContextは1つだけである必要があります。新しいSparkContextを作成する前に、アクティブなSparkContextをstop()
する必要があります。
val conf = new SparkConf().setAppName(appName).setMaster(master)
new SparkContext(conf)
Sparkプログラムが最初に行う必要があるのは、JavaSparkContextオブジェクトを作成することです。これは、SparkがクラスタにどのようにアクセスするかをSparkに伝えます。SparkContext
を作成するには、最初にアプリケーションに関する情報を含むSparkConfオブジェクトをビルドする必要があります。
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaSparkContext sc = new JavaSparkContext(conf);
appName
パラメーターは、クラスタUIに表示するアプリケーションの名前です。master
は、Spark、Mesos、またはYARNクラスターのURL、またはローカルモードで実行するための特別な「local」文字列です。実際には、クラスタ上で実行する場合、プログラムでmaster
をハードコーディングするのではなく、spark-submit
でアプリケーションを起動し、そこで受け取るようにします。ただし、ローカルテストと単体テストの場合は、「local」を渡してSparkをインプロセスで実行できます。
シェルを使用する
PySparkシェルでは、特別なインタープリター対応のSparkContextが、sc
という変数に既に作成されています。独自のSparkContextを作成しても機能しません。コンテキストが接続するマスターは、--master
引数を使用して設定できます。また、コンマ区切りのリストを--py-files
に渡すことで、Pythonの.zip、.egg、または.pyファイルをランタイムパスに追加できます。サードパーティのPython依存関係については、Pythonパッケージ管理を参照してください。また、コンマ区切りのMaven座標のリストを--packages
引数に指定することにより、シェルセッションに依存関係(例:Sparkパッケージ)を追加することもできます。依存関係が存在する可能性のある追加のリポジトリ(例:Sonatype)は、--repositories
引数に渡すことができます。たとえば、正確に4つのコアでbin/pyspark
を実行するには、次のように使用します。
$ ./bin/pyspark --master local[4]
または、検索パスにcode.py
も追加するには(後でimport code
できるように)、次のように使用します。
$ ./bin/pyspark --master local[4] --py-files code.py
オプションの完全なリストについては、pyspark --help
を実行してください。舞台裏では、pyspark
は、より一般的なspark-submit
スクリプトを呼び出します。
IPython(強化されたPythonインタープリター)でPySparkシェルを起動することも可能です。PySparkはIPython 1.0.0以降で動作します。IPythonを使用するには、bin/pyspark
を実行するときに、PYSPARK_DRIVER_PYTHON
変数をipython
に設定します。
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
Jupyterノートブック(以前はIPythonノートブックとして知られていました)を使用するには、
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
ipython
または jupyter
コマンドをカスタマイズするには、PYSPARK_DRIVER_PYTHON_OPTS
を設定します。
Jupyter Notebookサーバーが起動したら、「Files」タブから新しいノートブックを作成できます。ノートブック内で、JupyterノートブックからSparkを試す前に、コマンド %pylab inline
をノートブックの一部として入力できます。
Sparkシェルでは、特別なインタープリター対応のSparkContextがすでに変数 sc
に作成されています。独自のSparkContextを作成しても機能しません。コンテキストが接続するマスターは、--master
引数を使用して設定できます。また、コンマ区切りのリストを --jars
引数に渡すことで、JARをクラスパスに追加できます。依存関係(例えばSparkパッケージ)をシェルセッションに追加するには、Maven座標のコンマ区切りのリストを --packages
引数に渡します。依存関係が存在する可能性のある追加のリポジトリ(例えばSonatype)は、--repositories
引数に渡すことができます。例えば、正確に4つのコアで bin/spark-shell
を実行するには、次のように使用します。
$ ./bin/spark-shell --master local[4]
または、code.jar
をクラスパスに追加するには、次のように使用します。
$ ./bin/spark-shell --master local[4] --jars code.jar
Maven座標を使用して依存関係を含めるには
$ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
すべてのオプションのリストを表示するには、spark-shell --help
を実行します。内部的には、spark-shell
はより一般的な spark-submit
スクリプトを呼び出します。
Resilient Distributed Datasets (RDDs)
Sparkは、並列に操作できるフォールトトレラントな要素のコレクションであるresilient distributed dataset(RDD)の概念を中心に展開しています。RDDを作成する方法は2つあります。ドライバープログラムで既存のコレクションを並列化するか、共有ファイルシステム、HDFS、HBase、またはHadoop InputFormatを提供する任意のデータソースなどの外部ストレージシステムのデータセットを参照します。
並列化されたコレクション
並列化されたコレクションは、ドライバープログラムの既存のイテラブルまたはコレクションに対して SparkContext
の parallelize
メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。例えば、1から5までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
作成後、分散データセット(distData
)は並列に操作できます。例えば、リストの要素を加算するために distData.reduce(lambda a, b: a + b)
を呼び出すことができます。分散データセットの操作については後で説明します。
並列化されたコレクションは、ドライバープログラム(ScalaのSeq
)の既存のコレクションに対して SparkContext
の parallelize
メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。例えば、1から5までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
作成後、分散データセット(distData
)は並列に操作できます。例えば、配列の要素を加算するために distData.reduce((a, b) => a + b)
を呼び出すことができます。分散データセットの操作については後で説明します。
並列化されたコレクションは、ドライバープログラムの既存の Collection
に対して JavaSparkContext
の parallelize
メソッドを呼び出すことによって作成されます。コレクションの要素はコピーされ、並列に操作できる分散データセットを形成します。例えば、1から5までの数値を保持する並列化されたコレクションを作成する方法は次のとおりです。
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
作成後、分散データセット(distData
)は並列に操作できます。例えば、リストの要素を加算するために distData.reduce((a, b) -> a + b)
を呼び出すことができます。分散データセットの操作については後で説明します。
並列コレクションの重要なパラメーターの1つは、データセットを分割するパーティションの数です。Sparkは、クラスタの各パーティションに対して1つのタスクを実行します。通常、クラスタ内の各CPUに対して2〜4つのパーティションが必要になります。通常、Sparkはクラスタに基づいてパーティション数を自動的に設定しようとします。ただし、parallelize
に2番目のパラメーターとして渡すことで、手動で設定することもできます(例えば、sc.parallelize(data, 10)
)。注:コードの一部の場所では、後方互換性を維持するために、スライス(パーティションの同義語)という用語を使用しています。
外部データセット
PySparkは、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3など、Hadoopでサポートされている任意のストレージソースから分散データセットを作成できます。Sparkは、テキストファイル、SequenceFiles、およびその他のHadoop InputFormatをサポートしています。
テキストファイルRDDは、SparkContext
の textFile
メソッドを使用して作成できます。このメソッドは、ファイルへのURI(マシンのローカルパス、または hdfs://
、s3a://
などのURI)を受け取り、行のコレクションとして読み取ります。以下に呼び出しの例を示します。
>>> distFile = sc.textFile("data.txt")
作成後、distFile
はデータセット操作によって操作できます。例えば、次のように map
および reduce
操作を使用して、すべての行のサイズを加算できます。distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
。
Sparkでファイルを読み取る際の注意点
-
ローカルファイルシステムのパスを使用している場合、ファイルはワーカーノード上の同じパスからもアクセスできる必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークマウントされた共有ファイルシステムを使用します。
-
textFile
を含む、Sparkのすべてのファイルベースの入力メソッドは、ディレクトリ、圧縮ファイル、およびワイルドカードでの実行をサポートしています。例えば、textFile("/my/directory")
、textFile("/my/directory/*.txt")
、およびtextFile("/my/directory/*.gz")
を使用できます。 -
textFile
メソッドは、ファイルのパーティション数を制御するためのオプションの2番目の引数も受け取ります。デフォルトでは、Sparkはファイルの各ブロック(HDFSではデフォルトで128MBのブロック)に対して1つのパーティションを作成しますが、より大きな値を渡すことで、より多くのパーティションを要求することもできます。ブロック数よりも少ないパーティションを持つことはできないことに注意してください。
テキストファイル以外に、SparkのPython APIは他のいくつかのデータ形式もサポートしています。
-
SparkContext.wholeTextFiles
を使用すると、複数の小さなテキストファイルを含むディレクトリを読み取ることができ、それらの各ファイルを(ファイル名、コンテンツ)のペアとして返します。これは、各ファイルの行ごとに1つのレコードを返すtextFile
とは対照的です。 -
RDD.saveAsPickleFile
およびSparkContext.pickleFile
は、pickle化されたPythonオブジェクトで構成されるシンプルな形式でRDDを保存することをサポートします。バッチ処理は、デフォルトのバッチサイズ10でpickleシリアル化に使用されます。 -
SequenceFileおよびHadoop Input/Output形式
注 この機能は現在 Experimental
とマークされており、上級ユーザー向けです。将来的には、Spark SQLに基づいた読み取り/書き込みサポートに置き換えられる可能性があります。その場合、Spark SQLが推奨されるアプローチです。
Writableサポート
PySpark SequenceFileサポートは、Java内のキーと値のペアのRDDをロードし、Writablesを基本のJava型に変換し、pickle を使用して結果のJavaオブジェクトをpickle化します。キーと値のペアのRDDをSequenceFileに保存する場合、PySparkは逆のことを行います。PythonオブジェクトをJavaオブジェクトにアンピックルしてから、Writablesに変換します。次のWritablesは自動的に変換されます。
Writable型 | Python型 |
---|---|
Text | str |
IntWritable | int |
FloatWritable | float |
DoubleWritable | float |
BooleanWritable | bool |
BytesWritable | bytearray |
NullWritable | None |
MapWritable | dict |
配列はそのままでは処理されません。ユーザーは、読み取りまたは書き込み時にカスタムの ArrayWritable
サブタイプを指定する必要があります。書き込み時、ユーザーは配列をカスタムの ArrayWritable
サブタイプに変換するカスタムコンバーターも指定する必要があります。読み取り時、デフォルトのコンバーターは、カスタムの ArrayWritable
サブタイプをJavaの Object[]
に変換し、その後Pythonのタプルにpickle化されます。プリミティブ型の配列に対してPythonの array.array
を取得するには、ユーザーはカスタムコンバーターを指定する必要があります。
SequenceFileの保存とロード
テキストファイルと同様に、SequenceFileはパスを指定することで保存およびロードできます。キーと値のクラスを指定できますが、標準のWritablesにはこれは必要ありません。
>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
その他のHadoop Input/Output形式の保存とロード
PySparkは、「新しい」および「古い」Hadoop MapReduce APIの両方について、任意のHadoop InputFormatを読み取り、任意のHadoop OutputFormatを書き込むこともできます。必要に応じて、Hadoop構成をPython dictとして渡すことができます。以下に、Elasticsearch ESInputFormatを使用する例を示します。
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
"org.apache.hadoop.io.NullWritable",
"org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
{u'field1': True,
u'field2': u'Some Text',
u'field3': 12345})
InputFormatがHadoop構成および/または入力パスにのみ依存しており、キーと値のクラスが上記の表に従って簡単に変換できる場合、このアプローチはそのような場合にうまく機能することに注意してください。
(Cassandra/HBaseからデータをロードするなど)カスタムのシリアル化されたバイナリデータがある場合は、最初にScala/Java側でそのデータをpickleのpicklerで処理できるものに変換する必要があります。このために Converter トレイトが提供されています。このトレイトを拡張し、convert
メソッドに変換コードを実装するだけです。このクラスと、InputFormat
にアクセスするために必要な依存関係が、Sparkジョブjarにパッケージ化され、PySparkクラスパスに含まれていることを確認してください。
Cassandra/HBase InputFormat
と OutputFormat
をカスタムコンバーターで使用する例については、Pythonの例とConverterの例を参照してください。
Sparkは、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3など、Hadoopでサポートされている任意のストレージソースから分散データセットを作成できます。Sparkは、テキストファイル、SequenceFiles、およびその他のHadoop InputFormatをサポートしています。
テキストファイルRDDは、SparkContext
の textFile
メソッドを使用して作成できます。このメソッドは、ファイルへのURI(マシンのローカルパス、または hdfs://
、s3a://
などのURI)を受け取り、行のコレクションとして読み取ります。以下に呼び出しの例を示します。
scala> val distFile = sc.textFile("data.txt")
distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
作成後、distFile
はデータセット操作によって操作できます。例えば、次のように map
および reduce
操作を使用して、すべての行のサイズを加算できます。distFile.map(s => s.length).reduce((a, b) => a + b)
。
Sparkでファイルを読み取る際の注意点
-
ローカルファイルシステムのパスを使用している場合、ファイルはワーカーノード上の同じパスからもアクセスできる必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークマウントされた共有ファイルシステムを使用します。
-
Sparkのファイルベースのすべての入力メソッド(
textFile
を含む)は、ディレクトリ、圧縮ファイル、ワイルドカードでの実行をサポートしています。たとえば、textFile("/my/directory")
、textFile("/my/directory/*.txt")
、およびtextFile("/my/directory/*.gz")
を使用できます。複数のファイルを読み込む場合、パーティションの順序は、ファイルシステムから返されるファイルの順序によって異なります。例えば、ファイルのパスの辞書順に従う場合も、そうでない場合もあります。パーティション内では、要素は基になるファイルでの順序に従って順序付けられます。 -
textFile
メソッドは、ファイルのパーティション数を制御するためのオプションの2番目の引数も受け取ります。デフォルトでは、Sparkはファイルの各ブロック(HDFSではデフォルトで128MBのブロック)に対して1つのパーティションを作成しますが、より大きな値を渡すことで、より多くのパーティションを要求することもできます。ブロック数よりも少ないパーティションを持つことはできないことに注意してください。
テキストファイル以外にも、SparkのScala APIは他のいくつかのデータ形式をサポートしています。
-
SparkContext.wholeTextFiles
を使用すると、複数の小さなテキストファイルを含むディレクトリを読み取り、それらをそれぞれ(ファイル名、内容)のペアとして返します。これは、各ファイルの行ごとに1つのレコードを返すtextFile
とは対照的です。パーティショニングはデータの局所性によって決定されます。これにより、場合によってはパーティションが少なすぎる可能性があります。そのような場合、wholeTextFiles
は、最小パーティション数を制御するためのオプションの2番目の引数を提供します。 -
SequenceFileの場合、SparkContextの
sequenceFile[K, V]
メソッドを使用します。ここで、K
とV
は、ファイル内のキーと値の型です。これらは、IntWritableやTextなどのHadoopのWritableインターフェースのサブクラスである必要があります。さらに、Sparkでは、いくつかの一般的なWritableに対してネイティブ型を指定できます。たとえば、sequenceFile[Int, String]
はIntWritableとTextを自動的に読み込みます。 -
他のHadoop InputFormatの場合は、
SparkContext.hadoopRDD
メソッドを使用できます。このメソッドは、任意のJobConf
と入力形式クラス、キー・クラス、値クラスを取ります。これらは、入力ソースを持つHadoopジョブと同じ方法で設定します。また、「新しい」MapReduce API(org.apache.hadoop.mapreduce
)に基づくInputFormatには、SparkContext.newAPIHadoopRDD
を使用できます。 -
RDD.saveAsObjectFile
とSparkContext.objectFile
は、シリアライズされたJavaオブジェクトで構成されるシンプルな形式でRDDを保存することをサポートします。これはAvroのような特殊な形式ほど効率的ではありませんが、任意のRDDを保存する簡単な方法を提供します。
Sparkは、ローカルファイルシステム、HDFS、Cassandra、HBase、Amazon S3など、Hadoopでサポートされている任意のストレージソースから分散データセットを作成できます。Sparkは、テキストファイル、SequenceFiles、およびその他のHadoop InputFormatをサポートしています。
テキストファイルRDDは、SparkContext
の textFile
メソッドを使用して作成できます。このメソッドは、ファイルへのURI(マシンのローカルパス、または hdfs://
、s3a://
などのURI)を受け取り、行のコレクションとして読み取ります。以下に呼び出しの例を示します。
JavaRDD<String> distFile = sc.textFile("data.txt");
作成されたdistFile
は、データセット操作によって操作できます。例えば、以下のように、map
とreduce
操作を使用して、すべての行のサイズを合計できます。distFile.map(s -> s.length()).reduce((a, b) -> a + b)
。
Sparkでファイルを読み取る際の注意点
-
ローカルファイルシステムのパスを使用している場合、ファイルはワーカーノード上の同じパスからもアクセスできる必要があります。ファイルをすべてのワーカーにコピーするか、ネットワークマウントされた共有ファイルシステムを使用します。
-
textFile
を含む、Sparkのすべてのファイルベースの入力メソッドは、ディレクトリ、圧縮ファイル、およびワイルドカードでの実行をサポートしています。例えば、textFile("/my/directory")
、textFile("/my/directory/*.txt")
、およびtextFile("/my/directory/*.gz")
を使用できます。 -
textFile
メソッドは、ファイルのパーティション数を制御するためのオプションの2番目の引数も受け取ります。デフォルトでは、Sparkはファイルの各ブロック(HDFSではデフォルトで128MBのブロック)に対して1つのパーティションを作成しますが、より大きな値を渡すことで、より多くのパーティションを要求することもできます。ブロック数よりも少ないパーティションを持つことはできないことに注意してください。
テキストファイル以外にも、SparkのJava APIは他のいくつかのデータ形式をサポートしています。
-
JavaSparkContext.wholeTextFiles
を使用すると、複数の小さなテキストファイルを含むディレクトリを読み取り、それらをそれぞれ(ファイル名、内容)のペアとして返します。これは、各ファイルの行ごとに1つのレコードを返すtextFile
とは対照的です。 -
SequenceFileの場合、SparkContextの
sequenceFile[K, V]
メソッドを使用します。ここで、K
とV
は、ファイル内のキーと値の型です。これらは、IntWritableやTextなどのHadoopのWritableインターフェースのサブクラスである必要があります。 -
他のHadoop InputFormatの場合は、
JavaSparkContext.hadoopRDD
メソッドを使用できます。このメソッドは、任意のJobConf
と入力形式クラス、キー・クラス、値クラスを取ります。これらは、入力ソースを持つHadoopジョブと同じ方法で設定します。また、「新しい」MapReduce API(org.apache.hadoop.mapreduce
)に基づくInputFormatには、JavaSparkContext.newAPIHadoopRDD
を使用できます。 -
JavaRDD.saveAsObjectFile
とJavaSparkContext.objectFile
は、シリアライズされたJavaオブジェクトで構成されるシンプルな形式でRDDを保存することをサポートします。これはAvroのような特殊な形式ほど効率的ではありませんが、任意のRDDを保存する簡単な方法を提供します。
RDD操作
RDDは2種類の操作をサポートしています。既存のデータセットから新しいデータセットを作成する変換と、データセットで計算を実行した後、ドライバプログラムに値を返すアクションです。たとえば、map
は、各データセット要素を関数に通し、結果を表す新しいRDDを返す変換です。一方、reduce
は、いくつかの関数を使用してRDDのすべての要素を集約し、最終結果をドライバプログラムに返すアクションです(ただし、分散データセットを返す並列reduceByKey
もあります)。
Sparkのすべての変換は、結果をすぐに計算するのではなく、遅延しています。代わりに、それらは、何らかの基本データセット(例えば、ファイル)に適用された変換を記憶するだけです。変換は、アクションがドライバプログラムに返される結果を必要とするときにのみ計算されます。この設計により、Sparkはより効率的に実行できます。例えば、map
によって作成されたデータセットがreduce
で使用され、大きなマップされたデータセットではなく、reduce
の結果のみをドライバに返すことを実現できます。
デフォルトでは、変換された各RDDは、アクションを実行するたびに再計算される可能性があります。ただし、persist
(またはcache
)メソッドを使用して、RDDをメモリに永続化することもできます。その場合、Sparkは、次にクエリを実行するときに、はるかに高速にアクセスできるように、要素をクラスタ上に保持します。ディスクにRDDを永続化したり、複数のノードにレプリケートしたりするためのサポートもあります。
基本
RDDの基本を説明するために、以下の簡単なプログラムを検討してください。
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s))
totalLength = lineLengths.reduce(lambda a, b: a + b)
最初の行は、外部ファイルから基本RDDを定義します。このデータセットは、メモリにロードされたり、その他の操作が実行されたりすることはありません。lines
は単なるファイルへのポインタです。2行目は、map
変換の結果としてlineLengths
を定義します。ここでも、遅延のため、lineLengths
はすぐには計算されません。最後に、アクションであるreduce
を実行します。この時点で、Sparkは計算を個別のマシンで実行するタスクに分割し、各マシンはマップの一部とローカル削減の両方を実行し、その答えのみをドライバプログラムに返します。
後でlineLengths
を再度使用したい場合は、
lineLengths.persist()
reduce
の前に、lineLengths
が最初に計算された後でメモリに保存されるように追加できます。
RDDの基本を説明するために、以下の簡単なプログラムを検討してください。
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
最初の行は、外部ファイルから基本RDDを定義します。このデータセットは、メモリにロードされたり、その他の操作が実行されたりすることはありません。lines
は単なるファイルへのポインタです。2行目は、map
変換の結果としてlineLengths
を定義します。ここでも、遅延のため、lineLengths
はすぐには計算されません。最後に、アクションであるreduce
を実行します。この時点で、Sparkは計算を個別のマシンで実行するタスクに分割し、各マシンはマップの一部とローカル削減の両方を実行し、その答えのみをドライバプログラムに返します。
後でlineLengths
を再度使用したい場合は、
lineLengths.persist()
reduce
の前に、lineLengths
が最初に計算された後でメモリに保存されるように追加できます。
RDDの基本を説明するために、以下の簡単なプログラムを検討してください。
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
最初の行は、外部ファイルから基本RDDを定義します。このデータセットは、メモリにロードされたり、その他の操作が実行されたりすることはありません。lines
は単なるファイルへのポインタです。2行目は、map
変換の結果としてlineLengths
を定義します。ここでも、遅延のため、lineLengths
はすぐには計算されません。最後に、アクションであるreduce
を実行します。この時点で、Sparkは計算を個別のマシンで実行するタスクに分割し、各マシンはマップの一部とローカル削減の両方を実行し、その答えのみをドライバプログラムに返します。
後でlineLengths
を再度使用したい場合は、
lineLengths.persist(StorageLevel.MEMORY_ONLY());
reduce
の前に、lineLengths
が最初に計算された後でメモリに保存されるように追加できます。
Sparkへの関数の受け渡し
SparkのAPIは、クラスタ上で実行するためにドライバプログラムで関数を渡すことに大きく依存しています。これを行うには、次の3つの推奨される方法があります。
- ラムダ式。式として記述できる単純な関数に使用します。(ラムダは、複数ステートメント関数や値を返さないステートメントをサポートしていません。)
- Sparkを呼び出す関数内のローカル
def
。長いコードの場合。 - モジュール内のトップレベル関数。
例えば、lambda
を使用してサポートできるよりも長い関数を渡すには、以下のコードを検討してください。
"""MyScript.py"""
if __name__ == "__main__":
def myFunc(s):
words = s.split(" ")
return len(words)
sc = SparkContext(...)
sc.textFile("file.txt").map(myFunc)
クラスインスタンス(シングルトンオブジェクトとは対照的)のメソッドへの参照を渡すことも可能ですが、これには、メソッドと一緒にそのクラスを含むオブジェクトを送信する必要があります。例えば、以下を検討してください。
class MyClass(object):
def func(self, s):
return s
def doStuff(self, rdd):
return rdd.map(self.func)
ここで、new MyClass
を作成し、それに対してdoStuff
を呼び出すと、内部のmap
は、*そのMyClass
インスタンス*のfunc
メソッドを参照するため、オブジェクト全体をクラスタに送信する必要があります。
同様に、外側のオブジェクトのフィールドにアクセスすると、オブジェクト全体が参照されます。
class MyClass(object):
def __init__(self):
self.field = "Hello"
def doStuff(self, rdd):
return rdd.map(lambda s: self.field + s)
この問題を回避する最も簡単な方法は、外部からアクセスする代わりに、field
をローカル変数にコピーすることです。
def doStuff(self, rdd):
field = self.field
return rdd.map(lambda s: field + s)
SparkのAPIは、クラスタ上で実行するためにドライバプログラムで関数を渡すことに大きく依存しています。これを行うには、次の2つの推奨される方法があります。
- 無名関数構文。短いコードで使用できます。
- グローバルシングルトンオブジェクト内の静的メソッド。例えば、
object MyFunctions
を定義し、以下のようにMyFunctions.func1
を渡すことができます。
object MyFunctions {
def func1(s: String): String = { ... }
}
myRdd.map(MyFunctions.func1)
クラスインスタンス(シングルトンオブジェクトとは対照的)のメソッドへの参照を渡すことも可能ですが、これには、メソッドと一緒にそのクラスを含むオブジェクトを送信する必要があります。例えば、以下を検討してください。
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
ここで、新しいMyClass
インスタンスを作成し、それに対してdoStuff
を呼び出すと、内部のmap
は、*そのMyClass
インスタンス*のfunc1
メソッドを参照するため、オブジェクト全体をクラスタに送信する必要があります。これは、rdd.map(x => this.func1(x))
を記述するのと似ています。
同様に、外側のオブジェクトのフィールドにアクセスすると、オブジェクト全体が参照されます。
class MyClass {
val field = "Hello"
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
rdd.map(x => this.field + x)
を記述するのと同じで、this
すべてを参照します。この問題を回避する最も簡単な方法は、外部からアクセスする代わりに、field
をローカル変数にコピーすることです。
def doStuff(rdd: RDD[String]): RDD[String] = {
val field_ = this.field
rdd.map(x => field_ + x)
}
SparkのAPIは、クラスタ上で実行するためにドライバプログラムで関数を渡すことに大きく依存しています。Javaでは、関数はorg.apache.spark.api.java.functionパッケージのインターフェースを実装するクラスによって表されます。このような関数を作成するには、次の2つの方法があります。
- 自分のクラスでFunctionインターフェースを実装し、無名インナークラスまたは名前付きクラスとして、そのインスタンスをSparkに渡します。
- ラムダ式を使用して、実装を簡潔に定義します。
このガイドの多くでは簡潔にするためにラムダ構文を使用していますが、長形式ですべて同じAPIを使用するのは簡単です。たとえば、上記のコードを次のように記述できたはずです。
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
または、関数をインラインで記述するのが面倒な場合。
class GetLength implements Function<String, Integer> {
public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
public Integer call(Integer a, Integer b) { return a + b; }
}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new GetLength());
int totalLength = lineLengths.reduce(new Sum());
Javaの無名インナークラスは、final
でマークされている限り、外側のスコープの変数にもアクセスできることに注意してください。Sparkは、他の言語と同様に、これらの変数のコピーを各ワーカーノードに送信します。
クロージャの理解
Sparkの難しい点の1つは、クラスタ全体でコードを実行するときの変数とメソッドのスコープとライフサイクルを理解することです。スコープ外の変数を変更するRDD操作は、混乱を招くことが多い原因になります。以下の例では、foreach()
を使用してカウンタをインクリメントするコードを見ていきますが、他の操作でも同様の問題が発生する可能性があります。
例
以下に示す単純なRDD要素の合計は、実行が同じJVM内で行われるか否かによって異なる動作をする可能性があります。この一般的な例は、Sparkをlocal
モード(--master = local[n]
)で実行する場合と、Sparkアプリケーションをクラスターにデプロイする場合(例えば、spark-submitを使用してYARNにデプロイする場合)です。
counter = 0
rdd = sc.parallelize(data)
# Wrong: Don't do this!!
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
var counter = 0
var rdd = sc.parallelize(data)
// Wrong: Don't do this!!
rdd.foreach(x => counter += x)
println("Counter value: " + counter)
int counter = 0;
JavaRDD<Integer> rdd = sc.parallelize(data);
// Wrong: Don't do this!!
rdd.foreach(x -> counter += x);
println("Counter value: " + counter);
ローカルモードとクラスタモード
上記のコードの動作は未定義であり、意図したとおりに動作しない可能性があります。ジョブを実行するために、SparkはRDD操作の処理をタスクに分割し、各タスクはexecutorによって実行されます。実行前に、Sparkはタスクのクロージャを計算します。クロージャとは、executorがRDD上で計算を実行するために可視でなければならない変数とメソッドのことです(この場合はforeach()
)。このクロージャはシリアライズされ、各executorに送信されます。
各executorに送信されるクロージャ内の変数はコピーとなるため、foreach
関数内でcounterを参照した場合、もはやドライバーノード上のcounterではありません。ドライバーノードのメモリにはまだcounterが存在しますが、これはexecutorからは見えなくなっています!executorはシリアライズされたクロージャからのコピーのみを参照します。したがって、counterの最終値はゼロのままになります。なぜなら、counterに対するすべての操作がシリアライズされたクロージャ内の値を参照していたからです。
ローカルモードでは、場合によってはforeach
関数がドライバーと同じJVM内で実際に実行され、同じ元のcounterを参照して、実際に更新する可能性があります。
このようなシナリオで明確に定義された動作を保証するには、Accumulator
を使用する必要があります。Sparkのアキュムレータは、クラスター内のワーカーノード間で実行が分割された場合に、変数を安全に更新するためのメカニズムを提供するために特に使用されます。このガイドのアキュムレータのセクションでは、これらについてさらに詳しく説明します。
一般的に、ループやローカルで定義されたメソッドのようなクロージャは、グローバルな状態を変更するために使用すべきではありません。Sparkは、クロージャの外部から参照されるオブジェクトの変更の動作を定義または保証しません。これを行う一部のコードはローカルモードでは動作するかもしれませんが、それは単なる偶然であり、このようなコードは分散モードでは期待どおりに動作しません。グローバルな集計が必要な場合は、代わりにアキュムレータを使用してください。
RDDの要素の表示
もう1つの一般的なイディオムは、rdd.foreach(println)
またはrdd.map(println)
を使用してRDDの要素を出力しようとすることです。単一のマシンでは、これは期待される出力を生成し、RDDのすべての要素を出力します。ただし、cluster
モードでは、executorによって呼び出されるstdout
への出力は、ドライバーのstdout
ではなく、executorのstdout
に書き込まれるため、ドライバーのstdout
にはこれらは表示されません。ドライバーのすべての要素を出力するには、最初にcollect()
メソッドを使用してRDDをドライバーノードに取り込むことができます。例:rdd.collect().foreach(println)
。ただし、collect()
はRDD全体を単一のマシンにフェッチするため、これによりドライバーがメモリ不足になる可能性があります。RDDの少数の要素のみを出力する必要がある場合は、take()
を使用する方が安全です。例:rdd.take(100).foreach(println)
。
キーと値のペアの操作
ほとんどのSpark操作は任意の型のオブジェクトを含むRDDで動作しますが、いくつかの特別な操作はキーと値のペアのRDDでのみ利用可能です。最も一般的なものは、キーで要素をグループ化または集計するような分散型「シャッフル」操作です。
Pythonでは、これらの操作は(1, 2)
のような組み込みのPythonタプルを含むRDDで動作します。このようなタプルを作成してから、目的の操作を呼び出すだけです。
たとえば、次のコードでは、キーと値のペアに対してreduceByKey
操作を使用して、ファイル内の各テキスト行が何回発生するかをカウントします。
lines = sc.textFile("data.txt")
pairs = lines.map(lambda s: (s, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
また、たとえば、counts.sortByKey()
を使用してペアをアルファベット順にソートし、最後にcounts.collect()
を使用してオブジェクトのリストとしてドライバープログラムに戻すこともできます。
ほとんどのSpark操作は任意の型のオブジェクトを含むRDDで動作しますが、いくつかの特別な操作はキーと値のペアのRDDでのみ利用可能です。最も一般的なものは、キーで要素をグループ化または集計するような分散型「シャッフル」操作です。
Scalaでは、これらの操作はTuple2オブジェクト((a, b)
と記述するだけで作成できる言語の組み込みタプル)を含むRDDで自動的に利用できます。キーと値のペア操作は、タプルのRDDを自動的にラップするPairRDDFunctionsクラスで利用できます。
たとえば、次のコードでは、キーと値のペアに対してreduceByKey
操作を使用して、ファイル内の各テキスト行が何回発生するかをカウントします。
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
また、たとえば、counts.sortByKey()
を使用してペアをアルファベット順にソートし、最後にcounts.collect()
を使用してオブジェクトの配列としてドライバープログラムに戻すこともできます。
注意: カスタムオブジェクトをキーと値のペア操作のキーとして使用する場合は、カスタムのequals()
メソッドに一致するhashCode()
メソッドを必ず含める必要があります。詳細については、Object.hashCode()ドキュメントに記載されているコントラクトを参照してください。
ほとんどのSpark操作は任意の型のオブジェクトを含むRDDで動作しますが、いくつかの特別な操作はキーと値のペアのRDDでのみ利用可能です。最も一般的なものは、キーで要素をグループ化または集計するような分散型「シャッフル」操作です。
Javaでは、キーと値のペアはScala標準ライブラリのscala.Tuple2クラスを使用して表されます。new Tuple2(a, b)
を呼び出してタプルを作成し、後でtuple._1()
およびtuple._2()
でそのフィールドにアクセスできます。
キーと値のペアのRDDは、JavaPairRDDクラスで表されます。JavaRDDからJavaPairRDDを構築するには、mapToPair
やflatMapToPair
のようなmap
操作の特別なバージョンを使用します。JavaPairRDDには、標準のRDD関数と特別なキーと値の関数があります。
たとえば、次のコードでは、キーと値のペアに対してreduceByKey
操作を使用して、ファイル内の各テキスト行が何回発生するかをカウントします。
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
また、たとえば、counts.sortByKey()
を使用してペアをアルファベット順にソートし、最後にcounts.collect()
を使用してオブジェクトの配列としてドライバープログラムに戻すこともできます。
注意: カスタムオブジェクトをキーと値のペア操作のキーとして使用する場合は、カスタムのequals()
メソッドに一致するhashCode()
メソッドを必ず含める必要があります。詳細については、Object.hashCode()ドキュメントに記載されているコントラクトを参照してください。
変換
次の表に、Sparkでサポートされている一般的な変換の一部を示します。詳細については、RDD APIドキュメント(Scala、Java、Python、R)およびペアRDD関数ドキュメント(Scala、Java)を参照してください。
アクション
次の表は、Sparkでサポートされている一般的なアクションの一部を示しています。詳細については、RDD APIドキュメント(Scala、Java、Python、R)
とペアRDD関数ドキュメント(Scala、Java)を参照してください。
アクション | 意味 |
---|---|
reduce(func) | データセットの要素を関数func(2つの引数を取り、1つを返す)を使用して集約します。関数は、並列で正しく計算できるように、可換かつ結合的である必要があります。 |
collect() | データセットのすべての要素を、ドライバープログラムで配列として返します。これは通常、フィルタリングや、データの十分に小さなサブセットを返す他の操作の後で役立ちます。 |
count() | データセット内の要素の数を返します。 |
first() | データセットの最初の要素を返します(take(1)と同様)。 |
take(n) | データセットの最初のn個の要素を含む配列を返します。 |
takeSample(withReplacement, num, [seed]) | データセットからランダムにnum個の要素のサンプルを、置換ありまたは置換なしで返し、必要に応じて乱数ジェネレーターのシードを事前指定します。 |
takeOrdered(n, [ordering]) | RDDの最初のn個の要素を、自然順序またはカスタムコンパレータを使用して返します。 |
saveAsTextFile(path) | データセットの要素を、ローカルファイルシステム、HDFS、またはその他のHadoop対応ファイルシステム内の指定されたディレクトリにテキストファイル(またはテキストファイルのセット)として書き込みます。Sparkは、各要素に対してtoStringを呼び出して、ファイルのテキスト行に変換します。 |
saveAsSequenceFile(path) (JavaおよびScala) |
データセットの要素を、ローカルファイルシステム、HDFS、またはその他のHadoop対応ファイルシステム内の指定されたパスにHadoop SequenceFileとして書き込みます。これは、HadoopのWritableインターフェイスを実装するキーと値のペアのRDDで使用できます。 Scalaでは、Writableに暗黙的に変換可能な型(Int、Double、Stringなどの基本型に対する変換がSparkに含まれています)でも利用できます。 |
saveAsObjectFile(path) (JavaおよびScala) |
Javaシリアライズを使用して、データセットの要素を単純な形式で書き込みます。これは、SparkContext.objectFile() を使用してロードできます。 |
countByKey() | (K, V)型のRDDでのみ使用できます。各キーのカウントを持つ(K, Int)ペアのハッシュマップを返します。 |
foreach(func) | データセットの各要素に対して関数funcを実行します。これは通常、アキュムレータの更新や外部ストレージシステムとのやり取りなどの副作用のために行われます。 注: foreach() の外でアキュムレータ以外の変数を変更すると、未定義の動作になる可能性があります。詳細については、クロージャの理解を参照してください。 |
Spark RDD APIは、foreach
のforeachAsync
など、いくつかのアクションの非同期バージョンも公開しており、アクションの完了をブロックする代わりに、呼び出し元にFutureAction
をすぐに返します。これは、アクションの非同期実行を管理または待機するために使用できます。
シャッフル操作
Spark内の一部の操作は、シャッフルと呼ばれるイベントをトリガーします。シャッフルは、パーティション間で異なるグループにデータが再配布されるためのSparkのメカニズムです。これは通常、エグゼキュータとマシン間でのデータのコピーを伴うため、シャッフルは複雑でコストのかかる操作になります。
背景
シャッフル中に何が起こるかを理解するために、reduceByKey
操作の例を考えてみましょう。reduceByKey
操作は、単一のキーのすべての値がタプルに結合される新しいRDDを生成します。これは、キーと、そのキーに関連付けられたすべての値に対してreduce関数を実行した結果です。課題は、単一のキーのすべての値が必ずしも同じパーティション、さらには同じマシンに存在しているとは限らないが、結果を計算するために同じ場所に配置する必要があることです。
Sparkでは、データは通常、特定の操作に必要な場所にパーティション間で分散されていません。計算中、単一のタスクは単一のパーティションで動作します。したがって、単一のreduceByKey
reduceタスクが実行するためにすべてのデータを整理するには、Sparkはオールツーオール操作を実行する必要があります。すべてのキーのすべての値を見つけるためにすべてのパーティションから読み取り、パーティションを越えて値をまとめて、各キーの最終結果を計算する必要があります。これがシャッフルと呼ばれます。
新しくシャッフルされたデータの各パーティション内の要素のセットは決定論的であり、パーティション自体の順序も決定論的ですが、これらの要素の順序はそうではありません。シャッフル後に予測可能な順序付けられたデータが必要な場合は、次のものを使用できます。
mapPartitions
を使用して、たとえば.sorted
を使用して各パーティションをソートしますrepartitionAndSortWithinPartitions
を使用して、同時に再パーティション化しながらパーティションを効率的にソートしますsortBy
を使用してグローバルに順序付けられたRDDを作成します
シャッフルを引き起こす可能性のある操作には、repartition
やcoalesce
などの再パーティション操作、groupByKey
やreduceByKey
などの‘ByKey操作(カウントを除く)、およびcogroup
やjoin
などの結合操作が含まれます。
パフォーマンスへの影響
シャッフルは、ディスクI/O、データシリアライゼーション、ネットワークI/Oを伴うため、コストのかかる操作です。シャッフルのためにデータを整理するために、Sparkはタスクのセットを生成します。データを整理するマップタスクと、それを集約するリデュースタスクのセットです。この命名法はMapReduceに由来しており、Sparkのmap
およびreduce
操作に直接関係するものではありません。
内部的には、個々のマップタスクの結果は、収まらなくなるまでメモリに保持されます。次に、これらはターゲットパーティションに基づいてソートされ、単一のファイルに書き込まれます。リデュース側では、タスクは関連するソートされたブロックを読み取ります。
一部のシャッフル操作は、転送前または転送後にレコードを整理するためにインメモリデータ構造を使用するため、大量のヒープメモリを消費する可能性があります。具体的には、reduceByKey
とaggregateByKey
はマップ側でこれらの構造を作成し、'ByKey
操作はリデュース側でこれらを生成します。データがメモリに収まらない場合、Sparkはこれらのテーブルをディスクにスピルし、追加のディスクI/Oのオーバーヘッドとガベージコレクションの増加が発生します。
シャッフルは、ディスク上に大量の中間ファイルも生成します。Spark 1.3以降、これらのファイルは、対応するRDDが使用されなくなり、ガベージコレクションされるまで保持されます。これは、リネージが再計算された場合にシャッフルファイルを再作成する必要がないようにするためです。アプリケーションがこれらのRDDへの参照を保持している場合、またはGCが頻繁に開始されない場合、ガベージコレクションは長い時間が経過した後にのみ発生する可能性があります。これは、長期間実行されるSparkジョブが大量のディスク領域を消費する可能性があることを意味します。一時ストレージディレクトリは、Sparkコンテキストを構成するときにspark.local.dir
構成パラメーターによって指定されます。
シャッフル動作は、さまざまな構成パラメーターを調整することで調整できます。「シャッフル動作」セクション内のSpark構成ガイドを参照してください。
RDDの永続化
Sparkの最も重要な機能の1つは、操作間でデータセットをメモリに永続化(またはキャッシュ)することです。RDDを永続化すると、各ノードは、計算したパーティションをメモリに保存し、そのデータセット(またはそこから派生したデータセット)に対する他のアクションで再利用します。これにより、今後のアクションがはるかに高速になります(多くの場合、10倍以上)。キャッシングは、反復アルゴリズムと高速なインタラクティブ使用のための重要なツールです。
RDDを永続化するようにマークするには、RDDのpersist()
またはcache()
メソッドを使用します。アクションで最初に計算されるとき、ノードのメモリに保持されます。Sparkのキャッシュはフォールトトレラントです。RDDのパーティションが失われた場合、最初に作成した変換を使用して自動的に再計算されます。
さらに、永続化された各RDDは、異なるストレージレベルを使用して保存できます。たとえば、データセットをディスクに永続化したり、メモリにシリアライズされたJavaオブジェクトとして永続化したり(スペースを節約するため)、ノード間でレプリケートしたりできます。これらのレベルは、StorageLevel
オブジェクト(Scala、Java、Python)をpersist()
に渡すことで設定されます。cache()
メソッドは、デフォルトのストレージレベルであるStorageLevel.MEMORY_ONLY
(メモリに逆シリアライズされたオブジェクトを格納)を使用するための省略形です。ストレージレベルの完全なセットは次のとおりです。
ストレージレベル | 意味 |
---|---|
MEMORY_ONLY | RDDをJVM内の逆シリアライズされたJavaオブジェクトとして保存します。RDDがメモリに収まらない場合、一部のパーティションはキャッシュされず、必要なときに毎回その場で再計算されます。これがデフォルトレベルです。 |
MEMORY_AND_DISK | RDDをJVM内の逆シリアライズされたJavaオブジェクトとして保存します。RDDがメモリに収まらない場合は、収まらないパーティションをディスクに保存し、必要なときにそこから読み取ります。 |
MEMORY_ONLY_SER (JavaおよびScala) |
RDDをシリアライズされたJavaオブジェクト(パーティションごとに1つのバイト配列)として保存します。これは通常、逆シリアライズされたオブジェクトよりもスペース効率が優れており、特に高速シリアライザーを使用する場合にそうですが、読み取るのにCPU負荷が高くなります。 |
MEMORY_AND_DISK_SER (JavaおよびScala) |
MEMORY_ONLY_SERと同様ですが、メモリに収まらないパーティションは、必要なときに毎回その場で再計算する代わりに、ディスクにスピルします。 |
DISK_ONLY | RDDパーティションをディスクのみに保存します。 |
MEMORY_ONLY_2、MEMORY_AND_DISK_2など。 | 上記のレベルと同じですが、各パーティションを2つのクラスターノードに複製します。 |
OFF_HEAP(実験的) | MEMORY_ONLY_SERと同様ですが、データをオフヒープメモリに保存します。これには、オフヒープメモリを有効にする必要があります。 |
注:Pythonでは、保存されたオブジェクトは常にPickleライブラリでシリアライズされるため、シリアライズされたレベルを選択するかどうかは問題ではありません。Pythonで使用可能なストレージレベルには、MEMORY_ONLY
、MEMORY_ONLY_2
、MEMORY_AND_DISK
、MEMORY_AND_DISK_2
、DISK_ONLY
、DISK_ONLY_2
、およびDISK_ONLY_3
が含まれます。
Sparkは、ユーザーがpersist
を呼び出さなくても、シャッフル操作(例:reduceByKey
)で中間データを自動的に永続化します。これは、シャッフル中にノードが故障した場合に入力全体を再計算することを避けるためです。結果のRDDを再利用する場合は、ユーザーがpersist
を呼び出すことをお勧めします。
どのストレージレベルを選択すべきか?
Sparkのストレージレベルは、メモリ使用量とCPU効率の間で異なるトレードオフを提供することを目的としています。いずれかを選択するには、次のプロセスを経ることをお勧めします。
-
RDDがデフォルトのストレージレベル(
MEMORY_ONLY
)に快適に収まる場合は、そのままにしておいてください。これが最もCPU効率の高いオプションであり、RDDに対する操作を可能な限り高速に実行できます。 -
そうでない場合は、
MEMORY_ONLY_SER
を使用し、高速なシリアライゼーションライブラリを選択して、オブジェクトのスペース効率を大幅に向上させつつ、アクセス速度を適度に保つようにしてください(JavaとScala)。 -
データセットを計算した関数が高価であるか、大量のデータをフィルタリングする場合を除き、ディスクにスピルしないでください。そうでない場合、パーティションの再計算は、ディスクから読み取るのと同程度に高速になる可能性があります。
-
高速な障害復旧が必要な場合は、レプリケートされたストレージレベルを使用してください(例:Webアプリケーションからリクエストを処理するためにSparkを使用する場合)。すべてのストレージレベルは、失われたデータを再計算することにより、完全な耐障害性を提供しますが、レプリケートされたレベルでは、失われたパーティションを再計算するのを待たずに、RDDでタスクの実行を継続できます。
データの削除
Sparkは、各ノードでのキャッシュ使用状況を自動的に監視し、最近最も使用されていない(LRU)方式で古いデータパーティションを削除します。キャッシュから削除されるのを待つのではなく、RDDを手動で削除したい場合は、RDD.unpersist()
メソッドを使用します。このメソッドはデフォルトではブロックしないことに注意してください。リソースが解放されるまでブロックするには、このメソッドを呼び出すときにblocking=true
を指定します。
共有変数
通常、Sparkの操作(map
やreduce
など)に渡された関数がリモートクラスタノードで実行されるとき、関数で使用されるすべての変数の個別のコピーで動作します。これらの変数は各マシンにコピーされ、リモートマシンでの変数への更新はドライバプログラムに伝播されません。タスク間で汎用的な読み取り/書き込み共有変数をサポートすると効率が悪くなります。ただし、Sparkは、2つの一般的な使用パターンに対して、2つの制限されたタイプの共有変数を提供します。それは、ブロードキャスト変数とアキュムレータです。
ブロードキャスト変数
ブロードキャスト変数を使用すると、プログラマは、タスクと一緒にコピーを送信するのではなく、各マシンで読み取り専用変数をキャッシュに保持できます。たとえば、すべてのノードに大きな入力データセットのコピーを効率的な方法で提供するために使用できます。Sparkは、通信コストを削減するために、効率的なブロードキャストアルゴリズムを使用してブロードキャスト変数を配布しようともします。
Sparkアクションは、分散型の「シャッフル」操作で区切られた一連のステージを通じて実行されます。Sparkは、各ステージ内のタスクに必要な共通データを自動的にブロードキャストします。この方法でブロードキャストされたデータは、シリアライズされた形式でキャッシュされ、各タスクを実行する前にデシリアライズされます。つまり、ブロードキャスト変数を明示的に作成することは、複数のステージにわたるタスクが同じデータを必要とする場合、またはデシリアライズされた形式でデータをキャッシュすることが重要な場合にのみ役立ちます。
ブロードキャスト変数は、SparkContext.broadcast(v)
を呼び出すことにより、変数v
から作成されます。ブロードキャスト変数はv
のラッパーであり、その値はvalue
メソッドを呼び出すことでアクセスできます。以下のコードはこれを示しています。
>>> broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>
>>> broadcastVar.value
[1, 2, 3]
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
broadcastVar.value();
// returns [1, 2, 3]
ブロードキャスト変数が作成されたら、v
がノードに複数回送信されないように、クラスタで実行される関数では値v
の代わりにブロードキャスト変数を使用する必要があります。さらに、すべてのノードがブロードキャスト変数の同じ値を取得できるように(たとえば、変数が後で新しいノードに送信される場合)、ブロードキャスト後にオブジェクトv
を変更しないでください。
ブロードキャスト変数がエクゼキューターにコピーしたリソースを解放するには、.unpersist()
を呼び出します。ブロードキャストが後で再び使用される場合は、再ブロードキャストされます。ブロードキャスト変数で使用されるすべてのリソースを完全に解放するには、.destroy()
を呼び出します。その後、ブロードキャスト変数を使用することはできません。これらのメソッドはデフォルトではブロックしないことに注意してください。リソースが解放されるまでブロックするには、それらを呼び出すときにblocking=true
を指定します。
アキュムレータ
アキュムレータは、連想的で可換的な操作を通じてのみ「追加」される変数であるため、並列で効率的にサポートできます。これらは、カウンター(MapReduceの場合のように)または合計を実装するために使用できます。Sparkは、数値型のアキュムレータをネイティブにサポートしており、プログラマは新しい型のサポートを追加できます。
ユーザーは、名前付きまたは名前なしのアキュムレータを作成できます。下の図に示すように、名前付きアキュムレータ(このインスタンスではcounter
)は、そのアキュムレータを変更するステージのWeb UIに表示されます。Sparkは、「タスク」テーブルで、タスクによって変更された各アキュムレータの値を表示します。
UIでアキュムレータを追跡することは、実行中のステージの進捗状況を理解するのに役立ちます(注:これはPythonではまだサポートされていません)。
アキュムレータは、SparkContext.accumulator(v)
を呼び出すことにより、初期値v
から作成されます。クラスタで実行されているタスクは、add
メソッドまたは+=
演算子を使用してそれに追加できます。ただし、その値を読み取ることはできません。ドライバプログラムのみが、value
メソッドを使用してアキュムレータの値を読み取ることができます。
以下のコードは、配列の要素を合計するためにアキュムレータを使用する方法を示しています。
>>> accum = sc.accumulator(0)
>>> accum
Accumulator<id=0, value=0>
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
>>> accum.value
10
このコードでは、Int型のアキュムレータの組み込みサポートを使用しましたが、プログラマはAccumulatorParamをサブクラス化して独自の型を作成することもできます。AccumulatorParamインターフェースには、データ型の「ゼロ値」を提供するためのzero
と、2つの値を加算するためのaddInPlace
の2つのメソッドがあります。たとえば、数学ベクトルを表すVector
クラスがある場合、次のように記述できます。
class VectorAccumulatorParam(AccumulatorParam):
def zero(self, initialValue):
return Vector.zeros(initialValue.size)
def addInPlace(self, v1, v2):
v1 += v2
return v1
# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
数値アキュムレータは、それぞれLong型またはDouble型の値を累積するために、SparkContext.longAccumulator()
またはSparkContext.doubleAccumulator()
を呼び出すことで作成できます。クラスタで実行されているタスクは、add
メソッドを使用してそれに追加できます。ただし、その値を読み取ることはできません。ドライバプログラムのみが、value
メソッドを使用してアキュムレータの値を読み取ることができます。
以下のコードは、配列の要素を合計するためにアキュムレータを使用する方法を示しています。
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
このコードでは、Long型のアキュムレータの組み込みサポートを使用しましたが、プログラマはAccumulatorV2をサブクラス化して独自の型を作成することもできます。AccumulatorV2抽象クラスには、オーバーライドする必要のあるいくつかのメソッドがあります。アキュムレータをゼロにリセットするためのreset
、アキュムレータに別の値を追加するためのadd
、別の同じ型のアキュムレータをこれにマージするためのmerge
です。オーバーライドする必要があるその他のメソッドは、APIドキュメントに含まれています。たとえば、数学ベクトルを表すMyVector
クラスがある場合、次のように記述できます。
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
private val myVector: MyVector = MyVector.createZeroVector
def reset(): Unit = {
myVector.reset()
}
def add(v: MyVector): Unit = {
myVector.add(v)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
プログラマが独自のAccumulatorV2型を定義する場合、結果の型が追加された要素の型と異なる場合があることに注意してください。
数値アキュムレータは、それぞれLong型またはDouble型の値を累積するために、SparkContext.longAccumulator()
またはSparkContext.doubleAccumulator()
を呼び出すことで作成できます。クラスタで実行されているタスクは、add
メソッドを使用してそれに追加できます。ただし、その値を読み取ることはできません。ドライバプログラムのみが、value
メソッドを使用してアキュムレータの値を読み取ることができます。
以下のコードは、配列の要素を合計するためにアキュムレータを使用する方法を示しています。
LongAccumulator accum = jsc.sc().longAccumulator();
sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
// ...
// 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
accum.value();
// returns 10
このコードでは、Long型のアキュムレータの組み込みサポートを使用しましたが、プログラマはAccumulatorV2をサブクラス化して独自の型を作成することもできます。AccumulatorV2抽象クラスには、オーバーライドする必要のあるいくつかのメソッドがあります。アキュムレータをゼロにリセットするためのreset
、アキュムレータに別の値を追加するためのadd
、別の同じ型のアキュムレータをこれにマージするためのmerge
です。オーバーライドする必要があるその他のメソッドは、APIドキュメントに含まれています。たとえば、数学ベクトルを表すMyVector
クラスがある場合、次のように記述できます。
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {
private MyVector myVector = MyVector.createZeroVector();
public void reset() {
myVector.reset();
}
public void add(MyVector v) {
myVector.add(v);
}
...
}
// Then, create an Accumulator of this type:
VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
// Then, register it into spark context:
jsc.sc().register(myVectorAcc, "MyVectorAcc1");
プログラマが独自のAccumulatorV2型を定義する場合、結果の型が追加された要素の型と異なる場合があることに注意してください。
警告:Sparkタスクが完了すると、Sparkはこのタスクで累積された更新をアキュムレータにマージしようとします。失敗した場合、Sparkは失敗を無視し、タスクを成功とマークして他のタスクの実行を継続します。したがって、バグのあるアキュムレータはSparkジョブに影響を与えませんが、Sparkジョブが成功しても正しく更新されない可能性があります。
アクション内でのみ実行されるアキュムレータの更新の場合、Sparkは、各タスクのアキュムレータへの更新が1回だけ適用されることを保証します。つまり、再起動されたタスクは値を更新しません。変換では、タスクまたはジョブステージが再実行された場合、各タスクの更新が複数回適用される可能性があることに注意する必要があります。
アキュムレータは、Sparkの遅延評価モデルを変更しません。RDDでの操作内で更新されている場合、その値は、RDDがアクションの一部として計算された場合にのみ更新されます。したがって、アキュムレータの更新は、map()
のような遅延変換内で実行された場合、実行が保証されません。以下のコードフラグメントは、このプロパティを示しています。
accum = sc.accumulator(0)
def g(x):
accum.add(x)
return f(x)
data.map(g)
# Here, accum is still 0 because no actions have caused the `map` to be computed.
val accum = sc.longAccumulator
data.map { x => accum.add(x); x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.
LongAccumulator accum = jsc.sc().longAccumulator();
data.map(x -> { accum.add(x); return f(x); });
// Here, accum is still 0 because no actions have caused the `map` to be computed.
クラスタへのデプロイ
アプリケーション送信ガイドでは、クラスタにアプリケーションを送信する方法について説明しています。簡単に言うと、アプリケーションをJAR(Java/Scalaの場合)または.py
または.zip
ファイル(Pythonの場合)にパッケージ化したら、bin/spark-submit
スクリプトを使用して、サポートされている任意のクラスタマネージャーに送信できます。
Java / ScalaからのSparkジョブの起動
org.apache.spark.launcherパッケージは、シンプルなJava APIを使用して、子プロセスとしてSparkジョブを起動するためのクラスを提供します。
単体テスト
Sparkは、一般的なユニットテストフレームワークでのユニットテストに適しています。マスターURLがlocal
に設定されたSparkContext
をテストで作成し、操作を実行してから、SparkContext.stop()
を呼び出して破棄するだけです。Sparkは同じプログラムで同時に実行される2つのコンテキストをサポートしていないため、finally
ブロックまたはテストフレームワークのtearDown
メソッド内でコンテキストを停止してください。
次のステップ
Spark WebサイトでいくつかのSparkプログラムの例を見ることができます。また、Sparkには、examples
ディレクトリにいくつかのサンプルが含まれています(Scala, Java, Python, R)。JavaおよびScalaの例を実行するには、クラス名をSparkのbin/run-example
スクリプトに渡します。たとえば、次のようになります。
./bin/run-example SparkPi
Pythonの例では、代わりにspark-submit
を使用します。
./bin/spark-submit examples/src/main/python/pi.py
Rの例では、代わりにspark-submit
を使用します。
./bin/spark-submit examples/src/main/r/dataframe.R
プログラムの最適化については、構成およびチューニングガイドに、ベストプラクティスに関する情報が記載されています。これらは、データが効率的な形式でメモリに格納されていることを確認するために特に重要です。デプロイについては、クラスタモードの概要で、分散操作に関与するコンポーネントとサポートされているクラスタマネージャーについて説明しています。