首页标签分类
02Hive
2024-06-03 · 更新 2026-03-03约 8 分钟 · 1980 字
大数据杂文记
000

目录

Hive
hive基础框架
hive内部表和外部表
hive的4个by
常用的系统函数
自定义函数
窗口函数
字段错位的问题(分隔符引起的)
数据倾斜
group by造成的数据倾斜
join造成的数据倾斜
数据倾斜 具体场景
hive的优化
小文件问题
分区,分桶
hive的数据抽样
map预聚合,mapjoin,SMBJoin
合理选择文件存储格式
使用多引擎
sql语法优化
cbo优化
谓词(where 后条件)下推
合理设置reduce的数量
Fetch抓取
explain 查看
常见语法汇总
grouping sets ,CUBES, ROLLUP GROUPING__ID
tablesample(表采样和块采样)

Hive

hive基础框架

  1. hive 的客户端

    • 使用hive 命令行客户端
    • 使用jdbc、odbc链接hive
  2. 元数据服务

默认derby,derby不与其他客户端共享数据。所以一次只能有一个客户端在使用,如果开了另一个客户端就会连接不上。故选用其他的数据库提供元数据存储,比如Mysql

  1. driver(将Hsql转化为MR程序并执行) 整体上使用Antlr的分析器,将SQL转化为job

    • 解析器 校验sql语法是否正确?校验表名,字段
    • 语义分析器 生成查询块 ,一个子查询对应一个查询块
    • 逻辑计划生成器 比如第一步做tableSacn,第二步 取那些字段,生成operator tree;逻辑执行计划优化,比如谓词下推和map端的预聚合等物理计划手段都是为了尽量合并 Operator,达到减少 MapReduce Job,减少数据传输及 shuffle 数据量;
    • 物理计划生成器,遍历 OperatorTree,翻译为 MapReduce 任务;逻辑计划优化器,比如:调整join顺序和MapJoin
    • 执行器, 提交mr 运行

hive内部表和外部表

内部表和外部表比较:

  • 主要在删除表,内部表会删除元数据和数据,外部表只会会删除元数据 选用:
  • 公司一般需用外部表,如果误删除了内部表的重要数据,需要查看hdfs是否开启了回收站的功能,否则数据无法找回
  • 个人测试一般建立内部表

hive的4个by

  • order by 不带有limit的order by ,全局只有一个reducer,在数据量巨大的情况下,任务可能缓慢
  • distribute by 数据分区,类似于mapreduce中自定义分区,比如将数据随机分配到100个reduce中执行,distribute by cast(rand() * 100 as int)
  • sort by 分区内排序,常常结合distribute by 使用
  • cluster by 当distribute by 和sort by的字段一致时,可以选用cluster by 替换,但是结果只能升序排列

常用的系统函数

  • 日期类:date_sub/date_add, date_format,next_day(取当前天所在周的下周一),last_day (当前天所在月份的最后一天),from_utc_timestamp(ms,'yyyy-MM-dd') (毫秒时间戳转成指定时区日期),unix_timestamp('','yyyy-MM-dd')(指定格式的日期字符串转成秒时间戳)
sql
自动换行:关
放大阅读
展开代码
select last_day('2023-01-03'); -- 2023-01-31 select last_day('2023-01-03''MO'); -- 2023-01-09 select unix_timestamp('2023-01-03','yyyy-MM-dd'); -- 1672704000 秒 -- from_utc_timestamp(ms ,regexp) select date_format(from_utc_timestamp(1672704000000,'Asia/Shanghai'),'yyyy-MM-dd'); -- 2023-01-03
  • 字符串类 : substring()、split、concat、concat_ws、regexp_replace()
  • nvl()
  • 分支 if 、case when
  • 一行变多行: explode(array或map)
  • 多行变一行: collect_set collect_list
  • 一列变多列: if 、case when
  • 多列变一列: concat、concat_ws \ struct、map

