diff --git a/sync_diff_inspector/config/config.go b/sync_diff_inspector/config/config.go index 6bfdde8e2..3bcf7c8eb 100644 --- a/sync_diff_inspector/config/config.go +++ b/sync_diff_inspector/config/config.go @@ -52,6 +52,11 @@ const ( UnifiedTimeZone string = "+0:00" ) +const ( + AggregateOpBITXOR = "BIT_XOR" + AggregateOpSUM = "SUM" +) + // TableConfig is the config of table. type TableConfig struct { // table's filter to tell us which table should adapt to this config. @@ -363,6 +368,8 @@ type Config struct { ExportFixSQL bool `toml:"export-fix-sql" json:"export-fix-sql"` // only check table struct without table data. CheckStructOnly bool `toml:"check-struct-only" json:"check-struct-only"` + // aggregate operator, default value is `SUM` + AggregateOp string `toml:"aggregate-op"` // DMAddr is dm-master's address, the format should like "http://127.0.0.1:8261" DMAddr string `toml:"dm-addr" json:"dm-addr"` // DMTask string `toml:"dm-task" json:"dm-task"` @@ -401,6 +408,8 @@ func NewConfig() *Config { fs.BoolVar(&cfg.ExportFixSQL, "export-fix-sql", true, "set true if want to compare rows or set to false will only compare checksum") fs.BoolVar(&cfg.CheckStructOnly, "check-struct-only", false, "ignore check table's data") + fs.StringVarP(&cfg.AggregateOp, "aggregate-op", "", AggregateOpSUM, "aggregate operator, default value is `SUM`") + _ = fs.MarkHidden("aggregate-op") fs.SortFlags = false return cfg } @@ -582,11 +591,23 @@ func (c *Config) Init() (err error) { return nil } +func checkAggregateOp(op string) bool { + switch op { + case AggregateOpBITXOR, AggregateOpSUM: + return true + } + return false +} + func (c *Config) CheckConfig() bool { if c.CheckThreadCount <= 0 { log.Error("check-thread-count must greater than 0!") return false } + if !checkAggregateOp(c.AggregateOp) { + log.Error("illegal aggregate operator. please select one from [BIT_XOR|SUM]") + return false + } if len(c.DMAddr) != 0 { u, err := url.Parse(c.DMAddr) if err != nil || u.Scheme == "" || u.Host == "" { diff --git a/sync_diff_inspector/config/config_test.go b/sync_diff_inspector/config/config_test.go index 7c12c260b..f0817d372 100644 --- a/sync_diff_inspector/config/config_test.go +++ b/sync_diff_inspector/config/config_test.go @@ -50,7 +50,7 @@ func TestParseConfig(t *testing.T) { // we might not use the same config to run this test. e.g. MYSQL_PORT can be 4000 require.JSONEq(t, cfg.String(), - "{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}") + "{\"check-thread-count\":4,\"split-thread-count\":5,\"export-fix-sql\":true,\"check-struct-only\":false,\"dm-addr\":\"\",\"dm-task\":\"\",\"data-sources\":{\"mysql1\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql2\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"mysql3\":{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null},\"tidb0\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null}},\"routes\":{\"rule1\":{\"schema-pattern\":\"test_*\",\"table-pattern\":\"t_*\",\"target-schema\":\"test\",\"target-table\":\"t\"},\"rule2\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test2\",\"target-table\":\"t2\"},\"rule3\":{\"schema-pattern\":\"test2_*\",\"table-pattern\":\"t2_*\",\"target-schema\":\"test\",\"target-table\":\"t\"}},\"table-configs\":{\"config1\":{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}},\"task\":{\"source-instances\":[\"mysql1\",\"mysql2\",\"mysql3\"],\"source-routes\":null,\"target-instance\":\"tidb0\",\"target-check-tables\":[\"schema*.table*\",\"!c.*\",\"test2.t2\"],\"target-configs\":[\"config1\"],\"output-dir\":\"/tmp/output/config\",\"SourceInstances\":[{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule2\"],\"Router\":{\"Selector\":{}},\"Conn\":null},{\"host\":\"127.0.0.1\",\"port\":3306,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":[\"rule1\",\"rule3\"],\"Router\":{\"Selector\":{}},\"Conn\":null}],\"TargetInstance\":{\"host\":\"127.0.0.1\",\"port\":4000,\"user\":\"root\",\"password\":\"******\",\"sql-mode\":\"\",\"snapshot\":\"\",\"security\":null,\"route-rules\":null,\"Router\":{\"Selector\":{}},\"Conn\":null},\"TargetTableConfigs\":[{\"target-tables\":[\"schema*.table*\",\"test2.t2\"],\"Schema\":\"\",\"Table\":\"\",\"ConfigIndex\":0,\"HasMatched\":false,\"IgnoreColumns\":[\"\",\"\"],\"Fields\":[\"\"],\"Range\":\"age \\u003e 10 AND age \\u003c 20\",\"TargetTableInfo\":null,\"Collation\":\"\",\"chunk-size\":0}],\"TargetCheckTables\":[{},{},{}],\"FixDir\":\"/tmp/output/config/fix-on-tidb0\",\"CheckpointDir\":\"/tmp/output/config/checkpoint\",\"HashFile\":\"\"},\"AggregateOp\":\"SUM\",\"ConfigFile\":\"config_sharding.toml\",\"PrintVersion\":false}") hash, err := cfg.Task.ComputeConfigHash() require.NoError(t, err) require.Equal(t, hash, "c080f9894ec24aadb4aaec1109cd1951454f09a1233f2034bc3b06e0903cb289") diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index 965a66ae7..10dad5bff 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -81,18 +81,21 @@ type Diff struct { cp *checkpoints.Checkpoint startRange *splitter.RangeInfo report *report.Report + + checksumAggregateOp string } // NewDiff returns a Diff instance. func NewDiff(ctx context.Context, cfg *config.Config) (diff *Diff, err error) { diff = &Diff{ - checkThreadCount: cfg.CheckThreadCount, - splitThreadCount: cfg.SplitThreadCount, - exportFixSQL: cfg.ExportFixSQL, - ignoreDataCheck: cfg.CheckStructOnly, - sqlCh: make(chan *ChunkDML, splitter.DefaultChannelBuffer), - cp: new(checkpoints.Checkpoint), - report: report.NewReport(&cfg.Task), + checkThreadCount: cfg.CheckThreadCount, + splitThreadCount: cfg.SplitThreadCount, + exportFixSQL: cfg.ExportFixSQL, + ignoreDataCheck: cfg.CheckStructOnly, + sqlCh: make(chan *ChunkDML, splitter.DefaultChannelBuffer), + cp: new(checkpoints.Checkpoint), + report: report.NewReport(&cfg.Task), + checksumAggregateOp: cfg.AggregateOp, } if err = diff.init(ctx, cfg); err != nil { diff.Close() @@ -573,9 +576,9 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli wg.Add(1) go func() { defer wg.Done() - upstreamInfo = df.upstream.GetCountAndCrc32(ctx, tableRange) + upstreamInfo = df.upstream.GetCountAndCrc32(ctx, tableRange, df.checksumAggregateOp) }() - downstreamInfo = df.downstream.GetCountAndCrc32(ctx, tableRange) + downstreamInfo = df.downstream.GetCountAndCrc32(ctx, tableRange, df.checksumAggregateOp) wg.Wait() if upstreamInfo.Err != nil { diff --git a/sync_diff_inspector/source/mysql_shard.go b/sync_diff_inspector/source/mysql_shard.go index 004b8195e..7b3f36789 100644 --- a/sync_diff_inspector/source/mysql_shard.go +++ b/sync_diff_inspector/source/mysql_shard.go @@ -93,7 +93,7 @@ func (s *MySQLSources) Close() { } } -func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo { +func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo, op string) *ChecksumInfo { beginTime := time.Now() table := s.tableDiffs[tableRange.GetTableIndex()] chunk := tableRange.GetChunk() @@ -103,7 +103,7 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte for _, ms := range matchSources { go func(ms *common.TableShardSource) { - count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args) + count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args, op) infoCh <- &ChecksumInfo{ Checksum: checksum, Count: count, diff --git a/sync_diff_inspector/source/source.go b/sync_diff_inspector/source/source.go index c7c235f41..eabdc411b 100644 --- a/sync_diff_inspector/source/source.go +++ b/sync_diff_inspector/source/source.go @@ -82,7 +82,7 @@ type Source interface { GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error) // GetCountAndCrc32 gets the crc32 result and the count from given range. - GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo + GetCountAndCrc32(context.Context, *splitter.RangeInfo, string) *ChecksumInfo // GetRowsIterator gets the row data iterator from given range. GetRowsIterator(context.Context, *splitter.RangeInfo) (RowDataIterator, error) diff --git a/sync_diff_inspector/source/source_test.go b/sync_diff_inspector/source/source_test.go index 1bcc9fb01..7817caa72 100644 --- a/sync_diff_inspector/source/source_test.go +++ b/sync_diff_inspector/source/source_test.go @@ -184,7 +184,7 @@ func TestTiDBSource(t *testing.T) { require.Equal(t, n, tableCase.rangeInfo.GetTableIndex()) countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456) mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows) - checksum := tidb.GetCountAndCrc32(ctx, tableCase.rangeInfo) + checksum := tidb.GetCountAndCrc32(ctx, tableCase.rangeInfo, config.AggregateOpBITXOR) require.NoError(t, checksum.Err) require.Equal(t, checksum.Count, int64(123)) require.Equal(t, checksum.Checksum, int64(456)) @@ -397,7 +397,7 @@ func TestMysqlShardSources(t *testing.T) { mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows) } - checksum := shard.GetCountAndCrc32(ctx, tableCase.rangeInfo) + checksum := shard.GetCountAndCrc32(ctx, tableCase.rangeInfo, config.AggregateOpBITXOR) require.NoError(t, checksum.Err) require.Equal(t, checksum.Count, int64(len(dbs))) require.Equal(t, checksum.Checksum, resChecksum) diff --git a/sync_diff_inspector/source/tidb.go b/sync_diff_inspector/source/tidb.go index 86ebdf826..05867efb4 100644 --- a/sync_diff_inspector/source/tidb.go +++ b/sync_diff_inspector/source/tidb.go @@ -120,13 +120,13 @@ func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo func (s *TiDBSource) Close() { s.dbConn.Close() } -func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo { +func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo, op string) *ChecksumInfo { beginTime := time.Now() table := s.tableDiffs[tableRange.GetTableIndex()] chunk := tableRange.GetChunk() matchSource := getMatchSource(s.sourceTableMap, table) - count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args) + count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args, op) cost := time.Since(beginTime) return &ChecksumInfo{ diff --git a/sync_diff_inspector/utils/utils.go b/sync_diff_inspector/utils/utils.go index ed27b0917..0305616f3 100644 --- a/sync_diff_inspector/utils/utils.go +++ b/sync_diff_inspector/utils/utils.go @@ -744,7 +744,8 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string) } // GetCountAndCRC32Checksum returns checksum code and count of some data by given condition -func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, int64, error) { +// op can be [BIT_XOR|SUM] +func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}, op string) (int64, int64, error) { /* calculate CRC32 checksum and count example: mysql> select count(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0; @@ -770,8 +771,8 @@ func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, table columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name)) } - query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", - strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) + query := fmt.Sprintf("SELECT COUNT(*) as CNT, %s(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;", + op, strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange) log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args)) var count sql.NullInt64 diff --git a/sync_diff_inspector/utils/utils_test.go b/sync_diff_inspector/utils/utils_test.go index cc06f73da..bd070d08a 100644 --- a/sync_diff_inspector/utils/utils_test.go +++ b/sync_diff_inspector/utils/utils_test.go @@ -243,7 +243,7 @@ func TestGetCountAndCRC32Checksum(t *testing.T) { mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)) - count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"}) + count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"}, "BIT_XOR") require.NoError(t, err) require.Equal(t, count, int64(123)) require.Equal(t, checksum, int64(456))