第1章: DataFrame - 構造化データへのビュー#
[1]:
pip install pyspark
Requirement already satisfied: pyspark in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (3.5.0)
Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (from pyspark) (0.10.9.7)
Note: you may need to restart the kernel to use updated packages.
[2]:
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
このセクションでは、PySpark の最も基本的なデータ構造である DataFrame を紹介します。
DataFrame は、型の異なる可能性のある列を持つ、2次元のラベル付きデータ構造です。DataFrame は、スプレッドシート、SQL テーブル、またはシリーズ オブジェクトの辞書のようなものと考えることができます。Apache Spark DataFrame は、豊富な API (列の選択、フィルタリング、結合、集計など) をサポートしており、一般的なデータ分析の問題を効率的に解決できます。
従来の RDBMS と比較して、Spark DataFrame はビッグデータ処理と分析においていくつかの重要な利点を提供します。
分散コンピューティング: Spark はクラスタ内の複数のノードにデータを分散させ、ビッグデータの並列処理を可能にします。
インメモリ処理: Spark はメモリ内で計算を実行するため、ディスクベースの処理よりも大幅に高速になる可能性があります。
スキーマの柔軟性: 従来のデータベースとは異なり、PySpark DataFrame はスキーマ進化と動的型付けをサポートします。
耐障害性: PySpark DataFrame は、本質的に耐障害性のある Resilient Distributed Dataset (RDD) の上に構築されています。Spark はノードの障害とデータのレプリケーションを自動的に処理し、データの信頼性と整合性を確保します。
RDD に関する注意: Spark 4.0 以降、Spark Connect では RDD の直接使用はサポートされなくなりました。Spark DataFrame と直接対話することで、単一の計画と最適化エンジンを使用でき、Databricks 上のすべてのサポート言語 (Python, SQL, Scala, R) でほぼ同等のパフォーマンスを得ることができます。
DataFrame の作成#
PySpark で DataFrame を作成するには、いくつかの方法があります。
辞書のリストから#
最も簡単な方法は、以下のように `createDataFrame()` メソッドを使用することです。
[3]:
employees = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
df.show()
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+--------+
ローカルファイルから#
ローカルの CSV ファイルから DataFrame を作成することもできます。
[4]:
df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
df.show()
+-----------+-----------------+-----------------+
|Employee ID| Role| Location|
+-----------+-----------------+-----------------+
| 19238| Data Analyst| Seattle, WA|
| 19239|Software Engineer| Seattle, WA|
| 19240| IT Specialist| Seattle, WA|
| 19241| Data Analyst| New York, NY|
| 19242| Recruiter|San Francisco, CA|
| 19243| Product Manager| New York, NY|
+-----------+-----------------+-----------------+
またはローカルの JSON ファイルから。
[5]:
df = spark.read.option("multiline","true").json("../data/employees.json")
df.show()
+-----------+-----------------+-----------------+
|Employee ID| Location| Role|
+-----------+-----------------+-----------------+
| 19238| Seattle, WA| Data Analyst|
| 19239| Seattle, WA|Software Engineer|
| 19240| Seattle, WA| IT Specialist|
| 19241| New York, NY| Data Analyst|
| 19242|San Francisco, CA| Recruiter|
| 19243| New York, NY| Product Manager|
+-----------+-----------------+-----------------+
既存の DataFrame から#
特定の列を選択することで、別の既存の DataFrame から DataFrame を作成することもできます。
[6]:
employees = [
{"name": "John D.", "age": 30, "department": "HR"},
{"name": "Alice G.", "age": 25, "department": "Finance"},
{"name": "Bob T.", "age": 35, "department": "IT"},
{"name": "Eve A.", "age": 28, "department": "Marketing"}
]
df = spark.createDataFrame(employees)
# Select only the name and age columns
new_df = df.select("name", "age")
テーブルから#
Spark 環境に既存のテーブル `table_name` がある場合、以下のように DataFrame を作成できます。
[7]:
df = spark.read.table("table_name")
データベースから#
テーブルがデータベースにある場合、JDBC を使用してテーブルを DataFrame に読み込むことができます。
[9]:
url = "jdbc:mysql://:3306/mydatabase"
table = "employees"
properties = {
"user": "username",
"password": "password"
}
# Read table into DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)
DataFrame の表示#
PySpark を使用して DataFrame を表示し、対話することができます。
DataFrame の表示#
`df.show()` は、DataFrame の内容の基本的な視覚化を表示します。上記の `createDataFrame()` の例から。
[10]:
employees = [{"name": "John D.", "age": 30},
{"name": "Alice G.", "age": 25},
{"name": "Bob T.", "age": 35},
{"name": "Eve A.", "age": 28}]
# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
[11]:
df.show()
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+--------+
`df.show()` には、`n`、`truncate`、`vertical` という 3 つのオプション引数があります。
デフォルトでは、`df.show()` は DataFrame の最初の 20 行までを表示します。show() メソッドに引数を渡すことで、表示される行数を制御できます。
[12]:
df.show(n=2)
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
+---+--------+
only showing top 2 rows
truncate 引数は、表示される列の値の長さを制御します (デフォルト値は 20)。
[13]:
df.show(truncate=3)
+---+----+
|age|name|
+---+----+
| 30| Joh|
| 25| Ali|
| 35| Bob|
| 28| Eve|
+---+----+
`vertical` を True に設定すると、DataFrame は 1 行あたり 1 つの値で垂直に表示されます。
[14]:
df.show(vertical=True)
-RECORD 0--------
age | 30
name | John D.
-RECORD 1--------
age | 25
name | Alice G.
-RECORD 2--------
age | 35
name | Bob T.
-RECORD 3--------
age | 28
name | Eve A.
DataFrame スキーマの表示#
`printSchema()` メソッドを使用して、DataFrame スキーマに関する情報を表示できます。
[15]:
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
DataFrame の操作#
DataFrame を変換する方法をいくつか見てみましょう。
詳細については、データ操作に関するセクションを参照してください。第6章: Function Junction - PySpark によるデータ操作。
列名の変更#
`withColumnRenamed()` メソッドを使用して DataFrame の列名を変更できます。
[16]:
df.show()
df2 = df.withColumnRenamed("name", "full_name")
df2.show()
+---+--------+
|age| name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+--------+
+---+---------+
|age|full_name|
+---+---------+
| 30| John D.|
| 25| Alice G.|
| 35| Bob T.|
| 28| Eve A.|
+---+---------+
行のフィルタリング#
特定の年齢範囲の従業員をフィルタリングできます。以下の `df.filter` は、年齢条件に一致する行を持つ新しい DataFrame を作成します。
[17]:
filtered_df = df.filter((df["age"] > 26) & (df["age"] < 32))
filtered_df.show()
+---+-------+
|age| name|
+---+-------+
| 30|John D.|
| 28| Eve A.|
+---+-------+
同じ結果を得るために `df.where` を使用することもできます。
[18]:
where_df = df.where((df["age"] > 26) & (df["age"] < 32))
where_df.show()
+---+-------+
|age| name|
+---+-------+
| 30|John D.|
| 28| Eve A.|
+---+-------+
DataFrame vs. テーブル#
DataFrame は、現在の Spark セッションでのみ利用可能な、不変の分散データコレクションです。
テーブルは、複数の Spark セッションでアクセスできる永続的なデータ構造です。
DataFrame をテーブルに昇格させたい場合は、`createOrReplaceTempView()` メソッドを使用できます。
[19]:
df.createOrReplaceTempView("employees")
この一時テーブルの有効期間は、この DataFrame を作成するために使用された SparkSession に結び付けられていることに注意してください。この Spark セッションを超えてテーブルを永続化するには、永続ストレージに保存する必要があります。
DataFrame を永続ストレージに保存#
PySpark で DataFrame を永続ストレージに保存するには、いくつかの方法があります。ローカル環境へのデータの保存に関する詳細については、データロードのセクションを参照してください (TODO: リンクを追加)。
ファイルベースのデータソースに保存#
ファイルベースのデータソース (テキスト、parquet、json など) の場合、カスタムテーブルパスを以下のように指定できます。
[20]:
df.write.option("path", "../dataout").saveAsTable("dataframes_savetable_example")
テーブルがドロップされても、カスタムテーブルパスとテーブルデータはそのまま残ります。
カスタムテーブルパスが指定されていない場合、Spark はウェアハウスディレクトリの下にあるデフォルトのテーブルパスにデータを書き込みます。テーブルがドロップされると、デフォルトのテーブルパスも削除されます。
Hive メタストアに保存#
Hive メタストアに保存するには、以下を使用できます。
[21]:
df.write().mode("overwrite").saveAsTable("schemaName.tableName")