当前位置: 代码迷 >> 综合 >> 【leveldb】Log(五)
  详细解决方案

【leveldb】Log(五)

热度:57   发布时间:2024-01-25 18:47:45.0

针对Log文件的作用及格式介绍系列文章中有介绍,可点此处查看Log文件介绍说明。
所有的写操作都是先成功的append到Log日志中,然后在更新内存memtable的。
这样做有如下优点:

  1. 可以将随机的写IO变成append,极大的提高写磁盘速度;
  2. 防止在节点down机导致内存数据丢失,造成数据丢失,这对系统来说是个灾难。

日志文件的切换是在写KV记录之前会进行MakeRoomForWrite来决定是否切换新的日志文件,所以在写入的过程中是不需要关注文件切换的。接下来介绍Log模块的读写流程及结构。

一、文件结构

  • log_format.h:描述Log格式及Record类型。
  • log_reader.h、log_reader.cc:读模块实现。
  • log_writer.h、log_writer.cc:写模块实现。

二、格式信息

结构字段
  1. 一共有四种Record类型。
  2. 每个Block为32KB
  3. 每个Record头大小为4 + 2 + 1 = 7个字节。
namespace log {enum RecordType {// Zero is reserved for preallocated fileskZeroType = 0,kFullType = 1,// For fragmentskFirstType = 2,kMiddleType = 3,kLastType = 4
};
static const int kMaxRecordType = kLastType;static const int kBlockSize = 32768;// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;}  // namespace log
构造格式

在这里插入图片描述

三、写流程

1.类关系图

在这里插入图片描述

2.源码
log_writer.h
namespace leveldb {class WritableFile;namespace log {class Writer {public:<!实例一个Writer,传入的参数*dest要为空,且在写期间,*dest要保持存活>// Create a writer that will append data to "*dest".// "*dest" must be initially empty.// "*dest" must remain live while this Writer is in use.explicit Writer(WritableFile* dest);// Create a writer that will append data to "*dest".// "*dest" must have initial length "dest_length".// "*dest" must remain live while this Writer is in use.Writer(WritableFile* dest, uint64_t dest_length);Writer(const Writer&) = delete;Writer& operator=(const Writer&) = delete;~Writer();<!写一个Record到文件中>Status AddRecord(const Slice& slice);private:<!实际写>Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);<!Log文件>WritableFile* dest_;<!位于当前block的哪个位置>int block_offset_;  // Current offset in block<!提前计算好的Type对应的CRC值,减少使用过程中的计算>// crc32c values for all supported record types. These are// pre-computed to reduce the overhead of computing the crc of the// record type stored in the header.uint32_t type_crc_[kMaxRecordType + 1];
};}  // namespace log
}  // namespace leveldb
log_writer.cc
namespace leveldb {
namespace log {<!计算RecordType的CRC32值>
static void InitTypeCrc(uint32_t* type_crc) {for (int i = 0; i <= kMaxRecordType; i++) {char t = static_cast<char>(i);type_crc[i] = crc32c::Value(&t, 1);}
}Writer::Writer(WritableFile* dest) : dest_(dest), block_offset_(0) {InitTypeCrc(type_crc_);
}Writer::Writer(WritableFile* dest, uint64_t dest_length): dest_(dest), block_offset_(dest_length % kBlockSize) {InitTypeCrc(type_crc_);
}<!指定默认析构函数>
Writer::~Writer() = default;<!写Record流程>
Status Writer::AddRecord(const Slice& slice) {const char* ptr = slice.data();size_t left = slice.size();<!1、有必要的情况下,需要record进行分片写入;2、如果slice数据为空,仍然会写一次,只是长度为0,读取的时候会对此种情况进行处理。>// Fragment the record if necessary and emit it. Note that if slice// is empty, we still want to iterate once to emit a single// zero-length record<!写文件是以一个Block(32KB)为单元写入的,而写入到Block这是一个个Record,每个Record的头长度为7Byte。假设这个Block剩余可写的长度为L,要写入的数据为N,则分以下情况进行处理:1、L >= N+7,说明Block空间足以容纳下一个Record和7Byte的头,则这个数据被定义为一个Type为kFullType的Record。2、N + 7 > L >= 7,即当前Block空间大于等于7Byte,但不足以保存全部内容,则在当前页生存一个Type为kFirstType的Record,Payload(Block剩余空间)保存数据前面L-7字节的内容(可以为0,那就直说一个头),如果数据剩余的长度小于32KB,则在下一个页中生成一个Type为kLastType的Record,否则在下一个Block中生成一个Type为kMiddleType的Record,依次类推,直至数据被完全保存下来。3、L < 7,当前Block的剩余长度小于7Byte,则填充0。      以上流程就是整个写流程了。>Status s;bool begin = true;do {const int leftover = kBlockSize - block_offset_;assert(leftover >= 0);if (leftover < kHeaderSize) {// Switch to a new blockif (leftover > 0) {// Fill the trailer (literal below relies on kHeaderSize being 7)static_assert(kHeaderSize == 7, "");dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));}block_offset_ = 0;}// Invariant: we never leave < kHeaderSize bytes in a block.assert(kBlockSize - block_offset_ - kHeaderSize >= 0);const size_t avail = kBlockSize - block_offset_ - kHeaderSize;const size_t fragment_length = (left < avail) ? left : avail;RecordType type;const bool end = (left == fragment_length);if (begin && end) {type = kFullType;} else if (begin) {type = kFirstType;} else if (end) {type = kLastType;} else {type = kMiddleType;}s = EmitPhysicalRecord(type, ptr, fragment_length);ptr += fragment_length;left -= fragment_length;begin = false;} while (s.ok() && left > 0);return s;
}<!实际写实现:1、格式化打包头;2、CRC校验计算;3、先写头、在写Payload,写成功之后flush下;4、将block_offset_位置重新计算下。
>
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr,size_t length) {assert(length <= 0xffff);  // Must fit in two bytesassert(block_offset_ + kHeaderSize + length <= kBlockSize);// Format the headerchar buf[kHeaderSize];buf[4] = static_cast<char>(length & 0xff);buf[5] = static_cast<char>(length >> 8);buf[6] = static_cast<char>(t);// Compute the crc of the record type and the payload.uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);crc = crc32c::Mask(crc);  // Adjust for storageEncodeFixed32(buf, crc);// Write the header and the payloadStatus s = dest_->Append(Slice(buf, kHeaderSize));if (s.ok()) {s = dest_->Append(Slice(ptr, length));if (s.ok()) {s = dest_->Flush();}}block_offset_ += kHeaderSize + length;return s;
}}  // namespace log
}  // namespace leveldb

