ジョブスケジューリング

概要

Spark には、計算間でリソースをスケジューリングするためのいくつかの機能があります。まず、クラスターモードの概要で説明したように、各 Spark アプリケーション (SparkContext のインスタンス) は独立した Executor プロセスのセットを実行することを思い出してください。Spark が実行されるクラスターマネージャーは、アプリケーション間のスケジューリングのための機能を提供します。次に、各 Spark アプリケーションの内部では、異なるスレッドから送信された場合、複数の「ジョブ」(Spark アクション) が並列して実行される可能性があります。これは、アプリケーションがネットワーク経由でリクエストを処理している場合に一般的です。Spark には、各 SparkContext 内でリソースをスケジューリングするためのフェアスケジューラーが含まれています。

アプリケーション間のスケジューリング

クラスター上で実行する場合、各 Spark アプリケーションは、そのアプリケーションのタスクのみを実行し、データを格納する Executor JVM の独立したセットを取得します。複数のユーザーがクラスターを共有する必要がある場合、クラスターマネージャーに応じて、割り当てを管理するためのさまざまなオプションがあります。

すべてのクラスターマネージャーで利用可能な最も簡単なオプションは、リソースの静的パーティショニングです。このアプローチでは、各アプリケーションには使用できる最大リソース量が与えられ、その期間全体で保持されます。これは、Spark のスタンドアロンおよびYARNモード、さらにK8sモードで使用されるアプローチです。リソースの割り当ては、クラスターのタイプに基づいて次のように設定できます。

現在、どのモードもアプリケーション間でメモリを共有する機能を提供していないことに注意してください。この方法でデータを共有したい場合は、同じ RDD をクエリすることで複数のリクエストを処理できる単一のサーバーアプリケーションを実行することをお勧めします。

動的リソース割り当て

Spark は、ワークロードに基づいてアプリケーションが占有するリソースを動的に調整するメカニズムを提供します。これは、アプリケーションが使用されなくなったリソースをクラスターに返却し、後で需要が発生したときに再度要求できることを意味します。この機能は、複数のアプリケーションが Spark クラスターでリソースを共有する場合に特に役立ちます。

この機能はデフォルトでは無効になっており、すべての疎結合クラスターマネージャー、つまりスタンドアロンモードYARNモードK8sモードで利用できます。

設定とセットアップ

この機能を使用するにはいくつかの方法があります。どの方法を選択した場合でも、アプリケーションは最初に spark.dynamicAllocation.enabledtrue に設定する必要があります。さらに、

外部シャフルサービス、シャフル追跡、または ShuffleDriverComponents の信頼性の高いストレージサポートの目的は、Executor が削除されても、それによって書き込まれたシャフルファイルを削除せずに済むようにすることです(詳細については以下で説明します)。シャフル追跡を有効にするのは簡単ですが、外部シャフルサービスの設定方法はクラスターマネージャーによって異なります。

スタンドアロンモードでは、spark.shuffle.service.enabledtrue に設定してワーカーを起動するだけです。

YARN モードでは、こちらの説明に従ってください。

その他すべての関連設定はオプションであり、spark.dynamicAllocation.* および spark.shuffle.service.* 名前空間にあります。詳細については、設定ページを参照してください。

注意点

リソース割り当てポリシー

高レベルでは、Spark は使用されなくなった Executor を解放し、必要になったときに Executor を取得する必要があります。間もなく削除される Executor が近い将来タスクを実行するかどうか、または間もなく追加される新しい Executor が実際にアイドル状態になるかどうかを確実に予測する方法がないため、Executor を削除および要求するタイミングを決定するためのヒューリスティックのセットが必要です。

リクエストポリシー

動的割り当てが有効になっている Spark アプリケーションは、スケジュール待ちのタスクがある場合に、追加の Executor を要求します。この条件は、既存の Executor セットが、提出されたがまだ完了していないすべてのタスクを同時に飽和させるのに不十分であることを必然的に意味します。

Spark はラウンドで Executor を要求します。実際の要求は、spark.dynamicAllocation.schedulerBacklogTimeout 秒間保留中のタスクがあった場合にトリガーされ、その後、保留中のタスクキューが持続する場合、さらに spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 秒ごとにトリガーされます。さらに、各ラウンドで要求される Executor の数は、前のラウンドから指数関数的に増加します。たとえば、アプリケーションは最初のラウンドで 1 つの Executor を追加し、その後、後続のラウンドで 2、4、8... というように Executor を追加します。

