diff --git a/.gitignore b/.gitignore index 40b878d..902b281 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -node_modules/ \ No newline at end of file +node_modules/ +data/ diff --git a/config/app.yaml b/config/app.yaml index acd816a..b5ff531 100644 --- a/config/app.yaml +++ b/config/app.yaml @@ -1,31 +1,14 @@ host: - url: http://localhost/clqms01/api/results + url: apikey: "" inst1: enabled: true connector: type: serial - port: COM1 + port: /dev/pts/2 baudRate: 9600 dataBits: 8 stopBits: 1 parity: none - config: - location: default-lab - communication_mode: unidirectional - note: ASTM instrument over serial COM - translator: - parser: astm - engine: template - file: config/translators/inst1.map - messages: - - HEADER - - PATIENT - - ORDER - - TERMINATOR - forceInstrumentId: true - meta: - translator: msg1 - connection: serial - direction: mono + translator: jokoh diff --git a/config/jokoh.map b/config/jokoh.map new file mode 100644 index 0000000..191a21e --- /dev/null +++ b/config/jokoh.map @@ -0,0 +1,8 @@ +# RESULT +<< +,{datetime:14:""},{skip:5:""},{sampleid:10:""},{skip:2:""},{sampletype:2:""}, +@for i in 1..3 +({testid:2:""},{result:5:""},{skip:2:""}) +@end +, +>> \ No newline at end of file diff --git a/config/translators/inst1.map b/config/translators/inst1.map deleted file mode 100644 index 07b9f73..0000000 --- a/config/translators/inst1.map +++ /dev/null @@ -1,5 +0,0 @@ -# TinyLink clean-room map template -HEADER = H|\^&|||WST^P1|||||{instrument_id}^System1||P|1|{specimen_id} -PATIENT = P|{patient_id}|{sample_id}|||{last_name}^{first_name}||{birth_date}|{sex}|||||{doctor}| -ORDER = O|1|{sample_id}||{order_tests}||||||{specimen_type}||||{tube_type}||||||||||O| -TERMINATOR = L|1|N diff --git a/core/app.js b/core/app.js index 2941a43..a093d3c 100644 --- a/core/app.js +++ b/core/app.js @@ -1,84 +1 @@ -const config = require('./config/config'); -const logger = require('./logger'); -const migrate = require('./maintenance/migrate'); -const { 
createHttpJsonConnector } = require('./connectors/httpJson'); -const { createHl7TcpConnector } = require('./connectors/tcp'); -const { createAstmSerialConnector } = require('./connectors/serial'); -const { processMessage } = require('./pipeline/pipeline'); -const { startWorker, stopWorker } = require('./worker/worker'); -const instrumentConfig = require('./config/instrumentConfig'); -const { createHttpServer } = require('./http'); - -const connectorFactories = { - 'http-json': createHttpJsonConnector, - 'hl7-tcp': createHl7TcpConnector, - 'astm-serial': createAstmSerialConnector -}; - -function buildConnectors() { - return instrumentConfig.list() - .filter((entry) => entry.enabled) - .map((entry) => { - const createConnector = connectorFactories[entry.connector]; - if (!createConnector) { - logger.warn({ connector: entry.connector, instrument_id: entry.instrument_id }, 'unknown connector in instrument config, skipping startup'); - return null; - } - return createConnector({ - ...(entry.connectorConfig || {}), - instrument_id: entry.instrument_id - }); - }) - .filter(Boolean); -} - -function attachConnectorHandlers(connectors) { - connectors.forEach((connector) => { - connector.onMessage(async (incoming) => { - try { - const payload = incoming && Object.prototype.hasOwnProperty.call(incoming, 'payload') - ? incoming.payload - : incoming; - const context = incoming && incoming.context ? 
incoming.context : {}; - await processMessage(connector.name(), payload, context); - } catch (err) { - logger.error({ err: err.message, connector: connector.name() }, 'pipeline error'); - } - }); - connector.onError((err) => { - logger.error({ err: err.message }, `${connector.name()} emitted error`); - }); - }); -} - -async function start() { - instrumentConfig.validateAndLoadInstrumentConfigs(); - await migrate(); - await instrumentConfig.init(); - - const connectors = buildConnectors(); - if (!connectors.length) { - logger.warn('no enabled connectors configured, ingestion listeners are disabled'); - } - - attachConnectorHandlers(connectors); - await Promise.all(connectors.map((connector) => connector.start())); - await startWorker(); - - const app = createHttpServer(connectors); - const server = app.listen(config.healthPort, () => { - logger.info({ port: config.healthPort }, 'health server ready'); - }); - - async function shutdown() { - await stopWorker(); - await Promise.all(connectors.map((connector) => connector.stop())); - if (server) { - await new Promise((resolve) => server.close(resolve)); - } - } - - return { connectors, server, shutdown }; -} - -module.exports = { start }; +module.exports = require('./runtime/startup'); diff --git a/core/config/instrumentConfig.js b/core/config/instrumentConfig.js index acd1d30..77ffc1f 100644 --- a/core/config/instrumentConfig.js +++ b/core/config/instrumentConfig.js @@ -1,7 +1,7 @@ const fs = require('fs'); const path = require('path'); const config = require('./config'); -const logger = require('../logger'); +const logger = require('../util/logger'); let cache = new Map(); let refreshInterval; @@ -22,6 +22,11 @@ function resolveTranslatorFilePath(filePath, configFilePath) { return matched || candidates[0]; } +function defaultTranslatorFile(instrumentId) { + if (!instrumentId || typeof instrumentId !== 'string') return ''; + return path.join('config', `${instrumentId}.map`); +} + function normalizeConnectorType(type) 
{ const value = String(type || '').trim().toLowerCase(); if (value === 'serial' || value === 'astm-serial') return 'astm-serial'; @@ -107,7 +112,17 @@ function validateAndLoadInstrumentConfigs({ continue; } - if (!translator || typeof translator !== 'object' || Array.isArray(translator)) { + if (typeof translator === 'string') { + const translatorName = translator.trim(); + if (!translatorName) { + errors.push(`${label}: translator name cannot be empty`); + continue; + } + item.translator = { + engine: 'template', + file: defaultTranslatorFile(translatorName) + }; + } else if (!translator || typeof translator !== 'object' || Array.isArray(translator)) { item.translator = {}; } @@ -120,15 +135,19 @@ function validateAndLoadInstrumentConfigs({ const translatorEngine = String(resolvedTranslator.engine || 'overrides').trim().toLowerCase(); if (translatorEngine === 'template') { - if (!resolvedTranslator.file || typeof resolvedTranslator.file !== 'string') { - errors.push(`${label}: translator.file is required when translator.engine=template`); + const configuredFile = typeof resolvedTranslator.file === 'string' && resolvedTranslator.file.trim() + ? 
resolvedTranslator.file.trim() + : defaultTranslatorFile(instrumentId); + if (!configuredFile) { + errors.push(`${label}: translator.file could not be resolved`); continue; } - const resolvedTranslatorFilePath = resolveTranslatorFilePath(resolvedTranslator.file, configFilePath); + const resolvedTranslatorFilePath = resolveTranslatorFilePath(configuredFile, configFilePath); if (!fs.existsSync(resolvedTranslatorFilePath)) { - errors.push(`${label}: translator.file not found: ${resolvedTranslator.file}`); + errors.push(`${label}: translator.file not found: ${configuredFile}`); continue; } + resolvedTranslator.file = configuredFile; resolvedTranslator.resolvedFile = resolvedTranslatorFilePath; } diff --git a/core/worker/client.js b/core/host/resultClient.js similarity index 99% rename from core/worker/client.js rename to core/host/resultClient.js index bdf7a62..08623ca 100644 --- a/core/worker/client.js +++ b/core/host/resultClient.js @@ -7,9 +7,11 @@ async function deliver(payload) { const headers = { 'content-type': 'application/json' }; + if (config.clqms.token) { headers.authorization = `Bearer ${config.clqms.token}`; } + const response = await request(config.clqms.url, { method: 'POST', headers, @@ -17,6 +19,7 @@ async function deliver(payload) { keepaliveTimeout: 0, bodyTimeout: config.clqms.timeout }); + const latency = Date.now() - start; const responseBody = await response.body.text(); return { diff --git a/core/worker/worker.js b/core/host/resultService.js similarity index 77% rename from core/worker/worker.js rename to core/host/resultService.js index 672592d..cb8acf3 100644 --- a/core/worker/worker.js +++ b/core/host/resultService.js @@ -1,12 +1,6 @@ const queue = require('../queue/queue'); -const client = require('./client'); -const logger = require('../logger'); const config = require('../config/config'); - -let running = false; -let workerPromise; - -const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); +const client = 
require('./resultClient'); const transientCodes = new Set(['ECONNRESET', 'ETIMEDOUT', 'ECONNREFUSED', 'EAI_AGAIN', 'ENETUNREACH']); @@ -24,7 +18,7 @@ function isTransientError(err) { return false; } -async function handleEntry(entry) { +async function deliverOutboxEntry(entry) { const payload = JSON.parse(entry.canonical_payload); const attemptNumber = entry.attempts + 1; let response; @@ -97,6 +91,7 @@ async function handleEntry(entry) { await queue.moveToDeadLetter(payload, error.message); return; } + const nextAttemptAt = buildNextAttempt(attemptNumber); await queue.markOutboxStatus(entry.id, 'retrying', { attempts: attemptNumber, @@ -106,35 +101,4 @@ async function handleEntry(entry) { } } -async function loop() { - while (running) { - try { - const batch = await queue.claimPending(config.worker.batchSize, config.worker.workerId); - if (!batch.length) { - await sleep(config.worker.pollInterval); - continue; - } - for (const entry of batch) { - await handleEntry(entry); - } - } catch (err) { - logger.error({ err: err.message }, 'delivery worker error'); - await sleep(config.worker.pollInterval); - } - } -} - -async function startWorker() { - if (running) return; - running = true; - workerPromise = loop(); -} - -async function stopWorker() { - running = false; - if (workerPromise) { - await workerPromise; - } -} - -module.exports = { startWorker, stopWorker }; +module.exports = { deliverOutboxEntry }; diff --git a/core/http.js b/core/http.js index 942be83..ca21ee8 100644 --- a/core/http.js +++ b/core/http.js @@ -1,94 +1 @@ -const express = require('express'); -const queue = require('./queue/queue'); -const instrumentConfig = require('./config/instrumentConfig'); - -function createHealthRouter(connectors = []) { - const router = express.Router(); - - router.get('/', async (req, res) => { - const connectorStatuses = connectors.map((connector) => connector.health()); - const pending = await queue.pendingCount(); - const retrying = await queue.retryingCount(); - 
const deadLetters = await queue.deadLetterCount(); - res.json({ - status: 'ok', - connectors: connectorStatuses, - metrics: { - pending, - retrying, - deadLetters - } - }); - }); - - router.get('/ready', async (req, res) => { - try { - await queue.ping(); - res.json({ status: 'ready' }); - } catch (err) { - res.status(503).json({ status: 'unready', reason: err.message }); - } - }); - - return router; -} - -const instrumentRouter = express.Router(); - -instrumentRouter.get('/', async (req, res) => { - res.json(instrumentConfig.list()); -}); - -instrumentRouter.get('/:id', async (req, res) => { - const entry = instrumentConfig.get(req.params.id); - if (!entry) { - return res.status(404).json({ error: 'not found' }); - } - res.json(entry); -}); - -const metricsRouter = express.Router(); - -function formatMetric(name, value, type = 'gauge', help = '') { - const lines = []; - if (help) { - lines.push(`# HELP ${name} ${help}`); - } - lines.push(`# TYPE ${name} ${type}`); - lines.push(`${name} ${value}`); - return lines.join('\n'); -} - -metricsRouter.get('/', async (req, res) => { - try { - const pending = await queue.pendingCount(); - const retrying = await queue.retryingCount(); - const deadLetters = await queue.deadLetterCount(); - const lastSuccess = await queue.getLastSuccessTimestamp(); - const avgLatency = await queue.getAverageLatency(); - const attempts = await queue.getDeliveryAttempts(); - const timestamp = lastSuccess ? 
new Date(lastSuccess).getTime() / 1000 : 0; - const metrics = [ - formatMetric('workstation_pending_total', pending, 'gauge', 'Number of pending payloads'), - formatMetric('workstation_retrying_total', retrying, 'gauge', 'Number of payloads currently retrying'), - formatMetric('workstation_dead_letters_total', deadLetters, 'gauge', 'Total dead-lettered payloads'), - formatMetric('workstation_delivery_attempts_total', attempts, 'counter', 'Total delivery attempts logged'), - formatMetric('workstation_last_success_timestamp', timestamp, 'gauge', 'Epoch seconds of last successful delivery'), - formatMetric('workstation_avg_latency_ms', Math.round(avgLatency), 'gauge', 'Average delivery latency in milliseconds') - ]; - res.set('content-type', 'text/plain; version=0.0.4; charset=utf-8'); - res.send(metrics.join('\n')); - } catch (error) { - res.status(500).send('metrics unavailable'); - } -}); - -function createHttpServer(connectors) { - const app = express(); - app.use('/health', createHealthRouter(connectors)); - app.use('/instruments', instrumentRouter); - app.use('/metrics', metricsRouter); - return app; -} - -module.exports = { createHttpServer }; +module.exports = require('./http/index'); diff --git a/core/http/dashboard/page.js b/core/http/dashboard/page.js new file mode 100644 index 0000000..f4d26ab --- /dev/null +++ b/core/http/dashboard/page.js @@ -0,0 +1,237 @@ +function renderDashboardPage() { + return ` + + + + + TinyLink Dashboard + + + +
+
+

TinyLink Dashboard

+
loading...
+
+ +
+

Pending

-
+

Retrying

-
+

Dead Letters

-
+

Delivery Attempts

-
+

Avg Latency (ms)

-
+
+ +
+
+

Instrument Connectors

+ + + +
InstrumentConnectorStatusAddress
loading...
+
+
+

Queue Tail

+ + + +
IDStatusAttemptsNext AttemptError
loading...
+
+
+ +
+

Recent Delivery Attempts

+ + + +
TimeOutboxAttemptStatusHTTPLatencyBody
loading...
+
+
+ + + +`; +} + +module.exports = { + renderDashboardPage +}; diff --git a/core/http/dashboardRouter.js b/core/http/dashboardRouter.js new file mode 100644 index 0000000..ade6a88 --- /dev/null +++ b/core/http/dashboardRouter.js @@ -0,0 +1,79 @@ +const express = require('express'); +const queue = require('../queue/queue'); +const { renderDashboardPage } = require('./dashboard/page'); + +function toIsoFromEpochSeconds(epochSeconds) { + if (epochSeconds === null || epochSeconds === undefined) return null; + const value = Number(epochSeconds); + if (!Number.isFinite(value) || value <= 0) return null; + return new Date(value * 1000).toISOString(); +} + +function createDashboardRouter(connectors = []) { + const router = express.Router(); + + router.get('/', (req, res) => { + res.type('html').send(renderDashboardPage()); + }); + + router.get('/api/summary', async (req, res) => { + const [pending, retrying, deadLetters, attempts, avgLatency] = await Promise.all([ + queue.pendingCount(), + queue.retryingCount(), + queue.deadLetterCount(), + queue.getDeliveryAttempts(), + queue.getAverageLatency() + ]); + + res.json({ + status: 'ok', + metrics: { + pending, + retrying, + deadLetters, + attempts, + avgLatency + } + }); + }); + + router.get('/api/queue', async (req, res) => { + const limit = Number(req.query.limit || 20); + const rows = await queue.listOutbox({ limit }); + res.json({ + status: 'ok', + items: rows.map((row) => ({ + ...row, + next_attempt_at_iso: toIsoFromEpochSeconds(row.next_attempt_at) + })) + }); + }); + + router.get('/api/instruments', async (req, res) => { + const items = connectors.map((connector) => { + const status = connector.health(); + const address = status.comPort + || status.port + || [status.remoteAddress, status.remotePort].filter(Boolean).join(':') + || '-'; + return { + instrument_id: status.instrument_id || null, + connector: connector.type(), + status: status.status, + address + }; + }); + + res.json({ status: 'ok', items }); + }); + + 
router.get('/api/recent', async (req, res) => { + const limit = Number(req.query.limit || 20); + const items = await queue.listRecentDeliveryAttempts(limit); + res.json({ status: 'ok', items }); + }); + + return router; +} + +module.exports = { createDashboardRouter }; diff --git a/core/http/healthRouter.js b/core/http/healthRouter.js new file mode 100644 index 0000000..7ea7735 --- /dev/null +++ b/core/http/healthRouter.js @@ -0,0 +1,35 @@ +const express = require('express'); +const queue = require('../queue/queue'); + +function createHealthRouter(connectors = []) { + const router = express.Router(); + + router.get('/', async (req, res) => { + const connectorStatuses = connectors.map((connector) => connector.health()); + const pending = await queue.pendingCount(); + const retrying = await queue.retryingCount(); + const deadLetters = await queue.deadLetterCount(); + res.json({ + status: 'ok', + connectors: connectorStatuses, + metrics: { + pending, + retrying, + deadLetters + } + }); + }); + + router.get('/ready', async (req, res) => { + try { + await queue.ping(); + res.json({ status: 'ready' }); + } catch (err) { + res.status(503).json({ status: 'unready', reason: err.message }); + } + }); + + return router; +} + +module.exports = { createHealthRouter }; diff --git a/core/http/index.js b/core/http/index.js new file mode 100644 index 0000000..1c5eec0 --- /dev/null +++ b/core/http/index.js @@ -0,0 +1,16 @@ +const express = require('express'); +const { createHealthRouter } = require('./healthRouter'); +const { createMetricsRouter } = require('./metricsRouter'); +const { createInstrumentRouter } = require('./instrumentRouter'); +const { createDashboardRouter } = require('./dashboardRouter'); + +function createHttpServer(connectors = []) { + const app = express(); + app.use('/health', createHealthRouter(connectors)); + app.use('/metrics', createMetricsRouter()); + app.use('/instruments', createInstrumentRouter()); + app.use('/dashboard', 
createDashboardRouter(connectors)); + return app; +} + +module.exports = { createHttpServer }; diff --git a/core/http/instrumentRouter.js b/core/http/instrumentRouter.js new file mode 100644 index 0000000..1ec287d --- /dev/null +++ b/core/http/instrumentRouter.js @@ -0,0 +1,22 @@ +const express = require('express'); +const instrumentConfig = require('../config/instrumentConfig'); + +function createInstrumentRouter() { + const router = express.Router(); + + router.get('/', async (req, res) => { + res.json(instrumentConfig.list()); + }); + + router.get('/:id', async (req, res) => { + const entry = instrumentConfig.get(req.params.id); + if (!entry) { + return res.status(404).json({ error: 'not found' }); + } + res.json(entry); + }); + + return router; +} + +module.exports = { createInstrumentRouter }; diff --git a/core/http/metricsRouter.js b/core/http/metricsRouter.js new file mode 100644 index 0000000..369aa69 --- /dev/null +++ b/core/http/metricsRouter.js @@ -0,0 +1,44 @@ +const express = require('express'); +const queue = require('../queue/queue'); + +function formatMetric(name, value, type = 'gauge', help = '') { + const lines = []; + if (help) { + lines.push(`# HELP ${name} ${help}`); + } + lines.push(`# TYPE ${name} ${type}`); + lines.push(`${name} ${value}`); + return lines.join('\n'); +} + +function createMetricsRouter() { + const router = express.Router(); + + router.get('/', async (req, res) => { + try { + const pending = await queue.pendingCount(); + const retrying = await queue.retryingCount(); + const deadLetters = await queue.deadLetterCount(); + const lastSuccess = await queue.getLastSuccessTimestamp(); + const avgLatency = await queue.getAverageLatency(); + const attempts = await queue.getDeliveryAttempts(); + const timestamp = lastSuccess ? 
new Date(lastSuccess).getTime() / 1000 : 0; + const metrics = [ + formatMetric('workstation_pending_total', pending, 'gauge', 'Number of pending payloads'), + formatMetric('workstation_retrying_total', retrying, 'gauge', 'Number of payloads currently retrying'), + formatMetric('workstation_dead_letters_total', deadLetters, 'gauge', 'Total dead-lettered payloads'), + formatMetric('workstation_delivery_attempts_total', attempts, 'counter', 'Total delivery attempts logged'), + formatMetric('workstation_last_success_timestamp', timestamp, 'gauge', 'Epoch seconds of last successful delivery'), + formatMetric('workstation_avg_latency_ms', Math.round(avgLatency), 'gauge', 'Average delivery latency in milliseconds') + ]; + res.set('content-type', 'text/plain; version=0.0.4; charset=utf-8'); + res.send(metrics.join('\n')); + } catch (error) { + res.status(500).send('metrics unavailable'); + } + }); + + return router; +} + +module.exports = { createMetricsRouter }; diff --git a/core/index.js b/core/index.js index 68f385c..de6f7f9 100644 --- a/core/index.js +++ b/core/index.js @@ -1,4 +1,4 @@ -const logger = require('./logger'); +const logger = require('./util/logger'); const { start } = require('./app'); async function bootstrap() { diff --git a/core/connectors/httpJson.js b/core/instrument/connectors/httpJson.js similarity index 95% rename from core/connectors/httpJson.js rename to core/instrument/connectors/httpJson.js index 18b63dc..c030999 100644 --- a/core/connectors/httpJson.js +++ b/core/instrument/connectors/httpJson.js @@ -1,6 +1,6 @@ const express = require('express'); -const config = require('../config/config'); -const logger = require('../logger'); +const config = require('../../config/config'); +const logger = require('../../util/logger'); function createHttpJsonConnector(options = {}) { let server; diff --git a/core/connectors/serial.js b/core/instrument/connectors/serial.js similarity index 96% rename from core/connectors/serial.js rename to 
core/instrument/connectors/serial.js index 3b3ad16..563d452 100644 --- a/core/connectors/serial.js +++ b/core/instrument/connectors/serial.js @@ -1,6 +1,6 @@ const { SerialPort } = require('serialport'); -const config = require('../config/config'); -const logger = require('../logger'); +const config = require('../../config/config'); +const logger = require('../../util/logger'); function createAstmSerialConnector(options = {}) { let port; diff --git a/core/connectors/tcp.js b/core/instrument/connectors/tcp.js similarity index 95% rename from core/connectors/tcp.js rename to core/instrument/connectors/tcp.js index 748cf69..0cbe66b 100644 --- a/core/connectors/tcp.js +++ b/core/instrument/connectors/tcp.js @@ -1,6 +1,6 @@ const net = require('net'); -const config = require('../config/config'); -const logger = require('../logger'); +const config = require('../../config/config'); +const logger = require('../../util/logger'); function createHl7TcpConnector(options = {}) { let server; diff --git a/core/instrument/ingest.js b/core/instrument/ingest.js new file mode 100644 index 0000000..dc70251 --- /dev/null +++ b/core/instrument/ingest.js @@ -0,0 +1,25 @@ +const logger = require('../util/logger'); +const { processMessage } = require('../pipeline/pipeline'); + +async function ingestIncomingMessage(connectorName, incoming) { + const payload = incoming && Object.prototype.hasOwnProperty.call(incoming, 'payload') + ? incoming.payload + : incoming; + const context = incoming && incoming.context ? 
incoming.context : {}; + return processMessage(connectorName, payload, context); +} + +function createIngestMessageHandler(connector) { + return async (incoming) => { + try { + await ingestIncomingMessage(connector.name(), incoming); + } catch (err) { + logger.error({ err: err.message, connector: connector.name() }, 'pipeline error'); + } + }; +} + +module.exports = { + ingestIncomingMessage, + createIngestMessageHandler +}; diff --git a/core/pipeline/pipeline.js b/core/pipeline/pipeline.js index a95fe3f..1c93354 100644 --- a/core/pipeline/pipeline.js +++ b/core/pipeline/pipeline.js @@ -1,5 +1,5 @@ const queue = require('../queue/queue'); -const logger = require('../logger'); +const logger = require('../util/logger'); const { normalize } = require('./normalizer'); const { dedupeKey } = require('./hash'); const instrumentConfig = require('../config/instrumentConfig'); diff --git a/core/pipeline/translator.js b/core/pipeline/translator.js index 080c9fd..dec359a 100644 --- a/core/pipeline/translator.js +++ b/core/pipeline/translator.js @@ -3,6 +3,18 @@ const path = require('path'); const mapCache = new Map(); +const CONTROL_TOKEN_MAP = { + VT: '\u000b', + FS: '\u001c', + STX: '\u0002', + ETX: '\u0003', + CR: '\r', + LF: '\n' +}; + +const SELECTOR_PATTERN = /^([A-Za-z][A-Za-z0-9_]*)\[(\d+)(?:\.(\d+))?\]$/; +const FIXED_WIDTH_DIRECTIVE_PATTERN = /^([A-Za-z][A-Za-z0-9_]*):(\d+)(?::(?:"([^"]*)"|'([^']*)'))?$/; + function buildCanonical(entry, parsedPayload, connector) { const translator = entry && typeof entry.translator === 'object' ? 
entry.translator : {}; const canonical = { ...parsedPayload }; @@ -36,12 +48,17 @@ function resolveTranslatorFilePath(filePath, configFilePath) { function parseMapFile(fileContent, filePath) { const lines = fileContent.split(/\r?\n/); - const rows = new Map(); + const messages = new Map(); + const fields = new Map(); + const settings = { + field_sep: '|', + component_sep: '^' + }; - lines.forEach((line, index) => { - const trimmed = line.trim(); - if (!trimmed || trimmed.startsWith('#')) return; + let pendingSection = null; + let multiline = null; + function parseKeyValue(line, index) { const separator = line.indexOf('='); if (separator < 0) { throw new Error(`${filePath}:${index + 1} invalid mapping line (expected KEY = value)`); @@ -49,28 +66,234 @@ function parseMapFile(fileContent, filePath) { const key = line.slice(0, separator).trim(); const value = line.slice(separator + 1).trim(); - if (!key) { throw new Error(`${filePath}:${index + 1} mapping key is required`); } - rows.set(key, value); + if (key === 'field_sep' || key === 'component_sep') { + settings[key] = value; + return; + } + + if (SELECTOR_PATTERN.test(value)) { + fields.set(key, value); + return; + } + + messages.set(key, value); + } + + lines.forEach((line, index) => { + const trimmed = line.trim(); + + if (multiline) { + if (trimmed === '>>') { + messages.set(multiline.key, multiline.lines.join('\n')); + multiline = null; + return; + } + multiline.lines.push(line); + return; + } + + if (pendingSection) { + if (!trimmed) return; + if (trimmed === '<<') { + multiline = { key: pendingSection, lines: [] }; + pendingSection = null; + return; + } + messages.set(pendingSection, line.trim()); + pendingSection = null; + return; + } + + if (!trimmed) return; + + const sectionMatch = trimmed.match(/^#\s*([A-Za-z0-9_.-]+)\s*$/); + if (sectionMatch && !trimmed.includes('=')) { + pendingSection = sectionMatch[1]; + return; + } + + if (trimmed.startsWith('#')) return; + parseKeyValue(line, index); }); - return 
rows; + if (multiline) { + throw new Error(`${filePath} unterminated multiline section for ${multiline.key} (expected >>)`); + } + + if (pendingSection) { + throw new Error(`${filePath} section ${pendingSection} is missing a body line`); + } + + return { messages, fields, settings }; } function loadMapFile(filePath) { const stat = fs.statSync(filePath); const cached = mapCache.get(filePath); if (cached && cached.mtimeMs === stat.mtimeMs) { - return cached.rows; + return cached.parsed; } const content = fs.readFileSync(filePath, 'utf8'); - const rows = parseMapFile(content, filePath); - mapCache.set(filePath, { mtimeMs: stat.mtimeMs, rows }); - return rows; + const parsed = parseMapFile(content, filePath); + mapCache.set(filePath, { mtimeMs: stat.mtimeMs, parsed }); + return parsed; +} + +function decodeControlTokens(value) { + return String(value).replace(/<(VT|FS|STX|ETX|CR|LF)>/gi, (_, token) => CONTROL_TOKEN_MAP[token.toUpperCase()] || ''); +} + +function parseSelector(selector) { + const match = String(selector || '').trim().match(SELECTOR_PATTERN); + if (!match) return null; + return { + recordType: match[1], + fieldIndex: Number(match[2]), + componentIndex: match[3] ? 
Number(match[3]) : null + }; +} + +function parseRecordLine(line, fieldSeparator) { + const text = String(line || '') + .replace(/[\u0002\u0003\u000b\u001c]/g, '') + .trim(); + if (!text) return null; + if (!text.includes(fieldSeparator)) return null; + const fields = text.split(fieldSeparator); + const type = String(fields[0] || '').trim(); + if (!type) return null; + return { type, fields, raw: text }; +} + +function extractRawPayloadCandidates(parsedPayload) { + const candidates = []; + + if (typeof parsedPayload.raw_payload === 'string') { + candidates.push(parsedPayload.raw_payload); + } + if (typeof parsedPayload.meta?.raw_payload === 'string') { + candidates.push(parsedPayload.meta.raw_payload); + } + if (Array.isArray(parsedPayload.results)) { + parsedPayload.results.forEach((result) => { + if (result && String(result.test_code || '').toUpperCase() === 'RAW' && typeof result.value === 'string') { + candidates.push(result.value); + } + }); + } + + return candidates; +} + +function stripFrameControlChars(value) { + return String(value || '') + .replace(/^[\u0002\u0003\u000b\u001c\r\n]+/, '') + .replace(/[\u0002\u0003\u000b\u001c\r\n]+$/, ''); +} + +function getFixedWidthSource(parsedPayload) { + const candidates = extractRawPayloadCandidates(parsedPayload); + for (let i = 0; i < candidates.length; i += 1) { + const stripped = stripFrameControlChars(candidates[i]); + if (stripped) return stripped; + } + return ''; +} + +function parseRawPayloadRecords(rawPayload, fieldSeparator) { + const normalized = String(rawPayload || '') + .replace(/\r\n/g, '\n') + .replace(/\r/g, '\n'); + + return normalized + .split('\n') + .map((line) => parseRecordLine(line, fieldSeparator)) + .filter(Boolean); +} + +function buildRecordCollections(parsedPayload, settings) { + const explicitSource = Array.isArray(parsedPayload.records) + ? parsedPayload.records + : Array.isArray(parsedPayload.meta?.records) + ? 
parsedPayload.meta.records + : []; + + const source = Array.isArray(explicitSource) ? [...explicitSource] : []; + const records = []; + const fieldSeparator = settings.field_sep || '|'; + + if (!source.length) { + const rawCandidates = extractRawPayloadCandidates(parsedPayload); + for (let i = 0; i < rawCandidates.length; i += 1) { + const parsed = parseRawPayloadRecords(rawCandidates[i], fieldSeparator); + if (parsed.length) { + source.push(...parsed.map((item) => item.raw)); + break; + } + } + } + + source.forEach((item) => { + if (typeof item === 'string') { + const parsed = parseRecordLine(item, fieldSeparator); + if (parsed) records.push(parsed); + return; + } + + if (!item || typeof item !== 'object') return; + if (Array.isArray(item.fields) && item.type) { + records.push({ + type: String(item.type), + fields: item.fields.map((value) => String(value ?? '')), + raw: '' + }); + } + }); + + const recordsByType = new Map(); + records.forEach((record) => { + if (!recordsByType.has(record.type)) recordsByType.set(record.type, []); + recordsByType.get(record.type).push(record); + }); + + return { records, recordsByType }; +} + +function resolveSelector(selector, context) { + const parsed = parseSelector(selector); + if (!parsed) return ''; + + const { recordType, fieldIndex, componentIndex } = parsed; + if (fieldIndex < 1) return ''; + + const record = context.currentRecord && context.currentRecord.type === recordType + ? 
context.currentRecord + : (context.recordsByType.get(recordType) || [])[0]; + if (!record) return ''; + + const field = record.fields[fieldIndex - 1]; + if (field === undefined || field === null) return ''; + if (!componentIndex) return field; + if (componentIndex < 1) return ''; + + const components = String(field).split(context.settings.component_sep || '^'); + return components[componentIndex - 1] || ''; +} + +function resolveFieldAlias(name, context, stack = new Set()) { + if (stack.has(name)) return ''; + if (!context.fields.has(name)) return ''; + + stack.add(name); + const selector = context.fields.get(name); + const value = resolveSelector(selector, context); + stack.delete(name); + return value; } function getPlaceholderValue(name, context) { @@ -78,6 +301,14 @@ function getPlaceholderValue(name, context) { return context.flat[name]; } + if (context.fields.has(name)) { + return resolveFieldAlias(name, context); + } + + if (SELECTOR_PATTERN.test(name)) { + return resolveSelector(name, context); + } + if (!name.includes('.')) { return ''; } @@ -93,16 +324,118 @@ function getPlaceholderValue(name, context) { return current === undefined || current === null ? '' : current; } -function renderTemplate(template, context) { - return String(template).replace(/\{([^{}]+)\}/g, (_, rawName) => { - const name = String(rawName || '').trim(); - if (!name) return ''; - const value = getPlaceholderValue(name, context); - return value === undefined || value === null ? 
'' : String(value); - }); +function parseFixedWidthDirective(name) { + const match = String(name || '').match(FIXED_WIDTH_DIRECTIVE_PATTERN); + if (!match) return null; + return { + fieldName: match[1], + length: Number(match[2]) + }; } -function buildTemplateContext(entry, parsedPayload, connector) { +function consumeFixedWidthField(directive, context) { + if (!directive || !context.fixedWidth) return null; + const { fieldName, length } = directive; + if (!Number.isFinite(length) || length < 0) return ''; + + const start = context.fixedWidth.cursor; + const end = start + length; + const value = context.fixedWidth.source.slice(start, end); + context.fixedWidth.cursor = end; + + if (fieldName.toLowerCase() === 'skip') return ''; + return value; +} + +function parseLoopDirective(value) { + const recordMatch = value.match(/^@for\s+([A-Za-z][A-Za-z0-9_]*)$/); + if (recordMatch) { + return { + type: 'record', + variable: recordMatch[1] + }; + } + + const rangeMatch = value.match(/^@for\s+([A-Za-z][A-Za-z0-9_]*)\s+in\s+(\d+)\.\.(\d+)$/); + if (!rangeMatch) return null; + + return { + type: 'range', + variable: rangeMatch[1], + start: Number(rangeMatch[2]), + end: Number(rangeMatch[3]) + }; +} + +function renderTemplate(template, context) { + const lines = String(template).split('\n'); + const outputLines = []; + + for (let index = 0; index < lines.length; index += 1) { + const line = lines[index]; + const trimmed = line.trim(); + const loop = parseLoopDirective(trimmed); + + if (loop) { + let endIndex = index + 1; + const loopBody = []; + + while (endIndex < lines.length && lines[endIndex].trim() !== '@end') { + loopBody.push(lines[endIndex]); + endIndex += 1; + } + + if (endIndex >= lines.length) { + throw new Error(`unterminated loop block for ${trimmed} (expected @end)`); + } + + if (loop.type === 'record') { + const records = context.recordsByType.get(loop.variable) || []; + records.forEach((record) => { + const nestedContext = { ...context, currentRecord: record }; 
+ const body = renderTemplate(loopBody.join('\n'), nestedContext); + if (body) outputLines.push(body); + }); + } else { + const step = loop.start <= loop.end ? 1 : -1; + for (let value = loop.start; step > 0 ? value <= loop.end : value >= loop.end; value += step) { + const nestedContext = { + ...context, + flat: { + ...context.flat, + [loop.variable]: value + } + }; + const body = renderTemplate(loopBody.join('\n'), nestedContext); + if (body) outputLines.push(body); + } + } + + index = endIndex; + continue; + } + + if (trimmed === '@end') { + throw new Error('unexpected @end without matching @for'); + } + + const rendered = line.replace(/\{([^{}]+)\}/g, (_, rawName) => { + const name = String(rawName || '').trim(); + if (!name) return ''; + const fixedDirective = parseFixedWidthDirective(name); + if (fixedDirective) { + return consumeFixedWidthField(fixedDirective, context); + } + const value = getPlaceholderValue(name, context); + return value === undefined || value === null ? '' : String(value); + }); + outputLines.push(decodeControlTokens(rendered)); + } + + return outputLines.join('\n'); +} + +function buildTemplateContext(entry, parsedPayload, connector, mapDefinition) { const root = { ...parsedPayload, instrument_id: parsedPayload.instrument_id || entry.instrument_id, @@ -125,7 +458,21 @@ function buildTemplateContext(entry, parsedPayload, connector) { .join('\\'); } - return { root, flat }; + const { records, recordsByType } = buildRecordCollections(parsedPayload, mapDefinition.settings || {}); + + return { + root, + flat, + fields: mapDefinition.fields || new Map(), + settings: mapDefinition.settings || {}, + records, + recordsByType, + currentRecord: null, + fixedWidth: { + source: getFixedWidthSource(parsedPayload), + cursor: 0 + } + }; } function translateOverrides(entry, parsedPayload, connector) { @@ -148,18 +495,25 @@ function translateTemplate(entry, parsedPayload, connector) { throw new Error(`translator file not found: ${translator.file}`); } - 
const mapRows = loadMapFile(resolvedFilePath); + const mapDefinition = loadMapFile(resolvedFilePath); const messageKeys = Array.isArray(translator.messages) && translator.messages.length ? translator.messages.map((value) => String(value)) - : Array.from(mapRows.keys()); - const context = buildTemplateContext(entry, parsedPayload, connector); + : Array.from(mapDefinition.messages.keys()); + const context = buildTemplateContext(entry, parsedPayload, connector, mapDefinition); const renderedMessages = messageKeys.map((messageKey) => { - if (!mapRows.has(messageKey)) { + if (!mapDefinition.messages.has(messageKey)) { throw new Error(`translator message key not found in map file: ${messageKey}`); } + const messageContext = { + ...context, + fixedWidth: { + ...context.fixedWidth, + cursor: 0 + } + }; return { key: messageKey, - body: renderTemplate(mapRows.get(messageKey), context) + body: renderTemplate(mapDefinition.messages.get(messageKey), messageContext) }; }); diff --git a/core/queue/db.js b/core/queue/db.js index bd26b69..dfba255 100644 --- a/core/queue/db.js +++ b/core/queue/db.js @@ -1,7 +1,13 @@ +const fs = require('fs'); +const pathModule = require('path'); const sqlite3 = require('sqlite3'); class DatabaseClient { constructor({ path, busyTimeout = 5000 }) { + const dbDir = path ? pathModule.dirname(path) : null; + if (dbDir && dbDir !== '.') { + fs.mkdirSync(dbDir, { recursive: true }); + } this.db = new sqlite3.Database(path, (err) => { if (err) { console.error('unable to open sqlite file', err); diff --git a/core/queue/queue.js b/core/queue/queue.js index e50b825..466b7d7 100644 --- a/core/queue/queue.js +++ b/core/queue/queue.js @@ -137,6 +137,42 @@ class SqliteQueue { return row ? 
row.total : 0; } + async listOutbox({ status, limit = 50 } = {}) { + const safeLimit = Math.max(1, Math.min(Number(limit) || 50, 500)); + const base = `SELECT id, status, attempts, next_attempt_at, last_error, created_at FROM outbox_result`; + if (status) { + return this.db.all( + `${base} WHERE status = ? ORDER BY id DESC LIMIT ?`, + [status, safeLimit] + ); + } + return this.db.all(`${base} ORDER BY id DESC LIMIT ?`, [safeLimit]); + } + + async listRecentDeliveryAttempts(limit = 50) { + const safeLimit = Math.max(1, Math.min(Number(limit) || 50, 500)); + return this.db.all( + `SELECT l.id, l.outbox_id, l.attempt, l.status, l.response_code, l.response_body, l.latency_ms, l.created_at, + o.status AS outbox_status + FROM delivery_log l + LEFT JOIN outbox_result o ON o.id = l.outbox_id + ORDER BY l.id DESC + LIMIT ?`, + [safeLimit] + ); + } + + async listRecentDeadLetters(limit = 50) { + const safeLimit = Math.max(1, Math.min(Number(limit) || 50, 500)); + return this.db.all( + `SELECT id, payload, reason, created_at + FROM dead_letter + ORDER BY id DESC + LIMIT ?`, + [safeLimit] + ); + } + async ping() { await this.db.get('SELECT 1 as ok'); return true; diff --git a/core/rawPipeline.test.js b/core/rawPipeline.test.js index 45cce54..b50ed23 100644 --- a/core/rawPipeline.test.js +++ b/core/rawPipeline.test.js @@ -58,6 +58,107 @@ function run() { assert.strictEqual(templateNormalized.meta.rendered_messages[0].body, 'H|inst1|astm-serial'); assert.strictEqual(templateNormalized.meta.rendered_messages[1].body, 'O|raw-1|^^^RAW'); + const advancedMapFilePath = path.join(tempDir, 'jokoh.map'); + fs.writeFileSync(advancedMapFilePath, [ + '# human-friendly sections', + '# HEADER', + 'H|{instrument_id}|{sample_id}', + '', + '# RESULTS', + '<<', + '@for R', + 'R|{R[3]}|{R[4]}', + '@end', + '>>', + '', + '# TERMINATOR', + 'L|1|N' + ].join('\n')); + + const advancedEntry = { + ...entry, + files: { config: path.join(process.cwd(), 'config', 'app.yaml') }, + translator: { + engine: 
'template', + file: advancedMapFilePath, + forceInstrumentId: true + } + }; + + const advancedSeed = { + ...canonicalSeed, + meta: { + raw_payload: '\u000bH|\\^&|||WST^P1\rR|1|GLU|5.6\rR|2|HGB|13.2\u001c\r' + } + }; + + const advancedTranslated = translator.translate(advancedEntry, advancedSeed, 'hl7-tcp', advancedEntry.translator.engine); + const advancedNormalized = normalize(advancedTranslated); + + assert.strictEqual(advancedNormalized.meta.rendered_messages.length, 3); + assert.strictEqual(advancedNormalized.meta.rendered_messages[0].body, '\u000bH|inst1|raw-1\r'); + assert.strictEqual(advancedNormalized.meta.rendered_messages[1].body, 'R|GLU|5.6\r\nR|HGB|13.2\r'); + assert.strictEqual(advancedNormalized.meta.rendered_messages[2].body, 'L|1|N\u001c\r'); + + const fixedWidthMapFilePath = path.join(tempDir, 'jokoh-fixed.map'); + fs.writeFileSync(fixedWidthMapFilePath, [ + '# fixed-width jokoh sections', + '# RESULT', + '<<', + ',{datetime:14:""},{skip:5:""},{sampleid:17:""},{sampletype:2:""},', + '@for i in 1..3', + '({testid:2:""},{result:5:""},{skip:2:""})', + '@end', + ',', + '>>' + ].join('\n')); + + const fixedWidthEntry = { + ...entry, + files: { config: path.join(process.cwd(), 'config', 'app.yaml') }, + translator: { + engine: 'template', + file: fixedWidthMapFilePath, + forceInstrumentId: true + } + }; + + const fixedWidthRawBody = [ + '20241008181025', + '00000', + '000000120169050 ', + '02', + 'A1', + '12.34', + ' ', + 'B2', + ' 9.8 ', + ' ', + 'C3', + '07.01', + ' ' + ].join(''); + const fixedWidthSeed = { + ...canonicalSeed, + meta: { + raw_payload: `\u0002${fixedWidthRawBody}\u0003` + } + }; + + const fixedWidthTranslated = translator.translate( + fixedWidthEntry, + fixedWidthSeed, + 'astm-serial', + fixedWidthEntry.translator.engine + ); + const fixedWidthNormalized = normalize(fixedWidthTranslated); + + assert.strictEqual(fixedWidthNormalized.meta.rendered_messages.length, 1); + assert.strictEqual( + 
fixedWidthNormalized.meta.rendered_messages[0].body, + '\u0002,20241008181025,,000000120169050 ,02,\n(A1,12.34,)\n(B2, 9.8 ,)\n(C3,07.01,)\n,\u0003' + ); + fs.rmSync(tempDir, { recursive: true, force: true }); console.log('Raw pipeline smoke test passed'); } diff --git a/core/runtime/connectors.js b/core/runtime/connectors.js new file mode 100644 index 0000000..4428a84 --- /dev/null +++ b/core/runtime/connectors.js @@ -0,0 +1,54 @@ +const logger = require('../util/logger'); +const instrumentConfig = require('../config/instrumentConfig'); +const { createHttpJsonConnector } = require('../instrument/connectors/httpJson'); +const { createHl7TcpConnector } = require('../instrument/connectors/tcp'); +const { createAstmSerialConnector } = require('../instrument/connectors/serial'); +const { createIngestMessageHandler } = require('../instrument/ingest'); + +const connectorFactories = { + 'http-json': createHttpJsonConnector, + 'hl7-tcp': createHl7TcpConnector, + 'astm-serial': createAstmSerialConnector +}; + +function buildConnectors() { + return instrumentConfig.list() + .filter((entry) => entry.enabled) + .map((entry) => { + const createConnector = connectorFactories[entry.connector]; + if (!createConnector) { + logger.warn({ connector: entry.connector, instrument_id: entry.instrument_id }, 'unknown connector in instrument config, skipping startup'); + return null; + } + + return createConnector({ + ...(entry.connectorConfig || {}), + instrument_id: entry.instrument_id + }); + }) + .filter(Boolean); +} + +function attachConnectorHandlers(connectors) { + connectors.forEach((connector) => { + connector.onMessage(createIngestMessageHandler(connector)); + connector.onError((err) => { + logger.error({ err: err.message }, `${connector.name()} emitted error`); + }); + }); +} + +async function startConnectors(connectors) { + await Promise.all(connectors.map((connector) => connector.start())); +} + +async function stopConnectors(connectors) { + await 
Promise.all(connectors.map((connector) => connector.stop())); +} + +module.exports = { + buildConnectors, + attachConnectorHandlers, + startConnectors, + stopConnectors +}; diff --git a/core/runtime/startup.js b/core/runtime/startup.js new file mode 100644 index 0000000..1684a2a --- /dev/null +++ b/core/runtime/startup.js @@ -0,0 +1,44 @@ +const config = require('../config/config'); +const logger = require('../util/logger'); +const migrate = require('../maintenance/migrate'); +const instrumentConfig = require('../config/instrumentConfig'); +const { createHttpServer } = require('../http/index'); +const { + buildConnectors, + attachConnectorHandlers, + startConnectors, + stopConnectors +} = require('./connectors'); +const { startWorkers, stopWorkers } = require('./workers'); + +async function start() { + instrumentConfig.validateAndLoadInstrumentConfigs(); + await migrate(); + await instrumentConfig.init(); + + const connectors = buildConnectors(); + if (!connectors.length) { + logger.warn('no enabled connectors configured, ingestion listeners are disabled'); + } + + attachConnectorHandlers(connectors); + await startConnectors(connectors); + await startWorkers(); + + const app = createHttpServer(connectors); + const server = app.listen(config.healthPort, () => { + logger.info({ port: config.healthPort }, 'health server ready'); + }); + + async function shutdown() { + await stopWorkers(); + await stopConnectors(connectors); + if (server) { + await new Promise((resolve) => server.close(resolve)); + } + } + + return { connectors, server, shutdown }; +} + +module.exports = { start }; diff --git a/core/runtime/workers.js b/core/runtime/workers.js new file mode 100644 index 0000000..0c995bd --- /dev/null +++ b/core/runtime/workers.js @@ -0,0 +1,14 @@ +const { startWorker, stopWorker } = require('../workers/host/resultWorker'); + +async function startWorkers() { + await startWorker(); +} + +async function stopWorkers() { + await stopWorker(); +} + +module.exports = { + 
startWorkers, + stopWorkers +}; diff --git a/core/logger.js b/core/util/logger.js similarity index 81% rename from core/logger.js rename to core/util/logger.js index 3bee495..499d7cc 100644 --- a/core/logger.js +++ b/core/util/logger.js @@ -1,5 +1,5 @@ const pino = require('pino'); -const config = require('./config/config'); +const config = require('../config/config'); const logger = pino({ level: process.env.LOG_LEVEL || 'info', diff --git a/core/workers/host/resultWorker.js b/core/workers/host/resultWorker.js new file mode 100644 index 0000000..261c301 --- /dev/null +++ b/core/workers/host/resultWorker.js @@ -0,0 +1,43 @@ +const queue = require('../../queue/queue'); +const logger = require('../../util/logger'); +const config = require('../../config/config'); +const { deliverOutboxEntry } = require('../../host/resultService'); + +let running = false; +let workerPromise; + +const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + +async function loop() { + while (running) { + try { + const batch = await queue.claimPending(config.worker.batchSize, config.worker.workerId); + if (!batch.length) { + await sleep(config.worker.pollInterval); + continue; + } + + for (const entry of batch) { + await deliverOutboxEntry(entry); + } + } catch (err) { + logger.error({ err: err.message }, 'delivery worker error'); + await sleep(config.worker.pollInterval); + } + } +} + +async function startWorker() { + if (running) return; + running = true; + workerPromise = loop(); +} + +async function stopWorker() { + running = false; + if (workerPromise) { + await workerPromise; + } +} + +module.exports = { startWorker, stopWorker }; diff --git a/docs/design.md b/docs/design.md index acfd7e6..80ceda5 100644 --- a/docs/design.md +++ b/docs/design.md @@ -48,6 +48,7 @@ flowchart LR ## Current vs Planned - Current: result ingest → raw capture → translator → queue → delivery worker → CLQMS01. 
+- Current: operations dashboard UI and APIs at `/dashboard` for queue/instrument visibility. - Planned: scheduler/poller for download requests and message router for download/query workflows. ## Key Components diff --git a/docs/jokoh.log b/docs/jokoh.log new file mode 100644 index 0000000..6266a0c --- /dev/null +++ b/docs/jokoh.log @@ -0,0 +1,21 @@ +B04SMONO Communication MONO: (08-10-24 12:57:05:842)(COM4 9600 N 8 1) +*&* +compte octet = '15000 +IN (08-10-24 18:09:50:373) +->202410081810250000000120169050 MB001134.5 02 2.93 03101.2 <- +IN (08-10-24 18:17:38:788) +->202410081818140000000220169050 MB001134.7 02 2.92 E03101.2 <- +IN (08-10-24 18:18:59:930) +->202410081819350000000320169050 MB001134.5 02 2.93 03101.4 <- +IN (08-10-24 19:48:27:681) +->202410081949030000000120169053 MB001140.3 02 3.44 03107.9 <- +IN (08-10-24 23:53:32:482) +->20241008235407000000021 MB001136.3 02 4.12 03103.1 <- +IN (08-10-24 23:54:40:722) +->20241008235516000000032 MB001170.2 02 7.08 03137.9 <- +IN (09-10-24 05:44:41:533) +->202410090545300000000420169069 MB001139.8 02 3.88 03107.4 <- +IN (09-10-24 06:58:07:558) +->202410090658400000000520169075 MB001137.9 02 3.95 03107.2 <- +IN (09-10-24 07:07:05:349) +->202410090707390000000620169074 MB001135.4 02 3.68 03102.7 <- \ No newline at end of file diff --git a/docs/separated_architecture.md b/docs/separated_architecture.md new file mode 100644 index 0000000..6f13ab4 --- /dev/null +++ b/docs/separated_architecture.md @@ -0,0 +1,114 @@ +# TinyLink Separated Architecture + +## Why this refactor + +TinyLink now separates runtime wiring, host communication, instrument communication, and operational HTTP endpoints. The goal is clearer ownership per module and easier future expansion for instrument-request workflows. 
+ +## Folder layout + +```text +core/ + index.js + app.js + + host/ + resultClient.js + resultService.js + + instrument/ + ingest.js + connectors/ + httpJson.js + tcp.js + serial.js + + runtime/ + startup.js + connectors.js + workers.js + + workers/ + host/ + resultWorker.js + + http/ + index.js + healthRouter.js + metricsRouter.js + instrumentRouter.js + dashboardRouter.js + dashboard/ + page.js + + queue/ + db.js + queue.js + + config/ + config.js + instrumentConfig.js + instrumentCheck.js + + maintenance/ + schema.sql + migrate.js + maintenance.js + + pipeline/ + hash.js + normalizer.js + pipeline.js + translator.js + + util/ + logger.js +``` + +## Responsibilities + +- `runtime/`: startup orchestration only (migrate, config init, start connectors/workers, open HTTP server, graceful shutdown). +- `host/`: outbound result delivery to host API, including retry/dead-letter decisions. +- `instrument/`: inbound connector protocol adapters and ingest handoff to pipeline. +- `workers/`: polling loops and queue claim logic, organized by target domain. +- `http/`: all API/UI routes. +- `queue/`: SQLite persistence and query helpers. +- `config/` and `maintenance/`: configuration loading/validation and schema migrations. +- `pipeline/`: translation/normalization/dedupe logic only. +- `util/`: shared generic helpers. + +## Runtime flow (result path) + +1. Connector receives raw instrument payload. +2. `instrument/ingest` forwards payload to `pipeline/processMessage`. +3. Pipeline writes to queue tables (`inbox_raw`, `outbox_result`). +4. `workers/host/resultWorker` claims pending outbox rows. +5. `host/resultService` sends to CLQMS host and updates status/retry/dead-letter. 
+ +## Dashboard UI + +The built-in dashboard is available at: + +- `GET /dashboard` + +Dashboard APIs: + +- `GET /dashboard/api/summary` +- `GET /dashboard/api/queue` +- `GET /dashboard/api/instruments` +- `GET /dashboard/api/recent` + +The page auto-refreshes every 5 seconds and shows queue counters, instrument connector state, queue tail, and recent delivery attempts. + +## Compatibility notes + +- `core/app.js` remains as the entry import and now delegates to `core/runtime/startup.js`. +- `core/http.js` remains and delegates to `core/http/index.js`. + +## Deferred scope + +Instrument request flow (host -> instrument, and query roundtrip) is intentionally deferred. The new structure keeps clear slots for: + +- `core/instrument/` +- `core/workers/requestWorker.js` + +without mixing those concerns into the result-delivery path. diff --git a/docs/user_manual.md b/docs/user_manual.md index 3022719..221a1fd 100644 --- a/docs/user_manual.md +++ b/docs/user_manual.md @@ -31,8 +31,13 @@ Useful endpoints: - `GET http://localhost:4001/health` - `GET http://localhost:4001/health/ready` - `GET http://localhost:4001/metrics` +- `GET http://localhost:4001/dashboard` - `POST http://localhost:3001/messages` (for JSON instrument payloads) +Architecture reference: + +- `docs/separated_architecture.md` + ## Add a New Instrument (New "Inst") TinyLink now uses a single file-based configuration. There is no `POST /instruments` write flow. @@ -72,7 +77,9 @@ inst1: ### Optional: Use per-instrument `.map` translator files -If you want a simpler per-instrument translator file, use `translator.engine: template` with a `.map` file. +If you want a shorter per-instrument translator config, set `translator` to the map name. + +Example: `translator: inst1` loads `config/inst1.map`. 
`config/app.yaml` example: @@ -82,30 +89,45 @@ inst1: connector: type: serial port: COM1 - translator: - engine: template - file: config/translators/inst1.map - messages: [HEADER, PATIENT, ORDER, TERMINATOR] - forceInstrumentId: true + translator: inst1 ``` -`config/translators/inst1.map` example: +`config/inst1.map` example: ```text -# KEY = message body template -HEADER = H|\^&|||WST^P1|||||{instrument_id}^System1||P|1|{specimen_id} -PATIENT = P|{patient_id}|{sample_id}|||{last_name}^{first_name}||{birth_date}|{sex}|||||{doctor}| -ORDER = O|1|{sample_id}||{order_tests}||||||{specimen_type}||||{tube_type}||||||||||O| -TERMINATOR = L|1|N +# HEADER +H|\^&|||WST^P1|||||{instrument_id}^System1||P|1|{sample_id} + +SAMPLE_ID = O[3] +CHEMNUM = R[3] +RESULT = R[4] + +# ORDER +O|1|{SAMPLE_ID}||{order_tests}||||||{specimen_type}||||{tube_type}||||||||||O| + +# RESULT +<< +@for R +R|1|^^^{CHEMNUM}|{RESULT} +@end +>> + +# TERMINATOR +L|1|N ``` Notes: -- `.map` supports one `KEY = value` template per line. -- Blank lines and `#` comments are ignored. +- Preferred style is section-based: `# MESSAGE_NAME` plus one body line. +- Use `<<` and `>>` for multiline section bodies. +- `KEY = value` remains supported for backwards compatibility. +- `ALIAS = R[3]` (or `O[11]`, `P[2]`, `OBX[5.2]`) defines readable field aliases. +- You can reference selectors directly as placeholders: `{R[3]}`. +- Control tokens in message bodies are decoded: `<STX>`, `<ETX>`, `<CR>`, `<LF>`, `<VT>`, `<FS>`. +- `@for R ... @end` repeats a multiline block for each incoming record of type `R`. +- If `records` are not pre-parsed, TinyLink auto-extracts records from `raw_payload`/`meta.raw_payload` for selector usage. - Placeholders use `{name}` and missing values default to empty strings. -- Keep protocol framing (`STX`, `ETX`, `CR`, `LF`) outside `.map`; use message body templates only. -- `translator.messages` controls output order. If omitted, all keys from the `.map` file are rendered in file order. 
+- `translator.messages` controls output order. If omitted, messages render in map file order. What it does: @@ -215,7 +237,7 @@ Use these to check system status: - Confirm exact `instrument_id` spelling and casing. - Verify the instrument exists in `config/app.yaml` as its own top-level key (`inst1`, `inst2`, ...). -- Verify that instrument has `connector.type`, connector settings, and `translator.engine`. +- Verify that instrument has `connector.type`, connector settings, and `translator`. ### Data is not flowing diff --git a/package-lock.json b/package-lock.json index ccd5e73..7ce7994 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1577,6 +1577,19 @@ "url": "https://opencollective.com/express" } }, + "node_modules/picomatch": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.4.tgz", + "integrity": "sha512-QP88BAKvMam/3NxH6vj2o21R6MjxZUAd6nlwAS/pnGvN9IVLocLHxGYIzFhg6fUQ+5th6P4dv4eW9jX3DSIj7A==", + "license": "MIT", + "optional": true, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/jonschlinkert" + } + }, "node_modules/pino": { "version": "10.3.1", "resolved": "https://registry.npmjs.org/pino/-/pino-10.3.1.tgz",