汎用ファイルソースオプション
これらの汎用オプション/設定は、ファイルベースのソース (parquet, orc, avro, json, csv, text) を使用している場合にのみ有効です。
以下の例で使用されているディレクトリ階層に注意してください。
dir1/
├── dir2/
│ └── file2.parquet (schema: <file: string>, content: "file2.parquet")
└── file1.parquet (schema: <file, string>, content: "file1.parquet")
└── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")破損ファイルを無視する
Spark では、設定 spark.sql.files.ignoreCorruptFiles またはデータソースオプション ignoreCorruptFiles を使用して、ファイルからのデータ読み込み時に破損ファイルを無視できます。true に設定すると、Spark ジョブは破損ファイルに遭遇しても実行を継続し、読み込まれた内容は引き続き返されます。
データファイルを読み込む際に破損ファイルを無視するには、次を使用できます。
# enable ignore corrupt files via the data source option
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\
.parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df0.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
# enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
test_corrupt_df1.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+// enable ignore corrupt files via the data source option
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF0 = spark.read.option("ignoreCorruptFiles", "true").parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF0.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF1 = spark.read.parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/")
testCorruptDF1.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+// enable ignore corrupt files via the data source option
// dir1/file3.json is corrupt from parquet's view
Dataset<Row> testCorruptDF0 = spark.read().option("ignoreCorruptFiles", "true").parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF0.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
// enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
// dir1/file3.json is corrupt from parquet's view
Dataset<Row> testCorruptDF1 = spark.read().parquet(
"examples/src/main/resources/dir1/",
"examples/src/main/resources/dir1/dir2/");
testCorruptDF1.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+# enable ignore corrupt files via the data source option
# dir1/file3.json is corrupt from parquet's view
testCorruptDF0 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"), ignoreCorruptFiles = "true")
head(testCorruptDF0)
# file
# 1 file1.parquet
# 2 file2.parquet
# enable ignore corrupt files via the configuration
sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
testCorruptDF1 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
head(testCorruptDF1)
# file
# 1 file1.parquet
# 2 file2.parquet存在しないファイルを無視する
Spark では、設定 spark.sql.files.ignoreMissingFiles またはデータソースオプション ignoreMissingFiles を使用して、ファイルからのデータ読み込み時に存在しないファイルを無視できます。ここで、存在しないファイルとは、DataFrame を構築した後にディレクトリの下で削除されたファイルを指します。true に設定すると、Spark ジョブは存在しないファイルに遭遇しても実行を継続し、読み込まれた内容は引き続き返されます。
パスグロブフィルター
pathGlobFilter は、ファイル名がパターンに一致するファイルのみを含めるために使用されます。構文は org.apache.hadoop.fs.GlobFilter に従います。パーティション検出の動作は変更しません。
パーティション検出の動作を維持しながら、指定されたグロブパターンに一致するパスを持つファイルを読み込むには、次を使用できます。
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", pathGlobFilter="*.parquet")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+val testGlobFilterDF = spark.read.format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+Dataset<Row> testGlobFilterDF = spark.read().format("parquet")
.option("pathGlobFilter", "*.parquet") // json file should be filtered out
.load("examples/src/main/resources/dir1");
testGlobFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*.parquet")
# file
# 1 file1.parquet再帰的なファイル検索
recursiveFileLookup は、ファイルを再帰的に読み込むために使用され、パーティション推論を無効にします。デフォルト値は false です。recursiveFileLookup が true の場合にデータソースが明示的に partitionSpec を指定すると、例外がスローされます。
すべてのファイルを再帰的に読み込むには、次を使用できます。
recursive_loaded_df = spark.read.format("parquet")\
.option("recursiveFileLookup", "true")\
.load("examples/src/main/resources/dir1")
recursive_loaded_df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+val recursiveLoadedDF = spark.read.format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+Dataset<Row> recursiveLoadedDF = spark.read().format("parquet")
.option("recursiveFileLookup", "true")
.load("examples/src/main/resources/dir1");
recursiveLoadedDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+recursiveLoadedDF <- read.df("examples/src/main/resources/dir1", "parquet", recursiveFileLookup = "true")
head(recursiveLoadedDF)
# file
# 1 file1.parquet
# 2 file2.parquet更新時刻によるパスフィルター
modifiedBefore および modifiedAfter は、Spark バッチクエリ中に読み込むファイルをより細かく制御するために、単独または組み合わせて適用できるオプションです。(Structured Streaming のファイルソースはこれらのオプションをサポートしていません。)
modifiedBefore: 指定された時刻よりも前に更新時刻が発生したファイルのみを含めるためのオプションのタイムスタンプ。提供されるタイムスタンプは、YYYY-MM-DDTHH:mm:ss の形式である必要があります (例: 2020-06-01T13:00:00)。modifiedAfter: 指定された時刻よりも後に更新時刻が発生したファイルのみを含めるためのオプションのタイムスタンプ。提供されるタイムスタンプは、YYYY-MM-DDTHH:mm:ss の形式である必要があります (例: 2020-06-01T13:00:00)。
タイムゾーンオプションが指定されていない場合、タイムスタンプは Spark セッションのタイムゾーン (spark.sql.session.timeZone) に従って解釈されます。
指定された更新時刻の範囲に一致するパスを持つファイルを読み込むには、次を使用できます。
# Only load files modified before 07/1/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# |file1.parquet|
# +-------------+
# Only load files modified after 06/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
# +-------------+
# | file|
# +-------------+
# +-------------+val beforeFilterDF = spark.read.format("parquet")
// Files modified before 07/01/2020 at 05:30 are allowed
.option("modifiedBefore", "2020-07-01T05:30:00")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
// Files modified after 06/01/2020 at 05:30 are allowed
.option("modifiedAfter", "2020-06-01T05:30:00")
.load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// | file|
// +-------------+
// +-------------+Dataset<Row> beforeFilterDF = spark.read().format("parquet")
// Only load files modified before 7/1/2020 at 05:30
.option("modifiedBefore", "2020-07-01T05:30:00")
// Only load files modified after 6/1/2020 at 05:30
.option("modifiedAfter", "2020-06-01T05:30:00")
// Interpret both times above relative to CST timezone
.option("timeZone", "CST")
.load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// | file|
// +-------------+
// |file1.parquet|
// +-------------+beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore= "2020-07-01T05:30:00")
# file
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
# file