指数関数的な増加ポリシーの動機は 2 つあります。第一に、アプリケーションは、少数の追加 Executor で十分であることが判明した場合に備えて、最初は慎重に Executor を要求する必要があります。これは TCP スロースタートの正当化を反映しています。第二に、多くの Executor が実際に必要であることが判明した場合に、アプリケーションはリソース使用量をタイムリーに増やすことができる必要があります。

削除ポリシー

Executor を削除するためのポリシーははるかに単純です。Spark アプリケーションは、spark.dynamicAllocation.executorIdleTimeout 秒以上アイドル状態の Executor を削除します。ほとんどの状況では、Executor がアイドル状態であってはならないという点で、この条件は要求条件と相互排他的であることに注意してください。スケジュール待ちのタスクがまだ存在している場合。

Executor の正常な廃止

動的割り当ての前では、関連するアプリケーションも終了したときに Spark Executor が終了した場合、Executor に関連付けられたすべての状態は不要になり、安全に破棄できます。しかし、動的割り当てでは、Executor が明示的に削除されるときにアプリケーションはまだ実行中です。アプリケーションが Executor に格納されている、または Executor によって書き込まれた状態にアクセスしようとすると、状態を再計算する必要があります。したがって、Spark は、Executor を削除する前にその状態を保持することにより、Executor を正常に廃止するためのメカニズムを必要とします。

この要件は、特にシャフルにとって重要です。シャフルの間、Spark Executor は最初に自身のマップ出力をローカルディスクに書き込み、次に他の Executor がそれらを取得しようとするときにそれらのファイルのリモートサーバーとして機能します。ストレッジャー(同僚よりもはるかに長い時間実行されるタスク)の場合、動的割り当てはシャフルが完了する前に Executor を削除する可能性があります。この場合、その Executor によって書き込まれたシャフルファイルは不必要に再計算する必要がある場合があります。

シャフルファイルを保持するための解決策は、Spark 1.2 で導入された外部シャフルサービスを使用することです。このサービスは、Spark アプリケーションとその Executor とは独立して、クラスターの各ノードで実行される長時間実行プロセスを指します。サービスが有効になっている場合、Spark Executor は互いにではなく、サービスからシャフルファイルを取得します。これは、Executor によって書き込まれたシャフル状態は、Executor のライフサイクルを超えて提供され続ける可能性があることを意味します。

シャフルファイルを書き込むことに加えて、Executor はディスクまたはメモリにデータをキャッシュします。しかし、Executor が削除されると、キャッシュされたすべてのデータにアクセスできなくなります。これを軽減するために、デフォルトではキャッシュされたデータを含む Executor は決して削除されません。この動作は spark.dynamicAllocation.cachedExecutorIdleTimeout で設定できます。 spark.shuffle.service.fetch.rdd.enabledtrue に設定すると、Spark はディスク永続化 RDD ブロックの取得に ExternalShuffleService を使用できます。動的割り当ての場合、この機能が有効になっていると、ディスク永続化ブロックのみを持つ Executor は spark.dynamicAllocation.executorIdleTimeout 後にアイドル状態と見なされ、それに応じて解放されます。将来のリリースでは、キャッシュされたデータは、外部シャフルサービスを介してシャフルファイルが保持されるのと同様の精神で、オフヒープストレージを介して保持される可能性があります。

アプリケーション内のスケジューリング

指定された Spark アプリケーション (SparkContext インスタンス) 内では、別々のスレッドから送信された場合、複数の並列ジョブが同時に実行される可能性があります。このセクションでは「ジョブ」は、Spark アクション (例: savecollect) と、そのアクションを評価するために実行する必要があるすべてのタスクを意味します。Spark のスケジューラーは完全にスレッドセーフであり、このユースケースをサポートして、複数のリクエスト (例: 複数のユーザーからのクエリ) を処理するアプリケーションを有効にします。

デフォルトでは、Spark のスケジューラーはジョブを FIFO 方式で実行します。各ジョブは「ステージ」(例: マップフェーズとリデュースフェーズ) に分割され、最初のジョブは、ステージに起動するタスクがある間、すべての利用可能なリソースに対して優先権を得ます。次に 2 番目のジョブが優先権を得ます。など。キューの先頭にあるジョブがクラスター全体を使用する必要がない場合、後続のジョブはすぐに開始できますが、キューの先頭にあるジョブが大きい場合、後続のジョブは大幅に遅延する可能性があります。

