モニタリングとインストルメンテーション

Sparkアプリケーションを監視する方法は、Web UI、メトリクス、および外部インストルメンテーションのいくつかあります。

Webインターフェース

すべてのSparkContextは、デフォルトでポート4040でWeb UIを起動し、アプリケーションに関する有用な情報を表示します。これには以下が含まれます。

このインターフェースにアクセスするには、Webブラウザでhttp://<driver-node>:4040を開くだけです。同じホスト上で複数のSparkContextが実行されている場合、それらは4040から始まる連続したポート(4041、4042など)にバインドされます。

この情報は、デフォルトではアプリケーションの実行中にのみ利用可能であることに注意してください。事後にWeb UIを表示するには、アプリケーションを開始する前にspark.eventLog.enabledをtrueに設定します。これにより、UIに表示される情報をエンコードするSparkイベントが永続ストレージにログ記録されるようにSparkが構成されます。

事後の表示

アプリケーションのイベントログが存在する場合、Sparkの履歴サーバーを介してアプリケーションのUIを構築することも可能です。履歴サーバーは、以下を実行することで起動できます。

./sbin/start-history-server.sh

これにより、デフォルトでhttp://<server-url>:18080にWebインターフェースが作成され、未完了および完了したアプリケーションと試行が一覧表示されます。

ファイルシステムプロバイダークラス(以下のspark.history.providerを参照)を使用する場合、基本ログディレクトリはspark.history.fs.logDirectory構成オプションで指定する必要があり、各アプリケーションのイベントログを表すサブディレクトリが含まれている必要があります。

Sparkジョブ自体は、イベントをログに記録し、同じ共有の書き込み可能なディレクトリにログを記録するように構成する必要があります。たとえば、サーバーがhdfs://namenode/shared/spark-logsのログディレクトリで構成されている場合、クライアント側のオプションは次のようになります。

spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs

履歴サーバーは、次のように構成できます。

環境変数

環境変数意味
SPARK_DAEMON_MEMORY 履歴サーバーに割り当てるメモリ(デフォルト:1g)。
SPARK_DAEMON_JAVA_OPTS 履歴サーバーのJVMオプション(デフォルト:なし)。
SPARK_DAEMON_CLASSPATH 履歴サーバーのクラスパス(デフォルト:なし)。
SPARK_PUBLIC_DNS 履歴サーバーのパブリックアドレス。これが設定されていない場合、アプリケーション履歴へのリンクはサーバーの内部アドレスを使用する可能性があり、リンクが壊れます(デフォルト:なし)。
SPARK_HISTORY_OPTS 履歴サーバーのspark.history.*構成オプション(デフォルト:なし)。

ローリングイベントログファイルへの圧縮の適用

長時間実行されるアプリケーション(ストリーミングなど)は、単一の巨大なイベントログファイルを作成する可能性があり、維持に多くのコストがかかり、また、Spark履歴サーバーの各更新ごとに再生するために大量のリソースが必要になります。

spark.eventLog.rolling.enabledおよびspark.eventLog.rolling.maxFileSizeを有効にすると、単一の巨大なイベントログファイルの代わりにローリングイベントログファイルを使用できるようになり、それ自体でいくつかのシナリオに役立つ可能性がありますが、ログ全体のサイズを削減するのに役立つわけではありません。

Spark履歴サーバーは、Spark履歴サーバーで構成spark.history.fs.eventLog.rolling.maxFilesToRetainを設定することにより、ローリングイベントログファイルに圧縮を適用して、ログ全体のサイズを削減できます。

詳細は後述しますが、圧縮は損失のある操作であることに注意してください。圧縮は、UIに表示されなくなる一部のイベントを破棄します。オプションを有効にする前に、破棄されるイベントを確認することをお勧めします。

圧縮が発生すると、履歴サーバーはアプリケーションで利用可能なすべてのイベントログファイルをリストし、保持される最小インデックスを持つファイルよりも小さいインデックスを持つイベントログファイルを圧縮のターゲットと見なします。たとえば、アプリケーションAに5つのイベントログファイルがあり、spark.history.fs.eventLog.rolling.maxFilesToRetainが2に設定されている場合、最初の3つのログファイルが圧縮対象として選択されます。

ターゲットを選択すると、除外できるイベントを把握するために分析し、除外すると判断されたイベントを破棄して、1つのコンパクトなファイルに書き直します。

圧縮は、古いデータを指すイベントを除外しようとします。現在のところ、以下は除外されるイベントの候補を示しています。

書き換えが完了すると、元のログファイルはベストエフォートで削除されます。履歴サーバーは元のログファイルを削除できない場合がありますが、履歴サーバーの動作には影響しません。

Spark履歴サーバーは、圧縮中にあまりスペースが削減されないと判断した場合、古いイベントログファイルを圧縮しない可能性があることに注意してください。ストリーミングクエリの場合、通常、各マイクロバッチが1つ以上のジョブをトリガーし、すぐに完了するため、圧縮が実行されることを期待しますが、バッチクエリの場合、多くの場合圧縮は実行されません。

