首页标签分类
04Spark
2023-01-04 · 更新 2026-04-10约 22 分钟 · 6161 字
大数据杂文记
000

目录

Spark
Spark的几种部署模式比较
RDD
RDD的基本概念
RDD的血缘关系
缓存(cache,persist),checkpoint的区别
任务划分:Application、Job、Stage、Task关系
常用的转换算子和行动算子,及其比较
共享变量
分布式只读共享变量
分布式只写共享变量
SparkSQL
hive on spark 和 spark on hive的比较
SparkSQL中常见的数据抽象及其区别
SparkSession的概念
SparkCore
RDD的序列化
Spark的Shuffle
Spark Shuffle和Hadoop Shuffle区别
Spark比MR快的原因
Repartition和Coalesce区别
Spark的内存管理
Spark的持久化保存
Spark-YARN 的 Cluster 模式的任务提交流程
spark的join选择策略
broadcast hash join
shuffle hash join
Spark Sort Merge Join
0.基本流程
1. SMJ 的触发条件:何时被选中?
2. 静态计划与动态调整
3. Merge Join 阶段的详细机制
4. SMJ 的核心优势:内存效率与稳定性
5. 终极优化:跳过 Shuffle 和 Sort
cartesian join
broadcast nested loop join
影响spark 选择join策略关键因素
Spark的通信原理
Spark的任务调度流程和YARN的资源调度
Spark优化
Hive on Spark的优化
单独对Spark的优化

Spark

Spark的几种部署模式比较

image-20250802144726609

  1. local模式:单台计算机部署,用于测试。

  2. standalone模式(单集群模式):需要单独配置Spark的集群,使用Spark自带的资源调度器,即资源和任务分配都由Spark自己负责。

  3. 集群模式

    • 存算分离,比如使用yarn,k8s ,mesos等资源调度平台结合spark使用,经典的比如 spark 和 yarn的结合

    YARN模式

    • client模式:driver执行在提交任务的本地客户端,任务提交失败,需要手动干预

      shell
      自动换行:关
      放大阅读
      展开代码
      # 提交spark作业时指定yarn模式下的部署方式 --deploy-mode client bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.12-3.3.0.jar
    • cluster模式:driver运行在spark集群中的任一节点,任务提交失败,spark会自动重试

      shell
      自动换行:关
      放大阅读
      展开代码
      # 提交spark作业时指定yarn模式下的部署方式 --deploy-mode cluster bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.12-3.3.0.jar

RDD

RDD的基本概念

  1. RDD的五大属性

    java
    自动换行:关
    放大阅读
    展开代码
    /* * Internally, each RDD is characterized by five main properties: * * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) */
    • _一些分区的集合_:每个RDD读取数据后,会对文件进行分片,每个文件分片对应一个RDD分区。
    • _切片的计算函数_:每个分区的计算逻辑是相同的,计算逻辑即 call() 方法里的数据处理逻辑。
      java
      自动换行:关
      放大阅读
      展开代码
      // 按照制表符分割,并且取第五个元素 JavaRDD<String> productProvincesRdd = filterProductRdd.map(new Function<String, String>() { @Override public String call(String line) throws Exception { // 计算函数 return line.split("\t")[4]; } });
    • _其他RDD的血缘依赖_:RDD记录了对其他RDD的依赖关系。当RDD的分区数据丢失时,RDD会根据血缘关系重新计算以恢复数据。
    • _KV型数据的分区器_:对于处理K-V型数据的RDD,拥有自己的分区器,比如HashPartitioner(根据Key的哈希值计算数据应发送的分区)和RangePartitioner(对数据进行抽样,选出几个特定的Key作为划分分区的边界,从而确定数据应发往的分区)。
    • _记录切片位置的list_:一个记录分区内数据切片的优先位置列表。例如,如果数据存储在HDFS上,则每个RDD会记录要处理的数据块在HDFS上的位置,并遵循“移动计算而非移动数据”的原则进行任务调度。
  2. RDD的弹性体现

    • _分区的弹性_:每个RDD的分区数量是由数据切片的数量决定的,处理不同的数据时,分区会动态变化。
    • _计算的弹性_:每个RDD都记录了需要处理数据切片的位置(例如在HDFS上的位置)。在任务分配时,会尽量遵循本地化原则(移动计算而非移动数据)来分配计算任务。
    • _容错的弹性_:每个RDD都记录了对其他RDD的依赖关系(血缘)。当某个分区的数据丢失时,可以根据血缘关系自动重试任务,恢复分区数据。
    • _存储的弹性_:RDD在计算过程中,会优先使用内存来存储中间数据。当内存资源不足时,会自动将数据溢写到磁盘。数据存储位置是弹性的。
  3. Spark常见的端口号

    • 18080:Spark历史服务器的WebUI端口。

