Spark Streaming プログラミングガイド

注意

Spark Streaming は、Spark のストリーミングエンジンの前の世代のものです。Spark Streaming のアップデートはもうなく、レガシープロジェクトです。Spark には、構造化ストリーミングという、より新しく、使いやすいストリーミングエンジンがあります。ストリーミングアプリケーションとパイプラインには、Spark 構造化ストリーミングを使用する必要があります。構造化ストリーミングプログラミングガイドを参照してください。

概要

Spark Streaming は、コア Spark API の拡張であり、ライブデータストリームのスケーラブルで高スループット、フォールトトレラントなストリーム処理を可能にします。データは、Kafka、Kinesis、TCP ソケットなど、多くのソースから取り込むことができ、mapreducejoinwindow のような高レベルの関数で表現された複雑なアルゴリズムを使用して処理できます。最後に、処理されたデータは、ファイルシステム、データベース、およびライブダッシュボードにプッシュできます。実際、Spark の機械学習およびグラフ処理アルゴリズムをデータストリームに適用できます。

Spark Streaming

内部的には、次のように動作します。Spark Streaming はライブ入力データストリームを受信し、データをバッチに分割します。その後、Spark エンジンがバッチを処理して、バッチで結果の最終ストリームを生成します。

Spark Streaming

Spark Streaming は、離散化ストリームまたはDStreamと呼ばれる高レベルの抽象化を提供します。これは、データの連続ストリームを表します。DStream は、Kafka や Kinesis などのソースからの入力データストリームから作成するか、他の DStream で高レベルの操作を適用することによって作成できます。内部的には、DStream はRDDのシーケンスとして表されます。

このガイドでは、DStream を使用して Spark Streaming プログラムの記述を開始する方法を示します。Spark Streaming プログラムは Scala、Java、または Python (Spark 1.2 で導入) で記述できます。これらはすべてこのガイドで紹介されています。このガイド全体で、さまざまな言語のコードスニペットを選択できるタブが表示されます。

注意: Python では、いくつかの API が異なっているか、利用できないものがあります。このガイド全体で、これらの違いを強調するPython APIタグが表示されます。


簡単な例

独自の Spark Streaming プログラムの記述方法の詳細に入る前に、簡単な Spark Streaming プログラムがどのようなものかを見てみましょう。TCP ソケットでリッスンしているデータサーバーから受信したテキストデータの単語数をカウントするとします。必要なのは次のとおりです。

まず、すべてのストリーミング機能のメインエントリポイントであるStreamingContextをインポートします。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

このコンテキストを使用して、ホスト名(例:localhost)とポート(例:9999)として指定された TCP ソースからのストリーミングデータを表す DStream を作成できます。

# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

この lines DStream は、データサーバーから受信されるデータのストリームを表します。この DStream の各レコードはテキストの行です。次に、行をスペースで単語に分割します。

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

flatMap は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する 1 対多の DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words DStream として表されます。次に、これらの単語をカウントします。

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()

words DStream は、さらに (word, 1) ペアの DStream にマップされ (1 対 1 の変換)、その後、各データのバッチ内の単語の頻度を取得するために縮小されます。最後に、wordCounts.pprint() は、毎秒生成されたカウントの一部を出力します。

これらの行が実行されると、Spark Streaming は、起動時に実行する計算のみを設定し、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に次を呼び出します。

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate

完全なコードは、Spark Streaming の例であるNetworkWordCountにあります。

まず、Spark Streaming クラスの名前と、StreamingContext から環境へのいくつかの暗黙的な変換をインポートして、他の必要なクラス (DStream など) に便利なメソッドを追加します。StreamingContextは、すべてのストリーミング機能のメインエントリポイントです。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

このコンテキストを使用して、ホスト名(例:localhost)とポート(例:9999)として指定された TCP ソースからのストリーミングデータを表す DStream を作成できます。

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

この lines DStream は、データサーバーから受信されるデータのストリームを表します。この DStream の各レコードはテキストの行です。次に、行をスペース文字で単語に分割します。

// Split each line into words
val words = lines.flatMap(_.split(" "))

flatMap は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する 1 対多の DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words DStream として表されます。次に、これらの単語をカウントします。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

words DStream は、さらに (word, 1) ペアの DStream にマップされ (1 対 1 の変換)、その後、各データのバッチ内の単語の頻度を取得するために縮小されます。最後に、wordCounts.print() は、毎秒生成されたカウントの一部を出力します。

これらの行が実行されると、Spark Streaming は、起動時に実行する計算のみを設定し、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に次を呼び出します。

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

完全なコードは、Spark Streaming の例であるNetworkWordCountにあります。

まず、すべてのストリーミング機能のメインエントリポイントであるJavaStreamingContextオブジェクトを作成します。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;

// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

このコンテキストを使用して、ホスト名(例:localhost)とポート(例:9999)として指定された TCP ソースからのストリーミングデータを表す DStream を作成できます。

// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

この lines DStream は、データサーバーから受信されるデータのストリームを表します。このストリームの各レコードはテキストの行です。次に、行をスペースで単語に分割します。

// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());

flatMap は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words DStream として表されます。FlatMapFunction オブジェクトを使用して変換を定義したことに注意してください。今後明らかにするように、Java API には DStream 変換の定義に役立つこのような便利なクラスが多数あります。

次に、これらの単語をカウントします。

// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();

words DStream は、PairFunction オブジェクトを使用して、さらに (word, 1) ペアの DStream にマップされます (1 対 1 の変換)。次に、Function2 オブジェクトを使用して、各データのバッチ内の単語の頻度を取得するために削減されます。最後に、wordCounts.print() は、毎秒生成されたカウントの一部を出力します。

これらの行が実行されると、Spark Streaming は、起動後に実行する計算のみを設定し、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に start メソッドを呼び出します。

jssc.start();              // Start the computation
jssc.awaitTermination();   // Wait for the computation to terminate

完全なコードは、Spark Streaming の例であるJavaNetworkWordCountにあります。

すでにダウンロードしてビルドした場合、この例を次のように実行できます。最初に、次を使用して、データサーバーとして Netcat (ほとんどの Unix ライクなシステムで見られる小さなユーティリティ) を実行する必要があります。

$ nc -lk 9999

次に、別のターミナルで、次を使用して例を起動できます。

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999

その後、netcat サーバーを実行しているターミナルで入力されたすべての行がカウントされ、毎秒画面に出力されます。次のようになります。

# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world



...
# TERMINAL 2: RUNNING network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
...
-------------------------------------------
Time: 2014-10-14 15:25:21
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...
# TERMINAL 2: RUNNING JavaNetworkWordCount

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...


基本的な概念

次に、簡単な例を超えて、Spark Streamingの基本について詳しく説明します。

リンク

Sparkと同様に、Spark StreamingはMaven Centralを通じて利用できます。独自のSpark Streamingプログラムを作成するには、SBTまたはMavenプロジェクトに次の依存関係を追加する必要があります。

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.5.1</version>
    <scope>provided</scope>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.5.1" % "provided"

Spark StreamingコアAPIにはないKafkaやKinesisなどのソースからデータを取り込むには、対応するアーティファクト spark-streaming-xyz_2.12 を依存関係に追加する必要があります。例えば、一般的なものは次のとおりです。

ソースアーティファクト
Kafkaspark-streaming-kafka-0-10_2.12
Kinesis
spark-streaming-kinesis-asl_2.12 [Amazon Software License]

最新のリストについては、サポートされているソースとアーティファクトの完全なリストについて、Mavenリポジトリを参照してください。


StreamingContext の初期化

Spark Streamingプログラムを初期化するには、すべてのSpark Streaming機能のメインエントリポイントであるStreamingContextオブジェクトを作成する必要があります。

StreamingContextオブジェクトは、SparkContextオブジェクトから作成できます。

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)

