ベストプラクティス#

PySpark API を活用する#

Pandas API on Spark は内部で Spark を使用しています。したがって、多くの機能とパフォーマンス最適化は Pandas API on Spark でも利用できます。これらの最先端の機能を Pandas API on Spark と組み合わせて活用してください。

既存の Spark コンテキストと Spark セッションは、Pandas API on Spark でそのまま使用されます。すでに設定済みの Spark コンテキストまたはセッションが実行されている場合、Pandas API on Spark はそれらを使用します。

環境に Spark コンテキストまたはセッションが実行されていない場合(例:通常の Python インタープリター)、そのような設定は SparkContext および/または SparkSession に設定できます。Spark コンテキストおよび/またはセッションが作成されると、Pandas API on Spark はこのコンテキストおよび/またはセッションを自動的に使用できます。たとえば、Spark のエクゼキュータメモリを設定したい場合、以下のように行うことができます。

from pyspark import SparkConf, SparkContext
conf = SparkConf()
conf.set('spark.executor.memory', '2g')
# Pandas API on Spark automatically uses this Spark context with the configurations set.
SparkContext(conf=conf)

import pyspark.pandas as ps
...

もう 1 つの一般的な構成は、PySpark の Arrow 最適化です。SQL の構成の場合、以下のように Spark セッションに設定できます。

from pyspark.sql import SparkSession
builder = SparkSession.builder.appName("pandas-on-spark")
builder = builder.config("spark.sql.execution.arrow.pyspark.enabled", "true")
# Pandas API on Spark automatically uses this Spark session with the configurations set.
builder.getOrCreate()

import pyspark.pandas as ps
...

履歴サーバー、Web UI、デプロイメントモードなどのすべての Spark 機能は、Pandas API on Spark でもそのまま使用できます。パフォーマンスチューニングに興味がある場合は、Spark のチューニングも参照してください。

実行計画を確認する#

Pandas API on Spark は遅延実行に基づいているため、実際の計算の前に PySpark API の DataFrame.spark.explain() を活用することで、高コストな操作を予測できます。例を以下に示します。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Filter (id#1L > 5)
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]

このようなケースで確信が持てない場合は、実際の実行計画を確認して、高コストなケースを予測できます。

Pandas API on Spark は、Spark オプティマイザーを活用してこのようなシャッフル操作を最適化し、削減するために最善を尽くしていますが、可能な限りアプリケーション側でシャッフルを避けることが最善です。

チェックポイントを使用する#

