SQL パイプ構文

構文

概要

Apache Spark は SQL パイプ構文をサポートしており、演算子の組み合わせからクエリを構成できます。

FROM <tableName> は、TABLE <tableName> と同じように動作するスタンドアロンクエリとしてサポートされるようになりました。これは、チェーンされたパイプ SQL クエリを開始するための便利な場所を提供しますが、有効な Spark SQL クエリの末尾に 1 つ以上のパイプ演算子を追加して、ここで記述されているのと同じ一貫した動作をさせることも可能です。

サポートされているすべての演算子とそのセマンティクスの一覧については、このドキュメントの末尾にある表を参照してください。

たとえば、これは TPC-H ベンチマークのクエリ 13 です

SELECT c_count, COUNT(*) AS custdist
FROM
  (SELECT c_custkey, COUNT(o_orderkey) c_count
   FROM customer
   LEFT OUTER JOIN orders ON c_custkey = o_custkey
     AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey
  ) AS c_orders
GROUP BY c_count
ORDER BY custdist DESC, c_count DESC;

同じロジックを SQL パイプ演算子を使用して記述するには、次のように表現します

FROM customer
|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
   AND o_comment NOT LIKE '%unusual%packages%'
|> AGGREGATE COUNT(o_orderkey) c_count
   GROUP BY c_custkey
|> AGGREGATE COUNT(*) AS custdist
   GROUP BY c_count
|> ORDER BY custdist DESC, c_count DESC;

ソーステーブル

SQL パイプ構文を使用して新しいクエリを開始するには、FROM <tableName> または TABLE <tableName> 句を使用します。これにより、ソーステーブルのすべての行で構成されるリレーションが作成されます。次に、この句の末尾に 1 つ以上のパイプ演算子を追加して、さらなる変換を実行します。

射影

SQL パイプ構文は、式を評価するための合成可能な方法をサポートしています。これらの射影機能の主な利点は、以前の式に基づいて新しい式を段階的に計算できることです。各演算子は、演算子の出現順序に関係なく、入力テーブルに独立して適用されるため、ここでは横方向の列参照は必要ありません。これらの計算された列のそれぞれが、後続の演算子で使用できるようになります。

SELECT は、提供された式を評価することにより新しいテーブルを生成します。
必要に応じて DISTINCT* を使用できます。
これは、通常の Spark SQL のテーブルサブクエリにおける最も外側の SELECT と同様に機能します。

EXTEND は、提供された式を評価することにより、入力テーブルに新しい列を追加します。
これはテーブルエイリアスも保持します。
これは、通常の Spark SQL の SELECT *, new_column と同様に機能します。

DROP は、入力テーブルから列を削除します。
これは、通常の Spark SQL の SELECT * EXCEPT (column) と似ています。

SET は、入力テーブルの列値を置き換えます。
これは、通常の Spark SQL の SELECT * REPLACE (expression AS column) と似ています。

AS は、入力テーブルを転送し、各行に新しいエイリアスを導入します。

集計

一般に、集計は、通常の Spark SQL とは異なる方法で SQL パイプ構文を使用して実行されます。

フルテーブル集計を実行するには、集計式のリストを評価する AGGREGATE 演算子を使用します。これにより、出力テーブルに 1 つの行が返されます。

グループ化による集計を実行するには、GROUP BY 句を使用して AGGREGATE 演算子を使用します。これにより、グループ化式の値の各一意の組み合わせに対して 1 行が返されます。出力テーブルには、評価されたグループ化式と、それに続く評価された集計関数が含まれます。グループ化式は、後続の演算子で参照できるようにエイリアスを割り当てることができます。このように、GROUP BYSELECT の間で完全な式を繰り返す必要はありません。なぜなら、AGGREGATE は両方を実行する単一の演算子であるためです。

その他の変換

残りの演算子は、フィルタリング、結合、ソート、サンプリング、セット操作などのその他の変換に使用されます。これらの演算子は、通常、以下の表で説明されているように、通常の Spark SQL と同じように機能します。

独立性と相互運用性

SQL パイプ構文は、既存の SQL クエリとの下位互換性の問題なしに Spark で機能します。通常の Spark SQL、パイプ構文、またはその両方の組み合わせを使用して任意のクエリを記述できます。その結果、次の不変条件が常に成り立ちます。

サポートされている演算子

