オプションと設定#
Pandas API on Spark には、その動作のいくつかの側面をカスタマイズできるオプションシステムがあり、ユーザーが最も調整する可能性が高いのは表示関連のオプションです。
オプションには、大文字小文字を区別しない「ドットスタイル」の名前があります (例: display.max_rows)。トップレベルの options 属性の属性として、オプションを直接取得/設定できます。
>>> import pyspark.pandas as ps
>>> ps.options.display.max_rows
1000
>>> ps.options.display.max_rows = 10
>>> ps.options.display.max_rows
10
pandas_on_spark 名前空間から直接利用できる、関連する 3 つの関数で API は構成されています。
get_option()/set_option()- 単一オプションの値を取得/設定します。reset_option()- 1 つ以上のオプションをデフォルト値にリセットします。
注意: 開発者は、詳細については pyspark.pandas/config.py を参照してください。
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 101)
>>> ps.get_option("display.max_rows")
101
オプションの取得と設定#
上記で説明したように、get_option() と set_option() は pandas_on_spark 名前空間から利用できます。オプションを変更するには、set_option('option name', new_value) を呼び出します。
>>> import pyspark.pandas as ps
>>> ps.get_option('compute.max_rows')
1000
>>> ps.set_option('compute.max_rows', 2000)
>>> ps.get_option('compute.max_rows')
2000
すべてのオプションにはデフォルト値もあり、reset_option を使用してそのデフォルト値にリセットできます。
>>> import pyspark.pandas as ps
>>> ps.reset_option("display.max_rows")
>>> import pyspark.pandas as ps
>>> ps.get_option("display.max_rows")
1000
>>> ps.set_option("display.max_rows", 999)
>>> ps.get_option("display.max_rows")
999
>>> ps.reset_option("display.max_rows")
>>> ps.get_option("display.max_rows")
1000
option_context コンテキストマネージャーはトップレベル API を介して公開されており、指定されたオプション値でコードを実行できます。オプション値は、with ブロックを終了すると自動的に復元されます。
>>> with ps.option_context("display.max_rows", 10, "compute.max_rows", 5):
... print(ps.get_option("display.max_rows"))
... print(ps.get_option("compute.max_rows"))
10
5
>>> print(ps.get_option("display.max_rows"))
>>> print(ps.get_option("compute.max_rows"))
1000
1000
異なる DataFrame に対する操作#
Pandas API on Spark は、高コストな操作を防ぐために、デフォルトでは異なる DataFrame (または Series) に対する操作を許可しません。内部的に結合操作を実行しますが、これは一般的に高コストになる可能性があります。
これは、compute.ops_on_diff_frames を True に設定することで有効にできます。以下に例を示します。
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf1 = ps.range(5)
>>> psdf2 = ps.DataFrame({'id': [5, 4, 3]})
>>> (psdf1 - psdf2).sort_index()
id
0 -5.0
1 -3.0
2 -1.0
3 NaN
4 NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.ops_on_diff_frames', True)
>>> psdf = ps.range(5)
>>> psser_a = ps.Series([1, 2, 3, 4])
>>> # 'psser_a' is not from 'psdf' DataFrame. So it is considered as a Series not from 'psdf'.
>>> psdf['new_col'] = psser_a
>>> psdf
id new_col
0 0 1.0
1 1 2.0
3 3 4.0
2 2 3.0
4 4 NaN
>>> ps.reset_option('compute.ops_on_diff_frames')
デフォルトのインデックスタイプ#
Pandas API on Spark では、Spark DataFrame が pandas-on-Spark DataFrame に変換される場合など、いくつかのケースでデフォルトのインデックスが使用されます。この場合、Pandas API on Spark は内部的に pandas-on-Spark DataFrame にデフォルトのインデックスをアタッチします。
以下のように compute.default_index_type で設定できるデフォルトインデックスにはいくつかのタイプがあります。
sequence: パーティションを指定せずに PySpark の Window 関数を使用して、1 ずつ増加するシーケンスを実装します。そのため、単一ノードにすべてのパーティションが含まれる可能性があります。データが大きい場合は、このインデックスタイプは避けるべきです。以下に例を示します。
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([0, 1, 2], dtype='int64')
これは概念的には、以下の PySpark の例と同等です。
>>> from pyspark.sql import functions as sf, Window
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> sequential_index = sf.row_number().over(
... Window.orderBy(sf.monotonically_increasing_id().asc())) - 1
>>> spark_df.select(sequential_index).rdd.map(lambda r: r[0]).collect()
[0, 1, 2]
distributed-sequence (デフォルト): グループ化とグループマップのアプローチを分散方式で、1 ずつ増加するシーケンスを実装します。それでもグローバルにシーケンシャルなインデックスを生成します。デフォルトのインデックスを大規模データセットでシーケンスにする必要がある場合は、このインデックスを使用する必要があります。以下に例を示します。
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed-sequence')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([0, 1, 2], dtype='int64')
これは概念的には、以下の PySpark の例と同等です。
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.rdd.zipWithIndex().map(lambda p: p[1]).collect()
[0, 1, 2]
警告
sequence とは異なり、distributed-sequence は分散環境で実行されるため、インデックス自体はグローバルにシーケンシャルなままであっても、各インデックスに対応する行は異なる場合があります。
これは、行が複数のパーティションとノードに分散されるため、データがロードされるときに、行とインデックスのマッピングが決定論的でなくなるためです。
さらに、apply()、groupby()、または transform() のような操作を使用すると、新しい distributed-sequence インデックスが生成される場合があります。これは元の DataFrame のインデックスと必ずしも一致するわけではありません。これにより、行とインデックスのマッピングがずれて、計算が誤って行われる可能性があります。
この問題を回避するには、distributed-sequence でのインデックスずれの処理 を参照してください。
distributed: PySpark の monotonically_increasing_id 関数を完全に分散方式で使用して、単調に増加するシーケンスを実装します。値は決定論的ではありません。インデックスが 1 ずつ増加するシーケンスである必要がない場合は、このインデックスを使用すべきです。パフォーマンスの観点からは、このインデックスは他のインデックスタイプと比較してほとんどペナルティがありません。以下に例を示します。
>>> import pyspark.pandas as ps
>>> ps.set_option('compute.default_index_type', 'distributed')
>>> psdf = ps.range(3)
>>> ps.reset_option('compute.default_index_type')
>>> psdf.index
Index([25769803776, 60129542144, 94489280512], dtype='int64')
これは概念的には、以下の PySpark の例と同等です。
>>> from pyspark.sql import functions as sf
>>> import pyspark.pandas as ps
>>> spark_df = ps.range(3).to_spark()
>>> spark_df.select(sf.monotonically_increasing_id()) \
... .rdd.map(lambda r: r[0]).collect()
[25769803776, 60129542144, 94489280512]
警告
2 つの異なる DataFrame での計算にこのタイプのインデックスが使用される可能性は非常に低いです。なぜなら、2 つの DataFrame で同じインデックスを持つことが保証されないからです。このデフォルトインデックスを使用し、compute.ops_on_diff_frames をオンにすると、2 つの異なる DataFrame 間の操作の結果は、決定論的でないインデックス値のため、予期しない出力になる可能性が高いです。
利用可能なオプション#
オプション |
デフォルト |
説明 |
|---|---|---|
display.max_rows |
1000 |
これは、さまざまな出力を表示する際に pandas-on-Spark が出力すべき最大行数を設定します。たとえば、この値は、DataFrame の repr() で表示される行数を決定します。入力長に制限がない場合は None に設定します。デフォルトは 1000 です。 |
compute.max_rows |
1000 |
‘compute.max_rows’ は、現在の pandas-on-Spark DataFrame の制限を設定します。入力長に制限がない場合は None に設定します。制限が設定されている場合、ドライバーにデータを収集し、pandas API を使用するショートカットによって実行されます。制限が解除されている場合、操作は PySpark によって実行されます。デフォルトは 1000 です。 |
compute.shortcut_limit |
1000 |
‘compute.shortcut_limit’ は、ショートカットの制限を設定します。指定された行数を計算し、そのスキーマを使用します。DataFrame の長さがこの制限を超える場合、pandas-on-Spark は PySpark を使用して計算します。 |
compute.ops_on_diff_frames |
True |
これは、2 つの異なる DataFrame 間で操作を実行するかどうかを決定します。たとえば、‘combine_frames’ 関数は内部的に結合操作を実行しますが、これは一般的に高コストになる可能性があります。そのため、compute.ops_on_diff_frames 変数が True でない場合、そのメソッドは例外をスローします。 |
compute.default_index_type |
‘distributed-sequence’ |
これは、デフォルトのインデックスタイプを設定します: sequence、distributed、および distributed-sequence。 |
compute.default_index_cache |
‘MEMORY_AND_DISK_SER’ |
これは、分散シーケンスインデックスでキャッシュされる一時 RDD のデフォルトストレージレベルを設定します: ‘NONE’、‘DISK_ONLY’、‘DISK_ONLY_2’、‘DISK_ONLY_3’、‘MEMORY_ONLY’、‘MEMORY_ONLY_2’、‘MEMORY_ONLY_SER’、‘MEMORY_ONLY_SER_2’、‘MEMORY_AND_DISK’、‘MEMORY_AND_DISK_2’、‘MEMORY_AND_DISK_SER’、‘MEMORY_AND_DISK_SER_2’、‘OFF_HEAP’、‘LOCAL_CHECKPOINT’。 |
compute.ordered_head |
False |
‘compute.ordered_head’ は、自然順序で head 操作を実行するかどうかを設定します。pandas-on-Spark は行の順序を保証しないため、head は分散パーティションからいくつかの行を返す可能性があります。‘compute.ordered_head’ が True に設定されている場合、pandas-on-Spark は事前に自然順序を実行しますが、パフォーマンスのオーバーヘッドが発生します。 |
compute.eager_check |
True |
‘compute.eager_check’ は、検証のために一部の Spark ジョブを起動するかどうかを設定します。‘compute.eager_check’ が True に設定されている場合、pandas-on-Spark は事前に検証を実行しますが、パフォーマンスのオーバーヘッドが発生します。そうでない場合、pandas-on-Spark は検証をスキップし、pandas とはわずかに異なる動作になります。影響を受ける API: Series.dot、Series.asof、Series.compare、FractionalExtensionOps.astype、IntegralExtensionOps.astype、FractionalOps.astype、DecimalOps.astype、skipna の統計関数。 |
compute.isin_limit |
80 |
‘compute.isin_limit’ は、‘Column.isin(list)’ によるフィルタリングの制限を設定します。‘list’ の長さが制限を超える場合、パフォーマンス向上のためにブロードキャスト結合が使用されます。 |
compute.pandas_fallback |
False |
‘compute.pandas_fallback’ は、Pandas の実装に自動的にフォールバックするかどうかを設定します。 |
compute.fail_on_ansi_mode |
True |
‘compute.fail_on_ansi_mode’ は、ANSI モードで動作するかどうかを設定します。True の場合、基盤となる Spark が ANSI モードを有効にして動作していると、pandas API on Spark は例外を発生させます。それ以外の場合、予期しない動作を引き起こす可能性があるにもかかわらず、動作を強制します。 |
plotting.max_rows |
1000 |
‘plotting.max_rows’ は、plot.bar や plot.pie のようなトップ N ベースのプロットの表示上の制限を設定します。1000 に設定されている場合、プロットには最初の 1000 データポイントが使用されます。デフォルトは 1000 です。 |
plotting.sample_ratio |
なし |
‘plotting.sample_ratio’ は、plot.line や plot.area のようなサンプルベースのプロットでプロットされるデータの割合を設定します。設定されていない場合、‘plotting.max_rows’ から、‘plotting.max_rows’ を総データサイズで割った比率を計算することで導出されます。 |
plotting.backend |
‘plotly’ |
プロットに使用するバックエンド。デフォルトは plotly です。トップレベルの .plot メソッドを持つ任意のパッケージをサポートします。既知のオプションは [matplotlib, plotly] です。 |