Spark スタンドアロンモード
- セキュリティ
- Spark スタンドアロンモードをクラスターにインストールする
- クラスターを手動で起動する
- クラスター起動スクリプト
- リソース割り当てと設定の概要
- アプリケーションをクラスターに接続する
- クライアントプロパティ
- Spark アプリケーションの起動
- リソーススケジューリング
- Executor スケジューリング
- ステージレベルのスケジューリング概要
- 監視とロギング
- Hadoop と並行して実行する
- ネットワークセキュリティのためのポート設定
- 高可用性
YARN クラスターマネージャー上で実行することに加えて、Spark はシンプルなスタンドアロンデプロイモードも提供します。スタンドアロンクラスターは、マスターとワーカーを手動で起動して手動で起動することも、提供されている 起動スクリプト を使用することもできます。テストのためにこれらのデーモンを 1 台のマシンで実行することも可能です。
セキュリティ
認証のようなセキュリティ機能はデフォルトでは有効になっていません。インターネットまたは信頼されていないネットワークに公開されているクラスターをデプロイする場合、不正なアプリケーションがクラスター上で実行されるのを防ぐために、クラスターへのアクセスを保護することが重要です。Spark を実行する前に、Spark セキュリティ およびこのドキュメントの特定のセキュリティセクションを参照してください。
Spark スタンドアロンモードをクラスターにインストールする
Spark スタンドアロンモードをインストールするには、コンパイル済みの Spark をクラスターの各ノードに配置するだけです。各リリースで事前ビルドされた Spark を入手することも、自分でビルド することもできます。
クラスターを手動で起動する
スタンドアロンマスターサーバーを起動するには、以下を実行します。
./sbin/start-master.sh
起動後、マスターは自身の spark://HOST:PORT URL を出力します。この URL を使用してワーカーを接続したり、SparkContext の「master」引数として渡したりできます。この URL は、マスターの Web UI (デフォルトでは https://:8080) でも確認できます。
同様に、1 つ以上のワーカーを起動し、以下を介してマスターに接続できます。
./sbin/start-worker.sh <master-spark-URL>
ワーカーを起動したら、マスターの Web UI (デフォルトでは https://:8080) を確認してください。新しいノードが、CPU 数とメモリ (OS 用に 1GB 残す) とともにリストされているはずです。
最後に、以下の設定オプションをマスターとワーカーに渡すことができます。
| 引数 | 意味 |
|---|---|
-h HOST, --host HOST |
リッスンするホスト名 |
-p PORT, --port PORT |
サービスがリッスンするポート (デフォルト: マスターは 7077、ワーカーはランダム) |
--webui-port PORT |
Web UI のポート (デフォルト: マスターは 8080、ワーカーは 8081) |
-c CORES, --cores CORES |
Spark アプリケーションがマシンで使用することを許可する合計 CPU コア数 (デフォルト: すべて利用可能); ワーカーのみ |
-m MEM, --memory MEM |
Spark アプリケーションがマシンで使用することを許可する合計メモリ量 (例: 1000M または 2G); ワーカーのみ |
-d DIR, --work-dir DIR |
スクラッチスペースとジョブ出力ログに使用するディレクトリ (デフォルト: SPARK_HOME/work); ワーカーのみ |
--properties-file FILE |
ロードするカスタム Spark プロパティファイルへのパス (デフォルト: conf/spark-defaults.conf) |
クラスター起動スクリプト
起動スクリプトを使用して Spark スタンドアロンクラスターを起動するには、Spark ディレクトリに conf/workers という名前のファイルを作成する必要があります。このファイルには、Spark ワーカーを起動する予定のすべてのマシンのホスト名を 1 行に 1 つずつ記述する必要があります。conf/workers が存在しない場合、起動スクリプトはデフォルトで単一マシン (localhost) を使用します。これはテストに便利です。注意、マスターマシンは各ワーカーマシンに SSH でアクセスします。デフォルトでは、SSH は並列で実行され、パスワードなし (秘密鍵を使用) のアクセス設定が必要です。パスワードなしの設定がない場合は、環境変数 SPARK_SSH_FOREGROUND を設定して、各ワーカーのパスワードをシリアルに入力できます。
このファイルをセットアップしたら、Hadoop のデプロイ スクリプトに基づいた以下のシェルスクリプトを使用して、クラスターを起動または停止できます。これらのスクリプトは SPARK_HOME/sbin にあります。
sbin/start-master.sh- スクリプトが実行されたマシンでマスターインスタンスを起動します。sbin/start-workers.sh-conf/workersファイルで指定された各マシンでワーカーインスタンスを起動します。sbin/start-worker.sh- スクリプトが実行されたマシンでワーカーインスタンスを起動します。sbin/start-connect-server.sh- スクリプトが実行されたマシンで Spark Connect サーバーを起動します。sbin/start-all.sh- 上記で説明したように、マスターと複数のワーカーを起動します。sbin/stop-master.sh-sbin/start-master.shスクリプトで起動されたマスターを停止します。sbin/stop-worker.sh- スクリプトが実行されたマシンで、すべてのワーカーインスタンスを停止します。sbin/stop-workers.sh-conf/workersファイルで指定されたマシンで、すべてのワーカーインスタンスを停止します。sbin/stop-connect-server.sh- スクリプトが実行されたマシンで、すべての Spark Connect サーバーインスタンスを停止します。sbin/stop-all.sh- 上記で説明したように、マスターとワーカーの両方を停止します。
これらのスクリプトは、ローカルマシンではなく、Spark マスターを実行したいマシンで実行する必要があることに注意してください。
conf/spark-env.sh で環境変数を設定することにより、クラスターをさらに設定することができます。このファイルは conf/spark-env.sh.template から開始して作成し、設定を有効にするために *すべてのワーカーマシンにコピー* してください。以下の設定が利用可能です。
| 環境変数 | 意味 |
|---|---|
SPARK_MASTER_HOST |
マスターを特定のホスト名または IP アドレス (例: 公開されているもの) にバインドします。 |
SPARK_MASTER_PORT |
マスターを別のポートで起動します (デフォルト: 7077)。 |
SPARK_MASTER_WEBUI_PORT |
マスター Web UI のポート (デフォルト: 8080)。 |
SPARK_MASTER_OPTS |
マスターのみに適用される設定プロパティを "-Dx=y" の形式で指定します (デフォルト: なし)。可能なオプションのリストについては、以下を参照してください。 |
SPARK_LOCAL_DIRS |
Spark で "スクラッチ" スペースに使用するディレクトリ。これには、マップ出力ファイルやディスクに保存される RDD が含まれます。これは、システム上の高速なローカルディスク上にある必要があります。複数のディスク上の複数のディレクトリのカンマ区切りリストにすることもできます。 |
SPARK_LOG_DIR |
ログファイルが保存される場所。(デフォルト: SPARK_HOME/logs)。 |
SPARK_LOG_MAX_FILES |
ログファイルの最大数 (デフォルト: 5)。 |
SPARK_PID_DIR |
PID ファイルが保存される場所。(デフォルト: /tmp)。 |
SPARK_WORKER_CORES |
Spark アプリケーションがマシンで使用することを許可する合計コア数 (デフォルト: 利用可能なすべてのコア)。 |
SPARK_WORKER_MEMORY |
Spark アプリケーションがマシンで使用することを許可する合計メモリ量 (例: 1000m、2g) (デフォルト: 総メモリから 1GiB を引いた値)。各アプリケーションの*個別の*メモリは、`spark.executor.memory` プロパティを使用して構成されることに注意してください。 |
SPARK_WORKER_PORT |
Spark ワーカーを特定のポートで起動します (デフォルト: ランダム)。 |
SPARK_WORKER_WEBUI_PORT |
ワーカー Web UI のポート (デフォルト: 8081)。 |
SPARK_WORKER_DIR |
アプリケーションを実行するディレクトリ。ログとスクラッチスペースの両方が含まれます (デフォルト: SPARK_HOME/work)。 |
SPARK_WORKER_OPTS |
ワーカーのみに適用される設定プロパティを "-Dx=y" の形式で指定します (デフォルト: なし)。可能なオプションのリストについては、以下を参照してください。 |
SPARK_DAEMON_MEMORY |
Spark マスターおよびワーカーデーモン自体に割り当てるメモリ (デフォルト: 1g)。 |
SPARK_DAEMON_JAVA_OPTS |
Spark マスターおよびワーカーデーモン自体に対する JVM オプションを "-Dx=y" の形式で指定します (デフォルト: なし)。 |
SPARK_DAEMON_CLASSPATH |
Spark マスターおよびワーカーデーモン自体のクラスパス (デフォルト: なし)。 |
SPARK_PUBLIC_DNS |
Spark マスターおよびワーカーの公開 DNS 名 (デフォルト: なし)。 |
注意: 起動スクリプトは現在 Windows をサポートしていません。Windows 上で Spark クラスターを実行するには、マスターとワーカーを手動で起動してください。
SPARK_MASTER_OPTS は以下のシステムプロパティをサポートしています。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.master.ui.port |
8080 |
マスター Web UI エンドポイントのポート番号を指定します。 | 1.1.0 |
spark.master.ui.title |
(なし) | マスター UI ページのタイトルを指定します。設定されていない場合、デフォルトで `Spark Master at 'master url'` が使用されます。 | 4.0.0 |
spark.master.ui.decommission.allow.mode |
LOCAL |
マスター Web UI の /workers/kill エンドポイントの動作を指定します。可能な選択肢は次のとおりです: `LOCAL` はマスターを実行しているマシンからローカルな IP からこのエンドポイントを許可することを意味します、`DENY` はこのエンドポイントを完全に無効にすることを意味します、`ALLOW` は任意の IP からこのエンドポイントを呼び出すことを許可することを意味します。 | 3.1.0 |
spark.master.ui.historyServerUrl |
(なし) | Spark History Server が実行されている URL。すべての Spark ジョブが History Server がアクセスする同じイベントログの場所を共有していることを前提としていることに注意してください。 | 4.0.0 |
spark.master.rest.enabled |
false |
マスター REST API エンドポイントを使用するかどうか。 | 1.3.0 |
spark.master.rest.host |
(なし) | マスター REST API エンドポイントのホストを指定します。 | 4.0.0 |
spark.master.rest.port |
6066 |
マスター REST API エンドポイントのポート番号を指定します。 | 1.3.0 |
spark.master.rest.filters |
(なし) | マスター REST API に適用するフィルタークラス名のカンマ区切りリスト。 | 4.0.0 |
spark.master.useAppNameAsAppId.enabled |
false |
(実験的) true の場合、Spark マスターはユーザー指定の appName を appId として使用します。 | 4.0.0 |
spark.deploy.retainedApplications |
200 | 表示する完了したアプリケーションの最大数。この制限を維持するために、古いアプリケーションは UI から削除されます。 |
0.8.0 |
spark.deploy.retainedDrivers |
200 | 表示する完了したドライバーの最大数。この制限を維持するために、古いドライバーは UI から削除されます。 |
1.1.0 |
spark.deploy.spreadOutDrivers |
true | スタンドアロンクラスターマネージャーがドライバーをノード全体に分散させるか、できるだけ少ないノードに統合しようとするかどうか。分散は通常 HDFS でのデータローカリティに優れていますが、統合は計算集約型のワークロードにより効率的です。 | 4.0.0 |
spark.deploy.spreadOutApps |
true | スタンドアロンクラスターマネージャーがアプリケーションをノード全体に分散させるか、できるだけ少ないノードに統合しようとするかどうか。分散は通常 HDFS でのデータローカリティに優れていますが、統合は計算集約型のワークロードにより効率的です。 |
0.6.1 |
spark.deploy.defaultCores |
Int.MaxValue | Spark のスタンドアロンモードで、アプリケーションが `spark.cores.max` を設定しない場合にアプリケーションに与えるデフォルトのコア数。設定されていない場合、アプリケーションは `spark.cores.max` を自分で構成しない限り、常に利用可能なすべてのコアを取得します。共有クラスターでこれを低く設定すると、ユーザーがデフォルトでクラスター全体を占有するのを防ぐことができます。 |
0.9.0 |
spark.deploy.maxExecutorRetries |
10 | スタンドアロンクラスターマネージャーが欠陥のあるアプリケーションを削除する前に発生する可能性のある連続した Executor の障害の最大回数に対する制限。実行中の Executor が 1 つでもある場合、アプリケーションが削除されることはありません。アプリケーションが spark.deploy.maxExecutorRetries 回以上の連続した障害を経験し、その障害の間に Executor が正常に開始されず、アプリケーションに実行中の Executor がない場合、スタンドアロンクラスターマネージャーはアプリケーションを削除して失敗とマークします。この自動削除を無効にするには、spark.deploy.maxExecutorRetries を -1 に設定してください。 |
1.6.3 |
spark.deploy.maxDrivers |
Int.MaxValue | 実行中のドライバーの最大数。 | 4.0.0 |
spark.deploy.appNumberModulo |
(なし) | アプリ番号のモジュロ。デフォルトでは、`app-yyyyMMddHHmmss-9999` の次は `app-yyyyMMddHHmmss-10000` になります。モジュロが 10000 の場合、`app-yyyyMMddHHmmss-0000` になります。ほとんどの場合、プレフィックス `app-yyyyMMddHHmmss` は 10000 個のアプリケーションを作成する際に既に増分されます。 | 4.0.0 |
spark.deploy.driverIdPattern |
driver-%s-%04d |
Java の `String.format` メソッドに基づいたドライバー ID 生成パターン。デフォルト値は `driver-%s-%04d` で、既存のドライバー ID 文字列を表します (例: `driver-20231031224459-0019`)。一意の ID を生成するように注意してください。 | 4.0.0 |
spark.deploy.appIdPattern |
app-%s-%04d |
Java の `String.format` メソッドに基づいたアプリ ID 生成パターン。デフォルト値は `app-%s-%04d` で、既存のアプリ ID 文字列を表します (例: `app-20231031224509-0008`)。一意の ID を生成するように注意してください。 | 4.0.0 |
spark.worker.timeout |
60 | スタンドアロンデプロイマスターがハートビートを受け取らなかった場合にワーカーが失われたと見なすまでの秒数。 | 0.6.2 |
spark.dead.worker.persistence |
15 | UI でデッドワーカー情報を保持するイテレーション数。デフォルトでは、デッドワーカーは最後のハートビートから (15 + 1) * `spark.worker.timeout` の間表示されます。 | 0.8.0 |
spark.worker.resource.{name}.amount |
(なし) | ワーカー上の特定のリソースの使用量。 | 3.0.0 |
spark.worker.resource.{name}.discoveryScript |
(なし) | ワーカー起動時に特定のリソースを見つけるために使用されるリソース検出スクリプトへのパス。スクリプトの出力は `ResourceInformation` クラスのようにフォーマットされるべきです。 | 3.0.0 |
spark.worker.resourcesFile |
(なし) | ワーカー起動時にさまざまなリソースを見つけるために使用されるリソースファイルへのパス。リソースファイルの内容は `[{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}]` のようにフォーマットされるべきです。特定のリソースがリソースファイルで見つからない場合、検出スクリプトがそのリソースを見つけるために使用されます。検出スクリプトもリソースを見つけられない場合、ワーカーは起動に失敗します。 | 3.0.0 |
SPARK_WORKER_OPTS は以下のシステムプロパティをサポートしています。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.worker.initialRegistrationRetries |
6 | 短い間隔 (5 秒から 15 秒の間) で再接続するためのリトライ回数。 | 4.0.0 |
spark.worker.maxRegistrationRetries |
16 | 再接続の最大リトライ回数。spark.worker.initialRegistrationRetries 回の試行後、間隔は 30 秒から 90 秒の間になります。 |
4.0.0 |
spark.worker.cleanup.enabled |
true | ワーカー / アプリケーションディレクトリの定期的なクリーンアップを有効にします。これは YARN の動作が異なるため、スタンドアロンモードのみに影響することに注意してください。停止したアプリケーションのディレクトリのみがクリーンアップされます。spark.shuffle.service.db.enabled が "true" の場合は有効にする必要があります。 |
1.0.0 |
spark.worker.cleanup.interval |
1800 (30 分) | ワーカーがローカルマシン上の古いアプリケーションワークディレクトリをクリーンアップする間隔を秒単位で制御します。 | 1.0.0 |
spark.worker.cleanup.appDataTtl |
604800 (7 日、7 * 24 * 3600) | 各ワーカーでアプリケーションワークディレクトリを保持する秒数。これは TTL (Time To Live) であり、利用可能なディスク容量に応じて設定する必要があります。アプリケーションログと JAR は各アプリケーションワークディレクトリにダウンロードされます。時間が経つにつれて、ワークディレクトリはディスク容量をすぐに使い果たす可能性があります。特にジョブを非常に頻繁に実行する場合。 | 1.0.0 |
spark.shuffle.service.db.enabled |
true | 外部シャッフルサービスの状態をローカルディスクに保存するため、外部シャッフルサービスが再起動されたときに、現在の Executor の情報を自動的にリロードします。これはスタンドアロンモードのみに影響します (yarn は常にこの動作を有効にしています)。状態が最終的にクリーンアップされるように、spark.worker.cleanup.enabled も有効にすることをお勧めします。この設定は将来削除される可能性があります。 |
3.0.0 |
spark.shuffle.service.db.backend |
ROCKSDB | spark.shuffle.service.db.enabled が true の場合、ユーザーはこれを使用してシャッフルサービスの状態ストアで使用されるディスクベースストアの種類を指定できます。現在、ROCKSDB と LEVELDB (非推奨) をサポートしており、デフォルト値は ROCKSDB です。RocksDB/LevelDB の元のデータストアは、現在自動的に別の種類のストレージに変換されません。 |
3.4.0 |
spark.storage.cleanupFilesAfterExecutorExit |
true | Executor 終了後にワーカーディレクトリの非シャッフルファイル (一時シャッフルブロック、キャッシュされた RDD/ブロードキャストブロック、スピルファイルなど) をクリーンアップします。これは spark.worker.cleanup.enabled と重複しません。これは、デッド Executor のローカルディレクトリ内の非シャッフルファイルをクリーンアップすることを有効にし、spark.worker.cleanup.enabled は停止してタイムアウトしたアプリケーションのすべてのファイル/サブディレクトリのクリーンアップを有効にするためです。これはスタンドアロンモードのみに影響し、他のクラスターマネージャーのサポートは将来追加される可能性があります。 |
2.4.0 |
spark.worker.ui.compressedLogFileLengthCacheSize |
100 | 圧縮ログファイルの場合、非圧縮ファイルはファイルを解凍することによってのみ計算できます。Spark は圧縮ログファイルの非圧縮ファイルサイズをキャッシュします。このプロパティはキャッシュサイズを制御します。 | 2.0.2 |
spark.worker.idPattern |
worker-%s-%s-%d |
Java `String.format` メソッドに基づいたワーカー ID 生成パターン。デフォルト値は `worker-%s-%s-%d` で、既存のワーカー ID 文字列を表します (例: `worker-20231109183042-[fe80::1%lo0]-39729`)。一意の ID を生成するように注意してください。 | 4.0.0 |
リソース割り当てと設定の概要
設定ページ のカスタムリソーススケジューリングと設定の概要セクションを必ずお読みください。このセクションでは、リソーススケジューリングの Spark スタンドアロン固有の側面のみについて説明します。
Spark スタンドアロンには 2 つの部分があります。1 つ目はワーカーのリソース設定、2 つ目は特定のアプリケーションのリソース割り当てです。
ユーザーは、割り当てるリソースセットを持つようにワーカーを設定する必要があります。spark.worker.resource.{resourceName}.amount を使用して、ワーカーが持つ各リソースの量を制御します。ユーザーは、ワーカーが割り当てられたリソースをどのように検出するかを指定するために、spark.worker.resourcesFile または spark.worker.resource.{resourceName}.discoveryScript のいずれかを指定する必要があります。どちらの方法がセットアップに適しているかについては、それぞれの上記の説明を参照してください。
2 番目の部分は、Spark スタンドアロンでアプリケーションを実行することです。標準の Spark リソース設定からの唯一の特別なケースは、ドライバをクライアントモードで実行する場合です。クライアントモードのドライバの場合、ユーザーは spark.driver.resourcesFile または spark.driver.resource.{resourceName}.discoveryScript を介して使用するリソースを指定できます。ドライバが他のドライバと同じホストで実行されている場合は、リソースファイルまたは検出スクリプトが同じノードで実行されている他のドライバと競合しないリソースのみを返すようにしてください。
注意、ワーカーはアプリケーションの各 Executor を割り当てられたリソースで起動するため、アプリケーションを送信するときにユーザーが検出スクリプトを指定する必要はありません。
アプリケーションをクラスターに接続する
Spark クラスターでアプリケーションを実行するには、マスターの spark://IP:PORT URL を SparkContext コンストラクタ に渡すだけです。
クラスターに対してインタラクティブな Spark シェルを実行するには、次のコマンドを実行します。
./bin/spark-shell --master spark://IP:PORT
また、`--total-executor-cores <numCores>` オプションを渡して、spark-shell がクラスターで使用するコア数を制御することもできます。
クライアントプロパティ
Spark アプリケーションは、スタンドアロンモードに固有の以下の設定プロパティをサポートしています。
| プロパティ名 | デフォルト値 | 意味 | バージョン以降 |
|---|---|---|---|
spark.standalone.submit.waitAppCompletion |
false |
スタンドアロンクラスターモードでは、アプリケーションが完了するまでクライアントが終了するのを待つかどうかを制御します。true に設定されている場合、クライアントプロセスはドライバーの状態をポーリングしながらアクティブなままになります。それ以外の場合、クライアントプロセスは送信後に終了します。 | 3.1.0 |
Spark アプリケーションの起動
Spark プロトコル
spark-submit スクリプト は、コンパイル済みの Spark アプリケーションをクラスターに送信する最も簡単な方法です。スタンドアロンクラスターの場合、Spark は現在 2 つのデプロイモードをサポートしています。`client` モードでは、ドライバはアプリケーションを送信するクライアントと同じプロセスで起動されます。しかし、`cluster` モードでは、ドライバはクラスター内のワーカープロセスのいずれかから起動され、クライアントプロセスはアプリケーションの完了を待たずに、アプリケーションを送信するという責任を果たしたらすぐに終了します。
アプリケーションが Spark submit 経由で起動された場合、アプリケーション JAR はすべてのワーカーノードに自動的に配布されます。アプリケーションが依存する追加の JAR については、カンマ区切り (例: `--jars jar1,jar2`) を使用して `--jars` フラグで指定する必要があります。アプリケーションの設定または実行環境を制御するには、Spark 設定 を参照してください。
さらに、スタンドアロンの `cluster` モードでは、アプリケーションがゼロ以外の終了コードで終了した場合に自動的に再起動できます。この機能を使用するには、アプリケーションを起動する際に `spark-submit` に `--supervise` フラグを渡すことができます。次に、繰り返し失敗するアプリケーションを終了したい場合は、以下を通じて行うことができます。
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
スタンドアロンマスターの Web UI (http://<master url>:8080) でドライバー ID を見つけることができます。
REST API
spark.master.rest.enabled が有効になっている場合、Spark マスターは http://[host:port]/[version]/submissions/[action] 経由で追加の REST API を提供します。ここで、host はマスターホスト、port は spark.master.rest.port (デフォルト: 6066) で指定されたポート番号、version はプロトコルバージョン (現在 v1)、action はサポートされているアクションのいずれかです。
| コマンド | HTTP メソッド | 説明 | バージョン以降 |
|---|---|---|---|
create |
POST | Spark ドライバを `cluster` モードで作成します。4.0.0 以降、Spark マスターは Spark プロパティと環境変数の値のサーバーサイド変数置換をサポートしています。 | 1.3.0 |
kill |
POST | 単一の Spark ドライバを終了します。 | 1.3.0 |
killall |
POST | 実行中のすべての Spark ドライバを終了します。 | 4.0.0 |
status |
GET | Spark ジョブのステータスを確認します。 | 1.3.0 |
clear |
POST | 完了したドライバーとアプリケーションをクリアします。 | 4.0.0 |
以下は、`pi.py` と REST API を使用した `curl` CLI コマンドの例です。
$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
"appResource": "",
"sparkProperties": {
"spark.master": "spark://master:7077",
"spark.app.name": "Spark Pi",
"spark.driver.memory": "1g",
"spark.driver.cores": "1",
"spark.jars": ""
},
"clientSparkVersion": "",
"mainClass": "org.apache.spark.deploy.SparkSubmit",
"environmentVariables": { },
"action": "CreateSubmissionRequest",
"appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'
上記 `create` リクエストに対する REST API の応答は以下のとおりです。
{
"action" : "CreateSubmissionResponse",
"message" : "Driver successfully submitted as driver-20231124153531-0000",
"serverSparkVersion" : "4.0.0",
"submissionId" : "driver-20231124153531-0000",
"success" : true
}
sparkProperties および environmentVariables の場合、ユーザーはサーバーサイド環境変数のプレースホルダーを次のように使用できます。
$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Authorization: Bearer USER-PROVIDED-WEB-TOEN-SIGNED-BY-THE-SAME-SHARED-KEY"
...
sparkProperties および environmentVariables の場合、ユーザーはサーバーサイド環境変数のプレースホルダーを次のように使用できます。
...
"sparkProperties": {
"spark.hadoop.fs.s3a.endpoint": "{{AWS_ENDPOINT_URL}}",
"spark.hadoop.fs.s3a.endpoint.region": "{{AWS_REGION}}"
},
"environmentVariables": {
"AWS_CA_BUNDLE": "{{AWS_CA_BUNDLE}}"
},
...
リソーススケジューリング
スタンドアロンクラスターモードは現在、アプリケーション間の単純な FIFO スケジューラのみをサポートしています。ただし、複数のユーザーが同時に実行できるように、各アプリケーションが使用する最大リソースを制御できます。デフォルトでは、クラスターの*すべて*のコアを取得しますが、これは一度に 1 つのアプリケーションのみを実行する場合にのみ意味があります。SparkConf の `spark.cores.max` を設定することで、コア数を制限できます。例:
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)さらに、クラスターマスタープロセスで `spark.deploy.defaultCores` を構成して、`spark.cores.max` を無限未満に設定していないアプリケーションのデフォルトを変更できます。これは、`conf/spark-env.sh` に以下を追加することで行います。
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"これは、ユーザーが個別に最大コア数を構成していない共有クラスターで役立ちます。
Executor スケジューリング
各 Executor に割り当てられるコア数は構成可能です。`spark.executor.cores` が明示的に設定されている場合、ワーカーに十分なコアとメモリがある場合、同じアプリケーションの複数の Executor が同じワーカーに起動される可能性があります。それ以外の場合、各 Executor はデフォルトでワーカーで利用可能なすべてのコアを取得するため、この場合、1 回のスケジューリングイテレーション中に各ワーカーに起動できるアプリケーションごとの Executor は 1 つだけになります。
ステージレベルのスケジューリング概要
ステージレベルのスケジューリングはスタンドアロンでサポートされています。
- 動的割り当てが無効な場合: ユーザーはステージレベルで異なるタスクリソース要件を指定でき、起動時に要求されたのと同じ Executor を使用します。
- 動的割り当てが有効な場合: 現在、マスターがアプリケーションの Executor を割り当てる場合、複数の ResourceProfile の ResourceProfile ID の順序に基づいてスケジュールされます。ID の小さい ResourceProfile が最初にスケジュールされます。通常、Spark は 1 つのステージを完了してから別のステージを開始するため、これは問題になりませんが、ジョブサーバータイプのシナリオでは影響を与える可能性があります。これは考慮すべき点です。スケジューリングでは、Executor メモリと Executor コアのみが組み込み Executor リソースから取得され、他のすべてのカスタムリソースは ResourceProfile から取得されます。Executor のオーバーヘッドなどの他の組み込み Executor リソースは影響しません。ベースのデフォルトプロファイルは、アプリケーションを送信するときに Spark 設定に基づいて作成されます。ベースのデフォルトプロファイルからの Executor メモリと Executor コアはカスタム ResourceProfile に伝播できますが、他のすべてのカスタムリソースは伝播できません。
注意点
動的リソース割り当てで述べたように、動的割り当てが有効な場合に各 Executor のコアが明示的に指定されていない場合、Spark は予想よりも多くの Executor を取得する可能性があります。そのため、ステージレベルのスケジューリングを使用する場合は、各リソースプロファイルに対して Executor コアを明示的に設定することをお勧めします。
監視とロギング
Spark のスタンドアロンモードは、クラスターを監視するための Web ベースのユーザーインターフェイスを提供します。マスターと各ワーカーは、クラスターとジョブの統計情報を表示する独自の Web UI を備えています。デフォルトでは、ポート 8080 でマスターの Web UI にアクセスできます。ポートは、設定ファイルで変更するか、コマンドラインオプションで変更できます。
さらに、各ジョブの詳細なログ出力は、各ワーカーノードの作業ディレクトリ (デフォルトでは SPARK_HOME/work) にも書き込まれます。ジョブごとに、stdout と stderr の 2 つのファイルがあり、コンソールに出力されたすべての出力が含まれます。
Hadoop と並行して実行する
既存の Hadoop クラスターと並行して Spark を実行するには、同じマシンで別個のサービスとして起動するだけです。Spark から Hadoop データにアクセスするには、hdfs:// URL (通常は hdfs://<namenode>:9000/path) を使用します。または、Spark 用に別のクラスターをセットアップし、ネットワーク経由で HDFS にアクセスすることもできます。これはディスクローカルアクセスよりも遅くなりますが、同じローカルエリアネットワーク内で実行している場合 (例: Hadoop がある各ラックに Spark マシンを数台配置する) は問題にならないかもしれません。
ネットワークセキュリティのためのポート設定
一般的に、Spark クラスターとそのサービスは公開インターネット上にはデプロイされません。それらは一般的にプライベートサービスであり、Spark をデプロイした組織のネットワーク内でのみアクセス可能であるべきです。Spark サービスで使用されるホストとポートへのアクセスは、サービスにアクセスする必要があるオリジンホストに限定されるべきです。
これは、スタンドアロンリソースマネージャーを使用するクラスターにとって特に重要です。なぜなら、他のリソースマネージャーが行うようなきめ細かなアクセス制御をサポートしていないためです。
設定すべきポートの完全なリストについては、セキュリティページ を参照してください。
高可用性
デフォルトでは、スタンドアロンスケジューリングクラスターはワーカー障害に対して耐性があります (Spark 自体が作業を他のワーカーに移動して失うことに耐性がある限り)。しかし、スケジューラはスケジューリングの決定を行うためにマスターを使用しており、これは (デフォルトで) 単一障害点を作成します。マスターがクラッシュした場合、新しいアプリケーションを作成できません。これを回避するために、2 つの高可用性スキームを用意しました。以下に詳述します。
ZooKeeper によるスタンバイマスター
概要
ZooKeeper を使用してリーダー選出と一部の状態保存を提供することで、同じ ZooKeeper インスタンスに接続された複数のマスターをクラスターに起動できます。1 つが「リーダー」として選出され、残りはスタンバイモードになります。現在のリーダーがダウンした場合、別のマスターが選出され、古いマスターの状態を回復してからスケジューリングを再開します。全体の回復プロセス (最初のリーダーがダウンしてから) は 1 分から 2 分で完了するはずです。この遅延は *新しい* アプリケーションのスケジューリングにのみ影響することに注意してください。マスターのフェイルオーバー中にすでに実行されていたアプリケーションは影響を受けません。
ZooKeeper の開始方法については、こちら で詳細を確認してください。
設定
このリカバリモードを有効にするには、`spark.deploy.recoveryMode` および関連する `spark.deploy.zookeeper.*` 設定を構成して、spark-env で `SPARK_DAEMON_JAVA_OPTS` を設定できます。
注意点: クラスターに複数のマスターがあるのに、マスターが ZooKeeper を使用するように正しく設定されていない場合、マスターはお互いを検出できず、すべてがリーダーであると認識します。これは健全なクラスター状態にはなりません (すべてのマスターが個別にスケジュールするため)。
詳細
ZooKeeper クラスターをセットアップしたら、高可用性の有効化は簡単です。同じ ZooKeeper 設定 (ZooKeeper URL とディレクトリ) を持つ異なるノードで複数のマスタープロセスを起動するだけです。マスターはいつでも追加および削除できます。
新しいアプリケーションをスケジュールしたり、クラスターにワーカーを追加したりするには、以前は単一のマスターを渡していた場所にマスターの IP アドレスのリストを渡す必要があります。たとえば、SparkContext を `spark://host1:port1,host2:port2` にポイントして起動するかもしれません。これにより、SparkContext は両方のマスターに登録しようとします。host1 がダウンしても、新しいリーダーである host2 を見つけることができるため、この構成は引き続き有効です。
「マスターへの登録」と通常の操作の間には重要な区別があります。起動時、アプリケーションまたはワーカーは現在のリーダーマスターを見つけて登録できる必要があります。しかし、一度正常に登録されると、それは「システム内」(つまり、ZooKeeper に保存されている) になります。フェイルオーバーが発生した場合、新しいリーダーは以前に登録されたすべてのアプリケーションとワーカーに連絡してリーダーシップの変更を通知するため、起動時に新しいマスターの存在を知る必要さえありません。
このプロパティにより、新しいマスターはいつでも作成できます。心配する必要があるのは、*新しい*アプリケーションとワーカーがリーダーになった場合に登録できることだけです。一度登録されると、すべて完了です。
ファイルシステムによる単一ノードリカバリ
概要
ZooKeeper は本番レベルの高可用性にとって最良の方法ですが、マスターがダウンした場合に再起動できるだけであれば、FILESYSTEM モードで対応できます。アプリケーションとワーカーが登録する際、提供されたディレクトリに十分な状態が書き込まれるため、マスタープロセスの再起動時に回復できます。
設定
このリカバリモードを有効にするには、spark-env で `SPARK_DAEMON_JAVA_OPTS` を次の設定で使用して設定できます。
| システムプロパティ | デフォルト値 | 意味 | バージョン以降 |
|---|---|---|---|
spark.deploy.recoveryMode |
NONE | クラスターモードで失敗して再起動した Spark ジョブを回復するためのリカバリモード設定。ファイルシステムベースの単一ノードリカバリモードを有効にするには FILESYSTEM、RocksDB ベースの単一ノードリカバリモードを有効にするには ROCKSDB、ZooKeeper ベースのリカバリモードを使用するには ZOOKEEPER、追加の `spark.deploy.recoveryMode.factory` 設定でカスタムプロバイダークラスを提供するには CUSTOM に設定します。NONE はデフォルト値で、このリカバリモードを無効にします。 | 0.8.1 |
spark.deploy.recoveryDirectory |
"" | Spark がリカバリ状態を保存するディレクトリ。マスターの観点からアクセス可能です。spark.deploy.recoveryMode または spark.deploy.recoveryCompressionCodec が変更された場合、ディレクトリは手動でクリアする必要があることに注意してください。 |
0.8.1 |
spark.deploy.recoveryCompressionCodec |
(なし) | 永続化エンジンの圧縮コーデック。none (デフォルト)、lz4、lzf、snappy、zstd。現在、FILESYSTEM モードのみがこの構成をサポートしています。 | 4.0.0 |
spark.deploy.recoveryTimeout |
(なし) | リカバリプロセスのタイムアウト。デフォルト値は spark.worker.timeout と同じです。 |
4.0.0 |
spark.deploy.recoveryMode.factory |
"" | StandaloneRecoveryModeFactory インターフェイスを実装するクラス |
1.2.0 |
spark.deploy.zookeeper.url |
なし | spark.deploy.recoveryMode が ZOOKEEPER に設定されている場合、この構成は接続する zookeeper URL を設定するために使用されます。 |
0.8.1 |
spark.deploy.zookeeper.dir |
なし | spark.deploy.recoveryMode が ZOOKEEPER に設定されている場合、この構成はリカバリ状態を保存するための zookeeper ディレクトリを設定するために使用されます。 |
0.8.1 |
詳細
- このソリューションは、monit のようなプロセスモニター/マネージャーと組み合わせて使用することも、手動リカバリを有効にするためにも使用できます。
- ファイルシステムリカバリは、まったくリカバリしないよりも明らかに優れていますが、このモードは特定の開発または実験目的には最適ではない可能性があります。特に、`stop-master.sh` でマスターを停止してもリカバリ状態はクリーンアップされないため、新しいマスターを起動するたびにリカバリモードに入ります。これは、以前に登録されたすべてのワーカー/クライアントのタイムアウトを待つ必要がある場合、起動時間が最大 1 分増加する可能性があります。
- 公式にはサポートされていませんが、NFS ディレクトリをリカバリディレクトリとしてマウントできます。元のマスターノードが完全にダウンした場合、別のノードでマスターを起動できます。これにより、以前に登録されたすべてのワーカー/アプリケーションが正しく回復されます (ZooKeeper リカバリと同等)。ただし、将来のアプリケーションは登録できるように、新しいマスターを見つける必要があります。