Structured Streaming プログラミングガイド

概要

TransformWithState は、Apache Spark 4.0 リリース以降の Structured Streaming に導入された新しい任意のステートフルオペレーターです。このオペレーターは、Apache Spark で任意のステートフル処理を行うための、Scala の古い mapGroupsWithState/flatMapGroupsWithState API および Python の applyInPandasWithState API の次世代の後継となります。

このオペレーターは、オブジェクト指向のステートフルプロセッサー定義、複合型、自動 TTL ベースの削除、タイマーなど、包括的な機能をサポートしており、ビジネス上重要な運用ユースケースの構築に使用できます。

言語サポート

TransformWithState は Scala、Java、Python で利用できます。Python では、Apache Spark の Pandas インターフェイスと連携する他のオペレーターと同様に、オペレーター名は transformWithStateInPandas となります。

TransformWithState クエリのコンポーネント

TransformWithState クエリは、通常、次のコンポーネントで構成されます。

次のセクションで、上記のコンポーネントについて詳しく説明します。

ステートフルプロセッサーの定義

ステートフルプロセッサーは、入力イベントを操作するために使用されるユーザー定義ロジックの中核です。ステートフルプロセッサーは、StatefulProcessor クラスを拡張し、いくつかのメソッドを実装することで定義されます。

典型的なステートフルプロセッサーは、次の構造を扱います。

ステートフルプロセッサーは、オブジェクト指向パラダイムを使用してステートフルロジックを定義します。ステートフルロジックは、次のメソッドを実装することで定義されます。

上記のメソッドは、オペレーターがストリーミングクエリの一部として実行されるときに Spark クエリエンジンによって呼び出されます。

また、すべての種類の操作が各メソッドでサポートされているわけではないことに注意してください。たとえば、ユーザーは init メソッドでタイマーを登録できません。同様に、handleExpiredTimer メソッドで入力行を操作することはできません。エンジンは、サポートされていない/互換性のない操作を検出し、必要に応じてクエリを失敗させます。

StatefulProcessorHandle の使用

上記のメソッド内の多くの操作は、StatefulProcessorHandle オブジェクトを使用して実行できます。StatefulProcessorHandle オブジェクトは、基盤となる状態ストアと対話するためのメソッドを提供します。このオブジェクトは、getHandle メソッドを呼び出すことで、StatefulProcessor 内で取得できます。

状態変数の使用

状態変数は、ユーザーの状態を保存するために使用されるクラス固有のメンバーです。これらは一度宣言され、ステートフルプロセッサーの init メソッド内で初期化する必要があります。

状態変数の初期化は、通常、次の手順で行われます。

状態変数の種類

状態変数は、次の型にすることができます。

人気のあるプログラミング言語のコレクションと同様に、状態型は、基盤となるストレージレイヤーのさまざまな種類の操作に最適化されたデータ構造をモデル化するために使用できます。たとえば、追加は ListState に最適化され、ポイントルックアップは MapState に最適化されます。

状態エンコーダーの提供

状態エンコーダーは、状態変数をシリアル化およびデシリアル化するために使用されます。Scala では、暗黙的なエンコーダーが利用可能な場合は状態エンコーダーをスキップできます。Java と Python では、状態エンコーダーを明示的に提供する必要があります。プリミティブ、ケースクラス、Java Bean クラスの組み込みエンコーダーは、Spark SQL エンコーダーを介してデフォルトで提供されます。

Scala での暗黙的なエンコーダーの提供

Scala では、ケースクラスとプリミティブ型に暗黙的なエンコーダーを提供できます。implicits オブジェクトは、StatefulProcessor クラスの一部として提供されます。StatefulProcessor 定義内では、ユーザーは単純に implicits を import implicits._ としてインポートでき、エンコーダー型を明示的に渡す必要がなくなります。

状態変数に TTL を提供する

状態変数は、オプションの TTL (Time-To-Live) 値で構成できます。TTL 値は、指定された期間後に状態変数を自動的に削除するために使用されます。TTL 値は Duration として提供できます。

入力行の処理

