Spark Streaming + Kinesis 連携
Amazon Kinesis は、大規模なストリーミングデータをリアルタイムで処理するための完全に管理されたサービスです。Kinesis レシーバーは、Amazon が Amazon Software License (ASL) の下で提供する Kinesis Client Library (KCL) を使用して入力 DStream を作成します。KCL は Apache 2.0 ライセンスの AWS Java SDK を基盤として構築されており、Worker、Checkpoint、Shard Lease の概念を通じてロードバランシング、耐障害性、チェックポンティングを提供します。ここでは、Kinesis からデータを受信するように Spark Streaming を設定する方法を説明します。
Kinesis の設定
Kinesis ストリームは、有効な Kinesis エンドポイントのいずれかに、次の ガイドに従って 1 つ以上のシャードで設定できます。
Spark Streaming アプリケーションの設定
-
リンク: SBT/Maven プロジェクト定義を使用する Scala/Java アプリケーションの場合、ストリーミングアプリケーションを次のアーティファクトにリンクします (詳細については、メインプログラミングガイドの リンクセクション を参照してください)。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.13 version = 4.0.0Python アプリケーションの場合、アプリケーションのデプロイ時にこのライブラリとその依存関係を追加する必要があります。以下の「デプロイ」サブセクションを参照してください。このライブラリにリンクすることにより、アプリケーションに ASL ライセンスのコードが含まれることに注意してください。
-
プログラミング: ストリーミングアプリケーションコードで、
KinesisInputDStreamをインポートし、次のようにバイト配列の入力 DStream を作成します。from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)API ドキュメント と 例 を参照してください。例の実行手順については、「例の実行」サブセクションを参照してください。
- CloudWatch メトリクスレベルとディメンション。詳細については、KCL を使用した監視に関する AWS ドキュメント を参照してください。デフォルトは
MetricsLevel.DETAILEDです。
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()API ドキュメント と 例 を参照してください。例の実行手順については、「例の実行」サブセクションを参照してください。
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build();API ドキュメント と 例 を参照してください。例の実行手順については、「例の実行」サブセクションを参照してください。
以下の設定も指定できます。これは現在、Scala と Java でのみサポートされています。
- Kinesis
Recordを受け取り、汎用オブジェクトTを返す「メッセージハンドラー関数」。パーティションキーなどのRecordに含まれる他のデータを使用したい場合に使用します。
import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([message handler])import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import scala.collection.JavaConverters; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions( JavaConverters.asScalaSetConverter( KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS ) .asScala().toSet() ) .buildWithMessageHandler([message handler]);-
streamingContext: Kinesis がこの Kinesis アプリケーションを Kinesis ストリームに紐付けるために使用するアプリケーション名を含む StreamingContext。 [Kinesis アプリケーション名]: DynamoDB テーブルに Kinesis シーケンス番号をチェックポイントするために使用されるアプリケーション名。- アプリケーション名は、アカウントとリージョンごとに一意である必要があります。
- テーブルが存在するが、チェックポイント情報が間違っている場合 (別のストリーム、または古い失効したシーケンス番号の場合)、一時的なエラーが発生する可能性があります。
-
[Kinesis ストリーム名]: このストリーミングアプリケーションがデータをプルする Kinesis ストリーム。 -
[エンドポイント URL]: 有効な Kinesis エンドポイント URL は こちら で確認できます。 -
[リージョン名]: 有効な Kinesis リージョン名は こちら で確認できます。 -
[チェックポイント間隔]: Kinesis Client Library がストリーム内の位置を保存する間隔 (例: Duration(2000) = 2 秒)。最初は、ストリーミングアプリケーションのバッチ間隔と同じに設定してください。 -
[初期位置]:KinesisInitialPositions.TrimHorizon、KinesisInitialPositions.Latest、またはKinesisInitialPositions.AtTimestampのいずれかです (詳細については、Kinesis チェックポンティングセクションおよびAmazon Kinesis API ドキュメントを参照してください)。 [メッセージハンドラー]: KinesisRecordを受け取り、汎用Tを出力する関数。
API の他のバージョンでは、AWS アクセスキーとシークレットキーを直接指定することもできます。
- CloudWatch メトリクスレベルとディメンション。詳細については、KCL を使用した監視に関する AWS ドキュメント を参照してください。デフォルトは
-
デプロイ: 他の Spark アプリケーションと同様に、
spark-submitを使用してアプリケーションを起動します。ただし、Scala/Java アプリケーションと Python アプリケーションでは詳細が若干異なります。Scala および Java アプリケーションの場合、SBT または Maven をプロジェクト管理に使用している場合は、
spark-streaming-kinesis-asl_2.13およびその依存関係をアプリケーション JAR にパッケージ化します。spark-core_2.13およびspark-streaming_2.13が Spark インストールに既に存在するため、provided依存関係としてマークされていることを確認してください。その後、spark-submitを使用してアプリケーションを起動します (メインプログラミングガイドの デプロイセクション を参照)。SBT/Maven プロジェクト管理がない Python アプリケーションの場合、
spark-streaming-kinesis-asl_2.13およびその依存関係を--packagesを使用してspark-submitに直接追加できます (「アプリケーション提出ガイド」を参照)。つまり、./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 ...または、Maven リポジトリ から Maven アーティファクト
spark-streaming-kinesis-asl-assemblyの JAR をダウンロードし、--jarsを使用してspark-submitに追加することもできます。
実行時に留意すべき点
-
Kinesis データ処理は、パーティションごとに順序付けされ、メッセージごとに少なくとも 1 回発生します。
-
複数のアプリケーションが同じ Kinesis ストリームから読み取ることができます。Kinesis は、アプリケーション固有のシャードとチェックポイント情報を DynamoDB に保持します。
-
単一の Kinesis ストリームシャードは、一度に 1 つの入力 DStream によって処理されます。
-
単一の Kinesis 入力 DStream は、複数の KinesisRecordProcessor スレッドを作成することにより、Kinesis ストリームの複数のシャードから読み取ることができます。
-
複数のプロセス/インスタンスで実行されている複数の入力 DStream は、Kinesis ストリームから読み取ることができます。
-
各入力 DStream は、単一のシャードを処理する少なくとも 1 つの KinesisRecordProcessor スレッドを作成するため、Kinesis ストリームシャードの数を超える Kinesis 入力 DStream は必要ありません。
-
水平スケーリングは、(単一プロセス内または複数のプロセス/インスタンスにわたって) Kinesis 入力 DStream を追加/削除することによって実現されます。これは、前述のポイントに従って、Kinesis ストリームシャードの総数までです。
-
Kinesis 入力 DStream は、プロセス/インスタンス全体にわたっても、すべての DStream 間でロードをバランシングします。
-
Kinesis 入力 DStream は、ロードの変更によるリシャードイベント (マージおよび分割) 中にロードをバランシングします。
-
ベストプラクティスとして、可能な場合はオーバープロビジョニングによりリシャードジッターを回避することが推奨されます。
-
各 Kinesis 入力 DStream は、独自のチェックポイント情報を保持します。詳細については、Kinesis チェックポンティングセクションを参照してください。
-
Kinesis ストリームシャードの数と、入力 DStream 処理中に Spark クラスター全体で作成される RDD パーティション/シャードの数との間には相関関係はありません。これらは 2 つの独立したパーティショニングスキームです。
-
例の実行
例を実行するには、
-
Spark バイナリを ダウンロードサイト からダウンロードします。
-
AWS 内で Kinesis ストリームを設定します (前のセクションを参照)。Kinesis ストリームの名前と、ストリームが作成されたリージョンに対応するエンドポイント URL をメモします。
-
AWS クレデンシャルを使用して、環境変数
AWS_ACCESS_KEY_IDおよびAWS_SECRET_ACCESS_KEYを設定します。 -
Spark のルートディレクトリで、次のように例を実行します。
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]これにより、Kinesis ストリームからデータが受信されるのを待ちます。
-
Kinesis ストリームにランダムな文字列データをプッシュするには、別のターミナルで関連する Kinesis データプロデューサーを実行します。
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10これにより、1 秒あたり 1000 行、1 行あたり 10 個のランダムな数値が Kinesis ストリームにプッシュされます。このデータは、実行中の例によって受信および処理されるはずです。
レコードのデ・アグリゲーション
Kinesis Producer Library (KPL) を使用してデータが生成される場合、コスト削減のためにメッセージがアグリゲートされることがあります。Spark Streaming は、消費中にレコードを自動的にデ・アグリゲートします。
Kinesis チェックポンティング
-
各 Kinesis 入力 DStream は、ストリームの現在の位置をバックエンドの DynamoDB テーブルに定期的に保存します。これにより、システムは障害から復旧し、DStream が停止した場所から処理を続行できます。
-
チェックポンティングの頻度が高すぎると、AWS チェックポイントストレージレイヤーに過剰な負荷がかかり、AWS のスロットリングが発生する可能性があります。提供されている例では、ランダムバックオフリトライ戦略でこのスロットリングを処理します。
-
入力 DStream が開始されたときに Kinesis チェックポイント情報が存在しない場合、最も古い利用可能なレコード (
KinesisInitialPositions.TrimHorizon)、最新の先端 (KinesisInitialPositions.Latest)、または (Python を除く) 提供された UTC タイムスタンプ (KinesisInitialPositions.AtTimestamp(Date timestamp)) で示される位置から開始されます。これは設定可能です。KinesisInitialPositions.Latestは、入力 DStream が実行されておらず (チェックポイント情報が保存されていない)、ストリームにデータが追加された場合にレコードの欠落につながる可能性があります。KinesisInitialPositions.TrimHorizonは、チェックポイントの頻度と処理の冪等性によって影響が異なるレコードの重複処理につながる可能性があります。
Kinesis リトライ設定
spark.streaming.kinesis.retry.waitTime: Kinesis リトライ間の待機時間 (期間文字列)。Amazon Kinesis から読み取る際、ユーザーは 5 トランザクション/秒より速く消費する場合、または最大読み取りレート 2 MiB/秒を超える場合に、ProvisionedThroughputExceededExceptionに遭遇する可能性があります。この設定を調整することで、フェッチに失敗した場合のフェッチ間のスリープ時間を増やし、これらの例外を減らすことができます。デフォルトは「100ms」です。spark.streaming.kinesis.retry.maxAttempts: Kinesis フェッチの最大リトライ回数。この設定は、上記のシナリオで KinesisProvisionedThroughputExceededExceptionに対処するためにも使用できます。Kinesis 読み取りのリトライ回数を増やすために、この値を増やすことができます。デフォルトは 3 です。