Spark のチューニング

ほとんどの Spark 計算はインメモリで行われるため、Spark プログラムはクラスタ内のあらゆるリソース(CPU、ネットワーク帯域幅、メモリ)によってボトルネックとなる可能性があります。ほとんどの場合、データがメモリに収まればボトルネックはネットワーク帯域幅になりますが、メモリ使用量を削減するために、RDD をシリアライズ形式で保存するなど、チューニングが必要になる場合もあります。このガイドでは、優れたネットワークパフォーマンスに不可欠であり、メモリ使用量も削減できるデータシリアライゼーションと、メモリチューニングという 2 つの主要なトピックについて説明します。また、いくつかの小さなトピックについても簡単に説明します。

データシリアライゼーション

シリアライゼーションは、あらゆる分散アプリケーションのパフォーマンスにおいて重要な役割を果たします。オブジェクトのシリアライズが遅い形式や、大量のバイトを消費する形式は、計算を大幅に遅くします。多くの場合、これは Spark アプリケーションを最適化するために最初に調整すべき点です。Spark は、利便性(操作で任意の Java タイプを使用できる)とパフォーマンスのバランスを取ることを目指しています。2 つのシリアライゼーションライブラリを提供しています。

SparkConf でジョブを初期化し、conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") を呼び出すことで、Kryo を使用するように切り替えることができます。この設定は、ワーカーノード間でデータをシャッフルするだけでなく、RDD をディスクにシリアライズする場合にも使用されるシリアライザを設定します。Kryo がデフォルトではない唯一の理由は、カスタム登録の要件があるためですが、ネットワークを集中的に使用するアプリケーションでは試してみることをお勧めします。Spark 2.0.0 以降、単純な型、単純な型の配列、または文字列型の RDD をシャッフルする際に、内部的に Kryo シリアライザを使用しています。

Spark は、Twitter chill ライブラリの AllScalaRegistrar でカバーされている、多くのよく使用されるコア Scala クラスの Kryo シリアライザを自動的に含んでいます。

独自のカスタムクラスを Kryo に登録するには、registerKryoClasses メソッドを使用します。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

Kryo のドキュメントでは、カスタムシリアライゼーションコードの追加など、より高度な登録オプションについて説明しています。

オブジェクトが大きい場合は、設定spark.kryoserializer.buffer を増やす必要がある場合もあります。この値は、シリアライズする *最大* のオブジェクトを保持できる大きさにする必要があります。

最後に、カスタムクラスを登録しない場合でも Kryo は機能しますが、各オブジェクトに完全なクラス名を保存する必要があり、無駄になります。

メモリチューニング

メモリ使用量のチューニングには、3 つの考慮事項があります。オブジェクトが使用するメモリの *量* (データセット全体をメモリに収めたい場合があります)、それらのオブジェクトにアクセスする *コスト* 、および *ガベージコレクション* のオーバーヘッド(オブジェクトのターンオーバーが高い場合)です。

デフォルトでは、Java オブジェクトへのアクセスは高速ですが、フィールド内の「生の」データの 2〜5 倍のスペースを簡単に消費する可能性があります。これはいくつかの理由によるものです。

このセクションでは、まず Spark のメモリ管理の概要を説明し、次に、アプリケーションでメモリをより効率的に使用するためにユーザーが採用できる具体的な戦略について説明します。特に、オブジェクトのメモリ使用量を確認する方法と、データ構造を変更するか、データをシリアライズ形式で保存することで、メモリ使用量を改善する方法について説明します。次に、Spark のキャッシュサイズと Java ガベージコレクタのチューニングについて説明します。

メモリ管理の概要

