Python パッケージ管理#
YARN、Kubernetes などのクラスターで PySpark アプリケーションを実行したい場合は、コードと使用されているすべてのライブラリがエクゼキューターで利用可能であることを確認する必要があります。
例として、Pandas UDF の例 を実行したいとしましょう。基盤となる実装として pyarrow を使用しているため、クラスターの各エクゼキューターに pyarrow がインストールされていることを確認する必要があります。そうしないと、ModuleNotFoundError: No module named 'pyarrow' のようなエラーが発生する可能性があります。
以下は、クラスターで実行される前の例のスクリプト app.py です。
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
def main(spark):
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
print(df.groupby("id").agg(mean_udf(df['v'])).collect())
if __name__ == "__main__":
main(SparkSession.builder.getOrCreate())
クラスターで Python の依存関係を管理するには、いくつかの方法があります。
PySpark ネイティブ機能の使用
Conda を使用する
Virtualenv の使用
PEX の使用
PySpark ネイティブ機能の使用#
PySpark では、Python ファイル (.py)、ZIP された Python パッケージ (.zip)、および Egg ファイル (.egg) を、次のいずれかの方法でエクゼキューターにアップロードできます。
spark.submit.pyFiles設定を設定するSpark スクリプトで
--py-filesオプションを設定するアプリケーションで
pyspark.SparkContext.addPyFile()を直接呼び出す
これは、追加のカスタム Python コードをクラスターに送信するための簡単な方法です。個々のファイルを追加したり、パッケージ全体を zip してアップロードしたりできます。pyspark.SparkContext.addPyFile() を使用すると、ジョブを開始した後でもコードをアップロードできます。
ただし、Wheel としてビルドされたパッケージを追加することはできず、ネイティブコードを含む依存関係を含めることができません。
Conda の使用#
Conda は、最も広く使用されている Python パッケージ管理システムの一つです。PySpark ユーザーは、リロケート可能な Conda 環境を作成するコマンドラインツールである conda-pack を利用することで、Conda 環境を直接使用してサードパーティの Python パッケージを送信できます。
以下の例では、ドライバーとエクゼキューターの両方で使用する Conda 環境を作成し、それをアーカイブファイルにパックします。このアーカイブファイルは、Python の Conda 環境をキャプチャし、Python インタープリターとその関連するすべての依存関係の両方を格納します。
conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
その後、--archives オプションまたは spark.archives 設定 (spark.yarn.dist.archives in YARN) を使用して、スクリプトと一緒に、またはコード内で送信できます。エクゼキューターでアーカイブを自動的に展開します。
spark-submit スクリプトの場合、次のように使用できます。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py
上記の PYSPARK_DRIVER_PYTHON は、YARN または Kubernetes のクラスターモードでは設定しないように注意してください。
通常の Python シェルまたはノートブックを使用している場合は、以下のように試すことができます。
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_conda_env.tar.gz#environment").getOrCreate()
main(spark)
pyspark シェルの場合
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment
Virtualenv の使用#
Virtualenv は、分離された Python 環境を作成するための Python ツールです。Python 3.3 以降、その機能の一部が Python の標準ライブラリとして venv モジュールに統合されています。PySpark ユーザーは、conda-pack と同様の方法で venv-pack を使用して、クラスターで Python の依存関係を管理できます。
ドライバーとエクゼキューターの両方で使用する仮想環境は、以下のように作成できます。現在の仮想環境をアーカイブファイルにパックし、Python インタープリターと依存関係の両方を含みます。ただし、venv-pack は Python インタープリターをシンボリックリンクとしてパックする ため、クラスターのすべてのノードに同じ Python インタープリターがインストールされている必要があります。
python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install pyarrow pandas venv-pack
venv-pack -o pyspark_venv.tar.gz
--archives オプションまたは spark.archives 設定 (spark.yarn.dist.archives in YARN) を活用することで、アーカイブファイルを直接渡して展開し、エクゼキューターで環境を有効にできます。
spark-submit の場合、次のようにコマンドを実行して使用できます。また、Kubernetes または YARN クラスターモードでは PYSPARK_DRIVER_PYTHON を解除する必要があることに注意してください。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment app.py
通常の Python シェルまたはノートブックの場合
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
"spark.archives", # 'spark.yarn.dist.archives' in YARN.
"pyspark_venv.tar.gz#environment").getOrCreate()
main(spark)
pyspark シェルの場合
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_venv.tar.gz#environment
PEX の使用#
PySpark は、Python パッケージを一緒に送信するために PEX を使用することもできます。PEX は、自己完結型の Python 環境を作成するツールです。これは Conda や virtualenv に似ていますが、.pex ファイルはそれ自体で実行可能です。
以下の例では、ドライバーとエクゼキューターが使用する .pex ファイルを作成します。このファイルには、pex コマンドで指定された Python の依存関係が含まれています。
pip install pyarrow pandas pex
pex pyspark pyarrow pandas -o pyspark_pex_env.pex
このファイルは、通常の Python インタープリターと同様に動作します。
./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
1.1.5
ただし、.pex ファイルは内部的に Python インタープリターを含んでいないため、クラスターのすべてのノードに同じ Python インタープリターがインストールされている必要があります。
クラスターで .pex ファイルを転送して使用するには、それらはディレクトリまたはアーカイブファイルではなく通常のファイルであるため、spark.files 設定 (spark.yarn.dist.files in YARN) または --files オプションを介して送信する必要があります。
アプリケーションの送信の場合、以下のようにコマンドを実行します。YARN または Kubernetes のクラスターモードでは PYSPARK_DRIVER_PYTHON を設定しないように注意してください。
export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_pex_env.pex
spark-submit --files pyspark_pex_env.pex app.py
通常の Python シェルまたはノートブックの場合
import os
from pyspark.sql import SparkSession
from app import main
os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
spark = SparkSession.builder.config(
"spark.files", # 'spark.yarn.dist.files' in YARN.
"pyspark_pex_env.pex").getOrCreate()
main(spark)
インタラクティブな pyspark シェルの場合、コマンドはほぼ同じです。
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./pyspark_pex_env.pex
pyspark --files pyspark_pex_env.pex
SparkSession.builder と PEX を使用したスタンドアロン PySpark のデプロイのためのエンドツーエンドの Docker 例は、こちらで見つけることができます。これは、PEX の上に構築されたライブラリである cluster-pack を使用しており、PEX を手動で作成およびアップロードするという中間ステップを自動化します。