[2]:
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("error")

Chapter 6: Old SQL, New Tricks - Running SQL on PySpark#

Introduction#

This section introduces how to use the Spark SQL API in PySpark and compares it with the DataFrame API. It also covers how to switch seamlessly between the two APIs, along with some practical tips.

Running SQL with PySpark#

PySpark provides two main ways to perform SQL operations:

Using spark.sql()#

The spark.sql() function lets you execute SQL queries directly.

[10]:
# Create a table via spark.sql()
spark.sql("DROP TABLE IF EXISTS people")
spark.sql("""
CREATE TABLE people USING PARQUET
AS SELECT * FROM VALUES (1, 'Alice', 10), (2, 'Bob', 20), (3, 'Charlie', 30) t(id, name, age)
""")
[10]:
DataFrame[]
[11]:
# Use spark.sql() to select data from a table
spark.sql("SELECT name, age FROM people WHERE age > 21").show()
+-------+---+
|   name|age|
+-------+---+
|Charlie| 30|
+-------+---+

Using the PySpark DataFrame API#

The PySpark DataFrame API offers functionality equivalent to SQL, but with a Pythonic approach.

[12]:
# Read a table using the DataFrame API
people_df = spark.read.table("people")

# Use DataFrame API to select data
people_df.select("name", "age").filter("age > 21").show()
+-------+---+
|   name|age|
+-------+---+
|Charlie| 30|
+-------+---+

SQL vs. DataFrame API in PySpark#

Which API to use depends on your background and the specific task:

SQL API:
- Ideal for users with a SQL background who are more comfortable writing SQL queries.

DataFrame API:
- Preferred by Python developers, since it follows Python syntax and idioms.
- Offers greater flexibility for complex transformations, especially when using user-defined functions (UDFs), as sketched below.
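For example, a Python UDF can be applied to a column directly with withColumn, without any SQL registration. A minimal sketch, reusing the people_df DataFrame read above (the shout helper is invented here purely for illustration):

[ ]:
from pyspark.sql.functions import udf

# Hypothetical helper for illustration: a Python UDF usable directly in the DataFrame API
@udf("string")
def shout(name):
    return name.upper() + "!"

# Apply the UDF with withColumn; no spark.udf.register() call is needed
people_df.withColumn("shouted_name", shout("name")).show()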

Code Examples: SQL vs. DataFrame API#

The following examples compare how common tasks are performed with the SQL API and the PySpark DataFrame API, to help you understand their differences and when to choose one over the other.

Example: SELECT and FILTER Operations#

SQL API

[15]:
spark.sql("SELECT name FROM people WHERE age > 21").show()
+-------+
|   name|
+-------+
|Charlie|
+-------+

DataFrame API

[16]:
spark.read.table("people").select("name").filter("age > 21").show()
+-------+
|   name|
+-------+
|Charlie|
+-------+

Example: JOIN Operations#

[18]:
spark.sql("DROP TABLE IF EXISTS orders")
spark.sql("""
CREATE TABLE orders USING PARQUET
AS SELECT * FROM VALUES (101, 1, 200), (102, 2, 150), (103, 3, 300) t(order_id, customer_id, amount)
""")
[18]:
DataFrame[]

SQL API

[19]:
spark.sql("""
SELECT p.name, o.order_id
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
+-------+--------+
|   name|order_id|
+-------+--------+
|Charlie|     103|
|  Alice|     101|
|    Bob|     102|
+-------+--------+

DataFrame API

[20]:
people_df = spark.read.table("people")
orders_df = spark.read.table("orders")
(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .select(people_df.name, orders_df.order_id)
        .show()
)
+-------+--------+
|   name|order_id|
+-------+--------+
|Charlie|     103|
|  Alice|     101|
|    Bob|     102|
+-------+--------+

Example: GROUP BY and Aggregation Operations#

SQL API

[21]:
spark.sql("""
SELECT p.name, SUM(o.amount) AS total_amount
FROM people p
JOIN orders o ON p.id = o.customer_id
GROUP BY p.name
""").show()
+-------+------------+
|   name|total_amount|
+-------+------------+
|Charlie|         300|
|  Alice|         200|
|    Bob|         150|
+-------+------------+

DataFrame API

[22]:
from pyspark.sql.functions import sum  # note: shadows Python's built-in sum in this cell

(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .groupBy("name")
        .agg(sum("amount").alias("total_amount"))
        .show()
)
+-------+------------+
|   name|total_amount|
+-------+------------+
|Charlie|         300|
|  Alice|         200|
|    Bob|         150|
+-------+------------+

Example: Window Operations#

SQL API

[23]:
spark.sql("""
SELECT
    p.name,
    o.amount,
    RANK() OVER (PARTITION BY p.name ORDER BY o.amount DESC) AS rank
FROM people p
JOIN orders o ON p.id = o.customer_id
""").show()
+-------+------+----+
|   name|amount|rank|
+-------+------+----+
|  Alice|   200|   1|
|    Bob|   150|   1|
|Charlie|   300|   1|
+-------+------+----+

DataFrame API

[24]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Define the window specification
window_spec = Window.partitionBy("name").orderBy(orders_df.amount.desc())

# Window operation with RANK
(
    people_df
        .join(orders_df, people_df.id == orders_df.customer_id)
        .withColumn("rank", rank().over(window_spec))
        .select("name", "amount", "rank")
        .show()
)
+-------+------+----+
|   name|amount|rank|
+-------+------+----+
|  Alice|   200|   1|
|    Bob|   150|   1|
|Charlie|   300|   1|
+-------+------+----+

Example: UNION Operations#

SQL API:
- The `UNION` operator combines the rows from two queries and removes duplicates by default.

[25]:
spark.sql("CREATE OR REPLACE TEMP VIEW people2 AS SELECT * FROM VALUES (1, 'Alice', 10), (4, 'David', 35) t(id, name, age)")
[25]:
DataFrame[]
[26]:
spark.sql("""
SELECT * FROM people
UNION
SELECT * FROM people2
""").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 30|
|  1|  Alice| 10|
|  2|    Bob| 20|
|  4|  David| 35|
+---+-------+---+

DataFrame API:
- The `union()` method combines two DataFrames but does not remove duplicates by default.
- To match SQL's UNION behavior, call `.dropDuplicates()` after the union to eliminate duplicates.

[27]:
people_df = spark.read.table("people")
people2_df = spark.read.table("people2")
# This will have duplicate values.
people_df.union(people2_df).show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 30|
|  1|  Alice| 10|
|  2|    Bob| 20|
|  1|  Alice| 10|
|  4|  David| 35|
+---+-------+---+

[28]:
# Remove duplicate values
people_df.union(people2_df).dropDuplicates().show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 30|
|  1|  Alice| 10|
|  2|    Bob| 20|
|  4|  David| 35|
+---+-------+---+

Example: SET Configuration#

SQL API

[29]:
spark.sql("SET spark.sql.shuffle.partitions=8")
[29]:
DataFrame[key: string, value: string]
[30]:
spark.sql("SET spark.sql.shuffle.partitions").show(truncate=False)
+----------------------------+-----+
|key                         |value|
+----------------------------+-----+
|spark.sql.shuffle.partitions|8    |
+----------------------------+-----+

DataFrame API

[31]:
spark.conf.set("spark.sql.shuffle.partitions", 10)
[32]:
spark.conf.get("spark.sql.shuffle.partitions")
[32]:
'10'

Example: Listing Tables and Views#

SQL API

[33]:
spark.sql("SHOW TABLES").show()
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|   orders|      false|
|  default|   people|      false|
|         |  people2|       true|
+---------+---------+-----------+

DataFrame API

[34]:
tables = spark.catalog.listTables()
for table in tables:
    print(f"Name: {table.name}, isTemporary: {table.isTemporary}")
Name: orders, isTemporary: False
Name: people, isTemporary: False
Name: people2, isTemporary: True

Functions Exclusive to the DataFrame API#

Some operations are exclusive to the DataFrame API and have no direct SQL equivalent, for example:

withColumn: adds a new column to a DataFrame or replaces an existing one.

[35]:
people_df.withColumn("new_col", people_df["age"] + 10).show()
+---+-------+---+-------+
| id|   name|age|new_col|
+---+-------+---+-------+
|  3|Charlie| 30|     40|
|  1|  Alice| 10|     20|
|  2|    Bob| 20|     30|
+---+-------+---+-------+
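If the column name already exists, withColumn replaces that column instead of adding a new one: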

[39]:
people_df.withColumn("age", people_df["age"] + 10).show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 40|
|  1|  Alice| 20|
|  2|    Bob| 30|
+---+-------+---+

Using SQL and the DataFrame API Interchangeably#

PySpark supports switching between SQL and the DataFrame API, making it easy to mix the two.

Chaining DataFrame Operations on SQL Output#

Since spark.sql() returns a DataFrame, you can chain DataFrame operations directly on its result to build efficient, easy-to-read transformations.

[36]:
# Chaining DataFrame operations on SQL results
spark.sql("SELECT name, age FROM people").filter("age > 21").show()
+-------+---+
|   name|age|
+-------+---+
|Charlie| 30|
+-------+---+

Using selectExpr()#

The selectExpr() method lets you run SQL expressions within the DataFrame API.

[37]:
people_df.selectExpr("name", "age + 1 AS age_plus_one").show()
+-------+------------+
|   name|age_plus_one|
+-------+------------+
|Charlie|          31|
|  Alice|          11|
|    Bob|          21|
+-------+------------+

Querying a DataFrame in SQL#

You can create a temporary view from a DataFrame and then run SQL queries against it.

[38]:
# First create a temp view on top of the DataFrame.
people_df.createOrReplaceTempView("people_view")

# Then it can be referenced in SQL.
spark.sql("SELECT * FROM people_view WHERE age > 21").show()
+---+-------+---+
| id|   name|age|
+---+-------+---+
|  3|Charlie| 30|
+---+-------+---+

Using Python User-Defined Functions in SQL#

You can register Python user-defined functions (UDFs) for use in SQL queries, enabling custom transformations within SQL syntax.

[41]:
from pyspark.sql.functions import udf

# Define the UDF
@udf("string")
def uppercase_name(name):
    return name.upper()

# Register the UDF
spark.udf.register("uppercase_name", uppercase_name)

# Use it in SQL
spark.sql("SELECT name, uppercase_name(name) FROM people_view WHERE age > 21").show()
+-------+--------------------+
|   name|uppercase_name(name)|
+-------+--------------------+
|Charlie|             CHARLIE|
+-------+--------------------+
