アプリケーションの送信
Spark の bin ディレクトリにある spark-submit スクリプトは、クラスター上でアプリケーションを起動するために使用されます。このスクリプトは、Spark がサポートするすべての クラスターマネージャー を統一されたインターフェースで利用できるため、各クラスターマネージャーごとにアプリケーションを特別に設定する必要はありません。
アプリケーションの依存関係のバンドル
コードが他のプロジェクトに依存している場合、Spark クラスターにコードを配布するために、それらをアプリケーションと一緒にパッケージ化する必要があります。これを行うには、コードとその依存関係を含むアセンブリ JAR(または「uber」JAR)を作成します。sbt と Maven の両方にアセンブリプラグインがあります。アセンブリ JAR を作成する際には、Spark と Hadoop を provided 依存関係としてリストしてください。これらは実行時にクラスターマネージャーによって提供されるため、バンドルする必要はありません。アセンブルされた JAR ができたら、ここで示すように bin/spark-submit スクリプトを呼び出し、JAR を渡します。
Python の場合、spark-submit の --py-files 引数を使用して、アプリケーションとともに配布される .py、.zip、または .egg ファイルを追加できます。複数の Python ファイルに依存している場合は、それらを .zip または .egg にパッケージ化することをお勧めします。サードパーティの Python 依存関係については、「Python パッケージ管理」を参照してください。
spark-submit を使用したアプリケーションの起動
ユーザーアプリケーションがバンドルされたら、bin/spark-submit スクリプトを使用して起動できます。このスクリプトは、Spark およびその依存関係のクラスパスを設定する処理を行い、Spark がサポートするさまざまなクラスターマネージャーとデプロイモードをサポートできます。
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]一般的に使用されるオプションの一部を以下に示します。
--class: アプリケーションのエントリーポイント(例:org.apache.spark.examples.SparkPi)--master: クラスターの マスター URL(例:spark://23.195.26.187:7077)--deploy-mode: ドライバーをワーカーノードにデプロイするか(cluster)、ローカルの外部クライアントとしてデプロイするか(client)(デフォルト:client) †--conf: キーと値の形式の任意の Spark 設定プロパティ。値にスペースが含まれる場合は、「key=value」を引用符で囲みます(上記参照)。複数の設定は、個別の引数として渡します。(例:--conf <key>=<value> --conf <key2>=<value2>)application-jar: アプリケーションとすべての依存関係を含むバンドルされた JAR へのパス。URL はクラスター内でグローバルに可視である必要があります。たとえば、hdfs://パスや、すべてのノードに存在するfile://パスなどです。application-arguments: メインクラスの main メソッドに渡される引数(存在する場合)
† 一般的なデプロイ戦略は、ワーカーマシンと物理的に共存するゲートウェイマシン(例: スタンドアロン EC2 クラスターのマスターノード)からアプリケーションを送信することです。このセットアップでは、client モードが適切です。client モードでは、ドライバーは spark-submit プロセス内で直接起動され、クラスターへのクライアントとして機能します。アプリケーションの入出力はコンソールにアタッチされます。したがって、このモードは REPL(例: Spark shell)を含むアプリケーションに特に適しています。
あるいは、アプリケーションがワーカーマシンから離れたマシン(例: ローカルのラップトップ)から送信される場合、ドライバーとエグゼキューター間のネットワーク遅延を最小限に抑えるために cluster モードを使用することが一般的です。現在、スタンドアロンモードでは Python アプリケーションのクラスターモードはサポートされていません。
Python アプリケーションの場合、`
使用されている クラスターマネージャー に固有のオプションがいくつかあります。たとえば、Spark スタンドアロンクラスターで cluster デプロイモードを使用する場合、ドライバがゼロ以外の終了コードで失敗した場合に自動的に再起動されるように --supervise を指定することもできます。spark-submit で利用可能なすべてのオプションを列挙するには、--help を付けて実行します。一般的なオプションの例をいくつか示します。
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master "local[8]" \
/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a YARN cluster in cluster deploy mode
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master k8s://xx.yy.zz.ww:443 \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
http://path/to/examples.jar \
1000マスター URL
Spark に渡されるマスター URL は、次のいずれかの形式にすることができます。
| マスター URL | 意味 |
|---|---|
local | Spark を 1 つのワーカー スレッドでローカルに実行します(つまり、並列処理はまったく行われません)。 |
local[K] | Spark を K 個のワーカー スレッドでローカルに実行します(理想的には、マシン上のコア数に設定します)。 |
local[K,F] | Spark を K 個のワーカー スレッドと F 個の最大失敗回数でローカルに実行します(この変数の説明については、spark.task.maxFailures を参照してください)。 |
local[*] | Spark を、マシン上の論理コア数と同じ数のワーカー スレッドでローカルに実行します。 |
local[*,F] | Spark を、マシン上の論理コア数と同じ数のワーカー スレッドと F 個の最大失敗回数でローカルに実行します。 |
local-cluster[N,C,M] | ローカルクラスターモードは単体テスト用です。単一の JVM 内で N 個のワーカー、ワーカーあたり C 個のコア、およびワーカーあたり M MiB のメモリをエミュレートして、分散クラスターをシミュレートします。 |
spark://HOST:PORT | 指定された Spark スタンドアロンクラスター マスターに接続します。ポートは、マスターが使用するように構成されているポートである必要があります。デフォルトは 7077 です。 |
spark://HOST1:PORT1,HOST2:PORT2 | 指定された ZooKeeper を使用したスタンバイマスターを持つ Spark スタンドアロンクラスターに接続します。リストには、ZooKeeper で設定された高可用性クラスター内のすべてのマスターホストが含まれている必要があります。ポートは、各マスターが使用するように構成されているポートである必要があります。デフォルトは 7077 です。 |
yarn | client または cluster モードで YARN クラスターに接続します(--deploy-mode の値によります)。クラスターの場所は、HADOOP_CONF_DIR または YARN_CONF_DIR 変数に基づいて見つかります。 |
k8s://HOST:PORT | client または cluster モードで Kubernetes クラスターに接続します(--deploy-mode の値によります)。HOST と PORT は Kubernetes API サーバー を参照します。デフォルトでは TLS を使用して接続します。安全でない接続を強制するには、k8s://http://HOST:PORT を使用できます。 |
ファイルからの設定の読み込み
spark-submit スクリプトは、プロパティファイルからデフォルトの Spark 設定値を読み込み、それらをアプリケーションに渡すことができます。ファイルは --properties-file パラメーターで指定できます。指定しない場合、デフォルトで Spark は SPARK_HOME ディレクトリの conf/spark-defaults.conf からオプションを読み取ります。
追加のフラグ --load-spark-defaults を使用して、プロパティファイルが --properties-file 経由で提供されている場合でも、Spark に conf/spark-defaults.conf から設定を読み込むように指示できます。これは、たとえば、ユーザーが前者でシステム全体の設定を、後者でユーザー/クラスター固有の設定を行いたい場合に便利です。
このようにデフォルトの Spark 設定を読み込むことで、spark-submit の一部のフラグが不要になる場合があります。たとえば、spark.master プロパティが設定されている場合、spark-submit から --master フラグを安全に省略できます。一般的に、SparkConf で明示的に設定された設定値が最も優先され、次に spark-submit に渡されたフラグ、その次にデフォルトファイルの値が優先されます。
設定オプションの出所が不明な場合は、--verbose オプションを付けて spark-submit を実行することで、詳細なデバッグ情報を表示できます。
高度な依存関係管理
spark-submit を使用する場合、アプリケーション JAR は、--jars オプションで含められた JAR とともに、自動的にクラスターに転送されます。--jars の後の URL はカンマで区切る必要があります。このリストは、ドライバーとエグゼキューターのクラスパスに含まれます。ディレクトリ展開は --jars では機能しません。
Spark は、JAR を配布するためのさまざまな戦略を可能にするために、次の URL スキームを使用します。
- file: - 絶対パスおよび
file:/URI は、ドライバーの HTTP ファイルサーバーによって提供され、各エグゼキューターはドライバー HTTP サーバーからファイルを取得します。 - hdfs:, http:, https:, ftp: - これらは URI からファイルをダウンロードして JAR にします。
- local: - local:/ で始まる URI は、各ワーカーノードにローカルファイルとして存在することが期待されます。これは、ネットワーク I/O が発生しないことを意味し、各ワーカーにプッシュされた、または NFS、GlusterFS などで共有された大きなファイル/JAR に適しています。
JAR およびファイルは、エグゼキューターノードの各 SparkContext の作業ディレクトリにコピーされることに注意してください。これにより、時間が経つにつれてかなりのスペースが使用される可能性があり、クリーンアップが必要になります。YARN では、クリーンアップは自動的に処理され、Spark スタンドアロンでは、spark.worker.cleanup.appDataTtl プロパティで自動クリーンアップを設定できます。
ユーザーは、--packages を使用して、Maven 座標のカンマ区切りリストを指定することで、他の依存関係を含めることもできます。このコマンドを使用すると、すべての推移的な依存関係が処理されます。追加のリポジトリ(または SBT のリゾルバ)は、--repositories フラグを使用してカンマ区切りで追加できます。(パスワードで保護されたリポジトリの認証情報は、リポジトリ URI で、たとえば https://user:password@host/... のように指定できる場合があります。この方法で認証情報を指定する場合は注意してください。)これらのコマンドは、pyspark、spark-shell、および spark-submit で Spark Packages を含めるために使用できます。
Python の場合、同等の --py-files オプションを使用して、`.egg`、`.zip`、および `.py` ライブラリをエグゼキューターに配布できます。
詳細情報
アプリケーションをデプロイしたら、クラスターモードの概要で、分散実行に関与するコンポーネント、およびアプリケーションの監視とデバッグ方法について説明します。