Spark Streaming プログラミング ガイド

注意

Spark Streaming は、Spark のストリーミング エンジンの以前の世代です。Spark Streaming への更新はこれ以上行われず、レガシー プロジェクトとなっています。Spark には、より新しく使いやすいストリーミング エンジンである Structured Streaming があります。ストリーミング アプリケーションとパイプラインには Spark Structured Streaming を使用してください。Structured Streaming プログラミング ガイドを参照してください。

概要

Spark Streaming は、コア Spark API の拡張機能であり、ライブ データ ストリームのスケーラブルで高スループット、耐障害性のあるストリーム処理を可能にします。データは、Kafka、Kinesis、TCP ソケットなどの多くのソースから取り込むことができ、mapreducejoinwindow などの高レベル関数で表現された複雑なアルゴリズムを使用して処理できます。最後に、処理されたデータをファイル システム、データベース、ライブ ダッシュボードにプッシュできます。実際、Spark の機械学習およびグラフ処理アルゴリズムをデータ ストリームに適用できます。

Spark Streaming

内部的には、次のように機能します。Spark Streaming はライブ入力データ ストリームを受信し、データをバッチに分割します。その後、Spark エンジンによって処理され、結果の最終ストリームがバッチで生成されます。

Spark Streaming

Spark Streaming は、*discretized stream* または *DStream* と呼ばれる高レベルの抽象化を提供し、これは連続するデータ ストリームを表します。DStream は、Kafka や Kinesis などのソースからの入力データ ストリームから作成することも、他の DStream に対して高レベル操作を適用することによって作成することもできます。内部的には、DStream は RDD のシーケンスとして表されます。

このガイドでは、DStream を使用して Spark Streaming プログラムの作成を開始する方法を示します。Scala、Java、または Python (Spark 1.2 で導入) で Spark Streaming プログラムを作成できます。これらはすべてこのガイドで説明されています。このガイド全体に、異なる言語のコード スニペットを選択できるタブがあります。

注: Python では、異なる場合や利用できない API がいくつかあります。このガイド全体で、Python API タグがこれらの違いを強調しています。


簡単な例

独自の Spark Streaming プログラムの作成方法の詳細に入る前に、単純な Spark Streaming プログラムがどのようなものか簡単に見てみましょう。TCP ソケットでリッスンしているデータサーバーから受信したテキスト データ内の単語数をカウントするとします。必要なことは次のとおりです。

まず、すべてのストリーミング機能のメイン エントリ ポイントである StreamingContext をインポートします。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

このコンテキストを使用して、ホスト名 (例: localhost) およびポート (例: 9999) として指定された TCP ソースからのストリーミング データを表す DStream を作成できます。

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

この lines DStream は、データサーバーから受信されるデータ ストリームを表します。この DStream の各レコードはテキストの行です。次に、行をスペースで単語に分割します。

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

flatMap は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する 1 対多 DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words DStream として表されます。次に、これらの単語をカウントします。

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

この words DStream は、(word, 1) ペアの DStream にさらにマップ (1 対 1 の変換) され、その後、各データ バッチでの単語の頻度を取得するために削減されます。最後に、wordCounts.pprint() は、1 秒ごとに生成されるカウントの一部を出力します。

これらの行が実行されても、Spark Streaming は開始時に実行される計算を設定するだけで、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に呼び出します。

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

完全なコードは、Spark Streaming の例 NetworkWordCount にあります。

まず、Spark Streaming クラスの名前と、DStream のような必要な他のクラスに便利なメソッドを追加するために、StreamingContext からいくつかの暗黙の型変換をインポートします。StreamingContext は、すべてのストリーミング機能のメイン エントリ ポイントです。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

このコンテキストを使用して、ホスト名 (例: localhost) およびポート (例: 9999) として指定された TCP ソースからのストリーミング データを表す DStream を作成できます。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

この lines DStream は、データサーバーから受信されるデータ ストリームを表します。この DStream の各レコードはテキストの行です。次に、行をスペース文字で単語に分割します。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する 1 対多 DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words DStream として表されます。次に、これらの単語をカウントします。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

この words DStream は、(word, 1) ペアの DStream にさらにマップ (1 対 1 の変換) され、その後、各データ バッチでの単語の頻度を取得するために削減されます。最後に、wordCounts.print() は、1 秒ごとに生成されるカウントの一部を出力します。

これらの行が実行されても、Spark Streaming は開始時に実行される計算を設定するだけで、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に呼び出します。

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完全なコードは、Spark Streaming の例 NetworkWordCount にあります。

まず、すべてのストリーミング機能のメイン エントリ ポイントである JavaStreamingContext オブジェクトを作成します。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

このコンテキストを使用して、ホスト名 (例: localhost) およびポート (例: 9999) として指定された TCP ソースからのストリーミング データを表す DStream を作成できます。

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

この lines DStream は、データサーバーから受信されるデータ ストリームを表します。このストリームの各レコードはテキストの行です。次に、行をスペースで単語に分割します。

// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

flatMap は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words DStream として表されます。変換は FlatMapFunction オブジェクトを使用して定義したことに注意してください。後でわかるように、Java API には DStream 変換を定義するのに役立つ多くの便利なクラスがあります。

次に、これらの単語をカウントします。

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

この words DStream は、PairFunction オブジェクトを使用して (word, 1) ペアの DStream にさらにマップ (1 対 1 の変換) され、その後、Function2 オブジェクトを使用して各バッチの単語の頻度を取得するために削減されます。最後に、wordCounts.print() は、1 秒ごとに生成されるカウントの一部を出力します。

これらの行が実行されても、Spark Streaming は開始後に実行される計算を設定するだけで、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に start メソッドを呼び出します。

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate

完全なコードは、Spark Streaming の例 JavaNetworkWordCount にあります。

すでに Spark をダウンロードおよびビルドしている場合は、次のようにしてこの例を実行できます。まず、Netcat (ほとんどの Unix ライク システムで見つかる小さなユーティリティ) をデータサーバーとして実行する必要があります。

$ nc -lk 9999

次に、別のターミナルで、次のようにして例を開始できます。

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999

すると、netcat サーバーを実行しているターミナルに入力された行は、1 秒ごとにカウントされ、画面に出力されます。次のようになります。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world



...
# TERMINAL 2: RUNNING network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING JavaNetworkWordCount

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...


基本概念

次に、簡単な例を超えて、Spark Streaming の基本について詳しく説明します。

リンク

Spark と同様に、Spark Streaming は Maven Central 経由で利用できます。独自の Spark Streaming プログラムを作成するには、SBT または Maven プロジェクトに次の依存関係を追加する必要があります。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.13</artifactId>
    <version>4.0.0</version>
    <scope>provided</scope>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.13" % "4.0.0" % "provided"

Kafka や Kinesis のようなソースからデータを取得するために Spark Streaming のコア API に存在しない場合は、対応するアーティファクト spark-streaming-xyz_2.13 を依存関係に追加する必要があります。たとえば、一般的なものの一部を次に示します。

ソースアーティファクト
Kafkaspark-streaming-kafka-0-10_2.13
Kinesis
spark-streaming-kinesis-asl_2.13 [Amazon Software License]

最新のリストについては、Maven リポジトリを参照して、サポートされているソースとアーティファクトの完全なリストを確認してください。


StreamingContext の初期化

Spark Streaming プログラムを初期化するには、すべての Spark Streaming 機能のエントリ ポイントである **StreamingContext** オブジェクトを作成する必要があります。

A StreamingContext object can be created from a SparkContext object.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)

