Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync_diff_inspector: Table match checker #685

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ func (df *Diff) init(ctx context.Context, cfg *config.Config) (err error) {
return errors.Trace(err)
}

// Check whether the upstream's tables match those in tableDiffs; if they do not, continue with only the matched tables.
newDiffTables, passed := df.upstream.CheckTablesMatched(df.report)
if !passed {
df.upstream.UpdateTables(newDiffTables)
df.downstream.UpdateTables(newDiffTables)
} else {
log.Info("table match check passed!!")
}

df.workSource = df.pickSource(ctx)
df.FixSQLDir = cfg.Task.FixDir
df.CheckpointDir = cfg.Task.CheckpointDir
Expand Down
68 changes: 59 additions & 9 deletions sync_diff_inspector/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
// Fail means not all data or struct of tables are equal
Fail = "fail"
Error = "error"
Warn = "warn"
)

// ReportConfig stores the config information for the user
Expand Down Expand Up @@ -74,18 +75,24 @@ type ChunkResult struct {
RowsDelete int `json:"rows-delete"` // `RowsDelete` is the number of rows needed to delete
}

// MissingTables records the tables that were skipped because a counterpart
// to compare with was not found on the other side.
type MissingTables struct {
// MissingTargetTables lists target tables for which the source has no table to be compared with.
MissingTargetTables []string `json:"missing-target-tables"`
// MissingSourceTables lists source tables for which the target has no table to be compared with.
MissingSourceTables []string `json:"missing-source-tables"`
}

// Report saves the check results.
type Report struct {
sync.RWMutex
Result string `json:"-"` // Result is pass or fail
PassNum int32 `json:"-"` // The pass number of tables
FailedNum int32 `json:"-"` // The failed number of tables
TableResults map[string]map[string]*TableResult `json:"table-results"` // TableResult saved the map of `schema` => `table` => `tableResult`
StartTime time.Time `json:"start-time"`
Duration time.Duration `json:"time-duration"`
TotalSize int64 `json:"-"` // Total size of the checked tables
SourceConfig [][]byte `json:"-"`
TargetConfig []byte `json:"-"`
Result string `json:"-"` // Result is pass or fail
PassNum int32 `json:"-"` // The pass number of tables
FailedNum int32 `json:"-"` // The failed number of tables
TableResults map[string]map[string]*TableResult `json:"table-results"` // TableResult saved the map of `schema` => `table` => `tableResult`
StartTime time.Time `json:"start-time"`
Duration time.Duration `json:"time-duration"`
TotalSize int64 `json:"-"` // Total size of the checked tables
SourceConfig [][]byte `json:"-"`
TargetConfig []byte `json:"-"`
MissingTables MissingTables `json:"missing-tables"`

task *config.TaskConfig `json:"-"`
}
Expand Down Expand Up @@ -220,6 +227,21 @@ func (r *Report) CommitSummary() error {
table.Render()
summaryFile.WriteString(tableString.String())
}

if len(r.MissingTables.MissingSourceTables) > 0 {
summaryFile.WriteString("\nWarn: some tables from source are skipped, because the target has no table to becompared with:")
for _, tableName := range r.MissingTables.MissingSourceTables {
summaryFile.WriteString(fmt.Sprintf("\n\t%s", tableName))
}
}

if len(r.MissingTables.MissingTargetTables) > 0 {
summaryFile.WriteString("\nWarn: some tables from target are skipped, because the source has no table to becompared with:")
for _, tableName := range r.MissingTables.MissingTargetTables {
summaryFile.WriteString(fmt.Sprintf("\n\t%s", tableName))
}
}

duration := r.Duration + time.Since(r.StartTime)
summaryFile.WriteString(fmt.Sprintf("\nTime Cost: %s\n", duration))
summaryFile.WriteString(fmt.Sprintf("Average Speed: %fMB/s\n", float64(r.TotalSize)/(1024.0*1024.0*duration.Seconds())))
Expand Down Expand Up @@ -261,6 +283,20 @@ func (r *Report) Print(w io.Writer) error {
}
summary.WriteString(fmt.Sprintf("You can view the comparision details through '%s/%s'\n", r.task.OutputDir, config.LogFileName))
}

if len(r.MissingTables.MissingSourceTables) > 0 {
summary.WriteString("\nWarn: some tables from source are skipped, because the target has no table to becompared with:\n")
for _, tableName := range r.MissingTables.MissingSourceTables {
summary.WriteString(fmt.Sprintf("\t%s\n", tableName))
}
}

if len(r.MissingTables.MissingTargetTables) > 0 {
summary.WriteString("\nWarn: some tables from target are skipped, because the source has no table to becompared with:\n")
for _, tableName := range r.MissingTables.MissingTargetTables {
summary.WriteString(fmt.Sprintf("\t%s\n", tableName))
}
}
fmt.Fprint(w, summary.String())
return nil
}
Expand All @@ -271,9 +307,23 @@ func NewReport(task *config.TaskConfig) *Report {
TableResults: make(map[string]map[string]*TableResult),
Result: Pass,
task: task,
MissingTables: MissingTables{
MissingTargetTables: make([]string, 0),
MissingSourceTables: make([]string, 0),
},
}
}