Pandas API on Spark オブジェクトに対して多数の操作を行った後、巨大で複雑な計画のために Spark の内部プランナーが遅くなることがあります。Spark 計画が巨大になったり、計画に長い時間がかかったりする場合、DataFrame.spark.checkpoint() または DataFrame.spark.local_checkpoint() が役立ちます。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf = psdf[psdf.id > 5]
>>> psdf['id'] = psdf['id'] + (10 * psdf['id'] + psdf['id'])
>>> psdf = psdf.groupby('id').head(2)
>>> psdf.spark.explain()
== Physical Plan ==
*(3) Project [__index_level_0__#0L, id#31L]
+- *(3) Filter (isnotnull(__row_number__#44) AND (__row_number__#44 <= 2))
   +- Window [row_number() windowspecdefinition(__groupkey_0__#36L, __natural_order__#16L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS __row_number__#44], [__groupkey_0__#36L], [__natural_order__#16L ASC NULLS FIRST]
      +- *(2) Sort [__groupkey_0__#36L ASC NULLS FIRST, __natural_order__#16L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(__groupkey_0__#36L, 200), true, [id=#33]
            +- *(1) Project [__index_level_0__#0L, (id#1L + ((id#1L * 10) + id#1L)) AS __groupkey_0__#36L, (id#1L + ((id#1L * 10) + id#1L)) AS id#31L, __natural_order__#16L]
               +- *(1) Project [__index_level_0__#0L, id#1L, monotonically_increasing_id() AS __natural_order__#16L]
                  +- *(1) Filter (id#1L > 5)
                     +- *(1) Scan ExistingRDD[__index_level_0__#0L,id#1L]

>>> psdf = psdf.spark.local_checkpoint()  # or psdf.spark.checkpoint()
>>> psdf.spark.explain()
== Physical Plan ==
*(1) Project [__index_level_0__#0L, id#31L]
+- *(1) Scan ExistingRDD[__index_level_0__#0L,id#31L,__natural_order__#59L]

ご覧のとおり、以前の Spark 計画は破棄され、単純な計画で開始されます。DataFrame.spark.checkpoint() を呼び出したときに前の DataFrame の結果は設定されたファイルシステムに保存され、DataFrame.spark.local_checkpoint() を呼び出したときにエクゼキュータに保存されます。

シャッフルを避ける#

sort_values のような一部の操作は、単一マシンのインメモリで行うよりも、並列または分散環境で行うのが難しくなります。なぜなら、他のノードにデータを送信し、ネットワーク経由で複数のノード間でデータを交換する必要があるからです。以下に例を示します。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)}).sort_values(by="id")
>>> psdf.spark.explain()
== Physical Plan ==
*(2) Sort [id#9L ASC NULLS LAST], true, 0
+- Exchange rangepartitioning(id#9L ASC NULLS LAST, 200), true, [id=#18]
   +- *(1) Scan ExistingRDD[__index_level_0__#8L,id#9L]

ご覧のとおり、シャッフルを必要とする Exchange が必要であり、これは高コストになる可能性が高いです。

単一パーティションでの計算を避ける#

もう 1 つの一般的なケースは、単一パーティションでの計算です。現在、DataFrame.rank のような一部の API は、パーティション指定を指定せずに PySpark の Window を使用しています。これにより、すべてのデータが単一マシン上の単一パーティションに移動し、深刻なパフォーマンス低下を引き起こす可能性があります。このような API は、非常に大きなデータセットでは避けるべきです。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'id': range(10)})
>>> psdf.rank().spark.explain()
== Physical Plan ==
*(4) Project [__index_level_0__#16L, id#24]
+- Window [avg(cast(_w0#26 as bigint)) windowspecdefinition(id#17L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS id#24], [id#17L]
   +- *(3) Project [__index_level_0__#16L, _w0#26, id#17L]
      +- Window [row_number() windowspecdefinition(id#17L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _w0#26], [id#17L ASC NULLS FIRST]
         +- *(2) Sort [id#17L ASC NULLS FIRST], false, 0
            +- Exchange SinglePartition, true, [id=#48]
               +- *(1) Scan ExistingRDD[__index_level_0__#16L,id#17L]

代わりに、GroupBy.rank を使用してください。これは、データをグループごとに分散して計算できるため、コストが低くなります。

予約済みの列名を使用しない#

先頭に __ 、末尾に __ が付く列は、Pandas API on Spark では予約されています。インデックスなどの内部動作を処理するために、Pandas API on Spark はいくつかの内部列を使用します。したがって、このような列名を使用することは推奨されず、動作が保証されるものではありません。

重複した列名を使用しない#

Spark SQL は一般的にこれを許可しないため、重複した列名を使用することは許可されていません。Pandas API on Spark はこの動作を継承します。たとえば、以下を参照してください。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'b':[3, 4]})
>>> psdf.columns = ["a", "a"]
...
Reference 'a' is ambiguous, could be: a, a.;

さらに、大文字と小文字を区別する列名を使用することは強く推奨されません。Pandas API on Spark はデフォルトでこれを許可しません。

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
...
Reference 'a' is ambiguous, could be: a, a.;

ただし、Spark の構成で spark.sql.caseSensitive をオンにすると、自己責任で有効にすることができます。

>>> from pyspark.sql import SparkSession
>>> builder = SparkSession.builder.appName("pandas-on-spark")
>>> builder = builder.config("spark.sql.caseSensitive", "true")
>>> builder.getOrCreate()

>>> import pyspark.pandas as ps
>>> psdf = ps.DataFrame({'a': [1, 2], 'A':[3, 4]})
>>> psdf
   a  A
0  1  3
1  2  4

Spark DataFrame から pandas-on-Spark DataFrame への変換時にインデックス列を指定する#

pandas-on-Spark DataFrame が Spark DataFrame から変換されるとき、インデックス情報が失われ、Pandas API on Spark DataFrame でデフォルトのインデックスが使用されます。デフォルトのインデックスは、明示的にインデックス列を指定するのに比べて、一般的に非効率的です。可能な限りインデックス列を指定してください。

PySpark の操作については、PySpark の操作 を参照してください。

distributed または distributed-sequence のデフォルトインデックスを使用する#

pandas-on-Spark ユーザーが直面する一般的な問題の 1 つは、デフォルトのインデックスによるパフォーマンスの低下です。Pandas API on Spark は、インデックスが不明な場合(例:Spark DataFrame が直接 pandas-on-Spark DataFrame に変換された場合)にデフォルトのインデックスをアタッチします。

注:sequence は単一パーティションでの計算を必要とし、これは推奨されません。本番環境で大量のデータを処理する予定がある場合は、デフォルトインデックスを distributed または distributed-sequence に設定して分散させます。

デフォルトインデックスの構成の詳細については、デフォルトインデックスタイプ を参照してください。

distributed-sequence によるインデックスの不整合の処理#

distributed-sequence はグローバルにシーケンシャルなインデックスを保証しますが、異なる操作間で同じ行とインデックスのマッピングが維持されることは保証されません。apply()groupby()、または transform() のような操作は、インデックスが再生成される原因となり、行と計算された値の不整合を引き起こす可能性があります。

apply() による問題の例#

次の例では、record_id が一意の識別子として機能するデータセットをロードし、apply() 関数を使用して期間(営業日数)を計算します。しかし、apply() 中の distributed-sequence インデックスの再生成により、結果が誤った行に割り当てられる可能性があります。

import pyspark.pandas as ps
import numpy as np

ps.set_option('compute.default_index_type', 'distributed-sequence')

df = ps.DataFrame({
    'record_id': ["RECORD_1001", "RECORD_1002"],
    'start_date': ps.to_datetime(["2024-01-01", "2024-01-02"]),
    'end_date': ps.to_datetime(["2024-01-01", "2024-01-03"])
})

df['duration'] = df.apply(lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()), axis=1)

期待される出力

record_id    start_date    end_date     duration
RECORD_1001  2024-01-01    2024-01-01   0
RECORD_1002  2024-01-02    2024-01-03   1

しかし、apply() 中に distributed-sequence インデックスが再生成されるため、結果の DataFrame は以下のようになる可能性があります。

record_id    start_date    end_date     duration
RECORD_1002  2024-01-02    2024-01-03   0   # Wrong mapping!
RECORD_1001  2024-01-01    2024-01-01   1   # Wrong mapping!

インデックスの不整合を防ぐためのベストプラクティス#

行とインデックスのマッピングが一貫して維持されるようにするには、次のアプローチを検討してください。

  1. 関数を適用する前に、明示的にインデックス列を設定する

    df = df.set_index("record_id")  # Ensure the index is explicitly set
    df['duration'] = df.apply(lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()), axis=1)
    
  2. 関数を適用する前に DataFrame を永続化して、行の順序を維持する

    df = df.spark.persist()
    df['duration'] = df.apply(lambda x: np.busday_count(x['start_date'].date(), x['end_date'].date()), axis=1)
    
  3. シーケンスインデックスタイプを使用する(潜在的なパフォーマンスのトレードオフに注意)

    ps.set_option('compute.default_index_type', 'sequence')
    

アプリケーションで厳密な行とインデックスのマッピングが必要な場合は、デフォルトの distributed-sequence インデックスに依存するのではなく、上記のアプローチのいずれかを使用することを検討してください。

詳細については、デフォルトインデックスタイプ を参照してください。

異なる DataFrame/Series に対する操作を減らす#

Pandas API on Spark は、高コストな操作を防ぐために、デフォルトで異なる DataFrame(または Series)に対する操作を許可しません。内部的に結合操作を実行しますが、これは一般的に高コストであり、推奨されません。可能な限り、この操作は避けるべきです。

詳細については、異なる DataFrame に対する操作 を参照してください。

可能な限り Pandas API on Spark を直接使用する#

Pandas API on Spark には pandas と同等の API が多数ありますが、まだ実装されていない API や明示的にサポートされていない API もいくつかあります。

例として、Pandas API on Spark は、ユーザーがクラスタ全体からすべてのデータをクライアント(ドライバー)サイドに収集することを防ぐために、__iter__() を実装していません。残念ながら、min、max、sum などの Python の組み込み関数のような多くの外部 API は、指定された引数がイテラブルであることを要求します。pandas の場合、以下のようにそのまま適切に動作します。

>>> import pandas as pd
>>> max(pd.Series([1, 2, 3]))
3
>>> min(pd.Series([1, 2, 3]))
1
>>> sum(pd.Series([1, 2, 3]))
6

pandas のデータセットは単一マシン上に存在し、自然に同じマシン内でローカルにイテラブルです。しかし、pandas-on-Spark のデータセットは複数のマシンにまたがり、分散方式で計算されます。ローカルでイテラブルにすることは困難であり、ユーザーが知らないうちに、データ全体をクライアントサイドに収集してしまう可能性が非常に高いです。したがって、Pandas API on Spark を使用し続けることが最善です。上記の例は、以下のように変換できます。

>>> import pyspark.pandas as ps
>>> ps.Series([1, 2, 3]).max()
3
>>> ps.Series([1, 2, 3]).min()
1
>>> ps.Series([1, 2, 3]).sum()
6

pandas ユーザーからのもう 1 つの一般的なパターンは、リスト内包表記またはジェネレータ式に依存することかもしれません。しかし、これも内部的にデータセットがローカルでイテラブルであることを前提としています。したがって、以下のように pandas ではシームレスに動作します。

>>> import pandas as pd
>>> data = []
>>> countries = ['London', 'New York', 'Helsinki']
>>> pser = pd.Series([20., 21., 12.], index=countries)
>>> for temperature in pser:
...     assert temperature > 0
...     if temperature > 1000:
...         temperature = None
...     data.append(temperature ** 2)
...
>>> pd.Series(data, index=countries)
London      400.0
New York    441.0
Helsinki    144.0
dtype: float64

しかし、Pandas API on Spark では、上記の理由と同じ理由で動作しません。上記の例も、以下のように Pandas API on Spark を直接使用するように変更できます。

>>> import pyspark.pandas as ps
>>> import numpy as np
>>> countries = ['London', 'New York', 'Helsinki']
>>> psser = ps.Series([20., 21., 12.], index=countries)
>>> def square(temperature) -> np.float64:
...     assert temperature > 0
...     if temperature > 1000:
...         temperature = None
...     return temperature ** 2
...
>>> psser.apply(square)
London      400.0
New York    441.0
Helsinki    144.0
dtype: float64