Protobuf データソース ガイド

Spark 3.4.0 リリース以降、Spark SQL は Protobuf データの読み書きをネイティブでサポートしています。

デプロイ

spark-protobuf モジュールは外部であり、デフォルトでは spark-submit または spark-shell には含まれていません。

他の Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。spark-protobuf_2.13 およびその依存関係は、--packages を使用して spark-submit に直接追加できます。例:

./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...

spark-shell で実験する場合も、--packages を使用して org.apache.spark:spark-protobuf_2.13 およびその依存関係を直接追加できます。

./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...

外部依存関係を持つアプリケーションの送信に関する詳細については、アプリケーション送信ガイドを参照してください。

to_protobuf() と from_protobuf()

spark-protobuf パッケージは、列を Protobuf 形式のバイナリとしてエンコードするための to_protobuf 関数と、Protobuf バイナリデータを列にデコードするための from_protobuf() 関数を提供します。どちらの関数も 1 つの列を別の列に変換し、入力/出力の SQL データ型は複雑な型またはプリミティブ型にすることができます。

Protobuf メッセージを列として使用することは、Kafka のようなストリーミングソースから読み書きする場合に便利です。各 Kafka キー/値レコードには、Kafka への取り込みタイムスタンプ、Kafka のオフセットなどのメタデータが付加されます。

Spark SQL スキーマは、from_protobuf および to_protobuf に渡された Protobuf 記述子ファイルまたは Protobuf クラスに基づいて生成されます。指定された Protobuf クラスまたは Protobuf 記述子ファイルはデータと一致している必要があります。一致しない場合、動作は未定義です。失敗したり、任意の結果が返されたりする可能性があります。

この div は markdown エディター/ビューアを正常に動作させるためだけに使用され、Web には表示されません ```python
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
#   string name = 1;
#   int64 id = 2;
#   string context = 3;
# }
df = spark
  .readStream
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
  .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
  .where('event.name == "alice"')
  .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
  .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
  .where('event.name == "alice"')

output.printSchema()
# root
#  |--event: struct (nullable = true)
#  |   |-- name : string (nullable = true)
#  |   |-- id: long (nullable = true)
#  |   |-- context: string (nullable = true)

output = output
  .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")
  .start()
```
この div は markdown エディター/ビューアを正常に動作させるためだけに使用され、Web には表示されません ```scala
import org.apache.spark.sql.protobuf.functions._

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
  .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
  .where("event.name == \"alice\"")
  .select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
  .select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
```
この div は markdown エディター/ビューアを正常に動作させるためだけに使用され、Web には表示されません ```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;

// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
//   string name = 1;
//   int64 id = 2;
//   string context = 3;
// }

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
  .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
  .where("event.name == \"alice\"")
  .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
  .select(
    from_protobuf(col("value"),
    "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
  .where("event.name == \"alice\"")

output.printSchema()
// root
//  |--event: struct (nullable = true)
//  |    |-- name : string (nullable = true)
//  |    |-- id: long (nullable = true)
//  |    |-- context: string (nullable = true)

output = output.select(
  to_protobuf(col("event"),
  "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
```

Protobuf -> Spark SQL 変換でサポートされる型

現在、Spark は Protobuf メッセージ内の Protobuf スカラー型enum 型ネスト型、および マップ型 の読み込みをサポートしています。これらの型に加えて、spark-protobuf は Protobuf の OneOf フィールドのサポートも導入しています。これにより、複数のフィールドセットの可能性があるが、一度に 1 つのセットのみが存在できるメッセージを処理できます。これは、作業しているデータが常に同じ形式ではない状況で、エラーなしで異なるフィールドセットを持つメッセージを処理する必要がある場合に役立ちます。

Protobuf 型Spark SQL 型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
bytes BinaryType
メッセージ StructType
repeated ArrayType
map MapType
OneOf Struct

また、Timestamp および Duration の Protobuf 型の読み込みもサポートしています。

Protobuf 論理型Protobuf スキーマSpark SQL 型
duration MessageType{seconds: Long, nanos: Int} DayTimeIntervalType
timestamp MessageType{seconds: Long, nanos: Int} TimestampType

Spark SQL -> Protobuf 変換でサポートされる型

Spark は、すべての Spark SQL 型を Protobuf に書き込むことをサポートしています。ほとんどの型では、Spark 型から Protobuf 型へのマッピングは単純です (例: IntegerType は int に変換されます)。

