当前位置: 代码迷 >> 综合 >> Rocksdb 的 WAL实现 底层探索
  详细解决方案

Rocksdb 的 WAL实现 底层探索

热度:42   发布时间:2024-02-11 13:35:58.0

作为通用的单机存储引擎,一个基本的crash safe功能是需要提供的,用来保证异常时的数据一致性。像我们rocksdb所在节点出现断电异常,节点死机 等情况,内存中的数据是会丢失的。此时,需要rocksdb提供一种机制,能够在这种异常情况下尽可能挽回多的数据。

业界通用的解决方案就是WAL (write ahead log),接下来通过Rocksdb 的WAL实现来探索一下 WAL怎么能够保证数据的crash safe。
在这里插入图片描述

1. 概览

上层应用针对rocksdb的每一次更新在同步流程之上会存放在两个地方,一个是rocksdb实现的内存数据结构 memtable,另一个是在磁盘的WAL。这里为什么没有说 SST呢,是因为同步流程上只会先写入wal,再写入memtable,具体将数据写入sst则是通过异步的flush和compaction 后台线程进行的。

WAL主要作用是用来恢复发生 rocksdb非优雅退出(节点断电,死机) 时 memtable中的未commited中的数据。
所以WAL 的写入需要优先于memtable,且每一次写入都需要flush ,这也是write head的由来。

ps:
下文中会提到一些缩写,这里提前声明一下
cf – column family 列族,将key-value逻辑隔离
mem – memtable 内存数据结构,在内存中保存key-value数据
imm – immutable memtable 只读的memtable,memtable写入到配置的大小之后会切换为imm,进行后台flush,同时创建一个新的mem接收IO

2. 创建

rocksdb wal的创建有两种情况:

  1. Open db 时 会创建
    s = rocksdb::DB::Open(options, "./db/", &db);
    
  2. 当一个Column Family flush的时候会创建
     db->Flush(rocksdb::FlushOptions(), handles[1]);
    
  3. 当wal文件大小达到了max_total_wal_size时,会重新创建新的wal,不过这里的创建是会和cf的memtable flush一块进行
    如果没有配置该选项,则会将wal可写入的大小定为memtable的4倍
    if (UNLIKELY(status.ok() && !single_column_family_mode_ &&total_log_size_ > GetMaxTotalWalSize())) {WaitForPendingWrites();status = SwitchWAL(write_context);
    }
    

看看代码是怎么实现的,Open db的时候:

  • 在恢复完MANIFEST之后,需要先获取即将创建的WAL 文件编号,即xxx.log
  • 为wal文件分配预分配 一个memtale的buffer大小 max_write_buffer_size
  • 创建wal文件
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,const std::vector<ColumnFamilyDescriptor>& column_families,std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,const bool seq_per_batch, const bool batch_per_txn) {......// NewFileNumber 会在manifest中记录的next_file_number_ 基础上加一uint64_t new_log_number = impl->versions_->NewFileNumber();log::Writer* new_log = nullptr;const size_t preallocate_block_size =impl->GetWalPreallocateBlockSize(max_write_buffer_size);s = impl->CreateWAL(new_log_number, 0 /*recycle_log_number*/,preallocate_block_size, &new_log);          

当column family flush的时候,通过DBImpl::Flush 调用对应cf的memtable flush函数,在flush memtable的过程中进行新的wal的创建。
这里当触发cf的flush时,需要将内存中memtable 标记为imutable-memetable,来进行后台的写入sst文件;同时会生成新的memtable,这个时候wal记录的是旧的memtable的请求,为了数据的隔离性,且wal不会过大,每个wal文件只和一个memtable绑定,所以切换memtable的过程中会创建新的wal文件,用来接收新的请求。

Status DBImpl::Flush(const FlushOptions& flush_options,ColumnFamilyHandle* column_family) {...// 主要就是flush memtables = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);...
}Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,const FlushOptions& flush_options,FlushReason flush_reason, bool writes_stopped) {...// 切换memtables = SwitchMemtable(cfd, &context);...
}
Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {......//前面获取到需要创建的wal 文件编号,预分配足够的wal存储空间,那么接下来开始进行对应的文件创建if (creating_new_log) {// TODO: Write buffer size passed in should be max of all CF's instead// of mutable_cf_options.write_buffer_size.s = CreateWAL(new_log_number, recycle_log_number, preallocate_block_size,&new_log);}
}

3. 更新

