ステートデータソース統合ガイド

Structured Streaming におけるステートデータソースガイド (実験的)

概要

ステートデータソースは、チェックポイントからステートを操作する機能を提供します。

Spark 4.0 以降、ステートデータソースはバッチクエリによる読み込み機能を提供します。書き込みを含む追加機能は将来のロードマップにあります。

注: このデータソースは現在実験的なものとしてマークされています。ソースオプションと動作 (出力) は変更される可能性があります。

チェックポイントからステートのキー・バリューを読み取る

ステートデータソースは、個別のバッチクエリを実行することにより、チェックポイント内のステートストアからキー・バリューペアを読み取ることができます。ユーザーはこの機能を利用して、以下に説明する2つの主要なユースケースをカバーできます。

ユーザーは、ほとんどの場合、単一のステートフルオペレータに一致するステートストアのインスタンスを読み取ることができます。これは、ユーザーが単一のステートフルオペレータのステート内のすべてのキー・バリューペアを読み取れることを期待できることを意味します。

ただし、例外が存在する可能性があることに注意してください。例えば、内部で複数のステートストアインスタンスを利用するストリーム・ストリーム結合です。データソースは、内部表現をユーザーから抽象化し、ステートを読み取るためのユーザーフレンドリーなアプローチを提供します。詳細については、ストリーム・ストリーム結合のセクションを参照してください。

ステートストアをバッチクエリとして読み取る (すべてデフォルト)

df = spark \
.read \
.format("statestore") \
.load("<checkpointLocation>")
val df = spark
.read
.format("statestore")
.load("<checkpointLocation>")
Dataset<Row> df = spark
.read()
.format("statestore")
.load("<checkpointLocation>");

ソースの各行は以下のスキーマを持ちます。

注意
key struct (ステートキーの型に依存)
value struct (ステート値の型に依存)
partition_id int

キーと値のネストされた列は、ステートフルオペレータの入力スキーマおよびオペレータの型に大きく依存します。ユーザーは、まず `df.schema()` / `df.printSchema()` を通じてスキーマについて問い合わせ、出力の型を理解することをお勧めします。

ソースには以下のオプションを設定する必要があります。

オプション意味
path string チェックポイントの場所のルートディレクトリを指定します。 `option("path", \`path\`)` または `load(\`path\`)` を介してパスを指定できます。

以下の設定はオプションです。

オプションデフォルト意味
batchId 数値 最新のコミット済みバッチ 読み取り対象のバッチを表します。このオプションは、ユーザーがタイムトラベルを実行したい場合に使用されます。バッチはコミットされている必要がありますが、まだクリーンアップされていない必要があります。
operatorId 数値 0 読み取り対象のオペレータを表します。このオプションは、クエリが複数のステートフルオペレータを使用している場合に使用されます。
storeName string DEFAULT 読み取り対象のステートストアの名前を表します。このオプションは、ステートフルオペレータが複数のステートストアインスタンスを使用している場合に使用されます。ストリーム・ストリーム結合以外では必要ありません。
joinSide 文字列 ("left" または "right") (なし) 読み取り対象のサイドを表します。このオプションは、ユーザーがストリーム・ストリーム結合からのステートを読み取りたい場合に使用されます。
snapshotStartBatchId 数値 指定した場合、このバッチIDでスナップショットを強制的に読み取り、その後変更ログが 'batchId' またはそのデフォルトまで再生されます。スナップショットのバッチIDは0から始まり、スナップショットバージョンIDから1を引いた値と等しくなります。このオプションは `snapshotPartitionId` と一緒に使用する必要があります。
snapshotPartitionId 数値 指定した場合、この特定のパーティションのみが読み取られます。パーティションIDは0から始まります。このオプションは `snapshotStartBatchId` と一緒に使用する必要があります。
readChangeFeed boolean false true に設定されている場合、マイクロバッチをまたぐステートの変更を読み取ります。出力テーブルのスキーマも異なります。詳細は "マイクロバッチをまたぐステートの変更を読み取る" セクションを参照してください。このオプションと共に `changeStartBatchId` を指定する必要があります。 `batchId`, `joinSide`, `snapshotStartBatchId`, `snapshotPartitionId` オプションはこのオプションと同時に使用できません。
changeStartBatchId 数値 read change feed モードで読み取る最初のバッチを表します。このオプションには `readChangeFeed` を true に設定する必要があります。
changeEndBatchId 数値 最新のコミット済み batchId read change feed モードで読み取る最後のバッチを表します。このオプションには `readChangeFeed` を true に設定する必要があります。
stateVarName string このバッチクエリの一部として読み取るステート変数名です。 `transformWithState` オペレータが使用されている場合、これは必須オプションです。現在、このオプションは `transformWithState` オペレータにのみ適用されます。
readRegisteredTimers boolean false `transformWithState` オペレータ内で使用される登録済みタイマーを読み取ることができます。現在、このオプションは `transformWithState` オペレータにのみ適用されます。このオプションと上記の `stateVarName` オプションは相互に排他的であり、一度に1つしか使用できません。
flattenCollectionTypes boolean true true に設定されている場合、リストステート、マップステートなどのステート変数のコレクション型がフラット化されます。false の場合、値は Spark SQL の Array または Map 型として提供されます。現在、このオプションは `transformWithState` オペレータにのみ適用されます。