appNameパラメータは、クラスタUIに表示するアプリケーションの名前です。masterは、Spark、Mesos、またはYARNクラスタのURL、またはローカルモードで実行するための特別な“local[*]”文字列です。実際には、クラスタで実行する場合、プログラムにmasterをハードコードするのではなく、spark-submitでアプリケーションを起動してそこで受け取るようにします。ただし、ローカルテストやユニットテストでは、“local[*]”を渡してSpark Streamingをインプロセスで実行できます(ローカルシステムのコア数を検出します)。

バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスタリソースに基づいて設定する必要があります。詳細については、「パフォーマンスチューニング」セクションを参照してください。

StreamingContextオブジェクトは、SparkConfオブジェクトから作成できます。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appNameパラメータは、クラスタUIに表示するアプリケーションの名前です。masterは、Spark、Mesos、Kubernetes、またはYARNクラスタのURL、またはローカルモードで実行するための特別な“local[*]”文字列です。実際には、クラスタで実行する場合、プログラムにmasterをハードコードするのではなく、spark-submitでアプリケーションを起動してそこで受け取るようにします。ただし、ローカルテストやユニットテストでは、“local[*]”を渡してSpark Streamingをインプロセスで実行できます(ローカルシステムのコア数を検出します)。これは内部的にSparkContext(すべてのSpark機能の開始点)を作成し、ssc.sparkContextとしてアクセスできることに注意してください。

バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスタリソースに基づいて設定する必要があります。詳細については、「パフォーマンスチューニング」セクションを参照してください。

StreamingContextオブジェクトは、既存のSparkContextオブジェクトから作成することもできます。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

JavaStreamingContextオブジェクトは、SparkConfオブジェクトから作成できます。

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

appNameパラメータは、クラスタUIに表示するアプリケーションの名前です。masterは、Spark、Mesos、またはYARNクラスタのURL、またはローカルモードで実行するための特別な“local[*]”文字列です。実際には、クラスタで実行する場合、プログラムにmasterをハードコードするのではなく、spark-submitでアプリケーションを起動してそこで受け取るようにします。ただし、ローカルテストやユニットテストでは、“local[*]”を渡してSpark Streamingをインプロセスで実行できます。これは内部的にJavaSparkContext(すべてのSpark機能の開始点)を作成し、ssc.sparkContextとしてアクセスできることに注意してください。

バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスタリソースに基づいて設定する必要があります。詳細については、「パフォーマンスチューニング」セクションを参照してください。

JavaStreamingContextオブジェクトは、既存のJavaSparkContextから作成することもできます。

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

コンテキストが定義されたら、次のことを行う必要があります。

  1. 入力DStreamを作成して入力ソースを定義します。
  2. DStreamに変換および出力操作を適用して、ストリーミング計算を定義します。
  3. streamingContext.start()を使用して、データの受信と処理を開始します。
  4. streamingContext.awaitTermination()を使用して、処理が停止される(手動またはエラーによる)のを待ちます。
  5. 処理は、streamingContext.stop()を使用して手動で停止できます。
覚えておくべきポイント

離散化ストリーム (DStream)

Discretized StreamまたはDStreamは、Spark Streamingによって提供される基本的な抽象化です。これは、ソースから受信した入力データストリーム、または入力ストリームを変換して生成された処理済みデータストリームの連続ストリームを表します。内部的には、DStreamは、不変の分散データセットのSparkの抽象化である一連の連続したRDDによって表されます(詳細については、Sparkプログラミングガイドを参照してください)。DStream内の各RDDには、次の図に示すように、特定の期間からのデータが含まれています。

Spark Streaming

DStreamに適用された操作は、基になるRDDに対する操作に変換されます。たとえば、前の例の行のストリームを単語に変換する場合、flatMap操作は、lines DStreamの各RDDに適用され、words DStreamのRDDを生成します。これを次の図に示します。

Spark Streaming

これらの基になるRDD変換は、Sparkエンジンによって計算されます。DStream操作はこれらの詳細のほとんどを隠し、開発者に便利な高レベルのAPIを提供します。これらの操作については、後のセクションで詳しく説明します。


入力 DStream とレシーバー

入力DStreamは、ストリーミングソースから受信した入力データのストリームを表すDStreamです。簡単な例では、linesは、netcatサーバーから受信したデータストリームを表していたため、入力DStreamでした。すべての入力DStream(このセクションで後述するファイルストリームを除く)は、ソースからデータを受信し、処理のためにSparkのメモリに保存するReceiverScala docJava doc)オブジェクトに関連付けられています。

Spark Streamingは、2つのカテゴリの組み込みストリーミングソースを提供します。

このセクションの後半で、各カテゴリに存在するいくつかのソースについて説明します。

ストリーミングアプリケーションで複数のデータストリームを並行して受信する場合は、複数の入力DStreamを作成できます(詳細については、「データ受信の並列処理レベル」セクションを参照してください)。これにより、複数のデータストリームを同時に受信する複数のレシーバーが作成されます。ただし、Sparkワーカー/エグゼキューターは長時間実行されるタスクであるため、Spark Streamingアプリケーションに割り当てられたコアの1つを占有することに注意してください。したがって、Spark Streamingアプリケーションには、受信したデータを処理し、レシーバーを実行するのに十分なコア(またはローカルで実行する場合はスレッド)が割り当てられている必要があることに注意することが重要です。

覚えておくべきポイント

基本ソース

TCPソケット接続で受信したテキストデータからDStreamを作成する、簡単な例ssc.socketTextStream(...)についてはすでに見てきました。ソケットに加えて、StreamingContext APIは、ファイルを入力ソースとしてDStreamを作成するためのメソッドを提供します。

ファイルストリーム

HDFS APIと互換性のあるファイルシステム(つまり、HDFS、S3、NFSなど)上のファイルからデータを読み取る場合、StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]を介してDStreamを作成できます。

ファイルストリームはレシーバーの実行を必要としないため、ファイルデータを受信するためにコアを割り当てる必要はありません。

単純なテキストファイルの場合、最も簡単な方法はStreamingContext.textFileStream(dataDirectory)です。

fileStreamはPython APIでは利用できません。textFileStreamのみが利用可能です。

streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

テキストファイルの場合

streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

テキストファイルの場合

streamingContext.textFileStream(dataDirectory);
ディレクトリの監視方法

Spark Streamingは、ディレクトリdataDirectoryを監視し、そのディレクトリに作成されたファイルを処理します。

データソースとしてオブジェクトストアを使用する

HDFSのような「フル」ファイルシステムでは、出力ストリームが作成されるとすぐにファイルに変更時刻が設定される傾向があります。ファイルが開かれると、データが完全に書き込まれる前であっても、DStreamに含まれる可能性があり、その後、同じウィンドウ内でのファイルへの更新は無視されます。つまり、変更が見逃されたり、データがストリームから省略されたりする可能性があります。

変更がウィンドウ内で確実に選択されるようにするには、ファイルを監視対象外のディレクトリに書き込み、出力ストリームが閉じられた直後に、宛先ディレクトリに名前を変更します。名前が変更されたファイルが、作成ウィンドウ中にスキャンされた宛先ディレクトリに表示されれば、新しいデータが選択されます。

対照的に、Amazon S3やAzure Storageなどのオブジェクトストアは、データが実際にコピーされるため、名前変更操作が遅いのが通常です。さらに、名前が変更されたオブジェクトの変更時刻はrename()操作の時刻になる可能性があるため、元の作成時刻が示唆するウィンドウの一部とは見なされない場合があります。

ターゲットのオブジェクトストアに対して、ストアのタイムスタンプ動作がSpark Streamingで想定される動作と一貫していることを確認するために、慎重なテストが必要です。選択したオブジェクトストアを介してデータをストリーミングする場合、宛先ディレクトリに直接書き込むのが適切な戦略である可能性があります。

