数仓架构图

[[数仓分层架构图]]

[[数仓整体架构图]]

Binlog merge原理

原理:

将新增binlog和ods中涉及到的时间分区的数据取出来做union all操作,开窗,Partition by主键,Order by 版本号 desc,Order by binlog_type desc,过滤掉delete数据,并取出每个主键最新的一条数据作为最终结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
select id,batchnumber,batchtype,ctime,utime,ver,dt 
from (select binlog_type,id,batchnumber,batchtype,ctime,utime,ver,dt,row_number() over(partition by id order by ver desc,binlog_type desc) rn
from (
select case when get_json_object(content,'$.type') = 'insert' or get_json_object(content,'$.type') = 'bootstrap-insert' then 1
when get_json_object(content,'$.type') = 'update' then 2
when get_json_object(content,'$.type') = 'delete' then 3
end as binlog_type,
cast(get_json_object(get_json_object(content,'$.data'),'$.id') as bigint) id,
cast(get_json_object(get_json_object(content,'$.data'),'$.batchnumber') as string) batchnumber,
cast(get_json_object(get_json_object(content,'$.data'),'$.batchtype') as string) batchtype,
case when get_json_object(content,'$.type') != 'bootstrap-insert'
then from_unixtime(unix_timestamp(get_json_object(get_json_object(content,'$.data'),'$.ctime'))+28800,'yyyy-MM-dd HH:mm:ss')
else get_json_object(get_json_object(content,'$.data'),'$.ctime')
end as ctime,
case when get_json_object(content,'$.type') != 'bootstrap-insert'
then from_unixtime(unix_timestamp(get_json_object(get_json_object(content,'$.data'),'$.utime'))+28800,'yyyy-MM-dd HH:mm:ss')
else get_json_object(get_json_object(content,'$.data'),'$.utime')
end as utime,
cast(get_json_object(get_json_object(content,'$.data'),'$.ver') as int) ver,
substr(case when get_json_object(content,'$.type') != 'bootstrap-insert'
then from_unixtime(unix_timestamp(get_json_object(get_json_object(content,'$.data'),'$.ctime'))+28800,'yyyy-MM-dd HH:mm:ss')
else get_json_object(get_json_object(content,'$.data'),'$.ctime')
end,1,10) dt
from ods_binlog.ods_binlog_basiccomment_avatar_commentbatchsource_di
where dt >= date_add(current_date,-1)

and get_json_object(content,'$.type') != 'bootstrap-insert'
and from_unixtime(cast(get_json_object(content,'$.ts') as bigint),'yyyy-MM-dd HH:mm:ss') >= date_add(current_date,-1)
and from_unixtime(cast(get_json_object(content,'$.ts') as bigint),'yyyy-MM-dd HH:mm:ss') <= concat(date_add(current_date,0),' 00:00:00')

union all
select 1 binlog_type,
id,batchnumber,batchtype,ctime,utime,ver,dt
from ods.ods_basiccomment_avatar_commentbatchsource_dic
where dt in (
select distinct substr(case when get_json_object(content,'$.type') != 'bootstrap-insert'
then from_unixtime(unix_timestamp(get_json_object(get_json_object(content,'$.data'),'$.ctime'))+28800,'yyyy-MM-dd HH:mm:ss')
else get_json_object(get_json_object(content,'$.data'),'$.ctime')
end,1,10) dt
from ods_binlog.ods_binlog_basiccomment_avatar_commentbatchsource_di t1
where dt >= date_add(current_date,-1)
and get_json_object(content,'$.type') != 'bootstrap-insert'
and from_unixtime(cast(get_json_object(content,'$.ts') as bigint),'yyyy-MM-dd HH:mm:ss') >= date_add(current_date,-1)
and from_unixtime(cast(get_json_object(content,'$.ts') as bigint),'yyyy-MM-dd HH:mm:ss') <= concat(date_add(current_date,0),' 00:00:00')

)
) t
) tt
where tt.rn = 1 and tt.binlog_type != 3

遇到的问题

数据倾斜

order by utime数据错误

同一秒内更新多次,无法确定哪个是最终状态。改为order by ver

数据更新跨分区多

ods不分区或者按其他字段分区

ctime变更

如果old里面ctime不为空,取old;
或者扫ods全表

为什么不用datax每天把每个表的全量导过来?

DataX 通过直连 MySQL 批量拉取数据,存在以下问题:

1)性能瓶颈:随着业务规模的增长,离线批量拉取的数据规模越来越大(因为每次都是全量拉取),影响 mysql-hive 镜像表的产出时间,进而影响数仓下游任务。
对于一些需要 mysql-hive 小时级镜像的场景更加捉襟见肘。

2)影响线上业务:离线批量拉取数据,可能引起慢查询,影响业务库的线上服务。