// AddMissingTargetTable appends table to the report's list of missing target
// tables and sets the report result to Warn.
func (r *Report) AddMissingTargetTable(table string) {
	missing := &r.MissingTables
	missing.MissingTargetTables = append(missing.MissingTargetTables, table)
	r.Result = Warn
}

// AddMissingSourceTable appends table to the report's list of missing source
// tables and sets the report result to Warn.
func (r *Report) AddMissingSourceTable(table string) {
	missing := &r.MissingTables
	missing.MissingSourceTables = append(missing.MissingSourceTables, table)
	r.Result = Warn
}

func (r *Report) Init(tableDiffs []*common.TableDiff, sourceConfig [][]byte, targetConfig []byte) {
r.StartTime = time.Now()
r.SourceConfig = sourceConfig
Expand Down
24 changes: 18 additions & 6 deletions sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/sync_diff_inspector/config"
"github.com/pingcap/tidb-tools/sync_diff_inspector/report"
"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
"github.com/pingcap/tidb-tools/sync_diff_inspector/splitter"
"github.com/pingcap/tidb-tools/sync_diff_inspector/utils"
Expand Down Expand Up @@ -62,6 +63,10 @@ type MySQLSources struct {
tableDiffs []*common.TableDiff

sourceTablesMap map[string][]*common.TableShardSource

// only for check
targetUniqueTableMap map[string]struct{}
sourceTablesAfterRoute map[string][]string
}

func getMatchedSourcesForTable(sourceTablesMap map[string][]*common.TableShardSource, table *common.TableDiff) []*common.TableShardSource {
Expand Down Expand Up @@ -142,6 +147,14 @@ func (s *MySQLSources) GetTables() []*common.TableDiff {
return s.tableDiffs
}

// UpdateTables replaces the table diffs held by s with tableDiffs.
// The slice is stored as given; it is not copied.
func (s *MySQLSources) UpdateTables(tableDiffs []*common.TableDiff) {
s.tableDiffs = tableDiffs
}

// CheckTablesMatched delegates to checkTableMatched, passing the target table
// set, the routed source tables and the current table diffs stored on s along
// with report, and returns that call's table-diff slice and boolean unchanged.
func (s *MySQLSources) CheckTablesMatched(report *report.Report) ([]*common.TableDiff, bool) {
return checkTableMatched(s.targetUniqueTableMap, s.sourceTablesAfterRoute, s.tableDiffs, report)
}

func (s *MySQLSources) GenerateFixSQL(t DMLType, upstreamData, downstreamData map[string]*dbutil.ColumnData, tableIndex int) string {
switch t {
case Insert:
Expand Down Expand Up @@ -292,7 +305,7 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*
}

// only used for check
sourceTablesAfterRoute := make(map[string]struct{})
sourceTablesAfterRoute := make(map[string][]string)

for i, sourceDB := range ds {
sourceSchemas, err := dbutil.GetSchemas(ctx, sourceDB.Conn)
Expand Down Expand Up @@ -323,7 +336,7 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*
// get all tables from all source db instance
if f.MatchTable(targetSchema, targetTable) {
// if match the filter, we should respect it and check target has this table later.
sourceTablesAfterRoute[uniqueId] = struct{}{}
sourceTablesAfterRoute[uniqueId] = append(sourceTablesAfterRoute[uniqueId], utils.UniqueID(schema, table))
}
if _, ok := targetUniqueTableMap[uniqueId]; !ok {
continue
Expand Down Expand Up @@ -355,13 +368,12 @@ func NewMySQLSources(ctx context.Context, tableDiffs []*common.TableDiff, ds []*

}

if err := checkTableMatched(targetUniqueTableMap, sourceTablesAfterRoute); err != nil {
return nil, errors.Annotatef(err, "please make sure the filter is correct.")
}

mss := &MySQLSources{
tableDiffs: tableDiffs,
sourceTablesMap: sourceTablesMap,

targetUniqueTableMap: targetUniqueTableMap,
sourceTablesAfterRoute: sourceTablesAfterRoute,
}
return mss, nil
}
37 changes: 28 additions & 9 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb-tools/pkg/filter"
tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/pingcap/tidb-tools/sync_diff_inspector/config"
"github.com/pingcap/tidb-tools/sync_diff_inspector/report"
"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
"github.com/pingcap/tidb-tools/sync_diff_inspector/splitter"
"github.com/pingcap/tidb-tools/sync_diff_inspector/utils"
Expand Down Expand Up @@ -93,6 +94,13 @@ type Source interface {
// GetTables represents the tableDiffs.
GetTables() []*common.TableDiff

// UpdateTables resets the tableDiffs.
UpdateTables([]*common.TableDiff)

// CheckTablesMatched checks whether the upstream's tables match those in tableDiffs,
// and returns the new tableDiffs together with whether the check passed.
CheckTablesMatched(*report.Report) ([]*common.TableDiff, bool)

// GetSourceStructInfo get the source table info from a given target table
GetSourceStructInfo(context.Context, int) ([]*model.TableInfo, error)

Expand Down Expand Up @@ -374,21 +382,32 @@ type RangeIterator interface {
Close()
}

func checkTableMatched(targetMap map[string]struct{}, sourceMap map[string]struct{}) error {
func checkTableMatched(targetMap map[string]struct{}, sourceMap map[string][]string, tableDiffs []*common.TableDiff, report *report.Report) ([]*common.TableDiff, bool) {
newTableDiffs := make([]*common.TableDiff, 0, len(tableDiffs))
passed := true
// check target exists but source not found
for tableDiff := range targetMap {
for _, tableDiff := range tableDiffs {
// target table have all passed in tableFilter
if _, ok := sourceMap[tableDiff]; !ok {
return errors.Errorf("the source has no table to be compared. target-table is `%s`", tableDiff)
tableName := utils.UniqueID(tableDiff.Schema, tableDiff.Table)
if _, ok := sourceMap[tableName]; !ok {
log.Warn("the source has no table to be compared", zap.String("target-table", tableName))
report.AddMissingTargetTable(tableName)
passed = false
} else {
newTableDiffs = append(newTableDiffs, tableDiff)
}
}
// check source exists but target not found
for tableDiff := range sourceMap {
for tableName, sourceNames := range sourceMap {
// need to check that the source table has passed the tableFilter here
if _, ok := targetMap[tableDiff]; !ok {
return errors.Errorf("the target has no table to be compared. source-table is `%s`", tableDiff)
if _, ok := targetMap[tableName]; !ok {
for _, sourceName := range sourceNames {
log.Warn("the target has no table to be compared", zap.String("source-table", sourceName))
report.AddMissingSourceTable(sourceName)
passed = false
}
}
}
log.Info("table match check passed!!")
return nil

return newTableDiffs, passed
}
54 changes: 39 additions & 15 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/pingcap/tidb-tools/sync_diff_inspector/chunk"
"github.com/pingcap/tidb-tools/sync_diff_inspector/config"
"github.com/pingcap/tidb-tools/sync_diff_inspector/report"
"github.com/pingcap/tidb-tools/sync_diff_inspector/source/common"
"github.com/pingcap/tidb-tools/sync_diff_inspector/splitter"
"github.com/pingcap/tidb-tools/sync_diff_inspector/utils"
Expand Down Expand Up @@ -911,19 +912,42 @@ func TestInitTables(t *testing.T) {

func TestCheckTableMatched(t *testing.T) {
tmap := make(map[string]struct{})
smap := make(map[string]struct{})

tmap["1"] = struct{}{}
tmap["2"] = struct{}{}

smap["1"] = struct{}{}
smap["2"] = struct{}{}
require.NoError(t, checkTableMatched(tmap, smap))

delete(smap, "1")
require.Contains(t, checkTableMatched(tmap, smap).Error(), "the source has no table to be compared. target-table")

delete(tmap, "1")
smap["1"] = struct{}{}
require.Contains(t, checkTableMatched(tmap, smap).Error(), "the target has no table to be compared. source-table")
smap := make(map[string][]string)

tmap["`1`.`1`"] = struct{}{}
tmap["`2`.`2`"] = struct{}{}

smap["`1`.`1`"] = []string{"1", "`1`.`1`"}
smap["`2`.`2`"] = []string{"2"}

r := report.NewReport(nil)
tableDiff := []*common.TableDiff{
{Schema: "1", Table: "1"},
{Schema: "2", Table: "2"},
}
newTableDiff, passed := checkTableMatched(tmap, smap, tableDiff, r)
require.True(t, passed)
require.Equal(t, len(newTableDiff), 2)
require.Equal(t, len(r.MissingTables.MissingSourceTables), 0)
require.Equal(t, len(r.MissingTables.MissingTargetTables), 0)

r = report.NewReport(nil)
delete(smap, "`1`.`1`")
newTableDiff, passed = checkTableMatched(tmap, smap, tableDiff, r)
require.False(t, passed)
require.Equal(t, len(newTableDiff), 1)
require.Equal(t, len(r.MissingTables.MissingSourceTables), 0)
require.Equal(t, len(r.MissingTables.MissingTargetTables), 1)

tableDiff = []*common.TableDiff{
{Schema: "2", Table: "2"},
}
r = report.NewReport(nil)
delete(tmap, "`1`.`1`")
smap["`1`.`1`"] = []string{"1", "`1`.`1`"}
newTableDiff, passed = checkTableMatched(tmap, smap, tableDiff, r)
require.False(t, passed)
require.Equal(t, len(newTableDiff), 1)
require.Equal(t, len(r.MissingTables.MissingSourceTables), 2)
require.Equal(t, len(r.MissingTables.MissingTargetTables), 0)
}
Loading