Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid reprocessing of validator activity #190

Merged
merged 1 commit into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions indexer/beacon/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Block struct {
processingStatus dbtypes.UnfinalizedBlockStatus
seenMutex sync.RWMutex
seenMap map[uint16]*Client
processedActivity uint8
}

// BlockBodyIndex holds important block propoerties that are used as index for cache lookups.
Expand Down
8 changes: 4 additions & 4 deletions indexer/beacon/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ func (indexer *Indexer) getValidatorCacheDebugStats(cacheStats *CacheDebugStats)
refs++
}
cacheStats.ValidatorCache.ValidatorDiffs += uint64(refs)

if validator.recentActivity != nil {
cacheStats.ValidatorCache.ValidatorActivity += uint64(len(validator.recentActivity))
}
}

cacheStats.ValidatorCache.ValidatorData = uint64(len(validatorsMap))

for _, recentActivity := range indexer.validatorCache.validatorActivityMap {
cacheStats.ValidatorCache.ValidatorActivity += uint64(len(recentActivity))
}

cacheStats.ValidatorCache.PubkeyMap = CacheDebugMapSize{
Length: len(indexer.validatorCache.pubkeyMap),
Size: mapsize.Size(indexer.validatorCache.pubkeyMap),
Expand Down
43 changes: 21 additions & 22 deletions indexer/beacon/epochvotes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ type EpochVotes struct {
AmountIsCount bool
}

// EpochVoteActivity represents the aggregated activity for an epoch.
type EpochVoteActivity struct {
Epoch phase0.Epoch
ActiveIndices []phase0.ValidatorIndex
ActivityBitfield bitfield.Bitfield
}

// aggregateEpochVotes aggregates the votes for an epoch based on the provided chain state, blocks, and epoch stats.
func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) *EpochVotes {
if len(blocks) == 0 {
Expand All @@ -80,13 +73,13 @@ func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *cons
return cachedVotes
}

votes, _ := indexer.aggregateEpochVotesAndActivity(epoch, chainState, blocks, epochStats)
votes := indexer.aggregateEpochVotesAndActivity(epoch, chainState, blocks, epochStats)
indexer.epochCache.votesCacheMiss++

return votes
}

func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) (*EpochVotes, *EpochVoteActivity) {
func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) *EpochVotes {
t1 := time.Now()

var targetRoot phase0.Root
Expand All @@ -110,14 +103,10 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
AmountIsCount: epochStatsValues == nil,
}

var activity *EpochVoteActivity
var activityBitlist bitfield.Bitlist

if epochStatsValues != nil {
activity = &EpochVoteActivity{
Epoch: epoch,
ActiveIndices: epochStatsValues.ActiveIndices,
ActivityBitfield: bitfield.NewBitlist(epochStatsValues.ActiveValidators),
}
activityBitlist = bitfield.NewBitlist(epochStatsValues.ActiveValidators)
}

deduplicationMap := map[voteDeduplicationKey]bool{}
Expand All @@ -135,6 +124,16 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
if err != nil {
continue
}

processedFlag := uint8(1)
if isNextEpoch {
processedFlag = 2
}
processActivity := (block.processedActivity&processedFlag == 0) && votesWithValues && !votesWithPrecalc
if processActivity {
block.processedActivity |= processedFlag
}

for attIdx, attVersioned := range attestations {
attData, err := attVersioned.Data()
if err != nil {
Expand All @@ -154,7 +153,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
voteAmount := phase0.Gwei(0)
slotIndex := chainState.SlotToSlotIndex(attData.Slot)
updateActivity := func(validatorIndex phase0.ValidatorIndex) {
if votesWithValues {
if processActivity {
indexer.validatorCache.updateValidatorActivity(validatorIndex, epoch, attData.Slot, block)
}
}
Expand All @@ -177,7 +176,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
}

if epochStatsValues != nil {
voteAmt, committeeSize := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset, activity, updateActivity)
voteAmt, committeeSize := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset, &activityBitlist, updateActivity)
voteAmount += voteAmt
aggregationBitsOffset += committeeSize
} else {
Expand All @@ -189,7 +188,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
} else {
// pre electra attestation aggregation
if epochStatsValues != nil {
voteAmt, _ := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(attData.Index), attAggregationBits, 0, activity, updateActivity)
voteAmt, _ := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(attData.Index), attAggregationBits, 0, &activityBitlist, updateActivity)
voteAmount += voteAmt
} else {
voteAmt := votes.aggregateVotesWithoutDuties(deduplicationMap, slotIndex, uint64(attData.Index), attAggregationBits, 1, 0)
Expand Down Expand Up @@ -234,25 +233,25 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v) [0x%x]", epoch, time.Since(t1), len(blocks), votesKey[:])
indexer.epochCache.votesCache.Add(votesKey, votes)

