第5章:UDFとUDTFの活用#

大規模データ処理では、Sparkのネイティブ機能を拡張するためにカスタマイズが必要となることがよくあります。Pythonユーザー定義関数 (UDF)ユーザー定義テーブル関数 (UDTF) は、Pythonを使用して複雑な変換や計算を実行し、それらをSparkの分散環境にシームレスに統合する手段を提供します。

このセクションでは、Sparkの組み込み関数を超える複雑なデータ変換を実行するために、PySparkを活用してPythonでUDFとUDTFを作成し、使用する方法を探ります。

Python UDF#

Python UDFのカテゴリ#

PySparkでサポートされているUDFには、主に2つのカテゴリがあります。スカラPython UDFとPandas UDFです。

  • スカラPython UDFは、pickleまたはArrowによってシリアライズ/デシリアライズされたPythonオブジェクトを入力または出力とし、一度に1行を処理するユーザー定義のスカラー関数です。

  • Pandas UDF(別名ベクトル化UDF)は、Apache Arrowによってシリアライズ/デシリアライズされたpandas SeriesまたはDataFrameを入力/出力とし、ブロック単位で処理するUDFです。Pandas UDFには、使用方法によっていくつかのバリエーションがあり、特定の入力と出力の型があります:SeriesからSeries、SeriesからScalar、IteratorからIterator。

Pandas UDFの実装に基づき、Pandas Function APIもあります:Map(すなわちmapInPandas)と(Co)Grouped Map(すなわちapplyInPandas)、さらにArrow Function APIであるmapInArrowです。

スカラPython UDFを作成するには#

以下のコードでは、単純なスカラPython UDFを作成しました。

[6]:
from pyspark.sql.functions import udf

@udf(returnType='int')
def slen(s: str):
    return len(s)

Arrow最適化#

スカラPython UDFは、シリアライズおよびデシリアライズのためにcloudpickleに依存しており、特に大量のデータ入出力の処理においてパフォーマンスのボトルネックが発生します。パフォーマンスを大幅に向上させるために、Arrow最適化されたPython UDFを導入しました。

この最適化の中核は、標準化されたクロス言語のインメモリ列指向データ表現であるApache Arrowです。Arrowを活用することで、これらのUDFは従来の遅いデータ(デ)シリアライズ方法を回避し、JVMとPythonプロセス間の迅速なデータ交換を可能にします。Apache Arrowのリッチな型システムにより、これらの最適化されたUDFは、型変換の処理において、より一貫性があり標準化された方法を提供します。

functions.udfのブール型パラメータuseArrowを使用することで、個々のUDFに対してArrow最適化を有効にするかどうかを制御できます。例を以下に示します。

from pyspark.sql.functions import udf

@udf(returnType='int', useArrow=True)  # An Arrow Python UDF
def arrow_slen(s: str):
    ...

さらに、Spark設定spark.sql.execution.pythonUDF.arrow.enabledを介して、SparkSession全体のすべてのUDFに対してArrow最適化を有効にすることもできます。以下に例を示します。

spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)

@udf(returnType='int')  # An Arrow Python UDF
def arrow_slen(s: str):
    ...

スカラPython UDFを使用するには#

Pythonでは、組み込みSpark関数のように、列に対してUDFを直接呼び出すことができます。以下に例を示します。

[7]:
data = [("Alice",), ("Bob",), ("Charlie",)]
df = spark.createDataFrame(data, ["name"])
df.withColumn("name_length", slen(df["name"])).show()
+-------+-----------+
|   name|name_length|
+-------+-----------+
|  Alice|          5|
|    Bob|          3|
|Charlie|          7|
+-------+-----------+

Pandas UDFを作成するには#

以下のコードでは、1つのpandas.Seriesを入力とし、1つのpandas.Seriesを出力するPandas UDFを作成しました。

[8]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()

+--------------+
|to_upper(name)|
+--------------+
|      JOHN DOE|
+--------------+

Pandas UDFを使用するには#

スカラPython UDFと同様に、pandas UDFも列に対して直接呼び出すことができます。

