Skip to content

Commit

Permalink
Merge pull request #190 from ethpandaops/pk910/avoid-activity-reproce…
Browse files Browse the repository at this point in the history
…ssing

fix: avoid reprocessing of validator activity
  • Loading branch information
pk910 authored Dec 11, 2024
2 parents 45351a7 + 44f31b6 commit 45e50b0
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 75 deletions.
1 change: 1 addition & 0 deletions indexer/beacon/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Block struct {
processingStatus dbtypes.UnfinalizedBlockStatus
seenMutex sync.RWMutex
seenMap map[uint16]*Client
processedActivity uint8
}

// BlockBodyIndex holds important block propoerties that are used as index for cache lookups.
Expand Down
8 changes: 4 additions & 4 deletions indexer/beacon/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ func (indexer *Indexer) getValidatorCacheDebugStats(cacheStats *CacheDebugStats)
refs++
}
cacheStats.ValidatorCache.ValidatorDiffs += uint64(refs)

if validator.recentActivity != nil {
cacheStats.ValidatorCache.ValidatorActivity += uint64(len(validator.recentActivity))
}
}

cacheStats.ValidatorCache.ValidatorData = uint64(len(validatorsMap))

for _, recentActivity := range indexer.validatorCache.validatorActivityMap {
cacheStats.ValidatorCache.ValidatorActivity += uint64(len(recentActivity))
}

cacheStats.ValidatorCache.PubkeyMap = CacheDebugMapSize{
Length: len(indexer.validatorCache.pubkeyMap),
Size: mapsize.Size(indexer.validatorCache.pubkeyMap),
Expand Down
43 changes: 21 additions & 22 deletions indexer/beacon/epochvotes.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ type EpochVotes struct {
AmountIsCount bool
}

// EpochVoteActivity represents the aggregated activity for an epoch.
type EpochVoteActivity struct {
Epoch phase0.Epoch
ActiveIndices []phase0.ValidatorIndex
ActivityBitfield bitfield.Bitfield
}

// aggregateEpochVotes aggregates the votes for an epoch based on the provided chain state, blocks, and epoch stats.
func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) *EpochVotes {
if len(blocks) == 0 {
Expand All @@ -80,13 +73,13 @@ func (indexer *Indexer) aggregateEpochVotes(epoch phase0.Epoch, chainState *cons
return cachedVotes
}

votes, _ := indexer.aggregateEpochVotesAndActivity(epoch, chainState, blocks, epochStats)
votes := indexer.aggregateEpochVotesAndActivity(epoch, chainState, blocks, epochStats)
indexer.epochCache.votesCacheMiss++

return votes
}

func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) (*EpochVotes, *EpochVoteActivity) {
func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) *EpochVotes {
t1 := time.Now()

var targetRoot phase0.Root
Expand All @@ -110,14 +103,10 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
AmountIsCount: epochStatsValues == nil,
}

var activity *EpochVoteActivity
var activityBitlist bitfield.Bitlist

if epochStatsValues != nil {
activity = &EpochVoteActivity{
Epoch: epoch,
ActiveIndices: epochStatsValues.ActiveIndices,
ActivityBitfield: bitfield.NewBitlist(epochStatsValues.ActiveValidators),
}
activityBitlist = bitfield.NewBitlist(epochStatsValues.ActiveValidators)
}

deduplicationMap := map[voteDeduplicationKey]bool{}
Expand All @@ -135,6 +124,16 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
if err != nil {
continue
}

processedFlag := uint8(1)
if isNextEpoch {
processedFlag = 2
}
processActivity := (block.processedActivity&processedFlag == 0) && votesWithValues && !votesWithPrecalc
if processActivity {
block.processedActivity |= processedFlag
}

