Skip to content

Commit

Permalink
improve error handling and reconnecting for reused UDP sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
KurtThiemann committed Nov 12, 2024
1 parent 2018877 commit 4b6effc
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 19 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "craftping",
"type": "module",
"version": "1.1.2",
"version": "1.1.3",
"main": "index.js",
"repository": "github:aternosorg/craftping",
"scripts": {
Expand Down
1 change: 1 addition & 0 deletions src/UDPSocket/UDPClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export default class UDPClient extends EventEmitter {
for (const promise of this.readQueue) {
promise.reject(error ?? new NetworkError("Client was closed"));
}
this.readQueue = [];
return this;
}

Expand Down
84 changes: 66 additions & 18 deletions src/UDPSocket/UDPSocket.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import {EventEmitter} from "node:events";
import dgram from "node:dgram";
import UDPMessage from "./UDPMessage.js";
import NetworkError from "../Error/NetworkError.js";

export default class UDPSocket extends EventEmitter {
/** @type {import("node:dgram").Socket} */ socket;
/** @type {?import("node:dgram").Socket} */ socket = null;
/** @type {Set<UDPClient>} */ clients = new Set();
/** @type {boolean} */ open = false;
/** @type {import("node:dgram").SocketOptions}} */ socketOptions;
/** @type {import("node:dgram").BindOptions}} */ bindOptions;

/**
Expand All @@ -14,23 +16,35 @@ export default class UDPSocket extends EventEmitter {
*/
constructor(options = {type: "udp4"}, bindOptions = {}) {
super();
this.socketOptions = options;
this.bindOptions = bindOptions;
this.socket = dgram.createSocket(options, this.handleMessage.bind(this));
this.socket.on("error", this.handleError.bind(this));
this.socket.on("close", this.handleClose.bind(this));
}

/**
* @param {?AbortSignal} signal
* @return {Promise<this>}
*/
bind() {
bind(signal = null) {
let socket = dgram.createSocket(this.socketOptions, this.handleMessage.bind(this));
this.socket = socket;
this.socket.on("error", this.handleError.bind(this));
this.socket.on("close", this.handleClose.bind(this));
return new Promise((resolve, reject) => {
this.socket.once("error", reject);
this.socket.bind(this.bindOptions, () => {
this.socket.off("error", reject);
let success = false;
socket.once("error", reject);
socket.bind(this.bindOptions, () => {
socket.off("error", reject);
this.open = true;
success = true;
resolve(this);
});

signal?.addEventListener("abort", () => {
if (!success) {
socket.close();
reject(new Error("Operation was aborted"));
}
});
});
}

Expand All @@ -39,19 +53,27 @@ export default class UDPSocket extends EventEmitter {
*/
close() {
return new Promise((resolve, reject) => {
if (!this.open) {
if (!this.open || !this.socket) {
resolve(this);
return;
}

this.socket.close(err => {
if (err) {
reject(err);
try {
this.socket.close(err => {
if (err && err.code !== "ERR_SOCKET_DGRAM_NOT_RUNNING") {
reject(err);
return;
}

resolve(this);
});
} catch (e) {
if (e.code === "ERR_SOCKET_DGRAM_NOT_RUNNING") {
resolve(this);
return;
}

resolve(this);
});
reject(e);
}
});
}

Expand All @@ -60,10 +82,18 @@ export default class UDPSocket extends EventEmitter {
*/
handleClose() {
this.open = false;
this.closeClients();
this.emit("close");
return this;
}

/**
* @return {this}
*/
closeClients() {
for (const client of this.clients) {
client.handleClose();
}
this.emit("close");
return this;
}

Expand All @@ -78,6 +108,13 @@ export default class UDPSocket extends EventEmitter {
if (this.listenerCount("error") > 0) {
this.emit("error", error);
}
this.open = false;
try {
this.socket?.close();
} catch (e) {
}
this.closeClients();
this.socket = null;
return this;
}

Expand All @@ -104,7 +141,7 @@ export default class UDPSocket extends EventEmitter {
*/
async register(client) {
if (!this.open) {
await this.bind();
await this.bind(client.signal);
}
this.clients.add(client);
client.on("dispose", this.handleDisposeClient.bind(this));
Expand All @@ -119,7 +156,18 @@ export default class UDPSocket extends EventEmitter {
*/
send(msg, port, address) {
return new Promise((resolve, reject) => {
this.socket.send(msg, port, address, err => {
if (!this.open || !this.socket) {
reject(new NetworkError("Socket is not open"));
return;
}

let socket = this.socket;
function close() {
reject(new NetworkError("Socket was closed"));
}
socket.once("close", close);
socket.send(msg, port, address, err => {
socket.off("close", close);
if (err) {
reject(err);
return;
Expand Down

0 comments on commit 4b6effc

Please sign in to comment.