首页标签分类
实时数仓梳理
2024-06-03 · 更新 2026-03-03约 10 分钟 · 2606 字
大数据杂文记
000

目录

架构
介绍思路
01项目的背景和意义
02调研工作、
03架构设计和搭建
04数仓建模
数据域的划分
指标分析
构建业务总线矩阵(***)
dim层(雪花模型)
dwd层
dws层
05数仓各层的工作
ods
dim
dwd
dws
数据的去重(upsertkafka造成的问题)
维度信息的补充
选用clickhouse的原因
写数据到clickhouse
06项目中遇到的问题
业务理解参差导致的flink 反压
flink数据一致性问题
新老客户纠正造成的大状态问题
clickhouse too many parts异常
07对实时数仓的思考
08实时数仓的监控工具

架构

介绍思路

  1. 项目的背景和意义
  2. 前期的调研
  3. 实时架构的设计和搭建
  4. 数仓建模
  5. 数仓每层的工作
  6. 项目遇到的问题
  7. 自己对于数仓的思考和优化点
  8. 实时数仓的监控工具

01项目的背景和意义

随着公司对于指标的时效性要求越来越高,永辉离线分析平台的不足以支撑公司的需求,随着数据技术的发展,实时指标需求只会越来越多;并且在搭建离线分析平台的时候,后期实时平台也是考虑在内的,符合预期的发展

02调研工作、

  1. 业务调研
  2. 数据调研
  3. 需求调研

03架构设计和搭建

  1. 采集搭建
    复用了离线数仓的采集,数据来源有业务数据和日志数据
  2. 架构

04数仓建模

数据域的划分

  1. 交易域,用户域,流量域,交易域,工具域,互动域
    ...

指标分析

  1. 原子指标
  2. 派生指标
  3. 衍生指标

构建业务总线矩阵(***)

dim层(雪花模型)

用户维度,商品,优惠券,活动,时间,来源

  1. 主维表,相关维表
  2. 维度尽可能丰富
  3. 多值维度
  4. 多值属性
  5. 维度退化

dwd层

  1. 选择需要的业务过程
  2. 确定事实粒度
  3. 确定维度
  4. 确定事实
  5. 事实表分类
    • 事务型事实表
    • 周期快照事实表(实时项目中没有周期型的指标,没做)
    • 累积型快照事实表(累计型事实表涉及多个事实,需要Join来更新维度,比较麻烦,项目中没做)

dws层

把业务过程,和统计粒度相同的派生指标放在同一张dws表中 讲述各层时,使用如下的实例:

plaintext
自动换行:关
放大阅读
展开代码
1. 各个品牌的活动补贴率,需要每2小时更新一次 dwd 下单事实表 粒度(order_detail_id, user_id, order_id,sku_id),维度( 品牌,省份,地区等等维度),度量值(下单金额,活动减免金额) 2. 活动补贴率 = 活动优惠金额 / 品牌的订单总额 ==》 衍生指标 3. dws 每2小时各个品牌的活动优惠金额 ==》 派生指标 每2小时各个品牌的订单总额 ==》 派生指标 4. 将表转为流,写出到ck 可视化平台设置 每2小时发送一次数据请求,springboot接口这边接受请求,并拼接sql向ck查询,最后返回数据给sugar即可

05数仓各层的工作

ods

只做了数据采集,在kafka上形成两个topic,一个存储日志数据,一个存储业务数据,日志数据由flume采集,业务数据由maxwell采集(使用业务表名作为业务数据的key,写入kafka,保障每张表数据有序并处于同一个分区)

dim

常见的维度由,用户维度,商品,优惠券,活动 ,日期,省份,地区等 维度数据在业务数据topic中,需要根据不同的维度进行分离,并写入hbase

  1. 维度数据存储方案的选择
    • 第一种方案:使用kafka存储,kafak不支持单点的查询,明显是不可以的
    • 第二种方案:使用flink的状态存存,在算子的open方法中每次都将维度数据预加载到状态缓存起来,这样的话,首先是资源消耗是很多的,其次的话存储全量的维度数据,读取的时候时效性可能不太好
    • 第三种方案:使用外部系统热存储的方式,mysql,redis,hbase
  2. 维度数据的分流写入
    • flinkcdc动态捕捉配置表的变化
    • 广播流和connect为每条数据添加配置
    • 程序第一次启动带来的冷启动问题
    • 使用phoenix on hbase 写入hbase
  3. phoenix盐表的建立
    用户维度表,商品维度表使用了加盐表,需要在创建phoenix表时指定表主键(sku_id/ user_id),和分桶数
  4. 根据maxwell同步过来的数据类型(insert,update,delete),在hbase中删除表(delete),建立表(insert),删表后再建表(update)
  5. 写入数据到hbase,使用phoenix的upsert语句

dwd

主要的事实表的话,有加购,下单,取消订单,支付,退单等事实,需要将这些事实从ods层中过滤,出来并写入到kafka中特定主题中

  1. 数据加载
    从ods中过滤数据 --》 形成Flink table --》 注册 临时表--》建立关联表和dwd层kafka的topci关联
    --> 写数据到关联表 --》 即向kafka特定topic写数据
  2. 一些事实表的动态分流(将ods层的log数据分流成不同的Map)
    不可能每个事实表都需要一个主程序,使用了同dim层类似的分流机制,
    ①在外部配置表中定义需要建立的事实表,使用flinkcdc读取,形成配置流,并进行广播
    ②connect来自ods的数据流,为每条数据加上配置,之后写入kafka
  3. ods层log日志的事实分流
java
自动换行:关
放大阅读
展开代码
// ods_log的简单清洗 过滤出json格式 SingleOutputStreamOperator<JSONObject> etledStream = etl(stream); ECHO is off. // 新老用户数据的纠错,修改 is_new 字段 SingleOutputStreamOperator<JSONObject> validatedStream = validateOldAndNew(etledStream); // 将用户日志数据分流成不同的5中数据,使用测流输出 // Map<String, DataStream<JSONObject>> /* DataStream<JSONObject> displayStream = splitedStream.getSideOutput(displayTag); DataStream<JSONObject> actionStream = splitedStream.getSideOutput(actionTag); DataStream<JSONObject> errStream = splitedStream.getSideOutput(errTag); DataStream<JSONObject> pageStream = splitedStream.getSideOutput(pageTag); storeSplitSteam.put(DISPLAY,displayStream); storeSplitSteam.put(ACTION,actionStream); storeSplitSteam.put(ERR,errStream); storeSplitSteam.put(PAGE,pageStream); storeSplitSteam.put(START,splitedStream); */ Map<String, DataStream<JSONObject>> splitedStream = splitStream(validatedStream); ECHO is off. // 数据写出到kafka // 获取map中的不同测流,并写入到kafka writeToKafka(splitedStream);
  1. 日志数据中新老客户的纠正
  2. 事实表中不可避免的使用join来进行一些维度的整合
    • api window join (不能限制了窗口的范围,迟到数据join不上),interval join(只能拿到join上的数据,join上才会有输出)。
    • sql regular join(需要设置ttl) , temporary join,lookup join(进行事实表维度退化时使用,把维度数据缓存在状态中,每次都取缓存中最新的数据),等等。
    • 比如在下单事务型事实表构建时,需要考虑维度退化,把订单来源退还到事实表中,就用了lookup join
    • 比如在在构建这个支付成功事务事实表时,需要将订单明细,订单表,订单优惠券使用表,订单活动券参与表进行join,支付事实join 订单表需要考虑网络的延时(5seconds)和业务上的延时(15mins),使用了interval join;而订单表join优惠券使用表的话,使用regular join 为left join,但写入kafka时需要考虑数据更新的问题。
    • 比如,flink sql写数据到kafka提供了两种类型的kafka连接器,一种是普通的kafka连接器,一种是upsertkafka连接器,区别在于后者用于处理存在数据更新时的kafka写入,同时需要指定每条数据的主键(order_detail_id),此时删除的数据,在kafka中key为order_detail_id,value为数据,并且为null,当数据更新,会出现更新前和更新后相同id,不容value的kafak消息,这样的话,dws在读取dwd的订单明细的话会根据需要去重。

dws

存储轻度汇总的数据,比如现有一指标为每个2小时统计各个品牌的活动补贴率,那么dws就可以建立各个品牌,sku粒度的5秒滚动汇总表,记录每5秒各个品牌下的sku的订单总额和活动优惠金额,此外还可以加入商品维度的其他属性,使用dws表的复用

数据的去重(upsertkafka造成的问题)