Spark でのメモリ使用量は、主に実行とストレージの 2 つのカテゴリに分類されます。実行メモリは、シャッフル、結合、ソート、集計の計算に使用されるメモリを指し、ストレージメモリは、キャッシュとクラスタ全体の内部データの伝播に使用されるメモリを指します。Spark では、実行とストレージは統合された領域(M)を共有します。実行メモリが使用されていない場合、ストレージは利用可能なすべてのメモリを取得でき、その逆も可能です。実行は必要に応じてストレージをエビクトできますが、ストレージメモリの合計使用量が特定のしきい値(R)を下回るまでです。言い換えれば、R は、キャッシュされたブロックがエビクトされない M 内のサブ領域を表します。実装の複雑さのため、ストレージは実行をエビクトできない場合があります。

この設計により、いくつかの望ましい特性が保証されます。まず、キャッシングを使用しないアプリケーションは、実行のためにスペース全体を使用できるため、不要なディスクスピルを回避できます。次に、キャッシングを使用するアプリケーションは、データブロックがエビクトされない最小ストレージスペース(R)を予約できます。最後に、このアプローチは、メモリが内部的にどのように分割されているかについてのユーザーの専門知識を必要とせずに、さまざまなワークロードに対して妥当なすぐに使えるパフォーマンスを提供します。

関連する設定は 2 つありますが、デフォルト値はほとんどのワークロードに適用できるため、通常のユーザーは調整する必要はありません。

spark.memory.fraction の値は、この量のヒープスペースが JVM の古い世代または「テニュア」世代に快適に収まるように設定する必要があります。詳細については、以下の高度な GC チューニングの説明を参照してください。

メモリ消費量の確認

データセットに必要なメモリ消費量をサイジングする最良の方法は、RDD を作成し、キャッシュに配置し、Web UI の「ストレージ」ページを確認することです。ページには、RDD が占有しているメモリの量が示されます。

特定のオブジェクトのメモリ消費量を推定するには、SizeEstimatorestimate メソッドを使用します。これは、メモリ使用量を削減するためにさまざまなデータレイアウトを試したり、ブロードキャスト変数が各 executor ヒープで占有するスペースの量を決定したりするのに役立ちます。

データ構造のチューニング

メモリ消費量を削減する最初の方法は、ポインタベースのデータ構造やラッパーオブジェクトなど、オーバーヘッドを追加する Java の機能を回避することです。これを行うには、いくつかの方法があります。

  1. 標準の Java または Scala コレクションクラス(例:HashMap)ではなく、オブジェクトの配列とプリミティブ型を優先するようにデータ構造を設計します。fastutil ライブラリは、Java 標準ライブラリと互換性のあるプリミティブ型に便利なコレクションクラスを提供します。
  2. 可能であれば、小さなオブジェクトとポインタを多数含む入れ子構造は避けてください。
  3. キーには、文字列ではなく数値 ID または列挙オブジェクトを使用することを検討してください。
  4. 32 GiB 未満の RAM がある場合は、JVM フラグ -XX:+UseCompressedOops を設定して、ポインタを 8 バイトではなく 4 バイトにします。これらのオプションは、spark-env.sh に追加できます。

シリアライズされた RDD ストレージ

このチューニングを行ってもオブジェクトが大きすぎて効率的に保存できない場合は、RDD 永続化 API のシリアライズされた StorageLevels(MEMORY_ONLY_SER など)を使用して、*シリアライズされた* 形式で保存することで、メモリ使用量を削減する方がはるかに簡単です。Spark は、各 RDD パーティションを 1 つの大きなバイト配列として格納します。データをシリアライズ形式で保存することの唯一の欠点は、各オブジェクトをオンザフライでデシリアライズする必要があるため、アクセス時間が遅くなることです。データをシリアライズ形式でキャッシュする場合は、Kryo を使用することを強くお勧めします。これは、Java シリアライゼーション(そしてもちろん生の Java オブジェクト)よりもはるかに小さいサイズになるためです。

ガベージコレクションのチューニング