RDD的血缘关系

  • 宽依赖(Wide Dependency):同一个父RDD的Partition被多个子RDD的一个Partition使用(一对多关系)。
  • 窄依赖(Narrow Dependency):每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一或多对一关系)。

如果父RDD的每个Partition都只被一个子RDD的Partition所使用,就是窄依赖;否则就是宽依赖。

划分宽窄依赖的作用是用于判断数据流向。窄依赖必定不会产生Shuffle,而宽依赖则相反,通常意味着Shuffle的发生(父RDD的数据将被发送到不同子RDD的分区,这些分区可能分布在不同的节点上)。

缓存(cache,persist),checkpoint的区别

  1. Spark需要做数据持久化的原因

    • 一个RDD在多个Job中被重复使用。
    • 一个Job的数据处理链路特别长,如果不进行持久化,一旦发生错误,重算成本很高,容错率会降低。
  2. 缓存和Checkpoint的比较

持久化方式 使用方式 数据保存位置 RDD依赖关系是否保留
缓存 rdd.cache()rdd.persist() 内存或者磁盘
Checkpoint rdd.checkpoint() HDFS等共享存储 否(会切断血缘关系)
  • cachepersist的区别cache()persist() 的一种特殊情况,等价于 persist(StorageLevel.MEMORY_ONLY)
  • 常用的存储级别:
    • StorageLevel.MEMORY_ONLY:一般用于小数据量场景。
    • StorageLevel.MEMORY_AND_DISK:一般用于大数据量场景。

任务划分:Application、Job、Stage、Task关系

  1. 切分Stage的流程: 根据最后一个调用Action算子的RDD,从后往前沿着其依赖关系(血缘)进行回溯,一直到Job的第一个RDD。在回溯过程中,每遇到一个宽依赖,就切分出一个新的Stage。
  2. _Application数量_ = SparkContext的个数。一个main方法中通常只有一个SparkContext,即一个Application。
  3. _Job的个数_ 由Action算子(行动算子)的个数决定。每当代码执行到一个Action算子,就会触发一个Job的提交。常见的Action算子有collect()foreach()count()take()等。
  4. _Stage的个数_ = 宽依赖的个数 + 1。
  5. _Task的个数_(在一个TaskSet中)= 一个Stage中,最后一个RDD的分区数。

常用的转换算子和行动算子,及其比较

GitHub示例代码地址:GitHub相关的例子

  1. Transform算子(转换算子) 转换算子是惰性执行的,它们不会立即触发Job的执行,只负责定义RDD之间的转换关系。只有当遇到Action算子时,这些转换才会真正执行。

    plaintext
    自动换行:关
    放大阅读
    展开代码
    1. map 2. flatMap 3. reduceByKey(预聚合算子) 4. groupByKey 5. distinct
    • reduceByKeygroupByKey的比较reduceByKey相比groupByKey,在Shuffle之前会先在Map端进行预聚合(Combiner),从而减少了Shuffle传输的数据量,性能更高。应优先使用reduceByKey
    • mapflatMap的比较flatMapmap更灵活,对于输入的每个元素,map只会产生一个输出元素,而flatMap可以产生0个、1个或多个输出元素。
  2. Action算子(行动算子) 行动算子会触发Spark Job的实际计算和执行。

    plaintext
    自动换行:关
    放大阅读
    展开代码
    1. collect 2. count 3. foreach 4. foreachPartition 5. **reduce**
    • foreachforeachPartition的比较foreach会对RDD中每个分区里的每一条数据都执行一次指定的函数,函数的调用次数等于数据总数。而foreachPartition则是对RDD的每个分区执行一次指定的函数,函数的调用次数等于分区数。在需要进行数据库连接等昂贵操作时,使用foreachPartition可以显著减少连接的创建和销毁次数,提高执行效率。

共享变量

分布式只读共享变量

广播变量:默认情况下,Spark会将Driver端的数据给每个Task都发送一份,所以该数据占用的内存大小 = Task个数 * Driver数据大小。但是如果将该Driver数据广播,此时数据会广播给每个Executor,该Executor中的所有Task共用这一份数据,所以此时该数据占用的内存大小 = Executor个数 * Driver数据大小,相比之前节省了内存空间。

分布式只写共享变量

累加器:先在RDD的每个分区中进行累加,然后将各个分区的累加结果返回给Driver进行汇总。

SparkSQL

hive on sparkspark on hive的比较

  • Hive on Spark:Hive作为SQL引擎,负责SQL的解析、生成逻辑计划、优化、生成物理计划等,但将底层的计算引擎从MapReduce替换为Spark。执行任务时,Hive将物理计划转换为Spark作业来执行。(参考 03_HIVE.md 中的 "hive的Driver组成")
  • Spark on Hive:Spark SQL作为SQL引擎,负责SQL解析和执行的所有环节。Hive只负责提供元数据管理服务(Metastore),让Spark可以读写Hive表。