3)无法保证幂等:由于线上库在实时更新,在批量拉取 SQL 不变的情况下,每次执行可能产生不一样的结果。比如指定了 create_time 范围,但一批记录的部分字段(比如支付状态)时刻在变化。也即无法产出一个明确的 mysql-hive 镜像 , 对于一些对时点要求非常高的场景(比如离线对账) 无法接受。

4)缺乏对 DELETE 的支持:业务库做了 DELETE 操作后,只有整表全量拉取,才能在 Hive 镜像里体现。

有些需求要统计当月的数据,如果数据库时物理删除,就要记录delete操作

5)有些需求想统计mysql中某个字段在一天之中的变化,只能通过binlog来获取

常用的字符串函数

length、concat、substr、upper、lower、trim、regexp_replace、get_json_object

复杂的需求

当月首次下单金额、当月末次下单金额

两次row_number

取中间状态

根据binlog

连续3天登录

两种方法

数据监控

延迟监控

在读取binlog时记录每个表的最大utime,如果过了今天的0点,就写入mysql。

merge之前启动一个shell脚本,判断有多少表今天的binlog已经落盘,如果超过阈值就可以开始merge。

字段监控

mysql记录目前所有同步binlog的mysql表的表结构。

启动一个定时脚本检测当前ops接口中的字段与mysql里存的表结构是否一致,
如果有新增字段,就执行
ALTER TABLE [tableName] ADD COLUMNS ([columnName] [type]) cascade;

casecade用法:

实际应用中,常常存在修改数据表结构的需求,比如:增加一个新字段。

如果不使用casecade新增列,可以成功添加列。但如果数据表已经有旧的分区(例如:dt=20190101),则该旧分区中的col1将为空且无法更新,即便insert overwrite该分区也不会生效。

cascade的中文翻译为“级联”,也就是不仅变更新分区的表结构(metadata),同时也变更旧分区的表结构。

特殊值监控

比如某些字段每天都会新增,某些字段不能为空,可以通过sql脚本进行校验

表更新监控

监控表的最大更新时间,如果一段时间都没发生变化,有可能业务已经换表或者迁库,要及时进行调整,这个功能要配合白名单使用。

明文检测

binlog采集环节对照敏感字典对数据做实时明文检测,发现明文数据后对其进行-1或者md5处理,并及时通知DBA,避免敏感数据流入数仓。

维度退化

有些维度过于简单而不值得为它建立一个维表,可以将它放到事实表中。

存储在事实表中的维度列被称为退化维度,退化维度的过程称为维度退化。

事实表

事实表概述

事实表作为数据仓库维度建模的核心,紧紧围绕着业务过程来设计。其包含与业务过程有关的维度引用(维度表外键)以及该业务过程的度量(通常是可累加的数字类型字段)。

事实表特点

事实表通常比较“细长”,即列较少,但行较多,且行的增速快。

事实表分类

事务事实表、周期快照事实表、累积快照事实表

事务型事实表

概述

事务事实表用来记录各业务过程,它保存的是各业务过程的原子操作事件,即最细粒度的操作事件。粒度是指事实表中一行数据所表达的业务细节粒度。

事务型事实表可用于分析与各业务过程相关的各项统计指标,由于其保存了最细粒度的记录,可以提供最大限度的灵活性,可以支持无法预期的各种细节层次的统计需求。

设计流程

设计事务事实表时一般可遵循以下四个步骤:

选择业务过程→声明粒度→确认维度→确认事实

1)选择业务过程

在业务系统中,挑选我们感兴趣的业务过程,业务过程可以概括为一个个不可拆分的行为事件,例如电商交易中的下单,取消订单,付款,退单等,都是业务过程。通常情况下,一个业务过程对应一张事务型事实表。

2)声明粒度

业务过程确定后,需要为每个业务过程声明粒度。即精确定义每张事务型事实表的每行数据表示什么,应该尽可能选择最细粒度,以此来应对各种细节程度的需求。

典型的粒度声明如下:

订单事实表中一行数据表示的是一个订单中的一个商品项。

3)确定维度

确定维度具体是指,确定与每张事务型事实表相关的维度有哪些。

确定维度时应尽量多的选择与业务过程相关的环境信息。因为维度的丰富程度就决定了维度模型能够支持的指标丰富程度。

4)确定事实

此处的“事实”一词,指的是每个业务过程的度量值(通常是可累加的数字类型的值,例如:次数、个数、件数、金额等)。

经过上述四个步骤,事务型事实表就基本设计完成了。第一步选择业务过程可以确定有哪些事务型事实表,第二步可以确定每张事务型事实表的每行数据是什么,第三步可以确定每张事务型事实表的维度外键,第四步可以确定每张事务型事实表的度量值字段。

不足

