監視とインストルメンテーション
Spark アプリケーションを監視するには、Web UI、メトリクス、外部インストルメンテーションなど、いくつかの方法があります。
Webインターフェイス
各 SparkContext は、アプリケーションに関する有用な情報を表示する Web UI を起動します。デフォルトではポート 4040 で起動し、これには以下が含まれます。
- スケジューラステージとタスクのリスト
- RDD サイズとメモリ使用量の概要
- 環境情報。
- 実行中の Executor に関する情報
Web ブラウザで http://<driver-node>:4040 を開くだけで、このインターフェイスにアクセスできます。同じホストで複数の SparkContext が実行されている場合、それらは 4040 から始まる連続したポート (4041、4042 など) にバインドされます。
デフォルトでは、この情報はアプリケーションの存続期間中のみ利用可能であることに注意してください。後で Web UI を表示するには、アプリケーションを開始する前に spark.eventLog.enabled を true に設定します。これにより、Spark は UI に表示される情報をエンコードした Spark イベントを永続ストレージにログ記録するように構成されます。
事後表示
アプリケーションのイベントログが存在する場合、Spark の履歴サーバーを通じて UI を構築することは依然として可能です。履歴サーバーを起動するには、以下を実行します。
./sbin/start-history-server.sh
これにより、デフォルトで http://<server-url>:18080 に Web インターフェイスが作成され、未完了および完了したアプリケーションと試行が一覧表示されます。
ファイルシステムプロバイダクラス (以下の spark.history.provider を参照) を使用する場合、ベースログディレクトリは spark.history.fs.logDirectory 設定オプションで指定する必要があります。これは、各アプリケーションのイベントログを表すサブディレクトリを含む必要があります。
Spark ジョブ自体は、イベントをログ記録し、それらを同じ共有可能ディレクトリにログ記録するように構成する必要があります。たとえば、サーバーが hdfs://namenode/shared/spark-logs のログディレクトリで構成されている場合、クライアント側のオプションは次のようになります。
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs
履歴サーバーは次のように構成できます。
環境変数
| 環境変数 | 意味 |
|---|---|
SPARK_DAEMON_MEMORY |
履歴サーバーに割り当てるメモリ (デフォルト: 1g)。 |
SPARK_DAEMON_JAVA_OPTS |
履歴サーバーの JVM オプション (デフォルト: なし)。 |
SPARK_DAEMON_CLASSPATH |
履歴サーバーのクラスパス (デフォルト: なし)。 |
SPARK_PUBLIC_DNS |
履歴サーバーの公開アドレス。これが設定されていない場合、アプリケーション履歴へのリンクはサーバーの内部アドレスを使用する可能性があり、リンク切れにつながります (デフォルト: なし)。 |
SPARK_HISTORY_OPTS |
履歴サーバーの spark.history.* 設定オプション (デフォルト: なし)。 |
ローリングイベントログファイルへのコンパクションの適用
長時間実行されるアプリケーション (例: ストリーミング) は、非常に大きな単一イベントログファイルを生成する可能性があり、保守に多大なコストがかかり、Spark History Server での各更新ごとに大量のリソースを必要とします。
spark.eventLog.rolling.enabled と spark.eventLog.rolling.maxFileSize を有効にすると、単一の巨大なイベントログファイルではなく、ローリングイベントログファイルを使用できるようになります。これは、それ自体で一部のシナリオに役立つ可能性がありますが、ログの全体的なサイズを削減するのに役立つわけではありません。
Spark History Server は、Spark History Server で設定 spark.history.fs.eventLog.rolling.maxFilesToRetain を設定することにより、ローリングイベントログファイルをコンパクションしてログの全体的なサイズを削減できます。
詳細は後述しますが、コンパクションは損失を伴う操作であることに注意してください。コンパクションは、UI に表示されなくなる一部のイベントを破棄します。オプションを有効にする前に、どのイベントが破棄されるかを確認することをお勧めします。
コンパクションが発生すると、履歴サーバーはアプリケーションで利用可能なすべてのイベントログファイルを一覧表示し、保持される最小インデックスを持つファイルよりも小さいインデックスを持つイベントログファイルをコンパクションの対象と見なします。たとえば、アプリケーション A に 5 つのイベントログファイルがあり、spark.history.fs.eventLog.rolling.maxFilesToRetain が 2 に設定されている場合、最初の 3 つのログファイルがコンパクションのために選択されます。
対象を選択したら、それらを分析して除外できるイベントを特定し、除外が決定されたイベントを破棄して 1 つのコンパクトなファイルに書き換えます。
コンパクションは、古いデータを示すイベントを除外しようとします。現在、除外されるイベントの候補は次のとおりです。
- 完了したジョブおよび関連するステージ/タスクのイベント
- 終了した Executor のイベント
- 完了した SQL 実行および関連するジョブ/ステージ/タスクのイベント
書き換えが完了すると、元のログファイルは最善の努力で削除されます。履歴サーバーは元のログファイルを削除できない場合がありますが、履歴サーバーの操作には影響しません。
履歴サーバーは、コンパクション中に削減されるスペースが少ないと判断した場合、古いイベントログファイルをコンパクトしない場合があることに注意してください。ストリーミングクエリの場合、通常、各マイクロバッチが 1 つ以上のジョブをトリガーしてすぐに完了するため、コンパクションが実行されると予想されますが、バッチクエリでは多くの場合、コンパクションは実行されません。
また、これは Spark 3.0 で導入された新機能であり、完全に安定していない可能性があることに注意してください。状況によっては、コンパクションが予期しないよりも多くのイベントを除外する可能性があり、履歴サーバーでアプリケーションの UI に問題が発生する可能性があります。注意して使用してください。
Spark History Server の設定オプション
Spark History Server のセキュリティオプションについては、セキュリティページで詳しく説明しています。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
| spark.history.provider | org.apache.spark.deploy.history.FsHistoryProvider |
アプリケーション履歴バックエンドを実装するクラスの名前。現在、Spark が提供する実装は 1 つだけで、ファイルシステムに格納されているアプリケーションログを検索します。 | 1.1.0 |
| spark.history.fs.logDirectory | file:/tmp/spark-events | ファイルシステム履歴プロバイダの場合、ロードするアプリケーションイベントログを含むディレクトリの URL。これは、ローカルの file:// パス、HDFS パス hdfs://namenode/shared/spark-logs、または Hadoop API でサポートされている代替ファイルシステムの URL にすることができます。 |
1.1.0 |
| spark.history.fs.update.interval | 10s | ファイルシステム履歴プロバイダがログディレクトリ内の新しいまたは更新されたログをチェックする間隔。間隔を短くすると新しいアプリケーションをより速く検出できますが、更新されたアプリケーションを再読み取りするサーバー負荷が増加します。更新が完了するとすぐに、完了したアプリケーションと未完了のアプリケーションのリストに変更が反映されます。 | 1.4.0 |
| spark.history.retainedApplications | 50 | キャッシュで UI データを保持するアプリケーションの数。この上限を超えると、最も古いアプリケーションがキャッシュから削除されます。アプリケーションがキャッシュにない場合、UI からアクセスされるとディスクからロードする必要があります。 | 1.0.0 |
| spark.history.ui.maxApplications | Int.MaxValue | 履歴概要ページに表示するアプリケーションの数。アプリケーション UI は、履歴概要ページに表示されなくても、URL に直接アクセスすることで引き続き利用できます。 | 2.0.1 |
| spark.history.ui.port | 18080 | 履歴サーバーの Web インターフェイスがバインドされるポート。 | 1.0.0 |
| spark.history.kerberos.enabled | false | 履歴サーバーが Kerberos を使用してログインするかどうかを示します。これは、履歴サーバーがセキュアな Hadoop クラスタで HDFS ファイルにアクセスする場合に必要です。 | 1.0.1 |
| spark.history.kerberos.principal | (なし) | spark.history.kerberos.enabled=true の場合、履歴サーバーの Kerberos プリンシパル名を指定します。 |
1.0.1 |
| spark.history.kerberos.keytab | (なし) | spark.history.kerberos.enabled=true の場合、履歴サーバーの Kerberos キータブファイルの場所を指定します。 |
1.0.1 |
| spark.history.fs.cleaner.enabled | false | 履歴サーバーが定期的にストレージからイベントログをクリーンアップするかどうかを指定します。 | 1.4.0 |
| spark.history.fs.cleaner.interval | 1d | When spark.history.fs.cleaner.enabled=true, specifies how often the filesystem job history cleaner checks for files to delete. Files are deleted if at least one of two conditions holds. First, they're deleted if they're older than spark.history.fs.cleaner.maxAge. They are also deleted if the number of files is more than spark.history.fs.cleaner.maxNum, Spark tries to clean up the completed attempts from the applications based on the order of their oldest attempt time. |
1.4.0 |
| spark.history.fs.cleaner.maxAge | 7d | When spark.history.fs.cleaner.enabled=true, job history files older than this will be deleted when the filesystem history cleaner runs. |
1.4.0 |
| spark.history.fs.cleaner.maxNum | Int.MaxValue | When spark.history.fs.cleaner.enabled=true, specifies the maximum number of files in the event log directory. Spark tries to clean up the completed attempt logs to maintain the log directory under this limit. This should be smaller than the underlying file system limit like `dfs.namenode.fs-limits.max-directory-items` in HDFS. |
3.0.0 |
| spark.history.fs.endEventReparseChunkSize | 1m | How many bytes to parse at the end of log files looking for the end event. This is used to speed up generation of application listings by skipping unnecessary parts of event log files. It can be disabled by setting this config to 0. | 2.4.0 |
| spark.history.fs.inProgressOptimization.enabled | true | Enable optimized handling of in-progress logs. This option may leave finished applications that fail to rename their event logs listed as in-progress. | 2.4.0 |
| spark.history.fs.driverlog.cleaner.enabled | spark.history.fs.cleaner.enabled |
Specifies whether the History Server should periodically clean up driver logs from storage. | 3.0.0 |
| spark.history.fs.driverlog.cleaner.interval | spark.history.fs.cleaner.interval |
When spark.history.fs.driverlog.cleaner.enabled=true, specifies how often the filesystem driver log cleaner checks for files to delete. Files are only deleted if they are older than spark.history.fs.driverlog.cleaner.maxAge |
3.0.0 |
| spark.history.fs.driverlog.cleaner.maxAge | spark.history.fs.cleaner.maxAge |
When spark.history.fs.driverlog.cleaner.enabled=true, driver log files older than this will be deleted when the driver log cleaner runs. |
3.0.0 |
| spark.history.fs.numReplayThreads | 利用可能なコアの 25% | 履歴サーバーがイベントログを処理するために使用するスレッド数。 | 2.0.0 |
| spark.history.store.maxDiskUsage | 10g | キャッシュされたアプリケーション履歴情報が格納されるローカルディレクトリの最大ディスク使用量。 | 2.3.0 |
| spark.history.store.path | (なし) | アプリケーション履歴データをキャッシュするローカルディレクトリ。設定されている場合、履歴サーバーはメモリ内に履歴を保持する代わりにディスクにアプリケーションデータを格納します。ディスクに書き込まれたデータは、履歴サーバーが再起動された場合に再利用されます。 | 2.3.0 |
| spark.history.store.serializer | JSON | メモリ内の UI オブジェクトをディスクベースの KV ストアに書き込んだり読み取ったりするためのシリアライザー。JSON または PROTOBUF。JSON シリアライザーは Spark 3.4.0 より前の唯一の選択肢であるため、デフォルト値です。PROTOBUF シリアライザーは、JSON シリアライザーと比較して高速でコンパクトです。 | 3.4.0 |
| spark.history.custom.executor.log.url | (なし) | 履歴サーバーでクラスタマネージャのアプリケーションログ URL を使用する代わりに、外部ログサービスをサポートするためのカスタム Spark Executor ログ URL を指定します。Spark は、クラスタマネージャによって異なる可能性のあるパターンを使用して、いくつかのパス変数をサポートします。サポートされているパターンがある場合は、クラスタマネージャのドキュメントを確認してください。この設定はライブアプリケーションには影響しません。履歴サーバーにのみ影響します。 現時点では、YARN モードのみがこの設定をサポートしています。 |
3.0.0 |
| spark.history.custom.executor.log.url.applyIncompleteApplication | true | カスタム Spark Executor ログ URL を未完了のアプリケーションにも適用するかどうかを指定します。実行中のアプリケーションの Executor ログが元のログ URL として提供されるべき場合は、これを `false` に設定します。未完了のアプリケーションには、正常にシャットダウンしなかったアプリケーションが含まれる場合があることに注意してください。これが `true` に設定されていても、この設定はライブアプリケーションには影響しません。履歴サーバーにのみ影響します。 | 3.0.0 |
| spark.history.fs.eventLog.rolling.maxFilesToRetain | Int.MaxValue | コンパクションされていない状態として保持されるイベントログファイルの最大数。デフォルトでは、すべてのイベントログファイルが保持されます。技術的な理由から、最小値は 1 です。 詳細については、「古いイベントログファイルのコンパクションの適用」セクションをお読みください。 |
3.0.0 |
| spark.history.store.hybridStore.enabled | false | イベントログの解析時に HybridStore をストアとして使用するかどうか。HybridStore は、まずインメモリストアにデータを書き込み、インメモリストアへの書き込みが完了した後にバックグラウンドスレッドでディスクストアにダンプします。 | 3.1.0 |
| spark.history.store.hybridStore.maxMemoryUsage | 2g | HybridStore を作成するために使用できる最大メモリスペース。HybridStore はヒープメモリを共有するため、HybridStore が有効になっている場合、SHS のメモリオプションを通じてヒープメモリを増やす必要があります。 | 3.1.0 |
| spark.history.store.hybridStore.diskBackend | ROCKSDB | ハイブリッドストアで使用されるディスクベースストアを指定します。ROCKSDB または LEVELDB (非推奨)。 | 3.3.0 |
| spark.history.fs.update.batchSize | Int.MaxValue | 新しいイベントログファイルの更新バッチサイズを指定します。これは、各スキャンプロセスが妥当な時間内に完了することを制御し、初期スキャンが長くなりすぎて、大規模な環境で新しいイベントログファイルが適時にスキャンされるのをブロックするのを防ぎます。 | 3.4.0 |
これらの UI では、テーブルはヘッダーをクリックすることで並べ替え可能であり、遅いタスク、データスキューなどを簡単に特定できることに注意してください。
注意
-
履歴サーバーは、完了した Spark ジョブと未完了の Spark ジョブの両方を表示します。アプリケーションが障害後に複数回試行した場合、失敗した試行、進行中の未完了の試行、または最終的な成功した試行が表示されます。
-
未完了のアプリケーションは断続的にのみ更新されます。更新間の時間は、変更されたファイルをチェックする間隔 (
spark.history.fs.update.interval) によって定義されます。大規模なクラスタでは、更新間隔が大きな値に設定されている場合があります。実行中のアプリケーションを表示する実際の方法は、その独自の Web UI を表示することです。 -
完了したと自身を登録せずに終了したアプリケーションは、実行されていなくても未完了として一覧表示されます。これは、アプリケーションがクラッシュした場合に発生する可能性があります。
-
Spark ジョブの完了を通知する 1 つの方法は、Spark Context を明示的に停止すること (
sc.stop())、または Python ではwith SparkContext() as sc:構文を使用して Spark Context のセットアップとティアダウンを処理することです。
REST API
UI でメトリクスを表示するのに加えて、JSON としても利用できます。これにより、開発者は Spark 用の新しいビジュアライゼーションと監視ツールを簡単に作成できます。JSON は、実行中のアプリケーションと履歴サーバーの両方で利用できます。エンドポイントは /api/v1 にマウントされています。たとえば、履歴サーバーの場合、通常は http://<server-url>:18080/api/v1 でアクセスでき、実行中のアプリケーションの場合は https://:4040/api/v1 でアクセスできます。
API では、アプリケーションはアプリケーション ID [app-id] で参照されます。YARN で実行する場合、各アプリケーションには複数の試行がありますが、試行 ID はクラスタモードのアプリケーションにのみ存在し、クライアントモードのアプリケーションには存在しません。YARN クラスタモードのアプリケーションは、その [attempt-id] で識別できます。以下の API では、YARN クラスタモードで実行する場合、[app-id] は実際には [base-app-id]/[attempt-id] になります。ここで、[base-app-id] は YARN アプリケーション ID です。
| エンドポイント | 意味 |
|---|---|
/applications |
すべてのアプリケーションのリスト。?status=[completed|running] 指定された状態のアプリケーションのみを一覧表示します。?minDate=[date] 一覧表示する最も早い開始日時。?maxDate=[date] 一覧表示する最も遅い開始日時。?minEndDate=[date] 一覧表示する最も早い終了日時。?maxEndDate=[date] 一覧表示する最も遅い終了日時。?limit=[limit] 一覧表示するアプリケーションの数を制限します。例 ?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10 |
/applications/[app-id]/jobs |
指定されたアプリケーションのすべてのジョブのリスト。?status=[running|succeeded|failed|unknown] 特定の状態のジョブのみを一覧表示します。 |
/applications/[app-id]/jobs/[job-id] |
指定されたジョブの詳細。 |
/applications/[app-id]/stages |
指定されたアプリケーションのすべてのステージのリスト。?status=[active|complete|pending|failed] 指定された状態のステージのみを一覧表示します。?details=true すべてのステージをタスクデータとともに一覧表示します。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定されたタスクステータスのタスクのみを一覧表示します。クエリパラメータ taskStatus は、details=true の場合にのみ有効です。これは、複数の taskStatus もサポートします。たとえば、?details=true&taskStatus=SUCCESS&taskStatus=FAILED は、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。?withSummaries=true タスクメトリクス分布と Executor メトリクス分布を持つステージを一覧表示します。?quantiles=0.0,0.25,0.5,0.75,1.0 指定された四分位数でメトリクスを要約します。クエリパラメータ quantiles は、withSummaries=true の場合にのみ有効です。デフォルト値は 0.0,0.25,0.5,0.75,1.0 です。 |
/applications/[app-id]/stages/[stage-id] |
指定されたステージのすべての試行のリスト。?details=true 指定されたステージのタスクデータとともにすべての試行を一覧表示します。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定されたタスクステータスのタスクのみを一覧表示します。クエリパラメータ taskStatus は、details=true の場合にのみ有効です。これは、複数の taskStatus もサポートします。たとえば、?details=true&taskStatus=SUCCESS&taskStatus=FAILED は、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。?withSummaries=true 各試行のタスクメトリクス分布と Executor メトリクス分布を一覧表示します。?quantiles=0.0,0.25,0.5,0.75,1.0 指定された四分位数でメトリクスを要約します。クエリパラメータ quantiles は、withSummaries=true の場合にのみ有効です。デフォルト値は 0.0,0.25,0.5,0.75,1.0 です。例 ?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] |
指定されたステージ試行の詳細。?details=true 指定されたステージ試行のすべてのタスクデータを一覧表示します。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定されたタスクステータスのタスクのみを一覧表示します。クエリパラメータ taskStatus は、details=true の場合にのみ有効です。これは、複数の taskStatus もサポートします。たとえば、?details=true&taskStatus=SUCCESS&taskStatus=FAILED は、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。?withSummaries=true 指定されたステージ試行のタスクメトリクス分布と Executor メトリクス分布を一覧表示します。?quantiles=0.0,0.25,0.5,0.75,1.0 指定された四分位数でメトリクスを要約します。クエリパラメータ quantiles は、withSummaries=true の場合にのみ有効です。デフォルト値は 0.0,0.25,0.5,0.75,1.0 です。例 ?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary |
指定されたステージ試行のすべてのタスクの要約メトリクス。?quantiles 指定された四分位数でメトリクスを要約します。例: ?quantiles=0.01,0.5,0.99 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList |
指定されたステージ試行のすべてのタスクのリスト。?offset=[offset]&length=[len] 指定された範囲のタスクを一覧表示します。?sortBy=[runtime|-runtime] タスクを並べ替えます。?status=[running|success|killed|failed|unknown] 指定された状態のタスクのみを一覧表示します。例: ?offset=10&length=50&sortBy=runtime&status=running |
/applications/[app-id]/executors |
指定されたアプリケーションのすべてのアクティブな Executor のリスト。 |
/applications/[app-id]/executors/[executor-id]/threads |
指定されたアクティブな Executor で実行されているすべてのスレッドのスタックトレース。履歴サーバーからは利用できません。 |
/applications/[app-id]/allexecutors |
指定されたアプリケーションのすべての Executor (アクティブおよび終了済み) のリスト。 |
/applications/[app-id]/storage/rdd |
指定されたアプリケーションの格納されている RDD のリスト。 |
/applications/[app-id]/storage/rdd/[rdd-id] |
指定された RDD の格納ステータスの詳細。 |
/applications/[base-app-id]/logs |
zip ファイル内のファイルとして、指定されたアプリケーションのすべての試行のイベントログをダウンロードします。 |
/applications/[base-app-id]/[attempt-id]/logs |
指定されたアプリケーション試行のイベントログを zip ファイルとしてダウンロードします。 |
/applications/[app-id]/streaming/statistics |
ストリーミングコンテキストの統計。 |
/applications/[app-id]/streaming/receivers |
すべてのストリーミング受信機のリスト。 |
/applications/[app-id]/streaming/receivers/[stream-id] |
指定された受信機の詳細。 |
/applications/[app-id]/streaming/batches |
保持されているすべてのバッチのリスト。 |
/applications/[app-id]/streaming/batches/[batch-id] |
指定されたバッチの詳細。 |
/applications/[app-id]/streaming/batches/[batch-id]/operations |
指定されたバッチのすべての出力操作のリスト。 |
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] |
指定された操作と指定されたバッチの詳細。 |
/applications/[app-id]/sql |
指定されたアプリケーションのすべてのクエリのリスト。?details=[true (default) | false] Spark プランノードの詳細を表示/非表示にします。?planDescription=[true (default) | false] Physical Plan のサイズが大きい場合に、Physical planDescription をオンデマンドで有効/無効にします。?offset=[offset]&length=[len] 指定された範囲のクエリを一覧表示します。 |
/applications/[app-id]/sql/[execution-id] |
指定されたクエリの詳細。?details=[true (default) | false] 指定されたクエリの詳細に加えて、メトリクスの詳細を表示/非表示にします。?planDescription=[true (default) | false] 指定されたクエリの Physical Plan のサイズが大きい場合に、Physical planDescription をオンデマンドで有効/無効にします。 |
/applications/[app-id]/environment |
指定されたアプリケーションの環境詳細。 |
/version |
現在の Spark バージョンを取得します。 |
取得できるジョブとステージの数は、スタンドアロン Spark UI と同じ保持メカニズムによって制限されます。"spark.ui.retainedJobs" はジョブのガベージコレクションをトリガーする閾値を定義し、spark.ui.retainedStages はステージの閾値を定義します。ガベージコレクションは再生時に行われることに注意してください。これらの値を増やして履歴サーバーを再起動することで、より多くのエントリを取得できます。
Executor タスクメトリクス
REST API は、タスク実行の粒度で Spark Executor によって収集されたタスクメトリクスの値を提供します。メトリクスは、パフォーマンスのトラブルシューティングとワークロードの特性評価に使用できます。利用可能なメトリクスのリストと簡単な説明
| Spark Executor タスクメトリクス名 | 簡単な説明 |
|---|---|
| executorRunTime | Executor がこのタスクの実行に費やした経過時間。これには、シャッフルデータの取得時間も含まれます。値はミリ秒単位で表されます。 |
| executorCpuTime | Executor がこのタスクの実行に費やした CPU 時間。これには、シャッフルデータの取得時間も含まれます。値はナノ秒単位で表されます。 |
| executorDeserializeTime | このタスクのデシリアライズに費やされた経過時間。値はミリ秒単位で表されます。 |
| executorDeserializeCpuTime | このタスクのデシリアライズに Executor で費やされた CPU 時間。値はナノ秒単位で表されます。 |
| resultSize | このタスクが TaskResult としてドライバに返送したバイト数。 |
| jvmGCTime | このタスクの実行中に JVM がガベージコレクションに費やした経過時間。値はミリ秒単位で表されます。 |
| ConcurrentGCCount | このメトリクスは、発生したコレクションの総数を示します。Java Garbage Collector が G1 Concurrent GC の場合にのみ適用されます。 |
| ConcurrentGCTime | このメトリクスは、累積されたコレクションの経過時間(ミリ秒)の近似値を示します。Java Garbage Collector が G1 Concurrent GC の場合にのみ適用されます。 |
| resultSerializationTime | タスク結果のシリアライゼーションに費やされた経過時間。値はミリ秒単位で表されます。 |
| memoryBytesSpilled | このタスクによってメモリからスピルされたバイト数。 |
| diskBytesSpilled | このタスクによってディスクにスピルされたバイト数。 |
| peakExecutionMemory | シャッフル、集計、結合中に作成された内部データ構造によって使用されたピークメモリ。このアキュムレータの値は、このタスクで作成されたすべてのデータ構造のピークサイズの合計に近似しているはずです。SQL ジョブの場合、これはすべてのアンセーフオペレータと ExternalSort のみを追跡します。 |
| inputMetrics.* | org.apache.spark.rdd.HadoopRDD から、または永続化されたデータからデータを読み取るに関連するメトリクス。 |
| .bytesRead | 読み取られたバイトの総数。 |
| .recordsRead | 読み取られたレコードの総数。 |
| outputMetrics.* | 外部 (例: 分散ファイルシステム) へのデータ書き込みに関連するメトリクス。出力を持つタスクでのみ定義されます。 |
| .bytesWritten | 書き込まれたバイトの総数 |
| .recordsWritten | 書き込まれたレコードの総数 |
| shuffleReadMetrics.* | シャッフル読み取り操作に関連するメトリクス。 |
| .recordsRead | シャッフル操作で読み取られたレコード数 |
| .remoteBlocksFetched | シャッフル操作で取得されたリモートブロックの数 |
| .localBlocksFetched | シャッフル操作で取得されたローカルブロックの数 (リモート Executor から読み取られたものではなく) |
| .totalBlocksFetched | シャッフル操作で取得されたブロックの数 (ローカルとリモートの両方) |
| .remoteBytesRead | シャッフル操作で読み取られたリモートバイト数 |
| .localBytesRead | シャッフル操作でローカルディスクから読み取られたバイト数 (リモート Executor から読み取られたものではなく) |
| .totalBytesRead | シャッフル操作で読み取られたバイト数 (ローカルとリモートの両方) |
| .remoteBytesReadToDisk | シャッフル操作でディスクに読み取られたリモートバイト数。大きなブロックは、デフォルトの動作であるメモリに読み込まれるのではなく、シャッフル読み取り操作のためにディスクにフェッチされます。 |
| .fetchWaitTime | タスクがリモートシャッフルブロックを待機するのに費やした時間。これは、シャッフル入力データでブロックされる時間のみを含みます。たとえば、タスクがブロック A の処理をまだ終えていない間にブロック B がフェッチされている場合、ブロック B でブロックされているとは見なされません。値はミリ秒単位で表されます。 |
| shuffleWriteMetrics.* | シャッフルデータを書き込む操作に関連するメトリクス。 |
| .bytesWritten | シャッフル操作で書き込まれたバイト数 |
| .recordsWritten | シャッフル操作で書き込まれたレコード数 |
| .writeTime | ディスクまたはバッファキャッシュへの書き込みをブロックするのに費やされた時間。値はナノ秒単位で表されます。 |
Executor メトリクス
Executor レベルのメトリクスは、Executor 自体のパフォーマンスメトリクス (JVM ヒープメモリ、GC 情報など) を説明するために、Heartbeat の一部として各 Executor からドライバに送信されます。Executor メトリクス値とその測定されたメモリピーク値は、REST API を介して JSON 形式と Prometheus 形式で公開されます。JSON エンドポイントは /applications/[app-id]/executors で公開され、Prometheus エンドポイントは /metrics/executors/prometheus で公開されます。さらに、Executor メモリメトリクスの集計されたステージごとのピーク値は、spark.eventLog.logStageExecutorMetrics が true の場合にイベントログに書き込まれます。Executor メモリメトリクスは、Dropwizard メトリクスライブラリに基づいた Spark メトリクスシステムを介して公開される場合もあります。利用可能なメトリクスのリストと簡単な説明
| Executor レベルメトリクス名 | 簡単な説明 |
|---|---|
| rddBlocks | この Executor のブロックマネージャにある RDD ブロック。 |
| memoryUsed | この Executor によって使用されているストレージメモリ。 |
| diskUsed | この Executor による RDD ストレージのために使用されているディスクスペース。 |
| totalCores | この Executor で利用可能なコア数。 |
| maxTasks | この Executor で同時に実行できるタスクの最大数。 |
| activeTasks | 現在実行中のタスク数。 |
| failedTasks | この Executor で失敗したタスク数。 |
| completedTasks | この Executor で完了したタスク数。 |
| totalTasks | この Executor のタスクの総数 (実行中、失敗、完了)。 |
| totalDuration | この Executor でタスクを実行するために JVM が費やした経過時間。値はミリ秒単位で表されます。 |
| totalGCTime | この Executor で合計されたガベージコレクションに JVM が費やした経過時間。値はミリ秒単位で表されます。 |
| totalInputBytes | この Executor で合計された入力バイト数。 |
| totalShuffleRead | この Executor で合計されたシャッフル読み取りバイト数。 |
| totalShuffleWrite | この Executor で合計されたシャッフル書き込みバイト数。 |
| maxMemory | ストレージに利用可能なメモリの総量 (バイト単位)。 |
| memoryMetrics.* | メモリメトリクスの現在の値 |
| .usedOnHeapStorageMemory | 現在ストレージに使用されているヒープ上のメモリ (バイト単位)。 |
| .usedOffHeapStorageMemory | 現在ストレージに使用されているヒープ外メモリ (バイト単位)。 |
| .totalOnHeapStorageMemory | ストレージに利用可能なヒープ上のメモリの総量 (バイト単位)。この量は、MemoryManager の実装によって時間とともに変動する可能性があります。 |
| .totalOffHeapStorageMemory | ストレージに利用可能なヒープ外メモリの総量 (バイト単位)。この量は、MemoryManager の実装によって時間とともに変動する可能性があります。 |
| peakMemoryMetrics.* | メモリ (および GC) メトリクスのピーク値 |
| .JVMHeapMemory | オブジェクト割り当てに使用されるヒープのピークメモリ使用量。ヒープは 1 つ以上のメモリプールで構成されます。返されたメモリ使用量の使用済みおよびコミットされたサイズは、すべてのヒープメモリプールのこれらの値の合計ですが、返されたメモリ使用量の初期および最大サイズは、ヒープメモリの設定を表し、すべてのヒープメモリプールの合計ではない場合があります。返されたメモリ使用量の使用済みメモリ量は、ライブオブジェクトと、収集されていないガベージオブジェクトの両方によって占有されているメモリ量です。 |
| .JVMOffHeapMemory | Java Virtual Machine によって使用されるヒープ外メモリのピークメモリ使用量。ヒープ外メモリは 1 つ以上のメモリプールで構成されます。返されたメモリ使用量の使用済みおよびコミットされたサイズは、すべてのヒープ外メモリプールのこれらの値の合計ですが、返されたメモリ使用量の初期および最大サイズは、ヒープ外メモリの設定を表し、すべてのヒープ外メモリプールの合計ではない場合があります。 |
| .OnHeapExecutionMemory | 使用中のヒープ上実行メモリのピーク値 (バイト単位)。 |
| .OffHeapExecutionMemory | 使用中のヒープ外実行メモリのピーク値 (バイト単位)。 |
| .OnHeapStorageMemory | 使用中のヒープ上ストレージメモリのピーク値 (バイト単位)。 |
| .OffHeapStorageMemory | 使用中のヒープ外ストレージメモリのピーク値 (バイト単位)。 |
| .OnHeapUnifiedMemory | ヒープ上メモリ (実行およびストレージ) のピーク。 |
| .OffHeapUnifiedMemory | ヒープ外メモリ (実行およびストレージ) のピーク。 |
| .DirectPoolMemory | JVM がダイレクトバッファプールに使用しているメモリのピーク値 (java.lang.management.BufferPoolMXBean) |
| .MappedPoolMemory | JVM がマップバッファプールに使用しているメモリのピーク値 (java.lang.management.BufferPoolMXBean) |
| .ProcessTreeJVMVMemory | 仮想メモリサイズ (バイト単位)。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
| .ProcessTreeJVMRSSMemory | Resident Set Size: プロセスが物理メモリに保持しているページの数。これは、テキスト、データ、またはスタック領域をカウントするページのみです。需要ロードされていないページや、スワップアウトされたページは含まれません。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
| .ProcessTreePythonVMemory | Python の仮想メモリサイズ (バイト単位)。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
| .ProcessTreePythonRSSMemory | Python の Resident Set Size。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
| .ProcessTreeOtherVMemory | その他の種類のプロセスの仮想メモリサイズ (バイト単位)。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
| .ProcessTreeOtherRSSMemory | その他の種類のプロセスの Resident Set Size。spark.executor.processTreeMetrics.enabled が true の場合に有効になります。 |
| .MinorGCCount | マイナー GC の総回数。たとえば、ガベージコレクタは Copy、PS Scavenge、ParNew、G1 Young Generation などです。 |
| .MinorGCTime | マイナー GC の経過時間合計。値はミリ秒単位で表されます。 |
| .MajorGCCount | メジャー GC の総回数。たとえば、ガベージコレクタは MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation などです。 |
| .MajorGCTime | メジャー GC の経過時間合計。値はミリ秒単位で表されます。 |
RSS と Vmem の計算は proc(5) に基づいています。
API バージョンポリシー
これらのエンドポイントは、アプリケーションを構築しやすくするために、強力にバージョン管理されています。特に、Spark は以下を保証します。
- エンドポイントは、1 つのバージョンから削除されることはありません
- 特定の API エンドポイントの個々のフィールドが削除されることはありません
- 新しいエンドポイントが追加される可能性があります
- 既存のエンドポイントに新しいフィールドが追加される可能性があります
- API の新しいバージョンは、将来、別のエンドポイント (例:
api/v2) として追加される可能性があります。新しいバージョンは、後方互換性が保証されている必要はありません。 - API バージョンは、新しい API バージョンと共存してから少なくとも 1 つのマイナーリリース後に削除される可能性があります。
実行中のアプリケーションの UI を確認する場合でも、applications/[app-id] の部分は、アプリケーションが 1 つしかない場合でも必要です。たとえば、実行中のアプリのジョブリストを表示するには、https://:4040/api/v1/applications/[app-id]/jobs にアクセスします。これは、両方のモードでパスを一貫させるためです。
メトリクス
Spark は、Dropwizard Metrics Library に基づいた設定可能なメトリクスシステムを備えています。これにより、ユーザーは Spark メトリクスを HTTP、JMX、CSV ファイルなど、さまざまなシンクにレポートできます。メトリクスは、Spark コードベースに埋め込まれたソースによって生成されます。これらは、特定の活動と Spark コンポーネントのインストルメンテーションを提供します。メトリクスシステムは、Spark が $SPARK_HOME/conf/metrics.properties に存在すると想定する設定ファイルを通じて構成されます。カスタムファイル場所は、spark.metrics.conf 設定プロパティを介して指定できます。設定ファイルを使用する代わりに、spark.metrics.conf. というプレフィックスを持つ一連の設定パラメータを使用できます。デフォルトでは、ドライバまたは Executor メトリクスに使用されるルート名前空間は spark.app.id の値です。しかし、多くの場合、ユーザーはドライバと Executor のアプリ全体でメトリクスを追跡できるようにしたいと考えており、これはアプリの各呼び出しで変更されるため、アプリケーション ID (つまり spark.app.id) では困難です。このようなユースケースでは、spark.metrics.namespace 設定プロパティを使用して、メトリクスレポート用のカスタム名前空間を指定できます。たとえば、ユーザーがメトリクス名前空間をアプリケーションの名前に設定したい場合、spark.metrics.namespace プロパティを ${spark.app.name} のような値に設定できます。この値は、Spark によって適切に展開され、メトリクスシステムのルート名前空間として使用されます。ドライバと Executor 以外のメトリクスは、spark.app.id でプレフィックスが付けられることはありません。また、spark.metrics.namespace プロパティもそのようなメトリクスには影響しません。
Spark のメトリクスは、Spark コンポーネントに対応するさまざまな *インスタンス* に分離されています。各インスタンス内で、メトリクスがレポートされる一連のシンクを構成できます。現在サポートされているインスタンスは次のとおりです。
master: Spark スタンドアロンマスタープロセス。applications: マスター内の、さまざまなアプリケーションをレポートするコンポーネント。worker: Spark スタンドアロンワーカープロセス。executor: Spark Executor。driver: Spark ドライバプロセス (SparkContext が作成されるプロセス)。shuffleService: Spark シャッフルサービス。applicationMaster: YARN で実行する場合の Spark ApplicationMaster。
各インスタンスは、0 個以上の *シンク* にレポートできます。シンクは org.apache.spark.metrics.sink パッケージに含まれています。
ConsoleSink: メトリクス情報をコンソールにログ記録します。CSVSink: メトリクスデータを定期的に CSV ファイルにエクスポートします。JmxSink: JMX コンソールで表示するためのメトリクスを登録します。MetricsServlet: 既存の Spark UI 内にサーブレットを追加し、メトリクスデータを JSON データとして提供します。PrometheusServlet: (実験的) 既存の Spark UI 内にサーブレットを追加し、メトリクスデータを Prometheus 形式で提供します。GraphiteSink: メトリクスを Graphite ノードに送信します。Slf4jSink: メトリクスをログエントリとして slf4j に送信します。StatsdSink: メトリクスを StatsD ノードに送信します。
Prometheus Servlet は、Metrics Servlet と REST API によって公開される JSON データを時系列形式でミラーリングします。以下は、同等の Prometheus Servlet エンドポイントです。
| Component | ポート | JSON エンドポイント | Prometheus エンドポイント |
|---|---|---|---|
| Master | 8080 | /metrics/master/json/ |
/metrics/master/prometheus/ |
| Master | 8080 | /metrics/applications/json/ |
/metrics/applications/prometheus/ |
| Worker | 8081 | /metrics/json/ |
/metrics/prometheus/ |
| Driver | 4040 | /metrics/json/ |
/metrics/prometheus/ |
| Driver | 4040 | /api/v1/applications/{id}/executors/ |
/metrics/executors/prometheus/ |
Spark は、ライセンス上の制限によりデフォルトビルドに含まれていない Ganglia シンクもサポートしています。
GangliaSink: メトリクスを Ganglia ノードまたはマルチキャストグループに送信します。
GangliaSink をインストールするには、Spark のカスタムビルドを実行する必要があります。このライブラリを埋め込むと、LGPL ライセンスのコードが Spark パッケージに含まれることに注意してください。sbt ユーザーは、ビルド前に SPARK_GANGLIA_LGPL 環境変数を設定します。Maven ユーザーは、-Pspark-ganglia-lgpl プロファイルを有効にします。クラスタの Spark ビルドを変更するだけでなく、ユーザーアプリケーションは spark-ganglia-lgpl アーティファクトにリンクする必要があります。
メトリクス設定ファイルの構文と、各シンクで利用可能なパラメータは、例の設定ファイル $SPARK_HOME/conf/metrics.properties.template で定義されています。
メトリクス設定ファイルではなく Spark 設定パラメータを使用する場合、関連するパラメータ名はプレフィックス spark.metrics.conf. に設定詳細が続くことで構成されます。つまり、パラメータは spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name] の形式を取ります。この例では、Graphite シンクの Spark 設定パラメータのリストを示します。
"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"
Spark メトリクス設定のデフォルト値は次のとおりです。
"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"
追加のソースは、メトリクス設定ファイルまたは設定パラメータ spark.metrics.conf.[component_name].source.jvm.class=[source_name] を使用して構成できます。現在、JVM ソースは唯一利用可能なオプションソースです。たとえば、次の設定パラメータは JVM ソースをアクティブにします: "spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"
利用可能なメトリクスプロバイダのリスト
Spark で使用されるメトリクスは、ゲージ、カウンター、ヒストグラム、メーター、タイマーなど、複数のタイプがあります。詳細は Dropwizard ライブラリのドキュメントを参照してください。次のコンポーネントとメトリクスのリストは、利用可能なメトリクスの名前と詳細を、コンポーネントインスタンスとソース名前空間ごとにグループ化して報告します。Spark インストルメンテーションで最も一般的に使用されるメトリクスのタイプは、ゲージとカウンターです。カウンターは .count サフィックスがあることで認識できます。タイマー、メーター、ヒストグラムはリストに注釈付けされています。リストの残りの要素はゲージタイプのメトリクスです。メトリクスの大多数は、親コンポーネントインスタンスが構成されるとすぐにアクティブになります。一部のメトリクスは、追加の設定パラメータを介して有効にする必要もあります。詳細はリストに報告されています。
コンポーネントインスタンス = Driver
これは、最も多くのインストルメント化されたメトリクスを持つコンポーネントです。
- namespace=BlockManager
- disk.diskSpaceUsed_MB
- memory.maxMem_MB
- memory.maxOffHeapMem_MB
- memory.maxOnHeapMem_MB
- memory.memUsed_MB
- memory.offHeapMemUsed_MB
- memory.onHeapMemUsed_MB
- memory.remainingMem_MB
- memory.remainingOffHeapMem_MB
- memory.remainingOnHeapMem_MB
- namespace=HiveExternalCatalog
- 注意: これらのメトリクスは、設定パラメータ:
spark.metrics.staticSources.enabled(デフォルトは true) に条件付きです。 - fileCacheHits.count
- filesDiscovered.count
- hiveClientCalls.count
- parallelListingJobCount.count
- partitionsFetched.count
- 注意: これらのメトリクスは、設定パラメータ:
- namespace=CodeGenerator
- 注意: これらのメトリクスは、設定パラメータ:
spark.metrics.staticSources.enabled(デフォルトは true) に条件付きです。 - compilationTime (histogram)
- generatedClassSize (histogram)
- generatedMethodSize (histogram)
- sourceCodeSize (histogram)
- 注意: これらのメトリクスは、設定パラメータ:
- namespace=DAGScheduler
- job.activeJobs
- job.allJobs
- messageProcessingTime (timer)
- stage.failedStages
- stage.runningStages
- stage.waitingStages
- namespace=LiveListenerBus
- listenerProcessingTime.org.apache.spark.HeartbeatReceiver (timer)
- listenerProcessingTime.org.apache.spark.scheduler.EventLoggingListener (timer)
- listenerProcessingTime.org.apache.spark.status.AppStatusListener (timer)
- numEventsPosted.count
- queue.appStatus.listenerProcessingTime (timer)
- queue.appStatus.numDroppedEvents.count
- queue.appStatus.size
- queue.eventLog.listenerProcessingTime (timer)
- queue.eventLog.numDroppedEvents.count
- queue.eventLog.size
- queue.executorManagement.listenerProcessingTime (timer)
- namespace=appStatus (すべてのメトリクスタイプ=counter)
- 注意: Spark 3.0 で導入されました。設定パラメータ:
spark.metrics.appStatusSource.enabled(デフォルトは true) に条件付き - stages.failedStages.count
- stages.skippedStages.count
- stages.completedStages.count
- tasks.blackListedExecutors.count // 非推奨、excludedExecutors を使用
- tasks.excludedExecutors.count
- tasks.completedTasks.count
- tasks.failedTasks.count
- tasks.killedTasks.count
- tasks.skippedTasks.count
- tasks.unblackListedExecutors.count // 非推奨、unexcludedExecutors を使用
- tasks.unexcludedExecutors.count
- jobs.succeededJobs
- jobs.failedJobs
- jobDuration
- 注意: Spark 3.0 で導入されました。設定パラメータ:
- namespace=AccumulatorSource
- 注意: アキュムレータをメトリクスシステムにアタッチするための、ユーザー提供のソース。
- DoubleAccumulatorSource
- LongAccumulatorSource
- namespace=spark.streaming
- 注意: これは Spark Structured Streaming にのみ適用されます。設定パラメータ:
spark.sql.streaming.metricsEnabled=true(デフォルトは false) に条件付き - eventTime-watermark
- inputRate-total
- latency
- processingRate-total
- states-rowsTotal
- states-usedBytes
- 注意: これは Spark Structured Streaming にのみ適用されます。設定パラメータ:
- namespace=JVMCPU
- jvmCpuTime
- namespace=executor
- 注意: これらのメトリクスは、ローカルモードでのみドライバで利用できます。
- この名前空間で利用可能なメトリクスの完全なリストは、Executor コンポーネントインスタンスに対応するエントリで確認できます。
- namespace=ExecutorMetrics
- 注意: これらのメトリクスは、設定パラメータ:
spark.metrics.executorMetricsSource.enabled(デフォルトは true) に条件付きです。 - このソースにはメモリ関連のメトリクスが含まれています。この名前空間で利用可能なメトリクスの完全なリストは、Executor コンポーネントインスタンスに対応するエントリで確認できます。
- 注意: これらのメトリクスは、設定パラメータ:
- namespace=ExecutorAllocationManager
- 注意: これらのメトリクスは、動的割り当てを使用している場合にのみ発行されます。設定パラメータ
spark.dynamicAllocation.enabled(デフォルトは false) に条件付き。 - executors.numberExecutorsToAdd
- executors.numberExecutorsPendingToRemove
- executors.numberAllExecutors
- executors.numberTargetExecutors
- executors.numberMaxNeededExecutors
- executors.numberDecommissioningExecutors
- executors.numberExecutorsGracefullyDecommissioned.count
- executors.numberExecutorsDecommissionUnfinished.count
- executors.numberExecutorsExitedUnexpectedly.count
- executors.numberExecutorsKilledByDriver.count
- 注意: これらのメトリクスは、動的割り当てを使用している場合にのみ発行されます。設定パラメータ
- namespace=plugin.<Plugin Class Name>
- オプションの名前空間。この名前空間のメトリクスは、ユーザー提供のコードによって定義され、Spark プラグイン API を使用して構成されます。「高度なインストルメンテーション」セクションで、カスタムプラグインを Spark にロードする方法を参照してください。
コンポーネントインスタンス = Executor
これらのメトリクスは Spark Executor によって公開されます。
- namespace=executor (メトリクスは counter または gauge タイプ)
- 注記
spark.executor.metrics.fileSystemSchemes(デフォルト:file,hdfs) は、公開されるファイルシステムメトリクスを決定します。
- bytesRead.count
- bytesWritten.count
- cpuTime.count
- deserializeCpuTime.count
- deserializeTime.count
- diskBytesSpilled.count
- filesystem.file.largeRead_ops
- filesystem.file.read_bytes
- filesystem.file.read_ops
- filesystem.file.write_bytes
- filesystem.file.write_ops
- filesystem.hdfs.largeRead_ops
- filesystem.hdfs.read_bytes
- filesystem.hdfs.read_ops
- filesystem.hdfs.write_bytes
- filesystem.hdfs.write_ops
- jvmGCTime.count
- memoryBytesSpilled.count
- recordsRead.count
- recordsWritten.count
- resultSerializationTime.count
- resultSize.count
- runTime.count
- shuffleBytesWritten.count
- shuffleFetchWaitTime.count
- shuffleLocalBlocksFetched.count
- shuffleLocalBytesRead.count
- shuffleRecordsRead.count
- shuffleRecordsWritten.count
- shuffleRemoteBlocksFetched.count
- shuffleRemoteBytesRead.count
- shuffleRemoteBytesReadToDisk.count
- shuffleTotalBytesRead.count
- shuffleWriteTime.count
- プッシュベースシャッフル関連メトリクス
- shuffleCorruptMergedBlockChunks
- shuffleMergedFetchFallbackCount
- shuffleMergedRemoteBlocksFetched
- shuffleMergedLocalBlocksFetched
- shuffleMergedRemoteChunksFetched
- shuffleMergedLocalChunksFetched
- shuffleMergedRemoteBytesRead
- shuffleMergedLocalBytesRead
- shuffleRemoteReqsDuration
- shuffleMergedRemoteReqsDuration
- succeededTasks.count
- threadpool.activeTasks
- threadpool.completeTasks
- threadpool.currentPool_size
- threadpool.maxPool_size
- threadpool.startedTasks
- 注記
- namespace=ExecutorMetrics
- 注記
- これらのメトリクスは、設定パラメータ:
spark.metrics.executorMetricsSource.enabled(デフォルト値は true) に条件付きです。 - ExecutorMetrics は、Executor およびドライバのためにスケジュールされたハートビートプロセスの一部として定期的に更新されます:
spark.executor.heartbeatInterval(デフォルト値は 10 秒)。 - Executor メモリメトリクス用のオプションの高速ポーリングメカニズムが利用可能です。これは、設定パラメータ
spark.executor.metrics.pollingIntervalを使用してポーリング間隔 (ミリ秒単位) を設定することでアクティブ化できます。
- これらのメトリクスは、設定パラメータ:
- JVMHeapMemory
- JVMOffHeapMemory
- OnHeapExecutionMemory
- OnHeapStorageMemory
- OnHeapUnifiedMemory
- OffHeapExecutionMemory
- OffHeapStorageMemory
- OffHeapUnifiedMemory
- DirectPoolMemory
- MappedPoolMemory
- MinorGCCount
- MinorGCTime
- MajorGCCount
- MajorGCTime
- 「ProcessTree*」メトリクスカウンター
- ProcessTreeJVMVMemory
- ProcessTreeJVMRSSMemory
- ProcessTreePythonVMemory
- ProcessTreePythonRSSMemory
- ProcessTreeOtherVMemory
- ProcessTreeOtherRSSMemory
- 注意: 「ProcessTree*」メトリクスは、特定の条件下でのみ収集されます。条件は、
/procファイルシステムが存在し、spark.executor.processTreeMetrics.enabled=trueであることの論理 AND です。「ProcessTree*」メトリクスは、これらの条件が満たされない場合は 0 を返します。
- 注記
- namespace=JVMCPU
- jvmCpuTime
- namespace=NettyBlockTransfer
- shuffle-client.usedDirectMemory
- shuffle-client.usedHeapMemory
- shuffle-server.usedDirectMemory
- shuffle-server.usedHeapMemory
- namespace=HiveExternalCatalog
- 注意: これらのメトリクスは、設定パラメータ:
spark.metrics.staticSources.enabled(デフォルトは true) に条件付きです。 - fileCacheHits.count
- filesDiscovered.count
- hiveClientCalls.count
- parallelListingJobCount.count
- partitionsFetched.count
- 注意: これらのメトリクスは、設定パラメータ:
- namespace=CodeGenerator
- 注意: これらのメトリクスは、設定パラメータ:
spark.metrics.staticSources.enabled(デフォルトは true) に条件付きです。 - compilationTime (histogram)
- generatedClassSize (histogram)
- generatedMethodSize (histogram)
- sourceCodeSize (histogram)
- 注意: これらのメトリクスは、設定パラメータ:
- namespace=plugin.<Plugin Class Name>
- オプションの名前空間。この名前空間のメトリクスは、ユーザー提供のコードによって定義され、Spark プラグイン API を使用して構成されます。「高度なインストルメンテーション」セクションで、カスタムプラグインを Spark にロードする方法を参照してください。
ソース = JVMソース
注記
- 関連する
metrics.propertiesファイルのエントリまたは設定パラメータ:spark.metrics.conf.*.source.jvm.class=org.apache.spark.metrics.source.JvmSourceを設定して、このソースをアクティブ化します。 - これらのメトリクスは、設定パラメータ:
spark.metrics.staticSources.enabled(デフォルトは true) に条件付きです。 - このソースは、ドライバおよび Executor インスタンスで利用可能であり、他のインスタンスでも利用可能です。
- このソースは、Dropwizard/Codahale Metric Sets for JVM instrumentation を使用して JVM メトリクスに関する情報を提供し、特に BufferPoolMetricSet、GarbageCollectorMetricSet、MemoryUsageGaugeSet のメトリクスセットを提供します。
コンポーネントインスタンス = applicationMaster
注意: YARN で実行する場合に適用されます。
- numContainersPendingAllocate
- numExecutorsFailed
- numExecutorsRunning
- numLocalityAwareTasks
- numReleasedContainers
コンポーネントインスタンス = master
注意: Spark スタンドアロンのマスターとして実行する場合に適用されます。
- workers
- aliveWorkers
- apps
- waitingApps
コンポーネントインスタンス = ApplicationSource
注意: Spark スタンドアロンのマスターとして実行する場合に適用されます。
- status
- runtime_ms
- cores
コンポーネントインスタンス = worker
注意: Spark スタンドアロンのワーカーとして実行する場合に適用されます。
- executors
- coresUsed
- memUsed_MB
- coresFree
- memFree_MB
コンポーネントインスタンス = shuffleService
注意: シャッフルサービスに適用されます。
- blockTransferRate (meter) - ブロック転送のレート
- blockTransferMessageRate (meter) - ブロック転送メッセージのレート。つまり、バッチフェッチが有効になっている場合、これはブロック数ではなくバッチ数を表します。
- blockTransferRateBytes (meter)
- blockTransferAvgSize_1min (gauge - 1分移動平均)
- numActiveConnections.count
- numRegisteredConnections.count
- numCaughtExceptions.count
- openBlockRequestLatencyMillis (timer)
- registerExecutorRequestLatencyMillis (timer)
- fetchMergedBlocksMetaLatencyMillis (timer)
- finalizeShuffleMergeLatencyMillis (timer)
- registeredExecutorsSize
- shuffle-server.usedDirectMemory
-
shuffle-server.usedHeapMemory
- 注意: 以下のメトリクスは、サーバー側設定
spark.shuffle.push.server.mergedShuffleFileManagerImplが Push-Based Shuffle のためにorg.apache.spark.network.shuffle.MergedShuffleFileManagerに設定されている場合に適用されます。 - blockBytesWritten - プッシュされたブロックデータがファイルに書き込まれたサイズ (バイト単位)
- blockAppendCollisions - シャッフルプッシュブロックがシャッフルサービスで衝突した回数。同じ削減パーティションの別のブロックが書き込まれていた場合。
- lateBlockPushes - 特定のシャッフルマージが完了した後にシャッフルサービスで受信されたシャッフルプッシュブロックの数
- deferredBlocks - メモリにバッファリングされている現在の遅延ブロック部分の数
- deferredBlockBytes - メモリにバッファリングされている現在の遅延ブロック部分のサイズ
- staleBlockPushes - 古いシャッフルブロックプッシュ要求の数
- ignoredBlockBytes - ESS に転送されたが無視されたプッシュブロックデータのサイズ。プッシュブロックデータは、次の場合に無視されたと見なされます: 1. シャッフルが完了した後に受信された場合。2. プッシュ要求が重複ブロックのものである場合。3. ESS がブロックを書き込めなかった場合。
高度なインストルメンテーション
Spark ジョブのパフォーマンスをプロファイリングするのに役立ついくつかの外部ツールがあります。
- Ganglia のようなクラスタ全体の監視ツールは、全体的なクラスタの使用状況とリソースのボトルネックに関する洞察を提供できます。たとえば、Ganglia ダッシュボードは、特定のワークロードがディスクバウンド、ネットワークバウンド、または CPU バウンドであるかを迅速に明らかにできます。
- dstat、iostat、iotop のような OS プロファイリングツールは、個々のノードで詳細なプロファイリングを提供できます。
- スタックトレースを提供する
jstack、ヒープダンプを作成するためのjmap、時系列統計を報告するためのjstat、およびさまざまな JVM プロパティを視覚的に探索するためのjconsoleのような JVM ユーティリティは、JVM の内部に精通しているユーザーにとって役立ちます。
Spark はプラグイン API も提供しているため、カスタムインストルメンテーションコードを Spark アプリケーションに追加できます。Spark にプラグインをロードするために利用できる 2 つの設定キーがあります。
spark.pluginsspark.plugins.defaultList
どちらも、org.apache.spark.api.plugin.SparkPlugin インターフェイスを実装するクラス名のカンマ区切りリストを受け取ります。2 つの名前が存在するのは、1 つのリストを Spark のデフォルト設定ファイルに配置できるようにするためです。これにより、ユーザーは設定ファイルのリストを上書きすることなく、コマンドラインから簡単に他のプラグインを追加できます。重複するプラグインは無視されます。