diff --git a/package.json b/package.json
index 0879ab3..f0ead68 100644
--- a/package.json
+++ b/package.json
@@ -11,7 +11,7 @@
     "setup": "node src/setup-gui.js",
     "watcher": "node src/watcher/watcher-client.js",
     "bridge": "node src/bridge/mcp-bridge.js",
-    "test": "node --test --test-timeout 30000 src/*.test.js",
+    "test": "node --test --test-timeout 30000 src/*.test.js src/watcher/*.test.js",
     "stress-test": "node --test src/stress-test.js"
   },
   "keywords": [
diff --git a/src/watcher/exclude-patterns.js b/src/watcher/exclude-patterns.js
new file mode 100644
index 0000000..d4efdd5
--- /dev/null
+++ b/src/watcher/exclude-patterns.js
@@ -0,0 +1,38 @@
+// Escape regex metacharacters; '*' is deliberately left alone for the wildcard pass in buildExcludeRegex.
+function escapeRegexLiteral(value) {
+  return value.replace(/[.+?^${}()|[\]\\]/g, '\\$&');
+}
+
+// Compile one exclude glob ('*' stays inside a path segment, '**' crosses segments) to a RegExp; null if blank.
+export function buildExcludeRegex(pattern) {
+  const normalizedPattern = String(pattern || '').trim().replace(/\\/g, '/');
+  if (!normalizedPattern) return null;
+
+  const escaped = escapeRegexLiteral(normalizedPattern);
+  const wildcardPattern = escaped
+    .replace(/\*\*/g, '___DOUBLE_STAR___')
+    .replace(/\*/g, '[^/]*')
+    .replace(/___DOUBLE_STAR___/g, '.*');
+
+  // Match on path-segment boundaries anywhere in the path.
+  return new RegExp(`(^|.*/)${wildcardPattern}($|/.*)`);
+}
+
+// Compile every non-blank pattern in the list; blank entries are dropped.
+export function compileExcludePatterns(excludePatterns = []) {
+  const compiled = [];
+  for (const pattern of excludePatterns) {
+    const regex = buildExcludeRegex(pattern);
+    if (regex) compiled.push(regex);
+  }
+  return compiled;
+}
+
+// True when the path (backslashes normalized to '/') matches any compiled exclude regex.
+export function shouldExcludePath(filePath, compiledExcludePatterns = []) {
+  const normalizedPath = String(filePath || '').replace(/\\/g, '/');
+  for (const regex of compiledExcludePatterns) {
+    if (regex.test(normalizedPath)) return true;
+  }
+  return false;
+}
diff --git a/src/watcher/scan-telemetry.js b/src/watcher/scan-telemetry.js
new file mode 100644
index 0000000..4c4ab0c
--- /dev/null
+++ b/src/watcher/scan-telemetry.js
@@ -0,0 +1,31 @@
+// Return a new snapshot: `current` counters plus `telemetry` deltas; a non-empty telemetry.lastIngestAt wins.
+export function mergeScanTelemetrySnapshot(current, telemetry) {
+  if (!telemetry) return { ...current };
+  return {
+    filesIngested: (current.filesIngested || 0) + (telemetry.filesIngested || 0),
+    assetsIngested: (current.assetsIngested || 0) + (telemetry.assetsIngested || 0),
+    deletesProcessed: (current.deletesProcessed || 0) + (telemetry.deletesProcessed || 0),
+    errorsCount: (current.errorsCount || 0) + (telemetry.errorsCount || 0),
+    lastIngestAt: telemetry.lastIngestAt || current.lastIngestAt || null
+  };
+}
+
+// Return what the final worker snapshot still adds on top of the already-streamed totals (clamped at zero).
+export function reconcileFinalTelemetry({ streamedTelemetry, streamedTotals, finalTelemetry }) {
+  if (!streamedTelemetry) return finalTelemetry || null;
+  if (!finalTelemetry) return null;
+
+  return {
+    filesIngested: Math.max(0, (finalTelemetry.filesIngested || 0) - (streamedTotals?.filesIngested || 0)),
+    assetsIngested: Math.max(0, (finalTelemetry.assetsIngested || 0) - (streamedTotals?.assetsIngested || 0)),
+    deletesProcessed: Math.max(0, (finalTelemetry.deletesProcessed || 0) - (streamedTotals?.deletesProcessed || 0)),
+    errorsCount: Math.max(0, (finalTelemetry.errorsCount || 0) - (streamedTotals?.errorsCount || 0)),
+    lastIngestAt: finalTelemetry.lastIngestAt || null
+  };
+}
+
+// Map a config reload to an action: 'none' (projects unchanged), 'restart' (watcher active) or 'defer'.
+export function getConfigReloadAction(projectsChanged, hasActiveWatcher) {
+  if (!projectsChanged) return 'none';
+  return hasActiveWatcher ? 'restart' : 'defer';
+}
diff --git a/src/watcher/scan-worker.js b/src/watcher/scan-worker.js
new file mode 100644
index 0000000..0f3c69f
--- /dev/null
+++ b/src/watcher/scan-worker.js
@@ -0,0 +1,532 @@
+#!/usr/bin/env node
+
+import { parentPort, workerData } from 'worker_threads';
+import { readFile, stat } from 'fs/promises';
+import { readdirSync, statSync } from 'fs';
+import { join, relative } from 'path';
+import { gzipSync } from 'zlib';
+import { parseContent as parseAngelscriptContent } from '../parsers/angelscript-parser.js';
+import { parseCppContent } from '../parsers/cpp-parser.js';
+import { parseCSharpContent } from '../parsers/csharp-parser.js';
+import { parseUAssetHeader } from '../parsers/uasset-parser.js';
+import { compileExcludePatterns, shouldExcludePath } from './exclude-patterns.js';
+
+const MAX_CONCURRENT = 10; // files read+parsed in parallel inside one batch
+const BATCH_SIZE = 50; // source files per ingest POST (assets use BATCH_SIZE * 10)
+
+const serviceUrl = workerData?.serviceUrl;
+const config = workerData?.config;
+const task = workerData?.task;
+const workerPrefix = `${workerData?.logPrefix || '[Watcher]'}[scan]`;
+const compiledExcludePatterns = compileExcludePatterns(config?.exclude || []);
+
+let filesIngested = 0;
+let assetsIngested = 0;
+let deletesProcessed = 0;
+let errorsCount = 0;
+let lastIngestAt = null;
+
+function postMessage(type, payload = {}) {
+  if (parentPort) parentPort.postMessage({ type, ...payload }); // no-op when imported outside a worker
+}
+
+function log(message) {
+  postMessage('log', { level: 'log', message: `${workerPrefix} ${message}` });
+}
+
+function warn(message) {
+  postMessage('log', { level: 'warn', message: `${workerPrefix} ${message}` });
+}
+
+function recordIngest({ files = 0, assets = 0, deletes = 0, errors = 0 }) {
+  filesIngested += files;
+  assetsIngested += assets;
+  deletesProcessed += deletes;
+  errorsCount += errors;
+  if (files > 0 || assets > 0 || deletes > 0) {
+    lastIngestAt = new Date().toISOString(); // only real ingests move the timestamp
+    postMessage('telemetry', {
+      delta: {
+        filesIngested: files,
+        assetsIngested: assets,
+        deletesProcessed: deletes,
+        errorsCount: errors,
+        lastIngestAt
+      }
+    });
+  } else if (errors > 0) {
+    postMessage('telemetry', {
+      delta: {
+        filesIngested: 0,
+        assetsIngested: 0,
+        deletesProcessed: 0,
+        errorsCount: errors,
+        lastIngestAt
+      }
+    });
+  }
+}
+
+function findBasePathForFile(filePath, project) {
+  const normalized = filePath.replace(/\\/g, '/').toLowerCase();
+  for (const basePath of project.paths) {
+    if (normalized.startsWith(basePath.replace(/\\/g, '/').replace(/\/*$/, '/').toLowerCase())) { // trailing '/' keeps "Source" from matching "SourceExtra/..."
+      return basePath;
+    }
+  }
+  return null;
+}
+
+function deriveModule(relativePath, projectName) {
+  const parts = relativePath.replace(/\.(as|h|cpp|cs)$/, '').split('/');
+  parts.pop(); // drop the file name; the module is project + directory chain
+  return [projectName, ...parts].join('.');
+}
+
+export function shouldExclude(filePath, excludeMatchers = compiledExcludePatterns) {
+  const matchers = Array.isArray(excludeMatchers) && typeof excludeMatchers[0] === 'string'
+    ? compileExcludePatterns(excludeMatchers)
+    : excludeMatchers;
+  return shouldExcludePath(filePath, matchers || []);
+}
+
+function collectFiles(dirPath, projectName, extensions, language, includePatterns) {
+  const files = [];
+  const scanDir = (dir) => {
+    let entries;
+    try {
+      entries = readdirSync(dir, { withFileTypes: true });
+    } catch {
+      return; // unreadable directory: skip it, keep scanning the rest
+    }
+
+    for (const entry of entries) {
+      const fullPath = join(dir, entry.name);
+      if (entry.isDirectory()) {
+        if (shouldExclude(fullPath)) continue;
+        scanDir(fullPath);
+      } else if (entry.isFile()) {
+        if (!extensions.some(ext => entry.name.endsWith(ext))) continue;
+        if (includePatterns?.length > 0) {
+          if (!includePatterns.some(pat => pat.startsWith('*') ? entry.name.endsWith(pat.slice(1)) : entry.name === pat)) continue;
+        }
+        if (shouldExclude(fullPath)) continue;
+        try {
+          const mtime = Math.floor(statSync(fullPath).mtimeMs);
+          const relativePath = relative(dirPath, fullPath).replace(/\\/g, '/');
+          const module = deriveModule(relativePath, projectName);
+          files.push({ path: fullPath, project: projectName, module, mtime, basePath: dirPath, language });
+        } catch {}
+      }
+    }
+  };
+
+  scanDir(dirPath);
+  return files;
+}
+
+async function fetchJson(url, retries = 3) {
+  for (let attempt = 1; attempt <= retries; attempt++) {
+    try {
+      const res = await fetch(url);
+      if (!res.ok) throw new Error(`${res.status} ${res.statusText}`);
+      return await res.json(); // await inside try so a failed body read reaches the retry logic below
+    } catch (err) {
+      const code = err.code || err.cause?.code;
+      if (attempt < retries && (code === 'ECONNRESET' || code === 'ECONNREFUSED' || err.message.includes('fetch failed'))) {
+        const delay = attempt * 2000;
+        warn(`GET failed (${code || err.message}), retry ${attempt}/${retries} in ${delay}ms...`);
+        await new Promise(r => setTimeout(r, delay));
+        continue;
+      }
+      throw err;
+    }
+  }
+}
+
+async function waitForService() {
+  log(`Waiting for service at ${serviceUrl}...`);
+  while (true) {
+    try {
+      const status = await fetchJson(`${serviceUrl}/internal/status`);
+      log(`Service connected. DB counts: ${JSON.stringify(status.counts || {})}`);
+      return status;
+    } catch {
+      await new Promise(r => setTimeout(r, 2000));
+    }
+  }
+}
+
+async function postJson(url, body, retries = 3) {
+  const json = JSON.stringify(body);
+  const useGzip = json.length > 1024;
+  const payload = useGzip ? gzipSync(json) : json;
+  const headers = { 'Content-Type': 'application/json' };
+  if (useGzip) headers['Content-Encoding'] = 'gzip';
+
+  for (let attempt = 1; attempt <= retries; attempt++) {
+    try {
+      const res = await fetch(url, { method: 'POST', headers, body: payload });
+      if (!res.ok) {
+        const text = await res.text().catch(() => '');
+        throw new Error(`${res.status} ${res.statusText}: ${text.slice(0, 200)}`);
+      }
+      return await res.json(); // await inside try so a failed body read reaches the retry logic below
+    } catch (err) {
+      const code = err.code || err.cause?.code;
+      if (attempt < retries && (code === 'ECONNRESET' || code === 'ECONNREFUSED' || code === 'UND_ERR_SOCKET' || err.message.includes('fetch failed'))) {
+        const delay = attempt * 2000;
+        warn(`POST failed (${code || err.message}), retry ${attempt}/${retries} in ${delay}ms...`);
+        await new Promise(r => setTimeout(r, delay));
+        try {
+          await fetchJson(`${url.replace(/\/internal\/.*/, '')}/health`);
+        } catch {
+          log('Service unavailable, waiting...');
+          await waitForService();
+        }
+        continue;
+      }
+      throw err;
+    }
+  }
+}
+
+async function readAndParseSource(filePath, project, language) {
+  const basePath = findBasePathForFile(filePath, project);
+  if (!basePath) return null;
+
+  const fileStat = await stat(filePath);
+  const mtime = Math.floor(fileStat.mtimeMs);
+  const relativePath = relative(basePath, filePath).replace(/\\/g, '/');
+  const module = deriveModule(relativePath, project.name);
+
+  if (language === 'config') {
+    const content = await readFile(filePath, 'utf-8');
+    return { path: filePath, project: project.name, module, mtime, language, relativePath, content, types: [], members: [] };
+  }
+
+  const content = await readFile(filePath, 'utf-8');
+  let parsed;
+  if (language === 'cpp') {
+    parsed = parseCppContent(content, filePath);
+  } else if (language === 'csharp') {
+    parsed = parseCSharpContent(content, filePath);
+  } else {
+    parsed = parseAngelscriptContent(content, filePath);
+  }
+
+  const types = [];
+  for (const cls of parsed.classes || []) types.push({ name: cls.name, kind: cls.kind || 'class', parent: cls.parent, line: cls.line });
+  for (const s of parsed.structs || []) types.push({ name: s.name, kind: 'struct', parent: s.parent || null, line: s.line });
+  for (const e of parsed.enums || []) types.push({ name: e.name, kind: 'enum', parent: null, line: e.line });
+  if (language === 'angelscript') {
+    for (const ev of parsed.events || []) types.push({ name: ev.name, kind: 'event', parent: null, line: ev.line });
+    for (const d of parsed.delegates || []) types.push({ name: d.name, kind: 'delegate', parent: null, line: d.line });
+    for (const ns of parsed.namespaces || []) types.push({ name: ns.name, kind: 'namespace', parent: null, line: ns.line });
+  }
+  if (language === 'cpp' || language === 'csharp') {
+    for (const d of parsed.delegates || []) types.push({ name: d.name, kind: 'delegate', parent: null, line: d.line });
+  }
+
+  return {
+    path: filePath,
+    project: project.name,
+    module,
+    mtime,
+    language,
+    content,
+    relativePath,
+    types,
+    members: parsed.members || []
+  };
+}
+
+function parseAsset(filePath, project) {
+  const contentRoot = project.contentRoot || project.paths[0];
+  const mtime = Math.floor(statSync(filePath).mtimeMs);
+  const relativePath = relative(contentRoot, filePath).replace(/\\/g, '/');
+  const ext = relativePath.match(/\.[^.]+$/)?.[0] || '';
+  const contentPath = '/Game/' + relativePath.replace(/\.[^.]+$/, '');
+  const name = relativePath.split('/').pop().replace(/\.[^.]+$/, '');
+  const folder = '/Game/' + relativePath.split('/').slice(0, -1).join('/');
+
+  let assetClass = null;
+  let parentClass = null;
+  if (ext === '.uasset') {
+    try {
+      const info = parseUAssetHeader(filePath);
+      assetClass = info.assetClass;
+      parentClass = info.parentClass;
+    } catch {}
+  }
+
+  return {
+    path: filePath,
+    name,
+    contentPath,
+    folder: folder || '/Game',
+    project: project.name,
+    extension: ext,
+    mtime,
+    assetClass,
+    parentClass
+  };
+}
+
+async function readAndParseBatch(batch, project, language) {
+  const parsed = [];
+  for (let j = 0; j < batch.length; j += MAX_CONCURRENT) {
+    const concurrent = batch.slice(j, j + MAX_CONCURRENT);
+    const results = await Promise.all(concurrent.map(async (f) => {
+      try {
+        return await readAndParseSource(f.path, project, language);
+      } catch (err) {
+        recordIngest({ errors: 1 });
+        warn(`Error parsing ${f.path}: ${err.message}`);
+        return null;
+      }
+    }));
+    parsed.push(...results.filter(Boolean)); // drop files that failed or had no base path
+  }
+  return parsed;
+}
+
+async function runFullScan(languages) {
+  log(`Starting full scan for empty languages: ${languages.join(', ')}`);
+  const scanStart = performance.now();
+
+  for (const project of config.projects) {
+    if (!languages.includes(project.language)) continue;
+    const extensions = project.extensions || (project.language === 'cpp' ? ['.h', '.cpp'] : ['.as']);
+
+    for (const basePath of project.paths) {
+      const collectStart = performance.now();
+      const files = collectFiles(basePath, project.name, extensions, project.language, project.includePatterns);
+      const collectMs = (performance.now() - collectStart).toFixed(0);
+      log(`Collected ${files.length} files from ${project.name} (${collectMs}ms)`);
+
+      if (project.language === 'content') {
+        for (let i = 0; i < files.length; i += BATCH_SIZE * 10) {
+          const batch = files.slice(i, i + BATCH_SIZE * 10);
+          const assets = [];
+          for (const f of batch) {
+            try {
+              assets.push(parseAsset(f.path, project));
+            } catch (err) {
+              recordIngest({ errors: 1 });
+              warn(`Error parsing asset ${f.path}: ${err.message}`);
+            }
+          }
+          if (assets.length > 0) {
+            await postJson(`${serviceUrl}/internal/ingest`, { assets });
+            recordIngest({ assets: assets.length });
+          }
+          if ((i + batch.length) % 5000 < BATCH_SIZE * 10) {
+            log(`${project.name}: ${i + batch.length}/${files.length} assets`);
+          }
+        }
+      } else {
+        let pendingPost = null; // previous batch's POST, overlapped with parsing of the next batch
+        let pendingFilesCount = 0;
+        for (let i = 0; i < files.length; i += BATCH_SIZE) {
+          const batch = files.slice(i, i + BATCH_SIZE);
+          const parsePromise = readAndParseBatch(batch, project, project.language);
+
+          if (pendingPost) {
+            await pendingPost;
+            recordIngest({ files: pendingFilesCount });
+          }
+
+          const parsed = await parsePromise;
+
+          if (parsed.length > 0) {
+            pendingFilesCount = parsed.length;
+            pendingPost = postJson(`${serviceUrl}/internal/ingest`, { files: parsed });
+          } else {
+            pendingFilesCount = 0;
+            pendingPost = null;
+          }
+
+          if ((i + batch.length) % 500 < BATCH_SIZE) {
+            log(`${project.name}: ${i + batch.length}/${files.length} files`);
+          }
+        }
+        if (pendingPost) {
+          await pendingPost;
+          recordIngest({ files: pendingFilesCount });
+        }
+      }
+    }
+  }
+
+  const totalS = ((performance.now() - scanStart) / 1000).toFixed(1);
+  log(`Full scan complete (${totalS}s)`);
+}
+
+async function runReconcile(project) {
+  const language = project.language;
+  const extensions = project.extensions || (language === 'cpp' ? ['.h', '.cpp'] : ['.as']);
+
+  for (const basePath of project.paths) {
+    const endpoint = language === 'content'
+      ? `${serviceUrl}/internal/asset-mtimes?project=${encodeURIComponent(project.name)}`
+      : `${serviceUrl}/internal/file-mtimes?language=${encodeURIComponent(language)}&project=${encodeURIComponent(project.name)}`;
+
+    const storedMtimes = await fetchJson(endpoint);
+
+    const collectStart = performance.now();
+    const diskFiles = collectFiles(basePath, project.name, extensions, language, project.includePatterns);
+    const diskMap = new Map(diskFiles.map(f => [f.path, f]));
+    const collectMs = (performance.now() - collectStart).toFixed(0);
+
+    const changed = [];
+    const deleted = [];
+
+    for (const f of diskFiles) {
+      const storedMtime = storedMtimes[f.path];
+      if (storedMtime === undefined || storedMtime !== f.mtime) {
+        changed.push(f);
+      }
+    }
+
+    const basePrefix = basePath.replace(/\\/g, '/').replace(/\/*$/, '/'); // end with '/' so sibling roots sharing a name prefix are not flagged as deleted
+    for (const storedPath of Object.keys(storedMtimes)) {
+      const normalized = storedPath.replace(/\\/g, '/');
+      if (normalized.startsWith(basePrefix) && !diskMap.has(storedPath)) {
+        deleted.push(storedPath);
+      }
+    }
+
+    if (changed.length === 0 && deleted.length === 0) {
+      log(`${project.name}: up to date (${diskFiles.length} files, scan ${collectMs}ms)`);
+      continue;
+    }
+
+    log(`${project.name}: ${changed.length} changed, ${deleted.length} deleted (of ${diskFiles.length} on disk, scan ${collectMs}ms)`);
+
+    if (deleted.length > 0) {
+      for (let i = 0; i < deleted.length; i += BATCH_SIZE) {
+        const batch = deleted.slice(i, i + BATCH_SIZE);
+        await postJson(`${serviceUrl}/internal/ingest`, { deletes: batch });
+        recordIngest({ deletes: batch.length });
+      }
+    }
+
+    if (language === 'content') {
+      for (let i = 0; i < changed.length; i += BATCH_SIZE * 10) {
+        const batch = changed.slice(i, i + BATCH_SIZE * 10);
+        const assets = [];
+        for (const f of batch) {
+          try {
+            assets.push(parseAsset(f.path, project));
+          } catch (err) {
+            recordIngest({ errors: 1 });
+            warn(`Error parsing asset ${f.path}: ${err.message}`);
+          }
+        }
+        if (assets.length > 0) {
+          await postJson(`${serviceUrl}/internal/ingest`, { assets });
+          recordIngest({ assets: assets.length });
+        }
+        if ((i + batch.length) % 5000 < BATCH_SIZE * 10) {
+          log(`${project.name}: ${i + batch.length}/${changed.length} assets reconciled`);
+        }
+      }
+    } else {
+      let pendingPost = null; // previous batch's POST, overlapped with parsing of the next batch
+      let pendingFilesCount = 0;
+      for (let i = 0; i < changed.length; i += BATCH_SIZE) {
+        const batch = changed.slice(i, i + BATCH_SIZE);
+        const parsePromise = readAndParseBatch(batch, project, language);
+
+        if (pendingPost) {
+          await pendingPost;
+          recordIngest({ files: pendingFilesCount });
+        }
+
+        const parsed = await parsePromise;
+
+        if (parsed.length > 0) {
+          pendingFilesCount = parsed.length;
+          pendingPost = postJson(`${serviceUrl}/internal/ingest`, { files: parsed });
+        } else {
+          pendingFilesCount = 0;
+          pendingPost = null;
+        }
+
+        if ((i + batch.length) % 500 < BATCH_SIZE) {
+          log(`${project.name}: ${i + batch.length}/${changed.length} files reconciled`);
+        }
+      }
+      if (pendingPost) {
+        await pendingPost;
+        recordIngest({ files: pendingFilesCount });
+      }
+    }
+  }
+}
+
+async function runReconcileProjects(projectNames) {
+  log(`Reconciling languages/projects in worker...`);
+  const reconcileStart = performance.now();
+
+  const selected = projectNames?.length
+    ? config.projects.filter(p => projectNames.includes(p.name))
+    : config.projects;
+
+  for (const project of selected) {
+    try {
+      await runReconcile(project);
+    } catch (err) {
+      recordIngest({ errors: 1 }); // one project failing must not stop the others
+      warn(`Reconcile failed for ${project.name}: ${err.message}`);
+    }
+  }
+
+  const reconcileS = ((performance.now() - reconcileStart) / 1000).toFixed(1);
+  log(`Reconciliation complete (${reconcileS}s)`);
+}
+
+async function main() {
+  if (!serviceUrl) throw new Error('scan-worker missing serviceUrl');
+  if (!config || !Array.isArray(config.projects)) throw new Error('scan-worker missing config.projects');
+  if (!task || !task.kind) throw new Error('scan-worker missing task.kind');
+
+  if (task.kind === 'full-scan') {
+    await runFullScan(task.languages || []);
+    return;
+  }
+
+  if (task.kind === 'reconcile') {
+    await runReconcileProjects(task.projectNames || null);
+    return;
+  }
+
+  throw new Error(`Unknown task kind: ${task.kind}`);
+}
+
+export function getTelemetrySnapshot() {
+  return {
+    filesIngested,
+    assetsIngested,
+    deletesProcessed,
+    errorsCount,
+    lastIngestAt
+  };
+}
+
+if (parentPort) { // run only as a worker thread; a plain import (e.g. from tests) has no side effects
+  main()
+    .then(() => {
+      postMessage('result', {
+        telemetry: getTelemetrySnapshot()
+      });
+    })
+    .catch((err) => {
+      postMessage('log', {
+        level: 'error',
+        message: `${workerPrefix} Fatal error: ${err.stack || err.message}`
+      });
+      process.exit(1);
+    });
+}
diff --git a/src/watcher/scan-worker.test.js b/src/watcher/scan-worker.test.js
new file mode 100644
index 0000000..b915b90
--- /dev/null
+++ b/src/watcher/scan-worker.test.js
@@ -0,0 +1,113 @@
+import { describe, it } from 'node:test';
+import assert from 'node:assert/strict';
+import { shouldExclude } from './scan-worker.js';
+import {
+  getConfigReloadAction,
+  mergeScanTelemetrySnapshot,
+  reconcileFinalTelemetry
+} from './scan-telemetry.js';
+
+describe('watcher scan telemetry helpers', () => {
+  it('accumulates scan telemetry counters and preserves last ingest timestamp', () => {
+    const base = {
+      filesIngested: 10,
+      assetsIngested: 5,
+      deletesProcessed: 2,
+      errorsCount: 1,
+      lastIngestAt: '2026-02-25T10:00:00.000Z'
+    };
+    const merged = mergeScanTelemetrySnapshot(base, {
+      filesIngested: 3,
+      assetsIngested: 2,
+      deletesProcessed: 1,
+      errorsCount: 4,
+      lastIngestAt: '2026-02-25T11:00:00.000Z'
+    });
+
+    assert.deepEqual(merged, {
+      filesIngested: 13,
+      assetsIngested: 7,
+      deletesProcessed: 3,
+      errorsCount: 5,
+      lastIngestAt: '2026-02-25T11:00:00.000Z'
+    });
+  });
+
+  it('reconciles final worker telemetry against streamed deltas', () => {
+    const delta = reconcileFinalTelemetry({
+      streamedTelemetry: true,
+      streamedTotals: {
+        filesIngested: 8,
+        assetsIngested: 3,
+        deletesProcessed: 1,
+        errorsCount: 2
+      },
+      finalTelemetry: {
+        filesIngested: 10,
+        assetsIngested: 5,
+        deletesProcessed: 4,
+        errorsCount: 5,
+        lastIngestAt: '2026-02-25T11:00:00.000Z'
+      }
+    });
+
+    assert.deepEqual(delta, {
+      filesIngested: 2,
+      assetsIngested: 2,
+      deletesProcessed: 3,
+      errorsCount: 3,
+      lastIngestAt: '2026-02-25T11:00:00.000Z'
+    });
+  });
+
+  it('clamps reconciled counters at zero and handles missing final telemetry', () => {
+    const clamped = reconcileFinalTelemetry({
+      streamedTelemetry: true,
+      streamedTotals: { filesIngested: 10, assetsIngested: 2, deletesProcessed: 3, errorsCount: 2 },
+      finalTelemetry: { filesIngested: 1, assetsIngested: 1, deletesProcessed: 1, errorsCount: 0, lastIngestAt: null }
+    });
+    assert.deepEqual(clamped, {
+      filesIngested: 0,
+      assetsIngested: 0,
+      deletesProcessed: 0,
+      errorsCount: 0,
+      lastIngestAt: null
+    });
+
+    const missingFinal = reconcileFinalTelemetry({
+      streamedTelemetry: true,
+      streamedTotals: { filesIngested: 1, assetsIngested: 1, deletesProcessed: 1, errorsCount: 1 },
+      finalTelemetry: null
+    });
+    assert.equal(missingFinal, null);
+  });
+});
+
+describe('watcher exclude patterns', () => {
+  it('matches exact path segments, wildcards, and escaped regex literals', () => {
+    const patterns = [
+      'Intermediate',
+      'Saved/**',
+      '*.tmp',
+      'Plugins/*/Binaries/**',
+      'Dir.With.Dot/**'
+    ];
+
+    assert.equal(shouldExclude('C:/Repo/Game/Intermediate/Foo.cpp', patterns), true);
+    assert.equal(shouldExclude('C:/Repo/Game/NotIntermediate/Foo.cpp', patterns), false);
+    assert.equal(shouldExclude('C:/Repo/Game/Saved/Logs/Run.log', patterns), true);
+    assert.equal(shouldExclude('C:/Repo/Game/Plugins/PluginA/Binaries/Win64/a.dll', patterns), true);
+    assert.equal(shouldExclude('C:/Repo/Game/Plugins/PluginA/Source/a.cpp', patterns), false);
+    assert.equal(shouldExclude('C:/Repo/Game/Dir.With.Dot/file.txt', patterns), true);
+    assert.equal(shouldExclude('C:\\Repo\\Game\\Temp\\cache.tmp', patterns), true);
+  });
+});
+
+describe('watcher bootstrap config reload guard', () => {
+  it('defers watcher restart while bootstrap has not started the watcher yet', () => {
+    assert.equal(getConfigReloadAction(true, false), 'defer');
+    assert.equal(getConfigReloadAction(true, true), 'restart');
+    assert.equal(getConfigReloadAction(false, false), 'none');
+    assert.equal(getConfigReloadAction(false, true), 'none');
+  });
+});
diff --git a/src/watcher/watcher-client.js b/src/watcher/watcher-client.js
index 949cfa5..230f4b0 100644
--- a/src/watcher/watcher-client.js
+++ b/src/watcher/watcher-client.js
@@ -16,11 +16,14 @@ import chokidar from 'chokidar';
 import { readFile, stat } from 'fs/promises';
 import { readdirSync, statSync, readFileSync, existsSync } from 'fs';
 import { join, relative } from 'path';
+import { Worker } from 'worker_threads';
 import { parseContent as parseAngelscriptContent } from '../parsers/angelscript-parser.js';
 import { parseCppContent } from '../parsers/cpp-parser.js';
 import { parseCSharpContent } from '../parsers/csharp-parser.js';
 import { parseUAssetHeader } from '../parsers/uasset-parser.js';
 import { gzipSync } from 'zlib';
+import { compileExcludePatterns, shouldExcludePath } from './exclude-patterns.js';
+import { getConfigReloadAction, mergeScanTelemetrySnapshot, reconcileFinalTelemetry } from './scan-telemetry.js';
 
 // --- Config ---
 
@@ -84,6 +87,7 @@ const logPrefix = workspaceName ? `[Watcher:${workspaceName}]` : '[Watcher]';
 
 // Config is fetched from the service after connection — initially null
 let config = null;
+let compiledExcludePatterns = []; // rebuilt by refreshCompiledExcludePatterns() whenever config changes
 let lastConfigVersion = 0;
 
 // Config-derived settings (set after config fetch)
@@ -117,6 +121,90 @@ let totalErrors = 0;
 let lastIngestTimestamp = null;
 let lastReconcileTimestamp = null;
 let nextReconcileTimestamp = null;
+let scanTaskInFlight = false; // true while a scan worker is running
+
+function mergeScanTelemetry(telemetry) {
+  const merged = mergeScanTelemetrySnapshot({
+    filesIngested: totalFilesIngested,
+    assetsIngested: totalAssetsIngested,
+    deletesProcessed: totalDeletes,
+    errorsCount: totalErrors,
+    lastIngestAt: lastIngestTimestamp
+  }, telemetry);
+  totalFilesIngested = merged.filesIngested;
+  totalAssetsIngested = merged.assetsIngested;
+  totalDeletes = merged.deletesProcessed;
+  totalErrors = merged.errorsCount;
+  lastIngestTimestamp = merged.lastIngestAt;
+}
+
+function refreshCompiledExcludePatterns() {
+  compiledExcludePatterns = compileExcludePatterns(config?.exclude || []);
+}
+
+async function runScanTask(task) {
+  const scanWorkerUrl = new URL('./scan-worker.js', import.meta.url);
+  return new Promise((resolve, reject) => {
+    let done = false; // settle once: 'error' is followed by 'exit'
+    let telemetry = null;
+    let streamedTelemetry = false;
+    const streamedTotals = {
+      filesIngested: 0,
+      assetsIngested: 0,
+      deletesProcessed: 0,
+      errorsCount: 0
+    };
+
+    const worker = new Worker(scanWorkerUrl, {
+      workerData: {
+        serviceUrl: SERVICE_URL,
+        config,
+        task,
+        logPrefix
+      }
+    });
+
+    worker.on('message', (msg) => {
+      if (!msg || !msg.type) return;
+      if (msg.type === 'log') {
+        if (msg.level === 'warn') console.warn(msg.message);
+        else if (msg.level === 'error') console.error(msg.message);
+        else console.log(msg.message);
+        return;
+      }
+      if (msg.type === 'telemetry') {
+        streamedTelemetry = true;
+        const delta = msg.delta || null;
+        mergeScanTelemetry(delta); // applied immediately so counters advance during long scans
+        streamedTotals.filesIngested += delta?.filesIngested || 0;
+        streamedTotals.assetsIngested += delta?.assetsIngested || 0;
+        streamedTotals.deletesProcessed += delta?.deletesProcessed || 0;
+        streamedTotals.errorsCount += delta?.errorsCount || 0;
+        return;
+      }
+      if (msg.type === 'result') {
+        telemetry = reconcileFinalTelemetry({
+          streamedTelemetry,
+          streamedTotals,
+          finalTelemetry: msg.telemetry || null
+        });
+      }
+    });
+
+    worker.on('error', (err) => {
+      if (done) return;
+      done = true;
+      reject(err);
+    });
+
+    worker.on('exit', (code) => {
+      if (done) return;
+      done = true;
+      if (code === 0) resolve(telemetry); // only the not-yet-streamed remainder, so callers can merge it safely
+      else reject(new Error(`Scan worker exited with code ${code}`));
+    });
+  });
+}
 
 // --- Utility functions ---
 
@@ -161,16 +249,7 @@ function deriveModule(relativePath, projectName) {
 }
 
 function shouldExclude(path) {
-  const normalized = path.replace(/\\/g, '/');
-  for (const pattern of config.exclude || []) {
-    if (pattern.includes('**')) {
-      const regex = new RegExp(pattern.replace(/\*\*/g, '.*').replace(/\*/g, '[^/]*'));
-      if (regex.test(normalized)) return true;
-    } else if (normalized.includes(pattern.replace(/\*/g, ''))) {
-      return true;
-    }
-  }
-  return false;
+  return shouldExcludePath(path, compiledExcludePatterns);
 }
 
 function collectFiles(dirPath, projectName, extensions, language, includePatterns) {
@@ -424,7 +503,12 @@ async function reconcile(project) {
         const batch = changed.slice(i, i + BATCH_SIZE * 10);
         const assets = [];
         for (const f of batch) {
-          try { assets.push(parseAsset(f.path, project)); } catch {}
+          try {
+            assets.push(parseAsset(f.path, project));
+          } catch (err) {
+            totalErrors++;
+            console.warn(`${logPrefix} Error parsing asset ${f.path}: ${err.message}`);
+          }
         }
         if (assets.length > 0) {
           await postJson(`${SERVICE_URL}/internal/ingest`, { assets });
@@ -485,7 +569,12 @@ async function fullScan(languages) {
           const batch = files.slice(i, i + BATCH_SIZE * 10);
           const assets = [];
           for (const f of batch) {
-            try { assets.push(parseAsset(f.path, project)); } catch {}
+            try {
+              assets.push(parseAsset(f.path, project));
+            } catch (err) {
+              totalErrors++;
+              console.warn(`${logPrefix} Error parsing asset ${f.path}: ${err.message}`);
+            }
           }
           if (assets.length > 0) {
             await postJson(`${SERVICE_URL}/internal/ingest`, { assets });
@@ -541,10 +630,7 @@ function startWatcher() {
   let debounceTimer = null;
 
   const watcher = chokidar.watch(watchPaths, {
-    ignored: [
-      /(^|[\/\\])\../,
-      ...(config.exclude || []).map(p => new RegExp(p.replace(/\*\*/g, '.*').replace(/\*/g, '[^/\\\\]*')))
-    ],
+    ignored: (watchPath) => /(^|[\/\\])\../.test(watchPath) || shouldExclude(watchPath),
     persistent: true,
     ignoreInitial: true,
     awaitWriteFinish: { stabilityThreshold: 100, pollInterval: 50 }
@@ -641,6 +727,7 @@ async function main() {
   // Fetch config from service (single source of truth)
   try {
     config = await fetchConfigFromService();
+    refreshCompiledExcludePatterns();
     console.log(`${logPrefix} Config from service: ${config.projects.length} projects`);
   } catch (err) {
     // Fallback: try local config.json if service doesn't have the endpoint (older version)
@@ -648,12 +735,15 @@ async function main() {
     const fallbackPath = join(import.meta.dirname, '..', '..', 'config.json');
     if (existsSync(fallbackPath)) {
       config = JSON.parse(readFileSync(fallbackPath, 'utf-8'));
+      refreshCompiledExcludePatterns();
       console.log(`${logPrefix} Config from local fallback: ${config.projects.length} projects`);
     } else {
       throw new Error('No config available: service /internal/config failed and no local config.json found');
     }
   }
 
+  let activeWatcher = null;
+
   // --- Heartbeat: send watcher status to service every 15s ---
   // Start heartbeat immediately so the dashboard shows "watcher connected"
   // even during the initial reconciliation which can take several minutes.
@@ -697,10 +787,14 @@ async function main() { const newConfig = await fetchConfigFromService(); const projectsChanged = JSON.stringify(newConfig.projects) !== JSON.stringify(config.projects); config = newConfig; - if (projectsChanged) { + refreshCompiledExcludePatterns(); + const reloadAction = getConfigReloadAction(projectsChanged, Boolean(activeWatcher)); + if (reloadAction === 'restart') { console.log(`${logPrefix} Projects changed — restarting file watches...`); await activeWatcher.close(); activeWatcher = startWatcher(); + } else if (reloadAction === 'defer') { + console.log(`${logPrefix} Projects changed during bootstrap — watcher will start with updated config`); } else { console.log(`${logPrefix} Config updated (no project path changes)`); } @@ -724,45 +818,63 @@ async function main() { const populatedLanguages = configuredLanguages.filter(lang => status.counts[lang]); if (emptyLanguages.length > 0) { - await fullScan(emptyLanguages); + scanTaskInFlight = true; + try { + mergeScanTelemetry(await runScanTask({ kind: 'full-scan', languages: emptyLanguages })); + } catch (err) { + console.error(`${logPrefix} Full scan failed: ${err.message}`); + totalErrors++; + } finally { + scanTaskInFlight = false; + } } if (populatedLanguages.length > 0) { console.log(`${logPrefix} Reconciling populated languages: ${populatedLanguages.join(', ')}`); - const reconcileStart = performance.now(); - for (const project of config.projects) { - if (!populatedLanguages.includes(project.language)) continue; - try { - await reconcile(project); - } catch (err) { - console.error(`${logPrefix} Reconcile failed for ${project.name}: ${err.message}`); - } + scanTaskInFlight = true; + try { + const projectNames = config.projects + .filter(project => populatedLanguages.includes(project.language)) + .map(project => project.name); + mergeScanTelemetry(await runScanTask({ kind: 'reconcile', projectNames })); + lastReconcileTimestamp = new Date().toISOString(); + } catch (err) { + 
console.error(`${logPrefix} Bootstrap reconcile failed: ${err.message}`); + totalErrors++; + } finally { + scanTaskInFlight = false; } - const reconcileS = ((performance.now() - reconcileStart) / 1000).toFixed(1); - console.log(`${logPrefix} Reconciliation complete (${reconcileS}s)`); } - let activeWatcher = startWatcher(); + if (!activeWatcher) { + activeWatcher = startWatcher(); + } // --- Periodic reconciliation: catch missed file changes --- - lastReconcileTimestamp = new Date().toISOString(); + if (!lastReconcileTimestamp) { + lastReconcileTimestamp = new Date().toISOString(); + } nextReconcileTimestamp = new Date(Date.now() + getReconcileIntervalMs()).toISOString(); async function periodicReconcile() { + nextReconcileTimestamp = new Date(Date.now() + getReconcileIntervalMs()).toISOString(); + if (scanTaskInFlight) { + console.log(`${logPrefix} Skipping periodic reconciliation (scan already in progress)`); + return; + } console.log(`${logPrefix} Starting periodic reconciliation...`); - const start = performance.now(); - for (const project of config.projects) { - try { - await reconcile(project); - } catch (err) { - console.error(`${logPrefix} Periodic reconcile failed for ${project.name}: ${err.message}`); - } + scanTaskInFlight = true; + try { + mergeScanTelemetry(await runScanTask({ kind: 'reconcile' })); + lastReconcileTimestamp = new Date().toISOString(); + } catch (err) { + console.error(`${logPrefix} Periodic reconcile failed: ${err.message}`); + totalErrors++; + } finally { + scanTaskInFlight = false; + nextReconcileTimestamp = new Date(Date.now() + getReconcileIntervalMs()).toISOString(); } - const s = ((performance.now() - start) / 1000).toFixed(1); - console.log(`${logPrefix} Periodic reconciliation complete (${s}s)`); - lastReconcileTimestamp = new Date().toISOString(); - nextReconcileTimestamp = new Date(Date.now() + getReconcileIntervalMs()).toISOString(); } setInterval(periodicReconcile, getReconcileIntervalMs());