Skip to content

Commit

Permalink
deps: migrate deprecated std/async/deferred.ts to `Promise.withReso…
Browse files Browse the repository at this point in the history
…lvers` (#411)
  • Loading branch information
uki00a authored Nov 25, 2023
1 parent 768cb08 commit 4539344
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 28 deletions.
20 changes: 9 additions & 11 deletions connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import {
kUnstablePipeline,
kUnstableReadReply,
} from "./internal/symbols.ts";
import {
Deferred,
deferred,
} from "./vendor/https/deno.land/std/async/deferred.ts";
import { delay } from "./vendor/https/deno.land/std/async/delay.ts";

export interface SendCommandOptions {
Expand Down Expand Up @@ -74,7 +70,8 @@ export const kEmptyRedisArgs: Array<RedisValue> = [];
interface PendingCommand {
name: string;
args: RedisValue[];
promise: Deferred<RedisReply>;
resolve: (reply: RedisReply) => void;
reject: (error: unknown) => void;
returnUint8Arrays?: boolean;
}

Expand Down Expand Up @@ -151,11 +148,12 @@ export class RedisConnection implements Connection {
args?: Array<RedisValue>,
options?: SendCommandOptions,
): Promise<RedisReply> {
const promise = deferred<RedisReply>();
const { promise, resolve, reject } = Promise.withResolvers<RedisReply>();
this.commandQueue.push({
name: command,
args: args ?? kEmptyRedisArgs,
promise,
resolve,
reject,
returnUint8Arrays: options?.returnUint8Arrays,
});
if (this.commandQueue.length === 1) {
Expand Down Expand Up @@ -254,13 +252,13 @@ export class RedisConnection implements Connection {
command.args,
command.returnUint8Arrays,
);
command.promise.resolve(reply);
command.resolve(reply);
} catch (error) {
if (
!isRetriableError(error) ||
this.isManuallyClosedByUser()
) {
return command.promise.reject(error);
return command.reject(error);
}

for (let i = 0; i < this.maxRetryCount; i++) {
Expand All @@ -275,14 +273,14 @@ export class RedisConnection implements Connection {
command.returnUint8Arrays,
);

return command.promise.resolve(reply);
return command.resolve(reply);
} catch { // TODO: use `AggregateError`?
const backoff = this.backoff(i);
await delay(backoff);
}
}

command.promise.reject(error);
command.reject(error);
} finally {
this.commandQueue.shift();
this.processCommandQueue();
Expand Down
1 change: 0 additions & 1 deletion modules-lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"version": "@0.207.0",
"modules": [
"/assert/mod.ts",
"/async/deferred.ts",
"/async/delay.ts",
"/bytes/concat.ts",
"/testing/bdd.ts",
Expand Down
1 change: 0 additions & 1 deletion modules.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"version": "@0.207.0",
"modules": [
"/assert/mod.ts",
"/async/deferred.ts",
"/async/delay.ts",
"/bytes/concat.ts",
"/testing/bdd.ts",
Expand Down
17 changes: 7 additions & 10 deletions pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import {
} from "./protocol/shared/types.ts";
import { create, Redis } from "./redis.ts";
import { kUnstablePipeline } from "./internal/symbols.ts";
import {
Deferred,
deferred,
} from "./vendor/https/deno.land/std/async/deferred.ts";

export interface RedisPipeline extends Redis {
flush(): Promise<RawOrError[]>;
Expand Down Expand Up @@ -42,7 +38,8 @@ export class PipelineExecutor implements CommandExecutor {
args: RedisValue[];
returnUint8Arrays?: boolean;
}[];
d: Deferred<unknown[]>;
resolve: (value: RawOrError[]) => void;
reject: (error: unknown) => void;
}[] = [];

constructor(
Expand Down Expand Up @@ -80,21 +77,21 @@ export class PipelineExecutor implements CommandExecutor {
this.commands.unshift({ command: "MULTI", args: [] });
this.commands.push({ command: "EXEC", args: [] });
}
const d = deferred<RawOrError[]>();
this.queue.push({ commands: [...this.commands], d });
const { promise, resolve, reject } = Promise.withResolvers<RawOrError[]>();
this.queue.push({ commands: [...this.commands], resolve, reject });
if (this.queue.length === 1) {
this.dequeue();
}
this.commands = [];
return d;
return promise;
}

private dequeue(): void {
const [e] = this.queue;
if (!e) return;
this.connection[kUnstablePipeline](e.commands)
.then(e.d.resolve)
.catch(e.d.reject)
.then(e.resolve)
.catch(e.reject)
.finally(() => {
this.queue.shift();
this.dequeue();
Expand Down
7 changes: 3 additions & 4 deletions tests/commands/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
import { describe, it } from "../../vendor/https/deno.land/std/testing/bdd.ts";
import { nextPort, startRedis, stopRedis } from "../test_util.ts";
import type { Connector, TestServer } from "../test_util.ts";
import { deferred } from "../../vendor/https/deno.land/std/async/deferred.ts";

export function pubsubTests(
connect: Connector,
Expand Down Expand Up @@ -129,7 +128,7 @@ export function pubsubTests(

setTimeout(() => stopRedis(tempServer), 1000);

const promise = deferred();
const { promise, resolve, reject } = Promise.withResolvers<void>();
setTimeout(async () => {
try {
assertEquals(
Expand Down Expand Up @@ -158,9 +157,9 @@ export function pubsubTests(
);
assert(publisher.isConnected, "The publisher client is not connected.");

promise.resolve();
resolve();
} catch (error) {
promise.reject(error);
reject(error);
}
}, 2000);

Expand Down
1 change: 0 additions & 1 deletion vendor/https/deno.land/std/async/deferred.ts

This file was deleted.

0 comments on commit 4539344

Please sign in to comment.