パフォーマンスチューニング

Spark は、DataFrame または SQL ワークロードのパフォーマンスをチューニングするための多くのテクニックを提供しています。これらのテクニックは、広義には、データのキャッシュ、データセットのパーティショニング方法の変更、最適な結合戦略の選択、およびオプティマイザがより効率的な実行計画を構築するために使用できる追加情報を提供することを含みます。

データのキャッシュ

Spark SQL は、spark.catalog.cacheTable("tableName") または dataFrame.cache() を呼び出すことで、インメモリ列フォーマットを使用してテーブルをキャッシュできます。その後、Spark SQL は必要な列のみをスキャンし、メモリ使用量と GC 負荷を最小限に抑えるために圧縮を自動的にチューニングします。spark.catalog.uncacheTable("tableName") または dataFrame.unpersist() を呼び出すことで、メモリからテーブルを削除できます。

インメモリキャッシングの設定は、spark.conf.set を使用するか、SQL を使用して SET key=value コマンドを実行することで行うことができます。

プロパティ名デフォルト意味バージョン以降
spark.sql.inMemoryColumnarStorage.compressed true true に設定すると、Spark SQL はデータの統計情報に基づいて各列に自動的に圧縮コーデックを選択します。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 列キャッシュのバッチサイズを制御します。バッチサイズが大きいほど、メモリ使用率と圧縮率が向上する可能性がありますが、データをキャッシュする際に OOM のリスクがあります。 1.1.1

パーティションのチューニング

プロパティ名デフォルト意味バージョン以降
spark.sql.files.maxPartitionBytes 134217728 (128 MB) ファイルを読み込む際に 1 つのパーティションにパックされる最大バイト数。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) ファイルのオープンにかかる推定コスト。同じ時間でスキャンできるバイト数で測定されます。これは、複数のファイルを 1 つのパーティションにまとめる場合に使用されます。過小評価するよりも過大評価する方が良いです。そうすれば、小さなファイルを持つパーティションは、大きなファイルを持つパーティション(最初にスケジュールされる)よりも高速になります。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 2.0.0
spark.sql.files.minPartitionNum デフォルトの並列度 分割されたファイルパーティションの推奨される(保証されない)最小数。設定されていない場合、デフォルト値は `spark.sql.leafNodeDefaultParallelism` になります。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 3.1.0
spark.sql.files.maxPartitionNum なし 分割されたファイルパーティションの推奨される(保証されない)最大数。設定されている場合、初期パーティション数がこの値を超える場合、Spark はパーティション数をこの値に近づけるように各パーティションを再スケーリングします。この設定は、Parquet、JSON、ORC などのファイルベースのソースを使用する場合にのみ有効です。 3.5.0
spark.sql.shuffle.partitions 200 結合または集計のためにデータをシャッフルする際に使用するパーティション数を構成します。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 ジョブ入力パスの並列リストを有効にするためのしきい値を構成します。入力パスの数がこのしきい値より大きい場合、Spark は Spark 分散ジョブを使用してファイルをリストします。そうでない場合、シーケンシャルリストにフォールバックします。この設定は、Parquet、ORC、JSON などのファイルベースのデータソースを使用する場合にのみ有効です。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 ジョブ入力パスの最大リスト並列度を構成します。入力パスの数がこの値より大きい場合、この値を使用してスロットリングされます。この設定は、Parquet、ORC、JSON などのファイルベースのデータソースを使用する場合にのみ有効です。 2.1.1

Coalesce ヒント

Coalesce ヒントは、Spark SQL ユーザーが Dataset API の `coalesce`、`repartition`、`repartitionByRange` と同様に出力ファイルを制御できるようにします。これらは、パフォーマンスチューニングや出力ファイルの削減に使用できます。「COALESCE」ヒントはパーティション数のみをパラメータとして持ちます。「REPARTITION」ヒントは、パーティション数、列、またはその両方/なしをパラメータとして持ちます。「REPARTITION_BY_RANGE」ヒントは列名を必須とし、パーティション数はオプションです。「REBALANCE」ヒントは、初期パーティション数、列、またはその両方/なしをパラメータとして持ちます。

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;