[9]:
data = [("Alice",), ("Bob",), ("Charlie",)]
df = spark.createDataFrame(data, ["name"])
df.withColumn("name_length", to_upper(df["name"])).show()
+-------+-----------+
|   name|name_length|
+-------+-----------+
|  Alice|      ALICE|
|    Bob|        BOB|
|Charlie|    CHARLIE|
+-------+-----------+

その他の例#

例1:文字列とリストの列を持つDataFrameを処理するPython UDF#

[10]:
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import udf

data = [
    ("Hello World", [1, 2, 3]),
    ("PySpark is Fun", [4, 5, 6]),
    ("PySpark Rocks", [7, 8, 9])
]
df = spark.createDataFrame(data, ["text_column", "list_column"])

@udf(returnType="string")
def process_row(text: str, numbers):
    vowels_count = sum(1 for char in text if char in "aeiouAEIOU")
    doubled = [x * 2 for x in numbers]
    return f"Vowels: {vowels_count}, Doubled: {doubled}"

df.withColumn("process_row", process_row(df["text_column"], df["list_column"])).show(truncate=False)
+--------------+-----------+--------------------------------+
|text_column   |list_column|process_row                     |
+--------------+-----------+--------------------------------+
|Hello World   |[1, 2, 3]  |Vowels: 3, Doubled: [2, 4, 6]   |
|PySpark is Fun|[4, 5, 6]  |Vowels: 3, Doubled: [8, 10, 12] |
|PySpark Rocks |[7, 8, 9]  |Vowels: 2, Doubled: [14, 16, 18]|
+--------------+-----------+--------------------------------+

例2:統計計算と複雑な変換のためのPandas UDF#

[11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
import pandas as pd

data = [
    (10.0, "Spark"),
    (20.0, "Big Data"),
    (30.0, "AI"),
    (40.0, "Machine Learning"),
    (50.0, "Deep Learning")
]
df = spark.createDataFrame(data, ["numeric_column", "text_column"])

# Schema for the result
schema = StructType([
    StructField("mean_value", DoubleType(), True),
    StructField("sum_value", DoubleType(), True),
    StructField("processed_text", StringType(), True)
])

@pandas_udf(schema)
def compute_stats_and_transform_string(numeric_col: pd.Series, text_col: pd.Series) -> pd.DataFrame:
    mean_value = numeric_col.mean()
    sum_value = numeric_col.sum()

    # Reverse the string if its length is greater than 5, otherwise capitalize it
    processed_text = text_col.apply(lambda x: x[::-1] if len(x) > 5 else x.upper())

    result_df = pd.DataFrame({
        "mean_value": [mean_value] * len(text_col),
        "sum_value": [sum_value] * len(text_col),
        "processed_text": processed_text
    })

    return result_df

df.withColumn("result", compute_stats_and_transform_string(df["numeric_column"], df["text_column"])).show(truncate=False)
+--------------+----------------+------------------------------+
|numeric_column|text_column     |result                        |
+--------------+----------------+------------------------------+
|10.0          |Spark           |{10.0, 10.0, SPARK}           |
|20.0          |Big Data        |{20.0, 20.0, ataD giB}        |
|30.0          |AI              |{30.0, 30.0, AI}              |
|40.0          |Machine Learning|{40.0, 40.0, gninraeL enihcaM}|
|50.0          |Deep Learning   |{50.0, 50.0, gninraeL peeD}   |
+--------------+----------------+------------------------------+

Python UDTF#

Pythonユーザー定義テーブル関数 (UDTF) は、単一のスカラー結果値ではなくテーブルを出力として返す新しい種類の関数です。登録されると、SQLクエリのFROM句に表示できます。

Python UDTFを使用するタイミング#

要するに、複数の行と列を生成する関数を作成したい場合、そして豊富なPythonエコシステムを活用したい場合は、Python UDTFが適しています。

  • Python UDTF vs Python UDF:SparkのPython UDFは、それぞれゼロ個以上のスカラー値を入力として受け取り、単一の値を返すように設計されていますが、UDTFはより柔軟性を提供します。これらは複数の行と列を返すことができ、UDFの機能を拡張します。UDTFが特に役立つシナリオをいくつか紹介します。

    • 配列や構造体などのネストされたデータ型を展開し、複数の行に変換する。

    • 複数の部分に分割する必要がある文字列データを処理し、各部分を個別の行または複数の列として表現する。

    • 入力範囲に基づいて行を生成する。例えば、異なる日付の数値、タイムスタンプ、またはレコードのシーケンスを作成する。

  • Python UDTF vs SQL UDTF:SQL UDTFは効率的で汎用的ですが、Pythonはより豊富なライブラリとツールを提供します。SQLと比較して、Pythonは高度な変換や計算(例:統計関数や機械学習推論)を可能にするツールを提供します。

Python UDTFを作成するには#

以下のコードでは、2つの整数を入力として受け取り、元の数値とその二乗の2つの列を出力する単純なUDTFを作成しました。

yieldステートメントの使用に注意してください。Python UDTFは、結果が正しく処理されるように、戻り値の型としてタプルまたはRowオブジェクトを必要とします。

また、戻り値の型は、SparkでのブロックフォーマットのStructType、またはブロックフォーマットのStructTypeを表すDDL文字列である必要があります。

[12]:
from pyspark.sql.functions import udtf

@udtf(returnType="num: int, squared: int")
class SquareNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num * num)

