local模式:单台计算机部署,用于测试。
standalone模式(单集群模式):需要单独配置Spark的集群,使用Spark自带的资源调度器,即资源和任务分配都由Spark自己负责。
集群模式
YARN模式:
client模式:driver执行在提交任务的本地客户端,任务提交失败,需要手动干预
shell自动换行:关放大阅读展开代码# 提交spark作业时指定yarn模式下的部署方式 --deploy-mode client bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.12-3.3.0.jar
cluster模式:driver运行在spark集群中的任一节点,任务提交失败,spark会自动重试
shell自动换行:关放大阅读展开代码# 提交spark作业时指定yarn模式下的部署方式 --deploy-mode cluster bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode cluster \ ./examples/jars/spark-examples_2.12-3.3.0.jar
RDD的五大属性
java自动换行:关放大阅读展开代码/* * Internally, each RDD is characterized by five main properties: * * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for * an HDFS file) */
_一些分区的集合_:每个RDD读取数据后,会对文件进行分片,每个文件分片对应一个RDD分区。_切片的计算函数_:每个分区的计算逻辑是相同的,计算逻辑即 call() 方法里的数据处理逻辑。
java自动换行:关放大阅读展开代码// 按照制表符分割,并且取第五个元素 JavaRDD<String> productProvincesRdd = filterProductRdd.map(new Function<String, String>() { @Override public String call(String line) throws Exception { // 计算函数 return line.split("\t")[4]; } });
_其他RDD的血缘依赖_:RDD记录了对其他RDD的依赖关系。当RDD的分区数据丢失时,RDD会根据血缘关系重新计算以恢复数据。_KV型数据的分区器_:对于处理K-V型数据的RDD,拥有自己的分区器,比如HashPartitioner(根据Key的哈希值计算数据应发送的分区)和RangePartitioner(对数据进行抽样,选出几个特定的Key作为划分分区的边界,从而确定数据应发往的分区)。_记录切片位置的list_:一个记录分区内数据切片的优先位置列表。例如,如果数据存储在HDFS上,则每个RDD会记录要处理的数据块在HDFS上的位置,并遵循“移动计算而非移动数据”的原则进行任务调度。RDD的弹性体现
_分区的弹性_:每个RDD的分区数量是由数据切片的数量决定的,处理不同的数据时,分区会动态变化。_计算的弹性_:每个RDD都记录了需要处理数据切片的位置(例如在HDFS上的位置)。在任务分配时,会尽量遵循本地化原则(移动计算而非移动数据)来分配计算任务。_容错的弹性_:每个RDD都记录了对其他RDD的依赖关系(血缘)。当某个分区的数据丢失时,可以根据血缘关系自动重试任务,恢复分区数据。_存储的弹性_:RDD在计算过程中,会优先使用内存来存储中间数据。当内存资源不足时,会自动将数据溢写到磁盘。数据存储位置是弹性的。Spark常见的端口号
18080:Spark历史服务器的WebUI端口。如果父RDD的每个Partition都只被一个子RDD的Partition所使用,就是窄依赖;否则就是宽依赖。
划分宽窄依赖的作用是用于判断数据流向。窄依赖必定不会产生Shuffle,而宽依赖则相反,通常意味着Shuffle的发生(父RDD的数据将被发送到不同子RDD的分区,这些分区可能分布在不同的节点上)。
Spark需要做数据持久化的原因
缓存和Checkpoint的比较
| 持久化方式 | 使用方式 | 数据保存位置 | RDD依赖关系是否保留 |
|---|---|---|---|
| 缓存 | rdd.cache()或rdd.persist() |
内存或者磁盘 | 是 |
| Checkpoint | rdd.checkpoint() |
HDFS等共享存储 | 否(会切断血缘关系) |
cache和persist的区别:
cache() 是 persist() 的一种特殊情况,等价于 persist(StorageLevel.MEMORY_ONLY)。StorageLevel.MEMORY_ONLY:一般用于小数据量场景。StorageLevel.MEMORY_AND_DISK:一般用于大数据量场景。_Application数量_ = SparkContext的个数。一个main方法中通常只有一个SparkContext,即一个Application。_Job的个数_ 由Action算子(行动算子)的个数决定。每当代码执行到一个Action算子,就会触发一个Job的提交。常见的Action算子有collect()、foreach()、count()、take()等。_Stage的个数_ = 宽依赖的个数 + 1。_Task的个数_(在一个TaskSet中)= 一个Stage中,最后一个RDD的分区数。GitHub示例代码地址:GitHub相关的例子
Transform算子(转换算子) 转换算子是惰性执行的,它们不会立即触发Job的执行,只负责定义RDD之间的转换关系。只有当遇到Action算子时,这些转换才会真正执行。
plaintext自动换行:关放大阅读展开代码1. map 2. flatMap 3. reduceByKey(预聚合算子) 4. groupByKey 5. distinct
reduceByKey和groupByKey的比较:reduceByKey相比groupByKey,在Shuffle之前会先在Map端进行预聚合(Combiner),从而减少了Shuffle传输的数据量,性能更高。应优先使用reduceByKey。map和flatMap的比较:flatMap比map更灵活,对于输入的每个元素,map只会产生一个输出元素,而flatMap可以产生0个、1个或多个输出元素。Action算子(行动算子) 行动算子会触发Spark Job的实际计算和执行。
plaintext自动换行:关放大阅读展开代码1. collect 2. count 3. foreach 4. foreachPartition 5. **reduce**
foreach和foreachPartition的比较:foreach会对RDD中每个分区里的每一条数据都执行一次指定的函数,函数的调用次数等于数据总数。而foreachPartition则是对RDD的每个分区执行一次指定的函数,函数的调用次数等于分区数。在需要进行数据库连接等昂贵操作时,使用foreachPartition可以显著减少连接的创建和销毁次数,提高执行效率。广播变量:默认情况下,Spark会将Driver端的数据给每个Task都发送一份,所以该数据占用的内存大小 = Task个数 * Driver数据大小。但是如果将该Driver数据广播,此时数据会广播给每个Executor,该Executor中的所有Task共用这一份数据,所以此时该数据占用的内存大小 = Executor个数 * Driver数据大小,相比之前节省了内存空间。
累加器:先在RDD的每个分区中进行累加,然后将各个分区的累加结果返回给Driver进行汇总。
hive on spark 和 spark on hive的比较RDD, DataFrame, DataSet
Row对象,使Spark能更高效地处理结构化数据,并进行查询优化。但DataFrame是弱类型的,在编译时不做类型检查。SparkSession是Spark 2.0以后进行Spark编程的统一入口点,它整合了SparkContext、SQLContext和HiveContext的功能,是与Spark进行交互的起点。
map)的call方法中的代码是在Executor的Task中执行的,而call方法外部的代码是在Driver中执行的。如果Task中需要引用在Driver端创建的对象,Spark会将该对象序列化后传输给Task使用。因此,这个对象及其所有字段都必须是可序列化的。plaintext自动换行:关放大阅读展开代码MR计算过程: 数据 -> InputFormat -> map方法 -> 环形缓冲区[分区、排序, 80%溢写] -> [Combiner] -> 磁盘[小文件] -> 合并小文件 -> Reducer拉取数据[归并排序] -> reduce方法 -> outputFormat -> 磁盘 MR shuffle阶段: -> 环形缓冲区[分区、排序, 80%溢写] -> [Combiner] -> 磁盘[小文件] -> 合并小文件 -> Reducer拉取数据[归并排序] -> Spark shuffle: -> 缓冲区[分区、[排序]] -> [Combiner] -> 磁盘[小文件] -> 合并小文件 -> 分区拉取数据[归并、[排序]] ->
SortShuffle
BypassShuffle
并不是所有Shuffle操作都需要排序(例如groupByKey在Shuffle阶段就不需要排序)。为了优化性能,Spark引入了BypassShuffle。
spark.shuffle.sort.bypassMergeThreshold控制)。MEMORY_ONLY),减少了磁盘I/O,而MapReduce在任务传递过程中必须依赖磁盘进行Shuffle。repartition和coalesce都有改变分区数的功能。repartition是coalesce的一种特殊情况,即repartition(n)等价于coalesce(n, shuffle = true)。repartition通常用于增加分区数。它会进行一次完整的Shuffle,将数据随机打散并重新分布到所有分区中。coalesce通常用于减少分区数。默认情况下,它会尽量避免Shuffle,通过合并相邻的分区来实现,效率更高。如果需要将数据均匀分布,可以设置shuffle = true。Spark 1.6版本后引入了统一内存管理(Unified Memory Management),Spark可以同时使用堆内内存(On-heap)和堆外内存(Off-heap)。
执行内存(Execution Memory)和存储内存(Storage Memory)的动态占用机制 这两块内存区域可以相互借用。当一方空闲而另一方资源不足时,空闲方可以将部分内存借给对方使用。但是,执行内存有优先权,如果存储内存借用了执行内存,当执行内存需要时,存储内存必须归还,可能会导致缓存的数据被移出内存。
见“缓存(cache,persist),checkpoint的区别”一节
提交重点:
_应用的提交_:客户端解析任务的提交参数。_执行环境的准备_:在ApplicationMaster中建立Driver线程,并初始化SparkContext的执行环境,包括任务的切分和调度。_任务的调度和执行_:Driver向Executor发送启动任务的命令,Executor执行具体的Task。具体流程:
spark-submit脚本提交应用。Submit进程解析提交参数,并封装成一个YARN应用。main方法。当遇到Action算子时,触发一个Job的提交。DAGScheduler根据RDD的宽依赖关系,将Job划分为一个或多个Stage。每个Stage被划分为一组Task(TaskSet)。spark中提供非常丰富的spark join策略来处理不同表数据量情况下的选择
涉及广播,hash表的形成,数据间的join;使用场景为小表 join 大表的等值连接场景,spark默认小表标准为10MB;driver将小表数据 广播到各个存储大表数据的executor,executor将小表数据利用哈希函数形成一张《joinkey,小表行数据》的哈希表,遍历大表每一行,从哈希表取得小表数据,哈希读取效率高
当join的表数据都超过小表存储的定义(10MB),数据数据量适中的等值连接情况,spark join将会采用 shuffle hash join;涉及过程:
Sort Merge Join (SMJ) 是 Spark 中处理大表等值 Join的核心策略。当参与 Join 的两张表都过大,无法利用广播机制时,SMJ 提供了一种稳健且内存友好的解决方案。虽然它通常被视为 Spark 的默认 Join 策略(由 spark.sql.join.preferSortMergeJoin 控制),但在现代 Spark 版本中,其最终应用会受到自适应查询执行(AQE)的动态调整。
SMJ 的整个过程可以分为三个核心阶段:Shuffle、Sort 和 Merge Join。
Shuffle (重分区): 对两张表的数据按照 Join Key 进行 Shuffle。此阶段的核心目标是确保所有具有相同 Key 的数据,无论它们最初位于哪个节点或分区,最终都会被发送到同一个 Executor 的同一个分区中,为后续的 Join 做好准备。这是整个流程中最昂贵的操作之一,涉及大量的网络 I/O。
Sort (排序): 在每个分区内,Spark 会独立地对来自两张表的数据块按照 Join Key 进行排序。排序是 SMJ 的基石,它将无序的数据转化为有序的流,使得后续的合并操作可以高效地进行。此阶段同样消耗大量的磁盘 I/O 和 CPU 资源。
Merge Join (合并连接): 这是 SMJ 的精髓所在。Spark 使用类似“拉链”的算法,通过两个指针分别遍历两个已排序的数据流。它逐个比较 Key,根据比较结果移动指针并生成匹配的 Join 结果。由于数据是有序的,已处理过的数据可以立即从内存中释放,这使得 SMJ 对内存的要求极低,非常适合处理海量数据。
SMJ 并非总是被选中,它的应用有明确的前提条件:
ON a.id = b.id)。对于非等值 Join(如 a.id > b.id),Spark 只能使用 BroadcastNestedLoopJoin 或 CartesianProduct,效率极低。spark.sql.autoBroadcastJoinThreshold(默认 10MB)时,Spark 才会优先考虑 SMJ。如果一张表可广播,Spark 会毫不犹豫地选择效率更高的 BHJ。虽然 spark.sql.join.preferSortMergeJoin 默认为 true,但在现代 Spark (3.0+) 中,其行为更加智能:
preferSortMergeJoin 的配置,将 SMJ 作为计划的 Join 策略。spark.sql.adaptive.enabled=true,默认开启),Spark 会在运行时根据实际的数据统计信息动态调整 Join 策略。
“对分区内的两张表数据进行遍历”这句话背后是一个精巧的算法。让我们用更具体的语言描述:
在 Merge 阶段,Spark 为每个分区内的两个已排序数据流各维护一个指针(可以想象成两个手指分别指着两份名单)。
ptrA 和 ptrB 分别指向两个已排序数据流的起始位置。keyA 和 keyB):
keyA == keyB (匹配成功):
ptrB 当前指向的行,然后仅移动 ptrA,检查 A 流中是否还有相同的 keyA。只要有,就持续与缓存的 B 行进行匹配并输出结果,直到 A 流的 key 发生变化。然后,再移动 ptrB。keyA < keyB (A 表的 Key 较小):
A 表中当前这个 keyA 在 B 表中不存在(因为 B 表已排序,后面的 key 只会更大)。ptrA,跳过 A 表中这条不匹配的记录。keyA > keyB (B 表的 Key 较小):
B 表中当前这个 keyB 在 A 表中不存在。ptrB,跳过 B 表中这条不匹配的记录。SMJ 的性能瓶颈在于 Shuffle 和 Sort。如果数据在物理存储上就已经是按照 Join Key 分区和排序的,Spark 的优化器就能识别到这一点,并完全跳过 Shuffle 和 Sort 阶段,直接执行 Merge 操作。这是 SMJ 性能的极限状态,通常通过使用 bucketBy 和 sortBy 创建 Hive 表或 Delta Lake 表来实现。
使用场景为等值/非等值,仅支持内连接(inner join),没有明确指明关联键;
与 Broadcast Hash Join 的核心区别:
spark的join策略选择优先级:
源码注释说明:
plaintext自动换行:关放大阅读展开代码// If it is an equi-join, we first look at the join hints w.r.t. the following order: // 1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides // have the broadcast hints, choose the smaller side (based on stats) to broadcast. // 2. sort merge hint: pick sort merge join if join keys are sortable. // 3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both // sides have the shuffle hash hints, choose the smaller side (based on stats) as the // build side. // 4. shuffle replicate NL hint: pick cartesian product if join type is inner like. // // If there is no hint or the hints are not applicable, we follow these rules one by one: // 1. Pick broadcast hash join if one side is small enough to broadcast, and the join type // is supported. If both sides are small, choose the smaller side (based on stats) // to broadcast. // 2. Pick shuffle hash join if one side is small enough to build local hash map, and is // much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false. // 3. Pick sort merge join if the join keys are sortable. // 4. Pick cartesian product if join type is inner like. // 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have // other choice. // ============================================================================================== // If it is not an equi-join, we first look at the join hints w.r.t. the following order: // 1. broadcast hint: pick broadcast nested loop join. If both sides have the broadcast // hints, choose the smaller side (based on stats) to broadcast for inner and full joins, // choose the left side for right join, and choose right side for left join. // 2. shuffle replicate NL hint: pick cartesian product if join type is inner like. // // If there is no hint or the hints are not applicable, we follow these rules one by one: // 1. Pick broadcast nested loop join if one side is small enough to broadcast. If only left // side is broadcast-able and it's left join, or only right side is broadcast-able and // it's right join, we skip this rule. If both sides are small, broadcasts the smaller // side for inner and full joins, broadcasts the left side for right join, and broadcasts // right side for left join. // 2. Pick cartesian product if join type is inner like. // 3. Pick broadcast nested loop join as the final solution. It may OOM but we don't have // other choice. It broadcasts the smaller side for inner and full joins, broadcasts the // left side for right join, and broadcasts right side for left join.
Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > cartesian Join > Broadcast Nested Loop Join。
Driver与Executor之间的通信依靠RPC(远程过程调用)实现。早期版本使用Akka框架,后期版本(2.x及以后)则完全迁移到了Netty框架。
Task调度要点:
调度细节:
调度的原则:
遵循数据本地化(Data Locality)的原则,尽量将计算任务分配到数据所在的节点上,以减少网络传输开销。调度优先级为:PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY。
针对Hive的优化策略((参考 03_HIVE.md 中的 "Hive框架优化"))对Hive on Spark大部分都有效。 例如,当通过Spark历史服务器发现某个任务的GC time过长时,可以调整Spark内存模型中"Other"部分的内存大小。"Other"部分内存用于存储Spark内部的元数据和用户自定义的一些数据结构。
通过合理设置Spark Application的运行参数,可以最大程度地提高资源利用效率。
参数配置示例
假设集群中每台节点配置为128G内存,64个CPU核心。
- 资源分配给YARN:通常将节点80%的资源分配给YARN。则每台节点可用资源为
128G * 0.8 ≈ 100G内存,64 * 0.8 ≈ 48个核心。- Executor配置:每个Executor分配的core数不宜过多,一般为3-6个。这里我们设置为6个core。
- 计算Executor数量:每台NodeManager可以启动
48 / 6 = 8个Executor。- 计算Executor内存:每个Executor可分配的内存为
100G / 8 ≈ 12G。- 总Executor数:假设集群共有7台NodeManager,则总共可启动
7 * 8 = 56个Executor。最终配置参数:
bash自动换行:关放大阅读展开代码--num-executors 56 --executor-cores 6 --executor-memory 12G --driver-cores 6 --driver-memory 12G
一些其他常用运行参数:
--master:指定将任务提交到哪个资源调度器(如yarn)。--class:指定待运行的带有main方法的全类名。--deploy-mode:指定YARN作为资源调度器时的部署模式(client/cluster)。--queue:指定任务提交到YARN的哪个资源队列中。本文作者:hedeoer
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!