このページでは、Spark Connectアーキテクチャ、Spark Connectのメリット、およびSpark Connectへのアップグレード方法について説明します。

まず、Spark Connectのアーキテクチャの概要を説明します。

Spark Connectアーキテクチャの概要

Spark Connectは、クライアントアプリケーションがリモートのSpark Serverと通信する方法を規定するプロトコルです。Spark Connectプロトコルを実装したクライアントは、リモートのSpark Serverに接続してリクエストを行うことができ、JDBCドライバを使用してデータベースに接続するクライアントアプリケーションと非常によく似ています。クエリ` spark.table("some_table").limit(5) `は、単に結果を返す必要があります。このアーキテクチャにより、エンドユーザーは優れた開発エクスペリエンスを得ることができます。

Spark Connectの動作概要を以下に示します。

  1. クライアントとSpark Server間に接続が確立されます。
  2. クライアントは、DataFrameクエリを、どのように実行すべきかではなく、操作の意図を記述する未解決の論理プランに変換します。
  3. 未解決の論理プランはエンコードされ、Spark Serverに送信されます。
  4. Spark Serverはクエリを最適化して実行します。
  5. Spark Serverは結果をクライアントに送り返します。

Spark Connectの内部動作をより深く理解するために、これらのステップを詳しく見ていきましょう。

クライアントとSpark Server間の接続の確立

Spark Connectのネットワーク通信には、gRPCフレームワークを使用します。

gRPCは高性能で言語に依存しないため、Spark Connectは移植可能です。

DataFrameクエリを未解決の論理プランに変換する

クライアントはDataFrameクエリを解析し、未解決の論理プランに変換します。

次のDataFrameクエリがあるとします。` spark.table("some_table").limit(5) `。

クエリの未解決の論理プランを以下に示します。

== Parsed Logical Plan ==
GlobalLimit 5
+- LocalLimit 5
   +- SubqueryAlias spark_catalog.default.some_table
      +- UnresolvedRelation spark_catalog.default.some_table

クライアントは、未解決の論理プランを作成し、実行のためにSpark Serverに渡す役割を担っています。

未解決の論理プランをSpark Serverに送信する

未解決の論理プランは、ネットワーク経由で送信できるようにシリアル化する必要があります。Spark ConnectはProtocol Buffersを使用します。Protocol Buffersは、「言語に依存せず、プラットフォームに依存せず、拡張可能な構造化データのシリアル化メカニズム」です。

クライアントとSpark Serverは、異なるプログラミング言語や異なるソフトウェアバージョンを使用している可能性があるため、Protocol Buffersのような言語に依存しない形式で通信できる必要があります。

次に、Spark Serverがクエリを実行する方法を見てみましょう。

Spark Serverでのクエリの処理

Spark Serverは、未解決の論理プラン(Protocol Bufferの逆シリアル化後)を受け取り、他のクエリと同様に解析、最適化、および実行します。

Sparkは、クエリを実行する前に、未解決の論理プランに対して多くの最適化を実行します。これらの最適化はすべてSpark Serverで行われ、クライアントアプリケーションとは独立しています。

Spark Connectを使用すると、Sparkの強力なクエリ最適化機能を、SparkやJVMに依存しないクライアントでも活用できます。

結果をクライアントに送り返す

Spark Serverは、クエリを実行した後、結果をクライアントに送り返します。

結果は、Apache Arrowレコードバッチとしてクライアントに送信されます。1つのレコードバッチには、多くのデータ行が含まれています。

完全な結果は、一度にすべてではなく、レコードバッチの部分的なチャンクでクライアントにストリーミングされます。Spark Serverからクライアントへの結果のストリーミングにより、過剰なリクエストによって発生するメモリの問題を防ぎます。

Spark Connectの動作概要を図で示します。

Spark Connectのメリット

次に、Spark Connectアーキテクチャのメリットを見ていきましょう。

Spark Connectワークロードは、より簡単に維持できます。

Spark Connectを使用しない場合、クライアントとSpark Driverは同じソフトウェアバージョンを実行する必要があります。同じJava、Scala、およびその他の依存関係のバージョンが必要です。ローカルマシンでSparkプロジェクトを開発し、JARファイルとしてパッケージ化してクラウドにデプロイし、本番データセットで実行するとします。クラウドで使用されているものと同じ依存関係を使用して、ローカルマシンでJARファイルを作成する必要があります。Scala 2.13でJARファイルを作成する場合は、Scala 2.13でコンパイルされたSpark JARを使用してクラスタをプロビジョニングする必要があります。

Scala 2.12でJARファイルを作成していて、クラウドプロバイダーがScala 2.13で構築された新しいランタイムをリリースしたとします。Spark Connectを使用しない場合、ローカルでプロジェクトを更新する必要がありますが、これは困難な場合があります。たとえば、プロジェクトをScala 2.13に更新する場合は、すべてのプロジェクトの依存関係(および推移的な依存関係)もScala 2.13にアップグレードする必要があります。これらのJARファイルの一部が存在しない場合、アップグレードできません。

