diff --git a/pkg/dbutil/common.go b/pkg/dbutil/common.go index e00a817ed..43285b5b3 100644 --- a/pkg/dbutil/common.go +++ b/pkg/dbutil/common.go @@ -387,7 +387,7 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", ColumnName(col.Name.O))) } - query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;", + query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s FORCE INDEX(PRIMARY) WHERE %s;", strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange) log.Debug("checksum", zap.String("sql", query), zap.Reflect("args", args)) diff --git a/pkg/diff/chunk.go b/pkg/diff/chunk.go index 06c2c926f..ee7866788 100644 --- a/pkg/diff/chunk.go +++ b/pkg/diff/chunk.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "strings" + "sync" "time" "github.com/pingcap/errors" @@ -26,6 +27,7 @@ import ( "github.com/pingcap/parser/model" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb-tools/pkg/utils" + "go.uber.org/atomic" "go.uber.org/zap" ) @@ -322,6 +324,9 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) { var ( lowerValues, upperValues []string latestCount int64 + wg sync.WaitGroup + chunkCh = make(chan []*ChunkRange, len(buckets)+1) + splitErr atomic.Error ) indexColumns := getColumnsFromIndex(index, s.table.info) @@ -363,11 +368,16 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) { if count == 0 { continue } else if count >= 2 { - splitChunks, err := splitRangeByRandom(s.table.Conn, chunk, int(count), s.table.Schema, s.table.Table, indexColumns, s.limits, s.collation) - if err != nil { - return nil, errors.Trace(err) - } - chunks = append(chunks, splitChunks...) 
+ wg.Add(1) + go func() { + defer wg.Done() + splitChunks, err := splitRangeByRandom(s.table.Conn, chunk, int(count), s.table.Schema, s.table.Table, indexColumns, s.limits, s.collation) + if err != nil && splitErr.Load() == nil { + splitErr.Store(errors.Trace(err)) + } + chunkCh <- splitChunks + }() + } else { chunks = append(chunks, chunk) } @@ -376,6 +386,22 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) { lowerValues = upperValues } + // early fail + err = splitErr.Load() + if err != nil { + return nil, err + } + wg.Wait() + err = splitErr.Load() + if err != nil { + return nil, err + } + + for len(chunkCh) > 0 { + splitChunks := <-chunkCh + chunks = append(chunks, splitChunks...) + } + if len(chunks) != 0 { break } diff --git a/pkg/diff/conn.go b/pkg/diff/conn.go index 777dffe26..7bc046fbf 100644 --- a/pkg/diff/conn.go +++ b/pkg/diff/conn.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/tidb-tools/pkg/dbutil" ) -// CreateDB creates sql.DB used for select data +// CreateDB creates sql.DB and set connection limit func CreateDB(ctx context.Context, dbConfig dbutil.DBConfig, num int) (db *sql.DB, err error) { db, err = dbutil.OpenDB(dbConfig) if err != nil { @@ -44,8 +44,6 @@ func CreateDBForCP(ctx context.Context, dbConfig dbutil.DBConfig) (cpDB *sql.DB, if err != nil { return nil, errors.Errorf("create db connections %+v error %v", dbConfig, err) } - cpDB.SetMaxOpenConns(1) - cpDB.SetMaxIdleConns(1) return cpDB, nil } diff --git a/pkg/diff/diff.go b/pkg/diff/diff.go index e61688cf8..fabbdea95 100644 --- a/pkg/diff/diff.go +++ b/pkg/diff/diff.go @@ -408,6 +408,7 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, }) update := func() { + checksumLimiter <- struct{}{} ctx1, cancel1 := context.WithTimeout(ctx, dbutil.DefaultTimeout) defer cancel1() @@ -415,6 +416,7 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, if err1 != nil { log.Warn("update chunk info", 
zap.Error(err1)) } + <-checksumLimiter } defer func() { @@ -434,7 +436,7 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, } } } - update() + go update() }() if filterByRand { @@ -447,7 +449,7 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, } chunk.State = checkingState - update() + go update() if t.UseChecksum { // first check the checksum is equal or not @@ -475,6 +477,18 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool, return equal, nil } +// checksumLimiter bounds how many update calls in checkChunkDataEqual run at once; it starts with capacity 1 so the acquiring send never blocks on a nil channel when SetChecksumLimit was not called +var checksumLimiter = make(chan struct{}, 1) + +// SetChecksumLimit sets how many chunk info updates may run simultaneously; num below 1 is treated as 1 +func SetChecksumLimit(num int) { + // a zero capacity would leave the acquiring send in update without a receiver, and a negative capacity makes make panic + if num < 1 { + num = 1 + } + checksumLimiter = make(chan struct{}, num) +} + // checksumInfo save some information about checksum type checksumInfo struct { checksum int64 diff --git a/sync_diff_inspector/config.go b/sync_diff_inspector/config.go index ff03973a8..9c1a488ac 100644 --- a/sync_diff_inspector/config.go +++ b/sync_diff_inspector/config.go @@ -175,6 +175,9 @@ type Config struct { // how many goroutines are created to check data CheckThreadCount int `toml:"check-thread-count" json:"check-thread-count"` + // how many goroutines are created to save checkpoint + CheckpointThreadCount int `toml:"checkpoint-thread-count" json:"checkpoint-thread-count"` + // set false if want to comapre the data directly UseChecksum bool `toml:"use-checksum" json:"use-checksum"` @@ -228,6 +231,7 @@ func NewConfig() *Config { fs.IntVar(&cfg.ChunkSize, "chunk-size", 1000, "diff check chunk size") fs.IntVar(&cfg.Sample, "sample", 100, "the percent of sampling check") fs.IntVar(&cfg.CheckThreadCount, "check-thread-count", 1, "how many goroutines are created to check data") + fs.IntVar(&cfg.CheckpointThreadCount, "checkpoint-thread-count", 64, "how many goroutines are created to save checkpoint") fs.BoolVar(&cfg.UseChecksum, "use-checksum", true, "set false if want to comapre the data directly") 
fs.StringVar(&cfg.FixSQLFile, "fix-sql-file", "fix.sql", "the name of the file which saves sqls used to fix different data") fs.BoolVar(&cfg.PrintVersion, "V", false, "print version of sync_diff_inspector") diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index ba2cbcd2f..02c296467 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -107,6 +107,7 @@ func (df *Diff) init(cfg *Config) (err error) { if err = df.CreateDBConn(cfg); err != nil { return errors.Trace(err) } + diff.SetChecksumLimit(cfg.CheckpointThreadCount) if err = df.AdjustTableConfig(cfg); err != nil { return errors.Trace(err) @@ -120,7 +121,7 @@ func (df *Diff) init(cfg *Config) (err error) { return nil } -// CreateDBConn creates db connections for source and target. +// CreateDBConn creates db connections for source and target func (df *Diff) CreateDBConn(cfg *Config) (err error) { for _, source := range cfg.SourceDBCfg { source.Conn, err = diff.CreateDB(df.ctx, source.DBConfig, cfg.CheckThreadCount)