Spark Streaming + Kafka 統合ガイド (Kafka broker バージョン 0.10.0 以上)

Kafka 0.10 用の Spark Streaming 統合は、シンプルな並列処理、Kafka パーティションと Spark パーティション間の 1:1 対応、オフセットとメタデータへのアクセスを提供します。しかし、新しい統合では、単純な API の代わりに 新しい Kafka コンシューマー API を使用するため、使用方法に大きな違いがあります。

リンク

SBT/Maven プロジェクト定義を使用する Scala/Java アプリケーションでは、ストリーミングアプリケーションを次のアーティファクトにリンクします (詳細については、メインプログラミングガイドの リンクセクション を参照してください)。

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.12
version = 3.5.1

org.apache.kafka アーティファクト (例: kafka-clients) への依存関係を手動で追加しないでください。spark-streaming-kafka-0-10 アーティファクトには、すでに適切な推移的な依存関係が含まれており、異なるバージョンは診断が困難な方法で互換性がない可能性があります。

ダイレクトストリームの作成

インポートの名前空間にはバージョン org.apache.spark.streaming.kafka010 が含まれていることに注意してください

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

ストリーム内の各アイテムは ConsumerRecord です

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));

使用可能な kafkaParams については、Kafka コンシューマー設定ドキュメント を参照してください。Spark バッチ期間がデフォルトの Kafka ハートビートセッションタイムアウト (30 秒) より大きい場合は、heartbeat.interval.ms と session.timeout.ms を適切に増やしてください。5 分を超えるバッチの場合は、ブローカーで group.max.session.timeout.ms を変更する必要があります。例では enable.auto.commit を false に設定していることに注意してください。詳細については、以下の オフセットの保存 を参照してください。

ロケーション戦略

新しい Kafka コンシューマー API は、メッセージをバッファにプリフェッチします。そのため、パフォーマンス上の理由から、Spark 統合では、エグゼキュータにキャッシュされたコンシューマーを保持し (各バッチで再作成するのではなく)、適切なコンシューマーを持つホストロケーションにパーティションをスケジュールすることをお勧めします。

ほとんどの場合、上記のように LocationStrategies.PreferConsistent を使用する必要があります。これにより、利用可能なエグゼキュータにパーティションが均等に分散されます。エグゼキュータが Kafka ブローカーと同じホストにある場合は、PreferBrokers を使用してください。これは、そのパーティションの Kafka リーダーにパーティションをスケジュールすることを優先します。最後に、パーティション間の負荷に大きな偏りがある場合は、PreferFixed を使用してください。これにより、パーティションからホストへの明示的なマッピングを指定できます (指定されていないパーティションは、一貫した場所を使用します)。

コンシューマーのキャッシュのデフォルトの最大サイズは 64 です。(64 * エグゼキュータ数) を超える Kafka パーティションを処理する予定がある場合は、spark.streaming.kafka.consumer.cache.maxCapacity でこの設定を変更できます。

Kafka コンシューマーのキャッシュを無効にする場合は、spark.streaming.kafka.consumer.cache.enabledfalse に設定できます。

キャッシュは topicpartition と group.id でキー設定されているため、createDirectStream を呼び出すたびに**別々の** group.id を使用してください。

コンシューマー戦略

新しい Kafka コンシューマー API には、トピックを指定する方法がいくつかあり、その中にはオブジェクトインスタンス化後のセットアップにかなりの時間を要するものがあります。ConsumerStrategies は、チェックポイントからの再起動後でも、Spark が適切に設定されたコンシューマーを取得できるようにする抽象化を提供します。

上記のように、ConsumerStrategies.Subscribe を使用すると、固定されたトピックのコレクションを購読できます。SubscribePattern を使用すると、正規表現を使用して対象のトピックを指定できます。0.8 統合とは異なり、Subscribe または SubscribePattern を使用すると、実行中のストリーム中にパーティションを追加することに対応できることに注意してください。最後に、Assign を使用すると、固定されたパーティションのコレクションを指定できます。3 つの戦略すべてに、特定のパーティションの開始オフセットを指定できるオーバーロードされたコンストラクタがあります。

上記のオプションでは対応できない特定のコンシューマーセットアップのニーズがある場合は、ConsumerStrategy を拡張できるパブリッククラスです。

RDD の作成

バッチ処理に適したユースケースがある場合は、定義されたオフセット範囲の RDD を作成できます。

// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

