tinylink/core/queue/queue.js

187 lines
5.9 KiB
JavaScript

const DatabaseClient = require('./db');
const config = require('../config/config');
class SqliteQueue {
constructor() {
this.db = new DatabaseClient(config.db);
}
_serial(payload) {
return typeof payload === 'string' ? payload : JSON.stringify(payload);
}
async insertRaw(connector, payload) {
const content = this._serial(payload);
return this.db.run(
`INSERT INTO inbox_raw (connector, payload) VALUES (?, ?)`,
[connector, content]
);
}
async markRawParsed(id, status, error) {
await this.db.run(
`UPDATE inbox_raw SET status = ?, parse_status = ?, error = ? WHERE id = ?`,
[status, error ? 'error' : 'ok', error, id]
);
}
async insertOutbox(canonical, dedupeKey) {
const payload = this._serial(canonical);
try {
return await this.db.run(
`INSERT INTO outbox_result (canonical_payload, status, dedupe_key, next_attempt_at) VALUES (?, 'pending', ?, 0)`,
[payload, dedupeKey]
);
} catch (err) {
if (err.message && err.message.includes('UNIQUE constraint failed')) {
const existing = await this.db.get(`SELECT * FROM outbox_result WHERE dedupe_key = ?`, [dedupeKey]);
return { existing, duplicate: true };
}
throw err;
}
}
async fetchPending(batchSize) {
const now = Math.floor(Date.now() / 1000);
return this.db.all(
`SELECT * FROM outbox_result WHERE status IN ('pending','retrying') AND next_attempt_at <= ? ORDER BY next_attempt_at ASC LIMIT ?`,
[now, batchSize]
);
}
async claimPending(batchSize, workerId) {
const now = Math.floor(Date.now() / 1000);
const candidates = await this.db.all(
`SELECT id FROM outbox_result WHERE status IN ('pending','retrying') AND next_attempt_at <= ? ORDER BY next_attempt_at ASC LIMIT ?`,
[now, batchSize * 2]
);
const locked = [];
const stale = now - (config.worker.lockTTLSeconds || 60);
for (const candidate of candidates) {
const result = await this.db.run(
`UPDATE outbox_result SET locked_at = ?, locked_by = ? WHERE id = ? AND (locked_at IS NULL OR locked_at <= ?)`,
[now, workerId, candidate.id, stale]
);
if (result.changes === 1) {
const entry = await this.db.get(`SELECT * FROM outbox_result WHERE id = ?`, [candidate.id]);
locked.push(entry);
if (locked.length >= batchSize) break;
}
}
return locked;
}
async markOutboxStatus(id, status, { lastError = null, attempts = null, nextAttemptAt = null } = {}) {
const fields = ['status = ?'];
const params = [status];
if (lastError !== null) {
fields.push('last_error = ?');
params.push(lastError);
}
if (attempts !== null) {
fields.push('attempts = ?');
params.push(attempts);
}
if (nextAttemptAt !== null) {
fields.push('next_attempt_at = ?');
params.push(nextAttemptAt);
}
fields.push('locked_at = NULL', 'locked_by = NULL');
params.push(id);
await this.db.run(`UPDATE outbox_result SET ${fields.join(', ')} WHERE id = ?`, params);
}
async recordDeliveryAttempt({ outboxId, attempt, status, responseCode, responseBody, latency }) {
await this.db.run(
`INSERT INTO delivery_log (outbox_id, attempt, status, response_code, response_body, latency_ms) VALUES (?, ?, ?, ?, ?, ?)`,
[outboxId, attempt, status, responseCode, responseBody, latency]
);
}
async moveToDeadLetter(payload, reason) {
await this.db.run(
`INSERT INTO dead_letter (payload, reason) VALUES (?, ?)`,
[this._serial(payload), reason]
);
}
async pendingCount() {
const row = await this.db.get(`SELECT COUNT(*) as count FROM outbox_result WHERE status = 'pending'`);
return row ? row.count : 0;
}
async retryingCount() {
const row = await this.db.get(`SELECT COUNT(*) as count FROM outbox_result WHERE status = 'retrying'`);
return row ? row.count : 0;
}
async deadLetterCount() {
const row = await this.db.get(`SELECT COUNT(*) as count FROM dead_letter`);
return row ? row.count : 0;
}
async getLastSuccessTimestamp() {
const row = await this.db.get(
`SELECT MAX(created_at) as last_success FROM delivery_log WHERE status = 'success'`
);
return row?.last_success || null;
}
async getAverageLatency() {
const row = await this.db.get(`SELECT AVG(latency_ms) as avg_latency FROM delivery_log WHERE latency_ms IS NOT NULL`);
return row?.avg_latency || 0;
}
async getDeliveryAttempts() {
const row = await this.db.get(`SELECT COUNT(*) as total FROM delivery_log`);
return row ? row.total : 0;
}
async listOutbox({ status, limit = 50 } = {}) {
const safeLimit = Math.max(1, Math.min(Number(limit) || 50, 500));
const base = `SELECT id, status, attempts, next_attempt_at, last_error, created_at FROM outbox_result`;
if (status) {
return this.db.all(
`${base} WHERE status = ? ORDER BY id DESC LIMIT ?`,
[status, safeLimit]
);
}
return this.db.all(`${base} ORDER BY id DESC LIMIT ?`, [safeLimit]);
}
async listRecentDeliveryAttempts(limit = 50) {
const safeLimit = Math.max(1, Math.min(Number(limit) || 50, 500));
return this.db.all(
`SELECT l.id, l.outbox_id, l.attempt, l.status, l.response_code, l.response_body, l.latency_ms, l.created_at,
o.status AS outbox_status
FROM delivery_log l
LEFT JOIN outbox_result o ON o.id = l.outbox_id
ORDER BY l.id DESC
LIMIT ?`,
[safeLimit]
);
}
async listRecentDeadLetters(limit = 50) {
const safeLimit = Math.max(1, Math.min(Number(limit) || 50, 500));
return this.db.all(
`SELECT id, payload, reason, created_at
FROM dead_letter
ORDER BY id DESC
LIMIT ?`,
[safeLimit]
);
}
async ping() {
await this.db.get('SELECT 1 as ok');
return true;
}
async close() {
await this.db.close();
}
}
module.exports = new SqliteQueue();