Apache Avro データソースガイド
- デプロイ
- ロードおよびセーブ関数
to_avro()
とfrom_avro()
- データソースオプション
- 設定
- Databricks spark-avro との互換性
- Avro → Spark SQL 変換でサポートされる型
- Spark SQL → Avro 変換でサポートされる型
Spark 2.4 リリース以降、Spark SQL は Apache Avro データの読み書きを組み込みでサポートしています。
デプロイ
spark-avro
モジュールは外部モジュールであり、デフォルトでは spark-submit
や spark-shell
には含まれていません。
他の Spark アプリケーションと同様に、spark-submit
を使用してアプリケーションを起動します。spark-avro_2.12
とその依存関係は、--packages
を使用して spark-submit
に直接追加できます。例:
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.1 ...
spark-shell
で実験するには、--packages
を使用して org.apache.spark:spark-avro_2.12
とその依存関係を直接追加することもできます。
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.1 ...
外部依存関係を持つアプリケーションの送信の詳細については、アプリケーション送信ガイド を参照してください。
ロードおよびセーブ関数
spark-avro
モジュールは外部モジュールであるため、DataFrameReader
や DataFrameWriter
に .avro
API はありません。
Avro 形式でデータを読み書きするには、データソースオプション format
を avro
(または org.apache.spark.sql.avro
)として指定する必要があります。
df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
to_avro()
と from_avro()
Avro パッケージは、列を Avro 形式のバイナリとしてエンコードする関数 to_avro
と、Avro バイナリデータを列にデコードする関数 from_avro()
を提供します。両方の関数は 1 つの列を別の列に変換し、入力/出力 SQL データ型は複合型またはプリミティブ型にすることができます。
列として Avro レコードを使用すると、Kafka のようなストリーミングソースの読み書きに役立ちます。各 Kafka キーバリューレコードには、Kafka への取り込みタイムスタンプ、Kafka 内のオフセットなど、いくつかのメタデータが追加されます。
- データを含む「value」フィールドが Avro の場合、
from_avro()
を使用してデータを取り出し、エンリッチ、クレンジングして、Kafka に再びプッシュしたり、ファイルに書き出したりすることができます。 to_avro()
は、構造体を Avro レコードに変換するために使用できます。このメソッドは、Kafka にデータを書き出す際に複数の列を 1 つの列に再エンコードする場合に特に便利です。
from pyspark.sql.avro.functions import from_avro, to_avro
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
df = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("subscribe", "topic1")\
.load()
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
query = output\
.writeStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
.option("topic", "topic2")\
.start()
import org.apache.spark.sql.avro.functions._
// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"user")
.where("user.favorite_color == \"red\"")
.select(to_avro($"user.name") as $"value")
val query = output
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;
// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
.select(from_avro(col("value"), jsonFormatSchema).as("user"))
.where("user.favorite_color == \"red\"")
.select(to_avro(col("user.name")).as("value"));
StreamingQuery query = output
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic2")
.start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")
df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)
write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)
データソースオプション
Avro のデータソースオプションは、次のように設定できます。
DataFrameReader
またはDataFrameWriter
の.option
メソッドを使用します。- 関数
from_avro
のoptions
パラメータを使用します。
プロパティ名 | デフォルト | 意味 | スコープ | バージョン |
---|---|---|---|---|
avroSchema |
None | ユーザーが JSON 形式で提供するオプションのスキーマ。
|
読み取り、書き込み、関数 from_avro |
2.4.0 |
recordName |
topLevelRecord | 書き込み結果の最上位レコード名。Avro 仕様で必須です。 | 書き込み | 2.4.0 |
recordNamespace |
"" | 書き込み結果のレコードネームスペース。 | 書き込み | 2.4.0 |
ignoreExtension |
true | このオプションは、読み取り時に .avro 拡張子がないファイルの無視を制御します。このオプションが有効になっている場合、すべてのファイル( .avro 拡張子があるものとないもの)が読み込まれます。このオプションは非推奨となり、将来のリリースで削除される予定です。ファイル名のフィルタリングには、一般的なデータソースオプション pathGlobFilter を使用してください。 |
読み取り | 2.4.0 |
compression |
snappy | compression オプションを使用すると、書き込みで使用される圧縮コーデックを指定できます。現在サポートされているコーデックは、 uncompressed 、snappy 、deflate 、bzip2 、xz 、zstandard です。このオプションが設定されていない場合、設定 spark.sql.avro.compression.codec が考慮されます。 |
書き込み | 2.4.0 |
mode |
FAILFAST | mode オプションを使用すると、関数 from_avro の解析モードを指定できます。現在サポートされているモードは、
|
関数 from_avro |
2.4.0 |
datetimeRebaseMode |
(spark.sql.avro.datetimeRebaseModeInRead 設定の値) |
datetimeRebaseMode オプションを使用すると、ユリウス暦からプロレプティックグレゴリオ暦への date 、timestamp-micros 、timestamp-millis 論理型の値のリベースモードを指定できます。現在サポートされているモードは、
|
読み取りと関数 from_avro |
3.2.0 |
positionalFieldMatching |
false | これは、`avroSchema` オプションと組み合わせて使用して、提供された Avro スキーマのフィールドと SQL スキーマのフィールドの照合動作を調整できます。デフォルトでは、位置を無視してフィールド名を使用して照合が行われます。このオプションを「true」に設定すると、フィールドの位置に基づいて照合が行われます。 | 読み取りと書き込み | 3.2.0 |
enableStableIdentifiersForUnionType |
false | true に設定されている場合、Avro スキーマは Spark SQL スキーマに逆シリアル化され、Avro Union 型は、フィールド名がそれぞれの型と一致する構造に変換されます。結果のフィールド名は小文字に変換されます(例:member_int または member_string)。2 つのユーザー定義型の名前、またはユーザー定義型の名前と組み込み型の名前が、大文字と小文字を区別せずに同一である場合、例外が発生します。ただし、その他のケースでは、フィールド名を一意に識別できます。 | 読み取り | 3.5.0 |
設定
Avro の設定は、SparkSession の setConf
メソッドを使用するか、SQL を使用して SET key=value
コマンドを実行することで行うことができます。
プロパティ名 | デフォルト | 意味 | バージョン |
---|---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | true に設定されている場合、データソースプロバイダー com.databricks.spark.avro は、後方互換性のために組み込みだが外部の Avro データソースモジュールにマッピングされます。注:SQL 設定は Spark 3.2 で非推奨となり、将来削除される可能性があります。 |
2.4.0 |
spark.sql.avro.compression.codec | snappy | AVRO ファイルの書き込みに使用される圧縮コーデック。サポートされているコーデック:uncompressed、deflate、snappy、bzip2、xz、zstandard。デフォルトコーデックは snappy です。 | 2.4.0 |
spark.sql.avro.deflate.level | -1 | AVRO ファイルの書き込みに使用される deflate コーデックの圧縮レベル。有効な値は 1 から 9 の範囲内、または -1 でなければなりません。デフォルト値は -1 であり、現在の実装ではレベル 6 に対応します。 | 2.4.0 |
spark.sql.avro.datetimeRebaseModeInRead | EXCEPTION |
ユリウス暦からプロレプティックグレゴリオ暦への date 、timestamp-micros 、timestamp-millis 論理型の値のリベースモード。
|
3.0.0 |
spark.sql.avro.datetimeRebaseModeInWrite | EXCEPTION |
プロレプティックグレゴリオ暦からユリウス暦へのdate 、timestamp-micros 、timestamp-millis 論理型の値のリベースモード
|
3.0.0 |
spark.sql.avro.filterPushdown.enabled | true | trueの場合、Avroデータソースへのフィルタプッシュダウンを有効にします。 | 3.1.0 |
Databricks spark-avro との互換性
このAvroデータソースモジュールは、元々はDatabricksのオープンソースリポジトリspark-avroからのものであり、互換性があります。
SQL設定spark.sql.legacy.replaceDatabricksSparkAvro.enabled
が有効になっていると、デフォルトでデータソースプロバイダcom.databricks.spark.avro
がこの組み込みAvroモジュールにマップされます。カタログメタストアでProvider
プロパティがcom.databricks.spark.avro
として作成されたSparkテーブルの場合、この組み込みAvroモジュールを使用する場合は、これらのテーブルを読み込むためにマッピングが不可欠です。
Databricksのspark-avroでは、ショートカット関数.avro()
のために、暗黙的クラスAvroDataFrameWriter
とAvroDataFrameReader
が作成されていました。この組み込みだが外部モジュールでは、両方の暗黙的クラスは削除されています。.format("avro")
をDataFrameWriter
またはDataFrameReader
で代わりに使用してください。これはクリーンで十分です。
独自のビルドのspark-avro
jarファイルを使用する場合は、設定spark.sql.legacy.replaceDatabricksSparkAvro.enabled
を無効にして、アプリケーションのデプロイ時にオプション--jars
を使用できます。詳細については、アプリケーション提出ガイドの高度な依存関係管理セクションを参照してください。
Avro → Spark SQL 変換でサポートされる型
現在、SparkはAvroのレコード下のすべてのプリミティブ型と複合型の読み込みをサポートしています。
Avro型 | Spark SQL型 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | 下記参照 |
上記の型に加えて、union
型の読み込みをサポートしています。以下の3つの型は基本的なunion
型と見なされます。
union(int, long)
はLongTypeにマップされます。union(float, double)
はDoubleTypeにマップされます。union(something, null)
(ここでsomethingはサポートされている任意のAvro型です)。これは、somethingと同じSpark SQL型にマップされ、nullableはtrueに設定されます。その他のすべてのunion型は複合型と見なされます。それらは、unionのメンバーに従って、フィールド名がmember0、member1などになるStructTypeにマップされます。これは、AvroとParquet間の変換時の動作と一貫しています。
また、以下のAvro論理型の読み込みもサポートしています。
Avro論理型 | Avro型 | Spark SQL型 |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
現時点では、Avroファイルにあるdocs、aliases、その他のプロパティは無視されます。
Spark SQL → Avro 変換でサポートされる型
Sparkは、すべてのSpark SQL型をAvroに書き込むことをサポートしています。ほとんどの型の場合、Spark型からAvro型へのマッピングは簡単です(例:IntegerTypeはintに変換されます)。ただし、以下に示すいくつかの特別なケースがあります。
Spark SQL型 | Avro型 | Avro論理型 |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DateType | int | date |
TimestampType | long | timestamp-micros |
DecimalType | fixed | decimal |
オプションavroSchema
を使用して、出力Avroスキーマ全体を指定することもできます。これにより、Spark SQL型を他のAvro型に変換できます。次の変換はデフォルトでは適用されず、ユーザーが指定したAvroスキーマが必要です。
Spark SQL型 | Avro型 | Avro論理型 |
---|---|---|
BinaryType | fixed | |
StringType | enum | |
TimestampType | long | timestamp-millis |
DecimalType | bytes | decimal |