Parquet ファイル

Parquet は、多くの他のデータ処理システムでサポートされている列指向フォーマットです。Spark SQL は、元のデータのスキーマを自動的に保持する Parquet ファイルの読み書きをサポートしています。Parquet ファイルを読み込む際、互換性のためにすべての列は自動的に null 許容に変換されます。

プログラムによるデータ読み込み

上記の例のデータを使用する

peopleDF = spark.read.json("examples/src/main/resources/people.json")

# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")

# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# |  name|
# +------+
# |Justin|
# +------+
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/sql/datasource.py」にあります。
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」にあります。
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");

// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」にあります。
df <- read.df("examples/src/main/resources/people.json", "json")

# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")

# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")

# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
##     name
## 1 Justin

# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
  cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/RSparkSQLExample.R」にあります。
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

パーティション検出

テーブルパーティショニングは、Hive のようなシステムで一般的に使用される最適化アプローチです。パーティション分割されたテーブルでは、データは通常異なるディレクトリに格納され、パーティション列の値は各パーティションディレクトリのパスにエンコードされます。すべての組み込みファイルソース (Text/CSV/JSON/ORC/Parquet を含む) は、パーティション情報を自動的に検出し推論できます。たとえば、以前使用した人口データを、gendercountry という 2 つの追加列をパーティション列として、以下のディレクトリ構造を使用してパーティション分割されたテーブルに格納できます。

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

SparkSession.read.parquet または SparkSession.read.loadpath/to/table を渡すことで、Spark SQL はパスからパーティション情報を自動的に抽出します。この時点で、返される DataFrame のスキーマは以下のようになります。

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

パーティション列のデータ型は自動的に推論されることに注意してください。現在、数値データ型、日付、タイムスタンプ、文字列型がサポートされています。ユーザーは、パーティション列のデータ型を自動的に推論したくない場合があります。これらのユースケースでは、自動型推論は spark.sql.sources.partitionColumnTypeInference.enabled によって設定でき、デフォルトは true です。型推論が無効になっている場合、パーティション列には文字列型が使用されます。

Spark 1.6.0 以降、パーティション検出はデフォルトで指定されたパスの下のパーティションのみを見つけます。上記の例で、ユーザーが SparkSession.read.parquet または SparkSession.read.loadpath/to/table/gender=male を渡した場合、gender はパーティション列とは見なされません。パーティション検出を開始するベースパスを指定する必要がある場合、データソースオプションで basePath を設定できます。たとえば、データのパスが path/to/table/gender=male で、ユーザーが basePathpath/to/table/ に設定した場合、gender はパーティション列になります。

スキーマのマージ

Protocol Buffer、Avro、Thrift と同様に、Parquet もスキーマ進化をサポートしています。ユーザーはシンプルなスキーマから始め、必要に応じて徐々に列を追加できます。この方法で、ユーザーは異なるが相互に互換性のあるスキーマを持つ複数の Parquet ファイルを最終的に持つことができます。Parquet データソースは、このケースを自動的に検出し、これらのすべてのファイルのスキーマをマージできるようになりました。

スキーママージは比較的高価な操作であり、ほとんどの場合必要ではないため、1.5.0 以降はデフォルトでオフになっています。以下のように有効にできます。

  1. Parquet ファイルの読み込み時にデータソースオプション mergeSchematrue に設定する (以下の例を参照)、または
  2. グローバル SQL オプション spark.sql.parquet.mergeSchematrue に設定する。
from pyspark.sql import Row

# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
                                  .map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
                                .map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")

# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
#  |-- double: long (nullable = true)
#  |-- single: long (nullable = true)
#  |-- triple: long (nullable = true)
#  |-- key: integer (nullable = true)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/python/sql/datasource.py」にあります。
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala」にあります。
import com.google.common.collect.Lists;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static class Square implements Serializable {
  private int value;
  private int square;

  // Getters and setters...

}

public static class Cube implements Serializable {
  private int value;
  private int cube;

  // Getters and setters...

}

List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
  Square square = new Square();
  square.setValue(value);
  square.setSquare(value * value);
  squares.add(square);
}

// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");

List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
  Cube cube = new Cube();
  cube.setValue(value);
  cube.setCube(value * value * value);
  cubes.add(cube);
}

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");

// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
//  |-- value: int (nullable = true)
//  |-- square: int (nullable = true)
//  |-- cube: int (nullable = true)
//  |-- key: int (nullable = true)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java」にあります。
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))

# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")

# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")

# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths
## root
##  |-- single: double (nullable = true)
##  |-- double: double (nullable = true)
##  |-- triple: double (nullable = true)
##  |-- key: integer (nullable = true)
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/RSparkSQLExample.R」にあります。

