Hive テーブル

Spark SQL は、Apache Hive に保存されたデータの読み書きもサポートしています。ただし、Hive には多数の依存関係があるため、これらの依存関係はデフォルトの Spark ディストリビューションには含まれていません。Hive の依存関係がクラスパス上に見つかる場合、Spark はそれらを自動的にロードします。Hive に保存されたデータにアクセスするために、Hive のシリアライゼーションおよびデシリアライゼーションライブラリ(SerDes)にアクセスする必要があるため、これらの Hive 依存関係はすべてのワーカーノードにも存在する必要があることに注意してください。

Hive の設定は、hive-site.xmlcore-site.xml(セキュリティ設定用)、および hdfs-site.xml(HDFS 設定用)ファイルを conf/ ディレクトリに配置することで行われます。

Hive を使用する場合、Hive metastore への永続的な接続、Hive serdes へのサポート、および Hive ユーザー定義関数を含む、Hive サポートを有効にして SparkSession をインスタンス化する必要があります。既存の Hive デプロイメントがないユーザーでも Hive サポートを有効にすることができます。hive-site.xml で設定されていない場合、コンテキストは自動的に現在のディレクトリに metastore_db を作成し、spark.sql.warehouse.dir で設定されたディレクトリ(Spark アプリケーションが開始された現在のディレクトリにある spark-warehouse ディレクトリがデフォルト)を作成します。Spark 2.0.0 以降、hive-site.xmlhive.metastore.warehouse.dir プロパティは非推奨になりました。代わりに、データベースのデフォルトの格納場所を指定するには spark.sql.warehouse.dir を使用してください。Spark アプリケーションを開始したユーザーに書き込み権限を付与する必要がある場合があります。

from os.path import abspath

from pyspark.sql import SparkSession
from pyspark.sql import Row

# warehouse_location points to the default location for managed databases and tables
warehouse_location = abspath('spark-warehouse')

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Hive integration example") \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()

# spark is an existing SparkSession
spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show()
# +---+-------+
# |key|  value|
# +---+-------+
# |238|val_238|
# | 86| val_86|
# |311|val_311|
# ...

# Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show()
# +--------+
# |count(1)|
# +--------+
# |    500 |
# +--------+

# The results of SQL queries are themselves DataFrames and support all normal functions.
sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

# The items in DataFrames are of type Row, which allows you to access each column by ordinal.
stringsDS = sqlDF.rdd.map(lambda row: "Key: %d, Value: %s" % (row.key, row.value))
for record in stringsDS.collect():
    print(record)
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# Key: 0, Value: val_0
# ...

# You can also use DataFrames to create temporary views within a SparkSession.
Record = Row("key", "value")
recordsDF = spark.createDataFrame([Record(i, "val_" + str(i)) for i in range(1, 101)])
recordsDF.createOrReplaceTempView("records")

# Queries can then join DataFrame data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
# +---+------+---+------+
# |key| value|key| value|
# +---+------+---+------+
# |  2| val_2|  2| val_2|
# |  4| val_4|  4| val_4|
# |  5| val_5|  5| val_5|
# ...
完全なサンプルコードは、Spark リポジトリの「examples/src/main/python/sql/hive.py」にあります。
import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
  case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// |  5| val_5|  5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_bigints(id bigint) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_bigints").show()
// +---+
// | id|
// +---+
// |  0|
// |  1|
// |  2|
// ... Order may vary, as spark processes the partitions in parallel.

// Turn on flag for Hive Dynamic Partitioning
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// |  value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()
完全なサンプルコードは、Spark リポジトリの「examples/src/main/scala/org/apache/spark/examples/sql/hive/SparkHiveExample.scala」にあります。
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static class Record implements Serializable {
  private int key;
  private String value;

  public int getKey() {
    return key;
  }

  public void setKey(int key) {
    this.key = key;
  }

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }
}

