構造化ストリーミングプログラミングガイド

概要

構造化ストリーミングは、Spark SQLエンジン上に構築されたスケーラブルで耐障害性のあるストリーム処理エンジンです。静的データに対するバッチ計算を表現するのと同じ方法で、ストリーミング計算を表現できます。Spark SQLエンジンは、ストリーミングデータが到着し続けるにつれて、それをインクリメンタルかつ継続的に実行し、最終結果を更新します。Scala、Java、Python、またはRのDataset/DataFrame APIを使用して、ストリーミング集計、イベント時間ウィンドウ、ストリーム-バッチ結合などを表現できます。計算は、同じ最適化されたSpark SQLエンジン上で実行されます。最後に、システムは、チェックポイントとWrite-Ahead Logsを通じて、エンドツーエンドの正確な1回限りの耐障害性を保証します。つまり、構造化ストリーミングは、ユーザーがストリーミングについて考えなくても、高速、スケーラブル、耐障害性、エンドツーエンドの正確な1回限りのストリーム処理を提供します。

内部的には、デフォルトでは、構造化ストリーミングクエリはマイクロバッチ処理エンジンを使用して処理され、データストリームを一連の小さなバッチジョブとして処理することにより、100ミリ秒という低いエンドツーエンドのレイテンシと、正確な1回限りの耐障害性を実現します。ただし、Spark 2.3以降では、継続処理と呼ばれる新しい低レイテンシ処理モードが導入されており、少なくとも1回限りの保証で1ミリ秒という低いエンドツーエンドのレイテンシを実現できます。クエリでDataset/DataFrame操作を変更せずに、アプリケーションの要件に基づいてモードを選択できます。

このガイドでは、プログラミングモデルとAPIについて説明します。主にデフォルトのマイクロバッチ処理モデルを使用して概念を説明し、後で継続処理モデルについて説明します。まず、構造化ストリーミングクエリの簡単な例であるストリーミング単語数から始めましょう。

簡単な例

TCPソケットでリッスンしているデータサーバーから受信したテキストデータの実行中の単語数を維持したいとします。構造化ストリーミングを使用してこれをどのように表現できるかを見てみましょう。完全なコードは、Scala/Java/Python/Rで確認できます。また、Sparkをダウンロードすると、例を直接実行できます。いずれにせよ、例をステップごとに見て、どのように機能するかを理解しましょう。まず、必要なクラスをインポートし、Sparkに関連するすべての機能の開始点であるローカルSparkSessionを作成する必要があります。

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .getOrCreate()

import spark.implicits._
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
sparkR.session(appName = "StructuredNetworkWordCount")

次に、localhost:9999でリッスンしているサーバーから受信したテキストデータを表すストリーミングDataFrameを作成し、DataFrameを変換して単語数を計算しましょう。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

このlines DataFrameは、ストリーミングテキストデータを含む無制限のテーブルを表します。このテーブルには「value」という名前の文字列の1つの列が含まれており、ストリーミングテキストデータの各行がテーブルの行になります。これは現在、変換を設定しているだけであり、まだ開始していないため、データを受信していないことに注意してください。次に、組み込みのSQL関数であるsplitとexplodeを使用して、各行を単語ごとの複数の行に分割しました。さらに、alias関数を使用して、新しい列に「word」という名前を付けます。最後に、Dataset内の一意の値でグループ化してカウントすることにより、wordCounts DataFrameを定義しました。これは、ストリームの実行中の単語数を表すストリーミングDataFrameであることに注意してください。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()

このlines DataFrameは、ストリーミングテキストデータを含む無制限のテーブルを表します。このテーブルには「value」という名前の文字列の1つの列が含まれており、ストリーミングテキストデータの各行がテーブルの行になります。これは現在、変換を設定しているだけであり、まだ開始していないため、データを受信していないことに注意してください。次に、.as[String]を使用してDataFrameをStringのDatasetに変換し、flatMap操作を適用して各行を複数の単語に分割できるようにしました。結果のwords Datasetには、すべての単語が含まれています。最後に、Dataset内の一意の値でグループ化してカウントすることにより、wordCounts DataFrameを定義しました。これは、ストリームの実行中の単語数を表すストリーミングDataFrameであることに注意してください。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

このlines DataFrameは、ストリーミングテキストデータを含む無制限のテーブルを表します。このテーブルには「value」という名前の文字列の1つの列が含まれており、ストリーミングテキストデータの各行がテーブルの行になります。これは現在、変換を設定しているだけであり、まだ開始していないため、データを受信していないことに注意してください。次に、.as(Encoders.STRING())を使用してDataFrameをStringのDatasetに変換し、flatMap操作を適用して各行を複数の単語に分割できるようにしました。結果のwords Datasetには、すべての単語が含まれています。最後に、Dataset内の一意の値でグループ化してカウントすることにより、wordCounts DataFrameを定義しました。これは、ストリームの実行中の単語数を表すストリーミングDataFrameであることに注意してください。

# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words
words <- selectExpr(lines, "explode(split(value, ' ')) as word")

# Generate running word count
wordCounts <- count(group_by(words, "word"))

このlines SparkDataFrameは、ストリーミングテキストデータを含む無制限のテーブルを表します。このテーブルには「value」という名前の文字列の1つの列が含まれており、ストリーミングテキストデータの各行がテーブルの行になります。これは現在、変換を設定しているだけであり、まだ開始していないため、データを受信していないことに注意してください。次に、splitとexplodeの2つのSQL関数を含むSQL式を使用して、各行を単語ごとの複数の行に分割しました。さらに、新しい列に「word」という名前を付けます。最後に、SparkDataFrame内の一意の値でグループ化してカウントすることにより、wordCounts SparkDataFrameを定義しました。これは、ストリームの実行中の単語数を表すストリーミングSparkDataFrameであることに注意してください。

ストリーミングデータに対するクエリを設定しました。残っているのは、実際にデータを受信してカウントを計算することだけです。これを行うために、カウントが更新されるたびに、カウントの完全なセットをコンソールに出力(outputMode("complete")で指定)するように設定します。そして、start()を使用してストリーミング計算を開始します。

 # Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
# Start running the query that prints the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")

awaitTermination(query)

このコードが実行されると、ストリーミング計算がバックグラウンドで開始されます。queryオブジェクトは、アクティブなストリーミングクエリへのハンドルであり、クエリがアクティブな間、プロセスが終了しないように、awaitTermination()を使用してクエリの終了を待機することにしました。

実際にこのサンプルコードを実行するには、独自のSparkアプリケーションでコードをコンパイルするか、Sparkをダウンロードしたら、サンプルを実行します。後者を示しています。まず、次のコマンドを使用して、Netcat(ほとんどのUnixのようなシステムにある小さなユーティリティ)をデータサーバーとして実行する必要があります。

$ nc -lk 9999

次に、別のターミナルで、次のコマンドを使用してサンプルを開始できます。

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

次に、netcatサーバーを実行しているターミナルに入力された行は、1秒ごとにカウントされて画面に表示されます。次のようになります。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop



















...
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING StructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
# TERMINAL 2: RUNNING structured_network_wordcount.R

$ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...

プログラミングモデル

構造化ストリーミングの重要なアイデアは、ライブデータストリームを継続的に追加されるテーブルとして扱うことです。これにより、バッチ処理モデルと非常によく似た新しいストリーム処理モデルが生まれます。静的テーブルでの標準のバッチのようなクエリとしてストリーミング計算を表現すると、Sparkはそれをアンバウンド入力テーブルに対する増分クエリとして実行します。このモデルをより詳細に理解しましょう。

基本概念

入力データストリームを「入力テーブル」として考えてください。ストリームに到着するすべてのデータ項目は、入力テーブルに追加される新しい行のようなものです。

Stream as a Table

入力に対するクエリは、「結果テーブル」を生成します。トリガー間隔(たとえば、1秒ごと)に、新しい行が入力テーブルに追加され、最終的に結果テーブルが更新されます。結果テーブルが更新されるたびに、変更された結果行を外部シンクに書き込みたいと思うでしょう。

Model

「出力」は、外部ストレージに書き出されるものとして定義されます。出力は異なるモードで定義できます。

各モードは特定の種類のクエリに適用できることに注意してください。これについては、後で詳しく説明します。

このモデルの使用例を説明するために、上記のクイック例のコンテキストでモデルを理解しましょう。最初のlines DataFrameは入力テーブルであり、最終的なwordCounts DataFrameは結果テーブルです。ストリーミングlines DataFrameに対してwordCountsを生成するクエリは、静的なDataFrameの場合とまったく同じであることに注意してください。ただし、このクエリが開始されると、Sparkはソケット接続からの新しいデータを継続的にチェックします。新しいデータがある場合、Sparkは以前の実行中のカウントと新しいデータを組み合わせて更新されたカウントを計算する「増分」クエリを実行します。以下に示すように。

Model

構造化ストリーミングはテーブル全体を実体化しないことに注意してください。ストリーミングデータソースから利用可能な最新のデータを読み取り、結果を更新するために増分的に処理し、ソースデータを破棄します。結果を更新するために必要な最小限の中間状態データのみを保持します(たとえば、前の例の中間カウント)。

このモデルは、他の多くのストリーム処理エンジンとは大きく異なります。多くのストリーミングシステムでは、ユーザーが実行中の集計を自分で維持する必要があり、フォールトトレランス、およびデータの整合性(少なくとも1回、または最大1回、または正確に1回)について推論する必要があります。このモデルでは、Sparkは新しいデータがある場合に結果テーブルを更新する責任があり、ユーザーはそれについて推論する必要がありません。例として、このモデルがイベント時間ベースの処理と遅延到着データをどのように処理するかを見てみましょう。

イベント時間と遅延データの処理

イベント時間とは、データ自体に埋め込まれている時間です。多くのアプリケーションでは、このイベント時間に基づいて操作したい場合があります。たとえば、IoTデバイスが1分ごとに生成したイベントの数を知りたい場合は、おそらくSparkがそれらを受信した時間ではなく、データが生成された時間(つまり、データ内のイベント時間)を使用したいでしょう。このイベント時間は、このモデルで非常に自然に表現されます。デバイスからの各イベントはテーブル内の行であり、イベント時間は行の列の値です。これにより、ウィンドウベースの集計(例:1分ごとのイベント数)は、イベント時間列でのグループ化と集計の特別なタイプになります。各時間ウィンドウはグループであり、各行は複数のウィンドウ/グループに属することができます。したがって、このようなイベント時間ウィンドウベースの集計クエリは、静的データセット(例:収集されたデバイスイベントログから)とデータストリームの両方で一貫して定義できるため、ユーザーの生活がはるかに楽になります。

さらに、このモデルは、イベント時間に基づいて予想よりも遅れて到着したデータを自然に処理します。Sparkは結果テーブルを更新しているため、遅延データがある場合の古い集計の更新と、中間状態データのサイズを制限するための古い集計のクリーンアップを完全に制御できます。Spark 2.1以降では、ウォーターマークをサポートしており、ユーザーは遅延データのしきい値を指定でき、エンジンはそれに応じて古い状態をクリーンアップできます。これらについては、ウィンドウ操作セクションで後で詳しく説明します。

