Error out waiting clients on close, handle too many connections #18

Merged: 2 commits, Oct 3, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion lib/memjs/server.d.ts.map

Some generated files are not rendered by default.

14 changes: 12 additions & 2 deletions lib/memjs/server.js

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion lib/memjs/utils.d.ts.map

Some generated files are not rendered by default.

11 changes: 10 additions & 1 deletion lib/memjs/utils.js

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions lotofconnstest.js
@@ -0,0 +1,56 @@
#! node
const MemJS = require(".");

async function nextTick() {
  return new Promise((resolve) => setImmediate(resolve));
}

const clients = [];

async function body() {
  // local memcached is default 1024 max connections
  for (let i = 0; i < 1025; i++) {
    const client = MemJS.Client.create(undefined, {timeout: 1, connTimeout: 2, retries: 1});
    console.log("created", i);

    client.servers.forEach((server) => {
      server.onConnect((sock) => {
        console.log("connected", i);

        sock.once("close", (err) => {
          console.log("closed", i, err);
        });
        sock.once("error", (err) => {
          console.log("error handler", i, err);
        });
      });
    });

    try {
      await client.get("foo");
      console.log(i);
    } catch (error) {
      console.log("error", error, i);
    }

    clients.push(client);
  }

}

async function main() {
  try {
    await body();
    console.log("smoketest ok");
  } catch (error) {
    console.error("fatal", error);
    process.exit(1);
  } finally {
    clients.forEach((client) => {
      client.servers.forEach((server) => server._socket?.removeAllListeners("close"));
      client.quit();
    });
  }
}

main();
12 changes: 11 additions & 1 deletion src/memjs/server.ts
@@ -151,7 +151,14 @@ export class Server extends events.EventEmitter {
    return this.responseBuffer;
  }
  responseHandler(dataBuf: Buffer) {
    let response = parseMessage(this.appendToBuffer(dataBuf));
    let response: Message | false;
    try {
      response = parseMessage(this.appendToBuffer(dataBuf));
    } catch (e) {
      this.error(e as Error);
      return;
    }

    let respLength: number;
    while (response) {
      if (response.header.opcode === 0x20) {
@@ -218,6 +225,9 @@ export class Server extends events.EventEmitter {
    });

    self._socket.on("close", function () {
      if (Object.keys(self.errorCallbacks).length > 0) {
        self.error(new Error("socket closed unexpectedly."));
      }
      self.connected = false;
      if (self.timeoutSet) {
        self._socket?.setTimeout(0);
12 changes: 12 additions & 0 deletions src/memjs/utils.ts
@@ -144,10 +144,22 @@ export interface Message {
  extras: Buffer;
}

// Error message from memcached when it rejects a request for having too many
// open connections
// https://github.com/memcached/memcached/blob/efee763c93249358ea5b3b42c7fd4e57e2599c30/memcached.c#L3044
const ERROR_TOO_MANY_OPEN_CONNECTIONS = "ERROR Too many open connections\r\n";

export const parseMessage = function (dataBuf: Buffer): Message | false {
  if (dataBuf.length < 24) {
    return false;
  }

  if (dataBuf.length === ERROR_TOO_MANY_OPEN_CONNECTIONS.length) {

Reviewer: is there any known way we would get a partial buffer?

Author: it seems unlikely. I don't think a 33-byte write that would be the only thing the server writes would get split.

Even assuming that happens, the cases are (see the sketch after this list):

  1. parseMessage gets a chunk of "ERROR Too many", incomplete message so waits for more data, and then another chunk of " open connections\r\n" = handles it normally
  2. we get a partial message and wait for more that never comes = command timeout is hit
  3. we get a partial message and the socket is closed = newly fixed closed handler is hit.
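
A rough sketch of what a regression test for case 1 could look like, written in the style of the test added to server_test.js later in this diff. The test name and the exact split point are illustrative assumptions, as is the expectation that unparsed bytes stay buffered between responseHandler calls; this is not part of the PR itself:

test('ResponseHandler with a split too many connections error', function(t) {
  const server = new MemJS.Server('localhost', 11211);

  server.onError('test', function(err) {
    t.equal('ERROR Too many open connections', err.message);
  });

  // First chunk is shorter than a 24-byte binary header, so parseMessage
  // returns false and the bytes are kept in the server's response buffer.
  server.responseHandler(Buffer.from('ERROR Too many'));
  // The second chunk completes the 33-byte error line, which parseMessage
  // recognizes and surfaces through the error callbacks.
  server.responseHandler(Buffer.from(' open connections\r\n'));

  t.end();
});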

    if (dataBuf.toString() === ERROR_TOO_MANY_OPEN_CONNECTIONS) {
      throw new Error("ERROR Too many open connections");
    }
  }

  const responseHeader = header.fromBuffer(dataBuf);

  if (
14 changes: 14 additions & 0 deletions src/test/server_test.js
@@ -35,6 +35,20 @@ test('ResponseHandler with authentication error', function(t) {
  t.end();
});

test('ResponseHandler with too many connections error', function(t) {
  const server = new MemJS.Server('localhost', 11211);

  server.onError('test', function(err) {
    t.equal('ERROR Too many open connections', err.message);
  });

  const responseBuf = Buffer.from('ERROR Too many open connections\r\n');

  server.responseHandler(responseBuf);

  t.end();
});

test('Authenticate', function(t) {
  const expectedBuf = makeRequestBuffer(0x21, 'PLAIN', '', '\0user1\0password');
  const dummySocket = {