refactor(bigquery/storage/managedwriter): simplify signatures (googleapis#11158)

Another unparam-related PR, this one focused on reducing unused parameters in the managedwriter package and its adapt subpackage.

Follow-up to googleapis#11149.
shollyman authored Nov 21, 2024
1 parent f198452 commit 044c077
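
The kind of simplification unparam flags is easiest to see in a small sketch. The example below is hypothetical (the `buildField` helper and `Field` type are invented, not from this repo): a function whose `error` result is always nil forces dead error-handling at every call site, and dropping it shrinks both the signature and the callers, which is the shape of the changes in this commit.

```go
package main

import "fmt"

// Field is a stand-in type for this sketch (not part of the real package).
type Field struct{ Name string }

// Before: the error result is always nil, so callers carry dead
// error-handling for a failure that cannot happen.
func buildField(name string) (*Field, error) {
	return &Field{Name: name}, nil
}

// After: dropping the impossible error simplifies the signature
// and every call site.
func buildFieldSimplified(name string) *Field {
	return &Field{Name: name}
}

func main() {
	f, err := buildField("id") // before: obligatory err check
	if err != nil {
		panic(err)
	}
	g := buildFieldSimplified("id") // after: plain assignment
	fmt.Println(f.Name, g.Name)
}
```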
Showing 4 changed files with 24 additions and 41 deletions.
19 changes: 5 additions & 14 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
@@ -277,10 +277,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
 			deps = append(deps, foundDesc.ParentFile())
 		}
 		// Construct field descriptor for the message.
-		fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
-		if err != nil {
-			return nil, newConversionError(scope, fmt.Errorf("couldn't convert field to FieldDescriptorProto: %w", err))
-		}
+		fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
 		fields = append(fields, fdp)
 	} else {
 		// Wrap the current struct's fields in a TableSchema outer message, and then build the submessage.
@@ -298,10 +295,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
 		if err != nil {
 			return nil, newConversionError(currentScope, fmt.Errorf("failed to add descriptor to dependency cache: %w", err))
 		}
-		fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
-		if err != nil {
-			return nil, newConversionError(currentScope, fmt.Errorf("couldn't compute field schema : %w", err))
-		}
+		fdp := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
 		fields = append(fields, fdp)
 	}
 } else {
@@ -329,10 +323,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
 			}
 		}
 	}
-	fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
-	if err != nil {
-		return nil, newConversionError(currentScope, err)
-	}
+	fd := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
 	fields = append(fields, fd)
 	}
 }
@@ -411,7 +402,7 @@ func messageDependsOnFile(msg protoreflect.MessageDescriptor, file protoreflect.
 // For proto2, we propagate the mode->label annotation as expected.
 //
 // Messages are always nullable, and repeated fields are as well.
-func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) (*descriptorpb.FieldDescriptorProto, error) {
+func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) *descriptorpb.FieldDescriptorProto {
 	name := field.GetName()
 	var fdp *descriptorpb.FieldDescriptorProto
 
@@ -474,7 +465,7 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
 		}
 		proto.SetExtension(fdp.Options, storagepb.E_ColumnName, name)
 	}
-	return fdp, nil
+	return fdp
 }
 
 // nameRequiresAnnotation determines whether a field name requires unicode-annotation.
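
For context on the function whose signature changes above: it maps a BigQuery Storage `TableFieldSchema` to a protobuf `FieldDescriptorProto`. Below is a minimal, hypothetical sketch of that kind of mapping using the real `descriptorpb` types (the `toFieldDescriptor` helper is invented, and the real function also handles field modes, scoping, and annotations; after this commit it returns no error because no branch can fail):

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/descriptorpb"
)

// toFieldDescriptor is a much-reduced stand-in for the real conversion:
// it turns a (name, number) pair into a STRING field descriptor, whereas
// the real code switches on the BigQuery field type and mode.
func toFieldDescriptor(name string, number int32) *descriptorpb.FieldDescriptorProto {
	return &descriptorpb.FieldDescriptorProto{
		Name:   proto.String(name),
		Number: proto.Int32(number),
		Type:   descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
		Label:  descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
	}
}

func main() {
	fdp := toFieldDescriptor("customer_id", 1)
	fmt.Println(fdp.GetName(), fdp.GetNumber()) // customer_id 1
}
```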
6 changes: 3 additions & 3 deletions bigquery/storage/managedwriter/connection.go
@@ -537,7 +537,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
 				// TODO: Determine if/how we should report this case, as we have no viable context for propagating.
 
 				// Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
-				pw.writer.processRetry(pw, co, nil, doneErr)
+				pw.writer.processRetry(pw, nil, doneErr)
 			}
 		case nextWrite, ok := <-ch:
 			if !ok {
@@ -557,7 +557,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
 				}
 				recordStat(metricCtx, AppendResponseErrors, 1)
 
-				nextWrite.writer.processRetry(nextWrite, co, nil, err)
+				nextWrite.writer.processRetry(nextWrite, nil, err)
 				continue
 			}
 			// Record that we did in fact get a response from the backend.
@@ -573,7 +573,7 @@ func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQue
 				recordStat(metricCtx, AppendResponseErrors, 1)
 				respErr := grpcstatus.ErrorProto(status)
 
-				nextWrite.writer.processRetry(nextWrite, co, resp, respErr)
+				nextWrite.writer.processRetry(nextWrite, resp, respErr)
 
 				continue
 			}
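
The three call sites above all live in the connection's receive loop: when an append fails or the stream closes, the pending write is handed back to the writer's retrier rather than resolved locally. A rough sketch of that hand-off pattern, with all names invented for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// pendingWrite is a hypothetical stand-in for the type in connection.go.
type pendingWrite struct{ id int }

// processRetry decides whether a failed append is re-enqueued; after this
// commit the real method no longer takes the originating connection.
func processRetry(pw *pendingWrite, err error) {
	fmt.Printf("write %d failed (%v); evaluating for re-enqueue\n", pw.id, err)
}

// recvLoop mimics the shape of connRecvProcessor: drain a channel of
// in-flight writes and hand failures to the retrier.
func recvLoop(ch <-chan *pendingWrite) {
	for pw := range ch {
		if err := fakeRecv(pw); err != nil {
			processRetry(pw, err) // simplified signature: no connection param
			continue
		}
		fmt.Printf("write %d acknowledged\n", pw.id)
	}
}

// fakeRecv simulates reading an append response from the backend.
func fakeRecv(pw *pendingWrite) error {
	if pw.id%2 == 1 {
		return errors.New("transient backend error")
	}
	return nil
}

func main() {
	ch := make(chan *pendingWrite, 4)
	for i := 1; i <= 4; i++ {
		ch <- &pendingWrite{id: i}
	}
	close(ch)
	recvLoop(ch)
}
```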
4 changes: 2 additions & 2 deletions bigquery/storage/managedwriter/managed_stream.go
@@ -194,7 +194,7 @@ func (ms *ManagedStream) Finalize(ctx context.Context, opts ...gax.CallOption) (
 // appendWithRetry handles the details of adding sending an append request on a stream. Appends are sent on a long
 // lived bidirectional network stream, with it's own managed context (ms.ctx), and there's a per-request context
 // attached to the pendingWrite.
-func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOption) error {
+func (ms *ManagedStream) appendWithRetry(pw *pendingWrite) error {
 	for {
 		ms.mu.Lock()
 		err := ms.err
@@ -355,7 +355,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
 
 // processRetry is responsible for evaluating and re-enqueing an append.
 // If the append is not retried, it is marked complete.
-func (ms *ManagedStream) processRetry(pw *pendingWrite, srcConn *connection, appendResp *storagepb.AppendRowsResponse, initialErr error) {
+func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.AppendRowsResponse, initialErr error) {
 	err := initialErr
 	for {
 		pause, shouldRetry := ms.statelessRetryer().Retry(err, pw.attemptCount)
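
For the shape of `appendWithRetry` itself: it loops, attempting to send the pending write over the long-lived stream, and backs off and retries on transient failures. A hedged, self-contained sketch of that loop under invented error types (the real method also coordinates connection state, locking, and per-request contexts):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTransient marks a failure worth re-sending in this sketch.
var errTransient = errors.New("transient send failure")

// sendOnce is a stand-in for writing one append request to the stream;
// here it succeeds on the third attempt.
func sendOnce(attempt int) error {
	if attempt < 2 {
		return errTransient
	}
	return nil
}

// appendWithRetry mirrors the shape of the real loop: try to send, and on
// a retryable error pause and loop again; after this commit the real method
// takes only the pending write (reduced here to an attempt counter).
func appendWithRetry() error {
	for attempt := 0; ; attempt++ {
		err := sendOnce(attempt)
		if err == nil {
			return nil
		}
		if !errors.Is(err, errTransient) {
			return err // non-retryable: surface to the caller
		}
		time.Sleep(10 * time.Millisecond) // back off before re-sending
	}
}

func main() {
	fmt.Println("append result:", appendWithRetry())
}
```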
36 changes: 14 additions & 22 deletions bigquery/storage/managedwriter/retry.go
@@ -33,21 +33,20 @@ var (
 
 // This retry predicate is used for higher level retries, enqueing appends onto to a bidi
 // channel and evaluating whether an append should be retried (re-enqueued).
-func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
+func retryPredicate(err error) bool {
 	if err == nil {
-		return
+		return false
 	}
 
 	s, ok := status.FromError(err)
 	// non-status based error conditions.
 	if !ok {
 		// EOF can happen in the case of connection close.
 		if errors.Is(err, io.EOF) {
-			shouldRetry = true
-			return
+			return true
 		}
 		// All other non-status errors are treated as non-retryable (including context errors).
-		return
+		return false
 	}
 	switch s.Code() {
 	case codes.Aborted,
@@ -56,17 +55,15 @@ func retryPredicate(err error) (shouldRetry, aggressiveBackoff bool) {
 		codes.FailedPrecondition,
 		codes.Internal,
 		codes.Unavailable:
-		shouldRetry = true
-		return
+		return true
 	case codes.ResourceExhausted:
 		if strings.HasPrefix(s.Message(), "Exceeds 'AppendRows throughput' quota") {
 			// Note: internal b/246031522 opened to give this a structured error
 			// and avoid string parsing. Should be a QuotaFailure or similar.
-			shouldRetry = true
-			return
+			return true
 		}
 	}
-	return
+	return false
 }
 
 // unaryRetryer is for retrying a unary-style operation, like (re)-opening the bidi connection.
@@ -75,7 +72,7 @@ type unaryRetryer struct {
 }
 
 func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) {
-	shouldRetry, _ := retryPredicate(err)
+	shouldRetry := retryPredicate(err)
 	return ur.bo.Pause(), shouldRetry
 }
 
@@ -86,10 +83,9 @@ type statelessRetryer struct {
 	mu sync.Mutex // guards r
 	r  *rand.Rand
 
-	minBackoff       time.Duration
-	jitter           time.Duration
-	aggressiveFactor int
-	maxAttempts      int
+	minBackoff  time.Duration
+	jitter      time.Duration
+	maxAttempts int
 }
 
 func newStatelessRetryer() *statelessRetryer {
@@ -101,27 +97,23 @@ func newStatelessRetryer() *statelessRetryer {
 	}
 }
 
-func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
+func (sr *statelessRetryer) pause() time.Duration {
 	jitter := sr.jitter.Nanoseconds()
 	if jitter > 0 {
 		sr.mu.Lock()
 		jitter = sr.r.Int63n(jitter)
 		sr.mu.Unlock()
 	}
 	pause := sr.minBackoff.Nanoseconds() + jitter
-	if aggressiveBackoff {
-		pause = pause * int64(sr.aggressiveFactor)
-	}
 	return time.Duration(pause)
 }
 
 func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, bool) {
 	if attemptCount >= sr.maxAttempts {
 		return 0, false
 	}
-	shouldRetry, aggressive := retryPredicate(err)
-	if shouldRetry {
-		return sr.pause(aggressive), true
+	if retryPredicate(err) {
+		return sr.pause(), true
 	}
 	return 0, false
 }
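
The surviving `pause` logic above computes a fixed minimum backoff plus uniform random jitter; this commit deletes the aggressive-backoff multiplier that previously scaled it. A standalone sketch of the remaining calculation, with invented constants:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// pause returns minBackoff plus a uniformly random jitter in [0, jitter),
// matching the shape of statelessRetryer.pause after this commit.
func pause(minBackoff, jitter time.Duration, r *rand.Rand) time.Duration {
	j := jitter.Nanoseconds()
	if j > 0 {
		j = r.Int63n(j) // random jitter avoids synchronized retry storms
	}
	return time.Duration(minBackoff.Nanoseconds() + j)
}

func main() {
	r := rand.New(rand.NewSource(1)) // seeded for a reproducible demo
	for i := 0; i < 3; i++ {
		// Each pause lands in [50ms, 150ms) under these example settings.
		fmt.Println(pause(50*time.Millisecond, 100*time.Millisecond, r))
	}
}
```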
