Spark StreamingとKinesisの統合
Amazon Kinesisは、大規模なストリーミングデータのリアルタイム処理のためのフルマネージドサービスです。Kinesisレシーバーは、Amazon Software License (ASL)の下でAmazonが提供するKinesis Client Library (KCL)を使用して入力DStreamを作成します。KCLは、Apache 2.0ライセンスのAWS Java SDK上に構築されており、ワーカー、チェックポイント、シャードリースの概念を通じて負荷分散、フォールトトレランス、チェックポイントを提供します。ここでは、Kinesisからデータを受信するようにSpark Streamingを設定する方法について説明します。
Kinesisの設定
Kinesisストリームは、以下のガイドに従って、有効なKinesisエンドポイントのいずれかに、シャードごとに1つ以上設定できます。
Spark Streamingアプリケーションの設定
-
**リンク:** SBT/Mavenプロジェクト定義を使用するScala/Javaアプリケーションの場合、ストリーミングアプリケーションを以下のアーティファクトにリンクします(詳細については、メインプログラミングガイドのリンクセクションを参照)。
groupId = org.apache.spark artifactId = spark-streaming-kinesis-asl_2.12 version = 3.5.1
Pythonアプリケーションの場合、アプリケーションをデプロイする際に、上記のライブラリとその依存関係を追加する必要があります。以下の*デプロイ*の項を参照してください。**このライブラリにリンクすることにより、アプリケーションにASLライセンスのコードが含まれることに注意してください。**
-
**プログラミング:** ストリーミングアプリケーションコードで、
KinesisInputDStream
をインポートし、バイト配列の入力DStreamを次のように作成します。from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream kinesisStream = KinesisUtils.createStream( streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL], [region name], [initial position], [checkpoint interval], [metricsLevel.DETAILED], StorageLevel.MEMORY_AND_DISK_2)
APIドキュメントと例を参照してください。例を実行する手順については、*例の 실행*の項を参照してください。
- CloudWatchメトリクスのレベルとディメンション。詳細は、KCLの監視に関するAWSドキュメントを参照してください。デフォルトはMetricsLevel.DETAILEDです。
import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build()
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .metricsLevel([metricsLevel.DETAILED]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .build();
以下の設定を行うこともできます。これは現在、ScalaとJavaでのみサポートされています。
- パーティションキーなど、
Record
に含まれる他のデータを使用する場合に、KinesisRecord
を受け取り、汎用オブジェクトT
を返す「メッセージハンドラ関数」。
import collection.JavaConverters._ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.kinesis.KinesisInputDStream import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kinesis.KinesisInitialPositions import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .buildWithMessageHandler([message handler])
import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.kinesis.KinesisInputDStream; import org.apache.spark.streaming.Seconds; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.kinesis.KinesisInitialPositions; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import scala.collection.JavaConverters; KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder() .streamingContext(streamingContext) .endpointUrl([endpoint URL]) .regionName([region name]) .streamName([streamName]) .initialPosition([initial position]) .checkpointAppName([Kinesis app name]) .checkpointInterval([checkpoint interval]) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet()) .buildWithMessageHandler([message handler]);
-
streamingContext
: KinesisがこのKinesisアプリケーションをKinesisストリームに関連付けるために使用するアプリケーション名を含むStreamingContext [Kinesisアプリ名]
: DynamoDBテーブルにKinesisシーケンス番号をチェックポイントするために使用されるアプリケーション名。- アプリケーション名は、特定のアカウントとリージョンに対して一意である必要があります。
- テーブルが存在するが、正しくないチェックポイント情報(異なるストリーム、または期限切れの古いシーケンス番号)が含まれている場合、一時的なエラーが発生する可能性があります。
-
[Kinesisストリーム名]
: このストリーミングアプリケーションがデータをプルするKinesisストリーム。 -
[エンドポイントURL]
: 有効なKinesisエンドポイントURLはこちらにあります。 -
[リージョン名]
: 有効なKinesisリージョン名はこちらにあります。 -
[チェックポイント間隔]
: Kinesis Client Libraryがストリーム内の位置を保存する間隔(例:Duration(2000) = 2秒)。最初は、ストリーミングアプリケーションのバッチ間隔と同じに設定します。 -
[初期位置]
:KinesisInitialPositions.TrimHorizon
、KinesisInitialPositions.Latest
、またはKinesisInitialPositions.AtTimestamp
のいずれかです(Kinesisチェックポイント
セクションとAmazon Kinesis APIドキュメント
を参照)。 [メッセージハンドラ]
: KinesisRecord
を受け取り、汎用T
を出力する関数。
APIの他のバージョンでは、AWSアクセスキーとシークレットキーを直接指定することもできます。
-
**デプロイ:** 他のSparkアプリケーションと同様に、
spark-submit
を使用してアプリケーションを起動します。ただし、詳細はScala/JavaアプリケーションとPythonアプリケーションで若干異なります。ScalaおよびJavaアプリケーションの場合、プロジェクト管理にSBTまたはMavenを使用している場合は、
spark-streaming-kinesis-asl_2.12
とその依存関係をアプリケーションJARにパッケージ化します。spark-core_2.12
とspark-streaming_2.12
はSparkインストールに既に存在するため、provided
依存関係としてマークされていることを確認してください。次に、spark-submit
を使用してアプリケーションを起動します(メインプログラミングガイドのデプロイセクションを参照)。SBT/Mavenプロジェクト管理がないPythonアプリケーションの場合、
spark-streaming-kinesis-asl_2.12
とその依存関係は、--packages
を使用してspark-submit
に直接追加できます(アプリケーション送信ガイドを参照)。つまり、./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 ...
または、Mavenアーティファクト
spark-streaming-kinesis-asl-assembly
のJARをMavenリポジトリからダウンロードし、--jars
を使用してspark-submit
に追加することもできます。実行時に注意すべき点
-
Kinesisデータ処理はパーティションごとに順序付けられ、メッセージごとに少なくとも1回発生します。
-
複数のアプリケーションが同じKinesisストリームから読み取ることができます。Kinesisは、DynamoDBにアプリケーション固有のシャードとチェックポイント情報を保持します。
-
単一のKinesisストリームシャードは、一度に1つの入力DStreamによって処理されます。
-
単一のKinesis入力DStreamは、複数のKinesisRecordProcessorスレッドを作成することにより、Kinesisストリームの複数のシャードから読み取ることができます。
-
別々のプロセス/インスタンスで実行されている複数の入力DStreamは、Kinesisストリームから読み取ることができます。
-
各入力DStreamは単一のシャードを処理する少なくとも1つのKinesisRecordProcessorスレッドを作成するため、Kinesisストリームシャードの数よりも多くのKinesis入力DStreamは必要ありません。
-
水平スケーリングは、(単一のプロセス内または複数のプロセス/インスタンスにわたって)Kinesis入力DStreamを追加/削除することによって実現されます - 前のポイントに従って、Kinesisストリームシャードの総数まで。
-
Kinesis入力DStreamは、すべてのDStream間で負荷を分散します - プロセス/インスタンス間でも。
-
Kinesis入力DStreamは、負荷の変化による再シャードイベント(マージと分割)中に負荷を分散します。
-
ベストプラクティスとして、可能な場合はオーバープロビジョニングを行うことで、再シャードのジッターを回避することをお勧めします。
-
各Kinesis入力DStreamは、独自のチェックポイント情報を保持します。詳細は、Kinesisチェックポイントのセクションを参照してください。
-
Kinesisストリームシャードの数と、入力DStream処理中にSparkクラスター全体で作成されるRDDパーティション/シャードの数との間には相関関係はありません。これらは2つの独立したパーティションスキームです。
-
例の 실행
例を実行するには、
-
ダウンロードサイトからSparkバイナリをダウンロードします。
-
AWS内でKinesisストリームを設定します(前のセクションを参照)。Kinesisストリームの名前と、ストリームが作成されたリージョンに対応するエンドポイントURLをメモしてください。
-
環境変数
AWS_ACCESS_KEY_ID
とAWS_SECRET_ACCESS_KEY
をAWS認証情報で設定します。 -
Sparkのルートディレクトリで、次のように例を実行します。
./bin/spark-submit --jars 'connector/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \ connector/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \ [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.1 streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
これは、Kinesisストリームからデータが受信されるまで待機します。
-
Kinesisストリームに配置するランダムな文字列データを生成するには、別のターミナルで、関連付けられているKinesisデータプロデューサーを実行します。
./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
これにより、1行あたり10個の乱数が1秒あたり1000行、Kinesisストリームにプッシュされます。このデータは、実行中の例によって受信および処理されるはずです。
レコードの逆集約
Kinesis Producer Library (KPL)を使用してデータが生成される場合、コスト削減のためにメッセージが集約されることがあります。Spark Streamingは、消費中にレコードを自動的に逆集約します。
Kinesisチェックポイント
-
各Kinesis入力DStreamは、定期的にストリームの現在位置をバッキングDynamoDBテーブルに保存します。これにより、システムは障害から回復し、DStreamが中断したところから処理を続行できます。
-
チェックポイントを頻繁に実行すると、AWSチェックポイントストレージレイヤーに過剰な負荷がかかり、AWSスロットリングが発生する可能性があります。提供されている例では、ランダムバックオフ再試行戦略を使用してこのスロットリングを処理します。
-
入力DStreamの開始時にKinesisチェックポイント情報が存在しない場合、使用可能な最も古いレコード(
KinesisInitialPositions.TrimHorizon
)、最新のヒント(KinesisInitialPositions.Latest
)、または(Pythonを除く)指定されたUTCタイムスタンプで示される位置(KinesisInitialPositions.AtTimestamp(Date timestamp)
)から開始されます。これは設定可能です。- 入力DStreamが実行されていない(チェックポイント情報が保存されていない)間にストリームにデータが追加された場合、
KinesisInitialPositions.Latest
はレコードの欠落につながる可能性があります。 KinesisInitialPositions.TrimHorizon
は、チェックポイントの頻度と処理の冪等性に依存する影響があるレコードの重複処理につながる可能性があります。
- 入力DStreamが実行されていない(チェックポイント情報が保存されていない)間にストリームにデータが追加された場合、
Kinesis再試行設定
spark.streaming.kinesis.retry.waitTime
: Kinesis再試行間隔(期間文字列)。読み込み速度が規定値(5トランザクション/秒、2MiB/秒)を超えると`ProvisionedThroughputExceededException`が発生する可能性があるため、この値を調整して再試行間隔を延ばし、例外発生を抑制します。デフォルトは"100ms"。spark.streaming.kinesis.retry.maxAttempts
: Kinesisフェッチの最大再試行回数。`ProvisionedThroughputExceededException`への対策として、再試行回数を増やすことができます。デフォルトは3。