diff --git a/src/configs/save-pipeline.js b/src/configs/save-pipeline.js index 6ac8704..c1ff8c6 100644 --- a/src/configs/save-pipeline.js +++ b/src/configs/save-pipeline.js @@ -149,7 +149,7 @@ async function pinPlugins({ document, bundled, known, pinStateDir, log }) { // Pre-pinned documents (the air-gap escape hatch) are validated, // never re-resolved: an operator copying a pinned document from a // client lock file or another server must not need network. - if (typeof entry.content_hash === 'string' && entry.content_hash.length > 0) { + if (typeof entry.artifact_hash === 'string' && entry.artifact_hash.length > 0) { const problem = validatePrePinned(entry) if (problem) { return { ok: false, error: 'plugin_pin_invalid', detail: `${entry.name}: ${problem}` } @@ -221,15 +221,17 @@ async function resolveAndPin({ entry, pinStateDir, log }) { }) return { ok: true, - // The same fields the client lock file records: the config names - // exactly one artifact, and a tampered artifact fails the hash - // check client-side. + // The served config carries the kernel's config-wire pin fields, not + // a lock-file entry: `version` + `artifact_hash` (the content hash + // the client verifies against the staged artifact at apply — + // hypaware config/apply_deps.js). `source` is deliberately NOT set + // here so the entry keeps the operator's raw source *string*; the + // client re-resolves it. Storing the resolved object spec (kind/raw/ + // path) would make the served document fail the kernel's own config + // shape parser, which requires `source` to be a string. pin: { version: fetched.manifest.version, - source, - ...(fetched.resolvedRef ? 
{ resolved_ref: fetched.resolvedRef } : {}), - content_hash: fetched.contentHash, - manifest_hash: fetched.manifestHash, + artifact_hash: fetched.contentHash, }, metadata: metadataFromManifest(fetched.manifest), } @@ -268,11 +270,15 @@ function validatePrePinned(entry) { if (typeof entry.version !== 'string' || entry.version.length === 0) { return 'pre-pinned entries must carry a version' } - if (!entry.source || typeof entry.source !== 'object' || typeof entry.source.kind !== 'string') { - return 'pre-pinned entries must carry a resolved source spec (object with kind)' + if (typeof entry.artifact_hash !== 'string' || entry.artifact_hash.length === 0) { + return 'pre-pinned entries must carry an artifact_hash' } - if (typeof entry.manifest_hash !== 'string' || entry.manifest_hash.length === 0) { - return 'pre-pinned entries must carry a manifest_hash' + // `source` is optional (first-party names re-resolve from the name), + // but when present it is the operator's raw source string the client + // re-resolves — never the resolved object spec. parseConfigShape has + // already enforced string-ness; this keeps the contract local + loud. + if (entry.source !== undefined && (typeof entry.source !== 'string' || entry.source.length === 0)) { + return 'pre-pinned entries must carry a string source when present' } return null } diff --git a/src/daemon.js b/src/daemon.js index a750d6d..447adde 100644 --- a/src/daemon.js +++ b/src/daemon.js @@ -178,9 +178,10 @@ export async function runServerDaemon(opts = {}) { stopped = true for (const timer of timers) clearInterval(timer) // Drain what we can on the way down; spool durability means an - // unclean exit here loses nothing either way. + // unclean exit here loses nothing either way. drain() (not tick()) + // so a background tick already in flight can't make this a no-op. 
try { - await mover.tick() + await mover.drain() } catch { // shutdown is best-effort } diff --git a/src/http/routes-admin.js b/src/http/routes-admin.js index 64a9b99..18494c3 100644 --- a/src/http/routes-admin.js +++ b/src/http/routes-admin.js @@ -92,7 +92,9 @@ export async function handleAdmin(ctx) { // Operational escape hatches: run the mover / archive export / // eviction immediately instead of waiting for their schedules. if (url.pathname === '/v1/admin/mover/run' && req.method === 'POST') { - const moved = await services.mover.tick() + // drain(), not tick(): "run now" must guarantee every already-spooled + // row is committed before we ack, even if a background tick is mid-pass. + const moved = await services.mover.drain() return sendJson(res, 200, { moved }) } if (url.pathname === '/v1/admin/archive/run' && req.method === 'POST') { diff --git a/src/ingest/mover.js b/src/ingest/mover.js index 3cf6dc3..d6d5e7d 100644 --- a/src/ingest/mover.js +++ b/src/ingest/mover.js @@ -20,30 +20,59 @@ * }} args */ export function createMover({ spool, catalog, storage, log }) { - let running = false + /** + * The in-flight pass, if any. A single slot serializes passes so two + * never drain the same spool file concurrently — and lets a guaranteed + * caller wait one out before starting its own. + * @type {Promise | null} + */ + let inflight = null + + /** One pass over every currently-pending spool file. */ + async function pass() { + let moved = 0 + for (const pending of spool.pending()) { + try { + moved += await drainFile(pending) + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + log.warn('mover.drain_failed', { dataset: pending.dataset, gateway: pending.gateway, message }) + } + } + return moved + } + + /** Start a pass and own clearing the in-flight slot when it settles. 
*/ + function startPass() { + const p = pass() + inflight = p + void p.finally(() => { if (inflight === p) inflight = null }).catch(() => {}) /* swallow only the derived chain so a rejected pass is not reported as an unhandled rejection; callers still receive p's rejection */ + return p + } return { /** - * One mover pass over all pending spool files. Returns the number - * of rows committed to the cache. + * Opportunistic pass for the background timer: if one is already + * running, skip rather than queue — a dropped timer tick costs + * nothing because spool durability means the next tick still drains. + * Returns the number of rows committed to the cache (0 if skipped). */ async tick() { - if (running) return 0 - running = true - let moved = 0 - try { - for (const pending of spool.pending()) { - try { - moved += await drainFile(pending) - } catch (err) { - const message = err instanceof Error ? err.message : String(err) - log.warn('mover.drain_failed', { dataset: pending.dataset, gateway: pending.gateway, message }) - } - } - } finally { - running = false - } - return moved + if (inflight) return 0 + return startPass() + }, + + /** + * Guaranteed drain for callers that must observe every row spooled + * *before* this call committed before it returns — the admin + * `mover/run` escape hatch and shutdown. A skip would be a silent + * lie there, so wait out any in-flight pass (whose `pending()` + * snapshot may predate the caller's rows) and then run a fresh pass + * that is guaranteed to see them. 
+ */ + async drain() { + while (inflight) await inflight.catch(() => {}) + return startPass() }, } diff --git a/test/smoke.js b/test/smoke.js index 5f63bc1..a9d43ee 100644 --- a/test/smoke.js +++ b/test/smoke.js @@ -192,10 +192,12 @@ try { const getConfig = await call('GET', '/v1/admin/configs/smoke-fleet', { token: admin.token }) const pinnedEntry = getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === 'hypaware-plugin-smoke') - check('stored config pins plugin version + content hash', - pinnedEntry.version === '1.2.3' && typeof pinnedEntry.content_hash === 'string' && pinnedEntry.content_hash.length === 64, pinnedEntry) + check('stored config pins plugin version + artifact hash', + pinnedEntry.version === '1.2.3' && typeof pinnedEntry.artifact_hash === 'string' && pinnedEntry.artifact_hash.length === 64, pinnedEntry) + check('pinned source stays the operator\'s raw string (client re-resolves it)', + pinnedEntry.source === pluginDir, pinnedEntry) check('bundled plugin entries stay unpinned', - getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === '@hypaware/central').content_hash === undefined) + getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === '@hypaware/central').artifact_hash === undefined) const saveAgain = await call('PUT', '/v1/admin/configs/smoke-fleet', { token: admin.token, json: fleetConfig }) check('identical re-save is revisionless', saveAgain.status === 200 && saveAgain.body.revision_created === false && saveAgain.body.etag === configEtag1) @@ -379,7 +381,10 @@ try { // present — mirror that here. const proxyRow = { gateway_id: 'client-local-gw', - schema_version: 4, + schema_version: 6, + // session_id is the non-null partition key (schema v6, LLP 0030); + // conversation_id rides along nullable. Both present here. 
+ session_id: 'sess-1', conversation_id: 'conv-1', provider: 'smoke', conversation_started_at: new Date().toISOString(), @@ -396,7 +401,7 @@ try { token: gw2.jwt, ndjson: JSON.stringify(proxyRow) + '\n', }) - check('proxy signal maps to ai_gateway_messages', proxyIngest.status === 202) + check('proxy signal maps to ai_gateway_messages', proxyIngest.status === 202, proxyIngest.body) await runMover() const proxyCount = await sql('SELECT COUNT(*) AS n FROM ai_gateway_messages') check('proxy rows landed', Number(proxyCount.rows[0].n) === 1, proxyCount.rows)