Protobuf データソースガイド
- デプロイ
- to_protobuf() と from_protobuf()
- Protobuf から Spark SQL への変換でサポートされる型
- Spark SQL から Protobuf への変換でサポートされる型
- 循環参照 Protobuf フィールドの処理
Spark 3.4.0 リリース以降、Spark SQL は Protobuf データの読み書きを組み込みでサポートしています。
デプロイ
spark-protobuf
モジュールは外部モジュールであり、spark-submit
または spark-shell
にはデフォルトで含まれていません。
他の Spark アプリケーションと同様に、spark-submit
を使用してアプリケーションを起動します。spark-protobuf_2.12
とその依存関係は、--packages
を使用して spark-submit
に直接追加できます。
./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...
spark-shell
で実験する場合は、--packages
を使用して org.apache.spark:spark-protobuf_2.12
とその依存関係を直接追加することもできます。
./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.5.1 ...
外部依存関係を持つアプリケーションの提出に関する詳細は、アプリケーション提出ガイド を参照してください。
to_protobuf() と from_protobuf()
spark-protobuf パッケージは、列を Protobuf 形式のバイナリとしてエンコードする関数 to_protobuf
と、Protobuf バイナリデータを列にデコードする関数 from_protobuf()
を提供します。どちらの関数も、1 つの列を別の列に変換し、入力/出力 SQL データ型は複合型またはプリミティブ型にすることができます。
Kafka のようなストリーミングソースとの読み書きを行う場合、列として Protobuf メッセージを使用すると便利です。各 Kafka キーバリューレコードには、Kafka への取り込みタイムスタンプ、Kafka 内のオフセットなど、いくつかのメタデータが追加されます。
- データを含む「value」フィールドが Protobuf である場合、
from_protobuf()
を使用してデータを抽出し、エンリッチ、クリーンアップしてから、Kafka に再びプッシュするか、別のシンクに書き出すことができます。 to_protobuf()
は、struct を Protobuf メッセージに変換するために使用できます。このメソッドは、データを Kafka に書き出す際に複数の列を 1 つの列に再エンコードしたい場合に特に役立ちます。
Spark SQL スキーマは、from_protobuf
と to_protobuf
に渡された Protobuf 記述子ファイルまたは Protobuf クラスに基づいて生成されます。指定された Protobuf クラスまたは Protobuf 記述子ファイルはデータと一致する必要があります。そうでない場合、動作は未定義です。失敗するか、任意の結果を返す可能性があります。
from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf
# from_protobuf and to_protobuf provide two schema choices. Via Protobuf descriptor file,
# or via shaded Java class.
# give input .proto protobuf schema
# syntax = "proto3"
# message AppEvent {
# string name = 1;
# int64 id = 2;
# string context = 3;
# }
df = spark
.readStream
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
# 1. Decode the Protobuf data of schema `AppEvent` into a struct;
# 2. Filter by column `name`;
# 3. Encode the column `event` in Protobuf format.
# The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
output = df
.select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event"))
.where('event.name == "alice"')
.select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))
# Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
# class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
# it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
# 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
# https://github.com/rangadi/shaded-protobuf-classes.
output = df
.select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
.where('event.name == "alice"')
output.printSchema()
# root
# |--event: struct (nullable = true)
# | |-- name : string (nullable = true)
# | |-- id: long (nullable = true)
# | |-- context: string (nullable = true)
output = output
.select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))
query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")
.start()
import org.apache.spark.sql.protobuf.functions._
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
val output = df
.select(from_protobuf($"value", "AppEvent", descriptorFilePath) as $"event")
.where("event.name == \"alice\"")
.select(to_protobuf($"user", "AppEvent", descriptorFilePath) as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
var output = df
.select(from_protobuf($"value", "org.example.protos..AppEvent") as $"event")
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as $"event")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.protobuf.functions.*;
// `from_protobuf` and `to_protobuf` provides two schema choices. Via the protobuf descriptor file,
// or via shaded Java class.
// give input .proto protobuf schema
// syntax = "proto3"
// message AppEvent {
// string name = 1;
// int64 id = 2;
// string context = 3;
// }
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Protobuf data of schema `AppEvent` into a struct;
// 2. Filter by column `name`;
// 3. Encode the column `event` in Protobuf format.
// The Protobuf protoc command can be used to generate a protobuf descriptor file for give .proto file.
Dataset<Row> output = df
.select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
.where("event.name == \"alice\"")
.select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));
// Alternatively, you can decode and encode the SQL columns into protobuf format using protobuf
// class name. The specified Protobuf class must match the data, otherwise the behavior is undefined:
// it may fail or return arbitrary result. To avoid conflicts, the jar file containing the
// 'com.google.protobuf.*' classes should be shaded. An example of shading can be found at
// https://github.com/rangadi/shaded-protobuf-classes.
Dataset<Row> output = df
.select(
from_protobuf(col("value"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
.where("event.name == \"alice\"")
output.printSchema()
// root
// |--event: struct (nullable = true)
// | |-- name : string (nullable = true)
// | |-- id: long (nullable = true)
// | |-- context: string (nullable = true)
output = output.select(
to_protobuf(col("event"),
"org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
Protobuf から Spark SQL への変換でサポートされる型
現在、Spark は Protobuf のメッセージの下にあるProtobuf スカラー型、列挙型、ネスト型、およびマップ型の読み取りをサポートしています。これらの型に加えて、spark-protobuf
は Protobuf OneOf
フィールドのサポートも導入しています。これにより、複数の可能なフィールドセットを持つことができるメッセージを処理できますが、一度に存在できるのは 1 つのセットのみです。これは、扱っているデータが常に同じ形式ではない場合、エラーが発生することなく異なるフィールドセットを持つメッセージを処理する必要がある場合に役立ちます。
Protobuf 型 | Spark SQL 型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
bytes | BinaryType |
Message | StructType |
repeated | ArrayType |
map | MapType |
OneOf | Struct |
また、以下の Protobuf 型Timestamp と Duration の読み取りもサポートしています。
Protobuf 論理型 | Protobuf スキーマ | Spark SQL 型 |
---|---|---|
duration | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType |
timestamp | MessageType{seconds: Long, nanos: Int} | TimestampType |
Spark SQL から Protobuf への変換でサポートされる型
Spark は、すべての Spark SQL 型を Protobuf に書き込むことをサポートしています。ほとんどの型の場合、Spark 型から Protobuf 型へのマッピングは簡単です(例:IntegerType は int に変換されます)。
Spark SQL 型 | Protobuf 型 |
---|---|
BooleanType | boolean |
IntegerType | int |
LongType | long |
FloatType | float |
DoubleType | double |
StringType | string |
StringType | enum |
BinaryType | bytes |
StructType | message |
ArrayType | repeated |
MapType | map |
循環参照 Protobuf フィールドの処理
Protobuf データを扱う際に発生する可能性のある一般的な問題の 1 つは、循環参照の存在です。Protobuf では、循環参照は、フィールドが自分自身または元のフィールドを参照する別のフィールドを参照する場合に発生します。これは、無限ループやその他の予期しない動作を引き起こす可能性があるため、データの解析時に問題を引き起こす可能性があります。この問題に対処するために、最新バージョンの spark-protobuf は、フィールド型を通じて循環参照をチェックできる新しい機能を導入しています。これにより、ユーザーはrecursive.fields.max.depth
オプションを使用して、スキーマを解析する際に許可する再帰の最大レベル数を指定できます。recursive.fields.max.depth
を -1 に設定することにより、デフォルトではspark-protobuf
は再帰的なフィールドを許可しません。ただし、必要に応じてこのオプションを 0 から 10 に設定できます。
recursive.fields.max.depth
を 0 に設定すると、すべての再帰的なフィールドが削除され、1 に設定すると 1 回再帰することが許可され、2 に設定すると 2 回再帰することが許可されます。パフォーマンスの問題やスタックオーバーフローにつながる可能性があるため、recursive.fields.max.depth
の値を 10 より大きくすることはできません。
以下の Protobuf メッセージの SQL スキーマは、recursive.fields.max.depth
の値によって異なります。
syntax = "proto3"
message Person {
string name = 1;
Person bff = 2
}
// The protobuf schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursive.fields.max.depth` value.
0: struct<name: string, bff: null>
1: struct<name string, bff: <name: string, bff: null>>
2: struct<name string, bff: <name: string, bff: struct<name: string, bff: null>>> ...