这里通过一些测试代码,对flush时 wal的更新进行更加详细的描述。
wal的更新主要是上面 创建过程说到的后两种情形,触发cf的flush和达到wal文件大小限制。
此时

  • 新的cf中的数据会刷新到新的SST 文件之中,即SwitchMemtable,将当前mem变为imm,由imm后台flush
  • 创建一个新的wal文件,之后所有cf 中的 write请求会优先写入新的wal之中。在SwitchMemtable 调用CreateWAL
  • 旧的wal文件会被标记为不可写入,并在之后删除。这个过程是在CreateWAL 函数中进行

一个简单的测试demo如下,完整代码会补充在之后。
先打开一个db,向两个colum family中分别写入key-value,再触发一次flush一个cf。

先做一个预期结果的评估:
这个时候会存在两个wal文件,一个是open时候创建的,另一个是flush的时候创建的,还会有一个sst文件,因为我们只flush了一次且数据量较小。
因为flush是切换memtale,创建新的wal文件,所以第二个wal文件应该是空的,数据记录保存在第一个wal文件。因为只flush一个cf1,另一个cf0的数据还在内存中,所以第一个wal文件是不能被删除的。

s = rocksdb::DB::Open(options, "./db", column_families, &handles, &db);cout << handles.size() << " open status is : " << s.ToString()  << endl;
db->Put(rocksdb::WriteOptions(), handles[1], rocksdb::Slice("key1"), rocksdb::Slice("value1"));
db->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key2"), rocksdb::Slice("value2"));
db->Put(rocksdb::WriteOptions(), handles[1], rocksdb::Slice("key3"), rocksdb::Slice("value3"));
db->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key4"), rocksdb::Slice("value4"));db->Flush(rocksdb::FlushOptions(), handles[1]); // 只flush cf1

查看内容
在这里插入图片描述
分别查看此时的sst文件内容和两个log文件,因为我们只flush了一个cf,所以sst文件中只有一个cf的数据。
而所有的记录都会存在于第一个wal文件,第二个wal文件则是空的。

./sst_dump --file=./db/000013.sst --command=scan
from [] to []
Process ./db/000013.sst
Sst file format: block-based
'key1' seq:1, type:1 => value1
'key3' seq:3, type:1 => value3
--------------------------------------------------
#第一个log文件 保存了向两个cf中put的请求信息
./ldb dump_wal --walfile=./db/000009.log
Sequence,Count,ByteSize,Physical Offset,Key(s)        
1,1,26,0,PUT(1) : 0x6B657931  
2,1,25,33,PUT(0) : 0x6B657932 
3,1,26,65,PUT(1) : 0x6B657933 
4,1,25,98,PUT(0) : 0x6B657934 
--------------------------------------------------
./ldb dump_wal --walfile=./db/000012.log

此时继续写入数据,并将cf0也flush掉,则旧的wal文件可以被删除了,此时只会有一个文件了。
因为新写入的数据是cf0的,之前未flush的数据也是cf0的,统一flush之后只会有一个空的log文件了。

db->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key5"), rocksdb::Slice("value5"));
db->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key6"), rocksdb::Slice("value6"));
db->Flush(rocksdb::FlushOptions(), handles[0]);

查看文件列表
总共flush了两次,会有两个sst文件。只剩下了一个flush之后的 log文件了。
在这里插入图片描述
查看具体内容如下:

# log文件是空的
? ./ldb dump_wal --walfile=./db/000014.log --header
Sequence,Count,ByteSize,Physical Offset,Key(s)
--------------------------------------------------
# 第一个sst文件保存的是cf1的数据
? ./sst_dump --file=./db/000013.sst --command=scan 
from [] to []
Process ./db/000013.sst
Sst file format: block-based
'key1' seq:1, type:1 => value1
'key3' seq:3, type:1 => value3
--------------------------------------------------
# 第二个sst文件保存的是cf2的数据
? ./sst_dump --file=./db/000015.sst --command=scan
from [] to []
Process ./db/000015.sst
Sst file format: block-based
'key2' seq:2, type:1 => value2
'key4' seq:4, type:1 => value4
'key5' seq:5, type:1 => value5
'key6' seq:6, type:1 => value6

通过以上的简单测试,再结合对应的源码,我们基本清楚了wal文件的切换更新过程。

  1. open db的时候会创建一个wal文件
  2. flush memtable的时候会创建一个wal文件,当旧wal文件中有未flush的cf的数据时不会被删除,直到所有cf的数据都被flush到sst文件之中才会被删除。 这是正常场景保证数据一直性的wal实现,后文还会继续描述更多的WAL recovery模式。

