Spark のチューニング
ほとんどの Spark 計算はインメモリで行われるため、Spark プログラムはクラスター内のどのリソース (CPU、ネットワーク帯域幅、メモリ) でもボトルネックになる可能性があります。多くの場合、データがメモリに収まる場合、ボトルネックはネットワーク帯域幅ですが、メモリ使用量を減らすために、RDD をシリアライズ形式で保存するなどのチューニングが必要になることもあります。このガイドでは、主に 2 つのトピック、つまりネットワークパフォーマンスに不可欠でメモリ使用量も削減できるデータシリアライゼーションとメモリチューニングについて説明します。また、いくつかの小さなトピックも概説します。
データシリアライゼーション
シリアライゼーションは、あらゆる分散アプリケーションのパフォーマンスにおいて重要な役割を果たします。オブジェクトをシリアライズするのが遅い形式や、大量のバイトを消費する形式は、計算を大幅に遅くします。多くの場合、Spark アプリケーションを最適化するために最初にチューニングすべきはこれです。Spark は、利便性 (操作で任意の Java 型を使用できる) とパフォーマンスのバランスを取ることを目指しています。2 つのシリアライゼーションライブラリを提供しています。
- Java シリアライゼーション: デフォルトでは、Spark は Java の
ObjectOutputStreamフレームワークを使用してオブジェクトをシリアライズし、java.io.Serializableを実装する作成した任意のクラスで動作します。また、java.io.Externalizableを拡張することで、シリアライゼーションのパフォーマンスをより詳細に制御できます。Java シリアライゼーションは柔軟ですが、多くの場合非常に遅く、多くのクラスで大きなシリアライズ形式になります。 - Kryo シリアライゼーション: Spark は Kryo ライブラリ (バージョン 4) を使用して、オブジェクトをより高速にシリアライズすることもできます。Kryo は Java シリアライゼーションよりも大幅に高速でコンパクト (多くの場合 10 倍) ですが、すべての
Serializable型をサポートしているわけではなく、最高のパフォーマンスを得るには、プログラムで使用するクラスを事前に登録する必要があります。
Kryo を使用するには、SparkConf でジョブを初期化し、conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") を呼び出します。この設定は、ワーカーノード間でデータをシャッフルする際に使用されるシリアライザだけでなく、RDD をディスクにシリアライズする際にも使用されるシリアライザを設定します。Kryo がデフォルトではない唯一の理由は、カスタム登録要件があるためですが、ネットワーク集約型のアプリケーションでは試してみることをお勧めします。Spark 2.0.0 以降、単純な型、単純な型の配列、または文字列型の RDD をシャッフルする際には、内部的に Kryo シリアライザを使用しています。
Spark は、Twitter chill ライブラリの AllScalaRegistrar でカバーされている、多くの一般的に使用されるコア Scala クラスの Kryo シリアライザを自動的に含んでいます。
カスタムクラスを Kryo に登録するには、registerKryoClasses メソッドを使用します。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)Kryo ドキュメントでは、カスタムシリアライゼーションコードの追加など、より高度な登録オプションについて説明しています。
オブジェクトが大きい場合は、設定の spark.kryoserializer.buffer も増やす必要がある場合があります。この値は、シリアライズする最も大きいオブジェクトを保持するのに十分な大きさである必要があります。
最後に、カスタムクラスを登録しない場合でも Kryo は機能しますが、各オブジェクトとともに完全なクラス名を保存する必要があり、これは無駄です。
メモリチューニング
メモリ使用量をチューニングする際には、3 つの考慮事項があります。オブジェクトが使用するメモリの量 (データセット全体をメモリに収めたい場合があります)、それらのオブジェクトにアクセスするコスト、およびガベージコレクションのオーバーヘッド (オブジェクトの入れ替わりが激しい場合) です。
デフォルトでは、Java オブジェクトは高速にアクセスできますが、フィールド内の「生の」データよりも 2〜5 倍多くスペースを消費する可能性があります。これはいくつかの理由によるものです。
- 各 Java オブジェクトには「オブジェクトヘッダー」があり、約 16 バイトで、クラスへのポインタなどの情報が含まれています。データが非常に少ないオブジェクト (たとえば 1 つの
Intフィールド) の場合、これはデータよりも大きくなる可能性があります。 - Java の
Stringは、生の文字列データよりも約 40 バイトのオーバーヘッドがあります (文字の配列に保存し、長さなどの追加データを保持するため)。また、Stringが UTF-16 エンコーディングを内部的に使用しているため、各文字は2 バイトで保存されます。したがって、10 文字の文字列は簡単に 60 バイトを消費する可能性があります。 HashMapやLinkedListなどの一般的なコレクションクラスは、リンクリスト構造を使用しており、各エントリに「ラッパー」オブジェクト (例:Map.Entry) があります。このオブジェクトは、ヘッダーだけでなく、リスト内の次のオブジェクトへのポインタ (通常は各 8 バイト) も保持しています。- プリミティブ型のコレクションは、多くの場合、
java.lang.Integerのような「ボクシングされた」オブジェクトとして保存されます。
このセクションでは、Spark のメモリ管理の概要から始め、次にユーザーがアプリケーションでメモリをより効率的に使用するために取ることができる具体的な戦略について説明します。特に、オブジェクトのメモリ使用量を特定する方法、およびデータ構造を変更するか、シリアライズ形式でデータを保存することによってメモリ使用量を改善する方法について説明します。その後、Spark のキャッシュサイズと Java ガベージコレクタのチューニングについて説明します。
メモリ管理の概要
Spark のメモリ使用量は、大きく分けて実行メモリとストレージメモリの 2 つのカテゴリに分類されます。実行メモリとは、シャッフル、結合、ソート、集計の計算に使用されるメモリを指し、ストレージメモリとは、キャッシュとクラスター全体での内部データ伝播に使用されるメモリを指します。Spark では、実行メモリとストレージメモリは統一された領域 (M) を共有します。実行メモリが使用されていない場合、ストレージは利用可能なすべてのメモリを取得でき、その逆も同様です。実行は必要に応じてストレージをエビクトできますが、ストレージメモリの使用量が特定のしきい値 (R) を下回るまでです。言い換えると、R は、キャッシュされたブロックが決してエビクトされない M 内のサブ領域を表します。実装の複雑さから、ストレージは実行をエビクトできません。
この設計により、いくつかの望ましい特性が保証されます。第一に、キャッシュを使用しないアプリケーションは、全体的なスペースを実行のために使用できるため、不要なディスクスピルを回避できます。第二に、キャッシュを使用するアプリケーションは、データブロックがエビクトされない最小限のストレージスペース (R) を確保できます。最後に、このアプローチは、メモリが内部的にどのように分割されているかについてのユーザーの専門知識を必要とせずに、さまざまなワークロードに対して妥当なデフォルトパフォーマンスを提供します。
関連する構成は 2 つありますが、ほとんどのワークロードにデフォルト値が適用できるため、通常のユーザーが調整する必要はありません。
spark.memory.fractionは、(JVM ヒープスペース - 300MiB) に対する M のサイズを分数で表します (デフォルトは 0.6)。残りのスペース (40%) は、ユーザーデータ構造、Spark の内部メタデータ、およびスパースで異常に大きなレコードの場合の OOM エラーに対する保護のために予約されています。spark.memory.storageFractionは、M に対する R のサイズを分数で表します (デフォルトは 0.5)。R は、実行によってエビクトされないキャッシュされたブロックの M 内のストレージスペースです。
spark.memory.fraction の値は、このヒープスペースの量を JVM の古い世代または「tenured」世代に快適に収まるように設定する必要があります。詳細については、後述の高度な GC チューニングのセクションを参照してください。
メモリ使用量の特定
データセットが必要とするメモリ使用量をサイジングする最良の方法は、RDD を作成し、キャッシュに入れ、「Storage」ページを Web UI で確認することです。このページには、RDD が占有しているメモリ量が示されます。
特定のオブジェクトのメモリ使用量を推定するには、SizeEstimator の estimate メソッドを使用します。これは、メモリ使用量を削減するためにさまざまなデータレイアウトを試したり、ブロードキャスト変数が各エグゼキュータヒープに占めるスペースの量を決定したりするのに役立ちます。
データ構造のチューニング
メモリ使用量を削減する最初の方法は、ポインタベースのデータ構造やラッパーオブジェクトなどのオーバーヘッドを追加する Java の機能を回避することです。これにはいくつかの方法があります。
- 標準の Java または Scala コレクションクラス (例:
HashMap) の代わりに、オブジェクトの配列とプリミティブ型を優先するようにデータ構造を設計します。fastutil ライブラリは、Java 標準ライブラリと互換性のあるプリミティブ型の便利なコレクションクラスを提供します。 - 可能な限り、多くの小さなオブジェクトとポインタを持つネストされた構造を避けてください。
- キーとして文字列の代わりに数値 ID または列挙オブジェクトの使用を検討してください。
- RAM が 32 GiB 未満の場合は、JVM フラグ
-XX:+UseCompressedOopsを設定して、ポインタを 8 バイトではなく 4 バイトにします。これらのオプションはspark-env.shに追加できます。
シリアライズされた RDD ストレージ
オブジェクトがこのチューニングにもかかわらず効率的に保存するには大きすぎる場合は、メモリ使用量を削減するさらに簡単な方法は、RDD persist API のシリアライズされた StorageLevels (例: MEMORY_ONLY_SER) を使用して、シリアライズされた形式で保存することです。Spark は各 RDD パーティションを 1 つの大きなバイト配列として保存します。シリアライズされた形式でデータを保存する唯一の欠点は、オブジェクトをその場でデシリアライズする必要があるため、アクセス時間が遅くなることです。シリアライズされた形式でデータをキャッシュしたい場合は、Kryo を使用することを強くお勧めします。Java シリアライゼーション (そしてもちろん生の Java オブジェクト) よりもはるかに小さいサイズになります。
ガベージコレクションのチューニング
プログラムによって保存される RDD の「チャーン」(入れ替わり) が大きい場合、JVM ガベージコレクションが問題になることがあります。(通常、RDD を一度読み込んでから多くの操作を実行するプログラムでは問題になりません。) Java が新しいオブジェクトのスペースを空けるために古いオブジェクトをエビクトする必要がある場合、すべての Java オブジェクトをトレースして未使用のオブジェクトを見つける必要があります。ここで覚えておくべき主な点は、ガベージコレクションのコストは Java オブジェクトの数に比例するということです。したがって、オブジェクトの少ないデータ構造 (例: LinkedList の代わりに Int の配列) を使用すると、このコストが大幅に削減されます。さらに良い方法は、前述のようにオブジェクトをシリアライズされた形式で保存することです。この場合、RDD パーティションあたり1 つのオブジェクト (バイト配列) しかありません。他のテクニックを試す前に、GC が問題である場合の最初の試みは、シリアライズされたキャッシュを使用することです。
GC は、タスクのワーキングメモリ (タスクを実行するために必要なスペースの量) とノードにキャッシュされた RDD との干渉によっても問題になることがあります。これを軽減するために、RDD キャッシュに割り当てられるスペースを制御する方法について説明します。
GC の影響の測定
GC チューニングの最初のステップは、ガベージコレクションがどれくらいの頻度で発生するか、および GC に費やされた時間に関する統計情報を収集することです。これは、Java オプションに -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps を追加することで実現できます。(Spark ジョブに Java オプションを渡す方法については、構成ガイドを参照してください。) 次回 Spark ジョブを実行すると、ガベージコレクションが発生するたびにワーカーのログにメッセージが表示されます。これらのログは、ドライバープログラムではなく、クラスターのワーカーノード (作業ディレクトリの stdout ファイル) に表示されることに注意してください。
高度な GC チューニング
ガベージコレクションをさらにチューニングするには、まず JVM のメモリ管理に関する基本的な情報を理解する必要があります。
-
Java ヒープスペースは、Young および Old の 2 つの領域に分割されます。Young ジェネレーションは短寿命のオブジェクトを保持することを目的としており、Old ジェネレーションは寿命の長いオブジェクトを対象としています。
-
Young ジェネレーションは、さらに 3 つの領域 (Eden、Survivor1、Survivor2) に分割されます。
-
ガベージコレクション手順の単純化された説明: Eden が満杯になると、Eden でマイナー GC が実行され、Eden と Survivor1 から生存しているオブジェクトが Survivor2 にコピーされます。Survivor 領域はスワップされます。オブジェクトが十分に古いか、Survivor2 が満杯になると、Old に移動されます。最後に、Old が満杯に近くなると、フル GC が呼び出されます。
Spark における GC チューニングの目標は、寿命の長い RDD のみが Old ジェネレーションに保存され、Young ジェネレーションが短寿命のオブジェクトを保存するのに十分なサイズであることを保証することです。これにより、タスク実行中に作成された一時オブジェクトを収集するためのフル GC を回避できます。役立つ可能性のある手順は次のとおりです。
-
GC の統計情報を収集して、ガベージコレクションが多すぎないか確認します。タスクが完了する前にフル GC が複数回呼び出される場合、タスクを実行するためのメモリが不足していることを意味します。
-
マイナーコレクションが多すぎてメジャー GC が少ない場合は、Eden により多くのメモリを割り当てることが役立ちます。Eden のサイズは、各タスクが必要とするメモリ量の過大評価として設定できます。Eden のサイズが E と決定された場合、Young ジェネレーションのサイズは -Xmn=4/3*E オプションを使用して設定できます (4/3 のスケールアップは、Survivor 領域によって使用されるスペースも考慮するためです)。
-
印刷される GC 統計情報で、OldGen が満杯に近い場合は、
spark.memory.fractionを下げることでキャッシュに使用されるメモリ量を減らします。タスクの実行を遅くするよりも、キャッシュするオブジェクトを少なくする方が良いです。または、Young ジェネレーションのサイズを小さくすることを検討してください。これは、上記のように設定した場合は -Xmn の値を下げることを意味します。そうでない場合は、JVM の NewRatio パラメータの値を変更してみてください。多くの JVM では、デフォルトは 2 であり、Old ジェネレーションがヒープの 2/3 を占めることを意味します。この割合が spark.memory.fraction を超えるのに十分な大きさである必要があります。 -
4.0.0 以降、Spark はデフォルトで JDK 17 を使用しており、G1GC ガベージコレクタもデフォルトになっています。エグゼキュータのヒープサイズが大きい場合、G1GC 領域サイズを -XX:G1HeapRegionSize で増やすことが重要になる場合があります。
-
例として、タスクが HDFS からデータを読み取っている場合、タスクによって使用されるメモリ量は、HDFS から読み取られたデータブロックのサイズを使用して推定できます。非圧縮ブロックのサイズは、ブロックサイズの 2〜3 倍になることがよくあります。したがって、3〜4 タスク分のワーキングスペースが必要であり、HDFS ブロックサイズが 128 MiB の場合、Eden のサイズを 4*3*128MiB と推定できます。
-
新しい設定でガベージコレクションの頻度と所要時間がどのように変化するかを監視します。
私たちの経験では、GC チューニングの効果はアプリケーションと利用可能なメモリ量に依存します。オンラインで説明されているチューニングオプションは他にもたくさんありますが、大まかに言うと、フル GC が発生する頻度を管理することでオーバーヘッドを削減できます。
エグゼキュータの GC チューニングフラグは、ジョブの構成で spark.executor.defaultJavaOptions または spark.executor.extraJavaOptions を設定することで指定できます。
その他の考慮事項
並列度
各操作の並列度を十分に高く設定しないと、クラスターは完全に利用されません。Spark は、ファイルサイズに応じて各ファイルで実行される「マップ」タスクの数を自動的に設定します (ただし、SparkContext.textFile などのオプション パラメータで制御できます)。また、分散「リデュース」操作 (例: groupByKey および reduceByKey) の場合、最も大きい親 RDD のパーティション数を使用します。並列度を 2 番目の引数として渡す (例: spark.PairRDDFunctions ドキュメントを参照) か、設定プロパティ spark.default.parallelism を設定してデフォルトを変更できます。一般的に、クラスターの CPU コアあたり 2〜3 タスクを推奨します。
入力パスでの並列リスト表示
場合によっては、ジョブ入力に多数のディレクトリがある場合にディレクトリリスト表示の並列度を増やす必要があることもあります。そうしないと、特に S3 のようなオブジェクトストアに対して、プロセスに非常に時間がかかる可能性があります。ジョブが RDD と Hadoop 入力フォーマット (例: SparkContext.sequenceFile を介して) で動作する場合、並列度は spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads (現在デフォルトは 1) によって制御されます。
Spark SQL のファイルベースのデータソースの場合、リスト表示の並列度を向上させるために spark.sql.sources.parallelPartitionDiscovery.threshold および spark.sql.sources.parallelPartitionDiscovery.parallelism をチューニングできます。詳細については、Spark SQL パフォーマンスチューニングガイドを参照してください。
Reduce タスクのメモリ使用量
場合によっては、RDD がメモリに収まらないためではなく、タスクの 1 つのワーキングセット (たとえば groupByKey のリデュースタスクの 1 つ) が大きすぎたために OutOfMemoryError が発生することがあります。Spark のシャッフル操作 (sortByKey、groupByKey、reduceByKey、join など) は、各タスク内でグループ化を実行するためのハッシュテーブルを構築しており、これはしばしば大きくなる可能性があります。ここでの最も簡単な解決策は、並列度を増やすことです。これにより、各タスクの入力セットが小さくなります。Spark は、1 つのエグゼキュータ JVM を多くのタスクで再利用し、タスク起動コストが低いため、200 ms 程度の短いタスクでも効率的にサポートできます。そのため、クラスターのコア数よりも多い並列度を安全に増やすことができます。
大規模変数のブロードキャスト
SparkContext で利用可能なブロードキャスト機能を使用すると、各シリアライズされたタスクのサイズと、クラスター全体でジョブを起動するコストを大幅に削減できます。タスクがドライバープログラムから (たとえば静的なルックアップテーブルなど) 大きなオブジェクトを使用している場合は、それをブロードキャスト変数に変換することを検討してください。Spark はマスターに各タスクのシリアライズされたサイズを出力するため、タスクが大きすぎるかどうかを判断するためにそれを確認できます。一般的に、約 20 KiB より大きいタスクは最適化する価値があるでしょう。
データローカリティ
データローカリティは、Spark ジョブのパフォーマンスに大きな影響を与える可能性があります。データとそれを処理するコードが一緒であれば、計算は高速になる傾向があります。しかし、コードとデータが分離している場合、一方が他方に移動する必要があります。通常、コードサイズはデータチャンクよりもはるかに小さいため、シリアライズされたコードを場所から場所へ移動する方が、データのチャンクを移動するよりも高速です。Spark は、この一般的なデータローカリティの原則に基づいてスケジューリングを構築します。
データローカリティとは、データがそれを処理するコードに近い度合いです。データの現在の場所に基づいて、いくつかのレベルのローカリティがあります。最も近いものから最も遠いものへ順に
PROCESS_LOCALデータは、実行中のコードと同じ JVM 内にあります。これは可能な限り最高のローカリティです。NODE_LOCALデータは同じノード上にあります。例としては、同じノード上の HDFS、または同じノード上の別のエグゼキュータが考えられます。データはプロセス間を移動する必要があるため、PROCESS_LOCALよりも少し遅くなります。NO_PREFデータはどこからでも等しく速くアクセスでき、ローカリティの優先順位はありません。RACK_LOCALデータは同じサーバーラック上にあります。データは同じラック上の別のサーバーにあるため、ネットワーク経由で、通常は単一のスイッチを経由して送信する必要があります。ANYデータはネットワーク上の別の場所にあり、同じラックにはありません。
Spark は、すべてのタスクを最高のローカリティレベルでスケジュールすることを好みますが、常に可能とは限りません。アイドル状態のエグゼキュータに未処理のデータがない状況では、Spark はより低いローカリティレベルに切り替わります。2 つのオプションがあります。a) 同じサーバー上のデータでタスクを開始するためにビジーな CPU が解放されるのを待つ、または b) より遠い場所で新しいタスクをすぐに開始し、そこにデータを移動する。
Spark が通常行うことは、ビジーな CPU が解放されることを期待して少し待つことです。そのタイムアウトが切れると、遠くから空いている CPU にデータを移動し始めます。各レベル間のフォールバックの待機タイムアウトは、個別に、またはすべてまとめて 1 つのパラメータで設定できます。詳細については、構成ページの spark.locality パラメータを参照してください。タスクが長時間実行されており、ローカリティが低い場合は、これらの設定を増やす必要がありますが、デフォルトは通常うまく機能します。
まとめ
これは、Spark アプリケーションのチューニングで知っておくべき主な懸念事項、特にデータシリアライゼーションとメモリチューニングを指摘するための短いガイドでした。ほとんどのプログラムでは、Kryo シリアライゼーションに切り替え、データをシリアライズされた形式で保存することで、一般的なパフォーマンスの問題のほとんどが解決されます。その他のチューニングのベストプラクティスについては、Spark メーリングリストでお気軽にお問い合わせください。