ストリーム・ストリーム結合のステートを読み取る

Structured Streaming は、内部で複数のステートストアインスタンスを利用することにより、ストリーム・ストリーム結合機能を実装します。これらのインスタンスは論理的にバッファを構成し、左右の入力行を格納します。

ユーザーにとってより理解しやすいように、データソースは `joinSide` オプションを提供して、結合の特定のサイドのバッファリングされた入力を読み取ることができます。内部ステートストアインスタンスを直接読み取る機能を有効にするために、 `storeName` オプションを指定することも許可しますが、 `storeName` と `joinSide` は同時に指定できないという制限があります。

transformWithState のステートを読み取る

TransformWithState は、ユーザーがバッチ間で任意のステートを維持できるステートフルオペレータです。このステートを読み取るために、ユーザーはステートデータソースリーダークエリに追加のオプションを提供する必要があります。このオペレータは、同じクエリ内で複数のステート変数を使用できます。しかし、それらは異なる複合型とエンコーディング形式である可能性があるため、バッチクエリ内で一度に1つの変数ずつ読み取る必要があります。これを可能にするために、ユーザーは興味のあるステート変数の `stateVarName` を指定する必要があります。

`readRegisteredTimers` を true に設定することでタイマーを読み取ることができます。これにより、グループ化キー全体に登録されているすべてのタイマーが返されます。

複合型変数を2つの形式で読み取ることも許可されています。

メモリ要件に応じて、ユースケースに最適な形式を選択できます。

マイクロバッチをまたぐステートの変更を読み取る

特定のマイクロバッチのステートストア全体ではなく、マイクロバッチをまたぐステートストアの変更を理解したい場合、 `readChangeFeed` が使用するオプションです。例えば、これはバッチ2から最新のコミット済みバッチまでのステートの変更を読み取るためのコードです。

df = spark \
.read \
.format("statestore") \
.option("readChangeFeed", true) \
.option("changeStartBatchId", 2) \
.load("<checkpointLocation>")
val df = spark
.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
Dataset<Row> df = spark
.read()
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>");

出力スキーマも通常の出力とは異なります。

注意
batch_id long
change_type string 'update' と 'delete' の2つの値があります。Update は、存在しないキー・バリューペアの挿入、または既存のキーを新しい値で更新することを示します。delete レコードの場合、'value' フィールドは null になります。
key struct (ステートキーの型に依存)
value struct (ステート値の型に依存)
partition_id int

ステートメタデータソース

ステートデータソースを介して既存のチェックポイントからステートをクエリする前に、ユーザーはチェックポイントの情報を理解したい場合があります。これには、チェックポイントで利用可能なオペレータとステートストアインスタンス、利用可能なバッチIDの範囲などが含まれます。

Structured Streaming は、チェックポイントからステート関連のメタデータ情報を提供する "State metadata source" という名前のデータソースを提供します。

注: メタデータは、ストリーミングクエリが Spark 4.0 以降で実行されるときに構築されます。以前の Spark バージョンで実行されていた既存のチェックポイントにはメタデータがなく、このメタデータソースでクエリ/使用することはできません。メタデータをクエリする前にメタデータを構築するために、Spark 4.0 以降で既存のチェックポイントを指すストリーミングクエリを実行する必要があります。ユーザーはオプションで `batchId` を指定して、特定の時点のオペレータメタデータを取得できます。

バッチクエリ用のステートメタデータストアの作成

df = spark \
.read \
.format("state-metadata") \
.load("<checkpointLocation>")
val df = spark
.read
.format("state-metadata")
.load("<checkpointLocation>")
Dataset<Row> df = spark
.read()
.format("state-metadata")
.load("<checkpointLocation>");

ソースには以下のオプションを設定する必要があります。

オプション意味
path string チェックポイントの場所のルートディレクトリを指定します。 `option("path", \`path\`)` または `load(\`path\`)` を介してパスを指定できます。

以下の設定はオプションです。

オプションデフォルト意味
batchId 数値 利用可能な場合、最新のコミット済みバッチ、それ以外の場合は0 そのバッチのオペレータメタデータを取得するために使用されるオプションの `batchId`。

ソースの各行は以下のスキーマを持ちます。

注意
operatorId int
operatorName string
stateStoreName int
numPartitions int
minBatchId int ステートをクエリするために利用可能な最小バッチID。クリーニングが実行されるため、ストリーミングクエリが実行中の場合、値は無効になる可能性があります。
maxBatchId int ステートをクエリするために利用可能な最大バッチID。クエリがさらにバッチをコミットすると、ストリーミングクエリが実行中の場合、値は無効になる可能性があります。
operatorProperties string JSONとしてエンコードされたオペレータで使用されるプロパティのリスト。ここで生成される出力はオペレータに依存します。
_numColsPrefixKey int メタデータ列 (SELECT で指定しない限り非表示)

このデータソースの主要なユースケースの1つは、クエリに複数のステートフルオペレータがある場合 (例: ストリーム・ストリーム結合の後に重複排除)、オペレータIDを特定することです。`operatorName` 列は、ユーザーが指定されたオペレータのオペレータIDを特定するのに役立ちます。

さらに、ユーザーがステートフルオペレータの内部ステートストアインスタンス (例: ストリーム・ストリーム結合) についてクエリしたい場合、 `stateStoreName` 列はターゲットを決定するのに役立ちます。