JDBCによる他のデータベースへの接続

Spark SQLには、JDBCを使用して他のデータベースからデータを読み取ることができるデータソースも含まれています。この機能は、JdbcRDD を使用するよりも優先する必要があります。これは、結果がDataFrameとして返され、Spark SQLで簡単に処理したり、他のデータソースと結合したりできるためです。JDBCデータソースは、ユーザーがClassTagを提供する必要がないため、JavaまたはPythonからも使いやすくなっています。(これは、他のアプリケーションがSpark SQLを使用してクエリを実行できるようにするSpark SQL JDBCサーバーとは異なります。)

開始するには、Sparkのクラスパスに特定のデータベースのJDBCドライバを含める必要があります。たとえば、Spark ShellからPostgreSQLに接続するには、次のコマンドを実行します。

./bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

データソースオプション

Sparkは、JDBCに対して次の大文字と小文字を区別しないオプションをサポートしています。JDBCのデータソースオプションは、

接続プロパティについては、データソースオプションでJDBC接続プロパティを指定できます。userpasswordは通常、データソースへのログインのための接続プロパティとして提供されます。

プロパティ名デフォルト意味スコープ
url (なし) 接続するための形式jdbc:subprotocol:subnameのJDBC URL。ソース固有の接続プロパティはURLに指定できます。例:jdbc:postgresql://localhost/test?user=fred&password=secret 読み取り/書き込み
dbtable (なし) 読み取りまたは書き込みを行うJDBCテーブル。読み取りパスで使用する場合、SQLクエリにおけるFROM句で有効なものは何でも使用できます。たとえば、完全なテーブルではなく、括弧内のサブクエリを使用することもできます。dbtableオプションとqueryオプションを同時に指定することはできません。 読み取り/書き込み
query (なし) Sparkにデータを読み込むために使用されるクエリ。指定されたクエリは括弧で囲まれ、FROM句のサブクエリとして使用されます。Sparkはサブクエリ句にエイリアスも割り当てます。例として、SparkはJDBCソースに対して次の形式のクエリを実行します。

SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

このオプションを使用する場合、いくつかの制限事項があります。
  1. dbtableオプションとqueryオプションを同時に指定することはできません。
  2. queryオプションとpartitionColumnオプションを同時に指定することはできません。partitionColumnオプションを指定する必要がある場合、サブクエリはdbtableオプションを使用して指定でき、パーティション列はdbtableの一部として提供されるサブクエリエイリアスを使用して修飾できます。

    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("query", "select c1, c2 from t1")
    .load()
読み取り/書き込み
prepareQuery (なし) queryと組み合わせて最終的なクエリを形成するプレフィックス。指定されたqueryFROM句のサブクエリとして括弧で囲まれ、一部のデータベースはサブクエリですべての句をサポートしていないため、prepareQueryプロパティは、このような複雑なクエリを実行する方法を提供します。例として、SparkはJDBCソースに対して次の形式のクエリを実行します。

<prepareQuery> SELECT <columns> FROM (<user_specified_query>) spark_gen_alias

いくつかの例を以下に示します。
  1. MSSQL ServerはサブクエリでWITH句を受け付けませんが、そのようなクエリをprepareQueryqueryに分割できます。
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "WITH t AS (SELECT x, y FROM tbl)")
    .option("query", "SELECT * FROM t WHERE x > 10")
    .load()
  2. MSSQL Serverはサブクエリでテンポラリテーブル句を受け付けませんが、そのようなクエリをprepareQueryqueryに分割できます。
    spark.read.format("jdbc")
    .option("url", jdbcUrl)
    .option("prepareQuery", "(SELECT * INTO #TempTable FROM (SELECT * FROM tbl) t)")
    .option("query", "SELECT * FROM #TempTable")
    .load()
読み取り/書き込み
driver (なし) このURLに接続するために使用するJDBCドライバのクラス名。 読み取り/書き込み
partitionColumn、lowerBound、upperBound (なし) これらのオプションは、いずれか1つが指定されている場合、すべて指定する必要があります。さらに、numPartitionsを指定する必要があります。これらは、複数のワーカーから並列に読み込む場合にテーブルをパーティション分割する方法を記述します。partitionColumnは、問題のテーブルの数値、日付、またはタイムスタンプ列である必要があります。lowerBoundupperBoundは、行をテーブルでフィルタリングするためではなく、パーティションストライドを決定するためにのみ使用されることに注意してください。そのため、テーブルのすべての行がパーティション分割されて返されます。このオプションは読み取りのみに適用されます。

spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1")
.option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()
読み取り
numPartitions (なし) テーブルの読み取りと書き込みの並列処理に使用できるパーティションの最大数。これは、同時JDBC接続の最大数も決定します。書き込むパーティション数がこの制限を超える場合、書き込む前にcoalesce(numPartitions)を呼び出してこの制限に減らします。 読み取り/書き込み
queryTimeout 0 ドライバがStatementオブジェクトの実行を待機する秒数。0は制限がないことを意味します。書き込みパスでは、このオプションはJDBCドライバがAPI setQueryTimeoutを実装する方法に依存します(例:h2 JDBCドライバは、全体のJDBCバッチではなく、各クエリのタイムアウトを確認します)。 読み取り/書き込み
fetchsize 0 JDBCフェッチサイズ。これは、1回のラウンドトリップでフェッチする行数を決定します。これは、低いフェッチサイズをデフォルトとするJDBCドライバ(例:10行のOracle)のパフォーマンスを向上させるのに役立ちます。 読み取り
batchsize 1000 JDBCバッチサイズ。これは、1回のラウンドトリップで挿入する行数を決定します。これは、JDBCドライバのパフォーマンスを向上させるのに役立ちます。このオプションは書き込みのみに適用されます。 書き込み
isolationLevel READ_UNCOMMITTED 現在の接続に適用されるトランザクション分離レベル。NONEREAD_COMMITTEDREAD_UNCOMMITTEDREPEATABLE_READ、またはSERIALIZABLEのいずれかで、JDBCのConnectionオブジェクトによって定義された標準的なトランザクション分離レベルに対応し、デフォルトはREAD_UNCOMMITTEDです。java.sql.Connectionのドキュメントを参照してください。 書き込み
sessionInitStatement (なし) リモートDBへの各データベースセッションが開かれ、データの読み取りを開始する前に、このオプションはカスタムSQLステートメント(またはPL/SQLブロック)を実行します。これを使用して、セッション初期化コードを実装します。例:option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") 読み取り
truncate false これはJDBCライター関連のオプションです。SaveMode.Overwriteが有効になっている場合、このオプションにより、Sparkは既存のテーブルを削除して再作成する代わりに切り捨てます。これはより効率的で、テーブルのメタデータ(例:インデックス)が削除されるのを防ぎます。ただし、新しいデータのスキーマが異なる場合など、一部のケースでは機能しません。失敗した場合、ユーザーはDROP TABLEを再度使用するためにtruncateオプションをオフにする必要があります。また、DBMS間でTRUNCATE TABLEの動作が異なるため、常に安全に使用できるとは限りません。MySQLDialect、DB2Dialect、MsSqlServerDialect、DerbyDialect、およびOracleDialectはこれをサポートしますが、PostgresDialectおよびデフォルトのJDBCDirectはサポートしません。不明な、およびサポートされていないJDBCDirectの場合、ユーザーオプションtruncateは無視されます。書き込み
cascadeTruncate 各JDBCDialectのisCascadeTruncateで指定されている、問題のJDBCデータベースのデフォルトのカスケード切り捨て動作。 これはJDBCライター関連のオプションです。有効になっている場合、そしてJDBCデータベース(現時点ではPostgreSQLとOracle)でサポートされている場合、このオプションにより、TRUNCATE TABLE t CASCADEの実行が許可されます(PostgreSQLの場合、子孫テーブルを誤って切り捨てるのを防ぐために、TRUNCATE TABLE ONLY t CASCADEが実行されます)。これは他のテーブルに影響するため、注意して使用する必要があります。 書き込み
createTableOptions これはJDBCライター関連のオプションです。指定されている場合、このオプションにより、テーブルの作成時にデータベース固有のテーブルとパーティションオプションを設定できます(例:CREATE TABLE t (name string) ENGINE=InnoDB.)。 書き込み
createTableColumnTypes (なし) テーブルを作成する際、デフォルトの代わりに使用するデータベース列データ型。データ型情報は、CREATE TABLE列構文と同じ形式で指定する必要があります(例:"name CHAR(64), comments VARCHAR(1024)")。指定された型は、有効なSpark SQLデータ型である必要があります。 書き込み
customSchema (なし) JDBCコネクタからデータを読み取るために使用するカスタムスキーマ。例:"id DECIMAL(38, 0), name STRING"。部分的なフィールドを指定することもでき、他のフィールドはデフォルトの型マッピングを使用します。例:"id DECIMAL(38, 0)"。列名は、JDBCテーブルの対応する列名と同一である必要があります。ユーザーは、デフォルトを使用する代わりに、Spark SQLの対応するデータ型を指定できます。 読み取り
pushDownPredicate true JDBCデータソースへの述語プッシュダウンを有効または無効にするオプション。デフォルト値はtrueで、この場合、Sparkは可能な限りフィルターをJDBCデータソースにプッシュダウンします。それ以外の場合は、falseに設定すると、フィルターはJDBCデータソースにプッシュダウンされず、すべてのフィルターはSparkによって処理されます。述語プッシュダウンは通常、述語フィルタリングがJDBCデータソースよりもSparkによって高速に実行される場合にオフにされます。 読み取り
pushDownAggregate true V2 JDBCデータソースにおける集計プッシュダウンの有効化/無効化オプションです。デフォルト値はtrueで、この場合、Sparkは集計をJDBCデータソースにプッシュダウンします。falseに設定すると、集計はJDBCデータソースにプッシュダウンされません。集計のプッシュダウンは、Sparkによる集計処理がJDBCデータソースによる処理よりも高速な場合に通常オフになります。集計がプッシュダウンされるのは、すべての集計関数と関連するフィルタがプッシュダウンできる場合のみであることに注意してください。numPartitionsが1の場合、またはグループ化キーがpartitionColumnと同一の場合、Sparkは集計をデータソースに完全にプッシュダウンし、データソース出力に対する最終的な集計を適用しません。それ以外の場合は、Sparkはデータソース出力に対して最終的な集計を適用します。 読み取り
pushDownLimit true V2 JDBCデータソースへのLIMITプッシュダウンの有効化/無効化オプションです。LIMITプッシュダウンには、LIMIT + SORT(別名Top N演算子)も含まれます。デフォルト値はtrueで、この場合、SparkはLIMITまたはLIMIT with SORTをJDBCデータソースにプッシュダウンします。falseに設定すると、LIMITまたはLIMIT with SORTはJDBCデータソースにプッシュダウンされません。numPartitionsが1より大きい場合、LIMITまたはLIMIT with SORTがプッシュダウンされていても、Sparkはデータソースからの結果に対してLIMITまたはLIMIT with SORTを適用します。一方、LIMITまたはLIMIT with SORTがプッシュダウンされ、numPartitionsが1の場合、Sparkはデータソースからの結果に対してLIMITまたはLIMIT with SORTを適用しません。 読み取り
pushDownOffset true V2 JDBCデータソースへのOFFSETプッシュダウンの有効化/無効化オプションです。デフォルト値はtrueで、この場合、SparkはOFFSETをJDBCデータソースにプッシュダウンします。falseに設定すると、SparkはOFFSETをJDBCデータソースにプッシュダウンしません。pushDownOffsetがtrueで、numPartitionsが1の場合、OFFSETはJDBCデータソースにプッシュダウンされます。それ以外の場合は、OFFSETはプッシュダウンされず、Sparkはデータソースからの結果に対してOFFSETを適用します。 読み取り
pushDownTableSample true V2 JDBCデータソースへのTABLESAMPLEプッシュダウンの有効化/無効化オプションです。デフォルト値はtrueで、この場合、SparkはTABLESAMPLEをJDBCデータソースにプッシュダウンします。falseに設定すると、TABLESAMPLEはJDBCデータソースにプッシュダウンされません。 読み取り
keytab (なし) JDBCクライアントのKerberos keytabファイルの場所です(`--files`オプションでspark-submitを使用するか、手動ですべてのノードに事前にアップロードする必要があります)。パス情報が見つかった場合、Sparkはkeytabを手動で配布したものと見なします。それ以外の場合は`--files`と見なします。keytabprincipalの両方が定義されている場合、SparkはKerberos認証を試みます。 読み取り/書き込み
principal (なし) JDBCクライアントのKerberosプリンシパル名を指定します。keytabprincipalの両方が定義されている場合、SparkはKerberos認証を試みます。 読み取り/書き込み
refreshKrb5Config false このオプションは、新しい接続を確立する前に、JDBCクライアントのKerberos構成を更新するかどうかを制御します。構成を更新する場合はtrueに、更新しない場合はfalseに設定します。デフォルト値はfalseです。このオプションをtrueに設定して複数の接続を確立しようとすると、競合状態が発生する可能性があることに注意してください。考えられる状況の1つは次のとおりです。
  1. refreshKrb5Configフラグがセキュリティコンテキスト1で設定されている
  2. 対応するDBMSにJDBC接続プロバイダが使用されている
  3. krb5.confが変更されたが、JVMはまだ再読み込みが必要であることを認識していない
  4. Sparkはセキュリティコンテキスト1に対して正常に認証された
  5. JVMは変更されたkrb5.confからセキュリティコンテキスト2を読み込む
  6. Sparkは以前保存されたセキュリティコンテキスト1を復元する
  7. 変更されたkrb5.confの内容が消えた
