Apache Avro データソースガイド

Spark 2.4 リリース以降、Spark SQL は Apache Avro データの読み書きをネイティブでサポートするようになりました。

デプロイ

spark-avro モジュールは外部であり、デフォルトでは spark-submitspark-shell には含まれていません。

すべての Spark アプリケーションと同様に、spark-submit を使用してアプリケーションを起動します。spark-avro_2.13 およびその依存関係は、--packages を使用して直接 spark-submit に追加できます。

./bin/spark-submit --packages org.apache.spark:spark-avro_2.13:4.0.0 ...

spark-shell で実験を行う場合も、--packages を使用して org.apache.spark:spark-avro_2.13 およびその依存関係を直接追加できます。

./bin/spark-shell --packages org.apache.spark:spark-avro_2.13:4.0.0 ...

外部依存関係を持つアプリケーションの送信に関する詳細については、アプリケーション送信ガイドを参照してください。

読み込みと保存の機能

spark-avro モジュールは外部であるため、DataFrameReaderDataFrameWriter には .avro() API はありません。

Avro フォーマットでデータを読み込む/保存するには、データソースオプション formatavro (または org.apache.spark.sql.avro) として指定する必要があります。

df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

to_avro() と from_avro()

Avro パッケージは、列を Avro フォーマットのバイナリとしてエンコードするための関数 to_avro と、Avro バイナリデータを列にデコードするための関数 from_avro() を提供します。これらの両方の関数は、1 つの列を別の列に変換し、入力/出力の SQL データ型は複雑な型でもプリミティブな型でも構いません。

Avro レコードを列として使用することは、Kafka のようなストリーミングソースから読み書きする場合に役立ちます。各 Kafka のキーと値のレコードには、Kafka への取り込みタイムスタンプ、Kafka のオフセットなどのメタデータが追加されます。

from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()
import org.apache.spark.sql.avro.functions._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as $"user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as $"value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")

df <- read.stream(
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  subscribe = "topic1"
)

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.

output <- select(
  filter(
    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
    column("user.favorite_color") == "red"
  ),
  alias(to_avro("user.name"), "value")
)

write.stream(
  output,
  "kafka",
  kafka.bootstrap.servers = "host1:port1,host2:port2",
  topic = "topic2"
)
CREATE TABLE t AS
  SELECT NAMED_STRUCT('u', NAMED_STRUCT('member0', member0, 'member1', member1)) AS s
  FROM VALUES (1, NULL), (NULL,  'a') tab(member0, member1);
DECLARE avro_schema STRING;
SET VARIABLE avro_schema =
  '{ "type": "record", "name": "struct", "fields": [{ "name": "u", "type": ["int","string"] }] }';

SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t;

SELECT FROM_AVRO(result, avro_schema, MAP()).u FROM (
  SELECT TO_AVRO(s, avro_schema) AS RESULT FROM t);

DROP TEMPORARY VARIABLE avro_schema;
DROP TABLE t;

データソースオプション

Avro のデータソースオプションは、以下から設定できます。

プロパティ名デフォルト意味スコープバージョン以降
avroSchema なし JSON フォーマットでユーザーが提供するオプションのスキーマ。
  • Avro ファイルを読み込む場合や、関数 from_avro を呼び出す場合、このオプションは、実際の Avro スキーマとは互換性はあるが異なる、進化したスキーマに設定できます。デシリアライゼーション スキーマは、進化したスキーマと一致します。たとえば、デフォルト値を持つ追加の列を含む進化したスキーマを設定した場合、Spark での読み込み結果にも新しい列が含まれます。このオプションを from_avro と一緒に使用する場合、実際の Avro スキーマを関数にパラメータとして渡す必要があることに注意してください。
  • Avro を書き込む場合、期待される出力 Avro スキーマが Spark によって変換されたスキーマと一致しない場合に、このオプションを設定できます。たとえば、ある列の期待されるスキーマが、デフォルトの変換済みスキーマの "string" 型ではなく "enum" 型である場合などです。
