听云APMCon: 实时OLAP数据仓库架构优化

2016-09-12 17:43:08来源:威易网作者:

本文整理自APMCon 2016中国应用性能管理大会数据库性能优化专场优酷广告基础设施技术专家 张海雷演讲《实时OLAP数据仓库架构优化演进》,主要介绍了Druid实时导入、亚秒响应、支持高并发、高可用性等诸多特性。

本文整理自APMCon 2016中国应用性能管理大会数据库性能优化专场优酷广告基础设施技术专家 张海雷演讲《实时OLAP数据仓库架构优化演进》,主要介绍了Druid实时导入、亚秒响应、支持高并发、高可用性等诸多特性。

在分享之前先自我介绍一下,我来自于优酷土豆广告团队,负责广告团队的Redis Cluster和Druid集群。Druid大家听过吗?这个Druid不是那个阿里的数据库连接池Druid,Druid是一个开源的持续实时数据库。

一、需求背景

我先一下需求背景,最刚刚开始的需求就是为DSP广告主提供实时多维分析报表。然后我们看一下报表的一些需求指标:

?多维钻取 16个维度,22个metrics

?海量数据 10亿量级

?实时性 延迟为分钟级别。比较致命的就是实时性,产品那边给我们技术提供的一个技术指标就是延迟为分钟级别,我们当前要看到前一分钟的一些数据。

?多维下的UV 每天独立访客千万量级

在接到这个需求之前,我刚刚也是提到了最初是负责Redis Cluster的运维维护,我们并没有太多的OLAP经验。接到需求以后,我们就问了一下同行,搞出来了第一个架构。

\

最左边就是日志收集,有两条线,上面一个线就是离线批量处理的架构,下面是实时的,我们先从实时的开始讲。实时的就是日志收集了以后,从kafka带到了Storm,经过ETL到Redis,最终就是通过应用调用Redis获取实时的数据。离线就是走HDFS,Hive,经过Hive ETL到Mysql里面去,应用层调用Mysql的数据。离线的话需要今天去弄昨天的数据就走Mysql,今天查实时数据就是走Redis,大致架构就是这样的。接下来介绍一下具体怎么实现。

二、实时架构

实时架构的话我们是采用预算维度符合的metric度量值,维度组合做key,各项metric按照一定的格式拼装成value,Redis是一个K-V存储,这样的话我们读取的时候先拼好这些key,在Redis里面获取这些metrics。我举一个例子,比如说我们是在线广告行业,我们为广告主提供这些报表,一个广告主去查一个投放在地域维度下的曝光,点击一些数据他有可能是按照投放加地域组合成一个key,去Redis取出那些曝光。但是这样是会遇到三个问题。第一个问题就是维度组合膨胀。

因为用K-V存储的方式必须采用的就是穷举的方式,预算各种维度的组合。比如说我举一个例子,假设有4个维度,一个是投放,一个是广告位,一个是素材,还有刚刚提的地域,我就得是预算一下4个维度所有的组合。有可能就是一阶的,也有可能是全阶的4个维度的组合,还有三个维度、两个维度的组合。

有两个因素决定这个组合一个数量,第一个维度的个数。当维度个数越多,这个组合的数量也是多。还有就是维度组成的稀疏程度,还是以广告的数据来看,一个投放有可能有多个素材。比如说我们现在有5个广告,5个投放,5个素材,但是这一个投放并不是说跟5个素材都是相关联的,只是说跟其中的一个素材或者两个素材关联,这样的话就会产生一定的稀疏,跟所有的关联那就是很稠密。

另外还有维度的基数,这些概念看起来比较的抽象,我后续会解释。总之带来了一个问题,增加维度了以后,有可能带来的就是一个指数式增长。

另外一个问题如何支持Group By查询?我们都知道Redis不支持Range Scan,它只支持前缀查找。这个前缀查找没有索引,他是在Redis整个HashTable里面进行Scan,如果我们这个Redis容量很大,比如说我有1千万,那可能要扫描1千万才可以得到这个结果,这个是最害怕的地方。我们做法就是在应用层穷举这个维度组合,然后再采用MGET的方式在Redis中获取这个数据。

这种方案适合特定场景,我在应用层已经知道了这个广告投放有什么素材在那些广告位上面进行投放过。如果说是不可预知的,比如说在那些域名下投放有可能是一个开放的,这个时候这个方案不适用了。

第三个问题,如何实现UV?Redis是支持HyperLogLog的,是一个基数的近似的一个统计。

\

