構造化ストリーミング + Kafka 統合ガイド (Kafka ブローカーバージョン 0.10.0 以上)

Kafka 0.10 からデータを読み取り、Kafka にデータ書き込むための構造化ストリーミングの統合。

リンク

SBT/Maven プロジェクト定義を使用する Scala/Java アプリケーションの場合、次のアーティファクトとアプリケーションをリンクします。

groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.5.1

ヘッダー機能を使用するには、Kafka クライアントのバージョンが 0.11.0.0 以上である必要があることに注意してください。

Python アプリケーションの場合、アプリケーションのデプロイ時に上記のライブラリとその依存関係を追加する必要があります。デプロイ のセクションを参照してください。

spark-shell で実験する場合は、spark-shell を呼び出す際にも、上記のライブラリとその依存関係を追加する必要があります。デプロイ のセクションを参照してください。

Kafka からデータを読み取る

ストリーミングクエリ用の Kafka ソースの作成

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");

// Subscribe to multiple topics
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

バッチクエリ用の Kafka ソースの作成

バッチ処理に適したユースケースがある場合は、定義済みのオフセット範囲の Dataset/DataFrame を作成できます。

# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

ソース内の各行には、次のスキーマがあります。

key バイナリ
value バイナリ
topic 文字列
partition 整数
offset long
timestamp timestamp
timestampType 整数
headers (オプション) 配列

バッチクエリとストリーミングクエリの両方について、Kafka ソースには次のオプションを設定する必要があります。

オプションvalue意味
assign JSON 文字列 {"topicA":[0,1],"topicB":[2,4]} 消費する特定の TopicPartitions。Kafka ソースでは、「assign」、「subscribe」、または「subscribePattern」オプションのいずれか1つのみを指定できます。
subscribe コンマ区切りのトピック一覧 購読するトピック一覧。Kafka ソースでは、「assign」、「subscribe」、または「subscribePattern」オプションのいずれか1つのみを指定できます。
subscribePattern Java 正規表現文字列 トピックへの購読に使用されるパターン。Kafka ソースでは、「assign」、「subscribe」、または「subscribePattern」オプションのいずれか1つのみを指定できます。
kafka.bootstrap.servers コンマ区切りの host:port 一覧 Kafka の "bootstrap.servers" 設定。

次の設定はオプションです。

オプションvalueデフォルトクエリタイプ意味
startingTimestamp タイムスタンプ文字列(例:"1000") なし(次の優先順位は `startingOffsetsByTimestamp`) ストリーミングとバッチ クエリが開始されたときのタイムスタンプの開始点。購読されているトピックのすべてのパーティションの開始タイムスタンプを指定する文字列。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、動作はオプション `startingOffsetsByTimestampStrategy` の値に従います。

注記1:`startingTimestamp` は `startingOffsetsByTimestamp` と `startingOffsets` より優先されます。

注記2:ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断した場所から再開されます。クエリ中に新しく検出されたパーティションは、最も古いものから開始されます。

startingOffsetsByTimestamp JSON 文字列 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ なし(次の優先順位は `startingOffsets`) ストリーミングとバッチ クエリが開始されたときのタイムスタンプの開始点。各 TopicPartition の開始タイムスタンプを指定する JSON 文字列。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、動作はオプション `startingOffsetsByTimestampStrategy` の値に従います。

注記1:`startingOffsetsByTimestamp` は `startingOffsets` より優先されます。

注記2:ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断した場所から再開されます。クエリ中に新しく検出されたパーティションは、最も古いものから開始されます。

startingOffsets "earliest"、"latest"(ストリーミングのみ)、または JSON 文字列 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ ストリーミングの場合は "latest"、バッチの場合は "earliest" ストリーミングとバッチ クエリが開始されたときの開始点。"earliest" は最も古いオフセットから、"latest" は最新のオフセットから、または各 TopicPartition の開始オフセットを指定する JSON 文字列。JSON では、オフセットとして -2 を使用して最も古いものを参照し、-1 を使用して最新のものを参照できます。注:バッチクエリの場合、latest(暗黙的に、または JSON で -1 を使用して)は許可されません。ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断した場所から再開されます。クエリ中に新しく検出されたパーティションは、最も古いものから開始されます。
endingTimestamp タイムスタンプ文字列(例:"1000") なし(次の優先順位は `endingOffsetsByTimestamp`) バッチクエリ バッチクエリが終了したときの終了点。購読されているトピックのすべてのパーティションの終了タイムスタンプを指定する JSON 文字列。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、オフセットは最新に設定されます。