読み込み、書き込み、および関数 from_avro 2.4.0
recordName topLevelRecord 書き込み結果のトップレベルレコード名。Avro 仕様で必須です。 書き込み 2.4.0
recordNamespace "" 書き込み結果のレコード名前空間。 書き込み 2.4.0
ignoreExtension true 読み込み時に、.avro 拡張子を持たないファイルを無視するかどうかを制御するオプション。
このオプションが有効な場合、すべてのファイル (.avro 拡張子の有無にかかわらず) が読み込まれます。
このオプションは非推奨であり、将来のリリースで削除される予定です。ファイル名のフィルタリングには、汎用データソースオプション pathGlobFilter を使用してください。
読み込み 2.4.0
compression snappy compression オプションを使用すると、書き込み時に使用される圧縮コーデックを指定できます。
現在サポートされているコーデックは、uncompressedsnappydeflatebzip2xzzstandard です。
オプションが設定されていない場合、spark.sql.avro.compression.codec 設定が考慮されます。
書き込み 2.4.0
mode FAILFAST mode オプションを使用すると、関数 from_avro の解析モードを指定できます。
現在サポートされているモードは次のとおりです。
  • FAILFAST: 破損したレコードの処理時に例外をスローします。
  • PERMISSIVE: 破損したレコードは null 結果として処理されます。したがって、データスキーマは完全に null 許容に強制されます。これは、ユーザーが提供したスキーマとは異なる場合があります。
関数 from_avro 2.4.0
datetimeRebaseMode (spark.sql.avro.datetimeRebaseModeInRead 設定の値) datetimeRebaseMode オプションを使用すると、Julian から Proleptic Gregorian へのカレンダーへの datetimestamp-microstimestamp-millis 論理型の値の再ベースモードを指定できます。
現在サポートされているモードは次のとおりです。
  • EXCEPTION: 2 つのカレンダー間で曖昧な古い日付/タイムスタンプの読み込みで失敗します。
  • CORRECTED: 再ベースなしで日付/タイムスタンプを読み込みます。
  • LEGACY: 古い日付/タイムスタンプを Julian から Proleptic Gregorian へのカレンダーに再ベースします。
読み込みと関数 from_avro 3.2.0
positionalFieldMatching false これは `avroSchema` オプションと組み合わせて使用でき、提供された Avro スキーマのフィールドと SQL スキーマのフィールドのマッチング動作を調整できます。デフォルトでは、マッチングはフィールド名を無視して名前で行われます。このオプションが "true" に設定されている場合、マッチングはフィールドの位置に基づいて行われます。 読み込みと書き込み 3.2.0
enableStableIdentifiersForUnionType false true に設定すると、Avro スキーマは Spark SQL スキーマにデシリアライズされ、Avro Union 型は、フィールド名がそれぞれの型と一致したままになる構造に変換されます。結果のフィールド名は小文字に変換されます (例: member_int または member_string)。ユーザー定義の型名またはユーザー定義の型名と組み込み型名が、大文字小文字を区別しない場合に同一である場合、例外が発生します。ただし、それ以外の場合は、フィールド名を一意に識別できます。 読み込み 3.5.0
stableIdentifierPrefixForUnionType member_ `enableStableIdentifiersForUnionType` が有効な場合、このオプションは Avro Union 型のフィールドのプレフィックスを設定できます。 読み込み 4.0.0
recursiveFieldMaxDepth -1 このオプションが負の値に指定されたり、0 に設定されたりすると、再帰フィールドは許可されなくなります。1 に設定するとすべての再帰フィールドがドロップされ、2 に設定すると再帰フィールドが 1 回再帰され、3 に設定すると 2 回再帰され、最大 15 までとなります。15 より大きい値は、意図せず非常に大きなスキーマが作成されるのを避けるために許可されません。Avro メッセージの深さがこの制限を超えている場合、返される Spark struct は再帰制限後に切り捨てられます。使用例は、セクション Avro フィールドの循環参照の処理 にあります。 読み込み 4.0.0

設定

Avro の設定は、spark.conf.set を使用するか、SQL を使用して SET key=value コマンドを実行することで行うことができます。

