YARN 上での Spark 実行
- セキュリティ
- YARN 上での Spark の起動
- 準備
- 設定
- アプリケーションのデバッグ
- リソース割り当てと設定の概要
- ステージレベルスケジューリングの概要
- 重要な注意事項
- Kerberos
- 外部シャッフルサービスの設定
- Apache Oozie を使用したアプリケーションの起動
- Spark Web UI の代わりに Spark History Server を使用すること
- 複数のバージョンの Spark シャッフルサービスの実行のサポート
YARN (Hadoop NextGen) 上での実行のサポートは、Spark バージョン 0.6.0 で追加され、以降のリリースで改良されました。YARN (Hadoop NextGen)
セキュリティ
認証などのセキュリティ機能は、デフォルトでは有効になっていません。インターネットまたは信頼できないネットワークに公開されているクラスタをデプロイする場合、クラスタへのアクセスを保護して、不正なアプリケーションがクラスタ上で実行されないようにすることが重要です。Spark を実行する前に、Spark セキュリティ とこのドキュメントの具体的なセキュリティセクションを参照してください。
YARN 上での Spark の起動
HADOOP_CONF_DIR
または YARN_CONF_DIR
が、Hadoop クラスタの(クライアント側の)設定ファイルが含まれているディレクトリを指していることを確認してください。これらの設定は HDFS に書き込み、YARN ResourceManager に接続するために使用されます。このディレクトリに含まれる設定は YARN クラスタに配布されるため、アプリケーションで使用されるすべてのコンテナが同じ設定を使用します。設定が YARN によって管理されていない Java システムプロパティまたは環境変数を参照する場合は、Spark アプリケーションの設定(ドライバ、エグゼキュータ、およびクライアントモードで実行する場合の AM)にも設定する必要があります。
YARN 上で Spark アプリケーションを起動するために使用できるデプロイモードは 2 つあります。cluster
モードでは、Spark ドライバはクラスタ上の YARN によって管理されるアプリケーションマスタープロセス内で実行され、クライアントはアプリケーションの開始後に終了できます。client
モードでは、ドライバはクライアントプロセス内で実行され、アプリケーションマスターは YARN からリソースを要求するためだけに使用されます。
マスターのアドレスが --master
パラメータで指定される Spark がサポートする他のクラスタマネージャとは異なり、YARN モードでは ResourceManager のアドレスは Hadoop 設定から取得されます。したがって、--master
パラメータは yarn
です。
cluster
モードで Spark アプリケーションを起動するには
$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
例:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
examples/jars/spark-examples*.jar \
10
上記のコマンドは、デフォルトのアプリケーションマスターを起動する YARN クライアントプログラムを開始します。その後、SparkPi はアプリケーションマスターの子スレッドとして実行されます。クライアントは定期的にアプリケーションマスターのステータス更新をポーリングし、コンソールに表示します。アプリケーションの実行が終了すると、クライアントは終了します。ドライバとエグゼキュータのログの確認方法については、以下のアプリケーションのデバッグセクションを参照してください。
client
モードで Spark アプリケーションを起動するには、cluster
を client
に置き換えます。以下は、client
モードで spark-shell
を実行する方法を示しています。
$ ./bin/spark-shell --master yarn --deploy-mode client
その他の JAR の追加
cluster
モードでは、ドライバはクライアントとは異なるマシンで実行されるため、SparkContext.addJar
はクライアントローカルのファイルではそのままでは機能しません。クライアント上のファイルを SparkContext.addJar
で使用できるようにするには、起動コマンドで --jars
オプションを使用してそれらを含めます。
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars my-other-jar.jar,my-other-other-jar.jar \
my-main-jar.jar \
app_arg1 app_arg2
準備
YARN 上で Spark を実行するには、YARN サポートを備えてビルドされた Spark のバイナリ配布が必要です。バイナリ配布は、プロジェクト Web サイトのダウンロードページからダウンロードできます。ダウンロードできる Spark バイナリ配布には 2 つのバリエーションがあります。1つは特定のバージョンの Apache Hadoop を使用して事前にビルドされています。この Spark 配布には、組み込みの Hadoop ランタイムが含まれているため、「with-hadoop
」Spark 配布と呼びます。もう1つは、ユーザーが提供した Hadoop を使用して事前にビルドされています。この Spark 配布には組み込みの Hadoop ランタイムが含まれていないため、サイズが小さくなりますが、ユーザーは別途 Hadoop インストールを提供する必要があります。このバリアントを「no-hadoop
」Spark 配布と呼びます。with-hadoop
Spark 配布の場合、組み込みの Hadoop ランタイムが既に含まれているため、デフォルトでは、ジョブが Hadoop Yarn クラスタに送信されると、jar の競合を防ぐために、Yarn のクラスパスは Spark には設定されません。この動作をオーバーライドするには、spark.yarn.populateHadoopClasspath=true
を設定できます。no-hadoop
Spark 配布の場合、Spark は Hadoop ランタイムを取得するために、デフォルトで Yarn のクラスパスを設定します。with-hadoop
Spark 配布の場合、アプリケーションがクラスタでのみ利用可能な特定のライブラリに依存している場合は、上記のプロパティを設定して Yarn クラスパスを設定してみてください。そうすることで jar の競合が発生する場合は、それをオフにして、このライブラリをアプリケーション jar に含める必要があります。
Spark を自分でビルドするには、Spark のビルドを参照してください。
YARN 側から Spark ランタイム jar にアクセスできるようにするには、spark.yarn.archive
または spark.yarn.jars
を指定できます。詳細については、Spark プロパティを参照してください。spark.yarn.archive
と spark.yarn.jars
のどちらも指定されていない場合、Spark は $SPARK_HOME/jars
の下のすべての jar を含む zip ファイルを作成し、それを分散キャッシュにアップロードします。
設定
ほとんどの設定は、YARN 上の Spark と他のデプロイモードで同じです。それらに関する詳細については、設定ページを参照してください。これらは、YARN 上の Spark に固有の設定です。
アプリケーションのデバッグ
YARN の用語では、エグゼキュータとアプリケーションマスターは「コンテナ」内で実行されます。YARN には、アプリケーションの完了後にコンテナログを処理するための 2 つのモードがあります。ログの集約が有効になっている場合(yarn.log-aggregation-enable
設定を使用)、コンテナログは HDFS にコピーされ、ローカルマシンから削除されます。これらのログは、yarn logs
コマンドを使用して、クラスタ内のどこからでも表示できます。
yarn logs -applicationId <app ID>
は、指定されたアプリケーションのすべてのコンテナからのすべてのログファイルの内容を出力します。HDFS シェルまたは API を使用して、HDFS でコンテナログファイルも直接表示できます。それらが配置されているディレクトリは、YARN 設定(yarn.nodemanager.remote-app-log-dir
と yarn.nodemanager.remote-app-log-dir-suffix
)を確認することで見つけることができます。ログは、エグゼキュータタブにある Spark Web UI でも利用できます。Spark history server と MapReduce history server の両方が実行されており、yarn-site.xml
で yarn.log.server.url
が適切に設定されている必要があります。Spark history server UI のログURLは、集約されたログを表示するためにMapReduce history server にリダイレクトされます。
ログの集約が有効になっていない場合、ログは各マシンの YARN_APP_LOGS_DIR
の下にローカルに保持されます。これは通常、Hadoop のバージョンとインストールに応じて /tmp/logs
または $HADOOP_HOME/logs/userlogs
に設定されます。コンテナのログを表示するには、ログが含まれているホストにアクセスし、このディレクトリを確認する必要があります。サブディレクトリは、アプリケーションIDとコンテナIDによってログファイルを整理します。ログは、エグゼキュータタブにある Spark Web UI でも利用でき、MapReduce history server の実行は必要ありません。
コンテナごとの起動環境を確認するには、yarn.nodemanager.delete.debug-delay-sec
を大きな値(例:36000
)に増やし、コンテナが起動されたノードの yarn.nodemanager.local-dirs
を介してアプリケーションキャッシュにアクセスします。このディレクトリには、起動スクリプト、JAR、および各コンテナの起動に使用されるすべての環境変数が含まれています。このプロセスは、特にクラスパスの問題のデバッグに役立ちます。(これを有効にするには、クラスタ設定に対する管理者権限とすべてのノードマネージャの再起動が必要です。したがって、これはホストされたクラスタには適用されません)。
アプリケーションマスターまたはエグゼキュータにカスタム log4j 設定を使用するには、次のオプションがあります。
spark-submit
を使用してカスタムlog4j.properties
をアップロードし、アプリケーションと共にアップロードするファイルの--files
リストに追加します。-Dlog4j.configuration=<設定ファイルの場所>
をspark.driver.extraJavaOptions
(ドライバの場合)またはspark.executor.extraJavaOptions
(エグゼキュータの場合)に追加します。ファイルを使用する場合は、file:
プロトコルを明示的に指定する必要があり、ファイルはすべてのノードのローカルに存在する必要があります。$SPARK_CONF_DIR/log4j.properties
ファイルを更新すると、他の設定と共に自動的にアップロードされます。複数のオプションが指定されている場合、他の 2 つのオプションの方が優先順位が高くなります。
最初のオプションでは、エグゼキュータとアプリケーションマスターの両方が同じlog4j構成を共有することに注意してください。これにより、同じノードで実行する場合(たとえば、同じログファイルへの書き込みを試行する場合)に問題が発生する可能性があります。
YARNでログファイルを適切な場所に配置し、YARNで適切に表示および集約できるようにするには、log4j2.properties
でspark.yarn.app.container.log.dir
を使用します。例:appender.file_appender.fileName=${sys:spark.yarn.app.container.log.dir}/spark.log
。ストリーミングアプリケーションの場合、RollingFileAppender
を構成し、ファイルの場所をYARNのログディレクトリに設定すると、大きなログファイルによって発生するディスクオーバーフローを防ぎ、YARNのログユーティリティを使用してログにアクセスできます。
アプリケーションマスターとエグゼキュータにカスタムのmetrics.propertiesを使用するには、$SPARK_CONF_DIR/metrics.properties
ファイルを更新します。これは他の構成と共に自動的にアップロードされるため、--files
で手動で指定する必要はありません。
Spark プロパティ
プロパティ名 | デフォルト値 | 説明 | 導入バージョン |
---|---|---|---|
spark.yarn.am.memory |
512m |
クライアントモードでYARNアプリケーションマスターが使用するメモリの量。JVMメモリの文字列と同じ形式(例:512m 、2g )。クラスタモードでは、代わりにspark.driver.memory を使用します。接尾辞には小文字を使用します(例: |
1.3.0 |
spark.yarn.am.resource.{resource-type}.amount |
(なし) |
クライアントモードでYARNアプリケーションマスターが使用するリソース量。クラスタモードでは、代わりにspark.yarn.driver.resource.<resource-type>.amount を使用します。この機能はYARN 3.0以降でのみ使用できます。参考として、YARNリソースモデルのドキュメントを参照してください:https://hadoop.dokyumento.jp/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceModel.html例:YARNからGPUリソースを要求するには、 |
3.0.0 |
spark.yarn.applicationType |
SPARK |
SPARK 、SPARK-SQL 、SPARK-STREAMING 、SPARK-MLLIB 、SPARK-GRAPH など、より具体的なアプリケーションタイプを定義します。20文字を超えないように注意してください。 |
3.1.0 |
spark.yarn.driver.resource.{resource-type}.amount |
(なし) |
クラスタモードでYARNアプリケーションマスターが使用するリソース量。この機能はYARN 3.0以降でのみ使用できます。参考として、YARNリソースモデルのドキュメントを参照してください:https://hadoop.dokyumento.jp/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceModel.html 例:YARNからGPUリソースを要求するには、 |
3.0.0 |
spark.yarn.executor.resource.{resource-type}.amount |
(なし) |
エグゼキュータプロセスごとに使用するリソース量。この機能はYARN 3.0以降でのみ使用できます。参考として、YARNリソースモデルのドキュメントを参照してください:https://hadoop.dokyumento.jp/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceModel.html 例:YARNからGPUリソースを要求するには、 |
3.0.0 |
spark.yarn.resourceGpuDeviceName |
yarn.io/gpu |
Sparkリソースタイプgpu とGPUを表すYARNリソースのマッピングを指定します。デフォルトではYARNはyarn.io/gpu を使用しますが、YARNがカスタムリソースタイプで構成されている場合、これによりリマッピングが可能です。spark.{driver/executor}.resource.gpu.* 構成を使用する場合に適用されます。 |
3.2.1 |
spark.yarn.resourceFpgaDeviceName |
yarn.io/fpga |
Sparkリソースタイプfpga とFPGAを表すYARNリソースのマッピングを指定します。デフォルトではYARNはyarn.io/fpga を使用しますが、YARNがカスタムリソースタイプで構成されている場合、これによりリマッピングが可能です。spark.{driver/executor}.resource.fpga.* 構成を使用する場合に適用されます。 |
3.2.1 |
spark.yarn.am.cores |
1 |
クライアントモードでYARNアプリケーションマスターが使用するコア数。クラスタモードでは、代わりにspark.driver.cores を使用します。 |
1.3.0 |
spark.yarn.am.waitTime |
100s |
クラスタモードでのみ使用されます。YARNアプリケーションマスターがSparkContextの初期化を待つ時間。 | 1.3.0 |
spark.yarn.submit.file.replication |
デフォルトのHDFSレプリケーション(通常は3 ) |
アプリケーションのためにHDFSにアップロードされたファイルのHDFSレプリケーションレベル。これには、Spark JAR、アプリJAR、および分散キャッシュファイル/アーカイブなどが含まれます。 | 0.8.1 |
spark.yarn.stagingDir |
ファイルシステム内の現在のユーザーのホームディレクトリ | アプリケーションの送信中に使用されるステージングディレクトリ。 | 2.0.0 |
spark.yarn.preserve.staging.files |
false |
ジョブの終了時にステージングされたファイル(Spark JAR、アプリJAR、分散キャッシュファイル)を削除するのではなく保持するには、true に設定します。 |
1.1.0 |
spark.yarn.scheduler.heartbeat.interval-ms |
3000 |
SparkアプリケーションマスターがYARN ResourceManagerにハートビートを送信する間隔(ミリ秒単位)。この値は、YARNの構成における有効期限間隔の値の半分(つまり、yarn.am.liveness-monitor.expiry-interval-ms )に制限されます。 |
0.8.1 |
spark.yarn.scheduler.initial-allocation.interval |
200ms |
保留中のコンテナ割り当て要求がある場合、SparkアプリケーションマスターがYARN ResourceManagerに積極的にハートビートを送信する最初のインターバル。これはspark.yarn.scheduler.heartbeat.interval-ms 以下にする必要があります。保留中のコンテナがまだ存在する場合は、後続の積極的なハートビートで割り当て間隔が2倍になり、spark.yarn.scheduler.heartbeat.interval-ms に達するまで続きます。 |
1.4.0 |
spark.yarn.historyServer.address |
(なし) | Spark履歴サーバーのアドレス(例:host.com:18080 )。アドレスにはスキーム(http:// )を含めることはできません。履歴サーバーはオプションのサービスであるため、デフォルトでは設定されていません。このアドレスは、Sparkアプリケーションが終了したときにYARN ResourceManagerに渡され、ResourceManager UIからSpark履歴サーバーUIへのアプリケーションをリンクします。このプロパティでは、YARNプロパティを変数として使用でき、これらはSparkによって実行時に置換されます。たとえば、Spark履歴サーバーがYARN ResourceManagerと同じノードで実行される場合は、${hadoopconf-yarn.resourcemanager.hostname}:18080 に設定できます。 |
1.0.0 |
spark.yarn.dist.archives |
(なし) | 各エグゼキュータの作業ディレクトリに展開するアーカイブのカンマ区切りリスト。 | 1.0.0 |
spark.yarn.dist.files |
(なし) | 各エグゼキュータの作業ディレクトリに配置するファイルのカンマ区切りリスト。 | 1.0.0 |
spark.yarn.dist.jars |
(なし) | 各エグゼキュータの作業ディレクトリに配置するJARファイルのカンマ区切りリスト。 | 2.0.0 |
spark.yarn.dist.forceDownloadSchemes |
(なし) |
YARNの分散キャッシュに追加する前に、ローカルディスクにダウンロードされるリソースのスキームのカンマ区切りリスト。YARNサービスがSparkでサポートされているスキーム(http、https、ftpなど)をサポートしていない場合、またはローカルYARNクライアントのクラスパスにある必要があるJARファイルを使用する場合に使用します。ワイルドカード「*」は、すべてのスキームのリソースをダウンロードすることを示します。 | 2.3.0 |
spark.executor.instances |
2 |
静的割り当てのエグゼキュータの数。spark.dynamicAllocation.enabled を使用すると、エグゼキュータの初期セットは少なくともこのサイズになります。 |
1.0.0 |
spark.yarn.am.memoryOverhead |
AMメモリ * 0.10、最小384 | spark.driver.memoryOverhead と同じですが、クライアントモードのYARNアプリケーションマスター用です。 |
1.3.0 |
spark.yarn.queue |
default |
アプリケーションが送信されるYARNキューの名前。 | 1.0.0 |
spark.yarn.jars |
(なし) | YARNコンテナに配布するSparkコードを含むライブラリのリスト。デフォルトでは、YARN上のSparkはローカルにインストールされているSpark JARを使用しますが、Spark JARはHDFS上の世界で読み取り可能な場所にも存在できます。これにより、YARNはノードにキャッシュできるため、アプリケーションが実行されるたびに配布する必要がなくなります。たとえば、HDFS上のJARを指すには、この構成をhdfs:///some/path に設定します。グロブを使用できます。 |
2.0.0 |
spark.yarn.archive |
(なし) | YARNキャッシュに配布するために必要なSpark JARを含むアーカイブ。設定されている場合、この構成はspark.yarn.jars を置き換え、アーカイブはすべてのアプリケーションのコンテナで使用されます。アーカイブには、ルートディレクトリにJARファイルを含める必要があります。前のオプションと同様に、アーカイブはHDFS上にホストしてファイル配布を高速化することもできます。 |
2.0.0 |
spark.yarn.appMasterEnv.[EnvironmentVariableName] |
(なし) | EnvironmentVariableName で指定された環境変数を、YARNで起動されたアプリケーションマスタープロセスに追加します。ユーザーはこれらを複数指定して、複数の環境変数を設定できます。クラスタモードでは、Sparkドライバの環境を制御し、クライアントモードではエグゼキュータランチャの環境のみを制御します。 |
1.1.0 |
spark.yarn.containerLauncherMaxThreads |
25 |
エグゼキュータコンテナの起動にYARNアプリケーションマスターで使用するスレッドの最大数。 | 1.2.0 |
spark.yarn.am.extraJavaOptions |
(なし) | クライアントモードでYARNアプリケーションマスターに渡す追加のJVMオプションの文字列。クラスタモードでは、代わりにspark.driver.extraJavaOptions を使用します。このオプションで最大ヒープサイズ(-Xmx)設定を設定することはできません。最大ヒープサイズ設定はspark.yarn.am.memory で設定できます。 |
1.3.0 |
spark.yarn.am.extraLibraryPath |
(なし) | クライアントモードでYARNアプリケーションマスターを起動するときに使用する特別なライブラリパスを設定します。 | 1.4.0 |
spark.yarn.populateHadoopClasspath |
with-hadoop Spark配布ではfalse に設定され、no-hadoop 配布ではtrue に設定されます。 |
yarn.application.classpath およびmapreduce.application.classpath からHadoopクラスパスを設定するかどうか。これがfalse に設定されている場合、Hadoopランタイムをバンドルするwith-Hadoop Spark配布が必要であるか、またはユーザーが別途Hadoopインストールを提供する必要があります。 |
2.4.6 |
spark.yarn.maxAppAttempts |
YARNのyarn.resourcemanager.am.max-attempts |
アプリケーションの送信を試行する最大回数。これは、YARN構成のグローバルな最大試行回数の数以下にする必要があります。 | 1.3.0 |
spark.yarn.am.attemptFailuresValidityInterval |
(なし) | AM障害追跡の有効期間を定義します。AMが定義された期間以上実行されている場合、AM障害カウントはリセットされます。構成されていない場合は、この機能は無効です。 | 1.6.0 |
spark.yarn.am.clientModeTreatDisconnectAsFailed |
false | yarn-clientのクリーンでない切断を障害として扱う。yarn-clientモードでは、通常、アプリケーションは常に最終ステータスがSUCCESSで終了します。これは、場合によっては、アプリケーションがユーザーによって意図的に終了されたのか、実際のエラーがあったのかを知ることは不可能なためです。この設定は、アプリケーションマスターがドライバからクリーンでない方法で(つまり、適切なシャットダウンスハンドシェイクなしで)切断された場合、アプリケーションが最終ステータスFAILEDで終了するように、この動作を変更します。これにより、呼び出し元はそれが本当に失敗だったかどうかを判断できます。この設定が設定されていて、ユーザーがクライアントアプリケーションを誤って終了した場合、実際にはFAILEDではなかったのにFAILEDステータスが表示される可能性があることに注意してください。 | 3.3.0 |
spark.yarn.am.clientModeExitOnError |
false | yarn-clientモードで、これがtrueの場合、ドライバが最終ステータスKILLEDまたはFAILEDでアプリケーションレポートを取得した場合、ドライバは対応するSparkContextを停止し、コード1でプログラムを終了します。これがtrueで別のアプリケーションから呼び出された場合、親アプリケーションも終了します。 | 3.3.0 |
spark.yarn.am.tokenConfRegex |
(なし) | この設定値は、ジョブの設定ファイル(例:hdfs-site.xml)から設定エントリのリストをgrepするために使用される正規表現です。このリストはResourceManager (RM) に送信され、委任トークンの更新時に使用されます。この機能の典型的なユースケースは、YARNクラスタが複数のダウンストリームHDFSクラスタと通信する必要がある環境で委任トークンをサポートすることです。この場合、YARN RMにはこれらのクラスタに接続するための設定(例:dfs.nameservices、dfs.ha.namenodes.*、dfs.namenode.rpc-address.*)がない可能性があります。このようなシナリオでは、Sparkユーザーは設定値を`^dfs.nameservices$|^dfs.namenode.rpc-address.*$|^dfs.ha.namenodes.*$`として指定することで、ジョブのローカル設定ファイルからこれらのHDFS設定を解析できます。この設定は`mapreduce.job.send-token-conf`と非常に似ています。詳細はYARN-5910を参照してください。 | 3.3.0 |
spark.yarn.submit.waitAppCompletion |
true |
YARNクラスタモードで、アプリケーションが完了するまでクライアントが終了を待つかどうかを制御します。true に設定されている場合、クライアントプロセスは存続し、アプリケーションのステータスを報告します。それ以外の場合は、クライアントプロセスは送信後に終了します。 |
1.4.0 |
spark.yarn.am.nodeLabelExpression |
(なし) | AMがスケジュールされるノードのセットを制限するYARNノードラベル式です。YARN 2.6以上のバージョンのみがノードラベル式をサポートしているため、それ以前のバージョンで実行する場合は、このプロパティは無視されます。 | 1.6.0 |
spark.yarn.executor.nodeLabelExpression |
(なし) | エグゼキュータがスケジュールされるノードのセットを制限するYARNノードラベル式です。YARN 2.6以上のバージョンのみがノードラベル式をサポートしているため、それ以前のバージョンで実行する場合は、このプロパティは無視されます。 | 1.4.0 |
spark.yarn.tags |
(なし) | YARN ApplicationReportsに表示されるYARNアプリケーションタグとして渡される、コンマ区切りの文字列リストです。YARNアプリケーションのクエリ時にフィルタリングに使用できます。 | 1.5.0 |
spark.yarn.priority |
(なし) | 保留中のアプリケーションの順序付けポリシーを定義するためのYARNのアプリケーション優先度です。値が大きいほど、アクティブになる可能性が高くなります。現在、YARNはFIFO順序付けポリシーを使用する場合にのみアプリケーション優先度をサポートしています。 | 3.0.0 |
spark.yarn.config.gatewayPath |
(なし) | ゲートウェイホスト(Sparkアプリケーションが開始されるホスト)で有効なパスですが、クラスタ内の他のノードの同じリソースのパスとは異なる場合があります。spark.yarn.config.replacementPath と組み合わせて、異種構成のクラスタをサポートし、Sparkがリモートプロセスを正しく起動できるようにします。置換パスには通常、YARNによってエクスポートされ(そのため、Sparkコンテナから見える)、いくつかの環境変数への参照が含まれます。 たとえば、ゲートウェイノードにHadoopライブラリが`/disk1/hadoop`にインストールされており、Hadoopインストールの場所はYARNによって環境変数`HADOOP_HOME`としてエクスポートされている場合、この値を`/disk1/hadoop`に、置換パスを`$HADOOP_HOME`に設定すると、リモートプロセスの起動に使用されるパスがローカルのYARN構成を正しく参照するようになります。 |
1.5.0 |
spark.yarn.config.replacementPath |
(なし) | spark.yarn.config.gatewayPath を参照してください。 |
1.5.0 |
spark.yarn.rolledLog.includePattern |
(なし) | 定義されたインクルードパターンに一致するログファイルをフィルタリングするJava正規表現です。これらのログファイルはローリング方式で集計されます。YARNのローリングログ集計で使用されます。YARN側でこの機能を有効にするには、`yarn-site.xml`で`yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds`を設定する必要があります。Sparkのlog4jアペンダーは、実行中にファイルが削除されることを処理できるFileAppenderまたは別のAppenderを使用するように変更する必要があります。log4j設定で構成されたファイル名(spark.logなど)に基づいて、ユーザーは集計する必要があるすべてのログファイルを含める正規表現(spark*)を設定する必要があります。 | 2.0.0 |
spark.yarn.rolledLog.excludePattern |
(なし) | 定義された除外パターンに一致するログファイルをフィルタリングするJava正規表現です。これらのログファイルはローリング方式で集計されません。ログファイル名がインクルードパターンと除外パターンの両方に一致する場合、最終的にこのファイルは除外されます。 | 2.0.0 |
spark.yarn.executor.launch.excludeOnFailure.enabled |
false | YARNリソース割り当ての問題のあるノードを除外する機能を有効にするフラグです。除外するエラーの制限は、spark.excludeOnFailure.application.maxFailedExecutorsPerNode で設定できます。 |
2.4.0 |
spark.yarn.exclude.nodes |
(なし) | リソース割り当てから除外されるYARNノード名のコンマ区切りのリストです。 | 3.0.0 |
spark.yarn.metrics.namespace |
(なし) | AMメトリクスレポートのルート名前空間です。設定されていない場合は、YARNアプリケーションIDが使用されます。 | 2.4.0 |
spark.yarn.report.interval |
1s |
クラスタモードでの現在のSparkジョブステータスのレポート間のインターバルです。 | 0.9.0 |
spark.yarn.report.loggingFrequency |
30 |
次のアプリケーションステータスがログに記録されるまで処理されるアプリケーションレポートの最大数です。ステータスが変更された場合は、処理されたアプリケーションレポートの数に関係なく、アプリケーションステータスがログに記録されます。 | 3.5.0 |
spark.yarn.clientLaunchMonitorInterval |
1s |
アプリの開始時にクライアントモードAMのステータス要求間のインターバルです。 | 2.3.0 |
spark.yarn.includeDriverLogsLink |
false |
クラスタモードで、クライアントアプリケーションレポートにドライバコンテナのログへのリンクが含まれるかどうかを指定します。ResourceManagerのREST APIをポーリングする必要があるため、RMに追加の負荷がかかります。 | 3.1.0 |
spark.yarn.unmanagedAM.enabled |
false |
クライアントモードで、アンマネージドAMを使用してクライアントの一部としてApplication Masterサービスを起動するかどうかを指定します。 | 3.0.0 |
spark.yarn.shuffle.server.recovery.disabled |
false | セキュリティ要件が高く、シークレットをデータベースに保存しない方が良いアプリケーションにtrueを設定します。このようなアプリケーションのシャッフルデータは、External Shuffle Serviceの再起動後には復元されません。 | 3.5.0 |
SHS カスタムエグゼキュータログURL の利用可能なパターン
パターン | 説明 |
---|---|
{{HTTP_SCHEME}} | YARN HTTPポリシーに従ってhttp:// またはhttps:// 。(yarn.http.policy で構成) |
{{NM_HOST}} | コンテナが実行されたノードの「ホスト」です。 |
{{NM_PORT}} | コンテナが実行されたノードマネージャの「ポート」です。 |
{{NM_HTTP_PORT}} | コンテナが実行されたノードマネージャのHTTPサーバーの「ポート」です。 |
{{NM_HTTP_ADDRESS}} | コンテナが割り当てられたノードのHTTP URIです。 |
{{CLUSTER_ID}} | ResourceManagerのクラスタID。(yarn.resourcemanager.cluster-id で構成) |
{{CONTAINER_ID}} | コンテナのIDです。 |
{{USER}} | システム環境のSPARK_USER 。 |
{{FILE_NAME}} | stdout 、stderr 。 |
たとえば、ログURLリンクをNodeManager HTTPサーバーによるリダイレクトではなく、ジョブ履歴サーバーに直接ポイントしたい場合は、以下のようにspark.history.custom.executor.log.url
を設定できます。
{{HTTP_SCHEME}}<JHS_HOST>:<JHS_PORT>/jobhistory/logs/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}?start=-4096
注:<JHS_HOST>と<JHS_PORT>は実際の値に置き換える必要があります。
リソース割り当てと設定の概要
設定ページのカスタムリソーススケジューリングと設定の概要セクションをよくお読みください。このセクションでは、リソーススケジューリングのYARN固有の側面についてのみ説明しています。
ユーザーがSparkで使用したいリソースをサポートするには、YARNを設定する必要があります。YARNのリソーススケジューリングはYARN 3.1.0で追加されました。リソースの構成と分離の適切な設定の詳細については、YARNのドキュメントを参照してください。理想的には、エグゼキュータが割り当てられたリソースのみを参照できるように、リソースは分離して設定されます。分離を有効にしていない場合は、ユーザーは、エグゼキュータ間でリソースが共有されないようにする検出スクリプトを作成する必要があります。
YARNはユーザー定義のリソースタイプをサポートしていますが、GPU(`yarn.io/gpu`)とFPGA(`yarn.io/fpga`)の組み込みタイプがあります。そのため、これらのリソースのいずれかを使用している場合、SparkはSparkリソースのリクエストをYARNリソースに変換できるため、spark.{driver/executor}.resource.
設定を指定するだけで済みます。YARNでGPUまたはFPGAにカスタムリソースタイプを使用している場合は、spark.yarn.resourceGpuDeviceName
とspark.yarn.resourceFpgaDeviceName
を使用してSparkのマッピングを変更できます。FPGAまたはGPU以外のリソースを使用している場合は、YARN(spark.yarn.{driver/executor}.resource.
)とSpark(spark.{driver/executor}.resource.
)の両方の設定をユーザーが指定する必要があります。
たとえば、ユーザーが各エグゼキュータに2つのGPUを要求する場合、ユーザーはspark.executor.resource.gpu.amount=2
を指定するだけで、SparkがYARNから`yarn.io/gpu`リソースタイプを要求します。
ユーザーがユーザー定義のYARNリソース(`acceleratorX`と呼びます)を持っている場合、ユーザーはspark.yarn.executor.resource.acceleratorX.amount=2
とspark.executor.resource.acceleratorX.amount=2
を指定する必要があります。
YARNは、各コンテナに割り当てられたリソースのアドレスをSparkに通知しません。そのため、ユーザーは、起動時にエグゼキュータによって実行され、そのエグゼキュータで使用可能なリソースを検出する検出スクリプトを指定する必要があります。例となるスクリプトは、examples/src/main/scripts/getGpusResources.sh
にあります。スクリプトには実行権限を設定する必要があり、ユーザーは悪意のあるユーザーがスクリプトを変更できないように権限を設定する必要があります。スクリプトは、ResourceInformationクラスの形式でJSON文字列をSTDOUTに出力する必要があります。これには、リソース名と、そのエグゼキュータのみが使用できるリソースアドレスの配列が含まれています。
ステージレベルスケジューリングの概要
ステージレベルのスケジューリングはYARNでサポートされています。
- 動的割り当てが無効になっている場合:ユーザーはステージレベルで異なるタスクリソース要件を指定でき、起動時に要求されたエグゼキュータと同じエグゼキュータを使用します。
- 動的割り当てが有効になっている場合:ユーザーはステージレベルでタスクとエグゼキュータのリソース要件を指定でき、追加のエグゼキュータを要求します。
YARN固有の注意点として、各ResourceProfileには異なるYARNコンテナ優先度が必要であることが挙げられます。マッピングは単純にResourceProfile IDが優先度になり、YARNでは数値が小さいほど優先度が高くなります。つまり、先に作成されたプロファイルの方がYARNでの優先度が高くなります。通常、Sparkは1つのステージが完了する前に次のステージを開始するため、これは問題になりません。影響を与える可能性があるのはジョブサーバータイプのシナリオのみです。念頭に置いておいてください。カスタムリソースの処理方法は、基本のデフォルトプロファイルとカスタムResourceProfileで異なります。ユーザーがSparkでのスケジューリングを行わずに、追加リソースを持つYARNコンテナを要求できるようにするため、ユーザーはspark.yarn.executor.resource.
設定を使用してリソースを指定できます。ただし、これらの設定は基本のデフォルトプロファイルでのみ使用され、他のカスタムResourceProfileには伝播されません。これは、ステージでそれらを使用しないようにする場合に削除する方法がないためです。その結果、デフォルトプロファイルにはspark.yarn.executor.resource.
で定義されたカスタムリソースと、Sparkで定義されたGPUまたはFPGAのリソースが設定されます。SparkはGPUとFPGAのリソースをYARN組み込みタイプyarn.io/gpu
とyarn.io/fpga
に変換しますが、他のリソースのマッピングは認識しません。その他のSparkカスタムリソースは、デフォルトプロファイルのYARNには伝播されません。そのため、Sparkがカスタムリソースに基づいてスケジューリングを行い、YARNから要求するようにするには、YARN(spark.yarn.{driver/executor}.resource.
)とSpark(spark.{driver/executor}.resource.
)の両方の設定で指定する必要があります。YARNコンテナに追加リソースのみを必要とし、Sparkでそれらを使用してスケジューリングを行わない場合は、Sparkの設定をオフのままにしてください。カスタムResourceProfileの場合、現在、Sparkによるスケジューリングを行わずにYARNリソースのみを指定する方法はありません。つまり、カスタムResourceProfileでは、ResourceProfileで定義されているすべてのリソースがYARNに伝播されます。GPUとFPGAも同様にYARN組み込みタイプに変換されます。そのため、指定するカスタムリソースの名前は、YARNで定義されている名前と一致する必要があります。
重要な注意事項
- コア要求がスケジューリングの意思決定で考慮されるかどうかは、使用しているスケジューラとその設定によって異なります。
cluster
モードでは、Spark ExecutorとSpark Driverで使用されるローカルディレクトリは、YARN用に設定されたローカルディレクトリ(Hadoop YARN設定yarn.nodemanager.local-dirs
)になります。ユーザーがspark.local.dir
を指定した場合、それは無視されます。client
モードでは、Spark ExecutorはYARN用に設定されたローカルディレクトリを使用しますが、Spark Driverはspark.local.dir
で定義されたローカルディレクトリを使用します。これは、client
モードではSpark DriverがYARNクラスタ上で実行されず、Spark Executorのみが実行されるためです。--files
オプションと--archives
オプションは、Hadoopと同様に#を使用してファイル名を指定できます。たとえば、--files localtest.txt#appSees.txt
と指定すると、ローカルにlocaltest.txt
という名前で存在するファイルがHDFSにアップロードされますが、appSees.txt
という名前でリンクされます。アプリケーションは、YARN上で実行する際にはappSees.txt
という名前を使用して参照する必要があります。--jars
オプションを使用すると、ローカルファイルを使用していてcluster
モードで実行している場合、SparkContext.addJar
関数が動作します。HDFS、HTTP、HTTPS、またはFTPファイルを使用している場合は、使用する必要はありません。
Kerberos
Sparkの標準Kerberosサポートについては、「セキュリティ」ページを参照してください。
YARNモードでHadoopファイルシステムにアクセスする場合、Hadoop設定のデフォルトファイルシステムに加えて、SparkはSparkアプリケーションのステージングディレクトリをホストするサービスの委任トークンも自動的に取得します。
YARN 固有の Kerberos 設定
プロパティ名 | デフォルト値 | 説明 | 導入バージョン |
---|---|---|---|
spark.kerberos.keytab |
(なし) | 上記で指定されたプリンシパルのキータブを含むファイルへのフルパス。このキータブは、YARN分散キャッシュを介してYARN Application Masterを実行しているノードにコピーされ、ログインチケットと委任トークンの定期的な更新に使用されます。コマンドライン引数--keytab に相当します。(「local」マスターでも機能します。) |
3.0.0 |
spark.kerberos.principal |
(なし) | セキュアクラスタで実行中にKDCにログインするために使用されるプリンシパル。コマンドライン引数--principal に相当します。(「local」マスターでも機能します。) |
3.0.0 |
spark.yarn.kerberos.relogin.period |
1m | Kerberos TGTを更新する必要があるかどうかを確認する頻度。これは、TGT更新期間(またはTGT更新が無効になっている場合はTGTの有効期間)よりも短い値に設定する必要があります。ほとんどのデプロイメントでは、デフォルト値で十分です。 | 2.3.0 |
spark.yarn.kerberos.renewal.excludeHadoopFileSystems |
(なし) | リソーススケジューラでの委任トークン更新から除外されるホストのHadoopファイルシステムのカンマ区切りリスト。例:spark.yarn.kerberos.renewal.excludeHadoopFileSystems=hdfs://nn1.com:8032, hdfs://nn2.com:8032 。これは現時点ではYARNで動作することが知られています。そのため、YARNリソースマネージャはアプリケーションのトークンを更新しません。リソーススケジューラはトークンを更新しないため、元のトークンの有効期限を過ぎたアプリケーションでそのトークンを使用しようとすると、失敗する可能性が高いことに注意してください。 |
3.2.0 |
Kerberos のトラブルシューティング
Hadoop/Kerberosの問題のデバッグは「困難」な場合があります。便利なテクニックの1つは、HADOOP_JAAS_DEBUG
環境変数を設定して、HadoopでのKerberos操作のログを詳細に記録することです。
export HADOOP_JAAS_DEBUG=true
JDKクラスは、システムプロパティsun.security.krb5.debug
とsun.security.spnego.debug=true
を介して、KerberosとSPNEGO/REST認証の詳細なログ記録を有効にするように設定できます。
-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true
これらのオプションはすべて、Application Masterで有効にできます。
spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG true
spark.yarn.am.extraJavaOptions -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true
最後に、org.apache.spark.deploy.yarn.Client
のログレベルがDEBUG
に設定されている場合、ログには取得されたすべてのトークンとその有効期限の詳細が含まれます。
外部シャッフルサービスの設定
YARNクラスタの各NodeManager
でSpark Shuffle Serviceを起動するには、次の手順に従ってください。
- SparkをYARNプロファイルでビルドします。事前にパッケージ化されたディストリビューションを使用している場合は、この手順をスキップしてください。
spark-<version>-yarn-shuffle.jar
を探します。これは、自分でSparkをビルドしている場合は$SPARK_HOME/common/network-yarn/target/scala-<version>
の下に、ディストリビューションを使用している場合はyarn
の下にあります。- このjarをクラスタ内のすべての
NodeManager
のクラスパスに追加します。 - 各ノードの
yarn-site.xml
で、yarn.nodemanager.aux-services
にspark_shuffle
を追加し、yarn.nodemanager.aux-services.spark_shuffle.class
をorg.apache.spark.network.yarn.YarnShuffleService
に設定します。 - シャッフル中のガベージコレクションの問題を回避するために、
etc/hadoop/yarn-env.sh
でYARN_HEAPSIZE
(デフォルトは1000)を設定して、NodeManager
のヒープサイズを増やします。 - クラスタ内のすべての
NodeManager
を再起動します。
シャッフルサービスがYARNで実行されている場合、次の追加設定オプションを使用できます。
プロパティ名 | デフォルト値 | 説明 | 導入バージョン |
---|---|---|---|
spark.yarn.shuffle.stopOnFailure |
false |
Spark Shuffle Serviceの初期化に失敗した場合にNodeManagerを停止するかどうか。これにより、Spark Shuffle Serviceが実行されていないNodeManagerでコンテナを実行することによって発生するアプリケーションの失敗を防ぎます。 | 2.1.0 |
spark.yarn.shuffle.service.metrics.namespace |
sparkShuffleService |
NodeManagerのHadoop metrics2システムにシャッフルサービスメトリックを出力する際に使用する名前空間。 | 3.2.0 |
spark.yarn.shuffle.service.logs.namespace |
(設定なし) |
YARNシャッフルサービスからのログを出力するために使用するロガー名を作成する際に、クラス名に追加される名前空間。例:org.apache.spark.network.yarn.YarnShuffleService.logsNamespaceValue 。一部のロギングフレームワークはロガー名がクラス名のように見えることを期待するため、一般的に有効なJavaパッケージ名またはクラス名であり、スペースを含まない値を提供することをお勧めします。 |
3.3.0 |
spark.shuffle.service.db.backend |
LEVELDB | YARNで作業保持再起動が有効になっている場合、これはシャッフルサービス状態ストアで使用されるディスクベースストアを指定するために使用されます。「LEVELDB」と「ROCKSDB」をサポートし、「LEVELDB」がデフォルト値です。`LevelDB/RocksDB`の元のデータストアは、現時点では自動的に別の種類のストレージに変換されません。元のデータストアは保持され、ストレージの種類を切り替える際に新しいタイプのデータストアが作成されます。 | 3.4.0 |
上記の手順では、デフォルトのシャッフルサービス名spark_shuffle
が使用されていることを前提としています。ここでは任意の名前を使用できますが、YARN NodeManagerの設定で使用される値は、Sparkアプリケーションのspark.shuffle.service.name
の値と一致する必要があります。
シャッフルサービスは、デフォルトでは、NodeManagerで使用されるHadoop Configuration(例:yarn-site.xml
)からすべての構成を取得します。ただし、spark-shuffle-site.xml
という名前のファイルを使用して、シャッフルサービスを個別に構成することもできます。このファイルは、シャッフルサービスのクラスパス(デフォルトではNodeManagerのクラスパスと共有されています)に配置する必要があります。シャッフルサービスはこれを標準的なHadoop Configurationリソースとして扱い、NodeManagerの構成の上に重ねます。
Apache Oozie を使用したアプリケーションの起動
Apache Oozieは、ワークフローの一部としてSparkアプリケーションを起動できます。セキュアクラスタでは、起動されたアプリケーションは、クラスタのサービスにアクセスするために関連するトークンを必要とします。Sparkがキータブを使用して起動された場合、これは自動的に行われます。ただし、Sparkをキータブなしで起動する場合は、セキュリティの設定の責任がOozieに移譲されます。
セキュアクラスタ用のOozieの設定とジョブの資格情報の取得の詳細については、特定のリリースのドキュメントの「認証」セクションにあるOozie Webサイトを参照してください。
Sparkアプリケーションの場合、Oozieワークフローは、アプリケーションに必要なすべてのトークン(以下を含む)を要求するように設定する必要があります。
- YARNリソースマネージャ。
- ローカルHadoopファイルシステム。
- I/Oのソースまたは宛先として使用されるリモートHadoopファイルシステム。
- Hive(使用する場合)。
- HBase(使用する場合)。
- アプリケーションがこのサーバーとやり取りする場合、YARNタイムラインサーバー。
SparkがHive、HBase、およびリモートHDFSトークンの取得を試みて失敗するのを避けるために、Spark設定でサービスのトークン収集を無効にする必要があります。
Spark設定には、次の行を含める必要があります。
spark.security.credentials.hive.enabled false
spark.security.credentials.hbase.enabled false
設定オプションspark.kerberos.access.hadoopFileSystems
は設定解除する必要があります。
Spark Web UI の代わりに Spark History Server を使用すること
アプリケーションUIが無効になっている場合、実行中のアプリケーションのトラッキングURLとしてSpark History Serverのアプリケーションページを使用できます。これはセキュアなクラスタの場合や、Sparkドライバのメモリ使用量を削減するために望ましい場合があります。Spark History Serverによるトラッキングを設定するには、以下の手順を実行します。
- アプリケーション側では、Sparkの設定で
spark.yarn.historyServer.allowTracking=true
を設定します。これにより、アプリケーションのUIが無効になっている場合、Sparkは履歴サーバーのURLをトラッキングURLとして使用します。 - Spark History Serverでは、
spark.ui.filters
設定のフィルタリストにorg.apache.spark.deploy.yarn.YarnProxyRedirectFilter
を追加します。
履歴サーバーの情報は、アプリケーションの状態と最新ではない可能性があることに注意してください。
複数のバージョンの Spark シャッフルサービスの実行のサポート
このセクションは、YARNバージョン2.9.0以降で実行する場合にのみ適用されることに注意してください。
場合によっては、異なるバージョンのSparkを使用するSpark Shuffle Serviceの複数のインスタンスを実行することが望ましい場合があります。これは、たとえば、複数のSparkバージョンを実行するアプリケーションの混在ワークロードを持つYARNクラスタを実行する場合に役立ちます。特定のバージョンのシャッフルサービスは、他のバージョンのSparkと常に互換性があるとは限らないためです。YARNバージョン2.9.0以降は、分離されたクラスローダー内でシャッフルサービスを実行する機能をサポートしています(YARN-4577を参照)。つまり、単一ノードマネージャ内で複数のSparkバージョンが共存できます。yarn.nodemanager.aux-services.<service-name>.classpath
、およびYARN 2.10.2/3.1.1/3.2.0以降ではyarn.nodemanager.aux-services.<service-name>.remote-classpath
オプションを使用してこれを設定できます。YARN 3.3.0/3.3.1には、回避策としてyarn.nodemanager.aux-services.<service-name>.system-classes
を設定する必要がある問題があります。YARN-11053を参照してください。個別のクラスパスの設定に加えて、2つのバージョンが異なるポートにアドバタイズするようにする必要があります。これは、上記で説明したspark-shuffle-site.xml
ファイルを使用して実現できます。たとえば、次のような設定を行うことができます。
yarn.nodemanager.aux-services = spark_shuffle_x,spark_shuffle_y
yarn.nodemanager.aux-services.spark_shuffle_x.classpath = /path/to/spark-x-path/fat.jar:/path/to/spark-x-config
yarn.nodemanager.aux-services.spark_shuffle_y.classpath = /path/to/spark-y-path/fat.jar:/path/to/spark-y-config
または
yarn.nodemanager.aux-services = spark_shuffle_x,spark_shuffle_y
yarn.nodemanager.aux-services.spark_shuffle_x.classpath = /path/to/spark-x-path/*:/path/to/spark-x-config
yarn.nodemanager.aux-services.spark_shuffle_y.classpath = /path/to/spark-y-path/*:/path/to/spark-y-config
2つのspark-*-config
ディレクトリにはそれぞれ1つのファイルspark-shuffle-site.xml
が含まれています。これらはHadoop Configuration形式のXMLファイルであり、それぞれにポート番号とメトリクス名のプレフィックスを調整するためのいくつかの設定が含まれています。
<configuration>
<property>
<name>spark.shuffle.service.port</name>
<value>7001</value>
</property>
<property>
<name>spark.yarn.shuffle.service.metrics.namespace</name>
<value>sparkShuffleServiceX</value>
</property>
</configuration>
2つの異なるサービスに対して、これらの値は異なる必要があります。
次に、Sparkアプリケーションの設定では、一方を次のように設定する必要があります。
spark.shuffle.service.name = spark_shuffle_x
spark.shuffle.service.port = 7001
そして、もう一方を次のように設定する必要があります。
spark.shuffle.service.name = spark_shuffle_y
spark.shuffle.service.port = <other value>