読み取り/書き込み
connectionProvider (なし) このURLへの接続に使用するJDBC接続プロバイダの名前です(例:`db2`、`mssql`)。JDBCデータソースでロードされたプロバイダの1つである必要があります。指定されたドライバとオプションを処理できるプロバイダが複数ある場合のあいまいさを解消するために使用されます。選択されたプロバイダは、spark.sql.sources.disabledJdbcConnProviderListによって無効にされてはいけません。 読み取り/書き込み
preferTimestampNTZ false このオプションがtrueに設定されている場合、すべてのタイムスタンプはTIMESTAMP WITHOUT TIME ZONEとして推論されます。それ以外の場合は、タイムスタンプはローカルタイムゾーン付きのTIMESTAMPとして読み取られます。 読み取り

keytabを使用したKerberos認証は、JDBCドライバによって常にサポートされるとは限りません。
keytabprincipal設定オプションを使用する前に、次の要件が満たされていることを確認してください。

次のデータベースには、組み込みの接続プロバイダがあります。

要件が満たされていない場合は、JdbcConnectionProvider開発者APIを使用してカスタム認証を処理することを検討してください。

# Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
# Loading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

jdbcDF2 = spark.read \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying dataframe column data types on read
jdbcDF3 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .option("customSchema", "id DECIMAL(38, 0), name STRING") \
    .load()

# Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()

jdbcDF2.write \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})

# Specifying create table column data types on write
jdbcDF.write \
    .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \
    .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
          properties={"user": "username", "password": "password"})
Sparkリポジトリの"examples/src/main/python/sql/datasource.py"に完全なコード例があります。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()

jdbcDF2.write
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
Sparkリポジトリの"examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala"に完全なコード例があります。
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
Dataset<Row> jdbcDF = spark.read()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load();

Properties connectionProperties = new Properties();
connectionProperties.put("user", "username");
connectionProperties.put("password", "password");
Dataset<Row> jdbcDF2 = spark.read()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Saving data to a JDBC source
jdbcDF.write()
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save();

jdbcDF2.write()
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);

// Specifying create table column data types on write
jdbcDF.write()
  .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
  .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties);
Sparkリポジトリの"examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java"に完全なコード例があります。
# Loading data from a JDBC source
df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")

# Saving data to a JDBC source
write.jdbc(df, "jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
Sparkリポジトリの"examples/src/main/r/RSparkSQLExample.R"に完全なコード例があります。
CREATE TEMPORARY VIEW jdbcTable
USING org.apache.spark.sql.jdbc
OPTIONS (
  url "jdbc:postgresql:dbserver",
  dbtable "schema.tablename",
  user 'username',
  password 'password'
)

INSERT INTO TABLE jdbcTable
SELECT * FROM resultTable