Skip to content

Commit

Permalink
router: small refactoring to remove the need for a batchconn.WriteTo (#…
Browse files Browse the repository at this point in the history
…4651)

WriteTo needs to be given the destination address explicitly.
WriteBatch, on the other end, can either find it in each packet
structure, or rely on the connection's destination. WriteTo is only used
to send BFD packets. It turns out that BFD packets can also very easily
be sent via the regular forwarders that use WriteBtach.

The motivation to do that is to simplify the interface between the
dataplane and the forwarders, in view of supporting multiple underlays
with possibly very different interfaces. So the channels around the
processor tasks seems like a good universal interface.

In passing, also removed a duplicate field. Slightly off-topic but still
in the spirit of noise abatement.

As this is to facilitate the necessary refactoring...
Contributes to #4593
  • Loading branch information
jiceatscion authored Nov 15, 2024
1 parent dd852aa commit 5c3b50d
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 61 deletions.
13 changes: 7 additions & 6 deletions router/control/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,14 @@ func confExternalInterfaces(dp Dataplane, cfg *Config) error {

_, owned := cfg.BR.IFs[ifID]
if !owned {
// XXX The current implementation effectively uses IP/UDP tunnels to create
// the SCION network as an overlay, with forwarding to local hosts being a special case.
// When setting up external interfaces that belong to other routers in the AS, they
// are basically IP/UDP tunnels between the two border routers, and as such is
// configured in the data plane.
// The current implementation effectively uses IP/UDP tunnels to create the SCION
// network as an overlay, with forwarding to local hosts being a special case. When
// setting up external interfaces that belong to other routers in the AS, they are
// basically IP/UDP tunnels between the two border routers. Those are described as a
// link from the internal address of the local router to the internal address of the
// sibling router; and not to the router in the remote AS.
linkInfo.Local.Addr = cfg.BR.InternalAddr
linkInfo.Remote.Addr = iface.InternalAddr
linkInfo.Remote.Addr = iface.InternalAddr // i.e. via sibling router.
// For internal BFD always use the default configuration.
linkInfo.BFD = BFD{}
}
Expand Down
73 changes: 54 additions & 19 deletions router/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ type bfdSession interface {
// BatchConn is a connection that supports batch reads and writes.
type BatchConn interface {
ReadBatch(underlayconn.Messages) (int, error)
WriteTo([]byte, netip.AddrPort) (int, error)
WriteBatch(msgs underlayconn.Messages, flags int) (int, error)
Close() error
}
Expand Down Expand Up @@ -177,7 +176,6 @@ type DataPlane struct {
linkTypes map[uint16]topology.LinkType
neighborIAs map[uint16]addr.IA
peerInterfaces map[uint16]uint16
internal BatchConn
internalIP netip.Addr
internalNextHops map[uint16]netip.AddrPort
svc *services
Expand All @@ -193,6 +191,10 @@ type DataPlane struct {

ExperimentalSCMPAuthentication bool

// The forwarding queues. Each is consumed by a forwarder process and fed by
// one bfd sender and the packet processors.
fwQs map[uint16]chan *packet

// The pool that stores all the packet buffers as described in the design document. See
// https://github.com/scionproto/scion/blob/master/doc/dev/design/BorderRouter.rst
// To avoid garbage collection, most the meta-data that is produced during the processing of a
Expand Down Expand Up @@ -334,14 +336,12 @@ func (d *DataPlane) AddInternalInterface(conn BatchConn, ip netip.Addr) error {
if conn == nil {
return emptyValue
}
if d.internal != nil {
return alreadySet
}
if d.interfaces == nil {
d.interfaces = make(map[uint16]BatchConn)
} else if d.interfaces[0] != nil {
return alreadySet
}
d.interfaces[0] = conn
d.internal = conn
d.internalIP = ip
return nil
}
Expand All @@ -361,7 +361,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,
if conn == nil || !src.Addr.IsValid() || !dst.Addr.IsValid() {
return emptyValue
}
err := d.addExternalInterfaceBFD(ifID, conn, src, dst, cfg)
err := d.addExternalInterfaceBFD(ifID, src, dst, cfg)
if err != nil {
return serrors.Wrap("adding external BFD", err, "if_id", ifID)
}
Expand All @@ -381,7 +381,7 @@ func (d *DataPlane) AddExternalInterface(ifID uint16, conn BatchConn,

// AddNeighborIA adds the neighboring IA for a given interface ID. If an IA for
// the given ID is already set, this method will return an error. This can only
// be called on a yet running dataplane.
// be called on a not yet running dataplane.
func (d *DataPlane) AddNeighborIA(ifID uint16, remote addr.IA) error {
d.mtx.Lock()
defer d.mtx.Unlock()
Expand Down Expand Up @@ -439,7 +439,7 @@ func (d *DataPlane) AddRemotePeer(local, remote uint16) error {
}

// AddExternalInterfaceBFD adds the inter AS connection BFD session.
func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn,
func (d *DataPlane) addExternalInterfaceBFD(ifID uint16,
src, dst control.LinkEnd, cfg control.BFD) error {

if *cfg.Disable {
Expand All @@ -459,7 +459,7 @@ func (d *DataPlane) addExternalInterfaceBFD(ifID uint16, conn BatchConn,
PacketsReceived: d.Metrics.BFDPacketsReceived.With(labels),
}
}
s, err := newBFDSend(conn, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
s, err := newBFDSend(d, src.IA, dst.IA, src.Addr, dst.Addr, ifID, d.macFactory())
if err != nil {
return err
}
Expand All @@ -477,7 +477,7 @@ func (d *DataPlane) getInterfaceState(ifID uint16) control.InterfaceState {
return control.InterfaceUp
}

func (d *DataPlane) addBFDController(ifID uint16, s *bfdSend, cfg control.BFD,
func (d *DataPlane) addBFDController(ifID uint16, s bfd.Sender, cfg control.BFD,
metrics bfd.Metrics) error {

if d.bfdSessions == nil {
Expand Down Expand Up @@ -599,7 +599,7 @@ func (d *DataPlane) addNextHopBFD(ifID uint16, src, dst netip.AddrPort, cfg cont
}
}

s, err := newBFDSend(d.internal, d.localIA, d.localIA, src, dst, 0, d.macFactory())
s, err := newBFDSend(d, d.localIA, d.localIA, src, dst, 0, d.macFactory())
if err != nil {
return err
}
Expand All @@ -621,7 +621,6 @@ type RunConfig struct {

func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {
d.mtx.Lock()
d.setRunning()
d.initMetrics()

processorQueueSize := max(
Expand All @@ -630,7 +629,9 @@ func (d *DataPlane) Run(ctx context.Context, cfg *RunConfig) error {

d.initPacketPool(cfg, processorQueueSize)
procQs, fwQs, slowQs := initQueues(cfg, d.interfaces, processorQueueSize)
d.fwQs = fwQs // Shared with BFD senders

d.setRunning()
for ifID, conn := range d.interfaces {
go func(ifID uint16, conn BatchConn) {
defer log.HandlePanic()
Expand Down Expand Up @@ -759,7 +760,7 @@ func (d *DataPlane) runReceiver(ifID uint16, conn BatchConn, cfg *RunConfig,

// Give a new buffer to the msgs elements that have been used in the previous loop.
for i := 0; i < cfg.BatchSize-numReusable; i++ {
p := <-d.packetPool
p := d.getPacketFromPool()
p.reset()
packets[i] = p
msgs[i].Buffers[0] = p.rawPacket
Expand Down Expand Up @@ -805,6 +806,10 @@ func computeProcID(data []byte, numProcRoutines int, hashSeed uint32) (uint32, e
return s % uint32(numProcRoutines), nil
}

func (d *DataPlane) getPacketFromPool() *packet {
return <-d.packetPool
}

func (d *DataPlane) returnPacketToPool(pkt *packet) {
d.packetPool <- pkt
}
Expand Down Expand Up @@ -1821,6 +1826,7 @@ func (p *scionPacketProcessor) handleEgressRouterAlert() disposition {
return pForward
}
if _, ok := p.d.external[p.pkt.egress]; !ok {
// the egress router is not this one.
return pForward
}
*alert = false
Expand Down Expand Up @@ -1990,7 +1996,6 @@ func (p *scionPacketProcessor) process() disposition {
if disp := p.validateEgressUp(); disp != pForward {
return disp
}

if _, ok := p.d.external[egressID]; ok {
// Not ASTransit in
if disp := p.processEgress(); disp != pForward {
Expand Down Expand Up @@ -2316,7 +2321,8 @@ func updateSCIONLayer(rawPkt []byte, s slayers.SCION, buffer gopacket.SerializeB
}

type bfdSend struct {
conn BatchConn
dataPlane *DataPlane
ifID uint16
srcAddr, dstAddr netip.AddrPort
scn *slayers.SCION
ohp *onehop.Path
Expand All @@ -2326,7 +2332,7 @@ type bfdSend struct {
}

// newBFDSend creates and initializes a BFD Sender
func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort,
func newBFDSend(d *DataPlane, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.AddrPort,
ifID uint16, mac hash.Hash) (*bfdSend, error) {

scn := &slayers.SCION{
Expand Down Expand Up @@ -2373,8 +2379,13 @@ func newBFDSend(conn BatchConn, srcIA, dstIA addr.IA, srcAddr, dstAddr netip.Add
scn.Path = ohp
}

// bfdSend includes a reference to the dataplane. In general this must not be used until the
// dataplane is running. This is ensured by the fact that bfdSend objects are owned by bfd
// sessions, which are started by dataplane.Run() itself.

return &bfdSend{
conn: conn,
dataPlane: d,
ifID: ifID,
srcAddr: srcAddr,
dstAddr: dstAddr,
scn: scn,
Expand Down Expand Up @@ -2405,7 +2416,31 @@ func (b *bfdSend) Send(bfd *layers.BFD) error {
if err != nil {
return err
}
_, err = b.conn.WriteTo(b.buffer.Bytes(), b.dstAddr)

// BfdControllers and fwQs are initialized from the same set of ifIDs. So not finding
// the forwarding queue is an serious internal error. Let that panic.
fwChan := b.dataPlane.fwQs[b.ifID]

p := b.dataPlane.getPacketFromPool()
p.reset()

// TODO: it would be best to serialize directly into the packet buffer. This would require
// a custom SerializeBuffer implementation and some changes to the packet structure. To be
// considered in a future refactoring.
sz := copy(p.rawPacket, b.buffer.Bytes())
p.rawPacket = p.rawPacket[:sz]
if b.ifID == 0 {
// Using the internal interface: must specify the destination address
updateNetAddrFromAddrPort(p.dstAddr, b.dstAddr)
}
// No need to specify pkt.egress. It isn't used downstream from here.
select {
case fwChan <- p:
default:
// We do not care if some BFD packets get bounced under high load. If it becomes a problem,
// the solution is do use BFD's demand-mode. To be considered in a future refactoring.
b.dataPlane.returnPacketToPool(p)
}
return err
}

Expand Down
4 changes: 2 additions & 2 deletions router/dataplane_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestReceiver(t *testing.T) {
dp.setRunning()
dp.initMetrics()
go func() {
dp.runReceiver(0, dp.internal, runConfig, procCh)
dp.runReceiver(0, dp.interfaces[0], runConfig, procCh)
}()
ptrMap := make(map[uintptr]struct{})
for i := 0; i < 21; i++ {
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestForwarder(t *testing.T) {
initialPoolSize := len(dp.packetPool)
dp.setRunning()
dp.initMetrics()
go dp.runForwarder(0, dp.internal, runConfig, fwCh[0])
go dp.runForwarder(0, dp.interfaces[0], runConfig, fwCh[0])

dstAddr := &net.UDPAddr{IP: net.IP{10, 0, 200, 200}}
for i := 0; i < 255; i++ {
Expand Down
33 changes: 16 additions & 17 deletions router/dataplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func TestDataPlaneRun(t *testing.T) {
},
).Times(1)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
l := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Addr: netip.MustParseAddrPort("10.0.0.100:0"),
Expand Down Expand Up @@ -373,10 +373,9 @@ func TestDataPlaneRun(t *testing.T) {
},
).Times(1)
mInternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()

mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)
if b := pkt.Layer(layers.LayerTypeBFD); b != nil {
v := b.(*layers.BFD).YourDiscriminator
Expand All @@ -391,7 +390,7 @@ func TestDataPlaneRun(t *testing.T) {

return 0, fmt.Errorf("no valid BFD message")
}).MinTimes(1)
mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := netip.MustParseAddrPort("10.0.200.100:0")
_ = ret.SetKey([]byte("randomkeyformacs"))
Expand All @@ -410,9 +409,9 @@ func TestDataPlaneRun(t *testing.T) {
localAddr := netip.MustParseAddrPort("10.0.200.100:0")
remoteAddr := netip.MustParseAddrPort("10.0.200.200:0")
mInternal := mock_router.NewMockBatchConn(ctrl)
mInternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mInternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b == nil {
Expand Down Expand Up @@ -464,9 +463,9 @@ func TestDataPlaneRun(t *testing.T) {

mExternal := mock_router.NewMockBatchConn(ctrl)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b == nil {
Expand All @@ -491,7 +490,7 @@ func TestDataPlaneRun(t *testing.T) {
done <- struct{}{}
return 1, nil
}).MinTimes(1)
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Expand Down Expand Up @@ -552,9 +551,9 @@ func TestDataPlaneRun(t *testing.T) {
).Times(1)
mExternal.EXPECT().ReadBatch(gomock.Any()).Return(0, nil).AnyTimes()

mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).DoAndReturn(
func(data []byte, _ netip.AddrPort) (int, error) {
pkt := gopacket.NewPacket(data,
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).DoAndReturn(
func(msgs underlayconn.Messages, _ int) (int, error) {
pkt := gopacket.NewPacket(msgs[0].Buffers[0],
slayers.LayerTypeSCION, gopacket.Default)

if b := pkt.Layer(layers.LayerTypeBFD); b != nil {
Expand All @@ -569,7 +568,7 @@ func TestDataPlaneRun(t *testing.T) {
}
return 0, fmt.Errorf("no valid BFD message")
}).MinTimes(1)
mExternal.EXPECT().WriteTo(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()
mExternal.EXPECT().WriteBatch(gomock.Any(), gomock.Any()).Return(0, nil).AnyTimes()

local := control.LinkEnd{
IA: addr.MustParseIA("1-ff00:0:1"),
Expand Down
3 changes: 2 additions & 1 deletion router/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func NewDP(
key []byte) *DataPlane {

dp := &DataPlane{
interfaces: map[uint16]BatchConn{0: internal},
localIA: local,
external: external,
linkTypes: linkTypes,
Expand All @@ -84,10 +85,10 @@ func NewDP(
dispatchedPortStart: uint16(dispatchedPortStart),
dispatchedPortEnd: uint16(dispatchedPortEnd),
svc: &services{m: svc},
internal: internal,
internalIP: netip.MustParseAddr("198.51.100.1"),
Metrics: metrics,
}

if err := dp.SetKey(key); err != nil {
panic(err)
}
Expand Down
16 changes: 0 additions & 16 deletions router/mock_router/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 5c3b50d

Please sign in to comment.