第2章:PySparkデータ型ツアー#
PySparkの基本データ型#
DataFrameスキーマの定義や効率的なデータ処理の実行には、PySparkの基本データ型を理解することが不可欠です。以下に、各型の詳細な概要、説明、Pythonでの相当するもの、および例を示します。
数値型#
ByteType -128から127までのバイト長整数の格納に使用されます。小さなデータを効率的に格納するのに理想的です。- Pythonでの相当するもの:int(-128から127) Pythonの例
[2]:
byte_example = 127 # Maximum value for a signed byte
ShortType -32768から32767までの値を格納する短い整数を表します。数値範囲の小さいデータに使用する場合、IntegerTypeよりも効率的です。- Pythonでの相当するもの:int(-32768から32767) Pythonの例
[3]:
short_example = 32767 # Maximum value for a signed short
IntegerType 整数値の格納に使用されます。カウント、インデックス、および任意の離散量に理想的です。- Pythonでの相当するもの:int(-2147483648から2147483647) Pythonの例
[4]:
integer_example = 123
LongType 大きな整数値を格納するのに適しており、識別子や大きなカウントによく使用されます。- Pythonでの相当するもの:int(-9223372036854775808から9223372036854775807) Pythonの例
[5]:
long_integer_example = 1234567890123456789
DoubleType 正確で精密な計算のための倍精度浮動小数点数を提供します。- Pythonでの相当するもの:float(倍精度) Pythonの例
[6]:
double_example = 12345.6789
FloatType 精度が低くてもパフォーマンスが優先される浮動小数点数に使用されます。- Pythonでの相当するもの:float(単精度) Pythonの例
[7]:
float_example = 123.456
DecimalType 固定精度とスケールを可能にし、金融計算など、正確な小数表現が必要なシナリオで使用されます。- Pythonでの相当するもの:decimal.Decimal Pythonの例
[8]:
from decimal import Decimal
decimal_example = Decimal('12345.6789')
StringType#
テキストデータに使用されます。Unicodeをサポートし、任意の文字列データを格納できます。- Pythonでの相当するもの:str Pythonの例
[9]:
string_example = "Hello, World!"
BinaryType#
ファイルの内容や画像などの生のバイトデータをバイナリストリームとして格納するために使用されます。- Pythonでの相当するもの:bytes Pythonの例
[10]:
binary_example = b'Hello, binary world!'
BooleanType#
ブール値を表し、条件演算やフィルタで広く使用されます。- Pythonでの相当するもの:bool Pythonの例
[11]:
boolean_example = True
日付時刻型#
DateType 時間を含まない日付に使用され、誕生日や特定の日などのカレンダー日付の格納に適しています。- Pythonでの相当するもの:datetime.date Pythonの例
[12]:
from datetime import date
date_example = date(2020, 1, 1)
TimestampType 日付と時刻の両方を格納し、ログのタイムスタンプなど、正確な時点の記録に不可欠です。- Pythonでの相当するもの:datetime.datetime Pythonの例
[13]:
from datetime import datetime
timestamp_example = datetime(2020, 1, 1, 12, 0)
PySparkでPythonオブジェクトからDataFrameを作成する#
ここでは、各基本データ型に対応するPythonオブジェクトを使用して、PySparkでスキーマを定義し、DataFrameを作成する方法を示します。
[14]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, FloatType
from pyspark.sql.types import DecimalType, StringType, BinaryType, BooleanType, DateType, TimestampType
from decimal import Decimal
from datetime import date, datetime
# Define the schema of the DataFrame
schema = StructType([
StructField("integer_field", IntegerType(), nullable=False),
StructField("long_field", LongType(), nullable=False),
StructField("double_field", DoubleType(), nullable=False),
StructField("float_field", FloatType(), nullable=False),
StructField("decimal_field", DecimalType(10, 2), nullable=False),
StructField("string_field", StringType(), nullable=False),
StructField("binary_field", BinaryType(), nullable=False),
StructField("boolean_field", BooleanType(), nullable=False),
StructField("date_field", DateType(), nullable=False),
StructField("timestamp_field", TimestampType(), nullable=False)
])
# Sample data using the Python objects corresponding to each PySpark type
data = [
(123, 1234567890123456789, 12345.6789, 123.456, Decimal('12345.67'), "Hello, World!",
b'Hello, binary world!', True, date(2020, 1, 1), datetime(2020, 1, 1, 12, 0)),
(456, 9223372036854775807, 98765.4321, 987.654, Decimal('98765.43'), "Goodbye, World!",
b'Goodbye, binary world!', False, date(2025, 12, 31), datetime(2025, 12, 31, 23, 59)),
(-1, -1234567890123456789, -12345.6789, -123.456, Decimal('-12345.67'), "Negative Values",
b'Negative binary!', False, date(1990, 1, 1), datetime(1990, 1, 1, 0, 0)),
(0, 0, 0.0, 0.0, Decimal('0.00'), "", b'', True, date(2000, 1, 1), datetime(2000, 1, 1, 0, 0))
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Show the DataFrame
df.show()
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
|integer_field| long_field|double_field|float_field|decimal_field| string_field| binary_field|boolean_field|date_field| timestamp_field|
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
| 123| 1234567890123456789| 12345.6789| 123.456| 12345.67| Hello, World!|[48 65 6C 6C 6F 2...| true|2020-01-01|2020-01-01 12:00:00|
| 456| 9223372036854775807| 98765.4321| 987.654| 98765.43|Goodbye, World!|[47 6F 6F 64 62 7...| false|2025-12-31|2025-12-31 23:59:00|
| -1|-1234567890123456789| -12345.6789| -123.456| -12345.67|Negative Values|[4E 65 67 61 74 6...| false|1990-01-01|1990-01-01 00:00:00|
| 0| 0| 0.0| 0.0| 0.00| | []| true|2000-01-01|2000-01-01 00:00:00|
+-------------+--------------------+------------+-----------+-------------+---------------+--------------------+-------------+----------+-------------------+
Double、Float、Decimalの精度#
数値データ型の精度を理解することは、特に金融分析、科学計算、工学など、高い精度を必要とする分野において、データ整合性のために重要です。PySparkはこれらのニーズに対応するためにさまざまなデータ型を提供しています。
FloatType PySparkのFloatTypeは、単精度32ビットIEEE 754浮動小数点数を表します。精度は低いですが、ストレージ容量が少なく、DoubleTypeよりも高速に処理できます。そのため、大量の数値データを迅速に処理する必要があり、極端な精度が重要でないアプリケーションに適しています。使用シナリオ:大量のデータセットを処理する際の機械学習アルゴリズムで、より高速な計算に役立ちます。
DoubleType DoubleTypeは、倍精度64ビットIEEE 754浮動小数点数に対応します。精度とパフォーマンスのバランスが良く、精度が重要なほとんどの数値計算に適しています。使用シナリオ:計算速度よりも精度が重要な金融計算に最適です。
DecimalType 高精度固定スケール小数を扱う場合にDecimalTypeを使用します。精度とスケールはユーザーが定義できるため、正確な小数表現が丸め誤差を回避するのに役立つ金融レポートなどのアプリケーションで非常に価値があります。使用シナリオ:会計アプリケーションでは、セント単位までの正確な計算が必要なため、不可欠です。
例:金融統計の計算#
この例では、PySparkでさまざまな数値データ型を使用して、収益の集計や平均の計算など、適切な精度での金融計算を実行する方法を示します。
[15]:
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, FloatType, DoubleType, DecimalType
from pyspark.sql.functions import sum, avg, col, format_number
# Define the schema of the DataFrame
schema = StructType([
StructField("revenue_float", FloatType(), nullable=False),
StructField("revenue_double", DoubleType(), nullable=False),
StructField("revenue_decimal", DecimalType(10, 2), nullable=False)
])
# Sample data
data = [
(12345.67, 12345.6789, Decimal('12345.68')),
(98765.43, 98765.4321, Decimal('98765.43')),
(54321.10, 54321.0987, Decimal('54321.10'))
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Calculations
result = df.select(
format_number(sum(col("revenue_float")), 2).alias("Total_Revenue_Float"),
format_number(avg(col("revenue_float")), 2).alias("Average_Revenue_Float"),
format_number(sum(col("revenue_double")), 2).alias("Total_Revenue_Double"),
format_number(avg(col("revenue_double")), 2).alias("Average_Revenue_Double"),
format_number(sum(col("revenue_decimal")), 2).alias("Total_Revenue_Decimal"),
format_number(avg(col("revenue_decimal")), 2).alias("Average_Revenue_Decimal")
)
result.show()
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
|Total_Revenue_Float|Average_Revenue_Float|Total_Revenue_Double|Average_Revenue_Double|Total_Revenue_Decimal|Average_Revenue_Decimal|
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
| 165,432.20| 55,144.07| 165,432.21| 55,144.07| 165,432.21| 55,144.07|
+-------------------+---------------------+--------------------+----------------------+---------------------+-----------------------+
PySparkの複合データ型#
PySparkの複合データ型は、JSON、XMLなどの現代的なデータ形式や、ビッグデータエコシステムで一般的に見られるその他の形式を扱う上で不可欠な、ネストされた構造化データの処理を容易にします。このセクションでは、PySparkで利用可能な主な複合データ型であるArrayType、StructType、MapTypeとそのユースケースを調査します。
ArrayType 1つの列に同じ型の複数の値を格納できます。タグ、カテゴリ、履歴データポイントなど、自然にリストを形成するデータに理想的です。- Pythonでの相当するもの:list Pythonの例
[16]:
array_example = ['apple', 'banana', 'cherry']
使用シナリオ:各レコードに関連付けられたアイテムのリストを管理します。例:単一の連絡先に対する複数の電話番号またはメールアドレス。
StructType DataFrameの列のネストを可能にし、単一のDataFrameセル内に複雑な階層構造のデータ構造を許可します。StructTypeの各フィールドは、それ自体が複合型になる可能性があります。DataFrameの行に似ており、構造化されたスキーマを持つレコードをカプセル化するために一般的に使用されます。- Pythonでの相当するもの:pyspark.sql.Row Pythonの例
[17]:
from pyspark.sql import Row
struct_example = Row(name="John Doe", age=30, address=Row(street="123 Elm St", city="Somewhere"))
使用シナリオ:JSONオブジェクトを表すためにしばしば使用され、各JSONフィールドをDataFrameの列のように操作できます。
MapType DataFrameの列でキーと値のペアを表し、各キーと値は任意のデータ型にすることができます。動的に構造化されたデータに役立ちます。- Pythonでの相当するもの:dict Pythonの例
[18]:
map_example = {'food': 'pizza', 'color': 'blue', 'car': 'Tesla'}
使用シナリオ:単一のDataFrame列内でキーと値のペアのコレクションを格納および処理します。例:製品の属性で、キーは属性名、値は属性値。
例:複雑なネストデータの処理#
これらの複合データ型の使用を説明するために、複数の住所やさまざまなカテゴリの好みを統合した顧客レコードなど、ネストされたデータ構造を含む実践的な例を検討してみましょう。
[19]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType
from pyspark.sql import Row
# Define the schema of the DataFrame
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("addresses", ArrayType(
StructType([
StructField("street", StringType(), nullable=False),
StructField("city", StringType(), nullable=False),
StructField("zip", StringType(), nullable=False)
])
), nullable=True),
StructField("preferences", MapType(StringType(), StringType()), nullable=True)
])
# Sample data using Row objects for StructType
data = [
Row(name="John Doe",
addresses=[Row(street="123 Elm St", city="Somewhere", zip="12345"),
Row(street="456 Oak St", city="Anywhere", zip="67890")],
preferences={"food": "pizza", "color": "blue", "car": "Tesla"}),
Row(name="Jane Smith",
addresses=[Row(street="789 Pine St", city="Everywhere", zip="10112")],
preferences={"food": "sushi", "color": "green", "car": "Honda"})
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Show the DataFrame
df.show(truncate=False)
+----------+---------------------------------------------------------------+---------------------------------------------+
|name |addresses |preferences |
+----------+---------------------------------------------------------------+---------------------------------------------+
|John Doe |[{123 Elm St, Somewhere, 12345}, {456 Oak St, Anywhere, 67890}]|{color -> blue, car -> Tesla, food -> pizza} |
|Jane Smith|[{789 Pine St, Everywhere, 10112}] |{color -> green, car -> Honda, food -> sushi}|
+----------+---------------------------------------------------------------+---------------------------------------------+
この例では、- ArrayTypeは、各顧客の複数の住所を格納するために使用されます。- StructTypeは、各住所を構造化されたレコードとして表すためにArrayType内にネストされています。- MapTypeは、動的なデータストレージを可能にするキーと値のペアとして好みを格納します。
PySparkでの列のキャスト#
列のキャストはデータ処理における基本的な操作であり、DataFrameの列のデータ型をある型から別の型に変換します。PySparkは、入力データ型をデータ処理操作またはアプリケーションの要件に合わせることを可能にする簡単なメソッドを提供します。
列のキャスト方法#
PySparkで列をキャストするには、列に対してcast()またはastype()メソッドを使用できます。以下は、基本的なキャスト操作を実行する方法を示す完全な例です。
[20]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType
# Define the schema of the DataFrame
schema = StructType([
StructField("float_column", FloatType(), nullable=True),
StructField("string_column", StringType(), nullable=True)
])
# Sample data
data = [
(123.456, "123"),
(789.012, "456"),
(None, "789")
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Display original DataFrame
print("Original DataFrame:")
df.show()
# Example of casting a float column to string
df = df.withColumn('string_from_float', col('float_column').cast('string'))
# Example of casting a string column to integer
df = df.withColumn('integer_from_string', col('string_column').cast('integer'))
# Display DataFrame after casting
print("DataFrame after Casting:")
df.show()
Original DataFrame:
+------------+-------------+
|float_column|string_column|
+------------+-------------+
| 123.456| 123|
| 789.012| 456|
| NULL| 789|
+------------+-------------+
DataFrame after Casting:
+------------+-------------+-----------------+-------------------+
|float_column|string_column|string_from_float|integer_from_string|
+------------+-------------+-----------------+-------------------+
| 123.456| 123| 123.456| 123|
| 789.012| 456| 789.012| 456|
| NULL| 789| NULL| 789|
+------------+-------------+-----------------+-------------------+
注意してキャスト:潜在的なデータ損失#
列をキャストする際には、PySparkが互換性のない、または無効なキャスト操作をどのように処理するかを認識することが重要です。
サイレントなNullへの変換- ANSIモードが無効になっている場合、PySparkは、キャスト中に値を目的の型に変換できない場合でもエラーを発生させません。代わりに、値はオーバーフローするかnullに変換されます。この動作は、データセットのデータ損失につながる可能性があり、すぐに明らかにならないことがあります。- ANSIモードが有効になっている場合、PySparkはその場合にエラーを発生させます。許容される場合は、代わりにtry_castを使用してください。
例:データ損失の確認- キャスト操作によって発生した予期しないNull、特に書式設定の問題で失敗する可能性のある文字列から数値型への変換の場合、確認することは良い習慣です。
[21]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
# Disable ANSI mode
spark.conf.set("spark.sql.ansi.enabled", False)
# Define the schema of the DataFrame
schema = StructType([
StructField("original_column", StringType(), nullable=True)
])
# Sample data
data = [
("123",), # Valid integer in string form
("abc",), # Invalid, will result in null when cast to integer
(None,) # Original null, remains null
]
# Create DataFrame
df = spark.createDataFrame(data, schema=schema)
# Display original DataFrame
print("Original DataFrame:")
df.show()
# Add a new column with casted values
df = df.withColumn('casted_column', col('original_column').cast('integer'))
# Show rows where casting resulted in nulls but the original column had data
print("DataFrame Showing Potential Data Loss:")
df.filter(col('original_column').isNotNull() & col('casted_column').isNull()).show()
spark.conf.unset("spark.sql.ansi.enabled")
Original DataFrame:
+---------------+
|original_column|
+---------------+
| 123|
| abc|
| NULL|
+---------------+
DataFrame Showing Potential Data Loss:
+---------------+-------------+
|original_column|casted_column|
+---------------+-------------+
| abc| NULL|
+---------------+-------------+
キャストのベストプラクティス#
最初にデータを検証する- 列をキャストする前、特に文字列を数値型に変換する前に、データが予期された形式に準拠していることを確認するために、データを検証およびクリーニングしてください。
例:整数へのキャストの前に、数値文字列が正しくフォーマットされているかを確認する
[22]:
from pyspark.sql.functions import col, regexp_extract
# Sample DataFrame with a string column
df = spark.createDataFrame([("100",), ("20x",), ("300",)], ["data"])
# Checking and filtering rows where data can be safely cast to an integer
valid_df = df.filter(regexp_extract(col("data"), '^[0-9]+$', 0) != "")
valid_df.show()
+----+
|data|
+----+
| 100|
| 300|
+----+
明示的なスキーマを使用する- データを読み込む際は、間違ったデータ型推論を避けるために明示的なスキーマを使用してください。これにより、キャストの必要性を最小限に抑えることができます。
例:データを読み込む際にスキーマを指定して、最初から正しいデータ型が適用されることを確認する
[23]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
# Define a schema
schema = StructType([
StructField("Employee ID", IntegerType(), True),
StructField("Role", StringType(), True),
StructField("Location", StringType(), True)
])
# Read data with an explicit schema
df = spark.read.csv("../data/employees.csv", schema=schema)
df.printSchema()
root
|-- Employee ID: integer (nullable = true)
|-- Role: string (nullable = true)
|-- Location: string (nullable = true)
PySparkでの半構造化データ処理#
このセクションでは、JSONおよびXMLに焦点を当てたPySparkの半構造化データ形式の処理機能を探り、一部のSQLデータベースで一般的に使用されるVARIANT型のようなデータの管理方法について説明します。
JSON処理#
JSONは、Webサービスやデータ交換で広く使用されている形式です。PySparkはJSONデータを構造化されたDataFrameに解析することを簡素化し、操作と分析を容易にします。
主要関数- from_json():JSON文字列を構造化データ型を持つDataFrame列に変換します。- to_json():DataFrameの列をJSON文字列に変換します。
例:JSON文字列の解析
[24]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
json_schema = StructType([
StructField("name", StringType()),
StructField("age", IntegerType())
])
df = spark.createDataFrame([("{\"name\":\"John\", \"age\":30}",), ("{\"name\":\"Jane\", \"age\":25}",)], ["json_str"])
df.select(from_json(col("json_str"), json_schema).alias("parsed_json")).show()
+-----------+
|parsed_json|
+-----------+
| {John, 30}|
| {Jane, 25}|
+-----------+
例:JSONデータの読み込みと処理
[25]:
df = spark.read.json('../data/books.json')
df.select("author", "title", "genre").show()
+-------------+--------------------+---------+
| author| title| genre|
+-------------+--------------------+---------+
|George Orwell| 1984|Dystopian|
| Jane Austen| Pride and Prejudice| Romance|
| Mark Twain|Adventures of Huc...| Fiction|
+-------------+--------------------+---------+
XML処理#
注:このセクションはSpark 4.0に適用されます。
XMLは半構造化データのもう1つの一般的な形式であり、さまざまなエンタープライズアプリケーションで広く使用されています。例:XMLデータの読み込みと処理
[26]:
df = spark.read \
.format('xml') \
.option('rowTag', 'book') \
.load('../data/books.xml')
df.select("author", "title", "genre").show()
+-------------+--------------------+---------+
| author| title| genre|
+-------------+--------------------+---------+
|George Orwell| 1984|Dystopian|
| Jane Austen| Pride and Prejudice| Romance|
| Mark Twain|Adventures of Huc...| Fiction|
+-------------+--------------------+---------+
PySparkでのVARIANTデータ型の処理#
注:このセクションはSpark 4.0に適用されます。
VARIANTデータ型の導入により、半構造化データの処理がより効率的になりました。VARIANT型は、JSONやXMLのように固定スキーマに準拠しないデータをDataFrame列に直接格納するように設計されています。
PySparkにおけるVARIANTの機能- **柔軟性**:VARIANT型は、事前定義されたスキーマ制約なしで、JSONやXMLのようなデータ構造を格納でき、データ取り込みと操作に高い柔軟性を提供します。- **統合**:半構造化データを使用するシステムとの統合が向上し、より直接的なデータ交換とクエリが可能になります。
VARIANT使用時の考慮事項- **パフォーマンス**:VARIANTは柔軟性を提供しますが、動的な性質によりパフォーマンスに影響を与える可能性があります。VARIANT型を含むデータ操作のテストと最適化が重要です。- **互換性**:このデータ型を利用する場合、特に外部システムにデータをエクスポートする際は、データパイプラインのすべての部分がVARIANTをサポートしていることを確認してください。
実践例:VARIANTを使用したJSONデータの処理この例では、PySparkでVARIANTを使用してJSONデータを効果的に処理する方法を示します。
[27]:
from datetime import date, datetime
from decimal import Decimal
from pyspark.sql.functions import try_parse_json, try_variant_get, col
# Sample JSON data
data = [
'1234567890123456789',
'12345.6789',
'"Hello, World!"',
'true',
'{"id": 1, "attributes": {"key1": "value1", "key2": "value2"}}',
'{"id": 2, "attributes": {"key1": "value3", "key2": "value4"}}',
]
# Load data into DataFrame with VARIANT
df = spark.createDataFrame(data, StringType()).select(try_parse_json(col("value")).alias("variant_data"))
df.printSchema()
df.show(truncate=False)
# Accessing elements inside the VARIANT
df.select(
try_variant_get(col("variant_data"), "$", "long").alias("long_value"),
try_variant_get(col("variant_data"), "$.id", "int").alias("id"),
try_variant_get(col("variant_data"), "$.attributes.key1", "string").alias("key1"),
try_variant_get(col("variant_data"), "$.attributes.key2", "string").alias("key2"),
).show()
# Collect data and convert to Python objects
[row["variant_data"].toPython() for row in df.collect()]
root
|-- variant_data: variant (nullable = true)
+-------------------------------------------------------+
|variant_data |
+-------------------------------------------------------+
|1234567890123456789 |
|12345.6789 |
|"Hello, World!" |
|true |
|{"attributes":{"key1":"value1","key2":"value2"},"id":1}|
|{"attributes":{"key1":"value3","key2":"value4"},"id":2}|
+-------------------------------------------------------+
+-------------------+----+------+------+
| long_value| id| key1| key2|
+-------------------+----+------+------+
|1234567890123456789|NULL| NULL| NULL|
| 12345|NULL| NULL| NULL|
| NULL|NULL| NULL| NULL|
| 1|NULL| NULL| NULL|
| NULL| 1|value1|value2|
| NULL| 2|value3|value4|
+-------------------+----+------+------+
[27]:
[1234567890123456789,
Decimal('12345.6789'),
'Hello, World!',
True,
{'attributes': {'key1': 'value1', 'key2': 'value2'}, 'id': 1},
{'attributes': {'key1': 'value3', 'key2': 'value4'}, 'id': 2}]