監視とインストルメンテーション

Spark アプリケーションを監視するには、Web UI、メトリクス、外部インストルメンテーションなど、いくつかの方法があります。

Webインターフェイス

各 SparkContext は、アプリケーションに関する有用な情報を表示する Web UI を起動します。デフォルトではポート 4040 で起動し、これには以下が含まれます。

Web ブラウザで http://<driver-node>:4040 を開くだけで、このインターフェイスにアクセスできます。同じホストで複数の SparkContext が実行されている場合、それらは 4040 から始まる連続したポート (4041、4042 など) にバインドされます。

デフォルトでは、この情報はアプリケーションの存続期間中のみ利用可能であることに注意してください。後で Web UI を表示するには、アプリケーションを開始する前に spark.eventLog.enabled を true に設定します。これにより、Spark は UI に表示される情報をエンコードした Spark イベントを永続ストレージにログ記録するように構成されます。

事後表示

アプリケーションのイベントログが存在する場合、Spark の履歴サーバーを通じて UI を構築することは依然として可能です。履歴サーバーを起動するには、以下を実行します。

./sbin/start-history-server.sh

これにより、デフォルトで http://<server-url>:18080 に Web インターフェイスが作成され、未完了および完了したアプリケーションと試行が一覧表示されます。

ファイルシステムプロバイダクラス (以下の spark.history.provider を参照) を使用する場合、ベースログディレクトリは spark.history.fs.logDirectory 設定オプションで指定する必要があります。これは、各アプリケーションのイベントログを表すサブディレクトリを含む必要があります。

Spark ジョブ自体は、イベントをログ記録し、それらを同じ共有可能ディレクトリにログ記録するように構成する必要があります。たとえば、サーバーが hdfs://namenode/shared/spark-logs のログディレクトリで構成されている場合、クライアント側のオプションは次のようになります。

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs

履歴サーバーは次のように構成できます。

環境変数

環境変数意味
SPARK_DAEMON_MEMORY 履歴サーバーに割り当てるメモリ (デフォルト: 1g)。
SPARK_DAEMON_JAVA_OPTS 履歴サーバーの JVM オプション (デフォルト: なし)。
SPARK_DAEMON_CLASSPATH 履歴サーバーのクラスパス (デフォルト: なし)。
SPARK_PUBLIC_DNS 履歴サーバーの公開アドレス。これが設定されていない場合、アプリケーション履歴へのリンクはサーバーの内部アドレスを使用する可能性があり、リンク切れにつながります (デフォルト: なし)。
SPARK_HISTORY_OPTS 履歴サーバーの spark.history.* 設定オプション (デフォルト: なし)。

ローリングイベントログファイルへのコンパクションの適用

長時間実行されるアプリケーション (例: ストリーミング) は、非常に大きな単一イベントログファイルを生成する可能性があり、保守に多大なコストがかかり、Spark History Server での各更新ごとに大量のリソースを必要とします。

spark.eventLog.rolling.enabledspark.eventLog.rolling.maxFileSize を有効にすると、単一の巨大なイベントログファイルではなく、ローリングイベントログファイルを使用できるようになります。これは、それ自体で一部のシナリオに役立つ可能性がありますが、ログの全体的なサイズを削減するのに役立つわけではありません。

Spark History Server は、Spark History Server で設定 spark.history.fs.eventLog.rolling.maxFilesToRetain を設定することにより、ローリングイベントログファイルをコンパクションしてログの全体的なサイズを削減できます。

詳細は後述しますが、コンパクションは損失を伴う操作であることに注意してください。コンパクションは、UI に表示されなくなる一部のイベントを破棄します。オプションを有効にする前に、どのイベントが破棄されるかを確認することをお勧めします。

コンパクションが発生すると、履歴サーバーはアプリケーションで利用可能なすべてのイベントログファイルを一覧表示し、保持される最小インデックスを持つファイルよりも小さいインデックスを持つイベントログファイルをコンパクションの対象と見なします。たとえば、アプリケーション A に 5 つのイベントログファイルがあり、spark.history.fs.eventLog.rolling.maxFilesToRetain が 2 に設定されている場合、最初の 3 つのログファイルがコンパクションのために選択されます。

対象を選択したら、それらを分析して除外できるイベントを特定し、除外が決定されたイベントを破棄して 1 つのコンパクトなファイルに書き換えます。

コンパクションは、古いデータを示すイベントを除外しようとします。現在、除外されるイベントの候補は次のとおりです。

書き換えが完了すると、元のログファイルは最善の努力で削除されます。履歴サーバーは元のログファイルを削除できない場合がありますが、履歴サーバーの操作には影響しません。

履歴サーバーは、コンパクション中に削減されるスペースが少ないと判断した場合、古いイベントログファイルをコンパクトしない場合があることに注意してください。ストリーミングクエリの場合、通常、各マイクロバッチが 1 つ以上のジョブをトリガーしてすぐに完了するため、コンパクションが実行されると予想されますが、バッチクエリでは多くの場合、コンパクションは実行されません。

