建站教程

建站教程

Products

当前位置:首页 > 建站教程 >

Spark—15分钟教程

GG网络技术分享 2025-03-18 16:09 0


正如在我几乎所有关于这个工具的文章中都写到,Spark和SQL一样非常容易使用。但不管我花多少时间写代码,我只是无法在我的大脑中永久性地存储Spark API(有人会说我的记忆就像RAM一样,小而易失)。

无论你是想快速入门介绍sparksql,还是急于编写你的程序,还是像我一样需要一份备忘单,我相信你会发现这篇文章很有用。

这篇文章的目的是介绍sparksql的所有主要函数/特性,在片段中,你将始终看到原始的SQL查询及其在PySpark中的翻译。

在几个月前,我为另一篇文章创建了这个数据集,它由三个简单的表组成:

基础知识

Apache Spark是一个用于大规模并行数据处理的引擎。这个框架的一个令人惊奇的特性是它以多种语言公开api:我通常使用Scala与它交互,但是也可以使用SQL、Python甚至Java和R。

当我们编写Spark程序时,首先要知道的是,当我们执行代码时,我们不一定要对数据执行任何操作。实际上,该工具有两种类型的API调用:转换和操作。

Spark转换背后的范例被称为延后计算,这意味着实际的数据计算在我们要求采取行动之前不会开始。

为了理解这一概念,设想一下你需要对一个列执行SELECT和重命名的情况:如果不调用某个操作(例如collect或count),那么你的代码只不过是定义了所谓的Spark执行计划。

Spark以有向无环图(非常著名的DAG)组织执行计划。此结构描述将要执行的确切操作,并使调度器能够决定在给定时间执行哪个任务。

正如Miyagi先生告诉我们的:

  1. 上蜡:定义DAG(变换)
  2. 脱蜡:执行DAG(动作)

与Spark交互

太好了,我们从哪里开始交互?使用Spark有多种方法:

  • 使用IDE:我建议使用IntelliJ或PyCharm,但我想你可以选择任何你想要的东西。查看附录中的PyCharm快速入门(在本地运行查询)。我认为可以从你的本地环境使用远程Spark executor,但说实话,我从来没有进行过这种配置。
  • Jupyter Notebooks Sparkmagic:Sparkmagic是一组工具,用于通过Spark REST服务器Livy与远程Spark集群交互工作[1]。这是在AWS、Azure或googlecloud等云系统上工作时使用Spark的主要方式。大多数云提供商都有一项服务,可以在大约10分钟内配置集群和notebooks 。
  • 通过使用spark shell的终端:有时你不希望在你和数据之间有任何东西(例如,对一个表进行超级快速的检查);在这种情况下,你只需打开一个终端并启动spark shell。

文章的代码主要用于IDE。

在编写任何查询之前,我们需要导入一些库并启动一个Spark会话(使用DatasetDataFrame的API编程)。下面的PySpark和Scala代码段将加载你需要的所有内容(假设你已经配置了系统)。之后,为了简单起见,我们将只看到PySpark代码。除了一些细微差别外,scalaapi基本相同。

PySpark

导入Sparkimportpysparkfrompyspark.sqlimportSparkSessionfrompyspark.sql.functionsimport*初始化Spark会话spark = SparkSession.builder \

.master("local") \

.appName("SparkLikeABoss") \

.getOrCreate()

Scala

//  导入Sparkimportorg.apache.spark.sql._importorg.apache.spark.sql.functions._//  初始化Spark会话val spark =SparkSession.builder.

master("local")

.appName("spark session example")

.getOrCreate()

解释数据集、数据帧和RDD之间的差异篇幅将过长,所以我跳过这一部分,假装它不存在。

基本操作

你能写的最简单的查询可能是你所用过的最重要的查询。让我们看看如何使用Sales表进行基本操作。

简单的Select语句和显示数据

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")SELECT *

FROM sales_table执行计划sales_table_execution_plan = sales_table.select(col("*"))Show (Action) - 显示5行,列宽不受限制sales_table_execution_plan.show(5,True)

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")SELECT order_id AS the_order_id,

seller_id AS the_seller_id,

num_pieces_sold AS the_number_of_pieces_sold

FROM sales_table以一行代码执行计划和显示出来sales_table_execution_plan = sales_table.select(

col("order_id").alias("the_order_id"),

col("seller_id").alias("the_seller_id"),

col("num_pieces_sold").alias("the_number_of_pieces_sold")

).show(5,True)

我们在代码片段中所做的第一件事是定义执行计划;只有当我们获得show操作时,才会执行该计划。

我们可以在Spark计划中调用的其他操作示例包括:

  • collect()—返回整个数据集
  • count()—返回行数
  • take(n)-从数据集中返回n行
  • show(n,truncate=False)-显示n行。你可以决定截断结果或显示字段的所有长度

另一个值得注意的有趣的事情是列是由col对象标识的。在本例中,我们让Spark推断这些列属于哪个数据帧。

我们可以使用语法execution_plan_variable[column_name]来指定列来自哪个执行计划。使用此替代语法,我们可以得到:

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")SELECT order_id AS the_order_id,

seller_id AS the_seller_id,

num_pieces_sold AS the_number_of_pieces_sold

