第7章:ロードと活用 - データロード、ストレージ、ファイル形式#
[1]:
!pip install pyspark==4.0.0.dev2
Requirement already satisfied: pyspark==4.0.0.dev2 in /Users/amanda.liu/anaconda3/envs/llm-spark/lib/python3.11/site-packages (4.0.0.dev2)
Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/envs/llm-spark/lib/python3.11/site-packages (from pyspark==4.0.0.dev2) (0.10.9.7)
[2]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Data Loading and Storage Example") \
.getOrCreate()
このセクションでは、PySparkを使用してさまざまな形式のデータを読み書きする方法について説明します。一般的なファイルタイプ(CSV、JSON、Parquet、ORCなど)からデータをロードし、効率的にデータを保存する方法を学びます。
データの読み込み#
1.1 CSVファイルの読み込み#
CSVは、データ交換で最も一般的な形式の1つです。CSVファイルをDataFrameにロードする方法は次のとおりです。
[3]:
csv_df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
csv_df.show()
+-----------+-----------------+-----------------+
|Employee ID| Role| Location|
+-----------+-----------------+-----------------+
| 19238| Data Analyst| Seattle, WA|
| 19239|Software Engineer| Seattle, WA|
| 19240| IT Specialist| Seattle, WA|
| 19241| Data Analyst| New York, NY|
| 19242| Recruiter|San Francisco, CA|
| 19243| Product Manager| New York, NY|
+-----------+-----------------+-----------------+
説明: - header=True: 最初の行を列名として扱います。 - inferSchema=True: 列のデータ型を自動的に推測します。
1.2 JSONファイルの読み込み#
JSONファイルのロードは簡単で、単一行および複数行のJSON構造の両方を処理できます。
[4]:
json_df = spark.read.option("multiline", "true").json("../data/employees.json")
json_df.show()
+-----------+-----------------+-----------------+
|Employee ID| Location| Role|
+-----------+-----------------+-----------------+
| 19238| Seattle, WA| Data Analyst|
| 19239| Seattle, WA|Software Engineer|
| 19240| Seattle, WA| IT Specialist|
| 19241| New York, NY| Data Analyst|
| 19242|San Francisco, CA| Recruiter|
| 19243| New York, NY| Product Manager|
+-----------+-----------------+-----------------+
説明: - multiline="true": 複数行のJSON構造を読み込むことができます。
1.3 Parquetファイルの読み込み#
Parquetは、効率的なデータ圧縮とエンコーディングをサポートする列指向フォーマットです。
[5]:
parquet_df = spark.read.parquet("../data/employees.parquet")
parquet_df.show()
+-----------+-----------------+-----------------+
|Employee ID| Location| Role|
+-----------+-----------------+-----------------+
| 19239| Seattle, WA|Software Engineer|
| 19243| New York, NY| Product Manager|
| 19242|San Francisco, CA| Recruiter|
| 19241| New York, NY| Data Analyst|
| 19240| Seattle, WA| IT Specialist|
| 19238| Seattle, WA| Data Analyst|
+-----------+-----------------+-----------------+
ヒント: Parquetファイルは、列指向ストレージと圧縮により、データの保存に非常に効率的です。
1.4 ORCファイルの読み込み#
ORCは別の列指向ファイル形式であり、Hadoop環境でよく使用されます。
[6]:
orc_df = spark.read.orc("../data/employees.orc")
orc_df.show()
+-----------+-----------------+-----------------+
|Employee ID| Location| Role|
+-----------+-----------------+-----------------+
| 19242|San Francisco, CA| Recruiter|
| 19239| Seattle, WA|Software Engineer|
| 19240| Seattle, WA| IT Specialist|
| 19243| New York, NY| Product Manager|
| 19238| Seattle, WA| Data Analyst|
| 19241| New York, NY| Data Analyst|
+-----------+-----------------+-----------------+
データの書き込み#
2.1 データをCSVとして書き込む#
[7]:
csv_df.write.csv("../data/employees_out.csv", mode="overwrite", header=True)
説明: - mode="overwrite": ディレクトリが存在する場合、上書きされます。 - header=True: 列名を最初の行として書き込みます。
2.2 データをParquetとして書き込む#
Parquet形式は、大規模データセットに推奨されます。
[8]:
parquet_df.write.parquet("../data/employees_out.parquet", mode="overwrite")
2.3 データをORCとして書き込む#
[9]:
json_df.write.orc("../data/employees_out.orc", mode="overwrite")
ヒント: ParquetおよびORC形式は、効率的なストレージと高速な読み込みに最適です。
追加オプションと設定#
追加オプションを使用して、データの読み書き方法をカスタマイズできます。以下にいくつかの例を示します。
CSVでのカスタム区切り文字:#
[10]:
spark.read.option("delimiter", ";").csv("../data/employees.csv").show(truncate=False)
+-------------------------------------+
|_c0 |
+-------------------------------------+
|Employee ID,Role,Location |
|19238,Data Analyst,"Seattle, WA" |
|19239,Software Engineer,"Seattle, WA"|
|19240,IT Specialist,"Seattle, WA" |
|19241,Data Analyst,"New York, NY" |
|19242,Recruiter,"San Francisco, CA" |
|19243,Product Manager,"New York, NY" |
+-------------------------------------+
NULL値の処理:#
[11]:
spark.read.option("nullValue", "NULL").csv("../data/employees.csv").show(truncate=False)
+-----------+-----------------+-----------------+
|_c0 |_c1 |_c2 |
+-----------+-----------------+-----------------+
|Employee ID|Role |Location |
|19238 |Data Analyst |Seattle, WA |
|19239 |Software Engineer|Seattle, WA |
|19240 |IT Specialist |Seattle, WA |
|19241 |Data Analyst |New York, NY |
|19242 |Recruiter |San Francisco, CA|
|19243 |Product Manager |New York, NY |
+-----------+-----------------+-----------------+
圧縮オプション:#
[12]:
parquet_df.write.option("compression", "gzip").parquet("../data/employees_out.parquet", mode="overwrite")
サポートされているすべての機能とオプションを確認するには、PySpark APIリファレンスの入出力セクションを参照してください。