我现在举一个例子提上一页说到的维度组合膨胀的问题,假设我们有3个维度,每一个维度基数都是3,在这里提到一个概念就是维度的基数。什么是基数?就是这个维度的集合当中不重复元素的数量。比如说,这个维度A里面有A1、A2、A3,这个里面有4个值,但是不重复的数量就是A1、A2、A3,这个基数就是3。同样,纬度B也是B1、B2、B3,纬度3就是C1、C2、C3,最差情况下的组合,我们来算一下就是三阶维度,组合是ABC,每一个维度基数就是3。ABC一个组合乘3的3次方,就是27。我们看一下两种维度的组合,排列方式有ab,ac,bc这些。每一种排列的数量是3的2次方,就是9,这样加起来就是27。然后,一维有可能每一个都是3,加起来就是9。

那么,我们得到的结论是什么了?增加维度以后,维度组合数量有可能是指数增长,不是线性的。试想一下,增加一个维度变成4个,那么这个量级线性增长。当然,这里列举的是最差的情况,是现实的数据组合是稀疏的。我这里再提一下,我们当初选择技术方案的时候在这儿也是遇到一个坑,因为我们当初没有接触过大数据,一想16个维度有可能原始日志级是10亿,有可能16个维度组合起来,怎么着也得有几亿,觉得这个量比较大,我们想到方案的时候,有可能就是造成一个错误的选型。后来实现了以后,虽然原始日志级就是10亿,我们选取了维度了以后,我们发现每一天聚合了以后是500万,500万跟当初的技术选型上面肯定有不同的。

三、离线架构

\

接下来说说离线架构,是采用Hive将多种日志源的数据经过ETL以后写到Mysql全维度的宽表中。这些就是广告数据,它是有多种的,有竞价,有曝光,有点击,我们需要在Hive里面join一下,合成宽表的方式。我刚刚提到了全维度宽表,比如说,原始数据它有50个维度,但是我们这里是需要16个维度,我们按照这个16个维度Group By,然后增加一些度量值提取出来写到Mysql里面去。

瓶颈与挑战的话也是刚刚提到的那个全维度组合的数据量,当初也是错误的预估了这个量。

由于预算所有维度组合的时间成本很大,我们进行功能降级。

\

我们选取了固定的维度组合,没有实现完整全维度的OLAP,只选取了固定的某一些组合。还拿我刚刚那个例子讲一下,有投放素材广告位地域,并不是说实现任意一个组合,只实现两两一个组合。

然后Mysql这边也是做了一些降级,按照维度组合作为key组件去分表,其实这个分表的做法,就跟Redis的存储做法一样的。

刚刚提到的这个方案有一些缺点。

第一,增加维度了以后,给存储的容量以及计算量带来很大的一个挑战。

第二,Redis不支持范围查找,Group By查询需要客户端穷举维度组合,这个更适合一个特定应用场景可以做到。

第三,Redis和Mysql是属于异构的数据库,如果我有一个查询是要查历史和实时相结合的查询,我需要在那个应用层合并。

基于以上的分析三个缺点,我们着手做改进一个工作。我们当时想了2种,第一种就是HBase替换Redis和Mysql。HBase同样也是K-V存储,但是有一个优点是HBase支持范围查找,可以做Group By。另外一种我们调研另外一个方案,也是在广告领域广泛应用一个OLAP技术,就是Druid。

四、Druid是什么

Druid其实开源比较早,11年就开源了,今年4月份的时候Druid作者来中国,我们跟他交流创建Druid的目的是什么?当时他们在一个技术创业公司,提供的方案是为广告提供实时多维分析。当时他们也是尝试了很多的方案,跟我刚刚最初提到的Mysql,还有HBase方案都是很像。但是,如果实现真正的OLAP还是有一定的差距。所以,他们就是创立了Druid,是为OLAP而生,我列举了几个特点,多快好省高,现在逐一解释一下。

\

?多,可以处理海量的数据, Druid官网说可以扩展到PB级,这个量非常大。

?快,亚秒级响应,官网说10亿量级下做到亚秒响应,我们实际应用也是亚秒响应,实时导入,导入即可查询。导入了以后我们就可以查询到,这个还是非常非常的牛的。

?好,就是高可用,分布式容错架构,可以做到无宕机。

?省,采用列存储,高效压缩。我举一下我们的例子,我们原始日志是10亿量级,我们选取16个维度,22个度量值,每天生成的索引是几百兆。

?高,它支持高并发,可以是作为面向用户的应用

