-
Notifications
You must be signed in to change notification settings - Fork 183
/
memtable.go
302 lines (268 loc) · 8.04 KB
/
memtable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
package lotusdb
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"os"
"sort"
"sync"
"github.com/bwmarrin/snowflake"
arenaskl "github.com/dgraph-io/badger/v4/skl"
"github.com/dgraph-io/badger/v4/y"
"github.com/rosedblabs/wal"
)
const (
// walFileExt is the format of the wal segment file extension: .SEG.%d
// %d is the unique id of the memtable, so every memtable gets its own wal file;
// for example, the extension used by the memtable with id 1 is .SEG.1.
walFileExt = ".SEG.%d"
// initialTableID is the memtable id used by openAllMemtables
// when no existing wal file is found in the directory.
initialTableID = 1
)
type (
// memtable is an in-memory data structure holding data before it is flushed into index and value log.
// Currently, the only supported data structure is skip list, see github.com/dgraph-io/badger/v4/skl.
//
// New writes always insert data into the memtable, and reads have to query the memtable
// before reading from index and value log, because the memtable's data is newer.
//
// Once a memtable is full (a memtable has a size threshold, see MemtableSize in options),
// it becomes immutable and is replaced by a new memtable.
//
// A background goroutine will flush the content of the memtable into index and vlog,
// after that the memtable can be deleted.
memtable struct {
mu sync.RWMutex // guards skl: write-locked in putBatch, read-locked in get
wal *wal.WAL // write ahead log for the memtable
skl *arenaskl.Skiplist // in-memory skip list
options memtableOptions // configuration the memtable was opened with
}
// memtableOptions represents the configuration options for a memtable.
memtableOptions struct {
dirPath string // directory where the write ahead log file is stored
tableID uint32 // unique id of the memtable, used to generate the wal file extension
memSize uint32 // size threshold of the memtable, compared against the skip list size by isFull
walBytesPerSync uint32 // forwarded to the wal as its BytesPerSync option
walSync bool // forwarded to the wal as its Sync option (sync the wal after each write)
}
)
// openAllMemtables opens one memtable for every wal file found in options.DirPath.
//
// A wal is associated with exactly one memtable and its file extension carries the
// memtable id (for example, the memtable with id 1 uses the extension .SEG.1), so the
// ids are recovered by parsing the names of the files in the directory.
// If no wal file is found, a single memtable with initialTableID is opened.
//
// The returned memtables are ordered by ascending id. If any memtable fails to open,
// the memtables opened so far are closed and the error is returned.
func openAllMemtables(options Options) ([]*memtable, error) {
	entries, err := os.ReadDir(options.DirPath)
	if err != nil {
		return nil, err
	}

	// get all memtable ids
	var tableIDs []int
	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		var id int
		var prefix int
		// names that do not follow the wal naming scheme belong to something else, skip them
		_, err = fmt.Sscanf(entry.Name(), "%d"+walFileExt, &prefix, &id)
		if err != nil {
			continue
		}
		tableIDs = append(tableIDs, id)
	}

	if len(tableIDs) == 0 {
		tableIDs = append(tableIDs, initialTableID)
	}
	sort.Ints(tableIDs)

	tables := make([]*memtable, 0, len(tableIDs))
	for _, id := range tableIDs {
		table, errOpenMemtable := openMemtable(memtableOptions{
			dirPath:         options.DirPath,
			tableID:         uint32(id),
			memSize:         options.MemtableSize,
			walSync:         options.Sync,
			walBytesPerSync: options.BytesPerSync,
		})
		if errOpenMemtable != nil {
			// Do not leak the wal files opened so far. The close errors are
			// deliberately dropped: the open error is the one worth reporting.
			for _, opened := range tables {
				_ = opened.close()
			}
			return nil, errOpenMemtable
		}
		tables = append(tables, table)
	}
	return tables, nil
}
// openMemtable opens the memtable described by options.
//
// A memtable holds a wal (write ahead log), so opening a memtable actually opens the
// corresponding wal file and replays all of its records to rebuild the skip list.
//
// Records are grouped by batch id while replaying and are only inserted into the skip
// list once the LogRecordBatchFinished marker of their batch is read; records of a
// batch whose marker never shows up are not inserted.
//
// If replaying fails, the wal is closed before the error is returned.
func openMemtable(options memtableOptions) (*memtable, error) {
	// init skip list
	//nolint:gomnd // default size
	skl := arenaskl.NewSkiplist(int64(float64(options.memSize) * 1.5))
	table := &memtable{options: options, skl: skl}

	// open the Write Ahead Log file
	walFile, err := wal.Open(wal.Options{
		DirPath:        options.dirPath,
		SegmentSize:    math.MaxInt, // no limit, guarantee that a wal file only contains one segment file
		SegmentFileExt: fmt.Sprintf(walFileExt, options.tableID),
		Sync:           options.walSync,
		BytesPerSync:   options.walBytesPerSync,
	})
	if err != nil {
		return nil, err
	}
	table.wal = walFile

	// records read so far, grouped by the batch they belong to
	indexRecords := make(map[uint64][]*LogRecord)
	// now we get the opened wal file, we need to load all entries
	// from wal to rebuild the content of the skip list
	reader := table.wal.NewReader()
	for {
		chunk, _, errNext := reader.Next()
		if errNext != nil {
			if errors.Is(errNext, io.EOF) {
				break
			}
			// The memtable is unusable: release the wal before bailing out.
			// The close error is dropped in favor of the read error.
			_ = walFile.Close()
			return nil, errNext
		}
		record := decodeLogRecord(chunk)
		if record.Type == LogRecordBatchFinished {
			// the key of the marker record is the id of the finished batch
			batchID, errParseBytes := snowflake.ParseBytes(record.Key)
			if errParseBytes != nil {
				// same cleanup as above, the parse error is the one reported
				_ = walFile.Close()
				return nil, errParseBytes
			}
			for _, idxRecord := range indexRecords[uint64(batchID)] {
				table.skl.Put(y.KeyWithTs(idxRecord.Key, 0),
					y.ValueStruct{Value: idxRecord.Value, Meta: idxRecord.Type})
			}
			delete(indexRecords, uint64(batchID))
		} else {
			indexRecords[record.BatchID] = append(indexRecords[record.BatchID], record)
		}
	}

	// open and read wal file successfully, return the memtable
	return table, nil
}
// putBatch applies a batch of pending writes to the memtable.
//
// Unless options.DisableWal is set, every record is tagged with batchID and queued
// to the wal, followed by a LogRecordBatchFinished marker whose key is the batch id;
// the whole queue is then written with a single WriteAll call. The wal is also
// synced when options.Sync is set and the wal was not opened with walSync.
// Afterwards the records are inserted into the skip list while holding mt.mu.
func (mt *memtable) putBatch(pendingWrites map[string]*LogRecord,
	batchID snowflake.ID, options WriteOptions) error {
	walEnabled := !options.DisableWal
	if walEnabled {
		// queue every record of the batch
		for _, rec := range pendingWrites {
			rec.BatchID = uint64(batchID)
			mt.wal.PendingWrites(encodeLogRecord(rec))
		}

		// queue the marker that signals the end of the batch
		marker := &LogRecord{
			Key:  batchID.Bytes(),
			Type: LogRecordBatchFinished,
		}
		mt.wal.PendingWrites(encodeLogRecord(marker))

		// write everything that was queued
		_, errWrite := mt.wal.WriteAll()
		if errWrite != nil {
			return errWrite
		}

		// explicit sync is only needed when the wal does not sync on its own
		needSync := options.Sync && !mt.options.walSync
		if needSync {
			errSync := mt.wal.Sync()
			if errSync != nil {
				return errSync
			}
		}
	}

	// insert the records into the in-memory skip list
	mt.mu.Lock()
	for k, rec := range pendingWrites {
		entry := y.ValueStruct{Value: rec.Value, Meta: rec.Type}
		mt.skl.Put(y.KeyWithTs([]byte(k), 0), entry)
	}
	mt.mu.Unlock()

	return nil
}
// get looks up key in the skip list while holding the read lock.
// The returned bool is true when the entry found is marked as LogRecordDeleted;
// the returned slice is the value stored for the key.
func (mt *memtable) get(key []byte) (bool, []byte) {
	mt.mu.RLock()
	defer mt.mu.RUnlock()

	lookupKey := y.KeyWithTs(key, 0)
	found := mt.skl.Get(lookupKey)
	return found.Meta == LogRecordDeleted, found.Value
}
// isFull reports whether the memory used by the skip list
// has reached the memSize threshold of the memtable.
func (mt *memtable) isFull() bool {
	limit := int64(mt.options.memSize)
	used := mt.skl.MemSize()
	return used >= limit
}
// deleteWAl deletes the wal of the memtable.
// It is a no-op when the memtable has no wal.
func (mt *memtable) deleteWAl() error {
	if mt.wal == nil {
		return nil
	}
	return mt.wal.Delete()
}
// close closes the wal of the memtable.
// It is a no-op when the memtable has no wal.
func (mt *memtable) close() error {
	if mt.wal == nil {
		return nil
	}
	return mt.wal.Close()
}
// sync calls Sync on the wal of the memtable.
// It is a no-op when the memtable has no wal.
func (mt *memtable) sync() error {
	if mt.wal == nil {
		return nil
	}
	return mt.wal.Sync()
}
// memtableIterator implements baseIterator on top of the skip list of a memtable.
type memtableIterator struct {
options IteratorOptions // iteration settings; Prefix and Reverse are the fields read in this file
iter *arenaskl.UniIterator // iterator over the skip list of the memtable
}
// newMemtableIterator creates an iterator over the skip list of the given memtable.
// The direction of the iteration is taken from options.Reverse.
func newMemtableIterator(options IteratorOptions, memtable *memtable) *memtableIterator {
	it := new(memtableIterator)
	it.options = options
	it.iter = memtable.skl.NewUniIterator(options.Reverse)
	return it
}
// Rewind moves the iterator to its first key.
// When a prefix is configured, it moves on to the first key carrying that prefix,
// or leaves the iterator invalid if there is none.
func (mi *memtableIterator) Rewind() {
	mi.iter.Rewind()
	if len(mi.options.Prefix) == 0 {
		return
	}
	// prefix scan
	// The keys stored in the skip list are user keys wrapped by y.KeyWithTs, so the
	// prefix must be matched against the parsed user key (the one Key returns);
	// otherwise a prefix longer than the user key could match the encoded timestamp.
	for mi.iter.Valid() && !bytes.HasPrefix(y.ParseKey(mi.iter.Key()), mi.options.Prefix) {
		mi.iter.Next()
	}
}
// Seek positions the iterator at the key which is greater than
// (less than when reverse is true) or equal to the specified key.
// When a prefix is configured, the iterator then keeps advancing
// until the current key carries that prefix or the iterator becomes invalid.
func (mi *memtableIterator) Seek(key []byte) {
	target := y.KeyWithTs(key, 0)
	mi.iter.Seek(target)

	prefix := mi.options.Prefix
	if len(prefix) > 0 {
		// prefix scan
		for mi.Valid() {
			if bytes.HasPrefix(mi.Key(), prefix) {
				break
			}
			mi.Next()
		}
	}
}
// Next moves the iterator to the next key.
// When a prefix is configured, keys without that prefix are skipped, so the iterator
// ends up on the next key carrying the prefix or becomes invalid if there is none.
func (mi *memtableIterator) Next() {
	mi.iter.Next()
	if len(mi.options.Prefix) == 0 {
		return
	}
	// prefix scan
	// The keys stored in the skip list are user keys wrapped by y.KeyWithTs, so the
	// prefix must be matched against the parsed user key (the one Key returns);
	// otherwise a prefix longer than the user key could match the encoded timestamp.
	for mi.iter.Valid() && !bytes.HasPrefix(y.ParseKey(mi.iter.Key()), mi.options.Prefix) {
		mi.iter.Next()
	}
}
// Key returns the current key, i.e. the key of the underlying
// skip list iterator after it has been passed through y.ParseKey.
func (mi *memtableIterator) Key() []byte {
	internalKey := mi.iter.Key()
	return y.ParseKey(internalKey)
}
// Value returns the value the underlying skip list iterator currently points at.
func (mi *memtableIterator) Value() any {
	var current any = mi.iter.Value()
	return current
}
// Valid reports whether the underlying skip list iterator is still valid
// (false is expected once the iterator is exhausted).
func (mi *memtableIterator) Valid() bool {
	positioned := mi.iter.Valid()
	return positioned
}
// Close closes the underlying skip list iterator and returns its error, if any.
func (mi *memtableIterator) Close() error {
	err := mi.iter.Close()
	return err
}