for attIdx, attVersioned := range attestations {
attData, err := attVersioned.Data()
if err != nil {
Expand All @@ -154,7 +153,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
voteAmount := phase0.Gwei(0)
slotIndex := chainState.SlotToSlotIndex(attData.Slot)
updateActivity := func(validatorIndex phase0.ValidatorIndex) {
if votesWithValues {
if processActivity {
indexer.validatorCache.updateValidatorActivity(validatorIndex, epoch, attData.Slot, block)
}
}
Expand All @@ -177,7 +176,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
}

if epochStatsValues != nil {
voteAmt, committeeSize := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset, activity, updateActivity)
voteAmt, committeeSize := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset, &activityBitlist, updateActivity)
voteAmount += voteAmt
aggregationBitsOffset += committeeSize
} else {
Expand All @@ -189,7 +188,7 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
} else {
// pre electra attestation aggregation
if epochStatsValues != nil {
voteAmt, _ := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(attData.Index), attAggregationBits, 0, activity, updateActivity)
voteAmt, _ := votes.aggregateVotes(epochStatsValues, slotIndex, uint64(attData.Index), attAggregationBits, 0, &activityBitlist, updateActivity)
voteAmount += voteAmt
} else {
voteAmt := votes.aggregateVotesWithoutDuties(deduplicationMap, slotIndex, uint64(attData.Index), attAggregationBits, 1, 0)
Expand Down Expand Up @@ -234,25 +233,25 @@ func (indexer *Indexer) aggregateEpochVotesAndActivity(epoch phase0.Epoch, chain
indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v) [0x%x]", epoch, time.Since(t1), len(blocks), votesKey[:])
indexer.epochCache.votesCache.Add(votesKey, votes)

return votes, activity
return votes
}