SparkSQL中常见的数据抽象及其区别

  1. RDD, DataFrame, DataSet

    • RDD(弹性分布式数据集)是Spark的基础数据结构,它关注于对数据的细粒度处理,提供了丰富的算子,但缺乏结构化信息。
    • DataFrame为数据引入了Schema(模式),类似于关系型数据库中的表。它将每条数据封装为Row对象,使Spark能更高效地处理结构化数据,并进行查询优化。但DataFrame是弱类型的,在编译时不做类型检查。
    • DataSet结合了RDD的类型安全和DataFrame的性能优势。它既有强类型检查(面向对象),也保留了DataFrame的非类型化操作接口,是Spark推荐使用的数据结构。

SparkSession的概念

SparkSession是Spark 2.0以后进行Spark编程的统一入口点,它整合了SparkContextSQLContextHiveContext的功能,是与Spark进行交互的起点。

SparkCore

RDD的序列化

  • RDD需要序列化的原因:Spark算子(如map)的call方法中的代码是在Executor的Task中执行的,而call方法外部的代码是在Driver中执行的。如果Task中需要引用在Driver端创建的对象,Spark会将该对象序列化后传输给Task使用。因此,这个对象及其所有字段都必须是可序列化的。
  • 如何序列化RDD:Spark默认使用Java序列化,但为了获得更好的性能,推荐使用Kryo序列化库。

Spark的Shuffle

plaintext
自动换行:关
放大阅读
展开代码
MR计算过程: 数据 -> InputFormat -> map方法 -> 环形缓冲区[分区、排序, 80%溢写] -> [Combiner] -> 磁盘[小文件] -> 合并小文件 -> Reducer拉取数据[归并排序] -> reduce方法 -> outputFormat -> 磁盘 MR shuffle阶段: -> 环形缓冲区[分区、排序, 80%溢写] -> [Combiner] -> 磁盘[小文件] -> 合并小文件 -> Reducer拉取数据[归并排序] -> Spark shuffle: -> 缓冲区[分区、[排序]] -> [Combiner] -> 磁盘[小文件] -> 合并小文件 -> 分区拉取数据[归并、[排序]] ->
  • MR的Shuffle
  • Spark的Shuffle: 在生产环境中,Spark的分区数一般设置为CPU核心数的2-3倍,以充分利用计算资源。
  1. SortShuffle

    • Shuffle后的小文件数量 = 分配给Executor的CPU核数(一个核生成一个中间文件)。
    • 使用SortShuffle时,每个Executor通过一个缓冲区处理数据,溢写到磁盘后,会合并所有产生的溢写文件,并按照分区建立索引。最终每个Task产生一个归并后的数据文件和一个索引文件。
    • 下游的子RDD向父RDD拉取数据时,会根据索引文件只拉取自己需要的分区数据。
  2. BypassShuffle 并不是所有Shuffle操作都需要排序(例如groupByKey在Shuffle阶段就不需要排序)。为了优化性能,Spark引入了BypassShuffle。

    • 使用BypassShuffle的前提
      • Shuffle任务中没有预聚合(Combiner)操作。
      • 子RDD的分区数小于一个阈值(默认为200,由spark.shuffle.sort.bypassMergeThreshold控制)。
    • 当满足条件时,Spark会跳过排序步骤,直接将每个Map Task的输出按分区写入单独的文件,从而大大缩短任务的执行时间。

drawio

Spark Shuffle和Hadoop Shuffle区别

  1. Hadoop的MapReduce Shuffle无论任务是否需要排序,都必须进行排序。而Spark的BypassShuffle在满足一定条件时可以跳过排序,提高效率。
  2. 在MapReduce中,MapTask的环形缓冲区大小是固定的。而Spark中Map阶段的缓冲区是动态的,可以根据内存使用情况进行扩容。
  3. 在Hadoop中,ReduceTask在部分MapTask完成后就可以开始启动并拉取数据。而在Spark中,必须等待前一个Stage的所有Task全部执行完成后,下一个Stage才能开始执行。

Spark比MR快的原因

  1. 内存计算:Spark优先使用内存进行数据处理和存储(默认存储级别为MEMORY_ONLY),减少了磁盘I/O,而MapReduce在任务传递过程中必须依赖磁盘进行Shuffle。
  2. 执行模型:Spark的任务执行是基于线程的(一个Executor中可以运行多个Task线程),而MapReduce的任务是基于进程的(每个MapTask或ReduceTask都是一个独立的JVM进程)。线程的创建和调度开销通常远小于进程。
  3. DAG优化:Spark使用有向无环图(DAG)来表示计算流程,可以进行更复杂的优化,例如将多个操作流水线化(Pipelining)在一个Stage中执行,避免了不必要的中间数据落地。

