Skip to content

Commit

Permalink
Add recover for setRateLimiters and setCacheOptions
Browse files Browse the repository at this point in the history
 Define StartupPanicMessage and UnrecognizedRemotePluginMessage to pass panic messages back to plugin manager
  • Loading branch information
kaidaguerre committed Aug 7, 2023
1 parent 501fbeb commit ade4e7f
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
5 changes: 2 additions & 3 deletions plugin/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ type GetConfig struct {

// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate
// Deprecated: use RateLimit.MaxConcurrency
MaxConcurrency int
MaxConcurrency int
}

// initialise the GetConfig
Expand All @@ -100,7 +99,7 @@ func (c *GetConfig) initialise(table *Table) {
c.IgnoreConfig = &IgnoreConfig{}
}

// create empty scope values if needed
// create empty tags if needed
if c.Tags == nil {
c.Tags = map[string]string{}
}
Expand Down
19 changes: 17 additions & 2 deletions plugin/plugin_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,15 +316,30 @@ func (p *Plugin) establishMessageStream(stream proto.WrapperPlugin_EstablishMess
return nil
}

func (p *Plugin) setCacheOptions(request *proto.SetCacheOptionsRequest) error {
func (p *Plugin) setCacheOptions(request *proto.SetCacheOptionsRequest) (err error) {
defer func() {
if r := recover(); r != nil {
msg := fmt.Sprintf("setCacheOptions experienced unhandled exception: %s", helpers.ToError(r).Error())
log.Println("[WARN]", msg)
err = fmt.Errorf(msg)
}
}()

return p.ensureCache(p.buildConnectionSchemaMap(), query_cache.NewQueryCacheOptions(request))
}

// clear current rate limiter definitions and instances and repopulate resolvedRateLimiterDefs using the
// plugin defined rate limiters and any config defined rate limiters
func (p *Plugin) setRateLimiters(request *proto.SetRateLimitersRequest) error {
func (p *Plugin) setRateLimiters(request *proto.SetRateLimitersRequest) (err error) {
log.Printf("[INFO] setRateLimiters")

defer func() {
if r := recover(); r != nil {
msg := fmt.Sprintf("setRateLimiters experienced unhandled exception: %s", helpers.ToError(r).Error())
log.Println("[WARN]", msg)
err = fmt.Errorf(msg)
}
}()
var errors []error
// clear all current rate limiters
p.rateLimiterDefsMut.Lock()
Expand Down
2 changes: 1 addition & 1 deletion plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ func (d *QueryData) buildRowsAsync(ctx context.Context, rowChan chan *proto.Row,
//log.Printf("[INFO] buildRowsAsync acquire semaphore (%s)", d.connectionCallId)
if err := rowSemaphore.Acquire(ctx, 1); err != nil {
log.Printf("[INFO] SEMAPHORE ERROR %s", err)
// TODO does this quit??
// TODO KAI does this quit??
d.errorChan <- err
return
}
Expand Down

0 comments on commit ade4e7f

Please sign in to comment.