跳转至

这个文档简要解析了 Rocksdb 的两阶段提交的实现。

这个工程会被分解为五个关注部分:

  • 修改 WAL 格式
  • 拓展已有的事务 API
  • 修改写流程
  • 修改恢复流程
  • 与 MyRocks 整合

修改 WAL 日志

WAL 包含一个或多条日志。每个日志都是一个或者更多序列化的 WriteBatches。恢复过程中,WriteBatches 会通过日志重新构建。为了修改 WAL 格式或者拓展他的功能,我们只需要考虑我们的 WriteBatches。

一个 WriteBatches 是一个排序好的记录集合(Put(k,v), Merge(k,v), Delete(k), SingleDelete(k)),他们代表了 RocksDB 的写操作。每个记录有一个二进制字符串来表示。记录加入到一个 WriteBatch 的时候,他们的二进制表示内容会被追加到 WriteBatch 的二进制字符串表示后面。这个二进制字符串的前缀是一个批处理的开始序列号,之后是批处理中记录的数量。如果操作不是应用于 default 列族,那么每个记录可能会有一个列族修改记录作为前缀。

一个 WriteBatch 可以通过拓展 WriteBatch::Handler 被遍历。MemTableInserter 是一个 WriteBatch::Handler 的拓展,他把一个 WriteBatch 包含的操作插入到正确的列族的 Memtable。

一个已有的 WriteBatch 可能有以下逻辑表示:

Sequence(0);NumRecords(3);Put(a,1);Merge(a,1);Delete(a);

为了实现 2PC,对 WriteBatch 格式的修改包括增加四个新的记录。

  • Prepare(xid)
  • EndPrepare(xid)
  • Commit(xid)
  • Rollback(xid)

一个支持 2PC 的 WriteBatch 可能有以下逻辑表示:

Sequence(0);NumRecords(6);Prepare(foo);Put(a,b);Put(x,y);EndPrepare();Put(j,k);Commit(foo);

可以看到 Prepare(xid) 和 EndPrepare() 的关系有点类似于括号,会包含 ID 为 'foo' 的事务的操作。Commit(xid) 和 Rollback(xid) 标记表示 ID 为 xid 的事务的操作需要被提交或者回滚。

序列 ID 分布

当一个 WriteBatch 被插入到一个 memtable(通过 MemTableInserter 插入),每个操作的序列 ID 等于 WriteBatch 的序列 ID 加上 这个 WriteBatch 之前的记录消耗的的序列号。这个隐式的 WriteBatch 序列号 ID 映射在 2PC 加入后将不再存在。在于给 Prepare() 中包含的操作会消耗序列号 ID,方式是以相对 Commit() 标记的相对的位置进行消耗。这个 Commit() 标记可能在另一个 WriteBatch 或者来自他执行准备操作的日志。

向后兼容

WAL 格式没有版本号,所以我们需要注意向后兼容。一个当前版本的 RocksDB 不能回复一个带有 2PC 标记的 WAL 日志。实际上他可能会因为无法识别记录 id 而崩溃。然而,这不重要,只需要给当前版本的 RocksDB 打补丁让他可以在遇到新的 WAL 格式的时候跳过 prepared 节和未知标记即可。

当前进度

参考 这里

拓展 Transaction API

我们现阶段只关注悲观事务的 2PC。客户端必须提前声明他们是否需要使用 2PC 语义。例如,客户端代码可能是这样的:

TransactionDB* db;
TransactionDB::Open(Options(), TransactionDBOptions(), "foodb", &db);

TransactionOptions txn_options;
txn_options.two_phase_commit = tr
txn_options.xid = "12345";
Transaction* txn = db->BeginTransaction(write_options, txn_options);

txn->Put(...);
txn->Prepare();
txn->Commit();

一个事务对象现在拥有有更多的状态,所以我们修改状态的枚举:

enum ExecutionStatus {
  STARTED = 0,
  AWAITING_PREPARE = 1,
  PREPARED = 2,
  AWAITING_COMMIT = 3,
  COMMITED = 4,
  AWAITING_ROLLBACK = 5,
  ROLLEDBACK = 6,
  LOCKS_STOLEN = 7,
};