また、これはSpark 3.0で導入された新機能であり、完全に安定していない可能性があることに注意してください。状況によっては、圧縮によって予期よりも多くのイベントが除外され、アプリケーションの履歴サーバーでUIの問題が発生する可能性があります。注意して使用してください。

Spark履歴サーバーの構成オプション

Spark履歴サーバーのセキュリティオプションについては、セキュリティページで詳しく説明しています。

プロパティ名 デフォルト 意味 バージョン以降
spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider アプリケーション履歴バックエンドを実装するクラスの名前。現在、Sparkによって提供される1つの実装のみがあり、ファイルシステムに保存されたアプリケーションログを検索します。 1.1.0
spark.history.fs.logDirectory file:/tmp/spark-events ファイルシステムの履歴プロバイダーの場合、ロードするアプリケーションイベントログを含むディレクトリへのURL。これは、ローカルのfile://パス、HDFSパスhdfs://namenode/shared/spark-logs、またはHadoop APIでサポートされている代替ファイルシステムのパスにすることができます。 1.1.0
spark.history.fs.update.interval 10秒 ファイルシステム履歴プロバイダーがログディレクトリ内の新規または更新されたログをチェックする間隔。間隔が短いほど新しいアプリケーションがより速く検出されますが、更新されたアプリケーションを再読み込みするためのサーバー負荷が増加します。更新が完了するとすぐに、完了したアプリケーションと未完了のアプリケーションのリストに変更が反映されます。 1.4.0
spark.history.retainedApplications 50 キャッシュにUIデータを保持するアプリケーションの数。この上限を超えると、最も古いアプリケーションがキャッシュから削除されます。アプリケーションがキャッシュにない場合、UIからアクセスするとディスクからロードする必要があります。 1.0.0
spark.history.ui.maxApplications Int.MaxValue 履歴概要ページに表示するアプリケーションの数。アプリケーションのUIは、履歴概要ページに表示されていなくても、URLに直接アクセスすることで引き続き利用できます。 2.0.1
spark.history.ui.port 18080 履歴サーバーのWebインターフェースがバインドするポート。 1.0.0
spark.history.kerberos.enabled false 履歴サーバーがKerberosを使用してログインする必要があるかどうかを示します。これは、履歴サーバーがセキュアなHadoopクラスター上のHDFSファイルにアクセスする場合に必要です。 1.0.1
spark.history.kerberos.principal (なし) spark.history.kerberos.enabled=trueの場合、履歴サーバーのKerberosプリンシパル名を指定します。 1.0.1
spark.history.kerberos.keytab (なし) spark.history.kerberos.enabled=trueの場合、履歴サーバーのKerberos keytabファイルの場所を指定します。 1.0.1
spark.history.fs.cleaner.enabled false 履歴サーバーがストレージからイベントログを定期的にクリーンアップするかどうかを指定します。 1.4.0
spark.history.fs.cleaner.interval 1日 spark.history.fs.cleaner.enabled=true の場合、ファイルシステムのジョブ履歴クリーナーが削除対象ファイルをチェックする頻度を指定します。ファイルは、以下の2つの条件の少なくとも1つが満たされる場合に削除されます。まず、spark.history.fs.cleaner.maxAge より古い場合、削除されます。また、ファイル数が spark.history.fs.cleaner.maxNum を超える場合も削除されます。Sparkは、アプリケーションの完了した試行ログを、最も古い試行時間の順にクリーンアップしようとします。 1.4.0
spark.history.fs.cleaner.maxAge 7d spark.history.fs.cleaner.enabled=true の場合、この設定より古いジョブ履歴ファイルは、ファイルシステム履歴クリーナーが実行されたときに削除されます。 1.4.0
spark.history.fs.cleaner.maxNum Int.MaxValue spark.history.fs.cleaner.enabled=true の場合、イベントログディレクトリ内のファイルの最大数を指定します。Sparkは、この制限内でログディレクトリを維持するために、完了した試行ログをクリーンアップしようとします。これは、HDFS の `dfs.namenode.fs-limits.max-directory-items` のような、基盤となるファイルシステムの制限よりも小さくする必要があります。 3.0.0
spark.history.fs.endEventReparseChunkSize 1m ログファイルの末尾で終了イベントを検索するために解析するバイト数。これは、イベントログファイルの不要な部分をスキップすることで、アプリケーションリストの生成を高速化するために使用されます。この設定を0に設定すると無効にできます。 2.4.0
spark.history.fs.inProgressOptimization.enabled true 進行中のログの最適化された処理を有効にします。このオプションを使用すると、イベントログの名前変更に失敗した完了済みアプリケーションが、進行中としてリストされる場合があります。 2.4.0
spark.history.fs.driverlog.cleaner.enabled spark.history.fs.cleaner.enabled 履歴サーバーがストレージからドライバーログを定期的にクリーンアップするかどうかを指定します。 3.0.0
spark.history.fs.driverlog.cleaner.interval spark.history.fs.cleaner.interval spark.history.fs.driverlog.cleaner.enabled=true の場合、ファイルシステムのドライバーログクリーナーが削除対象ファイルをチェックする頻度を指定します。ファイルは、spark.history.fs.driverlog.cleaner.maxAge より古い場合にのみ削除されます。 3.0.0
spark.history.fs.driverlog.cleaner.maxAge spark.history.fs.cleaner.maxAge spark.history.fs.driverlog.cleaner.enabled=true の場合、この設定より古いドライバーログファイルは、ドライバーログクリーナーが実行されたときに削除されます。 3.0.0
spark.history.fs.numReplayThreads 利用可能なコアの25% 履歴サーバーがイベントログを処理するために使用するスレッド数。 2.0.0
spark.history.store.maxDiskUsage 10g キャッシュされたアプリケーション履歴情報を格納するローカルディレクトリの最大ディスク使用量。 2.3.0
spark.history.store.path (なし) アプリケーション履歴データをキャッシュするローカルディレクトリ。設定した場合、履歴サーバーは、メモリに保持する代わりに、アプリケーションデータをディスクに保存します。ディスクに書き込まれたデータは、履歴サーバーの再起動時に再利用されます。 2.3.0
spark.history.store.serializer JSON メモリ内のUIオブジェクトをディスクベースのKVストアに書き込み/読み込みするためのシリアライザー。JSONまたはPROTOBUF。JSONシリアライザーは、Spark 3.4.0以前の唯一の選択肢であるため、デフォルト値です。PROTOBUFシリアライザーは、JSONシリアライザーと比較して高速かつコンパクトです。 3.4.0
spark.history.custom.executor.log.url (なし) 履歴サーバーでクラスターマネージャーのアプリケーションログURLを使用する代わりに、外部ログサービスをサポートするためのカスタムSpark ExecutorログURLを指定します。Sparkは、クラスターマネージャーによって異なる可能性のあるパターンを介して、いくつかのパス変数をサポートします。サポートされているパターンがある場合は、クラスターマネージャーのドキュメントを確認してください。この構成は、ライブアプリケーションには影響せず、履歴サーバーにのみ影響します。

