PySpark における Apache Arrow#

Apache Arrow は、Spark で JVM と Python プロセスの間で効率的にデータを転送するために使用されるインメモリ列指向データフォーマットです。これは現在、Pandas/NumPy データを扱う Python ユーザーにとって最も有益です。その使用は自動ではなく、最大限の活用と互換性の確保のために、設定やコードにいくつかの軽微な変更が必要になる場合があります。このガイドでは、Spark で Arrow を使用する方法の概要と、Arrow 対応データで作業する際の相違点について説明します。

PyArrow のインストールを確認する#

PySpark で Apache Arrow を使用するには、推奨される PyArrow のバージョン をインストールする必要があります。pip を使用して PySpark をインストールする場合、SQL モジュールの追加依存関係として PyArrow を pip install pyspark[sql] コマンドで導入できます。それ以外の場合は、すべてのクラスタノードに PyArrow がインストールされて利用可能であることを確認する必要があります。pip または conda-forge チャンネルから conda を使用してインストールできます。詳細については、PyArrow のインストールを参照してください。

Arrow テーブルへの/からの変換#

Spark 4.0 以降では、SparkSession.createDataFrame() を使用して PyArrow Table から Spark DataFrame を作成できます。また、DataFrame.toArrow() を使用して Spark DataFrame を PyArrow Table に変換できます。

import pyarrow as pa
import numpy as np

# Create a PyArrow Table
table = pa.table([pa.array(np.random.rand(100)) for i in range(3)], names=["a", "b", "c"])

# Create a Spark DataFrame from the PyArrow Table
df = spark.createDataFrame(table)

# Convert the Spark DataFrame to a PyArrow Table
result_table = df.select("*").toArrow()

print(result_table.schema)
# a: double
# b: double
# c: double

注意:DataFrame.toArrow() は、DataFrame のすべてのレコードをドライバプログラムに収集する結果となるため、データの小さなサブセットに対してのみ実行することをお勧めします。現在、すべての Spark および Arrow のデータ型がサポートされているわけではなく、列にサポートされていない型が含まれている場合はエラーが発生する可能性があります。

Pandas との変換の有効化#

Arrow は、DataFrame.toPandas() の呼び出しを使用して Spark DataFrame を Pandas DataFrame に変換する場合、および SparkSession.createDataFrame() を使用して Pandas DataFrame から Spark DataFrame を作成する場合の最適化として利用できます。これらの呼び出しを実行する際に Arrow を使用するには、まず Spark 設定 spark.sql.execution.arrow.pyspark.enabledtrue に設定する必要があります。これはデフォルトでは無効になっています。

さらに、spark.sql.execution.arrow.pyspark.enabled によって有効化された最適化は、Spark 内の実際の計算の前にエラーが発生した場合、自動的に非 Arrow 最適化実装にフォールバックする可能性があります。これは spark.sql.execution.arrow.pyspark.fallback.enabled によって制御できます。

import numpy as np
import pandas as pd

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()

print("Pandas DataFrame result statistics:\n%s\n" % str(result_pdf.describe()))

上記の最適化を Arrow とともに使用すると、Arrow が有効になっていない場合と同じ結果が得られます。

注意:Arrow を使用しても、DataFrame.toPandas() は DataFrame のすべてのレコードをドライバプログラムに収集する結果となるため、データの小さなサブセットに対してのみ実行することをお勧めします。現在、すべての Spark データ型がサポートされているわけではなく、列にサポートされていない型が含まれている場合はエラーが発生する可能性があります。SparkSession.createDataFrame() 中にエラーが発生した場合、Spark は Arrow なしで DataFrame を作成するようにフォールバックします。

Pandas UDF (別名: Vectorized UDF)#

Pandas UDF は、Spark が Arrow を使用してデータを転送し、Pandas を使用してデータで操作を行うことで実行されるユーザー定義関数であり、ベクトル化された操作を可能にします。pandas_udf() をデコレーターとして使用するか、関数をラップすることで Pandas UDF を定義でき、追加の設定は不要です。Pandas UDF は、一般的に通常の PySpark 関数 API として動作します。

Spark 3.0 より前は、Pandas UDF は pyspark.sql.functions.PandasUDFType で定義されていました。Spark 3.0 以降、Python 3.6+ では、Python の型ヒントも使用できます。Python の型ヒントの使用が推奨されており、pyspark.sql.functions.PandasUDFType は将来のリリースで非推奨になります。