// warehouseLocation points to the default location for managed databases and tables
String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark Hive Example")
  .config("spark.sql.warehouse.dir", warehouseLocation)
  .enableHiveSupport()
  .getOrCreate();

spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive");
spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");

// Queries are expressed in HiveQL
spark.sql("SELECT * FROM src").show();
// +---+-------+
// |key|  value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
spark.sql("SELECT COUNT(*) FROM src").show();
// +--------+
// |count(1)|
// +--------+
// |    500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key");

// The items in DataFrames are of type Row, which lets you to access each column by ordinal.
Dataset<String> stringsDS = sqlDF.map(
    (MapFunction<Row, String>) row -> "Key: " + row.get(0) + ", Value: " + row.get(1),
    Encoders.STRING());
stringsDS.show();
// +--------------------+
// |               value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
List<Record> records = new ArrayList<>();
for (int key = 1; key < 100; key++) {
  Record record = new Record();
  record.setKey(key);
  record.setValue("val_" + key);
  records.add(record);
}
Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class);
recordsDF.createOrReplaceTempView("records");

// Queries can then join DataFrames data with data stored in Hive.
spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show();
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// |  2| val_2|  2| val_2|
// |  2| val_2|  2| val_2|
// |  4| val_4|  4| val_4|
// ...
完全なサンプルコードは、Spark リポジトリの「examples/src/main/java/org/apache/spark/examples/sql/hive/JavaSparkHiveExample.java」にあります。

Hive を使用する場合、Hive サポートを有効にして SparkSession をインスタンス化する必要があります。これにより、MetaStore でテーブルを検索し、HiveQL を使用してクエリを書き込むサポートが追加されます。

# enableHiveSupport defaults to TRUE
sparkR.session(enableHiveSupport = TRUE)
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- collect(sql("FROM src SELECT key, value"))
完全なサンプルコードは、Sparkリポジトリの「examples/src/main/r/RSparkSQLExample.R」にあります。

Hive テーブルのストレージフォーマットの指定

Hive テーブルを作成する際には、このテーブルがファイルシステムからどのようにデータを読み書きするか、つまり「入力フォーマット」と「出力フォーマット」を定義する必要があります。また、このテーブルがデータをどのように行にデシリアライズするか、または行をデータにシリアライズするか、つまり「serde」を定義する必要があります。以下のオプションを使用して、ストレージフォーマット(「serde」、「入力フォーマット」、「出力フォーマット」)を指定できます。例:CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。デフォルトでは、テーブルファイルはプレーンテキストとして読み取られます。テーブル作成時に Hive ストレージハンドラはまだサポートされていないことに注意してください。Hive 側でストレージハンドラを使用してテーブルを作成し、Spark SQL でそれを読み取ることができます。

プロパティ名意味
fileFormat fileFormat は、"serde"、"input format"、"output format" を含むストレージフォーマット仕様のパッケージのようなものです。現在、'sequencefile'、'rcfile'、'orc'、'parquet'、'textfile'、'avro' の 6 つの fileFormat をサポートしています。
inputFormat, outputFormat これらの 2 つのオプションは、対応する InputFormat および OutputFormat クラスの名前を文字列リテラルとして指定します。例:org.apache.hadoop.hive.ql.io.orc.OrcInputFormat。これら 2 つのオプションはペアで出現する必要があり、fileFormat オプションが既に指定されている場合は指定できません。
serde このオプションは、serde クラスの名前を指定します。fileFormat オプションが指定されている場合、指定された fileFormat が既に serde の情報を含んでいる場合は、このオプションを指定しないでください。現在、"sequencefile"、"textfile"、"rcfile" は serde 情報を含んでいないため、これら 3 つの fileFormat でこのオプションを使用できます。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim これらのオプションは "textfile" fileFormat のみで使用できます。これらは、区切りファイルを行に読み取る方法を定義します。

OPTIONS で定義されたその他のすべてのプロパティは、Hive serde プロパティとして扱われます。

Hive メタストアの異なるバージョンとの連携

