Spark Streaming + Kinesis 連携

Amazon Kinesis は、大規模なストリーミングデータをリアルタイムで処理するための完全に管理されたサービスです。Kinesis レシーバーは、Amazon が Amazon Software License (ASL) の下で提供する Kinesis Client Library (KCL) を使用して入力 DStream を作成します。KCL は Apache 2.0 ライセンスの AWS Java SDK を基盤として構築されており、Worker、Checkpoint、Shard Lease の概念を通じてロードバランシング、耐障害性、チェックポンティングを提供します。ここでは、Kinesis からデータを受信するように Spark Streaming を設定する方法を説明します。

Kinesis の設定

Kinesis ストリームは、有効な Kinesis エンドポイントのいずれかに、次の ガイドに従って 1 つ以上のシャードで設定できます。

Spark Streaming アプリケーションの設定

  1. リンク: SBT/Maven プロジェクト定義を使用する Scala/Java アプリケーションの場合、ストリーミングアプリケーションを次のアーティファクトにリンクします (詳細については、メインプログラミングガイドの リンクセクション を参照してください)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.13
     version = 4.0.0
    

    Python アプリケーションの場合、アプリケーションのデプロイ時にこのライブラリとその依存関係を追加する必要があります。以下の「デプロイ」サブセクションを参照してください。このライブラリにリンクすることにより、アプリケーションに ASL ライセンスのコードが含まれることに注意してください。

  2. プログラミング: ストリーミングアプリケーションコードで、KinesisInputDStream をインポートし、次のようにバイト配列の入力 DStream を作成します。

     from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    
     kinesisStream = KinesisUtils.createStream(
         streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
         [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED],
         StorageLevel.MEMORY_AND_DISK_2)
    

    API ドキュメント を参照してください。例の実行手順については、「例の実行」サブセクションを参照してください。

    • CloudWatch メトリクスレベルとディメンション。詳細については、KCL を使用した監視に関する AWS ドキュメント を参照してください。デフォルトは MetricsLevel.DETAILED です。
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.kinesis.KinesisInputDStream
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.streaming.kinesis.KinesisInitialPositions
    
     val kinesisStream = KinesisInputDStream.builder
         .streamingContext(streamingContext)
         .endpointUrl([endpoint URL])
         .regionName([region name])
         .streamName([streamName])
         .initialPosition([initial position])
         .checkpointAppName([Kinesis app name])
         .checkpointInterval([checkpoint interval])
         .metricsLevel([metricsLevel.DETAILED])
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .build()
    

    API ドキュメント を参照してください。例の実行手順については、「例の実行」サブセクションを参照してください。

     import org.apache.spark.storage.StorageLevel;
     import org.apache.spark.streaming.kinesis.KinesisInputDStream;
     import org.apache.spark.streaming.Seconds;
     import org.apache.spark.streaming.StreamingContext;
     import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
    
     KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
         .streamingContext(streamingContext)
         .endpointUrl([endpoint URL])
         .regionName([region name])
         .streamName([streamName])
         .initialPosition([initial position])
         .checkpointAppName([Kinesis app name])
         .checkpointInterval([checkpoint interval])
         .metricsLevel([metricsLevel.DETAILED])
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .build();
    

    API ドキュメント を参照してください。例の実行手順については、「例の実行」サブセクションを参照してください。

    以下の設定も指定できます。これは現在、Scala と Java でのみサポートされています。

    • Kinesis Record を受け取り、汎用オブジェクト T を返す「メッセージハンドラー関数」。パーティションキーなどの Record に含まれる他のデータを使用したい場合に使用します。
     import collection.JavaConverters._
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.kinesis.KinesisInputDStream
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import org.apache.spark.streaming.kinesis.KinesisInitialPositions
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
     import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
    
     val kinesisStream = KinesisInputDStream.builder
         .streamingContext(streamingContext)
         .endpointUrl([endpoint URL])
         .regionName([region name])
         .streamName([streamName])
         .initialPosition([initial position])
         .checkpointAppName([Kinesis app name])
         .checkpointInterval([checkpoint interval])
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .metricsLevel(MetricsLevel.DETAILED)
         .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
         .buildWithMessageHandler([message handler])
    
     import org.apache.spark.storage.StorageLevel;
     import org.apache.spark.streaming.kinesis.KinesisInputDStream;
     import org.apache.spark.streaming.Seconds;
     import org.apache.spark.streaming.StreamingContext;
     import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
     import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
     import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
     import scala.collection.JavaConverters;
    
     KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
         .streamingContext(streamingContext)
         .endpointUrl([endpoint URL])
         .regionName([region name])
         .streamName([streamName])
         .initialPosition([initial position])
         .checkpointAppName([Kinesis app name])
         .checkpointInterval([checkpoint interval])
         .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
         .metricsLevel(MetricsLevel.DETAILED)
         .metricsEnabledDimensions(
             JavaConverters.asScalaSetConverter(
                 KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS
             )
             .asScala().toSet()
         )
         .buildWithMessageHandler([message handler]);
    
    • streamingContext: Kinesis がこの Kinesis アプリケーションを Kinesis ストリームに紐付けるために使用するアプリケーション名を含む StreamingContext。

    • [Kinesis アプリケーション名]: DynamoDB テーブルに Kinesis シーケンス番号をチェックポイントするために使用されるアプリケーション名。
      • アプリケーション名は、アカウントとリージョンごとに一意である必要があります。
      • テーブルが存在するが、チェックポイント情報が間違っている場合 (別のストリーム、または古い失効したシーケンス番号の場合)、一時的なエラーが発生する可能性があります。
    • [Kinesis ストリーム名]: このストリーミングアプリケーションがデータをプルする Kinesis ストリーム。

    • [エンドポイント URL]: 有効な Kinesis エンドポイント URL は こちら で確認できます。

    • [リージョン名]: 有効な Kinesis リージョン名は こちら で確認できます。

    • [チェックポイント間隔]: Kinesis Client Library がストリーム内の位置を保存する間隔 (例: Duration(2000) = 2 秒)。最初は、ストリーミングアプリケーションのバッチ間隔と同じに設定してください。

    • [初期位置]: KinesisInitialPositions.TrimHorizonKinesisInitialPositions.Latest、または KinesisInitialPositions.AtTimestamp のいずれかです (詳細については、Kinesis チェックポンティング セクションおよび Amazon Kinesis API ドキュメント を参照してください)。

    • [メッセージハンドラー]: Kinesis Record を受け取り、汎用 T を出力する関数。

    API の他のバージョンでは、AWS アクセスキーとシークレットキーを直接指定することもできます。

  3. デプロイ: 他の Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。ただし、Scala/Java アプリケーションと Python アプリケーションでは詳細が若干異なります。

    Scala および Java アプリケーションの場合、SBT または Maven をプロジェクト管理に使用している場合は、spark-streaming-kinesis-asl_2.13 およびその依存関係をアプリケーション JAR にパッケージ化します。spark-core_2.13 および spark-streaming_2.13 が Spark インストールに既に存在するため、provided 依存関係としてマークされていることを確認してください。その後、spark-submit を使用してアプリケーションを起動します (メインプログラミングガイドの デプロイセクション を参照)。

    SBT/Maven プロジェクト管理がない Python アプリケーションの場合、spark-streaming-kinesis-asl_2.13 およびその依存関係を --packages を使用して spark-submit に直接追加できます (「アプリケーション提出ガイド」を参照)。つまり、

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.13:4.0.0 ...
    

    または、Maven リポジトリ から Maven アーティファクト spark-streaming-kinesis-asl-assembly の JAR をダウンロードし、--jars を使用して spark-submit に追加することもできます。

    Spark Streaming Kinesis Architecture

    実行時に留意すべき点

    • Kinesis データ処理は、パーティションごとに順序付けされ、メッセージごとに少なくとも 1 回発生します。

    • 複数のアプリケーションが同じ Kinesis ストリームから読み取ることができます。Kinesis は、アプリケーション固有のシャードとチェックポイント情報を DynamoDB に保持します。

    • 単一の Kinesis ストリームシャードは、一度に 1 つの入力 DStream によって処理されます。

    • 単一の Kinesis 入力 DStream は、複数の KinesisRecordProcessor スレッドを作成することにより、Kinesis ストリームの複数のシャードから読み取ることができます。

    • 複数のプロセス/インスタンスで実行されている複数の入力 DStream は、Kinesis ストリームから読み取ることができます。

    • 各入力 DStream は、単一のシャードを処理する少なくとも 1 つの KinesisRecordProcessor スレッドを作成するため、Kinesis ストリームシャードの数を超える Kinesis 入力 DStream は必要ありません。

    • 水平スケーリングは、(単一プロセス内または複数のプロセス/インスタンスにわたって) Kinesis 入力 DStream を追加/削除することによって実現されます。これは、前述のポイントに従って、Kinesis ストリームシャードの総数までです。

    • Kinesis 入力 DStream は、プロセス/インスタンス全体にわたっても、すべての DStream 間でロードをバランシングします。

    • Kinesis 入力 DStream は、ロードの変更によるリシャードイベント (マージおよび分割) 中にロードをバランシングします。

    • ベストプラクティスとして、可能な場合はオーバープロビジョニングによりリシャードジッターを回避することが推奨されます。

    • 各 Kinesis 入力 DStream は、独自のチェックポイント情報を保持します。詳細については、Kinesis チェックポンティングセクションを参照してください。

    • Kinesis ストリームシャードの数と、入力 DStream 処理中に Spark クラスター全体で作成される RDD パーティション/シャードの数との間には相関関係はありません。これらは 2 つの独立したパーティショニングスキームです。

例の実行

例を実行するには、

レコードのデ・アグリゲーション

Kinesis Producer Library (KPL) を使用してデータが生成される場合、コスト削減のためにメッセージがアグリゲートされることがあります。Spark Streaming は、消費中にレコードを自動的にデ・アグリゲートします。

Kinesis チェックポンティング

Kinesis リトライ設定