A join normally compiles to one MapReduce job, but when multiple joins share the same join key, Hive merges them into a single MR job.
```sql
select *
from t1
join t2 on t1.order_id = t2.order_id
join t3 on t3.order_id = t2.order_id;
-- both joins use order_id, so only one MR job is produced!
```
Conditions for using map join in Hive: the whole join finishes on the map side, avoiding the shuffle entirely, which greatly improves efficiency.
```sql
-- enable map join
set hive.auto.convert.join=true;
-- small-table size threshold: 25MB
set hive.mapjoin.smalltable.filesize=25000000;
-- convert to map join unconditionally when table sizes are known
set hive.auto.convert.join.noconditionaltask=true;
-- size threshold for the unconditional conversion: 10MB
set hive.auto.convert.join.noconditionaltask.size=10000000;
```
```sql
SELECT /*+ MAPJOIN(a) MAPJOIN(b) */ *
FROM table_a a
JOIN table_b b ON a.key = b.key;
-- parameters that need to be enabled:
/*
1. hive.auto.convert.join: set to true to enable automatic join conversion.
2. hive.optimize.sort.dynamic.partition: set to true to allow dynamic-partition optimization.
3. hive.optimize.sort.dynamic.partition.sortby: set to true to enable sort-based dynamic-partition optimization.
4. hive.optimize.sortmerge.join: set to true to enable the Sorted Merge Join optimization.
5. hive.exec.parallel: set to true to enable parallel execution.
*/
```
Bucket map join suits joins whose key has high cardinality: the bucket metadata lets Hive skip non-matching rows directly, improving query performance. It also behaves well under data skew, because bucketing spreads the data evenly across nodes and avoids overloading any single node.
```sql
-- disable CBO; CBO causes hints to be ignored
set hive.cbo.enable=false;
-- map join hints are ignored by default (deprecated), so set this to false
set hive.ignore.mapjoin.hint=false;
-- enable the bucket map join optimization
set hive.optimize.bucketmapjoin=true;
```
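For the optimization to apply, both tables must be bucketed on the join key, and one table's bucket count must be a multiple of the other's. A minimal sketch, with hypothetical tables `big_t` and `small_t` (names and bucket counts are illustrative, not from the original examples):

```sql
-- hypothetical tables: bucket counts 16 and 4 (a multiple relationship)
CREATE TABLE big_t (order_id BIGINT, amount DECIMAL(10,2))
CLUSTERED BY (order_id) INTO 16 BUCKETS;

CREATE TABLE small_t (order_id BIGINT, status STRING)
CLUSTERED BY (order_id) INTO 4 BUCKETS;

-- with the settings above, each map task reads one bucket of big_t
-- plus only the corresponding bucket of small_t, instead of the whole small table
SELECT /*+ MAPJOIN(s) */ b.order_id, b.amount, s.status
FROM big_t b
JOIN small_t s ON b.order_id = s.order_id;
```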
```sql
-- enable the Sort Merge Bucket Map Join optimization
set hive.optimize.bucketmapjoin.sortedmerge=true;
-- automatically convert eligible joins to SMB join
set hive.auto.convert.sortmerge.join=true;
```
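SMB join additionally requires the data within each bucket to be sorted on the join key, and the two tables to have the same number of buckets. A sketch with hypothetical tables (names are illustrative):

```sql
CREATE TABLE orders_smb (order_id BIGINT, amount DECIMAL(10,2))
CLUSTERED BY (order_id) SORTED BY (order_id ASC) INTO 8 BUCKETS;

CREATE TABLE details_smb (order_id BIGINT, item STRING)
CLUSTERED BY (order_id) SORTED BY (order_id ASC) INTO 8 BUCKETS;

-- buckets are merge-joined pairwise on the map side,
-- so no in-memory hash table is needed at all
SELECT o.order_id, o.amount, d.item
FROM orders_smb o
JOIN details_smb d ON o.order_id = d.order_id;
```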
```sql
-- count the population of each region of China
-- this SQL is prone to data skew
select region, count(1)
from population
group by region;
```
**set hive.map.aggr=true;**

```sql
-- conditions for map-side pre-aggregation
-- minimum reduction ratio required for map-side aggregation to stay enabled
set hive.map.aggr.hash.min.reduction=0.5;
-- number of rows sampled to check whether the source table suits map-side aggregation
set hive.groupby.mapaggr.checkinterval=100000;
-- max fraction of map task heap the aggregation hash table may occupy;
-- beyond this, the hash table is flushed
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
```
**set hive.groupby.skewindata=true;** This splits the group-by into two jobs: the first scatters rows to random reducers for partial aggregation, and the second merges the partial results by key.
Implementing the pre-aggregation by hand
```sql
-- manual equivalent of skewindata: scatter, partially count, then merge
select region, sum(cnt)
from (
  select region, count(1) as cnt
  from population
  group by region, cast(rand() * 7 as int)
) tmp
group by region;
```
```sql
select od.order_detail_id,
       oi.order_id,
       oi.create_date,
       oi.order_id,
       total_amount
from order_info oi
join order_detail od on oi.order_id = od.order_id;
```
**set hive.auto.convert.join=true;** (see the official docs for the other related parameters) and **set hive.optimize.skewjoin=true;**

```sql
-- join via table expansion
select od.order_detail_id,
       oi.order_id,
       oi.create_date,
       oi.order_id,
       total_amount
from order_detail od
-- take the smaller table order_info, add a new_id column, and use union all
-- to stack the expanded copies: the right table is salted and expanded
join (
  select order_id, concat(order_id, '-', 0) new_id, user_id, create_date, total_amount from order_info o
  union all
  select order_id, concat(order_id, '-', 1) new_id, user_id, create_date, total_amount from order_info o
  union all
  select order_id, concat(order_id, '-', 2) new_id, user_id, create_date, total_amount from order_info o
) oi
-- the join condition between order_detail (the big table) and the expanded small table changes,
-- but the final result must stay identical after scattering: the left table is salted randomly
on oi.new_id = concat(od.order_id, '-', floor(rand() * 3));
```
Test data:
```sql
CREATE TABLE IF NOT EXISTS user_orders (
  order_id        STRING        COMMENT '订单ID',
  user_id         BIGINT        COMMENT '用户ID',
  shop_account_id BIGINT        COMMENT '商家店铺ID',
  order_amount    DECIMAL(10,2) COMMENT '订单金额',
  order_status    STRING        COMMENT '订单状态',
  order_time      STRING        COMMENT '下单时间'
) COMMENT '用户订单事实测试表'
PARTITIONED BY (dt STRING COMMENT '业务日期分区');

CREATE TABLE IF NOT EXISTS user_payments (
  payment_id      STRING        COMMENT '支付流水ID',
  pay_order_id    STRING        COMMENT '支付关联业务单号',
  user_id         BIGINT        COMMENT '支付用户ID',
  shop_account_id BIGINT        COMMENT '商家店铺ID',
  pay_amount      DECIMAL(10,2) COMMENT '支付金额',
  pay_channel     STRING        COMMENT '支付渠道',
  pay_status      STRING        COMMENT '支付状态',
  pay_time        STRING        COMMENT '支付时间'
) COMMENT '用户支付事实测试表'
PARTITIONED BY (dt STRING COMMENT '业务日期分区');

CREATE TABLE IF NOT EXISTS users (
  user_id   BIGINT COMMENT '用户ID',
  user_name STRING COMMENT '用户名',
  vip_level STRING COMMENT 'VIP等级'
) COMMENT '用户信息维度表';

insert overwrite table users
select user_id,
       concat('user_', cast(user_id as string)) as user_name,
       case when user_id in (1,2,3,4,5,6,7,8,9,10) then 'VIP5'
            when user_id between 11 and 100 then 'VIP4'
            when user_id between 101 and 1000 then 'VIP3'
            when user_id between 1001 and 10000 then 'VIP2'
            else 'VIP1' end as vip_level
from (select explode(sequence(1, 100000)) as user_id) t
union all
select 0 as user_id, 'user_0' as user_name, 'VIP0' as vip_level
union all
select -1 as user_id, 'user_-1' as user_name, 'VIP0' as vip_level
union all
select 999999999 as user_id, 'user_999999999' as user_name, 'VIP0' as vip_level;

insert overwrite table user_orders partition (dt)
select order_id, user_id, shop_account_id, order_amount, order_status, order_time, dt
from (
  -- 1) ordinary background orders
  select concat('ord_norm_', cast(dt_id as string), '_', cast(user_id as string), '_', cast(seq_id as string)) as order_id,
         cast(user_id as bigint) as user_id,
         cast(case when pmod(user_id, 1000) = 0 then 0
                   when pmod(user_id, 777) = 0 then -1
                   when pmod(user_id, 666) = 0 then 999999999
                   when user_id between 201 and 500 then 10001 + pmod(user_id, 5)
                   when user_id between 501 and 5000 then 10006 + pmod(user_id, 95)
                   else 10101 + pmod(user_id, 9900) end as bigint) as shop_account_id,
         cast((50 + pmod(user_id * 7 + seq_id, 1000)) / 10.0 as decimal(10,2)) as order_amount,
         case when pmod(user_id + seq_id, 10) = 0 then 'CANCEL'
              when pmod(user_id + seq_id, 7) = 0 then 'FINISHED'
              else 'CREATED' end as order_status,
         concat(case dt_id when 1 then '2025-01-01' when 2 then '2025-01-02' when 3 then '2025-01-03' end, ' ',
                lpad(cast(pmod(user_id + seq_id, 24) as string), 2, '0'), ':',
                lpad(cast(pmod(user_id * 3 + seq_id, 60) as string), 2, '0'), ':',
                lpad(cast(pmod(user_id * 7 + seq_id, 60) as string), 2, '0')) as order_time,
         case dt_id when 1 then '2025-01-01' when 2 then '2025-01-02' when 3 then '2025-01-03' end as dt
  from (
    select dt_id, user_id, seq_id
    from (select explode(array(1,2,3)) as dt_id) d
    cross join (select explode(sequence(201, 100000)) as user_id) u
    cross join (select explode(sequence(1, 3)) as seq_id) s
  ) t1
  union all
  -- 2) a small number of super-hot users' orders
  select concat('ord_superhot_', cast(dt_id as string), '_', cast(user_id as string), '_', cast(seq_id as string)) as order_id,
         cast(user_id as bigint) as user_id,
         cast(10001 + pmod(user_id, 5) as bigint) as shop_account_id,
         cast((100 + pmod(seq_id * 9, 800)) / 10.0 as decimal(10,2)) as order_amount,
         case when pmod(seq_id, 20) = 0 then 'CANCEL'
              when pmod(seq_id, 3) = 0 then 'FINISHED'
              else 'CREATED' end as order_status,
         concat(case dt_id when 1 then '2025-01-01' when 2 then '2025-01-02' when 3 then '2025-01-03' end, ' ',
                lpad(cast(pmod(seq_id, 24) as string), 2, '0'), ':',
                lpad(cast(pmod(seq_id * 2, 60) as string), 2, '0'), ':',
                lpad(cast(pmod(seq_id * 5, 60) as string), 2, '0')) as order_time,
         case dt_id when 1 then '2025-01-01' when 2 then '2025-01-02' when 3 then '2025-01-03' end as dt
  from (
    select dt_id, user_id, seq_id
    from (select explode(array(1,2,3)) as dt_id) d
    cross join (select explode(sequence(1, 10)) as user_id) u
    cross join (select explode(sequence(1, 20000)) as seq_id) s
  ) t2
  union all
  -- 3) moderately hot users' orders
  select concat('ord_hot_', cast(dt_id as string), '_', cast(user_id as string), '_', cast(seq_id as string)) as order_id,
         cast(user_id as bigint) as user_id,
         cast(10006 + pmod(user_id, 95) as bigint) as shop_account_id,
         cast((80 + pmod(user_id + seq_id * 3, 600)) / 10.0 as decimal(10,2)) as order_amount,
         case when pmod(seq_id, 15) = 0 then 'CANCEL'
              when pmod(seq_id, 4) = 0 then 'FINISHED'
              else 'CREATED' end as order_status,
         concat(case dt_id when 1 then '2025-01-01' when 2 then '2025-01-02' when 3 then '2025-01-03' end, ' ',
                lpad(cast(pmod(user_id + seq_id, 24) as string), 2, '0'), ':',
                lpad(cast(pmod(user_id * 2 + seq_id, 60) as string), 2, '0'), ':',
                lpad(cast(pmod(user_id * 5 + seq_id, 60) as string), 2, '0')) as order_time,
         case dt_id when 1 then '2025-01-01' when 2 then '2025-01-02' when 3 then '2025-01-03' end as dt
  from (
    select dt_id, user_id, seq_id
    from (select explode(array(1,2,3)) as dt_id) d
    cross join (select explode(sequence(11, 200)) as user_id) u
    cross join (select explode(sequence(1, 1500)) as seq_id) s
  ) t3
  union all
  -- 4) invalid-user / invalid-shop orders
  select concat('ord_invalid_', cast(dt_id as string), '_', cast(user_id as string), '_', cast(seq_id as string)) as order_id,
         cast(user_id as bigint) as user_id,
         cast(shop_account_id as bigint) as shop_account_id,
         cast((20 + pmod(seq_id * 4, 300)) / 10.0 as decimal(10,2)) as order_amount,
         'CREATED' as order_status,
         concat(case dt_id when 1 then '2025-01-01' when 2 then '2025-01-02' when 3 then '2025-01-03' end, ' ',
                lpad(cast(pmod(seq_id, 24) as string), 2, '0'), ':',
                lpad(cast(pmod(seq_id * 2, 60) as string), 2, '0'), ':',
                lpad(cast(pmod(seq_id * 3, 60) as string), 2, '0')) as order_time,
         case dt_id when 1 then '2025-01-01' when 2 then '2025-01-02' when 3 then '2025-01-03' end as dt
  from (
    select dt_id, user_id, shop_account_id, seq_id
    from (select explode(array(1,2,3)) as dt_id) d
    cross join (
      select cast(0 as bigint) as user_id, cast(0 as bigint) as shop_account_id
      union all
      select cast(-1 as bigint) as user_id, cast(-1 as bigint) as shop_account_id
      union all
      select cast(999999999 as bigint) as user_id, cast(999999999 as bigint) as shop_account_id
    ) u
    cross join (select explode(sequence(1, 25000)) as seq_id) s
  ) t4
) all_orders;

insert overwrite table user_payments partition (dt)
select concat('pay_', order_id, '_', cast(pay_seq as string)) as payment_id,
       order_id as pay_order_id,
       user_id,
       shop_account_id,
       cast(case when pay_cnt = 1 then order_amount
                 when pay_cnt = 2 and pay_seq = 1 then round(order_amount * 0.5, 2)
                 when pay_cnt = 2 and pay_seq = 2 then order_amount - round(order_amount * 0.5, 2)
                 when pay_cnt = 3 and pay_seq = 1 then round(order_amount * 0.4, 2)
                 when pay_cnt = 3 and pay_seq = 2 then round(order_amount * 0.3, 2)
                 else order_amount - round(order_amount * 0.4, 2) - round(order_amount * 0.3, 2)
            end as decimal(10,2)) as pay_amount,
       case when pmod(abs(hash(concat(order_id, cast(pay_seq as string)))), 4) = 0 then 'ALIPAY'
            when pmod(abs(hash(concat(order_id, cast(pay_seq as string)))), 4) = 1 then 'WECHAT'
            when pmod(abs(hash(concat(order_id, cast(pay_seq as string)))), 4) = 2 then 'CARD'
            else 'BANK' end as pay_channel,
       case when pay_seq < pay_cnt then 'FAILED' else 'SUCCESS' end as pay_status,
       concat(substr(order_time, 1, 10), ' ',
              lpad(cast(pmod(abs(hash(concat(order_id, cast(pay_seq as string), '_hh'))), 24) as string), 2, '0'), ':',
              lpad(cast(pmod(abs(hash(concat(order_id, cast(pay_seq as string), '_mi'))), 60) as string), 2, '0'), ':',
              lpad(cast(pmod(abs(hash(concat(order_id, cast(pay_seq as string), '_ss'))), 60) as string), 2, '0')) as pay_time,
       dt
from (
  select order_id, user_id, shop_account_id, order_amount, order_time, dt, pay_cnt
  from (
    select order_id, user_id, shop_account_id, order_amount, order_time, dt,
           case when pmod(abs(hash(order_id)), 10) = 0 then 0
                when pmod(abs(hash(order_id)), 20) = 0 then 3
                when pmod(abs(hash(order_id)), 5) = 0 then 2
                else 1 end as pay_cnt
    from user_orders
    where dt in ('2025-01-01', '2025-01-02', '2025-01-03')
  ) t
  where pay_cnt > 0
) base
lateral view explode(sequence(1, pay_cnt)) e as pay_seq;
```
Data characteristics: user_orders contains invalid records whose user_id is null, 999999999, or negative, and those records cause data skew when the table is joined with users.
Use an inner join, or directly filter out the records whose user_id is null, 999999999, or negative.
```sql
select count(*)
from (
  select t1.user_id, u.user_name, t1.order_time
  from user_orders t1
  inner join users u
    on t1.user_id = u.user_id and t1.dt = '2025-01-03'
);

select count(*)
from (
  select t1.user_id, u.user_name, t1.order_time
  from user_orders t1
  left join users u on t1.user_id = u.user_id
  where t1.dt = '2025-01-03'
    and u.user_id is not null
    and u.user_id != 999999999
    and u.user_id > 0
);
```
Suppose the records whose user_id is null, 999999999, or negative must be kept, yet those same records cause skew in the join.
Only user_id and order_id values no smaller than 0 are valid, and order_id is evenly distributed.
```sql
select count(*)
from (
  select t1.user_id, u.user_name, t1.order_time
  from user_orders t1
  left join users u
    -- scatter the invalid left-side keys to values that can never match the right table
    on if(t1.user_id is null or t1.user_id = 999999999 or t1.user_id < 0,
          t1.order_id * -1,
          t1.user_id) = u.user_id
  where t1.dt = '2025-01-03'
);
```
- Make sure the scattered values can never match keys in the right table: if the right table's keys are positive integers, do not scatter the left table into positive integers, or collisions may occur.
- If the table has no reasonably well-distributed key to borrow, avoid a bare rand(): when a Spark task fails and is retried, rand() generates different values than before. Use a fixed seed instead, e.g. rand(3) or rand(N).
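Putting both rules together, a sketch against the test tables above (the seed 3 and the scatter range are arbitrary illustrative choices; the offset 1000000 is there because users contains the key -1, so the scattered values must stay well below it):

```sql
-- scatter invalid user_ids into negative, seeded-random values;
-- values <= -1000000 can never collide with any key present in users
select t1.user_id, u.user_name
from user_orders t1
left join users u
  on if(t1.user_id is null or t1.user_id = 999999999 or t1.user_id < 0,
        -(1000000 + cast(floor(rand(3) * 1000000) as bigint)),  -- fixed seed: stable across retries
        t1.user_id) = u.user_id
where t1.dt = '2025-01-03';
```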
Set the threshold below which the small table is broadcast. Note that if the "small" table is actually large, broadcasting it may be slower than a shuffle.
```sql
set spark.sql.autoBroadcastJoinThreshold = 100M;

EXPLAIN EXTENDED
select /*+ MAPJOIN(t2) */
       t1.user_id, t2.user_name, t1.order_time
from (
  select user_id, order_time
  from user_orders
  where dt = '2025-01-03'
) t1
left join users t2 on t1.user_id = t2.user_id;
```
Increase shuffle parallelism
This parameter determines how many partitions the data is split into after a shuffle. Raising spark.sql.shuffle.partitions does not eliminate skew; it slices the post-shuffle data more finely so that each reduce task processes less data. Against heavily skewed hot keys its effect is limited, because every row of a hot key still hashes to the same partition no matter how many partitions there are.
```sql
set spark.sql.shuffle.partitions = 400;
```
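On Spark 3.x, adaptive query execution can additionally split skewed partitions at runtime, covering the hot-key case a static partition count cannot. A sketch of the relevant settings (the two threshold values shown are Spark's defaults, listed here only for illustration):

```sql
set spark.sql.adaptive.enabled = true;
set spark.sql.adaptive.skewJoin.enabled = true;
-- a partition counts as skewed if it exceeds this factor times the median partition size...
set spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5;
-- ...and is also larger than this absolute size
set spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB;
```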
Key splitting
If the hot keys are easy to locate and few in number, consider splitting hot from cold and recombining with union all. Note that in a left join, prefer splitting the left table: the union all merge then stays simple. Splitting the right table of a left join requires much more convoluted logic to achieve the same hot/cold optimization, and even if you pull it off, the ongoing maintenance cost is very high.
```sql
-- the skew-prone version
select t1.user_id, t1.order_time, t2.pay_time, t2.pay_amount
from user_orders t1
left join user_payments t2
  on t1.order_id = t2.pay_order_id and t2.dt = '2025-01-03'
where t1.dt = '2025-01-03';

-- optimized version
with hot_keys as (
  select shop_account_id
  from (
    select shop_account_id,
           count(order_id) order_cnts,
           row_number() over (order by count(order_id) desc) order_cntNum
    from user_orders t1
    where dt = '2025-01-03'
    group by t1.shop_account_id
  ) t2
  where order_cntNum < 20
)
-- hot part
select hot_orders.order_id, hot_orders.order_time, t3.pay_time, t3.pay_amount
from (
  select t1.order_id, t1.order_time
  from user_orders t1
  inner join hot_keys t2 on t1.shop_account_id = t2.shop_account_id
  where t1.dt = '2025-01-03'
) hot_orders
left join user_payments t3
  on hot_orders.order_id = t3.pay_order_id and t3.dt = '2025-01-03'
union all
-- cold part
select cold_orders.order_id, cold_orders.order_time, t3.pay_time, t3.pay_amount
from (
  select t1.order_id, t1.order_time
  from user_orders t1
  left join hot_keys t2 on t1.shop_account_id = t2.shop_account_id
  where t1.dt = '2025-01-03' and t2.shop_account_id is null
) cold_orders
left join user_payments t3
  on cold_orders.order_id = t3.pay_order_id and t3.dt = '2025-01-03';
```
If many hot keys remain even after splitting them out, use multiple-copy expansion instead.
```sql
-- the skew-prone version
select t1.order_id, t1.order_time, t2.pay_time, t2.pay_amount
from user_orders t1
left join user_payments t2
  on t1.order_id = t2.pay_order_id and t2.dt = '2025-01-03'
where t1.dt = '2025-01-03';

-- optimized: salt the left table, expand the right table
select t2.order_id, t2.order_time, t3.pay_time, t3.pay_amount
from (
  select t1.order_id, t1.order_time, pmod(t1.shop_account_id, 10) pNum
  from mydb.user_orders t1
  where t1.dt = '2025-01-03'
) t2
left join (
  -- split(space(9), ' ') yields 10 elements, so pNum runs 0..9 and matches pmod(..., 10)
  select pay.pay_time, pay.pay_amount, pay.pay_order_id, seri.pNum
  from mydb.user_payments pay
  lateral view posexplode(split(space(9), ' ')) seri as pNum, val
  where pay.dt = '2025-01-03'
) t3
on t2.order_id = t3.pay_order_id and t2.pNum = t3.pNum;
```
This applies to count, sum, min, max, and avg; it is not directly applicable to count(distinct) or other non-decomposable aggregates.
Decomposing count-style aggregation
```sql
select shop_account_id, count(*) order_num
from mydb.user_orders t1
where dt = '2025-01-03'
group by t1.shop_account_id;

select shop_account_id, sum(order_date_num) order_num
from (
  select shop_account_id, count(*) order_date_num
  from mydb.user_orders t1
  where dt = '2025-01-03'
  group by t1.shop_account_id, to_date(t1.order_time)
) tt
group by shop_account_id;
```
Decomposing avg-style aggregation
```sql
select t1.pay_channel, avg(t1.pay_amount) avg_amount
from mydb.user_payments t1
where dt = '2025-01-03'
group by t1.pay_channel;

-- optimized
select pay_channel, sum(hour_amount) / sum(hour_times)
from (
  select t1.pay_channel,
         sum(t1.pay_amount) hour_amount,
         count(t1.pay_amount) hour_times
  from mydb.user_payments t1
  where dt = '2025-01-03'
  group by t1.pay_channel, hour(t1.pay_time)
) t2
group by pay_channel;
```
To be added.
A two-phase approach can also be used.
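For example, a common two-phase rewrite of a skewed count(distinct), sketched against the test tables above: first deduplicate with a group by (which benefits from map-side partial aggregation), then count the deduplicated rows.

```sql
-- skew-prone: one reducer per pay_channel must deduplicate every user_id
select pay_channel, count(distinct user_id)
from user_payments
where dt = '2025-01-03'
group by pay_channel;

-- two-phase rewrite: deduplicate first, then count
select pay_channel, count(*) as uv
from (
  select pay_channel, user_id
  from user_payments
  where dt = '2025-01-03'
  group by pay_channel, user_id
) t
group by pay_channel;
```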
Author: hedeoer
Link:
Copyright: unless otherwise noted, all posts on this blog are licensed under BY-NC-SA. Please credit the source when reposting!