汎用ロード/セーブ関数

最も単純な形式では、デフォルトのデータソース(spark.sql.sources.defaultで設定されていない限りparquet)がすべての操作に使用されます。

df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。

オプションの手動指定

データソースに渡したい追加オプションと共に、使用するデータソースを手動で指定することもできます。データソースは完全修飾名(例:org.apache.spark.sql.parquet)で指定しますが、組み込みソースの場合は短い名前(jsonparquetjdbcorclibsvmcsvtext)も使用できます。任意のデータソースタイプからロードされたDataFrameは、この構文を使用して他のタイプに変換できます。

組み込みソースで使用可能なオプションについては、APIドキュメント(例:org.apache.spark.sql.DataFrameReaderおよびorg.apache.spark.sql.DataFrameWriter)を参照してください。そこに記載されているオプションは、Scala以外のSpark API(例:PySpark)でも適用可能です。その他の形式については、特定の形式のAPIドキュメントを参照してください。

JSONファイルをロードするには、以下を使用できます。

df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。

CSVファイルをロードするには、以下を使用できます。

df = spark.read.load("examples/src/main/resources/people.csv",
                     format="csv", sep=";", inferSchema="true", header="true")
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
val peopleDFCsv = spark.read.format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
Dataset<Row> peopleDFCsv = spark.read().format("csv")
  .option("sep", ";")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("examples/src/main/resources/people.csv");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
完全なコード例は、Sparkリポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。

追加オプションは書き込み操作時にも使用されます。たとえば、ORCデータソースのブルームフィルターと辞書エンコーディングを制御できます。次のORCの例では、ブルームフィルターを作成し、favorite_colorに対してのみ辞書エンコーディングを使用します。Parquetの場合も、parquet.bloom.filter.enabledparquet.enable.dictionaryが存在します。ORC/Parquetの追加オプションの詳細については、Apacheの公式ORC / Parquet ウェブサイトをご覧ください。

ORCデータソース

df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .option("orc.column.encoding.direct", "name")
    .save("users_with_options.orc"))
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
usersDF.write.format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
usersDF.write().format("orc")
  .option("orc.bloom.filter.columns", "favorite_color")
  .option("orc.dictionary.key.threshold", "1.0")
  .option("orc.column.encoding.direct", "name")
  .save("users_with_options.orc");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
完全なコード例は、Sparkリポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING ORC
OPTIONS (
  orc.bloom.filter.columns 'favorite_color',
  orc.dictionary.key.threshold '1.0',
  orc.column.encoding.direct 'name'
)