型ヒントはすべての場合で pandas.Series を使用する必要がありますが、入力または出力列が StructType である場合に、その入力または出力型ヒントとして pandas.DataFrame を使用する必要があるバリアントが 1 つあります。次の例は、long 列、string 列、および struct 列を受け取り、struct 列を出力する Pandas UDF を示しています。この UDF では、次のように型ヒントに pandas.Seriespandas.DataFrame を指定する必要があります。

import pandas as pd

from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")  # type: ignore[call-overload]
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:
    s3['col2'] = s1 + s2.str.len()
    return s3

# Create a Spark DataFrame that has three columns including a struct column.
df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

df.printSchema()
# root
# |-- long_column: long (nullable = true)
# |-- string_column: string (nullable = true)
# |-- struct_column: struct (nullable = true)
# |    |-- col1: string (nullable = true)

df.select(func("long_col", "string_col", "struct_col")).printSchema()
# |-- func(long_col, string_col, struct_col): struct (nullable = true)
# |    |-- col1: string (nullable = true)
# |    |-- col2: long (nullable = true)

以降のセクションでは、サポートされている型ヒントの組み合わせについて説明します。簡単にするために、pandas.DataFrame バリアントは省略します。

Series から Series へ#

型ヒントは、pandas.Series, … -> pandas.Series として表現できます。

上記の型ヒントを持つ関数で pandas_udf() を使用すると、指定された関数が 1 つ以上の pandas.Series を受け取り、1 つの pandas.Series を出力する Pandas UDF が作成されます。関数の出力は、常に少なくとも入力と同じ長さである必要があります。内部的には、PySpark は列をバッチに分割し、各バッチに対して関数をデータのサブセットとして呼び出し、結果を連結することで Pandas UDF を実行します。

次の例は、2 つの列の積を計算するこの Pandas UDF の作成方法を示しています。

import pandas as pd

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())  # type: ignore[call-overload]

# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

詳細な使用方法については、pandas_udf() を参照してください。

Series のイテレータから Series のイテレータへ#

型ヒントは、Iterator[pandas.Series] -> Iterator[pandas.Series] として表現できます。

上記の型ヒントを持つ関数で pandas_udf() を使用すると、指定された関数が pandas.Series のイテレータを受け取り、pandas.Series のイテレータを出力する Pandas UDF が作成されます。関数からの全体の出力長は、全体の入力長と同じである必要があります。したがって、長さが同じ限り、入力イテレータからデータをプリフェッチできます。この場合、作成された Pandas UDF は、Pandas UDF が呼び出されたときに 1 つの入力列を必要とします。複数の入力列を使用するには、別の型ヒントが必要です。Iterator of Multiple Series to Iterator of Series を参照してください。

また、UDF の実行で状態を初期化する必要がある場合に便利ですが、内部的には Series to Series のケースと同一に機能します。疑似コードは例を示しています。

@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Do some expensive initialization with a state
    state = very_expensive_initialization()
    for x in iterator:
        # Use that state for the whole iterator.
        yield calculate_with_state(x, state)

df.select(calculate("value")).show()

次の例は、この Pandas UDF の作成方法を示しています。

from typing import Iterator

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Declare the function and create the UDF
@pandas_udf("long")  # type: ignore[call-overload]
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in iterator:
        yield x + 1

df.select(plus_one("x")).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

詳細な使用方法については、pandas_udf() を参照してください。

複数の Series のイテレータから Series のイテレータへ#

型ヒントは、Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series] として表現できます。

pandas_udf() を上記の型ヒントを持つ関数で使用すると、指定された関数が複数の pandas.Series のタプルのイテレータを受け取り、pandas.Series のイテレータを出力する Pandas UDF が作成されます。この場合、作成された Pandas UDF は、Pandas UDF が呼び出されたときにタプルのシリーズと同じ数の複数の入力列を必要とします。それ以外の場合は、Iterator of Series to Iterator of Series のケースと同じ特性と制限があります。

次の例は、この Pandas UDF の作成方法を示しています。

from typing import Iterator, Tuple

import pandas as pd

from pyspark.sql.functions import pandas_udf

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# Declare the function and create the UDF
@pandas_udf("long")  # type: ignore[call-overload]
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

詳細な使用方法については、pandas_udf() を参照してください。