Repartition和Coalesce区别

  1. repartitioncoalesce都有改变分区数的功能。repartitioncoalesce的一种特殊情况,即repartition(n)等价于coalesce(n, shuffle = true)
  2. repartition通常用于增加分区数。它会进行一次完整的Shuffle,将数据随机打散并重新分布到所有分区中。
  3. coalesce通常用于减少分区数。默认情况下,它会尽量避免Shuffle,通过合并相邻的分区来实现,效率更高。如果需要将数据均匀分布,可以设置shuffle = true

Spark的内存管理

Spark 1.6版本后引入了统一内存管理(Unified Memory Management),Spark可以同时使用堆内内存(On-heap)和堆外内存(Off-heap)。

执行内存(Execution Memory)和存储内存(Storage Memory)的动态占用机制 这两块内存区域可以相互借用。当一方空闲而另一方资源不足时,空闲方可以将部分内存借给对方使用。但是,执行内存有优先权,如果存储内存借用了执行内存,当执行内存需要时,存储内存必须归还,可能会导致缓存的数据被移出内存。

Spark的持久化保存

见“缓存(cache,persist),checkpoint的区别”一节

  1. 数据持久化原因
  2. 使用方式
  3. 比较

Spark-YARN 的 Cluster 模式的任务提交流程

提交重点:

  1. _应用的提交_:客户端解析任务的提交参数。
  2. _执行环境的准备_:在ApplicationMaster中建立Driver线程,并初始化SparkContext的执行环境,包括任务的切分和调度。
  3. _任务的调度和执行_:Driver向Executor发送启动任务的命令,Executor执行具体的Task。

具体流程:

  1. 用户通过spark-submit脚本提交应用。Submit进程解析提交参数,并封装成一个YARN应用。
  2. Submit进程与YARN的ResourceManager通信,申请启动一个ApplicationMaster(AM)。
  3. ResourceManager在一个NodeManager上启动ApplicationMaster。AM内部会启动Driver线程,并初始化Spark的执行环境(如DAGScheduler、TaskScheduler)。
  4. ApplicationMaster(作为Driver的一部分)向ResourceManager申请计算资源(Container)。
  5. ResourceManager分配Container后,AM在这些Container中启动Executor进程。
  6. Executor启动后,会反向注册到Driver,并与Driver建立心跳连接。至此,资源申请和执行环境准备完成。
  7. Driver中运行用户的main方法。当遇到Action算子时,触发一个Job的提交。DAGScheduler根据RDD的宽依赖关系,将Job划分为一个或多个Stage。每个Stage被划分为一组Task(TaskSet)。
  8. TaskScheduler将TaskSet中的Task分配给可用的Executor执行,并监控任务的执行情况。
  9. 当所有Task执行完成,ApplicationMaster向ResourceManager取消注册,YARN回收所有资源,应用结束。

spark的join选择策略

spark中提供非常丰富的spark join策略来处理不同表数据量情况下的选择

  1. broadcast hash join
  2. shuffle hash join
  3. sort merge join
  4. cartesian join
  5. broadcast nested loop join

broadcast hash join

涉及广播,hash表的形成,数据间的join;使用场景为小表 join 大表等值连接场景,spark默认小表标准为10MB;driver将小表数据 广播到各个存储大表数据的executor,executor将小表数据利用哈希函数形成一张《joinkey,小表行数据》的哈希表,遍历大表每一行,从哈希表取得小表数据,哈希读取效率高

image-20250802160226763

shuffle hash join

当join的表数据都超过小表存储的定义(10MB),数据数据量适中等值连接情况,spark join将会采用 shuffle hash join;涉及过程:

  • shuffle:将两个表都按照joinkey进行shuffle,将相同关联key的数据发往同一节点的相同分区
  • hash:在相同分区内,取数据量较小的表制为hash表,遍历较大表的数据,完成join