handleInputRows メソッドは、グループ化キーに属する入力行を処理し、必要に応じて出力を発行するために使用されます。このメソッドは、オペレーターによって受信された各グループ化キー値に対して Spark クエリエンジンによって呼び出されます。同じグループ化キーに属する複数の行がある場合、提供されたイテレータにはそれらのすべての行が含まれます。

期限切れのタイマーの処理

handleInputRows または handleExpiredTimer メソッド内で、ステートフルプロセッサーは後でトリガーされるタイマーを登録できます。handleExpiredTimer メソッドは、ステートフルプロセッサーによって設定されたタイマーが期限切れになったときに Spark クエリエンジンによって呼び出されます。このメソッドは、期限切れのタイマーごとに 1 回呼び出されます。サポートされているタイマープロパティをいくつか紹介します。

初期状態の処理

handleInitialState メソッドは、オプションで初期状態バッチデータフレームを処理するために使用されます。初期状態バッチデータフレームは、ステートフルプロセッサーの状態を事前に入力するために使用されます。このメソッドは、初期状態バッチデータフレームが利用可能になったときに Spark クエリエンジンによって呼び出されます。このメソッドは、クエリのライフタイム中に 1 回だけ呼び出されます。これは、ステートフルプロセッサーによって入力行が処理される前に呼び出されます。

すべてをまとめる

ここに、ダウンタイム検出器を実装する StatefulProcessor の例を示します。指定されたキーに対して新しい値が検出されるたびに、lastSeen 状態値を更新し、既存のタイマーをすべてクリアし、将来のタイマーをリセットします。

タイマーが期限切れになると、アプリケーションはキーで最後に観測されたイベントからの経過時間を発行します。その後、10 秒後に更新を発行するように新しいタイマーを設定します。

class DownTimeDetector(StatefulProcessor):
    def init(self, handle: StatefulProcessorHandle) -> None:
        # Define schema for the state value (timestamp)
        state_schema = StructType([StructField("value", TimestampType(), True)])
        self.handle = handle
        # Initialize state to store the last seen timestamp for each key
        self.last_seen = handle.getValueState("last_seen", state_schema)

    def handleExpiredTimer(self, key, timerValues, expiredTimerInfo) -> Iterator[pd.DataFrame]:
        latest_from_existing = self.last_seen.get()
        # Calculate downtime duration
        downtime_duration = timerValues.getCurrentProcessingTimeInMs() - int(latest_from_existing.timestamp() * 1000)
        # Register a new timer for 10 seconds in the future
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)
        # Yield a DataFrame with the key and downtime duration
        yield pd.DataFrame(
            {
                "id": key,
                "timeValues": str(downtime_duration),
            }
        )

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        # Find the row with the maximum timestamp
        max_row = max((tuple(pdf.iloc[0]) for pdf in rows), key=lambda row: row[1])

        # Get the latest timestamp from existing state or use epoch start if not exists
        if self.last_seen.exists():
            latest_from_existing = self.last_seen.get()
        else:
            latest_from_existing = datetime.fromtimestamp(0)

        # If new data is more recent than existing state
        if latest_from_existing < max_row[1]:
            # Delete all existing timers
            for timer in self.handle.listTimers():
                self.handle.deleteTimer(timer)
            # Update the last seen timestamp
            self.last_seen.update((max_row[1],))

        # Register a new timer for 5 seconds in the future
        self.handle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 5000)

        # Yield an empty DataFrame
        yield pd.DataFrame()

    def close(self) -> None:
        # No cleanup needed
        pass