プログラムによって保存されるRDDの観点から、大量の「チャーン」がある場合、JVMのガベージコレクションが問題になる可能性があります。(通常、RDDを1回だけ読み込んでから、多くの操作を実行するプログラムでは問題になりません。)Javaが新しいオブジェクトのためのスペースを確保するために古いオブジェクトを削除する必要がある場合、すべてのJavaオブジェクトをトレースして、未使用のオブジェクトを見つける必要があります。ここで覚えておくべき重要な点は、*ガベージコレクションのコストはJavaオブジェクトの数に比例する*ということです。そのため、オブジェクトの数が少ないデータ構造(たとえば、`LinkedList`ではなく`Int`の配列)を使用すると、このコストが大幅に削減されます。さらに良い方法は、上記のように、オブジェクトをシリアル化された形式で永続化することです。これで、RDDパーティションごとに*1つ*のオブジェクト(バイト配列)しか存在しなくなります。GCが問題になる場合、他の手法を試す前に、最初に試すべきことは、シリアル化されたキャッシングを使用することです。

GCは、タスクのワーキングメモリ(タスクの実行に必要なスペースの量)とノードにキャッシュされたRDDとの干渉によっても問題になる可能性があります。これを軽減するために、RDDキャッシュに割り当てられるスペースを制御する方法について説明します。

GCの影響の測定

GCチューニングの最初のステップは、ガベージコレクションが発生する頻度とGCに費やされる時間に関する統計情報を収集することです。これは、Javaオプションに`-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps`を追加することで実行できます。(SparkジョブにJavaオプションを渡す方法については、設定ガイドを参照してください。)次回Sparkジョブを実行すると、ガベージコレクションが発生するたびに、ワーカーのログにメッセージが出力されます。これらのログは、ドライバプログラムではなく、クラスタのワーカーノード(作業ディレクトリの`stdout`ファイル内)にあることに注意してください。

高度なGCチューニング

ガベージコレクションをさらに調整するには、まずJVMのメモリ管理に関する基本的な情報を理解する必要があります。

SparkにおけるGCチューニングの目標は、寿命の長いRDDのみがOld世代に格納され、Young世代がタスク実行中に作成された一時オブジェクトを収集するためのフルGCを回避するのに十分なサイズであることを保証することです。役立つ可能性のある手順を以下に示します。

経験から、GCチューニングの効果はアプリケーションと使用可能なメモリ量によって異なります。オンラインで説明されているさらに多くのチューニングオプションがありますが、高いレベルでは、フルGCが実行される頻度を管理することで、オーバーヘッドを削減できます。

エグゼキュータのGCチューニングフラグは、ジョブの構成で`spark.executor.defaultJavaOptions`または`spark.executor.extraJavaOptions`を設定することで指定できます。

その他の考慮事項

並列度

各操作の並列度を十分に高く設定しない限り、クラスタは完全に活用されません。Sparkは、ファイルサイズに応じて各ファイルで実行する「マップ」タスクの数を自動的に設定します(ただし、`SparkContext.textFile`などのオプションパラメータを使用して制御できます)。`groupByKey`や`reduceByKey`などの分散「reduce」操作では、親RDDの中で最大のパーティション数を使用します。並列度を2番目の引数として渡すことができます(`spark.PairRDDFunctions`のドキュメントを参照してください)、または設定プロパティ`spark.default.parallelism`を設定してデフォルトを変更できます。一般に、クラスタのCPUコアごとに2〜3タスクを推奨します。

入力パスの並列リスト

ジョブ入力が多数のディレクトリを持っている場合、ディレクトリリストの並列処理を増やす必要がある場合があります。そうしないと、特にS3のようなオブジェクトストアに対して、プロセスに非常に時間がかかる可能性があります。ジョブがHadoop入力形式のRDDで動作する場合(たとえば、`SparkContext.sequenceFile`経由)、並列処理は`spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads`によって制御されます(現在のデフォルトは1です)。