graph TD subgraph "初始状态: 两个大表分布在不同分区" A_P1["<b>Table A - Partition 1</b><br/>(k1, valA1)<br/>(k2, valA2)"] A_P2["<b>Table A - Partition 2</b><br/>(k1, valA3)<br/>(k3, valA4)"] B_P1["<b>Table B - Partition 1</b><br/>(k1, valB1)<br/>(k3, valB2)"] B_P2["<b>Table B - Partition 2</b><br/>(k2, valB3)<br/>(k3, valB4)"] end subgraph "第一阶段: Shuffle (按 Join Key 重新分区)" direction LR Shuffle_Process["<font size=5><b>Shuffle</b></font><br/>按 Key (k1, k2, k3...)<br/>重新分发数据"] end A_P1 --> Shuffle_Process A_P2 --> Shuffle_Process B_P1 --> Shuffle_Process B_P2 --> Shuffle_Process subgraph "Shuffle 后: 相同 Key 的数据汇集到同一分区" New_P1["<b>Partition X (Executor 1)</b><br/><i>--- Data for Key k1 ---</i><br/>from A: (k1, valA1), (k1, valA3)<br/>from B: (k1, valB1)"] New_P2["<b>Partition Y (Executor 2)</b><br/><i>--- Data for Key k2 ---</i><br/>from A: (k2, valA2)<br/>from B: (k2, valB3)"] New_P3["<b>Partition Z (Executor 3)</b><br/><i>--- Data for Key k3 ---</i><br/>from A: (k3, valA4)<br/>from B: (k3, valB2), (k3, valB4)"] end Shuffle_Process --> New_P1 Shuffle_Process --> New_P2 Shuffle_Process --> New_P3 subgraph "第二阶段: Hash Join (在单个分区内执行)" subgraph "在 Partition Z 内部" Build["<b>1. 构建哈希表</b><br/>(用较小的表A的数据)"] HashTable["<b>Hash Table</b><br/>{ k3 -> (k3, valA4) }"] Build --> HashTable Probe["<b>2. 遍历大表B的数据 (Probe)</b><br/>(k3, valB2)<br/>(k3, valB4)"] Probe -- "用 Key 'k3' 查找" --> HashTable Result["<b>3. 输出Join结果</b><br/>(k3, valA4, valB2)<br/>(k3, valA4, valB4)"] HashTable --> Result end end New_P3 --> Build style A_P1 fill:#cde4ff style A_P2 fill:#cde4ff style B_P1 fill:#d5e8d4 style B_P2 fill:#d5e8d4 style New_P1 fill:#f5f5f5,stroke:#333,stroke-width:2px style New_P2 fill:#f5f5f5,stroke:#333,stroke-width:2px style New_P3 fill:#fff2cc,stroke:#d6b656,stroke-width:2px,color:#000 style Shuffle_Process fill:#dae8fc,stroke:#6c8ebf,stroke-width:2px

Spark Sort Merge Join

Sort Merge Join (SMJ) 是 Spark 中处理大表等值 Join的核心策略。当参与 Join 的两张表都过大,无法利用广播机制时,SMJ 提供了一种稳健且内存友好的解决方案。虽然它通常被视为 Spark 的默认 Join 策略(由 spark.sql.join.preferSortMergeJoin 控制),但在现代 Spark 版本中,其最终应用会受到自适应查询执行(AQE)的动态调整。

SMJ 的整个过程可以分为三个核心阶段:ShuffleSortMerge Join

0.基本流程
  • Shuffle (重分区): 对两张表的数据按照 Join Key 进行 Shuffle。此阶段的核心目标是确保所有具有相同 Key 的数据,无论它们最初位于哪个节点或分区,最终都会被发送到同一个 Executor 的同一个分区中,为后续的 Join 做好准备。这是整个流程中最昂贵的操作之一,涉及大量的网络 I/O。

  • Sort (排序): 在每个分区内,Spark 会独立地对来自两张表的数据块按照 Join Key 进行排序。排序是 SMJ 的基石,它将无序的数据转化为有序的流,使得后续的合并操作可以高效地进行。此阶段同样消耗大量的磁盘 I/O 和 CPU 资源。

  • Merge Join (合并连接): 这是 SMJ 的精髓所在。Spark 使用类似“拉链”的算法,通过两个指针分别遍历两个已排序的数据流。它逐个比较 Key,根据比较结果移动指针并生成匹配的 Join 结果。由于数据是有序的,已处理过的数据可以立即从内存中释放,这使得 SMJ 对内存的要求极低,非常适合处理海量数据。

