From 44f31b60ec9c389963b73dc5dc743701bc2ab839 Mon Sep 17 00:00:00 2001 From: pk910 Date: Wed, 11 Dec 2024 16:01:15 +0100 Subject: [PATCH] avoid reprocessing of validator activity --- indexer/beacon/block.go | 1 + indexer/beacon/debug.go | 8 ++-- indexer/beacon/epochvotes.go | 43 +++++++++--------- indexer/beacon/validatorcache.go | 76 ++++++++++++-------------------- 4 files changed, 53 insertions(+), 75 deletions(-) diff --git a/indexer/beacon/block.go b/indexer/beacon/block.go index 04488676..ae6a703a 100644 --- a/indexer/beacon/block.go +++ b/indexer/beacon/block.go @@ -35,6 +35,7 @@ type Block struct { processingStatus dbtypes.UnfinalizedBlockStatus seenMutex sync.RWMutex seenMap map[uint16]*Client + processedActivity uint8 } // BlockBodyIndex holds important block propoerties that are used as index for cache lookups. diff --git a/indexer/beacon/debug.go b/indexer/beacon/debug.go index 2ded4686..4fa749d8 100644 --- a/indexer/beacon/debug.go +++ b/indexer/beacon/debug.go @@ -162,14 +162,14 @@ func (indexer *Indexer) getValidatorCacheDebugStats(cacheStats *CacheDebugStats) refs++ } cacheStats.ValidatorCache.ValidatorDiffs += uint64(refs) - - if validator.recentActivity != nil { - cacheStats.ValidatorCache.ValidatorActivity += uint64(len(validator.recentActivity)) - } } cacheStats.ValidatorCache.ValidatorData = uint64(len(validatorsMap)) + for _, recentActivity := range indexer.validatorCache.validatorActivityMap { + cacheStats.ValidatorCache.ValidatorActivity += uint64(len(recentActivity)) + } + cacheStats.ValidatorCache.PubkeyMap = CacheDebugMapSize{ Length: len(indexer.validatorCache.pubkeyMap), Size: mapsize.Size(indexer.validatorCache.pubkeyMap), diff --git a/indexer/beacon/epochvotes.go b/indexer/beacon/epochvotes.go index c61aea07..39d3f4b0 100644 --- a/indexer/beacon/epochvotes.go +++ b/indexer/beacon/epochvotes.go @@ -51,13 +51,6 @@ type EpochVotes struct { AmountIsCount bool } -// EpochVoteActivity represents the aggregated activity for an epoch. -type EpochVoteActivity struct { - Epoch phase0.Epoch - ActiveIndices []phase0.ValidatorIndex - ActivityBitfield bitfield.Bitfield -} - // aggregateEpochVotes aggregates the votes for an epoch based on the provided chain state, blocks, and epoch stats. func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) *EpochVotes { if len(blocks) == 0 { @@ -80,13 +73,13 @@ func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *cons return cachedVotes } - votes, _ := indexer.aggregateEpochVotesAndActivity(epoch, chainState, blocks, epochStats) + votes := indexer.aggregateEpochVotesAndActivity(epoch, chainState, blocks, epochStats) indexer.epochCache.votesCacheMiss++ return votes } -func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) (*EpochVotes, *EpochVoteActivity) { +func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) *EpochVotes { t1 := time.Now() var targetRoot phase0.Root @@ -110,14 +103,10 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain AmountIsCount: epochStatsValues == nil, } - var activity *EpochVoteActivity + var activityBitlist bitfield.Bitlist if epochStatsValues != nil { - activity = &EpochVoteActivity{ - Epoch: epoch, - ActiveIndices: epochStatsValues.ActiveIndices, - ActivityBitfield: bitfield.NewBitlist(epochStatsValues.ActiveValidators), - } + activityBitlist = bitfield.NewBitlist(epochStatsValues.ActiveValidators) } deduplicationMap := map[voteDeduplicationKey]bool{} @@ -135,6 +124,16 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain if err != nil { continue } + + processedFlag := uint8(1) + if isNextEpoch { + processedFlag = 2 + } + processActivity := (block.processedActivity&processedFlag == 0) && votesWithValues && !votesWithPrecalc + if processActivity { + block.processedActivity |= processedFlag + } + for attIdx, attVersioned := range attestations { attData, err := attVersioned.Data() if err != nil { @@ -154,7 +153,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain voteAmount := phase0.Gwei(0) slotIndex := chainState.SlotToSlotIndex(attData.Slot) updateActivity := func(validatorIndex phase0.ValidatorIndex) { - if votesWithValues { + if processActivity { indexer.validatorCache.updateValidatorActivity(validatorIndex, epoch, attData.Slot, block) } } @@ -177,7 +176,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain } if epochStatsValues != nil { - voteAmt, committeeSize := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset, activity, updateActivity) + voteAmt, committeeSize := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset, &activityBitlist, updateActivity) voteAmount += voteAmt aggregationBitsOffset += committeeSize } else { @@ -189,7 +188,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain } else { // pre electra attestation aggregation if epochStatsValues != nil { - voteAmt, _ := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(attData.Index), attAggregationBits, 0, activity, updateActivity) + voteAmt, _ := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(attData.Index), attAggregationBits, 0, &activityBitlist, updateActivity) voteAmount += voteAmt } else { voteAmt := votes.aggregateVotesWithoutDuties(deduplicationMap, slotIndex, uint64(attData.Index), attAggregationBits, 1, 0) @@ -234,25 +233,25 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v) [0x%x]", epoch, time.Since(t1), len(blocks), votesKey[:]) indexer.epochCache.votesCache.Add(votesKey, votes) - return votes, activity + return votes } // aggregateVotes aggregates the votes for a specific slot and committee based on the provided epoch statistics, aggregation bits, and offset. -func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slotIndex phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64, activity *EpochVoteActivity, updateActivity func(validatorIndex phase0.ValidatorIndex)) (phase0.Gwei, uint64) { +func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slotIndex phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64, activityBitlist *bitfield.Bitlist, updateActivity func(validatorIndex phase0.ValidatorIndex)) (phase0.Gwei, uint64) { voteAmount := phase0.Gwei(0) voteDuties := epochStatsValues.AttesterDuties[slotIndex][committee] for bitIdx, validatorIndice := range voteDuties { if aggregationBits.BitAt(uint64(bitIdx) + aggregationBitsOffset) { - if activity.ActivityBitfield.BitAt(uint64(validatorIndice)) { + if activityBitlist.BitAt(uint64(validatorIndice)) { continue } effectiveBalance := epochStatsValues.EffectiveBalances[validatorIndice] voteAmount += phase0.Gwei(effectiveBalance) * EtherGweiFactor - activity.ActivityBitfield.SetBitAt(uint64(validatorIndice), true) + activityBitlist.SetBitAt(uint64(validatorIndice), true) validatorIndex := epochStatsValues.ActiveIndices[validatorIndice] updateActivity(validatorIndex) diff --git a/indexer/beacon/validatorcache.go b/indexer/beacon/validatorcache.go index ba554859..733184c1 100644 --- a/indexer/beacon/validatorcache.go +++ b/indexer/beacon/validatorcache.go @@ -14,13 +14,14 @@ import ( type validatorCache struct { indexer *Indexer - valsetCache []*validatorEntry // cache for validators - cacheMutex sync.RWMutex // mutex to protect valsetCache for concurrent access - activityMutex sync.RWMutex // mutex to protect recentActivity for concurrent access - lastFinalized phase0.Epoch // last finalized epoch - oldestActivityEpoch phase0.Epoch // oldest epoch in activity cache - pubkeyMap map[phase0.BLSPubKey]phase0.ValidatorIndex - pubkeyMutex sync.RWMutex // mutex to protect pubkeyMap for concurrent access + valsetCache []*validatorEntry // cache for validators + cacheMutex sync.RWMutex // mutex to protect valsetCache for concurrent access + validatorActivityMap map[phase0.ValidatorIndex][]ValidatorActivity + activityMutex sync.RWMutex // mutex to protect recentActivity for concurrent access + lastFinalized phase0.Epoch // last finalized epoch + oldestActivityEpoch phase0.Epoch // oldest epoch in activity cache + pubkeyMap map[phase0.BLSPubKey]phase0.ValidatorIndex + pubkeyMutex sync.RWMutex // mutex to protect pubkeyMap for concurrent access } // validatorDiffKey is the primary key for validatorDiff entries in cache. @@ -42,7 +43,6 @@ type validatorEntry struct { index phase0.ValidatorIndex finalValidator *phase0.Validator validatorDiffs map[validatorDiffKey]*validatorDiff - recentActivity []ValidatorActivity } // ValidatorActivity represents a validator's activity in an epoch. @@ -73,9 +73,10 @@ type validatorDiff struct { // newValidatorCache creates & returns a new instance of validatorCache. func newValidatorCache(indexer *Indexer) *validatorCache { cache := &validatorCache{ - indexer: indexer, - oldestActivityEpoch: math.MaxInt64, - pubkeyMap: make(map[phase0.BLSPubKey]phase0.ValidatorIndex), + indexer: indexer, + validatorActivityMap: make(map[phase0.ValidatorIndex][]ValidatorActivity), + oldestActivityEpoch: math.MaxInt64, + pubkeyMap: make(map[phase0.BLSPubKey]phase0.ValidatorIndex), } return cache @@ -210,31 +211,19 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid cache.oldestActivityEpoch = cutOffEpoch + 1 } - cache.cacheMutex.RLock() - - if validatorIndex >= phase0.ValidatorIndex(len(cache.valsetCache)) { - cache.cacheMutex.RUnlock() - return - } - - cachedValidator := cache.valsetCache[validatorIndex] - cache.cacheMutex.RUnlock() - if cachedValidator == nil { - return - } - cache.activityMutex.Lock() defer cache.activityMutex.Unlock() - if cachedValidator.recentActivity == nil { - cachedValidator.recentActivity = make([]ValidatorActivity, 0, cache.indexer.activityHistoryLength) + recentActivity := cache.validatorActivityMap[validatorIndex] + if recentActivity == nil { + recentActivity = make([]ValidatorActivity, 0, cache.indexer.activityHistoryLength) } replaceIndex := -1 cutOffLength := 0 - activityLength := len(cachedValidator.recentActivity) + activityLength := len(recentActivity) for i := activityLength - 1; i >= 0; i-- { - activity := cachedValidator.recentActivity[i] + activity := recentActivity[i] if activity.VoteBlock == voteBlock { // already exists return @@ -246,7 +235,7 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid } if chainState.EpochOfSlot(dutySlot) < cutOffEpoch { - cachedValidator.recentActivity[i].VoteBlock = nil // clear for gc + recentActivity[i].VoteBlock = nil // clear for gc if replaceIndex == -1 { replaceIndex = i } else if replaceIndex == activityLength-cutOffLength-1 { @@ -255,26 +244,28 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid } else { // copy last element to current index cutOffLength++ - cachedValidator.recentActivity[i] = cachedValidator.recentActivity[activityLength-cutOffLength-1] + recentActivity[i] = recentActivity[activityLength-cutOffLength-1] } } } if replaceIndex != -1 { - cachedValidator.recentActivity[replaceIndex] = ValidatorActivity{ + recentActivity[replaceIndex] = ValidatorActivity{ VoteBlock: voteBlock, VoteDelay: uint16(voteBlock.Slot - dutySlot), } if cutOffLength > 0 { - cachedValidator.recentActivity = cachedValidator.recentActivity[:activityLength-cutOffLength] + recentActivity = recentActivity[:activityLength-cutOffLength] } } else { - cachedValidator.recentActivity = append(cachedValidator.recentActivity, ValidatorActivity{ + recentActivity = append(recentActivity, ValidatorActivity{ VoteBlock: voteBlock, VoteDelay: uint16(voteBlock.Slot - dutySlot), }) } + + cache.validatorActivityMap[validatorIndex] = recentActivity } // setFinalizedEpoch sets the last finalized epoch. @@ -416,25 +407,12 @@ func (cache *validatorCache) getValidatorIndexByPubkey(pubkey phase0.BLSPubKey) // getValidatorActivity returns the validator activity for a given validator index. func (cache *validatorCache) getValidatorActivity(validatorIndex phase0.ValidatorIndex) []ValidatorActivity { - cache.cacheMutex.RLock() - - if validatorIndex >= phase0.ValidatorIndex(len(cache.valsetCache)) { - cache.cacheMutex.RUnlock() - return []ValidatorActivity{} - } - - cachedValidator := cache.valsetCache[validatorIndex] - cache.cacheMutex.RUnlock() - - if cachedValidator == nil { - return []ValidatorActivity{} - } - cache.activityMutex.RLock() defer cache.activityMutex.RUnlock() - recentActivity := make([]ValidatorActivity, 0, len(cachedValidator.recentActivity)) - for _, activity := range cachedValidator.recentActivity { + cachedActivity := cache.validatorActivityMap[validatorIndex] + recentActivity := make([]ValidatorActivity, 0, len(cachedActivity)) + for _, activity := range cachedActivity { if activity.VoteBlock != nil { recentActivity = append(recentActivity, activity) }