Hive メタストア Parquet テーブル変換

Hive メタストア Parquet テーブルから読み込み、非パーティション化された Hive メタストア Parquet テーブルに書き込む際、Spark SQL はパフォーマンス向上のために Hive SerDe ではなく独自の Parquet サポートを使用しようとします。この動作は spark.sql.hive.convertMetastoreParquet 設定によって制御され、デフォルトでオンになっています。

Hive/Parquet スキーマの調整

テーブルスキーマ処理の観点から、Hive と Parquet には 2 つの主な違いがあります。

  1. Hive は大文字小文字を区別しませんが、Parquet は区別します。
  2. Hive はすべての列を null 許容と見なしますが、Parquet では null 許容性は重要です。

このため、Hive メタストア Parquet テーブルを Spark SQL Parquet テーブルに変換する際には、Hive メタストアスキーマと Parquet スキーマを調整する必要があります。調整ルールは以下の通りです。

  1. 両方のスキーマで名前が同じフィールドは、null 許容性に関係なく同じデータ型でなければなりません。調整されたフィールドは、null 許容性が尊重されるように、Parquet 側のデータ型を持つべきです。

  2. 調整されたスキーマには、Hive メタストアスキーマで定義されたフィールドが正確に含まれます。

    • Parquet スキーマにのみ存在するフィールドは、調整されたスキーマではドロップされます。
    • Hive メタストアスキーマにのみ存在するフィールドは、調整されたスキーマで null 許容フィールドとして追加されます。

メタデータの更新

Spark SQL は、パフォーマンス向上のために Parquet メタデータをキャッシュします。Hive メタストア Parquet テーブル変換が有効になっている場合、変換されたテーブルのメタデータもキャッシュされます。これらのテーブルが Hive やその他の外部ツールによって更新された場合、一貫したメタデータを確保するために手動で更新する必要があります。

# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
refreshTable("my_table")
REFRESH TABLE my_table;

列単位の暗号化

Spark 3.2 以降、Apache Parquet 1.12 以降の Parquet テーブルで列単位の暗号化がサポートされています。

Parquet はエンベロープ暗号化方式を使用しており、ファイル部分は「データ暗号化キー」(DEK) で暗号化され、DEK は「マスター暗号化キー」(MEK) で暗号化されます。DEK は、各暗号化されたファイル/列に対して Parquet によってランダムに生成されます。MEK は、ユーザーが選択したキー管理サービス (KMS) で生成、保存、管理されます。Parquet Maven の リポジトリ には、KMS サーバーをデプロイせずに、spark-shell のみを使用して列の暗号化および復号化を実行できるモック KMS 実装を含む JAR があります ( parquet-hadoop-tests.jar ファイルをダウンロードして Spark の jars フォルダに配置してください)。

# Set hadoop configuration properties, e.g. using configuration properties of
# the Spark job:
# --conf spark.hadoop.parquet.encryption.kms.client.class=\
#           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS"\
# --conf spark.hadoop.parquet.encryption.key.list=\
#           "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA=="\
# --conf spark.hadoop.parquet.crypto.factory.class=\
#           "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"

# Write encrypted dataframe files.
# Column "square" will be protected with master key "keyA".
# Parquet file footers will be protected with master key "keyB"
squaresDF.write\
   .option("parquet.encryption.column.keys" , "keyA:square")\
   .option("parquet.encryption.footer.key" , "keyB")\
   .parquet("/path/to/table.parquet.encrypted")

# Read encrypted dataframe files
df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
                           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")

// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
                   "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==")

// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
                   "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")

// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write.
   option("parquet.encryption.column.keys" , "keyA:square").
   option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted")

// Read encrypted dataframe files
val df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration().set("parquet.encryption.kms.client.class" ,
   "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS");

// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration().set("parquet.encryption.key.list" ,
   "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA==");

// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration().set("parquet.crypto.factory.class" ,
   "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory");

// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write().
   option("parquet.encryption.column.keys" , "keyA:square").
   option("parquet.encryption.footer.key" , "keyB").
   parquet("/path/to/table.parquet.encrypted");

// Read encrypted dataframe files
Dataset<Row> df2 = spark.read().parquet("/path/to/table.parquet.encrypted");

KMS クライアント

InMemoryKMS クラスは、Parquet 暗号化機能の図解と簡単なデモンストレーションのみを目的として提供されています。実際のデプロイメントでは使用しないでください。マスター暗号化キーは、ユーザーの組織にデプロイされた、本番グレードの KMS システムに保存および管理する必要があります。Spark と Parquet 暗号化のロールアウトには、KMS サーバー用のクライアントクラスの実装が必要です。Parquet は、そのようなクラスの開発のためのプラグイン インターフェース を提供しています。

