Skip to content

Commit

Permalink
[FEAT] [EXPERIMENTAL] nats core requestMany() - allows implementing s…
Browse files Browse the repository at this point in the history
…catter gather receiving multi-message responses. (#311)
  • Loading branch information
aricart authored May 20, 2022
1 parent a7f28b3 commit c335a62
Show file tree
Hide file tree
Showing 8 changed files with 416 additions and 18 deletions.
3 changes: 2 additions & 1 deletion nats-base-client/internal_mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ export type { PH } from "./heartbeats.ts";
export { MuxSubscription } from "./muxsubscription.ts";
export { DataBuffer } from "./databuffer.ts";
export { checkOptions, checkUnsupportedOption } from "./options.ts";
export { Request } from "./request.ts";
export type { Request } from "./request.ts";
export { RequestOne } from "./request.ts";
export type {
Auth,
Authenticator,
Expand Down
4 changes: 2 additions & 2 deletions nats-base-client/jsmsg.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { DataBuffer } from "./databuffer.ts";
import { JSONCodec, StringCodec } from "./codec.ts";
import { MsgImpl } from "./msg.ts";
import { ProtocolHandler } from "./protocol.ts";
import { Request } from "./request.ts";
import { RequestOne } from "./request.ts";
import { nanos } from "./jsutil.ts";

export const ACK = Uint8Array.of(43, 65, 67, 75);
Expand Down Expand Up @@ -136,7 +136,7 @@ export class JsMsgImpl implements JsMsg {
if (this.msg.reply) {
const mi = this.msg as MsgImpl;
const proto = mi.publisher as unknown as ProtocolHandler;
const r = new Request(proto.muxSubscriptions, this.msg.reply);
const r = new RequestOne(proto.muxSubscriptions, this.msg.reply);
proto.request(r);
try {
proto.publish(
Expand Down
80 changes: 78 additions & 2 deletions nats-base-client/nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import {
Msg,
NatsConnection,
PublishOptions,
RequestManyOptions,
RequestOptions,
RequestStrategy,
ServerInfo,
Stats,
Status,
Expand All @@ -39,7 +41,11 @@ import { parseSemVer } from "./semver.ts";

import { parseOptions } from "./options.ts";
import { QueuedIterator, QueuedIteratorImpl } from "./queued_iterator.ts";
import { Request } from "./request.ts";
import {
RequestMany,
RequestManyOptionsInternal,
RequestOne,
} from "./request.ts";
import { isRequestError } from "./msg.ts";
import { JetStreamManagerImpl } from "./jsm.ts";
import { JetStreamClientImpl } from "./jsclient.ts";
Expand Down Expand Up @@ -144,6 +150,76 @@ export class NatsConnectionImpl implements NatsConnection {
this.protocol.resub(si, subject);
}

// possibilities are:
// stop on error or any non-100 status
// AND:
// - wait for timer
// - wait for n messages or timer
// - wait for unknown messages, done when empty or reset timer expires (with possible alt wait)
// - wait for unknown messages, done when an empty payload is received or timer expires (with possible alt wait)
requestMany(
subject: string,
data: Uint8Array = Empty,
opts: Partial<RequestManyOptions> = { maxWait: 1000, maxMessages: -1 },
): Promise<QueuedIterator<Msg | Error>> {
try {
this._check(subject, true, true);
} catch (err) {
return Promise.reject(err);
}

opts.strategy = opts.strategy || RequestStrategy.Timer;
opts.maxWait = opts.maxWait || 1000;
if (opts.maxWait < 1) {
return Promise.reject(new NatsError("timeout", ErrorCode.InvalidOption));
}

const qi = new QueuedIteratorImpl<Msg | Error>();
const stop = () => {
qi.stop();
};

const callback = (err: Error | null, msg: Msg | null) => {
if (err || msg === null) {
// FIXME: the stop function should not require commenting
if (err !== null) {
qi.push(err);
}
//@ts-ignore: stop function after consuming
qi.push(stop);
} else {
qi.push(msg);
}
};

const rmo = opts as RequestManyOptionsInternal;
rmo.callback = callback;

qi.iterClosed.then(() => {
r.cancel();
}).catch((err) => {
r.cancel(err);
});

const r = new RequestMany(this.protocol.muxSubscriptions, subject, rmo);
this.protocol.request(r);

try {
this.publish(
subject,
data,
{
reply: `${this.protocol.muxSubscriptions.baseInbox}${r.token}`,
headers: opts.headers,
},
);
} catch (err) {
r.cancel(err);
}

return Promise.resolve(qi);
}

request(
subject: string,
data: Uint8Array = Empty,
Expand Down Expand Up @@ -205,7 +281,7 @@ export class NatsConnectionImpl implements NatsConnection {
});
return d;
} else {
const r = new Request(this.protocol.muxSubscriptions, subject, opts);
const r = new RequestOne(this.protocol.muxSubscriptions, subject, opts);
this.protocol.request(r);

try {
Expand Down
117 changes: 107 additions & 10 deletions nats-base-client/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,132 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Deferred, deferred, extend, Timeout, timeout } from "./util.ts";
import type { Msg, RequestOptions } from "./types.ts";
import { Deferred, deferred, Timeout, timeout } from "./util.ts";
import type { Msg, RequestManyOptions, RequestOptions } from "./types.ts";
import { RequestStrategy } from "./types.ts";
import { ErrorCode, NatsError } from "./error.ts";
import { MuxSubscription } from "./muxsubscription.ts";
import { nuid } from "./nuid.ts";

export class Request {
export interface Request {
token: string;
requestSubject: string;
received: number;
resolver(err: Error | null, msg: Msg): void;
cancel(err?: NatsError): void;
}

export class BaseRequest {
token: string;
received: number;
deferred: Deferred<Msg>;
timer: Timeout<Msg>;
ctx: Error;
requestSubject: string;
private mux: MuxSubscription;
mux: MuxSubscription;

constructor(
mux: MuxSubscription,
requestSubject: string,
opts: RequestOptions = { timeout: 1000 },
) {
this.mux = mux;
this.requestSubject = requestSubject;
this.received = 0;
this.deferred = deferred();
this.token = nuid.next();
extend(this, opts);
this.timer = timeout<Msg>(opts.timeout);
this.ctx = new Error();
}
}

export interface RequestManyOptionsInternal extends RequestManyOptions {
callback: (err: Error | null, msg: Msg | null) => void;
}

/**
* Request expects multiple message response
* the request ends when the timer expires,
* an error arrives or an expected count of messages
* arrives, end is signaled by a null message
*/
export class RequestMany extends BaseRequest implements Request {
callback!: (err: Error | null, msg: Msg | null) => void;
done: Deferred<void>;
timer: number;
max: number;
opts: Partial<RequestManyOptionsInternal>;
constructor(
mux: MuxSubscription,
requestSubject: string,
opts: Partial<RequestManyOptions> = { maxWait: 1000 },
) {
super(mux, requestSubject);
this.opts = opts;
if (typeof this.opts.callback !== "function") {
throw new Error("callback is required");
}
this.callback = this.opts.callback;

this.max = typeof opts.maxMessages === "number" && opts.maxMessages > 0
? opts.maxMessages
: -1;
this.done = deferred();
this.done.then(() => {
this.callback(null, null);
});
this.timer = setTimeout(() => {
this.cancel();
}, opts.maxWait);
}

cancel(err?: NatsError): void {
if (err) {
this.callback(err, null);
}
clearTimeout(this.timer);
this.mux.cancel(this);
this.done.resolve();
}

resolver(err: Error | null, msg: Msg): void {
if (err) {
err.stack += `\n\n${this.ctx.stack}`;
this.cancel(err as NatsError);
} else {
this.callback(null, msg);
if (this.opts.strategy === RequestStrategy.Count) {
this.max--;
if (this.max === 0) {
this.cancel();
}
}

if (this.opts.strategy === RequestStrategy.JitterTimer) {
clearTimeout(this.timer);
this.timer = setTimeout(() => {
this.cancel();
}, 300);
}

if (this.opts.strategy === RequestStrategy.SentinelMsg) {
if (msg && msg.data.length === 0) {
this.cancel();
}
}
}
}
}

export class RequestOne extends BaseRequest implements Request {
deferred: Deferred<Msg>;
timer: Timeout<Msg>;

constructor(
mux: MuxSubscription,
requestSubject: string,
opts: RequestOptions = { timeout: 1000 },
) {
super(mux, requestSubject);
// extend(this, opts);
this.deferred = deferred();
this.timer = timeout<Msg>(opts.timeout);
}

resolver(err: Error | null, msg: Msg): void {
if (this.timer) {
Expand Down
14 changes: 14 additions & 0 deletions nats-base-client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,20 @@ export interface RequestOptions {
reply?: string;
}

export enum RequestStrategy {
Timer = "timer",
Count = "count",
JitterTimer = "jitterTimer",
SentinelMsg = "sentinelMsg",
}

export interface RequestManyOptions {
strategy: RequestStrategy;
maxWait: number;
headers?: MsgHdrs;
maxMessages?: number;
}

export interface PublishOptions {
reply?: string;
headers?: MsgHdrs;
Expand Down
2 changes: 1 addition & 1 deletion tests/auth_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ Deno.test("auth - perm sub iterator error", async () => {
const sub = nc.subscribe("q");
const iterReject = deferred<NatsError>();
(async () => {
for await (const m of sub) {
for await (const _m of sub) {
// ignored
}
})().catch((err) => {
Expand Down
Loading

0 comments on commit c335a62

Please sign in to comment.