クイックスタート: DataFrame#

これは PySpark DataFrame API の短い紹介とクイックスタートです。PySpark DataFrame は遅延評価されます。これは RDD の上に実装されています。Spark がデータを 変換 する際、すぐに変換を計算するのではなく、後でどのように計算するかを計画します。 collect() のような アクション が明示的に呼び出されると、計算が開始されます。このノートブックは、主に新規ユーザーを対象とした DataFrame の基本的な使い方を示しています。これらの例の最新バージョンは、クイックスタートページ の「Live Notebook: DataFrame」でご自身で実行できます。

Apache Spark ドキュメントサイトには、他にも役立つ情報があります。Spark SQL and DataFramesRDD Programming GuideStructured Streaming Programming GuideSpark Streaming Programming Guide、および Machine Learning Library (MLlib) Guide の最新バージョンを参照してください。

PySpark アプリケーションは、以下のように SparkSession を初期化することから始まります。これは PySpark のエントリポイントです。pyspark 実行可能ファイル経由で PySpark シェルで実行する場合、シェルは自動的にユーザーのために `spark` 変数にセッションを作成します。

[1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

DataFrame の作成#

PySpark DataFrame は、通常、リストのリスト、タプル、辞書、および pyspark.sql.Rowpandas DataFrame、またはそのようなリストを含む RDD を渡すことで、pyspark.sql.SparkSession.createDataFrame を介して作成できます。 pyspark.sql.SparkSession.createDataFrame は、DataFrame のスキーマを指定するために schema 引数を受け取ります。これが省略されると、PySpark はデータからサンプルを取得して対応するスキーマを推測します。

まず、行のリストから PySpark DataFrame を作成できます。

[2]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
[2]:
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

明示的なスキーマを持つ PySpark DataFrame を作成します。

[3]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df
[3]:
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

pandas DataFrame から PySpark DataFrame を作成します。

[4]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df
[4]:
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

上記で作成された DataFrame はすべて同じ結果とスキーマを持っています。

[6]:
# All DataFrames above result same.
df.show()
df.printSchema()
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)

データの表示#

DataFrame の先頭行は、DataFrame.show() を使用して表示できます。

[7]:
df.show(1)
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row

Alternatively, you can enable spark.sql.repl.eagerEval.enabled configuration for the eager evaluation of PySpark DataFrame in notebooks such as Jupyter. The number of rows to show can be controlled via spark.sql.repl.eagerEval.maxNumRows configuration.

[8]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df
[8]:
abcde
12.0string12000-01-012000-01-01 12:00:00
23.0string22000-02-012000-01-02 12:00:00
34.0string32000-03-012000-01-03 12:00:00

行は垂直に表示することもできます。これは、行が水平に表示するには長すぎる場合に便利です。

[9]:
df.show(1, vertical=True)
-RECORD 0------------------
 a   | 1
 b   | 2.0
 c   | string1
 d   | 2000-01-01
 e   | 2000-01-01 12:00:00
only showing top 1 row

DataFrame のスキーマと列名を確認するには、以下のようにします。

[10]:
df.columns
[10]:
['a', 'b', 'c', 'd', 'e']
[11]:
df.printSchema()
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)

DataFrame の概要を表示します。

[12]:
df.select("a", "b", "c").describe().show()
+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+

DataFrame.collect() は、分散されたデータを Python のローカルデータとしてドライバー側に収集します。データセットが大きすぎてドライバー側に収まらない場合、すべてのデータをエグゼキュータからドライバー側に収集するため、メモリ不足エラーが発生する可能性があることに注意してください。

[13]:
df.collect()
[13]:
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

メモリ不足例外を回避するには、DataFrame.take() または DataFrame.tail() を使用します。

[14]:
df.take(1)
[14]:
[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

PySpark DataFrame は、pandas API を活用するために、pandas DataFrame への変換も提供します。toPandas もすべてのデータをドライバー側に収集するため、データが大きすぎてドライバー側に収まらない場合にメモリ不足エラーを容易に引き起こす可能性があることに注意してください。

[15]:
df.toPandas()
[15]:
a b c d e
0 1 2.0 string1 2000-01-01 2000-01-01 12:00:00
1 2 3.0 string2 2000-02-01 2000-01-02 12:00:00
2 3 4.0 string3 2000-03-01 2000-01-03 12:00:00

データの選択とアクセス#

PySpark DataFrame は遅延評価され、単に列を選択しても計算はトリガーされず、Column インスタンスが返されます。

[16]:
df.a
[16]:
Column<b'a'>

実際、ほとんどの列指向の操作は Column を返します。

[17]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())
[17]:
True

これらの Column インスタンスは、DataFrame から列を選択するために使用できます。たとえば、DataFrame.select()Column インスタンスを受け取り、別の DataFrame を返します。

[18]:
df.select(df.c).show()
+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+

新しい Column インスタンスを割り当てます。

[19]:
df.withColumn('upper_c', upper(df.c)).show()
+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+

行のサブセットを選択するには、DataFrame.filter() を使用します。

[20]:
df.filter(df.a == 1).show()
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+

関数の適用#

PySpark は、ユーザーが Python ネイティブ関数を実行できるように、さまざまな UDF および API をサポートしています。最新の Pandas UDF および Pandas Function APIs も参照してください。たとえば、以下の例では、ユーザーは Python ネイティブ関数内で pandas Series の API を直接使用できます。

[21]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1

df.select(pandas_plus_one(df.a)).show()
+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+

別の例は DataFrame.mapInPandas です。これは、ユーザーが結果の長さに制限なく、pandas DataFrame の API を直接使用できるようにします。

[22]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()
+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+

データのグループ化#

PySpark DataFrame は、一般的なアプローチである split-apply-combine 戦略を使用して、グループ化されたデータを処理する方法も提供します。これは、データを特定の条件でグループ化し、各グループに関数を適用してから、それらを DataFrame に結合します。

[23]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

グループ化してから、結果のグループに avg() 関数を適用します。

[24]:
df.groupby('color').avg().show()
+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
|black|    6.0|   60.0|
| blue|    3.0|   30.0|
+-----+-------+-------+

pandas API を使用して、各グループに対して Python ネイティブ関数を適用することもできます。

[25]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
+-----+------+---+---+

共同グループ化と関数の適用。

[26]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def merge_ordered(l, r):
    return pd.merge_ordered(l, r)

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    merge_ordered, schema='time int, id int, v1 double, v2 string').show()
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+

データの取得/書き出し#

CSV は直接的で使いやすいです。Parquet と ORC は、より高速に読み書きできる効率的でコンパクトなファイル形式です。

PySpark には、JDBC、text、binaryFile、Avro など、他にも多くのデータソースがあります。Apache Spark ドキュメントの最新の Spark SQL, DataFrames and Datasets Guide も参照してください。

CSV#

[27]:
df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

Parquet#

[28]:
df.write.parquet('bar.parquet')
spark.read.parquet('bar.parquet').show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

ORC#

[29]:
df.write.orc('zoo.orc')
spark.read.orc('zoo.orc').show()
+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

SQL の操作#

DataFrame と Spark SQL は同じ実行エンジンを共有しているため、シームレスに相互に利用できます。たとえば、DataFrame をテーブルとして登録し、以下のように簡単に SQL を実行できます。

[30]:
df.createOrReplaceTempView("tableA")
spark.sql("SELECT count(*) from tableA").show()
+--------+
|count(1)|
+--------+
|       8|
+--------+

さらに、UDF は登録して SQL でそのまま呼び出すことができます。

[31]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()
+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

これらの SQL 式は、PySpark の列として直接混合して使用できます。

[32]:
from pyspark.sql.functions import expr

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()
+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+