graph TD subgraph "Phase 1: Initial State (Scattered Data)" A1["Table A<br/>Partition 1"] A2["Table A<br/>Partition 2"] B1["Table B<br/>Partition 1"] B2["Table B<br/>Partition 2"] end subgraph "Phase 2: Shuffle (Group by Join Key)" SHUFFLE["<font size=5><b>SHUFFLE</b></font><br/>Move data so same keys<br/>are in the same partition"] end A1 & A2 & B1 & B2 --> SHUFFLE subgraph "Phase 3: Sort (Inside Each Partition)" P1_unsorted["<b>Partition X (Unsorted)</b><br/>All data for Keys 1, 5, 2"] P2_unsorted["<b>Partition Y (Unsorted)</b><br/>All data for Keys 8, 4, 6"] SORT1["<font size=4><b>SORT</b></font><br/>by Join Key"] SORT2["<font size=4><b>SORT</b></font><br/>by Join Key"] P1_sorted["<b>Partition X (Sorted)</b><br/><i>Table A part:</i> [k1, k2, k5]<br/><i>Table B part:</i> [k1, k5]"] P2_sorted["<b>Partition Y (Sorted)</b><br/><i>Table A part:</i> [k4, k6]<br/><i>Table B part:</i> [k4, k6, k8]"] P1_unsorted --> SORT1 --> P1_sorted P2_unsorted --> SORT2 --> P2_sorted end SHUFFLE --> P1_unsorted SHUFFLE --> P2_unsorted subgraph "Phase 4: Merge Join (Pointer-based Iteration)" subgraph "Focus on Partition X" DataA["<b>Sorted A data</b><br/>[ (k1, vA1), (k2, vA2), (k5, vA3) ]<br><font color=blue><b>↑</b></font><br>Pointer A"] DataB["<b>Sorted B data</b><br/>[ (k1, vB1), (k5, vB2) ]<br><font color=green><b>↑</b></font><br>Pointer B"] MERGE["<b>3. Merge & Join</b><br/>Advance pointers and match keys"] DataA --> MERGE DataB --> MERGE Result["<font size=4><b>Joined Result</b></font><br/>(k1, vA1, vB1)<br/>(k5, vA3, vB2)"] MERGE --> Result end end P1_sorted --> DataA P1_sorted --> DataB %% Styling style A1 fill:#cde4ff,stroke:#6c8ebf style A2 fill:#cde4ff,stroke:#6c8ebf style B1 fill:#d5e8d4,stroke:#82b366 style B2 fill:#d5e8d4,stroke:#82b366 style SHUFFLE fill:#dae8fc,stroke:#6c8ebf,stroke-width:2px,stroke-dasharray: 5 5 style P1_unsorted fill:#f5f5f5,stroke:#333 style P2_unsorted fill:#f5f5f5,stroke:#333 style P1_sorted fill:#fff2cc,stroke:#d6b656,stroke-width:2px style P2_sorted fill:#fff2cc,stroke:#d6b656,stroke-width:2px style SORT1 fill:#e1d5e7,stroke:#9673a6 style SORT2 fill:#e1d5e7,stroke:#9673a6 style Result fill:#d5e8d4,stroke:#82b366,stroke-width:2px

1. SMJ 的触发条件:何时被选中?

SMJ 并非总是被选中,它的应用有明确的前提条件:

  • 等值 Join: Join 条件必须包含至少一个等值表达式(例如 ON a.id = b.id)。对于非等值 Join(如 a.id > b.id),Spark 只能使用 BroadcastNestedLoopJoinCartesianProduct,效率极低。
  • 无法广播: 这是关键。SMJ 是在 Broadcast Hash Join (BHJ) 无法应用时的备选方案。也就是说,当参与 Join 的两张表,没有任何一张的大小小于 spark.sql.autoBroadcastJoinThreshold(默认 10MB)时,Spark 才会优先考虑 SMJ。如果一张表可广播,Spark 会毫不犹豫地选择效率更高的 BHJ。
2. 静态计划与动态调整

虽然 spark.sql.join.preferSortMergeJoin 默认为 true,但在现代 Spark (3.0+) 中,其行为更加智能:

  • 静态计划阶段: 当 Spark 生成初始的物理执行计划时,如果判定无法使用 BHJ,它会遵循 preferSortMergeJoin 的配置,将 SMJ 作为计划的 Join 策略。
  • 动态执行阶段 (AQE): 如果开启了 Adaptive Query Execution (AQE) (spark.sql.adaptive.enabled=true,默认开启),Spark 会在运行时根据实际的数据统计信息动态调整 Join 策略。
    • 场景: 初始计划为 SMJ,因为统计信息显示表 A 和 B 都很大。
    • 动态优化: 在 Shuffle 完成后,AQE 获取到了每个分区数据的精确大小。如果发现其中一张表(比如 B)在 Shuffle 后的实际数据大小小于广播阈值,AQE 会将 SMJ 在运行时切换为 Broadcast Hash Join
    • 结论: 因此,更准确的说法是,SMJ 是 Spark 处理大表 Join 的基础和首选策略,但并非不可更改的“最终”策略。AQE 的存在使得 Spark 能够做出更优的实时决策。
3. Merge Join 阶段的详细机制

“对分区内的两张表数据进行遍历”这句话背后是一个精巧的算法。让我们用更具体的语言描述:

