PySpark でのロギング#

はじめに#

pyspark.logger モジュールは、PySpark ユーザー向けの構造化されたクライアントサイド ロギングを容易にします。

このモジュールには、さまざまなレベルで構造化された JSON 形式でメッセージをログ記録するためのいくつかのメソッドを提供する pyspark.logger.PySparkLogger クラスが含まれています。

ロガーは、コンソールまたは指定されたファイルにログを書き込むように簡単に設定できます。

ログ形式のカスタマイズ#

デフォルトのログ形式は JSON で、タイムスタンプ、ログレベル、ロガー名、および追加のコンテキストとともにログメッセージが含まれます。

ログエントリの例

{
  "ts": "2024-06-28 19:53:48,563",
  "level": "ERROR",
  "logger": "DataFrameQueryContextLogger",
  "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/.../spark/python/test_error_context.py:17\n",
  "context": {
    "file": "/path/to/file.py",
    "line": "17",
    "fragment": "divide"
    "errorClass": "DIVIDE_BY_ZERO"
  },
  "exception": {
    "class": "Py4JJavaError",
    "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"divide\" was called from\n/path/to/file.py:17 ...",
    "stacktrace": [
      {
        "class": null,
        "method": "deco",
        "file": ".../spark/python/pyspark/errors/exceptions/captured.py",
        "line": "247"
      }
    ]
  },
}

セットアップ#

PySpark ロギングモジュールを使用するには、pyspark.logger から pyspark.logger.PySparkLogger をインポートする必要があります。

from pyspark.logger import PySparkLogger

使用法#

ロガーの作成#

PySparkLogger.getLogger() を呼び出すことで、ロガーインスタンスを作成できます。デフォルトでは、INFO ログレベルで “PySparkLogger” という名前のロガーが作成されます。

logger = PySparkLogger.getLogger()

メッセージのロギング#

ロガーは、ログメッセージ用の 3 つの主要なメソッドを提供します: PySparkLogger.info()PySparkLogger.warning()、および PySparkLogger.error()

  • PySparkLogger.info: このメソッドを使用して、情報メッセージをログに記録します。

    user = "test_user"
    action = "login"
    logger.info(f"User {user} performed {action}", user=user, action=action)
    
  • PySparkLogger.warning: このメソッドを使用して、警告メッセージをログに記録します。

    user = "test_user"
    action = "access"
    logger.warning("User {user} attempted an unauthorized {action}", user=user, action=action)
    
  • PySparkLogger.error: このメソッドを使用して、エラーメッセージをログに記録します。

    user = "test_user"
    action = "update_profile"
    logger.error("An error occurred for user {user} during {action}", user=user, action=action)
    

コンソールへのロギング#

from pyspark.logger import PySparkLogger

# Create a logger that logs to console
logger = PySparkLogger.getLogger("ConsoleLogger")

user = "test_user"
action = "test_action"

logger.warning(f"User {user} takes an {action}", user=user, action=action)

これにより、次の JSON 形式で情報がログに記録されます。

{
  "ts": "2024-06-28 19:44:19,030",
  "level": "WARNING",
  "logger": "ConsoleLogger",
  "msg": "User test_user takes an test_action",
  "context": {
    "user": "test_user",
    "action": "test_action"
  },
}

ファイルへのロギング#

ファイルにメッセージをログに記録するには、標準 Python ロギングモジュールの FileHandler をロガーに追加するために PySparkLogger.addHandler() を使用します。

このアプローチは、標準の Python ロギングプラクティスに準拠しています。

from pyspark.logger import PySparkLogger
import logging

# Create a logger that logs to a file
file_logger = PySparkLogger.getLogger("FileLogger")
handler = logging.FileHandler("application.log")
file_logger.addHandler(handler)

user = "test_user"
action = "test_action"

file_logger.warning(f"User {user} takes an {action}", user=user, action=action)

ログメッセージは、同じ JSON 形式で application.log に保存されます。