汎用ロード/セーブ関数
最も単純な形式では、デフォルトのデータソース(spark.sql.sources.default で別途設定されていない限り parquet)がすべての操作に使用されます。
users_df = spark.read.load("examples/src/main/resources/users.parquet")
users_df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")オプションの手動指定
また、使用するデータソースと、データソースに渡したい追加オプションを、手動で指定することもできます。データソースは、完全修飾名(例: org.apache.spark.sql.parquet)で指定しますが、組み込みソースの場合は短い名前(json、parquet、jdbc、orc、libsvm、csv、text)も使用できます。どのデータソースタイプからロードされたDataFrameも、この構文を使用して他のタイプに変換できます。
組み込みソースの利用可能なオプションについては、APIドキュメントを参照してください。たとえば、org.apache.spark.sql.DataFrameReader および org.apache.spark.sql.DataFrameWriter です。そこに記載されているオプションは、Scala以外のSpark API(例: PySpark)でも適用可能であるはずです。その他のフォーマットについては、特定のフォーマットのAPIドキュメントを参照してください。
JSONファイルをロードするには、次のようにします。
people_df = spark.read.load("examples/src/main/resources/people.json", format="json")
people_df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")Dataset<Row> peopleDF =
spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")CSVファイルをロードするには、次のようにします。
people_df = spark.read.load(
"examples/src/main/resources/people.csv",
format="csv",
sep=";",
inferSchema="true",
header="true"
)val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")追加オプションは、書き込み操作中にも使用されます。たとえば、ORCデータソースのブルームフィルターと辞書エンコーディングを制御できます。以下のORCの例では、ブルームフィルターを作成し、favorite_color のみに辞書エンコーディングを使用します。Parquetには、parquet.bloom.filter.enabled と parquet.enable.dictionary もあります。ORC/Parquetの追加オプションの詳細については、公式のApache ORC / Parquet のウェブサイトを参照してください。
ORCデータソース
users_df = spark.read.orc("examples/src/main/resources/users.orc")
(users_df.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc"))usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")CREATE TABLE users_with_options (
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING ORC
OPTIONS (
orc.bloom.filter.columns 'favorite_color',
orc.dictionary.key.threshold '1.0',
orc.column.encoding.direct 'name'
)Parquetデータソース
users_df = spark.read.parquet("examples/src/main/resources/users.parquet")
(users_df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)CREATE TABLE users_with_options (
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
OPTIONS (
`parquet.bloom.filter.enabled#favorite_color` true,
`parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
parquet.enable.dictionary true,
parquet.page.write-checksum.enabled true
)ファイルを直接SQLで実行する
ファイルをDataFrameにロードしてクエリするのではなく、SQLでファイルを直接クエリすることもできます。
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")SELECT * FROM parquet.`examples/src/main/resources/users.parquet`保存モード
保存操作には、オプションで SaveMode を指定できます。これは、既存のデータがある場合にどのように処理するかを指定します。これらの保存モードはロックを使用せず、アトミックではないことに注意することが重要です。さらに、Overwrite を実行する場合、新しいデータを書き出す前にデータが削除されます。
| Scala/Java | どの言語でも | 意味 |
|---|---|---|
SaveMode.ErrorIfExists (デフォルト) |
"error" または "errorifexists" (デフォルト) |
DataFrameをデータソースに保存する際に、データが既に存在する場合、例外がスローされることが期待されます。 |
SaveMode.Append |
"append" |
DataFrameをデータソースに保存する際に、データ/テーブルが既に存在する場合、DataFrameのコンテンツが既存のデータに追加されることが期待されます。 |
SaveMode.Overwrite |
"overwrite" |
Overwriteモードとは、DataFrameをデータソースに保存する際に、データ/テーブルが既に存在する場合、既存のデータがDataFrameのコンテンツで上書きされることが期待されることを意味します。 |
SaveMode.Ignore |
"ignore" |
Ignoreモードとは、DataFrameをデータソースに保存する際に、データが既に存在する場合、保存操作はDataFrameのコンテンツを保存せず、既存のデータを変更しないことが期待されることを意味します。これは、SQLのCREATE TABLE IF NOT EXISTSに似ています。 |
永続テーブルへの保存
DataFrames は、saveAsTable コマンドを使用して、Hiveメタストアに永続テーブルとして保存することもできます。この機能を使用するために既存のHiveデプロイメントは必要ないことに注意してください。Sparkは、デフォルトのローカルHiveメタストア(Derbyを使用)を自動的に作成します。createOrReplaceTempView コマンドとは異なり、saveAsTable はDataFrameのコンテンツを具体化し、Hiveメタストアへのデータへのポインタを作成します。永続テーブルは、同じメタストアへの接続を維持している限り、Sparkプログラムが再起動した後も存在します。永続テーブルのDataFrameは、SparkSession の table メソッドをテーブル名で呼び出すことによって作成できます。
ファイルベースのデータソース(例: text、parquet、jsonなど)の場合、path オプションを使用してカスタムテーブルパスを指定できます。例: df.write.option("path", "/some/path").saveAsTable("t")。テーブルがドロップされた場合、カスタムテーブルパスは削除されず、テーブルデータはそのまま残ります。カスタムテーブルパスが指定されていない場合、Sparkはウェアハウスディレクトリの下のデフォルトテーブルパスにデータを書き込みます。テーブルがドロップされた場合、デフォルトのテーブルパスも削除されます。
Spark 2.1以降、永続データソーステーブルには、Hiveメタストアに保存されたパーティションごとのメタデータがあります。これにより、いくつかの利点があります。
- メタストアはクエリに必要なパーティションのみを返すことができるため、テーブルへの最初のクエリで全てのパーティションを検出する必要はなくなりました。
ALTER TABLE PARTITION ... SET LOCATIONのようなHive DDLが、Datasource APIで作成されたテーブルで利用可能になりました。
注意: パーティション情報は、外部データソーステーブル(path オプション付きのもの)を作成する際にデフォルトで収集されません。メタストアでパーティション情報を同期するには、MSCK REPAIR TABLE を呼び出すことができます。
バケット分割、ソート、パーティショニング
ファイルベースのデータソースの場合、出力をバケット分割およびソート、またはパーティショニングすることも可能です。バケット分割とソートは、永続テーブルにのみ適用されます。
people_df = spark.read.json("examples/src/main/resources/people.json")
people_df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");CREATE TABLE people_bucketed
USING json
CLUSTERED BY(name) INTO 42 BUCKETS
AS SELECT * FROM json.`examples/src/main/resources/people.json`;一方、パーティショニングは、Dataset APIを使用する場合、save および saveAsTable の両方で使用できます。
users_df = spark.read.load("examples/src/main/resources/users.parquet")
users_df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");CREATE TABLE users_by_favorite_color
USING parquet
PARTITIONED BY(favorite_color)
AS SELECT * FROM parquet.`examples/src/main/resources/users.parquet`;単一のテーブルでパーティショニングとバケット分割の両方を使用することが可能です。
users_df = spark.read.parquet("examples/src/main/resources/users.parquet")
(users_df.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed"))usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")usersDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed");CREATE TABLE users_partitioned_bucketed
USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS
AS SELECT * FROM parquet.`examples/src/main/resources/users.parquet`;partitionBy は、パーティション検出セクションで説明されているディレクトリ構造を作成します。そのため、カーディナリティの高い列には適用範囲が限定されます。対照的に bucketBy は、指定された数のバケットにデータを分散させ、一意の値の数が無制限の場合に使用できます。