Skip to content

Commit

Permalink
Merge pull request #653 from foxglove/achim/performance-single-messag…
Browse files Browse the repository at this point in the history
…e-handler

Use a single "message" event listener to dispatch received messages
  • Loading branch information
surma authored Nov 7, 2024
2 parents dffe905 + f987ffb commit ad4d2d1
Showing 1 changed file with 37 additions and 14 deletions.
51 changes: 37 additions & 14 deletions src/comlink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ interface ThrownValue {
type SerializedThrownValue =
| { isError: true; value: Error }
| { isError: false; value: unknown };
type PendingListenersMap = Map<
string,
(value: WireValue | PromiseLike<WireValue>) => void
>;

/**
* Internal transfer handler to handle thrown exceptions.
Expand Down Expand Up @@ -392,7 +396,26 @@ function closeEndPoint(endpoint: Endpoint) {
}

export function wrap<T>(ep: Endpoint, target?: any): Remote<T> {
return createProxy<T>(ep, [], target) as any;
const pendingListeners : PendingListenersMap = new Map();

ep.addEventListener("message", function handleMessage(ev: Event) {
const { data } = ev as MessageEvent;
if (!data || !data.id) {
return;
}
const resolver = pendingListeners.get(data.id);
if (!resolver) {
return;
}

try {
resolver(data);
} finally {
pendingListeners.delete(data.id);
}
});

return createProxy<T>(ep, pendingListeners, [], target) as any;
}

function throwIfProxyReleased(isReleased: boolean) {
Expand All @@ -402,7 +425,7 @@ function throwIfProxyReleased(isReleased: boolean) {
}

function releaseEndpoint(ep: Endpoint) {
return requestResponseMessage(ep, {
return requestResponseMessage(ep, new Map(), {
type: MessageType.RELEASE,
}).then(() => {
closeEndPoint(ep);
Expand Down Expand Up @@ -447,6 +470,7 @@ function unregisterProxy(proxy: object) {

function createProxy<T>(
ep: Endpoint,
pendingListeners: PendingListenersMap,
path: (string | number | symbol)[] = [],
target: object = function () {}
): Remote<T> {
Expand All @@ -458,20 +482,21 @@ function createProxy<T>(
return () => {
unregisterProxy(proxy);
releaseEndpoint(ep);
pendingListeners.clear();
isProxyReleased = true;
};
}
if (prop === "then") {
if (path.length === 0) {
return { then: () => proxy };
}
const r = requestResponseMessage(ep, {
const r = requestResponseMessage(ep, pendingListeners, {
type: MessageType.GET,
path: path.map((p) => p.toString()),
}).then(fromWireValue);
return r.then.bind(r);
}
return createProxy(ep, [...path, prop]);
return createProxy(ep, pendingListeners, [...path, prop]);
},
set(_target, prop, rawValue) {
throwIfProxyReleased(isProxyReleased);
Expand All @@ -480,6 +505,7 @@ function createProxy<T>(
const [value, transferables] = toWireValue(rawValue);
return requestResponseMessage(
ep,
pendingListeners,
{
type: MessageType.SET,
path: [...path, prop].map((p) => p.toString()),
Expand All @@ -492,17 +518,18 @@ function createProxy<T>(
throwIfProxyReleased(isProxyReleased);
const last = path[path.length - 1];
if ((last as any) === createEndpoint) {
return requestResponseMessage(ep, {
return requestResponseMessage(ep, pendingListeners, {
type: MessageType.ENDPOINT,
}).then(fromWireValue);
}
// We just pretend that `bind()` didn’t happen.
if (last === "bind") {
return createProxy(ep, path.slice(0, -1));
return createProxy(ep, pendingListeners, path.slice(0, -1));
}
const [argumentList, transferables] = processArguments(rawArgumentList);
return requestResponseMessage(
ep,
pendingListeners,
{
type: MessageType.APPLY,
path: path.map((p) => p.toString()),
Expand All @@ -516,6 +543,7 @@ function createProxy<T>(
const [argumentList, transferables] = processArguments(rawArgumentList);
return requestResponseMessage(
ep,
pendingListeners,
{
type: MessageType.CONSTRUCT,
path: path.map((p) => p.toString()),
Expand Down Expand Up @@ -595,23 +623,18 @@ function fromWireValue(value: WireValue): any {

function requestResponseMessage(
ep: Endpoint,
pendingListeners: PendingListenersMap,
msg: Message,
transfers?: Transferable[]
): Promise<WireValue> {
return new Promise((resolve) => {
const id = generateUUID();
ep.addEventListener("message", function l(ev: MessageEvent) {
if (!ev.data || !ev.data.id || ev.data.id !== id) {
return;
}
ep.removeEventListener("message", l as any);
resolve(ev.data);
} as any);
pendingListeners.set(id, resolve);
if (ep.start) {
ep.start();
}
ep.postMessage({ id, ...msg }, transfers);
});
});
}

function generateUUID(): string {
Expand Down

0 comments on commit ad4d2d1

Please sign in to comment.