現在のところ、YARNモードのみがこの構成をサポートしています

3.0.0
spark.history.custom.executor.log.url.applyIncompleteApplication true カスタムSpark ExecutorログURLを、未完了のアプリケーションにも適用するかどうかを指定します。実行中のアプリケーションのエグゼキューターログを元のログURLとして提供する場合は、これを `false` に設定します。未完了のアプリケーションには、正常にシャットダウンしなかったアプリケーションが含まれる可能性があることに注意してください。これが `true` に設定されていても、この構成はライブアプリケーションには影響せず、履歴サーバーにのみ影響します。 3.0.0
spark.history.fs.eventLog.rolling.maxFilesToRetain Int.MaxValue 非圧縮の状態で保持されるイベントログファイルの最大数。デフォルトでは、すべてのイベントログファイルが保持されます。技術的な理由から、最小値は1です。
詳細については、「古いイベントログファイルの圧縮適用」のセクションを参照してください。
3.0.0
spark.history.store.hybridStore.enabled false イベントログの解析時に、ストアとしてHybridStoreを使用するかどうか。HybridStoreは、最初にデータをメモリ内ストアに書き込み、メモリ内ストアへの書き込みが完了した後、バックグラウンドスレッドでデータをディスクストアにダンプします。 3.1.0
spark.history.store.hybridStore.maxMemoryUsage 2g HybridStoreの作成に使用できる最大メモリ空間。HybridStoreはヒープメモリを共有するため、HybridStoreが有効になっている場合は、SHSのメモリオプションを使用してヒープメモリを増やす必要があります。 3.1.0
spark.history.store.hybridStore.diskBackend ROCKSDB ハイブリッドストアで使用されるディスクベースのストアを指定します。LEVELDBまたはROCKSDB。 3.3.0
spark.history.fs.update.batchSize Int.MaxValue 新しいイベントログファイルの更新バッチサイズを指定します。これにより、各スキャンプロセスが妥当な時間内に完了するように制御され、大規模な環境で初期スキャンが長すぎて新しいイベントログファイルが時間内にスキャンされるのを妨げるのを防ぎます。 3.4.0

これらのUIではすべて、ヘッダーをクリックするとテーブルをソートできるため、低速なタスクやデータの偏りなどを簡単に特定できることに注意してください。

  1. 履歴サーバーは、完了済みと未完了の両方のSparkジョブを表示します。アプリケーションが失敗後に複数回試行した場合、失敗した試行、および進行中の未完了の試行または最終的な成功した試行が表示されます。

  2. 未完了のアプリケーションは、断続的にのみ更新されます。更新間の時間は、変更されたファイルのチェック間隔(spark.history.fs.update.interval)によって定義されます。大規模なクラスターでは、更新間隔を大きな値に設定できます。実行中のアプリケーションを表示する方法は、実際には独自のWeb UIを表示することです。

  3. 完了として自身を登録せずに終了したアプリケーションは、実行されていなくても未完了としてリストされます。これは、アプリケーションがクラッシュした場合に発生する可能性があります。

  4. Sparkジョブの完了を通知する1つの方法は、Sparkコンテキストを明示的に停止する(sc.stop())、またはPythonで with SparkContext() as sc: 構造を使用してSparkコンテキストのセットアップと破棄を処理することです。

