汎用ファイルソースオプション

これらの汎用オプション/設定は、ファイルベースのソース (parquet、orc、avro、json、csv、text) を使用する場合にのみ有効です。

以下の例で使用されているディレクトリの階層は、次のとおりです。

dir1/
 ├── dir2/
 │    └── file2.parquet (schema: <file: string>, content: "file2.parquet")
 └── file1.parquet (schema: <file, string>, content: "file1.parquet")
 └── file3.json (schema: <file, string>, content: "{'file':'corrupt.json'}")

破損ファイルの無視

Spark では、設定 `spark.sql.files.ignoreCorruptFiles` またはデータソースオプション `ignoreCorruptFiles` を使用して、ファイルからデータを読み取る際に破損ファイルを無視できます。 true に設定すると、Spark ジョブは破損ファイルに遭遇しても実行を継続し、読み取られた内容は引き続き返されます。

データファイルの読み取り中に破損ファイルを無視するには、次のようにします。

# enable ignore corrupt files via the data source option
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df0 = spark.read.option("ignoreCorruptFiles", "true")\
    .parquet("examples/src/main/resources/dir1/",
             "examples/src/main/resources/dir1/dir2/")
test_corrupt_df0.show()
# +-------------+
# |         file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+

# enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
test_corrupt_df1 = spark.read.parquet("examples/src/main/resources/dir1/",
                                      "examples/src/main/resources/dir1/dir2/")
test_corrupt_df1.show()
# +-------------+
# |         file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/python/sql/datasource.py" にあります。
// enable ignore corrupt files via the data source option
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF0 = spark.read.option("ignoreCorruptFiles", "true").parquet(
  "examples/src/main/resources/dir1/",
  "examples/src/main/resources/dir1/dir2/")
testCorruptDF0.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

// enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true")
// dir1/file3.json is corrupt from parquet's view
val testCorruptDF1 = spark.read.parquet(
  "examples/src/main/resources/dir1/",
  "examples/src/main/resources/dir1/dir2/")
testCorruptDF1.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
// enable ignore corrupt files via the data source option
// dir1/file3.json is corrupt from parquet's view
Dataset<Row> testCorruptDF0 = spark.read().option("ignoreCorruptFiles", "true").parquet(
    "examples/src/main/resources/dir1/",
    "examples/src/main/resources/dir1/dir2/");
testCorruptDF0.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+

// enable ignore corrupt files via the configuration
spark.sql("set spark.sql.files.ignoreCorruptFiles=true");
// dir1/file3.json is corrupt from parquet's view
Dataset<Row> testCorruptDF1 = spark.read().parquet(
        "examples/src/main/resources/dir1/",
        "examples/src/main/resources/dir1/dir2/");
testCorruptDF1.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
# enable ignore corrupt files via the data source option
# dir1/file3.json is corrupt from parquet's view
testCorruptDF0 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"), ignoreCorruptFiles = "true")
head(testCorruptDF0)
#            file
# 1 file1.parquet
# 2 file2.parquet

# enable ignore corrupt files via the configuration
sql("set spark.sql.files.ignoreCorruptFiles=true")
# dir1/file3.json is corrupt from parquet's view
testCorruptDF1 <- read.parquet(c("examples/src/main/resources/dir1/", "examples/src/main/resources/dir1/dir2/"))
head(testCorruptDF1)
#            file
# 1 file1.parquet
# 2 file2.parquet
完全なコード例は、Spark リポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。

欠落ファイルの無視

Spark では、設定 `spark.sql.files.ignoreMissingFiles` またはデータソースオプション `ignoreMissingFiles` を使用して、ファイルからデータを読み取る際に欠落ファイルを無視できます。ここで、欠落ファイルとは、`DataFrame` を構築した後にディレクトリから削除されたファイルを意味します。 true に設定すると、Spark ジョブは欠落ファイルに遭遇しても実行を継続し、読み取られた内容は引き続き返されます。

パスグロブフィルタ

`pathGlobFilter` は、パターンに一致するファイル名のファイルのみを含めるために使用されます。構文は `org.apache.hadoop.fs.GlobFilter` に従います。パーティション検出の動作は変更しません。

パーティション検出の動作を維持しながら、指定されたグロブパターンに一致するパスのファイルを読み込むには、次のようにします。

df = spark.read.load("examples/src/main/resources/dir1",
                     format="parquet", pathGlobFilter="*.parquet")
