187 lines
5.9 KiB
JavaScript
187 lines
5.9 KiB
JavaScript
const DatabaseClient = require('./db');
|
|
const config = require('../config/config');
|
|
|
|
/**
 * Persistence helper for the message pipeline. Wraps a DatabaseClient and
 * exposes the queries used against the `inbox_raw`, `outbox_result`,
 * `delivery_log` and `dead_letter` tables.
 *
 * All timestamps computed here are Unix epoch seconds
 * (`Math.floor(Date.now() / 1000)`).
 *
 * NOTE(review): the exact return shapes of `db.run` / `db.get` / `db.all`
 * come from DatabaseClient, which is not visible here. This class only
 * relies on `run()` resolving to an object with a `changes` count, `get()`
 * resolving to a row or a falsy value, and `all()` resolving to an iterable
 * of rows.
 */
class SqliteQueue {
  constructor() {
    this.db = new DatabaseClient(config.db);
  }

  /**
   * Convert a payload to the text stored in the database.
   *
   * @param {*} payload - Strings are stored as-is; anything else goes through JSON.stringify.
   * @returns {string|undefined} Serialized payload (JSON.stringify yields
   *   `undefined` for values it cannot represent, e.g. `undefined` itself).
   */
  _serial(payload) {
    return typeof payload === 'string' ? payload : JSON.stringify(payload);
  }

  /**
   * Normalise a caller-supplied row limit to an integer in [1, 500].
   * Values that coerce to NaN or 0 fall back to 50. Fractions are floored
   * so the value bound to `LIMIT ?` is always an integer.
   *
   * @param {*} limit - Requested limit (number or numeric string).
   * @returns {number} Integer between 1 and 500 inclusive.
   */
  _clampLimit(limit) {
    const requested = Number(limit) || 50;
    return Math.max(1, Math.min(Math.floor(requested), 500));
  }

  /**
   * Store an incoming raw message.
   *
   * @param {*} connector - Value written to `inbox_raw.connector`.
   * @param {*} payload - Message body; serialized with `_serial`.
   * @returns {Promise<*>} Whatever `db.run` resolves to.
   */
  async insertRaw(connector, payload) {
    const content = this._serial(payload);
    return this.db.run(
      `INSERT INTO inbox_raw (connector, payload) VALUES (?, ?)`,
      [connector, content]
    );
  }

  /**
   * Record the parse outcome for a raw inbox row.
   *
   * @param {*} id - `inbox_raw.id` of the row to update.
   * @param {*} status - New value for the `status` column.
   * @param {*} [error] - Error detail. A truthy value sets `parse_status`
   *   to 'error', otherwise 'ok'. An omitted value is stored as NULL.
   * @returns {Promise<void>}
   */
  async markRawParsed(id, status, error) {
    await this.db.run(
      `UPDATE inbox_raw SET status = ?, parse_status = ?, error = ? WHERE id = ?`,
      // Bind null rather than undefined when no error was supplied.
      [status, error ? 'error' : 'ok', error ?? null, id]
    );
  }

  /**
   * Queue a canonical payload for delivery, de-duplicated by `dedupeKey`.
   *
   * @param {*} canonical - Payload; serialized with `_serial`.
   * @param {*} dedupeKey - Value written to `outbox_result.dedupe_key`.
   * @returns {Promise<*>} The `db.run` result on insert, or
   *   `{ existing, duplicate: true }` when a row with this key already exists.
   * @throws Rethrows the original database error for anything that is not a
   *   UNIQUE violation, and for UNIQUE violations where no row with this
   *   `dedupeKey` can be found (i.e. some other constraint was hit).
   */
  async insertOutbox(canonical, dedupeKey) {
    const payload = this._serial(canonical);
    try {
      return await this.db.run(
        `INSERT INTO outbox_result (canonical_payload, status, dedupe_key, next_attempt_at) VALUES (?, 'pending', ?, 0)`,
        [payload, dedupeKey]
      );
    } catch (err) {
      const isUniqueViolation =
        typeof err?.message === 'string' && err.message.includes('UNIQUE constraint failed');
      if (!isUniqueViolation) {
        throw err;
      }
      const existing = await this.db.get(`SELECT * FROM outbox_result WHERE dedupe_key = ?`, [dedupeKey]);
      if (!existing) {
        // The violation was not explained by dedupe_key; do not report a
        // phantom duplicate with no row attached.
        throw err;
      }
      return { existing, duplicate: true };
    }
  }

  /**
   * Read (without locking) outbox rows that are due for delivery.
   *
   * @param {number} batchSize - Maximum number of rows to return.
   * @returns {Promise<*>} Rows with status 'pending' or 'retrying' whose
   *   `next_attempt_at` is not in the future, earliest first.
   */
  async fetchPending(batchSize) {
    const now = Math.floor(Date.now() / 1000);
    return this.db.all(
      `SELECT * FROM outbox_result WHERE status IN ('pending','retrying') AND next_attempt_at <= ? ORDER BY next_attempt_at ASC LIMIT ?`,
      [now, batchSize]
    );
  }

