Skip to content

Commit

Permalink
Add bytes scanner
Browse files Browse the repository at this point in the history
  • Loading branch information
angelini committed Aug 1, 2024
1 parent 21ce11f commit aab30ee
Showing 1 changed file with 50 additions and 6 deletions.
56 changes: 50 additions & 6 deletions internal/db/content.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"strconv"
"sync"

"github.com/dgraph-io/ristretto"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -137,9 +138,51 @@ func (c *ContentDecoder) Decode(encoded EncodedContent) (DecodedContent, error)
return output, nil
}

type BytesScanner struct {
buf bytes.Buffer
}

func (b *BytesScanner) ScanBytes(src []byte) error {
_, err := b.buf.Write(src)
return err
}

func (b *BytesScanner) Len() int {
return b.buf.Len()
}

func (b *BytesScanner) Bytes() []byte {
return b.buf.Bytes()
}

func (b *BytesScanner) Reset() {
b.buf.Reset()
}

type BytesScannerPool struct {
pool sync.Pool
}

func NewBytesScannerPool() *BytesScannerPool {
return &BytesScannerPool{
pool: sync.Pool{
New: func() any {
return &BytesScanner{}
},
},
}
}

func (b *BytesScannerPool) GetScanner() *BytesScanner {
scanner := b.pool.Get().(*BytesScanner)
scanner.Reset()
return scanner
}

type ContentLookup struct {
cache *ristretto.Cache
decoders *puddle.Pool[*ContentDecoder]
scanners *BytesScannerPool
}

func NewContentLookup() (*ContentLookup, error) {
Expand Down Expand Up @@ -173,6 +216,7 @@ func NewContentLookup() (*ContentLookup, error) {
return &ContentLookup{
cache: cache,
decoders: decoders,
scanners: NewBytesScannerPool(),
}, nil
}

Expand All @@ -190,13 +234,13 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m
value, found := cl.cache.Get(hash.Hex())
if found {
if isEncoded {
decoded, err := decoder.Value().Decode(value.(EncodedContent))
decoded, err := decoder.Value().Decode(value.(*BytesScanner).Bytes())
if err != nil {
return nil, fmt.Errorf("cannot decode value from cache %v: %w", hash.Hex(), err)
}
contents[hash] = decoded
} else {
contents[hash] = value.(DecodedContent)
contents[hash] = value.(*BytesScanner).Bytes()
}
} else {
notFound = append(notFound, hash)
Expand All @@ -215,24 +259,24 @@ func (cl *ContentLookup) Lookup(ctx context.Context, tx pgx.Tx, hashesToLookup m

for rows.Next() {
var hash Hash
var value []byte
value := cl.scanners.GetScanner()

err = rows.Scan(&hash.H1, &hash.H2, &value)
if err != nil {
return nil, fmt.Errorf("content lookup scan: %w", err)
}

// This is a content addressable cache, any cached value will never be updated
cl.cache.Set(hash.Hex(), value, int64(len(value)))
cl.cache.Set(hash.Hex(), value, int64(value.Len()))

if hashesToLookup[hash] {
decoded, err := decoder.Value().Decode(value)
decoded, err := decoder.Value().Decode(value.Bytes())
if err != nil {
return nil, fmt.Errorf("cannot decode value from content table %v: %w", hash.Hex(), err)
}
contents[hash] = decoded
} else {
contents[hash] = value
contents[hash] = value.Bytes()
}
}

Expand Down

0 comments on commit aab30ee

Please sign in to comment.