Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement ScyllaDB Online Store #138

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

msistla96
Copy link
Collaborator

What this PR does / why we need it:

(WIP)
This PR is to implement the ScyllaDB online store for customers seeking a low cost Online Store when compared to Redis.
More details in https://confluence.expedia.biz/display/~msistla/Scylla+DB+Online+Store+Implementation
This PR contains the following changes:

  1. Changes to RepoConfig to support ScyllaDB as an online store.
  2. Changes to setup.py, go.mod and go.sum, Dockerfile, Makefile to add the ScyllaDB specific driver dependency for Python and Go, in order to take advantage of the shard aware functionality it offers. See https://github.com/scylladb/gocql and https://github.com/scylladb/python-driver for more details.
  3. Changes to CassandraOnlineStore in Python.
  4. Addition of CassandraOnlineStore as a new Online Store in Go.

TODO

  1. Add tests
  2. Metrics?

Which issue(s) this PR fixes:

N/A

Fixes

N/A

@msistla96 msistla96 changed the title feat: Implement scylla db online store feat: Implement ScyllaDB Online Store Sep 13, 2024
Makefile Outdated
@@ -486,7 +486,7 @@ build-java-docker-dev:
build-go-docker-dev:
docker buildx build --build-arg VERSION=dev \
-t feastdev/feature-server-go:dev \
-f go/infra/docker/feature-server/Dockerfile --load .
-f SCYLLADB_SHARD_AWARE_ENABLED=$(SCYLLADB_SHARD_AWARE_ENABLED) go/infra/docker/feature-server/Dockerfile --load .
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? Doesn't the drivers auto detect this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted, we should only be using the scylladb gocql fork

go.mod Outdated
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

require github.com/aws/aws-sdk-go v1.34.28
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I guess this was part of Meenakshi's testing? Deleted it anyway along with the code that used it.

go.mod Outdated

require github.com/aws/aws-sdk-go v1.34.28

require github.com/gocql/gocql v1.6.0
Copy link
Collaborator

@EXPEbdodla EXPEbdodla Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cassandra 1.6.0. Instructions for scylladb gocql: https://github.com/scylladb/gocql?tab=readme-ov-file#2-installation

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Run go mod tidy after.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -1,5 +1,7 @@
FROM golang:1.22.5

ARG SCYLLADB_SHARD_AWARE_ENABLED=False
ENV SCYLLADB_SHARD_AWARE_ENABLED=$SCYLLADB_SHARD_AWARE_ENABLED
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the use of this ENV?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was for optionally selecting the ScyllaDB forked driver, but we're just going to use it by default.

@@ -23,6 +25,10 @@ RUN find ./protos -name "*.proto" \
# Build the Go application
RUN go build -o feast ./go/main.go

RUN if ["$SCYLLADB_SHARD_AWARE_ENABLED"== True]; then \
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move it to go.mod

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 48 to 139
// Parse host_name and Ips
cassandraHosts, ok := onlineStoreConfig["hosts"]
if !ok {
cassandraHosts = []interface{}{"127.0.0.1"}
log.Warn().Msg("host not provided: Using 127.0.0.1 instead")
}

var rawCassandraHosts []interface{}
if rawCassandraHosts, ok = cassandraHosts.([]interface{}); !ok {
return nil, fmt.Errorf("didn't pass a list of hosts in the 'hosts' field")
}

var cassandraHostsStr = make([]string, len(rawCassandraHosts))
for i, rawHost := range rawCassandraHosts {
hostStr, ok := rawHost.(string)
if !ok {
return nil, fmt.Errorf("failed to convert a host to a string: %+v", rawHost)
}
cassandraHostsStr[i] = hostStr
}

username, ok := onlineStoreConfig["username"]
if !ok {
username = "cassandra"
log.Warn().Msg("username not defined: Using default username instead")
}

password, ok := onlineStoreConfig["password"]
if !ok {
password = "cassandra"
log.Warn().Msg("password not defined: Using default password instead")
}