// aggregateVotes aggregates the votes for a specific slot and committee based on the provided epoch statistics, aggregation bits, and offset.
func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slotIndex phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64, activity *EpochVoteActivity, updateActivity func(validatorIndex phase0.ValidatorIndex)) (phase0.Gwei, uint64) {
func (votes *EpochVotes) aggregateVotes(epochStatsValues *EpochStatsValues, slotIndex phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64, activityBitlist *bitfield.Bitlist, updateActivity func(validatorIndex phase0.ValidatorIndex)) (phase0.Gwei, uint64) {
voteAmount := phase0.Gwei(0)

voteDuties := epochStatsValues.AttesterDuties[slotIndex][committee]
for bitIdx, validatorIndice := range voteDuties {
if aggregationBits.BitAt(uint64(bitIdx) + aggregationBitsOffset) {

if activity.ActivityBitfield.BitAt(uint64(validatorIndice)) {
if activityBitlist.BitAt(uint64(validatorIndice)) {
continue
}

effectiveBalance := epochStatsValues.EffectiveBalances[validatorIndice]
voteAmount += phase0.Gwei(effectiveBalance) * EtherGweiFactor

activity.ActivityBitfield.SetBitAt(uint64(validatorIndice), true)
activityBitlist.SetBitAt(uint64(validatorIndice), true)

validatorIndex := epochStatsValues.ActiveIndices[validatorIndice]
updateActivity(validatorIndex)
Expand Down
76 changes: 27 additions & 49 deletions indexer/beacon/validatorcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
type validatorCache struct {
indexer *Indexer

valsetCache []*validatorEntry // cache for validators
cacheMutex sync.RWMutex // mutex to protect valsetCache for concurrent access
activityMutex sync.RWMutex // mutex to protect recentActivity for concurrent access
lastFinalized phase0.Epoch // last finalized epoch
oldestActivityEpoch phase0.Epoch // oldest epoch in activity cache
pubkeyMap map[phase0.BLSPubKey]phase0.ValidatorIndex
pubkeyMutex sync.RWMutex // mutex to protect pubkeyMap for concurrent access
valsetCache []*validatorEntry // cache for validators
cacheMutex sync.RWMutex // mutex to protect valsetCache for concurrent access
validatorActivityMap map[phase0.ValidatorIndex][]ValidatorActivity
activityMutex sync.RWMutex // mutex to protect recentActivity for concurrent access
lastFinalized phase0.Epoch // last finalized epoch
oldestActivityEpoch phase0.Epoch // oldest epoch in activity cache
pubkeyMap map[phase0.BLSPubKey]phase0.ValidatorIndex
pubkeyMutex sync.RWMutex // mutex to protect pubkeyMap for concurrent access
}

// validatorDiffKey is the primary key for validatorDiff entries in cache.
Expand All @@ -42,7 +43,6 @@ type validatorEntry struct {
index phase0.ValidatorIndex
finalValidator *phase0.Validator
validatorDiffs map[validatorDiffKey]*validatorDiff
recentActivity []ValidatorActivity
}

// ValidatorActivity represents a validator's activity in an epoch.
Expand Down Expand Up @@ -73,9 +73,10 @@ type validatorDiff struct {
// newValidatorCache creates & returns a new instance of validatorCache.
func newValidatorCache(indexer *Indexer) *validatorCache {
cache := &validatorCache{
indexer: indexer,
oldestActivityEpoch: math.MaxInt64,
pubkeyMap: make(map[phase0.BLSPubKey]phase0.ValidatorIndex),
indexer: indexer,
validatorActivityMap: make(map[phase0.ValidatorIndex][]ValidatorActivity),
oldestActivityEpoch: math.MaxInt64,
pubkeyMap: make(map[phase0.BLSPubKey]phase0.ValidatorIndex),
}

return cache
Expand Down Expand Up @@ -210,31 +211,19 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid
cache.oldestActivityEpoch = cutOffEpoch + 1
}

cache.cacheMutex.RLock()

if validatorIndex >= phase0.ValidatorIndex(len(cache.valsetCache)) {
cache.cacheMutex.RUnlock()
return
}

cachedValidator := cache.valsetCache[validatorIndex]
cache.cacheMutex.RUnlock()
if cachedValidator == nil {
return
}

cache.activityMutex.Lock()
defer cache.activityMutex.Unlock()

if cachedValidator.recentActivity == nil {
cachedValidator.recentActivity = make([]ValidatorActivity, 0, cache.indexer.activityHistoryLength)
recentActivity := cache.validatorActivityMap[validatorIndex]
if recentActivity == nil {
recentActivity = make([]ValidatorActivity, 0, cache.indexer.activityHistoryLength)
}

replaceIndex := -1
cutOffLength := 0
activityLength := len(cachedValidator.recentActivity)
activityLength := len(recentActivity)
for i := activityLength - 1; i >= 0; i-- {
activity := cachedValidator.recentActivity[i]
activity := recentActivity[i]
if activity.VoteBlock == voteBlock {
// already exists
return
Expand All @@ -246,7 +235,7 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid
}

if chainState.EpochOfSlot(dutySlot) < cutOffEpoch {
cachedValidator.recentActivity[i].VoteBlock = nil // clear for gc
recentActivity[i].VoteBlock = nil // clear for gc
if replaceIndex == -1 {
replaceIndex = i
} else if replaceIndex == activityLength-cutOffLength-1 {
Expand All @@ -255,26 +244,28 @@ func (cache *validatorCache) updateValidatorActivity(validatorIndex phase0.Valid
} else {
// copy last element to current index
cutOffLength++
cachedValidator.recentActivity[i] = cachedValidator.recentActivity[activityLength-cutOffLength-1]
recentActivity[i] = recentActivity[activityLength-cutOffLength-1]
}
}
}

if replaceIndex != -1 {
cachedValidator.recentActivity[replaceIndex] = ValidatorActivity{
recentActivity[replaceIndex] = ValidatorActivity{
VoteBlock: voteBlock,
VoteDelay: uint16(voteBlock.Slot - dutySlot),
}

if cutOffLength > 0 {
cachedValidator.recentActivity = cachedValidator.recentActivity[:activityLength-cutOffLength]
recentActivity = recentActivity[:activityLength-cutOffLength]
}
} else {
cachedValidator.recentActivity = append(cachedValidator.recentActivity, ValidatorActivity{
recentActivity = append(recentActivity, ValidatorActivity{
VoteBlock: voteBlock,
VoteDelay: uint16(voteBlock.Slot - dutySlot),
})
}

cache.validatorActivityMap[validatorIndex] = recentActivity
}

// setFinalizedEpoch sets the last finalized epoch.
Expand Down Expand Up @@ -416,25 +407,12 @@ func (cache *validatorCache) getValidatorIndexByPubkey(pubkey phase0.BLSPubKey)

// getValidatorActivity returns the validator activity for a given validator index.
func (cache *validatorCache) getValidatorActivity(validatorIndex phase0.ValidatorIndex) []ValidatorActivity {
cache.cacheMutex.RLock()

if validatorIndex >= phase0.ValidatorIndex(len(cache.valsetCache)) {
cache.cacheMutex.RUnlock()
return []ValidatorActivity{}
}

cachedValidator := cache.valsetCache[validatorIndex]
cache.cacheMutex.RUnlock()

if cachedValidator == nil {
return []ValidatorActivity{}
}

cache.activityMutex.RLock()
defer cache.activityMutex.RUnlock()

recentActivity := make([]ValidatorActivity, 0, len(cachedValidator.recentActivity))
for _, activity := range cachedValidator.recentActivity {
cachedActivity := cache.validatorActivityMap[validatorIndex]
recentActivity := make([]ValidatorActivity, 0, len(cachedActivity))
for _, activity := range cachedActivity {
if activity.VoteBlock != nil {
recentActivity = append(recentActivity, activity)
}
Expand Down

0 comments on commit 45e50b0

Please sign in to comment.