Skip to content

Commit

Permalink
Add redisClusterAsyncConnect2()
Browse files Browse the repository at this point in the history
redisClusterAsyncConnect2() uses non-blocking sockets to get the
initial slot map compared to the legacy function
redisClusterAsyncConnect() which uses the blocking API internally.
  • Loading branch information
bjosv committed Jun 22, 2023
1 parent f456b45 commit a58646b
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 34 deletions.
60 changes: 47 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,20 +210,17 @@ if (cc != NULL && cc->err) {
#### Events per cluster context
There is a hook to get notified when certain events occur.
Currently these events exits:
* The slotmap has been updated.
* The cluster context is freed.
```c
int redisClusterSetEventCallback(redisClusterContext *cc,
void(fn)(const redisClusterContext *cc, int event, void *privdata),
void *privdata);
```

The callback is called with `event` set to `HIRCLUSTER_EVENT_SLOTMAP_UPDATED`
when the slotmap has been updated, or set to `HIRCLUSTER_EVENT_FREEING`
when the cluster context is freeing all its data.
Currently these events exits:

* `HIRCLUSTER_EVENT_SLOTMAP_UPDATED`: The slotmap has been updated.
* `HIRCLUSTER_EVENT_FREEING`: The cluster context data is subsequently freed.

The `privdata` argument can be used to carry arbitrary data to the callback
and the freeing event will indicate when it's ok to free `privdata`.
Expand Down Expand Up @@ -349,20 +346,57 @@ for hiredis-cluster as well.

### Connecting

The function `redisClusterAsyncConnect` can be used to
establish a set of non-blocking connections to a Redis cluster.
It returns a pointer to the newly created `redisClusterAsyncContext` struct.
The `err` field should be checked after creation
to see if there were errors creating the asynchronous cluster context.
There are two alternative ways to initiate a cluster client which also determines
how the client behaves during the initial connect.

The first alternative is to use the function `redisClusterAsyncConnect`, which initially
connects to the cluster in a blocking fashion and waits for the slot map before returning.
Any command sent by the user thereafter will create a new non-blocking connection,
unless a non-blocking connection already exists to the destination.
The function returns a pointer to a newly created `redisClusterAsyncContext` struct.
The `err` field should be checked after creation to make sure the slot map could be
fetched correctly.

```c
redisClusterAsyncContext *acc = redisClusterAsyncConnect("127.0.0.1:6379", HIRCLUSTER_FLAG_NULL);
if (acc->err) {
// Handle error.
printf("Error: %s\n", acc->errstr);
// handle error
exit(1);
}

// Attach an event engine. In this example we use libevent.
struct event_base *base = event_base_new();
redisClusterLibeventAttach(acc, base);
```
The second alternative is to use `redisClusterAsyncContextInit` and `redisClusterAsyncConnect2`
which avoids the initial blocking connect. This connection alternative requires an attached
event engine when called but the initial connect and fetch of slot map is done in a
non-blocking fashion.
This means that commands sent directly after `redisClusterAsyncConnect2()` returns may fail
because the initial slotmap has not yet been retrieved and then it is not yet known to which
Redis node each command should be sent. You may use the [eventCallback](#events-per-cluster-context)
to be notified when the slotmap is updated and the client is ready to accept commands.
An crude example of using the eventCallback can be found in [this testcase](tests/ct_async.c).
```c
// Error handling omitted for brevity.
redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
// Add a cluster node address for the initial connect.
redisClusterSetOptionAddNodes(acc->cc, "127.0.0.1:6379");
// Attach an event engine. In this example we use libevent.
struct event_base *base = event_base_new();
redisClusterLibeventAttach(acc, base);
redisClusterAsyncConnect2(acc);
```

#### Events per connection

Because the connections that will be created are non-blocking,
the kernel is not able to instantly return if the specified
host and port is able to accept a connection.
Expand Down
8 changes: 8 additions & 0 deletions hircluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -3758,6 +3758,14 @@ redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs,
return acc;
}

int redisClusterAsyncConnect2(redisClusterAsyncContext *acc) {
/* An adapter to an async event library is required. */
if (acc->adapter == NULL) {
return REDIS_ERR;
}
return updateSlotMapAsync(acc, NULL /*any node*/);
}

int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc,
redisConnectCallback *fn) {
if (acc->onConnect == NULL) {
Expand Down
1 change: 1 addition & 0 deletions hircluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc,

redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs,
int flags);
int redisClusterAsyncConnect2(redisClusterAsyncContext *acc);
void redisClusterAsyncDisconnect(redisClusterAsyncContext *acc);

/* Commands */
Expand Down
1 change: 1 addition & 0 deletions hiredis_cluster.def
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
redisClusterAsyncCommandArgvToNode
redisClusterAsyncCommandToNode
redisClusterAsyncConnect
redisClusterAsyncConnect2
redisClusterAsyncDisconnect
redisClusterAsyncFormattedCommand
redisClusterAsyncFormattedCommandToNode
Expand Down
63 changes: 42 additions & 21 deletions tests/ct_async.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,35 +32,56 @@ void disconnectCallback(const redisAsyncContext *ac, int status) {
printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}

void eventCallback(const redisClusterContext *cc, int event, void *privdata) {
(void)cc;
redisClusterAsyncContext *acc = (redisClusterAsyncContext *)privdata;

if (event == HIRCLUSTER_EVENT_SLOTMAP_UPDATED) {
int status;
status = redisClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key12345 value");
ASSERT_MSG(status == REDIS_OK, acc->errstr);

/* This command will trigger a disconnect in its reply callback. */
status = redisClusterAsyncCommand(acc, getCallback, (char *)"ID",
"GET key12345");
ASSERT_MSG(status == REDIS_OK, acc->errstr);

status = redisClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key23456 value2");
ASSERT_MSG(status == REDIS_OK, acc->errstr);

status = redisClusterAsyncCommand(acc, getCallback, (char *)"ID",
"GET key23456");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
}
}

int main(void) {
redisClusterAsyncContext *acc =
redisClusterAsyncConnect(CLUSTER_NODE, HIRCLUSTER_FLAG_NULL);

redisClusterAsyncContext *acc = redisClusterAsyncContextInit();
assert(acc);
ASSERT_MSG(acc->err == 0, acc->errstr);

int status;
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
status = redisClusterAsyncSetConnectCallback(acc, connectCallback);
assert(status == REDIS_OK);
status = redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);
assert(status == REDIS_OK);
status = redisClusterSetEventCallback(acc->cc, eventCallback, acc);
assert(status == REDIS_OK);
status = redisClusterSetOptionAddNodes(acc->cc, CLUSTER_NODE);
assert(status == REDIS_OK);

redisClusterAsyncSetConnectCallback(acc, connectCallback);
redisClusterAsyncSetDisconnectCallback(acc, disconnectCallback);

status = redisClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key12345 value");
ASSERT_MSG(status == REDIS_OK, acc->errstr);

status = redisClusterAsyncCommand(acc, getCallback, (char *)"ID",
"GET key12345");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
/* Expect error when connecting without an attached event library. */
status = redisClusterAsyncConnect2(acc);
assert(status == REDIS_ERR);

status = redisClusterAsyncCommand(acc, setCallback, (char *)"ID",
"SET key23456 value2");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
struct event_base *base = event_base_new();
status = redisClusterLibeventAttach(acc, base);
assert(status == REDIS_OK);

status = redisClusterAsyncCommand(acc, getCallback, (char *)"ID",
"GET key23456");
ASSERT_MSG(status == REDIS_OK, acc->errstr);
status = redisClusterAsyncConnect2(acc);
assert(status == REDIS_OK);

event_base_dispatch(base);

Expand Down

0 comments on commit a58646b

Please sign in to comment.