演算子 出力行
FROM または TABLE ソーステーブルのすべての出力行を、変更せずに返します。
SELECT 入力テーブルの各行に対して、指定された式を評価します。
EXTEND 入力テーブルの各行に対して指定された式を評価することにより、新しい列を入力テーブルに追加します。
SET 指定された式を評価した結果で入力テーブルの列を置き換えることにより、更新します。
DROP 名前によって入力テーブルの列を削除します。
AS 入力テーブルの行と列名を保持しますが、新しいテーブルエイリアスが付きます。
WHERE 条件を通過した入力行のサブセットを返します。
LIMIT 指定された行数を返します。順序 (存在する場合) を保持します。
AGGREGATE グループ化ありまたはなしで集計を実行します。
JOIN 両方の入力の行を結合し、入力テーブルとテーブル引数のフィルタリングされたクロス積を返します。
ORDER BY 指示されたとおりにソートされた後、入力行を返します。
UNION ALL 入力テーブルおよびその他のテーブル引数からの行を結合したユニオンまたはその他のセット操作を実行します。
TABLESAMPLE 指定されたサンプリングアルゴリズムによって選択された行のサブセットを返します。
PIVOT 入力行を列にピボットした新しいテーブルを返します。
UNPIVOT 入力列を行にピボットした新しいテーブルを返します。

この表は、サポートされている各パイプ演算子をリストし、それらが生成する出力行について説明しています。各演算子は、|> シンボルの前のクエリによって生成された行で構成される入力リレーションを受け入れることに注意してください。

FROM または TABLE

FROM <tableName>
TABLE <tableName>

ソーステーブルのすべての出力行を、変更せずに返します。

CREATE TABLE t AS VALUES (1, 2), (3, 4) AS t(a, b);
TABLE t;

+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

SELECT

|> SELECT <expr> [[AS] alias], ...

入力テーブルの各行に対して、指定された式を評価します。

一般に、この演算子は SQL パイプ構文で常に必要とされるわけではありません。クエリの最後またはその近くで使用して、式を評価したり、出力列のリストを指定したりできます。

最終的なクエリ結果は常に最後のパイプ演算子から返された列で構成されるため、この SELECT 演算子がない場合、出力には完全な行のすべての列が含まれます。この動作は、標準 SQL 構文の SELECT * と似ています。

必要に応じて DISTINCT* を使用できます。
これは、通常の Spark SQL のテーブルサブクエリにおける最も外側の SELECT と同様に機能します。

ウィンドウ関数も SELECT リストでサポートされています。それらを使用するには、OVER 句を指定する必要があります。WINDOW 句にウィンドウ仕様を提供できます。

集計関数はこの演算子ではサポートされていません。集計を実行するには、代わりに AGGREGATE 演算子を使用してください。

CREATE TABLE t AS VALUES (0), (1) AS t(col);

FROM t
|> SELECT col * 2 AS result;

+------+
|result|
+------+
|     0|
|     2|
+------+

EXTEND

|> EXTEND <expr> [[AS] alias], ...

入力テーブルの各行に対して指定された式を評価することにより、新しい列を入力テーブルに追加します。

EXTEND 操作の後、トップレベルの列名は更新されますが、テーブルエイリアスは元の行値を参照し続けます (たとえば、2 つのテーブル lhsrhs の間の内部結合の後、EXTEND、次に SELECT lhs.col, rhs.col)。

VALUES (0), (1) tab(col)
|> EXTEND col * 2 AS result;

+---+------+
|col|result|
+---+------+
|  0|     0|
|  1|     2|
+---+------+

SET

|> SET <column> = <expression>, ...

指定された式を評価した結果で入力テーブルの列を置き換えることにより、更新します。各このような列参照は、入力テーブルに正確に 1 回出現する必要があります。

これは、通常の Spark SQL の SELECT * EXCEPT (column), <expression> AS column と似ています。

単一の SET 句で複数の代入を実行できます。各代入は、前の代入の結果を参照できます。

SET 操作の後、トップレベルの列名は更新されますが、テーブルエイリアスは元の行値を参照し続けます (たとえば、2 つのテーブル lhsrhs の間の内部結合の後、SET、次に SELECT lhs.col, rhs.col)。

VALUES (0), (1) tab(col)
|> SET col = col * 2;

+---+
|col|
+---+
|  0|
|  2|
+---+

VALUES (0), (1) tab(col)
|> SET col = col * 2;

+---+
|col|
+---+
|  0|
|  2|
+---+

DROP

|> DROP <column>, ...

名前によって入力テーブルの列を削除します。各このような列参照は、入力テーブルに正確に 1 回出現する必要があります。

これは、通常の Spark SQL の SELECT * EXCEPT (column) と似ています。