自定义函数

相关的代码仓库

  • udf,(User Defined Function)用户自定义函数
  • udtf(User-defined Table Generating Function)用户自定义制表函数
  • udaf(User-defined Aggregation Function)用户自定义聚合函数自定义udf函数步骤:
    • extends GenericUDF
    • 重写initialize(),校验参数个数,参数类型
    • 重写evaluate(),函数的主要功能实现
    • 重写getDisplayString(),用于explain时,是否显示执行计划
    • 打jar包,上传hdfs,在hive中注册自定义的函数
java
自动换行:关
放大阅读
展开代码
import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.text.DecimalFormat; public class IntToPrecent extends GenericUDF { // 处理逻辑 public String intToPrecent(String i1, String i2){ int i = Integer.parseInt(i1); int j = Integer.parseInt(i2); double result = (double)i/j; DecimalFormat df = new DecimalFormat("0%"); return df.format(result); } public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException { // 判断传入的参数个数 if(objectInspectors.length != 2){ throw new UDFArgumentLengthException("Input Args Length Error !!!"); } // 判断传入参数的类型 if (!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE) || !PrimitiveObjectInspector.PrimitiveCategory.INT.equals(((PrimitiveObjectInspector)objectInspectors[0]).getPrimitiveCategory())){ throw new UDFArgumentException("函数第一个参数为int类型"); // 当自定义UDF参数与预期不符时,抛出异常 } if (!objectInspectors[1].getCategory().equals(ObjectInspector.Category.PRIMITIVE) || !PrimitiveObjectInspector.PrimitiveCategory.INT.equals(((PrimitiveObjectInspector)objectInspectors[1]).getPrimitiveCategory())){ throw new UDFArgumentException("函数第二个参数为int类型"); } return PrimitiveObjectInspectorFactory.javaStringObjectInspector; } public String evaluate(DeferredObject[] deferredObjects) throws HiveException { String num1 = deferredObjects[0].get().toString(); String num2 = deferredObjects[1].get().toString(); return intToPrecent(num1,num2); } public String getDisplayString(String[] strings) { // 生成HQL explain子句中显示的日志 return strings[0]; } }
sql
自动换行:关
放大阅读
展开代码
-- hdfs的路径不存在需要提前创建 /user/hive/warehouse/udf/ hadoop fs -mkdir /user/hive/warehouse/udf/ -- 上传jar至hdfs hadoop fs -put hivedemo-1.0-SNAPSHOT.jar /user/hive/warehouse/udf/ -- 永久注册的方式(在那个库中注册,在那个库中使用) ADD JAR hdfs:///user/hive/warehouse/udf/hivedemo-1.0-SNAPSHOT.jar; -- hive中注册自定义的函数 create function myfunc as "cn.hedeoer.udf.IntToPrecent" using jar "hdfs:///user/hive/warehouse/hivedemo-1.0-SNAPSHOT.jar"; -- 使用自定义的函数 select myfunc('50','100'); -- 50%

image-20250802170345841 udf 函数的使用范围:

sql
自动换行:关
放大阅读
展开代码
Temporary Functions 会话级别 CREATE TEMPORARY FUNCTION function_name AS class_name; permanent functions 全局使用 CREATE FUNCTION [db_name.]function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];

窗口函数

  • 聚合类: sum、avg、count
  • 排名类: row_number、rank、rank_dense
  • 跨行取值: lead 、lag
  • 其他窗口分析函数: first_value、last_value
  • over(partition by xx order by xxx rows between xxx and xxx)

字段错位的问题(分隔符引起的)

plaintext
自动换行:关
放大阅读
展开代码
MySQL: A id name age 1 z\r\ns 18 HDFS: 1\tz\ts\t18 ECHO is off. 1\tz s\t18 Hive: id name age 1 zs 18 \ 1 z

