发布时间:2022-08-19 11:45
动手点关注 干货不迷路
在数据领域,有几类经典的查询场景:
想要统计某段时间内访问某网站的 UV 数,或是统计某段时间内既访问了页面 A 又访问了页面 B 的 UV 数,亦或是统计某段时间内访问了页面 A 但未访问页面 B 的 UV 数,通常我们对这种查询叫做基数统计。
想要观察某些指标的数据分布,例如统计某段时间范围内访问页面 A 与页面 B 各自的浏览时长 95 分位数、50 分位数,则需要用到分位数统计。
想要统计某段时间内播放量最多或者点击率最高的 10 个视频或者文章(热榜列表),则需要用到 TopN 统计。
这几类问题在数据量不大的情况下都是非常容易处理的。我们可以通过遍历+排序轻易而准确的解决这种问题。但一旦数据到达 Billion 量级,常规算法可能要花费数小时甚至数天的时间,并且即使提供充足的计算资源也于事无补,因为这几类问题都难以并行化处理。
DataSketches[1] 就是为了解决大数据和实时场景下的这几类典型问题而诞生的一组算法,最初由雅虎开源。这些算法以牺牲查询结果的精确性为代价,可以在极小的空间内并行、快速地解决上述几类问题。
Sketch 结构即「数据草图」结构,主要是为了计算海量的流式数据的概率指标而设计的一种数据结构。一般占用固定大小的内存,不随着数据量的增加而增大。这种结构通过巧妙地保存或丢弃一些数据的策略,将数据流的信息抽象存储起来,汇总成 Sketch 结构,最终能根据 Sketch 结构还原始数据的分布,实现基数统计、分位数计算等操作。
Sketch、Summary 都可以理解为数据草图,不同论文中使用的称呼不太一致,但是符号一般都是大写的 S。
Sketch 一般具有以下几个特征:
1. 单次遍历
可以把 Sketch 理解为一个状态存储器,它时刻承载着数据流迄今为止的所有历史信息,因此 Sketch 通常是 single-pass 的,只需要遍历一遍数据即可取得所需的统计信息。
2. 占用空间小
传统的统计方式需要维护一个巨大的数据列表,且随着数据的输入越来越大。Sketch 可以在很小的常量空间内摄入海量的数据,通常在 KB 量级。这使得 Sketch 在海量数据的统计中非常有优势。
对于一个包含上亿条数据、包含多个维度组合的数据集,可以在每个维度组合上转化生成一个 KB 大小的 Sketch 结构,从而加快查询。
3. 可合并性
可合并性使得 Sketch 可以自由地分布式并行处理大量数据,因此具有快速、高效的优势。以基数统计 (Distinct Value, DV)为例,要解决的问题是从具有重复元素的数据流中查找不同元素的数量,Sketch 可以轻易地将局部统计结果合并为全局统计结果,而直接计数则做不到这一点,即:
DV(uid | city=北京 or 上海) ≠ DV(uid | city=北京) + DV(uid | city=上海)
Sketch(uid | city=北京 or 上海) = Sketch(uid | city=北京) + Sketch(uid | city=上海)
PS: 第二个式子中的加号代表 Sketch 的合并操作。
在分布式计算中,两个处在不同机器的 Sketch 的局部统计结果可以直接通过一个 Query 方法合并成一个 Sketch 结构,进行最终的分位数查询。
4. 误差可控的近似值
Sketch 为了节省空间必然会丢失一部分信息,因此统计结果不可能是完全精确的。但在现实中,许多分析和决策也并不要求数据是绝对精确的,有时候知道某个统计数据在 1% 的误差范围内往往跟精确的答案一样有效。Sketch 可以在计算复杂度与误差之间进行权衡,足以满足大数据场景下大部分的统计需求。
一个 Sketch 算法的使用流程通常如下:
原始数据经过转化生成一个 Sketch 结构,当要进行查询的时候,从 Sketch 生成一个 Estimator 返回查询结果
分位点/分位数(Quantile):考虑误差近似,即给定误差 ε 和分位点 φ,只需要给定排序区间[(φ - ε)*N, (φ + ε)*N] 内的任意元素即可(N 为元素个数)。
举个例子,如果给定 input 序列如下,在 ε= 0.1 的情况下,求 PCT50(即 φ = 0.5)。返回 24、39、51 都是可以满足条件的。但如果 ε= 0.01,只有返回 39 才是正确的结果。
φ= 0.5 and ε = 0.1
Rank = [4, 5, 6] → V = [24, 39, 51]
这也就是为什么我们使用 DoublesSketch UDF 或者 Spark 的分位数近似算法的时候可能会出现分位数计算结果不准确的问题。此外,不同的分位数定位方法也会导致数据有细微的差距,后面会提及。
图中,在当前维度下的数据只有两条,针对这两条数据求 50 分位数的结果会不一样。在小基数求分位数问题的时候,数据会出现较大的误差。
quantile 和 rank 实际上是两个函数:
我们期待一对完美的函数,使得 q = quantile(rank(q)) 或者 r = rank(quantile(r)),但现实中,我们往往只能得到两个有误差的函数 r' = rank(q)和 q' = quantile(r)
分位数问题其实就是 quantile(r)问题,即给定 r,根据估计出来的 quantile 函数求出 q'。
函数的误差由多种途径带来:
海量数据必然导致我们需要对数据进行有条件的整合和过滤,由此引入误差。但合理的整合、过滤机制能够将误差控制在一定范围内。为此,无数 researcher 贡献了各种 idea,这也是文档后半部分介绍的主要内容。
重复值也会带来误差。但实际上,如果我们对分位点的定义和边界条件做了正确的假设,那么这种误差已经被考虑在了算法之内,本文不再做详细解释。举一个简单的例子给大家感受一下,不同分位数定位方法与重复值是如何带来误差的:
Example
有五个数据输入进来计算分位点:{10, 20, 20, 20, 30}
上边是原始数据经过排序后的数值和位置对应图,下边则可以想象成一个简单的存储了这些数据的 Sketch 结构。
在计算分位点的时候有两种定位分位点的规则,LT (less-than) 和 LE (less-than or equals)。两者在举例中有区别,但是上升到泛化问题上则区别不大,都属于可接受误差范围内,在这个例子中我们对比 LT 和 LE 规则下得到的结果,能发现在重复值 20 上的 Rank' 和 Quantile' 是不一样的,并且在边界上的值的 Rank' 和 Quantile' 也有误差。
LT (less-than) & GT(greater-than)
LE (less-than or equals) & GE(greater-than-or-equal)
想了解 LT 和 LE 规则的区别可以浏览 DataSketches 官网里关于 Quantiles 的定义[2]
Quantile Sketches 经过层层迭代和不断的演进,已经形成多种变种。以 Apache DataSketches 中的实现总结[3]为例,很多 Sketches 算法早已应用在了诸如 Spark、Hive、Druid 等等大数据开发常用框架中,当我们在 SQL 中调用 percentile 类函数的时候,不同框架会调用其对应的算法。但由于不同框架实现的算法不一样,实现的效率也有高低,最终会导致在使用的时候能明显感知到计算速度的差异。
KLL 和 GK 算法应该是目前被应用最广泛的算法。这里,我们选取两个大数据开发场景下最常用的两个框架 Spark 和 Hive 来举例,对比其中的分位数计算函数 percentile_approx
与一个由 Apache DataSketches 提供的算法实现,并简单讲解一下如何将 Apache DataSketches 提供的更高效的算法引入日常开发工作中。
在 Spark 中计算分位数不需要引入 UDF,Spark 中的 ApproximatePercentile
[4] 类实现了 GK 算法,以 QuantileSummaries
[5] 的结构作为数据 Sketch,后面会提到 GK 算法并且简单解释其概念。这个函数在数据量小的时候的计算效率是比较快的。
同样,在 Hive 中计算分位数可以直接使用原生的分位数计算方法,但该方法背后算法并没有 Spark 中的算法效率高效。Hive 中的 GenericUDAFPercentileApprox
[6] 是通过计算近似数据直方图的方式估算分位数。如 Hive 源码提示,这个函数在数据量巨大的时候可能存在 OOM 的问题。此外,Hive 实现估计直方图的算法主要依据 A Streaming Parallel Decision Tree Algorithm[7]。值得一提的是,这个算法的核心想法是如何在有限的内存中构建数据的直方图。
如果在面对海量数据的时候,Spark 原生分位数计算函数会显得乏力,这时候就推荐引入 DataSketches 提供的分位数计算方法。该算法不光在空间占用上更优,其计算效率也更高。DataSketches 的 DoubleSketch
[8] 是根据 Mergeable Summaries[9] 中提到的 Low Discrepancy Mergeable Quantiles Sketch 算法实现的。这篇论文主要讨论了什么是 Mergeability 并对之前著名的算法的 Mergeability 进行了证明。同时提出了一个结合抽样的 Randomized Mergeable Quantiles Sketch 算法。
作为大数据开发同学,对于该算法需要了解以下几点:
这个算法的 Sketch 结构能够分布式进行。
由于算法引入随机,每次的结果可能不同(但可以通过指定随机种子来固定)。
算法实现上通过压缩、位图等操作进一步节省时空开销。
能用这个算法就用这个算法,快得很!
算法落地到日常开发也很容易,Datasketches 已经提供了多种 UDF、UDAF,基本能够满足常见业务需求。需要先在 pom 文件中加入 org.apache.datasketches 依赖,然后可以根据业务需求,对 datasketches 提供的 UDF、UDAF 再次封装。打包成 jar 包并注册 UDF 之后,就可以在 SQL 中使用了。
Quantile Sketches 算法最早可以追溯到上世纪 90 年代,当流式数据以及分布式计算的概念刚刚在学术界得到普遍的认可,这个问题被抛了出来。如何在海量数据中,甚至无限数据中,使用有限的空间,找到其中的某一个 rank 对应的值,或找到某一个数对应的 rank。
Quantile Sketches 算法的发展依据重要的算法的推出,形成了几个重要的阶段。实际上,这个领域的初心是如何使用最小的内存空间,解决一个最泛化的问题。
什么是最泛化的问题呢?学术界把这种问题称为 All Quantiles Approximation Problem,其定义如下:
与最泛化问题相对应,狭义问题被称为 Single Quantile Approximation Problem。当然还有很多问题的变体,例如,给定 rank 求对应的数值(最常见的分位数问题)或者已知流数据大小求解 rank 或分位数等。
在明确了问题定义后,同样也需要明确对算法的定义,一个合格的 Quantile Estimators 应该具备以下四种特性:
提供可调节的、可以被明确的误差区间。
与输入数据独立。
只遍历一次所有数据。
应使用尽可能小的内存。
合格的 Estimator 并没有对可合并性作出什么显性的要求,mergeability 只是一个 nice to have 的特性。问题定义中并没有考虑 mergeability 的问题,所以有些算法没有实现 mergeability,导致无法完全适配实际生产中的分布式计算模式。即使适配,往往也需要更多空间,更高的计算复杂度。
业界和学术对于探索未知的东西往往具有相同的方法论:先找到最小内存占用的天花板在哪里。通过构造特殊的对抗数据流并且证明了在极端情况下,任何算法都需要的最小的内存。如何找到这个问题的天花板并不是本文的重点,感兴趣的读者可以阅读论文 An Ω (1/ε log 1/ε) Space Lower Bound for Finding ε-Approximate Quantiles in a Data Stream[10] 进一步了解。
数据丢弃
回想之前讲解基础概念的时候提及的例子,我们可以直观的感受到,Sketch 数据结构的一个特性就是对数据进行合理的压缩。压缩后的数据尽可能的能够全面还原数据原本的分布。为了实现这一原理,最直观的想法就是针对每一个输入数据通过某种规则选择丢弃或保留,并确保将误差控制在 ε 以内。那么问题就变成了如何找到合适的丢弃规则。
举个例子,可以根据数据的 index 来判断是否丢弃:对于一个未排序的数据流,丢弃所有偶数位的数字,而保留奇数位的数字。但这显然是一个有缺陷的方法,而且很容易证明:只需要构建一个数据流,将所有较小的数据都放在偶数位,那么留下的数据则都是较大的数据,其中最小的数据也会大于或等于中位数。
从这个例子中得到启发,我们可以先对数据流进行排序,然后再根据上面的原则来丢弃数据,那么这个方法就变得可行了。
权重增加
另一个显而易见的逻辑是,丢弃的数据不能单纯地丢弃,它的信息必须以某种方式保存在未丢弃的数据中。继续上例,偶数位置的数据被丢弃,可以同时增大它前一个奇数位置数据的权重,使得一个奇数位数字表示原本一个奇数位+偶数位的数字。这样,数据量的信息就保存了下来,权重越大,也就意味着在这个区域内的数据越多。
Batch 思想
然而流数据并不支持实时排序,并且随着数据规模增大,排序所需要的时间和空间开销都会不断增长。一个自然的想法是,可以把流数据划分成一个个的 batch,在每一个 batch 内部排序。
下图中的例子结合了数据丢弃和权重增加两个策略。其中,第一行是输入数据被切分成一个个小 batch 并经过内部排序后的样子。第二行表示丢弃所有偶数位的数据,并增加前一位奇数位数据的权重(小方格的高度增大了一倍)。第三行表示丢弃所有奇数位的数据,并增加后一位偶数位数据的权重。可以观察到,一些位置的 Rank 发生改变,如果 Compactor 内部数据是偶数的时候 Rank 不发生改变,如果是奇数则会相对加一或减一。按照这个过程就可以构建一个最简单的 Sketch 结构。
单个 batch 数据压缩问题的思路得到了初步的验证,那么问题来了,单个 batch 如何推演到多个 batch 甚至流数据上的呢?
假设有两个 Sketch 结构,我们想要达成的效果是,当把这两个 Sketch 结构合并成一个之后,仍然能够提供准确的 Rank。
如下图所示,红点表示数据 s_i < v,当我们把两个 Sketch 结构合并后发现,v 在合并后的数据集中的 Rank 就等于两个 Sketch 分别统计红点个数的和,合并数据集的 Rank 不会因为拆分统计而变化。
因此,我们得到两条推论:
但如果单纯地把两组数据拼接在一起,显然会面临数据量增加带来的空间开销增大的问题。此时再回到之前提到的压缩、合并的思路,可以将这个过程不断循环起来,即合并、压缩、再合并、再压缩……
Random Sampling Techniques for Space Efficient Online Computation of Order Statistics of Large Datasets[11](MRL 算法)就是在压缩、合并思想上加以改进得来的。MRL 算法构建了一个树状多层级压缩合并结构:
选择一个固定大小的 k 作为 Sketch 结构的大小。根据流数据量大小可以反推需要压缩的层级数 H。显然,k 越大压缩层级越少,相对丢失的信息就越少,结果就越精确。
过程很简单,原始流数据输出到 level0 的一个 Sketch 结构中,当数据填满了大小为 k 的 Sketch 结构后,如果 level0 没有其他 Sketch,则这个 Sketch 暂时缓存下来,等待另一个 Sketch 到来。如果有另一个 Sketch,则将两个 Sketch 归并排序后保留所有奇数位置的数据,将 2k 大小的数据压缩为 k 大小并传入下一层级。同样,如果下一层级已经存在一个 Sketch,那么进行类似的归并排序和丢弃压缩后,将 Sketch 传入下一层级,层层递进。
k 的大小直接影响数据准确性,甚至还决定了这个算法能否收敛,比较合适的取值为 k = O(ε^{−1} _ log^2(εn))。MRL 算法在每一层级只需要两个 Sketch 结构存储数据。层级数决定了所需要的总共空间大小,而层级数的是根据总数据量和单个 Sketch 结构大小推演得到的:
O(H) = O(log^2(n/k)) = O(log^2(εn)), 总共空间 O(kH) = O(ε^{-1} _ log^2(εn))。
假如现有一亿个数字,我们想要对其中的某一个数字 x 求他的 Rank 是多少,Reservoir-Based Random Sampling with Replacement from Data Stream[12] 告诉我们,通过随机抽样我们可以用下面的公式从样本估计整体的 Rank:
但这个是一个近似公式。一个严谨的算法需要搞清楚,这东西有多近似。碰巧存在这么一个不等式 Hoeffding's inequality 并根据 Hoeffding's inequality 推出了这么一个定理
作为使用方,能理解证明是最好的,所以指路这篇文章的 VI 部分[13]。但是如果不理解证明,那么下面这段话会告诉你这个证明干了什么,有什么重要的意义(个人理解,可能存在不准确的地方,欢迎指正)。
本质上,Hoeffding's inequality 给出了随机变量的和与其期望值偏差的概率上限。这个不等式是一个 Bernoulli 过程的一个衍生(关于 Hoeffding's inequality 更详细的指路这里[14])。这种随机抽样到估计一个数字的 Rank 也是存在联系的(误差上限)。这种上限恰巧能够让把估计误差限定在一个范围内,满足了 Quantile Sketch 算法的要求。况且抽样的过程将 Rank 结果与数据量 n 独立开来。结果的误差只取决于 m。而 m 是我们可以设定的一个数,换句话说,m 是我们可以设定的一个内存大小,这个内存大小决定了抽样后估计的 Rank 的误差上限。
可以观察到,这里所需要的内存大于 MRL 算法。但是它的最大意义在于移除了与数据量大小 n 的关系。需要明确的是,这种抽样需要预先知道数据量大小 n,如果不知道数据量大小 n,抽样仍然是有用的,但需要随着数据量增大而降低抽样概率。详情见[13]
GK 算法 Space-Efficient Online Computation of Quantile Summaries[15] 的灵感来源于下面这个想法:假设我们收到的流数据的第一个数字就是中位数,那么我们只需要随着数据的输入统计大于这个数的数量和小于这个数的数量,最后就能很轻易的验证这个数是不是中位数。我们能不能对 k 个输入的数据都保持这个一个结构,记录大于这个结构的数据的个数和小于这个结构的数据的个数,那能找到对应数字的 Rank 是多少了。
存在下面一个结构,每一个 tuple 存储真实值、最小 Rank、最大 Rank,每一个 tuple 叫 Summary:
对于任意数据,只要满足下面的公式,那么算法总体误差就是收敛的。
但是这个结构存在缺陷。在处理流数据的时候,如果新来的数据需要插入中间,那么每次都需要更新后面所有的 Summary。这样更新的复杂度实在太高了。GK 算法就此提出了一个针对插入数据操作更友好的 Summary 结构:
将绝对位置转化成相对位置进行表示。
g 表示两个相邻 Summary 的相对位置差异,根据 g 和一个起始点我们能够推演出这个起始点和 Summary 的距离是多少。
Δ 表示这个 Summary 覆盖的 Rank 的范围是多少。