Simplified api code, corrected tests
This commit is contained in:
parent
628d7ade76
commit
8929d4cd6e
5 changed files with 47 additions and 150 deletions
8
index.js
8
index.js
|
|
@ -1,3 +1,4 @@
|
|||
import fs from "node:fs";
|
||||
import { parseArgs } from "node:util";
|
||||
import { prefetchRates } from "./src/api.js";
|
||||
import { Bot } from "./src/bot.js";
|
||||
|
|
@ -56,6 +57,10 @@ if (pairs.length === 0 || pairs.some((p) => !p.match(/^[A-Z]+-?[A-Z]+$/))) {
|
|||
logger.error("Invalid pair format (expected BTC-USD or CNYUSD)");
|
||||
process.exit(1);
|
||||
}
|
||||
if (threshold <= 0 || threshold > 100) {
|
||||
logger.error("Threshold must be between 0 and 100");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
async function main() {
|
||||
logger.info("Uphold price alert bot starting...");
|
||||
|
|
@ -72,6 +77,8 @@ async function main() {
|
|||
process.exit(1);
|
||||
}
|
||||
|
||||
fs.promises.writeFile("/tmp/healthy", "ok");
|
||||
|
||||
const handleAlert = async (alertData) => {
|
||||
await insertIntoDB(alertData);
|
||||
};
|
||||
|
|
@ -91,6 +98,7 @@ async function main() {
|
|||
const shutdown = async () => {
|
||||
logger.info("Shutting down...");
|
||||
bots.forEach((b) => b.stop());
|
||||
await new Promise((resolve) => setTimeout(resolve, 1000));
|
||||
await closePool();
|
||||
process.exit(0);
|
||||
};
|
||||
|
|
|
|||
70
src/api.js
70
src/api.js
|
|
@ -2,32 +2,16 @@ import { BigNumber } from "bignumber.js";
|
|||
import PQueue from "p-queue";
|
||||
import logger from "./logger.js";
|
||||
|
||||
const cache = new Map();
|
||||
const inflight = new Map();
|
||||
const queue = new PQueue({ interval: 1000, intervalCap: 10 });
|
||||
|
||||
const CACHE_TTL = 4500;
|
||||
|
||||
const queue = new PQueue({ interval: 1000, intervalCap: 15 });
|
||||
|
||||
export async function fetchRate(pair, forceRefresh = false) {
|
||||
export async function fetchRate(pair) {
|
||||
const key = pair.toUpperCase();
|
||||
|
||||
if (!forceRefresh) {
|
||||
const entry = cache.get(key);
|
||||
if (entry && Date.now() - entry.ts < CACHE_TTL) {
|
||||
return entry.price;
|
||||
}
|
||||
}
|
||||
|
||||
if (inflight.has(key)) {
|
||||
return inflight.get(key);
|
||||
}
|
||||
|
||||
const taskPromise = queue.add(async () => {
|
||||
return queue.add(async () => {
|
||||
for (let attempt = 1; attempt <= 3; attempt++) {
|
||||
try {
|
||||
const controller = new AbortController();
|
||||
const timeout = setTimeout(() => controller.abort(), 8000);
|
||||
const timeout = setTimeout(() => controller.abort(), 5000);
|
||||
|
||||
const res = await fetch(`https://api.uphold.com/v0/ticker/${key}`, {
|
||||
signal: controller.signal,
|
||||
|
|
@ -35,12 +19,8 @@ export async function fetchRate(pair, forceRefresh = false) {
|
|||
|
||||
clearTimeout(timeout);
|
||||
|
||||
if (res.status === 429) {
|
||||
throw new Error("Rate limited");
|
||||
}
|
||||
if (!res.ok) {
|
||||
throw new Error(`HTTP ${res.status}`);
|
||||
}
|
||||
if (res.status === 429) throw new Error("Rate limited");
|
||||
if (!res.ok) throw new Error(`HTTP ${res.status}`);
|
||||
|
||||
const data = await res.json();
|
||||
|
||||
|
|
@ -48,46 +28,22 @@ export async function fetchRate(pair, forceRefresh = false) {
|
|||
throw new Error("Invalid ask price");
|
||||
}
|
||||
|
||||
const price = new BigNumber(data.ask);
|
||||
cache.set(key, { price, ts: Date.now() });
|
||||
return price;
|
||||
return new BigNumber(data.ask);
|
||||
} catch (err) {
|
||||
const isLastAttempt = attempt === 3;
|
||||
|
||||
if (!isLastAttempt) {
|
||||
logger.warn(
|
||||
{ pair: key, attempt, error: err.message },
|
||||
"Fetch failed, retrying...",
|
||||
);
|
||||
|
||||
await new Promise((r) =>
|
||||
setTimeout(r, 1000 * Math.pow(2, attempt - 1)),
|
||||
);
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
clearTimeout(timeout);
|
||||
if (attempt === 3) throw err;
|
||||
await new Promise((r) => setTimeout(r, 500 * Math.pow(2, attempt - 1)));
|
||||
}
|
||||
}
|
||||
});
|
||||
inflight.set(key, taskPromise);
|
||||
|
||||
taskPromise
|
||||
.finally(() => {
|
||||
inflight.delete(key);
|
||||
})
|
||||
.catch(() => {});
|
||||
|
||||
return taskPromise;
|
||||
}
|
||||
|
||||
export async function prefetchRates(pairs) {
|
||||
logger.info({ pairs }, "Populating cache...");
|
||||
logger.info({ pairs }, "Verifying pairs...");
|
||||
const results = await Promise.allSettled(pairs.map((p) => fetchRate(p)));
|
||||
|
||||
const failed = results.filter((r) => r.status === "rejected");
|
||||
if (failed.length > 0) {
|
||||
logger.error(
|
||||
{ count: failed.length },
|
||||
"Some currencies failed to prefetch, have to wati for loop",
|
||||
);
|
||||
logger.warn(`[API] ${failed.length} pairs failed initial check.`);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ export class Bot {
|
|||
if (this.running) return;
|
||||
this.running = true;
|
||||
logger.info(
|
||||
`[${this.pair}] Monitoring started (${this.interval}ms, +/-${this.threshold}%)`,
|
||||
`[${this.pair}] Monitoring every ${this.interval}ms, +/-${this.threshold}%`,
|
||||
);
|
||||
this.loop();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,9 +64,8 @@ export async function initDB() {
|
|||
} catch (err) {
|
||||
if (i === maxRetries - 1) {
|
||||
logger.error(
|
||||
`[DB] Could not connect: ${err.message}. Continuing without DB.`,
|
||||
`[DB] Could not connect: ${err.message}. Running in memory`,
|
||||
);
|
||||
// Lets not try that again
|
||||
pool = null;
|
||||
return;
|
||||
}
|
||||
|
|
@ -107,7 +106,6 @@ export async function insertIntoDB(data) {
|
|||
await getPool().query(query, values);
|
||||
logger.info(`[DB] Event saved for ${data.pair}`);
|
||||
} catch (err) {
|
||||
// Re-throw so the caller knows there was a failure
|
||||
logger.error(`[DB] Failed to save alert: ${err.message}`);
|
||||
throw err;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ vi.mock("../src/logger.js", () => ({
|
|||
},
|
||||
}));
|
||||
|
||||
describe("Bot Core Functionality", () => {
|
||||
describe("Bot", () => {
|
||||
const CONFIG = {
|
||||
pair: "BTC-USD",
|
||||
interval: 100,
|
||||
|
|
@ -111,33 +111,6 @@ describe("Bot Core Functionality", () => {
|
|||
expect(onAlertSpy).toHaveBeenCalledTimes(1);
|
||||
expect(onAlertSpy.mock.calls[0][0].direction).toBe("DOWN");
|
||||
});
|
||||
|
||||
it("should alert on large price swing", async () => {
|
||||
fetchRateMock.mockResolvedValue(new BigNumber("51000.00"));
|
||||
|
||||
await bot.loop();
|
||||
|
||||
expect(onAlertSpy).toHaveBeenCalledTimes(1);
|
||||
expect(onAlertSpy.mock.calls[0][0].percentChange).toBe("2.0000");
|
||||
});
|
||||
});
|
||||
|
||||
describe("Consecutive Alerts Logic", () => {
|
||||
it("should base next alert on last ALERT price, not last CHECKED price", async () => {
|
||||
fetchRateMock.mockResolvedValue(new BigNumber("50000.00"));
|
||||
await bot.loop();
|
||||
|
||||
fetchRateMock.mockResolvedValue(new BigNumber("50003.00"));
|
||||
await bot.loop();
|
||||
expect(onAlertSpy).not.toHaveBeenCalled();
|
||||
|
||||
fetchRateMock.mockResolvedValue(new BigNumber("50005.00"));
|
||||
await bot.loop();
|
||||
|
||||
expect(onAlertSpy).toHaveBeenCalledTimes(1);
|
||||
expect(onAlertSpy.mock.calls[0][0].previousPrice).toBe("50000.000000");
|
||||
expect(onAlertSpy.mock.calls[0][0].newPrice).toBe("50005.000000");
|
||||
});
|
||||
});
|
||||
|
||||
describe("Error Handling", () => {
|
||||
|
|
@ -145,16 +118,6 @@ describe("Bot Core Functionality", () => {
|
|||
fetchRateMock.mockRejectedValue(new Error("Network error"));
|
||||
await expect(bot.loop()).resolves.not.toThrow();
|
||||
});
|
||||
|
||||
it("should handle alert callback errors", async () => {
|
||||
fetchRateMock.mockResolvedValue(new BigNumber("50000.00"));
|
||||
await bot.loop();
|
||||
|
||||
fetchRateMock.mockResolvedValue(new BigNumber("51000.00"));
|
||||
onAlertSpy.mockRejectedValue(new Error("DB Connection Lost"));
|
||||
|
||||
await expect(bot.loop()).resolves.not.toThrow();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
|
@ -165,6 +128,7 @@ describe("API", () => {
|
|||
vi.resetModules();
|
||||
nock.cleanAll();
|
||||
|
||||
// Import the actual module for these tests
|
||||
vi.doMock("../src/api.js", async () => {
|
||||
return await vi.importActual("../src/api.js");
|
||||
});
|
||||
|
|
@ -176,60 +140,26 @@ describe("API", () => {
|
|||
nock.cleanAll();
|
||||
});
|
||||
|
||||
describe("fetchRate Caching", () => {
|
||||
it("should cache successful responses", async () => {
|
||||
describe("fetchRate", () => {
|
||||
it("should return a BigNumber on success", async () => {
|
||||
const scope = nock("https://api.uphold.com")
|
||||
.get("/v0/ticker/BTC-USD")
|
||||
.reply(200, { ask: "60000" });
|
||||
|
||||
const price1 = await api.fetchRate("BTC-USD");
|
||||
expect(price1.toFixed(0)).toBe("60000");
|
||||
|
||||
const price2 = await api.fetchRate("BTC-USD");
|
||||
expect(price2.toFixed(0)).toBe("60000");
|
||||
|
||||
const price = await api.fetchRate("BTC-USD");
|
||||
expect(price).toBeInstanceOf(BigNumber);
|
||||
expect(price.toFixed(0)).toBe("60000");
|
||||
expect(scope.isDone()).toBe(true);
|
||||
});
|
||||
|
||||
it("should bypass cache if forceRefresh is true", async () => {
|
||||
it("should retry on failure and eventually success", async () => {
|
||||
const scope = nock("https://api.uphold.com")
|
||||
.get("/v0/ticker/BTC-USD")
|
||||
.times(2)
|
||||
.reply(200, { ask: "60000" });
|
||||
|
||||
await api.fetchRate("BTC-USD");
|
||||
await api.fetchRate("BTC-USD", true);
|
||||
|
||||
expect(scope.isDone()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Request Deduplication", () => {
|
||||
it("should deduplicate concurrent requests", async () => {
|
||||
const scope = nock("https://api.uphold.com")
|
||||
.replyWithError("Socket Hangup") // Fail 1
|
||||
.get("/v0/ticker/BTC-USD")
|
||||
.delay(100)
|
||||
.reply(200, { ask: "60000" });
|
||||
.reply(200, { ask: "60000" }); // Success 2
|
||||
|
||||
const p1 = api.fetchRate("BTC-USD", true);
|
||||
const p2 = api.fetchRate("BTC-USD", true);
|
||||
|
||||
const [r1, r2] = await Promise.all([p1, p2]);
|
||||
|
||||
expect(r1).toEqual(r2);
|
||||
expect(scope.isDone()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Retry Logic", () => {
|
||||
it("should retry on failure", async () => {
|
||||
const scope = nock("https://api.uphold.com")
|
||||
.get("/v0/ticker/BTC-USD")
|
||||
.replyWithError("Socket Hangup")
|
||||
.get("/v0/ticker/BTC-USD")
|
||||
.reply(200, { ask: "60000" });
|
||||
|
||||
const price = await api.fetchRate("BTC-USD", true);
|
||||
const price = await api.fetchRate("BTC-USD");
|
||||
expect(price.toFixed(0)).toBe("60000");
|
||||
expect(scope.isDone()).toBe(true);
|
||||
});
|
||||
|
|
@ -240,14 +170,19 @@ describe("API", () => {
|
|||
.times(3)
|
||||
.replyWithError("Persistent Fail");
|
||||
|
||||
try {
|
||||
await api.fetchRate("BTC-USD", true);
|
||||
expect.fail("Should have thrown error");
|
||||
} catch (err) {
|
||||
expect(err.message).toContain("Persistent Fail");
|
||||
}
|
||||
|
||||
await expect(api.fetchRate("BTC-USD")).rejects.toThrow("Persistent Fail");
|
||||
expect(scope.isDone()).toBe(true);
|
||||
}, 10000);
|
||||
});
|
||||
|
||||
it("should handle 429 Rate Limits by retrying and eventually throwing", async () => {
|
||||
const scope = nock("https://api.uphold.com")
|
||||
.get("/v0/ticker/BTC-USD")
|
||||
// TODO: Find better way to test, takes too long
|
||||
.times(3)
|
||||
.reply(429, { error: "Too Many Requests" });
|
||||
|
||||
await expect(api.fetchRate("BTC-USD")).rejects.toThrow("Rate limited");
|
||||
expect(scope.isDone()).toBe(true);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
Loading…
Reference in a new issue