事务型事实表可以保存所有业务过程的最细粒度的操作事件,理论上其可以支撑与各业务过程相关的各种统计粒度的需求。但对于某些特定类型的需求,其逻辑可能会比较复杂,或者效率会比较低下。例如:

1)存量型指标

例如商品库存,账户余额等。此处以电商中的虚拟货币为例,虚拟货币业务包含的业务过程主要包括获取货币和使用货币,两个业务过程各自对应一张事务型事实表,一张存储所有的获取货币的原子操作事件,另一张存储所有使用货币的原子操作事件。

假定现有一个需求,要求统计截至当日的各用户虚拟货币余额。由于获取货币和使用货币均会影响到余额,故需要对两张事务型事实表进行聚合,且需要区分两者对余额的影响(加或减),另外需要对两张表的全表数据聚合才能得到统计结果。

可以看到,不论是从逻辑上还是效率上考虑,这都不是一个好的方案。

2)多事务关联统计

例如,现需要统计最近30天,用户下单到支付的时间间隔的平均值。统计思路应该是找到下单事务事实表和支付事务事实表,过滤出最近30天的记录,然后按照订单id对两张事实表进行关联,之后用支付时间减去下单时间,然后再求平均值。

逻辑上虽然并不复杂,但是其效率较低,应为下单事务事实表和支付事务事实表均为大表,大表join大表的操作应尽量避免。

可以看到,在上述两种场景下事务型事实表的表现并不理想。下面要介绍的另外两种类型的事实表就是为了弥补事务型事实表的不足的。

周期快照事实表

概述

周期快照事实表以具有规律性的、可预见的时间间隔来记录事实,主要用于分析一些存量型(例如商品库存,账户余额)或者状态型(空气温度,行驶速度)指标。

对于商品库存、账户余额这些存量型指标,业务系统中通常就会计算并保存最新结果,所以定期同步一份全量数据到数据仓库,构建周期型快照事实表,就能轻松应对此类统计需求,而无需再对事务型事实表中大量的历史记录进行聚合了。

对于空气温度、行驶速度这些状态型指标,由于它们的值往往是连续的,我们无法捕获其变动的原子事务操作,所以无法使用事务型事实表统计此类需求。而只能定期对其进行采样,构建周期型快照事实表。

设计流程

1)确定粒度

周期型快照事实表的粒度可由采样周期和维度描述,故确定采样周期和维度后即可确定粒度。

采样周期通常选择每日。

维度可根据统计指标决定,例如指标为统计每个仓库中每种商品的库存,则可确定维度为仓库和商品。确定完采样和维度后,即可确定该表粒度为每日-仓库-商品。

2)确定事实

事实也可根据统计指标决定,例如指标为统计每个仓库中每种商品的库存,则事实为商品库存。

事实类型

此处的事实类型是指度量值的类型,而非事实表的类型。事实(度量值)共分为三类,分别是可加事实,半可加事实和不可加事实。

1)可加事实

可加事实是指可以按照与事实表相关的所有维度进行累加,例如事务型事实表中的事实。

2)半可加事实

半可加事实是指只能按照与事实表相关的一部分维度进行累加,例如周期型快照事实表中的事实。以上述各仓库中各商品的库存每天快照事实表为例,这张表中的库存事实可以按照仓库或者商品维度进行累加,但是不能按照时间维度进行累加,因为将每天的库存累加起来是没有任何意义的。

3)不可加事实

不可加事实是指完全不具备可加性,例如比率型事实。不可加事实通常需要转化为可加事实,例如比率可转化为分子和分母。

累积快照事实表

概述

累计快照事实表是基于一个业务流程中的多个关键业务过程联合处理而构建的事实表,如交易流程中的下单、支付、发货、确认收货业务过程。

累积型快照事实表通常具有多个日期字段,每个日期对应业务流程中的一个关键业务过程(里程碑)。

订单id 用户id 下单日期 支付日期 发货日期 收货日期 订单金额
1001 1234 2023-01-01 2023-01-02 2023-01-03 2023-01-08 100

累积型快照事实表主要用于分析业务过程(里程碑)之间的时间间隔等需求。例如前文提到的用户下单到支付的平均时间间隔,使用累积型快照事实表进行统计,就能避免两个事务事实表的关联操作,从而变得十分简单高效。

设计流程

累积型快照事实表的设计流程同事务型事实表类似,也可采用以下四个步骤,下面重点描述与事务型事实表的不同之处。

选择业务过程→声明粒度→确认维度→确认事实。

1)选择业务过程

选择一个业务流程中需要关联分析的多个关键业务过程,多个业务过程对应一张累积型快照事实表。

2)声明粒度

精确定义每行数据表示的是什么,尽量选择最小粒度。

3)确认维度

选择与各业务过程相关的维度,需要注意的是,每各业务过程均需要一个日期维度。

4)确认事实

选择各业务过程的度量值。