注記:`endingTimestamp` は `endingOffsetsByTimestamp` と `endingOffsets` より優先されます。

endingOffsetsByTimestamp JSON 文字列 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ なし(次の優先順位は `endingOffsets`) バッチクエリ バッチクエリが終了したときの終了点。各 TopicPartition の終了タイムスタンプを指定する JSON 文字列。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、オフセットは最新に設定されます。

注記:`endingOffsetsByTimestamp` は `endingOffsets` より優先されます。

endingOffsets latest または JSON 文字列 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} latest バッチクエリ バッチクエリが終了したときの終了点。"latest" は最新を参照するだけ、または各 TopicPartition の終了オフセットを指定する JSON 文字列。JSON では、オフセットとして -1 を使用して最新を参照し、オフセットとして -2(最も古い)は使用できません。
failOnDataLoss true または false true ストリーミングとバッチ データが失われる可能性がある場合(例:トピックが削除された場合、またはオフセットが範囲外の場合)にクエリを失敗させるかどうか。これは誤検知の可能性があります。期待通りに動作しない場合は、無効にできます。
kafkaConsumer.pollTimeoutMs long 120000 ストリーミングとバッチ エグゼキュータで Kafka からデータを取得するためのタイムアウト(ミリ秒単位)。定義されていない場合は、`spark.network.timeout` にフォールバックします。
fetchOffset.numRetries 整数 3 ストリーミングとバッチ Kafka オフセットのフェッチを諦める前に再試行する回数。
fetchOffset.retryIntervalMs long 10 ストリーミングとバッチ Kafka オフセットのフェッチを再試行するまでの待機時間(ミリ秒単位)。
maxOffsetsPerTrigger long なし ストリーミングクエリ トリガー間隔ごとに処理されるオフセットの最大数に対するレート制限。指定されたオフセットの総数は、異なるボリュームの TopicPartition に比例して分割されます。
minOffsetsPerTrigger long なし ストリーミングクエリ トリガー間隔ごとに処理されるオフセットの最小数。指定されたオフセットの総数は、異なるボリュームの TopicPartition に比例して分割されます。注:maxTriggerDelay を超えた場合、使用可能なオフセットの数が minOffsetsPerTrigger に達しなくてもトリガーが起動されます。
maxTriggerDelay 単位付き時間 15m ストリーミングクエリ ソースからデータが使用可能である場合、2 つのトリガー間のトリガーを遅延させることができる最大時間。このオプションは、minOffsetsPerTrigger が設定されている場合にのみ適用されます。
minPartitions 整数 なし ストリーミングとバッチ Kafka から読み取るパーティションの最小数。デフォルトでは、Spark は Kafka から消費する Spark パーティションへの TopicPartition の 1 対 1 マッピングを行います。このオプションを TopicPartition より大きい値に設定すると、Spark は大きな Kafka パーティションを小さな部分に分割します。この設定はヒントのようなものであることに注意してください。Spark タスクの数は **約** `minPartitions` になります。丸め誤差や新しいデータを受け取らなかった Kafka パーティションによっては、少なくなる場合や多くなる場合があります。
groupIdPrefix 文字列 spark-kafka-source ストリーミングとバッチ 構造化ストリーミングクエリによって生成されるコンシューマグループ識別子(`group.id`)のプレフィックス。"kafka.group.id" が設定されている場合、このオプションは無視されます。
kafka.group.id 文字列 なし ストリーミングとバッチ Kafka から読み取る際に Kafka コンシューマで使用する Kafka グループ ID。注意して使用してください。デフォルトでは、各クエリはデータ読み取り用の固有のグループ ID を生成します。これにより、各 Kafka ソースに他のコンシューマからの干渉を受けない独自のコンシューマグループが確保され、したがって購読されたトピックのすべてのパーティションを読み取ることができます。いくつかのシナリオ(たとえば、Kafka グループベースの承認)では、データ読み取りに特定の承認済みグループ ID を使用する場合があります。オプションでグループ ID を設定できます。ただし、予期しない動作が発生する可能性があるため、非常に注意して行ってください。同時に実行されるクエリ(バッチとストリーミングの両方)または同じグループ ID を持つソースは、互いに干渉して各クエリがデータの一部のみを読み取る可能性があります。これは、クエリが短時間に開始/再開された場合にも発生する可能性があります。このような問題を最小限に抑えるには、Kafka コンシューマのセッションタイムアウト(オプション "kafka.session.timeout.ms" を設定して)を非常に小さく設定します。これが設定されている場合、オプション "groupIdPrefix" は無視されます。
includeHeaders ブール値 false ストリーミングとバッチ 行に Kafka ヘッダーを含めるかどうか。
startingOffsetsByTimestampStrategy "error" または "latest" "error" ストリーミングとバッチ タイムスタンプによる指定された開始オフセット(グローバルまたはパーティションごと)が、Kafkaが返したオフセットと一致しない場合に、この戦略が使用されます。戦略名と対応する説明を以下に示します。