また、これは Spark 3.0 で導入された新機能であり、完全に安定していない可能性があることに注意してください。状況によっては、コンパクションが予期しないよりも多くのイベントを除外する可能性があり、履歴サーバーでアプリケーションの UI に問題が発生する可能性があります。注意して使用してください。

Spark History Server の設定オプション

Spark History Server のセキュリティオプションについては、セキュリティページで詳しく説明しています。

プロパティ名 デフォルト 意味 バージョン以降
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider アプリケーション履歴バックエンドを実装するクラスの名前。現在、Spark が提供する実装は 1 つだけで、ファイルシステムに格納されているアプリケーションログを検索します。 1.1.0
spark.history.fs.logDirectory file:/tmp/spark-events ファイルシステム履歴プロバイダの場合、ロードするアプリケーションイベントログを含むディレクトリの URL。これは、ローカルの file:// パス、HDFS パス hdfs://namenode/shared/spark-logs、または Hadoop API でサポートされている代替ファイルシステムの URL にすることができます。 1.1.0
spark.history.fs.update.interval 10s ファイルシステム履歴プロバイダがログディレクトリ内の新しいまたは更新されたログをチェックする間隔。間隔を短くすると新しいアプリケーションをより速く検出できますが、更新されたアプリケーションを再読み取りするサーバー負荷が増加します。更新が完了するとすぐに、完了したアプリケーションと未完了のアプリケーションのリストに変更が反映されます。 1.4.0
spark.history.retainedApplications 50 キャッシュで UI データを保持するアプリケーションの数。この上限を超えると、最も古いアプリケーションがキャッシュから削除されます。アプリケーションがキャッシュにない場合、UI からアクセスされるとディスクからロードする必要があります。 1.0.0
spark.history.ui.maxApplications Int.MaxValue 履歴概要ページに表示するアプリケーションの数。アプリケーション UI は、履歴概要ページに表示されなくても、URL に直接アクセスすることで引き続き利用できます。 2.0.1
spark.history.ui.port 18080 履歴サーバーの Web インターフェイスがバインドされるポート。 1.0.0
spark.history.kerberos.enabled false 履歴サーバーが Kerberos を使用してログインするかどうかを示します。これは、履歴サーバーがセキュアな Hadoop クラスタで HDFS ファイルにアクセスする場合に必要です。 1.0.1
spark.history.kerberos.principal (なし) spark.history.kerberos.enabled=true の場合、履歴サーバーの Kerberos プリンシパル名を指定します。 1.0.1
spark.history.kerberos.keytab (なし) spark.history.kerberos.enabled=true の場合、履歴サーバーの Kerberos キータブファイルの場所を指定します。 1.0.1
spark.history.fs.cleaner.enabled false 履歴サーバーが定期的にストレージからイベントログをクリーンアップするかどうかを指定します。 1.4.0
spark.history.fs.cleaner.interval 1d When spark.history.fs.cleaner.enabled=true, specifies how often the filesystem job history cleaner checks for files to delete. Files are deleted if at least one of two conditions holds. First, they're deleted if they're older than spark.history.fs.cleaner.maxAge. They are also deleted if the number of files is more than spark.history.fs.cleaner.maxNum, Spark tries to clean up the completed attempts from the applications based on the order of their oldest attempt time. 1.4.0
spark.history.fs.cleaner.maxAge 7d When spark.history.fs.cleaner.enabled=true, job history files older than this will be deleted when the filesystem history cleaner runs. 1.4.0
spark.history.fs.cleaner.maxNum Int.MaxValue When spark.history.fs.cleaner.enabled=true, specifies the maximum number of files in the event log directory. Spark tries to clean up the completed attempt logs to maintain the log directory under this limit. This should be smaller than the underlying file system limit like `dfs.namenode.fs-limits.max-directory-items` in HDFS. 3.0.0
spark.history.fs.endEventReparseChunkSize 1m How many bytes to parse at the end of log files looking for the end event. This is used to speed up generation of application listings by skipping unnecessary parts of event log files. It can be disabled by setting this config to 0. 2.4.0
spark.history.fs.inProgressOptimization.enabled true Enable optimized handling of in-progress logs. This option may leave finished applications that fail to rename their event logs listed as in-progress. 2.4.0
spark.history.fs.driverlog.cleaner.enabled spark.history.fs.cleaner.enabled Specifies whether the History Server should periodically clean up driver logs from storage. 3.0.0
spark.history.fs.driverlog.cleaner.interval spark.history.fs.cleaner.interval When spark.history.fs.driverlog.cleaner.enabled=true, specifies how often the filesystem driver log cleaner checks for files to delete. Files are only deleted if they are older than spark.history.fs.driverlog.cleaner.maxAge 3.0.0
spark.history.fs.driverlog.cleaner.maxAge spark.history.fs.cleaner.maxAge When spark.history.fs.driverlog.cleaner.enabled=true, driver log files older than this will be deleted when the driver log cleaner runs. 3.0.0
spark.history.fs.numReplayThreads 利用可能なコアの 25% 履歴サーバーがイベントログを処理するために使用するスレッド数。 2.0.0
spark.history.store.maxDiskUsage 10g キャッシュされたアプリケーション履歴情報が格納されるローカルディレクトリの最大ディスク使用量。 2.3.0
spark.history.store.path (なし) アプリケーション履歴データをキャッシュするローカルディレクトリ。設定されている場合、履歴サーバーはメモリ内に履歴を保持する代わりにディスクにアプリケーションデータを格納します。ディスクに書き込まれたデータは、履歴サーバーが再起動された場合に再利用されます。 2.3.0
spark.history.store.serializer JSON メモリ内の UI オブジェクトをディスクベースの KV ストアに書き込んだり読み取ったりするためのシリアライザー。JSON または PROTOBUF。JSON シリアライザーは Spark 3.4.0 より前の唯一の選択肢であるため、デフォルト値です。PROTOBUF シリアライザーは、JSON シリアライザーと比較して高速でコンパクトです。 3.4.0
spark.history.custom.executor.log.url (なし) 履歴サーバーでクラスタマネージャのアプリケーションログ URL を使用する代わりに、外部ログサービスをサポートするためのカスタム Spark Executor ログ URL を指定します。Spark は、クラスタマネージャによって異なる可能性のあるパターンを使用して、いくつかのパス変数をサポートします。サポートされているパターンがある場合は、クラスタマネージャのドキュメントを確認してください。この設定はライブアプリケーションには影響しません。履歴サーバーにのみ影響します。

現時点では、YARN モードのみがこの設定をサポートしています。

3.0.0
spark.history.custom.executor.log.url.applyIncompleteApplication true カスタム Spark Executor ログ URL を未完了のアプリケーションにも適用するかどうかを指定します。実行中のアプリケーションの Executor ログが元のログ URL として提供されるべき場合は、これを `false` に設定します。未完了のアプリケーションには、正常にシャットダウンしなかったアプリケーションが含まれる場合があることに注意してください。これが `true` に設定されていても、この設定はライブアプリケーションには影響しません。履歴サーバーにのみ影響します。 3.0.0
spark.history.fs.eventLog.rolling.maxFilesToRetain Int.MaxValue コンパクションされていない状態として保持されるイベントログファイルの最大数。デフォルトでは、すべてのイベントログファイルが保持されます。技術的な理由から、最小値は 1 です。
詳細については、「古いイベントログファイルのコンパクションの適用」セクションをお読みください。
3.0.0
spark.history.store.hybridStore.enabled false イベントログの解析時に HybridStore をストアとして使用するかどうか。HybridStore は、まずインメモリストアにデータを書き込み、インメモリストアへの書き込みが完了した後にバックグラウンドスレッドでディスクストアにダンプします。 3.1.0
spark.history.store.hybridStore.maxMemoryUsage 2g HybridStore を作成するために使用できる最大メモリスペース。HybridStore はヒープメモリを共有するため、HybridStore が有効になっている場合、SHS のメモリオプションを通じてヒープメモリを増やす必要があります。 3.1.0
spark.history.store.hybridStore.diskBackend ROCKSDB ハイブリッドストアで使用されるディスクベースストアを指定します。ROCKSDB または LEVELDB (非推奨)。 3.3.0
spark.history.fs.update.batchSize Int.MaxValue 新しいイベントログファイルの更新バッチサイズを指定します。これは、各スキャンプロセスが妥当な時間内に完了することを制御し、初期スキャンが長くなりすぎて、大規模な環境で新しいイベントログファイルが適時にスキャンされるのをブロックするのを防ぎます。 3.4.0

これらの UI では、テーブルはヘッダーをクリックすることで並べ替え可能であり、遅いタスク、データスキューなどを簡単に特定できることに注意してください。

注意

  1. 履歴サーバーは、完了した Spark ジョブと未完了の Spark ジョブの両方を表示します。アプリケーションが障害後に複数回試行した場合、失敗した試行、進行中の未完了の試行、または最終的な成功した試行が表示されます。

  2. 未完了のアプリケーションは断続的にのみ更新されます。更新間の時間は、変更されたファイルをチェックする間隔 (spark.history.fs.update.interval) によって定義されます。大規模なクラスタでは、更新間隔が大きな値に設定されている場合があります。実行中のアプリケーションを表示する実際の方法は、その独自の Web UI を表示することです。

  3. 完了したと自身を登録せずに終了したアプリケーションは、実行されていなくても未完了として一覧表示されます。これは、アプリケーションがクラッシュした場合に発生する可能性があります。

  4. Spark ジョブの完了を通知する 1 つの方法は、Spark Context を明示的に停止すること (sc.stop())、または Python では with SparkContext() as sc: 構文を使用して Spark Context のセットアップとティアダウンを処理することです。

REST API