詳細については、パーティショニングヒント のドキュメントを参照してください。

統計情報の活用

Apache Spark が多くの可能なオプションの中から最適な実行計画を選択する能力は、部分的には実行計画の各ノード(読み取り、フィルタリング、結合など)から出力される行数を推定することによって決定されます。これらの推定は、Spark がいくつかの方法で利用可能になった統計情報に基づいています。

統計情報が不足していたり不正確だったりすると、Spark が最適な計画を選択する能力が妨げられ、クエリのパフォーマンスが低下する可能性があります。そのため、Spark で利用可能な統計情報と、クエリの計画および実行中に Spark が行う推定を調べることは役立ちます。

結合戦略の最適化

結合の自動ブロードキャスト

プロパティ名デフォルト意味バージョン以降
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 結合を実行する際に、すべてのワーカーノードにブロードキャストされるテーブルの最大サイズ(バイト単位)を構成します。この値を -1 に設定すると、ブロードキャストを無効にできます。 1.1.0
spark.sql.broadcastTimeout 300

ブロードキャスト結合におけるブロードキャスト待機時間の秒数

1.3.0

結合戦略ヒント

結合戦略ヒント、つまり BROADCASTMERGESHUFFLE_HASHSHUFFLE_REPLICATE_NL は、Spark に、指定された各リレーションで、別 のリレーションと結合する際にヒントされた戦略を使用するように指示します。たとえば、テーブル 't1' で BROADCAST ヒントが使用されている場合、統計情報によって示されるテーブル 't1' のサイズが `spark.sql.autoBroadcastJoinThreshold` 設定を超えている場合でも、't1' をビルドサイドとするブロードキャスト結合(ブロードキャストハッシュ結合またはブロードキャストネストドループ結合(等価結合キーの有無による))が Spark によって優先されます。

結合の両側で異なる結合戦略ヒントが指定されている場合、Spark は BROADCAST ヒントを MERGE ヒントより、MERGE ヒントを SHUFFLE_HASH ヒントより、SHUFFLE_HASH ヒントを SHUFFLE_REPLICATE_NL ヒントよりも優先します。両側で BROADCAST ヒントまたは SHUFFLE_HASH ヒントが指定されている場合、Spark は結合タイプとリレーションのサイズに基づいてビルドサイドを選択します。

特定の戦略がすべての結合タイプをサポートしているとは限らないため、Spark がヒントで指定された結合戦略を選択するという保証はありません。

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

詳細については、結合ヒント のドキュメントを参照してください。

アダプティブクエリ実行

アダプティブクエリ実行(AQE)は、Spark SQL の最適化テクニックであり、実行時の統計情報を使用して最も効率的なクエリ実行計画を選択します。これは Apache Spark 3.2.0 以降、デフォルトで有効になっています。Spark SQL は、spark.sql.adaptive.enabled を包括的な設定として使用して、AQE をオンまたはオフにできます。

プロパティ名デフォルト意味バージョン以降
spark.sql.adaptive.enabled true true の場合、アダプティブクエリ実行を有効にします。これは、正確な実行時統計情報に基づいて、クエリ実行の途中でクエリプランを再最適化します。 1.6.0

シャッフル後パーティションの結合

この機能は、`spark.sql.adaptive.enabled` および `spark.sql.adaptive.coalescePartitions.enabled` の両方の設定が true の場合に、マップ出力統計情報に基づいて、シャッフル後パーティションを結合します。この機能により、クエリの実行時にシャッフルパーティション数をチューニングする手間が省けます。データセットに適合する適切なシャッフルパーティション数を設定する必要はありません。`spark.sql.adaptive.coalescePartitions.initialPartitionNum` 設定で十分な数の初期シャッフルパーティションを設定すれば、Spark は実行時に適切なシャッフルパーティション数を選択できます。