Series からスカラーへ#

型ヒントは、pandas.Series, … -> Any として表現できます。

上記の型ヒントを持つ関数で pandas_udf() を使用すると、PySpark の集計関数に似た Pandas UDF が作成されます。指定された関数は pandas.Series を受け取り、スカラー値を返します。戻り値の型はプリミティブなデータ型である必要があり、返されるスカラーは Python のプリミティブ型(例: int または float)または NumPy のデータ型(例: numpy.int64 または numpy.float64)のいずれかです。Any は、理想的には対応する特定のスカラー型であるべきです。

この UDF は、GroupedData.agg() および Window とともに使用することもできます。これは、1 つ以上の pandas.Series からスカラー値への集計を定義し、各 pandas.Series はグループまたはウィンドウ内の列を表します。

注意:このタイプの UDF は部分集計をサポートしておらず、グループまたはウィンドウのすべてのデータがメモリにロードされます。また、現在、Grouped aggregate Pandas UDF では、アンバウンドウィンドウのみがサポートされています。次の例は、このタイプの UDF を使用して、グループ化およびウィンドウ操作で平均を計算する方法を示しています。

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")  # type: ignore[call-overload]
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

詳細な使用方法については、pandas_udf() を参照してください。

Pandas Function APIs#

Pandas Function APIs は、Pandas インスタンスを使用して、Python ネイティブ関数を DataFrame 全体に直接適用できます。内部的には、Arrow を使用してデータを転送し、Pandas を使用してデータを操作するという Pandas UDF と同様の仕組みで動作し、ベクトル化された操作を可能にします。ただし、Pandas Function API は、Column ではなく、PySpark DataFrame の通常の API として動作し、Pandas Functions API の Python 型ヒントはオプションであり、現時点では内部の動作に影響しませんが、将来的には必要になる場合があります。

Spark 3.0 以降、grouped map pandas udf は、DataFrame.groupby().applyInPandas() という個別の Pandas Function API として分類されるようになりました。pyspark.sql.functions.PandasUDFType および DataFrame.groupby().apply() で引き続き使用することは可能ですが、DataFrame.groupby().applyInPandas() を直接使用することが推奨されます。pyspark.sql.functions.PandasUDFType の使用は、将来的に非推奨になります。

Grouped Map#

Pandas インスタンスを使用した Grouped map 操作は、DataFrame.groupby().applyInPandas() によってサポートされます。この操作は、pandas.DataFrame を受け取り、別の pandas.DataFrame を返す Python 関数を必要とします。各グループを Python 関数内の各 pandas.DataFrame にマップします。

この API は、「split-apply-combine」パターンを実装しており、次の 3 つのステップで構成されます。

  • DataFrame.groupBy() を使用してデータをグループに分割します。

  • 各グループに関数を適用します。関数の入力と出力は両方とも pandas.DataFrame です。入力データには、各グループのすべての行と列が含まれます。

  • 結果を新しい PySpark DataFrame に結合します。

DataFrame.groupBy().applyInPandas() を使用するには、ユーザーは次のものを定義する必要があります。

  • 各グループの計算を定義する Python 関数。

  • 出力 PySpark DataFrame のスキーマを定義する StructType オブジェクトまたは文字列。

返される pandas.DataFrame の列ラベルは、文字列として指定されている場合は定義された出力スキーマのフィールド名と一致するか、文字列でない場合(例: 整数インデックス)は位置によってフィールドデータ型と一致する必要があります。 pandas.DataFrame の構築時に列をラベル付けする方法については、pandas.DataFrame を参照してください。

注意:グループのすべてのデータは、関数が適用される前にメモリにロードされます。これは、特にグループサイズが偏っている場合、メモリ不足例外につながる可能性があります。maxRecordsPerBatch の設定はグループには適用されず、グループ化されたデータが利用可能なメモリに収まることを保証するのはユーザーの責任です。

次の例は、DataFrame.groupby().applyInPandas() を使用して、グループ内の各値から平均を減算する方法を示しています。

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+

詳細な使用方法については、GroupedData.applyInPandas() を参照してください。

Map#

DataFrame.mapInPandas() は、Pandas インスタンスを使用した Map 操作をサポートします。これは、pandas.DataFrame のイテレータを、現在の PySpark DataFrame を表す別の pandas.DataFrame のイテレータにマップし、結果を PySpark DataFrame として返します。関数は pandas.DataFrame のイテレータを受け取り、出力します。一部の Pandas UDF とは異なり、任意の長さの出力を返すことができますが、内部的には Series to Series Pandas UDF と同様に機能します。

