跳转至

leveldb 源码分析 8

本系列《leveldb 源码分析》共有 22 篇文章,这是第八篇

6 SSTable 之 2

6.4 创建 sstable 文件

了解了 sstable 文件的存储格式,以及 Data Block 的组织,下面就可以分析如何创建 sstable 文件了。相关代码在 table_builder.h/.cc 以及 block_builder.h/.cc(构建 Block)中。

6.4.1 TableBuilder 类

构建 sstable 文件的类是 TableBuilder,该类提供了几个有限的方法可以用来添加 k/v 对,Flush 到文件中等等,它依赖于 BlockBuilder 来构建 Block。

TableBuilder 的几个接口说明下:

  1. void Add(const Slice& key, const Slice& value),向当前正在构建的表添加新的 {key, value} 对,要求根据 Option 指定的 Comparator,key 必须位于所有前面添加的 key 之后;
  2. void Flush(),将当前缓存的 k/v 全部 flush 到文件中,一个高级方法,大部分的 client 不需要直接调用该方法;
  3. void Finish(),结束表的构建,该方法被调用后,将不再会使用传入的 WritableFile;
  4. void Abandon(),结束表的构建,并丢弃当前缓存的内容,该方法被调用后,将不再会使用传入的 WritableFile;【只是设置 closed 为 true,无其他操作
  5. 一旦Finish()/Abandon()方法被调用,将不能再次执行 Flush 或者 Add 操作。

下面来看看涉及到的类,如图 6.3-1 所示。img 图 6.3-1

其中 WritableFile 和 op log 一样,使用的都是 内存映射 文件。Options 是一些调用者可设置的选项。

TableBuilder 只有一个成员变量 Rep* rep_,实际上 Rep 结构体的成员就是 TableBuilder 所有的成员变量;这样做的 目的可能是为了隐藏其内部细 节。Rep 的定义也是在.cc 文件中,对外是透明的。

简单解释下成员的含义:

Options options;              // data block的选项
Options index_block_options;  // index block的选项
WritableFile* file;           // sstable文件
uint64_t offset; 
// 要写入data block在sstable文件中的偏移,初始0
Status status;                //当前状态-初始ok
BlockBuilder data_block;      //当前操作的data block
BlockBuilder index_block;     // sstable的index block
std::string last_key;         //当前data block最后的k/v对的key
int64_t num_entries;          //当前data block的个数,初始0
bool closed;                  //调用了Finish() or Abandon(),初始false
FilterBlockBuilder*filter_block; 
//根据filter数据快速定位key是否在block中
bool pending_index_entry;     //见下面的Add函数,初始false
BlockHandle pending_handle;   //添加到index block的data block的信息
std::string compressed_output;//压缩后的data block,临时存储,写入后即被清空

Filter block 是存储的 过滤器信息,它会存储 {key, 对应 data block 在 sstable 的偏移值},不一定完全精确 的,以快速定位给定 key 是否在 data block 中。

下面分析如何向 sstable 中添加 k/v 对,创建并持久化 sstable。其它函数都比较简单,略过。另外对于 Abandon,简单设置 closed=true 即返回。

6.4.2 添加 k/v 对

这是通过方法Add(constSlice& key, const Slice& value)完成的,没有返回值。下面分析下函数的逻辑:

1

S1 首先保证文件没有 close,也就是没有调用过 Finish/Abandon,以及保证当前 status 是 ok 的;如果当前有缓存的 kv 对,保证新加入的 key 是最大的。

Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) 
{
    assert(r->options.comparator->Compare(key, Slice(r->last_key))> 0);
}

S2 如果标记 r->pending_index_entry 为 true,表明遇到下一个 data block 的第一个 k/v,根据 key 调整 r->last_key,这是通过 Comparator 的 FindShortestSeparator 完成的。

if (r->pending_index_entry) 
{
     assert(r->data_block.empty());
     r->options.comparator->FindShortestSeparator(&r->last_key,key);
     std::string handle_encoding;
     r->pending_handle.EncodeTo(&handle_encoding);
     r->index_block.Add(r->last_key, Slice(handle_encoding));
     r->pending_index_entry =false;
}