この appName パラメータは、クラスター UI に表示されるアプリケーションの名前です。master は、Spark または YARN クラスター URL、またはローカル モードで実行するための特別な **“local[*]”** 文字列です。実際には、クラスターで実行する場合、プログラムに master をハードコーディングしたくないでしょう。代わりに、spark-submit を使用してアプリケーションを起動し、そこで受け取るでしょう。ただし、ローカル テストと単体テストの場合、ローカルでインプロセスで Spark Streaming を実行するために “local[*]” を渡すことができます (ローカル システムのコア数を検出します)。

バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスター リソースに基づいて設定する必要があります。詳細については、「パフォーマンス チューニング」セクションを参照してください。

A StreamingContext object can be created from a SparkConf object.

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

この appName パラメータは、クラスター UI に表示されるアプリケーションの名前です。master は、Spark、Kubernetes、または YARN クラスター URL、またはローカル モードで実行するための特別な **“local[*]”** 文字列です。実際には、クラスターで実行する場合、プログラムに master をハードコーディングしたくないでしょう。代わりに、spark-submit を使用してアプリケーションを起動し、そこで受け取るでしょう。ただし、ローカル テストと単体テストの場合、ローカルでインプロセスで Spark Streaming を実行するために “local[*]” を渡すことができます。これは内部的に SparkContext (すべての Spark 機能の開始点) を作成し、ssc.sparkContext としてアクセスできることに注意してください。

バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスター リソースに基づいて設定する必要があります。詳細については、「パフォーマンス チューニング」セクションを参照してください。

A StreamingContext object can also be created from an existing SparkContext object.

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

A JavaStreamingContext object can be created from a SparkConf object.

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

この appName パラメータは、クラスター UI に表示されるアプリケーションの名前です。master は、Spark または YARN クラスター URL、またはローカル モードで実行するための特別な **“local[*]”** 文字列です。実際には、クラスターで実行する場合、プログラムに master をハードコーディングしたくないでしょう。代わりに、spark-submit を使用してアプリケーションを起動し、そこで受け取るでしょう。ただし、ローカル テストと単体テストの場合、ローカルでインプロセスで Spark Streaming を実行するために “local[*]” を渡すことができます。これは内部的に JavaSparkContext (すべての Spark 機能の開始点) を作成し、ssc.sparkContext としてアクセスできることに注意してください。

バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスター リソースに基づいて設定する必要があります。詳細については、「パフォーマンス チューニング」セクションを参照してください。

A JavaStreamingContext object can also be created from an existing JavaSparkContext.

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

コンテキストが定義されたら、次のことを行う必要があります。

  1. 入力 DStream を作成して、入力ソースを定義します。
  2. DStream に変換および出力操作を適用して、ストリーミング計算を定義します。
  3. streamingContext.start() を使用して、データの受信と処理を開始します。
  4. streamingContext.awaitTermination() を使用して、処理が停止するのを待ちます (手動またはエラーのため)。
  5. 処理は、streamingContext.stop() を使用して手動で停止できます。
覚えておくべき点

Discretized Streams (DStreams)

Discretized Stream または DStream は、Spark Streaming によって提供される基本的な抽象化です。これは、ソースから受信した入力データ ストリーム、または入力ストリームを変換して生成された処理済みデータ ストリームのいずれかを表す連続データ ストリームです。内部的には、DStream は一連の RDD によって表されます。これは、不変の分散データセットの Spark の抽象化です (詳細については、Spark プログラミング ガイドを参照してください)。DStream の各 RDD には、次の図に示すように、特定の期間のデータが含まれています。

Spark Streaming

DStream に適用される操作は、基になる RDD の操作に変換されます。たとえば、ストリームの行を単語に変換する前の例では、flatMap 操作が lines DStream の各 RDD に適用されて、words DStream の RDD が生成されます。これは次の図に示されています。

Spark Streaming

これらの基になる RDD 変換は、Spark エンジンによって計算されます。DStream 操作は、これらの詳細のほとんどを隠し、開発者に利便性のために高レベルの API を提供します。これらの操作については、後続のセクションで詳しく説明します。


入力 DStreams と Receiver

入力 DStream は、ストリーミング ソースから受信した入力データのストリームを表す DStream です。簡単な例では、lines はネットキャット サーバーから受信したデータ ストリームを表していたため、入力 DStream でした。すべての入力 DStream (このセクションの後半で説明するファイル ストリームを除く) には、ソースからデータを受信し、処理のために Spark のメモリに格納する **Receiver** (Scala docJava doc) オブジェクトが関連付けられています。

Spark Streaming は、2 つのカテゴリの組み込みストリーミング ソースを提供します。

このセクションの後半で、各カテゴリのソースのいくつかについて説明します。

ストリーミング アプリケーションで複数のデータ ストリームを並列に受信したい場合は、複数の入力 DStream を作成できます (「パフォーマンス チューニング」セクションでさらに説明)。これにより、複数のレシーバーが作成され、複数のデータ ストリームが同時に受信されます。ただし、Spark ワーカー/エグゼキューターは長時間実行されるタスクであるため、Spark Streaming アプリケーションに割り当てられたコアの 1 つを占有します。したがって、Spark Streaming アプリケーションには、受信したデータを処理するため、およびレシーバーを実行するための十分なコア (またはローカルで実行する場合はスレッド) を割り当てる必要があることに注意することが重要です。

覚えておくべき点

基本ソース

簡単な例」で、TCP ソケット接続経由で受信したテキスト データから DStream を作成する ssc.socketTextStream(...) をすでに確認しました。ソケットの他に、StreamingContext API はファイル システムを入力ソースとして DStream を作成するためのメソッドを提供します。

ファイル ストリーム

HDFS API と互換性のある任意のファイル システム (HDFS、S3、NFS など) からファイルを読み取るには、StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass] を介して DStream を作成できます。

ファイル ストリームはレシーバーの実行を必要としないため、ファイル データを取得するためのコアを割り当てる必要はありません。

単純なテキスト ファイルの場合、最も簡単な方法は StreamingContext.textFileStream(dataDirectory) です。

fileStream は Python API では利用できません。利用できるのは textFileStream のみです。

streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

テキスト ファイルの場合

streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

テキスト ファイルの場合

streamingContext.textFileStream(dataDirectory);
ディレクトリの監視方法

Spark Streaming は dataDirectory ディレクトリを監視し、そのディレクトリに作成されたファイルを処理します。

オブジェクト ストアをデータ ソースとして使用する

HDFS のような「フル」ファイル システムは、出力ストリームが作成されるとすぐにファイルに最終更新時間を設定する傾向があります。ファイルが開かれ、データが完全に書き込まれる前でも、DStream に含まれる可能性があり、その後、同じウィンドウ内でのファイルの更新は無視されます。つまり、変更が見逃されたり、データがストリームから除外されたりする可能性があります。

ウィンドウ内で変更を確実に取得するには、ファイルを監視されていないディレクトリに書き込み、出力ストリームが閉じられた直後に、宛先ディレクトリに名前変更します。名前変更されたファイルが、作成されたウィンドウ中にスキャンされた宛先ディレクトリに表示される場合、新しいデータが取得されます。

対照的に、Amazon S3 や Azure Storage などのオブジェクト ストアは、通常、リネーム操作が遅くなります。これは、データが実際にコピーされるためです。さらに、名前が変更されたオブジェクトは、元の作成時間が示唆していたウィンドウの一部とは見なされない場合があります。これは、rename() 操作の時刻を最終更新時間とするためです。