1.Druid支持的查询

\

接一下讲一下Druid支持的查询,它是支持这样几种查询:TimeSeries、GroupBy、TopN、Select等常用查询。

还有二点特别重要,就是支持星型模型,提供一个lookup的功能可以实现和维度表的join。星型模型如上图中间就是维度表,举一个例子,我们这个表里面有可能还有一些ID是产品ID,地域ID,当我们给用户展现的时候,不可能给用户展现一些ID,展现的时候我们通常是采用ID在维度表里面join获取ID对应的名称。但是Druid不支持join,是提供的lookup功能得到查询结果了以后,得到是一个ID,可以根据ID去找到其中的内容。所以,这就是为什么是适合新型模型。

第三,支持的函数是count和sum。OLAP当中count和sum基本上可以满足90%以上的需求。比如说我们要求一个曝光数,点击数这个就是一个sum。当然,还有一些率比如说点击率,这些都是可以通过后处理的方式去得到。

另外可以采用java的方式实现自定义的UDF,就像刚刚说到的那个转化率,我得到了一个曝光数一个点击数,两者相除可以把这个做到UDF,做到JS的函数里面去。

另外一点,支持可扩展自定义的Aggregator,它的代码设计的非常灵活,非常容易实现自定义模块。

比如说阿里贡献了利用BitMap实现精准的DistinctCount,不知道大家在实际工作场所当中有没有DistinctCount的需求?这个还是挺重要的。比如说,我要查一天的UV,或者某一种维度下的UV,前面提到了用HyperLogLog,但是只是一个近似查询,会带来一定误差,精准的方式就是采用BitMap。

最后一点,就是支持近似查询。比如说像HyperLogLog用于计算UV,还有比较牛的DataSketches,同样也是可以计算UV,但是它与HyperLogLog不同的是,HyperLogLog只可以做并集,它可以把昨天跟今天的合并得到一个总的UV数,但是它做不到一个交集。我现在做一个用户的留存分析怎么做?就是得到昨天一个集合,跟今天的一个集合做一个交集,HyperLogLog是做不到的,DataSketches 可以做到。

2、数据格式

\

了解Druid从数据格式开始,Druid是一个实时数据库,所以这个时间戳非常重要,作为一个数据分片,路由查询一个重要的维度,所以它单独提出来。另外,还有一个metric度量值,这是它与其他的大部分的OLAP不同的一点。比如说,其他的好多都没有把度量值跟维度明确地区分出来,但是Druid是明确区分了,这是为了存储还有计算上面需求而需要的,因为维度是需要进行索引的。我们的索引都是在维度上面做一个过滤,但是,度量值一般就是一个聚合计算,不需要索引。

上图下面的数据就是一个在线广告的数据,我是从Druid官网上面拉下来的。最下面就是时间戳,中间4列像媒体广告主,性别,国家,最后两个就是点击,跟扣费价格是一个度量值。

3、Roll-up

\

Roll-up是Druid非常有用的特性,它保证了低延迟响应时间。Roll-up就是数据导入阶段进行的一次聚合,第一层聚合。上图下面用SQL展示Roll-up 的做法,相当于进行了一次GroupBy,全维度的GroupBy。Roll-up有一个优点,它会大大地减少数据集的大小,甚至可能就是百倍以上。我刚刚举了一个例子,我们原始数据集在10亿左右,Roll-up 了以后有500万,这个Roll-up 减少的数据集大概在20倍,极端的情况下如果你选取的维度比较少的话,减少的数据集有可能是百万级。缺点就是说它会丢失明细数据,因为你在导入的时候已经做了一层聚合,你看不到明细数据。

4、Druid高效的奥秘-数据布局

接下来会比较有意思,我刚才讲的Druid的几个特性,比如说它能处理海量数据,能扩展到PB级和亚秒级响应,我们来看它为什么能做到这几点。

\

同样还是由刚才列举的这些数据来看,比如说上面这部分数据,它有投放、广告位,我们由上一份数据来分析它的维度。然后就是曝光数、点击数,这是度量值。这样的一份数据,它是怎么编制成索引的呢?或者说Druid是怎么进行存储的呢?以投放这维度来分析,首先它会进行一个字典编码,投放这维度里面有C1,C2,C2,C3。我刚才提到一个概念是基数,它的基数是3,它会把C1, C2, C3构建成一个字典,字典编码值就是它在这个字典数组中的下标。C1,C2,C3编码值分别变成了0,1,2,存储时我们不再存C1,C2,C2,C3,而是0,1,1,2。