var usernameStr string
if usernameStr, ok = username.(string); !ok {
return nil, fmt.Errorf("failed to convert username to string: %+v", usernameStr)
}

var passwordStr string
if passwordStr, ok = password.(string); !ok {
return nil, fmt.Errorf("failed to convert password to string: %+v", passwordStr)
}

keyspace, ok := onlineStoreConfig["keyspace"]
if !ok {
keyspace = "scylladb"
log.Warn().Msg("Keyspace not defined: Using 'scylladb' as keyspace instead")
}
store.keyspace = keyspace.(string)

var keyspaceStr string
if keyspaceStr, ok = keyspace.(string); !ok {
return nil, fmt.Errorf("failed to convert keyspace to string: %+v", keyspaceStr)
}

protocolVersion, ok := onlineStoreConfig["protocol_version"]
if !ok {
protocolVersion = 4.0
log.Warn().Msg("protocol_version not specified: Using 4 instead")
}
protocolVersionInt := int(protocolVersion.(float64))

redisTraceServiceName := os.Getenv("DD_SERVICE") + "-cassandra"
if redisTraceServiceName == "" {
redisTraceServiceName = "cassandra.client" // default service name if DD_SERVICE is not set
}
store.clusterConfigs = gocqltrace.NewCluster(cassandraHostsStr, gocqltrace.WithServiceName(redisTraceServiceName))
// TODO: Figure out if we need to offer users the ability to tune the timeouts
//store.clusterConfigs.ConnectTimeout = 1
//store.clusterConfigs.Timeout = 1
store.clusterConfigs.ProtoVersion = protocolVersionInt
//store.clusterConfigs.Consistency = gocql.Quorum
store.clusterConfigs.Keyspace = keyspaceStr
loadBalancingPolicy, ok := onlineStoreConfig["load_balancing"]
if !ok {
loadBalancingPolicy = gocql.RoundRobinHostPolicy()
log.Warn().Msg("No load balancing policy selected; setting Round Robin Host Policy")
}
loadBalancingPolicyStr, _ := loadBalancingPolicy.(string)
if loadBalancingPolicyStr == "DCAwareRoundRobinPolicy" {
store.clusterConfigs.PoolConfig.HostSelectionPolicy = gocql.RoundRobinHostPolicy()
} else if loadBalancingPolicyStr == "TokenAwarePolicy(DCAwareRoundRobinPolicy)" {
// Configure fallback policy if unable to reach the shard
fallback := gocql.RoundRobinHostPolicy()
// If using ScyllaDB and setting this policy, this makes the driver shard aware to improve performance
store.clusterConfigs.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(fallback)
if config.OnlineStore["type"] == "scylladb" {
store.clusterConfigs.Port = 19042
} else {
store.clusterConfigs.Port = 9042
}
}
Copy link
Collaborator

@EXPEbdodla EXPEbdodla Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use type assertions by defining a struct type for online store config. If it doesn't work, move to a different function to do all the parsing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extracted to different function.