プロパティ名デフォルト意味バージョン以降
spark.sql.legacy.replaceDatabricksSparkAvro.enabled true true に設定すると、データソースプロバイダ com.databricks.spark.avro は、後方互換性のために組み込みの外部 Avro データソースモジュールにマッピングされます。
注意: この SQL 設定は Spark 3.2 で非推奨となり、将来削除される可能性があります。
2.4.0
spark.sql.avro.compression.codec snappy AVRO ファイルの書き込みに使用される圧縮コーデック。サポートされるコーデックは、uncompressed、deflate、snappy、bzip2、xz、zstandard です。デフォルトのコーデックは snappy です。 2.4.0
spark.sql.avro.deflate.level -1 AVRO ファイルの書き込みに使用される deflate コーデックの圧縮レベル。有効な値は 1 から 9 (両端を含む) または -1 の範囲です。デフォルト値は -1 で、現在の実装ではレベル 6 に対応します。 2.4.0
spark.sql.avro.xz.level 6 AVRO ファイルの書き込みに使用される xz コーデックの圧縮レベル。有効な値は 1 から 9 (両端を含む) の範囲です。現在の実装ではデフォルト値は 6 です。 4.0.0
spark.sql.avro.zstandard.level 3 AVRO ファイルの書き込みに使用される zstandard コーデックの圧縮レベル。現在の実装ではデフォルト値は 3 です。 4.0.0
spark.sql.avro.zstandard.bufferPool.enabled false true の場合、AVRO ファイルの書き込み時に ZSTD JNI ライブラリのバッファプールを有効にします。 4.0.0
spark.sql.avro.datetimeRebaseModeInRead EXCEPTION Julian から Proleptic Gregorian へのカレンダーへの datetimestamp-microstimestamp-millis 論理型の値の再ベースモード。
  • EXCEPTION: Spark は、2 つのカレンダー間で曖昧な古い日付/タイムスタンプを検出した場合、読み込みを失敗させます。
  • CORRECTED: Spark は再ベースを行わず、日付/タイムスタンプをそのまま読み込みます。
  • LEGACY: Spark は、Avro ファイルを読み込む際に、古いハイブリッド (Julian + Gregorian) カレンダーから Proleptic Gregorian カレンダーに日付/タイムスタンプを再ベースします。
この設定は、Avro ファイルのライター情報 (Spark、Hive など) が不明な場合にのみ有効です。
3.0.0
spark.sql.avro.datetimeRebaseModeInWrite EXCEPTION Proleptic Gregorian から Julian へのカレンダーへの datetimestamp-microstimestamp-millis 論理型の値の再ベースモード。
  • EXCEPTION: Spark は、2 つのカレンダー間で曖昧な古い日付/タイムスタンプを検出した場合、書き込みを失敗させます。
  • CORRECTED: Spark は再ベースを行わず、日付/タイムスタンプをそのまま書き込みます。
  • LEGACY: Spark は、Avro ファイルを書き込む際に、Proleptic Gregorian カレンダーから古いハイブリッド (Julian + Gregorian) カレンダーに日付/タイムスタンプを再ベースします。
3.0.0
spark.sql.avro.filterPushdown.enabled true true の場合、Avro データソースへのフィルタプッシュダウンを有効にします。 3.1.0

Databricks spark-avro との互換性

この Avro データソースモジュールは、Databricks のオープンソースリポジトリ spark-avro に由来し、互換性があります。

デフォルトでは、SQL 設定 spark.sql.legacy.replaceDatabricksSparkAvro.enabled が有効になっている場合、データソースプロバイダ com.databricks.spark.avro は、この組み込みの Avro モジュールにマッピングされます。カタログメタストアで Provider プロパティが com.databricks.spark.avro である Spark テーブルの場合、この組み込み Avro モジュールを使用している場合にこれらのテーブルを読み込むには、このマッピングが不可欠です。

Databricks の spark-avro では、ショートカット関数 .avro() のために暗黙のクラス AvroDataFrameWriterAvroDataFrameReader が作成されていました。この組み込みの外部モジュールでは、これらの暗黙のクラスは削除されました。代わりに、DataFrameWriter または DataFrameReader.format("avro") を使用してください。これはクリーンで十分なはずです。

