Structured Streaming + Kafka 統合ガイド (Kafka ブローカーバージョン 0.10.0 以降)

Kafka 0.10 向けの Structured Streaming 統合。Kafka からデータを読み取ったり、Kafka にデータを書き込んだりできます。

リンク

SBT/Maven プロジェクト定義を使用する Scala/Java アプリケーションの場合、アプリケーションを次のアーティファクトとリンクしてください。

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.13
version = 4.0.0

ヘッダー機能を使用するには、Kafka クライアントのバージョンが 0.11.0.0 以降である必要があることに注意してください。

Python アプリケーションの場合、アプリケーションのデプロイ時に上記ライブラリとその依存関係を追加する必要があります。以下の デプロイ サブセクションを参照してください。

spark-shell で実験する場合も、spark-shell を呼び出す際に上記ライブラリとその依存関係を追加する必要があります。また、以下の デプロイ サブセクションを参照してください。

Kafka からのデータ読み取り

ストリーミングクエリ用の Kafka ソースの作成

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");

// Subscribe to multiple topics
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

バッチクエリ用の Kafka ソースの作成

バッチ処理に適したユースケースがある場合は、定義されたオフセット範囲の Dataset/DataFrame を作成できます。

# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

ソースの各行は次のスキーマを持ちます。

key binary
value binary
topic string
partition int
offset long
timestamp timestamp
timestampType int
headers (オプション) array

バッチクエリとストリーミングクエリの両方で、Kafka ソースに以下のオプションを設定する必要があります。

オプションvalue意味
assign JSON 文字列 {"topicA":[0,1],"topicB":[2,4]} 消費する特定の TopicPartition。Kafka ソースでは、「assign」、「subscribe」、「subscribePattern」のいずれか 1 つのオプションのみを指定できます。
subscribe コンマ区切りのトピックリスト 購読するトピックリスト。Kafka ソースでは、「assign」、「subscribe」、「subscribePattern」のいずれか 1 つのオプションのみを指定できます。
subscribePattern Java 正規表現文字列 トピックに購読するために使用されるパターン。Kafka ソースでは、「assign」、「subscribe」、「subscribePattern」のいずれか 1 つのオプションのみを指定できます。
kafka.bootstrap.servers コンマ区切りの host:port リスト Kafka の "bootstrap.servers" 設定。

以下の設定はオプションです。

オプションvaluedefaultクエリタイプ意味
startingTimestamp タイムスタンプ文字列 例: "1000" なし (次に優先されるのは startingOffsetsByTimestamp) ストリーミングとバッチ クエリが開始されたときのタイムスタンプの開始点。購読されているトピックのすべてのパーティションの開始タイムスタンプを指定する文字列です。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、動作はオプション startingOffsetsByTimestampStrategy の値に従います。

注1: startingTimestampstartingOffsetsByTimestamp および startingOffsets よりも優先されます。

注2: ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開時には常にクエリが停止した時点から続行されます。クエリ中に新しく検出されたパーティションは earliest から開始されます。

startingOffsetsByTimestamp JSON 文字列 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ なし (次に優先されるのは startingOffsets) ストリーミングとバッチ クエリが開始されたときのタイムスタンプの開始点。各 TopicPartition の開始タイムスタンプを指定する JSON 文字列です。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、動作はオプション startingOffsetsByTimestampStrategy の値に従います。

注1: startingOffsetsByTimestampstartingOffsets よりも優先されます。

注2: ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開時には常にクエリが停止した時点から続行されます。クエリ中に新しく検出されたパーティションは earliest から開始されます。

startingOffsets "earliest", "latest" (ストリーミングのみ)、または JSON 文字列 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ ストリーミングの場合は "latest"、バッチの場合は "earliest" ストリーミングとバッチ クエリが開始されたときの開始点。「earliest」は最も古いオフセットから、「latest」は最新のオフセットから開始します。JSON 文字列で各 TopicPartition の開始オフセットを指定することもできます。JSON では、オフセットとして -2 は earliest を、-1 は latest を参照するために使用できます。注意: バッチクエリの場合、latest (暗黙的または JSON で -1 を使用) は許可されません。ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開時には常にクエリが停止した時点から続行されます。クエリ中に新しく検出されたパーティションは earliest から開始されます。
endingTimestamp タイムスタンプ文字列 例: "1000" なし (次に優先されるのは endingOffsetsByTimestamp) バッチクエリ バッチクエリが終了するときの終了点。購読されているトピックのすべてのパーティションの終了タイムスタンプを指定する JSON 文字列です。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、オフセットは latest に設定されます。

