パフォーマンスチューニング

ワークロードによっては、データをメモリにキャッシュするか、いくつかの実験的なオプションを有効にすることでパフォーマンスを向上させることができます。

メモリへのデータキャッシング

Spark SQLは、spark.catalog.cacheTable("tableName")またはdataFrame.cache()を呼び出すことで、インメモリの列形式を使用してテーブルをキャッシュできます。その後、Spark SQLは必要な列のみをスキャンし、メモリ使用量とGC負荷を最小限に抑えるために自動的に圧縮を調整します。 spark.catalog.uncacheTable("tableName")またはdataFrame.unpersist()を呼び出して、テーブルをメモリから削除できます。

インメモリキャッシングの構成は、SparkSessionsetConfメソッドを使用するか、SQLを使用してSET key=valueコマンドを実行することで行うことができます。

プロパティ名デフォルト意味バージョン
spark.sql.inMemoryColumnarStorage.compressed true trueに設定すると、Spark SQLはデータの統計に基づいて、各列の圧縮コーデックを自動的に選択します。 1.0.1
spark.sql.inMemoryColumnarStorage.batchSize 10000 列キャッシングのバッチサイズを制御します。バッチサイズを大きくすると、メモリ使用率と圧縮率が向上しますが、データをキャッシュするときにOOMのリスクがあります。 1.1.1

その他の構成オプション

次のオプションを使用して、クエリ実行のパフォーマンスを調整することもできます。これらのオプションは、最適化が自動的に行われるようになるにつれて、将来のリリースで非推奨になる可能性があります。

プロパティ名デフォルト意味バージョン
spark.sql.files.maxPartitionBytes 134217728 (128 MB) ファイルを読み取るときに、単一のパーティションにパックする最大バイト数。この構成は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 2.0.0
spark.sql.files.openCostInBytes 4194304 (4 MB) ファイルをオープンするための推定コスト。同じ時間にスキャンできるバイト数で測定されます。これは、複数のファイルをパーティションに入れるときに使用されます。過大評価する方が良く、小さなファイルを含むパーティションは、大きなファイルを含むパーティションよりも高速になります(最初にスケジュールされます)。この構成は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 2.0.0
spark.sql.files.minPartitionNum デフォルトの並列度 分割ファイルパーティションの推奨(保証されていない)最小数。設定されていない場合、デフォルト値は`spark.sql.leafNodeDefaultParallelism`です。この構成は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 3.1.0
spark.sql.files.maxPartitionNum なし 分割ファイルパーティションの推奨(保証されていない)最大数。設定されている場合、初期パーティション数がこの値を超える場合、Sparkは各パーティションをリスケールして、パーティション数がこの値に近づくようにします。この構成は、Parquet、JSON、ORCなどのファイルベースのソースを使用する場合にのみ有効です。 3.5.0
spark.sql.broadcastTimeout 300

ブロードキャスト結合でのブロードキャスト待機時間のタイムアウト(秒単位)

1.3.0
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 結合を実行するときに、すべてのワーカーノードにブロードキャストされるテーブルの最大サイズ(バイト単位)を構成します。この値を-1に設定すると、ブロードキャストを無効にできます。現在、統計は、コマンドANALYZE TABLE <tableName> COMPUTE STATISTICS noscanが実行されているHive Metastoreテーブルでのみサポートされていることに注意してください。 1.1.0
spark.sql.shuffle.partitions 200 結合または集計のためにデータをシャッフルするときに使用するパーティション数を構成します。 1.1.0
spark.sql.sources.parallelPartitionDiscovery.threshold 32 ジョブ入力パスの並列リストを有効にするためのしきい値を構成します。入力パスの数がこのしきい値よりも大きい場合、SparkはSpark分散ジョブを使用してファイルを一覧表示します。それ以外の場合は、順次リストに戻ります。この構成は、Parquet、ORC、JSONなどのファイルベースのデータソースを使用する場合にのみ有効です。 1.5.0
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 ジョブ入力パスの最大リスト並列度を構成します。入力パスの数がこの値よりも大きい場合、この値を使用するようにスロットリングされます。この構成は、Parquet、ORC、JSONなどのファイルベースのデータソースを使用する場合にのみ有効です。 2.1.1

SQLクエリの結合戦略ヒント

結合戦略ヒント、つまりBROADCASTMERGESHUFFLE_HASH、およびSHUFFLE_REPLICATE_NLは、関係を別の関係と結合するときに、指定された各関係にヒント付きの戦略を使用するようにSparkに指示します。たとえば、テーブル 't1' で BROADCAST ヒントが使用されている場合、統計で示されるテーブル 't1' のサイズが構成 spark.sql.autoBroadcastJoinThreshold を超えている場合でも、't1' をビルド側とするブロードキャスト結合 (等価結合キーがあるかどうかによって、ブロードキャストハッシュ結合またはブロードキャストネストループ結合) が Spark によって優先されます。

