Skip to content

Commit

Permalink
Implement receiving dc_response from a DS
Browse files Browse the repository at this point in the history
Signed-off-by: TheFixer <[email protected]>
  • Loading branch information
TheFixer committed Nov 17, 2023
1 parent 9070f77 commit 5300408
Show file tree
Hide file tree
Showing 3 changed files with 462 additions and 5 deletions.
78 changes: 78 additions & 0 deletions src/durability/include/dds/durability/durablesupport.h
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,11 @@ typedef int64_t DurableSupport_duration_t;
#define DurableSupport_duration_t__alloc() \
((DurableSupport_duration_t*) dds_alloc (sizeof (DurableSupport_duration_t)));

typedef uint16_t DurableSupport_responsetype_t;

#define DurableSupport_responsetype_t__alloc() \
((DurableSupport_responsetype_t*) dds_alloc (sizeof (DurableSupport_responsetype_t)));

typedef struct DurableSupport_request_id_t
{
DurableSupport_id_t client;
Expand Down Expand Up @@ -301,6 +306,79 @@ extern const dds_topic_descriptor_t DurableSupport_request_desc;
#define DurableSupport_request_free(d,o) \
dds_sample_free ((d), &DurableSupport_request_desc, (o))

#define DurableSupport_RESPONSETYPE_SET 1
#define DurableSupport_RESPONSETYPE_DATA 2
#ifndef DDS_SEQUENCE_DURABLESUPPORT_REQUEST_ID_T_DEFINED
#define DDS_SEQUENCE_DURABLESUPPORT_REQUEST_ID_T_DEFINED
typedef struct dds_sequence_DurableSupport_request_id_t
{
uint32_t _maximum;
uint32_t _length;
struct DurableSupport_request_id_t *_buffer;
bool _release;
} dds_sequence_DurableSupport_request_id_t;

#define dds_sequence_DurableSupport_request_id_t__alloc() \
((dds_sequence_DurableSupport_request_id_t*) dds_alloc (sizeof (dds_sequence_DurableSupport_request_id_t)));

#define dds_sequence_DurableSupport_request_id_t_allocbuf(l) \
((struct DurableSupport_request_id_t *) dds_alloc ((l) * sizeof (struct DurableSupport_request_id_t)))
#endif /* DDS_SEQUENCE_DURABLESUPPORT_REQUEST_ID_T_DEFINED */

typedef struct DurableSupport_response_set_t
{
char * partition;
char * tpname;
uint32_t flags;
dds_sequence_DurableSupport_request_id_t requestids;
} DurableSupport_response_set_t;

#ifndef DDS_SEQUENCE_OCTET_DEFINED
#define DDS_SEQUENCE_OCTET_DEFINED
typedef struct dds_sequence_octet
{
uint32_t _maximum;
uint32_t _length;
uint8_t *_buffer;
bool _release;
} dds_sequence_octet;

#define dds_sequence_octet__alloc() \
((dds_sequence_octet*) dds_alloc (sizeof (dds_sequence_octet)));

#define dds_sequence_octet_allocbuf(l) \
((uint8_t *) dds_alloc ((l) * sizeof (uint8_t)))
#endif /* DDS_SEQUENCE_OCTET_DEFINED */

typedef struct DurableSupport_response_data_t
{
dds_sequence_octet blob;
} DurableSupport_response_data_t;

typedef struct DurableSupport_response_content
{
DurableSupport_responsetype_t _d;
union
{
struct DurableSupport_response_set_t set;
struct DurableSupport_response_data_t data;
} _u;
} DurableSupport_response_content;

typedef struct DurableSupport_response
{
DurableSupport_id_t id;
struct DurableSupport_response_content body;
} DurableSupport_response;

extern const dds_topic_descriptor_t DurableSupport_response_desc;

#define DurableSupport_response__alloc() \
((DurableSupport_response*) dds_alloc (sizeof (DurableSupport_response)));

#define DurableSupport_response_free(d,o) \
dds_sample_free ((d), &DurableSupport_response_desc, (o))

#ifdef __cplusplus
}
#endif
Expand Down
172 changes: 167 additions & 5 deletions src/durability/src/dds_durability.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@

#define TRACE(...) DDS_CLOG (DDS_LC_DUR, &domaingv->logconfig, __VA_ARGS__)

/************* start of common functions ****************/
/* LH:
* The following functions are duplicated from their equivalents used by the ds
* In future we might want to create a common library that is used both by ds and dc.
*/

static char *dc_stringify_id(const DurableSupport_id_t id, char *buf)
{
assert(buf);
Expand All @@ -38,6 +44,119 @@ static char *dc_stringify_id(const DurableSupport_id_t id, char *buf)
return buf;
}