注: endingTimestampendingOffsetsByTimestamp および endingOffsets よりも優先されます。

endingOffsetsByTimestamp JSON 文字列 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ なし (次に優先されるのは endingOffsets) バッチクエリ バッチクエリが終了するときの終了点。各 TopicPartition の終了タイムスタンプを指定する JSON 文字列です。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、オフセットは latest に設定されます。

注: endingOffsetsByTimestampendingOffsets よりも優先されます。

endingOffsets latest または JSON 文字列 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} latest バッチクエリ バッチクエリが終了するときの終了点。「latest」は最新のオフセットを参照し、「JSON 文字列」で各 TopicPartition の終了オフセットを指定します。JSON では、オフセットとして -1 は latest を参照するために使用でき、-2 (earliest) は許可されません。
failOnDataLoss true または false true ストリーミングとバッチ データが失われる可能性がある場合 (トピックが削除されたり、オフセットが範囲外になったりした場合) にクエリを失敗させるかどうか。これは誤検知の可能性があります。期待どおりに機能しない場合は無効にできます。
kafkaConsumer.pollTimeoutMs long 120000 ストリーミングとバッチ エグゼキュータで Kafka からデータをポーリングする際のミリ秒単位のタイムアウト。定義されていない場合は spark.network.timeout にフォールバックします。
fetchOffset.numRetries int 3 ストリーミングとバッチ Kafka オフセットの取得を断念するまでのリトライ回数。
fetchOffset.retryIntervalMs long 10 ストリーミングとバッチ Kafka オフセットの取得をリトライする前に待機するミリ秒。
maxOffsetsPerTrigger long なし ストリーミングクエリ トリガー間隔ごとに処理されるオフセットの最大数に対するレート制限。指定されたオフセットの合計数は、異なるボリュームのトピックパーティション全体に比例して分割されます。
minOffsetsPerTrigger long なし ストリーミングクエリ トリガー間隔ごとに処理されるオフセットの最小数。指定されたオフセットの合計数は、異なるボリュームのトピックパーティション全体に比例して分割されます。注意: maxTriggerDelay を超えた場合、利用可能なオフセットの数が minOffsetsPerTrigger に達しなくてもトリガーが起動します。
maxTriggerDelay 単位付きの時間 15m ストリーミングクエリ ソースからデータが利用可能であるにもかかわらず、トリガーが遅延する可能性のある最大時間。このオプションは、minOffsetsPerTrigger が設定されている場合にのみ適用されます。
minPartitions int なし ストリーミングとバッチ Kafka から読み取るパーティションの希望最小数。デフォルトでは、Spark は Kafka から消費するトピックパーティションと Spark パーティションの 1 対 1 のマッピングを持ちます。このオプションをトピックパーティション数よりも大きい値に設定すると、Spark は大きな Kafka パーティションを小さな断片に分割します。この設定は ヒント のようなものであり、Spark タスクの数は minPartitionsなることに注意してください。丸め誤差や新しいデータを受信しなかった Kafka パーティションにより、これより少なくなることも多くなることもあります。
maxRecordsPerPartition long なし ストリーミングとバッチ パーティションあたりの最大レコード数を制限します。デフォルトでは、Spark は Kafka から消費するトピックパーティションと Spark パーティションの 1 対 1 のマッピングを持ちます。このオプションを設定すると、Spark は各パーティションに最大 maxRecordsPerPartition レコードが含まれるように Kafka パーティションを小さな断片に分割します。minPartitionsmaxRecordsPerPartition の両方が設定されている場合、パーティション数は (recordsPerPartition / maxRecordsPerPartition)minPartitions の最大値になります。この場合、Spark は maxRecordsPerPartition に基づいてパーティションを分割し、最終的なパーティション数が minPartitions より少ない場合は、minPartitions に基づいて再度パーティションを分割します。
groupIdPrefix string spark-kafka-source ストリーミングとバッチ Structured Streaming クエリによって生成されるコンシューマーグループ識別子 (group.id) のプレフィックス。「kafka.group.id」が設定されている場合、このオプションは無視されます。
kafka.group.id string なし ストリーミングとバッチ Kafka から読み取る際に Kafka コンシューマーで使用する Kafka グループ ID。これは注意して使用してください。デフォルトでは、各クエリは読み取り用に一意のグループ ID を生成します。これにより、各 Kafka ソースが独自のコンシューマーグループを持ち、他のコンシューマーからの干渉を受けないため、購読しているトピックのすべてのパーティションを読み取ることができます。一部のシナリオ (例: Kafka のグループベースの認証) では、データを読み取るために特定の承認済みグループ ID を使用したい場合があります。オプションでグループ ID を設定できます。ただし、予期しない動作を引き起こす可能性があるため、細心の注意を払って行ってください。同時に実行されているクエリ (バッチとストリーミングの両方) または同じグループ ID を持つソースは、互いに干渉し、各クエリがデータのサブセットのみを読み取る可能性があります。これは、クエリが短時間で開始/再開された場合にも発生する可能性があります。これらの問題を最小限に抑えるには、Kafka コンシューマーセッションタイムアウト (オプション "kafka.session.timeout.ms" を設定) を非常に短く設定します。これが設定されると、オプション "groupIdPrefix" は無視されます。
includeHeaders boolean false ストリーミングとバッチ Kafka ヘッダーを行に含めるかどうか。
startingOffsetsByTimestampStrategy "error" または "latest" "error" ストリーミングとバッチ 指定された開始オフセット (タイムスタンプごとまたはパーティションごと) が Kafka が返したオフセットと一致しない場合に使用される戦略。戦略名と対応する説明は次のとおりです。

