Skip to content

Commit

Permalink
enhance: Enable score based balance channel policy
Browse files Browse the repository at this point in the history
current balance channel policy only consider current collection's distribution,
so if all collections has 1 channel, and all channels has been loaded on
same querynode, after querynode num increase, balance channel won't be
triggered.

This PR enable score based balance channel policy, to achieve:
1. distribute all channels evenly across multiple querynodes
2. distribute each collection's channel evenly across multiple
   querynodes.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Dec 2, 2024
1 parent 4c623ce commit 50bd48a
Show file tree
Hide file tree
Showing 14 changed files with 522 additions and 33 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (chanPlan *ChannelAssignPlan) String() string {

type Balance interface {
AssignSegment(ctx context.Context, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan
AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
BalanceReplica(ctx context.Context, replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan)
}

Expand Down Expand Up @@ -104,7 +104,7 @@ func (b *RoundRobinBalancer) AssignSegment(ctx context.Context, collectionID int
return ret
}

func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RoundRobinBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(ctx, c.assignments, c.nodeIDs, false)
plans := suite.roundRobinBalancer.AssignChannel(ctx, 1, c.assignments, c.nodeIDs, false)
suite.ElementsMatch(c.expectPlans, plans)
})
}
Expand Down
6 changes: 3 additions & 3 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingChannelPlan(ctx context.Context,
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range offlineNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID), meta.WithChannelName2Channel(channelName))
plans := b.AssignChannel(ctx, dmChannels, onlineNodes, false)
plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, onlineNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
Expand Down Expand Up @@ -176,7 +176,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(ctx context.Context,

func (b *ChannelLevelScoreBalancer) genSegmentPlan(ctx context.Context, br *balanceReport, replica *meta.Replica, channelName string, onlineNodes []int64) []SegmentAssignPlan {
segmentDist := make(map[int64][]*meta.Segment)
nodeItemsMap := b.convertToNodeItems(br, replica.GetCollectionID(), onlineNodes)
nodeItemsMap := b.convertToNodeItemsBySegment(br, replica.GetCollectionID(), onlineNodes)
if len(nodeItemsMap) == 0 {
return nil
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func (b *ChannelLevelScoreBalancer) genChannelPlan(ctx context.Context, replica
return nil
}

channelPlans := b.AssignChannel(ctx, channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(ctx, replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
Expand Down
21 changes: 11 additions & 10 deletions internal/querycoordv2/balance/mock_balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (b *RowCountBasedBalancer) AssignSegment(ctx context.Context, collectionID

// AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count.
// try to make every query node has channel count
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
func (b *RowCountBasedBalancer) AssignChannel(ctx context.Context, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
// skip out suspend node and stopping node during assignment, but skip this check for manual balance
if !manualBalance {
versionRangeFilter := semver.MustParseRange(">2.3.x")
Expand Down Expand Up @@ -311,7 +311,7 @@ func (b *RowCountBasedBalancer) genStoppingChannelPlan(ctx context.Context, repl
channelPlans := make([]ChannelAssignPlan, 0)
for _, nodeID := range roNodes {
dmChannels := b.dist.ChannelDistManager.GetByCollectionAndFilter(replica.GetCollectionID(), meta.WithNodeID2Channel(nodeID))
plans := b.AssignChannel(ctx, dmChannels, rwNodes, false)
plans := b.AssignChannel(ctx, replica.GetCollectionID(), dmChannels, rwNodes, false)
for i := range plans {
plans[i].From = nodeID
plans[i].Replica = replica
Expand Down Expand Up @@ -349,7 +349,7 @@ func (b *RowCountBasedBalancer) genChannelPlan(ctx context.Context, br *balanceR
return nil
}

channelPlans := b.AssignChannel(ctx, channelsToMove, nodeWithLessChannel, false)
channelPlans := b.AssignChannel(ctx, replica.GetCollectionID(), channelsToMove, nodeWithLessChannel, false)
for i := range channelPlans {
channelPlans[i].From = channelPlans[i].Channel.Node
channelPlans[i].Replica = replica
Expand Down
Loading

0 comments on commit 50bd48a

Please sign in to comment.