REST API

UIでメトリックを表示することに加えて、JSONとしても利用できます。これにより、開発者はSpark用の新しい視覚化ツールや監視ツールを簡単に作成できます。JSONは、実行中のアプリケーションと履歴サーバーの両方で利用できます。エンドポイントは/api/v1にマウントされています。たとえば、履歴サーバーの場合、通常はhttp://<server-url>:18080/api/v1でアクセスでき、実行中のアプリケーションの場合はhttp://localhost:4040/api/v1でアクセスできます。

APIでは、アプリケーションはアプリケーションID、[app-id]で参照されます。YARNで実行する場合、各アプリケーションは複数の試行を持つ可能性がありますが、クラスターモードのアプリケーションにのみ試行IDがあり、クライアントモードのアプリケーションにはありません。YARNクラスターモードのアプリケーションは、[attempt-id]で識別できます。以下に示すAPIでは、YARNクラスターモードで実行する場合、[app-id]は実際には[base-app-id]/[attempt-id]になります。[base-app-id]はYARNアプリケーションIDです。

エンドポイント意味
/applications すべてのアプリケーションのリスト。
?status=[completed|running] 選択した状態のアプリケーションのみをリストします。
?minDate=[date] リストする最も早い開始日時。
?maxDate=[date] リストする最も遅い開始日時。
?minEndDate=[date] リストする最も早い終了日時。
?maxEndDate=[date] リストする最も遅い終了日時。
?limit=[limit] リストされるアプリケーションの数を制限します。

?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10
/applications/[app-id]/jobs 特定のアプリケーションのすべてのジョブのリスト。
?status=[running|succeeded|failed|unknown] 特定の状態のジョブのみをリストします。
/applications/[app-id]/jobs/[job-id] 特定のジョブの詳細。
/applications/[app-id]/stages 特定のアプリケーションのすべてのステージのリスト。
?status=[active|complete|pending|failed] 特定の状態のステージのみをリストします。
?details=true タスクデータを含むすべてのステージをリストします。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定したタスクステータスのタスクのみをリストします。クエリパラメーター `taskStatus` は、`details=true` の場合にのみ有効です。これは、`?details=true&taskStatus=SUCCESS&taskStatus=FAILED` のように複数の `taskStatus` もサポートしており、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。
?withSummaries=true タスクメトリクスの分布とエグゼキューターメトリクスの分布を含むステージをリストします。
?quantiles=0.0,0.25,0.5,0.75,1.0 指定された分位数でメトリクスを要約します。クエリパラメーター `quantiles` は、`withSummaries=true` の場合にのみ有効です。デフォルト値は `0.0,0.25,0.5,0.75,1.0` です。
/applications/[app-id]/stages/[stage-id] 特定のステージのすべての試行のリスト。
?details=true 特定のステージのタスクデータを含むすべての試行をリストします。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定したタスクステータスのタスクのみをリストします。クエリパラメーター `taskStatus` は、`details=true` の場合にのみ有効です。これは、`?details=true&taskStatus=SUCCESS&taskStatus=FAILED` のように複数の `taskStatus` もサポートしており、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。
?withSummaries=true 各試行のタスクメトリクスの分布とエグゼキューターメトリクスの分布をリストします。
?quantiles=0.0,0.25,0.5,0.75,1.0 指定された分位数でメトリクスを要約します。クエリパラメーター `quantiles` は、`withSummaries=true` の場合にのみ有効です。デフォルト値は `0.0,0.25,0.5,0.75,1.0` です。

?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] 特定のステージ試行の詳細。
?details=true 特定のステージ試行のすべてのタスクデータをリストします。
?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定したタスクステータスのタスクのみをリストします。クエリパラメーター `taskStatus` は、`details=true` の場合にのみ有効です。これは、`?details=true&taskStatus=SUCCESS&taskStatus=FAILED` のように複数の `taskStatus` もサポートしており、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。
?withSummaries=true 特定のステージ試行のタスクメトリクスの分布とエグゼキューターメトリクスの分布をリストします。
?quantiles=0.0,0.25,0.5,0.75,1.0 指定された分位数でメトリクスを要約します。クエリパラメーター `quantiles` は、`withSummaries=true` の場合にのみ有効です。デフォルト値は `0.0,0.25,0.5,0.75,1.0` です。

