leveldb 源码分析 21¶
本系列《leveldb 源码分析》共有 22 篇文章,这是第二十一篇
14 DB 的查询与遍历之 1¶
分析完如何打开和关闭 db,本章就继续分析如何 从 db 中根据 key 查询 value,以及 遍历整个 db。
14.1 Get()¶
函数声明:StatusGet(const ReadOptions& options, const Slice& key, std::string* value) 从 DB 中查询 key 对应的 value,参数@options 指定读取操作的选项,典型的如 snapshot 号,从指定的快照中读取。快照本质上就是一个 sequence 号,后面将单独在快照一章中分析。 下面就来分析下函数逻辑:
// S1 锁mutex,防止并发,如果指定option则尝试获取snapshot;然后增加MemTable的引用值。
MutexLock l(&mutex_);
SequenceNumber snapshot;
if (options.snapshot != NULL)
snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
else snapshot = versions_->LastSequence(); // 取当前版本的最后Sequence
MemTable *mem = mem_, *imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != NULL) imm->Ref();
current->Ref();
// S2 从sstable文件和MemTable中读取时,释放锁mutex;之后再次锁mutex。
bool have_stat_update = false;
Version::GetStats stats;
{
mutex_.Unlock();
// 先从memtable中查询,再从immutable memtable中查询
LookupKey lkey(key, snapshot);
if (mem->Get(lkey, value, &s)) {
}
else if (imm != NULL && imm->Get(lkey, value, &s)) {
}
else { // 需要从sstable文件中查询
s = current->Get(options, lkey, value, &stats);
have_stat_update = true; // 记录之,用于compaction
}
mutex_.Lock();
}
// S3 如果是从sstable文件查询出来的,检查是否需要做compaction。最后把MemTable的引用计数减1。
if (have_stat_update &¤t->UpdateStats(stats)) {
MaybeScheduleCompaction();
}
mem->Unref();
if (imm != NULL)imm->Unref();
current->Unref();
查询是比较简单的操作,UpdateStats 在前面 Version 一节已经分析过。
14.2 NewIterator()¶
函数声明:Iterator*NewIterator(const ReadOptions& options) 通过该函数生产了一个 Iterator 对象,调用这就可以基于该对象遍历 db 内容了。 函数很简单,调用两个函数创建了一个二级 Iterator*。
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
SequenceNumber latest_snapshot;
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
returnNewDBIterator(&dbname_, env_, user_comparator(), internal_iter,
(options.snapshot != NULL
? reinterpret_cast<constSnapshotImpl*>(options.snapshot)->number_
: latest_snapshot));
}
其中,函数 NewDBIterator 直接返回了一个 DBIter 指针
Iterator* NewDBIterator(const std::string* dbname, Env* env,
const Comparator*user_key_comparator, Iterator* internal_iter,
const SequenceNumber& sequence) {
return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence);
}
函数 NewInternalIterator 有一些处理逻辑,就是收集所有能用到的 iterator,生产一个 Merging Iterator。这包括 MemTable,Immutable MemTable,以及各 sstable。
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
SequenceNumber*latest_snapshot) {
IterState* cleanup = newIterState;
mutex_.Lock();
// 根据last sequence设置lastest snapshot,并收集所有的子iterator
*latest_snapshot = versions_->LastSequence();
std::vector<Iterator*>list;
list.push_back(mem_->NewIterator()); // >memtable
mem_->Ref();
if (imm_ != NULL) {
list.push_back(imm_->NewIterator()); // >immutablememtable
imm_->Ref();
}
versions_->current()->AddIterators(options, &list); // >current的所有sstable
Iterator* internal_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size());
versions_->current()->Ref();
// 注册清理机制
cleanup->mu = &mutex_;
cleanup->mem = mem_;
cleanup->imm = imm_;
cleanup->version = versions_->current();
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
mutex_.Unlock();
return internal_iter;
}
这个清理函数 CleanupIteratorState 是很简单的,对注册的对象做一下 Unref 操作即可。
static void CleanupIteratorState(void* arg1, void* arg2) {
IterState* state = reinterpret_cast<IterState*>(arg1);
state->mu->Lock();
state->mem->Unref();
if (state->imm != NULL)state->imm->Unref();
state->version->Unref();
state->mu->Unlock();
delete state;
}
可见对于 db 的遍历依赖于 DBIter 和 Merging Iterator 这两个迭代器,它们都是 Iterator 接口的实现子类。
14.3 MergingIterator¶
MergingIterator 是一个合并迭代器,它内部使用了一组自 Iterator,保存在其成员数组 children_ 中。如上面的函数 NewInternalIterator,包括 memtable,immutable memtable,以及各 sstable 文件;它所做的就是根据调用者指定的 key 和 sequence,从这些 Iterator 中找到合适的记录。 在分析其 Iterator 接口之前,先来看看两个辅助函数 FindSmallest 和 FindLargest。FindSmallest从 0 开始向后遍历内部 Iterator 数组,找到 key 最小的 Iterator,并设置到 current_;FindLargest 从 最后一个向前遍历内部 Iterator 数组,找到 key 最大的 Iterator,并设置到 current_; MergingIterator 还定义了两个移动方向:kForward,向前移动;kReverse,向后移动。
14.3.1 Get 系接口¶
下面就把其接口拖出来一个一个分析,首先是简单接口,key 和 value 都是返回 current_ 的值,current_ 是当前 seek 到的 Iterator 位置。
virtual Slice key() const {
assert(Valid());
return current_->key();
}
virtual Slice value() const {
assert(Valid());
return current_->value();
}
virtual Status status() const {
Status status;
for (int i = 0; i < n_; i++) { // 只有所有内部Iterator都ok时,才返回ok
status = children_[i].status();
if (!status.ok()) break;
}
return status;
}
14.3.2 Seek 系接口¶
然后是几个 seek 系的函数,也比较简单,都是依次调用内部 Iterator 的 seek 系函数。然后做 merge,对于 Seek 和 SeekToFirst 都调用 FindSmallest;对于 SeekToLast 调用 FindLargest。
virtual void SeekToFirst() {
for (int i = 0; i < n_; i++) children_[i].SeekToFirst();
FindSmallest();
direction_ = kForward;
}
virtual void SeekToLast() {
for (int i = 0; i < n_; i++) children_[i].SeekToLast();
FindLargest();
direction_ = kReverse;
}
virtual void Seek(constSlice& target) {
for (int i = 0; i < n_; i++) children_[i].Seek(target);
FindSmallest();
direction_ = kForward;
}
14.3.3 逐步移动¶
最后就是 Next 和 Prev 函数,完成迭代遍历。这可能会有点绕。下面分别来说明。 首先,在 Next 移动时,如果当前 direction 不是 kForward 的,也就是上一次调用了 Prev 或者 SeekToLast 函数,就需要先调整除 current 之外的所有 iterator,为什么要做这种调整呢?啰嗦一点,考虑如下的场景,如图 14.3-1 所示。
图 14.3-1 Next 的移动
当前 direction 为 kReverse,并且有:Current = memtable Iterator。各 Iterator 位置为:{memtable, stable 0, sstable1} ={ key3:1:1, key2:3:1, key2:1:1},这符合 prev 操作的 largest key 要求。 注:需要说明下,对于每个 update 操作,leveldb 都会赋予一个全局唯一的 sequence 号,且是递增的。例子中的 sequence 号可理解为每个 key 的相对值,后面也是如此。 接下来我们来分析 Prev 移动的操作。 第一次 Prev,current(memtable iterator) 移动到 key1:3:0 上,3 者中最大者变成 sstable0;因此 current 修改为 sstable0; 第二次 Prev,current(sstable0 Iterator) 移动到 key1:2:1 上,3 者中最大者变成 sstable1;因此 current 修改为 sstable1: 此时各 Iterator 的位置为 {memtable, sstable 0, sstable1} = { key1:3:0, key1:2:1, key2:2:1},并且 current=sstable1。 接下来再调用 Next,显然当前 Key() 为 key2:2:1,综合考虑 3 个 iterator,两次 Next() 的调用结果应该是 key2:1:1 和 key3:1:1。而 memtable 和 sstable0 指向的 key 却是 key1:3:0 和 key1:2:1,这时就需要调整 memtable 和 sstable0 了,使他们都定位到 Key() 之后,也就是 key3:1:1 和 key2:3:1 上。 然后 current(current1)Next 移动到 key2:1:1 上。这就是 Next 时的调整逻辑,同理,对于 Prev 也有相同的调整逻辑。代码如下:
virtual void Next() {
assert(Valid());
// 确保所有的子Iterator都定位在key()之后.
// 如果我们在正向移动,对于除current_外的所有子Iterator这点已然成立
// 因为current_是最小的子Iterator,并且key() = current_->key()。
// 否则,我们需要明确设置其它的子Iterator
if (direction_ != kForward) {
for (int i = 0; i < n_; i++) { // 把所有current之外的Iterator定位到key()之后
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid() && comparator_->Compare(key(), child->key()) == 0)
child->Next(); // key等于current_->key()的,再向后移动一位
}
}
direction_ = kForward;
}
// current也向后移一位,然后再查找key最小的Iterator
current_->Next();
FindSmallest();
}
virtual void Prev() {
assert(Valid());
// 确保所有的子Iterator都定位在key()之前.
// 如果我们在逆向移动,对于除current_外的所有子Iterator这点已然成立
// 因为current_是最大的,并且key() = current_->key()
// 否则,我们需要明确设置其它的子Iterator
if (direction_ != kReverse) {
for (int i = 0; i < n_; i++) {
IteratorWrapper* child = &children_[i];
if (child != current_) {
child->Seek(key());
if (child->Valid()) {
// child位于>=key()的第一个entry上,prev移动一位到<key()
child->Prev();
}
else { // child所有的entry都 < key(),直接seek到last即可
child->SeekToLast();
}
}
}
direction_ = kReverse;
}
//current也向前移一位,然后再查找key最大的Iterator
current_->Prev();
FindLargest();
}
这就是 MergingIterator 的全部代码逻辑了,每次 Next 或者 Prev 移动时,都要重新遍历所有的子 Iterator 以找到 key 最小或最大的 Iterator 作为 current_。这就是 merge 的语义所在了。 但是它没有考虑到删除标记等问题,因此直接使用 MergingIterator 是不能正确的遍历 DB 的,这些问题留待给 DBIter 来解决。