接下来将 pending_handle 加入到 index block 中{r->last_key, r->pending_handle’sstring}。最后将 r->pending_index_entry 设置为 false。

值得讲讲 pending_index_entry 这个标记的意义,见代码注释:

直到 遇到 下一个 databock 的第一个 key 时,我们才为上一个 datablock 生成 index entry,这样的 好处 是:可以为 index 使用较短的 key;比如上一个 data block 最后一个 k/v 的 key 是 "the quick brown fox",其后继 data block 的第一个 key 是 "the who",我们就可以用一个 较短的字符串"the r" 作为上一个 data block 的 index block entry 的 key。

简而言之,就是在开始 下一个 datablock 时,Leveldb 才将上一个 data block 加入到 index block 中。标记 pending_index_entry 就是干这个用的,对应 data block 的 index entry 信息就保存在(BlockHandle)pending_handle。

S3 如果 filter_block 不为空,就把 key 加入到 filter_block 中。

if (r->filter_block != NULL) 
{
    r->filter_block->AddKey(key);
}

S4 设置 r->last_key = key,将 (key, value) 添加到 r->data_block 中,并更新 entry 数。

r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key,value);

S5 如果 data block 的个数超过限制,就立刻 Flush 到文件中。

const size_testimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >=r->options.block_size)  Flush();

6.4.3 Flush 文件

该函数逻辑比较简单,直接见代码如下:

Rep* r = rep_;
assert(!r->closed);               // 首先保证未关闭,且状态ok
if (!ok()) return;
if (r->data_block.empty())return; // data block是空的
// 保证pending_index_entry为false,即data block的Add已经完成
assert(!r->pending_index_entry);
// 写入data block,并设置其index entry信息—BlockHandle对象
WriteBlock(&r->data_block, &r->pending_handle);
//写入成功,则Flush文件,并设置r->pending_index_entry为true,
//以根据下一个data block的first key调整index entry的key—即r->last_key
if (ok()) 
{
     r->pending_index_entry =true;
     r->status =r->file->Flush();
}
if (r->filter_block != NULL)
{ 
     //将data block在sstable中的便宜加入到filter block中
     r->filter_block->StartBlock(r->offset); 
     // 并指明开始新的data block
}

6.4.4 WriteBlock 函数

在 Flush 文件时,会调用 WriteBlock 函数将 data block 写入到文件中,该函数同时还设置 data block 的 index entry 信息。原型为:

void WriteBlock(BlockBuilder* block, BlockHandle* handle)

该函数做些预处理工作,序列化 要写入的 data block,根据需要压缩数据,真正的写入逻辑是在 WriteRawBlock 函数中。下面分析该函数的处理逻辑。

2

S1 获得 block 的序列化数据 Slice,根据配置参数决定是否压缩,以及根据压缩格式压缩数据内容。对于 Snappy 压缩,如果压缩率太低<12.5%,还是作为未压缩内容存储。

BlockBuilder 的 Finish() 函数将 data block 的数据序列化成一个 Slice

Rep* r = rep_;
Slice raw = block->Finish(); 
// 获得data block的序列化字符串
Slice block_contents;
CompressionType type =r->options.compression;
switch (type)
{
     case kNoCompression: block_contents= raw; break; // 不压缩
     case kSnappyCompression: 
     { 
          // snappy压缩格式
          std::string* compressed =&r->compressed_output;
          if(port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
             compressed->size()< raw.size() - (raw.size() / 8u)) 
          {
                block_contents =*compressed;
          } 
          else 
          { 
                 // 如果不支持Snappy,或者压缩率低于12.5%,依然当作不压缩存储
                 block_contents = raw;
                 type = kNoCompression;
          }
          break;
     }
}  

S2 将 data 内容写入到文件,并重置 block 成初始化状态,清空 compressedoutput。

