他のDBMSとの送受信#

pandas API on Sparkで他のDBMSとやり取りするAPIは、pandasのものとは少し異なります。これは、pandas API on SparkがPySparkのJDBC APIを活用して、他のDBMSとの間で読み書きを行うためです。

外部DBMSとの読み書きを行うAPIは以下の通りです。

read_sql_table(table_name, con[, schema, ...])

SQLデータベーステーブルをDataFrameに読み込みます。

read_sql_query(sql, con[, index_col])

SQLクエリをDataFrameに読み込みます。

read_sql(sql, con[, index_col, columns])

SQLクエリまたはデータベーステーブルをDataFrameに読み込みます。

pandas-on-Sparkでは、`con`に正規化されたJDBC URLが必要であり、PySparkのJDBC APIのオプションに関する追加のキーワード引数を受け付けることができます。

ps.read_sql(..., dbtable="...", driver="", keytab="", ...)

DataFrameの読み書き#

以下の例では、SQLiteのテーブルを読み書きします。

まず、PythonのSQLiteライブラリを使用して、以下のように`example`データベースを作成します。これは後でpandas-on-Sparkに読み込まれます。

import sqlite3

con = sqlite3.connect('example.db')
cur = con.cursor()
# Create table
cur.execute(
    '''CREATE TABLE stocks
       (date text, trans text, symbol text, qty real, price real)''')
# Insert a row of data
cur.execute("INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")
# Save (commit) the changes
con.commit()
con.close()

Pandas API on Sparkは読み込みにJDBCドライバーを必要とするため、使用するデータベースのドライバーがSparkのクラスパス上にある必要があります。SQLite JDBCドライバーは、例えば以下のようにダウンロードできます。

curl -O https://repo1.maven.org/maven2/org/xerial/sqlite-jdbc/3.34.0/sqlite-jdbc-3.34.0.jar

その後、まずSparkセッションに追加する必要があります。追加すると、pandas API on SparkはSparkセッションを自動的に検出し、利用します。

import os

from pyspark.sql import SparkSession

(SparkSession.builder
    .master("local")
    .appName("SQLite JDBC")
    .config(
        "spark.jars",
        "{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
    .config(
        "spark.driver.extraClassPath",
        "{}/sqlite-jdbc-3.34.0.jar".format(os.getcwd()))
    .getOrCreate())

これで、テーブルを読み込む準備ができました。

import pyspark.pandas as ps

df = ps.read_sql("stocks", con="jdbc:sqlite:{}/example.db".format(os.getcwd()))
df
         date trans symbol    qty  price
0  2006-01-05   BUY   RHAT  100.0  35.14

以下のように`stocks`テーブルに書き戻すこともできます。

df.price += 1
df.spark.to_spark_io(
    format="jdbc", mode="append",
    dbtable="stocks", url="jdbc:sqlite:{}/example.db".format(os.getcwd()))
ps.read_sql("stocks", con="jdbc:sqlite:{}/example.db".format(os.getcwd()))
         date trans symbol    qty  price
0  2006-01-05   BUY   RHAT  100.0  35.14
1  2006-01-05   BUY   RHAT  100.0  36.14