Protobuf データソース ガイド
- デプロイ
- to_protobuf() と from_protobuf()
- Protobuf -> Spark SQL 変換でサポートされる型
- Spark SQL -> Protobuf 変換でサポートされる型
- Protobuf フィールドの循環参照の処理
- データソースオプション
Spark 3.4.0 リリース以降、Spark SQL は Protobuf データの読み書きをネイティブでサポートしています。
デプロイ
spark-protobuf モジュールは外部であり、デフォルトでは spark-submit または spark-shell には含まれていません。
他の Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。spark-protobuf_2.13 およびその依存関係は、--packages を使用して spark-submit に直接追加できます。例:
./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...
spark-shell で実験する場合も、--packages を使用して org.apache.spark:spark-protobuf_2.13 およびその依存関係を直接追加できます。
./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.13:4.0.0 ...
外部依存関係を持つアプリケーションの送信に関する詳細については、アプリケーション送信ガイドを参照してください。
to_protobuf() と from_protobuf()
spark-protobuf パッケージは、列を Protobuf 形式のバイナリとしてエンコードするための to_protobuf 関数と、Protobuf バイナリデータを列にデコードするための from_protobuf() 関数を提供します。どちらの関数も 1 つの列を別の列に変換し、入力/出力の SQL データ型は複雑な型またはプリミティブ型にすることができます。
Protobuf メッセージを列として使用することは、Kafka のようなストリーミングソースから読み書きする場合に便利です。各 Kafka キー/値レコードには、Kafka への取り込みタイムスタンプ、Kafka のオフセットなどのメタデータが付加されます。
- データが含まれている「value」フィールドが Protobuf の場合、
from_protobuf()を使用してデータを抽出し、エンリッチし、クリーニングしてから、Kafka に再度プッシュするか、別のシンクに書き出すことができます。 to_protobuf()は、構造体を Protobuf メッセージに変換するために使用できます。このメソッドは、Kafka にデータを書き出す際に複数の列を 1 つの列に再エンコードしたい場合に特に便利です。
Spark SQL スキーマは、from_protobuf および to_protobuf に渡された Protobuf 記述子ファイルまたは Protobuf クラスに基づいて生成されます。指定された Protobuf クラスまたは Protobuf 記述子ファイルはデータと一致している必要があります。一致しない場合、動作は未定義です。失敗したり、任意の結果が返されたりする可能性があります。
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
# string name = 1;
# int64 id = 2;
# string context = 3;
# }
df = spark
.readStream
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
.select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
.where('event.name == "alice"')
.select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))
# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
.select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
.where('event.name == "alice"')
output.printSchema()
# root
# |--event: struct (nullable = true)
# | |-- name : string (nullable = true)
# | |-- id: long (nullable = true)
# | |-- context: string (nullable = true)
output = output
.select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")
.start()import org.apache.spark.sql.protobuf.functions._
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
.select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
.where("event.name == \"alice\"")
.select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
.select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
.select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
.where("event.name == \"alice\"")
.select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
.select(
from_protobuf(col("value"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(
to_protobuf(col("event"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();Protobuf -> Spark SQL 変換でサポートされる型
現在、Spark は Protobuf メッセージ内の Protobuf スカラー型、enum 型、ネスト型、および マップ型 の読み込みをサポートしています。これらの型に加えて、spark-protobuf は Protobuf の OneOf フィールドのサポートも導入しています。これにより、複数のフィールドセットの可能性があるが、一度に 1 つのセットのみが存在できるメッセージを処理できます。これは、作業しているデータが常に同じ形式ではない状況で、エラーなしで異なるフィールドセットを持つメッセージを処理する必要がある場合に役立ちます。
| Protobuf 型 | Spark SQL 型 |
|---|---|
| boolean | BooleanType |
| int | IntegerType |
| long | LongType |
| float | FloatType |
| double | DoubleType |
| string | StringType |
| enum | StringType |
| bytes | BinaryType |
| メッセージ | StructType |
| repeated | ArrayType |
| map | MapType |
| OneOf | Struct |
また、Timestamp および Duration の Protobuf 型の読み込みもサポートしています。
| Protobuf 論理型 | Protobuf スキーマ | Spark SQL 型 |
|---|---|---|
| duration | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType |
| timestamp | MessageType{seconds: Long, nanos: Int} | TimestampType |
Spark SQL -> Protobuf 変換でサポートされる型
Spark は、すべての Spark SQL 型を Protobuf に書き込むことをサポートしています。ほとんどの型では、Spark 型から Protobuf 型へのマッピングは単純です (例: IntegerType は int に変換されます)。
| Spark SQL 型 | Protobuf 型 |
|---|---|
| BooleanType | boolean |
| IntegerType | int |
| LongType | long |
| FloatType | float |
| DoubleType | double |
| StringType | string |
| StringType | enum |
| BinaryType | bytes |
| StructType | message |
| ArrayType | repeated |
| MapType | map |
Protobuf フィールドの循環参照の処理
Protobuf データで作業する際に発生する可能性のある一般的な問題の 1 つは、循環参照の存在です。Protobuf では、フィールドがそれ自身を参照するか、元のフィールドを参照する別のフィールドを参照する場合に循環参照が発生します。これは、無限ループやその他の予期しない動作を引き起こす可能性があるため、データの解析時に問題を引き起こす可能性があります。この問題に対処するために、spark-protobuf の最新バージョンでは新しい機能が導入されています。フィールド型を介して循環参照をチェックする機能です。これにより、ユーザーは recursive.fields.max.depth オプションを使用して、スキーマの解析時に許可する再帰の最大レベル数を指定できます。デフォルトでは、spark-protobuf は recursive.fields.max.depth を -1 に設定して再帰フィールドを許可しません。ただし、必要に応じてこのオプションを 1 から 10 に設定できます。
recursive.fields.max.depth を 1 に設定すると、すべての再帰フィールドがドロップされます。2 に設定すると 1 回再帰でき、3 に設定すると 2 回再帰できます。recursive.fields.max.depth の値が 10 より大きい場合、パフォーマンスの問題やスタックオーバーフローが発生する可能性があるため、許可されません。
以下の Protobuf メッセージの SQL スキーマは、recursive.fields.max.depth の値によって異なります。
syntax = "proto3"
message Person {
string name = 1;
Person bff = 2
}
// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.
1: struct<name: string>
2: struct<name: string, bff: struct<name: string>>
3: struct<name: string, bff: struct<name: string, bff: struct<name: string>>>
...データソースオプション
Protobuf のデータソースオプションは、
- 以下の組み込み関数を介して設定できます。
from_protobufto_protobuf
| プロパティ名 | デフォルト | 意味 | スコープ |
|---|---|---|---|
mode |
FAILFAST |
解析中の破損したレコードを処理するためのモードを許可します。
|
読み込み |
recursive.fields.max.depth |
-1 |
スキーマの解析時に許可する再帰レベルの最大数を指定します。詳細については、Protobuf フィールドの循環参照の処理セクションを参照してください。 | 読み込み |
convert.any.fields.to.json |
false |
Protobuf Any フィールドを JSON に変換できるようにします。このオプションは注意して有効にする必要があります。JSON の変換と処理は非効率的です。さらに、スキーマの安全性も低下するため、ダウンストリーム処理でエラーが発生しやすくなります。 |
読み込み |
emit.default.values |
false |
Protobuf を Spark struct にデシリアライズする際に、ゼロ値を持つフィールドをレンダリングするかどうか。シリアライズされた Protobuf でフィールドが空の場合、このライブラリはデフォルトでそれらを null としてデシリアライズします。このオプションは、型固有のゼロ値をレンダリングするかどうかを制御できます。 |
読み込み |
enums.as.ints |
false |
enum フィールドを整数値としてレンダリングするかどうか。このオプションが false に設定されている場合、enum フィールドは StringType にマップされ、値は enum の名前になります。true に設定されている場合、enum フィールドは IntegerType にマップされ、値はその整数値になります。 |
読み込み |
upcast.unsigned.ints |
false |
符号なし整数をより大きな型にアップキャストするかどうか。このオプションを true に設定すると、uint32 には LongType が使用され、uint64 には Decimal(20, 0) が使用されるため、オーバーフローなしで大きな符号なし値を表現できます。 |
読み込み |
unwrap.primitive.wrapper.types |
false |
デシリアライズ時に、よく知られたプリミティブラッパー型の struct 表現をアンラップするかどうか。デフォルトでは、プリミティブのラッパー型 (例: google.protobuf.Int32Value, google.protobuf.Int64Value など) は struct としてデシリアライズされます。 | 読み込み |
retain.empty.message.types |
false |
スキーマで空の proto メッセージ型のフィールドを保持するかどうか。Spark は空の StructType の書き込みを許可しないため、空の proto メッセージ型はデフォルトでドロップされます。このオプションを true に設定すると、空の proto メッセージにダミー列 (__dummy_field_in_empty_struct) が挿入され、空のメッセージフィールドが保持されるようになります。 |
読み込み |