diff --git a/bin/admin.js b/bin/admin.js index aaace9a..7ed0ddd 100644 --- a/bin/admin.js +++ b/bin/admin.js @@ -18,6 +18,9 @@ * hypaware-server-admin activate-config * hypaware-server-admin query "SELECT ..." * hypaware-server-admin run-mover | run-archive [--force] | run-eviction + * hypaware-server-admin github-backfill [owner/repo ...] + * hypaware-server-admin graph-project [--source <name>] [--dry-run] + * hypaware-server-admin graph-neighbors <node> [--depth N] [--type T] [--edge-type T] [--direction out|in|both] [--limit N] * * Env: HYPSERVER_URL (default http://127.0.0.1:8740), HYPSERVER_ADMIN_TOKEN */ @@ -122,6 +125,30 @@ switch (command) { case 'run-eviction': await call('POST', '/v1/admin/eviction/run') break + case 'github-backfill': + // Positional owner/repo args narrow the configured selection; none = all. + await call('POST', '/v1/admin/github/backfill', { repos: rest.filter((a) => !a.startsWith('--')) }) + break + case 'graph-project': + await call('POST', '/v1/admin/graph/project', { + ...(flagValue('--source') ? { source: flagValue('--source') } : {}), + dry_run: rest.includes('--dry-run'), + }) + break + case 'graph-neighbors': { + const depth = flagValue('--depth') + const limit = flagValue('--limit') + const node = rest[0] && !rest[0].startsWith('--') ? rest[0] : '' + await call('POST', '/v1/admin/graph/neighbors', { + node, + ...(flagValue('--type') ? { type: flagValue('--type') } : {}), + ...(flagValue('--edge-type') ? { edge_types: [flagValue('--edge-type')] } : {}), + ...(flagValue('--direction') ? { direction: flagValue('--direction') } : {}), + ...(depth !== undefined ? { depth: Number(depth) } : {}), + ...(limit !== undefined ? 
{ limit: Number(limit) } : {}), + }) + break + } default: console.error('unknown command; see header comment for usage') process.exit(2) diff --git a/llp/0010-server-side-graph.decision.md b/llp/0010-server-side-graph.decision.md new file mode 100644 index 0000000..68d238c --- /dev/null +++ b/llp/0010-server-side-graph.decision.md @@ -0,0 +1,149 @@ +# LLP 0010: Server-Side Graph — GitHub Capture, T0 Projection, and Neighbors as Admin Operations + +**Type:** Decision +**Status:** Active +**Systems:** Query, Core +**Author:** Phil / Claude +**Date:** 2026-06-22 +**Related:** LLP 0002, LLP 0004, LLP 0006 + +> The server now holds a context graph, not just forwarded logs. It loads the +> bundled `@hypaware/context-graph` plugin and a vendored `@hypaware/github` +> source plugin into its own kernel, captures GitHub activity directly, and +> exposes `graph project` / `graph neighbors` as admin operations — so the +> forwarded LLM sessions ([hypaware LLP 0032 git-bridge](../../hypaware/llp)) +> and GitHub activity converge into one node/edge graph on the server. + +## Why on the server at all + +The admin attach is a read-only SQL endpoint ([LLP 0006](./0006-admin-query-attach.decision.md)): +it can read the server's cache + archive but cannot invoke a plugin command. And +the graph's whole point is convergence — `Repo`/`Actor`/`Commit`/`File` nodes +minted from GitHub activity sharing content-addressed ids with nodes minted from +LLM sessions (the `git_remote`/`head_sha`/`repo_root` columns hypaware 1.6 added to +`ai_gateway_messages`). That convergence only happens where both datasets live in +one kernel cache. The forwarded LLM logs live in the **server's** cache. So the +projection must run there, not in a side client reading the server over SQL. 
+ +## Decision + +**The server activates the graph plugins in its own +kernel.** `pluginSelection()` ([boot.js](../src/boot.js)) gains two entries +before the control plane: `@hypaware/context-graph` (from the bundled workspace +that ships inside the `hypaware` dependency) and `@hypaware/github` (vendored — +see below). The kernel resolver orders context-graph first from github's +`requires.plugins`, so github's eager `requireCapability('hypaware.context-graph')` +resolves. The plugins activate for their **registrations** — github_events / +node / edge datasets, the github T0 contract, and the `github backfill` / +`graph project` / `graph neighbors` commands — exactly as on a client. + +Alongside them the server also loads `@hypaware/ai-gateway-graph` — the +cross-source connector that maps the forwarded `ai_gateway_messages` (the LLM +sessions) into the same node/edge graph. It is pure contract registration (no +source, no config; requires context-graph), and its bridge-ready +`Repo`/`Commit`/`File` keys are content-addressed identically to github's, so a +single `graph project` run spans both sources and the two converge by id — the +GitHub↔LLM join that hypaware 1.6's git-bridge (`git_remote`/`head_sha`/`repo_root`) was built +for (hypaware LLP 0032). + +This reuses the kernel-host substrate of [LLP 0002](./0002-kernel-reuse.decision.md): +the server expresses behavior as activated plugins, not bespoke code. Projection +and traversal are the graph plugin's own **pure functions** — `projectGraph()` and +`queryNeighbors()` — invoked over the server kernel's `runtime.query` / +`runtime.storage` handles (the same handles `executeSql` uses). The registered +contracts are read through the plugin's process-global registry singleton +(`requireGraphRuntime().registry.list()`) — the identical seam the in-process +`hyp graph project` command uses — so nothing in the `hypaware` repo had to change. 
+The deep reach into the bundled plugin tree is funnelled through the one sanctioned +shim ([kernel/shim.js](../src/kernel/shim.js), [LLP 0002#host-shim](./0002-kernel-reuse.decision.md#host-shim)), +anchored on the same `bundledWorkspaceDir` the loader used so the singleton is the +very instance activation populated. + +**The github plugin is vendored** under [`plugins/github/`](../plugins/github/) +rather than added as a dependency: it has no git remote yet, and `plugins/` is +already on the server's plugin path and already copied by the Docker build, so +vendoring adds the plugin with no Dockerfile or build-context change. Keep exactly +one copy of its source tree — the runtime singletons depend on module identity. + +**The server pulls GitHub directly; capture is an +admin one-shot, not a daemon source.** The github plugin registers a poll source, +but the server never starts it — and the wrapped source registry would suppress it +anyway ([LLP 0002](./0002-kernel-reuse.decision.md), only `@hypaware-server/*` may +start sources). Instead, `github backfill` runs on demand via the admin surface, +calling the plugin's `runCaptureTick(runtime, { mode: 'backfill' })` and then +flushing the `github_events` cache table so the freshly captured rows are queryable +on the same kernel. Repo selection (`orgs` / `repos` / `ignore`) comes from server +env (`HYPSERVER_GITHUB_*`), injected as the plugin's `[github]` config section by +`githubSection()` in boot. The GitHub token stays in the box environment under the +name in `token_env` (default `GITHUB_TOKEN`) and is read by the github client at +request time — never in config, consistent with the secrets-never-in-config +invariant ([LLP 0000](./0000-hypaware-server.explainer.md)). The box therefore now +makes outbound calls to the GitHub API; this is the one network egress the server +performs beyond its archive destination. 
+ +**Graph operations are admin operations on the server +kernel, the precedent [LLP 0006](./0006-admin-query-attach.decision.md) set for SQL.** +`POST /v1/admin/github/backfill`, `/v1/admin/graph/project`, and +`/v1/admin/graph/neighbors` ([routes-admin.js](../src/http/routes-admin.js)) sit +behind the same admin bearer token, alongside the mover / archive / eviction +escape hatches. They are *operations*, not fleet config ([LLP 0009](./0009-remote-config.spec.md)): +nothing here is served to gateways. Each has a one-line `hypaware-server-admin` +wrapper for docker-exec workflows ([LLP 0008#admin-visibility](./0008-fleet-enrollment.spec.md#admin-visibility)). +Projection is idempotent — content-addressed ids plus pre-write dedup mean a +re-run with no new source data writes zero rows — so an operator firing +`graph-project` after each backfill (or after a fresh forward of LLM logs) just +folds new activity into the existing graph. + +**Graph datasets keep their own read closures; +the server's date-partition synthesis is only for forwarded ingest.** The +catalog-backed registry ([LLP 0004#catalog-backed-registry](./0004-dataset-catalog.spec.md#catalog-backed-registry)) +synthesizes `discoverPartitions` / `createDataSource` that read `date=` partitions — +correct for the wire-ingested datasets the mover fills (logs, traces, metrics, +ai_gateway_messages), and it deliberately discards the client-side closures those +plugins ship because they assume a client layout. But `github_events` is +`source=`-partitioned and `node` / `edge` are `graph_v1`-partitioned, and all three +are written **directly** into the cache by the in-kernel plugins (never wire-ingested), +each carrying a `createDataSource` authored for its own layout. 
So +[registry.js](../src/catalog/registry.js) routes datasets from the self-managed +plugins (`@hypaware/server`, `@hypaware/context-graph`, `@hypaware/github`) to the +`custom` map — keeping their closures — and everything else to the catalog seed + +date-partition synthesis. The split is by plugin, not by closure presence, because +`@hypaware/ai-gateway` also ships a `createDataSource` yet must take the synthesized +path. Without this, projection and neighbors silently read zero rows even though the +data is on disk. + +## Consequences and bounds + +- `github_events` / `node` / `edge` are cache-resident and never archived (no wire + signal, no archive ack). Ack-coupled eviction ([LLP 0003](./0003-spool-durability.spec.md)) + refuses to evict unarchived partitions, so they are not lost; they also are not + `date=`-partitioned, so the date-based eviction sweep does not touch them. +- A concurrent backfill and projection stay safe: pre-write dedup plus + content-addressed ids keep projection idempotent, and `graph compact` (a graph + command) cleans any residue from interleaved runs. +- The hermetic smoke test ([test/smoke.js](../test/smoke.js)) drives the full chain + — inject an in-memory GitHub client into the runtime singleton, `github backfill`, + `graph project` (asserting idempotence on re-run), and `graph neighbors` — with no + network, exercising the real cache append + read path. It also pins the surfaces a + backfill-only path leaves dormant: the per-repo cursor sidecar round-trip and a + `poll`-mode tick (no-change captures nothing and skips PR descent; an advanced + high-water captures only new rows and moves the cursor on); `resolveRepos` / + `captureRepos` repo-set resolution and per-repo error isolation; the `graph project` + `--source` filter and `dry_run` safe-preview; the `graph neighbors` + direction/`edge_types`/`limit`/`depth` parameters; and the `hypaware-server-admin` + CLI wrappers, spawned end-to-end against the running daemon. 
+- **`plugins/github/` is vendored third-party code and is out of scope for this + repo's `/ref-check`.** Its source carries `@ref LLP NNNN` annotations that resolve + against the **`@hypaware/github`** corpus (where, e.g., LLP 0002 is "capture-model"), + not this server's (where LLP 0002 is "kernel-reuse"). Those refs are honest in their + own repo; treat the vendored tree like a dependency and exclude it when validating + this repo's references. The server's own code carries the `@ref LLP 0010` annotations + for everything decided here. + +## Not in scope + +Ongoing server-side polling (the github poll source stays dormant by design), +enrichment beyond T0 (a non-goal for these datasets), and serving the github plugin +to fleet clients (clients do not capture GitHub in this design). A future +metadata-level partition spec for the graph datasets is deferred, as for the +archive ([LLP 0005#day-aligned-exports](./0005-archive-sink.decision.md#day-aligned-exports)). diff --git a/package-lock.json b/package-lock.json index 8a8d29b..f6a7d45 100644 --- a/package-lock.json +++ b/package-lock.json @@ -20,28 +20,29 @@ } }, "../hypaware": { - "version": "1.3.0", + "version": "1.6.0", "dependencies": { "@aws-sdk/client-s3": "3.1053.0", "@aws-sdk/credential-provider-ini": "3.972.43", "hyparquet": "1.26.0", "hyparquet-compressors": "1.1.1", - "icebird": "0.8.5", - "squirreling": "0.12.23" + "icebird": "0.8.10", + "squirreling": "0.12.24" }, "bin": { "hyp": "bin/hypaware.js", "hypaware": "bin/hypaware.js" }, "devDependencies": { - "@types/node": "25.9.2", + "@types/node": "26.0.0", "typescript": "6.0.3" }, "engines": { "node": ">=20" }, "optionalDependencies": { - "hyparquet-writer": "0.15.6" + "hyparquet-writer": "0.15.6", + "hypvector": "0.1.1" } }, "node_modules/fzstd": { diff --git a/plugins/github/hypaware.plugin.json b/plugins/github/hypaware.plugin.json new file mode 100644 index 0000000..c2ffe0a --- /dev/null +++ b/plugins/github/hypaware.plugin.json @@ -0,0 +1,34 @@ +{ 
+ "schema_version": 1, + "name": "@hypaware/github", + "version": "0.1.0", + "hypaware_api": "^1.0.0", + "runtime": "node", + "node_engine": ">=20", + "entrypoint": "./src/index.js", + "description": "GitHub source plugin built for the context-graph. Captures GitHub activity (issues, pull requests, commits, files, reviews, comments) into a thin append-only github_events skeleton and bundles a T0 projection contract that maps it into the node/edge graph with bridge-ready natural keys. Capture is poll/backfill/sync; projection is the graph plugin's `hyp graph project`.", + "permissions": ["network", "read_state", "write_state"], + "requires": { + "plugins": { + "@hypaware/context-graph": "^0.1.0" + }, + "capabilities": { + "hypaware.context-graph": "^1.0.0" + } + }, + "contributes": { + "datasets": [ + { "name": "github_events", "summary": "Append-only GitHub activity skeleton (structural columns only; content stays in GitHub)" } + ], + "config_sections": [ + { "section": "github", "summary": "Repo/org selection, ignore list, optional poll interval, and token env-var name" } + ], + "sources": [ + { "name": "github", "summary": "Ongoing poll (opt-in via poll_interval): append events past each repo's cursor" } + ], + "commands": [ + { "name": "github backfill", "summary": "Pull a repo's full history into github_events (cold-start)" }, + { "name": "github sync", "summary": "Run one poll tick now, off the daemon" } + ] + } +} diff --git a/plugins/github/src/capture.js b/plugins/github/src/capture.js new file mode 100644 index 0000000..d996fb4 --- /dev/null +++ b/plugins/github/src/capture.js @@ -0,0 +1,407 @@ +// @ts-check + +import { commitKey, repoKey, str } from './keys.js' +import { cursorFor } from './cursors.js' + +/** + * Capture orchestration: turn the configured repo selection into appended + * `github_events` rows. 
Storage-agnostic — `append` and the GitHub `client` + * are injected, so tests drive the whole pipeline with a fake client and a + * row collector (no network, no cache). + * + * Three `ignore[]` invariants are enforced **here, at capture** (LLP 0002): + * rows for an ignored repo never enter `github_events`, so they never reach the + * graph; ignore is forward-only (it filters the *fetch*, it does not retract + * already-captured rows); and it is exact `owner/repo` match (case-insensitive, + * since GitHub is). There is no projection-time repo filter. + * + * @ref LLP 0002#three-invariants [implements] — ignore is enforced at capture, forward-only, exact-match + * + * @import { CursorState, GithubClient, GithubCommit, GithubConfig, PluginLogger, RepoCursor } from './types.d.ts' + */ + +/** + * Resolve the repo set to capture: `repos[]` ∪ (every repo enumerated from each + * `orgs[]` entry), minus `ignore[]` (which always wins). Returns canonical + * lowercased `owner/repo` names, deduped. + * + * @param {GithubConfig} config + * @param {GithubClient} client + * @param {PluginLogger} log + * @returns {Promise} + */ +export async function resolveRepos(config, client, log) { + const ignore = new Set(config.ignore.map((r) => r.toLowerCase())) + /** @type {Set} */ + const selected = new Set() + + for (const full of config.repos) { + const key = repoKey(full) + if (key) selected.add(key) + } + + for (const org of config.orgs) { + let enumerated + try { + enumerated = await client.listOrgRepos(org) + } catch (err) { + log.error('github.org_enumeration_failed', { org, error: errMessage(err) }) + continue + } + for (const full of enumerated) { + const key = repoKey(full) + if (key) selected.add(key) + } + } + + return [...selected].filter((repo) => !ignore.has(repo)).sort() +} + +/** + * Capture every selected repo. 
`mode` controls the starting cursor: + * - `backfill` resets each repo's cursor to empty (fetch full history) but + * still persists the advanced cursor, seeding the poller so it resumes past + * history rather than re-fetching it; + * - `poll` starts from the persisted cursor (incremental). + * + * Per-repo errors are caught and recorded, not thrown, so one bad repo never + * kills a whole tick. The caller persists `cursors` after this resolves. + * + * @param {object} args + * @param {GithubClient} args.client + * @param {GithubConfig} args.config + * @param {CursorState} args.cursors + * @param {(rows: Record[]) => Promise} args.append + * @param {PluginLogger} args.log + * @param {'backfill' | 'poll'} args.mode + * @param {string[]} [args.only] restrict to these repos (e.g. `hyp github backfill owner/repo`) + * @returns {Promise<{ repos: number, events: number, errors: Array<{ repo: string, error: string }> }>} + */ +export async function captureRepos({ client, config, cursors, append, log, mode, only }) { + let repos = await resolveRepos(config, client, log) + if (only && only.length > 0) { + const onlySet = new Set(only.map((r) => r.toLowerCase())) + repos = repos.filter((r) => onlySet.has(r)) + } + + let events = 0 + /** @type {Array<{ repo: string, error: string }>} */ + const errors = [] + + for (const repo of repos) { + if (mode === 'backfill') cursors.repos[repo] = {} + const cursor = cursors.repos[repo] ?? (cursors.repos[repo] = {}) + try { + const n = await captureRepo({ client, repo, cursor, mode, append, log }) + events += n + } catch (err) { + const message = errMessage(err) + errors.push({ repo, error: message }) + log.error('github.repo_capture_failed', { repo, error: message }) + } + // Persist the advanced cursor onto the shared state after each repo. + cursors.repos[repo] = cursor + } + + return { repos: repos.length, events, errors } +} + +/** + * Capture one repo through every pass, appending `github_events` rows. 
Returns + * the number of event rows written. + * + * @param {object} args + * @param {GithubClient} args.client + * @param {string} args.repo canonical `owner/repo` + * @param {RepoCursor} args.cursor + * @param {'backfill' | 'poll'} args.mode + * @param {(rows: Record[]) => Promise} args.append + * @param {PluginLogger} args.log + * @returns {Promise} + */ +async function captureRepo({ client, repo, cursor, mode, append }) { + const [owner, name] = repo.split('/') + /** @type {Set} */ + const emitted = new Set() + /** @type {Set} */ + const prNumbers = new Set() + let written = 0 + + /** @param {Record[]} rows */ + async function flush(rows) { + const fresh = rows.filter((r) => { + const id = String(r.event_id) + if (emitted.has(id)) return false + emitted.add(id) + return true + }) + if (fresh.length === 0) return + await append(fresh) + written += fresh.length + } + + // --- issues (non-PR; PRs come from the pulls pass) --- + const issues = await client.listIssues(owner, name, cursor) + await flush( + issues + .filter((it) => !it.pull_request) + .map((it) => issueRow(repo, it)), + ) + + // --- pull requests + their sub-resources --- + const pulls = await client.listPullRequests(owner, name, cursor) + for (const pr of pulls) prNumbers.add(pr.number) + await flush(pulls.map((pr) => pullRow(repo, pr))) + + // Only descend into a PR's sub-resources when the PR is new/changed since the + // last run (backfill processes all). Bounds the per-tick N+1 over PRs. + const pullsHigh = cursor.since?.pulls + const toDescend = mode === 'backfill' ? 
pulls : pulls.filter((pr) => changedSince(pr, pullsHigh)) + for (const pr of toDescend) { + const files = await client.listPullRequestFiles(owner, name, pr.number) + await flush(files.map((path) => prFileRow(repo, pr, path))) + + const reviews = await client.listPullRequestReviews(owner, name, pr.number) + await flush(reviews.map((rv) => reviewRow(repo, pr, rv))) + + const prCommits = await client.listPullRequestCommits(owner, name, pr.number) + await flush(prCommits.map((c) => commitRow(repo, c, pr.number))) + } + advancePullsHigh(cursor, pulls) + + // --- repo-level commits + their files --- + const commits = await client.listCommits(owner, name, cursor) + await flush(commits.map((c) => commitRow(repo, c, null))) + for (const c of commits) { + const files = await client.listCommitFiles(owner, name, c.sha) + await flush(files.map((path) => commitFileRow(repo, c, path))) + } + + // --- conversation comments (on issues AND PRs) --- + const comments = await client.listIssueComments(owner, name, cursor) + await flush( + comments + .map((c) => commentRow(repo, c, prNumbers)) + .filter((r) => r !== null), + ) + + return written +} + +/* ---------------------------------------------------------------- row builders */ + +/** + * @param {string} repo + * @param {import('./types.d.ts').GithubIssue} it + * @returns {Record} + */ +function issueRow(repo, it) { + return base('issue', `issue:${repoKey(repo)}#${it.number}`, repo, { + actor_login: loginOf(it.user), + actor_type: typeOf(it.user), + number: it.number, + state: str(it.state), + created_at: str(it.created_at), + }) +} + +/** + * @param {string} repo + * @param {import('./types.d.ts').GithubPull} pr + * @returns {Record} + */ +function pullRow(repo, pr) { + const merged = pr.merged_at != null + return base('pull_request', `pr:${repoKey(repo)}#${pr.number}`, repo, { + actor_login: loginOf(pr.user), + actor_type: typeOf(pr.user), + number: pr.number, + state: merged ? 
'merged' : str(pr.state), + created_at: str(pr.created_at), + payload: { merged, draft: pr.draft === true }, + }) +} + +/** + * @param {string} repo + * @param {import('./types.d.ts').GithubPull} pr + * @param {string} path + * @returns {Record} + */ +function prFileRow(repo, pr, path) { + return base('pull_request_file', `prfile:${repoKey(repo)}#${pr.number}:${path}`, repo, { + number: pr.number, + path, + created_at: str(pr.created_at), + }) +} + +/** + * @param {string} repo + * @param {import('./types.d.ts').GithubPull} pr + * @param {import('./types.d.ts').GithubReview} rv + * @returns {Record} + */ +function reviewRow(repo, pr, rv) { + return base('review', `review:${rv.id}`, repo, { + actor_login: loginOf(rv.user), + actor_type: typeOf(rv.user), + review_id: rv.id, + review_state: str(rv.state), + pr_number: pr.number, + created_at: str(rv.submitted_at), + }) +} + +/** + * A commit row. `pr_number` is set when the commit was captured under a PR + * (enabling `PullRequest -references-> Commit`); the repo-level pass passes null. + * + * @param {string} repo + * @param {GithubCommit} c + * @param {number | null} prNumber + * @returns {Record} + */ +function commitRow(repo, c, prNumber) { + const id = prNumber == null ? `commit:${commitKey(c.sha)}` : `commit:${commitKey(c.sha)}:pr${prNumber}` + return base('commit', id, repo, { + actor_login: loginOf(c.author), + actor_type: typeOf(c.author), + sha: str(c.sha), + pr_number: prNumber, + created_at: commitDate(c), + }) +} + +/** + * @param {string} repo + * @param {GithubCommit} c + * @param {string} path + * @returns {Record} + */ +function commitFileRow(repo, c, path) { + return base('commit_file', `commitfile:${commitKey(c.sha)}:${path}`, repo, { + sha: str(c.sha), + path, + created_at: commitDate(c), + }) +} + +/** + * A conversation comment. Discriminated into `pull_request_comment` vs + * `issue_comment` by whether its subject number was seen as a PR this run. 
+ * + * @param {string} repo + * @param {import('./types.d.ts').GithubComment} c + * @param {Set} prNumbers + * @returns {Record | null} + */ +function commentRow(repo, c, prNumbers) { + const number = numberFromIssueUrl(c.issue_url) + if (number == null) return null + const onPr = prNumbers.has(number) + return base(onPr ? 'pull_request_comment' : 'issue_comment', `comment:${c.id}`, repo, { + actor_login: loginOf(c.user), + actor_type: typeOf(c.user), + number, + created_at: str(c.created_at), + }) +} + +/** + * Build a row with every `github_events` column present (the cache writer wants + * a stable shape); unset columns are null. `extra` overrides the nulls. + * + * @param {string} eventType + * @param {string} eventId + * @param {string} repo + * @param {Record} extra + * @returns {Record} + */ +function base(eventType, eventId, repo, extra) { + return { + event_id: eventId, + event_type: eventType, + repo, + actor_login: null, + actor_type: null, + number: null, + sha: null, + path: null, + review_id: null, + review_state: null, + state: null, + pr_number: null, + created_at: null, + payload: null, + ...extra, + } +} + +/* ---------------------------------------------------------------- helpers */ + +/** @param {import('./types.d.ts').GithubActor | null | undefined} actor */ +function loginOf(actor) { + return actor && typeof actor.login === 'string' ? actor.login : null +} + +/** @param {import('./types.d.ts').GithubActor | null | undefined} actor */ +function typeOf(actor) { + return actor && typeof actor.type === 'string' ? actor.type : null +} + +/** @param {GithubCommit} c @returns {string | null} */ +function commitDate(c) { + const author = c.commit && typeof c.commit.author === 'object' ? c.commit.author : null + return author && typeof author.date === 'string' ? author.date : null +} + +/** + * Extract the trailing issue/PR number from a comment's `issue_url` + * (`.../issues/123`). 
+ * + * @param {string | undefined} issueUrl + * @returns {number | null} + */ +function numberFromIssueUrl(issueUrl) { + if (!issueUrl) return null + const m = /\/issues\/(\d+)(?:$|[?#])/.exec(issueUrl) + return m ? Number(m[1]) : null +} + +/** + * @param {import('./types.d.ts').GithubPull} pr + * @param {string | undefined} high + * @returns {boolean} + */ +function changedSince(pr, high) { + if (!high) return true + const updated = updatedAt(pr) + return updated == null || updated > high +} + +/** + * @param {CursorState['repos'][string]} cursor + * @param {import('./types.d.ts').GithubPull[]} pulls + */ +function advancePullsHigh(cursor, pulls) { + let max = cursor.since?.pulls + for (const pr of pulls) { + const t = updatedAt(pr) + if (t && (!max || t > max)) max = t + } + if (max) { + if (!cursor.since) cursor.since = {} + cursor.since.pulls = max + } +} + +/** @param {import('./types.d.ts').GithubPull} pr @returns {string | null} */ +function updatedAt(pr) { + return typeof pr.updated_at === 'string' ? pr.updated_at : typeof pr.created_at === 'string' ? pr.created_at : null +} + +/** @param {unknown} err @returns {string} */ +function errMessage(err) { + return err instanceof Error ? err.message : String(err) +} diff --git a/plugins/github/src/commands.js b/plugins/github/src/commands.js new file mode 100644 index 0000000..1f7cfe7 --- /dev/null +++ b/plugins/github/src/commands.js @@ -0,0 +1,118 @@ +// @ts-check + +import { requireGithubRuntime } from './runtime.js' +import { runCaptureTick } from './tick.js' + +/** + * @import { CommandRunContext } from './types.d.ts' + */ + +/** `owner/repo` — exactly one slash, non-empty halves, no whitespace. */ +const REPO_SLUG = /^[^/\s]+\/[^/\s]+$/ + +/** + * `hyp github` — usage banner. + * + * @param {string[]} _argv + * @param {CommandRunContext} ctx + * @returns {Promise} + */ +export async function runGithub(_argv, ctx) { + ctx.stdout.write( + 'hyp github \n' + + ' backfill [owner/repo ...] 
pull full history into github_events (cold-start)\n' + + ' sync run one poll tick now (off the daemon)\n' + + "\nthen run 'hyp graph project' to project github_events into the node/edge graph\n", + ) + return 0 +} + +/** + * `hyp github backfill [owner/repo ...]` — the deliberate cold-start pull of + * full history (polling is forward-only, so a freshly-configured repo has years + * of history a poller would never see — LLP 0002). With no positional repos it + * backfills the whole configured selection. Fills `github_events` only; + * projection is a separate `hyp graph project`. + * + * @param {string[]} argv + * @param {CommandRunContext} ctx + * @returns {Promise} + */ +export async function runGithubBackfill(argv, ctx) { + const parsed = parseRepoArgv(argv) + if (!parsed.ok) { + ctx.stderr.write(`hyp github backfill: ${parsed.error}\n`) + return 2 + } + try { + const runtime = requireGithubRuntime() + const only = parsed.repos.length > 0 ? parsed.repos : undefined + + if (runtime.config.repos.length === 0 && runtime.config.orgs.length === 0) { + ctx.stderr.write('hyp github backfill: no repos configured ([github] repos / orgs are empty)\n') + return 1 + } + + const result = await runCaptureTick(runtime, { mode: 'backfill', only }) + ctx.stdout.write(`github backfill: ${result.events} event(s) across ${result.repos} repo(s)\n`) + reportErrors(ctx, result.errors) + if (only && result.repos === 0) { + ctx.stderr.write(`hyp github backfill: none of [${only.join(', ')}] are in the configured selection\n`) + return 1 + } + ctx.stdout.write("run 'hyp graph project' to project github_events into the graph\n") + return result.errors.length > 0 ? 1 : 0 + } catch (err) { + ctx.stderr.write(`hyp github backfill: ${errMessage(err)}\n`) + return 1 + } +} + +/** + * `hyp github sync` — run one poll tick now, off the daemon (the manual + * analogue of the ongoing source; for tests and demos — LLP 0002). 
+ * + * @param {string[]} _argv + * @param {CommandRunContext} ctx + * @returns {Promise} + */ +export async function runGithubSync(_argv, ctx) { + try { + const runtime = requireGithubRuntime() + const result = await runCaptureTick(runtime, { mode: 'poll' }) + ctx.stdout.write(`github sync: ${result.events} event(s) across ${result.repos} repo(s)\n`) + reportErrors(ctx, result.errors) + return result.errors.length > 0 ? 1 : 0 + } catch (err) { + ctx.stderr.write(`hyp github sync: ${errMessage(err)}\n`) + return 1 + } +} + +/** + * @param {string[]} argv + * @returns {{ ok: true, repos: string[] } | { ok: false, error: string }} + */ +function parseRepoArgv(argv) { + /** @type {string[]} */ + const repos = [] + for (const token of argv) { + if (token.startsWith('--')) return { ok: false, error: `unknown flag ${token}` } + if (!REPO_SLUG.test(token)) return { ok: false, error: `expected "owner/repo", got ${token}` } + repos.push(token) + } + return { ok: true, repos } +} + +/** + * @param {CommandRunContext} ctx + * @param {Array<{ repo: string, error: string }>} errors + */ +function reportErrors(ctx, errors) { + for (const e of errors) ctx.stderr.write(` ! ${e.repo}: ${e.error}\n`) +} + +/** @param {unknown} err @returns {string} */ +function errMessage(err) { + return err instanceof Error ? err.message : String(err) +} diff --git a/plugins/github/src/config.js b/plugins/github/src/config.js new file mode 100644 index 0000000..2651a96 --- /dev/null +++ b/plugins/github/src/config.js @@ -0,0 +1,174 @@ +// @ts-check + +/** + * Config validation for the `[github]` section. Pure and dependency-free + * (mirrors the enrich / embedder / vector-search validators): returns a + * normalized config or a list of `github_config_invalid` errors. + * + * @import { GithubConfig, GithubConfigError, GithubConfigResult } from './types.d.ts' + */ + +export const CONFIG_DEFAULTS = Object.freeze({ + // The env-var NAME the token is read from at call time — never the token + // itself. 
Config carries only the name; the value is resolved in the client + // and never logged (LLP 0001 / hypaware LLP 0028 credential posture). + token_env: 'GITHUB_TOKEN', +}) + +/** `owner/repo` — exactly one slash, non-empty halves, no whitespace. */ +const REPO_SLUG = /^[^/\s]+\/[^/\s]+$/ +/** A bare org / owner login — no slash, no whitespace. */ +const LOGIN = /^[^/\s]+$/ +/** A POSIX env-var name (what `token_env` holds — the NAME, not the secret). */ +const ENV_NAME = /^[A-Za-z_][A-Za-z0-9_]*$/ +/** A duration like `10m`, `30s`, `1h`, `500ms`. */ +const DURATION = /^(\d+)(ms|s|m|h)$/ + +/** + * @param {unknown} value + * @returns {GithubConfigResult} + */ +export function validateGithubConfig(value) { + /** @type {GithubConfigError[]} */ + const errors = [] + + if (value !== undefined && (value === null || typeof value !== 'object' || Array.isArray(value))) { + errors.push(invalid('', 'github config must be an object')) + return { ok: false, errors } + } + const raw = /** @type {Record} */ (value ?? {}) + + const repos = readSlugArray(raw, 'repos', errors) ?? [] + const orgs = readLoginArray(raw, 'orgs', errors) ?? [] + const ignore = readSlugArray(raw, 'ignore', errors) ?? [] + const token_env = readEnvName(raw, 'token_env', errors) ?? CONFIG_DEFAULTS.token_env + const poll_interval = readDuration(raw, 'poll_interval', errors) + + if (errors.length > 0) return { ok: false, errors } + + /** @type {GithubConfig} */ + const config = { + repos, + orgs, + ignore, + token_env, + ...(poll_interval !== undefined ? { poll_interval } : {}), + } + return { ok: true, config } +} + +/** + * Parse a duration string (`10m`) to milliseconds. Returns null on a malformed + * value. The source uses this to schedule its poll; validation uses it to + * reject bad `poll_interval`s up front. 
+ * + * @param {string} value + * @returns {number | null} + */ +export function parseInterval(value) { + const m = DURATION.exec(value) + if (!m) return null + const n = Number(m[1]) + const unit = m[2] + const mult = unit === 'ms' ? 1 : unit === 's' ? 1000 : unit === 'm' ? 60_000 : 3_600_000 + return n * mult +} + +/** + * @param {Record} raw + * @param {string} key + * @param {GithubConfigError[]} errors + * @returns {string[] | undefined} + */ +function readSlugArray(raw, key, errors) { + const arr = readArray(raw, key, errors) + if (arr === undefined) return undefined + /** @type {string[]} */ + const out = [] + for (const entry of arr) { + if (typeof entry !== 'string' || !REPO_SLUG.test(entry)) { + errors.push(invalid(`/${key}`, `${key} entries must be "owner/repo" strings`)) + return undefined + } + out.push(entry) + } + return out +} + +/** + * @param {Record} raw + * @param {string} key + * @param {GithubConfigError[]} errors + * @returns {string[] | undefined} + */ +function readLoginArray(raw, key, errors) { + const arr = readArray(raw, key, errors) + if (arr === undefined) return undefined + /** @type {string[]} */ + const out = [] + for (const entry of arr) { + if (typeof entry !== 'string' || !LOGIN.test(entry)) { + errors.push(invalid(`/${key}`, `${key} entries must be bare org logins (no slash)`)) + return undefined + } + out.push(entry) + } + return out +} + +/** + * @param {Record} raw + * @param {string} key + * @param {GithubConfigError[]} errors + * @returns {unknown[] | undefined} + */ +function readArray(raw, key, errors) { + const v = raw[key] + if (v === undefined) return undefined + if (!Array.isArray(v)) { + errors.push(invalid(`/${key}`, `${key} must be an array`)) + return undefined + } + return v +} + +/** + * @param {Record} raw + * @param {string} key + * @param {GithubConfigError[]} errors + * @returns {string | undefined} + */ +function readEnvName(raw, key, errors) { + const v = raw[key] + if (v === undefined) return undefined + 
if (typeof v !== 'string' || !ENV_NAME.test(v)) { + errors.push(invalid(`/${key}`, `${key} must be an environment variable NAME (e.g. "GITHUB_TOKEN"), not a token`)) + return undefined + } + return v +} + +/** + * @param {Record} raw + * @param {string} key + * @param {GithubConfigError[]} errors + * @returns {string | undefined} + */ +function readDuration(raw, key, errors) { + const v = raw[key] + if (v === undefined) return undefined + if (typeof v !== 'string' || parseInterval(v) === null) { + errors.push(invalid(`/${key}`, `${key} must be a duration like "10m", "30s", or "1h"`)) + return undefined + } + return v +} + +/** + * @param {string} pointer + * @param {string} message + * @returns {GithubConfigError} + */ +function invalid(pointer, message) { + return { pointer, message, errorKind: 'github_config_invalid' } +} diff --git a/plugins/github/src/cursors.js b/plugins/github/src/cursors.js new file mode 100644 index 0000000..c61ac23 --- /dev/null +++ b/plugins/github/src/cursors.js @@ -0,0 +1,109 @@ +// @ts-check + +import fs from 'node:fs' +import path from 'node:path' +import { randomUUID } from 'node:crypto' + +/** + * Per-repo capture cursors, persisted as a single sidecar JSON under the + * plugin's state dir (the per-session-watermark pattern from + * `@hypaware/context-graph-enrich`, applied to repos). + * + * **Why a sidecar, not `github_events` columns.** `github_events` is the + * append-only activity *skeleton* (LLP 0005) — one event per row, no mutable + * bookkeeping. A poll cursor is mutable per-repo control state, not an + * observed event, so it lives beside the table, not in it. Backfill and the + * ongoing poll share the same cursor, so a backfilled repo's poller resumes + * past history rather than re-fetching it. 
+ * + * @ref LLP 0002#cursoring [implements] — cursored per repo, shared by backfill + poll; cursors are sidecar state, not event columns + * + * @import { CursorState, RepoCursor } from './types.d.ts' + */ + +const STATE_FILE = 'github-cursors.json' +const SCHEMA_VERSION = 1 + +/** + * @param {string} stateDir + * @returns {CursorState} + */ +export function readCursors(stateDir) { + const file = path.join(stateDir, STATE_FILE) + try { + const parsed = JSON.parse(fs.readFileSync(file, 'utf8')) + if (parsed && parsed.schema_version === SCHEMA_VERSION && parsed.repos && typeof parsed.repos === 'object') { + return { schema_version: SCHEMA_VERSION, repos: readRepos(parsed.repos) } + } + } catch { + // Missing, malformed, or an older schema — start clean (a fresh poll + // re-reads from the configured horizon; backfill ignores cursors anyway). + } + return { schema_version: SCHEMA_VERSION, repos: {} } +} + +/** + * The cursor for one repo (`owner/repo`), or an empty cursor when unseen. + * + * @param {CursorState} state + * @param {string} repo + * @returns {RepoCursor} + */ +export function cursorFor(state, repo) { + return state.repos[repo] ?? 
{} +} + +/** + * @param {string} stateDir + * @param {CursorState} state + */ +export function writeCursors(stateDir, state) { + fs.mkdirSync(stateDir, { recursive: true }) + const file = path.join(stateDir, STATE_FILE) + const tmp = `${file}.tmp-${process.pid}-${randomUUID()}` + fs.writeFileSync(tmp, JSON.stringify(state, null, 2) + '\n', 'utf8') + fs.renameSync(tmp, file) +} + +/** + * @param {unknown} value + * @returns {Record} + */ +function readRepos(value) { + /** @type {Record} */ + const out = {} + for (const [repo, raw] of Object.entries(/** @type {Record} */ (value))) { + const cursor = readRepoCursor(raw) + if (cursor) out[repo] = cursor + } + return out +} + +/** + * @param {unknown} value + * @returns {RepoCursor | null} + */ +function readRepoCursor(value) { + if (!value || typeof value !== 'object') return null + const v = /** @type {Record} */ (value) + /** @type {RepoCursor} */ + const cursor = {} + if (v.since && typeof v.since === 'object') { + const s = /** @type {Record} */ (v.since) + /** @type {NonNullable} */ + const since = {} + for (const k of /** @type {const} */ (['issues', 'commits', 'comments', 'pulls'])) { + if (typeof s[k] === 'string') since[k] = /** @type {string} */ (s[k]) + } + if (Object.keys(since).length > 0) cursor.since = since + } + if (v.etag && typeof v.etag === 'object') { + /** @type {Record} */ + const etag = {} + for (const [k, e] of Object.entries(/** @type {Record} */ (v.etag))) { + if (typeof e === 'string') etag[k] = e + } + if (Object.keys(etag).length > 0) cursor.etag = etag + } + return cursor +} diff --git a/plugins/github/src/dataset.js b/plugins/github/src/dataset.js new file mode 100644 index 0000000..6d50e7c --- /dev/null +++ b/plugins/github/src/dataset.js @@ -0,0 +1,184 @@ +// @ts-check + +import path from 'node:path' + +/** + * @import { ColumnSpec, DatasetDataSourceContext, DatasetDiscoveryContext, DatasetRefreshResult, DatasetRegistration, QueryPartition, QueryStorageService, 
ExtendedQueryStorageService, AsyncDataSource } from './types.d.ts' + */ + +export const PLUGIN_NAME = '@hypaware/github' +export const DATASET_NAME = 'github_events' +export const PARTITION_LABEL = 'all' + +/** + * The thin **activity skeleton** the T0 contract reads — structural columns + * only, no content (LLP 0005). One append-only table, an `event_type` + * discriminator the rules filter on, and an atomic grain: a commit touching N + * files emits one `commit_file` row per file alongside the `commit` row, so + * every contract rule stays a clean 1:1 map (the ai-gateway `File`-rule shape). + * + * `event_type` values: + * issue | pull_request | commit | commit_file | pull_request_file | + * review | issue_comment | pull_request_comment + * + * No `Repo`/`Actor`/`File` row exists on its own — those nodes are minted by + * the contract from the columns on every event row that mentions them. Content + * (titles, bodies, diffs, comment text) stays in GitHub and is dereferenced via + * the natural keys (LLP 0003); it never lands here. 
+ * + * @ref LLP 0005#concrete-columns [implements] — typed structural columns + discriminator + small payload, no content + * @type {ReadonlyArray} + */ +export const GITHUB_EVENTS_COLUMNS = Object.freeze([ + { name: 'event_id', type: 'STRING', nullable: false }, // deterministic, for in-run dedup + readability + { name: 'event_type', type: 'STRING', nullable: false }, // the discriminator the rules WHERE on + { name: 'repo', type: 'STRING', nullable: false }, // owner/repo as GitHub returns it; the contract normalizes + { name: 'actor_login', type: 'STRING', nullable: true }, + { name: 'actor_type', type: 'STRING', nullable: true }, // User | Bot | Organization (Actor prop) + { name: 'number', type: 'INT64', nullable: true }, // issue / PR / comment subject number + { name: 'sha', type: 'STRING', nullable: true }, // commit sha (full 40-hex) + { name: 'path', type: 'STRING', nullable: true }, // changed-file relpath (commit_file / pull_request_file) + { name: 'review_id', type: 'INT64', nullable: true }, + { name: 'review_state', type: 'STRING', nullable: true }, // APPROVED | CHANGES_REQUESTED | COMMENTED (Review prop) + { name: 'state', type: 'STRING', nullable: true }, // issue/PR state: open | closed | merged + { name: 'pr_number', type: 'INT64', nullable: true }, // linkage: PR a review is on / PR that references a commit + { name: 'created_at', type: 'TIMESTAMP', nullable: true }, // event time (primary timestamp) + { name: 'payload', type: 'JSON', nullable: true }, // small type-specific structural extras (no content) +]) + +/** @type {{ columns: ColumnSpec[] }} */ +export const GITHUB_EVENTS_SCHEMA = { columns: [...GITHUB_EVENTS_COLUMNS] } + +/** + * On-disk table path for `github_events`. The plugin writes through the kernel + * cache service; the service owns durable spool + Iceberg flush details. 
+ * + * @param {QueryStorageService} storage + */ +export function githubEventsTablePath(storage) { + return storage.cacheTablePath(DATASET_NAME, [PARTITION_LABEL]) +} + +/** + * Discover the single `github_events` partition (gascity's single-partition + * shape — the cache is HypAware-managed, so discovery only surfaces the path). + * + * @param {DatasetDiscoveryContext} ctx + * @returns {QueryPartition[]} + */ +export function discoverParts(ctx) { + const cacheDir = ctx.cacheDir ?? '' + if (!cacheDir) return [] + const tablePath = path.join(cacheDir, 'datasets', DATASET_NAME, PARTITION_LABEL) + return [{ dataset: DATASET_NAME, partition: { partition: PARTITION_LABEL }, tablePath }] +} + +/** + * No external source file to refresh — capture writes rows through the cache + * service from the poll/backfill/sync paths — so report `skipped` (the sentinel + * the query layer tolerates), exactly like gascity and the graph datasets. + * + * @returns {Promise} + */ +export async function refreshPartition() { + return { status: 'skipped', rows: 0 } +} + +/** + * Build a squirreling-compatible data source over the materialized + * `github_events` rows. Returns an empty source when nothing is materialized so + * a query on a cold cache still succeeds. + * + * The kernel cache service writes capture rows in its **source-table** layout + * (`datasets/github_events/source=/table/…`), not under the bare + * `PARTITION_LABEL` path `discoverParts` hand-rolls — so reading + * `partition.tablePath` directly finds an empty directory and the contract + * projects nothing. Re-discover through the service (the same path + * `@hypaware/ai-gateway` uses for its live-ingested rows) and union whatever it + * surfaces, so flushed events are visible to queries and `graph project`. 
+ * + * @param {QueryPartition[]} partitions + * @param {DatasetDataSourceContext} ctx + * @returns {Promise} + */ +export async function createDataSource(partitions, ctx) { + const storage = /** @type {ExtendedQueryStorageService} */ (ctx.storage) + const discovered = await storage.discoverCachePartitions({ datasets: [DATASET_NAME] }) + + /** @type {Set} */ + const tablePaths = new Set() + for (const p of partitions) { + if (p.tablePath) tablePaths.add(p.tablePath) + } + for (const p of discovered) { + if (p.path) tablePaths.add(p.path) + } + + /** @type {AsyncDataSource[]} */ + const sources = [] + for (const tablePath of tablePaths) { + const source = await storage.dataSourceForTable(tablePath) + if (source && (source.numRows ?? 0) > 0) sources.push(source) + } + + if (sources.length === 0) return emptySource() + if (sources.length === 1) return sources[0] + return unionSources(sources) +} + +/** + * Concatenate multiple partition sources into one. `github_events` is + * single-source today (everything lands under `source=unknown`), but keep this + * total so a future source split stays queryable without another silent + * zero-row read. + * + * @param {AsyncDataSource[]} sources + * @returns {AsyncDataSource} + */ +function unionSources(sources) { + const columns = Array.from(new Set(sources.flatMap((s) => s.columns))) + const numRows = sources.reduce((n, s) => n + (s.numRows ?? 0), 0) + return { + columns, + numRows, + scan(options) { + return { + appliedWhere: false, + appliedLimitOffset: false, + async *rows() { + for (const s of sources) { + for await (const row of s.scan(options).rows()) yield row + } + }, + } + }, + } +} + +/** @returns {AsyncDataSource} */ +function emptySource() { + return { + columns: GITHUB_EVENTS_COLUMNS.map((c) => c.name), + numRows: 0, + scan() { + return { appliedWhere: false, appliedLimitOffset: false, async *rows() {} } + }, + } +} + +/** + * The `DatasetRegistration` handed to `ctx.query.registerDataset`. 
+ * + * @returns {DatasetRegistration} + */ +export function githubEventsDatasetRegistration() { + return { + name: DATASET_NAME, + plugin: PLUGIN_NAME, + schema: GITHUB_EVENTS_SCHEMA, + primaryTimestampColumn: 'created_at', + discoverPartitions: discoverParts, + refreshPartition, + createDataSource, + } +} diff --git a/plugins/github/src/github_client.js b/plugins/github/src/github_client.js new file mode 100644 index 0000000..bed349e --- /dev/null +++ b/plugins/github/src/github_client.js @@ -0,0 +1,265 @@ +// @ts-check + +/** + * Thin GitHub REST client. Three things it guarantees, all from the credential + * posture (LLP 0001 / hypaware LLP 0028): + * + * 1. The token is read from the configured env var **at call time** and is + * never stored on the instance beyond the request, never logged. + * 2. Error paths never copy the response **body** (which can echo a token or + * sensitive content) into the thrown error or logs — only status + the + * query-less path. + * 3. Polling is **cheap**: time-windowed endpoints carry `since`, listing + * endpoints carry `If-None-Match` (a `304` costs no rate budget). All + * cursor mechanics live here; capture just passes the per-repo cursor and + * persists it afterward (LLP 0002 §cursoring). + * + * `fetchImpl` is injectable so tests drive the client without a network. + * + * @import { GithubClient, GithubCommit, GithubComment, GithubIssue, GithubPull, GithubReview, PluginLogger, RepoCursor } from './types.d.ts' + */ + +const API_BASE = 'https://api.github.com' +const API_VERSION = '2022-11-28' +const PER_PAGE = 100 +/** Hard page cap per endpoint per run, so a runaway listing can't hang a tick. 
*/ +const MAX_PAGES = 50 + +/** + * @param {object} opts + * @param {string} opts.tokenEnv the env-var NAME the token is read from + * @param {NodeJS.ProcessEnv} opts.env + * @param {PluginLogger} opts.log + * @param {typeof fetch} [opts.fetchImpl] + * @param {string} [opts.baseUrl] + * @returns {GithubClient} + */ +export function createGithubClient({ tokenEnv, env, log, fetchImpl, baseUrl = API_BASE }) { + const doFetch = fetchImpl ?? fetch + + /** + * One request. Reads the token fresh from `env[tokenEnv]`. Never logs it; + * never reads the response body on a non-OK status. + * + * @param {string} pathAndQuery e.g. `/repos/o/r/commits?since=...` + * @param {{ etagKey?: string, cursor?: RepoCursor }} [opts] + * @returns {Promise<{ notModified: true } | { notModified: false, data: unknown, next: string | null, etag: string | null }>} + */ + async function request(pathAndQuery, opts = {}) { + const token = env[tokenEnv] + /** @type {Record} */ + const headers = { + Accept: 'application/vnd.github+json', + 'X-GitHub-Api-Version': API_VERSION, + 'User-Agent': '@hypaware/github', + } + if (token) headers.Authorization = `Bearer ${token}` + const priorEtag = opts.etagKey && opts.cursor?.etag ? opts.cursor.etag[opts.etagKey] : undefined + if (priorEtag) headers['If-None-Match'] = priorEtag + + const url = pathAndQuery.startsWith('http') ? pathAndQuery : `${baseUrl}${pathAndQuery}` + const res = await doFetch(url, { headers }) + + if (res.status === 304) return { notModified: true } + if (!res.ok) { + // No body, no token, no query string — just status + the path. 
+ const safePath = pathOf(url) + const err = /** @type {import('./types.d.ts').HypError} */ (new Error(`GitHub API ${res.status} for GET ${safePath}`)) + err.hypErrorKind = 'github_api_error' + err.status = res.status + throw err + } + + const etag = res.headers.get('etag') + if (opts.etagKey && opts.cursor && etag) { + if (!opts.cursor.etag) opts.cursor.etag = {} + opts.cursor.etag[opts.etagKey] = etag + } + const next = parseNextLink(res.headers.get('link')) + const data = await res.json() + return { notModified: false, data, next, etag } + } + + /** + * Fetch all pages of a listing. The first page is conditional (If-None-Match) + * when an `etagKey` is given; a `304` there means "unchanged" → `[]`. + * + * @param {string} firstPathAndQuery + * @param {{ etagKey?: string, cursor?: RepoCursor, label: string }} opts + * @returns {Promise} raw JSON objects (untyped at the API boundary) + */ + async function paginate(firstPathAndQuery, opts) { + /** @type {any[]} */ + const out = [] + let url = firstPathAndQuery + let first = true + for (let page = 0; page < MAX_PAGES && url; page++) { + const result = await request(url, { etagKey: first ? opts.etagKey : undefined, cursor: opts.cursor }) + if (result.notModified) return [] + if (Array.isArray(result.data)) out.push(.../** @type {Record[]} */ (result.data)) + url = result.next ?? '' + first = false + if (page === MAX_PAGES - 1 && url) { + log.warn('github.listing_truncated', { label: opts.label, max_pages: MAX_PAGES, per_page: PER_PAGE }) + } + } + return out + } + + return { + async listOrgRepos(org) { + const rows = await paginate(`/orgs/${enc(org)}/repos?per_page=${PER_PAGE}`, { label: `orgs/${org}/repos` }) + /** @type {string[]} */ + const out = [] + for (const r of rows) { + const full = typeof r.full_name === 'string' ? 
r.full_name : null + if (full) out.push(full) + } + return out + }, + + async listIssues(owner, repo, cursor) { + const q = sinceQuery(cursor.since?.issues) + const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/issues?state=all&per_page=${PER_PAGE}${q}`, { label: `${owner}/${repo}/issues`, cursor }) + advanceSince(cursor, 'issues', rows) + return /** @type {GithubIssue[]} */ (rows) + }, + + async listPullRequests(owner, repo, cursor) { + const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/pulls?state=all&per_page=${PER_PAGE}`, { label: `${owner}/${repo}/pulls`, etagKey: 'pulls', cursor }) + return /** @type {GithubPull[]} */ (rows) + }, + + async listPullRequestFiles(owner, repo, number) { + const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/pulls/${number}/files?per_page=${PER_PAGE}`, { label: `${owner}/${repo}/pulls/${number}/files` }) + return filenames(rows) + }, + + async listPullRequestReviews(owner, repo, number) { + const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/pulls/${number}/reviews?per_page=${PER_PAGE}`, { label: `${owner}/${repo}/pulls/${number}/reviews` }) + return /** @type {GithubReview[]} */ (rows) + }, + + async listPullRequestCommits(owner, repo, number) { + const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/pulls/${number}/commits?per_page=${PER_PAGE}`, { label: `${owner}/${repo}/pulls/${number}/commits` }) + return /** @type {GithubCommit[]} */ (rows) + }, + + async listCommits(owner, repo, cursor) { + const q = sinceQuery(cursor.since?.commits) + const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/commits?per_page=${PER_PAGE}${q}`, { label: `${owner}/${repo}/commits`, cursor }) + advanceSinceCommit(cursor, rows) + return /** @type {GithubCommit[]} */ (rows) + }, + + async listCommitFiles(owner, repo, sha) { + const result = await request(`/repos/${enc(owner)}/${enc(repo)}/commits/${enc(sha)}`) + if (result.notModified) return [] + const data = /** @type {Record} */ 
(result.data) + return filenames(Array.isArray(data.files) ? /** @type {Record[]} */ (data.files) : []) + }, + + async listIssueComments(owner, repo, cursor) { + const q = sinceQuery(cursor.since?.comments) + const rows = await paginate(`/repos/${enc(owner)}/${enc(repo)}/issues/comments?per_page=${PER_PAGE}${q}`, { label: `${owner}/${repo}/issues/comments`, cursor }) + advanceSince(cursor, 'comments', rows) + return /** @type {GithubComment[]} */ (rows) + }, + } +} + +/** + * @param {string | undefined} since + * @returns {string} + */ +function sinceQuery(since) { + return since ? `&since=${encodeURIComponent(since)}` : '' +} + +/** + * Advance a `since` high-water to the newest `updated_at`/`created_at` in the + * fetched rows. Using `updated_at` means an edited item re-qualifies next poll. + * + * @param {RepoCursor} cursor + * @param {'issues' | 'comments'} key + * @param {Record[]} rows + */ +function advanceSince(cursor, key, rows) { + let max = cursor.since?.[key] + for (const r of rows) { + const t = typeof r.updated_at === 'string' ? r.updated_at : typeof r.created_at === 'string' ? r.created_at : null + if (t && (!max || t > max)) max = t + } + if (max) { + if (!cursor.since) cursor.since = {} + cursor.since[key] = max + } +} + +/** + * Advance the commit `since` to the newest committer date seen. + * + * @param {RepoCursor} cursor + * @param {Record[]} rows + */ +function advanceSinceCommit(cursor, rows) { + let max = cursor.since?.commits + for (const r of rows) { + const commit = /** @type {Record | undefined} */ (r.commit) + const committer = commit && typeof commit.committer === 'object' ? /** @type {Record} */ (commit.committer) : null + const author = commit && typeof commit.author === 'object' ? /** @type {Record} */ (commit.author) : null + const t = (committer && typeof committer.date === 'string' ? committer.date : null) ?? (author && typeof author.date === 'string' ? 
author.date : null) + if (t && (!max || t > max)) max = t + } + if (max) { + if (!cursor.since) cursor.since = {} + cursor.since.commits = max + } +} + +/** + * @param {Record[]} rows + * @returns {string[]} + */ +function filenames(rows) { + /** @type {string[]} */ + const out = [] + for (const r of rows) { + if (typeof r.filename === 'string') out.push(r.filename) + } + return out +} + +/** + * Parse the `rel="next"` URL from a GitHub `Link` header. + * + * @param {string | null} link + * @returns {string | null} + */ +function parseNextLink(link) { + if (!link) return null + for (const part of link.split(',')) { + const m = /<([^>]+)>\s*;\s*rel="next"/.exec(part) + if (m) return m[1] + } + return null +} + +/** + * The path (no query string) of a URL, for safe error messages. + * + * @param {string} url + * @returns {string} + */ +function pathOf(url) { + try { + return new URL(url).pathname + } catch { + return url.split('?')[0] + } +} + +/** @param {string} segment @returns {string} */ +function enc(segment) { + return encodeURIComponent(segment) +} diff --git a/plugins/github/src/graph_contract.js b/plugins/github/src/graph_contract.js new file mode 100644 index 0000000..9d223da --- /dev/null +++ b/plugins/github/src/graph_contract.js @@ -0,0 +1,355 @@ +// @ts-check + +import { + actorKey, + commitKey, + fileKey, + issueKey, + pullRequestKey, + repoKey, + reviewKey, + str, +} from './keys.js' +import { DATASET_NAME } from './dataset.js' + +/** + * @import { ContractRule, GraphContract, GraphKit } from './types.d.ts' + */ + +/** This plugin's name, stamped on the contract for provenance/dedup. */ +export const PLUGIN_NAME = '@hypaware/github' +/** The dataset the contract reads (this plugin's own skeleton). */ +export const SOURCE_DATASET = DATASET_NAME +/** Projector id stamped into every row's provenance. */ +export const PROJECTOR = 'github.t0' +/** + * Projector version, stamped into provenance to mark which projector + * generation minted a row. 
NOT a re-projection trigger — ids are + * content-addressed (hypaware LLP 0023 §inline-provenance). + */ +export const PROJECTOR_VERSION = 1 + +/** + * Build the `github_events → graph` T0 contract: a hand-authored list of + * read-only `SELECT` + `toRow` rules realizing the LLP 0003 taxonomy. Rows are + * built with the graph plugin's `kit`, so the id recipe and provenance columns + * stay owned by the graph plugin — this plugin owns only the SQL + `toRow` + * semantics and the natural-key normalization ({@link file:./keys.js}). + * + * Four node types (`Repo`, `Actor`, `Commit`, `File`) use **bridge-ready** keys + * so a node minted by another domain converges automatically (LLP 0003/0004); + * `Issue`, `PullRequest`, `Review` are GitHub-internal. + * + * @ref LLP 0003#node-types [implements] — node/edge taxonomy + bridge-ready natural keys + * @ref hypaware LLP 0023#contract-contribution [constrained-by] — central id+provenance kit; this plugin owns only its rules + * + * @param {GraphKit} kit + * @returns {GraphContract} + */ +export function createGithubGraphContract(kit) { + const { buildNode, buildEdge } = kit.makeRowBuilders({ + sourceDataset: SOURCE_DATASET, + projector: PROJECTOR, + projectorVersion: PROJECTOR_VERSION, + }) + + /** @type {ContractRule[]} */ + const rules = [ + // ---------------------------------------------------------------- nodes + + // Repo per repo column — every event row mentions its repo. ★ bridge-ready. + { + kind: 'node', + type: 'Repo', + sql: `SELECT repo, created_at FROM ${SOURCE_DATASET}`, + toRow(r) { + const key = repoKey(r.repo) + if (!key) return null + return buildNode({ type: 'Repo', key, label: key, firstSeen: r.created_at, sourceKeys: { repo: key } }) + }, + }, + + // Actor per login, from any row carrying an actor. ★ bridge-ready. + // Identity-merge across domains (github login vs session user_id) is T1/T2, + // not here (LLP 0004 §actor-identity-is-enrichment-not-t0). 
+ { + kind: 'node', + type: 'Actor', + sql: `SELECT actor_login, actor_type, created_at FROM ${SOURCE_DATASET} WHERE actor_login IS NOT NULL`, + toRow(r) { + const key = actorKey(r.actor_login) + if (!key) return null + return buildNode({ + type: 'Actor', + key, + label: key, + props: pruned({ actor_type: str(r.actor_type) }), + firstSeen: r.created_at, + sourceKeys: { actor_login: key }, + }) + }, + }, + + // Commit per sha. ★ bridge-ready — sha is globally unique, NOT repo-qualified. + { + kind: 'node', + type: 'Commit', + sql: `SELECT sha, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit'`, + toRow(r) { + const key = commitKey(r.sha) + if (!key) return null + return buildNode({ type: 'Commit', key, firstSeen: r.created_at, sourceKeys: { sha: key } }) + }, + }, + + // File per owner/repo:relpath, from the file-grained rows. ★ bridge-ready. + { + kind: 'node', + type: 'File', + sql: `SELECT repo, path, created_at FROM ${SOURCE_DATASET} WHERE path IS NOT NULL`, + toRow(r) { + const key = fileKey(r.repo, r.path) + if (!key) return null + return buildNode({ type: 'File', key, label: basename(key), firstSeen: r.created_at, sourceKeys: { repo: repoKey(r.repo), path: str(r.path) } }) + }, + }, + + // Issue per owner/repo#number. GitHub-internal. + { + kind: 'node', + type: 'Issue', + sql: `SELECT repo, number, state, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'issue'`, + toRow(r) { + const key = issueKey(r.repo, r.number) + if (!key) return null + return buildNode({ type: 'Issue', key, props: pruned({ state: str(r.state) }), firstSeen: r.created_at, sourceKeys: { repo: repoKey(r.repo), number: numOrNull(r.number) } }) + }, + }, + + // PullRequest per owner/repo#number. GitHub-internal. 
+ { + kind: 'node', + type: 'PullRequest', + sql: `SELECT repo, number, state, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'pull_request'`, + toRow(r) { + const key = pullRequestKey(r.repo, r.number) + if (!key) return null + return buildNode({ type: 'PullRequest', key, props: pruned({ state: str(r.state) }), firstSeen: r.created_at, sourceKeys: { repo: repoKey(r.repo), number: numOrNull(r.number) } }) + }, + }, + + // Review per review/. GitHub-internal. + { + kind: 'node', + type: 'Review', + sql: `SELECT review_id, review_state, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'review'`, + toRow(r) { + const key = reviewKey(r.review_id) + if (!key) return null + return buildNode({ type: 'Review', key, props: pruned({ state: str(r.review_state) }), firstSeen: r.created_at, sourceKeys: { review_id: numOrNull(r.review_id) } }) + }, + }, + + // ---------------------------------------------------------------- edges + + // Repo membership. + repoMembership('Issue', 'issue', issueKey), + repoMembership('PullRequest', 'pull_request', pullRequestKey), + { + kind: 'edge', + type: 'in', + sql: `SELECT repo, sha, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit'`, + toRow(r) { + const repo = repoKey(r.repo) + const sha = commitKey(r.sha) + if (!repo || !sha) return null + return buildEdge({ type: 'in', srcType: 'Commit', srcKey: sha, dstType: 'Repo', dstKey: repo, firstSeen: r.created_at, sourceKeys: { sha, repo } }) + }, + }, + { + kind: 'edge', + type: 'in', + sql: `SELECT repo, path, created_at FROM ${SOURCE_DATASET} WHERE path IS NOT NULL`, + toRow(r) { + const repo = repoKey(r.repo) + const file = fileKey(r.repo, r.path) + if (!repo || !file) return null + return buildEdge({ type: 'in', srcType: 'File', srcKey: file, dstType: 'Repo', dstKey: repo, firstSeen: r.created_at, sourceKeys: { repo, path: str(r.path) } }) + }, + }, + + // Authorship / activity. 
+ actorTo('opened', 'Issue', 'issue', issueKey), + actorTo('opened', 'PullRequest', 'pull_request', pullRequestKey), + { + kind: 'edge', + type: 'authored', + sql: `SELECT actor_login, sha, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit' AND actor_login IS NOT NULL`, + toRow(r) { + const actor = actorKey(r.actor_login) + const sha = commitKey(r.sha) + if (!actor || !sha) return null + return buildEdge({ type: 'authored', srcType: 'Actor', srcKey: actor, dstType: 'Commit', dstKey: sha, firstSeen: r.created_at, sourceKeys: { actor_login: actor, sha } }) + }, + }, + actorTo('commented', 'Issue', 'issue_comment', issueKey), + actorTo('commented', 'PullRequest', 'pull_request_comment', pullRequestKey), + + // Review. + { + kind: 'edge', + type: 'submitted', + sql: `SELECT actor_login, review_id, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'review' AND actor_login IS NOT NULL`, + toRow(r) { + const actor = actorKey(r.actor_login) + const review = reviewKey(r.review_id) + if (!actor || !review) return null + return buildEdge({ type: 'submitted', srcType: 'Actor', srcKey: actor, dstType: 'Review', dstKey: review, firstSeen: r.created_at, sourceKeys: { actor_login: actor, review_id: numOrNull(r.review_id) } }) + }, + }, + { + kind: 'edge', + type: 'on', + sql: `SELECT repo, review_id, pr_number, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'review' AND pr_number IS NOT NULL`, + toRow(r) { + const review = reviewKey(r.review_id) + const pr = pullRequestKey(r.repo, r.pr_number) + if (!review || !pr) return null + return buildEdge({ type: 'on', srcType: 'Review', srcKey: review, dstType: 'PullRequest', dstKey: pr, firstSeen: r.created_at, sourceKeys: { review_id: numOrNull(r.review_id), pr_number: numOrNull(r.pr_number) } }) + }, + }, + + // Code. 
+ { + kind: 'edge', + type: 'touched', + sql: `SELECT sha, repo, path, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit_file'`, + toRow(r) { + const sha = commitKey(r.sha) + const file = fileKey(r.repo, r.path) + if (!sha || !file) return null + return buildEdge({ type: 'touched', srcType: 'Commit', srcKey: sha, dstType: 'File', dstKey: file, firstSeen: r.created_at, sourceKeys: { sha, repo: repoKey(r.repo), path: str(r.path) } }) + }, + }, + { + kind: 'edge', + type: 'touched', + sql: `SELECT repo, number, path, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'pull_request_file'`, + toRow(r) { + const pr = pullRequestKey(r.repo, r.number) + const file = fileKey(r.repo, r.path) + if (!pr || !file) return null + return buildEdge({ type: 'touched', srcType: 'PullRequest', srcKey: pr, dstType: 'File', dstKey: file, firstSeen: r.created_at, sourceKeys: { repo: repoKey(r.repo), number: numOrNull(r.number), path: str(r.path) } }) + }, + }, + + // Linkage: a PR references the commits captured under it. + { + kind: 'edge', + type: 'references', + sql: `SELECT repo, sha, pr_number, created_at FROM ${SOURCE_DATASET} WHERE event_type = 'commit' AND pr_number IS NOT NULL`, + toRow(r) { + const pr = pullRequestKey(r.repo, r.pr_number) + const sha = commitKey(r.sha) + if (!pr || !sha) return null + return buildEdge({ type: 'references', srcType: 'PullRequest', srcKey: pr, dstType: 'Commit', dstKey: sha, firstSeen: r.created_at, sourceKeys: { pr_number: numOrNull(r.pr_number), sha } }) + }, + }, + ] + + return { + name: 'github-t0', + plugin: PLUGIN_NAME, + sourceDataset: SOURCE_DATASET, + projector: PROJECTOR, + projectorVersion: PROJECTOR_VERSION, + rules, + } + + /** + * ` -in-> Repo` for an event-discriminated subject keyed on number. 
+ * + * @param {string} subjectType + * @param {string} eventType + * @param {(repo: unknown, number: unknown) => string | null} subjectKey + * @returns {ContractRule} + */ + function repoMembership(subjectType, eventType, subjectKey) { + return { + kind: 'edge', + type: 'in', + sql: `SELECT repo, number, created_at FROM ${SOURCE_DATASET} WHERE event_type = '${eventType}'`, + toRow(r) { + const repo = repoKey(r.repo) + const subject = subjectKey(r.repo, r.number) + if (!repo || !subject) return null + return buildEdge({ type: 'in', srcType: subjectType, srcKey: subject, dstType: 'Repo', dstKey: repo, firstSeen: r.created_at, sourceKeys: { repo, number: numOrNull(r.number) } }) + }, + } + } + + /** + * `Actor -<rel>-> <Subject>` for an event-discriminated subject keyed on number. + * + * @param {string} rel + * @param {string} subjectType + * @param {string} eventType + * @param {(repo: unknown, number: unknown) => string | null} subjectKey + * @returns {ContractRule} + */ + function actorTo(rel, subjectType, eventType, subjectKey) { + return { + kind: 'edge', + type: rel, + sql: `SELECT repo, number, actor_login, created_at FROM ${SOURCE_DATASET} WHERE event_type = '${eventType}' AND actor_login IS NOT NULL`, + toRow(r) { + const actor = actorKey(r.actor_login) + const subject = subjectKey(r.repo, r.number) + if (!actor || !subject) return null + return buildEdge({ type: rel, srcType: 'Actor', srcKey: actor, dstType: subjectType, dstKey: subject, firstSeen: r.created_at, sourceKeys: { actor_login: actor, repo: repoKey(r.repo), number: numOrNull(r.number) } }) + }, + } + } +} + +/** + * Basename of a `File` natural key (`owner/repo:relpath`) — the label only. + * + * @param {string} key + * @returns {string} + */ +function basename(key) { + const colon = key.indexOf(':') + const relpath = colon >= 0 ? key.slice(colon + 1) : key + const slash = relpath.lastIndexOf('/') + return slash >= 0 ? 
relpath.slice(slash + 1) : relpath +} + +/** + * Coerce a GitHub number to a JS number for `source_keys` provenance, or null. + * + * @param {unknown} value + * @returns {number | null} + */ +function numOrNull(value) { + if (typeof value === 'number' && Number.isFinite(value)) return value + if (typeof value === 'bigint') return Number(value) + if (typeof value === 'string' && /^[0-9]+$/.test(value)) return Number(value) + return null +} + +/** + * Drop null/undefined entries so identical inputs build identical props + * (keys sorted for stable JSON), matching the ai-gateway contract. + * + * @param {Record} obj + * @returns {Record} + */ +function pruned(obj) { + /** @type {Record} */ + const out = {} + for (const key of Object.keys(obj).sort()) { + if (obj[key] != null) out[key] = obj[key] + } + return out +} diff --git a/plugins/github/src/index.js b/plugins/github/src/index.js new file mode 100644 index 0000000..0d88360 --- /dev/null +++ b/plugins/github/src/index.js @@ -0,0 +1,91 @@ +// @ts-check + +import { runGithub, runGithubBackfill, runGithubSync } from './commands.js' +import { validateGithubConfig } from './config.js' +import { githubEventsDatasetRegistration } from './dataset.js' +import { createGithubGraphContract } from './graph_contract.js' +import { setGithubRuntime } from './runtime.js' +import { startGithubSource } from './source.js' + +/** + * @import { ContextGraphCapabilityLike, ExtendedQueryStorageService, HypError, PluginActivationContext, ValidationResult } from './types.d.ts' + */ + +const PLUGIN_NAME = '@hypaware/github' + +/** + * Activate `@hypaware/github`. Registers the `github` config section, validates + * config, resolves the context-graph capability, registers the `github_events` + * dataset, **registers the bundled T0 contract** with the graph, and registers + * the `github` poll source + `github backfill`/`sync` commands. 
+ * + * The graph capability is resolved **eagerly** (its `kit` + `registerContract` + * are needed now) and the plugin dependency on `@hypaware/context-graph` + * guarantees it activated first: the resolver orders by `requires.plugins`, not + * `requires.capabilities` (LLP 0001 §both-dependency-kinds; hypaware LLP 0006). + * + * @ref LLP 0001#decision [implements] — bundle the contract; require context-graph as plugin + capability; register in activate() + * + * @param {PluginActivationContext} ctx + */ +export async function activate(ctx) { + ctx.configRegistry.registerSection({ + plugin: PLUGIN_NAME, + section: 'github', + validate: (value) => toValidationResult(validateGithubConfig(value)), + }) + + const validated = validateGithubConfig(ctx.config) + if (!validated.ok) { + const detail = validated.errors.map((e) => `${e.pointer || '/'}: ${e.message}`).join('; ') + const err = /** @type {HypError} */ (new Error(`${PLUGIN_NAME}: invalid config — ${detail}`)) + err.hypErrorKind = 'github_config_invalid' + throw err + } + const config = validated.config + + const graph = /** @type {ContextGraphCapabilityLike} */ ( + ctx.requireCapability('hypaware.context-graph', '^1.0.0') + ) + + ctx.query.registerDataset(githubEventsDatasetRegistration()) + + setGithubRuntime({ + ctx, + config, + log: ctx.log, + stateDir: ctx.paths.stateDir, + storage: /** @type {ExtendedQueryStorageService} */ (ctx.storage), + env: ctx.env, + }) + + graph.registerContract(createGithubGraphContract(graph.kit)) + + ctx.sources.register({ + name: 'github', + plugin: PLUGIN_NAME, + summary: 'Ongoing poll (opt-in via poll_interval): append events past each repo cursor', + configSection: 'github', + start: startGithubSource, + }) + + ctx.commands.register({ name: 'github', plugin: PLUGIN_NAME, summary: 'GitHub capture', usage: 'hyp github ', run: runGithub }) + ctx.commands.register({ name: 'github backfill', plugin: PLUGIN_NAME, summary: 'Pull full history into github_events', usage: 'hyp github 
backfill [owner/repo ...]', run: runGithubBackfill }) + ctx.commands.register({ name: 'github sync', plugin: PLUGIN_NAME, summary: 'Run one poll tick now', usage: 'hyp github sync', run: runGithubSync }) + + ctx.log.info('github.activated', { + repos: config.repos.length, + orgs: config.orgs.length, + ignore: config.ignore.length, + poll: config.poll_interval ?? null, + }) +} + +/** + * @param {ReturnType<typeof validateGithubConfig>} result + * @returns {ValidationResult} + */ +function toValidationResult(result) { + if (result.ok) return { ok: true } + return { ok: false, errors: result.errors.map((e) => ({ pointer: e.pointer, message: e.message })) } +} diff --git a/plugins/github/src/keys.js b/plugins/github/src/keys.js new file mode 100644 index 0000000..45fbdae --- /dev/null +++ b/plugins/github/src/keys.js @@ -0,0 +1,185 @@ +// @ts-check + +/** + * Natural-key construction + normalization for the GitHub graph taxonomy. + * + * Graph node ids are content-addressed over `(kind, type, natural key)` + * (hypaware LLP 0023 §content-addressed-ids), so the key *is* the identity: + * two contracts converge iff they normalize identically. These helpers are the + * one place the rules are stated — the contract's `toRow`s call them rather + * than each re-deriving a key — so a `Repo`/`Commit`/`File`/`Actor` minted by + * another domain (the LLM-session graph) lands on the same id and merges. + * + * Changing any recipe here orphans every committed graph row that used the old + * key; `test/graph-ids.test.js` digest-pins the outputs so a change can only + * happen deliberately, with a migration. + * + * @ref LLP 0003#key-normalization-rules [implements] — owner/repo/login lowercased; relpath POSIX; sha full-40-hex lowercase + */ + +/** + * Lowercase a GitHub identifier (`owner`, `repo`, `login`). GitHub looks these + * up case-insensitively but preserves display case; lowercasing guarantees + * convergence and kills case-drift. Display case, if needed, rides in `props`. 
+ * + * @param {unknown} value + * @returns {string | null} + */ +export function normalizeLogin(value) { + const s = str(value) + return s ? s.toLowerCase() : null +} + +/** + * Repo-relative POSIX path: forward slashes, no leading slash, no `./`. The + * local-absolute → repo-relative reconciliation is the *other* side's job + * (LLP 0004); here we only canonicalize a path GitHub already returns + * repo-relative. + * + * @param {unknown} value + * @returns {string | null} + */ +export function normalizeRelpath(value) { + let s = str(value) + if (!s) return null + s = s.replace(/\\/g, '/') + while (s.startsWith('./')) s = s.slice(2) + while (s.startsWith('/')) s = s.slice(1) + return s.length > 0 ? s : null +} + +/** + * `Repo` key — `owner/repo`, lowercased. Accepts either `(owner, repo)` or a + * single `owner/repo` string. github.com is implied in V1 (no host segment — + * see LLP 0003 §multi-host). + * + * @param {unknown} ownerOrFull + * @param {unknown} [repo] + * @returns {string | null} + */ +export function repoKey(ownerOrFull, repo) { + if (repo === undefined) { + const full = str(ownerOrFull) + if (!full) return null + const slash = full.indexOf('/') + if (slash <= 0 || slash === full.length - 1) return null + return `${full.slice(0, slash).toLowerCase()}/${full.slice(slash + 1).toLowerCase()}` + } + const o = normalizeLogin(ownerOrFull) + const r = normalizeLogin(repo) + if (!o || !r) return null + return `${o}/${r}` +} + +/** + * `Actor` key — `login`, lowercased. + * + * @param {unknown} login + * @returns {string | null} + */ +export function actorKey(login) { + return normalizeLogin(login) +} + +/** + * `Commit` key — full 40-hex `sha`, lowercased; never abbreviated. The sha is + * globally unique across git, so it is NOT repo-qualified — that is what makes + * it the cleanest bridge key and lets it converge across forks (LLP 0003). 
+ * + * @param {unknown} sha + * @returns {string | null} + */ +export function commitKey(sha) { + const s = str(sha) + return s ? s.toLowerCase() : null +} + +/** + * `File` key — `owner/repo:relpath`. The repo half is normalized via + * {@link repoKey}, the path via {@link normalizeRelpath}. A rename is a new + * `File` (T0 keys path, not content). + * + * @param {unknown} repoFull `owner/repo` (any case) + * @param {unknown} relpath + * @returns {string | null} + */ +export function fileKey(repoFull, relpath) { + const rk = repoKey(repoFull) + const rp = normalizeRelpath(relpath) + if (!rk || !rp) return null + return `${rk}:${rp}` +} + +/** + * `Issue` key — `owner/repo#number`. GitHub shares one number space across + * issues+PRs per repo, so a number is *either* an issue or a PR (no collision); + * the node `type` separates them in the hash regardless (LLP 0003). + * + * @param {unknown} repoFull + * @param {unknown} number + * @returns {string | null} + */ +export function issueKey(repoFull, number) { + return numberedKey(repoFull, number) +} + +/** + * `PullRequest` key — `owner/repo#number` (see {@link issueKey} on the shared + * number space). + * + * @param {unknown} repoFull + * @param {unknown} number + * @returns {string | null} + */ +export function pullRequestKey(repoFull, number) { + return numberedKey(repoFull, number) +} + +/** + * `Review` key — `review/<id>`. The GitHub review id is globally unique. + * + * @param {unknown} reviewId + * @returns {string | null} + */ +export function reviewKey(reviewId) { + const n = intStr(reviewId) + return n ? `review/${n}` : null +} + +/** + * @param {unknown} repoFull + * @param {unknown} number + * @returns {string | null} + */ +function numberedKey(repoFull, number) { + const rk = repoKey(repoFull) + const n = intStr(number) + if (!rk || !n) return null + return `${rk}#${n}` +} + +/** + * Coerce a value to a non-empty string, or null. Numbers/bigints stringify. 
+ * + * @param {unknown} value + * @returns {string | null} + */ +export function str(value) { + if (typeof value === 'string') return value.length > 0 ? value : null + if (typeof value === 'number' || typeof value === 'bigint') return String(value) + return null +} + +/** + * Coerce a GitHub integer id/number (which may arrive as a number, bigint, or + * numeric string) to its canonical decimal string, rejecting non-integers. + * + * @param {unknown} value + * @returns {string | null} + */ +function intStr(value) { + if (typeof value === 'number') return Number.isInteger(value) ? String(value) : null + if (typeof value === 'bigint') return String(value) + if (typeof value === 'string' && /^[0-9]+$/.test(value)) return value + return null +} diff --git a/plugins/github/src/runtime.js b/plugins/github/src/runtime.js new file mode 100644 index 0000000..672346f --- /dev/null +++ b/plugins/github/src/runtime.js @@ -0,0 +1,52 @@ +// @ts-check + +import { createGithubClient } from './github_client.js' + +/** + * Module-local runtime singleton (same pattern as context-graph-enrich and + * gascity): `activate()` captures the resolved config + handles, and the daemon + * source / commands retrieve it. Keeps the source `start` and command `run` + * functions free of constructor plumbing. 
+ * + * @import { GithubClient, GithubConfig, PluginActivationContext, PluginLogger, ExtendedQueryStorageService } from './types.d.ts' + */ + +/** + * @typedef {object} GithubRuntime + * @property {PluginActivationContext} ctx + * @property {GithubConfig} config + * @property {PluginLogger} log + * @property {string} stateDir + * @property {ExtendedQueryStorageService} storage + * @property {NodeJS.ProcessEnv} env + * @property {(() => GithubClient) | undefined} [clientFactory] test seam; defaults to the real fetch client + */ + +/** @type {GithubRuntime | null} */ +let runtime = null + +/** @param {GithubRuntime} value */ +export function setGithubRuntime(value) { + runtime = value +} + +/** @returns {GithubRuntime} */ +export function requireGithubRuntime() { + if (!runtime) { + throw new Error('@hypaware/github: not activated yet — runtime singleton is empty') + } + return runtime +} + +/** + * Build a GitHub client for the current config. The token is read from the + * configured env var inside the client at request time, so a client built here + * never closes over the secret. + * + * @param {GithubRuntime} rt + * @returns {GithubClient} + */ +export function getClient(rt) { + if (rt.clientFactory) return rt.clientFactory() + return createGithubClient({ tokenEnv: rt.config.token_env, env: rt.env, log: rt.log }) +} diff --git a/plugins/github/src/source.js b/plugins/github/src/source.js new file mode 100644 index 0000000..35f5564 --- /dev/null +++ b/plugins/github/src/source.js @@ -0,0 +1,86 @@ +// @ts-check + +import { parseInterval } from './config.js' +import { requireGithubRuntime } from './runtime.js' +import { runCaptureTick } from './tick.js' + +/** + * `startGithubSource` is the `SourceContribution.start` callback. 
The poll is + * **opt-in**: it ticks only when `poll_interval` is configured; omit it and the + * source idles and capture is driven purely by `backfill` / `sync` (LLP 0002 + * §ongoing-poll-is-opt-in — capturing from an external API with a credential is + * a deliberate act that must not start itself). + * + * @ref LLP 0002#ongoing-poll-is-opt-in [implements] — poll only when poll_interval is set + * + * @import { SourceStatus, StartedSource } from './types.d.ts' + * @returns {Promise<StartedSource>} + */ +export async function startGithubSource() { + const runtime = requireGithubRuntime() + /** @type {ReturnType<typeof setInterval> | null} */ + let handle = null + /** @type {Promise<void> | null} */ + let inFlight = null + /** @type {string | null} */ + let lastTickAt = null + let rowsWritten = 0 + /** @type {string | undefined} */ + let lastError + + async function tick() { + lastTickAt = new Date().toISOString() + try { + const result = await runCaptureTick(runtime, { mode: 'poll' }) + rowsWritten += result.events + lastError = result.errors[0]?.error + } catch (err) { + lastError = err instanceof Error ? err.message : String(err) + runtime.log.error('github.poll_tick_failed', { error: lastError }) + } + } + + function startTimer() { + const interval = runtime.config.poll_interval + if (!interval) return + const ms = parseInterval(interval) + if (ms === null) return + handle = setInterval(() => { + if (inFlight) return + inFlight = tick().finally(() => { inFlight = null }) + }, Math.max(1, ms)) + if (typeof handle.unref === 'function') handle.unref() + } + + function stopTimer() { + if (handle) clearInterval(handle) + handle = null + } + + startTimer() + + return { + async status() { + const polling = handle !== null + /** @type {SourceStatus} */ + const status = { + state: polling ? 'ready' : 'stopped', + message: runtime.config.poll_interval + ? 
`polling ${runtime.config.repos.length + runtime.config.orgs.length} selection(s) every ${runtime.config.poll_interval}` + : 'idle (no poll_interval; use `hyp github backfill` / `hyp github sync`)', + details: { last_tick_at: lastTickAt }, + rowsWritten, + } + if (lastError) status.lastError = lastError + return status + }, + async reload() { + stopTimer() + startTimer() + }, + async stop() { + stopTimer() + if (inFlight) await inFlight.catch(() => {}) + }, + } +} diff --git a/plugins/github/src/tick.js b/plugins/github/src/tick.js new file mode 100644 index 0000000..e47ebfd --- /dev/null +++ b/plugins/github/src/tick.js @@ -0,0 +1,48 @@ +// @ts-check + +import { captureRepos } from './capture.js' +import { readCursors, writeCursors } from './cursors.js' +import { GITHUB_EVENTS_COLUMNS, githubEventsTablePath } from './dataset.js' +import { getClient } from './runtime.js' + +/** + * Run one capture tick: read the per-repo cursors, capture every selected repo + * (appending `github_events` rows through the kernel cache), then persist the + * advanced cursors. Shared by the daemon poll source and the `sync`/`backfill` + * commands — the only difference is `mode` (and the optional `only` filter). + * + * Cursors are persisted even when a repo errors mid-run, so progress is never + * lost (the next tick resumes past what was captured). 
+ * + * @import { GithubRuntime } from './runtime.js' + * + * @param {GithubRuntime} runtime + * @param {{ mode: 'backfill' | 'poll', only?: string[] }} opts + * @returns {Promise<{ repos: number, events: number, errors: Array<{ repo: string, error: string }> }>} + */ +export async function runCaptureTick(runtime, opts) { + const cursors = readCursors(runtime.stateDir) + const client = getClient(runtime) + const tablePath = githubEventsTablePath(runtime.storage) + const columns = [...GITHUB_EVENTS_COLUMNS] + + /** @param {Record[]} rows */ + async function append(rows) { + if (rows.length === 0) return + await runtime.storage.appendRows(tablePath, columns, rows) + } + + try { + return await captureRepos({ + client, + config: runtime.config, + cursors, + append, + log: runtime.log, + mode: opts.mode, + only: opts.only, + }) + } finally { + writeCursors(runtime.stateDir, cursors) + } +} diff --git a/plugins/github/src/types.d.ts b/plugins/github/src/types.d.ts new file mode 100644 index 0000000..55c9064 --- /dev/null +++ b/plugins/github/src/types.d.ts @@ -0,0 +1,243 @@ +// Shared interfaces for the @hypaware/github plugin. +// +// Two kinds of types live here, kept apart deliberately: +// +// 1. Kernel types — re-exported from the host runtime's published +// `collectivus-plugin-kernel-types.d.ts`. This file is the SINGLE place +// that references the sibling `hypaware` repo, so the rest of `src/` +// imports kernel shapes from `./types.d.ts` and never reaches across +// repos directly. The sibling layout (`~/workspace/hypaware`) is the +// documented assumption for this plugin (see AGENTS.md / LLP 0000). +// +// 2. Capability + plugin-local types — declared here directly. 
Like +// `@hypaware/ai-gateway-graph`, this plugin RE-DECLARES the +// `hypaware.context-graph` capability shape rather than importing the +// provider's internals: a contract author consumes only the kit's public +// surface, so the id recipe + provenance columns stay owned by the graph +// plugin (hypaware LLP 0023 §contract-contribution). + +import type { + AsyncDataSource, + ColumnSpec, + CommandRunContext, + DatasetDataSourceContext, + DatasetDiscoveryContext, + DatasetRefreshResult, + DatasetRegistration, + HypError, + JsonObject, + PluginActivationContext, + PluginLogger, + QueryPartition, + QueryStorageService, + SourceStatus, + StartedSource, + ValidationError, + ValidationResult, +} from '../../../../hypaware/collectivus-plugin-kernel-types.d.ts' + +export type { + AsyncDataSource, + ColumnSpec, + CommandRunContext, + DatasetDataSourceContext, + DatasetDiscoveryContext, + DatasetRefreshResult, + DatasetRegistration, + HypError, + JsonObject, + PluginActivationContext, + PluginLogger, + QueryPartition, + QueryStorageService, + SourceStatus, + StartedSource, + ValidationError, + ValidationResult, +} + +// --------------------------------------------------------------------------- +// Storage: the extended cache surface the kernel hands plugins. `AsyncDataSource` +// is the kernel's own re-export of the `squirreling` type (so a dataset's +// `createDataSource` return matches `DatasetRegistration`); only the +// `dataSourceForTable` extension is added here. +// --------------------------------------------------------------------------- + +/** `QueryStorageService` plus the `dataSourceForTable` extension used by datasets. */ +export type ExtendedQueryStorageService = QueryStorageService & { + dataSourceForTable(tablePath: string): Promise +} + +// --------------------------------------------------------------------------- +// `hypaware.context-graph` capability shape (re-declared, not imported). 
+// --------------------------------------------------------------------------- + +export type GraphRow = Record + +/** One T0 contract rule: a read-only SELECT plus a row mapper. */ +export interface ContractRule { + kind: 'node' | 'edge' + type: string + sql: string + toRow(row: Record): GraphRow | null +} + +/** Per-node spec a `toRow` hands `buildNode` (identity + optional display + provenance keys). */ +export interface NodeSpec { + type: string + key: string + label?: string | null + props?: Record + firstSeen: unknown + sourceKeys: Record +} + +/** Per-edge spec a `toRow` hands `buildEdge` (endpoints + relation type + provenance keys). */ +export interface EdgeSpec { + type: string + srcType: string + srcKey: string + dstType: string + dstKey: string + firstSeen: unknown + sourceKeys: Record +} + +/** The row builders the kit hands a contract author (id recipe + provenance live in the graph plugin). */ +export interface GraphRowBuilders { + buildNode(spec: NodeSpec): GraphRow + buildEdge(spec: EdgeSpec): GraphRow +} + +/** The shared authoring kit exposed on the `hypaware.context-graph` capability. */ +export interface GraphKit { + nodeId(type: string, naturalKey: string): string + edgeId(srcId: string, type: string, dstId: string): string + makeRowBuilders(meta: { + sourceDataset: string + projector: string + projectorVersion: number + }): GraphRowBuilders +} + +/** The `hypaware.context-graph` capability value, as this plugin consumes it. */ +export interface ContextGraphCapabilityLike { + registerContract(contract: unknown): void + kit: GraphKit +} + +/** The contract object `registerContract` accepts. */ +export interface GraphContract { + name: string + plugin: string + sourceDataset: string + projector: string + projectorVersion: number + rules: ContractRule[] +} + +// --------------------------------------------------------------------------- +// Plugin-local config. 
+// --------------------------------------------------------------------------- + +export interface GithubConfig { + repos: string[] + orgs: string[] + ignore: string[] + poll_interval?: string + token_env: string +} + +export interface GithubConfigError { + pointer: string + message: string + errorKind: 'github_config_invalid' +} + +export type GithubConfigResult = + | { ok: true; config: GithubConfig } + | { ok: false; errors: GithubConfigError[] } + +// --------------------------------------------------------------------------- +// Per-repo capture cursor (sidecar state — see LLP 0002 §cursoring). +// --------------------------------------------------------------------------- + +/** + * One repo's poll cursor: `since` high-water marks for the time-windowed + * endpoints (issues/commits/comments) and an ETag map for the listing + * endpoints that lack `since`. Shared by backfill + poll so a backfilled repo's + * poller resumes past history rather than re-fetching it. + */ +export interface RepoCursor { + since?: { issues?: string; commits?: string; comments?: string; pulls?: string } + etag?: Record +} + +export interface CursorState { + schema_version: number + repos: Record +} + +// --------------------------------------------------------------------------- +// GitHub API client surface (capture.js depends on this, not on `fetch`, so +// tests inject a fake). +// --------------------------------------------------------------------------- + +/** A normalized GitHub actor (the `user`/`author` object subset we keep). */ +export interface GithubActor { + login: string + type?: string +} + +export interface GithubClient { + /** Enumerate the repos (`owner/repo`) the token can see in an org. */ + listOrgRepos(org: string): Promise + /** Issues (state=all). PRs are filtered out by the caller via `isPullRequest`. 
*/ + listIssues(owner: string, repo: string, cursor: RepoCursor): Promise + listPullRequests(owner: string, repo: string, cursor: RepoCursor): Promise + listPullRequestFiles(owner: string, repo: string, number: number): Promise + listPullRequestReviews(owner: string, repo: string, number: number): Promise + listPullRequestCommits(owner: string, repo: string, number: number): Promise + listCommits(owner: string, repo: string, cursor: RepoCursor): Promise + listCommitFiles(owner: string, repo: string, sha: string): Promise + listIssueComments(owner: string, repo: string, cursor: RepoCursor): Promise +} + +export interface GithubIssue { + number: number + state?: string + created_at?: string + user?: GithubActor | null + pull_request?: unknown +} + +export interface GithubPull { + number: number + state?: string + merged_at?: string | null + draft?: boolean + created_at?: string + updated_at?: string + user?: GithubActor | null +} + +export interface GithubReview { + id: number + state?: string + submitted_at?: string + user?: GithubActor | null +} + +export interface GithubCommit { + sha: string + author?: GithubActor | null + commit?: { author?: { date?: string } | null } | null +} + +export interface GithubComment { + id: number + created_at?: string + user?: GithubActor | null + /** `.../issues/{number}` — the subject the comment is attached to. */ + issue_url?: string +} diff --git a/src/boot.js b/src/boot.js index 1cfa5db..2d0e7b1 100644 --- a/src/boot.js +++ b/src/boot.js @@ -48,11 +48,50 @@ function pluginSelection(config) { }, } : { dir: path.join(bundledWorkspaceDir, 'local-fs'), config: { exports_dir: config.archive.dir } }, + // Graph plugins: activated for their dataset / contract / command + // registrations so the server can hold a github_events skeleton and + // run the T0 projection + neighbors over its own cache, where the + // forwarded LLM logs also live. 
context-graph must precede github + // (github requires it); resolveDependencies derives that ordering from + // github's requires.plugins regardless of position in this array. The + // github poll source stays dormant — github registers but never starts + // it, and the wrapped registry below would suppress it anyway. + // @ref LLP 0010#in-kernel-graph [implements] — context-graph + github load into the server kernel; capture is an admin one-shot, not a daemon source + { dir: path.join(bundledWorkspaceDir, 'context-graph'), config: {} }, + { dir: path.join(SERVER_PLUGINS_DIR, 'github'), config: githubSection(config) }, + // The cross-source connector: maps the forwarded ai_gateway_messages into + // the SAME node/edge graph. Pure contract registration (no source, no + // config); requires context-graph. Its bridge-ready Repo/Commit/File keys + // converge by content-addressed id with github's, so LLM sessions and + // GitHub activity join in one graph (hypaware LLP 0032 git-bridge). + // @ref LLP 0010#in-kernel-graph [implements] — load the ai_gateway_messages→graph contract so projection spans both sources + { dir: path.join(bundledWorkspaceDir, 'ai-gateway-graph'), config: {} }, { dir: path.join(SERVER_PLUGINS_DIR, 'control-plane'), config: {} }, ] return { plugins, destination } } +/** + * The `[github]` config section injected into the vendored github source + * plugin. Repo selection comes from server env (HYPSERVER_GITHUB_*); the + * token value never lands in config — the github client reads it from the + * box env by the name in `token_env` at request time. No `poll_interval`: + * capture runs only as the admin-triggered backfill one-shot, never a poll. 
+ * + * @ref LLP 0010#server-pulls-github [implements] — server pulls GitHub directly; repo selection from env, token stays in box env + * @param {ServerConfig} config + * @returns {Record} + */ +function githubSection(config) { + const gh = config.github + return { + orgs: gh.orgs, + repos: gh.repos, + ignore: gh.ignore, + token_env: gh.tokenEnv, + } +} + /** * Boot the kernel for server use: a fresh kernel runtime with the * catalog-backed registry injected, the bundled + server plugin set diff --git a/src/catalog/registry.js b/src/catalog/registry.js index 2fb4c43..b8ba8ac 100644 --- a/src/catalog/registry.js +++ b/src/catalog/registry.js @@ -10,6 +10,19 @@ import { STAMPED_COLUMNS } from './catalog.js' * @import { ArchiveHandle, Catalog } from '../types.d.ts' */ +/** + * Plugins whose datasets are server-side-generated and manage their own cache + * layout + read closures — never wire-ingested. The registry keeps their + * `discoverPartitions`/`createDataSource` instead of synthesizing the date= + * partition reads it uses for forwarded ingest datasets (logs/traces/metrics/ + * ai_gateway_messages, which arrive over the wire and land in date= partitions + * via the mover). `@hypaware/ai-gateway` also ships a createDataSource, so the + * split must be by plugin, not by closure presence. 
+ * + * @ref LLP 0010#self-managed-datasets [implements] — graph datasets keep their own read path; the date-partition synthesis is only for forwarded ingest + */ +const SELF_MANAGED_PLUGINS = new Set(['@hypaware/server', '@hypaware/context-graph', '@hypaware/github']) + /** * Catalog-backed implementation of the kernel's three-method * QueryRegistry interface, injected via @@ -130,9 +143,11 @@ export function createCatalogRegistry({ catalog, getArchive }) { return { registerDataset(dataset) { - if (dataset.plugin === '@hypaware/server' && typeof dataset.createDataSource === 'function' && !catalog.get(dataset.name)) { - // Server-origin datasets with bespoke read paths (the gateway - // registry) keep their own closures and skip the catalog. + if (SELF_MANAGED_PLUGINS.has(dataset.plugin) && typeof dataset.createDataSource === 'function' && !catalog.get(dataset.name)) { + // Server-origin and graph datasets with bespoke read paths (the + // gateway registry; github_events; node/edge) keep their own + // closures and skip the catalog — their cache layout is source=/ + // graph_v1 partitioned, not the date= layout the catalog synthesizes. custom.set(dataset.name, dataset) return } diff --git a/src/config.js b/src/config.js index 2328ebc..dcff5da 100644 --- a/src/config.js +++ b/src/config.js @@ -73,9 +73,31 @@ export function loadServerConfig(env = process.env) { archiveSchedule: env.HYPSERVER_ARCHIVE_SCHEDULE ?? '0 * * * *', archiveCloseLagMs: intEnv(env.HYPSERVER_ARCHIVE_CLOSE_LAG_MS, 3_600_000), maxBodyBytes: intEnv(env.HYPSERVER_MAX_BODY_BYTES, 64 * 1024 * 1024), + // @ref LLP 0010#server-pulls-github [implements] — server-side GitHub capture selection; the token value stays in the box env (read by name at request time), never in config + github: { + orgs: listEnv(env.HYPSERVER_GITHUB_ORGS), + repos: listEnv(env.HYPSERVER_GITHUB_REPOS), + ignore: listEnv(env.HYPSERVER_GITHUB_IGNORE), + tokenEnv: env.HYPSERVER_GITHUB_TOKEN_ENV ?? 
'GITHUB_TOKEN', + }, } } +/** + * Split a comma-separated env value into a list of trimmed, non-empty + * entries; whitespace alone is not a separator. Absent or blank yields `[]`. + * + * @param {string | undefined} raw + * @returns {string[]} + */ +function listEnv(raw) { + if (!raw) return [] + return raw + .split(',') + .map((s) => s.trim()) + .filter((s) => s.length > 0) +} + /** * @param {string | undefined} raw * @param {number} fallback diff --git a/src/configs/save-pipeline.js b/src/configs/save-pipeline.js index 6ac8704..d994036 100644 --- a/src/configs/save-pipeline.js +++ b/src/configs/save-pipeline.js @@ -149,7 +149,7 @@ async function pinPlugins({ document, bundled, known, pinStateDir, log }) { // Pre-pinned documents (the air-gap escape hatch) are validated, // never re-resolved: an operator copying a pinned document from a // client lock file or another server must not need network. - if (typeof entry.content_hash === 'string' && entry.content_hash.length > 0) { + if (typeof entry.artifact_hash === 'string' && entry.artifact_hash.length > 0) { const problem = validatePrePinned(entry) if (problem) { return { ok: false, error: 'plugin_pin_invalid', detail: `${entry.name}: ${problem}` } @@ -217,19 +217,23 @@ async function resolveAndPin({ entry, pinStateDir, log }) { log.info('config.plugin_pinned', { plugin: entry.name, version: fetched.manifest.version, - content_hash: fetched.contentHash, + // The served pin field is `artifact_hash` (the content hash the client + // verifies the staged artifact against); keep the log key in step with it. + artifact_hash: fetched.contentHash, }) return { ok: true, - // The same fields the client lock file records: the config names - // exactly one artifact, and a tampered artifact fails the hash - // check client-side.
+ // The served config carries the kernel's config-wire pin fields, not + // a lock-file entry: `version` + `artifact_hash` (the content hash + // the client verifies against the staged artifact at apply — + // hypaware config/apply_deps.js). `source` is deliberately NOT set + // here so the entry keeps the operator's raw source *string*; the + // client re-resolves it. Storing the resolved object spec (kind/raw/ + // path) would make the served document fail the kernel's own config + // shape parser, which requires `source` to be a string. pin: { version: fetched.manifest.version, - source, - ...(fetched.resolvedRef ? { resolved_ref: fetched.resolvedRef } : {}), - content_hash: fetched.contentHash, - manifest_hash: fetched.manifestHash, + artifact_hash: fetched.contentHash, }, metadata: metadataFromManifest(fetched.manifest), } @@ -268,11 +272,15 @@ function validatePrePinned(entry) { if (typeof entry.version !== 'string' || entry.version.length === 0) { return 'pre-pinned entries must carry a version' } - if (!entry.source || typeof entry.source !== 'object' || typeof entry.source.kind !== 'string') { - return 'pre-pinned entries must carry a resolved source spec (object with kind)' + if (typeof entry.artifact_hash !== 'string' || entry.artifact_hash.length === 0) { + return 'pre-pinned entries must carry an artifact_hash' } - if (typeof entry.manifest_hash !== 'string' || entry.manifest_hash.length === 0) { - return 'pre-pinned entries must carry a manifest_hash' + // `source` is optional (first-party names re-resolve from the name), + // but when present it is the operator's raw source string the client + // re-resolves — never the resolved object spec. parseConfigShape has + // already enforced string-ness; this keeps the contract local + loud. 
+ if (entry.source !== undefined && (typeof entry.source !== 'string' || entry.source.length === 0)) { + return 'pre-pinned entries must carry a string source when present' } return null } diff --git a/src/daemon.js b/src/daemon.js index a750d6d..c4b53f6 100644 --- a/src/daemon.js +++ b/src/daemon.js @@ -5,7 +5,19 @@ import fs from 'node:fs' import { getLogger, installObservability } from 'hypaware/core' import { executeQuerySql } from 'hypaware/core/query' -import { cronMatches, maintainCache, normalizeMaintenanceConfig } from './kernel/shim.js' +import { + cronMatches, + maintainCache, + normalizeMaintenanceConfig, + projectGraph, + queryNeighbors, + requireGraphRuntime, +} from './kernel/shim.js' +// Vendored github source plugin (LLP 0010): imported by the SAME in-repo path +// the kernel loader uses, so this is the module instance activate() populated. +import { requireGithubRuntime } from '../plugins/github/src/runtime.js' +import { runCaptureTick } from '../plugins/github/src/tick.js' +import { githubEventsTablePath } from '../plugins/github/src/dataset.js' import { loadServerConfig } from './config.js' import { bootServerKernel } from './boot.js' import { createCatalog } from './catalog/catalog.js' @@ -136,6 +148,46 @@ export async function runServerDaemon(opts = {}) { return { columns: result.columns, rows: result.rows, datasets: result.datasets } } + // Server-side graph operations (LLP 0010): the graph analogue of the + // mover/archive/eviction escape hatches above. github capture is a + // one-shot (the poll source stays dormant); projection and neighbors run + // the graph plugin's own pure functions over the same kernel query + + // storage handles executeSql uses, so they read the forwarded LLM logs + // and github_events in one cache. 
+ // @ref LLP 0010#in-kernel-graph [implements] — capture one-shot + projection/neighbors over the server kernel, not the SQL-only attach + services.githubBackfill = async (opts = {}) => { + const report = await runCaptureTick(requireGithubRuntime(), { mode: 'backfill', only: opts.repos }) + // appendRows buffers in the cache writer; flush so the freshly captured + // github_events are visible to graph project / admin SQL on this same + // kernel — the mover flushes its spooled writes the same way. + await storage.flushTable(githubEventsTablePath(storage), { reason: 'server_github_backfill', force: true }) + return report + } + services.graphProject = async (opts = {}) => { + const all = requireGraphRuntime().registry.list() + const contracts = opts.source ? all.filter((c) => c.sourceDataset === opts.source) : all + return projectGraph({ + query: runtime.query, + storage: runtime.storage, + contracts, + config: /** @type {any} */ (hypConfig), + dryRun: opts.dryRun === true, + }) + } + services.graphNeighbors = async (args) => { + return queryNeighbors({ + query: runtime.query, + storage: runtime.storage, + config: /** @type {any} */ (hypConfig), + seed: args.node, + depth: args.depth, + type: args.type, + edgeTypes: args.edgeTypes, + direction: args.direction, + limit: args.limit, + }) + } + // ----- Loops ----- /** @type {NodeJS.Timeout[]} */ const timers = [] @@ -178,9 +230,10 @@ export async function runServerDaemon(opts = {}) { stopped = true for (const timer of timers) clearInterval(timer) // Drain what we can on the way down; spool durability means an - // unclean exit here loses nothing either way. + // unclean exit here loses nothing either way. drain() (not tick()) + // so a background tick already in flight can't make this a no-op. 
try { - await mover.tick() + await mover.drain() } catch { // shutdown is best-effort } diff --git a/src/http/routes-admin.js b/src/http/routes-admin.js index 64a9b99..a055a74 100644 --- a/src/http/routes-admin.js +++ b/src/http/routes-admin.js @@ -92,7 +92,9 @@ export async function handleAdmin(ctx) { // Operational escape hatches: run the mover / archive export / // eviction immediately instead of waiting for their schedules. if (url.pathname === '/v1/admin/mover/run' && req.method === 'POST') { - const moved = await services.mover.tick() + // drain(), not tick(): "run now" must guarantee every already-spooled + // row is committed before we ack, even if a background tick is mid-pass. + const moved = await services.mover.drain() return sendJson(res, 200, { moved }) } if (url.pathname === '/v1/admin/archive/run' && req.method === 'POST') { @@ -105,9 +107,88 @@ export async function handleAdmin(ctx) { return sendJson(res, 200, report) } + // Server-side graph operations: pull GitHub into github_events, run the T0 + // projection, and walk neighbors — the graph analogue of the mover/archive + // escape hatches, executed on the server's own kernel rather than the + // SQL-only attach surface. + // @ref LLP 0010#admin-operations [implements] — graph capture/project/neighbors are admin operations on the server kernel, the precedent set by LLP 0006 + if (url.pathname === '/v1/admin/github/backfill' && req.method === 'POST') { + const body = await parseJson(req, services.config.maxBodyBytes) ?? {} + const repos = normalizeRepoList(body.repos) + if (repos === null) { + return sendJson(res, 400, { error: 'invalid_repos', detail: 'repos must be an array of "owner/repo" strings' }) + } + try { + const report = await services.githubBackfill(repos.length > 0 ? 
{ repos } : {}) + return sendJson(res, 200, report) + } catch (err) { + return sendJson(res, 400, { error: 'github_backfill_failed', detail: errText(err) }) + } + } + if (url.pathname === '/v1/admin/graph/project' && req.method === 'POST') { + const body = await parseJson(req, services.config.maxBodyBytes) ?? {} + try { + const report = await services.graphProject({ + source: typeof body.source === 'string' ? body.source : undefined, + dryRun: body.dry_run === true, + }) + return sendJson(res, 200, report) + } catch (err) { + return sendJson(res, 400, { error: 'graph_project_failed', detail: errText(err) }) + } + } + if (url.pathname === '/v1/admin/graph/neighbors' && req.method === 'POST') { + const body = await parseJson(req, services.config.maxBodyBytes) ?? {} + if (typeof body.node !== 'string' || body.node.length === 0) { + return sendJson(res, 400, { error: 'node_required' }) + } + /** @type {'out' | 'in' | 'both' | undefined} */ + const direction = body.direction === 'in' || body.direction === 'both' || body.direction === 'out' ? body.direction : undefined + try { + const result = await services.graphNeighbors({ + node: body.node, + depth: typeof body.depth === 'number' ? body.depth : undefined, + type: typeof body.type === 'string' ? body.type : undefined, + edgeTypes: Array.isArray(body.edge_types) ? body.edge_types.map(String) : undefined, + direction, + limit: typeof body.limit === 'number' ? body.limit : undefined, + }) + // A failed traversal (seed not found / ambiguous) is a 400 with the + // candidate detail the query returns, mirroring handleQuery. + return sendJson(res, result && result.ok === false ? 400 : 200, result) + } catch (err) { + return sendJson(res, 400, { error: 'graph_neighbors_failed', detail: errText(err) }) + } + } + return sendJson(res, 404, { error: 'unknown_path' }) } +/** + * Validate an optional `repos` body field for github backfill: absent → `[]` + * (whole configured selection), otherwise an array of `owner/repo` strings. 
+ * Returns `null` on any malformed entry so the caller can 400. + * + * @param {unknown} raw + * @returns {string[] | null} + */ +function normalizeRepoList(raw) { + if (raw === undefined || raw === null) return [] + if (!Array.isArray(raw)) return null + /** @type {string[]} */ + const out = [] + for (const item of raw) { + if (typeof item !== 'string' || !/^[^/\s]+\/[^/\s]+$/.test(item)) return null + out.push(item) + } + return out +} + +/** @param {unknown} err @returns {string} */ +function errText(err) { + return err instanceof Error ? err.message : String(err) +} + /** * `POST /v1/query` — remote SQL executed by the server's kernel query * plane, transparently spanning the cache tier and the Iceberg diff --git a/src/ingest/mover.js b/src/ingest/mover.js index 3cf6dc3..d6d5e7d 100644 --- a/src/ingest/mover.js +++ b/src/ingest/mover.js @@ -20,30 +20,59 @@ * }} args */ export function createMover({ spool, catalog, storage, log }) { - let running = false + /** + * The in-flight pass, if any. A single slot serializes passes so two + * never drain the same spool file concurrently — and lets a guaranteed + * caller wait one out before starting its own. + * @type {Promise | null} + */ + let inflight = null + + /** One pass over every currently-pending spool file. */ + async function pass() { + let moved = 0 + for (const pending of spool.pending()) { + try { + moved += await drainFile(pending) + } catch (err) { + const message = err instanceof Error ? err.message : String(err) + log.warn('mover.drain_failed', { dataset: pending.dataset, gateway: pending.gateway, message }) + } + } + return moved + } + + /** Start a pass and own clearing the in-flight slot when it settles. The bookkeeping chain swallows its own copy of a rejection, so a failed pass surfaces only through the returned promise and never as an unhandled rejection. */ + function startPass() { + const p = pass() + inflight = p + void p.finally(() => { if (inflight === p) inflight = null }).catch(() => {}) + return p + } return { /** - * One mover pass over all pending spool files. Returns the number - * of rows committed to the cache.
+ * Opportunistic pass for the background timer: if one is already + * running, skip rather than queue — a dropped timer tick costs + * nothing because spool durability means the next tick still drains. + * Returns the number of rows committed to the cache (0 if skipped). */ async tick() { - if (running) return 0 - running = true - let moved = 0 - try { - for (const pending of spool.pending()) { - try { - moved += await drainFile(pending) - } catch (err) { - const message = err instanceof Error ? err.message : String(err) - log.warn('mover.drain_failed', { dataset: pending.dataset, gateway: pending.gateway, message }) - } - } - } finally { - running = false - } - return moved + if (inflight) return 0 + return startPass() + }, + + /** + * Guaranteed drain for callers that must observe every row spooled + * *before* this call committed before it returns — the admin + * `mover/run` escape hatch and shutdown. A skip would be a silent + * lie there, so wait out any in-flight pass (whose `pending()` + * snapshot may predate the caller's rows) and then run a fresh pass + * that is guaranteed to see them. + */ + async drain() { + while (inflight) await inflight.catch(() => {}) + return startPass() }, } diff --git a/src/kernel/shim.js b/src/kernel/shim.js index b8a4d9b..91d20bc 100644 --- a/src/kernel/shim.js +++ b/src/kernel/shim.js @@ -47,3 +47,21 @@ export const fetchPlugin = pluginFetch.fetchPlugin /** Bundled plugin workspace shipped inside the hypaware package. */ export const bundledWorkspaceDir = path.join(kernelRoot, 'hypaware-core', 'plugins-workspace') + +// Server-side graph: the @hypaware/context-graph plugin ships inside the +// bundled workspace. These are plugin functions (pure projection + traversal, +// plus the contract-registry singleton accessor) rather than kernel host +// internals — but the server invokes them the same disciplined way, through +// this one module. 
Crucially, they are anchored on the SAME bundledWorkspaceDir +// the loader used to activate the plugin, so the module URLs are identical and +// `requireGraphRuntime()` returns the very registry singleton that github's +// activate() populated. Importing here doesn't require activation; the accessor +// only throws if called before boot completes (we call it at request time). +// @ref LLP 0010#in-kernel-graph [implements] — invoke the graph plugin's own pure functions over the server kernel's query+storage handles +const contextGraphSrc = path.join(bundledWorkspaceDir, 'context-graph', 'src') +const ctxGraphProject = await import(pathToFileURL(path.join(contextGraphSrc, 'project.js')).href) +const ctxGraphQuery = await import(pathToFileURL(path.join(contextGraphSrc, 'query.js')).href) +const ctxGraphRuntime = await import(pathToFileURL(path.join(contextGraphSrc, 'runtime.js')).href) +export const projectGraph = ctxGraphProject.projectGraph +export const queryNeighbors = ctxGraphQuery.queryNeighbors +export const requireGraphRuntime = ctxGraphRuntime.requireGraphRuntime diff --git a/src/types.d.ts b/src/types.d.ts index 15b173c..7275c75 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -50,6 +50,24 @@ export interface ServerConfig { archiveSchedule: string archiveCloseLagMs: number maxBodyBytes: number + /** Server-side GitHub capture selection (LLP 0010). */ + github: GithubServerConfig +} + +/** + * Selection for the in-kernel github source plugin (LLP 0010). The GitHub + * token is NOT here: `tokenEnv` names the box env var the github client + * reads at request time, so the secret never lands in config. + */ +export interface GithubServerConfig { + /** Org logins to enumerate every repo from. */ + orgs: string[] + /** Explicit `owner/repo` entries. */ + repos: string[] + /** `owner/repo` entries to exclude (forward-only ignore). */ + ignore: string[] + /** Name of the box env var holding the GitHub token (default `GITHUB_TOKEN`). 
*/ + tokenEnv: string } export interface ArchiveConfig { @@ -190,10 +208,44 @@ export interface ServerServices { archiveScheduler: ReturnType eviction: ReturnType executeSql(query: string): Promise + /** + * Pull GitHub history for the configured selection into `github_events` + * (admin one-shot; LLP 0010). `repos` narrows to a subset of the selection. + */ + githubBackfill(opts?: { repos?: string[] }): Promise + /** Run the T0 projection over the registered contracts (LLP 0010). */ + graphProject(opts?: { source?: string; dryRun?: boolean }): Promise + /** Walk the node/edge graph from a seed node (LLP 0010). */ + graphNeighbors(args: GraphNeighborsArgs): Promise /** Invoked by the control plane once the HTTP port is bound. */ onListening?: (port: number) => void } +export interface GithubBackfillReport { + repos: number + events: number + errors: Array<{ repo: string; error: string }> +} + +export interface GraphProjectReport { + nodes: number + edges: number + nodesWritten: number + edgesWritten: number +} + +export interface GraphNeighborsArgs { + node: string + depth?: number + type?: string + edgeTypes?: string[] + direction?: 'out' | 'in' | 'both' + limit?: number +} + +/** `TraversalOk | TraversalErr` from the context-graph query (LLP 0026). 
*/ +export type GraphTraversalResult = { ok: boolean } & Record + export interface RouteContext { req: IncomingMessage res: ServerResponse diff --git a/test/smoke.js b/test/smoke.js index 5f63bc1..68cb549 100644 --- a/test/smoke.js +++ b/test/smoke.js @@ -12,6 +12,10 @@ import assert from 'node:assert' import fs from 'node:fs' import os from 'node:os' import path from 'node:path' +import { execFile } from 'node:child_process' +import { promisify } from 'node:util' + +const execFileP = promisify(execFile) const tmpRoot = fs.mkdtempSync(path.join(os.tmpdir(), 'hypaware-server-smoke-')) process.env.HYP_HOME = path.join(tmpRoot, 'hyp-home') @@ -43,10 +47,25 @@ const env = { // high-water gate first, so this does not perturb other checks. HYPSERVER_BYTE_BURST_PER_GATEWAY: String(8 * 1024), HYPSERVER_BYTE_RATE_PER_GATEWAY: '1', + // Server-side graph (LLP 0010): one repo in the selection so the + // github-backfill check below has something to capture (via the injected + // in-memory client, no network). + HYPSERVER_GITHUB_REPOS: 'acme/widgets', } const daemon = await runServerDaemon({ env, installSignalHandlers: false }) const base = `http://127.0.0.1:${daemon.port()}` +// The vendored github plugin's runtime singleton: the graph checks inject an +// in-memory GitHub client here so `github backfill` exercises the real cache +// append path with no network. +const { requireGithubRuntime } = await import('../plugins/github/src/runtime.js') +// Capture internals the graph checks drive directly: a `poll`-mode tick (the +// admin surface only ever runs `backfill`) and the pure repo-resolution / +// per-repo-isolation functions. +const { runCaptureTick } = await import('../plugins/github/src/tick.js') +const { resolveRepos, captureRepos } = await import('../plugins/github/src/capture.js') +// The server-local admin CLI, exercised end-to-end against this daemon. 
+const adminCliPath = path.join(import.meta.dirname, '..', 'bin', 'admin.js') let passed = 0 /** @@ -101,6 +120,84 @@ async function runMover() { assert.equal(r.status, 200) } +/** + * In-memory GitHub client (no network) matching the plugin's GithubClient + * surface. `repos` is keyed by lowercased `owner/repo`; only the fields the + * capture reads are returned. + * + * Beyond returning fixtures it MIRRORS github_client.js's cursor contract — the + * high-level fake stands in for the real fetch client, so the time-windowed + * listings (issues / pulls / commits / comments) return only rows strictly + * newer than the per-repo `since` high-water and advance it, exactly as a real + * `since=` / `If-None-Match` request would. That makes an unchanged poll an + * empty listing, so the incremental checks below can assert "no new activity → + * zero events". `.calls` counts PR sub-resource fetches, so a poll test can + * prove descent happens only when `changedSince` admits a PR. A repo may set + * `throwOnList` to force a per-repo capture failure (the error-isolation test). + * + * @param {{ orgRepos?: Record, repos?: Record }} data + */ +function fakeClient(data) { + const orgRepos = data.orgRepos ?? {} + const repos = data.repos ?? {} + const get = (/** @type {string} */ owner, /** @type {string} */ repo) => repos[`${owner}/${repo}`.toLowerCase()] ?? {} + const calls = { prFiles: 0, prReviews: 0, prCommits: 0 } + + const issueTs = (/** @type {any} */ r) => (typeof r.updated_at === 'string' ? r.updated_at : typeof r.created_at === 'string' ? r.created_at : null) + const commitTs = (/** @type {any} */ r) => r?.commit?.committer?.date ?? r?.commit?.author?.date ?? null + // Return rows newer than `since`, and advance the cursor high-water to the + // newest row seen — the advanceSince / advanceSinceCommit contract. 
+ const windowed = (/** @type {any[]} */ rows, /** @type {any} */ cursor, /** @type {string} */ key, /** @type {(r: any) => string | null} */ tsOf) => { + const since = cursor.since?.[key] + let max = since + for (const r of rows) { const t = tsOf(r); if (t && (!max || t > max)) max = t } + if (max) (cursor.since ??= {})[key] = max + return rows.filter((r) => { const t = tsOf(r); return !since || (t != null && t > since) }) + } + const guard = (/** @type {any} */ r) => { if (r.throwOnList) throw new Error(r.throwOnList) } + + return { + calls, + async listOrgRepos(/** @type {string} */ org) { return orgRepos[org] ?? [] }, + async listIssues(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {any} */ cursor) { + const r = get(owner, repo); guard(r) + return windowed(r.issues ?? [], cursor, 'issues', issueTs) + }, + async listPullRequests(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {any} */ cursor) { + const r = get(owner, repo); guard(r) + // pulls are etag-gated in the real client and capture's advancePullsHigh + // owns this high-water, so here just drop pulls at/under it. + const since = cursor.since?.pulls + return (r.pulls ?? []).filter((/** @type {any} */ pr) => { const t = issueTs(pr); return !since || (t != null && t > since) }) + }, + async listPullRequestFiles(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { calls.prFiles++; return get(owner, repo).prFiles?.[n] ?? [] }, + async listPullRequestReviews(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { calls.prReviews++; return get(owner, repo).prReviews?.[n] ?? [] }, + async listPullRequestCommits(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {number} */ n) { calls.prCommits++; return get(owner, repo).prCommits?.[n] ?? 
[] }, + async listCommits(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {any} */ cursor) { + const r = get(owner, repo); guard(r) + return windowed(r.commits ?? [], cursor, 'commits', commitTs) + }, + async listCommitFiles(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {string} */ sha) { return get(owner, repo).commitFiles?.[sha] ?? [] }, + async listIssueComments(/** @type {string} */ owner, /** @type {string} */ repo, /** @type {any} */ cursor) { + const r = get(owner, repo); guard(r) + return windowed(r.comments ?? [], cursor, 'comments', issueTs) + }, + } +} + +/** + * Run the server-local admin CLI (`bin/admin.js`) against this daemon, returning + * its parsed stdout JSON. Exercises the real flag→body mapping end-to-end. + * + * @param {string[]} args + */ +async function adminCli(args) { + const { stdout } = await execFileP(process.execPath, [adminCliPath, ...args], { + env: { ...process.env, HYPSERVER_URL: base, HYPSERVER_ADMIN_TOKEN: admin.token }, + }) + return JSON.parse(stdout) +} + try { // ----- Config authoring (LLP 0009) ----- // A local-dir plugin exercises the real pinning path (kernel @@ -192,10 +289,12 @@ try { const getConfig = await call('GET', '/v1/admin/configs/smoke-fleet', { token: admin.token }) const pinnedEntry = getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === 'hypaware-plugin-smoke') - check('stored config pins plugin version + content hash', - pinnedEntry.version === '1.2.3' && typeof pinnedEntry.content_hash === 'string' && pinnedEntry.content_hash.length === 64, pinnedEntry) + check('stored config pins plugin version + artifact hash', + pinnedEntry.version === '1.2.3' && typeof pinnedEntry.artifact_hash === 'string' && pinnedEntry.artifact_hash.length === 64, pinnedEntry) + check('pinned source stays the operator\'s raw string (client re-resolves it)', + pinnedEntry.source === pluginDir, pinnedEntry) check('bundled plugin entries stay unpinned', - 
getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === '@hypaware/central').content_hash === undefined) + getConfig.body.document.plugins.find((/** @type {any} */ p) => p.name === '@hypaware/central').artifact_hash === undefined) const saveAgain = await call('PUT', '/v1/admin/configs/smoke-fleet', { token: admin.token, json: fleetConfig }) check('identical re-save is revisionless', saveAgain.status === 200 && saveAgain.body.revision_created === false && saveAgain.body.etag === configEtag1) @@ -379,7 +478,10 @@ try { // present — mirror that here. const proxyRow = { gateway_id: 'client-local-gw', - schema_version: 4, + schema_version: 6, + // session_id is the non-null partition key (schema v6, LLP 0030); + // conversation_id rides along nullable. Both present here. + session_id: 'sess-1', conversation_id: 'conv-1', provider: 'smoke', conversation_started_at: new Date().toISOString(), @@ -396,7 +498,7 @@ try { token: gw2.jwt, ndjson: JSON.stringify(proxyRow) + '\n', }) - check('proxy signal maps to ai_gateway_messages', proxyIngest.status === 202) + check('proxy signal maps to ai_gateway_messages', proxyIngest.status === 202, proxyIngest.body) await runMover() const proxyCount = await sql('SELECT COUNT(*) AS n FROM ai_gateway_messages') check('proxy rows landed', Number(proxyCount.rows[0].n) === 1, proxyCount.rows) @@ -520,6 +622,196 @@ try { const remaining = fs.existsSync(cacheLogsDir) ? fs.readdirSync(cacheLogsDir).filter((d) => d.startsWith('date=')) : [] check('cache day partition actually deleted', remaining.length === 0, remaining) + // ----- Server-side graph: capture, project, traverse (LLP 0010) ----- + // Inject an in-memory GitHub client into the activated github plugin's + // runtime singleton, so `github backfill` drives the real cache append + // path with no network. 
The fixture is one repo with an issue, a PR (one + // changed file + a review) and a commit (touching that same file) — enough + // to mint every bridge-ready node type and a connected edge set. + const widgetsFixture = { + repos: { + 'acme/widgets': { + issues: [{ number: 1, state: 'open', created_at: '2026-01-01T00:00:00Z', user: { login: 'alice', type: 'User' } }], + pulls: [{ number: 2, state: 'open', merged_at: null, created_at: '2026-01-02T00:00:00Z', updated_at: '2026-01-02T00:00:00Z', user: { login: 'bob', type: 'User' } }], + prFiles: { 2: ['src/app.js'] }, + prReviews: { 2: [{ id: 100, state: 'APPROVED', submitted_at: '2026-01-03T00:00:00Z', user: { login: 'carol', type: 'User' } }] }, + commits: [{ sha: 'a'.repeat(40), author: { login: 'alice', type: 'User' }, commit: { author: { date: '2026-01-01T00:00:00Z' } } }], + commitFiles: { ['a'.repeat(40)]: ['src/app.js'] }, + }, + }, + } + requireGithubRuntime().clientFactory = () => fakeClient(widgetsFixture) + + const backfill = await call('POST', '/v1/admin/github/backfill', { token: admin.token, json: {} }) + check('github backfill captured events into the server cache', backfill.status === 200 && backfill.body.events > 0 && backfill.body.repos === 1, backfill.body) + check('github backfill reported no per-repo errors', Array.isArray(backfill.body.errors) && backfill.body.errors.length === 0, backfill.body.errors) + + // The cursor sidecar round-trips through disk (tick.js writeCursors → + // cursors.js), and backfill advances the pulls high-water (advancePullsHigh) + // even though it re-fetches full history — so the poll below resumes past it. 
+ const cursorsPath = path.join(requireGithubRuntime().stateDir, 'github-cursors.json') + const cursorAfterBackfill = JSON.parse(fs.readFileSync(cursorsPath, 'utf8')) + check('github backfill persisted the per-repo cursor high-water (sidecar round-trip)', + cursorAfterBackfill.schema_version === 1 && cursorAfterBackfill.repos['acme/widgets']?.since?.pulls === '2026-01-02T00:00:00Z', + cursorAfterBackfill) + + const ghCount = await sql('SELECT COUNT(*) AS n FROM github_events') + check('github_events rows queryable on the server', Number(ghCount.rows[0].n) > 0, ghCount.rows) + + // dry_run computes the would-be graph but writes nothing — the safe-preview + // contract. Run it before the first real projection so "writes nothing" is + // observable as a still-empty node table. + const dryRun = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: { dry_run: true } }) + check('graph project dry_run computes nodes but writes none', + dryRun.status === 200 && dryRun.body.nodes > 0 && dryRun.body.nodesWritten === 0 && dryRun.body.edgesWritten === 0, dryRun.body) + const preProjectNodes = await sql('SELECT COUNT(*) AS n FROM node') + check('graph project dry_run materialized nothing', Number(preProjectNodes.rows[0].n) === 0, preProjectNodes.rows) + + const project1 = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: {} }) + check('graph project materialized nodes', project1.status === 200 && project1.body.nodes > 0, project1.body) + check('graph project materialized edges', project1.body.edges > 0, project1.body) + check('graph project wrote fresh node + edge rows', project1.body.nodesWritten > 0 && project1.body.edgesWritten > 0, project1.body) + + const project2 = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: {} }) + check('graph project is idempotent (re-run writes nothing new)', project2.body.nodesWritten === 0 && project2.body.edgesWritten === 0, project2.body) + + // --source scopes projection to 
one source's contracts: github_events still + // projects nodes; a source no contract claims projects nothing (the filter + // discriminates, not a silent whole-graph run). + const projectGithub = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: { source: 'github_events' } }) + check('graph project --source github_events projects that source', projectGithub.status === 200 && projectGithub.body.nodes > 0, projectGithub.body) + const projectNoSource = await call('POST', '/v1/admin/graph/project', { token: admin.token, json: { source: '__no_such_source__' } }) + check('graph project --source with no matching contract projects nothing', + projectNoSource.body.nodes === 0 && projectNoSource.body.nodesWritten === 0, projectNoSource.body) + + const nodeCount = await sql('SELECT COUNT(*) AS n FROM node') + check('node dataset queryable on the server', Number(nodeCount.rows[0].n) > 0, nodeCount.rows) + + const nbBoth = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'both' } }) + check('graph neighbors resolves the Repo seed', nbBoth.status === 200 && nbBoth.body.ok === true, nbBoth.body) + check('graph neighbors reaches the repo activity', Number(nbBoth.body.reachable) > 0, nbBoth.body) + + // The filter parameters most prone to a wrong-direction / off-by-one bug: + // direction, edge_types, and limit must each change the reachable set the + // way the traversal promises (a subset, a type-restriction, a truncation). + // From the Repo seed every edge points inward (Commit/Issue/PR → Repo), so + // `in` reaches the activity while `out` reaches nothing — an asymmetry that + // only holds if direction is actually honored, not ignored. 
+  // Same seed and depth as the baseline (nbBoth), restricted to one direction each.
+  const nbOut = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'out' } })
+  const nbIn = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'in' } })
+  // Every returned neighbor must carry the requested direction, the two
+  // reachable counts must differ, and neither may exceed the baseline count.
+  check('graph neighbors honors direction (in and out reach different sets, both bounds them)',
+    nbOut.body.ok === true && nbIn.body.ok === true
+    && nbOut.body.neighbors.every((/** @type {any} */ n) => n.direction === 'out')
+    && nbIn.body.neighbors.every((/** @type {any} */ n) => n.direction === 'in')
+    && Number(nbOut.body.reachable) !== Number(nbIn.body.reachable)
+    && Number(nbOut.body.reachable) <= Number(nbBoth.body.reachable)
+    && Number(nbIn.body.reachable) <= Number(nbBoth.body.reachable),
+    { out: nbOut.body.reachable, in: nbIn.body.reachable, both: nbBoth.body.reachable })
+
+  // Distinct edge types seen in the baseline result; the first one becomes the
+  // filter. NOTE(review): if the baseline returned no neighbors, edgeTypes[0]
+  // is undefined — presumably the earlier "reaches the repo activity" check
+  // guards against that; confirm.
+  const edgeTypes = [...new Set(nbBoth.body.neighbors.map((/** @type {any} */ n) => n.edge_type))]
+  const nbFiltered = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'both', edge_types: [edgeTypes[0]] } })
+  check('graph neighbors edge_types filter restricts traversal to that type',
+    nbFiltered.body.ok === true && nbFiltered.body.neighbors.every((/** @type {any} */ n) => n.edge_type === edgeTypes[0]) && Number(nbFiltered.body.reachable) <= Number(nbBoth.body.reachable),
+    { filtered: nbFiltered.body.reachable, type: edgeTypes[0] })
+
+  // An edge type nothing uses must yield ok:true with zero reachable nodes.
+  // `direction` is omitted here — presumably the route's default applies; verify.
+  const nbBogus = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, edge_types: ['__no_such_edge__'] } })
+  check('graph neighbors with an unknown edge_type reaches nothing', nbBogus.body.ok === true && Number(nbBogus.body.reachable) === 0, nbBogus.body)
+
+  // limit: 1 may return at most one neighbor, `reachable` must still equal the
+  // baseline count, and `truncated` must be true exactly when the baseline
+  // reached more than one node.
+  const nbLimited = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 2, direction: 'both', limit: 1 } })
+  check('graph neighbors limit truncates the returned set but reports full reachable',
+    nbLimited.body.neighbors.length <= 1 && Number(nbLimited.body.reachable) === Number(nbBoth.body.reachable) && nbLimited.body.truncated === (Number(nbBoth.body.reachable) > 1),
+    { returned: nbLimited.body.neighbors.length, reachable: nbLimited.body.reachable, truncated: nbLimited.body.truncated })
+
+  // Depth 1 must reach at least one node and no more than the depth-2 baseline.
+  const nbDepth1 = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'acme/widgets', type: 'Repo', depth: 1, direction: 'both' } })
+  check('graph neighbors depth bounds the walk (depth 1 ⊆ depth 2)',
+    Number(nbDepth1.body.reachable) >= 1 && Number(nbDepth1.body.reachable) <= Number(nbBoth.body.reachable),
+    { d1: nbDepth1.body.reachable, d2: nbBoth.body.reachable })
+
+  // Error paths: a seed that resolves to nothing (no `type` given) and a repo
+  // slug without an owner/name shape must both be rejected with HTTP 400.
+  const missing = await call('POST', '/v1/admin/graph/neighbors', { token: admin.token, json: { node: 'no/such-seed-xyz' } })
+  check('graph neighbors 400s on an unknown seed', missing.status === 400 && missing.body.ok === false, missing.body)
+
+  const badRepos = await call('POST', '/v1/admin/github/backfill', { token: admin.token, json: { repos: ['not-a-slug'] } })
+  check('github backfill rejects a malformed repo slug', badRepos.status === 400 && badRepos.body.error === 'invalid_repos', badRepos.body)
+
+  // ----- Admin CLI wrappers (bin/admin.js): flag→body mapping, end-to-end -----
+  // The HTTP routes above are driven directly; the docker-exec CLI glue
+  // (--edge-type → edge_types:[x], Number() on --depth, --dry-run, positional
+  // repo) is otherwise uncovered, so spawn it against this daemon.
+  // Each adminCli(argv) result is read as the response body (fields are
+  // accessed directly on it). NOTE(review): presumably the helper spawns
+  // bin/admin.js and parses its output — verify against the helper.
+  const cliBackfill = await adminCli(['github-backfill', 'acme/widgets'])
+  check('admin CLI github-backfill maps a positional repo and captures',
+    cliBackfill.repos === 1 && cliBackfill.events > 0 && cliBackfill.errors.length === 0, cliBackfill)
+  // NOTE(review): real projections have already run by this point and a re-run
+  // was asserted to write nothing, so nodesWritten === 0 may also hold for a
+  // non-dry run — confirm this assertion actually discriminates --dry-run.
+  const cliProjectDry = await adminCli(['graph-project', '--dry-run'])
+  check('admin CLI graph-project --dry-run maps to a no-write projection', cliProjectDry.nodesWritten === 0, cliProjectDry)
+  // Positional seed plus --type / --depth / --direction flags.
+  const cliNeighbors = await adminCli(['graph-neighbors', 'acme/widgets', '--type', 'Repo', '--depth', '2', '--direction', 'both'])
+  check('admin CLI graph-neighbors maps flags and resolves the seed',
+    cliNeighbors.ok === true && Number(cliNeighbors.reachable) > 0, cliNeighbors)
+
+  // ----- Repo resolution + per-repo error isolation (capture.js) -----
+  // resolveRepos: repos ∪ org-enumeration − ignore, lowercased/deduped/sorted.
+  // The fixture lists 'Acme/Widgets' explicitly and again (lowercase) via the
+  // org, plus one ignored and one org-only repo, so the expected result is the
+  // two surviving slugs in sorted order.
+  const resolved = await resolveRepos(
+    { repos: ['Acme/Widgets'], orgs: ['acme'], ignore: ['acme/ignored'], token_env: 'GITHUB_TOKEN' },
+    fakeClient({ orgRepos: { acme: ['acme/widgets', 'acme/ignored', 'acme/other'] } }),
+    nullLog,
+  )
+  check('resolveRepos unions repos+orgs, drops ignore, and lowercases/dedupes/sorts',
+    JSON.stringify(resolved) === JSON.stringify(['acme/other', 'acme/widgets']), resolved)
+
+  // captureRepos: one repo throwing must land in errors[] without aborting the
+  // tick — the healthy repos around it still capture.
+  // Rows handed to `append` are collected here so the test can see which
+  // repos were captured.
+  // NOTE(review): the annotation read `Record[]` with its type arguments
+  // missing; restored as <string, unknown> — confirm against the row type
+  // capture.js passes to `append`.
+  /** @type {Record<string, unknown>[]} */
+  const isoCaptured = []
+  // Three repos in backfill mode with empty cursors: the middle one is
+  // configured with `throwOnList`, the two around it carry one issue and one
+  // commit respectively.
+  const isoReport = await captureRepos({
+    client: fakeClient({
+      repos: {
+        'ok/one': { issues: [{ number: 1, state: 'open', created_at: '2026-01-01T00:00:00Z', user: { login: 'a', type: 'User' } }] },
+        'bad/two': { throwOnList: 'boom' },
+        'ok/three': { commits: [{ sha: 'c'.repeat(40), author: { login: 'x', type: 'User' }, commit: { author: { date: '2026-01-01T00:00:00Z' } } }] },
+      },
+    }),
+    config: { repos: ['ok/one', 'bad/two', 'ok/three'], orgs: [], ignore: [], token_env: 'GITHUB_TOKEN' },
+    cursors: { schema_version: 1, repos: {} },
+    append: async (rows) => { isoCaptured.push(...rows) },
+    log: nullLog,
+    mode: 'backfill',
+  })
+  // All three repos are counted, and exactly one error is attributed to 'bad/two'.
+  check('captureRepos isolates a per-repo failure into errors[] without aborting the tick',
+    isoReport.repos === 3 && isoReport.errors.length === 1 && isoReport.errors[0].repo === 'bad/two', isoReport)
+  // Rows from both healthy repos (before and after the failing one) were appended.
+  check('captureRepos still captures the healthy repos around a failure',
+    isoReport.events > 0 && isoCaptured.some((r) => r.repo === 'ok/one') && isoCaptured.some((r) => r.repo === 'ok/three'),
+    { events: isoReport.events })
+
+  // ----- Incremental poll: cursor advancement (capture.js, cursors.js) -----
+  // The admin surface only runs backfill, so drive a `poll` tick directly. With
+  // the cursor at the backfill high-water, a no-change poll captures nothing and
+  // skips PR descent; a poll whose PR/commit advanced past it captures only the
+  // new rows and moves the high-water on. (A regression in the cursor math or
+  // the changedSince/toDescend gate would otherwise ship green.)
+ const pollSame = fakeClient(widgetsFixture) + requireGithubRuntime().clientFactory = () => pollSame + const pollNoop = await runCaptureTick(requireGithubRuntime(), { mode: 'poll' }) + check('poll tick with no new activity captures zero events', pollNoop.events === 0 && pollNoop.errors.length === 0, pollNoop) + check('poll tick skips PR sub-resource descent when changedSince admits no PR', + pollSame.calls.prFiles === 0 && pollSame.calls.prReviews === 0 && pollSame.calls.prCommits === 0, pollSame.calls) + + const pollAdvanced = fakeClient({ + repos: { + 'acme/widgets': { + pulls: [{ number: 2, state: 'open', merged_at: null, created_at: '2026-01-02T00:00:00Z', updated_at: '2026-02-01T00:00:00Z', user: { login: 'bob', type: 'User' } }], + prFiles: { 2: ['src/v2.js'] }, + prReviews: { 2: [{ id: 101, state: 'APPROVED', submitted_at: '2026-02-01T00:00:00Z', user: { login: 'carol', type: 'User' } }] }, + commits: [{ sha: 'b'.repeat(40), author: { login: 'alice', type: 'User' }, commit: { author: { date: '2026-02-01T00:00:00Z' } } }], + commitFiles: { ['b'.repeat(40)]: ['src/v2.js'] }, + }, + }, + }) + requireGithubRuntime().clientFactory = () => pollAdvanced + const pollNew = await runCaptureTick(requireGithubRuntime(), { mode: 'poll' }) + check('poll tick captures only the new activity once the high-water advances', pollNew.events > 0 && pollNew.errors.length === 0, pollNew) + check('poll tick descends PR sub-resources when changedSince admits the PR', pollAdvanced.calls.prFiles > 0, pollAdvanced.calls) + const cursorAfterPoll = JSON.parse(fs.readFileSync(cursorsPath, 'utf8')) + check('poll advanced the persisted pulls cursor high-water', + cursorAfterPoll.repos['acme/widgets'].since.pulls === '2026-02-01T00:00:00Z', cursorAfterPoll.repos['acme/widgets'].since) + // ----- Restart durability: catalog, ledger, identity survive ----- await daemon.stop() const daemon2 = await runServerDaemon({ env, installSignalHandlers: false })