Spark Streaming カスタムレシーバー

Spark Streaming は、組み込みサポートのあるもの(Kafka、Kinesis、ファイル、ソケットなど)以外にも、任意のデータソースからストリーミングデータを受信できます。これには、開発者が関連するデータソースからのデータ受信用にカスタマイズされたレシーバーを実装する必要があります。このガイドでは、カスタムレシーバーの実装プロセスと、Spark Streaming アプリケーションでの使用方法を説明します。カスタムレシーバーは Scala または Java で実装できることに注意してください。

カスタムレシーバーの実装

これは、Receiver (Scala ドキュメントJava ドキュメント) を実装することから始まります。カスタムレシーバーは、2 つのメソッドを実装することで、この抽象クラスを拡張する必要があります。

onStart() および onStop() は、無期限にブロックしてはいけません。通常、onStart() はデータの受信を担当するスレッドを開始し、onStop() はこれらのデータ受信スレッドが停止されることを保証します。受信スレッドは、Receiver メソッドである isStopped() を使用して、データの受信を停止すべきかどうかを確認することもできます。

データを受信したら、Receiver クラスによって提供されるメソッドである store(data) を呼び出すことで、Spark 内にそのデータを保存できます。受信したデータをレコード単位、またはオブジェクト/バイト配列のコレクション全体として保存できる、いくつかの store() のバリエーションがあります。レシーバーを実装するために使用される store() のバリエーションは、その信頼性と耐障害性のセマンティクスに影響することに注意してください。これについては、後で 詳しく 説明します。

受信スレッド内の例外は、サイレントフェイルを避けるためにキャッチして適切に処理する必要があります。restart(<exception>) は、非同期で onStop() を呼び出し、遅延後に onStart() を呼び出すことでレシーバーを再起動します。stop(<exception>)onStop() を呼び出し、レシーバーを終了します。また、reportError(<error>) は、レシーバーを停止/再起動せずに、ドライバーにエラーメッセージを報告します(ログと UI で表示されます)。

以下は、ソケット経由でテキストストリームを受信するカスタムレシーバーの例です。テキストストリームの改行区切りの行をレコードとして扱い、Spark に保存します。受信スレッドで接続または受信にエラーが発生した場合、レシーバーは再起動され、再度接続を試みます。

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}
public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  @Override
  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread(this::receive).start();
  }

  @Override
  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

Spark Streaming アプリケーションでのカスタムレシーバーの使用

streamingContext.receiverStream(<instance of custom receiver>) を使用して、カスタムレシーバーを Spark Streaming アプリケーションで使用できます。これにより、以下に示すように、カスタムレシーバーのインスタンスによって受信されたデータを使用した入力 DStream が作成されます。

// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...

完全なソースコードは、例の CustomReceiver.scala にあります。

// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...

完全なソースコードは、例の JavaCustomReceiver.java にあります。

レシーバーの信頼性

Spark Streaming プログラミングガイドの レシーバーの信頼性で簡単に説明したように、信頼性と耐障害性のセマンティクスに基づいて 2 種類のレシーバーがあります。

  1. 信頼性の高いレシーバー - 信頼性の高いソースでは、送信されたデータを確認できるため、信頼性の高いレシーバーは、データが Spark に確実に(つまり、正常にレプリケートされた)受信および保存されたことをソースに正しく確認します。通常、このレシーバーの実装には、ソースの確認のセマンティクスを注意深く検討する必要があります。
  2. 信頼性の低いレシーバー - 信頼性の低いレシーバーは、ソースに確認を送信しません。これは、確認をサポートしていないソース、または確認の複雑さを避けたい/必要としない信頼性の高いソースでも使用できます。

信頼性の高いレシーバーを実装するには、データを保存するために store(multiple-records) を使用する必要があります。この store のバリエーションは、指定されたすべてのレコードが Spark 内に保存されるまで返されないブロック呼び出しです。レシーバーに設定されたストレージレベルがレプリケーションを使用している場合(デフォルトで有効)、この呼び出しはレプリケーションが完了してから返されます。これにより、データが確実に保存されていることが保証され、レシーバーはソースに適切に確認できるようになります。これにより、レシーバーがデータレプリケーションの途中で失敗した場合でもデータが失われることはありません。バッファリングされたデータは確認されず、後でソースによって再送信されます。

信頼性の低いレシーバーは、このようなロジックを実装する必要はありません。store(single-record) を使用して、レコードをソースから受信して 1 つずつ挿入するだけで済みます。store(multiple-records) の信頼性保証はありませんが、以下の利点があります。

次の表は、両方のタイプのレシーバーの特性をまとめたものです。

レシーバータイプ 特性
信頼性の低いレシーバー 実装が簡単。
システムがブロック生成とレート制御を担当。耐障害性の保証なし、レシーバーの障害でデータが失われる可能性がある。
信頼性の高いレシーバー 強力な耐障害性保証、データ損失ゼロを保証できる。
ブロック生成とレート制御はレシーバーの実装で処理する必要がある。
実装の複雑さは、ソースの確認メカニズムによって異なる。