構造化ストリーミング + Kafka 統合ガイド (Kafka ブローカーバージョン 0.10.0 以上)
Kafka 0.10 からデータを読み取り、Kafka にデータ書き込むための構造化ストリーミングの統合。
リンク
SBT/Maven プロジェクト定義を使用する Scala/Java アプリケーションの場合、次のアーティファクトとアプリケーションをリンクします。
groupId = org.apache.spark
artifactId = spark-sql-kafka-0-10_2.12
version = 3.5.1
ヘッダー機能を使用するには、Kafka クライアントのバージョンが 0.11.0.0 以上である必要があることに注意してください。
Python アプリケーションの場合、アプリケーションのデプロイ時に上記のライブラリとその依存関係を追加する必要があります。デプロイ のセクションを参照してください。
spark-shell
で実験する場合は、spark-shell
を呼び出す際にも、上記のライブラリとその依存関係を追加する必要があります。デプロイ のセクションを参照してください。
Kafka からデータを読み取る
ストリーミングクエリ用の Kafka ソースの作成
# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to 1 topic, with headers
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("includeHeaders", "true") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
# Subscribe to multiple topics
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic, with headers
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
.as[(String, String, Array[(String, Array[Byte])])]
// Subscribe to multiple topics
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.option("includeHeaders", "true")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");
// Subscribe to multiple topics
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
バッチクエリ用の Kafka ソースの作成
バッチ処理に適したユースケースがある場合は、定義済みのオフセット範囲の Dataset/DataFrame を作成できます。
# Subscribe to 1 topic defaults to the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to 1 topic defaults to the earliest and latest offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
ソース内の各行には、次のスキーマがあります。
列 | 型 |
---|---|
key | バイナリ |
value | バイナリ |
topic | 文字列 |
partition | 整数 |
offset | long |
timestamp | timestamp |
timestampType | 整数 |
headers (オプション) | 配列 |
バッチクエリとストリーミングクエリの両方について、Kafka ソースには次のオプションを設定する必要があります。
オプション | value | 意味 |
---|---|---|
assign | JSON 文字列 {"topicA":[0,1],"topicB":[2,4]} | 消費する特定の TopicPartitions。Kafka ソースでは、「assign」、「subscribe」、または「subscribePattern」オプションのいずれか1つのみを指定できます。 |
subscribe | コンマ区切りのトピック一覧 | 購読するトピック一覧。Kafka ソースでは、「assign」、「subscribe」、または「subscribePattern」オプションのいずれか1つのみを指定できます。 |
subscribePattern | Java 正規表現文字列 | トピックへの購読に使用されるパターン。Kafka ソースでは、「assign」、「subscribe」、または「subscribePattern」オプションのいずれか1つのみを指定できます。 |
kafka.bootstrap.servers | コンマ区切りの host:port 一覧 | Kafka の "bootstrap.servers" 設定。 |
次の設定はオプションです。
オプション | value | デフォルト | クエリタイプ | 意味 |
---|---|---|---|---|
startingTimestamp | タイムスタンプ文字列(例:"1000") | なし(次の優先順位は `startingOffsetsByTimestamp`) | ストリーミングとバッチ | クエリが開始されたときのタイムスタンプの開始点。購読されているトピックのすべてのパーティションの開始タイムスタンプを指定する文字列。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、動作はオプション `startingOffsetsByTimestampStrategy` の値に従います。
注記1:`startingTimestamp` は `startingOffsetsByTimestamp` と `startingOffsets` より優先されます。 注記2:ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断した場所から再開されます。クエリ中に新しく検出されたパーティションは、最も古いものから開始されます。 |
startingOffsetsByTimestamp | JSON 文字列 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | なし(次の優先順位は `startingOffsets`) | ストリーミングとバッチ | クエリが開始されたときのタイムスタンプの開始点。各 TopicPartition の開始タイムスタンプを指定する JSON 文字列。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、動作はオプション `startingOffsetsByTimestampStrategy` の値に従います。
注記1:`startingOffsetsByTimestamp` は `startingOffsets` より優先されます。 注記2:ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断した場所から再開されます。クエリ中に新しく検出されたパーティションは、最も古いものから開始されます。 |
startingOffsets | "earliest"、"latest"(ストリーミングのみ)、または JSON 文字列 """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """ | ストリーミングの場合は "latest"、バッチの場合は "earliest" | ストリーミングとバッチ | クエリが開始されたときの開始点。"earliest" は最も古いオフセットから、"latest" は最新のオフセットから、または各 TopicPartition の開始オフセットを指定する JSON 文字列。JSON では、オフセットとして -2 を使用して最も古いものを参照し、-1 を使用して最新のものを参照できます。注:バッチクエリの場合、latest(暗黙的に、または JSON で -1 を使用して)は許可されません。ストリーミングクエリの場合、これは新しいクエリが開始された場合にのみ適用され、再開は常にクエリが中断した場所から再開されます。クエリ中に新しく検出されたパーティションは、最も古いものから開始されます。 |
endingTimestamp | タイムスタンプ文字列(例:"1000") | なし(次の優先順位は `endingOffsetsByTimestamp`) | バッチクエリ | バッチクエリが終了したときの終了点。購読されているトピックのすべてのパーティションの終了タイムスタンプを指定する JSON 文字列。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、オフセットは最新に設定されます。 注記:`endingTimestamp` は `endingOffsetsByTimestamp` と `endingOffsets` より優先されます。
|
endingOffsetsByTimestamp | JSON 文字列 """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """ | なし(次の優先順位は `endingOffsets`) | バッチクエリ | バッチクエリが終了したときの終了点。各 TopicPartition の終了タイムスタンプを指定する JSON 文字列。タイムスタンプオフセットオプションの詳細については、以下を参照してください。Kafka が一致するオフセットを返さない場合、オフセットは最新に設定されます。 注記:`endingOffsetsByTimestamp` は `endingOffsets` より優先されます。 |
endingOffsets | latest または JSON 文字列 {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} | latest | バッチクエリ | バッチクエリが終了したときの終了点。"latest" は最新を参照するだけ、または各 TopicPartition の終了オフセットを指定する JSON 文字列。JSON では、オフセットとして -1 を使用して最新を参照し、オフセットとして -2(最も古い)は使用できません。 |
failOnDataLoss | true または false | true | ストリーミングとバッチ | データが失われる可能性がある場合(例:トピックが削除された場合、またはオフセットが範囲外の場合)にクエリを失敗させるかどうか。これは誤検知の可能性があります。期待通りに動作しない場合は、無効にできます。 |
kafkaConsumer.pollTimeoutMs | long | 120000 | ストリーミングとバッチ | エグゼキュータで Kafka からデータを取得するためのタイムアウト(ミリ秒単位)。定義されていない場合は、`spark.network.timeout` にフォールバックします。 |
fetchOffset.numRetries | 整数 | 3 | ストリーミングとバッチ | Kafka オフセットのフェッチを諦める前に再試行する回数。 |
fetchOffset.retryIntervalMs | long | 10 | ストリーミングとバッチ | Kafka オフセットのフェッチを再試行するまでの待機時間(ミリ秒単位)。 |
maxOffsetsPerTrigger | long | なし | ストリーミングクエリ | トリガー間隔ごとに処理されるオフセットの最大数に対するレート制限。指定されたオフセットの総数は、異なるボリュームの TopicPartition に比例して分割されます。 |
minOffsetsPerTrigger | long | なし | ストリーミングクエリ | トリガー間隔ごとに処理されるオフセットの最小数。指定されたオフセットの総数は、異なるボリュームの TopicPartition に比例して分割されます。注:maxTriggerDelay を超えた場合、使用可能なオフセットの数が minOffsetsPerTrigger に達しなくてもトリガーが起動されます。 |
maxTriggerDelay | 単位付き時間 | 15m | ストリーミングクエリ | ソースからデータが使用可能である場合、2 つのトリガー間のトリガーを遅延させることができる最大時間。このオプションは、minOffsetsPerTrigger が設定されている場合にのみ適用されます。 |
minPartitions | 整数 | なし | ストリーミングとバッチ | Kafka から読み取るパーティションの最小数。デフォルトでは、Spark は Kafka から消費する Spark パーティションへの TopicPartition の 1 対 1 マッピングを行います。このオプションを TopicPartition より大きい値に設定すると、Spark は大きな Kafka パーティションを小さな部分に分割します。この設定はヒントのようなものであることに注意してください。Spark タスクの数は **約** `minPartitions` になります。丸め誤差や新しいデータを受け取らなかった Kafka パーティションによっては、少なくなる場合や多くなる場合があります。 |
groupIdPrefix | 文字列 | spark-kafka-source | ストリーミングとバッチ | 構造化ストリーミングクエリによって生成されるコンシューマグループ識別子(`group.id`)のプレフィックス。"kafka.group.id" が設定されている場合、このオプションは無視されます。 |
kafka.group.id | 文字列 | なし | ストリーミングとバッチ | Kafka から読み取る際に Kafka コンシューマで使用する Kafka グループ ID。注意して使用してください。デフォルトでは、各クエリはデータ読み取り用の固有のグループ ID を生成します。これにより、各 Kafka ソースに他のコンシューマからの干渉を受けない独自のコンシューマグループが確保され、したがって購読されたトピックのすべてのパーティションを読み取ることができます。いくつかのシナリオ(たとえば、Kafka グループベースの承認)では、データ読み取りに特定の承認済みグループ ID を使用する場合があります。オプションでグループ ID を設定できます。ただし、予期しない動作が発生する可能性があるため、非常に注意して行ってください。同時に実行されるクエリ(バッチとストリーミングの両方)または同じグループ ID を持つソースは、互いに干渉して各クエリがデータの一部のみを読み取る可能性があります。これは、クエリが短時間に開始/再開された場合にも発生する可能性があります。このような問題を最小限に抑えるには、Kafka コンシューマのセッションタイムアウト(オプション "kafka.session.timeout.ms" を設定して)を非常に小さく設定します。これが設定されている場合、オプション "groupIdPrefix" は無視されます。 |
includeHeaders | ブール値 | false | ストリーミングとバッチ | 行に Kafka ヘッダーを含めるかどうか。 |
startingOffsetsByTimestampStrategy | "error" または "latest" | "error" | ストリーミングとバッチ | タイムスタンプによる指定された開始オフセット(グローバルまたはパーティションごと)が、Kafkaが返したオフセットと一致しない場合に、この戦略が使用されます。戦略名と対応する説明を以下に示します。
"error":クエリが失敗し、エンドユーザーは手動による手順を必要とする回避策に対処する必要があります。 "latest":これらのパーティションの最新のオフセットを割り当て、Sparkが以降のマイクロバッチでこれらのパーティションから新しいレコードを読み取れるようにします。 |
タイムスタンプオフセットオプションの詳細
各パーティションに対して返されるオフセットは、対応するパーティション内の指定されたタイムスタンプ以上のタイムスタンプを持つ最も古いオフセットです。Kafkaが一致するオフセットを返さない場合、動作はオプションによって異なります。各オプションの説明を確認してください。
Sparkはタイムスタンプ情報をKafkaConsumer.offsetsForTimes
に渡すだけで、値を解釈したり、推論したりしません。KafkaConsumer.offsetsForTimes
の詳細については、javadocを参照してください。また、ここのtimestamp
の意味は、Kafkaの設定(log.message.timestamp.type
)によって異なる場合があります。Kafkaドキュメントで詳細を確認してください。
タイムスタンプオフセットオプションには、Kafka 0.10.1.0以降が必要です。
オフセットの取得
Spark 3.0以前では、Sparkはオフセットの取得にKafkaConsumer
を使用していましたが、これによりドライバで無限待機が発生する可能性がありました。Spark 3.1では、新しいオフセット取得メカニズムをAdminClient
を使用して行うための新しい構成オプションspark.sql.streaming.kafka.useDeprecatedOffsetFetching
(デフォルト:false
)が追加されました。(KafkaConsumer
を使用した古いオフセット取得を使用するには、これをtrue
に設定します。)
新しいメカニズムを使用する場合、以下が適用されます。
まず、新しいアプローチはKafkaブローカ0.11.0.0+
をサポートしています。
Spark 3.0以下では、セキュアなKafka処理には、ドライバの観点から次のACLが必要でした。
- トピックリソース記述操作
- トピックリソース読み取り操作
- グループリソース読み取り操作
Spark 3.1以降、オフセットはKafkaConsumer
ではなくAdminClient
を使用して取得できるため、ドライバの観点からは次のACLが必要になります。
- トピックリソース記述操作
ドライバのAdminClient
はコンシューマグループに接続していないため、group.id
ベースの認証は機能しなくなります(エグゼキュータはグループベースの認証を行いません)。エグゼキュータ側は以前とまったく同じ動作をすることに注意してください(グループプレフィックスとオーバーライドは機能します)。
コンシューマキャッシング
Kafkaコンシューマの初期化には時間がかかります。特に、処理時間が重要なストリーミングシナリオではそうです。このため、SparkはApache Commons Poolを利用して、エグゼキュータ上でKafkaコンシューマをプールします。
キャッシングキーは、以下の情報から構築されます。
- トピック名
- トピックパーティション
- グループID
コンシューマプールを設定するために使用できるプロパティを以下に示します。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.kafka.consumer.cache.capacity | 64 | キャッシュされたコンシューマの最大数。これはソフトリミットであることに注意してください。 | 3.0.0 |
spark.kafka.consumer.cache.timeout | 5m(5分) | コンシューマがプール内でアイドル状態になる最小時間。この時間より長くアイドル状態になると、エビクタによって削除される可能性があります。 | 3.0.0 |
spark.kafka.consumer.cache.evictorThreadRunInterval | 1m(1分) | コンシューマプールのアイドルエビクタスレッドの実行間隔。正数でない場合は、アイドルエビクタスレッドは実行されません。 | 3.0.0 |
spark.kafka.consumer.cache.jmx.enable | false | この構成インスタンスで作成されたプールに対してJMXを有効または無効にします。プールの統計情報はJMXインスタンスから利用できます。JMX名のプレフィックスは「kafka010-cached-simple-kafka-consumer-pool」に設定されています。 | 3.0.0 |
プールのサイズはspark.kafka.consumer.cache.capacity
によって制限されますが、「ソフトリミット」として機能し、Sparkタスクをブロックしません。
アイドルエビクタスレッドは、指定されたタイムアウト時間より長く使用されていないコンシューマを定期的に削除します。借り出し時にこの閾値に達した場合、現在使用されていない、最も使用頻度の低いエントリの削除を試みます。
削除できない場合、プールは成長し続けます。最悪の場合、プールはエグゼキュータで実行できる同時タスクの最大数(つまり、タスクスロット数)にまで成長します。
何らかの理由でタスクが失敗した場合、安全上の理由から、新しいタスクは新しく作成されたKafkaコンシューマを使用して実行されます。同時に、失敗した実行で使用されたコンシューマを削除するために、同じキャッシングキーを持つプール内のすべてのコンシューマを無効化します。他のタスクが使用しているコンシューマは閉じられませんが、プールに戻されたときにも無効化されます。
Sparkはコンシューマとともに、Kafkaからフェッチされたレコードを個別にプールして、Sparkの観点からKafkaコンシューマをステートレスにし、プーリングの効率を最大化します。Kafkaコンシューマプールと同じキャッシュキーを利用します。特性の違いにより、Apache Commons Poolは利用していないことに注意してください。
フェッチされたデータプールを設定するために使用できるプロパティを以下に示します。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.kafka.consumer.fetchedData.cache.timeout | 5m(5分) | フェッチされたデータがプール内でアイドル状態になる最小時間。この時間より長くアイドル状態になると、エビクタによって削除される可能性があります。 | 3.0.0 |
spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval | 1m(1分) | フェッチされたデータプールのアイドルエビクタスレッドの実行間隔。正数でない場合は、アイドルエビクタスレッドは実行されません。 | 3.0.0 |
Kafkaへのデータ書き込み
ここでは、ストリーミングクエリとバッチクエリをApache Kafkaに書き込むためのサポートについて説明します。Apache Kafkaは少なくとも一度書き込みのセマンティクスのみをサポートしていることに注意してください。したがって、ストリーミングクエリまたはバッチクエリをKafkaに書き込む場合、一部のレコードが重複する可能性があります。これは、たとえば、ブローカがメッセージを受信して書き込んだ場合でも、Kafkaがブローカによって承認されなかったメッセージを再試行する必要がある場合に発生する可能性があります。これらのKafka書き込みセマンティクスのため、Structured Streamingは重複の発生を防ぐことはできません。ただし、クエリの書き込みが成功した場合は、クエリの出力が少なくとも一度書き込まれたと見なすことができます。書き込まれたデータを読み取るときに重複を削除する可能性のある解決策は、読み取り時に重複除去を実行するために使用できる主キー(一意キー)を導入することです。
Kafkaに書き込まれるDataFrameは、スキーマに次の列を含んでいる必要があります。
列 | 型 |
---|---|
key(オプション) | 文字列またはバイナリ |
value(必須) | 文字列またはバイナリ |
headers (オプション) | 配列 |
topic(*オプション) | 文字列 |
partition(オプション) | 整数 |
* トピック列は、「topic」構成オプションが指定されていない場合に必要です。
value列は唯一の必須オプションです。キー列が指定されていない場合、null
値のキー列が自動的に追加されます(null
値のキー値の処理方法については、Kafkaのセマンティクスを参照してください)。トピック列が存在する場合、その値は「topic」構成オプションが設定されていない限り、指定された行をKafkaに書き込む際のトピックとして使用されます。「topic」構成オプションはトピック列をオーバーライドします。「partition」列が指定されていない場合(またはその値がnull
の場合)、パーティションはKafkaプロデューサーによって計算されます。Kafkaパーティショナーは、kafka.partitioner.class
オプションを設定することでSparkで指定できます。存在しない場合は、Kafkaのデフォルトパーティショナーが使用されます。
バッチクエリとストリーミングクエリの両方で、Kafkaシンクに対して次のオプションを設定する必要があります。
オプション | value | 意味 |
---|---|---|
kafka.bootstrap.servers | コンマ区切りの host:port 一覧 | Kafka の "bootstrap.servers" 設定。 |
次の設定はオプションです。
オプション | value | デフォルト | クエリタイプ | 意味 |
---|---|---|---|---|
topic | 文字列 | なし | ストリーミングとバッチ | すべての行がKafkaに書き込まれるトピックを設定します。このオプションは、データに存在する可能性のあるトピック列をオーバーライドします。 |
includeHeaders | ブール値 | false | ストリーミングとバッチ | 行に Kafka ヘッダーを含めるかどうか。 |
ストリーミングクエリのKafkaシンクの作成
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();
バッチクエリの出力をKafkaに書き込む
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save();
プロデューサーキャッシング
Kafkaプロデューサーインスタンスはスレッドセーフになるように設計されているため、SparkはKafkaプロデューサーインスタンスを初期化し、同じキャッシングキーを持つタスク間で共有します。
キャッシングキーは、以下の情報から構築されます。
- Kafkaプロデューサーの設定
これには、委任トークンが使用されている場合にSparkが自動的に含める認証の設定が含まれます。認証を考慮に入れても、同じKafkaプロデューサー設定間では同じKafkaプロデューサーインスタンスが使用されることが期待できます。委任トークンが更新されると、異なるKafkaプロデューサーが使用されます。古い委任トークンのKafkaプロデューサーインスタンスは、キャッシュポリシーに従って削除されます。
プロデューサープールを設定するために使用できるプロパティを以下に示します。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.kafka.producer.cache.timeout | 10m(10分) | プロデューサーがプール内でアイドル状態になる最小時間。この時間より長くアイドル状態になると、エビクタによって削除される可能性があります。 | 2.2.1 |
spark.kafka.producer.cache.evictorThreadRunInterval | 1m(1分) | プロデューサープールのアイドルエビクタスレッドの実行間隔。正数でない場合は、アイドルエビクタスレッドは実行されません。 | 3.0.0 |
アイドルエビクタスレッドは、指定されたタイムアウト時間より長く使用されていないプロデューサーを定期的に削除します。プロデューサーは共有され、同時に使用されるため、最後に使用されたタイムスタンプは、プロデューサーインスタンスが返され、参照カウントが0になった時点によって決定されます。
Kafka固有の設定
Kafka独自の構成は、kafka.
プレフィックスを使用してDataStreamReader.option
を介して設定できます。例:stream.option("kafka.bootstrap.servers", "host:port")
。可能なKafkaパラメータについては、データの読み取りに関連するパラメータについてはKafkaコンシューマ設定ドキュメント、データの書き込みに関連するパラメータについてはKafkaプロデューサー設定ドキュメントを参照してください。
次のKafkaパラメータは設定できず、Kafkaソースまたはシンクは例外をスローします。
- group.id:Kafkaソースは、各クエリに対して一意のグループIDを自動的に作成します。ユーザーは、自動的に生成されるgroup.idのプレフィックスをオプションのソースオプション
groupIdPrefix
を介して設定できます。デフォルト値は「spark-kafka-source」です。「kafka.group.id」を設定してSparkに特別なgroup.idを使用させることもできますが、このオプションに関する警告を読んでから慎重に使用してください。 - auto.offset.reset:代わりに開始位置を指定するには、ソースオプション
startingOffsets
を設定します。Structured Streamingは、Kafka Consumerに依存するのではなく、内部的に消費されるオフセットを管理します。これにより、新しいトピック/パーティションが動的にサブスクライブされた場合でも、データが失われないようにします。startingOffsets
は、新しいストリーミングクエリが開始された場合にのみ適用され、再開は常にクエリが中断した場所から再開されることに注意してください。ストリーミングアプリケーションによって消費されるオフセットがKafkaに存在しなくなった場合(例:トピックが削除された、オフセットが範囲外である、またはオフセットが保持期間後に削除された)、オフセットはリセットされず、ストリーミングアプリケーションはデータ損失が発生します。極端なケースでは、たとえば、ストリーミングアプリケーションのスループットがKafkaの保持速度に追いつけない場合、バッチのオフセット範囲がKafkaに完全に存在しなくなると、バッチの入力行は徐々に減少してゼロになる可能性があります。failOnDataLoss
オプションを有効にすると、このような場合にStructured Streamingはクエリを失敗させることができます。 - key.deserializer: キーは常にByteArrayDeserializerを使用してバイト配列として逆シリアル化されます。キーを明示的に逆シリアル化するには、DataFrame操作を使用してください。
- value.deserializer: 値は常にByteArrayDeserializerを使用してバイト配列として逆シリアル化されます。値を明示的に逆シリアル化するには、DataFrame操作を使用してください。
- key.serializer: キーは常にByteArraySerializerまたはStringSerializerを使用してシリアル化されます。キーを文字列またはバイト配列に明示的にシリアル化するには、DataFrame操作を使用してください。
- value.serializer: 値は常にByteArraySerializerまたはStringSerializerを使用してシリアル化されます。値を文字列またはバイト配列に明示的にシリアル化するには、DataFrame操作を使用してください。
- enable.auto.commit: Kafkaソースはオフセットをコミットしません。
- interceptor.classes: Kafkaソースは常にキーと値をバイト配列として読み取ります。クエリが中断する可能性があるため、ConsumerInterceptorを使用することは安全ではありません。
デプロイ
他のSparkアプリケーションと同様に、spark-submit
を使用してアプリケーションを起動します。spark-sql-kafka-0-10_2.12
とその依存関係は、--packages
を使用してspark-submit
に直接追加できます。例:
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 ...
spark-shell
で実験する場合は、--packages
を使用してspark-sql-kafka-0-10_2.12
とその依存関係を直接追加することもできます。
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 ...
外部依存関係を含むアプリケーションの提出に関する詳細については、「アプリケーション提出ガイド」を参照してください。
セキュリティ
Kafka 0.9.0.0では、クラスタのセキュリティを強化するいくつかの機能が導入されました。これらの可能性に関する詳細な説明については、「Kafkaセキュリティドキュメント」を参照してください。
セキュリティはオプションであり、デフォルトではオフになっていることに注意してください。
Sparkは、Kafkaクラスタに対する認証に以下の方法をサポートしています。
- 委任トークン(Kafkaブローカー1.1.0で導入)
- JAASログイン構成
委任トークン
この方法では、アプリケーションをSparkパラメータを介して構成でき、JAASログイン構成は不要になる場合があります(SparkはKafkaの動的JAAS構成機能を使用できます)。委任トークンに関する詳細については、「Kafka委任トークンドキュメント」を参照してください。
このプロセスは、SparkのKafka委任トークンプロバイダーによって開始されます。spark.kafka.clusters.${cluster}.auth.bootstrap.servers
が設定されている場合、Sparkは次のログインオプションを優先順位順に考慮します。
- JAASログイン構成、以下の例を参照してください。
-
Keytabファイル、例:
./bin/spark-submit \ --keytab <KEYTAB_FILE> \ --principal <PRINCIPAL> \ --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \ ...
-
Kerberos資格情報キャッシュ、例:
./bin/spark-submit \ --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \ ...
spark.security.credentials.kafka.enabled
をfalse
に設定することで、Kafka委任トークンプロバイダーをオフにすることができます(デフォルト:true
)。
Sparkは、トークンを取得するために次の認証プロトコルを使用するように構成できます(Kafkaブローカー構成と一致する必要があります)。
- SASL SSL(デフォルト)
- SSL
- SASL PLAINTEXT(テスト用)
委任トークンを正常に取得した後、Sparkはそれをノード間で配布し、それに応じて更新します。委任トークンはSCRAM
ログインモジュールを使用して認証を行うため、適切なspark.kafka.clusters.${cluster}.sasl.token.mechanism
(デフォルト:SCRAM-SHA-512
)を構成する必要があります。また、このパラメータはKafkaブローカー構成と一致する必要があります。
エグゼキュータで委任トークンが利用可能な場合、Sparkは次のログインオプションを優先順位順に考慮します。
- JAASログイン構成、以下の例を参照してください。
- 委任トークン、詳細については
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex
パラメータを参照してください。
上記のいずれも当てはまらない場合は、安全でない接続と見なされます。
設定
委任トークンは複数のクラスタから取得でき、${cluster}
は、異なる設定をグループ化するのに役立つ任意の一意の識別子です。
プロパティ名 | デフォルト | 意味 | バージョン以降 |
---|---|---|---|
spark.kafka.clusters.${cluster}.auth.bootstrap.servers |
なし | Kafkaクラスタへの初期接続を確立するために使用する、カンマ区切りのホスト/ポートペアのリスト。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 | 3.0.0 |
spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex |
.* | アプリケーション内のソースとシンクのbootstrap.servers 設定と照合する正規表現。サーバーアドレスがこの正規表現と一致する場合、接続時に対応するブートストラップサーバーから取得した委任トークンが使用されます。複数のクラスタがアドレスと一致する場合、例外がスローされ、クエリは開始されません。Kafkaの安全なリスナーと安全でないリスナーは異なるポートにバインドされています。両方を使用する場合、セキュアリスナーポートは正規表現の一部である必要があります。 |
3.0.0 |
spark.kafka.clusters.${cluster}.security.protocol |
SASL_SSL | ブローカーと通信するために使用するプロトコル。詳細については、Kafkaドキュメントを参照してください。プロトコルは、bootstrap.servers 設定が一致するすべてのソースとシンクにデフォルトで適用され(詳細についてはspark.kafka.clusters.${cluster}.target.bootstrap.servers.regex を参照)、ソースまたはシンクでkafka.security.protocol を設定することで上書きできます。 |
3.0.0 |
spark.kafka.clusters.${cluster}.sasl.kerberos.service.name |
kafka | Kafkaが実行されるKerberosプリンシパル名。これは、KafkaのJAAS構成またはKafkaの構成のいずれかで定義できます。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.type |
なし | トラストストアファイルのファイル形式。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 | 3.2.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.location |
なし | トラストストアファイルの場所。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.truststore.password |
なし | トラストストアファイルのストアパスワード。これはオプションであり、spark.kafka.clusters.${cluster}.ssl.truststore.location が構成されている場合にのみ必要です。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 |
3.0.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.type |
なし | キーストアファイルのファイル形式。クライアントにはオプションです。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 | 3.2.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.location |
なし | キーストアファイルの場所。クライアントにはオプションであり、クライアントの双方向認証に使用できます。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 | 3.0.0 |
spark.kafka.clusters.${cluster}.ssl.keystore.password |
なし | キーストアファイルのストアパスワード。これはオプションであり、spark.kafka.clusters.${cluster}.ssl.keystore.location が構成されている場合にのみ必要です。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 |
3.0.0 |
spark.kafka.clusters.${cluster}.ssl.key.password |
なし | キーストアファイル内の秘密鍵のパスワード。クライアントにはオプションです。詳細については、Kafkaドキュメントを参照してください。委任トークンを取得するためだけに使用されます。 | 3.0.0 |
spark.kafka.clusters.${cluster}.sasl.token.mechanism |
SCRAM-SHA-512 | 委任トークンを使用してクライアント接続に使用するSASLメカニズム。認証に使用されるSCRAMログインモジュールのため、互換性のあるメカニズムをここに設定する必要があります。詳細については、Kafkaドキュメント(sasl.mechanism )を参照してください。委任トークンを使用してKafkaブローカーに対して認証するためだけに使用されます。 |
3.0.0 |
Kafka固有の設定
Kafka独自の構成は、kafka.
プレフィックスを使用して設定できます。例:--conf spark.kafka.clusters.${cluster}.kafka.retries=1
。可能なKafkaパラメータについては、「Kafka adminclient構成ドキュメント」を参照してください。
注意事項
- プロキシユーザーの委任トークンの取得はまだサポートされていません(KAFKA-6945)。
JAASログイン構成
JAASログイン構成は、SparkがKafkaクラスタにアクセスしようとするすべてのノードに配置する必要があります。これにより、保守コストが高くなりますが、カスタム認証ロジックを適用できます。これはいくつかの方法で行うことができます。1つの方法は、追加のJVMパラメータを提供することです。例:
./bin/spark-submit \
--driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
--conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
...