第7章:ロードと活用 - データロード、ストレージ、ファイル形式#

[1]:
!pip install pyspark==4.0.0.dev2
Requirement already satisfied: pyspark==4.0.0.dev2 in /Users/amanda.liu/anaconda3/envs/llm-spark/lib/python3.11/site-packages (4.0.0.dev2)
Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/envs/llm-spark/lib/python3.11/site-packages (from pyspark==4.0.0.dev2) (0.10.9.7)
[2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Data Loading and Storage Example") \
    .getOrCreate()

このセクションでは、PySparkを使用してさまざまな形式のデータを読み書きする方法について説明します。一般的なファイルタイプ(CSV、JSON、Parquet、ORCなど)からデータをロードし、効率的にデータを保存する方法を学びます。

データの読み込み#

1.1 CSVファイルの読み込み#

CSVは、データ交換で最も一般的な形式の1つです。CSVファイルをDataFrameにロードする方法は次のとおりです。

[3]:
csv_df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
csv_df.show()
+-----------+-----------------+-----------------+
|Employee ID|             Role|         Location|
+-----------+-----------------+-----------------+
|      19238|     Data Analyst|      Seattle, WA|
|      19239|Software Engineer|      Seattle, WA|
|      19240|    IT Specialist|      Seattle, WA|
|      19241|     Data Analyst|     New York, NY|
|      19242|        Recruiter|San Francisco, CA|
|      19243|  Product Manager|     New York, NY|
+-----------+-----------------+-----------------+

説明: - header=True: 最初の行を列名として扱います。 - inferSchema=True: 列のデータ型を自動的に推測します。

1.2 JSONファイルの読み込み#

JSONファイルのロードは簡単で、単一行および複数行のJSON構造の両方を処理できます。

[4]:
json_df = spark.read.option("multiline", "true").json("../data/employees.json")
json_df.show()
+-----------+-----------------+-----------------+
|Employee ID|         Location|             Role|
+-----------+-----------------+-----------------+
|      19238|      Seattle, WA|     Data Analyst|
|      19239|      Seattle, WA|Software Engineer|
|      19240|      Seattle, WA|    IT Specialist|
|      19241|     New York, NY|     Data Analyst|
|      19242|San Francisco, CA|        Recruiter|
|      19243|     New York, NY|  Product Manager|
+-----------+-----------------+-----------------+

説明: - multiline="true": 複数行のJSON構造を読み込むことができます。

1.3 Parquetファイルの読み込み#

Parquetは、効率的なデータ圧縮とエンコーディングをサポートする列指向フォーマットです。

[5]:
parquet_df = spark.read.parquet("../data/employees.parquet")
parquet_df.show()
+-----------+-----------------+-----------------+
|Employee ID|         Location|             Role|
+-----------+-----------------+-----------------+
|      19239|      Seattle, WA|Software Engineer|
|      19243|     New York, NY|  Product Manager|
|      19242|San Francisco, CA|        Recruiter|
|      19241|     New York, NY|     Data Analyst|
|      19240|      Seattle, WA|    IT Specialist|
|      19238|      Seattle, WA|     Data Analyst|
+-----------+-----------------+-----------------+

ヒント: Parquetファイルは、列指向ストレージと圧縮により、データの保存に非常に効率的です。

1.4 ORCファイルの読み込み#

ORCは別の列指向ファイル形式であり、Hadoop環境でよく使用されます。

[6]:
orc_df = spark.read.orc("../data/employees.orc")
orc_df.show()
+-----------+-----------------+-----------------+
|Employee ID|         Location|             Role|
+-----------+-----------------+-----------------+
|      19242|San Francisco, CA|        Recruiter|
|      19239|      Seattle, WA|Software Engineer|
|      19240|      Seattle, WA|    IT Specialist|
|      19243|     New York, NY|  Product Manager|
|      19238|      Seattle, WA|     Data Analyst|
|      19241|     New York, NY|     Data Analyst|
+-----------+-----------------+-----------------+

データの書き込み#

2.1 データをCSVとして書き込む#

[7]:
csv_df.write.csv("../data/employees_out.csv", mode="overwrite", header=True)

説明: - mode="overwrite": ディレクトリが存在する場合、上書きされます。 - header=True: 列名を最初の行として書き込みます。

2.2 データをParquetとして書き込む#

Parquet形式は、大規模データセットに推奨されます。

[8]:
parquet_df.write.parquet("../data/employees_out.parquet", mode="overwrite")

2.3 データをORCとして書き込む#

[9]:
json_df.write.orc("../data/employees_out.orc", mode="overwrite")

ヒント: ParquetおよびORC形式は、効率的なストレージと高速な読み込みに最適です。

追加オプションと設定#

追加オプションを使用して、データの読み書き方法をカスタマイズできます。以下にいくつかの例を示します。

CSVでのカスタム区切り文字:#

[10]:
spark.read.option("delimiter", ";").csv("../data/employees.csv").show(truncate=False)
+-------------------------------------+
|_c0                                  |
+-------------------------------------+
|Employee ID,Role,Location            |
|19238,Data Analyst,"Seattle, WA"     |
|19239,Software Engineer,"Seattle, WA"|
|19240,IT Specialist,"Seattle, WA"    |
|19241,Data Analyst,"New York, NY"    |
|19242,Recruiter,"San Francisco, CA"  |
|19243,Product Manager,"New York, NY" |
+-------------------------------------+

NULL値の処理:#

[11]:
spark.read.option("nullValue", "NULL").csv("../data/employees.csv").show(truncate=False)
+-----------+-----------------+-----------------+
|_c0        |_c1              |_c2              |
+-----------+-----------------+-----------------+
|Employee ID|Role             |Location         |
|19238      |Data Analyst     |Seattle, WA      |
|19239      |Software Engineer|Seattle, WA      |
|19240      |IT Specialist    |Seattle, WA      |
|19241      |Data Analyst     |New York, NY     |
|19242      |Recruiter        |San Francisco, CA|
|19243      |Product Manager  |New York, NY     |
+-----------+-----------------+-----------------+

圧縮オプション:#

[12]:
parquet_df.write.option("compression", "gzip").parquet("../data/employees_out.parquet", mode="overwrite")

サポートされているすべての機能とオプションを確認するには、PySpark APIリファレンスの入出力セクションを参照してください。