Spark Streaming が期待するタイムスタンプの動作が、ターゲット オブジェクト ストアと一貫していることを確認するには、慎重なテストが必要です。選択したオブジェクト ストア経由でストリーミング データにデータを取り込むには、宛先ディレクトリに直接書き込むことが適切な戦略である可能性があります。

このトピックの詳細については、Hadoop Filesystem Specification を参照してください。

カスタム レシーバーに基づくストリーム

カスタム レシーバー経由で受信したデータ ストリームを使用して、DStream を作成できます。詳細については、「カスタム レシーバー ガイド」を参照してください。

RDD のキューをストリームとして使用

テスト データで Spark Streaming アプリケーションをテストするために、streamingContext.queueStream(queueOfRDDs) を使用して、RDD のキューに基づく DStream を作成することもできます。キューにプッシュされた各 RDD は、DStream のデータ バッチとして扱われ、ストリームのように処理されます。

ソケットおよびファイルからのストリームの詳細については、Python 用の StreamingContext、Scala 用の StreamingContext、および Java 用の JavaStreamingContext の関連関数の API ドキュメントを参照してください。

高度なソース

Python API Spark 4.0.0 時点では、これらのソースのうち Kafka と Kinesis は Python API で利用可能です。

このカテゴリのソースは、外部の非 Spark ライブラリとのインターフェイスを必要とします。それらのいくつかは複雑な依存関係 (例: Kafka) を持っています。したがって、依存関係のバージョン競合に関連する問題を最小限に抑えるために、これらのソースからの DStream を作成する機能は、必要に応じて明示的にリンクできる個別のライブラリに移動されました。

これらの高度なソースは Spark シェルでは利用できないため、これらの高度なソースに基づくアプリケーションはシェルでテストできません。Spark シェルでそれらを使用したい場合は、対応する Maven アーティファクトの JAR とその依存関係をダウンロードして、クラスパスに追加する必要があります。

これらの高度なソースのいくつかには、次のものがあります。

カスタム ソース

Python API これは Python ではまだサポートされていません。

入力 DStream は、カスタム データ ソースから作成することもできます。カスタム ソースからデータを受信し、Spark にプッシュできるユーザー定義の **receiver** (それが何であるかを理解するために次のセクションを参照) を実装するだけです。詳細については、「カスタム レシーバー ガイド」を参照してください。

Receiver の信頼性

信頼性に基づいて、2 種類のデータ ソースがあります。ソース (Kafka など) は、転送されたデータを承認できます。これらの信頼性の高いソースからデータを受信したシステムが受信したデータを正しく承認した場合、障害によるデータの損失がないことが保証されます。これにより、2 種類のレシーバーが生成されます。

  1. 信頼性の高い Receiver -信頼性の高い Receiver は、データが Spark で受信およびレプリケートされて格納されたときに、信頼性の高いソースに正しく確認応答を送信します。
  2. 信頼性の低い Receiver -信頼性の低い Receiver は、ソースに確認応答を送信しません。これは、確認応答をサポートしないソース、または確認応答の複雑さに入りたくない/必要としない場合でも、信頼性の高いソースに使用できます。

信頼性の高いレシーバーの作成方法の詳細は、「カスタム レシーバー ガイド」で説明されています。


DStreams 上の変換

RDD と同様に、変換により、入力 DStream のデータを変更できます。DStream は、通常の Spark RDD で利用可能な変換の多くをサポートしています。一般的なものの一部を次に示します。

変換意味
map(func) ソース DStream の各要素を関数 func で処理して、新しい DStream を返します。
flatMap(func) map と同様ですが、各入力項目は 0 個以上の出力項目にマップできます。
filter(func) ソース DStream のレコードのうち、func が true を返すもののみを選択して、新しい DStream を返します。
repartition(numPartitions) パーティションの数を増減させることにより、この DStream の並列度を変更します。
union(otherStream) ソース DStream と otherDStream の要素の和集合を含む新しい DStream を返します。
count() ソース DStream の各 RDD の要素数をカウントして、単一要素 RDD の新しい DStream を返します。
reduce(func) ソース DStream の各 RDD の要素を関数 func (2 つの引数を取り、1 つを返す) を使用して集計して、単一要素 RDD の新しい DStream を返します。関数は、並列で計算できるように、結合可能で可換である必要があります。
countByValue() タイプ K の要素の DStream で呼び出されると、各 RDD でのキーの頻度がソース DStream の各 RDD で値となる (K, Long) ペアの新しい DStream を返します。
reduceByKey(func, [numTasks]) (K, V) ペアの DStream で呼び出されると、指定された削減関数を使用して各キーの値が集計される (K, V) ペアの新しい DStream を返します。**注:** デフォルトでは、これは Spark のデフォルトの並列タスク数 (ローカル モードでは 2、クラスター モードでは `spark.default.parallelism` 設定プロパティで決定される数) を使用してグループ化を行います。オプションの `numTasks` 引数を渡して、異なるタスク数を設定できます。
join(otherStream, [numTasks]) (K, V) および (K, W) ペアの 2 つの DStream で呼び出されると、各キーのすべての要素ペアを持つ (K, (V, W)) ペアの新しい DStream を返します。
cogroup(otherStream, [numTasks]) (K, V) および (K, W) ペアの DStream で呼び出されると、(K, Seq[V], Seq[W]) タプルの新しい DStream を返します。
transform(func) ソース DStream の各 RDD に RDD 対 RDD 関数を適用して、新しい DStream を返します。これは、DStream に対して任意の RDD 操作を実行するために使用できます。
updateStateByKey(func) 各キーの状態を、キーの前の状態とキーの新しい値に指定された関数を適用して更新する新しい「状態」DStream を返します。これは、各キーの任意の状態データを保持するために使用できます。

これらの変換のいくつかは、さらに詳細に説明する価値があります。

updateStateByKey 操作

updateStateByKey 操作により、状態を継続的に更新しながら、任意の状態を保持できます。これを使用するには、次の 2 つのステップを実行する必要があります。

  1. 状態を定義する - 状態は任意のデータ型にできます。
  2. 状態更新関数を定義する - 関数で、以前の状態と入力ストリームからの新しい値を使用して状態を更新する方法を指定します。

各バッチで、Spark は、バッチに新しいデータがあるかどうかにかかわらず、すべて既存のキーの状態更新関数を適用します。更新関数が None を返す場合、キーと値のペアは削除されます。

これを例で説明しましょう。テキスト データ ストリームで見られる各単語の実行カウントを保持したいとします。ここでは、実行カウントが状態であり、整数です。更新関数を次のように定義します。

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

これは、単語を含む DStream (たとえば、前の例(word, 1) ペアを含む pairs DStream) に適用されます。

runningCounts = pairs.updateStateByKey(updateFunction)

更新関数は、newValues が 1 のシーケンス (「(word, 1)」ペアから) を持ち、runningCount が前のカウントを持つ状態で、各単語に対して呼び出されます。完全な Python コードについては、例 stateful_network_wordcount.py を参照してください。

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

これは、単語を含む DStream (たとえば、前の例(word, 1) ペアを含む pairs DStream) に適用されます。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新関数は、newValues が 1 のシーケンス (「(word, 1)」ペアから) を持ち、runningCount が前のカウントを持つ状態で、各単語に対して呼び出されます。

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  (values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
  };

これは、単語を含む DStream (たとえば、簡単な例(word, 1) ペアを含む pairs DStream) に適用されます。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