このトピックの詳細については、Hadoopファイルシステム仕様を参照してください。

カスタムレシーバーに基づくストリーム

DStreamは、カスタムレシーバーを介して受信したデータストリームで作成できます。詳細については、カスタムレシーバーガイドを参照してください。

ストリームとしてのRDDのキュー

テストデータを使用してSpark Streamingアプリケーションをテストする場合、streamingContext.queueStream(queueOfRDDs)を使用して、RDDのキューに基づいてDStreamを作成することもできます。キューにプッシュされた各RDDは、DStream内のデータのバッチとして扱われ、ストリームのように処理されます。

ソケットとファイルからのストリームの詳細については、Scalaの場合はStreamingContext、Javaの場合はJavaStreamingContext、Pythonの場合はStreamingContextの関連関数のAPIドキュメントを参照してください。

高度なソース

Python API Spark 3.5.1の時点で、これらのソースのうち、KafkaとKinesisがPython APIで利用可能です。

このカテゴリのソースでは、外部の非Sparkライブラリとのインターフェイスが必要であり、一部には複雑な依存関係(例:Kafka)があります。したがって、依存関係のバージョン競合に関連する問題を最小限に抑えるために、これらのソースからDStreamを作成する機能は、必要に応じて明示的にリンクできる別のライブラリに移動されました。

これらの高度なソースはSparkシェルでは利用できないため、これらの高度なソースに基づくアプリケーションはシェルでテストできないことに注意してください。Sparkシェルでどうしても使用したい場合は、対応するMavenアーティファクトのJARとその依存関係をダウンロードして、クラスパスに追加する必要があります。

これらの高度なソースの一部を次に示します。

カスタムソース

Python API これはPythonではまだサポートされていません。

カスタムデータソースから入力DStreamを作成することもできます。必要なのは、カスタムソースからデータを受信してSparkにプッシュできるユーザー定義のレシーバー(それが何かを理解するには次のセクションを参照)を実装することだけです。詳細については、カスタムレシーバーガイドを参照してください。

レシーバーの信頼性

信頼性に基づいて、2種類のデータソースがあります。(Kafkaのような)ソースでは、転送されたデータが確認応答されるようにすることができます。これらの信頼できるソースからデータを受信するシステムが、受信したデータを正しく確認応答する場合、いかなる障害によってもデータが失われないようにすることができます。これにより、2種類のレシーバーが生まれます。

  1. 信頼できるレシーバー - 信頼できるレシーバーは、データが受信され、複製とともにSparkに格納されたときに、信頼できるソースに正しく確認応答を送信します。
  2. 信頼できないレシーバー - 信頼できないレシーバーは、ソースに確認応答を送信しません。これは、確認応答をサポートしないソース、または確認応答の複雑さを望まない場合、または必要としない場合に、信頼できるソースに使用できます。

信頼できるレシーバーを記述する方法の詳細は、カスタムレシーバーガイドで説明されています。


DStream の変換

RDDと同様に、変換により、入力DStreamからのデータを変更できます。DStreamは、通常のSpark RDDで使用できる変換の多くをサポートします。一般的なものを次に示します。

変換意味
map(func) ソースDStreamの各要素を関数funcに通すことで、新しいDStreamを返します。
flatMap(func) mapに似ていますが、各入力項目を0個以上の出力項目にマッピングできます。
filter(func) funcがtrueを返すソースDStreamのレコードのみを選択して、新しいDStreamを返します。
repartition(numPartitions) より多くのパーティションを作成するか、より少ないパーティションを作成することにより、このDStreamの並列処理のレベルを変更します。
union(otherStream) ソースDStreamとotherDStreamの要素の和集合を含む新しいDStreamを返します。
count() ソースDStreamの各RDD内の要素数をカウントすることにより、単一要素のRDDの新しいDStreamを返します。
reduce(func) 関数func(2つの引数を取り、1つを返す)を使用して、ソースDStreamの各RDD内の要素を集計することにより、単一要素のRDDの新しいDStreamを返します。関数は並行して計算できるように、結合的かつ可換である必要があります。
countByValue() 型Kの要素のDStreamで呼び出されると、各キーの値がソースDStreamの各RDD内の頻度である(K、Long)ペアの新しいDStreamを返します。
reduceByKey(func, [numTasks]) (K, V)ペアのDStreamで呼び出されると、指定されたreduce関数を使用して各キーの値が集計された(K, V)ペアの新しいDStreamを返します。注:デフォルトでは、これはSparkのデフォルトの並列タスク数(ローカルモードでは2、クラスターモードでは設定プロパティspark.default.parallelismによって決定される数)を使用してグループ化を行います。オプションのnumTasks引数を渡して、別のタスク数を設定できます。
join(otherStream, [numTasks]) (K, V)ペアと(K, W)ペアの2つのDStreamで呼び出されると、各キーのすべての要素ペアを含む(K, (V, W))ペアの新しいDStreamを返します。
cogroup(otherStream, [numTasks]) (K, V)ペアと(K, W)ペアのDStreamで呼び出されると、(K, Seq[V], Seq[W])タプルの新しいDStreamを返します。
transform(func) ソースDStreamのすべてのRDDにRDD-to-RDD関数を適用することにより、新しいDStreamを返します。これは、DStreamで任意のRDD操作を実行するために使用できます。
updateStateByKey(func) 各キーの状態が、キーの前の状態とキーの新しい値に指定された関数を適用することによって更新される新しい「状態」DStreamを返します。これは、各キーの任意の状態データを維持するために使用できます。

これらの変換のいくつかは、さらに詳しく説明する価値があります。

UpdateStateByKey操作

updateStateByKey操作を使用すると、新しい情報で継続的に更新しながら、任意の状態を維持できます。これを使用するには、2つのステップを実行する必要があります。

  1. 状態を定義する - 状態は任意のデータ型にできます。
  2. 状態更新関数を定義する - 前の状態と入力ストリームからの新しい値を使用して状態を更新する方法を関数で指定します。

すべてのバッチで、Sparkは、バッチに新しいデータがあるかどうかに関係なく、すべての既存のキーに対して状態更新関数を適用します。更新関数がNoneを返す場合、キーと値のペアは削除されます。

例でこれを説明しましょう。テキストデータストリームで見られた各単語の実行中のカウントを維持したいとします。ここで、実行中のカウントは状態であり、整数です。更新関数を次のように定義します。

def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)  # add the new values with the previous running count to get the new count

これは、単語を含むDStream(たとえば、前の例(word, 1)ペアを含むpairs DStream)に適用されます。

runningCounts = pairs.updateStateByKey(updateFunction)

更新関数は、各単語に対して、1のシーケンス((word, 1)ペアから)を持つnewValuesと、前のカウントを持つrunningCountを使用して呼び出されます。完全なPythonコードについては、stateful_network_wordcount.pyの例を参照してください。

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

これは、単語を含むDStream(たとえば、前の例(word, 1)ペアを含むpairs DStream)に適用されます。

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新関数は、各単語に対して、1のシーケンス((word, 1)ペアから)を持つnewValuesと、前のカウントを持つrunningCountを使用して呼び出されます。

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  (values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
  };

これは、単語を含むDStream(たとえば、簡単な例(word, 1)ペアを含むpairs DStream)に適用されます。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

更新関数は、各単語に対して、1のシーケンス((word, 1)ペアから)を持つnewValuesと、前のカウントを持つrunningCountを使用して呼び出されます。完全なJavaコードについては、JavaStatefulNetworkWordCount.javaの例を参照してください。

updateStateByKeyを使用するには、チェックポイントディレクトリを構成する必要があることに注意してください。これについては、チェックポイントセクションで詳しく説明します。

Transform操作

