Protobuf データソースガイド

Spark 3.4.0 リリース以降、Spark SQL は Protobuf データの読み書きを組み込みでサポートしています。

デプロイ

spark-protobuf モジュールは外部モジュールであり、spark-submit または spark-shell にはデフォルトで含まれていません。

他の Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。spark-protobuf_2.12 とその依存関係は、--packages を使用して spark-submit に直接追加できます。

./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...

spark-shell で実験する場合は、--packages を使用して org.apache.spark:spark-protobuf_2.12 とその依存関係を直接追加することもできます。

./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...

外部依存関係を持つアプリケーションの提出に関する詳細は、アプリケーション提出ガイド を参照してください。

to_protobuf() と from_protobuf()

spark-protobuf パッケージは、列を Protobuf 形式のバイナリとしてエンコードする関数 to_protobuf と、Protobuf バイナリデータを列にデコードする関数 from_protobuf() を提供します。どちらの関数も、1 つの列を別の列に変換し、入力/出力 SQL データ型は複合型またはプリミティブ型にすることができます。

Kafka のようなストリーミングソースとの読み書きを行う場合、列として Protobuf メッセージを使用すると便利です。各 Kafka キーバリューレコードには、Kafka への取り込みタイムスタンプ、Kafka 内のオフセットなど、いくつかのメタデータが追加されます。

Spark SQL スキーマは、from_protobufto_protobuf に渡された Protobuf 記述子ファイルまたは Protobuf クラスに基づいて生成されます。指定された Protobuf クラスまたは Protobuf 記述子ファイルはデータと一致する必要があります。そうでない場合、動作は未定義です。失敗するか、任意の結果を返す可能性があります。

この div は、マークダウンエディタ/ビューアを動作させるためだけに使用され、Web 上には表示されません ```python
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
#   string name = 1;
#   int64 id = 2;
#   string context = 3;
# }
df = spark
  .readStream
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
  .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
  .where('event.name == "alice"')
  .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
  .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
  .where('event.name == "alice"')

output.printSchema()
# root
#  |--event: struct (nullable = true)
#  |   |-- name : string (nullable = true)
#  |   |-- id: long (nullable = true)
#  |   |-- context: string (nullable = true)

output = output
  .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")
  .start()
```
この div は、マークダウンエディタ/ビューアを動作させるためだけに使用され、Web 上には表示されません ```scala
import org.apache.spark.sql.protobuf.functions._

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
  .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
  .where("event.name == \"alice\"")
  .select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
  .select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
```
この div は、マークダウンエディタ/ビューアを動作させるためだけに使用され、Web 上には表示されません ```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
  .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
  .where("event.name == \"alice\"")
  .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
  .select(
    from_protobuf(col("value"),
    "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(
  to_protobuf(col("event"),
  "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
```

Protobuf から Spark SQL への変換でサポートされる型

現在、Spark は Protobuf のメッセージの下にあるProtobuf スカラー型列挙型ネスト型、およびマップ型の読み取りをサポートしています。これらの型に加えて、spark-protobuf は Protobuf OneOf フィールドのサポートも導入しています。これにより、複数の可能なフィールドセットを持つことができるメッセージを処理できますが、一度に存在できるのは 1 つのセットのみです。これは、扱っているデータが常に同じ形式ではない場合、エラーが発生することなく異なるフィールドセットを持つメッセージを処理する必要がある場合に役立ちます。

Protobuf 型Spark SQL 型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
bytes BinaryType
Message StructType
repeated ArrayType
map MapType
OneOf Struct

また、以下の Protobuf 型TimestampDuration の読み取りもサポートしています。

Protobuf 論理型Protobuf スキーマSpark SQL 型
duration MessageType{seconds: Long, nanos: Int} DayTimeIntervalType
timestamp MessageType{seconds: Long, nanos: Int} TimestampType

Spark SQL から Protobuf への変換でサポートされる型

Spark は、すべての Spark SQL 型を Protobuf に書き込むことをサポートしています。ほとんどの型の場合、Spark 型から Protobuf 型へのマッピングは簡単です(例:IntegerType は int に変換されます)。

Spark SQL 型Protobuf 型
BooleanType boolean
IntegerType int
LongType long
FloatType float
DoubleType double
StringType string
StringType enum
BinaryType bytes
StructType message
ArrayType repeated
MapType map

循環参照 Protobuf フィールドの処理

Protobuf データを扱う際に発生する可能性のある一般的な問題の 1 つは、循環参照の存在です。Protobuf では、循環参照は、フィールドが自分自身または元のフィールドを参照する別のフィールドを参照する場合に発生します。これは、無限ループやその他の予期しない動作を引き起こす可能性があるため、データの解析時に問題を引き起こす可能性があります。この問題に対処するために、最新バージョンの spark-protobuf は、フィールド型を通じて循環参照をチェックできる新しい機能を導入しています。これにより、ユーザーはrecursive.fields.max.depthオプションを使用して、スキーマを解析する際に許可する再帰の最大レベル数を指定できます。recursive.fields.max.depthを -1 に設定することにより、デフォルトではspark-protobufは再帰的なフィールドを許可しません。ただし、必要に応じてこのオプションを 0 から 10 に設定できます。

recursive.fields.max.depthを 0 に設定すると、すべての再帰的なフィールドが削除され、1 に設定すると 1 回再帰することが許可され、2 に設定すると 2 回再帰することが許可されます。パフォーマンスの問題やスタックオーバーフローにつながる可能性があるため、recursive.fields.max.depthの値を 10 より大きくすることはできません。

以下の Protobuf メッセージの SQL スキーマは、recursive.fields.max.depthの値によって異なります。

この div は、マークダウンエディタ/ビューアを動作させるためだけに使用され、Web 上には表示されません ```protobuf
syntax = "proto3"
message Person {
  string name = 1;
  Person bff = 2
}

// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.

0: struct<name: string, bff: null>
1: struct<name string, bff: <name: string, bff: null>>
2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...
```