Spark Connect の概要
クライアントサイドの Spark アプリケーションの構築
Apache Spark 3.4 では、Spark Connect がクライアント・サーバーアーキテクチャを分離し、DataFrame API と未解決論理プランをプロトコルとして使用することで、Spark クラスターへのリモート接続を可能にしました。クライアントとサーバーの分離により、Spark とそのオープンエコシステムをどこからでも活用できるようになります。モダンなデータアプリケーション、IDE、ノートブック、プログラミング言語に組み込むことができます。
開始するには、クイックスタート:Spark Connect を参照してください。
Spark Connect の仕組み
Spark Connect クライアントライブラリは、Spark アプリケーション開発を簡素化するように設計されています。これは、アプリケーションサーバー、IDE、ノートブック、プログラミング言語など、どこにでも組み込むことができる薄い API です。Spark Connect API は、Spark の DataFrame API を基盤とし、クライアントと Spark ドライバー間の言語非依存プロトコルとして未解決論理プランを使用します。
Spark Connect クライアントは、DataFrame 操作を未解決論理クエリプランに変換し、これらを Protocol Buffers を使用してエンコードします。これらは gRPC フレームワークを使用してサーバーに送信されます。
Spark サーバーに組み込まれた Spark Connect エンドポイントは、未解決論理プランを受信し、Spark の論理プランオペレータに変換します。これは、SQL クエリを解析するのと似ており、属性とリレーションが解析され、初期の解析プランが構築されます。そこから、標準の Spark 実行プロセスが開始され、Spark Connect が Spark のすべての最適化と機能強化を活用できるようにします。結果は、Apache Arrow でエンコードされた行バッチとして gRPC を介してクライアントにストリーミングバックされます。
Spark Connect クライアントアプリケーションと従来の Spark アプリケーションの違い
Spark Connect の主な設計目標の 1 つは、クライアントとサーバーの完全な分離と独立を可能にすることです。その結果、Spark Connect を使用する際に開発者が認識しておくべき変更がいくつかあります。
- クライアントは Spark ドライバーと同じプロセスで実行されません。これは、クライアントが実行環境を操作するためにドライバー JVM に直接アクセスして対話できないことを意味します。特に PySpark では、クライアントは Py4J を使用しないため、
DataFrame、Column、SparkSessionなどの JVM 実装を保持するプライベートフィールドにアクセスすることはできません(例:df._jdf)。 - 設計上、Spark Connect プロトコルは、サーバーで実行される操作を宣言的に記述できるように、Spark の論理プランを抽象化として使用します。したがって、Spark Connect プロトコルは Spark のすべての実行 API、特に RDD をサポートしていません。
- Spark Connect は、コンシューマー向けにセッションベースのクライアントを提供します。これは、クライアントがすべての接続済みクライアントの環境を操作するクラスターのプロパティにアクセスできないことを意味します。最も重要なのは、クライアントが静的な Spark 設定または SparkContext にアクセスできないことです。
Spark Connect の運用上のメリット
この新しいアーキテクチャにより、Spark Connect はいくつかのマルチテナント運用の問題を軽減します。
安定性:メモリを過剰に使用するアプリケーションは、独自のプロセスで実行できるため、その環境にのみ影響します。ユーザーは独自のクライアント依存関係を定義でき、Spark ドライバーとの潜在的な競合を心配する必要はありません。
アップグレード可能性:Spark ドライバーは、アプリケーションとは独立してシームレスにアップグレードできるようになり、パフォーマンスの向上やセキュリティ修正の恩恵を受けることができます。これは、サーバー側の RPC 定義が後方互換性を持つように設計されている限り、アプリケーションは前方互換性を持つことができることを意味します。
デバッグ可能性とオブザーバビリティ:Spark Connect を使用すると、お気に入りの IDE から直接、開発中にインタラクティブなデバッグが可能になります。同様に、アプリケーションは、アプリケーションのフレームワークネイティブのメトリクスとロギングライブラリを使用して監視できます。
Spark Connect の使用方法
Spark Connect は PySpark および Scala アプリケーションで利用可能であり、サポートされています。Apache Spark サーバーを Spark Connect で実行し、Spark Connect クライアントライブラリを使用してクライアントアプリケーションから接続する方法を説明します。
Spark Connect で Spark サーバーをダウンロードして起動する
まず、Apache Spark のダウンロードページから Spark をダウンロードします。ページ上部のリリースドロップダウンで最新リリースを選択します。次に、パッケージタイプを選択します。通常は「Apache Hadoop 3.3 以降向けに事前ビルド済み」を選択し、リンクをクリックしてダウンロードします。
次に、ダウンロードした Spark パッケージをコンピューターに展開します。例:
tar -xvf spark-4.0.0-bin-hadoop3.tgzターミナルウィンドウで、Spark を展開した場所にある spark フォルダーに移動し、start-connect-server.sh スクリプトを実行して Spark Connect で Spark サーバーを起動します。以下はその例です。
./sbin/start-connect-server.shダウンロードした Spark のバージョンと同じバージョンのパッケージを使用してください。この例では、Scala 2.13 を使用した Spark 4.0.0 です。
これで Spark サーバーが実行され、クライアントアプリケーションからの Spark Connect セッションを受け入れる準備ができました。次のセクションでは、クライアントアプリケーションの作成時に Spark Connect を使用する方法を説明します。
Spark Connect をインタラクティブ分析に使用する
Spark セッションを作成する際に、Spark Connect を使用することを指定できます。その方法はいくつかあります。
ここに記載されているメカニズムのいずれも使用しない場合、Spark セッションは Spark Connect を活用せずに、以前と同様に機能します。
SPARK_REMOTE 環境変数を設定する
Spark クライアントアプリケーションを実行しているクライアントマシンで SPARK_REMOTE 環境変数を設定し、以下の例のように新しい Spark セッションを作成すると、そのセッションは Spark Connect セッションになります。このアプローチでは、Spark Connect の使用を開始するためにコードの変更は必要ありません。
ターミナルウィンドウで、SPARK_REMOTE 環境変数を、以前コンピューターで起動したローカル Spark サーバーを指すように設定します。
export SPARK_REMOTE="sc://"そして、通常どおり Spark シェルを起動します。
./bin/pysparkPySpark シェルは、ウェルカムメッセージで示されているように、Spark Connect を使用して Spark に接続されています。
Client connected to the Spark Connect server at localhostSpark セッション作成時に Spark Connect を指定する
Spark セッションを作成する際に、Spark Connect を使用することを明示的に指定することもできます。
たとえば、以下に示すように、Spark Connect を使用して PySpark シェルを起動できます。
PySpark シェルを Spark Connect で起動するには、remote パラメータを含め、Spark サーバーの場所を指定するだけです。この例では、以前起動したローカル Spark サーバーに接続するために localhost を使用しています。
./bin/pyspark --remote "sc://"PySpark シェルのウェルカムメッセージで、Spark Connect を使用して Spark に接続したことが示されていることに気づくでしょう。
Client connected to the Spark Connect server at localhostSpark セッションのタイプを確認することもできます。もし .connect. が含まれていれば、Spark Connect を使用しています。以下はその例です。
SparkSession available as 'spark'.
>>> type(spark)
<class 'pyspark.sql.connect.session.SparkSession'>これで、シェルで PySpark コードを実行して、Spark Connect が動作していることを確認できます。
>>> columns = ["id", "name"]
>>> data = [(1,"Sarah"), (2,"Maria")]
>>> df = spark.createDataFrame(data).toDF(*columns)
>>> df.show()
+---+-----+
| id| name|
+---+-----+
| 1|Sarah|
| 2|Maria|
+---+-----+Scala シェルの場合、Ammonite ベースの REPL を使用します。それ以外は、PySpark シェルとほぼ同じです。
./bin/spark-shell --remote "sc://"REPL が正常に初期化されると、挨拶メッセージが表示されます。
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 4.0.0-SNAPSHOT
/_/
Type in expressions to have them evaluated.
Spark session available as 'spark'.デフォルトでは、REPL はローカル Spark サーバーへの接続を試みます。以下の Scala コードをシェルで実行して、Spark Connect が動作していることを確認してください。
@ spark.range(10).count
res0: Long = 10Lクライアント・サーバー接続の設定
デフォルトでは、REPL はポート 15002 のローカル Spark サーバーへの接続を試みます。ただし、接続は、この接続文字列 リファレンスに記載されているいくつかの方法で構成できます。
SPARK_REMOTE 環境変数を設定する
クライアント・サーバー接続は、REPL 起動時に初期化されるクライアント・サーバー接続をカスタマイズするために、クライアントマシンで SPARK_REMOTE 環境変数を設定することで構成できます。
export SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG"
./bin/spark-shellまたは
SPARK_REMOTE="sc://myhost.com:443/;token=ABCDEFG" spark-connect-repl接続文字列でプログラム的に構成する
接続は、この例のように SparkSession#builder を使用してプログラムで作成することもできます。
@ import org.apache.spark.sql.SparkSession
@ val spark = SparkSession.builder.remote("sc://:443/;token=ABCDEFG").getOrCreate()スタンドアロンアプリケーションで Spark Connect を使用する
まず、pip install pyspark[connect]==4.0.0 で PySpark をインストールするか、パッケージ化された PySpark アプリケーション/ライブラリをビルドする場合は、setup.py ファイルに次のように追加します。
install_requires=[
'pyspark[connect]==4.0.0'
]独自のコードを作成する際は、Spark セッションを作成する際に、Spark サーバーへの参照とともに remote 関数を含めてください。以下はその例です。
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://").getOrCreate()説明のために、SimpleApp.py という簡単な Spark Connect アプリケーションを作成します。
"""SimpleApp.py"""
from pyspark.sql import SparkSession
logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
spark = SparkSession.builder.remote("sc://").appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()
numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
spark.stop()このプログラムは、テキストファイル内の「a」を含む行数と「b」を含む行数をカウントするだけです。YOUR_SPARK_HOME を Spark がインストールされている場所に置き換える必要があることに注意してください。
このアプリケーションは、通常の Python インタープリターで次のように実行できます。
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 72, lines with b: 39Scala アプリケーション/プロジェクトの一部として Spark Connect を使用するには、まず適切な依存関係を含める必要があります。sbt ビルドシステムを例として使用し、build.sbt ファイルに次の依存関係を追加します。
libraryDependencies += "org.apache.spark" %% "spark-connect-client-jvm" % "4.0.0"独自のコードを作成する際は、Spark セッションを作成する際に、Spark サーバーへの参照とともに remote 関数を含めてください。以下はその例です。
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().remote("sc://").getOrCreate()注意:UDF、filter、map などのユーザー定義コードを参照する操作には、必要なクラスファイルをピックアップしてアップロードするために登録された ClassFinder が必要です。また、JAR 依存関係は SparkSession#AddArtifact を使用してサーバーにアップロードする必要があります。
例
import org.apache.spark.sql.connect.client.REPLClassDirMonitor
// Register a ClassFinder to monitor and upload the classfiles from the build output.
val classFinder = new REPLClassDirMonitor(<ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR>)
spark.registerClassFinder(classFinder)
// Upload JAR dependencies
spark.addArtifact(<ABSOLUTE_PATH_JAR_DEP>)ここで、ABSOLUTE_PATH_TO_BUILD_OUTPUT_DIR は、ビルドシステムがクラスファイルを書き込む出力ディレクトリであり、ABSOLUTE_PATH_JAR_DEP はローカルファイルシステム上の JAR の場所です。
REPLClassDirMonitor は ClassFinder の提供された実装であり、特定のディレクトリを監視しますが、カスタマイズされた検索と監視のために ClassFinder を拡張する独自のクラスを実装することもできます。
Spark Connect を使用したアプリケーション開発、およびカスタム機能による Spark Connect の拡張の詳細については、Spark Connect を使用したアプリケーション開発を参照してください。
クライアントアプリケーションの認証
Spark Connect には組み込みの認証機能はありませんが、既存の認証インフラストラクチャとシームレスに連携するように設計されています。その gRPC HTTP/2 インターフェースにより、認証プロキシの使用が可能になり、Spark 自体に認証ロジックを実装することなく Spark Connect を保護できます。
サポートされているもの
PySpark:Spark 3.4 以降、Spark Connect は、DataFrame、Functions、Column を含むほとんどの PySpark API をサポートしています。ただし、SparkContext や RDD などの一部の API はサポートされていません。現在サポートされている API は、API リファレンス ドキュメントで確認できます。サポートされている API には「Spark Connect をサポート」とラベル付けされているため、既存のコードを Spark Connect に移行する前に、使用している API が利用可能かどうかを確認できます。
Scala:Spark 3.5 以降、Spark Connect は、Dataset、functions、Column、Catalog、KeyValueGroupedDataset を含むほとんどの Scala API をサポートしています。
ユーザー定義関数(UDF)は、シェルではデフォルトでサポートされ、スタンドアロンアプリケーションでは追加の設定要件があります。
DataStreamReader、DataStreamWriter、StreamingQuery、StreamingQueryListener を含む、ストリーミング API の大部分がサポートされています。
SparkContext や RDD などの API は、Spark Connect ではサポートされていません。
今後リリースされる Spark では、より多くの API のサポートが計画されています。