Spark Streaming + Kafka 連携ガイド (Kafka ブローカーバージョン 0.10.0 以降)

Kafka 0.10 の Spark Streaming 連携は、単純な並列処理、Kafka パーティションと Spark パーティションの 1 対 1 の対応、オフセットとメタデータへのアクセスを提供します。ただし、新しい連携では単純な API ではなく 新しい Kafka Consumer API を使用するため、使用方法に顕著な違いがあります。

リンク

SBT/Maven プロジェクト定義を使用する Scala/Java アプリケーションの場合、ストリーミングアプリケーションを以下のアーティファクトとリンクします (詳細については、メインのプログラミングガイドの リンクセクション を参照してください)。

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.13
version = 4.0.0

手動で org.apache.kafka アーティファクト (例: kafka-clients) に依存関係を追加しないでください。spark-streaming-kafka-0-10 アーティファクトには、適切な推移的な依存関係が既に含まれており、異なるバージョンは診断が困難な方法で互換性がない可能性があります。

ダイレクトストリームの作成

インポートの名前空間にはバージョンが含まれることに注意してください: org.apache.spark.streaming.kafka010

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

ストリーム内の各項目は ConsumerRecord です。

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

考えられる kafkaParams については、Kafka Consumer 設定ドキュメント を参照してください。Spark のバッチ期間がデフォルトの Kafka ハートビートセッションタイムアウト (30 秒) より大きい場合は、heartbeat.interval.ms と session.timeout.ms を適切に増やしてください。5 分を超えるバッチの場合、ブローカーで group.max.session.timeout.ms を変更する必要があります。例では enable.auto.commit を false に設定していることに注意してください。詳細については、以下の オフセットの保存 を参照してください。

LocationStrategies

新しい Kafka Consumer API は、メッセージをバッファにプリフェッチします。したがって、パフォーマンス上の理由から、Spark 連携はキャッシュされたコンシューマーをエグゼキューターに保持し (各バッチで再作成するのではなく)、適切なコンシューマーが存在するホストロケーションにパーティションをスケジュールすることを優先することが重要です。

ほとんどの場合、上記のように LocationStrategies.PreferConsistent を使用する必要があります。これにより、パーティションが利用可能なエグゼキューターに均等に分散されます。エグゼキューターが Kafka ブローカーと同じホストにある場合は、PreferBrokers を使用して、そのパーティションの Kafka リーダーにパーティションをスケジュールすることを優先します。最後に、パーティション間で負荷に大きな偏りがある場合は、PreferFixed を使用します。これにより、パーティションとホストの明示的なマッピングを指定できます (指定されていないパーティションは一貫したロケーションを使用します)。

コンシューマーのキャッシュのデフォルトの最大サイズは 64 です。(64 * エグゼキューター数) を超える Kafka パーティションを処理する予定がある場合は、spark.streaming.kafka.consumer.cache.maxCapacity を介してこの設定を変更できます。

Kafka コンシューマーのキャッシュを無効にしたい場合は、spark.streaming.kafka.consumer.cache.enabledfalse に設定できます。

キャッシュは topicpartition と group.id でキー付けされているため、createDirectStream の各呼び出しには**個別の** group.id を使用してください。

ConsumerStrategies

新しい Kafka Consumer API には、トピックを指定するさまざまな方法があり、その一部はオブジェクトインスタンス化後のかなりのセットアップを必要とします。ConsumerStrategies は、Spark がチェックポイントからの再起動後でも適切に構成されたコンシューマーを取得できるようにする抽象化を提供します。

ConsumerStrategies.Subscribe は、上記のように、固定のトピックコレクションにサブスクライブできます。SubscribePattern は、正規表現を使用して関心のあるトピックを指定できます。0.8 の連携とは異なり、Subscribe または SubscribePattern を使用すると、実行中のストリーム中のパーティションの追加に対応する必要があることに注意してください。最後に、Assign は、固定のパーティションコレクションを指定できます。3 つの戦略すべてに、特定のパーティションの開始オフセットを指定できるオーバーロードされたコンストラクターがあります。

上記以外のコンシューマー設定のニーズがある場合は、ConsumerStrategy は拡張できる公開クラスです。

RDD の作成

バッチ処理に適したユースケースがある場合は、指定されたオフセット範囲の RDD を作成できます。

// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

