From 89f15d1a3616768352290529292d75a9c5b99b3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=98=A0=E9=BE=99?= Date: Fri, 28 Oct 2022 10:54:05 +0800 Subject: [PATCH 01/11] add comment --- leveldb/batch.go | 1 + leveldb/cache/cache.go | 16 ++++++++++----- leveldb/comparer.go | 3 +++ leveldb/comparer/bytes_comparer.go | 4 +++- leveldb/db.go | 4 ++-- leveldb/db_state.go | 9 +++++++-- leveldb/db_write.go | 17 ++++++++++++---- leveldb/journal/journal.go | 17 ++++++++++++---- leveldb/table.go | 19 ++++++++++++------ leveldb/table/reader.go | 32 +++++++++++++++++++++--------- leveldb/table/table.go | 1 + leveldb/table/writer.go | 2 +- leveldb/version.go | 1 + 13 files changed, 92 insertions(+), 34 deletions(-) diff --git a/leveldb/batch.go b/leveldb/batch.go index d5ecf721..87dc278c 100644 --- a/leveldb/batch.go +++ b/leveldb/batch.go @@ -372,6 +372,7 @@ func batchesLen(batches []*Batch) int { } func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error { + // 实现是 singleWriter.Write if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil { return err } diff --git a/leveldb/cache/cache.go b/leveldb/cache/cache.go index 8e4f397c..2c7f1682 100644 --- a/leveldb/cache/cache.go +++ b/leveldb/cache/cache.go @@ -81,6 +81,7 @@ func (x mNodes) Swap(i, j int) { x[i], x[j] = x[j], x[i] } func (x mNodes) sort() { sort.Sort(x) } +// 找第一个大于等于 的index func (x mNodes) search(ns, key uint64) int { return sort.Search(len(x), func(i int) bool { a := x[i].ns @@ -155,7 +156,8 @@ func (b *mBucket) get(r *Cache, h *mHead, hash uint32, ns, key uint64, getOnly b if i == len(b.nodes) { b.nodes = append(b.nodes, n) } else { - b.nodes = append(b.nodes[:i+1], b.nodes[i:]...) + // 把新的 node 插入到 i 的前面 + b.nodes = append(b.nodes[:i+1], b.nodes[i:]...) // 添加到有序数组的对应位置,插入排序,所以 b.nodes 不可以过长 b.nodes[i] = n } bLen := len(b.nodes) @@ -188,6 +190,7 @@ func (b *mBucket) get(r *Cache, h *mHead, hash uint32, ns, key uint64, getOnly b return true, true, n } +// delete 的返回值约定值得参考 func (b *mBucket) delete(r *Cache, h *mHead, hash uint32, ns, key uint64) (done, deleted bool) { b.mu.Lock() @@ -260,9 +263,12 @@ func (b *mBucket) delete(r *Cache, h *mHead, hash uint32, ns, key uint64) (done, } type mHead struct { - buckets []mBucket - mask uint32 - predecessor unsafe.Pointer // *mNode + buckets []mBucket + mask uint32 // 用当前的 mask 和 predecessor 的 mask 作对比,可以知道是在 grow 还是在 shrink + + // 下面注释应该错了, 应该是 *mHead 吧 + predecessor unsafe.Pointer // *mNode + resizeInProgress int32 overflow int32 @@ -301,7 +307,7 @@ func (h *mHead) initBucket(i uint32) *mBucket { nodes = make(mNodes, 0, len(m0)+len(m1)) nodes = append(nodes, m0...) nodes = append(nodes, m1...) - nodes.sort() + nodes.sort() // 可以用 merge sort?不过 m0, m1 的 size 比较小,也无所谓 } b.nodes = nodes b.state = bucketInitialized diff --git a/leveldb/comparer.go b/leveldb/comparer.go index 448402b8..835de420 100644 --- a/leveldb/comparer.go +++ b/leveldb/comparer.go @@ -10,6 +10,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/comparer" ) +// iComparer 处理关于 sequence 相关的逻辑 type iComparer struct { ucmp comparer.Comparer } @@ -37,6 +38,8 @@ func (icmp *iComparer) Name() string { func (icmp *iComparer) Compare(a, b []byte) int { x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey()) if x == 0 { + // 按照 seq 排序,seq 大的排在前面 + // uCompare 指的是 user 指定的 compare function if m, n := internalKey(a).num(), internalKey(b).num(); m > n { return -1 } else if m < n { diff --git a/leveldb/comparer/bytes_comparer.go b/leveldb/comparer/bytes_comparer.go index abf9fb65..2caf8756 100644 --- a/leveldb/comparer/bytes_comparer.go +++ b/leveldb/comparer/bytes_comparer.go @@ -6,7 +6,9 @@ package comparer -import "bytes" +import ( + "bytes" +) type bytesComparer struct{} diff --git a/leveldb/db.go b/leveldb/db.go index b2724cd9..25da7ce5 100644 --- a/leveldb/db.go +++ b/leveldb/db.go @@ -52,11 +52,11 @@ type DB struct { memMu sync.RWMutex memPool chan *memdb.DB mem, frozenMem *memDB - journal *journal.Writer + journal *journal.Writer // 写 WAL journalWriter storage.Writer journalFd storage.FileDesc frozenJournalFd storage.FileDesc - frozenSeq uint64 + frozenSeq uint64 // 哪一个 seq 的写操作导致了 memDB 的切换,或者说导致了当前的 memDB 被 freeze 成 immutable memDB // Snapshot. snapsMu sync.Mutex diff --git a/leveldb/db_state.go b/leveldb/db_state.go index 29430fee..d4963c41 100644 --- a/leveldb/db_state.go +++ b/leveldb/db_state.go @@ -87,6 +87,7 @@ func (db *DB) mpoolGet(n int) *memDB { default: } if mdb == nil || mdb.Capacity() < n { + // 取 WriteBuffer 和 n 的最大值,很小的 n 可能在满的 memDB 下触发 rotateMem 操作,所以需要取这个最大值 mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)) } return &memDB{ @@ -120,6 +121,8 @@ func (db *DB) mpoolDrain() { // Create new memdb and froze the old one; need external synchronization. // newMem only called synchronously by the writer. func (db *DB) newMem(n int) (mem *memDB, err error) { + + // Why: 先创建了 journal 文件,如果后面有 err 不会导致此文件一直存在得不到清理吗? fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()} w, err := db.s.stor.Create(fd) if err != nil { @@ -130,6 +133,8 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { db.memMu.Lock() defer db.memMu.Unlock() + // 已经有 immutable memtable 的话不可以再创建 immutable memtable + // 这也是 LevelDB 的一个限制 if db.frozenMem != nil { return nil, errHasFrozenMem } @@ -146,12 +151,12 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { db.frozenJournalFd = db.journalFd } db.journalWriter = w - db.journalFd = fd + db.journalFd = fd // 新的 journal fd db.frozenMem = db.mem mem = db.mpoolGet(n) mem.incref() // for self mem.incref() // for caller - db.mem = mem + db.mem = mem // 切换了 memDBs // The seq only incremented by the writer. And whoever called newMem // should hold write lock, so no need additional synchronization here. db.frozenSeq = db.seq diff --git a/leveldb/db_write.go b/leveldb/db_write.go index 18eddbe1..7d985e3d 100644 --- a/leveldb/db_write.go +++ b/leveldb/db_write.go @@ -36,7 +36,7 @@ func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { retryLimit := 3 retry: // Wait for pending memdb compaction. - err = db.compTriggerWait(db.mcompCmdC) + err = db.compTriggerWait(db.mcompCmdC) // 这一步成功之后,就不该还有 frozen memDB if err != nil { return } @@ -47,6 +47,7 @@ retry: if err != nil { if err == errHasFrozenMem { if retryLimit <= 0 { + // 既然之前已经 wait 过 compaction了,那么这时就不该还有 frozen memDB panic("BUG: still has frozen memdb") } goto retry @@ -63,6 +64,7 @@ retry: return } +// 如果 db 写入数据量过快,则会触发 throttle func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { delayed := false slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger() @@ -79,15 +81,18 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { mdb = nil } }() - tLen := db.s.tLen(0) - mdbFree = mdb.Free() + tLen := db.s.tLen(0) // 当前版本 Level-0 的 SST 个数s + mdbFree = mdb.Free() // cap - size,即 mdb 还有多少剩下的空间s switch { case tLen >= slowdownTrigger && !delayed: + // level-0 的文件个数超出了阈值,默认为 slowdownTrigger=8,会减缓写入速度一次 delayed = true time.Sleep(time.Millisecond) case mdbFree >= n: + // mdb 的剩余空间足够容纳这么多数据量 return false case tLen >= pauseTrigger: + // 注意:此时 mdbFree < n,也就是说 mdb 目前不足以容纳 n 规模的数据 delayed = true // Set the write paused flag explicitly. atomic.StoreInt32(&db.inWritePaused, 1) @@ -97,13 +102,17 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { if err != nil { return false } + + // 注意:当 err==nil 时,会 return true,也就是继续重试,如果还是 tLen >= pausedTrigger,则继续触发 L0 compaction + // 直到 Level-0 的 SST 数目不 block write default: + // 此时,mdb 的空间不足以容纳 n 规模的数据,且 write 没有被 Level-0 的 compaction block // Allow memdb to grow if it has no entry. if mdb.Len() == 0 { mdbFree = n } else { mdb.decref() - mdb, err = db.rotateMem(n, false) + mdb, err = db.rotateMem(n, false) // 不会等待 memDB compaction 完成 if err == nil { mdbFree = mdb.Free() } else { diff --git a/leveldb/journal/journal.go b/leveldb/journal/journal.go index f7f8b540..d8f6adc1 100644 --- a/leveldb/journal/journal.go +++ b/leveldb/journal/journal.go @@ -413,12 +413,20 @@ func (w *Writer) writePending() { if w.err != nil { return } + + // pending 的 chunk 指的是写了payload,但是尚未 fillHeader 的 chunk + // 只有把 header 填充好,才是一个完备的 chunk,而填充 header 必须得在 chunk 的内容锁定之后 + // chunk 内容的锁定有两种情况: + // 1. block 写满了,不得不截断出一个 chunk; + // 2. 需要写的 payload 的确写完了,没东西继续写了,要终止 chunk,跟下一条 journal record 使用的 chunk 分隔开 if w.pending { w.fillHeader(true) - w.pending = false + w.pending = false // chunk 都写完了,没有还没有 header 的 chunk 了 } _, w.err = w.w.Write(w.buf[w.written:w.j]) w.written = w.j + + // 可以调用 Next() 了 } // Close finishes the current journal and closes the writer. @@ -508,6 +516,7 @@ type singleWriter struct { seq int } +// singleWrite 可以写入多次 p,写满 block 时会自动 fillHeader 然后开启下一个 block func (x singleWriter) Write(p []byte) (int, error) { w := x.w if w.seq != x.seq { @@ -520,12 +529,12 @@ func (x singleWriter) Write(p []byte) (int, error) { for len(p) > 0 { // Write a block, if it is full. if w.j == blockSize { - w.fillHeader(false) + w.fillHeader(false) // p 还没有写完,这时候一个 block 写满了,要开启下一个 block,所以这个 chunk 就不会是 last w.writeBlock() if w.err != nil { - return 0, w.err + return 0, w.err // 写 journal 的时候任意一段 chunk 写入失败就算作失败 } - w.first = false + w.first = false // 写了一个 chunk 后,下一个要写的 chunk 就不再是 first 了 } // Copy bytes into the buffer. n := copy(w.buf[w.j:], p) diff --git a/leveldb/table.go b/leveldb/table.go index d0fab40c..04453595 100644 --- a/leveldb/table.go +++ b/leveldb/table.go @@ -149,6 +149,7 @@ func (tf tFiles) searchNumLess(num int64) int { // Searches smallest index of tables whose its smallest // key is after the given key. +// 按照 user key 来作比较 func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int { return sort.Search(len(tf), func(i int) bool { return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0 @@ -157,6 +158,7 @@ func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int { // Searches smallest index of tables whose its largest // key is after the given key. +// 按照 user key 来作比较 func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int { return sort.Search(len(tf), func(i int) bool { return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0 @@ -179,13 +181,14 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo i := 0 if len(umin) > 0 { // Find the earliest possible internal key for min. + // 先找第一个 imax 大于等于 umin 的 SSTs i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek)) } if i >= len(tf) { // Beginning of range is after all files, so no overlap. return false } - return !tf[i].before(icmp, umax) + return !tf[i].before(icmp, umax) // umax 不在这个文件前面,即此 SST 与 [umin, umax] 有交集 } // Returns tables whose its key range overlaps with given key range. @@ -202,7 +205,7 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove // And what's more, the files in these levels are strictly sorted, // so use binary search instead of heavy traverse. if !overlapped { - var begin, end int + var begin, end int // 区间 [begin, end) // Determine the begin index of the overlapped file if umin != nil { index := tf.searchMinUkey(icmp, umin) @@ -242,6 +245,8 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove for i := 0; i < len(tf); { t := tf[i] if t.overlaps(icmp, umin, umax) { + + // 扩大 [umin, umax] 的范围 if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 { umin = t.imin.ukey() dst = dst[:0] @@ -284,7 +289,7 @@ func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) { // Creates iterator index from tables. func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer { if slice != nil { - var start, limit int + var start, limit int // 区间 [start, limit),limit 更像是 end 的概念 if slice.Start != nil { start = tf.searchMax(icmp, internalKey(slice.Start)) } @@ -389,7 +394,7 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { return } } - err = src.Error() + err = src.Error() // accumulated error if err != nil { return } @@ -402,7 +407,8 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { // Opens table. It returns a cache handle, which should // be released after use. func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) { - ch = t.fileCache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) { + + loader := func() (size int, value cache.Value) { var r storage.Reader r, err = t.s.stor.Open(f.fd) if err != nil { @@ -421,8 +427,9 @@ func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) { return 0, nil } return 1, tr + } - }) + ch = t.fileCache.Get(0, uint64(f.fd.Num), loader) if ch == nil && err == nil { err = ErrClosed } diff --git a/leveldb/table/reader.go b/leveldb/table/reader.go index 8128794c..626dc543 100644 --- a/leveldb/table/reader.go +++ b/leveldb/table/reader.go @@ -62,14 +62,16 @@ type block struct { } func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) { - index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { - offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) - offset++ // shared always zero, since this is a restart point - v1, n1 := binary.Uvarint(b.data[offset:]) // key length - _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length - m := offset + n1 + n2 - return cmp.Compare(b.data[m:m+int(v1)], key) > 0 - }) + rstart - 1 + index = sort.Search( + b.restartsLen-rstart-(b.restartsLen-rlimit), + func(i int) bool { + offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) + offset++ // shared always zero, since this is a restart point + v1, n1 := binary.Uvarint(b.data[offset:]) // key length + _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length + m := offset + n1 + n2 + return cmp.Compare(b.data[m:m+int(v1)], key) > 0 + }) + rstart - 1 if index < rstart { // The smallest key is greater-than key sought. index = rstart @@ -121,7 +123,7 @@ type dir int const ( dirReleased dir = iota - 1 dirSOI - dirEOI + dirEOI // 到了流的尾部,在用 dir 表达iterator的状态 dirBackward dirForward ) @@ -221,6 +223,7 @@ func (i *blockIter) Last() bool { return i.Prev() } +// 找到第一个大于等于 key 的记录 func (i *blockIter) Seek(key []byte) bool { if i.err != nil { return false @@ -262,20 +265,27 @@ func (i *blockIter) Next() bool { i.prevNode = i.prevNode[:0] i.prevKeys = i.prevKeys[:0] } + + // 使用 i.offset 在作iteration + // 为什么时一个循环?如果多次循环,那么 i.key 和 i.value 之前的值不久被覆盖了吗?s for i.offset < i.offsetRealStart { key, value, nShared, n, err := i.block.entry(i.offset) if err != nil { i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) return false } + if n == 0 { i.dir = dirEOI return false } + + // 读到新的key-value pair i.key = append(i.key[:nShared], key...) i.value = value i.offset += n } + if i.offset >= i.offsetLimit { i.dir = dirEOI if i.offset != i.offsetLimit { @@ -283,6 +293,7 @@ func (i *blockIter) Next() bool { } return false } + key, value, nShared, n, err := i.block.entry(i.offset) if err != nil { i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) @@ -292,6 +303,7 @@ func (i *blockIter) Next() bool { i.dir = dirEOI return false } + i.key = append(i.key[:nShared], key...) i.value = value i.prevOffset = i.offset @@ -527,6 +539,7 @@ type Reader struct { filterBlock *filterBlock } +// 一个 SST 包含多个blocks func (r *Reader) blockKind(bh blockHandle) string { switch bh.offset { case r.metaBH.offset: @@ -549,6 +562,7 @@ func (r *Reader) newErrCorruptedBH(bh blockHandle, reason string) error { return r.newErrCorrupted(int64(bh.offset), int64(bh.length), r.blockKind(bh), reason) } +// 汇报一个block的错误 func (r *Reader) fixErrCorruptedBH(bh blockHandle, err error) error { if cerr, ok := err.(*ErrCorrupted); ok { cerr.Pos = int64(bh.offset) diff --git a/leveldb/table/table.go b/leveldb/table/table.go index 29f80f8e..29bb9ead 100644 --- a/leveldb/table/table.go +++ b/leveldb/table/table.go @@ -153,6 +153,7 @@ const ( blockTypeSnappyCompression = 1 ) +// 一个 blockHandle 可以指定一块数据区域s type blockHandle struct { offset, length uint64 } diff --git a/leveldb/table/writer.go b/leveldb/table/writer.go index ea89d600..6f3273d7 100644 --- a/leveldb/table/writer.go +++ b/leveldb/table/writer.go @@ -396,7 +396,7 @@ func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool, size int) *Wr } else { bufBytes = pool.Get(size) } - bufBytes = bufBytes[:0] + bufBytes = bufBytes[:0] // clear up bufBytes, in case it is fetched from pool w := &Writer{ writer: f, diff --git a/leveldb/version.go b/leveldb/version.go index 46725091..458f1dbb 100644 --- a/leveldb/version.go +++ b/leveldb/version.go @@ -22,6 +22,7 @@ type tSet struct { table *tFile } +// 保存每个层级的文件信息以及判断是否需要更新compaction相关的变量 type version struct { id int64 // unique monotonous increasing version id s *session From 38a4b25065d2922c1320c493355f808e7a98ee05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=98=A0=E9=BE=99?= Date: Fri, 28 Oct 2022 12:52:22 +0800 Subject: [PATCH 02/11] add comments --- leveldb/db.go | 6 +++--- leveldb/db_compaction.go | 1 + leveldb/db_write.go | 7 +++++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/leveldb/db.go b/leveldb/db.go index 25da7ce5..1d0620f0 100644 --- a/leveldb/db.go +++ b/leveldb/db.go @@ -64,9 +64,9 @@ type DB struct { // Write. batchPool sync.Pool - writeMergeC chan writeMerge + writeMergeC chan writeMerge // 有 merge write 需求时 chan<-,执行 merge write 的一方会不断地 <-chan 合并可以合并过来的写请求 writeMergedC chan bool - writeLockC chan struct{} + writeLockC chan struct{} // write 的写锁,成功 chan<- 说明拿到了锁,用完了之后 <-chan 就是释放锁 writeAckC chan error writeDelay time.Duration writeDelayN int @@ -76,7 +76,7 @@ type DB struct { compCommitLk sync.Mutex tcompCmdC chan cCmd tcompPauseC chan chan<- struct{} - mcompCmdC chan cCmd + mcompCmdC chan cCmd // 全称 memdb compaction command channel? compErrC chan error compPerErrC chan error compErrSetC chan error diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go index cc275ace..fe455c0a 100644 --- a/leveldb/db_compaction.go +++ b/leveldb/db_compaction.go @@ -678,6 +678,7 @@ func (db *DB) pauseCompaction(ch chan<- struct{}) { } } +// 是 compaction command? type cCmd interface { ack(err error) } diff --git a/leveldb/db_write.go b/leveldb/db_write.go index 7d985e3d..26d581f9 100644 --- a/leveldb/db_write.go +++ b/leveldb/db_write.go @@ -192,14 +192,14 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { merge: for mergeLimit > 0 { select { - case incoming := <-db.writeMergeC: + case incoming := <-db.writeMergeC: // 不断合并可得的写请求,可能是要合并一个 batch write,或者合并一个 put if incoming.batch != nil { // Merge batch. if incoming.batch.internalLen > mergeLimit { overflow = true break merge } - batches = append(batches, incoming.batch) + batches = append(batches, incoming.batch) // 注意:这里是整个 batch 合并,保证 batch 执行的原子性,不会存在 batch 一部分成功了,另一部分失败了的情况 mergeLimit -= incoming.batch.internalLen } else { // Merge put. @@ -237,6 +237,8 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { seq := db.seq + 1 // Write journal. + // 先写 journal,journal 写成功了就是 Write 成功了 + // journal 的 seq 写入的时候用的是相同的 seq if err := db.writeJournal(batches, seq, sync); err != nil { db.unlockWrite(overflow, merged, err) return err @@ -244,6 +246,7 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { // Put batches. for _, batch := range batches { + // mdb 写入的时候不应该出现问题 if err := batch.putMem(seq, mdb.DB); err != nil { panic(err) } From 64152cd719d1e19bee759c87a123e2e398041d0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=98=A0=E9=BE=99?= Date: Sun, 30 Oct 2022 22:35:03 +0800 Subject: [PATCH 03/11] add comments --- leveldb/comparer/comparer.go | 10 +++++- leveldb/db.go | 18 +++++------ leveldb/db_compaction.go | 11 +++++++ leveldb/db_state.go | 3 ++ leveldb/db_write.go | 9 +++++- leveldb/session.go | 1 + leveldb/session_compaction.go | 1 + leveldb/session_record.go | 8 +++-- leveldb/table/reader.go | 61 +++++++++++++++++++++++++---------- leveldb/table/table.go | 2 +- leveldb/table/writer.go | 34 +++++++++++++++---- leveldb/version.go | 1 + 12 files changed, 121 insertions(+), 38 deletions(-) diff --git a/leveldb/comparer/comparer.go b/leveldb/comparer/comparer.go index 2c522db2..e68f274d 100644 --- a/leveldb/comparer/comparer.go +++ b/leveldb/comparer/comparer.go @@ -45,13 +45,21 @@ type Comparer interface { // // Either contents of a or b should not by any means modified. Doing so // may cause corruption on the internal state. + // + // 以 LevelDB 提供的 bytes comparer,a = {0xff, 0xff, 0x11}, b = {0xff, 0xff, 0x1A, ...} 来讲, + // 前提是 b > a,否则返回 nil + // Separator(dst, a, b) 的结果是:dst = dst + {0xff, 0xff, 0x12} + // “分隔”体现在 dst 的最后一个 0x12 把 a 和 b 给分隔开了 Separator(dst, a, b []byte) []byte // Successor appends a sequence of bytes x to dst such that x >= b, where // 'less than' is consistent with Compare. An implementation should return // nil if x equal to b. - // // Contents of b should not by any means modified. Doing so may cause // corruption on the internal state. + // + // 以 LevelDB 提供的 bytes comparer,b = {0xff, 0xff, 0x1A, ...} 来讲, Successor(dst, b) 的结果是: + // dst = dst + {0xff, 0xff, 0x1B},注意结尾是比 0x1A 大了 1 的 0x1B + // 如果 b 里面都是 0xff,会返回 nil Successor(dst, b []byte) []byte } diff --git a/leveldb/db.go b/leveldb/db.go index 1d0620f0..03da03a0 100644 --- a/leveldb/db.go +++ b/leveldb/db.go @@ -65,20 +65,20 @@ type DB struct { // Write. batchPool sync.Pool writeMergeC chan writeMerge // 有 merge write 需求时 chan<-,执行 merge write 的一方会不断地 <-chan 合并可以合并过来的写请求 - writeMergedC chan bool - writeLockC chan struct{} // write 的写锁,成功 chan<- 说明拿到了锁,用完了之后 <-chan 就是释放锁 - writeAckC chan error - writeDelay time.Duration - writeDelayN int + writeMergedC chan bool // chan<- false 用来通知某个 write 它没有被 merge + writeLockC chan struct{} // write 的写锁,成功 chan<- 说明拿到了锁,用完了之后 <-chan 就是释放锁 + writeAckC chan error // chan<- 通知等待 ACK 的一方;<-chan 等待的一方读取 Write 的结果 + writeDelay time.Duration // 记录 Write 被 compaction 所 delay 的时长 + writeDelayN int // 记录 Write 被 compaction 所 delay 的次数 tr *Transaction // Compaction. compCommitLk sync.Mutex tcompCmdC chan cCmd tcompPauseC chan chan<- struct{} - mcompCmdC chan cCmd // 全称 memdb compaction command channel? - compErrC chan error - compPerErrC chan error + mcompCmdC chan cCmd // 全称 memdb compaction command channel? + compErrC chan error // compaction error + compPerErrC chan error // 全称 compaction persistent error compErrSetC chan error compWriteLocking bool compStats cStats @@ -106,7 +106,7 @@ func openDB(s *session) (*DB, error) { batchPool: sync.Pool{New: newBatch}, writeMergeC: make(chan writeMerge), writeMergedC: make(chan bool), - writeLockC: make(chan struct{}, 1), + writeLockC: make(chan struct{}, 1), // 注意:channel 的 buffer 为 1,因为首个去获取 Write Lock 的 goroutine 必须得能通过 chan<- 拿到 Lock writeAckC: make(chan error), // Compaction tcompCmdC: make(chan cCmd), diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go index fe455c0a..94155135 100644 --- a/leveldb/db_compaction.go +++ b/leveldb/db_compaction.go @@ -59,6 +59,7 @@ func (p *cStatStaging) stopTimer() { } } +// 按照 level 来组织 cStat,level[n] 表示 level-n 的 cStat,level 从 0 开始 type cStats struct { lk sync.Mutex stats []cStat @@ -173,6 +174,8 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { disableBackoff = db.s.o.GetDisableCompactionBackoff() ) + + // 有 backOff retry for n := 0; ; n++ { // Check whether the DB is closed. if db.isClosed() { @@ -192,6 +195,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { // Set compaction error status. select { case db.compErrSetC <- err: + // 注意:err == nil 时也要放入 compErrSetC case perr := <-db.compPerErrC: if err != nil { db.logf("%s exiting (persistent error %q)", name, perr) @@ -204,6 +208,8 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) { if err == nil { return } + + // 如果是 corrupted error,是不可以重试解决的 if errors.IsCorrupted(err) { db.logf("%s exiting (corruption detected)", name) db.compactionExitTransact() @@ -279,6 +285,7 @@ func (db *DB) memCompaction() { if mdb.Len() == 0 { db.logf("memdb@flush skipping") // drop frozen memdb + // 此时 immutable memdb 是空的,所以不用 compaction 了,直接删除掉对应的 journal 文件即可 db.dropFrozenMem() return } @@ -304,11 +311,15 @@ func (db *DB) memCompaction() { db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) { stats.startTimer() flushLevel, err = db.s.flushMemdb(rec, mdb.DB, db.memdbMaxLevel) + + // 这里 rec 应该只有一条记录,就是当前 memdb compaction 出来的 SST + stats.stopTimer() return }, func() error { for _, r := range rec.addedTables { db.logf("memdb@flush revert @%d", r.num) + // 删除创建的 SST,完成 revert if err := db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: r.num}); err != nil { return err } diff --git a/leveldb/db_state.go b/leveldb/db_state.go index d4963c41..e3e7ebec 100644 --- a/leveldb/db_state.go +++ b/leveldb/db_state.go @@ -203,11 +203,14 @@ func (db *DB) getFrozenMem() *memDB { // Drop frozen memdb; assume that frozen memdb isn't nil. func (db *DB) dropFrozenMem() { db.memMu.Lock() + + // 删除 immutable memtable 的 journal if err := db.s.stor.Remove(db.frozenJournalFd); err != nil { db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err) } else { db.logf("journal@remove removed @%d", db.frozenJournalFd.Num) } + db.frozenJournalFd = storage.FileDesc{} db.frozenMem.decref() db.frozenMem = nil diff --git a/leveldb/db_write.go b/leveldb/db_write.go index 26d581f9..1f4da79d 100644 --- a/leveldb/db_write.go +++ b/leveldb/db_write.go @@ -190,6 +190,10 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { } merge: + // 只要容量还有剩余,就 merge 下一个 Write,直到超出容量上限 + // 对于 merge 进来的 Write,通知负责 Write 的一端这个 Write 已经被 merge,于是负责的那一方可以不用管了,我们这里会负责到底,直到 Write 成功或失败 + // 对于因为超出了容量,merge 不进来的那个 Write,我们通知它的负责人“这个 Write 得由你自己负责了,没赶上上一波发车”, + // Write Lock 交接给那个负责人,它可以继续执行从它开始的 merge write for mergeLimit > 0 { select { case incoming := <-db.writeMergeC: // 不断合并可得的写请求,可能是要合并一个 batch write,或者合并一个 put @@ -220,7 +224,7 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { } sync = sync || incoming.sync merged++ - db.writeMergedC <- true + db.writeMergedC <- true // 通知各个 Write,这次操作已经被 merge 了 default: break merge @@ -239,6 +243,7 @@ func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error { // Write journal. // 先写 journal,journal 写成功了就是 Write 成功了 // journal 的 seq 写入的时候用的是相同的 seq + // journal 包含了所有的 batches,同时成功或失败 if err := db.writeJournal(batches, seq, sync); err != nil { db.unlockWrite(overflow, merged, err) return err @@ -374,6 +379,8 @@ func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error batch := db.batchPool.Get().(*Batch) batch.Reset() batch.appendRec(kt, key, value) + + // 这里传参 batch 和 ourBatch 都是同一个 `batch`,所以函数内 ourBatch 在合并 put 的时候,就相当于那个共同的 `batch` 把 put 合并掉了 return db.writeLocked(batch, batch, merge, sync) } diff --git a/leveldb/session.go b/leveldb/session.go index 036570e0..67cef61b 100644 --- a/leveldb/session.go +++ b/leveldb/session.go @@ -213,6 +213,7 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) { defer v.release() // spawn new version based on current version + // 记录在 current version 中! nv := v.spawn(r, trivial) // abandon useless version id to prevent blocking version processing loop. diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go index 2fd5f32e..35106e9a 100644 --- a/leveldb/session_compaction.go +++ b/leveldb/session_compaction.go @@ -28,6 +28,7 @@ func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int { return v.pickMemdbLevel(umin, umax, maxLevel) } +// 创建 SST 并添加到 rec func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) { // Create sorted table. iter := mdb.NewIterator(nil) diff --git a/leveldb/session_record.go b/leveldb/session_record.go index b1a352f6..b6c17f9e 100644 --- a/leveldb/session_record.go +++ b/leveldb/session_record.go @@ -39,9 +39,11 @@ type cpRecord struct { ikey internalKey } +// add table Record +// add SSTable type atRecord struct { - level int - num int64 + level int // 把 table 添加到第几个 level + num int64 // fileDesc num size int64 imin internalKey imax internalKey @@ -53,7 +55,7 @@ type dtRecord struct { } type sessionRecord struct { - hasRec int + hasRec int // enum, addTable: 7 comparer string journalNum int64 prevJournalNum int64 diff --git a/leveldb/table/reader.go b/leveldb/table/reader.go index 626dc543..e768aa42 100644 --- a/leveldb/table/reader.go +++ b/leveldb/table/reader.go @@ -57,21 +57,36 @@ type block struct { bpool *util.BufferPool bh blockHandle data []byte - restartsLen int + restartsLen int // restart point 的个数 restartsOffset int } +// 返回最后一个小于等于 key 的 restart point index,和对应 restart point 的 offset +// 后续一半从这个 restart point 开始查找 key func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) { + // 查 [rstart, rlimit) 上的 restart point + // 这个二分查找是要找到第一个比 key 大的 restart point,返回的下标是针对 [rstart, rlimit) 而言的 + // 比如返回 i==0 的话,其实对应的是 restart point rstart + // 注意配合上后面的 + rstart - 1 使用,那么最终 index 是:最后一个小于等于 key 的 restart point index = sort.Search( - b.restartsLen-rstart-(b.restartsLen-rlimit), + b.restartsLen-rstart-(b.restartsLen-rlimit), // [0, rlimit - rstart) func(i int) bool { + // 看从 rstart 算起, restart point i (based 0) 的 key 是否比 key 大 + + // [rstart, rlimit] 上的第 i 个 offset,比如 i==0 的话,就是 rstart 这个 restart point + // offset 就定位到这个 restart point 指向的数据 + // 因为 restart point 指向的数据没有“前面的”数据,所以 shared_length 必然等于 0 offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) - offset++ // shared always zero, since this is a restart point + offset++ // shared always zero, since this is a restart point + v1, n1 := binary.Uvarint(b.data[offset:]) // key length _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length - m := offset + n1 + n2 + m := offset + n1 + n2 // key offset return cmp.Compare(b.data[m:m+int(v1)], key) > 0 }) + rstart - 1 + + // 如果 [rstart, rlimit) 都比 key 大,那么最终调整 index=rstart,因为(调用方)最终是找第一个大于等于 key 的,这样一来对调用方来讲最终答案就是 rstart + // 如果 [rstart, rlimit) 都不比 key 大,那么最终 index=rlimit-1,即区间上的最后一个 restart point,因为数据有序排布,所以这个最后的 restart point 之后的数据也是可能会大于等于 key 的 if index < rstart { // The smallest key is greater-than key sought. index = rstart @@ -90,22 +105,27 @@ func (b *block) restartOffset(index int) int { return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:])) } +// n: 读取的这条 k/v 的总存储长度,包括 shared_bytes, unshared_bytes 等信息的所有 bytes func (b *block) entry(offset int) (key, value []byte, nShared, n int, err error) { + + // offset 不应该伸展到 restartsOffset 区域 if offset >= b.restartsOffset { if offset != b.restartsOffset { err = &ErrCorrupted{Reason: "entries offset not aligned"} } return } + v0, n0 := binary.Uvarint(b.data[offset:]) // Shared prefix length - v1, n1 := binary.Uvarint(b.data[offset+n0:]) // Key length + v1, n1 := binary.Uvarint(b.data[offset+n0:]) // 注:Unshard key length v2, n2 := binary.Uvarint(b.data[offset+n0+n1:]) // Value length m := n0 + n1 + n2 - n = m + int(v1) + int(v2) + n = m + int(v1) + int(v2) // 一条 k/v 的长度 if n0 <= 0 || n1 <= 0 || n2 <= 0 || offset+n > b.restartsOffset { err = &ErrCorrupted{Reason: "entries corrupted"} return } + key = b.data[offset+m : offset+m+int(v1)] value = b.data[offset+m+int(v1) : offset+n] nShared = int(v0) @@ -122,8 +142,8 @@ type dir int const ( dirReleased dir = iota - 1 - dirSOI - dirEOI // 到了流的尾部,在用 dir 表达iterator的状态 + dirSOI // 在 iteration 的头部,Start Of Iteration + dirEOI // 到了 iteration 的尾部,在用 dir 表达iterator的状态 dirBackward dirForward ) @@ -139,14 +159,14 @@ type blockIter struct { prevOffset int prevNode []int prevKeys []byte - restartIndex int + restartIndex int // 注:the index of restart point // Iterator direction. dir dir // Restart index slice range. - riStart int - riLimit int + riStart int // 框定 iterator 的范围,起始的 restart point index + riLimit int // 框定 iterator 的范围,结束的 restart point index(不包括) // Offset slice range. - offsetStart int + offsetStart int // 框定 iterator 的范围,起始的 offset offsetRealStart int offsetLimit int // Error. @@ -232,6 +252,8 @@ func (i *blockIter) Seek(key []byte) bool { return false } + // 按照 restart point 二分查找 + // 找第一个小于 key 的 restart point ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key) if err != nil { i.sErr(err) @@ -242,6 +264,8 @@ func (i *blockIter) Seek(key []byte) bool { if i.dir == dirSOI || i.dir == dirEOI { i.dir = dirForward } + + // 最终找第一个大于等于 key 的记录 for i.Next() { if i.tr.cmp.Compare(i.key, key) >= 0 { return true @@ -266,8 +290,8 @@ func (i *blockIter) Next() bool { i.prevKeys = i.prevKeys[:0] } - // 使用 i.offset 在作iteration - // 为什么时一个循环?如果多次循环,那么 i.key 和 i.value 之前的值不久被覆盖了吗?s + // 使用 i.offset 作 iteration + // 把 i.offset 推进到 i.offsetRealStart for i.offset < i.offsetRealStart { key, value, nShared, n, err := i.block.entry(i.offset) if err != nil { @@ -280,10 +304,11 @@ func (i *blockIter) Next() bool { return false } - // 读到新的key-value pair + // 读到新的 k/v + // 注意:这里新读到的 key 是由 shared + unshared 拼凑出来的 i.key = append(i.key[:nShared], key...) i.value = value - i.offset += n + i.offset += n // 跨过整条 k/v } if i.offset >= i.offsetLimit { @@ -533,7 +558,9 @@ type Reader struct { filter filter.Filter verifyChecksum bool - dataEnd int64 + dataEnd int64 + + // indexBH: data index block handle,检索 data blocks metaBH, indexBH, filterBH blockHandle indexBlock *block filterBlock *filterBlock diff --git a/leveldb/table/table.go b/leveldb/table/table.go index 29bb9ead..1f06d6a4 100644 --- a/leveldb/table/table.go +++ b/leveldb/table/table.go @@ -153,7 +153,7 @@ const ( blockTypeSnappyCompression = 1 ) -// 一个 blockHandle 可以指定一块数据区域s +// 一个 blockHandle 可以指定一块数据区域 type blockHandle struct { offset, length uint64 } diff --git a/leveldb/table/writer.go b/leveldb/table/writer.go index 6f3273d7..1b17cfb0 100644 --- a/leveldb/table/writer.go +++ b/leveldb/table/writer.go @@ -31,6 +31,8 @@ func sharedPrefixLen(a, b []byte) int { return i } +// 是 data block,就是存 k/v +// 默认 blockSize = 4kb type blockWriter struct { restartInterval int buf util.Buffer @@ -158,7 +160,7 @@ type Writer struct { dataBlock blockWriter indexBlock blockWriter filterBlock filterWriter - pendingBH blockHandle + pendingBH blockHandle // 上一个写入了的 dataBlock 的 block handle offset uint64 nEntries int // Scratch allocated enough for 5 uvarint. Block writer should not use @@ -166,7 +168,7 @@ type Writer struct { // then passed to the block writer itself. scratch [50]byte comparerScratch []byte - compressionScratch []byte + compressionScratch []byte // 写入 dataBlock 的时候保存 dataBlock 被 compression 之后的数据 } func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) { @@ -182,6 +184,9 @@ func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh b b = compressed[:n+blockTrailerLen] b[n] = blockTypeSnappyCompression } else { + // buf.Alloc 会原地增长 buf 本身,预留出来 blockTrailerLen 这么多的 bytes + // 所以这里的 else 会得到跟上面选择压缩的时候同样布局的 b: + // {b's data} | {CRC checksum, 4 bytes} | {compression type, 1 byte} tmp := buf.Alloc(blockTrailerLen) tmp[0] = blockTypeNoCompression b = buf.Bytes() @@ -193,12 +198,15 @@ func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh b binary.LittleEndian.PutUint32(b[n:], checksum) // Write the buffer to the file. + // 此时的 b 保存了排布好的 entry 和 restart points,直接写进文件就可以了 _, err = w.writer.Write(b) if err != nil { return } + + // bh 保存的是当前写入的 b 的 blockHandle bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)} - w.offset += uint64(len(b)) + w.offset += uint64(len(b)) // blockTrailer 在文件中肯定也是算长度的,所以这里 offset 增长的就是 + len(b),包括了 blockTrailer 的长度 return } @@ -206,6 +214,8 @@ func (w *Writer) flushPendingBH(key []byte) error { if w.pendingBH.length == 0 { return nil } + + // 前一个 block 的最后一个 key,与当前 block 的第一个 key,两者的最短分隔符 var separator []byte if len(key) == 0 { separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey) @@ -213,15 +223,19 @@ func (w *Writer) flushPendingBH(key []byte) error { separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key) } if separator == nil { + // 比如当 len(key) == 0 && w.dataBlock.prevKey 里面都是 0xff 时,separator 将会是 nil + // 或者 w.dataBlock.prevKey == key 时,separator 也会是 nil separator = w.dataBlock.prevKey } else { w.comparerScratch = separator } - n := encodeBlockHandle(w.scratch[:20], w.pendingBH) + + n := encodeBlockHandle(w.scratch[:20], w.pendingBH) // pendingBH 是 prev dataBlock 的 handle // Append the block handle to the index block. if err := w.indexBlock.append(separator, w.scratch[:n]); err != nil { return err } + // Reset prev key of the data block. w.dataBlock.prevKey = w.dataBlock.prevKey[:0] // Clear pending block handle. @@ -230,14 +244,16 @@ func (w *Writer) flushPendingBH(key []byte) error { } func (w *Writer) finishBlock() error { + // 写入 dataBlock 的 restart points if err := w.dataBlock.finish(); err != nil { return err } + bh, err := w.writeBlock(&w.dataBlock.buf, w.compression) if err != nil { return err } - w.pendingBH = bh + w.pendingBH = bh // pendingBH 记录下目前写入的 blockHandle,下一次再写入 key 的时候,会开启一个新的 dataBlock,那时 pendingBH 会被写入 // Reset the data block. w.dataBlock.reset() // Flush the filter block. @@ -269,6 +285,8 @@ func (w *Writer) Append(key, value []byte) error { w.filterBlock.add(key) // Finish the data block if block size target reached. + // 那么一个 data block 的实际大小是可能超过 blockSize 的,因为不是预留空间然后保证不超 + // 所以,一条很大的数据可能会占据一个超大的 block?允许吗?Why if w.dataBlock.bytesLen() >= w.blockSize { if err := w.finishBlock(); err != nil { w.err = err @@ -319,6 +337,7 @@ func (w *Writer) Close() error { // Write the last data block. Or empty data block if there // aren't any data blocks at all. + // 如果 data block 是空的,至少会写入一个 restart point if w.dataBlock.nEntries > 0 || w.nEntries == 0 { if err := w.finishBlock(); err != nil { w.err = err @@ -343,8 +362,11 @@ func (w *Writer) Close() error { // Write the metaindex block. if filterBH.length > 0 { + // key: "filter.{w.filter.Name()}" + // val: filter block handle key := []byte("filter." + w.filter.Name()) n := encodeBlockHandle(w.scratch[:20], filterBH) + // 注意:这里写 k/v 也是用的 w.dataBlock,即 metaindex block 本身的格式也是 data block 的格式 if err := w.dataBlock.append(key, w.scratch[:n]); err != nil { return err } @@ -375,7 +397,7 @@ func (w *Writer) Close() error { } n := encodeBlockHandle(footer, metaindexBH) encodeBlockHandle(footer[n:], indexBH) - copy(footer[footerLen-len(magic):], magic) + copy(footer[footerLen-len(magic):], magic) // 文件的尾部是 magic number if _, err := w.writer.Write(footer); err != nil { w.err = err return w.err diff --git a/leveldb/version.go b/leveldb/version.go index 458f1dbb..51afd0fd 100644 --- a/leveldb/version.go +++ b/leveldb/version.go @@ -328,6 +328,7 @@ func (v *version) offsetOf(ikey internalKey) (n int64, err error) { return } +// maxLevel 通常只用于 testing,prod 使用中取 0 func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) { if maxLevel > 0 { if len(v.levels) == 0 { From 86c702be28c741c2478ce783428bcb1b60980b80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=98=A0=E9=BE=99?= Date: Mon, 31 Oct 2022 10:32:58 +0800 Subject: [PATCH 04/11] add comments --- leveldb/table/reader.go | 15 +++++++++++++++ leveldb/table/writer.go | 1 + 2 files changed, 16 insertions(+) diff --git a/leveldb/table/reader.go b/leveldb/table/reader.go index e768aa42..62358939 100644 --- a/leveldb/table/reader.go +++ b/leveldb/table/reader.go @@ -802,6 +802,7 @@ func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Ran return bi } +// 读取 data block 的 iterator func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, verifyChecksum, fillCache bool) iterator.Iterator { b, rel, err := r.readBlockCached(dataBH, verifyChecksum, fillCache) if err != nil { @@ -876,6 +877,17 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo index := r.newBlockIter(indexBlock, nil, nil, true) defer index.Release() + // index 也是一个 data block + // 所以 index.Seek 方法也是去找第一个大于等于 key 的记录,只是该条记录的 val 是 data block handle + // data block handle 的 key 是这个 block 的 successor,所以比所索引的 data block 的所有记录都大 + // + // 所以我们从第一个大于等于 key 的 data block handle 开始找就可以了, + // 前面的那个 data block 因为索引键都小于要找的key了,里面的数据就更不可能大于等于 key 了 + // + // Why: data index 的 key 是 data block 的 last key 的 successor, + // 如果 user key 就全是 0xff, 0xff, 0xff...,那么 last key 的 successor 就必然会增加 seq number 的字段 + // 那么,这个 successor 不就会“小于” data block 里面的记录了吗?不就不是 "successor" 了吗? + // 当然,这种情况生产环境下不大可能会出现 if !index.Seek(key) { if err = index.Error(); err == nil { err = ErrNotFound @@ -883,6 +895,7 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo return } + // 找到了潜在的 data block dataBH, n := decodeBlockHandle(index.Value()) if n == 0 { r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") @@ -893,6 +906,7 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo if filtered && r.filter != nil { filterBlock, frel, ferr := r.getFilterBlock(true) if ferr == nil { + // 使用 filter 来提前判断 key 不存在,省去读取 block 的内容再查询 if !filterBlock.contains(r.filter, dataBH.offset, key) { frel.Release() return nil, nil, ErrNotFound @@ -904,6 +918,7 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo } data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache()) + // 在具体的 block 中去找 if !data.Seek(key) { data.Release() if err = data.Error(); err != nil { diff --git a/leveldb/table/writer.go b/leveldb/table/writer.go index 1b17cfb0..429ab313 100644 --- a/leveldb/table/writer.go +++ b/leveldb/table/writer.go @@ -225,6 +225,7 @@ func (w *Writer) flushPendingBH(key []byte) error { if separator == nil { // 比如当 len(key) == 0 && w.dataBlock.prevKey 里面都是 0xff 时,separator 将会是 nil // 或者 w.dataBlock.prevKey == key 时,separator 也会是 nil + // 总之,separator 得是大于等于 w.dataBlock 中的所有 key 的 separator = w.dataBlock.prevKey } else { w.comparerScratch = separator From cccf7609e3be69d2682c15295ab3d18dba88f8a6 Mon Sep 17 00:00:00 2001 From: songyinglong Date: Mon, 31 Oct 2022 16:01:49 +0800 Subject: [PATCH 05/11] add comments Change-Id: I4fa039e7df54239d9bbdeeadc63615a7277c0d1d --- leveldb/comparer.go | 2 + leveldb/db.go | 4 + leveldb/session.go | 4 +- leveldb/table/reader.go | 2 + leveldb/version.go | 162 ++++++++++++++++++++++++---------------- 5 files changed, 106 insertions(+), 68 deletions(-) diff --git a/leveldb/comparer.go b/leveldb/comparer.go index 835de420..710938f4 100644 --- a/leveldb/comparer.go +++ b/leveldb/comparer.go @@ -11,6 +11,7 @@ import ( ) // iComparer 处理关于 sequence 相关的逻辑 +// 感觉可以称之为 internal Comparer? type iComparer struct { ucmp comparer.Comparer } @@ -35,6 +36,7 @@ func (icmp *iComparer) Name() string { return icmp.uName() } +// func (icmp *iComparer) Compare(a, b []byte) int { x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey()) if x == 0 { diff --git a/leveldb/db.go b/leveldb/db.go index 03da03a0..9777061b 100644 --- a/leveldb/db.go +++ b/leveldb/db.go @@ -778,6 +778,8 @@ func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byt return } +// 正常的查询 auxm 和 auxt 都为 nil +// 猜:auxm and auxt is only for testing? func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) { ikey := makeInternalKey(nil, key, seq, keyTypeSeek) @@ -794,6 +796,8 @@ func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.R } defer m.decref() + // 依次从 memtable 和 immutable memtable 获取 + // 如果找到的话,可以返回结果,因为找到的这个必然是 seq number 小于等于 seq 且是最大的那个,小的 seq number 会被 compaction if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok { return append([]byte(nil), mv...), me } diff --git a/leveldb/session.go b/leveldb/session.go index 67cef61b..ae6bfd25 100644 --- a/leveldb/session.go +++ b/leveldb/session.go @@ -55,8 +55,8 @@ type session struct { stCompPtrs []internalKey // compaction pointers; need external synchronization stVersion *version // current version ntVersionID int64 // next version id to assign - refCh chan *vTask - relCh chan *vTask + refCh chan *vTask // 注:reference version task + relCh chan *vTask // 注:release version task deltaCh chan *vDelta abandon chan int64 closeC chan struct{} diff --git a/leveldb/table/reader.go b/leveldb/table/reader.go index 62358939..2b986d4d 100644 --- a/leveldb/table/reader.go +++ b/leveldb/table/reader.go @@ -859,6 +859,8 @@ func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It return iterator.NewIndexedIterator(index, opt.GetStrict(r.o, ro, opt.StrictReader)) } +// 在 SSTable 中查找 key +// 从调用端来看,key 是 internal key func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bool) (rkey, value []byte, err error) { r.mu.RLock() defer r.mu.RUnlock() diff --git a/leveldb/version.go b/leveldb/version.go index 51afd0fd..a8afb2cb 100644 --- a/leveldb/version.go +++ b/leveldb/version.go @@ -35,7 +35,7 @@ type version struct { cLevel int cScore float64 - cSeek unsafe.Pointer + cSeek unsafe.Pointer // 保存 tSet,需要被 compaction 的下一个 SSTable? closing bool ref int @@ -65,6 +65,8 @@ func (v *version) incref() { } } +// 释放 version +// 当一个 version 引用计数减为 0 时,可以删除掉这个 version func (v *version) releaseNB() { v.ref-- if v.ref > 0 { @@ -72,6 +74,8 @@ func (v *version) releaseNB() { } else if v.ref < 0 { panic("negative version ref") } + + // 当 reference count 减少到 0 时,释放 version select { case v.s.relCh <- &vTask{vid: v.id, files: v.levels, created: time.Now()}: // We can use v.levels directly here since it is immutable. @@ -88,12 +92,14 @@ func (v *version) release() { v.s.vmu.Unlock() } +// 因为 version 上有当前的 SSTable 信息,所以一些使用 SSTable 进行查询的方法定义在了 version 上 func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int, t *tFile) bool, lf func(level int) bool) { ukey := ikey.ukey() // Aux level. if aux != nil { for _, t := range aux { + // 注意:使用的是 user key 来判定是否和 SSTable 的 range 重叠 if t.overlaps(v.s.icmp, ukey, ukey) { if !f(-1, t) { return @@ -123,6 +129,10 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int } } } else { + + // Searches smallest index of tables whose its largest + // key is after or equal with given key. + // 找 SSTable 最小的 index,其 imax 大于等于 ikey,这个 SSTable 就是潜在的 table if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) { t := tables[i] if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { @@ -134,26 +144,29 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int } if lf != nil && !lf(level) { + // lf 内部会判断是否在 level-0 上已经找到,如果已经找到的话,这里会 return,就不会继续往下找了 return } } } +// 在 SSTable 中查找 ikey +// v 上说明了本次查询所有的 SSTable func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) { if v.closing { return nil, false, ErrClosed } ukey := ikey.ukey() - sampleSeeks := !v.s.o.GetDisableSeeksCompaction() + sampleSeeks := !v.s.o.GetDisableSeeksCompaction() // 如果 seek 的时候 miss 过多,需要出发 compaction var ( tset *tSet - tseek bool + tseek bool // 注:trigger seek compaction? // Level-0. - zfound bool - zseq uint64 + zfound bool // 标记 level-0 上已经找到了要找的 key,不可以继续往下找了,因为 level-0 上的数据肯定是最新的,如果继续往下找的话就可能找到旧的数据了 + zseq uint64 // 用来和找到的 seq number 做比较,当遇到更大的 seq number 时更新结果,使用离 snapshot 最近的结果 zkt keyType zval []byte ) @@ -162,78 +175,95 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue // Since entries never hop across level, finding key/value // in smaller level make later levels irrelevant. - v.walkOverlapping(aux, ikey, func(level int, t *tFile) bool { - if sampleSeeks && level >= 0 && !tseek { - if tset == nil { - tset = &tSet{level, t} - } else { - tseek = true + v.walkOverlapping(aux, ikey, + // 在一个 SSTable 内部查找,返回是否还需要继续到别的 SSTable 中搜索 + func(level int, t *tFile) bool { + if sampleSeeks && level >= 0 && !tseek { + if tset == nil { + tset = &tSet{level, t} + } else { + // 当 tSet 不为 nil 时,tSet 保存的是之前一个 SSTable,之前的 SSTable 没有找到这个 key,所以要 consume 一次 seekNumber + tseek = true + } } - } - var ( - fikey, fval []byte - ferr error - ) - if noValue { - fikey, ferr = v.s.tops.findKey(t, ikey, ro) - } else { - fikey, fval, ferr = v.s.tops.find(t, ikey, ro) - } + var ( + fikey, fval []byte + ferr error + ) + // 查询的是 internal key + if noValue { + fikey, ferr = v.s.tops.findKey(t, ikey, ro) + } else { + fikey, fval, ferr = v.s.tops.find(t, ikey, ro) + } - switch ferr { - case nil: - case ErrNotFound: - return true - default: - err = ferr - return false - } + switch ferr { + case nil: + case ErrNotFound: + // 在一个 SSTable 中没找到的话要继续寻找 + return true + default: + err = ferr + // 查询过程中出错,就结束查询,不继续在别的 SSTable 中查询 + return false + } - if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil { - if v.s.icmp.uCompare(ukey, fukey) == 0 { - // Level <= 0 may overlaps each-other. - if level <= 0 { - if fseq >= zseq { - zfound = true - zseq = fseq - zkt = fkt - zval = fval - } - } else { - switch fkt { - case keyTypeVal: - value = fval - err = nil - case keyTypeDel: - default: - panic("leveldb: invalid internalKey type") + if fukey, fseq, fkt, fkerr := parseInternalKey(fikey); fkerr == nil { + if v.s.icmp.uCompare(ukey, fukey) == 0 { + // Level <= 0 may overlaps each-other. + if level <= 0 { + if fseq >= zseq { + // 使用更大的 seq number,越大的 seq number 越新,越靠近 snapshot + zfound = true + zseq = fseq + zkt = fkt + zval = fval + } + } else { + switch fkt { + case keyTypeVal: + value = fval + err = nil + case keyTypeDel: + // 这里因为处于 level 0 之下的 level,一旦发现了 ukey,就不用继续找了,所以不存在更新结果的情况,value 不用给 reset 成 nil + // 只会赋值一次 + default: + panic("leveldb: invalid internalKey type") + } + // 在 level 0 以下的层,一旦发现了就不用继续向下找了 + return false } - return false } + } else { + err = fkerr + // 查询过程中出错,就结束查询,不继续在别的 SSTable 中查询 + return false } - } else { - err = fkerr - return false - } - return true - }, func(level int) bool { - if zfound { - switch zkt { - case keyTypeVal: - value = zval - err = nil - case keyTypeDel: - default: - panic("leveldb: invalid internalKey type") + return true + }, func(level int) bool { + // 遍历完 level-0 之后会调用这个函数,检查是否在 level-0 中已经发现了 key + // 如果 zfound,就要返回结果了,不可以继续往下找了,不然的话可能会找到下层 level 上的、旧的数据 + if zfound { + switch zkt { + case keyTypeVal: + value = zval + err = nil + case keyTypeDel: + default: + panic("leveldb: invalid internalKey type") + } + return false } - return false - } - return true - }) + // 对 level-0 之下的 level,调用是都返回 true,继续往下找 + // 所以 lf 这个函数只是对 zfound,“在 level-0 上是否找到”生效,用来终止向下的查找 + return true + }) + // 感觉这里 consume seek 的条件有点鲁莽,如果这个 key 本来就是不存在的,目前看来也会 consume 一次 seek + // 应该是最终这个 key 被找到了,然后对和它有 overlap 的 SSTable 都去 consume 一次 seek? if tseek && tset.table.consumeSeek() <= 0 { tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) } From 537b7291e351fdd909a831365313027407b6eeb2 Mon Sep 17 00:00:00 2001 From: songyinglong Date: Tue, 1 Nov 2022 14:56:16 +0800 Subject: [PATCH 06/11] add log about compaction Change-Id: I389550e1575517fe8a48eb89579a2d790d58c6c1 --- leveldb/db.go | 2 +- leveldb/db_compaction.go | 6 +++++- leveldb/journal/journal.go | 2 +- leveldb/session.go | 3 ++- leveldb/session_record.go | 2 +- leveldb/session_util.go | 6 +++++- leveldb/version.go | 8 +++++++- 7 files changed, 22 insertions(+), 7 deletions(-) diff --git a/leveldb/db.go b/leveldb/db.go index 9777061b..99317daf 100644 --- a/leveldb/db.go +++ b/leveldb/db.go @@ -104,7 +104,7 @@ func openDB(s *session) (*DB, error) { snapsList: list.New(), // Write batchPool: sync.Pool{New: newBatch}, - writeMergeC: make(chan writeMerge), + writeMergeC: make(chan writeMerge), // 用来合并 Write 操作 writeMergedC: make(chan bool), writeLockC: make(chan struct{}, 1), // 注意:channel 的 buffer 为 1,因为首个去获取 Write Lock 的 goroutine 必须得能通过 chan<- 拿到 Lock writeAckC: make(chan error), diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go index 94155135..1fd9abed 100644 --- a/leveldb/db_compaction.go +++ b/leveldb/db_compaction.go @@ -272,6 +272,7 @@ func (db *DB) compactionCommit(name string, rec *sessionRecord) { }, nil) } +// 触发 immutable memtable 的 compaction func (db *DB) memCompaction() { mdb := db.getFrozenMem() if mdb == nil { @@ -691,7 +692,7 @@ func (db *DB) pauseCompaction(ch chan<- struct{}) { // 是 compaction command? type cCmd interface { - ack(err error) + ack(err error) // 回传 ack 消息 } type cAuto struct { @@ -702,6 +703,7 @@ type cAuto struct { func (r cAuto) ack(err error) { if r.ackC != nil { defer func() { + // 防止 r.ackC 被意外 close _ = recover() }() r.ackC <- err @@ -726,6 +728,7 @@ func (r cRange) ack(err error) { // This will trigger auto compaction but will not wait for it. func (db *DB) compTrigger(compC chan<- cCmd) { select { + // 注意:尽量地触发 compaction,如果当前 compC 写不进去,实际的 compaction 可能就不触发了 case compC <- cAuto{}: default: } @@ -775,6 +778,7 @@ func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (e return err } +// db 启动时一个 go routine 专门执行此函数,进行 memtable 的 compaction func (db *DB) mCompaction() { var x cCmd diff --git a/leveldb/journal/journal.go b/leveldb/journal/journal.go index d8f6adc1..20d45b7b 100644 --- a/leveldb/journal/journal.go +++ b/leveldb/journal/journal.go @@ -368,7 +368,7 @@ type Writer struct { // NewWriter returns a new Writer. func NewWriter(w io.Writer) *Writer { - f, _ := w.(flusher) + f, _ := w.(flusher) // why: 从代码来看好像 storeage.Write 并没有 flush 方法,那怎么转化为 flusher ? return &Writer{ w: w, f: f, diff --git a/leveldb/session.go b/leveldb/session.go index ae6bfd25..e1e1e408 100644 --- a/leveldb/session.go +++ b/leveldb/session.go @@ -42,7 +42,7 @@ type session struct { stTempFileNum int64 stSeqNum uint64 // last mem compacted seq; need external synchronization - stor *iStorage + stor *iStorage // 底层是 file storage storLock storage.Locker o *cachedOptions icmp *iComparer @@ -208,6 +208,7 @@ func (s *session) recover() (err error) { } // Commit session; need external synchronization. +// Commit 一次 compaction func (s *session) commit(r *sessionRecord, trivial bool) (err error) { v := s.version() defer v.release() diff --git a/leveldb/session_record.go b/leveldb/session_record.go index b6c17f9e..1c5bd9c2 100644 --- a/leveldb/session_record.go +++ b/leveldb/session_record.go @@ -55,7 +55,7 @@ type dtRecord struct { } type sessionRecord struct { - hasRec int // enum, addTable: 7 + hasRec int // bitsmap, addTable: 7 comparer string journalNum int64 prevJournalNum int64 diff --git a/leveldb/session_util.go b/leveldb/session_util.go index f467f2d4..1930240a 100644 --- a/leveldb/session_util.go +++ b/leveldb/session_util.go @@ -264,6 +264,7 @@ func (s *session) tLen(level int) int { } // Set current version to v. +// r: 包含下一个 version 相对于当前 stVersion 的 change func (s *session) setVersion(r *sessionRecord, v *version) { s.vmu.Lock() defer s.vmu.Unlock() @@ -367,7 +368,7 @@ func (s *session) fillRecord(r *sessionRecord, snapshot bool) { } if !r.has(recSeqNum) { - r.setSeqNum(s.stSeqNum) + r.setSeqNum(s.stSeqNum) // mem compacted seq } for level, ik := range s.stCompPtrs { @@ -455,6 +456,8 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { if err != nil { return } + + // 默认对 manifest 文件用 sync,GetNoSync 默认返回 false if !s.o.GetNoSync() { err = writer.Sync() if err != nil { @@ -466,6 +469,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { } // Flush record to disk. +// Manifest 的写入复用了 journal func (s *session) flushManifest(rec *sessionRecord) (err error) { s.fillRecord(rec, false) w, err := s.manifest.Next() diff --git a/leveldb/version.go b/leveldb/version.go index a8afb2cb..33bd3716 100644 --- a/leveldb/version.go +++ b/leveldb/version.go @@ -310,10 +310,11 @@ func (v *version) newStaging() *versionStaging { // Spawn a new version based on this version. func (v *version) spawn(r *sessionRecord, trivial bool) *version { staging := v.newStaging() - staging.commit(r) + staging.commit(r) // 将变更记录、收集到 version staging 中 return staging.finish(trivial) } +// 把 v 的所有 level 都添加到 r 中 func (v *version) fillRecord(r *sessionRecord) { for level, tables := range v.levels { for _, t := range tables { @@ -440,11 +441,13 @@ type tablesScratch struct { deleted map[int64]struct{} } +// 描述下一个 version 相比于 base version 的变化,增加了哪些 SSTable,删掉了哪些 SSTable type versionStaging struct { base *version levels []tablesScratch } +// 之所以叫 "Scratch",是因为我们需要收集 rec 形成这个 version staging,收集的过程像是 from the scratch func (p *versionStaging) getScratch(level int) *tablesScratch { if level >= len(p.levels) { newLevels := make([]tablesScratch, level+1) @@ -459,12 +462,14 @@ func (p *versionStaging) commit(r *sessionRecord) { for _, r := range r.deletedTables { scratch := p.getScratch(r.level) if r.level < len(p.base.levels) && len(p.base.levels[r.level]) > 0 { + // 只有 base version 真的有要删的这个 level,这里做一个简单的预 check if scratch.deleted == nil { scratch.deleted = make(map[int64]struct{}) } scratch.deleted[r.num] = struct{}{} } if scratch.added != nil { + // 如果是要删的,就不能继续在 add 里面了,预处理 delete(scratch.added, r.num) } } @@ -482,6 +487,7 @@ func (p *versionStaging) commit(r *sessionRecord) { } } +// 由 version staging 构建一个新的 version func (p *versionStaging) finish(trivial bool) *version { // Build new version. nv := newVersion(p.base.s) From b60e7f84a8d3210a3a8c4e99ca438063b00f0127 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=98=A0=E9=BE=99?= Date: Tue, 1 Nov 2022 23:30:35 +0800 Subject: [PATCH 07/11] add comment about compaction --- leveldb/db_compaction.go | 20 ++++++++++++++++++-- leveldb/db_write.go | 6 ++++++ leveldb/session.go | 7 +++++++ leveldb/session_compaction.go | 4 ++++ leveldb/session_record.go | 3 ++- leveldb/session_util.go | 4 ++-- leveldb/version.go | 4 +++- 7 files changed, 42 insertions(+), 6 deletions(-) diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go index 94155135..b783fdbe 100644 --- a/leveldb/db_compaction.go +++ b/leveldb/db_compaction.go @@ -332,6 +332,9 @@ func (db *DB) memCompaction() { // Commit. stats.startTimer() + + // 写入 new version 的 manifest 到磁盘,完成持久化,然后切换当前 stVersion 到 new version + // new version 是在当前 stVersion 上 apply 了 rec 中的 change 之后产生的 db.compactionCommit("memdb", rec) stats.stopTimer() @@ -357,6 +360,8 @@ func (db *DB) memCompaction() { } } + // 注意:在做 memtable compaction 的时候,major compaction 是被停止的,也就是说不存在并行的 compaction + // 在 memtable compaction 结束之后会 trigger 一次 major compaction // Trigger table compaction. db.compTrigger(db.tcompCmdC) } @@ -624,6 +629,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { } } +// 注意,这里的 func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { db.logf("table@compaction range L%d %q:%q", level, umin, umax) if level >= 0 { @@ -681,6 +687,9 @@ func (db *DB) resumeWrite() bool { return v.tLen(0) < db.s.o.GetWriteL0PauseTrigger() } +// 暂停 table compaction +// 在 memtable compaction 期间,会暂停 table compaction +// pause 直到 ch 可以写入 func (db *DB) pauseCompaction(ch chan<- struct{}) { select { case ch <- struct{}{}: @@ -689,7 +698,8 @@ func (db *DB) pauseCompaction(ch chan<- struct{}) { } } -// 是 compaction command? +// 是 compaction command +// 只有两种 cmd:cAuto | cRange type cCmd interface { ack(err error) } @@ -710,7 +720,7 @@ func (r cAuto) ack(err error) { type cRange struct { level int - min, max []byte + min, max []byte // 注意:min, max 都是 user key,用 user key 来在选择参与 compaction 的 SSTable 时判定是否重叠(是否选中) ackC chan<- error } @@ -738,6 +748,7 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { // Send cmd. select { case compC <- cAuto{ch}: + // 后续会通过读 <-ch 来等待 compaction 的结果 case err = <-db.compErrC: return case <-db.closeC: @@ -807,6 +818,7 @@ func (db *DB) mCompaction() { } } +// 有专门的 go routine 来执行 tCompaction func (db *DB) tCompaction() { var ( x cCmd @@ -833,7 +845,9 @@ func (db *DB) tCompaction() { if db.tableNeedCompaction() { select { case x = <-db.tcompCmdC: + // 等到了 table compaction command case ch := <-db.tcompPauseC: + // pause table compaction,直到 ch 可以写入 db.pauseCompaction(ch) continue case <-db.closeC: @@ -863,6 +877,7 @@ func (db *DB) tCompaction() { return } } + if x != nil { switch cmd := x.(type) { case cAuto: @@ -871,6 +886,7 @@ func (db *DB) tCompaction() { if db.resumeWrite() { x.ack(nil) } else { + // L0 的 SSTable 还很多,block 住了 Wrtie 操作 waitQ = append(waitQ, x) } } diff --git a/leveldb/db_write.go b/leveldb/db_write.go index 1f4da79d..571f9f32 100644 --- a/leveldb/db_write.go +++ b/leveldb/db_write.go @@ -403,6 +403,7 @@ func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { return db.putRec(keyTypeDel, key, nil, wo) } +// 注意:这里的 min, max 都是 user key,是使用 user key 来判定是否重叠 func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { iter := mem.NewIterator(nil) defer iter.Release() @@ -427,6 +428,8 @@ func (db *DB) CompactRange(r util.Range) error { // Lock writer. select { case db.writeLockC <- struct{}{}: + // 获得了 Write lock,此时 DB 的 Write 操作会被 block + // 获得这个锁是为了完成 mem compaction case err := <-db.compPerErrC: return err case <-db.closeC: @@ -447,9 +450,12 @@ func (db *DB) CompactRange(r util.Range) error { } <-db.writeLockC if err := db.compTriggerWait(db.mcompCmdC); err != nil { + // 等待 mem compaction 完毕,也就是刚才在 rotateMem 时被转化为 frozen memtable 的(也就是执行 CompactRange 时的当前 memtable)完成 mem compaction + // 此时就完成了 memtable 的 compaction,所以 Write lock 就被释放了 return err } } else { + // memtable 与 range 不重叠,不需要进行 mem compaction,所以直接释放锁 <-db.writeLockC } diff --git a/leveldb/session.go b/leveldb/session.go index ae6bfd25..903d4cc9 100644 --- a/leveldb/session.go +++ b/leveldb/session.go @@ -208,6 +208,7 @@ func (s *session) recover() (err error) { } // Commit session; need external synchronization. +// r: 保存 new version 相比于当前 version 的 change func (s *session) commit(r *sessionRecord, trivial bool) (err error) { v := s.version() defer v.release() @@ -226,15 +227,21 @@ func (s *session) commit(r *sessionRecord, trivial bool) (err error) { if s.manifest == nil { // manifest journal writer not yet created, create one + // why: 为什么要传 r,而不是传 nil ? 应该是保存新的全量 version 到 manifest 中就可以了 ? + // nv 已经是新的 version 了 err = s.newManifest(r, nv) } else if s.manifest.Size() >= s.o.GetMaxManifestFileSize() { // pass nil sessionRecord to avoid over-reference table file + // 当 manifest 的大小超出了阈值,就把 new version 里面的 SSTables 刷到一个全新的 manifest 里面 + // 这个 manifest 就只包含 atRecord,不会有 dtRecord 了 err = s.newManifest(nil, nv) } else { + // 当前已经有了 manifest,就将 version 的变更继续写在这个 manifest 中 err = s.flushManifest(r) } // finally, apply new version if no error rise + // 注意:这一步是在 new version 的 manifest 写入成功了之后才做的 version 切换 if err == nil { s.setVersion(r, nv) } diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go index 35106e9a..bb4dcd87 100644 --- a/leveldb/session_compaction.go +++ b/leveldb/session_compaction.go @@ -100,10 +100,14 @@ func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit v := s.version() if sourceLevel >= len(v.levels) { + // sourceLevel >= 7,没有这样的 level v.release() return nil } + // 注意:sourceLevel==0 时,因为 L0 上的 SSTable 可能有交叠,所以需要拓展 umin, umax 的范围 + // 比如在 L0 上,umax 涉及到了一个新的 SSTable,那么这个 SSTable 的 max 会拓宽一点,就可能会交叠上下一个新的 SSTable + // 所以需要拓展 umin, umax 的范围,直到不可以继续拓展 t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0) if len(t0) == 0 { v.release() diff --git a/leveldb/session_record.go b/leveldb/session_record.go index b6c17f9e..0af57452 100644 --- a/leveldb/session_record.go +++ b/leveldb/session_record.go @@ -39,7 +39,7 @@ type cpRecord struct { ikey internalKey } -// add table Record +// add table record // add SSTable type atRecord struct { level int // 把 table 添加到第几个 level @@ -49,6 +49,7 @@ type atRecord struct { imax internalKey } +// delete table record type dtRecord struct { level int num int64 diff --git a/leveldb/session_util.go b/leveldb/session_util.go index f467f2d4..fcf71568 100644 --- a/leveldb/session_util.go +++ b/leveldb/session_util.go @@ -403,7 +403,7 @@ func (s *session) recordCommited(rec *sessionRecord) { // Create a new manifest file; need external synchronization. func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()} - writer, err := s.stor.Create(fd) + writer, err := s.stor.Create(fd) // 创建 manifest 文件 if err != nil { return } @@ -461,7 +461,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { return } } - err = s.stor.SetMeta(fd) + err = s.stor.SetMeta(fd) // 设置 CURRENT 文件的内容 return } diff --git a/leveldb/version.go b/leveldb/version.go index a8afb2cb..4ea7461f 100644 --- a/leveldb/version.go +++ b/leveldb/version.go @@ -122,7 +122,7 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int // Level-0 files may overlap each other. Find all files that // overlap ukey. for _, t := range tables { - if t.overlaps(v.s.icmp, ukey, ukey) { + if t.overlaps(v.s.icmp, ukey, ukey) { // 按照 ukey 来做判定 if !f(level, t) { return } @@ -271,6 +271,7 @@ func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue return } +// 触发 ikey 所在 func (v *version) sampleSeek(ikey internalKey) (tcomp bool) { var tset *tSet @@ -314,6 +315,7 @@ func (v *version) spawn(r *sessionRecord, trivial bool) *version { return staging.finish(trivial) } +// 把 version 中的 SSTables,全量转化为 atRecords func (v *version) fillRecord(r *sessionRecord) { for level, tables := range v.levels { for _, t := range tables { From 09b2af1b590a9334a1908eaa3f2965a19e986e74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=98=A0=E9=BE=99?= Date: Wed, 2 Nov 2022 01:05:20 +0800 Subject: [PATCH 08/11] add comments about compaction --- leveldb/db_compaction.go | 3 ++- leveldb/db_write.go | 2 +- leveldb/session_compaction.go | 12 +++++++----- leveldb/table.go | 3 +++ 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go index 7ce47254..7950a0ed 100644 --- a/leveldb/db_compaction.go +++ b/leveldb/db_compaction.go @@ -630,7 +630,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { } } -// 注意,这里的 +// 注意,这里的 umin, umax 指的是 user key func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { db.logf("table@compaction range L%d %q:%q", level, umin, umax) if level >= 0 { @@ -638,6 +638,7 @@ func (db *DB) tableRangeCompaction(level int, umin, umax []byte) error { db.tableCompaction(c, true) } } else { + // level==-1 时,对 DB 所有的 SSTable 做 compaction 整理 // Retry until nothing to compact. for { compacted := false diff --git a/leveldb/db_write.go b/leveldb/db_write.go index 571f9f32..816342cc 100644 --- a/leveldb/db_write.go +++ b/leveldb/db_write.go @@ -429,7 +429,7 @@ func (db *DB) CompactRange(r util.Range) error { select { case db.writeLockC <- struct{}{}: // 获得了 Write lock,此时 DB 的 Write 操作会被 block - // 获得这个锁是为了完成 mem compaction + // 获得这个锁是为了完成 memtable 的 compaction,要锁定 memtable 的内容 case err := <-db.compPerErrC: return err case <-db.closeC: diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go index bb4dcd87..de2e6e76 100644 --- a/leveldb/session_compaction.go +++ b/leveldb/session_compaction.go @@ -158,16 +158,16 @@ type compaction struct { s *session v *version - typ int - sourceLevel int - levels [2]tFiles + typ int // compaction 的 type: L0, non-L0, seek compaction + sourceLevel int // compaction 的 source level,发起 compaction 的 level + levels [2]tFiles // 参与 compaction 的两个 level 上的文件 maxGPOverlaps int64 - gp tFiles + gp tFiles // sourceLevel+2 上的、与某次 compaction 拓展之后的 range 相重叠的文件,gp 的意思是 grad parent gpi int seenKey bool gpOverlappedBytes int64 - imin, imax internalKey + imin, imax internalKey // 参与 compaction 的 sourceLevel 上的 internalKey range tPtrs []int released bool @@ -230,6 +230,8 @@ func (c *compaction) expand() { xmin, xmax := exp0.getRange(c.s.icmp) exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false) if len(exp1) == len(t1) { + // 增选了 t0 上的 SSTable,并没有使的 t1 的 SSTable 被增选,即可以确定参与 compaction 的文件 + // 为了避免这种 expand 变得无穷无尽 c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(t0.size()), len(t1), shortenb(t1.size()), len(exp0), shortenb(exp0.size()), len(exp1), shortenb(exp1.size())) diff --git a/leveldb/table.go b/leveldb/table.go index 04453595..a370dbe3 100644 --- a/leveldb/table.go +++ b/leveldb/table.go @@ -196,6 +196,7 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo // If overlapped is true then the search will be restarted if umax // expanded. // The dst content will be overwritten. +// why: 为什么要用 ukey 参与 overlap 的判断? func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles { // Short circuit if tf is empty if len(tf) == 0 { @@ -225,11 +226,13 @@ func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, ove end = len(tf) } else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 { // The max ukey overlaps with the index file, expand it. + // 注意:end 是开区间,所以这里要等于 index+1,是不包括 index+1 这个 SSTable 的 end = index + 1 } else { end = index } } else { + // 注意:end 是开区间,所以这里要等于 len(tf),也就是不存在的一个文件 end = len(tf) } // Ensure the overlapped file indexes are valid. From 588f579da4e4f11d8d5412253ef453f952bf6fa2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=98=A0=E9=BE=99?= Date: Thu, 3 Nov 2022 01:43:34 +0800 Subject: [PATCH 09/11] add notations for compaction --- leveldb/db_compaction.go | 6 +++++- leveldb/opt/options.go | 1 + leveldb/session_compaction.go | 6 +++++- leveldb/session_record.go | 11 ++++++++--- leveldb/session_util.go | 1 + leveldb/table.go | 2 +- leveldb/version.go | 4 ++++ 7 files changed, 25 insertions(+), 6 deletions(-) diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go index 7950a0ed..4c438350 100644 --- a/leveldb/db_compaction.go +++ b/leveldb/db_compaction.go @@ -443,6 +443,7 @@ func (b *tableCompactionBuilder) cleanup() error { return nil } +// compactionTransactInterface func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) { snapResumed := b.snapIter > 0 hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary. @@ -569,6 +570,7 @@ func (b *tableCompactionBuilder) revert() error { return nil } +// 执行 SSTable compaction func (db *DB) tableCompaction(c *compaction, noTrivial bool) { defer c.release() @@ -576,6 +578,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { rec.addCompPtr(c.sourceLevel, c.imax) if !noTrivial && c.trivial() { + // 只需要移动 SSTable t := c.levels[0][0] db.logf("table@move L%d@%d -> L%d", c.sourceLevel, t.fd.Num, c.sourceLevel+1) rec.delTable(c.sourceLevel, t.fd.Num) @@ -589,6 +592,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { for _, t := range tables { stats[i].read += t.size // Insert deleted tables into record + // 旧的 SSTable 都是可以删除的 rec.delTable(c.sourceLevel+i, t.fd.Num) } } @@ -604,7 +608,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) { stat1: &stats[1], minSeq: minSeq, strict: db.s.o.GetStrict(opt.StrictCompaction), - tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1), + tableSize: db.s.o.GetCompactionTableSize(c.sourceLevel + 1), // 在 default compaction table size 默认为 2MB,切 default multiplier 为 1 的情况下,tableSize 为 2MB } db.compactionTransact("table@build", b) diff --git a/leveldb/opt/options.go b/leveldb/opt/options.go index 48fb0416..90483c85 100644 --- a/leveldb/opt/options.go +++ b/leveldb/opt/options.go @@ -510,6 +510,7 @@ func (o *Options) GetCompactionTableSize(level int) int { if level < len(o.CompactionTableSizeMultiplierPerLevel) && o.CompactionTableSizeMultiplierPerLevel[level] > 0 { mult = o.CompactionTableSizeMultiplierPerLevel[level] } else if o.CompactionTableSizeMultiplier > 0 { + // default CompactionTableSizeMultiplier is 1 mult = math.Pow(o.CompactionTableSizeMultiplier, float64(level)) } } diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go index de2e6e76..ae800518 100644 --- a/leveldb/session_compaction.go +++ b/leveldb/session_compaction.go @@ -67,12 +67,14 @@ func (s *session) pickCompaction() *compaction { if cptr != nil && sourceLevel > 0 { n := len(tables) if i := sort.Search(n, func(i int) bool { + // 选取第一个 imax 大于 cptr 的 SSTable return s.icmp.Compare(tables[i].imax, cptr) > 0 }); i < n { t0 = append(t0, tables[i]) } } if len(t0) == 0 { + // 如果 ctpr==nil 或 cptr已经到了最后一个 SSTable,则从头开始循环 t0 = append(t0, tables[0]) } if sourceLevel == 0 { @@ -81,6 +83,7 @@ func (s *session) pickCompaction() *compaction { typ = nonLevel0Compaction } } else { + // 执行 seek compaction if p := atomic.LoadPointer(&v.cSeek); p != nil { ts := (*tSet)(p) sourceLevel = ts.level @@ -108,6 +111,7 @@ func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit // 注意:sourceLevel==0 时,因为 L0 上的 SSTable 可能有交叠,所以需要拓展 umin, umax 的范围 // 比如在 L0 上,umax 涉及到了一个新的 SSTable,那么这个 SSTable 的 max 会拓宽一点,就可能会交叠上下一个新的 SSTable // 所以需要拓展 umin, umax 的范围,直到不可以继续拓展 + // 使用的是 ukey,完整的 ukey 要完整地被 dump,不能出现部分 ukey 被 dump 了,这样会出现查询的错误 t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0) if len(t0) == 0 { v.release() @@ -308,7 +312,7 @@ func (c *compaction) newIterator() iterator.Iterator { // Options. ro := &opt.ReadOptions{ - DontFillCache: true, + DontFillCache: true, // 作 compaction 的时候读取的数据跟业务无关,不要填充 cache Strict: opt.StrictOverride, } strict := c.s.o.GetStrict(opt.StrictCompaction) diff --git a/leveldb/session_record.go b/leveldb/session_record.go index 5d449ceb..2d84b6a2 100644 --- a/leveldb/session_record.go +++ b/leveldb/session_record.go @@ -62,9 +62,14 @@ type sessionRecord struct { prevJournalNum int64 nextFileNum int64 seqNum uint64 - compPtrs []cpRecord - addedTables []atRecord - deletedTables []dtRecord + + // compact pointers 指示每个层级下一次进行 compaction 操作时需要从哪个键开始 + // 对每个层级 L,会记录该层上一次进行 compaction 时操作的最大值,于是当 L 层进行下一次 compaction 需要选取文件时,选取第一个 imax 大于 compPtr.ikey 的文件 + // 于是,每一层的 compaction 操作在该层的键空间循环 + compPtrs []cpRecord + + addedTables []atRecord + deletedTables []dtRecord scratch [binary.MaxVarintLen64]byte err error diff --git a/leveldb/session_util.go b/leveldb/session_util.go index 1927ebd1..e03362ec 100644 --- a/leveldb/session_util.go +++ b/leveldb/session_util.go @@ -350,6 +350,7 @@ func (s *session) setCompPtr(level int, ik internalKey) { // Get compaction ptr at given level; need external synchronization. func (s *session) getCompPtr(level int) internalKey { if level >= len(s.stCompPtrs) { + // 从最小的 ikey 开始 return nil } return s.stCompPtrs[level] diff --git a/leveldb/table.go b/leveldb/table.go index a370dbe3..6f963b02 100644 --- a/leveldb/table.go +++ b/leveldb/table.go @@ -196,7 +196,7 @@ func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) boo // If overlapped is true then the search will be restarted if umax // expanded. // The dst content will be overwritten. -// why: 为什么要用 ukey 参与 overlap 的判断? +// why: 为什么要用 ukey 参与 overlap 的判断?Ans:使的 ukey 被完整地存在在一个 Level 中,不然如果部分被 dump 的话,可能某个 snapshot 读的时候读到旧的数据 func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles { // Short circuit if tf is empty if len(tf) == 0 { diff --git a/leveldb/version.go b/leveldb/version.go index abf8ffcf..8970110f 100644 --- a/leveldb/version.go +++ b/leveldb/version.go @@ -386,6 +386,7 @@ func (v *version) pickMemdbLevel(umin, umax []byte, maxLevel int) (level int) { return } +// 计算 compaction score func (v *version) computeCompaction() { // Precomputed best level for next compaction bestLevel := int(-1) @@ -411,8 +412,10 @@ func (v *version) computeCompaction() { // file size is small (perhaps because of a small write-buffer // setting, or very high compression ratios, or lots of // overwrites/deletions). + // 默认 CompactionL0Trigger=4, 当 L0 的文件数目超过 4 个时触发 L0 compcation score = float64(len(tables)) / float64(v.s.o.GetCompactionL0Trigger()) } else { + // 其他 Level,当数据规模过大时触发 compaction score = float64(size) / float64(v.s.o.GetCompactionTotalSize(level)) } @@ -591,6 +594,7 @@ func (p *versionStaging) finish(trivial bool) *version { nv.levels = nv.levels[:n] // Compute compaction score for new version. + // 当一次 compaction 产生了新的 version 时,立刻计算 new version 的 compaction score nv.computeCompaction() return nv From f6678d6c1160c12de438db38ee3d9431053e2aff Mon Sep 17 00:00:00 2001 From: songyinglong Date: Thu, 3 Nov 2022 17:25:52 +0800 Subject: [PATCH 10/11] add anotation Change-Id: Ie1ddc0a69089174126b8d4f92555fb283a7cfcdf --- leveldb/version.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/leveldb/version.go b/leveldb/version.go index 33bd3716..7a379746 100644 --- a/leveldb/version.go +++ b/leveldb/version.go @@ -437,8 +437,8 @@ func (v *version) needCompaction() bool { } type tablesScratch struct { - added map[int64]atRecord - deleted map[int64]struct{} + added map[int64]atRecord // key: file num + deleted map[int64]struct{} // key: file num } // 描述下一个 version 相比于 base version 的变化,增加了哪些 SSTable,删掉了哪些 SSTable @@ -497,6 +497,7 @@ func (p *versionStaging) finish(trivial bool) *version { } nv.levels = make([]tFiles, numLevel) for level := 0; level < numLevel; level++ { + // 逐个 level 地 delete、add SSTables var baseTabels tFiles if level < len(p.base.levels) { baseTabels = p.base.levels[level] @@ -549,14 +550,18 @@ func (p *versionStaging) finish(trivial bool) *version { for _, r := range scratch.added { added = append(added, tableFileFromRecord(r)) } + if level == 0 { + // 在 level=0 上,added 按照 file number 降序排序 + // level-0 上的 SSTable 可能有 range 上的重叠,而且没有顺序性 added.sortByNum() index := nt.searchNumLess(added[len(added)-1].fd.Num) nt = append(nt[:index], append(added, nt[index:]...)...) } else { - added.sortByKey(p.base.s.icmp) + // 其他 level 上,SSTable 是按照 imin 排序的,imin 相同的话按照 num 升序排序; + added.sortByKey(p.base.s.icmp) // 按照 imin 升序排序 _, amax := added.getRange(p.base.s.icmp) - index := nt.searchMin(p.base.s.icmp, amax) + index := nt.searchMin(p.base.s.icmp, amax) // 找 nt 中第一个 imin 大于等于 amax 的 SSTable,于是 added 可以被 insert 到 index 前面 nt = append(nt[:index], append(added, nt[index:]...)...) } nv.levels[level] = nt From 2ba436b3513df35b8922c377164bcea8090906af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=8B=E6=98=A0=E9=BE=99?= Date: Fri, 4 Nov 2022 00:36:54 +0800 Subject: [PATCH 11/11] about keep correct data during compaction --- leveldb/db.go | 8 ++++---- leveldb/db_compaction.go | 19 +++++++++++++++++-- leveldb/iterator/merged_iter.go | 7 +++++-- leveldb/session_compaction.go | 15 +++++++++++++-- leveldb/table.go | 1 + leveldb/table/reader.go | 13 +++++++++---- 6 files changed, 49 insertions(+), 14 deletions(-) diff --git a/leveldb/db.go b/leveldb/db.go index 99317daf..95726728 100644 --- a/leveldb/db.go +++ b/leveldb/db.go @@ -75,10 +75,10 @@ type DB struct { // Compaction. compCommitLk sync.Mutex tcompCmdC chan cCmd - tcompPauseC chan chan<- struct{} - mcompCmdC chan cCmd // 全称 memdb compaction command channel? - compErrC chan error // compaction error - compPerErrC chan error // 全称 compaction persistent error + tcompPauseC chan chan<- struct{} // 从 tcompPauseC 读取 <-chan 读到了一个 chan<- struct{} 到了之后要 pause compaction,直到可以写入 chan<- struct{} 才恢复 + mcompCmdC chan cCmd // 全称 memdb compaction command channel? + compErrC chan error // compaction error + compPerErrC chan error // 全称 compaction persistent error compErrSetC chan error compWriteLocking bool compStats cStats diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go index 4c438350..8837e9ea 100644 --- a/leveldb/db_compaction.go +++ b/leveldb/db_compaction.go @@ -374,6 +374,7 @@ type tableCompactionBuilder struct { rec *sessionRecord stat1 *cStatStaging + // 暂存 snapshot 状态 snapHasLastUkey bool snapLastUkey []byte snapLastSeq uint64 @@ -384,7 +385,7 @@ type tableCompactionBuilder struct { kerrCnt int dropCnt int - minSeq uint64 + minSeq uint64 // compaction 时的最小 seq strict bool tableSize int @@ -444,6 +445,7 @@ func (b *tableCompactionBuilder) cleanup() error { } // compactionTransactInterface +// b 记录了 compaction 的状态 func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) { snapResumed := b.snapIter > 0 hasLastUkey := b.snapHasLastUkey // The key might has zero length, so this is necessary. @@ -491,9 +493,13 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) shouldStop := !resumed && b.c.shouldStopBefore(ikey) if !hasLastUkey || b.s.icmp.uCompare(lastUkey, ukey) != 0 { + // 做 iteration 的时候相同的 ukey 必然是连在一起的 + // 这时遇到了 ukey 的切换 // First occurrence of this user key. // Only rotate tables if ukey doesn't hop across. + // 相同的 ukey 不会写到两个 SSTable 中 + // 注意这里 b.flush 是以 uKey 切换了为前提的,也就是说 shouldStop 只在 ukey 切换时才有效,相同的 ukey 是不能 stop 的 if b.tw != nil && (shouldStop || b.needFlush()) { if err := b.flush(); err != nil { return err @@ -509,6 +515,7 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) b.snapDropCnt = b.dropCnt } + // ukey 成为新的 lastUkey hasLastUkey = true lastUkey = append(lastUkey[:0], ukey...) lastSeq = keyMaxSeq @@ -517,8 +524,12 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) switch { case lastSeq <= b.minSeq: // Dropped because newer entry for same user key exist + // 注意:这里的 fallthrough 会直接掉落到下一个 case,无论下一个 case 的条件是否满足! + // 于是,ukey 的 seq 在 minSeq 之后的、非首个记录必然会 fallthrough 到下一个 case,就会被 drop 掉了 + // fallthrough 用的刚刚好! fallthrough // (A) case kt == keyTypeDel && seq <= b.minSeq && b.c.baseLevelForKey(lastUkey): + // 对于 ukey 的 seq 在 minSeq 之后的、首个记录,且 keyType==del,更高层又没有这个 ukey 的,可以 drop // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger seq numbers @@ -528,11 +539,14 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) // Therefore this deletion marker is obsolete and can be dropped. lastSeq = seq b.dropCnt++ - continue + continue // 这条数据就不会被写到新的 SSTable 中了 default: + // 对于 ukey 的 seq > minSeq,即在 minSeq 之前的,会到 default,记录被写入 + // 对于 ukey 的 seq 在 minSeq 之后的、首个记录(因为必然有 lastSeq > b.minSeq,不会命中前两个 case),如果其 keyType==val,那么会到 default,记录会被写入 lastSeq = seq } } else { + // key error is not nil if b.strict { return kerr } @@ -544,6 +558,7 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) b.kerrCnt++ } + // 写到新的 SSTable if err := b.appendKV(ikey, iter.Value()); err != nil { return err } diff --git a/leveldb/iterator/merged_iter.go b/leveldb/iterator/merged_iter.go index 374e82b6..10aa47ee 100644 --- a/leveldb/iterator/merged_iter.go +++ b/leveldb/iterator/merged_iter.go @@ -30,7 +30,7 @@ type mergedIterator struct { strict bool keys [][]byte - index int + index int // 当前放出的 index, keys[index] 就是放出的 key, index in range [0, len(iters)) dir dir err error errf func(err error) @@ -77,11 +77,13 @@ func (i *mergedIterator) First() bool { for x, iter := range i.iters { switch { case iter.First(): + // merge sort i.keys[x] = assertKey(iter.Key()) h.Push(x) case i.iterErr(iter): return false default: + // x 从一开始就是空的 i.keys[x] = nil } } @@ -181,6 +183,7 @@ func (i *mergedIterator) Next() bool { case i.iterErr(iter): return false default: + // 表示 x 这个 iter 已经空了 i.keys[x] = nil } return i.next() @@ -322,7 +325,7 @@ type indexHeap mergedIterator func (h *indexHeap) Len() int { return len(h.indexes) } func (h *indexHeap) Less(i, j int) bool { i, j = h.indexes[i], h.indexes[j] - r := h.cmp.Compare(h.keys[i], h.keys[j]) + r := h.cmp.Compare(h.keys[i], h.keys[j]) // why: 如果 h.keys[i]==nil 的话,怎么做比较? if h.reverse { return r > 0 } diff --git a/leveldb/session_compaction.go b/leveldb/session_compaction.go index ae800518..b1836194 100644 --- a/leveldb/session_compaction.go +++ b/leveldb/session_compaction.go @@ -167,12 +167,12 @@ type compaction struct { levels [2]tFiles // 参与 compaction 的两个 level 上的文件 maxGPOverlaps int64 - gp tFiles // sourceLevel+2 上的、与某次 compaction 拓展之后的 range 相重叠的文件,gp 的意思是 grad parent + gp tFiles // sourceLevel+2 上的、与某次 compaction expand 之后的 range 相重叠的文件,gp 的意思是 grad parent gpi int seenKey bool gpOverlappedBytes int64 imin, imax internalKey // 参与 compaction 的 sourceLevel 上的 internalKey range - tPtrs []int + tPtrs []int // tPtrs[level] 是 level 上的一个 SSTable 的 index,用于在 compaction 的时候快速判断 ukey 是否不会在 sourceLevel+2 及更深的 level 上出现 released bool snapGPI int @@ -261,7 +261,12 @@ func (c *compaction) trivial() bool { return len(c.levels[0]) == 1 && len(c.levels[1]) == 0 && c.gp.size() <= c.maxGPOverlaps } +// 如果 ukey 只存在于 compaction 涉及到的两个 level,不在更高的 level 出现,返回 true,否则返回 false +// "baseLevel" 的意思在此 +// 因为在 compaction 的过程中,从 iterator 出来的 ukey 必然是递增的,后面的 ukey 只会大于等于前面的 ukey +// 所以这个函数内部用到的 c.tPtrs[level] 是可以单调向前移动的 func (c *compaction) baseLevelForKey(ukey []byte) bool { + // 从 sourceLevel+2 开始搜索 for level := c.sourceLevel + 2; level < len(c.v.levels); level++ { tables := c.v.levels[level] for c.tPtrs[level] < len(tables) { @@ -270,6 +275,8 @@ func (c *compaction) baseLevelForKey(ukey []byte) bool { // We've advanced far enough. if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { // Key falls in this file's range, so definitely not base level. + // ukey 只是在这个文件的 range 内,其实并不一定代表这个文件一定包含这个 ukey ? + // 感觉这里是为了判断的速度,于是保守一点,给出了 ukey 可能出现在更高层的结论 return false } break @@ -294,6 +301,9 @@ func (c *compaction) shouldStopBefore(ikey internalKey) bool { if c.gpOverlappedBytes > c.maxGPOverlaps { // Too much overlap for current output; start new output. + // compaction 之后在 sourceLevel+1 之后产生的新文件 f 也不可以太大,如果过大了的话,f 会跟 sourceLevel+2 的文件有过多的交集 + // 那么将来当 f 需要做 compcation 的时候,下一层就会涉及到过多的文件,那时的 compaction 就过于 heavy 了 + // 所以这时要终止当前的 SSTable,开启下一个 SSTable c.gpOverlappedBytes = 0 return true } @@ -331,6 +341,7 @@ func (c *compaction) newIterator() iterator.Iterator { its = append(its, c.s.tops.newIterator(t, nil, ro)) } } else { + // tables 本身就是排好序的,所以 newIndexIterator 内部做二分是没问题的 it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict) its = append(its, it) } diff --git a/leveldb/table.go b/leveldb/table.go index 6f963b02..ccf32b23 100644 --- a/leveldb/table.go +++ b/leveldb/table.go @@ -409,6 +409,7 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { // Opens table. It returns a cache handle, which should // be released after use. +// 会 cache SSTable func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) { loader := func() (size int, value cache.Value) { diff --git a/leveldb/table/reader.go b/leveldb/table/reader.go index 2b986d4d..966a2453 100644 --- a/leveldb/table/reader.go +++ b/leveldb/table/reader.go @@ -159,7 +159,7 @@ type blockIter struct { prevOffset int prevNode []int prevKeys []byte - restartIndex int // 注:the index of restart point + restartIndex int // blockIter 当前下标所在的(在这个restart point后面)restart point index // Iterator direction. dir dir // Restart index slice range. @@ -168,7 +168,7 @@ type blockIter struct { // Offset slice range. offsetStart int // 框定 iterator 的范围,起始的 offset offsetRealStart int - offsetLimit int + offsetLimit int // 框定 iterator 的范围,结束的 offset,当 blockIter.offset==offsetLimit 的时候,表示 iteration 结束,不应该出现 offset > offsetLimit 的情况 // Error. err error } @@ -254,6 +254,7 @@ func (i *blockIter) Seek(key []byte) bool { // 按照 restart point 二分查找 // 找第一个小于 key 的 restart point + // 因为 restart point 是到下一个 restart point 最小的 ikey,如果不是选小于 key 的话,就错过这个 key 了 ri, offset, err := i.block.seek(i.tr.cmp, i.riStart, i.riLimit, key) if err != nil { i.sErr(err) @@ -274,6 +275,8 @@ func (i *blockIter) Seek(key []byte) bool { return false } +// 按照 dirForward 的方向,如果 blockIter 还有下一个值,读取这个值并返回 true +// 只做 dirForward 方向 func (i *blockIter) Next() bool { if i.dir == dirEOI || i.err != nil { return false @@ -283,6 +286,7 @@ func (i *blockIter) Next() bool { } if i.dir == dirSOI { + // 回到起点 i.restartIndex = i.riStart i.offset = i.offsetStart } else if i.dir == dirBackward { @@ -319,7 +323,7 @@ func (i *blockIter) Next() bool { return false } - key, value, nShared, n, err := i.block.entry(i.offset) + key, value, nShared, n, err := i.block.entry(i.offset) // n: 整条 entry 的 size,于是 offset+=n 会跳到 下一个 entry if err != nil { i.sErr(i.tr.fixErrCorruptedBH(i.block.bh, err)) return false @@ -329,7 +333,7 @@ func (i *blockIter) Next() bool { return false } - i.key = append(i.key[:nShared], key...) + i.key = append(i.key[:nShared], key...) // 当前的值由 shard+unshared 拼凑起来得到 i.value = value i.prevOffset = i.offset i.offset += n @@ -783,6 +787,7 @@ func (r *Reader) newBlockIter(b *block, bReleaser util.Releaser, slice *util.Ran bi.offsetStart = b.restartOffset(bi.riStart) bi.offsetRealStart = bi.prevOffset } else { + // 整个 block 都不在 range 内 bi.riStart = b.restartsLen bi.offsetStart = b.restartsOffset bi.offsetRealStart = b.restartsOffset