Spark のチューニング

ほとんどの Spark 計算はインメモリで行われるため、Spark プログラムはクラスター内のどのリソース (CPU、ネットワーク帯域幅、メモリ) でもボトルネックになる可能性があります。多くの場合、データがメモリに収まる場合、ボトルネックはネットワーク帯域幅ですが、メモリ使用量を減らすために、RDD をシリアライズ形式で保存するなどのチューニングが必要になることもあります。このガイドでは、主に 2 つのトピック、つまりネットワークパフォーマンスに不可欠でメモリ使用量も削減できるデータシリアライゼーションとメモリチューニングについて説明します。また、いくつかの小さなトピックも概説します。

データシリアライゼーション

シリアライゼーションは、あらゆる分散アプリケーションのパフォーマンスにおいて重要な役割を果たします。オブジェクトをシリアライズするのが遅い形式や、大量のバイトを消費する形式は、計算を大幅に遅くします。多くの場合、Spark アプリケーションを最適化するために最初にチューニングすべきはこれです。Spark は、利便性 (操作で任意の Java 型を使用できる) とパフォーマンスのバランスを取ることを目指しています。2 つのシリアライゼーションライブラリを提供しています。

Kryo を使用するには、SparkConf でジョブを初期化し、conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") を呼び出します。この設定は、ワーカーノード間でデータをシャッフルする際に使用されるシリアライザだけでなく、RDD をディスクにシリアライズする際にも使用されるシリアライザを設定します。Kryo がデフォルトではない唯一の理由は、カスタム登録要件があるためですが、ネットワーク集約型のアプリケーションでは試してみることをお勧めします。Spark 2.0.0 以降、単純な型、単純な型の配列、または文字列型の RDD をシャッフルする際には、内部的に Kryo シリアライザを使用しています。

Spark は、Twitter chill ライブラリの AllScalaRegistrar でカバーされている、多くの一般的に使用されるコア Scala クラスの Kryo シリアライザを自動的に含んでいます。

カスタムクラスを Kryo に登録するには、registerKryoClasses メソッドを使用します。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo ドキュメントでは、カスタムシリアライゼーションコードの追加など、より高度な登録オプションについて説明しています。

オブジェクトが大きい場合は、設定spark.kryoserializer.buffer も増やす必要がある場合があります。この値は、シリアライズする最も大きいオブジェクトを保持するのに十分な大きさである必要があります。

最後に、カスタムクラスを登録しない場合でも Kryo は機能しますが、各オブジェクトとともに完全なクラス名を保存する必要があり、これは無駄です。

メモリチューニング

メモリ使用量をチューニングする際には、3 つの考慮事項があります。オブジェクトが使用するメモリの (データセット全体をメモリに収めたい場合があります)、それらのオブジェクトにアクセスするコスト、およびガベージコレクションのオーバーヘッド (オブジェクトの入れ替わりが激しい場合) です。

デフォルトでは、Java オブジェクトは高速にアクセスできますが、フィールド内の「生の」データよりも 2〜5 倍多くスペースを消費する可能性があります。これはいくつかの理由によるものです。

このセクションでは、Spark のメモリ管理の概要から始め、次にユーザーがアプリケーションでメモリをより効率的に使用するために取ることができる具体的な戦略について説明します。特に、オブジェクトのメモリ使用量を特定する方法、およびデータ構造を変更するか、シリアライズ形式でデータを保存することによってメモリ使用量を改善する方法について説明します。その後、Spark のキャッシュサイズと Java ガベージコレクタのチューニングについて説明します。

メモリ管理の概要

Spark のメモリ使用量は、大きく分けて実行メモリとストレージメモリの 2 つのカテゴリに分類されます。実行メモリとは、シャッフル、結合、ソート、集計の計算に使用されるメモリを指し、ストレージメモリとは、キャッシュとクラスター全体での内部データ伝播に使用されるメモリを指します。Spark では、実行メモリとストレージメモリは統一された領域 (M) を共有します。実行メモリが使用されていない場合、ストレージは利用可能なすべてのメモリを取得でき、その逆も同様です。実行は必要に応じてストレージをエビクトできますが、ストレージメモリの使用量が特定のしきい値 (R) を下回るまでです。言い換えると、R は、キャッシュされたブロックが決してエビクトされない M 内のサブ領域を表します。実装の複雑さから、ストレージは実行をエビクトできません。

この設計により、いくつかの望ましい特性が保証されます。第一に、キャッシュを使用しないアプリケーションは、全体的なスペースを実行のために使用できるため、不要なディスクスピルを回避できます。第二に、キャッシュを使用するアプリケーションは、データブロックがエビクトされない最小限のストレージスペース (R) を確保できます。最後に、このアプローチは、メモリが内部的にどのように分割されているかについてのユーザーの専門知識を必要とせずに、さまざまなワークロードに対して妥当なデフォルトパフォーマンスを提供します。

関連する構成は 2 つありますが、ほとんどのワークロードにデフォルト値が適用できるため、通常のユーザーが調整する必要はありません。

spark.memory.fraction の値は、このヒープスペースの量を JVM の古い世代または「tenured」世代に快適に収まるように設定する必要があります。詳細については、後述の高度な GC チューニングのセクションを参照してください。

