Apache Spark™ の例

このページでは、さまざまな Apache Spark API を簡単な例でどのように使用できるかを示します。

Spark は、小規模から大規模なデータセットまで対応できる優れたエンジンです。単一ノード/ローカル環境、または分散クラスターで使用できます。Spark の広範な API、優れたパフォーマンス、および柔軟性により、多くの分析に適した選択肢となります。このガイドでは、以下の Spark API を使用した例を示します。

  • DataFrame
  • SQL
  • Structured Streaming
  • RDD

例では、理解しやすいように小規模なデータセットを使用しています。

Spark DataFrame の例

このセクションでは、Spark DataFrame を作成し、簡単な操作を実行する方法を示します。例は小規模な DataFrame を対象としているため、機能を簡単に確認できます。

Spark Session を作成することから始めましょう

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("demo").getOrCreate()

一部の Spark ランタイム環境には、事前にインスタンス化された Spark Session が用意されています。getOrCreate() メソッドは、既存の Spark Session を使用するか、Spark Session が存在しない場合は新しい Spark Session を作成します。

Spark DataFrame の作成

first_nameage の列、および 4 行のデータを持つ DataFrame を作成することから始めます。

df = spark.createDataFrame(
    [
        ("sue", 32),
        ("li", 3),
        ("bob", 75),
        ("heo", 13),
    ],
    ["first_name", "age"],
)

show() メソッドを使用して、DataFrame の内容を表示します。

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+

次に、DataFrame に対してデータ処理操作を実行しましょう。

Spark DataFrame への列の追加

DataFrame に life_stage 列を追加します。この列は、年齢が 12 歳以下の場合は「child」、13 歳から 19 歳の場合は「teenager」、20 歳以上の場合は「adult」を返します。

from pyspark.sql.functions import col, when

df1 = df.withColumn(
    "life_stage",
    when(col("age") < 13, "child")
    .when(col("age").between(13, 19), "teenager")
    .otherwise("adult"),
)

Spark DataFrame に列を追加するのは簡単です。df1 の内容を表示しましょう。

df1.show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|        li|  3|     child|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+

元の DataFrame が変更されていないことに注意してください。

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       sue| 32|
|        li|  3|
|       bob| 75|
|       heo| 13|
+----------+---+

Spark の操作は DataFrame を変更しません。後続の操作で DataFrame の変更にアクセスするには、結果を新しい変数に割り当てる必要があります。

Spark DataFrame のフィルタリング

次に、DataFrame をフィルタリングして、ティーンエイジャーと大人のみを含めるようにします。

df1.where(col("life_stage").isin(["teenager", "adult"])).show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       sue| 32|     adult|
|       bob| 75|     adult|
|       heo| 13|  teenager|
+----------+---+----------+

Spark DataFrame でのグループ化集計

では、データセット内のすべての人の平均年齢を計算しましょう。

from pyspark.sql.functions import avg

df1.select(avg("age")).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+

life_stage の平均年齢を計算することもできます。

df1.groupBy("life_stage").avg().show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+

Spark では、プログラム API を使用したくない場合に、DataFrame に対して SQL でクエリを実行できます。

DataFrame の SQL によるクエリ

SQL を使用して、すべての人の平均年齢を計算する方法は次のとおりです。

spark.sql("select avg(age) from {df1}", df1=df1).show()

+--------+
|avg(age)|
+--------+
|   30.75|
+--------+

SQL を使用して、life_stage ごとの平均年齢を計算する方法は次のとおりです。

spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()

+----------+--------+
|life_stage|avg(age)|
+----------+--------+
|     adult|    53.5|
|     child|     3.0|
|  teenager|    13.0|
+----------+--------+

Spark では、プログラム API、SQL API、またはその両方の組み合わせを使用できます。この柔軟性により、Spark はさまざまなユーザーが利用でき、強力で表現力豊かになります。

Spark SQL の例

DataFrame を、SQL API から簡単にアクセスできる名前付き Parquet テーブルに永続化しましょう。

df1.write.saveAsTable("some_people")

テーブル名でテーブルにアクセスできることを確認します。

spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
|       sue| 32|     adult|
|       bob| 75|     adult|
|        li|  3|     child|
+----------+---+----------+