"error": クエリを失敗させ、エンドユーザーは手動手順を必要とする回避策に対処する必要があります。

"latest": これらのパーティションの latest オフセットを割り当て、Spark がこれらのパーティションからさらにマイクロバッチで新しいレコードを読み取れるようにします。

タイムスタンプオフセットオプションの詳細

各パーティションに対して返されるオフセットは、対応するパーティションの指定されたタイムスタンプ以上である earliest オフセットです。Kafka が一致するオフセットを返さない場合の動作は、オプションによって異なります。各オプションの説明を確認してください。

Spark は、タイムスタンプ情報を KafkaConsumer.offsetsForTimes に渡すだけで、値を解釈したり推論したりしません。KafkaConsumer.offsetsForTimes の詳細については、javadoc を参照してください。また、timestamp の意味は Kafka の設定 (log.message.timestamp.type) によって異なる場合があります。詳細については、Kafka ドキュメントを参照してください。

タイムスタンプオフセットオプションには Kafka 0.10.1.0 以降が必要です。

オフセット取得

Spark 3.0 以前では、Spark はオフセット取得に KafkaConsumer を使用していましたが、これはドライバーで無限待機を引き起こす可能性がありました。Spark 3.1 では、新しい設定オプション spark.sql.streaming.kafka.useDeprecatedOffsetFetching (デフォルト: false) が追加され、Spark は AdminClient を使用した新しいオフセット取得メカニズムを使用できるようになりました。(古いオフセット取得を KafkaConsumer で使用するには、これを true に設定してください。)

新しいメカニズムが使用される場合、以下が適用されます。

まず、新しいアプローチは Kafka ブローカー 0.11.0.0+ をサポートします。

Spark 3.0 以前では、セキュアな Kafka 処理にはドライバーからの次の ACL が必要でした。

Spark 3.1 以降、オフセットは KafkaConsumer の代わりに AdminClient を使用して取得できます。そのためには、ドライバーからの次の ACL が必要です。

AdminClient はドライバーでコンシューマーグループに接続しないため、group.id ベースの認証は機能しなくなります (エグゼキュータはグループベースの認証をまったく行いません)。エグゼキュータ側は以前とまったく同じように動作することに言及する価値があります (グループプレフィックスとオーバーライドは機能します)。

コンシューマーキャッシュ

Kafka コンシューマーの初期化は時間がかかります。特にストリーミングシナリオでは、処理時間が重要な要素です。このため、Spark は Apache Commons Pool を利用して、エグゼキュータで Kafka コンシューマーをプールします。

キャッシュキーは次の情報から構築されます。

コンシューマープールを設定するために使用できる次のプロパティ。