store.clusterConfigs.ProtoVersion = protocolVersionInt
//store.clusterConfigs.Consistency = gocql.Quorum
store.clusterConfigs.Keyspace = keyspaceStr
loadBalancingPolicy, ok := onlineStoreConfig["load_balancing"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

load_balancing object has load_balancing_policy and local_dc fields. Seems they are not parsed correctly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's right, addressed this.

if redisTraceServiceName == "" {
redisTraceServiceName = "cassandra.client" // default service name if DD_SERVICE is not set
}
store.clusterConfigs = gocqltrace.NewCluster(cassandraHostsStr, gocqltrace.WithServiceName(redisTraceServiceName))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

return fmt.Sprintf(
`SELECT "entity_key", "feature_name", "event_ts", "value" FROM %s WHERE "entity_key" IN (%s) AND "feature_name" IN (%s)`,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entity_key IN may not be efficient when working with partitions across different nodes. We may not be utilizing the shard aware functionality efficiently.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can start testing with this first and try running concurrent queries as we start testing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment on lines 175 to 188
func (c *CassandraOnlineStore) buildCassandraEntityKeys(entityKeys []*types.EntityKey) ([]interface{}, map[string]int, error) {
cassandraKeys := make([]interface{}, len(entityKeys))
cassandraKeyToEntityIndex := make(map[string]int)
for i := 0; i < len(entityKeys); i++ {
var key, err = utils.SerializeEntityKey(entityKeys[i], c.config.EntityKeySerializationVersion)
if err != nil {
return nil, nil, err
}
// encoding to hex
encodedKey := hex.EncodeToString(*key)
cassandraKeys[i] = encodedKey
cassandraKeyToEntityIndex[encodedKey] = i
}
return cassandraKeys, cassandraKeyToEntityIndex, nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it same as buildRedisEntityKeys? can we move to key_utils if its the same?

Copy link
Member

@acevedosharp acevedosharp Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost, but I chose to use the same hex key in the array and map here for consistency, so they're slightly different.

Comment on lines 42 to 46
store, err := NewRedisOnlineStore("test", rc, config)
assert.Nil(t, err)
var opts = store.client.Options()
assert.Equal(t, opts.Addr, "redis://localhost:6379")
assert.Equal(t, opts.Password, "secret")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check this.

@@ -0,0 +1,117 @@
package utils
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this if not necessary.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

setup.py Outdated Show resolved Hide resolved
Comment on lines 110 to 113
redisTraceServiceName := os.Getenv("DD_SERVICE") + "-cassandra"
if redisTraceServiceName == "" {
redisTraceServiceName = "cassandra.client" // default service name if DD_SERVICE is not set
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

variable names.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Session object that holds information about the connection to the cluster
session *gocqltrace.Session

// keyspace of the table. Defaulted to using the project name
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if not specified, set to scylladb. It's set to feast_keyspace in CassandraOnlineStoreConfig in python. Bring the consistency to the keyspace name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

tableName := c.getFqTableName(featureViewName)
cqlStatement := c.getCQLStatement(tableName, featureNames, len(serializedEntityKeys))
// Bundle the entity keys in one statement (gocql handles this as concurrent queries)
scanner := c.session.Query(cqlStatement, serializedEntityKeys...).Iter().Scanner()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will easily be impacted the default page size set by client. https://github.com/scylladb/gocql/blob/30a838e39b1f0e4d559989b9f30404c5399b3d5e/cluster.go#L142-L144C2
We have a clients requesting 200-300 features and 100 keys (if they do batching at orchestrator. If not we get more keys.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if we partition queries, at one point the results of all those smaller queries will be in memory. I think such clients will have to request larger pods when deploying their feature servers.

redisTraceServiceName = "cassandra.client" // default service name if DD_SERVICE is not set
}
store.clusterConfigs = gocqltrace.NewCluster(cassandraHostsStr, gocqltrace.WithServiceName(redisTraceServiceName))
// TODO: Figure out if we need to offer users the ability to tune the timeouts
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering the NumConns set to 2 as default, I would like to add a config for that. Along with that add connection and request timeouts to play when testing

https://github.com/scylladb/gocql/blob/30a838e39b1f0e4d559989b9f30404c5399b3d5e/cluster.go#L40

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add RetryPolicy

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added config for numConns, request and connection timeouts.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the README here,

When working with a Scylla cluster the driver always opens one connection per shard, so NumConns is ignored.

Not sure if setting NumConns will work, maybe this also needs to be tested.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. we can get rid of that.

quotedFeatureNames[i] = fmt.Sprintf(`'%s'`, featureName)
}

return fmt.Sprintf(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IN has limit on 100 keys. We should see how we can handle keys with more than 100 features.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the way to go once we reach that point is to partition requests and execute them in parallel, adds complexity but would work.

var eventTs time.Time
var valueStr []byte
var deserializedValue types.Value
for scanner.Next() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can switch to iterator SliceMap(). It's the same code. May reduce the number of lines.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks nice, but I think it's more geared towards dynamic columns as it creates a map for every row, but in our case our table schema is fixed, so this surely has to come at a cost.
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants