Spark Streaming カスタムレシーバ

Spark Streamingは、組み込みでサポートされているデータソース(Kafka、Kinesis、ファイル、ソケットなど)以外にも、任意のデータソースからのストリーミングデータを受信できます。そのためには、対象のデータソースからデータを受信するためにカスタマイズされた *レシーバ* を開発者が実装する必要があります。このガイドでは、カスタムレシーバを実装し、Spark Streamingアプリケーションで使用する方法を説明します。カスタムレシーバは、ScalaまたはJavaで実装できます。

カスタムレシーバの実装

これは、**Receiver** (Scala ドキュメントJava ドキュメント) の実装から始まります。カスタムレシーバは、2つのメソッドを実装することでこの抽象クラスを拡張する必要があります。

onStart()onStop() のどちらも、無期限にブロックしてはなりません。onStart() は通常、データ受信を担当するスレッドを開始し、onStop() はこれらのデータ受信スレッドが停止されることを保証します。受信スレッドは、データの受信を停止する必要があるかどうかを確認するために、Receiver メソッドである isStopped() を使用することもできます。

データを受信したら、Receiver クラスで提供される store(data) メソッドを呼び出すことで、Spark 内にそのデータを保存できます。store() にはいくつかの種類があり、受信したデータをレコード単位で保存したり、オブジェクトやシリアライズされたバイトの集合として保存したりできます。store() の種類は、レシーバの信頼性とフォールトトレランスのセマンティクスに影響します。これは、後ほど詳細に説明します。

受信スレッドの例外は、レシーバのサイレントエラーを回避するために適切にキャッチして処理する必要があります。restart(<exception>) は、非同期的に onStop() を呼び出し、その後遅延後に onStart() を呼び出すことでレシーバを再開します。stop(<exception>)onStop() を呼び出してレシーバを終了します。また、reportError(<error>) は、レシーバを停止/再開せずに、ドライバにエラーメッセージを報告します(ログとUIに表示されます)。

以下は、ソケットを介してテキストストリームを受信するカスタムレシーバです。テキストストリーム内の'\n'区切りの行をレコードとして扱い、Sparkに保存します。受信スレッドに接続または受信エラーが発生した場合、レシーバは再開され、接続を再試行します。

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Until stopped or connection broken continue reading
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
      userInput = reader.readLine()
      while(!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // restart if could not connect to server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}
public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  @Override
  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread(this::receive).start();
  }

  @Override
  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data '" + userInput + "'");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

Spark Streamingアプリケーションでのカスタムレシーバの使用

カスタムレシーバは、streamingContext.receiverStream(<カスタムレシーバのインスタンス>) を使用してSpark Streamingアプリケーションで使用できます。これにより、カスタムレシーバのインスタンスによって受信されたデータを使用して入力DStreamが作成されます(下記参照)。

// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...

完全なソースコードは、例CustomReceiver.scalaにあります。

// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);
...

完全なソースコードは、例JavaCustomReceiver.javaにあります。

レシーバの信頼性

Spark Streaming プログラミングガイドで簡単に説明したように、信頼性とフォールトトレランスのセマンティクスに基づいて、2種類のレシーバがあります。

  1. *信頼性の高いレシーバ* - 送信されたデータの確認を許可する*信頼性の高いソース*の場合、*信頼性の高いレシーバ*は、データがSparkに確実に受信され保存されたこと(つまり、正常にレプリケートされたこと)をソースに正しく確認します。通常、このレシーバの実装には、ソースの確認のセマンティクスを慎重に考慮する必要があります。
  2. *信頼性の低いレシーバ* - *信頼性の低いレシーバ*は、ソースに確認を送信しません。これは、確認をサポートしないソース、または確認の複雑さを必要としない信頼性の高いソースにも使用できます。

*信頼性の高いレシーバ*を実装するには、store(複数レコード)を使用してデータを保存する必要があります。この種類のstoreはブロッキングコールであり、指定されたすべてのレコードがSpark内に保存された後にのみ戻ります。レシーバの構成されたストレージレベルがレプリケーションを使用している場合(デフォルトで有効)、この呼び出しはレプリケーションが完了した後に戻ります。したがって、データが確実に保存され、レシーバはソースに適切に確認応答できます。これにより、レシーバがデータのレプリケーション中に失敗した場合でもデータが失われることはありません。バッファされたデータは確認されず、後でソースによって再送信されます。

*信頼性の低いレシーバ*は、このロジックを実装する必要はありません。store(単一レコード)を使用して、ソースからレコードを1つずつ受信して挿入できます。store(複数レコード)の信頼性の保証は得られませんが、次の利点があります。

次の表は、2種類のレシーバの特徴をまとめたものです。

レシーバの種類 特徴
信頼性の低いレシーバ 実装が簡単です。
システムはブロックの生成とレート制御を処理します。フォールトトレランスの保証はありません。レシーバの障害時にデータが失われる可能性があります。
信頼性の高いレシーバ 強力なフォールトトレランスの保証があり、データの損失をゼロにすることができます。
ブロックの生成とレート制御は、レシーバの実装によって処理する必要があります。
実装の複雑さは、ソースの確認メカニズムによって異なります。