メモリ使用量の特定

データセットが必要とするメモリ使用量をサイジングする最良の方法は、RDD を作成し、キャッシュに入れ、「Storage」ページを Web UI で確認することです。このページには、RDD が占有しているメモリ量が示されます。

特定のオブジェクトのメモリ使用量を推定するには、SizeEstimatorestimate メソッドを使用します。これは、メモリ使用量を削減するためにさまざまなデータレイアウトを試したり、ブロードキャスト変数が各エグゼキュータヒープに占めるスペースの量を決定したりするのに役立ちます。

データ構造のチューニング

メモリ使用量を削減する最初の方法は、ポインタベースのデータ構造やラッパーオブジェクトなどのオーバーヘッドを追加する Java の機能を回避することです。これにはいくつかの方法があります。

  1. 標準の Java または Scala コレクションクラス (例: HashMap) の代わりに、オブジェクトの配列とプリミティブ型を優先するようにデータ構造を設計します。fastutil ライブラリは、Java 標準ライブラリと互換性のあるプリミティブ型の便利なコレクションクラスを提供します。
  2. 可能な限り、多くの小さなオブジェクトとポインタを持つネストされた構造を避けてください。
  3. キーとして文字列の代わりに数値 ID または列挙オブジェクトの使用を検討してください。
  4. RAM が 32 GiB 未満の場合は、JVM フラグ -XX:+UseCompressedOops を設定して、ポインタを 8 バイトではなく 4 バイトにします。これらのオプションは spark-env.sh に追加できます。

シリアライズされた RDD ストレージ

オブジェクトがこのチューニングにもかかわらず効率的に保存するには大きすぎる場合は、メモリ使用量を削減するさらに簡単な方法は、RDD persist API のシリアライズされた StorageLevels (例: MEMORY_ONLY_SER) を使用して、シリアライズされた形式で保存することです。Spark は各 RDD パーティションを 1 つの大きなバイト配列として保存します。シリアライズされた形式でデータを保存する唯一の欠点は、オブジェクトをその場でデシリアライズする必要があるため、アクセス時間が遅くなることです。シリアライズされた形式でデータをキャッシュしたい場合は、Kryo を使用することを強くお勧めします。Java シリアライゼーション (そしてもちろん生の Java オブジェクト) よりもはるかに小さいサイズになります。

ガベージコレクションのチューニング

プログラムによって保存される RDD の「チャーン」(入れ替わり) が大きい場合、JVM ガベージコレクションが問題になることがあります。(通常、RDD を一度読み込んでから多くの操作を実行するプログラムでは問題になりません。) Java が新しいオブジェクトのスペースを空けるために古いオブジェクトをエビクトする必要がある場合、すべての Java オブジェクトをトレースして未使用のオブジェクトを見つける必要があります。ここで覚えておくべき主な点は、ガベージコレクションのコストは Java オブジェクトの数に比例するということです。したがって、オブジェクトの少ないデータ構造 (例: LinkedList の代わりに Int の配列) を使用すると、このコストが大幅に削減されます。さらに良い方法は、前述のようにオブジェクトをシリアライズされた形式で保存することです。この場合、RDD パーティションあたり1 つのオブジェクト (バイト配列) しかありません。他のテクニックを試す前に、GC が問題である場合の最初の試みは、シリアライズされたキャッシュを使用することです。

GC は、タスクのワーキングメモリ (タスクを実行するために必要なスペースの量) とノードにキャッシュされた RDD との干渉によっても問題になることがあります。これを軽減するために、RDD キャッシュに割り当てられるスペースを制御する方法について説明します。

GC の影響の測定

GC チューニングの最初のステップは、ガベージコレクションがどれくらいの頻度で発生するか、および GC に費やされた時間に関する統計情報を収集することです。これは、Java オプションに -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps を追加することで実現できます。(Spark ジョブに Java オプションを渡す方法については、構成ガイドを参照してください。) 次回 Spark ジョブを実行すると、ガベージコレクションが発生するたびにワーカーのログにメッセージが表示されます。これらのログは、ドライバープログラムではなく、クラスターのワーカーノード (作業ディレクトリの stdout ファイル) に表示されることに注意してください。

高度な GC チューニング

ガベージコレクションをさらにチューニングするには、まず JVM のメモリ管理に関する基本的な情報を理解する必要があります。

Spark における GC チューニングの目標は、寿命の長い RDD のみが Old ジェネレーションに保存され、Young ジェネレーションが短寿命のオブジェクトを保存するのに十分なサイズであることを保証することです。これにより、タスク実行中に作成された一時オブジェクトを収集するためのフル GC を回避できます。役立つ可能性のある手順は次のとおりです。

私たちの経験では、GC チューニングの効果はアプリケーションと利用可能なメモリ量に依存します。オンラインで説明されているチューニングオプションは他にもたくさんありますが、大まかに言うと、フル GC が発生する頻度を管理することでオーバーヘッドを削減できます。

エグゼキュータの GC チューニングフラグは、ジョブの構成で spark.executor.defaultJavaOptions または spark.executor.extraJavaOptions を設定することで指定できます。

