ジョブスケジューリング
概要
Spark は、計算間でリソースをスケジューリングするためのいくつかの機能を備えています。まず、クラスタモードの概要で説明されているように、各 Spark アプリケーション (SparkContext のインスタンス) は、独立したエクゼキュータプロセスセットを実行します。Spark が実行されるクラスタマネージャは、アプリケーション間のスケジューリングのための機能を提供します。次に、各 Spark アプリケーション*内*では、複数の「ジョブ」 (Spark アクション) が、異なるスレッドによって送信された場合、同時に実行される可能性があります。これは、アプリケーションがネットワーク経由でリクエストを処理している場合に一般的です。Spark には、各 SparkContext 内でリソースをスケジューリングするための Fair スケジューラ が含まれています。
アプリケーション間のスケジューリング
クラスタ上で実行する場合、各 Spark アプリケーションは、そのアプリケーションのタスクの実行とデータの格納のみを行う、独立したエクゼキュータ JVM のセットを取得します。複数のユーザーがクラスタを共有する必要がある場合、クラスタマネージャに応じて、割り当てを管理するためのさまざまなオプションがあります。
すべてのクラスタマネージャで使用可能な最も簡単なオプションは、リソースの*静的パーティション分割*です。このアプローチでは、各アプリケーションは使用できるリソースの最大量を与えられ、その期間中はそれを保持します。これは、Spark の スタンドアロンモードと YARN モード、および 粗粒度 Mesos モードで使用されるアプローチです。リソース割り当ては、クラスタタイプに基づいて次のように設定できます。
- **スタンドアロンモード:** デフォルトでは、スタンドアロンモードクラスタに送信されたアプリケーションは FIFO (先入れ先出し) 順序で実行され、各アプリケーションは利用可能なすべてのノードを使用しようとします。アプリケーションが使用するノード数を制限するには、
spark.cores.max
設定プロパティを設定するか、この設定を行わないアプリケーションのデフォルトをspark.deploy.defaultCores
で変更します。最後に、コアの制御に加えて、各アプリケーションのspark.executor.memory
設定は、そのメモリ使用量を制御します。 - **Mesos:** Mesos で静的パーティション分割を使用するには、
spark.mesos.coarse
設定プロパティをtrue
に設定し、オプションでspark.cores.max
を設定して、スタンドアロンモードと同様に各アプリケーションのリソース共有を制限します。また、spark.executor.memory
を設定して、エクゼキュータメモリを制御する必要があります。 - **YARN:** Spark YARN クライアントの
--num-executors
オプションは、クラスタに割り当てるエクゼキュータの数 (spark.executor.instances
設定プロパティ) を制御し、--executor-memory
(spark.executor.memory
設定プロパティ) と--executor-cores
(spark.executor.cores
設定プロパティ) は、エクゼキュータあたりのリソースを制御します。詳細については、YARN Spark プロパティ を参照してください。
Mesos で使用可能な 2 番目のオプションは、CPU コアの*動的共有*です。このモードでは、各 Spark アプリケーションは依然として固定された独立したメモリ割り当て ( spark.executor.memory
で設定) を持ちますが、アプリケーションがマシン上でタスクを実行していない場合、他のアプリケーションがそれらのコア上でタスクを実行できます。このモードは、個別のユーザーからのシェルセッションなど、それほどアクティブではない多数のアプリケーションが予想される場合に役立ちます。ただし、アプリケーションが作業を行う際にノードのコアを取り戻すのに時間がかかる可能性があるため、予測可能性の低いレイテンシのリスクが伴います。このモードを使用するには、単に mesos://
URL を使用し、spark.mesos.coarse
を false に設定します。
現在のところ、どのモードもアプリケーション間でメモリ共有を提供していないことに注意してください。このようにデータを共有したい場合は、同じ RDD をクエリすることによって複数のリクエストを処理できる単一のサーバーアプリケーションを実行することをお勧めします。
動的リソース割り当て
Spark は、ワークロードに基づいてアプリケーションが占有するリソースを動的に調整するメカニズムを提供します。これは、アプリケーションが使用されなくなったリソースをクラスタに返却し、後で需要があるときに再びリクエストできることを意味します。この機能は、複数のアプリケーションが Spark クラスタのリソースを共有する場合に特に役立ちます。
この機能はデフォルトで無効になっており、すべての粗粒度クラスタマネージャ、つまり スタンドアロンモード、YARN モード、Mesos 粗粒度モード、および K8s モード で使用できます。
注意事項
- スタンドアロンモードでは、
spark.executor.cores
を明示的に設定しないと、各エクゼキュータはワーカーの利用可能なすべてのコアを取得します。この場合、動的割り当てが有効になっていると、spark は予期したよりもはるかに多くのエクゼキュータを取得する可能性があります。スタンドアロンモードで動的割り当てを使用する場合は、問題 SPARK-30299 が修正される前に、各エクゼキュータのコアを明示的に設定することをお勧めします。
設定とセットアップ
この機能を使用するには、いくつかの方法があります。どのアプローチを選択する場合でも、アプリケーションは最初に spark.dynamicAllocation.enabled
を true
に設定する必要があります。さらに、
- 同じクラスタ内の各ワーカーノードで*外部シャッフルサービス*を設定した後、アプリケーションは
spark.shuffle.service.enabled
をtrue
に設定する必要があります。または、 - アプリケーションは
spark.dynamicAllocation.shuffleTracking.enabled
をtrue
に設定する必要があります。または、 - アプリケーションは
spark.decommission.enabled
とspark.storage.decommission.shuffleBlocks.enabled
の両方をtrue
に設定する必要があります。または、 - アプリケーションは、信頼性の高いストレージをサポートするカスタム
ShuffleDataIO
であるShuffleDriverComponents
を使用するために、spark.shuffle.sort.io.plugin.class
を設定する必要があります。
外部シャッフルサービスまたはシャッフルトラッキングまたは信頼性の高いストレージをサポートする ShuffleDriverComponents
の目的は、それらによって書き込まれたシャッフルファイルを削除せずにエクゼキュータを削除できるようにすることです (詳細は以下で説明します)。シャッフルトラッキングを有効にするのは簡単ですが、外部シャッフルサービスを設定する方法は、クラスタマネージャによって異なります。
スタンドアロンモードでは、spark.shuffle.service.enabled
を true
に設定してワーカーを起動するだけです。
Mesos 粗粒度モードでは、spark.shuffle.service.enabled
を true
に設定して、すべてのワーカーノードで $SPARK_HOME/sbin/start-mesos-shuffle-service.sh
を実行します。たとえば、Marathon を介して実行できます。
YARN モードでは、こちらの指示に従ってください。
その他すべての関連設定はオプションであり、spark.dynamicAllocation.*
および spark.shuffle.service.*
名前空間の下にあります。詳細については、設定ページ を参照してください。
リソース割り当てポリシー
高度なレベルでは、Spark は使用されなくなったエクゼキュータを解放し、必要なときにエクゼキュータを取得する必要があります。削除されようとしているエクゼキュータが近い将来にタスクを実行するかどうか、または追加されようとしている新しいエクゼキュータが実際にアイドル状態になるかどうかを予測する決定的な方法がないため、いつエクゼキュータを削除およびリクエストするかを決定するための一連のヒューリスティックが必要です。
リクエストポリシー
動的割り当てが有効になっている Spark アプリケーションは、スケジュールされるのを待機している保留中のタスクがある場合、追加のエクゼキュータをリクエストします。この条件は、必然的に、既存のエクゼキュータセットが、送信済みだがまだ完了していないすべてのタスクを同時に飽和させるのに不十分であることを意味します。
Spark は、ラウンド単位でエクゼキュータをリクエストします。実際のリクエストは、spark.dynamicAllocation.schedulerBacklogTimeout
秒 동안 보류 중인 작업이 있었을 때 트리거되고, 그 이후 보류 중인 작업 대기열이 지속되면 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
초마다 다시 트리거됩니다. 또한 각 라운드에서 요청되는 실행기의 수는 이전 라운드에서 기하급수적으로 증가합니다. 예를 들어 애플리케이션은 첫 번째 라운드에서 실행기를 1개 추가한 다음 후속 라운드에서 2, 4, 8 등의 실행기를 추가합니다.
指数関数的増加ポリシーの動機は 2 つあります。まず、アプリケーションは、少数の追加のエクゼキュータで十分であることが判明した場合に備えて、最初にエクゼキュータを慎重にリクエストする必要があります。これは、TCP スロースタートの正当化と一致しています。次に、多くのエクゼキュータが実際に必要であることが判明した場合、アプリケーションはリソースの使用量を適時に増やすことができなければなりません。
削除ポリシー
エクゼキュータを削除するためのポリシーははるかに簡単です。Spark アプリケーションは、spark.dynamicAllocation.executorIdleTimeout
秒以上アイドル状態になっているエクゼキュータを削除します。ほとんどの場合、スケジュールされる保留中のタスクがまだある場合、エクゼキュータはアイドル状態にならないため、この条件はリクエスト条件と相互に排他的であることに注意してください。
エクゼキュータのグレースフルデコミッション
動的割り当ての前に、関連付けられたアプリケーションも終了したときに Spark エクゼキュータが終了した場合、エクゼキュータに関連付けられたすべての状態は不要になり、安全に破棄できます。ただし、動的割り当てでは、エクゼキュータが明示的に削除されたときにアプリケーションはまだ実行されています。アプリケーションがエクゼキュータに格納されている状態またはエクゼキュータによって書き込まれた状態にアクセスしようとすると、状態の再計算を実行する必要があります。したがって、Spark は、エクゼキュータを削除する前にその状態を保持することによって、エクゼキュータを正常に廃止するメカニズムを必要とします。
この要件は、シャッフルにおいて特に重要です。シャッフル中、Sparkエグゼキュータはまず独自のマップ出力をローカルディスクに書き込み、その後、他のエグゼキュータがそれらのファイルを取得しようとするときに、それらのファイルのサーバーとして機能します。ピアよりもはるかに長く実行されるタスクであるストラグラーが発生した場合、動的割り当てによってシャッフルが完了する前にエグゼキュータが削除される可能性があり、その場合、そのエグゼキュータによって書き込まれたシャッフルファイルは不必要に再計算する必要があります。
シャッフルファイルを保持するためのソリューションは、Spark 1.2で導入された外部シャッフルサービスを使用することです。このサービスは、Sparkアプリケーションとそのエグゼキュータとは独立して、クラスタの各ノードで実行される長時間実行プロセスを指します。サービスが有効になっている場合、Sparkエグゼキュータは互いにではなく、サービスからシャッフルファイルを取得します。これは、エグゼキュータによって書き込まれたシャッフル状態が、エグゼキュータの有効期間を超えても引き続き提供される可能性があることを意味します。
シャッフルファイルの書き込みに加えて、エグゼキュータはディスクまたはメモリにデータをキャッシュします。ただし、エグゼキュータが削除されると、キャッシュされたすべてのデータにアクセスできなくなります。これを軽減するために、デフォルトでは、キャッシュされたデータを含むエグゼキュータは削除されません。この動作は、spark.dynamicAllocation.cachedExecutorIdleTimeout
で設定できます。spark.shuffle.service.fetch.rdd.enabled
を true
に設定すると、Sparkはディスクに永続化されたRDDブロックの取得にExternalShuffleServiceを使用できます。動的割り当ての場合、この機能が有効になっていると、ディスクに永続化されたブロックのみを持つエグゼキュータは、spark.dynamicAllocation.executorIdleTimeout
後にアイドル状態と見なされ、それに応じて解放されます。将来のリリースでは、外部シャッフルサービスを介してシャッフルファイルが保持されるのと同様の精神で、オフヒープストレージを介してキャッシュされたデータが保持される可能性があります。
アプリケーション内スケジューリング
特定のSparkアプリケーション(SparkContextインスタンス)内では、別々のスレッドから送信された場合、複数の並列ジョブを同時に実行できます。このセクションでは、「ジョブ」とは、Sparkアクション(例:save
、collect
)とそのアクションを評価するために実行する必要があるタスクを意味します。Sparkのスケジューラは完全にスレッドセーフであり、複数のリクエスト(例:複数のユーザーのクエリ)を提供するアプリケーションを有効にするために、このユースケースをサポートしています。
デフォルトでは、SparkのスケジューラはFIFO方式でジョブを実行します。各ジョブは「ステージ」(例:マップフェーズとリデュースフェーズ)に分割され、最初のジョブは、ステージに起動するタスクがある間、すべてのリソースで優先順位が付けられ、次に2番目のジョブが優先順位が付けられます。キューの先頭のジョブがクラスタ全体を使用する必要がない場合、後のジョブはすぐに実行を開始できますが、キューの先頭のジョブが大きい場合、後のジョブは大幅に遅れる可能性があります。
Spark 0.8以降、ジョブ間の公平な共有を設定することもできます。公平な共有では、Sparkはジョブ間でタスクを「ラウンドロビン」方式で割り当て、すべてのジョブがクラスタリソースをほぼ均等に共有できるようにします。これは、長時間実行されているジョブの実行中に送信された短いジョブが、長時間実行されているジョブの完了を待たずに、すぐにリソースの受信を開始し、良好な応答時間を得ることができることを意味します。このモードは、マルチユーザー設定に最適です。
この機能はデフォルトで無効になっており、すべての粗粒度クラスタマネージャ、つまりスタンドアロンモード、YARNモード、K8sモード、およびMesos粗粒度モードで使用できます。フェアスケジューラを有効にするには、SparkContextを設定するときに、spark.scheduler.mode
プロパティを FAIR
に設定するだけです。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
Fair Scheduler プール
フェアスケジューラは、ジョブを*プール*にグループ化し、プールごとに異なるスケジューリングオプション(例:重み)を設定することもサポートしています。これは、たとえば、より重要なジョブの「高優先度」プールを作成したり、各ユーザーのジョブをグループ化して、*ジョブ*に均等なシェアを与えるのではなく、いくつの同時ジョブがあるかに関係なく*ユーザー*に均等なシェアを与えるのに役立ちます。このアプローチは、Hadoop Fair Schedulerをモデルにしています。
介入なしに、新しく送信されたジョブは*デフォルトプール*に入りますが、ジョブのプールは、送信しているスレッドのSparkContextにspark.scheduler.pool
「ローカルプロパティ」を追加することで設定できます。これは次のように行われます。
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
このローカルプロパティを設定した後、このスレッド内で送信された*すべて*のジョブ(このスレッドでのRDD.save
、count
、collect
などへの呼び出しによる)はこのプール名を使用します。設定はスレッドごとに行われ、スレッドが同じユーザーに代わって複数のジョブを実行しやすくなります。スレッドに関連付けられているプールをクリアするには、単に次を呼び出します。
sc.setLocalProperty("spark.scheduler.pool", null)
プールのデフォルト動作
デフォルトでは、各プールはクラスタの均等なシェアを取得しますが(デフォルトプールの各ジョブのシェアも均等です)、各プール内では、ジョブはFIFO順に実行されます。たとえば、ユーザーごとに1つのプールを作成する場合、これは、各ユーザーがクラスタの均等なシェアを取得し、各ユーザーのクエリが、後のクエリがそのユーザーの以前のクエリからリソースを取得するのではなく、順番に実行されることを意味します。
プールプロパティの設定
特定のプールのプロパティは、構成ファイルからも変更できます。各プールは3つのプロパティをサポートしています。
schedulingMode
: これはFIFOまたはFAIRにすることができ、プール内のジョブが互いにキューイングする(デフォルト)か、プールのリソースを公平に共有するかを制御します。weight
: これは、他のプールに対するプールのクラスタのシェアを制御します。デフォルトでは、すべてのプールの重みは1です。たとえば、特定のプールに重み2を指定すると、他のアクティブなプールの2倍のリソースが取得されます。1000などの高い重みを設定すると、プール間に*優先順位*を実装することもできます。本質的に、重み1000のプールは、アクティブなジョブがあるときはいつでも、常に最初にタスクを起動できます。minShare
: 全体の重みに加えて、各プールには、管理者が希望する*最小シェア*(CPUコア数として)を与えることができます。フェアスケジューラは、重みに従って追加のリソースを再配布する前に、常にすべてのアクティブなプールの最小シェアを満たそうとします。したがって、minShare
プロパティは、プールが常に一定量のリソース(例:10コア)を迅速に取得できることを保証する別の方法であり、クラスタの残りの部分に対して高い優先順位を与える必要はありません。デフォルトでは、各プールのminShare
は0です。
プールプロパティは、conf/fairscheduler.xml.template
と同様にXMLファイルを作成し、fairscheduler.xml
という名前のファイルをクラスパスに配置するか、SparkConfでspark.scheduler.allocation.file
プロパティを設定することで設定できます。ファイルパスはhadoop設定を尊重し、ローカルファイルパスまたはHDFSファイルパスのいずれかになります。
// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
XMLファイルの形式は、プールごとに単純な<pool>
要素であり、さまざまな設定のためのさまざまな要素がその中にあります。例えば
<?xml version="1.0"?>
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
完全な例は、conf/fairscheduler.xml.template
にもあります。XMLファイルで設定されていないプールは、すべて設定のデフォルト値(スケジューリングモードFIFO、重み1、minShare 0)を取得することに注意してください。
JDBC 接続を使用したスケジューリング
JDBCクライアントセッションにFair Schedulerプールを設定するには、ユーザーはspark.sql.thriftserver.scheduler.pool
変数を設定できます。
SET spark.sql.thriftserver.scheduler.pool=accounting;
PySpark における同時ジョブ
PySparkは、デフォルトでは、PVMスレッドとJVMスレッドの同期をサポートしておらず、複数のPVMスレッドで複数のジョブを起動しても、対応する各JVMスレッドで各ジョブを起動することは保証されません。この制限により、個別のPVMスレッドでsc.setJobGroup
を介して異なるジョブグループを設定できず、後でsc.cancelJobGroup
を介してジョブをキャンセルすることもできません。
PVMスレッドがJVMスレッドのローカルプロパティなどの継承可能な属性を継承するには、pyspark.InheritableThread
を一緒に使用することをお勧めします。