transform操作(およびtransformWithなどのバリエーション)を使用すると、DStreamに任意のRDD-to-RDD関数を適用できます。これは、DStream APIで公開されていない任意のRDD操作を適用するために使用できます。たとえば、データストリーム内のすべてのバッチを別のデータセットと結合する機能は、DStream APIでは直接公開されていません。ただし、transformを使用して簡単にこれを行うことができます。これにより、非常に強力な可能性が可能になります。たとえば、入力データストリームを事前に計算されたスパム情報(おそらくSparkでも生成された)と結合し、それに基づいてフィルタリングすることにより、リアルタイムデータクレンジングを行うことができます。

spamInfoRDD = sc.pickleFile(...)  # RDD containing spam information

# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
  ...
}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
  rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
  ...
});

指定された関数は、すべてのバッチ間隔で呼び出されることに注意してください。これにより、時間変化するRDD操作を実行できます。つまり、RDD操作、パーティション数、ブロードキャスト変数などをバッチ間で変更できます。

ウィンドウ操作

Spark Streaming は、データのスライディングウィンドウ上で変換を適用できるウィンドウ計算も提供します。次の図は、このスライディングウィンドウを示しています。

Spark Streaming

図に示すように、ウィンドウがソース DStream 上をスライドするたびに、ウィンドウ内に収まるソース RDD が結合され、操作されて、ウィンドウ化された DStream の RDD が生成されます。この特定のケースでは、操作は過去 3 タイムユニットのデータに適用され、2 タイムユニットずつスライドします。これは、ウィンドウ操作には 2 つのパラメーターを指定する必要があることを示しています。

これらの 2 つのパラメーターは、ソース DStream のバッチ間隔(図では 1)の倍数である必要があります。

ウィンドウ操作を例で説明しましょう。たとえば、前の例を拡張して、過去 30 秒間のデータの単語数を 10 秒ごとに生成するとします。これを行うには、過去 30 秒間のデータに対して、(word, 1) ペアの pairs DStream に対して reduceByKey 操作を適用する必要があります。これは、reduceByKeyAndWindow 操作を使用して行われます。

# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

一般的なウィンドウ操作の一部を以下に示します。これらの操作はすべて、上記の 2 つのパラメーター - windowLengthslideInterval を取ります。

変換意味
window(windowLength, slideInterval) ソース DStream のウィンドウ化されたバッチに基づいて計算される新しい DStream を返します。
countByWindow(windowLength, slideInterval) ストリーム内の要素のスライディングウィンドウカウントを返します。
reduceByWindow(func, windowLength, slideInterval) func を使用してスライディング間隔にわたってストリーム内の要素を集計することによって作成された、新しい単一要素ストリームを返します。この関数は、並列で正しく計算できるように、結合的かつ可換である必要があります。
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) (K, V) ペアの DStream で呼び出されると、各キーの値がスライディングウィンドウ内のバッチに対して指定された reduce 関数 func を使用して集計される、(K, V) ペアの新しい DStream を返します。注意: デフォルトでは、これは Spark のデフォルトの並列タスク数(ローカルモードの場合は 2、クラスターモードの場合は構成プロパティ spark.default.parallelism によって決定される数)を使用してグループ化を行います。オプションの numTasks 引数を渡して、異なるタスク数を設定できます。
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])

上記の reduceByKeyAndWindow() のより効率的なバージョンで、各ウィンドウの reduce 値は、前のウィンドウの reduce 値を使用して増分的に計算されます。これは、スライディングウィンドウに入る新しいデータを reduce し、ウィンドウから出る古いデータを「逆 reduce」することで行われます。例としては、ウィンドウがスライドするときにキーのカウントを「加算」および「減算」することです。ただし、これは「可逆 reduce 関数」、つまり、対応する「逆 reduce」関数(パラメーター invFunc として受け取られる)を持つ reduce 関数にのみ適用可能です。reduceByKeyAndWindow のように、reduce タスクの数はオプションの引数を使用して構成可能です。この操作を使用するには、チェックポインティングを有効にする必要があることに注意してください。

countByValueAndWindow(windowLength, slideInterval, [numTasks]) (K, V) ペアの DStream で呼び出されると、各キーの値がスライディングウィンドウ内の頻度である (K, Long) ペアの新しい DStream を返します。reduceByKeyAndWindow のように、reduce タスクの数はオプションの引数を使用して構成可能です。

結合操作

最後に、Spark Streaming でさまざまな種類の結合をどれほど簡単に行うことができるかを強調しておく価値があります。

ストリーム間の結合

ストリームは、他のストリームと非常に簡単に結合できます。

stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

ここで、各バッチ間隔で、stream1 によって生成された RDD は、stream2 によって生成された RDD と結合されます。leftOuterJoinrightOuterJoinfullOuterJoin も実行できます。さらに、ストリームのウィンドウ上で結合を行うと非常に便利なことがよくあります。これも非常に簡単です。

windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
ストリームとデータセットの結合

これは、DStream.transform 操作を説明するときにすでに示しました。以下は、ウィンドウ化されたストリームをデータセットと結合する別の例です。

dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

実際には、結合するデータセットを動的に変更することもできます。transform に提供される関数は、すべてのバッチ間隔で評価されるため、dataset 参照が指す現在のデータセットが使用されます。

DStream 変換の完全なリストは、API ドキュメントにあります。Scala API については、DStreamPairDStreamFunctions を参照してください。Java API については、JavaDStreamJavaPairDStream を参照してください。Python API については、DStream を参照してください。


DStream の出力操作

出力操作を使用すると、DStream のデータをデータベースやファイルシステムなどの外部システムにプッシュできます。出力操作は、変換されたデータを外部システムが実際に使用できるようにするため、すべての DStream 変換の実際の実行をトリガーします(RDD のアクションと同様)。現在、以下の出力操作が定義されています

出力操作意味
print() ストリーミングアプリケーションを実行しているドライバーノードで、DStream のすべてのデータバッチの最初の 10 個の要素を出力します。これは、開発やデバッグに役立ちます。
Python API これは、Python API では pprint() と呼ばれます。
saveAsTextFiles(prefix, [suffix]) この DStream の内容をテキストファイルとして保存します。各バッチ間隔でのファイル名は、prefixsuffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"
saveAsObjectFiles(prefix, [suffix]) この DStream の内容をシリアル化された Java オブジェクトの SequenceFiles として保存します。各バッチ間隔でのファイル名は、prefixsuffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"
Python API これは Python API では使用できません。
saveAsHadoopFiles(prefix, [suffix]) この DStream の内容を Hadoop ファイルとして保存します。各バッチ間隔でのファイル名は、prefixsuffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"
Python API これは Python API では使用できません。
foreachRDD(func) ストリームから生成された各 RDD に関数 func を適用する、最も汎用的な出力演算子です。この関数は、各 RDD のデータをファイルへの保存、またはネットワーク経由でのデータベースへの書き込みなどの外部システムにプッシュする必要があります。関数 func は、ストリーミングアプリケーションを実行しているドライバープロセスで実行され、通常、ストリーミング RDD の計算を強制する RDD アクションが含まれることに注意してください。

foreachRDD の使用における設計パターン

dstream.foreachRDD は、データを外部システムに送信できる強力なプリミティブです。ただし、このプリミティブを正しく効率的に使用する方法を理解することが重要です。回避すべき一般的な間違いを以下に示します。

多くの場合、外部システムへのデータの書き込みには、接続オブジェクト(リモートサーバーへの TCP 接続など)を作成し、それを使用してリモートシステムにデータを送信する必要があります。この目的のために、開発者は誤って Spark ドライバーで接続オブジェクトを作成し、Spark ワーカーでそれを使用して RDD にレコードを保存しようとする可能性があります。たとえば(Scala で)、

