pandasおよびPySpark DataFrameとの間で#
pandasおよび/またはPySparkのユーザーは、Spark上のpandas APIを使用する際に、APIの互換性問題に直面することがあります。Spark上のpandas APIはpandasとPySparkの両方の100%の互換性を目指しているわけではないため、ユーザーはこの場合、pandasおよび/またはPySparkのコードを移植するためにいくつかの回避策を実行したり、Spark上のpandas APIに慣れる必要があります。このページではそれについて説明します。
pandas#
pandasのユーザーは、DataFrame.to_pandas()を呼び出すことで、完全なpandas APIにアクセスできます。pandas-on-Spark DataFrameとpandas DataFrameは似ています。しかし、前者は分散されており、後者は単一のマシン上にあります。互いに変換する際に、データは複数のマシンと単一のクライアントマシン間で転送されます。
たとえば、pandas DataFrameのpandas_df.valuesを呼び出す必要がある場合、以下のように行うことができます。
>>> import pyspark.pandas as ps
>>>
>>> psdf = ps.range(10)
>>> pdf = psdf.to_pandas()
>>> pdf.values
array([[0],
[1],
[2],
[3],
[4],
[5],
[6],
[7],
[8],
[9]])
pandas DataFrameは、以下のように簡単にpandas-on-Spark DataFrameにできます。
>>> ps.from_pandas(pdf)
id
0 0
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
pandas-on-Spark DataFrameをpandasに変換するには、すべてのデータをクライアントマシンに収集する必要があることに注意してください。したがって、可能であれば、代わりにSpark上のpandas APIまたはPySpark APIを使用することをお勧めします。
PySpark#
PySparkのユーザーは、DataFrame.to_spark()を呼び出すことで、完全なPySpark APIにアクセスできます。pandas-on-Spark DataFrameとSpark DataFrameは実質的に相互に交換可能です。
たとえば、Spark DataFrameのspark_df.filter(...)を呼び出す必要がある場合、以下のように行うことができます。
>>> import pyspark.pandas as ps
>>>
>>> psdf = ps.range(10)
>>> sdf = psdf.to_spark().filter("id > 5")
>>> sdf.show()
+---+
| id|
+---+
| 6|
| 7|
| 8|
| 9|
+---+
Spark DataFrameは、以下のように簡単にpandas-on-Spark DataFrameにできます。
>>> sdf.pandas_api()
id
0 6
1 7
2 8
3 9
ただし、Spark DataFrameからpandas-on-Spark DataFrameが作成される際に新しいデフォルトインデックスが作成されることに注意してください。Default Index Typeを参照してください。このオーバーヘッドを避けるために、可能な場合はインデックスとして使用する列を指定してください。
>>> # Create a pandas-on-Spark DataFrame with an explicit index.
... psdf = ps.DataFrame({'id': range(10)}, index=range(10))
>>> # Keep the explicit index.
... sdf = psdf.to_spark(index_col='index')
>>> # Call Spark APIs
... sdf = sdf.filter("id > 5")
>>> # Uses the explicit index to avoid to create default index.
... sdf.pandas_api(index_col='index')
id
index
6 6
7 7
8 8
9 9