JDBCによる他のデータベースへの接続
Spark SQLには、JDBCを使用して他のデータベースからデータを読み取ることができるデータソースも含まれています。この機能は、JdbcRDD を使用するよりも優先する必要があります。これは、結果がDataFrameとして返され、Spark SQLで簡単に処理したり、他のデータソースと結合したりできるためです。JDBCデータソースは、ユーザーがClassTagを提供する必要がないため、JavaまたはPythonからも使いやすくなっています。(これは、他のアプリケーションがSpark SQLを使用してクエリを実行できるようにするSpark SQL JDBCサーバーとは異なります。)
開始するには、Sparkのクラスパスに特定のデータベースのJDBCドライバを含める必要があります。たとえば、Spark ShellからPostgreSQLに接続するには、次のコマンドを実行します。
./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar
データソースオプション
Sparkは、JDBCに対して次の大文字と小文字を区別しないオプションをサポートしています。JDBCのデータソースオプションは、
.option
/.options
メソッドを介して設定できます。DataFrameReader
DataFrameWriter
- CREATE TABLE USING DATA_SOURCEにおける
OPTIONS
句
接続プロパティについては、データソースオプションでJDBC接続プロパティを指定できます。user
とpassword
は通常、データソースへのログインのための接続プロパティとして提供されます。
プロパティ名 | デフォルト | 意味 | スコープ |
---|---|---|---|
url |
(なし) | 接続するための形式jdbc:subprotocol:subname のJDBC URL。ソース固有の接続プロパティはURLに指定できます。例:jdbc:postgresql://localhost/test?user=fred&password=secret |
読み取り/書き込み |
dbtable |
(なし) | 読み取りまたは書き込みを行うJDBCテーブル。読み取りパスで使用する場合、SQLクエリにおけるFROM 句で有効なものは何でも使用できます。たとえば、完全なテーブルではなく、括弧内のサブクエリを使用することもできます。dbtable オプションとquery オプションを同時に指定することはできません。 |
読み取り/書き込み |
query |
(なし) | Sparkにデータを読み込むために使用されるクエリ。指定されたクエリは括弧で囲まれ、FROM 句のサブクエリとして使用されます。Sparkはサブクエリ句にエイリアスも割り当てます。例として、SparkはJDBCソースに対して次の形式のクエリを実行します。SELECT <columns> FROM (<user_specified_query>) spark_gen_alias このオプションを使用する場合、いくつかの制限事項があります。
|
読み取り/書き込み |
prepareQuery |
(なし) | query と組み合わせて最終的なクエリを形成するプレフィックス。指定されたquery はFROM 句のサブクエリとして括弧で囲まれ、一部のデータベースはサブクエリですべての句をサポートしていないため、prepareQuery プロパティは、このような複雑なクエリを実行する方法を提供します。例として、SparkはJDBCソースに対して次の形式のクエリを実行します。<prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias いくつかの例を以下に示します。
|
読み取り/書き込み |
driver |
(なし) | このURLに接続するために使用するJDBCドライバのクラス名。 | 読み取り/書き込み |
partitionColumn、lowerBound、upperBound |
(なし) | これらのオプションは、いずれか1つが指定されている場合、すべて指定する必要があります。さらに、numPartitions を指定する必要があります。これらは、複数のワーカーから並列に読み込む場合にテーブルをパーティション分割する方法を記述します。partitionColumn は、問題のテーブルの数値、日付、またはタイムスタンプ列である必要があります。lowerBound とupperBound は、行をテーブルでフィルタリングするためではなく、パーティションストライドを決定するためにのみ使用されることに注意してください。そのため、テーブルのすべての行がパーティション分割されて返されます。このオプションは読み取りのみに適用されます。例 spark.read.format("jdbc")
|
読み取り |
numPartitions |
(なし) | テーブルの読み取りと書き込みの並列処理に使用できるパーティションの最大数。これは、同時JDBC接続の最大数も決定します。書き込むパーティション数がこの制限を超える場合、書き込む前にcoalesce(numPartitions) を呼び出してこの制限に減らします。 |
読み取り/書き込み |
queryTimeout |
0 |
ドライバがStatementオブジェクトの実行を待機する秒数。0は制限がないことを意味します。書き込みパスでは、このオプションはJDBCドライバがAPI setQueryTimeout を実装する方法に依存します(例:h2 JDBCドライバは、全体のJDBCバッチではなく、各クエリのタイムアウトを確認します)。 |
読み取り/書き込み |
fetchsize |
0 |
JDBCフェッチサイズ。これは、1回のラウンドトリップでフェッチする行数を決定します。これは、低いフェッチサイズをデフォルトとするJDBCドライバ(例:10行のOracle)のパフォーマンスを向上させるのに役立ちます。 | 読み取り |
batchsize |
1000 |
JDBCバッチサイズ。これは、1回のラウンドトリップで挿入する行数を決定します。これは、JDBCドライバのパフォーマンスを向上させるのに役立ちます。このオプションは書き込みのみに適用されます。 | 書き込み |
isolationLevel |
READ_UNCOMMITTED |
現在の接続に適用されるトランザクション分離レベル。NONE 、READ_COMMITTED 、READ_UNCOMMITTED 、REPEATABLE_READ 、またはSERIALIZABLE のいずれかで、JDBCのConnectionオブジェクトによって定義された標準的なトランザクション分離レベルに対応し、デフォルトはREAD_UNCOMMITTED です。java.sql.Connection のドキュメントを参照してください。 |
書き込み |
sessionInitStatement |
(なし) | リモートDBへの各データベースセッションが開かれ、データの読み取りを開始する前に、このオプションはカスタムSQLステートメント(またはPL/SQLブロック)を実行します。これを使用して、セッション初期化コードを実装します。例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") |
読み取り |
truncate |
false |
これはJDBCライター関連のオプションです。SaveMode.Overwrite が有効になっている場合、このオプションにより、Sparkは既存のテーブルを削除して再作成する代わりに切り捨てます。これはより効率的で、テーブルのメタデータ(例:インデックス)が削除されるのを防ぎます。ただし、新しいデータのスキーマが異なる場合など、一部のケースでは機能しません。失敗した場合、ユーザーはDROP TABLE を再度使用するためにtruncate オプションをオフにする必要があります。また、DBMS間でTRUNCATE TABLE の動作が異なるため、常に安全に使用できるとは限りません。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect、およびOracleDialectはこれをサポートしますが、PostgresDialectおよびデフォルトのJDBCDirectはサポートしません。不明な、およびサポートされていないJDBCDirectの場合、ユーザーオプションtruncate は無視されます。 | 書き込み |
cascadeTruncate |
各JDBCDialectのisCascadeTruncate で指定されている、問題のJDBCデータベースのデフォルトのカスケード切り捨て動作。 |
これはJDBCライター関連のオプションです。有効になっている場合、そしてJDBCデータベース(現時点ではPostgreSQLとOracle)でサポートされている場合、このオプションにより、TRUNCATE TABLE t CASCADE の実行が許可されます(PostgreSQLの場合、子孫テーブルを誤って切り捨てるのを防ぐために、TRUNCATE TABLE ONLY t CASCADE が実行されます)。これは他のテーブルに影響するため、注意して使用する必要があります。 |
書き込み |
createTableOptions |
|
これはJDBCライター関連のオプションです。指定されている場合、このオプションにより、テーブルの作成時にデータベース固有のテーブルとパーティションオプションを設定できます(例:CREATE TABLE t (name string) ENGINE=InnoDB. )。 |
書き込み |
createTableColumnTypes |
(なし) | テーブルを作成する際、デフォルトの代わりに使用するデータベース列データ型。データ型情報は、CREATE TABLE列構文と同じ形式で指定する必要があります(例:"name CHAR(64), comments VARCHAR(1024)" )。指定された型は、有効なSpark SQLデータ型である必要があります。 |
書き込み |
customSchema |
(なし) | JDBCコネクタからデータを読み取るために使用するカスタムスキーマ。例:"id DECIMAL(38, 0), name STRING" 。部分的なフィールドを指定することもでき、他のフィールドはデフォルトの型マッピングを使用します。例:"id DECIMAL(38, 0)" 。列名は、JDBCテーブルの対応する列名と同一である必要があります。ユーザーは、デフォルトを使用する代わりに、Spark SQLの対応するデータ型を指定できます。 |
読み取り |
pushDownPredicate |
true |
JDBCデータソースへの述語プッシュダウンを有効または無効にするオプション。デフォルト値はtrueで、この場合、Sparkは可能な限りフィルターをJDBCデータソースにプッシュダウンします。それ以外の場合は、falseに設定すると、フィルターはJDBCデータソースにプッシュダウンされず、すべてのフィルターはSparkによって処理されます。述語プッシュダウンは通常、述語フィルタリングがJDBCデータソースよりもSparkによって高速に実行される場合にオフにされます。 | 読み取り |
pushDownAggregate |
true |
V2 JDBCデータソースにおける集計プッシュダウンの有効化/無効化オプションです。デフォルト値はtrueで、この場合、Sparkは集計をJDBCデータソースにプッシュダウンします。falseに設定すると、集計はJDBCデータソースにプッシュダウンされません。集計のプッシュダウンは、Sparkによる集計処理がJDBCデータソースによる処理よりも高速な場合に通常オフになります。集計がプッシュダウンされるのは、すべての集計関数と関連するフィルタがプッシュダウンできる場合のみであることに注意してください。numPartitions が1の場合、またはグループ化キーがpartitionColumn と同一の場合、Sparkは集計をデータソースに完全にプッシュダウンし、データソース出力に対する最終的な集計を適用しません。それ以外の場合は、Sparkはデータソース出力に対して最終的な集計を適用します。 |
読み取り |
pushDownLimit |
true |
V2 JDBCデータソースへのLIMITプッシュダウンの有効化/無効化オプションです。LIMITプッシュダウンには、LIMIT + SORT(別名Top N演算子)も含まれます。デフォルト値はtrueで、この場合、SparkはLIMITまたはLIMIT with SORTをJDBCデータソースにプッシュダウンします。falseに設定すると、LIMITまたはLIMIT with SORTはJDBCデータソースにプッシュダウンされません。numPartitions が1より大きい場合、LIMITまたはLIMIT with SORTがプッシュダウンされていても、Sparkはデータソースからの結果に対してLIMITまたはLIMIT with SORTを適用します。一方、LIMITまたはLIMIT with SORTがプッシュダウンされ、numPartitions が1の場合、Sparkはデータソースからの結果に対してLIMITまたはLIMIT with SORTを適用しません。 |
読み取り |
pushDownOffset |
true |
V2 JDBCデータソースへのOFFSETプッシュダウンの有効化/無効化オプションです。デフォルト値はtrueで、この場合、SparkはOFFSETをJDBCデータソースにプッシュダウンします。falseに設定すると、SparkはOFFSETをJDBCデータソースにプッシュダウンしません。pushDownOffset がtrueで、numPartitions が1の場合、OFFSETはJDBCデータソースにプッシュダウンされます。それ以外の場合は、OFFSETはプッシュダウンされず、Sparkはデータソースからの結果に対してOFFSETを適用します。 |
読み取り |
pushDownTableSample |
true |
V2 JDBCデータソースへのTABLESAMPLEプッシュダウンの有効化/無効化オプションです。デフォルト値はtrueで、この場合、SparkはTABLESAMPLEをJDBCデータソースにプッシュダウンします。falseに設定すると、TABLESAMPLEはJDBCデータソースにプッシュダウンされません。 | 読み取り |
keytab |
(なし) | JDBCクライアントのKerberos keytabファイルの場所です(`--files`オプションでspark-submitを使用するか、手動ですべてのノードに事前にアップロードする必要があります)。パス情報が見つかった場合、Sparkはkeytabを手動で配布したものと見なします。それ以外の場合は`--files`と見なします。keytab とprincipal の両方が定義されている場合、SparkはKerberos認証を試みます。 |
読み取り/書き込み |
principal |
(なし) | JDBCクライアントのKerberosプリンシパル名を指定します。keytab とprincipal の両方が定義されている場合、SparkはKerberos認証を試みます。 |
読み取り/書き込み |
refreshKrb5Config |
false |
このオプションは、新しい接続を確立する前に、JDBCクライアントのKerberos構成を更新するかどうかを制御します。構成を更新する場合はtrueに、更新しない場合はfalseに設定します。デフォルト値はfalseです。このオプションをtrueに設定して複数の接続を確立しようとすると、競合状態が発生する可能性があることに注意してください。考えられる状況の1つは次のとおりです。
|
読み取り/書き込み |
connectionProvider |
(なし) | このURLへの接続に使用するJDBC接続プロバイダの名前です(例:`db2`、`mssql`)。JDBCデータソースでロードされたプロバイダの1つである必要があります。指定されたドライバとオプションを処理できるプロバイダが複数ある場合のあいまいさを解消するために使用されます。選択されたプロバイダは、spark.sql.sources.disabledJdbcConnProviderList によって無効にされてはいけません。 |
読み取り/書き込み |
preferTimestampNTZ |
false | このオプションがtrue に設定されている場合、すべてのタイムスタンプはTIMESTAMP WITHOUT TIME ZONEとして推論されます。それ以外の場合は、タイムスタンプはローカルタイムゾーン付きのTIMESTAMPとして読み取られます。 |
読み取り |
keytabを使用したKerberos認証は、JDBCドライバによって常にサポートされるとは限りません。keytab
とprincipal
設定オプションを使用する前に、次の要件が満たされていることを確認してください。
- 含まれているJDBCドライババージョンがkeytabを使用したKerberos認証をサポートしている。
- 使用されているデータベースをサポートする組み込みの接続プロバイダがある。
次のデータベースには、組み込みの接続プロバイダがあります。
- DB2
- MariaDB
- MS SQL
- Oracle
- PostgreSQL
要件が満たされていない場合は、JdbcConnectionProvider
開発者APIを使用してカスタム認証を処理することを検討してください。
# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
jdbcDF2 = spark.read \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.option("customSchema", "id DECIMAL(38, 0), name STRING") \
.load()
# Saving data to a JDBC source
jdbcDF.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:dbserver") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.save()
jdbcDF2.write \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
# Specifying create table column data types on write
jdbcDF.write \
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
.jdbc("jdbc:postgresql:dbserver", "schema.tablename",
properties={"user": "username", "password": "password"})
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()
jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Saving data to a JDBC source
jdbcDF.write()
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save();
jdbcDF2.write()
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
// Specifying create table column data types on write
jdbcDF.write()
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
url "jdbc:postgresql:dbserver",
dbtable "schema.tablename",
user 'username',
password 'password'
)
INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable