Skip to content

Commit

Permalink
Merge pull request #241 from Zekiah-A/main
Browse files Browse the repository at this point in the history
Move db worker to typescript, bug fixes and strict typing
  • Loading branch information
Zekiah-A authored Feb 25, 2024
2 parents 32598e8 + a851c95 commit 03ec9c0
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 211 deletions.
Binary file modified bun.lockb
Binary file not shown.
208 changes: 144 additions & 64 deletions db-worker.js → db-worker.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,99 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable no-inner-declarations */
import { parentPort } from 'worker_threads'
import { Database } from 'bun:sqlite'
import { Queue } from '@datastructures-js/queue'

const db = new Database("server.db")

export type LiveChatMessage = {
messageId: number,
sendDate: number,
channel: string,
message: string,
senderIntId: number,
repliesTo: number|null,
deletionId: number|null
}
export type LiveChatReaction = {
messageId: number,
reaction: string,
senderIntId: number
}
export type PlaceChatMessage = {
messageId: number,
sendDate: number,
message: string,
senderIntId: number,
x: number,
y: number
}
export type Ban = {
banId: number,
userIntId: number,
startDate: number,
finishDate: number,
moderatorIntId: number,
reason: string,
userAppeal: string,
appealRejected: number
}
export type Mute = {
muteId: number,
startDate: number,
finishDate: number,
userIntId: number,
moderatorIntId: number,
reason: string,
userAppeal: string,
appealRejected: number
}
export type User = {
intId: number,
chatName: string,
token: string,
lastJoined: number,
pixelsPlaced: number,
playTimeSeconds: number
}
export type KnownIp = {
userIntId: number,
ip: string,
lastUsed: number
}
export type UserVip = {
userIntId: number,
keyHash: string,
lastUsed: number
}
export type DeletionMessageInfo = {
messageId: number,
reason: string,
moderatorIntId: number
}
export type LiveChatDeletion = {
deletionId: number,
moderatorIntId: number,
reason: string,
deletionDate: number
}
export type AuthenticateUser = { token: string, ip: string }
export type DbInternals = {
setUserChatName: (data: { newName: string, intId: number }) => void,
getUserChatName: (intId: number) => string|null,
authenticateUser: (data: AuthenticateUser) => void,
getLiveChatHistory: (data: { messageId: number, count: number, before: number, channel?: string, includeDeleted?: boolean }) => LiveChatMessage[],
updatePixelPlace: (intId: number) => void,
getMaxLiveChatId: () => number,
getMaxPlaceChatId: () => number,
commitShutdown: () => void,
insertLiveChat: (data: LiveChatMessage) => void,
deleteLiveChat: (data: DeletionMessageInfo) => void,
insertPlaceChat: (data: PlaceChatMessage) => void,
updateUserVip: (data: { intId: number, codeHash: string}) => void,
exec: (data: { stmt: string, params: any }) => any[]|null
}