UI でメトリクスを表示するのに加えて、JSON としても利用できます。これにより、開発者は Spark 用の新しいビジュアライゼーションと監視ツールを簡単に作成できます。JSON は、実行中のアプリケーションと履歴サーバーの両方で利用できます。エンドポイントは /api/v1 にマウントされています。たとえば、履歴サーバーの場合、通常は http://<server-url>:18080/api/v1 でアクセスでき、実行中のアプリケーションの場合は https://:4040/api/v1 でアクセスできます。

API では、アプリケーションはアプリケーション ID [app-id] で参照されます。YARN で実行する場合、各アプリケーションには複数の試行がありますが、試行 ID はクラスタモードのアプリケーションにのみ存在し、クライアントモードのアプリケーションには存在しません。YARN クラスタモードのアプリケーションは、その [attempt-id] で識別できます。以下の API では、YARN クラスタモードで実行する場合、[app-id] は実際には [base-app-id]/[attempt-id] になります。ここで、[base-app-id] は YARN アプリケーション ID です。

エンドポイント意味
/applications すべてのアプリケーションのリスト。
?status=[completed|running] 指定された状態のアプリケーションのみを一覧表示します。
?minDate=[date] 一覧表示する最も早い開始日時。
?maxDate=[date] 一覧表示する最も遅い開始日時。
?minEndDate=[date] 一覧表示する最も早い終了日時。
?maxEndDate=[date] 一覧表示する最も遅い終了日時。
?limit=[limit] 一覧表示するアプリケーションの数を制限します。

?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10
/applications/[app-id]/jobs 指定されたアプリケーションのすべてのジョブのリスト。
?status=[running|succeeded|failed|unknown] 特定の状態のジョブのみを一覧表示します。
/applications/[app-id]/jobs/[job-id] 指定されたジョブの詳細。
/applications/[app-id]/stages 指定されたアプリケーションのすべてのステージのリスト。
?status=[active|complete|pending|failed] 指定された状態のステージのみを一覧表示します。
?details=true すべてのステージをタスクデータとともに一覧表示します。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定されたタスクステータスのタスクのみを一覧表示します。クエリパラメータ taskStatus は、details=true の場合にのみ有効です。これは、複数の taskStatus もサポートします。たとえば、?details=true&taskStatus=SUCCESS&taskStatus=FAILED は、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。
?withSummaries=true タスクメトリクス分布と Executor メトリクス分布を持つステージを一覧表示します。
?quantiles=0.0,0.25,0.5,0.75,1.0 指定された四分位数でメトリクスを要約します。クエリパラメータ quantiles は、withSummaries=true の場合にのみ有効です。デフォルト値は 0.0,0.25,0.5,0.75,1.0 です。
/applications/[app-id]/stages/[stage-id] 指定されたステージのすべての試行のリスト。
?details=true 指定されたステージのタスクデータとともにすべての試行を一覧表示します。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定されたタスクステータスのタスクのみを一覧表示します。クエリパラメータ taskStatus は、details=true の場合にのみ有効です。これは、複数の taskStatus もサポートします。たとえば、?details=true&taskStatus=SUCCESS&taskStatus=FAILED は、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。
?withSummaries=true 各試行のタスクメトリクス分布と Executor メトリクス分布を一覧表示します。
?quantiles=0.0,0.25,0.5,0.75,1.0 指定された四分位数でメトリクスを要約します。クエリパラメータ quantiles は、withSummaries=true の場合にのみ有効です。デフォルト値は 0.0,0.25,0.5,0.75,1.0 です。

?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] 指定されたステージ試行の詳細。
?details=true 指定されたステージ試行のすべてのタスクデータを一覧表示します。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定されたタスクステータスのタスクのみを一覧表示します。クエリパラメータ taskStatus は、details=true の場合にのみ有効です。これは、複数の taskStatus もサポートします。たとえば、?details=true&taskStatus=SUCCESS&taskStatus=FAILED は、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。
?withSummaries=true 指定されたステージ試行のタスクメトリクス分布と Executor メトリクス分布を一覧表示します。
?quantiles=0.0,0.25,0.5,0.75,1.0 指定された四分位数でメトリクスを要約します。クエリパラメータ quantiles は、withSummaries=true の場合にのみ有効です。デフォルト値は 0.0,0.25,0.5,0.75,1.0 です。