// The (String, Timestamp) schema represents an (id, time). We want to do downtime
// detection on every single unique sensor, where each sensor has a sensor ID.
class DowntimeDetector(duration: Duration) extends
  StatefulProcessor[String, (String, Timestamp), (String, Duration)] {

  @transient private var _lastSeen: ValueState[Timestamp] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    _lastSeen = getHandle.getValueState[Timestamp]("lastSeen", Encoders.TIMESTAMP, TTLConfig.NONE)
  }

  // The logic here is as follows: find the largest timestamp seen so far. Set a timer for
  // the duration later.
  override def handleInputRows(
      key: String,
      inputRows: Iterator[(String, Timestamp)],
      timerValues: TimerValues): Iterator[(String, Duration)] = {
    val latestRecordFromNewRows = inputRows.maxBy(_._2.getTime)

    val latestTimestampFromExistingRows = if (_lastSeen.exists()) {
      _lastSeen.get()
    } else {
      new Timestamp(0)
    }

    val latestTimestampFromNewRows = latestRecordFromNewRows._2

    if (latestTimestampFromNewRows.after(latestTimestampFromExistingRows)) {
      // Cancel the one existing timer, since we have a new latest timestamp.
      // We call "listTimers()" just because we don't know ahead of time what
      // the timestamp of the existing timer is.
      getHandle.listTimers().foreach(timer => getHandle.deleteTimer(timer))

      _lastSeen.update(latestTimestampFromNewRows)
      // Use timerValues to schedule a timer using processing time.
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + duration.toMillis)
    } else {
      // No new latest timestamp, so no need to update state or set a timer.
    }

    Iterator.empty
  }

  override def handleExpiredTimer(
    key: String,
    timerValues: TimerValues,
    expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, Duration)] = {
      val latestTimestamp = _lastSeen.get()
      val downtimeDuration = new Duration(
        timerValues.getCurrentProcessingTimeInMs() - latestTimestamp.getTime)

      // Register another timer that will fire in 10 seconds.
      // Timers can be registered anywhere but init()
      getHandle.registerTimer(timerValues.getCurrentProcessingTimeInMs() + 10000)

      Iterator((key, downtimeDuration))
  }
}

ストリーミングクエリでの StatefulProcessor の使用

StatefulProcessor を定義したので、ストリーミングクエリで使用できます。次のコードスニペットは、Python と Scala のストリーミングクエリで StatefulProcessor を使用する方法を示しています。

q = (df.groupBy("key")
  .transformWithStateInPandas(
    statefulProcessor=DownTimeDetector(),
    outputStructType=output_schema,
    outputMode="Update",
    timeMode="None",
  )
  .writeStream...
  
val query = df.groupBy("key")
  .transformWithState(
    statefulProcessor = new DownTimeDetector(),
    outputMode = OutputMode.Update,
    timeMode = TimeMode.None)
  .writeStream...

状態スキーマの進化

TransformWithState は、管理状態のスキーマ進化を実行することもできます。ここでは 2 つの部分があります。

スキーマ進化は値側でのみサポートされていることに注意してください。キー側の状態スキーマ進化はサポートされていません。

状態変数間の進化

このオペレーターにより、同じストリーミングクエリの異なる実行間で状態変数を追加および削除できます。変数を削除するには、基盤となる状態をパージできるようにエンジンに通知する必要もあります。ユーザーは、StatefulProcessor の init メソッド内で、指定された状態変数に対して deleteIfExists メソッドを呼び出すことでこれを実現できます。

状態変数内の進化

このオペレーターは、特定の状態変数の状態スキーマを進化させることもできます。たとえば、ValueState 変数内で状態を保存するためにケースクラスを使用している場合、フィールドの追加/削除/拡大によってこのケースクラスを進化させることができます。このようなスキーマ進化は、基盤となるエンコーディング形式が Avro に設定されている場合にのみサポートされます。これを有効にするには、Spark 設定を spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro") として設定してください。

次の進化操作は Avro ルール内でサポートされています。

次の進化操作はサポートされていません。

状態データソースとの統合

TransformWithState は、ユーザーがバッチ間で任意の状態を維持できるステートフルオペレーターです。この状態を読み取るには、ユーザーは状態データソースリーダークエリに追加のオプションを指定する必要があります。このオペレーターは、同じクエリ内で複数の状態変数を使用できます。ただし、それらは異なる複合型とエンコーディング形式である可能性があるため、バッチクエリ内で一度に 1 つの変数ずつ読み取る必要があります。これを可能にするために、ユーザーは読み取る状態変数に対して stateVarName を指定する必要があります。

readRegisteredTimers を true に設定することで、タイマーを読み取ることができます。これにより、グループ化キー全体で登録されているすべてのタイマーが返されます。

また、複合型変数を 2 つの形式で読み取ることができます。

メモリ要件に応じて、ユースケースに最適な形式を選択できます。ソースオプションの詳細については、こちらを参照してください。