public interface KmsClient {
  // Wraps a key - encrypts it with the master key.
  public String wrapKey(byte[] keyBytes, String masterKeyIdentifier);

  // Decrypts (unwraps) a key with the master key.
  public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier);

  // Use of initialization parameters is optional.
  public void initialize(Configuration configuration, String kmsInstanceID,
                         String kmsInstanceURL, String accessToken);
}

オープンソースの KMS 用のこのようなクラスの は、parquet-java リポジトリで見つけることができます。本番 KMS クライアントは、組織のセキュリティ管理者と協力して設計され、アクセス制御管理の経験を持つ開発者によって構築されるべきです。そのようなクラスが作成されると、parquet.encryption.kms.client.class パラメータを介してアプリケーションに渡され、上記の暗号化されたデータフレームの書き込み/読み込みサンプルで示されているように、一般的な Spark ユーザーによって利用できます。

注: デフォルトでは、Parquet は「ダブルエンベロープ暗号化」モードを実装しており、Spark エグゼキュータと KMS サーバーのやり取りを最小限に抑えます。このモードでは、DEK は「キー暗号化キー」(KEK、Parquet によってランダムに生成) で暗号化されます。KEK は KMS の MEK で暗号化され、その結果と KEK 自体は Spark エグゼキュータメモリにキャッシュされます。通常のエンベロープ暗号化に関心のあるユーザーは、parquet.encryption.double.wrapping パラメータを false に設定することで切り替えることができます。Parquet 暗号化パラメータの詳細については、parquet-hadoop 設定の ページ を参照してください。

データソースオプション

Parquet のデータソースオプションは、以下のように設定できます。

プロパティ名デフォルト意味スコープ
datetimeRebaseMode (spark.sql.parquet.datetimeRebaseModeInRead 設定の値) datetimeRebaseMode オプションを使用すると、Julian から Proleptic Gregorian カレンダーへの DATETIMESTAMP_MILLISTIMESTAMP_MICROS 論理型の値のリバインディングモードを指定できます。
現在サポートされているモードは以下の通りです。
  • EXCEPTION: 2 つのカレンダー間で曖昧な古い日付/タイムスタンプの読み込みで失敗します。
  • CORRECTED: リバインディングなしで日付/タイムスタンプをロードします。
  • LEGACY: 古い日付/タイムスタンプを Julian から Proleptic Gregorian カレンダーにリバインドします。
読み込み
int96RebaseMode (spark.sql.parquet.int96RebaseModeInRead 設定の値) int96RebaseMode オプションを使用すると、Julian から Proleptic Gregorian カレンダーへの INT96 タイムスタンプのリバインディングモードを指定できます。
現在サポートされているモードは以下の通りです。
  • EXCEPTION: 2 つのカレンダー間で曖昧な古い INT96 タイムスタンプの読み込みで失敗します。
  • CORRECTED: リバインディングなしで INT96 タイムスタンプをロードします。
  • LEGACY: 古いタイムスタンプを Julian から Proleptic Gregorian カレンダーにリバインドします。
読み込み
mergeSchema (spark.sql.parquet.mergeSchema 設定の値) すべての Parquet パーツファイルから収集されたスキーマをマージするかどうかを設定します。これは spark.sql.parquet.mergeSchema をオーバーライドします。 読み込み
compression snappy ファイルへの保存時に使用する圧縮コーデック。これは、known case-insensitive shorten names (none, uncompressed, snappy, gzip, lzo, brotli, lz4, lz4_raw, and zstd) のいずれかです。これは spark.sql.parquet.compression.codec をオーバーライドします。 書き込み

その他の汎用オプションについては、汎用ファイルソースオプション を参照してください。

設定

Parquet の設定は、spark.conf.set を使用するか、SQL を使用して SET key=value コマンドを実行することで行えます。