ファイルベースのデータソースを使用するSpark SQLの場合、`spark.sql.sources.parallelPartitionDiscovery.threshold`と`spark.sql.sources.parallelPartitionDiscovery.parallelism`を調整して、リストの並列処理を改善できます。詳細については、Spark SQLパフォーマンチューニングガイドを参照してください。

Reduce タスクのメモリ使用量

RDDがメモリに収まらないためではなく、`groupByKey`のreduceタスクの1つなど、タスクのワーキングセットが大きすぎたために、OutOfMemoryErrorが発生することがあります。Sparkのシャッフル操作(`sortByKey`、`groupByKey`、`reduceByKey`、`join`など)は、各タスク内にハッシュテーブルを作成してグループ化を実行しますが、これは多くの場合大きくなる可能性があります。ここで最も簡単な解決策は、*並列度を高める*ことです。そうすれば、各タスクの入力セットが小さくなります。Sparkは、1つのエグゼキュータJVMを多くのタスクで再利用し、タスク起動コストが低いため、200ミリ秒程度の短いタスクを効率的にサポートできます。そのため、クラスタのコア数よりも多くの並列度を安全に増やすことができます。

大きな変数のブロードキャスト

`SparkContext`で利用可能なブロードキャスト機能を使用すると、シリアル化された各タスクのサイズと、クラスタ上でジョブを起動するコストを大幅に削減できます。タスクが内部でドライバプログラムの大きなオブジェクト(静的ルックアップテーブルなど)を使用している場合は、それをブロードキャスト変数に変換することを検討してください。Sparkはマスターに各タスクのシリアル化されたサイズを出力するため、それを見てタスクが大きすぎるかどうかを判断できます。一般に、約20 KiBを超えるタスクは最適化する価値があります。

データローカリティ

データの局所性は、Sparkジョブのパフォーマンスに大きな影響を与える可能性があります。データとそれを操作するコードが一緒になっている場合、計算は高速になる傾向があります。ただし、コードとデータが分離されている場合は、一方が他方に移動する必要があります。通常、コードサイズはデータよりもはるかに小さいため、シリアル化されたコードを場所から場所へ移動する方が、データのチャンクを移動するよりも高速です。Sparkは、このデータの局所性に関する一般的な原則に基づいてスケジューリングを構築します。

データの局所性とは、データを処理するコードまでの距離のことです。データの現在の場所に基づいて、いくつかのレベルの局所性があります。最も近いものから最も遠いものまで、順番に

Sparkは、すべてのタスクを最適な局所性レベルでスケジュールすることを優先しますが、これは常に可能とは限りません。アイドル状態のエグゼキュータに未処理データがない状況では、Sparkはより低い局所性レベルに切り替えます。この場合、2つの選択肢があります。 a) 同じサーバー上のデータでタスクを開始するために、ビジー状態のCPUが解放されるまで待機する、または b) データの移動が必要となる、より遠隔地で新しいタスクをすぐに開始する。

Sparkは通常、ビジー状態のCPUが解放されることを期待して少し待機します。そのタイムアウトが期限切れになると、遠くから空いているCPUにデータの移動を開始します。各レベル間のフォールバックの待機タイムアウトは、個別に、またはすべてまとめて1つのパラメータで設定できます。詳細は、設定ページspark.localityパラメータを参照してください。タスクの実行時間が長く、局所性が低い場合は、これらの設定値を増やす必要がありますが、通常はデフォルト値で問題ありません。

まとめ

これは、Sparkアプリケーションをチューニングする際に知っておくべき主な懸念事項、特にデータのシリアライゼーションとメモリのチューニングについて指摘するための簡単なガイドです。ほとんどのプログラムでは、Kryoシリアライゼーションに切り替え、データをシリアライズされた形式で永続化することで、一般的なパフォーマンスの問題のほとんどが解決します。その他のチューニングのベストプラクティスについては、Sparkメーリングリストでお気軽にお問い合わせください。