Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(integration): add integration test for master lost during syncing sst files. #2691

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tests/gocase/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.9.0 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/sync v0.10.0
golang.org/x/sys v0.27.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 2 additions & 4 deletions tests/gocase/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tklauser/go-sysconf v0.3.14 h1:g5vzr9iPFFz24v2KZXs/pvpvh8/V9Fw6vQK5ZZb78yU=
Expand All @@ -39,10 +37,10 @@ github.com/tklauser/numcpus v0.9.0 h1:lmyCHtANi8aRUgkckBgoDk1nHCux3n2cgkJLXdQGPD
github.com/tklauser/numcpus v0.9.0/go.mod h1:SN6Nq1O3VychhC1npsWostA+oW+VOQTxZrS604NSRyI=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo=
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
57 changes: 57 additions & 0 deletions tests/gocase/integration/replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,3 +552,60 @@ func TestFullSyncReplication(t *testing.T) {
require.Equal(t, "bar", slaveClient.Get(ctx, "foo").Val())
})
}

// TestSlaveLostMaster points a replica at its master through a slowed-down TCP
// proxy, stops the proxy's forwarding while data is presumably still being
// synced, and then asserts that re-pointing the replica to an unreachable
// master address completes in under six seconds.
func TestSlaveLostMaster(t *testing.T) {
// integration test for #2662 and #2671
ctx := context.Background()

// Master: compression off and minimal write-buffer / target-file sizes.
// NOTE(review): presumably this makes the master produce many small SST
// files and "max-replication-mb" throttles the transfer — confirm against
// the server configuration docs.
masterSrv := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"max-replication-mb": "1",
"rocksdb.compression": "no",
"rocksdb.write_buffer_size": "1",
"rocksdb.target_file_size_base": "1",
})
defer func() { masterSrv.Close() }()
masterClient := masterSrv.NewClient()
defer func() { require.NoError(t, masterClient.Close()) }()
masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterNodeID).Err())

// Replica: connect/receive timeouts of 5000 ms / 5100 ms; the final
// assertion allows roughly one extra second on top of these.
replicaSrv := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
"replication-connect-timeout-ms": "5000",
"replication-recv-timeout-ms": "5100",
})
defer func() { replicaSrv.Close() }()
replicaClient := replicaSrv.NewClient()
// allow read-only commands to run on the replica
require.NoError(t, replicaClient.ReadOnly(ctx).Err())
defer func() { require.NoError(t, replicaClient.Close()) }()
replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", replicaNodeID).Err())

// Put a slowed-down proxy in front of the master; cancelProxy later stops
// its forwarding to simulate the master being lost.
proxyCtx, cancelProxy := context.WithCancel(ctx)
newMasterPort := util.SimpleTCPProxy(proxyCtx, t, fmt.Sprintf("127.0.0.1:%d", masterSrv.Port()), true)

// Three topologies, all with the same node IDs:
//   masterNodesInfo  - master at its real port (applied to the master),
//   clusterNodesInfo - master at the proxy port (applied to the replica),
//   unexistNodesInfo - master at 127.0.0.2 with the proxy port, an address
//                      no server in this test listens on.
masterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s 127.0.0.1 %d slave %s",
masterNodeID, masterSrv.Port(), replicaNodeID, replicaSrv.Port(), masterNodeID)
clusterNodesInfo := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383\n%s 127.0.0.1 %d slave %s",
masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(), masterNodeID)
unexistNodesInfo := fmt.Sprintf("%s 127.0.0.2 %d master - 0-16383\n%s 127.0.0.1 %d slave %s",
masterNodeID, newMasterPort, replicaNodeID, replicaSrv.Port(), masterNodeID)

require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", masterNodesInfo, "1").Err())
value := strings.Repeat("a", 128*1024)

// Write 1024 keys of 128 KiB each (128 MiB in total) to the master before
// the replica is attached.
for i := 0; i < 1024; i++ {
require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), value, 0).Err())
}

// Attach the replica to the master through the proxy port.
require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", clusterNodesInfo, "1").Err())

// NOTE(review): the fixed sleep presumably gives the replica time to start
// syncing through the proxy before the link is cut — confirm.
time.Sleep(2 * time.Second)
cancelProxy()
// Time how long the replica needs to accept the new topology (version 2)
// once the proxy has been told to stop forwarding.
start := time.Now()
require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", unexistNodesInfo, "2").Err())
duration := time.Since(start)
require.Less(t, duration, time.Second*6)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are the timeouts 5000/5100 ms while the duration bound here is 6 seconds? 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mapleFU ,