プロパティ名デフォルト意味バージョン以降
spark.sql.parquet.binaryAsString false Impala、Hive、および Spark SQL の古いバージョンなど、他の Parquet 生成システムは、バイナリデータと文字列を区別せずに Parquet スキーマを書き出します。このフラグは、Spark SQL がこれらのシステムとの互換性を提供するために、バイナリデータを文字列として解釈するように指示します。 1.1.1
spark.sql.parquet.int96AsTimestamp true Impala や Hive などの一部の Parquet 生成システムは、タイムスタンプを INT96 に格納します。このフラグは、Spark SQL がこれらのシステムとの互換性を提供するために、INT96 データをタイムスタンプとして解釈するように指示します。 1.3.0
spark.sql.parquet.int96TimestampConversion false これは、Impala によって書き込まれたデータに対して、タイムスタンプへの変換時にタイムスタンプ調整を INT96 データに適用するかどうかを制御します。これは、Impala が Hive & Spark とは異なるタイムゾーンオフセットで INT96 データを格納するため必要です。 2.3.0
spark.sql.parquet.outputTimestampType INT96 Spark がデータを Parquet ファイルに書き込む際に使用する Parquet タイムスタンプタイプを設定します。INT96 は標準ではありませんが、Parquet で一般的に使用されるタイムスタンプタイプです。TIMESTAMP_MICROS は Parquet の標準タイムスタンプタイプで、Unix エポックからのマイクロ秒数を格納します。TIMESTAMP_MILLIS も標準ですが、ミリ秒精度であり、Spark はタイムスタンプ値のマイクロ秒部分を切り捨てる必要があります。 2.3.0
spark.sql.parquet.compression.codec snappy Parquet ファイルの書き込み時に使用する圧縮コーデックを設定します。テーブル固有のオプション/プロパティで compression または parquet.compression が指定されている場合、優先順位は compressionparquet.compressionspark.sql.parquet.compression.codec の順になります。受け入れ可能な値には、none、uncompressed、snappy、gzip、lzo、brotli、lz4、lz4_raw、zstd が含まれます。brotli には BrotliCodec のインストールが必要なことに注意してください。 1.1.1
spark.sql.parquet.filterPushdown true true に設定すると、Parquet フィルタープッシュダウン最適化が有効になります。 1.2.0
spark.sql.parquet.aggregatePushdown false true の場合、集計は最適化のために Parquet にプッシュダウンされます。MIN、MAX、COUNT を集計式としてサポートします。MIN/MAX の場合、ブール値、整数、浮動小数点数、日付型をサポートします。COUNT の場合、すべてのデータ型をサポートします。Parquet ファイルフッターから統計情報が不足している場合、例外が発生します。 3.3.0
spark.sql.hive.convertMetastoreParquet true false に設定すると、Spark SQL は組み込みサポートではなく、Parquet テーブルの Hive SerDe を使用します。 1.1.1
spark.sql.parquet.mergeSchema false

true の場合、Parquet データソースはすべてのデータファイルから収集されたスキーマをマージします。それ以外の場合、スキーマはサマリーファイルから、またはサマリーファイルが利用できない場合はランダムなデータファイルから選択されます。