?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 指定されたステージ試行のすべてのタスクの要約メトリクス。
?quantiles 指定された四分位数でメトリクスを要約します。
例: ?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList 指定されたステージ試行のすべてのタスクのリスト。
?offset=[offset]&length=[len] 指定された範囲のタスクを一覧表示します。
?sortBy=[runtime|-runtime] タスクを並べ替えます。
?status=[running|success|killed|failed|unknown] 指定された状態のタスクのみを一覧表示します。
例: ?offset=10&length=50&sortBy=runtime&status=running
/applications/[app-id]/executors 指定されたアプリケーションのすべてのアクティブな Executor のリスト。
/applications/[app-id]/executors/[executor-id]/threads 指定されたアクティブな Executor で実行されているすべてのスレッドのスタックトレース。履歴サーバーからは利用できません。
/applications/[app-id]/allexecutors 指定されたアプリケーションのすべての Executor (アクティブおよび終了済み) のリスト。
/applications/[app-id]/storage/rdd 指定されたアプリケーションの格納されている RDD のリスト。
/applications/[app-id]/storage/rdd/[rdd-id] 指定された RDD の格納ステータスの詳細。
/applications/[base-app-id]/logs zip ファイル内のファイルとして、指定されたアプリケーションのすべての試行のイベントログをダウンロードします。
/applications/[base-app-id]/[attempt-id]/logs 指定されたアプリケーション試行のイベントログを zip ファイルとしてダウンロードします。
/applications/[app-id]/streaming/statistics ストリーミングコンテキストの統計。
/applications/[app-id]/streaming/receivers すべてのストリーミング受信機のリスト。
/applications/[app-id]/streaming/receivers/[stream-id] 指定された受信機の詳細。
/applications/[app-id]/streaming/batches 保持されているすべてのバッチのリスト。
/applications/[app-id]/streaming/batches/[batch-id] 指定されたバッチの詳細。
/applications/[app-id]/streaming/batches/[batch-id]/operations 指定されたバッチのすべての出力操作のリスト。
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] 指定された操作と指定されたバッチの詳細。
/applications/[app-id]/sql 指定されたアプリケーションのすべてのクエリのリスト。
?details=[true (default) | false] Spark プランノードの詳細を表示/非表示にします。
?planDescription=[true (default) | false] Physical Plan のサイズが大きい場合に、Physical planDescription をオンデマンドで有効/無効にします。
?offset=[offset]&length=[len] 指定された範囲のクエリを一覧表示します。
/applications/[app-id]/sql/[execution-id] 指定されたクエリの詳細。
?details=[true (default) | false] 指定されたクエリの詳細に加えて、メトリクスの詳細を表示/非表示にします。
?planDescription=[true (default) | false] 指定されたクエリの Physical Plan のサイズが大きい場合に、Physical planDescription をオンデマンドで有効/無効にします。
/applications/[app-id]/environment 指定されたアプリケーションの環境詳細。
/version 現在の Spark バージョンを取得します。

取得できるジョブとステージの数は、スタンドアロン Spark UI と同じ保持メカニズムによって制限されます。"spark.ui.retainedJobs" はジョブのガベージコレクションをトリガーする閾値を定義し、spark.ui.retainedStages はステージの閾値を定義します。ガベージコレクションは再生時に行われることに注意してください。これらの値を増やして履歴サーバーを再起動することで、より多くのエントリを取得できます。

Executor タスクメトリクス

REST API は、タスク実行の粒度で Spark Executor によって収集されたタスクメトリクスの値を提供します。メトリクスは、パフォーマンスのトラブルシューティングとワークロードの特性評価に使用できます。利用可能なメトリクスのリストと簡単な説明

Spark Executor タスクメトリクス名 簡単な説明
executorRunTime Executor がこのタスクの実行に費やした経過時間。これには、シャッフルデータの取得時間も含まれます。値はミリ秒単位で表されます。
executorCpuTime Executor がこのタスクの実行に費やした CPU 時間。これには、シャッフルデータの取得時間も含まれます。値はナノ秒単位で表されます。
executorDeserializeTime このタスクのデシリアライズに費やされた経過時間。値はミリ秒単位で表されます。
executorDeserializeCpuTime このタスクのデシリアライズに Executor で費やされた CPU 時間。値はナノ秒単位で表されます。
resultSize このタスクが TaskResult としてドライバに返送したバイト数。
jvmGCTime このタスクの実行中に JVM がガベージコレクションに費やした経過時間。値はミリ秒単位で表されます。
ConcurrentGCCount このメトリクスは、発生したコレクションの総数を示します。Java Garbage Collector が G1 Concurrent GC の場合にのみ適用されます。
ConcurrentGCTime このメトリクスは、累積されたコレクションの経過時間(ミリ秒)の近似値を示します。Java Garbage Collector が G1 Concurrent GC の場合にのみ適用されます。
resultSerializationTime タスク結果のシリアライゼーションに費やされた経過時間。値はミリ秒単位で表されます。
memoryBytesSpilled このタスクによってメモリからスピルされたバイト数。
diskBytesSpilled このタスクによってディスクにスピルされたバイト数。
peakExecutionMemory シャッフル、集計、結合中に作成された内部データ構造によって使用されたピークメモリ。このアキュムレータの値は、このタスクで作成されたすべてのデータ構造のピークサイズの合計に近似しているはずです。SQL ジョブの場合、これはすべてのアンセーフオペレータと ExternalSort のみを追跡します。
inputMetrics.* org.apache.spark.rdd.HadoopRDD から、または永続化されたデータからデータを読み取るに関連するメトリクス。
    .bytesRead 読み取られたバイトの総数。
    .recordsRead 読み取られたレコードの総数。
outputMetrics.* 外部 (例: 分散ファイルシステム) へのデータ書き込みに関連するメトリクス。出力を持つタスクでのみ定義されます。
    .bytesWritten 書き込まれたバイトの総数
    .recordsWritten 書き込まれたレコードの総数