def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)
dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}
dstream.foreachRDD(rdd -> {
  Connection connection = createNewConnection(); // executed at the driver
  rdd.foreach(record -> {
    connection.send(record); // executed at the worker
  });
});

これは、接続オブジェクトをシリアル化してドライバーからワーカーに送信する必要があるため、正しくありません。このような接続オブジェクトは、マシン間で転送できることはほとんどありません。このエラーは、シリアル化エラー(接続オブジェクトがシリアル化できない)、初期化エラー(接続オブジェクトをワーカーで初期化する必要がある)などとして現れる可能性があります。正しい解決策は、ワーカーで接続オブジェクトを作成することです。

ただし、これにより、別のよくある間違いである、レコードごとに新しい接続を作成することにつながる可能性があります。たとえば、

def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    Connection connection = createNewConnection();
    connection.send(record);
    connection.close();
  });
});

通常、接続オブジェクトの作成には時間とリソースのオーバーヘッドがあります。したがって、レコードごとに接続オブジェクトを作成および破棄すると、不必要に高いオーバーヘッドが発生し、システム全体の処理量を大幅に低下させる可能性があります。より良い解決策は、rdd.foreachPartition を使用することです。つまり、単一の接続オブジェクトを作成し、その接続を使用して RDD パーティション内のすべてのレコードを送信します。

def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

これにより、多数のレコードにわたって接続作成のオーバーヘッドが償却されます。

最後に、複数の RDD/バッチ間で接続オブジェクトを再利用することで、これをさらに最適化できます。複数のバッチの RDD が外部システムにプッシュされるときに再利用できる接続オブジェクトの静的プールを維持することができ、オーバーヘッドをさらに削減できます。

def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    // ConnectionPool is a static, lazily initialized pool of connections
    Connection connection = ConnectionPool.getConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
  });
});

プール内の接続は、オンデマンドで遅延作成され、しばらく使用されていない場合はタイムアウトする必要があることに注意してください。これにより、外部システムへのデータの最も効率的な送信が実現されます。

その他の注意点

DataFrame と SQL の操作

ストリーミングデータで DataFrames と SQL 操作を簡単に使用できます。StreamingContext が使用している SparkContext を使用して SparkSession を作成する必要があります。さらに、これはドライバーの障害時に再起動できるように行う必要があります。これは、SparkSession の遅延インスタンス化されたシングルトンインスタンスを作成することによって行われます。次の例にこれを示します。以前の 単語数カウントの例を変更して、DataFrames と SQL を使用して単語数を生成します。各 RDD は DataFrame に変換され、一時テーブルとして登録され、SQL を使用してクエリされます。

# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)

完全な ソースコード を参照してください。

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SparkSession
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Create a temporary view
  wordsDataFrame.createOrReplaceTempView("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

完全な ソースコード を参照してください。

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
  private String word;

  public String getWord() {
    return word;
  }

  public void setWord(String word) {
    this.word = word;
  }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ...

words.foreachRDD((rdd, time) -> {
  // Get the singleton instance of SparkSession
  SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();

  // Convert RDD[String] to RDD[case class] to DataFrame
  JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
    JavaRow record = new JavaRow();
    record.setWord(word);
    return record;
  });
  DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);

  // Creates a temporary view using the DataFrame
  wordsDataFrame.createOrReplaceTempView("words");

  // Do word count on table using SQL and print it
  DataFrame wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word");
  wordCountsDataFrame.show();
});

完全な ソースコード を参照してください。

別のスレッド(つまり、実行中の StreamingContext に対して非同期)からストリーミングデータで定義されたテーブルに対して SQL クエリを実行することもできます。クエリが実行できるように、StreamingContext が十分な量のストリーミングデータを記憶するように設定してください。それ以外の場合、非同期 SQL クエリを認識しない StreamingContext は、クエリが完了する前に古いストリーミングデータを削除します。たとえば、最後のバッチをクエリしたいが、クエリの実行に 5 分かかる場合は、streamingContext.remember(Minutes(5)) (Scala の場合、または他の言語で同等のもの)を呼び出します。

DataFrames の詳細については、DataFrames と SQL ガイドを参照してください。


MLlib の操作

MLlib で提供されている機械学習アルゴリズムを簡単に使用することもできます。まず、ストリーミング機械学習アルゴリズム(例:ストリーミング線形回帰ストリーミング K 平均法など)があり、ストリーミングデータから同時に学習し、ストリーミングデータにモデルを適用できます。これらに加えて、はるかに大規模な機械学習アルゴリズムのクラスについては、オフラインで(つまり、過去のデータを使用して)学習モデルを学習し、ストリーミングデータでオンラインでモデルを適用できます。詳細については、MLlib ガイドを参照してください。


キャッシュ/永続化

RDDと同様に、DStreamも開発者がストリームのデータをメモリに永続化することを許可します。つまり、DStreamでpersist()メソッドを使用すると、そのDStreamのすべてのRDDが自動的にメモリに永続化されます。これは、DStreamのデータが複数回計算される場合(例えば、同じデータに対して複数の操作を行う場合)に便利です。reduceByWindowreduceByKeyAndWindowのようなウィンドウベースの操作や、updateStateByKeyのような状態ベースの操作では、これは暗黙的に当てはまります。したがって、ウィンドウベースの操作によって生成されたDStreamは、開発者がpersist()を呼び出さなくても、自動的にメモリに永続化されます。

ネットワーク経由でデータを受信する入力ストリーム(Kafka、ソケットなど)の場合、デフォルトの永続化レベルは、フォールトトレランスのためにデータを2つのノードに複製するように設定されています。

RDDとは異なり、DStreamのデフォルトの永続化レベルでは、データはメモリにシリアル化された状態で保持されることに注意してください。これについては、パフォーマンスチューニングのセクションでさらに詳しく説明します。異なる永続化レベルに関する詳細については、Sparkプログラミングガイドを参照してください。


チェックポイント

ストリーミングアプリケーションは24時間365日稼働する必要があるため、アプリケーションロジックとは無関係な障害(システム障害、JVMクラッシュなど)に対して回復力がある必要があります。これを可能にするために、Spark Streamingは障害から回復できるように、フォールトトレラントなストレージシステムに十分な情報をチェックポイントする必要があります。チェックポイントされるデータには、2つのタイプがあります。

要約すると、メタデータチェックポイントは主にドライバ障害からの回復に必要ですが、データまたはRDDチェックポイントは、ステートフル変換が使用されている場合は、基本的な機能にも必要です。

チェックポイントを有効にするタイミング

次の要件のいずれかを持つアプリケーションでは、チェックポイントを有効にする必要があります。

前述のステートフル変換を使用しない単純なストリーミングアプリケーションは、チェックポイントを有効にせずに実行できることに注意してください。その場合、ドライバの障害からの回復も部分的になります(受信したが処理されていないデータの一部が失われる可能性があります)。これは多くの場合許容範囲であり、多くの人がこの方法でSpark Streamingアプリケーションを実行しています。非Hadoop環境のサポートは、将来改善される予定です。

チェックポイントを構成する方法

チェックポイントは、チェックポイント情報が保存されるフォールトトレラントで信頼性の高いファイルシステム(例:HDFS、S3など)にディレクトリを設定することで有効にできます。これは、streamingContext.checkpoint(checkpointDirectory)を使用することで行われます。これにより、前述のステートフル変換を使用できるようになります。さらに、アプリケーションをドライバ障害から回復させたい場合は、ストリーミングアプリケーションを次のように動作するように書き直す必要があります。

この動作は、StreamingContext.getOrCreateを使用することで簡単になります。これは次のように使用されます。

# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()

checkpointDirectoryが存在する場合、コンテキストはチェックポイントデータから再作成されます。ディレクトリが存在しない場合(つまり、初めて実行する場合)、関数functionToCreateContextが呼び出されて、新しいコンテキストが作成され、DStreamが設定されます。Pythonの例recoverable_network_wordcount.pyを参照してください。この例では、ネットワークデータの単語カウントをファイルに追加します。