結合の両側で異なる結合戦略ヒントが指定されている場合、SparkはSHUFFLE_REPLICATE_NLヒントよりもSHUFFLE_HASHヒントよりもMERGEヒントよりもBROADCASTヒントを優先します。両側が BROADCAST ヒントまたは SHUFFLE_HASH ヒントで指定されている場合、Spark は結合タイプと関係のサイズに基づいてビルド側を選択します。

特定の戦略がすべての結合タイプをサポートしていない可能性があるため、Spark がヒントで指定された結合戦略を選択することを保証するものではないことに注意してください。

spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
src <- sql("SELECT * FROM src")
records <- sql("SELECT * FROM records")
head(join(src, hint(records, "broadcast"), src$key == records$key))
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key

詳細については、結合ヒントのドキュメントを参照してください。

SQLクエリの結合ヒント

結合ヒントを使用すると、Spark SQL ユーザーは、Dataset API の coalescerepartition、および repartitionByRange と同様に、出力ファイルの数を制御できます。これらは、パフォーマンスチューニングと出力ファイルの数を減らすために使用できます。「COALESCE」ヒントには、パラメータとしてパーティション番号のみがあります。「REPARTITION」ヒントには、パラメータとしてパーティション番号、列、またはその両方/どちらもありません。「REPARTITION_BY_RANGE」ヒントには、列名が必要であり、パーティション番号はオプションです。「REBALANCE」ヒントには、パラメータとして初期パーティション番号、列、またはその両方/どちらもありません。

SELECT /*+ COALESCE(3) */ * FROM t;
SELECT /*+ REPARTITION(3) */ * FROM t;
SELECT /*+ REPARTITION(c) */ * FROM t;
SELECT /*+ REPARTITION(3, c) */ * FROM t;
SELECT /*+ REPARTITION */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t;
SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t;
SELECT /*+ REBALANCE */ * FROM t;
SELECT /*+ REBALANCE(3) */ * FROM t;
SELECT /*+ REBALANCE(c) */ * FROM t;
SELECT /*+ REBALANCE(3, c) */ * FROM t;

詳細については、パーティショニングヒントのドキュメントを参照してください。

適応クエリ実行

適応クエリ実行(AQE)は、Apache Spark 3.2.0以降、デフォルトで有効になっている最も効率的なクエリ実行プランを選択するためにランタイム統計を使用するSpark SQLの最適化手法です。Spark SQLは、spark.sql.adaptive.enabledを包括的な構成として使用して、AQEをオン/オフできます。Spark 3.0の時点では、AQEには、シャッフル後のパーティションの結合、ソートマージ結合からブロードキャスト結合への変換、およびスキュー結合の最適化の3つの主要な機能があります。

シャッフル後のパーティションの結合

spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabledの両方の構成がtrueの場合、この機能はマップ出力統計に基づいてシャッフル後のパーティションを結合します。この機能により、クエリの実行時にシャッフルパーティション数のチューニングが簡素化されます。データセットに合わせて適切なシャッフルパーティション数を設定する必要はありません。 spark.sql.adaptive.coalescePartitions.initialPartitionNum構成を介して十分な初期シャッフルパーティション数を設定すると、Sparkは実行時に適切なシャッフルパーティション数を選択できます。

プロパティ名デフォルト意味バージョン
spark.sql.adaptive.coalescePartitions.enabled true spark.sql.adaptive.enabled が true の場合、かつこれが true の場合、Spark は、小さすぎるタスクを避けるために、目標サイズ (spark.sql.adaptive.advisoryPartitionSizeInBytes で指定) に従って、連続したシャッフルパーティションを結合します。 3.0.0
spark.sql.adaptive.coalescePartitions.parallelismFirst true true の場合、Spark は連続したシャッフルパーティションを結合する際に、spark.sql.adaptive.advisoryPartitionSizeInBytes (デフォルト 64MB) で指定された目標サイズを無視し、並列処理を最大化するために、spark.sql.adaptive.coalescePartitions.minPartitionSize (デフォルト 1MB) で指定された最小パーティションサイズのみを考慮します。これは、アダプティブクエリ実行を有効にした際にパフォーマンスが低下するのを防ぐためです。この構成を false に設定し、spark.sql.adaptive.advisoryPartitionSizeInBytes で指定された目標サイズを尊重することをお勧めします。 3.2.0
spark.sql.adaptive.coalescePartitions.minPartitionSize 1MB 結合後のシャッフルパーティションの最小サイズ。その値は、spark.sql.adaptive.advisoryPartitionSizeInBytes の最大 20% にすることができます。これは、パーティションの結合中に目標サイズが無視される場合 (デフォルトの場合) に役立ちます。 3.2.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum (なし) 結合前のシャッフルパーティションの初期数。設定されていない場合、spark.sql.shuffle.partitions と同じになります。この構成は、spark.sql.adaptive.enabledspark.sql.adaptive.coalescePartitions.enabled の両方が有効な場合にのみ有効になります。 3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes 64 MB アダプティブ最適化中のシャッフルパーティションの推奨サイズ (バイト単位) (spark.sql.adaptive.enabled が true の場合)。Spark が小さなシャッフルパーティションを結合したり、偏ったシャッフルパーティションを分割したりするときに有効になります。 3.0.0

