PySpark への貢献#

貢献には、他のユーザーの支援、リリース テスト、変更レビュー、ドキュメント作成、バグ報告、JIRA 管理、コード変更など、さまざまな種類があります。これらは 一般的なガイドライン に文書化されています。このページでは PySpark に焦点を当て、PySpark 固有の追加情報を含みます。

リリース テストによる貢献#

公式リリース前に、PySpark のリリース候補版は dev@spark.apache.org メーリングリストで投票のために共有されます。これらのリリース候補版は pip を介して簡単にインストールできます。たとえば、Spark 3.0.0 RC1 の場合、以下のようにインストールできます。

pip install https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin/pyspark-3.0.0.tar.gz

https://dist.apache.org/repos/dist/dev/spark/v3.0.0-rc1-bin のようなリリースファイルへのリンクは、投票スレッドで見つけることができます。

リリース候補版に対してユーザーの既存のワークロードをテストおよび検証することは、PySpark への重要な貢献の 1 つです。これにより、公式リリース前にユーザーの既存のワークロードが壊れるのを防ぎます。リグレッション、正確性の問題、またはリリース候補版の却下に値するほどのパフォーマンス低下などの問題が発生した場合、通常はリリース候補版が却下され、コミュニティは次のリリース候補版に含めるための修正に注力します。

ドキュメント変更による貢献#

リリース ドキュメントは、Spark の docs ディレクトリの下にあります。README.md には、必要な依存関係とドキュメント生成の手順が記載されています。通常、PySpark ドキュメントは、docs ディレクトリの下で次のコマンドでテストされます。

SKIP_SCALADOC=1 SKIP_RDOC=1 SKIP_SQLDOC=1 bundle exec jekyll serve --watch

PySpark は Sphinx を使用してリリース PySpark ドキュメントを生成します。そのため、PySpark ドキュメントのみをビルドしたい場合は、python/docs ディレクトリの下で次のようにビルドできます。

make html

これにより、python/docs/build/html の下に HTML が生成されます。

最後に、新しい API がドキュメント化されていることを確認してください。これは、python/docs/source/reference の下の対応する RST ファイルにメソッドやクラスを手動で追加することによって行います。そうしないと、PySpark ドキュメントにはドキュメント化されません。

コード変更の貢献準備#

PySpark でコードの作業を開始する前に、一般的なガイドライン を読むことをお勧めします。さらに、PySpark にコードを寄付する際に考慮すべき追加の注意点がいくつかあります。

  • Pythonic になる

    The Zen of Python を参照してください。

  • Scala と Java のサイドと API を一致させる

    Apache Spark は、統一された API レイヤーを提供する統合エンジンです。一般的に、API は他の言語でも一貫してサポートされています。

  • PySpark 固有の API も受け入れ可能

    Pythonic であり、既存の他の API と競合しない限り、デコレータを使用した UDF など、API リクエストを提出しても構いません。

  • パブリック API を拡張または変更する場合は、対応する型ヒントを調整してください

    詳細については、型ヒントの貢献と保守 を参照してください。