?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary 特定のステージ試行のすべてのタスクの要約メトリクス。
?quantiles 指定された分位数でメトリクスを要約します。
例: ?quantiles=0.01,0.5,0.99
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList 特定のステージ試行のすべてのタスクのリスト。
?offset=[offset]&length=[len] 指定された範囲のタスクをリストします。
?sortBy=[runtime|-runtime] タスクをソートします。
?status=[running|success|killed|failed|unknown] 特定の状態のタスクのみをリストします。
例: ?offset=10&length=50&sortBy=runtime&status=running
/applications/[app-id]/executors 特定のアプリケーションのすべてのアクティブなエグゼキューターのリスト。
/applications/[app-id]/executors/[executor-id]/threads 特定のアクティブなエグゼキューター内で実行されているすべてのスレッドのスタックトレース。履歴サーバー経由では利用できません。
/applications/[app-id]/allexecutors 特定のアプリケーションのすべての(アクティブおよびデッド)エグゼキューターのリスト。
/applications/[app-id]/storage/rdd 特定のアプリケーションの保存されたRDDのリスト。
/applications/[app-id]/storage/rdd/[rdd-id] 特定のRDDのストレージステータスの詳細。
/applications/[base-app-id]/logs 特定のアプリケーションのすべての試行のイベントログを、zipファイル内のファイルとしてダウンロードします。
/applications/[base-app-id]/[attempt-id]/logs 特定のアプリケーション試行のイベントログをzipファイルとしてダウンロードします。
/applications/[app-id]/streaming/statistics ストリーミングコンテキストの統計情報。
/applications/[app-id]/streaming/receivers すべてのストリーミングレシーバーのリスト。
/applications/[app-id]/streaming/receivers/[stream-id] 指定されたレシーバーの詳細。
/applications/[app-id]/streaming/batches 保持されているすべてのバッチのリスト。
/applications/[app-id]/streaming/batches/[batch-id] 指定されたバッチの詳細。
/applications/[app-id]/streaming/batches/[batch-id]/operations 指定されたバッチのすべての出力操作のリスト。
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] 指定された操作と指定されたバッチの詳細。
/applications/[app-id]/sql 指定されたアプリケーションのすべてのクエリのリスト。
?details=[true (デフォルト) | false] は、Sparkプランノードの詳細を表示/非表示にします。
?planDescription=[true (デフォルト) | false] は、物理プランのサイズが大きい場合に、必要に応じて物理 planDescription を有効/無効にします。
?offset=[offset]&length=[len] は、指定された範囲のクエリをリストします。
/applications/[app-id]/sql/[execution-id] 指定されたクエリの詳細。
?details=[true (デフォルト) | false] は、指定されたクエリの詳細に加えて、メトリクスの詳細を表示/非表示にします。
?planDescription=[true (デフォルト) | false] は、物理プランのサイズが大きい場合に、指定されたクエリに対して必要に応じて物理 planDescription を有効/無効にします。
/applications/[app-id]/environment 指定されたアプリケーションの環境詳細。
/version 現在のSparkバージョンを取得します。

取得できるジョブとステージの数は、スタンドアロンSpark UIの同じ保持メカニズムによって制約されます。 "spark.ui.retainedJobs" は、ジョブのガベージコレクションをトリガーするしきい値を定義し、spark.ui.retainedStages はステージのしきい値を定義します。ガベージコレクションは再生時に行われることに注意してください。これらの値を増やして履歴サーバーを再起動することで、より多くのエントリを取得できます。

エグゼキュータタスクのメトリクス

REST APIは、Sparkエグゼキューターによって収集されたタスクメトリクスの値をタスク実行の粒度で公開します。メトリクスは、パフォーマンスのトラブルシューティングとワークロードの特性評価に使用できます。利用可能なメトリクスのリストと簡単な説明。

Sparkエグゼキューターのタスクメトリクス名 簡単な説明
executorRunTime エグゼキューターがこのタスクの実行に費やした経過時間。これには、シャッフルデータのフェッチ時間も含まれます。値はミリ秒単位で表されます。
executorCpuTime エグゼキューターがこのタスクの実行に費やしたCPU時間。これには、シャッフルデータのフェッチ時間も含まれます。値はナノ秒単位で表されます。
executorDeserializeTime このタスクのデシリアライズに費やした経過時間。値はミリ秒単位で表されます。
executorDeserializeCpuTime エグゼキューターでこのタスクをデシリアライズするためにかかったCPU時間。値はナノ秒単位で表されます。
resultSize このタスクがTaskResultとしてドライバーに送信したバイト数。
jvmGCTime このタスクの実行中にJVMがガベージコレクションに費やした経過時間。値はミリ秒単位で表されます。
resultSerializationTime タスク結果のシリアライズに費やした経過時間。値はミリ秒単位で表されます。
memoryBytesSpilled このタスクによってスピルされたインメモリバイト数。
diskBytesSpilled このタスクによってスピルされたディスク上のバイト数。
peakExecutionMemory シャッフル、集計、および結合中に作成された内部データ構造で使用されるピークメモリ。このアキュムレータの値は、このタスクで作成されたすべてのそのようなデータ構造のピークサイズの合計とほぼ一致する必要があります。SQLジョブの場合、これはすべての安全でない演算子とExternalSortのみを追跡します。
inputMetrics.* org.apache.spark.rdd.HadoopRDD からのデータの読み取り、または永続化されたデータに関連するメトリクス。
    .bytesRead 読み取られた合計バイト数。
    .recordsRead 読み取られたレコードの合計数。