事务 API 会有一个新的成员函数 Prepare()。Prepare() 会调用 WriteImpl,把他自身的环境配置告诉 WriteImpl,并且 WriteThread 访问的 ExecutionStatus,XID 和 WriteBatch。WriteImpl 会插入 Prepare(xid) 标记,然后是 WriteBatch 的内容,之后是 EndPrepare() 标记。不会发起 memtable 插入操作。当同一个事务对象发起提交,再一次,他调用到 WriteImpl。这次,只有一个 Commit() 标记被插入到对应的 WAL,并且 WriteBatch 的内容会被插入到对应的 memtable。当对应事务的 Rollback() 被调用,事务的内容会被清理,并且如果事务已经就绪,调用 WriteImpl,以插入一个 Rollback(xid) 标记。

这些所谓的 ' 元标记 '(Prepare(xid), EndPrepare(), Commit(xid), Rollback(xid))不会直接插入到一个写批处理中。写流程(WriteImpl)会持有正在写入的事物的环境变量。它使用这个环境来插入相应的标记到 WAl 中(这样他们就被插入到完整的 WriteBatch 前面,中间不会有其他 WriteBatch)。恢复的时候,这些标记会被 MemTableInserter 发现,他会使用这个来重新构造之前的准备好的事务。

事务时钟超时

目前,如果一个事务超时,这个事务提交有一个回调会失败。类似的,如果一个事务超时,那么他的锁就可以被其他事务偷取。这些机制在 2PC 中应该被保留 —— 差别是超时回调会在准备的时候被调用。如果事务在准备阶段没有超时,那么他不会再提交的时候超时。

TransactionDB 修改

为了使用事务,用户必须打开一个 TransactionDB。这个 TransactionDB 实例之后被用于构造 Transaction。这个 TransactionDB 现在记录一个 XID 到所有已经创建的两阶段提交事务的映射。当一个事务被删除或者回滚,他从映射中被删除。同时有一个 API 用于查询所有准备好的事务。这个在 MyRocks 恢复的时候被使用。

TransactionDB 同事还追踪一个所有包含准备段的日志号码的最小堆。当一个事务是 ' 准备好 ',他的 WriteBatch 会被写入一个日志,这个日志号会被存储在事务对象以及他的最小堆。当一个事务提交,他的日志号码会从小顶堆中删除,但是他不会被遗忘!现在需要 memtable 记录他需要的最老日志,直到他被落盘到 L0。

写流程的修改

写流程可以被分解为两个主要关注的区域。DBImpl::WriteImpl(...) 和 MemTableInserter。多个客户端线程会调用到 WriteImpl。第一个线程会被指定为 leader,而一系列跟随的线程会被指定为 ' 跟随者 '。leader 和一系列的跟随者会被聚在一起,变成一个逻辑组,指向一个 ' 写组 '。leader 会处理该组的所有 WriteBatches 请求,把它们组合在一起,然后写出到 WAl。根据写组的大小以及当前 memtable 是否愿意支持并行写入,leader 可能会插入所有 WriteBatches 到 memtable 或者 让每个线程分别插入他们的的 WriteBatch 到 memtable。

所有 memtable 插入都是由 MemTableInserter 处理的。这是一个 WriteBatch::Handler 的实现 —— 一个 WriteBatch 迭代处理器。这个处理器遍历 WriteBatch 的所有元素(Put, Delete, Merge, 等待),并且对当前的 MemTable 执行对应的调用。MemTableInserter 也会处理原地合并,删除和更新。

对写路径的修改会包括增加一个可选参数给 DBImpl::WriteImpl。这个可选参数会是一个指针,指向写入数据的两阶段事务实例。这个对象会告诉写流程,当前两阶段事务的状态。一个 2PC 事务会在准备,提交,回滚分别调用一次 WriteImpl —— 尽管提交和回滚都是互斥的操作。