在 Merge 阶段,Spark 为每个分区内的两个已排序数据流各维护一个指针(可以想象成两个手指分别指着两份名单)。

  1. 初始化: 两个指针 ptrAptrB 分别指向两个已排序数据流的起始位置。
  2. 循环比较: 进入一个循环,不断比较两个指针当前位置的 Join Key (keyAkeyB):
    • keyA == keyB (匹配成功):
      • 找到了匹配项!将两行数据合并,生成一条 Join 结果。
      • 处理一对多: 此时,需要处理可能存在的“一对多”关系。Spark 会缓存 ptrB 当前指向的行,然后仅移动 ptrA,检查 A 流中是否还有相同的 keyA。只要有,就持续与缓存的 B 行进行匹配并输出结果,直到 A 流的 key 发生变化。然后,再移动 ptrB
    • keyA < keyB (A 表的 Key 较小):
      • 这意味着 A 表中当前这个 keyAB 表中不存在(因为 B 表已排序,后面的 key 只会更大)。
      • 移动 ptrA,跳过 A 表中这条不匹配的记录。
    • keyA > keyB (B 表的 Key 较小):
      • 同理,这意味着 B 表中当前这个 keyBA 表中不存在。
      • 移动 ptrB,跳过 B 表中这条不匹配的记录。
  3. 终止: 当任一指针到达其数据流的末尾时,整个 Merge 过程结束。
4. SMJ 的核心优势:内存效率与稳定性
  • 流式处理,内存友好: Merge 阶段是流式的,它不需要在内存中构建任何一张表的完整哈希表。它只需要在内存中保留当前正在处理的 Key 及其关联的少量行。一旦某个 Key 处理完毕,相关数据即可被垃圾回收。这使得 SMJ 对 Executor 内存的要求非常低,极大地降低了 Out Of Memory (OOM) 的风险。
  • 对数据倾斜有更好的容忍度: 即使某个 Key 的数据量特别大(数据倾斜),SMJ 也能稳定处理,因为它不会因为单个 Key 过大而导致内存崩溃。它只会花费更多的时间来处理这个倾斜 Key 的所有匹配项。相比之下,Hash Join 在遇到严重数据倾斜时,构建的哈希表可能会撑爆内存。
5. 终极优化:跳过 Shuffle 和 Sort

SMJ 的性能瓶颈在于 Shuffle 和 Sort。如果数据在物理存储上就已经是按照 Join Key 分区和排序的,Spark 的优化器就能识别到这一点,并完全跳过 Shuffle 和 Sort 阶段,直接执行 Merge 操作。这是 SMJ 性能的极限状态,通常通过使用 bucketBysortBy 创建 Hive 表或 Delta Lake 表来实现。

cartesian join

使用场景为等值/非等值,仅支持内连接(inner join),没有明确指明关联键;

broadcast nested loop join

与 Broadcast Hash Join 的核心区别:

  • BNLJ: 对于大表的每一行,都要遍历扫描小表,效率最低
  • BHJ: 对于大表的每一行,只需在小表构建的哈希表中进行一次 O(1) 的快速查找
graph TD subgraph "Phase 1: Initial State & Broadcast" BigTable["<b>Big Table (Distributed)</b><br/>Partition 1<br/>Partition 2<br/>..."] SmallTable["<b>Small Table</b><br/>(on Driver)"] SmallTable -- "1. Collect to Driver" --> Driver["<font size=5><b>Driver</b></font>"] Driver -- "2. Broadcast to All Executors" --> Executor1 & Executor2 end subgraph "Phase 2: Nested Loop Join (Inside Each Executor)" subgraph "Focus on Executor 1" P1["<b>Partition 1 of Big Table</b><br/>(row A1)<br/>(row A2)<br/>(row A3)<br/>..."] Broadcasted_SmallTable["<b>Broadcasted Small Table (in Memory)</b><br/>(row B1)<br/>(row B2)<br/>..."] Loop["<font size=5><b>Nested Loop</b></font><br/>For each row in Big Table,<br/>iterate through <b>ALL</b> rows in Small Table"] P1 --> Loop Broadcasted_SmallTable --> Loop Check["<b>Join Condition Check</b><br/>e.g., big.date > small.date<br/>(N x M comparisons)"] Loop --> Check Result["<font size=4><b>Joined<br/>Result</b></font>"] Check -- "If True" --> Result end Executor2["<b>Executor 2</b><br/>(Performs the same<br/>nested loop on its partition)"] end BigTable --> P1 %% Styling style BigTable fill:#cde4ff,stroke:#6c8ebf style SmallTable fill:#d5e8d4,stroke:#82b366 style Driver fill:#f8cecc,stroke:#b85450,stroke-width:2px style Executor1 fill:#fff2cc,stroke:#d6b656,stroke-width:2px style Executor2 fill:#f5f5f5,stroke:#333 style Loop fill:#f8cecc,stroke:#b85450,stroke-width:2px,stroke-dasharray: 5 5 style Check fill:#e1d5e7,stroke:#9673a6 style Result fill:#d5e8d4,stroke:#82b366,stroke-width:2px

spark的join策略选择优先级

源码注释说明:

plaintext
自动换行:关
放大阅读
展开代码
// If it is an equi-join, we first look at the join hints w.r.t. the following order: // 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides // have the broadcast hints, choose the smaller side (based on stats) to broadcast. // 2. sort merge hint: pick sort merge join if join keys are sortable. // 3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both // sides have the shuffle hash hints, choose the smaller side (based on stats) as the // build side. // 4. shuffle replicate NL hint: pick cartesian product if join type is inner like. // // If there is no hint or the hints are not applicable, we follow these rules one by one: // 1. Pick broadcast hash join if one side is small enough to broadcast, and the join type // is supported. If both sides are small, choose the smaller side (based on stats) // to broadcast. // 2. Pick shuffle hash join if one side is small enough to build local hash map, and is // much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false. // 3. Pick sort merge join if the join keys are sortable. // 4. Pick cartesian product if join type is inner like. // 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have // other choice. // ============================================================================================== // If it is not an equi-join, we first look at the join hints w.r.t. the following order: // 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast // hints, choose the smaller side (based on stats) to broadcast for inner and full joins, // choose the left side for right join, and choose right side for left join. // 2. shuffle replicate NL hint: pick cartesian product if join type is inner like. // // If there is no hint or the hints are not applicable, we follow these rules one by one: // 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left // side is broadcast-able and it's left join, or only right side is broadcast-able and // it's right join, we skip this rule. If both sides are small, broadcasts the smaller // side for inner and full joins, broadcasts the left side for right join, and broadcasts // right side for left join. // 2. Pick cartesian product if join type is inner like. // 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have // other choice. It broadcasts the smaller side for inner and full joins, broadcasts the // left side for right join, and broadcasts right side for left join.

Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join。

影响spark 选择join策略关键因素

  • 是否申明hint
  • 是否等值连接
  • join表的数据量大小

Spark的通信原理

Driver与Executor之间的通信依靠RPC(远程过程调用)实现。早期版本使用Akka框架,后期版本(2.x及以后)则完全迁移到了Netty框架。

Spark的任务调度流程和YARN的资源调度

Task调度要点:

  • 任务的切分:将不同的RDD按照宽依赖划分成Stage,每个Stage再按照分区数划分为等量的Task。
  • Task的调度和执行:将Task分配到Executor上运行。

调度细节:

  1. 用户代码在执行时遇到行动算子,会触发一个Job的提交。
  2. DAGScheduler接收到Job后,会根据RDD的血缘关系从后往前遍历,每遇到一个宽依赖就划分出一个Stage。Stage内部的任务(Task)数量由该Stage最后一个RDD的分区数决定。这些Task被打包成一个TaskSet。
  3. TaskScheduler接收到TaskSet后,通过SchedulerBackend向资源管理器(如YARN)申请资源。
  4. 在资源充足的情况下,TaskScheduler将Task发往不同的Executor执行。
  5. TaskScheduler在调度Task时,有两种策略:FIFO(先进先出)和FAIR(公平调度),默认使用FIFO。 FIFO是为了保证划分的Stage能按照依赖顺序执行。

调度的原则: 遵循数据本地化(Data Locality)的原则,尽量将计算任务分配到数据所在的节点上,以减少网络传输开销。调度优先级为:PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY

Spark优化

Hive on Spark的优化

针对Hive的优化策略((参考 03_HIVE.md 中的 "Hive框架优化"))对Hive on Spark大部分都有效。 例如,当通过Spark历史服务器发现某个任务的GC time过长时,可以调整Spark内存模型中"Other"部分的内存大小。"Other"部分内存用于存储Spark内部的元数据和用户自定义的一些数据结构。

单独对Spark的优化

通过合理设置Spark Application的运行参数,可以最大程度地提高资源利用效率。

参数配置示例

假设集群中每台节点配置为128G内存,64个CPU核心。

  1. 资源分配给YARN:通常将节点80%的资源分配给YARN。则每台节点可用资源为 128G * 0.8 ≈ 100G 内存,64 * 0.8 ≈ 48 个核心。
  2. Executor配置:每个Executor分配的core数不宜过多,一般为3-6个。这里我们设置为6个core。
  3. 计算Executor数量:每台NodeManager可以启动 48 / 6 = 8 个Executor。
  4. 计算Executor内存:每个Executor可分配的内存为 100G / 8 ≈ 12G
  5. 总Executor数:假设集群共有7台NodeManager,则总共可启动 7 * 8 = 56 个Executor。

最终配置参数:

bash
自动换行:关
放大阅读
展开代码
--num-executors 56 --executor-cores 6 --executor-memory 12G --driver-cores 6 --driver-memory 12G

一些其他常用运行参数:

  • --master:指定将任务提交到哪个资源调度器(如yarn)。
  • --class:指定待运行的带有main方法的全类名。
  • --deploy-mode:指定YARN作为资源调度器时的部署模式(client/cluster)。
  • --queue:指定任务提交到YARN的哪个资源队列中。

本文作者:hedeoer

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!