更新関数は、newValues が 1 のシーケンス (「(word, 1)」ペアから) を持ち、runningCount が前のカウントを持つ状態で、各単語に対して呼び出されます。完全な Java コードについては、例 JavaStatefulNetworkWordCount.java を参照してください。

注: updateStateByKey を使用するには、チェックポイント ディレクトリを構成する必要があります。これは、「チェックポインティング」セクションで詳しく説明されています。

transform 操作

transform 操作 (「transformWith」のようなバリエーションを含む) は、任意の RDD 対 RDD 関数を DStream に適用できるようにします。これは、DStream API で公開されていない RDD 操作を実行するために使用できます。たとえば、データ ストリームの各バッチと別のデータセットを結合する機能は、DStream API で直接公開されていません。しかし、transform を使用してこれを簡単に行うことができます。これにより、非常に強力な可能性が生まれます。たとえば、入力データ ストリームを事前計算されたスパム情報 (Spark で生成された可能性のある) と結合し、それに基づいてフィルタリングすることで、リアルタイム データ クリーニングを実行できます。

spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
  rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
  ...
});

提供される関数は、各バッチ間隔で呼び出されることに注意してください。これにより、時間変化する RDD 操作を実行できます。つまり、RDD 操作、パーティション数、ブロードキャスト変数などをバッチ間で変更できます。

ウィンドウ操作

Spark Streaming は、ウィンドウ処理も提供しており、これにより、データのスライディング ウィンドウに対して変換を適用できます。次の図は、このスライディング ウィンドウを示しています。

Spark Streaming

図に示すように、ウィンドウがソース DStream をスライドするたびに、ウィンドウ内に収まるソース RDD が結合され、操作されて、ウィンドウ化された DStream の RDD が生成されます。この特定のケースでは、操作は最後の 3 つの時間単位のデータに対して適用され、2 つの時間単位でスライドします。これは、任意のウィンドウ操作で 2 つのパラメータを指定する必要があることを示しています。

これらの 2 つのパラメータは、ソース DStream のバッチ間隔 (図では 1) の倍数である必要があります。

ウィンドウ操作の例で説明しましょう。たとえば、「前の例」を拡張して、10 秒ごとに過去 30 秒間のデータの単語カウントを生成するとします。これを行うには、pairs DStream の (word, 1) ペアに対して、過去 30 秒間のデータに対する reduceByKey 操作を適用する必要があります。これは、reduceByKeyAndWindow 操作を使用して行われます。

# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

一般的なウィンドウ操作のいくつかを次に示します。これらの操作はすべて、前述の 2 つのパラメータ - windowLengthslideInterval を取ります。

変換意味
window(windowLength, slideInterval) ソース DStream のウィンドウ化されたバッチに基づいて計算される新しい DStream を返します。
countByWindow(windowLength, slideInterval) ストリーム内の要素のスライディング ウィンドウ カウントを返します。
reduceByWindow(func, windowLength, slideInterval) func を使用してスライディング間隔でストリーム内の要素を集計して作成された新しい単一要素ストリームを返します。関数は、並列で正しく計算できるように、結合可能で可換である必要があります。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) (K, V) ペアの DStream で呼び出されると、指定された削減関数 func を使用して、ウィンドウ内のバッチ全体で各キーの値が集計される (K, V) ペアの新しい DStream を返します。**注:** デフォルトでは、これは Spark のデフォルトの並列タスク数 (ローカル モードでは 2、クラスター モードでは `spark.default.parallelism` 設定プロパティで決定される数) を使用してグループ化を行います。オプションの `numTasks` 引数を渡して、異なるタスク数を設定できます。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

上記の `reduceByKeyAndWindow()` のより効率的なバージョンで、各ウィンドウの削減値は、前のウィンドウの削減値を使用して増分的に計算されます。これは、スライディング ウィンドウに入る新しいデータを削減し、ウィンドウから出る古いデータを「逆削減」することによって行われます。例としては、「カウントの加算」と「減算」が挙げられます。ただし、これは「反転可能な削減関数」にのみ適用可能です。つまり、対応する「逆削減」関数 (パラメータ invFunc として渡される) を持つ削減関数です。 `reduceByKeyAndWindow` と同様に、削減タスクの数はオプションの引数で構成可能です。この操作を使用するには、チェックポインティングを有効にする必要があることに注意してください。

countByValueAndWindow(windowLength, slideInterval, [numTasks]) (K, V) ペアの DStream で呼び出されると、スライディング ウィンドウ内の各キーの頻度が値となる (K, Long) ペアの新しい DStream を返します。 `reduceByKeyAndWindow` と同様に、削減タスクの数はオプションの引数で構成可能です。

結合操作

最後に、Spark Streaming でさまざまな種類の結合をどれほど簡単に実行できるかを強調する価値があります。

ストリームとストリームの結合

ストリームは、他のストリームと非常に簡単に結合できます。

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

ここで、各バッチ間隔で、stream1 によって生成された RDD が stream2 によって生成された RDD と結合されます。また、leftOuterJoinrightOuterJoinfullOuterJoin も実行できます。さらに、ストリームのウィンドウ全体で結合を実行することも非常に役立ちます。これも非常に簡単です。

windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
ストリームとデータセットの結合

DStream.transform 操作を説明したときに、すでに示されています。ここでは、ウィンドウ化されたストリームをデータセットと結合する別の例を示します。

dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

実際、結合したいデータセットを動的に変更することもできます。transform に提供される関数は、各バッチ間隔で評価されるため、dataset 参照が指している現在のデータセットを使用します。

DStream 変換の完全なリストは、API ドキュメントで入手できます。Python API については、DStream を参照してください。Scala API については、DStream および PairDStreamFunctions を参照してください。Java API については、JavaDStream および JavaPairDStream を参照してください。


DStreams 上の出力操作

出力操作により、DStream のデータをデータベースやファイル システムなどの外部システムにプッシュできます。出力操作は、変換されたデータを外部システムが消費できるようにするため、すべての DStream 変換の実際の実行をトリガーします (RDD のアクションと同様)。現在、次の出力操作が定義されています。

出力操作意味
print() ストリーミング アプリケーションを実行しているドライバー ノードで、DStream の各バッチ データの最初の 10 個の要素を出力します。これは、開発とデバッグに役立ちます。
Python API Python API では、これは pprint() と呼ばれます。
saveAsTextFiles(prefix, [suffix]) この DStream の内容をテキスト ファイルとして保存します。各バッチ間隔でのファイル名は、prefixsuffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"
saveAsObjectFiles(prefix, [suffix]) この DStream の内容を、シリアライズされた Java オブジェクトの SequenceFiles として保存します。各バッチ間隔でのファイル名は、prefixsuffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"
Python API これは Python API では利用できません。
saveAsHadoopFiles(prefix, [suffix]) この DStream の内容を Hadoop ファイルとして保存します。各バッチ間隔でのファイル名は、prefixsuffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"
Python API これは Python API では利用できません。
foreachRDD(func) ストリームから生成された各 RDD に関数 func を適用する、最も一般的な出力演算子です。この関数は、RDD をファイルに保存したり、ネットワーク経由でデータベースに書き込んだりするなど、各 RDD のデータを外部システムにプッシュする必要があります。関数 func は、ストリーミング アプリケーションを実行しているドライバー プロセスで実行され、通常はストリーミング RDD の計算を強制する RDD アクションが含まれます。

foreachRDD を使用するためのデザイン パターン

dstream.foreachRDD は、データを外部システムに送信できる強力なプリミティブです。ただし、このプリミティブを正しく効率的に使用する方法を理解することが重要です。よくある間違いを次に示します。