shuffleReadMetrics.* シャッフル読み取り操作に関連するメトリクス。
    .recordsRead シャッフル操作で読み取られたレコード数
    .remoteBlocksFetched シャッフル操作で取得されたリモートブロックの数
    .localBlocksFetched シャッフル操作で取得されたローカルブロックの数 (リモート Executor から読み取られたものではなく)
    .totalBlocksFetched シャッフル操作で取得されたブロックの数 (ローカルとリモートの両方)
    .remoteBytesRead シャッフル操作で読み取られたリモートバイト数
    .localBytesRead シャッフル操作でローカルディスクから読み取られたバイト数 (リモート Executor から読み取られたものではなく)
    .totalBytesRead シャッフル操作で読み取られたバイト数 (ローカルとリモートの両方)
    .remoteBytesReadToDisk シャッフル操作でディスクに読み取られたリモートバイト数。大きなブロックは、デフォルトの動作であるメモリに読み込まれるのではなく、シャッフル読み取り操作のためにディスクにフェッチされます。
    .fetchWaitTime タスクがリモートシャッフルブロックを待機するのに費やした時間。これは、シャッフル入力データでブロックされる時間のみを含みます。たとえば、タスクがブロック A の処理をまだ終えていない間にブロック B がフェッチされている場合、ブロック B でブロックされているとは見なされません。値はミリ秒単位で表されます。
shuffleWriteMetrics.* シャッフルデータを書き込む操作に関連するメトリクス。
    .bytesWritten シャッフル操作で書き込まれたバイト数
    .recordsWritten シャッフル操作で書き込まれたレコード数
    .writeTime ディスクまたはバッファキャッシュへの書き込みをブロックするのに費やされた時間。値はナノ秒単位で表されます。

Executor メトリクス

Executor レベルのメトリクスは、Executor 自体のパフォーマンスメトリクス (JVM ヒープメモリ、GC 情報など) を説明するために、Heartbeat の一部として各 Executor からドライバに送信されます。Executor メトリクス値とその測定されたメモリピーク値は、REST API を介して JSON 形式と Prometheus 形式で公開されます。JSON エンドポイントは /applications/[app-id]/executors で公開され、Prometheus エンドポイントは /metrics/executors/prometheus で公開されます。さらに、Executor メモリメトリクスの集計されたステージごとのピーク値は、spark.eventLog.logStageExecutorMetrics が true の場合にイベントログに書き込まれます。Executor メモリメトリクスは、Dropwizard メトリクスライブラリに基づいた Spark メトリクスシステムを介して公開される場合もあります。利用可能なメトリクスのリストと簡単な説明

Executor レベルメトリクス名 簡単な説明
rddBlocks この Executor のブロックマネージャにある RDD ブロック。
memoryUsed この Executor によって使用されているストレージメモリ。
diskUsed この Executor による RDD ストレージのために使用されているディスクスペース。
totalCores この Executor で利用可能なコア数。
maxTasks この Executor で同時に実行できるタスクの最大数。
activeTasks 現在実行中のタスク数。
failedTasks この Executor で失敗したタスク数。
completedTasks この Executor で完了したタスク数。
totalTasks この Executor のタスクの総数 (実行中、失敗、完了)。
totalDuration この Executor でタスクを実行するために JVM が費やした経過時間。値はミリ秒単位で表されます。
totalGCTime この Executor で合計されたガベージコレクションに JVM が費やした経過時間。値はミリ秒単位で表されます。
totalInputBytes この Executor で合計された入力バイト数。
totalShuffleRead この Executor で合計されたシャッフル読み取りバイト数。
totalShuffleWrite この Executor で合計されたシャッフル書き込みバイト数。
maxMemory ストレージに利用可能なメモリの総量 (バイト単位)。
memoryMetrics.* メモリメトリクスの現在の値
    .usedOnHeapStorageMemory 現在ストレージに使用されているヒープ上のメモリ (バイト単位)。
    .usedOffHeapStorageMemory 現在ストレージに使用されているヒープ外メモリ (バイト単位)。
    .totalOnHeapStorageMemory ストレージに利用可能なヒープ上のメモリの総量 (バイト単位)。この量は、MemoryManager の実装によって時間とともに変動する可能性があります。
    .totalOffHeapStorageMemory ストレージに利用可能なヒープ外メモリの総量 (バイト単位)。この量は、MemoryManager の実装によって時間とともに変動する可能性があります。