完整测试代码如下:

#include <iostream>
#include <string>
#include <vector>
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>
#include <rocksdb/table.h>
#include <rocksdb/options.h>
#include <rocksdb/env.h>
#include <ctime>using std::cout;
using std::endl;int main () {rocksdb::DB* db;rocksdb::Options options;rocksdb::Status s;std::vector<rocksdb::ColumnFamilyDescriptor> column_families;column_families.push_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions()));column_families.push_back(rocksdb::ColumnFamilyDescriptor("new_cf", rocksdb::ColumnFamilyOptions()));std::vector<rocksdb::ColumnFamilyHandle*> handles;options.create_if_missing = true;options.max_open_files = -1;s = rocksdb::DB::Open(options, "./db/", &db);// create column familyrocksdb::ColumnFamilyHandle* cf;s = db->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), "new_cf", &cf);assert(s.ok());// close DBs = db->DestroyColumnFamilyHandle(cf);assert(s.ok());delete db;s = rocksdb::DB::Open(options, "./db", column_families, &handles, &db);cout << handles.size() << " open status is : " << s.ToString()  << endl;db->Put(rocksdb::WriteOptions(), handles[1], rocksdb::Slice("key1"), rocksdb::Slice("value1"));db->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key2"), rocksdb::Slice("value2"));db->Put(rocksdb::WriteOptions(), handles[1], rocksdb::Slice("key3"), rocksdb::Slice("value3"));db->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key4"), rocksdb::Slice("value4"));db->Flush(rocksdb::FlushOptions(), handles[1]);// key5 and key6 will appear in a new WALdb->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key5"), rocksdb::Slice("value5"));db->Put(rocksdb::WriteOptions(), handles[0], rocksdb::Slice("key6"), rocksdb::Slice("value6"));db->Flush(rocksdb::FlushOptions(), handles[0]);delete db;return 0;
}

以上代码我是在mac上测试的,可以通过以下命令编译运行
g++ -std=c++11 rocksdb_wal_test.cc -lrocksdb

4. recovery mode

rocksdb 支持可以调整的4种 wal recovery模式

  • kAbsoluteConsistency
    这种级别是对一致性要求最高的级别,不允许有任何的IO错误,不能出现一个record的丢失。
  • kTolerateCorruptedTailRecords
    这个级别是允许丢失一部分数据,会忽略一些在wal末尾写入失败的请求,数据异常仅限于log文件末尾写入失败。如果出现了其他的异常,都无法进行数据重放。
  • kPointInTimeRecovery
    这个级别也是现在rocksdb默认的recovery mode,当遇到IO error的时候会停止重放,将出现异常之前的所有数据进行完成重放。
  • kSkipAnyCorruptedRecords
    这个级别是一致性要求最低的,会忽略所有的IO error,尝试尽可能多得恢复数据。

还记得之前搞CEPH时被OSD支配的恐惧,无数的rocksdb corruption,当时如果对rocksdb有足够多的理解,那么丢失的数据应该会少很多,也会有更多的时间来睡觉了。

在这里插入图片描述

详细的mode声明如下:

enum class WALRecoveryMode : char {// Original levelDB recovery// We tolerate incomplete record in trailing data on all logs// Use case : This is legacy behaviorkTolerateCorruptedTailRecords = 0x00,// Recover from clean shutdown// We don't expect to find any corruption in the WAL// Use case : This is ideal for unit tests and rare applications that// can require high consistency guaranteekAbsoluteConsistency = 0x01,// Recover to point-in-time consistency (default)// We stop the WAL playback on discovering WAL inconsistency// Use case : Ideal for systems that have disk controller cache like// hard disk, SSD without super capacitor that store related datakPointInTimeRecovery = 0x02,// Recovery after a disaster// We ignore any corruption in the WAL and try to salvage as much data as// possible// Use case : Ideal for last ditch effort to recover data or systems that// operate with low grade unrelated datakSkipAnyCorruptedRecords = 0x03,
};

以上配置,可以通过option选项Options.wal_recovery_mode = 2来进行对应模式的配置

Talk is cheap!!!

接下来我们仔细看看每一种recovery mode是如何实现各自的recovery 级别的。

