Implemented offset-based pagination, added a simple in-memory queue. Looking into Redis

Vítor Vieira 2026-01-07 21:44:51 +00:00
parent 532169d748
commit 494f8de101
5 changed files with 145 additions and 37 deletions


@@ -1,8 +1,7 @@
import { parseArgs } from "node:util";
import { prefetchRates } from "./src/api.js";
import { Bot } from "./src/bot.js";
import { Manager } from "./src/manager.js";
import logger from "./src/logger.js";
import { initDB, insertIntoDB, closePool } from "./src/db.js";
import { initDB, closePool } from "./src/db.js";
import { startServer } from "./src/server.js";
process.on("uncaughtException", (err) => {
@@ -64,35 +63,17 @@ async function main() {
);
await initDB();
const manager = new Manager();
pairs.forEach((pair) => {
manager.addBot(pair, { interval, threshold });
});
manager.start();
startServer();
try {
await prefetchRates(pairs);
} catch (err) {
logger.error(err, "Critical failure during cache population");
process.exit(1);
}
const handleAlert = async (alertData) => {
await insertIntoDB(alertData);
};
const bots = pairs.map((pair) => {
return new Bot(
{
pair,
interval,
threshold,
},
handleAlert,
);
});
bots.forEach((b) => b.start());
const shutdown = async () => {
logger.info("Shutting down...");
bots.forEach((b) => b.stop());
await new Promise((resolve) => setTimeout(resolve, 1000));
await manager.shutdown();
// Queue is drained by manager.shutdown(), so it's safe to close the DB
await closePool();
process.exit(0);
};

src/db.js

@@ -111,14 +111,24 @@ export async function insertIntoDB(data) {
}
}
export async function getAlerts(limit = 50) {
export async function getAlertCount() {
const pool = getPool();
const res = await pool.query("SELECT COUNT(*) FROM alerts");
return parseInt(res.rows[0].count, 10);
}
// Offset pagination for now; pages are 1-indexed (page 1 => OFFSET 0)
export async function getAlerts(limit = 50, page = 1) {
const pool = getPool();
if (!pool) {
return [];
}
const res = await pool.query(
"SELECT * FROM alerts ORDER BY created_at DESC LIMIT $1",
[limit],
);
const offset = (page - 1) * limit;
// OFFSET still scans every skipped row, so deep pages get slower; see the keyset sketch below
const query = `SELECT * FROM alerts ORDER BY created_at DESC LIMIT $1 OFFSET $2`;
const res = await pool.query(query, [limit, offset]);
return res.rows;
}
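The comment above points at the usual fix; here is a minimal keyset sketch, assuming the alerts table has a unique id column to break created_at ties (getAlertsAfter and the cursor shape are hypothetical, not part of this commit):

// Hypothetical keyset variant: callers pass the last row they saw instead of a page number
export async function getAlertsAfter(cursor, limit = 50) {
  const pool = getPool();
  if (!pool) return [];
  // The row comparison lets Postgres walk an index on (created_at, id) instead of skipping rows
  const query = cursor
    ? `SELECT * FROM alerts WHERE (created_at, id) < ($1, $2) ORDER BY created_at DESC, id DESC LIMIT $3`
    : `SELECT * FROM alerts ORDER BY created_at DESC, id DESC LIMIT $1`;
  const params = cursor ? [cursor.createdAt, cursor.id, limit] : [limit];
  const res = await pool.query(query, params);
  return res.rows;
}

The trade-off is losing random page access, which the /alerts endpoint currently exposes.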

src/manager.js Normal file

@@ -0,0 +1,47 @@
import { Bot } from "./bot.js";
import { AlertQueue } from "./queue.js";
import logger from "./logger.js";
export class Manager {
constructor() {
this.bots = new Map();
this.queue = new AlertQueue();
}
addBot(pair, config) {
const normPair = pair.toUpperCase();
if (this.bots.has(normPair)) return;
const queueStrategy = async (alertData) => {
this.queue.push(alertData);
};
const bot = new Bot({ pair: normPair, ...config }, queueStrategy);
this.bots.set(normPair, bot);
logger.info(`[Manager] Bot added for ${pair}`);
}
start() {
logger.info("[Manager] Starting all bots...");
this.bots.forEach((bot) => bot.start());
}
async shutdown() {
this.bots.forEach((bot) => bot.stop());
logger.info("[Manager] All bots stopped.");
await this.queue.drain();
this.bots.clear();
logger.info("[Manager] Shutdown complete.");
}
restartBot(pair) {
const bot = this.bots.get(pair.toUpperCase()); // normalize the key the same way addBot does
if (bot) {
bot.stop();
bot.start();
logger.info(`[Manager] Restarted ${pair}`);
}
}
}
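Usage is case-insensitive on lookups thanks to the key normalization (the pair and config values below are invented):

const manager = new Manager();
manager.addBot("btcusd", { interval: 5000, threshold: 0.02 });
manager.restartBot("BTCUSD"); // hits the same bot: keys are stored uppercased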

src/queue.js Normal file

@@ -0,0 +1,52 @@
import logger from "./logger.js";
import { insertIntoDB } from "./db.js";
// This could be backed by Redis instead; a rough sketch follows after this file
export class AlertQueue {
constructor() {
this.queue = [];
this.isProcessing = false; // guards against concurrent process() loops
}
push(alertData) {
this.queue.push(alertData);
logger.debug(`[Queue] Item added. Queue size: ${this.queue.length}`);
this.process();
}
// FIFO: shift() hands back the oldest alert first
async process() {
if (this.isProcessing) return;
this.isProcessing = true;
while (this.queue.length > 0) {
const task = this.queue.shift();
try {
await insertIntoDB(task);
} catch (err) {
logger.error(err, "[Queue] Failed to process alert");
}
}
this.isProcessing = false;
}
// Resolves once the queue is empty and any in-flight insert has finished
async drain() {
if (this.queue.length === 0 && !this.isProcessing) return;
logger.info(`[Queue] Draining ${this.queue.length} remaining items...`);
while (this.queue.length > 0 || this.isProcessing) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
logger.info("[Queue] Drained.");
}
}
// Shared instance for callers that want one process-wide queue
export const alertQueue = new AlertQueue();
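A rough sketch of the Redis variant the comment mentions, assuming the node-redis v4 client; RedisAlertQueue and the alerts:queue key name are made up here, and reconnects and poison messages are not handled:

import { createClient } from "redis";
import { insertIntoDB } from "./db.js";
import logger from "./logger.js";

export class RedisAlertQueue {
  constructor(url = "redis://localhost:6379", key = "alerts:queue") {
    this.producer = createClient({ url });
    // Blocking reads (BRPOP) monopolize a connection, so the consumer gets its own
    this.consumer = this.producer.duplicate();
    this.key = key;
    this.running = false;
  }

  async start() {
    await this.producer.connect();
    await this.consumer.connect();
    this.running = true;
    this.loopPromise = this.loop(); // fire-and-forget consumer loop
  }

  async push(alertData) {
    // LPUSH + BRPOP preserves the same FIFO order as the in-memory queue
    await this.producer.lPush(this.key, JSON.stringify(alertData));
  }

  async loop() {
    while (this.running) {
      // 1s timeout so the loop can notice this.running flipping to false
      const entry = await this.consumer.brPop(this.key, 1);
      if (!entry) continue;
      try {
        await insertIntoDB(JSON.parse(entry.element));
      } catch (err) {
        logger.error(err, "[Queue] Failed to process alert");
      }
    }
  }

  async drain() {
    // Unprocessed items stay in Redis and survive restarts, so stopping is enough
    this.running = false;
    await this.loopPromise;
    await this.producer.quit();
    await this.consumer.quit();
  }
}

The main win over the in-memory queue is that pushed alerts outlive a process restart, which is why drain() here only stops the consumer instead of waiting for an empty list.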

src/server.js

@@ -1,7 +1,7 @@
import http from "node:http";
import { URL } from "node:url";
import { stats } from "./stats.js";
import { getAlerts } from "./db.js";
import { getAlertCount, getAlerts } from "./db.js";
import logger from "./logger.js";
const PORT = process.env.PORT || 3000;
@@ -26,8 +26,26 @@ const server = http.createServer(async (req, res) => {
if (req.method === "GET" && pathname === "/alerts") {
let limit = parseInt(searchParams.get("limit"), 10) || 50;
const alerts = await getAlerts(limit);
return sendJSON(res, { count: alerts.length, data: alerts });
let page = parseInt(searchParams.get("page"), 10) || 1;
if (limit > 100) limit = 100;
if (page < 1) page = 1;
const [data, totalItems] = await Promise.all([
getAlerts(limit, page),
getAlertCount(),
]);
const totalPages = Math.ceil(totalItems / limit);
return sendJSON(res, {
metadata: {
page,
limit,
totalItems,
totalPages,
hasNext: page < totalPages,
hasPrev: page > 1,
},
data,
});
}
sendJSON(res, { error: "Not Found" }, 404);
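For illustration, a request like GET /alerts?limit=20&page=2 answers with a shape along these lines (the counts are invented):

{
  "metadata": {
    "page": 2,
    "limit": 20,
    "totalItems": 145,
    "totalPages": 8,
    "hasNext": true,
    "hasPrev": true
  },
  "data": [ /* up to 20 alert rows, newest first */ ]
}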