このページでは、簡単なサンプルを使って、さまざまな Apache Spark API の使い方を紹介します。
Spark は、小規模および大規模なデータセットに適した優れたエンジンです。シングルノード/ローカルホスト環境や分散クラスターで使用できます。Spark の広範な API、優れたパフォーマンス、および柔軟性により、多くの分析に適したオプションとなっています。このガイドでは、次の Spark API を使用した例を示します。
これらのサンプルでは、理解しやすいように、小さなデータセットを使用しています。
このセクションでは、Spark データフレームを作成し、簡単な操作を実行する方法を示します。サンプルは小さなデータフレームなので、機能を簡単に理解できます。
まず、Sparkセッションを作成しましょう。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("demo").getOrCreate()
一部の Spark ランタイム環境には、事前にインスタンス化された Spark セッションが付属しています。getOrCreate()
メソッドは、既存の Spark セッションを使用するか、まだ存在しない場合は新しい Spark セッションを作成します。
Spark データフレームの作成
first_name
列と age
列、および4行のデータを持つデータフレームを作成することから始めましょう。
df = spark.createDataFrame(
[
("sue", 32),
("li", 3),
("bob", 75),
("heo", 13),
],
["first_name", "age"],
)
show()
メソッドを使用して、データフレームの内容を表示します。
df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
次に、データフレームに対していくつかのデータ処理操作を実行しましょう。
Spark データフレームに列を追加する
年齢が12歳以下の場合は「child」、年齢が13歳から19歳の場合は「teenager」、年齢が20歳以上の場合は「adult」を返す life_stage
列をデータフレームに追加しましょう。
from pyspark.sql.functions import col, when
df1 = df.withColumn(
"life_stage",
when(col("age") < 13, "child")
.when(col("age").between(13, 19), "teenager")
.otherwise("adult"),
)
Spark データフレームに列を追加するのは簡単です。df1
の内容を見てみましょう。
df1.show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| li| 3| child|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
元のデータフレームが変更されていないことに注意してください。
df.show()
+----------+---+
|first_name|age|
+----------+---+
| sue| 32|
| li| 3|
| bob| 75|
| heo| 13|
+----------+---+
Spark の操作はデータフレームをミューテートしません。後続の操作でデータフレームの変更にアクセスするには、結果を新しい変数に割り当てる必要があります。
Spark データフレームのフィルタリング
次に、10代と大人だけを含むようにデータフレームをフィルタリングします。
df1.where(col("life_stage").isin(["teenager", "adult"])).show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| sue| 32| adult|
| bob| 75| adult|
| heo| 13| teenager|
+----------+---+----------+
Spark データフレームでのグループ化による集計
次に、データセット内の全員の平均年齢を計算しましょう。
from pyspark.sql.functions import avg
df1.select(avg("age")).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
各 life_stage
の平均年齢を計算することもできます。
df1.groupBy("life_stage").avg().show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
プログラムによる API を使用したくない場合は、Spark を使用して SQL でデータフレームに対してクエリを実行できます。
SQL でデータフレームをクエリする
SQL を使用して全員の平均年齢を計算する方法は次のとおりです。
spark.sql("select avg(age) from {df1}", df1=df1).show()
+--------+
|avg(age)|
+--------+
| 30.75|
+--------+
また、SQL を使用して life_stage
ごとの平均年齢を計算する方法は次のとおりです。
spark.sql("select life_stage, avg(age) from {df1} group by life_stage", df1=df1).show()
+----------+--------+
|life_stage|avg(age)|
+----------+--------+
| adult| 53.5|
| child| 3.0|
| teenager| 13.0|
+----------+--------+
Spark を使用すると、プログラムによる API、SQL API、または両方の組み合わせを使用できます。この柔軟性により、Spark はさまざまなユーザーにとってアクセスしやすく、表現力が豊かになります。
SQL API を介して簡単にアクセスできる名前付き Parquet テーブルにデータフレームを永続化しましょう。
df1.write.saveAsTable("some_people")
テーブルがテーブル名でアクセス可能であることを確認してください。
spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
+----------+---+----------+
次に、SQL を使用して、いくつかの行のデータをテーブルに挿入しましょう。
spark.sql("INSERT INTO some_people VALUES ('frank', 4, 'child')")
テーブルの内容を調べて、行が挿入されたことを確認します。
spark.sql("select * from some_people").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
| sue| 32| adult|
| bob| 75| adult|
| li| 3| child|
| frank| 4| child|
+----------+---+----------+
10代を返すクエリを実行します。
spark.sql("select * from some_people where life_stage='teenager'").show()
+----------+---+----------+
|first_name|age|life_stage|
+----------+---+----------+
| heo| 13| teenager|
+----------+---+----------+
Spark を使用すると、テーブルを登録し、純粋な SQL でクエリを実行することが簡単になります。
Spark には、バッチまたはリアルタイムのストリーミングアプリケーションを作成できる構造化ストリーミング API もあります。
Spark 構造化ストリーミングを使用して、Kafka からデータを読み取り、Parquet テーブルに毎時間書き込む方法を見てみましょう。
次のデータが継続的に入力される Kafka ストリームがあるとします。
{"student_name":"someXXperson", "graduation_year":"2023", "major":"math"}
{"student_name":"liXXyao", "graduation_year":"2025", "major":"physics"}
Kafka ソースを Spark データフレームに読み込む方法は次のとおりです。
df = (
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", subscribeTopic)
.load()
)
入力データをクリーンアップする関数を作成します。
schema = StructType([
StructField("student_name", StringType()),
StructField("graduation_year", StringType()),
StructField("major", StringType()),
])
def with_normalized_names(df, schema):
parsed_df = (
df.withColumn("json_data", from_json(col("value").cast("string"), schema))
.withColumn("student_name", col("json_data.student_name"))
.withColumn("graduation_year", col("json_data.graduation_year"))
.withColumn("major", col("json_data.major"))
.drop(col("json_data"))
.drop(col("value"))
)
split_col = split(parsed_df["student_name"], "XX")
return (
parsed_df.withColumn("first_name", split_col.getItem(0))
.withColumn("last_name", split_col.getItem(1))
.drop("student_name")
)
次に、実行されるたびに Kafka 内のすべての新しいデータを読み取る関数を作成します。
def perform_available_now_update():
checkpointPath = "data/tmp_students_checkpoint/"
path = "data/tmp_students"
return df.transform(lambda df: with_normalized_names(df)).writeStream.trigger(
availableNow=True
).format("parquet").option("checkpointLocation", checkpointPath).start(path)
perform_available_now_update()
関数を呼び出して、Parquet テーブルの内容を確認します。
perform_available_now_update()
関数を毎時間実行するように cron ジョブを設定して、Parquet テーブルが定期的に更新されるようにできます。
Spark RDD API は、非構造化データに適しています。
Spark データフレーム API は、構造化データに対してより簡単でパフォーマンスが向上します。
some_text.txt
という名前のテキストファイルに、次の3行のデータがあるとします。
these are words
these are more words
words in english
テキストファイル内の各単語の数を計算する必要があります。Spark RDD を使用してこの計算を実行する方法は次のとおりです。
text_file = spark.sparkContext.textFile("some_words.txt")
counts = (
text_file.flatMap(lambda line: line.split(" "))
.map(lambda word: (word, 1))
.reduceByKey(lambda a, b: a + b)
)
結果を見てみましょう。
counts.collect()
[('these', 2),
('are', 2),
('more', 1),
('in', 1),
('words', 3),
('english', 1)]
Spark はこの計算を並列化するため、クエリを効率的に実行できます。他の多くのクエリエンジンは、計算を並列化することができません。
これらの例では、Spark が小規模なデータセットの計算に優れたユーザー API を提供する方法を示しました。Spark は、これらの同じコード例を分散クラスター上の大規模なデータセットにスケーリングできます。Spark が大規模なデータセットと小規模なデータセットの両方を処理できるのは素晴らしいことです。
また、Spark は他のクエリエンジンと比較して広範な API を備えています。Spark を使用すると、プログラムによる API でデータフレーム操作を実行したり、SQL を記述したり、ストリーミング分析を実行したり、機械学習を実行したりできます。Spark を使用すると、複数のフレームワークを学習したり、さまざまなライブラリをパッチ適用して分析を実行したりする必要がなくなります。
多くの追加サンプルが Spark と共に配布されています。