Spark Connect によるアプリケーション開発
Spark Connect の概要
Apache Spark 3.4 では、Spark Connect は、DataFrame API と解決されていない論理プランをプロトコルとして使用して、Spark クラスターへのリモート接続を可能にする、分離されたクライアント・サーバー アーキテクチャを導入しました。クライアントとサーバーの分離により、Spark とそのオープン エコシステムはどこからでも活用できるようになります。最新のデータ アプリケーション、IDE、Notebook、およびプログラミング言語に組み込むことができます。
Spark Connect の詳細については、「Spark Connect の概要」を参照してください。
Spark Connect を使用した Spark アプリケーションの再定義
分離されたクライアント・サーバー アーキテクチャにより、Spark Connect は Spark アプリケーションの開発方法を簡素化します。Spark クライアント アプリケーションと Spark サーバー ライブラリという概念が次のように導入されます。
- Spark クライアント アプリケーション は、分散データ処理のために Spark とその豊富なエコシステムを使用する通常の Spark アプリケーションです。例としては、ETL パイプライン、データ準備、モデルのトレーニングと推論などがあります。
- Spark サーバー ライブラリ は、Spark の機能を拡張、拡張、補完します。たとえば、MLlib (Spark の強力な分散処理を使用する分散 ML ライブラリ) です。Spark Connect は、Spark サーバー ライブラリのクライアント側インターフェイスを公開するように拡張できます。
Spark 3.4 および Spark Connect では、Spark クライアント アプリケーションの開発が簡素化され、Spark サーバー ライブラリの構築方法に関する明確な拡張ポイントとガイドラインが提供されており、両方のタイプのアプリケーションが Spark と共に進化しやすくなっています。図 1 に示すように、Spark クライアント アプリケーションは Spark Connect API を使用して Spark に接続します。これは基本的に DataFrame API であり、完全に宣言型です。
Spark サーバー ライブラリは Spark を拡張します。通常、Spark Connect API の一部としてクライアント アプリケーションに公開される、Spark と統合された追加のサーバー側ロジックを提供します。これは Spark Connect 拡張ポイントを使用します。たとえば、Spark サーバー ライブラリ は、カスタム サービス側ロジック (カスタム ライブラリ プラグインというラベルの付いた青いボックスで示される) で構成され、Spark Connect API の一部として青いボックスを介してクライアントに公開されます。クライアントは、PySpark や Spark Scala クライアントなどと組み合わせてこの API を使用し、Spark クライアント アプリケーションがカスタム ロジック/ライブラリと連携しやすくなります。
Spark API モード: Spark クライアントと Spark Classic
Spark は API モードである spark.api.mode 設定を提供しており、Spark Classic アプリケーションを Spark Connect にシームレスに切り替えることができます。 spark.api.mode の値に応じて、アプリケーションは Spark Classic モードまたは Spark Connect モードのいずれかで実行できます。例を以下に示します。
from pyspark.sql import SparkSession
SparkSession.builder.config("spark.api.mode", "connect").master("...").getOrCreate()アプリケーションを送信する際に、この設定を Scala および PySpark アプリケーションに適用することもできます。
spark-submit --master "..." --conf spark.api.mode=connectさらに、Spark Connect はローカル テストのための便利なオプションを提供します。spark.remote を local[...] または local-cluster[...] に設定することで、ローカル Spark Connect サーバーを起動し、Spark Connect セッションにアクセスできます。
これは、--master ... と共に --conf spark.api.mode=connect を使用するのと似ています。ただし、spark.remote および --remote は local* 値に限定されるのに対し、--master ... と共に --conf spark.api.mode=connect は、Spark Classic との互換性を広げるために、spark:// などの追加のクラスター URL をサポートすることに注意してください。
Spark クライアント アプリケーション
Spark クライアント アプリケーションは、Spark ユーザーが現在開発している通常の Spark アプリケーションです。たとえば、ETL パイプライン、データ準備、モデルのトレーニングまたは推論などです。これらは通常、Spark の宣言型 DataFrame および DataSet API を使用して構築されます。Spark Connect では、コアの動作は同じですが、いくつかの違いがあります。
- 低レベルの非宣言型 API (RDD) は、Spark クライアント アプリケーションから直接使用できなくなりました。RDD 機能の欠落に対する代替手段は、高レベルの DataFrame API の一部として提供されます。
- クライアント アプリケーションは、Spark ドライバー JVM に直接アクセスできなくなりました。クライアントとサーバーは完全に分離されています。
Spark Connect に基づくクライアント アプリケーションは、以前のジョブと同じ方法で送信できます。さらに、Spark Connect に基づく Spark クライアント アプリケーションには、以前の Spark バージョン (3.4 以前) を使用する従来の Spark アプリケーションと比較していくつかの利点があります。
- アップグレード可能性: Spark Connect API がサーバー側の変更/改善を抽象化するため、新しい Spark サーバー バージョンへのアップグレードはシームレスです。クライアント API とサーバー API はきれいに分離されています。
- シンプルさ: ユーザーに公開される API の数は 3 から 2 に減少しました。Spark Connect API は完全に宣言型であり、したがって SQL に慣れた新規ユーザーにとって学習が容易です。
- 安定性: Spark Connect を使用する場合、クライアント アプリケーションは Spark ドライバー上で実行されなくなるため、サーバー側の不安定性を引き起こしたり、影響を受けたりすることはありません。
- リモート接続: 分離されたアーキテクチャにより、SQL および JDBC 以外の Spark へのリモート接続が可能になります。あらゆるアプリケーションが「サービスとしての Spark」を対話的に使用できるようになりました。
- 下位互換性: Spark Connect API は、RDD の使用を除いて、以前の Spark バージョンとコード互換性があります。RDD の使用については、Spark Connect で代替 API のリストが提供されています。
Spark サーバー ライブラリ
Spark 3.4 まで、Spark への拡張 (たとえば、Spark ML または Spark-NLP) は、Spark クライアント アプリケーションのようにビルドおよびデプロイされていました。Spark 3.4 および Spark Connect では、Spark サーバー ライブラリを介して Spark を拡張するための明示的な拡張ポイントが提供されています。SparkSession extensions や Spark Plugins などの Spark の既存の拡張ポイントとは異なり、これらの拡張ポイントはクライアントに公開できる機能を提供します。
開始方法: Spark サーバー ライブラリで Spark を拡張する
Spark Connect は利用可能であり、PySpark および Scala アプリケーションをサポートしています。Spark Connect を使用した Apache Spark サーバーの実行方法と、Spark Connect クライアント ライブラリを使用してクライアント アプリケーションからそれに接続する方法を説明します。
Spark サーバー ライブラリは、図 2 に示す次のコンポーネントで構成されます。
- Spark Connect プロトコル拡張 (青いボックス Proto API)
- Spark Connect プラグイン。
- Spark を拡張するアプリケーション ロジック。
- Spark サーバー ライブラリ アプリケーション ロジックを、PySpark または Scala Spark クライアントと共に Spark クライアント アプリケーションに公開するクライアント パッケージ。
(1) Spark Connect プロトコル拡張
新しい Spark サーバー ライブラリで Spark を拡張するために、開発者は Spark Connect プロトコルの 3 つの主要な操作タイプである Relation、Expression、および Command を拡張できます。
message Relation {
oneof rel_type {
Read read = 1;
// ...
google.protobuf.Any extension = 998;
}
}
message Expression {
oneof expr_type {
Literal literal = 1;
// ...
google.protobuf.Any extension = 999;
}
}
message Command {
oneof command_type {
WriteCommand write_command = 1;
// ...
google.protobuf.Any extension = 999;
}
} 拡張フィールドにより、Spark Connect プロトコルの一部として任意の protobuf メッセージをシリアル化できます。これらのメッセージは、拡張実装のパラメーターまたは状態を表します。カスタム式タイプをビルドするには、開発者はまず式のカスタム protobuf 定義を定義します。
message ExamplePluginExpression {
Expression child = 1;
string custom_field = 2;
}(2) (3) カスタム アプリケーション ロジックを備えた Spark Connect プラグイン実装
次のステップとして、開発者は、protobuf メッセージの入力パラメーターに基づいたカスタム アプリケーション ロジックで Spark Connect の ExpressionPlugin クラスを実装します。
class ExampleExpressionPlugin extends ExpressionPlugin {
override def transform(
relation: protobuf.Any,
planner: SparkConnectPlanner): Option[Expression] = {
// Check if the serialized value of protobuf.Any matches the type
// of our example expression.
if (!relation.is(classOf[proto.ExamplePluginExpression])) {
return None
}
val exp = relation.unpack(classOf[proto.ExamplePluginExpression])
Some(Alias(planner.transformExpression(
exp.getChild), exp.getCustomField)(explicitMetadata = None))
}
}アプリケーション ロジックが開発されたら、コードを jar としてパッケージ化し、Spark が追加ロジックを認識するように構成する必要があります。関連する Spark 設定オプションは次のとおりです。
- spark.jars は、カスタム式のビルド済みアプリケーション ロジックを含む Jar ファイルの場所を定義します。
- spark.connect.extensions.expression.classes は、Spark によってロードされる各式拡張の完全なクラス名を指定します。これらの設定オプションに基づいて、Spark は起動時に値をロードし、処理のために利用可能にします。
(4) Spark サーバー ライブラリ クライアント パッケージ
サーバー コンポーネントがデプロイされると、任意のクライアントが適切な protobuf メッセージを使用して使用できます。上記の例では、Spark Connect エンドポイントに送信された次のメッセージ ペイロードがあれば、拡張メカニズムをトリガーするのに十分です。
{
"project": {
"input": {
"sql": {
"query": "select * from samples.nyctaxi.trips"
}
},
"expressions": [
{
"extension": {
"typeUrl": "type.googleapis.com/spark.connect.ExamplePluginExpression",
"value": "\n\006\022\004\n\002id\022\006testval"
}
}
]
}
} 例を Python で利用できるようにするために、アプリケーション開発者は、新しい式をラップし、それを PySpark に埋め込む Python ライブラリを提供します。任意の式に機能を提供する最も簡単な方法は、PySpark 列インスタンスを引数として受け取り、式が適用された新しい列インスタンスを返すことです。
from pyspark.sql.connect.column import Expression
import pyspark.sql.connect.proto as proto
from myxample.proto import ExamplePluginExpression
# Internal class that satisfies the interface by the Python client
# of Spark Connect to generate the protobuf representation from
# an instance of the expression.
class ExampleExpression(Expression):
def to_plan(self, session) -> proto.Expression:
fun = proto.Expression()
plugin = ExamplePluginExpression()
plugin.child.literal.long = 10
plugin.custom_field = "example"
fun.extension.Pack(plugin)
return fun
# Defining the function to be used from the consumers.
def example_expression(col: Column) -> Column:
return Column(ExampleExpression())
# Using the expression in the Spark Connect client code.
df = spark.read.table("samples.nyctaxi.trips")
df.select(example_expression(df["fare_amount"])).collect()