Spark SQL 型Protobuf 型
BooleanType boolean
IntegerType int
LongType long
FloatType float
DoubleType double
StringType string
StringType enum
BinaryType bytes
StructType message
ArrayType repeated
MapType map

Protobuf フィールドの循環参照の処理

Protobuf データで作業する際に発生する可能性のある一般的な問題の 1 つは、循環参照の存在です。Protobuf では、フィールドがそれ自身を参照するか、元のフィールドを参照する別のフィールドを参照する場合に循環参照が発生します。これは、無限ループやその他の予期しない動作を引き起こす可能性があるため、データの解析時に問題を引き起こす可能性があります。この問題に対処するために、spark-protobuf の最新バージョンでは新しい機能が導入されています。フィールド型を介して循環参照をチェックする機能です。これにより、ユーザーは recursive.fields.max.depth オプションを使用して、スキーマの解析時に許可する再帰の最大レベル数を指定できます。デフォルトでは、spark-protobufrecursive.fields.max.depth を -1 に設定して再帰フィールドを許可しません。ただし、必要に応じてこのオプションを 1 から 10 に設定できます。

recursive.fields.max.depth を 1 に設定すると、すべての再帰フィールドがドロップされます。2 に設定すると 1 回再帰でき、3 に設定すると 2 回再帰できます。recursive.fields.max.depth の値が 10 より大きい場合、パフォーマンスの問題やスタックオーバーフローが発生する可能性があるため、許可されません。

以下の Protobuf メッセージの SQL スキーマは、recursive.fields.max.depth の値によって異なります。

この div は markdown エディター/ビューアを正常に動作させるためだけに使用され、Web には表示されません ```protobuf
syntax = "proto3"
message Person {
  string name = 1;
  Person bff = 2
}

// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.

1: struct<name: string>
2: struct<name: string, bff: struct<name: string>>
3: struct<name: string, bff: struct<name: string, bff: struct<name: string>>>
...
```

データソースオプション

Protobuf のデータソースオプションは、

プロパティ名デフォルト意味スコープ
mode FAILFAST 解析中の破損したレコードを処理するためのモードを許可します。
  • PERMISSIVE: 破損したレコードに遭遇した場合、すべてのフィールドを null に設定します。
  • DROPMALFORMED: 破損したレコード全体を無視します。このモードは Protobuf の組み込み関数ではサポートされていません。
  • FAILFAST: 破損したレコードに遭遇した場合、例外をスローします。
読み込み
recursive.fields.max.depth -1 スキーマの解析時に許可する再帰レベルの最大数を指定します。詳細については、Protobuf フィールドの循環参照の処理セクションを参照してください。 読み込み
convert.any.fields.to.json false Protobuf Any フィールドを JSON に変換できるようにします。このオプションは注意して有効にする必要があります。JSON の変換と処理は非効率的です。さらに、スキーマの安全性も低下するため、ダウンストリーム処理でエラーが発生しやすくなります。 読み込み
emit.default.values false Protobuf を Spark struct にデシリアライズする際に、ゼロ値を持つフィールドをレンダリングするかどうか。シリアライズされた Protobuf でフィールドが空の場合、このライブラリはデフォルトでそれらを null としてデシリアライズします。このオプションは、型固有のゼロ値をレンダリングするかどうかを制御できます。 読み込み
enums.as.ints false enum フィールドを整数値としてレンダリングするかどうか。このオプションが false に設定されている場合、enum フィールドは StringType にマップされ、値は enum の名前になります。true に設定されている場合、enum フィールドは IntegerType にマップされ、値はその整数値になります。 読み込み
upcast.unsigned.ints false 符号なし整数をより大きな型にアップキャストするかどうか。このオプションを true に設定すると、uint32 には LongType が使用され、uint64 には Decimal(20, 0) が使用されるため、オーバーフローなしで大きな符号なし値を表現できます。 読み込み
unwrap.primitive.wrapper.types false デシリアライズ時に、よく知られたプリミティブラッパー型の struct 表現をアンラップするかどうか。デフォルトでは、プリミティブのラッパー型 (例: google.protobuf.Int32Value, google.protobuf.Int64Value など) は struct としてデシリアライズされます。 読み込み
retain.empty.message.types false スキーマで空の proto メッセージ型のフィールドを保持するかどうか。Spark は空の StructType の書き込みを許可しないため、空の proto メッセージ型はデフォルトでドロップされます。このオプションを true に設定すると、空の proto メッセージにダミー列 (__dummy_field_in_empty_struct) が挿入され、空のメッセージフィールドが保持されるようになります。 読み込み