FROM sales_table以一行代码执行计划和显示出来sales_table_execution_plan = sales_table.select(

sales_table["order_id"].alias("the_order_id"),

sales_table["seller_id"].alias("the_seller_id"),

sales_table["num_pieces_sold"].alias("the_number_of_pieces_sold")

).show(5,True)

在处理连接时,限定字段的源表尤为重要(例如,两个表可能有两个同名字段,因此仅使用col对象不足以消除歧义)。Scala中的语法略有不同:

// Qualify the source execution plan in Scalasales_table.col("order_id")

重命名和添加列

有时我们只想重命名一个列,或者我们想添加一个新的列并进行一些计算(例如,在以下情况下):

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")SELECT order_id,

product_id,

seller_id,

date,

num_pieces_sold AS pieces,

bill_raw_text

FROM sales_table asales_table_execution_plan = sales_table. \

withColumnRenamed("num_pieces_sold","pieces")

sales_table_execution_plan.show()

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")SELECT order_id,

product_id,

seller_id,

date,

num_pieces_sold,

bill_raw_text,

num_pieces_sold % 2 AS num_pieces_sold_is_even

FROM sales_table asales_table_execution_plan = sales_table. \

withColumn("num_pieces_sold_is_even", col("num_pieces_sold")%2)

sales_table_execution_plan.show()

简单聚合

Spark支持所有主要的聚合函数。以下示例仅指简单的示例(例如平均值、总和、计数等)。稍后将介绍数组的聚合。

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")SELECT product_id,

SUM(num_pieces_sold) AS total_pieces_sold,

AVG(num_pieces_sold) AS average_pieces_sold,

MAX(num_pieces_sold) AS max_pieces_sold_of_product_in_orders,

MIN(num_pieces_sold) AS min_pieces_sold_of_product_in_orders,

COUNT(num_pieces_sold) AS num_times_product_sold

FROM sales_table

GROUP BY product_idsales_table_execution_plan = sales_table.groupBy(

col("product_id")

).agg(

sum("num_pieces_sold").alias("total_pieces_sold"),

avg("num_pieces_sold").alias("average_pieces_sold"),

max("num_pieces_sold").alias("max_pieces_sold_of_product_in_orders"),

min("num_pieces_sold").alias("min_pieces_sold_of_product_in_orders"),

count("num_pieces_sold").alias("num_times_product_sold")

)

sales_table_execution_plan.show()

显示架构

显示命令的table有点误导人;更精确的定义是显示执行计划。使用Spark API,我们可以一个接一个地传递多个操作;使用printSchema API,如果在磁盘上写入执行计划的结果,我们将输出最终表的样子。

在下面的示例中,我们重命名一些列,进行聚合,然后添加另一列。

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")-- 创建一个临时表,进行一些重命名CREATETABLEtemp_1ASSELECTseller_idASthe_seller,

num_pieces_soldASpieces,

product_idFROMsales_table;--对新表进行聚合CREATETABLEtemp_2ASSELECTproduct_id,SUM(pieces)AStotal_piecesFROMtemp_1GROUPBYproduct_id;-- 添加列SELECTa.*,1ASfake_columnFROMtemp2 a;

sales_table_execution_plan = sales_table. \

withColumnRenamed("seller_id", "the_seller"). \

withColumnRenamed("num_pieces_sold", "pieces").\

groupBy(

col("product_id")

).agg(

sum("pieces").alias("total_pieces")

).withColumn("fake_column", lit(1))输出 Schemasales_table_execution_plan.printSchema()

printSchema的输出是:

root|--product_id:string(nullable=true)|--total_pieces:double(nullable=true)|--fake_column:integer(nullable=false)

请注意,printSchema不会触发操作;相反,Spark会评估执行计划,以了解DAG在输出列中的位置。由于这个原因,这个操作比show快得多,show会触发DAG的执行。

解释执行计划

可以通过explain API获得有关触发操作时引擎将执行的操作的更详细的说明。在这种情况下,我们将获得Spark将执行的操作的详细说明。让我们对上一个查询调用explain:

输出 Schemasales_table_execution_plan.printSchema()
==PhysicalPlan==*(2)HashAggregate(keys=[product_id361],functions=[sum(cast(pieces379asdouble))]) -Exchangehashpartitioning(product_id361,200) -*(1)HashAggregate(keys=[product_id361],functions=[partial_sum(cast(pieces379asdouble))]) -*(1)Project[product_id361,num_pieces_sold364ASpieces379] -*(1)FileScanparquet[product_id361,num_pieces_sold364]Batched:true,Format:Parquet,Location:InMemoryFileIndex[file:/sales_parquet],PartitionFilters:[],PushedFilters:[],ReadSchema:struct

老实说,我从来没有发现explain API太有用,尤其是当DAG开始变得庞大和复杂时。在Spark UI中可以找到一个更好的视图,它公开了相同信息的图形表示。

Select Distinct

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")SELECT DISTINCT seller_id,

date

FROM sales_tablesales_table_execution_plan = sales_table.select(

col("seller_id"), col("date")

).distinct()输出 Schemasales_table_execution_plan.show()

Case When

在Spark中很好地实现了该操作(不需要特殊的udf);让我们简单地用sales_table将每一行插入到不同的bucket中,具体取决于num_pieces_selled:

以Parquet格式读取源表sales_table = spark.read.parquet("./data/sales_parquet")SELECT seller_id,

CASE WHEN num_pieces_sold

标签:

提交需求或反馈

Demand feedback