これとは対照的に、Spark ConnectはクライアントとSpark Driverを分離するため、クライアントを更新せずに、サーバー側の依存関係を含むSpark Driverを更新できます。これにより、Sparkプロジェクトの維持がはるかに容易になります。特に純粋なPythonワークロードの場合、クライアント側のJava依存関係からPythonを分離することで、Apache Sparkの全体的なユーザーエクスペリエンスが向上します。

Spark Connectを使用すると、JVM以外の言語でSpark Connectクライアントを構築できます。

Spark ConnectはクライアントとSpark Driverを分離するため、任意の言語でSpark Connectクライアントを作成できます。Java/Scalaに依存しないSpark Connectクライアントの例を以下に示します。

たとえば、Apache Spark ConnectクライアントであるGolang用spark-connect-goは、Spark Connectプロトコルを実装し、Javaに依存しません。このSpark Connectクライアントを使用して、JavaやSparkをインストールせずに、GoでSparkアプリケーションを開発できます。

spark-connect-goを使用してGoプログラミング言語でクエリを実行する方法を以下に示します。

spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
df, _ := spark.Sql("select * from my_cool_table where age > 42")
df.Show(100, false)

` df.Show() `が呼び出されると、spark-connect-goはクエリを未解決の論理プランに処理し、実行のためにSpark Driverに送信します。

spark-connect-goは、Spark Connectの分離された性質が、より良いエンドユーザーエクスペリエンスを可能にする方法の素晴らしい例です。

このアーキテクチャから恩恵を受けるのはGoだけではありません。

Spark Connectは、リモート開発とテストを改善します。

Spark Connectを使用すると、SSHなしでリモートクラスタ上のテキストエディタにSparkを埋め込む(「リモート開発」)こともできます。

Spark Connectを使用しない場合、Sparkを使用してテキストエディタにSparkを埋め込むには、ローカルで実行されているSparkセッションまたはリモートSpark DriverへのSSH接続が必要です。

Spark Connectを使用すると、SSHなしでテキストエディタに完全に埋め込まれた接続を使用して、リモートSpark Driverに接続できます。これにより、リモートSparkクラスタでVS Codeなどのテキストエディタでコードを開発する場合に、ユーザーエクスペリエンスが向上します。

Spark Connectを使用すると、ローカルSparkセッションからリモートSparkセッションへの切り替えが簡単です。接続文字列を変更するだけです。

Spark Connectにより、デバッグが容易になります。

Spark Connectを使用すると、IntelliJなどのテキストエディタをリモートSparkクラスタに接続し、デバッガーを使用してコードをステップ実行できます。ローカルマシンのテストデータセットと同様に、本番データセットで実行されているアプリケーションをデバッグできます。これは、IDEに組み込まれた高品質のデバッグツールを活用したい場合に特に、優れた開発エクスペリエンスを提供します。

Spark JVMは、テキストエディタと完全に統合されていないため、このデバッグエクスペリエンスを許可しません。Spark Connectを使用すると、リモートSparkワークフローの優れたデバッグエクスペリエンスを使用して、テキストエディタに緊密な統合を構築できます。Spark Connectセッションの接続文字列を切り替えるだけで、複雑なSparkアプリケーションをデプロイせずに、さまざまな実行環境でテストを実行するようにクライアントを簡単に構成できます。

Spark Connectはより安定しています。

Spark Connectを活用するクライアントアプリケーションの分離された性質により、クライアントの障害はSpark Driverから分離されるようになりました。つまり、クライアントアプリケーションが失敗した場合、その失敗モードは他のアプリケーションとは完全に独立しており、他のクライアントアプリケーションにサービスを提供し続ける可能性のある実行中のSpark Driverに影響を与えません。

Spark Connectへのアップグレード

Spark Connectは、すべてのSpark JVM APIをサポートしているわけではありません。たとえば、Spark JVMには、一部のユーザーがSparkクラスタで任意のJavaコードを実行するために利用するプライベートメソッドがあります。Spark Connectクライアントは必ずしもJavaを実行しているとは限らないため、Spark Connectは明らかにこれらのメソッドをサポートできません!

Spark JVMからSpark Connectへの移行に関するガイドを参照して、Spark Connectで動作するコードの書き方、および特殊なロジックを使用する方法について学習してください。

結論

Spark Connectは、本番環境でSparkを実行するための優れたアーキテクチャです。より柔軟性があり、保守が容易で、優れた開発エクスペリエンスを提供します。

一部のSpark JVMコードベースをSpark Connectに移行することは簡単ですが、他のコードベースでは移行が困難です。RDD APIを利用するか、プライベートSpark JVM関数を使用するコードベースは、移行がより困難です。

ただし、Spark JVMからSpark Connectへの移行は一度限りのコストであるため、移行後はすべてのメリットを享受できます。