Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions src/configs/save-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async function pinPlugins({ document, bundled, known, pinStateDir, log }) {
// Pre-pinned documents (the air-gap escape hatch) are validated,
// never re-resolved: an operator copying a pinned document from a
// client lock file or another server must not need network.
if (typeof entry.content_hash === 'string' && entry.content_hash.length > 0) {
if (typeof entry.artifact_hash === 'string' && entry.artifact_hash.length > 0) {
const problem = validatePrePinned(entry)
if (problem) {
return { ok: false, error: 'plugin_pin_invalid', detail: `${entry.name}: ${problem}` }
Expand Down Expand Up @@ -221,15 +221,17 @@ async function resolveAndPin({ entry, pinStateDir, log }) {
})
return {
ok: true,
// The same fields the client lock file records: the config names
// exactly one artifact, and a tampered artifact fails the hash
// check client-side.
// The served config carries the kernel's config-wire pin fields, not
// a lock-file entry: `version` + `artifact_hash` (the content hash
// the client verifies against the staged artifact at apply —
// hypaware config/apply_deps.js). `source` is deliberately NOT set
// here so the entry keeps the operator's raw source *string*; the
// client re-resolves it. Storing the resolved object spec (kind/raw/
// path) would make the served document fail the kernel's own config
// shape parser, which requires `source` to be a string.
pin: {
version: fetched.manifest.version,
source,
...(fetched.resolvedRef ? { resolved_ref: fetched.resolvedRef } : {}),
content_hash: fetched.contentHash,
manifest_hash: fetched.manifestHash,
artifact_hash: fetched.contentHash,
},
metadata: metadataFromManifest(fetched.manifest),
}
Expand Down Expand Up @@ -268,11 +270,15 @@ function validatePrePinned(entry) {
if (typeof entry.version !== 'string' || entry.version.length === 0) {
return 'pre-pinned entries must carry a version'
}
if (!entry.source || typeof entry.source !== 'object' || typeof entry.source.kind !== 'string') {
return 'pre-pinned entries must carry a resolved source spec (object with kind)'
if (typeof entry.artifact_hash !== 'string' || entry.artifact_hash.length === 0) {
return 'pre-pinned entries must carry an artifact_hash'
}
if (typeof entry.manifest_hash !== 'string' || entry.manifest_hash.length === 0) {
return 'pre-pinned entries must carry a manifest_hash'
// `source` is optional (first-party names re-resolve from the name),
// but when present it is the operator's raw source string the client
// re-resolves — never the resolved object spec. parseConfigShape has
// already enforced string-ness; this keeps the contract local + loud.
if (entry.source !== undefined && (typeof entry.source !== 'string' || entry.source.length === 0)) {
return 'pre-pinned entries must carry a string source when present'
}
return null
}
Expand Down
5 changes: 3 additions & 2 deletions src/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,10 @@ export async function runServerDaemon(opts = {}) {
stopped = true
for (const timer of timers) clearInterval(timer)
// Drain what we can on the way down; spool durability means an
// unclean exit here loses nothing either way.
// unclean exit here loses nothing either way. drain() (not tick())
// so a background tick already in flight can't make this a no-op.
try {
await mover.tick()
await mover.drain()
} catch {
// shutdown is best-effort
}
Expand Down
4 changes: 3 additions & 1 deletion src/http/routes-admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ export async function handleAdmin(ctx) {
// Operational escape hatches: run the mover / archive export /
// eviction immediately instead of waiting for their schedules.
if (url.pathname === '/v1/admin/mover/run' && req.method === 'POST') {
const moved = await services.mover.tick()
// drain(), not tick(): "run now" must guarantee every already-spooled
// row is committed before we ack, even if a background tick is mid-pass.
const moved = await services.mover.drain()
return sendJson(res, 200, { moved })
}
if (url.pathname === '/v1/admin/archive/run' && req.method === 'POST') {
Expand Down
67 changes: 48 additions & 19 deletions src/ingest/mover.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,59 @@
* }} args
*/
export function createMover({ spool, catalog, storage, log }) {
let running = false
/**
* The in-flight pass, if any. A single slot serializes passes so two
* never drain the same spool file concurrently — and lets a guaranteed
* caller wait one out before starting its own.
* @type {Promise<number> | null}
*/
let inflight = null

/** One pass over every currently-pending spool file. */
async function pass() {
let moved = 0
for (const pending of spool.pending()) {
try {
moved += await drainFile(pending)
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
log.warn('mover.drain_failed', { dataset: pending.dataset, gateway: pending.gateway, message })
}
}
return moved
}

/** Start a pass and own clearing the in-flight slot when it settles. */
function startPass() {
const p = pass()
inflight = p
void p.finally(() => { if (inflight === p) inflight = null })
return p
}

return {
/**
* One mover pass over all pending spool files. Returns the number
* of rows committed to the cache.
* Opportunistic pass for the background timer: if one is already
* running, skip rather than queue — a dropped timer tick costs
* nothing because spool durability means the next tick still drains.
* Returns the number of rows committed to the cache (0 if skipped).
*/
async tick() {
if (running) return 0
running = true
let moved = 0
try {
for (const pending of spool.pending()) {
try {
moved += await drainFile(pending)
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
log.warn('mover.drain_failed', { dataset: pending.dataset, gateway: pending.gateway, message })
}
}
} finally {
running = false
}
return moved
if (inflight) return 0
return startPass()
},

/**
* Guaranteed drain for callers that must observe every row spooled
* *before* this call committed before it returns — the admin
* `mover/run` escape hatch and shutdown. A skip would be a silent
* lie there, so wait out any in-flight pass (whose `pending()`
* snapshot may predate the caller's rows) and then run a fresh pass
* that is guaranteed to see them.
*/
async drain() {
while (inflight) await inflight.catch(() => {})
return startPass()
},
}

Expand Down
15 changes: 10 additions & 5 deletions test/smoke.js
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,12 @@ try {

const getConfig = await call('GET', '/v1/admin/configs/smoke-fleet', { token: admin.token })
const pinnedEntry = getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === 'hypaware-plugin-smoke')
check('stored config pins plugin version + content hash',
pinnedEntry.version === '1.2.3' && typeof pinnedEntry.content_hash === 'string' && pinnedEntry.content_hash.length === 64, pinnedEntry)
check('stored config pins plugin version + artifact hash',
pinnedEntry.version === '1.2.3' && typeof pinnedEntry.artifact_hash === 'string' && pinnedEntry.artifact_hash.length === 64, pinnedEntry)
check('pinned source stays the operator\'s raw string (client re-resolves it)',
pinnedEntry.source === pluginDir, pinnedEntry)
check('bundled plugin entries stay unpinned',
getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === '@hypaware/central').content_hash === undefined)
getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === '@hypaware/central').artifact_hash === undefined)

const saveAgain = await call('PUT', '/v1/admin/configs/smoke-fleet', { token: admin.token, json: fleetConfig })
check('identical re-save is revisionless', saveAgain.status === 200 && saveAgain.body.revision_created === false && saveAgain.body.etag === configEtag1)
Expand Down Expand Up @@ -379,7 +381,10 @@ try {
// present — mirror that here.
const proxyRow = {
gateway_id: 'client-local-gw',
schema_version: 4,
schema_version: 6,
// session_id is the non-null partition key (schema v6, LLP 0030);
// conversation_id rides along nullable. Both present here.
session_id: 'sess-1',
conversation_id: 'conv-1',
provider: 'smoke',
conversation_started_at: new Date().toISOString(),
Expand All @@ -396,7 +401,7 @@ try {
token: gw2.jwt,
ndjson: JSON.stringify(proxyRow) + '\n',
})
check('proxy signal maps to ai_gateway_messages', proxyIngest.status === 202)
check('proxy signal maps to ai_gateway_messages', proxyIngest.status === 202, proxyIngest.body)
await runMover()
const proxyCount = await sql('SELECT COUNT(*) AS n FROM ai_gateway_messages')
check('proxy rows landed', Number(proxyCount.rows[0].n) === 1, proxyCount.rows)
Expand Down