hive 的客户端
元数据服务
默认derby,derby不与其他客户端共享数据。所以一次只能有一个客户端在使用,如果开了另一个客户端就会连接不上。故选用其他的数据库提供元数据存储,比如Mysql
driver(将Hsql转化为MR程序并执行) 整体上使用Antlr的分析器,将SQL转化为job
内部表和外部表比较:
distribute by cast(rand() * 100 as int)sql自动换行:关放大阅读展开代码select last_day('2023-01-03'); -- 2023-01-31 select last_day('2023-01-03','MO'); -- 2023-01-09 select unix_timestamp('2023-01-03','yyyy-MM-dd'); -- 1672704000 秒 -- from_utc_timestamp(ms ,regexp) select date_format(from_utc_timestamp(1672704000000,'Asia/Shanghai'),'yyyy-MM-dd'); -- 2023-01-03
java自动换行:关放大阅读展开代码import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; import java.text.DecimalFormat; public class IntToPrecent extends GenericUDF { // 处理逻辑 public String intToPrecent(String i1, String i2){ int i = Integer.parseInt(i1); int j = Integer.parseInt(i2); double result = (double)i/j; DecimalFormat df = new DecimalFormat("0%"); return df.format(result); } public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException { // 判断传入的参数个数 if(objectInspectors.length != 2){ throw new UDFArgumentLengthException("Input Args Length Error !!!"); } // 判断传入参数的类型 if (!objectInspectors[0].getCategory().equals(ObjectInspector.Category.PRIMITIVE) || !PrimitiveObjectInspector.PrimitiveCategory.INT.equals(((PrimitiveObjectInspector)objectInspectors[0]).getPrimitiveCategory())){ throw new UDFArgumentException("函数第一个参数为int类型"); // 当自定义UDF参数与预期不符时,抛出异常 } if (!objectInspectors[1].getCategory().equals(ObjectInspector.Category.PRIMITIVE) || !PrimitiveObjectInspector.PrimitiveCategory.INT.equals(((PrimitiveObjectInspector)objectInspectors[1]).getPrimitiveCategory())){ throw new UDFArgumentException("函数第二个参数为int类型"); } return PrimitiveObjectInspectorFactory.javaStringObjectInspector; } public String evaluate(DeferredObject[] deferredObjects) throws HiveException { String num1 = deferredObjects[0].get().toString(); String num2 = deferredObjects[1].get().toString(); return intToPrecent(num1,num2); } public String getDisplayString(String[] strings) { // 生成HQL explain子句中显示的日志 return strings[0]; } }
sql自动换行:关放大阅读展开代码-- hdfs的路径不存在需要提前创建 /user/hive/warehouse/udf/ hadoop fs -mkdir /user/hive/warehouse/udf/ -- 上传jar至hdfs hadoop fs -put hivedemo-1.0-SNAPSHOT.jar /user/hive/warehouse/udf/ -- 永久注册的方式(在那个库中注册,在那个库中使用) ADD JAR hdfs:///user/hive/warehouse/udf/hivedemo-1.0-SNAPSHOT.jar; -- hive中注册自定义的函数 create function myfunc as "cn.hedeoer.udf.IntToPrecent" using jar "hdfs:///user/hive/warehouse/hivedemo-1.0-SNAPSHOT.jar"; -- 使用自定义的函数 select myfunc('50','100'); -- 50%
udf 函数的使用范围:
sql自动换行:关放大阅读展开代码Temporary Functions 会话级别 CREATE TEMPORARY FUNCTION function_name AS class_name; permanent functions 全局使用 CREATE FUNCTION [db_name.]function_name AS class_name [USING JAR|FILE|ARCHIVE 'file_uri' [, JAR|FILE|ARCHIVE 'file_uri'] ];
plaintext自动换行:关放大阅读展开代码MySQL: A id name age 1 z\r\ns 18 HDFS: 1\tz\ts\t18 ECHO is off. 1\tz s\t18 Hive: id name age 1 zs 18 \ 1 z
hive默认列分隔符: \001 \u0001
已经出现了,怎么解决的:
用datax来采集mysql
配置文件可以指定 transformer 去掉字段里的特殊字符,比如 回车换行\r\n
sql自动换行:关放大阅读展开代码-- 统计中国每个地区的人口数量 -- 会出现数据倾斜的sql select region,count(1) from population group by region; -- skewindata手动实现 select region,sum(1) from ( select region,count(1) from population group by region,cast(rand() * 7 as int) ) tmp group by region;
需要用户自己去保证join的表数据sorted,否则可能数据不正确。
- 不是分桶表处理。左表随机,右表随机且扩容
sql自动换行:关放大阅读展开代码-- 数据倾斜之扩容表和打散 -- order_detail(大表) order_info(小表),同样适用于大表 Join 大表时,大表和小表都都不满足SMBJoin 的场景 ECHO is off. ECHO is off. -- 直接大表join小表,可能JOB无法完成 ECHO is off. select od.order_detail_id, oi.order_id, oi.create_date, oi.order_id, total_amount from order_info oi join order_detail od on oi.order_id = od.order_id; ECHO is off. select od.order_detail_id, oi.order_id, oi.create_date, oi.order_id, total_amount from order_detail od -- 将较小的表order_info,增加字段new_id,并使用Union all 上下连接扩容后的表。右表随机且扩容 join (select order_id, concat(order_id, '-', 0) new_id, user_id, create_date, total_amount from order_info o ECHO is off. union all ECHO is off. select order_id, concat(order_id, '-', 1) new_id, user_id, create_date, total_amount from order_info o ECHO is off. union all ECHO is off. select order_id, concat(order_id, '-', 2) new_id, user_id, create_date, total_amount from order_info o ) oi -- 此时order_detail(大表)与小表order_info扩容后的新表的连接条件转化 -- 但需要保证通过扩容打散后join的最终结果一致,左表随机 on oi.new_id = concat(od.order_id, '-', `floor`(rand() * 3));
join 造成倾斜
dwd层页面浏览事务事实表
业务过程 度量值
页面浏览 一次,页面浏览持续时间,页面浏览所属的会话
ods层日志增量表(大表) left join ods层省份表(小表),
^9b4ed2
java自动换行:关放大阅读展开代码set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
java自动换行:关放大阅读展开代码//设置map端输出进行合并,默认为true set hive.merge.mapfiles = true //设置reduce端输出进行合并,默认为false set hive.merge.mapredfiles = true //设置合并文件的大小 set hive.merge.size.per.task = 256*1000*1000 //当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。 set hive.merge.smallfiles.avgsize=16000000
sql自动换行:关放大阅读展开代码-- bigtale 大概有一百万条记录 select * from bigtable tablesample(bucket 1 out of 4 on id);
2. 分桶抽样,对于hive的分桶表来说,可以使用分桶抽样功能大表抽样,
select * from 分桶表 tablesample(bucket x out of y on 分桶字段);
对于读多写少的场景,适用列式存储,比如orc,parquent,列式存储结构较行式存储结构更加紧凑,压缩后更小
orc 和 parquet格式选择?
sql自动换行:关放大阅读展开代码--不同的sql 跑不同的引擎 --准备好各个引擎的环境:上传jar包、修改配置..... --执行sql前使用hive的会话设置命令 set 配置执行引擎 set hive.execution.engine=mr; -- sql1 set hive.execution.engine=spark; -- sql2
spark比mr快的原因:
提前使用where进行行过滤
避免使用全字段查询
这个属性是默认开启的,它可以自动优化HQL中多个Join的顺序,并选择合适的Join算法
RBO和CBO
2.1 RBO 基于规则的优化,或者说是基于经验的优化,但往往sql执行的效果可能不会太理想
2.2 CBO 基于成本(cost)的优化,比如合理调整join的顺序
将SQL语句中的where谓词逻辑都尽可能提前执行,减少下游处理的数据量
reduce 数 = 输入reduce的数据量 / 每个reduce处理的默认数据量 256M
plaintext自动换行:关放大阅读展开代码Hive中对某些情况的查询可以不必使用MapReduce计算,比如一些简单的过滤和字段查询
sql自动换行:关放大阅读展开代码-- 开启以下参数 set hive.fetch.task.conversion = more -- 以下的sql均不走mr hive (default)> set hive.fetch.task.conversion=more; hive (default)> select * from emp; hive (default)> select ename from emp; hive (default)> select ename from emp limit 3;
sql自动换行:关放大阅读展开代码-- 1. 服务器启动 服务 python -m SimpleHTTPServer 端口 -- 2. 浏览器访问 端口 -- 3. 获取hsql的执行计划 explain formatted sql;
大致效果:
[hive官方有关文档说明(https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C+Grouping+and+Rollup)
sql自动换行:关放大阅读展开代码create temporary table dwd_learn_play_inc ( mid_id string comment '手机唯一编号', province_id string comment '省份id', brand string comment '手机品牌', is_new string comment '是否新用户', model string comment '手机型号', os string comment '手机品牌', session_id string comment '会话id', user_id string comment '用户id', version_code string comment '版本号', source_id string comment '数据来源', video_id string comment '视频id', video_name string comment '视频名称', chapter_id string comment '章节id', chapter_name string comment '章节名称', course_id string comment '课程id', course_name string comment '课程名称', play_sec bigint comment '播放时长', ts bigint comment '跳入时间', dt string ); alter table dwd_learn_play_inc set tblproperties ('comment' = '学习域播放事务事实表'); ## 等效写法====================================================================== select course_id,chapter_id,video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id,chapter_id,video_id grouping sets ((course_id,chapter_id,video_id), (course_id, chapter_id), (course_id)); select course_id,chapter_id,video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id,chapter_id,video_id union all select course_id,chapter_id, null video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id,chapter_id union all select course_id, null chapter_id, null video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id;
结果一致:
sql自动换行:关放大阅读展开代码select course_id,chapter_id,video_id, sum(play_sec) play_sec_sum from dwd_learn_play_inc where dt = '2022-02-21' and course_id = '126' group by course_id,chapter_id,video_id with rollup ;
sql自动换行:关放大阅读展开代码## GROUP BY a, b, c WITH CUBE is equivalent to ## GROUP BY a, b, c GROUPING SETS ( (a, b, c), (a, b), (b, c), (a, c), (a), (b), (c), ( )).
sql自动换行:关放大阅读展开代码drop table if exists sales_data; CREATE TABLE sales_data ( region STRING, product STRING, month STRING, sales_amount DOUBLE ); INSERT INTO sales_data VALUES -- ('North', 'ProductA', '2024-01', 1000), ('North', null, '2024-01', 1500); select *, row_number() over (order by region desc,product desc,month desc) from ( SELECT region, product, month, GROUPING__ID, SUM(sales_amount) AS total_sales FROM sales_data GROUP BY region, product, month with rollup) t1
sql自动换行:关放大阅读展开代码0.插入数据 INSERT INTO sales_data VALUES('North', null, '2024-01', 1500); 1.维度上卷计算 group by region, product, month with rollup 2. 其中produce存在null的情况,三个维度排列组合 +--------+---------+-------+-------------+ | region | product | month | grouping_id | +--------+---------+-------+-------------+ | 0 | 0 | 0 | 0 | | 0 | 0 | 1 | 1 | | 0 | 1 | 1 | 3 | | 1 | 1 | 1 | 7 | +--------+---------+-------+-------------+ 3. 规律 如果该列参与维度统计,则置为有效1,否则为0;之后每种组合的2进制数转化为10进制 4. 可以使用grouping(字段名)的方式查看该位是否参与维度计算
sql自动换行:关放大阅读展开代码-- tablesmaple -- 相当于每行数据生成一个随机数,然后分成32bucket,取第三桶 SELECT * FROM source TABLESAMPLE(BUCKET 3 OUT OF 32 ON rand()) s; -- 设置采样种子参数,可以保证每次采样的结果一致 set hive.sample.seednumber=1; -- 按照province_id字段将表数据分成3bucket,取第一桶的数据 select * from dwd_learn_play_inc tablesample (bucket 1 out of 3 on province_id) ; -- blocksample -- 指定采样的行数 select * from dwd_learn_play_inc tablesample (10 rows) ; -- 指定采样的数据量,和hdfs的文件块大小有关,比如hdfs的文件块大小为128M,那么采样的数据量设置为100M,依旧会放回一个文件块(128M)大小的数据 select * from dwd_learn_play_inc tablesample (10M) ;
本文作者:hedeoer
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!