Apache Avro データソースガイド

Spark 2.4 リリース以降、Spark SQL は Apache Avro データの読み書きを組み込みでサポートしています。

デプロイ

spark-avro モジュールは外部モジュールであり、デフォルトでは spark-submitspark-shell には含まれていません。

他の Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。spark-avro_2.12 とその依存関係は、--packages を使用して spark-submit に直接追加できます。例:

./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:3.5.1 ...

spark-shell で実験するには、--packages を使用して org.apache.spark:spark-avro_2.12 とその依存関係を直接追加することもできます。

./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:3.5.1 ...

外部依存関係を持つアプリケーションの送信の詳細については、アプリケーション送信ガイド を参照してください。

ロードおよびセーブ関数

spark-avro モジュールは外部モジュールであるため、DataFrameReaderDataFrameWriter.avro API はありません。

Avro 形式でデータを読み書きするには、データソースオプション formatavro(または org.apache.spark.sql.avro)として指定する必要があります。

df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

to_avro()from_avro()

Avro パッケージは、列を Avro 形式のバイナリとしてエンコードする関数 to_avro と、Avro バイナリデータを列にデコードする関数 from_avro() を提供します。両方の関数は 1 つの列を別の列に変換し、入力/出力 SQL データ型は複合型またはプリミティブ型にすることができます。

列として Avro レコードを使用すると、Kafka のようなストリーミングソースの読み書きに役立ちます。各 Kafka キーバリューレコードには、Kafka への取り込みタイムスタンプ、Kafka 内のオフセットなど、いくつかのメタデータが追加されます。

from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as $"user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as $"value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.

output <- select(
  filter(
    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
    column("user.favorite_color") == "red"
  ),
  alias(to_avro("user.name"), "value")
)

write.stream(
  output,
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  topic = "topic2"
)

データソースオプション

Avro のデータソースオプションは、次のように設定できます。

プロパティ名デフォルト意味スコープバージョン
avroSchema None ユーザーが JSON 形式で提供するオプションのスキーマ。
  • Avro ファイルを読み取るとき、または関数 from_avro を呼び出すとき、このオプションは進化したスキーマに設定できます。これは実際の Avro スキーマとは異なる互換性のあるスキーマです。逆シリアル化スキーマは、進化したスキーマと一貫性があります。たとえば、デフォルト値を持つ追加の列を含む進化したスキーマを設定した場合、Spark の読み取り結果は新しい列も含まれます。このオプションを from_avro と共に使用する場合、関数に実際の Avro スキーマを渡す必要があります。
  • Avro を書き込む場合、期待される出力 Avro スキーマが Spark によって変換されたスキーマと一致しない場合、このオプションを設定できます。たとえば、ある列の期待されるスキーマは、デフォルトで変換されたスキーマの「string」型ではなく「enum」型です。
読み取り、書き込み、関数 from_avro 2.4.0
recordName topLevelRecord 書き込み結果の最上位レコード名。Avro 仕様で必須です。 書き込み 2.4.0
recordNamespace "" 書き込み結果のレコードネームスペース。 書き込み 2.4.0
ignoreExtension true このオプションは、読み取り時に .avro 拡張子がないファイルの無視を制御します。
このオプションが有効になっている場合、すべてのファイル(.avro 拡張子があるものとないもの)が読み込まれます。
このオプションは非推奨となり、将来のリリースで削除される予定です。ファイル名のフィルタリングには、一般的なデータソースオプション pathGlobFilter を使用してください。
読み取り 2.4.0
compression snappy compression オプションを使用すると、書き込みで使用される圧縮コーデックを指定できます。
現在サポートされているコーデックは、uncompressedsnappydeflatebzip2xzzstandard です。
このオプションが設定されていない場合、設定 spark.sql.avro.compression.codec が考慮されます。
書き込み 2.4.0
mode FAILFAST mode オプションを使用すると、関数 from_avro の解析モードを指定できます。
現在サポートされているモードは、
  • FAILFAST:破損したレコードの処理時に例外をスローします。
  • PERMISSIVE:破損したレコードは null 結果として処理されます。そのため、データスキーマは完全に null 許容になるように強制されます。これは、ユーザーが提供したスキーマとは異なる場合があります。