"error":クエリが失敗し、エンドユーザーは手動による手順を必要とする回避策に対処する必要があります。

"latest":これらのパーティションの最新のオフセットを割り当て、Sparkが以降のマイクロバッチでこれらのパーティションから新しいレコードを読み取れるようにします。

タイムスタンプオフセットオプションの詳細

各パーティションに対して返されるオフセットは、対応するパーティション内の指定されたタイムスタンプ以上のタイムスタンプを持つ最も古いオフセットです。Kafkaが一致するオフセットを返さない場合、動作はオプションによって異なります。各オプションの説明を確認してください。

Sparkはタイムスタンプ情報をKafkaConsumer.offsetsForTimesに渡すだけで、値を解釈したり、推論したりしません。KafkaConsumer.offsetsForTimesの詳細については、javadocを参照してください。また、ここのtimestampの意味は、Kafkaの設定(log.message.timestamp.type)によって異なる場合があります。Kafkaドキュメントで詳細を確認してください。

タイムスタンプオフセットオプションには、Kafka 0.10.1.0以降が必要です。

オフセットの取得

Spark 3.0以前では、Sparkはオフセットの取得にKafkaConsumerを使用していましたが、これによりドライバで無限待機が発生する可能性がありました。Spark 3.1では、新しいオフセット取得メカニズムをAdminClientを使用して行うための新しい構成オプションspark.sql.streaming.kafka.useDeprecatedOffsetFetching(デフォルト:false)が追加されました。(KafkaConsumerを使用した古いオフセット取得を使用するには、これをtrueに設定します。)

新しいメカニズムを使用する場合、以下が適用されます。

まず、新しいアプローチはKafkaブローカ0.11.0.0+をサポートしています。

Spark 3.0以下では、セキュアなKafka処理には、ドライバの観点から次のACLが必要でした。

Spark 3.1以降、オフセットはKafkaConsumerではなくAdminClientを使用して取得できるため、ドライバの観点からは次のACLが必要になります。

ドライバのAdminClientはコンシューマグループに接続していないため、group.idベースの認証は機能しなくなります(エグゼキュータはグループベースの認証を行いません)。エグゼキュータ側は以前とまったく同じ動作をすることに注意してください(グループプレフィックスとオーバーライドは機能します)。

コンシューマキャッシング

Kafkaコンシューマの初期化には時間がかかります。特に、処理時間が重要なストリーミングシナリオではそうです。このため、SparkはApache Commons Poolを利用して、エグゼキュータ上でKafkaコンシューマをプールします。

キャッシングキーは、以下の情報から構築されます。

コンシューマプールを設定するために使用できるプロパティを以下に示します。

プロパティ名デフォルト意味バージョン以降
spark.kafka.consumer.cache.capacity 64 キャッシュされたコンシューマの最大数。これはソフトリミットであることに注意してください。 3.0.0
spark.kafka.consumer.cache.timeout 5m(5分) コンシューマがプール内でアイドル状態になる最小時間。この時間より長くアイドル状態になると、エビクタによって削除される可能性があります。 3.0.0
spark.kafka.consumer.cache.evictorThreadRunInterval 1m(1分) コンシューマプールのアイドルエビクタスレッドの実行間隔。正数でない場合は、アイドルエビクタスレッドは実行されません。 3.0.0
spark.kafka.consumer.cache.jmx.enable false この構成インスタンスで作成されたプールに対してJMXを有効または無効にします。プールの統計情報はJMXインスタンスから利用できます。JMX名のプレフィックスは「kafka010-cached-simple-kafka-consumer-pool」に設定されています。 3.0.0

