-
Notifications
You must be signed in to change notification settings - Fork 98
/
keystore.go
370 lines (327 loc) · 10.9 KB
/
keystore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
// Copyright 2023 - MinIO, Inc. All rights reserved.
// Use of this source code is governed by the AGPLv3
// license that can be found in the LICENSE file.
package kes
import (
"context"
"errors"
"io"
"slices"
"strings"
"sync/atomic"
"time"
"github.com/minio/kes/internal/cache"
"github.com/minio/kes/internal/crypto"
"github.com/minio/kes/internal/keystore"
"github.com/minio/kms-go/kes"
)
// A KeyStore stores key-value pairs. It provides durable storage for a
// KES server to persist and access keys. A KeyStore may be modified
// concurrently by different go routines.
type KeyStore interface {
	// Close closes the key store and releases associated resources,
	// like background go routines, if any.
	io.Closer

	// Status returns the current state of the KeyStore.
	Status(context.Context) (KeyStoreState, error)

	// Create creates a new entry with the given name if and only
	// if no such entry exists.
	// Otherwise, Create returns kes.ErrKeyExists.
	Create(ctx context.Context, name string, value []byte) error

	// Delete removes the entry. It may return either no error or
	// kes.ErrKeyNotFound if no such entry exists.
	Delete(ctx context.Context, name string) error

	// Get returns the value for the given name. It returns
	// kes.ErrKeyNotFound if no such entry exists.
	Get(ctx context.Context, name string) ([]byte, error)

	// List returns the first n key names that start with the given
	// prefix, and the next prefix from which the listing should
	// continue.
	//
	// It returns all keys with the prefix if n < 0 and fewer than n
	// names if n is greater than the number of keys with the prefix.
	//
	// An empty prefix matches any key name. At the end of the listing
	// or when there are no (more) keys starting with the prefix, the
	// returned prefix is empty.
	List(ctx context.Context, prefix string, n int) ([]string, string, error)
}
// KeyStoreState is a structure containing information about
// the current state of a KeyStore. It is the value returned
// by KeyStore.Status.
type KeyStoreState struct {
	// Latency is a duration reported by the KeyStore.
	// NOTE(review): presumably the time the backend takes to
	// respond to a status request - confirm with implementations.
	Latency time.Duration
}
// MemKeyStore is a volatile KeyStore that stores key-value pairs in
// memory. Its zero value is ready and safe to be used concurrently
// from different go routines. It is optimized for reads but not
// well-suited for many writes/deletes.
type MemKeyStore struct {
	// keys maps each key name to its stored value.
	keys cache.Cow[string, []byte]
}

