Spark Connect の概要

クライアントサイドの Spark アプリケーションの構築

Apache Spark 3.4 では、Spark Connect は、DataFrame API と未解決の論理プランをプロトコルとして使用して Spark クラスタへのリモート接続を可能にする、分離されたクライアント-サーバーアーキテクチャを導入しました。クライアントとサーバーの分離により、Spark とそのオープンなエコシステムをあらゆる場所から活用できるようになります。最新のデータアプリケーション、IDE、Notebook、およびプログラミング言語に組み込むことができます。

開始するには、クイックスタート: Spark Connect を参照してください。

Spark Connect API Diagram

Spark Connect の仕組み

Spark Connect クライアントライブラリは、Spark アプリケーションの開発を簡素化するように設計されています。これは、アプリケーションサーバー、IDE、ノートブック、およびプログラミング言語など、あらゆる場所に組み込むことができる薄い API です。Spark Connect API は、クライアントと Spark ドライバ間の言語に依存しないプロトコルとして、未解決の論理プランを使用して Spark の DataFrame API を基盤として構築されています。

Spark Connect クライアントは、DataFrame 操作をプロトコルバッファを使用してエンコードされた未解決の論理クエリプランに変換します。これらは gRPC フレームワークを使用してサーバーに送信されます。

Spark サーバーに組み込まれた Spark Connect エンドポイントは、未解決の論理プランを受信し、Spark の論理プラン演算子に変換します。これは、SQL クエリを解析するのと同様で、属性と関係が解析され、初期解析プランが構築されます。そこから、標準の Spark 実行プロセスが開始され、Spark Connect が Spark のすべての最適化と拡張機能を活用することが保証されます。結果は、Apache Arrow でエンコードされた行バッチとして gRPC を介してクライアントにストリーミングで返送されます。

Spark Connect communication

Spark Connect の運用上の利点

この新しいアーキテクチャにより、Spark Connect はいくつかのマルチテナント運用上の問題を軽減します。

安定性: 過剰なメモリを使用するアプリケーションは、独自のプロセスで実行できるため、独自環境のみに影響を与えるようになります。ユーザーはクライアントで独自の依存関係を定義でき、Spark ドライバとの潜在的な競合を心配する必要はありません。

アップグレード性: Spark ドライバは、パフォーマンスの向上やセキュリティ修正などのメリットを得るために、アプリケーションとは独立してシームレスにアップグレードできるようになりました。つまり、サーバー側の RPC 定義が後方互換性を持つように設計されている限り、アプリケーションは前方互換性を持つことができます。

デバッグ可能性と可観測性: Spark Connect は、お気に入りの IDE から直接開発中のインタラクティブなデバッグを可能にします。同様に、アプリケーションはアプリケーションのフレームワークネイティブなメトリクスとログライブラリを使用して監視できます。

Spark Connect の使用方法

Spark 3.4 以降、Spark Connect は利用可能であり、PySpark および Scala アプリケーションをサポートしています。Spark Connect を使用して Apache Spark サーバーを実行し、Spark Connect クライアントライブラリを使用してクライアントアプリケーションから接続する方法について説明します。

Spark Connect で Spark サーバーをダウンロードして起動する

まず、Apache Spark のダウンロードページから Spark をダウンロードします。Spark Connect は Apache Spark バージョン 3.4 で導入されたため、ページ上部のリリースドロップダウンで 3.4.0 以降を選択してください。次に、パッケージタイプ(通常は「Apache Hadoop 3.3 以降用にプリビルド」)を選択し、リンクをクリックしてダウンロードします。

次に、ダウンロードした Spark パッケージをコンピューター上に展開します。たとえば、

tar -xvf spark-3.5.1-bin-hadoop3.tgz

ターミナルウィンドウで、以前に Spark を展開した場所の spark フォルダに移動し、start-connect-server.sh スクリプトを実行して、次の例のように Spark Connect で Spark サーバーを起動します。

./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.1

Spark サーバーを起動する際には、Spark Connect パッケージ (spark-connect_2.12:3.5.1) を含めることに注意してください。これは Spark Connect を使用するために必要です。以前にダウンロードした Spark バージョンと同じバージョンのパッケージを使用してください。この例では、Spark 3.5.1 と Scala 2.12 を使用しています。

これで、Spark サーバーが起動し、クライアントアプリケーションからの Spark Connect セッションを受け入れる準備が整いました。次のセクションでは、クライアントアプリケーションを作成する際に Spark Connect を使用する方法について説明します。

インタラクティブ分析に Spark Connect を使用する

Spark セッションを作成する際には、Spark Connect を使用することを指定でき、以下に示すようにいくつかの方法があります。

ここで説明するメカニズムのいずれも使用しない場合、Spark セッションは以前とまったく同様に機能し、Spark Connect は利用されません。

SPARK_REMOTE 環境変数の設定

Spark クライアントアプリケーションが実行されているクライアントマシンで SPARK_REMOTE 環境変数を設定し、次の例のように新しい Spark セッションを作成すると、セッションは Spark Connect セッションになります。この方法では、Spark Connect を使用開始するためにコードを変更する必要はありません。

ターミナルウィンドウで、SPARK_REMOTE 環境変数を、以前にコンピューターで起動したローカル Spark サーバーを指すように設定します。

export SPARK_REMOTE="sc://localhost"

そして、通常どおりに Spark シェルを起動します。

./bin/pyspark

PySpark シェルは、ウェルカムメッセージに示されているように、Spark Connect を使用して Spark に接続されました。

Client connected to the Spark Connect server at localhost

Spark セッション作成時の Spark Connect の指定

Spark セッションを作成するときに、Spark Connect を明示的に使用するように指定することもできます。

たとえば、ここで示すように、Spark Connect を使用して PySpark シェルを起動できます。

Spark Connect を使用して PySpark シェルを起動するには、remote パラメータを含め、Spark サーバーの場所を指定するだけです。ここでは、以前に起動したローカル Spark サーバーに接続するために、この例では localhost を使用しています。

./bin/pyspark --remote "sc://localhost"

PySpark シェルのウェルカムメッセージには、Spark Connect を使用して Spark に接続したことが表示されます。

Client connected to the Spark Connect server at localhost

また、Spark セッションのタイプを確認することもできます。そこに .connect. が含まれている場合は、この例に示すように Spark Connect を使用しています。

SparkSession available as 'spark'.
>>> type(spark)
<class 'pyspark.sql.connect.session.SparkSession'>

これで、シェルで PySpark コードを実行して、Spark Connect の動作を確認できます。

>>> columns = ["id","name"]
>>> data = [(1,"Sarah"),(2,"Maria")]
>>> df = spark.createDataFrame(data).toDF(*columns)
>>> df.show()
+---+-----+
| id| name|
+---+-----+
|  1|Sarah|
|  2|Maria|
+---+-----+

Scala シェルの場合、現在 Apache Spark パッケージに含まれていない Ammonite ベースの REPL を使用します。

新しい Scala シェルをセットアップするには、まず Coursier CLI をダウンロードしてインストールします。次に、ターミナルウィンドウで次のコマンドを使用して REPL をインストールします。

cs install –-contrib spark-connect-repl

これで、次のように Ammonite ベースの Scala REPL/シェルを起動して Spark サーバーに接続できます。

spark-connect-repl

REPL が正常に初期化されると、挨拶メッセージが表示されます。