プールのサイズはspark.kafka.consumer.cache.capacityによって制限されますが、「ソフトリミット」として機能し、Sparkタスクをブロックしません。

アイドルエビクタスレッドは、指定されたタイムアウト時間より長く使用されていないコンシューマを定期的に削除します。借り出し時にこの閾値に達した場合、現在使用されていない、最も使用頻度の低いエントリの削除を試みます。

削除できない場合、プールは成長し続けます。最悪の場合、プールはエグゼキュータで実行できる同時タスクの最大数(つまり、タスクスロット数)にまで成長します。

何らかの理由でタスクが失敗した場合、安全上の理由から、新しいタスクは新しく作成されたKafkaコンシューマを使用して実行されます。同時に、失敗した実行で使用されたコンシューマを削除するために、同じキャッシングキーを持つプール内のすべてのコンシューマを無効化します。他のタスクが使用しているコンシューマは閉じられませんが、プールに戻されたときにも無効化されます。

Sparkはコンシューマとともに、Kafkaからフェッチされたレコードを個別にプールして、Sparkの観点からKafkaコンシューマをステートレスにし、プーリングの効率を最大化します。Kafkaコンシューマプールと同じキャッシュキーを利用します。特性の違いにより、Apache Commons Poolは利用していないことに注意してください。

フェッチされたデータプールを設定するために使用できるプロパティを以下に示します。

プロパティ名デフォルト意味バージョン以降
spark.kafka.consumer.fetchedData.cache.timeout 5m(5分) フェッチされたデータがプール内でアイドル状態になる最小時間。この時間より長くアイドル状態になると、エビクタによって削除される可能性があります。 3.0.0
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval 1m(1分) フェッチされたデータプールのアイドルエビクタスレッドの実行間隔。正数でない場合は、アイドルエビクタスレッドは実行されません。 3.0.0

Kafkaへのデータ書き込み

ここでは、ストリーミングクエリとバッチクエリをApache Kafkaに書き込むためのサポートについて説明します。Apache Kafkaは少なくとも一度書き込みのセマンティクスのみをサポートしていることに注意してください。したがって、ストリーミングクエリまたはバッチクエリをKafkaに書き込む場合、一部のレコードが重複する可能性があります。これは、たとえば、ブローカがメッセージを受信して書き込んだ場合でも、Kafkaがブローカによって承認されなかったメッセージを再試行する必要がある場合に発生する可能性があります。これらのKafka書き込みセマンティクスのため、Structured Streamingは重複の発生を防ぐことはできません。ただし、クエリの書き込みが成功した場合は、クエリの出力が少なくとも一度書き込まれたと見なすことができます。書き込まれたデータを読み取るときに重複を削除する可能性のある解決策は、読み取り時に重複除去を実行するために使用できる主キー(一意キー)を導入することです。

Kafkaに書き込まれるDataFrameは、スキーマに次の列を含んでいる必要があります。

key(オプション) 文字列またはバイナリ
value(必須) 文字列またはバイナリ
headers (オプション) 配列
topic(*オプション) 文字列
partition(オプション) 整数

* トピック列は、「topic」構成オプションが指定されていない場合に必要です。

value列は唯一の必須オプションです。キー列が指定されていない場合、null値のキー列が自動的に追加されます(null値のキー値の処理方法については、Kafkaのセマンティクスを参照してください)。トピック列が存在する場合、その値は「topic」構成オプションが設定されていない限り、指定された行をKafkaに書き込む際のトピックとして使用されます。「topic」構成オプションはトピック列をオーバーライドします。「partition」列が指定されていない場合(またはその値がnullの場合)、パーティションはKafkaプロデューサーによって計算されます。Kafkaパーティショナーは、kafka.partitioner.classオプションを設定することでSparkで指定できます。存在しない場合は、Kafkaのデフォルトパーティショナーが使用されます。

バッチクエリとストリーミングクエリの両方で、Kafkaシンクに対して次のオプションを設定する必要があります。

オプションvalue意味
kafka.bootstrap.servers コンマ区切りの host:port 一覧 Kafka の "bootstrap.servers" 設定。

次の設定はオプションです。