その他の考慮事項

並列度

各操作の並列度を十分に高く設定しないと、クラスターは完全に利用されません。Spark は、ファイルサイズに応じて各ファイルで実行される「マップ」タスクの数を自動的に設定します (ただし、SparkContext.textFile などのオプション パラメータで制御できます)。また、分散「リデュース」操作 (例: groupByKey および reduceByKey) の場合、最も大きい親 RDD のパーティション数を使用します。並列度を 2 番目の引数として渡す (例: spark.PairRDDFunctions ドキュメントを参照) か、設定プロパティ spark.default.parallelism を設定してデフォルトを変更できます。一般的に、クラスターの CPU コアあたり 2〜3 タスクを推奨します。

入力パスでの並列リスト表示

場合によっては、ジョブ入力に多数のディレクトリがある場合にディレクトリリスト表示の並列度を増やす必要があることもあります。そうしないと、特に S3 のようなオブジェクトストアに対して、プロセスに非常に時間がかかる可能性があります。ジョブが RDD と Hadoop 入力フォーマット (例: SparkContext.sequenceFile を介して) で動作する場合、並列度は spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads (現在デフォルトは 1) によって制御されます。

Spark SQL のファイルベースのデータソースの場合、リスト表示の並列度を向上させるために spark.sql.sources.parallelPartitionDiscovery.threshold および spark.sql.sources.parallelPartitionDiscovery.parallelism をチューニングできます。詳細については、Spark SQL パフォーマンスチューニングガイドを参照してください。

Reduce タスクのメモリ使用量

場合によっては、RDD がメモリに収まらないためではなく、タスクの 1 つのワーキングセット (たとえば groupByKey のリデュースタスクの 1 つ) が大きすぎたために OutOfMemoryError が発生することがあります。Spark のシャッフル操作 (sortByKeygroupByKeyreduceByKeyjoin など) は、各タスク内でグループ化を実行するためのハッシュテーブルを構築しており、これはしばしば大きくなる可能性があります。ここでの最も簡単な解決策は、並列度を増やすことです。これにより、各タスクの入力セットが小さくなります。Spark は、1 つのエグゼキュータ JVM を多くのタスクで再利用し、タスク起動コストが低いため、200 ms 程度の短いタスクでも効率的にサポートできます。そのため、クラスターのコア数よりも多い並列度を安全に増やすことができます。

大規模変数のブロードキャスト

SparkContext で利用可能なブロードキャスト機能を使用すると、各シリアライズされたタスクのサイズと、クラスター全体でジョブを起動するコストを大幅に削減できます。タスクがドライバープログラムから (たとえば静的なルックアップテーブルなど) 大きなオブジェクトを使用している場合は、それをブロードキャスト変数に変換することを検討してください。Spark はマスターに各タスクのシリアライズされたサイズを出力するため、タスクが大きすぎるかどうかを判断するためにそれを確認できます。一般的に、約 20 KiB より大きいタスクは最適化する価値があるでしょう。

データローカリティ

データローカリティは、Spark ジョブのパフォーマンスに大きな影響を与える可能性があります。データとそれを処理するコードが一緒であれば、計算は高速になる傾向があります。しかし、コードとデータが分離している場合、一方が他方に移動する必要があります。通常、コードサイズはデータチャンクよりもはるかに小さいため、シリアライズされたコードを場所から場所へ移動する方が、データのチャンクを移動するよりも高速です。Spark は、この一般的なデータローカリティの原則に基づいてスケジューリングを構築します。

データローカリティとは、データがそれを処理するコードに近い度合いです。データの現在の場所に基づいて、いくつかのレベルのローカリティがあります。最も近いものから最も遠いものへ順に

Spark は、すべてのタスクを最高のローカリティレベルでスケジュールすることを好みますが、常に可能とは限りません。アイドル状態のエグゼキュータに未処理のデータがない状況では、Spark はより低いローカリティレベルに切り替わります。2 つのオプションがあります。a) 同じサーバー上のデータでタスクを開始するためにビジーな CPU が解放されるのを待つ、または b) より遠い場所で新しいタスクをすぐに開始し、そこにデータを移動する。

Spark が通常行うことは、ビジーな CPU が解放されることを期待して少し待つことです。そのタイムアウトが切れると、遠くから空いている CPU にデータを移動し始めます。各レベル間のフォールバックの待機タイムアウトは、個別に、またはすべてまとめて 1 つのパラメータで設定できます。詳細については、構成ページspark.locality パラメータを参照してください。タスクが長時間実行されており、ローカリティが低い場合は、これらの設定を増やす必要がありますが、デフォルトは通常うまく機能します。

まとめ

これは、Spark アプリケーションのチューニングで知っておくべき主な懸念事項、特にデータシリアライゼーションとメモリチューニングを指摘するための短いガイドでした。ほとんどのプログラムでは、Kryo シリアライゼーションに切り替え、データをシリアライズされた形式で保存することで、一般的なパフォーマンスの問題のほとんどが解決されます。その他のチューニングのベストプラクティスについては、Spark メーリングリストでお気軽にお問い合わせください。