ストリームがないため、ブローカーメタデータを自動的に検索するドライバ側のコンシューマーがないため、PreferBrokers は使用できないことに注意してください。必要に応じて、独自のメタデータ検索で PreferFixed を使用してください。

オフセットの取得

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});

HasOffsetRanges への型キャストは、createDirectStream の結果に対して呼び出された最初のメソッドで行われた場合にのみ成功し、メソッドチェーンの後半では成功しないことに注意してください。RDD パーティションと Kafka パーティション間の 1 対 1 のマッピングは、reduceByKey() や window() など、シャッフルまたは再パーティションを行うメソッドの後には保持されないことに注意してください。

オフセットの保存

障害発生時の Kafka の配信セマンティクスは、オフセットの保存方法とタイミングによって異なります。Spark の出力操作は 少なくとも1回 です。そのため、exactly-once セマンティクスと同等のものを実現するには、冪等出力の後にオフセットを保存するか、出力と一緒にアトミックトランザクションでオフセットを保存する必要があります。この統合では、オフセットの保存方法について、信頼性の低い順に (コードの複雑さも低い順に) 3 つのオプションがあります。

チェックポイント

Spark チェックポイント を有効にすると、オフセットはチェックポイントに保存されます。これは簡単に有効にできますが、欠点があります。繰り返される出力が生成されるため、出力操作は冪等である必要があります。トランザクションはオプションではありません。さらに、アプリケーションコードが変更された場合、チェックポイントから回復することはできません。計画されたアップグレードの場合は、新しいコードと古いコードを同時に実行することで、これを軽減できます (出力はとにかく冪等である必要があるため、衝突するべきではありません)。ただし、コード変更が必要な計画外の障害の場合は、既知の適切な開始オフセットを識別する別の方法がない限り、データが失われます。

Kafka 自体

Kafka には、特別な Kafka トピックにオフセットを保存するオフセットコミット API があります。デフォルトでは、新しいコンシューマーは定期的にオフセットを自動コミットします。これは、コンシューマーによって正常にポーリングされたメッセージがまだ Spark 出力操作を実行していない可能性があり、未定義のセマンティクスが発生するため、ほとんどの場合望ましい動作ではありません。そのため、上記のストリームの例では、「enable.auto.commit」を false に設定しています。ただし、出力が保存されたことがわかった後、commitAsync API を使用して Kafka にオフセットをコミットできます。チェックポイントと比較した利点は、アプリケーションコードの変更に関係なく、Kafka が永続的なストアであることです。ただし、Kafka はトランザクションではないため、出力は依然として冪等である必要があります。

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

HasOffsetRanges と同様に、CanCommitOffsets へのキャストは、変換後ではなく、createDirectStream の結果に対して呼び出された場合にのみ成功します。commitAsync 呼び出しはスレッドセーフですが、意味のあるセマンティクスが必要な場合は、出力の後に行う必要があります。

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});

独自のデータストア

トランザクションをサポートするデータストアの場合、結果と同じトランザクションにオフセットを保存することで、障害が発生した場合でも 2 つを同期させることができます。繰り返されるオフセット範囲またはスキップされたオフセット範囲の検出に注意を払えば、トランザクションをロールバックすることで、重複したメッセージまたは失われたメッセージが結果に影響を与えるのを防ぐことができます。これにより、exactly-once セマンティクスと同等のものが得られます。また、通常は冪等にするのが難しい集計から生じる出力にも、この戦術を使用できます。

// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase)
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  
  Object results = yourCalculation(rdd);

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
});

SSL / TLS

新しい Kafka コンシューマーは SSL をサポートしています。有効にするには、createDirectStream / createRDD に渡す前に、kafkaParams を適切に設定してください。これは、Spark と Kafka ブローカー間の通信にのみ適用されることに注意してください。Spark ノード間通信を個別に 保護する 必要があります。

val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

デプロイ

他の Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。

Scala および Java アプリケーションの場合、プロジェクト管理に SBT または Maven を使用している場合は、spark-streaming-kafka-0-10_2.12 とその依存関係をアプリケーション JAR にパッケージ化します。spark-core_2.12spark-streaming_2.12 は、Spark インストールにすでに存在するため、provided 依存関係としてマークされていることを確認してください。次に、spark-submit を使用してアプリケーションを起動します (メインプログラミングガイドの デプロイセクション を参照してください)。

セキュリティ

ストラクチャードストリーミングのセキュリティ を参照してください。

その他の注意事項