関数 from_avro 2.4.0
datetimeRebaseMode (spark.sql.avro.datetimeRebaseModeInRead 設定の値) datetimeRebaseMode オプションを使用すると、ユリウス暦からプロレプティックグレゴリオ暦への datetimestamp-microstimestamp-millis 論理型の値のリベースモードを指定できます。
現在サポートされているモードは、
  • EXCEPTION:2 つの暦の間で曖昧な古い日付/タイムスタンプの読み取りに失敗します。
  • CORRECTED:リベースせずに日付/タイムスタンプを読み込みます。
  • LEGACY:ユリウス暦からプロレプティックグレゴリオ暦への古い日付/タイムスタンプのリベースを実行します。
読み取りと関数 from_avro 3.2.0
positionalFieldMatching false これは、`avroSchema` オプションと組み合わせて使用して、提供された Avro スキーマのフィールドと SQL スキーマのフィールドの照合動作を調整できます。デフォルトでは、位置を無視してフィールド名を使用して照合が行われます。このオプションを「true」に設定すると、フィールドの位置に基づいて照合が行われます。 読み取りと書き込み 3.2.0
enableStableIdentifiersForUnionType false true に設定されている場合、Avro スキーマは Spark SQL スキーマに逆シリアル化され、Avro Union 型は、フィールド名がそれぞれの型と一致する構造に変換されます。結果のフィールド名は小文字に変換されます(例:member_int または member_string)。2 つのユーザー定義型の名前、またはユーザー定義型の名前と組み込み型の名前が、大文字と小文字を区別せずに同一である場合、例外が発生します。ただし、その他のケースでは、フィールド名を一意に識別できます。 読み取り 3.5.0

設定

Avro の設定は、SparkSession の setConf メソッドを使用するか、SQL を使用して SET key=value コマンドを実行することで行うことができます。

プロパティ名デフォルト意味バージョン
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true true に設定されている場合、データソースプロバイダー com.databricks.spark.avro は、後方互換性のために組み込みだが外部の Avro データソースモジュールにマッピングされます。
注:SQL 設定は Spark 3.2 で非推奨となり、将来削除される可能性があります。
2.4.0
spark.sql.avro.compression.codec snappy AVRO ファイルの書き込みに使用される圧縮コーデック。サポートされているコーデック:uncompressed、deflate、snappy、bzip2、xz、zstandard。デフォルトコーデックは snappy です。 2.4.0
spark.sql.avro.deflate.level -1 AVRO ファイルの書き込みに使用される deflate コーデックの圧縮レベル。有効な値は 1 から 9 の範囲内、または -1 でなければなりません。デフォルト値は -1 であり、現在の実装ではレベル 6 に対応します。 2.4.0
spark.sql.avro.datetimeRebaseModeInRead EXCEPTION ユリウス暦からプロレプティックグレゴリオ暦への datetimestamp-microstimestamp-millis 論理型の値のリベースモード。
  • EXCEPTION: Sparkは、2つの暦で曖昧になる古い日付/タイムスタンプを検出すると、読み込みを失敗します。
  • CORRECTED: Sparkはリベースを実行せず、日付/タイムスタンプをそのまま読み込みます。
  • LEGACY: Sparkは、Avroファイルを読み込む際に、レガシーハイブリッド(ユリウス暦 + グレゴリオ暦)からプロレプティックグレゴリオ暦へ日付/タイムスタンプをリベースします。