outputMetrics.* 外部(分散ファイルシステムなど)へのデータの書き込みに関連するメトリクス。出力のあるタスクでのみ定義されます。
    .bytesWritten 書き込まれた合計バイト数
    .recordsWritten 書き込まれたレコードの合計数
shuffleReadMetrics.* シャッフル読み取り操作に関連するメトリクス。
    .recordsRead シャッフル操作で読み取られたレコード数
    .remoteBlocksFetched シャッフル操作でフェッチされたリモートブロックの数
    .localBlocksFetched シャッフル操作でフェッチされたローカル(リモートエグゼキューターからの読み取りではなく)ブロックの数
    .totalBlocksFetched シャッフル操作でフェッチされたブロックの数(ローカルとリモートの両方)
    .remoteBytesRead シャッフル操作で読み取られたリモートバイト数
    .localBytesRead シャッフル操作でローカルディスクから読み取られたバイト数(リモートエグゼキューターからの読み取りではなく)
    .totalBytesRead シャッフル操作で読み取られたバイト数(ローカルとリモートの両方)
    .remoteBytesReadToDisk シャッフル操作でディスクに読み取られたリモートバイト数。大きなブロックは、デフォルトの動作であるメモリに読み込むのではなく、シャッフル読み取り操作でディスクにフェッチされます。
    .fetchWaitTime タスクがリモートシャッフルブロックを待機するのに費やした時間。これには、シャッフル入力データでブロックする時間が含まれます。たとえば、タスクがブロックAの処理をまだ完了していないときにブロックBがフェッチされている場合、ブロックBでブロックしているとは見なされません。値はミリ秒単位で表されます。
shuffleWriteMetrics.* シャッフルデータの書き込み操作に関連するメトリクス。
    .bytesWritten シャッフル操作で書き込まれたバイト数
    .recordsWritten シャッフル操作で書き込まれたレコード数
    .writeTime ディスクまたはバッファーキャッシュへの書き込みでブロックするのに費やした時間。値はナノ秒単位で表されます。

エグゼキュータのメトリクス

エグゼキューターレベルのメトリクスは、各エグゼキューターからドライバーにハートビートの一部として送信され、JVMヒープメモリやGC情報などのエグゼキューター自体のパフォーマンスメトリクスを記述します。エグゼキューターのメトリクス値と、エグゼキューターごとの測定されたメモリピーク値は、REST API経由でJSON形式およびPrometheus形式で公開されます。JSONエンドポイントは /applications/[app-id]/executors で公開され、Prometheusエンドポイントは /metrics/executors/prometheus で公開されます。Prometheusエンドポイントは、構成パラメーター:spark.ui.prometheus.enabled=true(デフォルトは false)に依存します。さらに、エグゼキューターメモリメトリクスのステージごとの集計されたピーク値は、spark.eventLog.logStageExecutorMetrics がtrueの場合にイベントログに書き込まれます。エグゼキューターメモリメトリクスは、Dropwizard metrics library に基づくSparkメトリクスシステムからも公開されます。利用可能なメトリクスのリストと簡単な説明。

エグゼキューターレベルのメトリクス名 簡単な説明
rddBlocks このエグゼキューターのブロックマネージャー内のRDDブロック。
memoryUsed このエグゼキューターで使用されるストレージメモリ。
diskUsed このエグゼキューターによるRDDストレージに使用されるディスク容量。
totalCores このエグゼキューターで使用可能なコアの数。
maxTasks このエグゼキューターで同時に実行できる最大タスク数。
activeTasks 現在実行中のタスクの数。
failedTasks このエグゼキューターで失敗したタスクの数。
completedTasks このエグゼキューターで完了したタスクの数。
totalTasks このエグゼキューター内のタスクの合計数(実行中、失敗、完了)。
totalDuration JVMがこのエグゼキューターでタスクの実行に費やした経過時間。値はミリ秒単位で表されます。
totalGCTime JVMがこのエグゼキューターで合計したガベージコレクションに費やした経過時間。値はミリ秒単位で表されます。
totalInputBytes このエグゼキューターで合計された入力バイトの合計。
totalShuffleRead このエグゼキューターで合計されたシャッフル読み取りバイトの合計。
totalShuffleWrite このエグゼキューターで合計されたシャッフル書き込みバイトの合計。
maxMemory ストレージに使用可能なメモリの合計量(バイト単位)。
memoryMetrics.* メモリメトリクスの現在の値
    .usedOnHeapStorageMemory 現在、ストレージに使用されるオンヒープメモリ(バイト単位)。
    .usedOffHeapStorageMemory 現在、ストレージに使用されるオフヒープメモリ(バイト単位)。
    .totalOnHeapStorageMemory ストレージに使用できるオンヒープメモリの合計(バイト単位)。この量は、MemoryManagerの実装によって時間の経過とともに変化する可能性があります。
    .totalOffHeapStorageMemory ストレージに使用できるオフヒープメモリの合計(バイト単位)。この量は、MemoryManagerの実装に応じて時間の経過とともに変化する可能性があります。