多くの場合、外部システムへの書き込みには、接続オブジェクト (例: リモート サーバーへの TCP 接続) を作成し、それを使用してリモート システムにデータを送信する必要があります。この目的のために、開発者は誤って Spark ドライバーで接続オブジェクトを作成し、それを使用して RDD のレコードを保存するために Spark ワーカーでそれを使用しようとする場合があります。たとえば (Scala では)、

def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
dstream.foreachRDD(rdd -> {
  Connection connection = createNewConnection(); // executed at the driver
  rdd.foreach(record -> {
    connection.send(record); // executed at the worker
  });
});

これは正しくありません。接続オブジェクトをシリアライズしてドライバーからワーカーに送信する必要があるためです。このような接続オブジェクトは、マシン間で転送されることはほとんどありません。このエラーは、シリアライズ エラー (接続オブジェクトはシリアライズ可能ではない)、初期化エラー (接続オブジェクトはワーカーで初期化する必要がある) などとして現れる場合があります。正しい解決策は、ワーカーで接続オブジェクトを作成することです。

ただし、これは別の一般的な間違いにつながる可能性があります。つまり、レコードごとに新しい接続を作成することです。たとえば、

def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    Connection connection = createNewConnection();
    connection.send(record);
    connection.close();
  });
});

通常、接続オブジェクトの作成には時間とリソースのオーバーヘッドがかかります。したがって、レコードごとに接続オブジェクトを作成および破棄すると、不必要に高いオーバーヘッドが発生し、システムの全体的なスループットが大幅に低下する可能性があります。より良い解決策は、rdd.foreachPartition を使用することです。つまり、単一の接続オブジェクトを作成し、その接続を使用して RDD パーティションのすべてのレコードを送信します。

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

これにより、接続作成のオーバーヘッドが多くのレコードに償却されます。

最後に、複数の RDD/バッチ間で接続オブジェクトを再利用することで、これをさらに最適化できます。静的接続オブジェクトのプールを維持して、複数のバッチの RDD が外部システムにプッシュされるときに再利用できるようにすることで、オーバーヘッドをさらに削減できます。

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    // ConnectionPool is a static, lazily initialized pool of connections
    Connection connection = ConnectionPool.getConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
  });
});

プール内の接続は、オンデマンドで遅延して作成され、しばらく使用されない場合はタイムアウトする必要があることに注意してください。これにより、外部システムへのデータの送信が最も効率的になります。

その他の注意事項

DataFrame および SQL 操作

ストリーミング データに対してDataFrame と SQL 操作を簡単に使用できます。StreamingContext が使用している SparkContext を使用して SparkSession を作成する必要があります。さらに、これはドライバーの障害時に再起動できるように行う必要があります。これは、SparkSession の遅延初期化されたシングルトン インスタンスを作成することによって行われます。これは次の例で示されています。これは、DataFrame と SQL を使用して単語カウントを生成するために、以前の単語カウント例を変更したものです。各 RDD は DataFrame に変換され、一時テーブルとして登録され、SQL を使用してクエリされます。

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)

完全なソース コードを参照してください。

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

完全なソース コードを参照してください。

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ...

words.foreachRDD((rdd, time) -> {
  // Get the singleton instance of SparkSession
  SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();

  // Convert RDD[String] to RDD[case class] to DataFrame
  JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
    JavaRow record = new JavaRow();
    record.setWord(word);
    return record;
  });
  DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);

  // Creates a temporary view using the DataFrame
  wordsDataFrame.createOrReplaceTempView("words");

  // Do word count on table using SQL and print it
  DataFrame wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word");
  wordCountsDataFrame.show();
});

完全なソース コードを参照してください。

別のスレッド (つまり、実行中の StreamingContext とは非同期) からのストリーミング データに定義されたテーブルに対する SQL クエリを実行することもできます。クエリを実行するために十分な量のストリーミング データを記憶するように StreamingContext を設定するだけで十分です。そうしないと、非同期 SQL クエリを認識しない StreamingContext が、クエリが完了する前に古いストリーミング データを削除します。たとえば、最後のバッチをクエリしたいが、クエリに 5 分かかる場合、streamingContext.remember(Minutes(5)) (Scala、または他の言語で同等のもの) を呼び出します。

DataFrame の詳細については、「DataFrame と SQL」ガイドを参照してください。


MLlib 操作

MLlib で提供される機械学習アルゴリズムを簡単に使用することもできます。まず、ストリーミング データから同時に学習し、ストリーミング データにモデルを適用できるストリーミング機械学習アルゴリズム (例: ストリーミング線形回帰ストリーミング KMeans など) があります。これらを超えて、より広範な機械学習アルゴリズムのクラスについては、オフラインで学習モデルを学習し (つまり、履歴データを使用して)、その後、オンラインでストリーミング データにモデルを適用できます。詳細については、「MLlib」ガイドを参照してください。


キャッシュ / パーシステンス

RDD と同様に、DStream では開発者はストリームのデータをメモリに永続化することもできます。つまり、DStream で persist() メソッドを使用すると、その DStream のすべての RDD が自動的にメモリに永続化されます。これは、DStream のデータが複数回計算される場合 (例: 同じデータに対する複数の操作) に便利です。reduceByWindowreduceByKeyAndWindow のようなウィンドウベースの操作や、updateStateByKey のような状態ベースの操作では、これは暗黙的に true です。したがって、ウィンドウベースの操作によって生成された DStream は、開発者が persist() を呼び出さなくても、自動的にメモリに永続化されます。

ネットワーク経由でデータを受信する入力ストリーム (Kafka、ソケットなど) の場合、デフォルトの永続化レベルは、耐障害性のためにデータを 2 つのノードにレプリケートするように設定されています。

RDD とは異なり、DStream のデフォルトの永続化レベルでは、データはメモリにシリアライズされたまま保持されることに注意してください。これは、「メモリ チューニング」セクションでさらに説明されています。さまざまな永続化レベルの詳細については、Spark プログラミング ガイドを参照してください。


チェックポインティング

ストリーミング アプリケーションは 24 時間 365 日稼働する必要があるため、アプリケーション ロジック以外の障害 (システム障害、JVM クラッシュなど) に対して回復力がある必要があります。これが可能になるために、Spark Streaming は、障害から回復するために十分な情報を耐障害性のあるストレージ システムにチェックポイントする必要があります。チェックポイントされるデータの種類は 2 つあります。

要約すると、メタデータ チェックポインティングは主にドライバー障害からの回復に必要ですが、ステートフル変換が使用される場合は、データまたは RDD チェックポインティングが基本的な機能のためにも必要です。

チェックポインティングを有効にするタイミング

チェックポインティングは、次のいずれかの要件があるアプリケーションで有効にする必要があります。

注: 前述のステートフル変換を使用しない単純なストリーミング アプリケーションは、チェックポインティングを有効にせずに実行できます。この場合、ドライバー障害からの回復も部分的になります (受信されたが処理されていないデータが失われる可能性があります)。これは多くの場合許容範囲であり、多くの人が Spark Streaming アプリケーションをこの方法で実行しています。Hadoop 環境のサポートは、将来改善される予定です。

チェックポインティングの構成方法

チェックポインティングは、チェックポイント情報が保存される耐障害性のある信頼性の高いファイル システム (例: HDFS、S3 など) にディレクトリを設定することによって有効にできます。これは、streamingContext.checkpoint(checkpointDirectory) を使用して行われます。これにより、前述のステートフル変換を使用できるようになります。さらに、アプリケーションがドライバー障害から回復するようにしたい場合は、ストリーミング アプリケーションを次のように書き直す必要があります。

