汎用ロード/セーブ関数
最も単純な形式では、デフォルトのデータソース(spark.sql.sources.default
で設定されていない限りparquet
)がすべての操作に使用されます。
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
df <- read.df("examples/src/main/resources/users.parquet")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
オプションの手動指定
データソースに渡したい追加オプションと共に、使用するデータソースを手動で指定することもできます。データソースは完全修飾名(例:org.apache.spark.sql.parquet
)で指定しますが、組み込みソースの場合は短い名前(json
、parquet
、jdbc
、orc
、libsvm
、csv
、text
)も使用できます。任意のデータソースタイプからロードされたDataFrameは、この構文を使用して他のタイプに変換できます。
組み込みソースで使用可能なオプションについては、APIドキュメント(例:org.apache.spark.sql.DataFrameReader
およびorg.apache.spark.sql.DataFrameWriter
)を参照してください。そこに記載されているオプションは、Scala以外のSpark API(例:PySpark)でも適用可能です。その他の形式については、特定の形式のAPIドキュメントを参照してください。
JSONファイルをロードするには、以下を使用できます。
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
Dataset<Row> peopleDF =
spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
df <- read.df("examples/src/main/resources/people.json", "json")
namesAndAges <- select(df, "name", "age")
write.df(namesAndAges, "namesAndAges.parquet", "parquet")
CSVファイルをロードするには、以下を使用できます。
df = spark.read.load("examples/src/main/resources/people.csv",
format="csv", sep=";", inferSchema="true", header="true")
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")
Dataset<Row> peopleDFCsv = spark.read().format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv");
df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferSchema = TRUE, header = TRUE)
namesAndAges <- select(df, "name", "age")
追加オプションは書き込み操作時にも使用されます。たとえば、ORCデータソースのブルームフィルターと辞書エンコーディングを制御できます。次のORCの例では、ブルームフィルターを作成し、favorite_color
に対してのみ辞書エンコーディングを使用します。Parquetの場合も、parquet.bloom.filter.enabled
とparquet.enable.dictionary
が存在します。ORC/Parquetの追加オプションの詳細については、Apacheの公式ORC / Parquet ウェブサイトをご覧ください。
ORCデータソース
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc"))
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc")
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
.option("orc.dictionary.key.threshold", "1.0")
.option("orc.column.encoding.direct", "name")
.save("users_with_options.orc");
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
CREATE TABLE users_with_options (
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING ORC
OPTIONS (
orc.bloom.filter.columns 'favorite_color',
orc.dictionary.key.threshold '1.0',
orc.column.encoding.direct 'name'
)
Parquetデータソース
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet"))
usersDF.write.format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet")
usersDF.write().format("parquet")
.option("parquet.bloom.filter.enabled#favorite_color", "true")
.option("parquet.bloom.filter.expected.ndv#favorite_color", "1000000")
.option("parquet.enable.dictionary", "true")
.option("parquet.page.write-checksum.enabled", "false")
.save("users_with_options.parquet");
df <- read.df("examples/src/main/resources/users.parquet", "parquet")
write.parquet(df, "users_with_options.parquet", parquet.bloom.filter.enabled#favorite_color = true, parquet.bloom.filter.expected.ndv#favorite_color = 1000000, parquet.enable.dictionary = true, parquet.page.write-checksum.enabled = false)
CREATE TABLE users_with_options (
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
OPTIONS (
`parquet.bloom.filter.enabled#favorite_color` true,
`parquet.bloom.filter.expected.ndv#favorite_color` 1000000,
parquet.enable.dictionary true,
parquet.page.write-checksum.enabled true
)
ファイルに対して直接SQLを実行する
read APIを使用してファイルをDataFrameにロードしてクエリを実行する代わりに、SQLを使用してそのファイルに直接クエリを実行することもできます。
df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
Dataset<Row> sqlDF =
spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
セーブモード
保存操作では、オプションでSaveMode
を指定できます。これは、既存のデータを処理する方法を指定します。これらの保存モードはロックを使用せず、アトミックではないことに注意することが重要です。さらに、Overwrite
を実行すると、新しいデータを書き込む前にデータが削除されます。
Scala/Java | 任意の言語 | 意味 |
---|---|---|
SaveMode.ErrorIfExists (デフォルト) |
"error" または "errorifexists" (デフォルト) |
DataFrameをデータソースに保存するときに、データが既に存在する場合、例外がスローされます。 |
SaveMode.Append |
"append" |
DataFrameをデータソースに保存するときに、データ/テーブルが既に存在する場合、DataFrameの内容は既存のデータに追加されます。 |
SaveMode.Overwrite |
"overwrite" |
上書きモードとは、DataFrameをデータソースに保存するときに、データ/テーブルが既に存在する場合、既存のデータがDataFrameの内容によって上書きされることを意味します。 |
SaveMode.Ignore |
"ignore" |
無視モードとは、DataFrameをデータソースに保存するときに、データが既に存在する場合、保存操作はDataFrameの内容を保存せず、既存のデータも変更しないことを意味します。これは、SQLのCREATE TABLE IF NOT EXISTS に似ています。 |
永続テーブルへの保存
DataFrame
は、saveAsTable
コマンドを使用して、永続テーブルとしてHiveメタストアに保存することもできます。この機能を使用するために既存のHiveデプロイメントは必要ありません。Sparkは、デフォルトのローカルHiveメタストア(Derbyを使用)を自動的に作成します。createOrReplaceTempView
コマンドとは異なり、saveAsTable
はDataFrameの内容を具体化し、Hiveメタストア内のデータへのポインタを作成します。同じメタストアへの接続を維持している限り、永続テーブルはSparkプログラムの再起動後も存在します。永続テーブルのDataFrameは、テーブルの名前を指定してSparkSession
でtable
メソッドを呼び出すことによって作成できます。
ファイルベースのデータソース(例:text、parquet、jsonなど)の場合、path
オプションを介してカスタムテーブルパスを指定できます(例:df.write.option("path", "/some/path").saveAsTable("t")
)。テーブルが削除されても、カスタムテーブルパスは削除されず、テーブルデータは残ります。カスタムテーブルパスが指定されていない場合、Sparkはウェアハウスディレクトリ下のデフォルトのテーブルパスにデータを書き込みます。テーブルが削除されると、デフォルトのテーブルパスも削除されます。
Spark 2.1以降、永続データソーステーブルには、Hiveメタストアに格納されたパーティションごとのメタデータがあります。これには、いくつかの利点があります。
- メタストアはクエリに必要なパーティションのみを返すことができるため、テーブルへの最初のクエリですべてのパーティションを検出する必要がなくなりました。
ALTER TABLE PARTITION ... SET LOCATION
などのHive DDLは、Datasource APIで作成されたテーブルで使用できるようになりました。
外部データソーステーブル(path
オプションを持つテーブル)を作成する場合、パーティション情報はデフォルトでは収集されないことに注意してください。メタストア内のパーティション情報を同期するには、MSCK REPAIR TABLE
を呼び出します。
バケット化、ソート、およびパーティション化
ファイルベースのデータソースの場合、出力のバケット化とソート、またはパーティション化も可能です。バケット化とソートは永続テーブルにのみ適用されます
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")
peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");
CREATE TABLE users_bucketed_by_name(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
CLUSTERED BY(name) INTO 42 BUCKETS;
Dataset APIを使用する場合、パーティション化はsave
とsaveAsTable
の両方で使用できます。
df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")
usersDF
.write()
.partitionBy("favorite_color")
.format("parquet")
.save("namesPartByColor.parquet");
CREATE TABLE users_by_favorite_color(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING csv PARTITIONED BY(favorite_color);
1つのテーブルにパーティション化とバケット化の両方を使用することが可能です
df = spark.read.parquet("examples/src/main/resources/users.parquet")
(df
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed"))
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")
usersDF
.write()
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed");
CREATE TABLE users_bucketed_and_partitioned(
name STRING,
favorite_color STRING,
favorite_numbers array<integer>
) USING parquet
PARTITIONED BY (favorite_color)
CLUSTERED BY(name) SORTED BY (favorite_numbers) INTO 42 BUCKETS;
partitionBy
は、パーティションの検出セクションで説明されているように、ディレクトリ構造を作成します。そのため、カーディナリティの高い列への適用性は限られています。対照的に、bucketBy
はデータを固定数のバケットに分散し、一意の値の数が無制限の場合に使用できます。