Spark SQL 原理
Spark SQL的发展历程
为了给熟悉的 RDBMS 但又不理解 MapReduce 的技术人员提供快速上手的工具,Hive 应运而生,他是当时唯一运行在 Hadoop 上的SQL-On-Hadoop 工具。 但是 MapReduce 计算过程中大量的中间磁盘落地过程消耗了大量的 I/O,降低的运行效率,为了提高 SQL 的执行效率,大量的 SQL-On-Hadoop工具开始产生,而 Shark 是其中一个表现较为突出的项目。
Shark是伯克利实验室 Spark 生态环境的组件之一,它主要修改了内存管理,物理计划和执行三个模块,值得它能运行在 Spark 的引擎上,从而提高 SQL 查询的效率。
但是随着 Spark 的发展,Shark 对 Hive 过多的依赖制约了 Spark 的设计理念和各个组件之间的相互继承,所以 Spark 团队停止了对 Shark 的开发,提出了 SparkSQL 项目。 因为摆脱了Hive 的过度依赖,Spark SQL在数据兼容性,性能优化和组件扩展等各个方面都得到了极大的方便和发展。
提出了 SparkSQL 项目之后,SQL On Spark 发展出了两条支线,SparkSQL 和Hive on Spark。
Spark SQL 解析流程
Spark SQL 对 SQL 语句的处理和关系型数据库类似,即词法/语法解析,绑定,优化,执行。 Spark SQL 会先将 SQL 语句解析成一棵树,然后使用各种规则对 Tree 进行绑定和优化等处理。
使用SessionCatalog保存元数据
在解析 SQL 语句之前,会创建 SparkSession,在 Spark2。0版本之后,SparkSession封装了 SparkContext 和 SQLContext 创建,也不再区分 SQLContext 和 HiveContext。
涉及到诸如表名,字段名称和字段类型等元数据都会保存在 SessionCatalog 中。 此外,创建临时表或者试图的过程,其实就会往 SessionCatalog 注册。
解析 SQL: 使用 ANTLR 生成未解析的逻辑计划
只要是在数据库类型的技术里面,比如传统的 MySQL,Oracle 或者现在大数据领域的Hive。 它的基本的 SQL 执行的模型,都是类似的,首先都是要生成一条 SQL 语句的执行计划。
当调用 SparkSession 的 sql 或者 SQLContext 的 sql 方法时,就会使用 SparkSqlParser 进行解析 SQL。 使用的 ANTLR 进行词法解析和语法解析。 它分为词法分析和构建分析树或者语法树 AST 2个步骤来生成UnresolvedLogicalPlan。
使用分析器 Analyzer 解析逻辑计划
在该阶段,Analyzer 会使用 Analyzer Rules,并结合 SessionCatalog,对未解析的逻辑计划进行解析,生成已解析的逻辑计划。
使用优化器 Optimizer 优化逻辑计划
优化器也是会定义一套 Rules,利用这些 Rules 对逻辑计划和 Expression 进行迭代处理,从而使得树的节点进行优化。 在传统的 Oracle 等数据库中,通常都会生成多个执行计划,然后根据优化器针对多个计划选择一个最好的计划。 而SparkSQL中,是对一个已生成的执行进行优化生成一个新的执行计划。
比如,一个 SQL 语句
1 |
|
此时,在执行计划解析出来的时候。 其实就是按照sql 原封不动的样子来解析成可以执行的计划的。 但是Optimizer会对执行计划进行优化。 在这个例子中的 where 条件是可以放入子查询中的,这样子查询的数据量大大变小,可以优化执行速度。 相应的执行计划就会变为
1 |
|
使用SparkPlanner生成物理计划
SparkPlanner使用 Planning Strategies,对优化后的逻辑计划进行转换,生成可以执行的物理计划SparkPlan。
生成的物理计划区别于逻辑计划的是执行计划在这里已经非常清晰了。 从哪个文件读取什么数据,从哪里读取数据,如何操作等等都已经明确了。
使用 QueryExecution 执行物理计划
此时调用 SparkPlan 的 execute 方法,底层其实已经在触发创建并执行 Job 了,然后返回相应结果。
三 SparkSQL中物理计划的执行
Spark 的根基: RDD
与许多专有的大数据处理平台不同,Spark 建立在统一抽象的 RDD 之上,使得它可以以基本一致的方式应对不同的大数据处理场景。
也就是说,要理解SparkSQL,首先要理解 SparkRDD。
RDD的全称是 Resilient Distributed Datasets,是一个容错的,并行的数据结构,可以让用户显示的将数据存储到磁盘和内存中。 同时,RDD 还提供了一组丰富的操作来操作这些数据。 在这些操作中,诸如 map,flatMap,filter 等转换操作实现了monad模式,很好地契合了Scala的集合操作。除此之外,RDD还提供了诸如join、groupBy、reduceByKey等更为方便的操作(注意,reduceByKey是action,而非transformation),以支持常见的数据运算。
RDD提供了两方面的特性persistence和patitioning,用户可以通过persist与patitionBy函数来控制RDD的这两个方面。RDD的分区特性与并行计算能力(RDD定义了parallerize函数),使得Spark可以更好地利用可伸缩的硬件资源。若将分区与持久化二者结合起来,就能更加高效地处理海量数据。
既然 RDD 是 spark job 的根基,所有 Spark 的计算都是基于 RDD 的相应Transformation 和 Action 操作。 Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。 Spark提供了可以读取 Hadoop 文件的接口,对于外部存储创建而言,hadoopRDD 函数是最常用的在 Spark 中创建 RDD 的接口。 也就是说用户可以用这个接口读取集群中的数据并转换成相应的 RDD,然后接入 spark 计算引擎中。
从物理执行计划到 RDD
经过 Spark 的 Catalyst 解析优化器对于 sql 的分析之后,已经生成了相应的物理执行计划,而对于已经可以开始执行的 SparkSQL的 Physical Plan,其执行目标就是调用 action 算子之后的 RDD。
SparkPlan主要包含四种操作类型:
- BasicOperator
- Join
- Aggregate
- Sort
对于 SparkSQL 的物理执行计划,基本可以通过这四种操作类型来完成。 Spark 会以这四种操作为基础创建出相应的 task来分别执行。 这四种操作类型的详细介绍这里不做过多阐述。
SparkSQL中任务的执行模式
在介绍了 SparkSQL 如何生成相应的物理执行计划以及物理执行计划是如何执行之后,SparkSQL 任务就进入了具体的执行阶段。 SparkSQL 具体任务的执行方式和普通 Spark 任务的执行方式并无差别。 Spark 任务运行时的角色可以分为三种: driver,executor(worker) 以及cluster manager。
Spark中的driver其实和yarn中Application Master的功能相类似。主要完成任务的调度以及和executor和cluster manager进行协调。
在YARN中,每个Application实例都有一个Application Master进程,它是Application启动的第一个容器。它负责和ResourceManager打交道,并请求资源。获取资源之后告诉NodeManager为其启动container。如果对 MapReduce 比较熟悉的话,一定不会对 Applicaiton Master 陌生。 在 Spark on Yarn 中,任务的执行模式可以分为 yarn-client 和 yarn-cluster 两种。
从深层次的含义讲,yarn-cluster和yarn-client模式的区别其实就是Application Master进程的区别,yarn-cluster模式下,driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行。然而yarn-cluster模式不适合运行交互类型的作业。而yarn-client模式下,Application Master仅仅向YARN请求executor,client会和请求的container通信来调度他们工作,也就是说Client不能离开。看下下面的两幅图应该会明白(上图是yarn-cluster模式,下图是yarn-client模式):
SparkSQL 中任务的具体执行
了解了 Spark 任务的执行模式之后,相应的 job 还不能马上开始运行。 这是因为一个完整的执行计划可能比较复杂,完整的数据量也比较大,直接完整执行不太现实。 通常情况下,一个分布式计算引擎都会将一个完整 job 进行切割分批运行,spark 也是这么做的。
在 Spark 中,task 是一个 Job 进行切割后运行的最小运算单元,一般情况下,一个 rdd 有多少个 partition,就会有多少个 task,因为每一个 task 只是处理一个 partition 上的数据。 当在YARN上运行Spark作业,每个Spark executor作为一个YARN容器(container)也就是 executor运行。Spark可以使得多个Tasks在同一个容器(container)里面运行。这种情况在 spark 中通过核数来控制,这也是一个与 MapReduce 较大不同的地方。
而 task 进行组合分批后,通常称为 stage。 也就是说一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage。Stage的划分在RDD的论文中有详细的介绍,简单的说是以shuffle和result这两种类型来划分。在Spark中有两类task,一类是shuffleMapTask,一类是resultTask,第一类task的输出是shuffle所需数据,第二类task的输出是result,stage的划分也以此为依据,shuffle之前的所有变换是一个stage,shuffle之后的操作是另一个stage。
Spark 会为不同的 stage 以及不同的 task设好前后依赖,来保证整个 job 运行的正确性和完整性。
当最后一个resultTask 的上游 task 全部运行完之后即可开始运行,该resultTask 的结束也意味着 job 的成功运行。
至此,一个 SparkSQL 的任务从解析到生成逻辑计划,生成物理执行计划,再到具体执行返回结果的完整运行原理介绍完毕。