第3章:関数ジャンクション - PySparkでのデータ操作#

データのクリーニング#

データサイエンスにおいて、「garbage in, garbage out」(GIGO)とは、不正確、偏見のある、あるいは質の低い情報や入力は、同様の質の出力や結果を生み出すという概念です。分析の質を向上させるためには、データクリーニング、つまり「ゴミを金に変える」プロセスが必要です。これは、データの品質と有用性を向上させるために、エラーや一貫性のないものを特定、修正、または削除するプロセスで構成されています。

不正な値を含むデータフレームから始めましょう

[1]:
!pip install pyspark==4.0.0.dev2
[2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Data Loading and Storage Example") \
    .getOrCreate()
[3]:
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(age=10, height=80.0, NAME="Alice"),
    Row(age=10, height=80.0, NAME="Alice"),
    Row(age=5, height=float("nan"), NAME="BOB"),
    Row(age=None, height=None, NAME="Tom"),
    Row(age=None, height=float("nan"), NAME=None),
    Row(age=9, height=78.9, NAME="josh"),
    Row(age=18, height=1802.3, NAME="bush"),
    Row(age=7, height=75.3, NAME="jerry"),
])

df.show()
+----+------+-----+
| age|height| NAME|
+----+------+-----+
|  10|  80.0|Alice|
|  10|  80.0|Alice|
|   5|   NaN|  BOB|
|NULL|  NULL|  Tom|
|NULL|   NaN| NULL|
|   9|  78.9| josh|
|  18|1802.3| bush|
|   7|  75.3|jerry|
+----+------+-----+

列名の変更#

一見したところ、NAME列が大文字であることがわかります。一貫性のために、DataFrame.withColumnRenamedを使用して列名を変更できます。

[4]:
df2 = df.withColumnRenamed("NAME", "name")

df2.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|  80.0|Alice|
|  10|  80.0|Alice|
|   5|   NaN|  BOB|
|NULL|  NULL|  Tom|
|NULL|   NaN| NULL|
|   9|  78.9| josh|
|  18|1802.3| bush|
|   7|  75.3|jerry|
+----+------+-----+

Null値の削除#

次に、2種類の欠損データがあることに気づくかもしれません。

  • 3つの列すべてにおけるNULL値。

  • 数値列におけるNaNNot a Numberを意味する)値。

有効なnameを持たないレコードは、おそらく役に立たないので、まずそれらを削除しましょう。DataFrameNaFunctionsには欠損値処理のための関数群があり、DataFrame.na.dropまたはDataFrame.dropnaを使用して、NULLまたはNaN値を持つ行を省略できます。

ステップdf2.na.drop(subset="name")の後、無効なレコード(age=None, height=NaN, name=None)は破棄されます。

[5]:
df3 = df2.na.drop(subset="name")

df3.show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  10|  80.0|Alice|
|  10|  80.0|Alice|
|   5|   NaN|  BOB|
|NULL|  NULL|  Tom|
|   9|  78.9| josh|
|  18|1802.3| bush|
|   7|  75.3|jerry|
+----+------+-----+

値の補完#

残りの欠損値については、DataFrame.na.fillまたはDataFrame.fillnaを使用して補完できます。

Dict入力{'age': 10, 'height': 80.1}を使用すると、age列とheight列の値をまとめて指定できます。

[6]:
df4 = df3.na.fill({'age': 10, 'height': 80.1})

df4.show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|  80.0|Alice|
| 10|  80.0|Alice|
|  5|  80.1|  BOB|
| 10|  80.1|  Tom|
|  9|  78.9| josh|
| 18|1802.3| bush|
|  7|  75.3|jerry|
+---+------+-----+

外れ値の削除#

上記のステップの後、すべての欠損値は削除または補完されました。しかし、height=1802.3は不合理に見えることがわかります。この種の外れ値を削除するために、(65, 85)のような有効な範囲でデータフレームをフィルタリングできます。

[7]:
df5 = df4.where(df4.height.between(65, 85))

df5.show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|  80.0|Alice|
| 10|  80.0|Alice|
|  5|  80.1|  BOB|
| 10|  80.1|  Tom|
|  9|  78.9| josh|
|  7|  75.3|jerry|
+---+------+-----+

重複の削除#

これで、すべての無効なレコードは処理されました。しかし、レコード(age=10, height=80.0, name=Alice)が重複していることに気づきます。このような重複を削除するには、単純にDataFrame.distinctを適用します。

[8]:
df6 = df5.distinct()

df6.show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|  80.0|Alice|
|  5|  80.1|  BOB|
| 10|  80.1|  Tom|
|  9|  78.9| josh|
|  7|  75.3|jerry|
+---+------+-----+

文字列操作#

name列には小文字と大文字の両方の文字が含まれています。lower()関数を適用して、すべての文字を小文字に変換できます。

[9]:
from pyspark.sql import functions as sf

df7 = df6.withColumn("name", sf.lower("name"))
df7.show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|  80.0|alice|
|  5|  80.1|  bob|
| 10|  80.1|  tom|
|  9|  78.9| josh|
|  7|  75.3|jerry|
+---+------+-----+

より複雑な文字列操作については、udfを使用してPythonの強力な関数を利用することもできます。

[10]:
from pyspark.sql import functions as sf

capitalize = sf.udf(lambda s: s.capitalize())

df8 = df6.withColumn("name", capitalize("name"))
df8.show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|  80.0|Alice|
|  5|  80.1|  Bob|
| 10|  80.1|  Tom|
|  9|  78.9| Josh|
|  7|  75.3|Jerry|
+---+------+-----+

列の並べ替え#

上記のプロセスを経て、データはクリーンになり、データフレームを何らかのストレージに保存する前に列を並べ替えたいと思います。詳細については、前の章「読み込みと驚嘆: データ 読み込み、 ストレージ、 ファイル 形式」を参照してください。

通常、この目的にはDataFrame.selectを使用します。

[11]:
df9 = df7.select("name", "age", "height")

df9.show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|alice| 10|  80.0|
|  bob|  5|  80.1|
|  tom| 10|  80.1|
| josh|  9|  78.9|
|jerry|  7|  75.3|
+-----+---+------+

データの変換#

データエンジニアリングプロジェクトの主な部分は変換です。既存のデータフレームから新しいデータフレームを作成します。

select()での列の選択#

入力テーブルには数百の列が含まれている可能性がありますが、特定のプロジェクトでは、そのうちの小さなサブセットにしか興味がない可能性が高いです。

[12]:
from pyspark.sql import functions as sf
df = spark.range(10)

for i in range(20):
  df = df.withColumn(f"col_{i}", sf.lit(i))

df.show()
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+
| id|col_0|col_1|col_2|col_3|col_4|col_5|col_6|col_7|col_8|col_9|col_10|col_11|col_12|col_13|col_14|col_15|col_16|col_17|col_18|col_19|
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+
|  0|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  1|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  2|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  3|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  4|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  5|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  6|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  7|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  8|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
|  9|    0|    1|    2|    3|    4|    5|    6|    7|    8|    9|    10|    11|    12|    13|    14|    15|    16|    17|    18|    19|
+---+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+------+------+------+------+------+------+------+------+------+------+

forループを使用して21列のデータフレームを作成し、その後selectで4列のみを選択します。idcol_2col_3の列は前のデータフレームから直接選択され、sqrt_col_4_plus_5列は数学関数によって生成されます。

pyspark.sql.functionpyspark.sql.Columnには、列操作のための数百もの関数があります。

[13]:

