Spark のチューニング
ほとんどの Spark 計算はインメモリで行われるため、Spark プログラムはクラスタ内のあらゆるリソース(CPU、ネットワーク帯域幅、メモリ)によってボトルネックとなる可能性があります。ほとんどの場合、データがメモリに収まればボトルネックはネットワーク帯域幅になりますが、メモリ使用量を削減するために、RDD をシリアライズ形式で保存するなど、チューニングが必要になる場合もあります。このガイドでは、優れたネットワークパフォーマンスに不可欠であり、メモリ使用量も削減できるデータシリアライゼーションと、メモリチューニングという 2 つの主要なトピックについて説明します。また、いくつかの小さなトピックについても簡単に説明します。
データシリアライゼーション
シリアライゼーションは、あらゆる分散アプリケーションのパフォーマンスにおいて重要な役割を果たします。オブジェクトのシリアライズが遅い形式や、大量のバイトを消費する形式は、計算を大幅に遅くします。多くの場合、これは Spark アプリケーションを最適化するために最初に調整すべき点です。Spark は、利便性(操作で任意の Java タイプを使用できる)とパフォーマンスのバランスを取ることを目指しています。2 つのシリアライゼーションライブラリを提供しています。
- Java シリアライゼーション:デフォルトでは、Spark は Java の
ObjectOutputStream
フレームワークを使用してオブジェクトをシリアライズし、java.io.Serializable
を実装する作成したクラスで動作します。java.io.Externalizable
を拡張することで、シリアライゼーションのパフォーマンスをより細かく制御することもできます。Java シリアライゼーションは柔軟ですが、多くの場合非常に遅く、多くのクラスで大きなシリアライズ形式になります。 - Kryo シリアライゼーション:Spark は、Kryo ライブラリ(バージョン 4)を使用してオブジェクトをより高速にシリアライズすることもできます。Kryo は Java シリアライゼーションよりも大幅に高速でコンパクトです(多くの場合 10 倍も高速です)が、すべての
Serializable
タイプをサポートしているわけではなく、最適なパフォーマンスを得るためには、プログラムで使用するクラスを事前に *登録* する必要があります。
SparkConf でジョブを初期化し、conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
を呼び出すことで、Kryo を使用するように切り替えることができます。この設定は、ワーカーノード間でデータをシャッフルするだけでなく、RDD をディスクにシリアライズする場合にも使用されるシリアライザを設定します。Kryo がデフォルトではない唯一の理由は、カスタム登録の要件があるためですが、ネットワークを集中的に使用するアプリケーションでは試してみることをお勧めします。Spark 2.0.0 以降、単純な型、単純な型の配列、または文字列型の RDD をシャッフルする際に、内部的に Kryo シリアライザを使用しています。
Spark は、Twitter chill ライブラリの AllScalaRegistrar でカバーされている、多くのよく使用されるコア Scala クラスの Kryo シリアライザを自動的に含んでいます。
独自のカスタムクラスを Kryo に登録するには、registerKryoClasses
メソッドを使用します。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
Kryo のドキュメントでは、カスタムシリアライゼーションコードの追加など、より高度な登録オプションについて説明しています。
オブジェクトが大きい場合は、設定 の spark.kryoserializer.buffer
を増やす必要がある場合もあります。この値は、シリアライズする *最大* のオブジェクトを保持できる大きさにする必要があります。
最後に、カスタムクラスを登録しない場合でも Kryo は機能しますが、各オブジェクトに完全なクラス名を保存する必要があり、無駄になります。
メモリチューニング
メモリ使用量のチューニングには、3 つの考慮事項があります。オブジェクトが使用するメモリの *量* (データセット全体をメモリに収めたい場合があります)、それらのオブジェクトにアクセスする *コスト* 、および *ガベージコレクション* のオーバーヘッド(オブジェクトのターンオーバーが高い場合)です。
デフォルトでは、Java オブジェクトへのアクセスは高速ですが、フィールド内の「生の」データの 2〜5 倍のスペースを簡単に消費する可能性があります。これはいくつかの理由によるものです。
- 個別の Java オブジェクトにはそれぞれ「オブジェクトヘッダー」があり、これは約 16 バイトで、クラスへのポインタなどの情報が含まれています。データが非常に少ないオブジェクト(たとえば、1 つの
Int
フィールド)の場合、これはデータよりも大きくなる可能性があります。 - Java の
String
は、生の文字列データに約 40 バイトのオーバーヘッドがあります(Char
の配列に格納し、長さなどの追加データを保持するため)、String
内部で UTF-16 エンコーディングを使用しているため、各文字を *2* バイトとして格納します。したがって、10 文字の文字列は簡単に 60 バイトを消費する可能性があります。 HashMap
やLinkedList
などの一般的なコレクションクラスは、リンクされたデータ構造を使用します。そこでは、各エントリ(例:Map.Entry
)に「ラッパー」オブジェクトがあります。このオブジェクトにはヘッダーだけでなく、リスト内の次のオブジェクトへのポインタ(通常はそれぞれ 8 バイト)もあります。- プリミティブ型の kolekcji は、多くの場合、
java.lang.Integer
などの「ボックス化」されたオブジェクトとして格納されます。
このセクションでは、まず Spark のメモリ管理の概要を説明し、次に、アプリケーションでメモリをより効率的に使用するためにユーザーが採用できる具体的な戦略について説明します。特に、オブジェクトのメモリ使用量を確認する方法と、データ構造を変更するか、データをシリアライズ形式で保存することで、メモリ使用量を改善する方法について説明します。次に、Spark のキャッシュサイズと Java ガベージコレクタのチューニングについて説明します。
メモリ管理の概要
Spark でのメモリ使用量は、主に実行とストレージの 2 つのカテゴリに分類されます。実行メモリは、シャッフル、結合、ソート、集計の計算に使用されるメモリを指し、ストレージメモリは、キャッシュとクラスタ全体の内部データの伝播に使用されるメモリを指します。Spark では、実行とストレージは統合された領域(M)を共有します。実行メモリが使用されていない場合、ストレージは利用可能なすべてのメモリを取得でき、その逆も可能です。実行は必要に応じてストレージをエビクトできますが、ストレージメモリの合計使用量が特定のしきい値(R)を下回るまでです。言い換えれば、R
は、キャッシュされたブロックがエビクトされない M
内のサブ領域を表します。実装の複雑さのため、ストレージは実行をエビクトできない場合があります。
この設計により、いくつかの望ましい特性が保証されます。まず、キャッシングを使用しないアプリケーションは、実行のためにスペース全体を使用できるため、不要なディスクスピルを回避できます。次に、キャッシングを使用するアプリケーションは、データブロックがエビクトされない最小ストレージスペース(R)を予約できます。最後に、このアプローチは、メモリが内部的にどのように分割されているかについてのユーザーの専門知識を必要とせずに、さまざまなワークロードに対して妥当なすぐに使えるパフォーマンスを提供します。
関連する設定は 2 つありますが、デフォルト値はほとんどのワークロードに適用できるため、通常のユーザーは調整する必要はありません。
spark.memory.fraction
は、M
のサイズを(JVM ヒープスペース - 300MiB)の割合として表します(デフォルトは 0.6)。残りのスペース(40%)は、ユーザーデータ構造、Spark の内部メタデータ、およびスパースで異常に大きなレコードの場合の OOM エラーを防ぐために予約されています。spark.memory.storageFraction
は、R
のサイズをM
の割合として表します(デフォルトは 0.5)。R
は、実行によってエビクトされないキャッシュされたブロックが格納されているM
内のストレージスペースです。
spark.memory.fraction
の値は、この量のヒープスペースが JVM の古い世代または「テニュア」世代に快適に収まるように設定する必要があります。詳細については、以下の高度な GC チューニングの説明を参照してください。
メモリ消費量の確認
データセットに必要なメモリ消費量をサイジングする最良の方法は、RDD を作成し、キャッシュに配置し、Web UI の「ストレージ」ページを確認することです。ページには、RDD が占有しているメモリの量が示されます。
特定のオブジェクトのメモリ消費量を推定するには、SizeEstimator
の estimate
メソッドを使用します。これは、メモリ使用量を削減するためにさまざまなデータレイアウトを試したり、ブロードキャスト変数が各 executor ヒープで占有するスペースの量を決定したりするのに役立ちます。
データ構造のチューニング
メモリ消費量を削減する最初の方法は、ポインタベースのデータ構造やラッパーオブジェクトなど、オーバーヘッドを追加する Java の機能を回避することです。これを行うには、いくつかの方法があります。
- 標準の Java または Scala コレクションクラス(例:
HashMap
)ではなく、オブジェクトの配列とプリミティブ型を優先するようにデータ構造を設計します。fastutil ライブラリは、Java 標準ライブラリと互換性のあるプリミティブ型に便利なコレクションクラスを提供します。 - 可能であれば、小さなオブジェクトとポインタを多数含む入れ子構造は避けてください。
- キーには、文字列ではなく数値 ID または列挙オブジェクトを使用することを検討してください。
- 32 GiB 未満の RAM がある場合は、JVM フラグ
-XX:+UseCompressedOops
を設定して、ポインタを 8 バイトではなく 4 バイトにします。これらのオプションは、spark-env.sh
に追加できます。
シリアライズされた RDD ストレージ
このチューニングを行ってもオブジェクトが大きすぎて効率的に保存できない場合は、RDD 永続化 API のシリアライズされた StorageLevels(MEMORY_ONLY_SER
など)を使用して、*シリアライズされた* 形式で保存することで、メモリ使用量を削減する方がはるかに簡単です。Spark は、各 RDD パーティションを 1 つの大きなバイト配列として格納します。データをシリアライズ形式で保存することの唯一の欠点は、各オブジェクトをオンザフライでデシリアライズする必要があるため、アクセス時間が遅くなることです。データをシリアライズ形式でキャッシュする場合は、Kryo を使用することを強くお勧めします。これは、Java シリアライゼーション(そしてもちろん生の Java オブジェクト)よりもはるかに小さいサイズになるためです。
ガベージコレクションのチューニング
プログラムによって保存されるRDDの観点から、大量の「チャーン」がある場合、JVMのガベージコレクションが問題になる可能性があります。(通常、RDDを1回だけ読み込んでから、多くの操作を実行するプログラムでは問題になりません。)Javaが新しいオブジェクトのためのスペースを確保するために古いオブジェクトを削除する必要がある場合、すべてのJavaオブジェクトをトレースして、未使用のオブジェクトを見つける必要があります。ここで覚えておくべき重要な点は、*ガベージコレクションのコストはJavaオブジェクトの数に比例する*ということです。そのため、オブジェクトの数が少ないデータ構造(たとえば、`LinkedList`ではなく`Int`の配列)を使用すると、このコストが大幅に削減されます。さらに良い方法は、上記のように、オブジェクトをシリアル化された形式で永続化することです。これで、RDDパーティションごとに*1つ*のオブジェクト(バイト配列)しか存在しなくなります。GCが問題になる場合、他の手法を試す前に、最初に試すべきことは、シリアル化されたキャッシングを使用することです。
GCは、タスクのワーキングメモリ(タスクの実行に必要なスペースの量)とノードにキャッシュされたRDDとの干渉によっても問題になる可能性があります。これを軽減するために、RDDキャッシュに割り当てられるスペースを制御する方法について説明します。
GCの影響の測定
GCチューニングの最初のステップは、ガベージコレクションが発生する頻度とGCに費やされる時間に関する統計情報を収集することです。これは、Javaオプションに`-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps`を追加することで実行できます。(SparkジョブにJavaオプションを渡す方法については、設定ガイドを参照してください。)次回Sparkジョブを実行すると、ガベージコレクションが発生するたびに、ワーカーのログにメッセージが出力されます。これらのログは、ドライバプログラムではなく、クラスタのワーカーノード(作業ディレクトリの`stdout`ファイル内)にあることに注意してください。
高度なGCチューニング
ガベージコレクションをさらに調整するには、まずJVMのメモリ管理に関する基本的な情報を理解する必要があります。
-
Javaヒープスペースは、YoungとOldの2つの領域に分割されています。Young世代は短命のオブジェクトを保持することを目的としており、Old世代は寿命の長いオブジェクトを対象としています。
-
Young世代は、さらに3つの領域[Eden、Survivor1、Survivor2]に分割されます。
-
ガベージコレクション手順の簡略化された説明:Edenがいっぱいになると、EdenでマイナーGCが実行され、EdenとSurvivor1から生きているオブジェクトがSurvivor2にコピーされます。Survivor領域は交換されます。オブジェクトが十分に古いか、Survivor2がいっぱいになると、Oldに移動されます。最後に、Oldがいっぱいになると、フルGCが呼び出されます。
SparkにおけるGCチューニングの目標は、寿命の長いRDDのみがOld世代に格納され、Young世代がタスク実行中に作成された一時オブジェクトを収集するためのフルGCを回避するのに十分なサイズであることを保証することです。役立つ可能性のある手順を以下に示します。
-
GC統計を収集することにより、ガベージコレクションが多すぎるかどうかを確認します。タスクが完了する前にフルGCが複数回呼び出された場合、タスクを実行するための十分なメモリが利用できないことを意味します。
-
マイナーコレクションが多すぎても、メジャーGCがあまりない場合は、Edenにより多くのメモリを割り当てると役立ちます。Edenのサイズを、各タスクに必要なメモリの過大評価に設定できます。Edenのサイズが`E`と決定された場合、オプション`-Xmn=4/3*E`を使用してYoung世代のサイズを設定できます。(4/3によるスケールアップは、サバイバー領域で使用されるスペースも考慮するためです。)
-
出力されるGC統計で、OldGenがいっぱいになりそうな場合は、`spark.memory.fraction`を下げてキャッシングに使用されるメモリ量を減らします。タスクの実行速度を低下させるよりも、オブジェクトのキャッシュ数を減らす方が適切です。または、Young世代のサイズを小さくすることを検討してください。これは、上記のように`-Xmn`を設定した場合、`-Xmn`を下げることを意味します。そうでない場合は、JVMの`NewRatio`パラメータの値を変更してみてください。多くのJVMでは、これはデフォルトで2に設定されています。つまり、Old世代はヒープの2/3を占めます。この割合が`spark.memory.fraction`を超えるように、十分に大きくする必要があります。
-
`-XX:+UseG1GC`を使用してG1GCガベージコレクタを試してください。ガベージコレクションがボトルネックになっている状況では、パフォーマンスが向上する可能性があります。エグゼキュータのヒープサイズが大きい場合は、`-XX:G1HeapRegionSize`を使用してG1リージョンサイズを増やすことが重要になる場合があります。
-
たとえば、タスクがHDFSからデータを読み取っている場合、タスクで使用されるメモリ量は、HDFSから読み取られたデータブロックのサイズを使用して推定できます。解凍されたブロックのサイズは、多くの場合、ブロックのサイズの2倍または3倍であることに注意してください。そのため、3つまたは4つのタスクの作業スペースを用意し、HDFSブロックサイズが128 MiBの場合、Edenのサイズを`4*3*128MiB`と推定できます。
-
新しい設定で、ガベージコレクションの頻度と所要時間がどのように変化するかを監視します。
経験から、GCチューニングの効果はアプリケーションと使用可能なメモリ量によって異なります。オンラインで説明されているさらに多くのチューニングオプションがありますが、高いレベルでは、フルGCが実行される頻度を管理することで、オーバーヘッドを削減できます。
エグゼキュータのGCチューニングフラグは、ジョブの構成で`spark.executor.defaultJavaOptions`または`spark.executor.extraJavaOptions`を設定することで指定できます。
その他の考慮事項
並列度
各操作の並列度を十分に高く設定しない限り、クラスタは完全に活用されません。Sparkは、ファイルサイズに応じて各ファイルで実行する「マップ」タスクの数を自動的に設定します(ただし、`SparkContext.textFile`などのオプションパラメータを使用して制御できます)。`groupByKey`や`reduceByKey`などの分散「reduce」操作では、親RDDの中で最大のパーティション数を使用します。並列度を2番目の引数として渡すことができます(`spark.PairRDDFunctions`のドキュメントを参照してください)、または設定プロパティ`spark.default.parallelism`を設定してデフォルトを変更できます。一般に、クラスタのCPUコアごとに2〜3タスクを推奨します。
入力パスの並列リスト
ジョブ入力が多数のディレクトリを持っている場合、ディレクトリリストの並列処理を増やす必要がある場合があります。そうしないと、特にS3のようなオブジェクトストアに対して、プロセスに非常に時間がかかる可能性があります。ジョブがHadoop入力形式のRDDで動作する場合(たとえば、`SparkContext.sequenceFile`経由)、並列処理は`spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads`によって制御されます(現在のデフォルトは1です)。
ファイルベースのデータソースを使用するSpark SQLの場合、`spark.sql.sources.parallelPartitionDiscovery.threshold`と`spark.sql.sources.parallelPartitionDiscovery.parallelism`を調整して、リストの並列処理を改善できます。詳細については、Spark SQLパフォーマンチューニングガイドを参照してください。
Reduce タスクのメモリ使用量
RDDがメモリに収まらないためではなく、`groupByKey`のreduceタスクの1つなど、タスクのワーキングセットが大きすぎたために、OutOfMemoryErrorが発生することがあります。Sparkのシャッフル操作(`sortByKey`、`groupByKey`、`reduceByKey`、`join`など)は、各タスク内にハッシュテーブルを作成してグループ化を実行しますが、これは多くの場合大きくなる可能性があります。ここで最も簡単な解決策は、*並列度を高める*ことです。そうすれば、各タスクの入力セットが小さくなります。Sparkは、1つのエグゼキュータJVMを多くのタスクで再利用し、タスク起動コストが低いため、200ミリ秒程度の短いタスクを効率的にサポートできます。そのため、クラスタのコア数よりも多くの並列度を安全に増やすことができます。
大きな変数のブロードキャスト
`SparkContext`で利用可能なブロードキャスト機能を使用すると、シリアル化された各タスクのサイズと、クラスタ上でジョブを起動するコストを大幅に削減できます。タスクが内部でドライバプログラムの大きなオブジェクト(静的ルックアップテーブルなど)を使用している場合は、それをブロードキャスト変数に変換することを検討してください。Sparkはマスターに各タスクのシリアル化されたサイズを出力するため、それを見てタスクが大きすぎるかどうかを判断できます。一般に、約20 KiBを超えるタスクは最適化する価値があります。
データローカリティ
データの局所性は、Sparkジョブのパフォーマンスに大きな影響を与える可能性があります。データとそれを操作するコードが一緒になっている場合、計算は高速になる傾向があります。ただし、コードとデータが分離されている場合は、一方が他方に移動する必要があります。通常、コードサイズはデータよりもはるかに小さいため、シリアル化されたコードを場所から場所へ移動する方が、データのチャンクを移動するよりも高速です。Sparkは、このデータの局所性に関する一般的な原則に基づいてスケジューリングを構築します。
データの局所性とは、データを処理するコードまでの距離のことです。データの現在の場所に基づいて、いくつかのレベルの局所性があります。最も近いものから最も遠いものまで、順番に
- `PROCESS_LOCAL`データは、実行中のコードと同じJVMにあります。これは可能な限り最高の局所性です。
- `NODE_LOCAL`データは同じノードにあります。たとえば、同じノードのHDFS、または同じノードの別のエグゼキュータにある場合があります。データはプロセス間を移動する必要があるため、これは`PROCESS_LOCAL`よりも少し遅くなります。
- `NO_PREF`データは、どこからでも同じように高速にアクセスでき、局所性の設定がありません。
- `RACK_LOCAL`データは、同じサーバーラックにあります。データは同じラック上の別のサーバーにあるため、通常は単一のスイッチを介してネットワーク経由で送信する必要があります。
- `ANY`データは、ネットワーク上の他の場所にあり、同じラックにはありません。
Sparkは、すべてのタスクを最適な局所性レベルでスケジュールすることを優先しますが、これは常に可能とは限りません。アイドル状態のエグゼキュータに未処理データがない状況では、Sparkはより低い局所性レベルに切り替えます。この場合、2つの選択肢があります。 a) 同じサーバー上のデータでタスクを開始するために、ビジー状態のCPUが解放されるまで待機する、または b) データの移動が必要となる、より遠隔地で新しいタスクをすぐに開始する。
Sparkは通常、ビジー状態のCPUが解放されることを期待して少し待機します。そのタイムアウトが期限切れになると、遠くから空いているCPUにデータの移動を開始します。各レベル間のフォールバックの待機タイムアウトは、個別に、またはすべてまとめて1つのパラメータで設定できます。詳細は、設定ページのspark.locality
パラメータを参照してください。タスクの実行時間が長く、局所性が低い場合は、これらの設定値を増やす必要がありますが、通常はデフォルト値で問題ありません。
まとめ
これは、Sparkアプリケーションをチューニングする際に知っておくべき主な懸念事項、特にデータのシリアライゼーションとメモリのチューニングについて指摘するための簡単なガイドです。ほとんどのプログラムでは、Kryoシリアライゼーションに切り替え、データをシリアライズされた形式で永続化することで、一般的なパフォーマンスの問題のほとんどが解決します。その他のチューニングのベストプラクティスについては、Sparkメーリングリストでお気軽にお問い合わせください。