Skip to content

Commit

Permalink
fix: when register branch session, global session can not change (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
dk-lockdown authored Jun 12, 2022
1 parent 5024e2f commit c755946
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 37 deletions.
9 changes: 4 additions & 5 deletions pkg/core/distributed_transaction_manger.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,11 @@ func (manager *DistributedTransactionManager) processNextGlobalSession(ctx conte
return true
}
if newGlobalSession.Status == api.Begin {
if isGlobalSessionTimeout(newGlobalSession) {
_, err := manager.Rollback(context.Background(), newGlobalSession.XID)
if err != nil {
log.Error(err)
}
_, err := manager.Rollback(context.Background(), newGlobalSession.XID)
if err != nil {
log.Error(err)
}
manager.globalSessionQueue.AddAfter(gs, time.Duration(gs.Timeout)*time.Millisecond)
}
if newGlobalSession.Status == api.Committing || newGlobalSession.Status == api.Rollbacking {
bsKeys, err := manager.storageDriver.GetBranchSessionKeys(context.Background(), newGlobalSession.XID)
Expand Down
1 change: 1 addition & 0 deletions pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package errors
import "errors"

var (
GlobalTransactionFinished = errors.New("global session finished")
CouldNotFoundGlobalTransaction = errors.New("could not found global transaction")
CouldNotFoundBranchTransaction = errors.New("could not found branch transaction")
BranchLockAcquireFailed = errors.New("branch lock acquire failed")
Expand Down
88 changes: 56 additions & 32 deletions pkg/storage/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,46 +101,56 @@ func (s *store) AddGlobalSession(ctx context.Context, globalSession *api.GlobalS
}

func (s *store) AddBranchSession(ctx context.Context, branchSession *api.BranchSession) error {
data, err := branchSession.Marshal()
if err != nil {
return err
}

gs, modRevision, err := s.getGlobalSession(ctx, branchSession.XID)
if err != nil {
if errors.Is(err, err2.CouldNotFoundGlobalTransaction) {
return err2.GlobalTransactionFinished
}
return err
}
if gs.Status > api.Begin {
return err2.GlobalTransactionFinished
}

txn := s.client.Txn(ctx)
comparisons := make([]clientv3.Cmp, 0)
ops := make([]clientv3.Op, 0)

comparisons = append(comparisons, clientv3.Compare(clientv3.ModRevision(branchSession.XID), "=", modRevision))
ops = append(ops, clientv3.OpPut(branchSession.BranchID, string(data)))
// branch transactions of global transaction
globalBranchKey := fmt.Sprintf("bs/%s/%d", branchSession.XID, branchSession.BranchSessionID)
ops = append(ops, clientv3.OpPut(globalBranchKey, branchSession.BranchID))

if branchSession.Type == api.AT && branchSession.LockKey != "" {
rowKeys := misc.CollectRowKeys(branchSession.LockKey, branchSession.ResourceID)

txn := s.client.Txn(ctx)
var cmpSlice []clientv3.Cmp
for _, rowKey := range rowKeys {
cmpSlice = append(cmpSlice, notFound(rowKey))
comparisons = append(comparisons, notFound(rowKey))
}
txn = txn.If(cmpSlice...)

var ops []clientv3.Op
for _, rowKey := range rowKeys {
lockKey := fmt.Sprintf("lk/%s/%s", branchSession.XID, rowKey)
ops = append(ops, clientv3.OpPut(lockKey, rowKey))
ops = append(ops, clientv3.OpPut(rowKey, lockKey))
}
txn.Then(ops...)

txnResp, err := txn.Commit()
if err != nil {
return err
}
if !txnResp.Succeeded {
return err2.BranchLockAcquireFailed
}
}

data, err := branchSession.Marshal()
txn = txn.If(comparisons...)
txn.Then(ops...)

txnResp, err := txn.Commit()
if err != nil {
return err
}
_, err = s.client.Put(ctx, branchSession.BranchID, string(data))
if err != nil {
return err
if !txnResp.Succeeded {
return errors.Errorf("register branch session failed, xid: %s, resource id: %s", branchSession.XID, branchSession.ResourceID)
}

// 全局事务关联的事务分支
globalBranchKey := fmt.Sprintf("bs/%s/%d", branchSession.XID, branchSession.BranchSessionID)
_, err = s.client.Put(ctx, globalBranchKey, branchSession.BranchID)
return err
return nil
}

func (s *store) GlobalCommit(ctx context.Context, xid string) (api.GlobalSession_GlobalStatus, error) {
Expand All @@ -155,12 +165,16 @@ func (s *store) GlobalCommit(ctx context.Context, xid string) (api.GlobalSession
gs, err := s.GetGlobalSession(ctx, xid)
if err != nil {
if errors.Is(err, err2.CouldNotFoundGlobalTransaction) {
return api.Finished, nil
return api.Finished, err2.GlobalTransactionFinished
}
return api.Begin, err
}
if gs.Status > api.Begin {
return gs.Status, nil
if gs.Status == api.Committing {
return gs.Status, nil
} else {
return gs.Status, err2.GlobalTransactionFinished
}
}
gs.Status = api.Committing
data, err := gs.Marshal()
Expand Down Expand Up @@ -197,12 +211,16 @@ func (s *store) GlobalRollback(ctx context.Context, xid string) (api.GlobalSessi
gs, err := s.GetGlobalSession(ctx, xid)
if err != nil {
if errors.Is(err, err2.CouldNotFoundGlobalTransaction) {
return api.Finished, nil
return api.Finished, err2.GlobalTransactionFinished
}
return api.Begin, err
}
if gs.Status > api.Begin {
return gs.Status, nil
if gs.Status == api.Rollbacking {
return gs.Status, nil
} else {
return gs.Status, err2.GlobalTransactionFinished
}
}
gs.Status = api.Rollbacking
data, err := gs.Marshal()
Expand Down Expand Up @@ -236,19 +254,25 @@ func (s *store) GlobalRollback(ctx context.Context, xid string) (api.GlobalSessi
}

func (s *store) GetGlobalSession(ctx context.Context, xid string) (*api.GlobalSession, error) {
gs, _, err := s.getGlobalSession(ctx, xid)
return gs, err
}

func (s *store) getGlobalSession(ctx context.Context, xid string) (*api.GlobalSession, int64, error) {
resp, err := s.client.Get(ctx, xid, clientv3.WithSerializable())
if err != nil {
return nil, err
return nil, 0, err
}
if len(resp.Kvs) == 0 {
return nil, err2.CouldNotFoundGlobalTransaction
return nil, 0, err2.CouldNotFoundGlobalTransaction
}

globalSession := &api.GlobalSession{}
err = globalSession.Unmarshal(resp.Kvs[0].Value)
if err != nil {
return nil, err
return nil, 0, err
}
return globalSession, nil
return globalSession, resp.Kvs[0].ModRevision, nil
}

func (s *store) ListGlobalSession(ctx context.Context, applicationID string) ([]*api.GlobalSession, error) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/tm/global_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
ctx "github.com/cectc/hptx/pkg/base/context"
"github.com/cectc/hptx/pkg/config"
"github.com/cectc/hptx/pkg/core"
err2 "github.com/cectc/hptx/pkg/errors"
)

const (
Expand Down Expand Up @@ -130,6 +131,9 @@ func (gtx *DefaultGlobalTransaction) Commit(ctx *ctx.RootContext) error {
for retry > 0 {
status, err := core.GetDistributedTransactionManager().Commit(ctx, gtx.XID)
if err != nil {
if errors.Is(err, err2.GlobalTransactionFinished) {
return err
}
log.Errorf("failed to report global commit [%s],Retry Countdown: %d, reason: %s", gtx.XID, retry, err.Error())
} else {
gtx.Status = status
Expand Down Expand Up @@ -162,6 +166,9 @@ func (gtx *DefaultGlobalTransaction) Rollback(ctx *ctx.RootContext) error {
for retry > 0 {
status, err := core.GetDistributedTransactionManager().Rollback(ctx, gtx.XID)
if err != nil {
if errors.Is(err, err2.GlobalTransactionFinished) {
return err
}
log.Errorf("failed to report global rollback [%s],Retry Countdown: %d, reason: %s", gtx.XID, retry, err.Error())
} else {
gtx.Status = status
Expand Down

0 comments on commit c755946

Please sign in to comment.