return votes, activity
return votes
}

// aggregateVotes aggregates the votes for a specific slot and committee based on the provided epoch statistics, aggregation bits, and offset.
func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slotIndex phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64, activity *EpochVoteActivity, updateActivity func(validatorIndex phase0.ValidatorIndex)) (phase0.Gwei, uint64) {
func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slotIndex phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64, activityBitlist *bitfield.Bitlist, updateActivity func(validatorIndex phase0.ValidatorIndex)) (phase0.Gwei, uint64) {
voteAmount := phase0.Gwei(0)

voteDuties := epochStatsValues.AttesterDuties[slotIndex][committee]
for bitIdx, validatorIndice := range voteDuties {
if aggregationBits.BitAt(uint64(bitIdx) + aggregationBitsOffset) {

if activity.ActivityBitfield.BitAt(uint64(validatorIndice)) {
if activityBitlist.BitAt(uint64(validatorIndice)) {
continue
}

effectiveBalance := epochStatsValues.EffectiveBalances[validatorIndice]
voteAmount += phase0.Gwei(effectiveBalance) * EtherGweiFactor

activity.ActivityBitfield.SetBitAt(uint64(validatorIndice), true)
activityBitlist.SetBitAt(uint64(validatorIndice), true)

validatorIndex := epochStatsValues.ActiveIndices[validatorIndice]
updateActivity(validatorIndex)
Expand Down
76 changes: 27 additions & 49 deletions indexer/beacon/validatorcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
type validatorCache struct {
indexer *Indexer

valsetCache []*validatorEntry // cache for validators
cacheMutex sync.RWMutex // mutex to protect valsetCache for concurrent access
activityMutex sync.RWMutex // mutex to protect recentActivity for concurrent access
lastFinalized phase0.Epoch // last finalized epoch
oldestActivityEpoch phase0.Epoch // oldest epoch in activity cache
pubkeyMap map[phase0.BLSPubKey]phase0.ValidatorIndex
pubkeyMutex sync.RWMutex // mutex to protect pubkeyMap for concurrent access
valsetCache []*validatorEntry // cache for validators
cacheMutex sync.RWMutex // mutex to protect valsetCache for concurrent access
validatorActivityMap map[phase0.ValidatorIndex][]ValidatorActivity
activityMutex sync.RWMutex // mutex to protect recentActivity for concurrent access
lastFinalized phase0.Epoch // last finalized epoch
oldestActivityEpoch phase0.Epoch // oldest epoch in activity cache
pubkeyMap map[phase0.BLSPubKey]phase0.ValidatorIndex
pubkeyMutex sync.RWMutex // mutex to protect pubkeyMap for concurrent access
}

// validatorDiffKey is the primary key for validatorDiff entries in cache.
Expand All @@ -42,7 +43,6 @@ type validatorEntry struct {
index phase0.ValidatorIndex
finalValidator *phase0.Validator
validatorDiffs map[validatorDiffKey]*validatorDiff
recentActivity []ValidatorActivity
}

// ValidatorActivity represents a validator's activity in an epoch.
Expand Down Expand Up @@ -73,9 +73,10 @@ type validatorDiff struct {
// newValidatorCache creates & returns a new instance of validatorCache.
func newValidatorCache(indexer *Indexer) *validatorCache {
cache := &validatorCache{
indexer: indexer,
oldestActivityEpoch: math.MaxInt64,
pubkeyMap: make(map[phase0.BLSPubKey]phase0.ValidatorIndex),
indexer: indexer,
validatorActivityMap: make(map[phase0.ValidatorIndex][]ValidatorActivity),
oldestActivityEpoch: math.MaxInt64,
pubkeyMap: make(map[phase0.BLSPubKey]phase0.ValidatorIndex),
}

return cache
Expand Down Expand Up @@ -210,31 +211,19 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid
cache.oldestActivityEpoch = cutOffEpoch + 1
}

cache.cacheMutex.RLock()

if validatorIndex >= phase0.ValidatorIndex(len(cache.valsetCache)) {
cache.cacheMutex.RUnlock()
return
}

cachedValidator := cache.valsetCache[validatorIndex]
cache.cacheMutex.RUnlock()
if cachedValidator == nil {
return
}

cache.activityMutex.Lock()
defer cache.activityMutex.Unlock()

if cachedValidator.recentActivity == nil {
cachedValidator.recentActivity = make([]ValidatorActivity, 0, cache.indexer.activityHistoryLength)
recentActivity := cache.validatorActivityMap[validatorIndex]
if recentActivity == nil {
recentActivity = make([]ValidatorActivity, 0, cache.indexer.activityHistoryLength)
}

replaceIndex := -1
cutOffLength := 0
activityLength := len(cachedValidator.recentActivity)
activityLength := len(recentActivity)
for i := activityLength - 1; i >= 0; i-- {
activity := cachedValidator.recentActivity[i]
activity := recentActivity[i]
if activity.VoteBlock == voteBlock {
// already exists
return
Expand All @@ -246,7 +235,7 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid
}

if chainState.EpochOfSlot(dutySlot) < cutOffEpoch {
cachedValidator.recentActivity[i].VoteBlock = nil // clear for gc
recentActivity[i].VoteBlock = nil // clear for gc
if replaceIndex == -1 {
replaceIndex = i
} else if replaceIndex == activityLength-cutOffLength-1 {
Expand All @@ -255,26 +244,28 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid
} else {
// copy last element to current index
cutOffLength++
cachedValidator.recentActivity[i] = cachedValidator.recentActivity[activityLength-cutOffLength-1]
recentActivity[i] = recentActivity[activityLength-cutOffLength-1]
}
}
}

if replaceIndex != -1 {
cachedValidator.recentActivity[replaceIndex] = ValidatorActivity{
recentActivity[replaceIndex] = ValidatorActivity{
VoteBlock: voteBlock,
VoteDelay: uint16(voteBlock.Slot - dutySlot),
}

if cutOffLength > 0 {
cachedValidator.recentActivity = cachedValidator.recentActivity[:activityLength-cutOffLength]
recentActivity = recentActivity[:activityLength-cutOffLength]
}
} else {
cachedValidator.recentActivity = append(cachedValidator.recentActivity, ValidatorActivity{
recentActivity = append(recentActivity, ValidatorActivity{
VoteBlock: voteBlock,
VoteDelay: uint16(voteBlock.Slot - dutySlot),
})
}

cache.validatorActivityMap[validatorIndex] = recentActivity
}

// setFinalizedEpoch sets the last finalized epoch.
Expand Down Expand Up @@ -416,25 +407,12 @@ func (cache *validatorCache) getValidatorIndexByPubkey(pubkey phase0.BLSPubKey)

// getValidatorActivity returns the validator activity for a given validator index.
func (cache *validatorCache) getValidatorActivity(validatorIndex phase0.ValidatorIndex) []ValidatorActivity {
cache.cacheMutex.RLock()

if validatorIndex >= phase0.ValidatorIndex(len(cache.valsetCache)) {
cache.cacheMutex.RUnlock()
return []ValidatorActivity{}
}

cachedValidator := cache.valsetCache[validatorIndex]
cache.cacheMutex.RUnlock()

if cachedValidator == nil {
return []ValidatorActivity{}
}

cache.activityMutex.RLock()
defer cache.activityMutex.RUnlock()

recentActivity := make([]ValidatorActivity, 0, len(cachedValidator.recentActivity))
for _, activity := range cachedValidator.recentActivity {
cachedActivity := cache.validatorActivityMap[validatorIndex]
recentActivity := make([]ValidatorActivity, 0, len(cachedActivity))
for _, activity := range cachedActivity {
if activity.VoteBlock != nil {
recentActivity = append(recentActivity, activity)
}
Expand Down
Loading