Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add redisClusterAsyncConnect2() #171

Merged
merged 8 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 54 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,6 @@ if (cc != NULL && cc->err) {
#### Events per cluster context

There is a hook to get notified when certain events occur.
Currently, there is only one such event.
It is that the slotmap has been updated.

```c
int redisClusterSetEventCallback(redisClusterContext *cc,
Expand All @@ -224,7 +222,9 @@ The callback is called with `event` set to one of the following values:

* `HIRCLUSTER_EVENT_SLOTMAP_UPDATED` when the slot mapping has been updated;
* `HIRCLUSTER_EVENT_READY` when the slot mapping has been fetched for the first
time and the client is ready to accept commands;
time and the client is ready to accept commands, useful when initiating the
client with `redisClusterAsyncConnect2()` where a client is not immediately
ready after a successful call;
* `HIRCLUSTER_EVENT_FREE_CONTEXT` when the cluster context is being freed, so
that the user can free the event privdata.

Expand Down Expand Up @@ -354,20 +354,64 @@ for hiredis-cluster as well.

### Connecting

The function `redisClusterAsyncConnect` can be used to
establish a set of non-blocking connections to a Redis cluster.
It returns a pointer to the newly created `redisClusterAsyncContext` struct.
The `err` field should be checked after creation
to see if there were errors creating the asynchronous cluster context.
There are two alternative ways to initiate a cluster client which also determines
how the client behaves during the initial connect.

The first alternative is to use the function `redisClusterAsyncConnect`, which initially
connects to the cluster in a blocking fashion and waits for the slotmap before returning.
Any command sent by the user thereafter will create a new non-blocking connection,
unless a non-blocking connection already exists to the destination.
The function returns a pointer to a newly created `redisClusterAsyncContext` struct and
its `err` field should be checked to make sure the initial slotmap update was successful.

```c
// Insufficient error handling for brevity.
redisClusterAsyncContext *acc = redisClusterAsyncConnect("127.0.0.1:6379", HIRCLUSTER_FLAG_NULL);
if (acc->err) {
printf("Error: %s\n", acc->errstr);
// handle error
printf("error: %s\n", acc->errstr);
exit(1);
}

// Attach an event engine. In this example we use libevent.
struct event_base *base = event_base_new();
redisClusterLibeventAttach(acc, base);
```

The second alternative is to use `redisClusterAsyncContextInit` and `redisClusterAsyncConnect2`
which avoids the initial blocking connect. This connection alternative requires an attached
event engine when `redisClusterAsyncConnect2` is called, but the connect and the initial
slotmap update is done in a non-blocking fashion.

This means that commands sent directly after `redisClusterAsyncConnect2` may fail
because the initial slotmap has not yet been retrieved and the client doesn't know which
cluster node to send the command to. You may use the [eventCallback](#events-per-cluster-context)
to be notified when the slotmap is updated and the client is ready to accept commands.
An crude example of using the eventCallback can be found in [this testcase](tests/ct_async.c).

```c
// Insufficient error handling for brevity.
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
bjosv marked this conversation as resolved.
Show resolved Hide resolved

// Add a cluster node address for the initial connect.
redisClusterSetOptionAddNodes(acc->cc, "127.0.0.1:6379");

// Attach an event engine. In this example we use libevent.
struct event_base *base = event_base_new();
redisClusterLibeventAttach(acc, base);

if (redisClusterAsyncConnect2(acc) != REDIS_OK) {
printf("error: %s\n", acc->errstr);
exit(1);
}
```

#### Events per cluster context

Use [`redisClusterSetEventCallback`](#events-per-cluster-context) with `acc->cc`
as the context to get notified when certain events occur.

#### Events per connection

zuiderkwast marked this conversation as resolved.
Show resolved Hide resolved
Because the connections that will be created are non-blocking,
the kernel is not able to instantly return if the specified
host and port is able to accept a connection.
Expand Down
8 changes: 8 additions & 0 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -3749,6 +3749,14 @@ redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs,
return acc;
}

int redisClusterAsyncConnect2(redisClusterAsyncContext *acc) {
/* An adapter to an async event library is required. */
if (acc->adapter == NULL) {
return REDIS_ERR;
}
return updateSlotMapAsync(acc, NULL /*any node*/);
}

int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc,
redisConnectCallback *fn) {
if (acc->onConnect == NULL) {
Expand Down
3 changes: 3 additions & 0 deletions hircluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,11 @@ int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc,
int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc,
redisDisconnectCallback *fn);

/* Connect and update slotmap, will block until complete. */
redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs,
int flags);
/* Connect and update slotmap asynchronously using configured event engine. */
int redisClusterAsyncConnect2(redisClusterAsyncContext *acc);
void redisClusterAsyncDisconnect(redisClusterAsyncContext *acc);

/* Commands */
Expand Down
1 change: 1 addition & 0 deletions hiredis_cluster.def
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
redisClusterAsyncCommandArgvToNode
redisClusterAsyncCommandToNode
redisClusterAsyncConnect
redisClusterAsyncConnect2
redisClusterAsyncDisconnect
redisClusterAsyncFormattedCommand
redisClusterAsyncFormattedCommandToNode
Expand Down
64 changes: 43 additions & 21 deletions tests/ct_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,35 +32,57 @@ void disconnectCallback(const redisAsyncContext *ac, int status) {
printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
(void)cc;
redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata;

/* We send our commands when the client is ready to accept commands. */
if (event == HIRCLUSTER_EVENT_READY) {
int status;
status = redisClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key12345 value");
ASSERT_MSG(status == REDIS_OK, acc->errstr);

/* This command will trigger a disconnect in its reply callback. */
status = redisClusterAsyncCommand(acc, getCallback, (char *)"ID",
"GET key12345");
ASSERT_MSG(status == REDIS_OK, acc->errstr);

status = redisClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key23456 value2");
ASSERT_MSG(status == REDIS_OK, acc->errstr);

status = redisClusterAsyncCommand(acc, getCallback, (char *)"ID",
"GET key23456");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
}
}

int main(void) {
redisClusterAsyncContext *acc =
redisClusterAsyncConnect(CLUSTER_NODE, HIRCLUSTER_FLAG_NULL);

redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
ASSERT_MSG(acc->err == 0, acc->errstr);

int status;
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
status = redisClusterAsyncSetConnectCallback(acc, connectCallback);
assert(status == REDIS_OK);
status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == REDIS_OK);
status = redisClusterSetEventCallback(acc->cc, eventCallback, acc);
assert(status == REDIS_OK);
status = redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
assert(status == REDIS_OK);

redisClusterAsyncSetConnectCallback(acc, connectCallback);
redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

status = redisClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key12345 value");
ASSERT_MSG(status == REDIS_OK, acc->errstr);

status = redisClusterAsyncCommand(acc, getCallback, (char *)"ID",
"GET key12345");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
/* Expect error when connecting without an attached event library. */
status = redisClusterAsyncConnect2(acc);
assert(status == REDIS_ERR);

status = redisClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key23456 value2");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);

status = redisClusterAsyncCommand(acc, getCallback, (char *)"ID",
"GET key23456");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
status = redisClusterAsyncConnect2(acc);
assert(status == REDIS_OK);

event_base_dispatch(base);

Expand Down