Spark 0.8 以降では、ジョブ間で公平な共有を設定することも可能になりました。公平な共有の下では、Spark は「ラウンドロビン」方式でジョブ間でタスクを割り当てるため、すべてのジョブがクラスターリソースをほぼ均等に共有します。これは、長いジョブが実行中に送信された短いジョブがすぐにリソースの割り当てを開始でき、長いジョブが完了するのを待つことなく、良好な応答時間を取得できることを意味します。このモードはマルチユーザー環境に最適です。

この機能はデフォルトで無効になっており、すべての疎結合クラスターマネージャー、つまりスタンドアロンモードYARNモードK8sモードで利用できます。フェアスケジューラーを有効にするには、SparkContext を設定する際に spark.scheduler.mode プロパティを FAIR に設定するだけです。

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

フェアスケジューラープール

フェアスケジューラーは、ジョブをプールにグループ化し、各プールに異なるスケジューリングオプション (例: 重み) を設定することもサポートしています。これは、たとえば、より重要なジョブのために「高優先度」プールを作成したり、各ユーザーのジョブをグループ化してユーザーが同時に実行しているジョブの数に関係なく均等な共有を得るようにしたりするのに役立ちます。このアプローチは、Hadoop Fair Schedulerにモデル化されています。

介入なしでは、新しく送信されたジョブはデフォルトプールに入りますが、ジョブのプールは、それらを送信しているスレッドの SparkContext に spark.scheduler.pool 「ローカルプロパティ」を追加することで設定できます。これは次のように行われます。

// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")

このローカルプロパティを設定した後、このスレッド内で送信されたすべてのジョブ (このスレッドでの RDD.savecountcollect などの呼び出しによる) はこのプール名を使用します。設定はスレッドごとに個別に設定されるため、1 つのスレッドで同じユーザーの代わりに複数のジョブを実行しやすくなります。スレッドに関連付けられているプールをクリアしたい場合は、単純に呼び出します。

sc.setLocalProperty("spark.scheduler.pool", null)

プールのデフォルト動作

デフォルトでは、各プールはクラスターの均等な共有 (デフォルトプールの各ジョブとも均等) を取得しますが、各プール内ではジョブは FIFO 順序で実行されます。たとえば、ユーザーごとに 1 つのプールを作成すると、各ユーザーはクラスターの均等な共有を取得し、各ユーザーのクエリは、後続のクエリがそのユーザーの以前のクエリからリソースを奪うのではなく、順序で実行されることになります。

プールプロパティの設定

特定のプールのプロパティは、設定ファイルを通じて変更することもできます。各プールは 3 つのプロパティをサポートしています。

プールプロパティは、conf/fairscheduler.xml.template に類似した XML ファイルを作成し、fairscheduler.xml という名前のファイルをクラスパスに置くか、SparkConfspark.scheduler.allocation.file プロパティを設定することによって設定できます。ファイルパスは Hadoop の設定を尊重し、ローカルファイルパスまたは HDFS ファイルパスのいずれかになります。

// scheduler file at local
conf.set("spark.scheduler.allocation.file", "file:///path/to/file")
// scheduler file at hdfs
conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")

XML ファイルの形式は、各プールに対して単純な <pool> 要素であり、その中にさまざまな設定のための異なる要素があります。たとえば、

<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

conf/fairscheduler.xml.template にも完全な例があります。XML ファイルで構成されていないプールは、すべての設定 (スケジューリングモード FIFO、重み 1、minShare 0) に対してデフォルト値を取得するだけです。

JDBC 接続を使用したスケジューリング

フェアスケジューラプールを JDBC クライアントセッションに設定するには、ユーザーは spark.sql.thriftserver.scheduler.pool 変数を設定できます。

SET spark.sql.thriftserver.scheduler.pool=accounting;

PySpark における並列ジョブ

PySpark は、デフォルトでは PVM スレッドと JVM スレッドの同期をサポートしておらず、複数の PVM スレッドで複数のジョブを起動しても、各ジョブが対応する JVM スレッドで起動されることは保証されません。この制限のため、別の PVM スレッドで sc.setJobGroup を介して異なるジョブグループを設定することはできず、後で sc.cancelJobGroup を介してジョブをキャンセルすることもできません。

pyspark.InheritableThread は、PVM スレッドが JVM スレッド内のローカルプロパティなどの継承可能な属性を継承するために一緒に使用することが推奨されます。