本页描述了 Rocksdb 里的 Read-modify-write 原子操作,也叫 "Merge" 操作。这只是一个接口概述,面向那些提出:“什么时候我需要使用 Merge,以及如何使用”的问题的客户端以及 RocksDB 使用者。
为什么¶
Rocksdb 是一个高性能的嵌入式持久化 key-value 存储。它提供一些简单操作 Get,Put,Delete 来实现一个优雅的 Loopup-table-like 接口
通常,有一些通用的模式来更新一个已经存在的 value。为了在 rocksdb 里面实现这个,客户端会需要读(Get)一个已经存在的值,修改,然后写(put)回 db。来看一个常见例子。
想象我们在维护一系列的 uint64 的计数器。每个计数器都有一个不同的名字。我们希望实现四个高级操作: Set,Add,Get 以及 Remove。
首先,我们定义接口来保证语义正确。错误处理被放到一边以保证代码清晰。
class Counters {
public:
// (re)set the value of a named counter
virtual void Set(const string& key, uint64_t value);
// remove the named counter
virtual void Remove(const string& key);
// retrieve the current value of the named counter, return false if not found
virtual bool Get(const string& key, uint64_t *value);
// increase the named counter by value.
// if the counter does not exist, treat it as if the counter was initialized to zero
virtual void Add(const string& key, uint64_t value);
};
然后,我们使用现有 RocksDB 接口实现它。伪代码如下:
class RocksCounters : public Counters {
public:
static uint64_t kDefaultCount = 0;
RocksCounters(std::shared_ptr<DB> db);
// mapped to a RocksDB Put
virtual void Set(const string& key, uint64_t value) {
string serialized = Serialize(value);
db_->Put(put_option_, key, serialized));
}
// mapped to a RocksDB Delete
virtual void Remove(const string& key) {
db_->Delete(delete_option_, key);
}
// mapped to a RocksDB Get
virtual bool Get(const string& key, uint64_t *value) {
string str;
auto s = db_->Get(get_option_, key, &str);
if (s.ok()) {
*value = Deserialize(str);
return true;
} else {
return false;
}
}
// implemented as get -> modify -> set
virtual void Add(const string& key, uint64_t value) {
uint64_t base;
if (!Get(key, &base)) {
base = kDefaultValue;
}
Set(key, base + value);
}
};
注意,除了 Add 操作,所有其他三个操作都可以直接映射到 rocksdb 的一个操作。这不是坏事。然而,一个概念上的单一操作,Add,却只能映射到两个 rocksdb 操作。这里也有一个性能上的隐患——随机 Get 在 rocksdb 里面相对较慢。
现在,加入我们需要维护一个 Counter 服务。根据现在的服务器核心数,我们的服务几乎肯定是多线程的。如果线程不是根据键空间进行分片,可能同一个计数器的多个 Add 操作,会被多个线程并行之行。好吧,如果我们还有严格的一致性需求(任何一个更新都不能被丢失),我们可能需要在 Add 外面加一个额外的同步,比如锁之类的。压力就变大了。
如果 rocksdb 直接支持 Add 操作呢?我们可以提出如下代码:
virtual void Add(const string& key, uint64_t value) {
string serialized = Serialize(value);
db->Add(add_option, key, serialized);
}
这对计数器来说很合理。但你的 RocksDB 里面并不全是计数器。比如我们需要追踪一个用户去过哪些地方。我们可以在这个用户的 key 下面存储一个(序列化的)地址列表。往这个列表增加新的地址可能是一个通用操作。我们在这种情况下可能需要使用 Append 操作:db->Append(user_key, serialize(new_location))。这意味着 read-modify-write 操作的语义是由客户端定义的。为了保证库的通用性,我们最好能抽象出这个操作,然后允许客户端声明语义。这就带出了我们的提案:Merge。
是什么¶
我们以一个一等操作的待遇在 rocksdb 里面开发了一个通用 Merge 操作来捕获 read-modify-write 语义
这个 Merge 操作:
- 把 read-modify-write 语义概括成一个简单的抽象接口
- 允许用户避免引入额外的重复的 Get 调用。
- 使用后端优化来决定什么时候/如何在不改变语义的情况下合并操作运算元
- 可以在某些情况下分期偿还所有增量更新带来的压力,以提供有效的渐增式累加。
如何使用¶
在接下来的段落,那些客户端的代码将被说明。我们还提供一个简单的大纲来说明如何使用 Merge。我们假定读者已经知道如何使用经典的 Rocksdb(或者 leveldb),包括:
- DB 类(包括构造,DB::Put(), DB::Get(), 以及 DB::Delete())
- Options 类(以及指导如何在创建的基础上声明数据库配置)
- 知道所有写入的数据库 key/value 都是简单的字符串字节
接口概览¶
我们定义了一个新的接口/虚基类:MergeOperator。它导出了一些函数,告诉 RocksDB 如何使用基础值(Put/Delete)组合增量更新操作(又叫“合并运算元”)。这些函数还可以用来告诉 rocksdb 如何组合这些合并操作来构造一个新的合并操作(称为“部分 [Partial]”或者“结合性 [Associative]”合并)。
简单起见,我们暂时不关注 Partial 和非 Partial 的区别。我们已经提供了一个独立的接口,名为 AssociativeMergeOperator,他描述并隐藏了了部分合并的所有细节。对于大多数简单的应用(比如我们的 64Bit 计数器),这足够了。
读者应该假设所有合并都是通过一个名为 AssociativeMergeOperator 的接口来处理的。这是这个公用接口:
// The Associative Merge Operator interface.
// Client needs to provide an object implementing this interface.
// Essentially, this class specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
class AssociativeMergeOperator : public MergeOperator {
public:
virtual ~AssociativeMergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing_value:(IN) null indicates the key does not exist before this op
// value: (IN) the value to update/merge the existing_value with
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success. Return false failure / error / corruption.
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
virtual const char* Name() const = 0;
private:
...
};
一些需要注意的地方
- AssociativeMergeOperator 是一个名为 MergeOperator 类的子类。后面我们将看到这个更加通用化的 MergeOperator 在特定场合将更加强大。而我们用的 AssociativeMergeOperator,则更加简单明了。
- existing_value 可能是 nullptr。在 Merge 操作是一个 key 的第一个操作的时候非常有用。nullptr 暗示着这个 key 没有‘已经存在的’值。这基本需要推迟到客户端决定如何在没有预设值的时候如何处理一个 merge 操作。客户端可以做任何合理的事情。比如, Counters::Add 假设如果一个 key 不存在,那么他就是一个 0 值。
- 如果 key 的空间是分片的,并且不同的子空间指向不同的数据类型并且拥有不同的合并操作语义,那么我们把 key 传入,然后客户端就可以多路复用这个合并操作了。例如,客户端可能选择把一个用户的余额(一个数字)存在一个名为“BAL:uid”的键下面,然后把账号的活动信息(一个列表)存入一个名为“HIS:uid”的键下面,他们存在在同一个 DB 中。(这是不是一个好的实践其实非常有争议)。对于当前余额,数字加法是完美的合并操作;对于活动信息,我们需要一个 list append 操作。因此,通过把 key 传给 Merge 回调,我们使客户端支持能区分两种类型:
void Merge(...) {
if (key start with "BAL:") {
NumericAddition(...)
} else if (key start with "HIS:") {
ListAppend(...);
}
}
其他客户端可见的接口变化¶
为了在一个应用里使用 Merge,客户端必须先定义一个类,继承 AssociativeMergeOperator 接口(或者 MergeOperator 接口)。这个对象类应该实现这个接口的所有方法,不管是不是需要合并,他们都最终会被 rocksdb 在合适的时机调用。这样,合并语义就完全实现了客户端定制了。
定义好这个类之后,用户需要一个特别的方法告诉 rocksdb 需要使用合并操作符实现合并。我们已经在 DB 类和 Options 类里面引入了额外的字段/方法来完成这个工作。
// In addition to Get(), Put(), and Delete(), the DB class now also has an additional method: Merge().
class DB {
...
// Merge the database entry for "key" with "value". Returns OK on success,
// and a non-OK status on error. The semantics of this operation is
// determined by the user provided merge_operator when opening DB.
// Returns Status::NotSupported if DB does not have a merge_operator.
virtual Status Merge(
const WriteOptions& options,
const Slice& key,
const Slice& value) = 0;
...
};
Struct Options {
...
// REQUIRES: The client must provide a merge operator if Merge operation
// needs to be accessed. Calling Merge on a DB without a merge operator
// would result in Status::NotSupported. The client must ensure that the
// merge operator supplied here has the same name and *exactly* the same
// semantics as the merge operator provided to previous open calls on
// the same DB. The only exception is reserved for upgrade, where a DB
// previously without a merge operator is introduced to Merge operation
// for the first time. It's necessary to specify a merge operator when
// opening the DB in this case.
// Default: nullptr
const std::shared_ptr<MergeOperator> merge_operator;
...
};
注意 Options::merge_operator 字段是一个 shared 指针指向一个 MergeOperator。就像上面说的,AssociativeMergeOperator 继承自 MergeOperator,所以我们可以在这里声明一个 AssociativeMergeOperator。这也是下面的例子用的方法。
客户端代码修改¶
有了上面的接口修改,客户端现在可以实现一个新版的 Counters 类,直接使用内建的 Merge 操作了。
Counters v2:
// A 'model' merge operator with uint64 addition semantics
class UInt64AddOperator : public AssociativeMergeOperator {
public:
virtual bool Merge(
const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
// assuming 0 if no existing value
uint64_t existing = 0;
if (existing_value) {
if (!Deserialize(*existing_value, &existing)) {
// if existing_value is corrupted, treat it as 0
Log(logger, "existing value corruption");
existing = 0;
}
}
uint64_t oper;
if (!Deserialize(value, &oper)) {
// if operand is corrupted, treat it as 0
Log(logger, "operand value corruption");
oper = 0;
}
auto new = existing + oper;
*new_value = Serialize(new);
return true; // always return true for this, since we treat all errors as "zero".
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
};
// Implement 'add' directly with the new Merge operation
class MergeBasedCounters : public RocksCounters {
public:
MergeBasedCounters(std::shared_ptr<DB> db);
// mapped to a leveldb Merge operation
virtual void Add(const string& key, uint64_t value) override {
string serialized = Serialize(value);
db_->Merge(merge_option_, key, serialized);
}
};
// How to use it
DB* dbp;
Options options;
options.merge_operator.reset(new UInt64AddOperator);
DB::Open(options, "/tmp/db", &dbp);
std::shared_ptr<DB> db(dbp);
MergeBasedCounters counters(db);
counters.Add("a", 1);
...
uint64_t v;
counters.Get("a", &v);
用户界面的修改非常小。RocksDB 后端处理剩下的部分。
Associativity VS 非 Associativity¶
到现在为止,我们已经了一个相对简单的例子来维护一个计数器数据库。似乎上面说的 AssociativeMergeOperator 已经足够处理这种类型的操作了。例如,如果你希望用“append”操作维护一个字符串集合,那么我们目前看到的已经可以很简单地处理这个需求了。
那么为什么这些都被认为是“简单的”?好吧,我们隐式地假设了这个数据的一个特征:结合性。这意味着我们假设:
- 通过 Put 放入 RocksDB 的数据和通过 Merge 操作的格式相同
- 用同一个用户定义的合并操作可以将多个合并组合成一个合并操作
比如,以 Counters 为例。Rocksdb 数据库内部将每个值存储为有序的 8byte 的整形。所以,当客户端调用 Counters::Set(对应于 DB::Put()),参数是有相同格式的。类似的,当客户调用 Counters::Add(对应于一个 DB::Merge())的时候,merge 操作也是一个序列化的 8-byte 整形。这意味着,在客户的 UInt64AddOperator 里,那个 *existing_value
可能指向原始的 Put(),或者意味着一个合并操作元,这不重要!在所有情况中,只要 *existing_value
以及数值给出,UInt64AddOperator 就能用同一种方式操作:他把他们加在一起,然后计算 *new_value
。最后,这个 *new_value
会回馈给后面的合并操作,根据 merge 调用的顺序。
但是,RocksDB 的合并操作似乎可以有更强大的功能。比如,我们希望我们的数据库存储一个 json 字符串集合(比如 PHP 对象数组)。那么在数据库里,我们希望她们可以以完全格式化的 json 字符串被存储并查找,但是我们可能希望“更新”操作来更新一个 json 对象里的某个属性。所以我们可能会写这样的代码:
...
// Put/store the json string into to the database
db_->Put(put_option_, "json_obj_key",
"{ employees: [ {first_name: john, last_name: doe}, {first_name: adam, last_name: smith}] }");
...
// Use a pre-defined "merge operator" to incrementally update the value of the json string
db_->Merge(merge_option_, "json_obj_key", "employees[1].first_name = lucy");
db_->Merge(merge_option_, "json_obj_key", "employees[0].last_name = dow");
在上面的伪代码中,我们看到,数据会在 RocksDB 里面被存储为一个 json 字符串(映射到 Put 操作),但是客户端需要更新一个值,一个“javascript 风格”的赋值声明串会被作为合并元被传入。数据库会按照原样存储这些字符,然后希望用户的合并操作符来处理逻辑。
现在,AssociativeMergeOperator 模型就无法处理这个了,仅仅因为他假设了我们上面提到的结合律。也就是说,在这个例子里,我们需要明确声明基础值(json 字符串)和合并操作元(赋值声明),而且我们没有一个直观的方法来将一个合并运算元和另一个合并运算元组合。所以这个使用离子不符合我们的“结合律”合并模型。这就是通用 MergeOperator 接口有用的地方。
通用 MergeOperator 接口¶
MergeOperator 接口被设计用于支持抽象并暴露部分关键方法来在 RocksDB 里面实现一个提供有效方案来实现“增量更新”。就像我们上面提到的 json 的例子,可能基本的数据类型(Put 进数据库的)可能和更新他们的运算元有完全不同的格式。而且,我们会看到导出一些合并运算元可以组合成一个单独的合并运算元的事实有时候是有好处的,但是有时候又是不好的(原文很绕,大概来说就是需要提供一个方法判断几个合并运算元是不是可以合并为一个运算元)。这都要看客户端定义的语义。MergeOperator 接口提供了一个相对简单的方法来从客户端获取这些信息:
// The Merge Operator
//
// Essentially, a MergeOperator specifies the SEMANTICS of a merge, which only
// client knows. It could be numeric addition, list append, string
// concatenation, edit data structure, ... , anything.
// The library, on the other hand, is concerned with the exercise of this
// interface, at the right time (during get, iteration, compaction...)
class MergeOperator {
public:
virtual ~MergeOperator() {}
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing: (IN) null indicates that the key does not exist before this op
// operand_list:(IN) the sequence of merge operations to apply, front() first.
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success. Return false failure / error / corruption.
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const = 0;
struct MergeOperationInput { ... };
struct MergeOperationOutput { ... };
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const;
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types.
// Save the result in *new_value and return true. If it is impossible
// or infeasible to combine the two operations, return false instead.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
virtual const char* Name() const = 0;
// Determines whether the MergeOperator can be called with just a single
// merge operand.
// Override and return true for allowing a single operand. FullMergeV2 and
// PartialMerge/PartialMergeMulti should be implemented accordingly to handle
// a single operand.
virtual bool AllowSingleOperand() const { return false; }
};
一些注意的点:
- MergeOperator 有两个方法,FullMerge 和 PartialMerge。第一个方法在 Put/Delete 是 *existing_value (或者 nullptr) 时被使用。后面的方法在组合两个合并运算元的时候(如果可以)被使用。
- AssociativeMergeOperator 只需要继承 MergeOperator 然后提供私有的默认方法实现就行了,然后简单暴露一个包裹好的函数。
- 在 MergeOperator 里,FullMerge 函数提供一个 *existing_value
以及一个队列 (std::deque) 的合并运算元,而不是单个运算元。我们后面会解释。
这些方法如何工作的?¶
在上层,需要注意到,任何调用 DB::Put() 或者 DB:Merge 都不需要强制树枝马上被计算或者合并马上发生。Rocksdb 会或多或少地懒惰地决定什么时候需要执行这些操作(例如,下次用户调用 Get 的时候,或者当系统决定调用名为“压缩”的清理流程的时候)。这意味着,当 MergeOperator 真正被调用,可能会有多个“入栈的”运算元需要被执行。因此,MergeOperator::FullMerge() 方法提供一个 *existing_value
以及一个压栈的运算元列表。MergeOperator 应该一个接一个地执行这些运算元(或者任何客户端决定的优化方法,保证最终 *new_value
会被按要求计算成所有运算元执行的结果)
部分合并 VS 入栈¶
有时候,可能在系统遇到运算元就调用合并操作会更好,而不是入栈。MergeOperator::PartialMerge 就是为此准备的。如果客户声明运算符可以逻辑上处理“组合”两个运算元位一个单独的运算元,对应的语义就应该提供这个方法,然后应该返回 true。如果逻辑上不行,就简单的保留 *new_value
不变,然后返回 false 即可。
理论上,当库决定入栈然后执行操作,他先尝试对每一对运算元执行用户定义的 PartialMerge。只要这个操作返回了 false,他就会被插入到栈内,直到他遇到一个 Put/Delete 的值,他才会调用 FullMerge 操作,把所有运算元当参数传入。通常来说,这个最后的 FullMerge 应该返回 true。只有当有坏格式数据的时候才应该返回 false。
AssociativeMergeOperator 怎么做的?¶
就像上面说的,AssociativeMergeOperator 继承自 MergeOperator 并且允许用户声明一个单独的合并操作。他覆盖了 PartialMerge 和 FullMerge 让他们使用 AssociativeMergeOperator::Merge()。所以他才可以在合并运算元和设置基础值的时候使用。这也是为什么他只能在符合“结合律”的假设的时候使用。
什么时候允许单一合并元¶
基本来说,合并运算只会在有两个合并运算元的时候被调用。覆盖 AllowSingleOperand 方法使之返回 true 如果你需要合并操作符在只有一个运算元的时候被调起。一个使用例子就是如果你用合并操作来基于 TTL 修改数值,这样他就会在压缩之后被删除(或者用一个压缩过滤器)
JSON 例子¶
使用我们的通用 MergeOperator 接口,我们现在有能力实现我们的 json 例子了。
// A 'model' pseudo-code merge operator with json update semantics
// We pretend we have some in-memory data-structure (called JsonDataStructure) for
// parsing and serializing json strings.
class JsonMergeOperator : public MergeOperator { // not associative
public:
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const override {
JsonDataStructure obj;
if (existing_value) {
obj.ParseFrom(existing_value->ToString());
}
if (obj.IsInvalid()) {
Log(logger, "Invalid json string after parsing: %s", existing_value->ToString().c_str());
return false;
}
for (const auto& value : operand_list) {
auto split_vector = Split(value, " = "); // "xyz[0] = 5" might return ["xyz[0]", 5] as an std::vector, etc.
obj.SelectFromHierarchy(split_vector[0]) = split_vector[1];
if (obj.IsInvalid()) {
Log(logger, "Invalid json after parsing operand: %s", value.c_str());
return false;
}
}
obj.SerializeTo(new_value);
return true;
}
// Partial-merge two operands if and only if the two operands
// both update the same value. If so, take the "later" operand.
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const override {
auto split_vector1 = Split(left_operand, " = "); // "xyz[0] = 5" might return ["xyz[0]", 5] as an std::vector, etc.
auto split_vector2 = Split(right_operand, " = ");
// If the two operations update the same value, just take the later one.
if (split_vector1[0] == split_vector2[0]) {
new_value->assign(right_operand.data(), right_operand.size());
return true;
} else {
return false;
}
}
virtual const char* Name() const override {
return "JsonMergeOperator";
}
};
...
// How to use it
DB* dbp;
Options options;
options.merge_operator.reset(new JsonMergeOperator);
DB::Open(options, "/tmp/db", &dbp);
std::shared_ptr<DB> db_(dbp);
...
// Put/store the json string into to the database
db_->Put(put_option_, "json_obj_key",
"{ employees: [ {first_name: john, last_name: doe}, {first_name: adam, last_name: smith}] }");
...
// Use the "merge operator" to incrementally update the value of the json string
db_->Merge(merge_option_, "json_obj_key", "employees[1].first_name = lucy");
db_->Merge(merge_option_, "json_obj_key", "employees[0].last_name = dow");
错误处理¶
如果 MergeOperator::PartialMerge() 返回 false,意味着这个合并需要被推迟(入栈)直到我们遇到一个 Put/Delete 数值来进行 FullMerge。然而,如果 FullMerge 返回 false,这就会被认为是“中断”错误。这意味着 RocksDB 会给客户返回一个 Status::Corruption 之类的消息。因此, MergeOperator::FullMerge() 应该在只有客户绝对无法处理这个错误的时候返回 false。
对于 AssociativeMergeOperator,Merge 方法使用跟 MergeOperator::FullMerge() 一样的错误处理逻辑。只有在没有办法从逻辑上处理这个错误的时候返回 false。在上面的计数器例子,我们的 Merge 方法总是返回 true,因为我们会把任何错误的值视为 0。
检查以及最佳实践¶
总的来说,我们已经描述了合并操作,以及如何用它。这里有一些提示,关于什么时候,如何使用 MergeOperator 和 AssociativeMergeOperator。
什么时候用 merge¶
如果下面为真:
- 你的数据需要增量更新
- 你需要在知道新数据之前读数据
那么使用上面说的两个合并操作符吧。
结合数据¶
如果下面是真:
- 如果你的合并运算元根你 Put 的值有一样的格式
- 把多个运算元组合成一个运算元是 ok 的(只要组合的顺序正确)
那么,就用 AssociativeMergeOperator
通用合并¶
如果上面的两条有一条不满足,那么使用 MergeOperator。
如果某些时候可以把多个运算元合并成一个(但不总是可以): - 使用 MergeOpertaor - 必要时让 PartialMerge 返回 true
Tips¶
多路复用:RocksDB DB 对象在构造的时候只能传入一个合并操作符,你的用户定义的合并操作类可以根据传入的数据进行不同的处理。key 和 value 本身,会被传递给合并运算符;所以大家可以在运算元里面实现不同的运算方法,然后在 MergeOperator 里面各自使用不同的函数。
我的用例是不是符合结合律?:如果你不确定是不是能满足“结合律”,你总是可以使用 MergeOperator 的。AssociativeMergeOperator 是 MergeOperator 的直接子类,所以任何使用 AssociativeMergeOperator 可以解决的问题,在通用 MergeOperator 都可以解决。AssociativeMergeOperator 只是为了提供便利性。
有用的链接¶
合并+压缩实现细节 为那些希望了解合并操作符对他们的代码的影响的 RocksDb 工程师提供。