Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add online checker #425

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/dbutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", ColumnName(col.Name.O)))
}

query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
// TiDB EXPLAIN is wrong ,need tidb fixed
// FORCE PRIMARY INDEX, NOT better solution, Because maybe unique index
query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s FORCE INDEX(PRIMARY) WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)
log.Debug("checksum", zap.String("sql", query), zap.Reflect("args", args))

Expand Down
31 changes: 24 additions & 7 deletions pkg/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type TableDiff struct {
// size of the split chunk
ChunkSize int `json:"chunk-size"`

FailedRetryTimes int `json:"failed-retry-times"`
FailedRetrySleep int `json:"failed-retry-sleep"`

// sampling check percent, for example 10 means only check 10% data
Sample int `json:"sample"`

Expand Down Expand Up @@ -271,6 +274,8 @@ func (t *TableDiff) CheckTableData(ctx context.Context) (equal bool, err error)
checkWg.Add(1)
go func(j int) {
defer checkWg.Done()
// check chunk data equal
// if chunk data checksum isn't consistent, then retry failedRetryTimes and sleep failedRetrySleep
t.checkChunksDataEqual(ctx, t.Sample < 100 && !fromCheckpoint, checkWorkerCh[j], checkResultCh)
}(i)
}
Expand Down Expand Up @@ -450,13 +455,25 @@ func (t *TableDiff) checkChunkDataEqual(ctx context.Context, filterByRand bool,
update()

if t.UseChecksum {
// first check the checksum is equal or not
equal, err = t.compareChecksum(ctx, chunk)
if err != nil {
return false, errors.Trace(err)
}
if equal {
return true, nil
// if chunk data checksum isn't consistent, then retry failedRetryTimes and sleep failedRetrySleep
for i := 0; i < (t.FailedRetryTimes + 1); i++ {
equal, err = t.compareChecksum(ctx, chunk)
if err != nil {
return false, errors.Trace(err)
}
if equal {
return true, nil
} else {
// sleep time and retry
log.Warn("schema table checksum failed, failed retry, please waiting",
zap.String("downstream schema", t.TargetTable.Schema),
zap.String("downstream table", t.TargetTable.Table),
zap.String("where", dbutil.ReplacePlaceholder(chunk.Where, chunk.Args)),
zap.Int("retry-counts", i),
zap.Duration("sleep", time.Duration(t.FailedRetrySleep)*time.Second))
time.Sleep(time.Duration(t.FailedRetrySleep) * time.Second)
continue
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions sync_diff_inspector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ type Config struct {
// size of the split chunk
ChunkSize int `toml:"chunk-size" json:"chunk-size"`

// Chunk check failed, retry times
FailedRetryTimes int `toml:"failed-retry-times" json:"failed-retry-times"`

// Chunk retry sleep
FailedRetrySleep int `toml:"failed-retry-sleep" json:"failed-retry-sleep"`

// sampling check percent, for example 10 means only check 10% data
Sample int `toml:"sample-percent" json:"sample-percent"`

Expand Down Expand Up @@ -299,6 +305,15 @@ func (c *Config) checkConfig() bool {
return false
}

if c.FailedRetryTimes <= 0 {
wentaojin marked this conversation as resolved.
Show resolved Hide resolved
log.Error("failed-retry-times must greater than 0!")
return false
}

if c.FailedRetrySleep <= 0 {
log.Error("failed-retry-sleep must greater than 0!")
return false
}
if len(c.DMAddr) != 0 {
u, err := url.Parse(c.DMAddr)
if err != nil || u.Scheme == "" || u.Host == "" {
Expand Down
10 changes: 8 additions & 2 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Diff struct {
sourceDBs map[string]DBConfig
targetDB DBConfig
chunkSize int
failedRetryTimes int
failedRetrySleep int
sample int
checkThreadCount int
useChecksum bool
Expand Down Expand Up @@ -66,6 +68,8 @@ func NewDiff(ctx context.Context, cfg *Config) (diff *Diff, err error) {
diff = &Diff{
sourceDBs: make(map[string]DBConfig),
chunkSize: cfg.ChunkSize,
failedRetryTimes: cfg.FailedRetryTimes,
failedRetrySleep: cfg.FailedRetrySleep,
sample: cfg.Sample,
checkThreadCount: cfg.CheckThreadCount,
useChecksum: cfg.UseChecksum,
Expand Down Expand Up @@ -598,8 +602,10 @@ func (df *Diff) Equal() (err error) {
}

td := &diff.TableDiff{
SourceTables: sourceTables,
TargetTable: targetTableInstance,
SourceTables: sourceTables,
TargetTable: targetTableInstance,
FailedRetryTimes: df.failedRetryTimes,
FailedRetrySleep: df.failedRetrySleep,

IgnoreColumns: table.IgnoreColumns,

Expand Down