このページでは、pandas API on Spark(「pandas on Spark」)の利点と、pandasの代わりに(またはpandasと組み合わせて)使用するタイミングについて説明します。
pandas on Sparkはpandasよりもはるかに高速になる可能性があり、pandasユーザーになじみのある構文を提供します。Sparkのパワーとpandasの使いやすさを兼ね備えています。
pandas on Sparkの主な利点は次のとおりです。
pandasには多くの制限があります。
pandas on Sparkがpandasの制限をどのように克服しているかをよりよく理解するために、いくつかの簡単な例を見てみましょう。また、pandas on Sparkの制限についても調査します。
このページの最後には、pandasとpandas on Sparkの両方を組み合わせて使用する方法が示されています。どちらか一方を選択する必要はありません。多くの場合、両方を併用するのが最適な選択肢です。
このセクションでは、pandas on Sparkがローカルホスト上の単一ファイルに対してpandasよりも高速にクエリを実行する方法を示します。pandas on Sparkはすべてのクエリで必ずしも高速になるとは限りませんが、この例では、大幅な高速化が実現される場合を示しています。
9列と10億行のデータを含むParquetファイルがあるとします。ファイルの最初の3行は次のとおりです。
+-------+-------+--------------+-------+-------+--------+------+------+---------+
| id1 | id2 | id3 | id4 | id5 | id6 | v1 | v2 | v3 |
|-------+-------+--------------+-------+-------+--------+------+------+---------|
| id016 | id046 | id0000109363 | 88 | 13 | 146094 | 4 | 6 | 18.8377 |
| id039 | id087 | id0000466766 | 14 | 30 | 111330 | 4 | 14 | 46.7973 |
| id047 | id098 | id0000307804 | 85 | 23 | 187639 | 3 | 5 | 47.5773 |
+-------+-------+--------------+-------+-------+--------+------+------+---------+
pandas on Sparkを使用してファイルを読み取り、グループ化集計を実行する方法は次のとおりです。
import pyspark.pandas as ps
df = ps.read_parquet("G1_1e9_1e2_0_0.parquet")[
["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)
このクエリは、Spark 3.5.0を搭載した64 GB RAMの2020 M1 Macbookで実行すると、62秒で完了します。
これを最適化されていないpandasコードと比較してみましょう。
import pandas as pd
df = pd.read_parquet("G1_1e9_1e2_0_0.parquet")[
["id1", "id2", "v3"]
]
df.query("id1 > 'id098'").groupby("id2").sum().head(3)
このクエリは、64GB RAMのマシンに10億行のデータをメモリに格納するのに十分なスペースがないため、エラーが発生します。
クエリを実行するために、pandasの最適化を手動で追加してみましょう。
df = pd.read_parquet(
"G1_1e9_1e2_0_0.parquet",
columns=["id1", "id2", "v3"],
filters=[("id1", ">", "id098")],
engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)
このクエリは、pandas 2.2.0で275秒で実行されます。
pandasでこれらの最適化を手動でコーディングすると、誤った結果が生じる可能性があります。正しいグループ化クエリですが、行グループフィルタリング述語が間違っている例を次に示します。
df = pd.read_parquet(
"G1_1e9_1e2_0_0.parquet",
columns=["id1", "id2", "v3"],
filters=[("id1", "==", "id001")],
engine="pyarrow",
)
df.query("id1 > 'id098'").groupby("id2").sum().head(3)
グループ化集計ロジックは正しいにもかかわらず、これは間違った結果を返します!
pandasでは、Parquetファイルを読み取るときに、列のプルーニングと行グループのフィルタリングを手動で適用する必要があります。pandas on Sparkでは、Sparkオプティマイザがこれらのクエリの拡張機能を自動的に適用するため、手動で入力する必要はありません。
pandas on Sparkの利点について、さらに詳しく調べてみましょう。
pandas on Sparkの利点をまとめてみましょう。
高速なクエリ実行
pandas on Sparkは、利用可能なすべてのコアを使用して計算を並列化し、実行前にクエリを最適化して効率的な実行を可能にするため、pandasよりも高速にクエリを実行できます。
pandasの計算は単一コアでのみ実行されます。
メモリサイズを超えるデータセットへのスケーラブル性
pandasはクエリを実行する前にデータをメモリにロードするため、メモリに収まるデータセットのみをクエリできます。
Sparkは、データをストリーミングし、増分的に計算を実行することで、メモリよりも大きいデータセットをクエリできます。
pandasは、データセットのサイズが大きくなるとエラーが発生することで有名ですが、Sparkにはこの制限はありません。
多数のマシンのクラスターで実行可能
Sparkは、単一のマシンで実行することも、クラスター内の多数のマシンに分散することもできます。
Sparkが単一のマシンで実行される場合、計算は利用可能なすべてのコアで実行されます。これは、単一コアでのみ計算を実行するpandasよりも高速になることがよくあります。
複数のマシンでの計算のスケーリングは、より大きなデータセットで計算を実行する場合、または単に多くのRAM /コアにアクセスしてクエリを高速に実行する場合に最適です。
pandasユーザーになじみのある構文
pandas on Sparkは、pandasユーザーになじみのある構文を提供するように設計されています。
なじみのある構文がすべてです。pandas on Sparkは、Sparkのパワーを、pandasユーザーが慣れているのと同じ構文で提供します。
Sparkの実績のあるクエリ最適化機能へのアクセスを提供
pandas on Sparkの計算は、実行前にSparkのCatalystオプティマイザによって最適化されます。
これらの最適化により、クエリが簡素化され、最適化が追加されます。
この投稿の前半では、たとえばParquetファイルを読み取るときに、Sparkが列のプルーニング/行グループフィルタリングの最適化を自動的に追加する方法を説明しました。pandasにはクエリ最適化機能がないため、これらの最適化を自分で追加する必要があります。最適化を手動で追加するのは面倒でエラーが発生しやすいです。適切な最適化を手動で適用しないと、クエリは間違った結果を返します。
pandas on Sparkは、pandasがサポートするすべてのAPIをサポートしていません。理由は2つあります。
一部の機能はまだpandas on Sparkに追加されていません。
一部のpandas機能は、Sparkの分散並列実行モデルでは意味がありません。
Sparkは、データフレームを複数のチャンクに分割して並列処理できるようにするため、特定のpandas操作はSparkの実行モデルにうまく移行できません。
多くの場合、pandas on Sparkとpandasを使用して、両方の長所を活かすことができます。
大規模なデータセットをクレンジングおよび集計して、scikit-learn機械学習モデルに渡される小さなデータセットを作成するとします。
高速なクエリ時間と並列実行を活用するために、pandas on Sparkでデータセットをクレンジングおよび集計できます。データセットが処理されたら、to_pandas()
を使用してpandasデータフレームに変換し、scikit-learnで機械学習モデルを実行できます。このアプローチは、データセットをpandasデータフレームに収まる程度に縮小できる場合に適しています。
pandas on Sparkは、pandasとはまったく異なる方法でクエリを実行します。
pandas on Sparkは遅延評価を使用します。クエリを未解決の論理プランに変換し、Sparkで最適化し、結果が要求された場合にのみ計算を実行します。
pandasは先行評価を使用します。すべてのデータをメモリにロードし、呼び出されたらすぐに操作を実行します。pandasはクエリ最適化を適用せず、クエリを実行する前にすべてのデータをメモリにロードする必要があります。
pandas on Sparkとpandasを比較する場合、データをメモリにロードするのにかかる時間とクエリを実行するのにかかる時間を考慮する必要があります。多くのデータセットは、pandasにロードするのに時間がかかります。
pandas on Sparkでデータをメモリにロードすることもできますが、これは多くの場合アンチパターンと見なされます。メモリにロードされたデータセットは、ストレージ内のデータが(追加、マージ、または削除によって)変更されても更新されません。Sparkデータフレームを永続化することは、特定の状況でクエリ時間を短縮するために賢明ですが、誤ったクエリ結果が発生する可能性があるため、慎重に使用する必要があります。
pandas on SparkとPySparkはどちらもクエリを受け取り、未解決の論理プランに変換してから、Sparkで実行します。
PySparkとpandas on Sparkはどちらも同様のクエリ実行モデルを持っています。クエリを未解決の論理プランに変換するのは比較的速く行えます。クエリの最適化と実行には、はるかに時間がかかります。そのため、PySparkとpandas on Sparkのパフォーマンスは似ているはずです。
pandas on SparkとPySparkの主な違いは、構文だけです。
pandas on Sparkは、クエリをより高速に実行し、独自の最適化を作成するのではなくSparkのオプティマイザを活用したいpandasユーザーにとって優れた選択肢です。
pandas on Sparkは、pandasユーザーになじみのある構文を使用しているため、簡単に習得できます。
pandas on Sparkは、pandasと組み合わせて使用するのにも最適なテクノロジーです。pandas on Sparkのビッグデータ処理機能と高性能処理機能を使用して、データセットを処理してから、他のテクノロジーと互換性のあるpandasデータフレームに変換できます。
pandas on Sparkの使用方法の詳細については、ドキュメントをご覧ください。