Apache Avro データソースガイド
- デプロイ
- 読み込みと保存の機能
- to_avro() と from_avro()
- データソースオプション
- 設定
- Databricks spark-avro との互換性
- Avro -> Spark SQL 変換でサポートされる型
- Spark SQL -> Avro 変換でサポートされる型
- Avro フィールドの循環参照の処理
Spark 2.4 リリース以降、Spark SQL は Apache Avro データの読み書きをネイティブでサポートするようになりました。
デプロイ
spark-avro モジュールは外部であり、デフォルトでは spark-submit や spark-shell には含まれていません。
すべての Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。spark-avro_2.13 およびその依存関係は、--packages を使用して直接 spark-submit に追加できます。
./bin/spark-submit --packages org.apache.spark:spark-avro_2.13:4.0.0 ...
spark-shell で実験を行う場合も、--packages を使用して org.apache.spark:spark-avro_2.13 およびその依存関係を直接追加できます。
./bin/spark-shell --packages org.apache.spark:spark-avro_2.13:4.0.0 ...
外部依存関係を持つアプリケーションの送信に関する詳細については、アプリケーション送信ガイドを参照してください。
読み込みと保存の機能
spark-avro モジュールは外部であるため、DataFrameReader や DataFrameWriter には .avro() API はありません。
Avro フォーマットでデータを読み込む/保存するには、データソースオプション format を avro (または org.apache.spark.sql.avro) として指定する必要があります。
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")to_avro() と from_avro()
Avro パッケージは、列を Avro フォーマットのバイナリとしてエンコードするための関数 to_avro と、Avro バイナリデータを列にデコードするための関数 from_avro() を提供します。これらの両方の関数は、1 つの列を別の列に変換し、入力/出力の SQL データ型は複雑な型でもプリミティブな型でも構いません。
Avro レコードを列として使用することは、Kafka のようなストリーミングソースから読み書きする場合に役立ちます。各 Kafka のキーと値のレコードには、Kafka への取り込みタイムスタンプ、Kafka のオフセットなどのメタデータが追加されます。
- データが含まれている "value" フィールドが Avro の場合、
from_avro()を使用してデータを抽出し、エンリッチし、クリーニングしてから、さらに下流の Kafka に再度プッシュしたり、ファイルに書き出したりできます。 to_avro()は、構造体を Avro レコードに変換するために使用できます。このメソッドは、Kafka にデータを書き出す際に複数の列を 1 つの列に再エンコードしたい場合に特に役立ちます。
from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("subscribe", "topic1")\
.load()
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
query = output\
.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")\
.start()import org.apache.spark.sql.avro.functions._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"user")
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as $"value")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;
// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.select(to_avro(col("user.name")).as("value"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")
df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)
write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)CREATE TABLE t AS
SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s
FROM VALUES (1, NULL), (NULL, 'a') tab(member0, member1);
DECLARE avro_schema STRING;
SET VARIABLE avro_schema =
'{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }';
SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t;
SELECT FROM_AVRO(result, avro_schema, MAP()).u FROM (
SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t);
DROP TEMPORARY VARIABLE avro_schema;
DROP TABLE t;データソースオプション
Avro のデータソースオプションは、以下から設定できます。
DataFrameReaderまたはDataFrameWriterの.optionメソッド。from_avro関数のoptionsパラメータ。
| プロパティ名 | デフォルト | 意味 | スコープ | バージョン以降 |
|---|---|---|---|---|
avroSchema |
なし | JSON フォーマットでユーザーが提供するオプションのスキーマ。
|
読み込み、書き込み、および関数 from_avro |
2.4.0 |
recordName |
topLevelRecord | 書き込み結果のトップレベルレコード名。Avro 仕様で必須です。 | 書き込み | 2.4.0 |
recordNamespace |
"" | 書き込み結果のレコード名前空間。 | 書き込み | 2.4.0 |
ignoreExtension |
true | 読み込み時に、.avro 拡張子を持たないファイルを無視するかどうかを制御するオプション。このオプションが有効な場合、すべてのファイル ( .avro 拡張子の有無にかかわらず) が読み込まれます。このオプションは非推奨であり、将来のリリースで削除される予定です。ファイル名のフィルタリングには、汎用データソースオプション pathGlobFilter を使用してください。 |
読み込み | 2.4.0 |
compression |
snappy | compression オプションを使用すると、書き込み時に使用される圧縮コーデックを指定できます。現在サポートされているコーデックは、 uncompressed、snappy、deflate、bzip2、xz、zstandard です。オプションが設定されていない場合、 spark.sql.avro.compression.codec 設定が考慮されます。 |
書き込み | 2.4.0 |
mode |
FAILFAST | mode オプションを使用すると、関数 from_avro の解析モードを指定できます。現在サポートされているモードは次のとおりです。
|
関数 from_avro |
2.4.0 |
datetimeRebaseMode |
(spark.sql.avro.datetimeRebaseModeInRead 設定の値) |
datetimeRebaseMode オプションを使用すると、Julian から Proleptic Gregorian へのカレンダーへの date、timestamp-micros、timestamp-millis 論理型の値の再ベースモードを指定できます。現在サポートされているモードは次のとおりです。
|
読み込みと関数 from_avro |
3.2.0 |
positionalFieldMatching |
false | これは `avroSchema` オプションと組み合わせて使用でき、提供された Avro スキーマのフィールドと SQL スキーマのフィールドのマッチング動作を調整できます。デフォルトでは、マッチングはフィールド名を無視して名前で行われます。このオプションが "true" に設定されている場合、マッチングはフィールドの位置に基づいて行われます。 | 読み込みと書き込み | 3.2.0 |
enableStableIdentifiersForUnionType |
false | true に設定すると、Avro スキーマは Spark SQL スキーマにデシリアライズされ、Avro Union 型は、フィールド名がそれぞれの型と一致したままになる構造に変換されます。結果のフィールド名は小文字に変換されます (例: member_int または member_string)。ユーザー定義の型名またはユーザー定義の型名と組み込み型名が、大文字小文字を区別しない場合に同一である場合、例外が発生します。ただし、それ以外の場合は、フィールド名を一意に識別できます。 | 読み込み | 3.5.0 |
stableIdentifierPrefixForUnionType |
member_ | `enableStableIdentifiersForUnionType` が有効な場合、このオプションは Avro Union 型のフィールドのプレフィックスを設定できます。 | 読み込み | 4.0.0 |
recursiveFieldMaxDepth |
-1 | このオプションが負の値に指定されたり、0 に設定されたりすると、再帰フィールドは許可されなくなります。1 に設定するとすべての再帰フィールドがドロップされ、2 に設定すると再帰フィールドが 1 回再帰され、3 に設定すると 2 回再帰され、最大 15 までとなります。15 より大きい値は、意図せず非常に大きなスキーマが作成されるのを避けるために許可されません。Avro メッセージの深さがこの制限を超えている場合、返される Spark struct は再帰制限後に切り捨てられます。使用例は、セクション Avro フィールドの循環参照の処理 にあります。 | 読み込み | 4.0.0 |
設定
Avro の設定は、spark.conf.set を使用するか、SQL を使用して SET key=value コマンドを実行することで行うことができます。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
| spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | true に設定すると、データソースプロバイダ com.databricks.spark.avro は、後方互換性のために組み込みの外部 Avro データソースモジュールにマッピングされます。注意: この SQL 設定は Spark 3.2 で非推奨となり、将来削除される可能性があります。 |
2.4.0 |
| spark.sql.avro.compression.codec | snappy | AVRO ファイルの書き込みに使用される圧縮コーデック。サポートされるコーデックは、uncompressed、deflate、snappy、bzip2、xz、zstandard です。デフォルトのコーデックは snappy です。 | 2.4.0 |
| spark.sql.avro.deflate.level | -1 | AVRO ファイルの書き込みに使用される deflate コーデックの圧縮レベル。有効な値は 1 から 9 (両端を含む) または -1 の範囲です。デフォルト値は -1 で、現在の実装ではレベル 6 に対応します。 | 2.4.0 |
| spark.sql.avro.xz.level | 6 | AVRO ファイルの書き込みに使用される xz コーデックの圧縮レベル。有効な値は 1 から 9 (両端を含む) の範囲です。現在の実装ではデフォルト値は 6 です。 | 4.0.0 |
| spark.sql.avro.zstandard.level | 3 | AVRO ファイルの書き込みに使用される zstandard コーデックの圧縮レベル。現在の実装ではデフォルト値は 3 です。 | 4.0.0 |
| spark.sql.avro.zstandard.bufferPool.enabled | false | true の場合、AVRO ファイルの書き込み時に ZSTD JNI ライブラリのバッファプールを有効にします。 | 4.0.0 |
| spark.sql.avro.datetimeRebaseModeInRead | EXCEPTION |
Julian から Proleptic Gregorian へのカレンダーへの date、timestamp-micros、timestamp-millis 論理型の値の再ベースモード。
|
3.0.0 |
| spark.sql.avro.datetimeRebaseModeInWrite | EXCEPTION |
Proleptic Gregorian から Julian へのカレンダーへの date、timestamp-micros、timestamp-millis 論理型の値の再ベースモード。
|
3.0.0 |
| spark.sql.avro.filterPushdown.enabled | true | true の場合、Avro データソースへのフィルタプッシュダウンを有効にします。 | 3.1.0 |
Databricks spark-avro との互換性
この Avro データソースモジュールは、Databricks のオープンソースリポジトリ spark-avro に由来し、互換性があります。
デフォルトでは、SQL 設定 spark.sql.legacy.replaceDatabricksSparkAvro.enabled が有効になっている場合、データソースプロバイダ com.databricks.spark.avro は、この組み込みの Avro モジュールにマッピングされます。カタログメタストアで Provider プロパティが com.databricks.spark.avro である Spark テーブルの場合、この組み込み Avro モジュールを使用している場合にこれらのテーブルを読み込むには、このマッピングが不可欠です。
Databricks の spark-avro では、ショートカット関数 .avro() のために暗黙のクラス AvroDataFrameWriter と AvroDataFrameReader が作成されていました。この組み込みの外部モジュールでは、これらの暗黙のクラスは削除されました。代わりに、DataFrameWriter または DataFrameReader で .format("avro") を使用してください。これはクリーンで十分なはずです。
spark-avro jar ファイルの独自のビルドを使用したい場合は、設定 spark.sql.legacy.replaceDatabricksSparkAvro.enabled を無効にし、アプリケーションのデプロイ時に --jars オプションを使用するだけです。詳細については、アプリケーション送信ガイドの 高度な依存関係管理 セクションを参照してください。
Avro -> Spark SQL 変換でサポートされる型
現在、Spark は Avro のレコード内のすべての プリミティブ型 と 複雑な型 の読み込みをサポートしています。
| Avro 型 | Spark SQL 型 |
|---|---|
| boolean | BooleanType |
| int | IntegerType |
| long | LongType |
| float | FloatType |
| double | DoubleType |
| string | StringType |
| enum | StringType |
| fixed | BinaryType |
| bytes | BinaryType |
| record | StructType |
| array | ArrayType |
| map | MapType |
| union | 下記参照 |
上記の型に加えて、union 型の読み込みもサポートしています。次の 3 つの型は基本的な union 型と見なされます。
union(int, long)は LongType にマッピングされます。union(float, double)は DoubleType にマッピングされます。union(something, null)、ここで something はサポートされている任意の Avro 型です。これは、something の Spark SQL 型と同じ型にマッピングされ、nullable が true に設定されます。その他のすべての union 型は複雑な型と見なされます。これらは、union のメンバーに従って、フィールド名が member0、member1 などとなる StructType にマッピングされます。これは、Avro と Parquet の間で変換する場合の動作と一致します。
以下の Avro 論理型 の読み込みもサポートしています。
| Avro 論理型 | Avro 型 | Spark SQL 型 |
|---|---|---|
| date | int | DateType |
| timestamp-millis | long | TimestampType |
| timestamp-micros | long | TimestampType |
| decimal | fixed | DecimalType |
| decimal | bytes | DecimalType |
現時点では、Avro ファイルに含まれる docs、aliases、およびその他のプロパティは無視されます。
Spark SQL -> Avro 変換でサポートされる型
Spark は、すべての Spark SQL 型を Avro に書き込むことをサポートしています。ほとんどの型では、Spark 型から Avro 型へのマッピングは単純です (例: IntegerType は int に変換されます)。ただし、以下にリストされているいくつかの特別なケースがあります。
| Spark SQL 型 | Avro 型 | Avro 論理型 |
|---|---|---|
| ByteType | int | |
| ShortType | int | |
| BinaryType | bytes | |
| DateType | int | date |
| TimestampType | long | timestamp-micros |
| DecimalType | fixed | decimal |
オプション avroSchema を使用して、出力 Avro スキーマ全体を指定することもできます。これにより、Spark SQL 型を他の Avro 型に変換できます。次の変換はデフォルトでは適用されず、ユーザー指定の Avro スキーマが必要です。
| Spark SQL 型 | Avro 型 | Avro 論理型 |
|---|---|---|
| BinaryType | fixed | |
| StringType | enum | |
| TimestampType | long | timestamp-millis |
| DecimalType | bytes | decimal |
Avro フィールドの循環参照の処理
Avro では、フィールドの型が親レコードのいずれかで定義されている場合に循環参照が発生します。これは、無限ループやその他の予期しない動作を引き起こす可能性があるため、データの解析時に問題を引き起こす可能性があります。循環参照を持つスキーマで Avro データを読み込むには、ユーザーは recursiveFieldMaxDepth オプションを使用して、スキーマの解析時に許可される再帰の最大レベル数を指定できます。デフォルトでは、Spark Avro データソースは recursiveFieldMaxDepth を -1 に設定することで再帰フィールドを許可しません。ただし、必要に応じてこのオプションを 1 から 15 に設定できます。
recursiveFieldMaxDepth を 1 に設定するとすべての再帰フィールドがドロップされ、2 に設定すると 1 回再帰され、3 に設定すると 2 回再帰されます。15 より大きい recursiveFieldMaxDepth の値は、パフォーマンスの問題やスタックオーバーフローを引き起こす可能性があるため許可されません。
以下の Avro メッセージの SQL スキーマは、recursiveFieldMaxDepth の値によって異なります。
{
"type": "record",
"name": "Node",
"fields": [
{"name": "Id", "type": "int"},
{"name": "Next", "type": ["null", "Node"]}
]
}
// The Avro schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursiveFieldMaxDepth` value.
1: struct<Id: int>
2: struct<Id: int, Next: struct<Id: int>>
3: struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>