WriteRawBlock(block_contents,type, handle);  
r->compressed_output.clear();  
block->Reset();

6.4.5 WriteRawBlock 函数

在 WriteBlock 把准备工作都做好后,就可以写入到 sstable 文件中了。来看函数原型:

void WriteRawBlock(const Slice& data, CompressionType, BlockHandle*handle);

函数逻辑很简单,见代码。

Rep* r = rep_;
handle->set_offset(r->offset); 
// 为index设置data block的handle信息
handle->set_size(block_contents.size());
r->status =r->file->Append(block_contents); // 写入data block内容
if (r->status.ok()) 
{
     // 写入1byte的type和4bytes的crc32
     chartrailer[kBlockTrailerSize];
     trailer[0] = type;
     uint32_t crc = crc32c::Value(block_contents.data(),
                                  block_contents.size());
     crc = crc32c::Extend(crc, trailer, 1);  // Extend crc tocover block type
     EncodeFixed32(trailer+1, crc32c::Mask(crc));
     r->status =r->file->Append(Slice(trailer, kBlockTrailerSize));
     if (r->status.ok()) 
     { 
          // 写入成功更新offset-下一个data block的写入偏移
          r->offset +=block_contents.size() + kBlockTrailerSize;
      }
}

6.4.6 Finish 函数

调用 Finish 函数,表明调用者将所有已经添加的 k/v 对 持久化到 sstable,并关闭 sstable 文件。

该函数逻辑很清晰,可 分为 5 部分

3

S1 首先调用 Flush,写入最后的一块 data block,然后设置关闭标志 closed=true。表明该 sstable 已经关闭,不能再添加 k/v 对。

1  Rep* r = rep_;
2  Flush();
3  assert(!r->closed);
4  r->closed = true;
5  BlockHandle filter_block_handle,metaindex_block_handle, index_block_handle;

S2 写入 filter block 到文件中。

if (ok() &&r->filter_block != NULL) 
{
     WriteRawBlock(r->filter_block->Finish(), kNoCompression,&filter_block_handle);
}

S3 写入 meta index block 到文件中。

如果 filterblock 不为 NULL,则加入从 "filter.Name" 到 filter data 位置的映射。通过 meta index block,可以根据 filter 名字快速定位到 filter 的数据区。

if (ok()) 
{
     BlockBuildermeta_index_block(&r->options);
     if (r->filter_block !=NULL) 
     {
          //加入从"filter.Name"到filter data位置的映射
          std::string key ="filter.";
          key.append(r->options.filter_policy->Name());
          std::string handle_encoding;
          filter_block_handle.EncodeTo(&handle_encoding);
          meta_index_block.Add(key,handle_encoding);
     }
     // TODO(postrelease): Add stats and other metablocks
     WriteBlock(&meta_index_block, &metaindex_block_handle);
}

S4 写入 index block,如果成功 Flush 过 data block,那么需要为最后一块 data block 设置 index block,并加入到 index block 中。

if (ok()) 
{
     if (r->pending_index_entry)
     { 
          // Flush时会被设置为true
          r->options.comparator->FindShortSuccessor(&r->last_key);
          std::string handle_encoding;
          r->pending_handle.EncodeTo(&handle_encoding);
          r->index_block.Add(r->last_key, Slice(handle_encoding)); 
          // 加入到index block中
          r->pending_index_entry =false;
      }
      WriteBlock(&r->index_block, &index_block_handle);
}

S5 写入 Footer。

if (ok()) 
{
     Footer footer;
     footer.set_metaindex_handle(metaindex_block_handle);
     footer.set_index_handle(index_block_handle);
     std::string footer_encoding;
     footer.EncodeTo(&footer_encoding);
     r->status =r->file->Append(footer_encoding);
     if (r->status.ok()) 
     {
          r->offset +=footer_encoding.size();
     }
}

整个写入流程就分析完了,对于 Datablock 和 Filter Block 的操作将在 Data block 和 Filter Block 中 单独分析,下面的读取相同。