DROP 操作の後、トップレベルの列名は更新されますが、テーブルエイリアスは元の行値を参照し続けます (たとえば、2 つのテーブル lhsrhs の間の内部結合の後、DROP、次に SELECT lhs.col, rhs.col)。

VALUES (0, 1) tab(col1, col2)
|> DROP col1;

+----+
|col2|
+----+
|   1|
+----+

AS

|> AS <alias>

入力テーブルの行と列名を保持しますが、新しいテーブルエイリアスが付きます。

この演算子は、入力テーブルに新しいエイリアスを導入するのに役立ち、後続の演算子で参照できます。テーブルの既存のエイリアスは、新しいエイリアスに置き換えられます。

SELECT または EXTEND で新しい列を追加した後、または AGGREGATE で集計を実行した後に、この演算子を使用すると便利です。これにより、後続の JOIN 演算子から列を参照するプロセスが簡素化され、より読みやすいクエリが可能になります。

VALUES (0, 1) tab(col1, col2)
|> AS new_tab
|> SELECT col1 + col2 FROM new_tab;

+-----------+
|col1 + col2|
+-----------+
|          1|
+-----------+

WHERE

|> WHERE <condition>

条件を通過した入力行のサブセットを返します。

この演算子はどこにでも出現できるため、個別の HAVING または QUALIFY 構文は必要ありません。

VALUES (0), (1) tab(col)
|> WHERE col = 1;

+---+
|col|
+---+
|  1|
+---+

LIMIT

|> [LIMIT <n>] [OFFSET <m>]

指定された行数を返します。順序 (存在する場合) を保持します。

LIMIT および OFFSET は一緒にサポートされています。LIMIT 句は OFFSET 句なしで使用することもでき、OFFSET 句は LIMIT 句なしで使用することもできます。

VALUES (0), (0) tab(col)
|> LIMIT 1;

+---+
|col|
+---+
|  0|
+---+

AGGREGATE

-- Full-table aggregation
|> AGGREGATE <agg_expr> [[AS] alias], ...

-- Aggregation with grouping
|> AGGREGATE [<agg_expr> [[AS] alias], ...] GROUP BY <grouping_expr> [AS alias], ...

グループ化された行全体または入力テーブル全体で集計を実行します。

GROUP BY 句が存在しない場合、これはフルテーブル集計を実行し、各集計式の列を持つ 1 つの結果行を返します。それ以外の場合、これはグループ化による集計を実行し、グループごとに 1 行を返します。エイリアスはグループ化式に直接割り当てることができます。

この演算子の出力列リストには、最初にグループ化列 (存在する場合)、次に集計列が含まれます。

<agg_expr> 式には、COUNTSUMAVGMIN などの標準的な集計関数、または Spark SQL がサポートするその他の集計関数を含めることができます。追加の式は、集計関数より下または上に表示される場合があります (例: MIN(FLOOR(col)) + 1)。各 <agg_expr> 式には、少なくとも 1 つの集計関数が含まれている必要があります (そうでない場合、クエリはエラーを返します)。各 <agg_expr> 式には、AS <alias> を使用した列エイリアスを含めることができ、集計関数を適用する前に重複値を削除するために DISTINCT キーワードを含めることもできます (例: COUNT(DISTINCT col))。

存在する場合、GROUP BY 句には任意の数のグループ化式を含めることができ、各 <agg_expr> 式は、グループ化式の値の各一意の組み合わせに対して評価されます。出力テーブルには、評価されたグループ化式と、それに続く評価された集計関数が含まれます。GROUP BY 式には、1 ベースの序数を含めることができます。このような序数が、後続の SELECT 句の式を参照する標準 SQL とは異なり、SQL パイプ構文では、それらは前の演算子によって生成されたリレーションの列を参照します。たとえば、TABLE t |> AGGREGATE COUNT(*) GROUP BY 2 では、入力テーブル t の 2 番目の列を参照します。

AGGREGATE 演算子は評価されたグループ化式を自動的に出力に含めるため、GROUP BYSELECT の間で完全な式を繰り返す必要はありません。同様に、AGGREGATE 演算子の後、AGGREGATE がグループ化列と集計列の両方を 1 ステップで返すため、後続の SELECT 演算子を発行する必要がないことがよくあります。

-- Full-table aggregation
VALUES (0), (1) tab(col)
|> AGGREGATE COUNT(col) AS count;

+-----+
|count|
+-----+
|    2|
+-----+

-- Aggregation with grouping
VALUES (0, 1), (0, 2) tab(col1, col2)
|> AGGREGATE COUNT(col2) AS count GROUP BY col1;