耐障害性セマンティクス

エンドツーエンドの正確に1回のセマンティクスを提供することは、構造化ストリーミングの設計の背後にある重要な目標の1つでした。これを実現するために、構造化ストリーミングのソース、シンク、および実行エンジンを設計して、処理の正確な進行状況を確実に追跡し、再起動や再処理によってあらゆる種類の障害を処理できるようにしました。すべてのストリーミングソースは、ストリーム内の読み取り位置を追跡するためのオフセット(KafkaオフセットまたはKinesisシーケンス番号と同様)を持っていると想定されます。エンジンは、チェックポイントと書き込み先行ログを使用して、各トリガーで処理されるデータのオフセット範囲を記録します。ストリーミングシンクは、再処理を処理するためにべき等であるように設計されています。ストリーミングソースとべき等シンクを使用して、構造化ストリーミングは、いかなる障害が発生した場合でもエンドツーエンドの正確に1回のセマンティクスを保証できます。

DatasetsとDataFramesを使用したAPI

Spark 2.0以降では、DataFrameとDatasetは、静的で境界のあるデータだけでなく、ストリーミングで境界のないデータも表すことができます。静的なDataset/DataFrameと同様に、共通のエントリーポイントであるSparkSessionScala/Java/Python/Rドキュメント)を使用して、ストリーミングソースからストリーミングDataFrame/Datasetを作成し、静的なDataFrame/Datasetと同じ操作を適用できます。Dataset/DataFrameに慣れていない場合は、DataFrame/Datasetプログラミングガイドを使用して慣れておくことを強くお勧めします。

ストリーミングDataFrameとストリーミングDatasetの作成

ストリーミングDataFrameは、SparkSession.readStream()によって返されるDataStreamReaderインターフェース(Scala/Java/Pythonドキュメント)を介して作成できます。Rでは、read.stream()メソッドを使用します。静的なDataFrameを作成するための読み取りインターフェースと同様に、ソースの詳細(データ形式、スキーマ、オプションなど)を指定できます。

入力ソース

いくつかの組み込みソースがあります。

一部のソースは、障害発生後にチェックポイントされたオフセットを使用してデータを再生できることを保証しないため、フォールトトレラントではありません。以前のフォールトトレラントセマンティクスに関するセクションを参照してください。以下に、Sparkのすべてのソースの詳細を示します。

