Spark Connect によるアプリケーション開発

Spark Connect の概要

Apache Spark 3.4 では、Spark Connect は、DataFrame API と解決されていない論理プランをプロトコルとして使用して、Spark クラスターへのリモート接続を可能にする、分離されたクライアント・サーバー アーキテクチャを導入しました。クライアントとサーバーの分離により、Spark とそのオープン エコシステムはどこからでも活用できるようになります。最新のデータ アプリケーション、IDE、Notebook、およびプログラミング言語に組み込むことができます。

Spark Connect の詳細については、「Spark Connect の概要」を参照してください。

Spark Connect を使用した Spark アプリケーションの再定義

分離されたクライアント・サーバー アーキテクチャにより、Spark Connect は Spark アプリケーションの開発方法を簡素化します。Spark クライアント アプリケーションと Spark サーバー ライブラリという概念が次のように導入されます。

Spark 3.4 および Spark Connect では、Spark クライアント アプリケーションの開発が簡素化され、Spark サーバー ライブラリの構築方法に関する明確な拡張ポイントとガイドラインが提供されており、両方のタイプのアプリケーションが Spark と共に進化しやすくなっています。図 1 に示すように、Spark クライアント アプリケーションは Spark Connect API を使用して Spark に接続します。これは基本的に DataFrame API であり、完全に宣言型です。

Extending Spark
Connect Diagram

Spark サーバー ライブラリは Spark を拡張します。通常、Spark Connect API の一部としてクライアント アプリケーションに公開される、Spark と統合された追加のサーバー側ロジックを提供します。これは Spark Connect 拡張ポイントを使用します。たとえば、Spark サーバー ライブラリ は、カスタム サービス側ロジック (カスタム ライブラリ プラグインというラベルの付いた青いボックスで示される) で構成され、Spark Connect API の一部として青いボックスを介してクライアントに公開されます。クライアントは、PySpark や Spark Scala クライアントなどと組み合わせてこの API を使用し、Spark クライアント アプリケーションがカスタム ロジック/ライブラリと連携しやすくなります。

Spark API モード: Spark クライアントと Spark Classic

Spark は API モードである spark.api.mode 設定を提供しており、Spark Classic アプリケーションを Spark Connect にシームレスに切り替えることができます。 spark.api.mode の値に応じて、アプリケーションは Spark Classic モードまたは Spark Connect モードのいずれかで実行できます。例を以下に示します。

from pyspark.sql import SparkSession

SparkSession.builder.config("spark.api.mode", "connect").master("...").getOrCreate()

アプリケーションを送信する際に、この設定を Scala および PySpark アプリケーションに適用することもできます。

spark-submit --master "..." --conf spark.api.mode=connect

さらに、Spark Connect はローカル テストのための便利なオプションを提供します。spark.remotelocal[...] または local-cluster[...] に設定することで、ローカル Spark Connect サーバーを起動し、Spark Connect セッションにアクセスできます。

これは、--master ... と共に --conf spark.api.mode=connect を使用するのと似ています。ただし、spark.remote および --remotelocal* 値に限定されるのに対し、--master ... と共に --conf spark.api.mode=connect は、Spark Classic との互換性を広げるために、spark:// などの追加のクラスター URL をサポートすることに注意してください。

Spark クライアント アプリケーション

Spark クライアント アプリケーションは、Spark ユーザーが現在開発している通常の Spark アプリケーションです。たとえば、ETL パイプライン、データ準備、モデルのトレーニングまたは推論などです。これらは通常、Spark の宣言型 DataFrame および DataSet API を使用して構築されます。Spark Connect では、コアの動作は同じですが、いくつかの違いがあります。

Spark Connect に基づくクライアント アプリケーションは、以前のジョブと同じ方法で送信できます。さらに、Spark Connect に基づく Spark クライアント アプリケーションには、以前の Spark バージョン (3.4 以前) を使用する従来の Spark アプリケーションと比較していくつかの利点があります。

Spark サーバー ライブラリ

Spark 3.4 まで、Spark への拡張 (たとえば、Spark ML または Spark-NLP) は、Spark クライアント アプリケーションのようにビルドおよびデプロイされていました。Spark 3.4 および Spark Connect では、Spark サーバー ライブラリを介して Spark を拡張するための明示的な拡張ポイントが提供されています。SparkSession extensionsSpark Plugins などの Spark の既存の拡張ポイントとは異なり、これらの拡張ポイントはクライアントに公開できる機能を提供します。

開始方法: Spark サーバー ライブラリで Spark を拡張する