プロパティ名デフォルト意味バージョン以降
spark.kafka.consumer.cache.capacity 64 キャッシュされるコンシューマーの最大数。これはソフトリミットであることに注意してください。 3.0.0
spark.kafka.consumer.cache.timeout 5m (5 分) コンシューマーがプールでアイドル状態のまま、追い出し対象となる可能性のある最小時間。 3.0.0
spark.kafka.consumer.cache.evictorThreadRunInterval 1m (1 分) コンシューマープールのアイドル追い出しスレッドの実行間隔。非正の値の場合、アイドル追い出しスレッドは実行されません。 3.0.0
spark.kafka.consumer.cache.jmx.enable false この構成インスタンスで作成されたプールに対して JMX を有効または無効にします。プールの統計情報は JMX インスタンスを介して利用できます。JMX 名のプレフィックスは "kafka010-cached-simple-kafka-consumer-pool" に設定されます。 3.0.0

プールのサイズは spark.kafka.consumer.cache.capacity によって制限されますが、Spark タスクをブロックしない「ソフトリミット」として機能します。

アイドル追い出しスレッドは、指定されたタイムアウトよりも長く使用されていないコンシューマーを定期的に削除します。借用時にこのしきい値に達した場合、使用されていない最小使用エントリを削除しようとします。

削除できない場合、プールは成長し続けます。最悪の場合、プールはエグゼキュータで実行できる同時タスクの最大数 (つまり、タスクスロット数) まで成長します。

タスクが何らかの理由で失敗した場合、新しいタスクは安全のために新しく作成された Kafka コンシューマーで実行されます。同時に、キャッシュキーが同じプール内のすべてのコンシューマーを無効にし、失敗した実行で使用されたコンシューマーを削除します。他のタスクが使用しているコンシューマーは閉じられませんが、プールに戻されるときに無効化されます。

コンシューマーに加えて、Spark は取得されたレコードを個別にプールし、Kafka コンシューマーを Spark の観点からステートレスにし、プールの効率を最大化します。Kafka コンシューマープールと同じキャッシュキーを利用します。ただし、特性の違いにより Apache Commons Pool は利用しません。

取得データプールを設定するために使用できる次のプロパティ。

プロパティ名デフォルト意味バージョン以降
spark.kafka.consumer.fetchedData.cache.timeout 5m (5 分) 取得されたデータがプールでアイドル状態のまま、追い出し対象となる可能性のある最小時間。 3.0.0
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval 1m (1 分) 取得データプールのアイドル追い出しスレッドの実行間隔。非正の値の場合、アイドル追い出しスレッドは実行されません。 3.0.0

Kafka へのデータ書き込み

ここでは、Apache Kafka へのストリーミングクエリとバッチクエリの書き込みサポートについて説明します。Apache Kafka は少なくとも 1 回の書き込みセマンティクスのみをサポートしていることに注意してください。したがって、ストリーミングクエリまたはバッチクエリを Kafka に書き込む際に、一部のレコードが重複する可能性があります。これは、たとえば、ブローカーが受信してメッセージレコードを書き込んだにもかかわらず、Kafka がメッセージの再試行を必要とする場合に発生する可能性があります。Structured Streaming は、これらの Kafka 書き込みセマンティクスにより、このような重複の発生を防ぐことはできません。ただし、クエリの書き込みが成功した場合、クエリ出力は少なくとも 1 回書き込まれたと想定できます。書き込まれたデータを読み取る際に重複を削除する可能な解決策として、読み取り時に重複排除を実行するために使用できるプライマリ (一意) キーを導入することが考えられます。

Kafka に書き込まれる DataFrame は、スキーマに次の列を持つ必要があります。

key (オプション) string または binary
value (必須) string または binary
headers (オプション) array
topic (オプション) string
partition (オプション) int

* "topic" 設定オプションが指定されていない場合は、topic 列が必要です。

value 列は必須の唯一のオプションです。key 列が指定されていない場合、null 値の key 列が自動的に追加されます (null 値の key 値がどのように処理されるかについては、Kafka セマンティクスを参照してください)。topic 列が存在する場合、その値は、"topic" 設定オプションが設定されている場合を除き、指定された行を Kafka に書き込む際の topic として使用されます。つまり、「topic」設定オプションは topic 列をオーバーライドします。「partition」列が指定されていない場合 (またはその値が null の場合) は、パーティションは Kafka プロデューサーによって計算されます。Kafka パーティショナーは、kafka.partitioner.class オプションを設定することで Spark で指定できます。存在しない場合は、Kafka のデフォルトパーティショナーが使用されます。

