Spark設定
- Sparkプロパティ
- 環境変数
- ロギングの設定
- 設定ディレクトリのオーバーライド
- Hadoopクラスタ設定の継承
- カスタムHadoop/Hive設定
- カスタムリソーススケジューリングと設定の概要
- ステージレベルスケジューリングの概要
- プッシュベースシャッフルの概要
Sparkはシステムを設定するための3つの場所を提供します
- Sparkプロパティは、ほとんどのアプリケーションパラメータを制御し、SparkConfオブジェクトを使用するか、Javaシステムプロパティを通じて設定できます。
- 環境変数を使用して、各ノードの
conf/spark-env.sh
スクリプトを通じて、IPアドレスなどのマシンごとの設定を設定できます。 - ロギングは
log4j2.properties
で設定できます。
Sparkプロパティ
Sparkプロパティは、ほとんどのアプリケーション設定を制御し、各アプリケーションごとに個別に設定されます。これらのプロパティは、SparkContext
に渡されるSparkConfで直接設定できます。SparkConf
を使用すると、一般的なプロパティ(マスターURLやアプリケーション名など)の一部と、set()
メソッドによる任意のキーと値のペアを設定できます。たとえば、2つのスレッドでアプリケーションを初期化するには、次のようになります。
local[2]で実行していることに注意してください。これは2つのスレッドを意味し、「最小限の」並列処理を表します。これにより、分散環境でのみ発生するバグを検出するのに役立ちます。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
ローカルモードでは1つ以上のスレッドを使用でき、Spark Streamingなどでは、飢餓状態を防ぐために実際には1つ以上のスレッドが必要になる場合があります。
時間の長さを指定するプロパティは、時間単位で設定する必要があります。次の形式が受け入れられます。
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
バイトサイズを指定するプロパティは、サイズ単位で設定する必要があります。次の形式が受け入れられます。
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
単位のない数値は一般的にバイトとして解釈されますが、一部はKiBまたはMiBとして解釈されます。個々の設定プロパティのドキュメントを参照してください。可能な限り単位を指定することが望ましいです。
Sparkプロパティの動的ロード
SparkConf
で特定の設定をハードコーディングしない方が良い場合があります。たとえば、同じアプリケーションを異なるマスターまたは異なるメモリの量で実行する場合です。Sparkでは、空のconfを作成するだけで済みます。
val sc = new SparkContext(new SparkConf())
その後、実行時に設定値を指定できます。
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
Sparkシェルとspark-submit
ツールは、設定を動的にロードする2つの方法をサポートしています。1つ目は、上記のように--master
などのコマンドラインオプションです。spark-submit
は--conf/-c
フラグを使用して任意のSparkプロパティを受け入れることができますが、Sparkアプリケーションの起動に関係するプロパティには特別なフラグを使用します。./bin/spark-submit --help
を実行すると、これらのオプションの完全なリストが表示されます。
bin/spark-submit
は、各行が空白で区切られたキーと値で構成されるconf/spark-defaults.conf
からの設定オプションも読み取ります。たとえば
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
フラグまたはプロパティファイルで指定された値は、アプリケーションに渡され、SparkConfで指定された値とマージされます。SparkConfで直接設定されたプロパティは最高の優先順位を持ち、次にspark-submit
またはspark-shell
に渡されたフラグ、次にspark-defaults.conf
ファイル内のオプションが続きます。いくつかの設定キーはSparkの以前のバージョンから名前が変更されています。そのような場合、古いキー名は引き続き受け入れられますが、新しいキーのインスタンスよりも優先順位が低くなります。
Sparkプロパティは主に2種類に分類できます。1つは「spark.driver.memory」、「spark.executor.instances」など、デプロイに関連するものです。この種のプロパティは、実行時にSparkConf
を通じてプログラムで設定した場合に影響を受けない場合もあります。または、動作は選択したクラスタマネージャとデプロイモードによって異なります。そのため、設定ファイルまたはspark-submit
コマンドラインオプションを使用して設定することをお勧めします。もう1つは「spark.task.maxFailures」など、主にSparkランタイム制御に関連するものです。この種のプロパティはどちらの方法でも設定できます。
Sparkプロパティの表示
http://<driver>:4040
にあるアプリケーションWeb UIの「環境」タブにSparkプロパティがリストされます。これは、プロパティが正しく設定されていることを確認するために役立つ場所です。spark-defaults.conf
、SparkConf
、またはコマンドラインで明示的に指定された値のみが表示されることに注意してください。その他のすべての設定プロパティについては、デフォルト値が使用されていると想定できます。
使用可能なプロパティ
内部設定を制御するプロパティのほとんどは、妥当なデフォルト値を持っています。設定する最も一般的なオプションの一部を以下に示します。
アプリケーションプロパティ
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.app.name |
(なし) | アプリケーションの名前。これはUIとログデータに表示されます。 | 0.9.0 |
spark.driver.cores |
1 | クラスタモードでのみ、ドライバプロセスに使用するコア数。 | 1.3.0 |
spark.driver.maxResultSize |
1g | 各Sparkアクション(collectなど)のすべてのパーティションのシリアル化された結果の合計サイズの上限(バイト単位)。少なくとも1M、または無制限の場合は0にする必要があります。合計サイズがこの制限を超えると、ジョブは中止されます。上限を高くすると、ドライバでメモリ不足エラーが発生する可能性があります(spark.driver.memoryとJVM内のオブジェクトのメモリオーバーヘッドに依存します)。適切な制限を設定することで、ドライバをメモリ不足エラーから保護できます。 | 1.2.0 |
spark.driver.memory |
1g | ドライバプロセス(SparkContextが初期化される場所)で使用されるメモリの量。JVMメモリ文字列と同じ形式で、サイズ単位のサフィックス(「k」、「m」、「g」、または「t」)(例:512m 、2g )を使用します。注:クライアントモードでは、ドライバJVMはすでに開始されているため、この設定をアプリケーション内の SparkConf で直接設定することはできません。代わりに、--driver-memory コマンドラインオプションまたはデフォルトのプロパティファイルで設定してください。 |
1.1.1 |
spark.driver.memoryOverhead |
driverMemory * spark.driver.memoryOverheadFactor 、最小値は384 |
クラスタモードでドライバプロセスごとに割り当てられる非ヒープメモリの量(MiB単位、特に指定がない限り)。これは、VMオーバーヘッド、インターンの文字列、その他のネイティブオーバーヘッドなどを考慮したメモリです。これはコンテナサイズとともに増加する傾向があります(通常6〜10%)。このオプションは現在、YARN、Mesos、Kubernetesでサポートされています。注:非ヒープメモリには、オフヒープメモリ(spark.memory.offHeap.enabled=true の場合)と、同じコンテナで実行されている他のドライバプロセス(たとえば、PySparkドライバに付属するpythonプロセス)で使用されるメモリ、および他の非ドライバプロセスで使用されるメモリが含まれます。ドライバを実行するコンテナの最大メモリサイズは、spark.driver.memoryOverhead とspark.driver.memory の合計によって決まります。 |
2.3.0 |
spark.driver.memoryOverheadFactor |
0.10 | クラスタモードで、ドライバプロセスごとに追加の非ヒープメモリとして割り当てるドライバメモリの割合。これは、VMオーバーヘッド、インターンの文字列、その他のネイティブオーバーヘッドなどを考慮したメモリです。コンテナサイズとともに増加する傾向があります。この値は、Kubernetesの非JVMジョブを除いて、デフォルトで0.10になります。Kubernetesの非JVMジョブでは、デフォルトで0.40になります。これは、非JVMタスクではより多くの非JVMヒープ空間が必要であり、そのようなタスクは一般的に「Memory Overhead Exceeded」エラーで失敗するためです。この設定により、より高いデフォルト値でこのエラーを回避します。spark.driver.memoryOverhead が直接設定されている場合、この値は無視されます。 |
3.3.0 |
spark.driver.resource.{resourceName}.amount |
0 | ドライバで使用する特定のリソースの種類の量。これを使用する場合は、起動時にドライバがリソースを検出するためにspark.driver.resource.{resourceName}.discoveryScript も指定する必要があります。 |
3.0.0 |
spark.driver.resource.{resourceName}.discoveryScript |
なし | ドライバが特定のリソースの種類を検出するために実行するスクリプト。これは、ResourceInformationクラスの形式でJSON文字列をSTDOUTに出力する必要があります。これには、名前とアドレスの配列が含まれています。クライアントが送信したドライバの場合、ディスカバリスクリプトは、同じホスト上の他のドライバと比較して、このドライバに異なるリソースアドレスを割り当てる必要があります。 | 3.0.0 |
spark.driver.resource.{resourceName}.vendor |
なし | ドライバで使用するリソースのベンダー。このオプションは現在、Kubernetesでのみサポートされており、実際にはKubernetesデバイスプラグインの命名規則に従ったベンダーとドメインの両方です。(例:KubernetesのGPUの場合、この設定はnvidia.comまたはamd.comに設定されます) | 3.0.0 |
spark.resources.discoveryPlugin |
org.apache.spark.resource.ResourceDiscoveryScriptPlugin | アプリケーションにロードするorg.apache.spark.api.resource.ResourceDiscoveryPluginを実装するクラス名のカンマ区切りリスト。これは、高度なユーザーがリソース検出クラスをカスタム実装に置き換えるためです。Sparkは、指定された各クラスを試行し、それらのいずれかがそのリソースのリソース情報を返すまで続けます。プラグインのいずれもそのリソースの情報を返さない場合、最後にディスカバリスクリプトを試行します。 | 3.0.0 |
spark.executor.memory |
1g | エグゼキュータプロセスごとに使用するメモリの量。JVMメモリの文字列と同じ形式で、サイズ単位のサフィックス(「k」、「m」、「g」、「t」)付き(例:512m 、2g )。 |
0.7.0 |
spark.executor.pyspark.memory |
設定なし | 各エグゼキュータでPySparkに割り当てるメモリの量(特に指定がない限りMiB単位)。設定されている場合、エグゼキュータのPySparkメモリはこの量に制限されます。設定されていない場合、SparkはPythonのメモリ使用量を制限せず、アプリケーション側で他の非JVMプロセスと共有されるオーバーヘッドメモリ空間を超えないようにする必要があります。PySparkをYARNまたはKubernetesで実行する場合、このメモリはエグゼキュータのリソース要求に追加されます。 注:この機能はPythonの resource モジュールに依存しています。そのため、動作と制限は継承されます。たとえば、Windowsはリソース制限をサポートしておらず、MacOSでは実際のリソースは制限されません。 |
2.4.0 |
spark.executor.memoryOverhead |
executorMemory * spark.executor.memoryOverheadFactor 、最小値384 |
エグゼキュータプロセスごとに追加で割り当てるメモリの量(特に指定がない限りMiB単位)。これは、VMオーバーヘッド、インターンの文字列、その他のネイティブオーバーヘッドなどを考慮したメモリです。エグゼキュータのサイズとともに増加する傾向があります(通常6〜10%)。このオプションは現在、YARNとKubernetesでサポートされています。 注:追加メモリには、( spark.executor.pyspark.memory が設定されていない場合)PySparkエグゼキュータメモリと、同じコンテナで実行されている他の非エグゼキュータプロセスによって使用されるメモリが含まれます。エグゼキュータを実行するコンテナの最大メモリサイズは、spark.executor.memoryOverhead 、spark.executor.memory 、spark.memory.offHeap.size 、spark.executor.pyspark.memory の合計によって決定されます。 |
2.3.0 |
spark.executor.memoryOverheadFactor |
0.10 | エグゼキュータプロセスごとに追加の非ヒープメモリとして割り当てるエグゼキュータメモリの割合。これは、VMオーバーヘッド、インターンの文字列、その他のネイティブオーバーヘッドなどを考慮したメモリです。コンテナサイズとともに増加する傾向があります。この値は、Kubernetesの非JVMジョブを除いて、デフォルトで0.10になります。Kubernetesの非JVMジョブでは、デフォルトで0.40になります。これは、非JVMタスクではより多くの非JVMヒープ空間が必要であり、そのようなタスクは一般的に「Memory Overhead Exceeded」エラーで失敗するためです。この設定により、より高いデフォルト値でこのエラーを回避します。spark.executor.memoryOverhead が直接設定されている場合、この値は無視されます。 |
3.3.0 |
spark.executor.resource.{resourceName}.amount |
0 | エグゼキュータプロセスごとに使用する特定のリソースの種類の量。これを使用する場合は、起動時にエグゼキュータがリソースを検出するためにspark.executor.resource.{resourceName}.discoveryScript も指定する必要があります。 |
3.0.0 |
spark.executor.resource.{resourceName}.discoveryScript |
なし | エグゼキュータが特定のリソースの種類を検出するために実行するスクリプト。これは、ResourceInformationクラスの形式でJSON文字列をSTDOUTに出力する必要があります。これには、名前とアドレスの配列が含まれています。 | 3.0.0 |
spark.executor.resource.{resourceName}.vendor |
なし | エグゼキュータで使用するリソースのベンダー。このオプションは現在、Kubernetesでのみサポートされており、実際にはKubernetesデバイスプラグインの命名規則に従ったベンダーとドメインの両方です。(例:KubernetesのGPUの場合、この設定はnvidia.comまたはamd.comに設定されます) | 3.0.0 |
spark.extraListeners |
(なし) | SparkListener を実装するクラスのカンマ区切りのリスト。SparkContextを初期化すると、これらのクラスのインスタンスが作成され、Sparkのリスナーバスに登録されます。SparkConfを受け取る単一引数のコンストラクタを持つクラスの場合、そのコンストラクタが呼び出されます。そうでない場合は、引数のないコンストラクタが呼び出されます。有効なコンストラクタが見つからない場合、SparkContextの作成は例外で失敗します。 |
1.3.0 |
spark.local.dir |
/tmp | Sparkで「スクラッチ」空間(マップ出力ファイルやディスクに保存されるRDDなど)に使用するディレクトリ。これは、システムの高速なローカルディスクにある必要があります。異なるディスク上の複数のディレクトリのカンマ区切りリストにすることもできます。 注:これは、クラスタマネージャによって設定されたSPARK_LOCAL_DIRS(スタンドアロン)、MESOS_SANDBOX(Mesos)、またはLOCAL_DIRS(YARN)環境変数によってオーバーライドされます。 |
0.5.0 |
spark.logConf |
false | SparkContextが開始されるときに、有効なSparkConfをINFOとしてログに記録します。 | 0.9.0 |
spark.master |
(なし) | 接続するクラスタマネージャ。許可されたマスターURLのリストを参照してください。 | 0.9.0 |
spark.submit.deployMode |
client | Sparkドライバプログラムのデプロイモード。「client」または「cluster」のいずれかです。「client」はドライバプログラムをローカルで起動し、「cluster」はクラスタ内のノードの1つでリモートで起動することを意味します。 | 1.5.0 |
spark.log.callerContext |
(なし) | Yarn/HDFSで実行されている場合、Yarn RMログ/HDFS監査ログに書き込まれるアプリケーション情報。その長さは、Hadoopの設定hadoop.caller.context.max.size によって異なります。簡潔にする必要があり、通常は最大50文字になります。 |
2.2.0 |
spark.log.level |
(なし) | 設定されている場合、Spark起動時にSparkContext.setLogLevel() を呼び出すかのように、ユーザー定義のログ設定をオーバーライドします。有効なログレベルには、「ALL」、「DEBUG」、「ERROR」、「FATAL」、「INFO」、「OFF」、「TRACE」、「WARN」が含まれます。 |
3.5.0 |
spark.driver.supervise |
false | trueの場合、ゼロ以外の終了ステータスで失敗した場合、ドライバを自動的に再起動します。SparkスタンドアロンモードまたはMesosクラスタデプロイモードでのみ有効です。 | 1.3.0 |
spark.driver.log.dfsDir |
(なし) | spark.driver.log.persistToDfs.enabled がtrueの場合、Sparkドライバログが同期される基本ディレクトリ。この基本ディレクトリ内では、各アプリケーションはアプリケーション固有のファイルにドライバログを記録します。ユーザーは、ドライバログファイルを後で使用するために保持できるように、これをHDFSディレクトリなどの統一された場所に設定することをお勧めします。このディレクトリは、すべてのSparkユーザーがファイルの読み書きを許可し、Spark History Serverユーザーがファイルを削除できるようにする必要があります。さらに、spark.history.fs.driverlog.cleaner.enabled がtrueに設定されている場合、このディレクトリの古いログはSpark History Serverによって削除され、spark.history.fs.driverlog.cleaner.maxAge で設定された最大経過時間より古い場合に削除されます。 |
3.0.0 |
spark.driver.log.persistToDfs.enabled |
false | trueの場合、クライアントモードで実行されているSparkアプリケーションは、spark.driver.log.dfsDir で設定された永続ストレージにドライバログを書き込みます。spark.driver.log.dfsDir が設定されていない場合、ドライバログは保持されません。さらに、Spark History Serverでspark.history.fs.driverlog.cleaner.enabled をtrueに設定してクリーナーを有効にします。 |
3.0.0 |
spark.driver.log.layout |
%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex | spark.driver.log.dfsDir に同期されるドライバログのレイアウト。これが設定されていない場合、log4j2.propertiesで定義された最初のアプリエンダーのレイアウトを使用します。それも設定されていない場合、ドライバログはデフォルトのレイアウトを使用します。 |
3.0.0 |
spark.driver.log.allowErasureCoding |
false | ドライバログでイレージャコーディングを使用することを許可するかどうか。HDFSでは、イレージャコーディングされたファイルは通常のレプリケートファイルほど迅速には更新されないため、アプリケーションによって書き込まれた変更を反映するのに時間がかかる場合があります。これがtrueであっても、Sparkはファイルを強制的にイレージャコーディングを使用することはなく、ファイルシステムのデフォルトを使用するだけです。 | 3.0.0 |
spark.decommission.enabled |
false | 廃止が有効になっている場合、Sparkはエグゼキュータを正常にシャットダウンしようとします。spark.storage.decommission.enabled が有効になっている場合、SparkはすべてのRDDブロック(spark.storage.decommission.rddBlocks.enabled で制御)とシャッフルブロック(spark.storage.decommission.shuffleBlocks.enabled で制御)を廃止中のエグゼキュータからリモートエグゼキュータに移行しようとします。廃止が有効になっている場合、Sparkはspark.dynamicAllocation.enabled が有効になっている場合でも、エグゼキュータを強制終了する代わりに廃止します。 |
3.1.0 |
spark.executor.decommission.killInterval |
(なし) | 廃止されたエグゼキュータが外部(例:非Spark)サービスによって強制的に強制終了されるまでの期間。 | 3.1.0 |
spark.executor.decommission.forceKillTimeout |
(なし) | Sparkが廃止中のエグゼキュータの終了を強制するまでの期間。ほとんどの場合、この値は高く設定する必要があります。低い値に設定すると、ブロックの移行が完了するのに十分な時間がなくなるためです。 | 3.2.0 |
spark.executor.decommission.signal |
PWR | エグゼキュータの廃止開始をトリガーするために使用されるシグナル。 | 3.2.0 |
spark.executor.maxNumFailures |
numExecutors * 2、最小値3 | アプリケーションに失敗する前に許容されるエグゼキュータの最大失敗数。この設定は、YARN、または`spark.kubernetes.allocation.pods.allocator`が'direct'に設定されているKubernetesでのみ有効です。 | 3.5.0 |
spark.executor.failuresValidityInterval |
(なし) | 実行プログラムの失敗が独立したものであり、試行回数に累積されないものと見なされるまでの間隔。この設定は、YARN、または`spark.kubernetes.allocation.pods.allocator`が 'direct' に設定されている場合のKubernetesでのみ有効です。 | 3.5.0 |
これら以外にも、以下のプロパティが利用可能であり、状況によっては有用な場合があります。
ランタイム環境
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.driver.extraClassPath |
(なし) | ドライバのクラスパスにプリペンドする追加のクラスパスエントリ。 注記: クライアントモードでは、ドライバのJVMは既に起動しているため、アプリケーションで直接 SparkConf を使用してこの設定を行うことはできません。代わりに、--driver-class-path コマンドラインオプション、またはデフォルトのプロパティファイルで設定してください。 |
1.0.0 |
spark.driver.defaultJavaOptions |
(なし) | spark.driver.extraJavaOptions にプリペンドするデフォルトのJVMオプションの文字列。これは管理者によって設定されることを意図しています。例えば、GC設定やその他のログ出力などです。このオプションで最大ヒープサイズ(-Xmx)の設定を行うことはできません。最大ヒープサイズは、クラスタモードではspark.driver.memory で、クライアントモードでは--driver-memory コマンドラインオプションで設定できます。注記: クライアントモードでは、ドライバのJVMは既に起動しているため、アプリケーションで直接 SparkConf を使用してこの設定を行うことはできません。代わりに、--driver-java-options コマンドラインオプション、またはデフォルトのプロパティファイルで設定してください。 |
3.0.0 |
spark.driver.extraJavaOptions |
(なし) | ドライバに渡す追加のJVMオプションの文字列。これはユーザによって設定されることを意図しています。例えば、GC設定やその他のログ出力などです。このオプションで最大ヒープサイズ(-Xmx)の設定を行うことはできません。最大ヒープサイズは、クラスタモードではspark.driver.memory で、クライアントモードでは--driver-memory コマンドラインオプションで設定できます。注記: クライアントモードでは、ドライバのJVMは既に起動しているため、アプリケーションで直接 SparkConf を使用してこの設定を行うことはできません。代わりに、--driver-java-options コマンドラインオプション、またはデフォルトのプロパティファイルで設定してください。spark.driver.defaultJavaOptions はこの設定の前に追加されます。 |
1.0.0 |
spark.driver.extraLibraryPath |
(なし) | ドライバJVMの起動時に使用する特別なライブラリパスを設定します。 注記: クライアントモードでは、ドライバのJVMは既に起動しているため、アプリケーションで直接 SparkConf を使用してこの設定を行うことはできません。代わりに、--driver-library-path コマンドラインオプション、またはデフォルトのプロパティファイルで設定してください。 |
1.0.0 |
spark.driver.userClassPathFirst |
false | (実験的) ドライバでクラスをロードする際に、ユーザが追加したjarをSpark自身のjarよりも優先させるかどうか。この機能は、Sparkの依存関係とユーザの依存関係の競合を軽減するために使用できます。現在、実験的な機能です。クラスタモードでのみ使用されます。 | 1.3.0 |
spark.executor.extraClassPath |
(なし) | 実行プログラムのクラスパスにプリペンドする追加のクラスパスエントリ。これは主に、古いバージョンのSparkとの後方互換性のために存在します。通常、ユーザがこのオプションを設定する必要はありません。 | 1.0.0 |
spark.executor.defaultJavaOptions |
(なし) | spark.executor.extraJavaOptions にプリペンドするデフォルトのJVMオプションの文字列。これは管理者によって設定されることを意図しています。例えば、GC設定やその他のログ出力などです。このオプションでSparkプロパティまたは最大ヒープサイズ(-Xmx)の設定を行うことはできません。Sparkプロパティは、SparkConfオブジェクトまたはspark-submitスクリプトで使用されるspark-defaults.confファイルを使用して設定する必要があります。最大ヒープサイズはspark.executor.memoryで設定できます。存在する場合は、以下の記号が補間されます: はアプリケーションIDに置き換えられ、 は実行プログラムIDに置き換えられます。たとえば、/tmpにあるアプリケーションの実行プログラムIDの名前のファイルに詳細なGCログを出力するには、'value'に-verbose:gc -Xloggc:/tmp/-.gc を渡します。 |
3.0.0 |
spark.executor.extraJavaOptions |
(なし) | 実行プログラムに渡す追加のJVMオプションの文字列。これはユーザによって設定されることを意図しています。例えば、GC設定やその他のログ出力などです。このオプションでSparkプロパティまたは最大ヒープサイズ(-Xmx)の設定を行うことはできません。Sparkプロパティは、SparkConfオブジェクトまたはspark-submitスクリプトで使用されるspark-defaults.confファイルを使用して設定する必要があります。最大ヒープサイズはspark.executor.memoryで設定できます。存在する場合は、以下の記号が補間されます: はアプリケーションIDに置き換えられ、 は実行プログラムIDに置き換えられます。たとえば、/tmpにあるアプリケーションの実行プログラムIDの名前のファイルに詳細なGCログを出力するには、'value'に-verbose:gc -Xloggc:/tmp/-.gc を渡します。spark.executor.defaultJavaOptions はこの設定の前に追加されます。 |
1.0.0 |
spark.executor.extraLibraryPath |
(なし) | 実行プログラムJVMの起動時に使用する特別なライブラリパスを設定します。 | 1.0.0 |
spark.executor.logs.rolling.maxRetainedFiles |
-1 | システムによって保持される最新のローリングログファイル数を設定します。古いログファイルは削除されます。デフォルトでは無効です。 | 1.1.0 |
spark.executor.logs.rolling.enableCompression |
false | 実行プログラムログの圧縮を有効にします。有効にすると、ローリングされた実行プログラムログが圧縮されます。デフォルトでは無効です。 | 2.0.2 |
spark.executor.logs.rolling.maxSize |
1024 * 1024 | 実行プログラムログがロールオーバーされるファイルの最大サイズ(バイト単位)を設定します。ローリングはデフォルトでは無効です。古いログの自動クリーンアップについては、spark.executor.logs.rolling.maxRetainedFiles を参照してください。 |
1.4.0 |
spark.executor.logs.rolling.strategy |
(なし) | 実行プログラムログのローリング戦略を設定します。デフォルトでは無効です。「time」(時間ベースのローリング)または「size」(サイズベースのローリング)に設定できます。「time」の場合、ローリング間隔を設定するにはspark.executor.logs.rolling.time.interval を使用します。「size」の場合、ローリングの最大ファイルサイズを設定するにはspark.executor.logs.rolling.maxSize を使用します。 |
1.1.0 |
spark.executor.logs.rolling.time.interval |
daily | 実行プログラムログがロールオーバーされる時間間隔を設定します。ローリングはデフォルトでは無効です。有効な値はdaily 、hourly 、minutely 、または秒単位の間隔です。古いログの自動クリーンアップについては、spark.executor.logs.rolling.maxRetainedFiles を参照してください。 |
1.1.0 |
spark.executor.userClassPathFirst |
false | (実験的) spark.driver.userClassPathFirst と同じ機能ですが、実行プログラムインスタンスに適用されます。 |
1.3.0 |
spark.executorEnv.[EnvironmentVariableName] |
(なし) | EnvironmentVariableName で指定された環境変数をExecutorプロセスに追加します。複数の環境変数を設定するために、これらを複数指定できます。 |
0.9.0 |
spark.redaction.regex |
(?i)secret|password|token|access[.]key | ドライバと実行プログラムの環境にあるどのSpark設定プロパティと環境変数が機密情報を含むかを決定するための正規表現。この正規表現がプロパティキーまたは値と一致する場合、値は環境UIおよびYARNログやイベントログなどのさまざまなログから削除されます。 | 2.1.2 |
spark.redaction.string.regex |
(なし) | Sparkによって生成された文字列のどの部分が機密情報を含むかを決定するための正規表現。この正規表現が文字列の一部と一致する場合、その文字列の部分はダミー値に置き換えられます。これは現在、SQL explainコマンドの出力を削除するために使用されています。 | 2.2.0 |
spark.python.profile |
false | Pythonワーカーのプロファイリングを有効にします。プロファイルの結果はsc.show_profiles() で表示されるか、ドライバの終了前に表示されます。また、sc.dump_profiles(path) でディスクにダンプすることもできます。プロファイル結果の一部を手動で表示した場合、ドライバの終了前に自動的に表示されることはありません。デフォルトではpyspark.profiler.BasicProfiler が使用されますが、これはSparkContext コンストラクタのパラメータとしてプロファイラクラスを渡すことでオーバーライドできます。 |
1.2.0 |
spark.python.profile.dump |
(なし) | ドライバの終了前にプロファイル結果をダンプするために使用されるディレクトリ。結果はRDDごとに個別のファイルとしてダンプされます。これらはpstats.Stats() でロードできます。これが指定されている場合、プロファイル結果は自動的に表示されません。 |
1.2.0 |
spark.python.worker.memory |
512m | 集計中にPythonワーカープロセスごとに使用するメモリの量。JVMメモリの文字列と同じ形式で、サイズ単位のサフィックス(「k」、「m」、「g」、または「t」)(例:512m 、2g )を使用します。集計中に使用されるメモリがこの量を超えると、データはディスクにスピルされます。 |
1.1.0 |
spark.python.worker.reuse |
true | Pythonワーカーを再利用するかどうか。はいの場合、固定数のPythonワーカーを使用し、タスクごとにPythonプロセスをフォークする必要はありません。大きなブロードキャストがある場合に非常に役立ちます。ブロードキャストは、タスクごとにJVMからPythonワーカーに転送する必要がなくなります。 | 1.2.0 |
spark.files |
各実行プログラムの作業ディレクトリに配置されるファイルのカンマ区切りのリスト。グローバルワイルドカードが使用できます。 | 1.0.0 | |
spark.submit.pyFiles |
PythonアプリケーションのPYTHONPATHに配置する.zip、.egg、または.pyファイルのカンマ区切りのリスト。グローバルワイルドカードが使用できます。 | 1.0.1 | |
spark.jars |
ドライバと実行プログラムのクラスパスに含めるjarのカンマ区切りのリスト。グローバルワイルドカードが使用できます。 | 0.9.0 | |
spark.jars.packages |
ドライバと実行プログラムのクラスパスに含めるjarのMaven座標のカンマ区切りのリスト。座標はgroupId:artifactId:versionである必要があります。spark.jars.ivySettings が指定されている場合、アーティファクトはファイル内の設定に従って解決されます。そうでない場合、アーティファクトはローカルのMavenリポジトリ、次にMavenセントラル、最後にコマンドラインオプション--repositories で指定された追加のリモートリポジトリで検索されます。詳細については、「高度な依存関係管理」を参照してください。 |
1.5.0 | |
spark.jars.excludes |
依存関係の競合を避けるために、spark.jars.packages で提供された依存関係を解決する際に除外するgroupId:artifactIdのカンマ区切りのリスト。 |
1.5.0 | |
spark.jars.ivy |
ローカルのIvyキャッシュとspark.jars.packages からのパッケージファイルに使用されるIvyユーザディレクトリを指定するパス。これは、デフォルトで~/.ivy2であるIvyプロパティivy.default.ivy.user.dir をオーバーライドします。 |
1.3.0 | |
spark.jars.ivySettings |
spark.jars.packages を使用して指定されたjarの解決を、Mavenセントラルなどの組み込みのデフォルトではなく、カスタマイズするために使用するIvy設定ファイルへのパス。コマンドラインオプション--repositories またはspark.jars.repositories で指定された追加のリポジトリも含まれます。ファイアウォールの背後にあるアーティファクト(例:Artifactoryのような社内アーティファクトサーバ)をSparkが解決できるようにする場合に役立ちます。設定ファイルの形式の詳細については、「設定ファイル」を参照してください。file:// スキームを持つパスのみがサポートされます。スキームのないパスは、file:// スキームを持つとみなされます。YARNクラスタモードで実行する場合、このファイルはリモートドライバにもローカライズされ、 |
2.2.0 | |
spark.jars.repositories |
--packages またはspark.jars.packages で指定されたMaven座標を検索するための追加のリモートリポジトリのカンマ区切りのリスト。 |
2.3.0 | |
spark.archives |
各実行プログラムの作業ディレクトリに展開されるアーカイブのカンマ区切りのリスト。.jar、.tar.gz、.tgz、および.zipがサポートされます。ファイル名に# を追加して展開するディレクトリ名を指定できます。たとえば、file.zip#directory です。この設定は実験的です。 |
3.1.0 | |
spark.pyspark.driver.python |
ドライバでPySparkに使用するPythonバイナリ実行ファイル。(デフォルトはspark.pyspark.python です) |
2.1.0 | |
spark.pyspark.python |
ドライバと実行プログラムの両方でPySparkに使用するPythonバイナリ実行ファイル。 | 2.1.0 |
シャッフル動作
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.reducer.maxSizeInFlight |
48m | 各Reduceタスクから同時にフェッチするマップ出力の最大サイズ(MiB単位、特に指定がない限り)。各出力を受け取るためのバッファを作成する必要があるため、これはReduceタスクあたりの固定メモリオーバーヘッドを表します。メモリが大量にない限り、小さく設定してください。 | 1.4.0 |
spark.reducer.maxReqsInFlight |
Int.MaxValue | この設定は、任意の時点でブロックをフェッチするためのリモートリクエストの数を制限します。クラスタ内のホスト数が増加すると、1つ以上のノードへの非常に多くの着信接続につながる可能性があり、ワーカーが負荷で失敗する原因となります。フェッチリクエストの数を制限できるようにすることで、このシナリオを軽減できます。 | 2.0.0 |
spark.reducer.maxBlocksInFlightPerAddress |
Int.MaxValue | この設定は、特定のホストポートからReduceタスクごとにフェッチされるリモートブロックの数を制限します。単一のフェッチまたは同時に特定のアドレスから大量のブロックが要求されると、サービスを提供しているExecutorまたはNode Managerがクラッシュする可能性があります。これは、外部シャッフルが有効になっている場合にNode Managerの負荷を軽減するのに特に役立ちます。値を小さく設定することで、この問題を軽減できます。 | 2.2.1 |
spark.shuffle.compress |
true | マップ出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮にはspark.io.compression.codec が使用されます。 |
0.6.0 |
spark.shuffle.file.buffer |
32k | 各シャッフルファイル出力ストリームのインメモリバッファのサイズ(KiB単位、特に指定がない限り)。これらのバッファにより、中間シャッフルファイルの作成で発生するディスクシークとシステムコールの数が減少します。 | 1.4.0 |
spark.shuffle.unsafe.file.output.buffer |
32k | アンセーフシャッフルライターで各パーティションの書き込み後のこのバッファサイズのファイルシステム。KiB単位、特に指定がない限り。 | 2.3.0 |
spark.shuffle.spill.diskWriteBufferSize |
1024 * 1024 | ソートされたレコードをディスク上のファイルに書き込む際に使用するバッファサイズ(バイト単位)。 | 2.3.0 |
spark.shuffle.io.maxRetries |
3 | (Nettyのみ) 0以外の値に設定されている場合、IO関連の例外によって失敗したフェッチは自動的に再試行されます。この再試行ロジックは、長いGC一時停止や一時的なネットワーク接続の問題が発生した場合でも、大規模なシャッフルの安定化に役立ちます。 | 1.2.0 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (Nettyのみ) ホスト間の接続は再利用されるため、大規模クラスタでの接続の増加を削減できます。ハードディスクが多く、ホストが少ないクラスタでは、すべてのディスクを飽和させるのに十分な並行性が得られない可能性があり、ユーザーは値を増やすことを検討する必要があるかもしれません。 | 1.2.1 |
spark.shuffle.io.preferDirectBufs |
true | (Nettyのみ) シャッフルとキャッシュブロック転送中のガベージコレクションを削減するために、オフヒープバッファが使用されます。オフヒープメモリが厳しく制限されている環境では、すべてのNettyからの割り当てをオンヒープにするために、これをオフにすることをお勧めします。 | 1.2.0 |
spark.shuffle.io.retryWait |
5s | (Nettyのみ) フェッチの再試行間の待機時間。再試行によって発生する最大遅延は、デフォルトで15秒(maxRetries * retryWait で計算)です。 |
1.2.1 |
spark.shuffle.io.backLog |
-1 | シャッフルサービスのアクセプトリストの長さ。大規模なアプリケーションでは、短時間で多数の接続が到着した場合にサービスが対応できない場合でも、着信接続がドロップされないように、この値を増やす必要がある場合があります。これは、アプリケーションの外部にある可能性のある(下記のspark.shuffle.service.enabled オプションを参照)シャッフルサービス自体が実行されている場所で構成する必要があります。1未満に設定すると、Nettyのio.netty.util.NetUtil#SOMAXCONN で定義されているOSのデフォルトに戻ります。 |
1.1.1 |
spark.shuffle.io.connectionTimeout |
spark.network.timeout の値 |
シャッフルサーバーとクライアント間の確立された接続のタイムアウト。未処理のフェッチリクエストが残っているが、少なくともconnectionTimeout の間チャネルにトラフィックがない場合、アイドル状態としてマークされ、閉じられます。 |
1.2.0 |
spark.shuffle.io.connectionCreationTimeout |
spark.shuffle.io.connectionTimeout の値 |
シャッフルサーバーとクライアント間の接続を確立するためのタイムアウト。 | 3.2.0 |
spark.shuffle.service.enabled |
false | 外部シャッフルサービスを有効にします。このサービスは、Executorによって書き込まれたシャッフルファイルを保持します(例:Executorを安全に削除できるように、またはExecutorの失敗時にシャッフルフェッチを継続できるように)。有効にするには、外部シャッフルサービスを設定する必要があります。詳細については、動的割り当ての設定とセットアップに関するドキュメントを参照してください。 | 1.2.0 |
spark.shuffle.service.port |
7337 | 外部シャッフルサービスが実行されるポート。 | 1.2.0 |
spark.shuffle.service.name |
spark_shuffle | クライアントが通信する必要があるSparkシャッフルサービスの構成済み名前。これは、YARN NodeManagerの設定(yarn.nodemanager.aux-services )内でシャッフルを構成するために使用される名前と一致する必要があります。spark.shuffle.service.enabled がtrueに設定されている場合にのみ有効になります。 |
3.2.0 |
spark.shuffle.service.index.cache.size |
100m | キャッシュエントリは、指定されたメモリフットプリント(バイト単位、特に指定がない限り)に制限されます。 | 2.3.0 |
spark.shuffle.service.removeShuffle |
false | シャッフルが不要になったときに、割り当て解除されたExecutorのシャッフルブロックの削除にExternalShuffleServiceを使用するかどうか。これが有効になっていない場合、割り当て解除されたExecutor上のシャッフルデータは、アプリケーションが終了するまでディスクに残ります。 | 3.3.0 |
spark.shuffle.maxChunksBeingTransferred |
Long.MAX_VALUE | シャッフルサービスで同時に転送できるチャンクの最大数。最大数に達すると、新しい着信接続が閉じられることに注意してください。クライアントはシャッフル再試行の設定(spark.shuffle.io.maxRetries およびspark.shuffle.io.retryWait を参照)に従って再試行しますが、これらの制限に達すると、タスクはフェッチ失敗で失敗します。 |
2.3.0 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (高度な設定) ソートベースのシャッフルマネージャーでは、マップサイド集約がなく、Reduceパーティションがこれ以下の数しかない場合、データのマージソートを回避します。 | 1.1.1 |
spark.shuffle.sort.io.plugin.class |
org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO | シャッフルIOに使用するクラスの名前。 | 3.0.0 |
spark.shuffle.spill.compress |
true | シャッフル中にスピルされたデータを圧縮するかどうか。圧縮にはspark.io.compression.codec が使用されます。 |
0.9.0 |
spark.shuffle.accurateBlockThreshold |
100 * 1024 * 1024 | HighlyCompressedMapStatus内のシャッフルブロックのサイズが正確に記録されるバイト単位のしきい値。これは、シャッフルブロックのフェッチ時にシャッフルブロックのサイズを過小評価することを回避することにより、OOMを防止するのに役立ちます。 | 2.2.1 |
spark.shuffle.registration.timeout |
5000 | 外部シャッフルサービスへの登録のタイムアウト(ミリ秒単位)。 | 2.3.0 |
spark.shuffle.registration.maxAttempts |
3 | 外部シャッフルサービスへの登録に失敗した場合、maxAttempts回再試行します。 | 2.3.0 |
spark.shuffle.reduceLocality.enabled |
true | Reduceタスクの局所性プリファレンスを計算するかどうか。 | 1.5.0 |
spark.shuffle.mapOutput.minSizeForBroadcast |
512k | マップ出力ステータスをExecutorに送信するためにブロードキャストを使用するサイズ。 | 2.0.0 |
spark.shuffle.detectCorrupt |
true | フェッチされたブロックの破損を検出するかどうか。 | 2.2.0 |
spark.shuffle.detectCorrupt.useExtraMemory |
false | 有効にすると、圧縮/暗号化されたストリームの一部が余分なメモリを使用して解凍/復号化され、早期の破損が検出されます。スローされたIOExceptionにより、タスクは一度再試行され、同じ例外で再び失敗した場合、FetchFailedExceptionがスローされ、前のステージが再試行されます。 | 3.0.0 |
spark.shuffle.useOldFetchProtocol |
false | シャッフルブロックのフェッチを行う際に古いプロトコルを使用するかどうか。新しいSparkバージョンのジョブが古いバージョンの外部シャッフルサービスからシャッフルブロックをフェッチする必要がある場合にのみ有効になります。 | 3.0.0 |
spark.shuffle.readHostLocalDisk |
true | 有効になっている場合(およびspark.shuffle.useOldFetchProtocol が無効になっている場合)、同じホストで実行されているブロックマネージャーから要求されたシャッフルブロックは、ネットワーク経由でリモートブロックとしてフェッチされるのではなく、ディスクから直接読み取られます。 |
3.0.0 |
spark.files.io.connectionTimeout |
spark.network.timeout の値 |
Spark RPC環境でファイルをフェッチするための確立された接続のタイムアウト。ダウンロードされているファイルが残っているが、少なくともconnectionTimeout の間チャネルにトラフィックがない場合、アイドル状態としてマークされ、閉じられます。 |
1.6.0 |
spark.files.io.connectionCreationTimeout |
spark.files.io.connectionTimeout の値 |
Spark RPC環境でファイルをフェッチするための接続を確立するためのタイムアウト。 | 3.2.0 |
spark.shuffle.checksum.enabled |
true | シャッフルデータのチェックサムを計算するかどうか。有効にすると、Sparkはマップ出力ファイル内の各パーティションデータのチェックサム値を計算し、その値をディスク上のチェックサムファイルに保存します。シャッフルデータの破損が検出されると、Sparkはチェックサムファイルを使用して、破損の原因(ネットワークの問題、ディスクの問題など)を診断しようとします。 | 3.2.0 |
spark.shuffle.checksum.algorithm |
ADLER32 | シャッフルチェックサムの計算に使用されるアルゴリズム。現在、JDKの組み込みアルゴリズム(ADLER32、CRC32など)のみがサポートされています。 | 3.2.0 |
spark.shuffle.service.fetch.rdd.enabled |
false | ディスクに永続化されたRDDブロックのフェッチにExternalShuffleServiceを使用するかどうか。動的割り当ての場合、この機能が有効になっていると、ディスクに永続化されたブロックのみを持つExecutorはspark.dynamicAllocation.executorIdleTimeout 後にアイドルと見なされ、それに応じて解放されます。 |
3.0.0 |
spark.shuffle.service.db.enabled |
true | ExternalShuffleServiceでデータベースを使用するかどうか。これはスタンドアロンモードのみに影響することに注意してください。 | 3.0.0 |
spark.shuffle.service.db.backend |
LEVELDB | シャッフルサービスローカルデータベースで使用されるディスクベースのストアを指定します。LEVELDBまたはROCKSDBとして設定します。 | 3.4.0 |
Spark UI
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.eventLog.logBlockUpdates.enabled |
false | spark.eventLog.enabled がtrueの場合、ブロック更新ごとにイベントをログに記録するかどうか。*警告*:これにより、イベントログのサイズが大幅に増加します。 |
2.3.0 |
spark.eventLog.longForm.enabled |
false | trueの場合、イベントログでコールサイトの長形式を使用します。それ以外の場合は、短形式を使用します。 | 2.4.0 |
spark.eventLog.compress |
false | spark.eventLog.enabled がtrueの場合、ログに記録されたイベントを圧縮するかどうか。 |
1.0.0 |
spark.eventLog.compression.codec |
zstd | ログに記録されたイベントを圧縮するコーデック。デフォルトでは、Sparkは4つのコーデックを提供します:lz4 、lzf 、snappy 、zstd 。また、完全修飾クラス名を使用してコーデックを指定することもできます(例:org.apache.spark.io.LZ4CompressionCodec 、org.apache.spark.io.LZFCompressionCodec 、org.apache.spark.io.SnappyCompressionCodec 、org.apache.spark.io.ZStdCompressionCodec )。 |
3.0.0 |
spark.eventLog.erasureCoding.enabled |
false | イベントログでイレージャコーディングを使用できるようにするか、ファイルシステムのデフォルトに関係なくイレージャコーディングをオフにするかどうか。HDFSでは、イレージャコーディングされたファイルは通常のレプリケートされたファイルほど迅速には更新されないため、アプリケーションの更新がHistory Serverに表示されるまでに時間がかかります。これがtrueであっても、Sparkはファイルをイレージャコーディングを使用するように強制しません。ファイルシステムのデフォルトを使用するだけです。 | 3.0.0 |
spark.eventLog.dir |
file:///tmp/spark-events | spark.eventLog.enabled がtrueの場合、Sparkイベントがログに記録される基本ディレクトリ。この基本ディレクトリ内では、Sparkは各アプリケーションのサブディレクトリを作成し、このディレクトリにアプリケーション固有のイベントをログに記録します。ユーザーは、履歴ファイルが履歴サーバーによって読み取れるように、HDFSディレクトリなどの統合された場所に設定することをお勧めします。 |
1.0.0 |
spark.eventLog.enabled |
false | Sparkイベントをログに記録するかどうか。アプリケーションの終了後にWeb UIを再構築するのに役立ちます。 | 1.0.0 |
spark.eventLog.overwrite |
false | 既存のファイルを上書きするかどうか。 | 1.0.0 |
spark.eventLog.buffer.kb |
100k | 出力ストリームに書き込む際に使用するバッファサイズ(KiB単位、特に指定がない限り)。 | 1.0.0 |
spark.eventLog.rolling.enabled |
false | イベントログファイルのローリングオーバーが有効になっているかどうか。trueに設定すると、各イベントログファイルが構成されたサイズに短縮されます。 | 3.0.0 |
spark.eventLog.rolling.maxFileSize |
128m | spark.eventLog.rolling.enabled=true の場合、ローリングオーバーされる前のイベントログファイルの最大サイズを指定します。 |
3.0.0 |
spark.ui.dagGraph.retainedRootRDDs |
Int.MaxValue | Spark UIとステータスAPIがガベージコレクションを行う前に、DAGグラフノードをいくつ記憶するか。 | 2.1.0 |
spark.ui.enabled |
true | SparkアプリケーションのWeb UIを実行するかどうか。 | 1.1.1 |
spark.ui.store.path |
なし | ライブUIのアプリケーション情報をキャッシュするローカルディレクトリ。デフォルトでは設定されておらず、すべてのアプリケーション情報はメモリに保持されます。 | 3.4.0 |
spark.ui.killEnabled |
true | Web UIからジョブとステージをキルすることを許可します。 | 1.0.0 |
spark.ui.liveUpdate.period |
100ms | ライブエンティティを更新する頻度。アプリケーションの再生時には-1は「更新しない」ことを意味し、最後の書き込みのみが行われます。ライブアプリケーションの場合、これは受信タスクイベントを迅速に処理する際に不要ないくつかの操作を回避します。 | 2.3.0 |
spark.ui.liveUpdate.minFlushPeriod |
1s | 古いUIデータがフラッシュされるまでの最小経過時間。受信タスクイベントが頻繁に発生しない場合、UIの陳腐化を回避します。 | 2.4.2 |
spark.ui.port |
4040 | アプリケーションのダッシュボードのポート。メモリとワークロードのデータが表示されます。 | 0.7.0 |
spark.ui.retainedJobs |
1000 | Spark UIとステータスAPIがガベージコレクションを行う前に、ジョブをいくつ記憶するか。これは目標の最大値であり、状況によってはそれより少ない要素が保持される場合があります。 | 1.2.0 |
spark.ui.retainedStages |
1000 | Spark UIとステータスAPIがガベージコレクションを行う前に、ステージをいくつ記憶するか。これは目標の最大値であり、状況によってはそれより少ない要素が保持される場合があります。 | 0.9.0 |
spark.ui.retainedTasks |
100000 | Spark UIとステータスAPIがガベージコレクションを行う前に、1つのステージ内のタスクをいくつ記憶するか。これは目標の最大値であり、状況によってはそれより少ない要素が保持される場合があります。 | 2.0.1 |
spark.ui.reverseProxy |
false | ワーカーとアプリケーションのUIのためにSpark Masterをリバースプロキシとして実行することを有効にします。このモードでは、Spark MasterはワーカーとアプリケーションのUIをリバースプロキシし、ホストへの直接アクセスを必要とせずにアクセスできるようにします。ワーカーとアプリケーションのUIには直接アクセスできなくなるため、注意して使用してください。Spark Master/プロキシの公開URLを介してのみアクセスできます。この設定は、クラスタで実行されているすべてのワーカーとアプリケーションのUIに影響し、すべてのワーカー、ドライバ、およびマスターで設定する必要があります。 | 2.1.0 |
spark.ui.reverseProxyUrl |
Spark UIを別のフロントエンドリバースプロキシを介して提供する必要がある場合、そのリバースプロキシを介してSpark Master UIにアクセスするためのURLです。これは、認証のためのプロキシ(例:OAuthプロキシ)を実行する場合に役立ちます。URLには、`http://mydomain.com/path/to/spark/` のようなパスプレフィックスを含めることができます。これにより、同じ仮想ホストとポートを介して複数のSparkクラスタと他のWebアプリケーションのUIを提供できます。通常、これはスキーム(http/https)、ホスト、およびポートを含む絶対URLである必要があります。ここで「/」から始まる相対URLを指定することも可能です。この場合、Spark UIとSpark REST APIによって生成されるすべてのURLは、サーバー相対リンクになります。これは、Spark UI全体が同じホストとポートを介して提供されるため、引き続き機能します。 この設定はSpark UIでのリンク生成に影響しますが、フロントエンドリバースプロキシは以下を担当します。
設定値には、「/」で分割した後にキーワード「proxy」または「history」を含めることができません。Spark UIは、URIからREST APIエンドポイントを取得するために、両方のキーワードに依存しています。 |
2.1.0 | |
spark.ui.proxyRedirectUri |
Sparkがプロキシの背後で実行されている場合のリダイレクト先のアドレス。これにより、Sparkはリダイレクト応答を変更して、Spark UI自身のアドレスではなく、プロキシサーバーを指すようにします。これは、アプリケーションのプレフィックスパスを含まないサーバーのアドレスのみでなければなりません。プレフィックスは、プロキシサーバー自体(`X-Forwarded-Context`リクエストヘッダーを追加することによって)、またはSparkアプリケーションの設定でプロキシベースを設定することによって設定する必要があります。 | 3.0.0 | |
spark.ui.showConsoleProgress |
false | コンソールにプログレスバーを表示します。プログレスバーは、500ms以上実行されるステージの進捗状況を表示します。複数のステージが同時に実行される場合、同じ行に複数のプログレスバーが表示されます。 注記: シェル環境では、spark.ui.showConsoleProgressのデフォルト値はtrueです。 |
1.2.1 |
spark.ui.custom.executor.log.url |
(なし) | クラスタマネージャのアプリケーションログURLの代わりに、外部ログサービスをサポートするためのカスタムSpark executorログURLを指定します。Sparkは、クラスタマネージャによって異なる可能性のあるパターンを介していくつかのパス変数をサポートします。サポートされているパターンがあるかどうかを確認するには、クラスタマネージャのドキュメントを確認してください。 この設定はイベントログ内の元のログURLも置き換えることに注意してください。これは、履歴サーバーでアプリケーションにアクセスする場合にも有効です。新しいログURLは永続的である必要があります。そうでないと、executorログURLのリンクが切れる可能性があります。 現時点では、YARNモードのみがこの設定をサポートしています。 |
3.0.0 |
spark.worker.ui.retainedExecutors |
1000 | Spark UIとステータスAPIがガベージコレクションを行う前に、完了したexecutorをいくつ記憶するか。 | 1.5.0 |
spark.worker.ui.retainedDrivers |
1000 | Spark UIとステータスAPIがガベージコレクションを行う前に、完了したドライバをいくつ記憶するか。 | 1.5.0 |
spark.sql.ui.retainedExecutions |
1000 | Spark UIとステータスAPIがガベージコレクションを行う前に、完了した実行をいくつ記憶するか。 | 1.5.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark UIとステータスAPIがガベージコレクションを行う前に、完了したバッチをいくつ記憶するか。 | 1.0.0 |
spark.ui.retainedDeadExecutors |
100 | Spark UIとステータスAPIがガベージコレクションを行う前に、死んでいるexecutorをいくつ記憶するか。 | 2.0.0 |
spark.ui.filters |
なし | Spark Web UIに適用するフィルターのクラス名のカンマ区切りリスト。フィルターは標準のjavax servlet Filterである必要があります。 フィルターパラメータは、`spark.<クラス名>.param.<パラメータ名>=<値>`という形式の設定エントリを設定することで、設定にも指定できます。 例えば spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
|
1.0.0 |
spark.ui.requestHeaderSize |
8k | HTTPリクエストヘッダーの最大許容サイズ(バイト単位、特に指定がない限り)。この設定はSpark History Serverにも適用されます。 | 2.2.3 |
spark.ui.timelineEnabled |
true | UIページにイベントタイムラインデータを表示するかどうか。 | 3.4.0 |
spark.ui.timeline.executors.maximum |
250 | イベントタイムラインに表示されるexecutorの最大数。 | 3.2.0 |
spark.ui.timeline.jobs.maximum |
500 | イベントタイムラインに表示されるジョブの最大数。 | 3.2.0 |
spark.ui.timeline.stages.maximum |
500 | イベントタイムラインに表示されるステージの最大数。 | 3.2.0 |
spark.ui.timeline.tasks.maximum |
1000 | イベントタイムラインに表示されるタスクの最大数。 | 1.4.0 |
spark.appStatusStore.diskStoreDir |
なし | SQL実行の診断情報を保存するローカルディレクトリ。この設定はライブUI専用です。 | 3.4.0 |
圧縮とシリアライゼーション
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.broadcast.compress |
true | ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。圧縮には`spark.io.compression.codec`が使用されます。 | 0.6.0 |
spark.checkpoint.compress |
false | RDDチェックポイントを圧縮するかどうか。一般的に良い考えです。圧縮には`spark.io.compression.codec`が使用されます。 | 2.2.0 |
spark.io.compression.codec |
lz4 | RDDパーティション、イベントログ、ブロードキャスト変数、シャッフル出力などの内部データを圧縮するために使用されるコーデック。デフォルトでは、Sparkは4つのコーデックを提供します。`lz4`、`lzf`、`snappy`、`zstd`。また、完全修飾クラス名を使用してコーデックを指定することもできます(例:`org.apache.spark.io.LZ4CompressionCodec`、`org.apache.spark.io.LZFCompressionCodec`、`org.apache.spark.io.SnappyCompressionCodec`、`org.apache.spark.io.ZStdCompressionCodec`)。 | 0.8.0 |
spark.io.compression.lz4.blockSize |
32k | LZ4圧縮コーデックを使用する場合のLZ4圧縮で使用されるブロックサイズ。このブロックサイズを小さくすると、LZ4を使用する場合のシャッフルメモリ使用量も小さくなります。特に指定がない限り、デフォルト単位はバイトです。この設定は`spark.io.compression.codec`のみに適用されます。 | 1.4.0 |
spark.io.compression.snappy.blockSize |
32k | Snappy圧縮コーデックを使用する場合のSnappy圧縮でのブロックサイズ。このブロックサイズを小さくすると、Snappyを使用する場合のシャッフルメモリ使用量も小さくなります。特に指定がない限り、デフォルト単位はバイトです。この設定は`spark.io.compression.codec`のみに適用されます。 | 1.4.0 |
spark.io.compression.zstd.level |
1 | Zstd圧縮コーデックの圧縮レベル。圧縮レベルを上げると、より良い圧縮が得られますが、CPUとメモリの消費が増えます。この設定は`spark.io.compression.codec`のみに適用されます。 | 2.3.0 |
spark.io.compression.zstd.bufferSize |
32k | Zstd圧縮コーデックを使用する場合のZstd圧縮で使用されるバイト単位のバッファサイズ。このサイズを小さくすると、Zstdを使用する場合のシャッフルメモリ使用量が少なくなりますが、過剰なJNI呼び出しオーバーヘッドのために圧縮コストが増加する可能性があります。この設定は`spark.io.compression.codec`のみに適用されます。 | 2.3.0 |
spark.io.compression.zstd.bufferPool.enabled |
true | trueの場合、ZSTD JNIライブラリのバッファプールを有効にします。 | 3.2.0 |
spark.kryo.classesToRegister |
(なし) | Kryoシリアル化を使用する場合は、Kryoに登録するカスタムクラス名のカンマ区切りのリストを指定します。詳細については、チューニングガイドを参照してください。 | 1.2.0 |
spark.kryo.referenceTracking |
true | Kryoを使用してデータをシリアル化するときに、同じオブジェクトへの参照を追跡するかどうか。オブジェクトグラフにループがある場合に必要であり、同じオブジェクトの複数のコピーが含まれている場合は効率のために役立ちます。この状況でないことがわかっている場合は、パフォーマンスを向上させるために無効にすることができます。 | 0.8.0 |
spark.kryo.registrationRequired |
false | Kryoへの登録を要求するかどうか。'true'に設定されている場合、登録されていないクラスがシリアル化されると、Kryoは例外をスローします。false(デフォルト)に設定されている場合、Kryoは各オブジェクトと共に登録されていないクラス名を書き込みます。クラス名を書き込むと、パフォーマンスに大きなオーバーヘッドが生じる可能性があるため、このオプションを有効にすることで、ユーザーが登録からクラスを省略していないことを厳密に強制できます。 | 1.1.0 |
spark.kryo.registrator |
(なし) | Kryoシリアル化を使用する場合は、カスタムクラスをKryoに登録するクラスのカンマ区切りのリストを指定します。このプロパティは、カスタムの方法でクラスを登録する必要がある場合(例:カスタムフィールドシリアライザを指定する場合)に役立ちます。そうでない場合は、`spark.kryo.classesToRegister`の方が簡単です。 KryoRegistrator を拡張するクラスに設定する必要があります。詳細については、チューニングガイドを参照してください。 |
0.5.0 |
spark.kryo.unsafe |
true | UnsafeベースのKryoシリアライザを使用するかどうか。UnsafeベースのIOを使用することで、大幅に高速化できます。 | 2.1.0 |
spark.kryoserializer.buffer.max |
64m | Kryoシリアル化バッファの最大許容サイズ(MiB単位、特に指定がない限り)。これは、シリアル化しようとするオブジェクトよりも大きくなければならず、2048m未満でなければなりません。Kryo内で「バッファ制限を超えました」という例外が発生する場合は、これを増やしてください。 | 1.4.0 |
spark.kryoserializer.buffer |
64k | Kryoのシリアル化バッファの初期サイズ(KiB単位、特に指定がない限り)。各ワーカーの各コアに1つのバッファがあることに注意してください。必要に応じて、このバッファは`spark.kryoserializer.buffer.max`まで増加します。 | 1.4.0 |
spark.rdd.compress |
false | シリアル化されたRDDパーティションを圧縮するかどうか(例:JavaとScalaでは`StorageLevel.MEMORY_ONLY_SER`、Pythonでは`StorageLevel.MEMORY_ONLY`)。追加のCPU時間のコストで、かなりのスペースを節約できます。圧縮には`spark.io.compression.codec`が使用されます。 | 0.6.0 |
spark.serializer |
org.apache.spark.serializer. JavaSerializer |
ネットワーク経由で送信されるか、シリアル化された形式でキャッシュする必要があるオブジェクトのシリアル化に使用されるクラスです。Javaのデフォルトのシリアル化は、任意のSerializableなJavaオブジェクトで動作しますが、かなり遅いため、速度が必要な場合はorg.apache.spark.serializer.KryoSerializer の使用とKryoシリアル化の設定をお勧めします。org.apache.spark.Serializer の任意のサブクラスにすることができます。 |
0.5.0 |
spark.serializer.objectStreamReset |
100 | org.apache.spark.serializer.JavaSerializer を使用してシリアル化する際、シリアライザは冗長なデータの書き込みを防ぐためにオブジェクトをキャッシュしますが、これにより、それらのオブジェクトのガベージコレクションが停止します。「reset」を呼び出すことで、シリアライザからその情報をフラッシュし、古いオブジェクトが収集されるようにします。この定期的なリセットをオフにするには、-1に設定します。デフォルトでは、100個のオブジェクトごとにシリアライザをリセットします。 |
1.0.0 |
メモリ管理
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.memory.fraction |
0.6 | (ヒープスペース - 300MB)の実行とストレージに使用される割合です。この値が低いほど、スピルとキャッシュされたデータの削除が頻繁に発生します。この設定の目的は、内部メタデータ、ユーザーデータ構造、およびスパースで異常に大きなレコードの場合の不正確なサイズ推定のためにメモリを確保することです。デフォルト値のままにすることをお勧めします。この値を増やす場合のJVMガベージコレクションの正しいチューニングに関する重要な情報を含む詳細については、この説明を参照してください。 | 1.6.0 |
spark.memory.storageFraction |
0.5 | spark.memory.fraction によって確保された領域のサイズに対する割合として表される、削除されないストレージメモリの量です。この値が高いほど、実行に使用できるワーキングメモリが少なくなり、タスクがディスクにスピルする頻度が高くなる可能性があります。デフォルト値のままにすることをお勧めします。詳細については、この説明を参照してください。 |
1.6.0 |
spark.memory.offHeap.enabled |
false | trueの場合、Sparkは特定の操作にオフヒープメモリを使用しようとします。オフヒープメモリの使用が有効になっている場合、spark.memory.offHeap.size は正の値である必要があります。 |
1.6.0 |
spark.memory.offHeap.size |
0 | オフヒープ割り当てに使用できるメモリの絶対量(バイト単位、特に指定がない限り)。この設定はヒープメモリの使用量には影響を与えません。そのため、エグゼキュータの総メモリ消費量が特定の上限内に収まる必要がある場合は、JVMヒープサイズをそれに応じて縮小してください。spark.memory.offHeap.enabled=true の場合は、正の値に設定する必要があります。 |
1.6.0 |
spark.storage.unrollMemoryThreshold |
1024 * 1024 | ブロックの展開を開始する前に要求する初期メモリ。 | 1.1.0 |
spark.storage.replication.proactive |
false | RDDブロックのプロアクティブなブロックレプリケーションを有効にします。エグゼキュータの障害により失われたキャッシュされたRDDブロックレプリカは、既存の利用可能なレプリカがあれば補充されます。これにより、ブロックのレプリケーションレベルを初期の数に近づけようとします。 | 2.2.0 |
spark.storage.localDiskByExecutors.cacheSize |
1000 | ローカルディレクトリが保存されるエグゼキュータの最大数です。このサイズは、ドライバーとエグゼキュータの両方で適用され、無制限のストアにならないようにします。このキャッシュは、同じホストからディスクに永続化されたRDDブロックまたはシャッフルブロック(spark.shuffle.readHostLocalDisk が設定されている場合)を取得する場合に、ネットワークを回避するために使用されます。 |
3.0.0 |
spark.cleaner.periodicGC.interval |
30分 | ガベージコレクションをトリガーする頻度を制御します。 このコンテキストクリーナーは、弱い参照がガベージコレクションされた場合にのみクリーンアップをトリガーします。ドライバーJVMが大きく、ドライバーにメモリプレッシャーが少ない長期間実行されるアプリケーションでは、これは非常にまれにしか発生しないか、まったく発生しません。まったくクリーンアップしないと、しばらくしてエグゼキュータのディスク容量が不足する可能性があります。 |
1.6.0 |
spark.cleaner.referenceTracking |
true | コンテキストクリーニングの有効化または無効化。 | 1.0.0 |
spark.cleaner.referenceTracking.blocking |
true | クリーンアップスレッドがクリーンアップタスクをブロックするかどうかを制御します(シャッフルを除く。シャッフルはspark.cleaner.referenceTracking.blocking.shuffle Sparkプロパティによって制御されます)。 |
1.0.0 |
spark.cleaner.referenceTracking.blocking.shuffle |
false | クリーンアップスレッドがシャッフルクリーンアップタスクをブロックするかどうかを制御します。 | 1.1.1 |
spark.cleaner.referenceTracking.cleanCheckpoints |
false | 参照がスコープ外の場合にチェックポイントファイルをクリーンアップするかどうかを制御します。 | 1.4.0 |
実行動作
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.broadcast.blockSize |
4MB | TorrentBroadcastFactory のブロックの各部分のサイズ(特に指定がない限りKiB単位)。値が大きすぎると、ブロードキャスト中の並列処理が低下します(遅くなります)。ただし、小さすぎると、BlockManager のパフォーマンスが低下する可能性があります。 |
0.5.0 |
spark.broadcast.checksum |
true | ブロードキャストのチェックサムを有効にするかどうか。有効にすると、ブロードキャストにはチェックサムが含まれ、少し多くのデータの計算と送信のコストがかかりますが、破損したブロックの検出に役立ちます。ネットワークにブロードキャスト中にデータが破損しないことを保証する他のメカニズムがある場合は、無効にすることができます。 | 2.1.1 |
spark.broadcast.UDFCompressionThreshold |
1 * 1024 * 1024 | ユーザー定義関数(UDF)とPython RDDコマンドがブロードキャストによって圧縮されるしきい値(バイト単位、特に指定がない限り)。 | 3.0.0 |
spark.executor.cores |
YARNモードでは1、スタンドアロンモードとMesos粗粒度モードではワーカーで使用可能なすべてのコア。 | 各エグゼキュータで使用されるコアの数です。スタンドアロンモードとMesos粗粒度モードの詳細については、この説明を参照してください。 | 1.0.0 |
spark.default.parallelism |
reduceByKey やjoin などの分散シャッフル操作では、親RDD内のパーティションの最大数です。親RDDのないparallelize などの操作では、クラスタマネージャーによって異なります。
|
ユーザーによって設定されていない場合、join 、reduceByKey 、parallelize などの変換によって返されるRDDのデフォルトのパーティション数。 |
0.5.0 |
spark.executor.heartbeatInterval |
10秒 | 各エグゼキュータからドライバーへのハートビート間のインターバルです。ハートビートにより、ドライバーはエグゼキュータがまだ稼働中であることを認識し、進行中のタスクのメトリクスで更新されます。spark.executor.heartbeatInterval はspark.network.timeout よりも大幅に短くする必要があります。 |
1.1.0 |
spark.files.fetchTimeout |
60秒 | SparkContext.addFile() を使用してドライバーから追加されたファイルを取得する際に使用する通信タイムアウト。 |
1.0.0 |
spark.files.useFetchCache |
true | trueに設定されている場合(デフォルト)、ファイルの取得は、同じアプリケーションに属するエグゼキュータによって共有されるローカルキャッシュを使用します。これにより、同じホストで多くのエグゼキュータを実行する場合、タスクの起動パフォーマンスが向上する可能性があります。falseに設定すると、これらのキャッシング最適化が無効になり、すべてエグゼキュータがファイルの独自の複製を取得します。NFSファイルシステムにあるSparkのローカルディレクトリを使用するために、この最適化を無効にする場合があります(詳細についてはSPARK-6313を参照)。 | 1.2.2 |
spark.files.overwrite |
false | 起動時に存在するファイルを上書きするかどうか。ユーザーは、このオプションがtrue に設定されている場合でも、SparkContext.addFile またはSparkContext.addJar によって追加されたファイルは上書きできません。 |
1.0.0 |
spark.files.ignoreCorruptFiles |
false | 破損したファイルを無視するかどうか。trueの場合、破損したファイルまたは存在しないファイルに遭遇してもSparkジョブは実行を継続し、読み取られた内容は返されます。 | 2.1.0 |
spark.files.ignoreMissingFiles |
false | 欠損ファイルを無視するかどうか。trueの場合、欠損ファイルに遭遇してもSparkジョブは実行を継続し、読み取られた内容は返されます。 | 2.4.0 |
spark.files.maxPartitionBytes |
134217728 (128 MiB) | ファイルを読み込むときに単一のパーティションにパックできるバイト数の最大値。 | 2.1.0 |
spark.files.openCostInBytes |
4194304 (4 MiB) | ファイルを開くための推定コスト(同時にスキャンできるバイト数で測定)。複数のファイルをパーティションに入れる際に使用されます。過大評価の方が良く、小さなファイルを含むパーティションの方が大きなファイルを含むパーティションよりも高速になります。 | 2.1.0 |
spark.hadoop.cloneConf |
false | trueに設定されている場合、各タスクに対して新しいHadoop Configuration オブジェクトを複製します。このオプションは、Configuration のスレッドセーフティの問題を回避するために有効にする必要があります(詳細についてはSPARK-2546を参照)。これらの問題の影響を受けないジョブのパフォーマンスの予期しない低下を回避するために、デフォルトでは無効になっています。 |
1.0.3 |
spark.hadoop.validateOutputSpecs |
true | trueに設定されている場合、saveAsHadoopFile およびその他のバリアントで使用される出力仕様を検証します(たとえば、出力ディレクトリが既に存在するかどうかをチェックします)。これは、既存の出力ディレクトリに起因する例外を抑制するために無効にすることができます。ユーザーは、以前のバージョンのSparkとの互換性を達成しようとする場合を除き、これを無効にしないことをお勧めします。HadoopのFileSystem APIを使用して、手動で出力ディレクトリを削除するだけです。この設定は、Spark StreamingのStreamingContext によって生成されたジョブについては無視されます。データは、チェックポイント復旧中に既存の出力ディレクトリに書き直す必要がある可能性があるためです。 |
1.0.1 |
spark.storage.memoryMapThreshold |
2MB | ディスクからブロックを読み込むときにSparkがメモリマップするブロックのサイズ。デフォルト単位はバイトです(特に指定がない限り)。これにより、Sparkが非常に小さなブロックをメモリマップすることを防ぎます。一般的に、メモリマッピングは、オペレーティングシステムのページサイズに近いまたはそれ以下のブロックではオーバーヘッドが高くなります。 | 0.9.2 |
spark.storage.decommission.enabled |
false | エグゼキュータの廃止時にブロックマネージャーを廃止するかどうか。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.enabled |
true | ブロックマネージャーの廃止中にシャッフルブロックを転送するかどうか。移行可能なシャッフルリゾルバー(ソートベースのシャッフルなど)が必要です。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.maxThreads |
8 | シャッフルファイルの移行に使用できるスレッドの最大数。 | 3.1.0 |
spark.storage.decommission.rddBlocks.enabled |
true | ブロックマネージャーの廃止中にRDDブロックを転送するかどうか。 | 3.1.0 |
spark.storage.decommission.fallbackStorage.path |
(なし) | ブロックマネージャーの廃止中のフォールバックストレージの場所。たとえば、s3a://spark-storage/ 。空の場合、フォールバックストレージは無効になります。ストレージはTTLによって管理する必要があります。Sparkはクリーンアップしません。 |
3.1.0 |
spark.storage.decommission.fallbackStorage.cleanUp |
false | trueの場合、Sparkはシャットダウン時にフォールバックストレージデータをクリーンアップします。 | 3.2.0 |
spark.storage.decommission.shuffleBlocks.maxDiskSize |
(なし) | リモートシャッフルブロックを拒否する前に、シャッフルブロックの保存に使用するディスク容量の最大値。リモートシャッフルブロックを拒否するということは、エグゼキュータがシャッフルの移行を受け取らなくなることを意味し、移行可能な他のエグゼキュータがない場合、spark.storage.decommission.fallbackStorage.path が設定されていない限り、シャッフルブロックは失われます。 |
3.2.0 |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version |
1 | ファイル出力コミッタアルゴリズムのバージョン、有効なアルゴリズムバージョン番号:1または2。2はMAPREDUCE-7282のような正しさの問題を引き起こす可能性があることに注意してください。 | 2.2.0 |
エグゼキュータメトリクス
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.eventLog.logStageExecutorMetrics |
false | エグゼキュータメトリクスのステージごとのピーク(エグゼキュータごと)をイベントログに書き込むかどうか。 注:メトリクスはポーリング(収集)され、エグゼキュータのハートビートで送信されます。これは常に実行されます。この設定は、集計されたメトリクスのピークがイベントログに書き込まれるかどうかを決定するためだけです。 |
3.0.0 |
spark.executor.processTreeMetrics.enabled |
false | エグゼキュータメトリクスを収集する際に、プロセスツリーメトリクス(/procファイルシステムから)を収集するかどうか。 注:プロセスツリーメトリクスは、/procファイルシステムが存在する場合にのみ収集されます。 |
3.0.0 |
spark.executor.metrics.pollingInterval |
0 | エグゼキュータメトリクスを収集する頻度(ミリ秒単位)。 0の場合、ポーリングはエグゼキュータのハートビート(したがって、 spark.executor.heartbeatInterval で指定されたハートビートインターバル)で行われます。正の場合、ポーリングはこのインターバルで行われます。 |
3.0.0 |
spark.eventLog.gcMetrics.youngGenerationGarbageCollectors |
Copy、PS Scavenge、ParNew、G1 Young Generation | サポートされているYoung Generationガベージコレクタの名前。名前は通常、GarbageCollectorMXBean.getNameの戻り値です。ビルトインのYoung Generationガベージコレクタは、Copy、PS Scavenge、ParNew、G1 Young Generationです。 | 3.0.0 |
spark.eventLog.gcMetrics.oldGenerationGarbageCollectors |
MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generation | サポートされているOld Generationガベージコレクタの名前。名前は通常、GarbageCollectorMXBean.getNameの戻り値です。ビルトインのOld Generationガベージコレクタは、MarkSweepCompact、PS MarkSweep、ConcurrentMarkSweep、G1 Old Generationです。 | 3.0.0 |
spark.executor.metrics.fileSystemSchemes |
file、hdfs | エグゼキュータメトリクスでレポートするファイルシステムスキーム。 | 3.1.0 |
ネットワーク
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.rpc.message.maxSize |
128 | "制御プレーン"通信で許可される最大メッセージサイズ(MiB単位)。一般的には、エグゼキュータとドライバ間で送受信されるマップ出力サイズ情報にのみ適用されます。多数のマップタスクとリダイスタスクを実行し、RPCメッセージサイズに関するメッセージが表示される場合は、この値を増やしてください。 | 2.0.0 |
spark.blockManager.port |
(ランダム) | すべてのブロックマネージャがリスンするポート。ドライバとエグゼキュータの両方で存在します。 | 1.1.0 |
spark.driver.blockManager.port |
(spark.blockManager.portの値) | エグゼキュータと同じ設定を使用できない場合に、ブロックマネージャがリスンするドライバ固有のポート。 | 2.1.0 |
spark.driver.bindAddress |
(spark.driver.hostの値) | リスニングソケットをバインドするホスト名またはIPアドレス。この設定は、SPARK_LOCAL_IP環境変数を上書きします(下記参照)。 また、エグゼキュータまたは外部システムにアドバタイズするローカルアドレスとは異なるアドレスを許可します。これは、たとえば、ブリッジされたネットワークを持つコンテナを実行する場合に役立ちます。これが正しく動作するには、ドライバで使用される異なるポート(RPC、ブロックマネージャ、UI)をコンテナのホストから転送する必要があります。 |
2.1.0 |
spark.driver.host |
(ローカルホスト名) | ドライバのホスト名またはIPアドレス。エグゼキュータとスタンドアロンマスターとの通信に使用されます。 | 0.7.0 |
spark.driver.port |
(ランダム) | ドライバがリスンするポート。エグゼキュータとスタンドアロンマスターとの通信に使用されます。 | 0.7.0 |
spark.rpc.io.backLog |
64 | RPCサーバのアクセプトキューの長さ。大規模なアプリケーションでは、短時間で多数の接続が到着した場合に受信接続がドロップされないように、この値を増やす必要がある場合があります。 | 3.0.0 |
spark.network.timeout |
120秒 | すべてのネットワークインタラクションのデフォルトのタイムアウト。spark.storage.blockManagerHeartbeatTimeoutMs 、spark.shuffle.io.connectionTimeout 、spark.rpc.askTimeout 、spark.rpc.lookupTimeout が設定されていない場合は、この設定が使用されます。 |
1.3.0 |
spark.network.timeoutInterval |
60秒 | ドライバがチェックして、死活監視されていないエグゼキュータを期限切れにする間隔。 | 1.3.2 |
spark.network.io.preferDirectBufs |
true | 有効にすると、共有アロケータによるオフヒープバッファ割り当てが優先されます。オフヒープバッファは、シャッフルとキャッシュブロック転送中のガベージコレクションを削減するために使用されます。オフヒープメモリが厳しく制限されている環境では、すべての割り当てをオンヒープにするために、これをオフにすることをお勧めします。 | 3.0.0 |
spark.port.maxRetries |
16 | 諦める前にポートにバインドするときの最大再試行回数。ポートに特定の値(0以外)が指定されている場合、後続の再試行ごとに、前の試行で使用されたポートに1を加算して再試行します。これにより、指定された開始ポートからポート + maxRetriesまでのポート範囲を試行できます。 | 1.1.1 |
spark.rpc.askTimeout |
spark.network.timeout |
RPC ask操作がタイムアウトするまで待機する期間。 | 1.4.0 |
spark.rpc.lookupTimeout |
120秒 | RPCリモートエンドポイントルックアップ操作がタイムアウトするまで待機する期間。 | 1.4.0 |
spark.network.maxRemoteBlockSizeFetchToMem |
200MB | ブロックのサイズがバイト単位でこのしきい値を超えると、リモートブロックはディスクにフェッチされます。これは、巨大なリクエストがメモリを消費しすぎるのを防ぐためです。この設定は、シャッフルフェッチとブロックマネージャのリモートブロックフェッチの両方に影響することに注意してください。外部シャッフルサービスを有効にしたユーザーの場合、この機能は外部シャッフルサービスが少なくとも2.3.0の場合にのみ機能します。 | 3.0.0 |
spark.rpc.io.connectionTimeout |
spark.network.timeout の値 |
未処理のRPCリクエストがあるがチャネルでトラフィックがない場合に、アイドル状態としてマークされ、クローズされるRPCピア間の確立された接続のタイムアウト。少なくとも`connectionTimeout`の間。 | 1.2.0 |
spark.rpc.io.connectionCreationTimeout |
spark.rpc.io.connectionTimeout の値 |
RPCピア間の接続を確立するためのタイムアウト。 | 3.2.0 |
スケジューリング
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.cores.max |
(設定なし) | スタンドアロンデプロイクラスタまたは"粗粒度"共有モードのMesosクラスタで実行する場合、クラスタ全体からアプリケーションに要求するCPUコアの最大数(各マシンからではありません)。設定されていない場合、デフォルトはSparkのスタンドアロンクラスタマネージャではspark.deploy.defaultCores 、Mesosでは無限大(使用可能なすべてのコア)になります。 |
0.6.0 |
spark.locality.wait |
3秒 | データローカルタスクを起動する前に諦めて、ローカル性の低いノードで起動するまで待機する時間。同じ待機時間が複数のローカルレベル(プロセスローカル、ノードローカル、ラックローカル、その後任意)をステップ実行するためにも使用されます。spark.locality.wait.node などを設定して、各レベルの待機時間をカスタマイズすることもできます。タスクが長く、ローカル性が低い場合はこの設定を増やす必要がありますが、デフォルトは通常うまく機能します。 |
0.5.0 |
spark.locality.wait.node |
spark.locality.wait | ノードローカル性の待ち時間をカスタマイズします。たとえば、これを0に設定してノードローカル性をスキップし、(クラスタにラック情報がある場合)すぐにラックローカル性を検索できます。 | 0.8.0 |
spark.locality.wait.process |
spark.locality.wait | プロセスローカル性の待ち時間をカスタマイズします。これは、特定のエグゼキュータプロセスでキャッシュされたデータにアクセスしようとするタスクに影響します。 | 0.8.0 |
spark.locality.wait.rack |
spark.locality.wait | ラックローカル性の待ち時間をカスタマイズします。 | 0.8.0 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30秒 | スケジューリングを開始する前に、リソースが登録されるのを待つ最大時間。 | 1.1.1 |
spark.scheduler.minRegisteredResourcesRatio |
KUBERNETESモードの場合0.8、YARNモードの場合0.8、スタンドアロンモードとMesos粗粒度モードの場合0.0 | スケジューリングを開始する前に待つ、登録済みリソースの最小比率(登録済みリソース/合計予想リソース)(リソースはYARNモードとKubernetesモードではエグゼキュータ、スタンドアロンモードとMesos粗粒度モードではCPUコアです。['spark.cores.max'の値はMesos粗粒度モードの合計予想リソースです])。0.0と1.0の間の倍数として指定します。最小リソース比率に達したかどうかに関係なく、スケジューリングを開始する前に待つ最大時間は、設定spark.scheduler.maxRegisteredResourcesWaitingTime によって制御されます。 |
1.1.1 |
spark.scheduler.mode |
FIFO | 同じSparkContextに送信されたジョブ間のスケジューリングモード。ジョブを順番にキューイングする代わりにフェアシェアリングを使用するには、FAIR に設定できます。マルチユーザーサービスに役立ちます。 |
0.8.0 |
spark.scheduler.revive.interval |
1s | スケジューラがワーカリソースオファーを再開してタスクを実行するための間隔の長さ。 | 0.8.1 |
spark.scheduler.listenerbus.eventqueue.capacity |
10000 | イベントキューのデフォルト容量。Sparkは、最初に`spark.scheduler.listenerbus.eventqueue.queueName.capacity`で指定された容量を使用してイベントキューを初期化しようとします。設定されていない場合は、この設定で指定されたデフォルト容量を使用します。容量は0より大きくする必要があります。リスナーイベントがドロップされる場合は、値を増やすことを検討してください(例:20000)。この値を増やすと、ドライバのメモリ消費量が増える可能性があります。 | 2.3.0 |
spark.scheduler.listenerbus.eventqueue.shared.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
リスナーバスに登録された外部リスナーのイベントを保持するSparkリスナーバスの共有イベントキューの容量。共有キューに対応するリスナーイベントがドロップされる場合は、値を増やすことを検討してください。この値を増やすと、ドライバのメモリ消費量が増える可能性があります。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.appStatus.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
内部アプリケーションステータスリスナーのイベントを保持するappStatusイベントキューの容量。appStatusキューに対応するリスナーイベントがドロップされる場合は、値を増やすことを検討してください。この値を増やすと、ドライバのメモリ消費量が増える可能性があります。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
内部エグゼキュータ管理リスナーのイベントを保持するSparkリスナーバスのエグゼキュータ管理イベントキューの容量。executorManagementキューに対応するリスナーイベントがドロップされる場合は、値を増やすことを検討してください。この値を増やすと、ドライバのメモリ消費量が増える可能性があります。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.eventLog.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
イベントログにイベントを書き込むイベントロギングリスナーのイベントを保持するSparkリスナーバスのeventLogキューの容量。eventLogキューに対応するリスナーイベントがドロップされる場合は、値を増やすことを検討してください。この値を増やすと、ドライバのメモリ消費量が増える可能性があります。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.streams.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
内部ストリーミングリスナーのイベントを保持するSparkリスナーバスのstreamsキューの容量。streamsキューに対応するリスナーイベントがドロップされる場合は、値を増やすことを検討してください。この値を増やすと、ドライバのメモリ消費量が増える可能性があります。 | 3.0.0 |
spark.scheduler.resource.profileMergeConflicts |
false | "true"に設定されている場合、Sparkは、単一のステージに結合されるRDDで異なるプロファイルが指定されている場合にResourceProfilesをマージします。マージされると、Sparkは各リソースの最大値を選択し、新しいResourceProfileを作成します。デフォルトのfalseでは、同じステージに入るRDDで複数の異なるResourceProfilesが見つかった場合、Sparkは例外をスローします。 | 3.1.0 |
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout |
120秒 | タスクの失敗によりすべての実行者が除外されているためスケジュール不可能なTaskSetを中止する前に、新しい実行者を取得してタスクをスケジュールするために待つタイムアウト(秒単位)。 | 2.4.1 |
spark.standalone.submit.waitAppCompletion |
false | "true"に設定されている場合、Sparkは、単一のステージに結合されるRDDで異なるプロファイルが指定されている場合にResourceProfilesをマージします。マージされると、Sparkは各リソースの最大値を選択し、新しいResourceProfileを作成します。デフォルトのfalseでは、同じステージに入るRDDで複数の異なるResourceProfilesが見つかった場合、Sparkは例外をスローします。 | 3.1.0 |
spark.excludeOnFailure.enabled |
false | "true"に設定すると、タスクの失敗が多すぎるために除外されたexecutor上で、Sparkがタスクのスケジュールを実行することを防止します。executorとノードを除外するアルゴリズムは、他の"spark.excludeOnFailure"設定オプションによってさらに制御できます。 | 2.1.0 |
spark.excludeOnFailure.timeout |
1h | (実験的) ノードまたはexecutorが、新しいタスクの実行を試みるために除外リストから無条件に削除される前に、アプリケーション全体で除外される期間。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor |
1 | (実験的) 特定のタスクについて、executorがそのタスクのために除外される前に、1つのexecutor上で再試行できる回数。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerNode |
2 | (実験的) 特定のタスクについて、ノード全体がそのタスクのために除外される前に、1つのノード上で再試行できる回数。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor |
2 | (実験的) 1つのステージ内で、1つのexecutor上で失敗する必要がある異なるタスクの数(そのexecutorがそのステージのために除外される前に)。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode |
2 | (実験的) 特定のステージで除外されたexecutorの数(ノード全体がそのステージで失敗とマークされる前に)。 | 2.1.0 |
spark.excludeOnFailure.application.maxFailedTasksPerExecutor |
2 | (実験的) 成功したタスクセットにおいて、1つのexecutor上で失敗する必要がある異なるタスクの数(そのexecutorがアプリケーション全体で除外される前に)。除外されたexecutorは、`spark.excludeOnFailure.timeout`で指定されたタイムアウト後に、自動的に利用可能なリソースのプールに追加されます。ただし、動的割り当てでは、executorがアイドル状態としてマークされ、クラスタマネージャーによって回収される可能性があります。 | 2.2.0 |
spark.excludeOnFailure.application.maxFailedExecutorsPerNode |
2 | (実験的) アプリケーション全体で除外する必要がある異なるexecutorの数(ノード全体がアプリケーション全体で除外される前に)。除外されたノードは、`spark.excludeOnFailure.timeout`で指定されたタイムアウト後に、自動的に利用可能なリソースのプールに追加されます。ただし、動的割り当てでは、ノード上のexecutorがアイドル状態としてマークされ、クラスタマネージャーによって回収される可能性があります。 | 2.2.0 |
spark.excludeOnFailure.killExcludedExecutors |
false | (実験的) "true"に設定すると、フェッチ失敗時またはアプリケーション全体で除外された場合に、Sparkがexecutorを自動的にキルすることを許可します(`spark.killExcludedExecutors.application.*`で制御)。ノード全体が除外されると、そのノード上のすべてのexecutorがキルされます。 | 2.2.0 |
spark.excludeOnFailure.application.fetchFailure.enabled |
false | (実験的) "true"に設定すると、フェッチ失敗が発生した場合、Sparkはexecutorを直ちに除外します。外部シャッフルサービスが有効になっている場合、ノード全体が除外されます。 | 2.3.0 |
spark.speculation |
false | "true"に設定すると、タスクの投機的実行を実行します。これは、ステージで1つ以上のタスクが遅れている場合、それらが再起動されることを意味します。 | 0.6.0 |
spark.speculation.interval |
100ms | Sparkが投機対象のタスクをチェックする頻度。 | 0.6.0 |
spark.speculation.multiplier |
1.5 | 投機対象と見なされるタスクが中央値よりも遅い倍率。 | 0.6.0 |
spark.speculation.quantile |
0.75 | 特定のステージで投機が有効になる前に完了する必要があるタスクの割合。 | 0.6.0 |
spark.speculation.minTaskRuntime |
100ms | 投機対象と見なされる前にタスクが実行される最小時間。これは、非常に短いタスクの投機的なコピーを起動することを避けるために使用できます。 | 3.2.0 |
spark.speculation.task.duration.threshold |
なし | スケジューラがタスクの投機的実行を試みるタスクの持続時間。指定されている場合、現在のステージに、単一のexecutorのスロット数以下のタスクが含まれており、タスクがしきい値よりも長い時間かかっている場合、タスクは投機的に実行されます。この設定は、非常に少ないタスクを持つステージを投機するのに役立ちます。executorのスロットが十分に大きい場合、通常の投機設定も適用される場合があります。たとえば、しきい値に達していなくても、十分な成功した実行がある場合、タスクは再起動される可能性があります。スロット数は、`spark.executor.cores`と`spark.task.cpus`の設定値に基づいて計算されます(最小値は1)。デフォルト単位はバイトです(別途指定がない限り)。 | 3.0.0 |
spark.speculation.efficiency.processRateMultiplier |
0.75 | 非効率的なタスクを評価する際に使用される乗数。乗数が高いほど、非効率と見なされる可能性のあるタスクが多くなります。 | 3.4.0 |
spark.speculation.efficiency.longRunTaskFactor |
2 | タスクの持続時間が、この係数と時間しきい値(`spark.speculation.multiplier` * successfulTaskDurations.median または `spark.speculation.minTaskRuntime` のいずれか)を掛け合わせた値を超えている限り、データ処理速度が良好かどうかに関係なく、タスクはとにかく投機的に実行されます。これにより、タスクの遅延がデータ処理速度に関連しない場合に、非効率的なタスクを見逃すことを回避できます。 | 3.4.0 |
spark.speculation.efficiency.enabled |
true | trueに設定すると、Sparkはステージタスクメトリクスまたはその持続時間を通じてタスク処理の効率を評価し、非効率的なタスクのみを投機する必要があります。タスクが非効率的であるのは、1) そのデータ処理速度が、ステージ内のすべての成功したタスクの平均データ処理速度に倍率を掛けたものよりも低い場合、または 2) その持続時間が、`spark.speculation.efficiency.longRunTaskFactor`と時間しきい値(`spark.speculation.multiplier` * successfulTaskDurations.median または `spark.speculation.minTaskRuntime` のいずれか)を掛け合わせた値を超えている場合です。 | 3.4.0 |
spark.task.cpus |
1 | 各タスクに割り当てるコア数。 | 0.5.0 |
spark.task.resource.{resourceName}.amount |
1 | 各タスクに割り当てる特定のリソースの種類の量。これはdouble型にすることができます。これを指定する場合は、executorの設定`spark.executor.resource.{resourceName}.amount`と、対応する検出設定も提供して、executorがそのリソースの種類で作成されるようにする必要があります。整数に加えて、小数(例:0.25、リソースの1/4を意味する)を指定することもできます。小数は0.5以下にする必要があります。つまり、リソース共有の最小量はリソースあたり2つのタスクです。さらに、小数は切り捨てられてリソーススロットが割り当てられます(例:0.2222の設定、または1/0.2222スロットは5ではなく4タスク/リソースになります)。 | 3.0.0 |
spark.task.maxFailures |
4 | ジョブを諦める前に、特定のタスクが連続して失敗する回数。異なるタスクにわたる失敗の総数は、ジョブの失敗原因にはなりません。特定のタスクはこの回数連続して失敗する必要があります。いずれかの試行が成功すると、そのタスクの失敗回数はリセットされます。1以上にする必要があります。許可される再試行回数 = この値 - 1。 | 0.8.0 |
spark.task.reaper.enabled |
false | キルされた/中断されたタスクの監視を有効にします。"true"に設定すると、キルされたタスクは、実際に実行が完了するまでexecutorによって監視されます。この監視の正確な動作を制御する方法については、他の`spark.task.reaper.*`設定を参照してください。"false"(デフォルト)に設定すると、タスクのキルは、そのような監視機能がない古いコードパスを使用します。 | 2.0.3 |
spark.task.reaper.pollingInterval |
10秒 | `spark.task.reaper.enabled = true`の場合、この設定は、executorがキルされたタスクの状態をポーリングする頻度を制御します。ポーリング時にキルされたタスクがまだ実行されている場合は、警告がログに記録され、デフォルトではタスクのスレッドダンプがログに記録されます(このスレッドダンプは、以下で説明されている`spark.task.reaper.threadDump`設定を使用して無効にすることができます)。 | 2.0.3 |
spark.task.reaper.threadDump |
true | `spark.task.reaper.enabled = true`の場合、この設定は、キルされたタスクの定期的なポーリング中にタスクのスレッドダンプをログに記録するかどうかを制御します。スレッドダンプの収集を無効にするには、これをfalseに設定します。 | 2.0.3 |
spark.task.reaper.killTimeout |
-1 | `spark.task.reaper.enabled = true`の場合、この設定は、キルされたタスクの実行が停止しない場合のタイムアウト時間を指定します。デフォルト値の-1は、このメカニズムを無効にし、executorの自己破壊を防ぎます。この設定の目的は、実行できないタスクによってexecutorを使用できなくなるのを防ぐセーフティネットとして機能することです。 | 2.0.3 |
spark.stage.maxConsecutiveAttempts |
4 | ステージが中断される前に許可される連続するステージ試行の数。 | 2.2.0 |
spark.stage.ignoreDecommissionFetchFailure |
false | `spark.stage.maxConsecutiveAttempts`のカウント時に、executorの廃止によって発生したステージフェッチエラーを無視するかどうか。 | 3.4.0 |
バリア実行モード
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.barrier.sync.timeout |
365d | バリアータスクからの各`barrier()`呼び出しのタイムアウト時間(秒単位)。コーディネータが設定時間内にバリアータスクからすべての同期メッセージを受信しなかった場合、SparkExceptionをスローしてすべてのタスクを失敗させます。デフォルト値は31536000(3600 * 24 * 365)に設定されているため、`barrier()`呼び出しは1年間待ちます。 | 2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.interval |
15s | 最大同時タスクチェックの失敗と次のチェックの間の待ち時間(秒単位)。最大同時タスクチェックは、クラスタが送信されたジョブのバリアステージで必要とされるよりも多くの同時タスクを起動できることを確認します。クラスタが開始されたばかりで、十分なexecutorが登録されていない場合、チェックが失敗する可能性があるため、少し待ってからチェックを再実行します。ジョブに対して設定された最大失敗回数を超えてチェックが失敗した場合、現在のジョブの送信は失敗します。この設定は、1つ以上のバリアステージを含むジョブのみに適用され、非バリアジョブではチェックを実行しません。 | 2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures |
40 | ジョブの送信が失敗する前に許可される最大同時タスクチェックの失敗回数。最大同時タスクチェックは、クラスタが送信されたジョブのバリアステージで必要とされるよりも多くの同時タスクを起動できることを確認します。クラスタが開始されたばかりで、十分なexecutorが登録されていない場合、チェックが失敗する可能性があるため、少し待ってからチェックを再実行します。ジョブに対して設定された最大失敗回数を超えてチェックが失敗した場合、現在のジョブの送信は失敗します。この設定は、1つ以上のバリアステージを含むジョブのみに適用され、非バリアジョブではチェックを実行しません。 | 2.4.0 |
動的割り当て
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.dynamicAllocation.enabled |
false | 動的リソース割り当てを使用するかどうか。これは、ワークロードに基づいて、このアプリケーションに登録されているexecutorの数を増減します。詳細については、こちらの説明を参照してください。 次の条件のいずれかが必要です。1) `spark.shuffle.service.enabled`で外部シャッフルサービスを有効にする、または2) `spark.dynamicAllocation.shuffleTracking.enabled`でシャッフルトラッキングを有効にする、または3) `spark.decommission.enabled`と`spark.storage.decommission.shuffleBlocks.enabled`でシャッフルブロックの廃止を有効にする、または4) (実験的) `spark.shuffle.sort.io.plugin.class`を設定して、信頼できるストレージをサポートするカスタム`ShuffleDataIO`の`ShuffleDriverComponents`を使用する。次の設定も関連します。`spark.dynamicAllocation.minExecutors`、`spark.dynamicAllocation.maxExecutors`、`spark.dynamicAllocation.initialExecutors`、`spark.dynamicAllocation.executorAllocationRatio` |
1.2.0 |
spark.dynamicAllocation.executorIdleTimeout |
60秒 | 動的割り当てが有効になっており、executorがこれ以上の期間アイドル状態になっている場合、executorは削除されます。詳細については、こちらの説明を参照してください。 | 1.2.0 |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
infinity | 動的割り当てが有効になっており、キャッシュされたデータブロックを持つexecutorがこれ以上の期間アイドル状態になっている場合、executorは削除されます。詳細については、こちらの説明を参照してください。 | 1.4.0 |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
動的リソース割り当てが有効な場合に実行されるエグゼキュータの初期数。--num-executors (またはspark.executor.instances )が設定され、この値より大きい場合、それがエグゼキュータの初期数として使用されます。 |
1.3.0 |
spark.dynamicAllocation.maxExecutors |
infinity | 動的リソース割り当てが有効な場合のエグゼキュータ数の最大値。 | 1.2.0 |
spark.dynamicAllocation.minExecutors |
0 | 動的リソース割り当てが有効な場合のエグゼキュータ数の最小値。 | 1.2.0 |
spark.dynamicAllocation.executorAllocationRatio |
1 | デフォルトでは、動的リソース割り当ては、処理するタスクの数に応じて並列処理を最大化するために必要な数のエグゼキュータを要求します。これによりジョブのレイテンシは最小化されますが、タスクが小さい場合、エグゼキュータの割り当てオーバーヘッドのために多くのリソースが無駄になる可能性があります(一部のエグゼキュータは何も作業を行わない可能性があるため)。この設定により、完全な並列処理に対するエグゼキュータ数を削減するために使用される比率を設定できます。デフォルトは1.0で、最大限の並列処理を行います。0.5はターゲットのエグゼキュータ数を2で割ります。動的リソース割り当てによって計算されたターゲットのエグゼキュータ数は、spark.dynamicAllocation.minExecutors とspark.dynamicAllocation.maxExecutors の設定によって上書きされる可能性があります。 |
2.4.0 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | 動的リソース割り当てが有効になっており、この期間よりも長く保留中のタスクがバックログされている場合、新しいエグゼキュータが要求されます。詳細については、この説明を参照してください。 | 1.2.0 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
spark.dynamicAllocation.schedulerBacklogTimeout と同じですが、後続のエグゼキュータ要求のみに使用されます。詳細については、この説明を参照してください。 |
1.2.0 |
spark.dynamicAllocation.shuffleTracking.enabled |
true |
エグゼキュータのシャッフルファイルのトラッキングを有効にします。これにより、外部のシャッフルサービスを必要とせずに動的リソース割り当てが可能になります。このオプションは、アクティブなジョブのシャッフルデータを保存しているエグゼキュータを存続させようとします。 | 3.0.0 |
spark.dynamicAllocation.shuffleTracking.timeout |
infinity |
シャッフルトラッキングが有効になっている場合、シャッフルデータを持っているエグゼキュータのタイムアウトを制御します。デフォルト値は、Sparkがシャッフルのガベージコレクションに依存してエグゼキュータを解放できることを意味します。何らかの理由でガベージコレクションがシャッフルを十分に迅速にクリーンアップしていない場合、このオプションを使用して、シャッフルデータの保存中でもエグゼキュータのタイムアウトを制御できます。 | 3.0.0 |
スレッド設定
ジョブとクラスタ構成によっては、Sparkの複数の場所でスレッド数を設定して、利用可能なリソースを効率的に活用し、パフォーマンスを向上させることができます。Spark 3.0より前は、これらのスレッド構成は、ドライバー、エグゼキュータ、ワーカー、マスターなど、Sparkのすべての役割に適用されます。Spark 3.0以降は、ドライバーとエグゼキュータから始めて、より細かい粒度でスレッドを構成できます。下の表ではRPCモジュールを例として示します。シャッフルなどの他のモジュールについては、spark.{driver|executor}.rpc.netty.dispatcher.numThreads
(RPCモジュールのみ)を除き、プロパティ名で「rpc」を「shuffle」に置き換えます。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.{driver|executor}.rpc.io.serverThreads |
spark.rpc.io.serverThreads にフォールバックします。 |
サーバーのスレッドプールで使用されるスレッド数。 | 1.6.0 |
spark.{driver|executor}.rpc.io.clientThreads |
spark.rpc.io.clientThreads にフォールバックします。 |
クライアントのスレッドプールで使用されるスレッド数。 | 1.6.0 |
spark.{driver|executor}.rpc.netty.dispatcher.numThreads |
spark.rpc.netty.dispatcher.numThreads にフォールバックします。 |
RPCメッセージディスパッチャのスレッドプールで使用されるスレッド数。 | 3.0.0 |
スレッド関連の構成キーの数に対するデフォルト値は、ドライバーまたはエグゼキュータに要求されたコア数の最小値、またはその値がない場合は、JVMで使用可能なコア数(ハードコードされた上限は8)です。
Spark Connect
サーバー設定
サーバー設定はSpark Connectサーバーで設定されます。たとえば、./sbin/start-connect-server.sh
でSpark Connectサーバーを起動する場合です。通常は、構成ファイルと--conf/-c
を使用したコマンドラインオプションを介して設定されます。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.connect.grpc.binding.port |
15002 | Spark Connectサーバーのバインドするポート。 | 3.4.0 |
spark.connect.grpc.interceptor.classes |
(なし) | io.grpc.ServerInterceptor インターフェースを実装する必要があるクラス名のカンマ区切りリスト。 |
3.4.0 |
spark.connect.grpc.arrow.maxBatchSize |
4MB | Apache Arrowを使用する場合、サーバー側からクライアント側に送信できる1つのArrowバッチの最大サイズを制限します。現在、サイズは正確ではなく推定値であるため、保守的に70%を使用しています。 | 3.4.0 |
spark.connect.grpc.maxInboundMessageSize |
134217728 | gRPCリクエストの最大受信メッセージサイズを設定します。ペイロードが大きいリクエストは失敗します。 | 3.4.0 |
spark.connect.extensions.relation.classes |
(なし) | プロトコルでカスタムリレーションタイプをサポートするために、トレイトorg.apache.spark.sql.connect.plugin.RelationPlugin を実装するクラスのカンマ区切りリスト。 |
3.4.0 |
spark.connect.extensions.expression.classes |
(なし) | プロトコルでカスタム式タイプをサポートするために、トレイトorg.apache.spark.sql.connect.plugin.ExpressionPlugin を実装するクラスのカンマ区切りリスト。 |
3.4.0 |
spark.connect.extensions.command.classes |
(なし) | プロトコルでカスタムコマンドタイプをサポートするために、トレイトorg.apache.spark.sql.connect.plugin.CommandPlugin を実装するクラスのカンマ区切りリスト。 |
3.4.0 |
セキュリティ
さまざまなSparkサブシステムのセキュリティ保護方法に関する利用可能なオプションについては、セキュリティページを参照してください。
Spark SQL
ランタイムSQL設定
ランタイムSQL設定は、セッションごとに、変更可能なSpark SQL設定です。--conf/-c
で始まる構成ファイルとコマンドラインオプション、またはSparkSession
を作成するために使用されるSparkConf
を設定することで、初期値を設定できます。また、SETコマンドで設定およびクエリを行い、RESETコマンドで初期値に戻すか、ランタイムでSparkSession.conf
のsetterとgetterメソッドを使用することもできます。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.sql.adaptive.advisoryPartitionSizeInBytes |
(spark.sql.adaptive.shuffle.targetPostShuffleInputSize の値) |
適応型最適化中(spark.sql.adaptive.enabledがtrueの場合)のシャッフルパーティションの推奨サイズ(バイト単位)。Sparkが小さなシャッフルパーティションを結合したり、歪んだシャッフルパーティションを分割したりする際に有効になります。 |
3.0.0 |
spark.sql.adaptive.autoBroadcastJoinThreshold |
(なし) | 結合の実行時に、すべてのワーカーノードにブロードキャストされるテーブルの最大サイズ(バイト単位)を設定します。この値を-1に設定すると、ブロードキャストを無効にできます。デフォルト値はspark.sql.autoBroadcastJoinThresholdと同じです。この設定は、適応型フレームワークでのみ使用されることに注意してください。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.enabled |
true | trueで 'spark.sql.adaptive.enabled' がtrueの場合、Sparkは、ターゲットサイズ('spark.sql.adaptive.advisoryPartitionSizeInBytes'で指定)に従って連続したシャッフルパーティションを結合し、小さすぎるタスクが多くなるのを防ぎます。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(なし) | 結合する前のシャッフルパーティションの初期数。設定されていない場合、spark.sql.shuffle.partitionsと等しくなります。この設定は、'spark.sql.adaptive.enabled' と 'spark.sql.adaptive.coalescePartitions.enabled' の両方がtrueの場合にのみ有効です。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB | 結合後のシャッフルパーティションの最小サイズ。パーティション結合中に適応的に計算されたターゲットサイズが小さすぎる場合に役立ちます。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true | trueの場合、Sparkは連続したシャッフルパーティションの結合時に 'spark.sql.adaptive.advisoryPartitionSizeInBytes'(デフォルト64MB)で指定されたターゲットサイズを尊重せず、Sparkクラスタのデフォルトの並列処理に応じてターゲットサイズを適応的に計算します。計算されたサイズは、通常、設定されたターゲットサイズよりも小さくなります。これは、並列処理を最大化し、適応型クエリ実行を有効にした場合のパフォーマンスの低下を防ぐためです。この設定をfalseに設定し、設定されたターゲットサイズを尊重することをお勧めします。 |
3.2.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(なし) | 適応型実行に使用されるカスタムコスト評価クラス。設定されていない場合、Sparkはデフォルトで独自のSimpleCostEvaluatorを使用します。 |
3.2.0 |
spark.sql.adaptive.enabled |
true | trueの場合、適応型クエリ実行を有効にします。これは、正確なランタイム統計に基づいて、クエリ実行の途中でクエリプランを再最適化します。 |
1.6.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin |
false | trueの場合、追加のシャッフルが導入されても、OptimizeSkewedJoinを強制的に有効にします。 |
3.3.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true | trueで 'spark.sql.adaptive.enabled' がtrueの場合、Sparkは、シャッフルパーティションが不要な場合(たとえば、ソートマージ結合をブロードキャストハッシュ結合に変換した後)、ローカルシャッフルリーダーを使用してシャッフルデータの読み取りを試みます。 |
3.0.0 |
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0b | ローカルハッシュマップの構築を許可できるパーティションあたりの最大サイズ(バイト単位)を設定します。この値がspark.sql.adaptive.advisoryPartitionSizeInBytesより小さくなく、すべてのパーティションサイズがこの設定より大きくない場合、結合選択は、spark.sql.join.preferSortMergeJoinの値に関係なく、ソートマージ結合ではなくシャッフルハッシュ結合を使用することを優先します。 |
3.2.0 |
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true | trueで 'spark.sql.adaptive.enabled' がtrueの場合、SparkはRebalancePartitions内の歪んだシャッフルパーティションを最適化し、ターゲットサイズ('spark.sql.adaptive.advisoryPartitionSizeInBytes'で指定)に従ってそれらをより小さなパーティションに分割して、データの歪みを防ぎます。 |
3.2.0 |
spark.sql.adaptive.optimizer.excludedRules |
(なし) | 適応型オプティマイザで無効にするルールのリストを設定します。ルールはルール名で指定され、カンマで区切られます。オプティマイザは、実際に除外されたルールをログに記録します。 |
3.1.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor |
0.2 | パーティションのサイズがこのファクタにspark.sql.adaptive.advisoryPartitionSizeInBytesを掛けた値より小さい場合、分割中にそのパーティションはマージされます。 |
3.3.0 |
spark.sql.adaptive.skewJoin.enabled |
true | trueで 'spark.sql.adaptive.enabled' がtrueの場合、Sparkは、歪んだパーティションを分割(必要に応じてレプリケート)することで、シャッフル結合(ソートマージとシャッフルハッシュ)の歪みを動的に処理します。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5.0 | パーティションのサイズが、中央値のパーティションサイズにこのファクタを掛けた値より大きく、また 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes' よりも大きい場合、そのパーティションは歪んでいると見なされます。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | パーティションのサイズ(バイト単位)がこの閾値より大きく、また中央値のパーティションサイズに 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' を掛けた値よりも大きい場合、そのパーティションは歪んでいると見なされます。理想的には、この設定は 'spark.sql.adaptive.advisoryPartitionSizeInBytes' よりも大きく設定する必要があります。 |
3.0.0 |
spark.sql.allowNamedFunctionArguments |
true | trueの場合、Sparkは実装されているすべての関数に対して名前付きパラメータのサポートを有効にします。 |
3.5.0 |
spark.sql.ansi.doubleQuotedIdentifiers |
false | trueで 'spark.sql.ansi.enabled' がtrueの場合、Spark SQLは二重引用符(")で囲まれたリテラルを識別子として読み取ります。falseの場合、文字列リテラルとして読み取られます。 |
3.4.0 |
spark.sql.ansi.enabled |
false | trueの場合、Spark SQLはHive準拠ではなく、ANSI準拠のダイアレクトを使用します。たとえば、SQL演算子/関数の入力が無効な場合、Sparkはnullの結果を返すのではなく、ランタイム時に例外をスローします。このダイアレクトの詳細は、Sparkのドキュメントの「ANSI準拠」セクションにあります。一部のANSIダイアレクト機能は、ANSI SQL標準から直接のものではない場合がありますが、その動作はANSI SQLのスタイルに準拠しています。 |
3.0.0 |
spark.sql.ansi.enforceReservedKeywords |
false | trueで 'spark.sql.ansi.enabled' がtrueの場合、Spark SQLパーサーはANSI予約語を強制し、予約語をエイリアス名やテーブル、ビュー、関数などの識別子として使用するSQLクエリを禁止します。 |
3.3.0 |
spark.sql.ansi.relationPrecedence |
false | trueで 'spark.sql.ansi.enabled' がtrueの場合、関係を結合する際にJOINはカンマよりも優先されます。たとえば、 |
3.4.0 |
spark.sql.autoBroadcastJoinThreshold |
10MB | JOIN実行時にすべてのワーカーノードにブロードキャストされるテーブルの最大サイズ(バイト単位)を設定します。この値を-1に設定すると、ブロードキャストを無効にできます。現在、統計情報は、`ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan`コマンドが実行されたHiveメタストアテーブルと、ファイルベースのデータソーステーブル(統計情報がデータファイル上で直接計算されるもの)でのみサポートされていることに注意してください。 |
1.1.0 |
spark.sql.avro.compression.codec |
snappy | AVROファイル書き込みで使用される圧縮コーデック。サポートされているコーデック:uncompressed、deflate、snappy、bzip2、xz、zstandard。デフォルトコーデックはsnappyです。 |
2.4.0 |
spark.sql.avro.deflate.level |
-1 | AVROファイルの書き込みで使用されるdeflateコーデックの圧縮レベル。有効な値は1~9の範囲、または-1です。デフォルト値は-1で、現在の実装ではレベル6に対応します。 |
2.4.0 |
spark.sql.avro.filterPushdown.enabled |
true | trueの場合、Avroデータソースへのフィルタプッシュダウンを有効にします。 |
3.1.0 |
spark.sql.broadcastTimeout |
300 | ブロードキャスト結合におけるブロードキャスト待機時間のタイムアウト(秒単位)。 |
1.3.0 |
spark.sql.bucketing.coalesceBucketsInJoin.enabled |
false | trueの場合、バケット数の異なる2つのバケット化されたテーブルを結合する場合、バケット数の多い方のテーブルは、もう一方のテーブルと同じバケット数になるように結合されます。バケット数の多い方は、バケット数の少ない方で割り切れる必要があります。バケットの結合は、ソートマージ結合とシャッフルハッシュ結合に適用されます。注:バケット化されたテーブルの結合により、結合時の不要なシャッフルを回避できますが、並列処理も低下し、シャッフルハッシュ結合でOOMが発生する可能性もあります。 |
3.1.0 |
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio |
4 | 結合される2つのバケット数の比率は、バケット結合が適用されるためにはこの値以下である必要があります。この設定は、「spark.sql.bucketing.coalesceBucketsInJoin.enabled」がtrueに設定されている場合のみ有効です。 |
3.1.0 |
spark.sql.catalog.spark_catalog |
(なし) | Sparkの組み込みv1カタログに対するv2インターフェースとして使用されるカタログ実装:spark_catalog。このカタログは、spark_catalogと識別子名前空間を共有し、それとの整合性がなければなりません。たとえば、spark_catalogでテーブルを読み込むことができる場合、このカタログもテーブルのメタデータを返す必要があります。spark_catalogへの操作を委任するために、実装は「CatalogExtension」を拡張できます。 |
3.0.0 |
spark.sql.cbo.enabled |
false | trueに設定すると、計画統計の推定にCBOを有効にします。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.star.filter |
false | コストベースの結合列挙にスター結合フィルタのヒューリスティックを適用します。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.threshold |
12 | 動的計画法で許可される結合ノードの最大数。 |
2.2.0 |
spark.sql.cbo.joinReorder.enabled |
false | CBOでの結合順序変更を有効にします。 |
2.2.0 |
spark.sql.cbo.planStats.enabled |
false | trueの場合、論理計画はカタログから行数と列統計を取得します。 |
3.0.0 |
spark.sql.cbo.starSchemaDetection |
false | trueの場合、スタースキーマ検出に基づいた結合順序変更を有効にします。 |
2.2.0 |
spark.sql.charAsVarchar |
false | trueの場合、SparkはCREATE/REPLACE/ALTER TABLEコマンドでCHAR型をVARCHAR型に置き換えるため、新しく作成/更新されたテーブルにはCHAR型の列/フィールドが含まれません。CHAR型の列/フィールドを持つ既存のテーブルは、この設定の影響を受けません。 |
3.3.0 |
spark.sql.cli.print.header |
false | trueに設定されている場合、spark-sql CLIはクエリ出力の列名を印刷します。 |
3.2.0 |
spark.sql.columnNameOfCorruptRecord |
_corrupt_record | 解析に失敗した生の/未解析のJSONおよびCSVレコードを格納するための内部列の名前。 |
1.2.0 |
spark.sql.csv.filterPushdown.enabled |
true | trueの場合、CSVデータソースへのフィルタプッシュダウンを有効にします。 |
3.0.0 |
spark.sql.datetime.java8API.enabled |
false | この設定プロパティがtrueに設定されている場合、Java 8 APIの`java.time.Instant`と`java.time.LocalDate`クラスが、Catalystの`TimestampType`と`DateType`の外部型として使用されます。falseに設定されている場合、同じ目的で`java.sql.Timestamp`と`java.sql.Date`が使用されます。 |
3.0.0 |
spark.sql.debug.maxToStringFields |
25 | デバッグ出力で文字列に変換できるシーケンスのようなエントリのフィールドの最大数。制限を超える要素は削除され、「... N more fields」プレースホルダーに置き換えられます。 |
3.0.0 |
spark.sql.defaultCatalog |
spark_catalog | デフォルトカタログの名前。これは、ユーザーが現在のカタログを明示的にまだ設定していない場合の現在のカタログになります。 |
3.0.0 |
spark.sql.error.messageFormat |
PRETTY | PRETTYの場合、エラーメッセージは、エラークラス、メッセージ、クエリコンテキストのテキスト表現で構成されます。MINIMALとSTANDARDの形式は、STANDARDが追加のJSONフィールド`message`を含むかなりきれいなJSON形式です。この設定プロパティは、クエリ実行中のThrift ServerとSQL CLIのエラーメッセージに影響します。 |
3.4.0 |
spark.sql.execution.arrow.enabled |
false | (Spark 3.0以降非推奨、'spark.sql.execution.arrow.pyspark.enabled'を設定してください。) |
2.3.0 |
spark.sql.execution.arrow.fallback.enabled |
true | (Spark 3.0以降非推奨、'spark.sql.execution.arrow.pyspark.fallback.enabled'を設定してください。) |
2.4.0 |
spark.sql.execution.arrow.localRelationThreshold |
48MB | ArrowバッチをSpark DataFrameに変換する場合、Arrowバッチのバイトサイズがこのしきい値よりも小さい場合、ドライバ側でローカルコレクションが使用されます。それ以外の場合は、Arrowバッチが送信され、実行担当者でSpark内部行に逆シリアル化されます。 |
3.4.0 |
spark.sql.execution.arrow.maxRecordsPerBatch |
10000 | Apache Arrowを使用する場合、メモリに書き込める単一のArrowRecordBatchのレコード数の最大値を制限します。0または負の値に設定すると、制限はありません。 |
2.3.0 |
spark.sql.execution.arrow.pyspark.enabled |
(`spark.sql.execution.arrow.enabled`の値) | trueの場合、PySparkでの列方向データ転送にApache Arrowを使用します。この最適化は、1. pyspark.sql.DataFrame.toPandas。2. その入力がPandas DataFrameまたはNumPy ndarrayである`pyspark.sql.SparkSession.createDataFrame`に適用されます。次のデータ型はサポートされていません:TimestampTypeのArrayType。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.fallback.enabled |
(`spark.sql.execution.arrow.fallback.enabled`の値) | trueの場合、'spark.sql.execution.arrow.pyspark.enabled'によって有効化された最適化は、エラーが発生した場合に自動的に非最適化された実装にフォールバックします。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.selfDestruct.enabled |
false | (実験的) trueの場合、ArrowからPandasへの変換時に、PySparkでの列方向データ転送にApache Arrowの自己破壊とスプリットブロックオプションを使用します。これにより、CPU時間のコストでメモリ使用量が削減されます。この最適化は、'spark.sql.execution.arrow.pyspark.enabled'が設定されている場合に`pyspark.sql.DataFrame.toPandas`に適用されます。 |
3.2.0 |
spark.sql.execution.arrow.sparkr.enabled |
false | trueの場合、SparkRでの列方向データ転送にApache Arrowを使用します。この最適化は、1. 入力がR DataFrameである`createDataFrame`。2. `collect`。3. `dapply`。4. `gapply`に適用されます。次のデータ型はサポートされていません:FloatType、BinaryType、ArrayType、StructType、MapType。 |
3.0.0 |
spark.sql.execution.pandas.structHandlingMode |
legacy | pandas DataFrameを作成する際のstruct型の変換モード。"legacy"の場合、1. Arrow最適化が無効な場合、Rowオブジェクトに変換する。2. Arrow最適化が有効な場合、dictに変換するか、重複する入れ子になったフィールド名がある場合は例外を発生させる。"row"の場合、Arrow最適化に関係なくRowオブジェクトに変換する。"dict"の場合、重複する入れ子になったフィールド名がある場合(例:a_0、a_1)、Arrow最適化に関係なく、接尾辞付きキー名を使用してdictに変換する。 |
3.5.0 |
spark.sql.execution.pandas.udf.buffer.size |
(`spark.buffer.size`の値) | `spark.buffer.size`と同じですが、Pandas UDFの実行のみに適用されます。設定されていない場合、フォールバックは`spark.buffer.size`です。Pandasの実行には4バイト以上が必要です。この値を小さくすると、小さなPandas UDFバッチが反復処理されパイプライン化される可能性がありますが、パフォーマンスが低下する可能性があります。SPARK-27870を参照してください。 |
3.0.0 |
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled |
true | trueの場合、Python UDFからのトレースバックが簡素化されます。PySparkのトレースバックからPythonワーカー、(逆)シリアル化などを隠し、UDFからの例外メッセージのみを表示します。これはCPython 3.7以降でのみ機能することに注意してください。 |
3.1.0 |
spark.sql.execution.pythonUDF.arrow.enabled |
false | 通常のPython UDFでArrow最適化を有効にします。この最適化は、指定された関数が少なくとも1つの引数をとる場合にのみ有効にすることができます。 |
3.4.0 |
spark.sql.execution.pythonUDTF.arrow.enabled |
false | Python UDTFのArrow最適化を有効にします。 |
3.5.0 |
spark.sql.execution.topKSortFallbackThreshold |
2147483632 | 「SELECT x FROM t ORDER BY y LIMIT m」のようなSORTに続くLIMITを持つSQLクエリでは、mがこのしきい値以下の場合、メモリ内でトップKソートを実行し、それ以外の場合は必要に応じてディスクに書き出すグローバルソートを実行します。 |
2.4.0 |
spark.sql.files.ignoreCorruptFiles |
false | 破損したファイルを無視するかどうか。trueの場合、破損したファイルに遭遇してもSparkジョブは実行を続け、読み取られた内容は返されます。この設定は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 |
2.1.1 |
spark.sql.files.ignoreMissingFiles |
false | 欠損ファイルを無視するかどうか。trueの場合、欠損ファイルに遭遇してもSparkジョブは実行を続け、読み取られた内容は返されます。この設定は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 |
2.3.0 |
spark.sql.files.maxPartitionBytes |
128MB | ファイルの読み取り時に単一パーティションにパックするバイト数の最大値。この設定は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 |
2.0.0 |
spark.sql.files.maxPartitionNum |
(なし) | 分割ファイルパーティションの推奨される(保証されない)最大数。設定されている場合、Sparkは各パーティションのサイズを変更して、初期パーティション数がこの値を超える場合、パーティション数をこの値に近づけます。この設定は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 |
3.5.0 |
spark.sql.files.maxRecordsPerFile |
0 | 単一のファイルに書き出すレコード数の最大値。この値が0または負の場合、制限はありません。 |
2.2.0 |
spark.sql.files.minPartitionNum |
(なし) | 分割ファイルパーティションの推奨される(保証されない)最小数。設定されていない場合、デフォルト値は`spark.sql.leafNodeDefaultParallelism`です。この設定は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 |
3.1.0 |
spark.sql.function.concatBinaryAsString |
false | このオプションがfalseに設定され、すべての入力がバイナリの場合、`functions.concat`は出力をバイナリとして返します。それ以外の場合は、文字列として返します。 |
2.3.0 |
spark.sql.function.eltOutputAsString |
false | このオプションがfalseに設定され、すべての入力がバイナリの場合、 |
2.3.0 |
spark.sql.groupByAliases |
true | trueの場合、selectリスト内のエイリアスをgroup by句で使用できます。falseの場合、その場合、分析例外がスローされます。 |
2.2.0 |
spark.sql.groupByOrdinal |
true | trueの場合、group by句の序数を選択リストの位置として扱います。falseの場合、序数は無視されます。 |
2.0.0 |
spark.sql.hive.convertInsertingPartitionedTable |
true | trueに設定され、 |
3.0.0 |
spark.sql.hive.convertMetastoreCtas |
true | trueに設定されている場合、SparkはCTASでHive Serdeの代わりに組み込みのデータソースライターを使用しようとします。このフラグは、ParquetとORC形式に対してそれぞれ |
3.0.0 |
spark.sql.hive.convertMetastoreInsertDir |
true | trueに設定されている場合、SparkはINSERT OVERWRITE DIRECTORYでHive Serdeの代わりに組み込みのデータソースライターを使用しようとします。このフラグは、ParquetとORC形式に対してそれぞれ |
3.3.0 |
spark.sql.hive.convertMetastoreOrc |
true | trueに設定されている場合、Hive Serdeの代わりに、組み込みのORCリーダーとライターを使用して、HiveQL構文を使用して作成されたORCテーブルを処理します。 |
2.0.0 |
spark.sql.hive.convertMetastoreParquet |
true | trueに設定されている場合、Hive Serdeの代わりに、組み込みのParquetリーダーとライターを使用して、HiveQL構文を使用して作成されたParquetテーブルを処理します。 |
1.1.1 |
spark.sql.hive.convertMetastoreParquet.mergeSchema |
false | trueの場合、異なる可能性のあるが互換性のあるParquetスキーマを異なるParquetデータファイルでマージしようとします。この設定は、「spark.sql.hive.convertMetastoreParquet」がtrueの場合にのみ有効です。 |
1.3.1 |
spark.sql.hive.dropPartitionByName.enabled |
false | trueの場合、Sparkはパーティションオブジェクトではなくパーティション名を取得してパーティションを削除するため、パーティション削除のパフォーマンスが向上します。 |
3.4.0 |
spark.sql.hive.filesourcePartitionFileCacheSize |
262144000 | 0以外の値の場合、パーティションファイルメタデータのメモリ内キャッシュを有効にします。すべてのテーブルは、ファイルメタデータに指定されたバイト数まで使用できるキャッシュを共有します。この設定は、Hiveファイルソースパーティション管理が有効になっている場合にのみ有効です。 |
2.1.1 |
spark.sql.hive.manageFilesourcePartitions |
true | trueの場合、ファイルソーステーブルのメタストアパーティション管理も有効にします。これには、データソースと変換されたHiveテーブルの両方が含まれます。パーティション管理が有効になっている場合、データソーステーブルはパーティションをHiveメタストアに保存し、 |
2.1.1 |
spark.sql.hive.metastorePartitionPruning |
true | trueの場合、一部の述語はHiveメタストアにプッシュダウンされるため、一致しないパーティションを早期に排除できます。 |
1.5.0 |
spark.sql.hive.metastorePartitionPruningFallbackOnException |
false | メタストアからMetaExceptionが発生した場合に、すべてのパーティションをHiveメタストアから取得し、Sparkクライアント側でパーティションプルーニングを実行するかどうかを指定します。これが有効になっていて、リストするパーティションが多い場合、Sparkクエリの性能が低下する可能性があります。無効にすると、Sparkはクエリを失敗させます。 |
3.3.0 |
spark.sql.hive.metastorePartitionPruningFastFallback |
false | この設定が有効になっている場合、述語がHiveでサポートされていないか、メタストアからMetaExceptionが発生したためにSparkがフォールバックした場合、Sparkは代わりにパーティション名を最初に取得し、クライアント側でフィルタ式を評価することによってパーティションをプルーニングします。TimeZoneAwareExpressionを含む述語はサポートされていません。 |
3.3.0 |
spark.sql.hive.thriftServer.async |
true | trueに設定されている場合、Hive Thriftサーバーは非同期的にSQLクエリを実行します。 |
1.5.0 |
spark.sql.hive.verifyPartitionPath |
false | trueの場合、HDFSに格納されているデータを読み取る際に、テーブルのルートディレクトリの下にあるすべてのパーティションパスをチェックします。この設定は将来のリリースで非推奨となり、 |
1.4.0 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | カラムキャッシングのバッチサイズを制御します。バッチサイズを大きくすると、メモリの使用効率と圧縮率が向上しますが、データをキャッシングする際にOOMのリスクが高まります。 |
1.1.1 |
spark.sql.inMemoryColumnarStorage.compressed |
true | trueに設定されている場合、Spark SQLはデータの統計に基づいて各カラムの圧縮コーデックを自動的に選択します。 |
1.0.1 |
spark.sql.inMemoryColumnarStorage.enableVectorizedReader |
true | カラムキャッシングのベクトル化されたリーダーを有効にします。 |
2.3.1 |
spark.sql.json.filterPushdown.enabled |
true | trueの場合、JSONデータソースへのフィルタプッシュダウンを有効にします。 |
3.1.0 |
spark.sql.jsonGenerator.ignoreNullFields |
true | JSONデータソースと`to_json`などのJSON関数でJSONオブジェクトを生成する際に、nullフィールドを無視するかどうかを指定します。falseの場合、JSONオブジェクトのnullフィールドにnullを生成します。 |
3.0.0 |
spark.sql.leafNodeDefaultParallelism |
(なし) | ファイルスキャンノード、ローカルデータスキャンノード、範囲ノードなど、データを作成するSpark SQLリーフノードのデフォルトの並列度。この設定のデフォルト値は「SparkContext#defaultParallelism」です。 |
3.2.0 |
spark.sql.mapKeyDedupPolicy |
EXCEPTION | 組み込み関数CreateMap、MapFromArrays、MapFromEntries、StringToMap、MapConcat、TransformKeysにおけるマップキーの重複排除ポリシー。EXCEPTIONの場合、重複するマップキーが検出されるとクエリは失敗します。LAST_WINの場合、最後に挿入されたマップキーが優先されます。 |
3.0.0 |
spark.sql.maven.additionalRemoteRepositories |
https://maven-central.storage-download.googleapis.com/maven2/ | オプションの追加のリモートMavenミラーリポジトリのカンマ区切り文字列設定。これは、デフォルトのMaven Centralリポジトリにアクセスできない場合、IsolatedClientLoaderでHive JARをダウンロードする場合にのみ使用されます。 |
3.0.0 |
spark.sql.maxMetadataStringLength |
100 | メタデータ文字列に対して出力する最大文字数。例: |
3.1.0 |
spark.sql.maxPlanStringLength |
2147483632 | プラン文字列に対して出力する最大文字数。プランがこれより長い場合、それ以降の出力は切り捨てられます。デフォルト設定では常に完全なプランが生成されます。プラン文字列がメモリを消費しすぎている場合、またはドライバーまたはUIプロセスでOutOfMemoryエラーが発生している場合は、この値を8kなどのより低い値に設定します。 |
3.0.0 |
spark.sql.maxSinglePartitionBytes |
9223372036854775807b | 単一パーティションに許可される最大バイト数。それ以外の場合、プランナーはシャッフルを導入して並列処理を向上させます。 |
3.4.0 |
spark.sql.optimizer.collapseProjectAlwaysInline |
false | 追加の複製が発生した場合でも、常に2つの隣接するプロジェクションを折りたたみ、式をインライン化するかどうかの指定。 |
3.3.0 |
spark.sql.optimizer.dynamicPartitionPruning.enabled |
true | trueの場合、パーティション列が結合キーとして使用されている場合、パーティション列の述語を生成します。 |
3.0.0 |
spark.sql.optimizer.enableCsvExpressionOptimization |
true | SQLオプティマイザでCSV式を最適化するかどうかを指定します。これには、from_csvから不要な列をプルーニングすることが含まれます。 |
3.2.0 |
spark.sql.optimizer.enableJsonExpressionOptimization |
true | SQLオプティマイザでJSON式を最適化するかどうかを指定します。これには、from_jsonから不要な列をプルーニングすること、from_json + to_jsonの簡素化、to_json + named_struct(from_json.col1、from_json.col2、....)が含まれます。 |
3.1.0 |
spark.sql.optimizer.excludedRules |
(なし) | オプティマイザで無効にするルールのリストを設定します。ルールはルール名で指定され、カンマで区切られます。この設定内のすべてのルールが最終的に除外されるとは限らないことに注意してください。正しさのために、一部のルールは必要です。オプティマイザは、実際に除外されたルールをログに記録します。 |
2.4.0 |
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold |
10GB | ブルームフィルタアプリケーションサイドプランの集計スキャンサイズのバイトサイズしきい値。ブルームフィルタアプリケーションサイドの集計スキャンバイトサイズは、ブルームフィルタを挿入するにはこの値を超える必要があります。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold |
10MB | ブルームフィルタ作成サイドプランのサイズしきい値。ブルームフィルタを挿入しようとすると、推定サイズはこの値以下である必要があります。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.enabled |
true | trueの場合、シャッフル結合の一方の側に選択的な述語がある場合、もう一方の側にブルームフィルタを挿入してシャッフルデータの量を削減しようとします。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems |
1000000 | ランタイムブルームフィルタの予想されるアイテムのデフォルトの数。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumBits |
67108864 | ランタイムブルームフィルタに使用する最大ビット数。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumItems |
4000000 | ランタイムブルームフィルタの予想されるアイテムの最大許容数。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.numBits |
8388608 | ランタイムブルームフィルタに使用するデフォルトのビット数。 |
3.3.0 |
spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled |
true | グループベースの行レベル操作のランタイムグループフィルタリングを有効にします。データのグループを置き換えるデータソース(例:ファイル、パーティション)は、行レベル操作スキャンを計画する際に、提供されたデータソースフィルタを使用してグループ全体をプルーニングできます。ただし、すべての式をデータソースフィルタに変換できるわけではなく、一部の式はSparkによってのみ評価できるため(例:サブクエリ)、このようなフィルタリングは制限されています。グループの書き換えはコストがかかるため、Sparkはランタイムでクエリを実行して、行レベル操作の条件に一致するレコードを見つけ出すことができます。一致するレコードに関する情報は行レベル操作スキャンに返され、データソースは書き換えする必要のないグループを破棄できます。 |
3.4.0 |
spark.sql.optimizer.runtimeFilter.number.threshold |
10 | 単一クエリに対する注入されたランタイムフィルタ(DPP以外)の総数。これは、ブルームフィルタが多すぎることによるドライバーのOOMを防ぐためです。 |
3.3.0 |
spark.sql.optimizer.runtimeFilter.semiJoinReduction.enabled |
false | trueの場合、シャッフル結合の一方の側に選択的な述語がある場合、もう一方の側にセミ結合を挿入してシャッフルデータの量を削減しようとします。 |
3.3.0 |
spark.sql.orc.aggregatePushdown |
false | trueの場合、集計は最適化のためにORCにプッシュダウンされます。MIN、MAX、COUNTを集計式としてサポートします。MIN/MAXの場合、ブール値、整数、浮動小数点数、日付型をサポートします。COUNTの場合、すべてのデータ型をサポートします。ORCファイルフッターから統計情報が欠落している場合、例外がスローされます。 |
3.3.0 |
spark.sql.orc.columnarReaderBatchSize |
4096 | ORCベクトル化リーダーバッチに含める行数。オーバーヘッドを最小限に抑え、データ読み取り時のOOMを回避するために、この数値を慎重に選択する必要があります。 |
2.4.0 |
spark.sql.orc.columnarWriterBatchSize |
1024 | ORCベクトル化ライターバッチに含める行数。オーバーヘッドを最小限に抑え、データ書き込み時のOOMを回避するために、この数値を慎重に選択する必要があります。 |
3.4.0 |
spark.sql.orc.compression.codec |
snappy | ORCファイルの書き込みに使用される圧縮コーデックを設定します。テーブル固有のオプション/プロパティで |
2.3.0 |
spark.sql.orc.enableNestedColumnVectorizedReader |
true | ネストされたカラムのベクトル化されたORCデコードを有効にします。 |
3.2.0 |
spark.sql.orc.enableVectorizedReader |
true | ベクトル化されたORCデコードを有効にします。 |
2.3.0 |
spark.sql.orc.filterPushdown |
true | trueの場合、ORCファイルのフィルタプッシュダウンを有効にします。 |
1.4.0 |
spark.sql.orc.mergeSchema |
false | trueの場合、Orcデータソースはすべてのデータファイルから収集されたスキーマをマージします。それ以外の場合は、ランダムなデータファイルからスキーマが選択されます。 |
3.0.0 |
spark.sql.orderByOrdinal |
true | trueの場合、序数はselectリストの位置として扱われます。falseの場合、order/sort by句の序数は無視されます。 |
2.0.0 |
spark.sql.parquet.aggregatePushdown |
false | trueの場合、集計は最適化のためにParquetにプッシュダウンされます。MIN、MAX、COUNTを集計式としてサポートします。MIN/MAXの場合、ブール値、整数、浮動小数点数、日付型をサポートします。COUNTの場合、すべてのデータ型をサポートします。Parquetファイルフッターから統計情報が欠落している場合、例外がスローされます。 |
3.3.0 |
spark.sql.parquet.binaryAsString |
false | 特にImpalaや古いバージョンのSpark SQLなど、一部の他のParquet生成システムでは、Parquetスキーマの書き出し時にバイナリデータと文字列を区別しません。このフラグは、これらのシステムとの互換性を提供するために、Spark SQLにバイナリデータを文字列として解釈するように指示します。 |
1.1.1 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | Parquetベクトル化リーダーバッチに含める行数。オーバーヘッドを最小限に抑え、データ読み取り時のOOMを回避するために、この数値を慎重に選択する必要があります。 |
2.4.0 |
spark.sql.parquet.compression.codec |
snappy | Parquetファイル書き込み時に使用する圧縮コーデックを設定します。テーブル固有のオプション/プロパティで |
1.1.1 |
spark.sql.parquet.enableNestedColumnVectorizedReader |
true | ネストされた列(例:構造体、リスト、マップ)のベクトル化されたParquetデコードを有効にします。 |
3.3.0 |
spark.sql.parquet.enableVectorizedReader |
true | ベクトル化されたParquetデコードを有効にします。 |
2.0.0 |
spark.sql.parquet.fieldId.read.enabled |
false | フィールドIDは、Parquetスキーマ仕様のネイティブフィールドです。有効にすると、Parquetリーダーは列名を使用する代わりに、要求されたSparkスキーマ内のフィールドID(存在する場合)を使用してParquetフィールドを検索します。 |
3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing |
false | ParquetファイルにフィールドIDがないのに、Sparkの読み込みスキーマがフィールドIDを使用して読み込みを行う場合、このフラグが有効になっていると、サイレントにnullを返します。そうでない場合はエラーになります。 |
3.3.0 |
spark.sql.parquet.fieldId.write.enabled |
true | フィールドIDは、Parquetスキーマ仕様のネイティブフィールドです。有効にすると、ParquetライターはSparkスキーマ内のフィールドIDメタデータ(存在する場合)をParquetスキーマに設定します。 |
3.3.0 |
spark.sql.parquet.filterPushdown |
true | trueに設定すると、Parquetフィルタープッシュダウン最適化を有効にします。 |
1.2.0 |
spark.sql.parquet.inferTimestampNTZ.enabled |
true | 有効にすると、`isAdjustedToUTC = false`という注釈が付いたParquetのタイムスタンプ列は、スキーマ推論時にTIMESTAMP_NTZ型として推論されます。それ以外の場合は、すべてのParquetタイムスタンプ列はTIMESTAMP_LTZ型として推論されます。Sparkはファイル書き込み時に出力スキーマをParquetのフッターメタデータに書き込み、ファイル読み込み時にそれを利用することに注意してください。そのため、この設定は、Sparkによって書き込まれていないParquetファイルのスキーマ推論のみに影響します。 |
3.4.0 |
spark.sql.parquet.int96AsTimestamp |
true | 一部のParquet生成システム、特にImpalaは、タイムスタンプをINT96に格納します。ナノ秒フィールドの精度を失わないようにするために、SparkもタイムスタンプをINT96として格納します。このフラグは、Spark SQLにINT96データをタイムスタンプとして解釈するように指示し、これらのシステムとの互換性を提供します。 |
1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | これは、Impalaによって書き込まれたデータについて、タイムスタンプに変換する際のINT96データにタイムゾーン調整を適用するかどうかを制御します。これは、ImpalaがHiveとSparkとは異なるタイムゾーンオフセットでINT96データを格納するため、必要です。 |
2.3.0 |
spark.sql.parquet.mergeSchema |
false | trueの場合、Parquetデータソースはすべてのデータファイルから収集されたスキーマをマージします。それ以外の場合は、サマリーファイルからスキーマが選択されます。サマリーファイルがない場合は、ランダムなデータファイルから選択されます。 |
1.5.0 |
spark.sql.parquet.outputTimestampType |
INT96 | SparkがデータをParquetファイルに書き込む際に使用するParquetタイムスタンプの種類を設定します。INT96は、Parquetでは非標準ですが一般的に使用されているタイムスタンプの種類です。TIMESTAMP_MICROSはParquetの標準的なタイムスタンプの種類で、Unixエポックからのマイクロ秒数を格納します。TIMESTAMP_MILLISも標準ですが、ミリ秒の精度であるため、Sparkはタイムスタンプ値のマイクロ秒部分を切り捨てる必要があります。 |
2.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false | trueの場合、プッシュダウンされたフィルターを使用してParquetのネイティブなレコードレベルのフィルタリングを有効にします。この設定は、'spark.sql.parquet.filterPushdown'が有効になっていて、ベクトル化されたリーダーが使用されていない場合にのみ有効です。'spark.sql.parquet.enableVectorizedReader'をfalseに設定することで、ベクトル化されたリーダーが使用されていないことを確認できます。 |
2.3.0 |
spark.sql.parquet.respectSummaryFiles |
false | trueの場合、Parquetのすべてのpartファイルはサマリーファイルと互換性があると仮定し、スキーマのマージ時にそれらを無視します。それ以外の場合は、デフォルトのfalseの場合、すべてのpartファイルをマージします。これは専門家向けのオプションと見なされ、正確な意味を理解する前に有効にしないでください。 |
1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | trueの場合、Spark 1.4以前の方法でデータが書き込まれます。たとえば、10進値はApache Parquetの固定長のバイト配列形式で書き込まれます。これは、Apache HiveやApache Impalaなどの他のシステムで使用されます。falseの場合、Parquetの新しい形式が使用されます。たとえば、10進数はintベースの形式で書き込まれます。Parquet出力がこの新しい形式をサポートしていないシステムで使用されることを意図している場合は、trueに設定します。 |
1.6.0 |
spark.sql.parser.quotedRegexColumnNames |
false | trueの場合、SELECT文内の引用符で囲まれた識別子(バッククォートを使用)は正規表現として解釈されます。 |
2.3.0 |
spark.sql.pivotMaxValues |
10000 | ピボット列の値を指定せずにピボットを行う場合、エラーなしで収集される(異なる)値の最大数です。 |
1.6.0 |
spark.sql.pyspark.inferNestedDictAsStruct.enabled |
false | PySparkのSparkSession.createDataFrameは、デフォルトでネストされた辞書をマップとして推論します。trueに設定すると、ネストされた辞書を構造体として推論します。 |
3.3.0 |
spark.sql.pyspark.jvmStacktrace.enabled |
false | trueの場合、Pythonのスタックトレースとともに、ユーザー向けのPySpark例外にJVMのスタックトレースが表示されます。デフォルトでは無効になっており、JVMのスタックトレースは非表示になり、Pythonフレンドリーな例外のみが表示されます。これはログレベルの設定とは無関係であることに注意してください。 |
3.0.0 |
spark.sql.pyspark.legacy.inferArrayTypeFromFirstElement.enabled |
false | PySparkのSparkSession.createDataFrameは、デフォルトで配列の要素型を配列内のすべての値から推論します。この設定がtrueに設定されている場合、最初の配列要素からのみ型を推論するという従来の動作が復元されます。 |
3.4.0 |
spark.sql.readSideCharPadding |
true | trueの場合、Sparkは書き込み側の埋め込みに加えて、CHAR型の列/フィールドを読み取る際に文字列の埋め込みを適用します。この設定はデフォルトでtrueになっており、外部テーブルなどの場合にCHAR型のセマンティクスをより確実に適用します。 |
3.4.0 |
spark.sql.redaction.options.regex |
(?i)url | Spark SQLコマンドのオプションマップ内のどのキーが機密情報を含むかを決定するための正規表現です。この正規表現に一致する名前のオプションの値は、explain出力で秘匿化されます。この秘匿化は、spark.redaction.regexによって定義されたグローバルな秘匿化設定の上に適用されます。 |
2.2.2 |
spark.sql.redaction.string.regex |
(spark.redaction.string.regex の値) |
Sparkによって生成された文字列のどの部分が機密情報を含むかを決定するための正規表現です。この正規表現が文字列の部分に一致する場合、その文字列の部分はダミー値に置き換えられます。これは現在、SQL explainコマンドの出力を秘匿化するために使用されています。この設定が設定されていない場合、 |
2.3.0 |
spark.sql.repl.eagerEval.enabled |
false | 早期評価を有効にするかどうかを設定します。trueの場合、REPLが早期評価をサポートする場合に限り、Datasetの上位K行が表示されます。現在、早期評価はPySparkとSparkRでサポートされています。PySparkでは、Jupyterなどのノートブックでは、HTMLテーブル(`_repr_html`によって生成)が返されます。プレーンなPython REPLでは、返された出力はdataframe.show()のようにフォーマットされます。SparkRでは、返された出力はRのdata.frameのように表示されます。 |
2.4.0 |
spark.sql.repl.eagerEval.maxNumRows |
20 | 早期評価によって返される行の最大数です。これは、spark.sql.repl.eagerEval.enabledがtrueに設定されている場合にのみ有効です。この設定の有効範囲は0から(Int.MaxValue - 1)なので、負の数や(Int.MaxValue - 1)より大きいなどの無効な設定は、0と(Int.MaxValue - 1)に正規化されます。 |
2.4.0 |
spark.sql.repl.eagerEval.truncate |
20 | 早期評価によって返される各セルの文字数の最大数です。これは、spark.sql.repl.eagerEval.enabledがtrueに設定されている場合にのみ有効です。 |
2.4.0 |
spark.sql.session.localRelationCacheThreshold |
67108864 | シリアライズ後にドライバ側でキャッシュされるローカルリレーションのサイズ(バイト単位)の閾値です。 |
3.5.0 |
spark.sql.session.timeZone |
(ローカルタイムゾーンの値) | 地域ベースのゾーンIDまたはゾーンオフセットの形式でセッションローカルタイムゾーンのIDを設定します。地域IDは'area/city'(例:'America/Los_Angeles')の形式である必要があります。ゾーンオフセットは'(+|-)HH'、'(+|-)HH:mm'、または'(+|-)HH:mm:ss'(例:'-08'、'+01:00'、または'-13:33:33')の形式である必要があります。また、'UTC'と'Z'は'+00:00'のエイリアスとしてサポートされています。他の略名はあいまいになる可能性があるため、使用はお勧めしません。 |
2.2.0 |
spark.sql.shuffle.partitions |
200 | 結合または集計のためにデータをシャッフルする際に使用するパーティションのデフォルト数です。注:構造化ストリーミングの場合、この設定は同じチェックポイント位置からのクエリ再開間で変更できません。 |
1.1.0 |
spark.sql.shuffledHashJoinFactor |
3 | 小さい側のデータサイズにこの係数を掛けた値が、大きい側よりも小さい場合、シャッフルハッシュ結合を選択できます。 |
3.3.0 |
spark.sql.sources.bucketing.autoBucketedScan.enabled |
true | trueの場合、クエリプランに基づいて入力テーブルでバケット化されたスキャンを実行するかどうかを自動的に決定します。1. クエリがバケット化を利用する演算子(例:結合、グループ化など)を持っていない場合、または2. これらの演算子とテーブルスキャンの間に交換演算子がある場合、バケット化されたスキャンは使用しません。'spark.sql.sources.bucketing.enabled'がfalseに設定されている場合、この設定は効果がありません。 |
3.1.0 |
spark.sql.sources.bucketing.enabled |
true | falseの場合、バケット化されたテーブルは通常のテーブルとして扱われます。 |
2.0.0 |
spark.sql.sources.bucketing.maxBuckets |
100000 | 許容されるバケットの最大数です。 |
2.4.0 |
spark.sql.sources.default |
parquet | 入出力で使用されるデフォルトのデータソースです。 |
1.3.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | ドライバ側でファイルをリストする際に許可されるパスの最大数です。パーティション検出中に検出されたパスの数がこの値を超えると、別のSpark分散ジョブでファイルをリストしようとします。この設定は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 |
1.5.0 |
spark.sql.sources.partitionColumnTypeInference.enabled |
true | trueの場合、パーティション化された列のデータ型を自動的に推論します。 |
1.5.0 |
spark.sql.sources.partitionOverwriteMode |
STATIC | パーティション化されたデータソーステーブルにINSERT OVERWRITEを実行する場合、現在、静的モードと動的モードの2つのモードをサポートしています。静的モードでは、SparkはINSERT文内のパーティション指定(例:PARTITION(a=1,b))に一致するすべてのパーティションを上書きする前に削除します。動的モードでは、Sparkは事前にパーティションを削除せず、実行時にデータが書き込まれたパーティションのみを上書きします。デフォルトでは、Spark 2.3以前と同じ動作を維持するために静的モードを使用します。この設定はHive serdeテーブルには影響しません。Hive serdeテーブルは常に動的モードで上書きされます。これは、キーpartitionOverwriteModeを使用してデータソースの出力オプションとしても設定できます(この設定よりも優先されます)。例:dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)。 |
2.3.0 |
spark.sql.sources.v2.bucketing.enabled |
false | spark.sql.sources.bucketing.enabledと同様に、この設定はV2データソースのバケット化を有効にするために使用されます。有効にすると、SparkはSupportsReportPartitioningを通じてV2データソースによって報告された特定の分散を認識し、必要に応じてシャッフルを回避しようとします。 |
3.3.0 |
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled |
false | ストレージパーティション化された結合において、両方の結合側がKeyGroupedPartitioningの場合に、入力パーティションの部分的なクラスタリングを許可するかどうかを指定します。計画時に、Sparkはテーブル統計に基づいてデータサイズが少ない側を選択し、もう一方の側と一致するようにグループ化および複製します。これはスキュー結合の最適化であり、特定のパーティションに大量のデータが割り当てられている場合のデータの歪みを軽減するのに役立ちます。この設定には、`spark.sql.sources.v2.bucketing.enabled`と`spark.sql.sources.v2.bucketing.pushPartValues.enabled`の両方が有効になっている必要があります。 |
3.4.0 |
spark.sql.sources.v2.bucketing.pushPartValues.enabled |
false | `spark.sql.sources.v2.bucketing.enabled`が有効になっている場合に、共通のパーティション値をプッシュダウンするかどうかを指定します。オンにすると、結合の両方がKeyGroupedPartitioningであり、互換性のあるパーティションキーを共有している場合、パーティション値が完全に同じではない場合でも、Sparkはパーティション値のスーパーセットを計算し、その情報をスキャンノードにプッシュダウンします。スキャンノードは、どちらかの側の不足しているパーティション値に対して空のパーティションを使用します。これにより、不要なシャッフルを排除するのに役立つ場合があります。 |
3.4.0 |
spark.sql.statistics.fallBackToHdfs |
false | trueの場合、テーブルメタデータからテーブル統計が利用できない場合、HDFSにフォールバックします。これは、ブロードキャスト結合を使用するのに十分な小ささのテーブルかどうかを判断するのに役立ちます。このフラグは、パーティション化されていないHiveテーブルにのみ有効です。パーティション化されていないデータソーステーブルの場合、テーブル統計が利用できない場合は自動的に再計算されます。パーティション化されたデータソーステーブルとパーティション化されたHiveテーブルの場合、テーブル統計が利用できない場合は`spark.sql.defaultSizeInBytes`になります。 |
2.0.0 |
spark.sql.statistics.histogram.enabled |
false | 有効になっている場合、列統計を計算するときにヒストグラムを生成します。ヒストグラムは、より正確な推定精度を提供できます。現在、Sparkは等高ヒストグラムのみをサポートしています。ヒストグラムの収集には追加コストがかかることに注意してください。たとえば、列統計の収集には通常1回のテーブルスキャンしかかかりませんが、等高ヒストグラムを生成すると、追加のテーブルスキャンが発生します。 |
2.3.0 |
spark.sql.statistics.size.autoUpdate.enabled |
false | テーブルのデータが変更されたときに、テーブルサイズの自動更新を有効にします。テーブルのファイルの総数が非常に多い場合、これはコストが高く、データ変更コマンドの速度を低下させる可能性があることに注意してください。 |
2.3.0 |
spark.sql.storeAssignmentPolicy |
ANSI | 異なるデータ型の列に値を挿入する場合、Sparkは型強制を実行します。現在、型強制ルールのポリシーとして、ANSI、レガシー、厳密の3つのポリシーをサポートしています。ANSIポリシーでは、SparkはANSI SQLに従って型強制を実行します。実際には、PostgreSQLとほぼ同じ動作です。`string`を`int`に変換したり、`double`を`boolean`に変換したりするなど、特定の不合理な型変換は許可しません。レガシーポリシーでは、有効な`Cast`であれば、Sparkは型強制を許可します。これは非常に緩いです。たとえば、`string`を`int`に変換したり、`double`を`boolean`に変換したりすることが許可されます。これはSpark 2.xでの唯一の動作であり、Hiveと互換性があります。厳密なポリシーでは、Sparkは型強制における精度損失やデータの切り捨てを一切許可しません。たとえば、`double`を`int`に変換したり、`decimal`を`double`に変換したりすることは許可されません。 |
3.0.0 |
spark.sql.streaming.checkpointLocation |
(なし) | ストリーミングクエリのためのチェックポイントデータの保存場所のデフォルトです。 |
2.0.0 |
spark.sql.streaming.continuous.epochBacklogQueueSize |
10000 | 遅延エポックを待つためにキューに格納されるエントリの最大数です。このパラメーターがキューのサイズを超えると、ストリームはエラーで停止します。 |
3.0.0 |
spark.sql.streaming.disabledV2Writers |
StreamWriteSupportが無効になっている、完全修飾データソースレジスタクラス名のコンマ区切りリストです。これらのソースへの書き込みは、V1シンクにフォールバックします。 |
2.3.1 | |
spark.sql.streaming.fileSource.cleaner.numThreads |
1 | ファイルソースの完了済みファイルクリーナーで使用されるスレッド数です。 |
3.0.0 |
spark.sql.streaming.forceDeleteTempCheckpointLocation |
false | trueの場合、一時チェックポイントロケーションの強制削除を有効にします。 |
3.0.0 |
spark.sql.streaming.metricsEnabled |
false | アクティブなストリーミングクエリに対してDropwizard/Codahaleメトリクスが報告されるかどうかを指定します。 |
2.0.2 |
spark.sql.streaming.multipleWatermarkPolicy |
min | ストリーミングクエリに複数のウォーターマーク演算子がある場合のグローバルウォーターマーク値の計算ポリシーです。デフォルト値は'min'で、複数の演算子で報告されたウォーターマークの最小値を選択します。その他の代替値は'max'で、複数の演算子で報告されたウォーターマークの最大値を選択します。注:この設定は、同じチェックポイント場所からのクエリ再開間で変更できません。 |
2.4.0 |
spark.sql.streaming.noDataMicroBatches.enabled |
true | 状態のあるストリーミングクエリのための積極的な状態管理のために、ストリーミングマイクロバッチエンジンがデータなしでバッチを実行するかどうかを指定します。 |
2.4.1 |
spark.sql.streaming.numRecentProgressUpdates |
100 | ストリーミングクエリに対して保持する進捗状況更新の数です。 |
2.1.1 |
spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition |
false | trueの場合、ストリーミングセッションウィンドウはシャッフルの前にローカルパーティション内のセッションをソートしてマージします。これはシャッフルする行数を減らすためですが、同じセッションに割り当てられるバッチに多数の行がある場合にのみ有効です。 |
3.2.0 |
spark.sql.streaming.stateStore.stateSchemaCheck |
true | trueの場合、Sparkは既存の状態のスキーマに対して状態スキーマを検証し、互換性がない場合はクエリを失敗させます。 |
3.1.0 |
spark.sql.streaming.stopActiveRunOnRestart |
true | 同じストリーミングクエリを複数同時に実行することはサポートされていません。同じまたは異なるSparkSession(同じクラスタ上)でストリーミングクエリのアクティブな同時実行を見つけ、このフラグがtrueの場合、新しいクエリを開始するために古いストリーミングクエリの処理を停止します。 |
3.0.0 |
spark.sql.streaming.stopTimeout |
0 | ストリーミングクエリのstop()メソッドを呼び出したときに、ストリーミング実行スレッドの停止を待つミリ秒数です。0または負の値は、無期限に待ちます。 |
3.0.0 |
spark.sql.thriftServer.interruptOnCancel |
true | trueの場合、1つのクエリがキャンセルされると、実行中のすべてのタスクが中断されます。falseの場合、実行中のすべてのタスクは終了するまで実行されます。 |
3.2.0 |
spark.sql.thriftServer.queryTimeout |
0ms | Thrift Serverでクエリ時間制限を秒単位で設定します。タイムアウトを正の値に設定すると、タイムアウトが超過すると実行中のクエリは自動的にキャンセルされます。それ以外の場合は、クエリは完了するまで実行されます。`java.sql.Statement.setQueryTimeout`を介して各ステートメントにタイムアウト値が設定され、それらがこの設定値よりも小さい場合、それらが優先されます。このタイムアウトを設定し、タスクの完了を待たずにすぐにクエリをキャンセルする場合は、`spark.sql.thriftServer.interruptOnCancel`を一緒に有効にすることを検討してください。 |
3.1.0 |
spark.sql.thriftserver.scheduler.pool |
(なし) | JDBCクライアントセッションのFair Schedulerプールを設定します。 |
1.1.1 |
spark.sql.thriftserver.ui.retainedSessions |
200 | JDBC/ODBC Web UI履歴に保持されるSQLクライアントセッションの数です。 |
1.4.0 |
spark.sql.thriftserver.ui.retainedStatements |
200 | JDBC/ODBC Web UI履歴に保持されるSQLステートメントの数です。 |
1.4.0 |
spark.sql.timestampType |
TIMESTAMP_LTZ | SQL DDL、Cast句、型リテラル、およびデータソースのスキーマ推論を含む、Spark SQLのデフォルトのタイムスタンプ型を設定します。この設定をTIMESTAMP_NTZに設定すると、デフォルト型としてTIMESTAMP WITHOUT TIME ZONEが使用され、TIMESTAMP_LTZに設定すると、TIMESTAMP WITH LOCAL TIME ZONEが使用されます。3.4.0リリース以前は、SparkはTIMESTAMP WITH LOCAL TIME ZONE型のみをサポートしていました。 |
3.4.0 |
spark.sql.tvf.allowMultipleTableArguments.enabled |
false | trueの場合、テーブル値関数に対して複数のテーブル引数を許可し、これらのテーブルのすべての行のデカルト積を受け取ります。 |
3.5.0 |
spark.sql.ui.explainMode |
formatted | Spark SQL UIで使用されるクエリ説明モードを設定します。値は'simple'、'extended'、'codegen'、'cost'、または'formatted'です。デフォルト値は'formatted'です。 |
3.1.0 |
spark.sql.variable.substitute |
true | これにより、`${var}`、`${system:var}`、`${env:var}`のような構文を使用した置換が有効になります。 |
2.0.0 |
静的SQL設定
静的SQL設定は、セッション間で共有され、変更不可能なSpark SQL設定です。これらは、`--conf/-c`プレフィックス付きの構成ファイルとコマンドラインオプション、または`SparkSession`の作成に使用される`SparkConf`を設定することで、最終値で設定できます。外部ユーザーは、`SparkSession.conf`を介して、または`SET`コマンド(例:`SET spark.sql.extensions;`)を介して、静的SQL設定値を照会できますが、設定/解除することはできません。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.sql.cache.serializer |
org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer | org.apache.spark.sql.columnar.CachedBatchSerializerを実装するクラスの名前です。これは、SQLデータをより効率的にキャッシュできる形式に変換するために使用されます。基礎となるAPIは変更される可能性があるため、注意して使用してください。複数のクラスを指定することはできません。クラスには引数のないコンストラクターが必要です。 |
3.1.0 |
spark.sql.catalog.spark_catalog.defaultDatabase |
default | セッションカタログのデフォルトデータベースです。 |
3.4.0 |
spark.sql.event.truncate.length |
2147483647 | イベントに追加する前にSQLの長さが切り捨てられるしきい値です。デフォルトでは切り捨てられません。0に設定すると、代わりに呼び出し元がログに記録されます。 |
3.0.0 |
spark.sql.extensions |
(なし) | Sparkセッション拡張機能の構成に使用される`Function1[SparkSessionExtensions, Unit]`を実装するクラスのコンマ区切りリストです。クラスには引数のないコンストラクターが必要です。複数の拡張機能を指定した場合は、指定した順序で適用されます。ルールとプランナー戦略の場合、指定された順序で適用されます。パーサーの場合、最後のパーサーが使用され、各パーサーは前のパーサーに委任できます。関数名の競合の場合、最後に登録された関数名が使用されます。 |
2.2.0 |
spark.sql.hive.metastore.barrierPrefixes |
Spark SQLが通信しているHiveの各バージョンに対して明示的にリロードする必要があるクラスプレフィックスのコンマ区切りリストです。たとえば、通常は共有されるプレフィックス(つまり、`org.apache.spark.*`)で宣言されているHive UDFなどです。 |
1.4.0 | |
spark.sql.hive.metastore.jars |
builtin | HiveMetastoreClientのインスタンス化に使用されるJARの場所です。このプロパティには、次の4つのオプションのいずれかを使用できます。1. "builtin" `-Phive`が有効になっている場合、SparkアセンブリにバンドルされているHive 2.3.9を使用します。このオプションを選択する場合は、`spark.sql.hive.metastore.version`は`2.3.9`であるか、定義されていない必要があります。2. "maven" Mavenリポジトリからダウンロードされた指定されたバージョンのHive JARを使用します。3. "path" コンマ区切り形式で`spark.sql.hive.metastore.jars.path`によって設定されたHive JARを使用します。ローカルパスとリモートパスの両方をサポートします。提供されたJARは、`spark.sql.hive.metastore.version`と同じバージョンである必要があります。4. HiveとHadoopの両方の標準形式のクラスパスです。提供されたJARは、`spark.sql.hive.metastore.version`と同じバージョンである必要があります。 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
HiveMetastoreClientのインスタンス化に使用されるJARのコンマ区切りパスです。この設定は、`spark.sql.hive.metastore.jars`が`path`に設定されている場合にのみ役立ちます。パスは次のいずれかの形式にすることができます。1. file://path/to/jar/foo.jar 2. hdfs://nameservice/path/to/jar/foo.jar 3. /path/to/jar/(URIスキームなしのパスは、conf `fs.defaultFS`のURIスキームに従います)4. [http/https/ftp]://path/to/jar/foo.jar 注:1、2、および3はワイルドカードをサポートします。例:1. file://path/to/jar/,file://path2/to/jar//.jar 2. hdfs://nameservice/path/to/jar/,hdfs://nameservice2/path/to/jar//.jar |
3.1.0 | |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | Spark SQLと特定バージョンのHive間で共有されるクラスローダーを使用してロードする必要があるクラス接頭辞のカンマ区切りリストです。共有する必要があるクラスの例としては、メタストアとの通信に必要なJDBCドライバなどがあります。その他、既に共有されているクラスとやり取りする必要があるクラスも共有する必要があります。例えば、log4jで使用されるカスタムアペンダーなどです。 |
1.4.0 |
spark.sql.hive.metastore.version |
2.3.9 | Hiveメタストアのバージョン。使用可能なオプションは`0.12.0`から`2.3.9`、`3.0.0`から`3.1.3`です。 |
1.4.0 |
spark.sql.hive.thriftServer.singleSession |
false | trueに設定すると、Hive Thriftサーバーはシングルセッションモードで実行されます。すべてのJDBC/ODBC接続は、一時ビュー、関数レジストリ、SQL構成、および現在のデータベースを共有します。 |
1.6.0 |
spark.sql.hive.version |
2.3.9 | バンドルされているSparkディストリビューションのコンパイル済み(別名、ビルトイン)Hiveバージョンです。これは読み取り専用のconfであり、ビルトインのHiveバージョンのレポートにのみ使用されることに注意してください。Sparkが呼び出す異なるメタストアクライアントが必要な場合は、`spark.sql.hive.metastore.version`を参照してください。 |
1.1.1 |
spark.sql.metadataCacheTTLSeconds |
-1000ms | メタデータキャッシュ(パーティションファイルメタデータキャッシュとセッションカタログキャッシュ)の存続時間(TTL)値です。この設定は、この値が正の値(> 0)を持つ場合にのみ有効です。また、`spark.sql.catalogImplementation`を`hive`に設定し、`spark.sql.hive.filesourcePartitionFileCacheSize`を> 0に設定し、`spark.sql.hive.manageFilesourcePartitions`を`true`に設定して、パーティションファイルメタデータキャッシュに適用する必要があります。 |
3.1.0 |
spark.sql.queryExecutionListeners |
(なし) | 新しく作成されたセッションに自動的に追加される、`QueryExecutionListener`を実装するクラス名のリストです。これらのクラスには、引数のないコンストラクタ、または`SparkConf`引数を受け取るコンストラクタのいずれかが必要です。 |
2.3.0 |
spark.sql.sources.disabledJdbcConnProviderList |
無効になっているJDBC接続プロバイダのリストを設定します。このリストには、カンマで区切られたJDBC接続プロバイダの名前が含まれています。 |
3.1.0 | |
spark.sql.streaming.streamingQueryListeners |
(なし) | 新しく作成されたセッションに自動的に追加される、`StreamingQueryListener`を実装するクラス名のリストです。これらのクラスには、引数のないコンストラクタ、または`SparkConf`引数を受け取るコンストラクタのいずれかが必要です。 |
2.4.0 |
spark.sql.streaming.ui.enabled |
true | Spark Web UIが有効になっている場合に、SparkアプリケーションのStructured Streaming Web UIを実行するかどうかを指定します。 |
3.0.0 |
spark.sql.streaming.ui.retainedProgressUpdates |
100 | Structured Streaming UIのためにストリーミングクエリに対して保持する進捗状況更新の数です。 |
3.0.0 |
spark.sql.streaming.ui.retainedQueries |
100 | Structured Streaming UIのために保持する非アクティブクエリの数です。 |
3.0.0 |
spark.sql.ui.retainedExecutions |
1000 | Spark UIに保持する実行の数。 |
1.5.0 |
spark.sql.warehouse.dir |
(`$PWD/spark-warehouse`の値) | 管理対象のデータベースとテーブルのデフォルトの場所。 |
2.0.0 |
Spark Streaming
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.streaming.backpressure.enabled |
false | Spark Streamingの内蔵バックプレッシャーメカニズムを有効または無効にします(1.5以降)。これにより、Spark Streamingは現在のバッチスケジューリングの遅延と処理時間を基に受信レートを制御できるため、システムはシステムが処理できる速度でしか受信しなくなります。内部的には、これはレシーバーの最大受信レートを動的に設定します。このレートは、`spark.streaming.receiver.maxRate`および`spark.streaming.kafka.maxRatePerPartition`が設定されている場合、それらの値によって上限が設定されます(下記参照)。 | 1.5.0 |
spark.streaming.backpressure.initialRate |
設定なし | バックプレッシャーメカニズムが有効になっている場合、各レシーバーが最初のバッチに対してデータを受信する初期の最大受信レートです。 | 2.0.0 |
spark.streaming.blockInterval |
200ms | Spark Streamingレシーバーによって受信されたデータがSparkに保存される前にデータのブロックに分割される間隔です。推奨最小値は50msです。詳細については、Spark Streamingプログラミングガイドの[パフォーマンス調整](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)セクションを参照してください。 | 0.8.0 |
spark.streaming.receiver.maxRate |
設定なし | 各レシーバーがデータを受信する最大レート(1秒あたりのレコード数)です。効果的に、各ストリームは1秒あたり最大この数のレコードを消費します。この設定を0または負の数に設定すると、レートに制限がなくなります。詳細については、Spark Streamingプログラミングガイドの[デプロイメントガイド](streaming-programming-guide.html#deploying-applications)を参照してください。 | 1.0.2 |
spark.streaming.receiver.writeAheadLog.enable |
false | レシーバーのライトアヘッドログを有効にします。レシーバーを介して受信されたすべての入力データは、ドライバの障害後に復旧できるようにライトアヘッドログに保存されます。詳細については、Spark Streamingプログラミングガイドの[デプロイメントガイド](streaming-programming-guide.html#deploying-applications)を参照してください。 | 1.2.1 |
spark.streaming.unpersist |
true | Spark Streamingによって生成および永続化されたRDDを、Sparkのメモリから自動的にアンパーシストすることを強制します。Spark Streamingによって受信された生の入力データも自動的にクリアされます。これをfalseに設定すると、生のデータと永続化されたRDDは、自動的にクリアされないため、ストリーミングアプリケーションの外からアクセスできます。ただし、Sparkのメモリ使用量の増加を招きます。 | 0.9.0 |
spark.streaming.stopGracefullyOnShutdown |
false | `true`の場合、SparkはJVMのシャットダウン時に`StreamingContext`を即座にではなく、正常にシャットダウンします。 | 1.4.0 |
spark.streaming.kafka.maxRatePerPartition |
設定なし | 新しいKafkaダイレクトストリームAPIを使用する場合、各Kafkaパーティションからデータを読み取る最大レート(1秒あたりのレコード数)です。詳細については、[Kafka統合ガイド](streaming-kafka-0-10-integration.html)を参照してください。 | 1.3.0 |
spark.streaming.kafka.minRatePerPartition |
1 | 新しいKafkaダイレクトストリームAPIを使用する場合、各Kafkaパーティションからデータを読み取る最小レート(1秒あたりのレコード数)です。 | 2.4.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark Streaming UIとステータスAPIがガベージコレクションする前に記憶するバッチ数。 | 1.0.0 |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite |
false | ドライバでライトアヘッドログレコードを書き込んだ後にファイルを閉じるかどうかを指定します。ドライバのメタデータWALにS3(またはフラッシュをサポートしていないファイルシステム)を使用する場合は、これを「true」に設定します。 | 1.6.0 |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite |
false | レシーバーでライトアヘッドログレコードを書き込んだ後にファイルを閉じるかどうかを指定します。レシーバーのデータWALにS3(またはフラッシュをサポートしていないファイルシステム)を使用する場合は、これを「true」に設定します。 | 1.6.0 |
SparkR
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.r.numRBackendThreads |
2 | RBackendがSparkRパッケージからのRPC呼び出しを処理するために使用するスレッド数。 | 1.4.0 |
spark.r.command |
Rscript | ドライバとワーカーの両方でクラスタモードでRスクリプトを実行するための実行ファイル。 | 1.5.3 |
spark.r.driver.command |
spark.r.command | クライアントモードでドライバのRスクリプトを実行するための実行ファイルです。クラスタモードでは無視されます。 | 1.5.3 |
spark.r.shell.command |
R | クライアントモードでドライバのsparkRシェルを実行するための実行ファイルです。クラスタモードでは無視されます。これは環境変数`SPARKR_DRIVER_R`と同じですが、優先されます。`spark.r.shell.command`はsparkRシェルに使用され、`spark.r.driver.command`はRスクリプトの実行に使用されます。 | 2.1.0 |
spark.r.backendConnectionTimeout |
6000 | RプロセスがRBackendへの接続で設定する接続タイムアウト(秒単位)。 | 2.1.0 |
spark.r.heartBeatInterval |
100 | 接続タイムアウトを防ぐために、SparkRバックエンドからRプロセスに送信されるハートビートの間隔。 | 2.1.0 |
GraphX
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.graphx.pregel.checkpointInterval |
-1 | Pregelにおけるグラフとメッセージのチェックポイント間隔です。多くの反復の後、長いリンケージチェーンによる`stackOverflowError`を回避するために使用されます。チェックポイントはデフォルトで無効になっています。 | 2.2.0 |
デプロイ
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.deploy.recoveryMode |
NONE | クラスタモードで実行中に失敗して再起動した場合、送信されたSparkジョブを回復するための回復モード設定です。これは、StandaloneまたはMesosで実行する場合のクラスタモードにのみ適用されます。 | 0.8.1 |
spark.deploy.zookeeper.url |
なし | `spark.deploy.recoveryMode`がZOOKEEPERに設定されている場合、この設定を使用して接続するZooKeeperのURLを設定します。 | 0.8.1 |
spark.deploy.zookeeper.dir |
なし | `spark.deploy.recoveryMode`がZOOKEEPERに設定されている場合、この設定を使用してリカバリ状態を保存するZooKeeperディレクトリを設定します。 | 0.8.1 |
クラスタマネージャ
Sparkの各クラスタマネージャには、追加の構成オプションがあります。構成は、各モードのページにあります。
YARN
Mesos
Kubernetes
スタンドアロンモード
環境変数
特定のSpark設定は環境変数を使用して構成できます。これらの環境変数は、Sparkがインストールされているディレクトリにある`conf/spark-env.sh`スクリプト(Windowsの場合は`conf/spark-env.cmd`)から読み取られます。StandaloneモードとMesosモードでは、このファイルはホスト名などのマシン固有の情報を提供できます。ローカルのSparkアプリケーションまたは提出スクリプトを実行する場合にも参照されます。
Sparkのインストール時に`conf/spark-env.sh`はデフォルトでは存在しません。ただし、`conf/spark-env.sh.template`をコピーして作成できます。コピーを実行可能にすることを確認してください。
`spark-env.sh`で設定できる変数は次のとおりです。
環境変数 | 意味 |
---|---|
JAVA_HOME |
Javaがインストールされている場所(デフォルトの`PATH`にない場合)。 |
PYSPARK_PYTHON |
ドライバとワーカーの両方でPySparkに使用するPythonバイナリ実行ファイル(使用可能な場合は`python3`、そうでない場合は`python`がデフォルトです)。プロパティ`spark.pyspark.python`が設定されている場合は優先されます。 |
PYSPARK_DRIVER_PYTHON |
ドライバのみにPySparkに使用するPythonバイナリ実行ファイル(デフォルトは`PYSPARK_PYTHON`です)。プロパティ`spark.pyspark.driver.python`が設定されている場合は優先されます。 |
SPARKR_DRIVER_R |
SparkRシェルに使用するRバイナリ実行ファイル(デフォルトは`R`です)。プロパティ`spark.r.shell.command`が設定されている場合は優先されます。 |
SPARK_LOCAL_IP |
バインドするマシンのIPアドレス。 |
SPARK_PUBLIC_DNS |
Sparkプログラムが他のマシンにアドバタイズするホスト名。 |
上記に加えて、各マシンで使用するコア数や最大メモリなど、Sparkの[スタンドアロンクラスタスクリプト](spark-standalone.html#cluster-launch-scripts)を設定するためのオプションもあります。
`spark-env.sh`はシェルスクリプトであるため、これらのいくつかはプログラムで設定できます。たとえば、特定のネットワークインターフェースのIPを検索することで`SPARK_LOCAL_IP`を計算できます。
注:`cluster`モードでYARN上でSparkを実行する場合、環境変数は`conf/spark-defaults.conf`ファイル内の`spark.yarn.appMasterEnv.[EnvironmentVariableName]`プロパティを使用して設定する必要があります。`spark-env.sh`で設定された環境変数は、`cluster`モードのYARN Application Masterプロセスには反映されません。詳細については、[YARN関連のSparkプロパティ](running-on-yarn.html#spark-properties)を参照してください。
ロギングの設定
Sparkはロギングに[log4j](http://logging.apache.org/log4j/)を使用します。`conf`ディレクトリに`log4j2.properties`ファイルを追加することで構成できます。1つの方法は、そこに存在する既存の`log4j2.properties.template`をコピーすることです。
デフォルトでは、SparkはMDC(Mapped Diagnostic Context)に1つのレコードを追加します。`mdc.taskName`は`task 1.0 in stage 0.0`のようなものを表示します。ログに表示するには、`%X{mdc.taskName}`を`patternLayout`に追加できます。さらに、`spark.sparkContext.setLocalProperty(s"mdc.$name", "value")`を使用して、ユーザー固有のデータをMDCに追加できます。MDCのキーは「mdc.$name」の文字列になります。
設定ディレクトリのオーバーライド
デフォルトの「SPARK_HOME/conf」以外の構成ディレクトリを指定するには、SPARK_CONF_DIRを設定できます。Sparkはこのディレクトリから構成ファイル(spark-defaults.conf、spark-env.sh、log4j2.propertiesなど)を使用します。
Hadoopクラスタ設定の継承
Sparkを使用してHDFSから読み書きする予定がある場合は、Sparkのクラスパスに含める必要がある2つのHadoop構成ファイルがあります。
- `hdfs-site.xml`は、HDFSクライアントのデフォルトの動作を提供します。
- `core-site.xml`は、デフォルトのファイルシステム名を設定します。
これらの構成ファイルの場所はHadoopのバージョンによって異なりますが、一般的な場所は`/etc/hadoop/conf`内です。一部のツールは動的に構成を作成しますが、それらのコピーをダウンロードするメカニズムを提供します。
これらのファイルをSparkから見えるようにするには、構成ファイルを含む場所を$SPARK_HOME/conf/spark-env.sh
内のHADOOP_CONF_DIR
に設定します。
カスタムHadoop/Hive設定
SparkアプリケーションがHadoopまたはHive、あるいはその両方と連携している場合、SparkのクラスパスにHadoop/Hiveの構成ファイルが存在する可能性があります。
複数の稼働アプリケーションでは、異なるHadoop/Hiveクライアント側の構成が必要になる場合があります。Sparkのクラスパスにあるhdfs-site.xml
、core-site.xml
、yarn-site.xml
、hive-site.xml
をアプリケーションごとにコピーして変更できます。YARN上で実行されているSparkクラスタでは、これらの構成ファイルはクラスタ全体に設定され、アプリケーションによって安全に変更することはできません。
より良い方法は、spark.hadoop.*
形式のSpark Hadoopプロパティと、spark.hive.*
形式のSpark Hiveプロパティを使用することです。例えば、「spark.hadoop.abc.def=xyz」という構成を追加すると、Hadoopプロパティ「abc.def=xyz」が追加され、「spark.hive.abc=xyz」という構成を追加すると、Hiveプロパティ「hive.abc=xyz」が追加されます。これらは、$SPARK_HOME/conf/spark-defaults.conf
で設定できる通常のSparkプロパティと同じように扱うことができます。
場合によっては、SparkConf
に特定の構成をハードコーディングしない方が良い場合があります。例えば、Sparkでは、空のconfを作成し、Spark/Spark Hadoop/Spark Hiveプロパティを設定するだけで済みます。
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)
また、実行時に構成を変更または追加することもできます。
./bin/spark-submit \
--name "My app" \
--master local[4] \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
--conf spark.hive.abc=xyz
myApp.jar
カスタムリソーススケジューリングと設定の概要
GPUなどのアクセラレータは、深層学習や信号処理など、特殊なワークロードの高速化に広く使用されています。Sparkは現在、いくつかの注意点がありますが、GPUなどの汎用リソースの要求とスケジューリングをサポートしています。現在の実装では、リソースにはスケジューラによって割り当て可能なアドレスが必要です。クラスタマネージャがリソースをサポートし、適切に構成されている必要があります。
ドライバのリソース要求にはspark.driver.resource.{resourceName}.amount
、エグゼキュータのリソース要求にはspark.executor.resource.{resourceName}.amount
、各タスクの要件の指定にはspark.task.resource.{resourceName}.amount
という構成が使用できます。spark.driver.resource.{resourceName}.discoveryScript
設定は、YARN、Kubernetes、およびSpark Standaloneのクライアントサイドドライバで必要です。spark.executor.resource.{resourceName}.discoveryScript
設定は、YARNとKubernetesで必要です。Kubernetesでは、spark.driver.resource.{resourceName}.vendor
および/またはspark.executor.resource.{resourceName}.vendor
も必要です。各設定の詳細については、上記の構成の説明を参照してください。
Sparkは、指定された構成を使用して、まずクラスタマネージャから対応するリソースを持つコンテナを要求します。コンテナを取得すると、Sparkはそのコンテナにエグゼキュータを起動し、コンテナが持つリソースと各リソースに関連付けられたアドレスを検出します。エグゼキュータはドライバに登録し、そのエグゼキュータで使用可能なリソースを報告します。その後、Sparkスケジューラは各エグゼキュータにタスクをスケジュールし、ユーザーが指定したリソース要件に基づいて特定のリソースアドレスを割り当てることができます。ユーザーは、TaskContext.get().resources
APIを使用して、タスクに割り当てられたリソースを確認できます。ドライバでは、ユーザーはSparkContextのresources
呼び出しを使用して、割り当てられたリソースを確認できます。その後、ユーザーは割り当てられたアドレスを使用して目的の処理を実行するか、使用しているML/AIフレームワークにそれらを渡すことができます。
各要件と詳細については、クラスタマネージャ固有のページを参照してください。- YARN、Kubernetes、スタンドアロンモード。Mesosまたはローカルモードでは現在使用できません。また、複数のワーカーがいるローカルクラスタモードはサポートされていません(スタンドアロンのドキュメントを参照)。
ステージレベルスケジューリングの概要
ステージレベルのスケジューリング機能により、ユーザーはステージレベルでタスクとエグゼキュータのリソース要件を指定できます。これにより、異なるリソースを持つエグゼキュータで異なるステージを実行できます。その代表的な例として、1つのETLステージはCPUのみのエグゼキュータで実行され、次のステージはGPUが必要なMLステージです。ステージレベルのスケジューリングにより、ユーザーはMLステージの実行時にGPUを持つエグゼキュータを要求できます。アプリケーションの開始時にGPUを持つエグゼキュータを取得し、ETLステージの実行中にアイドル状態になる必要はありません。これは、Scala、Java、PythonのRDD APIでのみ使用できます。動的割り当てが有効になっている場合、YARN、Kubernetes、およびスタンドアロンで使用できます。動的割り当てが無効になっている場合、ユーザーはステージレベルで異なるタスクリソース要件を指定でき、これは現在、YARN、Kubernetes、およびスタンドアロンクラスタでサポートされています。YARNページ、Kubernetesページ、またはスタンドアロンページで、実装の詳細を確認してください。
RDD.withResources
およびResourceProfileBuilder
APIを参照して、この機能を使用してください。動的割り当てが無効になっている場合、異なるタスクリソース要件を持つタスクは、DEFAULT_RESOURCE_PROFILE
を持つエグゼキュータを共有します。一方、動的割り当てが有効になっている場合、現在の実装では作成された各ResourceProfile
に対して新しいエグゼキュータを取得する必要があり、現在は正確に一致する必要があります。Sparkは、エグゼキュータの作成に使用されたものとは異なるResourceProfileを必要とするタスクをエグゼキュータに適合させようとしません。使用されていないエグゼキュータは、動的割り当てロジックでアイドルタイムアウトになります。この機能のデフォルト設定では、ステージごとに1つのResourceProfileのみを許可します。ユーザーが1つ以上のResourceProfileをRDDに関連付ける場合、Sparkはデフォルトで例外をスローします。その動作を制御するには、spark.scheduler.resource.profileMergeConflicts
設定を参照してください。spark.scheduler.resource.profileMergeConflicts
が有効になっている場合にSparkが実装する現在のマージ戦略は、競合するResourceProfile内の各リソースの単純な最大値です。Sparkは、各リソースの最大値を持つ新しいResourceProfileを作成します。
プッシュベースシャッフルの概要
プッシュベースのシャッフルは、Sparkシャッフルの信頼性とパフォーマンスを向上させるのに役立ちます。マップタスクによって生成されたシャッフルブロックを、シャッフルパーティションごとにマージされるリモート外部シャッフルサービスにプッシュするベストエフォートのアプローチをとります。リデュースタスクは、マージされたシャッフルパーティションと元のシャッフルブロックの組み合わせを入力データとしてフェッチするため、外部シャッフルサービスによる小さなランダムなディスク読み取りが大きなシーケンシャル読み取りに変換されます。リデュースタスクのデータ局所性の可能性が高まることで、ネットワークIOを最小限に抑えることができます。プッシュベースのシャッフルは、マージされた出力が利用可能な場合のパーティションの結合など、いくつかのシナリオではバッチフェッチよりも優先されます。
プッシュベースのシャッフルは、シャッフル中に大量のディスクI/Oを伴う長時間実行されるジョブ/クエリのパフォーマンスを向上させます。現在、少量のシャッフルデータを取り扱う迅速に実行されるジョブ/クエリには適していません。これは将来のリリースでさらに改善される予定です。
現在、プッシュベースのシャッフルは、外部シャッフルサービスを使用するYARN上のSparkでのみサポートされています。
外部シャッフルサービス(サーバー側)の設定オプション
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.shuffle.push.server.mergedShuffleFileManagerImpl |
org.apache.spark.network.shuffle.
|
MergedShuffleFileManager の実装のクラス名で、プッシュベースのシャッフルを管理します。これは、サーバー側の設定として、プッシュベースのシャッフルを無効または有効にするためのものです。デフォルトでは、サーバー側でプッシュベースのシャッフルは無効になっています。サーバー側でプッシュベースのシャッフルを有効にするには、この設定を |
3.2.0 |
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile |
2MB |
プッシュベースのシャッフル中にマージされたシャッフルファイルを複数のチャンクに分割するときのチャンクの最小サイズ。マージされたシャッフルファイルは、複数の小さなシャッフルブロックで構成されています。マージされたシャッフルファイルを単一のディスクI/Oでフェッチすると、クライアントと外部シャッフルサービスの両方のメモリ要件が増加します。代わりに、外部シャッフルサービスは、 これを高く設定すると、クライアントと外部シャッフルサービスの両方のメモリ要件が増加します。 これを低く設定すると、外部シャッフルサービスへのRPC要求の総数が不必要に増加します。 |
3.2.0 |
spark.shuffle.push.server.mergedIndexCacheSize |
100m |
プッシュベースのシャッフルでマージされたインデックスファイルを保存するために使用できるメモリ内のキャッシュの最大サイズ。このキャッシュは、spark.shuffle.service.index.cache.size で構成されたキャッシュに追加されます。 |
3.2.0 |
クライアント側設定オプション
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.shuffle.push.enabled |
false |
クライアント側でプッシュベースのシャッフルを有効にするにはtrueに設定し、サーバー側のフラグspark.shuffle.push.server.mergedShuffleFileManagerImpl と連携して動作します。 |
3.2.0 |
spark.shuffle.push.finalize.timeout |
10秒 |
与えられたシャッフルマップステージのすべてのマッパーが完了した後、ドライバがリモート外部シャッフルサービスにマージの完了要求を送信する前に待機する時間(秒単位)。これにより、外部シャッフルサービスはブロックをマージするための追加時間が与えられます。これを長く設定すると、パフォーマンスの低下につながる可能性があります。 | 3.2.0 |
spark.shuffle.push.maxRetainedMergerLocations |
500 |
プッシュベースのシャッフルのためにキャッシュされたマージ位置の最大数。現在、マージ位置は、プッシュされたブロックの処理、マージ、および後のシャッフルフェッチのためのマージされたブロックのサービスを担当する外部シャッフルサービスのホストです。 | 3.2.0 |
spark.shuffle.push.mergersMinThresholdRatio |
0.05 |
リデューサステージのパーティション数に基づいて、ステージに必要なシャッフルマージ位置の最小数を計算するために使用される比率。例えば、100個のパーティションを持ち、デフォルト値0.05を使用するリデューサステージは、プッシュベースのシャッフルを有効にするために少なくとも5つの固有のマージ位置が必要です。 | 3.2.0 |
spark.shuffle.push.mergersMinStaticThreshold |
5 |
ステージのプッシュベースのシャッフルを有効にするために使用可能なシャッフルプッシュマージ位置の数に対する静的閾値。この設定は、spark.shuffle.push.mergersMinThresholdRatio と連携して動作します。spark.shuffle.push.mergersMinStaticThreshold とspark.shuffle.push.mergersMinThresholdRatio 比率の数のマージの最大値は、ステージのプッシュベースのシャッフルを有効にするために必要です。例:子ステージのパーティション数が1000個で、spark.shuffle.push.mergersMinStaticThresholdが5、spark.shuffle.push.mergersMinThresholdRatioが0.05の場合、そのステージのプッシュベースのシャッフルを有効にするには、少なくとも50個のマージが必要です。 |
3.2.0 |
spark.shuffle.push.numPushThreads |
(なし) | ブロックプッシャープール内のスレッド数を指定します。これらのスレッドは、接続の作成と、リモート外部シャッフルサービスへのブロックのプッシュを支援します。デフォルトでは、スレッドプールのサイズは、Sparkエグゼキュータコアスレッド数と同じです。 | 3.2.0 |
spark.shuffle.push.maxBlockSizeToPush |
1m |
リモート外部シャッフルサービスにプッシュする個々のブロックの最大サイズ。この閾値より大きいブロックは、リモートでマージするためにプッシュされません。これらのシャッフルブロックは、元の方法でフェッチされます。 これを高く設定すると、リモート外部シャッフルサービスにプッシュされるブロックが増えますが、それらは既存のメカニズムで効率的にフェッチされているため、大きなブロックをリモート外部シャッフルサービスにプッシュする追加のオーバーヘッドが発生します。 この値を低く設定しすぎると、マージされるブロック数が減り、マッパー外部シャッフルサービスから直接フェッチされるため、小さなランダムリードが増加し、全体のディスクI/Oパフォーマンスに悪影響を及ぼします。 |
3.2.0 |
spark.shuffle.push.maxBlockBatchSize |
3m |
単一のプッシュリクエストにグループ化されるシャッフルブロックのバッチの最大サイズ。spark.storage.memoryMapThreshold のデフォルト値(2m )よりもわずかに高く3m に設定されています。これは、ブロックのバッチごとにメモリマップされる可能性が高く、オーバーヘッドが増加するためです。 |
3.2.0 |
spark.shuffle.push.merge.finalizeThreads |
8 | ドライバがシャッフルマージの完了処理に使用するスレッド数。大規模なシャッフルの完了処理には数秒かかる可能性があるため、複数のスレッドを使用することで、プッシュベースのシャッフルが有効になっている場合、ドライバは同時実行されるシャッフルマージの完了リクエストを処理しやすくなります。 | 3.3.0 |
spark.shuffle.push.minShuffleSizeToWait |
500m |
ドライバは、シャッフルデータの合計サイズがこの閾値を超えている場合にのみ、マージ完了処理の完了を待ちます。シャッフルサイズの合計が小さい場合は、ドライバはすぐにシャッフル出力を完了します。 | 3.3.0 |
spark.shuffle.push.minCompletedPushRatio |
1.0 |
プッシュベースのシャッフル中に、ドライバがシャッフルマージの完了処理を開始する前に、プッシュが完了する必要がある最小マップパーティションの割合。 | 3.3.0 |