peakMemoryMetrics.* メモリ (および GC) メトリクスのピーク値
    .JVMHeapMemory オブジェクト割り当てに使用されるヒープのピークメモリ使用量。ヒープは 1 つ以上のメモリプールで構成されます。返されたメモリ使用量の使用済みおよびコミットされたサイズは、すべてのヒープメモリプールのこれらの値の合計ですが、返されたメモリ使用量の初期および最大サイズは、ヒープメモリの設定を表し、すべてのヒープメモリプールの合計ではない場合があります。返されたメモリ使用量の使用済みメモリ量は、ライブオブジェクトと、収集されていないガベージオブジェクトの両方によって占有されているメモリ量です。
    .JVMOffHeapMemory Java Virtual Machine によって使用されるヒープ外メモリのピークメモリ使用量。ヒープ外メモリは 1 つ以上のメモリプールで構成されます。返されたメモリ使用量の使用済みおよびコミットされたサイズは、すべてのヒープ外メモリプールのこれらの値の合計ですが、返されたメモリ使用量の初期および最大サイズは、ヒープ外メモリの設定を表し、すべてのヒープ外メモリプールの合計ではない場合があります。
    .OnHeapExecutionMemory 使用中のヒープ上実行メモリのピーク値 (バイト単位)。
    .OffHeapExecutionMemory 使用中のヒープ外実行メモリのピーク値 (バイト単位)。
    .OnHeapStorageMemory 使用中のヒープ上ストレージメモリのピーク値 (バイト単位)。
    .OffHeapStorageMemory 使用中のヒープ外ストレージメモリのピーク値 (バイト単位)。
    .OnHeapUnifiedMemory ヒープ上メモリ (実行およびストレージ) のピーク。
    .OffHeapUnifiedMemory ヒープ外メモリ (実行およびストレージ) のピーク。
    .DirectPoolMemory JVM がダイレクトバッファプールに使用しているメモリのピーク値 (java.lang.management.BufferPoolMXBean)
    .MappedPoolMemory JVM がマップバッファプールに使用しているメモリのピーク値 (java.lang.management.BufferPoolMXBean)
    .ProcessTreeJVMVMemory 仮想メモリサイズ (バイト単位)。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreeJVMRSSMemory Resident Set Size: プロセスが物理メモリに保持しているページの数。これは、テキスト、データ、またはスタック領域をカウントするページのみです。需要ロードされていないページや、スワップアウトされたページは含まれません。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreePythonVMemory Python の仮想メモリサイズ (バイト単位)。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreePythonRSSMemory Python の Resident Set Size。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreeOtherVMemory その他の種類のプロセスの仮想メモリサイズ (バイト単位)。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .ProcessTreeOtherRSSMemory その他の種類のプロセスの Resident Set Size。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。
    .MinorGCCount マイナー GC の総回数。たとえば、ガベージコレクタは Copy、PS Scavenge、ParNew、G1 Young Generation などです。
    .MinorGCTime マイナー GC の経過時間合計。値はミリ秒単位で表されます。
    .MajorGCCount メジャー GC の総回数。たとえば、ガベージコレクタは MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation などです。
    .MajorGCTime メジャー GC の経過時間合計。値はミリ秒単位で表されます。

RSS と Vmem の計算は proc(5) に基づいています。

API バージョンポリシー

これらのエンドポイントは、アプリケーションを構築しやすくするために、強力にバージョン管理されています。特に、Spark は以下を保証します。

実行中のアプリケーションの UI を確認する場合でも、applications/[app-id] の部分は、アプリケーションが 1 つしかない場合でも必要です。たとえば、実行中のアプリのジョブリストを表示するには、https://:4040/api/v1/applications/[app-id]/jobs にアクセスします。これは、両方のモードでパスを一貫させるためです。

メトリクス

Spark は、Dropwizard Metrics Library に基づいた設定可能なメトリクスシステムを備えています。これにより、ユーザーは Spark メトリクスを HTTP、JMX、CSV ファイルなど、さまざまなシンクにレポートできます。メトリクスは、Spark コードベースに埋め込まれたソースによって生成されます。これらは、特定の活動と Spark コンポーネントのインストルメンテーションを提供します。メトリクスシステムは、Spark が $SPARK_HOME/conf/metrics.properties に存在すると想定する設定ファイルを通じて構成されます。カスタムファイル場所は、spark.metrics.conf 設定プロパティを介して指定できます。設定ファイルを使用する代わりに、spark.metrics.conf. というプレフィックスを持つ一連の設定パラメータを使用できます。デフォルトでは、ドライバまたは Executor メトリクスに使用されるルート名前空間は spark.app.id の値です。しかし、多くの場合、ユーザーはドライバと Executor のアプリ全体でメトリクスを追跡できるようにしたいと考えており、これはアプリの各呼び出しで変更されるため、アプリケーション ID (つまり spark.app.id) では困難です。このようなユースケースでは、spark.metrics.namespace 設定プロパティを使用して、メトリクスレポート用のカスタム名前空間を指定できます。たとえば、ユーザーがメトリクス名前空間をアプリケーションの名前に設定したい場合、spark.metrics.namespace プロパティを ${spark.app.name} のような値に設定できます。この値は、Spark によって適切に展開され、メトリクスシステムのルート名前空間として使用されます。ドライバと Executor 以外のメトリクスは、spark.app.id でプレフィックスが付けられることはありません。また、spark.metrics.namespace プロパティもそのようなメトリクスには影響しません。

