feat: Detect and recover from TCP Simultaneous open #2950

Open · wants to merge 3 commits into base: master
3 changes: 3 additions & 0 deletions core/network/network.go
@@ -204,6 +204,9 @@ type Dialer interface {
type AddrDelay struct {
Addr ma.Multiaddr
Delay time.Duration
// ForceDelay forces the dialer to wait for the full delay before dialing,
// even if there are no other dials in flight.
ForceDelay bool
}

// DialRanker provides a schedule of dialing the provided addresses
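As a hedged usage sketch (not part of this diff), a dial schedule entry that must always wait out its delay could be built like this, assuming the new ForceDelay field from this PR:

```go
package main

import (
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p/core/network"
	ma "github.com/multiformats/go-multiaddr"
)

func main() {
	addr := ma.StringCast("/ip4/1.2.3.4/tcp/4001")
	// Schedule a retry that must wait out its full delay even when no
	// other dials to this peer are in flight.
	retry := network.AddrDelay{
		Addr:       addr,
		Delay:      250 * time.Millisecond,
		ForceDelay: true,
	}
	fmt.Println(retry.Addr, retry.Delay, retry.ForceDelay)
}
```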
2 changes: 2 additions & 0 deletions core/sec/security.go
@@ -41,3 +41,5 @@ func (e ErrPeerIDMismatch) Error() string {
}

var _ error = (*ErrPeerIDMismatch)(nil)

var ErrSimOpen = fmt.Errorf("TCP simultaneous open caused security handshake to fail")
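As a usage sketch (not part of the diff), callers can branch on the new sentinel with errors.Is, which is how the dial worker later in this PR consumes it; handleHandshakeErr below is a hypothetical helper:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/libp2p/go-libp2p/core/sec"
)

// handleHandshakeErr shows how a caller could distinguish a simultaneous-open
// failure (worth retrying) from any other handshake error.
func handleHandshakeErr(err error) {
	if errors.Is(err, sec.ErrSimOpen) {
		fmt.Println("security handshake hit TCP simultaneous open; retry after a delay")
		return
	}
	fmt.Println("handshake failed:", err)
}

func main() {
	handleHandshakeErr(sec.ErrSimOpen)
}
```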
3 changes: 3 additions & 0 deletions go.mod
@@ -130,3 +130,6 @@ require (
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
)

// TODO remove once we have a go-multistream release
replace github.com/multiformats/go-multistream => github.com/multiformats/go-multistream v0.5.1-0.20240903234121-990321dd2b7f
4 changes: 2 additions & 2 deletions go.sum
@@ -248,8 +248,8 @@ github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI1
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U=
github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM=
github.com/multiformats/go-multistream v0.5.0 h1:5htLSLl7lvJk3xx3qT/8Zm9J4K8vEOf/QGkvOGQAyiE=
github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dydlEqV3l6N3/GBsX6ILA=
github.com/multiformats/go-multistream v0.5.1-0.20240903234121-990321dd2b7f h1:M6Yt9ZHP5HWoV9FwzzwBSvrrR0vPNzzsp+cCmIF4gVY=
github.com/multiformats/go-multistream v0.5.1-0.20240903234121-990321dd2b7f/go.mod h1:MOyoG5otO24cHIg8kf9QW2/NozURlkP/rvi2FQJyCPg=
github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE=
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
24 changes: 22 additions & 2 deletions p2p/net/swarm/dial_sync.go
@@ -10,7 +10,7 @@ import (
)

// dialWorkerFunc is used by dialSync to spawn a new dial worker
type dialWorkerFunc func(peer.ID, <-chan dialRequest)
type dialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest)

// errConcurrentDialSuccessful is used to signal that a concurrent dial succeeded
var errConcurrentDialSuccessful = errors.New("concurrent dial successful")
@@ -55,13 +55,17 @@ func (ad *activeDial) dial(ctx context.Context) (*Conn, error) {
case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}:
case <-ctx.Done():
return nil, ctx.Err()
case <-dialCtx.Done():
return nil, dialCtx.Err()
}

select {
case res := <-resch:
return res.conn, res.err
case <-ctx.Done():
return nil, ctx.Err()
case <-dialCtx.Done():
return nil, dialCtx.Err()
}
}

@@ -79,7 +83,7 @@ func (ds *dialSync) getActiveDial(p peer.ID) (*activeDial, error) {
cancelCause: cancel,
reqch: make(chan dialRequest),
}
go ds.dialWorker(p, actd.reqch)
go ds.dialWorker(ctx, p, actd.reqch)
ds.dials[p] = actd
}
// increase ref count before dropping mutex
@@ -96,6 +100,12 @@ func (ds *dialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) {
}

conn, err := ad.dial(ctx)
if cause := context.Cause(ad.ctx); cause != nil {
var haveInboundConn errHaveInboundConn
if errors.As(cause, &haveInboundConn) {
conn, err = haveInboundConn.c, nil
}
}

ds.mutex.Lock()
defer ds.mutex.Unlock()
@@ -113,3 +123,13 @@ func (ds *dialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) {

return conn, err
}

func (ds *dialSync) CancelActiveDial(p peer.ID, cause error) {
ds.mutex.Lock()
ad, ok := ds.dials[p]
ds.mutex.Unlock()
if !ok {
return
}
ad.cancelCause(cause)
}
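The cancel-with-cause mechanism used above can be shown in isolation (a minimal standalone sketch using only the standard library; the errHaveInboundConn type here merely mirrors the swarm's real type defined later in this diff):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errHaveInboundConn carries the inbound connection that made the
// outbound dial unnecessary.
type errHaveInboundConn struct{ conn string }

func (e errHaveInboundConn) Error() string { return "have inbound connection" }

func main() {
	ctx, cancel := context.WithCancelCause(context.Background())

	// An inbound connection arrives: cancel the in-flight dial, recording why.
	cancel(errHaveInboundConn{conn: "inbound-conn"})

	// The dialing side inspects the cause and reuses the inbound connection
	// instead of reporting a dial error.
	var have errHaveInboundConn
	if cause := context.Cause(ctx); errors.As(cause, &have) {
		fmt.Println("using", have.conn, "instead of failing the dial")
	}
}
```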
53 changes: 45 additions & 8 deletions p2p/net/swarm/dial_worker.go
@@ -2,18 +2,25 @@

import (
"context"
"errors"
"fmt"

Check failure on line 6 in p2p/net/swarm/dial_worker.go (GitHub Actions: go-check / All; go-test / ubuntu, macos, windows on go 1.22.x and 1.23.x): "fmt" imported and not used
"math"
"strings"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/sec"
tpt "github.com/libp2p/go-libp2p/core/transport"
"golang.org/x/exp/rand"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
)

const maxTCPSimOpenRetries = 3

// dialRequest is structure used to request dials to the peer associated with a
// worker loop
type dialRequest struct {
@@ -66,19 +73,21 @@
dialRankingDelay time.Duration
// expectedTCPUpgradeTime is the expected time by which security upgrade will complete
expectedTCPUpgradeTime time.Time
retryCount uint8
}

// dialWorker synchronises concurrent dials to a peer. It ensures that we make at most one dial to a
// peer's address
type dialWorker struct {
ctx context.Context
s *Swarm
peer peer.ID
// reqch is used to send dial requests to the worker. close reqch to end the worker loop
reqch <-chan dialRequest
// pendingRequests is the set of pendingRequests
pendingRequests map[*pendRequest]struct{}
// trackedDials tracks dials to the peer's addresses. An entry here is used to ensure that
// we dial an address at most once
// we dial an address at most once. Maybe twice in the case of an error from TCP simultaneous open
trackedDials map[string]*addrDial
// resch is used to receive response for dials to the peers addresses.
resch chan tpt.DialUpdate
@@ -90,11 +99,12 @@
cl Clock
}

func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dialWorker {
func newDialWorker(ctx context.Context, s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dialWorker {
if cl == nil {
cl = RealClock{}
}
return &dialWorker{
ctx: ctx,
s: s,
peer: p,
reqch: reqch,
@@ -130,7 +140,8 @@
}
timerRunning = false
if dq.Len() > 0 {
if dialsInFlight == 0 && !w.connected {
top := dq.top()
if !top.ForceDelay && dialsInFlight == 0 && !w.connected {
Member comment on lines +143 to +144:
Instead of this, how about we introduce another queue: fixedDelayQueue.

  1. This construct forces us to expose ForceDelay, which external users never need to use.
  2. It makes the semantics slightly incorrect for dials that are behind in the queue but don't have the ForceDelay flag.

I suggest:

  • a separate fixedDelayQueue with its own fixedDelayTimer
  • adding a now time.Time argument to NextBatch that returns the elements that have expired by now
  • having two blocks for handling dials, one for dialQueue and one for fixedDelayDialQueue

I don't feel strongly about the suggestion. Anything that fixes point 2 would be great. (A sketch of this idea appears below.)
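A minimal sketch of the suggested separate queue, under the assumption of a simple slice-backed implementation (the names fixedDelayQueue and NextBatch are hypothetical and do not correspond to an existing type in this repository):

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// fixedDelayEntry is an address (a plain string for brevity) plus the absolute
// time before which it must not be dialed.
type fixedDelayEntry struct {
	addr   string
	dialAt time.Time
}

// fixedDelayQueue releases entries only once their deadline has passed,
// independent of how many other dials are in flight.
type fixedDelayQueue struct {
	entries []fixedDelayEntry
}

func (q *fixedDelayQueue) Add(addr string, delay time.Duration, now time.Time) {
	q.entries = append(q.entries, fixedDelayEntry{addr: addr, dialAt: now.Add(delay)})
	sort.Slice(q.entries, func(i, j int) bool { return q.entries[i].dialAt.Before(q.entries[j].dialAt) })
}

// NextBatch removes and returns all entries that have expired by now.
func (q *fixedDelayQueue) NextBatch(now time.Time) []fixedDelayEntry {
	i := 0
	for i < len(q.entries) && !q.entries[i].dialAt.After(now) {
		i++
	}
	batch := q.entries[:i]
	q.entries = q.entries[i:]
	return batch
}

func main() {
	q := &fixedDelayQueue{}
	now := time.Now()
	q.Add("/ip4/1.2.3.4/tcp/4001", 100*time.Millisecond, now)
	fmt.Println(q.NextBatch(now))                             // empty: nothing has expired yet
	fmt.Println(q.NextBatch(now.Add(150 * time.Millisecond))) // the entry is released
}
```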

// if there are no dials in flight, trigger the next dials immediately
dialTimer.Reset(startTime)
} else {
@@ -159,6 +170,8 @@
// interested in dials on this address.

select {
case <-w.ctx.Done():
return
case req, ok := <-w.reqch:
if !ok {
if w.s.metricsTracer != nil {
@@ -368,13 +381,37 @@

// it must be an error -- add backoff if applicable and dispatch
// ErrDialRefusedBlackHole shouldn't end up here, just a safety check
if res.Err != ErrDialRefusedBlackHole && res.Err != context.Canceled && !w.connected {
// we only add backoff if there has not been a successful connection
// for consistency with the old dialer behavior.
w.s.backf.AddBackoff(w.peer, res.Addr)
} else if res.Err == ErrDialRefusedBlackHole {
switch {
case res.Err == ErrDialRefusedBlackHole:
log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s",
w.peer, res.Addr)
case res.Err == context.Canceled:
case ad.retryCount < maxTCPSimOpenRetries && errors.Is(res.Err, sec.ErrSimOpen):
now := time.Now()
// re-track this address for the retry; it is re-added to dq below
w.trackedDials[string(ad.addr.Bytes())] = &addrDial{
addr: ad.addr,
ctx: ad.ctx,
createdAt: now,
retryCount: ad.retryCount + 1,
}
// This is an error due to simultaneous open. Let the "smaller"
// peer try again first, otherwise we'll try again after a delay
var delay time.Duration
if strings.Compare(string(w.peer), string(w.s.LocalPeer())) == -1 {
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
// Random delay to avoid the other side being able to predict when we'll try again
delay = time.Duration((50+rand.Intn(100))<<int(ad.retryCount)) * time.Millisecond
}
dq.UpdateOrAdd(network.AddrDelay{Addr: ad.addr, Delay: delay, ForceDelay: true})
scheduleNextDial()
continue loop
default:
if !w.connected {
// we only add backoff if there has not been a successful connection
// for consistency with the old dialer behavior.
w.s.backf.AddBackoff(w.peer, res.Addr)
}

}

w.dispatchError(ad, res.Err)
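To make the retry delay concrete, here is a hedged standalone sketch of the same formula used above (math/rand is substituted for golang.org/x/exp/rand; the distribution is equivalent for this purpose):

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// simOpenRetryDelay mirrors the delay computation in dial_worker.go: a random
// base of 50–149ms, doubled for every previous simultaneous-open retry.
func simOpenRetryDelay(retryCount uint8) time.Duration {
	return time.Duration((50+rand.Intn(100))<<int(retryCount)) * time.Millisecond
}

func main() {
	for retry := uint8(0); retry < 3; retry++ {
		// retry 0: 50–149ms, retry 1: 100–298ms, retry 2: 200–596ms
		fmt.Printf("retry %d: %v\n", retry, simOpenRetryDelay(retry))
	}
}
```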
13 changes: 13 additions & 0 deletions p2p/net/swarm/swarm.go
@@ -357,6 +357,14 @@ func (s *Swarm) close() {
wg.Wait()
}

type errHaveInboundConn struct {
c *Conn
}

func (e errHaveInboundConn) Error() string {
return "have inbound connection"
}

func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
var (
p = tc.RemotePeer()
@@ -401,6 +409,11 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
// Clear any backoffs
s.backf.Clear(p)

// Cancel the pending dial if it exists.
if dir == network.DirInbound {
s.dsync.CancelActiveDial(p, errHaveInboundConn{c})
}

// Finally, add the peer.
s.conns.Lock()
// Check if we're still online
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm_dial.go
@@ -290,8 +290,8 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {
}

// dialWorkerLoop synchronizes and executes concurrent dials to a single peer
func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) {
w := newDialWorker(s, p, reqch, nil)
func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) {
w := newDialWorker(ctx, s, p, reqch, nil)
w.loop()
}

14 changes: 14 additions & 0 deletions p2p/net/upgrader/upgrader.go
@@ -328,6 +328,20 @@ func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, ser
select {
case r := <-done:
if r.err != nil {
var errUnrecognized mss.ErrUnrecognizedResponse[protocol.ID]
if errors.As(r.err, &errUnrecognized) && len(u.securityIDs) > 0 {
// We tried to open a connection with our first security ID, and
// the other side gave us something we didn't expect that looks
// like a security ID.
//
// This hints that the error is due to TCP Simultaneous Open.
// Both sides think they are the client and suggest a protocol
// at the beginning. We fail because they suggested a protocol
// that we didn't suggest.
if errUnrecognized.Expected == u.securityIDs[0] && len(errUnrecognized.Actual) > 0 && errUnrecognized.Actual[0] == '/' {
return nil, sec.ErrSimOpen
}
}
return nil, r.err
}
if s := u.getSecurityByID(r.proto); s != nil {
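In isolation, the heuristic above amounts to the following check (a hedged sketch; the Expected and Actual fields are taken from the diff, but the standalone type here is a simplified stand-in for mss.ErrUnrecognizedResponse[protocol.ID]):

```go
package main

import "fmt"

// errUnrecognizedResponse is a simplified stand-in for the go-multistream error.
type errUnrecognizedResponse struct {
	Expected string // the protocol we proposed
	Actual   string // what the peer sent instead
}

// looksLikeSimOpen reports whether a failed multistream negotiation looks like
// TCP simultaneous open: we proposed our first security protocol and the peer
// answered with something that itself looks like a protocol ID.
func looksLikeSimOpen(e errUnrecognizedResponse, firstSecurityID string) bool {
	return e.Expected == firstSecurityID && len(e.Actual) > 0 && e.Actual[0] == '/'
}

func main() {
	e := errUnrecognizedResponse{Expected: "/tls/1.0.0", Actual: "/noise"}
	fmt.Println(looksLikeSimOpen(e, "/tls/1.0.0")) // true: both sides acted as the client
}
```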
8 changes: 8 additions & 0 deletions p2p/security/noise/transport.go
@@ -63,11 +63,19 @@ func (t *Transport) SecureInbound(ctx context.Context, insecure net.Conn, p peer
return SessionWithConnState(c, responderEDH.MatchMuxers(false)), err
}

// Detect if the error is due to TCP Simultaneous Open. In that case, both sides will believe they are the client.
func errIsSimOpen(err error) bool {
return err.Error() == "error reading handshake message: noise: message is too short"
}

// SecureOutbound runs the Noise handshake as the initiator.
func (t *Transport) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, error) {
initiatorEDH := newTransportEDH(t)
c, err := newSecureSession(t, ctx, insecure, p, nil, initiatorEDH, nil, true, true)
if err != nil {
if errIsSimOpen(err) {
return nil, sec.ErrSimOpen
}
return c, err
}
return SessionWithConnState(c, initiatorEDH.MatchMuxers(true)), err
8 changes: 8 additions & 0 deletions p2p/security/tls/transport.go
@@ -100,6 +100,11 @@ func (t *Transport) SecureInbound(ctx context.Context, insecure net.Conn, p peer
return cs, err
}

// Detect if the error is due to TCP Simultaneous Open. In that case, both sides will believe they are the client.
func errIsSimOpen(err error) bool {
return err.Error() == "tls: received unexpected handshake message of type *tls.clientHelloMsg when waiting for *tls.serverHelloMsg"
}

// SecureOutbound runs the TLS handshake as a client.
// Note that SecureOutbound will not return an error if the server doesn't
// accept the certificate. This is due to the fact that in TLS 1.3, the client
@@ -118,6 +123,9 @@ func (t *Transport) SecureOutbound(ctx context.Context, insecure net.Conn, p pee
cs, err := t.handshake(ctx, tls.Client(insecure, config), keyCh)
if err != nil {
insecure.Close()
if errIsSimOpen(err) {
return nil, sec.ErrSimOpen
}
}
return cs, err
}