Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove support for Memoized functions to be assigned as column hydrate functions (instead require a wrapper hydrate function) #756

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions plugin/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,32 +76,20 @@ type Column struct {
// explicitly specify the function which populates this data
// - this is only needed if any of the default hydrate functions will NOT return this column
Hydrate HydrateFunc
// if the hydrate function is memoized, populate this property by using the plugin.NamedHydrateFunc function
// this ensures the original plugin name is retained after memoizing the function (which wraps the HydraeFunc in
// an anonymous function to handle cache logic))
// NOTE: only 1 of HydrateFunc and NamedHydrateFunc should be populated
NamedHydrate NamedHydrateFunc
// the default column value
Default interface{}
// a list of transforms to generate the column value
Transform *transform.ColumnTransforms

namedHydrate namedHydrateFunc
}

func (c *Column) initialise() {
if c.Hydrate == nil && c.NamedHydrate.empty() {
if c.Hydrate == nil {
return
}
// populate the named hydrate funcs
if c.NamedHydrate.empty() {
// create a named hydrate func, assuming this function is not memoized
c.NamedHydrate = newNamedHydrateFunc(c.Hydrate)
} else {
// a named hydrate was explicitly specified - probably meaning the hydrate is memoized
// call initialize to populate IsMemoized
c.NamedHydrate.initialize()
// be sure to also set the Hydrate property to the underlying func
c.Hydrate = c.NamedHydrate.Func
}
// create a named hydrate func
c.namedHydrate = newNamedHydrateFunc(c.Hydrate)

}

Expand Down Expand Up @@ -229,6 +217,15 @@ func (c *Column) ToColumnValue(val any) (*proto.Column, error) {
return columnValue, nil
}

// validate the column - ensure the hydrate function is not memoized
func (c *Column) validate(t *Table) []string {
log.Printf("[TRACE] validate column %s", c.Name)
if c.Hydrate != nil && isMemoized(c.Hydrate) {
return []string{fmt.Sprintf("table '%s' column '%s' is using a memoized hydrate function\n This is not supported. To use a memoized hydrate function for a column hydrate call, wrap the memoized function inside another hydrate function", t.Name, c.Name)}
}
return nil
}

// QueryColumn is struct storing column name and resolved hydrate name (including List/Get call)
// this is used in the query data when the hydrate function has been resolved
type QueryColumn struct {
Expand Down
4 changes: 2 additions & 2 deletions plugin/get_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type GetConfig struct {
// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate
MaxConcurrency int
namedHydrate NamedHydrateFunc
namedHydrate namedHydrateFunc
}

// initialise the GetConfig
Expand Down Expand Up @@ -127,7 +127,7 @@ func (c *GetConfig) initialise(table *Table) {
}
log.Printf("[TRACE] GetConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String())

// create a named hydrate func, assuming this function is not memoized
// create a named hydrate func
c.namedHydrate = newNamedHydrateFunc(c.Hydrate)
}

Expand Down
31 changes: 25 additions & 6 deletions plugin/hydrate_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,22 @@ import (
"context"
"fmt"
"log"
"reflect"
"sync"
"time"

"github.com/turbot/go-kit/helpers"
)

// pointer to (all) memoized functions
// lazily populated, use for isMemoized
var memoizedFuncPtr uintptr

// map of currently executing memoized hydrate funcs

var memoizedHydrateFunctionsPending = make(map[string]*sync.WaitGroup)
var memoizedHydrateLock sync.RWMutex

/*
HydrateFunc is a function that gathers data to build table rows.
Typically this would make an API call and return the raw API output.
Expand Down Expand Up @@ -46,14 +56,13 @@ Memoize ensures the [HydrateFunc] results are saved in the [connection.Connectio
Use it to reduce the number of API calls if the HydrateFunc is used by multiple tables.

NOTE: this should only be used to memoize a function which will be manually invoked and requires caching
It should NOT be used to memoize a hydrate function being passed toi a table definition.
Instead, use [MemoizeHydrate]
It should NOT be used to memoize a hydrate function being passed to a table definition.
*/
func (f HydrateFunc) Memoize(opts ...MemoizeOption) HydrateFunc {
// TODO determine if this is already memoized
// if so, return the existing memoized function

log.Printf("[INFO] Memoize %p %s", f, helpers.GetFunctionName(f))
if isMemoized(f) {
log.Printf("[WARN] Memoize %s - already memoized", helpers.GetFunctionName(f))
}
log.Printf("[INFO] Memoize %s", helpers.GetFunctionName(f))

config := newMemoizeConfiguration(f)
for _, o := range opts {
Expand Down Expand Up @@ -123,6 +132,9 @@ func (f HydrateFunc) Memoize(opts ...MemoizeOption) HydrateFunc {

log.Printf("[INFO] Memoize %p %s", f, helpers.GetFunctionName(f))

if memoizedFuncPtr == 0 {
memoizedFuncPtr = reflect.ValueOf(memoizedFunc).Pointer()
}
return memoizedFunc
}

Expand Down Expand Up @@ -176,3 +188,10 @@ func callAndCacheHydrate(ctx context.Context, d *QueryData, h *HydrateData, hydr
// return the hydrate data
return hydrateData, nil
}

// all memoized functions have the same pointer
// - to determine if a function is memoized, compare the pointer to a memoized function
func isMemoized(hydrateFunc HydrateFunc) bool {
res := reflect.ValueOf(hydrateFunc).Pointer() == memoizedFuncPtr
return res
}
19 changes: 5 additions & 14 deletions plugin/hydrate_call.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

// hydrateCall struct encapsulates a hydrate call, its config and dependencies
type hydrateCall struct {
NamedHydrateFunc
namedHydrateFunc
// the dependencies expressed using function name
Depends []NamedHydrateFunc
Depends []namedHydrateFunc
Config *HydrateConfig

queryData *QueryData
Expand All @@ -27,7 +27,7 @@ func newHydrateCall(config *HydrateConfig, d *QueryData) (*hydrateCall, error) {
// default to empty limiter
rateLimiter: rate_limiter.EmptyMultiLimiter(),
}
res.NamedHydrateFunc = config.namedHydrate
res.namedHydrateFunc = config.namedHydrate

for _, f := range config.Depends {
res.Depends = append(res.Depends, newNamedHydrateFunc(f))
Expand All @@ -38,7 +38,7 @@ func newHydrateCall(config *HydrateConfig, d *QueryData) (*hydrateCall, error) {

func (h *hydrateCall) shallowCopy() *hydrateCall {
return &hydrateCall{
NamedHydrateFunc: NamedHydrateFunc{
namedHydrateFunc: namedHydrateFunc{
Func: h.Func,
Name: h.Name,
},
Expand All @@ -53,11 +53,6 @@ func (h *hydrateCall) shallowCopy() *hydrateCall {
func (h *hydrateCall) initialiseRateLimiter() error {
log.Printf("[INFO] hydrateCall %s initialiseRateLimiter (%s)", h.Name, h.queryData.connectionCallId)

// if this call is memoized, do not assign a rate limiter
if h.IsMemoized {
log.Printf("[INFO] hydrateCall %s is memoized - assign an empty rate limiter (%s)", h.Name, h.queryData.connectionCallId)
return nil
}
// ask plugin to build a rate limiter for us
p := h.queryData.plugin

Expand Down Expand Up @@ -99,10 +94,6 @@ func (h *hydrateCall) canStart(rowData *rowData) bool {
// Start starts a hydrate call
func (h *hydrateCall) start(ctx context.Context, r *rowData, d *QueryData) time.Duration {
var rateLimitDelay time.Duration
// if we are memoized there is no need to rate limit
if !h.IsMemoized {
rateLimitDelay = h.rateLimit(ctx, d)
}

// tell the rowData to wait for this call to complete
r.wg.Add(1)
Expand All @@ -111,7 +102,7 @@ func (h *hydrateCall) start(ctx context.Context, r *rowData, d *QueryData) time.

// call callHydrate async, ignoring return values
go func() {
r.callHydrate(ctx, d, h.NamedHydrateFunc, h.Config)
r.callHydrate(ctx, d, h.namedHydrateFunc, h.Config)
h.onFinished()
}()
// retrieve the concurrencyDelay for the call
Expand Down
10 changes: 4 additions & 6 deletions plugin/hydrate_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type HydrateConfig struct {
// Deprecated: use IgnoreConfig
ShouldIgnoreError ErrorPredicate

namedHydrate NamedHydrateFunc
namedHydrate namedHydrateFunc
}

func (c *HydrateConfig) String() string {
Expand All @@ -137,10 +137,8 @@ ScopeValues: %s`,
}

func (c *HydrateConfig) initialise(table *Table) {
// create a named hydrate func if one is not already set
if c.namedHydrate.Func == nil {
c.namedHydrate = newNamedHydrateFunc(c.Func)
}
// create a named hydrate func
c.namedHydrate = newNamedHydrateFunc(c.Func)

log.Printf("[TRACE] HydrateConfig.initialise func %s, table %s", c.namedHydrate.Name, table.Name)

Expand Down Expand Up @@ -175,7 +173,7 @@ func (c *HydrateConfig) initialise(table *Table) {
log.Printf("[TRACE] HydrateConfig.initialise complete: RetryConfig: %s, IgnoreConfig: %s", c.RetryConfig.String(), c.IgnoreConfig.String())
}

func (c *HydrateConfig) Validate(table *Table) []string {
func (c *HydrateConfig) validate(table *Table) []string {
var validationErrors []string
if c.Func == nil {
validationErrors = append(validationErrors, fmt.Sprintf("table '%s' HydrateConfig does not specify a hydrate function", table.Name))
Expand Down
4 changes: 2 additions & 2 deletions plugin/hydrate_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, h
return retryNamedHydrate(ctx, d, hydrateData, newNamedHydrateFunc(hydrate), retryConfig)
}

func retryNamedHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrate NamedHydrateFunc, retryConfig *RetryConfig) (hydrateResult interface{}, err error) {
func retryNamedHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrate namedHydrateFunc, retryConfig *RetryConfig) (hydrateResult interface{}, err error) {
ctx, span := telemetry.StartSpan(ctx, d.Table.Plugin.Name, "RetryHydrate (%s)", d.Table.Name)
span.SetAttributes(
attribute.String("hydrate-func", hydrate.Name),
Expand Down Expand Up @@ -108,7 +108,7 @@ func getBackoff(retryConfig *RetryConfig) (retry.Backoff, error) {
}

// WrapHydrate is a higher order function which returns a [HydrateFunc] that handles Ignorable errors.
func WrapHydrate(hydrate NamedHydrateFunc, ignoreConfig *IgnoreConfig) NamedHydrateFunc {
func WrapHydrate(hydrate namedHydrateFunc, ignoreConfig *IgnoreConfig) namedHydrateFunc {
res := hydrate.clone()

res.Func = func(ctx context.Context, d *QueryData, h *HydrateData) (item interface{}, err error) {
Expand Down
4 changes: 2 additions & 2 deletions plugin/list_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type ListConfig struct {
// Deprecated: Use IgnoreConfig
ShouldIgnoreError ErrorPredicate

namedHydrate NamedHydrateFunc
namedParentHydrate NamedHydrateFunc
namedHydrate namedHydrateFunc
namedParentHydrate namedHydrateFunc
}

func (c *ListConfig) initialise(table *Table) {
Expand Down
42 changes: 0 additions & 42 deletions plugin/memoize.go

This file was deleted.

22 changes: 8 additions & 14 deletions plugin/named_hydrate_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,27 @@ package plugin

import "github.com/turbot/go-kit/helpers"

type NamedHydrateFunc struct {
Func HydrateFunc
Name string
IsMemoized bool
type namedHydrateFunc struct {
Func HydrateFunc
Name string
}

func newNamedHydrateFunc(f HydrateFunc) NamedHydrateFunc {
res := NamedHydrateFunc{
func newNamedHydrateFunc(f HydrateFunc) namedHydrateFunc {
res := namedHydrateFunc{
Func: f,
Name: helpers.GetFunctionName(f),
}

return res
}

func (h NamedHydrateFunc) clone() NamedHydrateFunc {
return NamedHydrateFunc{
func (h namedHydrateFunc) clone() namedHydrateFunc {
return namedHydrateFunc{
Func: h.Func,
Name: h.Name,
}
}

// determine whether we are memoized
func (h NamedHydrateFunc) initialize() {
h.IsMemoized = h.Name == helpers.GetFunctionName(h.Func)
}

func (h NamedHydrateFunc) empty() bool {
func (h namedHydrateFunc) empty() bool {
return h.Func == nil
}
14 changes: 7 additions & 7 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ type QueryData struct {

fetchMetadata *hydrateMetadata
parentHydrateMetadata *hydrateMetadata
listHydrate NamedHydrateFunc
childHydrate NamedHydrateFunc
listHydrate namedHydrateFunc
childHydrate namedHydrateFunc
}

func newQueryData(connectionCallId string, p *Plugin, queryContext *QueryContext, table *Table, connectionData *ConnectionData, executeData *proto.ExecuteConnectionData, outputChan chan *proto.ExecuteResponse) (*QueryData, error) {
Expand Down Expand Up @@ -424,11 +424,11 @@ func (d *QueryData) populateRequiredHydrateCalls() error {
hydrateName = fetchFunc.Name
} else {
// there is a hydrate call registered
hydrateName = column.NamedHydrate.Name
hydrateName = column.namedHydrate.Name

// if this column was requested in query, add the hydrate call to required calls
if helpers.StringSliceContains(colsUsed, column.Name) {
if err := requiredCallBuilder.Add(column.NamedHydrate, d.connectionCallId); err != nil {
if err := requiredCallBuilder.Add(column.namedHydrate, d.connectionCallId); err != nil {
return err
}
}
Expand Down Expand Up @@ -472,7 +472,7 @@ func (d *QueryData) setMatrixItem(matrixItem map[string]interface{}) {
log.Printf("[INFO] setMatrixItem %s", matrixItem)
for col, value := range matrixItem {
qualValue := proto.NewQualValue(value)

// replace any existing entry for both Quals and EqualsQuals
d.EqualsQuals[col] = qualValue
d.Quals[col] = &KeyColumnQuals{Name: col, Quals: []*quals.Qual{{Column: col, Operator: quals.QualOperatorEqual, Value: qualValue}}}
}
Expand Down Expand Up @@ -552,7 +552,7 @@ func (d *QueryData) verifyCallerIsListCall(callingFunction string) bool {
// if the calling function is NOT one of the other registered hydrate functions,
//it must be an anonymous function so let it go
for _, c := range d.Table.Columns {
if c.NamedHydrate.Name == callingFunction {
if c.namedHydrate.Name == callingFunction {
return false
}
}
Expand Down Expand Up @@ -916,7 +916,7 @@ func (d *QueryData) removeReservedColumns(row *proto.Row) {
}
}

func (d *QueryData) setListCalls(listCall, childHydrate NamedHydrateFunc) {
func (d *QueryData) setListCalls(listCall, childHydrate namedHydrateFunc) {
d.listHydrate = listCall
d.childHydrate = childHydrate
}
Loading
Loading