第1章: DataFrame - 構造化データへのビュー#

[1]:
pip install pyspark
Requirement already satisfied: pyspark in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (3.5.0)
Requirement already satisfied: py4j==0.10.9.7 in /Users/amanda.liu/anaconda3/lib/python3.10/site-packages (from pyspark) (0.10.9.7)
Note: you may need to restart the kernel to use updated packages.
[2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

このセクションでは、PySpark の最も基本的なデータ構造である DataFrame を紹介します。

DataFrame は、型の異なる可能性のある列を持つ、2次元のラベル付きデータ構造です。DataFrame は、スプレッドシート、SQL テーブル、またはシリーズ オブジェクトの辞書のようなものと考えることができます。Apache Spark DataFrame は、豊富な API (列の選択、フィルタリング、結合、集計など) をサポートしており、一般的なデータ分析の問題を効率的に解決できます。

従来の RDBMS と比較して、Spark DataFrame はビッグデータ処理と分析においていくつかの重要な利点を提供します。

  • 分散コンピューティング: Spark はクラスタ内の複数のノードにデータを分散させ、ビッグデータの並列処理を可能にします。

  • インメモリ処理: Spark はメモリ内で計算を実行するため、ディスクベースの処理よりも大幅に高速になる可能性があります。

  • スキーマの柔軟性: 従来のデータベースとは異なり、PySpark DataFrame はスキーマ進化と動的型付けをサポートします。

  • 耐障害性: PySpark DataFrame は、本質的に耐障害性のある Resilient Distributed Dataset (RDD) の上に構築されています。Spark はノードの障害とデータのレプリケーションを自動的に処理し、データの信頼性と整合性を確保します。

RDD に関する注意: Spark 4.0 以降、Spark Connect では RDD の直接使用はサポートされなくなりました。Spark DataFrame と直接対話することで、単一の計画と最適化エンジンを使用でき、Databricks 上のすべてのサポート言語 (Python, SQL, Scala, R) でほぼ同等のパフォーマンスを得ることができます。

DataFrame の作成#

PySpark で DataFrame を作成するには、いくつかの方法があります。

辞書のリストから#

最も簡単な方法は、以下のように `createDataFrame()` メソッドを使用することです。

[3]:
employees = [{"name": "John D.", "age": 30},
  {"name": "Alice G.", "age": 25},
  {"name": "Bob T.", "age": 35},
  {"name": "Eve A.", "age": 28}]

# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
df.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

ローカルファイルから#

ローカルの CSV ファイルから DataFrame を作成することもできます。

[4]:
df = spark.read.csv("../data/employees.csv", header=True, inferSchema=True)
df.show()
+-----------+-----------------+-----------------+
|Employee ID|             Role|         Location|
+-----------+-----------------+-----------------+
|      19238|     Data Analyst|      Seattle, WA|
|      19239|Software Engineer|      Seattle, WA|
|      19240|    IT Specialist|      Seattle, WA|
|      19241|     Data Analyst|     New York, NY|
|      19242|        Recruiter|San Francisco, CA|
|      19243|  Product Manager|     New York, NY|
+-----------+-----------------+-----------------+

またはローカルの JSON ファイルから。

[5]:
df = spark.read.option("multiline","true").json("../data/employees.json")
df.show()
+-----------+-----------------+-----------------+
|Employee ID|         Location|             Role|
+-----------+-----------------+-----------------+
|      19238|      Seattle, WA|     Data Analyst|
|      19239|      Seattle, WA|Software Engineer|
|      19240|      Seattle, WA|    IT Specialist|
|      19241|     New York, NY|     Data Analyst|
|      19242|San Francisco, CA|        Recruiter|
|      19243|     New York, NY|  Product Manager|
+-----------+-----------------+-----------------+

既存の DataFrame から#

特定の列を選択することで、別の既存の DataFrame から DataFrame を作成することもできます。

[6]:
employees = [
  {"name": "John D.", "age": 30, "department": "HR"},
  {"name": "Alice G.", "age": 25, "department": "Finance"},
  {"name": "Bob T.", "age": 35, "department": "IT"},
  {"name": "Eve A.", "age": 28, "department": "Marketing"}
]
df = spark.createDataFrame(employees)

# Select only the name and age columns
new_df = df.select("name", "age")

テーブルから#

Spark 環境に既存のテーブル `table_name` がある場合、以下のように DataFrame を作成できます。

[7]:
df = spark.read.table("table_name")

データベースから#

テーブルがデータベースにある場合、JDBC を使用してテーブルを DataFrame に読み込むことができます。

[9]:
url = "jdbc:mysql://:3306/mydatabase"
table = "employees"
properties = {
  "user": "username",
  "password": "password"
}

# Read table into DataFrame
df = spark.read.jdbc(url=url, table=table, properties=properties)

DataFrame の表示#

PySpark を使用して DataFrame を表示し、対話することができます。

DataFrame の表示#

`df.show()` は、DataFrame の内容の基本的な視覚化を表示します。上記の `createDataFrame()` の例から。

[10]:
employees = [{"name": "John D.", "age": 30},
  {"name": "Alice G.", "age": 25},
  {"name": "Bob T.", "age": 35},
  {"name": "Eve A.", "age": 28}]

# Create a DataFrame containing the employees data
df = spark.createDataFrame(employees)
[11]:
df.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

`df.show()` には、`n`、`truncate`、`vertical` という 3 つのオプション引数があります。

デフォルトでは、`df.show()` は DataFrame の最初の 20 行までを表示します。show() メソッドに引数を渡すことで、表示される行数を制御できます。

[12]:
df.show(n=2)
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
+---+--------+
only showing top 2 rows

truncate 引数は、表示される列の値の長さを制御します (デフォルト値は 20)。

[13]:
df.show(truncate=3)
+---+----+
|age|name|
+---+----+
| 30| Joh|
| 25| Ali|
| 35| Bob|
| 28| Eve|
+---+----+

`vertical` を True に設定すると、DataFrame は 1 行あたり 1 つの値で垂直に表示されます。

[14]:
df.show(vertical=True)
-RECORD 0--------
 age  | 30
 name | John D.
-RECORD 1--------
 age  | 25
 name | Alice G.
-RECORD 2--------
 age  | 35
 name | Bob T.
-RECORD 3--------
 age  | 28
 name | Eve A.

DataFrame の操作#

DataFrame を変換する方法をいくつか見てみましょう。

詳細については、データ操作に関するセクションを参照してください。第6章: Function Junction - PySpark によるデータ操作

列名の変更#

`withColumnRenamed()` メソッドを使用して DataFrame の列名を変更できます。

[16]:
df.show()
df2 = df.withColumnRenamed("name", "full_name")
df2.show()
+---+--------+
|age|    name|
+---+--------+
| 30| John D.|
| 25|Alice G.|
| 35|  Bob T.|
| 28|  Eve A.|
+---+--------+

+---+---------+
|age|full_name|
+---+---------+
| 30|  John D.|
| 25| Alice G.|
| 35|   Bob T.|
| 28|   Eve A.|
+---+---------+

行のフィルタリング#

特定の年齢範囲の従業員をフィルタリングできます。以下の `df.filter` は、年齢条件に一致する行を持つ新しい DataFrame を作成します。

[17]:
filtered_df = df.filter((df["age"] > 26) & (df["age"] < 32))
filtered_df.show()
+---+-------+
|age|   name|
+---+-------+
| 30|John D.|
| 28| Eve A.|
+---+-------+

同じ結果を得るために `df.where` を使用することもできます。

[18]:
where_df = df.where((df["age"] > 26) & (df["age"] < 32))
where_df.show()
+---+-------+
|age|   name|
+---+-------+
| 30|John D.|
| 28| Eve A.|
+---+-------+

DataFrame vs. テーブル#

DataFrame は、現在の Spark セッションでのみ利用可能な、不変の分散データコレクションです。

テーブルは、複数の Spark セッションでアクセスできる永続的なデータ構造です。

DataFrame をテーブルに昇格させたい場合は、`createOrReplaceTempView()` メソッドを使用できます。

[19]:
df.createOrReplaceTempView("employees")

この一時テーブルの有効期間は、この DataFrame を作成するために使用された SparkSession に結び付けられていることに注意してください。この Spark セッションを超えてテーブルを永続化するには、永続ストレージに保存する必要があります。

DataFrame を永続ストレージに保存#

PySpark で DataFrame を永続ストレージに保存するには、いくつかの方法があります。ローカル環境へのデータの保存に関する詳細については、データロードのセクションを参照してください (TODO: リンクを追加)。

ファイルベースのデータソースに保存#

ファイルベースのデータソース (テキスト、parquet、json など) の場合、カスタムテーブルパスを以下のように指定できます。

[20]:
df.write.option("path", "../dataout").saveAsTable("dataframes_savetable_example")

テーブルがドロップされても、カスタムテーブルパスとテーブルデータはそのまま残ります。

カスタムテーブルパスが指定されていない場合、Spark はウェアハウスディレクトリの下にあるデフォルトのテーブルパスにデータを書き込みます。テーブルがドロップされると、デフォルトのテーブルパスも削除されます。

Hive メタストアに保存#

Hive メタストアに保存するには、以下を使用できます。

[21]:
df.write().mode("overwrite").saveAsTable("schemaName.tableName")