Parquetデータソース

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet"))
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
usersDF.write.format("parquet")
  .option("parquet.bloom.filter.enabled#favorite_color", "true")
  .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
  .option("parquet.enable.dictionary", "true")
  .option("parquet.page.write-checksum.enabled", "false")
  .save("users_with_options.parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
usersDF.write().format("parquet")
    .option("parquet.bloom.filter.enabled#favorite_color", "true")
    .option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
    .option("parquet.enable.dictionary", "true")
    .option("parquet.page.write-checksum.enabled", "false")
    .save("users_with_options.parquet");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
完全なコード例は、Sparkリポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。
CREATE TABLE users_with_options (
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
OPTIONS (
  `parquet.bloom.filter.enabled#favorite_color` true,
  `parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
  parquet.enable.dictionary true,
  parquet.page.write-checksum.enabled true
)

ファイルに対して直接SQLを実行する

read APIを使用してファイルをDataFrameにロードしてクエリを実行する代わりに、SQLを使用してそのファイルに直接クエリを実行することもできます。

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
完全なコード例は、Sparkリポジトリの "examples/src/main/r/RSparkSQLExample.R" にあります。

セーブモード

保存操作では、オプションでSaveModeを指定できます。これは、既存のデータを処理する方法を指定します。これらの保存モードはロックを使用せず、アトミックではないことに注意することが重要です。さらに、Overwriteを実行すると、新しいデータを書き込む前にデータが削除されます。

Scala/Java任意の言語意味
SaveMode.ErrorIfExists (デフォルト) "error" または "errorifexists" (デフォルト) DataFrameをデータソースに保存するときに、データが既に存在する場合、例外がスローされます。
SaveMode.Append "append" DataFrameをデータソースに保存するときに、データ/テーブルが既に存在する場合、DataFrameの内容は既存のデータに追加されます。
SaveMode.Overwrite "overwrite" 上書きモードとは、DataFrameをデータソースに保存するときに、データ/テーブルが既に存在する場合、既存のデータがDataFrameの内容によって上書きされることを意味します。
SaveMode.Ignore "ignore" 無視モードとは、DataFrameをデータソースに保存するときに、データが既に存在する場合、保存操作はDataFrameの内容を保存せず、既存のデータも変更しないことを意味します。これは、SQLのCREATE TABLE IF NOT EXISTSに似ています。

永続テーブルへの保存

DataFrameは、saveAsTableコマンドを使用して、永続テーブルとしてHiveメタストアに保存することもできます。この機能を使用するために既存のHiveデプロイメントは必要ありません。Sparkは、デフォルトのローカルHiveメタストア(Derbyを使用)を自動的に作成します。createOrReplaceTempViewコマンドとは異なり、saveAsTableはDataFrameの内容を具体化し、Hiveメタストア内のデータへのポインタを作成します。同じメタストアへの接続を維持している限り、永続テーブルはSparkプログラムの再起動後も存在します。永続テーブルのDataFrameは、テーブルの名前を指定してSparkSessiontableメソッドを呼び出すことによって作成できます。

ファイルベースのデータソース(例:text、parquet、jsonなど)の場合、pathオプションを介してカスタムテーブルパスを指定できます(例:df.write.option("path", "/some/path").saveAsTable("t"))。テーブルが削除されても、カスタムテーブルパスは削除されず、テーブルデータは残ります。カスタムテーブルパスが指定されていない場合、Sparkはウェアハウスディレクトリ下のデフォルトのテーブルパスにデータを書き込みます。テーブルが削除されると、デフォルトのテーブルパスも削除されます。

Spark 2.1以降、永続データソーステーブルには、Hiveメタストアに格納されたパーティションごとのメタデータがあります。これには、いくつかの利点があります。

外部データソーステーブル(pathオプションを持つテーブル)を作成する場合、パーティション情報はデフォルトでは収集されないことに注意してください。メタストア内のパーティション情報を同期するには、MSCK REPAIR TABLEを呼び出します。

バケット化、ソート、およびパーティション化

ファイルベースのデータソースの場合、出力のバケット化とソート、またはパーティション化も可能です。バケット化とソートは永続テーブルにのみ適用されます

df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
CREATE TABLE users_bucketed_by_name(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;

Dataset APIを使用する場合、パーティション化はsavesaveAsTableの両方で使用できます。

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
usersDF
  .write()
  .partitionBy("favorite_color")
  .format("parquet")
  .save("namesPartByColor.parquet");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
CREATE TABLE users_by_favorite_color(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);

1つのテーブルにパーティション化とバケット化の両方を使用することが可能です

df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
    .write
    .partitionBy("favorite_color")
    .bucketBy(42, "name")
    .saveAsTable("users_partitioned_bucketed"))
完全なコード例は、Sparkリポジトリの "examples/src/main/python/sql/datasource.py" にあります。
usersDF
  .write
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed")
完全なコード例は、Sparkリポジトリの "examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala" にあります。
usersDF
  .write()
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("users_partitioned_bucketed");
完全なコード例は、Sparkリポジトリの "examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java" にあります。
CREATE TABLE users_bucketed_and_partitioned(
  name STRING,
  favorite_color STRING,
  favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;

partitionByは、パーティションの検出セクションで説明されているように、ディレクトリ構造を作成します。そのため、カーディナリティの高い列への適用性は限られています。対照的に、bucketByはデータを固定数のバケットに分散し、一意の値の数が無制限の場合に使用できます。