Spark の構成
- Spark プロパティ
- 環境変数
- ロギングの構成
- 構成ディレクトリのオーバーライド
- Hadoop クラスター構成の継承
- カスタム Hadoop/Hive 構成
- カスタムリソーススケジューリングと構成の概要
- ステージレベルのスケジューリング概要
- プッシュベースシャッフルの概要
Spark はシステムを構成するための 3 つの場所を提供します
- Spark プロパティはほとんどのアプリケーションパラメータを制御し、SparkConf オブジェクトを使用するか、Java システムプロパティを通じて設定できます。
- 環境変数は、各マシン設定 (IP アドレスなど) を設定するために使用でき、各ノードの
conf/spark-env.shスクリプトを通じて行います。 - ロギングは
log4j2.propertiesを通じて構成できます。
Spark プロパティ
Spark プロパティはほとんどのアプリケーション設定を制御し、アプリケーションごとに個別に構成されます。これらのプロパティは、SparkContext に渡される SparkConf で直接設定できます。SparkConf を使用すると、一般的なプロパティ (マスター URL やアプリケーション名など) の一部、および set() メソッドを介した任意のキーと値のペアを構成できます。たとえば、2 つのスレッドでアプリケーションを次のように初期化できます。
local[2] で実行していることに注意してください。これは 2 つのスレッドを意味し、「最小限」の並列処理を表します。これは、分散コンテキストで実行した場合にのみ存在するバグを検出するのに役立ちます。
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)ローカルモードでは 1 つ以上のスレッドを持つことができることに注意してください。Spark Streaming のようなケースでは、スターベーションの問題を防ぐために実際に 1 つ以上のスレッドが必要になる場合があります。
時間期間を指定するプロパティは、時間の単位で構成する必要があります。次の形式が受け入れられます。
25ms (milliseconds)
5s (seconds)
10m or 10min (minutes)
3h (hours)
5d (days)
1y (years)
バイトサイズを指定するプロパティは、サイズの単位で構成する必要があります。次の形式が受け入れられます。
1b (bytes)
1k or 1kb (kibibytes = 1024 bytes)
1m or 1mb (mebibytes = 1024 kibibytes)
1g or 1gb (gibibytes = 1024 mebibytes)
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
単位なしの数値は一般的にバイトとして解釈されますが、一部は KiB または MiB として解釈されます。個々の構成プロパティのドキュメントを参照してください。可能な場合は単位を指定することをお勧めします。
Spark プロパティの動的読み込み
場合によっては、SparkConf に特定の構成をハードコーディングすることを避けたい場合があります。たとえば、異なるマスターまたは異なる量のメモリで同じアプリケーションを実行したい場合です。Spark では、空の構成を作成するだけでよいです。
val sc = new SparkContext(new SparkConf())その後、実行時に構成値を指定できます。
./bin/spark-submit \
--name "My app" \
--master "local[4]" \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
myApp.jar
Spark シェルと spark-submit ツールは、構成を動的に読み込む 2 つの方法をサポートしています。1 つ目は、上記の例のようなコマンドラインオプションです。spark-submit は、--conf/-c フラグを使用して任意の Spark プロパティを受け入れることができますが、Spark アプリケーションの起動に関係するプロパティには特別なフラグを使用します。./bin/spark-submit --help を実行すると、これらのオプションの全リストが表示されます。
--conf/-c フラグを介して構成が指定された場合、bin/spark-submit は conf/spark-defaults.conf から構成オプションも読み取ります。このファイルでは、各行は空白で区切られたキーと値で構成されます。例:
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
さらに、Spark 構成を含むプロパティファイルを --properties-file パラメータを介して bin/spark-submit に渡すことができます。これが設定されると、別のパラメータ --load-spark-defaults が提供されない限り、Spark は conf/spark-defaults.conf から構成を読み込まなくなります。
フラグまたはプロパティファイルで指定された値は、アプリケーションに渡され、SparkConf を介して指定された値とマージされます。SparkConf で直接設定されたプロパティが最も優先され、次に spark-submit または spark-shell に渡された --conf フラグまたは --properties-file を介して設定されたプロパティ、次に spark.defaults.conf ファイルのオプションが続きます。以前の Spark バージョンから一部の構成キーの名前が変更されています。その場合、古いキー名も引き続き受け入れられますが、新しいキーのインスタンスよりも優先度が低くなります。
Spark プロパティは主に 2 種類に分けられます。1 つはデプロイに関連するもので、「spark.driver.memory」、「spark.executor.instances」などです。この種のプロパティは、実行時に SparkConf を介してプログラムで設定しても影響がない場合があり、または選択したクラスターマネージャーとデプロイモードによって動作が異なります。そのため、構成ファイルまたは spark-submit コマンドラインオプションを介して設定することをお勧めします。もう 1 つは主に Spark の実行時制御に関連するもので、「spark.task.maxFailures」などです。この種のプロパティは、どちらの方法でも設定できます。
Spark プロパティの表示
URL http://<driver>:4040 のアプリケーション Web UI は、「Environment」タブに Spark プロパティを一覧表示します。これは、プロパティが正しく設定されていることを確認するための便利な場所です。明示的に spark.defaults.conf、SparkConf、またはコマンドラインを介して指定された値のみが表示されることに注意してください。その他のすべての構成プロパティについては、デフォルト値が使用されていると想定できます。
利用可能なプロパティ
内部設定を制御するプロパティのほとんどには、妥当なデフォルト値があります。設定する最も一般的なオプションの一部は次のとおりです。
アプリケーションプロパティ
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.app.name |
(なし) | アプリケーションの名前。UI およびログデータに表示されます。 | 0.9.0 |
spark.driver.cores |
1 | クラスターモードでのみ、ドライバープロセスに使用するコア数。 | 1.3.0 |
spark.driver.maxResultSize |
1g | バイト単位の、各 Spark アクション (例: collect) のすべてのパーティションのシリアライズされた結果の合計サイズの上限。少なくとも 1M、または無制限の場合は 0 である必要があります。合計サイズがこの制限を超えると、ジョブは中止されます。高い制限があると、ドライバーでメモリ不足エラーが発生する可能性があります (spark.driver.memory およびオブジェクトのメモリオーバーヘッドによります)。適切な制限を設定すると、ドライバーをメモリ不足エラーから保護できます。 | 1.2.0 |
spark.driver.memory |
1g | ドライバープロセス (SparkContext が初期化される場所) に使用するメモリ量。JVM メモリ文字列と同じ形式で、サイズ単位の接尾辞 ("k"、"m"、"g"、"t") が付きます (例: 512m、2g)。注: クライアントモードでは、ドライバー JVM はすでに開始されているため、この構成はアプリケーションで直接 SparkConf を介して設定することはできません。代わりに、--driver-memory コマンドラインオプションまたはデフォルトのプロパティファイルで設定してください。 |
1.1.1 |
spark.driver.memoryOverhead |
driverMemory * spark.driver.memoryOverheadFactor、最小値は spark.driver.minMemoryOverhead |
クラスターモードでのドライバープロセスごとに割り当てられる非ヒープメモリの量 (指定がない限り MiB)。これは、VM オーバーヘッド、インターン化された文字列、その他のネイティブオーバーヘッドなどに対応するメモリです。これはコンテナサイズとともに増加する傾向があります (通常 6-10%)。このオプションは現在、YARN および Kubernetes でサポートされています。注: 非ヒープメモリには、オフヒープメモリ (spark.memory.offHeap.enabled=true の場合) およびその他のドライバープロセス (PySpark ドライバーに伴う Python プロセスなど) が使用するメモリ、および同じコンテナで実行されるその他の非ドライバープロセスが使用するメモリが含まれます。ドライバーを実行するコンテナの最大メモリサイズは、spark.driver.memoryOverhead と spark.driver.memory の合計によって決定されます。 |
2.3.0 |
spark.driver.minMemoryOverhead |
384m | クラスターモードでのドライバープロセスごとに割り当てられる非ヒープメモリの最小量 (指定がない限り MiB)。これは、spark.driver.memoryOverhead が定義されていない場合に適用されます。このオプションは現在、YARN および Kubernetes でサポートされています。 |
4.0.0 |
spark.driver.memoryOverheadFactor |
0.10 | クラスターモードでのドライバープロセスごとに追加の非ヒープメモリとして割り当てられるドライバーメモリの割合。これは、VM オーバーヘッド、インターン化された文字列、その他のネイティブオーバーヘッドなどに対応するメモリです。これはコンテナサイズとともに増加する傾向があります。この値は、Kubernetes 非 JVM ジョブの場合は 0.40、それ以外の場合は 0.10 をデフォルトとします。これは、非 JVM タスクがより多くの非 JVM ヒープスペースを必要とし、そのようなタスクが「Memory Overhead Exceeded」エラーで失敗することが多いためです。これは、より高いデフォルト値でこのエラーを事前防止します。spark.driver.memoryOverhead が直接設定されている場合、この値は無視されます。 |
3.3.0 |
spark.driver.resource.{resourceName}.amount |
0 | ドライバーで使用する特定のリソースタイプの量。これを使用する場合、ドライバーが起動時にリソースを見つけるために spark.driver.resource.{resourceName}.discoveryScript も指定する必要があります。 |
3.0.0 |
spark.driver.resource.{resourceName}.discoveryScript |
なし | ドライバーが特定のリソースタイプを検出するために実行するスクリプト。これは、ResourceInformation クラスの形式で標準出力に JSON 文字列を書き込む必要があります。名前とアドレスの配列があります。クライアントから送信されたドライバーの場合、検出スクリプトは、同じホスト上の他のドライバーとは異なるリソースアドレスをこのドライバーに割り当てる必要があります。 | 3.0.0 |
spark.driver.resource.{resourceName}.vendor |
なし | ドライバーに使用するリソースのベンダー。このオプションは現在、Kubernetes でのみサポートされており、実際には Kubernetes デバイスプラグインの命名規則に従ったベンダーとドメインの両方です。(例: Kubernetes 上の GPU の場合、この構成は nvidia.com または amd.com に設定されます) | 3.0.0 |
spark.resources.discoveryPlugin |
org.apache.spark.resource.ResourceDiscoveryScriptPlugin | アプリケーションにロードする org.apache.spark.api.resource.ResourceDiscoveryPlugin を実装するクラス名のカンマ区切りリスト。これは、リソース検出クラスをカスタム実装に置き換えるための高度なユーザー向けです。Spark は、リソース情報が返されるまで、指定された各クラスを試します。どちらのプラグインもリソース情報を提供しない場合、最後に検出スクリプトを試します。 | 3.0.0 |
spark.executor.memory |
1g | 各 executor プロセスに使用するメモリ量。JVM メモリ文字列と同じ形式で、サイズ単位の接尾辞 ("k"、"m"、"g"、"t") が付きます (例: 512m、2g)。 |
0.7.0 |
spark.executor.pyspark.memory |
未設定 | 各 executor の PySpark に割り当てられるメモリ量 (指定がない限り MiB)。設定されている場合、executor の PySpark メモリはこの量に制限されます。設定されていない場合、Spark は Python のメモリ使用量を制限しません。アプリケーションは、他の非 JVM プロセスと共有されるオーバーヘッドメモリ空間を超えることを避ける必要があります。YARN または Kubernetes で PySpark を実行する場合、このメモリは executor リソース要求に追加されます。 注: この機能は Python の resource モジュールに依存しているため、動作と制限は継承されます。たとえば、Windows はリソース制限をサポートしておらず、MacOS では実際のリソースは制限されていません。 |
2.4.0 |
spark.executor.memoryOverhead |
executorMemory * spark.executor.memoryOverheadFactor、最小値は spark.executor.minMemoryOverhead |
各 executor プロセスに割り当てられる追加メモリの量 (指定がない限り MiB)。これは、VM オーバーヘッド、インターン化された文字列、その他のネイティブオーバーヘッドなどに対応するメモリです。これは executor サイズとともに増加する傾向があります (通常 6-10%)。このオプションは現在、YARN および Kubernetes でサポートされています。 注: 追加メモリには、PySpark executor メモリ ( spark.executor.pyspark.memory が構成されていない場合) および同じコンテナで実行される他の非 executor プロセスが使用するメモリが含まれます。executor を実行するコンテナの最大メモリサイズは、spark.executor.memoryOverhead、spark.executor.memory、spark.memory.offHeap.size、および spark.executor.pyspark.memory の合計によって決定されます。 |
2.3.0 |
spark.executor.minMemoryOverhead |
384m | 各 executor プロセスに割り当てられる非ヒープメモリの最小量 (指定がない限り MiB)。これは、spark.executor.memoryOverhead が定義されていない場合に適用されます。このオプションは現在、YARN および Kubernetes でサポートされています。 |
4.0.0 |
spark.executor.memoryOverheadFactor |
0.10 | 各 executor プロセスに追加の非ヒープメモリとして割り当てられる executor メモリの割合。これは、VM オーバーヘッド、インターン化された文字列、その他のネイティブオーバーヘッドなどに対応するメモリです。これはコンテナサイズとともに増加する傾向があります。この値は、Kubernetes 非 JVM ジョブの場合は 0.40、それ以外の場合は 0.10 をデフォルトとします。これは、非 JVM タスクがより多くの非 JVM ヒープスペースを必要とし、そのようなタスクが「Memory Overhead Exceeded」エラーで失敗することが多いためです。これは、より高いデフォルト値でこのエラーを事前防止します。spark.executor.memoryOverhead が直接設定されている場合、この値は無視されます。 |
3.3.0 |
spark.executor.resource.{resourceName}.amount |
0 | executor プロセスごとに使用する特定のリソースタイプの量。これを使用する場合、executor が起動時にリソースを見つけるために spark.executor.resource.{resourceName}.discoveryScript も指定する必要があります。 |
3.0.0 |
spark.executor.resource.{resourceName}.discoveryScript |
なし | executor が特定のリソースタイプを検出するために実行するスクリプト。これは、ResourceInformation クラスの形式で標準出力に JSON 文字列を書き込む必要があります。名前とアドレスの配列があります。 | 3.0.0 |
spark.executor.resource.{resourceName}.vendor |
なし | executor に使用するリソースのベンダー。このオプションは現在、Kubernetes でのみサポートされており、実際には Kubernetes デバイスプラグインの命名規則に従ったベンダーとドメインの両方です。(例: Kubernetes 上の GPU の場合、この構成は nvidia.com または amd.com に設定されます) | 3.0.0 |
spark.extraListeners |
(なし) | SparkListener を実装するクラスのカンマ区切りリスト。SparkContext の初期化時に、これらのクラスのインスタンスが作成され、Spark のリスナーバスに登録されます。クラスに SparkConf を受け入れる単一引数コンストラクタがある場合、そのコンストラクタが呼び出されます。それ以外の場合は、ゼロ引数コンストラクタが呼び出されます。有効なコンストラクタが見つからない場合、SparkContext の作成は例外で失敗します。 |
1.3.0 |
spark.local.dir |
/tmp | Spark の「スクラッチ」スペースに使用するディレクトリ。マップ出力ファイルやディスクに保存される RDD などが含まれます。これは、システム上の高速なローカルディスク上にある必要があります。また、複数のディスク上の複数のディレクトリのカンマ区切りリストにすることもできます。 注: これは、クラスターマネージャーによって設定される SPARK_LOCAL_DIRS (Standalone) または LOCAL_DIRS (YARN) 環境変数によってオーバーライドされます。 |
0.5.0 |
spark.logConf |
false | SparkContext が開始されたときに、効果的な SparkConf を INFO レベルでログに記録します。 | 0.9.0 |
spark.master |
(なし) | 接続するクラスターマネージャー。許可されるマスター URL のリストを参照してください。 | 0.9.0 |
spark.submit.deployMode |
client | Spark ドライバープログラムのデプロイモード。「client」または「cluster」のいずれか。これは、ドライバープログラムをローカルで (「client」) またはクラスター内のノードの 1 つでリモートで (「cluster」) 起動することを意味します。 | 1.5.0 |
spark.log.callerContext |
(なし) | Yarn RM ログ/HDFS 監査ログに書き込まれるアプリケーション情報。Yarn/HDFS で実行する場合。その長さは Hadoop の構成 hadoop.caller.context.max.size に依存します。簡潔である必要があり、通常は最大 50 文字までです。 |
2.2.0 |
spark.log.level |
(なし) | 設定されている場合、SparkStartup で SparkContext.setLogLevel() を呼び出すのと同じように、ユーザー定義のログ設定をオーバーライドします。有効なログレベルは、「ALL」、「DEBUG」、「ERROR」、「FATAL」、「INFO」、「OFF」、「TRACE」、「WARN」です。 |
3.5.0 |
spark.driver.supervise |
false | true の場合、ドライバーがゼロ以外の終了ステータスで失敗した場合、ドライバーが自動的に再起動されます。Spark スタンドアロンモードでのみ効果があります。 | 1.3.0 |
spark.driver.timeout |
0min | Spark ドライバーのタイムアウト (分単位)。0 は無制限を意味します。正の時間値の場合、タイムアウト期間を超えて実行されたドライバーは終了コード 124 で終了します。使用するには、spark.plugins を org.apache.spark.deploy.DriverTimeoutPlugin で設定する必要があります。 |
4.0.0 |
spark.driver.log.localDir |
(なし) | ドライバーログを書き込み、Driver Log UI タブを有効にするローカルディレクトリを指定します。 | 4.0.0 |
spark.driver.log.dfsDir |
(なし) | spark.driver.log.persistToDfs.enabled が true の場合、Spark ドライバーログが同期されるベースディレクトリ。このベースディレクトリ内で、各アプリケーションはアプリケーション固有のファイルにドライバーログを記録します。ユーザーはこれを HDFS ディレクトリなどの統合された場所に設定して、ドライバーログファイルを後で使用できるように永続化したい場合があります。このディレクトリは、任意の Spark ユーザーがファイルを読み書きできるようにし、Spark History Server ユーザーがファイルを削除できるようにする必要があります。さらに、spark.history.fs.driverlog.cleaner.enabled が true で、Spark History Server によって古いログがクリーンアップされる場合、spark.history.fs.driverlog.cleaner.maxAge で設定された最大年齢よりも古いログはクリーンアップされます。 |
3.0.0 |
spark.driver.log.persistToDfs.enabled |
false | true の場合、クライアントモードで実行されている Spark アプリケーションは、spark.driver.log.dfsDir で構成された永続ストレージにドライバーログを書き込みます。spark.driver.log.dfsDir が構成されていない場合、ドライバーログは永続化されません。さらに、Spark History Server で spark.history.fs.driverlog.cleaner.enabled を true に設定してクリーンアップを有効にします。 |
3.0.0 |
spark.driver.log.layout |
%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n%ex | spark.driver.log.localDir および spark.driver.log.dfsDir に同期されるドライバーログのレイアウト。これが構成されていない場合、log4j2.properties で定義された最初のアペンダーのレイアウトが使用されます。それも構成されていない場合、ドライバーログはデフォルトのレイアウトを使用します。 |
3.0.0 |
spark.driver.log.allowErasureCoding |
false | ドライバーログで誤り訂正符号を使用することを許可するかどうか。HDFS では、誤り訂正符号化されたファイルは通常のレプリケートされたファイルほど速く更新されないため、アプリケーションの変更が履歴サーバーに反映されるのに時間がかかる場合があります。これが true であっても、Spark はファイルを強制的に誤り訂正符号化するわけではなく、ファイルシステムのデフォルトを使用するだけであることに注意してください。 | 3.0.0 |
spark.decommission.enabled |
false | decommission が有効な場合、Spark は executor を正常にシャットダウンするように最善を尽くします。spark.storage.decommission.enabled が有効な場合、Spark は decomission されている executor からすべての RDD ブロック (spark.storage.decommission.rddBlocks.enabled で制御) およびシャッフルブロック (spark.storage.decommission.shuffleBlocks.enabled で制御) をリモート executor に移行しようとします。decommission が有効になっている場合、spark.dynamicAllocation.enabled が有効な場合、Spark は executor をキルする代わりに decomission します。 |
3.1.0 |
spark.executor.decommission.killInterval |
(なし) | 外部 (例: 非 Spark) サービスによって、decommission された executor が強制的にキルされるまでの期間。 | 3.1.0 |
spark.executor.decommission.forceKillTimeout |
(なし) | Spark が decomission されている executor を強制的に終了するまでの期間。低い値はブロック移行が完了するのに十分な時間を確保できないため、ほとんどの場合、この値は高く設定する必要があります。 | 3.2.0 |
spark.executor.decommission.signal |
PWR | executor に decommission を開始させるために使用されるシグナル。 | 3.2.0 |
spark.executor.maxNumFailures |
numExecutors * 2、最小値は 3 | アプリケーションを失敗させる前の executor の最大障害数。この構成は YARN および Kubernetes でのみ有効です。 | 3.5.0 |
spark.executor.failuresValidityInterval |
(なし) | executor の障害が独立したものと見なされ、試行回数に累積されなくなるまでの間隔。この構成は YARN および Kubernetes でのみ有効です。 | 3.5.0 |
これら以外にも、以下のプロパティが利用可能であり、状況によっては役立つ場合があります。
実行時環境
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.driver.extraClassPath |
(なし) | ドライバーのクラスパスの先頭に追加する追加のクラスパスエントリ。 注: クライアントモードでは、ドライバー JVM はすでに開始されているため、この構成はアプリケーションで直接 SparkConf を介して設定することはできません。代わりに、--driver-class-path コマンドラインオプションまたはデフォルトのプロパティファイルで設定してください。 |
1.0.0 |
spark.driver.defaultJavaOptions |
(なし) | spark.driver.extraJavaOptions の先頭に追加されるデフォルトの JVM オプションの文字列。これは管理者が設定することを目的としています。たとえば、GC 設定やその他のロギングなどです。このオプションで最大ヒープサイズ (-Xmx) 設定を設定することは illegal です。最大ヒープサイズ設定は、クラスターモードでは spark.driver.memory で、クライアントモードでは --driver-memory コマンドラインオプションで設定できます。注: クライアントモードでは、ドライバー JVM はすでに開始されているため、この構成はアプリケーションで直接 SparkConf を介して設定することはできません。代わりに、--driver-java-options コマンドラインオプションまたはデフォルトのプロパティファイルで設定してください。 |
3.0.0 |
spark.driver.extraJavaOptions |
(なし) | ドライバーに渡す追加の JVM オプションの文字列。これはユーザーが設定することを目的としています。たとえば、GC 設定やその他のロギングなどです。このオプションで Spark プロパティまたは最大ヒープサイズ (-Xmx) 設定を設定することは illegal です。Spark プロパティは、SparkConf オブジェクトまたは spark-defaults.conf ファイルを使用して設定する必要があります。最大ヒープサイズ設定は、spark.driver.memory で設定できます。以下に示すすべてのシンボルは、アプリケーション ID で置き換えられ、executor ID で置き換えられます。たとえば、/tmp にある executor ID のファイルに verbose gc ロギングを有効にするには、次の 'value' を渡します。-verbose:gc -Xloggc:/tmp/-.gc注: クライアントモードでは、ドライバー JVM はすでに開始されているため、この構成はアプリケーションで直接 SparkConf を介して設定することはできません。代わりに、--driver-java-options コマンドラインオプションまたはデフォルトのプロパティファイルで設定してください。spark.driver.defaultJavaOptions がこの構成の先頭に追加されます。 |
1.0.0 |
spark.driver.extraLibraryPath |
(なし) | ドライバー JVM を起動する際に使用する特別なライブラリパスを設定します。 注: クライアントモードでは、ドライバー JVM はすでに開始されているため、この構成はアプリケーションで直接 SparkConf を介して設定することはできません。代わりに、--driver-library-path コマンドラインオプションまたはデフォルトのプロパティファイルで設定してください。 |
1.0.0 |
spark.driver.userClassPathFirst |
false | (実験的) ドライバーでクラスをロードする際に、Spark 独自の JAR よりもユーザー追加の JAR を優先するかどうか。この機能は、Spark の依存関係とユーザーの依存関係との競合を緩和するために使用できます。これは現在実験的な機能です。クラスターモードでのみ使用されます。 | 1.3.0 |
spark.executor.extraClassPath |
(なし) | executor のクラスパスの先頭に追加する追加のクラスパスエントリ。これは主に Spark の古いバージョンとの後方互換性のために存在します。通常、ユーザーはこのオプションを設定する必要はありません。 | 1.0.0 |
spark.executor.defaultJavaOptions |
(なし) | spark.executor.extraJavaOptions の先頭に追加されるデフォルトの JVM オプションの文字列。これは管理者が設定することを目的としています。たとえば、GC 設定やその他のロギングなどです。このオプションで Spark プロパティまたは最大ヒープサイズ (-Xmx) 設定を設定することは illegal です。Spark プロパティは、SparkConf オブジェクトまたは spark-submit スクリプトで使用される spark-defaults.conf ファイルを使用して設定する必要があります。最大ヒープサイズ設定は spark.executor.memory で設定できます。以下に示すすべてのシンボルは、アプリケーション ID で置き換えられ、executor ID で置き換えられます。たとえば、/tmp にある executor ID のファイルに verbose gc ロギングを有効にするには、次の 'value' を渡します。-verbose:gc -Xloggc:/tmp/-.gc |
3.0.0 |
spark.executor.extraJavaOptions |
(なし) | executor に渡す追加の JVM オプションの文字列。これはユーザーが設定することを目的としています。たとえば、GC 設定やその他のロギングなどです。このオプションで Spark プロパティまたは最大ヒープサイズ (-Xmx) 設定を設定することは illegal です。Spark プロパティは、SparkConf オブジェクトまたは spark-defaults.conf ファイルを使用して設定する必要があります。最大ヒープサイズ設定は spark.executor.memory で設定できます。以下に示すすべてのシンボルは、アプリケーション ID で置き換えられ、executor ID で置き換えられます。たとえば、/tmp にある executor ID のファイルに verbose gc ロギングを有効にするには、次の 'value' を渡します。-verbose:gc -Xloggc:/tmp/-.gc spark.executor.defaultJavaOptions がこの構成の先頭に追加されます。 |
1.0.0 |
spark.executor.extraLibraryPath |
(なし) | executor JVM を起動する際に使用する特別なライブラリパスを設定します。 | 1.0.0 |
spark.executor.logs.rolling.maxRetainedFiles |
-1 | システムによって保持される最新のローリングログファイルの数を設定します。古いログファイルは削除されます。デフォルトでは無効です。 | 1.1.0 |
spark.executor.logs.rolling.enableCompression |
false | executor ログ圧縮を有効にします。有効な場合、ロールされた executor ログは圧縮されます。デフォルトでは無効です。 | 2.0.2 |
spark.executor.logs.rolling.maxSize |
1024 * 1024 | executor ログがロールオーバーされるファイルサイズの上限 (バイト単位)。デフォルトではロールオーバーは無効です。古いログの自動クリーニングについては、spark.executor.logs.rolling.maxRetainedFiles を参照してください。 |
1.4.0 |
spark.executor.logs.rolling.strategy |
"" (無効) | executor ログのロールオーバー戦略を設定します。デフォルトでは無効です。「time」(時間ベースのロールオーバー)、「size」(サイズベースのロールオーバー)、または "" (無効) に設定できます。「time」の場合は、spark.executor.logs.rolling.time.interval を使用してロールオーバー間隔を設定します。「size」の場合は、spark.executor.logs.rolling.maxSize を使用してロールオーバーの最大ファイルサイズを設定します。 |
1.1.0 |
spark.executor.logs.rolling.time.interval |
daily | executor ログがロールオーバーされる時間間隔を設定します。デフォルトではロールオーバーは無効です。有効な値は、daily、hourly、minutely、または秒単位の任意の間隔です。古いログの自動クリーニングについては、spark.executor.logs.rolling.maxRetainedFiles を参照してください。 |
1.1.0 |
spark.executor.userClassPathFirst |
false | (実験的) executor インスタンスに適用される、spark.driver.userClassPathFirst と同じ機能。 |
1.3.0 |
spark.executorEnv.[EnvironmentVariableName] |
(なし) | Executor プロセスに EnvironmentVariableName で指定された環境変数を追加します。ユーザーは複数のこの設定を指定して、複数の環境変数を設定できます。 |
0.9.0 |
spark.redaction.regex |
(?i)secret|password|token|access[.]?key | ドライバーおよび executor 環境の Spark 構成プロパティと環境変数に含まれる機密情報を決定するための正規表現。この正規表現がプロパティキーまたは値と一致すると、値は環境 UI および YARN やイベントログなどのさまざまなログから削除されます。 | 2.1.2 |
spark.redaction.string.regex |
(なし) | Spark が生成する文字列の一部に含まれる機密情報を決定するための正規表現。この正規表現が文字列の一部と一致すると、その文字列の一部はダミー値に置き換えられます。これは現在、SQL の explain コマンドの出力を削除するために使用されています。 | 2.2.0 |
spark.python.profile |
false | Python ワーカーでプロファイリングを有効にします。プロファイル結果は sc.show_profiles() で表示されるか、ドライバーが終了する前に表示されます。また、sc.dump_profiles(path) によってディスクにダンプすることもできます。プロファイル結果の一部を手動で表示した場合、ドライバー終了前に自動的に表示されることはありません。デフォルトでは pyspark.profiler.BasicProfiler が使用されますが、これは SparkContext コンストラクタにプロファイラークラスをパラメータとして渡すことでオーバーライドできます。 |
1.2.0 |
spark.python.profile.dump |
(なし) | ドライバーが終了する前にプロファイル結果をダンプするために使用されるディレクトリ。結果は RDD ごとに個別のファイルとしてダンプされます。これらは pstats.Stats() でロードできます。これが指定されている場合、プロファイル結果は自動的に表示されません。 |
1.2.0 |
spark.python.worker.memory |
512m | 集計中に Python ワーカープロセスごとに使用するメモリ量。JVM メモリ文字列と同じ形式で、サイズ単位の接尾辞 ("k"、"m"、"g"、"t") が付きます (例: 512m、2g)。集計中に使用されるメモリがこの量を超えると、ディスクにデータをスピルします。 |
1.1.0 |
spark.python.worker.reuse |
true | Python ワーカーを再利用するかどうか。yes の場合、固定数の Python ワーカーを使用し、タスクごとに Python プロセスを fork() する必要がなくなります。これは、大きなブロードキャストがある場合に非常に役立ちます。その場合、ブロードキャストはタスクごとに JVM から Python ワーカーに転送する必要がなくなります。 | 1.2.0 |
spark.files |
各 executor の作業ディレクトリに配置するファイルのカンマ区切りリスト。グロブも許可されます。 | 1.0.0 | |
spark.submit.pyFiles |
Python アプリケーションの PYTHONPATH に配置する .zip、.egg、または .py ファイルのカンマ区切りリスト。グロブも許可されます。 | 1.0.1 | |
spark.jars |
ドライバーおよび executor のクラスパスに含まれる JAR のカンマ区切りリスト。グロブも許可されます。 | 0.9.0 | |
spark.jars.packages |
ドライバーおよび executor のクラスパスに含まれる JAR の Maven 座標のカンマ区切りリスト。座標は groupId:artifactId:version である必要があります。spark.jars.ivySettings が指定されている場合、アーティファクトはファイルの設定に従って解決されます。それ以外の場合は、ローカルの Maven リポジトリ、次に Maven Central、最後にコマンドラインオプション --repositories で指定された追加のリモートリポジトリでアーティファクトが検索されます。詳細については、高度な依存関係管理を参照してください。 |
1.5.0 | |
spark.jars.excludes |
spark.jars.packages で提供される依存関係を解決する際に除外する groupId:artifactId のカンマ区切りリスト。依存関係の競合を回避するため。 |
1.5.0 | |
spark.jars.ivy |
spark.jars.packages からのローカル Ivy キャッシュおよびパッケージファイルに使用される Ivy ユーザーディレクトリを指定するパス。これは、デフォルトで ~/.ivy2 である Ivy プロパティ ivy.default.ivy.user.dir をオーバーライドします。 |
1.3.0 | |
spark.jars.ivySettings |
spark.jars.packages で指定された JAR の解決を、Maven Central のような組み込みデフォルトではなくカスタマイズするための Ivy 設定ファイルのパス。コマンドラインオプション --repositories または spark.jars.repositories で指定された追加のリポジトリも含まれます。ファイアウォール経由 (例: Artifactory のような社内アーティファクトサーバー) でアーティファクトを解決できるようにするために役立ちます。設定ファイル形式の詳細については、設定ファイルを参照してください。file:// スキームを持つパスのみがサポートされます。スキームを持たないパスは file:// スキームを持つと想定されます。YARN クラスターモードで実行する場合、このファイルは依存関係解決のためにリモートドライバーにローカライズされ、 |
2.2.0 | |
spark.jars.repositories |
--packages または spark.jars.packages で指定された Maven 座標を検索する追加のリモートリポジトリのカンマ区切りリスト。 |
2.3.0 | |
spark.archives |
各 executor の作業ディレクトリに展開されるアーカイブのカンマ区切りリスト。.jar、.tar.gz、.tgz、.zip がサポートされています。ファイル名の後に # を追加して、解凍するディレクトリ名を指定できます。例: file.zip#directory。この構成は実験的です。 |
3.1.0 | |
spark.pyspark.driver.python |
PySpark ドライバーで使用する Python バイナリ実行可能ファイル。(デフォルトは spark.pyspark.python) |
2.1.0 | |
spark.pyspark.python |
PySpark ドライバーおよび executor で使用する Python バイナリ実行可能ファイル。 | 2.1.0 |
シャッフルの動作
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.reducer.maxSizeInFlight |
48m | 各 reduce タスクが同時にフェッチできるマップ出力の最大サイズ (指定がない限り MiB)。各出力は受信用のバッファの作成を必要とするため、これは reduce タスクごとに固定のメモリオーバーヘッドを表します。メモリが多い場合を除き、小さく保ってください。 | 1.4.0 |
spark.reducer.maxReqsInFlight |
Int.MaxValue | この構成は、任意の時点でブロックをフェッチするリモート要求の数を制限します。クラスター内のホスト数が増加すると、1 つ以上のノードへの多数の受信接続が発生し、ワークロードの下でワーカーが失敗する可能性があります。フェッチ要求の数を制限できるようにすることで、このシナリオを軽減できます。 | 2.0.0 |
spark.reducer.maxBlocksInFlightPerAddress |
Int.MaxValue | この構成は、指定されたホストポートからフェッチされるリモートブロックの数を reduce タスクごとに制限します。指定されたアドレスから多数のブロックが単一のフェッチまたは同時に要求される場合、これはサービング executor または Node Manager をクラッシュさせる可能性があります。これは、外部シャッフルが有効になっている場合に Node Manager への負荷を軽減するのに特に役立ちます。値を小さく設定することで、この問題を軽減できます。 | 2.2.1 |
spark.shuffle.compress |
true | マップ出力ファイルを圧縮するかどうか。一般的に良い考えです。圧縮には spark.io.compression.codec が使用されます。 |
0.6.0 |
spark.shuffle.file.buffer |
32k | 各シャッフルファイル出力ストリームのインメモリバッファのサイズ (指定がない限り KiB)。これらのバッファは、中間シャッフルファイルを作成する際のディスクシークとシステムコールの数を減らします。 | 1.4.0 |
spark.shuffle.file.merge.buffer |
32k | 各シャッフルファイル入力ストリームのインメモリバッファのサイズ (指定がない限り KiB)。これらのバッファはオフヒープバッファを使用し、シャッフルファイルのファイル数に関連しています。大きすぎるバッファは避けるべきです。 | 4.0.0 |
spark.shuffle.unsafe.file.output.buffer |
32k | Spark 4.0 から非推奨。spark.shuffle.localDisk.file.output.buffer を使用してください。 |
2.3.0 |
spark.shuffle.localDisk.file.output.buffer |
32k | すべてのローカルディスクシャッフルライターで各パーティションが書き込まれた後のこのバッファサイズのファイルシステム。指定がない限り KiB。 | 4.0.0 |
spark.shuffle.spill.diskWriteBufferSize |
1024 * 1024 | ソートされたレコードをディスクファイルに書き込む際に使用するバッファサイズ (バイト単位)。 | 2.3.0 |
spark.shuffle.io.maxRetries |
3 | (Netty のみ) IO 関連の例外が原因で失敗したフェッチは、これがゼロ以外の値に設定されている場合、自動的に再試行されます。この再試行ロジックは、長い GC 一時停止や一時的なネットワーク接続の問題に直面した場合に、大規模なシャッフルを安定させるのに役立ちます。 | 1.2.0 |
spark.shuffle.io.numConnectionsPerPeer |
1 | (Netty のみ) 大規模クラスターの接続構築を減らすために、ホスト間の接続が再利用されます。ハードディスクが多くホストが少ないクラスターの場合、すべてのディスクを飽和させるのに十分な並列処理が得られない可能性があるため、ユーザーはこの値を増やすことを検討する場合があります。 | 1.2.1 |
spark.shuffle.io.preferDirectBufs |
true | (Netty のみ) シャッフルおよびキャッシュブロック転送中のガベージコレクションを削減するために、オフヒープバッファが使用されます。オフヒープメモリが厳しく制限されている環境では、ユーザーはこの設定を無効にして、Netty からのすべての割り当てをヒープに強制することを望むかもしれません。 | 1.2.0 |
spark.shuffle.io.retryWait |
5s | (Netty のみ) フェッチの再試行間に待機する時間。再試行による最大遅延はデフォルトで 15 秒で、maxRetries * retryWait として計算されます。 |
1.2.1 |
spark.shuffle.io.backLog |
-1 | シャッフルサービスの受け入れキューの長さ。大規模なアプリケーションの場合、サービスが短期間に大量の接続要求を処理できない場合に受信接続がドロップされないように、この値を増やす必要がある場合があります。これは、シャッフルサービス自体が実行されている場所 (アプリケーション外である可能性がある) で構成する必要があり、spark.shuffle.service.enabled オプションを参照してください。1 未満に設定すると、OS のデフォルト (Netty の io.netty.util.NetUtil#SOMAXCONN で定義) にフォールバックします。 |
1.1.1 |
spark.shuffle.io.connectionTimeout |
spark.network.timeout の値 |
確立されたシャッフルサーバーとクライアント間の接続のタイムアウト。アイドル状態とマークされ、まだ未完了のフェッチ要求があるが、チャネルにトラフィックがない場合、connectionTimeout 以上の間、接続は閉じられます。 |
1.2.0 |
spark.shuffle.io.connectionCreationTimeout |
spark.shuffle.io.connectionTimeout の値 |
シャッフルサーバーとクライアント間の接続確立のタイムアウト。 | 3.2.0 |
spark.shuffle.service.enabled |
false | 外部シャッフルサービスを有効にします。このサービスは、executor によって書き込まれたシャッフルファイルを保持します。たとえば、executor が安全に削除できるようにしたり、executor の障害が発生した場合でもシャッフルフェッチを続行できるようにします。外部シャッフルサービスを有効にするには、セットアップが必要です。詳細については、動的割り当ての構成とセットアップのドキュメントを参照してください。 | 1.2.0 |
spark.shuffle.service.port |
7337 | 外部シャッフルサービスが実行されるポート。 | 1.2.0 |
spark.shuffle.service.name |
spark_shuffle | クライアントが通信する必要のある Spark シャッフルサービスの構成済み名前。これは、YARN NodeManager の構成 (yarn.nodemanager.aux-services) でシャッフルを構成するために使用される名前と一致する必要があります。spark.shuffle.service.enabled が true に設定されている場合にのみ有効です。 |
3.2.0 |
spark.shuffle.service.index.cache.size |
100m | キャッシュエントリは、指定されたメモリフットプリントに制限されます (指定がない限りバイト単位)。 | 2.3.0 |
spark.shuffle.service.removeShuffle |
true | シャッフルが不要になったときに、deallocated された executor のシャッフルブロックを削除するために ExternalShuffleService を使用するかどうか。これが有効でない場合、deallocated された executor のシャッフルデータは、アプリケーションが終了するまでディスク上に残ります。 | 3.3.0 |
spark.shuffle.maxChunksBeingTransferred |
Long.MAX_VALUE | シャッフルサービスで同時に転送できるチャンクの最大数。新しい受信接続は、最大数に達すると閉じられます。クライアントはシャッフル再試行設定 (spark.shuffle.io.maxRetries および spark.shuffle.io.retryWait を参照) に従って再試行します。これらの制限に達した場合、タスクはフェッチ失敗で失敗します。 |
2.3.0 |
spark.shuffle.sort.bypassMergeThreshold |
200 | (高度) ソートベースのシャッフルマネージャーでは、マップサイド集計がなく、reduce パーティションがこの数以下の場合、マージソートを回避します。 | 1.1.1 |
spark.shuffle.sort.io.plugin.class |
org.apache.spark.shuffle.sort.io.LocalDiskShuffleDataIO | シャッフル IO に使用するクラスの名前。 | 3.0.0 |
spark.shuffle.spill.compress |
true | シャッフル中にスピルされたデータを圧縮するかどうか。圧縮には spark.io.compression.codec が使用されます。 |
0.9.0 |
spark.shuffle.accurateBlockThreshold |
100 * 1024 * 1024 | HighlyCompressedMapStatus のシャッフルブロックのサイズが正確に記録されるバイト単位のしきい値。これは、シャッフルブロックを取得する際にシャッフルブロックサイズの過小評価を避けることで OOM を防ぐのに役立ちます。 | 2.2.1 |
spark.shuffle.accurateBlockSkewedFactor |
-1.0 | シャッフルブロックのサイズが、中央値シャッフルブロックサイズまたは spark.shuffle.accurateBlockThreshold にこの係数を掛けた値よりも大きい場合、そのシャッフルブロックはスキューしていると見なされ、HighlyCompressedMapStatus に正確に記録されます。このパラメータを spark.sql.adaptive.skewJoin.skewedPartitionFactor と同じ値に設定することをお勧めします。デフォルトでこの機能を無効にするには -1.0 に設定してください。 |
3.3.0 |
spark.shuffle.registration.timeout |
5000 | 外部シャッフルサービスへの登録のタイムアウト (ミリ秒)。 | 2.3.0 |
spark.shuffle.registration.maxAttempts |
3 | 外部シャッフルサービスへの登録に失敗した場合、maxAttempts 回まで再試行します。 | 2.3.0 |
spark.shuffle.reduceLocality.enabled |
true | reduce タスクのローカリティ設定を計算するかどうか。 | 1.5.0 |
spark.shuffle.mapOutput.minSizeForBroadcast |
512k | マップ出力ステータスを executor にブロードキャストするために使用するサイズ。 | 2.0.0 |
spark.shuffle.detectCorrupt |
true | フェッチされたブロックに破損がないか検出するかどうか。 | 2.2.0 |
spark.shuffle.detectCorrupt.useExtraMemory |
false | 有効な場合、圧縮/暗号化ストリームの一部は、早期の破損を検出するために追加メモリを使用して解凍/復号化されます。IOExceptions が発生すると、タスクは 1 回再試行され、同じ例外で再度失敗した場合、FetchFailedException がスローされて前のステージが再試行されます。 | 3.0.0 |
spark.shuffle.useOldFetchProtocol |
false | シャッフルブロックのフェッチ中に古いプロトコルを使用するかどうか。新しい Spark バージョンのジョブが古いバージョンの外部シャッフルサービスからシャッフルブロックをフェッチするシナリオでの互換性が必要な場合にのみ有効になります。 | 3.0.0 |
spark.shuffle.readHostLocalDisk |
true | 有効な場合 (および spark.shuffle.useOldFetchProtocol が無効な場合)、同じホストで実行されているブロックマネージャーから要求されたシャッフルブロックは、ネットワーク経由でリモートブロックとしてフェッチされるのではなく、ディスクから直接読み取られます。 |
3.0.0 |
spark.files.io.connectionTimeout |
spark.network.timeout の値 |
Spark RPC 環境でファイルをフェッチするための確立された接続のタイムアウト。アイドル状態とマークされ、まだ未完了のファイルがダウンロードされているがチャネルにトラフィックがない場合、connectionTimeout 以上の間、接続は閉じられます。 |
1.6.0 |
spark.files.io.connectionCreationTimeout |
spark.files.io.connectionTimeout の値 |
Spark RPC 環境でファイルをフェッチするための接続確立のタイムアウト。 | 3.2.0 |
spark.shuffle.checksum.enabled |
true | シャッフルデータのチェックサムを計算するかどうか。有効な場合、Spark はマップ出力ファイル内の各パーティションデータのチェックサム値を計算し、ディスク上にチェックサムファイルを保存します。シャッフルデータの破損が検出された場合、Spark はチェックサムファイルを使用して破損の原因 (例: ネットワークの問題、ディスクの問題など) を診断しようとします。 | 3.2.0 |
spark.shuffle.checksum.algorithm |
ADLER32 | シャッフルチェックサムを計算するために使用されるアルゴリズム。現在、JDK の組み込みアルゴリズム、たとえば ADLER32、CRC32、CRC32C のみをサポートしています。 | 3.2.0 |
spark.shuffle.service.fetch.rdd.enabled |
false | ディスクに永続化された RDD ブロックをフェッチするために ExternalShuffleService を使用するかどうか。動的割り当ての場合、この機能が有効な場合、ディスクに永続化されたブロックのみを持つ executor は spark.dynamicAllocation.executorIdleTimeout の後にアイドルと見なされ、それに応じて解放されます。 |
3.0.0 |
spark.shuffle.service.db.enabled |
true | ExternalShuffleService で DB を使用するかどうか。これはスタンドアロンモードにのみ影響することに注意してください。 | 3.0.0 |
spark.shuffle.service.db.backend |
ROCKSDB | シャッフルサービスローカル DB で使用されるディスクベースのストアを指定します。ROCKSDB または LEVELDB (非推奨) として設定します。 | 3.4.0 |
Spark UI
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.eventLog.logBlockUpdates.enabled |
false | spark.eventLog.enabled が true の場合、各ブロック更新のイベントをログに記録するかどうか。*警告*: これにより、イベントログのサイズが大幅に増加します。 |
2.3.0 |
spark.eventLog.longForm.enabled |
false | true の場合、イベントログで呼び出しサイトの長形式を使用します。それ以外の場合は短形式を使用します。 | 2.4.0 |
spark.eventLog.compress |
true | spark.eventLog.enabled が true の場合、ログに記録されたイベントを圧縮するかどうか。 |
1.0.0 |
spark.eventLog.compression.codec |
zstd | ログに記録されたイベントを圧縮するために使用されるコーデック。デフォルトでは、Spark は 4 つのコーデックを提供します: lz4、lzf、snappy、および zstd。コーデックを指定するために完全修飾クラス名を使用することもできます。例: org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.LZFCompressionCodec、org.apache.spark.io.SnappyCompressionCodec、および org.apache.spark.io.ZStdCompressionCodec。 |
3.0.0 |
spark.eventLog.erasureCoding.enabled |
false | イベントログで誤り訂正符号を使用することを許可するかどうか、またはファイルシステムのデフォルトに関係なく誤り訂正符号を無効にするかどうか。HDFS では、誤り訂正符号化されたファイルは通常のレプリケートされたファイルほど速く更新されないため、アプリケーションの更新が履歴サーバーに表示されるのに時間がかかります。これが true であっても、Spark はファイルを強制的に誤り訂正符号化するわけではなく、ファイルシステムのデフォルトを使用するだけであることに注意してください。 | 3.0.0 |
spark.eventLog.dir |
file:///tmp/spark-events | spark.eventLog.enabled が true の場合、Spark イベントがログに記録されるベースディレクトリ。このベースディレクトリ内で、Spark は各アプリケーションのサブディレクトリを作成し、アプリケーション固有のイベントをこのディレクトリにログに記録します。ユーザーはこれを HDFS ディレクトリなどの統合された場所に設定して、履歴サーバーが履歴ファイルを読み取れるようにしたい場合があります。 |
1.0.0 |
spark.eventLog.enabled |
false | Spark イベントをログに記録するかどうか。アプリケーションが終了した後に Web UI を再構築するのに役立ちます。 | 1.0.0 |
spark.eventLog.overwrite |
false | 既存のファイルを上書きするかどうか。 | 1.0.0 |
spark.eventLog.buffer.kb |
100k | 出力ストリームに書き込む際に使用するバッファサイズ (指定がない限り KiB)。 | 1.0.0 |
spark.eventLog.rolling.enabled |
false | イベントログファイルのロールオーバーが有効かどうか。true に設定すると、各イベントログファイルが構成されたサイズでカットダウンされます。 | 3.0.0 |
spark.eventLog.rolling.maxFileSize |
128m | spark.eventLog.rolling.enabled=true の場合、ロールオーバーされるイベントログファイルの最大サイズを指定します。 |
3.0.0 |
spark.ui.dagGraph.retainedRootRDDs |
Int.MaxValue | Spark UI およびステータス API がガベージコレクションする前に保持する DAG グラフノードの数。 | 2.1.0 |
spark.ui.groupSQLSubExecutionEnabled |
true | SQL UI で、同じルート実行に属するサブ実行をグループ化するかどうか。 | 3.4.0 |
spark.ui.enabled |
true | Spark アプリケーションの Web UI を実行するかどうか。 | 1.1.1 |
spark.ui.store.path |
なし | ライブ UI の診断情報をキャッシュするローカルディレクトリ。デフォルトでは設定されておらず、すべてのアプリケーション情報がメモリに保持されます。 | 3.4.0 |
spark.ui.killEnabled |
true | Web UI からジョブやステージをキルできるようにします。 | 1.0.0 |
spark.ui.threadDumpsEnabled |
true | Stages および Executor ページで executor スレッドダンプへのリンクを表示するかどうか。 | 1.2.0 |
spark.ui.threadDump.flamegraphEnabled |
true | executor スレッドダンプの FlameGraph をレンダリングするかどうか。 | 4.0.0 |
spark.ui.heapHistogramEnabled |
true | Executor ページで executor ヒープヒストグラムへのリンクを表示するかどうか。 | 3.5.0 |
spark.ui.liveUpdate.period |
100ms | ライブエンティティを更新する頻度。-1 は、アプリケーションの再生時に「決して更新しない」ことを意味し、最後の書き込みのみが発生します。ライブアプリケーションの場合、これは、多数のタスクイベントを急速に処理する際に、いくつか省略できる操作を回避します。 | 2.3.0 |
spark.ui.liveUpdate.minFlushPeriod |
1s | 古い UI データがフラッシュされるまでの最小時間間隔。これは、受信タスクイベントが頻繁に発生しない場合の UI の古さを回避します。 | 2.4.2 |
spark.ui.port |
4040 | メモリとワークロードデータを表示するアプリケーションダッシュボードのポート。 | 0.7.0 |
spark.ui.retainedJobs |
1000 | Spark UI およびステータス API がガベージコレクションする前に保持するジョブの数。これは目標最大値であり、状況によってはより少ない要素が保持される場合があります。 | 1.2.0 |
spark.ui.retainedStages |
1000 | Spark UI およびステータス API がガベージコレクションする前に保持するステージの数。これは目標最大値であり、状況によってはより少ない要素が保持される場合があります。 | 0.9.0 |
spark.ui.retainedTasks |
100000 | Spark UI およびステータス API がガベージコレクションする前に、1 ステージあたりのタスクの数。これは目標最大値であり、状況によってはより少ない要素が保持される場合があります。 | 2.0.1 |
spark.ui.reverseProxy |
false | Spark Master をワーカーおよびアプリケーション UI のリバースプロキシとして実行することを有効にします。このモードでは、Spark Master はワーカーおよびアプリケーション UI をリバースプロキシして、それらのホストへの直接アクセスを必要とせずにアクセスできるようにします。ワーカーおよびアプリケーション UI は直接アクセスできなくなり、spark master/proxy の公開 URL からのみアクセスできるため、注意して使用してください。この設定は、クラスターで実行されているすべてのワーカーおよびアプリケーション UI に影響し、すべてのワーカー、ドライバー、およびマスターで設定する必要があります。 | 2.1.0 |
spark.ui.reverseProxyUrl |
Spark UI が別のフロントエンドリバースプロキシを介して提供される場合、これはそのリバースプロキシを介して Spark Master UI にアクセスするための URL です。これは、認証用のプロキシ (例: OAuth プロキシ) を実行する場合に便利です。URL にはパスプレフィックスが含まれる場合があります (例: http://mydomain.com/path/to/spark/)。これにより、同じ仮想ホストとポートを介して複数の Spark クラスターおよびその他の Web アプリケーションの UI を提供できます。通常、これはスキーム (http/https)、ホスト、およびポートを含む絶対 URL である必要があります。 "/" で始まる相対 URL をここで指定することも可能です。この場合、Spark UI および Spark REST API によって生成されるすべての URL はサーバー相対リンクになります。これにより、マスターページの相対リンクは正しく機能します。この設定は Spark UI のリンク生成に影響しますが、フロントエンドリバースプロキシは次の責任を負います。
spark.ui.reverseProxy がオンになっている場合にのみ有効です。Spark Master Web UI が直接到達可能な場合は、この設定は必要ありません。設定の値は、「/」で分割された後、キーワード proxy または history を含めることはできません。Spark UI は、URI から REST API エンドポイントを取得するために、両方のキーワードに依存しています。 |
2.1.0 | |
spark.ui.proxyRedirectUri |
Spark がプロキシの背後で実行されている場合のリダイレクト先。これにより、Spark はリダイレクト応答を Spark UI のアドレスではなくプロキシサーバーを指すように変更します。これは、サーバーのアドレスのみである必要があり、アプリケーションのプレフィックスパスはありません。プレフィックスは、プロキシサーバー自体 (X-Forwarded-Context リクエストヘッダーを追加する) または Spark アプリケーションの構成でプロキシベースを設定することによって設定する必要があります。 |
3.0.0 | |
spark.ui.showConsoleProgress |
false | コンソールに進行状況バーを表示します。進行状況バーは、500ms 以上実行されたステージの進行状況を表示します。複数のステージが同時に実行される場合、同じ行に複数の進行状況バーが表示されます。 注: シェル環境では、spark.ui.showConsoleProgress のデフォルト値は true です。 |
1.2.1 |
spark.ui.consoleProgress.update.interval |
200 | コンソールで進行状況バーを更新する間隔 (ミリ秒)。 | 2.1.0 |
spark.ui.custom.executor.log.url |
(なし) | Spark UI のクラスターマネージャーのアプリケーションログ URL の代わりに、外部ログサービスをサポートするためのカスタム Spark executor ログ URL を指定します。Spark は、クラスターマネージャーによって異なるパターンで、いくつかのパス変数をサポートします。サポートされているパターンがあれば、クラスターマネージャーのドキュメントを確認してください。 この構成は、イベントログの元のログ URL も置き換えることに注意してください。これは、履歴サーバーでアプリケーションにアクセスする場合にも有効になります。新しいログ URL は永続的である必要があります。そうでない場合、executor ログ URL のリンク切れが発生する可能性があります。 現在、YARN および K8s クラスターマネージャーのみがこの構成をサポートしています。 |
3.0.0 |
spark.ui.prometheus.enabled |
true | ドライバー Web ページで /metrics/executors/prometheus に executor メトリクスを公開します。 | 3.0.0 |
spark.worker.ui.retainedExecutors |
1000 | Spark UI およびステータス API がガベージコレクションする前に保持する完了した executor の数。 | 1.5.0 |
spark.worker.ui.retainedDrivers |
1000 | Spark UI およびステータス API がガベージコレクションする前に保持する完了したドライバーの数。 | 1.5.0 |
spark.sql.ui.retainedExecutions |
1000 | Spark UI およびステータス API がガベージコレクションする前に保持する完了した実行の数。 | 1.5.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark UI およびステータス API がガベージコレクションする前に保持する完了したバッチの数。 | 1.0.0 |
spark.ui.retainedDeadExecutors |
100 | Spark UI およびステータス API がガベージコレクションする前に保持するデッド executor の数。 | 2.0.0 |
spark.ui.filters |
なし | Spark Web UI に適用するフィルタークラス名のカンマ区切りリスト。フィルターは標準の javax servlet Filter である必要があります。 フィルターパラメータは、 spark.<class name of filter>.param.<param name>=<value> の形式で構成エントリを設定することにより、構成でも指定できます。例 spark.ui.filters=com.test.filter1
spark.com.test.filter1.param.name1=foo
spark.com.test.filter1.param.name2=bar
|
1.0.0 |
spark.ui.requestHeaderSize |
8k | HTTP リクエストヘッダーで許可される最大サイズ (指定がない限りバイト単位)。この設定は Spark History Server にも適用されます。 | 2.2.3 |
spark.ui.timelineEnabled |
true | UI ページでイベントタイムラインデータを表示するかどうか。 | 3.4.0 |
spark.ui.timeline.executors.maximum |
250 | イベントタイムラインに表示される executor の最大数。 | 3.2.0 |
spark.ui.timeline.jobs.maximum |
500 | イベントタイムラインに表示されるジョブの最大数。 | 3.2.0 |
spark.ui.timeline.stages.maximum |
500 | イベントタイムラインに表示されるステージの最大数。 | 3.2.0 |
spark.ui.timeline.tasks.maximum |
1000 | イベントタイムラインに表示されるタスクの最大数。 | 1.4.0 |
spark.appStatusStore.diskStoreDir |
なし | SQL 実行の診断情報を保存するローカルディレクトリ。この構成はライブ UI のみに適用されます。 | 3.4.0 |
圧縮とシリアライゼーション
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.broadcast.compress |
true | ブロードキャスト変数を送信する前に圧縮するかどうか。一般的に良い考えです。圧縮には spark.io.compression.codec が使用されます。 |
0.6.0 |
spark.checkpoint.dir |
(なし) | チェックポイントのデフォルトディレクトリを設定します。SparkContext.setCheckpointDir によってオーバーライドできます。 | 4.0.0 |
spark.checkpoint.compress |
false | RDD チェックポイントを圧縮するかどうか。一般的に良い考えです。圧縮には spark.io.compression.codec が使用されます。 |
2.2.0 |
spark.io.compression.codec |
lz4 | RDD パーティション、イベントログ、ブロードキャスト変数、シャッフル出力などの内部データを圧縮するために使用されるコーデック。デフォルトでは、Spark は 4 つのコーデックを提供します: lz4、lzf、snappy、および zstd。コーデックを指定するために完全修飾クラス名を使用することもできます。例: org.apache.spark.io.LZ4CompressionCodec、org.apache.spark.io.LZFCompressionCodec、org.apache.spark.io.SnappyCompressionCodec、および org.apache.spark.io.ZStdCompressionCodec。 |
0.8.0 |
spark.io.compression.lz4.blockSize |
32k | LZ4 圧縮コーデックが使用される場合に使用される LZ4 圧縮のブロックサイズ。このブロックサイズを小さくすると、LZ4 が使用される際のシャッフルメモリ使用量も削減されます。デフォルトの単位はバイトですが、別途指定しない限りです。この構成は spark.io.compression.codec にのみ適用されます。 |
1.4.0 |
spark.io.compression.snappy.blockSize |
32k | Snappy 圧縮コーデックが使用される場合に使用される Snappy 圧縮のブロックサイズ。このブロックサイズを小さくすると、Snappy が使用される際のシャッフルメモリ使用量も削減されます。デフォルトの単位はバイトですが、別途指定しない限りです。この構成は spark.io.compression.codec にのみ適用されます。 |
1.4.0 |
spark.io.compression.zstd.level |
1 | Zstd 圧縮コーデックの圧縮レベル。圧縮レベルを上げると、CPU とメモリの消費が増える代わりに、圧縮率が向上します。この構成は spark.io.compression.codec にのみ適用されます。 |
2.3.0 |
spark.io.compression.zstd.bufferSize |
32k | Zstd 圧縮コーデックが使用される場合に使用される Zstd 圧縮のバッファサイズ (バイト単位)。このサイズを小さくすると、Zstd が使用される際のシャッフルメモリ使用量が削減されますが、過剰な JNI 呼び出しオーバーヘッドにより圧縮コストが増加する可能性があります。この構成は spark.io.compression.codec にのみ適用されます。 |
2.3.0 |
spark.io.compression.zstd.bufferPool.enabled |
true | true の場合、ZSTD JNI ライブラリのバッファプールを有効にします。 | 3.2.0 |
spark.io.compression.zstd.workers |
0 | Zstd を使用して並列に圧縮するために生成されるスレッド数。値が 0 の場合、ワーカーは生成されず、シングルスレッドモードで動作します。値が > 0 の場合、非同期モードがトリガーされ、対応する数のスレッドが生成されます。ワーカーが多いほどパフォーマンスは向上しますが、メモリコストも増加します。 | 4.0.0 |
spark.io.compression.lzf.parallel.enabled |
false | true の場合、LZF 圧縮は複数のスレッドを使用してデータを並列に圧縮します。 | 4.0.0 |
spark.kryo.classesToRegister |
(なし) | Kryo シリアライゼーションを使用する場合、Kryo に登録するカスタムクラス名のカンマ区切りリストを指定します。詳細については、チューニングガイドを参照してください。 | 1.2.0 |
spark.kryo.referenceTracking |
true | Kryo でデータをシリアライズする際に同じオブジェクトへの参照を追跡するかどうか。これは、オブジェクトグラフにループがある場合に必要であり、同じオブジェクトの複数のコピーが含まれている場合に効率のために役立ちます。このケースではないことがわかっている場合は、パフォーマンスを向上させるために無効にできます。 | 0.8.0 |
spark.kryo.registrationRequired |
false | Kryo で登録を要求するかどうか。'true' に設定されている場合、登録されていないクラスがシリアライズされると Kryo は例外をスローします。false (デフォルト) に設定されている場合、Kryo は各オブジェクトとともに登録されていないクラス名を書き込みます。クラス名を書き込むとパフォーマンスに大きなオーバーヘッドが発生する可能性があるため、このオプションを有効にすると、ユーザーが登録からクラスを省略していないことを厳密に強制できます。 | 1.1.0 |
spark.kryo.registrator |
(なし) | Kryo シリアライゼーションを使用する場合、カスタムクラスを Kryo に登録するクラスのカンマ区切りリストを指定します。このプロパティは、カスタム方法でクラスを登録する必要がある場合 (例: カスタムフィールドシリアライザーを指定する) に役立ちます。それ以外の場合は、spark.kryo.classesToRegister の方が簡単です。これは、KryoRegistrator を拡張するクラスに設定する必要があります。詳細については、チューニングガイドを参照してください。 |
0.5.0 |
spark.kryo.unsafe |
true | unsafe ベースの Kryo シリアライザーを使用するかどうか。Unsafe Based IO を使用することで、大幅に高速化できます。 | 2.1.0 |
spark.kryoserializer.buffer.max |
64m | Kryo シリアライゼーションバッファの最大許容サイズ (指定がない限り MiB)。シリアライズしようとする任意のオブジェクトよりも大きくなければならず、2048m 未満である必要があります。「buffer limit exceeded」という例外が Kryo 内で発生した場合、これを増やしてください。 | 1.4.0 |
spark.kryoserializer.buffer |
64k | Kryo シリアライゼーションバッファの初期サイズ (指定がない限り KiB)。各ワーカーのコアごとに 1 つのバッファがあることに注意してください。このバッファは、必要に応じて spark.kryoserializer.buffer.max まで成長します。 |
1.4.0 |
spark.rdd.compress |
false | シリアライズされた RDD パーティションを圧縮するかどうか (例: Java および Scala の StorageLevel.MEMORY_ONLY_SER または Python の StorageLevel.MEMORY_ONLY)。CPU 時間の増加と引き換えに、かなりのスペースを節約できます。圧縮には spark.io.compression.codec が使用されます。 |
0.6.0 |
spark.serializer |
org.apache.spark.serializer. JavaSerializer |
ネットワーク上で送信されるオブジェクトや、シリアライズされた形式でキャッシュする必要があるオブジェクトをシリアライズするために使用されるクラス。Java シリアライゼーションのデフォルトは、任意の Serializable Java オブジェクトで動作しますが、非常に遅いため、速度が必要な場合は org.apache.spark.serializer.KryoSerializer を使用し、Kryo シリアライゼーションを構成することをお勧めします。org.apache.spark.Serializer のサブクラスであれば何でも使用できます。 |
0.5.0 |
spark.serializer.objectStreamReset |
100 | org.apache.spark.serializer.JavaSerializer を使用してシリアライズする場合、シリアライザーは冗長なデータの書き込みを防ぐためにオブジェクトをキャッシュしますが、これによりそれらのオブジェクトのガベージコレクションが停止します。「reset」を呼び出すと、シリアライザーからその情報がフラッシュされ、古いオブジェクトが収集されるようになります。この定期的なリセットをオフにするには -1 に設定します。デフォルトでは 100 オブジェクトごとにシリアライザーをリセットします。 | 1.0.0 |
メモリ管理
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.memory.fraction |
0.6 | 実行およびストレージに使用される (ヒープスペース - 300MB) の割合。これが低いほど、スピルやキャッシュされたデータの除外が頻繁に発生します。この構成の目的は、内部メタデータ、ユーザーデータ構造、およびスパースで異常に大きなレコードの場合の不正確なサイズ推定のためにメモリを確保することです。デフォルト値のままにしておくことをお勧めします。詳細については、この値が増加した場合に JVM ガベージコレクションを正しくチューニングすることに関する重要な情報を含め、この説明を参照してください。 | 1.6.0 |
spark.memory.storageFraction |
0.5 | 除外されないストレージメモリの量。spark.memory.fraction によって確保された領域のサイズに対する割合として表されます。これが高いほど、作業メモリが少なくなり、タスクがディスクにスピルする頻度が高くなる可能性があります。デフォルト値のままにしておくことをお勧めします。詳細については、この説明を参照してください。 |
1.6.0 |
spark.memory.offHeap.enabled |
false | true の場合、Spark は特定の操作にオフヒープメモリを使用しようとします。オフヒープメモリの使用が有効な場合、spark.memory.offHeap.size は正である必要があります。 |
1.6.0 |
spark.memory.offHeap.size |
0 | オフヒープ割り当てに使用できるメモリの絶対量 (指定がない限りバイト単位)。この設定はヒープメモリ使用量に影響しないため、executor の合計メモリ消費量が何らかのハードリミット内に収まる必要がある場合は、JVM ヒープサイズをそれに応じて縮小してください。spark.memory.offHeap.enabled=true の場合、これは正の値に設定する必要があります。 |
1.6.0 |
spark.storage.unrollMemoryThreshold |
1024 * 1024 | ブロックをアンロールする前に要求される初期メモリ。 | 1.1.0 |
spark.storage.replication.proactive |
true | RDD ブロックのプロアクティブなブロックレプリケーションを有効にします。executor の障害により失われたキャッシュされた RDD ブロックのレプリカは、既存のレプリカがあれば補充されます。これにより、ブロックのレプリケーションレベルを初期値に近づけようとします。 | 2.2.0 |
spark.storage.localDiskByExecutors.cacheSize |
1000 | ローカルディレクトリが格納される executor の最大数。このサイズは、ドライバー側と executor 側の両方に適用され、無制限のストアを回避します。このキャッシュは、同じホストからディスクに永続化された RDD ブロックまたはシャッフルブロック (spark.shuffle.readHostLocalDisk が設定されている場合) をフェッチする場合にネットワークを回避するために使用されます。 |
3.0.0 |
spark.cleaner.periodicGC.interval |
30min | ガベージコレクションをトリガーする頻度を制御します。 このコンテキストクリーナーは、弱参照がガベージコレクションされたときにのみクリーンアップをトリガーします。メモリ圧力がほとんどない大規模なドライバー JVM を持つ長時間実行アプリケーションでは、これは非常にまれにしか発生しないか、まったく発生しない場合があります。まったくクリーンアップしないと、しばらくすると executor がディスク容量を使い果たす可能性があります。 |
1.6.0 |
spark.cleaner.referenceTracking |
true | コンテキストクリーニングを有効または無効にします。 | 1.0.0 |
spark.cleaner.referenceTracking.blocking |
true | クリーニングスレッドがクリーンアップタスクでブロックするかどうかを制御します (シャッフルを除く。シャッフルは spark.cleaner.referenceTracking.blocking.shuffle Spark プロパティによって制御されます)。 |
1.0.0 |
spark.cleaner.referenceTracking.blocking.shuffle |
false | クリーニングスレッドがシャッフルクリーンアップタスクでブロックするかどうかを制御します。 | 1.1.1 |
spark.cleaner.referenceTracking.cleanCheckpoints |
false | 参照がスコープ外の場合にチェックポイントファイルをクリーンアップするかどうかを制御します。 | 1.4.0 |
実行の動作
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.broadcast.blockSize |
4m | TorrentBroadcastFactory の各ブロックピースのサイズ (指定がない限り KiB)。大きすぎる値はブロードキャスト中の並列処理を低下させ (遅くする) ます。しかし、小さすぎると BlockManager のパフォーマンスが低下する可能性があります。 |
0.5.0 |
spark.broadcast.checksum |
true | ブロードキャストのチェックサムを有効にするかどうか。有効な場合、ブロードキャストにはチェックサムが含まれ、破損したブロックの検出に役立ちますが、計算と送信に少し追加データが必要になります。ネットワークにデータがブロードキャスト中に破損しないことを保証する他のメカニズムがある場合、無効にすることができます。 | 2.1.1 |
spark.broadcast.UDFCompressionThreshold |
1 * 1024 * 1024 | ユーザー定義関数 (UDF) および Python RDD コマンドがバイト単位でブロードキャストによって圧縮されるしきい値 (指定がない限り)。 | 3.0.0 |
spark.executor.cores |
YARN モードでは 1、スタンドアロンモードではワーカーの利用可能なすべてのコア。 | 各 executor で使用するコア数。 | 1.0.0 |
spark.default.parallelism |
reduceByKey および join のような分散シャッフル操作の場合、親 RDD の最大パーティション数。parallelize のような親 RDD がない操作の場合、クラスターマネージャーによって異なります。
|
join、reduceByKey、および parallelize のような変換によって返される RDD のデフォルトパーティション数 (ユーザーによって設定されていない場合)。 |
0.5.0 |
spark.executor.heartbeatInterval |
10s | executor からドライバーへのハートビート間の間隔。ハートビートは、executor がまだ生存していることをドライバーに知らせ、進行中のタスクのメトリクスを更新します。spark.executor.heartbeatInterval は spark.network.timeout よりも大幅に小さくする必要があります。 | 1.1.0 |
spark.files.fetchTimeout |
60s | SparkContext.addFile() を介して追加されたファイルをドライバーからフェッチする際の通信タイムアウト。 | 1.0.0 |
spark.files.useFetchCache |
true | true (デフォルト) に設定されている場合、ファイルフェッチは、同じアプリケーションに属する executor が共有するローカルキャッシュを使用し、同じホストで多数の executor を実行する場合のタスク起動パフォーマンスを向上させることができます。false に設定すると、これらのキャッシュ最適化は無効になり、すべての executor は独自のファイルコピーをフェッチします。この最適化は、NFS ファイルシステム上に存在する Spark ローカルディレクトリを使用するために無効にされる場合があります (詳細については、SPARK-6313 を参照してください)。 | 1.2.2 |
spark.files.overwrite |
false | 起動時に存在するファイルを上書きするかどうか。このオプションが true に設定されていても、ユーザーは SparkContext.addFile または SparkContext.addJar によって追加されたファイルを上書きすることはできません。 | 1.0.0 |
spark.files.ignoreCorruptFiles |
false | 破損したファイルを無視するかどうか。true の場合、Spark ジョブは破損した、または存在しないファイルに遭遇しても実行を継続し、読み取られた内容は引き続き返されます。 | 2.1.0 |
spark.files.ignoreMissingFiles |
false | 欠落したファイルを無視するかどうか。true の場合、Spark ジョブは欠落したファイルに遭遇しても実行を継続し、読み取られた内容は引き続き返されます。 | 2.4.0 |
spark.files.maxPartitionBytes |
134217728 (128 MiB) | ファイルを読み取る際に 1 つのパーティションにパッキングされる最大バイト数。 | 2.1.0 |
spark.files.openCostInBytes |
4194304 (4 MiB) | ファイルを開く際の推定コスト(同時にスキャンできるバイト数で測定)。これは、複数のファイルをパーティションにまとめる場合に使用されます。過大評価する方が、小さなファイルを持つパーティションが大きなファイルを持つパーティションよりも速くなるため、良いでしょう。 | 2.1.0 |
spark.hadoop.cloneConf |
false | true に設定すると、タスクごとに新しい Hadoop Configuration オブジェクトをクローンします。このオプションは、Configuration のスレッドセーフの問題を回避するために有効にする必要があります(詳細は SPARK-2546 を参照)。これらの問題の影響を受けないジョブでは予期しないパフォーマンスの低下を避けるため、デフォルトでは無効になっています。 |
1.0.3 |
spark.hadoop.validateOutputSpecs |
true | true に設定すると、saveAsHadoopFile およびその他のバリアントで使用される出力仕様(例: 出力ディレクトリが既に存在するかどうかのチェック)を検証します。既存の出力ディレクトリによる例外を抑制するために無効にすることができます。Spark の以前のバージョンとの互換性を確保しようとする場合を除き、この設定を無効にすることは推奨しません。単に Hadoop の FileSystem API を使用して出力ディレクトリを手動で削除してください。Spark Streaming の StreamingContext を介して生成されたジョブでは、チェックポイントの復旧中にデータが出力ディレクトリに書き戻される可能性があるため、この設定は無視されます。 | 1.0.1 |
spark.storage.memoryMapThreshold |
2m | Spark がディスクからブロックを読み取る際にメモリマップを行うブロックのサイズ。デフォルトの単位はバイトですが、別途指定されている場合は除きます。これにより、Spark は非常に小さなブロックのメモリマッピングを回避します。一般的に、メモリマッピングは、オペレーティングシステムのページサイズに近いかそれ以下のブロックに対してはオーバーヘッドが高くなります。 | 0.9.2 |
spark.storage.decommission.enabled |
false | エグゼキューターを廃止する際にブロックマネージャーを廃止するかどうか。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.enabled |
true | ブロックマネージャーの廃止中にシャッフルブロックを転送するかどうか。移行可能なシャッフルリゾルバ(ソートベースシャッフルなど)が必要です。 | 3.1.0 |
spark.storage.decommission.shuffleBlocks.maxThreads |
8 | シャッフルファイルの移行に使用するスレッドの最大数。 | 3.1.0 |
spark.storage.decommission.rddBlocks.enabled |
true | ブロックマネージャーの廃止中に RDD ブロックを転送するかどうか。 | 3.1.0 |
spark.storage.decommission.fallbackStorage.path |
(なし) | ブロックマネージャーの廃止中のフォールバックストレージの場所。例: s3a://spark-storage/。空の場合は、フォールバックストレージは無効になります。Spark はフォールバックストレージをクリーンアップしないため、ストレージは TTL によって管理する必要があります。 |
3.1.0 |
spark.storage.decommission.fallbackStorage.cleanUp |
false | true の場合、Spark はシャットダウン中にフォールバックストレージデータをクリーンアップします。 | 3.2.0 |
spark.storage.decommission.shuffleBlocks.maxDiskSize |
(なし) | リモートシャッフルブロックを拒否する前にシャッフルブロックを保存するために使用する最大ディスクスペース。リモートシャッフルブロックを拒否するということは、エグゼキューターがシャッフル移行を受信せず、移行に使用できる他のエグゼキューターがない場合、spark.storage.decommission.fallbackStorage.path が設定されていない限りシャッフルブロックが失われる可能性があることを意味します。 |
3.2.0 |
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version |
1 | ファイル出力コミッターアルゴリズムのバージョン。有効なアルゴリズムバージョン番号は 1 または 2 です。バージョン 2 は MAPREDUCE-7282 のような正確性の問題を引き起こす可能性があることに注意してください。 | 2.2.0 |
Executor メトリクス
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.eventLog.logStageExecutorMetrics |
false | ステージごとのエグゼキューターメトリクスのピーク(各エグゼキューターごと)をイベントログに書き込むかどうか。 注意: メトリクスはエグゼキューターのハートビートでポーリング(収集)され、常に送信されます。この設定は、集計されたメトリクスのピークがイベントログに書き込まれるかどうかを決定するためだけです。 |
3.0.0 |
spark.executor.processTreeMetrics.enabled |
false | エグゼキューターメトリクスを収集する際に、プロセスツリーメトリクス(/proc ファイルシステムから)を収集するかどうか。 注意: プロセスツリーメトリクスは、/proc ファイルシステムが存在する場合にのみ収集されます。 |
3.0.0 |
spark.executor.metrics.pollingInterval |
0 | エグゼキューターメトリクスを収集する頻度(ミリ秒)。 0 の場合、ポーリングはエグゼキューターのハートビート時に行われます(つまり、 spark.executor.heartbeatInterval で指定されたハートビート間隔)。正の値の場合、ポーリングはこの間隔で行われます。 |
3.0.0 |
spark.eventLog.gcMetrics.youngGenerationGarbageCollectors |
Copy,PS Scavenge,ParNew,G1 Young Generation | サポートされている若い世代のガベージコレクターの名前。通常、名前は GarbageCollectorMXBean.getName の戻り値です。組み込みの若い世代のガベージコレクターは Copy, PS Scavenge, ParNew, G1 Young Generation です。 | 3.0.0 |
spark.eventLog.gcMetrics.oldGenerationGarbageCollectors |
MarkSweepCompact,PS MarkSweep,ConcurrentMarkSweep,G1 Old Generation | サポートされている古い世代のガベージコレクターの名前。通常、名前は GarbageCollectorMXBean.getName の戻り値です。組み込みの古い世代のガベージコレクターは MarkSweepCompact, PS MarkSweep, ConcurrentMarkSweep, G1 Old Generation です。 | 3.0.0 |
spark.executor.metrics.fileSystemSchemes |
file,hdfs | エグゼキューターメトリクスで報告するファイルシステムスキーム。 | 3.1.0 |
ネットワーキング
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.rpc.message.maxSize |
128 | 「コントロールプレーン」通信で許可される最大メッセージサイズ(MiB)。一般的には、エグゼキューターとドライバー間で送信されるマップ出力サイズ情報にのみ適用されます。数千ものマップおよびリデュースタスクを持つジョブを実行していて、RPC メッセージサイズに関するメッセージが表示される場合は、これを増やしてください。 | 2.0.0 |
spark.blockManager.port |
(ランダム) | すべてのブロックマネージャーがリッスンするポート。これらはドライバーとエグゼキューターの両方に存在します。 | 1.1.0 |
spark.driver.blockManager.port |
(spark.blockManager.port の値) | ブロックマネージャーがリッスンするドライバー固有のポート。エグゼキューターと同じ設定を使用できない場合に役立ちます。 | 2.1.0 |
spark.driver.bindAddress |
(spark.driver.host の値) | リッスンソケットをバインドするホスト名または IP アドレス。この設定は、SPARK_LOCAL_IP 環境変数をオーバーライドします(以下を参照)。 また、ローカルアドレスとは異なるアドレスをエグゼキューターまたは外部システムにアドバタイズすることもできます。これは、たとえば、ブリッジされたネットワークを持つコンテナを実行する場合に便利です。これを正しく機能させるには、ドライバーが使用する異なるポート(RPC、ブロックマネージャー、UI)をコンテナのホストから転送する必要があります。 |
2.1.0 |
spark.driver.host |
(ローカルホスト名) | ドライバーのホスト名または IP アドレス。これは、エグゼキューターおよびスタンドアロンマスターとの通信に使用されます。 | 0.7.0 |
spark.driver.port |
(ランダム) | ドライバーがリッスンするポート。これは、エグゼキューターおよびスタンドアロンマスターとの通信に使用されます。 | 0.7.0 |
spark.rpc.io.backLog |
64 | RPC サーバーの受け入れキューの長さ。大規模なアプリケーションでは、短時間で多数の接続が到着した場合に着信接続がドロップされないように、この値を増やす必要がある場合があります。 | 3.0.0 |
spark.network.timeout |
120s | すべてのネットワークインタラクションのデフォルトタイムアウト。この設定は、spark.storage.blockManagerHeartbeatTimeoutMs、spark.shuffle.io.connectionTimeout、spark.rpc.askTimeout、または spark.rpc.lookupTimeout が設定されていない場合に使用されます。 |
1.3.0 |
spark.network.timeoutInterval |
60s | ドライバーがデッドエグゼキューターをチェックして期限切れにする間隔。 | 1.3.2 |
spark.network.io.preferDirectBufs |
true | 有効な場合、共有アロケーターはヒープ外バッファ割り当てを優先します。ヒープ外バッファは、シャッフルおよびキャッシュブロック転送中のガベージコレクションを削減するために使用されます。ヒープ外メモリが厳しく制限されている環境では、ユーザーはこれを無効にして、すべての割り当てをヒープ内で行うようにしたい場合があります。 | 3.0.0 |
spark.port.maxRetries |
16 | ポートへのバインドを試行する際の最大リトライ回数。ポートに特定の値(0以外)が指定されている場合、後続のリトライでは、リトライ前に以前の試行で使用されたポートが 1 増分されます。これにより、指定された開始ポートから port + maxRetries までのポート範囲を試すことができます。 | 1.1.1 |
spark.rpc.askTimeout |
spark.network.timeout |
RPC ask 操作がタイムアウトするまでの待機時間。 | 1.4.0 |
spark.rpc.lookupTimeout |
120s | RPC リモートエンドポイントのルックアップ操作がタイムアウトするまでの待機時間。 | 1.4.0 |
spark.network.maxRemoteBlockSizeFetchToMem |
200m | リモートブロックは、ブロックのサイズがこのしきい値(バイト単位)を超えている場合にディスクにフェッチされます。これは、巨大なリクエストがメモリを過剰に使用するのを避けるためです。この設定は、シャッフルフェッチとブロックマネージャーのリモートブロックフェッチの両方に影響します。外部シャッフルサービスを有効にしているユーザーの場合、この機能は外部シャッフルサービスが少なくとも 2.3.0 の場合にのみ機能します。 | 3.0.0 |
spark.rpc.io.connectionTimeout |
spark.network.timeout の値 |
RPC ピア間の確立された接続がアイドル状態としてマークされ、保留中の RPC 要求があるがチャネルにトラフィックがない場合に、connectionTimeout 以上経過すると閉じられるまでのタイムアウト。 |
1.2.0 |
spark.rpc.io.connectionCreationTimeout |
spark.rpc.io.connectionTimeout の値 |
RPC ピア間の接続を確立するためのタイムアウト。 | 3.2.0 |
スケジューリング
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.cores.max |
(未設定) | スタンドアロンデプロイクラスタで実行する場合、アプリケーションにクラスタ全体(各マシンからではなく)で要求する CPU コアの最大量。設定されていない場合、Spark のスタンドアロンクラスターマネージャーでは spark.deploy.defaultCores がデフォルトになります。 |
0.6.0 |
spark.locality.wait |
3s | データローカルタスクを起動するのを諦めて、よりローカルでないノードで起動するまでの待機時間。同じ待機時間が、複数のローカリティレベル(プロセスローカル、ノードローカル、ラックローカル、そして任意)をステップスルーするために使用されます。spark.locality.wait.node などを設定することで、各レベルの待機時間をカスタマイズすることも可能です。タスクが長く、ローカリティが低い場合は、この設定を増やすべきですが、デフォルトで通常はうまく機能します。 |
0.5.0 |
spark.locality.wait.node |
spark.locality.wait | ノードローカリティのローカリティ待機時間をカスタマイズします。たとえば、ノードローカリティをスキップしてラックローカリティをすぐに検索するように 0 に設定できます(クラスタにラック情報がある場合)。 | 0.8.0 |
spark.locality.wait.process |
spark.locality.wait | プロセスローカリティのローカリティ待機時間をカスタマイズします。これは、特定のエグゼキュータープロセス内のキャッシュされたデータにアクセスしようとするタスクに影響します。 | 0.8.0 |
spark.locality.wait.rack |
spark.locality.wait | ラックローカリティのローカリティ待機時間をカスタマイズします。 | 0.8.0 |
spark.scheduler.maxRegisteredResourcesWaitingTime |
30s | スケジューリングが開始される前にリソースが登録されるのを待つ最大時間。 | 1.1.1 |
spark.scheduler.minRegisteredResourcesRatio |
KUBERNETES モードの場合 0.8; YARN モードの場合 0.8; スタンドアロンモードの場合 0.0 | スケジューリングが開始される前に登録されたリソースの最小比率(登録済みリソース / 期待される総リソース)。(リソースは YARN モードと Kubernetes モードではエグゼキューター、スタンドアロンモードでは CPU コア)。0.0 から 1.0 の間の浮動小数点数で指定します。リソースの最小比率が達成されたかどうかに関わらず、スケジューリングが開始されるまで待機する最大時間は、設定 spark.scheduler.maxRegisteredResourcesWaitingTime によって制御されます。 |
1.1.1 |
spark.scheduler.mode |
FIFO | 同じ SparkContext に投入されたジョブ間のスケジューリングモード。キューイングジョブの代わりにフェアシェアリングを使用するには FAIR に設定できます。マルチユーザーサービスに便利です。 |
0.8.0 |
spark.scheduler.revive.interval |
1s | スケジューラーがタスクを実行するためのワーカーリソースオファーをリバイブする間隔。 | 0.8.1 |
spark.scheduler.listenerbus.eventqueue.capacity |
10000 | イベントキューのデフォルト容量。Spark は、spark.scheduler.listenerbus.eventqueue.queueName.capacity で指定された容量を使用してイベントキューを初期化しようとします。設定されていない場合、Spark はこの設定で指定されたデフォルト容量を使用します。容量は 0 より大きい必要があります。リスナーイベントがドロップされる場合は、値(例: 20000)を増やすことを検討してください。この値を増やすと、ドライバーがより多くのメモリを使用する可能性があります。 |
2.3.0 |
spark.scheduler.listenerbus.eventqueue.shared.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark リスナーバスの共有イベントキューの容量。これは、リスナーバスに登録された外部リスナーのイベントを保持します。共有キューに対応するリスナーイベントがドロップされる場合は、値の増加を検討してください。この値を増やすと、ドライバーがより多くのメモリを使用する可能性があります。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.appStatus.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
appStatus イベントキューの容量。これは、内部アプリケーションステータスリスナーのイベントを保持します。appStatus キューに対応するリスナーイベントがドロップされる場合は、値の増加を検討してください。この値を増やすと、ドライバーがより多くのメモリを使用する可能性があります。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.executorManagement.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark リスナーバスの executorManagement イベントキューの容量。これは、内部エグゼキューター管理リスナーのイベントを保持します。executorManagement キューに対応するリスナーイベントがドロップされる場合は、値の増加を検討してください。この値を増やすと、ドライバーがより多くのメモリを使用する可能性があります。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.eventLog.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark リスナーバスの eventLog キューの容量。これは、イベントログにイベントを書き込むイベントロギングリスナーのイベントを保持します。eventLog キューに対応するリスナーイベントがドロップされる場合は、値の増加を検討してください。この値を増やすと、ドライバーがより多くのメモリを使用する可能性があります。 | 3.0.0 |
spark.scheduler.listenerbus.eventqueue.streams.capacity |
spark.scheduler.listenerbus.eventqueue.capacity |
Spark リスナーバスの streams キューの容量。これは、内部ストリーミングリスナーのイベントを保持します。streams キューに対応するリスナーイベントがドロップされる場合は、値の増加を検討してください。この値を増やすと、ドライバーがより多くのメモリを使用する可能性があります。 | 3.0.0 |
spark.scheduler.resource.profileMergeConflicts |
false | 「true」に設定されている場合、Spark は、単一のステージに結合される RDD で異なる ResourceProfiles が指定されている場合に ResourceProfiles をマージします。マージされると、Spark は各リソースの最大値を選択し、新しい ResourceProfile を作成します。false のデフォルトでは、同じステージに入る RDD で複数の異なる ResourceProfiles が見つかった場合に Spark は例外をスローします。 | 3.1.0 |
spark.scheduler.excludeOnFailure.unschedulableTaskSetTimeout |
120s | タスクの失敗により除外されたすべてのエグゼキューターのためにスケジュール不可能になったタスクセットを中止する前に、新しいエグゼキューターを取得してタスクをスケジュールするために待機する秒単位のタイムアウト。 | 2.4.1 |
spark.standalone.submit.waitAppCompletion |
false | true に設定されている場合、Spark は、単一のステージに結合される RDD で異なる ResourceProfiles が指定されている場合に ResourceProfiles をマージします。マージされると、Spark は各リソースの最大値を選択し、新しい ResourceProfile を作成します。false のデフォルトでは、同じステージに入る RDD で複数の異なる ResourceProfiles が見つかった場合に Spark は例外をスローします。 | 3.1.0 |
spark.excludeOnFailure.enabled |
false | 「true」に設定されている場合、Spark がタスクの失敗が多すぎるために除外されたエグゼキューターでタスクをスケジュールすることを防ぎます。エグゼキューターとノードを除外するために使用されるアルゴリズムは、他の「spark.excludeOnFailure」設定オプションでさらに制御できます。この設定は、「spark.excludeOnFailure.application.enabled」および「spark.excludeOnFailure.taskAndStage.enabled」によってオーバーライドされ、個々のレベルでの除外の有効化を指定します。 | 2.1.0 |
spark.excludeOnFailure.application.enabled |
false | 「true」に設定されている場合、タスクの失敗が多すぎるためにアプリケーション全体のエグゼキューターを除外し、Spark がそれらのエグゼキューターでタスクをスケジュールすることを防ぎます。この設定は、「spark.excludeOnFailure.enabled」をオーバーライドします。 | 4.0.0 |
spark.excludeOnFailure.taskAndStage.enabled |
false | 「true」に設定されている場合、タスクセットレベルでタスクの失敗が多すぎるためにエグゼキューターを除外し、Spark がそれらのエグゼキューターでタスクをスケジュールすることを防ぎます。この設定は、「spark.excludeOnFailure.enabled」をオーバーライドします。 | 4.0.0 |
spark.excludeOnFailure.timeout |
1h | (実験的) ノードまたはエグゼキューターがアプリケーション全体で除外される期間。その後、新しいタスクの実行を試みるために無条件に除外リストから削除されます。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerExecutor |
1 | (実験的) 特定のタスクについて、そのタスクが 1 つのエグゼキューターでリトライできる回数。その後、そのタスクに対してエグゼキューターが除外されます。 | 2.1.0 |
spark.excludeOnFailure.task.maxTaskAttemptsPerNode |
2 | (実験的) 特定のタスクについて、そのタスクが 1 つのノードでリトライできる回数。その後、そのタスクに対してノード全体が除外されます。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedTasksPerExecutor |
2 | (実験的) 1 つのステージ内で、1 つのエグゼキューターで失敗する必要がある異なるタスクの数。その後、そのステージに対してエグゼキューターが除外されます。 | 2.1.0 |
spark.excludeOnFailure.stage.maxFailedExecutorsPerNode |
2 | (実験的) 特定のステージで除外されているとマークされた異なるエグゼキューターの数。その後、ステージに対してノード全体が失敗したとマークされます。 | 2.1.0 |
spark.excludeOnFailure.application.maxFailedTasksPerExecutor |
2 | (実験的) 成功したタスクセット内で、1 つのエグゼキューターで失敗する必要がある異なるタスクの数。その後、アプリケーション全体でエグゼキューターが除外されます。除外されたエグゼキューターは、spark.excludeOnFailure.timeout で指定されたタイムアウト後に、利用可能なリソースのプールに自動的に戻されます。ただし、動的割り当てでは、エグゼキューターがアイドル状態とマークされ、クラスタマネージャーによって再利用される可能性があることに注意してください。 |
2.2.0 |
spark.excludeOnFailure.application.maxFailedExecutorsPerNode |
2 | (実験的) アプリケーション全体で除外されている必要がある異なるエグゼキューターの数。その後、アプリケーション全体でノードが除外されます。除外されたノードは、spark.excludeOnFailure.timeout で指定されたタイムアウト後に、利用可能なリソースのプールに自動的に戻されます。ただし、動的割り当てでは、ノード上のエグゼキューターがアイドル状態とマークされ、クラスタマネージャーによって再利用される可能性があることに注意してください。 |
2.2.0 |
spark.excludeOnFailure.killExcludedExecutors |
false | (実験的) 「true」に設定されている場合、Spark がフェッチ障害またはアプリケーション全体での除外により除外されたエグゼキューターを自動的にキルすることを許可します。これは spark.killExcludedExecutors.application.* によって制御されます。ただし、ノード全体が除外に追加された場合、そのノード上のすべてエグゼキューターがキルされることに注意してください。 | 2.2.0 |
spark.excludeOnFailure.application.fetchFailure.enabled |
false | (実験的) 「true」に設定されている場合、Spark はフェッチ障害が発生するとすぐにエグゼキューターを除外します。外部シャッフルサービスが有効な場合、ノード全体が除外されます。 | 2.3.0 |
spark.speculation |
false | 「true」に設定されている場合、タスクの投機的実行を実行します。これは、1 つ以上のタスクがステージで遅く実行されている場合、それらが再起動されることを意味します。 | 0.6.0 |
spark.speculation.interval |
100ms | Spark が投機的実行するタスクをチェックする頻度。 | 0.6.0 |
spark.speculation.multiplier |
3 | タスクが中央値よりも何倍遅い場合に、投機的実行の対象とみなされるか。 | 0.6.0 |
spark.speculation.quantile |
0.9 | 投機的実行が有効になる前に、完了する必要があるタスクの割合(特定のステージに対して)。 | 0.6.0 |
spark.speculation.minTaskRuntime |
100ms | タスクが投機的実行の対象とみなされる前に実行する最小時間。これにより、非常に短いタスクの投機的コピーの起動を回避できます。 | 3.2.0 |
spark.speculation.task.duration.threshold |
なし | スケジューラーがタスクの投機的実行を試みるタスクの実行時間。指定されている場合、ステージ内のタスク数が単一エグゼキューターのスロット数以下であり、タスクがしきい値よりも長い時間がかかっている場合、タスクは投機的に実行されます。この設定は、タスクが非常に少ないステージの投機的実行に役立ちます。エグゼキューターのスロットが十分な場合、通常の投機的設定も適用される可能性があります。たとえば、十分な成功した実行があっても、しきい値に達していない場合でも、タスクは再起動される可能性があります。スロット数は、spark.executor.cores および spark.task.cpus の最小値 1 の設定値に基づいて計算されます。デフォルトの単位はバイトですが、別途指定されている場合を除きます。 | 3.0.0 |
spark.speculation.efficiency.processRateMultiplier |
0.75 | 非効率なタスクを評価する際に使用される乗数。乗数が高いほど、より多くのタスクが非効率とみなされる可能性があります。 | 3.4.0 |
spark.speculation.efficiency.longRunTaskFactor |
2 | タスクの実行時間が、係数と時間しきい値(spark.speculation.multiplier * successfulTaskDurations.median または spark.speculation.minTaskRuntime)の積を超えている限り、タスクは anyway 投機的に実行されます。これは、タスクの遅延がデータ処理レートに関連していない場合に非効率なタスクを見逃すのを防ぎます。 |
3.4.0 |
spark.speculation.efficiency.enabled |
true | true に設定されている場合、Spark はステージタスクメトリクスまたはその実行時間を通じてタスク処理の効率を評価し、非効率なタスクのみを投機的に実行する必要があります。タスクが非効率なのは、1) そのデータ処理レートが、ステージのすべての成功したタスクの平均データ処理レートに係数を掛けたものより小さい、または 2) その実行時間が、spark.speculation.multiplier * successfulTaskDurations.median または spark.speculation.minTaskRuntime の積の値を超えている場合です。 |
3.4.0 |
spark.task.cpus |
1 | 各タスクに割り当てるコア数。 | 0.5.0 |
spark.task.resource.{resourceName}.amount |
1 | 各タスクに割り当てる特定のリソースタイプの量。これは浮動小数点数になる可能性があることに注意してください。これが指定されている場合は、エグゼキューター設定 spark.executor.resource.{resourceName}.amount および対応する検出設定も提供する必要があります。これにより、エグゼキューターはそのリソースタイプで作成されます。整数量に加えて、分数量(たとえば、0.25 はリソースの 1/4 を意味します)を指定できます。分数量は 0.5 以下である必要があります。つまり、リソース共有の最小量は 2 タスク/リソースです。さらに、分数量はリソーススロットを割り当てるために切り捨てられます(例: 0.2222 の設定、つまり 1/0.2222 スロットは 5 ではなく 4 タスク/リソースになります)。 |
3.0.0 |
spark.task.maxFailures |
4 | ジョブを断念する前に、任意の特定のタスクが連続して失敗する回数。異なるタスクにまたがる合計失敗回数ではジョブは失敗しません。特定のタスクがこの回数連続で失敗する必要があります。いずれかの試行が成功すると、タスクの失敗回数はリセットされます。1 以上である必要があります。許可されるリトライ回数 = この値 - 1。 | 0.8.0 |
spark.task.reaper.enabled |
false | キルされた/中断されたタスクの監視を有効にします。true に設定されている場合、キルされたタスクは、そのタスクが実際に実行を終了するまでエグゼキューターによって監視されます。この監視の正確な動作を制御する方法については、他の spark.task.reaper.* 設定を参照してください。false(デフォルト)に設定されている場合、タスクのキルは、そのような監視がない古いコードパスを使用します。 |
2.0.3 |
spark.task.reaper.pollingInterval |
10s | spark.task.reaper.enabled = true の場合、この設定は、エグゼキューターがキルされたタスクのステータスをポーリングする頻度を制御します。ポーリング時にキルされたタスクがまだ実行中の場合、警告がログに記録され、デフォルトではタスクのスレッドダンプがログに記録されます(これは、以下に文書化されている spark.task.reaper.threadDump 設定で無効にできます)。 |
2.0.3 |
spark.task.reaper.threadDump |
true | spark.task.reaper.enabled = true の場合、この設定は、キルされたタスクの定期的なポーリング中にタスクスレッドダンプがログに記録されるかどうかを制御します。スレッドダンプの収集を無効にするには、これを false に設定してください。 |
2.0.3 |
spark.task.reaper.killTimeout |
-1 | spark.task.reaper.enabled = true の場合、この設定は、キルされたタスクが実行を停止していない場合にエグゼキューター JVM が自身をキルするまでのタイムアウトを指定します。デフォルト値の -1 はこのメカニズムを無効にし、エグゼキューターが自己破壊するのを防ぎます。この設定の目的は、キャンセル不可能な実行中のタスクがエグゼキューターを使用不能にするのを防ぐためのセーフティネットとして機能することです。 |
2.0.3 |
spark.stage.maxConsecutiveAttempts |
4 | ステージが中止される前に許可される連続ステージ試行回数。 | 2.2.0 |
spark.stage.ignoreDecommissionFetchFailure |
true | spark.stage.maxConsecutiveAttempts をカウントする際に、エグゼキューターの廃止によって引き起こされるステージフェッチ障害を無視するかどうか。 |
3.4.0 |
バリア実行モード
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.barrier.sync.timeout |
365d | バリアタスクからの各 barrier() 呼び出しのタイムアウト。コーディネーターが構成された時間内にすべてのバリアタスクからの同期メッセージを受信しなかった場合、SparkException をスローしてすべてのタスクを失敗させます。デフォルト値は 31536000(3600 * 24 * 365)に設定されているため、barrier() 呼び出しは 1 年間待機します。 |
2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.interval |
15s | 最大同時タスクチェックの失敗と次のチェックの間の待機時間(秒)。最大同時タスクチェックは、ジョブが送信されたときにバリアステージに必要な同時タスクよりも多くの同時タスクをクラスタが起動できることを保証します。チェックは、クラスタが起動したばかりで十分なエグゼキューターが登録されていない場合に失敗する可能性があります。そのため、しばらく待ってから再度チェックを実行します。ジョブで構成された最大失敗回数を超えてチェックが失敗した場合、現在のジョブ送信を失敗させます。この設定は、1 つ以上のバリアステージを含むジョブにのみ適用されることに注意してください。バリアステージを含まないジョブではチェックを実行しません。 | 2.4.0 |
spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures |
40 | ジョブ送信を失敗させる前に許可される最大同時タスクチェックの失敗回数。最大同時タスクチェックは、ジョブが送信されたときにバリアステージに必要な同時タスクよりも多くの同時タスクをクラスタが起動できることを保証します。チェックは、クラスタが起動したばかりで十分なエグゼキューターが登録されていない場合に失敗する可能性があります。そのため、しばらく待ってから再度チェックを実行します。ジョブで構成された最大失敗回数を超えてチェックが失敗した場合、現在のジョブ送信を失敗させます。この設定は、1 つ以上のバリアステージを含むジョブにのみ適用されることに注意してください。バリアステージを含まないジョブではチェックを実行しません。 | 2.4.0 |
動的割り当て
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.dynamicAllocation.enabled |
false | 動的リソース割り当てを使用するかどうか。これにより、ワークロードに基づいてこのアプリケーションに登録されているエグゼキューターの数を増減させます。詳細については、ここの説明を参照してください。 これには、次のいずれかの条件が必要です。1) spark.shuffle.service.enabled を介して外部シャッフルサービスを有効にする、または 2) spark.dynamicAllocation.shuffleTracking.enabled を介してシャッフル追跡を有効にする、または 3) spark.decommission.enabled および spark.storage.decommission.shuffleBlocks.enabled を介してシャッフルブロックの廃止を有効にする、または 4) (実験的) カスタム ShuffleDataIO で ShuffleDriverComponents が信頼性の高いストレージをサポートする spark.shuffle.sort.io.plugin.class を設定する。次の設定も関連します: spark.dynamicAllocation.minExecutors、spark.dynamicAllocation.maxExecutors、および spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.executorAllocationRatio |
1.2.0 |
spark.dynamicAllocation.executorIdleTimeout |
60s | 動的割り当てが有効になっており、エグゼキューターがこの期間以上アイドル状態の場合、エグゼキューターは削除されます。詳細については、この説明を参照してください。 | 1.2.0 |
spark.dynamicAllocation.cachedExecutorIdleTimeout |
infinity | 動的割り当てが有効になっており、キャッシュされたデータブロックを持つエグゼキューターがこの期間以上アイドル状態の場合、エグゼキューターは削除されます。詳細については、この説明を参照してください。 | 1.4.0 |
spark.dynamicAllocation.initialExecutors |
spark.dynamicAllocation.minExecutors |
動的割り当てが有効な場合、最初に実行されるエグゼキューターの数。--num-executors(または spark.executor.instances)が設定されており、この値より大きい場合、それがエグゼキューターの初期数として使用されます。 |
1.3.0 |
spark.dynamicAllocation.maxExecutors |
infinity | 動的割り当てが有効な場合のエグゼキューターの数の上限。 | 1.2.0 |
spark.dynamicAllocation.minExecutors |
0 | 動的割り当てが有効な場合のエグゼキューターの数の下限。 | 1.2.0 |
spark.dynamicAllocation.executorAllocationRatio |
1 | デフォルトでは、動的割り当ては、処理するタスクの数に基づいて並列度を最大化するために十分なエグゼキューターを要求します。これによりジョブのレイテンシが最小限に抑えられますが、タスクが小さい場合、エグゼキューターの割り当てオーバーヘッドにより、一部のエグゼキューターがまったく作業を行わない可能性があるため、リソースが無駄になる可能性があります。この設定では、比率を設定して、完全な並列度に対するエグゼキューターの数を減らすことができます。デフォルトは 1.0 で、最大の並列度を提供します。0.5 はターゲットエグゼキューター数を 2 で割ります。動的割り当てによって計算されたターゲットエグゼキューター数は、spark.dynamicAllocation.minExecutors および spark.dynamicAllocation.maxExecutors 設定によってオーバーライドされる場合があります。 |
2.4.0 |
spark.dynamicAllocation.schedulerBacklogTimeout |
1s | 動的割り当てが有効になっており、保留中のタスクがこの期間以上バックログになっている場合、新しいエグゼキューターが要求されます。詳細については、この説明を参照してください。 | 1.2.0 |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout |
schedulerBacklogTimeout |
spark.dynamicAllocation.schedulerBacklogTimeout と同じですが、後続のエグゼキューター要求にのみ使用されます。詳細については、この説明を参照してください。 |
1.2.0 |
spark.dynamicAllocation.shuffleTracking.enabled |
true |
エグゼキューターのシャッフルファイル追跡を有効にします。これにより、外部シャッフルサービスなしで動的割り当てが可能になります。このオプションは、アクティブなジョブのシャッフルデータを格納しているエグゼキューターを維持しようとします。 | 3.0.0 |
spark.dynamicAllocation.shuffleTracking.timeout |
infinity |
シャッフル追跡が有効になっている場合、シャッフルデータを保持しているエグゼキューターのタイムアウトを制御します。デフォルト値は、Spark がシャッフルがガベージコレクションされることでエグゼキューターを解放できることに依存することを意味します。何らかの理由でガベージコレクションがシャッフルを十分にクリーンアップしない場合、このオプションを使用して、シャッフルデータを格納している場合でもエグゼキューターのタイムアウトを制御できます。 | 3.0.0 |
スレッド構成
ジョブとクラスタの設定によっては、Spark のさまざまな場所にスレッド数を設定して、利用可能なリソースを効率的に活用し、パフォーマンスを向上させることができます。Spark 3.0 より前は、これらのスレッド設定は、ドライバー、エグゼキューター、ワーカー、マスターなど、Spark のすべてのロールに適用されていました。Spark 3.0 以降では、ドライバーとエグゼキューターから始めて、より細かい粒度でスレッドを設定できます。RPC モジュールを例にとると、以下の表のようになります。他のモジュール(シャッフルなど)については、spark.{driver|executor}.rpc.netty.dispatcher.numThreads(RPC モジュールのみ)を除き、「rpc」をプロパティ名に置き換えるだけです。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.{driver|executor}.rpc.io.serverThreads |
spark.rpc.io.serverThreads にフォールバック |
サーバースレッドプールで使用されるスレッド数 | 1.6.0 |
spark.{driver|executor}.rpc.io.clientThreads |
spark.rpc.io.clientThreads にフォールバック |
クライアントスレッドプールで使用されるスレッド数 | 1.6.0 |
spark.{driver|executor}.rpc.netty.dispatcher.numThreads |
spark.rpc.netty.dispatcher.numThreads にフォールバック |
RPC メッセージディスパッチャスレッドプールで使用されるスレッド数 | 3.0.0 |
スレッド関連のキーのデフォルト値は、ドライバーまたはエグゼキューターに要求されたコア数、またはそれが指定されていない場合は、JVM に利用可能なコア数(ハードコードされた上限 8)の最小値です。
Spark Connect
サーバー構成
サーバー構成は Spark Connect サーバーで設定されます。たとえば、./sbin/start-connect-server.sh で Spark Connect サーバーを起動する場合などです。これらは通常、設定ファイルおよびコマンドラインオプション(--conf/-c 付き)を介して設定されます。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.api.mode |
classic | Spark Classic アプリケーションの場合、ローカル Spark Connect サーバーを実行して Spark Connect を自動的に使用するかどうかを指定します。値は classic または connect になります。 |
4.0.0 |
spark.connect.grpc.binding.address |
(なし) | Spark Connect サーバーをバインドするアドレス。 | 4.0.0 |
spark.connect.grpc.binding.port |
15002 | Spark Connect サーバーをバインドするポート。 | 3.4.0 |
spark.connect.grpc.port.maxRetries |
0 | gRPC サーバーバインディングの最大ポートリトライ試行回数。デフォルトでは 0 に設定されており、ポートの競合が発生した場合、サーバーはすぐに失敗します。 | 4.0.0 |
spark.connect.grpc.interceptor.classes |
(なし) | io.grpc.ServerInterceptor インターフェースを実装する必要があるクラス名のコンマ区切りリスト |
3.4.0 |
spark.connect.grpc.arrow.maxBatchSize |
4m | Apache Arrow を使用する場合、サーバー側からクライアント側に送信できる 1 つの Arrow バッチの最大サイズを制限します。現在、サイズは正確ではなく推定値であるため、保守的に 70% を使用しています。 | 3.4.0 |
spark.connect.grpc.maxInboundMessageSize |
134217728 | gRPC リクエストの最大受信メッセージサイズを設定します。ペイロードが大きいリクエストは失敗します。 | 3.4.0 |
spark.connect.extensions.relation.classes |
(なし) | proto でカスタムリレーションタイプをサポートするために、org.apache.spark.sql.connect.plugin.RelationPlugin トレイトを実装するクラスのコンマ区切りリスト。 |
3.4.0 |
spark.connect.extensions.expression.classes |
(なし) | proto でカスタム式タイプをサポートするために、org.apache.spark.sql.connect.plugin.ExpressionPlugin トレイトを実装するクラスのコンマ区切りリスト。 |
3.4.0 |
spark.connect.extensions.command.classes |
(なし) | proto でカスタムコマンドタイプをサポートするために、org.apache.spark.sql.connect.plugin.CommandPlugin トレイトを実装するクラスのコンマ区切りリスト。 |
3.4.0 |
spark.connect.ml.backend.classes |
(なし) | 指定された Spark ML オペレーターをバックエンド固有の実装に置き換えるために、org.apache.spark.sql.connect.plugin.MLBackendPlugin トレイトを実装するクラスのコンマ区切りリスト。 |
4.0.0 |
spark.connect.jvmStacktrace.maxSize |
1024 | `spark.sql.pyspark.jvmStacktrace.enabled` が true の場合に表示されるスタックトレースの最大サイズを設定します。 | 3.5.0 |
spark.sql.connect.ui.retainedSessions |
200 | Spark Connect UI の履歴で保持されるクライアントセッション数。 | 3.5.0 |
spark.sql.connect.ui.retainedStatements |
200 | Spark Connect UI の履歴で保持されるステートメント数。 | 3.5.0 |
spark.sql.connect.enrichError.enabled |
true | true の場合、追加の RPC を介してクライアント側でエラーを完全な例外メッセージとオプションのサーバー側スタックトレースでリッチにします。 | 4.0.0 |
spark.sql.connect.serverStacktrace.enabled |
true | true の場合、ユーザーに表示される Spark 例外にサーバー側スタックトレースを設定します。 | 4.0.0 |
spark.connect.grpc.maxMetadataSize |
1024 | メタデータフィールドの最大サイズを設定します。たとえば、`ErrorInfo` のメタデータフィールドを制限します。 | 4.0.0 |
spark.connect.progress.reportInterval |
2s | クエリの進行状況がクライアントに報告される間隔。値が負の値に設定されている場合、進行状況レポートは無効になります。 | 4.0.0 |
セキュリティ
さまざまな Spark サブシステムを保護する方法については、セキュリティページを参照してください。
Spark SQL
実行時 SQL 構成
実行時 SQL 設定は、セッションごとで、変更可能な Spark SQL 設定です。これらは、設定ファイルおよびコマンドラインオプション(プレフィックス付きの --conf/-c)で初期値とともに設定するか、SparkSession を作成するために使用される SparkConf を設定することによって設定できます。また、SET コマンドで設定およびクエリでき、RESET コマンド、または実行時の SparkSession.conf のセッターおよびゲッターメソッドによって初期値にリセットできます。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.sql.adaptive.advisoryPartitionSizeInBytes |
(spark.sql.adaptive.shuffle.targetPostShuffleInputSize の値) |
アダプティブ最適化中(spark.sql.adaptive.enabled が true の場合)のシャッフルパーティションのアドバイザリサイズ(バイト単位)。これは、Spark が小さなシャッフルパーティションを結合したり、スキューしたシャッフルパーティションを分割したりする場合に有効になります。 |
3.0.0 |
spark.sql.adaptive.autoBroadcastJoinThreshold |
(なし) | 結合を実行する際に、すべてのワーカーノードにブロードキャストされるテーブルの最大サイズ(バイト単位)を設定します。この値を -1 に設定すると、ブロードキャストを無効にできます。デフォルト値は spark.sql.autoBroadcastJoinThreshold と同じです。この設定はアダプティブフレームワークでのみ使用されることに注意してください。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.enabled |
true | 'spark.sql.adaptive.enabled' が true で、true に設定されている場合、Spark は、ターゲットサイズ('spark.sql.adaptive.advisoryPartitionSizeInBytes' で指定)に従って連続したシャッフルパーティションを結合し、小さすぎるタスクを回避します。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum |
(なし) | 結合前のシャッフルパーティションの初期数。設定されていない場合、spark.sql.shuffle.partitions と等しくなります。この設定は、'spark.sql.adaptive.enabled' と 'spark.sql.adaptive.coalescePartitions.enabled' の両方が true の場合にのみ効果があります。 |
3.0.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize |
1MB | 結合後のシャッフルパーティションの最小サイズ。アダプティブに計算されたターゲットサイズがパーティション結合中に小さすぎる場合に役立ちます。 |
3.2.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst |
true | true の場合、Spark は、Spark クラスタのデフォルト並列度に従ってターゲットサイズをアダプティブに計算しますが、'spark.sql.adaptive.advisoryPartitionSizeInBytes'(デフォルト 64MB)で指定されたターゲットサイズを尊重しません。計算されたサイズは通常、設定されたターゲットサイズよりも小さくなります。これは、並列度を最大化し、アダプティブクエリ実行を有効にする際のパフォーマンス低下を回避するためです。ビジーなクラスタでは、リソース利用率をより効率的にするために、この設定を false に設定することをお勧めします(小さすぎるタスクの数が少なくなります)。 |
3.2.0 |
spark.sql.adaptive.customCostEvaluatorClass |
(なし) | アダプティブ実行に使用されるカスタムコスト評価クラス。設定されていない場合、Spark はデフォルトで独自の SimpleCostEvaluator を使用します。 |
3.2.0 |
spark.sql.adaptive.enabled |
true | true の場合、アダプティブクエリ実行を有効にします。これは、正確な実行時統計に基づいて、クエリ実行の途中でクエリプランを再最適化します。 |
1.6.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin |
false | true の場合、追加のシャッフルが発生する場合でも、OptimizeSkewedJoin を強制的に有効にします。 |
3.3.0 |
spark.sql.adaptive.localShuffleReader.enabled |
true | true で 'spark.sql.adaptive.enabled' が true の場合、Spark は、シャッフルパーティショニングが必要ない場合(たとえば、ソートマージ結合をブロードキャストハッシュ結合に変換した後など)に、ローカルシャッフルリーダーを使用してシャッフルデータを読み取ろうとします。 |
3.0.0 |
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold |
0b | ローカルハッシュマップの構築を許可されるパーティションあたりの最大サイズ(バイト単位)を設定します。この値が spark.sql.adaptive.advisoryPartitionSizeInBytes より小さくなく、すべてのパーティションサイズがこの設定より大きくない場合、spark.sql.join.preferSortMergeJoin の値に関わらず、結合選択はシャッフルハッシュ結合の使用を優先します。 |
3.2.0 |
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled |
true | true で 'spark.sql.adaptive.enabled' が true の場合、Spark は RebalancePartitions でスキューしたシャッフルパーティションを最適化し、ターゲットサイズ('spark.sql.adaptive.advisoryPartitionSizeInBytes' で指定)に従って小さなパーティションに分割して、データスキューを回避します。 |
3.2.0 |
spark.sql.adaptive.optimizer.excludedRules |
(なし) | アダプティブオプティマイザで無効にするルールのリストを設定します。ルールはルール名で指定され、コンマで区切られます。オプティマイザは、実際に除外されたルールをログに記録します。 |
3.1.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor |
0.2 | パーティションが spark.sql.adaptive.advisoryPartitionSizeInBytes のこの係数倍よりも小さい場合、パーティションは分割中にマージされます。 |
3.3.0 |
spark.sql.adaptive.skewJoin.enabled |
true | true で 'spark.sql.adaptive.enabled' が true の場合、Spark は、スキューしたパーティションを分割(および必要に応じてレプリケート)することで、シャッフル結合(ソートマージおよびシャッフルハッシュ)のスキューを動的に処理します。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor |
5.0 | パーティションサイズが中央値パーティションサイズのこの係数倍よりも大きく、かつ 'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes' よりも大きい場合、パーティションはスキューしているとみなされます。 |
3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes |
256MB | パーティションサイズがバイト単位でこのしきい値よりも大きく、かつ 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' に中央値パーティションサイズを掛けたものよりも大きい場合、パーティションはスキューしているとみなされます。理想的には、この設定は 'spark.sql.adaptive.advisoryPartitionSizeInBytes' よりも大きく設定する必要があります。 |
3.0.0 |
spark.sql.allowNamedFunctionArguments |
true | true の場合、Spark は、実装されているすべての関数に対して名前付きパラメータのサポートを有効にします。 |
3.5.0 |
spark.sql.ansi.doubleQuotedIdentifiers |
false | true で 'spark.sql.ansi.enabled' が true の場合、Spark SQL は二重引用符(")で囲まれたリテラルを識別子として読み取ります。false の場合、それらは文字列リテラルとして読み取られます。 |
3.4.0 |
spark.sql.ansi.enabled |
true | true の場合、Spark SQL は Hive 準拠ではなく、ANSI 準拠のダイアレクトを使用します。たとえば、Spark は無効な SQL 演算子/関数の入力に対して null 結果を返すのではなく、実行時に例外をスローします。このダイアレクトの詳細については、Spark のドキュメントの「ANSI Compliance」セクションを参照してください。一部の ANSI ダイアレクト機能は ANSI SQL 標準から直接ではない場合がありますが、その動作は ANSI SQL のスタイルに沿っています。 |
3.0.0 |
spark.sql.ansi.enforceReservedKeywords |
false | true で 'spark.sql.ansi.enabled' が true の場合、Spark SQL パーサーは ANSI 予約キーワードを強制し、予約キーワードをエイリアス名および/またはテーブル、ビュー、関数などの識別子として使用する SQL クエリを禁止します。 |
3.3.0 |
spark.sql.ansi.relationPrecedence |
false | true で 'spark.sql.ansi.enabled' が true の場合、JOIN はリレーションの結合においてカンマよりも優先されます。たとえば、 |
3.4.0 |
spark.sql.autoBroadcastJoinThreshold |
10MB | 結合を実行する際に、すべてのワーカーノードにブロードキャストされるテーブルの最大サイズ(バイト単位)を設定します。この値を -1 に設定すると、ブロードキャストを無効にできます。 |
1.1.0 |
spark.sql.avro.compression.codec |
snappy | AVRO ファイルの書き込みに使用される圧縮コーデック。サポートされているコーデック: uncompressed, deflate, snappy, bzip2, xz, zstandard。デフォルトのコーデックは snappy です。 |
2.4.0 |
spark.sql.avro.deflate.level |
-1 | AVRO ファイルの書き込みに使用される deflate コーデックの圧縮レベル。有効な値は 1 から 9 の範囲(または -1)である必要があります。デフォルト値は -1 で、現在の実装ではレベル 6 に対応します。 |
2.4.0 |
spark.sql.avro.filterPushdown.enabled |
true | true の場合、Avro データソースへのフィルタプッシュダウンを有効にします。 |
3.1.0 |
spark.sql.avro.xz.level |
6 | AVRO ファイルの書き込みに使用される xz コーデックの圧縮レベル。有効な値は 1 から 9 の範囲である必要があります。デフォルト値は 6 です。 |
4.0.0 |
spark.sql.avro.zstandard.bufferPool.enabled |
false | true の場合、AVRO ファイルの書き込み時に ZSTD JNI ライブラリのバッファプールを有効にします。 |
4.0.0 |
spark.sql.avro.zstandard.level |
3 | AVRO ファイルの書き込みに使用される zstandard コーデックの圧縮レベル。 |
4.0.0 |
spark.sql.binaryOutputStyle |
(なし) | バイナリデータを表示するために使用される出力スタイル。有効な値は 'UTF-8', 'BASIC', 'BASE64', 'HEX', 'HEX_DISCRETE' です。 |
4.0.0 |
spark.sql.broadcastTimeout |
300 | ブロードキャスト結合でのブロードキャスト待機時間のタイムアウト(秒)。 |
1.3.0 |
spark.sql.bucketing.coalesceBucketsInJoin.enabled |
false | true の場合、バケット数が異なる 2 つのバケット化されたテーブルが結合される場合、バケット数が多い側はもう一方の側と同じバケット数になるように結合されます。バケット数が多い方が、少ない方のバケット数で割り切れる必要があります。バケット結合は、ソートマージ結合およびシャッフルハッシュ結合に適用されます。注意: バケット化されたテーブルの結合は、結合時の不要なシャッフルを回避できますが、並列度も低下し、シャッフルハッシュ結合で OOM が発生する可能性があります。 |
3.1.0 |
spark.sql.bucketing.coalesceBucketsInJoin.maxBucketRatio |
4 | バケット結合が適用されるためには、結合される 2 つのバケットの数に対する比率がこの値以下である必要があります。この設定は、'spark.sql.bucketing.coalesceBucketsInJoin.enabled' が true に設定されている場合にのみ効果があります。 |
3.1.0 |
spark.sql.catalog.spark_catalog |
builtin | Spark の組み込み v1 カタログ:spark_catalog の v2 インターフェースとして使用されるカタログ実装。このカタログは spark_catalog と同じ識別子名前空間を共有し、それと一致する必要があります。たとえば、テーブルが spark_catalog でロードできる場合、このカタログもテーブルメタデータを返す必要があります。spark_catalog に操作を委譲するために、実装は 'CatalogExtension' を拡張できます。値は 'builtin'(Spark の組み込み V2SessionCatalog を表す)またはカタログ実装の完全修飾クラス名のいずれかである必要があります。 |
3.0.0 |
spark.sql.cbo.enabled |
false | true に設定すると、プラン統計の推定のために CBO を有効にします。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.star.filter |
false | コストベースの結合列挙にスター結合フィルターヒューリスティクスを適用します。 |
2.2.0 |
spark.sql.cbo.joinReorder.dp.threshold |
12 | 動的計画法アルゴリズムで許可される結合ノードの最大数。 |
2.2.0 |
spark.sql.cbo.joinReorder.enabled |
false | CBO で結合並べ替えを有効にします。 |
2.2.0 |
spark.sql.cbo.planStats.enabled |
false | true の場合、論理プランはカタログから行数と列統計を取得します。 |
3.0.0 |
spark.sql.cbo.starSchemaDetection |
false | true の場合、スター スキーマ検出に基づく結合並べ替えを有効にします。 |
2.2.0 |
spark.sql.charAsVarchar |
false | true の場合、Spark は CREATE/REPLACE/ALTER TABLE コマンドで CHAR 型を VARCHAR 型に置き換えるため、新しく作成/更新されたテーブルには CHAR 型の列/フィールドはなくなります。CHAR 型の列/フィールドを持つ既存のテーブルは、この設定の影響を受けません。 |
3.3.0 |
spark.sql.chunkBase64String.enabled |
true |
|
3.5.2 |
spark.sql.cli.print.header |
false | true に設定されている場合、spark-sql CLI はクエリ出力に列の名前を表示します。 |
3.2.0 |
spark.sql.columnNameOfCorruptRecord |
_corrupt_record | 解析に失敗した生の/解析されていない JSON および CSV レコードを格納するための内部列の名前。 |
1.2.0 |
spark.sql.csv.filterPushdown.enabled |
true | true の場合、CSV データソースへのフィルタプッシュダウンを有効にします。 |
3.0.0 |
spark.sql.datetime.java8API.enabled |
false | 設定プロパティが true に設定されている場合、Java 8 API の java.time.Instant および java.time.LocalDate クラスが、Catalyst の TimestampType および DateType の外部タイプとして使用されます。false に設定されている場合、java.sql.Timestamp および java.sql.Date が同じ目的で使用されます。 |
3.0.0 |
spark.sql.debug.maxToStringFields |
25 | デバッグ出力で文字列に変換できるシーケンスライクなエントリのフィールドの最大数。制限を超える要素はドロップされ、「... N more fields」プレースホルダーに置き換えられます。 |
3.0.0 |
spark.sql.defaultCacheStorageLevel |
MEMORY_AND_DISK |
|
4.0.0 |
spark.sql.defaultCatalog |
spark_catalog | デフォルトカタログの名前。ユーザーが現在のカタログを明示的に設定していない場合、これは現在のカタログになります。 |
3.0.0 |
spark.sql.error.messageFormat |
PRETTY | PRETTY の場合、エラーメッセージは、エラークラス、メッセージ、およびクエリコンテキストのテキスト表現で構成されます。MINIMAL および STANDARD フォーマットは、STANDARD が追加の JSON フィールド |
3.4.0 |
spark.sql.execution.arrow.enabled |
false | (Spark 3.0 で非推奨、'spark.sql.execution.arrow.pyspark.enabled' を設定してください。) |
2.3.0 |
spark.sql.execution.arrow.fallback.enabled |
true | (Spark 3.0 で非推奨、'spark.sql.execution.arrow.pyspark.fallback.enabled' を設定してください。) |
2.4.0 |
spark.sql.execution.arrow.localRelationThreshold |
48MB | Arrow バッチを Spark DataFrame に変換する際、Arrow バッチのバイトサイズがこのしきい値より小さい場合、ドライバー側でローカルコレクションが使用されます。それ以外の場合、Arrow バッチはエグゼキューターに送信され、Spark 内部行にデシリアライズされます。 |
3.4.0 |
spark.sql.execution.arrow.maxRecordsPerBatch |
10000 | Apache Arrow を使用する場合、メモリ内の単一 ArrowRecordBatch に書き込むことができるレコードの最大数を制限します。この設定は、各グループが各 ArrowRecordBatch になるため、DataFrame(.cogroup).groupby.applyInPandas のようなグループ化 API では効果がありません。ゼロまたは負の値に設定されている場合、制限はありません。spark.sql.execution.arrow.maxBytesPerBatch も参照してください。両方が設定されている場合、いずれかの条件が満たされると各バッチが作成されます。 |
2.3.0 |
spark.sql.execution.arrow.pyspark.enabled |
(spark.sql.execution.arrow.enabled の値) |
true の場合、PySpark での列指向データ転送に Apache Arrow を使用します。この最適化は、1. pyspark.sql.DataFrame.toPandas。2. pyspark.sql.SparkSession.createDataFrame(入力が Pandas DataFrame または NumPy ndarray の場合)に適用されます。次のデータ型はサポートされていません: TimestampType の ArrayType。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.fallback.enabled |
(spark.sql.execution.arrow.fallback.enabled の値) |
true の場合、エラーが発生すると、'spark.sql.execution.arrow.pyspark.enabled' によって有効化された最適化は自動的に最適化されていない実装にフォールバックします。 |
3.0.0 |
spark.sql.execution.arrow.pyspark.selfDestruct.enabled |
false | (実験的) true の場合、PySpark での列指向データ転送のために Apache Arrow の自己破壊および分割ブロックオプションを使用します。これは、CPU 時間を一部犠牲にしてメモリ使用量を削減します。この最適化は、'spark.sql.execution.arrow.pyspark.enabled' が設定されている場合の pyspark.sql.DataFrame.toPandas に適用されます。 |
3.2.0 |
spark.sql.execution.arrow.sparkr.enabled |
false | true の場合、SparkR での列指向データ転送に Apache Arrow を使用します。この最適化は、1. createDataFrame(入力が R DataFrame の場合)2. collect 3. dapply 4. gapply に適用されます。次のデータ型はサポートされていません: FloatType, BinaryType, ArrayType, StructType, MapType。 |
3.0.0 |
spark.sql.execution.arrow.transformWithStateInPandas.maxRecordsPerBatch |
10000 | TransformWithStateInPandas を使用する場合、メモリ内の単一 ArrowRecordBatch に書き込むことができる状態レコードの最大数を制限します。 |
4.0.0 |
spark.sql.execution.arrow.useLargeVarTypes |
false | Apache Arrow を使用する場合、文字列およびバイナリタイプには大きな可変幅ベクトルを使用します。通常の文字列およびバイナリタイプは、単一レコードバッチで 2GiB の制限があります。大きな可変幅タイプは、値あたりのメモリ使用量が増加する代わりに、この制限を解除します。 |
3.5.0 |
spark.sql.execution.interruptOnCancel |
true | true の場合、クエリをキャンセルすると、すべての実行中タスクが中断されます。 |
4.0.0 |
spark.sql.execution.pandas.inferPandasDictAsMap |
false | true の場合、spark.createDataFrame は Pandas DataFrame からの dict を MapType として推測します。false の場合、spark.createDataFrame は Pandas DataFrame からの dict を StructType として推測します(これは PyArrow からのデフォルトの推測です)。 |
4.0.0 |
spark.sql.execution.pandas.structHandlingMode |
legacy | pandas DataFrame を作成する際の構造体タイプの変換モード。「legacy」の場合、1. Arrow 最適化が無効な場合、Row オブジェクトに変換します。2. Arrow 最適化が有効な場合、重複するネストフィールド名がある場合は dict に変換するか例外をスローします。「row」の場合、Arrow 最適化に関係なく Row オブジェクトに変換します。「dict」の場合、dict に変換し、重複するネストフィールド名がある場合は接尾辞付きキー名(例: a_0, a_1)を使用します。Arrow 最適化に関係ありません。 |
3.5.0 |
spark.sql.execution.pandas.udf.buffer.size |
(spark.buffer.size の値) |
Pandas UDF 実行にのみ適用される |
3.0.0 |
spark.sql.execution.pyspark.udf.faulthandler.enabled |
(spark.python.worker.faulthandler.enabled の値) |
DataFrame および SQL を使用した Python 実行の場合の spark.python.worker.faulthandler.enabled と同じです。実行中に変更できます。 |
4.0.0 |
spark.sql.execution.pyspark.udf.hideTraceback.enabled |
false | true の場合、Python UDF からの例外メッセージのみを表示し、スタックトレースは非表示にします。これが有効な場合、simplifiedTraceback は効果がありません。 |
4.0.0 |
spark.sql.execution.pyspark.udf.idleTimeoutSeconds |
(spark.python.worker.idleTimeoutSeconds の値) |
DataFrame および SQL を使用した Python 実行の場合の spark.python.worker.idleTimeoutSeconds と同じです。実行中に変更できます。 |
4.0.0 |
spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled |
true | true の場合、Python UDF からのトレースバックが簡略化されます。Python ワーカー、(デ)シリアライゼーションなど、PySpark からのトレースバックを非表示にし、UDF からの例外メッセージのみを表示します。これは CPython 3.7+ でのみ機能することに注意してください。 |
3.1.0 |
spark.sql.execution.python.udf.buffer.size |
(spark.buffer.size の値) |
Python UDF 実行にのみ適用される |
4.0.0 |
spark.sql.execution.python.udf.maxRecordsPerBatch |
100 | Python UDF を使用する場合、シリアライゼーション/デシリアライゼーションのためにバッチ処理できるレコードの最大数を制限します。 |
4.0.0 |
spark.sql.execution.pythonUDF.arrow.concurrency.level |
(なし) | Arrow 最適化された Python UDF を実行するための並列度。Python UDF が I/O 負荷が高い場合、これは便利です。 |
4.0.0 |
spark.sql.execution.pythonUDF.arrow.enabled |
false | 通常の Python UDF で Arrow 最適化を有効にします。この最適化は、指定された関数が少なくとも 1 つの引数を受け取る場合にのみ有効にできます。 |
3.4.0 |
spark.sql.execution.pythonUDTF.arrow.enabled |
false | Python UDTF の Arrow 最適化を有効にします。 |
3.5.0 |
spark.sql.execution.topKSortFallbackThreshold |
2147483632 | 「SELECT x FROM t ORDER BY y LIMIT m」のような SORT の後に LIMIT が続く SQL クエリで、m がこのしきい値未満の場合、メモリ内でトップ K ソートを実行し、それ以外の場合はグローバルソートを実行して、必要に応じてディスクにスピルします。 |
2.4.0 |
spark.sql.extendedExplainProviders |
(なし) | org.apache.spark.sql.ExtendedExplainGenerator トレイトを実装するクラスのコンマ区切りリスト。指定されている場合、Spark は、プロバイダーから追加のプラン情報を、explain プランおよび UI で出力します。 |
4.0.0 |
spark.sql.files.ignoreCorruptFiles |
false | 破損したファイルを無視するかどうか。true の場合、Spark ジョブは破損したファイルに遭遇しても実行を継続し、読み取られた内容は引き続き返されます。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 |
2.1.1 |
spark.sql.files.ignoreInvalidPartitionPaths |
false | <column>=<value> に一致しない無効なパーティションパスを無視するかどうか。オプションが有効な場合、'table/invalid' および 'table/col=1' という 2 つのパーティションディレクトリを持つテーブルは、後者のディレクトリのみをロードし、無効なパーティションを無視します。 |
4.0.0 |
spark.sql.files.ignoreMissingFiles |
false | 欠落しているファイルを無視するかどうか。true の場合、Spark ジョブは欠落しているファイルに遭遇しても実行を継続し、読み取られた内容は引き続き返されます。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 |
2.3.0 |
spark.sql.files.maxPartitionBytes |
128MB | ファイルを読み取る際に 1 つのパーティションにパックできる最大バイト数。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 |
2.0.0 |
spark.sql.files.maxPartitionNum |
(なし) | 分割ファイルパーティションの推奨(保証なし)最大数。設定されている場合、初期パーティション数がこれを超えると、Spark は各パーティションを再スケーリングして、パーティション数をこの値に近づけます。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 |
3.5.0 |
spark.sql.files.maxRecordsPerFile |
0 | 1 つのファイルに書き出すレコードの最大数。この値がゼロまたは負の場合、制限はありません。 |
2.2.0 |
spark.sql.files.minPartitionNum |
(なし) | 分割ファイルパーティションの推奨(保証なし)最小数。設定されていない場合、デフォルト値は |
3.1.0 |
spark.sql.function.concatBinaryAsString |
false | このオプションが false に設定されており、すべての入力がバイナリの場合、 |
2.3.0 |
spark.sql.function.eltOutputAsString |
false | このオプションが false に設定されており、すべての入力がバイナリの場合、 |
2.3.0 |
spark.sql.groupByAliases |
true | true の場合、SELECT リストのエイリアスを GROUP BY 句で使用できます。false の場合、このケースでは解析例外がスローされます。 |
2.2.0 |
spark.sql.groupByOrdinal |
true | true の場合、GROUP BY 句の序数番号は SELECT リスト内の位置として扱われます。false の場合、序数番号は無視されます。 |
2.0.0 |
spark.sql.hive.convertInsertingPartitionedTable |
true | true に設定され、 |
3.0.0 |
spark.sql.hive.convertInsertingUnpartitionedTable |
true | true に設定され、 |
4.0.0 |
spark.sql.hive.convertMetastoreCtas |
true | true に設定されている場合、Spark は Hive serde の代わりに組み込みデータソースライターを CTAS で使用しようとします。このフラグは、Parquet および ORC フォーマットに対して |
3.0.0 |
spark.sql.hive.convertMetastoreInsertDir |
true | true に設定されている場合、Spark は Hive serde の代わりに INSERT OVERWRITE DIRECTORY で組み込みデータソースライターを使用しようとします。このフラグは、Parquet および ORC フォーマットに対して |
3.3.0 |
spark.sql.hive.convertMetastoreOrc |
true | true に設定されている場合、Hive serde の代わりに、組み込み ORC リーダーおよびライターが HiveQL 構文を使用して作成された ORC テーブルを処理するために使用されます。 |
2.0.0 |
spark.sql.hive.convertMetastoreParquet |
true | true に設定されている場合、Hive serde の代わりに、組み込み Parquet リーダーおよびライターが HiveQL 構文を使用して作成された Parquet テーブルを処理するために使用されます。 |
1.1.1 |
spark.sql.hive.convertMetastoreParquet.mergeSchema |
false | true の場合、異なる Parquet データファイルで互換性がある可能性のある異なる Parquet スキーマをマージしようとします。この設定は、「spark.sql.hive.convertMetastoreParquet」が true の場合にのみ有効です。 |
1.3.1 |
spark.sql.hive.dropPartitionByName.enabled |
false | true の場合、Spark はパーティションオブジェクトではなくパーティション名を取得してパーティションをドロップするため、パーティションのドロップパフォーマンスを向上させることができます。 |
3.4.0 |
spark.sql.hive.filesourcePartitionFileCacheSize |
262144000 | ゼロ以外の場合、メモリ内でのパーティションファイルメタデータのキャッシングを有効にします。すべてのテーブルは、ファイルメタデータに指定されたバイト数まで使用できるキャッシュを共有します。この設定は、Hive ファイルソースパーティション管理が有効になっている場合にのみ効果があります。 |
2.1.1 |
spark.sql.hive.manageFilesourcePartitions |
true | true の場合、データソーステーブルと変換された Hive テーブルの両方に対して、ファイルソーステーブルのメタストアパーティション管理を有効にします。パーティション管理が有効な場合、データソーステーブルはパーティションを Hive メタストアに格納し、spark.sql.hive.metastorePartitionPruning が true に設定されている場合、クエリ計画中にパーティションをプルーニングするためにメタストアを使用します。 |
2.1.1 |
spark.sql.hive.metastorePartitionPruning |
true | true の場合、一部の述語は Hive メタストアにプッシュダウンされるため、一致しないパーティションを早期に除外できます。 |
1.5.0 |
spark.sql.hive.metastorePartitionPruningFallbackOnException |
false | メタストアから MetaException が発生した場合、Hive メタストアからすべてのパーティションを取得し、Spark クライアント側でパーティションプルーニングを実行するようにフォールバックするかどうか。これが有効な場合、リストするパーティションが多いと Spark クエリのパフォーマンスが低下する可能性があることに注意してください。無効にすると、Spark はクエリを失敗させます。 |
3.3.0 |
spark.sql.hive.metastorePartitionPruningFastFallback |
false | この設定が有効な場合、述語が Hive でサポートされていない場合、またはメタストアから MetaException が発生して Spark がフォールバックする場合、Spark は代わりにパーティション名を取得してからクライアント側でフィルター式を評価することによってパーティションをプルーニングします。TimeZoneAwareExpression を持つ述語はサポートされていないことに注意してください。 |
3.3.0 |
spark.sql.hive.thriftServer.async |
true | true に設定されている場合、Hive Thrift Server は SQL クエリを非同期で実行します。 |
1.5.0 |
spark.sql.icu.caseMappings.enabled |
true | 有効な場合、UTF8_BINARY コレーションの下で文字列のケースマッピングを実装するために(JVM の代わりに)ICU ライブラリを使用します。 |
4.0.0 |
spark.sql.inMemoryColumnarStorage.batchSize |
10000 | 列キャッシュのバッチサイズを制御します。バッチサイズが大きいと、メモリ使用量と圧縮が改善される可能性がありますが、データをキャッシュする際に OOM を引き起こすリスクがあります。 |
1.1.1 |
spark.sql.inMemoryColumnarStorage.compressed |
true | true に設定すると、Spark SQL はデータ統計に基づいて各列の圧縮コーデックを自動的に選択します。 |
1.0.1 |
spark.sql.inMemoryColumnarStorage.enableVectorizedReader |
true | 列キャッシュ用のベクトル化リーダーを有効にします。 |
2.3.1 |
spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio |
1.2 | spark.sql.inMemoryColumnarStorage.hugeVectorThreshold <= 0 または必要なメモリが spark.sql.inMemoryColumnarStorage.hugeVectorThreshold より小さい場合、spark は必要なメモリの 2 倍のメモリを予約します。それ以外の場合、spark は必要なメモリにこの比率を乗じたメモリを予約し、次のバッチ行を読み取る前にこの列ベクトルメモリを解放します。 |
4.0.0 |
spark.sql.inMemoryColumnarStorage.hugeVectorThreshold |
-1b | 必要なメモリがこれより大きい場合、spark は次に必要なメモリの spark.sql.inMemoryColumnarStorage.hugeVectorReserveRatio 倍のメモリを予約し、次のバッチ行を読み取る前にこの列ベクトルメモリを解放します。-1 は最適化を無効にすることを意味します。 |
4.0.0 |
spark.sql.json.filterPushdown.enabled |
true | true の場合、JSON データソースへのフィルタプッシュダウンを有効にします。 |
3.1.0 |
spark.sql.json.useUnsafeRow |
false | true に設定すると、JSON パーサーで構造体結果を表すために UnsafeRow を使用します。これは、JSON オプション |
4.0.0 |
spark.sql.jsonGenerator.ignoreNullFields |
true | JSON データソースおよび to_json などの JSON 関数で JSON オブジェクトを生成する際に null フィールドを無視するかどうか。false の場合、JSON オブジェクトの null フィールドに対して null を生成します。 |
3.0.0 |
spark.sql.leafNodeDefaultParallelism |
(なし) | ファイルスキャンノード、ローカルデータスキャンノード、レンジノードなど、データを生成する Spark SQL リーフノードのデフォルトの並列度。この構成のデフォルト値は 'SparkContext#defaultParallelism' です。 |
3.2.0 |
spark.sql.mapKeyDedupPolicy |
EXCEPTION | 組み込み関数 (CreateMap、MapFromArrays、MapFromEntries、StringToMap、MapConcat、TransformKeys) でマップキーを重複排除するポリシー。EXCEPTION の場合、重複マップキーが検出されるとクエリが失敗します。LAST_WIN の場合、最後に挿入されたマップキーが優先されます。 |
3.0.0 |
spark.sql.maven.additionalRemoteRepositories |
https://maven-central.storage-download.googleapis.com/maven2/ | オプションの追加リモート Maven ミラーリポジトリのコンマ区切りの文字列構成。これは、デフォルトの Maven Central リポジトリに到達できない場合に、IsolatedClientLoader で Hive JAR をダウンロードするためにのみ使用されます。 |
3.0.0 |
spark.sql.maxMetadataStringLength |
100 | メタデータ文字列の出力最大文字数。例: |
3.1.0 |
spark.sql.maxPlanStringLength |
2147483632 | プラン文字列の出力最大文字数。プランがこれより長い場合、それ以降の出力は切り捨てられます。デフォルト設定では常に完全なプランが生成されます。プラン文字列がメモリを大量に消費しすぎる、またはドライバーまたは UI プロセスで OutOfMemory エラーを引き起こす場合は、8k のような低い値に設定してください。 |
3.0.0 |
spark.sql.maxSinglePartitionBytes |
128m | 単一パーティションで許可される最大バイト数。それ以外の場合、プランナーは並列度を向上させるためにシャッフルを導入します。 |
3.4.0 |
spark.sql.operatorPipeSyntaxEnabled |
true | true の場合、Apache Spark SQL で演算子パイプ構文を有効にします。これは、クエリが実行するステップのシーケンスを合成可能な方法で記述する manner で、SQL の句の区切りを示すために演算子パイプマーカー |> を使用します。 |
4.0.0 |
spark.sql.optimizer.avoidCollapseUDFWithExpensiveExpr |
true | UDF で高価な式を重複させるプロジェクションの折りたたみ (collapse) を回避するかどうか。 |
4.0.0 |
spark.sql.optimizer.collapseProjectAlwaysInline |
false | extra の重複を引き起こす場合でも、常に 2 つの隣接するプロジェクションを折りたたみ、式をインライン化するかどうか。 |
3.3.0 |
spark.sql.optimizer.dynamicPartitionPruning.enabled |
true | true の場合、パーティション列が結合キーとして使用されるときに、パーティション列の述語を生成します。 |
3.0.0 |
spark.sql.optimizer.enableCsvExpressionOptimization |
true | SQL オプティマイザーで CSV 式を最適化するかどうか。これには、from_csv から不要な列を削除することが含まれます。 |
3.2.0 |
spark.sql.optimizer.enableJsonExpressionOptimization |
true | SQL オプティマイザーで JSON 式を最適化するかどうか。これには、from_json から不要な列を削除すること、from_json + to_json の単純化、to_json + named_struct(from_json.col1, from_json.col2, ....) が含まれます。 |
3.1.0 |
spark.sql.optimizer.excludedRules |
(なし) | オプティマイザーで無効にするルールのリストを構成します。ルールはルール名で指定され、コンマで区切られます。一部のルールは正しさのために必要であるため、この構成のすべてのルールが最終的に除外されることが保証されるわけではありません。オプティマイザーは、実際に除外されたルールをログに記録します。 |
2.4.0 |
spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold |
10GB | ブルームフィルター適用側プランの集約スキャンサイズのバイトサイズしきい値。ブルームフィルターを挿入するには、ブルームフィルター適用側プランの集約スキャンバイトサイズがこの値を超える必要があります。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold |
10MB | ブルームフィルター作成側プランのサイズしきい値。ブルームフィルターの挿入を試みるには、推定サイズがこの値以下である必要があります。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.enabled |
true | true で、シャッフル結合の片側が選択的な述語を持つ場合、シャッフルデータを削減するために反対側でブルームフィルターを挿入しようとします。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.expectedNumItems |
1000000 | ランタイムブルームフィルターの期待されるアイテムのデフォルト数。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumBits |
67108864 | ランタイムブルームフィルターに使用される最大ビット数。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.maxNumItems |
4000000 | ランタイムブルームフィルターの期待されるアイテムの最大許容数。 |
3.3.0 |
spark.sql.optimizer.runtime.bloomFilter.numBits |
8388608 | ランタイムブルームフィルターに使用されるデフォルトビット数。 |
3.3.0 |
spark.sql.optimizer.runtime.rowLevelOperationGroupFilter.enabled |
true | グループベースの行レベル操作のランタイムグループフィルタリングを有効にします。グループ (ファイル、パーティションなど) のデータを置き換えるデータソースは、行レベル操作スキャンを計画する際に提供されたデータソースフィルターを使用して、グループ全体をプルーニングできます。ただし、すべての式をデータソースフィルターに変換できるわけではなく、一部の式は Spark によってのみ評価できる (例: サブクエリ) ため、このようなフィルタリングは制限されます。グループの書き換えはコストがかかるため、Spark はランタイムでクエリを実行して、行レベル操作の条件に一致するレコードを特定できます。一致するレコードの情報は行レベル操作スキャンに渡され、データソースは書き換えが必要ないグループを破棄できます。 |
3.4.0 |
spark.sql.optimizer.runtimeFilter.number.threshold |
10 | 1 つのクエリに挿入されたランタイムフィルター (DPP 以外) の総数。これは、多数のブルームフィルターによるドライバー OOM を防ぐためです。 |
3.3.0 |
spark.sql.orc.aggregatePushdown |
false | true の場合、集計は最適化のために ORC にプッシュダウンされます。集計式として MIN、MAX、COUNT をサポートします。MIN/MAX の場合、ブール値、整数、浮動小数点数、日付型をサポートします。COUNT の場合、すべてのデータ型をサポートします。いずれかの ORC ファイルフッターに統計情報がない場合、例外がスローされます。 |
3.3.0 |
spark.sql.orc.columnarReaderBatchSize |
4096 | ORC ベクトル化リーダーバッチに含まれる行数。オーバーヘッドを最小限に抑え、データの読み取りで OOM を回避するために、この数を慎重に選択する必要があります。 |
2.4.0 |
spark.sql.orc.columnarWriterBatchSize |
1024 | ORC ベクトル化ライターバッチに含まれる行数。オーバーヘッドを最小限に抑え、データの書き込みで OOM を回避するために、この数を慎重に選択する必要があります。 |
3.4.0 |
spark.sql.orc.compression.codec |
zstd | ORC ファイルの書き込み時に使用される圧縮コーデックを設定します。テーブル固有のオプション/プロパティで |
2.3.0 |
spark.sql.orc.enableNestedColumnVectorizedReader |
true | ネストされた列のベクトル化 ORC デコードを有効にします。 |
3.2.0 |
spark.sql.orc.enableVectorizedReader |
true | ベクトル化 ORC デコードを有効にします。 |
2.3.0 |
spark.sql.orc.filterPushdown |
true | true の場合、ORC ファイルへのフィルタプッシュダウンを有効にします。 |
1.4.0 |
spark.sql.orc.mergeSchema |
false | true の場合、ORC データソースはすべてのデータファイルから収集されたスキーマをマージします。それ以外の場合、スキーマはランダムなデータファイルから選択されます。 |
3.0.0 |
spark.sql.orderByOrdinal |
true | true の場合、序数 (ordinal) 番号は SELECT リストでの位置として扱われます。false の場合、ORDER BY または SORT BY 句の序数番号は無視されます。 |
2.0.0 |
spark.sql.parquet.aggregatePushdown |
false | true の場合、集計は最適化のために Parquet にプッシュダウンされます。集計式として MIN、MAX、COUNT をサポートします。MIN/MAX の場合、ブール値、整数、浮動小数点数、日付型をサポートします。COUNT の場合、すべてのデータ型をサポートします。いずれかの Parquet ファイルフッターに統計情報がない場合、例外がスローされます。 |
3.3.0 |
spark.sql.parquet.binaryAsString |
false | 一部の Parquet 生成システム (特に Impala や Spark SQL の古いバージョン) は、Parquet スキーマを書き出す際にバイナリデータと文字列を区別しません。このフラグは、これらのシステムとの互換性を提供するために、Spark SQL がバイナリデータを文字列として解釈するように指示します。 |
1.1.1 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | Parquet ベクトル化リーダーバッチに含まれる行数。オーバーヘッドを最小限に抑え、データの読み取りで OOM を回避するために、この数を慎重に選択する必要があります。 |
2.4.0 |
spark.sql.parquet.compression.codec |
snappy | Parquet ファイルの書き込み時に使用される圧縮コーデックを設定します。テーブル固有のオプション/プロパティで |
1.1.1 |
spark.sql.parquet.enableNestedColumnVectorizedReader |
true | ネストされた列 (struct、list、map など) のベクトル化 Parquet デコードを有効にします。spark.sql.parquet.enableVectorizedReader が有効になっている必要があります。 |
3.3.0 |
spark.sql.parquet.enableVectorizedReader |
true | ベクトル化 Parquet デコードを有効にします。 |
2.0.0 |
spark.sql.parquet.fieldId.read.enabled |
false | フィールド ID は Parquet スキーマ仕様のネイティブフィールドです。有効にすると、Parquet リーダーは、要求された Spark スキーマのフィールド ID (存在する場合) を使用して、列名ではなく Parquet フィールドを検索します。 |
3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing |
false | Parquet ファイルにフィールド ID がなく、Spark の読み取りスキーマがフィールド ID を使用して読み取っている場合、このフラグが有効になっていると null がサイレントに返され、それ以外の場合はエラーになります。 |
3.3.0 |
spark.sql.parquet.fieldId.write.enabled |
true | フィールド ID は Parquet スキーマ仕様のネイティブフィールドです。有効にすると、Parquet ライターは、Spark スキーマに (存在する場合) フィールド ID メタデータを Parquet スキーマに書き込みます。 |
3.3.0 |
spark.sql.parquet.filterPushdown |
true | true に設定すると、Parquet フィルタプッシュダウン最適化が有効になります。 |
1.2.0 |
spark.sql.parquet.inferTimestampNTZ.enabled |
true | 有効にすると、注釈 isAdjustedToUTC = false を持つ Parquet タイムスタンプ列は、スキーマ推論中に TIMESTAMP_NTZ 型として推論されます。それ以外の場合、すべての Parquet タイムスタンプ列は TIMESTAMP_LTZ 型として推論されます。Spark はファイル書き込み時に出力スキーマを Parquet のフッターメタデータに書き込み、ファイル読み取り時にそれを利用することに注意してください。したがって、この構成は、Spark によって書き込まれていない Parquet ファイルのスキーマ推論にのみ影響します。 |
3.4.0 |
spark.sql.parquet.int96AsTimestamp |
true | 一部の Parquet 生成システム (特に Impala) は、INT96 にタイムスタンプを格納します。ナノ秒フィールドの精度低下を回避する必要があるため、Spark もタイムスタンプを INT96 として格納します。このフラグは、これらのシステムとの互換性を提供するために、Spark SQL が INT96 データをタイムスタンプとして解釈するように指示します。 |
1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | Impala によって書き込まれたデータの場合、INT96 データをタイムスタンプに変換する際にタイムスタンプ調整を適用するかどうかを制御します。これは、Impala が Hive & Spark とは異なるタイムゾーンオフセットで INT96 データを格納するため、必要です。 |
2.3.0 |
spark.sql.parquet.mergeSchema |
false | true の場合、Parquet データソースはすべてのデータファイルから収集されたスキーマをマージします。それ以外の場合、スキーマはサマリーファイルから、またはサマリーファイルがない場合はランダムなデータファイルから選択されます。 |
1.5.0 |
spark.sql.parquet.outputTimestampType |
INT96 | Spark が Parquet ファイルにデータを書き込む際に使用する Parquet タイムスタンプ型を設定します。INT96 は Parquet で非標準ですが一般的に使用されるタイムスタンプ型です。TIMESTAMP_MICROS は Parquet で標準のタイムスタンプ型であり、Unix エポックからのマイクロ秒数を格納します。TIMESTAMP_MILLIS も標準ですが、ミリ秒精度であり、Spark はタイムスタンプ値のマイクロ秒部分を切り捨てる必要があります。 |
2.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false | true の場合、プッシュダウンされたフィルターを使用した Parquet のネイティブレコードレベルフィルタリングを有効にします。この構成は、'spark.sql.parquet.filterPushdown' が有効で、ベクトル化リーダーが使用されていない場合にのみ効果があります。'spark.sql.parquet.enableVectorizedReader' を false に設定することで、ベクトル化リーダーが使用されていないことを確認できます。 |
2.3.0 |
spark.sql.parquet.respectSummaryFiles |
false | true の場合、Parquet のすべてのパートファイルがサマリーファイルと一貫していると仮定し、スキーマのマージ時にそれらを無視します。それ以外の場合 (false、デフォルト)、すべてのパートファイルをマージします。これは上級者向けオプションであり、正確な意味を理解する前に有効にすべきではありません。 |
1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | true の場合、データは Spark 1.4 以前の方法で書き込まれます。たとえば、Decimal 値は Apache Parquet の固定長バイト配列形式で書き込まれます。これは Apache Hive や Apache Impala などの他のシステムが使用する形式です。false の場合、Parquet の新しい形式が使用されます。たとえば、Decimal は整数ベースの形式で書き込まれます。Parquet 出力がこの新しい形式をサポートしないシステムでの使用を意図している場合は、true に設定してください。 |
1.6.0 |
spark.sql.parser.quotedRegexColumnNames |
false | true の場合、SELECT ステートメントの引用符で囲まれた識別子 (バックティックを使用) は正規表現として解釈されます。 |
2.3.0 |
spark.sql.pivotMaxValues |
10000 | ピボット列の値が指定されていないピボットを行う場合、エラーなしで収集される (一意な) 値の最大数。 |
1.6.0 |
spark.sql.planner.pythonExecution.memory |
(なし) | Spark ドライバーで Python コードを実行するためのメモリ割り当てを MiB 単位で指定します。設定すると、Python 実行のメモリはその指定された量に制限されます。設定されていない場合、Spark は Python のメモリ使用量を制限せず、アプリケーションが他の非 JVM プロセスと共有されるオーバーヘッドメモリ領域を超えないようにするのはアプリケーション次第です。注意: Windows はリソース制限をサポートしておらず、MacOS では実際のリソースは制限されません。 |
4.0.0 |
spark.sql.preserveCharVarcharTypeInfo |
false | true の場合、Spark は CHAR/VARCHAR 型を STRING 型に置き換えません。これは Spark 3.0 以前のバージョンでのデフォルトの動作です。これにより、CHAR/VARCHAR 型の長さチェックが強制され、CHAR 型も適切にパディングされます。 |
4.0.0 |
spark.sql.pyspark.inferNestedDictAsStruct.enabled |
false | PySpark の SparkSession.createDataFrame は、デフォルトでネストされた辞書をマップとして推論します。true に設定すると、ネストされた辞書を構造体として推論します。 |
3.3.0 |
spark.sql.pyspark.jvmStacktrace.enabled |
false | true の場合、Python スタックトレースと並んで、ユーザー側の PySpark 例外に JVM スタックトレースが表示されます。デフォルトでは、JVM スタックトレースを非表示にし、Python フレンドリーな例外のみを表示するために無効になっています。これはログレベル設定とは独立していることに注意してください。 |
3.0.0 |
spark.sql.pyspark.plotting.max_rows |
1000 | プロットの視覚的制限。トップ N ベースのプロット (円グラフ、棒グラフ、横棒グラフ) に 1000 が設定されている場合、最初の 1000 データポイントがプロットに使用されます。サンプリングベースのプロット (散布図、面積図、線図) では、1000 データポイントがランダムにサンプリングされます。 |
4.0.0 |
spark.sql.pyspark.udf.profiler |
(なし) | "perf" または "memory" のタイプを選択して有効または無効にすることで Python/Pandas UDF プロファイラーを構成します。または、構成を解除するとプロファイラーが無効になります。デフォルトでは無効です。 |
4.0.0 |
spark.sql.readSideCharPadding |
true | true の場合、Spark は CHAR 型列/フィールドを読み取る際に、書き込み側パディングに加えて文字列パディングを適用します。この構成は、外部テーブルのようなケースで CHAR 型のセマンティクスをより適切に強制するために、デフォルトで true になっています。 |
3.4.0 |
spark.sql.redaction.options.regex |
(?i)url | Spark SQL コマンドのオプションマップのどのキーが機密情報を含むかを決定する正規表現。この正規表現に一致する名前のオプションの値は、explain 出力で赤外線処理されます。この赤外線処理は、spark.redaction.regex によって定義されるグローバル赤外線処理構成の上に適用されます。 |
2.2.2 |
spark.sql.redaction.string.regex |
(spark.redaction.string.regex の値) |
Spark が生成する文字列のどの部分が機密情報を含むかを決定する正規表現。この正規表現が文字列の部分に一致すると、その文字列の部分はダミー値に置き換えられます。これは現在、SQL explain コマンドの出力を赤外線処理するために使用されています。この構成が設定されていない場合、 |
2.3.0 |
spark.sql.repl.eagerEval.enabled |
false | 早期評価を有効にするかどうか。true の場合、Dataset の上位 K 行は、REPL が早期評価をサポートしている場合にのみ表示されます。現在、早期評価は PySpark と SparkR でサポートされています。PySpark では、Jupyter のようなノートブックの場合、HTML テーブル (repr_html によって生成される) が返されます。プレーン Python REPL の場合、返される出力は dataframe.show() のような形式になります。SparkR では、返される出力は R data.frame のように表示されます。 |
2.4.0 |
spark.sql.repl.eagerEval.maxNumRows |
20 | 早期評価によって返される行の最大数。これは spark.sql.repl.eagerEval.enabled が true に設定されている場合にのみ有効です。この構成の有効範囲は 0 から (Int.MaxValue - 1) までなので、負の値や (Int.MaxValue - 1) より大きい無効な構成は 0 および (Int.MaxValue - 1) に正規化されます。 |
2.4.0 |
spark.sql.repl.eagerEval.truncate |
20 | 早期評価によって返される各セルの最大文字数。これは spark.sql.repl.eagerEval.enabled が true に設定されている場合にのみ有効です。 |
2.4.0 |
spark.sql.scripting.enabled |
false | SQL スクリプティング機能は開発中であり、この機能フラグの下で使用する必要があります。SQL スクリプティングにより、ユーザーは制御フローやエラーハンドリングを含む手続き型 SQL を記述できます。 |
4.0.0 |
spark.sql.session.localRelationCacheThreshold |
67108864 | シリアライズ後にドライバー側でキャッシュされるローカルリレーションのサイズ (バイト単位) のしきい値。 |
3.5.0 |
spark.sql.session.timeZone |
(ローカルタイムゾーンの値) | セッションローカルタイムゾーンの ID。地域ベースのゾーン ID またはゾーンオフセットの形式です。地域 ID は 'area/city' の形式である必要があります (例: 'America/Los_Angeles')。ゾーンオフセットは '(+|-)HH'、'(+|-)HH:mm'、または '(+|-)HH:mm:ss' の形式である必要があります (例: '-08'、'+01:00'、または '-13:33:33')。また、'UTC' と 'Z' は '+00:00' のエイリアスとしてサポートされています。他の短い名前は曖昧である可能性があるため、使用は推奨されません。 |
2.2.0 |
spark.sql.shuffle.partitions |
200 | 結合や集計のためにデータをシャッフルする際に使用されるデフォルトのパーティション数。注意: 構造化ストリーミングの場合、この構成は同じチェックポイント場所からのクエリ再開間で変更できません。 |
1.1.0 |
spark.sql.shuffleDependency.fileCleanup.enabled |
false | 有効にすると、Spark Connect SQL 実行の終了時にシャッフルファイルがクリーンアップされます。 |
4.0.0 |
spark.sql.shuffleDependency.skipMigration.enabled |
false | 有効にすると、Spark Connect SQL 実行のシャッフル依存関係は実行の終了時にマークされ、廃止中に移行されません。 |
4.0.0 |
spark.sql.shuffledHashJoinFactor |
3 | 小さい側のデータサイズにこの係数を乗じたものが大きい側より小さい場合、シャッフルハッシュ結合が選択される可能性があります。 |
3.3.0 |
spark.sql.sources.bucketing.autoBucketedScan.enabled |
true | true の場合、クエリプランに基づいて入力テーブルでバケットスキャンを実行するかどうかを自動的に決定します。1. クエリにバケットを利用する演算子がない (例: join、group-by など) 場合、または 2. これらの演算子とテーブルスキャンの間に exchange 演算子がある場合、バケットスキャンは使用されません。'spark.sql.sources.bucketing.enabled' が false に設定されている場合、この構成は効果がないことに注意してください。 |
3.1.0 |
spark.sql.sources.bucketing.enabled |
true | false の場合、バケット化されたテーブルを通常のテーブルとして扱います。 |
2.0.0 |
spark.sql.sources.bucketing.maxBuckets |
100000 | 許可される最大バケット数。 |
2.4.0 |
spark.sql.sources.default |
parquet | 入力/出力で使用されるデフォルトのデータソース。 |
1.3.0 |
spark.sql.sources.parallelPartitionDiscovery.threshold |
32 | ドライバー側でファイルをリストするための最大パス数。パーティション検出中に検出されたパスの数がこの値を超えた場合、別の Spark 分散ジョブでファイルをリストしようとします。この構成は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 |
1.5.0 |
spark.sql.sources.partitionColumnTypeInference.enabled |
true | true の場合、パーティション化された列のデータ型を自動的に推論します。 |
1.5.0 |
spark.sql.sources.partitionOverwriteMode |
STATIC | パーティション化されたデータソーステーブルに INSERT OVERWRITE する場合、現在 2 つのモードをサポートしています: static と dynamic。静的モードでは、Spark は INSERT ステートメントのパーティション指定 (例: PARTITION(a=1,b)) に一致するすべてのパーティションを、上書き前に削除します。動的モードでは、Spark は事前にパーティションを削除せず、ランタイムで書き込まれたパーティションのみを上書きします。デフォルトでは、Spark 2.3 より前の動作を維持するために静的モードを使用します。Hive serde テーブルには影響しないことに注意してください。これらは常に動的モードで上書きされます。これは、データソースの出力オプションとして key partitionOverwriteMode (この設定より優先される) を使用して設定することもできます。例: dataframe.write.option("partitionOverwriteMode", "dynamic").save(path)。 |
2.3.0 |
spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled |
false | パーティション変換が互換性はあるが同一ではない場合に、ストレージパーティション結合を許可するかどうか。この構成には、spark.sql.sources.v2.bucketing.enabled と spark.sql.sources.v2.bucketing.pushPartValues.enabled の両方が有効であり、spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled が無効である必要があります。 |
4.0.0 |
spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled |
false | 結合キーがソーステーブルのパーティションキーのサブセットである場合に、ストレージパーティション結合を許可するかどうか。計画時に、Spark は結合キーに含まれるキーのみでパーティションをグループ化します。これは現在、spark.sql.requireAllClusterKeysForDistribution が false の場合にのみ有効です。 |
4.0.0 |
spark.sql.sources.v2.bucketing.enabled |
false | spark.sql.sources.bucketing.enabled と同様に、この構成は V2 データソースのバケット化を有効にするために使用されます。オンにすると、Spark は SupportsReportPartitioning を介して V2 データソースから報告された特定の分布を認識し、必要に応じてシャッフルを回避しようとします。 |
3.3.0 |
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled |
false | ストレージパーティション化された結合中に、両方の結合側が KeyGroupedPartitioning の場合、入力パーティションが部分的にクラスタリングされることを許可するかどうか。計画時に、Spark はテーブル統計に基づいてデータ量が少ない側を選択し、それらをグループ化および複製して反対側と一致させます。これはスキュー結合の最適化であり、特定のパーティションに大量のデータが割り当てられている場合のデータスキューを減らすのに役立ちます。この構成には、spark.sql.sources.v2.bucketing.enabled と spark.sql.sources.v2.bucketing.pushPartValues.enabled の両方が有効である必要があります。 |
3.4.0 |
spark.sql.sources.v2.bucketing.partition.filter.enabled |
false | ストレージパーティション結合を実行する際にパーティションをフィルタリングするかどうか。有効にすると、反対側で一致しないパーティションは、結合タイプによって許可される場合、スキャンから除外できます。この構成には、spark.sql.sources.v2.bucketing.enabled と spark.sql.sources.v2.bucketing.pushPartValues.enabled の両方が有効である必要があります。 |
4.0.0 |
spark.sql.sources.v2.bucketing.pushPartValues.enabled |
true | spark.sql.sources.v2.bucketing.enabled が有効な場合に、共通のパーティション値をプッシュダウンするかどうか。オンにすると、結合の両側が KeyGroupedPartitioning であり、互換性のあるパーティションキーを共有している場合、それらが正確に同じパーティション値を持っていなくても、Spark はパーティション値のスーパーセットを計算し、その情報をスキャンノードにプッシュダウンします。これにより、不要なシャッフルが削除されます。 |
3.4.0 |
spark.sql.sources.v2.bucketing.shuffle.enabled |
false | ストレージパーティション化された結合中に、片側のみをシャッフルすることを許可するかどうか。片側のみが KeyGroupedPartitioning の場合、条件が満たされれば、spark は反対側のみをシャッフルします。この最適化により、シャッフルが必要なデータ量が削減されます。この構成には spark.sql.sources.v2.bucketing.enabled が有効である必要があります。 |
4.0.0 |
spark.sql.sources.v2.bucketing.sorting.enabled |
false | オンにすると、Spark は SupportsReportPartitioning を介して V2 データソースから報告された特定の分布を認識し、それらの列でソートする場合に可能な限りシャッフルを回避しようとします。この構成には spark.sql.sources.v2.bucketing.enabled が有効である必要があります。 |
4.0.0 |
spark.sql.stackTracesInDataFrameContext |
1 | キャプチャされた DataFrame クエリコンテキスト内の非 Spark スタックトレースの数。 |
4.0.0 |
spark.sql.statistics.fallBackToHdfs |
false | true の場合、テーブルメタデータからテーブル統計が利用できない場合は HDFS にフォールバックします。これは、テーブルがブロードキャスト結合に使用できるほど小さいかどうかを判断するのに役立ちます。このフラグは、パーティション化されていない Hive テーブルに対してのみ有効です。パーティション化されていないデータソーステーブルの場合、テーブル統計が利用できない場合は自動的に再計算されます。パーティション化されたデータソースおよびパーティション化された Hive テーブルの場合、テーブル統計が利用できない場合は 'spark.sql.defaultSizeInBytes' になります。 |
2.0.0 |
spark.sql.statistics.histogram.enabled |
false | 有効にすると、列統計の計算時にヒストグラムを生成します。ヒストグラムは、より良い推定精度を提供できます。現在、Spark は等高ヒストグラムのみをサポートしています。ヒストグラムの収集には追加のコストがかかることに注意してください。たとえば、列統計の収集は通常 1 回のテーブルスキャンで済みますが、等高ヒストグラムの生成は追加のテーブルスキャンを引き起こします。 |
2.3.0 |
spark.sql.statistics.size.autoUpdate.enabled |
false | テーブルのデータが変更されると、テーブルサイズの自動更新を有効にします。テーブルの総ファイル数が非常に多い場合、これはコストがかかり、データ変更コマンドを遅くする可能性があることに注意してください。 |
2.3.0 |
spark.sql.statistics.updatePartitionStatsInAnalyzeTable.enabled |
false | この構成が有効な場合、Spark は analyze table コマンド (つまり、ANALYZE TABLE .. COMPUTE STATISTICS [NOSCAN]) でもパーティション統計を更新します。コマンドもより高価になることに注意してください。この構成が無効な場合、Spark はテーブルレベルの統計のみを更新します。 |
4.0.0 |
spark.sql.storeAssignmentPolicy |
ANSI | 異なるデータ型の列に値を挿入する際、Spark は型変換を実行します。現在、型変換ルールには 3 つのポリシーをサポートしています: ANSI、legacy、strict。ANSI ポリシーでは、Spark は ANSI SQL に従って型変換を実行します。実際には、その動作は PostgreSQL とほぼ同じです。 |
3.0.0 |
spark.sql.streaming.checkpointLocation |
(なし) | ストリーミングクエリのチェックポイントデータを保存するためのデフォルトの場所。 |
2.0.0 |
spark.sql.streaming.continuous.epochBacklogQueueSize |
10000 | 遅延エポックを待つためにキューに保存されるエントリの最大数。キューのサイズがこのパラメータを超えた場合、ストリームはエラーで停止します。 |
3.0.0 |
spark.sql.streaming.disabledV2Writers |
StreamWriteSupport が無効になっているデータソース登録クラス名の完全修飾名のコンマ区切りリスト。これらのソースへの書き込みは、V1 シンクにフォールバックします。 |
2.3.1 | |
spark.sql.streaming.fileSource.cleaner.numThreads |
1 | ファイルソースの完了ファイルクリーナーで使用されるスレッド数。 |
3.0.0 |
spark.sql.streaming.forceDeleteTempCheckpointLocation |
false | true の場合、一時チェックポイント場所の強制削除を有効にします。 |
3.0.0 |
spark.sql.streaming.metricsEnabled |
false | アクティブなストリーミングクエリに対して Dropwizard/Codahale メトリクスが報告されるかどうか。 |
2.0.2 |
spark.sql.streaming.multipleWatermarkPolicy |
min | ストリーミングクエリに複数のウォーターマーク演算子がある場合に、グローバルウォーターマーク値を計算するためのポリシー。デフォルト値は 'min' で、複数の演算子にわたる最小ウォーターマークを選択します。別の値は 'max' で、複数の演算子にわたる最大値を選択します。注意: この構成は、同じチェックポイント場所からのクエリ再開間で変更できません。 |
2.4.0 |
spark.sql.streaming.noDataMicroBatches.enabled |
true | ステートフルストリーミングクエリの積極的な状態管理のために、ストリーミングマイクロバッチエンジンがデータのないバッチを実行するかどうか。 |
2.4.1 |
spark.sql.streaming.numRecentProgressUpdates |
100 | ストリーミングクエリのために保持される進捗更新の数。 |
2.1.1 |
spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition |
false | true の場合、ストリーミングセッションウィンドウは、シャッフルする前にローカルパーティションでセッションをソートおよびマージします。これは、シャッフルされる行を減らすためですが、バッチで多くの行が同じセッションに割り当てられている場合にのみ有益です。 |
3.2.0 |
spark.sql.streaming.stateStore.encodingFormat |
unsaferow | ステートフル演算子がステートストアに情報を格納するために使用するエンコーディング形式。 |
4.0.0 |
spark.sql.streaming.stateStore.stateSchemaCheck |
true | true の場合、Spark は既存の状態に対するスキーマに対して状態スキーマを検証し、互換性がない場合はクエリを失敗させます。 |
3.1.0 |
spark.sql.streaming.stopActiveRunOnRestart |
true | 同じストリーミングクエリの複数の実行を同時に実行することはサポートされていません。同じ SparkSession または異なる SparkSession で同じストリーミングクエリの同時アクティブ実行が見つかり、このフラグが true の場合、新しいストリーミングクエリ実行を開始するために古いストリーミングクエリ実行を停止します。 |
3.0.0 |
spark.sql.streaming.stopTimeout |
0 | ストリーミングクエリの stop() メソッドを呼び出すときに、ストリーミング実行スレッドが停止するのを待つ時間 (ミリ秒)。0 または負の値は無期限に待機します。 |
3.0.0 |
spark.sql.streaming.transformWithState.stateSchemaVersion |
3 | transformWithState 演算子によって使用される状態スキーマのバージョン。 |
4.0.0 |
spark.sql.thriftServer.interruptOnCancel |
(spark.sql.execution.interruptOnCancel の値) |
true の場合、1 つのクエリをキャンセルすると、すべての実行中のタスクが中断されます。false の場合、すべての実行中のタスクは完了するまで続行されます。 |
3.2.0 |
spark.sql.thriftServer.queryTimeout |
0ms | Thrift Server でクエリ期間のタイムアウトを秒単位で設定します。タイムアウトが正の値に設定されている場合、タイムアウトを超えると実行中のクエリは自動的にキャンセルされます。それ以外の場合、クエリは完了まで実行を続行します。 |
3.1.0 |
spark.sql.thriftserver.scheduler.pool |
(なし) | JDBC クライアントセッションの Fair Scheduler プールを設定します。 |
1.1.1 |
spark.sql.thriftserver.ui.retainedSessions |
200 | JDBC/ODBC Web UI の履歴に保持される SQL クライアントセッション数。 |
1.4.0 |
spark.sql.thriftserver.ui.retainedStatements |
200 | JDBC/ODBC Web UI の履歴に保持される SQL ステートメント数。 |
1.4.0 |
spark.sql.timeTravelTimestampKey |
timestampAsOf | テーブルを読み取る際に時間移動タイムスタンプを指定するオプション名。 |
4.0.0 |
spark.sql.timeTravelVersionKey |
versionAsOf | テーブルを読み取る際に時間移動テーブルバージョンを指定するオプション名。 |
4.0.0 |
spark.sql.timestampType |
TIMESTAMP_LTZ | SQL DDL、Cast 句、型リテラル、およびデータソースのスキーマ推論を含む、Spark SQL のデフォルトのタイムスタンプ型を構成します。構成を TIMESTAMP_NTZ に設定すると、デフォルト型として TIMESTAMP WITHOUT TIME ZONE を使用します。TIMESTAMP_LTZ に設定すると、TIMESTAMP WITH LOCAL TIME ZONE を使用します。3.4.0 リリースより前は、Spark は TIMESTAMP WITH LOCAL TIME ZONE 型のみをサポートしていました。 |
3.4.0 |
spark.sql.transposeMaxValues |
500 | インデックス列の値が指定されていないトランスポーズを行う場合、エラーなしでトランスポーズされる値の最大数。 |
4.0.0 |
spark.sql.tvf.allowMultipleTableArguments.enabled |
false | true の場合、テーブル値関数で複数のテーブル引数を許可し、それらのテーブルのすべての行のデカルト積を受け取ります。 |
3.5.0 |
spark.sql.ui.explainMode |
formatted | Spark SQL UI で使用されるクエリ説明モードを構成します。値は 'simple'、'extended'、'codegen'、'cost'、または 'formatted' にすることができます。デフォルト値は 'formatted' です。 |
3.1.0 |
spark.sql.variable.substitute |
true | これは、 |
2.0.0 |
静的 SQL 構成
静的 SQL 構成は、セッション間で不変の Spark SQL 構成です。これらは、config ファイルおよび --conf/-c で始まるコマンドラインオプション、または SparkSession の作成に使用される SparkConf を設定することによって、最終値で設定できます。外部ユーザーは、SparkSession.conf または set コマンド (例: SET spark.sql.extensions;) を介して静的 SQL 構成値をクエリできますが、それらを設定/解除することはできません。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.sql.cache.serializer |
org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer | SQL データをより効率的にキャッシュできる形式に変換するために使用される、org.apache.spark.sql.columnar.CachedBatchSerializer を実装するクラスの名前。基盤となる API は変更される可能性があるため、注意して使用してください。複数のクラスを指定することはできません。クラスには引数なしのコンストラクタが必要です。 |
3.1.0 |
spark.sql.catalog.spark_catalog.defaultDatabase |
default | セッションカタログのデフォルトデータベース。 |
3.4.0 |
spark.sql.event.truncate.length |
2147483647 | イベントに追加される前に、SQL の長さが切り捨てられるしきい値。デフォルトは切り捨てなしです。0 に設定すると、代わりに callsites がログに記録されます。 |
3.0.0 |
spark.sql.extensions |
(なし) | Spark Session 拡張機能を構成するために使用される Function1[SparkSessionExtensions, Unit] を実装するクラスのコンマ区切りリスト。クラスには引数なしのコンストラクタが必要です。複数の拡張機能が指定されている場合、指定された順序で適用されます。ルールとプランナ戦略の場合、指定された順序で適用されます。パーサーの場合、最後のパーサーが使用され、各パーサーは前のパーサーに委譲できます。関数名の競合の場合、最後に登録された関数名が使用されます。 |
2.2.0 |
spark.sql.extensions.test.loadFromCp |
true | SparkSessionExtensionsProvider メカニズムを使用してクラスパスから拡張機能をロードするかどうかを決定するフラグ。これはテスト専用フラグです。 |
|
spark.sql.hive.metastore.barrierPrefixes |
Spark SQL が通信している Hive の各バージョンに対して明示的にリロードする必要があるクラスプレフィックスのコンマ区切りリスト。たとえば、通常共有されるプレフィックス (例: |
1.4.0 | |
spark.sql.hive.metastore.jars |
builtin | HiveMetastoreClient をインスタンス化するために使用される JAR の場所。このプロパティは 4 つのオプションのいずれかになります: 1. "builtin" Spark アセンブリにバンドルされている Hive 2.3.10 を使用します ( |
1.4.0 |
spark.sql.hive.metastore.jars.path |
HiveMetastoreClient をインスタンス化するために使用される JAR のコンマ区切りパス。この構成は、 |
3.1.0 | |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | Spark SQL と特定の Hive バージョン間で共有されるクラスローダーを使用してロードする必要があるクラスプレフィックスのコンマ区切りリスト。共有される必要があるクラスの例としては、メタストアとの通信に必要な JDBC ドライバーがあります。共有される必要がある他のクラスは、既に共有されているクラスと相互作用するクラスです。たとえば、log4j で使用されるカスタムアペンダー。 |
1.4.0 |
spark.sql.hive.metastore.version |
2.3.10 | Hive メタストアのバージョン。利用可能なオプションは |
1.4.0 |
spark.sql.hive.thriftServer.singleSession |
false | true に設定すると、Hive Thrift Server はシングルセッションモードで実行されます。すべての JDBC/ODBC 接続は、一時ビュー、関数レジストリ、SQL 構成、および現在のデータベースを共有します。 |
1.6.0 |
spark.sql.hive.version |
2.3.10 | Spark ディストリビューションにバンドルされている、コンパイル済みの (つまり、組み込みの) Hive バージョン。これは読み取り専用の構成であり、組み込み Hive バージョンを報告するためにのみ使用されることに注意してください。Spark が呼び出すための異なるメタストアクライアントが必要な場合は、spark.sql.hive.metastore.version を参照してください。 |
1.1.1 |
spark.sql.metadataCacheTTLSeconds |
-1000ms | メタデータキャッシュ (パーティションファイルメタデータキャッシュおよびセッションカタログキャッシュ) の TTL (Time-to-Live) 値。この構成は、この値が正の値 (> 0) を持つ場合にのみ効果があります。また、'spark.sql.catalogImplementation' が |
3.1.0 |
spark.sql.queryExecutionListeners |
(なし) | 新しく作成されたセッションに自動的に追加される QueryExecutionListener を実装するクラス名のリスト。クラスは、引数なしのコンストラクタ、または SparkConf 引数を受け取るコンストラクタを持つ必要があります。 |
2.3.0 |
spark.sql.sources.disabledJdbcConnProviderList |
無効にされている JDBC 接続プロバイダーのリストを構成します。リストには、コンマで区切られた JDBC 接続プロバイダーの名前が含まれます。 |
3.1.0 | |
spark.sql.streaming.streamingQueryListeners |
(なし) | 新しく作成されたセッションに自動的に追加される StreamingQueryListener を実装するクラス名のリスト。クラスは、引数なしのコンストラクタ、または SparkConf 引数を受け取るコンストラクタを持つ必要があります。 |
2.4.0 |
spark.sql.streaming.ui.enabled |
true | Spark Web UI が有効な場合に、Spark アプリケーションで構造化ストリーミング Web UI を実行するかどうか。 |
3.0.0 |
spark.sql.streaming.ui.retainedProgressUpdates |
100 | 構造化ストリーミング UI のストリーミングクエリのために保持される進捗更新の数。 |
3.0.0 |
spark.sql.streaming.ui.retainedQueries |
100 | 構造化ストリーミング UI のために保持される非アクティブなクエリの数。 |
3.0.0 |
spark.sql.ui.retainedExecutions |
1000 | Spark UI で保持される実行の数。 |
1.5.0 |
spark.sql.warehouse.dir |
($PWD/spark-warehouse の値) |
管理データベースおよびテーブルのデフォルトの場所。 |
2.0.0 |
Spark Streaming
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.streaming.backpressure.enabled |
false | Spark Streaming の内部バックプレッシャーメカニズムを有効または無効にします (1.5 以降)。これにより、Spark Streaming は現在のバッチスケジューリング遅延と処理時間に基づいて受信レートを制御できるため、システムは処理できる速度でしか受信しません。内部的には、これはレシーバーの最大受信レートを動的に設定します。このレートは、設定されている場合 (下記参照) spark.streaming.receiver.maxRate および spark.streaming.kafka.maxRatePerPartition の値の上限があります。 |
1.5.0 |
spark.streaming.backpressure.initialRate |
設定されていません | これは、バックプレッシャーメカニズムが有効な場合に、各レシーバーが最初のバッチのデータを受信する初期最大受信レートです。 | 2.0.0 |
spark.streaming.blockInterval |
200ms | Spark Streaming レシーバーによって受信されたデータが、Spark に格納される前にデータブロックにチャンク化される間隔。最小推奨値は 50 ms です。Spark Streaming プログラミングガイドの パフォーマンスチューニング セクションで詳細を参照してください。 | 0.8.0 |
spark.streaming.receiver.maxRate |
設定されていません | 各レシーバーがデータを受信する最大レート (1 秒あたりのレコード数)。実質的に、各ストリームは 1 秒あたりこのレコード数以下を消費します。この構成を 0 または負の値に設定すると、レートに制限がなくなります。Spark Streaming プログラミングガイドの デプロイガイド で詳細を参照してください。 | 1.0.2 |
spark.streaming.receiver.writeAheadLog.enable |
false | レシーバーの書き込み先行ログを有効にします。レシーバー経由で受信されたすべての入力データは、ドライバー障害後に回復できるようにする書き込み先行ログに保存されます。Spark Streaming プログラミングガイドの デプロイガイド で詳細を参照してください。 | 1.2.1 |
spark.streaming.unpersist |
true | Spark Streaming によって生成および永続化された RDD が Spark のメモリから自動的にアンパース化されるように強制します。Spark Streaming によって受信された生の入力データも自動的にクリアされます。これを false に設定すると、生のデータと永続化された RDD がストリーミングアプリケーションの外部からアクセスできるようになります。これらは自動的にクリアされないためです。ただし、メモリ使用量が増加するというコストがかかります。 | 0.9.0 |
spark.streaming.stopGracefullyOnShutdown |
false | true の場合、Spark は JVM シャットダウン時に StreamingContext を即時ではなく正常にシャットダウンします。 |
1.4.0 |
spark.streaming.kafka.maxRatePerPartition |
設定されていません | 新しい Kafka ダイレクトストリーム API を使用する場合、各 Kafka パーティションから読み取られるデータの最大レート (1 秒あたりのレコード数)。詳細については、Kafka Integration ガイドの Kafka Integration guide を参照してください。 | 1.3.0 |
spark.streaming.kafka.minRatePerPartition |
1 | 新しい Kafka ダイレクトストリーム API を使用する場合、各 Kafka パーティションから読み取られるデータの最小レート (1 秒あたりのレコード数)。 | 2.4.0 |
spark.streaming.ui.retainedBatches |
1000 | Spark Streaming UI およびステータス API がガベージコレクションする前に、いくつのバッチを記憶するか。 | 1.0.0 |
spark.streaming.driver.writeAheadLog.closeFileAfterWrite |
false | ドライバーで書き込み先行ログレコードを書き込んだ後にファイルを閉じるかどうか。ドライバーでメタデータ WAL に S3 (またはフラッシュをサポートしないファイルシステム) を使用したい場合は、これを 'true' に設定してください。 | 1.6.0 |
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite |
false | レシーバーで書き込み先行ログレコードを書き込んだ後にファイルを閉じるかどうか。レシーバーでデータ WAL に S3 (またはフラッシュをサポートしないファイルシステム) を使用したい場合は、これを 'true' に設定してください。 | 1.6.0 |
SparkR (非推奨)
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.r.numRBackendThreads |
2 | SparkR パッケージからの RPC 呼び出しを処理するために RBackend によって使用されるスレッド数。 | 1.4.0 |
spark.r.command |
Rscript | ドライバーとワーカーの両方でクラスターモードで R スクリプトを実行するための実行可能ファイル。 | 1.5.3 |
spark.r.driver.command |
spark.r.command | クライアントモードでドライバーの R スクリプトを実行するための実行可能ファイル。クラスターモードでは無視されます。 | 1.5.3 |
spark.r.shell.command |
R | クライアントモードでドライバーの sparkR シェルを実行するための実行可能ファイル。クラスターモードでは無視されます。これは環境変数 SPARKR_DRIVER_R と同じですが、それを優先します。spark.r.shell.command は sparkR シェルに使用され、spark.r.driver.command は R スクリプトの実行に使用されます。 |
2.1.0 |
spark.r.backendConnectionTimeout |
6000 | R プロセスが RBackend への接続を試みる際の接続タイムアウト (秒単位)。 | 2.1.0 |
spark.r.heartBeatInterval |
100 | 接続タイムアウトを防ぐために、SparkR バックエンドから R プロセスに送信されるハートビートの間隔。 | 2.1.0 |
GraphX
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.graphx.pregel.checkpointInterval |
-1 | Pregel におけるグラフとメッセージのチェックポイント間隔。多数の反復による長い lineage チェーンによる stackOverflowError を回避するために使用されます。チェックポイントはデフォルトで無効になっています。 | 2.2.0 |
クラスターマネージャー
Spark の各クラスターマネージャーには追加の構成オプションがあります。構成は各モードのページで確認できます。
YARN
Kubernetes
スタンドアロンモード
環境変数
特定の Spark 設定は環境変数を通じて構成できます。これらの環境変数は、Spark がインストールされているディレクトリの conf/spark-env.sh スクリプト (Windows の場合は conf/spark-env.cmd) から読み取られます。Standalone モードでは、このファイルはホスト名などのマシン固有の情報を提供できます。ローカル Spark アプリケーションまたはサブミッション スクリプトを実行する際にもソースされます。
conf/spark-env.sh は、Spark がインストールされたときにデフォルトで存在しないことに注意してください。ただし、conf/spark-env.sh.template をコピーして作成できます。コピーを実行可能にすることを忘れないでください。
次の変数は spark-env.sh で設定できます。
| 環境変数 | 意味 |
|---|---|
JAVA_HOME |
Java がインストールされている場所 (デフォルトの PATH にない場合)。 |
PYSPARK_PYTHON |
ドライバーとワーカーの両方で PySpark に使用する Python バイナリ実行可能ファイル (デフォルトは python3 が利用可能な場合は python3、それ以外の場合は python)。プロパティ spark.pyspark.python が設定されている場合は、そちらが優先されます。 |
PYSPARK_DRIVER_PYTHON |
ドライバーのみで PySpark に使用する Python バイナリ実行可能ファイル (デフォルトは PYSPARK_PYTHON)。プロパティ spark.pyspark.driver.python が設定されている場合は、そちらが優先されます。 |
SPARKR_DRIVER_R |
SparkR シェルに使用する R バイナリ実行可能ファイル (デフォルトは R)。プロパティ spark.r.shell.command が設定されている場合は、そちらが優先されます。 |
SPARK_LOCAL_IP |
バインドするマシンの IP アドレス。 |
SPARK_PUBLIC_DNS |
Spark プログラムが他のマシンにアドバタイズするホスト名。 |
上記に加えて、Spark スタンドアロンクラスター スクリプト の設定オプションもあり、各マシンで使用するコア数や最大メモリなどが含まれます。
spark-env.sh はシェルスクリプトであるため、これらのうちいくつかはプログラムで設定できます。たとえば、特定のネットワークインターフェイスの IP を検索して SPARK_LOCAL_IP を計算することもできます。
cluster モードで YARN 上で Spark を実行する場合、環境変数は conf/spark-defaults.conf ファイルの spark.yarn.appMasterEnv.[EnvironmentVariableName] プロパティを使用して設定する必要があります。spark-env.sh で設定された環境変数は、cluster モードの YARN Application Master プロセスには反映されません。詳細については、YARN 関連の Spark プロパティ を参照してください。
ロギングの構成
Spark はログ記録に log4j を使用します。conf ディレクトリに log4j2.properties ファイルを追加することで構成できます。開始するには、提供されているテンプレートのいずれかをコピーします: log4j2.properties.template (プレーンテキストログ用) または log4j2-json-layout.properties.template (構造化ログ用)。
プレーンテキストロギング
デフォルトのログ形式はプレーンテキストで、Log4j の Pattern Layout を使用します。
MDC (Mapped Diagnostic Context) 情報は、デフォルトではプレーンテキストログに含まれていません。含めるには、log4j2.properties ファイルの PatternLayout 構成を更新します。たとえば、タスク名を含めるには %X{task_name} を追加します。さらに、spark.sparkContext.setLocalProperty("key", "value") を使用して、MDC にカスタムデータを追加できます。
構造化ロギング
バージョン 4.0.0 以降、spark-submit は JSON Template Layout を使用して、オプションの構造化ログ記録をサポートしています。この形式は、JSON データソースを使用して Spark SQL でログを効率的にクエリできるようにし、検索性とデバッグを向上させるためにすべての MDC 情報を含みます。
構造化ログ記録を有効にし、MDC 情報を含めるには、構成 spark.log.structuredLogging.enabled を true (デフォルトは false) に設定します。追加のカスタマイズについては、log4j2-json-layout.properties.template を conf/log4j2.properties にコピーして必要に応じて調整してください。
Spark SQL による構造化ログのクエリ
JSON 形式の構造化ログをクエリするには、次のコードスニペットを使用します。
Python
from pyspark.logger import SPARK_LOG_SCHEMA
logDf = spark.read.schema(SPARK_LOG_SCHEMA).json("path/to/logs")
Scala
import org.apache.spark.util.LogUtils.SPARK_LOG_SCHEMA
val logDf = spark.read.schema(SPARK_LOG_SCHEMA).json("path/to/logs")
注意: インタラクティブシェル (pyspark shell または spark-shell) を使用している場合、SPARK_LOG_SCHEMA はシェルのコンテキストで既に利用可能であるため、コードの import ステートメントを省略できます。
構成ディレクトリのオーバーライド
デフォルトの "SPARK_HOME/conf" 以外の構成ディレクトリを指定するには、SPARK_CONF_DIR を設定します。Spark はこのディレクトリの構成ファイル (spark-defaults.conf、spark-env.sh、log4j2.properties など) を使用します。
Hadoop クラスター構成の継承
Spark を使用して HDFS から読み書きする予定がある場合、Spark のクラスパスに含めるべき 2 つの Hadoop 構成ファイルがあります。
hdfs-site.xml、これは HDFS クライアントのデフォルトの動作を提供します。core-site.xml、これはデフォルトのファイルシステム名を設定します。
これらの構成ファイルの場所は Hadoop のバージョンによって異なりますが、一般的な場所は /etc/hadoop/conf の中にあります。一部のツールはオンザフライで構成を作成しますが、それらのコピーをダウンロードするメカニズムを提供します。
これらのファイルを Spark で表示するには、$SPARK_HOME/conf/spark-env.sh の HADOOP_CONF_DIR を構成ファイルを含む場所に設定します。
カスタム Hadoop/Hive 構成
Spark アプリケーションが Hadoop、Hive、またはその両方とやり取りする場合、Spark のクラスパスに Hadoop/Hive 構成ファイルが多数存在する可能性があります。
複数の実行中のアプリケーションでは、異なる Hadoop/Hive クライアント側の構成が必要になる場合があります。各アプリケーションの Spark のクラスパスに hdfs-site.xml、core-site.xml、yarn-site.xml、hive-site.xml をコピーして変更できます。YARN 上で実行されている Spark クラスターでは、これらの構成ファイルはクラスター全体で設定されており、アプリケーションによって安全に変更することはできません。
より良い選択肢は、spark.hadoop.* の形式の Spark Hadoop プロパティと、spark.hive.* の形式の Spark Hive プロパティを使用することです。たとえば、構成 "spark.hadoop.abc.def=xyz" を追加することは、Hadoop プロパティ "abc.def=xyz" を追加することと同等であり、構成 "spark.hive.abc=xyz" を追加することは、Hive プロパティ "hive.abc=xyz" を追加することと同等です。これらは、$SPARK_HOME/conf/spark-defaults.conf で設定できる通常の Spark プロパティと同じと見なすことができます。
場合によっては、SparkConf で特定の構成をハードコーディングすることを避けたい場合があります。たとえば、Spark では空の conf を作成し、spark/spark hadoop/spark hive プロパティを設定するだけで済みます。
val conf = new SparkConf().set("spark.hadoop.abc.def", "xyz")
val sc = new SparkContext(conf)また、実行時に構成を変更または追加することもできます。
./bin/spark-submit \
--name "My app" \
--master "local[4]" \
--conf spark.eventLog.enabled=false \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
--conf spark.hadoop.abc.def=xyz \
--conf spark.hive.abc=xyz
myApp.jarカスタムリソーススケジューリングと構成の概要
GPU やその他のアクセラレータは、ディープラーニングや信号処理などの特殊なワークロードの高速化に広く使用されています。Spark は現在、いくつかの注意点とともに、汎用リソース (GPU など) の要求とスケジューリングをサポートしています。現在の実装では、リソースがスケジューラによって割り当て可能なアドレスを持っている必要があります。クラスターマネージャーがリソースをサポートし、適切に構成されている必要があります。
ドライバーのリソースを要求するための構成: spark.driver.resource.{resourceName}.amount、エグゼキュータのリソースを要求するための構成: spark.executor.resource.{resourceName}.amount、および各タスクの要件を指定するための構成: spark.task.resource.{resourceName}.amount。spark.driver.resource.{resourceName}.discoveryScript 構成は YARN、Kubernetes、および Spark Standalone のクライアント側ドライバーで必要です。spark.executor.resource.{resourceName}.discoveryScript 構成は YARN および Kubernetes で必要です。Kubernetes は spark.driver.resource.{resourceName}.vendor および/または spark.executor.resource.{resourceName}.vendor も必要とします。各詳細については、上記の構成説明を参照してください。
Spark は、指定された構成を使用して、まずクラスターマネージャーから対応するリソースを持つコンテナを要求します。コンテナを取得すると、Spark はそのコンテナ内のリソースと各リソースに関連付けられたアドレスを検出するエグゼキュータを起動します。エグゼキュータはドライバーに登録し、そのエグゼキュータが利用可能なリソースを報告します。次に Spark スケジューラは、ユーザーが指定したリソース要件に基づいて、各エグゼキュータにタスクをスケジュールし、特定のリソースアドレスを割り当てることができます。ユーザーは TaskContext.get().resources API を使用してタスクに割り当てられたリソースを確認できます。ドライバーでは、ユーザーは SparkContext resources コールで割り当てられたリソースを確認できます。その後、ユーザーは割り当てられたアドレスを使用して、目的の処理を実行するか、ML/AI フレームワークに渡すことになります。
要件と各詳細については、YARN、Kubernetes、Standalone モード のクラスターマネージャー固有のページを参照してください。ローカルモードでは現在利用できません。また、複数のワーカーを持つローカルクラスターモードはサポートされていないことにも注意してください (Standalone ドキュメントを参照)。
ステージレベルのスケジューリング概要
ステージレベルのスケジューリング機能により、ユーザーはステージレベルでタスクとエグゼキュータのリソース要件を指定できます。これにより、異なるステージが異なるリソースを持つエグゼキュータで実行できるようになります。これの主な例は、ETL ステージが CPU のみを持つエグゼキュータで実行され、次の ML ステージが GPU を必要とする場合です。ステージレベルのスケジューリングにより、ETL ステージが実行されている間アイドル状態になる GPU を持つエグゼキュータをアプリケーションの開始時に取得するのではなく、ML ステージが実行されるときに GPU を持つ異なるエグゼキュータを要求できるようになります。これは Scala、Java、Python の RDD API でのみ利用可能です。動的割り当てが有効な場合、YARN、Kubernetes、および Standalone で利用可能です。動的割り当てが無効な場合、ユーザーはステージレベルで異なるタスクリソース要件を指定できます。これは現在、YARN、Kubernetes、および Standalone クラスターでサポートされています。詳細については、YARN ページ、Kubernetes ページ、または Standalone ページを参照してください。
この機能を使用するには、RDD.withResources および ResourceProfileBuilder API を参照してください。動的割り当てが無効な場合、異なるタスクリソース要件を持つタスクは、DEFAULT_RESOURCE_PROFILE を持つエグゼキュータを共有します。動的割り当てが有効な場合、現在の実装では、作成された各 ResourceProfile に対して新しいエグゼキュータが取得され、現在は正確な一致が必要です。Spark は、エグゼキュータが作成された ResourceProfile とは異なる ResourceProfile を必要とするエグゼキュータにタスクを適合させようとしません。使用されていないエグゼキュータは、動的割り当てロジックでアイドルタイムアウトになります。この機能のデフォルト構成は、ステージごとに 1 つの ResourceProfile のみを許可することです。ユーザーが RDD に 1 つ以上の ResourceProfile を関連付けると、デフォルトで Spark は例外をスローします。この動作を制御するには、構成 spark.scheduler.resource.profileMergeConflicts を参照してください。spark.scheduler.resource.profileMergeConflicts が有効な場合に Spark が実装する現在のマージ戦略は、競合する ResourceProfiles 内の各リソースの単純な最大値です。Spark は、リソースの最大値を持つ新しい ResourceProfile を作成します。
プッシュベースシャッフルの概要
プッシュベースシャッフルは、Spark シャッフルの信頼性とパフォーマンスを向上させます。これは、マップタスクによって生成されたシャッフルブロックをリモート外部シャッフルサービスにプッシュして、シャッフルパーティションごとにマージするための最善の試みを行います。リデュースタスクは、マージされたシャッフルパーティションと元のシャッフルブロックの組み合わせを入力データとして取得し、外部シャッフルサービスによる小さなランダムディスク読み取りを大きなシーケンシャル読み取りに変換します。リデュースタスクのデータ局所性の可能性が向上することも、ネットワーク IO を最小限に抑えるのに役立ちます。プッシュベースシャッフルは、マージされた出力が利用可能な場合など、一部のシナリオで長時間実行されるジョブ/クエリのパフォーマンスを向上させます。
プッシュベースシャッフルは、シャッフル中の大量のディスク IO を処理する長時間実行ジョブ/クエリのパフォーマンスを向上させます。現在、処理量が少ないジョブ/クエリには適していません。これは将来のリリースでさらに改善される予定です。
現在、プッシュベースシャッフルは、外部シャッフルサービスを備えた Spark on YARN でのみサポートされています。
外部シャッフルサービス (サーバー) 側の構成オプション
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.shuffle.push.server.mergedShuffleFileManagerImpl |
org.apache.spark.network.shuffle.
|
プッシュベースシャッフルを管理する MergedShuffleFileManager の実装のクラス名。これは、プッシュベースシャッフルを無効または有効にするためのサーバー側の構成として機能します。デフォルトでは、プッシュベースシャッフルはサーバー側で無効になっています。サーバー側でプッシュベースシャッフルを有効にするには、この構成を |
3.2.0 |
spark.shuffle.push.server.minChunkSizeInMergedShuffleFile |
2m |
プッシュベースシャッフル中に、マージされたシャッフルファイルを複数のチャンクに分割する際のチャンクの最小サイズ。マージされたシャッフルファイルは、複数の小さなシャッフルブロックで構成されます。完全なマージされたシャッフルファイルを 1 回のディスク I/O で取得すると、クライアントと外部シャッフルサービスの両方のメモリ要件が増加します。代わりに、外部シャッフルサービスは これを高く設定すると、クライアントと外部シャッフルサービスの両方でメモリ要件が増加します。 これを低く設定すると、RPC リクエストの総数が不必要に外部シャッフルサービスに増加します。 |
3.2.0 |
spark.shuffle.push.server.mergedIndexCacheSize |
100m |
プッシュベースシャッフルでマージされたインデックスファイルを格納するために使用できるメモリ内のキャッシュの最大サイズ。このキャッシュは、spark.shuffle.service.index.cache.size で構成されるキャッシュに加えて使用されます。 |
3.2.0 |
クライアント側の構成オプション
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.shuffle.push.enabled |
false |
クライアント側でプッシュベースシャッフルを有効にするには true に設定し、サーバー側フラグ spark.shuffle.push.server.mergedShuffleFileManagerImpl と連携して動作します。 |
3.2.0 |
spark.shuffle.push.finalize.timeout |
10s |
マップタスクが特定のシャッフルマップステージの完了後に、リモート外部シャッフルサービスへのマージ完了要求を送信するまでのドライバーの待機時間 (秒)。これにより、外部シャッフルサービスはマージブロックのための追加時間を取得します。これを長く設定しすぎると、パフォーマンスが低下する可能性があります。 | 3.2.0 |
spark.shuffle.push.maxRetainedMergerLocations |
500 |
プッシュベースシャッフルのためにキャッシュされたマージャーロケーションの最大数。現在、マージャーロケーションは、プッシュされたブロックを処理し、それらをマージし、後でシャッフルフェッチのためにマージされたブロックを提供する責任のある外部シャッフルサービスのホストです。 | 3.2.0 |
spark.shuffle.push.mergersMinThresholdRatio |
0.05 |
リデューサーステージのパーティション数に基づいてステージに必要な最小シャッフルマージャーロケーション数を計算するために使用される比率。たとえば、100 パーティションを持ち、デフォルト値 0.05 を使用するリデュースステージでは、プッシュベースシャッフルを有効にするために少なくとも 5 つの一意のマージャーロケーションが必要です。 | 3.2.0 |
spark.shuffle.push.mergersMinStaticThreshold |
5 |
ステージのプッシュベースシャッフルを有効にするために利用可能なシャッフルプッシュマージャーロケーションの数の静的しきい値。この構成は spark.shuffle.push.mergersMinThresholdRatio と連携して機能します。ステージのプッシュベースシャッフルを有効にするには、spark.shuffle.push.mergersMinStaticThreshold と spark.shuffle.push.mergersMinThresholdRatio 比率の数(多い方)のマージャーが必要です。例: 子ステージに 1000 パーティションがあり、spark.shuffle.push.mergersMinStaticThreshold が 5 で、spark.shuffle.push.mergersMinThresholdRatio が 0.05 に設定されている場合、そのステージのプッシュベースシャッフルを有効にするには少なくとも 50 のマージャーが必要です。 |
3.2.0 |
spark.shuffle.push.numPushThreads |
(なし) | ブロックプッシャープール内のスレッド数を指定します。これらのスレッドは、接続を作成し、リモート外部シャッフルサービスにブロックをプッシュするのを支援します。デフォルトでは、スレッドプールのサイズは Spark エグゼキュータコア数に等しくなります。 | 3.2.0 |
spark.shuffle.push.maxBlockSizeToPush |
1m |
リモート外部シャッフルサービスにプッシュする個々のブロックの最大サイズ。このしきい値を超えるブロックは、リモートでマージするためにプッシュされません。これらのシャッフルブロックは、元の方法でフェッチされます。 これを高く設定すると、より多くのブロックがリモート外部シャッフルサービスにプッシュされますが、それらは既存のメカニズムで既に効率的にフェッチされているため、大きなブロックをリモート外部シャッフルサービスにプッシュする追加のオーバーヘッドが発生します。 これを低く設定すると、より少ないブロックがマージされ、マッパー外部シャッフルサービスから直接フェッチされるため、小さなランダム読み取りが増加し、全体的なディスク I/O パフォーマンスに影響します。 |
3.2.0 |
spark.shuffle.push.maxBlockBatchSize |
3m |
1 回のプッシュリクエストにグループ化されるシャッフルブロックのバッチの最大サイズ。デフォルトは 3m に設定されており、spark.storage.memoryMapThreshold のデフォルト値である 2m よりもわずかに高いため、各ブロックのバッチはメモリマップされる可能性が高く、オーバーヘッドが増加します。 |
3.2.0 |
spark.shuffle.push.merge.finalizeThreads |
8 | プッシュベースシャッフルが有効な場合、ドライバーがシャッフルマージを完了するために使用するスレッド数。大きなシャッフルが完了するのに数秒かかる可能性があるため、複数のスレッドがあることは、ドライバーが同時シャッフルマージ完了要求を処理するのに役立ちます。 | 3.3.0 |
spark.shuffle.push.minShuffleSizeToWait |
500m |
ドライバーは、合計シャッフルデータサイズがこのしきい値を超えている場合にのみ、マージ完了を待ちます。合計シャッフルサイズがそれより小さい場合、ドライバーはシャッフル出力をすぐに完了します。 | 3.3.0 |
spark.shuffle.push.minCompletedPushRatio |
1.0 |
プッシュベースシャッフル中にドライバーがシャッフルマージ完了を開始する前に、プッシュが完了する必要がある最小マップパーティションの割合。 | 3.3.0 |