Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/fetcher: don't skip block/header when parent is not found #633

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 116 additions & 37 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package fetcher
import (
"errors"
"math/rand"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -32,10 +33,11 @@ import (
)

const (
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
lightTimeout = time.Millisecond // Time allowance before an announced header is explicitly requested
arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
cleanMissingParentInterval = 30 * time.Second // Interval to clean missing parent mapping
)

const (
Expand Down Expand Up @@ -183,6 +185,10 @@ type BlockFetcher struct {
queues map[string]int // Per peer block counts to prevent memory exhaustion
queued map[common.Hash]*blockOrHeaderInject // Set of already queued blocks (to dedup imports)

missingParentLock sync.Mutex // Protect missingParent mapping from concurrent use
missingParent map[common.Hash][]common.Hash // Mapping from parent hash to slice of block hashes of missing parent blocks
importMissingParent chan common.Hash

// Callbacks
getHeader HeaderRetrievalFn // Retrieves a header from the local chain
getBlock blockRetrievalFn // Retrieves a block from the local chain
Expand All @@ -209,30 +215,32 @@ func NewBlockFetcher(light bool, getHeader HeaderRetrievalFn, getBlock blockRetr
) *BlockFetcher {

return &BlockFetcher{
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
verifyBlobHeader: verifyBlobHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
light: light,
notify: make(chan *blockAnnounce),
inject: make(chan *blockOrHeaderInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
announced: make(map[common.Hash][]*blockAnnounce),
fetching: make(map[common.Hash]*blockAnnounce),
fetched: make(map[common.Hash][]*blockAnnounce),
completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
queued: make(map[common.Hash]*blockOrHeaderInject),
missingParent: make(map[common.Hash][]common.Hash),
importMissingParent: make(chan common.Hash, blockLimit),
getHeader: getHeader,
getBlock: getBlock,
verifyHeader: verifyHeader,
verifyBlobHeader: verifyBlobHeader,
broadcastBlock: broadcastBlock,
chainHeight: chainHeight,
insertHeaders: insertHeaders,
insertChain: insertChain,
dropPeer: dropPeer,
}
}

Expand Down Expand Up @@ -344,13 +352,15 @@ func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transac
func (f *BlockFetcher) loop() {
// Iterate the block fetching until a quit is requested
var (
fetchTimer = time.NewTimer(0)
completeTimer = time.NewTimer(0)
fetchTimer = time.NewTimer(0)
completeTimer = time.NewTimer(0)
cleanMissingParentTicker = time.NewTicker(cleanMissingParentInterval)
)
<-fetchTimer.C // clear out the channel
<-completeTimer.C
defer fetchTimer.Stop()
defer completeTimer.Stop()
defer cleanMissingParentTicker.Stop()

for {
// Clean up any expired block fetches
Expand Down Expand Up @@ -378,7 +388,9 @@ func (f *BlockFetcher) loop() {
}
// Otherwise if fresh and still unknown, try and import
if (number+maxUncleDist < height) || (f.light && f.getHeader(hash) != nil) || (!f.light && f.getBlock(hash) != nil) {
f.missingParentLock.Lock()
f.forgetBlock(hash)
f.missingParentLock.Unlock()
continue
}
if f.light {
Expand Down Expand Up @@ -442,7 +454,9 @@ func (f *BlockFetcher) loop() {
case hash := <-f.done:
// A pending import finished, remove all traces of the notification
f.forgetHash(hash)
f.missingParentLock.Lock()
f.forgetBlock(hash)
f.missingParentLock.Unlock()

case <-fetchTimer.C:
// At least one block's timer ran out, check for needing retrieval
Expand Down Expand Up @@ -684,6 +698,28 @@ func (f *BlockFetcher) loop() {
f.enqueue(announce.origin, nil, block, sidecars)
}
}

case hash := <-f.importMissingParent:
if op := f.queued[hash]; op != nil {
if f.light {
f.importHeaders(op.origin, op.header)
} else {
f.importBlocks(op.origin, op.block, op.sidecars)
}
}
case <-cleanMissingParentTicker.C:
height := f.chainHeight()
f.missingParentLock.Lock()
for _, blocks := range f.missingParent {
for _, block := range blocks {
if op := f.queued[block]; op != nil {
if op.number()+maxUncleDist < height {
f.forgetBlock(block)
}
}
}
}
f.missingParentLock.Unlock()
}
}
}
Expand Down Expand Up @@ -780,13 +816,16 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
log.Debug("Importing propagated header", "peer", peer, "number", header.Number, "hash", hash)

go func() {
defer func() { f.done <- hash }()
// If the parent's unknown, abort insertion
// If the parent's unknown, queue for later processing when parent block is imported
parent := f.getHeader(header.ParentHash)
if parent == nil {
log.Debug("Unknown parent of propagated header", "peer", peer, "number", header.Number, "hash", hash, "parent", header.ParentHash)
f.missingParentLock.Lock()
f.missingParent[header.ParentHash] = append(f.missingParent[header.ParentHash], hash)
f.missingParentLock.Unlock()
return
}
defer func() { f.done <- hash }()
// Validate the header and if something went wrong, drop the peer
if err := f.verifyHeader(header); err != nil && err != consensus.ErrFutureBlock {
log.Debug("Propagated header verification failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
Expand All @@ -798,6 +837,14 @@ func (f *BlockFetcher) importHeaders(peer string, header *types.Header) {
log.Debug("Propagated header import failed", "peer", peer, "number", header.Number, "hash", hash, "err", err)
return
}
f.missingParentLock.Lock()
nextBlockHashes, ok := f.missingParent[hash]
f.missingParentLock.Unlock()
if ok {
for _, nextBlockHash := range nextBlockHashes {
f.importMissingParent <- nextBlockHash
}
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(header, nil)
Expand All @@ -814,14 +861,17 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars []
// Run the import on a new thread
log.Debug("Importing propagated block", "peer", peer, "number", block.Number(), "hash", hash)
go func() {
defer func() { f.done <- hash }()

// If the parent's unknown, abort insertion
parent := f.getBlock(block.ParentHash())
// If the parent's unknown, queue for later processing when parent block is imported
parentHash := block.ParentHash()
parent := f.getBlock(parentHash)
if parent == nil {
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", block.ParentHash())
log.Debug("Unknown parent of propagated block", "peer", peer, "number", block.Number(), "hash", hash, "parent", parentHash)
f.missingParentLock.Lock()
f.missingParent[parentHash] = append(f.missingParent[parentHash], hash)
f.missingParentLock.Unlock()
return
}
defer func() { f.done <- hash }()
// Quickly validate the header and propagate the block if it passes
err := f.verifyHeader(block.Header())
if err == nil {
Expand Down Expand Up @@ -853,6 +903,14 @@ func (f *BlockFetcher) importBlocks(peer string, block *types.Block, sidecars []
blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, nil, false)

f.missingParentLock.Lock()
nextBlockHashes, ok := f.missingParent[hash]
f.missingParentLock.Unlock()
if ok {
for _, nextBlockHash := range nextBlockHashes {
f.importMissingParent <- nextBlockHash
}
}
// Invoke the testing hook if needed
if f.importedHook != nil {
f.importedHook(nil, block)
Expand Down Expand Up @@ -906,12 +964,33 @@ func (f *BlockFetcher) forgetHash(hash common.Hash) {

// forgetBlock removes all traces of a queued block from the fetcher's internal
// state.
// The caller must hold the missingParentLock.
func (f *BlockFetcher) forgetBlock(hash common.Hash) {
if insert := f.queued[hash]; insert != nil {
f.queues[insert.origin]--
if f.queues[insert.origin] == 0 {
delete(f.queues, insert.origin)
}
delete(f.queued, hash)
var parentHash common.Hash
if f.light {
parentHash = insert.header.ParentHash
} else {
parentHash = insert.block.ParentHash()
}
blocks := f.missingParent[parentHash]
for i, block := range blocks {
if block == hash {
// Swap with the last element then decrease the length
blocks[i] = blocks[len(blocks)-1]
blocks = blocks[:len(blocks)-1]
break
}
}
if len(blocks) == 0 {
delete(f.missingParent, parentHash)
} else {
f.missingParent[parentHash] = blocks
}
}
}
Loading