第4章:バグ退治 - PySparkのデバッグ#

PySparkは分散環境でアプリケーションを実行するため、これらのアプリケーションの監視とデバッグは困難です。どのノードが特定のコードを実行しているかを追跡するのは難しい場合があります。しかし、PySpark内にはデバッグを支援する複数の方法があります。このセクションでは、PySparkアプリケーションを効果的にデバッグする方法を概説します。

PySparkは、基盤となるエンジンとしてSparkを使用して動作し、Spark ConnectサーバーまたはPy4J(Spark Classic)を利用してSparkにジョブを送信し、計算します。

ドライバー側では、PySparkはSpark ConnectサーバーまたはPy4J(Spark Classic)を介してJVM上のSparkドライバーと対話します。`pyspark.sql.SparkSession`が作成および初期化されると、PySparkはSparkドライバーとの通信を開始します。

エグゼキュータ側では、PythonワーカーがPythonネイティブ関数またはデータの実行と管理を担当します。これらのワーカーは、Python UDFの実行など、PySparkアプリケーションがPythonとJVM間の対話を必要とする場合にのみ起動されます。これらは、pandas UDFまたはPySpark RDD APIを実行する場合などにオンデマンドで開始されます。

Spark UI#

Python UDFの実行#

PySparkでPython UDFをデバッグするには、printステートメントを追加するだけで済みますが、関数はエグゼキュータ上で実行されるため、クライアント/ドライバー側では出力が表示されません。Spark UIで表示できます。たとえば、動作するPython UDFがある場合

[1]:
from pyspark.sql.functions import udf

@udf("integer")
def my_udf(x):
    # Do something with x
    return x

以下に示すように、デバッグのためにprintステートメントを追加できます。

[2]:
@udf("integer")
def my_udf(x):
    # Do something with x
    print("What's going on?")
    return x

spark.range(1).select(my_udf("id")).collect()
[2]:
[Row(my_udf(id)=0)]

出力は、エグゼキュータタブの`stdout`/`stderr`の下のSpark UIで表示できます。

Spark UI print

Python以外のUDF#

Python以外のUDFコードを実行する場合、デバッグは通常、Spark UIを介して、または`DataFrame.explain(True)`を使用して行われます。

たとえば、以下のコードは、大きなDataFrame(`df1`)と小さなDataFrame(`df2`)の間で結合を実行します。