在Open db调用Recover的时候,如果db已经存在,会尝试从已经存在的db中的log文件 恢复上一次db的memtable数据
详细的过程是:

  • 取存在的每一个log文件,循环进行如下操作
  • 创建一个wal文件的file_reader
  • 创建一个利用file_reader创建一个log reader,并用 wal_recovery_mode 参与到log reader的初始化
  • 在具体的ReadRecord 中进行对应的recovery mode区分
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,SequenceNumber* next_sequence, bool read_only) {......// 创建一个file readerstd::unique_ptr<SequentialFileReader> file_reader;{std::unique_ptr<FSSequentialFile> file;status = fs_->NewSequentialFile(fname,fs_->OptimizeForLogRead(file_options_),&file, nullptr);    ......   // 创建一个log reader log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),&reporter, true /*checksum*/, log_number);......while (!stop_replay_by_wal_filter &&reader.ReadRecord(&record, &scratch,immutable_db_options_.wal_recovery_mode) &&status.ok()) {if (record.size() < WriteBatchInternal::kHeader) {reporter.Corruption(record.size(),Status::Corruption("log record too small"));continue;}     ......
}

进入到ReadRecord 之中,从物理Record中读数据发生异常时,如果想要上报异常,都需要确认当前的recovery mode是最高一致性
kAbsoluteConsistency的时候才会进行上报