spark-avro jar ファイルの独自のビルドを使用したい場合は、設定 spark.sql.legacy.replaceDatabricksSparkAvro.enabled を無効にし、アプリケーションのデプロイ時に --jars オプションを使用するだけです。詳細については、アプリケーション送信ガイドの 高度な依存関係管理 セクションを参照してください。

Avro -> Spark SQL 変換でサポートされる型

現在、Spark は Avro のレコード内のすべての プリミティブ型複雑な型 の読み込みをサポートしています。

Avro 型Spark SQL 型
boolean BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
string StringType
enum StringType
fixed BinaryType
bytes BinaryType
record StructType
array ArrayType
map MapType
union 下記参照

上記の型に加えて、union 型の読み込みもサポートしています。次の 3 つの型は基本的な union 型と見なされます。

  1. union(int, long) は LongType にマッピングされます。
  2. union(float, double) は DoubleType にマッピングされます。
  3. union(something, null)、ここで something はサポートされている任意の Avro 型です。これは、something の Spark SQL 型と同じ型にマッピングされ、nullable が true に設定されます。その他のすべての union 型は複雑な型と見なされます。これらは、union のメンバーに従って、フィールド名が member0、member1 などとなる StructType にマッピングされます。これは、Avro と Parquet の間で変換する場合の動作と一致します。

以下の Avro 論理型 の読み込みもサポートしています。

Avro 論理型Avro 型Spark SQL 型
date int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

現時点では、Avro ファイルに含まれる docs、aliases、およびその他のプロパティは無視されます。

Spark SQL -> Avro 変換でサポートされる型

Spark は、すべての Spark SQL 型を Avro に書き込むことをサポートしています。ほとんどの型では、Spark 型から Avro 型へのマッピングは単純です (例: IntegerType は int に変換されます)。ただし、以下にリストされているいくつかの特別なケースがあります。

Spark SQL 型Avro 型Avro 論理型
ByteType int
ShortType int
BinaryType bytes
DateType int date
TimestampType long timestamp-micros
DecimalType fixed decimal

オプション avroSchema を使用して、出力 Avro スキーマ全体を指定することもできます。これにより、Spark SQL 型を他の Avro 型に変換できます。次の変換はデフォルトでは適用されず、ユーザー指定の Avro スキーマが必要です。

Spark SQL 型Avro 型Avro 論理型
BinaryType fixed
StringType enum
TimestampType long timestamp-millis
DecimalType bytes decimal

Avro フィールドの循環参照の処理

Avro では、フィールドの型が親レコードのいずれかで定義されている場合に循環参照が発生します。これは、無限ループやその他の予期しない動作を引き起こす可能性があるため、データの解析時に問題を引き起こす可能性があります。循環参照を持つスキーマで Avro データを読み込むには、ユーザーは recursiveFieldMaxDepth オプションを使用して、スキーマの解析時に許可される再帰の最大レベル数を指定できます。デフォルトでは、Spark Avro データソースは recursiveFieldMaxDepth を -1 に設定することで再帰フィールドを許可しません。ただし、必要に応じてこのオプションを 1 から 15 に設定できます。

recursiveFieldMaxDepth を 1 に設定するとすべての再帰フィールドがドロップされ、2 に設定すると 1 回再帰され、3 に設定すると 2 回再帰されます。15 より大きい recursiveFieldMaxDepth の値は、パフォーマンスの問題やスタックオーバーフローを引き起こす可能性があるため許可されません。

以下の Avro メッセージの SQL スキーマは、recursiveFieldMaxDepth の値によって異なります。

この div は Markdown エディタ/ビューアを機能させるためだけにあり、Web では表示されません ```avro
{
  "type": "record",
  "name": "Node",
  "fields": [
    {"name": "Id", "type": "int"},
    {"name": "Next", "type": ["null", "Node"]}
  ]
}

// The Avro schema defined above, would be converted into a Spark SQL columns with the following
// structure based on `recursiveFieldMaxDepth` value.

1: struct<Id: int>
2: struct<Id: int, Next: struct<Id: int>>
3: struct<Id: int, Next: struct<Id: int, Next: struct<Id: int>>>
```