读取dwd的下单事实表时,下单事实数据加载时使用了left join,当时使用的是upsertkafka写入kafka,就会出现删除数据和这个更新的数据,那么使用clickhouse时,却使用的是普通的kafkasource,此时问题:

  1. kafka中删除的数据如何处理?
    自定kafka的反序列化器,过滤掉标识为null的删除数据,否则会出现空指针
  2. 更新的订单数据如何处理?
    项目使用的抵消的方式,处理update时,产生的重复数据
  • 按照order_detail_id分组
  • 使用valueState记录前一个相同订单明细的数据,具有相同order_detail_id的数据进入相同process
  • 抵消订单金额,优惠减免金额等,并直接collect输出,失效性最高
  • clickhouse使用replacingmergertree,具有去重功能,实现最终的去重

维度信息的补充

  1. 热缓存redis
    • 选择redis的原因,对比flink的状态
    • 选用的redis的数据类型,对比redis各个数据类型的特点
    • 使用redis的缓存更新问题
  2. flink异步编程方式实现维度数据读取
    由于是与flink之外的外部系统hbase交互,导致flink的算子会处于阻塞状态,等待外部系统给予响应,这样就降低了flink程序的吞吐能力,因此flink的提供的异步编程的算子,这些算子封装了线程池,和回调函数的功能,用户仅需实现特点的接口即可。
    比如dws层需要读取hbase的维度数据进行补充维度,那么需要与hbase频繁的交汇,就需要异步的i/o

选用clickhouse的原因

  1. 列式存储,适合按列查询相比行式存储更快
  2. 向量化的查询,并且支持语句级别的多线程查询
  3. 不依赖hadoop的生态,自己管资源,减少了资源申请带来的一些开销
  4. 稀疏索引和二级索引,很适合构建数亿行的大表存储查询
  5. 并发性不高,参数配置:并发读写量100 ——》 300,使用攒批写入clickhouse

写数据到clickhouse

flink官方目前没有提供clickhouse 的连接器,可以使用Flink提供了JDBC连接器写入数据到clickhose

java
自动换行:关
放大阅读
展开代码
JdbcSink.sink(querySql, new JdbcStatementBuilder<T>() { @Override public void accept(PreparedStatement ps, T t) throws SQLException { ECHO is off. // 为占位符赋值 try { Field[] fields = bean.getDeclaredFields(); // 变量k保证每个需要sink的field准确对应赋值 for (int i = 0, k = 1; i < fields.length; i++) { Field field = fields[i]; ECHO is off. if (field.getAnnotation(NoSink.class) == null) { field.setAccessible(true); Object columnValue = field.get(t); ps.setObject(k++,columnValue); } } } catch (IllegalAccessException e) { throw new RuntimeException(e); } } }, new JdbcExecutionOptions.Builder() .withBatchIntervalMs(5000) .withBatchSize(1024 * 1024) .withMaxRetries(3) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withPassword(password) .withUsername(userName) .withUrl(url) .withDriverName(driver) .build() );

06项目中遇到的问题

省略

flink数据一致性问题

省略

新老客户纠正造成的大状态问题

省略

clickhouse too many parts异常

  1. 原因:clickhouse每次写入数据都会产生一个临时分区,项目中使用的mergetree引擎,之后,会对临时分区和主分区做一个merge,当短时间内多次写入一条或者少量的数据,就会产生大量的临时分区,就会会merge和查询造成压力,因此clickhouse内部对data parts的做了许多的限制
  2. 解决方法
    写入clickhouse的时候使用攒批次写入,使用jdbcsink实现攒批次,每1M一个批次,批次间隔的设置为5seconds

07对实时数仓的思考

  1. 当flink写数据到kafka时,可以考虑进行flink的双写,比如写入ods层,同时写一份数据到外部的持久化设备,这样数据容错就高了
  2. 业务数据的入仓的话,目前是复用了离线数仓的采集链路,比如业务数据使用maxwell实时采集到kafka,
  3. 我们的实时数仓没有进行数据的持久化(不主动提)
    kafka的数据时效性只有3天,假设现在需要7天以前的数据,就要进行数据的重放,日志数据是以文件保留,可以重新读取,但业务数据如何根据mysql的binlog来回放数据的话,那就太麻烦了,

08实时数仓的监控工具

grafana + prometheus

本文作者:hedeoer

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!