Parquetファイル
Parquetは、他の多くのデータ処理システムでサポートされているカラムナ形式です。Spark SQLは、元のデータのスキーマを自動的に保持するParquetファイルの読み取りと書き込みの両方をサポートしています。Parquetファイルを読み取るとき、互換性の理由から、すべての列は自動的にnullableに変換されます。
プログラムによるデータのロード
上記の例のデータを使用する
peopleDF = spark.read.json("examples/src/main/resources/people.json")
# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")
# Read in the Parquet file created above.
# Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()
# +------+
# | name|
# +------+
# |Justin|
# +------+
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")
// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> peopleDF = spark.read().json("examples/src/main/resources/people.json");
// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write().parquet("people.parquet");
// Read in the Parquet file created above.
// Parquet files are self-describing so the schema is preserved
// The result of loading a parquet file is also a DataFrame
Dataset<Row> parquetFileDF = spark.read().parquet("people.parquet");
// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile");
Dataset<Row> namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19");
Dataset<String> namesDS = namesDF.map(
(MapFunction<Row, String>) row -> "Name: " + row.getString(0),
Encoders.STRING());
namesDS.show();
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+
df <- read.df("examples/src/main/resources/people.json", "json")
# SparkDataFrame can be saved as Parquet files, maintaining the schema information.
write.parquet(df, "people.parquet")
# Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved.
# The result of loading a parquet file is also a DataFrame.
parquetFile <- read.parquet("people.parquet")
# Parquet files can also be used to create a temporary view and then used in SQL statements.
createOrReplaceTempView(parquetFile, "parquetFile")
teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
head(teenagers)
## name
## 1 Justin
# We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
schema <- structType(structField("name", "string"))
teenNames <- dapply(df, function(p) { cbind(paste("Name:", p$name)) }, schema)
for (teenName in collect(teenNames)$name) {
cat(teenName, "\n")
}
## Name: Michael
## Name: Andy
## Name: Justin
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
path "examples/src/main/resources/people.parquet"
)
SELECT * FROM parquetTable
パーティション検出
テーブルパーティショニングは、Hiveのようなシステムで一般的に使用される最適化手法です。パーティション分割されたテーブルでは、通常、データは異なるディレクトリに保存され、パーティション分割列の値は各パーティションディレクトリのパスにエンコードされます。すべての組み込みファイルソース(Text/CSV/JSON/ORC/Parquetを含む)は、パーティション情報を自動的に検出して推論できます。たとえば、以前に使用したすべての人口データを、gender
とcountry
という2つの追加列をパーティション分割列として使用して、次のディレクトリ構造を使用してパーティション分割されたテーブルに保存できます。
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
path/to/table
をSparkSession.read.parquet
またはSparkSession.read.load
に渡すことで、Spark SQLはパスからパーティション情報を自動的に抽出します。これで、返されるDataFrameのスキーマは次のようになります。
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)
パーティション分割列のデータ型が自動的に推論されることに注意してください。現在、数値データ型、日付、タイムスタンプ、および文字列型がサポートされています。場合によっては、ユーザーがパーティション分割列のデータ型を自動的に推論したくないことがあります。このようなユースケースでは、spark.sql.sources.partitionColumnTypeInference.enabled
で自動型推論を設定できます。デフォルトはtrue
です。型推論が無効になっている場合、パーティション分割列には文字列型が使用されます。
Spark 1.6.0以降、パーティション検出はデフォルトで指定されたパスの下のパーティションのみを検出します。上記の例では、path/to/table/gender=male
をSparkSession.read.parquet
またはSparkSession.read.load
に渡した場合、gender
はパーティション分割列とはみなされません。ユーザーがパーティション検出を開始するベースパスを指定する必要がある場合は、データソースオプションでbasePath
を設定できます。たとえば、path/to/table/gender=male
がデータのパスで、ユーザーがbasePath
をpath/to/table/
に設定した場合、gender
はパーティション分割列になります。
スキーマのマージ
Protocol Buffer、Avro、Thriftと同様に、Parquetもスキーマの進化をサポートしています。ユーザーは単純なスキーマから始め、必要に応じて徐々にスキーマに列を追加できます。これにより、ユーザーは異なるが相互に互換性のあるスキーマを持つ複数のParquetファイルを使用できます。Parquetデータソースは、このケースを自動的に検出して、これらすべてのファイルのスキーマをマージできるようになりました。
スキーマのマージは比較的高価な操作であり、ほとんどの場合必要ないため、1.5.0以降、デフォルトでオフにしました。次のいずれかの方法で有効にできます。
- Parquetファイルを読み取るときに、データソースオプション
mergeSchema
をtrue
に設定します(以下の例に示すとおり)、または - グローバルSQLオプション
spark.sql.parquet.mergeSchema
をtrue
に設定します。
from pyspark.sql import Row
# spark is from the previous example.
# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext
squaresDF = spark.createDataFrame(sc.parallelize(range(1, 6))
.map(lambda i: Row(single=i, double=i ** 2)))
squaresDF.write.parquet("data/test_table/key=1")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6, 11))
.map(lambda i: Row(single=i, triple=i ** 3)))
cubesDF.write.parquet("data/test_table/key=2")
# Read the partitioned table
mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths.
# root
# |-- double: long (nullable = true)
# |-- single: long (nullable = true)
# |-- triple: long (nullable = true)
# |-- key: integer (nullable = true)
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public static class Square implements Serializable {
private int value;
private int square;
// Getters and setters...
}
public static class Cube implements Serializable {
private int value;
private int cube;
// Getters and setters...
}
List<Square> squares = new ArrayList<>();
for (int value = 1; value <= 5; value++) {
Square square = new Square();
square.setValue(value);
square.setSquare(value * value);
squares.add(square);
}
// Create a simple DataFrame, store into a partition directory
Dataset<Row> squaresDF = spark.createDataFrame(squares, Square.class);
squaresDF.write().parquet("data/test_table/key=1");
List<Cube> cubes = new ArrayList<>();
for (int value = 6; value <= 10; value++) {
Cube cube = new Cube();
cube.setValue(value);
cube.setCube(value * value * value);
cubes.add(cube);
}
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
Dataset<Row> cubesDF = spark.createDataFrame(cubes, Cube.class);
cubesDF.write().parquet("data/test_table/key=2");
// Read the partitioned table
Dataset<Row> mergedDF = spark.read().option("mergeSchema", true).parquet("data/test_table");
mergedDF.printSchema();
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)
df1 <- createDataFrame(data.frame(single=c(12, 29), double=c(19, 23)))
df2 <- createDataFrame(data.frame(double=c(19, 23), triple=c(23, 18)))
# Create a simple DataFrame, stored into a partition directory
write.df(df1, "data/test_table/key=1", "parquet", "overwrite")
# Create another DataFrame in a new partition directory,
# adding a new column and dropping an existing column
write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
# Read the partitioned table
df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
printSchema(df3)
# The final schema consists of all 3 columns in the Parquet files together
# with the partitioning column appeared in the partition directory paths
## root
## |-- single: double (nullable = true)
## |-- double: double (nullable = true)
## |-- triple: double (nullable = true)
## |-- key: integer (nullable = true)
HiveメタストアParquetテーブル変換
HiveメタストアParquetテーブルから読み取り、パーティション分割されていないHiveメタストアParquetテーブルに書き込む場合、Spark SQLはパフォーマンスを向上させるために、Hive SerDeの代わりに独自のParquetサポートを使用しようとします。この動作はspark.sql.hive.convertMetastoreParquet
構成によって制御され、デフォルトでオンになっています。
Hive/Parquetスキーマの調整
テーブルスキーマ処理の観点から、HiveとParquetには2つの重要な違いがあります。
- Hiveは大文字と小文字を区別しませんが、Parquetは大文字と小文字を区別します
- Hiveはすべての列をnullableとみなしますが、Parquetではnullabilityは重要です
このため、HiveメタストアParquetテーブルをSpark SQL Parquetテーブルに変換するときは、HiveメタストアスキーマとParquetスキーマを調整する必要があります。調整ルールは次のとおりです。
-
両方のスキーマで同じ名前を持つフィールドは、nullabilityに関係なく、同じデータ型を持つ必要があります。調整されたフィールドはParquet側のデータ型を持つ必要があるため、nullabilityが尊重されます。
-
調整されたスキーマには、Hiveメタストアスキーマで定義されたフィールドが正確に含まれます。
- Parquetスキーマにのみ出現するフィールドは、調整されたスキーマで削除されます。
- Hiveメタストアスキーマにのみ出現するフィールドは、調整されたスキーマにnullableフィールドとして追加されます。
メタデータの更新
Spark SQLは、パフォーマンスを向上させるためにParquetメタデータをキャッシュします。HiveメタストアParquetテーブル変換が有効になっている場合、変換されたテーブルのメタデータもキャッシュされます。これらのテーブルがHiveまたは他の外部ツールによって更新された場合は、一貫したメタデータを確保するために手動で更新する必要があります。
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
refreshTable("my_table")
REFRESH TABLE my_table;
列暗号化
Spark 3.2以降、Apache Parquet 1.12以降のParquetテーブルで列暗号化がサポートされています。
Parquetはエンベロープ暗号化プラクティスを使用します。このプラクティスでは、ファイルパーツは「データ暗号化キー」(DEK)で暗号化され、DEKは「マスター暗号化キー」(MEK)で暗号化されます。DEKは、暗号化されたファイル/列ごとにParquetによってランダムに生成されます。MEKは、ユーザーが選択したキー管理サービス(KMS)で生成、保存、管理されます。Parquet Maven リポジトリには、KMSサーバーをデプロイせずに、spark-shellのみを使用して列の暗号化と復号化を実行できるモックKMS実装を備えたjarがあります(parquet-hadoop-tests.jar
ファイルをダウンロードして、Sparkのjars
フォルダーに配置してください)。
# Set hadoop configuration properties, e.g. using configuration properties of
# the Spark job:
# --conf spark.hadoop.parquet.encryption.kms.client.class=\
# "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS"\
# --conf spark.hadoop.parquet.encryption.key.list=\
# "keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA=="\
# --conf spark.hadoop.parquet.crypto.factory.class=\
# "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"
# Write encrypted dataframe files.
# Column "square" will be protected with master key "keyA".
# Parquet file footers will be protected with master key "keyB"
squaresDF.write\
.option("parquet.encryption.column.keys" , "keyA:square")\
.option("parquet.encryption.footer.key" , "keyB")\
.parquet("/path/to/table.parquet.encrypted")
# Read encrypted dataframe files
df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration.set("parquet.encryption.kms.client.class" ,
"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS")
// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration.set("parquet.encryption.key.list" ,
"keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==")
// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration.set("parquet.crypto.factory.class" ,
"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory")
// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write.
option("parquet.encryption.column.keys" , "keyA:square").
option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted")
// Read encrypted dataframe files
val df2 = spark.read.parquet("/path/to/table.parquet.encrypted")
sc.hadoopConfiguration().set("parquet.encryption.kms.client.class" ,
"org.apache.parquet.crypto.keytools.mocks.InMemoryKMS");
// Explicit master keys (base64 encoded) - required only for mock InMemoryKMS
sc.hadoopConfiguration().set("parquet.encryption.key.list" ,
"keyA:AAECAwQFBgcICQoLDA0ODw== , keyB:AAECAAECAAECAAECAAECAA==");
// Activate Parquet encryption, driven by Hadoop properties
sc.hadoopConfiguration().set("parquet.crypto.factory.class" ,
"org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory");
// Write encrypted dataframe files.
// Column "square" will be protected with master key "keyA".
// Parquet file footers will be protected with master key "keyB"
squaresDF.write().
option("parquet.encryption.column.keys" , "keyA:square").
option("parquet.encryption.footer.key" , "keyB").
parquet("/path/to/table.parquet.encrypted");
// Read encrypted dataframe files
Dataset<Row> df2 = spark.read().parquet("/path/to/table.parquet.encrypted");
KMSクライアント
InMemoryKMSクラスは、Parquet暗号化機能の説明と簡単なデモンストレーションのみを目的として提供されています。実際のデプロイでは使用しないでください。マスター暗号化キーは、ユーザーの組織にデプロイされた本番グレードのKMSシステムで保持および管理する必要があります。Parquet暗号化を使用したSparkのロールアウトには、KMSサーバー用のクライアントクラスの実装が必要です。Parquetは、このようなクラスの開発のためにプラグインインターフェイスを提供します。
public interface KmsClient {
// Wraps a key - encrypts it with the master key.
public String wrapKey(byte[] keyBytes, String masterKeyIdentifier);
// Decrypts (unwraps) a key with the master key.
public byte[] unwrapKey(String wrappedKey, String masterKeyIdentifier);
// Use of initialization parameters is optional.
public void initialize(Configuration configuration, String kmsInstanceID,
String kmsInstanceURL, String accessToken);
}
オープンソースKMSのそのようなクラスの例は、parquet-mrリポジトリにあります。本番KMSクライアントは、組織のセキュリティ管理者と協力して設計し、アクセス制御管理の経験を持つ開発者が構築する必要があります。このようなクラスが作成されたら、parquet.encryption.kms.client.class
パラメーターを介してアプリケーションに渡すことができ、上記の暗号化されたデータフレームの書き込み/読み取りサンプルに示すように、一般的なSparkユーザーが活用できます。
注:デフォルトでは、Parquetは、SparkエグゼキューターとKMSサーバーとの相互作用を最小限に抑える「二重エンベロープ暗号化」モードを実装します。このモードでは、DEKは「キー暗号化キー」(KEK、Parquetによってランダムに生成)で暗号化されます。KEKはKMSでMEKで暗号化されます。結果とKEK自体はSparkエグゼキューターメモリにキャッシュされます。通常のエンベロープ暗号化に関心のあるユーザーは、parquet.encryption.double.wrapping
パラメーターをfalse
に設定して切り替えることができます。Parquet暗号化パラメーターの詳細については、parquet-hadoop構成のページを参照してください。
データソースオプション
Parquetのデータソースオプションは、次のように設定できます。
.option
/.options
メソッド。DataFrameReader
DataFrameWriter
DataStreamReader
DataStreamWriter
- CREATE TABLE USING DATA_SOURCEでの
OPTIONS
句
プロパティ名 | デフォルト | 意味 | スコープ |
---|---|---|---|
datetimeRebaseMode |
(spark.sql.parquet.datetimeRebaseModeInRead 構成の値) |
datetimeRebaseMode オプションを使用すると、ユリウス暦から先発グレゴリオ暦へのDATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 論理型の値のリベースモードを指定できます。現在サポートされているモードは次のとおりです。
|
読み取り |
int96RebaseMode |
(spark.sql.parquet.int96RebaseModeInRead 構成の値) |
int96RebaseMode オプションを使用すると、INT96タイムスタンプのユリウス暦から先発グレゴリオ暦へのリベースモードを指定できます。現在サポートされているモードは次のとおりです。
|
読み取り |
mergeSchema |
(spark.sql.parquet.mergeSchema 構成の値) |
すべてのParquetパートファイルから収集されたスキーマをマージするかどうかを設定します。これにより、spark.sql.parquet.mergeSchema が上書きされます。 |
読み取り |
compression |
snappy |
ファイルに保存する際に使用する圧縮コーデック。 大文字と小文字を区別しない既知の短縮名 (none, uncompressed, snappy, gzip, lzo, brotli, lz4, およびzstd) のいずれかを使用できます。これにより、spark.sql.parquet.compression.codec が上書きされます。 |
write |
その他の一般的なオプションについては、汎用ファイルソースオプションを参照してください。
構成
Parquetの設定は、SparkSession
のsetConf
メソッドを使用するか、SQLでSET key=value
コマンドを実行することで行うことができます。
プロパティ名 | デフォルト | 意味 | バージョン: |
---|---|---|---|
spark.sql.parquet.binaryAsString |
false | 一部のParquet生成システム、特にImpala、Hive、および古いバージョンのSpark SQLでは、Parquetスキーマを書き出すときにバイナリデータと文字列を区別しません。このフラグは、Spark SQLにバイナリデータを文字列として解釈させ、これらのシステムとの互換性を提供します。 | 1.1.1 |
spark.sql.parquet.int96AsTimestamp |
true | 一部のParquet生成システム、特にImpalaとHiveは、タイムスタンプをINT96に格納します。このフラグは、Spark SQLにINT96データをタイムスタンプとして解釈させ、これらのシステムとの互換性を提供します。 | 1.3.0 |
spark.sql.parquet.int96TimestampConversion |
false | これは、Impalaによって書き込まれたデータについて、タイムスタンプに変換するときに、INT96データにタイムスタンプの調整を適用する必要があるかどうかを制御します。Impalaは、HiveおよびSparkとは異なるタイムゾーンオフセットでINT96データを格納するため、これが必要です。 | 2.3.0 |
spark.sql.parquet.outputTimestampType |
INT96 | SparkがParquetファイルにデータを書き込むときに使用するParquetタイムスタンプタイプを設定します。INT96はParquetでは標準ではありませんが、一般的に使用されるタイムスタンプタイプです。TIMESTAMP_MICROSはParquetの標準タイムスタンプタイプであり、Unixエポックからのマイクロ秒数を格納します。TIMESTAMP_MILLISも標準ですが、ミリ秒精度であるため、Sparkはタイムスタンプ値のマイクロ秒部分を切り捨てる必要があります。 | 2.3.0 |
spark.sql.parquet.compression.codec |
snappy | Parquetファイルの書き込みに使用する圧縮コーデックを設定します。テーブル固有のオプション/プロパティでcompression またはparquet.compression のいずれかが指定されている場合、優先順位はcompression 、parquet.compression 、spark.sql.parquet.compression.codec になります。許容される値には、none、uncompressed、snappy、gzip、lzo、brotli、lz4、zstdが含まれます。brotli にはBrotliCodec がインストールされている必要があることに注意してください。 |
1.1.1 |
spark.sql.parquet.filterPushdown |
true | trueに設定すると、Parquetフィルタのプッシュダウン最適化が有効になります。 | 1.2.0 |
spark.sql.parquet.aggregatePushdown |
false | trueの場合、最適化のために集計がParquetにプッシュダウンされます。集計式としてMIN、MAX、COUNTをサポートします。MIN/MAXの場合、ブール値、整数、浮動小数点、日付型をサポートします。COUNTの場合、すべてのデータ型をサポートします。Parquetファイルフッターから統計情報が欠落している場合、例外がスローされます。 | 3.3.0 |
spark.sql.hive.convertMetastoreParquet |
true | falseに設定すると、Spark SQLは組み込みサポートではなく、parquetテーブルにHive SerDeを使用します。 | 1.1.1 |
spark.sql.parquet.mergeSchema |
false |
trueの場合、Parquetデータソースはすべてのデータファイルから収集されたスキーマをマージします。それ以外の場合、スキーマはサマリーファイルから選択されるか、サマリーファイルがない場合はランダムなデータファイルから選択されます。 |
1.5.0 |
spark.sql.parquet.respectSummaryFiles |
false | trueの場合、Parquetのすべてのパートファイルがサマリーファイルと一致すると仮定し、スキーマをマージするときにそれらを無視します。それ以外の場合、これがデフォルトであるfalseの場合、すべてのパートファイルをマージします。これはエキスパートのみのオプションと見なされるべきであり、それが正確に何を意味するのかを知る前に有効にすべきではありません。 | 1.5.0 |
spark.sql.parquet.writeLegacyFormat |
false | trueの場合、データはSpark 1.4以前の方法で書き込まれます。たとえば、10進数値は、Apache HiveやApache Impalaなどの他のシステムが使用する、Apache Parquetの固定長バイト配列形式で書き込まれます。falseの場合、Parquetの新しい形式が使用されます。たとえば、10進数はintベースの形式で書き込まれます。Parquet出力がこの新しい形式をサポートしていないシステムで使用されることを目的としている場合は、trueに設定します。 | 1.6.0 |
spark.sql.parquet.enableVectorizedReader |
true | ベクトル化されたParquetデコードを有効にします。 | 2.0.0 |
spark.sql.parquet.enableNestedColumnVectorizedReader |
true | ネストされた列 (例: struct、list、map) のベクトル化されたParquetデコードを有効にします。spark.sql.parquet.enableVectorizedReader を有効にする必要があります。 |
3.3.0 |
spark.sql.parquet.recordLevelFilter.enabled |
false | trueの場合、プッシュダウンされたフィルターを使用してParquetのネイティブレコードレベルフィルタリングを有効にします。この構成は、spark.sql.parquet.filterPushdown が有効で、ベクトル化されたリーダーが使用されていない場合にのみ有効です。spark.sql.parquet.enableVectorizedReader をfalseに設定することにより、ベクトル化されたリーダーが使用されていないことを確認できます。 |
2.3.0 |
spark.sql.parquet.columnarReaderBatchSize |
4096 | Parquetのベクトル化されたリーダーバッチに含める行数。データ読み込み時のオーバーヘッドを最小限に抑え、OOMを回避するために、数値を慎重に選択する必要があります。 | 2.4.0 |
spark.sql.parquet.fieldId.write.enabled |
true | フィールドIDはParquetスキーマ仕様のネイティブフィールドです。有効にすると、Parquetライターは、Sparkスキーマにある場合、フィールドIDメタデータをParquetスキーマに入力します。 | 3.3.0 |
spark.sql.parquet.fieldId.read.enabled |
false | フィールドIDはParquetスキーマ仕様のネイティブフィールドです。有効にすると、Parquetリーダーは、要求されたSparkスキーマにある場合、列名を使用する代わりに、フィールドIDを使用してParquetフィールドを検索します。 | 3.3.0 |
spark.sql.parquet.fieldId.read.ignoreMissing |
false | ParquetファイルにフィールドIDがなく、Spark読み込みスキーマがフィールドIDを使用して読み込んでいる場合、このフラグが有効になっている場合はnullを黙って返し、それ以外の場合はエラーを返します。 | 3.3.0 |
spark.sql.parquet.timestampNTZ.enabled |
true | Parquetの読み取りと書き込みでTIMESTAMP_NTZ のサポートを有効にします。有効にすると、TIMESTAMP_NTZ 値は、注釈isAdjustedToUTC = falseのParquetタイムスタンプ列として書き込まれ、同様の方法で推論されます。無効にすると、このような値はTIMESTAMP_LTZ として読み取られ、書き込みのためにTIMESTAMP_LTZ に変換する必要があります。 |
3.4.0 |
spark.sql.parquet.datetimeRebaseModeInRead | EXCEPTION |
ユリウス暦から先発グレゴリオ暦へのDATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 論理型の値のリベースモード
|
3.0.0 |
spark.sql.parquet.datetimeRebaseModeInWrite | EXCEPTION |
先発グレゴリオ暦からユリウス暦へのDATE 、TIMESTAMP_MILLIS 、TIMESTAMP_MICROS 論理型の値のリベースモード
|
3.0.0 |
spark.sql.parquet.int96RebaseModeInRead | EXCEPTION |
ユリウス暦から先発グレゴリオ暦へのINT96 タイムスタンプ型の値のリベースモード
|
3.1.0 |
spark.sql.parquet.int96RebaseModeInWrite | EXCEPTION |
先発グレゴリオ暦からユリウス暦へのINT96 タイムスタンプ型の値のリベースモード
|
3.1.0 |