diff --git a/index.js b/index.js index 7e856a4..0f5f284 100644 --- a/index.js +++ b/index.js @@ -1,8 +1,7 @@ import { parseArgs } from "node:util"; -import { prefetchRates } from "./src/api.js"; -import { Bot } from "./src/bot.js"; +import { Manager } from "./src/manager.js"; import logger from "./src/logger.js"; -import { initDB, insertIntoDB, closePool } from "./src/db.js"; +import { initDB, closePool } from "./src/db.js"; import { startServer } from "./src/server.js"; process.on("uncaughtException", (err) => { @@ -64,35 +63,17 @@ async function main() { ); await initDB(); + const manager = new Manager(); + pairs.forEach((pair) => { + manager.addBot(pair, { interval, threshold }); + }); + manager.start(); + startServer(); - try { - await prefetchRates(pairs); - } catch (err) { - logger.error(err, "Critical failure during cache population"); - process.exit(1); - } - - const handleAlert = async (alertData) => { - await insertIntoDB(alertData); - }; - - const bots = pairs.map((pair) => { - return new Bot( - { - pair, - interval, - threshold, - }, - handleAlert, - ); - }); - bots.forEach((b) => b.start()); - const shutdown = async () => { - logger.info("Shutting down..."); - bots.forEach((b) => b.stop()); - await new Promise((resolve) => setTimeout(resolve, 1000)); + await manager.shutdown(); + // Safe to close DB await closePool(); process.exit(0); }; diff --git a/src/db.js b/src/db.js index 9e6dccc..66ab775 100644 --- a/src/db.js +++ b/src/db.js @@ -111,14 +111,24 @@ export async function insertIntoDB(data) { } } -export async function getAlerts(limit = 50) { +export async function getAlertCount() { + const pool = getPool(); + const res = await pool.query("SELECT COUNT(*) FROM ALERTS"); + return parseInt(res.rows[0].count, 10); +} + +// Offset pagination for now +// Page 1 or 0, don't know which is best +export async function getAlerts(limit = 50, page = 1) { + const pool = getPool(); if (!pool) { return []; } - const res = await pool.query( - "SELECT * FROM alerts ORDER BY created_at DESC LIMIT $1", - [limit], - ); + + const offset = (page - 1) * limit; + // OFFSET probably not the best thing here + const query = `SELECT * FROM alerts ORDER BY created_at DESC LIMIT $1 OFFSET $2`; + const res = await pool.query(query, [limit, offset]); return res.rows; } diff --git a/src/manager.js b/src/manager.js new file mode 100644 index 0000000..6706fa6 --- /dev/null +++ b/src/manager.js @@ -0,0 +1,47 @@ +import { Bot } from "./bot.js"; +import { AlertQueue } from "./queue.js"; +import logger from "./logger.js"; + +export class Manager { + constructor() { + this.bots = new Map(); + this.queue = new AlertQueue(); + } + + addBot(pair, config) { + const normPair = pair.toUpperCase(); + if (this.bots.has(normPair)) return; + + const queueStrategy = async (alertData) => { + this.queue.push(alertData); + }; + + const bot = new Bot({ pair: normPair, ...config }, queueStrategy); + + this.bots.set(normPair, bot); + + logger.info(`[Manager] Bot added for ${pair}`); + } + + start() { + logger.info("[Manager] Starting all bots..."); + this.bots.forEach((bot) => bot.start()); + } + + async shutdown() { + this.bots.forEach((bot) => bot.stop()); + logger.info("[Manager] All bots stopped."); + await this.queue.drain(); + this.bots.clear(); + logger.info("[Manager] Shutdown complete."); + } + + restartBot(pair) { + const bot = this.bots.get(pair); + if (bot) { + bot.stop(); + bot.start(); + logger.info(`[Manager] Restarted ${pair}`); + } + } +} diff --git a/src/queue.js b/src/queue.js new file mode 100644 index 0000000..092d073 --- /dev/null +++ b/src/queue.js @@ -0,0 +1,52 @@ +import logger from "./logger.js"; +import { insertIntoDB } from "./db.js"; + +// This could all be done with Redis, not sure how hard that would be to implement +export class AlertQueue { + constructor() { + this.queue = []; + this.isPushing = false; + } + + push(alertData) { + this.queue.push(alertData); + logger.debug(`[Queue] Item added. Queue size: ${this.queue.length}`); + + this.process(); + } + + // Doing fifo, not sure if correct + async process() { + if (this.isPushing) return; + + this.isPushing = true; + + while (this.queue.length > 0) { + const task = this.queue.shift(); + + try { + await insertIntoDB(task); + } catch (err) { + logger.error(err, "[Queue] Failed to process alert"); + } + } + + this.isPushing = false; + } + + // This resolves when the queue is empty, i.e., when everything is processed + async clear() { + if (this.queue.length === 0 && !this.isPushing) return; + + logger.info(`[Queue] Clearing ${this.queue.length} remaining items...`); + + while (this.queue.length > 0 || this.isPushing) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + logger.info("[Queue] Drained."); + } +} + +// Singleton if needed +export const alertQueue = new AlertQueue(); diff --git a/src/server.js b/src/server.js index ed19b7b..946f1c7 100644 --- a/src/server.js +++ b/src/server.js @@ -1,7 +1,7 @@ import http from "node:http"; import { URL } from "node:url"; import { stats } from "./stats.js"; -import { getAlerts } from "./db.js"; +import { getAlertCount, getAlerts } from "./db.js"; import logger from "./logger.js"; const PORT = process.env.PORT || 3000; @@ -26,8 +26,26 @@ const server = http.createServer(async (req, res) => { if (req.method === "GET" && pathname === "/alerts") { const limit = parseInt(searchParams.get("limit")) || 50; - const alerts = await getAlerts(limit); - return sendJSON(res, { count: alerts.length, data: alerts }); + const page = parseInt(searchParams.get("page")) || 1; + if (limit > 100) limit = 100; + if (page < 1) page = 1; + const [data, totalItems] = await Promise.all([ + getAlerts(limit, page), + getAlertCount(), + ]); + const totalPages = Math.ceil(totalItems / limit); + + return sendJSON(res, { + metadata: { + page, + limit, + numItems, + numPages, + hasNext: page < totalPages, + hasPrev: page > 1, + }, + data, + }); } sendJSON(res, { error: "Not Found" }, 404);