Spark Connect は利用可能であり、PySpark および Scala アプリケーションをサポートしています。Spark Connect を使用した Apache Spark サーバーの実行方法と、Spark Connect クライアント ライブラリを使用してクライアント アプリケーションからそれに接続する方法を説明します。

Spark サーバー ライブラリは、図 2 に示す次のコンポーネントで構成されます。

  1. Spark Connect プロトコル拡張 (青いボックス Proto API)
  2. Spark Connect プラグイン。
  3. Spark を拡張するアプリケーション ロジック。
  4. Spark サーバー ライブラリ アプリケーション ロジックを、PySpark または Scala Spark クライアントと共に Spark クライアント アプリケーションに公開するクライアント パッケージ。

Extending Spark
Connect Diagram - Labelled Steps

(1) Spark Connect プロトコル拡張

新しい Spark サーバー ライブラリで Spark を拡張するために、開発者は Spark Connect プロトコルの 3 つの主要な操作タイプである RelationExpression、および Command を拡張できます。

message Relation {
  oneof rel_type {
    Read read = 1;
    // ...
    google.protobuf.Any extension = 998;
  } 
}

message Expression {
  oneof expr_type {
    Literal literal = 1;
    // ...
    google.protobuf.Any extension = 999;
  } 
}

message Command {
  oneof command_type {
    WriteCommand write_command = 1;
    // ...
    google.protobuf.Any extension = 999;
  } 
} 

拡張フィールドにより、Spark Connect プロトコルの一部として任意の protobuf メッセージをシリアル化できます。これらのメッセージは、拡張実装のパラメーターまたは状態を表します。カスタム式タイプをビルドするには、開発者はまず式のカスタム protobuf 定義を定義します。

message ExamplePluginExpression {
  Expression child = 1;
  string custom_field = 2;
}

(2) (3) カスタム アプリケーション ロジックを備えた Spark Connect プラグイン実装

次のステップとして、開発者は、protobuf メッセージの入力パラメーターに基づいたカスタム アプリケーション ロジックで Spark Connect の ExpressionPlugin クラスを実装します。

class ExampleExpressionPlugin extends ExpressionPlugin {
  override def transform(
      relation: protobuf.Any,
      planner: SparkConnectPlanner): Option[Expression] = {
    // Check if the serialized value of protobuf.Any matches the type
    // of our example expression.
    if (!relation.is(classOf[proto.ExamplePluginExpression])) {
      return None
    }
    val exp = relation.unpack(classOf[proto.ExamplePluginExpression])
    Some(Alias(planner.transformExpression(
        exp.getChild), exp.getCustomField)(explicitMetadata = None))
  }
}

アプリケーション ロジックが開発されたら、コードを jar としてパッケージ化し、Spark が追加ロジックを認識するように構成する必要があります。関連する Spark 設定オプションは次のとおりです。

(4) Spark サーバー ライブラリ クライアント パッケージ

サーバー コンポーネントがデプロイされると、任意のクライアントが適切な protobuf メッセージを使用して使用できます。上記の例では、Spark Connect エンドポイントに送信された次のメッセージ ペイロードがあれば、拡張メカニズムをトリガーするのに十分です。

{
  "project": {
    "input": {
      "sql": {
        "query": "select * from samples.nyctaxi.trips"
      }
    },
    "expressions": [
      {
        "extension": {
          "typeUrl": "type.googleapis.com/spark.connect.ExamplePluginExpression",
          "value": "\n\006\022\004\n\002id\022\006testval"
        }
      }
    ]
  }
} 

例を Python で利用できるようにするために、アプリケーション開発者は、新しい式をラップし、それを PySpark に埋め込む Python ライブラリを提供します。任意の式に機能を提供する最も簡単な方法は、PySpark 列インスタンスを引数として受け取り、式が適用された新しい列インスタンスを返すことです。

from pyspark.sql.connect.column import Expression
import pyspark.sql.connect.proto as proto

from myxample.proto import ExamplePluginExpression

# Internal class that satisfies the interface by the Python client
# of Spark Connect to generate the protobuf representation from
# an instance of the expression.
class ExampleExpression(Expression):
    def to_plan(self, session) -> proto.Expression:
        fun = proto.Expression()
        plugin = ExamplePluginExpression()
        plugin.child.literal.long = 10
        plugin.custom_field = "example"
        fun.extension.Pack(plugin)
        return fun

# Defining the function to be used from the consumers.
def example_expression(col: Column) -> Column:
    return Column(ExampleExpression())


# Using the expression in the Spark Connect client code.
df = spark.read.table("samples.nyctaxi.trips")
df.select(example_expression(df["fare_amount"])).collect()