static char *dc_responsetype_image (DurableSupport_responsetype_t type)
{
switch (type) {
case DurableSupport_RESPONSETYPE_SET:
return "set";
case DurableSupport_RESPONSETYPE_DATA:
return "data";
default:
return "?";
}
}

static char *dc_blob_image (dds_sequence_octet blob)
{
char *buf;
uint32_t len;
uint32_t n;
uint32_t i;

#define BLOB_IMAGE_SIZE 24
len = (blob._length < BLOB_IMAGE_SIZE) ? blob._length : BLOB_IMAGE_SIZE;
n = len*2+1;
buf = ddsrt_malloc(n);
if (len < BLOB_IMAGE_SIZE) {
for (i=0; i < len; i++) {
sprintf(buf+i*2, "%02x", (char)blob._buffer[i]);
}
} else {
for (i=0; i < 11; i++) {
sprintf(buf+i*2, "%02x", (char)blob._buffer[i]);
}
strcpy(buf+22, "...");
for (i=0; i < 11; i++) {
sprintf(buf+25+i*2, "%02x", (char)blob._buffer[len-11+i]);
}
}
buf[2*len] = '\0';
#undef BLOB_IMAGE_SIZE
return buf;
}

static int dc_stringify_response (char *buf, size_t n, const DurableSupport_response *response)
{
size_t i = 0, j = 0;
int l;
char *str;
char id_str[37];

if (buf == NULL) {
goto err;
}
if (response == NULL) {
buf[0] = '\0';
return 0;
}
if ((l = snprintf(buf+i, n-i, "{\"id\":\"%s\", \"type\":\"%s\", \"content\":{", dc_stringify_id(response->id, id_str), dc_responsetype_image(response->body._d))) < 0) {
goto err;
}
i += (size_t)l;
if (i >= n) {
goto trunc;
}
switch (response->body._d) {
case DurableSupport_RESPONSETYPE_SET :
if ((l = snprintf(buf+i, n-i, "\"partition\":\"%s\", \"tpname\":\"%s\", \"flags\":\"0x%04" PRIx32 "\", \"requestids\":[", response->body._u.set.partition, response->body._u.set.tpname, response->body._u.set.flags)) < 0) {
goto err;
}
i += (size_t)l;
for (j=0; j < response->body._u.set.requestids._length; j++) {
DurableSupport_request_id_t requestid = response->body._u.set.requestids._buffer[j];
if ((l = snprintf(buf+i, n-(size_t)i, "%s{\"client\":\"%s\", \"seq\":%" PRIu64 "}", (j==0) ? "" : ",", dc_stringify_id(requestid.client, id_str), requestid.seq)) < 0) {
goto err;
}
i += (size_t)l;
if (i >= n) {
goto trunc;
}
}
if (i < n) {
buf[i++] = ']';
}
break;
case DurableSupport_RESPONSETYPE_DATA :
str = dc_blob_image(response->body._u.data.blob);
if ((l = snprintf(buf+i, n-i, "\"blob\":\"%s\"", str)) < 0) {
ddsrt_free(str);
goto err;
}
ddsrt_free(str);
i += (size_t)l;
break;
default:
goto err;
}
if (i < n) {
buf[i++] = '}';
}
if (i < n) {
buf[i++] = '}';
}
trunc:
if (i >= n) {
/* truncated */
buf[n-1] = '\0';
}
return (int)i;
err:
DDS_ERROR("dc_stringify_response failed, type: %" PRIu16 ", buf: %s\n", response->body._d, !buf ? "NULL" : buf);
return -1;
}

/****** end of common functions *******/