1.5.0
spark.sql.parquet.respectSummaryFiles false true の場合、Parquet のすべてのパーツファイルがサマリーファイルと一貫していると仮定し、スキーマのマージ時にそれらを無視します。それ以外の場合 (デフォルトは false)、すべてのパーツファイルをマージします。これはエキスパート専用オプションと見なされるべきであり、その意味を正確に理解する前に有効にすべきではありません。 1.5.0
spark.sql.parquet.writeLegacyFormat false true の場合、データは Spark 1.4 以前の方法で書き込まれます。たとえば、decimal 値は、Apache Hive や Apache Impala などの他のシステムが使用する Apache Parquet の固定長バイト配列形式で書き込まれます。false の場合、Parquet の新しい形式が使用されます。たとえば、decimal は整数ベースの形式で書き込まれます。Parquet 出力がこの新しい形式をサポートしていないシステムでの使用を意図している場合は、true に設定してください。 1.6.0
spark.sql.parquet.enableVectorizedReader true ベクトル化された Parquet デコーディングを有効にします。 2.0.0
spark.sql.parquet.enableNestedColumnVectorizedReader true ネストされた列 (struct、list、map など) のベクトル化された Parquet デコーディングを有効にします。spark.sql.parquet.enableVectorizedReader が有効になっている必要があります。 3.3.0
spark.sql.parquet.recordLevelFilter.enabled false true の場合、プッシュダウンされたフィルターを使用した Parquet のネイティブレコードレベルフィルタリングが有効になります。この設定は、spark.sql.parquet.filterPushdown が有効で、ベクトル化リーダーが使用されていない場合にのみ効果があります。spark.sql.parquet.enableVectorizedReader を false に設定することで、ベクトル化リーダーが使用されていないことを確認できます。 2.3.0
spark.sql.parquet.columnarReaderBatchSize 4096 Parquet ベクトル化リーダーのバッチに含まれる行数。オーバーヘッドを最小限に抑え、データの読み込み時に OOM を回避するために、この数値は慎重に選択する必要があります。 2.4.0
spark.sql.parquet.fieldId.write.enabled true フィールド ID は Parquet スキーマ仕様のネイティブフィールドです。有効にすると、Parquet ライターは Spark スキーマに存在するフィールド ID メタデータを Parquet スキーマに格納します。 3.3.0
spark.sql.parquet.fieldId.read.enabled false フィールド ID は Parquet スキーマ仕様のネイティブフィールドです。有効にすると、Parquet リーダーは、列名ではなく、フィールド ID (存在する場合) を使用して、要求された Spark スキーマの Parquet フィールドを検索します。 3.3.0
spark.sql.parquet.fieldId.read.ignoreMissing false Parquet ファイルにフィールド ID が存在しないが、Spark の読み込みスキーマがフィールド ID を使用している場合、このフラグが有効になっている場合はサイレントに null が返され、それ以外の場合はエラーが発生します。 3.3.0
spark.sql.parquet.inferTimestampNTZ.enabled true 有効にすると、アノテーション isAdjustedToUTC = false を持つ Parquet タイムスタンプ列は、スキーマ推論中に TIMESTAMP_NTZ 型として推論されます。それ以外の場合、すべての Parquet タイムスタンプ列は TIMESTAMP_LTZ 型として推論されます。Spark はファイル書き込み時に出力スキーマを Parquet のフッターメタデータに書き込み、ファイル読み込み時にそれを利用することに注意してください。したがって、この設定は Spark によって書き込まれていない Parquet ファイルのスキーマ推論にのみ影響します。 3.4.0
spark.sql.parquet.datetimeRebaseModeInRead EXCEPTION Julian から Proleptic Gregorian カレンダーへの DATETIMESTAMP_MILLISTIMESTAMP_MICROS 論理型の値のリバインディングモード。
  • EXCEPTION: Spark は、2 つのカレンダー間で曖昧な古い日付/タイムスタンプを検出した場合、読み込みを失敗します。
  • CORRECTED: Spark はリバインドせず、日付/タイムスタンプをそのまま読み込みます。
  • LEGACY: Spark は、Parquet ファイルを読み込む際に、古いハイブリッド (Julian + Gregorian) カレンダーから Proleptic Gregorian カレンダーへ日付/タイムスタンプをリバインドします。
この設定は、Parquet ファイルのライター情報 (Spark、Hive など) が不明な場合にのみ有効です。
3.0.0
spark.sql.parquet.datetimeRebaseModeInWrite EXCEPTION Proleptic Gregorian から Julian カレンダーへの DATETIMESTAMP_MILLISTIMESTAMP_MICROS 論理型の値のリバインディングモード。
  • EXCEPTION: Spark は、2 つのカレンダー間で曖昧な古い日付/タイムスタンプを検出した場合、書き込みを失敗します。
  • CORRECTED: Spark はリバインドせず、日付/タイムスタンプをそのまま書き込みます。
  • LEGACY: Spark は、Parquet ファイルを書き込む際に、Proleptic Gregorian カレンダーから古いハイブリッド (Julian + Gregorian) カレンダーへ日付/タイムスタンプをリバインドします。
3.0.0
spark.sql.parquet.int96RebaseModeInRead EXCEPTION Julian から Proleptic Gregorian カレンダーへの INT96 タイムスタンプ型の値のリバインディングモード。
  • EXCEPTION: Spark は、2 つのカレンダー間で曖昧な古い INT96 タイムスタンプを検出した場合、読み込みを失敗します。
  • CORRECTED: Spark はリバインドせず、日付/タイムスタンプをそのまま読み込みます。
  • LEGACY: Spark は、Parquet ファイルを読み込む際に、古いハイブリッド (Julian + Gregorian) カレンダーから Proleptic Gregorian カレンダーへ INT96 タイムスタンプをリバインドします。
この設定は、Parquet ファイルのライター情報 (Spark、Hive など) が不明な場合にのみ有効です。
3.1.0
spark.sql.parquet.int96RebaseModeInWrite EXCEPTION Proleptic Gregorian から Julian カレンダーへの INT96 タイムスタンプ型の値のリバインディングモード。
  • EXCEPTION: Spark は、2 つのカレンダー間で曖昧な古いタイムスタンプを検出した場合、書き込みを失敗します。
  • CORRECTED: Spark はリバインドせず、日付/タイムスタンプをそのまま書き込みます。
  • LEGACY: Spark は、Parquet ファイルを書き込む際に、Proleptic Gregorian カレンダーから古いハイブリッド (Julian + Gregorian) カレンダーへ INT96 タイムスタンプをリバインドします。
3.1.0