では、SQL を使用してテーブルにいくつかの行を挿入しましょう。

spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")

テーブルの内容を検査して、行が挿入されたことを確認します。

spark.sql("select * from some_people").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
|       sue| 32|     adult|
|       bob| 75|     adult|
|        li|  3|     child|
|     frank|  4|     child|
+----------+---+----------+

ティーンエイジャーを返すクエリを実行します。

spark.sql("select * from some_people where life_stage='teenager'").show()

+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
|       heo| 13|  teenager|
+----------+---+----------+

Spark では、テーブルを簡単に登録し、純粋な SQL でクエリを実行できます。

Spark Structured Streaming の例

Spark には Structured Streaming API もあり、バッチまたはリアルタイムのストリーミングアプリケーションを作成できます。

Spark Structured Streaming を使用して、Kafka からデータを読み取り、それを 1 時間ごとに Parquet テーブルに書き込む方法を見てみましょう。

次のようなデータで継続的に入力される Kafka ストリームがあると仮定します。

{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}

Kafka ソースを Spark DataFrame に読み込む方法は次のとおりです。

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", subscribeTopic)
    .load()
)

入力データをクリーンアップする関数を作成します。

schema = StructType([
 StructField("student_name", StringType()),
 StructField("graduation_year", StringType()),
 StructField("major", StringType()),
])

def with_normalized_names(df, schema):
    parsed_df = (
        df.withColumn("json_data", from_json(col("value").cast("string"), schema))
        .withColumn("student_name", col("json_data.student_name"))
        .withColumn("graduation_year", col("json_data.graduation_year"))
        .withColumn("major", col("json_data.major"))
        .drop(col("json_data"))
        .drop(col("value"))
    )
    split_col = split(parsed_df["student_name"], "XX")
    return (
        parsed_df.withColumn("first_name", split_col.getItem(0))
        .withColumn("last_name", split_col.getItem(1))
        .drop("student_name")
    )

次に、実行されるたびに Kafka の新しいデータをすべて読み取る関数を作成します。

def perform_available_now_update():
    checkpointPath = "data/tmp_students_checkpoint/"
    path = "data/tmp_students"
    return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
        availableNow=True
    ).format("parquet").option("checkpointLocation", checkpointPath).start(path)

perform_available_now_update() 関数を呼び出して、Parquet テーブルの内容を確認します。

cron ジョブを設定して、perform_available_now_update() 関数を 1 時間ごとに実行することで、Parquet テーブルが定期的に更新されるようにすることができます。

Spark RDD の例

Spark RDD API は、非構造化データに適しています。

Spark DataFrame API は、構造化データに対してより簡単でパフォーマンスが高くなります。

次のような 3 行のデータが含まれる some_text.txt という名前のテキストファイルがあると仮定します。

these are words
these are more words
words in english

テキストファイル内の各単語のカウントを計算したいとします。Spark RDD を使用してこの計算を実行する方法は次のとおりです。

text_file = spark.sparkContext.textFile("some_words.txt")

counts = (
    text_file.flatMap(lambda line: line.split(" "))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda a, b: a + b)
)

結果を見てみましょう。

counts.collect()

[('these', 2),
 ('are', 2),
 ('more', 1),
 ('in', 1),
 ('words', 3),
 ('english', 1)]

Spark は、この計算を並列化することで、クエリを効率的に実行できます。他の多くのクエリエンジンは、計算を並列化できません。

結論

これらの例は、Spark が小規模なデータセットでの計算に便利なユーザー API を提供していることを示しました。Spark は、これらの同じコード例を分散クラスター上の大規模なデータセットにスケーリングできます。Spark が大規模および小規模の両方のデータセットを処理できるのは素晴らしいことです。

Spark は、他のクエリエンジンと比較して広範な API も備えています。Spark を使用すると、プログラム API を使用して DataFrame 操作を実行したり、SQL を記述したり、ストリーミング分析を実行したり、機械学習を行ったりできます。Spark を使用すれば、分析を実行するために複数のフレームワークを学習したり、さまざまなライブラリをパッチワークしたりする必要がなくなります。

追加の例

Spark には、さらに多くの例が配布されています。