From 545c11a2f7ac156fd4fa6d3995925ea714d26d8e Mon Sep 17 00:00:00 2001 From: Phillip Cunliffe Date: Wed, 17 Jun 2026 13:18:38 -0700 Subject: [PATCH 1/4] fix: make the smoke test reliably green MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The end-to-end smoke test was failing — partly flaky, partly broken — from three independent causes. It exits on the first failed check, so each masked the next; this fixes all three. 1. Mover "run now" could silently no-op (the real ~50% flake). mover.tick() had a `running` guard that made a *concurrent* call return 0 immediately. The 200ms background timer and the admin /v1/admin/mover/run endpoint both call it, so when runMover() landed mid-pass it returned 200 without committing the just-spooled row, and the follow-up query saw 0 rows. Split into opportunistic tick() (the timer keeps skipping, never piles up) and guaranteed drain() (waits out any in-flight pass, then runs a fresh pass whose pending() snapshot is guaranteed to include rows spooled just before the call). The admin endpoint and the shutdown drain now use drain(). 2. Config pin format diverged from the kernel's config wire schema. The save pipeline emitted a lock-file-shaped plugin entry into a config document — object `source` ({kind,raw,path}) and `content_hash` — but the kernel's parseConfigShape requires `source` to be a string and the client verifies the pin under `artifact_hash` (hypaware config/apply_deps.js). The server thus produced a document its own shape parser rejects on re-submission. Emit `version` + `artifact_hash`, keep `source` as the operator's raw string the client re-resolves; validatePrePinned validates that shape. 3. Smoke proxy row predated the ai_gateway_messages schema. That dataset gained a required non-null `session_id` column (schema v6); add it. Verified: 50/50 runs green (was ~50% flaky). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- src/configs/save-pipeline.js | 30 +++++++++------- src/daemon.js | 5 +-- src/http/routes-admin.js | 4 ++- src/ingest/mover.js | 67 ++++++++++++++++++++++++++---------- test/smoke.js | 15 +++++--- 5 files changed, 82 insertions(+), 39 deletions(-) diff --git a/src/configs/save-pipeline.js b/src/configs/save-pipeline.js index 6ac8704..c1ff8c6 100644 --- a/src/configs/save-pipeline.js +++ b/src/configs/save-pipeline.js @@ -149,7 +149,7 @@ async function pinPlugins({ document, bundled, known, pinStateDir, log }) { // Pre-pinned documents (the air-gap escape hatch) are validated, // never re-resolved: an operator copying a pinned document from a // client lock file or another server must not need network. - if (typeof entry.content_hash === 'string' && entry.content_hash.length > 0) { + if (typeof entry.artifact_hash === 'string' && entry.artifact_hash.length > 0) { const problem = validatePrePinned(entry) if (problem) { return { ok: false, error: 'plugin_pin_invalid', detail: `${entry.name}: ${problem}` } @@ -221,15 +221,17 @@ async function resolveAndPin({ entry, pinStateDir, log }) { }) return { ok: true, - // The same fields the client lock file records: the config names - // exactly one artifact, and a tampered artifact fails the hash - // check client-side. + // The served config carries the kernel's config-wire pin fields, not + // a lock-file entry: `version` + `artifact_hash` (the content hash + // the client verifies against the staged artifact at apply — + // hypaware config/apply_deps.js). `source` is deliberately NOT set + // here so the entry keeps the operator's raw source *string*; the + // client re-resolves it. Storing the resolved object spec (kind/raw/ + // path) would make the served document fail the kernel's own config + // shape parser, which requires `source` to be a string. pin: { version: fetched.manifest.version, - source, - ...(fetched.resolvedRef ? 
{ resolved_ref: fetched.resolvedRef } : {}), - content_hash: fetched.contentHash, - manifest_hash: fetched.manifestHash, + artifact_hash: fetched.contentHash, }, metadata: metadataFromManifest(fetched.manifest), } @@ -268,11 +270,15 @@ function validatePrePinned(entry) { if (typeof entry.version !== 'string' || entry.version.length === 0) { return 'pre-pinned entries must carry a version' } - if (!entry.source || typeof entry.source !== 'object' || typeof entry.source.kind !== 'string') { - return 'pre-pinned entries must carry a resolved source spec (object with kind)' + if (typeof entry.artifact_hash !== 'string' || entry.artifact_hash.length === 0) { + return 'pre-pinned entries must carry an artifact_hash' } - if (typeof entry.manifest_hash !== 'string' || entry.manifest_hash.length === 0) { - return 'pre-pinned entries must carry a manifest_hash' + // `source` is optional (first-party names re-resolve from the name), + // but when present it is the operator's raw source string the client + // re-resolves — never the resolved object spec. parseConfigShape has + // already enforced string-ness; this keeps the contract local + loud. + if (entry.source !== undefined && (typeof entry.source !== 'string' || entry.source.length === 0)) { + return 'pre-pinned entries must carry a string source when present' } return null } diff --git a/src/daemon.js b/src/daemon.js index a750d6d..447adde 100644 --- a/src/daemon.js +++ b/src/daemon.js @@ -178,9 +178,10 @@ export async function runServerDaemon(opts = {}) { stopped = true for (const timer of timers) clearInterval(timer) // Drain what we can on the way down; spool durability means an - // unclean exit here loses nothing either way. + // unclean exit here loses nothing either way. drain() (not tick()) + // so a background tick already in flight can't make this a no-op. 
try { - await mover.tick() + await mover.drain() } catch { // shutdown is best-effort } diff --git a/src/http/routes-admin.js b/src/http/routes-admin.js index 64a9b99..18494c3 100644 --- a/src/http/routes-admin.js +++ b/src/http/routes-admin.js @@ -92,7 +92,9 @@ export async function handleAdmin(ctx) { // Operational escape hatches: run the mover / archive export / // eviction immediately instead of waiting for their schedules. if (url.pathname === '/v1/admin/mover/run' && req.method === 'POST') { - const moved = await services.mover.tick() + // drain(), not tick(): "run now" must guarantee every already-spooled + // row is committed before we ack, even if a background tick is mid-pass. + const moved = await services.mover.drain() return sendJson(res, 200, { moved }) } if (url.pathname === '/v1/admin/archive/run' && req.method === 'POST') { diff --git a/src/ingest/mover.js b/src/ingest/mover.js index 3cf6dc3..d6d5e7d 100644 --- a/src/ingest/mover.js +++ b/src/ingest/mover.js @@ -20,30 +20,59 @@ * }} args */ export function createMover({ spool, catalog, storage, log }) { - let running = false + /** + * The in-flight pass, if any. A single slot serializes passes so two + * never drain the same spool file concurrently — and lets a guaranteed + * caller wait one out before starting its own. + * @type {Promise | null} + */ + let inflight = null + + /** One pass over every currently-pending spool file. */ + async function pass() { + let moved = 0 + for (const pending of spool.pending()) { + try { + moved += await drainFile(pending) + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + log.warn('mover.drain_failed', { dataset: pending.dataset, gateway: pending.gateway, message }) + } + } + return moved + } + + /** Start a pass and own clearing the in-flight slot when it settles. 
*/ + function startPass() { + const p = pass() + inflight = p + void p.finally(() => { if (inflight === p) inflight = null }) + return p + } return { /** - * One mover pass over all pending spool files. Returns the number - * of rows committed to the cache. + * Opportunistic pass for the background timer: if one is already + * running, skip rather than queue — a dropped timer tick costs + * nothing because spool durability means the next tick still drains. + * Returns the number of rows committed to the cache (0 if skipped). */ async tick() { - if (running) return 0 - running = true - let moved = 0 - try { - for (const pending of spool.pending()) { - try { - moved += await drainFile(pending) - } catch (err) { - const message = err instanceof Error ? err.message : String(err) - log.warn('mover.drain_failed', { dataset: pending.dataset, gateway: pending.gateway, message }) - } - } - } finally { - running = false - } - return moved + if (inflight) return 0 + return startPass() + }, + + /** + * Guaranteed drain for callers that must observe every row spooled + * *before* this call committed before it returns — the admin + * `mover/run` escape hatch and shutdown. A skip would be a silent + * lie there, so wait out any in-flight pass (whose `pending()` + * snapshot may predate the caller's rows) and then run a fresh pass + * that is guaranteed to see them. 
+ */ + async drain() { + while (inflight) await inflight.catch(() => {}) + return startPass() }, } diff --git a/test/smoke.js b/test/smoke.js index 5f63bc1..a9d43ee 100644 --- a/test/smoke.js +++ b/test/smoke.js @@ -192,10 +192,12 @@ try { const getConfig = await call('GET', '/v1/admin/configs/smoke-fleet', { token: admin.token }) const pinnedEntry = getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === 'hypaware-plugin-smoke') - check('stored config pins plugin version + content hash', - pinnedEntry.version === '1.2.3' && typeof pinnedEntry.content_hash === 'string' && pinnedEntry.content_hash.length === 64, pinnedEntry) + check('stored config pins plugin version + artifact hash', + pinnedEntry.version === '1.2.3' && typeof pinnedEntry.artifact_hash === 'string' && pinnedEntry.artifact_hash.length === 64, pinnedEntry) + check('pinned source stays the operator\'s raw string (client re-resolves it)', + pinnedEntry.source === pluginDir, pinnedEntry) check('bundled plugin entries stay unpinned', - getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === '@hypaware/central').content_hash === undefined) + getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === '@hypaware/central').artifact_hash === undefined) const saveAgain = await call('PUT', '/v1/admin/configs/smoke-fleet', { token: admin.token, json: fleetConfig }) check('identical re-save is revisionless', saveAgain.status === 200 && saveAgain.body.revision_created === false && saveAgain.body.etag === configEtag1) @@ -379,7 +381,10 @@ try { // present — mirror that here. const proxyRow = { gateway_id: 'client-local-gw', - schema_version: 4, + schema_version: 6, + // session_id is the non-null partition key (schema v6, LLP 0030); + // conversation_id rides along nullable. Both present here. 
+ session_id: 'sess-1', conversation_id: 'conv-1', provider: 'smoke', conversation_started_at: new Date().toISOString(), @@ -396,7 +401,7 @@ try { token: gw2.jwt, ndjson: JSON.stringify(proxyRow) + '\n', }) - check('proxy signal maps to ai_gateway_messages', proxyIngest.status === 202) + check('proxy signal maps to ai_gateway_messages', proxyIngest.status === 202, proxyIngest.body) await runMover() const proxyCount = await sql('SELECT COUNT(*) AS n FROM ai_gateway_messages') check('proxy rows landed', Number(proxyCount.rows[0].n) === 1, proxyCount.rows) From 264b06d5735bbf0d76a34bc2af2da34d47a7fc14 Mon Sep 17 00:00:00 2001 From: Phillip Cunliffe Date: Mon, 22 Jun 2026 13:34:35 -0700 Subject: [PATCH 2/4] feat: server-side context graph (GitHub capture + T0 projection + neighbors) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Load @hypaware/context-graph and a vendored @hypaware/github into the server's own kernel so the server can capture github_events directly, project the T0 graph, and answer neighbors queries over its own cache — where the forwarded 1.6 LLM logs also live, enabling GitHub<->LLM convergence (hypaware LLP 0032). 
- boot: activate context-graph + github (poll source dormant); inject the [github] section from HYPSERVER_GITHUB_* env, token stays in box env - daemon: services.githubBackfill/graphProject/graphNeighbors reuse the plugins' pure functions over the kernel query+storage; flush github_events after capture so it is immediately queryable - routes-admin + admin CLI: github-backfill, graph-project, graph-neighbors - registry: self-managed graph datasets keep their own read closures (source=/graph_v1 layout), not the date= synthesis used for wire ingest - shim: re-export projectGraph/queryNeighbors/requireGraphRuntime anchored on bundledWorkspaceDir for module-singleton identity - smoke: full hermetic backfill->project->neighbors chain (no network) - LLP 0010 documents the decision; plugins/github vendored (own ref corpus) Co-Authored-By: Claude Opus 4.8 (1M context) --- bin/admin.js | 27 ++ llp/0010-server-side-graph.decision.md | 133 ++++++++ package-lock.json | 11 +- plugins/github/hypaware.plugin.json | 34 +++ plugins/github/src/capture.js | 407 +++++++++++++++++++++++++ plugins/github/src/commands.js | 118 +++++++ plugins/github/src/config.js | 174 +++++++++++ plugins/github/src/cursors.js | 109 +++++++ plugins/github/src/dataset.js | 184 +++++++++++ plugins/github/src/github_client.js | 265 ++++++++++++++++ plugins/github/src/graph_contract.js | 355 +++++++++++++++++++++ plugins/github/src/index.js | 91 ++++++ plugins/github/src/keys.js | 185 +++++++++++ plugins/github/src/runtime.js | 52 ++++ plugins/github/src/source.js | 86 ++++++ plugins/github/src/tick.js | 48 +++ plugins/github/src/types.d.ts | 243 +++++++++++++++ src/boot.js | 32 ++ src/catalog/registry.js | 21 +- src/config.js | 22 ++ src/daemon.js | 54 +++- src/http/routes-admin.js | 79 +++++ src/kernel/shim.js | 18 ++ src/types.d.ts | 52 ++++ test/smoke.js | 79 +++++ 25 files changed, 2870 insertions(+), 9 deletions(-) create mode 100644 llp/0010-server-side-graph.decision.md create mode 100644 
plugins/github/hypaware.plugin.json create mode 100644 plugins/github/src/capture.js create mode 100644 plugins/github/src/commands.js create mode 100644 plugins/github/src/config.js create mode 100644 plugins/github/src/cursors.js create mode 100644 plugins/github/src/dataset.js create mode 100644 plugins/github/src/github_client.js create mode 100644 plugins/github/src/graph_contract.js create mode 100644 plugins/github/src/index.js create mode 100644 plugins/github/src/keys.js create mode 100644 plugins/github/src/runtime.js create mode 100644 plugins/github/src/source.js create mode 100644 plugins/github/src/tick.js create mode 100644 plugins/github/src/types.d.ts diff --git a/bin/admin.js b/bin/admin.js index aaace9a..7ed0ddd 100644 --- a/bin/admin.js +++ b/bin/admin.js @@ -18,6 +18,9 @@ * hypaware-server-admin activate-config * hypaware-server-admin query "SELECT ..." * hypaware-server-admin run-mover | run-archive [--force] | run-eviction + * hypaware-server-admin github-backfill [owner/repo ...] + * hypaware-server-admin graph-project [--source NAME] [--dry-run] + * hypaware-server-admin graph-neighbors NODE [--depth N] [--type T] [--edge-type T] [--direction out|in|both] [--limit N] * * Env: HYPSERVER_URL (default http://127.0.0.1:8740), HYPSERVER_ADMIN_TOKEN */ @@ -122,6 +125,30 @@ switch (command) { case 'run-eviction': await call('POST', '/v1/admin/eviction/run') break + case 'github-backfill': + // Positional owner/repo args narrow the configured selection; none = all. + await call('POST', '/v1/admin/github/backfill', { repos: rest.filter((a) => !a.startsWith('--')) }) + break + case 'graph-project': + await call('POST', '/v1/admin/graph/project', { + ...(flagValue('--source') ? { source: flagValue('--source') } : {}), + dry_run: rest.includes('--dry-run'), + }) + break + case 'graph-neighbors': { + const depth = flagValue('--depth') + const limit = flagValue('--limit') + const node = rest[0] && !rest[0].startsWith('--') ? 
rest[0] : '' + await call('POST', '/v1/admin/graph/neighbors', { + node, + ...(flagValue('--type') ? { type: flagValue('--type') } : {}), + ...(flagValue('--edge-type') ? { edge_types: [flagValue('--edge-type')] } : {}), + ...(flagValue('--direction') ? { direction: flagValue('--direction') } : {}), + ...(depth !== undefined ? { depth: Number(depth) } : {}), + ...(limit !== undefined ? { limit: Number(limit) } : {}), + }) + break + } default: console.error('unknown command; see header comment for usage') process.exit(2) diff --git a/llp/0010-server-side-graph.decision.md b/llp/0010-server-side-graph.decision.md new file mode 100644 index 0000000..9db9a71 --- /dev/null +++ b/llp/0010-server-side-graph.decision.md @@ -0,0 +1,133 @@ +# LLP 0010: Server-Side Graph — GitHub Capture, T0 Projection, and Neighbors as Admin Operations + +**Type:** Decision +**Status:** Active +**Systems:** Query, Core +**Author:** Phil / Claude +**Date:** 2026-06-22 +**Related:** LLP 0002, LLP 0004, LLP 0006 + +> The server now holds a context graph, not just forwarded logs. It loads the +> bundled `@hypaware/context-graph` plugin and a vendored `@hypaware/github` +> source plugin into its own kernel, captures GitHub activity directly, and +> exposes `graph project` / `graph neighbors` as admin operations — so the +> forwarded LLM sessions ([hypaware LLP 0032 git-bridge](../../hypaware/llp)) +> and GitHub activity converge into one node/edge graph on the server. + +## Why on the server at all + +The admin attach is a read-only SQL endpoint ([LLP 0006](./0006-admin-query-attach.decision.md)): +it can read the server's cache + archive but cannot invoke a plugin command. And +the graph's whole point is convergence — `Repo`/`Actor`/`Commit`/`File` nodes +minted from GitHub activity sharing content-addressed ids with nodes minted from +LLM sessions (the `git_remote`/`head_sha`/`repo_root` columns 1.6 added to +`ai_gateway_messages`). 
That convergence only happens where both datasets live in +one kernel cache. The forwarded LLM logs live in the **server's** cache. So the +projection must run there, not in a side client reading the server over SQL. + +## Decision + +**The server activates the graph plugins in its own +kernel.** `pluginSelection()` ([boot.js](../src/boot.js)) gains two entries +before the control plane: `@hypaware/context-graph` (from the bundled workspace +that ships inside the `hypaware` dependency) and `@hypaware/github` (vendored — +see below). The kernel resolver orders context-graph first from github's +`requires.plugins`, so github's eager `requireCapability('hypaware.context-graph')` +resolves. The plugins activate for their **registrations** — github_events / +node / edge datasets, the github T0 contract, and the `github backfill` / +`graph project` / `graph neighbors` commands — exactly as on a client. + +This reuses the kernel-host substrate of [LLP 0002](./0002-kernel-reuse.decision.md): +the server expresses behavior as activated plugins, not bespoke code. Projection +and traversal are the graph plugin's own **pure functions** — `projectGraph()` and +`queryNeighbors()` — invoked over the server kernel's `runtime.query` / +`runtime.storage` handles (the same handles `executeSql` uses). The registered +contracts are read through the plugin's process-global registry singleton +(`requireGraphRuntime().registry.list()`) — the identical seam the in-process +`hyp graph project` command uses — so nothing in the `hypaware` repo had to change. +The deep reach into the bundled plugin tree is funnelled through the one sanctioned +shim ([kernel/shim.js](../src/kernel/shim.js), [LLP 0002#host-shim](./0002-kernel-reuse.decision.md#host-shim)), +anchored on the same `bundledWorkspaceDir` the loader used so the singleton is the +very instance activation populated. 
+ +**The github plugin is vendored** under [`plugins/github/`](../plugins/github/) +rather than added as a dependency: it has no git remote yet, and `plugins/` is +already on the server's plugin path and already copied by the Docker build, so +vendoring adds the plugin with no Dockerfile or build-context change. Keep exactly +one copy of its source tree — the runtime singletons depend on module identity. + +**The server pulls GitHub directly; capture is an +admin one-shot, not a daemon source.** The github plugin registers a poll source, +but the server never starts it — and the wrapped source registry would suppress it +anyway ([LLP 0002](./0002-kernel-reuse.decision.md), only `@hypaware-server/*` may +start sources). Instead, `github backfill` runs on demand via the admin surface, +calling the plugin's `runCaptureTick(runtime, { mode: 'backfill' })` and then +flushing the `github_events` cache table so the freshly captured rows are queryable +on the same kernel. Repo selection (`orgs` / `repos` / `ignore`) comes from server +env (`HYPSERVER_GITHUB_*`), injected as the plugin's `[github]` config section by +`githubSection()` in boot. The GitHub token stays in the box environment under the +name in `token_env` (default `GITHUB_TOKEN`) and is read by the github client at +request time — never in config, consistent with the secrets-never-in-config +invariant ([LLP 0000](./0000-hypaware-server.explainer.md)). The box therefore now +makes outbound calls to the GitHub API; this is the one network egress the server +performs beyond its archive destination. + +**Graph operations are admin operations on the server +kernel, the precedent [LLP 0006](./0006-admin-query-attach.decision.md) set for SQL.** +`POST /v1/admin/github/backfill`, `/v1/admin/graph/project`, and +`/v1/admin/graph/neighbors` ([routes-admin.js](../src/http/routes-admin.js)) sit +behind the same admin bearer token, alongside the mover / archive / eviction +escape hatches. 
They are *operations*, not fleet config ([LLP 0009](./0009-remote-config.spec.md)): +nothing here is served to gateways. Each has a one-line `hypaware-server-admin` +wrapper for docker-exec workflows ([LLP 0008#admin-visibility](./0008-fleet-enrollment.spec.md#admin-visibility)). +Projection is idempotent — content-addressed ids plus pre-write dedup mean a +re-run with no new source data writes zero rows — so an operator firing +`graph-project` after each backfill (or after a fresh forward of LLM logs) just +folds new activity into the existing graph. + +**Graph datasets keep their own read closures; +the server's date-partition synthesis is only for forwarded ingest.** The +catalog-backed registry ([LLP 0004#catalog-backed-registry](./0004-dataset-catalog.spec.md#catalog-backed-registry)) +synthesizes `discoverPartitions` / `createDataSource` that read `date=` partitions — +correct for the wire-ingested datasets the mover fills (logs, traces, metrics, +ai_gateway_messages), and it deliberately discards the client-side closures those +plugins ship because they assume a client layout. But `github_events` is +`source=`-partitioned and `node` / `edge` are `graph_v1`-partitioned, and all three +are written **directly** into the cache by the in-kernel plugins (never wire-ingested), +each carrying a `createDataSource` authored for its own layout. So +[registry.js](../src/catalog/registry.js) routes datasets from the self-managed +plugins (`@hypaware/server`, `@hypaware/context-graph`, `@hypaware/github`) to the +`custom` map — keeping their closures — and everything else to the catalog seed + +date-partition synthesis. The split is by plugin, not by closure presence, because +`@hypaware/ai-gateway` also ships a `createDataSource` yet must take the synthesized +path. Without this, projection and neighbors silently read zero rows even though the +data is on disk. 
+ +## Consequences and bounds + +- `github_events` / `node` / `edge` are cache-resident and never archived (no wire + signal, no archive ack). Ack-coupled eviction ([LLP 0003](./0003-spool-durability.spec.md)) + refuses to evict unarchived partitions, so they are not lost; they also are not + `date=`-partitioned, so the date-based eviction sweep does not touch them. +- A concurrent backfill and projection stay safe: pre-write dedup plus + content-addressed ids keep projection idempotent, and `graph compact` (a graph + command) cleans any residue from interleaved runs. +- The hermetic smoke test ([test/smoke.js](../test/smoke.js)) drives the full chain + — inject an in-memory GitHub client into the runtime singleton, `github backfill`, + `graph project` (asserting idempotence on re-run), and `graph neighbors` — with no + network, exercising the real cache append + read path. +- **`plugins/github/` is vendored third-party code and is out of scope for this + repo's `/ref-check`.** Its source carries `@ref LLP NNNN` annotations that resolve + against the **`@hypaware/github`** corpus (where, e.g., LLP 0002 is "capture-model"), + not this server's (where LLP 0002 is "kernel-reuse"). Those refs are honest in their + own repo; treat the vendored tree like a dependency and exclude it when validating + this repo's references. The server's own code carries the `@ref LLP 0010` annotations + for everything decided here. + +## Not in scope + +Ongoing server-side polling (the github poll source stays dormant by design), +enrichment beyond T0 (a non-goal for these datasets), and serving the github plugin +to fleet clients (clients do not capture GitHub in this design). A future +metadata-level partition spec for the graph datasets is deferred, as for the +archive ([LLP 0005#day-aligned-exports](./0005-archive-sink.decision.md#day-aligned-exports)). 
diff --git a/package-lock.json b/package-lock.json index 8a8d29b..f6a7d45 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,28 +20,29 @@ } }, "../hypaware": { - "version": "1.3.0", + "version": "1.6.0", "dependencies": { "@aws-sdk/client-s3": "3.1053.0", "@aws-sdk/credential-provider-ini": "3.972.43", "hyparquet": "1.26.0", "hyparquet-compressors": "1.1.1", - "icebird": "0.8.5", - "squirreling": "0.12.23" + "icebird": "0.8.10", + "squirreling": "0.12.24" }, "bin": { "hyp": "bin/hypaware.js", "hypaware": "bin/hypaware.js" }, "devDependencies": { - "@types/node": "25.9.2", + "@types/node": "26.0.0", "typescript": "6.0.3" }, "engines": { "node": ">=20" }, "optionalDependencies": { - "hyparquet-writer": "0.15.6" + "hyparquet-writer": "0.15.6", + "hypvector": "0.1.1" } }, "node_modules/fzstd": { diff --git a/plugins/github/hypaware.plugin.json b/plugins/github/hypaware.plugin.json new file mode 100644 index 0000000..c2ffe0a --- /dev/null +++ b/plugins/github/hypaware.plugin.json @@ -0,0 +1,34 @@ +{ + "schema_version": 1, + "name": "@hypaware/github", + "version": "0.1.0", + "hypaware_api": "^1.0.0", + "runtime": "node", + "node_engine": ">=20", + "entrypoint": "./src/index.js", + "description": "GitHub source plugin built for the context-graph. Captures GitHub activity (issues, pull requests, commits, files, reviews, comments) into a thin append-only github_events skeleton and bundles a T0 projection contract that maps it into the node/edge graph with bridge-ready natural keys. 
Capture is poll/backfill/sync; projection is the graph plugin's `hyp graph project`.", + "permissions": ["network", "read_state", "write_state"], + "requires": { + "plugins": { + "@hypaware/context-graph": "^0.1.0" + }, + "capabilities": { + "hypaware.context-graph": "^1.0.0" + } + }, + "contributes": { + "datasets": [ + { "name": "github_events", "summary": "Append-only GitHub activity skeleton (structural columns only; content stays in GitHub)" } + ], + "config_sections": [ + { "section": "github", "summary": "Repo/org selection, ignore list, optional poll interval, and token env-var name" } + ], + "sources": [ + { "name": "github", "summary": "Ongoing poll (opt-in via poll_interval): append events past each repo's cursor" } + ], + "commands": [ + { "name": "github backfill", "summary": "Pull a repo's full history into github_events (cold-start)" }, + { "name": "github sync", "summary": "Run one poll tick now, off the daemon" } + ] + } +} diff --git a/plugins/github/src/capture.js b/plugins/github/src/capture.js new file mode 100644 index 0000000..d996fb4 --- /dev/null +++ b/plugins/github/src/capture.js @@ -0,0 +1,407 @@ +// @ts-check + +import { commitKey, repoKey, str } from './keys.js' +import { cursorFor } from './cursors.js' + +/** + * Capture orchestration: turn the configured repo selection into appended + * `github_events` rows. Storage-agnostic — `append` and the GitHub `client` + * are injected, so tests drive the whole pipeline with a fake client and a + * row collector (no network, no cache). + * + * Three `ignore[]` invariants are enforced **here, at capture** (LLP 0002): + * rows for an ignored repo never enter `github_events`, so they never reach the + * graph; ignore is forward-only (it filters the *fetch*, it does not retract + * already-captured rows); and it is exact `owner/repo` match (case-insensitive, + * since GitHub is). There is no projection-time repo filter. 
+ * + * @ref LLP 0002#three-invariants [implements] — ignore is enforced at capture, forward-only, exact-match + * + * @import { CursorState, GithubClient, GithubCommit, GithubConfig, PluginLogger, RepoCursor } from './types.d.ts' + */ + +/** + * Resolve the repo set to capture: `repos[]` ∪ (every repo enumerated from each + * `orgs[]` entry), minus `ignore[]` (which always wins). Returns canonical + * lowercased `owner/repo` names, deduped. + * + * @param {GithubConfig} config + * @param {GithubClient} client + * @param {PluginLogger} log + * @returns {Promise} + */ +export async function resolveRepos(config, client, log) { + const ignore = new Set(config.ignore.map((r) => r.toLowerCase())) + /** @type {Set} */ + const selected = new Set() + + for (const full of config.repos) { + const key = repoKey(full) + if (key) selected.add(key) + } + + for (const org of config.orgs) { + let enumerated + try { + enumerated = await client.listOrgRepos(org) + } catch (err) { + log.error('github.org_enumeration_failed', { org, error: errMessage(err) }) + continue + } + for (const full of enumerated) { + const key = repoKey(full) + if (key) selected.add(key) + } + } + + return [...selected].filter((repo) => !ignore.has(repo)).sort() +} + +/** + * Capture every selected repo. `mode` controls the starting cursor: + * - `backfill` resets each repo's cursor to empty (fetch full history) but + * still persists the advanced cursor, seeding the poller so it resumes past + * history rather than re-fetching it; + * - `poll` starts from the persisted cursor (incremental). + * + * Per-repo errors are caught and recorded, not thrown, so one bad repo never + * kills a whole tick. The caller persists `cursors` after this resolves. 
+ * + * @param {object} args + * @param {GithubClient} args.client + * @param {GithubConfig} args.config + * @param {CursorState} args.cursors + * @param {(rows: Record[]) => Promise} args.append + * @param {PluginLogger} args.log + * @param {'backfill' | 'poll'} args.mode + * @param {string[]} [args.only] restrict to these repos (e.g. `hyp github backfill owner/repo`) + * @returns {Promise<{ repos: number, events: number, errors: Array<{ repo: string, error: string }> }>} + */ +export async function captureRepos({ client, config, cursors, append, log, mode, only }) { + let repos = await resolveRepos(config, client, log) + if (only && only.length > 0) { + const onlySet = new Set(only.map((r) => r.toLowerCase())) + repos = repos.filter((r) => onlySet.has(r)) + } + + let events = 0 + /** @type {Array<{ repo: string, error: string }>} */ + const errors = [] + + for (const repo of repos) { + if (mode === 'backfill') cursors.repos[repo] = {} + const cursor = cursors.repos[repo] ?? (cursors.repos[repo] = {}) + try { + const n = await captureRepo({ client, repo, cursor, mode, append, log }) + events += n + } catch (err) { + const message = errMessage(err) + errors.push({ repo, error: message }) + log.error('github.repo_capture_failed', { repo, error: message }) + } + // Persist the advanced cursor onto the shared state after each repo. + cursors.repos[repo] = cursor + } + + return { repos: repos.length, events, errors } +} + +/** + * Capture one repo through every pass, appending `github_events` rows. Returns + * the number of event rows written. 
/**
 * Capture one repo through every pass, appending `github_events` rows. Returns
 * the number of event rows written.
 *
 * Passes run in a fixed order: issues, pull requests (plus files / reviews /
 * commits of each PR that is new or changed), repo-level commits (plus their
 * files), then conversation comments. Rows are deduplicated by `event_id`
 * within the run before being appended.
 *
 * Comments are typed `pull_request_comment` vs `issue_comment` by whether
 * their subject number is a known PR. The pulls listing alone is not a
 * reliable source for that: the client returns `[]` for it on a `304`, and it
 * is page-capped. The issues listing also returns PRs (marked with
 * `pull_request`), so those numbers are recorded as PRs too. This is best
 * effort: a PR that appears in neither listing this run is still not
 * recognized.
 *
 * @param {object} args
 * @param {GithubClient} args.client
 * @param {string} args.repo canonical `owner/repo`
 * @param {RepoCursor} args.cursor advanced in place by the client calls
 * @param {'backfill' | 'poll'} args.mode
 * @param {(rows: Record<string, unknown>[]) => Promise<void>} args.append
 * @param {PluginLogger} args.log accepted for call-site symmetry; not used here
 * @returns {Promise<number>}
 */
async function captureRepo({ client, repo, cursor, mode, append }) {
  const [owner, name] = repo.split('/')
  /** @type {Set<string>} */
  const emitted = new Set()
  /** @type {Set<number>} */
  const prNumbers = new Set()
  let written = 0

  /**
   * Append the rows whose `event_id` has not been emitted yet this run.
   *
   * @param {Record<string, unknown>[]} rows
   */
  async function flush(rows) {
    const fresh = rows.filter((r) => {
      const id = String(r.event_id)
      if (emitted.has(id)) return false
      emitted.add(id)
      return true
    })
    if (fresh.length === 0) return
    await append(fresh)
    written += fresh.length
  }

  // --- issues (non-PR; PRs come from the pulls pass) ---
  const issues = await client.listIssues(owner, name, cursor)
  // PRs surfaced by the issues listing are not emitted here, but their numbers
  // are remembered so the comment pass can still classify comments on them.
  for (const it of issues) {
    if (it.pull_request) prNumbers.add(it.number)
  }
  await flush(
    issues
      .filter((it) => !it.pull_request)
      .map((it) => issueRow(repo, it)),
  )

  // --- pull requests + their sub-resources ---
  const pulls = await client.listPullRequests(owner, name, cursor)
  for (const pr of pulls) prNumbers.add(pr.number)
  await flush(pulls.map((pr) => pullRow(repo, pr)))

  // Only descend into a PR's sub-resources when the PR is new/changed since the
  // last run (backfill processes all). Bounds the per-tick N+1 over PRs.
  const pullsHigh = cursor.since?.pulls
  const toDescend = mode === 'backfill' ? pulls : pulls.filter((pr) => changedSince(pr, pullsHigh))
  for (const pr of toDescend) {
    const files = await client.listPullRequestFiles(owner, name, pr.number)
    await flush(files.map((path) => prFileRow(repo, pr, path)))

    const reviews = await client.listPullRequestReviews(owner, name, pr.number)
    await flush(reviews.map((rv) => reviewRow(repo, pr, rv)))

    const prCommits = await client.listPullRequestCommits(owner, name, pr.number)
    await flush(prCommits.map((c) => commitRow(repo, c, pr.number)))
  }
  // Advance the pulls high-water only after the sub-resources were captured.
  advancePullsHigh(cursor, pulls)

  // --- repo-level commits + their files ---
  const commits = await client.listCommits(owner, name, cursor)
  await flush(commits.map((c) => commitRow(repo, c, null)))
  for (const c of commits) {
    const files = await client.listCommitFiles(owner, name, c.sha)
    await flush(files.map((path) => commitFileRow(repo, c, path)))
  }

  // --- conversation comments (on issues AND PRs) ---
  const comments = await client.listIssueComments(owner, name, cursor)
  await flush(
    comments
      .map((c) => commentRow(repo, c, prNumbers))
      .filter((r) => r !== null),
  )

  return written
}
/**
 * Row for a pull request. A PR with a non-null `merged_at` is reported with
 * state `merged`; otherwise the API's own state string is used.
 *
 * @param {string} repo
 * @param {import('./types.d.ts').GithubPull} pr
 * @returns {Record<string, unknown>}
 */
function pullRow(repo, pr) {
  const isMerged = pr.merged_at != null
  const eventId = `pr:${repoKey(repo)}#${pr.number}`
  const fields = {
    actor_login: loginOf(pr.user),
    actor_type: typeOf(pr.user),
    number: pr.number,
    state: isMerged ? 'merged' : str(pr.state),
    created_at: str(pr.created_at),
    payload: { merged: isMerged, draft: pr.draft === true },
  }
  return base('pull_request', eventId, repo, fields)
}

/**
 * Row for one file changed by a pull request; timestamped with the PR's
 * `created_at`.
 *
 * @param {string} repo
 * @param {import('./types.d.ts').GithubPull} pr
 * @param {string} path
 * @returns {Record<string, unknown>}
 */
function prFileRow(repo, pr, path) {
  const eventId = `prfile:${repoKey(repo)}#${pr.number}:${path}`
  const fields = { number: pr.number, path, created_at: str(pr.created_at) }
  return base('pull_request_file', eventId, repo, fields)
}

/**
 * Row for a review on a pull request, keyed by the review id and linked to the
 * PR through `pr_number`.
 *
 * @param {string} repo
 * @param {import('./types.d.ts').GithubPull} pr
 * @param {import('./types.d.ts').GithubReview} rv
 * @returns {Record<string, unknown>}
 */
function reviewRow(repo, pr, rv) {
  const reviewer = rv.user
  const fields = {
    actor_login: loginOf(reviewer),
    actor_type: typeOf(reviewer),
    review_id: rv.id,
    review_state: str(rv.state),
    pr_number: pr.number,
    created_at: str(rv.submitted_at),
  }
  return base('review', `review:${rv.id}`, repo, fields)
}

/**
 * Row for a commit. When the commit was captured under a PR, `pr_number` is
 * set and the event id gets a `:pr<N>` suffix, so the same sha yields a
 * separate row per PR in addition to the repo-level row (which passes null).
 *
 * @param {string} repo
 * @param {GithubCommit} c
 * @param {number | null} prNumber
 * @returns {Record<string, unknown>}
 */
function commitRow(repo, c, prNumber) {
  const prSuffix = prNumber == null ? '' : `:pr${prNumber}`
  const eventId = `commit:${commitKey(c.sha)}${prSuffix}`
  const fields = {
    actor_login: loginOf(c.author),
    actor_type: typeOf(c.author),
    sha: str(c.sha),
    pr_number: prNumber,
    created_at: commitDate(c),
  }
  return base('commit', eventId, repo, fields)
}

/**
 * Row for one file touched by a repo-level commit.
 *
 * @param {string} repo
 * @param {GithubCommit} c
 * @param {string} path
 * @returns {Record<string, unknown>}
 */
function commitFileRow(repo, c, path) {
  const eventId = `commitfile:${commitKey(c.sha)}:${path}`
  const fields = { sha: str(c.sha), path, created_at: commitDate(c) }
  return base('commit_file', eventId, repo, fields)
}
/**
 * Row for a conversation comment. The subject number is parsed from the
 * comment's `issue_url`; a comment whose number cannot be parsed yields null.
 * The row is typed `pull_request_comment` when that number is in `prNumbers`,
 * `issue_comment` otherwise.
 *
 * @param {string} repo
 * @param {import('./types.d.ts').GithubComment} c
 * @param {Set<number>} prNumbers PR numbers seen this run
 * @returns {Record<string, unknown> | null}
 */
function commentRow(repo, c, prNumbers) {
  const number = numberFromIssueUrl(c.issue_url)
  if (number == null) return null

  const eventType = prNumbers.has(number) ? 'pull_request_comment' : 'issue_comment'
  const author = c.user
  return base(eventType, `comment:${c.id}`, repo, {
    actor_login: loginOf(author),
    actor_type: typeOf(author),
    number,
    created_at: str(c.created_at),
  })
}

/**
 * Build a row carrying every `github_events` column, so all rows share one
 * shape. Columns not supplied through `extra` are null; `extra` wins over the
 * null defaults.
 *
 * @param {string} eventType
 * @param {string} eventId
 * @param {string} repo
 * @param {Record<string, unknown>} extra
 * @returns {Record<string, unknown>}
 */
function base(eventType, eventId, repo, extra) {
  /** @type {Record<string, unknown>} */
  const row = { event_id: eventId, event_type: eventType, repo }
  const optionalColumns = [
    'actor_login',
    'actor_type',
    'number',
    'sha',
    'path',
    'review_id',
    'review_state',
    'state',
    'pr_number',
    'created_at',
    'payload',
  ]
  for (const column of optionalColumns) row[column] = null
  return { ...row, ...extra }
}

/* ---------------------------------------------------------------- helpers */

/**
 * The actor's login, or null when the actor is missing or has no string login.
 *
 * @param {import('./types.d.ts').GithubActor | null | undefined} actor
 */
function loginOf(actor) {
  if (!actor) return null
  const { login } = actor
  return typeof login === 'string' ? login : null
}

/**
 * The actor's type, or null when the actor is missing or has no string type.
 *
 * @param {import('./types.d.ts').GithubActor | null | undefined} actor
 */
function typeOf(actor) {
  if (!actor) return null
  const { type } = actor
  return typeof type === 'string' ? type : null
}

/**
 * The commit's author date (`commit.author.date`), or null when any link in
 * that chain is missing or the date is not a string.
 *
 * @param {GithubCommit} c
 * @returns {string | null}
 */
function commitDate(c) {
  const author = c.commit?.author
  if (author === null || typeof author !== 'object') return null
  return typeof author.date === 'string' ? author.date : null
}
/**
 * Pull the issue/PR number off the end of a comment's `issue_url`
 * (`.../issues/123`, optionally followed by a query or fragment). Returns null
 * when the URL is missing or does not end in such a segment.
 *
 * @param {string | undefined} issueUrl
 * @returns {number | null}
 */
function numberFromIssueUrl(issueUrl) {
  if (!issueUrl) return null
  const match = issueUrl.match(/\/issues\/(\d+)(?:$|[?#])/)
  if (match === null) return null
  return Number(match[1])
}

/**
 * Whether a PR needs its sub-resources re-read: true when there is no
 * high-water mark yet, when the PR carries no usable timestamp, or when its
 * timestamp sorts after the mark.
 *
 * @param {import('./types.d.ts').GithubPull} pr
 * @param {string | undefined} high
 * @returns {boolean}
 */
function changedSince(pr, high) {
  if (!high) return true
  const stamp = updatedAt(pr)
  if (stamp == null) return true
  return stamp > high
}

/**
 * Raise `cursor.since.pulls` to the newest timestamp among `pulls`; the mark
 * never moves backwards and is left untouched when no timestamp is known.
 *
 * @param {CursorState['repos'][string]} cursor
 * @param {import('./types.d.ts').GithubPull[]} pulls
 */
function advancePullsHigh(cursor, pulls) {
  const newest = pulls.reduce((best, pr) => {
    const stamp = updatedAt(pr)
    return stamp && (!best || stamp > best) ? stamp : best
  }, cursor.since?.pulls)
  if (!newest) return
  if (!cursor.since) cursor.since = {}
  cursor.since.pulls = newest
}

/**
 * A PR's `updated_at`, falling back to `created_at`; null when neither is a
 * string.
 *
 * @param {import('./types.d.ts').GithubPull} pr
 * @returns {string | null}
 */
function updatedAt(pr) {
  if (typeof pr.updated_at === 'string') return pr.updated_at
  if (typeof pr.created_at === 'string') return pr.created_at
  return null
}

/**
 * Human-readable message for any thrown value.
 *
 * @param {unknown} err
 * @returns {string}
 */
function errMessage(err) {
  if (err instanceof Error) return err.message
  return String(err)
}
/**
 * `hyp github backfill [owner/repo ...]`: pull full history into
 * `github_events` for the named repos, or for the whole configured selection
 * when no repo is named. Projection into the graph is a separate step
 * (`hyp graph project`), which the success output points at.
 *
 * Exit codes: 2 for unusable arguments; 1 when nothing is configured, when
 * none of the named repos are in the configured selection, when any repo
 * failed, or when the run itself threw; 0 otherwise.
 *
 * @param {string[]} argv
 * @param {CommandRunContext} ctx
 * @returns {Promise<number>}
 */
export async function runGithubBackfill(argv, ctx) {
  /** @param {string} message */
  const complain = (message) => {
    ctx.stderr.write(`hyp github backfill: ${message}\n`)
  }

  const args = parseRepoArgv(argv)
  if (!args.ok) {
    complain(args.error)
    return 2
  }

  try {
    const runtime = requireGithubRuntime()
    const only = args.repos.length === 0 ? undefined : args.repos

    const { config } = runtime
    if (config.repos.length === 0 && config.orgs.length === 0) {
      complain('no repos configured ([github] repos / orgs are empty)')
      return 1
    }

    const outcome = await runCaptureTick(runtime, { mode: 'backfill', only })
    ctx.stdout.write(`github backfill: ${outcome.events} event(s) across ${outcome.repos} repo(s)\n`)
    reportErrors(ctx, outcome.errors)

    const nothingMatched = Boolean(only) && outcome.repos === 0
    if (nothingMatched) {
      complain(`none of [${only.join(', ')}] are in the configured selection`)
      return 1
    }

    ctx.stdout.write("run 'hyp graph project' to project github_events into the graph\n")
    if (outcome.errors.length > 0) return 1
    return 0
  } catch (err) {
    complain(errMessage(err))
    return 1
  }
}
/**
 * `hyp github sync`: run a single poll tick right now, outside the daemon.
 * Prints a one-line summary plus one line per failed repo; exits 1 when any
 * repo failed or the tick threw, 0 otherwise.
 *
 * @param {string[]} _argv
 * @param {CommandRunContext} ctx
 * @returns {Promise<number>}
 */
export async function runGithubSync(_argv, ctx) {
  let outcome
  try {
    outcome = await runCaptureTick(requireGithubRuntime(), { mode: 'poll' })
    ctx.stdout.write(`github sync: ${outcome.events} event(s) across ${outcome.repos} repo(s)\n`)
    reportErrors(ctx, outcome.errors)
  } catch (err) {
    ctx.stderr.write(`hyp github sync: ${errMessage(err)}\n`)
    return 1
  }
  return outcome.errors.length > 0 ? 1 : 0
}

/**
 * Validate positional `owner/repo` arguments. Any `--flag` is rejected, as is
 * any token that is not a single-slash slug; the first offender is reported.
 *
 * @param {string[]} argv
 * @returns {{ ok: true, repos: string[] } | { ok: false, error: string }}
 */
function parseRepoArgv(argv) {
  /** @type {string[]} */
  const accepted = []
  for (const arg of argv) {
    if (arg.startsWith('--')) {
      return { ok: false, error: `unknown flag ${arg}` }
    }
    const looksLikeSlug = REPO_SLUG.test(arg)
    if (!looksLikeSlug) {
      return { ok: false, error: `expected "owner/repo", got ${arg}` }
    }
    accepted.push(arg)
  }
  return { ok: true, repos: accepted }
}

/**
 * Write one indented `! repo: error` line to stderr per failed repo.
 *
 * @param {CommandRunContext} ctx
 * @param {Array<{ repo: string, error: string }>} errors
 */
function reportErrors(ctx, errors) {
  for (const { repo, error } of errors) {
    ctx.stderr.write(`  ! ${repo}: ${error}\n`)
  }
}

/**
 * Human-readable message for any thrown value.
 *
 * @param {unknown} err
 * @returns {string}
 */
function errMessage(err) {
  if (err instanceof Error) return err.message
  return String(err)
}
/**
 * Validate and normalize the `[github]` config section.
 *
 * `undefined` is treated as an empty section; any other non-object (including
 * `null` and arrays) is rejected outright. Each field is then read by its own
 * reader, which records its errors; if any were recorded the result is
 * `{ ok: false, errors }`. Otherwise the normalized config is returned, with
 * missing lists defaulting to `[]`, `token_env` defaulting to
 * `CONFIG_DEFAULTS.token_env`, and `poll_interval` present only when it was
 * configured.
 *
 * @param {unknown} value
 * @returns {GithubConfigResult}
 */
export function validateGithubConfig(value) {
  /** @type {GithubConfigError[]} */
  const errors = []

  const isPlainObject = value !== null && typeof value === 'object' && !Array.isArray(value)
  if (value !== undefined && !isPlainObject) {
    errors.push(invalid('', 'github config must be an object'))
    return { ok: false, errors }
  }
  const section = /** @type {Record<string, unknown>} */ (value === undefined ? {} : value)

  // Read order is also the order errors are reported in.
  const repoList = readSlugArray(section, 'repos', errors)
  const orgList = readLoginArray(section, 'orgs', errors)
  const ignoreList = readSlugArray(section, 'ignore', errors)
  const tokenEnv = readEnvName(section, 'token_env', errors)
  const pollInterval = readDuration(section, 'poll_interval', errors)

  if (errors.length > 0) return { ok: false, errors }

  /** @type {GithubConfig} */
  const config = {
    repos: repoList ?? [],
    orgs: orgList ?? [],
    ignore: ignoreList ?? [],
    token_env: tokenEnv ?? CONFIG_DEFAULTS.token_env,
  }
  if (pollInterval !== undefined) config.poll_interval = pollInterval
  return { ok: true, config }
}
/**
 * Convert a duration string such as `10m`, `30s`, `1h` or `500ms` to
 * milliseconds. Anything that does not match the duration pattern yields null.
 *
 * @param {string} value
 * @returns {number | null}
 */
export function parseInterval(value) {
  const match = DURATION.exec(value)
  if (match === null) return null
  const [, digits, unit] = match

  let millisPerUnit = 3_600_000 // the pattern only admits ms | s | m | h
  if (unit === 'ms') millisPerUnit = 1
  else if (unit === 's') millisPerUnit = 1000
  else if (unit === 'm') millisPerUnit = 60_000

  return Number(digits) * millisPerUnit
}

/**
 * Read an optional array of `owner/repo` slugs. Returns undefined when the key
 * is absent or invalid; an invalid value records one error.
 *
 * @param {Record<string, unknown>} raw
 * @param {string} key
 * @param {GithubConfigError[]} errors
 * @returns {string[] | undefined}
 */
function readSlugArray(raw, key, errors) {
  const entries = readArray(raw, key, errors)
  if (entries === undefined) return undefined
  for (const entry of entries) {
    const isSlug = typeof entry === 'string' && REPO_SLUG.test(entry)
    if (!isSlug) {
      errors.push(invalid(`/${key}`, `${key} entries must be "owner/repo" strings`))
      return undefined
    }
  }
  return /** @type {string[]} */ ([...entries])
}

/**
 * Read an optional array of bare org logins. Returns undefined when the key is
 * absent or invalid; an invalid value records one error.
 *
 * @param {Record<string, unknown>} raw
 * @param {string} key
 * @param {GithubConfigError[]} errors
 * @returns {string[] | undefined}
 */
function readLoginArray(raw, key, errors) {
  const entries = readArray(raw, key, errors)
  if (entries === undefined) return undefined
  for (const entry of entries) {
    const isLogin = typeof entry === 'string' && LOGIN.test(entry)
    if (!isLogin) {
      errors.push(invalid(`/${key}`, `${key} entries must be bare org logins (no slash)`))
      return undefined
    }
  }
  return /** @type {string[]} */ ([...entries])
}

/**
 * Read an optional array-valued key. An absent key is not an error; a present
 * non-array records one.
 *
 * @param {Record<string, unknown>} raw
 * @param {string} key
 * @param {GithubConfigError[]} errors
 * @returns {unknown[] | undefined}
 */
function readArray(raw, key, errors) {
  const candidate = raw[key]
  if (candidate === undefined) return undefined
  if (Array.isArray(candidate)) return candidate
  errors.push(invalid(`/${key}`, `${key} must be an array`))
  return undefined
}
/**
 * Read the optional env-var NAME the token is looked up under. Absent is fine;
 * anything that is not a valid variable name records an error.
 *
 * @param {Record<string, unknown>} raw
 * @param {string} key
 * @param {GithubConfigError[]} errors
 * @returns {string | undefined}
 */
function readEnvName(raw, key, errors) {
  const candidate = raw[key]
  if (candidate === undefined) return undefined
  if (typeof candidate === 'string' && ENV_NAME.test(candidate)) return candidate
  errors.push(invalid(`/${key}`, `${key} must be an environment variable NAME (e.g. "GITHUB_TOKEN"), not a token`))
  return undefined
}

/**
 * Read an optional duration string. The original string is returned (not the
 * parsed milliseconds); a value `parseInterval` cannot parse records an error.
 *
 * @param {Record<string, unknown>} raw
 * @param {string} key
 * @param {GithubConfigError[]} errors
 * @returns {string | undefined}
 */
function readDuration(raw, key, errors) {
  const candidate = raw[key]
  if (candidate === undefined) return undefined
  if (typeof candidate === 'string' && parseInterval(candidate) !== null) return candidate
  errors.push(invalid(`/${key}`, `${key} must be a duration like "10m", "30s", or "1h"`))
  return undefined
}

/**
 * Build one `github_config_invalid` error record.
 *
 * @param {string} pointer JSON-pointer-style location of the offending key
 * @param {string} message
 * @returns {GithubConfigError}
 */
function invalid(pointer, message) {
  const errorKind = 'github_config_invalid'
  return { pointer, message, errorKind }
}
/**
 * Load the persisted cursor state from `<stateDir>/github-cursors.json`.
 *
 * Any problem with the file (missing, unreadable, not JSON, a different
 * `schema_version`, no `repos` object) results in a clean, empty state rather
 * than an error.
 *
 * @param {string} stateDir
 * @returns {CursorState}
 */
export function readCursors(stateDir) {
  const statePath = path.join(stateDir, STATE_FILE)
  /** @type {CursorState['repos']} */
  let repos = {}
  try {
    const doc = JSON.parse(fs.readFileSync(statePath, 'utf8'))
    const usable =
      Boolean(doc) &&
      doc.schema_version === SCHEMA_VERSION &&
      Boolean(doc.repos) &&
      typeof doc.repos === 'object'
    if (usable) repos = readRepos(doc.repos)
  } catch {
    // Missing, malformed, or an older schema — start clean (a fresh poll
    // re-reads from the configured horizon; backfill ignores cursors anyway).
  }
  return { schema_version: SCHEMA_VERSION, repos }
}
/**
 * The cursor for one repo (`owner/repo`), or an empty cursor when unseen.
 *
 * @param {CursorState} state
 * @param {string} repo
 * @returns {RepoCursor}
 */
export function cursorFor(state, repo) {
  return state.repos[repo] ?? {}
}

/**
 * Persist the cursor state to `<stateDir>/github-cursors.json`, creating the
 * directory if needed.
 *
 * The state is written to a uniquely named temp file beside the target and
 * then renamed over it, so a reader never sees a half-written file. If either
 * step throws, the temp file is removed before the error is rethrown, so
 * failed writes do not pile up `*.tmp-*` files in the state dir.
 *
 * @param {string} stateDir
 * @param {CursorState} state
 */
export function writeCursors(stateDir, state) {
  fs.mkdirSync(stateDir, { recursive: true })
  const file = path.join(stateDir, STATE_FILE)
  const tmp = `${file}.tmp-${process.pid}-${randomUUID()}`
  try {
    fs.writeFileSync(tmp, JSON.stringify(state, null, 2) + '\n', 'utf8')
    fs.renameSync(tmp, file)
  } catch (err) {
    try {
      // `force` makes this a no-op when the temp file was never created.
      fs.rmSync(tmp, { force: true })
    } catch {
      // Cleanup is best-effort; the original failure is the one worth reporting.
    }
    throw err
  }
}

/**
 * Sanitize the `repos` map read from disk: keep each entry that parses as a
 * cursor, drop the rest.
 *
 * @param {unknown} value
 * @returns {Record<string, RepoCursor>}
 */
function readRepos(value) {
  /** @type {Record<string, RepoCursor>} */
  const out = {}
  for (const [repo, raw] of Object.entries(/** @type {Record<string, unknown>} */ (value))) {
    const cursor = readRepoCursor(raw)
    if (cursor) out[repo] = cursor
  }
  return out
}

/**
 * Sanitize one persisted cursor. Only string-valued `since` marks for the
 * known passes (`issues`, `commits`, `comments`, `pulls`) and string-valued
 * `etag` entries survive; empty `since`/`etag` maps are omitted. A non-object
 * input yields null; an object with nothing usable yields an empty cursor.
 *
 * @param {unknown} value
 * @returns {RepoCursor | null}
 */
function readRepoCursor(value) {
  if (!value || typeof value !== 'object') return null
  const v = /** @type {Record<string, unknown>} */ (value)
  /** @type {RepoCursor} */
  const cursor = {}
  if (v.since && typeof v.since === 'object') {
    const s = /** @type {Record<string, unknown>} */ (v.since)
    /** @type {NonNullable<RepoCursor['since']>} */
    const since = {}
    for (const k of /** @type {const} */ (['issues', 'commits', 'comments', 'pulls'])) {
      if (typeof s[k] === 'string') since[k] = /** @type {string} */ (s[k])
    }
    if (Object.keys(since).length > 0) cursor.since = since
  }
  if (v.etag && typeof v.etag === 'object') {
    /** @type {Record<string, string>} */
    const etag = {}
    for (const [k, e] of Object.entries(/** @type {Record<string, unknown>} */ (v.etag))) {
      if (typeof e === 'string') etag[k] = e
    }
    if (Object.keys(etag).length > 0) cursor.etag = etag
  }
  return cursor
}
ExtendedQueryStorageService, AsyncDataSource } from './types.d.ts' + */ + +export const PLUGIN_NAME = '@hypaware/github' +export const DATASET_NAME = 'github_events' +export const PARTITION_LABEL = 'all' + +/** + * The thin **activity skeleton** the T0 contract reads — structural columns + * only, no content (LLP 0005). One append-only table, an `event_type` + * discriminator the rules filter on, and an atomic grain: a commit touching N + * files emits one `commit_file` row per file alongside the `commit` row, so + * every contract rule stays a clean 1:1 map (the ai-gateway `File`-rule shape). + * + * `event_type` values: + * issue | pull_request | commit | commit_file | pull_request_file | + * review | issue_comment | pull_request_comment + * + * No `Repo`/`Actor`/`File` row exists on its own — those nodes are minted by + * the contract from the columns on every event row that mentions them. Content + * (titles, bodies, diffs, comment text) stays in GitHub and is dereferenced via + * the natural keys (LLP 0003); it never lands here. 
/**
 * Column specs for the `github_events` table, in table order. Only `event_id`,
 * `event_type` and `repo` are non-nullable; every other column is null on rows
 * whose event type does not use it.
 *
 * Note: `Object.freeze` is shallow — the array cannot be modified, but the
 * individual spec objects are not frozen.
 *
 * @ref LLP 0005#concrete-columns [implements] — typed structural columns + discriminator + small payload, no content
 * @type {ReadonlyArray<ColumnSpec>}
 */
export const GITHUB_EVENTS_COLUMNS = Object.freeze([
  { name: 'event_id', type: 'STRING', nullable: false }, // deterministic, for in-run dedup + readability
  { name: 'event_type', type: 'STRING', nullable: false }, // the discriminator the rules WHERE on
  { name: 'repo', type: 'STRING', nullable: false }, // owner/repo as GitHub returns it; the contract normalizes
  { name: 'actor_login', type: 'STRING', nullable: true },
  { name: 'actor_type', type: 'STRING', nullable: true }, // User | Bot | Organization (Actor prop)
  { name: 'number', type: 'INT64', nullable: true }, // issue / PR / comment subject number
  { name: 'sha', type: 'STRING', nullable: true }, // commit sha (full 40-hex)
  { name: 'path', type: 'STRING', nullable: true }, // changed-file relpath (commit_file / pull_request_file)
  { name: 'review_id', type: 'INT64', nullable: true },
  { name: 'review_state', type: 'STRING', nullable: true }, // APPROVED | CHANGES_REQUESTED | COMMENTED (Review prop)
  { name: 'state', type: 'STRING', nullable: true }, // issue/PR state: open | closed | merged
  { name: 'pr_number', type: 'INT64', nullable: true }, // linkage: PR a review is on / PR that references a commit
  { name: 'created_at', type: 'TIMESTAMP', nullable: true }, // event time (primary timestamp)
  { name: 'payload', type: 'JSON', nullable: true }, // small type-specific structural extras (no content)
])

/**
 * The dataset schema: a mutable copy of the column list (the spec objects
 * themselves are shared with `GITHUB_EVENTS_COLUMNS`).
 *
 * @type {{ columns: ColumnSpec[] }}
 */
export const GITHUB_EVENTS_SCHEMA = { columns: [...GITHUB_EVENTS_COLUMNS] }
/**
 * Where the `github_events` table lives on disk, as reported by the storage
 * service for this dataset's single partition label.
 *
 * @param {QueryStorageService} storage
 */
export function githubEventsTablePath(storage) {
  const partitionLabels = [PARTITION_LABEL]
  return storage.cacheTablePath(DATASET_NAME, partitionLabels)
}

/**
 * Report the dataset's one partition. With no cache directory in the context
 * there is nothing to surface, so the result is empty; otherwise the partition
 * points at `<cacheDir>/datasets/<dataset>/<label>`.
 *
 * @param {DatasetDiscoveryContext} ctx
 * @returns {QueryPartition[]}
 */
export function discoverParts(ctx) {
  const { cacheDir } = ctx
  if (!cacheDir) return []
  return [
    {
      dataset: DATASET_NAME,
      partition: { partition: PARTITION_LABEL },
      tablePath: path.join(cacheDir, 'datasets', DATASET_NAME, PARTITION_LABEL),
    },
  ]
}

/**
 * Refresh hook. Capture writes rows through the cache service on its own
 * paths, so there is nothing to refresh here; always reports `skipped` with
 * zero rows.
 *
 * @returns {Promise<DatasetRefreshResult>}
 */
export async function refreshPartition() {
  /** @type {DatasetRefreshResult} */
  const outcome = { status: 'skipped', rows: 0 }
  return outcome
}
/**
 * Build a data source over the materialized `github_events` rows. Returns an
 * empty source when nothing is materialized so a query on a cold cache still
 * succeeds.
 *
 * Table paths come from two places and are deduplicated: the partitions handed
 * in (`tablePath`) and whatever the storage service's own cache discovery
 * reports for this dataset (`path`). The second lookup is needed because the
 * cache service writes capture rows under its source-table layout rather than
 * the bare partition path `discoverParts` builds. Only tables that report at
 * least one row are kept.
 *
 * @param {QueryPartition[]} partitions
 * @param {DatasetDataSourceContext} ctx
 * @returns {Promise<AsyncDataSource>}
 */
export async function createDataSource(partitions, ctx) {
  const storage = /** @type {ExtendedQueryStorageService} */ (ctx.storage)
  const discovered = await storage.discoverCachePartitions({ datasets: [DATASET_NAME] })

  /** @type {Set<string>} */
  const tablePaths = new Set()
  for (const p of partitions) {
    if (p.tablePath) tablePaths.add(p.tablePath)
  }
  for (const p of discovered) {
    if (p.path) tablePaths.add(p.path)
  }

  /** @type {AsyncDataSource[]} */
  const sources = []
  for (const tablePath of tablePaths) {
    const source = await storage.dataSourceForTable(tablePath)
    if (source && (source.numRows ?? 0) > 0) sources.push(source)
  }

  if (sources.length === 0) return emptySource()
  if (sources.length === 1) return sources[0]
  return unionSources(sources)
}

/**
 * Concatenate several sources into one: the union's columns are the distinct
 * columns of all children, its row count is their sum, and a scan yields each
 * child's rows in turn.
 *
 * The union reports `appliedWhere: false` and `appliedLimitOffset: false`, so
 * the caller is expected to filter and window the combined stream itself.
 * `limit` and `offset` are therefore NOT forwarded to the children: a child
 * that honoured them would window its own rows (offset skipped once per child,
 * limit granted once per child) before the caller windows the union again.
 * Other scan options are forwarded unchanged; a child applying the same filter
 * early is harmless because filtering twice gives the same rows.
 *
 * NOTE(review): `limit` / `offset` are assumed to be the option names behind
 * the `appliedLimitOffset` flag; confirm against the `AsyncDataSource` scan
 * options type.
 *
 * @param {AsyncDataSource[]} sources
 * @returns {AsyncDataSource}
 */
function unionSources(sources) {
  const columns = Array.from(new Set(sources.flatMap((s) => s.columns)))
  const numRows = sources.reduce((n, s) => n + (s.numRows ?? 0), 0)
  return {
    columns,
    numRows,
    scan(options) {
      const childOptions = { ...options }
      delete childOptions.limit
      delete childOptions.offset
      return {
        appliedWhere: false,
        appliedLimitOffset: false,
        async *rows() {
          for (const s of sources) {
            for await (const row of s.scan(childOptions).rows()) yield row
          }
        },
      }
    },
  }
}

/**
 * A source with the dataset's columns and no rows.
 *
 * @returns {AsyncDataSource}
 */
function emptySource() {
  return {
    columns: GITHUB_EVENTS_COLUMNS.map((c) => c.name),
    numRows: 0,
    scan() {
      return { appliedWhere: false, appliedLimitOffset: false, async *rows() {} }
    },
  }
}
/**
 * Assemble the registration record for the `github_events` dataset (the value
 * passed to `ctx.query.registerDataset`): its name, owning plugin, schema,
 * primary timestamp column, and the discovery / refresh / data-source hooks.
 *
 * @returns {DatasetRegistration}
 */
export function githubEventsDatasetRegistration() {
  /** @type {DatasetRegistration} */
  const registration = {
    name: DATASET_NAME,
    plugin: PLUGIN_NAME,
    schema: GITHUB_EVENTS_SCHEMA,
    primaryTimestampColumn: 'created_at',
    discoverPartitions: discoverParts,
    refreshPartition,
    createDataSource,
  }
  return registration
}
/**
 * Create the GitHub REST client used by capture.
 *
 * The token is looked up in `env[tokenEnv]` on every request and is only ever
 * placed in the `Authorization` header. Errors thrown from here carry the HTTP
 * status and the query-less path, never response content. That includes the
 * case of a successful status with a body that is not valid JSON: the parser's
 * own error message can quote the body, so it is replaced with a sanitized
 * error rather than propagated.
 *
 * Listing methods that take a `cursor` advance it in place (`since` marks and
 * ETags); persisting it is the caller's job.
 *
 * @param {object} opts
 * @param {string} opts.tokenEnv the env-var NAME the token is read from
 * @param {NodeJS.ProcessEnv} opts.env
 * @param {PluginLogger} opts.log
 * @param {typeof fetch} [opts.fetchImpl] injectable transport (tests); defaults to global `fetch`
 * @param {string} [opts.baseUrl]
 * @returns {GithubClient}
 */
export function createGithubClient({ tokenEnv, env, log, fetchImpl, baseUrl = API_BASE }) {
  const doFetch = fetchImpl ?? fetch

  /**
   * Build the one error shape this client throws: message, kind, status.
   *
   * @param {string} message must already be free of body / token / query text
   * @param {number} status
   */
  function apiError(message, status) {
    const err = /** @type {import('./types.d.ts').HypError} */ (new Error(message))
    err.hypErrorKind = 'github_api_error'
    err.status = status
    return err
  }

  /**
   * One request. Reads the token fresh from `env[tokenEnv]`. Never logs it;
   * never lets response body text reach a thrown error.
   *
   * @param {string} pathAndQuery e.g. `/repos/o/r/commits?since=...`, or an absolute `next` URL
   * @param {{ etagKey?: string, cursor?: RepoCursor }} [opts]
   * @returns {Promise<{ notModified: true } | { notModified: false, data: unknown, next: string | null, etag: string | null }>}
   */
  async function request(pathAndQuery, opts = {}) {
    const token = env[tokenEnv]
    /** @type {Record<string, string>} */
    const headers = {
      Accept: 'application/vnd.github+json',
      'X-GitHub-Api-Version': API_VERSION,
      'User-Agent': '@hypaware/github',
    }
    if (token) headers.Authorization = `Bearer ${token}`
    const priorEtag = opts.etagKey && opts.cursor?.etag ? opts.cursor.etag[opts.etagKey] : undefined
    if (priorEtag) headers['If-None-Match'] = priorEtag

    const url = pathAndQuery.startsWith('http') ? pathAndQuery : `${baseUrl}${pathAndQuery}`
    const res = await doFetch(url, { headers })

    if (res.status === 304) return { notModified: true }
    if (!res.ok) {
      // No body, no token, no query string — just status + the path.
      throw apiError(`GitHub API ${res.status} for GET ${pathOf(url)}`, res.status)
    }

    let data
    try {
      data = await res.json()
    } catch {
      // The parse error is deliberately dropped (not even kept as `cause`):
      // its message can quote the start of the body.
      throw apiError(`GitHub API ${res.status} returned an unreadable body for GET ${pathOf(url)}`, res.status)
    }

    // Record the ETag only once the body is known to be usable; otherwise the
    // next conditional request could be answered `304` for a page we never read.
    const etag = res.headers.get('etag')
    if (opts.etagKey && opts.cursor && etag) {
      if (!opts.cursor.etag) opts.cursor.etag = {}
      opts.cursor.etag[opts.etagKey] = etag
    }
    const next = parseNextLink(res.headers.get('link'))
    return { notModified: false, data, next, etag }
  }

  /**
   * Fetch all pages of a listing, up to `MAX_PAGES`. The first page is
   * conditional (If-None-Match) when an `etagKey` is given; a `304` there means
   * "unchanged" → `[]`. Hitting the page cap with more pages pending is logged
   * as `github.listing_truncated`.
   *
   * @param {string} firstPathAndQuery
   * @param {{ etagKey?: string, cursor?: RepoCursor, label: string }} opts
   * @returns {Promise<any[]>} raw JSON objects (untyped at the API boundary)
   */
  async function paginate(firstPathAndQuery, opts) {
    /** @type {any[]} */
    const out = []
    let url = firstPathAndQuery
    let first = true
    for (let page = 0; page < MAX_PAGES && url; page++) {
      const result = await request(url, { etagKey: first ? opts.etagKey : undefined, cursor: opts.cursor })
      if (result.notModified) return []
      if (Array.isArray(result.data)) out.push(.../** @type {Record<string, unknown>[]} */ (result.data))
      url = result.next ?? ''
      first = false
      if (page === MAX_PAGES - 1 && url) {
        log.warn('github.listing_truncated', { label: opts.label, max_pages: MAX_PAGES, per_page: PER_PAGE })
      }
    }
    return out
  }

  return {
    async listOrgRepos(org) {
      const rows = await paginate(`/orgs/${enc(org)}/repos?per_page=${PER_PAGE}`, { label: `orgs/${org}/repos` })
      /** @type {string[]} */
      const out = []
      for (const r of rows) {
        const full = typeof r.full_name === 'string' ? r.full_name : null
        if (full) out.push(full)
      }
      return out
    },

    async listIssues(owner, repo, cursor) {
      const q = sinceQuery(cursor.since?.issues)
      const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/issues?state=all&per_page=${PER_PAGE}${q}`, { label: `${owner}/${repo}/issues`, cursor })
      advanceSince(cursor, 'issues', rows)
      return /** @type {GithubIssue[]} */ (rows)
    },

    async listPullRequests(owner, repo, cursor) {
      // The only listing that is ETag-conditional: unchanged → `[]`.
      const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/pulls?state=all&per_page=${PER_PAGE}`, { label: `${owner}/${repo}/pulls`, etagKey: 'pulls', cursor })
      return /** @type {GithubPull[]} */ (rows)
    },

    async listPullRequestFiles(owner, repo, number) {
      const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/pulls/${number}/files?per_page=${PER_PAGE}`, { label: `${owner}/${repo}/pulls/${number}/files` })
      return filenames(rows)
    },

    async listPullRequestReviews(owner, repo, number) {
      const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/pulls/${number}/reviews?per_page=${PER_PAGE}`, { label: `${owner}/${repo}/pulls/${number}/reviews` })
      return /** @type {GithubReview[]} */ (rows)
    },

    async listPullRequestCommits(owner, repo, number) {
      const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/pulls/${number}/commits?per_page=${PER_PAGE}`, { label: `${owner}/${repo}/pulls/${number}/commits` })
      return /** @type {GithubCommit[]} */ (rows)
    },

    async listCommits(owner, repo, cursor) {
      const q = sinceQuery(cursor.since?.commits)
      const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/commits?per_page=${PER_PAGE}${q}`, { label: `${owner}/${repo}/commits`, cursor })
      advanceSinceCommit(cursor, rows)
      return /** @type {GithubCommit[]} */ (rows)
    },

    async listCommitFiles(owner, repo, sha) {
      // Single-commit endpoint: the changed files ride along on the commit body.
      const result = await request(`/repos/${enc(owner)}/${enc(repo)}/commits/${enc(sha)}`)
      if (result.notModified) return []
      const data = /** @type {Record<string, unknown>} */ (result.data)
      return filenames(Array.isArray(data.files) ? /** @type {Record<string, unknown>[]} */ (data.files) : [])
    },

    async listIssueComments(owner, repo, cursor) {
      const q = sinceQuery(cursor.since?.comments)
      const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/issues/comments?per_page=${PER_PAGE}${q}`, { label: `${owner}/${repo}/issues/comments`, cursor })
      advanceSince(cursor, 'comments', rows)
      return /** @type {GithubComment[]} */ (rows)
    },
  }
}
author.date : null) + if (t && (!max || t > max)) max = t + } + if (max) { + if (!cursor.since) cursor.since = {} + cursor.since.commits = max + } +} + +/** + * @param {Record[]} rows + * @returns {string[]} + */ +function filenames(rows) { + /** @type {string[]} */ + const out = [] + for (const r of rows) { + if (typeof r.filename === 'string') out.push(r.filename) + } + return out +} + +/** + * Parse the `rel="next"` URL from a GitHub `Link` header. + * + * @param {string | null} link + * @returns {string | null} + */ +function parseNextLink(link) { + if (!link) return null + for (const part of link.split(',')) { + const m = /<([^>]+)>\s*;\s*rel="next"/.exec(part) + if (m) return m[1] + } + return null +} + +/** + * The path (no query string) of a URL, for safe error messages. + * + * @param {string} url + * @returns {string} + */ +function pathOf(url) { + try { + return new URL(url).pathname + } catch { + return url.split('?')[0] + } +} + +/** @param {string} segment @returns {string} */ +function enc(segment) { + return encodeURIComponent(segment) +} diff --git a/plugins/github/src/graph_contract.js b/plugins/github/src/graph_contract.js new file mode 100644 index 0000000..9d223da --- /dev/null +++ b/plugins/github/src/graph_contract.js @@ -0,0 +1,355 @@ +// @ts-check + +import { + actorKey, + commitKey, + fileKey, + issueKey, + pullRequestKey, + repoKey, + reviewKey, + str, +} from './keys.js' +import { DATASET_NAME } from './dataset.js' + +/** + * @import { ContractRule, GraphContract, GraphKit } from './types.d.ts' + */ + +/** This plugin's name, stamped on the contract for provenance/dedup. */ +export const PLUGIN_NAME = '@hypaware/github' +/** The dataset the contract reads (this plugin's own skeleton). */ +export const SOURCE_DATASET = DATASET_NAME +/** Projector id stamped into every row's provenance. */ +export const PROJECTOR = 'github.t0' +/** + * Projector version, stamped into provenance to mark which projector + * generation minted a row. 
/**
 * Projector version, stamped into provenance to mark which projector
 * generation minted a row. NOT a re-projection trigger — ids are
 * content-addressed (hypaware LLP 0023 §inline-provenance).
 */
export const PROJECTOR_VERSION = 1

/**
 * Build the `github_events → graph` T0 contract: a hand-authored list of
 * read-only `SELECT` + `toRow` rules realizing the LLP 0003 taxonomy. Rows are
 * built with the graph plugin's `kit`, so the id recipe and provenance columns
 * stay owned by the graph plugin — this plugin owns only the SQL + `toRow`
 * semantics and the natural-key normalization ({@link file:./keys.js}).
 *
 * Four node types (`Repo`, `Actor`, `Commit`, `File`) use **bridge-ready** keys
 * so a node minted by another domain converges automatically (LLP 0003/0004);
 * `Issue`, `PullRequest`, `Review` are GitHub-internal.
 *
 * Every `toRow` returns `null` when a natural key cannot be built from the
 * selected columns, instead of emitting a row with a missing key.
 *
 * @ref LLP 0003#node-types [implements] — node/edge taxonomy + bridge-ready natural keys
 * @ref hypaware LLP 0023#contract-contribution [constrained-by] — central id+provenance kit; this plugin owns only its rules
 *
 * @param {GraphKit} kit
 * @returns {GraphContract}
 */
export function createGithubGraphContract(kit) {
  // Bind the provenance fields once; every rule below builds rows through these.
  const { buildNode, buildEdge } = kit.makeRowBuilders({
    sourceDataset: SOURCE_DATASET,
    projector: PROJECTOR,
    projectorVersion: PROJECTOR_VERSION,
  })

  /** @type {ContractRule[]} */
  const rules = [
    // ---------------------------------------------------------------- nodes

    // Repo per repo column — every event row mentions its repo. ★ bridge-ready.
    {
      kind: 'node',
      type: 'Repo',
      sql: `SELECT repo, created_at FROM ${SOURCE_DATASET}`,
      toRow(r) {
        const key = repoKey(r.repo)
        if (!key) return null
        return buildNode({ type: 'Repo', key, label: key, firstSeen: r.created_at, sourceKeys: { repo: key } })
      },
    },

    // Actor per login, from any row carrying an actor. ★ bridge-ready.
    // Identity-merge across domains (github login vs session user_id) is T1/T2,
    // not here (LLP 0004 §actor-identity-is-enrichment-not-t0).
    {
      kind: 'node',
      type: 'Actor',
      sql: `SELECT actor_login, actor_type, created_at FROM ${SOURCE_DATASET} WHERE actor_login IS NOT NULL`,
      toRow(r) {
        const key = actorKey(r.actor_login)
        if (!key) return null
        return buildNode({
          type: 'Actor',
          key,
          label: key,
          props: pruned({ actor_type: str(r.actor_type) }),
          firstSeen: r.created_at,
          sourceKeys: { actor_login: key },
        })
      },
    },

    // Commit per sha. ★ bridge-ready — sha is globally unique, NOT repo-qualified.
    {
      kind: 'node',
      type: 'Commit',
      sql: `SELECT sha, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit'`,
      toRow(r) {
        const key = commitKey(r.sha)
        if (!key) return null
        return buildNode({ type: 'Commit', key, firstSeen: r.created_at, sourceKeys: { sha: key } })
      },
    },

    // File per owner/repo:relpath, from the file-grained rows. ★ bridge-ready.
    {
      kind: 'node',
      type: 'File',
      sql: `SELECT repo, path, created_at FROM ${SOURCE_DATASET} WHERE path IS NOT NULL`,
      toRow(r) {
        const key = fileKey(r.repo, r.path)
        if (!key) return null
        return buildNode({ type: 'File', key, label: basename(key), firstSeen: r.created_at, sourceKeys: { repo: repoKey(r.repo), path: str(r.path) } })
      },
    },

    // Issue per owner/repo#number. GitHub-internal.
    {
      kind: 'node',
      type: 'Issue',
      sql: `SELECT repo, number, state, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'issue'`,
      toRow(r) {
        const key = issueKey(r.repo, r.number)
        if (!key) return null
        return buildNode({ type: 'Issue', key, props: pruned({ state: str(r.state) }), firstSeen: r.created_at, sourceKeys: { repo: repoKey(r.repo), number: numOrNull(r.number) } })
      },
    },

    // PullRequest per owner/repo#number. GitHub-internal.
    {
      kind: 'node',
      type: 'PullRequest',
      sql: `SELECT repo, number, state, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'pull_request'`,
      toRow(r) {
        const key = pullRequestKey(r.repo, r.number)
        if (!key) return null
        return buildNode({ type: 'PullRequest', key, props: pruned({ state: str(r.state) }), firstSeen: r.created_at, sourceKeys: { repo: repoKey(r.repo), number: numOrNull(r.number) } })
      },
    },

    // Review per `review/<id>` (the key `reviewKey` builds). GitHub-internal.
    {
      kind: 'node',
      type: 'Review',
      sql: `SELECT review_id, review_state, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'review'`,
      toRow(r) {
        const key = reviewKey(r.review_id)
        if (!key) return null
        return buildNode({ type: 'Review', key, props: pruned({ state: str(r.review_state) }), firstSeen: r.created_at, sourceKeys: { review_id: numOrNull(r.review_id) } })
      },
    },

    // ---------------------------------------------------------------- edges

    // Repo membership.
    // (`repoMembership` / `actorTo` are function declarations placed after the
    // `return` below; hoisting makes them callable here.)
    repoMembership('Issue', 'issue', issueKey),
    repoMembership('PullRequest', 'pull_request', pullRequestKey),
    {
      kind: 'edge',
      type: 'in',
      sql: `SELECT repo, sha, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit'`,
      toRow(r) {
        const repo = repoKey(r.repo)
        const sha = commitKey(r.sha)
        if (!repo || !sha) return null
        return buildEdge({ type: 'in', srcType: 'Commit', srcKey: sha, dstType: 'Repo', dstKey: repo, firstSeen: r.created_at, sourceKeys: { sha, repo } })
      },
    },
    {
      kind: 'edge',
      type: 'in',
      sql: `SELECT repo, path, created_at FROM ${SOURCE_DATASET} WHERE path IS NOT NULL`,
      toRow(r) {
        const repo = repoKey(r.repo)
        const file = fileKey(r.repo, r.path)
        if (!repo || !file) return null
        return buildEdge({ type: 'in', srcType: 'File', srcKey: file, dstType: 'Repo', dstKey: repo, firstSeen: r.created_at, sourceKeys: { repo, path: str(r.path) } })
      },
    },

    // Authorship / activity.
    actorTo('opened', 'Issue', 'issue', issueKey),
    actorTo('opened', 'PullRequest', 'pull_request', pullRequestKey),
    {
      kind: 'edge',
      type: 'authored',
      sql: `SELECT actor_login, sha, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit' AND actor_login IS NOT NULL`,
      toRow(r) {
        const actor = actorKey(r.actor_login)
        const sha = commitKey(r.sha)
        if (!actor || !sha) return null
        return buildEdge({ type: 'authored', srcType: 'Actor', srcKey: actor, dstType: 'Commit', dstKey: sha, firstSeen: r.created_at, sourceKeys: { actor_login: actor, sha } })
      },
    },
    actorTo('commented', 'Issue', 'issue_comment', issueKey),
    actorTo('commented', 'PullRequest', 'pull_request_comment', pullRequestKey),

    // Review.
    {
      kind: 'edge',
      type: 'submitted',
      sql: `SELECT actor_login, review_id, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'review' AND actor_login IS NOT NULL`,
      toRow(r) {
        const actor = actorKey(r.actor_login)
        const review = reviewKey(r.review_id)
        if (!actor || !review) return null
        return buildEdge({ type: 'submitted', srcType: 'Actor', srcKey: actor, dstType: 'Review', dstKey: review, firstSeen: r.created_at, sourceKeys: { actor_login: actor, review_id: numOrNull(r.review_id) } })
      },
    },
    {
      kind: 'edge',
      type: 'on',
      sql: `SELECT repo, review_id, pr_number, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'review' AND pr_number IS NOT NULL`,
      toRow(r) {
        const review = reviewKey(r.review_id)
        const pr = pullRequestKey(r.repo, r.pr_number)
        if (!review || !pr) return null
        return buildEdge({ type: 'on', srcType: 'Review', srcKey: review, dstType: 'PullRequest', dstKey: pr, firstSeen: r.created_at, sourceKeys: { review_id: numOrNull(r.review_id), pr_number: numOrNull(r.pr_number) } })
      },
    },

    // Code.
    {
      kind: 'edge',
      type: 'touched',
      sql: `SELECT sha, repo, path, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit_file'`,
      toRow(r) {
        const sha = commitKey(r.sha)
        const file = fileKey(r.repo, r.path)
        if (!sha || !file) return null
        return buildEdge({ type: 'touched', srcType: 'Commit', srcKey: sha, dstType: 'File', dstKey: file, firstSeen: r.created_at, sourceKeys: { sha, repo: repoKey(r.repo), path: str(r.path) } })
      },
    },
    {
      kind: 'edge',
      type: 'touched',
      sql: `SELECT repo, number, path, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'pull_request_file'`,
      toRow(r) {
        const pr = pullRequestKey(r.repo, r.number)
        const file = fileKey(r.repo, r.path)
        if (!pr || !file) return null
        return buildEdge({ type: 'touched', srcType: 'PullRequest', srcKey: pr, dstType: 'File', dstKey: file, firstSeen: r.created_at, sourceKeys: { repo: repoKey(r.repo), number: numOrNull(r.number), path: str(r.path) } })
      },
    },

    // Linkage: a PR references the commits captured under it.
    {
      kind: 'edge',
      type: 'references',
      sql: `SELECT repo, sha, pr_number, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit' AND pr_number IS NOT NULL`,
      toRow(r) {
        const pr = pullRequestKey(r.repo, r.pr_number)
        const sha = commitKey(r.sha)
        if (!pr || !sha) return null
        return buildEdge({ type: 'references', srcType: 'PullRequest', srcKey: pr, dstType: 'Commit', dstKey: sha, firstSeen: r.created_at, sourceKeys: { pr_number: numOrNull(r.pr_number), sha } })
      },
    },
  ]

  return {
    name: 'github-t0',
    plugin: PLUGIN_NAME,
    sourceDataset: SOURCE_DATASET,
    projector: PROJECTOR,
    projectorVersion: PROJECTOR_VERSION,
    rules,
  }

  /**
   * `<Subject> -in-> Repo` for an event-discriminated subject keyed on number.
   * `eventType` is interpolated into the SQL text as-is; every call site in
   * this function passes a literal.
   *
   * @param {string} subjectType node type of the edge source
   * @param {string} eventType `event_type` value selecting the rows
   * @param {(repo: unknown, number: unknown) => string | null} subjectKey
   * @returns {ContractRule}
   */
  function repoMembership(subjectType, eventType, subjectKey) {
    return {
      kind: 'edge',
      type: 'in',
      sql: `SELECT repo, number, created_at FROM ${SOURCE_DATASET} WHERE event_type = '${eventType}'`,
      toRow(r) {
        const repo = repoKey(r.repo)
        const subject = subjectKey(r.repo, r.number)
        if (!repo || !subject) return null
        return buildEdge({ type: 'in', srcType: subjectType, srcKey: subject, dstType: 'Repo', dstKey: repo, firstSeen: r.created_at, sourceKeys: { repo, number: numOrNull(r.number) } })
      },
    }
  }

  /**
   * `Actor -<rel>-> <Subject>` for an event-discriminated subject keyed on
   * number. `eventType` is interpolated into the SQL text as-is; every call
   * site in this function passes a literal.
   *
   * @param {string} rel edge type
   * @param {string} subjectType node type of the edge destination
   * @param {string} eventType `event_type` value selecting the rows
   * @param {(repo: unknown, number: unknown) => string | null} subjectKey
   * @returns {ContractRule}
   */
  function actorTo(rel, subjectType, eventType, subjectKey) {
    return {
      kind: 'edge',
      type: rel,
      sql: `SELECT repo, number, actor_login, created_at FROM ${SOURCE_DATASET} WHERE event_type = '${eventType}' AND actor_login IS NOT NULL`,
      toRow(r) {
        const actor = actorKey(r.actor_login)
        const subject = subjectKey(r.repo, r.number)
        if (!actor || !subject) return null
        return buildEdge({ type: rel, srcType: 'Actor', srcKey: actor, dstType: subjectType, dstKey: subject, firstSeen: r.created_at, sourceKeys: { actor_login: actor, repo: repoKey(r.repo), number: numOrNull(r.number) } })
      },
    }
  }
}

/**
 * Basename of a `File` natural key (`owner/repo:relpath`) — the label only.
 * Everything up to the first `:` is dropped, then everything up to the last
 * `/` of what remains.
 *
 * @param {string} key
 * @returns {string}
 */
function basename(key) {
  const colon = key.indexOf(':')
  const relpath = colon >= 0 ? key.slice(colon + 1) : key
  const slash = relpath.lastIndexOf('/')
  return slash >= 0 ? relpath.slice(slash + 1) : relpath
}

/**
 * Coerce a GitHub number to a JS number for `source_keys` provenance, or null.
 * Accepts finite numbers, bigints and all-digit strings; anything else is null.
 *
 * @param {unknown} value
 * @returns {number | null}
 */
function numOrNull(value) {
  if (typeof value === 'number' && Number.isFinite(value)) return value
  if (typeof value === 'bigint') return Number(value)
  if (typeof value === 'string' && /^[0-9]+$/.test(value)) return Number(value)
  return null
}

/**
 * Drop null/undefined entries so identical inputs build identical props
 * (keys sorted for stable JSON), matching the ai-gateway contract.
 *
 * @param {Record<string, unknown>} obj
 * @returns {Record<string, unknown>}
 */
function pruned(obj) {
  /** @type {Record<string, unknown>} */
  const out = {}
  for (const key of Object.keys(obj).sort()) {
    // `!= null` filters both null and undefined.
    if (obj[key] != null) out[key] = obj[key]
  }
  return out
}
/**
 * Plugin entry point for `@hypaware/github`. In order, it:
 *
 *  1. registers the `github` config section and validates `ctx.config`
 *     (throwing a `github_config_invalid` error when validation fails);
 *  2. resolves the `hypaware.context-graph` capability;
 *  3. registers the `github_events` dataset;
 *  4. stores the runtime singleton for the source and the commands;
 *  5. registers the bundled T0 contract with the graph;
 *  6. registers the `github` poll source and the three `github` commands.
 *
 * The graph capability is resolved **eagerly** (its `kit` + `registerContract`
 * are needed now) and the plugin dependency on `@hypaware/context-graph`
 * guarantees it activated first: the resolver orders by `requires.plugins`, not
 * `requires.capabilities` (LLP 0001 §both-dependency-kinds; hypaware LLP 0006).
 *
 * @ref LLP 0001#decision [implements] — bundle the contract; require context-graph as plugin + capability; register in activate()
 *
 * @param {PluginActivationContext} ctx
 */
export async function activate(ctx) {
  ctx.configRegistry.registerSection({
    plugin: PLUGIN_NAME,
    section: 'github',
    validate(value) {
      return toValidationResult(validateGithubConfig(value))
    },
  })

  const outcome = validateGithubConfig(ctx.config)
  if (!outcome.ok) {
    /** @type {string[]} */
    const problems = []
    for (const issue of outcome.errors) {
      // An empty pointer means the document root; show it as '/'.
      problems.push(`${issue.pointer || '/'}: ${issue.message}`)
    }
    const failure = /** @type {HypError} */ (new Error(`${PLUGIN_NAME}: invalid config — ${problems.join('; ')}`))
    failure.hypErrorKind = 'github_config_invalid'
    throw failure
  }
  const { config } = outcome

  const graph = /** @type {ContextGraphCapabilityLike} */ (
    ctx.requireCapability('hypaware.context-graph', '^1.0.0')
  )

  ctx.query.registerDataset(githubEventsDatasetRegistration())

  // The runtime must exist before the source/commands can be started.
  setGithubRuntime({
    ctx,
    config,
    log: ctx.log,
    stateDir: ctx.paths.stateDir,
    storage: /** @type {ExtendedQueryStorageService} */ (ctx.storage),
    env: ctx.env,
  })

  const contract = createGithubGraphContract(graph.kit)
  graph.registerContract(contract)

  ctx.sources.register({
    name: 'github',
    plugin: PLUGIN_NAME,
    summary: 'Ongoing poll (opt-in via poll_interval): append events past each repo cursor',
    configSection: 'github',
    start: startGithubSource,
  })

  const commandSpecs = [
    { name: 'github', plugin: PLUGIN_NAME, summary: 'GitHub capture', usage: 'hyp github ', run: runGithub },
    { name: 'github backfill', plugin: PLUGIN_NAME, summary: 'Pull full history into github_events', usage: 'hyp github backfill [owner/repo ...]', run: runGithubBackfill },
    { name: 'github sync', plugin: PLUGIN_NAME, summary: 'Run one poll tick now', usage: 'hyp github sync', run: runGithubSync },
  ]
  for (const spec of commandSpecs) {
    ctx.commands.register(spec)
  }

  ctx.log.info('github.activated', {
    repos: config.repos.length,
    orgs: config.orgs.length,
    ignore: config.ignore.length,
    poll: config.poll_interval ?? null,
  })
}

/**
 * Convert the plugin's own config-validation result into the kernel's
 * `ValidationResult`, keeping only `pointer` and `message` of each error.
 *
 * @param {ReturnType<typeof validateGithubConfig>} result
 * @returns {ValidationResult}
 */
function toValidationResult(result) {
  if (!result.ok) {
    const errors = result.errors.map(({ pointer, message }) => ({ pointer, message }))
    return { ok: false, errors }
  }
  return { ok: true }
}
/**
 * Lowercase a GitHub identifier (`owner`, `repo`, `login`). GitHub looks these
 * up case-insensitively but preserves display case; lowercasing guarantees
 * convergence and kills case-drift. Display case, if needed, rides in `props`.
 *
 * @param {unknown} value
 * @returns {string | null} null when `value` is not a non-empty string/number/bigint
 */
export function normalizeLogin(value) {
  const s = str(value)
  return s ? s.toLowerCase() : null
}

/**
 * Repo-relative POSIX path: forward slashes, no leading slash, no `./`. The
 * local-absolute → repo-relative reconciliation is the *other* side's job
 * (LLP 0004); here we only canonicalize a path GitHub already returns
 * repo-relative.
 *
 * Leading `./` and `/` prefixes are stripped in ANY interleaving (`/./a`,
 * `.//a`, `././/a` all give `a`). Stripping `./` first and `/` second, one pass
 * each, would leave `./a` behind for `/./a` and mint a second `File` key for
 * the same path. `../x` and dot-files (`.env`) are untouched.
 *
 * @param {unknown} value
 * @returns {string | null} null when nothing is left after stripping
 */
export function normalizeRelpath(value) {
  const raw = str(value)
  if (!raw) return null
  // `\.?\/` matches one `./` or one `/`; `+` repeats until neither leads.
  const s = raw.replace(/\\/g, '/').replace(/^(?:\.?\/)+/, '')
  return s.length > 0 ? s : null
}

/**
 * `Repo` key — `owner/repo`, lowercased. Accepts either `(owner, repo)` or a
 * single `owner/repo` string. github.com is implied in V1 (no host segment —
 * see LLP 0003 §multi-host).
 *
 * In the single-string form the split is at the FIRST `/`; a string with no
 * `/`, a leading `/` or a trailing `/` yields null.
 *
 * @param {unknown} ownerOrFull
 * @param {unknown} [repo]
 * @returns {string | null}
 */
export function repoKey(ownerOrFull, repo) {
  if (repo === undefined) {
    const full = str(ownerOrFull)
    if (!full) return null
    const slash = full.indexOf('/')
    if (slash <= 0 || slash === full.length - 1) return null
    return `${full.slice(0, slash).toLowerCase()}/${full.slice(slash + 1).toLowerCase()}`
  }
  const o = normalizeLogin(ownerOrFull)
  const r = normalizeLogin(repo)
  if (!o || !r) return null
  return `${o}/${r}`
}

/**
 * `Actor` key — `login`, lowercased.
 *
 * @param {unknown} login
 * @returns {string | null}
 */
export function actorKey(login) {
  return normalizeLogin(login)
}

/**
 * `Commit` key — full 40-hex `sha`, lowercased; never abbreviated. The sha is
 * globally unique across git, so it is NOT repo-qualified — that is what makes
 * it the cleanest bridge key and lets it converge across forks (LLP 0003).
 *
 * NOTE(review): the length/hex shape is not validated here — any non-empty
 * value is lowercased and returned; callers are trusted to pass a full sha.
 *
 * @param {unknown} sha
 * @returns {string | null}
 */
export function commitKey(sha) {
  const s = str(sha)
  return s ? s.toLowerCase() : null
}

/**
 * `File` key — `owner/repo:relpath`. The repo half is normalized via
 * {@link repoKey}, the path via {@link normalizeRelpath}. A rename is a new
 * `File` (T0 keys path, not content).
 *
 * @param {unknown} repoFull `owner/repo` (any case)
 * @param {unknown} relpath
 * @returns {string | null}
 */
export function fileKey(repoFull, relpath) {
  const rk = repoKey(repoFull)
  const rp = normalizeRelpath(relpath)
  if (!rk || !rp) return null
  return `${rk}:${rp}`
}

/**
 * `Issue` key — `owner/repo#number`. GitHub shares one number space across
 * issues+PRs per repo, so a number is *either* an issue or a PR (no collision);
 * the node `type` separates them in the hash regardless (LLP 0003).
 *
 * @param {unknown} repoFull
 * @param {unknown} number
 * @returns {string | null}
 */
export function issueKey(repoFull, number) {
  return numberedKey(repoFull, number)
}

/**
 * `PullRequest` key — `owner/repo#number` (see {@link issueKey} on the shared
 * number space).
 *
 * @param {unknown} repoFull
 * @param {unknown} number
 * @returns {string | null}
 */
export function pullRequestKey(repoFull, number) {
  return numberedKey(repoFull, number)
}

/**
 * `Review` key — `review/<id>`. The GitHub review id is globally unique.
 *
 * @param {unknown} reviewId
 * @returns {string | null}
 */
export function reviewKey(reviewId) {
  const n = intStr(reviewId)
  return n ? `review/${n}` : null
}

/**
 * Shared `owner/repo#number` recipe behind {@link issueKey} and
 * {@link pullRequestKey}.
 *
 * @param {unknown} repoFull
 * @param {unknown} number
 * @returns {string | null}
 */
function numberedKey(repoFull, number) {
  const rk = repoKey(repoFull)
  const n = intStr(number)
  if (!rk || !n) return null
  return `${rk}#${n}`
}

/**
 * Coerce a value to a non-empty string, or null. Numbers/bigints stringify.
 *
 * @param {unknown} value
 * @returns {string | null}
 */
export function str(value) {
  if (typeof value === 'string') return value.length > 0 ? value : null
  if (typeof value === 'number' || typeof value === 'bigint') return String(value)
  return null
}

/**
 * Coerce a GitHub integer id/number (which may arrive as a number, bigint, or
 * numeric string) to its canonical decimal string, rejecting non-integers.
 *
 * All-digit strings are round-tripped through `BigInt` so leading zeros are
 * dropped: `"007"` and `7` must produce the same key, because the key is the
 * node identity.
 *
 * @param {unknown} value
 * @returns {string | null}
 */
function intStr(value) {
  if (typeof value === 'number') return Number.isInteger(value) ? String(value) : null
  if (typeof value === 'bigint') return String(value)
  if (typeof value === 'string' && /^[0-9]+$/.test(value)) return String(BigInt(value))
  return null
}
/**
 * Handles captured at activation and shared with the poll source and the
 * commands.
 *
 * @typedef {object} GithubRuntime
 * @property {PluginActivationContext} ctx
 * @property {GithubConfig} config
 * @property {PluginLogger} log
 * @property {string} stateDir
 * @property {ExtendedQueryStorageService} storage
 * @property {NodeJS.ProcessEnv} env
 * @property {(() => GithubClient) | undefined} [clientFactory] test seam; defaults to the real fetch client
 */

/**
 * The single runtime instance; null until `setGithubRuntime` is called.
 *
 * @type {GithubRuntime | null}
 */
let activeRuntime = null

/**
 * Install (or replace) the runtime singleton.
 *
 * @param {GithubRuntime} value
 */
export function setGithubRuntime(value) {
  activeRuntime = value
}

/**
 * Return the runtime singleton.
 *
 * @returns {GithubRuntime}
 * @throws {Error} when no runtime has been installed yet
 */
export function requireGithubRuntime() {
  if (activeRuntime) return activeRuntime
  throw new Error('@hypaware/github: not activated yet — runtime singleton is empty')
}

/**
 * Build a GitHub client for the current config. The token is read from the
 * configured env var inside the client at request time, so a client built here
 * never closes over the secret. A `clientFactory` on the runtime, when present,
 * takes precedence over the real client.
 *
 * @param {GithubRuntime} rt
 * @returns {GithubClient}
 */
export function getClient(rt) {
  if (!rt.clientFactory) {
    const { token_env: tokenEnv } = rt.config
    return createGithubClient({ tokenEnv, env: rt.env, log: rt.log })
  }
  return rt.clientFactory()
}
/**
 * `SourceContribution.start` callback for the `github` source. Polling is
 * **opt-in**: a timer is armed only when `poll_interval` is configured and
 * parses; otherwise the source idles and capture is driven purely by
 * `backfill` / `sync` (LLP 0002 §ongoing-poll-is-opt-in — capturing from an
 * external API with a credential is a deliberate act that must not start
 * itself).
 *
 * A timer fire is skipped while the previous tick is still running, so ticks
 * never overlap.
 *
 * @ref LLP 0002#ongoing-poll-is-opt-in [implements] — poll only when poll_interval is set
 *
 * @import { SourceStatus, StartedSource } from './types.d.ts'
 * @returns {Promise<StartedSource>}
 */
export async function startGithubSource() {
  const rt = requireGithubRuntime()

  /** @type {ReturnType<typeof setInterval> | null} */
  let timer = null
  /** @type {Promise<void> | null} */
  let pending = null
  /** @type {string | null} */
  let lastTickAt = null
  /** @type {string | undefined} */
  let lastError
  let rowsWritten = 0

  /** Run one poll tick and fold its outcome into the status counters. Never rejects. */
  const runOnce = async () => {
    lastTickAt = new Date().toISOString()
    try {
      const outcome = await runCaptureTick(rt, { mode: 'poll' })
      rowsWritten = rowsWritten + outcome.events
      // Only the first per-repo error is surfaced; a clean tick clears it.
      lastError = outcome.errors[0]?.error
    } catch (err) {
      lastError = err instanceof Error ? err.message : String(err)
      rt.log.error('github.poll_tick_failed', { error: lastError })
    }
  }

  /** Arm the interval timer, if and only if a usable `poll_interval` is configured. */
  const arm = () => {
    const every = rt.config.poll_interval
    const periodMs = every ? parseInterval(every) : null
    if (periodMs === null) return
    timer = setInterval(() => {
      if (pending !== null) return
      pending = runOnce().finally(() => {
        pending = null
      })
    }, Math.max(1, periodMs))
    // Do not let the poll timer keep the process alive.
    if (typeof timer.unref === 'function') timer.unref()
  }

  /** Cancel the interval timer; an in-flight tick is left to finish. */
  const disarm = () => {
    if (timer) clearInterval(timer)
    timer = null
  }

  arm()

  return {
    async status() {
      const { config } = rt
      let message
      if (config.poll_interval) {
        message = `polling ${config.repos.length + config.orgs.length} selection(s) every ${config.poll_interval}`
      } else {
        message = 'idle (no poll_interval; use `hyp github backfill` / `hyp github sync`)'
      }
      /** @type {SourceStatus} */
      const report = {
        state: timer === null ? 'stopped' : 'ready',
        message,
        details: { last_tick_at: lastTickAt },
        rowsWritten,
      }
      if (lastError) report.lastError = lastError
      return report
    },
    async reload() {
      disarm()
      arm()
    },
    async stop() {
      disarm()
      if (pending) await pending.catch(() => {})
    },
  }
}
/**
 * Run a single capture tick.
 *
 * Loads the per-repo cursors, captures every selected repo (rows are appended
 * to the `github_events` table through `runtime.storage`), then writes the
 * cursors back. The daemon poll source and the `sync`/`backfill` commands share
 * this; they differ only in `mode` and the optional `only` filter.
 *
 * The cursor write sits in a `finally`, so it runs whether `captureRepos`
 * resolves or throws.
 *
 * @import { GithubRuntime } from './runtime.js'
 *
 * @param {GithubRuntime} runtime
 * @param {{ mode: 'backfill' | 'poll', only?: string[] }} opts
 * @returns {Promise<{ repos: number, events: number, errors: Array<{ repo: string, error: string }> }>}
 */
export async function runCaptureTick(runtime, opts) {
  const cursorState = readCursors(runtime.stateDir)
  const githubClient = getClient(runtime)
  const eventsTable = githubEventsTablePath(runtime.storage)
  const eventColumns = Array.from(GITHUB_EVENTS_COLUMNS)

  /**
   * Append captured rows to the events table; an empty batch is a no-op.
   * @param {Record<string, unknown>[]} rows
   */
  const appendRows = async (rows) => {
    if (rows.length > 0) {
      await runtime.storage.appendRows(eventsTable, eventColumns, rows)
    }
  }

  try {
    const report = await captureRepos({
      client: githubClient,
      config: runtime.config,
      cursors: cursorState,
      append: appendRows,
      log: runtime.log,
      mode: opts.mode,
      only: opts.only,
    })
    return report
  } finally {
    // Persist whatever the capture pass left in the cursor object.
    writeCursors(runtime.stateDir, cursorState)
  }
}
Like +// `@hypaware/ai-gateway-graph`, this plugin RE-DECLARES the +// `hypaware.context-graph` capability shape rather than importing the +// provider's internals: a contract author consumes only the kit's public +// surface, so the id recipe + provenance columns stay owned by the graph +// plugin (hypaware LLP 0023 §contract-contribution). + +import type { + AsyncDataSource, + ColumnSpec, + CommandRunContext, + DatasetDataSourceContext, + DatasetDiscoveryContext, + DatasetRefreshResult, + DatasetRegistration, + HypError, + JsonObject, + PluginActivationContext, + PluginLogger, + QueryPartition, + QueryStorageService, + SourceStatus, + StartedSource, + ValidationError, + ValidationResult, +} from '../../../../hypaware/collectivus-plugin-kernel-types.d.ts' + +export type { + AsyncDataSource, + ColumnSpec, + CommandRunContext, + DatasetDataSourceContext, + DatasetDiscoveryContext, + DatasetRefreshResult, + DatasetRegistration, + HypError, + JsonObject, + PluginActivationContext, + PluginLogger, + QueryPartition, + QueryStorageService, + SourceStatus, + StartedSource, + ValidationError, + ValidationResult, +} + +// --------------------------------------------------------------------------- +// Storage: the extended cache surface the kernel hands plugins. `AsyncDataSource` +// is the kernel's own re-export of the `squirreling` type (so a dataset's +// `createDataSource` return matches `DatasetRegistration`); only the +// `dataSourceForTable` extension is added here. +// --------------------------------------------------------------------------- + +/** `QueryStorageService` plus the `dataSourceForTable` extension used by datasets. */ +export type ExtendedQueryStorageService = QueryStorageService & { + dataSourceForTable(tablePath: string): Promise +} + +// --------------------------------------------------------------------------- +// `hypaware.context-graph` capability shape (re-declared, not imported). 
+// --------------------------------------------------------------------------- + +export type GraphRow = Record + +/** One T0 contract rule: a read-only SELECT plus a row mapper. */ +export interface ContractRule { + kind: 'node' | 'edge' + type: string + sql: string + toRow(row: Record): GraphRow | null +} + +/** Per-node spec a `toRow` hands `buildNode` (identity + optional display + provenance keys). */ +export interface NodeSpec { + type: string + key: string + label?: string | null + props?: Record + firstSeen: unknown + sourceKeys: Record +} + +/** Per-edge spec a `toRow` hands `buildEdge` (endpoints + relation type + provenance keys). */ +export interface EdgeSpec { + type: string + srcType: string + srcKey: string + dstType: string + dstKey: string + firstSeen: unknown + sourceKeys: Record +} + +/** The row builders the kit hands a contract author (id recipe + provenance live in the graph plugin). */ +export interface GraphRowBuilders { + buildNode(spec: NodeSpec): GraphRow + buildEdge(spec: EdgeSpec): GraphRow +} + +/** The shared authoring kit exposed on the `hypaware.context-graph` capability. */ +export interface GraphKit { + nodeId(type: string, naturalKey: string): string + edgeId(srcId: string, type: string, dstId: string): string + makeRowBuilders(meta: { + sourceDataset: string + projector: string + projectorVersion: number + }): GraphRowBuilders +} + +/** The `hypaware.context-graph` capability value, as this plugin consumes it. */ +export interface ContextGraphCapabilityLike { + registerContract(contract: unknown): void + kit: GraphKit +} + +/** The contract object `registerContract` accepts. */ +export interface GraphContract { + name: string + plugin: string + sourceDataset: string + projector: string + projectorVersion: number + rules: ContractRule[] +} + +// --------------------------------------------------------------------------- +// Plugin-local config. 
+// --------------------------------------------------------------------------- + +export interface GithubConfig { + repos: string[] + orgs: string[] + ignore: string[] + poll_interval?: string + token_env: string +} + +export interface GithubConfigError { + pointer: string + message: string + errorKind: 'github_config_invalid' +} + +export type GithubConfigResult = + | { ok: true; config: GithubConfig } + | { ok: false; errors: GithubConfigError[] } + +// --------------------------------------------------------------------------- +// Per-repo capture cursor (sidecar state — see LLP 0002 §cursoring). +// --------------------------------------------------------------------------- + +/** + * One repo's poll cursor: `since` high-water marks for the time-windowed + * endpoints (issues/commits/comments) and an ETag map for the listing + * endpoints that lack `since`. Shared by backfill + poll so a backfilled repo's + * poller resumes past history rather than re-fetching it. + */ +export interface RepoCursor { + since?: { issues?: string; commits?: string; comments?: string; pulls?: string } + etag?: Record +} + +export interface CursorState { + schema_version: number + repos: Record +} + +// --------------------------------------------------------------------------- +// GitHub API client surface (capture.js depends on this, not on `fetch`, so +// tests inject a fake). +// --------------------------------------------------------------------------- + +/** A normalized GitHub actor (the `user`/`author` object subset we keep). */ +export interface GithubActor { + login: string + type?: string +} + +export interface GithubClient { + /** Enumerate the repos (`owner/repo`) the token can see in an org. */ + listOrgRepos(org: string): Promise + /** Issues (state=all). PRs are filtered out by the caller via `isPullRequest`. 
*/ + listIssues(owner: string, repo: string, cursor: RepoCursor): Promise + listPullRequests(owner: string, repo: string, cursor: RepoCursor): Promise + listPullRequestFiles(owner: string, repo: string, number: number): Promise + listPullRequestReviews(owner: string, repo: string, number: number): Promise + listPullRequestCommits(owner: string, repo: string, number: number): Promise + listCommits(owner: string, repo: string, cursor: RepoCursor): Promise + listCommitFiles(owner: string, repo: string, sha: string): Promise + listIssueComments(owner: string, repo: string, cursor: RepoCursor): Promise +} + +export interface GithubIssue { + number: number + state?: string + created_at?: string + user?: GithubActor | null + pull_request?: unknown +} + +export interface GithubPull { + number: number + state?: string + merged_at?: string | null + draft?: boolean + created_at?: string + updated_at?: string + user?: GithubActor | null +} + +export interface GithubReview { + id: number + state?: string + submitted_at?: string + user?: GithubActor | null +} + +export interface GithubCommit { + sha: string + author?: GithubActor | null + commit?: { author?: { date?: string } | null } | null +} + +export interface GithubComment { + id: number + created_at?: string + user?: GithubActor | null + /** `.../issues/{number}` — the subject the comment is attached to. */ + issue_url?: string +} diff --git a/src/boot.js b/src/boot.js index 1cfa5db..98ca39e 100644 --- a/src/boot.js +++ b/src/boot.js @@ -48,11 +48,43 @@ function pluginSelection(config) { }, } : { dir: path.join(bundledWorkspaceDir, 'local-fs'), config: { exports_dir: config.archive.dir } }, + // Graph plugins: activated for their dataset / contract / command + // registrations so the server can hold a github_events skeleton and + // run the T0 projection + neighbors over its own cache, where the + // forwarded LLM logs also live. 
context-graph must precede github + // (github requires it); resolveDependencies derives that ordering from + // github's requires.plugins regardless of position in this array. The + // github poll source stays dormant — github registers but never starts + // it, and the wrapped registry below would suppress it anyway. + // @ref LLP 0010#in-kernel-graph [implements] — context-graph + github load into the server kernel; capture is an admin one-shot, not a daemon source + { dir: path.join(bundledWorkspaceDir, 'context-graph'), config: {} }, + { dir: path.join(SERVER_PLUGINS_DIR, 'github'), config: githubSection(config) }, { dir: path.join(SERVER_PLUGINS_DIR, 'control-plane'), config: {} }, ] return { plugins, destination } } +/** + * The `[github]` config section injected into the vendored github source + * plugin. Repo selection comes from server env (HYPSERVER_GITHUB_*); the + * token value never lands in config — the github client reads it from the + * box env by the name in `token_env` at request time. No `poll_interval`: + * capture runs only as the admin-triggered backfill one-shot, never a poll. 
/**
 * Build the `[github]` config section handed to the vendored github source
 * plugin from the server's own `config.github` selection.
 *
 * Only the selection lists and the *name* of the token env var are passed
 * through (`tokenEnv` becomes the plugin's `token_env`); no `poll_interval`
 * key is emitted.
 *
 * @ref LLP 0010#server-pulls-github [implements] — server pulls GitHub directly; repo selection from env, token stays in box env
 * @param {ServerConfig} config
 * @returns {Record<string, unknown>}
 */
function githubSection(config) {
  const { orgs, repos, ignore, tokenEnv } = config.github
  return { orgs, repos, ignore, token_env: tokenEnv }
}
+ * + * @ref LLP 0010#self-managed-datasets [implements] — graph datasets keep their own read path; the date-partition synthesis is only for forwarded ingest + */ +const SELF_MANAGED_PLUGINS = new Set(['@hypaware/server', '@hypaware/context-graph', '@hypaware/github']) + /** * Catalog-backed implementation of the kernel's three-method * QueryRegistry interface, injected via @@ -130,9 +143,11 @@ export function createCatalogRegistry({ catalog, getArchive }) { return { registerDataset(dataset) { - if (dataset.plugin === '@hypaware/server' && typeof dataset.createDataSource === 'function' && !catalog.get(dataset.name)) { - // Server-origin datasets with bespoke read paths (the gateway - // registry) keep their own closures and skip the catalog. + if (SELF_MANAGED_PLUGINS.has(dataset.plugin) && typeof dataset.createDataSource === 'function' && !catalog.get(dataset.name)) { + // Server-origin and graph datasets with bespoke read paths (the + // gateway registry; github_events; node/edge) keep their own + // closures and skip the catalog — their cache layout is source=/ + // graph_v1 partitioned, not the date= layout the catalog synthesizes. custom.set(dataset.name, dataset) return } diff --git a/src/config.js b/src/config.js index 2328ebc..dcff5da 100644 --- a/src/config.js +++ b/src/config.js @@ -73,9 +73,31 @@ export function loadServerConfig(env = process.env) { archiveSchedule: env.HYPSERVER_ARCHIVE_SCHEDULE ?? '0 * * * *', archiveCloseLagMs: intEnv(env.HYPSERVER_ARCHIVE_CLOSE_LAG_MS, 3_600_000), maxBodyBytes: intEnv(env.HYPSERVER_MAX_BODY_BYTES, 64 * 1024 * 1024), + // @ref LLP 0010#server-pulls-github [implements] — server-side GitHub capture selection; the token value stays in the box env (read by name at request time), never in config + github: { + orgs: listEnv(env.HYPSERVER_GITHUB_ORGS), + repos: listEnv(env.HYPSERVER_GITHUB_REPOS), + ignore: listEnv(env.HYPSERVER_GITHUB_IGNORE), + tokenEnv: env.HYPSERVER_GITHUB_TOKEN_ENV ?? 
/**
 * Split a comma- and/or whitespace-separated env value into a list of
 * non-empty entries. Absent or blank yields `[]`.
 *
 * Any run of commas and whitespace (spaces, tabs, newlines) counts as one
 * separator, so `"a/b, c/d"`, `"a/b c/d"` and `"a/b,,c/d"` all produce
 * `['a/b', 'c/d']`.
 *
 * @param {string | undefined} raw
 * @returns {string[]}
 */
function listEnv(raw) {
  if (!raw) return []
  // Leading/trailing separators leave empty strings at the ends; drop them.
  return raw.split(/[\s,]+/).filter((entry) => entry.length > 0)
}
github capture is a + // one-shot (the poll source stays dormant); projection and neighbors run + // the graph plugin's own pure functions over the same kernel query + + // storage handles executeSql uses, so they read the forwarded LLM logs + // and github_events in one cache. + // @ref LLP 0010#in-kernel-graph [implements] — capture one-shot + projection/neighbors over the server kernel, not the SQL-only attach + services.githubBackfill = async (opts = {}) => { + const report = await runCaptureTick(requireGithubRuntime(), { mode: 'backfill', only: opts.repos }) + // appendRows buffers in the cache writer; flush so the freshly captured + // github_events are visible to graph project / admin SQL on this same + // kernel — the mover flushes its spooled writes the same way. + await storage.flushTable(githubEventsTablePath(storage), { reason: 'server_github_backfill', force: true }) + return report + } + services.graphProject = async (opts = {}) => { + const all = requireGraphRuntime().registry.list() + const contracts = opts.source ? 
all.filter((c) => c.sourceDataset === opts.source) : all + return projectGraph({ + query: runtime.query, + storage: runtime.storage, + contracts, + config: /** @type {any} */ (hypConfig), + dryRun: opts.dryRun === true, + }) + } + services.graphNeighbors = async (args) => { + return queryNeighbors({ + query: runtime.query, + storage: runtime.storage, + config: /** @type {any} */ (hypConfig), + seed: args.node, + depth: args.depth, + type: args.type, + edgeTypes: args.edgeTypes, + direction: args.direction, + limit: args.limit, + }) + } + // ----- Loops ----- /** @type {NodeJS.Timeout[]} */ const timers = [] diff --git a/src/http/routes-admin.js b/src/http/routes-admin.js index 18494c3..a055a74 100644 --- a/src/http/routes-admin.js +++ b/src/http/routes-admin.js @@ -107,9 +107,88 @@ export async function handleAdmin(ctx) { return sendJson(res, 200, report) } + // Server-side graph operations: pull GitHub into github_events, run the T0 + // projection, and walk neighbors — the graph analogue of the mover/archive + // escape hatches, executed on the server's own kernel rather than the + // SQL-only attach surface. + // @ref LLP 0010#admin-operations [implements] — graph capture/project/neighbors are admin operations on the server kernel, the precedent set by LLP 0006 + if (url.pathname === '/v1/admin/github/backfill' && req.method === 'POST') { + const body = await parseJson(req, services.config.maxBodyBytes) ?? {} + const repos = normalizeRepoList(body.repos) + if (repos === null) { + return sendJson(res, 400, { error: 'invalid_repos', detail: 'repos must be an array of "owner/repo" strings' }) + } + try { + const report = await services.githubBackfill(repos.length > 0 ? 
{ repos } : {}) + return sendJson(res, 200, report) + } catch (err) { + return sendJson(res, 400, { error: 'github_backfill_failed', detail: errText(err) }) + } + } + if (url.pathname === '/v1/admin/graph/project' && req.method === 'POST') { + const body = await parseJson(req, services.config.maxBodyBytes) ?? {} + try { + const report = await services.graphProject({ + source: typeof body.source === 'string' ? body.source : undefined, + dryRun: body.dry_run === true, + }) + return sendJson(res, 200, report) + } catch (err) { + return sendJson(res, 400, { error: 'graph_project_failed', detail: errText(err) }) + } + } + if (url.pathname === '/v1/admin/graph/neighbors' && req.method === 'POST') { + const body = await parseJson(req, services.config.maxBodyBytes) ?? {} + if (typeof body.node !== 'string' || body.node.length === 0) { + return sendJson(res, 400, { error: 'node_required' }) + } + /** @type {'out' | 'in' | 'both' | undefined} */ + const direction = body.direction === 'in' || body.direction === 'both' || body.direction === 'out' ? body.direction : undefined + try { + const result = await services.graphNeighbors({ + node: body.node, + depth: typeof body.depth === 'number' ? body.depth : undefined, + type: typeof body.type === 'string' ? body.type : undefined, + edgeTypes: Array.isArray(body.edge_types) ? body.edge_types.map(String) : undefined, + direction, + limit: typeof body.limit === 'number' ? body.limit : undefined, + }) + // A failed traversal (seed not found / ambiguous) is a 400 with the + // candidate detail the query returns, mirroring handleQuery. + return sendJson(res, result && result.ok === false ? 400 : 200, result) + } catch (err) { + return sendJson(res, 400, { error: 'graph_neighbors_failed', detail: errText(err) }) + } + } + return sendJson(res, 404, { error: 'unknown_path' }) } +/** + * Validate an optional `repos` body field for github backfill: absent → `[]` + * (whole configured selection), otherwise an array of `owner/repo` strings. 
/**
 * Check the optional `repos` field of a github-backfill request body.
 *
 * `undefined`/`null` mean "no narrowing" and give `[]`. Anything else must be
 * an array whose every entry is an `owner/repo` string (exactly one `/`, no
 * whitespace, both halves non-empty); the accepted entries come back in a
 * fresh array. Any other shape, or a single bad entry, gives `null` so the
 * caller can answer 400.
 *
 * @param {unknown} raw
 * @returns {string[] | null}
 */
function normalizeRepoList(raw) {
  if (raw == null) return []
  if (!Array.isArray(raw)) return null
  const slugPattern = /^[^/\s]+\/[^/\s]+$/
  /** @param {unknown} entry */
  const isRepoSlug = (entry) => typeof entry === 'string' && slugPattern.test(entry)
  const entries = Array.from(raw)
  return entries.every(isRepoSlug) ? entries : null
}

/**
 * Render a caught value as text: an `Error`'s message, otherwise `String(err)`.
 *
 * @param {unknown} err
 * @returns {string}
 */
function errText(err) {
  if (err instanceof Error) {
    return err.message
  }
  return String(err)
}
+// @ref LLP 0010#in-kernel-graph [implements] — invoke the graph plugin's own pure functions over the server kernel's query+storage handles +const contextGraphSrc = path.join(bundledWorkspaceDir, 'context-graph', 'src') +const ctxGraphProject = await import(pathToFileURL(path.join(contextGraphSrc, 'project.js')).href) +const ctxGraphQuery = await import(pathToFileURL(path.join(contextGraphSrc, 'query.js')).href) +const ctxGraphRuntime = await import(pathToFileURL(path.join(contextGraphSrc, 'runtime.js')).href) +export const projectGraph = ctxGraphProject.projectGraph +export const queryNeighbors = ctxGraphQuery.queryNeighbors +export const requireGraphRuntime = ctxGraphRuntime.requireGraphRuntime diff --git a/src/types.d.ts b/src/types.d.ts index 15b173c..7275c75 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -50,6 +50,24 @@ export interface ServerConfig { archiveSchedule: string archiveCloseLagMs: number maxBodyBytes: number + /** Server-side GitHub capture selection (LLP 0010). */ + github: GithubServerConfig +} + +/** + * Selection for the in-kernel github source plugin (LLP 0010). The GitHub + * token is NOT here: `tokenEnv` names the box env var the github client + * reads at request time, so the secret never lands in config. + */ +export interface GithubServerConfig { + /** Org logins to enumerate every repo from. */ + orgs: string[] + /** Explicit `owner/repo` entries. */ + repos: string[] + /** `owner/repo` entries to exclude (forward-only ignore). */ + ignore: string[] + /** Name of the box env var holding the GitHub token (default `GITHUB_TOKEN`). */ + tokenEnv: string } export interface ArchiveConfig { @@ -190,10 +208,44 @@ export interface ServerServices { archiveScheduler: ReturnType eviction: ReturnType executeSql(query: string): Promise + /** + * Pull GitHub history for the configured selection into `github_events` + * (admin one-shot; LLP 0010). `repos` narrows to a subset of the selection. 
+ */ + githubBackfill(opts?: { repos?: string[] }): Promise + /** Run the T0 projection over the registered contracts (LLP 0010). */ + graphProject(opts?: { source?: string; dryRun?: boolean }): Promise + /** Walk the node/edge graph from a seed node (LLP 0010). */ + graphNeighbors(args: GraphNeighborsArgs): Promise /** Invoked by the control plane once the HTTP port is bound. */ onListening?: (port: number) => void } +export interface GithubBackfillReport { + repos: number + events: number + errors: Array<{ repo: string; error: string }> +} + +export interface GraphProjectReport { + nodes: number + edges: number + nodesWritten: number + edgesWritten: number +} + +export interface GraphNeighborsArgs { + node: string + depth?: number + type?: string + edgeTypes?: string[] + direction?: 'out' | 'in' | 'both' + limit?: number +} + +/** `TraversalOk | TraversalErr` from the context-graph query (LLP 0026). */ +export type GraphTraversalResult = { ok: boolean } & Record + export interface RouteContext { req: IncomingMessage res: ServerResponse diff --git a/test/smoke.js b/test/smoke.js index a9d43ee..137a433 100644 --- a/test/smoke.js +++ b/test/smoke.js @@ -43,10 +43,18 @@ const env = { // high-water gate first, so this does not perturb other checks. HYPSERVER_BYTE_BURST_PER_GATEWAY: String(8 * 1024), HYPSERVER_BYTE_RATE_PER_GATEWAY: '1', + // Server-side graph (LLP 0010): one repo in the selection so the + // github-backfill check below has something to capture (via the injected + // in-memory client, no network). + HYPSERVER_GITHUB_REPOS: 'acme/widgets', } const daemon = await runServerDaemon({ env, installSignalHandlers: false }) const base = `http://127.0.0.1:${daemon.port()}` +// The vendored github plugin's runtime singleton: the graph checks inject an +// in-memory GitHub client here so `github backfill` exercises the real cache +// append path with no network. 
/**
 * Network-free GitHub client exposing the same method names as the plugin's
 * `GithubClient` surface, backed entirely by the fixture passed in.
 *
 * `repos` is looked up by the lowercased `owner/repo`; `orgRepos` is looked up
 * by the org name as given. Every method resolves to `[]` when the fixture has
 * nothing for the request. The per-PR and per-commit listings (`prFiles`,
 * `prReviews`, `prCommits`, `commitFiles`) are additionally keyed by PR number
 * or commit sha.
 *
 * @param {{ orgRepos?: Record<string, string[]>, repos?: Record<string, any> }} data
 */
function fakeClient(data) {
  const orgRepos = data.orgRepos ?? {}
  const repos = data.repos ?? {}

  /** Fixture for one repo, or an empty object when the repo is unknown. */
  const repoFixture = (/** @type {string} */ owner, /** @type {string} */ repo) =>
    repos[`${owner}/${repo}`.toLowerCase()] ?? {}

  /** Method that returns a whole-repo list stored under `field`. */
  const repoList = (/** @type {string} */ field) =>
    async (/** @type {string} */ owner, /** @type {string} */ repo) =>
      repoFixture(owner, repo)[field] ?? []

  /** Method that returns a list stored under `field`, keyed by PR number / sha. */
  const keyedList = (/** @type {string} */ field) =>
    async (/** @type {string} */ owner, /** @type {string} */ repo, /** @type {string | number} */ key) =>
      repoFixture(owner, repo)[field]?.[key] ?? []

  return {
    listOrgRepos: async (/** @type {string} */ org) => orgRepos[org] ?? [],
    listIssues: repoList('issues'),
    listPullRequests: repoList('pulls'),
    listPullRequestFiles: keyedList('prFiles'),
    listPullRequestReviews: keyedList('prReviews'),
    listPullRequestCommits: keyedList('prCommits'),
    listCommits: repoList('commits'),
    listCommitFiles: keyedList('commitFiles'),
    listIssueComments: repoList('comments'),
  }
}
COUNT(*) AS n FROM github_events') + check('github_events rows queryable on the server', Number(ghCount.rows[0].n) > 0, ghCount.rows) + + const project1 = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: {} }) + check('graph project materialized nodes', project1.status === 200 && project1.body.nodes > 0, project1.body) + check('graph project materialized edges', project1.body.edges > 0, project1.body) + check('graph project wrote fresh node + edge rows', project1.body.nodesWritten > 0 && project1.body.edgesWritten > 0, project1.body) + + const project2 = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: {} }) + check('graph project is idempotent (re-run writes nothing new)', project2.body.nodesWritten === 0 && project2.body.edgesWritten === 0, project2.body) + + const nodeCount = await sql('SELECT COUNT(*) AS n FROM node') + check('node dataset queryable on the server', Number(nodeCount.rows[0].n) > 0, nodeCount.rows) + + const neighbors = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'both' } }) + check('graph neighbors resolves the Repo seed', neighbors.status === 200 && neighbors.body.ok === true, neighbors.body) + check('graph neighbors reaches the repo activity', Number(neighbors.body.reachable) > 0, neighbors.body) + + const missing = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'no/such-seed-xyz' } }) + check('graph neighbors 400s on an unknown seed', missing.status === 400 && missing.body.ok === false, missing.body) + + const badRepos = await call('POST', '/v1/admin/github/backfill', { token: admin.token, json: { repos: ['not-a-slug'] } }) + check('github backfill rejects a malformed repo slug', badRepos.status === 400 && badRepos.body.error === 'invalid_repos', badRepos.body) + // ----- Restart durability: catalog, ledger, identity survive ----- await daemon.stop() const daemon2 = 
await runServerDaemon({ env, installSignalHandlers: false }) From 4bf271f89e3ce2e8ab427961206f38f72f268b2c Mon Sep 17 00:00:00 2001 From: Phillip Cunliffe Date: Mon, 22 Jun 2026 14:08:07 -0700 Subject: [PATCH 3/4] feat: load ai-gateway-graph connector for GitHub<->LLM convergence Maps forwarded ai_gateway_messages into the same node/edge graph; its bridge-ready Repo/Commit/File keys converge by content-addressed id with github's, so one graph-project spans both sources. Co-Authored-By: Claude Opus 4.8 (1M context) --- llp/0010-server-side-graph.decision.md | 9 +++++++++ src/boot.js | 7 +++++++ 2 files changed, 16 insertions(+) diff --git a/llp/0010-server-side-graph.decision.md b/llp/0010-server-side-graph.decision.md index 9db9a71..f86a87a 100644 --- a/llp/0010-server-side-graph.decision.md +++ b/llp/0010-server-side-graph.decision.md @@ -37,6 +37,15 @@ resolves. The plugins activate for their **registrations** — github_events / node / edge datasets, the github T0 contract, and the `github backfill` / `graph project` / `graph neighbors` commands — exactly as on a client. +Alongside them the server also loads `@hypaware/ai-gateway-graph` — the +cross-source connector that maps the forwarded `ai_gateway_messages` (the LLM +sessions) into the same node/edge graph. It is pure contract registration (no +source, no config; requires context-graph), and its bridge-ready +`Repo`/`Commit`/`File` keys are content-addressed identically to github's, so a +single `graph project` run spans both sources and the two converge by id — the +GitHub↔LLM join 1.6's git-bridge (`git_remote`/`head_sha`/`repo_root`) was built +for (hypaware LLP 0032). + This reuses the kernel-host substrate of [LLP 0002](./0002-kernel-reuse.decision.md): the server expresses behavior as activated plugins, not bespoke code. 
Projection and traversal are the graph plugin's own **pure functions** — `projectGraph()` and diff --git a/src/boot.js b/src/boot.js index 98ca39e..2d0e7b1 100644 --- a/src/boot.js +++ b/src/boot.js @@ -59,6 +59,13 @@ function pluginSelection(config) { // @ref LLP 0010#in-kernel-graph [implements] — context-graph + github load into the server kernel; capture is an admin one-shot, not a daemon source { dir: path.join(bundledWorkspaceDir, 'context-graph'), config: {} }, { dir: path.join(SERVER_PLUGINS_DIR, 'github'), config: githubSection(config) }, + // The cross-source connector: maps the forwarded ai_gateway_messages into + // the SAME node/edge graph. Pure contract registration (no source, no + // config); requires context-graph. Its bridge-ready Repo/Commit/File keys + // converge by content-addressed id with github's, so LLM sessions and + // GitHub activity join in one graph (hypaware LLP 0032 git-bridge). + // @ref LLP 0010#in-kernel-graph [implements] — load the ai_gateway_messages→graph contract so projection spans both sources + { dir: path.join(bundledWorkspaceDir, 'ai-gateway-graph'), config: {} }, { dir: path.join(SERVER_PLUGINS_DIR, 'control-plane'), config: {} }, ] return { plugins, destination } From 740c32035789d299ead9c6faf1718bcb2b212938 Mon Sep 17 00:00:00 2001 From: Phillip Cunliffe Date: Tue, 23 Jun 2026 12:11:09 -0700 Subject: [PATCH 4/4] test: close server-side-graph coverage gaps from PR #6 review The dual-review's request_changes rested on five test-coverage gaps on the new GitHub-capture surface (no logic bug survived scrutiny). 
Extend the hermetic smoke (LLP 0010's designated driver), 92 -> 113 checks: - incremental poll / cursor advancement: a cursor-aware fake + a real poll-mode tick (no-change captures nothing & skips PR descent; advanced high-water captures only new rows & moves the cursor on) + the github-cursors.json sidecar round-trip - graph/neighbors filters: direction in/out asymmetry, edge_types restriction (+ bogus -> 0), limit truncation, depth bounding - bin/admin.js CLI wrappers: spawned end-to-end against the daemon - resolveRepos (union/minus/lowercase/dedupe/sort) + captureRepos per-repo error isolation - graph/project --source filter + dry_run safe-preview Also align the stale config.plugin_pinned log key (content_hash -> artifact_hash, matching the renamed wire pin field) and update LLP 0010's smoke-coverage bullet. The sub-threshold poll-mode comment-misclassification and vendored Code-Style/@ref items stay deferred: plugins/github/ is LLP 0010's vendored carve-out and the poll path is dormant. Co-Authored-By: Claude Opus 4.8 (1M context) --- llp/0010-server-side-graph.decision.md | 9 +- src/configs/save-pipeline.js | 4 +- test/smoke.js | 234 +++++++++++++++++++++++-- 3 files changed, 232 insertions(+), 15 deletions(-) diff --git a/llp/0010-server-side-graph.decision.md b/llp/0010-server-side-graph.decision.md index f86a87a..68d238c 100644 --- a/llp/0010-server-side-graph.decision.md +++ b/llp/0010-server-side-graph.decision.md @@ -124,7 +124,14 @@ data is on disk. - The hermetic smoke test ([test/smoke.js](../test/smoke.js)) drives the full chain — inject an in-memory GitHub client into the runtime singleton, `github backfill`, `graph project` (asserting idempotence on re-run), and `graph neighbors` — with no - network, exercising the real cache append + read path. + network, exercising the real cache append + read path. 
It also pins the surfaces a + backfill-only path leaves dormant: the per-repo cursor sidecar round-trip and a + `poll`-mode tick (no-change captures nothing and skips PR descent; an advanced + high-water captures only new rows and moves the cursor on); `resolveRepos` / + `captureRepos` repo-set resolution and per-repo error isolation; the `graph project` + `--source` filter and `dry_run` safe-preview; the `graph neighbors` + direction/`edge_types`/`limit`/`depth` parameters; and the `hypaware-server-admin` + CLI wrappers, spawned end-to-end against the running daemon. - **`plugins/github/` is vendored third-party code and is out of scope for this repo's `/ref-check`.** Its source carries `@ref LLP NNNN` annotations that resolve against the **`@hypaware/github`** corpus (where, e.g., LLP 0002 is "capture-model"), diff --git a/src/configs/save-pipeline.js b/src/configs/save-pipeline.js index c1ff8c6..d994036 100644 --- a/src/configs/save-pipeline.js +++ b/src/configs/save-pipeline.js @@ -217,7 +217,9 @@ async function resolveAndPin({ entry, pinStateDir, log }) { log.info('config.plugin_pinned', { plugin: entry.name, version: fetched.manifest.version, - content_hash: fetched.contentHash, + // The served pin field is `artifact_hash` (the content hash the client + // verifies the staged artifact against); keep the log key in step with it. 
+ artifact_hash: fetched.contentHash, }) return { ok: true, diff --git a/test/smoke.js b/test/smoke.js index 137a433..68cb549 100644 --- a/test/smoke.js +++ b/test/smoke.js @@ -12,6 +12,10 @@ import assert from 'node:assert' import fs from 'node:fs' import os from 'node:os' import path from 'node:path' +import { execFile } from 'node:child_process' +import { promisify } from 'node:util' + +const execFileP = promisify(execFile) const tmpRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'hypaware-server-smoke-')) process.env.HYP_HOME = path.join(tmpRoot, 'hyp-home') @@ -55,6 +59,13 @@ const base = `http://127.0.0.1:${daemon.port()}` // in-memory GitHub client here so `github backfill` exercises the real cache // append path with no network. const { requireGithubRuntime } = await import('../plugins/github/src/runtime.js') +// Capture internals the graph checks drive directly: a `poll`-mode tick (the +// admin surface only ever runs `backfill`) and the pure repo-resolution / +// per-repo-isolation functions. +const { runCaptureTick } = await import('../plugins/github/src/tick.js') +const { resolveRepos, captureRepos } = await import('../plugins/github/src/capture.js') +// The server-local admin CLI, exercised end-to-end against this daemon. +const adminCliPath = path.join(import.meta.dirname, '..', 'bin', 'admin.js') let passed = 0 /** @@ -112,7 +123,17 @@ async function runMover() { /** * In-memory GitHub client (no network) matching the plugin's GithubClient * surface. `repos` is keyed by lowercased `owner/repo`; only the fields the - * capture passes read are returned. Mirrors the github plugin's own test fake. + * capture reads are returned. 
+ * + * Beyond returning fixtures it MIRRORS github_client.js's cursor contract — the + * high-level fake stands in for the real fetch client, so the time-windowed + * listings (issues / pulls / commits / comments) return only rows strictly + * newer than the per-repo `since` high-water and advance it, exactly as a real + * `since=` / `If-None-Match` request would. That makes an unchanged poll an + * empty listing, so the incremental checks below can assert "no new activity → + * zero events". `.calls` counts PR sub-resource fetches, so a poll test can + * prove descent happens only when `changedSince` admits a PR. A repo may set + * `throwOnList` to force a per-repo capture failure (the error-isolation test). * * @param {{ orgRepos?: Record, repos?: Record }} data */ @@ -120,19 +141,63 @@ function fakeClient(data) { const orgRepos = data.orgRepos ?? {} const repos = data.repos ?? {} const get = (/** @type {string} */ owner, /** @type {string} */ repo) => repos[`${owner}/${repo}`.toLowerCase()] ?? {} + const calls = { prFiles: 0, prReviews: 0, prCommits: 0 } + + const issueTs = (/** @type {any} */ r) => (typeof r.updated_at === 'string' ? r.updated_at : typeof r.created_at === 'string' ? r.created_at : null) + const commitTs = (/** @type {any} */ r) => r?.commit?.committer?.date ?? r?.commit?.author?.date ?? null + // Return rows newer than `since`, and advance the cursor high-water to the + // newest row seen — the advanceSince / advanceSinceCommit contract. 
+ const windowed = (/** @type {any[]} */ rows, /** @type {any} */ cursor, /** @type {string} */ key, /** @type {(r: any) => string | null} */ tsOf) => { + const since = cursor.since?.[key] + let max = since + for (const r of rows) { const t = tsOf(r); if (t && (!max || t > max)) max = t } + if (max) (cursor.since ??= {})[key] = max + return rows.filter((r) => { const t = tsOf(r); return !since || (t != null && t > since) }) + } + const guard = (/** @type {any} */ r) => { if (r.throwOnList) throw new Error(r.throwOnList) } + return { + calls, async listOrgRepos(/** @type {string} */ org) { return orgRepos[org] ?? [] }, - async listIssues(/** @type {string} */ owner, /** @type {string} */ repo) { return get(owner, repo).issues ?? [] }, - async listPullRequests(/** @type {string} */ owner, /** @type {string} */ repo) { return get(owner, repo).pulls ?? [] }, - async listPullRequestFiles(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { return get(owner, repo).prFiles?.[n] ?? [] }, - async listPullRequestReviews(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { return get(owner, repo).prReviews?.[n] ?? [] }, - async listPullRequestCommits(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { return get(owner, repo).prCommits?.[n] ?? [] }, - async listCommits(/** @type {string} */ owner, /** @type {string} */ repo) { return get(owner, repo).commits ?? [] }, + async listIssues(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {any} */ cursor) { + const r = get(owner, repo); guard(r) + return windowed(r.issues ?? [], cursor, 'issues', issueTs) + }, + async listPullRequests(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {any} */ cursor) { + const r = get(owner, repo); guard(r) + // pulls are etag-gated in the real client and capture's advancePullsHigh + // owns this high-water, so here just drop pulls at/under it. 
+ const since = cursor.since?.pulls + return (r.pulls ?? []).filter((/** @type {any} */ pr) => { const t = issueTs(pr); return !since || (t != null && t > since) }) + }, + async listPullRequestFiles(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { calls.prFiles++; return get(owner, repo).prFiles?.[n] ?? [] }, + async listPullRequestReviews(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { calls.prReviews++; return get(owner, repo).prReviews?.[n] ?? [] }, + async listPullRequestCommits(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { calls.prCommits++; return get(owner, repo).prCommits?.[n] ?? [] }, + async listCommits(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {any} */ cursor) { + const r = get(owner, repo); guard(r) + return windowed(r.commits ?? [], cursor, 'commits', commitTs) + }, async listCommitFiles(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {string} */ sha) { return get(owner, repo).commitFiles?.[sha] ?? [] }, - async listIssueComments(/** @type {string} */ owner, /** @type {string} */ repo) { return get(owner, repo).comments ?? [] }, + async listIssueComments(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {any} */ cursor) { + const r = get(owner, repo); guard(r) + return windowed(r.comments ?? [], cursor, 'comments', issueTs) + }, } } +/** + * Run the server-local admin CLI (`bin/admin.js`) against this daemon, returning + * its parsed stdout JSON. Exercises the real flag→body mapping end-to-end. 
+ * + * @param {string[]} args + */ +async function adminCli(args) { + const { stdout } = await execFileP(process.execPath, [adminCliPath, ...args], { + env: { ...process.env, HYPSERVER_URL: base, HYPSERVER_ADMIN_TOKEN: admin.token }, + }) + return JSON.parse(stdout) +} + try { // ----- Config authoring (LLP 0009) ----- // A local-dir plugin exercises the real pinning path (kernel @@ -563,7 +628,7 @@ try { // path with no network. The fixture is one repo with an issue, a PR (one // changed file + a review) and a commit (touching that same file) — enough // to mint every bridge-ready node type and a connected edge set. - requireGithubRuntime().clientFactory = () => fakeClient({ + const widgetsFixture = { repos: { 'acme/widgets': { issues: [{ number: 1, state: 'open', created_at: '2026-01-01T00:00:00Z', user: { login: 'alice', type: 'User' } }], @@ -574,15 +639,34 @@ try { commitFiles: { ['a'.repeat(40)]: ['src/app.js'] }, }, }, - }) + } + requireGithubRuntime().clientFactory = () => fakeClient(widgetsFixture) const backfill = await call('POST', '/v1/admin/github/backfill', { token: admin.token, json: {} }) check('github backfill captured events into the server cache', backfill.status === 200 && backfill.body.events > 0 && backfill.body.repos === 1, backfill.body) check('github backfill reported no per-repo errors', Array.isArray(backfill.body.errors) && backfill.body.errors.length === 0, backfill.body.errors) + // The cursor sidecar round-trips through disk (tick.js writeCursors → + // cursors.js), and backfill advances the pulls high-water (advancePullsHigh) + // even though it re-fetches full history — so the poll below resumes past it. 
+ const cursorsPath = path.join(requireGithubRuntime().stateDir, 'github-cursors.json') + const cursorAfterBackfill = JSON.parse(fs.readFileSync(cursorsPath, 'utf8')) + check('github backfill persisted the per-repo cursor high-water (sidecar round-trip)', + cursorAfterBackfill.schema_version === 1 && cursorAfterBackfill.repos['acme/widgets']?.since?.pulls === '2026-01-02T00:00:00Z', + cursorAfterBackfill) + const ghCount = await sql('SELECT COUNT(*) AS n FROM github_events') check('github_events rows queryable on the server', Number(ghCount.rows[0].n) > 0, ghCount.rows) + // dry_run computes the would-be graph but writes nothing — the safe-preview + // contract. Run it before the first real projection so "writes nothing" is + // observable as a still-empty node table. + const dryRun = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: { dry_run: true } }) + check('graph project dry_run computes nodes but writes none', + dryRun.status === 200 && dryRun.body.nodes > 0 && dryRun.body.nodesWritten === 0 && dryRun.body.edgesWritten === 0, dryRun.body) + const preProjectNodes = await sql('SELECT COUNT(*) AS n FROM node') + check('graph project dry_run materialized nothing', Number(preProjectNodes.rows[0].n) === 0, preProjectNodes.rows) + const project1 = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: {} }) check('graph project materialized nodes', project1.status === 200 && project1.body.nodes > 0, project1.body) check('graph project materialized edges', project1.body.edges > 0, project1.body) @@ -591,12 +675,57 @@ try { const project2 = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: {} }) check('graph project is idempotent (re-run writes nothing new)', project2.body.nodesWritten === 0 && project2.body.edgesWritten === 0, project2.body) + // --source scopes projection to one source's contracts: github_events still + // projects nodes; a source no contract claims projects nothing (the filter + // 
discriminates, not a silent whole-graph run). + const projectGithub = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: { source: 'github_events' } }) + check('graph project --source github_events projects that source', projectGithub.status === 200 && projectGithub.body.nodes > 0, projectGithub.body) + const projectNoSource = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: { source: '__no_such_source__' } }) + check('graph project --source with no matching contract projects nothing', + projectNoSource.body.nodes === 0 && projectNoSource.body.nodesWritten === 0, projectNoSource.body) + const nodeCount = await sql('SELECT COUNT(*) AS n FROM node') check('node dataset queryable on the server', Number(nodeCount.rows[0].n) > 0, nodeCount.rows) - const neighbors = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'both' } }) - check('graph neighbors resolves the Repo seed', neighbors.status === 200 && neighbors.body.ok === true, neighbors.body) - check('graph neighbors reaches the repo activity', Number(neighbors.body.reachable) > 0, neighbors.body) + const nbBoth = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'both' } }) + check('graph neighbors resolves the Repo seed', nbBoth.status === 200 && nbBoth.body.ok === true, nbBoth.body) + check('graph neighbors reaches the repo activity', Number(nbBoth.body.reachable) > 0, nbBoth.body) + + // The filter parameters most prone to a wrong-direction / off-by-one bug: + // direction, edge_types, and limit must each change the reachable set the + // way the traversal promises (a subset, a type-restriction, a truncation). 
+ // From the Repo seed every edge points inward (Commit/Issue/PR → Repo), so + // `in` reaches the activity while `out` reaches nothing — an asymmetry that + // only holds if direction is actually honored, not ignored. + const nbOut = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'out' } }) + const nbIn = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'in' } }) + check('graph neighbors honors direction (in and out reach different sets, both bounds them)', + nbOut.body.ok === true && nbIn.body.ok === true + && nbOut.body.neighbors.every((/** @type {any} */ n) => n.direction === 'out') + && nbIn.body.neighbors.every((/** @type {any} */ n) => n.direction === 'in') + && Number(nbOut.body.reachable) !== Number(nbIn.body.reachable) + && Number(nbOut.body.reachable) <= Number(nbBoth.body.reachable) + && Number(nbIn.body.reachable) <= Number(nbBoth.body.reachable), + { out: nbOut.body.reachable, in: nbIn.body.reachable, both: nbBoth.body.reachable }) + + const edgeTypes = [...new Set(nbBoth.body.neighbors.map((/** @type {any} */ n) => n.edge_type))] + const nbFiltered = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'both', edge_types: [edgeTypes[0]] } }) + check('graph neighbors edge_types filter restricts traversal to that type', + nbFiltered.body.ok === true && nbFiltered.body.neighbors.every((/** @type {any} */ n) => n.edge_type === edgeTypes[0]) && Number(nbFiltered.body.reachable) <= Number(nbBoth.body.reachable), + { filtered: nbFiltered.body.reachable, type: edgeTypes[0] }) + + const nbBogus = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, edge_types: ['__no_such_edge__'] } }) + check('graph neighbors with an unknown edge_type 
reaches nothing', nbBogus.body.ok === true && Number(nbBogus.body.reachable) === 0, nbBogus.body) + + const nbLimited = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'both', limit: 1 } }) + check('graph neighbors limit truncates the returned set but reports full reachable', + nbLimited.body.neighbors.length <= 1 && Number(nbLimited.body.reachable) === Number(nbBoth.body.reachable) && nbLimited.body.truncated === (Number(nbBoth.body.reachable) > 1), + { returned: nbLimited.body.neighbors.length, reachable: nbLimited.body.reachable, truncated: nbLimited.body.truncated }) + + const nbDepth1 = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 1, direction: 'both' } }) + check('graph neighbors depth bounds the walk (depth 1 ⊆ depth 2)', + Number(nbDepth1.body.reachable) >= 1 && Number(nbDepth1.body.reachable) <= Number(nbBoth.body.reachable), + { d1: nbDepth1.body.reachable, d2: nbBoth.body.reachable }) const missing = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'no/such-seed-xyz' } }) check('graph neighbors 400s on an unknown seed', missing.status === 400 && missing.body.ok === false, missing.body) @@ -604,6 +733,85 @@ try { const badRepos = await call('POST', '/v1/admin/github/backfill', { token: admin.token, json: { repos: ['not-a-slug'] } }) check('github backfill rejects a malformed repo slug', badRepos.status === 400 && badRepos.body.error === 'invalid_repos', badRepos.body) + // ----- Admin CLI wrappers (bin/admin.js): flag→body mapping, end-to-end ----- + // The HTTP routes above are driven directly; the docker-exec CLI glue + // (--edge-type → edge_types:[x], Number() on --depth, --dry-run, positional + // repo) is otherwise uncovered, so spawn it against this daemon. 
+ const cliBackfill = await adminCli(['github-backfill', 'acme/widgets']) + check('admin CLI github-backfill maps a positional repo and captures', + cliBackfill.repos === 1 && cliBackfill.events > 0 && cliBackfill.errors.length === 0, cliBackfill) + const cliProjectDry = await adminCli(['graph-project', '--dry-run']) + check('admin CLI graph-project --dry-run maps to a no-write projection', cliProjectDry.nodesWritten === 0, cliProjectDry) + const cliNeighbors = await adminCli(['graph-neighbors', 'acme/widgets', '--type', 'Repo', '--depth', '2', '--direction', 'both']) + check('admin CLI graph-neighbors maps flags and resolves the seed', + cliNeighbors.ok === true && Number(cliNeighbors.reachable) > 0, cliNeighbors) + + // ----- Repo resolution + per-repo error isolation (capture.js) ----- + // resolveRepos: repos ∪ org-enumeration − ignore, lowercased/deduped/sorted. + const resolved = await resolveRepos( + { repos: ['Acme/Widgets'], orgs: ['acme'], ignore: ['acme/ignored'], token_env: 'GITHUB_TOKEN' }, + fakeClient({ orgRepos: { acme: ['acme/widgets', 'acme/ignored', 'acme/other'] } }), + nullLog, + ) + check('resolveRepos unions repos+orgs, drops ignore, and lowercases/dedupes/sorts', + JSON.stringify(resolved) === JSON.stringify(['acme/other', 'acme/widgets']), resolved) + + // captureRepos: one repo throwing must land in errors[] without aborting the + // tick — the healthy repos around it still capture. 
+ /** @type {Record[]} */ + const isoCaptured = [] + const isoReport = await captureRepos({ + client: fakeClient({ + repos: { + 'ok/one': { issues: [{ number: 1, state: 'open', created_at: '2026-01-01T00:00:00Z', user: { login: 'a', type: 'User' } }] }, + 'bad/two': { throwOnList: 'boom' }, + 'ok/three': { commits: [{ sha: 'c'.repeat(40), author: { login: 'x', type: 'User' }, commit: { author: { date: '2026-01-01T00:00:00Z' } } }] }, + }, + }), + config: { repos: ['ok/one', 'bad/two', 'ok/three'], orgs: [], ignore: [], token_env: 'GITHUB_TOKEN' }, + cursors: { schema_version: 1, repos: {} }, + append: async (rows) => { isoCaptured.push(...rows) }, + log: nullLog, + mode: 'backfill', + }) + check('captureRepos isolates a per-repo failure into errors[] without aborting the tick', + isoReport.repos === 3 && isoReport.errors.length === 1 && isoReport.errors[0].repo === 'bad/two', isoReport) + check('captureRepos still captures the healthy repos around a failure', + isoReport.events > 0 && isoCaptured.some((r) => r.repo === 'ok/one') && isoCaptured.some((r) => r.repo === 'ok/three'), + { events: isoReport.events }) + + // ----- Incremental poll: cursor advancement (capture.js, cursors.js) ----- + // The admin surface only runs backfill, so drive a `poll` tick directly. With + // the cursor at the backfill high-water, a no-change poll captures nothing and + // skips PR descent; a poll whose PR/commit advanced past it captures only the + // new rows and moves the high-water on. (A regression in the cursor math or + // the changedSince/toDescend gate would otherwise ship green.) 
+ const pollSame = fakeClient(widgetsFixture) + requireGithubRuntime().clientFactory = () => pollSame + const pollNoop = await runCaptureTick(requireGithubRuntime(), { mode: 'poll' }) + check('poll tick with no new activity captures zero events', pollNoop.events === 0 && pollNoop.errors.length === 0, pollNoop) + check('poll tick skips PR sub-resource descent when changedSince admits no PR', + pollSame.calls.prFiles === 0 && pollSame.calls.prReviews === 0 && pollSame.calls.prCommits === 0, pollSame.calls) + + const pollAdvanced = fakeClient({ + repos: { + 'acme/widgets': { + pulls: [{ number: 2, state: 'open', merged_at: null, created_at: '2026-01-02T00:00:00Z', updated_at: '2026-02-01T00:00:00Z', user: { login: 'bob', type: 'User' } }], + prFiles: { 2: ['src/v2.js'] }, + prReviews: { 2: [{ id: 101, state: 'APPROVED', submitted_at: '2026-02-01T00:00:00Z', user: { login: 'carol', type: 'User' } }] }, + commits: [{ sha: 'b'.repeat(40), author: { login: 'alice', type: 'User' }, commit: { author: { date: '2026-02-01T00:00:00Z' } } }], + commitFiles: { ['b'.repeat(40)]: ['src/v2.js'] }, + }, + }, + }) + requireGithubRuntime().clientFactory = () => pollAdvanced + const pollNew = await runCaptureTick(requireGithubRuntime(), { mode: 'poll' }) + check('poll tick captures only the new activity once the high-water advances', pollNew.events > 0 && pollNew.errors.length === 0, pollNew) + check('poll tick descends PR sub-resources when changedSince admits the PR', pollAdvanced.calls.prFiles > 0, pollAdvanced.calls) + const cursorAfterPoll = JSON.parse(fs.readFileSync(cursorsPath, 'utf8')) + check('poll advanced the persisted pulls cursor high-water', + cursorAfterPoll.repos['acme/widgets'].since.pulls === '2026-02-01T00:00:00Z', cursorAfterPoll.repos['acme/widgets'].since) + // ----- Restart durability: catalog, ledger, identity survive ----- await daemon.stop() const daemon2 = await runServerDaemon({ env, installSignalHandlers: false })