四、读流程

1.类关系图

在这里插入图片描述

2.源码
log_reader.h
namespace leveldb {<!顺序读取文件的抽象封装类>
class SequentialFile;namespace log {class Reader {public:<!负责上报错误类>// Interface for reporting errors.class Reporter {public:virtual ~Reporter();// Some corruption was detected. "size" is the approximate number// of bytes dropped due to the corruption.virtual void Corruption(size_t bytes, const Status& status) = 0;};// Create a reader that will return log records from "*file".// "*file" must remain live while this Reader is in use.//// If "reporter" is non-null, it is notified whenever some data is// dropped due to a detected corruption. "*reporter" must remain// live while this Reader is in use.//// If "checksum" is true, verify checksums if available.//// The Reader will start reading at the first record located at physical// position >= initial_offset within the file.<!1.file: 要读取的Log文件封装。2.reporter: 错误上报类。3.checksum: 是否check校验。4.initial_offset:开始读取数据偏移位置。>Reader(SequentialFile* file, Reporter* reporter, bool checksum,uint64_t initial_offset);<!禁止拷贝构造和赋值构造>Reader(const Reader&) = delete;Reader& operator=(const Reader&) = delete;~Reader();// Read the next record into *record. Returns true if read// successfully, false if we hit end of the input. May use// "*scratch" as temporary storage. The contents filled in *record// will only be valid until the next mutating operation on this// reader or the next mutation to *scratch.<!1.读取一个Record记录,成功返回true,失败返回false2.读取的数据在*record参数中,传入的*scratch用于临时内部临时存储使用。>bool ReadRecord(Slice* record, std::string* scratch);// Returns the physical offset of the last record returned by ReadRecord.//// Undefined before the first call to ReadRecord.<!返回最近一次读取Record的偏移位,也就是这个Record的起始位>uint64_t LastRecordOffset();private:// Extend record types with the following special values<!扩展两种类型用于错误表示。1.kEof表示到达文件尾。2.kBadRecord表示以下三种错误:1)CRC校验失败、2)读取长度为03)读取的内存在initial_offset之外,比方说从64位置开始读而Record在31~63之间。>enum {kEof = kMaxRecordType + 1,// Returned whenever we find an invalid physical record.// Currently there are three situations in which this happens:// * The record has an invalid CRC (ReadPhysicalRecord reports a drop)// * The record is a 0-length record (No drop is reported)// * The record is below constructor's initial_offset (No drop is reported)kBadRecord = kMaxRecordType + 2};// Skips all blocks that are completely before "initial_offset_".//// Returns true on success. Handles reporting.<!跳到起始位置initial_offset处开始读取>bool SkipToInitialBlock();// Return type, or one of the preceding special values<!读取一个Record>unsigned int ReadPhysicalRecord(Slice* result);// Reports dropped bytes to the reporter.// buffer_ must be updated to remove the dropped bytes prior to invocation.<!上报错误和丢弃>void ReportCorruption(uint64_t bytes, const char* reason);void ReportDrop(uint64_t bytes, const Status& reason);SequentialFile* const file_;Reporter* const reporter_;bool const checksum_;<!32kb大小数据存储空间,用于从文件中读取一个Block>char* const backing_store_;<!将从文件读取到的数据封装为一个Slice,用buffer_来表示>Slice buffer_;<!当读取的文件数据大小小于kBlockSize,表示读取到文件尾,将eof_置位true>bool eof_;  // Last Read() indicated EOF by returning < kBlockSize<!最近一次读取Record的偏移位,也就是这个Record的起始位>// Offset of the last record returned by ReadRecord.uint64_t last_record_offset_;<!读取的Buffer尾部的偏移位>// Offset of the first location past the end of buffer_.uint64_t end_of_buffer_offset_;<!开始读取数据位置>// Offset at which to start looking for the first record to returnuint64_t const initial_offset_;<!是否重新开始读取Record><!在初始读取位置initial_offset > 0的情况下,resyncing_才为true,因为初始位置如果不是从0开始,首次读取到的Record的type是kMiddleType和kLastType的话,则不是一个完整的record,所以要丢弃重新读取。>// True if we are resynchronizing after a seek (initial_offset_ > 0). In// particular, a run of kMiddleType and kLastType records can be silently// skipped in this modebool resyncing_;
};}  // namespace log
}  // namespace leveldb
log_reader.cc
namespace log {
<!指定下默认析构函数>
Reader::Reporter::~Reporter() = default;<!实例化时,做如下事情:1、赋值下读取文件、异常上报程序;2、是否执行数据校验(checksum_为true,则校验);3、申请一块32KB大小的内存用于读取block;4Slice(buffer_)初始化;5、上次读取的record偏移位为0;6、读取的一个buffer尾部偏移位为0;7、初始化读取Record位置。8、重读取标志(resyncing_)
>
Reader::Reader(SequentialFile* file, Reporter* reporter, bool checksum,uint64_t initial_offset): file_(file),reporter_(reporter),checksum_(checksum),backing_store_(new char[kBlockSize]),buffer_(),eof_(false),last_record_offset_(0),end_of_buffer_offset_(0),initial_offset_(initial_offset),resyncing_(initial_offset > 0) {}<!析构时,释放内存>
Reader::~Reader() { delete[] backing_store_; }<!根据initial_offset跳转到第一个Block处>
bool Reader::SkipToInitialBlock() {const size_t offset_in_block = initial_offset_ % kBlockSize;uint64_t block_start_location = initial_offset_ - offset_in_block;<!写数据时,会有个最后6字节的0x00填充位,也就是trailer如果最后求到的余的位置落在这6字节范围内,直接跳过一个32KB的Block,进行读取。>// Don't search a block if we'd be in the trailerif (offset_in_block > kBlockSize - 6) {block_start_location += kBlockSize;}<!跳转到的开始读取位置指定为Buffer的尾部偏移位>end_of_buffer_offset_ = block_start_location;<!跳转到第一个包含初始Record的Block处,如果异常就报错>// Skip to start of first block that can contain the initial recordif (block_start_location > 0) {Status skip_status = file_->Skip(block_start_location);if (!skip_status.ok()) {ReportDrop(block_start_location, skip_status);return false;}}return true;
}<!读取Record实现>
bool Reader::ReadRecord(Slice* record, std::string* scratch) {<!如果上一次读取record位置小于当前起始读取位置则跳过中间部分,直接到开始读取数据处>if (last_record_offset_ < initial_offset_) {if (!SkipToInitialBlock()) {return false;}}<!1、初始化值;2、首次进来,肯定不在一个record片段中,所以 in_fragmented_recordw为false>scratch->clear();record->clear();bool in_fragmented_record = false;// Record offset of the logical record that we're reading// 0 is a dummy value to make compilers happy<!正在读取Record的偏移位,初始化为0>uint64_t prospective_record_offset = 0;Slice fragment;while (true) {<!读取一个Record,并返回Record的Type,实现及注释看下文>const unsigned int record_type = ReadPhysicalRecord(&fragment);// ReadPhysicalRecord may have only had an empty trailer remaining in its// internal buffer. Calculate the offset of the next physical record now// that it has returned, properly accounting for its header size.<!这里就是计算出当前读取的Record的开始位置偏移位>uint64_t physical_record_offset =end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();<!如果initial_offset > 0,则resyncing_为true1、如果读取到的record_type是kMiddleType,则少了kFirstType,重新读。2、如果读取到的record_type是kLastType,则少了kFirstType和kMiddleType,重新读,同时要把resyncing_置位false>if (resyncing_) {if (record_type == kMiddleType) {continue;} else if (record_type == kLastType) {resyncing_ = false;continue;} else {resyncing_ = false;}}switch (record_type) {case kFullType:if (in_fragmented_record) {<!早期版本有BUG,Writer会写一个空的kFirstType,然后后面跟着一个kFullType,这样读取到kFirstType之后,in_fragmented_record置位true了,如此则进入此流程>// Handle bug in earlier versions of log::Writer where// it could emit an empty kFirstType record at the tail end// of a block followed by a kFullType or kFirstType record// at the beginning of the next block.if (!scratch->empty()) {ReportCorruption(scratch->size(), "partial record without end(1)");}}<!1、记录下当前Record起始地址,2、返回读取到的record。>prospective_record_offset = physical_record_offset;scratch->clear();*record = fragment;last_record_offset_ = prospective_record_offset;return true;case kFirstType:if (in_fragmented_record) {<!早期版本有BUG,在下一个block之前会存在一个kFirstType,这样如果读取到下一个block有kFirstType,而之前已经读了一个kFirstType,则in_fragmented_record置位true了,如此则进入此流程>// Handle bug in earlier versions of log::Writer where// it could emit an empty kFirstType record at the tail end// of a block followed by a kFullType or kFirstType record// at the beginning of the next block.if (!scratch->empty()) {ReportCorruption(scratch->size(), "partial record without end(2)");}}<!进入此流程表示一个完整的record由first、middle、last组成剩下的就是组装数据。>prospective_record_offset = physical_record_offset;scratch->assign(fragment.data(), fragment.size());in_fragmented_record = true;break;case kMiddleType:if (!in_fragmented_record) {<!理论下如果record是kMiddleType,则in_fragmented_record为true,否则报错>ReportCorruption(fragment.size(),"missing start of fragmented record(1)");} else {scratch->append(fragment.data(), fragment.size());}break;case kLastType:if (!in_fragmented_record) {ReportCorruption(fragment.size(),"missing start of fragmented record(2)");} else {<!最后一个type,Record,读完则组成一个完整的record,同时赋值下当前完整record的起始位置。>scratch->append(fragment.data(), fragment.size());*record = Slice(*scratch);last_record_offset_ = prospective_record_offset;return true;}break;<!余下的都是错误处理,很容易看懂,就不注释了>case kEof:if (in_fragmented_record) {// This can be caused by the writer dying immediately after// writing a physical record but before completing the next; don't// treat it as a corruption, just ignore the entire logical record.scratch->clear();}return false;case kBadRecord:if (in_fragmented_record) {ReportCorruption(scratch->size(), "error in middle of record");in_fragmented_record = false;scratch->clear();}break;default: {char buf[40];snprintf(buf, sizeof(buf), "unknown record type %u", record_type);ReportCorruption((fragment.size() + (in_fragmented_record ? scratch->size() : 0)),buf);in_fragmented_record = false;scratch->clear();break;}}}return false;
}
<!返回最近读取Record的偏移位>
uint64_t Reader::LastRecordOffset() { return last_record_offset_; }void Reader::ReportCorruption(uint64_t bytes, const char* reason) {ReportDrop(bytes, Status::Corruption(reason));
}void Reader::ReportDrop(uint64_t bytes, const Status& reason) {if (reporter_ != nullptr &&end_of_buffer_offset_ - buffer_.size() - bytes >= initial_offset_) {reporter_->Corruption(static_cast<size_t>(bytes), reason);}
}unsigned int Reader::ReadPhysicalRecord(Slice* result) {<!while true的目的就是读取一个完整的Record>while (true) {if (buffer_.size() < kHeaderSize) {<!kHeaderSize为7,如果buffer剩余大小小于7Byte,分两组情况:1、还未读取到文件尾部;2、已经读取到文件尾部。>if (!eof_) {// Last read was a full read, so this is a trailer to skip<!如果buffer_剩余大小小于7Byte且文件未读取到尾,那上一次读是读取了一个完整的Record,剩余的大小只是6B的填充trailer,所以只需跳过这个trailer,清空即可。>buffer_.clear();<!1、读取32KB大小数据;2、将end_of_buffer_offset_偏移下位置。>Status status = file_->Read(kBlockSize, &buffer_, backing_store_);end_of_buffer_offset_ += buffer_.size();if (!status.ok()) {<!读取失败,直接报错并返回读到文件尾>buffer_.clear();ReportDrop(kBlockSize, status);eof_ = true;return kEof;} else if (buffer_.size() < kBlockSize) {<!读取数据大小小于32KB,认为读取到文件尾了,通过continue,由上文判断下是不是小于7Byte的大小。>eof_ = true;}continue;} else {// Note that if buffer_ is non-empty, we have a truncated header at the// end of the file, which can be caused by the writer crashing in the// middle of writing the header. Instead of considering this an error,// just report EOF.<!如果buffer_是大于0,小于7(头大小)且到文件尾了,很可能是正在写头的时候,写流程崩溃了导致截断的头,这里我们只需要返回到达文件尾即可,不会影响数据。>buffer_.clear();return kEof;}}<!准备解析数据,先解析header>// Parse the headerconst char* header = buffer_.data();const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;const unsigned int type = header[6];const uint32_t length = a | (b << 8);<!解析出的数据长度大于实际读取的数据,则是异常的,返回>if (kHeaderSize + length > buffer_.size()) {size_t drop_size = buffer_.size();buffer_.clear();if (!eof_) {ReportCorruption(drop_size, "bad record length");return kBadRecord;}// If the end of the file has been reached without reading |length| bytes// of payload, assume the writer died in the middle of writing the record.// Don't report a corruption.return kEof;}<!在env_posix.cc环境下写文件时存在预分配的情况会导致此类型type,返回异常即可,不用上报>if (type == kZeroType && length == 0) {// Skip zero length record without reporting any drops since// such records are produced by the mmap based writing code in// env_posix.cc that preallocates file regions.buffer_.clear();return kBadRecord;}<!主要是校验type+data数据,校验失败这要上报数据异常,并返回>// Check crcif (checksum_) {uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);if (actual_crc != expected_crc) {// Drop the rest of the buffer since "length" itself may have// been corrupted and if we trust it, we could find some// fragment of a real log record that just happens to look// like a valid log record.size_t drop_size = buffer_.size();buffer_.clear();ReportCorruption(drop_size, "checksum mismatch");return kBadRecord;}}<!从buffer_中移除读取到的Record数据指向和大小>buffer_.remove_prefix(kHeaderSize + length);<!end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length 就是读取Record的开始位置,也就是说读取Record的开始位置在initial_offset之前,则丢弃这个Record。>// Skip physical record that started before initial_offset_if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <initial_offset_) {result->clear();return kBadRecord;}<!返回一个完整Record>*result = Slice(header + kHeaderSize, length);return type;}
}}  // namespace log

参考链接:
https://blog.csdn.net/weixin_36145588/article/details/76423194