プロパティ名デフォルト意味バージョン以降
spark.sql.adaptive.coalescePartitions.enabled true true で、かつ `spark.sql.adaptive.enabled` が true の場合、Spark はターゲットサイズ(`spark.sql.adaptive.advisoryPartitionSizeInBytes` で指定)に基づいて連続したシャッフルパーティションを結合し、小さすぎるタスクを回避します。 3.0.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true true の場合、Spark は連続したシャッフルパーティションを結合する際に `spark.sql.adaptive.advisoryPartitionSizeInBytes`(デフォルト 64MB)で指定されたターゲットサイズを無視し、`spark.sql.adaptive.coalescePartitions.minPartitionSize`(デフォルト 1MB)で指定された最小パーティションサイズのみを尊重して、並列度を最大化します。これは、アダプティブクエリ実行を有効にした際のパフォーマンス低下を避けるためのものです。ビジーなクラスターでリソース使用率をより効率的にするために、この設定を false に設定することをお勧めします(小さすぎるタスクが多数発生しないようにするため)。 3.2.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 結合後のシャッフルパーティションの最小サイズ。これは、ターゲットサイズがパーティション結合中に無視される場合(デフォルトの場合)に役立ちます。 3.2.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (なし) 結合前の初期シャッフルパーティション数。設定されていない場合、`spark.sql.shuffle.partitions` と等しくなります。この設定は、`spark.sql.adaptive.enabled` および `spark.sql.adaptive.coalescePartitions.enabled` が両方とも有効な場合にのみ効果があります。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB アダプティブ最適化中(`spark.sql.adaptive.enabled` が true の場合)のシャッフルパーティションのアドバイザリサイズ(バイト単位)。これは、Spark が小さいシャッフルパーティションを結合したり、スキューズしたシャッフルパーティションを分割したりする場合に有効になります。 3.0.0

スキューズしたシャッフルパーティションの分割

プロパティ名デフォルト意味バージョン以降
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true true で、かつ `spark.sql.adaptive.enabled` が true の場合、Spark は RebalancePartitions におけるスキューズしたシャッフルパーティションを最適化し、ターゲットサイズ(`spark.sql.adaptive.advisoryPartitionSizeInBytes` で指定)に従ってより小さなパーティションに分割して、データスキューズを回避します。 3.2.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2 パーティションサイズが `spark.sql.adaptive.advisoryPartitionSizeInBytes` のこの係数倍より小さい場合、そのパーティションは分割中にマージされます。 3.3.0

ソートマージ結合をブロードキャスト結合に変換

AQE は、実行時統計情報がアダプティブブロードキャストハッシュ結合のしきい値より小さい場合、ソートマージ結合をブロードキャストハッシュ結合に変換します。これは、最初にブロードキャストハッシュ結合を計画するほど効率的ではありませんが、ソートマージ結合を続行するよりは優れています。ソートマージ結合を続行するより優れているのは、両方の結合サイドのソートを回避し、ローカルでシャッフルファイルを読み取ってネットワークトラフィックを節約できるためです(`spark.sql.adaptive.localShuffleReader.enabled` が true の場合)。

プロパティ名デフォルト意味バージョン以降
spark.sql.adaptive.autoBroadcastJoinThreshold (なし) 結合を実行する際に、すべてのワーカーノードにブロードキャストされるテーブルの最大サイズ(バイト単位)を構成します。この値を -1 に設定すると、ブロードキャストを無効にできます。デフォルト値は `spark.sql.autoBroadcastJoinThreshold` と同じです。この設定は、アダプティブフレームワークでのみ使用されることに注意してください。 3.2.0
spark.sql.adaptive.localShuffleReader.enabled true true で、かつ `spark.sql.adaptive.enabled` が true の場合、Spark は、シャッフルパーティションが不要な場合(たとえば、ソートマージ結合をブロードキャストハッシュ結合に変換した後)に、ローカルシャッフルリーダーを使用してシャッフルデータを読み込もうとします。 3.0.0

ソートマージ結合をシャッフルハッシュ結合に変換