try{
const createLiveChatMessages = `
CREATE TABLE IF NOT EXISTS LiveChatMessages (
Expand Down Expand Up @@ -84,24 +173,24 @@ const createUsers = `
)
`
db.exec(createUsers)
const createUserIps = `
const createKnownIps = `
CREATE TABLE IF NOT EXISTS KnownIps (
userIntId INTEGER NOT NULL,
ip TEXT NOT NULL,
lastUsed INTEGER,
FOREIGN KEY (userIntId) REFERENCES Users(intId)
)
` // ip and userIntId combined form a composite key to identify a record
db.exec(createUserIps)
const createVipKeys = `
db.exec(createKnownIps)
const createVips = `
CREATE TABLE IF NOT EXISTS UserVips (
userIntId INTEGER NOT NULL,
keyHash TEXT NOT NULL,
lastUsed INTEGER,
FOREIGN KEY(userIntId) REFERENCES Users(intId)
)
`
db.exec(createVipKeys)
db.exec(createVips)
const createLiveChatDeletions = `
CREATE TABLE IF NOT EXISTS LiveChatDeletions (
deletionId INTEGER PRIMARY KEY,
Expand All @@ -114,17 +203,29 @@ const createLiveChatDeletions = `
db.exec(createLiveChatDeletions)


const insertLiveChat = db.prepare("INSERT INTO LiveChatMessages (messageId, message, sendDate, channel, senderIntId, repliesTo) VALUES (?1, ?2, ?3, ?4, ?5, ?6)")
const insertPlaceChat = db.prepare("INSERT INTO PlaceChatMessages (messageId, message, sendDate, senderIntId, x, y) VALUES (?1, ?2, ?3, ?4, ?5, ?6)")
const updatePixelPlaces = db.prepare("UPDATE Users SET pixelsPlaced = pixelsPlaced + ?1 WHERE intId = ?2")
const insertLiveChat = db.prepare<void, LiveChatMessage>(`
INSERT INTO LiveChatMessages (messageId, message, sendDate, channel, senderIntId, repliesTo, deletionId)
VALUES ($messageId, $message, $sendDate, $channel, $senderIntId, $repliesTo, $deletionId)`)
const insertPlaceChat = db.prepare<void, PlaceChatMessage>(`
INSERT INTO PlaceChatMessages (messageId, message, sendDate, senderIntId, x, y)
VALUES ($messageId, $message, $sendDate, $senderIntId, $x, $y)`)
const updatePixelPlaces = db.prepare<void, [number, number]>(`
UPDATE Users SET pixelsPlaced = pixelsPlaced + ?1 WHERE intId = ?2`)

const pixelPlaces = new Map() // intId, count
const liveChatInserts = new Queue()
const placeChatInserts = new Queue()
interface PublicQueue<T> extends Queue<T> {
_elements: T[]
}
const pixelPlaces = new Map<number, number>() // intId, count
// @ts-expect-error Some chicanery to bypass them hiding _elements from the definition
const liveChatInserts:PublicQueue<LiveChatMessage> = new Queue<LiveChatMessage>()
const placeChatInserts = new Queue<PlaceChatMessage>()
/**
* Bulk insert live chat messages, pixel places and place chats on an interval loop
*/
function performBulkInsertions() {
// insert all new pixel places
db.transaction(() => {
for (let placePair of pixelPlaces) {
for (const placePair of pixelPlaces) {
updatePixelPlaces.run(placePair[1], placePair[0])
pixelPlaces.delete(placePair[0])
}
Expand All @@ -134,58 +235,54 @@ function performBulkInsertions() {
db.transaction(() => {
while (!liveChatInserts.isEmpty()) {
const data = liveChatInserts.dequeue()
insertLiveChat.run(...data)
insertLiveChat.run(data)
}
while (!placeChatInserts.isEmpty()) {
const data = placeChatInserts.dequeue()
insertPlaceChat.run(...data)
insertPlaceChat.run(data)
}
})()
}
setInterval(performBulkInsertions, 10000)

const internal = {
/** @param {{ newName: string, intId: number }} data */
const internal: DbInternals = {
setUserChatName: function(data) {
const updateQuery = db.query("UPDATE Users SET chatName = ?1 WHERE intId = ?2")
updateQuery.run(data.newName, data.intId)
},
getUserChatName: function(intId) {
const getNameQuery = db.query("SELECT chatName FROM Users WHERE intId = ?1")
const getNameQuery = db.query<User, number>("SELECT chatName FROM Users WHERE intId = ?1")
const result = getNameQuery.get(intId)
return result ? result.chatName : null
},
/** @param {{ token: string, ip: string }} data */
authenticateUser: function(data) {
const selectUser = db.query("SELECT * FROM Users WHERE token = ?1")
const selectUser = db.query<User, string>("SELECT * FROM Users WHERE token = ?1")
const epochMs = Date.now()

let user = selectUser.get(data.token)
if (!user) { // Create new user
const insertUser = db.query(
"INSERT INTO Users (token, lastJoined, pixelsPlaced, playTimeSeconds) VALUES (?1, ?2, ?3, ?4) RETURNING intId")
const insertUser = db.query<User, [string, number, number, number]>(
"INSERT INTO Users (token, lastJoined, pixelsPlaced, playTimeSeconds) VALUES (?1, ?2, ?3, ?4) RETURNING *")
user = insertUser.get(data.token, epochMs, 0, 0)
return user.intId
}
else { // Update last joined
const updateUser = db.query("UPDATE Users SET lastJoined = ?1 WHERE intId = ?2")
updateUser.run(epochMs, user.intId)
}
// Add known IP if not already there
const getIpsQuery = db.query("SELECT * FROM KnownIps WHERE userIntId = ?1 AND ip = ?2")
const getIpsQuery = db.query<KnownIp, [number, string]>("SELECT * FROM KnownIps WHERE userIntId = ?1 AND ip = ?2")
const ipExists = getIpsQuery.get(user.intId, data.ip)
if (ipExists) { // Update last used
const updateIp = db.query("UPDATE KnownIps SET lastUsed = ?1 WHERE userIntId = ?2 AND ip = ?3")
const updateIp = db.query<void, [number, number, string]>("UPDATE KnownIps SET lastUsed = ?1 WHERE userIntId = ?2 AND ip = ?3")
updateIp.run(epochMs, user.intId, data.ip)
}
else { // Create new
const createIp = db.query("INSERT INTO KnownIps (userIntId, ip, lastUsed) VALUES (?1, ?2, ?3)")
const createIp = db.query<void, [number, string, number]>("INSERT INTO KnownIps (userIntId, ip, lastUsed) VALUES (?1, ?2, ?3)")
createIp.run(user.intId, data.ip, epochMs)
}

return user.intId
},
/** @param {{ messageId: number, count: number, before: boolean, channel: string?, includeDeleted?: boolean }} data */
getLiveChatHistory: function(data) {
const liveChatMessageId = internal.getMaxLiveChatId()
let params = []
Expand Down Expand Up @@ -236,58 +333,45 @@ const internal = {
pixelPlaces.set(intId, (pixelPlaces.get(intId)||0) + 1)
},
getMaxLiveChatId: function() {
const getMaxMessageId = db.query("SELECT MAX(messageID) AS maxMessageID FROM LiveChatMessages")
const maxMessageID = getMaxMessageId.get().maxMessageID || 0
return maxMessageID
const getMaxMessageId = db.query<{ maxMessageId: number }, any>("SELECT MAX(messageId) AS maxMessageId FROM LiveChatMessages")
const maxMessageId = getMaxMessageId.get().maxMessageId || 0
return maxMessageId
},
getMaxPlaceChatId: function() {
const getMaxMessageId = db.query("SELECT MAX(messageID) AS maxMessageID FROM PlaceChatMessages")
const maxMessageID = getMaxMessageId.get().maxMessageID || 0
return maxMessageID
const getMaxMessageId = db.query<{ maxMessageId: number }, any>("SELECT MAX(messageId) AS maxMessageId FROM PlaceChatMessages")
const maxMessageId = getMaxMessageId.get().maxMessageId || 0
return maxMessageId
},
commitShutdown: function() {
performBulkInsertions()
db.close()
},
/** Send date is seconds unix epoch offset, we just hope whoever calls these funcs passed in the args in the right order
* else the DB is screwed.
* @param {[ messageId: number, message: string, sendDate: number, channel: string, senderIntId: number, repliesTo: number ]} data */
/* Send date is seconds unix epoch offset */
insertLiveChat: function(data) {
if (!Array.isArray(data) || data.length < 5) {
return
}
if (data.length == 5) {
data[5] = null // Set column 6 to repliesTo default
}
data.repliesTo = null
data.deletionId = null
liveChatInserts.push(data)
},
/** Messages may or may not be in the DB by the time they are being asked to be deleted due to periodic transactions
* @param {{ messageId: number, reason: string, moderatorIntId: number }} data
*/
// Messages may or may not be in the DB by the time they are being asked to be deleted due to periodic transactions
deleteLiveChat: function(data) {
const deletionQuery = db.query(
"INSERT INTO LiveChatDeletions (moderatorIntId, reason, deletionDate) VALUES (?1, ?2, ?3) RETURNING deletionId")
const deletionId = deletionQuery.get()
const deletionQuery = db.query<LiveChatDeletion, DeletionMessageInfo>(
"INSERT INTO LiveChatDeletions (moderatorIntId, reason, deletionDate) VALUES ($moderatorIntId, $reason, $messageId) RETURNING *")
const deletion = deletionQuery.get(data)

// If pending we can update the record in preflight
for (let messageData of liveChatInserts._elements) {
if (messageData[0] === data.messageId) {
messageData[6] = deletionId // Live chat deletion
// If pending to be inserted into DB we can update the record in preflight
for (const messageData of liveChatInserts._elements) {
if (messageData.messageId === data.messageId) {
messageData.deletionId = deletion.deletionId
return
}
}

const query = db.query("UPDATE LiveChatMessages SET deletionId = ?1 WHERE messageId = ?2")
query.run(deletionId, data.messageId)
const query = db.query<void, LiveChatDeletion>("UPDATE LiveChatMessages SET deletionId = $deletionId WHERE messageId = $messageId")
query.run(deletion)
},
/** @param {[messageId: number, message: string, sendDate: number, senderIntId: number, x: number, y: number ]} data */
insertPlaceChat: function(data) {
if (!Array.isArray(data) || data.length < 6) {
return
}
placeChatInserts.push(data)
},
/** @param {{ intId: number, codeHash: string,}} data */
updateUserVip: function(data) {
const epochMs = Date.now()
const getKeysQuery = db.query("SELECT * FROM UserVips WHERE userIntId = ?1 AND keyHash = ?2")
Expand All @@ -302,13 +386,9 @@ const internal = {
createVipQuery.run(data.intId, data.codeHash, epochMs)
}
},
/**
* @param {{ stmt: string, params: any }} data SQL stmt, bind parms to be passed into query
* @returns {any[]|null} query reusult
*/
exec: function(data) {
try {
let query = db.query(data.stmt)
const query = db.query(data.stmt)
return (typeof data.params[Symbol.iterator] === "function"
? query.all(...data.params)
: query.all(data.params))
Expand All @@ -320,9 +400,9 @@ const internal = {
},
}

parentPort.on("message", (message) => {
parentPort?.on("message", (message) => {
const result = internal[message.call] && internal[message.call](message.data)
parentPort.postMessage({ handle: message.handle, data: result })
parentPort?.postMessage({ handle: message.handle, data: result })
})
}
catch(e){
Expand Down
9 changes: 0 additions & 9 deletions jsconfig.json

This file was deleted.

Loading

0 comments on commit 03ec9c0

Please sign in to comment.