Status DBImpl::WriteImpl(
  const WriteOptions& write_options, 
  WriteBatch* my_batch,
  WriteCallback* callback,
  Transaction* txn
) {
  WriteThread::Writer w;
  //...
  w.txn = txn; // writethreads also have txn context for memtable insert

  // we are now the group leader
  int total_count = 0;
  uint64_t total_byte_size = 0;
  for (auto writer : write_group) {
    if (writer->CheckCallback(this)) {
      if (writer->ShouldWriteToMem())
        total_count += WriteBatchInternal::Count(writer->batch)
       }
  }
  const SequenceNumber current_sequence = last_sequence + 1;
  last_sequence += total_count;

  // now we produce the WAL entry from our write group
  for (auto writer : write_group) {
    // currently only optimistic transactions use callbacks
    // and optimistic transaction do not support 2pc
   if (writer->CallbackFailed()) {
      continue;
    } else if (writer->IsCommitPhase()) {
      WriteBatchInternal::MarkCommit(merged_batch, writer->txn->XID_);
    } else if (writer->IsRollbackPhase()) {
      WriteBatchInternal::MarkRollback(merged_batch, writer->txn->XID_);
    } else if (writer->IsPreparePhase()) {
      WriteBatchInternal::MarkBeginPrepare(merged_batch, writer->txn->XID_);
      WriteBatchInternal::Append(merged_batch, writer->batch);
      WriteBatchInternal::MarkEndPrepare(merged_batch);
      writer->txn->log_number_ = logfile_number_;
    } else {
      assert(writer->ShouldWriteToMem());
      WriteBatchInternal::Append(merged_batch, writer->batch);
    }
  }
  //now do MemTable Inserts for WriteGroup
}

WriteBatchInternal::InsertInto 可能会被修改为只迭代没有 Transaction 的写者或者 COMMIT 状态的 Transaction。

写流程对 MemTableInserter 的修改

如你上面所见,当一个事务已经准备好,事务记录他准备段的日志号。在插入的时候,每个 MemTable 必须跟踪插入到他内部的准备段的最小日志号码。这个修改会发生在 MemTableInserter 里。我们会在日志声明周期部分讨论这个值如何使用。

恢复路径的修改

当前的恢复路径已经非常适合 2PC 了。他按时间顺序迭代所有日志里的所有批处理,然后根据日志号码,提供给 MemTableInserter。MemTableInserter 之后迭代每个批处理,然后把值插入到正确的 MemTable。每个 MemTable 根据当前恢复中的日志编码,知道哪些值他可以忽略。

为了使恢复流程可以在 2PC 下工作,我们只需要修改 MemTableInserter,让他能理解我们四个新的 ' 元标记 '。

记住:当一个 2PC 事务被提交,他包含多个列族的插入(多个 memtable)。这些 memtable 会在不同时间落盘。我们仍旧使用 CF 日志号码来避免以恢复,两阶段,已提交的事务的重复插入。

考虑下列场景:

  • 两阶段事务 TXN 插入到 CFA 和 CFB
  • TXN 在 LOG 1 准备好
  • TXN 在 LOG 2 标记为 COMMITTED
  • TXN 被插入到 MemTables
  • CFA 落盘到 L0
  • CFA 的 log_number 现在是 LOG 3
  • CFB 没有落盘,并且他仍旧指向 LOG 1 准备段
  • 崩溃恢复
  • LOG 1 仍然存在,因为 CFB 在引用 LOG 1 准备段。
  • 迭代从 LOG 1 开始的日志
  • CFB 把准备好的数据插入 memtable, 再次引用 LOG 1 的准备段
  • CFA 跳过 LOG 2 的提交标记的插入,因为他在 LOG 3 是一致的。
  • CFB 落盘到 L0 并且现在 LOG 3 也是一致的了。
  • LOG 1, LOG 2 可以被释放了。

重建事务

如前面所述,恢复路径的修改只要求修改 MemTableInserter 来处理新的元标记。因为在恢复的时候,我们不能访问一个完整的 TransactionDB 实例,我们必须凭空构造一个事务。这实质上是为所有恢复起来的准备好的事务构造一个 XID->(WriteBatch,log_numb) 的映射。当我们遇到一个 Commit(xid) 标记,我们尝试重新找到这个 xid 对应的事务,并且重新插入到 Mem。如果我们遇到一个 rollback(xid) 标记,我们删除这个事务。在恢复的最后,我们只有一个包含所有准备好的事务的集合。之后我们通过这些对象构造完整的事务,获取需要的锁。RocksDB 现在已经恢复到崩溃/关闭前的状态了。

日志生命周期

为了找出必须保留的最小日志,我们先找到每个列族的最小 log_number_。

我们同时必须考虑在 TransactionDB 中已经准备好的段的堆的最小值。这代表了包含一个准备段但是没有提交的最早的日志。

我们同事必须考虑所有 Memtable 以及还没有落盘的 ImmutableMemTables 引用的准备段日志的最小值。

上面三个的最小值就是最早的还持有没有刷入 L0 的数据的日志。