Spark スタンドアロンモード

YARN クラスターマネージャー上で実行することに加えて、Spark はシンプルなスタンドアロンデプロイモードも提供します。スタンドアロンクラスターは、マスターとワーカーを手動で起動して手動で起動することも、提供されている 起動スクリプト を使用することもできます。テストのためにこれらのデーモンを 1 台のマシンで実行することも可能です。

セキュリティ

認証のようなセキュリティ機能はデフォルトでは有効になっていません。インターネットまたは信頼されていないネットワークに公開されているクラスターをデプロイする場合、不正なアプリケーションがクラスター上で実行されるのを防ぐために、クラスターへのアクセスを保護することが重要です。Spark を実行する前に、Spark セキュリティ およびこのドキュメントの特定のセキュリティセクションを参照してください。

Spark スタンドアロンモードをクラスターにインストールする

Spark スタンドアロンモードをインストールするには、コンパイル済みの Spark をクラスターの各ノードに配置するだけです。各リリースで事前ビルドされた Spark を入手することも、自分でビルド することもできます。

クラスターを手動で起動する

スタンドアロンマスターサーバーを起動するには、以下を実行します。

./sbin/start-master.sh

起動後、マスターは自身の spark://HOST:PORT URL を出力します。この URL を使用してワーカーを接続したり、SparkContext の「master」引数として渡したりできます。この URL は、マスターの Web UI (デフォルトでは https://:8080) でも確認できます。

同様に、1 つ以上のワーカーを起動し、以下を介してマスターに接続できます。

./sbin/start-worker.sh <master-spark-URL>

