Hive テーブル
Spark SQL は、Apache Hive に保存されたデータの読み書きもサポートしています。ただし、Hive には多数の依存関係があるため、これらの依存関係はデフォルトの Spark ディストリビューションには含まれていません。Hive の依存関係がクラスパス上に見つかる場合、Spark はそれらを自動的にロードします。Hive に保存されたデータにアクセスするために、Hive のシリアライゼーションおよびデシリアライゼーションライブラリ(SerDes)にアクセスする必要があるため、これらの Hive 依存関係はすべてのワーカーノードにも存在する必要があることに注意してください。
Hive の設定は、hive-site.xml、core-site.xml(セキュリティ設定用)、および hdfs-site.xml(HDFS 設定用)ファイルを conf/ ディレクトリに配置することで行われます。
Hive を使用する場合、Hive metastore への永続的な接続、Hive serdes へのサポート、および Hive ユーザー定義関数を含む、Hive サポートを有効にして SparkSession をインスタンス化する必要があります。既存の Hive デプロイメントがないユーザーでも Hive サポートを有効にすることができます。hive-site.xml で設定されていない場合、コンテキストは自動的に現在のディレクトリに metastore_db を作成し、spark.sql.warehouse.dir で設定されたディレクトリ(Spark アプリケーションが開始された現在のディレクトリにある spark-warehouse ディレクトリがデフォルト)を作成します。Spark 2.0.0 以降、hive-site.xml の hive.metastore.warehouse.dir プロパティは非推奨になりました。代わりに、データベースのデフォルトの格納場所を指定するには spark.sql.warehouse.dir を使用してください。Spark アプリケーションを開始したユーザーに書き込み権限を付与する必要がある場合があります。
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... Order may vary, as spark processes the partitions in parallel.
// Turn on flag for Hive Dynamic Partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...Hive を使用する場合、Hive サポートを有効にして SparkSession をインスタンス化する必要があります。これにより、MetaStore でテーブルを検索し、HiveQL を使用してクエリを書き込むサポートが追加されます。
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))Hive テーブルのストレージフォーマットの指定
Hive テーブルを作成する際には、このテーブルがファイルシステムからどのようにデータを読み書きするか、つまり「入力フォーマット」と「出力フォーマット」を定義する必要があります。また、このテーブルがデータをどのように行にデシリアライズするか、または行をデータにシリアライズするか、つまり「serde」を定義する必要があります。以下のオプションを使用して、ストレージフォーマット(「serde」、「入力フォーマット」、「出力フォーマット」)を指定できます。例:CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。デフォルトでは、テーブルファイルはプレーンテキストとして読み取られます。テーブル作成時に Hive ストレージハンドラはまだサポートされていないことに注意してください。Hive 側でストレージハンドラを使用してテーブルを作成し、Spark SQL でそれを読み取ることができます。
| プロパティ名 | 意味 |
|---|---|
fileFormat |
fileFormat は、"serde"、"input format"、"output format" を含むストレージフォーマット仕様のパッケージのようなものです。現在、'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile'、'avro' の 6 つの fileFormat をサポートしています。 |
inputFormat, outputFormat |
これらの 2 つのオプションは、対応する InputFormat および OutputFormat クラスの名前を文字列リテラルとして指定します。例:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。これら 2 つのオプションはペアで出現する必要があり、fileFormat オプションが既に指定されている場合は指定できません。 |
serde |
このオプションは、serde クラスの名前を指定します。fileFormat オプションが指定されている場合、指定された fileFormat が既に serde の情報を含んでいる場合は、このオプションを指定しないでください。現在、"sequencefile"、"textfile"、"rcfile" は serde 情報を含んでいないため、これら 3 つの fileFormat でこのオプションを使用できます。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim |
これらのオプションは "textfile" fileFormat のみで使用できます。これらは、区切りファイルを行に読み取る方法を定義します。 |
OPTIONS で定義されたその他のすべてのプロパティは、Hive serde プロパティとして扱われます。
Hive メタストアの異なるバージョンとの連携
Spark SQL の Hive サポートの最も重要な要素の 1 つは、Hive メタストアとの連携であり、これにより Spark SQL は Hive テーブルのメタデータにアクセスできます。Spark 1.4.0 以降、単一の Spark SQL バイナリビルドを使用して、以下に説明する設定により、さまざまなバージョンの Hive メタストアにクエリを実行できます。メタストアと通信するために使用される Hive のバージョンに関係なく、Spark SQL は内部的に組み込み Hive に対してコンパイルされ、内部実行(serdes、UDF、UDAF など)にそれらのクラスを使用することに注意してください。
メタデータ取得に使用される Hive のバージョンを設定するために、以下のオプションを使用できます。
| プロパティ名 | デフォルト | 意味 | バージョン以降 |
|---|---|---|---|
spark.sql.hive.metastore.version |
2.3.10 |
Hive メタストアのバージョン。利用可能なオプションは、2.0.0 から 2.3.10、3.0.0 から 3.1.3、および 4.0.0 から 4.0.1 です。 |
1.4.0 |
spark.sql.hive.metastore.jars |
builtin |
HiveMetastoreClient をインスタンス化するために使用される jars の場所。このプロパティは、4 つのオプションのいずれかになります。
-Phive が有効になっている場合に Spark アセンブリにバンドルされています。このオプションが選択された場合、spark.sql.hive.metastore.version は 2.3.10 であるか、定義されていない必要があります。spark.sql.hive.metastore.jars.path でコンマ区切りの形式で設定された Hive jars を使用します。ローカルパスとリモートパスの両方をサポートします。提供された jars は spark.sql.hive.metastore.version と同じバージョンである必要があります。 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
(空) |
HiveMetastoreClient をインスタンス化するために使用される jars のコンマ区切りパス。この設定は、spark.sql.hive.metastore.jars が path に設定されている場合にのみ役立ちます。パスは以下のいずれかの形式になります。
|
3.1.0 |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
Spark SQL と特定のバージョンの Hive との間で共有されるクラスローダーを使用してロードされるべきクラスプレフィックスのコンマ区切りリスト。共有されるべきクラスの例としては、メタストアと通信するために必要な JDBC ドライバーがあります。共有される必要があるその他のクラスは、既に共有されているクラスとやり取りするクラスです。例えば、log4j で使用されるカスタムアペンダーなどです。 |
1.4.0 |
spark.sql.hive.metastore.barrierPrefixes |
(空) |
Spark SQL が通信している各 Hive バージョンのために明示的に再ロードされるべきクラスプレフィックスのコンマ区切りリスト。例えば、通常は共有されるプレフィックス(例: |
1.4.0 |