  /**
   * Lock up to `batchSize` due outbox rows for a worker.
   *
   * Works in two steps: select candidate ids, then try to lock each one with
   * a conditional UPDATE. A row counts as claimed only when that UPDATE
   * reports exactly one changed row. A lock is treated as expired when
   * `locked_at <= now - TTL`, where TTL is `config.worker.lockTTLSeconds`
   * (60 when that is falsy).
   *
   * @param {number} batchSize - Maximum number of rows to claim.
   * @param {*} workerId - Value written to `locked_by`.
   * @returns {Promise<Array<object>>} Full rows that were locked by this call.
   */
  async claimPending(batchSize, workerId) {
    const now = Math.floor(Date.now() / 1000);
    const stale = now - (config.worker.lockTTLSeconds || 60);

    // Skip rows holding a live lock here, otherwise they can fill the
    // candidate window and hide claimable rows further down the queue.
    const candidates = await this.db.all(
      `SELECT id FROM outbox_result WHERE status IN ('pending','retrying') AND next_attempt_at <= ? AND (locked_at IS NULL OR locked_at <= ?) ORDER BY next_attempt_at ASC LIMIT ?`,
      [now, stale, batchSize * 2]
    );

    const locked = [];
    for (const candidate of candidates) {
      if (locked.length >= batchSize) {
        break;
      }
      // Re-check status and lock inside the UPDATE: the row may have been
      // finished or claimed by someone else since the SELECT above.
      const result = await this.db.run(
        `UPDATE outbox_result SET locked_at = ?, locked_by = ? WHERE id = ? AND status IN ('pending','retrying') AND (locked_at IS NULL OR locked_at <= ?)`,
        [now, workerId, candidate.id, stale]
      );
      if (result.changes !== 1) {
        continue;
      }
      const entry = await this.db.get(`SELECT * FROM outbox_result WHERE id = ?`, [candidate.id]);
      if (entry) {
        locked.push(entry);
      }
    }
    return locked;
  }

  /**
   * Update an outbox row's status and release its lock.
   *
   * `locked_at` and `locked_by` are always reset to NULL. Optional fields
   * left as `null`/omitted are not touched, so this method cannot be used to
   * clear `last_error`.
   *
   * @param {*} id - `outbox_result.id`.
   * @param {*} status - New status value.
   * @param {object} [options]
   * @param {*} [options.lastError=null] - New `last_error`, if not null.
   * @param {*} [options.attempts=null] - New `attempts`, if not null.
   * @param {*} [options.nextAttemptAt=null] - New `next_attempt_at`, if not null.
   * @returns {Promise<void>}
   */
  async markOutboxStatus(id, status, { lastError = null, attempts = null, nextAttemptAt = null } = {}) {
    const fields = ['status = ?'];
    const params = [status];
    if (lastError !== null) {
      fields.push('last_error = ?');
      params.push(lastError);
    }
    if (attempts !== null) {
      fields.push('attempts = ?');
      params.push(attempts);
    }
    if (nextAttemptAt !== null) {
      fields.push('next_attempt_at = ?');
      params.push(nextAttemptAt);
    }
    fields.push('locked_at = NULL', 'locked_by = NULL');
    params.push(id);
    // Only fixed column fragments are interpolated; all values are bound.
    await this.db.run(`UPDATE outbox_result SET ${fields.join(', ')} WHERE id = ?`, params);
  }

  /**
   * Append one row to `delivery_log`.
   *
   * @param {object} entry
   * @param {*} entry.outboxId - Related `outbox_result.id`.
   * @param {*} entry.attempt - Attempt number.
   * @param {*} entry.status - Attempt outcome.
   * @param {*} [entry.responseCode=null] - Stored in `response_code`.
   * @param {*} [entry.responseBody=null] - Stored in `response_body`.
   * @param {*} [entry.latency=null] - Stored in `latency_ms`.
   * @returns {Promise<void>}
   */
  async recordDeliveryAttempt({
    outboxId,
    attempt,
    status,
    responseCode = null,
    responseBody = null,
    latency = null,
  }) {
    await this.db.run(
      `INSERT INTO delivery_log (outbox_id, attempt, status, response_code, response_body, latency_ms) VALUES (?, ?, ?, ?, ?, ?)`,
      [outboxId, attempt, status, responseCode, responseBody, latency]
    );
  }