[3]:
df1 = spark.createDataFrame([(x,) for x in range(100)])
df2 = spark.createDataFrame([(x,) for x in range(2)])
df1.join(df2, "_1").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [_1#6L]
   +- SortMergeJoin [_1#6L], [_1#8L], Inner
      :- Sort [_1#6L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(_1#6L, 200), ENSURE_REQUIREMENTS, [plan_id=41]
      :     +- Filter isnotnull(_1#6L)
      :        +- Scan ExistingRDD[_1#6L]
      +- Sort [_1#8L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(_1#8L, 200), ENSURE_REQUIREMENTS, [plan_id=42]
            +- Filter isnotnull(_1#8L)
               +- Scan ExistingRDD[_1#8L]


`DataFrame.explain`を使用すると、結合がどのように実行されるかを示す物理計画が表示されます。これらの物理計画は、全体の実行の個々のステップを表します。ここでは、データを交換(シャッフル)し、ソートマージ結合を実行します。

この方法で計画がどのように生成されるかを確認した後、ユーザーはクエリを最適化できます。たとえば、`df2`は非常に小さいので、エグゼキュータにブロードキャストしてシャッフルを削除できます。

[4]:
from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), "_1").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [_1#6L]
   +- BroadcastHashJoin [_1#6L], [_1#8L], Inner, BuildRight, false
      :- Filter isnotnull(_1#6L)
      :  +- Scan ExistingRDD[_1#6L]
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [plan_id=71]
         +- Filter isnotnull(_1#8L)
            +- Scan ExistingRDD[_1#8L]


シャッフルが削除され、ブロードキャストハッシュ結合が実行されていることがわかります。

これらの最適化は、実行後にSpark UIのSQL / DataFrameタブでも視覚化できます。

[5]:
df1.join(df2, "_1").collect()
[5]:
[Row(_1=0), Row(_1=1)]

PySpark UI SQL

[6]:
df1.join(broadcast(df2), "_1").collect()
[6]:
[Row(_1=0), Row(_1=1)]

PySpark UI SQL broadcast

`top`と`ps`で監視#

ドライバー側では、PySparkシェルからプロセスIDを取得してリソースを監視できます。

[7]:
import os; os.getpid()
[7]:
23976
[8]:
%%bash
ps -fe 23976
  UID   PID  PPID   C STIME   TTY           TIME CMD
  502 23976 21512   0 12:06PM ??         0:02.30 /opt/miniconda3/envs/python3.11/bin/python -m ipykernel_launcher -f /Users/hyukjin.kwon/Library/Jupyter/runtime/kernel-c8eb73ef-2b21-418e-b770-92b946454606.json

エグゼキュータ側では、`grep`を使用してPythonワーカーのプロセスIDとリソースを見つけることができます。これらは`pyspark.daemon`からフォークされているためです。

[9]:
%%bash
ps -fe | grep pyspark.daemon | head -n 5
  502 23989 23981   0 12:06PM ??         0:00.59 python3 -m pyspark.daemon pyspark.worker
  502 23990 23989   0 12:06PM ??         0:00.19 python3 -m pyspark.daemon pyspark.worker
  502 23991 23989   0 12:06PM ??         0:00.19 python3 -m pyspark.daemon pyspark.worker
  502 23992 23989   0 12:06PM ??         0:00.19 python3 -m pyspark.daemon pyspark.worker
  502 23993 23989   0 12:06PM ??         0:00.19 python3 -m pyspark.daemon pyspark.worker

通常、ユーザーは`top`と特定されたPIDを利用して、PySparkのPythonプロセスのメモリ使用量を監視します。

PySparkプロファイラの使用#

メモリプロファイラ#

ドライバー側のデバッグのため、ユーザーは`memory_profiler`など、既存のほとんどのPythonツールを使用できます。これにより、行ごとにメモリ使用量を確認できます。ドライバープログラムが別のマシン(例:YARNクラスターモード)で実行されていない場合は、メモリプロファイラを使用してドライバー側のメモリ使用量をデバッグできます。たとえば、

[10]:
%%bash

echo "from pyspark.sql import SparkSession
#===Your function should be decorated with @profile===
from memory_profiler import profile
@profile
#=====================================================
def my_func():
    session = SparkSession.builder.getOrCreate()
    df = session.range(10000)
    return df.collect()
if __name__ == '__main__':
    my_func()" > profile_memory.py

python -m memory_profiler profile_memory.py 2> /dev/null
Filename: profile_memory.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     4     80.6 MiB     80.6 MiB           1   @profile
     5                                         #=====================================================
     6                                         def my_func():
     7     79.0 MiB     -1.7 MiB           1       session = SparkSession.builder.getOrCreate()
     8     80.1 MiB      1.1 MiB           1       df = session.range(10000)
     9     84.1 MiB      4.0 MiB           1       return df.collect()


どの行がどのくらいのメモリを消費するかを適切に示します。

PythonおよびPandas UDF#

注意:このセクションはSpark 4.0に適用されます。

PySparkは、Python/Pandas UDF用のリモート`memory_profiler`を提供します。これは、Jupyterノートブックなどの行番号付きのエディターで使用できます。SparkSessionベースのメモリプロファイラは、ランタイムSQL構成`spark.sql.pyspark.udf.profiler`を`memory`に設定することで有効にできます。

[11]:
from pyspark.sql.functions import pandas_udf

df = spark.range(10)

@pandas_udf("long")
def add1(x):
  return x + 1

spark.conf.set("spark.sql.pyspark.udf.profiler", "memory")

added = df.select(add1("id"))
spark.profile.clear()
added.collect()
spark.profile.show(type="memory")
============================================================
Profile of UDF<id=16>
============================================================
Filename: /var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/885006762.py

Line #    Mem usage    Increment  Occurrences   Line Contents
=============================================================
     5   1472.6 MiB   1472.6 MiB          10   @pandas_udf("long")
     6                                         def add1(x):
     7   1473.9 MiB      1.3 MiB          10     return x + 1


UDF IDはクエリプランで確認できます。たとえば、以下に示す`ArrowEvalPython`内の`add1(...)#16L`です。

[12]:
added.explain()
== Physical Plan ==
*(2) Project [pythonUDF0#19L AS add1(id)#17L]
+- ArrowEvalPython [add1(id#14L)#16L], [pythonUDF0#19L], 200
   +- *(1) Range (0, 10, step=1, splits=16)


パフォーマンスプロファイラ#

注意:このセクションはSpark 4.0に適用されます。

`Python Profilers`は、Python自体に組み込まれた便利な機能です。ドライバー側でこれを使用するには、ドライバー側のPySparkは通常のPythonプロセスであるため、通常のPythonプログラムと同様に使用できます。ただし、ドライバープログラムが別のマシン(例:YARNクラスターモード)で実行されている場合は除きます。

[13]:
%%bash

echo "from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.range(10).collect()" > app.py

python -m cProfile -s cumulative app.py  2> /dev/null | head -n 20
         549275 function calls (536745 primitive calls) in 3.447 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        2    0.000    0.000    3.448    1.724 app.py:1(<module>)
    792/1    0.005    0.000    3.447    3.447 {built-in method builtins.exec}
      128    0.000    0.000    2.104    0.016 socket.py:692(readinto)
      128    2.104    0.016    2.104    0.016 {method 'recv_into' of '_socket.socket' objects}
      124    0.000    0.000    2.100    0.017 java_gateway.py:1015(send_command)
      125    0.001    0.000    2.099    0.017 clientserver.py:499(send_command)
      138    0.000    0.000    2.097    0.015 {method 'readline' of '_io.BufferedReader' objects}
       55    0.000    0.000    1.622    0.029 java_gateway.py:1313(__call__)
       95    0.001    0.000    1.360    0.014 __init__.py:1(<module>)
        1    0.000    0.000    1.359    1.359 session.py:438(getOrCreate)
        1    0.000    0.000    1.311    1.311 context.py:491(getOrCreate)
        1    0.000    0.000    1.311    1.311 context.py:169(__init__)
        1    0.000    0.000    0.861    0.861 context.py:424(_ensure_initialized)
        1    0.001    0.001    0.861    0.861 java_gateway.py:39(launch_gateway)
        8    0.840    0.105    0.840    0.105 {built-in method time.sleep}

Python/Pandas UDF#

注意:このセクションはSpark 4.0に適用されます。

PySparkは、Python/Pandas UDF用のリモートPythonプロファイラを提供します。イテレータを入力/出力とするUDFはサポートされていません。SparkSessionベースのパフォーマンスプロファイラは、ランタイムSQL構成`spark.sql.pyspark.udf.profiler`を`perf`に設定することで有効にできます。例を以下に示します。

[14]:
import io
from contextlib import redirect_stdout

from pyspark.sql.functions import pandas_udf

df = spark.range(10)
@pandas_udf("long")
def add1(x):
    return x + 1

added = df.select(add1("id"))

spark.conf.set("spark.sql.pyspark.udf.profiler", "perf")
spark.profile.clear()
added.collect()

# Only show top 10 lines
output = io.StringIO()
with redirect_stdout(output):
    spark.profile.show(type="perf")

print("\n".join(output.getvalue().split("\n")[0:20]))
============================================================
Profile of UDF<id=22>
============================================================
         2130 function calls (2080 primitive calls) in 0.003 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       10    0.001    0.000    0.003    0.000 common.py:62(new_method)
       10    0.000    0.000    0.000    0.000 {built-in method _operator.add}
       10    0.000    0.000    0.002    0.000 base.py:1371(_arith_method)
       10    0.000    0.000    0.001    0.000 series.py:389(__init__)
       20    0.000    0.000    0.000    0.000 _ufunc_config.py:33(seterr)
       10    0.000    0.000    0.001    0.000 series.py:6201(_construct_result)
       10    0.000    0.000    0.000    0.000 cast.py:1605(maybe_cast_to_integer_array)
       10    0.000    0.000    0.000    0.000 construction.py:517(sanitize_array)
       10    0.000    0.000    0.002    0.000 series.py:6133(_arith_method)
       10    0.000    0.000    0.000    0.000 managers.py:1863(from_array)
       10    0.000    0.000    0.000    0.000 array_ops.py:240(arithmetic_op)
      510    0.000    0.000    0.000    0.000 {built-in method builtins.isinstance}

UDF IDはクエリプランで確認できます。たとえば、以下の`ArrowEvalPython`内の`add1(...)#22L`です。

[15]:
added.explain()
== Physical Plan ==
*(2) Project [pythonUDF0#25L AS add1(id)#23L]
+- ArrowEvalPython [add1(id#20L)#22L], [pythonUDF0#25L], 200
   +- *(1) Range (0, 10, step=1, splits=16)


以下に示すように、事前に登録されたレンダラーで結果を表示できます。

[16]:
spark.profile.render(id=2, type="perf")  # renderer="flameprof" by default

PySpark UDF profiling

スタックトレースの表示#

注意:このセクションはSpark 4.0に適用されます。

デフォルトでは、JVMスタックトレースとPython内部トレースバックは非表示になっています。特にPython UDFの実行ではそうです。たとえば、

[17]:
from pyspark.sql.functions import udf

spark.range(1).select(udf(lambda x: x / 0)("id")).collect()
PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3806637820.py", line 3, in <lambda>
ZeroDivisionError: division by zero

完全な内部スタックトレースを表示するには、ユーザーはそれぞれ`spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled`と`spark.sql.pyspark.jvmStacktrace.enabled`を有効にすることができます。

[18]:
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", False)
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", False)
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()
PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 1898, in main
    process()
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 1890, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream
    for obj in iterator:
  File "/.../python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched
    for item in iterator:
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 1798, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 1798, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 114, in <lambda>
    return args_kwargs_offsets, lambda *a: func(*a)
                                           ^^^^^^^^
  File "/.../python/lib/pyspark.zip/pyspark/util.py", line 145, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/.../python/lib/pyspark.zip/pyspark/worker.py", line 739, in profiling_func
    ret = f(*args, **kwargs)
          ^^^^^^^^^^^^^^^^^^
  File "/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3570641234.py", line 3, in <lambda>
ZeroDivisionError: division by zero

[19]:
spark.conf.set("spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled", True)
spark.conf.set("spark.sql.pyspark.jvmStacktrace.enabled", True)
spark.range(1).select(udf(lambda x: x / 0)("id")).collect()
PythonException:
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py", line 3, in <lambda>
ZeroDivisionError: division by zero


JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 13.0 failed 1 times, most recent failure: Lost task 15.0 in stage 13.0 (TID 161) (ip-192-168-45-94.ap-northeast-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/var/folders/qm/mlwmy16n5xx66ldgzmptzlc40000gp/T/ipykernel_23976/3514597595.py", line 3, in <lambda>
ZeroDivisionError: division by zero

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:531)
        at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:103)
        at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:485)
        ...

詳細については、スタックトレースを参照してください。

IDEデバッグ#

ドライバー側では、PySparkアプリケーションのデバッグにIDEを使用するために追加の手順は必要ありません。以下のガイドを参照してください。

エグゼキュータ側では、リモートデバッガを設定するにはいくつかの手順が必要です。以下のガイドを参照してください。