ソース オプション フォールトトレラント
ファイルソース path:入力ディレクトリへのパス。すべてのファイル形式に共通です。
maxFilesPerTrigger:すべてのトリガーで考慮する新しいファイルの最大数(デフォルト:最大なし)
latestFirst:最新の新しいファイルを最初に処理するかどうか。ファイルのバックログが多い場合に役立ちます(デフォルト:false)
fileNameOnly:新しいファイルを、フルパスではなくファイル名のみに基づいてチェックするかどうか(デフォルト:false)。これを`true`に設定すると、次のファイルは、ファイル名 "dataset.txt" が同じであるため、同じファイルと見なされます。
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"
maxFileAge:無視される前に、このディレクトリで見つけることができるファイルの最大経過時間。最初のバッチでは、すべてのファイルが有効と見なされます。latestFirstが`true`に設定され、maxFilesPerTriggerが設定されている場合、このパラメーターは無視されます。これは、有効であり、処理する必要がある古いファイルが無視される可能性があるためです。最大経過時間は、現在のシステムのタイムスタンプではなく、最新ファイルのタイムスタンプに関して指定されます(デフォルト:1週間)。
cleanSource:処理後に完了したファイルをクリーンアップするオプション。
利用可能なオプションは、「archive」、「delete」、「off」です。オプションが指定されていない場合、デフォルト値は「off」です。
「archive」が指定されている場合は、追加オプションのsourceArchiveDirも指定する必要があります。「sourceArchiveDir」の値は、ソースパターンと深さ(ルートディレクトリからのディレクトリ数)が一致してはなりません。ここで、深さは両方のパスでの深さの最小値です。これにより、アーカイブされたファイルが新しいソースファイルとして含まれることはありません。
たとえば、ソースパターンとして「/hello?/spark/*」を指定した場合、「/hello1/spark/archive/dir」は「sourceArchiveDir」の値として使用できません。「/hello?/spark/*」と「/hello1/spark/archive」が一致するためです。「/hello1/spark」も「sourceArchiveDir」の値として使用できません。「/hello?/spark」と「/hello1/spark」が一致するためです。「/archived/here」は一致しないため、問題ありません。
Sparkは、独自のパスを尊重してソースファイルを移動します。たとえば、ソースファイルのパスが/a/b/dataset.txtで、アーカイブディレクトリのパスが/archived/hereの場合、ファイルは/archived/here/a/b/dataset.txtに移動されます。
注:完了したファイルのアーカイブ(移動による)または削除の両方で、各マイクロバッチでオーバーヘッド(別のスレッドで発生している場合でも、速度低下)が発生するため、このオプションを有効にする前に、ファイルシステムでの各操作のコストを理解する必要があります。一方、このオプションを有効にすると、ソースファイルを一覧表示するコストを削減できます。これは高価な操作になる可能性があります。
完了したファイルクリーナーで使用されるスレッド数は、spark.sql.streaming.fileSource.cleaner.numThreads(デフォルト:1)で設定できます。
注2:このオプションを有効にする場合は、複数のソースまたはクエリからソースパスを使用しないでください。同様に、ソースパスがファイルストリームシンクの出力ディレクトリ内のファイルと一致しないことを確認する必要があります。
注3:削除と移動の両方のアクションは、ベストエフォートです。ファイルの削除または移動に失敗しても、ストリーミングクエリは失敗しません。Sparkは、特定の状況下(たとえば、アプリケーションが正常にシャットダウンしない、クリーンアップするためにキューに入れられたファイルが多すぎるなど)では、一部のソースファイルをクリーンアップしない場合があります。

ファイル形式固有のオプションについては、DataStreamReaderの関連メソッドを参照してください(Scala/Java/Python/R)。例:「parquet」形式のオプションについては、DataStreamReader.parquet()を参照してください。

さらに、特定のファイル形式に影響を与えるセッション構成があります。詳細については、SQLプログラミングガイドを参照してください。例:「parquet」については、Parquet構成セクションを参照してください。
はい globパスをサポートしますが、カンマ区切りの複数のパス/globはサポートしません。
ソケットソース host:接続先のホスト。指定必須。
port:接続先のポート。指定必須。
いいえ
レートソース rowsPerSecond (例: 100、デフォルト: 1): 1秒あたりに生成される行数。

rampUpTime (例: 5s、デフォルト: 0s): 生成速度がrowsPerSecondになるまでにかかる時間。秒より細かい粒度を使用すると、整数秒に切り捨てられます。

numPartitions (例: 10、デフォルト: Sparkのデフォルト並列度): 生成される行のパーティション数。

ソースはrowsPerSecondに到達するように最善を尽くしますが、クエリがリソース制約を受ける可能性があり、目的の速度に到達するためにnumPartitionsを調整できます。
はい
マイクロバッチごとのレートソース (形式: rate-micro-batch) rowsPerBatch (例: 100): マイクロバッチごとに生成される行数。

numPartitions (例: 10、デフォルト: Sparkのデフォルト並列度): 生成される行のパーティション数。

startTimestamp (例: 1000、デフォルト: 0): 生成された時間の開始値。

advanceMillisPerBatch (例: 1000、デフォルト: 1000): 各マイクロバッチで生成された時間が進む時間量。

はい
Kafkaソース Kafka統合ガイドを参照してください。 はい

いくつかの例を以下に示します。

spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
val spark: SparkSession = ...

// Read text from socket
val socketDF = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.isStreaming    // Returns True for DataFrames that have streaming sources

socketDF.printSchema

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
SparkSession spark = ...

// Read text from socket
Dataset<Row> socketDF = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources

socketDF.printSchema();

// Read all the csv files written atomically in a directory
StructType userSchema = new StructType().add("name", "string").add("age", "integer");
Dataset<Row> csvDF = spark
  .readStream()
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
sparkR.session(...)

# Read text from socket
socketDF <- read.stream("socket", host = hostname, port = port)

isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources

printSchema(socketDF)

# Read all the csv files written atomically in a directory
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")

これらの例では、型なしのストリーミング DataFrame を生成します。つまり、DataFrame のスキーマはコンパイル時にはチェックされず、クエリが送信されるランタイム時にのみチェックされます。mapflatMap などの一部の操作では、コンパイル時に型が既知である必要があります。これらを実行するには、静的な DataFrame と同じメソッドを使用して、これらの型なしストリーミング DataFrame を型付きストリーミング Dataset に変換できます。詳細については、SQL プログラミングガイドを参照してください。さらに、サポートされているストリーミングソースの詳細については、ドキュメントの後半で説明します。

Spark 3.1 以降では、DataStreamReader.table() を使用してテーブルからストリーミング DataFrame を作成することもできます。詳細については、ストリーミングテーブル APIを参照してください。

ストリーミングDataFrame/Datasetのスキーマ推論とパーティション

デフォルトでは、ファイルベースのソースからの Structured Streaming では、Spark が自動的に推論するのではなく、スキーマを指定する必要があります。この制限により、障害が発生した場合でも、ストリーミングクエリに一貫したスキーマが使用されることが保証されます。アドホックなユースケースでは、spark.sql.streaming.schemaInferencetrue に設定することで、スキーマ推論を再度有効にできます。

/key=value/ という名前のサブディレクトリが存在する場合、パーティション検出が発生し、リストはこれらのディレクトリに自動的に再帰します。これらの列がユーザー提供のスキーマに表示される場合、Spark は読み込まれるファイルのパスに基づいてそれらを埋めます。パーティショニングスキームを構成するディレクトリは、クエリの開始時に存在する必要があり、静的なままである必要があります。たとえば、/data/year=2015/ が存在するときに /data/year=2016/ を追加することは問題ありませんが、パーティショニング列を変更する (つまり、ディレクトリ /data/date=2016-04-17/ を作成する) ことは無効です。

ストリーミングDataFrame/Datasetに対する操作

ストリーミング DataFrame/Dataset に対して、型なしの SQL のような操作 (例: selectwheregroupBy) から、型付きの RDD のような操作 (例: mapfilterflatMap) まで、あらゆる種類の操作を適用できます。詳細については、SQL プログラミングガイドを参照してください。使用できるいくつかの操作の例を見てみましょう。

基本操作 - 選択、射影、集計

DataFrame/Dataset に対する一般的な操作のほとんどは、ストリーミングでサポートされています。サポートされていないいくつかの操作については、このセクションの後で説明します

df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)

val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10")      // using untyped APIs
ds.filter(_.signal > 10).map(_.device)         // using typed APIs

// Running count of the number of updates for each device type
df.groupBy("deviceType").count()                          // using untyped API

// Running average signal for each device type
import org.apache.spark.sql.expressions.scalalang.typed
ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
import org.apache.spark.api.java.function.*;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.javalang.typed;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;

public class DeviceData {
  private String device;
  private String deviceType;
  private Double signal;
  private java.sql.Date time;
  ...
  // Getter and setter methods for each field
}

Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data

// Select the devices which have signal more than 10
df.select("device").where("signal > 10"); // using untyped APIs
ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
  .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());

// Running count of the number of updates for each device type
df.groupBy("deviceType").count(); // using untyped API

// Running average signal for each device type
ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
  .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
select(where(df, "signal > 10"), "device")

# Running count of the number of updates for each device type
count(groupBy(df, "deviceType"))

ストリーミング DataFrame/Dataset を一時ビューとして登録し、SQL コマンドを適用することもできます。

df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  // returns another streaming DF
df.createOrReplaceTempView("updates");
spark.sql("select count(*) from updates");  // returns another streaming DF
createOrReplaceTempView(df, "updates")
sql("select count(*) from updates")

DataFrame/Dataset にストリーミングデータがあるかどうかは、df.isStreaming を使用して識別できることに注意してください。

df.isStreaming()
df.isStreaming
df.isStreaming()
isStreaming(df)

クエリプランを確認することをお勧めします。Spark はストリーミングデータセットに対する SQL ステートメントの解釈中にステートフルな操作を注入する可能性があります。ステートフルな操作がクエリプランに注入されたら、ステートフルな操作での考慮事項 (例: 出力モード、ウォーターマーク、状態ストアのサイズ管理など) を考慮してクエリを確認する必要がある場合があります。

イベント時間でのウィンドウ操作

スライディングイベントタイムウィンドウでの集計は、Structured Streaming を使用すると簡単で、グループ化された集計と非常によく似ています。グループ化された集計では、集計値 (例: カウント) は、ユーザー指定のグループ化列の一意の値ごとに保持されます。ウィンドウベースの集計の場合、集計値は、行のイベントタイムが該当するウィンドウごとに保持されます。これを例で理解してみましょう。

クイック例が変更され、ストリームに、行が生成された時刻とともに、行が含まれるようになったと想像してください。単語数を実行する代わりに、10分間のウィンドウ内で、5分ごとに更新される単語数をカウントする必要があります。つまり、10分間のウィンドウ 12:00 - 12:10、12:05 - 12:15、12:10 - 12:20 などで受信した単語の単語数です。12:00 - 12:10 は、12:00 以降 12:10 より前に到着したデータを意味することに注意してください。ここで、12:07 に受信した単語を考えます。この単語は、2 つのウィンドウ 12:00 - 12:10 と 12:05 - 12:15 に対応するカウントを増やす必要があります。したがって、カウントはグループ化キー (つまり、単語) とウィンドウ (イベントタイムから計算可能) の両方によってインデックス付けされます。

結果テーブルは次のようになります。

Window Operations

このウィンドウ処理はグループ化に似ているため、コードでは、groupBy() および window() 操作を使用して、ウィンドウ化された集計を表現できます。以下の例の完全なコードは、Scala/Java/Python で確認できます。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

遅延データとウォーターマークの処理

ここで、イベントの 1 つがアプリケーションに遅れて到着した場合に何が起こるかについて考えてみましょう。たとえば、12:04 (つまり、イベントタイム) に生成された単語が、12:11 にアプリケーションによって受信されたとします。アプリケーションは、12:11 の代わりに 12:04 という時間を使用して、ウィンドウ 12:00 - 12:10 の古いカウントを更新する必要があります。これはウィンドウベースのグループ化で自然に発生します。Structured Streaming は、遅延データが古いウィンドウの集計を正しく更新できるように、部分集計の中間状態を長期間維持できます。以下に図を示します。

Handling Late Data

ただし、このクエリを数日間実行するには、システムが累積する中間メモリ内の状態の量を制限する必要があります。これは、アプリケーションがその集計の遅延データを受信しなくなるため、古い集計をメモリ内の状態から削除できる時期をシステムが知る必要があることを意味します。これを有効にするために、Spark 2.1 では、ウォーターマーキングが導入されました。これにより、エンジンはデータ内の現在のイベントタイムを自動的に追跡し、それに応じて古い状態をクリーンアップしようとします。イベントタイム列と、イベントタイムに関してデータが遅れると予想されるしきい値を指定することにより、クエリのウォーターマークを定義できます。時間 T で終了する特定のウィンドウの場合、エンジンは状態を維持し、(エンジンで確認された最大イベントタイム - 遅延しきい値 > T) まで、遅延データが状態を更新できるようにします。言い換えれば、しきい値内の遅延データは集計されますが、しきい値より後のデータは破棄され始めます (正確な保証については、このセクションの後で参照)。例でこれを理解してみましょう。以下に示すように、withWatermark() を使用して、前の例でウォーターマーキングを簡単に定義できます。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window($"timestamp", "10 minutes", "5 minutes"),
        $"word")
    .count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),
        col("word"))
    .count();
words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group

words <- withWatermark(words, "timestamp", "10 minutes")
windowedCounts <- count(
                    groupBy(
                      words,
                      window(words$timestamp, "10 minutes", "5 minutes"),
                      words$word))

この例では、クエリのウォーターマークを列「timestamp」の値で定義し、データの遅延が許容されるしきい値として「10 分」を定義しています。このクエリが更新出力モード (この後の出力モードセクションで説明) で実行される場合、エンジンは、ウィンドウが、列「timestamp」の現在のイベントタイムより 10 分遅れているウォーターマークより古くなるまで、結果テーブルのウィンドウのカウントを更新し続けます。次に図を示します。

Watermarking in Update Mode

図に示すように、エンジンによって追跡される最大イベントタイムは青い破線であり、すべてのトリガーの開始時に (最大イベントタイム - '10 分') として設定されたウォーターマークは赤い線です。たとえば、エンジンがデータ (12:14, dog) を観測すると、次のトリガーのウォーターマークを 12:04 に設定します。このウォーターマークにより、エンジンは、遅延データがカウントされるように、さらに 10 分間、中間状態を維持できます。たとえば、データ (12:09, cat) は順序が異なり遅延しており、ウィンドウ 12:00 - 12:1012:05 - 12:15 に該当します。これは、トリガーのウォーターマーク 12:04 よりまだ前であるため、エンジンは中間カウントを状態として保持し、関連するウィンドウのカウントを正しく更新します。ただし、ウォーターマークが 12:11 に更新されると、ウィンドウ (12:00 - 12:10) の中間状態はクリアされ、後続のすべてのデータ (例: (12:04, donkey)) は「遅すぎる」と見なされ、無視されます。すべてのトリガーの後、更新されたカウント (つまり、紫色の行) は、更新モードで指定されたトリガー出力としてシンクに書き込まれることに注意してください。

一部のシンク (例: ファイル) は、更新モードに必要な細かい更新をサポートしていない可能性があります。これらを操作するために、最終カウントのみがシンクに書き込まれる、追加モードもサポートしています。以下に図を示します。

ストリーミングではない Dataset で withWatermark を使用しても何も影響がないことに注意してください。ウォーターマークはバッチクエリに影響を与えないため、直接無視されます。

Watermarking in Append Mode

以前の更新モードと同様に、エンジンは各ウィンドウの中間カウントを保持します。ただし、部分的なカウントは結果テーブルに更新されず、シンクにも書き込まれません。エンジンは、遅延した日付がカウントされるまで「10分」待ち、次にウォーターマーク < ウィンドウの中間状態を破棄し、最終的なカウントを結果テーブル/シンクに追加します。たとえば、ウィンドウ 12:00 - 12:10 の最終的なカウントは、ウォーターマークが 12:11 に更新された後にのみ結果テーブルに追加されます。

時間ウィンドウのタイプ

Sparkは、タンブリング(固定)、スライディング、セッションの3種類の時間ウィンドウをサポートしています。

The types of time windows

タンブリングウィンドウは、固定サイズで、重複せず、連続した時間間隔のシリーズです。入力は単一のウィンドウにのみバインドできます。

スライディングウィンドウは、「固定サイズ」である点ではタンブリングウィンドウと似ていますが、スライドの期間がウィンドウの期間よりも小さい場合はウィンドウが重複する可能性があり、この場合、入力は複数のウィンドウにバインドできます。

タンブリングウィンドウとスライディングウィンドウは、上記の例で説明した window 関数を使用します。

セッションウィンドウは、前の2つのタイプとは異なる特性を持っています。セッションウィンドウは、入力に応じてウィンドウ長のサイズが動的です。セッションウィンドウは入力で開始し、次の入力がギャップ期間内に受信された場合、自身を拡張します。静的なギャップ期間の場合、セッションウィンドウは、最新の入力を受信した後、ギャップ期間内に入力が受信されなかったときに閉じます。

セッションウィンドウは、session_window 関数を使用します。この関数の使い方は、window 関数と同様です。

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window(events.timestamp, "5 minutes"),
        events.userId) \
    .count()
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window($"timestamp", "5 minutes"),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),
        col("userId"))
    .count();

静的な値の代わりに、入力行に基づいてギャップ期間を動的に指定する式を提供することもできます。負またはゼロのギャップ期間を持つ行は、集計から除外されることに注意してください。

動的なギャップ期間では、セッションウィンドウの終了は最新の入力には依存しなくなります。セッションウィンドウの範囲は、クエリの実行中にイベント開始時刻と評価されたギャップ期間によって決定されるすべてのイベントの範囲の和集合です。

from pyspark.sql import functions as sf

events = ...  # streaming DataFrame of schema { timestamp: Timestamp, userId: String }

session_window = session_window(events.timestamp, \
    sf.when(events.userId == "user1", "5 seconds") \
    .when(events.userId == "user2", "20 seconds").otherwise("5 minutes"))

# Group the data by session window and userId, and compute the count of each group
sessionizedCounts = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        session_window,
        events.userId) \
    .count()
import spark.implicits._

val events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

val sessionWindow = session_window($"timestamp", when($"userId" === "user1", "5 seconds")
  .when($"userId" === "user2", "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        Column(sessionWindow),
        $"userId")
    .count()
Dataset<Row> events = ... // streaming DataFrame of schema { timestamp: Timestamp, userId: String }

SessionWindow sessionWindow = session_window(col("timestamp"), when(col("userId").equalTo("user1"), "5 seconds")
  .when(col("userId").equalTo("user2"), "20 seconds")
  .otherwise("5 minutes"))

// Group the data by session window and userId, and compute the count of each group
Dataset<Row> sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        new Column(sessionWindow),
        col("userId"))
    .count();

ストリーミングクエリでセッションウィンドウを使用する場合は、以下のような制限があることに注意してください。

バッチクエリの場合、グローバルウィンドウ(グループ化キーに session_window のみを持つ)がサポートされています。

デフォルトでは、Sparkはセッションウィンドウの集計に対して部分集計を実行しません。これは、グループ化の前にローカルパーティションで追加のソートが必要になるためです。ローカルパーティションの同じグループキーに少数の入力行しかない場合は、より効果的ですが、ローカルパーティションに同じグループキーを持つ多数の入力行がある場合は、部分集計を実行することで、追加のソートにもかかわらず、パフォーマンスを大幅に向上させることができます。

Sparkに部分集計を実行させるには、spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition を有効にできます。

時間ウィンドウの時間表現

一部のユースケースでは、時間ウィンドウで時間に関する表現を抽出して、タイムスタンプを必要とする操作を時間ウィンドウ化されたデータに適用する必要があります。1つの例は、時間ウィンドウに対する別の時間ウィンドウを定義したい、連鎖的な時間ウィンドウ集計です。たとえば、5分の時間ウィンドウを1時間のタンブル時間ウィンドウとして集計したいとします。

これを実現する方法は、以下の2つがあります。

  1. 時間ウィンドウ列をパラメータとして使用して、window_time SQL関数を使用します。
  2. 時間ウィンドウ列をパラメータとして使用して、window SQL関数を使用します。

window_time 関数は、時間ウィンドウの時間を表すタイムスタンプを生成します。ユーザーは、結果を window 関数のパラメータ(またはタイムスタンプを必要とする任意の場所)に渡して、タイムスタンプを必要とする時間ウィンドウで操作を実行できます。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window(window_time($"window"), "1 hour"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
  functions.window(functions.window_time("window"), "1 hour"),
  windowedCounts.col("word")
).count();

window 関数は、タイムスタンプ列だけでなく、時間ウィンドウ列も取得します。これは、ユーザーが連鎖的な時間ウィンドウ集計を適用したい場合に特に役立ちます。

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(windowedCounts.window, "1 hour"),
    windowedCounts.word
).count()
import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words.groupBy(
  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
  words.col("word")
).count();

// Group the windowed data by another window and word and compute the count of each group
Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
  functions.window("window", "1 hour"),
  windowedCounts.col("word")
).count();
集計状態をクリーンアップするためのウォーターマークの条件

集計クエリでウォーターマークが状態をクリーンアップするには、次の条件を満たす必要があることに注意してください(Spark 2.1.1時点、将来変更される可能性があります)

ウォーターマークを使用した集計のセマンティック保証

結合操作

Structured Streamingは、ストリーミングDataset/DataFrameと静的Dataset/DataFrame、および別のストリーミングDataset/DataFrameを結合することをサポートしています。ストリーミング結合の結果は、前のセクションのストリーミング集計の結果と同様に、段階的に生成されます。このセクションでは、上記のケースでサポートされている結合の種類(つまり、内部、外部、セミなど)について説明します。サポートされているすべての結合タイプでは、ストリーミングDataset/DataFrameとの結合の結果は、ストリームに同じデータを含む静的Dataset/DataFrameの場合とまったく同じになることに注意してください。

ストリーム-静的結合

Spark 2.0での導入以来、Structured Streamingは、ストリーミングDataFrame/Datasetと静的DataFrame/Dataset間の結合(内部結合および一部のタイプの外部結合)をサポートしています。簡単な例を次に示します。

staticDf = spark.read. ...
streamingDf = spark.readStream. ...
streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
val staticDf = spark.read. ...
val streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF
Dataset<Row> staticDf = spark.read(). ...;
Dataset<Row> streamingDf = spark.readStream(). ...;
streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
streamingDf.join(staticDf, "type", "left_outer");  // left outer join with a static DF
staticDf <- read.df(...)
streamingDf <- read.stream(...)
joined <- merge(streamingDf, staticDf, sort = FALSE)  # inner equi-join with a static DF
joined <- join(
            streamingDf,
            staticDf,
            streamingDf$value == staticDf$value,
            "left_outer")  # left outer join with a static DF

ストリーム-静的結合はステートフルではないため、状態管理は必要ないことに注意してください。ただし、一部のタイプのストリーム-静的外部結合はまだサポートされていません。これらは、この結合セクションの最後にリストされています。

ストリーム-ストリーム結合

Spark 2.3では、ストリーム-ストリーム結合のサポートを追加しました。つまり、2つのストリーミングDataset/DataFrameを結合できます。2つのデータストリーム間で結合結果を生成する際の課題は、任意の時点でデータセットのビューが結合の両側で不完全であり、入力間のマッチングを見つけるのがはるかに難しいことです。一方の入力ストリームから受信した行は、もう一方の入力ストリームからの将来の、まだ受信されていない行と一致する可能性があります。したがって、両方の入力ストリームについて、過去の入力をストリーミング状態としてバッファリングし、すべての将来の入力を過去の入力と照合して、それに応じて結合された結果を生成できるようにします。さらに、ストリーミング集計と同様に、遅延、順序が異なるデータを自動的に処理し、ウォーターマークを使用して状態を制限できます。サポートされているさまざまなタイプのストリーム-ストリーム結合とその使用方法について説明します。

オプションのウォーターマーク付きの内部結合

任意の種類の結合条件とともに、任意の種類の列の内部結合がサポートされています。ただし、ストリームが実行されると、すべての過去の入力は、新しい入力が過去の任意の入力と一致する可能性があるため、保存する必要があるため、ストリーミング状態のサイズは無期限に増え続けます。境界のない状態を回避するには、過去の入力が無期限に将来の入力と一致することがないため、状態からクリアできるような追加の結合条件を定義する必要があります。言い換えれば、結合で次の追加の手順を実行する必要があります。

  1. エンジンが入力の遅延時間を認識できるように、両方の入力でウォーターマーク遅延を定義します(ストリーミング集計と同様)。

  2. エンジンが一方の入力の古い行が他方の入力との一致に必要ではなくなる(つまり、時間制約を満たさなくなる)時期を把握できるように、2つの入力間でイベント時間に関する制約を定義します。この制約は、次の2つの方法のいずれかで定義できます。

    1. 時間範囲結合条件(例:...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR

    2. イベント時間ウィンドウでの結合(例:...JOIN ON leftTimeWindow = rightTimeWindow

例を見て理解しましょう。

たとえば、広告が表示されたタイミングの広告インプレッションのストリームと、収益化可能なクリックにつながったインプレッションを関連付けるための広告に対するユーザーのクリックの別のストリームを結合したいとします。このストリーム-ストリーム結合で状態のクリーンアップを許可するには、次のようにウォーターマーク遅延と時間制約を指定する必要があります。

  1. ウォーターマーク遅延:たとえば、インプレッションと対応するクリックは、イベント時間で最大2時間と3時間それぞれ遅延/順序が異なる可能性があるとします。

  2. イベント時間の範囲条件:たとえば、クリックは、対応するインプレッションの後、0秒から1時間の時間範囲内で発生する可能性があります。

コードは次のようになります。

from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import org.apache.spark.sql.functions.expr

val impressions = spark.readStream. ...
val clicks = spark.readStream. ...

// Apply watermarks on event-time columns
val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """)
)
import static org.apache.spark.sql.functions.expr

Dataset<Row> impressions = spark.readStream(). ...
Dataset<Row> clicks = spark.readStream(). ...

// Apply watermarks on event-time columns
Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");

// Join with event-time constraints
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour ")
);
impressions <- read.stream(...)
clicks <- read.stream(...)

# Apply watermarks on event-time columns
impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")

# Join with event-time constraints
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"
)))
ウォーターマークを使用したストリーム-ストリーム内部結合のセマンティック保証

これは、ウォーターマーキングによる集計で提供される保証と同様です。「2時間」のウォーターマーク遅延は、エンジンが2時間未満の遅延のデータを破棄しないことを保証します。ただし、2時間以上遅延したデータは処理される場合もあれば、処理されない場合もあります。

ウォーターマーク付きの外部結合

ウォーターマークとイベント時間の制約は、内部結合ではオプションですが、外部結合では指定する必要があります。これは、外部結合でNULLの結果を生成するためには、エンジンが入力行が将来的に何も一致しないことを知る必要があるためです。したがって、正しい結果を生成するためには、ウォーターマークとイベント時間の制約を指定する必要があります。したがって、外部結合を含むクエリは、以前の広告収益化の例と非常によく似ていますが、外部結合であることを指定する追加のパラメータがあります。

impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
 )
impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);
joined <- join(
  impressionsWithWatermark,
  clicksWithWatermark,
  expr(
    paste(
      "clickAdId = impressionAdId AND",
      "clickTime >= impressionTime AND",
      "clickTime <= impressionTime + interval 1 hour"),
  "left_outer"                 # can be "inner", "left_outer", "right_outer", "full_outer", "left_semi"
))
ウォーターマーキングによるストリーム間外部結合のセマンティック保証

外部結合は、ウォーターマークの遅延とデータが破棄されるかどうかに関して、内部結合と同じ保証を持ちます。

注意点

外部結果がどのように生成されるかに関して、いくつかの重要な特性に注意する必要があります。

ウォーターマーク付きの準結合

セミ結合は、右側と一致する左側の関係から値を返します。これは、左セミ結合とも呼ばれます。外部結合と同様に、セミ結合にはウォーターマークとイベント時間の制約を指定する必要があります。これは、左側の一致しない入力行を削除するために、エンジンが左側の入力行が将来的に右側の何も一致しないことを知る必要があるためです。

ウォーターマーキングによるストリーム間セミ結合のセマンティック保証

セミ結合は、ウォーターマークの遅延とデータが破棄されるかどうかに関して、内部結合と同じ保証を持ちます。

ストリーミングクエリにおける結合のサポートマトリックス
左入力 右入力 結合タイプ
静的 静的 すべてのタイプ ストリーミングクエリに存在する場合でも、ストリーミングデータ上ではないため、サポートされています
ストリーム 静的 内部 サポートされており、ステートフルではありません
左外部 サポートされており、ステートフルではありません
右外部 サポートされていません
完全外部 サポートされていません
左セミ サポートされており、ステートフルではありません
静的 ストリーム 内部 サポートされており、ステートフルではありません
左外部 サポートされていません
右外部 サポートされており、ステートフルではありません
完全外部 サポートされていません
左セミ サポートされていません
ストリーム ストリーム 内部 サポートされています。両側にウォーターマークと状態クリーンアップの時間制約をオプションで指定します
左外部 条件付きでサポートされています。正しい結果を得るには右側にウォーターマークと時間制約を指定する必要があり、すべての状態クリーンアップには左側にウォーターマークをオプションで指定します
右外部 条件付きでサポートされています。正しい結果を得るには左側にウォーターマークと時間制約を指定する必要があり、すべての状態クリーンアップには右側にウォーターマークをオプションで指定します
完全外部 条件付きでサポートされています。正しい結果を得るには片側にウォーターマークと時間制約を指定する必要があり、すべての状態クリーンアップにはもう一方の側にウォーターマークをオプションで指定します
左セミ 条件付きでサポートされています。正しい結果を得るには右側にウォーターマークと時間制約を指定する必要があり、すべての状態クリーンアップには左側にウォーターマークをオプションで指定します

サポートされている結合に関する追加の詳細

追加出力モードでは、結合の前後に、集計、重複排除、ストリーム間結合などのマップのような操作以外の操作を含むクエリを構築できます。

たとえば、以下は、両方のストリームでの時間ウィンドウ集計と、それに続くイベント時間ウィンドウを使用したストリーム間結合の例です

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")
Dataset<Row> clicksWindow = clicksWithWatermark
  .groupBy(functions.window(clicksWithWatermark.col("clickTime"), "1 hour"))
  .count();

Dataset<Row> impressionsWindow = impressionsWithWatermark
  .groupBy(functions.window(impressionsWithWatermark.col("impressionTime"), "1 hour"))
  .count();

clicksWindow.join(impressionsWindow, "window", "inner");
clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

以下は、時間範囲結合条件を使用したストリーム間結合と、それに続く時間ウィンドウ集計の別の例です

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()
Dataset<Row> joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr(
    "clickAdId = impressionAdId AND " +
    "clickTime >= impressionTime AND " +
    "clickTime <= impressionTime + interval 1 hour "),
  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
);

joined
  .groupBy(joined.col("clickAdId"), functions.window(joined.col("clickTime"), "1 hour"))
  .count();
joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

ストリーミング重複排除

イベント内の一意の識別子を使用して、データストリーム内のレコードを重複排除できます。これは、一意の識別子列を使用する静的な重複排除とまったく同じです。クエリは、重複レコードをフィルター処理できるように、以前のレコードから必要な量のデータを保存します。集計と同様に、ウォーターマーキングの有無にかかわらず重複排除を使用できます。

streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates("guid")

# With watermark using guid and eventTime columns
streamingDf \
  .withWatermark("eventTime", "10 seconds") \
  .dropDuplicates("guid", "eventTime")
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid")

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// Without watermark using guid column
streamingDf.dropDuplicates("guid");

// With watermark using guid and eventTime columns
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
streamingDf <- read.stream(...)

# Without watermark using guid column
streamingDf <- dropDuplicates(streamingDf, "guid")

# With watermark using guid and eventTime columns
streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")

特にストリーミングの場合、ウォーターマークの時間範囲内で、イベント内の一意の識別子を使用してデータストリーム内のレコードを重複排除できます。たとえば、ウォーターマークの遅延しきい値を「1時間」に設定した場合、1時間以内に発生した重複イベントを正しく重複排除できます。(詳細については、dropDuplicatesWithinWatermark の API ドキュメントを参照してください。)

これは、イベント時間列を一意の識別子の一部にすることができないユースケースに対処するために使用できます。ほとんどの場合、イベント時間が同じレコードでも何らかの形で異なるケースが原因です。(例:非べき等ライターで、書き込み時にイベント時間の発行が発生する場合)

ユーザーは、重複イベント間の最大タイムスタンプ差よりも長いウォーターマークの遅延しきい値を設定することをお勧めします。

この機能には、ストリーミング DataFrame/Dataset で遅延しきい値を持つウォーターマークを設定する必要があります。

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
streamingDf \
  .withWatermark("eventTime", "10 hours") \
  .dropDuplicatesWithinWatermark("guid")
val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid")
Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...

// deduplicate using guid column with watermark based on eventTime column
streamingDf
  .withWatermark("eventTime", "10 hours")
  .dropDuplicatesWithinWatermark("guid");

複数のウォーターマークを処理するポリシー

ストリーミングクエリには、結合または結合されている複数の入力ストリームを含めることができます。各入力ストリームには、ステートフル操作で許容する必要がある遅延データの異なるしきい値を設定できます。これらのしきい値は、各入力ストリームで withWatermarks("eventTime", delay) を使用して指定します。たとえば、inputStream1inputStream2 の間のストリーム間結合を含むクエリを考えてみましょう。

inputStream1.withWatermark("eventTime1", "1 hour")
  .join(
    inputStream2.withWatermark("eventTime2", "2 hours"),
    joinCondition)

クエリを実行中、構造化ストリーミングは、各入力ストリームで確認された最大イベント時間を個別に追跡し、対応する遅延に基づいてウォーターマークを計算し、ステートフル操作に使用する単一のグローバルウォーターマークを選択します。デフォルトでは、最小値がグローバルウォーターマークとして選択されます。これは、ストリームの1つが他のストリームよりも遅れている場合(たとえば、アップストリームの障害のためにストリームの1つがデータの受信を停止した場合)、データが遅すぎると誤って削除されないようにするためです。言い換えれば、グローバルウォーターマークは最も遅いストリームのペースで安全に移動し、クエリの出力はそれに応じて遅延します。

ただし、場合によっては、最も遅いストリームからのデータを削除することになったとしても、より速く結果を得たい場合があります。Spark 2.4 以降では、SQL 設定 spark.sql.streaming.multipleWatermarkPolicymax に設定することにより、グローバルウォーターマークとして最大値を選択するように複数ウォーターマークポリシーを設定できます(デフォルトは min です)。これにより、グローバルウォーターマークは最速のストリームのペースで移動できます。ただし、副作用として、遅いストリームからのデータは積極的に削除されます。したがって、この構成は慎重に使用してください。

任意のステートフル操作

多くのユースケースでは、集計よりも高度なステートフル操作が必要です。たとえば、多くのユースケースでは、イベントのデータストリームからセッションを追跡する必要があります。このようなセッション化を行うには、任意のタイプのデータを状態として保存し、すべてのトリガーでデータストリームイベントを使用して状態に対して任意の操作を実行する必要があります。Spark 2.2 以降、これは操作 mapGroupsWithState およびより強力な操作 flatMapGroupsWithState を使用して実行できます。どちらの操作でも、ユーザー定義の状態を更新するために、グループ化されたデータセットにユーザー定義のコードを適用できます。具体的な詳細については、API ドキュメント(Scala/Java)と例(Scala/Java)をご覧ください。

Spark はチェックして強制することはできませんが、状態関数は出力モードのセマンティクスに従って実装する必要があります。たとえば、更新モードでは、Spark は状態関数が現在のウォーターマークと許可された遅延レコード遅延よりも古い行を発行することを期待していませんが、追加モードでは状態関数がこれらの行を発行できます。

サポートされていない操作

ストリーミング DataFrame/Dataset でサポートされていない DataFrame/Dataset 操作がいくつかあります。それらのいくつかを以下に示します。

さらに、ストリーミングデータセットでは機能しないデータセットメソッドがいくつかあります。これらは、クエリをすぐに実行して結果を返すアクションであり、ストリーミングデータセットでは意味がありません。むしろ、これらの機能は、ストリーミングクエリを明示的に開始することで実行できます(次のセクションを参照)。

これらの操作のいずれかを試みると、「操作 XYZ はストリーミング DataFrame/Dataset ではサポートされていません」のような AnalysisException が表示されます。それらの一部は Spark の将来のリリースでサポートされる可能性がありますが、ストリーミングデータで効率的に実装するのが根本的に難しいものもあります。たとえば、入力ストリームでの並べ替えは、ストリームで受信したすべてのデータを追跡する必要があるため、サポートされていません。したがって、これは効率的に実行するのが根本的に難しいです。

ステートストア

状態ストアは、読み取りと書き込みの両方の操作を提供するバージョン管理されたキーと値のストアです。構造化ストリーミングでは、バッチ間のステートフル操作を処理するために状態ストアプロバイダーを使用します。2つの組み込み状態ストアプロバイダー実装があります。エンドユーザーは、StateStoreProvider インターフェイスを拡張して、独自の状態ストアプロバイダーを実装することもできます。

HDFSステートストアプロバイダー

HDFSバックエンドのステートストアプロバイダーは、[[StateStoreProvider]]と[[StateStore]]のデフォルト実装であり、すべてのデータは最初の段階ではメモリマップに格納され、その後、HDFS互換のファイルシステムのファイルによってバックアップされます。ストアへのすべての更新は、トランザクションとしてセットで行う必要があり、更新の各セットはストアのバージョンをインクリメントします。これらのバージョンは、ストアの正しいバージョンで更新を再実行(RDD操作での再試行による)し、ストアのバージョンを再生成するために使用できます。

RocksDBステートストア実装

Spark 3.2以降では、新しい組み込みステートストア実装であるRocksDBステートストアプロバイダーが追加されました。

ストリーミングクエリでステートフルな操作(たとえば、ストリーミング集計、ストリーミングdropDuplicates、ストリーム-ストリーム結合、mapGroupsWithState、またはflatMapGroupsWithState)があり、状態に数百万のキーを保持したい場合、JVMガベージコレクション(GC)の一時停止時間が長くなり、マイクロバッチ処理時間が大きく変動する問題に直面する可能性があります。これは、HDFSBackedStateStoreの実装により、状態データがエグゼキューターのJVMメモリに保持され、多数の状態オブジェクトがJVMにメモリ負荷をかけ、GCの一時停止時間が長くなるために発生します。

このような場合、RocksDBに基づくより最適化された状態管理ソリューションを使用することを選択できます。このソリューションでは、状態をJVMメモリに保持するのではなく、RocksDBを使用してネイティブメモリとローカルディスクで状態を効率的に管理します。さらに、この状態に対する変更はすべて、構造化ストリーミングによって提供されたチェックポイントの場所に自動的に保存されるため、完全なフォールトトレランス保証(デフォルトの状態管理と同じ)が提供されます。

新しい組み込みステートストア実装を有効にするには、spark.sql.streaming.stateStore.providerClassorg.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProviderに設定します。

以下は、ステートストアプロバイダーのRocksDBインスタンスに関する構成です。

構成名 説明 デフォルト値
spark.sql.streaming.stateStore.rocksdb.compactOnCommit コミット操作でRocksDBインスタンスの範囲コンパクションを実行するかどうか False
spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled RocksDB StateStoreコミット中にスナップショットの代わりに変更ログをアップロードするかどうか False
spark.sql.streaming.stateStore.rocksdb.blockSizeKB RocksDBのデフォルトSSTファイル形式であるRocksDB BlockBasedTableのブロックごとにパックされるユーザーデータの概算サイズ(KB単位)。 4
spark.sql.streaming.stateStore.rocksdb.blockCacheSizeMB ブロックのキャッシュのサイズ容量(MB単位)。 8
spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs RocksDBインスタンスのロード操作でロックを取得するための待機時間(ミリ秒単位)。 60000
spark.sql.streaming.stateStore.rocksdb.maxOpenFiles RocksDBインスタンスで使用できるオープンファイルの数。-1の値は、開かれたファイルが常に開いたままになることを意味します。オープンファイルの制限に達した場合、RocksDBはオープンファイルキャッシュからエントリを削除し、それらのファイル記述子を閉じてキャッシュからエントリを削除します。 -1
spark.sql.streaming.stateStore.rocksdb.resetStatsOnLoad ロード時にRocksDBのすべてのティッカーとヒストグラムの統計をリセットするかどうか。 True
spark.sql.streaming.stateStore.rocksdb.trackTotalNumberOfRows ステートストア内の行の総数を追跡するかどうか。パフォーマンスに関する考慮事項の詳細を参照してください。 True
spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMB RocksDBのMemTableの最大サイズ。-1の値は、RocksDBの内部デフォルト値が使用されることを意味します。 -1
spark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumber RocksDBのMemTableの最大数(アクティブとイミュータブルの両方)。-1の値は、RocksDBの内部デフォルト値が使用されることを意味します。 -1
spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage 単一ノード上のRocksDBステートストアインスタンスの合計メモリ使用量が制限されているかどうか。 false
spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB 単一ノード上のRocksDBステートストアインスタンスの合計メモリ制限(MB単位)。 500
spark.sql.streaming.stateStore.rocksdb.writeBufferCacheRatio maxMemoryUsageMBを使用して、単一ノード上のすべてのRocksDBインスタンスに割り当てられたメモリの割合として、書き込みバッファーによって占有される合計メモリ。 0.5
spark.sql.streaming.stateStore.rocksdb.highPriorityPoolRatio maxMemoryUsageMBを使用して、単一ノード上のすべてのRocksDBインスタンスに割り当てられたメモリの割合として、高優先度プール内のブロックによって占有される合計メモリ。 0.1
RocksDBステートストアのメモリ管理

RocksDBは、memtable、ブロックキャッシュ、フィルター/インデックスブロックなど、さまざまなオブジェクトにメモリを割り当てます。制限がない場合、複数のインスタンスにわたるRocksDBメモリ使用量は無期限に増加し、OOM(メモリ不足)の問題を引き起こす可能性があります。RocksDBは、書き込みバッファーマネージャー機能を使用することで、単一ノードで実行されているすべてのDBインスタンスのメモリ使用量を制限する方法を提供します。spark.sql.streaming.stateStore.rocksdb.boundedMemoryUsage構成をtrueに設定することで、Spark構造化ストリーミングデプロイメントでRocksDBのメモリ使用量を制限できます。spark.sql.streaming.stateStore.rocksdb.maxMemoryUsageMB値を静的な数値、またはノードで使用可能な物理メモリの割合として設定することで、RocksDBインスタンスの最大許容メモリを決定することもできます。個々のRocksDBインスタンスの制限は、spark.sql.streaming.stateStore.rocksdb.writeBufferSizeMBspark.sql.streaming.stateStore.rocksdb.maxWriteBufferNumberを必要な値に設定することによっても構成できます。デフォルトでは、これらの設定にはRocksDB内部デフォルトが使用されます。

RocksDBステートストアの変更ログチェックポイント

新しいバージョンのSparkでは、RocksDBステートストアに変更ログチェックポイントが導入されました。RocksDBステートストアの従来のチェックポイントメカニズムは、インクリメンタルスナップショットチェックポイントであり、RocksDBインスタンスのマニフェストファイルと新しく生成されたRocksDB SSTファイルが永続ストレージにアップロードされます。RocksDBインスタンスのデータファイルをアップロードする代わりに、変更ログのチェックポイントは、永続性のために最後のチェックポイント以降に状態に加えられた変更をアップロードします。スナップショットは、予測可能な障害復旧と変更ログのトリミングのために、バックグラウンドで定期的に永続化されます。変更ログのチェックポイントは、RocksDBインスタンスのスナップショットをキャプチャしてアップロードするコストを回避し、ストリーミングクエリのレイテンシを大幅に削減します。

変更ログチェックポイントはデフォルトで無効になっています。spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled構成をtrueに設定することで、RocksDBステートストアの変更ログチェックポイントを有効にできます。変更ログチェックポイントは、従来のチェックポイントメカニズムとの下位互換性を持つように設計されています。RocksDBステートストアプロバイダーは、2つのチェックポイントメカニズム間の移行を双方向でシームレスにサポートします。これにより、古い状態のチェックポイントを破棄することなく、変更ログチェックポイントのパフォーマンス上の利点を活用できます。変更ログチェックポイントをサポートするSparkのバージョンでは、Sparkセッションで変更ログチェックポイントを有効にすることで、古いバージョンのSparkから変更ログチェックポイントにストリーミングクエリを移行できます。逆に、新しいバージョンのSparkで変更ログチェックポイントを安全に無効にすることができ、変更ログチェックポイントですでに実行されているクエリは従来のチェックポイントに戻ります。チェックポイントメカニズムの変更を適用するにはストリーミングクエリを再起動する必要がありますが、その過程でパフォーマンスの低下は見られません。

パフォーマンス面の考慮事項
  1. RocksDBステートストアのパフォーマンスを向上させるために、行の総数の追跡を無効にすることもできます。

行数の追跡は、書き込み操作で追加のルックアップをもたらします。特にステートオペレーターのメトリクスの値が大きい場合(numRowsUpdatednumRowsRemoved)、RocksDBステートストアのチューニング時に構成をオフにしてみることをお勧めします。

クエリの再起動中に構成を変更できます。これにより、「可観測性 vs パフォーマンス」のトレードオフの決定を変更できます。構成が無効になっている場合、状態の行数(numTotalStateRows)は0として報告されます。

ステートストアとタスクの局所性

ステートフルな操作は、エグゼキューターのステートストア内のイベントの状態を格納します。ステートストアは、状態を格納するためにメモリやディスクスペースなどのリソースを占有します。そのため、異なるストリーミングバッチ間で同じエグゼキューターでステートストアプロバイダーを実行し続ける方が効率的です。ステートストアプロバイダーの場所を変更するには、チェックポイントされた状態をロードする追加のオーバーヘッドが必要です。チェックポイントから状態をロードするオーバーヘッドは、外部ストレージと状態のサイズによって異なり、マイクロバッチ実行のレイテンシを損なう傾向があります。非常に大きな状態データを処理するなど、一部のユースケースでは、チェックポイントされた状態から新しいステートストアプロバイダーをロードすると、非常に時間がかかり、非効率的になる可能性があります。

構造化ストリーミングクエリのステートフルな操作は、SparkのRDDの優先ロケーション機能に依存して、同じエグゼキューターでステートストアプロバイダーを実行します。次のバッチで、対応するステートストアプロバイダーがこのエグゼキューターで再度スケジュールされた場合、以前の状態を再利用して、チェックポイントされた状態をロードする時間を節約できます。

ただし、一般的に、優先ロケーションは必須要件ではなく、Sparkが優先ロケーション以外のエグゼキューターにタスクをスケジュールする可能性は依然としてあります。この場合、Sparkは新しいエグゼキューターのチェックポイントされた状態からステートストアプロバイダーをロードします。前のバッチで実行されたステートストアプロバイダーはすぐにはアンロードされません。Sparkは、エグゼキューターで非アクティブなステートストアプロバイダーをチェックしてアンロードするメンテナンスタスクを実行します。

タスクのスケジューリングに関連するSpark構成(たとえば、spark.locality.wait)を変更することで、ユーザーはデータローカルタスクの起動をどれくらい待つかをSparkに構成できます。構造化ストリーミングのステートフルな操作の場合、バッチ間で同じエグゼキューターでステートストアプロバイダーを実行させることができます。

特に組み込みのHDFSステートストアプロバイダーの場合、ユーザーはloadedMapCacheHitCountloadedMapCacheMissCountなどのステートストアメトリクスを確認できます。理想的には、キャッシュミス数が最小限に抑えられているのが最適です。つまり、Sparkはチェックポイントされた状態のロードにあまり時間を費やしません。ユーザーは、バッチ間で異なるエグゼキューターでステートストアプロバイダーをロードしないように、Sparkのローカリティ待機構成を増やすことができます。

ストリーミングクエリの開始

最終的な結果であるDataFrame/Datasetを定義したら、あとはストリーミング計算を開始するだけです。これを行うには、DataStreamWriter (Scala/Java/Python のドキュメント) を Dataset.writeStream() を通して返す必要があります。このインターフェースでは、以下の1つ以上を指定する必要があります。

出力モード

出力モードにはいくつかの種類があります。

さまざまなタイプのストリーミングクエリは、異なる出力モードをサポートしています。以下に互換性マトリックスを示します。

クエリの種類 サポートされている出力モード
集計を含むクエリ ウォーターマークを使用したイベント時間での集計 Append、Update、Complete Append モードは、ウォーターマークを使用して古い集計状態を削除します。ただし、ウィンドウ集計の出力は、モードのセマンティクスにより、行が確定された後 (つまり、ウォーターマークを越えた後) に1回だけ結果テーブルに追加できるため、withWatermark() で指定された遅延しきい値だけ遅延します。詳細については、遅延データのセクションを参照してください。

Update モードは、ウォーターマークを使用して古い集計状態を削除します。

Complete モードは、定義上、このモードでは結果テーブルのすべてのデータが保持されるため、古い集計状態を削除しません。
その他の集計 Complete、Update ウォーターマークが定義されていない (他のカテゴリでのみ定義されている) ため、古い集計状態は削除されません。

集計は更新できるため、Append モードのセマンティクスに違反するため、サポートされていません。
mapGroupsWithState を使用したクエリ Update mapGroupsWithState を使用したクエリでは、集計は許可されていません。
flatMapGroupsWithState を使用したクエリ Append 操作モード Append flatMapGroupsWithState の後で集計が許可されています。
Update 操作モード Update flatMapGroupsWithState を使用したクエリでは、集計は許可されていません。
joins を使用したクエリ Append Update および Complete モードはまだサポートされていません。サポートされている結合の種類について詳しくは、結合操作セクションのサポートマトリックスを参照してください。
その他のクエリ Append、Update 集計されていないすべてのデータを結果テーブルに保持することが非現実的であるため、Complete モードはサポートされていません。

出力シンク

組み込みの出力シンクには、いくつかの種類があります。

writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .option("path", "path/to/destination/dir")
    .start()
writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "updates")
    .start()
writeStream
    .foreach(...)
    .start()
writeStream
    .format("console")
    .start()
writeStream
    .format("memory")
    .queryName("tableName")
    .start()

一部のシンクは、出力の永続性を保証せず、デバッグ目的のみを対象としているため、フォールトトレラントではありません。フォールトトレランスのセマンティクスに関する前のセクションを参照してください。以下に、Sparkのすべてのシンクの詳細を示します。

シンク サポートされている出力モード オプション フォールトトレラント
ファイルシンク Append path: 出力ディレクトリへのパス。指定する必要があります。
retention: 出力ファイルの有効期間 (TTL)。TTLよりも古いコミットされたバッチの出力ファイルは、最終的にメタデータログから除外されます。これは、シンクの出力ディレクトリを読み取るリーダーークエリがそれらを処理しない可能性があることを意味します。時間の文字列形式 (「12h」、「7d」など) で値を指定できます。デフォルトでは無効になっています。

ファイル形式固有のオプションについては、DataFrameWriter (Scala/Java/Python/R) の関連メソッドを参照してください。例: 「parquet」形式のオプションについては、DataFrameWriter.parquet() を参照してください。
はい (正確に1回) パーティション分割されたテーブルへの書き込みをサポートします。時間によるパーティション分割が役立つ場合があります。
Kafkaシンク Append、Update、Complete Kafka 統合ガイドを参照してください。 はい (少なくとも1回) 詳細については、Kafka 統合ガイドを参照してください。
Foreach シンク Append、Update、Complete なし はい (少なくとも1回) 詳細については、次のセクションを参照してください。
ForeachBatch シンク Append、Update、Complete なし 実装に依存します 詳細については、次のセクションを参照してください。
コンソールシンク Append、Update、Complete numRows: 各トリガーで印刷する行数 (デフォルト: 20)
truncate: 出力が長すぎる場合に切り捨てるかどうか (デフォルト: true)
いいえ
メモリシンク Append、Complete なし いいえ。ただし、Complete モードでは、再起動されたクエリは完全なテーブルを再作成します。 テーブル名はクエリ名です。

クエリの実行を実際に開始するには、start() を呼び出す必要があることに注意してください。これにより、継続的に実行される実行へのハンドルである StreamingQuery オブジェクトが返されます。このオブジェクトを使用してクエリを管理できます。これについては、次のサブセクションで説明します。ここでは、いくつかの例を使用して、これらすべてを理解しましょう。

# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()   # interactively query in-memory table
// ========== DF with no aggregations ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")

// Print new data to console
noAggDF
  .writeStream
  .format("console")
  .start()

// Write new data to Parquet files
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()

// ========== DF with aggregation ==========
val aggDF = df.groupBy("device").count()

// Print updated aggregations to console
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

// Have all the aggregates in an in-memory table
aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()

spark.sql("select * from aggregates").show()   // interactively query in-memory table
// ========== DF with no aggregations ==========
Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");

// Print new data to console
noAggDF
  .writeStream()
  .format("console")
  .start();

// Write new data to Parquet files
noAggDF
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start();

// ========== DF with aggregation ==========
Dataset<Row> aggDF = df.groupBy("device").count();

// Print updated aggregations to console
aggDF
  .writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Have all the aggregates in an in-memory table
aggDF
  .writeStream()
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start();

spark.sql("select * from aggregates").show();   // interactively query in-memory table
# ========== DF with no aggregations ==========
noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")

# Print new data to console
write.stream(noAggDF, "console")

# Write new data to Parquet files
write.stream(noAggDF,
             "parquet",
             path = "path/to/destination/dir",
             checkpointLocation = "path/to/checkpoint/dir")

# ========== DF with aggregation ==========
aggDF <- count(groupBy(df, "device"))

# Print updated aggregations to console
write.stream(aggDF, "console", outputMode = "complete")

# Have all the aggregates in an in memory table. The query name will be the table name
write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")

# Interactively query in-memory table
head(sql("select * from aggregates"))
ForeachとForeachBatchの使用

foreach および foreachBatch 操作を使用すると、ストリーミングクエリの出力に対して任意の操作と書き込みロジックを適用できます。これらには少し異なるユースケースがあります。foreach ではすべての行に対してカスタムの書き込みロジックが可能ですが、foreachBatch では各マイクロバッチの出力に対して任意の操作とカスタムロジックが可能です。これらの使用法について詳しく理解しましょう。

ForeachBatch

foreachBatch(...) では、ストリーミングクエリの各マイクロバッチの出力データに対して実行される関数を指定できます。Spark 2.4 以降、これは Scala、Java、および Python でサポートされています。これには、マイクロバッチの出力データを持つ DataFrame または Dataset と、マイクロバッチの一意の ID という 2 つのパラメーターが必要です。

def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Transform and write batchDF
}.start()
streamingDatasetOfString.writeStream().foreachBatch(
  new VoidFunction2<Dataset<String>, Long>() {
    public void call(Dataset<String> dataset, Long batchId) {
      // Transform and write batchDF
    }
  }
).start();

R はまだサポートされていません。

foreachBatch を使用すると、次のことができます。

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

注意

Foreach

foreachBatch がオプションでない場合 (たとえば、対応するバッチデータライターが存在しない場合や、継続処理モードの場合)、foreach を使用してカスタムライターロジックを表現できます。具体的には、データ書き込みロジックを、openprocess、および close の3つのメソッドに分割して表現できます。Spark 2.4 以降、foreach は Scala、Java、および Python で利用できます。

Python では、関数またはオブジェクトの 2 つの方法で foreach を呼び出すことができます。関数は処理ロジックを表現する簡単な方法を提供しますが、障害によって一部の入力データの再処理が発生した場合に生成されたデータを重複排除することはできません。その状況では、オブジェクトで処理ロジックを指定する必要があります。

  • まず、関数は入力を行として受け取ります。
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()
  • 次に、オブジェクトには process メソッドとオプションの open および close メソッドがあります
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method in optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()

Scala では、ForeachWriter クラス (ドキュメント) を拡張する必要があります。

streamingDatasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String): Unit = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

Java では、ForeachWriter クラス (ドキュメント) を拡張する必要があります。

streamingDatasetOfString.writeStream().foreach(
  new ForeachWriter<String>() {

    @Override public boolean open(long partitionId, long version) {
      // Open connection
    }

    @Override public void process(String record) {
      // Write string to connection
    }

    @Override public void close(Throwable errorOrNull) {
      // Close the connection
    }
  }
).start();

R はまだサポートされていません。

実行セマンティクス ストリーミングクエリが開始されると、Spark は次の方法で関数またはオブジェクトのメソッドを呼び出します

ストリーミングテーブルAPI

Spark 3.1以降では、DataStreamReader.table()を使用してテーブルをストリーミングDataFrameとして読み取り、DataStreamWriter.toTable()を使用してストリーミングDataFrameをテーブルとして書き込むこともできます。

spark = ...  # spark session

# Create a streaming DataFrame
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

# Write the streaming DataFrame to a table
df.writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .toTable("myTable")

# Check the table result
spark.read.table("myTable").show()

# Transform the source dataset and write to a new table
spark.readStream \
    .table("myTable") \
    .select("value") \
    .writeStream \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .format("parquet") \
    .toTable("newTable")

# Check the new table result
spark.read.table("newTable").show()
val spark: SparkSession = ...

// Create a streaming DataFrame
val df = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 10)
  .load()

// Write the streaming DataFrame to a table
df.writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable")

// Check the table result
spark.read.table("myTable").show()

// Transform the source dataset and write to a new table
spark.readStream
  .table("myTable")
  .select("value")
  .writeStream
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable")

// Check the new table result
spark.read.table("newTable").show()
SparkSession spark = ...

// Create a streaming DataFrame
Dataset<Row> df = spark.readStream()
  .format("rate")
  .option("rowsPerSecond", 10)
  .load();

// Write the streaming DataFrame to a table
df.writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .toTable("myTable");

// Check the table result
spark.read().table("myTable").show();

// Transform the source dataset and write to a new table
spark.readStream()
  .table("myTable")
  .select("value")
  .writeStream()
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .format("parquet")
  .toTable("newTable");

// Check the new table result
spark.read().table("newTable").show();

Rでは利用できません。

詳細については、DataStreamReader(Scala / Java / Pythonドキュメント)およびDataStreamWriter(Scala / Java / Pythonドキュメント)のドキュメントを確認してください。

トリガー

ストリーミングクエリのトリガー設定は、ストリーミングデータ処理のタイミングを定義します。クエリを固定バッチ間隔のマイクロバッチクエリとして実行するか、継続的な処理クエリとして実行するかを定義します。サポートされているさまざまな種類のトリガーを次に示します。

トリガータイプ 説明
未指定(デフォルト) トリガー設定が明示的に指定されていない場合、デフォルトでは、クエリはマイクロバッチモードで実行され、マイクロバッチは前のマイクロバッチの処理が完了するとすぐに生成されます。
固定間隔マイクロバッチ クエリはマイクロバッチモードで実行され、マイクロバッチはユーザー指定の間隔で開始されます。
  • 前のマイクロバッチが間隔内に完了した場合、エンジンは次のマイクロバッチを開始する前に間隔が終わるまで待機します。
  • 前のマイクロバッチが完了するのに間隔よりも時間がかかる場合(つまり、間隔の境界が失われた場合)、次のマイクロバッチは前のマイクロバッチが完了するとすぐに開始されます(つまり、次の間隔の境界を待機しません)。
  • 新しいデータが利用可能でない場合、マイクロバッチは開始されません。
1回限りのマイクロバッチ(非推奨) クエリは、利用可能なすべてのデータを処理するために1回のみマイクロバッチを実行し、その後自動的に停止します。これは、クラスターを定期的にスピンアップし、前回の期間以降に利用可能になったすべてを処理してから、クラスターをシャットダウンする場合に役立ちます。場合によっては、これにより大幅なコスト削減につながる可能性があります。このトリガーは非推奨であり、ユーザーは利用可能マイクロバッチに移行することが推奨されます。これは、処理のより良い保証、バッチの細かいスケール、およびデータなしのバッチを含むウォーターマークの漸進的な処理の改善を提供するためです。
利用可能マイクロバッチ クエリの1回限りのマイクロバッチトリガーと同様に、クエリは利用可能なすべてのデータを処理し、その後自動的に停止します。違いは、ソースオプション(たとえば、ファイルソースのmaxFilesPerTrigger)に基づいて、(場合によっては)複数のマイクロバッチでデータを処理することであり、これによりクエリのスケーラビリティが向上します。
  • このトリガーは強力な処理保証を提供します。前の実行でどれだけのバッチが残っていたかに関係なく、実行時に利用可能なすべてのデータが終了前に処理されることを保証します。コミットされていないすべてのバッチが最初に処理されます。
  • ウォーターマークはバッチごとに進み、最後のバッチがウォーターマークを進めた場合、終了前にデータなしバッチが実行されます。これにより、より小さく予測可能な状態サイズとステートフル演算子の出力のレイテンシを小さく保つことができます。
固定チェックポイント間隔での継続
(実験的)
クエリは、新しい低レイテンシの継続的な処理モードで実行されます。詳細については、以下の継続的な処理セクションを参照してください。

いくつかのコード例を以下に示します。

# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
  .format("console") \
  .start()

# ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream \
  .format("console") \
  .trigger(processingTime='2 seconds') \
  .start()

# One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream \
  .format("console") \
  .trigger(once=True) \
  .start()

# Available-now trigger
df.writeStream \
  .format("console") \
  .trigger(availableNow=True) \
  .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(continuous='1 second')
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()
import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start();

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start();

// One-time trigger (Deprecated, encouraged to use Available-now trigger)
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start();

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start();

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();
# Default trigger (runs micro-batch as soon as it can)
write.stream(df, "console")

# ProcessingTime trigger with two-seconds micro-batch interval
write.stream(df, "console", trigger.processingTime = "2 seconds")

# One-time trigger
write.stream(df, "console", trigger.once = TRUE)

# Continuous trigger is not yet supported

ストリーミングクエリの管理

クエリを開始したときに作成されたStreamingQueryオブジェクトを使用して、クエリを監視および管理できます。

query = df.writeStream.format("console").start()   # get the query object

query.id()          # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId()       # get the unique id of this run of the query, which will be generated at every start/restart

query.name()        # get the name of the auto-generated or user-specified name

query.explain()   # print detailed explanations of the query

query.stop()      # stop the query

query.awaitTermination()   # block until query is terminated, with stop() or with error

query.exception()       # the exception if the query has been terminated with error

query.recentProgress  # a list of the most recent progress updates for this query

query.lastProgress    # the most recent progress update of this streaming query
val query = df.writeStream.format("console").start()   // get the query object

query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId       // get the unique id of this run of the query, which will be generated at every start/restart

query.name        // get the name of the auto-generated or user-specified name

query.explain()   // print detailed explanations of the query

query.stop()      // stop the query

query.awaitTermination()   // block until query is terminated, with stop() or with error

query.exception       // the exception if the query has been terminated with error

query.recentProgress  // an array of the most recent progress updates for this query

query.lastProgress    // the most recent progress update of this streaming query
StreamingQuery query = df.writeStream().format("console").start();   // get the query object

query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart

query.name();        // get the name of the auto-generated or user-specified name

query.explain();   // print detailed explanations of the query

query.stop();      // stop the query

query.awaitTermination();   // block until query is terminated, with stop() or with error

query.exception();       // the exception if the query has been terminated with error

query.recentProgress();  // an array of the most recent progress updates for this query

query.lastProgress();    // the most recent progress update of this streaming query
query <- write.stream(df, "console")  # get the query object

queryName(query)          # get the name of the auto-generated or user-specified name

explain(query)            # print detailed explanations of the query

stopQuery(query)          # stop the query

awaitTermination(query)   # block until query is terminated, with stop() or with error

lastProgress(query)       # the most recent progress update of this streaming query

単一のSparkSessionで任意の数のクエリを開始できます。それらはすべて、クラスターリソースを共有して同時に実行されます。sparkSession.streams()を使用して、現在アクティブなクエリの管理に使用できるStreamingQueryManagerScala / Java / Pythonドキュメント)を取得できます。

spark = ...  # spark session

spark.streams.active  # get the list of currently active streaming queries

spark.streams.get(id)  # get a query object by its unique id

spark.streams.awaitAnyTermination()  # block until any one of them terminates
val spark: SparkSession = ...

spark.streams.active    // get the list of currently active streaming queries

spark.streams.get(id)   // get a query object by its unique id

spark.streams.awaitAnyTermination()   // block until any one of them terminates
SparkSession spark = ...

spark.streams().active();    // get the list of currently active streaming queries

spark.streams().get(id);   // get a query object by its unique id

spark.streams().awaitAnyTermination();   // block until any one of them terminates
Not available in R.

ストリーミングクエリの監視

アクティブなストリーミングクエリを監視するには、複数の方法があります。SparkのDropwizard Metricsサポートを使用してメトリックを外部システムにプッシュするか、プログラムでアクセスできます。

インタラクティブなメトリクスの読み取り

streamingQuery.lastProgress()およびstreamingQuery.status()を使用して、アクティブなクエリの現在のステータスとメトリックを直接取得できます。lastProgress()は、ScalaJavaStreamingQueryProgressオブジェクトと、Pythonの同じフィールドを持つ辞書を返します。これには、ストリームの最後のトリガーで行われた進捗状況に関するすべての情報が含まれています。処理されたデータ、処理速度、レイテンシなどです。最後のいくつかの進捗状況の配列を返すstreamingQuery.recentProgressもあります。

さらに、streamingQuery.status()は、ScalaJavaStreamingQueryStatusオブジェクトと、Pythonの同じフィールドを持つ辞書を返します。クエリが現在何をしているかについての情報を提供します。トリガーがアクティブか、データが処理されているかなどです。

いくつかの例を以下に示します。

query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)
'''
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
val query: StreamingQuery = ...

println(query.lastProgress)

/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


println(query.status)

/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
StreamingQuery query = ...

System.out.println(query.lastProgress());
/* Will print something like the following.

{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/


System.out.println(query.status());
/*  Will print something like the following.
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/
query <- ...  # a StreamingQuery
lastProgress(query)

'''
Will print something like the following.

{
  "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
  "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
  "name" : null,
  "timestamp" : "2017-04-26T08:27:28.835Z",
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 1
  },
  "stateOperators" : [ {
    "numRowsTotal" : 4,
    "numRowsUpdated" : 0
  } ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9999]",
    "startOffset" : 1,
    "endOffset" : 1,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
  }
}
'''

status(query)
'''
Will print something like the following.

{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
'''

非同期APIを使用したプログラムによるメトリクスのレポート

StreamingQueryListenerScala / Java / Pythonドキュメント)をアタッチすることにより、SparkSessionに関連付けられているすべてのクエリを非同期的に監視することもできます。sparkSession.streams.addListener()を使用してカスタムStreamingQueryListenerオブジェクトをアタッチすると、クエリが開始および停止したとき、およびアクティブなクエリで進捗状況が発生したときにコールバックを受け取ります。例を次に示します。

spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        print("Query made progress: " + queryProgress.progress)

    def onQueryTerminated(self, event):
    	print("Query terminated: " + queryTerminated.id)


spark.streams.addListener(Listener())
val spark: SparkSession = ...

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})
SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
Not available in R.

Dropwizardを使用したメトリクスのレポート

Sparkは、Dropwizardライブラリを使用したメトリックのレポートをサポートしています。構造化ストリーミングクエリのメトリックもレポートできるようにするには、SparkSessionで構成spark.sql.streaming.metricsEnabledを明示的に有効にする必要があります。

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
// or
spark.sql("SET spark.sql.streaming.metricsEnabled=true");
sql("SET spark.sql.streaming.metricsEnabled=true")

この構成が有効になった後、SparkSessionで開始されたすべてのクエリは、構成されたシンク(Ganglia、Graphite、JMXなど)にDropwizardを介してメトリックをレポートします。

チェックポイントによる障害からの回復

失敗または意図的なシャットダウンの場合、以前のクエリの進捗状況と状態を回復し、中断したところから続行できます。これは、チェックポイントと先行書き込みログを使用して行われます。チェックポイントの場所を使用してクエリを構成できます。クエリは、すべての進捗情報(つまり、各トリガーで処理されたオフセットの範囲)と、チェックポイントの場所への実行中の集計(たとえば、簡単な例の単語数)を保存します。このチェックポイントの場所は、HDFS互換ファイルシステムのパスである必要があり、クエリを開始するときにDataStreamWriterのオプションとして設定できます。

aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
aggDF
  .writeStream()
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start();
write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")

ストリーミングクエリの変更後の回復セマンティクス

同じチェックポイントの場所から再起動する間に、ストリーミングクエリで許可される変更には制限があります。変更が許可されないか、変更の影響が明確に定義されていない変更の種類を次に示します。それらすべてについて

変更の種類

非同期の進捗状況追跡

それは何ですか?

非同期進捗追跡により、ストリーミングクエリは、マイクロバッチ内の実際のデータ処理と並行して、非同期的に進捗をチェックポイントできるため、オフセットログとコミットログの維持に関連するレイテンシが削減されます。

Async Progress Tracking

どのように機能しますか?

Structured Streamingは、クエリ処理の進捗指標としてオフセットを保持および管理することに依存しています。オフセット管理操作は、これらの操作が完了するまでデータ処理が発生できないため、処理レイテンシに直接影響します。非同期進捗追跡により、ストリーミングクエリは、これらのオフセット管理操作の影響を受けることなく進捗をチェックポイントできます。

どのように使用しますか?

以下のコードスニペットは、この機能の使用方法の例を示しています。

val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "in")
      .load()
val query = stream.writeStream
     .format("kafka")
	.option("topic", "out")
     .option("checkpointLocation", "/tmp/checkpoint")
	.option("asyncProgressTrackingEnabled", "true")
     .start()

以下の表は、この機能の設定と、それに関連するデフォルト値について説明しています。

オプション デフォルト 説明
asyncProgressTrackingEnabled true/false false 非同期進捗追跡を有効または無効にします
asyncProgressTrackingCheckpointIntervalMs ミリ秒 1000 オフセットと完了コミットをコミットする間隔

制限事項

この機能の初期バージョンには、次の制限があります。

設定のオフ

非同期進捗追跡をオフにすると、次の例外がスローされる可能性があります。

java.lang.IllegalStateException: batch x doesn't exist

また、次のエラーメッセージがドライバログに出力される可能性があります。

The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.

これは、非同期進捗追跡が有効になっている場合、フレームワークが非同期進捗追跡を使用しない場合のようにすべてのバッチの進捗をチェックポイントしないという事実が原因です。この問題を解決するには、「asyncProgressTrackingEnabled」を再度有効にし、「asyncProgressTrackingCheckpointIntervalMs」を0に設定して、少なくとも2つのマイクロバッチが処理されるまでストリーミングクエリを実行します。これで、非同期進捗追跡を安全に無効にでき、クエリの再起動は正常に続行されます。

継続処理

[試験的]

継続処理は、Spark 2.3で導入された新しい実験的なストリーミング実行モードであり、少なくとも1回のフォールトトレランス保証で低い(〜1ミリ秒)エンドツーエンドレイテンシを可能にします。これを、正確に1回の保証を達成できますが、最高のレイテンシが〜100ミリ秒のデフォルトのマイクロバッチ処理エンジンと比較してください。一部のタイプのクエリ(後述)では、アプリケーションロジックを変更せずに(つまり、DataFrame/Dataset操作を変更せずに)、どちらのモードで実行するかを選択できます。

継続処理モードでサポートされているクエリを実行するには、必要なチェックポイント間隔をパラメータとして持つ継続トリガーを指定するだけです。例:

spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load() \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .trigger(continuous="1 second") \     # only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();

1秒のチェックポイント間隔は、継続処理エンジンが1秒ごとにクエリの進捗を記録することを意味します。結果として得られるチェックポイントは、マイクロバッチエンジンと互換性のある形式であるため、任意のクエリを任意のトリガーで再起動できます。たとえば、マイクロバッチモードで開始されたサポートされているクエリは、継続モードで再起動でき、その逆も可能です。継続モードに切り替えるたびに、少なくとも1回のフォールトトレランス保証が得られることに注意してください。

サポートされているクエリ

Spark 2.4の時点では、継続処理モードでサポートされているのは、次のタイプのクエリのみです。

詳細については、「入力ソース」セクションと「出力シンク」セクションを参照してください。コンソールシンクはテストに適していますが、エンドツーエンドの低レイテンシ処理は、Kafkaをソースとシンクとして使用することで最もよく観察できます。これにより、エンジンはデータを処理し、入力トピックで入力データが利用可能になってから数ミリ秒以内に出力トピックで結果を利用できるようにします。

注意点

追加情報

さらに読む

講演

移行ガイド

移行ガイドは現在、このページにアーカイブされています。