df2 = df.select("id", "col_2", "col_3", sf.sqrt(sf.col("col_4") + sf.col("col_5")).alias("sqrt_col_4_plus_5")) df2.show()
+---+-----+-----+-----------------+
| id|col_2|col_3|sqrt_col_4_plus_5|
+---+-----+-----+-----------------+
|  0|    2|    3|              3.0|
|  1|    2|    3|              3.0|
|  2|    2|    3|              3.0|
|  3|    2|    3|              3.0|
|  4|    2|    3|              3.0|
|  5|    2|    3|              3.0|
|  6|    2|    3|              3.0|
|  7|    2|    3|              3.0|
|  8|    2|    3|              3.0|
|  9|    2|    3|              3.0|
+---+-----+-----+-----------------+

where()での行のフィルタリング#

入力テーブルは非常に巨大で、数十億行が含まれている可能性があり、ごく一部のサブセットにしか興味がない場合もあります。

指定された条件でwhereまたはfilterを使用して行をフィルタリングできます。

たとえば、奇数のid値を持つ行を選択できます。

[14]:
df3 = df2.where(sf.col("id") % 2 == 1)

df3.show()
+---+-----+-----+-----------------+
| id|col_2|col_3|sqrt_col_4_plus_5|
+---+-----+-----+-----------------+
|  1|    2|    3|              3.0|
|  3|    2|    3|              3.0|
|  5|    2|    3|              3.0|
|  7|    2|    3|              3.0|
|  9|    2|    3|              3.0|
+---+-----+-----+-----------------+

データの集計#

データ分析では、通常、データをチャートまたはテーブルに集計して終了します。

[15]:
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(incomes=[123.0, 456.0, 789.0], NAME="Alice"),
    Row(incomes=[234.0, 567.0], NAME="BOB"),
    Row(incomes=[100.0, 200.0, 100.0], NAME="Tom"),
    Row(incomes=[79.0, 128.0], NAME="josh"),
    Row(incomes=[123.0, 145.0, 178.0], NAME="bush"),
    Row(incomes=[111.0, 187.0, 451.0, 188.0, 199.0], NAME="jerry"),
])