Spark SQL の Hive サポートの最も重要な要素の 1 つは、Hive メタストアとの連携であり、これにより Spark SQL は Hive テーブルのメタデータにアクセスできます。Spark 1.4.0 以降、単一の Spark SQL バイナリビルドを使用して、以下に説明する設定により、さまざまなバージョンの Hive メタストアにクエリを実行できます。メタストアと通信するために使用される Hive のバージョンに関係なく、Spark SQL は内部的に組み込み Hive に対してコンパイルされ、内部実行(serdes、UDF、UDAF など)にそれらのクラスを使用することに注意してください。

メタデータ取得に使用される Hive のバージョンを設定するために、以下のオプションを使用できます。

プロパティ名デフォルト意味バージョン以降
spark.sql.hive.metastore.version 2.3.10 Hive メタストアのバージョン。利用可能なオプションは、2.0.0 から 2.3.103.0.0 から 3.1.3、および 4.0.0 から 4.0.1 です。 1.4.0
spark.sql.hive.metastore.jars builtin HiveMetastoreClient をインスタンス化するために使用される jars の場所。このプロパティは、4 つのオプションのいずれかになります。
  1. builtin
  2. Hive 2.3.10 を使用します。これは、-Phive が有効になっている場合に Spark アセンブリにバンドルされています。このオプションが選択された場合、spark.sql.hive.metastore.version2.3.10 であるか、定義されていない必要があります。
  3. maven
  4. Maven リポジトリからダウンロードされた指定されたバージョンの Hive jars を使用します。この設定は、本番環境でのデプロイメントには一般的に推奨されません。
  5. path
  6. spark.sql.hive.metastore.jars.path でコンマ区切りの形式で設定された Hive jars を使用します。ローカルパスとリモートパスの両方をサポートします。提供された jars は spark.sql.hive.metastore.version と同じバージョンである必要があります。
  7. JVM の標準形式のクラスパス。このクラスパスには、Hive とその依存関係、および正しいバージョンの Hadoop がすべて含まれている必要があります。提供された jars は spark.sql.hive.metastore.version と同じバージョンである必要があります。これらの jars はドライバにのみ存在すればよいですが、yarn クラスターモードで実行している場合は、それらがアプリケーションと一緒にパッケージ化されていることを確認する必要があります。
1.4.0
spark.sql.hive.metastore.jars.path (空) HiveMetastoreClient をインスタンス化するために使用される jars のコンマ区切りパス。この設定は、spark.sql.hive.metastore.jarspath に設定されている場合にのみ役立ちます。
パスは以下のいずれかの形式になります。
  1. file://path/to/jar/foo.jar
  2. hdfs://nameservice/path/to/jar/foo.jar
  3. /path/to/jar/ (URI スキームなしのパスは conf fs.defaultFS の URI スキームに従います)
  4. [http/https/ftp]://path/to/jar/foo.jar
1、2、3 はワイルドカードをサポートすることに注意してください。例:
  1. file://path/to/jar/*,file://path2/to/jar/*/*.jar
  2. hdfs://nameservice/path/to/jar/*,hdfs://nameservice2/path/to/jar/*/*.jar
3.1.0
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc

Spark SQL と特定のバージョンの Hive との間で共有されるクラスローダーを使用してロードされるべきクラスプレフィックスのコンマ区切りリスト。共有されるべきクラスの例としては、メタストアと通信するために必要な JDBC ドライバーがあります。共有される必要があるその他のクラスは、既に共有されているクラスとやり取りするクラスです。例えば、log4j で使用されるカスタムアペンダーなどです。

1.4.0
spark.sql.hive.metastore.barrierPrefixes (空)

Spark SQL が通信している各 Hive バージョンのために明示的に再ロードされるべきクラスプレフィックスのコンマ区切りリスト。例えば、通常は共有されるプレフィックス(例:org.apache.spark.*)で宣言されている Hive UDF などです。

1.4.0