df.show()
# +-------------+
# |         file|
# +-------------+
# |file1.parquet|
# +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/python/sql/datasource.py" にあります。
val testGlobFilterDF = spark.read.format("parquet")
  .option("pathGlobFilter", "*.parquet") // json file should be filtered out
  .load("examples/src/main/resources/dir1")
testGlobFilterDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
Dataset<Row> testGlobFilterDF = spark.read().format("parquet")
        .option("pathGlobFilter", "*.parquet") // json file should be filtered out
        .load("examples/src/main/resources/dir1");
testGlobFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
df <- read.df("examples/src/main/resources/dir1", "parquet", pathGlobFilter = "*.parquet")
#            file
# 1 file1.parquet
完全なコード例は、Spark リポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。

再帰的なファイル検索

`recursiveFileLookup` は、ファイルを再帰的にロードするために使用され、パーティションの推論を無効にします。デフォルト値は `false` です。 `recursiveFileLookup` が true の場合にデータソースが明示的に `partitionSpec` を指定すると、例外がスローされます。

すべてのファイルを再帰的にロードするには、次のようにします。

recursive_loaded_df = spark.read.format("parquet")\
    .option("recursiveFileLookup", "true")\
    .load("examples/src/main/resources/dir1")
recursive_loaded_df.show()
# +-------------+
# |         file|
# +-------------+
# |file1.parquet|
# |file2.parquet|
# +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/python/sql/datasource.py" にあります。
val recursiveLoadedDF = spark.read.format("parquet")
  .option("recursiveFileLookup", "true")
  .load("examples/src/main/resources/dir1")
recursiveLoadedDF.show()
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
Dataset<Row> recursiveLoadedDF = spark.read().format("parquet")
        .option("recursiveFileLookup", "true")
        .load("examples/src/main/resources/dir1");
recursiveLoadedDF.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// |file2.parquet|
// +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
recursiveLoadedDF <- read.df("examples/src/main/resources/dir1", "parquet", recursiveFileLookup = "true")
head(recursiveLoadedDF)
#            file
# 1 file1.parquet
# 2 file2.parquet
完全なコード例は、Spark リポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。

更新日時パスフィルタ

`modifiedBefore` と `modifiedAfter` は、Spark バッチクエリ中にロードされるファイルをより細かく制御するために、一緒に、または個別に適用できるオプションです。(ストラクチャードストリーミングファイルソースはこれらのオプションをサポートしていないことに注意してください。)

タイムゾーンオプションが指定されていない場合、タイムスタンプは Spark セッションのタイムゾーン(`spark.sql.session.timeZone`)に従って解釈されます。

指定された更新日時範囲に一致するパスのファイルを読み込むには、次のようにします。

# Only load files modified before 07/1/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
                     format="parquet", modifiedBefore="2050-07-01T08:30:00")
df.show()
# +-------------+
# |         file|
# +-------------+
# |file1.parquet|
# +-------------+
# Only load files modified after 06/01/2050 @ 08:30:00
df = spark.read.load("examples/src/main/resources/dir1",
                     format="parquet", modifiedAfter="2050-06-01T08:30:00")
df.show()
# +-------------+
# |         file|
# +-------------+
# +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/python/sql/datasource.py" にあります。
val beforeFilterDF = spark.read.format("parquet")
  // Files modified before 07/01/2020 at 05:30 are allowed
  .option("modifiedBefore", "2020-07-01T05:30:00")
  .load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
val afterFilterDF = spark.read.format("parquet")
   // Files modified after 06/01/2020 at 05:30 are allowed
  .option("modifiedAfter", "2020-06-01T05:30:00")
  .load("examples/src/main/resources/dir1");
afterFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
Dataset<Row> beforeFilterDF = spark.read().format("parquet")
        // Only load files modified before 7/1/2020 at 05:30
        .option("modifiedBefore", "2020-07-01T05:30:00")
        // Only load files modified after 6/1/2020 at 05:30
        .option("modifiedAfter", "2020-06-01T05:30:00")
        // Interpret both times above relative to CST timezone
        .option("timeZone", "CST")
        .load("examples/src/main/resources/dir1");
beforeFilterDF.show();
// +-------------+
// |         file|
// +-------------+
// |file1.parquet|
// +-------------+
完全なコード例は、Spark リポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
beforeDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedBefore= "2020-07-01T05:30:00")
#            file
# 1 file1.parquet
afterDF <- read.df("examples/src/main/resources/dir1", "parquet", modifiedAfter = "2020-06-01T05:30:00")
#            file
完全なコード例は、Spark リポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。