peakMemoryMetrics.* メモリ(およびGC)メトリクスのピーク値
    .JVMHeapMemory オブジェクトの割り当てに使用されるヒープのピークメモリ使用量。ヒープは1つ以上のメモリプールで構成されます。返されるメモリ使用量の使用済みサイズとコミット済みサイズは、すべてのヒープメモリプールのこれらの値の合計ですが、返されるメモリ使用量の初期サイズと最大サイズは、すべてのヒープメモリプールの合計ではない可能性のあるヒープメモリの設定を表します。返されるメモリ使用量で使用されるメモリの量は、ライブオブジェクトと収集されていないガベージオブジェクトの両方が占めるメモリの量です(存在する場合)。
    .JVMOffHeapMemory Java仮想マシンで使用される非ヒープメモリのピークメモリ使用量。非ヒープメモリは、1つ以上のメモリプールで構成されます。返されるメモリ使用量の使用済みサイズとコミット済みサイズは、すべての非ヒープメモリプールのこれらの値の合計ですが、返されるメモリ使用量の初期サイズと最大サイズは、すべての非ヒープメモリプールの合計ではない可能性のある非ヒープメモリの設定を表します。
    .OnHeapExecutionMemory 使用中のオンヒープ実行メモリのピーク(バイト単位)。
    .OffHeapExecutionMemory 使用中のオフヒープ実行メモリのピーク(バイト単位)。
    .OnHeapStorageMemory 使用中のオンヒープストレージメモリのピーク(バイト単位)。
    .OffHeapStorageMemory 使用中のオフヒープストレージメモリのピーク(バイト単位)。
    .OnHeapUnifiedMemory オンヒープメモリのピーク(実行とストレージ)。
    .OffHeapUnifiedMemory オフヒープメモリのピーク(実行とストレージ)。
    .DirectPoolMemory JVMがダイレクトバッファプール(java.lang.management.BufferPoolMXBean)に使用しているピークメモリ
    .MappedPoolMemory JVMがマップされたバッファプール(java.lang.management.BufferPoolMXBean)に使用しているピークメモリ
    .ProcessTreeJVMVMemory 仮想メモリサイズ(バイト単位)。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。
    .ProcessTreeJVMRSSMemory 常駐セットサイズ:プロセスが実メモリに持つページ数。これは、テキスト、データ、またはスタック領域にカウントされるページのみです。これには、オンデマンドでロードされていないページや、スワップアウトされたページは含まれません。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。
    .ProcessTreePythonVMemory Pythonの仮想メモリサイズ(バイト単位)。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。
    .ProcessTreePythonRSSMemory Pythonの常駐セットサイズ。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。
    .ProcessTreeOtherVMemory 他の種類のプロセスの仮想メモリサイズ(バイト単位)。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。
    .ProcessTreeOtherRSSMemory 他の種類のプロセスの常駐セットサイズ。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。
    .MinorGCCount マイナーGCの合計数。たとえば、ガベージコレクターは、コピー、PSスカベンジ、ParNew、G1ヤングジェネレーションなどです。
    .MinorGCTime 経過したマイナーGCの合計時間。値はミリ秒単位で表されます。
    .MajorGCCount メジャーGCの合計数。たとえば、ガベージコレクターは、MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1オールドジェネレーションなどです。
    .MajorGCTime 経過したメジャーGCの合計時間。値はミリ秒単位で表されます。

RSSとVmemの計算は、proc(5)に基づいています。

APIバージョニングポリシー

これらのエンドポイントは、上部にアプリケーションを開発しやすくするために、厳密にバージョン管理されています。特に、Sparkは保証します

実行中のアプリケーションのUIを調べるときでも、利用可能なアプリケーションが1つしかない場合でも、applications/[app-id] の部分が必要であることに注意してください。例えば、実行中のアプリのジョブのリストを表示するには、http://localhost:4040/api/v1/applications/[app-id]/jobs にアクセスします。これは、両方のモードでパスの一貫性を保つためです。

メトリクス

Sparkには、Dropwizard Metrics Library に基づいた設定可能なメトリクスシステムがあります。これにより、ユーザーはHTTP、JMX、CSVファイルなど、さまざまなシンクにSparkメトリクスを報告できます。メトリクスは、Sparkのコードベースに埋め込まれたソースによって生成されます。これらは、特定の活動やSparkコンポーネントのインストルメンテーションを提供します。メトリクスシステムは、Sparkが$SPARK_HOME/conf/metrics.propertiesに存在すると想定する設定ファイルによって構成されます。カスタムファイルの場所は、spark.metrics.conf 設定プロパティを介して指定できます。設定ファイルの代わりに、プレフィックス spark.metrics.conf. を持つ一連の設定パラメーターを使用できます。デフォルトでは、ドライバーまたはエグゼキューターメトリクスに使用されるルート名前空間は、spark.app.id の値です。ただし、多くの場合、ユーザーはドライバーとエグゼキューターのアプリ間でメトリクスを追跡したいと考えていますが、アプリケーションID(つまり、spark.app.id)はアプリの呼び出しごとに変わるため、これは困難です。このようなユースケースでは、spark.metrics.namespace 設定プロパティを使用して、メトリクスのレポート用にカスタム名前空間を指定できます。例えば、ユーザーがメトリクス名前空間をアプリケーションの名前に設定したい場合、spark.metrics.namespace プロパティを ${spark.app.name} のような値に設定できます。この値は、Sparkによって適切に展開され、メトリクスシステムのルート名前空間として使用されます。ドライバーとエグゼキューター以外のメトリクスは、spark.app.id でプレフィックスが付けられることはなく、spark.metrics.namespace プロパティもそのようなメトリクスには影響しません。