  /**
   * Insert a payload into `dead_letter`.
   *
   * @param {*} payload - Serialized with `_serial`.
   * @param {*} reason - Stored in `dead_letter.reason`.
   * @returns {Promise<void>}
   */
  async moveToDeadLetter(payload, reason) {
    await this.db.run(
      `INSERT INTO dead_letter (payload, reason) VALUES (?, ?)`,
      [this._serial(payload), reason]
    );
  }

  /** @returns {Promise<number>} Number of outbox rows with status 'pending'. */
  async pendingCount() {
    const row = await this.db.get(`SELECT COUNT(*) as count FROM outbox_result WHERE status = 'pending'`);
    return row ? row.count : 0;
  }

  /** @returns {Promise<number>} Number of outbox rows with status 'retrying'. */
  async retryingCount() {
    const row = await this.db.get(`SELECT COUNT(*) as count FROM outbox_result WHERE status = 'retrying'`);
    return row ? row.count : 0;
  }

  /** @returns {Promise<number>} Number of rows in `dead_letter`. */
  async deadLetterCount() {
    const row = await this.db.get(`SELECT COUNT(*) as count FROM dead_letter`);
    return row ? row.count : 0;
  }

  /**
   * @returns {Promise<*>} Largest `created_at` among delivery_log rows with
   *   status 'success', or null when there is none.
   */
  async getLastSuccessTimestamp() {
    const row = await this.db.get(
      `SELECT MAX(created_at) as last_success FROM delivery_log WHERE status = 'success'`
    );
    return row?.last_success || null;
  }

  /**
   * @returns {Promise<number>} Average of non-null `latency_ms` values in
   *   delivery_log, or 0 when there are none.
   */
  async getAverageLatency() {
    const row = await this.db.get(`SELECT AVG(latency_ms) as avg_latency FROM delivery_log WHERE latency_ms IS NOT NULL`);
    return row?.avg_latency || 0;
  }

  /** @returns {Promise<number>} Total number of rows in `delivery_log`. */
  async getDeliveryAttempts() {
    const row = await this.db.get(`SELECT COUNT(*) as total FROM delivery_log`);
    return row ? row.total : 0;
  }

  /**
   * List outbox rows, newest id first.
   *
   * @param {object} [options]
   * @param {*} [options.status] - When truthy, only rows with this status.
   * @param {*} [options.limit=50] - Row cap; normalised by `_clampLimit`.
   * @returns {Promise<*>} Rows with id, status, attempts, next_attempt_at,
   *   last_error and created_at.
   */
  async listOutbox({ status, limit = 50 } = {}) {
    const safeLimit = this._clampLimit(limit);
    const base = `SELECT id, status, attempts, next_attempt_at, last_error, created_at FROM outbox_result`;
    if (status) {
      return this.db.all(
        `${base} WHERE status = ? ORDER BY id DESC LIMIT ?`,
        [status, safeLimit]
      );
    }
    return this.db.all(`${base} ORDER BY id DESC LIMIT ?`, [safeLimit]);
  }

  /**
   * List the most recent delivery_log rows together with the current status
   * of the related outbox row (NULL when that row no longer exists, because
   * of the LEFT JOIN).
   *
   * @param {*} [limit=50] - Row cap; normalised by `_clampLimit`.
   * @returns {Promise<*>} Rows ordered by delivery_log id, newest first.
   */
  async listRecentDeliveryAttempts(limit = 50) {
    const safeLimit = this._clampLimit(limit);
    return this.db.all(
      `SELECT l.id, l.outbox_id, l.attempt, l.status, l.response_code, l.response_body, l.latency_ms, l.created_at,
              o.status AS outbox_status
       FROM delivery_log l
       LEFT JOIN outbox_result o ON o.id = l.outbox_id
       ORDER BY l.id DESC
       LIMIT ?`,
      [safeLimit]
    );
  }

  /**
   * List the most recent dead_letter rows.
   *
   * @param {*} [limit=50] - Row cap; normalised by `_clampLimit`.
   * @returns {Promise<*>} Rows ordered by id, newest first.
   */
  async listRecentDeadLetters(limit = 50) {
    const safeLimit = this._clampLimit(limit);
    return this.db.all(
      `SELECT id, payload, reason, created_at
       FROM dead_letter
       ORDER BY id DESC
       LIMIT ?`,
      [safeLimit]
    );
  }

  /**
   * Run a trivial query to check the database is reachable.
   *
   * @returns {Promise<boolean>} Always true; a failing query rejects instead.
   */
  async ping() {
    await this.db.get('SELECT 1 as ok');
    return true;
  }

  /** Close the underlying database client. */
  async close() {
    await this.db.close();
  }
}
|
|
|
|
// Export a ready-made SqliteQueue instance rather than the class itself.
// The instance (and its DatabaseClient) is constructed as soon as this
// module is evaluated.
module.exports = new SqliteQueue();
|