pandas API on Spark (pyspark.pandas) パッケージに修正を加える場合は、以下の設計原則を検討してください。

  • ビッグデータの場合は pandas-on-Spark データ構造を返し、小規模データの場合は pandas データ構造を返す

    開発者は、特定の関数が pandas-on-Spark DataFrame/Series を返すか、pandas DataFrame/Series を返すかという問題に直面することがよくあります。原則は次のとおりです。返されるオブジェクトが大きくなる可能性がある場合は、pandas-on-Spark DataFrame/Series を使用します。データが小さいことがわかっている場合は、pandas DataFrame/Series を使用します。たとえば、DataFrame.dtypes は pandas Series を返します。これは、DataFrame の列数が制限されており小さいからです。一方、DataFrame.head() または Series.unique() は、結果のオブジェクトが大きくなる可能性があるため、pandas-on-Spark DataFrame/Series を返します。

  • 一般的なデータ サイエンス タスクのために検出可能な API を提供する

    過度の一般化のリスクを冒して言えば、2 つの API 設計アプローチがあります。最初のものは一般的なタスクのための API を提供することに焦点を当てており、2 番目のものは抽象化から開始し、ユーザーがプリミティブを構成することによってタスクを達成できるようにします。世界は白黒ではありませんが、pandas は前者により多くのアプローチを取り、Spark は後者により多くのアプローチを取っています。

    例として、データ サイエンスで最も一般的な操作の 1 つである値のカウント (特定のキー列によるカウント) があります。pandas の DataFrame.value_counts() は結果をソートされた順序で返します。これは、データの探索時にユーザーが 90% のケースで好むものであり、Spark のものはソートされません。これはデータパイプラインの構築時に望ましいですが、ユーザーは明示的な orderBy を追加することで pandas の動作を実現できます。

    pandas と同様に、pandas API on Spark も前者により傾き、一般的なデータ サイエンス タスクのために検出可能な API を提供する必要があります。ほとんどの場合、この原則は pandas の API を実装するだけでうまく処理されます。ただし、pandas の API が特定のニーズに対応できない状況があります (例: ビッグデータ用のプロット)。

  • ユーザーが自分自身を傷つけるのを防ぐためのガードレール

    pandas の一部の操作は、データがスケーリングすると非常に高価になり、pandas API on Spark でそのような操作に依存できるという幻想をユーザーに与えたくありません。つまり、pandas API on Spark で実装されたメソッドは、大規模データセットでデフォルトで安全に実行できるようにする必要があります。その結果、次の機能は pandas API on Spark では実装されていません。

    • 根本的に並列化できない機能: 例: 各要素を命令的にループ処理する

    • 作業セット全体を 1 つのノードのメモリにマテリアライズする必要がある機能。そのため、pandas.DataFrame.to_xarray は実装されていません。別の例は、_repr_html_ 呼び出しで、ユーザーがノートブックで DataFrame の名前を入力するだけでドライバ ノードを吹っ飛ばすのを防ぐために、表示されるレコードの総数を最大 1000 件に制限していることです。

    ただし、例外がいくつかあります。一般的な「ビッグデータ サイエンス」のパターンは、初期データセットは大きいものの、分析が深まるにつれて作業セットが小さくなるということです。たとえば、データ サイエンティストはデータセットに対して集計を実行し、その後、集計されたデータセットをローカル データ構造に変換したい場合があります。データ サイエンティストを支援するために、次の機能を提供します。

    • DataFrame.to_pandas: pandas DataFrame (pandas-on-Spark のみ) を返します。

    • DataFrame.to_numpy: numpy 配列を返します。pandas API on Spark と pandas の両方で動作します。

    これらの関数はローカル データ構造を返すことが名前から明らかであり、1 つのノードのメモリにデータをマテリアライズする必要があることに注意してください。これらの関数については、結果のデータ構造が小さくなければならないという注意書きとともに明示的に文書化しています。

環境設定#

前提条件#

PySpark の開発には Spark のビルドが必要であり、適切な JDK のインストールなどが必要です。詳細については、Spark のビルド を参照してください。

Spark Connect for Python に貢献する予定の場合は、buf が必要です。詳細については、Buf のインストール を参照してください。

Conda#

Conda を使用している場合、開発環境は次のように設定できます。

# Python 3.9+ is required
conda create --name pyspark-dev-env python=3.9
conda activate pyspark-dev-env
pip install --upgrade -r dev/requirements.txt

設定が完了したら、開発を開始する前に必ず pyspark-dev-env に切り替えてください。

conda activate pyspark-dev-env

これで、開発を開始して テストを実行 できます。

pip#

Python 3.9 以降では、pip を使用して開発環境をインストールおよび設定できます。

pip install --upgrade -r dev/requirements.txt

これで、開発を開始して テストを実行 できます。

型ヒントの貢献と保守#

PySpark の型ヒントはインライン化されており、静的型チェックを活用します。

経験則として、パブリック API のみが注釈付けされます。

注釈は、可能な限り次のようにする必要があります。

  • 基盤となる JVM API の期待を反映し、Python インタープリター外での型関連の障害を回避するのに役立ちます。

  • 広すぎる (Any) および狭すぎる引数注釈の競合がある場合は、後者を選択します。ただし、典型的なユースケースのほとんどをカバーしている限り。

  • 引数の意味のない組み合わせを @overload 注釈を使用して示します。たとえば、*Col および *Cols 引数が相互に排他的であることを示す場合。

    @overload
    def __init__(
        self,
        *,
        threshold: float = ...,
        inputCol: Optional[str] = ...,
        outputCol: Optional[str] = ...
    ) -> None: ...
    @overload
    def __init__(
        self,
        *,
        thresholds: Optional[List[float]] = ...,
        inputCols: Optional[List[str]] = ...,
        outputCols: Optional[List[str]] = ...
    ) -> None: ...
    
  • 現在の安定版 MyPy リリースと互換性がある。