+----+-----+
|col1|count|
+----+-----+
|   0|    2|
+----+-----+

JOIN

|> [LEFT | RIGHT | FULL | CROSS | SEMI | ANTI | NATURAL | LATERAL] JOIN <table> [ON <condition> | USING(col, ...)]

両方の入力の行を結合し、パイプ入力テーブルと JOIN キーワードの後のテーブル式のフィルタリングされたクロス積を返します。これは、パイプ演算子入力テーブルが結合の左側になり、テーブル引数が結合の右側になる、通常の SQL の JOIN 句と同様の方法で機能します。

標準的な結合修飾子 (例: LEFTRIGHTFULL) は、JOIN キーワードの前にサポートされています。

結合述語は、結合の両方の入力からの列を参照する必要がある場合があります。この場合、両方の入力に同じ名前の列がある場合に列を区別するためにテーブルエイリアスを使用する必要がある場合があります。AS 演算子は、結合の左側になるパイプ入力テーブルに新しいエイリアスを導入するのに役立ちます。必要に応じて、標準構文を使用して、結合の右側になるテーブル引数にエイリアスを割り当てます。

SELECT 0 AS a, 1 AS b
|> AS lhs
|> JOIN VALUES (0, 2) rhs(a, b) ON (lhs.a = rhs.a);

+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
|  0|  1|  0|  2|
+---+---+---+---+

VALUES ('apples', 3), ('bananas', 4) t(item, sales)
|> AS produce_sales
|> LEFT JOIN
     (SELECT "apples" AS item, 123 AS id) AS produce_data
     USING (item)
|> SELECT produce_sales.item, sales, id;

/*---------+-------+------+
 | item    | sales | id   |
 +---------+-------+------+
 | apples  | 3     | 123  |
 | bananas | 4     | NULL |
 +---------+-------+------*/

ORDER BY

|> ORDER BY <expr> [ASC | DESC], ...

指示されたとおりにソートされた後、入力行を返します。NULLS FIRST/LAST を含む標準修飾子がサポートされています。

VALUES (0), (1) tab(col)
|> ORDER BY col DESC;

+---+
|col|
+---+
|  1|
|  0|
+---+

UNION、INTERSECT、EXCEPT

|> {UNION | INTERSECT | EXCEPT} {ALL | DISTINCT} (<query>)

入力テーブルまたはサブクエリからの行を結合したユニオンまたはその他のセット操作を実行します。

VALUES (0), (1) tab(a, b)
|> UNION ALL VALUES (2), (3) tab(c, d);

+---+----+
|  a|   b|
+---+----+
|  0|   1|
|  2|   3|
+---+----+

TABLESAMPLE

|> TABLESAMPLE <method>(<size> {ROWS | PERCENT})

指定されたサンプリングアルゴリズムによって選択された行のサブセットを返します。

VALUES (0), (0), (0), (0) tab(col)
|> TABLESAMPLE (1 ROWS);

+---+
|col|
+---+
|  0|
+---+

VALUES (0), (0) tab(col)
|> TABLESAMPLE (100 PERCENT);

+---+
|col|
+---+
|  0|
|  0|
+---+

PIVOT

|> PIVOT (agg_expr FOR col IN (val1, ...))

入力行を列にピボットした新しいテーブルを返します。

VALUES
  ("dotNET", 2012, 10000),
  ("Java", 2012, 20000),
  ("dotNET", 2012, 5000),
  ("dotNET", 2013, 48000),
  ("Java", 2013, 30000)
  courseSales(course, year, earnings)
|> PIVOT (
     SUM(earnings)
     FOR COURSE IN ('dotNET', 'Java')
  )

+----+------+------+
|year|dotNET|  Java|
+----+------+------+
|2012| 15000| 20000|
|2013| 48000| 30000|
+----+------+------+

UNPIVOT

|> UNPIVOT (value_col FOR key_col IN (col1, ...))

入力列を行にピボットした新しいテーブルを返します。

VALUES
  ("dotNET", 2012, 10000),
  ("Java", 2012, 20000),
  ("dotNET", 2012, 5000),
  ("dotNET", 2013, 48000),
  ("Java", 2013, 30000)
  courseSales(course, year, earnings)
|> UNPIVOT (
  earningsYear FOR `year` IN (`2012`, `2013`, `2014`)

+--------+------+--------+
|  course|  year|earnings|
+--------+------+--------+
|    Java|  2012|   20000|
|    Java|  2013|   30000|
|  dotNET|  2012|   15000|
|  dotNET|  2013|   48000|
|  dotNET|  2014|   22500|
+--------+------+--------+