Sparkのメトリクスは、Sparkコンポーネントに対応する異なるインスタンスに分離されています。各インスタンス内で、メトリクスが報告されるシンクのセットを設定できます。現在、以下のインスタンスがサポートされています。

各インスタンスは、ゼロ個以上のシンクにレポートできます。シンクは、org.apache.spark.metrics.sink パッケージに含まれています。

Sparkは、ライセンス制限のためにデフォルトビルドに含まれていないGangliaシンクもサポートしています。

GangliaSinkをインストールするには、Sparkのカスタムビルドを実行する必要があります。このライブラリを埋め込むと、LGPLライセンスのコードがSparkパッケージに含まれることに注意してください。 sbtユーザーの場合は、ビルド前にSPARK_GANGLIA_LGPL環境変数を設定します。Mavenユーザーの場合は、-Pspark-ganglia-lgplプロファイルを有効にします。クラスターのSparkビルドを修正することに加えて、ユーザーアプリケーションは spark-ganglia-lgpl アーティファクトにリンクする必要があります。

メトリクス構成ファイルの構文と各シンクで使用できるパラメーターは、構成ファイルの例である $SPARK_HOME/conf/metrics.properties.template に定義されています。

メトリクス構成ファイルの代わりにSpark構成パラメーターを使用する場合、関連するパラメーター名は、プレフィックス spark.metrics.conf. と構成の詳細(つまり、パラメーターは spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name] の形式をとります)で構成されます。この例は、Graphiteシンク用のSpark構成パラメーターのリストを示しています。

"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"

Sparkメトリクス構成のデフォルト値は次のとおりです。

"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"

追加のソースは、メトリクス構成ファイルまたは構成パラメーター spark.metrics.conf.[component_name].source.jvm.class=[source_name] を使用して構成できます。現在、JVMソースは利用可能な唯一のオプションソースです。例えば、次の構成パラメーターはJVMソースをアクティブ化します:"spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"

利用可能なメトリクスプロバイダーのリスト

Sparkで使用されるメトリクスには、ゲージ、カウンター、ヒストグラム、メーター、タイマーの複数のタイプがあります。詳細については、Dropwizardライブラリのドキュメントを参照してください。次のコンポーネントとメトリクスのリストでは、コンポーネントインスタンスとソース名前空間ごとにグループ化された、利用可能なメトリクスの名前と詳細をいくつか報告します。Sparkインストルメンテーションで使用される最も一般的なメトリクスのタイプは、ゲージとカウンターです。カウンターは、.count 接尾辞が付いていることで識別できます。タイマー、メーター、ヒストグラムはリストで注釈が付けられています。リストの残りの要素はゲージタイプのメトリクスです。メトリクスの大部分は、親コンポーネントインスタンスが構成されるとすぐにアクティブになります。一部のメトリクスでは、追加の構成パラメーターを介して有効にする必要もあります。詳細についてはリストに記載されています。

コンポーネントインスタンス = Driver

これは、最も多くのインストルメント化されたメトリクスを持つコンポーネントです。

コンポーネントインスタンス = Executor

これらのメトリクスは、Sparkエグゼキューターによって公開されます。

ソース = JVMソース

コンポーネントインスタンス = applicationMaster

注:YARNで実行する場合に適用されます

コンポーネントインスタンス = mesos_cluster

注:Mesosで実行する場合に適用されます

コンポーネントインスタンス = master

注:Sparkスタンドアロンでマスターとして実行する場合に適用されます

コンポーネントインスタンス = ApplicationSource

注:Sparkスタンドアロンでマスターとして実行する場合に適用されます

コンポーネントインスタンス = worker

注:Sparkスタンドアロンでワーカーとして実行する場合に適用されます

コンポーネントインスタンス = shuffleService

注:シャッフルサービスに適用されます

高度なインストルメンテーション

Sparkジョブのパフォーマンスをプロファイリングするのに役立ついくつかの外部ツールを使用できます。

Sparkは、カスタム計測コードをSparkアプリケーションに追加できるように、プラグインAPIも提供しています。Sparkにプラグインをロードするために利用できる構成キーは2つあります。

どちらも、org.apache.spark.api.plugin.SparkPluginインターフェイスを実装するクラス名のコンマ区切りリストを受け取ります。2つの名前が存在するのは、1つのリストをSparkのデフォルト構成ファイルに配置して、ユーザーが構成ファイルのリストを上書きすることなく、コマンドラインから他のプラグインを簡単に追加できるようにするためです。重複するプラグインは無視されます。