Spark Streaming プログラミングガイド
注意
Spark Streaming は、Spark のストリーミングエンジンの前の世代のものです。Spark Streaming のアップデートはもうなく、レガシープロジェクトです。Spark には、構造化ストリーミングという、より新しく、使いやすいストリーミングエンジンがあります。ストリーミングアプリケーションとパイプラインには、Spark 構造化ストリーミングを使用する必要があります。構造化ストリーミングプログラミングガイドを参照してください。
概要
Spark Streaming は、コア Spark API の拡張であり、ライブデータストリームのスケーラブルで高スループット、フォールトトレラントなストリーム処理を可能にします。データは、Kafka、Kinesis、TCP ソケットなど、多くのソースから取り込むことができ、map
、reduce
、join
、window
のような高レベルの関数で表現された複雑なアルゴリズムを使用して処理できます。最後に、処理されたデータは、ファイルシステム、データベース、およびライブダッシュボードにプッシュできます。実際、Spark の機械学習およびグラフ処理アルゴリズムをデータストリームに適用できます。
内部的には、次のように動作します。Spark Streaming はライブ入力データストリームを受信し、データをバッチに分割します。その後、Spark エンジンがバッチを処理して、バッチで結果の最終ストリームを生成します。
Spark Streaming は、離散化ストリームまたはDStreamと呼ばれる高レベルの抽象化を提供します。これは、データの連続ストリームを表します。DStream は、Kafka や Kinesis などのソースからの入力データストリームから作成するか、他の DStream で高レベルの操作を適用することによって作成できます。内部的には、DStream はRDDのシーケンスとして表されます。
このガイドでは、DStream を使用して Spark Streaming プログラムの記述を開始する方法を示します。Spark Streaming プログラムは Scala、Java、または Python (Spark 1.2 で導入) で記述できます。これらはすべてこのガイドで紹介されています。このガイド全体で、さまざまな言語のコードスニペットを選択できるタブが表示されます。
注意: Python では、いくつかの API が異なっているか、利用できないものがあります。このガイド全体で、これらの違いを強調するPython APIタグが表示されます。
簡単な例
独自の Spark Streaming プログラムの記述方法の詳細に入る前に、簡単な Spark Streaming プログラムがどのようなものかを見てみましょう。TCP ソケットでリッスンしているデータサーバーから受信したテキストデータの単語数をカウントするとします。必要なのは次のとおりです。
まず、すべてのストリーミング機能のメインエントリポイントであるStreamingContextをインポートします。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a local StreamingContext with two working thread and batch interval of 1 second
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
このコンテキストを使用して、ホスト名(例:localhost
)とポート(例:9999
)として指定された TCP ソースからのストリーミングデータを表す DStream を作成できます。
# Create a DStream that will connect to hostname:port, like localhost:9999
lines = ssc.socketTextStream("localhost", 9999)
この lines
DStream は、データサーバーから受信されるデータのストリームを表します。この DStream の各レコードはテキストの行です。次に、行をスペースで単語に分割します。
# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))
flatMap
は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する 1 対多の DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words
DStream として表されます。次に、これらの単語をカウントします。
# Count each word in each batch
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)
# Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.pprint()
words
DStream は、さらに (word, 1)
ペアの DStream にマップされ (1 対 1 の変換)、その後、各データのバッチ内の単語の頻度を取得するために縮小されます。最後に、wordCounts.pprint()
は、毎秒生成されたカウントの一部を出力します。
これらの行が実行されると、Spark Streaming は、起動時に実行する計算のみを設定し、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に次を呼び出します。
ssc.start() # Start the computation
ssc.awaitTermination() # Wait for the computation to terminate
完全なコードは、Spark Streaming の例であるNetworkWordCountにあります。
まず、Spark Streaming クラスの名前と、StreamingContext から環境へのいくつかの暗黙的な変換をインポートして、他の必要なクラス (DStream など) に便利なメソッドを追加します。StreamingContextは、すべてのストリーミング機能のメインエントリポイントです。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
このコンテキストを使用して、ホスト名(例:localhost
)とポート(例:9999
)として指定された TCP ソースからのストリーミングデータを表す DStream を作成できます。
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
この lines
DStream は、データサーバーから受信されるデータのストリームを表します。この DStream の各レコードはテキストの行です。次に、行をスペース文字で単語に分割します。
// Split each line into words
val words = lines.flatMap(_.split(" "))
flatMap
は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する 1 対多の DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words
DStream として表されます。次に、これらの単語をカウントします。
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
words
DStream は、さらに (word, 1)
ペアの DStream にマップされ (1 対 1 の変換)、その後、各データのバッチ内の単語の頻度を取得するために縮小されます。最後に、wordCounts.print()
は、毎秒生成されたカウントの一部を出力します。
これらの行が実行されると、Spark Streaming は、起動時に実行する計算のみを設定し、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に次を呼び出します。
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
完全なコードは、Spark Streaming の例であるNetworkWordCountにあります。
まず、すべてのストリーミング機能のメインエントリポイントであるJavaStreamingContextオブジェクトを作成します。2 つの実行スレッドと 1 秒のバッチ間隔を持つローカル StreamingContext を作成します。
import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
このコンテキストを使用して、ホスト名(例:localhost
)とポート(例:9999
)として指定された TCP ソースからのストリーミングデータを表す DStream を作成できます。
// Create a DStream that will connect to hostname:port, like localhost:9999
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
この lines
DStream は、データサーバーから受信されるデータのストリームを表します。このストリームの各レコードはテキストの行です。次に、行をスペースで単語に分割します。
// Split each line into words
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
flatMap
は、ソース DStream の各レコードから複数の新しいレコードを生成することにより、新しい DStream を作成する DStream 操作です。この場合、各行は複数の単語に分割され、単語のストリームは words
DStream として表されます。FlatMapFunction オブジェクトを使用して変換を定義したことに注意してください。今後明らかにするように、Java API には DStream 変換の定義に役立つこのような便利なクラスが多数あります。
次に、これらの単語をカウントします。
// Count each word in each batch
JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print();
words
DStream は、PairFunction オブジェクトを使用して、さらに (word, 1)
ペアの DStream にマップされます (1 対 1 の変換)。次に、Function2 オブジェクトを使用して、各データのバッチ内の単語の頻度を取得するために削減されます。最後に、wordCounts.print()
は、毎秒生成されたカウントの一部を出力します。
これらの行が実行されると、Spark Streaming は、起動後に実行する計算のみを設定し、実際の処理はまだ開始されていないことに注意してください。すべての変換が設定された後に処理を開始するには、最後に start
メソッドを呼び出します。
jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate
完全なコードは、Spark Streaming の例であるJavaNetworkWordCountにあります。
すでにダウンロードしてビルドした場合、この例を次のように実行できます。最初に、次を使用して、データサーバーとして Netcat (ほとんどの Unix ライクなシステムで見られる小さなユーティリティ) を実行する必要があります。
$ nc -lk 9999
次に、別のターミナルで、次を使用して例を起動できます。
$ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999
$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
その後、netcat サーバーを実行しているターミナルで入力されたすべての行がカウントされ、毎秒画面に出力されます。次のようになります。
|
|
基本的な概念
次に、簡単な例を超えて、Spark Streamingの基本について詳しく説明します。
リンク
Sparkと同様に、Spark StreamingはMaven Centralを通じて利用できます。独自のSpark Streamingプログラムを作成するには、SBTまたはMavenプロジェクトに次の依存関係を追加する必要があります。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.5.1</version>
<scope>provided</scope>
</dependency>
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.5.1" % "provided"
Spark StreamingコアAPIにはないKafkaやKinesisなどのソースからデータを取り込むには、対応するアーティファクト spark-streaming-xyz_2.12
を依存関係に追加する必要があります。例えば、一般的なものは次のとおりです。
ソース | アーティファクト |
---|---|
Kafka | spark-streaming-kafka-0-10_2.12 |
Kinesis | spark-streaming-kinesis-asl_2.12 [Amazon Software License] |
最新のリストについては、サポートされているソースとアーティファクトの完全なリストについて、Mavenリポジトリを参照してください。
StreamingContext の初期化
Spark Streamingプログラムを初期化するには、すべてのSpark Streaming機能のメインエントリポイントであるStreamingContextオブジェクトを作成する必要があります。
StreamingContextオブジェクトは、SparkContextオブジェクトから作成できます。
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
appName
パラメータは、クラスタUIに表示するアプリケーションの名前です。master
は、Spark、Mesos、またはYARNクラスタのURL、またはローカルモードで実行するための特別な“local[*]”文字列です。実際には、クラスタで実行する場合、プログラムにmaster
をハードコードするのではなく、spark-submit
でアプリケーションを起動してそこで受け取るようにします。ただし、ローカルテストやユニットテストでは、“local[*]”を渡してSpark Streamingをインプロセスで実行できます(ローカルシステムのコア数を検出します)。
バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスタリソースに基づいて設定する必要があります。詳細については、「パフォーマンスチューニング」セクションを参照してください。
StreamingContextオブジェクトは、SparkConfオブジェクトから作成できます。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName
パラメータは、クラスタUIに表示するアプリケーションの名前です。master
は、Spark、Mesos、Kubernetes、またはYARNクラスタのURL、またはローカルモードで実行するための特別な“local[*]”文字列です。実際には、クラスタで実行する場合、プログラムにmaster
をハードコードするのではなく、spark-submit
でアプリケーションを起動してそこで受け取るようにします。ただし、ローカルテストやユニットテストでは、“local[*]”を渡してSpark Streamingをインプロセスで実行できます(ローカルシステムのコア数を検出します)。これは内部的にSparkContext(すべてのSpark機能の開始点)を作成し、ssc.sparkContext
としてアクセスできることに注意してください。
バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスタリソースに基づいて設定する必要があります。詳細については、「パフォーマンスチューニング」セクションを参照してください。
StreamingContext
オブジェクトは、既存のSparkContext
オブジェクトから作成することもできます。
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
JavaStreamingContextオブジェクトは、SparkConfオブジェクトから作成できます。
import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;
SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
appName
パラメータは、クラスタUIに表示するアプリケーションの名前です。master
は、Spark、Mesos、またはYARNクラスタのURL、またはローカルモードで実行するための特別な“local[*]”文字列です。実際には、クラスタで実行する場合、プログラムにmaster
をハードコードするのではなく、spark-submit
でアプリケーションを起動してそこで受け取るようにします。ただし、ローカルテストやユニットテストでは、“local[*]”を渡してSpark Streamingをインプロセスで実行できます。これは内部的にJavaSparkContext(すべてのSpark機能の開始点)を作成し、ssc.sparkContext
としてアクセスできることに注意してください。
バッチ間隔は、アプリケーションのレイテンシ要件と利用可能なクラスタリソースに基づいて設定する必要があります。詳細については、「パフォーマンスチューニング」セクションを参照してください。
JavaStreamingContext
オブジェクトは、既存のJavaSparkContext
から作成することもできます。
import org.apache.spark.streaming.api.java.*;
JavaSparkContext sc = ... //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
コンテキストが定義されたら、次のことを行う必要があります。
- 入力DStreamを作成して入力ソースを定義します。
- DStreamに変換および出力操作を適用して、ストリーミング計算を定義します。
streamingContext.start()
を使用して、データの受信と処理を開始します。streamingContext.awaitTermination()
を使用して、処理が停止される(手動またはエラーによる)のを待ちます。- 処理は、
streamingContext.stop()
を使用して手動で停止できます。
覚えておくべきポイント
- コンテキストが開始されると、新しいストリーミング計算を設定または追加することはできません。
- コンテキストが停止すると、再起動できません。
- 同時に、1つのJVMでアクティブにできるStreamingContextは1つだけです。
- StreamingContextで
stop()
を実行すると、SparkContextも停止します。StreamingContextのみを停止するには、stop()
のオプションパラメータstopSparkContext
をfalseに設定します。 - 前のStreamingContextが(SparkContextを停止せずに)停止されてから次のStreamingContextが作成される限り、SparkContextを再利用して複数のStreamingContextを作成できます。
離散化ストリーム (DStream)
Discretized StreamまたはDStreamは、Spark Streamingによって提供される基本的な抽象化です。これは、ソースから受信した入力データストリーム、または入力ストリームを変換して生成された処理済みデータストリームの連続ストリームを表します。内部的には、DStreamは、不変の分散データセットのSparkの抽象化である一連の連続したRDDによって表されます(詳細については、Sparkプログラミングガイドを参照してください)。DStream内の各RDDには、次の図に示すように、特定の期間からのデータが含まれています。
DStreamに適用された操作は、基になるRDDに対する操作に変換されます。たとえば、前の例の行のストリームを単語に変換する場合、flatMap
操作は、lines
DStreamの各RDDに適用され、words
DStreamのRDDを生成します。これを次の図に示します。
これらの基になるRDD変換は、Sparkエンジンによって計算されます。DStream操作はこれらの詳細のほとんどを隠し、開発者に便利な高レベルのAPIを提供します。これらの操作については、後のセクションで詳しく説明します。
入力 DStream とレシーバー
入力DStreamは、ストリーミングソースから受信した入力データのストリームを表すDStreamです。簡単な例では、lines
は、netcatサーバーから受信したデータストリームを表していたため、入力DStreamでした。すべての入力DStream(このセクションで後述するファイルストリームを除く)は、ソースからデータを受信し、処理のためにSparkのメモリに保存するReceiver(Scala doc、Java doc)オブジェクトに関連付けられています。
Spark Streamingは、2つのカテゴリの組み込みストリーミングソースを提供します。
- 基本ソース:StreamingContext APIで直接利用可能なソース。例:ファイルシステム、ソケット接続。
- 高度なソース:Kafka、Kinesisなどのソースは、追加のユーティリティクラスを通じて利用できます。これらには、リンクセクションで説明したように、追加の依存関係へのリンクが必要です。
このセクションの後半で、各カテゴリに存在するいくつかのソースについて説明します。
ストリーミングアプリケーションで複数のデータストリームを並行して受信する場合は、複数の入力DStreamを作成できます(詳細については、「データ受信の並列処理レベル」セクションを参照してください)。これにより、複数のデータストリームを同時に受信する複数のレシーバーが作成されます。ただし、Sparkワーカー/エグゼキューターは長時間実行されるタスクであるため、Spark Streamingアプリケーションに割り当てられたコアの1つを占有することに注意してください。したがって、Spark Streamingアプリケーションには、受信したデータを処理し、レシーバーを実行するのに十分なコア(またはローカルで実行する場合はスレッド)が割り当てられている必要があることに注意することが重要です。
覚えておくべきポイント
-
Spark Streamingプログラムをローカルで実行する場合は、マスターURLとして“local”または“local[1]”を使用しないでください。どちらも、タスクをローカルで実行するために1つのスレッドのみが使用されることを意味します。レシーバー(ソケット、Kafkaなど)に基づく入力DStreamを使用している場合、単一のスレッドはレシーバーの実行に使用されるため、受信したデータを処理するためのスレッドは残りません。したがって、ローカルで実行する場合は、マスターURLとして常に“local[n]”を使用します。ここで、nは実行するレシーバーの数よりも大きくなります(マスターの設定方法については、Sparkプロパティを参照してください)。
-
クラスタでの実行にロジックを拡張すると、Spark Streamingアプリケーションに割り当てられたコアの数は、レシーバーの数よりも多くする必要があります。そうしないと、システムはデータを受信しますが、それを処理できません。
基本ソース
TCPソケット接続で受信したテキストデータからDStreamを作成する、簡単な例のssc.socketTextStream(...)
についてはすでに見てきました。ソケットに加えて、StreamingContext APIは、ファイルを入力ソースとしてDStreamを作成するためのメソッドを提供します。
ファイルストリーム
HDFS APIと互換性のあるファイルシステム(つまり、HDFS、S3、NFSなど)上のファイルからデータを読み取る場合、StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]
を介してDStreamを作成できます。
ファイルストリームはレシーバーの実行を必要としないため、ファイルデータを受信するためにコアを割り当てる必要はありません。
単純なテキストファイルの場合、最も簡単な方法はStreamingContext.textFileStream(dataDirectory)
です。
fileStream
はPython APIでは利用できません。textFileStream
のみが利用可能です。
streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
テキストファイルの場合
streamingContext.textFileStream(dataDirectory)
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
テキストファイルの場合
streamingContext.textFileStream(dataDirectory);
ディレクトリの監視方法
Spark Streamingは、ディレクトリdataDirectory
を監視し、そのディレクトリに作成されたファイルを処理します。
"hdfs://namenode:8040/logs/"
などの単純なディレクトリを監視できます。このようなパスの直下にあるすべてのファイルは、検出されると処理されます。- POSIX globパターンは、
"hdfs://namenode:8040/logs/2017/*"
のように指定できます。ここで、DStreamはパターンに一致するディレクトリ内のすべてのファイルで構成されます。つまり、ディレクトリ内のファイルではなく、ディレクトリのパターンです。 - すべてのファイルは同じデータ形式である必要があります。
- ファイルは、作成時間ではなく、変更時間に基づいて時間枠の一部と見なされます。
- 処理されると、現在のウィンドウ内のファイルへの変更は、ファイルが再読み込みされる原因にはなりません。つまり、更新は無視されます。
- ディレクトリ内のファイルが多いほど、ファイルが変更されていない場合でも、変更をスキャンするのに時間がかかります。
- ワイルドカードを使用してディレクトリを識別する場合、たとえば
"hdfs://namenode:8040/logs/2016-*"
のように指定すると、パスに一致するようにディレクトリ全体の名前を変更することで、監視対象ディレクトリのリストにそのディレクトリが追加されます。ストリームに含まれるのは、そのディレクトリ内のファイルのうち、変更時刻が現在のウィンドウ内にあるファイルのみです。 FileSystem.setTimes()
を呼び出してタイムスタンプを修正することは、ファイルの内容が変更されていなくても、後のウィンドウでファイルが選択されるようにする方法です。
データソースとしてオブジェクトストアを使用する
HDFSのような「フル」ファイルシステムでは、出力ストリームが作成されるとすぐにファイルに変更時刻が設定される傾向があります。ファイルが開かれると、データが完全に書き込まれる前であっても、DStream
に含まれる可能性があり、その後、同じウィンドウ内でのファイルへの更新は無視されます。つまり、変更が見逃されたり、データがストリームから省略されたりする可能性があります。
変更がウィンドウ内で確実に選択されるようにするには、ファイルを監視対象外のディレクトリに書き込み、出力ストリームが閉じられた直後に、宛先ディレクトリに名前を変更します。名前が変更されたファイルが、作成ウィンドウ中にスキャンされた宛先ディレクトリに表示されれば、新しいデータが選択されます。
対照的に、Amazon S3やAzure Storageなどのオブジェクトストアは、データが実際にコピーされるため、名前変更操作が遅いのが通常です。さらに、名前が変更されたオブジェクトの変更時刻はrename()
操作の時刻になる可能性があるため、元の作成時刻が示唆するウィンドウの一部とは見なされない場合があります。
ターゲットのオブジェクトストアに対して、ストアのタイムスタンプ動作がSpark Streamingで想定される動作と一貫していることを確認するために、慎重なテストが必要です。選択したオブジェクトストアを介してデータをストリーミングする場合、宛先ディレクトリに直接書き込むのが適切な戦略である可能性があります。
このトピックの詳細については、Hadoopファイルシステム仕様を参照してください。
カスタムレシーバーに基づくストリーム
DStreamは、カスタムレシーバーを介して受信したデータストリームで作成できます。詳細については、カスタムレシーバーガイドを参照してください。
ストリームとしてのRDDのキュー
テストデータを使用してSpark Streamingアプリケーションをテストする場合、streamingContext.queueStream(queueOfRDDs)
を使用して、RDDのキューに基づいてDStreamを作成することもできます。キューにプッシュされた各RDDは、DStream内のデータのバッチとして扱われ、ストリームのように処理されます。
ソケットとファイルからのストリームの詳細については、Scalaの場合はStreamingContext、Javaの場合はJavaStreamingContext、Pythonの場合はStreamingContextの関連関数のAPIドキュメントを参照してください。
高度なソース
Python API Spark 3.5.1の時点で、これらのソースのうち、KafkaとKinesisがPython APIで利用可能です。
このカテゴリのソースでは、外部の非Sparkライブラリとのインターフェイスが必要であり、一部には複雑な依存関係(例:Kafka)があります。したがって、依存関係のバージョン競合に関連する問題を最小限に抑えるために、これらのソースからDStreamを作成する機能は、必要に応じて明示的にリンクできる別のライブラリに移動されました。
これらの高度なソースはSparkシェルでは利用できないため、これらの高度なソースに基づくアプリケーションはシェルでテストできないことに注意してください。Sparkシェルでどうしても使用したい場合は、対応するMavenアーティファクトのJARとその依存関係をダウンロードして、クラスパスに追加する必要があります。
これらの高度なソースの一部を次に示します。
-
Kafka: Spark Streaming 3.5.1は、Kafkaブローカーバージョン0.10以降と互換性があります。詳細については、Kafka統合ガイドを参照してください。
-
Kinesis: Spark Streaming 3.5.1は、Kinesisクライアントライブラリ1.2.1と互換性があります。詳細については、Kinesis統合ガイドを参照してください。
カスタムソース
Python API これはPythonではまだサポートされていません。
カスタムデータソースから入力DStreamを作成することもできます。必要なのは、カスタムソースからデータを受信してSparkにプッシュできるユーザー定義のレシーバー(それが何かを理解するには次のセクションを参照)を実装することだけです。詳細については、カスタムレシーバーガイドを参照してください。
レシーバーの信頼性
信頼性に基づいて、2種類のデータソースがあります。(Kafkaのような)ソースでは、転送されたデータが確認応答されるようにすることができます。これらの信頼できるソースからデータを受信するシステムが、受信したデータを正しく確認応答する場合、いかなる障害によってもデータが失われないようにすることができます。これにより、2種類のレシーバーが生まれます。
- 信頼できるレシーバー - 信頼できるレシーバーは、データが受信され、複製とともにSparkに格納されたときに、信頼できるソースに正しく確認応答を送信します。
- 信頼できないレシーバー - 信頼できないレシーバーは、ソースに確認応答を送信しません。これは、確認応答をサポートしないソース、または確認応答の複雑さを望まない場合、または必要としない場合に、信頼できるソースに使用できます。
信頼できるレシーバーを記述する方法の詳細は、カスタムレシーバーガイドで説明されています。
DStream の変換
RDDと同様に、変換により、入力DStreamからのデータを変更できます。DStreamは、通常のSpark RDDで使用できる変換の多くをサポートします。一般的なものを次に示します。
変換 | 意味 |
---|---|
map(func) | ソースDStreamの各要素を関数funcに通すことで、新しいDStreamを返します。 |
flatMap(func) | mapに似ていますが、各入力項目を0個以上の出力項目にマッピングできます。 |
filter(func) | funcがtrueを返すソースDStreamのレコードのみを選択して、新しいDStreamを返します。 |
repartition(numPartitions) | より多くのパーティションを作成するか、より少ないパーティションを作成することにより、このDStreamの並列処理のレベルを変更します。 |
union(otherStream) | ソースDStreamとotherDStreamの要素の和集合を含む新しいDStreamを返します。 |
count() | ソースDStreamの各RDD内の要素数をカウントすることにより、単一要素のRDDの新しいDStreamを返します。 |
reduce(func) | 関数func(2つの引数を取り、1つを返す)を使用して、ソースDStreamの各RDD内の要素を集計することにより、単一要素のRDDの新しいDStreamを返します。関数は並行して計算できるように、結合的かつ可換である必要があります。 |
countByValue() | 型Kの要素のDStreamで呼び出されると、各キーの値がソースDStreamの各RDD内の頻度である(K、Long)ペアの新しいDStreamを返します。 |
reduceByKey(func, [numTasks]) | (K, V)ペアのDStreamで呼び出されると、指定されたreduce関数を使用して各キーの値が集計された(K, V)ペアの新しいDStreamを返します。注:デフォルトでは、これはSparkのデフォルトの並列タスク数(ローカルモードでは2、クラスターモードでは設定プロパティspark.default.parallelism によって決定される数)を使用してグループ化を行います。オプションのnumTasks 引数を渡して、別のタスク数を設定できます。 |
join(otherStream, [numTasks]) | (K, V)ペアと(K, W)ペアの2つのDStreamで呼び出されると、各キーのすべての要素ペアを含む(K, (V, W))ペアの新しいDStreamを返します。 |
cogroup(otherStream, [numTasks]) | (K, V)ペアと(K, W)ペアのDStreamで呼び出されると、(K, Seq[V], Seq[W])タプルの新しいDStreamを返します。 |
transform(func) | ソースDStreamのすべてのRDDにRDD-to-RDD関数を適用することにより、新しいDStreamを返します。これは、DStreamで任意のRDD操作を実行するために使用できます。 |
updateStateByKey(func) | 各キーの状態が、キーの前の状態とキーの新しい値に指定された関数を適用することによって更新される新しい「状態」DStreamを返します。これは、各キーの任意の状態データを維持するために使用できます。 |
これらの変換のいくつかは、さらに詳しく説明する価値があります。
UpdateStateByKey操作
updateStateByKey
操作を使用すると、新しい情報で継続的に更新しながら、任意の状態を維持できます。これを使用するには、2つのステップを実行する必要があります。
- 状態を定義する - 状態は任意のデータ型にできます。
- 状態更新関数を定義する - 前の状態と入力ストリームからの新しい値を使用して状態を更新する方法を関数で指定します。
すべてのバッチで、Sparkは、バッチに新しいデータがあるかどうかに関係なく、すべての既存のキーに対して状態更新関数を適用します。更新関数がNone
を返す場合、キーと値のペアは削除されます。
例でこれを説明しましょう。テキストデータストリームで見られた各単語の実行中のカウントを維持したいとします。ここで、実行中のカウントは状態であり、整数です。更新関数を次のように定義します。
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
これは、単語を含むDStream(たとえば、前の例の(word, 1)
ペアを含むpairs
DStream)に適用されます。
runningCounts = pairs.updateStateByKey(updateFunction)
更新関数は、各単語に対して、1のシーケンス((word, 1)
ペアから)を持つnewValues
と、前のカウントを持つrunningCount
を使用して呼び出されます。完全なPythonコードについては、stateful_network_wordcount.pyの例を参照してください。
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
これは、単語を含むDStream(たとえば、前の例の(word, 1)
ペアを含むpairs
DStream)に適用されます。
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
更新関数は、各単語に対して、1のシーケンス((word, 1)
ペアから)を持つnewValues
と、前のカウントを持つrunningCount
を使用して呼び出されます。
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
Integer newSum = ... // add the new values with the previous running count to get the new count
return Optional.of(newSum);
};
これは、単語を含むDStream(たとえば、簡単な例の(word, 1)
ペアを含むpairs
DStream)に適用されます。
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
更新関数は、各単語に対して、1のシーケンス((word, 1)
ペアから)を持つnewValues
と、前のカウントを持つrunningCount
を使用して呼び出されます。完全なJavaコードについては、JavaStatefulNetworkWordCount.javaの例を参照してください。
updateStateByKey
を使用するには、チェックポイントディレクトリを構成する必要があることに注意してください。これについては、チェックポイントセクションで詳しく説明します。
Transform操作
transform
操作(およびtransformWith
などのバリエーション)を使用すると、DStreamに任意のRDD-to-RDD関数を適用できます。これは、DStream APIで公開されていない任意のRDD操作を適用するために使用できます。たとえば、データストリーム内のすべてのバッチを別のデータセットと結合する機能は、DStream APIでは直接公開されていません。ただし、transform
を使用して簡単にこれを行うことができます。これにより、非常に強力な可能性が可能になります。たとえば、入力データストリームを事前に計算されたスパム情報(おそらくSparkでも生成された)と結合し、それに基づいてフィルタリングすることにより、リアルタイムデータクレンジングを行うことができます。
spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
# join data stream with spam information to do data cleaning
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
...
});
指定された関数は、すべてのバッチ間隔で呼び出されることに注意してください。これにより、時間変化するRDD操作を実行できます。つまり、RDD操作、パーティション数、ブロードキャスト変数などをバッチ間で変更できます。
ウィンドウ操作
Spark Streaming は、データのスライディングウィンドウ上で変換を適用できるウィンドウ計算も提供します。次の図は、このスライディングウィンドウを示しています。
図に示すように、ウィンドウがソース DStream 上をスライドするたびに、ウィンドウ内に収まるソース RDD が結合され、操作されて、ウィンドウ化された DStream の RDD が生成されます。この特定のケースでは、操作は過去 3 タイムユニットのデータに適用され、2 タイムユニットずつスライドします。これは、ウィンドウ操作には 2 つのパラメーターを指定する必要があることを示しています。
- ウィンドウ長 - ウィンドウの期間(図では 3)。
- スライド間隔 - ウィンドウ操作が実行される間隔(図では 2)。
これらの 2 つのパラメーターは、ソース DStream のバッチ間隔(図では 1)の倍数である必要があります。
ウィンドウ操作を例で説明しましょう。たとえば、前の例を拡張して、過去 30 秒間のデータの単語数を 10 秒ごとに生成するとします。これを行うには、過去 30 秒間のデータに対して、(word, 1)
ペアの pairs
DStream に対して reduceByKey
操作を適用する必要があります。これは、reduceByKeyAndWindow
操作を使用して行われます。
# Reduce last 30 seconds of data, every 10 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
一般的なウィンドウ操作の一部を以下に示します。これらの操作はすべて、上記の 2 つのパラメーター - windowLength と slideInterval を取ります。
変換 | 意味 |
---|---|
window(windowLength, slideInterval) | ソース DStream のウィンドウ化されたバッチに基づいて計算される新しい DStream を返します。 |
countByWindow(windowLength, slideInterval) | ストリーム内の要素のスライディングウィンドウカウントを返します。 |
reduceByWindow(func, windowLength, slideInterval) | func を使用してスライディング間隔にわたってストリーム内の要素を集計することによって作成された、新しい単一要素ストリームを返します。この関数は、並列で正しく計算できるように、結合的かつ可換である必要があります。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | (K, V) ペアの DStream で呼び出されると、各キーの値がスライディングウィンドウ内のバッチに対して指定された reduce 関数 func を使用して集計される、(K, V) ペアの新しい DStream を返します。注意: デフォルトでは、これは Spark のデフォルトの並列タスク数(ローカルモードの場合は 2、クラスターモードの場合は構成プロパティ spark.default.parallelism によって決定される数)を使用してグループ化を行います。オプションの numTasks 引数を渡して、異なるタスク数を設定できます。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) |
上記の |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | (K, V) ペアの DStream で呼び出されると、各キーの値がスライディングウィンドウ内の頻度である (K, Long) ペアの新しい DStream を返します。reduceByKeyAndWindow のように、reduce タスクの数はオプションの引数を使用して構成可能です。 |
結合操作
最後に、Spark Streaming でさまざまな種類の結合をどれほど簡単に行うことができるかを強調しておく価値があります。
ストリーム間の結合
ストリームは、他のストリームと非常に簡単に結合できます。
stream1 = ...
stream2 = ...
joinedStream = stream1.join(stream2)
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
ここで、各バッチ間隔で、stream1
によって生成された RDD は、stream2
によって生成された RDD と結合されます。leftOuterJoin
、rightOuterJoin
、fullOuterJoin
も実行できます。さらに、ストリームのウィンドウ上で結合を行うと非常に便利なことがよくあります。これも非常に簡単です。
windowedStream1 = stream1.window(20)
windowedStream2 = stream2.window(60)
joinedStream = windowedStream1.join(windowedStream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
ストリームとデータセットの結合
これは、DStream.transform
操作を説明するときにすでに示しました。以下は、ウィンドウ化されたストリームをデータセットと結合する別の例です。
dataset = ... # some RDD
windowedStream = stream.window(20)
joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
実際には、結合するデータセットを動的に変更することもできます。transform
に提供される関数は、すべてのバッチ間隔で評価されるため、dataset
参照が指す現在のデータセットが使用されます。
DStream 変換の完全なリストは、API ドキュメントにあります。Scala API については、DStream と PairDStreamFunctions を参照してください。Java API については、JavaDStream と JavaPairDStream を参照してください。Python API については、DStream を参照してください。
DStream の出力操作
出力操作を使用すると、DStream のデータをデータベースやファイルシステムなどの外部システムにプッシュできます。出力操作は、変換されたデータを外部システムが実際に使用できるようにするため、すべての DStream 変換の実際の実行をトリガーします(RDD のアクションと同様)。現在、以下の出力操作が定義されています
出力操作 | 意味 |
---|---|
print() | ストリーミングアプリケーションを実行しているドライバーノードで、DStream のすべてのデータバッチの最初の 10 個の要素を出力します。これは、開発やデバッグに役立ちます。 Python API これは、Python API では pprint() と呼ばれます。 |
saveAsTextFiles(prefix, [suffix]) | この DStream の内容をテキストファイルとして保存します。各バッチ間隔でのファイル名は、prefix と suffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"。 |
saveAsObjectFiles(prefix, [suffix]) | この DStream の内容をシリアル化された Java オブジェクトの SequenceFiles として保存します。各バッチ間隔でのファイル名は、prefix と suffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"。Python API これは Python API では使用できません。 |
saveAsHadoopFiles(prefix, [suffix]) | この DStream の内容を Hadoop ファイルとして保存します。各バッチ間隔でのファイル名は、prefix と suffix に基づいて生成されます: "prefix-TIME_IN_MS[.suffix]"。 Python API これは Python API では使用できません。 |
foreachRDD(func) | ストリームから生成された各 RDD に関数 func を適用する、最も汎用的な出力演算子です。この関数は、各 RDD のデータをファイルへの保存、またはネットワーク経由でのデータベースへの書き込みなどの外部システムにプッシュする必要があります。関数 func は、ストリーミングアプリケーションを実行しているドライバープロセスで実行され、通常、ストリーミング RDD の計算を強制する RDD アクションが含まれることに注意してください。 |
foreachRDD の使用における設計パターン
dstream.foreachRDD
は、データを外部システムに送信できる強力なプリミティブです。ただし、このプリミティブを正しく効率的に使用する方法を理解することが重要です。回避すべき一般的な間違いを以下に示します。
多くの場合、外部システムへのデータの書き込みには、接続オブジェクト(リモートサーバーへの TCP 接続など)を作成し、それを使用してリモートシステムにデータを送信する必要があります。この目的のために、開発者は誤って Spark ドライバーで接続オブジェクトを作成し、Spark ワーカーでそれを使用して RDD にレコードを保存しようとする可能性があります。たとえば(Scala で)、
def sendRecord(rdd):
connection = createNewConnection() # executed at the driver
rdd.foreach(lambda record: connection.send(record))
connection.close()
dstream.foreachRDD(sendRecord)
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}
dstream.foreachRDD(rdd -> {
Connection connection = createNewConnection(); // executed at the driver
rdd.foreach(record -> {
connection.send(record); // executed at the worker
});
});
これは、接続オブジェクトをシリアル化してドライバーからワーカーに送信する必要があるため、正しくありません。このような接続オブジェクトは、マシン間で転送できることはほとんどありません。このエラーは、シリアル化エラー(接続オブジェクトがシリアル化できない)、初期化エラー(接続オブジェクトをワーカーで初期化する必要がある)などとして現れる可能性があります。正しい解決策は、ワーカーで接続オブジェクトを作成することです。
ただし、これにより、別のよくある間違いである、レコードごとに新しい接続を作成することにつながる可能性があります。たとえば、
def sendRecord(record):
connection = createNewConnection()
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}
dstream.foreachRDD(rdd -> {
rdd.foreach(record -> {
Connection connection = createNewConnection();
connection.send(record);
connection.close();
});
});
通常、接続オブジェクトの作成には時間とリソースのオーバーヘッドがあります。したがって、レコードごとに接続オブジェクトを作成および破棄すると、不必要に高いオーバーヘッドが発生し、システム全体の処理量を大幅に低下させる可能性があります。より良い解決策は、rdd.foreachPartition
を使用することです。つまり、単一の接続オブジェクトを作成し、その接続を使用して RDD パーティション内のすべてのレコードを送信します。
def sendPartition(iter):
connection = createNewConnection()
for record in iter:
connection.send(record)
connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
Connection connection = createNewConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
connection.close();
});
});
これにより、多数のレコードにわたって接続作成のオーバーヘッドが償却されます。
最後に、複数の RDD/バッチ間で接続オブジェクトを再利用することで、これをさらに最適化できます。複数のバッチの RDD が外部システムにプッシュされるときに再利用できる接続オブジェクトの静的プールを維持することができ、オーバーヘッドをさらに削減できます。
def sendPartition(iter):
# ConnectionPool is a static, lazily initialized pool of connections
connection = ConnectionPool.getConnection()
for record in iter:
connection.send(record)
# return to the pool for future reuse
ConnectionPool.returnConnection(connection)
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
dstream.foreachRDD(rdd -> {
rdd.foreachPartition(partitionOfRecords -> {
// ConnectionPool is a static, lazily initialized pool of connections
Connection connection = ConnectionPool.getConnection();
while (partitionOfRecords.hasNext()) {
connection.send(partitionOfRecords.next());
}
ConnectionPool.returnConnection(connection); // return to the pool for future reuse
});
});
プール内の接続は、オンデマンドで遅延作成され、しばらく使用されていない場合はタイムアウトする必要があることに注意してください。これにより、外部システムへのデータの最も効率的な送信が実現されます。
その他の注意点
-
DStream は、RDD が RDD アクションによって遅延実行されるのと同様に、出力操作によって遅延実行されます。具体的には、DStream 出力操作内の RDD アクションは、受信したデータの処理を強制します。したがって、アプリケーションに出力操作がない場合、またはその内部に RDD アクションのない
dstream.foreachRDD()
のような出力操作がある場合、何も実行されません。システムは単にデータを受信して破棄します。 -
デフォルトでは、出力操作は一度に 1 つずつ実行されます。また、アプリケーションで定義された順序で実行されます。
DataFrame と SQL の操作
ストリーミングデータで DataFrames と SQL 操作を簡単に使用できます。StreamingContext が使用している SparkContext を使用して SparkSession を作成する必要があります。さらに、これはドライバーの障害時に再起動できるように行う必要があります。これは、SparkSession の遅延インスタンス化されたシングルトンインスタンスを作成することによって行われます。次の例にこれを示します。以前の 単語数カウントの例を変更して、DataFrames と SQL を使用して単語数を生成します。各 RDD は DataFrame に変換され、一時テーブルとして登録され、SQL を使用してクエリされます。
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
...
# DataFrame operations inside your streaming program
words = ... # DStream of strings
def process(time, rdd):
print("========= %s =========" % str(time))
try:
# Get the singleton instance of SparkSession
spark = getSparkSessionInstance(rdd.context.getConf())
# Convert RDD[String] to RDD[Row] to DataFrame
rowRdd = rdd.map(lambda w: Row(word=w))
wordsDataFrame = spark.createDataFrame(rowRdd)
# Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words")
# Do word count on table using SQL and print it
wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
except:
pass
words.foreachRDD(process)
完全な ソースコード を参照してください。
/** DataFrame operations inside your streaming program */
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
完全な ソースコード を参照してください。
/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
private String word;
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
}
...
/** DataFrame operations inside your streaming program */
JavaDStream<String> words = ...
words.foreachRDD((rdd, time) -> {
// Get the singleton instance of SparkSession
SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
// Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
JavaRow record = new JavaRow();
record.setWord(word);
return record;
});
DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
// Creates a temporary view using the DataFrame
wordsDataFrame.createOrReplaceTempView("words");
// Do word count on table using SQL and print it
DataFrame wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word");
wordCountsDataFrame.show();
});
完全な ソースコード を参照してください。
別のスレッド(つまり、実行中の StreamingContext に対して非同期)からストリーミングデータで定義されたテーブルに対して SQL クエリを実行することもできます。クエリが実行できるように、StreamingContext が十分な量のストリーミングデータを記憶するように設定してください。それ以外の場合、非同期 SQL クエリを認識しない StreamingContext は、クエリが完了する前に古いストリーミングデータを削除します。たとえば、最後のバッチをクエリしたいが、クエリの実行に 5 分かかる場合は、streamingContext.remember(Minutes(5))
(Scala の場合、または他の言語で同等のもの)を呼び出します。
DataFrames の詳細については、DataFrames と SQL ガイドを参照してください。
MLlib の操作
MLlib で提供されている機械学習アルゴリズムを簡単に使用することもできます。まず、ストリーミング機械学習アルゴリズム(例:ストリーミング線形回帰、ストリーミング K 平均法など)があり、ストリーミングデータから同時に学習し、ストリーミングデータにモデルを適用できます。これらに加えて、はるかに大規模な機械学習アルゴリズムのクラスについては、オフラインで(つまり、過去のデータを使用して)学習モデルを学習し、ストリーミングデータでオンラインでモデルを適用できます。詳細については、MLlib ガイドを参照してください。
キャッシュ/永続化
RDDと同様に、DStreamも開発者がストリームのデータをメモリに永続化することを許可します。つまり、DStreamでpersist()
メソッドを使用すると、そのDStreamのすべてのRDDが自動的にメモリに永続化されます。これは、DStreamのデータが複数回計算される場合(例えば、同じデータに対して複数の操作を行う場合)に便利です。reduceByWindow
やreduceByKeyAndWindow
のようなウィンドウベースの操作や、updateStateByKey
のような状態ベースの操作では、これは暗黙的に当てはまります。したがって、ウィンドウベースの操作によって生成されたDStreamは、開発者がpersist()
を呼び出さなくても、自動的にメモリに永続化されます。
ネットワーク経由でデータを受信する入力ストリーム(Kafka、ソケットなど)の場合、デフォルトの永続化レベルは、フォールトトレランスのためにデータを2つのノードに複製するように設定されています。
RDDとは異なり、DStreamのデフォルトの永続化レベルでは、データはメモリにシリアル化された状態で保持されることに注意してください。これについては、パフォーマンスチューニングのセクションでさらに詳しく説明します。異なる永続化レベルに関する詳細については、Sparkプログラミングガイドを参照してください。
チェックポイント
ストリーミングアプリケーションは24時間365日稼働する必要があるため、アプリケーションロジックとは無関係な障害(システム障害、JVMクラッシュなど)に対して回復力がある必要があります。これを可能にするために、Spark Streamingは障害から回復できるように、フォールトトレラントなストレージシステムに十分な情報をチェックポイントする必要があります。チェックポイントされるデータには、2つのタイプがあります。
- メタデータチェックポイント - HDFSのようなフォールトトレラントなストレージへのストリーミング計算を定義する情報の保存。これは、ストリーミングアプリケーションのドライバを実行しているノードの障害から回復するために使用されます(詳細については後述)。メタデータには以下が含まれます。
- 構成 - ストリーミングアプリケーションの作成に使用された構成。
- DStream操作 - ストリーミングアプリケーションを定義するDStream操作のセット。
- 不完全なバッチ - ジョブがキューに入れられているが、まだ完了していないバッチ。
- データチェックポイント - 生成されたRDDを信頼できるストレージに保存すること。これは、複数のバッチにわたってデータを結合する一部のステートフル変換で必要です。このような変換では、生成されたRDDは以前のバッチのRDDに依存するため、依存関係チェーンの長さは時間とともに増加し続けます。回復時間(依存関係チェーンに比例)のこのような無制限の増加を回避するために、ステートフル変換の中間RDDは、依存関係チェーンを断ち切るために、定期的に信頼できるストレージ(例:HDFS)にチェックポイントされます。
要約すると、メタデータチェックポイントは主にドライバ障害からの回復に必要ですが、データまたはRDDチェックポイントは、ステートフル変換が使用されている場合は、基本的な機能にも必要です。
チェックポイントを有効にするタイミング
次の要件のいずれかを持つアプリケーションでは、チェックポイントを有効にする必要があります。
- ステートフル変換の使用 - アプリケーションで
updateStateByKey
またはreduceByKeyAndWindow
(逆関数付き)のいずれかが使用されている場合は、定期的なRDDチェックポイントを許可するためにチェックポイントディレクトリを提供する必要があります。 - アプリケーションを実行しているドライバの障害からの回復 - メタデータチェックポイントは、進捗情報を回復するために使用されます。
前述のステートフル変換を使用しない単純なストリーミングアプリケーションは、チェックポイントを有効にせずに実行できることに注意してください。その場合、ドライバの障害からの回復も部分的になります(受信したが処理されていないデータの一部が失われる可能性があります)。これは多くの場合許容範囲であり、多くの人がこの方法でSpark Streamingアプリケーションを実行しています。非Hadoop環境のサポートは、将来改善される予定です。
チェックポイントを構成する方法
チェックポイントは、チェックポイント情報が保存されるフォールトトレラントで信頼性の高いファイルシステム(例:HDFS、S3など)にディレクトリを設定することで有効にできます。これは、streamingContext.checkpoint(checkpointDirectory)
を使用することで行われます。これにより、前述のステートフル変換を使用できるようになります。さらに、アプリケーションをドライバ障害から回復させたい場合は、ストリーミングアプリケーションを次のように動作するように書き直す必要があります。
- プログラムを最初に起動するときは、新しいStreamingContextを作成し、すべてのストリームを設定してからstart()を呼び出します。
- 障害後にプログラムを再起動するときは、チェックポイントディレクトリのチェックポイントデータからStreamingContextを再作成します。
この動作は、StreamingContext.getOrCreate
を使用することで簡単になります。これは次のように使用されます。
# Function to create and setup a new StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # set checkpoint directory
return ssc
# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...
# Start the context
context.start()
context.awaitTermination()
checkpointDirectory
が存在する場合、コンテキストはチェックポイントデータから再作成されます。ディレクトリが存在しない場合(つまり、初めて実行する場合)、関数functionToCreateContext
が呼び出されて、新しいコンテキストが作成され、DStreamが設定されます。Pythonの例recoverable_network_wordcount.pyを参照してください。この例では、ネットワークデータの単語カウントをファイルに追加します。
また、StreamingContext.getOrCreate(checkpointDirectory, None)
を使用して、チェックポイントデータから明示的にStreamingContext
を作成し、計算を開始することもできます。
この動作は、StreamingContext.getOrCreate
を使用することで簡単になります。これは次のように使用されます。
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()
checkpointDirectory
が存在する場合、コンテキストはチェックポイントデータから再作成されます。ディレクトリが存在しない場合(つまり、初めて実行する場合)、関数functionToCreateContext
が呼び出されて、新しいコンテキストが作成され、DStreamが設定されます。Scalaの例RecoverableNetworkWordCountを参照してください。この例では、ネットワークデータの単語カウントをファイルに追加します。
この動作は、JavaStreamingContext.getOrCreate
を使用することで簡単になります。これは次のように使用されます。
// Create a factory object that can create and setup a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
@Override public JavaStreamingContext create() {
JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
...
jssc.checkpoint(checkpointDirectory); // set checkpoint directory
return jssc;
}
};
// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start();
context.awaitTermination();
checkpointDirectory
が存在する場合、コンテキストはチェックポイントデータから再作成されます。ディレクトリが存在しない場合(つまり、初めて実行する場合)、関数contextFactory
が呼び出されて、新しいコンテキストが作成され、DStreamが設定されます。Javaの例JavaRecoverableNetworkWordCountを参照してください。この例では、ネットワークデータの単語カウントをファイルに追加します。
getOrCreate
を使用することに加えて、ドライバプロセスが障害時に自動的に再起動されるようにする必要もあります。これは、アプリケーションの実行に使用されるデプロイインフラストラクチャによってのみ実行できます。これについては、デプロイメントのセクションでさらに詳しく説明します。
RDDのチェックポイントは、信頼できるストレージへの保存コストがかかることに注意してください。これにより、RDDがチェックポイントされるバッチの処理時間が増加する可能性があります。したがって、チェックポイントの間隔を慎重に設定する必要があります。バッチサイズが小さい場合(たとえば1秒)、すべてのバッチをチェックポイントすると、操作のスループットが大幅に低下する可能性があります。逆に、チェックポイントの間隔が短すぎると、リネージとタスクサイズが増加し、悪影響を及ぼす可能性があります。RDDチェックポイントが必要なステートフル変換の場合、デフォルトの間隔は少なくとも10秒のバッチ間隔の倍数です。これはdstream.checkpoint(checkpointInterval)
を使用して設定できます。通常、DStreamの5〜10スライディング間隔のチェックポイント間隔は、試してみるのに適切な設定です。
アキュムレータ、ブロードキャスト変数、およびチェックポイント
アキュムレータとブロードキャスト変数は、Spark Streamingのチェックポイントから回復できません。チェックポイントを有効にして、アキュムレータまたはブロードキャスト変数も使用する場合は、アキュムレータとブロードキャスト変数の遅延インスタンス化されたシングルトンインスタンスを作成して、ドライバが障害後に再起動されたときに再インスタンス化できるようにする必要があります。これは次の例に示されています。
def getWordExcludeList(sparkContext):
if ("wordExcludeList" not in globals()):
globals()["wordExcludeList"] = sparkContext.broadcast(["a", "b", "c"])
return globals()["wordExcludeList"]
def getDroppedWordsCounter(sparkContext):
if ("droppedWordsCounter" not in globals()):
globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
return globals()["droppedWordsCounter"]
def echo(time, rdd):
# Get or register the excludeList Broadcast
excludeList = getWordExcludeList(rdd.context)
# Get or register the droppedWordsCounter Accumulator
droppedWordsCounter = getDroppedWordsCounter(rdd.context)
# Use excludeList to drop words and use droppedWordsCounter to count them
def filterFunc(wordCount):
if wordCount[0] in excludeList.value:
droppedWordsCounter.add(wordCount[1])
False
else:
True
counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
wordCounts.foreachRDD(echo)
完全なソースコードを参照してください。
object WordExcludeList {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordExcludeList = Seq("a", "b", "c")
instance = sc.broadcast(wordExcludeList)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("DroppedWordsCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the excludeList Broadcast
val excludeList = WordExcludeList.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use excludeList to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (excludeList.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
完全なソースコードを参照してください。
class JavaWordExcludeList {
private static volatile Broadcast<List<String>> instance = null;
public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaWordExcludeList.class) {
if (instance == null) {
List<String> wordExcludeList = Arrays.asList("a", "b", "c");
instance = jsc.broadcast(wordExcludeList);
}
}
}
return instance;
}
}
class JavaDroppedWordsCounter {
private static volatile LongAccumulator instance = null;
public static LongAccumulator getInstance(JavaSparkContext jsc) {
if (instance == null) {
synchronized (JavaDroppedWordsCounter.class) {
if (instance == null) {
instance = jsc.sc().longAccumulator("DroppedWordsCounter");
}
}
}
return instance;
}
}
wordCounts.foreachRDD((rdd, time) -> {
// Get or register the excludeList Broadcast
Broadcast<List<String>> excludeList = JavaWordExcludeList.getInstance(new JavaSparkContext(rdd.context()));
// Get or register the droppedWordsCounter Accumulator
LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
// Use excludeList to drop words and use droppedWordsCounter to count them
String counts = rdd.filter(wordCount -> {
if (excludeList.value().contains(wordCount._1())) {
droppedWordsCounter.add(wordCount._2());
return false;
} else {
return true;
}
}).collect().toString();
String output = "Counts at time " + time + " " + counts;
}
完全なソースコードを参照してください。
アプリケーションのデプロイ
このセクションでは、Spark Streamingアプリケーションをデプロイする手順について説明します。
要件
Spark Streamingアプリケーションを実行するには、以下が必要です。
-
クラスターマネージャー付きのクラスター - これは、あらゆるSparkアプリケーションの一般的な要件であり、デプロイメントガイドで詳細に説明されています。
-
アプリケーションJARのパッケージ化 - ストリーミングアプリケーションをJARにコンパイルする必要があります。
spark-submit
を使用してアプリケーションを開始する場合、JARにSparkとSpark Streamingを提供する必要はありません。ただし、アプリケーションが高度なソース(例:Kafka)を使用している場合は、アプリケーションのデプロイに使用されるJARに、それらがリンクする追加のアーティファクトを依存関係とともにパッケージ化する必要があります。たとえば、KafkaUtils
を使用するアプリケーションでは、spark-streaming-kafka-0-10_2.12
とそのすべての推移的な依存関係をアプリケーションJARに含める必要があります。 -
エグゼキューターに十分なメモリを構成する - 受信したデータはメモリに保存する必要があるため、エグゼキューターは受信したデータを保持するのに十分なメモリで構成する必要があります。10分間のウィンドウ操作を行っている場合、システムは少なくとも過去10分間のデータをメモリに保持する必要があることに注意してください。したがって、アプリケーションのメモリ要件は、アプリケーションで使用される操作によって異なります。
-
チェックポイントの構成 - ストリームアプリケーションで必要な場合、Hadoop API互換のフォールトトレラントストレージ(例:HDFS、S3など)のディレクトリをチェックポイントディレクトリとして構成し、チェックポイント情報を障害回復に使用できるようにストリーミングアプリケーションを記述する必要があります。詳細については、チェックポイントのセクションを参照してください。
- アプリケーションドライバの自動再起動の構成 - ドライバの障害から自動的に回復するには、ストリーミングアプリケーションの実行に使用されるデプロイインフラストラクチャがドライバプロセスを監視し、ドライバが失敗した場合にドライバを再起動する必要があります。異なるクラスターマネージャーには、これを実現するための異なるツールがあります。
- Spark Standalone - Sparkアプリケーションドライバは、Spark Standaloneクラスター内で実行するように送信できます(クラスターデプロイモードを参照)。つまり、アプリケーションドライバ自体はワーカーノードの1つで実行されます。さらに、Standaloneクラスターマネージャーにドライバを監視するように指示し、ドライバがゼロ以外の終了コード、またはドライバを実行しているノードの障害のいずれかが原因で失敗した場合に再起動できます。詳細については、Spark Standaloneガイドのクラスターモードと監視を参照してください。
- YARN - Yarnは、アプリケーションを自動的に再起動するための同様のメカニズムをサポートしています。詳細については、YARNドキュメントを参照してください。
- Mesos - Marathonは、Mesosでこれを実現するために使用されています。
-
書き込み先行ログの設定 - Spark 1.2 以降、強力なフォールトトレランス保証を実現するために書き込み先行ログが導入されました。有効にすると、レシーバーから受信したすべてのデータは、構成チェックポイントディレクトリにある書き込み先行ログに書き込まれます。これにより、ドライバーの復旧時にデータが失われるのを防ぎ、データの損失ゼロを保証します(詳細については、フォールトトレランスのセマンティクスのセクションを参照してください)。これは、設定パラメーター
spark.streaming.receiver.writeAheadLog.enable
をtrue
に設定することで有効にできます。ただし、これらの強力なセマンティクスは、個々のレシーバーの受信スループットを犠牲にする可能性があります。これは、より多くのレシーバーを並列で実行することで、合計スループットを増やすことで修正できます。さらに、書き込み先行ログはレプリケートされたストレージシステムに既に格納されているため、書き込み先行ログが有効になっている場合は、Spark 内で受信したデータのレプリケーションを無効にすることをお勧めします。これは、入力ストリームのストレージレベルをStorageLevel.MEMORY_AND_DISK_SER
に設定することで実行できます。S3 (またはフラッシュをサポートしないファイルシステム) を書き込み先行ログに使用する場合は、spark.streaming.driver.writeAheadLog.closeFileAfterWrite
およびspark.streaming.receiver.writeAheadLog.closeFileAfterWrite
を有効にすることを忘れないでください。詳細については、Spark Streaming の設定を参照してください。I/O 暗号化が有効になっている場合でも、Spark は書き込み先行ログに書き込まれるデータを暗号化しないことに注意してください。書き込み先行ログデータの暗号化が必要な場合は、ネイティブに暗号化をサポートするファイルシステムに保存する必要があります。 - 最大受信レートの設定 - ストリーミングアプリケーションが受信したデータを処理するのに十分なクラスターリソースがない場合、レシーバーはレコード/秒単位で最大レート制限を設定することでレートを制限できます。レシーバーについては設定パラメーター
spark.streaming.receiver.maxRate
、Direct Kafka アプローチについてはspark.streaming.kafka.maxRatePerPartition
を参照してください。Spark 1.5 では、バックプレッシャーと呼ばれる機能が導入され、このレート制限を設定する必要がなくなりました。Spark Streaming はレート制限を自動的に計算し、処理条件が変化した場合に動的に調整します。このバックプレッシャーは、設定パラメーターspark.streaming.backpressure.enabled
をtrue
に設定することで有効にできます。
アプリケーションコードのアップグレード
実行中の Spark Streaming アプリケーションを新しいアプリケーションコードでアップグレードする必要がある場合、2 つのメカニズムが考えられます。
-
アップグレードされた Spark Streaming アプリケーションが起動され、既存のアプリケーションと並行して実行されます。新しいアプリケーション(古いアプリケーションと同じデータを受信)がウォームアップされ、本番環境に対応できるようになったら、古いアプリケーションを停止できます。これは、データを 2 つの宛先(つまり、以前のアプリケーションとアップグレードされたアプリケーション)に送信することをサポートするデータソースに対して実行できることに注意してください。
-
既存のアプリケーションは、正常にシャットダウンされます(正常なシャットダウンオプションについては、
StreamingContext.stop(...)
またはJavaStreamingContext.stop(...)
を参照してください)。これにより、受信したデータがシャットダウン前に完全に処理されることが保証されます。その後、アップグレードされたアプリケーションを起動できます。アップグレードされたアプリケーションは、以前のアプリケーションが停止した時点から処理を開始します。これは、ソース側のバッファリングをサポートする入力ソース(Kafka など)でのみ実行できることに注意してください。これは、以前のアプリケーションがダウンしていて、アップグレードされたアプリケーションがまだ起動していない間、データをバッファリングする必要があるためです。また、アップグレード前のコードの以前のチェックポイント情報から再起動することはできません。チェックポイント情報には、シリアル化された Scala/Java/Python オブジェクトが本質的に含まれており、新しい変更されたクラスでオブジェクトをデシリアライズしようとするとエラーが発生する可能性があります。この場合は、アップグレードされたアプリを別のチェックポイントディレクトリで開始するか、以前のチェックポイントディレクトリを削除します。
アプリケーションのモニタリング
Spark の監視機能に加えて、Spark Streaming に固有の追加機能があります。StreamingContext が使用されると、Spark Web UI に追加の Streaming
タブが表示され、実行中のレシーバー(レシーバーがアクティブかどうか、受信したレコード数、レシーバーエラーなど)および完了したバッチ(バッチ処理時間、キューイング遅延など)に関する統計情報が表示されます。これは、ストリーミングアプリケーションの進捗状況を監視するために使用できます。
Web UI の次の 2 つのメトリクスは特に重要です
- 処理時間 - 各データバッチを処理する時間。
- スケジューリング遅延 - 前のバッチの処理が完了するまで、バッチがキューで待機する時間。
バッチ処理時間が常にバッチ間隔よりも長く、キューイング遅延が増加し続ける場合は、システムがバッチを生成される速度で処理できず、遅れていることを示します。その場合は、バッチ処理時間の削減を検討してください。
Spark Streaming プログラムの進捗状況は、StreamingListener インターフェイスを使用して監視することもできます。これにより、レシーバーのステータスと処理時間を確認できます。これは開発者 API であり、将来的に改善される(つまり、より多くの情報が報告される)可能性があることに注意してください。
パフォーマンスチューニング
クラスターで Spark Streaming アプリケーションを最大限に活用するには、少し調整が必要です。このセクションでは、アプリケーションのパフォーマンスを向上させるために調整できるパラメーターと構成について説明します。大まかに言って、次の 2 つのことを考慮する必要があります。
-
クラスターリソースを効率的に使用して、各データバッチの処理時間を短縮します。
-
データのバッチが受信される速度で処理できるように、適切なバッチサイズを設定します(つまり、データ処理がデータ取り込みに追いつくようにします)。
バッチ処理時間の短縮
Spark で実行できる多くの最適化により、各バッチの処理時間を最小限に抑えることができます。これらについては、チューニングガイドで詳しく説明しています。このセクションでは、最も重要なものをいくつか紹介します。
データ受信の並列処理レベル
ネットワーク経由(Kafka、ソケットなど)でデータを受信するには、データをデシリアライズして Spark に格納する必要があります。データ受信がシステムのボトルネックになる場合は、データ受信を並列化することを検討してください。各入力 DStream は、単一のデータストリームを受信する単一のレシーバー(ワーカーマシンで実行)を作成することに注意してください。したがって、複数のデータストリームを受信するには、複数の入力 DStream を作成し、ソースからデータストリームの異なるパーティションを受信するように構成することで実現できます。たとえば、2 つのデータトピックを受信する単一の Kafka 入力 DStream は、それぞれ 1 つのトピックのみを受信する 2 つの Kafka 入力ストリームに分割できます。これにより、2 つのレシーバーが実行され、データが並列で受信されるため、全体的なスループットが向上します。これらの複数の DStream を統合して、単一の DStream を作成できます。次に、単一の入力 DStream に適用されていた変換を、統合されたストリームに適用できます。これは次のように行われます。
numStreams = 5
kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
unifiedStream = streamingContext.union(*kafkaStreams)
unifiedStream.pprint()
val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()
int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
考慮する必要があるもう 1 つのパラメーターは、レシーバーのブロック間隔です。これは、設定パラメーター spark.streaming.blockInterval
によって決定されます。ほとんどのレシーバーの場合、受信したデータは、Spark のメモリに格納する前に、データのブロックにまとめられます。各バッチ内のブロックの数は、マップのような変換で受信したデータを処理するために使用されるタスクの数を決定します。バッチごとのレシーバーごとのタスク数は、ほぼ (バッチ間隔 / ブロック間隔) になります。たとえば、200 ミリ秒のブロック間隔では、2 秒のバッチごとに 10 個のタスクが作成されます。タスクの数が少なすぎる(つまり、マシンごとのコア数よりも少ない)場合、使用可能なすべてのコアがデータの処理に使用されないため、非効率になります。特定のバッチ間隔のタスク数を増やすには、ブロック間隔を短縮します。ただし、ブロック間隔の推奨される最小値は約 50 ミリ秒であり、これ未満にするとタスク起動のオーバーヘッドが問題になる可能性があります。
複数の入力ストリーム/レシーバーを使用してデータを受信する代わりに、入力データストリームを明示的に再パーティション化する(inputStream.repartition(<パーティション数>)
を使用)方法もあります。これにより、受信したデータのバッチが、さらに処理を行う前に、クラスター内の指定された数のマシンに分散されます。
ダイレクトストリームについては、Spark Streaming + Kafka 統合ガイドを参照してください。
データ処理の並列処理レベル
計算の任意段階で使用される並列タスクの数が十分でない場合、クラスターリソースが十分に活用されない可能性があります。たとえば、reduceByKey
や reduceByKeyAndWindow
などの分散削減操作の場合、並列タスクのデフォルト数は spark.default.parallelism
設定プロパティによって制御されます。並列処理のレベルを引数として渡す(PairDStreamFunctions
ドキュメントを参照)、または spark.default.parallelism
設定プロパティを設定してデフォルトを変更できます。
データのシリアル化
データシリアル化のオーバーヘッドは、シリアル化形式を調整することで削減できます。ストリーミングの場合、シリアル化されているデータには 2 種類あります。
-
入力データ: デフォルトでは、レシーバーを通じて受信した入力データは、StorageLevel.MEMORY_AND_DISK_SER_2 を使用してエグゼキューターのメモリに格納されます。つまり、データは GC のオーバーヘッドを削減するためにバイトにシリアル化され、エグゼキューターの障害を許容するために複製されます。また、データは最初にメモリに保持され、ストリーミング計算に必要なすべての入力データを保持するのにメモリが不十分な場合にのみディスクにスピルされます。このシリアル化には明らかにオーバーヘッドがあります。レシーバーは受信したデータをデシリアライズし、Spark のシリアル化形式を使用して再シリアル化する必要があります。
-
ストリーミング処理で生成される永続化されたRDD: ストリーミング計算で生成されたRDDは、メモリに永続化される場合があります。例えば、ウィンドウ処理では、データが複数回処理されるため、メモリにデータが永続化されます。ただし、Spark CoreのデフォルトのStorageLevel.MEMORY_ONLYとは異なり、ストリーミング計算で生成された永続化されたRDDは、GCのオーバーヘッドを最小限に抑えるために、デフォルトでStorageLevel.MEMORY_ONLY_SER(つまり、シリアライズされた状態)で永続化されます。
どちらの場合も、Kryoシリアライゼーションを使用すると、CPUとメモリの両方のオーバーヘッドを削減できます。詳細については、Sparkチューニングガイドを参照してください。Kryoの場合、カスタムクラスの登録、およびオブジェクト参照の追跡の無効化を検討してください(構成ガイドのKryo関連の構成を参照)。
ストリーミングアプリケーションで保持する必要のあるデータ量が少ない特定のケースでは、過剰なGCオーバーヘッドを発生させることなく、データ(両方のタイプ)をデシリアライズされたオブジェクトとして永続化することが可能になる場合があります。例えば、数秒のバッチ間隔を使用しており、ウィンドウ処理を使用していない場合は、ストレージレベルを明示的に設定することで、永続化されたデータのシリアライゼーションを無効にすることができます。これにより、シリアライゼーションによるCPUオーバーヘッドが削減され、GCオーバーヘッドを過度に増やすことなく、パフォーマンスが向上する可能性があります。
タスク起動のオーバーヘッド
1秒あたりに起動されるタスクの数が多い場合(例えば、1秒あたり50以上)、エグゼキュータにタスクを送信するオーバーヘッドが大きくなり、1秒未満のレイテンシを達成することが難しくなる可能性があります。このオーバーヘッドは、次の変更によって削減できます。
- 実行モード: Sparkをスタンドアロンモードまたは粗粒度Mesosモードで実行すると、細粒度Mesosモードよりもタスクの起動時間が短縮されます。詳細については、Mesosでの実行ガイドを参照してください。
これらの変更により、バッチ処理時間が数百ミリ秒短縮され、1秒未満のバッチサイズが実現可能になります。
適切なバッチ間隔の設定
クラスター上で実行されているSpark Streamingアプリケーションが安定するためには、システムは受信したデータを高速で処理できる必要があります。言い換えれば、データのバッチは生成されるのと同じ速さで処理される必要があります。これがアプリケーションに当てはまるかどうかは、ストリーミングWeb UIで処理時間を監視することで確認できます。ここで、バッチ処理時間はバッチ間隔よりも短くする必要があります。
ストリーミング計算の性質によっては、使用されるバッチ間隔が、固定されたクラスターリソース上でアプリケーションが維持できるデータレートに大きな影響を与える可能性があります。例えば、以前のWordCountNetworkの例を考えてみましょう。特定のデータレートの場合、システムは2秒ごとに単語数を報告することができます(つまり、バッチ間隔は2秒)が、500ミリ秒ごとにはできません。したがって、バッチ間隔は、本番環境で予想されるデータレートを維持できるように設定する必要があります。
アプリケーションに適したバッチサイズを見つけるための適切なアプローチは、控えめなバッチ間隔(例えば、5〜10秒)と低いデータレートでテストすることです。システムがデータレートに追いつくことができるかどうかを確認するには、処理された各バッチで発生するエンドツーエンドの遅延の値を確認できます(Sparkドライバーのlog4jログで「合計遅延」を探すか、StreamingListenerインターフェイスを使用します)。遅延がバッチサイズと同程度に維持されている場合、システムは安定しています。それ以外の場合、遅延が継続的に増加している場合は、システムが追いつくことができず、したがって不安定であることを意味します。安定した構成が分かったら、データレートを増やすか、バッチサイズを減らすことを試みることができます。一時的なデータレートの増加による遅延の一時的な増加は、遅延が低い値(つまり、バッチサイズ未満)に戻る限り、問題ないことに注意してください。
メモリチューニング
Sparkアプリケーションのメモリ使用量とGCの動作のチューニングについては、チューニングガイドで詳細に説明されています。それを読むことを強くお勧めします。このセクションでは、特にSpark Streamingアプリケーションのコンテキストにおけるいくつかのチューニングパラメータについて説明します。
Spark Streamingアプリケーションに必要なクラスターメモリの量は、使用される変換の種類に大きく依存します。例えば、過去10分間のデータのウィンドウ処理を使用したい場合は、クラスターは10分間のデータをメモリに保持するのに十分なメモリを持っている必要があります。または、多数のキーを持つupdateStateByKey
を使用したい場合は、必要なメモリが大きくなります。逆に、単純なmap-filter-store操作を実行したい場合は、必要なメモリは少なくなります。
一般に、レシーバーを介して受信したデータはStorageLevel.MEMORY_AND_DISK_SER_2で保存されるため、メモリに収まらないデータはディスクにスピルオーバーします。これにより、ストリーミングアプリケーションのパフォーマンスが低下する可能性があるため、ストリーミングアプリケーションに必要な十分なメモリを提供することをお勧めします。小規模でメモリ使用量を試して、それに応じて見積もるのが最善です。
メモリチューニングのもう1つの側面は、ガベージコレクションです。低レイテンシを必要とするストリーミングアプリケーションの場合、JVMガベージコレクションによって引き起こされる大きな一時停止は望ましくありません。
メモリ使用量とGCオーバーヘッドを調整するのに役立ついくつかのパラメータがあります。
-
DStreamの永続化レベル: データシリアライゼーションセクションで前述したように、入力データとRDDは、デフォルトでシリアライズされたバイトとして永続化されます。これにより、デシリアライズされた永続化と比較して、メモリ使用量とGCオーバーヘッドの両方が削減されます。Kryoシリアライゼーションを有効にすると、シリアライズされたサイズとメモリ使用量がさらに削減されます。メモリ使用量のさらなる削減は、CPU時間のコストを犠牲にして、圧縮(Spark構成
spark.rdd.compress
を参照)で実現できます。 -
古いデータのクリア: デフォルトでは、DStream変換によって生成されたすべての入力データと永続化されたRDDは自動的にクリアされます。Spark Streamingは、使用されている変換に基づいてデータをクリアするタイミングを決定します。例えば、10分間のウィンドウ処理を使用している場合、Spark Streamingは過去10分間のデータを保持し、古いデータを積極的に破棄します。
streamingContext.remember
を設定することにより、データをより長い期間保持することができます(例えば、古いデータをインタラクティブにクエリする場合)。 -
CMSガベージコレクター: GC関連の一時停止を常に低く保つために、並行マークアンドスイープGCの使用が強く推奨されます。並行GCは、システムの全体的な処理スループットを低下させることが知られていますが、より一貫したバッチ処理時間を実現するために、その使用が依然として推奨されます。ドライバー(
spark-submit
で--driver-java-options
を使用)とエグゼキュータの両方(Spark構成spark.executor.extraJavaOptions
を使用)でCMS GCを設定してください。 -
その他のヒント: GCオーバーヘッドをさらに削減するために、試してみるべきヒントをいくつか紹介します。
OFF_HEAP
ストレージレベルを使用してRDDを永続化します。詳細については、Sparkプログラミングガイドを参照してください。- より多くのエグゼキュータを、より小さなヒープサイズで使用します。これにより、各JVMヒープ内のGCの圧力が軽減されます。
覚えておくべき重要なポイント
-
DStreamは単一のレシーバーに関連付けられています。読み取りの並列処理を実現するには、複数のレシーバー、つまり複数のDStreamを作成する必要があります。レシーバーはエグゼキューター内で実行されます。レシーバーは1つのコアを占有します。レシーバースロットが予約された後、処理に十分なコアがあることを確認してください。つまり、
spark.cores.max
はレシーバースロットを考慮に入れる必要があります。レシーバーは、ラウンドロビン方式でエグゼキューターに割り当てられます。 -
ストリームソースからデータを受信すると、レシーバーはデータのブロックを作成します。新しいデータブロックは、blockIntervalミリ秒ごとに生成されます。バッチ間隔中にN個のデータブロックが作成されます。ここで、N = バッチ間隔/blockIntervalです。これらのブロックは、現在のエグゼキューターのBlockManagerによって、他のエグゼキューターのブロックマネージャーに分散されます。その後、ドライバーで実行されているネットワーク入力トラッカーに、今後の処理のためのブロックの場所が通知されます。
-
バッチ間隔中に作成されたブロックに対して、ドライバーでRDDが作成されます。バッチ間隔中に生成されたブロックは、RDDのパーティションです。各パーティションはSparkのタスクです。blockInterval == バッチ間隔は、単一のパーティションが作成され、おそらくローカルで処理されることを意味します。
-
ブロックに対するマップタスクは、(ブロックを受信した)エグゼキューターと、(ブロックが複製された)別のエグゼキューターで、非ローカルスケジューリングが開始されない限り、ブロック間隔に関係なく処理されます。blockIntervalが大きいほど、ブロックは大きくなります。
spark.locality.wait
の値を大きくすると、ローカルノードでブロックを処理する可能性が高くなります。より大きなブロックがローカルで処理されるように、これらの2つのパラメータの間でバランスを見つける必要があります。 -
batchIntervalとblockIntervalに依存する代わりに、
inputDstream.repartition(n)
を呼び出すことで、パーティションの数を定義できます。これにより、RDDのデータがランダムに再シャッフルされ、n個のパーティションが作成されます。そうです、並列処理を向上させるためです。ただし、シャッフルというコストがかかります。RDDの処理は、ドライバーのジョブスケジューラによってジョブとしてスケジュールされます。ある時点で、アクティブなジョブは1つだけです。したがって、1つのジョブが実行されている場合、他のジョブはキューに入れられます。 -
2つのDStreamがある場合、2つのRDDが形成され、順番にスケジュールされる2つのジョブが作成されます。これを回避するには、2つのDStreamを結合できます。これにより、DStreamの2つのRDDに対して単一のunionRDDが確実に形成されます。このunionRDDは、単一のジョブと見なされます。ただし、RDDのパーティション分割は影響を受けません。
-
バッチ処理時間がバッチ間隔よりも長い場合、明らかにレシーバーのメモリが一杯になり始め、例外(おそらくBlockNotFoundException)が発生します。現在、レシーバーを一時停止する方法はありません。SparkConf構成
spark.streaming.receiver.maxRate
を使用すると、レシーバーのレートを制限できます。
フォールトトレランスのセマンティクス
このセクションでは、障害が発生した場合のSpark Streamingアプリケーションの動作について説明します。
背景
Spark Streamingによって提供されるセマンティクスを理解するために、SparkのRDDの基本的なフォールトトレランスセマンティクスを思い出しましょう。
- RDDは、不変で、決定論的に再計算可能な分散データセットです。各RDDは、フォールトトレラントな入力データセット上で使用され、それを作成した決定論的な操作の系列を記憶しています。
- RDDのパーティションがワーカーノードの障害によって失われた場合、そのパーティションは、操作の系統(リネージ)を使用して、元のフォールトトレラントなデータセットから再計算できます。
- RDDのすべての変換が決定論的であると仮定すると、最終的な変換されたRDDのデータは、Sparkクラスタ内の障害に関係なく、常に同じになります。
Sparkは、HDFSやS3のようなフォールトトレラントなファイルシステム上のデータを操作します。したがって、フォールトトレラントなデータから生成されたすべてのRDDもフォールトトレラントです。ただし、Spark Streamingの場合は、ほとんどの場合、データがネットワーク経由で受信されるため(fileStream
が使用されている場合を除く)、そうではありません。生成されたすべてのRDDに対して同じフォールトトレランスプロパティを実現するために、受信データはクラスタ内のワーカーノードの複数のSparkエグゼキュータ間で複製されます(デフォルトの複製係数は2)。これにより、障害が発生した場合に復旧する必要がある2種類のデータがシステム内に存在することになります。
- 受信および複製されたデータ - このデータは、他のノードのいずれかにコピーが存在するため、単一のワーカーノードの障害を生き残ります。
- 受信されたが複製のためにバッファリングされたデータ - これは複製されていないため、このデータを復旧する唯一の方法は、ソースから再度取得することです。
さらに、考慮すべき2種類の障害があります。
- ワーカーノードの障害 - エグゼキュータを実行しているワーカーノードのいずれかが障害を起こす可能性があり、それらのノード上のすべてのインメモリデータが失われます。レシーバーが障害が発生したノードで実行されていた場合、それらのバッファリングされたデータは失われます。
- ドライバノードの障害 - Spark Streamingアプリケーションを実行しているドライバノードが障害を起こした場合、明らかにSparkContextが失われ、すべてのエグゼキュータとそのインメモリデータが失われます。
この基本的な知識を踏まえて、Spark Streamingのフォールトトレランスのセマンティクスを理解しましょう。
定義
ストリーミングシステムのセマンティクスは、多くの場合、システムが各レコードを処理できる回数で捉えられます。システムが、あらゆる可能な動作条件(障害などにもかかわらず)の下で提供できる保証には、3つのタイプがあります。
- 最大1回:各レコードは1回処理されるか、まったく処理されません。
- 少なくとも1回:各レコードは1回以上処理されます。これは、データが失われないことを保証するため、最大1回よりも強力です。ただし、重複がある可能性があります。
- 正確に1回:各レコードは正確に1回処理されます - データが失われることはなく、データが複数回処理されることもありません。これは明らかに3つの中で最も強力な保証です。
基本的なセマンティクス
ストリーム処理システムでは、大まかに言って、データを処理する際に3つのステップがあります。
-
データを受信する:データはレシーバーまたはその他の方法を使用してソースから受信されます。
-
データを変換する:受信したデータは、DStreamおよびRDD変換を使用して変換されます。
-
データをプッシュする:最終的に変換されたデータは、ファイルシステム、データベース、ダッシュボードなどの外部システムにプッシュされます。
ストリーミングアプリケーションがエンドツーエンドの正確に1回の保証を達成する必要がある場合、各ステップは正確に1回の保証を提供する必要があります。つまり、各レコードは正確に1回受信され、正確に1回変換され、ダウンストリームシステムに正確に1回プッシュされる必要があります。Spark Streamingのコンテキストにおけるこれらのステップのセマンティクスを理解しましょう。
-
データを受信する:さまざまな入力ソースは、異なる保証を提供します。これについては、次のサブセクションで詳しく説明します。
-
データを変換する:RDDが提供する保証のおかげで、受信したすべてのデータは正確に1回処理されます。障害が発生した場合でも、受信した入力データにアクセスできる限り、最終的に変換されたRDDは常に同じ内容になります。
-
データをプッシュする:出力操作は、デフォルトで少なくとも1回のセマンティクスを保証します。これは、出力操作のタイプ(べき等であるか否か)とダウンストリームシステムのセマンティクス(トランザクションをサポートするか否か)に依存するためです。ただし、ユーザーは、正確に1回のセマンティクスを実現するために、独自のトランザクションメカニズムを実装できます。これについては、セクションの後半で詳しく説明します。
受信データのセマンティクス
さまざまな入力ソースは、少なくとも1回から正確に1回までのさまざまな保証を提供します。詳細については、お読みください。
ファイルを使用する場合
すべての入力データがHDFSのようなフォールトトレラントなファイルシステムにすでに存在する場合、Spark Streamingは障害から常に回復し、すべてのデータを処理できます。これにより、正確に1回のセマンティクスが得られます。つまり、何が障害を起こしても、すべてのデータが正確に1回処理されます。
レシーバーベースのソースを使用する場合
レシーバーに基づく入力ソースの場合、フォールトトレランスのセマンティクスは、障害シナリオとレシーバーのタイプの両方に依存します。前に説明したように、レシーバーには2つのタイプがあります
- 信頼できるレシーバー - これらのレシーバーは、受信したデータが複製されたことを確認した後にのみ、信頼できるソースを認識します。このようなレシーバーが障害を起こした場合、ソースはバッファリングされた(複製されていない)データに対して確認応答を受け取りません。したがって、レシーバーが再起動されると、ソースはデータを再送信し、障害によってデータが失われることはありません。
- 信頼できないレシーバー - このようなレシーバーは確認応答を送信しないため、ワーカーまたはドライバの障害により障害が発生した場合にデータを失う可能性があります。
使用するレシーバーのタイプに応じて、次のセマンティクスを実現します。ワーカーノードが障害を起こした場合、信頼できるレシーバーではデータ損失はありません。信頼できないレシーバーでは、受信されたが複製されていないデータが失われる可能性があります。ドライバノードが障害を起こした場合、これらの損失に加えて、メモリ内で受信および複製された過去のすべてのデータが失われます。これは、状態を持つ変換の結果に影響を与えます。
この過去に受信したデータの損失を回避するために、Spark 1.2では、受信したデータをフォールトトレラントなストレージに保存する書き込み先行ログが導入されました。書き込み先行ログが有効で、信頼できるレシーバーを使用すると、データ損失はゼロになります。セマンティクスの観点から、少なくとも1回の保証を提供します。
次の表に、障害発生時のセマンティクスをまとめます。
デプロイメントシナリオ | ワーカーの障害 | ドライバの障害 |
---|---|---|
Spark 1.1以前、または 書き込み先行ログなしのSpark 1.2以降 |
信頼できないレシーバーではバッファリングされたデータが失われる 信頼できるレシーバーではデータ損失なし 少なくとも1回のセマンティクス |
信頼できないレシーバーではバッファリングされたデータが失われる すべてのレシーバーで過去のデータが失われる 未定義のセマンティクス |
書き込み先行ログ付きのSpark 1.2以降 | 信頼できるレシーバーではデータ損失なし 少なくとも1回のセマンティクス |
信頼できるレシーバーとファイルではデータ損失なし 少なくとも1回のセマンティクス |
Kafka Direct APIを使用する場合
Spark 1.3では、新しいKafka Direct APIを導入しました。これにより、すべてのKafkaデータがSpark Streamingによって正確に1回受信されることが保証されます。これに加えて、正確に1回の出力操作を実装すると、エンドツーエンドの正確に1回の保証を達成できます。このアプローチについては、Kafka統合ガイドでさらに説明します。
出力操作のセマンティクス
出力操作(foreachRDD
など)には、少なくとも1回のセマンティクスがあります。つまり、ワーカーの障害が発生した場合、変換されたデータが外部エンティティに複数回書き込まれる可能性があります。これは、saveAs***Files
操作を使用してファイルシステムに保存する場合は許容されます(ファイルは単に同じデータで上書きされるため)が、正確に1回のセマンティクスを実現するには、追加の努力が必要になる場合があります。2つのアプローチがあります。
-
べき等な更新:複数の試行で常に同じデータが書き込まれます。たとえば、
saveAs***Files
は常に同じデータを生成されたファイルに書き込みます。 -
トランザクション更新:すべての更新はトランザクション的に行われ、更新が正確に1回アトミックに行われるようにします。これを行う1つの方法は、次のとおりです。
- バッチ時間(
foreachRDD
で利用可能)とRDDのパーティションインデックスを使用して、識別子を作成します。この識別子は、ストリーミングアプリケーション内のBLOBデータを一意に識別します。 -
このBLOBをトランザクション的に(つまり、正確に1回、アトミックに)識別子を使用して外部システムで更新します。つまり、識別子がまだコミットされていない場合は、パーティションデータと識別子をアトミックにコミットします。それ以外の場合は、すでにコミットされている場合は、更新をスキップします。
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }
- バッチ時間(