Spark session available as 'spark'.
   _____                  __      ______                            __
  / ___/____  ____ ______/ /__   / ____/___  ____  ____  ___  _____/ /_
  \__ \/ __ \/ __ `/ ___/ //_/  / /   / __ \/ __ \/ __ \/ _ \/ ___/ __/
 ___/ / /_/ / /_/ / /  / ,<    / /___/ /_/ / / / / / / /  __/ /__/ /_
/____/ .___/\__,_/_/  /_/|_|   \____/\____/_/ /_/_/ /_/\___/\___/\__/
    /_/

デフォルトでは、REPL はローカル Spark サーバーへの接続を試みます。シェルで次の Scala コードを実行して、Spark Connect の動作を確認します。

@ spark.range(10).count
res0: Long = 10L

クライアント-サーバー接続の構成

デフォルトでは、REPL はポート 15002 のローカル Spark サーバーへの接続を試みます。ただし、この接続は、この構成 リファレンス で説明されているように、いくつかの方法で構成できます。

SPARK_REMOTE 環境変数の設定

クライアントマシンで SPARK_REMOTE 環境変数を設定して、REPL の起動時に初期化されるクライアント-サーバー接続をカスタマイズできます。

export SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG"
spark-connect-repl

または

SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG" spark-connect-repl

CLI 引数の使用

カスタマイズは、以下に示すように CLI 引数を介して渡すこともできます。

spark-connect-repl --host myhost.com --port 443 --token ABCDEFG

サポートされている CLI 引数のリストは、こちらにあります。

接続文字列によるプログラム的な構成

この例のように、SparkSession#builder を使用してプログラムで接続を作成することもできます。

@ import org.apache.spark.sql.SparkSession
@ val spark = SparkSession.builder.remote("sc://localhost:443/;token=ABCDEFG").build()

スタンドアロンアプリケーションでの Spark Connect の使用

まず、pip install pyspark[connect]==3.5.0 を使用して PySpark をインストールするか、パッケージ化された PySpark アプリケーション/ライブラリを作成する場合は、setup.py ファイルに次のように追加します。

install_requires=[
'pyspark[connect]==3.5.0'
]

独自のコードを作成する場合は、この例のように、Spark セッションを作成するときに、Spark サーバーへの参照を含む remote 関数を含めます。

from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()

説明のために、簡単な Spark Connect アプリケーションである SimpleApp.py を作成します。

"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.remote("sc://localhost").appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()

このプログラムは、テキストファイルで 'a' を含む行の数と 'b' を含む行の数をカウントするだけです。YOUR_SPARK_HOME を Spark がインストールされている場所に置き換える必要があることに注意してください。

このアプリケーションは、次のように通常の Python インタープリターで実行できます。

# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 72, lines with b: 39

Scala アプリケーション/プロジェクトの一部として Spark Connect を使用するには、まず適切な依存関係を含める必要があります。sbt ビルドシステムを例として使用すると、次の依存関係を build.sbt ファイルに追加します。

libraryDependencies += "org.apache.spark" %% "spark-sql-api" % "3.5.0"
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "3.5.0"

独自のコードを作成する場合は、この例のように、Spark セッションを作成するときに、Spark サーバーへの参照を含む remote 関数を含めます。

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().remote("sc://localhost").build()

: UDF、フィルター、マップなどのユーザー定義コードを参照する操作では、必要なクラスファイルをピックアップしてアップロードするために、ClassFinder を登録する必要があります。また、JAR の依存関係は、SparkSession#AddArtifact を使用してサーバーにアップロードする必要があります。

import org.apache.spark.sql.connect.client.REPLClassDirMonitor
// Register a ClassFinder to monitor and upload the classfiles from the build output.
val classFinder = new REPLClassDirMonitor(<ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR>)
spark.registerClassFinder(classfinder)

// Upload JAR dependencies
spark.addArtifact(<ABSOLUTE_PATH_JAR_DEP>)

ここで、ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR はビルドシステムがクラスファイルを書き込む出力ディレクトリであり、ABSOLUTE_PATH_JAR_DEP はローカルファイルシステム上の JAR の場所です。

REPLClassDirMonitor は、ClassFinder の提供された実装であり、特定のディレクトリを監視しますが、カスタム検索と監視のために ClassFinder を拡張した独自のクラスを実装することもできます。

クライアントアプリケーションの認証

Spark Connect には組み込みの認証機能はありませんが、既存の認証インフラストラクチャとシームレスに連携するように設計されています。gRPC HTTP/2 インターフェースにより、認証プロキシを使用できるため、Spark で直接認証ロジックを実装することなく、Spark Connect を安全に保護できます。

Spark 3.4 でサポートされている内容

PySpark: Spark 3.4 では、Spark Connect は、DataFrameFunctionsColumn を含む、ほとんどの PySpark API をサポートしています。ただし、SparkContextRDD などの一部の API はサポートされていません。現在サポートされている API は、API リファレンスドキュメントで確認できます。サポートされている API には「Supports Spark Connect」というラベルが付いているため、既存のコードを Spark Connect に移行する前に、使用している API が利用可能かどうかを確認できます。

Scala: Spark 3.5 では、Spark Connect は、DatasetfunctionsColumnCatalogKeyValueGroupedDataset を含む、ほとんどの Scala API をサポートしています。

ユーザー定義関数 (UDF) は、デフォルトではシェルで、追加の設定要件があるスタンドアロンアプリケーションでサポートされています。

ストリーミング API の大部分は、DataStreamReaderDataStreamWriterStreamingQueryStreamingQueryListener を含めてサポートされています。

SparkContextRDD などの API は、すべての Spark Connect バージョンで非推奨となっています。

より多くの API のサポートは、今後の Spark リリースで計画されています。