ストリームがない場合、ドライバー側で自動的にブローカーメタデータを検索するコンシューマーがないため、PreferBrokers は使用できないことに注意してください。必要に応じて、独自のメタデータ検索とともに PreferFixed を使用してください。

オフセットの取得

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});

HasOffsetRanges への型キャストは、createDirectStream の結果に対して最初に呼び出されたメソッドでのみ成功し、後続のメソッドチェーンでは失敗することに注意してください。RDD パーティションと Kafka パーティションの 1 対 1 のマッピングは、shuffle または repartition を行うメソッド (例: reduceByKey() または window()) の後には維持されないことに注意してください。

オフセットの保存

障害発生時の Kafka の配信セマンティクスは、オフセットがどのように、いつ保存されるかに依存します。Spark の出力操作は 少なくとも 1 回 です。したがって、完全に一度のセマンティクスと同等のものを望む場合は、冪等な出力の後にオフセットを保存するか、出力と同時にアトミックトランザクションでオフセットを保存する必要があります。この連携では、オフセットを保存する方法として、信頼性が高まる順 (およびコードの複雑さが増す順) に 3 つのオプションがあります。

チェックポイント

Spark の チェックポイント を有効にすると、オフセットはチェックポイントに保存されます。これは有効にするのが簡単ですが、欠点もあります。出力操作は冪等である必要があります。なぜなら、出力が繰り返されるためであり、トランザクションはオプションではありません。さらに、アプリケーションコードが変更された場合、チェックポイントから回復することはできません。計画的なアップグレードの場合、新しいコードを古いコードと同時に実行することでこれを軽減できます (出力は anyway 冪等である必要があるため、競合することはないはずです)。しかし、計画外の障害でコードの変更が必要な場合、既知の良い開始オフセットを特定する別の方法がない限り、データが失われます。

Kafka 自体

Kafka には、オフセットを特別な Kafka トピックに保存するオフセットコミット API があります。デフォルトでは、新しいコンシューマーは定期的にオフセットを自動コミットします。これはほとんどの場合望ましくありません。なぜなら、コンシューマーによって正常にポーリングされたメッセージがまだ Spark の出力操作の結果になっていない可能性があり、定義されていないセマンティクスにつながるためです。そのため、上記のストリーム例では "enable.auto.commit" を false に設定しています。しかし、出力が保存されたことがわかった後に、commitAsync API を使用して Kafka にオフセットをコミットできます。チェックポイントと比較した場合の利点は、Kafka はアプリケーションコードの変更に関係なく耐久性のあるストアであることです。ただし、Kafka はトランザクショナルではないため、出力は依然として冪等である必要があります。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

HasOffsetRanges と同様に、CanCommitOffsets へのキャストは createDirectStream の結果に対して呼び出された場合にのみ成功し、変換後には失敗します。commitAsync 呼び出しはスレッドセーフですが、意味のあるセマンティクスを得るためには出力の後に発生する必要があります。

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

独自のデータストア

トランザクションをサポートするデータストアの場合、結果と同じトランザクションでオフセットを保存すると、障害発生時でも両者を同期させることができます。繰り返されたりスキップされたりしたオフセット範囲を検出することに注意すれば、トランザクションのロールバックによって重複メッセージや失われたメッセージが結果に影響を与えるのを防ぐことができます。これにより、完全に一度のセマンティクスと同等のものが得られます。また、通常は冪等にするのが難しい集計の結果として生じる出力に対しても、この手法を使用できます。

// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  
  Object results = yourCalculation(rdd);

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
});

SSL / TLS

新しい Kafka Consumer は SSL をサポート しています。有効にするには、createDirectStream / createRDD に渡す前に、kafkaParams を適切に設定してください。これは Spark と Kafka ブローカー間の通信にのみ適用されることに注意してください。Spark ノード間通信の セキュリティ保護 は、引き続き別個に行う必要があります。

val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

デプロイ

すべての Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。

Scala および Java アプリケーションの場合、SBT または Maven をプロジェクト管理に使用している場合は、spark-streaming-kafka-0-10_2.13 およびその依存関係をアプリケーション JAR にパッケージ化します。spark-core_2.13 および spark-streaming_2.13 は、Spark インストールに既に存在するため、provided 依存関係としてマークされていることを確認してください。その後、spark-submit を使用してアプリケーションを起動します (メインのプログラミングガイドの デプロイメントセクション を参照)。

セキュリティ

Structured Streaming Security を参照してください。

その他の注意事項