このページでは、Spark Connect のアーキテクチャ、Spark Connect の利点、および Spark Connect へのアップグレード方法について説明します。

まず、Spark Connect のアーキテクチャを全体像から見ていきましょう。

Spark Connect の全体像アーキテクチャ

Spark Connect は、クライアントアプリケーションがリモート Spark Server と通信する方法を指定するプロトコルです。Spark Connect プロトコルを実装するクライアントは、JDBC ドライバーを使用してクライアントアプリケーションがデータベースに接続するのと非常によく似た方法で、リモート Spark Server に接続し、リクエストを行うことができます。クエリ spark.table("some_table").limit(5) は、単純に結果を返すはずです。このアーキテクチャは、エンドユーザーに優れた開発者エクスペリエンスを提供します。

Spark Connect の全体像は次のようになっています。

  1. クライアントと Spark Server 間に接続が確立されます。
  2. クライアントは DataFrame クエリを実行方法ではなく、操作の意図を表す解決されていない論理プランに変換します。
  3. 解決されていない論理プランはエンコードされ、Spark Server に送信されます。
  4. Spark Server はクエリを最適化して実行します。
  5. Spark Server は結果をクライアントに返します。

Spark Connect の内部動作をより深く理解するために、これらのステップをさらに詳しく見ていきましょう。

クライアントと Spark Server 間の接続の確立

Spark Connect のネットワーク通信は、gRPC フレームワークを使用します。

gRPC はパフォーマンスが高く、言語に依存しないため、Spark Connect をポータブルにすることができます。

DataFrame クエリを解決されていない論理プランに変換する

クライアントは DataFrame クエリを解析し、解決されていない論理プランに変換します。

次のような DataFrame クエリがあるとします。spark.table("some_table").limit(5)

クエリの解決されていない論理プランは次のようになります。

== Parsed Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- SubqueryAlias spark_catalog.default.some_table
      +- UnresolvedRelation spark_catalog.default.some_table

クライアントは解決されていない論理プランの作成を担当し、実行のために Spark Server に渡します。

解決されていない論理プランを Spark Server に送信する

解決されていない論理プランは、ネットワーク経由で送信できるようにシリアライズする必要があります。Spark Connect は、構造化データをシリアライズするための「言語ニュートラル、プラットフォームニュートラルで拡張可能なメカニズム」である Protocol Buffers を使用します。

クライアントと Spark Server は、異なるプログラミング言語や異なるソフトウェアバージョンを使用している可能性があるため、Protocol Buffers のような言語ニュートラルな形式で通信できる必要があります。

次に、Spark Server がクエリをどのように実行するかを見てみましょう。

Spark Server でのクエリの実行

Spark Server は、解決されていない論理プラン(Protocol Buffer がデシリアライズされた後)を受信し、他のクエリと同様に解析、最適化、実行します。

Spark は、クエリを実行する前に、解決されていない論理プランに対して多くの最適化を行います。これらの最適化はすべて Spark Server で行われ、クライアントアプリケーションとは独立しています。

Spark Connect を使用すると、Spark や JVM に依存しないクライアントでも、Spark の強力なクエリ最適化機能を利用できます。

結果をクライアントに返す

Spark Server は、クエリを実行した後、結果をクライアントに返します。

結果は Apache Arrow レコードバッチとしてクライアントに送信されます。単一のレコードバッチには多くの行のデータが含まれます。

結果全体は、一度にすべてではなく、レコードバッチの断片的なチャンクでクライアントにストリーミングされます。Spark Server からクライアントへの結果のストリーミングは、過度に大きなリクエストによるメモリの問題を防ぎます。

Spark Connect の動作概要を画像で示します。

Spark Connect の利点

次に、Spark Connect アーキテクチャの利点に注目しましょう。

Spark Connect ワークロードの保守が容易になります。

Spark Connect を使用しない場合、クライアントと Spark Driver は同じソフトウェアバージョンを実行する必要があります。同じ Java、Scala、その他の依存関係バージョンが必要です。ローカルマシンで Spark プロジェクトを開発し、JAR ファイルとしてパッケージ化して、本番データセットで実行するためにクラウドにデプロイするとします。クラウドで使用されているものと同じ依存関係で、ローカルマシンで JAR ファイルをビルドする必要があります。Scala 2.13 で JAR ファイルをコンパイルした場合、クラスターにも Scala 2.13 でコンパイルされた Spark JAR をプロビジョニングする必要があります。

JAR を Scala 2.12 でビルドしていて、クラウドプロバイダーが Scala 2.13 でビルドされた新しいランタイムをリリースしたとします。Spark Connect を使用しない場合、ローカルでプロジェクトを更新する必要があります。これは困難な場合があります。たとえば、プロジェクトを Scala 2.13 に更新すると、プロジェクトのすべての依存関係(および推移的依存関係)も Scala 2.13 にアップグレードする必要があります。それらの JAR ファイルのいくつかが存在しない場合、アップグレードできません。

対照的に、Spark Connect はクライアントと Spark Driver を分離するため、クライアントを更新せずに Spark Driver(サーバーサイドの依存関係を含む)を更新できます。これにより、Spark プロジェクトの保守が大幅に容易になります。特に、純粋な Python ワークロードの場合、クライアント側の Java 依存関係から Python を分離することで、Apache Spark の全体的なユーザーエクスペリエンスが向上します。

Spark Connect を使用すると、非 JVM 言語で Spark Connect クライアントを構築できます。

Spark Connect はクライアントと Spark Driver を分離するため、任意の言語で Spark Connect クライアントを記述できます。以下は、Java/Scala に依存しない Spark Connect クライアントの例です。

たとえば、Golang 用の Apache Spark Connect クライアントである spark-connect-go は、Spark Connect プロトコルを実装しており、Java に依存しません。この Spark Connect クライアントを使用して、Java または Spark をインストールせずに Go で Spark アプリケーションを開発できます。

spark-connect-go を使用して Go プログラミング言語でクエリを実行する方法は次のとおりです。

spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
df, _ := spark.Sql("select * from my_cool_table where age > 42")
df.Show(100, false)

df.Show() が呼び出されると、spark-connect-go はクエリを解決されていない論理プランに処理し、実行のために Spark Driver に送信します。

spark-connect-go は、Spark Connect の分離された性質がより良いエンドユーザーエクスペリエンスを可能にするという素晴らしい例です。

Go だけがこのアーキテクチャから恩恵を受ける言語ではありません。

Spark Connect は、リモート開発とテストを向上させます。

Spark Connect は、SSH なしでリモートクラスター上のテキストエディターに Spark を埋め込むことも可能にします(「リモート開発」)。

Spark Connect を使用しない場合、Spark をテキストエディターに埋め込むには、ローカルで実行されている Spark Session またはリモート Spark Driver への SSH 接続が必要です。

Spark Connect を使用すると、SSH なしでテキストエディターに完全に埋め込まれた接続でリモート Spark Driver に接続できます。これにより、VS Code のようなテキストエディターでリモート Spark クラスター上でコードを開発する際のユーザーエクスペリエンスが向上します。

Spark Connect を使用すると、ローカル Spark Session からリモート Spark Session への切り替えが簡単です。接続文字列を変更するだけです。

Spark Connect はデバッグを容易にします。

Spark Connect を使用すると、IntelliJ のようなテキストエディターをリモート Spark クラスターに接続し、デバッガーを使用してコードをステップ実行できます。ローカルマシンでのテストデータセットの場合と同様に、本番データセットで実行されているアプリケーションをデバッグできます。これは、IDE に組み込まれた高品質のデバッグツールを活用したい場合に、特に優れた開発者エクスペリエンスを提供します。

Spark JVM は、テキストエディターと完全に統合されていないため、このデバッグエクスペリエンスを許可しません。Spark Connect を使用すると、リモート Spark ワークフロー向けの優れたデバッグエクスペリエンスを備えたテキストエディターで緊密な統合を構築できます。Spark Connect セッションの接続文字列を切り替えるだけで、クライアントをさまざまな実行環境でテストを実行するように簡単に構成でき、複雑な Spark アプリケーションをデプロイする必要はありません。

Spark Connect はより安定しています。

Spark Connect を利用するクライアントアプリケーションの分離された性質により、クライアントの障害は Spark Driver から分離されました。これは、クライアントアプリケーションが失敗した場合、その障害モードは他のアプリケーションとは完全に独立しており、他のクライアントアプリケーションにサービスを提供し続ける可能性のある実行中の Spark Driver に影響を与えないことを意味します。

Spark Connect へのアップグレード

Spark Connect は、すべての Spark JVM API をサポートしているわけではありません。たとえば、Spark JVM には、一部のユーザーが Spark クラスターで任意の Java コードを実行するために利用するプライベートメソッドがあります。Spark Connect クライアントは必ずしも Java を実行しているわけではないため、Spark Connect は当然ながらこれらのメソッドをサポートできません!

Spark JVM から Spark Connect への移行に関するガイドを確認して、Spark Connect で動作するコードの記述方法について詳しく学んでください。また、Spark Connect カスタム拡張機能の構築方法を確認して、専門的なロジックの使用方法を学んでください。

結論

Spark Connect は、本番環境で Spark を実行するためのより優れたアーキテクチャです。より柔軟で、保守が容易で、より良い開発者エクスペリエンスを提供します。

一部の Spark JVM コードベースを Spark Connect に移行するのは簡単ですが、他のコードベースでは移行が困難です。RDD API を利用するコードベースや、プライベート Spark JVM 関数を使用するコードベースは、移行がより困難です。

しかし、Spark JVM から Spark Connect への移行は一度限りのコストであるため、移行すればすべてのメリットを享受できます。