hive默认列分隔符: \001 \u0001 已经出现了,怎么解决的:
用datax来采集mysql
配置文件可以指定 transformer 去掉字段里的特殊字符,比如 回车换行\r\n

数据倾斜

group by造成的数据倾斜

  1. hive.map.aggr=true参数开启预聚合,hive3.x版本默认开启,相当于map阶段的提前combiner
  2. hive.groupby.skewindata = true参数开启(底层:先加随机数 之后 将随机数去掉 再聚合),hive默认不开启,hive版本过低,没有该参数优化,手动sql实现
sql
自动换行:关
放大阅读
展开代码
-- 统计中国每个地区的人口数量 -- 会出现数据倾斜的sql select region,count(1) from population group by region; -- skewindata手动实现 select region,sum(1) from ( select region,count(1) from population group by region,cast(rand() * 7 as int) ) tmp group by region;

join造成的数据倾斜

  1. 大表join小表
    mapjoin 参数 set hive.auto.convert.join=true,直接在map端出结果,避免shuffle
  2. 大表 join 大表
    • 首先会对数据进行一个采样,判断导致数据倾斜的key
    • hive.optimize.skewjoin=true; 自动对数据量大的key单独做Mapjoin,默认 10万个相同key进入一个reduce ,进行二次聚合
    • SMBjoin(sort merge bucket join),表进行分桶,每个桶内数据要求有序,hive并不检查两个join的表是否已经做好bucket且sorted,

需要用户自己去保证join的表数据sorted,否则可能数据不正确。

  • 不是分桶表处理。左表随机,右表随机且扩容
sql
自动换行:关
放大阅读
展开代码
-- 数据倾斜之扩容表和打散 -- order_detail(大表) order_info(小表),同样适用于大表 Join 大表时,大表和小表都都不满足SMBJoin 的场景 ECHO is off. ECHO is off. -- 直接大表join小表,可能JOB无法完成 ECHO is off. select od.order_detail_id, oi.order_id, oi.create_date, oi.order_id, total_amount from order_info oi join order_detail od on oi.order_id = od.order_id; ECHO is off. select od.order_detail_id, oi.order_id, oi.create_date, oi.order_id, total_amount from order_detail od -- 将较小的表order_info,增加字段new_id,并使用Union all 上下连接扩容后的表。右表随机且扩容 join (select order_id, concat(order_id, '-', 0) new_id, user_id, create_date, total_amount from order_info o ECHO is off. union all ECHO is off. select order_id, concat(order_id, '-', 1) new_id, user_id, create_date, total_amount from order_info o ECHO is off. union all ECHO is off. select order_id, concat(order_id, '-', 2) new_id, user_id, create_date, total_amount from order_info o ) oi -- 此时order_detail(大表)与小表order_info扩容后的新表的连接条件转化 -- 但需要保证通过扩容打散后join的最终结果一致,左表随机 on oi.new_id = concat(od.order_id, '-', `floor`(rand() * 3));

数据倾斜 具体场景

join 造成倾斜
dwd层页面浏览事务事实表
业务过程 度量值
页面浏览 一次,页面浏览持续时间,页面浏览所属的会话
ods层日志增量表(大表) left join ods层省份表(小表),

hive的优化

^9b4ed2

小文件问题

  1. 产生原因
  • reduce 数量过多(无法控制)
  • 动态分区
  1. 解决办法
  • 使用hadoop archive的功能对小文件进行归档
  • 参数开启JVM重用,提高map端的效率
  • 适用combineHiveInputFormat,读取256M作为一个切片
java
自动换行:关
放大阅读
展开代码
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  • 开启map或者reduce后合并文件功能。当输出的文件平均大小大于设置merge的阈值则读取小文件进行合
java
自动换行:关
放大阅读
展开代码
//设置map端输出进行合并,默认为true set hive.merge.mapfiles = true //设置reduce端输出进行合并,默认为false set hive.merge.mapredfiles = true //设置合并文件的大小 set hive.merge.size.per.task = 256*1000*1000 //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。 set hive.merge.smallfiles.avgsize=16000000