この設定は、Avroファイルのライター情報(Spark、Hiveなど)が不明な場合にのみ有効です。
3.0.0
spark.sql.avro.datetimeRebaseModeInWrite EXCEPTION プロレプティックグレゴリオ暦からユリウス暦へのdatetimestamp-microstimestamp-millis論理型の値のリベースモード
  • EXCEPTION: Sparkは、2つの暦で曖昧になる古い日付/タイムスタンプを検出すると、書き込みを失敗します。
  • CORRECTED: Sparkはリベースを実行せず、日付/タイムスタンプをそのまま書き込みます。
  • LEGACY: Sparkは、Avroファイルに書き込む際に、プロレプティックグレゴリオ暦からレガシーハイブリッド(ユリウス暦 + グレゴリオ暦)へ日付/タイムスタンプをリベースします。
3.0.0
spark.sql.avro.filterPushdown.enabled true trueの場合、Avroデータソースへのフィルタプッシュダウンを有効にします。 3.1.0

Databricks spark-avro との互換性

このAvroデータソースモジュールは、元々はDatabricksのオープンソースリポジトリspark-avroからのものであり、互換性があります。

SQL設定spark.sql.legacy.replaceDatabricksSparkAvro.enabledが有効になっていると、デフォルトでデータソースプロバイダcom.databricks.spark.avroがこの組み込みAvroモジュールにマップされます。カタログメタストアでProviderプロパティがcom.databricks.spark.avroとして作成されたSparkテーブルの場合、この組み込みAvroモジュールを使用する場合は、これらのテーブルを読み込むためにマッピングが不可欠です。

Databricksのspark-avroでは、ショートカット関数.avro()のために、暗黙的クラスAvroDataFrameWriterAvroDataFrameReaderが作成されていました。この組み込みだが外部モジュールでは、両方の暗黙的クラスは削除されています。.format("avro")DataFrameWriterまたはDataFrameReaderで代わりに使用してください。これはクリーンで十分です。

独自のビルドのspark-avrojarファイルを使用する場合は、設定spark.sql.legacy.replaceDatabricksSparkAvro.enabledを無効にして、アプリケーションのデプロイ時にオプション--jarsを使用できます。詳細については、アプリケーション提出ガイドの高度な依存関係管理セクションを参照してください。

Avro → Spark SQL 変換でサポートされる型

現在、SparkはAvroのレコード下のすべてのプリミティブ型複合型の読み込みをサポートしています。

Avro型Spark SQL型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union 下記参照

上記の型に加えて、union型の読み込みをサポートしています。以下の3つの型は基本的なunion型と見なされます。

  1. union(int, long)はLongTypeにマップされます。
  2. union(float, double)はDoubleTypeにマップされます。
  3. union(something, null)(ここでsomethingはサポートされている任意のAvro型です)。これは、somethingと同じSpark SQL型にマップされ、nullableはtrueに設定されます。その他のすべてのunion型は複合型と見なされます。それらは、unionのメンバーに従って、フィールド名がmember0、member1などになるStructTypeにマップされます。これは、AvroとParquet間の変換時の動作と一貫しています。

また、以下のAvro論理型の読み込みもサポートしています。

Avro論理型Avro型Spark SQL型
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

現時点では、Avroファイルにあるdocs、aliases、その他のプロパティは無視されます。

Spark SQL → Avro 変換でサポートされる型

Sparkは、すべてのSpark SQL型をAvroに書き込むことをサポートしています。ほとんどの型の場合、Spark型からAvro型へのマッピングは簡単です(例:IntegerTypeはintに変換されます)。ただし、以下に示すいくつかの特別なケースがあります。

Spark SQL型Avro型Avro論理型
ByteType int
ShortType int
BinaryType bytes
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal

オプションavroSchemaを使用して、出力Avroスキーマ全体を指定することもできます。これにより、Spark SQL型を他のAvro型に変換できます。次の変換はデフォルトでは適用されず、ユーザーが指定したAvroスキーマが必要です。

Spark SQL型Avro型Avro論理型
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal