Spark StreamingとKinesisの統合

Amazon Kinesisは、大規模なストリーミングデータのリアルタイム処理のためのフルマネージドサービスです。Kinesisレシーバーは、Amazon Software License (ASL)の下でAmazonが提供するKinesis Client Library (KCL)を使用して入力DStreamを作成します。KCLは、Apache 2.0ライセンスのAWS Java SDK上に構築されており、ワーカー、チェックポイント、シャードリースの概念を通じて負荷分散、フォールトトレランス、チェックポイントを提供します。ここでは、Kinesisからデータを受信するようにSpark Streamingを設定する方法について説明します。

Kinesisの設定

Kinesisストリームは、以下のガイドに従って、有効なKinesisエンドポイントのいずれかに、シャードごとに1つ以上設定できます。

Spark Streamingアプリケーションの設定

  1. **リンク:** SBT/Mavenプロジェクト定義を使用するScala/Javaアプリケーションの場合、ストリーミングアプリケーションを以下のアーティファクトにリンクします(詳細については、メインプログラミングガイドのリンクセクションを参照)。

     groupId = org.apache.spark
     artifactId = spark-streaming-kinesis-asl_2.12
     version = 3.5.1
    

    Pythonアプリケーションの場合、アプリケーションをデプロイする際に、上記のライブラリとその依存関係を追加する必要があります。以下の*デプロイ*の項を参照してください。**このライブラリにリンクすることにより、アプリケーションにASLライセンスのコードが含まれることに注意してください。**

  2. **プログラミング:** ストリーミングアプリケーションコードで、KinesisInputDStreamをインポートし、バイト配列の入力DStreamを次のように作成します。

         from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    
         kinesisStream = KinesisUtils.createStream(
             streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
             [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
    

    APIドキュメントを参照してください。例を実行する手順については、*例の 실행*の項を参照してください。

         import org.apache.spark.storage.StorageLevel
         import org.apache.spark.streaming.kinesis.KinesisInputDStream
         import org.apache.spark.streaming.{Seconds, StreamingContext}
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions
    
         val kinesisStream = KinesisInputDStream.builder
             .streamingContext(streamingContext)
             .endpointUrl([endpoint URL])
             .regionName([region name])
             .streamName([streamName])
             .initialPosition([initial position])
             .checkpointAppName([Kinesis app name])
             .checkpointInterval([checkpoint interval])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build()
    

    APIドキュメントを参照してください。例を実行する方法については、*例の 실행*の項を参照してください。

         import org.apache.spark.storage.StorageLevel;
         import org.apache.spark.streaming.kinesis.KinesisInputDStream;
         import org.apache.spark.streaming.Seconds;
         import org.apache.spark.streaming.StreamingContext;
         import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
    
         KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
             .streamingContext(streamingContext)
             .endpointUrl([endpoint URL])
             .regionName([region name])
             .streamName([streamName])
             .initialPosition([initial position])
             .checkpointAppName([Kinesis app name])
             .checkpointInterval([checkpoint interval])
             .metricsLevel([metricsLevel.DETAILED])
             .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
             .build();
    

    APIドキュメントを参照してください。例を実行する手順については、*例の 실행*の項を参照してください。

    以下の設定を行うこともできます。これは現在、ScalaとJavaでのみサポートされています。

    • パーティションキーなど、Recordに含まれる他のデータを使用する場合に、Kinesis Recordを受け取り、汎用オブジェクトTを返す「メッセージハンドラ関数」。
             import collection.JavaConverters._
             import org.apache.spark.storage.StorageLevel
             import org.apache.spark.streaming.kinesis.KinesisInputDStream
             import org.apache.spark.streaming.{Seconds, StreamingContext}
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
    
             val kinesisStream = KinesisInputDStream.builder
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
                 .buildWithMessageHandler([message handler])
    
             import org.apache.spark.storage.StorageLevel;
             import org.apache.spark.streaming.kinesis.KinesisInputDStream;
             import org.apache.spark.streaming.Seconds;
             import org.apache.spark.streaming.StreamingContext;
             import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
             import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
             import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
             import scala.collection.JavaConverters;
    
             KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
                 .streamingContext(streamingContext)
                 .endpointUrl([endpoint URL])
                 .regionName([region name])
                 .streamName([streamName])
                 .initialPosition([initial position])
                 .checkpointAppName([Kinesis app name])
                 .checkpointInterval([checkpoint interval])
                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
                 .metricsLevel(MetricsLevel.DETAILED)
                 .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
                 .buildWithMessageHandler([message handler]);
    
    • streamingContext: KinesisがこのKinesisアプリケーションをKinesisストリームに関連付けるために使用するアプリケーション名を含むStreamingContext

    • [Kinesisアプリ名]: DynamoDBテーブルにKinesisシーケンス番号をチェックポイントするために使用されるアプリケーション名。
      • アプリケーション名は、特定のアカウントとリージョンに対して一意である必要があります。
      • テーブルが存在するが、正しくないチェックポイント情報(異なるストリーム、または期限切れの古いシーケンス番号)が含まれている場合、一時的なエラーが発生する可能性があります。
    • [Kinesisストリーム名]: このストリーミングアプリケーションがデータをプルするKinesisストリーム。

    • [エンドポイントURL]: 有効なKinesisエンドポイントURLはこちらにあります。

    • [リージョン名]: 有効なKinesisリージョン名はこちらにあります。

    • [チェックポイント間隔]: Kinesis Client Libraryがストリーム内の位置を保存する間隔(例:Duration(2000) = 2秒)。最初は、ストリーミングアプリケーションのバッチ間隔と同じに設定します。

    • [初期位置]: KinesisInitialPositions.TrimHorizonKinesisInitialPositions.Latest、またはKinesisInitialPositions.AtTimestampのいずれかです(KinesisチェックポイントセクションとAmazon Kinesis APIドキュメントを参照)。

    • [メッセージハンドラ]: Kinesis Recordを受け取り、汎用Tを出力する関数。

    APIの他のバージョンでは、AWSアクセスキーとシークレットキーを直接指定することもできます。

  3. **デプロイ:** 他のSparkアプリケーションと同様に、spark-submitを使用してアプリケーションを起動します。ただし、詳細はScala/JavaアプリケーションとPythonアプリケーションで若干異なります。

    ScalaおよびJavaアプリケーションの場合、プロジェクト管理にSBTまたはMavenを使用している場合は、spark-streaming-kinesis-asl_2.12とその依存関係をアプリケーションJARにパッケージ化します。spark-core_2.12spark-streaming_2.12はSparkインストールに既に存在するため、provided依存関係としてマークされていることを確認してください。次に、spark-submitを使用してアプリケーションを起動します(メインプログラミングガイドのデプロイセクションを参照)。

    SBT/Mavenプロジェクト管理がないPythonアプリケーションの場合、spark-streaming-kinesis-asl_2.12とその依存関係は、--packagesを使用してspark-submitに直接追加できます(アプリケーション送信ガイドを参照)。つまり、

     ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 ...
    

    または、Mavenアーティファクトspark-streaming-kinesis-asl-assemblyのJARをMavenリポジトリからダウンロードし、--jarsを使用してspark-submitに追加することもできます。

    Spark Streaming Kinesis Architecture

    実行時に注意すべき点

    • Kinesisデータ処理はパーティションごとに順序付けられ、メッセージごとに少なくとも1回発生します。

    • 複数のアプリケーションが同じKinesisストリームから読み取ることができます。Kinesisは、DynamoDBにアプリケーション固有のシャードとチェックポイント情報を保持します。

    • 単一のKinesisストリームシャードは、一度に1つの入力DStreamによって処理されます。

    • 単一のKinesis入力DStreamは、複数のKinesisRecordProcessorスレッドを作成することにより、Kinesisストリームの複数のシャードから読み取ることができます。

    • 別々のプロセス/インスタンスで実行されている複数の入力DStreamは、Kinesisストリームから読み取ることができます。

    • 各入力DStreamは単一のシャードを処理する少なくとも1つのKinesisRecordProcessorスレッドを作成するため、Kinesisストリームシャードの数よりも多くのKinesis入力DStreamは必要ありません。

    • 水平スケーリングは、(単一のプロセス内または複数のプロセス/インスタンスにわたって)Kinesis入力DStreamを追加/削除することによって実現されます - 前のポイントに従って、Kinesisストリームシャードの総数まで。

    • Kinesis入力DStreamは、すべてのDStream間で負荷を分散します - プロセス/インスタンス間でも。

    • Kinesis入力DStreamは、負荷の変化による再シャードイベント(マージと分割)中に負荷を分散します。

    • ベストプラクティスとして、可能な場合はオーバープロビジョニングを行うことで、再シャードのジッターを回避することをお勧めします。

    • 各Kinesis入力DStreamは、独自のチェックポイント情報を保持します。詳細は、Kinesisチェックポイントのセクションを参照してください。

    • Kinesisストリームシャードの数と、入力DStream処理中にSparkクラスター全体で作成されるRDDパーティション/シャードの数との間には相関関係はありません。これらは2つの独立したパーティションスキームです。

例の 실행

例を実行するには、

レコードの逆集約

Kinesis Producer Library (KPL)を使用してデータが生成される場合、コスト削減のためにメッセージが集約されることがあります。Spark Streamingは、消費中にレコードを自動的に逆集約します。

Kinesisチェックポイント

Kinesis再試行設定