AQE は、すべてのシャッフル後パーティションが `spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold` で構成されたしきい値よりも小さい場合、ソートマージ結合をシャッフルハッシュ結合に変換します。

プロパティ名デフォルト意味バージョン以降
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 ローカルハッシュマップを構築できるパーティションあたりの最大サイズ(バイト単位)を構成します。この値が `spark.sql.adaptive.advisoryPartitionSizeInBytes` より小さくなく、すべてのパーティションサイズがこの設定より大きくない場合、結合選択は `spark.sql.join.preferSortMergeJoin` の値に関係なく、ソートマージ結合よりもシャッフルハッシュ結合を使用することを優先します。 3.2.0

スキューズ結合の最適化

データスキューズは、結合クエリのパフォーマンスを著しく低下させる可能性があります。この機能は、スキューズしたタスクをほぼ均等なサイズのタスクに分割する(必要に応じてレプリケートする)ことによって、ソートマージ結合におけるスキューズを動的に処理します。これは、`spark.sql.adaptive.enabled` および `spark.sql.adaptive.skewJoin.enabled` の両方の設定が有効な場合に有効になります。

プロパティ名デフォルト意味バージョン以降
spark.sql.adaptive.skewJoin.enabled true true で、かつ `spark.sql.adaptive.enabled` が true の場合、Spark はソートマージ結合におけるスキューズを動的に処理し、スキューズしたパーティションを分割(必要に応じてレプリケート)します。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0 パーティションサイズが中央値パーティションサイズのこの係数倍よりも大きく、かつ `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes` よりも大きい場合、そのパーティションはスキューズしていると見なされます。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB パーティションサイズがこのしきい値より大きく、かつ中央値パーティションサイズに `spark.sql.adaptive.skewJoin.skewedPartitionFactor` を掛けた値よりも大きい場合、そのパーティションはスキューズしていると見なされます。理想的には、この設定は `spark.sql.adaptive.advisoryPartitionSizeInBytes` より大きく設定する必要があります。 3.0.0
spark.sql.adaptive.forceOptimizeSkewedJoin false true の場合、OptimizeSkewedJoin を強制的に有効にします。これは、余分なシャッフルが発生する場合でも、ストラグラータスクを回避するためにスキューズした結合を最適化するアダプティブルールです。 3.3.0

高度なカスタマイズ

カスタムコスト評価クラスを提供するか、AQE オプティマイザールールを除外することで、AQE の動作の詳細を制御できます。

プロパティ名デフォルト意味バージョン以降
spark.sql.adaptive.optimizer.excludedRules (なし) アダプティブオプティマイザで無効にするルールのリストを構成します。ルールはルール名で指定され、カンマで区切られます。オプティマイザは、実際に除外されたルールをログに記録します。 3.1.0
spark.sql.adaptive.customCostEvaluatorClass (なし) アダプティブ実行に使用されるカスタムコスト評価クラス。設定されていない場合、Spark はデフォルトで独自の SimpleCostEvaluator を使用します。 3.2.0

ストレージパーティション結合

ストレージパーティション結合(SPJ)は、Spark SQL の最適化テクニックであり、既存のストレージレイアウトを利用してシャッフルフェーズを回避します。

これは、バケット化されたテーブルにのみ適用されるバケット結合の概念を、FunctionCatalog に登録された関数によってパーティション化されたテーブルに一般化したものです。ストレージパーティション結合は、現在、互換性のある V2 データソースでサポートされています。

以下の SQL プロパティは、さまざまな最適化を備えたさまざまな結合クエリでストレージパーティション結合を有効にします。