また、StreamingContext.getOrCreate(checkpointDirectory, None)を使用して、チェックポイントデータから明示的にStreamingContextを作成し、計算を開始することもできます。

この動作は、StreamingContext.getOrCreateを使用することで簡単になります。これは次のように使用されます。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(...)   // new context
  val lines = ssc.socketTextStream(...) // create DStreams
  ...
  ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
  ssc
}

// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start()
context.awaitTermination()

checkpointDirectoryが存在する場合、コンテキストはチェックポイントデータから再作成されます。ディレクトリが存在しない場合(つまり、初めて実行する場合)、関数functionToCreateContextが呼び出されて、新しいコンテキストが作成され、DStreamが設定されます。Scalaの例RecoverableNetworkWordCountを参照してください。この例では、ネットワークデータの単語カウントをファイルに追加します。

この動作は、JavaStreamingContext.getOrCreateを使用することで簡単になります。これは次のように使用されます。

// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
  @Override public JavaStreamingContext create() {
    JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
    JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
    ...
    jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
    return jssc;
  }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();

checkpointDirectoryが存在する場合、コンテキストはチェックポイントデータから再作成されます。ディレクトリが存在しない場合(つまり、初めて実行する場合)、関数contextFactoryが呼び出されて、新しいコンテキストが作成され、DStreamが設定されます。Javaの例JavaRecoverableNetworkWordCountを参照してください。この例では、ネットワークデータの単語カウントをファイルに追加します。

getOrCreateを使用することに加えて、ドライバプロセスが障害時に自動的に再起動されるようにする必要もあります。これは、アプリケーションの実行に使用されるデプロイインフラストラクチャによってのみ実行できます。これについては、デプロイメントのセクションでさらに詳しく説明します。

RDDのチェックポイントは、信頼できるストレージへの保存コストがかかることに注意してください。これにより、RDDがチェックポイントされるバッチの処理時間が増加する可能性があります。したがって、チェックポイントの間隔を慎重に設定する必要があります。バッチサイズが小さい場合(たとえば1秒)、すべてのバッチをチェックポイントすると、操作のスループットが大幅に低下する可能性があります。逆に、チェックポイントの間隔が短すぎると、リネージとタスクサイズが増加し、悪影響を及ぼす可能性があります。RDDチェックポイントが必要なステートフル変換の場合、デフォルトの間隔は少なくとも10秒のバッチ間隔の倍数です。これはdstream.checkpoint(checkpointInterval)を使用して設定できます。通常、DStreamの5〜10スライディング間隔のチェックポイント間隔は、試してみるのに適切な設定です。


アキュムレータ、ブロードキャスト変数、およびチェックポイント

アキュムレータブロードキャスト変数は、Spark Streamingのチェックポイントから回復できません。チェックポイントを有効にして、アキュムレータまたはブロードキャスト変数も使用する場合は、アキュムレータブロードキャスト変数の遅延インスタンス化されたシングルトンインスタンスを作成して、ドライバが障害後に再起動されたときに再インスタンス化できるようにする必要があります。これは次の例に示されています。

def getWordExcludeList(sparkContext):
    if ("wordExcludeList" not in globals()):
        globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])
    return globals()["wordExcludeList"]

def getDroppedWordsCounter(sparkContext):
    if ("droppedWordsCounter" not in globals()):
        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
    return globals()["droppedWordsCounter"]

def echo(time, rdd):
    # Get or register the excludeList Broadcast
    excludeList = getWordExcludeList(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use excludeList to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in excludeList.value:
            droppedWordsCounter.add(wordCount[1])
            False
        else:
            True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)

完全なソースコードを参照してください。

object WordExcludeList {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordExcludeList = Seq("a", "b", "c")
          instance = sc.broadcast(wordExcludeList)
        }
      }
    }
    instance
  }
}

object DroppedWordsCounter {

  @volatile private var instance: LongAccumulator = null

  def getInstance(sc: SparkContext): LongAccumulator = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.longAccumulator("DroppedWordsCounter")
        }
      }
    }
    instance
  }
}

wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  // Get or register the excludeList Broadcast
  val excludeList = WordExcludeList.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use excludeList to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (excludeList.value.contains(word)) {
      droppedWordsCounter.add(count)
      false
    } else {
      true
    }
  }.collect().mkString("[", ", ", "]")
  val output = "Counts at time " + time + " " + counts
})

完全なソースコードを参照してください。

class JavaWordExcludeList {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordExcludeList.class) {
        if (instance == null) {
          List<String> wordExcludeList = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordExcludeList);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("DroppedWordsCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the excludeList Broadcast
  Broadcast<List<String>> excludeList = JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use excludeList to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (excludeList.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
}

完全なソースコードを参照してください。


アプリケーションのデプロイ

このセクションでは、Spark Streamingアプリケーションをデプロイする手順について説明します。

要件

Spark Streamingアプリケーションを実行するには、以下が必要です。

アプリケーションコードのアップグレード

実行中の Spark Streaming アプリケーションを新しいアプリケーションコードでアップグレードする必要がある場合、2 つのメカニズムが考えられます。


アプリケーションのモニタリング

Spark の監視機能に加えて、Spark Streaming に固有の追加機能があります。StreamingContext が使用されると、Spark Web UI に追加の Streaming タブが表示され、実行中のレシーバー(レシーバーがアクティブかどうか、受信したレコード数、レシーバーエラーなど)および完了したバッチ(バッチ処理時間、キューイング遅延など)に関する統計情報が表示されます。これは、ストリーミングアプリケーションの進捗状況を監視するために使用できます。

Web UI の次の 2 つのメトリクスは特に重要です

バッチ処理時間が常にバッチ間隔よりも長く、キューイング遅延が増加し続ける場合は、システムがバッチを生成される速度で処理できず、遅れていることを示します。その場合は、バッチ処理時間の削減を検討してください。

Spark Streaming プログラムの進捗状況は、StreamingListener インターフェイスを使用して監視することもできます。これにより、レシーバーのステータスと処理時間を確認できます。これは開発者 API であり、将来的に改善される(つまり、より多くの情報が報告される)可能性があることに注意してください。



パフォーマンスチューニング

クラスターで Spark Streaming アプリケーションを最大限に活用するには、少し調整が必要です。このセクションでは、アプリケーションのパフォーマンスを向上させるために調整できるパラメーターと構成について説明します。大まかに言って、次の 2 つのことを考慮する必要があります。

  1. クラスターリソースを効率的に使用して、各データバッチの処理時間を短縮します。

  2. データのバッチが受信される速度で処理できるように、適切なバッチサイズを設定します(つまり、データ処理がデータ取り込みに追いつくようにします)。

バッチ処理時間の短縮

Spark で実行できる多くの最適化により、各バッチの処理時間を最小限に抑えることができます。これらについては、チューニングガイドで詳しく説明しています。このセクションでは、最も重要なものをいくつか紹介します。

データ受信の並列処理レベル

ネットワーク経由(Kafka、ソケットなど)でデータを受信するには、データをデシリアライズして Spark に格納する必要があります。データ受信がシステムのボトルネックになる場合は、データ受信を並列化することを検討してください。各入力 DStream は、単一のデータストリームを受信する単一のレシーバー(ワーカーマシンで実行)を作成することに注意してください。したがって、複数のデータストリームを受信するには、複数の入力 DStream を作成し、ソースからデータストリームの異なるパーティションを受信するように構成することで実現できます。たとえば、2 つのデータトピックを受信する単一の Kafka 入力 DStream は、それぞれ 1 つのトピックのみを受信する 2 つの Kafka 入力ストリームに分割できます。これにより、2 つのレシーバーが実行され、データが並列で受信されるため、全体的なスループットが向上します。これらの複数の DStream を統合して、単一の DStream を作成できます。次に、単一の入力 DStream に適用されていた変換を、統合されたストリームに適用できます。これは次のように行われます。

numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

考慮する必要があるもう 1 つのパラメーターは、レシーバーのブロック間隔です。これは、設定パラメーター spark.streaming.blockInterval によって決定されます。ほとんどのレシーバーの場合、受信したデータは、Spark のメモリに格納する前に、データのブロックにまとめられます。各バッチ内のブロックの数は、マップのような変換で受信したデータを処理するために使用されるタスクの数を決定します。バッチごとのレシーバーごとのタスク数は、ほぼ (バッチ間隔 / ブロック間隔) になります。たとえば、200 ミリ秒のブロック間隔では、2 秒のバッチごとに 10 個のタスクが作成されます。タスクの数が少なすぎる(つまり、マシンごとのコア数よりも少ない)場合、使用可能なすべてのコアがデータの処理に使用されないため、非効率になります。特定のバッチ間隔のタスク数を増やすには、ブロック間隔を短縮します。ただし、ブロック間隔の推奨される最小値は約 50 ミリ秒であり、これ未満にするとタスク起動のオーバーヘッドが問題になる可能性があります。

複数の入力ストリーム/レシーバーを使用してデータを受信する代わりに、入力データストリームを明示的に再パーティション化する(inputStream.repartition(<パーティション数>) を使用)方法もあります。これにより、受信したデータのバッチが、さらに処理を行う前に、クラスター内の指定された数のマシンに分散されます。

ダイレクトストリームについては、Spark Streaming + Kafka 統合ガイドを参照してください。

データ処理の並列処理レベル

計算の任意段階で使用される並列タスクの数が十分でない場合、クラスターリソースが十分に活用されない可能性があります。たとえば、reduceByKeyreduceByKeyAndWindow などの分散削減操作の場合、並列タスクのデフォルト数は spark.default.parallelism 設定プロパティによって制御されます。並列処理のレベルを引数として渡す(PairDStreamFunctions ドキュメントを参照)、または spark.default.parallelism 設定プロパティを設定してデフォルトを変更できます。

データのシリアル化

データシリアル化のオーバーヘッドは、シリアル化形式を調整することで削減できます。ストリーミングの場合、シリアル化されているデータには 2 種類あります。

どちらの場合も、Kryoシリアライゼーションを使用すると、CPUとメモリの両方のオーバーヘッドを削減できます。詳細については、Sparkチューニングガイドを参照してください。Kryoの場合、カスタムクラスの登録、およびオブジェクト参照の追跡の無効化を検討してください(構成ガイドのKryo関連の構成を参照)。

ストリーミングアプリケーションで保持する必要のあるデータ量が少ない特定のケースでは、過剰なGCオーバーヘッドを発生させることなく、データ(両方のタイプ)をデシリアライズされたオブジェクトとして永続化することが可能になる場合があります。例えば、数秒のバッチ間隔を使用しており、ウィンドウ処理を使用していない場合は、ストレージレベルを明示的に設定することで、永続化されたデータのシリアライゼーションを無効にすることができます。これにより、シリアライゼーションによるCPUオーバーヘッドが削減され、GCオーバーヘッドを過度に増やすことなく、パフォーマンスが向上する可能性があります。

タスク起動のオーバーヘッド

1秒あたりに起動されるタスクの数が多い場合(例えば、1秒あたり50以上)、エグゼキュータにタスクを送信するオーバーヘッドが大きくなり、1秒未満のレイテンシを達成することが難しくなる可能性があります。このオーバーヘッドは、次の変更によって削減できます。

これらの変更により、バッチ処理時間が数百ミリ秒短縮され、1秒未満のバッチサイズが実現可能になります。


適切なバッチ間隔の設定

クラスター上で実行されているSpark Streamingアプリケーションが安定するためには、システムは受信したデータを高速で処理できる必要があります。言い換えれば、データのバッチは生成されるのと同じ速さで処理される必要があります。これがアプリケーションに当てはまるかどうかは、ストリーミングWeb UIで処理時間を監視することで確認できます。ここで、バッチ処理時間はバッチ間隔よりも短くする必要があります。

ストリーミング計算の性質によっては、使用されるバッチ間隔が、固定されたクラスターリソース上でアプリケーションが維持できるデータレートに大きな影響を与える可能性があります。例えば、以前のWordCountNetworkの例を考えてみましょう。特定のデータレートの場合、システムは2秒ごとに単語数を報告することができます(つまり、バッチ間隔は2秒)が、500ミリ秒ごとにはできません。したがって、バッチ間隔は、本番環境で予想されるデータレートを維持できるように設定する必要があります。

アプリケーションに適したバッチサイズを見つけるための適切なアプローチは、控えめなバッチ間隔(例えば、5〜10秒)と低いデータレートでテストすることです。システムがデータレートに追いつくことができるかどうかを確認するには、処理された各バッチで発生するエンドツーエンドの遅延の値を確認できます(Sparkドライバーのlog4jログで「合計遅延」を探すか、StreamingListenerインターフェイスを使用します)。遅延がバッチサイズと同程度に維持されている場合、システムは安定しています。それ以外の場合、遅延が継続的に増加している場合は、システムが追いつくことができず、したがって不安定であることを意味します。安定した構成が分かったら、データレートを増やすか、バッチサイズを減らすことを試みることができます。一時的なデータレートの増加による遅延の一時的な増加は、遅延が低い値(つまり、バッチサイズ未満)に戻る限り、問題ないことに注意してください。


メモリチューニング

Sparkアプリケーションのメモリ使用量とGCの動作のチューニングについては、チューニングガイドで詳細に説明されています。それを読むことを強くお勧めします。このセクションでは、特にSpark Streamingアプリケーションのコンテキストにおけるいくつかのチューニングパラメータについて説明します。

Spark Streamingアプリケーションに必要なクラスターメモリの量は、使用される変換の種類に大きく依存します。例えば、過去10分間のデータのウィンドウ処理を使用したい場合は、クラスターは10分間のデータをメモリに保持するのに十分なメモリを持っている必要があります。または、多数のキーを持つupdateStateByKeyを使用したい場合は、必要なメモリが大きくなります。逆に、単純なmap-filter-store操作を実行したい場合は、必要なメモリは少なくなります。

一般に、レシーバーを介して受信したデータはStorageLevel.MEMORY_AND_DISK_SER_2で保存されるため、メモリに収まらないデータはディスクにスピルオーバーします。これにより、ストリーミングアプリケーションのパフォーマンスが低下する可能性があるため、ストリーミングアプリケーションに必要な十分なメモリを提供することをお勧めします。小規模でメモリ使用量を試して、それに応じて見積もるのが最善です。

メモリチューニングのもう1つの側面は、ガベージコレクションです。低レイテンシを必要とするストリーミングアプリケーションの場合、JVMガベージコレクションによって引き起こされる大きな一時停止は望ましくありません。

メモリ使用量とGCオーバーヘッドを調整するのに役立ついくつかのパラメータがあります。


覚えておくべき重要なポイント


フォールトトレランスのセマンティクス

このセクションでは、障害が発生した場合のSpark Streamingアプリケーションの動作について説明します。

背景

Spark Streamingによって提供されるセマンティクスを理解するために、SparkのRDDの基本的なフォールトトレランスセマンティクスを思い出しましょう。

  1. RDDは、不変で、決定論的に再計算可能な分散データセットです。各RDDは、フォールトトレラントな入力データセット上で使用され、それを作成した決定論的な操作の系列を記憶しています。
  2. RDDのパーティションがワーカーノードの障害によって失われた場合、そのパーティションは、操作の系統(リネージ)を使用して、元のフォールトトレラントなデータセットから再計算できます。
  3. RDDのすべての変換が決定論的であると仮定すると、最終的な変換されたRDDのデータは、Sparkクラスタ内の障害に関係なく、常に同じになります。

Sparkは、HDFSやS3のようなフォールトトレラントなファイルシステム上のデータを操作します。したがって、フォールトトレラントなデータから生成されたすべてのRDDもフォールトトレラントです。ただし、Spark Streamingの場合は、ほとんどの場合、データがネットワーク経由で受信されるため(fileStreamが使用されている場合を除く)、そうではありません。生成されたすべてのRDDに対して同じフォールトトレランスプロパティを実現するために、受信データはクラスタ内のワーカーノードの複数のSparkエグゼキュータ間で複製されます(デフォルトの複製係数は2)。これにより、障害が発生した場合に復旧する必要がある2種類のデータがシステム内に存在することになります。

  1. 受信および複製されたデータ - このデータは、他のノードのいずれかにコピーが存在するため、単一のワーカーノードの障害を生き残ります。
  2. 受信されたが複製のためにバッファリングされたデータ - これは複製されていないため、このデータを復旧する唯一の方法は、ソースから再度取得することです。

さらに、考慮すべき2種類の障害があります。

  1. ワーカーノードの障害 - エグゼキュータを実行しているワーカーノードのいずれかが障害を起こす可能性があり、それらのノード上のすべてのインメモリデータが失われます。レシーバーが障害が発生したノードで実行されていた場合、それらのバッファリングされたデータは失われます。
  2. ドライバノードの障害 - Spark Streamingアプリケーションを実行しているドライバノードが障害を起こした場合、明らかにSparkContextが失われ、すべてのエグゼキュータとそのインメモリデータが失われます。

この基本的な知識を踏まえて、Spark Streamingのフォールトトレランスのセマンティクスを理解しましょう。

定義

ストリーミングシステムのセマンティクスは、多くの場合、システムが各レコードを処理できる回数で捉えられます。システムが、あらゆる可能な動作条件(障害などにもかかわらず)の下で提供できる保証には、3つのタイプがあります。

  1. 最大1回:各レコードは1回処理されるか、まったく処理されません。
  2. 少なくとも1回:各レコードは1回以上処理されます。これは、データが失われないことを保証するため、最大1回よりも強力です。ただし、重複がある可能性があります。
  3. 正確に1回:各レコードは正確に1回処理されます - データが失われることはなく、データが複数回処理されることもありません。これは明らかに3つの中で最も強力な保証です。

基本的なセマンティクス

ストリーム処理システムでは、大まかに言って、データを処理する際に3つのステップがあります。

  1. データを受信する:データはレシーバーまたはその他の方法を使用してソースから受信されます。

  2. データを変換する:受信したデータは、DStreamおよびRDD変換を使用して変換されます。

  3. データをプッシュする:最終的に変換されたデータは、ファイルシステム、データベース、ダッシュボードなどの外部システムにプッシュされます。

ストリーミングアプリケーションがエンドツーエンドの正確に1回の保証を達成する必要がある場合、各ステップは正確に1回の保証を提供する必要があります。つまり、各レコードは正確に1回受信され、正確に1回変換され、ダウンストリームシステムに正確に1回プッシュされる必要があります。Spark Streamingのコンテキストにおけるこれらのステップのセマンティクスを理解しましょう。

  1. データを受信する:さまざまな入力ソースは、異なる保証を提供します。これについては、次のサブセクションで詳しく説明します。

  2. データを変換する:RDDが提供する保証のおかげで、受信したすべてのデータは正確に1回処理されます。障害が発生した場合でも、受信した入力データにアクセスできる限り、最終的に変換されたRDDは常に同じ内容になります。

  3. データをプッシュする:出力操作は、デフォルトで少なくとも1回のセマンティクスを保証します。これは、出力操作のタイプ(べき等であるか否か)とダウンストリームシステムのセマンティクス(トランザクションをサポートするか否か)に依存するためです。ただし、ユーザーは、正確に1回のセマンティクスを実現するために、独自のトランザクションメカニズムを実装できます。これについては、セクションの後半で詳しく説明します。

受信データのセマンティクス

さまざまな入力ソースは、少なくとも1回から正確に1回までのさまざまな保証を提供します。詳細については、お読みください。

ファイルを使用する場合

すべての入力データがHDFSのようなフォールトトレラントなファイルシステムにすでに存在する場合、Spark Streamingは障害から常に回復し、すべてのデータを処理できます。これにより、正確に1回のセマンティクスが得られます。つまり、何が障害を起こしても、すべてのデータが正確に1回処理されます。

レシーバーベースのソースを使用する場合

レシーバーに基づく入力ソースの場合、フォールトトレランスのセマンティクスは、障害シナリオとレシーバーのタイプの両方に依存します。前に説明したように、レシーバーには2つのタイプがあります

  1. 信頼できるレシーバー - これらのレシーバーは、受信したデータが複製されたことを確認した後にのみ、信頼できるソースを認識します。このようなレシーバーが障害を起こした場合、ソースはバッファリングされた(複製されていない)データに対して確認応答を受け取りません。したがって、レシーバーが再起動されると、ソースはデータを再送信し、障害によってデータが失われることはありません。
  2. 信頼できないレシーバー - このようなレシーバーは確認応答を送信しないため、ワーカーまたはドライバの障害により障害が発生した場合にデータを失う可能性があります。

使用するレシーバーのタイプに応じて、次のセマンティクスを実現します。ワーカーノードが障害を起こした場合、信頼できるレシーバーではデータ損失はありません。信頼できないレシーバーでは、受信されたが複製されていないデータが失われる可能性があります。ドライバノードが障害を起こした場合、これらの損失に加えて、メモリ内で受信および複製された過去のすべてのデータが失われます。これは、状態を持つ変換の結果に影響を与えます。

この過去に受信したデータの損失を回避するために、Spark 1.2では、受信したデータをフォールトトレラントなストレージに保存する書き込み先行ログが導入されました。書き込み先行ログが有効で、信頼できるレシーバーを使用すると、データ損失はゼロになります。セマンティクスの観点から、少なくとも1回の保証を提供します。

次の表に、障害発生時のセマンティクスをまとめます。

デプロイメントシナリオ ワーカーの障害 ドライバの障害
Spark 1.1以前、または
書き込み先行ログなしのSpark 1.2以降
信頼できないレシーバーではバッファリングされたデータが失われる
信頼できるレシーバーではデータ損失なし
少なくとも1回のセマンティクス
信頼できないレシーバーではバッファリングされたデータが失われる
すべてのレシーバーで過去のデータが失われる
未定義のセマンティクス
書き込み先行ログ付きのSpark 1.2以降 信頼できるレシーバーではデータ損失なし
少なくとも1回のセマンティクス
信頼できるレシーバーとファイルではデータ損失なし
少なくとも1回のセマンティクス

Kafka Direct APIを使用する場合

Spark 1.3では、新しいKafka Direct APIを導入しました。これにより、すべてのKafkaデータがSpark Streamingによって正確に1回受信されることが保証されます。これに加えて、正確に1回の出力操作を実装すると、エンドツーエンドの正確に1回の保証を達成できます。このアプローチについては、Kafka統合ガイドでさらに説明します。

出力操作のセマンティクス

出力操作(foreachRDDなど)には、少なくとも1回のセマンティクスがあります。つまり、ワーカーの障害が発生した場合、変換されたデータが外部エンティティに複数回書き込まれる可能性があります。これは、saveAs***Files操作を使用してファイルシステムに保存する場合は許容されます(ファイルは単に同じデータで上書きされるため)が、正確に1回のセマンティクスを実現するには、追加の努力が必要になる場合があります。2つのアプローチがあります。



今後のステップ