ワーカーを起動したら、マスターの Web UI (デフォルトでは https://:8080) を確認してください。新しいノードが、CPU 数とメモリ (OS 用に 1GB 残す) とともにリストされているはずです。

最後に、以下の設定オプションをマスターとワーカーに渡すことができます。

引数意味
-h HOST, --host HOST リッスンするホスト名
-p PORT, --port PORT サービスがリッスンするポート (デフォルト: マスターは 7077、ワーカーはランダム)
--webui-port PORT Web UI のポート (デフォルト: マスターは 8080、ワーカーは 8081)
-c CORES, --cores CORES Spark アプリケーションがマシンで使用することを許可する合計 CPU コア数 (デフォルト: すべて利用可能); ワーカーのみ
-m MEM, --memory MEM Spark アプリケーションがマシンで使用することを許可する合計メモリ量 (例: 1000M または 2G); ワーカーのみ
-d DIR, --work-dir DIR スクラッチスペースとジョブ出力ログに使用するディレクトリ (デフォルト: SPARK_HOME/work); ワーカーのみ
--properties-file FILE ロードするカスタム Spark プロパティファイルへのパス (デフォルト: conf/spark-defaults.conf)

クラスター起動スクリプト

起動スクリプトを使用して Spark スタンドアロンクラスターを起動するには、Spark ディレクトリに conf/workers という名前のファイルを作成する必要があります。このファイルには、Spark ワーカーを起動する予定のすべてのマシンのホスト名を 1 行に 1 つずつ記述する必要があります。conf/workers が存在しない場合、起動スクリプトはデフォルトで単一マシン (localhost) を使用します。これはテストに便利です。注意、マスターマシンは各ワーカーマシンに SSH でアクセスします。デフォルトでは、SSH は並列で実行され、パスワードなし (秘密鍵を使用) のアクセス設定が必要です。パスワードなしの設定がない場合は、環境変数 SPARK_SSH_FOREGROUND を設定して、各ワーカーのパスワードをシリアルに入力できます。

このファイルをセットアップしたら、Hadoop のデプロイ スクリプトに基づいた以下のシェルスクリプトを使用して、クラスターを起動または停止できます。これらのスクリプトは SPARK_HOME/sbin にあります。

これらのスクリプトは、ローカルマシンではなく、Spark マスターを実行したいマシンで実行する必要があることに注意してください。

conf/spark-env.sh で環境変数を設定することにより、クラスターをさらに設定することができます。このファイルは conf/spark-env.sh.template から開始して作成し、設定を有効にするために *すべてのワーカーマシンにコピー* してください。以下の設定が利用可能です。

環境変数意味
SPARK_MASTER_HOST マスターを特定のホスト名または IP アドレス (例: 公開されているもの) にバインドします。
SPARK_MASTER_PORT マスターを別のポートで起動します (デフォルト: 7077)。
SPARK_MASTER_WEBUI_PORT マスター Web UI のポート (デフォルト: 8080)。
SPARK_MASTER_OPTS マスターのみに適用される設定プロパティを "-Dx=y" の形式で指定します (デフォルト: なし)。可能なオプションのリストについては、以下を参照してください。
SPARK_LOCAL_DIRS Spark で "スクラッチ" スペースに使用するディレクトリ。これには、マップ出力ファイルやディスクに保存される RDD が含まれます。これは、システム上の高速なローカルディスク上にある必要があります。複数のディスク上の複数のディレクトリのカンマ区切りリストにすることもできます。
SPARK_LOG_DIR ログファイルが保存される場所。(デフォルト: SPARK_HOME/logs)。
SPARK_LOG_MAX_FILES ログファイルの最大数 (デフォルト: 5)。
SPARK_PID_DIR PID ファイルが保存される場所。(デフォルト: /tmp)。
SPARK_WORKER_CORES Spark アプリケーションがマシンで使用することを許可する合計コア数 (デフォルト: 利用可能なすべてのコア)。
SPARK_WORKER_MEMORY Spark アプリケーションがマシンで使用することを許可する合計メモリ量 (例: 1000m2g) (デフォルト: 総メモリから 1GiB を引いた値)。各アプリケーションの*個別の*メモリは、`spark.executor.memory` プロパティを使用して構成されることに注意してください。
SPARK_WORKER_PORT Spark ワーカーを特定のポートで起動します (デフォルト: ランダム)。
SPARK_WORKER_WEBUI_PORT ワーカー Web UI のポート (デフォルト: 8081)。
SPARK_WORKER_DIR アプリケーションを実行するディレクトリ。ログとスクラッチスペースの両方が含まれます (デフォルト: SPARK_HOME/work)。
SPARK_WORKER_OPTS ワーカーのみに適用される設定プロパティを "-Dx=y" の形式で指定します (デフォルト: なし)。可能なオプションのリストについては、以下を参照してください。
SPARK_DAEMON_MEMORY Spark マスターおよびワーカーデーモン自体に割り当てるメモリ (デフォルト: 1g)。
SPARK_DAEMON_JAVA_OPTS Spark マスターおよびワーカーデーモン自体に対する JVM オプションを "-Dx=y" の形式で指定します (デフォルト: なし)。
SPARK_DAEMON_CLASSPATH Spark マスターおよびワーカーデーモン自体のクラスパス (デフォルト: なし)。
SPARK_PUBLIC_DNS Spark マスターおよびワーカーの公開 DNS 名 (デフォルト: なし)。

注意: 起動スクリプトは現在 Windows をサポートしていません。Windows 上で Spark クラスターを実行するには、マスターとワーカーを手動で起動してください。

SPARK_MASTER_OPTS は以下のシステムプロパティをサポートしています。

プロパティ名デフォルト意味バージョン以降
spark.master.ui.port 8080 マスター Web UI エンドポイントのポート番号を指定します。 1.1.0
spark.master.ui.title (なし) マスター UI ページのタイトルを指定します。設定されていない場合、デフォルトで `Spark Master at 'master url'` が使用されます。 4.0.0
spark.master.ui.decommission.allow.mode LOCAL マスター Web UI の /workers/kill エンドポイントの動作を指定します。可能な選択肢は次のとおりです: `LOCAL` はマスターを実行しているマシンからローカルな IP からこのエンドポイントを許可することを意味します、`DENY` はこのエンドポイントを完全に無効にすることを意味します、`ALLOW` は任意の IP からこのエンドポイントを呼び出すことを許可することを意味します。 3.1.0
spark.master.ui.historyServerUrl (なし) Spark History Server が実行されている URL。すべての Spark ジョブが History Server がアクセスする同じイベントログの場所を共有していることを前提としていることに注意してください。 4.0.0
spark.master.rest.enabled false マスター REST API エンドポイントを使用するかどうか。 1.3.0
spark.master.rest.host (なし) マスター REST API エンドポイントのホストを指定します。 4.0.0
spark.master.rest.port 6066 マスター REST API エンドポイントのポート番号を指定します。 1.3.0
spark.master.rest.filters (なし) マスター REST API に適用するフィルタークラス名のカンマ区切りリスト。 4.0.0
spark.master.useAppNameAsAppId.enabled false (実験的) true の場合、Spark マスターはユーザー指定の appName を appId として使用します。 4.0.0
spark.deploy.retainedApplications 200 表示する完了したアプリケーションの最大数。この制限を維持するために、古いアプリケーションは UI から削除されます。
0.8.0
spark.deploy.retainedDrivers 200 表示する完了したドライバーの最大数。この制限を維持するために、古いドライバーは UI から削除されます。
1.1.0
spark.deploy.spreadOutDrivers true スタンドアロンクラスターマネージャーがドライバーをノード全体に分散させるか、できるだけ少ないノードに統合しようとするかどうか。分散は通常 HDFS でのデータローカリティに優れていますが、統合は計算集約型のワークロードにより効率的です。 4.0.0
spark.deploy.spreadOutApps true スタンドアロンクラスターマネージャーがアプリケーションをノード全体に分散させるか、できるだけ少ないノードに統合しようとするかどうか。分散は通常 HDFS でのデータローカリティに優れていますが、統合は計算集約型のワークロードにより効率的です。
0.6.1
spark.deploy.defaultCores Int.MaxValue Spark のスタンドアロンモードで、アプリケーションが `spark.cores.max` を設定しない場合にアプリケーションに与えるデフォルトのコア数。設定されていない場合、アプリケーションは `spark.cores.max` を自分で構成しない限り、常に利用可能なすべてのコアを取得します。共有クラスターでこれを低く設定すると、ユーザーがデフォルトでクラスター全体を占有するのを防ぐことができます。
0.9.0
spark.deploy.maxExecutorRetries 10 スタンドアロンクラスターマネージャーが欠陥のあるアプリケーションを削除する前に発生する可能性のある連続した Executor の障害の最大回数に対する制限。実行中の Executor が 1 つでもある場合、アプリケーションが削除されることはありません。アプリケーションが spark.deploy.maxExecutorRetries 回以上の連続した障害を経験し、その障害の間に Executor が正常に開始されず、アプリケーションに実行中の Executor がない場合、スタンドアロンクラスターマネージャーはアプリケーションを削除して失敗とマークします。この自動削除を無効にするには、spark.deploy.maxExecutorRetries-1 に設定してください。
1.6.3
spark.deploy.maxDrivers Int.MaxValue 実行中のドライバーの最大数。 4.0.0
spark.deploy.appNumberModulo (なし) アプリ番号のモジュロ。デフォルトでは、`app-yyyyMMddHHmmss-9999` の次は `app-yyyyMMddHHmmss-10000` になります。モジュロが 10000 の場合、`app-yyyyMMddHHmmss-0000` になります。ほとんどの場合、プレフィックス `app-yyyyMMddHHmmss` は 10000 個のアプリケーションを作成する際に既に増分されます。 4.0.0
spark.deploy.driverIdPattern driver-%s-%04d Java の `String.format` メソッドに基づいたドライバー ID 生成パターン。デフォルト値は `driver-%s-%04d` で、既存のドライバー ID 文字列を表します (例: `driver-20231031224459-0019`)。一意の ID を生成するように注意してください。 4.0.0
spark.deploy.appIdPattern app-%s-%04d Java の `String.format` メソッドに基づいたアプリ ID 生成パターン。デフォルト値は `app-%s-%04d` で、既存のアプリ ID 文字列を表します (例: `app-20231031224509-0008`)。一意の ID を生成するように注意してください。 4.0.0
spark.worker.timeout 60 スタンドアロンデプロイマスターがハートビートを受け取らなかった場合にワーカーが失われたと見なすまでの秒数。 0.6.2
spark.dead.worker.persistence 15 UI でデッドワーカー情報を保持するイテレーション数。デフォルトでは、デッドワーカーは最後のハートビートから (15 + 1) * `spark.worker.timeout` の間表示されます。 0.8.0
spark.worker.resource.{name}.amount (なし) ワーカー上の特定のリソースの使用量。 3.0.0
spark.worker.resource.{name}.discoveryScript (なし) ワーカー起動時に特定のリソースを見つけるために使用されるリソース検出スクリプトへのパス。スクリプトの出力は `ResourceInformation` クラスのようにフォーマットされるべきです。 3.0.0
spark.worker.resourcesFile (なし) ワーカー起動時にさまざまなリソースを見つけるために使用されるリソースファイルへのパス。リソースファイルの内容は `[{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}]` のようにフォーマットされるべきです。特定のリソースがリソースファイルで見つからない場合、検出スクリプトがそのリソースを見つけるために使用されます。検出スクリプトもリソースを見つけられない場合、ワーカーは起動に失敗します。 3.0.0

SPARK_WORKER_OPTS は以下のシステムプロパティをサポートしています。

プロパティ名デフォルト意味バージョン以降
spark.worker.initialRegistrationRetries 6 短い間隔 (5 秒から 15 秒の間) で再接続するためのリトライ回数。 4.0.0
spark.worker.maxRegistrationRetries 16 再接続の最大リトライ回数。spark.worker.initialRegistrationRetries 回の試行後、間隔は 30 秒から 90 秒の間になります。 4.0.0
spark.worker.cleanup.enabled true ワーカー / アプリケーションディレクトリの定期的なクリーンアップを有効にします。これは YARN の動作が異なるため、スタンドアロンモードのみに影響することに注意してください。停止したアプリケーションのディレクトリのみがクリーンアップされます。spark.shuffle.service.db.enabled が "true" の場合は有効にする必要があります。 1.0.0
spark.worker.cleanup.interval 1800 (30 分) ワーカーがローカルマシン上の古いアプリケーションワークディレクトリをクリーンアップする間隔を秒単位で制御します。 1.0.0
spark.worker.cleanup.appDataTtl 604800 (7 日、7 * 24 * 3600) 各ワーカーでアプリケーションワークディレクトリを保持する秒数。これは TTL (Time To Live) であり、利用可能なディスク容量に応じて設定する必要があります。アプリケーションログと JAR は各アプリケーションワークディレクトリにダウンロードされます。時間が経つにつれて、ワークディレクトリはディスク容量をすぐに使い果たす可能性があります。特にジョブを非常に頻繁に実行する場合。 1.0.0
spark.shuffle.service.db.enabled true 外部シャッフルサービスの状態をローカルディスクに保存するため、外部シャッフルサービスが再起動されたときに、現在の Executor の情報を自動的にリロードします。これはスタンドアロンモードのみに影響します (yarn は常にこの動作を有効にしています)。状態が最終的にクリーンアップされるように、spark.worker.cleanup.enabled も有効にすることをお勧めします。この設定は将来削除される可能性があります。 3.0.0
spark.shuffle.service.db.backend ROCKSDB spark.shuffle.service.db.enabled が true の場合、ユーザーはこれを使用してシャッフルサービスの状態ストアで使用されるディスクベースストアの種類を指定できます。現在、ROCKSDBLEVELDB (非推奨) をサポートしており、デフォルト値は ROCKSDB です。RocksDB/LevelDB の元のデータストアは、現在自動的に別の種類のストレージに変換されません。 3.4.0
spark.storage.cleanupFilesAfterExecutorExit true Executor 終了後にワーカーディレクトリの非シャッフルファイル (一時シャッフルブロック、キャッシュされた RDD/ブロードキャストブロック、スピルファイルなど) をクリーンアップします。これは spark.worker.cleanup.enabled と重複しません。これは、デッド Executor のローカルディレクトリ内の非シャッフルファイルをクリーンアップすることを有効にし、spark.worker.cleanup.enabled は停止してタイムアウトしたアプリケーションのすべてのファイル/サブディレクトリのクリーンアップを有効にするためです。これはスタンドアロンモードのみに影響し、他のクラスターマネージャーのサポートは将来追加される可能性があります。 2.4.0
spark.worker.ui.compressedLogFileLengthCacheSize 100 圧縮ログファイルの場合、非圧縮ファイルはファイルを解凍することによってのみ計算できます。Spark は圧縮ログファイルの非圧縮ファイルサイズをキャッシュします。このプロパティはキャッシュサイズを制御します。 2.0.2
spark.worker.idPattern worker-%s-%s-%d Java `String.format` メソッドに基づいたワーカー ID 生成パターン。デフォルト値は `worker-%s-%s-%d` で、既存のワーカー ID 文字列を表します (例: `worker-20231109183042-[fe80::1%lo0]-39729`)。一意の ID を生成するように注意してください。 4.0.0

リソース割り当てと設定の概要

設定ページ のカスタムリソーススケジューリングと設定の概要セクションを必ずお読みください。このセクションでは、リソーススケジューリングの Spark スタンドアロン固有の側面のみについて説明します。

Spark スタンドアロンには 2 つの部分があります。1 つ目はワーカーのリソース設定、2 つ目は特定のアプリケーションのリソース割り当てです。

ユーザーは、割り当てるリソースセットを持つようにワーカーを設定する必要があります。spark.worker.resource.{resourceName}.amount を使用して、ワーカーが持つ各リソースの量を制御します。ユーザーは、ワーカーが割り当てられたリソースをどのように検出するかを指定するために、spark.worker.resourcesFile または spark.worker.resource.{resourceName}.discoveryScript のいずれかを指定する必要があります。どちらの方法がセットアップに適しているかについては、それぞれの上記の説明を参照してください。

2 番目の部分は、Spark スタンドアロンでアプリケーションを実行することです。標準の Spark リソース設定からの唯一の特別なケースは、ドライバをクライアントモードで実行する場合です。クライアントモードのドライバの場合、ユーザーは spark.driver.resourcesFile または spark.driver.resource.{resourceName}.discoveryScript を介して使用するリソースを指定できます。ドライバが他のドライバと同じホストで実行されている場合は、リソースファイルまたは検出スクリプトが同じノードで実行されている他のドライバと競合しないリソースのみを返すようにしてください。

注意、ワーカーはアプリケーションの各 Executor を割り当てられたリソースで起動するため、アプリケーションを送信するときにユーザーが検出スクリプトを指定する必要はありません。

アプリケーションをクラスターに接続する

Spark クラスターでアプリケーションを実行するには、マスターの spark://IP:PORT URL を SparkContext コンストラクタ に渡すだけです。

クラスターに対してインタラクティブな Spark シェルを実行するには、次のコマンドを実行します。

./bin/spark-shell --master spark://IP:PORT

また、`--total-executor-cores <numCores>` オプションを渡して、spark-shell がクラスターで使用するコア数を制御することもできます。

クライアントプロパティ

Spark アプリケーションは、スタンドアロンモードに固有の以下の設定プロパティをサポートしています。

プロパティ名デフォルト値意味バージョン以降
spark.standalone.submit.waitAppCompletion false スタンドアロンクラスターモードでは、アプリケーションが完了するまでクライアントが終了するのを待つかどうかを制御します。true に設定されている場合、クライアントプロセスはドライバーの状態をポーリングしながらアクティブなままになります。それ以外の場合、クライアントプロセスは送信後に終了します。 3.1.0

Spark アプリケーションの起動

Spark プロトコル

spark-submit スクリプト は、コンパイル済みの Spark アプリケーションをクラスターに送信する最も簡単な方法です。スタンドアロンクラスターの場合、Spark は現在 2 つのデプロイモードをサポートしています。`client` モードでは、ドライバはアプリケーションを送信するクライアントと同じプロセスで起動されます。しかし、`cluster` モードでは、ドライバはクラスター内のワーカープロセスのいずれかから起動され、クライアントプロセスはアプリケーションの完了を待たずに、アプリケーションを送信するという責任を果たしたらすぐに終了します。

アプリケーションが Spark submit 経由で起動された場合、アプリケーション JAR はすべてのワーカーノードに自動的に配布されます。アプリケーションが依存する追加の JAR については、カンマ区切り (例: `--jars jar1,jar2`) を使用して `--jars` フラグで指定する必要があります。アプリケーションの設定または実行環境を制御するには、Spark 設定 を参照してください。

さらに、スタンドアロンの `cluster` モードでは、アプリケーションがゼロ以外の終了コードで終了した場合に自動的に再起動できます。この機能を使用するには、アプリケーションを起動する際に `spark-submit` に `--supervise` フラグを渡すことができます。次に、繰り返し失敗するアプリケーションを終了したい場合は、以下を通じて行うことができます。

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

スタンドアロンマスターの Web UI (http://<master url>:8080) でドライバー ID を見つけることができます。

REST API

spark.master.rest.enabled が有効になっている場合、Spark マスターは http://[host:port]/[version]/submissions/[action] 経由で追加の REST API を提供します。ここで、host はマスターホスト、portspark.master.rest.port (デフォルト: 6066) で指定されたポート番号、version はプロトコルバージョン (現在 v1)、action はサポートされているアクションのいずれかです。

コマンドHTTP メソッド説明バージョン以降
create POST Spark ドライバを `cluster` モードで作成します。4.0.0 以降、Spark マスターは Spark プロパティと環境変数の値のサーバーサイド変数置換をサポートしています。 1.3.0
kill POST 単一の Spark ドライバを終了します。 1.3.0
killall POST 実行中のすべての Spark ドライバを終了します。 4.0.0
status GET Spark ジョブのステータスを確認します。 1.3.0
clear POST 完了したドライバーとアプリケーションをクリアします。 4.0.0

以下は、`pi.py` と REST API を使用した `curl` CLI コマンドの例です。

$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
  "appResource": "",
  "sparkProperties": {
    "spark.master": "spark://master:7077",
    "spark.app.name": "Spark Pi",
    "spark.driver.memory": "1g",
    "spark.driver.cores": "1",
    "spark.jars": ""
  },
  "clientSparkVersion": "",
  "mainClass": "org.apache.spark.deploy.SparkSubmit",
  "environmentVariables": { },
  "action": "CreateSubmissionRequest",
  "appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'

上記 `create` リクエストに対する REST API の応答は以下のとおりです。

{
  "action" : "CreateSubmissionResponse",
  "message" : "Driver successfully submitted as driver-20231124153531-0000",
  "serverSparkVersion" : "4.0.0",
  "submissionId" : "driver-20231124153531-0000",
  "success" : true
}

sparkProperties および environmentVariables の場合、ユーザーはサーバーサイド環境変数のプレースホルダーを次のように使用できます。

$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Authorization: Bearer USER-PROVIDED-WEB-TOEN-SIGNED-BY-THE-SAME-SHARED-KEY"
...

sparkProperties および environmentVariables の場合、ユーザーはサーバーサイド環境変数のプレースホルダーを次のように使用できます。


...
  "sparkProperties": {
    "spark.hadoop.fs.s3a.endpoint": "{{AWS_ENDPOINT_URL}}",
    "spark.hadoop.fs.s3a.endpoint.region": "{{AWS_REGION}}"
  },
  "environmentVariables": {
    "AWS_CA_BUNDLE": "{{AWS_CA_BUNDLE}}"
  },
...

リソーススケジューリング

スタンドアロンクラスターモードは現在、アプリケーション間の単純な FIFO スケジューラのみをサポートしています。ただし、複数のユーザーが同時に実行できるように、各アプリケーションが使用する最大リソースを制御できます。デフォルトでは、クラスターの*すべて*のコアを取得しますが、これは一度に 1 つのアプリケーションのみを実行する場合にのみ意味があります。SparkConf の `spark.cores.max` を設定することで、コア数を制限できます。例:

val conf = new SparkConf()
  .setMaster(...)
  .setAppName(...)
  .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

さらに、クラスターマスタープロセスで `spark.deploy.defaultCores` を構成して、`spark.cores.max` を無限未満に設定していないアプリケーションのデフォルトを変更できます。これは、`conf/spark-env.sh` に以下を追加することで行います。

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

これは、ユーザーが個別に最大コア数を構成していない共有クラスターで役立ちます。

Executor スケジューリング

各 Executor に割り当てられるコア数は構成可能です。`spark.executor.cores` が明示的に設定されている場合、ワーカーに十分なコアとメモリがある場合、同じアプリケーションの複数の Executor が同じワーカーに起動される可能性があります。それ以外の場合、各 Executor はデフォルトでワーカーで利用可能なすべてのコアを取得するため、この場合、1 回のスケジューリングイテレーション中に各ワーカーに起動できるアプリケーションごとの Executor は 1 つだけになります。

ステージレベルのスケジューリング概要

ステージレベルのスケジューリングはスタンドアロンでサポートされています。

注意点

動的リソース割り当てで述べたように、動的割り当てが有効な場合に各 Executor のコアが明示的に指定されていない場合、Spark は予想よりも多くの Executor を取得する可能性があります。そのため、ステージレベルのスケジューリングを使用する場合は、各リソースプロファイルに対して Executor コアを明示的に設定することをお勧めします。

監視とロギング

Spark のスタンドアロンモードは、クラスターを監視するための Web ベースのユーザーインターフェイスを提供します。マスターと各ワーカーは、クラスターとジョブの統計情報を表示する独自の Web UI を備えています。デフォルトでは、ポート 8080 でマスターの Web UI にアクセスできます。ポートは、設定ファイルで変更するか、コマンドラインオプションで変更できます。

さらに、各ジョブの詳細なログ出力は、各ワーカーノードの作業ディレクトリ (デフォルトでは SPARK_HOME/work) にも書き込まれます。ジョブごとに、stdoutstderr の 2 つのファイルがあり、コンソールに出力されたすべての出力が含まれます。

Hadoop と並行して実行する

既存の Hadoop クラスターと並行して Spark を実行するには、同じマシンで別個のサービスとして起動するだけです。Spark から Hadoop データにアクセスするには、hdfs:// URL (通常は hdfs://<namenode>:9000/path) を使用します。または、Spark 用に別のクラスターをセットアップし、ネットワーク経由で HDFS にアクセスすることもできます。これはディスクローカルアクセスよりも遅くなりますが、同じローカルエリアネットワーク内で実行している場合 (例: Hadoop がある各ラックに Spark マシンを数台配置する) は問題にならないかもしれません。

ネットワークセキュリティのためのポート設定

一般的に、Spark クラスターとそのサービスは公開インターネット上にはデプロイされません。それらは一般的にプライベートサービスであり、Spark をデプロイした組織のネットワーク内でのみアクセス可能であるべきです。Spark サービスで使用されるホストとポートへのアクセスは、サービスにアクセスする必要があるオリジンホストに限定されるべきです。

これは、スタンドアロンリソースマネージャーを使用するクラスターにとって特に重要です。なぜなら、他のリソースマネージャーが行うようなきめ細かなアクセス制御をサポートしていないためです。

設定すべきポートの完全なリストについては、セキュリティページ を参照してください。

高可用性

デフォルトでは、スタンドアロンスケジューリングクラスターはワーカー障害に対して耐性があります (Spark 自体が作業を他のワーカーに移動して失うことに耐性がある限り)。しかし、スケジューラはスケジューリングの決定を行うためにマスターを使用しており、これは (デフォルトで) 単一障害点を作成します。マスターがクラッシュした場合、新しいアプリケーションを作成できません。これを回避するために、2 つの高可用性スキームを用意しました。以下に詳述します。

ZooKeeper によるスタンバイマスター

概要

ZooKeeper を使用してリーダー選出と一部の状態保存を提供することで、同じ ZooKeeper インスタンスに接続された複数のマスターをクラスターに起動できます。1 つが「リーダー」として選出され、残りはスタンバイモードになります。現在のリーダーがダウンした場合、別のマスターが選出され、古いマスターの状態を回復してからスケジューリングを再開します。全体の回復プロセス (最初のリーダーがダウンしてから) は 1 分から 2 分で完了するはずです。この遅延は *新しい* アプリケーションのスケジューリングにのみ影響することに注意してください。マスターのフェイルオーバー中にすでに実行されていたアプリケーションは影響を受けません。

ZooKeeper の開始方法については、こちら で詳細を確認してください。

設定

このリカバリモードを有効にするには、`spark.deploy.recoveryMode` および関連する `spark.deploy.zookeeper.*` 設定を構成して、spark-env で `SPARK_DAEMON_JAVA_OPTS` を設定できます。

注意点: クラスターに複数のマスターがあるのに、マスターが ZooKeeper を使用するように正しく設定されていない場合、マスターはお互いを検出できず、すべてがリーダーであると認識します。これは健全なクラスター状態にはなりません (すべてのマスターが個別にスケジュールするため)。

詳細

ZooKeeper クラスターをセットアップしたら、高可用性の有効化は簡単です。同じ ZooKeeper 設定 (ZooKeeper URL とディレクトリ) を持つ異なるノードで複数のマスタープロセスを起動するだけです。マスターはいつでも追加および削除できます。

新しいアプリケーションをスケジュールしたり、クラスターにワーカーを追加したりするには、以前は単一のマスターを渡していた場所にマスターの IP アドレスのリストを渡す必要があります。たとえば、SparkContext を `spark://host1:port1,host2:port2` にポイントして起動するかもしれません。これにより、SparkContext は両方のマスターに登録しようとします。host1 がダウンしても、新しいリーダーである host2 を見つけることができるため、この構成は引き続き有効です。

「マスターへの登録」と通常の操作の間には重要な区別があります。起動時、アプリケーションまたはワーカーは現在のリーダーマスターを見つけて登録できる必要があります。しかし、一度正常に登録されると、それは「システム内」(つまり、ZooKeeper に保存されている) になります。フェイルオーバーが発生した場合、新しいリーダーは以前に登録されたすべてのアプリケーションとワーカーに連絡してリーダーシップの変更を通知するため、起動時に新しいマスターの存在を知る必要さえありません。

このプロパティにより、新しいマスターはいつでも作成できます。心配する必要があるのは、*新しい*アプリケーションとワーカーがリーダーになった場合に登録できることだけです。一度登録されると、すべて完了です。

ファイルシステムによる単一ノードリカバリ

概要

ZooKeeper は本番レベルの高可用性にとって最良の方法ですが、マスターがダウンした場合に再起動できるだけであれば、FILESYSTEM モードで対応できます。アプリケーションとワーカーが登録する際、提供されたディレクトリに十分な状態が書き込まれるため、マスタープロセスの再起動時に回復できます。

設定

このリカバリモードを有効にするには、spark-env で `SPARK_DAEMON_JAVA_OPTS` を次の設定で使用して設定できます。

システムプロパティデフォルト値意味バージョン以降
spark.deploy.recoveryMode NONE クラスターモードで失敗して再起動した Spark ジョブを回復するためのリカバリモード設定。ファイルシステムベースの単一ノードリカバリモードを有効にするには FILESYSTEM、RocksDB ベースの単一ノードリカバリモードを有効にするには ROCKSDB、ZooKeeper ベースのリカバリモードを使用するには ZOOKEEPER、追加の `spark.deploy.recoveryMode.factory` 設定でカスタムプロバイダークラスを提供するには CUSTOM に設定します。NONE はデフォルト値で、このリカバリモードを無効にします。 0.8.1
spark.deploy.recoveryDirectory "" Spark がリカバリ状態を保存するディレクトリ。マスターの観点からアクセス可能です。spark.deploy.recoveryMode または spark.deploy.recoveryCompressionCodec が変更された場合、ディレクトリは手動でクリアする必要があることに注意してください。 0.8.1
spark.deploy.recoveryCompressionCodec (なし) 永続化エンジンの圧縮コーデック。none (デフォルト)、lz4、lzf、snappy、zstd。現在、FILESYSTEM モードのみがこの構成をサポートしています。 4.0.0
spark.deploy.recoveryTimeout (なし) リカバリプロセスのタイムアウト。デフォルト値は spark.worker.timeout と同じです。 4.0.0
spark.deploy.recoveryMode.factory "" StandaloneRecoveryModeFactory インターフェイスを実装するクラス 1.2.0
spark.deploy.zookeeper.url なし spark.deploy.recoveryMode が ZOOKEEPER に設定されている場合、この構成は接続する zookeeper URL を設定するために使用されます。 0.8.1
spark.deploy.zookeeper.dir なし spark.deploy.recoveryMode が ZOOKEEPER に設定されている場合、この構成はリカバリ状態を保存するための zookeeper ディレクトリを設定するために使用されます。 0.8.1

詳細