複雑なサポート型定義は、専用の _typing.pyi スタブに配置する必要があります。たとえば、pyspark.sql._typing.pyi を参照してください。

注釈は、dev/lint-python スクリプトを使用して検証するか、mypy を直接呼び出すことで検証できます。

mypy --config python/mypy.ini python/pyspark

コードとドキュメント文字列のガイド#

コード規約#

既存のコードベースのスタイルをそのまま従ってください。これは事実上 PEP 8 ですが、1 つの例外があります。行の長さは 79 文字ではなく 100 文字まで許可されます。

注意:

  • PySpark のメソッド名と変数名は、Python 自体の threading ライブラリと同様のケースです。API は Java に触発されました。PySpark は、Scala および Java に一致する公開 API に camelCase を採用しています。

  • 対照的に、functions.py は、API を SQL (および Python) フレンドリーにするために snake_case を使用しています。

  • さらに、pandas-on-Spark (pyspark.pandas) も snake_case を使用しています。これは、このパッケージが他の言語との API 一貫性から解放されているためです。

PySpark は、dev/lint-python が実行する pycodestyleflake8 などのリンターを利用しています。そのため、確認のためにそのスクリプトを実行するようにしてください。

ドキュメント文字列の規約#

PySpark は NumPy ドキュメント スタイル に従います。

Doctest の規約#

一般的に、doctest は改行で区切って論理的にグループ化する必要があります。

たとえば、最初のブロックは準備ステートメント用、2 番目のブロックは特定の引数を使用した関数の使用用、3 番目のブロックは別の引数用です。例として、pandas の DataFrame.rsub を参照してください。

これらのブロックは PySpark の doctest で一貫して分離する必要があり、doctest のカバレッジまたは表示する例の数が不十分な場合は、さらに doctest を追加する必要があります。

エラーと例外の貢献#

標準化されたユーザー向けの例外を発生させるには、開発者は任意の例外メッセージではなく、エラー クラスとメッセージ パラメーターを指定する必要があります。

使用法#

  1. PySpark のエラークラス で適切なエラー クラスが既に存在するかどうかを確認してください。存在する場合は、エラー クラスを使用し、ステップ 3 にスキップします。

  2. error-conditions.json に新しいクラスを追加します。以下の不変条件に注意してください。

  3. 例外タイプが既に PySparkException を継承しているか確認してください。存在する場合は、ステップ 5 にスキップします。

  4. PySparkException を例外にミックスインします。

  5. エラー クラスとメッセージ パラメーターを使用して例外を発生させます。

任意の例外メッセージで発生させる

raise ValueError("Problem A because B")

error-conditions.json

"PROBLEM_BECAUSE": {
  "message": ["Problem <problem> because <cause>"]
}

exceptions.py

class PySparkTestError(PySparkException):
    def __init__(self, errorClass: str, messageParameters: Dict[str, str]):
        super().__init__(errorClass=errorClass, messageParameters=messageParameters)

    def getMessageParameters(self) -> Optional[Dict[str, str]]:
        return super().getMessageParameters()

エラー クラスとメッセージ パラメーターで発生させる

raise PySparkTestError("PROBLEM_BECAUSE", {"problem": "A", "cause": "B"})

フィールドへのアクセス#

エラー フィールドにアクセスするには、PySparkException を継承する例外をキャッチし、PySparkException.getCondition() を介してエラー クラスにアクセスします。

try:
    ...
except PySparkException as pe:
    if pe.getCondition() == "PROBLEM_BECAUSE":
        ...

フィールド#

エラー クラス

エラー クラスは、エラー カテゴリの簡潔で人間が読める表現です。

カテゴリ化されていないエラーは、プレフィックス _LEGACY_ERROR_TEMP_ と未使用の連番 (例: _LEGACY_ERROR_TEMP_0053) を持つレガシー エラー クラスに割り当てることができます。

不変条件

  • 一意

  • リリース間で一貫性がある

  • アルファベット順にソートされている

メッセージ

エラーメッセージは、エラーの記述的で人間が読める表現を提供します。メッセージ形式は、C スタイルの printf 構文を使用して文字列パラメーターを受け入れます。

エラー メッセージの品質は、Apache Spark エラー メッセージ ガイドライン に一致する必要があります。

不変条件

  • 一意