オプションvalueデフォルトクエリタイプ意味
topic 文字列 なし ストリーミングとバッチ すべての行がKafkaに書き込まれるトピックを設定します。このオプションは、データに存在する可能性のあるトピック列をオーバーライドします。
includeHeaders ブール値 false ストリーミングとバッチ 行に Kafka ヘッダーを含めるかどうか。

ストリーミングクエリのKafkaシンクの作成

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start();

バッチクエリの出力をKafkaに書き込む

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save();

プロデューサーキャッシング

Kafkaプロデューサーインスタンスはスレッドセーフになるように設計されているため、SparkはKafkaプロデューサーインスタンスを初期化し、同じキャッシングキーを持つタスク間で共有します。

キャッシングキーは、以下の情報から構築されます。

これには、委任トークンが使用されている場合にSparkが自動的に含める認証の設定が含まれます。認証を考慮に入れても、同じKafkaプロデューサー設定間では同じKafkaプロデューサーインスタンスが使用されることが期待できます。委任トークンが更新されると、異なるKafkaプロデューサーが使用されます。古い委任トークンのKafkaプロデューサーインスタンスは、キャッシュポリシーに従って削除されます。

プロデューサープールを設定するために使用できるプロパティを以下に示します。

プロパティ名デフォルト意味バージョン以降
spark.kafka.producer.cache.timeout 10m(10分) プロデューサーがプール内でアイドル状態になる最小時間。この時間より長くアイドル状態になると、エビクタによって削除される可能性があります。 2.2.1
spark.kafka.producer.cache.evictorThreadRunInterval 1m(1分) プロデューサープールのアイドルエビクタスレッドの実行間隔。正数でない場合は、アイドルエビクタスレッドは実行されません。 3.0.0

アイドルエビクタスレッドは、指定されたタイムアウト時間より長く使用されていないプロデューサーを定期的に削除します。プロデューサーは共有され、同時に使用されるため、最後に使用されたタイムスタンプは、プロデューサーインスタンスが返され、参照カウントが0になった時点によって決定されます。

Kafka固有の設定

Kafka独自の構成は、kafka.プレフィックスを使用してDataStreamReader.optionを介して設定できます。例:stream.option("kafka.bootstrap.servers", "host:port")。可能なKafkaパラメータについては、データの読み取りに関連するパラメータについてはKafkaコンシューマ設定ドキュメント、データの書き込みに関連するパラメータについてはKafkaプロデューサー設定ドキュメントを参照してください。

次のKafkaパラメータは設定できず、Kafkaソースまたはシンクは例外をスローします。

デプロイ

他のSparkアプリケーションと同様に、spark-submitを使用してアプリケーションを起動します。spark-sql-kafka-0-10_2.12とその依存関係は、--packagesを使用してspark-submitに直接追加できます。例:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 ...

spark-shellで実験する場合は、--packagesを使用してspark-sql-kafka-0-10_2.12とその依存関係を直接追加することもできます。

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 ...

外部依存関係を含むアプリケーションの提出に関する詳細については、「アプリケーション提出ガイド」を参照してください。

セキュリティ

Kafka 0.9.0.0では、クラスタのセキュリティを強化するいくつかの機能が導入されました。これらの可能性に関する詳細な説明については、「Kafkaセキュリティドキュメント」を参照してください。

セキュリティはオプションであり、デフォルトではオフになっていることに注意してください。

Sparkは、Kafkaクラスタに対する認証に以下の方法をサポートしています。

委任トークン

この方法では、アプリケーションをSparkパラメータを介して構成でき、JAASログイン構成は不要になる場合があります(SparkはKafkaの動的JAAS構成機能を使用できます)。委任トークンに関する詳細については、「Kafka委任トークンドキュメント」を参照してください。

このプロセスは、SparkのKafka委任トークンプロバイダーによって開始されます。spark.kafka.clusters.${cluster}.auth.bootstrap.serversが設定されている場合、Sparkは次のログインオプションを優先順位順に考慮します。

spark.security.credentials.kafka.enabledfalseに設定することで、Kafka委任トークンプロバイダーをオフにすることができます(デフォルト:true)。

Sparkは、トークンを取得するために次の認証プロトコルを使用するように構成できます(Kafkaブローカー構成と一致する必要があります)。