Arrow最適化#

Apache Arrowは、JavaとPythonプロセス間で効率的なデータ転送を可能にするインメモリ列指向データフォーマットです。UDTFが多数の行を出力する場合、パフォーマンスを大幅に向上させることができます。Arrow最適化は、例えばuseArrow=Trueを使用して有効にできます。

from pyspark.sql.functions import udtf

@udtf(returnType="num: int, squared: int", useArrow=True)
class SquareNumbers:
    ...

Python UDTFを使用するには#

Pythonでは、クラス名を使用してUDTFを直接呼び出すことができます。以下に例を示します。

[13]:
from pyspark.sql.functions import lit

SquareNumbers(lit(1), lit(3)).show()
+---+-------+
|num|squared|
+---+-------+
|  1|      1|
|  2|      4|
|  3|      9|
+---+-------+

SQLでは、Python UDTFを登録し、クエリのFROM句でテーブル値関数としてSQLで使用できます。

spark.sql("SELECT * FROM square_numbers(1, 3)").show()

その他の例#

例1:範囲内の数値、その二乗、三乗、階乗を生成する#

[14]:
from pyspark.sql.functions import lit, udtf
import math

@udtf(returnType="num: int, square: int, cube: int, factorial: int")
class GenerateComplexNumbers:
    def eval(self, start: int, end: int):
        for num in range(start, end + 1):
            yield (num, num ** 2, num ** 3, math.factorial(num))

GenerateComplexNumbers(lit(1), lit(5)).show()
+---+------+----+---------+
|num|square|cube|factorial|
+---+------+----+---------+
|  1|     1|   1|        1|
|  2|     4|   8|        2|
|  3|     9|  27|        6|
|  4|    16|  64|       24|
|  5|    25| 125|      120|
+---+------+----+---------+

例2:文を単語に分割し、複数の操作を実行する#

[15]:
from pyspark.sql.functions import lit, udtf

@udtf(returnType="word: string, length: int, is_palindrome: boolean")
class ProcessWords:
    def eval(self, sentence: str):
        words = sentence.split()  # Split sentence into words
        for word in words:
            is_palindrome = word == word[::-1]  # Check if the word is a palindrome
            yield (word, len(word), is_palindrome)

ProcessWords(lit("hello world")).show()
+-----+------+-------------+
| word|length|is_palindrome|
+-----+------+-------------+
|hello|     5|        false|
|world|     5|        false|
+-----+------+-------------+

例3:JSON文字列をデータ型を持つキーと値のペアに解析する#

[16]:
import json
from pyspark.sql.functions import lit, udtf

@udtf(returnType="key: string, value: string, value_type: string")
class ParseJSON:
    def eval(self, json_str: str):
        try:
            json_data = json.loads(json_str)
            for key, value in json_data.items():
                value_type = type(value).__name__
                yield (key, str(value), value_type)
        except json.JSONDecodeError:
            yield ("Invalid JSON", "", "")

ParseJSON(lit('{"name": "Alice", "age": 25, "is_student": false}')).show()
+----------+-----+----------+
|       key|value|value_type|
+----------+-----+----------+
|      name|Alice|       str|
|       age|   25|       int|
|is_student|False|      bool|
+----------+-----+----------+