Hiveテーブル
Spark SQLは、Apache Hiveに格納されたデータの読み書きもサポートしています。ただし、Hiveには多数の依存関係があるため、これらの依存関係はデフォルトのSparkディストリビューションには含まれていません。Hiveの依存関係がクラスパスで見つかると、Sparkは自動的にそれらをロードします。これらのHiveの依存関係は、Hiveに格納されたデータにアクセスするためにHiveのシリアライゼーションおよびデシリアライゼーションライブラリ(SerDes)にアクセスする必要があるため、すべてのワーカーノードにも存在する必要があることに注意してください。
Hiveの設定は、hive-site.xml
、core-site.xml
(セキュリティ設定用)、およびhdfs-site.xml
(HDFS設定用)ファイルをconf/
に配置することで行います。
Hiveを使用する場合、永続的なHiveメタストアへの接続、Hive serdesのサポート、およびHiveユーザー定義関数を含む、Hiveサポート付きのSparkSession
をインスタンス化する必要があります。既存のHiveデプロイメントがないユーザーでも、Hiveサポートを有効にできます。hive-site.xml
で構成されていない場合、コンテキストは現在のディレクトリにmetastore_db
を自動的に作成し、spark.sql.warehouse.dir
で構成されたディレクトリを作成します。これはデフォルトでは、Sparkアプリケーションが開始された現在のディレクトリのspark-warehouse
ディレクトリになります。Spark 2.0.0以降、hive-site.xml
のhive.metastore.warehouse.dir
プロパティは非推奨になっていることに注意してください。代わりに、spark.sql.warehouse.dir
を使用して、ウェアハウス内のデータベースのデフォルトの場所を指定します。Sparkアプリケーションを開始するユーザーに書き込み権限を付与する必要がある場合があります。
from os.path import abspath
from pyspark.sql import SparkSession
from pyspark.sql import Row
# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key| value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...
# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# | 500 |
# +--------+
# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...
# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")
# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# | 2| val_2| 2| val_2|
# | 4| val_4| 4| val_4|
# | 5| val_5| 5| val_5|
# ...
import java.io.File
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
case class Record(key: Int, value: String)
// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
import spark.sql
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")
// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...
// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// | 0|
// | 1|
// | 2|
// ... Order may vary, as spark processes the partitions in parallel.
// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...
spark.stop()
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public static class Record implements Serializable {
private int key;
private String value;
public int getKey() {
return key;
}
public void setKey(int key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}
// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
.builder()
.appName("Java Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate();
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...
// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+
// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");
// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
(MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
Encoders.STRING());
stringsDS.show();
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...
// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
Record record = new Record();
record.setKey(key);
record.setValue("val_" + key);
records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");
// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// ...
Hiveを使用する場合は、Hiveサポート付きのSparkSession
をインスタンス化する必要があります。これにより、MetaStoreでテーブルを検索し、HiveQLを使用してクエリを記述するサポートが追加されます。
# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
Hiveテーブルのストレージ形式の指定
Hiveテーブルを作成するときは、このテーブルがファイルシステムとの間でデータを読み書きする方法、つまり「入力形式」と「出力形式」を定義する必要があります。また、このテーブルがデータを行にデシリアライズする方法、または行をデータにシリアライズする方法、つまり「serde」を定義する必要があります。ストレージ形式(「serde」、「入力形式」、「出力形式」)を指定するために、次のオプションを使用できます。たとえば、CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')
のようになります。デフォルトでは、テーブルファイルをプレーンテキストとして読み取ります。テーブルを作成するときは、Hiveストレージハンドラーはまだサポートされていないことに注意してください。Hive側でストレージハンドラーを使用してテーブルを作成し、Spark SQLを使用して読み取ることができます。
プロパティ名 | 意味 |
---|---|
fileFormat |
fileFormatは、「serde」、「入力形式」、「出力形式」を含むストレージ形式仕様のパッケージのようなものです。現在、'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile'、'avro'の6つのfileFormatをサポートしています。 |
inputFormat, outputFormat |
これらの2つのオプションは、対応するInputFormat およびOutputFormat クラスの名前を、文字列リテラルとして指定します。例:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat 。これらの2つのオプションはペアで表示する必要があり、fileFormat オプションをすでに指定している場合は指定できません。 |
serde |
このオプションは、serdeクラスの名前を指定します。fileFormat オプションが指定されている場合、指定されたfileFormat にserdeの情報がすでに含まれている場合は、このオプションを指定しないでください。現在、「sequencefile」、「textfile」、「rcfile」にはserde情報が含まれていないため、これらの3つのfileFormatでこのオプションを使用できます。 |
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim |
これらのオプションは、「textfile」fileFormatでのみ使用できます。これらは、区切り付きファイルをどのように行に読み取るかを定義します。 |
OPTIONS
で定義された他のすべてのプロパティは、Hive serdeプロパティと見なされます。
異なるバージョンのHive Metastoreとの連携
Spark SQLのHiveサポートの最も重要な部分の1つは、HiveテーブルのメタデータへのSpark SQLのアクセスを可能にするHiveメタストアとの連携です。Spark 1.4.0以降、Spark SQLの単一のバイナリビルドを使用して、以下に示す構成を使用して、異なるバージョンのHiveメタストアをクエリできます。メタストアとの通信に使用されているHiveのバージョンに関係なく、Spark SQLは内部的に組み込みのHiveに対してコンパイルし、内部実行(serdes、UDF、UDAFなど)にそれらのクラスを使用することに注意してください。
メタデータの取得に使用するHiveのバージョンを構成するために、次のオプションを使用できます。
プロパティ名 | デフォルト | 意味 | バージョン |
---|---|---|---|
spark.sql.hive.metastore.version |
2.3.9 |
Hiveメタストアのバージョン。使用可能なオプションは、0.12.0 から2.3.9 、および3.0.0 から3.1.3 です。 |
1.4.0 |
spark.sql.hive.metastore.jars |
builtin |
HiveMetastoreClientをインスタンス化するために使用する必要があるjarの場所。このプロパティは、次の4つのオプションのいずれかになります。
-Phive が有効な場合、SparkアセンブリにバンドルされているHive 2.3.9を使用します。このオプションが選択されている場合、spark.sql.hive.metastore.version は2.3.9 であるか、定義されていない必要があります。spark.sql.hive.metastore.jars.path で構成されたHive jarを使用します。ローカルパスとリモートパスの両方をサポートします。提供されるjarは、spark.sql.hive.metastore.version と同じバージョンである必要があります。 |
1.4.0 |
spark.sql.hive.metastore.jars.path |
(空) |
HiveMetastoreClientをインスタンス化するために使用されるjarのコンマ区切りパス。この構成は、spark.sql.hive.metastore.jars がpath として設定されている場合にのみ役立ちます。パスは、次のいずれかの形式にできます。
|
3.1.0 |
spark.sql.hive.metastore.sharedPrefixes |
com.mysql.jdbc, |
Spark SQLと特定のバージョンのHiveの間で共有されているクラスローダーを使用してロードする必要があるクラスプレフィックスのコンマ区切りリスト。共有する必要があるクラスの例は、メタストアと通信するために必要なJDBCドライバーです。共有する必要がある他のクラスは、すでに共有されているクラスとやり取りするクラスです。たとえば、log4jで使用されるカスタムアペンダーなどです。 |
1.4.0 |
spark.sql.hive.metastore.barrierPrefixes |
(空) |
Spark SQLが通信しているHiveの各バージョンに対して明示的にリロードする必要があるクラスプレフィックスのコンマ区切りリスト。たとえば、通常は共有されるプレフィックス(つまり、 |
1.4.0 |