委任トークンを正常に取得した後、Sparkはそれをノード間で配布し、それに応じて更新します。委任トークンはSCRAMログインモジュールを使用して認証を行うため、適切なspark.kafka.clusters.${cluster}.sasl.token.mechanism(デフォルト:SCRAM-SHA-512)を構成する必要があります。また、このパラメータはKafkaブローカー構成と一致する必要があります。

エグゼキュータで委任トークンが利用可能な場合、Sparkは次のログインオプションを優先順位順に考慮します。

上記のいずれも当てはまらない場合は、安全でない接続と見なされます。

設定

委任トークンは複数のクラスタから取得でき、${cluster}は、異なる設定をグループ化するのに役立つ任意の一意の識別子です。

プロパティ名デフォルト意味バージョン以降
spark.kafka.clusters.${cluster}.auth.bootstrap.servers なし Kafkaクラスタへの初期接続を確立するために使用する、カンマ区切りのホスト/ポートペアのリスト。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.0.0
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex .* アプリケーション内のソースとシンクのbootstrap.servers設定と照合する正規表現。サーバーアドレスがこの正規表現と一致する場合、接続時に対応するブートストラップサーバーから取得した委任トークンが使用されます。複数のクラスタがアドレスと一致する場合、例外がスローされ、クエリは開始されません。Kafkaの安全なリスナーと安全でないリスナーは異なるポートにバインドされています。両方を使用する場合、セキュアリスナーポートは正規表現の一部である必要があります。 3.0.0
spark.kafka.clusters.${cluster}.security.protocol SASL_SSL ブローカーと通信するために使用するプロトコル。詳細については、Kafkaドキュメントを参照してください。プロトコルは、bootstrap.servers設定が一致するすべてのソースとシンクにデフォルトで適用され(詳細についてはspark.kafka.clusters.${cluster}.target.bootstrap.servers.regexを参照)、ソースまたはシンクでkafka.security.protocolを設定することで上書きできます。 3.0.0
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name kafka Kafkaが実行されるKerberosプリンシパル名。これは、KafkaのJAAS構成またはKafkaの構成のいずれかで定義できます。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.type なし トラストストアファイルのファイル形式。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.2.0
spark.kafka.clusters.${cluster}.ssl.truststore.location なし トラストストアファイルの場所。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.0.0
spark.kafka.clusters.${cluster}.ssl.truststore.password なし トラストストアファイルのストアパスワード。これはオプションであり、spark.kafka.clusters.${cluster}.ssl.truststore.locationが構成されている場合にのみ必要です。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.type なし キーストアファイルのファイル形式。クライアントにはオプションです。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.2.0
spark.kafka.clusters.${cluster}.ssl.keystore.location なし キーストアファイルの場所。クライアントにはオプションであり、クライアントの双方向認証に使用できます。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.0.0
spark.kafka.clusters.${cluster}.ssl.keystore.password なし キーストアファイルのストアパスワード。これはオプションであり、spark.kafka.clusters.${cluster}.ssl.keystore.locationが構成されている場合にのみ必要です。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.0.0
spark.kafka.clusters.${cluster}.ssl.key.password なし キーストアファイル内の秘密鍵のパスワード。クライアントにはオプションです。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 3.0.0
spark.kafka.clusters.${cluster}.sasl.token.mechanism SCRAM-SHA-512 委任トークンを使用してクライアント接続に使用するSASLメカニズム。認証に使用されるSCRAMログインモジュールのため、互換性のあるメカニズムをここに設定する必要があります。詳細については、Kafkaドキュメント(sasl.mechanism)を参照してください。委任トークンを使用してKafkaブローカーに対して認証するためだけに使用されます。 3.0.0

Kafka固有の設定

Kafka独自の構成は、kafka.プレフィックスを使用して設定できます。例:--conf spark.kafka.clusters.${cluster}.kafka.retries=1。可能なKafkaパラメータについては、「Kafka adminclient構成ドキュメント」を参照してください。

注意事項

JAASログイン構成

JAASログイン構成は、SparkがKafkaクラスタにアクセスしようとするすべてのノードに配置する必要があります。これにより、保守コストが高くなりますが、カスタム認証ロジックを適用できます。これはいくつかの方法で行うことができます。1つの方法は、追加のJVMパラメータを提供することです。例:

./bin/spark-submit \
    --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
    --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
    ...