バッチクエリとストリーミングクエリの両方で、Kafka シンクに以下のオプションを設定する必要があります。

オプションvalue意味
kafka.bootstrap.servers コンマ区切りの host:port リスト Kafka の "bootstrap.servers" 設定。

以下の設定はオプションです。

オプションvaluedefaultクエリタイプ意味
topic string なし ストリーミングとバッチ すべての行が Kafka のどのトピックに書き込まれるかを設定します。このオプションは、データに存在する可能性のあるトピック列をオーバーライドします。
includeHeaders boolean false ストリーミングとバッチ Kafka ヘッダーを行に含めるかどうか。

ストリーミングクエリ用の Kafka シンクの作成

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start();

バッチクエリの出力を Kafka に書き込む

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save();

プロデューサーキャッシュ

Kafka プロデューサーインスタンスはスレッドセーフに設計されているため、Spark は Kafka プロデューサーインスタンスを初期化し、同じキャッシュキーを持つタスク間で共有します。

キャッシュキーは次の情報から構築されます。

これには、Spark がデリゲーショントークンを使用している場合に自動的に含まれる認証の設定が含まれます。認証を考慮しても、同じ Kafka プロデューサーインスタンスが同じ Kafka プロデューサー設定で共有されることが期待できます。デリゲーショントークンが更新されると、異なる Kafka プロデューサーが使用されます。古いデリゲーショントークンの Kafka プロデューサーインスタンスは、キャッシュポリシーに従って追い出されます。

プロデューサープールを設定するために使用できる次のプロパティ。

プロパティ名デフォルト意味バージョン以降
spark.kafka.producer.cache.timeout 10m (10 分) プロデューサーがプールでアイドル状態のまま、追い出し対象となる可能性のある最小時間。 2.2.1
spark.kafka.producer.cache.evictorThreadRunInterval 1m (1 分) プロデューサープールのアイドル追い出しスレッドの実行間隔。非正の値の場合、アイドル追い出しスレッドは実行されません。 3.0.0

アイドル追い出しスレッドは、指定されたタイムアウトよりも長く使用されていないプロデューサーを定期的に削除します。プロデューサーは共有され、同時に使用されるため、最終使用時刻はプロデューサーインスタンスが返され、参照カウントが 0 になった時点によって決定されることに注意してください。

Kafka 固有の設定

Kafka 自体の設定は、DataStreamReader.optionkafka. プレフィックスを使用して設定できます。例: stream.option("kafka.bootstrap.servers", "host:port")。可能な Kafka パラメータについては、データを読み取るためのパラメータは Kafka コンシューマー設定ドキュメント、データを書き込むためのパラメータは Kafka プロデューサー設定ドキュメント を参照してください。

次の Kafka パラメータは設定できず、Kafka ソースまたはシンクは例外をスローします。

デプロイ

他の Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。spark-sql-kafka-0-10_2.13 とその依存関係は、--packages を使用して直接 spark-submit に追加できます。

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0 ...

spark-shell で実験する場合も、--packages を使用して spark-sql-kafka-0-10_2.13 とその依存関係を直接追加できます。

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.13:4.0.0 ...

外部依存関係を持つアプリケーションの送信に関する詳細については、アプリケーション送信ガイドを参照してください。

セキュリティ

Kafka 0.9.0.0 は、クラスタのセキュリティを向上させるいくつかの機能を紹介しました。これらの可能性の詳細については、Kafka セキュリティドキュメントを参照してください。

セキュリティはオプションであり、デフォルトではオフになっていることに注意してください。

Spark は、Kafka クラスタに対して認証するために次の方法をサポートしています。

デリゲーショントークン

この方法では、アプリケーションを Spark パラメータで設定でき、JAAS ログイン設定は不要になる場合があります (Spark は Kafka の動的 JAAS 設定機能を使用できます)。デリゲーショントークンの詳細については、Kafka デリゲーショントークン ドキュメントを参照してください。

プロセスは、Spark の Kafka デリゲーショントークンプロバイダーによって開始されます。spark.kafka.clusters.${cluster}.auth.bootstrap.servers が設定されている場合、Spark は次のログインオプションを優先順位で考慮します。

Kafka デリゲーショントークンプロバイダーは、spark.security.credentials.kafka.enabledfalse に設定することで無効にできます (デフォルト: true)。

Spark は、トークンを取得するために次の認証プロトコルを使用するように構成できます (Kafka ブローカー設定と一致する必要があります)。

