- 项目的背景和意义
- 前期的调研
- 实时架构的设计和搭建
- 数仓建模
- 数仓每层的工作
- 项目遇到的问题
- 自己对于数仓的思考和优化点
- 实时数仓的监控工具
随着公司对于指标的时效性要求越来越高,永辉离线分析平台的不足以支撑公司的需求,随着数据技术的发展,实时指标需求只会越来越多;并且在搭建离线分析平台的时候,后期实时平台也是考虑在内的,符合预期的发展
用户维度,商品,优惠券,活动,时间,来源
把业务过程,和统计粒度相同的派生指标放在同一张dws表中 讲述各层时,使用如下的实例:
plaintext自动换行:关放大阅读展开代码1. 各个品牌的活动补贴率,需要每2小时更新一次 dwd 下单事实表 粒度(order_detail_id, user_id, order_id,sku_id),维度( 品牌,省份,地区等等维度),度量值(下单金额,活动减免金额) 2. 活动补贴率 = 活动优惠金额 / 品牌的订单总额 ==》 衍生指标 3. dws 每2小时各个品牌的活动优惠金额 ==》 派生指标 每2小时各个品牌的订单总额 ==》 派生指标 4. 将表转为流,写出到ck 可视化平台设置 每2小时发送一次数据请求,springboot接口这边接受请求,并拼接sql向ck查询,最后返回数据给sugar即可
只做了数据采集,在kafka上形成两个topic,一个存储日志数据,一个存储业务数据,日志数据由flume采集,业务数据由maxwell采集(使用业务表名作为业务数据的key,写入kafka,保障每张表数据有序并处于同一个分区)
常见的维度由,用户维度,商品,优惠券,活动 ,日期,省份,地区等 维度数据在业务数据topic中,需要根据不同的维度进行分离,并写入hbase
主要的事实表的话,有加购,下单,取消订单,支付,退单等事实,需要将这些事实从ods层中过滤,出来并写入到kafka中特定主题中
临时表--》建立关联表和dwd层kafka的topci关联java自动换行:关放大阅读展开代码// ods_log的简单清洗 过滤出json格式 SingleOutputStreamOperator<JSONObject> etledStream = etl(stream); ECHO is off. // 新老用户数据的纠错,修改 is_new 字段 SingleOutputStreamOperator<JSONObject> validatedStream = validateOldAndNew(etledStream); // 将用户日志数据分流成不同的5中数据,使用测流输出 // Map<String, DataStream<JSONObject>> /* DataStream<JSONObject> displayStream = splitedStream.getSideOutput(displayTag); DataStream<JSONObject> actionStream = splitedStream.getSideOutput(actionTag); DataStream<JSONObject> errStream = splitedStream.getSideOutput(errTag); DataStream<JSONObject> pageStream = splitedStream.getSideOutput(pageTag); storeSplitSteam.put(DISPLAY,displayStream); storeSplitSteam.put(ACTION,actionStream); storeSplitSteam.put(ERR,errStream); storeSplitSteam.put(PAGE,pageStream); storeSplitSteam.put(START,splitedStream); */ Map<String, DataStream<JSONObject>> splitedStream = splitStream(validatedStream); ECHO is off. // 数据写出到kafka // 获取map中的不同测流,并写入到kafka writeToKafka(splitedStream);
下单事务型事实表构建时,需要考虑维度退化,把订单来源退还到事实表中,就用了lookup join。支付成功事务事实表时,需要将订单明细,订单表,订单优惠券使用表,订单活动券参与表进行join,支付事实join 订单表需要考虑网络的延时(5seconds)和业务上的延时(15mins),使用了interval join;而订单表join优惠券使用表的话,使用regular join 为left join,但写入kafka时需要考虑数据更新的问题。普通的kafka连接器,一种是upsertkafka连接器,区别在于后者用于处理存在数据更新时的kafka写入,同时需要指定每条数据的主键(order_detail_id),此时删除的数据,在kafka中key为order_detail_id,value为数据,并且为null,当数据更新,会出现更新前和更新后相同id,不容value的kafak消息,这样的话,dws在读取dwd的订单明细的话会根据需要去重。存储轻度汇总的数据,比如现有一指标为每个2小时统计各个品牌的活动补贴率,那么dws就可以建立各个品牌,sku粒度的5秒滚动汇总表,记录每5秒各个品牌下的sku的订单总额和活动优惠金额,此外还可以加入商品维度的其他属性,使用dws表的复用
读取dwd的下单事实表时,下单事实数据加载时使用了left join,当时使用的是upsertkafka写入kafka,就会出现删除数据和这个更新的数据,那么使用clickhouse时,却使用的是普通的kafkasource,此时问题:
flink官方目前没有提供clickhouse 的连接器,可以使用Flink提供了JDBC连接器写入数据到clickhose
java自动换行:关放大阅读展开代码JdbcSink.sink(querySql, new JdbcStatementBuilder<T>() { @Override public void accept(PreparedStatement ps, T t) throws SQLException { ECHO is off. // 为占位符赋值 try { Field[] fields = bean.getDeclaredFields(); // 变量k保证每个需要sink的field准确对应赋值 for (int i = 0, k = 1; i < fields.length; i++) { Field field = fields[i]; ECHO is off. if (field.getAnnotation(NoSink.class) == null) { field.setAccessible(true); Object columnValue = field.get(t); ps.setObject(k++,columnValue); } } } catch (IllegalAccessException e) { throw new RuntimeException(e); } } }, new JdbcExecutionOptions.Builder() .withBatchIntervalMs(5000) .withBatchSize(1024 * 1024) .withMaxRetries(3) .build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withPassword(password) .withUsername(userName) .withUrl(url) .withDriverName(driver) .build() );
省略
省略
省略
grafana + prometheus
本文作者:hedeoer
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!