PySpark における Apache Arrow#
Apache Arrow は、Spark で JVM と Python プロセスの間で効率的にデータを転送するために使用されるインメモリ列指向データフォーマットです。これは現在、Pandas/NumPy データを扱う Python ユーザーにとって最も有益です。その使用は自動ではなく、最大限の活用と互換性の確保のために、設定やコードにいくつかの軽微な変更が必要になる場合があります。このガイドでは、Spark で Arrow を使用する方法の概要と、Arrow 対応データで作業する際の相違点について説明します。
PyArrow のインストールを確認する#
PySpark で Apache Arrow を使用するには、推奨される PyArrow のバージョン をインストールする必要があります。pip を使用して PySpark をインストールする場合、SQL モジュールの追加依存関係として PyArrow を pip install pyspark[sql] コマンドで導入できます。それ以外の場合は、すべてのクラスタノードに PyArrow がインストールされて利用可能であることを確認する必要があります。pip または conda-forge チャンネルから conda を使用してインストールできます。詳細については、PyArrow のインストールを参照してください。
Arrow テーブルへの/からの変換#
Spark 4.0 以降では、SparkSession.createDataFrame() を使用して PyArrow Table から Spark DataFrame を作成できます。また、DataFrame.toArrow() を使用して Spark DataFrame を PyArrow Table に変換できます。
import pyarrow as pa
import numpy as np
# Create a PyArrow Table
table = pa.table([pa.array(np.random.rand(100)) for i in range(3)], names=["a", "b", "c"])
# Create a Spark DataFrame from the PyArrow Table
df = spark.createDataFrame(table)
# Convert the Spark DataFrame to a PyArrow Table
result_table = df.select("*").toArrow()
print(result_table.schema)
# a: double
# b: double
# c: double
注意:DataFrame.toArrow() は、DataFrame のすべてのレコードをドライバプログラムに収集する結果となるため、データの小さなサブセットに対してのみ実行することをお勧めします。現在、すべての Spark および Arrow のデータ型がサポートされているわけではなく、列にサポートされていない型が含まれている場合はエラーが発生する可能性があります。
Pandas との変換の有効化#
Arrow は、DataFrame.toPandas() の呼び出しを使用して Spark DataFrame を Pandas DataFrame に変換する場合、および SparkSession.createDataFrame() を使用して Pandas DataFrame から Spark DataFrame を作成する場合の最適化として利用できます。これらの呼び出しを実行する際に Arrow を使用するには、まず Spark 設定 spark.sql.execution.arrow.pyspark.enabled を true に設定する必要があります。これはデフォルトでは無効になっています。
さらに、spark.sql.execution.arrow.pyspark.enabled によって有効化された最適化は、Spark 内の実際の計算の前にエラーが発生した場合、自動的に非 Arrow 最適化実装にフォールバックする可能性があります。これは spark.sql.execution.arrow.pyspark.fallback.enabled によって制御できます。
import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()
print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))
上記の最適化を Arrow とともに使用すると、Arrow が有効になっていない場合と同じ結果が得られます。
注意:Arrow を使用しても、DataFrame.toPandas() は DataFrame のすべてのレコードをドライバプログラムに収集する結果となるため、データの小さなサブセットに対してのみ実行することをお勧めします。現在、すべての Spark データ型がサポートされているわけではなく、列にサポートされていない型が含まれている場合はエラーが発生する可能性があります。SparkSession.createDataFrame() 中にエラーが発生した場合、Spark は Arrow なしで DataFrame を作成するようにフォールバックします。
Pandas UDF (別名: Vectorized UDF)#
Pandas UDF は、Spark が Arrow を使用してデータを転送し、Pandas を使用してデータで操作を行うことで実行されるユーザー定義関数であり、ベクトル化された操作を可能にします。pandas_udf() をデコレーターとして使用するか、関数をラップすることで Pandas UDF を定義でき、追加の設定は不要です。Pandas UDF は、一般的に通常の PySpark 関数 API として動作します。
Spark 3.0 より前は、Pandas UDF は pyspark.sql.functions.PandasUDFType で定義されていました。Spark 3.0 以降、Python 3.6+ では、Python の型ヒントも使用できます。Python の型ヒントの使用が推奨されており、pyspark.sql.functions.PandasUDFType は将来のリリースで非推奨になります。
型ヒントはすべての場合で pandas.Series を使用する必要がありますが、入力または出力列が StructType である場合に、その入力または出力型ヒントとして pandas.DataFrame を使用する必要があるバリアントが 1 つあります。次の例は、long 列、string 列、および struct 列を受け取り、struct 列を出力する Pandas UDF を示しています。この UDF では、次のように型ヒントに pandas.Series と pandas.DataFrame を指定する必要があります。
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("col1 string, col2 long") # type: ignore[call-overload]
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
s3['col2'] = s1 + s2.str.len()
return s3
# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
[[1, "a string", ("a nested string",)]],
"long_col long, string_col string, struct_col struct<col1:string>")
df.printSchema()
# root
# |-- long_column: long (nullable = true)
# |-- string_column: string (nullable = true)
# |-- struct_column: struct (nullable = true)
# | |-- col1: string (nullable = true)
df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# | |-- col1: string (nullable = true)
# | |-- col2: long (nullable = true)
以降のセクションでは、サポートされている型ヒントの組み合わせについて説明します。簡単にするために、pandas.DataFrame バリアントは省略します。
Series から Series へ#
型ヒントは、pandas.Series, … -> pandas.Series として表現できます。
上記の型ヒントを持つ関数で pandas_udf() を使用すると、指定された関数が 1 つ以上の pandas.Series を受け取り、1 つの pandas.Series を出力する Pandas UDF が作成されます。関数の出力は、常に少なくとも入力と同じ長さである必要があります。内部的には、PySpark は列をバッチに分割し、各バッチに対して関数をデータのサブセットとして呼び出し、結果を連結することで Pandas UDF を実行します。
次の例は、2 つの列の積を計算するこの Pandas UDF の作成方法を示しています。
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType()) # type: ignore[call-overload]
# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
詳細な使用方法については、pandas_udf() を参照してください。
Series のイテレータから Series のイテレータへ#
型ヒントは、Iterator[pandas.Series] -> Iterator[pandas.Series] として表現できます。
上記の型ヒントを持つ関数で pandas_udf() を使用すると、指定された関数が pandas.Series のイテレータを受け取り、pandas.Series のイテレータを出力する Pandas UDF が作成されます。関数からの全体の出力長は、全体の入力長と同じである必要があります。したがって、長さが同じ限り、入力イテレータからデータをプリフェッチできます。この場合、作成された Pandas UDF は、Pandas UDF が呼び出されたときに 1 つの入力列を必要とします。複数の入力列を使用するには、別の型ヒントが必要です。Iterator of Multiple Series to Iterator of Series を参照してください。
また、UDF の実行で状態を初期化する必要がある場合に便利ですが、内部的には Series to Series のケースと同一に機能します。疑似コードは例を示しています。
@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
# Do some expensive initialization with a state
state = very_expensive_initialization()
for x in iterator:
# Use that state for the whole iterator.
yield calculate_with_state(x, state)
df.select(calculate("value")).show()
次の例は、この Pandas UDF の作成方法を示しています。
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# Declare the function and create the UDF
@pandas_udf("long") # type: ignore[call-overload]
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in iterator:
yield x + 1
df.select(plus_one("x")).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
詳細な使用方法については、pandas_udf() を参照してください。
複数の Series のイテレータから Series のイテレータへ#
型ヒントは、Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series] として表現できます。
pandas_udf() を上記の型ヒントを持つ関数で使用すると、指定された関数が複数の pandas.Series のタプルのイテレータを受け取り、pandas.Series のイテレータを出力する Pandas UDF が作成されます。この場合、作成された Pandas UDF は、Pandas UDF が呼び出されたときにタプルのシリーズと同じ数の複数の入力列を必要とします。それ以外の場合は、Iterator of Series to Iterator of Series のケースと同じ特性と制限があります。
次の例は、この Pandas UDF の作成方法を示しています。
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# Declare the function and create the UDF
@pandas_udf("long") # type: ignore[call-overload]
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
詳細な使用方法については、pandas_udf() を参照してください。
Series からスカラーへ#
型ヒントは、pandas.Series, … -> Any として表現できます。
上記の型ヒントを持つ関数で pandas_udf() を使用すると、PySpark の集計関数に似た Pandas UDF が作成されます。指定された関数は pandas.Series を受け取り、スカラー値を返します。戻り値の型はプリミティブなデータ型である必要があり、返されるスカラーは Python のプリミティブ型(例: int または float)または NumPy のデータ型(例: numpy.int64 または numpy.float64)のいずれかです。Any は、理想的には対応する特定のスカラー型であるべきです。
この UDF は、GroupedData.agg() および Window とともに使用することもできます。これは、1 つ以上の pandas.Series からスカラー値への集計を定義し、各 pandas.Series はグループまたはウィンドウ内の列を表します。
注意:このタイプの UDF は部分集計をサポートしておらず、グループまたはウィンドウのすべてのデータがメモリにロードされます。また、現在、Grouped aggregate Pandas UDF では、アンバウンドウィンドウのみがサポートされています。次の例は、このタイプの UDF を使用して、グループ化およびウィンドウ操作で平均を計算する方法を示しています。
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double") # type: ignore[call-overload]
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
詳細な使用方法については、pandas_udf() を参照してください。
Pandas Function APIs#
Pandas Function APIs は、Pandas インスタンスを使用して、Python ネイティブ関数を DataFrame 全体に直接適用できます。内部的には、Arrow を使用してデータを転送し、Pandas を使用してデータを操作するという Pandas UDF と同様の仕組みで動作し、ベクトル化された操作を可能にします。ただし、Pandas Function API は、Column ではなく、PySpark DataFrame の通常の API として動作し、Pandas Functions API の Python 型ヒントはオプションであり、現時点では内部の動作に影響しませんが、将来的には必要になる場合があります。
Spark 3.0 以降、grouped map pandas udf は、DataFrame.groupby().applyInPandas() という個別の Pandas Function API として分類されるようになりました。pyspark.sql.functions.PandasUDFType および DataFrame.groupby().apply() で引き続き使用することは可能ですが、DataFrame.groupby().applyInPandas() を直接使用することが推奨されます。pyspark.sql.functions.PandasUDFType の使用は、将来的に非推奨になります。
Grouped Map#
Pandas インスタンスを使用した Grouped map 操作は、DataFrame.groupby().applyInPandas() によってサポートされます。この操作は、pandas.DataFrame を受け取り、別の pandas.DataFrame を返す Python 関数を必要とします。各グループを Python 関数内の各 pandas.DataFrame にマップします。
この API は、「split-apply-combine」パターンを実装しており、次の 3 つのステップで構成されます。
DataFrame.groupBy()を使用してデータをグループに分割します。各グループに関数を適用します。関数の入力と出力は両方とも
pandas.DataFrameです。入力データには、各グループのすべての行と列が含まれます。結果を新しい PySpark DataFrame に結合します。
DataFrame.groupBy().applyInPandas() を使用するには、ユーザーは次のものを定義する必要があります。
各グループの計算を定義する Python 関数。
出力 PySpark DataFrame のスキーマを定義する
StructTypeオブジェクトまたは文字列。
返される pandas.DataFrame の列ラベルは、文字列として指定されている場合は定義された出力スキーマのフィールド名と一致するか、文字列でない場合(例: 整数インデックス)は位置によってフィールドデータ型と一致する必要があります。 pandas.DataFrame の構築時に列をラベル付けする方法については、pandas.DataFrame を参照してください。
注意:グループのすべてのデータは、関数が適用される前にメモリにロードされます。これは、特にグループサイズが偏っている場合、メモリ不足例外につながる可能性があります。maxRecordsPerBatch の設定はグループには適用されず、グループ化されたデータが利用可能なメモリに収まることを保証するのはユーザーの責任です。
次の例は、DataFrame.groupby().applyInPandas() を使用して、グループ内の各値から平均を減算する方法を示しています。
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
詳細な使用方法については、GroupedData.applyInPandas() を参照してください。
Map#
DataFrame.mapInPandas() は、Pandas インスタンスを使用した Map 操作をサポートします。これは、pandas.DataFrame のイテレータを、現在の PySpark DataFrame を表す別の pandas.DataFrame のイテレータにマップし、結果を PySpark DataFrame として返します。関数は pandas.DataFrame のイテレータを受け取り、出力します。一部の Pandas UDF とは異なり、任意の長さの出力を返すことができますが、内部的には Series to Series Pandas UDF と同様に機能します。
次の例は、DataFrame.mapInPandas() の使用方法を示しています。
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
詳細な使用方法については、DataFrame.mapInPandas() を参照してください。
Co-grouped Map#
DataFrame.groupby().cogroup().applyInPandas() は、Pandas インスタンスを使用した Co-grouped map 操作をサポートします。これにより、2 つの PySpark DataFrame が共通のキーでコグループ化され、各コグループに Python 関数が適用されます。次のステップで構成されます。
各データフレームのグループがキーを共有するようにデータをシャッフルし、コグループ化します。
各コグループに関数を適用します。関数の入力は 2 つの
pandas.DataFrame(オプションでキーを表すタプルを含む)です。関数の出力はpandas.DataFrameです。すべてのグループからの
pandas.DataFrameを新しい PySpark DataFrame に結合します。
groupBy().cogroup().applyInPandas() を使用するには、ユーザーは次のものを定義する必要があります。
各コグループの計算を定義する Python 関数。
出力 PySpark DataFrame のスキーマを定義する
StructTypeオブジェクトまたは文字列。
返される pandas.DataFrame の列ラベルは、文字列として指定されている場合は定義された出力スキーマのフィールド名と一致するか、文字列でない場合(例: 整数インデックス)は位置によってフィールドデータ型と一致する必要があります。 pandas.DataFrame の構築時に列をラベル付けする方法については、pandas.DataFrame を参照してください。
注意:コグループのすべてのデータは、関数が適用される前にメモリにロードされます。これは、特にグループサイズが偏っている場合、メモリ不足例外につながる可能性があります。maxRecordsPerBatch の設定は適用されず、コグループ化されたデータが利用可能なメモリに収まることを保証するのはユーザーの責任です。
次の例は、DataFrame.groupby().cogroup().applyInPandas() を使用して、2 つのデータセット間で ASOF 結合を実行する方法を示しています。
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
return pd.merge_ordered(left, right)
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
merge_ordered, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+----+
# | time| id| v1| v2|
# +--------+---+---+----+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0|null|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0|null|
# +--------+---+---+----+
詳細な使用方法については、PandasCogroupedOps.applyInPandas() を参照してください。
Arrow Python UDFs#
Arrow Python UDF は、行ごとに実行されるユーザー定義関数であり、Arrow を使用して効率的なバッチデータ転送とシリアライゼーションを利用します。Arrow Python UDF を定義するには、udf() デコレーターを使用するか、関数を udf() メソッドでラップし、useArrow パラメータが True に設定されていることを確認します。さらに、Spark 設定 spark.sql.execution.pythonUDF.arrow.enabled を true に設定することで、SparkSession 全体で Python UDF の Arrow 最適化を有効にできます。useArrow が未設定または None の場合にのみ Spark 設定が有効になることに注意してください。
Arrow Python UDF の型ヒントは、デフォルトのピクル化された Python UDF と同じ方法で指定する必要があります。
デフォルトのピクル化された Python UDF と Arrow Python UDF の両方の使用方法を示す例を次に示します。
from pyspark.sql.functions import udf
@udf(returnType='int') # A default, pickled Python UDF
def slen(s): # type: ignore[no-untyped-def]
return len(s)
@udf(returnType='int', useArrow=True) # An Arrow Python UDF
def arrow_slen(s): # type: ignore[no-untyped-def]
return len(s)
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name"), arrow_slen("name")).show()
# +----------+----------------+
# |slen(name)|arrow_slen(name)|
# +----------+----------------+
# | 8| 8|
# +----------+----------------+
デフォルトのピクル化された Python UDF と比較して、Arrow Python UDF はより一貫した型変換メカニズムを提供します。UDF の型変換は、UDF によって返される Python インスタンスがユーザー指定の戻り型と一致しない場合に問題が発生します。デフォルトのピクル化された Python UDF の型変換には、型ミスマッチのフォールバックとして None に依存するなど、いくつかの制限があり、曖昧さやデータ損失につながる可能性があります。さらに、日付、時刻、およびタプルを文字列に変換すると、曖昧な結果が得られる場合があります。一方、Arrow Python UDF は、Arrow の機能を利用して型変換を標準化し、これらの問題を効果的に解決します。
使用上の注意#
サポートされている SQL 型#
現在、TimestampType の ArrayType を除く、すべての Spark SQL データ型が Arrow ベースの変換でサポートされています。MapType およびネストされた StructType の ArrayType は、PyArrow 2.0.0 以降でのみサポートされています。
Arrow バッチサイズの 설정#
Spark のデータパーティションは Arrow レコードバッチに変換され、JVM で一時的に高いメモリ使用量につながる可能性があります。メモリ不足例外を回避するために、conf spark.sql.execution.arrow.maxRecordsPerBatch を、各バッチの最大行数を決定する整数に設定することで、Arrow レコードバッチのサイズを調整できます。デフォルト値は 10,000 レコード/バッチです。列数が多い場合は、値に応じて調整する必要があります。この制限を使用すると、各データパーティションは処理のために 1 つ以上のレコードバッチになります。
タイムスタンプとタイムゾーンのセマンティクス#
Spark は内部的にタイムスタンプを UTC 値として保存し、タイムゾーンが指定されていないタイムスタンプデータは、マイクロ秒解像度でローカル時間から UTC に変換されます。Spark でタイムスタンプデータをエクスポートまたは表示する場合、セッションタイムゾーンを使用してタイムスタンプ値をローカライズします。セッションタイムゾーンは、設定 spark.sql.session.timeZone で設定され、設定されていない場合は JVM システムのローカルタイムゾーンにデフォルト設定されます。Pandas は、ナノ秒解像度で datetime64 型(datetime64[ns])を使用し、列ごとにオプションでタイムゾーンを設定できます。
タイムスタンプデータが Spark から Pandas に転送される場合、ナノ秒に変換され、各列は Spark セッションタイムゾーンに変換され、そのタイムゾーンにローカライズされるため、タイムゾーンは削除され、値はローカル時間として表示されます。これは、DataFrame.toPandas() またはタイムスタンプ列を持つ pandas_udf を呼び出すときに発生します。
タイムスタンプデータが Spark から PyArrow Table に転送される場合、UTC タイムゾーンでマイクロ秒解像度を維持します。これは、タイムスタンプ列で DataFrame.toArrow() を呼び出すときに発生します。
Pandas または PyArrow から Spark にタイムスタンプデータが転送される場合、UTC マイクロ秒に変換されます。これは、Pandas DataFrame または PyArrow Table で SparkSession.createDataFrame() を呼び出す場合、または pandas_udf からタイムスタンプを返す場合に発生します。これらの変換は、Spark が期待される形式でデータを持つことを保証するために自動的に行われるため、これらの変換を自分で行う必要はありません。ナノ秒の値は切り捨てられます。
注意:標準の UDF(非 Pandas)は、タイムスタンプデータを Python datetime オブジェクトとしてロードします。これは Pandas タイムスタンプとは異なります。最適なパフォーマンスを得るために、pandas_udf でタイムスタンプを扱う場合は、Pandas の時系列機能を使用することをお勧めします。詳細については、こちらを参照してください。
推奨される Pandas および PyArrow のバージョン#
pyspark.sql との連携には、Pandas の最小サポートバージョンは 2.0.0、PyArrow は 11.0.0 です。これらより高いバージョンも使用できますが、互換性とデータの正確性は保証されず、ユーザーが検証する必要があります。
メモリ節約のための Arrow self_destruct の設定#
Spark 3.2 以降、Spark 設定 spark.sql.execution.arrow.pyspark.selfDestruct.enabled を使用して、PyArrow の self_destruct 機能を有効にできます。これにより、toPandas を介して Pandas DataFrame を作成する際に、Pandas DataFrame を構築しながら Arrow が割り当てたメモリを解放することでメモリを節約できます。このオプションは、toArrow を介して PyArrow Table を作成する際にもメモリを節約できます。このオプションは実験的です。toPandas と一緒に使用すると、不変のバッキング配列のために、結果の Pandas DataFrame に対する一部の操作が失敗する可能性があります。通常、ValueError: buffer source array is read-only というエラーが表示されます。新しいバージョンの Pandas は、このようなケースのサポートを改善することで、これらのエラーを修正する可能性があります。このエラーは、事前に列をコピーすることで回避できます。さらに、この変換はシングルスレッドであるため、遅くなる可能性があります。