モニタリングとインストルメンテーション
Sparkアプリケーションを監視する方法は、Web UI、メトリクス、および外部インストルメンテーションのいくつかあります。
Webインターフェース
すべてのSparkContextは、デフォルトでポート4040でWeb UIを起動し、アプリケーションに関する有用な情報を表示します。これには以下が含まれます。
- スケジューラーステージとタスクのリスト
- RDDのサイズとメモリ使用量の概要
- 環境情報。
- 実行中のエグゼキュータに関する情報
このインターフェースにアクセスするには、Webブラウザでhttp://<driver-node>:4040
を開くだけです。同じホスト上で複数のSparkContextが実行されている場合、それらは4040から始まる連続したポート(4041、4042など)にバインドされます。
この情報は、デフォルトではアプリケーションの実行中にのみ利用可能であることに注意してください。事後にWeb UIを表示するには、アプリケーションを開始する前にspark.eventLog.enabled
をtrueに設定します。これにより、UIに表示される情報をエンコードするSparkイベントが永続ストレージにログ記録されるようにSparkが構成されます。
事後の表示
アプリケーションのイベントログが存在する場合、Sparkの履歴サーバーを介してアプリケーションのUIを構築することも可能です。履歴サーバーは、以下を実行することで起動できます。
./sbin/start-history-server.sh
これにより、デフォルトでhttp://<server-url>:18080
にWebインターフェースが作成され、未完了および完了したアプリケーションと試行が一覧表示されます。
ファイルシステムプロバイダークラス(以下のspark.history.provider
を参照)を使用する場合、基本ログディレクトリはspark.history.fs.logDirectory
構成オプションで指定する必要があり、各アプリケーションのイベントログを表すサブディレクトリが含まれている必要があります。
Sparkジョブ自体は、イベントをログに記録し、同じ共有の書き込み可能なディレクトリにログを記録するように構成する必要があります。たとえば、サーバーがhdfs://namenode/shared/spark-logs
のログディレクトリで構成されている場合、クライアント側のオプションは次のようになります。
spark.eventLog.enabled true
spark.eventLog.dir hdfs://namenode/shared/spark-logs
履歴サーバーは、次のように構成できます。
環境変数
環境変数 | 意味 |
---|---|
SPARK_DAEMON_MEMORY |
履歴サーバーに割り当てるメモリ(デフォルト:1g)。 |
SPARK_DAEMON_JAVA_OPTS |
履歴サーバーのJVMオプション(デフォルト:なし)。 |
SPARK_DAEMON_CLASSPATH |
履歴サーバーのクラスパス(デフォルト:なし)。 |
SPARK_PUBLIC_DNS |
履歴サーバーのパブリックアドレス。これが設定されていない場合、アプリケーション履歴へのリンクはサーバーの内部アドレスを使用する可能性があり、リンクが壊れます(デフォルト:なし)。 |
SPARK_HISTORY_OPTS |
履歴サーバーのspark.history.* 構成オプション(デフォルト:なし)。 |
ローリングイベントログファイルへの圧縮の適用
長時間実行されるアプリケーション(ストリーミングなど)は、単一の巨大なイベントログファイルを作成する可能性があり、維持に多くのコストがかかり、また、Spark履歴サーバーの各更新ごとに再生するために大量のリソースが必要になります。
spark.eventLog.rolling.enabled
およびspark.eventLog.rolling.maxFileSize
を有効にすると、単一の巨大なイベントログファイルの代わりにローリングイベントログファイルを使用できるようになり、それ自体でいくつかのシナリオに役立つ可能性がありますが、ログ全体のサイズを削減するのに役立つわけではありません。
Spark履歴サーバーは、Spark履歴サーバーで構成spark.history.fs.eventLog.rolling.maxFilesToRetain
を設定することにより、ローリングイベントログファイルに圧縮を適用して、ログ全体のサイズを削減できます。
詳細は後述しますが、圧縮は損失のある操作であることに注意してください。圧縮は、UIに表示されなくなる一部のイベントを破棄します。オプションを有効にする前に、破棄されるイベントを確認することをお勧めします。
圧縮が発生すると、履歴サーバーはアプリケーションで利用可能なすべてのイベントログファイルをリストし、保持される最小インデックスを持つファイルよりも小さいインデックスを持つイベントログファイルを圧縮のターゲットと見なします。たとえば、アプリケーションAに5つのイベントログファイルがあり、spark.history.fs.eventLog.rolling.maxFilesToRetain
が2に設定されている場合、最初の3つのログファイルが圧縮対象として選択されます。
ターゲットを選択すると、除外できるイベントを把握するために分析し、除外すると判断されたイベントを破棄して、1つのコンパクトなファイルに書き直します。
圧縮は、古いデータを指すイベントを除外しようとします。現在のところ、以下は除外されるイベントの候補を示しています。
- 完了したジョブのイベントと、関連するステージ/タスクイベント
- 終了したエグゼキュータのイベント
- 完了したSQL実行のイベントと、関連するジョブ/ステージ/タスクイベント
書き換えが完了すると、元のログファイルはベストエフォートで削除されます。履歴サーバーは元のログファイルを削除できない場合がありますが、履歴サーバーの動作には影響しません。
Spark履歴サーバーは、圧縮中にあまりスペースが削減されないと判断した場合、古いイベントログファイルを圧縮しない可能性があることに注意してください。ストリーミングクエリの場合、通常、各マイクロバッチが1つ以上のジョブをトリガーし、すぐに完了するため、圧縮が実行されることを期待しますが、バッチクエリの場合、多くの場合圧縮は実行されません。
また、これはSpark 3.0で導入された新機能であり、完全に安定していない可能性があることに注意してください。状況によっては、圧縮によって予期よりも多くのイベントが除外され、アプリケーションの履歴サーバーでUIの問題が発生する可能性があります。注意して使用してください。
Spark履歴サーバーの構成オプション
Spark履歴サーバーのセキュリティオプションについては、セキュリティページで詳しく説明しています。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.history.provider | org.apache.spark.deploy.history.FsHistoryProvider |
アプリケーション履歴バックエンドを実装するクラスの名前。現在、Sparkによって提供される1つの実装のみがあり、ファイルシステムに保存されたアプリケーションログを検索します。 | 1.1.0 |
spark.history.fs.logDirectory | file:/tmp/spark-events | ファイルシステムの履歴プロバイダーの場合、ロードするアプリケーションイベントログを含むディレクトリへのURL。これは、ローカルのfile:// パス、HDFSパスhdfs://namenode/shared/spark-logs 、またはHadoop APIでサポートされている代替ファイルシステムのパスにすることができます。 |
1.1.0 |
spark.history.fs.update.interval | 10秒 | ファイルシステム履歴プロバイダーがログディレクトリ内の新規または更新されたログをチェックする間隔。間隔が短いほど新しいアプリケーションがより速く検出されますが、更新されたアプリケーションを再読み込みするためのサーバー負荷が増加します。更新が完了するとすぐに、完了したアプリケーションと未完了のアプリケーションのリストに変更が反映されます。 | 1.4.0 |
spark.history.retainedApplications | 50 | キャッシュにUIデータを保持するアプリケーションの数。この上限を超えると、最も古いアプリケーションがキャッシュから削除されます。アプリケーションがキャッシュにない場合、UIからアクセスするとディスクからロードする必要があります。 | 1.0.0 |
spark.history.ui.maxApplications | Int.MaxValue | 履歴概要ページに表示するアプリケーションの数。アプリケーションのUIは、履歴概要ページに表示されていなくても、URLに直接アクセスすることで引き続き利用できます。 | 2.0.1 |
spark.history.ui.port | 18080 | 履歴サーバーのWebインターフェースがバインドするポート。 | 1.0.0 |
spark.history.kerberos.enabled | false | 履歴サーバーがKerberosを使用してログインする必要があるかどうかを示します。これは、履歴サーバーがセキュアなHadoopクラスター上のHDFSファイルにアクセスする場合に必要です。 | 1.0.1 |
spark.history.kerberos.principal | (なし) | spark.history.kerberos.enabled=true の場合、履歴サーバーのKerberosプリンシパル名を指定します。 |
1.0.1 |
spark.history.kerberos.keytab | (なし) | spark.history.kerberos.enabled=true の場合、履歴サーバーのKerberos keytabファイルの場所を指定します。 |
1.0.1 |
spark.history.fs.cleaner.enabled | false | 履歴サーバーがストレージからイベントログを定期的にクリーンアップするかどうかを指定します。 | 1.4.0 |
spark.history.fs.cleaner.interval | 1日 | spark.history.fs.cleaner.enabled=true の場合、ファイルシステムのジョブ履歴クリーナーが削除対象ファイルをチェックする頻度を指定します。ファイルは、以下の2つの条件の少なくとも1つが満たされる場合に削除されます。まず、spark.history.fs.cleaner.maxAge より古い場合、削除されます。また、ファイル数が spark.history.fs.cleaner.maxNum を超える場合も削除されます。Sparkは、アプリケーションの完了した試行ログを、最も古い試行時間の順にクリーンアップしようとします。 |
1.4.0 |
spark.history.fs.cleaner.maxAge | 7d | spark.history.fs.cleaner.enabled=true の場合、この設定より古いジョブ履歴ファイルは、ファイルシステム履歴クリーナーが実行されたときに削除されます。 |
1.4.0 |
spark.history.fs.cleaner.maxNum | Int.MaxValue | spark.history.fs.cleaner.enabled=true の場合、イベントログディレクトリ内のファイルの最大数を指定します。Sparkは、この制限内でログディレクトリを維持するために、完了した試行ログをクリーンアップしようとします。これは、HDFS の `dfs.namenode.fs-limits.max-directory-items` のような、基盤となるファイルシステムの制限よりも小さくする必要があります。 |
3.0.0 |
spark.history.fs.endEventReparseChunkSize | 1m | ログファイルの末尾で終了イベントを検索するために解析するバイト数。これは、イベントログファイルの不要な部分をスキップすることで、アプリケーションリストの生成を高速化するために使用されます。この設定を0に設定すると無効にできます。 | 2.4.0 |
spark.history.fs.inProgressOptimization.enabled | true | 進行中のログの最適化された処理を有効にします。このオプションを使用すると、イベントログの名前変更に失敗した完了済みアプリケーションが、進行中としてリストされる場合があります。 | 2.4.0 |
spark.history.fs.driverlog.cleaner.enabled | spark.history.fs.cleaner.enabled |
履歴サーバーがストレージからドライバーログを定期的にクリーンアップするかどうかを指定します。 | 3.0.0 |
spark.history.fs.driverlog.cleaner.interval | spark.history.fs.cleaner.interval |
spark.history.fs.driverlog.cleaner.enabled=true の場合、ファイルシステムのドライバーログクリーナーが削除対象ファイルをチェックする頻度を指定します。ファイルは、spark.history.fs.driverlog.cleaner.maxAge より古い場合にのみ削除されます。 |
3.0.0 |
spark.history.fs.driverlog.cleaner.maxAge | spark.history.fs.cleaner.maxAge |
spark.history.fs.driverlog.cleaner.enabled=true の場合、この設定より古いドライバーログファイルは、ドライバーログクリーナーが実行されたときに削除されます。 |
3.0.0 |
spark.history.fs.numReplayThreads | 利用可能なコアの25% | 履歴サーバーがイベントログを処理するために使用するスレッド数。 | 2.0.0 |
spark.history.store.maxDiskUsage | 10g | キャッシュされたアプリケーション履歴情報を格納するローカルディレクトリの最大ディスク使用量。 | 2.3.0 |
spark.history.store.path | (なし) | アプリケーション履歴データをキャッシュするローカルディレクトリ。設定した場合、履歴サーバーは、メモリに保持する代わりに、アプリケーションデータをディスクに保存します。ディスクに書き込まれたデータは、履歴サーバーの再起動時に再利用されます。 | 2.3.0 |
spark.history.store.serializer | JSON | メモリ内のUIオブジェクトをディスクベースのKVストアに書き込み/読み込みするためのシリアライザー。JSONまたはPROTOBUF。JSONシリアライザーは、Spark 3.4.0以前の唯一の選択肢であるため、デフォルト値です。PROTOBUFシリアライザーは、JSONシリアライザーと比較して高速かつコンパクトです。 | 3.4.0 |
spark.history.custom.executor.log.url | (なし) | 履歴サーバーでクラスターマネージャーのアプリケーションログURLを使用する代わりに、外部ログサービスをサポートするためのカスタムSpark ExecutorログURLを指定します。Sparkは、クラスターマネージャーによって異なる可能性のあるパターンを介して、いくつかのパス変数をサポートします。サポートされているパターンがある場合は、クラスターマネージャーのドキュメントを確認してください。この構成は、ライブアプリケーションには影響せず、履歴サーバーにのみ影響します。 現在のところ、YARNモードのみがこの構成をサポートしています |
3.0.0 |
spark.history.custom.executor.log.url.applyIncompleteApplication | true | カスタムSpark ExecutorログURLを、未完了のアプリケーションにも適用するかどうかを指定します。実行中のアプリケーションのエグゼキューターログを元のログURLとして提供する場合は、これを `false` に設定します。未完了のアプリケーションには、正常にシャットダウンしなかったアプリケーションが含まれる可能性があることに注意してください。これが `true` に設定されていても、この構成はライブアプリケーションには影響せず、履歴サーバーにのみ影響します。 | 3.0.0 |
spark.history.fs.eventLog.rolling.maxFilesToRetain | Int.MaxValue | 非圧縮の状態で保持されるイベントログファイルの最大数。デフォルトでは、すべてのイベントログファイルが保持されます。技術的な理由から、最小値は1です。 詳細については、「古いイベントログファイルの圧縮適用」のセクションを参照してください。 |
3.0.0 |
spark.history.store.hybridStore.enabled | false | イベントログの解析時に、ストアとしてHybridStoreを使用するかどうか。HybridStoreは、最初にデータをメモリ内ストアに書き込み、メモリ内ストアへの書き込みが完了した後、バックグラウンドスレッドでデータをディスクストアにダンプします。 | 3.1.0 |
spark.history.store.hybridStore.maxMemoryUsage | 2g | HybridStoreの作成に使用できる最大メモリ空間。HybridStoreはヒープメモリを共有するため、HybridStoreが有効になっている場合は、SHSのメモリオプションを使用してヒープメモリを増やす必要があります。 | 3.1.0 |
spark.history.store.hybridStore.diskBackend | ROCKSDB | ハイブリッドストアで使用されるディスクベースのストアを指定します。LEVELDBまたはROCKSDB。 | 3.3.0 |
spark.history.fs.update.batchSize | Int.MaxValue | 新しいイベントログファイルの更新バッチサイズを指定します。これにより、各スキャンプロセスが妥当な時間内に完了するように制御され、大規模な環境で初期スキャンが長すぎて新しいイベントログファイルが時間内にスキャンされるのを妨げるのを防ぎます。 | 3.4.0 |
これらのUIではすべて、ヘッダーをクリックするとテーブルをソートできるため、低速なタスクやデータの偏りなどを簡単に特定できることに注意してください。
注
-
履歴サーバーは、完了済みと未完了の両方のSparkジョブを表示します。アプリケーションが失敗後に複数回試行した場合、失敗した試行、および進行中の未完了の試行または最終的な成功した試行が表示されます。
-
未完了のアプリケーションは、断続的にのみ更新されます。更新間の時間は、変更されたファイルのチェック間隔(
spark.history.fs.update.interval
)によって定義されます。大規模なクラスターでは、更新間隔を大きな値に設定できます。実行中のアプリケーションを表示する方法は、実際には独自のWeb UIを表示することです。 -
完了として自身を登録せずに終了したアプリケーションは、実行されていなくても未完了としてリストされます。これは、アプリケーションがクラッシュした場合に発生する可能性があります。
-
Sparkジョブの完了を通知する1つの方法は、Sparkコンテキストを明示的に停止する(
sc.stop()
)、またはPythonでwith SparkContext() as sc:
構造を使用してSparkコンテキストのセットアップと破棄を処理することです。
REST API
UIでメトリックを表示することに加えて、JSONとしても利用できます。これにより、開発者はSpark用の新しい視覚化ツールや監視ツールを簡単に作成できます。JSONは、実行中のアプリケーションと履歴サーバーの両方で利用できます。エンドポイントは/api/v1
にマウントされています。たとえば、履歴サーバーの場合、通常はhttp://<server-url>:18080/api/v1
でアクセスでき、実行中のアプリケーションの場合はhttp://localhost:4040/api/v1
でアクセスできます。
APIでは、アプリケーションはアプリケーションID、[app-id]
で参照されます。YARNで実行する場合、各アプリケーションは複数の試行を持つ可能性がありますが、クラスターモードのアプリケーションにのみ試行IDがあり、クライアントモードのアプリケーションにはありません。YARNクラスターモードのアプリケーションは、[attempt-id]
で識別できます。以下に示すAPIでは、YARNクラスターモードで実行する場合、[app-id]
は実際には[base-app-id]/[attempt-id]
になります。[base-app-id]
はYARNアプリケーションIDです。
エンドポイント | 意味 |
---|---|
/applications |
すべてのアプリケーションのリスト。?status=[completed|running] 選択した状態のアプリケーションのみをリストします。?minDate=[date] リストする最も早い開始日時。?maxDate=[date] リストする最も遅い開始日時。?minEndDate=[date] リストする最も早い終了日時。?maxEndDate=[date] リストする最も遅い終了日時。?limit=[limit] リストされるアプリケーションの数を制限します。例 ?minDate=2015-02-10
?minDate=2015-02-03T16:42:40.000GMT
?maxDate=2015-02-11T20:41:30.000GMT
?minEndDate=2015-02-12
?minEndDate=2015-02-12T09:15:10.000GMT
?maxEndDate=2015-02-14T16:30:45.000GMT
?limit=10 |
/applications/[app-id]/jobs |
特定のアプリケーションのすべてのジョブのリスト。?status=[running|succeeded|failed|unknown] 特定の状態のジョブのみをリストします。 |
/applications/[app-id]/jobs/[job-id] |
特定のジョブの詳細。 |
/applications/[app-id]/stages |
特定のアプリケーションのすべてのステージのリスト。?status=[active|complete|pending|failed] 特定の状態のステージのみをリストします。?details=true タスクデータを含むすべてのステージをリストします。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定したタスクステータスのタスクのみをリストします。クエリパラメーター `taskStatus` は、`details=true` の場合にのみ有効です。これは、`?details=true&taskStatus=SUCCESS&taskStatus=FAILED` のように複数の `taskStatus` もサポートしており、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。?withSummaries=true タスクメトリクスの分布とエグゼキューターメトリクスの分布を含むステージをリストします。?quantiles=0.0,0.25,0.5,0.75,1.0 指定された分位数でメトリクスを要約します。クエリパラメーター `quantiles` は、`withSummaries=true` の場合にのみ有効です。デフォルト値は `0.0,0.25,0.5,0.75,1.0` です。 |
/applications/[app-id]/stages/[stage-id] |
特定のステージのすべての試行のリスト。?details=true 特定のステージのタスクデータを含むすべての試行をリストします。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定したタスクステータスのタスクのみをリストします。クエリパラメーター `taskStatus` は、`details=true` の場合にのみ有効です。これは、`?details=true&taskStatus=SUCCESS&taskStatus=FAILED` のように複数の `taskStatus` もサポートしており、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。?withSummaries=true 各試行のタスクメトリクスの分布とエグゼキューターメトリクスの分布をリストします。?quantiles=0.0,0.25,0.5,0.75,1.0 指定された分位数でメトリクスを要約します。クエリパラメーター `quantiles` は、`withSummaries=true` の場合にのみ有効です。デフォルト値は `0.0,0.25,0.5,0.75,1.0` です。例 ?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id] |
特定のステージ試行の詳細。?details=true 特定のステージ試行のすべてのタスクデータをリストします。?taskStatus=[RUNNING|SUCCESS|FAILED|KILLED|PENDING] 指定したタスクステータスのタスクのみをリストします。クエリパラメーター `taskStatus` は、`details=true` の場合にのみ有効です。これは、`?details=true&taskStatus=SUCCESS&taskStatus=FAILED` のように複数の `taskStatus` もサポートしており、指定されたタスクステータスのいずれかに一致するすべてのタスクを返します。?withSummaries=true 特定のステージ試行のタスクメトリクスの分布とエグゼキューターメトリクスの分布をリストします。?quantiles=0.0,0.25,0.5,0.75,1.0 指定された分位数でメトリクスを要約します。クエリパラメーター `quantiles` は、`withSummaries=true` の場合にのみ有効です。デフォルト値は `0.0,0.25,0.5,0.75,1.0` です。例 ?details=true
?details=true&taskStatus=RUNNING
?withSummaries=true
?details=true&withSummaries=true&quantiles=0.01,0.5,0.99
|
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary |
特定のステージ試行のすべてのタスクの要約メトリクス。?quantiles 指定された分位数でメトリクスを要約します。例: ?quantiles=0.01,0.5,0.99 |
/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList |
特定のステージ試行のすべてのタスクのリスト。?offset=[offset]&length=[len] 指定された範囲のタスクをリストします。?sortBy=[runtime|-runtime] タスクをソートします。?status=[running|success|killed|failed|unknown] 特定の状態のタスクのみをリストします。例: ?offset=10&length=50&sortBy=runtime&status=running |
/applications/[app-id]/executors |
特定のアプリケーションのすべてのアクティブなエグゼキューターのリスト。 |
/applications/[app-id]/executors/[executor-id]/threads |
特定のアクティブなエグゼキューター内で実行されているすべてのスレッドのスタックトレース。履歴サーバー経由では利用できません。 |
/applications/[app-id]/allexecutors |
特定のアプリケーションのすべての(アクティブおよびデッド)エグゼキューターのリスト。 |
/applications/[app-id]/storage/rdd |
特定のアプリケーションの保存されたRDDのリスト。 |
/applications/[app-id]/storage/rdd/[rdd-id] |
特定のRDDのストレージステータスの詳細。 |
/applications/[base-app-id]/logs |
特定のアプリケーションのすべての試行のイベントログを、zipファイル内のファイルとしてダウンロードします。 |
/applications/[base-app-id]/[attempt-id]/logs |
特定のアプリケーション試行のイベントログをzipファイルとしてダウンロードします。 |
/applications/[app-id]/streaming/statistics |
ストリーミングコンテキストの統計情報。 |
/applications/[app-id]/streaming/receivers |
すべてのストリーミングレシーバーのリスト。 |
/applications/[app-id]/streaming/receivers/[stream-id] |
指定されたレシーバーの詳細。 |
/applications/[app-id]/streaming/batches |
保持されているすべてのバッチのリスト。 |
/applications/[app-id]/streaming/batches/[batch-id] |
指定されたバッチの詳細。 |
/applications/[app-id]/streaming/batches/[batch-id]/operations |
指定されたバッチのすべての出力操作のリスト。 |
/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] |
指定された操作と指定されたバッチの詳細。 |
/applications/[app-id]/sql |
指定されたアプリケーションのすべてのクエリのリスト。?details=[true (デフォルト) | false] は、Sparkプランノードの詳細を表示/非表示にします。?planDescription=[true (デフォルト) | false] は、物理プランのサイズが大きい場合に、必要に応じて物理 planDescription を有効/無効にします。?offset=[offset]&length=[len] は、指定された範囲のクエリをリストします。 |
/applications/[app-id]/sql/[execution-id] |
指定されたクエリの詳細。?details=[true (デフォルト) | false] は、指定されたクエリの詳細に加えて、メトリクスの詳細を表示/非表示にします。?planDescription=[true (デフォルト) | false] は、物理プランのサイズが大きい場合に、指定されたクエリに対して必要に応じて物理 planDescription を有効/無効にします。 |
/applications/[app-id]/environment |
指定されたアプリケーションの環境詳細。 |
/version |
現在のSparkバージョンを取得します。 |
取得できるジョブとステージの数は、スタンドアロンSpark UIの同じ保持メカニズムによって制約されます。 "spark.ui.retainedJobs"
は、ジョブのガベージコレクションをトリガーするしきい値を定義し、spark.ui.retainedStages
はステージのしきい値を定義します。ガベージコレクションは再生時に行われることに注意してください。これらの値を増やして履歴サーバーを再起動することで、より多くのエントリを取得できます。
エグゼキュータタスクのメトリクス
REST APIは、Sparkエグゼキューターによって収集されたタスクメトリクスの値をタスク実行の粒度で公開します。メトリクスは、パフォーマンスのトラブルシューティングとワークロードの特性評価に使用できます。利用可能なメトリクスのリストと簡単な説明。
Sparkエグゼキューターのタスクメトリクス名 | 簡単な説明 |
---|---|
executorRunTime | エグゼキューターがこのタスクの実行に費やした経過時間。これには、シャッフルデータのフェッチ時間も含まれます。値はミリ秒単位で表されます。 |
executorCpuTime | エグゼキューターがこのタスクの実行に費やしたCPU時間。これには、シャッフルデータのフェッチ時間も含まれます。値はナノ秒単位で表されます。 |
executorDeserializeTime | このタスクのデシリアライズに費やした経過時間。値はミリ秒単位で表されます。 |
executorDeserializeCpuTime | エグゼキューターでこのタスクをデシリアライズするためにかかったCPU時間。値はナノ秒単位で表されます。 |
resultSize | このタスクがTaskResultとしてドライバーに送信したバイト数。 |
jvmGCTime | このタスクの実行中にJVMがガベージコレクションに費やした経過時間。値はミリ秒単位で表されます。 |
resultSerializationTime | タスク結果のシリアライズに費やした経過時間。値はミリ秒単位で表されます。 |
memoryBytesSpilled | このタスクによってスピルされたインメモリバイト数。 |
diskBytesSpilled | このタスクによってスピルされたディスク上のバイト数。 |
peakExecutionMemory | シャッフル、集計、および結合中に作成された内部データ構造で使用されるピークメモリ。このアキュムレータの値は、このタスクで作成されたすべてのそのようなデータ構造のピークサイズの合計とほぼ一致する必要があります。SQLジョブの場合、これはすべての安全でない演算子とExternalSortのみを追跡します。 |
inputMetrics.* | org.apache.spark.rdd.HadoopRDD からのデータの読み取り、または永続化されたデータに関連するメトリクス。 |
.bytesRead | 読み取られた合計バイト数。 |
.recordsRead | 読み取られたレコードの合計数。 |
outputMetrics.* | 外部(分散ファイルシステムなど)へのデータの書き込みに関連するメトリクス。出力のあるタスクでのみ定義されます。 |
.bytesWritten | 書き込まれた合計バイト数 |
.recordsWritten | 書き込まれたレコードの合計数 |
shuffleReadMetrics.* | シャッフル読み取り操作に関連するメトリクス。 |
.recordsRead | シャッフル操作で読み取られたレコード数 |
.remoteBlocksFetched | シャッフル操作でフェッチされたリモートブロックの数 |
.localBlocksFetched | シャッフル操作でフェッチされたローカル(リモートエグゼキューターからの読み取りではなく)ブロックの数 |
.totalBlocksFetched | シャッフル操作でフェッチされたブロックの数(ローカルとリモートの両方) |
.remoteBytesRead | シャッフル操作で読み取られたリモートバイト数 |
.localBytesRead | シャッフル操作でローカルディスクから読み取られたバイト数(リモートエグゼキューターからの読み取りではなく) |
.totalBytesRead | シャッフル操作で読み取られたバイト数(ローカルとリモートの両方) |
.remoteBytesReadToDisk | シャッフル操作でディスクに読み取られたリモートバイト数。大きなブロックは、デフォルトの動作であるメモリに読み込むのではなく、シャッフル読み取り操作でディスクにフェッチされます。 |
.fetchWaitTime | タスクがリモートシャッフルブロックを待機するのに費やした時間。これには、シャッフル入力データでブロックする時間が含まれます。たとえば、タスクがブロックAの処理をまだ完了していないときにブロックBがフェッチされている場合、ブロックBでブロックしているとは見なされません。値はミリ秒単位で表されます。 |
shuffleWriteMetrics.* | シャッフルデータの書き込み操作に関連するメトリクス。 |
.bytesWritten | シャッフル操作で書き込まれたバイト数 |
.recordsWritten | シャッフル操作で書き込まれたレコード数 |
.writeTime | ディスクまたはバッファーキャッシュへの書き込みでブロックするのに費やした時間。値はナノ秒単位で表されます。 |
エグゼキュータのメトリクス
エグゼキューターレベルのメトリクスは、各エグゼキューターからドライバーにハートビートの一部として送信され、JVMヒープメモリやGC情報などのエグゼキューター自体のパフォーマンスメトリクスを記述します。エグゼキューターのメトリクス値と、エグゼキューターごとの測定されたメモリピーク値は、REST API経由でJSON形式およびPrometheus形式で公開されます。JSONエンドポイントは /applications/[app-id]/executors
で公開され、Prometheusエンドポイントは /metrics/executors/prometheus
で公開されます。Prometheusエンドポイントは、構成パラメーター:spark.ui.prometheus.enabled=true
(デフォルトは false
)に依存します。さらに、エグゼキューターメモリメトリクスのステージごとの集計されたピーク値は、spark.eventLog.logStageExecutorMetrics
がtrueの場合にイベントログに書き込まれます。エグゼキューターメモリメトリクスは、Dropwizard metrics library に基づくSparkメトリクスシステムからも公開されます。利用可能なメトリクスのリストと簡単な説明。
エグゼキューターレベルのメトリクス名 | 簡単な説明 |
---|---|
rddBlocks | このエグゼキューターのブロックマネージャー内のRDDブロック。 |
memoryUsed | このエグゼキューターで使用されるストレージメモリ。 |
diskUsed | このエグゼキューターによるRDDストレージに使用されるディスク容量。 |
totalCores | このエグゼキューターで使用可能なコアの数。 |
maxTasks | このエグゼキューターで同時に実行できる最大タスク数。 |
activeTasks | 現在実行中のタスクの数。 |
failedTasks | このエグゼキューターで失敗したタスクの数。 |
completedTasks | このエグゼキューターで完了したタスクの数。 |
totalTasks | このエグゼキューター内のタスクの合計数(実行中、失敗、完了)。 |
totalDuration | JVMがこのエグゼキューターでタスクの実行に費やした経過時間。値はミリ秒単位で表されます。 |
totalGCTime | JVMがこのエグゼキューターで合計したガベージコレクションに費やした経過時間。値はミリ秒単位で表されます。 |
totalInputBytes | このエグゼキューターで合計された入力バイトの合計。 |
totalShuffleRead | このエグゼキューターで合計されたシャッフル読み取りバイトの合計。 |
totalShuffleWrite | このエグゼキューターで合計されたシャッフル書き込みバイトの合計。 |
maxMemory | ストレージに使用可能なメモリの合計量(バイト単位)。 |
memoryMetrics.* | メモリメトリクスの現在の値 |
.usedOnHeapStorageMemory | 現在、ストレージに使用されるオンヒープメモリ(バイト単位)。 |
.usedOffHeapStorageMemory | 現在、ストレージに使用されるオフヒープメモリ(バイト単位)。 |
.totalOnHeapStorageMemory | ストレージに使用できるオンヒープメモリの合計(バイト単位)。この量は、MemoryManagerの実装によって時間の経過とともに変化する可能性があります。 |
.totalOffHeapStorageMemory | ストレージに使用できるオフヒープメモリの合計(バイト単位)。この量は、MemoryManagerの実装に応じて時間の経過とともに変化する可能性があります。 |
peakMemoryMetrics.* | メモリ(およびGC)メトリクスのピーク値 |
.JVMHeapMemory | オブジェクトの割り当てに使用されるヒープのピークメモリ使用量。ヒープは1つ以上のメモリプールで構成されます。返されるメモリ使用量の使用済みサイズとコミット済みサイズは、すべてのヒープメモリプールのこれらの値の合計ですが、返されるメモリ使用量の初期サイズと最大サイズは、すべてのヒープメモリプールの合計ではない可能性のあるヒープメモリの設定を表します。返されるメモリ使用量で使用されるメモリの量は、ライブオブジェクトと収集されていないガベージオブジェクトの両方が占めるメモリの量です(存在する場合)。 |
.JVMOffHeapMemory | Java仮想マシンで使用される非ヒープメモリのピークメモリ使用量。非ヒープメモリは、1つ以上のメモリプールで構成されます。返されるメモリ使用量の使用済みサイズとコミット済みサイズは、すべての非ヒープメモリプールのこれらの値の合計ですが、返されるメモリ使用量の初期サイズと最大サイズは、すべての非ヒープメモリプールの合計ではない可能性のある非ヒープメモリの設定を表します。 |
.OnHeapExecutionMemory | 使用中のオンヒープ実行メモリのピーク(バイト単位)。 |
.OffHeapExecutionMemory | 使用中のオフヒープ実行メモリのピーク(バイト単位)。 |
.OnHeapStorageMemory | 使用中のオンヒープストレージメモリのピーク(バイト単位)。 |
.OffHeapStorageMemory | 使用中のオフヒープストレージメモリのピーク(バイト単位)。 |
.OnHeapUnifiedMemory | オンヒープメモリのピーク(実行とストレージ)。 |
.OffHeapUnifiedMemory | オフヒープメモリのピーク(実行とストレージ)。 |
.DirectPoolMemory | JVMがダイレクトバッファプール(java.lang.management.BufferPoolMXBean )に使用しているピークメモリ |
.MappedPoolMemory | JVMがマップされたバッファプール(java.lang.management.BufferPoolMXBean )に使用しているピークメモリ |
.ProcessTreeJVMVMemory | 仮想メモリサイズ(バイト単位)。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。 |
.ProcessTreeJVMRSSMemory | 常駐セットサイズ:プロセスが実メモリに持つページ数。これは、テキスト、データ、またはスタック領域にカウントされるページのみです。これには、オンデマンドでロードされていないページや、スワップアウトされたページは含まれません。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。 |
.ProcessTreePythonVMemory | Pythonの仮想メモリサイズ(バイト単位)。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。 |
.ProcessTreePythonRSSMemory | Pythonの常駐セットサイズ。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。 |
.ProcessTreeOtherVMemory | 他の種類のプロセスの仮想メモリサイズ(バイト単位)。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。 |
.ProcessTreeOtherRSSMemory | 他の種類のプロセスの常駐セットサイズ。spark.executor.processTreeMetrics.enabledがtrueの場合に有効になります。 |
.MinorGCCount | マイナーGCの合計数。たとえば、ガベージコレクターは、コピー、PSスカベンジ、ParNew、G1ヤングジェネレーションなどです。 |
.MinorGCTime | 経過したマイナーGCの合計時間。値はミリ秒単位で表されます。 |
.MajorGCCount | メジャーGCの合計数。たとえば、ガベージコレクターは、MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1オールドジェネレーションなどです。 |
.MajorGCTime | 経過したメジャーGCの合計時間。値はミリ秒単位で表されます。 |
RSSとVmemの計算は、proc(5)に基づいています。
APIバージョニングポリシー
これらのエンドポイントは、上部にアプリケーションを開発しやすくするために、厳密にバージョン管理されています。特に、Sparkは保証します
- エンドポイントは、あるバージョンから削除されることはありません
- 個々のフィールドは、特定のエンドポイントで削除されることはありません
- 新しいエンドポイントが追加される可能性があります。
- 既存のエンドポイントに新しいフィールドが追加される可能性があります。
- 将来、新しいバージョンのAPIが別のエンドポイント(例:
api/v2
)として追加される可能性があります。新しいバージョンは、必ずしも下位互換性を持つ必要はありません。 - APIバージョンは削除される可能性がありますが、新しいAPIバージョンと少なくとも1つのマイナーリリースで共存した後のみです。
実行中のアプリケーションのUIを調べるときでも、利用可能なアプリケーションが1つしかない場合でも、applications/[app-id]
の部分が必要であることに注意してください。例えば、実行中のアプリのジョブのリストを表示するには、http://localhost:4040/api/v1/applications/[app-id]/jobs
にアクセスします。これは、両方のモードでパスの一貫性を保つためです。
メトリクス
Sparkには、Dropwizard Metrics Library に基づいた設定可能なメトリクスシステムがあります。これにより、ユーザーはHTTP、JMX、CSVファイルなど、さまざまなシンクにSparkメトリクスを報告できます。メトリクスは、Sparkのコードベースに埋め込まれたソースによって生成されます。これらは、特定の活動やSparkコンポーネントのインストルメンテーションを提供します。メトリクスシステムは、Sparkが$SPARK_HOME/conf/metrics.properties
に存在すると想定する設定ファイルによって構成されます。カスタムファイルの場所は、spark.metrics.conf
設定プロパティを介して指定できます。設定ファイルの代わりに、プレフィックス spark.metrics.conf.
を持つ一連の設定パラメーターを使用できます。デフォルトでは、ドライバーまたはエグゼキューターメトリクスに使用されるルート名前空間は、spark.app.id
の値です。ただし、多くの場合、ユーザーはドライバーとエグゼキューターのアプリ間でメトリクスを追跡したいと考えていますが、アプリケーションID(つまり、spark.app.id
)はアプリの呼び出しごとに変わるため、これは困難です。このようなユースケースでは、spark.metrics.namespace
設定プロパティを使用して、メトリクスのレポート用にカスタム名前空間を指定できます。例えば、ユーザーがメトリクス名前空間をアプリケーションの名前に設定したい場合、spark.metrics.namespace
プロパティを ${spark.app.name}
のような値に設定できます。この値は、Sparkによって適切に展開され、メトリクスシステムのルート名前空間として使用されます。ドライバーとエグゼキューター以外のメトリクスは、spark.app.id
でプレフィックスが付けられることはなく、spark.metrics.namespace
プロパティもそのようなメトリクスには影響しません。
Sparkのメトリクスは、Sparkコンポーネントに対応する異なるインスタンスに分離されています。各インスタンス内で、メトリクスが報告されるシンクのセットを設定できます。現在、以下のインスタンスがサポートされています。
master
:Sparkスタンドアロンマスタープロセス。applications
:マスター内のコンポーネントで、さまざまなアプリケーションに関するレポートを行います。worker
:Sparkスタンドアロンワーカープロセス。executor
:Sparkエグゼキューター。driver
:Sparkドライバープロセス(SparkContextが作成されるプロセス)。shuffleService
:Sparkシャッフルサービス。applicationMaster
:YARNで実行する場合のSpark ApplicationMaster。mesos_cluster
:Mesosで実行する場合のSparkクラスターのスケジューラー。
各インスタンスは、ゼロ個以上のシンクにレポートできます。シンクは、org.apache.spark.metrics.sink
パッケージに含まれています。
ConsoleSink
:メトリクス情報をコンソールに記録します。CSVSink
:メトリクスデータを定期的にCSVファイルにエクスポートします。JmxSink
:JMXコンソールで表示するためのメトリクスを登録します。MetricsServlet
:既存のSpark UI内にサーブレットを追加し、メトリクスデータをJSONデータとして提供します。PrometheusServlet
:(試験的)既存のSpark UI内にサーブレットを追加し、メトリクスデータをPrometheus形式で提供します。GraphiteSink
:メトリクスをGraphiteノードに送信します。Slf4jSink
:メトリクスをログエントリとしてslf4jに送信します。StatsdSink
:メトリクスをStatsDノードに送信します。
Sparkは、ライセンス制限のためにデフォルトビルドに含まれていないGangliaシンクもサポートしています。
GangliaSink
:メトリクスをGangliaノードまたはマルチキャストグループに送信します。
GangliaSink
をインストールするには、Sparkのカスタムビルドを実行する必要があります。このライブラリを埋め込むと、LGPLライセンスのコードがSparkパッケージに含まれることに注意してください。 sbtユーザーの場合は、ビルド前にSPARK_GANGLIA_LGPL
環境変数を設定します。Mavenユーザーの場合は、-Pspark-ganglia-lgpl
プロファイルを有効にします。クラスターのSparkビルドを修正することに加えて、ユーザーアプリケーションは spark-ganglia-lgpl
アーティファクトにリンクする必要があります。
メトリクス構成ファイルの構文と各シンクで使用できるパラメーターは、構成ファイルの例である $SPARK_HOME/conf/metrics.properties.template
に定義されています。
メトリクス構成ファイルの代わりにSpark構成パラメーターを使用する場合、関連するパラメーター名は、プレフィックス spark.metrics.conf.
と構成の詳細(つまり、パラメーターは spark.metrics.conf.[instance|*].sink.[sink_name].[parameter_name]
の形式をとります)で構成されます。この例は、Graphiteシンク用のSpark構成パラメーターのリストを示しています。
"spark.metrics.conf.*.sink.graphite.class"="org.apache.spark.metrics.sink.GraphiteSink"
"spark.metrics.conf.*.sink.graphite.host"="graphiteEndPoint_hostName>"
"spark.metrics.conf.*.sink.graphite.port"=<graphite_listening_port>
"spark.metrics.conf.*.sink.graphite.period"=10
"spark.metrics.conf.*.sink.graphite.unit"=seconds
"spark.metrics.conf.*.sink.graphite.prefix"="optional_prefix"
"spark.metrics.conf.*.sink.graphite.regex"="optional_regex_to_send_matching_metrics"
Sparkメトリクス構成のデフォルト値は次のとおりです。
"*.sink.servlet.class" = "org.apache.spark.metrics.sink.MetricsServlet"
"*.sink.servlet.path" = "/metrics/json"
"master.sink.servlet.path" = "/metrics/master/json"
"applications.sink.servlet.path" = "/metrics/applications/json"
追加のソースは、メトリクス構成ファイルまたは構成パラメーター spark.metrics.conf.[component_name].source.jvm.class=[source_name]
を使用して構成できます。現在、JVMソースは利用可能な唯一のオプションソースです。例えば、次の構成パラメーターはJVMソースをアクティブ化します:"spark.metrics.conf.*.source.jvm.class"="org.apache.spark.metrics.source.JvmSource"
利用可能なメトリクスプロバイダーのリスト
Sparkで使用されるメトリクスには、ゲージ、カウンター、ヒストグラム、メーター、タイマーの複数のタイプがあります。詳細については、Dropwizardライブラリのドキュメントを参照してください。次のコンポーネントとメトリクスのリストでは、コンポーネントインスタンスとソース名前空間ごとにグループ化された、利用可能なメトリクスの名前と詳細をいくつか報告します。Sparkインストルメンテーションで使用される最も一般的なメトリクスのタイプは、ゲージとカウンターです。カウンターは、.count
接尾辞が付いていることで識別できます。タイマー、メーター、ヒストグラムはリストで注釈が付けられています。リストの残りの要素はゲージタイプのメトリクスです。メトリクスの大部分は、親コンポーネントインスタンスが構成されるとすぐにアクティブになります。一部のメトリクスでは、追加の構成パラメーターを介して有効にする必要もあります。詳細についてはリストに記載されています。
コンポーネントインスタンス = Driver
これは、最も多くのインストルメント化されたメトリクスを持つコンポーネントです。
- 名前空間=BlockManager
- disk.diskSpaceUsed_MB
- memory.maxMem_MB
- memory.maxOffHeapMem_MB
- memory.maxOnHeapMem_MB
- memory.memUsed_MB
- memory.offHeapMemUsed_MB
- memory.onHeapMemUsed_MB
- memory.remainingMem_MB
- memory.remainingOffHeapMem_MB
- memory.remainingOnHeapMem_MB
- 名前空間=HiveExternalCatalog
- 注:これらのメトリクスは、構成パラメーター
spark.metrics.staticSources.enabled
(デフォルトはtrue)に条件付きです。 - fileCacheHits.count
- filesDiscovered.count
- hiveClientCalls.count
- parallelListingJobCount.count
- partitionsFetched.count
- 注:これらのメトリクスは、構成パラメーター
- 名前空間=CodeGenerator
- 注:これらのメトリクスは、構成パラメーター
spark.metrics.staticSources.enabled
(デフォルトはtrue)に条件付きです。 - compilationTime (ヒストグラム)
- generatedClassSize (ヒストグラム)
- generatedMethodSize (ヒストグラム)
- sourceCodeSize (ヒストグラム)
- 注:これらのメトリクスは、構成パラメーター
- 名前空間=DAGScheduler
- job.activeJobs
- job.allJobs
- messageProcessingTime (タイマー)
- stage.failedStages
- stage.runningStages
- stage.waitingStages
- 名前空間=LiveListenerBus
- listenerProcessingTime.org.apache.spark.HeartbeatReceiver (タイマー)
- listenerProcessingTime.org.apache.spark.scheduler.EventLoggingListener (タイマー)
- listenerProcessingTime.org.apache.spark.status.AppStatusListener (タイマー)
- numEventsPosted.count
- queue.appStatus.listenerProcessingTime (タイマー)
- queue.appStatus.numDroppedEvents.count
- queue.appStatus.size
- queue.eventLog.listenerProcessingTime (タイマー)
- queue.eventLog.numDroppedEvents.count
- queue.eventLog.size
- queue.executorManagement.listenerProcessingTime (タイマー)
- 名前空間=appStatus (すべてのメトリクスのタイプ=カウンター)
- 注:Spark 3.0で導入。構成パラメーター
spark.metrics.appStatusSource.enabled
(デフォルトはfalse)に条件付き。 - stages.failedStages.count
- stages.skippedStages.count
- stages.completedStages.count
- tasks.blackListedExecutors.count // 非推奨、代わりにexcludedExecutorsを使用してください
- tasks.excludedExecutors.count
- tasks.completedTasks.count
- tasks.failedTasks.count
- tasks.killedTasks.count
- tasks.skippedTasks.count
- tasks.unblackListedExecutors.count // 非推奨、代わりにunexcludedExecutorsを使用してください
- tasks.unexcludedExecutors.count
- jobs.succeededJobs
- jobs.failedJobs
- jobDuration
- 注:Spark 3.0で導入。構成パラメーター
- 名前空間=AccumulatorSource
- 注:メトリクスシステムにアキュムレーターをアタッチするためのユーザー設定可能なソース
- DoubleAccumulatorSource
- LongAccumulatorSource
- 名前空間=spark.streaming
- 注:これはSpark Structured Streamingのみに適用されます。構成パラメーター
spark.sql.streaming.metricsEnabled=true
(デフォルトはfalse)に条件付き。 - eventTime-watermark
- inputRate-total
- latency
- processingRate-total
- states-rowsTotal
- states-usedBytes
- 注:これはSpark Structured Streamingのみに適用されます。構成パラメーター
- 名前空間=JVMCPU
- jvmCpuTime
- 名前空間=executor
- 注:これらのメトリクスは、ローカルモードのドライバーでのみ利用可能です。
- この名前空間で使用可能なメトリクスの完全なリストは、エグゼキューターコンポーネントインスタンスの対応するエントリにあります。
- 名前空間=ExecutorMetrics
- 注:これらのメトリクスは、構成パラメーター
spark.metrics.executorMetricsSource.enabled
(デフォルトはtrue)に条件付きです。 - このソースには、メモリ関連のメトリクスが含まれています。この名前空間で使用可能なメトリクスの完全なリストは、エグゼキューターコンポーネントインスタンスの対応するエントリにあります。
- 注:これらのメトリクスは、構成パラメーター
- 名前空間=ExecutorAllocationManager
- 注:これらのメトリクスは、動的割り当てを使用している場合にのみ出力されます。構成パラメーター
spark.dynamicAllocation.enabled
(デフォルトはfalse)に条件付き。 - executors.numberExecutorsToAdd
- executors.numberExecutorsPendingToRemove
- executors.numberAllExecutors
- executors.numberTargetExecutors
- executors.numberMaxNeededExecutors
- executors.numberDecommissioningExecutors
- executors.numberExecutorsGracefullyDecommissioned.count
- executors.numberExecutorsDecommissionUnfinished.count
- executors.numberExecutorsExitedUnexpectedly.count
- executors.numberExecutorsKilledByDriver.count
- 注:これらのメトリクスは、動的割り当てを使用している場合にのみ出力されます。構成パラメーター
- namespace=plugin.<プラグインクラス名>
- オプションのネームスペース。このネームスペースのメトリクスは、ユーザーが提供したコードで定義され、SparkプラグインAPIを使用して構成されます。カスタムプラグインをSparkにロードする方法については、以下の「高度な計測」を参照してください。
コンポーネントインスタンス = Executor
これらのメトリクスは、Sparkエグゼキューターによって公開されます。
- namespace=executor (メトリクスのタイプはカウンターまたはゲージです)
- 注
spark.executor.metrics.fileSystemSchemes
(デフォルト:file,hdfs
)は、公開されるファイルシステムメトリクスを決定します。
- bytesRead.count
- bytesWritten.count
- cpuTime.count
- deserializeCpuTime.count
- deserializeTime.count
- diskBytesSpilled.count
- filesystem.file.largeRead_ops
- filesystem.file.read_bytes
- filesystem.file.read_ops
- filesystem.file.write_bytes
- filesystem.file.write_ops
- filesystem.hdfs.largeRead_ops
- filesystem.hdfs.read_bytes
- filesystem.hdfs.read_ops
- filesystem.hdfs.write_bytes
- filesystem.hdfs.write_ops
- jvmGCTime.count
- memoryBytesSpilled.count
- recordsRead.count
- recordsWritten.count
- resultSerializationTime.count
- resultSize.count
- runTime.count
- shuffleBytesWritten.count
- shuffleFetchWaitTime.count
- shuffleLocalBlocksFetched.count
- shuffleLocalBytesRead.count
- shuffleRecordsRead.count
- shuffleRecordsWritten.count
- shuffleRemoteBlocksFetched.count
- shuffleRemoteBytesRead.count
- shuffleRemoteBytesReadToDisk.count
- shuffleTotalBytesRead.count
- shuffleWriteTime.count
- succeededTasks.count
- threadpool.activeTasks
- threadpool.completeTasks
- threadpool.currentPool_size
- threadpool.maxPool_size
- threadpool.startedTasks
- 注
- 名前空間=ExecutorMetrics
- 注
- これらのメトリクスは、構成パラメーター
spark.metrics.executorMetricsSource.enabled
(デフォルト値はtrue)に依存します。 - ExecutorMetricsは、エグゼキューターとドライバーで定期的にスケジュールされたハートビートプロセスの一部として更新されます。
spark.executor.heartbeatInterval
(デフォルト値は10秒) - オプションでより高速なポーリングメカニズムをエグゼキューターメモリメトリクスに利用できます。これは、構成パラメーター
spark.executor.metrics.pollingInterval
を使用してポーリング間隔(ミリ秒単位)を設定することで有効にできます。
- これらのメトリクスは、構成パラメーター
- JVMHeapMemory
- JVMOffHeapMemory
- OnHeapExecutionMemory
- OnHeapStorageMemory
- OnHeapUnifiedMemory
- OffHeapExecutionMemory
- OffHeapStorageMemory
- OffHeapUnifiedMemory
- DirectPoolMemory
- MappedPoolMemory
- MinorGCCount
- MinorGCTime
- MajorGCCount
- MajorGCTime
- “ProcessTree*”メトリックカウンター
- ProcessTreeJVMVMemory
- ProcessTreeJVMRSSMemory
- ProcessTreePythonVMemory
- ProcessTreePythonRSSMemory
- ProcessTreeOtherVMemory
- ProcessTreeOtherRSSMemory
- 注:“ProcessTree*”メトリクスは、特定の条件下でのみ収集されます。条件は、次の論理ANDです:
/proc
ファイルシステムが存在する、spark.executor.processTreeMetrics.enabled=true
。“ProcessTree*”メトリクスは、これらの条件が満たされない場合は0を報告します。
- 注
- 名前空間=JVMCPU
- jvmCpuTime
- namespace=NettyBlockTransfer
- shuffle-client.usedDirectMemory
- shuffle-client.usedHeapMemory
- shuffle-server.usedDirectMemory
- shuffle-server.usedHeapMemory
- 名前空間=HiveExternalCatalog
- 注:これらのメトリクスは、構成パラメーター
spark.metrics.staticSources.enabled
(デフォルトはtrue)に条件付きです。 - fileCacheHits.count
- filesDiscovered.count
- hiveClientCalls.count
- parallelListingJobCount.count
- partitionsFetched.count
- 注:これらのメトリクスは、構成パラメーター
- 名前空間=CodeGenerator
- 注:これらのメトリクスは、構成パラメーター
spark.metrics.staticSources.enabled
(デフォルトはtrue)に条件付きです。 - compilationTime (ヒストグラム)
- generatedClassSize (ヒストグラム)
- generatedMethodSize (ヒストグラム)
- sourceCodeSize (ヒストグラム)
- 注:これらのメトリクスは、構成パラメーター
- namespace=plugin.<プラグインクラス名>
- オプションのネームスペース。このネームスペースのメトリクスは、ユーザーが提供したコードで定義され、SparkプラグインAPIを使用して構成されます。カスタムプラグインをSparkにロードする方法については、以下の「高度な計測」を参照してください。
ソース = JVMソース
注
- 関連する
metrics.properties
ファイルエントリまたは構成パラメーターを設定して、このソースを有効にします:spark.metrics.conf.*.source.jvm.class=org.apache.spark.metrics.source.JvmSource
- これらのメトリクスは、構成パラメーター
spark.metrics.staticSources.enabled
(デフォルトはtrue)に依存します。 - このソースは、ドライバーインスタンスとエグゼキューターインスタンスで使用でき、他のインスタンスでも使用できます。
- このソースは、Dropwizard/Codahale Metric Sets for JVM instrumentation、特にBufferPoolMetricSet、GarbageCollectorMetricSet、MemoryUsageGaugeSetを使用して、JVMメトリクスに関する情報を提供します。
コンポーネントインスタンス = applicationMaster
注:YARNで実行する場合に適用されます
- numContainersPendingAllocate
- numExecutorsFailed
- numExecutorsRunning
- numLocalityAwareTasks
- numReleasedContainers
コンポーネントインスタンス = mesos_cluster
注:Mesosで実行する場合に適用されます
- waitingDrivers
- launchedDrivers
- retryDrivers
コンポーネントインスタンス = master
注:Sparkスタンドアロンでマスターとして実行する場合に適用されます
- workers
- aliveWorkers
- apps
- waitingApps
コンポーネントインスタンス = ApplicationSource
注:Sparkスタンドアロンでマスターとして実行する場合に適用されます
- status
- runtime_ms
- cores
コンポーネントインスタンス = worker
注:Sparkスタンドアロンでワーカーとして実行する場合に適用されます
- executors
- coresUsed
- memUsed_MB
- coresFree
- memFree_MB
コンポーネントインスタンス = shuffleService
注:シャッフルサービスに適用されます
- blockTransferRate(メーター)-転送されているブロックのレート
- blockTransferMessageRate(メーター)-ブロック転送メッセージのレート。つまり、バッチフェッチが有効になっている場合、これはブロックの数ではなくバッチの数を表します。
- blockTransferRateBytes(メーター)
- blockTransferAvgTime_1min(ゲージ-1分間の移動平均)
- numActiveConnections.count
- numRegisteredConnections.count
- numCaughtExceptions.count
- openBlockRequestLatencyMillis(ヒストグラム)
- registerExecutorRequestLatencyMillis(ヒストグラム)
- registeredExecutorsSize
- shuffle-server.usedDirectMemory
-
shuffle-server.usedHeapMemory
- 注:以下のメトリクスは、プッシュベースのシャッフルの場合、サーバー側の構成
spark.shuffle.push.server.mergedShuffleFileManagerImpl
がorg.apache.spark.network.shuffle.MergedShuffleFileManager
に設定されている場合に適用されます。 - blockBytesWritten-ファイルに書き込まれたプッシュされたブロックデータのサイズ(バイト単位)
- blockAppendCollisions-同じリデュースパーティションの別のブロックが書き込まれているため、シャッフルサービスで衝突したシャッフルプッシュブロックの数
- lateBlockPushes-特定のシャッフルマージが完了した後、シャッフルサービスで受信されたシャッフルプッシュブロックの数
- deferredBlocks-現在メモリにバッファリングされている遅延ブロックパーツの数
- deferredBlockBytes-現在メモリにバッファリングされている遅延ブロックパーツのサイズ
- staleBlockPushes-古いシャッフルブロックプッシュリクエストの数
- ignoredBlockBytes-ESSに転送されたが無視されたプッシュされたブロックデータのサイズ。プッシュされたブロックデータは、次の場合に無視されると見なされます:1.シャッフルが完了した後に受信した場合。 2.プッシュリクエストが重複ブロックの場合。 3. ESSがブロックを書き込めなかった場合。
高度なインストルメンテーション
Sparkジョブのパフォーマンスをプロファイリングするのに役立ついくつかの外部ツールを使用できます。
- Gangliaなどのクラスター全体の監視ツールは、クラスター全体の利用率とリソースのボトルネックに関する洞察を提供できます。たとえば、Gangliaダッシュボードでは、特定のワークロードがディスクバウンド、ネットワークバウンド、またはCPUバウンドのいずれであるかをすばやく明らかにできます。
- dstat、iostat、およびiotopなどのOSプロファイリングツールは、個々のノードで詳細なプロファイリングを提供できます。
- スタックトレースを提供する
jstack
、ヒープダンプを作成するjmap
、時系列統計を報告するjstat
、およびさまざまなJVMプロパティを視覚的に探索するjconsole
などのJVMユーティリティは、JVM内部に精通している人に役立ちます。
Sparkは、カスタム計測コードをSparkアプリケーションに追加できるように、プラグインAPIも提供しています。Sparkにプラグインをロードするために利用できる構成キーは2つあります。
spark.plugins
spark.plugins.defaultList
どちらも、org.apache.spark.api.plugin.SparkPlugin
インターフェイスを実装するクラス名のコンマ区切りリストを受け取ります。2つの名前が存在するのは、1つのリストをSparkのデフォルト構成ファイルに配置して、ユーザーが構成ファイルのリストを上書きすることなく、コマンドラインから他のプラグインを簡単に追加できるようにするためです。重複するプラグインは無視されます。