デリゲーショントークンを正常に取得した後、Spark はそれをノード全体に配布し、適切に更新します。デリゲーショントークンは認証に SCRAM ログインモジュールを使用するため、適切な spark.kafka.clusters.${cluster}.sasl.token.mechanism (デフォルト: SCRAM-SHA-512) を構成する必要があります。また、このパラメータは Kafka ブローカー設定と一致する必要があります。

デリゲーショントークンがエグゼキュータで利用可能な場合、Spark は次のログインオプションを優先順位で考慮します。

上記のいずれも該当しない場合、非セキュアな接続とみなされます。

設定

デリゲーショントークンは複数のクラスタから取得でき、${cluster} は異なる構成をグループ化するのに役立つ任意のユニークな識別子です。

プロパティ名デフォルト意味バージョン以降
spark.kafka.clusters.${cluster}.auth.bootstrap.servers なし Kafka クラスタへの初期接続を確立するために使用するコンマ区切りのホスト/ポートペアのリスト。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.0.0
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex .* アプリケーション内のソースとシンクの bootstrap.servers 設定に一致する正規表現。サーバーアドレスがこの正規表現に一致する場合、対応するブートストラップサーバーから取得されたデリゲーショントークンが接続に使用されます。複数のクラスタがアドレスに一致する場合、例外がスローされ、クエリは開始されません。Kafka のセキュアリスナーと非セキュアリスナーは異なるポートにバインドされます。両方が使用される場合、セキュアリスナーポートは正規表現の一部である必要があります。 3.0.0
spark.kafka.clusters.${cluster}.security.protocol SASL_SSL ブローカーとの通信に使用されるプロトコル。詳細については Kafka ドキュメントを参照してください。プロトコルは、bootstrap.servers 設定が一致するすべてのソースとシンクにデフォルトで適用されます (詳細については spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex を参照)。ソースまたはシンクで kafka.security.protocol を設定することでオーバーライドできます。 3.0.0
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name kafka Kafka が実行される Kerberos プリンシパル名。これは Kafka の JAAS 設定または Kafka の設定で定義できます。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.type なし トラストストアファイルのファイル形式。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.2.0
spark.kafka.clusters.${cluster}.ssl.truststore.location なし トラストストアファイルの場所。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.password なし トラストストアファイルのストアパスワード。これはオプションであり、spark.kafka.clusters.${cluster}.ssl.truststore.location が設定されている場合にのみ必要です。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.type なし キーストアファイルのファイル形式。これはクライアントにはオプションです。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.2.0
spark.kafka.clusters.${cluster}.ssl.keystore.location なし キーストアファイルの場所。これはクライアントにはオプションであり、クライアントの双方向認証に使用できます。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.password なし キーストアファイルのストアパスワード。これはオプションであり、spark.kafka.clusters.${cluster}.ssl.keystore.location が設定されている場合にのみ必要です。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.0.0
spark.kafka.clusters.${cluster}.ssl.key.password なし キーストアファイル内の秘密鍵のパスワード。これはクライアントにはオプションです。詳細については Kafka ドキュメントを参照してください。デリゲーショントークンを取得するためだけにものです。 3.0.0
spark.kafka.clusters.${cluster}.sasl.token.mechanism SCRAM-SHA-512 デリゲーショントークンを使用したクライアント接続に使用される SASL メカニズム。認証には SCRAM ログインモジュールが使用されるため、互換性のあるメカニズムをここに設定する必要があります。詳細については Kafka ドキュメント (sasl.mechanism) を参照してください。デリゲーショントークンを使用して Kafka ブローカーに対して認証するためだけにものです。 3.0.0

Kafka 固有の設定

Kafka 自体の設定は、kafka. プレフィックスを使用して設定できます。例: --conf spark.kafka.clusters.${cluster}.kafka.retries=1。可能な Kafka パラメータについては、Kafka adminclient 設定ドキュメントを参照してください。

注意点

JAAS ログイン設定

JAAS ログイン設定は、Spark が Kafka クラスタにアクセスしようとするすべてのノードに配置する必要があります。これにより、メンテナンスコストが高くなるカスタム認証ロジックを適用する可能性が提供されます。これはいくつかの方法で実現できます。1 つの方法は、追加の JVM パラメータを提供することです。例:

./bin/spark-submit \
    --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
    ...