Spark スタンドアロンモード
- セキュリティ
- Spark スタンドアロンのクラスタへのインストール
- クラスタの手動起動
- クラスタ起動スクリプト
- リソース割り当てと設定の概要
- アプリケーションのクラスタへの接続
- クライアントプロパティ
- Spark アプリケーションの起動
- リソーススケジューリング
- エグゼキュータのスケジューリング
- ステージレベルスケジューリングの概要
- モニタリングとロギング
- Hadoop との同時実行
- ネットワークセキュリティのためのポート設定
- 高可用性
Mesos や YARN クラスタマネージャでの実行に加えて、Spark はシンプルなスタンドアロンデプロイモードも提供します。スタンドアロンクラスタは、マスターとワーカーを手動で起動するか、提供されている起動スクリプトを使用することで起動できます。テストのため、これらのデーモンを単一マシンで実行することも可能です。
セキュリティ
認証などのセキュリティ機能はデフォルトでは有効になっていません。インターネットまたは信頼できないネットワークに公開されているクラスタをデプロイする場合は、不正なアプリケーションがクラスタ上で実行されないように、クラスタへのアクセスを保護することが重要です。Spark を実行する前に、Spark セキュリティとこのドキュメントの具体的なセキュリティセクションを参照してください。
Spark スタンドアロンのクラスタへのインストール
Spark スタンドアロンモードをインストールするには、コンパイル済みの Spark バージョンをクラスタの各ノードに配置するだけです。各リリースで事前にビルドされた Spark バージョンを入手するか、自分でビルドすることができます。
クラスタの手動起動
スタンドアロンマスターサーバを起動するには、以下を実行します。
./sbin/start-master.sh
起動すると、マスターは自身のspark://HOST:PORT
URL を出力します。このURLを使用してワーカーを接続するか、「master」引数をSparkContext
に渡すことができます。このURLは、デフォルトでhttps://:8080であるマスターのWeb UI でも確認できます。
同様に、1つ以上のワーカーを起動し、以下を使用してマスターに接続できます。
./sbin/start-worker.sh <master-spark-URL>
ワーカーを起動したら、マスターのWeb UI(デフォルトでhttps://:8080)を確認してください。新しいノードが、そのCPU数とメモリ(OS用に1ギガバイト残して)とともに表示されます。
最後に、次の設定オプションをマスターとワーカーに渡すことができます。
引数 | 意味 |
---|---|
-h HOST 、--host HOST |
リスンするホスト名 |
-i HOST 、--ip HOST |
リスンするホスト名(非推奨、-h または --host を使用してください) |
-p PORT 、--port PORT |
サービスがリスンするポート(マスターの場合はデフォルトで 7077、ワーカーの場合はランダム) |
|
Web UI のポート(マスターの場合はデフォルトで 8080、ワーカーの場合は 8081) |
-c CORES 、--cores CORES |
マシン上で Spark アプリケーションが使用できる合計 CPU コア数(デフォルト:使用可能なすべてのコア)。ワーカーでのみ使用できます。 |
-m MEM 、--memory MEM |
マシン上で Spark アプリケーションが使用できる合計メモリ量(1000M や 2G のような形式、デフォルト:マシンの総 RAM から 1 GiB を差し引いた量)。ワーカーでのみ使用できます。 |
-d DIR 、--work-dir DIR |
スクラッチスペースとジョブ出力ログに使用するディレクトリ(デフォルト:SPARK_HOME/work)。ワーカーでのみ使用できます。 |
|
読み込むカスタム Spark プロパティファイルへのパス(デフォルト:conf/spark-defaults.conf) |
クラスタ起動スクリプト
起動スクリプトで Spark スタンドアロンクラスタを起動するには、Spark ディレクトリに conf/workers という名前のファイルを作成する必要があります。このファイルには、Spark ワーカーを起動するすべてのマシンのホスト名を、1行に1つずつ記述する必要があります。conf/workers が存在しない場合、起動スクリプトはデフォルトで単一マシン(localhost)になり、テストに役立ちます。マスターマシンは、ssh を介して各ワーカーマシンにアクセスします。デフォルトでは、ssh は並列で実行され、パスワードレス(秘密鍵を使用)アクセスが設定されている必要があります。パスワードレス設定がない場合は、環境変数 SPARK_SSH_FOREGROUND を設定し、各ワーカーのパスワードを順番に入力できます。
このファイルを設定したら、Hadoop のデプロイスクリプトに基づいて、SPARK_HOME/sbin
にある以下のシェルスクリプトを使用してクラスタを起動または停止できます。
sbin/start-master.sh
- スクリプトが実行されているマシンにマスターインスタンスを起動します。sbin/start-workers.sh
-conf/workers
ファイルに指定されている各マシンにワーカーインスタンスを起動します。sbin/start-worker.sh
- スクリプトが実行されているマシンにワーカーインスタンスを起動します。sbin/start-connect-server.sh
- スクリプトが実行されているマシンに Spark Connect サーバを起動します。sbin/start-all.sh
- 上記のようにマスターと複数のワーカーを起動します。sbin/stop-master.sh
-sbin/start-master.sh
スクリプトで起動されたマスターを停止します。sbin/stop-worker.sh
- スクリプトが実行されているマシン上のすべてのワーカーインスタンスを停止します。sbin/stop-workers.sh
-conf/workers
ファイルに指定されているマシン上のすべてのワーカーインスタンスを停止します。sbin/stop-connect-server.sh
- スクリプトが実行されているマシン上のすべての Spark Connect サーバインスタンスを停止します。sbin/stop-all.sh
- 上記のようにマスターとワーカーを停止します。
これらのスクリプトは、ローカルマシンではなく、Spark マスターを実行するマシンで実行する必要があることに注意してください。
conf/spark-env.sh
で環境変数を設定することで、クラスタをさらに設定できます。conf/spark-env.sh.template
からこのファイルを作成し、設定を有効にするために *すべてのワーカーマシンにコピー* してください。次の設定を使用できます。
環境変数 | 意味 |
---|---|
SPARK_MASTER_HOST |
マスターを特定のホスト名またはIPアドレス(例:パブリックアドレス)にバインドします。 |
SPARK_MASTER_PORT |
マスターを別のポートで起動します(デフォルト:7077)。 |
SPARK_MASTER_WEBUI_PORT |
マスターWeb UI のポート(デフォルト:8080)。 |
SPARK_MASTER_OPTS |
"-Dx=y" の形式でマスターのみに適用される設定プロパティ(デフォルト:なし)。可能なオプションのリストについては、以下を参照してください。 |
SPARK_LOCAL_DIRS |
マップ出力ファイルやディスクに保存されるRDDなど、Sparkの「スクラッチ」スペースに使用するディレクトリ。これはシステムの高速なローカルディスクにある必要があります。異なるディスク上の複数のディレクトリのコンマ区切りリストにすることもできます。 |
SPARK_WORKER_CORES |
マシン上で Spark アプリケーションが使用できる合計コア数(デフォルト:使用可能なすべてのコア)。 |
SPARK_WORKER_MEMORY |
マシン上で Spark アプリケーションが使用できる合計メモリ量(例:1000m 、2g 、デフォルト:合計メモリから 1 GiB を差し引いた量)。各アプリケーションの *個々の* メモリは、その spark.executor.memory プロパティを使用して設定することに注意してください。 |
SPARK_WORKER_PORT |
Spark ワーカーを特定のポートで起動します(デフォルト:ランダム)。 |
SPARK_WORKER_WEBUI_PORT |
ワーカーWeb UI のポート(デフォルト:8081)。 |
SPARK_WORKER_DIR |
アプリケーションを実行するディレクトリ(ログとスクラッチスペースを含む、デフォルト:SPARK_HOME/work)。 |
SPARK_WORKER_OPTS |
"-Dx=y" の形式でワーカーのみに適用される設定プロパティ(デフォルト:なし)。可能なオプションのリストについては、以下を参照してください。 |
SPARK_DAEMON_MEMORY |
Spark マスターとワーカーデーモン自体に割り当てるメモリ(デフォルト:1g)。 |
SPARK_DAEMON_JAVA_OPTS |
"-Dx=y" の形式で Spark マスターとワーカーデーモン自体の JVM オプション(デフォルト:なし)。 |
SPARK_DAEMON_CLASSPATH |
Spark マスターとワーカーデーモン自体のクラスパス(デフォルト:なし)。 |
SPARK_PUBLIC_DNS |
Spark マスターとワーカーのパブリックDNS名(デフォルト:なし)。 |
**注記:** 起動スクリプトは現在、Windows をサポートしていません。Windows で Spark クラスタを実行するには、マスターとワーカーを手動で起動してください。
SPARK_MASTER_OPTS は次のシステムプロパティをサポートします。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.master.ui.port |
8080 |
マスターWeb UI エンドポイントのポート番号を指定します。 | 1.1.0 |
spark.master.ui.decommission.allow.mode |
LOCAL |
マスターWeb UI の /workers/kill エンドポイントの動作を指定します。可能な選択肢は次のとおりです。LOCAL は、マスターを実行しているマシンに対してローカルであるIPアドレスからのこのエンドポイントを許可することを意味し、DENY はこのエンドポイントを完全に無効にすることを意味し、ALLOW は任意のIPアドレスからのこのエンドポイントの呼び出しを許可することを意味します。 |
3.1.0 |
spark.master.rest.enabled |
false |
マスターREST APIエンドポイントを使用するかどうかを指定します。 | 1.3.0 |
spark.master.rest.port |
6066 |
マスターREST APIエンドポイントのポート番号を指定します。 | 1.3.0 |
spark.deploy.retainedApplications |
200 | 表示する完了済みアプリケーションの最大数。この制限を維持するために、古いアプリケーションはUIから削除されます。 |
0.8.0 |
spark.deploy.retainedDrivers |
200 | 表示する完了済みドライバの最大数。この制限を維持するために、古いドライバはUIから削除されます。 |
1.1.0 |
spark.deploy.spreadOut |
true | スタンドアロンクラスタマネージャがアプリケーションをノード全体に分散させるか、可能な限り少ないノードに統合しようとするかを決定します。分散は通常、HDFSでのデータ局所性には優れていますが、計算集約型のワークロードには統合の方が効率的です。 |
0.6.1 |
spark.deploy.defaultCores |
(無限) | spark.cores.max を設定しない場合、Sparkのスタンドアロンモードでアプリケーションに割り当てるコアのデフォルト数。設定されていない場合、アプリケーションは、自身でspark.cores.max を設定しない限り、常に使用可能なすべてのコアを取得します。共有クラスタでは、ユーザーがデフォルトでクラスタ全体を取得するのを防ぐために、この値を低く設定します。 |
0.9.0 |
spark.deploy.maxExecutorRetries |
10 | スタンドアロンクラスタマネージャが故障したアプリケーションを削除する前に発生できる、連続したexecutorエラーの最大数の制限。実行中のexecutorがある場合は、アプリケーションは決して削除されません。アプリケーションでspark.deploy.maxExecutorRetries を超える連続したエラーが発生し、それらのエラーの間にexecutorが正常に開始されず、アプリケーションに実行中のexecutorがない場合、スタンドアロンクラスタマネージャはアプリケーションを削除し、失敗としてマークします。この自動削除を無効にするには、spark.deploy.maxExecutorRetries を-1 に設定します。 |
1.6.3 |
spark.worker.timeout |
60 | ハートビートを受信しなかった場合、スタンドアロンデプロイマスターがワーカーを失われたと見なすまでの秒数。 | 0.6.2 |
spark.worker.resource.{name}.amount |
(なし) | ワーカーで使用する特定のリソースの量。 | 3.0.0 |
spark.worker.resource.{name}.discoveryScript |
(なし) | ワーカーの起動時に特定のリソースを見つけるために使用されるリソース検出スクリプトへのパス。スクリプトの出力は、ResourceInformation クラスのようにフォーマットする必要があります。 |
3.0.0 |
spark.worker.resourcesFile |
(なし) | ワーカーの起動時にさまざまなリソースを見つけるために使用されるリソースファイルへのパス。リソースファイルの内容は、[{"id":{"componentName": "spark.worker", "resourceName":"gpu"}, "addresses":["0","1","2"]}] のようにフォーマットする必要があります。特定のリソースがリソースファイルに見つからない場合、そのリソースを見つけるために検出スクリプトが使用されます。検出スクリプトでもリソースが見つからない場合、ワーカーは起動に失敗します。 |
3.0.0 |
SPARK_WORKER_OPTSは次のシステムプロパティをサポートしています。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.worker.cleanup.enabled |
false | ワーカー/アプリケーションディレクトリの定期的なクリーンアップを有効にします。これはスタンドアロンモードのみに影響し、YARNは異なる方法で動作することに注意してください。停止したアプリケーションのディレクトリのみがクリーンアップされます。spark.shuffle.service.db.enabled が"true"の場合、これを有効にする必要があります。 |
1.0.0 |
spark.worker.cleanup.interval |
1800(30分) | ワーカーがローカルマシン上の古いアプリケーション作業ディレクトリをクリーンアップする間隔を秒単位で制御します。 | 1.0.0 |
spark.worker.cleanup.appDataTtl |
604800(7日間、7 * 24 * 3600) | 各ワーカーでアプリケーション作業ディレクトリを保持する秒数。これは存続時間であり、使用可能なディスク容量によって異なります。アプリケーションログとjarは、各アプリケーション作業ディレクトリにダウンロードされます。時間の経過とともに、特にジョブを頻繁に実行する場合、作業ディレクトリはディスク容量を急速に消費する可能性があります。 | 1.0.0 |
spark.shuffle.service.db.enabled |
true | 外部シャッフルサービスの状態をローカルディスクに保存することで、外部シャッフルサービスの再起動時に、現在のexecutorに関する情報を自動的に再ロードします。これはスタンドアロンモードのみに影響します(yarnでは常にこの動作が有効になっています)。状態が最終的にクリーンアップされるように、spark.worker.cleanup.enabled も有効にする必要があります。この設定は将来削除される可能性があります。 |
3.0.0 |
spark.shuffle.service.db.backend |
LEVELDB | spark.shuffle.service.db.enabled がtrueの場合、ユーザーはこれを使用して、シャッフルサービス状態ストアで使用されるディスクベースのストアの種類を指定できます。現在、`LEVELDB`と`ROCKSDB`をサポートしており、デフォルト値は`LEVELDB`です。`LevelDB/RocksDB`の元のデータストアは、現在、他の種類のストレージに自動的に変換されません。 |
3.4.0 |
spark.storage.cleanupFilesAfterExecutorExit |
true | executorの終了後に、ワーカーディレクトリの非シャッフルファイル(一時シャッフルブロック、キャッシュされたRDD/ブロードキャストブロック、スピルファイルなど)のクリーンアップを有効にします。これは`spark.worker.cleanup.enabled`と重複しません。これは、停止したexecutorのローカルディレクトリ内の非シャッフルファイルのクリーンアップを有効にする一方、`spark.worker.cleanup.enabled`は停止し、タイムアウトしたアプリケーションのすべてのファイル/サブディレクトリのクリーンアップを有効にするためです。これはスタンドアロンモードのみに影響し、他のクラスタマネージャのサポートは将来追加される可能性があります。 | 2.4.0 |
spark.worker.ui.compressedLogFileLengthCacheSize |
100 | 圧縮ログファイルの場合、非圧縮ファイルはファイルの圧縮解除によってのみ計算できます。Sparkは圧縮ログファイルの非圧縮ファイルサイズをキャッシュします。このプロパティはキャッシュサイズを制御します。 | 2.0.2 |
リソース割り当てと設定の概要
設定ページのカスタムリソーススケジューリングと設定の概要セクションをお読みください。このセクションでは、リソーススケジューリングのSparkスタンドアロン固有の側面についてのみ説明しています。
Sparkスタンドアロンは2つの部分で構成されています。1つはワーカーのリソースを設定することであり、もう1つは特定のアプリケーションのリソース割り当てです。
ユーザーは、executorに割り当てることができるリソースセットをワーカーに設定する必要があります。spark.worker.resource.{resourceName}.amount
は、ワーカーが割り当てられた各リソースの量を制御するために使用されます。ユーザーは、spark.worker.resourcesFile
またはspark.worker.resource.{resourceName}.discoveryScript
のいずれかを指定して、ワーカーが割り当てられたリソースの検出方法を指定する必要があります。上記の各説明を参照して、セットアップに最適な方法を確認してください。
2番目の部分は、Sparkスタンドアロンでアプリケーションを実行することです。標準のSparkリソース設定からの唯一の特殊なケースは、クライアントモードでドライバを実行する場合です。クライアントモードのドライバの場合、ユーザーはspark.driver.resourcesFile
またはspark.driver.resource.{resourceName}.discoveryScript
を使用して、使用するリソースを指定できます。ドライバが他のドライバと同じホストで実行されている場合は、リソースファイルまたは検出スクリプトが、同じノードで実行されている他のドライバと競合しないリソースのみを返すようにしてください。
ワーカーは割り当てられたリソースで各executorを開始するため、アプリケーションの送信時に検出スクリプトを指定する必要はありません。
アプリケーションのクラスタへの接続
Sparkクラスタでアプリケーションを実行するには、マスターのspark://IP:PORT
URLをSparkContext
コンストラクタに渡すだけです。
クラスタに対して対話型のSparkシェルを実行するには、次のコマンドを実行します。
./bin/spark-shell --master spark://IP:PORT
オプション--total-executor-cores <numCores>
を渡して、spark-shellがクラスタで使用するコア数を制御することもできます。
クライアントプロパティ
Sparkアプリケーションは、スタンドアロンモードに固有の次の設定プロパティをサポートしています。
プロパティ名 | デフォルト値 | 意味 | バージョン以降 |
---|---|---|---|
spark.standalone.submit.waitAppCompletion |
false |
スタンドアロンクラスタモードでは、アプリケーションが完了するまでクライアントが終了を待つかどうかを制御します。true に設定されている場合、クライアントプロセスはドライバの状態をポーリングしながら存続します。それ以外の場合は、クライアントプロセスは送信後に終了します。 |
3.1.0 |
Spark アプリケーションの起動
Spark プロトコル
spark-submit
スクリプトは、コンパイル済みのSparkアプリケーションをクラスタに送信する最も簡単な方法を提供します。スタンドアロンクラスタの場合、Sparkは現在、2つのデプロイモードをサポートしています。client
モードでは、ドライバはアプリケーションを送信するクライアントと同じプロセスで起動されます。しかし、cluster
モードでは、ドライバはクラスタ内のワーカープロセスの1つから起動され、クライアントプロセスはアプリケーションの完了を待たずにアプリケーションの送信という役割を果たすとすぐに終了します。
アプリケーションがSpark submitを通じて起動された場合、アプリケーションjarは自動的にすべてのワーカーノードに配布されます。アプリケーションが依存する追加のjarについては、コンマを区切り文字として使用して--jars
フラグを介して指定する必要があります(例:--jars jar1,jar2
)。アプリケーションの設定または実行環境を制御するには、Spark設定を参照してください。
さらに、スタンドアロンcluster
モードは、ゼロ以外の終了コードで終了した場合にアプリケーションを自動的に再起動できます。この機能を使用するには、アプリケーションの起動時に--supervise
フラグをspark-submit
に渡すことができます。その後、繰り返し失敗しているアプリケーションを強制終了したい場合は、次のように実行できます。
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
http://<master url>:8080
のスタンドアロンマスターWeb UIでドライバIDを確認できます。
REST API
spark.master.rest.enabled
が有効になっている場合、Sparkマスターはhttp://[host:port]/[version]/submissions/[action]
を介して追加のREST APIを提供します。ここで、host
はマスターホスト、port
はspark.master.rest.port
(デフォルト:6066)で指定されたポート番号、version
はプロトコルバージョン(現時点ではv1
)、action
は次のサポートされているアクションの1つです。
コマンド | 説明 | HTTPメソッド | バージョン以降 |
---|---|---|---|
create |
cluster モードを介してSparkドライバを作成します。 |
POST | 1.3.0 |
kill |
単一のSparkドライバを強制終了します。 | POST | 1.3.0 |
status |
Sparkジョブの状態を確認します。 | GET | 1.3.0 |
pi.py
とREST APIを使用したcurl
CLIコマンドの例を次に示します。
$ curl -XPOST http://IP:PORT/v1/submissions/create \
--header "Content-Type:application/json;charset=UTF-8" \
--data '{
"appResource": "",
"sparkProperties": {
"spark.master": "spark://master:7077",
"spark.app.name": "Spark Pi",
"spark.driver.memory": "1g",
"spark.driver.cores": "1",
"spark.jars": ""
},
"clientSparkVersion": "",
"mainClass": "org.apache.spark.deploy.SparkSubmit",
"environmentVariables": { },
"action": "CreateSubmissionRequest",
"appArgs": [ "/opt/spark/examples/src/main/python/pi.py", "10" ]
}'
上記のcreate
リクエストに対するREST APIからの応答を次に示します。
{
"action" : "CreateSubmissionResponse",
"message" : "Driver successfully submitted as driver-20231124153531-0000",
"serverSparkVersion" : "3.5.1",
"submissionId" : "driver-20231124153531-0000",
"success" : true
}
リソーススケジューリング
スタンドアロンクラスタモードは現在、アプリケーション間でシンプルなFIFOスケジューラのみをサポートしています。ただし、複数の同時ユーザーを許可するには、各アプリケーションが使用するリソースの最大数を制御できます。デフォルトでは、クラスタ内の*すべての*コアを取得します。これは、一度に1つのアプリケーションのみを実行する場合にのみ意味があります。SparkConfでspark.cores.max
を設定することにより、コア数を制限できます。例:
val conf = new SparkConf()
.setMaster(...)
.setAppName(...)
.set("spark.cores.max", "10")
val sc = new SparkContext(conf)
さらに、クラスタマスタープロセスでspark.deploy.defaultCores
を設定して、spark.cores.max
を設定していないアプリケーションのデフォルトを無限未満に変更できます。conf/spark-env.sh
に以下を追加して実行します。
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"
これは、ユーザーが個別にコア数の最大値を設定していない共有クラスタで役立ちます。
エグゼキュータのスケジューリング
各エグゼキュータに割り当てられるコア数は構成可能です。spark.executor.cores
が明示的に設定されている場合、ワーカーに十分なコアとメモリがある場合、同じアプリケーションからの複数のエグゼキュータが同じワーカー上で起動される可能性があります。そうでない場合、各エグゼキュータはデフォルトでワーカーで使用可能なすべてのコアを取得するため、1回のスケジュール反復中に、アプリケーションごとに1つのエグゼキュータのみが各ワーカーで起動されます。
ステージレベルスケジューリングの概要
スタンドアロンでは、ステージレベルのスケジューリングがサポートされています。
- 動的リソース割り当てが無効になっている場合: ユーザーはステージレベルで異なるタスクリソース要件を指定でき、起動時に要求されたのと同じエグゼキュータを使用します。
- 動的リソース割り当てが有効になっている場合: 現在、マスターが1つのアプリケーションのエグゼキュータを割り当てると、複数のResourceProfileに対してResourceProfile IDの順序に基づいてスケジュールされます。IDが小さいResourceProfileが最初にスケジュールされます。通常、Sparkは1つのステージを完了してから次のステージを開始するため、これは問題になりませんが、ジョブサーバータイプのシナリオでは影響を与える可能性があるため、覚えておく必要があります。スケジューリングでは、組み込みのエグゼキュータリソースからエグゼキュータメモリとエグゼキュータコアリソースのみ、ResourceProfileからのその他のカスタムリソースのみを使用し、オフヒープやmemoryOverheadなどの他の組み込みのエグゼキュータリソースは効果がありません。基本のデフォルトプロファイルは、アプリケーションを送信するときにSpark構成に基づいて作成されます。基本のデフォルトプロファイルのエグゼキュータメモリとエグゼキュータコアリソースはカスタムResourceProfileに伝播できますが、他のカスタムリソースは伝播できません。
注意点
動的リソース割り当てで説明されているように、動的リソース割り当てが有効な状態で各エグゼキュータのコアが明示的に指定されていない場合、Sparkは予想よりもはるかに多くのエグゼキュータを取得する可能性があります。そのため、ステージレベルのスケジューリングを使用する場合は、各リソースプロファイルのエグゼキュータコアリソースを明示的に設定することをお勧めします。
モニタリングとロギング
Sparkのスタンドアロンモードは、クラスタを監視するためのWebベースのユーザーインターフェースを提供します。マスターと各ワーカーには、クラスタとジョブの統計情報を表示する独自のWeb UIがあります。デフォルトでは、ポート8080でマスターのWeb UIにアクセスできます。ポートは、構成ファイルまたはコマンドラインオプションのいずれかで変更できます。
さらに、各ジョブの詳細なログ出力も、各ワーカーノードの作業ディレクトリ(デフォルトではSPARK_HOME/work
)に書き込まれます。各ジョブには、コンソールに出力されたすべての出力を含むstdout
とstderr
という2つのファイルが表示されます。
Hadoop との同時実行
同じマシン上で別々のサービスとして起動するだけで、既存のHadoopクラスタとSparkを併用できます。SparkからHadoopデータにアクセスするには、hdfs:// URL(通常はhdfs://<namenode>:9000/path
ですが、Hadoop NameNodeのWeb UIで正しいURLを確認できます)を使用します。あるいは、Spark用に個別のクラスタを設定し、ネットワーク経由でHDFSにアクセスすることもできます。これはディスクローカルアクセスよりも遅くなりますが、同じローカルエリアネットワーク内(たとえば、HadoopがあるラックごとにいくつかのSparkマシンを配置する)で実行している場合は問題にならない可能性があります。
ネットワークセキュリティのためのポート設定
一般的に、Sparkクラスタとそのサービスはパブリックインターネット上に展開されません。通常はプライベートサービスであり、Sparkを展開する組織のネットワーク内からのみアクセス可能である必要があります。Sparkサービスで使用されるホストとポートへのアクセスは、サービスにアクセスする必要がある元のホストに制限する必要があります。
これは、他のリソースマネージャとは異なり、きめ細かいアクセス制御をサポートしていないスタンドアロンリソースマネージャを使用するクラスタにとって特に重要です。
構成する必要があるポートの完全なリストについては、セキュリティページを参照してください。
高可用性
デフォルトでは、スタンドアロンのスケジューリングクラスタはワーカーの障害に耐性があります(Spark自体が作業の損失に耐性があり、他のワーカーに移動することで耐性がある限り)。ただし、スケジューラはマスターを使用してスケジューリングの決定を行い、これにより(デフォルトで)単一障害点が作成されます。マスターがクラッシュすると、新しいアプリケーションを作成できなくなります。これを回避するために、以下に詳細を説明する2つの高可用性スキームがあります。
ZooKeeper を使用したスタンバイマスター
概要
リーダー選出と状態の保存にZooKeeperを使用することにより、同じZooKeeperインスタンスに接続されたクラスタに複数のマスターを起動できます。1つが「リーダー」に選出され、他のものはスタンバイモードになります。現在のリーダーが停止すると、別のマスターが選出され、古いマスターの状態が復元され、スケジューリングが再開されます。復旧プロセス全体(最初のリーダーが停止してから)には、1〜2分かかります。この遅延は、新しいアプリケーションのスケジューリングのみに影響します。マスターのフェイルオーバー中に既に実行されていたアプリケーションには影響ありません。
ZooKeeperの使用方法の詳細については、こちらをご覧ください。
設定
このリカバリモードを有効にするには、spark.deploy.recoveryMode
および関連するspark.deploy.zookeeper.*構成を構成することで、spark-envでSPARK_DAEMON_JAVA_OPTSを設定できます。これらの構成の詳細については、構成ドキュメントを参照してください。
考えられる問題点: クラスタに複数のマスターがあるのに、マスターがZooKeeperを使用するように正しく構成されていない場合、マスターはお互いを検出できず、すべてがリーダーであると考えてしまいます。これにより、(すべてのマスターが独立してスケジュールするため)正常なクラスタ状態になりません。
詳細
ZooKeeperクラスタを設定したら、高可用性を有効にするのは簡単です。同じZooKeeper構成(ZooKeeper URLとディレクトリ)を使用して、異なるノードで複数のマスタープロセスを起動するだけです。マスターはいつでも追加および削除できます。
新しいアプリケーションをスケジュールしたり、クラスタにワーカーを追加するには、現在のリーダーのIPアドレスを知る必要があります。これは、単一のマスターを渡していた場所に複数のマスターのリストを渡すだけで実現できます。たとえば、spark://host1:port1,host2:port2
を指すSparkContextを起動するとします。これにより、SparkContextは両方のマスターへの登録を試みます。host1
が停止した場合でも、新しいリーダーhost2
が見つかるため、この構成は依然として有効です。
「マスターへの登録」と通常の操作には重要な違いがあります。起動時に、アプリケーションまたはワーカーは現在のリードマスターを見つけて登録できる必要があります。ただし、正常に登録されると、「システム内」(つまり、ZooKeeperに保存されている)になります。フェイルオーバーが発生した場合、新しいリーダーは以前に登録されたすべてのアプリケーションとワーカーに連絡してリーダーシップの変化を通知するため、起動時に新しいマスターの存在を知る必要さえありません。
この特性により、新しいマスターはいつでも作成でき、心配する必要があるのは、新しいアプリケーションとワーカーがリーダーになった場合に登録できることです。登録されると、後は管理されます。
ローカルファイルシステムを使用したシングルノードリカバリ
概要
ZooKeeperは本番レベルの高可用性を実現するための最良の方法ですが、マスターが停止した場合に再起動するだけでよい場合は、FILESYSTEMモードを使用できます。アプリケーションとワーカーが登録されると、マスタープロセスの再起動時に復元できるだけの状態が指定されたディレクトリに書き込まれます。
設定
このリカバリモードを有効にするには、この構成を使用してspark-envでSPARK_DAEMON_JAVA_OPTSを設定できます。
システムプロパティ | 意味 | バージョン以降 |
---|---|---|
spark.deploy.recoveryMode |
単一ノードリカバリモードを有効にするにはFILESYSTEMに設定します(デフォルト:NONE)。 | 0.8.1 |
spark.deploy.recoveryDirectory |
Sparkがリカバリ状態を保存するディレクトリで、マスターの観点からアクセスできます。 | 0.8.1 |
詳細
- このソリューションは、monitなどのプロセス監視/マネージャーと組み合わせて使用するか、手動によるリカバリを再起動によって有効にするために使用できます。
- ファイルシステムリカバリは、リカバリを行わないよりもはるかに簡単ですが、このモードは特定の開発目的や実験目的には最適ではない可能性があります。特に、stop-master.shでマスターを強制終了してもリカバリ状態はクリーンアップされません。そのため、新しいマスターを起動するたびに、リカバリモードになります。これにより、以前に登録されたすべてのワーカー/クライアントのタイムアウトを待つ必要がある場合、起動時間が最大1分増加する可能性があります。
- 公式にはサポートされていませんが、NFSディレクトリをリカバリディレクトリとしてマウントできます。元のマスターノードが完全に停止した場合、別のノードでマスターを起動できます。これにより、以前に登録されたすべてのワーカー/アプリケーションが正しく復元されます(ZooKeeperリカバリと同等)。ただし、将来のアプリケーションは、登録するために新しいマスターを見つけることができる必要があります。