次の例は、DataFrame.mapInPandas() の使用方法を示しています。

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def filter_func(iterator: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+

詳細な使用方法については、DataFrame.mapInPandas() を参照してください。

Co-grouped Map#

DataFrame.groupby().cogroup().applyInPandas() は、Pandas インスタンスを使用した Co-grouped map 操作をサポートします。これにより、2 つの PySpark DataFrame が共通のキーでコグループ化され、各コグループに Python 関数が適用されます。次のステップで構成されます。

  • 各データフレームのグループがキーを共有するようにデータをシャッフルし、コグループ化します。

  • 各コグループに関数を適用します。関数の入力は 2 つの pandas.DataFrame(オプションでキーを表すタプルを含む)です。関数の出力は pandas.DataFrame です。

  • すべてのグループからの pandas.DataFrame を新しい PySpark DataFrame に結合します。

groupBy().cogroup().applyInPandas() を使用するには、ユーザーは次のものを定義する必要があります。

  • 各コグループの計算を定義する Python 関数。

  • 出力 PySpark DataFrame のスキーマを定義する StructType オブジェクトまたは文字列。

返される pandas.DataFrame の列ラベルは、文字列として指定されている場合は定義された出力スキーマのフィールド名と一致するか、文字列でない場合(例: 整数インデックス)は位置によってフィールドデータ型と一致する必要があります。 pandas.DataFrame の構築時に列をラベル付けする方法については、pandas.DataFrame を参照してください。

注意:コグループのすべてのデータは、関数が適用される前にメモリにロードされます。これは、特にグループサイズが偏っている場合、メモリ不足例外につながる可能性があります。maxRecordsPerBatch の設定は適用されず、コグループ化されたデータが利用可能なメモリに収まることを保証するのはユーザーの責任です。

次の例は、DataFrame.groupby().cogroup().applyInPandas() を使用して、2 つのデータセット間で ASOF 結合を実行する方法を示しています。

import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))

df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))

def merge_ordered(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.merge_ordered(left, right)

df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    merge_ordered, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+----+
# |    time| id| v1|  v2|
# +--------+---+---+----+
# |20000101|  1|1.0|   x|
# |20000102|  1|3.0|null|
# |20000101|  2|2.0|   y|
# |20000102|  2|4.0|null|
# +--------+---+---+----+

詳細な使用方法については、PandasCogroupedOps.applyInPandas() を参照してください。

Arrow Python UDFs#

Arrow Python UDF は、行ごとに実行されるユーザー定義関数であり、Arrow を使用して効率的なバッチデータ転送とシリアライゼーションを利用します。Arrow Python UDF を定義するには、udf() デコレーターを使用するか、関数を udf() メソッドでラップし、useArrow パラメータが True に設定されていることを確認します。さらに、Spark 設定 spark.sql.execution.pythonUDF.arrow.enabled を true に設定することで、SparkSession 全体で Python UDF の Arrow 最適化を有効にできます。useArrow が未設定または None の場合にのみ Spark 設定が有効になることに注意してください。

Arrow Python UDF の型ヒントは、デフォルトのピクル化された Python UDF と同じ方法で指定する必要があります。

デフォルトのピクル化された Python UDF と Arrow Python UDF の両方の使用方法を示す例を次に示します。

from pyspark.sql.functions import udf

@udf(returnType='int')  # A default, pickled Python UDF
def slen(s):  # type: ignore[no-untyped-def]
    return len(s)

@udf(returnType='int', useArrow=True)  # An Arrow Python UDF
def arrow_slen(s):  # type: ignore[no-untyped-def]
    return len(s)

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))

df.select(slen("name"), arrow_slen("name")).show()
# +----------+----------------+
# |slen(name)|arrow_slen(name)|
# +----------+----------------+
# |         8|               8|
# +----------+----------------+