同时我们采用倒排索引,最后我列出的这个黄色的部分。说道倒排索引,大家很容易想到Lucene,它其实跟Lucene的倒排索引很像,有一点不同就是Druid的token list采用了BitMap,最新版本的Lucene也采用了BitMap。我们来看一下BitMap是怎么做的,比如说C1,它只出现在第一行它的BitMap就是1000,C2它只出现在第2,3行,C2的BitMap是0110,同理C3只出现在第4行,C3的BitMap是0001。

以上大概就是Druid的索引和数据布局,同理我可以得到下面的广告位的数据布局。那么度量值怎么存储呢?刚才提到度量值是不做索引的,它只是用于一个快速的聚合,所以度量值就只是一个原样存储。这里只是一个示例,在真实存储的时候它会加上一些压缩,后续我会介绍,接下来讲一下它是怎么查询的。

5、Druid高效的奥秘-BitMap 快速Filter

\

如果使用BitMap,假设我有一个SQL是上面这个Sele ct Position ,su m (click) fro m t able whe re cast=C2 Gro up By Position,投放是C2,按照广告位分组,去查其点击数该怎么做呢?过程如下:

第一步,根据过滤条件cast=C2,找到C2的字典值是1;

第二步,根据字典值1找到BitMap 0110.

第三步,根据BitMap得到offset是1和2。也就是根据BitMap的值0110去看它出现在那几个位置,这里出现的位置是第2位和第3位,对应下标是1和2。

第四步,我们可以根据这个offset1,2去构建一个curs or游标去进行遍历。

第五步,单次迭代的时候根据offset1在广告位position中绿色这部分偏移为1的广告位是0,在点击position中,蓝色的这部分偏移为1的值是21,这是单次迭代的结果。接下来进行下一次迭代,下一次迭代的时候offset变成2了,在广告位绿色部分offset为2的偏移后得到的值是1,在点击蓝色部分offset为2的偏移后得到的值是14。对广告位来说,最后得到的字典编码为01,我们需要根据字典编码01去反查p1,p2,0对应21,1对应14,也就是p1广告位的点击数是21,p2广告位的点击数是14。相对于Lucene原来的parse list采用的skiplist跳表的方式,BitMap执行的速度是非常快的。

6、Druid高效的奥秘-列存储和压缩

\

接下来看下列存储和压缩,上面讲的仅仅是一个查询的示例,但真正采用的是列存储和高效的压缩。列存储的优点就是按列存储,比如说我要去读广告位这一列数据,就不需要读其他的。另外一点好处列存储可以采用字典编码,就是刚刚提到的会把C1,C2,C3编制成0,1,2,会把一个字符串转换成一个整数。所有的压缩,我觉得都是基于一个概率了,比如说这个串都很长我把它转化成一个int,这个时候压缩效率比较高。

再接下来看一下BitMap的压缩,BitMap支持两种不同的压缩方式,一种是concise BitMap,一种是roaring BitMap。BitMap它的一个优点就是计算快,但是它缺点占用空间大,同时它的一个特征就是比较稀疏的。举一个例子,比如说我们有一亿用户,用户ID都是整的,比如说就是1到1亿,根据这一个亿用户构建一个BitMap。假设我每天活跃的只有2千万,当用户ID是1亿的第一亿个用户来了,我们为他去构建一个BitMap,那它的长度肯定就是一亿除了以8,这个空间还是非常大的。但是活跃没有那么多时候,假设活跃几百万我不能不为一亿用户去浪费这样大一个空间。

刚刚提到了dimension,它的压缩有字典编码,还有变长整数编码,很多的RPC的续量化方式里面都是用到这个变长整数编码,Druid里面用到的编码非常的高效。我下面列举一下。

前面提到了就是把维度进行字典编码,在整数基础上做一个变长整数编码。怎么做的呢?比如说我的字典是1千,这个维度当中1000个不同的数,它的最大的值就是1千,这1千其实可以用一个短整存储,2个字节就可以存储,而不是用4个字节,这个压缩方式是非常非常的高效。

再举一个例子,一个维度的基数是50%,其实一个字节就可以存储。那么这个时候,原来用4个字节存储int现在一个字节压缩了25%。在其他的变长整数压缩里面标识一个位占用几个字节,但是Druid里面因为它字典的那个是一定的。这个1000就是2个字节编码,不用再用位占一个字节。

metric是按块压缩,默认64K。而且,必须是2整数倍,这个是非常非常的有意义的,必须这样做,他方便定位。