偏ったシャッフルパーティションの分割

プロパティ名デフォルト意味バージョン
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled true true であり、spark.sql.adaptive.enabled が true の場合、Spark は RebalancePartitions 内の偏ったシャッフルパーティションを最適化し、データスキューを回避するために、目標サイズ (spark.sql.adaptive.advisoryPartitionSizeInBytes で指定) に従って、それらをより小さなものに分割します。 3.2.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor 0.2 パーティションのサイズが、この係数に spark.sql.adaptive.advisoryPartitionSizeInBytes を掛けた値よりも小さい場合、分割中にマージされます。 3.3.0

ソートマージ結合からブロードキャスト結合への変換

AQE は、結合側の実行時統計がアダプティブブロードキャストハッシュ結合のしきい値よりも小さい場合、ソートマージ結合をブロードキャストハッシュ結合に変換します。これは、最初にブロードキャストハッシュ結合を計画するほど効率的ではありませんが、結合の両側のソートを節約し、(spark.sql.adaptive.localShuffleReader.enabled が true の場合は)ネットワークトラフィックを節約するためにシャッフルファイルをローカルで読み取ることができるため、ソートマージ結合を継続するよりも優れています。

プロパティ名デフォルト意味バージョン
spark.sql.adaptive.autoBroadcastJoinThreshold (なし) 結合を実行するときに、すべてのワーカーノードにブロードキャストされるテーブルの最大サイズ (バイト単位) を構成します。この値を -1 に設定すると、ブロードキャストを無効にできます。デフォルト値は spark.sql.autoBroadcastJoinThreshold と同じです。この構成は、アダプティブフレームワークでのみ使用されることに注意してください。 3.2.0
spark.sql.adaptive.localShuffleReader.enabled true true であり、spark.sql.adaptive.enabled が true の場合、Spark は、シャッフルパーティション分割が必要ない場合 (たとえば、ソートマージ結合をブロードキャストハッシュ結合に変換した後など)、ローカルシャッフルリーダーを使用してシャッフルデータを読み取ろうとします。 3.0.0

ソートマージ結合からシャッフルハッシュ結合への変換

AQE は、すべてのシャッフル後のパーティションがしきい値より小さい場合、ソートマージ結合をシャッフルハッシュ結合に変換します。最大しきい値は、構成 spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold を参照してください。

プロパティ名デフォルト意味バージョン
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold 0 ローカルハッシュマップの構築を許可できるパーティションあたりの最大サイズ (バイト単位) を構成します。この値が spark.sql.adaptive.advisoryPartitionSizeInBytes より小さくなく、すべてのパーティションサイズがこの構成より大きくない場合、結合の選択では、spark.sql.join.preferSortMergeJoin の値に関係なく、ソートマージ結合の代わりにシャッフルハッシュ結合を使用することを優先します。 3.2.0

スキュー結合の最適化

データスキューは、結合クエリのパフォーマンスを著しく低下させる可能性があります。この機能は、偏ったタスクをほぼ均等なサイズのタスクに分割する (必要に応じて複製する) ことにより、ソートマージ結合のスキューを動的に処理します。spark.sql.adaptive.enabled および spark.sql.adaptive.skewJoin.enabled の両方の構成が有効な場合に有効になります。

プロパティ名デフォルト意味バージョン
spark.sql.adaptive.skewJoin.enabled true true であり、spark.sql.adaptive.enabled が true の場合、Spark は、偏ったパーティションを分割する (必要に応じて複製する) ことにより、ソートマージ結合のスキューを動的に処理します。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor 5.0 パーティションのサイズが、中央値のパーティションサイズにこの係数を掛けた値よりも大きく、かつ spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes よりも大きい場合、偏っていると見なされます。 3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 256MB パーティションのサイズがバイト単位でこのしきい値よりも大きく、かつ中央値のパーティションサイズに spark.sql.adaptive.skewJoin.skewedPartitionFactor を掛けた値よりも大きい場合、偏っていると見なされます。理想的には、この構成は spark.sql.adaptive.advisoryPartitionSizeInBytes よりも大きく設定する必要があります。 3.0.0
spark.sql.adaptive.forceOptimizeSkewedJoin false true の場合、余分なシャッフルが発生する場合でも、ストラグラータスクを回避するために、偏った結合を最適化するアダプティブルールである OptimizeSkewedJoin を強制的に有効にします。 3.3.0

その他

プロパティ名デフォルト意味バージョン
spark.sql.adaptive.optimizer.excludedRules (なし) アダプティブオプティマイザーで無効にするルールのリストを構成します。ルールは、ルール名で指定し、コンマで区切ります。オプティマイザーは、実際に除外されたルールをログに記録します。 3.1.0
spark.sql.adaptive.customCostEvaluatorClass (なし) アダプティブ実行に使用するカスタムコスト評価クラス。設定されていない場合、Spark はデフォルトで独自の SimpleCostEvaluator を使用します。 3.2.0