アプリケーションの送信
Sparkのbin
ディレクトリにあるspark-submit
スクリプトは、クラスタ上でアプリケーションを起動するために使用されます。これは、統一されたインターフェースを通じてSparkがサポートするすべてのクラスタマネージャを使用できるため、アプリケーションを個別に設定する必要はありません。
アプリケーションの依存関係のバンドル
コードが他のプロジェクトに依存している場合、Sparkクラスタにコードを配布するために、アプリケーションと一緒にそれらをパッケージ化する必要があります。これを行うには、コードとその依存関係を含むアセンブリjar(または「uber」jar)を作成します。sbtとMavenの両方には、アセンブリプラグインがあります。アセンブリjarを作成する際には、SparkとHadoopをprovided
依存関係としてリストします。これらは、実行時にクラスタマネージャによって提供されるため、バンドルする必要はありません。アセンブリjarが作成されたら、ここで示すようにjarを渡しながらbin/spark-submit
スクリプトを呼び出すことができます。
Pythonの場合、spark-submit
の--py-files
引数を使用して、.py
、.zip
、または.egg
ファイルをアプリケーションと共に配布に追加できます。複数のPythonファイルに依存する場合は、それらを.zip
または.egg
にパッケージすることをお勧めします。サードパーティのPython依存関係については、Pythonパッケージ管理を参照してください。
spark-submitによるアプリケーションの起動
ユーザアプリケーションがバンドルされると、bin/spark-submit
スクリプトを使用して起動できます。このスクリプトは、Sparkとその依存関係でクラスパスを設定し、Sparkがサポートするさまざまなクラスタマネージャとデプロイモードをサポートします。
./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
一般的に使用されるオプションの一部を以下に示します。
--class
: アプリケーションのエントリポイント(例:org.apache.spark.examples.SparkPi
)--master
: クラスタのマスターURL(例:spark://23.195.26.187:7077
)--deploy-mode
: ワーカーノード(cluster
)にドライバをデプロイするか、外部クライアント(client
)としてローカルにデプロイするか (デフォルト:client
) †--conf
: key=value形式の任意のSpark設定プロパティ。スペースを含む値は、「key=value」を引用符で囲みます(例のとおり)。複数の設定は、個別の引数として渡す必要があります。(例:--conf <key>=<value> --conf <key2>=<value2>
)application-jar
: アプリケーションとすべての依存関係を含むバンドルされたjarへのパス。このURLは、クラスタ内でグローバルに表示可能である必要があります。たとえば、hdfs://
パスや、すべてのノードに存在するfile://
パスなどです。application-arguments
: メインクラスのmainメソッドに渡される引数(存在する場合)
†一般的なデプロイメント戦略は、ワーカーマシンと物理的に同じ場所に配置されたゲートウェイマシン(例:スタンドアロンEC2クラスタのマスターノード)からアプリケーションを送信することです。この設定では、client
モードが適切です。client
モードでは、ドライバはspark-submit
プロセスの内部で直接起動され、クラスタへのクライアントとして機能します。アプリケーションの入出力はコンソールに接続されます。したがって、このモードは、REPL(例:Sparkシェル)を含むアプリケーションに特に適しています。
あるいは、アプリケーションがワーカーマシンから離れたマシン(例:ラップトップ上ローカル)から送信される場合、ドライバとエグゼキュータ間のネットワーク遅延を最小限に抑えるためにcluster
モードを使用するのが一般的です。現在、スタンドアロンモードはPythonアプリケーションのクラスタモードをサポートしていません。
Pythonアプリケーションの場合、<application-jar>
の代わりに.py
ファイルを単純に渡し、--py-files
を使用してPython .zip
、.egg
、または.py
ファイルを検索パスに追加します。
使用されているクラスタマネージャに固有のオプションがいくつかあります。たとえば、cluster
デプロイモードのSparkスタンドアロンクラスタでは、--supervise
を指定して、ゼロ以外の終了コードで失敗した場合にドライバが自動的に再起動されるようにすることもできます。spark-submit
で使用できるそのようなすべてのオプションを列挙するには、--help
を使用して実行します。一般的なオプションの例をいくつか示します。
# Run application locally on 8 cores
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a Spark standalone cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
# Run on a YARN cluster in cluster deploy mode
export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
# Run a Python application on a Spark standalone cluster
./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000
# Run on a Mesos cluster in cluster deploy mode with supervise
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
http://path/to/examples.jar \
1000
# Run on a Kubernetes cluster in cluster deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master k8s://xx.yy.zz.ww:443 \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
http://path/to/examples.jar \
1000
マスターURL
Sparkに渡されるマスターURLは、次の形式のいずれかになります。
マスターURL | 意味 |
---|---|
local | 1つのワーカスレッドを使用してローカルでSparkを実行します(つまり、まったく並列処理を行いません)。 |
local[K] | K個のワーカスレッドを使用してローカルでSparkを実行します(理想的には、マシンのコア数に設定します)。 |
local[K,F] | K個のワーカスレッドとF個のmaxFailuresを使用してローカルでSparkを実行します(この変数の説明についてはspark.task.maxFailuresを参照してください)。 |
local[*] | マシンの論理コア数と同じ数のワーカスレッドを使用してローカルでSparkを実行します。 |
local[*,F] | マシンの論理コア数と同じ数のワーカスレッドとF個のmaxFailuresを使用してローカルでSparkを実行します。 |
local-cluster[N,C,M] | ローカルクラスタモードは、単体テストのみに使用されます。単一のJVMでN個のワーカー、ワーカーあたりC個のコア、ワーカーあたりM MiBのメモリを持つ分散クラスタをエミュレートします。 |
spark://HOST:PORT | 指定されたSparkスタンドアロンクラスタマスターに接続します。ポートは、マスターが使用するように設定されているポートである必要があります(デフォルトは7077)。 |
spark://HOST1:PORT1,HOST2:PORT2 | Zookeeperを使用したスタンバイマスターを含むSparkスタンドアロンクラスタに接続します。このリストには、Zookeeperで設定された高可用性クラスタのすべてのマスターホストを含める必要があります。ポートは、各マスターが使用するように設定されているポートである必要があります(デフォルトは7077)。 |
mesos://HOST:PORT | 指定されたMesosクラスタに接続します。ポートは、使用するように設定されているポートである必要があります(デフォルトは5050)。または、ZooKeeperを使用するMesosクラスタの場合は、mesos://zk://... を使用します。--deploy-mode cluster で送信するには、HOST:PORTをMesosClusterDispatcherに接続するように設定する必要があります。 |
yarn | --deploy-mode の値に応じて、client モードまたはcluster モードでYARNクラスタに接続します。クラスタの場所は、HADOOP_CONF_DIR またはYARN_CONF_DIR 変数に基づいて検出されます。 |
k8s://HOST:PORT | --deploy-mode の値に応じて、client モードまたはcluster モードでKubernetesクラスタに接続します。HOST とPORT はKubernetes APIサーバーを参照します。デフォルトではTLSを使用して接続します。安全でない接続を強制するには、k8s://http://HOST:PORT を使用できます。 |
ファイルからの設定の読み込み
spark-submit
スクリプトは、プロパティファイルからデフォルトのSpark設定値を読み込み、アプリケーションに渡すことができます。デフォルトでは、Sparkディレクトリのconf/spark-defaults.conf
からオプションを読み取ります。詳細については、デフォルト設定の読み込みに関するセクションを参照してください。
このようにしてデフォルトのSpark設定を読み込むと、spark-submit
への特定のフラグの必要性をなくすことができます。spark.master
プロパティが設定されている場合、spark-submit
から--master
フラグを省略できます。一般的に、SparkConf
で明示的に設定された設定値が最も優先され、次にspark-submit
に渡されたフラグ、次にデフォルトファイルの値が続きます。
設定オプションの出所が不明な場合は、--verbose
オプションを使用してspark-submit
を実行することで、詳細なデバッグ情報を表示できます。
高度な依存関係管理
spark-submit
を使用する場合、アプリケーションのJARファイルと--jars
オプションで指定されたJARファイルは、自動的にクラスタに転送されます。--jars
の後に指定するURLは、コンマで区切る必要があります。このリストは、ドライバとエグゼキュータのクラスパスに含まれます。--jars
では、ディレクトリの展開は機能しません。
Sparkは、JARファイルの配布に様々な戦略を可能にするために、以下のURLスキームを使用します。
- file: - 絶対パスと
file:/
URIは、ドライバのHTTPファイルサーバによって提供され、各エグゼキュータはドライバのHTTPサーバからファイルを取得します。 - hdfs:、http:、https:、ftp: - これらは、期待通りにURIからファイルとJARファイルをダウンロードします。
- local: - local:/で始まるURIは、各ワーカーノード上にローカルファイルとして存在することが期待されます。これは、ネットワークI/Oが発生せず、各ワーカーにプッシュされるか、NFS、GlusterFSなどで共有される大規模なファイル/JARファイルに適しています。
JARファイルとファイルは、エグゼキュータノード上の各SparkContextの作業ディレクトリにコピーされることに注意してください。これは、時間の経過とともにかなりの容量を消費し、クリーンアップする必要があります。YARNではクリーンアップは自動的に処理され、Sparkスタンドアロンモードでは、spark.worker.cleanup.appDataTtl
プロパティを使用して自動クリーンアップを構成できます。
ユーザーは、--packages
にコンマ区切りのMaven座標リストを提供することで、他の依存関係を含めることもできます。このコマンドを使用すると、すべての推移的依存関係が処理されます。追加のリポジトリ(またはSBTのリゾルバ)は、--repositories
フラグを使用してコンマ区切りで追加できます。(パスワードで保護されたリポジトリの資格情報は、https://user:password@host/...
のように、場合によってはリポジトリURIに提供できます。この方法で資格情報を提供する際には注意してください。)これらのコマンドは、pyspark
、spark-shell
、spark-submit
で使用して、Sparkパッケージを含めることができます。
Pythonでは、同等の--py-files
オプションを使用して、.egg
、.zip
、.py
ライブラリをエグゼキュータに配布できます。
詳細情報
アプリケーションをデプロイしたら、「クラスタモードの概要」で、分散実行に関与するコンポーネントと、アプリケーションの監視およびデバッグ方法について説明しています。