Python パッケージ管理#

YARN、Kubernetes などのクラスターで PySpark アプリケーションを実行したい場合は、コードと使用されているすべてのライブラリがエクゼキューターで利用可能であることを確認する必要があります。

例として、Pandas UDF の例 を実行したいとしましょう。基盤となる実装として pyarrow を使用しているため、クラスターの各エクゼキューターに pyarrow がインストールされていることを確認する必要があります。そうしないと、ModuleNotFoundError: No module named 'pyarrow' のようなエラーが発生する可能性があります。

以下は、クラスターで実行される前の例のスクリプト app.py です。

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession

def main(spark):
    df = spark.createDataFrame(
        [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
        ("id", "v"))

    @pandas_udf("double")
    def mean_udf(v: pd.Series) -> float:
        return v.mean()

    print(df.groupby("id").agg(mean_udf(df['v'])).collect())


if __name__ == "__main__":
    main(SparkSession.builder.getOrCreate())

クラスターで Python の依存関係を管理するには、いくつかの方法があります。

  • PySpark ネイティブ機能の使用

  • Conda を使用する

  • Virtualenv の使用

  • PEX の使用

PySpark ネイティブ機能の使用#

PySpark では、Python ファイル (.py)、ZIP された Python パッケージ (.zip)、および Egg ファイル (.egg) を、次のいずれかの方法でエクゼキューターにアップロードできます。

  • spark.submit.pyFiles 設定を設定する

  • Spark スクリプトで --py-files オプションを設定する

  • アプリケーションで pyspark.SparkContext.addPyFile() を直接呼び出す

これは、追加のカスタム Python コードをクラスターに送信するための簡単な方法です。個々のファイルを追加したり、パッケージ全体を zip してアップロードしたりできます。pyspark.SparkContext.addPyFile() を使用すると、ジョブを開始した後でもコードをアップロードできます。

ただし、Wheel としてビルドされたパッケージを追加することはできず、ネイティブコードを含む依存関係を含めることができません。

Conda の使用#

Conda は、最も広く使用されている Python パッケージ管理システムの一つです。PySpark ユーザーは、リロケート可能な Conda 環境を作成するコマンドラインツールである conda-pack を利用することで、Conda 環境を直接使用してサードパーティの Python パッケージを送信できます。

以下の例では、ドライバーとエクゼキューターの両方で使用する Conda 環境を作成し、それをアーカイブファイルにパックします。このアーカイブファイルは、Python の Conda 環境をキャプチャし、Python インタープリターとその関連するすべての依存関係の両方を格納します。

conda create -y -n pyspark_conda_env -c conda-forge pyarrow pandas conda-pack
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz

その後、--archives オプションまたは spark.archives 設定 (spark.yarn.dist.archives in YARN) を使用して、スクリプトと一緒に、またはコード内で送信できます。エクゼキューターでアーカイブを自動的に展開します。

spark-submit スクリプトの場合、次のように使用できます。

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_conda_env.tar.gz#environment app.py

上記の PYSPARK_DRIVER_PYTHON は、YARN または Kubernetes のクラスターモードでは設定しないように注意してください。

通常の Python シェルまたはノートブックを使用している場合は、以下のように試すことができます。

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_conda_env.tar.gz#environment").getOrCreate()
main(spark)

pyspark シェルの場合

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_conda_env.tar.gz#environment

Virtualenv の使用#

Virtualenv は、分離された Python 環境を作成するための Python ツールです。Python 3.3 以降、その機能の一部が Python の標準ライブラリとして venv モジュールに統合されています。PySpark ユーザーは、conda-pack と同様の方法で venv-pack を使用して、クラスターで Python の依存関係を管理できます。

ドライバーとエクゼキューターの両方で使用する仮想環境は、以下のように作成できます。現在の仮想環境をアーカイブファイルにパックし、Python インタープリターと依存関係の両方を含みます。ただし、venv-pack は Python インタープリターをシンボリックリンクとしてパックする ため、クラスターのすべてのノードに同じ Python インタープリターがインストールされている必要があります。

python -m venv pyspark_venv
source pyspark_venv/bin/activate
pip install pyarrow pandas venv-pack
venv-pack -o pyspark_venv.tar.gz

--archives オプションまたは spark.archives 設定 (spark.yarn.dist.archives in YARN) を活用することで、アーカイブファイルを直接渡して展開し、エクゼキューターで環境を有効にできます。

spark-submit の場合、次のようにコマンドを実行して使用できます。また、Kubernetes または YARN クラスターモードでは PYSPARK_DRIVER_PYTHON を解除する必要があることに注意してください。

export PYSPARK_DRIVER_PYTHON=python # Do not set in cluster modes.
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --archives pyspark_venv.tar.gz#environment app.py

通常の Python シェルまたはノートブックの場合

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",  # 'spark.yarn.dist.archives' in YARN.
    "pyspark_venv.tar.gz#environment").getOrCreate()
main(spark)

pyspark シェルの場合

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
pyspark --archives pyspark_venv.tar.gz#environment

PEX の使用#

PySpark は、Python パッケージを一緒に送信するために PEX を使用することもできます。PEX は、自己完結型の Python 環境を作成するツールです。これは Conda や virtualenv に似ていますが、.pex ファイルはそれ自体で実行可能です。

以下の例では、ドライバーとエクゼキューターが使用する .pex ファイルを作成します。このファイルには、pex コマンドで指定された Python の依存関係が含まれています。

pip install pyarrow pandas pex
pex pyspark pyarrow pandas -o pyspark_pex_env.pex

このファイルは、通常の Python インタープリターと同様に動作します。

./pyspark_pex_env.pex -c "import pandas; print(pandas.__version__)"
1.1.5

ただし、.pex ファイルは内部的に Python インタープリターを含んでいないため、クラスターのすべてのノードに同じ Python インタープリターがインストールされている必要があります。

クラスターで .pex ファイルを転送して使用するには、それらはディレクトリまたはアーカイブファイルではなく通常のファイルであるため、spark.files 設定 (spark.yarn.dist.files in YARN) または --files オプションを介して送信する必要があります。

アプリケーションの送信の場合、以下のようにコマンドを実行します。YARN または Kubernetes のクラスターモードでは PYSPARK_DRIVER_PYTHON を設定しないように注意してください。

export PYSPARK_DRIVER_PYTHON=python  # Do not set in cluster modes.
export PYSPARK_PYTHON=./pyspark_pex_env.pex
spark-submit --files pyspark_pex_env.pex app.py

通常の Python シェルまたはノートブックの場合

import os
from pyspark.sql import SparkSession
from app import main

os.environ['PYSPARK_PYTHON'] = "./pyspark_pex_env.pex"
spark = SparkSession.builder.config(
    "spark.files",  # 'spark.yarn.dist.files' in YARN.
    "pyspark_pex_env.pex").getOrCreate()
main(spark)

インタラクティブな pyspark シェルの場合、コマンドはほぼ同じです。

export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./pyspark_pex_env.pex
pyspark --files pyspark_pex_env.pex

SparkSession.builder と PEX を使用したスタンドアロン PySpark のデプロイのためのエンドツーエンドの Docker 例は、こちらで見つけることができます。これは、PEX の上に構築されたライブラリである cluster-pack を使用しており、PEX を手動で作成およびアップロードするという中間ステップを自動化します。