この動作は、StreamingContext.getOrCreate を使用することで簡単になります。次のように使用されます。

# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()

checkpointDirectory」が存在する場合、コンテキストはチェックポイント データから再作成されます。ディレクトリが存在しない場合 (つまり、初めて実行する場合)、functionToCreateContext 関数が呼び出されて新しいコンテキストが作成され、DStream がセットアップされます。Python の例 recoverable_network_wordcount.py を参照してください。この例では、ネットワーク データの単語カウントをファイルに追加します。

チェックポイント データから明示的に StreamingContext を作成し、StreamingContext.getOrCreate(checkpointDirectory, None) を使用して計算を開始することもできます。

この動作は、StreamingContext.getOrCreate を使用することで簡単になります。次のように使用されます。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

checkpointDirectory」が存在する場合、コンテキストはチェックポイント データから再作成されます。ディレクトリが存在しない場合 (つまり、初めて実行する場合)、functionToCreateContext 関数が呼び出されて新しいコンテキストが作成され、DStream がセットアップされます。Scala の例 RecoverableNetworkWordCount を参照してください。この例では、ネットワーク データの単語カウントをファイルに追加します。

この動作は、JavaStreamingContext.getOrCreate を使用することで簡単になります。次のように使用されます。

// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

checkpointDirectory」が存在する場合、コンテキストはチェックポイント データから再作成されます。ディレクトリが存在しない場合 (つまり、初めて実行する場合)、contextFactory 関数が呼び出されて新しいコンテキストが作成され、DStream がセットアップされます。Java の例 JavaRecoverableNetworkWordCount を参照してください。この例では、ネットワーク データの単語カウントをファイルに追加します。

「getOrCreate」を使用することに加えて、ドライバー プロセスが障害時に自動的に再起動されるようにする必要があります。これは、アプリケーションの実行に使用されるデプロイメント インフラストラクチャによってのみ可能です。これは、「デプロイメント」セクションでさらに説明されています。

注: RDD のチェックポインティングは、信頼性の高いストレージへの保存のコストを発生させます。これにより、RDD がチェックポイントされるバッチの処理時間が増加する可能性があります。したがって、チェックポインティングの間隔は慎重に設定する必要があります。バッチ サイズが小さい場合 (たとえば 1 秒)、各バッチをチェックポイントすると、操作のスループットが大幅に低下する可能性があります。逆に、チェックポインティングが不十分な場合、ラインエージとタスク サイズが大きくなり、悪影響を及ぼす可能性があります。RDD チェックポインティングを必要とするステートフル変換の場合、デフォルトの間隔はバッチ間隔の倍数で、少なくとも 10 秒です。dstream.checkpoint(checkpointInterval) を使用して設定できます。通常、DStream の 5 ~ 10 スライディング間隔のチェックポイント間隔は、試すのに適した設定です。


Accumulators, Broadcast Variables, および Checkpoints

Accumulators および Broadcast variables は、Spark Streaming でチェックポイントから回復することはできません。チェックポインティングを有効にして、Accumulators または Broadcast variables も使用する場合、ドライバーが障害後に再インスタンス化できるように、Accumulators および Broadcast variables の遅延初期化されたシングルトン インスタンスを作成する必要があります。これは次の例で示されています。

def getWordExcludeList(sparkContext):
    if ("wordExcludeList" not in globals()):
        globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])
    return globals()["wordExcludeList"]

def getDroppedWordsCounter(sparkContext):
    if ("droppedWordsCounter" not in globals()):
        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
    return globals()["droppedWordsCounter"]