Spark のメトリクスは、Spark コンポーネントに対応するさまざまな *インスタンス* に分離されています。各インスタンス内で、メトリクスがレポートされる一連のシンクを構成できます。現在サポートされているインスタンスは次のとおりです。

各インスタンスは、0 個以上の *シンク* にレポートできます。シンクは org.apache.spark.metrics.sink パッケージに含まれています。

Prometheus Servlet は、Metrics Servlet と REST API によって公開される JSON データを時系列形式でミラーリングします。以下は、同等の Prometheus Servlet エンドポイントです。

Component ポート JSON エンドポイント Prometheus エンドポイント
Master 8080 /metrics/master/json/ /metrics/master/prometheus/
Master 8080 /metrics/applications/json/ /metrics/applications/prometheus/
Worker 8081 /metrics/json/ /metrics/prometheus/
Driver 4040 /metrics/json/ /metrics/prometheus/
Driver 4040 /api/v1/applications/{id}/executors/ /metrics/executors/prometheus/

Spark は、ライセンス上の制限によりデフォルトビルドに含まれていない Ganglia シンクもサポートしています。

GangliaSink をインストールするには、Spark のカスタムビルドを実行する必要があります。このライブラリを埋め込むと、LGPL ライセンスのコードが Spark パッケージに含まれることに注意してください。sbt ユーザーは、ビルド前に SPARK_GANGLIA_LGPL 環境変数を設定します。Maven ユーザーは、-Pspark-ganglia-lgpl プロファイルを有効にします。クラスタの Spark ビルドを変更するだけでなく、ユーザーアプリケーションは spark-ganglia-lgpl アーティファクトにリンクする必要があります。

メトリクス設定ファイルの構文と、各シンクで利用可能なパラメータは、例の設定ファイル $SPARK_HOME/conf/metrics.properties.template で定義されています。

メトリクス設定ファイルではなく Spark 設定パラメータを使用する場合、関連するパラメータ名はプレフィックス spark.metrics.conf. に設定詳細が続くことで構成されます。つまり、パラメータは spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name] の形式を取ります。この例では、Graphite シンクの Spark 設定パラメータのリストを示します。

"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"

Spark メトリクス設定のデフォルト値は次のとおりです。

"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"

追加のソースは、メトリクス設定ファイルまたは設定パラメータ spark.metrics.conf.[component_name].source.jvm.class=[source_name] を使用して構成できます。現在、JVM ソースは唯一利用可能なオプションソースです。たとえば、次の設定パラメータは JVM ソースをアクティブにします: "spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"

利用可能なメトリクスプロバイダのリスト

Spark で使用されるメトリクスは、ゲージ、カウンター、ヒストグラム、メーター、タイマーなど、複数のタイプがあります。詳細は Dropwizard ライブラリのドキュメントを参照してください。次のコンポーネントとメトリクスのリストは、利用可能なメトリクスの名前と詳細を、コンポーネントインスタンスとソース名前空間ごとにグループ化して報告します。Spark インストルメンテーションで最も一般的に使用されるメトリクスのタイプは、ゲージとカウンターです。カウンターは .count サフィックスがあることで認識できます。タイマー、メーター、ヒストグラムはリストに注釈付けされています。リストの残りの要素はゲージタイプのメトリクスです。メトリクスの大多数は、親コンポーネントインスタンスが構成されるとすぐにアクティブになります。一部のメトリクスは、追加の設定パラメータを介して有効にする必要もあります。詳細はリストに報告されています。

コンポーネントインスタンス = Driver

これは、最も多くのインストルメント化されたメトリクスを持つコンポーネントです。

コンポーネントインスタンス = Executor

これらのメトリクスは Spark Executor によって公開されます。

ソース = JVMソース

注記

コンポーネントインスタンス = applicationMaster

注意: YARN で実行する場合に適用されます。

コンポーネントインスタンス = master

注意: Spark スタンドアロンのマスターとして実行する場合に適用されます。

コンポーネントインスタンス = ApplicationSource

注意: Spark スタンドアロンのマスターとして実行する場合に適用されます。

コンポーネントインスタンス = worker

注意: Spark スタンドアロンのワーカーとして実行する場合に適用されます。

コンポーネントインスタンス = shuffleService

注意: シャッフルサービスに適用されます。

高度なインストルメンテーション

Spark ジョブのパフォーマンスをプロファイリングするのに役立ついくつかの外部ツールがあります。

Spark はプラグイン API も提供しているため、カスタムインストルメンテーションコードを Spark アプリケーションに追加できます。Spark にプラグインをロードするために利用できる 2 つの設定キーがあります。

どちらも、org.apache.spark.api.plugin.SparkPlugin インターフェイスを実装するクラス名のカンマ区切りリストを受け取ります。2 つの名前が存在するのは、1 つのリストを Spark のデフォルト設定ファイルに配置できるようにするためです。これにより、ユーザーは設定ファイルのリストを上書きすることなく、コマンドラインから簡単に他のプラグインを追加できます。重複するプラグインは無視されます。