struct server_t {
ddsrt_avl_node_t node;
DurableSupport_id_t id; /* id key */
Expand Down Expand Up @@ -709,7 +828,9 @@ static struct com_t *dc_com_new (struct dc_t *dc, const dds_domainid_t domainid,
DDS_ERROR("failed to create dc response subscriber [%s]\n", dds_strretcode(-com->response_subscriber));
goto err_response_subscriber;
}
if ((com->tp_response = dds_create_topic (com->participant, &DurableSupport_bead_desc, "dc_response", tqos, NULL)) < 0) {
dds_qset_durability(tqos, DDS_DURABILITY_VOLATILE);
dds_qset_history(tqos, DDS_HISTORY_KEEP_ALL, DDS_LENGTH_UNLIMITED);
if ((com->tp_response = dds_create_topic (com->participant, &DurableSupport_response_desc, "dc_response", tqos, NULL)) < 0) {
DDS_ERROR("failed to create dc response topic [%s]\n", dds_strretcode(-com->tp_response));
goto err_tp_response;
}
Expand Down Expand Up @@ -921,7 +1042,8 @@ static dds_return_t dc_com_request_write (struct com_t *com, dds_entity_t reader
request->partitions._buffer[i] = ddsrt_strdup(partitions[i]);
}
request->tpname = ddsrt_strdup(tpname);
request->timeout = DDS_SECS(5);
// request->timeout = DDS_SECS(5);
request->timeout = DDS_INFINITY;
if ((ret = dds_write(com->wr_request, request)) < 0) {
DDS_ERROR("failed to publish dc_request [%s]", dds_strretcode(-ret));
goto err_request_write;
Expand Down Expand Up @@ -1020,6 +1142,44 @@ static int dc_process_status (dds_entity_t rd, struct dc_t *dc)
#undef MAX_SAMPLES
}

static int dc_process_response (dds_entity_t rd, struct dc_t *dc)
{
#define MAX_SAMPLES 100

void *samples[MAX_SAMPLES];
dds_sample_info_t info[MAX_SAMPLES];
int samplecount;
int j;

/* dds_read/take allocates memory for the data if samples[0] is a null pointer.
* The memory must be released when done by returning the loan */
samples[0] = NULL;
samplecount = dds_take_mask (rd, samples, info, MAX_SAMPLES, MAX_SAMPLES, DDS_NOT_READ_SAMPLE_STATE | DDS_ANY_VIEW_STATE | DDS_ANY_INSTANCE_STATE);
if (samplecount < 0) {
DDS_ERROR("failed to take dc_response [%s]", dds_strretcode(-samplecount));
goto err_samplecount;
} else {
/* process the response
* we ignore invalid samples and only process valid responses */
for (j = 0; !dds_triggered(dc->com->ws) && j < samplecount; j++) {
DurableSupport_response *response = (DurableSupport_response *)samples[j];
char str[1024] = { 0 }; /* max string representation size */
int l;
if (info[j].valid_data && ((l = dc_stringify_response(str, sizeof(str), response)) > 0)) {
size_t len = (size_t)l;

/* LH: TODO: add statistics for responses */
DDS_CLOG(DDS_LC_DUR, &dc->gv->logconfig, "dc_response %s%s\n", str, (len >= sizeof(str)) ? "..(trunc)" : "");
}
}
}
(void)dds_return_loan (rd, samples, samplecount);
err_samplecount:
return samplecount;

#undef MAX_SAMPLES
}

/* called when there is a match for a durable writer */
static void default_durable_writer_matched_cb (dds_entity_t writer, dds_publication_matched_status_t status, void *arg)
{
Expand Down Expand Up @@ -1110,14 +1270,14 @@ static struct pending_request_t *dc_publish_reader_request (struct dc_t *dc, dds
pr->seq = ++(dc->seq); /* sequence number; key for the fibheap tree */
pr->reader = reader;
pr->rhc = rhc;
/* LH: We currently never dispose a request to a DS on the same hos
/* LH: We currently never explicitly dispose a request to a DS on the same host
* (which is the only mode we currently support for now). In this way
* a disconnect with the DS will lead to a dispose of the request
* on the DS (by virtue of the autodispose property). A reconnect with
* the DS will lead to the request being received again.
* Once we allow sending requests to (multiple) remote DSs, we need to
* implement a mechanism to detect a disconnect with the DS, and
* perhaps choose to another DS to send a request.
* perhaps choose another DS to send a request to.
* We also need to cancel requests to DSs that have not responded yet in
* case we received the required data from one of the DSs */
pr->exp_time = DDS_NEVER;
Expand Down Expand Up @@ -1270,7 +1430,7 @@ static uint32_t recv_handler (void *a)
{
struct dc_t *dc = (struct dc_t *)a;
dds_time_t timeout = DDS_NEVER;
dds_attach_t wsresults[2];
dds_attach_t wsresults[3];
size_t wsresultsize = sizeof(wsresults) / sizeof(wsresults[0]);
int n, j;

Expand All @@ -1283,6 +1443,8 @@ static uint32_t recv_handler (void *a)
for (j=0; j < n && (size_t)j < wsresultsize; j++) {
if (wsresults[j] == dc->com->rd_status) {
dc_process_status(dc->com->rd_status, dc);
} else if (wsresults[j] == dc->com->rd_response) {
dc_process_response(dc->com->rd_response, dc);
} else if (wsresults[j] == dc->com->pending_request_guard) {
dc_process_pending_request_guard(dc, &timeout);
}
Expand Down
Loading

0 comments on commit 5300408

Please sign in to comment.