df.show()
+--------------------+-----+
|             incomes| NAME|
+--------------------+-----+
|[123.0, 456.0, 78...|Alice|
|      [234.0, 567.0]|  BOB|
|[100.0, 200.0, 10...|  Tom|
|       [79.0, 128.0]| josh|
|[123.0, 145.0, 17...| bush|
|[111.0, 187.0, 45...|jerry|
+--------------------+-----+

たとえば、月ごとの収入が与えられた場合、各名前の平均収入を見つけたいとします。

[16]:
from pyspark.sql import functions as sf

df2 = df.select(sf.lower("NAME").alias("name"), "incomes")

df2.show(truncate=False)
+-----+-----------------------------------+
|name |incomes                            |
+-----+-----------------------------------+
|alice|[123.0, 456.0, 789.0]              |
|bob  |[234.0, 567.0]                     |
|tom  |[100.0, 200.0, 100.0]              |
|josh |[79.0, 128.0]                      |
|bush |[123.0, 145.0, 178.0]              |
|jerry|[111.0, 187.0, 451.0, 188.0, 199.0]|
+-----+-----------------------------------+

explode()を使用したデータの形状変更#

集計を容易にするために、explode()関数を使用してデータの形状を変更できます。

[17]:
df3 = df2.select("name", sf.explode("incomes").alias("income"))

df3.show()
+-----+------+
| name|income|
+-----+------+
|alice| 123.0|
|alice| 456.0|
|alice| 789.0|
|  bob| 234.0|
|  bob| 567.0|
|  tom| 100.0|
|  tom| 200.0|
|  tom| 100.0|
| josh|  79.0|
| josh| 128.0|
| bush| 123.0|
| bush| 145.0|
| bush| 178.0|
|jerry| 111.0|
|jerry| 187.0|
|jerry| 451.0|
|jerry| 188.0|
|jerry| 199.0|
+-----+------+

groupBy()とagg()による集計#

次に、通常DataFrame.groupBy(...).agg(...)を使用してデータを集計します。平均収入を計算するために、集計関数avgを適用できます。

[18]:
df4 = df3.groupBy("name").agg(sf.avg("income").alias("avg_income"))

df4.show()
+-----+------------------+
| name|        avg_income|
+-----+------------------+
|alice|             456.0|
|  bob|             400.5|
|  tom|133.33333333333334|
| josh|             103.5|
| bush|148.66666666666666|
|jerry|             227.2|
+-----+------------------+

Orderby#

最終的な分析のために、通常はデータを並べ替えたいと考えます。この場合、nameでデータを並べ替えることができます。

[19]:
df5 = df4.orderBy("name")

df5.show()
+-----+------------------+
| name|        avg_income|
+-----+------------------+
|alice|             456.0|
|  bob|             400.5|
| bush|148.66666666666666|
|jerry|             227.2|
| josh|             103.5|
|  tom|133.33333333333334|
+-----+------------------+

データフレームの衝突:結合の技術#

複数のデータフレームを扱う場合、それらを何らかの方法で組み合わせる必要が生じることがよくあります。最も頻繁に使用されるアプローチは結合です。

たとえば、incomesデータとheightデータが与えられた場合、DataFrame.joinを使用してnameでそれらを結合できます。

最終結果には、alicejoshbushのみが含まれることがわかります。なぜなら、これらは両方のデータフレームに存在するためです。

[20]:
from pyspark.sql import Row

df1 = spark.createDataFrame([
    Row(age=10, height=80.0, name="alice"),
    Row(age=9, height=78.9, name="josh"),
    Row(age=18, height=82.3, name="bush"),
    Row(age=7, height=75.3, name="tom"),
])

df2 = spark.createDataFrame([
    Row(incomes=[123.0, 456.0, 789.0], name="alice"),
    Row(incomes=[234.0, 567.0], name="bob"),
    Row(incomes=[79.0, 128.0], name="josh"),
    Row(incomes=[123.0, 145.0, 178.0], name="bush"),
    Row(incomes=[111.0, 187.0, 451.0, 188.0, 199.0], name="jerry"),
])
[21]:
df3 = df1.join(df2, on="name")

df3.show(truncate=False)
+-----+---+------+---------------------+
|name |age|height|incomes              |
+-----+---+------+---------------------+
|alice|10 |80.0  |[123.0, 456.0, 789.0]|
|bush |18 |82.3  |[123.0, 145.0, 178.0]|
|josh |9  |78.9  |[79.0, 128.0]        |
+-----+---+------+---------------------+

7つの結合方法があります: - INNER - LEFT - RIGHT - FULL - CROSS - LEFTSEMI - LEFTANTI

そして、デフォルトはINNERです。

別の例としてLEFT結合を取り上げましょう。左結合は、2つのテーブルのうち最初の(左の)テーブルのすべてのレコードを含みます。たとえ2番目の(右の)テーブルのレコードに対応する値がない場合でもです。

[22]:
df4 = df1.join(df2, on="name", how="left")

df4.show(truncate=False)
+-----+---+------+---------------------+
|name |age|height|incomes              |
+-----+---+------+---------------------+
|alice|10 |80.0  |[123.0, 456.0, 789.0]|
|josh |9  |78.9  |[79.0, 128.0]        |
|bush |18 |82.3  |[123.0, 145.0, 178.0]|
|tom  |7  |75.3  |NULL                 |
+-----+---+------+---------------------+

そして、RIGHT結合は、右側のテーブルのすべてのレコードを保持します。

[23]:
df5 = df1.join(df2, on="name", how="right")

df5.show(truncate=False)
+-----+----+------+-----------------------------------+
|name |age |height|incomes                            |
+-----+----+------+-----------------------------------+
|alice|10  |80.0  |[123.0, 456.0, 789.0]              |
|bob  |NULL|NULL  |[234.0, 567.0]                     |
|josh |9   |78.9  |[79.0, 128.0]                      |
|bush |18  |82.3  |[123.0, 145.0, 178.0]              |
|jerry|NULL|NULL  |[111.0, 187.0, 451.0, 188.0, 199.0]|
+-----+----+------+-----------------------------------+