移行ガイド: SQL、データセット、データフレーム

Spark SQL 3.4 から 3.5 へのアップグレード

Spark SQL 3.3 から 3.4 へのアップグレード

Spark SQL 3.2 から 3.3 へのアップグレード

Spark SQL 3.1 から 3.2 へのアップグレード

Spark SQL 3.0 から 3.1 へのアップグレード

Spark SQL 3.0.1 から 3.0.2 へのアップグレード

Spark SQL 3.0 から 3.0.1 へのアップグレード

Spark SQL 2.4 から 3.0 へのアップグレード

データセット/データフレーム API

DDL ステートメント

UDF と組み込み関数

クエリエンジン

データソース

その他

Spark SQL 2.4.7 から 2.4.8 へのアップグレード

Spark SQL 2.4.5 から 2.4.6 へのアップグレード

Spark SQL 2.4.4 から 2.4.5 へのアップグレード

Spark SQL 2.4.3 から 2.4.4 へのアップグレード

Spark SQL 2.4 から 2.4.1 へのアップグレード

Spark SQL 2.3 から 2.4 へのアップグレード

Spark SQL 2.2 から 2.3 へのアップグレード

</thead> <tr> <td> NullType </td> <td>NullType</td> <td>IntegerType</td> <td>LongType</td> <td>DecimalType(38,0)</td> <td>DoubleType</td> <td>DateType</td> <td>TimestampType</td> <td>StringType</td> </tr> <tr> <td> IntegerType </td> <td>IntegerType</td> <td>IntegerType</td> <td>LongType</td> <td>DecimalType(38,0)</td> <td>DoubleType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> <tr> <td> LongType </td> <td>LongType</td> <td>LongType</td> <td>DecimalType(38,0)</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> <tr> <td> DecimalType(38,0)* </td> <td>DecimalType(38,0)</td> <td>DecimalType(38,0)</td> <td>DecimalType(38,0)</td> <td>DecimalType(38,0)</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> <tr> <td> DoubleType </td> <td>DoubleType</td> <td>DoubleType</td> <td>StringType</td> <td>StringType</td> <td>DoubleType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> <tr> <td> DateType </td> <td>DateType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>DateType</td> <td>TimestampType</td> <td>StringType</td> </tr> <tr> <td> TimestampType </td> <td>TimestampType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>TimestampType</td> <td>TimestampType</td> <td>StringType</td> </tr> <tr> <td> StringType </td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> <td>StringType</td> </tr> </table>

Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.

Spark SQL 2.1 から 2.2 へのアップグレード

Spark SQL 2.0 から 2.1 へのアップグレード

Spark SQL 1.6 から 2.0 へのアップグレード

Spark SQL 1.5 から 1.6 へのアップグレード

   ./sbin/start-thriftserver.sh \
     --conf spark.sql.hive.thriftServer.singleSession=true \
     ...
   

Spark SQL 1.4 から 1.5 へのアップグレード

Spark SQL 1.3 から 1.4 へのアップグレード

DataFrameデータリーダー/ライターインターフェイス

ユーザーからのフィードバックに基づいて、データの読み込み(SQLContext.read)とデータの書き出し(DataFrame.write)のための、より流動的な新しいAPIを作成し、古いAPI(例:SQLContext.parquetFileSQLContext.jsonFile)を非推奨にしました。

詳細については、SQLContext.readScalaJavaPython)と DataFrame.writeScalaJavaPython)のAPIドキュメントを参照してください。

DataFrame.groupByはグループ化カラムを保持します。

ユーザーからのフィードバックに基づいて、DataFrame.groupBy().agg()のデフォルトの動作を変更し、結果のDataFrameにグループ化カラムを保持するようにしました。1.3の動作を維持するには、spark.sql.retainGroupColumnsfalseに設定します。

import pyspark.sql.functions as func

# In 1.3.x, in order for the grouping column "department" to show up,
# it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))

# In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(func.max("age"), func.sum("expense"))

# Revert to 1.3.x behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg($"department", max("age"), sum("expense"))

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"))

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false")
// In 1.3.x, in order for the grouping column "department" to show up,
// it must be included explicitly as part of the agg function call.
df.groupBy("department").agg(col("department"), max("age"), sum("expense"));

// In 1.4+, grouping column "department" is included automatically.
df.groupBy("department").agg(max("age"), sum("expense"));

// Revert to 1.3 behavior (not retaining grouping column) by:
sqlContext.setConf("spark.sql.retainGroupColumns", "false");

DataFrame.withColumnの動作変更

1.4より前は、DataFrame.withColumn()はカラムの追加のみをサポートしていました。同じ名前の既存のカラムがある場合でも、カラムは常に指定された名前の新しいカラムとして結果のDataFrameに追加されます。1.4以降、DataFrame.withColumn()は、すべての既存のカラムの名前と異なる名前のカラムを追加するか、同じ名前の既存のカラムを置き換えることをサポートしています。

この変更はScala APIのみであり、PySparkおよびSparkRには適用されないことに注意してください。

Spark SQL 1.0-1.2 から 1.3 へのアップグレード

Spark 1.3では、Spark SQLから「Alpha」ラベルを削除し、その一環として利用可能なAPIを整理しました。Spark 1.3以降、Spark SQLは1.Xシリーズの他のリリースとのバイナリ互換性を提供します。この互換性保証は、明示的に不安定(つまり、DeveloperAPIまたはExperimental)としてマークされているAPIを除外します。

SchemaRDDからDataFrameへの名前変更

Spark SQL 1.3にアップグレードする際にユーザーが最も気づく大きな変更は、SchemaRDDDataFrame に名前変更されたことです。これは主に、DataFrameがRDDから直接継承するのではなく、独自の実装を通じてRDDが提供する機能のほとんどを提供するためです。DataFrameは、.rdd メソッドを呼び出すことで、引き続きRDDに変換できます。

Scalaでは、一部のユースケースでソースコードの互換性を提供するために、SchemaRDD から DataFrame への型エイリアスがあります。それでも、ユーザーはコードを DataFrame を使用するように更新することを推奨します。JavaおよびPythonユーザーは、コードを更新する必要があります。

JavaとScala APIの統一

Spark 1.3より前には、Scala APIをミラーリングする個別のJava互換クラス(JavaSQLContext および JavaSchemaRDD)がありました。Spark 1.3では、Java APIとScala APIが統合されました。どちらの言語のユーザーも、SQLContextDataFrame を使用する必要があります。一般に、これらのクラスは両方の言語で使用可能な型(つまり、言語固有のコレクションの代わりに Array など)を使用しようとします。共通の型が存在しない場合(たとえば、クロージャまたはマップを渡す場合)、代わりに関数オーバーロードが使用されます。

さらに、Java固有の型APIは削除されました。ScalaとJavaの両方のユーザーは、org.apache.spark.sql.types に存在するクラスを使用して、プログラムでスキーマを記述する必要があります。

暗黙的な変換の分離とdslパッケージの削除(Scalaのみ)

Spark 1.3より前のコード例の多くは、import sqlContext._ で始まり、これによりsqlContextのすべての関数がスコープに取り込まれていました。Spark 1.3では、RDDDataFrame に変換するための暗黙的な変換を、SQLContext 内のオブジェクトに分離しました。ユーザーは import sqlContext.implicits._ と記述する必要があります。

さらに、暗黙的な変換は、自動的に適用されるのではなく、Product(つまり、ケースクラスまたはタプル)で構成されるRDDにのみ toDF メソッドで拡張するようになりました。

DSL内で関数を使用する場合(現在はDataFrame APIに置き換えられています)、ユーザーは org.apache.spark.sql.catalyst.dsl をインポートしていました。代わりに、公開されているdataframe関数APIを使用する必要があります:import org.apache.spark.sql.functions._

org.apache.spark.sqlのDataTypeの型エイリアスの削除(Scalaのみ)

Spark 1.3では、DataType の基本sqlパッケージに存在していた型エイリアスが削除されます。代わりに、org.apache.spark.sql.types のクラスをインポートする必要があります。

UDF登録がsqlContext.udfに移動(JavaおよびScala)

DataFrame DSLまたはSQLでの使用のためにUDFを登録するために使用される関数は、SQLContext の udf オブジェクトに移動されました。

sqlContext.udf.register("strLen", (s: String) => s.length())
sqlContext.udf().register("strLen", (String s) -> s.length(), DataTypes.IntegerType);

Python UDFの登録は変更されていません。

Apache Hive との互換性

Spark SQLは、Hive Metastore、SerDe、およびUDFとの互換性を持つように設計されています。現在、Hive SerDeとUDFは組み込みのHiveに基づいており、Spark SQLはさまざまなバージョンのHive Metastore(0.12.0から2.3.9、および3.0.0から3.1.3)に接続できます。また、異なるバージョンのHive Metastoreとの対話を参照してください。

既存のHiveウェアハウスへのデプロイ

Spark SQL Thrift JDBCサーバーは、既存のHiveインストールとの「すぐに使える」互換性を持つように設計されています。既存のHive Metastoreを変更したり、テーブルのデータ配置やパーティショニングを変更したりする必要はありません。

サポートされているHive機能

Spark SQLは、次のようなHive機能の大部分をサポートしています。

サポートされていないHive機能

以下は、現在サポートしていないHive機能のリストです。これらの機能のほとんどは、Hiveのデプロイではめったに使用されません。

難解なHive機能

Hive入出力フォーマット

Hiveの最適化

いくつかのHiveの最適化はまだSparkには含まれていません。これらのいくつか(インデックスなど)は、Spark SQLのインメモリ計算モデルのため重要性が低くなっています。その他は、今後のSpark SQLのリリースで対応予定です。

Hive UDF/UDTF/UDAF

Hive UDF/UDTF/UDAFのすべてのAPIがSpark SQLでサポートされているわけではありません。以下は、サポートされていないAPIです。

互換性のないHive UDF

以下は、HiveとSparkが異なる結果を生成するシナリオです。