分区,分桶

  • 建立分区表避免全表扫描
  • 对于大表,合理建立分桶表,在分桶后,可以结合hive提供的抽样查询,只查询指定桶的数据

hive的数据抽样

  1. 数据块抽样,tablesample(n percent) 根据hive表数据的大小按比例抽取数据,获取表的大致数据分
sql
自动换行:关
放大阅读
展开代码
-- bigtale 大概有一百万条记录 select * from bigtable tablesample(bucket 1 out of 4 on id);

2. 分桶抽样,对于hive的分桶表来说,可以使用分桶抽样功能大表抽样,select * from 分桶表 tablesample(bucket x out of y on 分桶字段);

map预聚合,mapjoin,SMBJoin

  1. map端预聚合, 减少Shuffle数据量
  2. mapJoin,对于大表 join 小表,可以避免shuffle
  3. SMBJoin,对于大表join 大表,可以合理构建分桶表,减少数据扫描

合理选择文件存储格式

对于读多写少的场景,适用列式存储,比如orc,parquent,列式存储结构较行式存储结构更加紧凑,压缩后更小
orc 和 parquet格式选择?

  • orc 为hive而生,hive兼容性更好,但对嵌套的复杂数据结构不友好,比较耗CPU,内存等资源
  • parquet 开源的格式,兼容性更好

使用多引擎

  • mr 年月时间跨度大和 数据量大 的指标
  • spark 日常的天指标
  • tez 项目没用
sql
自动换行:关
放大阅读
展开代码
--不同的sql 跑不同的引擎 --准备好各个引擎的环境:上传jar包、修改配置..... --执行sql前使用hive的会话设置命令 set 配置执行引擎 set hive.execution.engine=mr; -- sql1 set hive.execution.engine=spark; -- sql2

spark比mr快的原因:

  • spark内存利用更多(存储级别:默认 memory_only)
  • spark有DAG(有向无环图),多个stage间有血缘关系,使得数据落盘更少(窄依赖时数据不会落盘)

sql语法优化

提前使用where进行行过滤
避免使用全字段查询

cbo优化

这个属性是默认开启的,它可以自动优化HQL中多个Join的顺序,并选择合适的Join算法

RBO和CBO
2.1 RBO 基于规则的优化,或者说是基于经验的优化,但往往sql执行的效果可能不会太理想
2.2 CBO 基于成本(cost)的优化,比如合理调整join的顺序

谓词(where 后条件)下推

将SQL语句中的where谓词逻辑都尽可能提前执行,减少下游处理的数据量

合理设置reduce的数量

reduce 数 = 输入reduce的数据量 / 每个reduce处理的默认数据量 256M

Fetch抓取

plaintext
自动换行:关
放大阅读
展开代码
Hive中对某些情况的查询可以不必使用MapReduce计算,比如一些简单的过滤和字段查询
sql
自动换行:关
放大阅读
展开代码
-- 开启以下参数 set hive.fetch.task.conversion = more -- 以下的sql均不走mr hive (default)> set hive.fetch.task.conversion=more; hive (default)> select * from emp; hive (default)> select ename from emp; hive (default)> select ename from emp limit 3;

explain 查看

sql
自动换行:关
放大阅读
展开代码
-- 1. 服务器启动 服务 python -m SimpleHTTPServer 端口 -- 2. 浏览器访问 端口 -- 3. 获取hsql的执行计划 explain formatted sql

大致效果:

常见语法汇总

grouping sets ,CUBES, ROLLUP GROUPING__ID

[hive官方有关文档说明(https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C+Grouping+and+Rollup)

  • grouping sets
sql
自动换行:关
放大阅读
展开代码
create temporary table dwd_learn_play_inc ( mid_id string comment '手机唯一编号', province_id string comment '省份id', brand string comment '手机品牌', is_new string comment '是否新用户', model string comment '手机型号', os string comment '手机品牌', session_id string comment '会话id', user_id string comment '用户id', version_code string comment '版本号', source_id string comment '数据来源', video_id string comment '视频id', video_name string comment '视频名称', chapter_id string comment '章节id', chapter_name string comment '章节名称', course_id string comment '课程id', course_name string comment '课程名称', play_sec bigint comment '播放时长', ts bigint comment '跳入时间', dt string ); alter table dwd_learn_play_inc set tblproperties ('comment' = '学习域播放事务事实表'); ## 等效写法====================================================================== select course_id,chapter_id,video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id,chapter_id,video_id grouping sets ((course_id,chapter_id,video_id), (course_id, chapter_id), (course_id)); select course_id,chapter_id,video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id,chapter_id,video_id union all select course_id,chapter_id, null video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id,chapter_id union all select course_id, null chapter_id, null video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id;

结果一致: image-20250802170623927

  • rollups
sql
自动换行:关
放大阅读
展开代码
select course_id,chapter_id,video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id,chapter_id,video_id with rollup ;

image-20250802170636199

  • cubes
sql
自动换行:关
放大阅读
展开代码
## GROUP BY a, b, c WITH CUBE is equivalent to ## GROUP BY a, b, c GROUPING SETS ( (a, b, c), (a, b), (b, c), (a, c), (a), (b), (c), ( )).
  • grouping__id
sql
自动换行:关
放大阅读
展开代码
drop table if exists sales_data; CREATE TABLE sales_data ( region STRING, product STRING, month STRING, sales_amount DOUBLE ); INSERT INTO sales_data VALUES -- ('North', 'ProductA', '2024-01', 1000), ('North', null, '2024-01', 1500); select *, row_number() over (order by region desc,product desc,month desc) from ( SELECT region, product, month, GROUPING__ID, SUM(sales_amount) AS total_sales FROM sales_data GROUP BY region, product, month with rollup) t1

image-20250802170654857

  • grouping__id 的计算逻辑
sql
自动换行:关
放大阅读
展开代码
0.插入数据 INSERT INTO sales_data VALUES('North', null, '2024-01', 1500); 1.维度上卷计算 group by region, product, month with rollup 2. 其中produce存在null的情况,三个维度排列组合 +--------+---------+-------+-------------+ | region | product | month | grouping_id | +--------+---------+-------+-------------+ | 0 | 0 | 0 | 0 | | 0 | 0 | 1 | 1 | | 0 | 1 | 1 | 3 | | 1 | 1 | 1 | 7 | +--------+---------+-------+-------------+ 3. 规律 如果该列参与维度统计,则置为有效1,否则为0;之后每种组合的2进制数转化为10进制 4. 可以使用grouping(字段名)的方式查看该位是否参与维度计算

tablesample(表采样和块采样)

hive文档tablesample介绍

sql
自动换行:关
放大阅读
展开代码
-- tablesmaple -- 相当于每行数据生成一个随机数,然后分成32bucket,取第三桶 SELECT * FROM source TABLESAMPLE(BUCKET 3 OUT OF 32 ON rand()) s; -- 设置采样种子参数,可以保证每次采样的结果一致 set hive.sample.seednumber=1; -- 按照province_id字段将表数据分成3bucket,取第一桶的数据 select * from dwd_learn_play_inc tablesample (bucket 1 out of 3 on province_id) ; -- blocksample -- 指定采样的行数 select * from dwd_learn_play_inc tablesample (10 rows) ; -- 指定采样的数据量,和hdfs的文件块大小有关,比如hdfs的文件块大小为128M,那么采样的数据量设置为100M,依旧会放回一个文件块(128M)大小的数据 select * from dwd_learn_play_inc tablesample (10M) ;

本文作者:hedeoer

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!