7、Druid架构

\

下面来看一下Druid的架构,它是由一系列不同的决策节点组成。有Realtime,有Coordinator,有Broker,还有Historical,下面来分别介绍一下他们每一个节点的作用。

Realtime:

?实时接收(拉取)数据,生成Segment

?负责实时部分的查询

?采用类LSM-tree的架构,对写友好,支撑高并发和高吞吐量的写入

?内存增量索引采用ConcurrentSkipListMap

?达到阈值以后,异步线程将内存增量索引持久化成倒排索引

?达到Segment设定的时间粒度时,将这一时间段内的持久化索引合并成Segment

?具体实现有Realtime Node和Indexing Service两种方式

Coordinator:

?负责协调Segment的均衡加载

?动态均衡Historical节点的负载

?根据用户指定的Load Rule加载指定时间段的Segment

Historical:

?Historical是查询的主力

?从Deep Storage中拉取Segment,采用mmap的方式Load

?Historical可以配置负载能力

?Scatter/Gather 模型。查询时分发给多线程执行,每个Segment分配一个线程,然后再聚合。

?Shared-Nothing架构,扩展性强

Broker:

\

?通过ZooKeeper获取TimeLine.

?Scatter/Gather模型,把时间段的查询转化成Segment的查询分发给Historical或者RealTime,然后再在Broker层聚合

?采用Http Restful API提供查询

?自实现NettyHttpClient,异步Nio的方式

?缓存查询结果

同时还有一些外部依赖:

Zookeeper:

?LoadQueue,Coordinator分发Segment的实现是将Segment加到指定节点的LoadQueue

?Coordinator的failover实现,利用Zookeeper的选主

?时间轴,用于查询路由分发到指定的节点

Deep Storage 用于Segment的备份存储:

?HDFS、S3和Cassandra等

?使用其他任何对象存储,自定义模块实现

Meta信息存储:

?Mysql

?其他关系型数据库,自定义模块实现

所以如何保障他们的高可用?

RealTime:RealTime Node 采用从Kafka中拉取数据的模式,不支持多副本,不能保障高可用,Indexing Service 实现了导入阶段多副本,保障高可用

Historical:采用多副本加载

Coordinator:主备模式,采用Zookeeper选主

Broker:利用Zookeeper的节点发现以及客户端负载均衡

接下来是Druid的几个比较:

Druid vs Elasticsearch:

?Druid在导入过程会对原始数据进行Roll-up,而ES会保留原始数据。

?Druid专注于OLAP,针对数据导入以及快速聚合操作做了优化。

?Druid不支持全文检索。

Druid vs. Key/Value Stores (HBase/Cassandra/OpenTSDB):

KV存储的通用方式:

1、预算所有可能的维度组合

2、在事件记录上进行Range Scan,所有的维度组合作为key,metrics作为value

缺点:

第一种,给空间和计算带来挑战,增加维度以后指数式增长。

第二种,所有的维度组合作为ke y ,Range Scan没有索引的话会扫描大量的数据

Druid vs SQL-on-Hadoop (Impala/Drill/Spark SQL/Presto):

SQL-on-Hadoop提供了执行引擎可以在多种数据格式上进行查询,它也可以查询Druid的数据。

查询模式

?Druid Scatter-Gather模型,算子下放,算子执行和存储在同一节点,broker只是聚合结果

?SQL-on-Hadoop一般采用MPP,执行计划的不同算子分布在不同节点执行,增加数据序列化以及网络传输的开销

数据摄入:Druid 支持实时导入

SQL支持:Druid只支持单表,不支持大表之间的Full join

\

这个是使用Druid一个结构,也是lamda一个架构,也是两个线,上面这个线通过hadoop生成索引,下面经过ETL实时生成索引。在使用Druid以后的实时部分如下:

\

?Druid只支持单表,采用把多表预先Joi n成单表

?每种日志维度冗余,采用Storm的Fiel d Grou ping把相同维度的不同日志连接在一起

?Storm中在一定时间窗口内预聚合,减小Druid的压力

?写入Kafka时采用Hash分区,使相同维度组合的数据落在同一分区

接下来是使用Druid以后的性能表现:

\

最后是使用Druid以后的数据导入吞吐量:

\

上图是吞吐量,这个吞吐量在这里我说明一下,Druid数据是导入一个吞吐量,不是查询吞吐量,我算了一下,大概最高的时候,峰值达到8万的量。

关键词:听云APMCon