bool Reader::ReadRecord(Slice* record, std::string* scratch,WALRecoveryMode wal_recovery_mode) {while (true) {uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();size_t drop_size = 0;// 会从物理的record中读取数据,将读取过程中发生的异常返回给record_typeconst unsigned int record_type = ReadPhysicalRecord(&fragment, &drop_size);......//接下来通过不同的recovery mode来针对异常的record_type 进行处理,决定是否需要返回数据异常//当出现 header,eof(log末尾),kOldRecord(和eof异常类似)异常时都会 确认recovery mode是否是 kAbsoluteConsistency//是的话直接report异常case kBadHeader:if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {// in clean shutdown we don't expect any error in the log filesReportCorruption(drop_size, "truncated header");}......case kEof:if (in_fragmented_record) {if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {// in clean shutdown we don't expect any error in the log filesReportCorruption(scratch->size(), "error reading trailing data");}......case kOldRecord:if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) {//如果是跳过所有异常的话,不会report corruption// Treat a record from a previous instance of the log as EOF.if (in_fragmented_record) {if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency) {// in clean shutdown we don't expect any error in the log filesReportCorruption(scratch->size(), "error reading trailing data");}// This can be caused by the writer dying immediately after// writing a physical record but before completing the next; don't// treat it as a corruption, just ignore the entire logical record.scratch->clear();}return false;}

我们再回到RecoverLogFiles 函数之中,在处理完Record之后,后续还会有将重放后的数据写入memtable的过程。
当完成所有的读取和写入memtable之后,会对所有的返回状态进行确认,因为之上所有的操作都是在重放wal,可能会有失败的情况。

接下来进入如下逻辑,根据重放的状态进行后续的recovery 操作

  • 如果mode是 kSkipAnyCorruptedRecords ,则跳过所有的异常,直接返回OK
  • 如果mode是 kPointInTimeRecovery 或者 kTolerateCorruptedTailRecords 则会暂停处理,将stop_replay_for_corruption 置为true, 后续会跳过当前的log number进行重放
  • 如果是kAbsoluteConsistency 或者 kTolerateCorruptedTailRecords 则直接返回
      if (immutable_db_options_.wal_recovery_mode ==WALRecoveryMode::kSkipAnyCorruptedRecords) { // 如果rev// We should ignore all errors unconditionallystatus = Status::OK();} else if (immutable_db_options_.wal_recovery_mode ==WALRecoveryMode::kPointInTimeRecovery) {// We should ignore the error but not continue replayingstatus = Status::OK();stop_replay_for_corruption = true;corrupted_log_number = log_number;ROCKS_LOG_INFO(immutable_db_options_.info_log,"Point in time recovered to log #%" PRIu64" seq #%" PRIu64,log_number, *next_sequence);} else {assert(immutable_db_options_.wal_recovery_mode ==WALRecoveryMode::kTolerateCorruptedTailRecords ||immutable_db_options_.wal_recovery_mode ==WALRecoveryMode::kAbsoluteConsistency);return status;}// 主要是对kPointInTimeRecovery 和 kTolerateCorruptedTailRecords进行恢复处理// 将corrupted_log_number 之前所有数据完成恢复,再报告异常if (stop_replay_for_corruption == true &&(immutable_db_options_.wal_recovery_mode ==WALRecoveryMode::kPointInTimeRecovery ||immutable_db_options_.wal_recovery_mode ==WALRecoveryMode::kTolerateCorruptedTailRecords)) {for (auto cfd : *versions_->GetColumnFamilySet()) {if (cfd->GetLogNumber() > corrupted_log_number) {ROCKS_LOG_ERROR(immutable_db_options_.info_log,"Column family inconsistency: SST file contains data"" beyond the point of corruption.");return Status::Corruption("SST file is ahead of WALs");}}}

5. WAL 的写入

这里再补充以下wal文件是如何在写入memtable之前写入的,会如何更新。

还是通过用户接口Put,该接口会调用到底层的Status DBImpl::WriteImpl,在该函数中,如果我们开启来pipeline,则会走pipeline的写入逻辑。如果未开启pipeline,则会正常写入。

WriteToWAL的函数入口如下:

 status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,need_log_dir_sync, last_sequence + 1);

因为实际写入的过程中可能多个客户端会并发调用put,每一个Put都会有自己的writer,为了提高写性能,会从多个writer选出来一个leader,让这个leader将所有 writer要写的wal收集到一块,进行batch写入,其他从writer等待leader写完之后再并发写memtable。

所以以上WriteToWAL内部调用了一个重载 了batch写的wal函数

  WriteBatch* merged_batch = MergeBatch(write_group, &tmp_batch_,&write_with_wal, &to_be_cached_state);......uint64_t log_size;status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);if (to_be_cached_state) {cached_recoverable_state_ = *to_be_cached_state;cached_recoverable_state_empty_ = false;}

进入到实际执行写入的WriteToWAL函数,会调用AddRecord函数执行实际的write,这里会用到文件系统的write接口,选择的文件系统是在db open的时候 进行DBImpl类初始化,根据env传入的参数进行文件系统的选择, 默认是PosixFileSystem

std::shared_ptr<FileSystem> FileSystem::Default() {static PosixFileSystem default_fs;static std::shared_ptr<PosixFileSystem> default_fs_ptr(&default_fs, [](PosixFileSystem*) {});return default_fs_ptr;
}

AddRecord函数中将之前合并好的 log_enery,每次写入大小不能超过11bytes,这个是wal的具体约定的格式了。通过文件系统接口进行Append写入,直到把log_entry完全写入。

Status Writer::AddRecord(const Slice& slice) {......do {......s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",static_cast<size_t>(leftover)));......}while(s.ok() && left > 0);......
}

因为我们没有配置direct,则最终通过
Flush -->WriteBuffered–> PosixWritableFile::Append --> PosixWrite --> write 直到底层的 write系统调用,进行写入。
其实这个写入链路还是比较长,不过耗时的话中间仅仅是做一些字符串的拼接,到write这里才是耗时所在。因为,还要走一遍庞大的内核文件系统的写入链路,这里也能够理解为什么ceph需要单独再做一个小型的文件系统来代替内核文件系统了。

写WAL是我们整个rocksdb写入最为耗时的一段,memtable的写入是在wal写完成之后才能写入,而memtable都是纯内存操作,所以耗时还是消耗在了写WAL之上,在传统的xfs之上,一个200bytes的请求,写WAL耗时大概2-4us。

为了保证WAL能够落盘,我们还需要配置options.sync=true,在WriteToWAL函数中,完成了文件系统的写入之后 会调用fsync来进行 sync写。

通过如下简单的stap脚本,可以非常方便得抓取rocksdb内部函数耗时:

#!/bin/stap
global sends #打印出来的单位是微妙
probe process("test").function("rocksdb::DBImpl::GetImpl").return {sends <<< gettimeofday_us() - @entry(gettimeofday_us()) 
}probe timer.s(1) { #每隔一秒打印一次print(ctime(gettimeofday_s()))print("\n")print(@hist_log(sends))delete sends    
}

6. 总结

通过对 rocksdb实现的WAL 底层原理分析,我们能够清晰得看到整个wal的写入 都一定是保证优先写入,在保证了数据一致性的前提下却增加了一些额外的性能开销。
一致性的约束下,写WAL和写数据之间只能串行 ,那么在搭建以rocksdb为基础的分布式存储系统时 是不是可以在出了rocksdb链路之上的 其他方面进行一些优化,比如像CEPH的bluefs来代替内核文件系统,再使用一些新硬件特性:像底层的 SST open channel,AEP(据说性能接近内存的持久化存储),还有近两年Intel推出的pmem。
一篇论文Scalability of write-ahead logging on multicore and multisocket hardware推出的算法,将多核cpu的numa和log的写入进行整合,达到整体提升log写入的性能。