デフォルトのピクル化された Python UDF と比較して、Arrow Python UDF はより一貫した型変換メカニズムを提供します。UDF の型変換は、UDF によって返される Python インスタンスがユーザー指定の戻り型と一致しない場合に問題が発生します。デフォルトのピクル化された Python UDF の型変換には、型ミスマッチのフォールバックとして None に依存するなど、いくつかの制限があり、曖昧さやデータ損失につながる可能性があります。さらに、日付、時刻、およびタプルを文字列に変換すると、曖昧な結果が得られる場合があります。一方、Arrow Python UDF は、Arrow の機能を利用して型変換を標準化し、これらの問題を効果的に解決します。

使用上の注意#

サポートされている SQL 型#

現在、TimestampTypeArrayType を除く、すべての Spark SQL データ型が Arrow ベースの変換でサポートされています。MapType およびネストされた StructTypeArrayType は、PyArrow 2.0.0 以降でのみサポートされています。

Arrow バッチサイズの 설정#

Spark のデータパーティションは Arrow レコードバッチに変換され、JVM で一時的に高いメモリ使用量につながる可能性があります。メモリ不足例外を回避するために、conf spark.sql.execution.arrow.maxRecordsPerBatch を、各バッチの最大行数を決定する整数に設定することで、Arrow レコードバッチのサイズを調整できます。デフォルト値は 10,000 レコード/バッチです。列数が多い場合は、値に応じて調整する必要があります。この制限を使用すると、各データパーティションは処理のために 1 つ以上のレコードバッチになります。

タイムスタンプとタイムゾーンのセマンティクス#

Spark は内部的にタイムスタンプを UTC 値として保存し、タイムゾーンが指定されていないタイムスタンプデータは、マイクロ秒解像度でローカル時間から UTC に変換されます。Spark でタイムスタンプデータをエクスポートまたは表示する場合、セッションタイムゾーンを使用してタイムスタンプ値をローカライズします。セッションタイムゾーンは、設定 spark.sql.session.timeZone で設定され、設定されていない場合は JVM システムのローカルタイムゾーンにデフォルト設定されます。Pandas は、ナノ秒解像度で datetime64 型(datetime64[ns])を使用し、列ごとにオプションでタイムゾーンを設定できます。

タイムスタンプデータが Spark から Pandas に転送される場合、ナノ秒に変換され、各列は Spark セッションタイムゾーンに変換され、そのタイムゾーンにローカライズされるため、タイムゾーンは削除され、値はローカル時間として表示されます。これは、DataFrame.toPandas() またはタイムスタンプ列を持つ pandas_udf を呼び出すときに発生します。

タイムスタンプデータが Spark から PyArrow Table に転送される場合、UTC タイムゾーンでマイクロ秒解像度を維持します。これは、タイムスタンプ列で DataFrame.toArrow() を呼び出すときに発生します。

Pandas または PyArrow から Spark にタイムスタンプデータが転送される場合、UTC マイクロ秒に変換されます。これは、Pandas DataFrame または PyArrow Table で SparkSession.createDataFrame() を呼び出す場合、または pandas_udf からタイムスタンプを返す場合に発生します。これらの変換は、Spark が期待される形式でデータを持つことを保証するために自動的に行われるため、これらの変換を自分で行う必要はありません。ナノ秒の値は切り捨てられます。

注意:標準の UDF(非 Pandas)は、タイムスタンプデータを Python datetime オブジェクトとしてロードします。これは Pandas タイムスタンプとは異なります。最適なパフォーマンスを得るために、pandas_udf でタイムスタンプを扱う場合は、Pandas の時系列機能を使用することをお勧めします。詳細については、こちらを参照してください。

メモリ節約のための Arrow self_destruct の設定#

Spark 3.2 以降、Spark 設定 spark.sql.execution.arrow.pyspark.selfDestruct.enabled を使用して、PyArrow の self_destruct 機能を有効にできます。これにより、toPandas を介して Pandas DataFrame を作成する際に、Pandas DataFrame を構築しながら Arrow が割り当てたメモリを解放することでメモリを節約できます。このオプションは、toArrow を介して PyArrow Table を作成する際にもメモリを節約できます。このオプションは実験的です。toPandas と一緒に使用すると、不変のバッキング配列のために、結果の Pandas DataFrame に対する一部の操作が失敗する可能性があります。通常、ValueError: buffer source array is read-only というエラーが表示されます。新しいバージョンの Pandas は、このようなケースのサポートを改善することで、これらのエラーを修正する可能性があります。このエラーは、事前に列をコピーすることで回避できます。さらに、この変換はシングルスレッドであるため、遅くなる可能性があります。