プロパティ名デフォルト意味バージョン以降
spark.sql.sources.v2.bucketing.enabled false true の場合、互換性のある V2 データソースによって報告されたパーティショニングを使用して、シャッフルを排除しようとします。 3.3.0
spark.sql.sources.v2.bucketing.pushPartValues.enabled true 有効な場合、結合の一方のサイドに他方のサイドからのパーティション値が欠落している場合、シャッフルを排除しようとします。この設定は、`spark.sql.sources.v2.bucketing.enabled` が true であることを必要とします。 3.4.0
spark.sql.requireAllClusterKeysForCoPartition true true の場合、シャッフルを排除するために、結合または MERGE キーがパーティションキーと同じで、同じ順序であることを要求します。したがって、シャッフルを排除するために、この状況で false に設定してください。 3.4.0
spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled false true で、かつ結合がフルアウター結合でない場合、シャッフルを回避する際に大量のデータを持つパーティションを処理するためのスキューズ最適化を有効にします。テーブル統計情報に基づいて一方のサイドがビッグテーブルとして選択され、このサイドの分割が部分的にクラスタリングされます。もう一方のサイドの分割は、一致するようにグループ化され、レプリケートされます。この設定は、`spark.sql.sources.v2.bucketing.enabled` と `spark.sql.sources.v2.bucketing.pushPartValues.enabled` の両方が true であることを必要とします。 3.4.0
spark.sql.sources.v2.bucketing.allowJoinKeysSubsetOfPartitionKeys.enabled false 有効な場合、結合または MERGE 条件がすべてのパーティション列を含んでいない場合、シャッフルを回避しようとします。この設定は、`spark.sql.sources.v2.bucketing.enabled` と `spark.sql.sources.v2.bucketing.pushPartValues.enabled` の両方が true であり、`spark.sql.requireAllClusterKeysForCoPartition` が false であることを必要とします。 4.0.0
spark.sql.sources.v2.bucketing.allowCompatibleTransforms.enabled false 有効な場合、パーティショントランスフォームが互換性があるが同一ではない場合、シャッフルを回避しようとします。この設定は、`spark.sql.sources.v2.bucketing.enabled` と `spark.sql.sources.v2.bucketing.pushPartValues.enabled` の両方が true であることを必要とします。 4.0.0
spark.sql.sources.v2.bucketing.shuffle.enabled false 有効な場合、もう一方のサイドで V2 データソースによって報告されたパーティショニングを認識することにより、結合の一方のサイドでのシャッフルを回避しようとします。 4.0.0

ストレージパーティション結合が実行される場合、クエリプランには結合前の Exchange ノードは含まれません。

以下の例は、ストレージパーティション結合をサポートする Spark V2 データソースである Iceberg (https://iceberg.apache.org/docs/latest/spark-getting-started/)を使用しています。

CREATE TABLE prod.db.target (id INT, salary INT, dep STRING)
USING iceberg
PARTITIONED BY (dep, bucket(8, id))

CREATE TABLE prod.db.source (id INT, salary INT, dep STRING)
USING iceberg
PARTITIONED BY (dep, bucket(8, id))

EXPLAIN SELECT * FROM target t INNER JOIN source s
ON t.dep = s.dep AND t.id = s.id

-- Plan without Storage Partition Join
== Physical Plan ==
* Project (12)
+- * SortMergeJoin Inner (11)
   :- * Sort (5)
   :  +- Exchange (4) // DATA SHUFFLE
   :     +- * Filter (3)
   :        +- * ColumnarToRow (2)
   :           +- BatchScan (1)
   +- * Sort (10)
      +- Exchange (9) // DATA SHUFFLE
         +- * Filter (8)
            +- * ColumnarToRow (7)
               +- BatchScan (6)


SET 'spark.sql.sources.v2.bucketing.enabled' 'true'
SET 'spark.sql.iceberg.planning.preserve-data-grouping' 'true'
SET 'spark.sql.sources.v2.bucketing.pushPartValues.enabled' 'true'
SET 'spark.sql.requireAllClusterKeysForCoPartition' 'false'
SET 'spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled' 'true'

-- Plan with Storage Partition Join
== Physical Plan ==
* Project (10)
+- * SortMergeJoin Inner (9)
   :- * Sort (4)
   :  +- * Filter (3)
   :     +- * ColumnarToRow (2)
   :        +- BatchScan (1)
   +- * Sort (8)
      +- * Filter (7)
         +- * ColumnarToRow (6)
            +- BatchScan (5)