def echo(time, rdd):
    # Get or register the excludeList Broadcast
    excludeList = getWordExcludeList(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use excludeList to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in excludeList.value:
            droppedWordsCounter.add(wordCount[1])
            False
        else:
            True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

完全なソース コードを参照してください。

object WordExcludeList {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordExcludeList = Seq("a", "b", "c")
          instance = sc.broadcast(wordExcludeList)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("DroppedWordsCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the excludeList Broadcast
  val excludeList = WordExcludeList.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use excludeList to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (excludeList.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

完全なソース コードを参照してください。

class JavaWordExcludeList {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordExcludeList.class) {
        if (instance == null) {
          List<String> wordExcludeList = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordExcludeList);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("DroppedWordsCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the excludeList Broadcast
  Broadcast<List<String>> excludeList = JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use excludeList to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (excludeList.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
}

完全なソース コードを参照してください。


アプリケーションのデプロイ

このセクションでは、Spark Streaming アプリケーションをデプロイする手順について説明します。

要件

Spark Streaming アプリケーションを実行するには、次のものが必要です。

アプリケーション コードのアップグレード

実行中の Spark Streaming アプリケーションを新しいアプリケーション コードでアップグレードする必要がある場合、2 つのメカニズムが可能です。


アプリケーションの監視

Spark の監視機能を超えて、Spark Streaming 固有の追加機能があります。StreamingContext が使用されている場合、Spark Web UI に追加の Streaming タブが表示され、実行中のレシーバー (レシーバーがアクティブかどうか、受信レコード数、レシーバー エラーなど) と完了したバッチ (バッチ処理時間、キューイング遅延など) に関する統計情報が表示されます。これは、ストリーミング アプリケーションの進行状況を監視するために使用できます。

Web UI の次の 2 つのメトリックは特に重要です。

バッチ処理時間がバッチ間隔よりも一貫して長い場合、および/またはキューイング遅延が増加し続ける場合、システムが生成される速度でバッチを処理できず、遅れていることを示しています。その場合、バッチ処理時間を短縮することを検討してください。

Spark Streaming プログラムの進行状況は、StreamingListener インターフェイスを使用して監視することもできます。これにより、レシーバーの状態と処理時間を確認できます。これは開発者 API であり、将来的に改善される (つまり、より多くの情報が報告される) 可能性があります。



パフォーマンスチューニング

クラスターで Spark Streaming アプリケーションから最高のパフォーマンスを引き出すには、ある程度のチューニングが必要です。このセクションでは、アプリケーションのパフォーマンスを向上させるためにチューニングできる多くのパラメータと構成について説明します。大まかに言うと、2 つのことを考慮する必要があります。

  1. クラスター リソースを効率的に使用して、各データ バッチの処理時間を短縮する。

  2. データが受信される速度でバッチ データを処理できるように、適切なバッチ サイズを設定する (つまり、データ処理がデータ取り込みに追いつく)。

バッチ処理時間の短縮

各バッチの処理時間を最小限に抑えるために Spark で実行できる最適化が多数あります。これらは、「チューニング ガイド」で詳しく説明されています。このセクションでは、最も重要なものをいくつか紹介します。

データ受信の並列度

ネットワーク(Kafka、ソケットなど)を介してデータを受信するには、データをデシリアライズしてSparkに格納する必要があります。データ受信がシステムのボトルネックになっている場合は、データ受信の並列化を検討してください。各入力DStreamは、単一のデータストリームを受信する単一のレシーバー(ワーカーマシンで実行)を作成することに注意してください。したがって、複数のデータストリームを受信するには、複数の入力DStreamを作成し、それらをソースからのデータストリームの異なるパーティションを受信するように設定することで実現できます。たとえば、2つのトピックのデータを受信する単一のKafka入力DStreamは、それぞれ1つのトピックのみを受信する2つのKafka入力ストリームに分割できます。これにより、2つのレシーバーが実行され、データを並列に受信できるようになり、全体のスループットが増加します。これらの複数のDStreamは、unionedtogetherされて単一のDStreamを作成できます。次に、単一の入力DStreamに適用されていた変換は、統一されたストリームに適用できます。これは次のように行われます。

numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

考慮すべきもう1つのパラメータは、レシーバーのブロック間隔であり、これは設定パラメータspark.streaming.blockIntervalによって決定されます。ほとんどのレシーバーでは、受信したデータは、Sparkのメモリに格納される前に、データブロックにまとめられます。各バッチ内のブロックの数は、マップのような変換で受信したデータを処理するために使用されるタスクの数を決定します。バッチあたりのレシーバーあたりのタスク数は、約(バッチ間隔/ブロック間隔)になります。たとえば、ブロック間隔が200ミリ秒の場合、2秒バッチごとに10個のタスクが作成されます。タスクの数が少なすぎる(つまり、マシンあたりのコア数よりも少ない)場合、すべての利用可能なコアがデータを処理するために使用されないため、非効率的になります。指定されたバッチ間隔のタスク数を増やすには、ブロック間隔を短くします。ただし、ブロック間隔の推奨最小値は約50ミリ秒であり、それ以下ではタスク起動のオーバーヘッドが問題になる可能性があります。

複数の入力ストリーム/レシーバーでデータを受信する代わりに、入力データストリームを明示的に再パーティション化することもできます(inputStream.repartition(<number of partitions>)を使用)。これにより、受信したデータバッチが、クラスター内の指定された数のマシンに分散され、さらに処理されます。

Direct Streamについては、「Spark Streaming + Kafka Integration Guide」を参照してください。

データ処理における並列度

計算のいずれかの段階で使用される並列タスクの数が十分に高くない場合、クラスターリソースが過小評価される可能性があります。たとえば、reduceByKeyおよびreduceByKeyAndWindowのような分散reduce操作では、並列タスクのデフォルト数は、spark.default.parallelism設定プロパティによって制御されます。並列度を引数として渡すか(PairDStreamFunctionsのドキュメントを参照)、spark.default.parallelism設定プロパティを設定してデフォルトを変更できます。

データシリアライゼーション

データシリアライゼーションのオーバーヘッドは、シリアライゼーションフォーマットを調整することで削減できます。ストリーミングの場合、シリアライズされるデータの種類は2つあります。

どちらの場合も、Kryoシリアライゼーションを使用すると、CPUとメモリのオーバーヘッドの両方を削減できます。詳細については、「Spark Tuning Guide」を参照してください。Kryoの場合、カスタムクラスを登録し、オブジェクト参照追跡を無効にすることを検討してください(「Configuration Guide」のKryo関連の設定を参照)。

ストリーミングアプリケーションで保持する必要があるデータの量がそれほど多くない特定のケースでは、過剰なGCオーバーヘッドを発生させることなく、データ(両方の種類)をデシリアライズされたオブジェクトとして永続化することが実現可能かもしれません。たとえば、数秒のバッチ間隔を使用しており、ウィンドウ操作を使用していない場合、ストレージレベルを明示的に設定して永続化データ(両方の種類)のシリアライゼーションを無効にしてみてください。これにより、シリアライゼーションによるCPUオーバーヘッドが削減され、GCオーバーヘッドがそれほど大きくならずにパフォーマンスが向上する可能性があります。


適切なバッチ間隔の設定

クラスターで実行されているSpark Streamingアプリケーションが安定するためには、システムは受信される速度と同じ速度でデータを処理できる必要があります。つまり、データバッチは生成される速度と同じ速度で処理される必要があります。アプリケーションでこれが真実であるかどうかは、ストリーミングWeb UIでの処理時間を監視することで確認できます。そこでは、バッチ処理時間はバッチ間隔よりも短くなるはずです。

ストリーミング計算の性質に応じて、使用されるバッチ間隔は、固定されたクラスターリソースでアプリケーションが維持できるデータレートに大きな影響を与える可能性があります。たとえば、前のWordCountNetworkの例を考えてみましょう。特定のデータレートに対して、システムは2秒ごと(つまり、バッチ間隔が2秒)に単語カウントを報告することに対応できるかもしれませんが、500ミリ秒ごとには対応できないかもしれません。したがって、バッチ間隔は、運用環境での予想されるデータレートを維持できるような値に設定する必要があります。

アプリケーションに適切なバッチサイズを特定するための良いアプローチは、保守的なバッチ間隔(たとえば、5〜10秒)と低いデータレートでテストすることです。システムがデータレートに対応できるかどうかを確認するには、処理された各バッチが経験するエンドツーエンド遅延の値を確認します(Sparkドライバーのlog4jログの「Total delay」を探すか、StreamingListenerインターフェイスを使用します)。遅延がバッチサイズに匹敵するように維持されている場合、システムは安定しています。そうでない場合、遅延が継続的に増加している場合は、システムが対応できておらず、したがって不安定であることを意味します。安定した構成のアイデアが得られたら、データレートを増やしたり、バッチサイズを小さくしたりすることができます。一時的なデータレートの増加による遅延の瞬間的な増加は、遅延が低い値(つまり、バッチサイズ未満)に戻る限り、許容されることに注意してください。


メモリチューニング

Sparkアプリケーションのメモリ使用量とGCの動作のチューニングについては、「Tuning Guide」で詳細に説明されています。そちらを読むことを強くお勧めします。このセクションでは、Spark Streamingアプリケーションのコンテキストでいくつかのチューニングパラメータについて説明します。

Spark Streamingアプリケーションに必要なクラスターメモリの量は、使用される変換の種類に大きく依存します。たとえば、過去10分間のデータに対してウィンドウ操作を使用したい場合、クラスターには10分間のデータをメモリに保持するのに十分なメモリが必要です。または、多数のキーでupdateStateByKeyを使用したい場合、必要なメモリは高くなります。逆に、単純なマップ-フィルター-ストア操作を実行したい場合、必要なメモリは少なくなります。

一般的に、レシーバーを介して受信されるデータはStorageLevel.MEMORY_AND_DISK_SER_2で格納されるため、メモリに収まらないデータはディスクにスピルされます。これにより、ストリーミングアプリケーションのパフォーマンスが低下する可能性があるため、ストリーミングアプリケーションで必要とされる十分なメモリを提供することをお勧めします。小規模な環境でメモリ使用量を確認し、それに応じて見積もるのが最善です。

メモリチューニングのもう1つの側面は、ガベージコレクションです。低レイテンシを必要とするストリーミングアプリケーションでは、JVMガベージコレクションによる長い停止は望ましくありません。

メモリ使用量とGCオーバーヘッドをチューニングするのに役立ついくつかのパラメータがあります。


覚えておくべき重要な点


耐障害性セマンティクス

このセクションでは、障害発生時のSpark Streamingアプリケーションの動作について説明します。

背景

Spark Streamingが提供するセマンティクスを理解するために、SparkのRDDの基本的な耐障害性セマンティクスを思い出してみましょう。

  1. RDDは、不変で、決定論的に再計算可能な、分散データセットです。各RDDは、耐障害性のある入力データセットに対して、それを生成するために使用された決定論的な操作のラインエージを記憶しています。
  2. RDDのいずれかのパーティションがワーカーノードの障害により失われた場合、そのパーティションは、操作のラインエージを使用して元の耐障害性のあるデータセットから再計算できます。
  3. すべてのRDD変換が決定論的であると仮定すると、最終的な変換されたRDDのデータは、Sparkクラスターでの障害に関係なく常に同じになります。

SparkはHDFSまたはS3のような耐障害性のあるファイルシステム上のデータに対して操作を行います。したがって、耐障害性のあるデータから生成されたすべてのRDDも耐障害性があります。ただし、Spark Streamingの場合はそうではありません。ほとんどの場合、ネットワーク経由でデータを受信するためです(fileStreamが使用される場合を除く)。生成されたすべてのRDDに対して同じ耐障害性プロパティを実現するために、受信したデータはクラスター内のワーカーノードの複数のSparkエグゼキュータ間で複製されます(デフォルトのレプリケーションファクターは2)。これにより、システムには障害発生時に回復する必要のある2種類のデータが存在することになります。

  1. 受信および複製されたデータ - このデータは、他のノードにコピーが存在するため、単一のワーカーノードの障害を乗り越えます。
  2. 受信されたが複製のためにバッファリングされたデータ - これは複製されないため、このデータを回復する唯一の方法は、ソースから再度取得することです。

さらに、考慮すべき2種類の障害があります。

  1. ワーカーノードの障害 - エグゼキュータを実行しているワーカーノードのいずれかが障害を起こす可能性があり、それらのノードのすべてのメモリ内データが失われます。障害を起こしたノードでレシーバーが実行されていた場合、それらのバッファリングされたデータは失われます。
  2. ドライバーノードの障害 - Spark Streamingアプリケーションを実行しているドライバーノードが障害を起こした場合、当然SparkContextは失われ、すべてのエグゼキュータとそのメモリ内データは失われます。

この基本的な知識があれば、Spark Streamingの耐障害性セマンティクスを理解できます。

定義

ストリーミングシステムのセマンティクスは、多くの場合、各レコードがシステムによって何回処理されるかという観点から捉えられます。システムがすべての可能な運用条件(障害などにもかかわらず)で提供できる3種類の保証があります。

  1. 高々1回: 各レコードは、1回処理されるか、まったく処理されないかのいずれかです。
  2. 少なくとも1回: 各レコードは、1回以上処理されます。これは高々1回よりも強力で、データが失われないことを保証します。ただし、重複がある場合があります。
  3. ちょうど1回: 各レコードは、ちょうど1回処理されます。データは失われず、データは複数回処理されません。これは明らかに3つの中で最も強力な保証です。

基本的なセマンティクス

いずれのストリーム処理システムにおいても、大まかに言って、データ処理には3つのステップがあります。

  1. データの受信: レシーバーなどを使用してソースからデータを受信します。

  2. データの変換: 受信したデータは、DStreamおよびRDD変換を使用して変換されます。

  3. データのプッシュアウト: 最終的に変換されたデータは、ファイルシステム、データベース、ダッシュボードなどの外部システムにプッシュアウトされます。

ストリーミングアプリケーションがエンドツーエンドのちょうど1回保証を達成する必要がある場合、各ステップがちょうど1回の保証を提供する必要があります。つまり、各レコードはちょうど1回受信され、ちょうど1回変換され、ちょうど1回ダウンストリームシステムにプッシュアウトされる必要があります。Spark Streamingのコンテキストでこれらのステップのセマンティクスを理解しましょう。

  1. データの受信: 異なる入力ソースは、異なる保証を提供します。これは次のサブセクションで詳細に説明されています。

  2. データの変換: 受信されたすべてのデータは、RDDが提供する保証のおかげで、ちょうど1回処理されます。障害が発生した場合でも、受信した入力データがアクセス可能である限り、最終的な変換されたRDDは常に同じ内容になります。

  3. データのプッシュアウト: 出力操作は、デフォルトで少なくとも1回のセマンティクスを保証します。これは、出力操作の種類(冪等かそうでないか)とダウンストリームシステムのセマンティクス(トランザクションをサポートするかどうか)に依存するためです。ただし、ユーザーは独自のトランザクションメカニズムを実装してちょうど1回のセマンティクスを達成できます。これは、このセクションの後半でさらに詳しく説明します。

受信データのセマンティクス

異なる入力ソースは、少なくとも1回からちょうど1回まで、さまざまな保証を提供します。詳細については、こちらをお読みください。

ファイルの場合

すべての入力データがHDFSのような耐障害性のあるファイルシステムに既に存在する場合、Spark Streamingは任意の障害から常に回復し、すべてのデータを処理できます。これにより、ちょうど1回のセマンティクスが得られます。つまり、何が障害を起こしても、すべてのデータはちょうど1回処理されます。

レシーバーベースのソースの場合

レシーバーベースの入力ソースの場合、耐障害性セマンティクスは、障害シナリオとレシーバーの種類の両方に依存します。以前の説明で説明したように、レシーバーには2種類あります。

  1. 信頼性の高いレシーバー - これらのレシーバーは、受信したデータが複製されたことを確認した後でのみ、信頼性の高いソースを認識します。そのようなレシーバーが障害を起こした場合、ソースはバッファリングされた(複製されていない)データの確認応答を受け取りません。したがって、レシーバーが再起動されると、ソースはデータを再送信し、障害によるデータの損失はありません。
  2. 信頼性の低いレシーバー - そのようなレシーバーは確認応答を送信せず、したがってワーカーまたはドライバーの障害による障害時にデータを失う可能性があります。

使用しているレシーバーの種類に応じて、次のセマンティクスが得られます。ワーカーノードが障害を起こした場合、信頼性の高いレシーバーではデータ損失はありません。信頼性の低いレシーバーでは、受信されたが複製されていないデータが失われる可能性があります。ドライバーノードが障害を起こした場合、これらの損失に加えて、受信されメモリに複製された過去のすべてのデータが失われます。これはステートフル変換の結果に影響します。

過去に受信したデータの損失を防ぐために、Spark 1.2では書き込み前ログが導入され、受信したデータを耐障害性のあるストレージに保存します。書き込み前ログを有効にし、信頼性の高いレシーバーを使用すると、データ損失はゼロになります。セマンティクスの観点からは、少なくとも1回の保証を提供します。

次の表は、障害発生時のセマンティクスをまとめたものです。

デプロイメントシナリオ ワーカー障害 ドライバー障害
Spark 1.1以前、または
書き込み前ログなしのSpark 1.2以降
信頼性の低いレシーバーではバッファリングされたデータが失われる
信頼性の高いレシーバーではデータ損失ゼロ
少なくとも1回のセマンティクス
信頼性の低いレシーバーではバッファリングされたデータが失われる
すべてのレシーバーで過去のデータが失われる
未定義のセマンティクス
書き込み前ログありのSpark 1.2以降 信頼性の高いレシーバーではデータ損失ゼロ
少なくとも1回のセマンティクス
信頼性の高いレシーバーとファイルではデータ損失ゼロ
少なくとも1回のセマンティクス

Kafka Direct APIを使用

Spark 1.3では、新しいKafka Direct APIが導入され、すべてのKafkaデータがSpark Streamingにちょうど1回受信されることが保証されます。これに加えて、ちょうど1回の出力操作を実装すると、エンドツーエンドのちょうど1回保証を達成できます。このアプローチについては、「Kafka Integration Guide」でさらに詳しく説明します。

出力操作のセマンティクス

出力操作(foreachRDDなど)は少なくとも1回のセマンティクスを持ちます。つまり、ワーカー障害が発生した場合、変換されたデータが外部エンティティに複数回書き込まれる可能性があります。これは、saveAs***Files操作を使用してファイルシステムに保存する場合には許容されます(ファイルは同じデータで上書きされるだけです)。ただし、ちょうど1回のセマンティクスを達成するには、追加の作業が必要になる場合があります。2つのアプローチがあります。



次へ進む