这个页面描述了 RocksDB 的 Merge 功能的内部实现细节。这篇文章面向高级 Rocksdb 工程师以及/或其他 Facebook 中对 Merge 如何工作感兴趣的工程师。
如果你是一个 RocksDB 的用户,并且你只是希望知道如何在生产环境使用 Merge,参考 客户端接口 页面。否则,这里假设你已经度过这个页面
内容¶
下面是我们为了实现 Merge 进行的代码修改的高度概括:
- 我们创建了一个抽象类,名为 MergeOperator,用户需要继承这个基类。
- 我们更新了 Get,iteration 以及 Compaction() 调用路径,以便在必要的时候调用 MergeOperator 的 FullMerge() 和 PartialMerge() 函数。
- 主要需要做的修改是实现 " 入栈的 " 合并运算元,我们会在下面描述。
- 我们引入了一些其他的接口变更(比如,更新了 Options 类以及 DB 类以支持 MergeOperator)
- 我们创建了一个更简单的 AssociativeMergeOperator 以帮助用户简化一些通用使用场景。注意这个可能降低效率
对于读者,如果上面的描述不足以在一个高层次进行描述,你可能应该先阅读 客户端接口。否则,我们直接进入下面的细节,并聊一些设计决定和依据,以解释我们最终的实现。
接口¶
这里快速过一下接口(假设读者已经熟悉这部分内容):
// The Merge Operator
//
// Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, edit data structure, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
class MergeOperator {
public:
virtual ~MergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing: (IN) null indicates that the key does not exist before this op
// operand_list:(IN) the sequence of merge operations to apply, front() first.
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success, false on failure/corruption/etc.
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const = 0;
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types.
// Save the result in *new_value and return true. If it is impossible
// or infeasible to combine the two operations, return false instead.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
virtual const char* Name() const = 0;
};
RocksDB 数据模型¶
在详细解释 merge 如何工作之前,我们先简单介绍一下 RocksDB 的数据模型。
简而言之,RocksDB 是一个带版本号的 kv 存储。每个对 DB 的修改都会有一个全局排序并且单调递增的序列号。对于每个 key,RocksDB 保留操作的历史。我们用 OPi 表示每个操作。一个 key(K) 经历了 n 次修改,逻辑上大概这样(物理上,这些修改可能会在活跃 memtable,不可变 memtable 或者 level 文件):
K: OP1 OP2 OP3 ... OPn
一个操作有三个属性:他的类型——要么是一个 Delete,要么是一个 Put(现在我们还有 Merge),他的序列号,和他的值(Delete 可以被认为是一个没有值的退化场景)。序列号会不断增加,但是对单一的 key 不保证连续性,因为他们在全局被所有的 key 共享。
当一个客户端发起 db->Put 或者 db->Delete 请求,这个库简单地把这个操作追加到历史记录里。不会检查这个 key 是否存在,大概是为了性能考虑吧(怪不得 Delete 在 key 不存在的时候也不会报错了。。。)
那么 db->Get 呢?他返回某个时间点的 key 的值,时间点通过序列号指定。key 的状态可以是不存在,或者是一个无法理解的字符数值。他从不存在开始。每个操作吧 key 移动到一个新的状态。在这个场景,每个 key 是一个使用操作进行状态转移的状态机。
从状态机的视角看,Merge 是一个通用状态转移操作,他先确认当前状态(已有值,或者不存在),然后与运算元(Merge 操作引入的值)结合,然后生成一个新值(状态)。Put,是 Merge 的一个退化场景。Delete 则更进一步——他甚至没有运算元,并且总是把 key 带回到他的原始状态——不存在。
Get¶
实践中,Get 返回一个 key 在特定时间的状态。
K: OP1 OP2 OP3 .... OPk .... OPn
^
|
Get.seq
假设 OPk 是 Get 可以看到的最后一次操作:
k = max(i) {seq(OPi) <= Get.seq}
那么如果 OPk 是一个 Put 或者 Delete,Get 只需要简单返回 (Put 指定的) 值或者 (因为 Delete 导致)NotFound 状态。他可以忽略前面的值。
对于新的 Merge 操作,我们需要向后看。我们要看多远呢?直到一个 Put 或者 Delete(之后的历史无所谓了)。
K: OP1 OP2 OP3 .... OPk .... OPn
Put Merge Merge Merge
^
|
Get.seq
-------------------->
以上面的情况为例,Get 应该返回这样的东西:
Merge(...Merge(Merge(operand(OP2), operand(OP3)), operand(OP4)..., operand(OPk))))
在内部,RocksDB 会根据 key 的历史,从新到旧进行处理。RocksDB 内部数据结构支持一个很好的“二分搜索”风格 Seek 函数。所以,提供一个序列号,他可以高效地返回:k = max(i) {seq(OPi) <= Get.seq}。然后从 OPk 开始,他会遍历历史,直到发现一个 Put 或者 Delete。
为了真正地做好合并,rocksdb 使用两个特殊的 MergeOperator 方法:FullMerge() 和 PartialMerge()。客户接口页提供了一个很好的概述,展示这些函数在上层的意义。但是,为了保证完整性,需要知道 PartialMerge 是一个可选函数,用于合并两个 merge 操作(运算元)为一个运算元。例如,合并 OP(k-1) 和 OPk 以生成 OP',他也是一个合并操作类型。一旦 PartialMerge 不能合并两个运算元,他返回 false,告诉 rocksdb 让他自行处理运算元。怎么做的?好吧,内部,rocksdb 提供一个内存栈型数据结构(我们实际使用了 STL 的 Deque)来堆叠操作元,维护他们的相对顺序,直到一个 Put/Delete 操作,此时 FullMerge 被用于在基础值之上处理运算元列表。
Get 的算法如下:
```Get(key): Let stack = [ ]; // in reality, this should be a "deque", but stack is simpler to conceptualize for this pseudocode for each entry OPi from newest to oldest: if OPi.type is "merge_operand": push OPi to stack while (stack has at least 2 elements and (stack.top() and stack.second_from_top() can be partial-merged) OP_left = stack.pop() OP_right = stack.pop() result_OP = client_merge_operator.PartialMerge(OP_left, OP_right) push result_OP to stack else if OPi.type is "put": return client_merge_operator.FullMerge(v, stack); else if v.type is "delete": return client_merge_operator.FullMerge(nullptr, stack);
// We've reached the end (OP0) and we have no Put/Delete, just interpret it as empty (like Delete would) return client_merge_operator.FullMerge(nullptr, stack);
因此,Rocksdb 会堆叠操作直到他遇到一个 Put 或者一个 Delete(或者 key 历史的开始处),然后会按照顺序把数据当参数,调用用户定义的 FullMerge 操作。在上面的例子,他从 OPk 开始,然后 OPk-1,。。。,等等。当 Rocksdb 遇到 OP2,他会有一个像 [OP3, OP4, ..., OPk] 的合并运算元栈(OP3 在栈顶)。他之后会调用用户定义的 MergeOperator::FullMerge(key, existing_value = OP2, operands = [OP3, OP4, ..., OPk])。这个结果会返回给用户。
# 压缩
这一部分比较有趣,rocksdb 的最重要的后台线程。压缩是一个减少 key 历史数据,但是不影响外部观察结果的过程。什么事外部观察结果?基本上, 通过一个指定序列号的一个快照。举个例子:
对于每个快照,我们可以定义 Supporting 操作为最后可以被快照观察到的的操作。(OP2 是 snapshot1 的 Supporting 操作,OP4 是 snapshot2 的 Supporting 操作。。。)
显然,我们没法删除任何 Supporting 操作,而不影响外部观测结果。那么其他操作呢?在引入 Merge 前,我们可以丢弃所有非 Supporting 操作。在上面的例子,一个全压缩可以删除 K 的历史,直到只剩 OP2 OP4 和 OPn。原因很简单,Put 和 Delete 操作都是快捷操作,他们隐藏之前的操作。
而 merge,过程就不一样了。尽管有些运算元不是任何快照的 Supporting 操作,我们还是不能简单删除它,因为后续的 merge 操作可能依赖他来生成结果。同事,实际上,这意味着我们甚至不能丢弃之前的 Put 或者 Delete 操作,因为他们可能被后续的 merge 依赖。
那么我们怎么办呢、我们从最新的处理到最老的,“堆叠”(以及/或者 PartialMerging)merge 运算元。在下面的任意条件成立的时候(看谁先发生),我们停止堆叠过程然后处理栈
- 一个 Put/Delete 发生——我们调用 FullMerge(值或者 nullptr,stack)
- key 的历史结束——我们调用 FullMerge(nullptr, stack)
- 遇到一个 Supporting 操作(快照)——参考下面
- 文件结束——参考下面
签名两个例子有点类似于 Get()。如果你看到一个 Put,调用 FullMerge(put 的值, stack)。如果你看到一个 delete,类似。
压缩引入两个新的场景。首先,如果遇到一个快照,我们必须停止合并过程。当这个发生,我们简单写出未合并的运算元,清空栈,然后继续压缩(从 Supporting 操作开始)。类似的,如果我们完成了压缩(“文件结束”),我们不能简单执行 FullMerge(nullptr, stack),因为我们可能没有看到 key 的历史的开始;可能有一些文件刚好没有被包含到这一次压缩中来。因此,在这种场景,我们也只能简单写出未合并的运算元,然后清空栈。两种例子中,所有合并运算元都变成了类似于“Supporting 操作”的东西,不能被丢弃。
这里 PartialMerge 的角色则是为了加快压缩。由于他可以支持类似于“文件结束”的场景,可能大多数的合并运算元都可以被亚索吊。因此,支持 Partial Merge 的 Merge Operator 对压缩来说更简单,因为剩下的运算元不会被堆叠,而是在写出到新文件前合并为一个运算元。
# 例子
我们来介绍一个实际的例子,以解释上面的规则。比如有一个计数器 K,从 0 开始,经过一系列的 Add 操作,被重置为 2,然后继续更多的 Add 操作。现在一个全量压缩发生(同时还有一些外部可见的快照)—— 会发生什么呢?
我们一步步看, 我们从最新的操作扫描到最老的操作
K: 0 +1 +2 +3 +4 +5 2 (+1 +2) ^ ^ ^ | | | snapshot1 snapshot2 snapshot3
一个 Merge 操作消费了之前的 Merge 操作,生成了一个新的 Merge 操作(或者一个栈)(+1 +2) => PartialMerge(1,2) => +3
K: 0 +1 +2 +3 +4 +5 2 +3 ^ ^ ^ | | | snapshot1 snapshot2 snapshot3
K: 0 +1 +2 +3 +4 +5 (2 +3) ^ ^ ^ | | | snapshot1 snapshot2 snapshot3
一个合并操作消费了之前的 Put 操作,然后生成了一个新的 Put 操作
(2 +3) => FullMerge(2, 3) => 5
一个新处理的 Put 操作结果仍旧是 Put,因此隐藏所有非 Supporting 操作
(+5 5) => 5
K: 0 +1 +2 (+3 +4) 5 ^ ^ ^ | | | snapshot1 snapshot2 snapshot3
(+3 +4) => PartialMerge(3,4) => +7
K: 0 +1 +2 +7 5 ^ ^ ^ | | | snapshot1 snapshot2 snapshot3
Merge 操作无法消费之前的 Supporting 操作 (+2 +7) 无法合并
(+1 +2) => PartialMerge(1,2) => +3
K: 0 +3 +7 5 ^ ^ ^ | | | snapshot1 snapshot2 snapshot3
K: (0 +3) +7 5 ^ ^ ^ | | | snapshot1 snapshot2 snapshot3
(0 +3) => FullMerge(0,3) => 3
K: 3 +7 5 ^ ^ ^ | | | snapshot1 snapshot2 snapshot3
总结起来:压缩中,如果一个 Supporting 操作是 Merge,他会(通过 PartialMerge 或者堆叠)和之前的操作组合,直到:
- 遇到另一个 PartialMerge 操作(换句话说,我们跨过了快照的边界)
- 遇到一个 Put 或者一个 Delete 操作,我们把 Merge 操作转换为 Put 操作
- 遇到 key 历史的结束,我们把 Merge 操作转换为 Put
- 遇到压缩文件的结束,我们认为跨越了快照边界
注意上面的例子,我们假设 merge 操作定义了 PartialMerge。对于没有定义 PartialMerge 的操作,运算元会被组合刀一个栈中,直到其中一个条件触发。
# 这个压缩模型的问题
如果 Put/Delete 没有找到,比如,如果 Put/Delete 发生在另一个文件,没有被压缩,那么压缩会简单地一个个地写出 key,就好像他们没有被压缩一样。主要的问题是我们做了很多无用功来把它们压入 deque。
类似的,如果有一个单一的 key,有 大量 merge 操作应用于他,如果没有 PartialMerge,那么所有这些操作必须被存储在内存。最后,可能会导致内存溢出或者类似的问题。
**未来的可能方案**:为了避免为何 stack/deque 的内存使用,可能遍历这个列表两次会更好,一次向前找到一个 Put/Delete,然后一次反向。这可能会需要大量的磁盘 IO,但是这只是一个建议。最后我们决定不这么做,因为大多数(如果不是全部)工作符合下,内存处理应该对每个单独的 key 都是足够的。未来,可以围绕这个进行讨论和压力测试。
# 压缩算法
算法上,压缩现在这么工作:
Let stack = []; // in reality, this should be a "deque", but stack is simpler to conceptualize in this pseudo-code for each v from newest to oldest in input: clear_stack = false if v.sequence_number is in snaps: clear_stack = true else if stack not empty && v.key != stack.top.key: clear_stack = true
if clear_stack:
write out all operands on stack to output (in the same order as encountered)
clear(stack)
if v.type is "merge_operand":
push v to stack
while (stack has at least 2 elements and (stack.top and stack.second_from_top can be partial-merged)):
v1 = stack.pop();
v2 = stack.pop();
result_v = client_merge_operator.PartialMerge(v1,v2)
push result_v to stack
if v.type is "put":
write client_merge_operator.FullMerge(v, stack) to output
clear stack
if v.type is "delete":
write client_merge_operator.FullMerge(nullptr, stack) to output
clear stack
If stack not empty: if end-of-key-history for key on stack: write client_merge_operator.FullMerge(nullptr, stack) to output clear(stack) else write out all operands on stack to output clear(stack)
return output ```
压缩过程中选择上层文件¶
注意,所有的合并运算元的相对顺序应该固定。由于所有迭代器”一层层地“搜索数据库,我们不希望在早期的 level 发现新旧运算元顺序相反的情况。所以,我们需要更新压缩过程,当他选择压缩的文件的时候,他拓展他的上层文件,以包含所有”更早的“合并运算元。为什么改变这个?因为当任何项目被压缩,他总是移动到一个更低的层。所以如果对于给定的 key,其合并运算元分散在同一层的各个文件,但是只有一些进入了压缩,那么有可能出现一种奇怪的情况,就是更新的 key 被压缩到了更底层的 level。
** 技术上来说,这是经典 rocksdb 的一个 bug! ** 特别地,这个问题总是在那,但是对于大多数不使用合并运算的应用,可以假设每个层都有一个 key 的一个版本(除了 level0),因为压缩总是合并重复的 Put,只要简单的记录最后的 put 的值。因此这个交换顺序的概念无关紧要,除了在 level-0(压缩总是包括所有有交集的文件)。所以这被修复了。
一些问题:在恶意输入下,这会导致总是要在压缩的时候包含大量文件,即使系统只希望挑选一个文件。这可能会降低速度,但是压力测试显示这不会是个问题。
效率相关的笔记¶
关于 merge 和部分 merge 的性能的快速讨论。
使用一个运算元的栈可以更高效。比如,一个追加字符串的例子(假设没有部分 merge),给用户提供一个堆叠的 string 运算元集合来追加允许用户分散构造最终字符串的压力。例如,如果我给出了一个 1000 个需要追加的小字符串的列表,我可以使用这些字符串来计算最后的字符串的大小,预分配空间,然后处理并拷贝所有数据到新分配的内存数组。作为比较,如果我强制要求总是部分合并,系统需要执行 1000 次重新分配,开销非常大。在多数使用场景,这可能不是问题;在我们的压力测试,我们发现这只对一个巨大的有热点 key 的内存数据库有影响,但是这需要考虑”增长数据“。在所有场景,我们都给用户提供了选择。
最重要的是,存在使用运算元栈(而不是一个运算元)提供一个显著增加操作运算效率的方法。比如,在上面的追加字符串的例子,合并运算符可以让字符追加操作做到大概 O(N) 时间(N 是最后字符串的长度),而不使用栈,我们需要使用 O(N^2) 时间复杂度。
更多信息,请联系 rocksdb 团队,或者参考 RocksDB 的 wiki 页。