このページでは、さまざまな Apache Spark API を簡単な例でどのように使用できるかを示します。
Spark は、小規模から大規模なデータセットまで対応できる優れたエンジンです。単一ノード/ローカル環境、または分散クラスターで使用できます。Spark の広範な API、優れたパフォーマンス、および柔軟性により、多くの分析に適した選択肢となります。このガイドでは、以下の Spark API を使用した例を示します。
例では、理解しやすいように小規模なデータセットを使用しています。
このセクションでは、Spark DataFrame を作成し、簡単な操作を実行する方法を示します。例は小規模な DataFrame を対象としているため、機能を簡単に確認できます。
Spark Session を作成することから始めましょう
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
一部の Spark ランタイム環境には、事前にインスタンス化された Spark Session が用意されています。getOrCreate() メソッドは、既存の Spark Session を使用するか、Spark Session が存在しない場合は新しい Spark Session を作成します。
Spark DataFrame の作成
first_name と age の列、および 4 行のデータを持つ DataFrame を作成することから始めます。
df = spark.createDataFrame(
[
("sue", 32),
("li", 3),
("bob", 75),
("heo", 13),
],
["first_name", "age"],
)
show() メソッドを使用して、DataFrame の内容を表示します。
df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
次に、DataFrame に対してデータ処理操作を実行しましょう。
Spark DataFrame への列の追加
DataFrame に life_stage 列を追加します。この列は、年齢が 12 歳以下の場合は「child」、13 歳から 19 歳の場合は「teenager」、20 歳以上の場合は「adult」を返します。
from pyspark.sql.functions import col, when
df1 = df.withColumn(
"life_stage",
when(col("age") < 13, "child")
.when(col("age").between(13, 19), "teenager")
.otherwise("adult"),
)
Spark DataFrame に列を追加するのは簡単です。df1 の内容を表示しましょう。
df1.show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| li| 3| child|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
元の DataFrame が変更されていないことに注意してください。
df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
Spark の操作は DataFrame を変更しません。後続の操作で DataFrame の変更にアクセスするには、結果を新しい変数に割り当てる必要があります。
Spark DataFrame のフィルタリング
次に、DataFrame をフィルタリングして、ティーンエイジャーと大人のみを含めるようにします。
df1.where(col("life_stage").isin(["teenager", "adult"])).show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
Spark DataFrame でのグループ化集計
では、データセット内のすべての人の平均年齢を計算しましょう。
from pyspark.sql.functions import avg
df1.select(avg("age")).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
各 life_stage の平均年齢を計算することもできます。
df1.groupBy("life_stage").avg().show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
Spark では、プログラム API を使用したくない場合に、DataFrame に対して SQL でクエリを実行できます。
DataFrame の SQL によるクエリ
SQL を使用して、すべての人の平均年齢を計算する方法は次のとおりです。
spark.sql("select avg(age) from {df1}", df1=df1).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
SQL を使用して、life_stage ごとの平均年齢を計算する方法は次のとおりです。
spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
Spark では、プログラム API、SQL API、またはその両方の組み合わせを使用できます。この柔軟性により、Spark はさまざまなユーザーが利用でき、強力で表現力豊かになります。
DataFrame を、SQL API から簡単にアクセスできる名前付き Parquet テーブルに永続化しましょう。
df1.write.saveAsTable("some_people")
テーブル名でテーブルにアクセスできることを確認します。
spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
+----------+---+----------+
では、SQL を使用してテーブルにいくつかの行を挿入しましょう。
spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")
テーブルの内容を検査して、行が挿入されたことを確認します。
spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
| frank| 4| child|
+----------+---+----------+
ティーンエイジャーを返すクエリを実行します。
spark.sql("select * from some_people where life_stage='teenager'").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
+----------+---+----------+
Spark では、テーブルを簡単に登録し、純粋な SQL でクエリを実行できます。
Spark には Structured Streaming API もあり、バッチまたはリアルタイムのストリーミングアプリケーションを作成できます。
Spark Structured Streaming を使用して、Kafka からデータを読み取り、それを 1 時間ごとに Parquet テーブルに書き込む方法を見てみましょう。
次のようなデータで継続的に入力される Kafka ストリームがあると仮定します。
{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}
Kafka ソースを Spark DataFrame に読み込む方法は次のとおりです。
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", subscribeTopic)
.load()
)
入力データをクリーンアップする関数を作成します。
schema = StructType([
StructField("student_name", StringType()),
StructField("graduation_year", StringType()),
StructField("major", StringType()),
])
def with_normalized_names(df, schema):
parsed_df = (
df.withColumn("json_data", from_json(col("value").cast("string"), schema))
.withColumn("student_name", col("json_data.student_name"))
.withColumn("graduation_year", col("json_data.graduation_year"))
.withColumn("major", col("json_data.major"))
.drop(col("json_data"))
.drop(col("value"))
)
split_col = split(parsed_df["student_name"], "XX")
return (
parsed_df.withColumn("first_name", split_col.getItem(0))
.withColumn("last_name", split_col.getItem(1))
.drop("student_name")
)
次に、実行されるたびに Kafka の新しいデータをすべて読み取る関数を作成します。
def perform_available_now_update():
checkpointPath = "data/tmp_students_checkpoint/"
path = "data/tmp_students"
return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
availableNow=True
).format("parquet").option("checkpointLocation", checkpointPath).start(path)
perform_available_now_update() 関数を呼び出して、Parquet テーブルの内容を確認します。
cron ジョブを設定して、perform_available_now_update() 関数を 1 時間ごとに実行することで、Parquet テーブルが定期的に更新されるようにすることができます。
Spark RDD API は、非構造化データに適しています。
Spark DataFrame API は、構造化データに対してより簡単でパフォーマンスが高くなります。
次のような 3 行のデータが含まれる some_text.txt という名前のテキストファイルがあると仮定します。
these are words
these are more words
words in english
テキストファイル内の各単語のカウントを計算したいとします。Spark RDD を使用してこの計算を実行する方法は次のとおりです。
text_file = spark.sparkContext.textFile("some_words.txt")
counts = (
text_file.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
)
結果を見てみましょう。
counts.collect()
[('these', 2),
('are', 2),
('more', 1),
('in', 1),
('words', 3),
('english', 1)]
Spark は、この計算を並列化することで、クエリを効率的に実行できます。他の多くのクエリエンジンは、計算を並列化できません。
これらの例は、Spark が小規模なデータセットでの計算に便利なユーザー API を提供していることを示しました。Spark は、これらの同じコード例を分散クラスター上の大規模なデータセットにスケーリングできます。Spark が大規模および小規模の両方のデータセットを処理できるのは素晴らしいことです。
Spark は、他のクエリエンジンと比較して広範な API も備えています。Spark を使用すると、プログラム API を使用して DataFrame 操作を実行したり、SQL を記述したり、ストリーミング分析を実行したり、機械学習を行ったりできます。Spark を使用すれば、分析を実行するために複数のフレームワークを学習したり、さまざまなライブラリをパッチワークしたりする必要がなくなります。
Spark には、さらに多くの例が配布されています。