// Compile-time assertion that *MemKeyStore implements KeyStore.
var _ KeyStore = (*MemKeyStore)(nil) // compiler check
// String returns a short, human-readable description of the
// MemKeyStore.
func (*MemKeyStore) String() string {
	const description = "In Memory"
	return description
}
// Status reports the state of the MemKeyStore. It always reports
// a fixed latency of one millisecond and never returns an error.
func (*MemKeyStore) Status(context.Context) (KeyStoreState, error) {
	var state KeyStoreState
	state.Latency = time.Millisecond
	return state, nil
}
// Create stores value under the given name unless an entry with
// that name is already present, in which case it returns
// kes.ErrKeyExists and leaves the existing entry untouched.
func (ks *MemKeyStore) Create(_ context.Context, name string, value []byte) error {
	// Keep a private copy such that the stored entry does not
	// alias the caller's slice.
	stored := slices.Clone(value)
	if added := ks.keys.Add(name, stored); added {
		return nil
	}
	return kes.ErrKeyExists
}
// Delete removes the entry with the given name. It returns
// kes.ErrKeyNotFound if there is no such entry.
func (ks *MemKeyStore) Delete(_ context.Context, name string) error {
	if removed := ks.keys.Delete(name); removed {
		return nil
	}
	return kes.ErrKeyNotFound
}
// Get returns a copy of the value stored under the given name.
// It returns kes.ErrKeyNotFound if no such entry exists.
func (ks *MemKeyStore) Get(_ context.Context, name string) ([]byte, error) {
	value, found := ks.keys.Get(name)
	if !found {
		return nil, kes.ErrKeyNotFound
	}
	// Hand out a copy such that callers cannot modify the stored value.
	return slices.Clone(value), nil
}
// List returns, in sorted order, the first n key names that start
// with the given prefix and the name from which to continue the
// listing.
//
// It returns all keys with the prefix if n < 0 and fewer than n
// names if n is greater than the number of keys with the prefix.
// If n == 0, it returns no names and the prefix itself.
//
// An empty prefix matches any key name. At the end of the listing
// or when there are no (more) keys starting with the prefix, the
// returned continuation is empty.
//
// List never returns an error.
func (ks *MemKeyStore) List(_ context.Context, prefix string, n int) ([]string, string, error) {
	if n == 0 {
		return []string{}, prefix, nil
	}

	names := ks.keys.Keys()
	slices.Sort(names)

	// Since names is sorted, all names sharing the prefix form one
	// contiguous run. Narrow names down to exactly that run.
	if prefix != "" {
		matches := func(name string) bool { return strings.HasPrefix(name, prefix) }

		first := slices.IndexFunc(names, matches)
		if first < 0 {
			return []string{}, "", nil
		}
		names = names[first:]

		end := slices.IndexFunc(names, func(name string) bool { return !matches(name) })
		if end >= 0 {
			names = names[:end]
		}
	}

	// Either everything fits into the requested page or we cut the
	// page at n and report the next name as continuation point.
	if n < 0 || n >= len(names) {
		return names, "", nil
	}
	return names[:n], names[n], nil
}
// Close is a no-op that always returns nil.
//
// It exists such that MemKeyStore satisfies the
// KeyStore interface.
func (*MemKeyStore) Close() error {
	return nil
}
// newCache returns a new keyCache wrapping the KeyStore.
// It caches keys in memory and evicts cache entries based
// on the CacheConfig. The conf must not be nil.
//
// newCache starts one background go routine per periodic task.
// A task with a non-positive interval does not run:
//   - Every conf.Expiry, all entries are evicted, unless the
//     key store is offline and an offline expiry is configured.
//   - Every conf.ExpiryUnused/2, entries that have not been used
//     since the previous run are evicted, under the same condition.
//   - Every conf.ExpiryOffline, all entries are evicted if the
//     key store is offline.
//   - Every 10 seconds, the key store status is probed to decide
//     whether the key store is considered offline.
//
// Close the keyCache to stop the background garbage collectors
// evicting cache entries and to release associated resources.
func newCache(store KeyStore, conf *CacheConfig) *keyCache {
	ctx, stop := context.WithCancel(context.Background())
	c := &keyCache{
		store: store,
		stop:  stop,
	}

	// Copy the offline expiry once. The background go routines must
	// not read from conf since the caller may modify it after newCache
	// returns.
	expiryOffline := conf.ExpiryOffline

	go c.gc(ctx, conf.Expiry, func() {
		if offline := c.offline.Load(); !offline || expiryOffline <= 0 {
			c.cache.DeleteAll()
		}
	})
	go c.gc(ctx, conf.ExpiryUnused/2, func() {
		if offline := c.offline.Load(); !offline || expiryOffline <= 0 {
			c.cache.DeleteFunc(func(_ string, e *cacheEntry) bool {
				// We remove an entry if it isn't marked as used.
				// We also change all other entries to unused such
				// that they get evicted on the next GC run unless
				// they're used in between.
				//
				// Therefore, we try to switch the Used flag from
				// true (used) to false (unused). If this succeeds,
				// the entry was in fact marked as used and must
				// not be removed. Otherwise, the entry wasn't marked
				// as used and we should evict it.
				return !e.Used.CompareAndSwap(true, false)
			})
		}
	})
	go c.gc(ctx, expiryOffline, func() {
		if offline := c.offline.Load(); offline && expiryOffline > 0 {
			c.cache.DeleteAll()
		}
	})
	go c.gc(ctx, 10*time.Second, func() {
		// A canceled context means the cache is being closed. That
		// is not an indication that the key store is unreachable.
		_, err := c.store.Status(ctx)
		c.offline.Store(err != nil && !errors.Is(err, context.Canceled))
	})
	return c
}
// keyCache is an in-memory cache for keys fetched from a KeyStore.
// A keyCache runs a background garbage collector that periodically
// evicts cache entries based on a CacheConfig.
//
// It uses lock-free concurrency primitives to optimize for fast
// concurrent reads.
type keyCache struct {
	// store is the wrapped KeyStore that keys are fetched from
	// on a cache miss.
	store KeyStore

	// cache holds the cached entries by key name.
	cache cache.Cow[string, *cacheEntry]

	// The barrier prevents reading the same key multiple
	// times concurrently from the KeyStore.
	// When a particular key isn't cached, we don't want
	// to fetch it N times given N concurrent requests.
	// Instead, we want the first request to fetch it and
	// all others to wait until the first is done.
	barrier cache.Barrier[string]

	// Controls whether we treat the cache as offline
	// cache (with different GC config).
	offline atomic.Bool

	// stop cancels the context of the background go routines
	// and thereby stops the GC.
	stop func() // Stops the GC
}
// A cacheEntry is a cached key with a recently used flag.
type cacheEntry struct {
	// Key is the cached key version.
	Key crypto.KeyVersion

	// Used is set whenever the entry is served from or added to
	// the cache. The GC for unused entries resets it and evicts
	// entries for which it was not set.
	Used atomic.Bool
}
// Status returns the current state of the underlying KeyStore.
//
// If the cache currently considers the key store offline, Status
// does not contact the key store and immediately returns a
// keystore.ErrUnreachable error instead.
func (c *keyCache) Status(ctx context.Context) (KeyStoreState, error) {
	if offline := c.offline.Load(); !offline {
		return c.store.Status(ctx)
	}
	return KeyStoreState{}, &keystore.ErrUnreachable{
		Err: errors.New("keystore is offline"),
	}
}
// Create encodes the key and stores it at the key store under the
// given name. It returns kes.ErrKeyExists if such an entry already
// exists. The key is not added to the cache.
func (c *keyCache) Create(ctx context.Context, name string, key crypto.KeyVersion) error {
	encoded, err := crypto.EncodeKeyVersion(key)
	if err != nil {
		return err
	}

	switch err = c.store.Create(ctx, name, encoded); {
	case err == nil:
		return nil
	case errors.Is(err, kes.ErrKeyExists):
		// Return the plain sentinel error, not a wrapped one.
		return kes.ErrKeyExists
	default:
		return err
	}
}
// Delete deletes the key from the key store and removes it from the
// cache. It may return either no error or kes.ErrKeyNotFound if no
// such entry exists.
//
// The cache entry is also removed when the key store reports that
// the key does not exist. On any other error, the cache is left
// unchanged since the key may still be present at the key store.
func (c *keyCache) Delete(ctx context.Context, name string) error {
	err := c.store.Delete(ctx, name)
	if err == nil || errors.Is(err, kes.ErrKeyNotFound) {
		// In both cases the key is gone from the key store. Keeping
		// it cached would let Get serve a key that no longer exists.
		c.cache.Delete(name)
	}
	return err
}
// Get returns the key from the cache. If the key is not in the cache,
// Get tries to fetch it from the key store and puts it into the cache.
// If the key is also not found at the key store, it returns
// kes.ErrKeyNotFound.
//
// Get tries to make as few calls to the underlying key store as
// possible. Multiple concurrent Get calls for the same key that is
// not in the cache are serialized.
func (c *keyCache) Get(ctx context.Context, name string) (crypto.KeyVersion, error) {
	// Fast path: serve the key from the cache and mark it as used.
	if cached, hit := c.cache.Get(name); hit {
		cached.Used.Store(true)
		return cached.Key, nil
	}

	// Slow path: the key has to be fetched - but only once. Requests
	// for the same name wait on a per-name lock while concurrent
	// requests for other names are not blocked.
	c.barrier.Lock(name)
	defer c.barrier.Unlock(name)

	// Another request may have fetched the key while we were waiting
	// on the barrier. Hence, look into the cache a second time.
	if cached, hit := c.cache.Get(name); hit {
		cached.Used.Store(true)
		return cached.Key, nil
	}

	raw, err := c.store.Get(ctx, name)
	switch {
	case err == nil:
	case errors.Is(err, kes.ErrKeyNotFound):
		return crypto.KeyVersion{}, kes.ErrKeyNotFound
	default:
		return crypto.KeyVersion{}, err
	}

	key, err := crypto.ParseKeyVersion(raw)
	if err != nil {
		return crypto.KeyVersion{}, err
	}

	// Add the key as used entry such that it survives the next
	// GC run for unused entries.
	fresh := &cacheEntry{Key: key}
	fresh.Used.Store(true)
	c.cache.Set(name, fresh)
	return fresh.Key, nil
}
// List returns the first n key names that start with the given prefix,
// and the next prefix from which the listing should continue.
//
// It returns all keys with the prefix if n < 0 and fewer than n
// names if n is greater than the number of keys with the prefix.
//
// An empty prefix matches any key name. At the end of the listing
// or when there are no (more) keys starting with the prefix, the
// returned prefix is empty.
//
// List always forwards the request to the underlying key store.
// It does not consult the cache.
func (c *keyCache) List(ctx context.Context, prefix string, n int) ([]string, string, error) {
	names, next, err := c.store.List(ctx, prefix, n)
	return names, next, err
}
// Close stops the background garbage collector of the cache and
// then closes the underlying key store. It returns the error, if
// any, reported by the key store.
func (c *keyCache) Close() error {
	c.stop()

	err := c.store.Close()
	return err
}
// gc calls f once per interval until ctx is done. It blocks until
// then and is therefore meant to run in its own go routine.
//
// If the interval is not positive, gc returns immediately without
// ever calling f.
func (c *keyCache) gc(ctx context.Context, interval time.Duration, f func()) {
	if interval <= 0 {
		return
	}

	var (
		ticker = time.NewTicker(interval)
		done   = ctx.Done()
	)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			return
		case <-ticker.C:
			f()
		}
	}
}