The time duration may not be a precise value due to network.

So I reserved 1 second to reduce flaky failure. 😊

Best Regards,
Edward

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the duration really look like? Would it:

  1. Always be greater than 5 seconds?
  2. Always be less than 6 seconds? (I guess this is not guaranteed ...)

Copy link
Contributor Author

@LindaSummer LindaSummer Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @mapleFU ,

Yes, I thought it should be in a [5, 6 or longer] range.

But I only got about one second in my own environment and make me confused and not sure whether the case full covers the issue. 🤔

Maybe we can set the upper bound to a conservative value just for integration test. 😊

Best Regards,
Edward

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I only got about one second in my own environment and make me confused and not sure whether the case full covers the issue. 🤔

Thanks! I'll dive in my env later, we can first move forward

}
80 changes: 80 additions & 0 deletions tests/gocase/util/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ package util

import (
"context"
"errors"
"fmt"
"io"
"net"
"regexp"
"strings"
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func FindInfoEntry(rdb *redis.Client, key string, section ...string) string {
Expand Down Expand Up @@ -71,3 +75,79 @@ func Populate(t testing.TB, rdb *redis.Client, prefix string, n, size int) {
_, err := p.Exec(ctx)
require.NoError(t, err)
}

// SimpleTCPProxy starts a minimal TCP forwarder for tests and returns the local
// port it listens on.
//
// A background goroutine accepts connections on a free local port and relays
// bytes in both directions between each accepted connection and a fresh
// connection dialed to the address `to`. Connections are served one at a time:
// the next Accept happens only after both copy directions of the current
// connection have finished.
//
// When slowdown is true, every read in either direction is preceded by a
// 100 ms sleep, which throttles the forwarded stream.
//
// Cancellation of ctx is only observed between reads and between connections;
// a Read or Accept that is already blocked is not interrupted, so after
// cancellation the proxy simply stops forwarding instead of actively tearing
// the connections down.
//
// Failures while finding a port or listening abort the calling test via
// t.Fatalf. Failures inside the background goroutine are reported with
// t.Errorf and stop the proxy.
func SimpleTCPProxy(ctx context.Context, t testing.TB, to string, slowdown bool) uint64 {
	addr, err := findFreePort()
	if err != nil {
		t.Fatalf("can't find a free port, %v", err)
	}
	from := addr.String()

	listener, err := net.Listen("tcp", from)
	if err != nil {
		t.Fatalf("listen to %s failed, err: %v", from, err)
	}

	// copyBytes returns a worker that pumps bytes from src to dest until ctx
	// is cancelled, src reaches EOF, or an I/O error occurs.
	copyBytes := func(src io.Reader, dest io.Writer) func() error {
		buffer := make([]byte, 4096)
		return func() error {
			for {
				if ctx.Err() != nil {
					t.Log("forwarding tcp stream stopped")
					return nil
				}
				if slowdown {
					time.Sleep(time.Millisecond * 100)
				}
				n, readErr := src.Read(buffer)
				// Per the io.Reader contract, bytes returned together with
				// an error must be consumed before the error is handled.
				if n > 0 {
					if _, writeErr := dest.Write(buffer[:n]); writeErr != nil {
						if errors.Is(writeErr, io.EOF) {
							return nil
						}
						return writeErr
					}
				}
				if readErr != nil {
					if errors.Is(readErr, io.EOF) {
						return nil
					}
					return readErr
				}
			}
		}
	}

	go func() {
		defer listener.Close()
		// This goroutine is not the test goroutine, so it must not call
		// t.Fatalf/t.FailNow; it records the failure with t.Errorf and
		// returns instead.
		for ctx.Err() == nil {
			conn, err := listener.Accept()
			if err != nil {
				t.Errorf("accept conn failed, err: %v", err)
				return
			}
			dest, err := net.Dial("tcp", to)
			if err != nil {
				// Best-effort cleanup; the dial error is the one worth reporting.
				_ = conn.Close()
				t.Errorf("dial to %s failed, err: %v", to, err)
				return
			}

			var errGrp errgroup.Group
			errGrp.Go(copyBytes(conn, dest))
			errGrp.Go(copyBytes(dest, conn))
			err = errGrp.Wait()

			// Both directions are done, so nobody uses the sockets anymore.
			// Close errors are ignored: the streams are already finished.
			_ = conn.Close()
			_ = dest.Close()

			if err != nil {
				t.Errorf("forward tcp stream failed, err: %v", err)
				return
			}
		}
	}()
	return uint64(addr.Port)
}
Loading