Skip to content

Commit

Permalink
try concurrently split chunk
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Mar 10, 2021
1 parent e413006 commit 3309ffc
Showing 1 changed file with 31 additions and 5 deletions.
36 changes: 31 additions & 5 deletions pkg/diff/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/utils"
"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -322,6 +324,9 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) {
var (
lowerValues, upperValues []string
latestCount int64
wg sync.WaitGroup
chunkCh = make(chan []*ChunkRange, len(buckets)+1)
splitErr atomic.Error
)

indexColumns := getColumnsFromIndex(index, s.table.info)
Expand Down Expand Up @@ -363,11 +368,16 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) {
if count == 0 {
continue
} else if count >= 2 {
splitChunks, err := splitRangeByRandom(s.table.Conn, chunk, int(count), s.table.Schema, s.table.Table, indexColumns, s.limits, s.collation)
if err != nil {
return nil, errors.Trace(err)
}
chunks = append(chunks, splitChunks...)
wg.Add(1)
go func() {
defer wg.Done()
splitChunks, err := splitRangeByRandom(s.table.Conn, chunk, int(count), s.table.Schema, s.table.Table, indexColumns, s.limits, s.collation)
if err != nil && splitErr.Load() == nil {
splitErr.Store(errors.Trace(err))
}
chunkCh <- splitChunks
}()

} else {
chunks = append(chunks, chunk)
}
Expand All @@ -376,6 +386,22 @@ func (s *bucketSpliter) getChunksByBuckets() (chunks []*ChunkRange, err error) {
lowerValues = upperValues
}

// early fail
err = splitErr.Load()
if err != nil {
return nil, err
}
wg.Wait()
err = splitErr.Load()
if err != nil {
return nil, err
}

for len(chunkCh) > 0 {
splitChunks := <-chunkCh
chunks = append(chunks, splitChunks...)
}

if len(chunks) != 0 {
break
}
Expand Down

0 comments on commit 3309ffc

Please sign in to comment.