diff --git a/specs/001-project-aether/features/41-guided-workflow-agent/plan.md b/specs/001-project-aether/features/41-guided-workflow-agent/plan.md new file mode 100644 index 00000000..58964dcf --- /dev/null +++ b/specs/001-project-aether/features/41-guided-workflow-agent/plan.md @@ -0,0 +1,153 @@ +# Implementation Plan: Guided Workflow Agent + +**Feature**: [spec.md](./spec.md) +**Status**: Planned +**Date**: 2026-03-12 + +## Summary + +Wire the existing Feature 36 automation builder workflow into the system as a dedicated, guided agent. This involves three layers: (1) a backend API route and chat preset, (2) orchestrator intent routing, and (3) conversational streaming with step progress in the activity panel. + +## Technical Context + +**Existing infrastructure (Feature 36)**: +- `src/graph/workflows/automation_builder.py` — LangGraph workflow with 6 nodes, HITL interrupt at preview +- `src/graph/nodes/automation_builder.py` — Node implementations (gather_intent, validate_entities, check_duplicates, generate_yaml, validate_yaml, preview) +- `src/graph/state/automation_builder.py` — `AutomationBuilderState` typed dict +- `src/tools/automation_builder_tools.py` — check_entity_exists, find_similar_automations, validate_automation_draft +- Registered in `src/graph/workflows/_registry.py` as `automation_builder` + +**What's missing**: +- No API endpoint invokes the workflow +- No chat preset for "Create Automation" +- Orchestrator doesn't route automation intents to the builder +- No streaming/activity integration +- `preview_node` generates a `proposal_id` but doesn't actually call `seek_approval` + +## Constitution Check + +- **Safety First**: Automations go through HITL approval (interrupt_before=["preview"]) and the proposal pipeline. No deployment without user consent. +- **Isolation**: YAML generation only; no script execution. +- **Observability**: All nodes wrapped in `traced_node()`, MLflow spans. +- **State**: PostgreSQL checkpointing via `PostgresCheckpointer`. + +## Architecture + +### Two entry points + +``` +Entry 1: Chat preset "Create Automation" + | + POST /v1/chat/completions + |-- agent="automation_builder" + |-- Bypasses orchestrator, goes directly to builder + v + AutomationBuilderWorkflow.stream() + +Entry 2: General chat with automation intent + | + POST /v1/chat/completions + |-- agent="auto" (default) + |-- Orchestrator.classify_intent() + |-- intent = "create_automation" + |-- Routes to AutomationBuilderWorkflow + v + AutomationBuilderWorkflow.stream() +``` + +### Streaming flow + +``` +User message + | + v +gather_intent_node + |-- emit_job_status("Extracting automation intent...") + |-- If needs_clarification: stream question back, wait for user reply + v +validate_entities_node + |-- emit_job_status("Validating entities...") + |-- If errors: stream suggestions, loop to gather_intent + v +check_duplicates_node + |-- emit_job_status("Checking for duplicates...") + |-- If found: stream warning, ask user to confirm + v +generate_yaml_node + |-- emit_job_status("Generating automation YAML...") + v +validate_yaml_node + |-- emit_job_status("Validating YAML...") + |-- If errors (< 3 attempts): loop to generate + v +preview_node + |-- emit_job_status("Preparing preview...") + |-- Stream YAML preview to user + |-- Call seek_approval() to create proposal + |-- Stream proposal link + v +END +``` + +## Key Design Decisions + +1. **Wrap workflow as an agent**: Create `AutomationBuilderAgent` that wraps the compiled LangGraph graph with streaming, job events, and message formatting. This parallels how `ArchitectWorkflow` wraps the conversation graph. + +2. **Conversational multi-turn**: The HITL interrupt at `preview` pauses the graph. For clarification loops (needs_clarification, entity_errors), the gather_intent node already loops. We enhance it to stream the question as an SSE message and wait for the user's next message (resume from checkpoint). + +3. **Orchestrator routing**: Add `"create_automation"` as an intent in `OrchestratorAgent.classify_intent()`. When detected, set `target_agent = "automation_builder"` in the `TaskPlan`. + +4. **Proposal creation**: `preview_node` currently only generates a message. Enhance it to call `seek_approval()` to create an actual `AutomationProposal` in the database. + +5. **Reuse streaming infrastructure**: Use the same SSE streaming as the conversation workflow (`/v1/chat/completions` with `stream=true`). + +## How Users Can Create Automations + +### Method 1: Guided Agent (via preset) + +1. Open chat, select "Create Automation" preset from the workflow picker +2. Describe the automation: "Turn off all lights at 11pm" +3. Agent validates entities, checks for duplicates +4. Agent shows YAML preview +5. User approves -> proposal created +6. Navigate to Proposals page to deploy + +### Method 2: Natural Language in Chat + +1. Open chat (general mode or any preset) +2. Say: "Create an automation that turns off the lights when nobody is home" +3. Orchestrator routes to automation builder automatically +4. Same guided flow as Method 1 + +### Method 3: Manual via Architect + +1. Open chat, select any preset +2. Tell the Architect exactly what you want: + - "Use seek_approval to create an automation with this YAML: ..." + - "Create a proposal for a time-triggered automation that..." +3. Architect generates YAML inline and uses `seek_approval` to create a proposal +4. This path is free-form — no step-by-step guidance + +### Method 4: Registry Page + +1. Go to the Registry page, Automations tab +2. Use the "Ask Architect" inline assistant +3. Say: "Create a new automation that locks the door when I leave" +4. Architect generates and proposes via `seek_approval` +5. Navigate to Proposals page to review and deploy + +## Files to Create + +- `src/agents/automation_builder/agent.py` — `AutomationBuilderAgent` wrapper (stream, resume, job events) +- `src/agents/automation_builder/__init__.py` — Module init +- `tests/unit/test_automation_builder_agent.py` — Agent wrapper tests +- `tests/unit/test_orchestrator_routing.py` — Intent routing tests + +## Files to Modify + +- `src/graph/nodes/automation_builder.py` — Enhance `preview_node` to call `seek_approval`; add step status emissions +- `src/agents/orchestrator.py` — Add `create_automation` intent, route to builder +- `src/api/routes/openai_compat/handlers.py` — Handle `agent="automation_builder"` in stream handler +- `src/api/routes/workflows.py` — Add "Create Automation" preset +- `ui/src/api/client/conversations.ts` — No changes (uses existing stream API) +- `ui/src/pages/chat/WorkflowPresetSelector.tsx` — Picks up new preset from API automatically diff --git a/specs/001-project-aether/features/41-guided-workflow-agent/spec.md b/specs/001-project-aether/features/41-guided-workflow-agent/spec.md new file mode 100644 index 00000000..9f2a1ae1 --- /dev/null +++ b/specs/001-project-aether/features/41-guided-workflow-agent/spec.md @@ -0,0 +1,83 @@ +# Feature Specification: Guided Workflow Agent + +**Feature Branch**: `feat/41-guided-workflow-agent` +**Created**: 2026-03-12 +**Status**: Draft +**Depends on**: Feature 36 (NL Automation Builder — state, nodes, tools exist but are not exposed) +**Input**: User request: "I want the workflows to be guided with a dedicated agent." + +## Problem Statement + +Feature 36 implemented the automation builder workflow (LangGraph graph, nodes, tools, state) but it was never wired to an API endpoint, chat preset, or UI. Users currently create automations through free-form chat with the Architect agent, which requires them to know HA automation structure. There is no guided, step-by-step experience and no way to invoke the automation builder at all. + +Two gaps need filling: + +1. **Guided mode** — a dedicated agent that walks users through automation creation step by step (trigger, entities, conditions, actions), with live validation and previews at each stage. +2. **Manual instruction** — users who prefer free-form chat should be able to say "create an automation that..." to the Architect, which routes the intent to the automation builder workflow automatically. + +## User Scenarios & Testing + +### User Story 1 — Guided Automation Creation via Dedicated Agent (Priority: P1) + +As a user, I want to select "Create Automation" from the chat preset picker and be guided through each step (describe intent, confirm entities, review conditions, preview YAML, approve/deploy) interactively. + +**Acceptance Scenarios**: + +1. **Given** I select the "Create Automation" preset, **When** I type "turn off lights at 10pm", **Then** the agent extracts intent, validates entities, and presents a step-by-step summary before generating YAML. +2. **Given** the agent asks me to confirm entities, **When** I correct an entity name, **Then** it re-validates and proceeds with the corrected entity. +3. **Given** the agent generates YAML, **When** I see the preview, **Then** I can approve (creating a proposal), request modifications, or cancel. +4. **Given** I approve, **Then** a proposal is created via the existing proposal pipeline and I'm directed to the proposals page. + +### User Story 2 — Architect Routes to Automation Builder (Priority: P1) + +As a user, I want to tell the Architect "create an automation that turns off lights when nobody is home" in general chat and have it automatically route to the automation builder workflow. + +**Acceptance Scenarios**: + +1. **Given** I send an automation-creation intent in general chat, **When** the orchestrator classifies the intent, **Then** it routes to the automation builder workflow (not free-form Architect). +2. **Given** the automation builder handles my request, **When** it needs clarification, **Then** it asks within the same chat session. +3. **Given** the builder generates valid YAML, **Then** it creates a proposal via `seek_approval` and informs me. + +### User Story 3 — Step Progress UI (Priority: P2) + +As a user, I want to see which step I'm on during guided automation creation (intent, entities, duplicates, YAML, preview) so I know the progress. + +**Acceptance Scenarios**: + +1. **Given** the automation builder is running, **When** each node completes, **Then** the activity panel shows the current step name and progress. +2. **Given** validation errors occur, **When** the agent loops back, **Then** the step indicator shows the regression clearly. + +### Edge Cases + +- User starts guided flow then switches to a different chat session: state is checkpointed and can be resumed. +- User describes something that isn't an automation (e.g. "tell me the weather"): agent recognizes this and redirects to Architect. +- User requests a complex automation (multiple triggers, parallel actions): agent handles multi-step intent extraction. +- The automation builder generates invalid YAML 3 times: agent gives up, presents best attempt, and suggests manual editing. + +## Requirements + +### Functional Requirements + +- **FR-001**: Automation builder workflow MUST be invocable via a chat workflow preset ("Create Automation"). +- **FR-002**: Orchestrator MUST route automation-creation intents to the automation builder (not Architect). +- **FR-003**: Automation builder MUST create proposals via `seek_approval` (reusing existing proposal pipeline). +- **FR-004**: Each step of the builder MUST stream status updates to the activity panel. +- **FR-005**: Builder MUST support multi-turn clarification within a single chat session. +- **FR-006**: Builder MUST checkpoint state to PostgreSQL for resumability. + +### Non-Functional Requirements + +- **NFR-001**: Step transitions should complete within 5 seconds for simple automations. +- **NFR-002**: All nodes traced via MLflow (existing `traced_node` wrapper). + +### Key Entities + +- **AutomationBuilderState** (exists): Extended if needed for streaming step updates. +- Reuses: AutomationProposal, HAEntity, workflow presets. + +## Success Criteria + +- **SC-001**: "Create Automation" preset appears in the chat workflow picker. +- **SC-002**: Simple automations complete end-to-end (NL -> preview -> proposal) within 30 seconds. +- **SC-003**: Orchestrator correctly routes 90%+ of automation-creation intents to the builder. +- **SC-004**: Activity panel shows step progress during guided flow. diff --git a/specs/001-project-aether/features/41-guided-workflow-agent/tasks.md b/specs/001-project-aether/features/41-guided-workflow-agent/tasks.md new file mode 100644 index 00000000..3944abed --- /dev/null +++ b/specs/001-project-aether/features/41-guided-workflow-agent/tasks.md @@ -0,0 +1,90 @@ +# Tasks: Guided Workflow Agent + +**Feature**: [spec.md](./spec.md) | **Plan**: [plan.md](./plan.md) + +--- + +## Phase 1 — Wire the Automation Builder Agent + +- [ ] T4101 Create `src/agents/automation_builder/__init__.py` and `agent.py` with `AutomationBuilderAgent` class that: + - Compiles the existing `automation_builder` graph with PostgreSQL checkpointing + - Provides `start_session(user_message, session_id)` and `continue_session(user_message, session_id)` methods + - Streams node outputs as SSE-compatible messages + - Emits `emit_job_start` / `emit_job_status` / `emit_job_complete` events at each step + - Handles the HITL interrupt at preview (pauses and streams the YAML preview) + +- [ ] T4102 Enhance `preview_node` in `src/graph/nodes/automation_builder.py` to: + - Call `seek_approval()` from `src/tools/approval_tools.py` to create an actual `AutomationProposal` + - Include the `proposal_id` from the created proposal in the state + - Format a user-friendly preview message with YAML and a link to the proposals page + +- [ ] T4103 [P] Unit tests for `AutomationBuilderAgent` — mock LLM, verify streaming, verify job events, verify checkpoint resume + +**Checkpoint**: Agent compiles and can stream a simple automation flow end-to-end with mock LLM + +--- + +## Phase 2 — Orchestrator Routing + +- [ ] T4104 Add `"create_automation"` intent to `OrchestratorAgent.classify_intent()` in `src/agents/orchestrator.py`: + - Add intent patterns: "create automation", "make automation", "set up automation", "automate", "when X then Y", "turn on/off ... at/when" + - When classified, set `target_agent = "automation_builder"` in `TaskPlan` + +- [ ] T4105 Update `src/api/routes/openai_compat/handlers.py` to handle `agent="automation_builder"`: + - When `agent` is `"automation_builder"` (explicit) or orchestrator routes to it (auto), invoke `AutomationBuilderAgent` instead of `ArchitectWorkflow` + - Reuse existing SSE streaming format + +- [ ] T4106 [P] Unit tests for orchestrator routing — verify "create an automation" classifies as `create_automation`, verify handler invokes correct agent + +**Checkpoint**: Saying "create an automation" in auto mode routes to the builder + +--- + +## Phase 3 — Chat Preset and Activity Integration + +- [ ] T4107 Add "Create Automation" preset in `src/api/routes/workflows.py`: + - Name: "Create Automation" + - Description: "Guided step-by-step automation creation with live validation" + - Agents: `["automation_builder"]` + - Default agent: `automation_builder` + +- [ ] T4108 Add step status emissions to each node in `src/graph/nodes/automation_builder.py`: + - `gather_intent_node`: "Extracting automation intent..." + - `validate_entities_node`: "Validating {n} entities..." + - `check_duplicates_node`: "Checking for duplicates..." + - `generate_yaml_node`: "Generating YAML (attempt {n})..." + - `validate_yaml_node`: "Validating YAML..." + - `preview_node`: "Automation ready for review" + +- [ ] T4109 [P] Verify the WorkflowPresetSelector picks up the new preset automatically (no frontend changes needed — it reads from API) + +**Checkpoint**: "Create Automation" preset visible in chat, activity panel shows step progress + +--- + +## Phase 4 — Multi-turn Clarification + +- [ ] T4110 Enhance `AutomationBuilderAgent` to support multi-turn clarification: + - When `needs_clarification=True`, stream the clarification question, then pause (checkpoint) + - On next user message, resume from checkpoint with the new message appended + - Same pattern for `entity_errors` — stream suggestions, wait for user correction + +- [ ] T4111 Add conversation threading: `AutomationBuilderAgent` uses `thread_id` (session ID) for LangGraph checkpointing so the user can pick up where they left off + +- [ ] T4112 [P] Integration test: multi-turn flow (user sends intent -> agent asks clarification -> user responds -> agent proceeds to YAML -> user approves) + +**Checkpoint**: Multi-turn guided flow works end-to-end + +--- + +## Phase 5 — Polish and Documentation + +- [ ] T4113 Add help text to the "Create Automation" preset description explaining what users can do +- [ ] T4114 Ensure error handling: LLM failures, invalid state, timeout — all surface user-friendly messages +- [ ] T4115 Run `make ci-local` and fix any issues + +**Checkpoint**: Feature complete, CI green + +--- + +`[P]` = Can run in parallel (different files, no dependencies) diff --git a/src/api/routes/diagnostics.py b/src/api/routes/diagnostics.py index 0d21bbc1..05a3d116 100644 --- a/src/api/routes/diagnostics.py +++ b/src/api/routes/diagnostics.py @@ -86,13 +86,23 @@ async def error_log() -> dict[str, Any]: entries = parse_error_log(raw_log) summary = get_error_summary(entries) by_integration = categorize_by_integration(entries) - known_patterns = analyze_errors(entries) + raw_patterns = analyze_errors(entries) - # Serialize by_integration: integration -> list of entry dicts serialized_by_int = {} for integration, int_entries in by_integration.items(): serialized_by_int[integration] = [asdict(e) for e in int_entries] + _LEVEL_TO_SEVERITY = {"ERROR": "high", "CRITICAL": "high", "WARNING": "medium"} + known_patterns = [ + { + "pattern": p.get("category", p.get("message", "")), + "severity": _LEVEL_TO_SEVERITY.get(p.get("level", ""), "low"), + "matched_entries": p.get("count", 0), + "suggestion": p.get("suggestion", ""), + } + for p in raw_patterns + ] + return { "summary": summary, "by_integration": serialized_by_int, @@ -172,9 +182,16 @@ async def recent_traces(limit: int = 50) -> dict[str, Any]: include_spans=False, ) + from src.api.routes.jobs import _resolve_job_type + items = [] for t in traces: info = t.info + tags: dict[str, str] = getattr(info, "tags", {}) or {} + run_name = tags.get("mlflow.runName", "") + job_title = tags.get("ha.job_title", "") + title = job_title or (run_name.replace("_", " ").title() if run_name else "Unknown") + items.append( { "trace_id": info.request_id, @@ -183,6 +200,10 @@ async def recent_traces(limit: int = 50) -> dict[str, Any]: else str(info.status), "timestamp_ms": info.timestamp_ms, "duration_ms": info.execution_time_ms, + "run_name": run_name, + "job_type": _resolve_job_type(run_name), + "title": title, + "conversation_id": tags.get("mlflow.trace.session", ""), } ) diff --git a/src/api/routes/jobs.py b/src/api/routes/jobs.py index 8368c1a6..71182098 100644 --- a/src/api/routes/jobs.py +++ b/src/api/routes/jobs.py @@ -55,11 +55,14 @@ def _map_trace_to_job(trace: Any) -> dict[str, Any]: raw_status = info.status.value if hasattr(info.status, "value") else str(info.status) run_name = tags.get("mlflow.runName", "") + job_title = tags.get("ha.job_title", "") + title = job_title or (run_name.replace("_", " ").title() if run_name else "Unknown") + return { "job_id": info.request_id, - "job_type": _resolve_job_type(run_name), + "job_type": tags.get("ha.job_type") or _resolve_job_type(run_name), "status": _resolve_status(raw_status), - "title": run_name.replace("_", " ").title() if run_name else "Unknown", + "title": title, "started_at": info.timestamp_ms, "duration_ms": info.execution_time_ms, "conversation_id": tags.get("mlflow.trace.session") or tags.get("conversation_id"), @@ -90,9 +93,11 @@ async def list_jobs( if experiment is None: return {"jobs": [], "total": 0} + fetch_limit = limit * 5 if job_type else limit + traces = client.search_traces( experiment_ids=[experiment.experiment_id], - max_results=limit, + max_results=fetch_limit, order_by=["timestamp_ms DESC"], include_spans=False, ) @@ -102,6 +107,9 @@ async def list_jobs( if job_type: jobs = [j for j in jobs if j["job_type"] == job_type] + jobs.sort(key=lambda j: j["started_at"] or 0, reverse=True) + jobs = jobs[:limit] + return {"jobs": jobs, "total": len(jobs)} except ImportError as exc: diff --git a/src/diagnostics/log_parser.py b/src/diagnostics/log_parser.py index 7858f06d..99e7a923 100644 --- a/src/diagnostics/log_parser.py +++ b/src/diagnostics/log_parser.py @@ -177,6 +177,8 @@ def get_error_summary(entries: list[ErrorLogEntry]) -> dict: if not entries: return { "total": 0, + "errors": 0, + "warnings": 0, "counts_by_level": {}, "top_integrations": [], } @@ -192,6 +194,8 @@ def get_error_summary(entries: list[ErrorLogEntry]) -> dict: return { "total": len(entries), + "errors": level_counts.get("ERROR", 0), + "warnings": level_counts.get("WARNING", 0), "counts_by_level": dict(level_counts), "top_integrations": integration_counts.most_common(10), } diff --git a/src/jobs/events.py b/src/jobs/events.py index 5e1fd3ad..f1f583e6 100644 --- a/src/jobs/events.py +++ b/src/jobs/events.py @@ -40,7 +40,7 @@ def _publish(event: dict[str, object]) -> None: def emit_job_start(job_id: str, job_type: JobType, title: str) -> None: - """Emit a job start event.""" + """Emit a job start event and persist the title as an MLflow tag.""" _publish( { "type": "job", @@ -51,6 +51,19 @@ def emit_job_start(job_id: str, job_type: JobType, title: str) -> None: "ts": time.time(), } ) + _tag_mlflow_trace(job_id, title, job_type) + + +def _tag_mlflow_trace(job_id: str, title: str, job_type: str) -> None: + """Best-effort: tag the active MLflow trace with a descriptive title.""" + try: + import mlflow + + mlflow.update_current_trace( + tags={"ha.job_title": title, "ha.job_type": job_type}, + ) + except Exception: + logger.debug("Could not tag MLflow trace for job %s", job_id) def emit_job_agent(job_id: str, agent: str, event: Literal["start", "end"]) -> None: diff --git a/src/storage/entities/insight.py b/src/storage/entities/insight.py index 3d0ef335..dc83adab 100644 --- a/src/storage/entities/insight.py +++ b/src/storage/entities/insight.py @@ -8,54 +8,15 @@ import enum from datetime import UTC, datetime -from typing import Any, TypeVar +from typing import Any -from sqlalchemy import JSON, DateTime, Float, String, Text, TypeDecorator, Uuid, func +from sqlalchemy import JSON, DateTime, Float, String, Text, Uuid, func from sqlalchemy.dialects.postgresql import ARRAY +from sqlalchemy.dialects.postgresql import ENUM as PgENUM from sqlalchemy.orm import Mapped, mapped_column from src.storage.models import Base -_E = TypeVar("_E", bound=enum.Enum) - - -def _enum_column(enum_cls: type[_E]) -> TypeDecorator[_E]: - """Enum column that persists by value and loads by value or name (legacy). - - Uses String as the impl so SQLAlchemy's internal Enum validation - is bypassed; the DB-level enum constraint is maintained by the - PostgreSQL column type created in migrations. - """ - - class _EnumByNameOrValue(TypeDecorator[_E]): - impl = String(255) - cache_ok = True - - def process_bind_param(self, value: Any, dialect: Any) -> str | None: - if value is None: - return None - if isinstance(value, enum_cls): - return str(value.value) - return str(value) - - def process_result_value(self, value: Any, dialect: Any) -> _E | None: - if value is None: - return None - if isinstance(value, enum_cls): - return value - s = str(value).strip() - s_lower = s.lower() - for member in enum_cls: - if s in (member.value, member.name) or s_lower == member.name.lower(): - return member - raise LookupError( - f"{value!r} is not among the defined enum values. " - f"Enum: {enum_cls.__name__}. " - f"Possible: {[e.value for e in enum_cls]}" - ) - - return _EnumByNameOrValue() - class InsightType(str, enum.Enum): """Types of insights the Data Science team can generate.""" @@ -108,9 +69,13 @@ class Insight(Base): id: Mapped[str] = mapped_column(Uuid(as_uuid=False), primary_key=True) - # Insight classification: persist by value; load by value or name (legacy rows) type: Mapped[InsightType] = mapped_column( - _enum_column(InsightType), + PgENUM( + InsightType, + name="insighttype", + create_type=False, + values_callable=lambda e: [m.value for m in e], + ), nullable=False, index=True, ) @@ -132,7 +97,12 @@ class Insight(Base): doc="Confidence score 0.0-1.0", ) impact: Mapped[InsightImpact] = mapped_column( - _enum_column(InsightImpact), + PgENUM( + InsightImpact, + name="insightimpact", + create_type=False, + values_callable=lambda e: [m.value for m in e], + ), nullable=False, doc="Impact level: low, medium, high, critical", ) @@ -157,9 +127,13 @@ class Insight(Base): doc="Output from script execution", ) - # Status tracking: persist by value; load by value or name (legacy rows) status: Mapped[InsightStatus] = mapped_column( - _enum_column(InsightStatus), + PgENUM( + InsightStatus, + name="insightstatus", + create_type=False, + values_callable=lambda e: [m.value for m in e], + ), nullable=False, default=InsightStatus.PENDING, index=True, diff --git a/src/tools/tariff_tools.py b/src/tools/tariff_tools.py index 4f361cf1..f3931fa0 100644 --- a/src/tools/tariff_tools.py +++ b/src/tools/tariff_tools.py @@ -273,6 +273,25 @@ async def setup_electricity_tariffs( ) +def _resolve_current_period() -> str: + """Determine the current tariff period from TARIFF_SCHEDULE and local time.""" + from datetime import datetime as _dt + + now = _dt.now() + t = now.hour * 60 + now.minute + + peak_start = 17 * 60 # 17:00 + peak_end = 19 * 60 # 19:00 + day_start = 8 * 60 # 08:00 + night_start = 23 * 60 # 23:00 + + if peak_start <= t < peak_end: + return "peak" + if day_start <= t < night_start: + return "day" + return "night" + + async def get_tariff_rates(ha_client: Any) -> dict[str, Any]: """Read current tariff rates from HA entities. @@ -312,11 +331,21 @@ def _str_state(entity: dict[str, Any] | None, default: str = "") -> str: night_rate = _float_state(night_entity) peak_rate = _float_state(peak_entity) + ha_period = _str_state(period_entity, "") + current_period = ( + ha_period if ha_period in ("day", "night", "peak") else _resolve_current_period() + ) + + period_rates = {"day": day_rate, "night": night_rate, "peak": peak_rate} + current_rate = _float_state(current_entity) + if current_rate == 0.0 and current_period in period_rates: + current_rate = period_rates[current_period] + return { "configured": True, "plan_name": _str_state(plan_entity, "Unknown Plan"), - "current_period": _str_state(period_entity, "day"), - "current_rate": _float_state(current_entity), + "current_period": current_period, + "current_rate": current_rate, "rates": { "day": {"rate": day_rate, **TARIFF_SCHEDULE["day"]}, "night": {"rate": night_rate, **TARIFF_SCHEDULE["night"]}, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c2110fdf..62c52fcf 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -66,6 +66,7 @@ def _configure_container_runtime() -> None: import pytest import pytest_asyncio +import sqlalchemy as sa from sqlalchemy import event from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine @@ -132,6 +133,28 @@ def postgres_url(postgres_container: Any) -> str: return async_url +def _create_pg_enums(conn: Any) -> None: + """Pre-create PostgreSQL enum types that models declare with create_type=False. + + The Insight entity uses PgENUM(..., create_type=False) because the enum + types are managed by Alembic migrations. Integration tests skip Alembic + and use Base.metadata.create_all, so the types must exist beforehand. + """ + from src.storage.entities.insight import InsightImpact, InsightStatus, InsightType + + for enum_cls, name in [ + (InsightType, "insighttype"), + (InsightImpact, "insightimpact"), + (InsightStatus, "insightstatus"), + ]: + values = ", ".join(f"'{m.value}'" for m in enum_cls) + conn.execute( + sa.text( + f"DO $$ BEGIN CREATE TYPE {name} AS ENUM ({values}); EXCEPTION WHEN duplicate_object THEN NULL; END $$" + ) + ) + + @pytest_asyncio.fixture(scope="session", loop_scope="session") async def integration_engine(postgres_url: str) -> AsyncGenerator[Any, None]: """Create async engine connected to the test container.""" @@ -141,8 +164,8 @@ async def integration_engine(postgres_url: str) -> AsyncGenerator[Any, None]: pool_pre_ping=True, ) - # Create all tables async with engine.begin() as conn: + await conn.run_sync(_create_pg_enums) await conn.run_sync(Base.metadata.create_all) yield engine diff --git a/tests/unit/test_ha_event_handler.py b/tests/unit/test_ha_event_handler.py new file mode 100644 index 00000000..b9d94c6f --- /dev/null +++ b/tests/unit/test_ha_event_handler.py @@ -0,0 +1,288 @@ +"""Tests for EventHandler debounced batch upsert logic. + +Feature 35: Real-Time HA Event Stream. +Covers event queuing, per-entity debounce, batch DB writes, +queue overflow handling, stats tracking, and flush-on-stop. +""" + +from __future__ import annotations + +import asyncio +from contextlib import asynccontextmanager, suppress +from typing import TYPE_CHECKING +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + +from src.ha.event_handler import EventHandler + + +def _state_event(entity_id: str, state: str, **attrs: object) -> dict[str, object]: + """Build a minimal state_changed event payload.""" + return { + "event_type": "state_changed", + "data": { + "entity_id": entity_id, + "new_state": { + "state": state, + "attributes": {"friendly_name": entity_id, **attrs}, + }, + }, + } + + +class TestHandleEvent: + """Tests for handle_event() queuing.""" + + @pytest.mark.asyncio + async def test_event_queued(self) -> None: + """Events are placed on the internal queue.""" + handler = EventHandler(queue_size=10) + event = _state_event("light.kitchen", "on") + await handler.handle_event(event) + + assert handler._queue.qsize() == 1 + assert handler.stats["events_received"] == 1 + + @pytest.mark.asyncio + async def test_multiple_events_queued(self) -> None: + """Multiple events accumulate in the queue.""" + handler = EventHandler(queue_size=10) + for i in range(5): + await handler.handle_event(_state_event(f"sensor.t{i}", str(i))) + + assert handler._queue.qsize() == 5 + assert handler.stats["events_received"] == 5 + + @pytest.mark.asyncio + async def test_queue_overflow_drops_oldest(self) -> None: + """When queue is full, oldest event is dropped to make room.""" + handler = EventHandler(queue_size=2) + await handler.handle_event(_state_event("a", "1")) + await handler.handle_event(_state_event("b", "2")) + assert handler._queue.qsize() == 2 + + await handler.handle_event(_state_event("c", "3")) + assert handler._queue.qsize() == 2 + assert handler.stats["events_received"] == 3 + + +class TestDrainQueue: + """Tests for _drain_queue() debounce logic.""" + + @pytest.mark.asyncio + async def test_debounce_keeps_latest(self) -> None: + """Same entity_id updated multiple times: only latest state kept.""" + handler = EventHandler() + await handler.handle_event(_state_event("light.living", "off")) + await handler.handle_event(_state_event("light.living", "on")) + await handler.handle_event(_state_event("light.living", "on", brightness=200)) + + handler._drain_queue() + assert len(handler._pending) == 1 + assert handler._pending["light.living"]["state"] == "on" + assert handler._pending["light.living"]["attributes"]["brightness"] == 200 + + @pytest.mark.asyncio + async def test_multiple_entities_kept(self) -> None: + """Different entity_ids each get their own pending entry.""" + handler = EventHandler() + await handler.handle_event(_state_event("light.a", "on")) + await handler.handle_event(_state_event("light.b", "off")) + + handler._drain_queue() + assert len(handler._pending) == 2 + assert "light.a" in handler._pending + assert "light.b" in handler._pending + + @pytest.mark.asyncio + async def test_events_without_entity_id_skipped(self) -> None: + """Events missing entity_id or new_state are silently skipped.""" + handler = EventHandler() + await handler.handle_event({"data": {}}) + await handler.handle_event({"data": {"entity_id": "x"}}) + + handler._drain_queue() + assert len(handler._pending) == 0 + + +class TestFlushToDb: + """Tests for _flush_to_db() batch upsert logic.""" + + @pytest.mark.asyncio + async def test_upsert_called_with_correct_data(self) -> None: + """_flush_to_db should call EntityRepository.upsert_many with correct shape.""" + handler = EventHandler() + handler._pending = { + "light.kitchen": { + "state": "on", + "attributes": {"friendly_name": "Kitchen Light"}, + }, + } + + mock_repo = MagicMock() + mock_repo.upsert_many = AsyncMock(return_value=([], {"created": 0, "updated": 1})) + + mock_session = MagicMock() + mock_session.commit = AsyncMock() + + @asynccontextmanager + async def _fake_session() -> AsyncIterator[MagicMock]: + yield mock_session + + with ( + patch("src.storage.get_session", _fake_session), + patch("src.dal.entities.EntityRepository", return_value=mock_repo), + ): + await handler._flush_to_db() + + mock_repo.upsert_many.assert_called_once() + upsert_data = mock_repo.upsert_many.call_args[0][0] + assert len(upsert_data) == 1 + assert upsert_data[0]["entity_id"] == "light.kitchen" + assert upsert_data[0]["domain"] == "light" + assert upsert_data[0]["state"] == "on" + assert upsert_data[0]["name"] == "Kitchen Light" + assert handler.stats["events_flushed"] == 1 + + @pytest.mark.asyncio + async def test_flush_restores_pending_on_error(self) -> None: + """On DB error, unflushed entities are restored to pending.""" + handler = EventHandler() + handler._pending = { + "sensor.temp": {"state": "22", "attributes": {}}, + } + + @asynccontextmanager + async def _failing_session() -> AsyncIterator[MagicMock]: + raise RuntimeError("DB down") + yield MagicMock() # pragma: no cover + + with patch("src.storage.get_session", _failing_session): + await handler._flush_to_db() + + assert "sensor.temp" in handler._pending + assert handler.stats["events_flushed"] == 0 + + @pytest.mark.asyncio + async def test_automation_state_triggers_proposal_sync(self) -> None: + """Automation entity updates should trigger _sync_proposal_statuses.""" + handler = EventHandler() + handler._pending = { + "automation.evening_lights": {"state": "off", "attributes": {}}, + } + + mock_repo = MagicMock() + mock_repo.upsert_many = AsyncMock(return_value=([], {"created": 0, "updated": 1})) + + mock_session = MagicMock() + mock_session.commit = AsyncMock() + + @asynccontextmanager + async def _fake_session() -> AsyncIterator[MagicMock]: + yield mock_session + + with ( + patch("src.storage.get_session", _fake_session), + patch("src.dal.entities.EntityRepository", return_value=mock_repo), + patch.object(handler, "_sync_proposal_statuses", new_callable=AsyncMock) as mock_sync, + ): + await handler._flush_to_db() + + mock_sync.assert_called_once_with({"automation.evening_lights": "off"}) + + @pytest.mark.asyncio + async def test_non_automation_skips_proposal_sync(self) -> None: + """Non-automation entities should not trigger proposal sync.""" + handler = EventHandler() + handler._pending = { + "light.kitchen": {"state": "on", "attributes": {}}, + } + + mock_repo = MagicMock() + mock_repo.upsert_many = AsyncMock(return_value=([], {"created": 0, "updated": 1})) + + mock_session = MagicMock() + mock_session.commit = AsyncMock() + + @asynccontextmanager + async def _fake_session() -> AsyncIterator[MagicMock]: + yield mock_session + + with ( + patch("src.storage.get_session", _fake_session), + patch("src.dal.entities.EntityRepository", return_value=mock_repo), + patch.object(handler, "_sync_proposal_statuses", new_callable=AsyncMock) as mock_sync, + ): + await handler._flush_to_db() + + mock_sync.assert_not_called() + + +class TestStats: + """Tests for stats property.""" + + @pytest.mark.asyncio + async def test_initial_stats(self) -> None: + """Fresh handler has zero stats.""" + handler = EventHandler() + stats = handler.stats + assert stats["events_received"] == 0 + assert stats["events_flushed"] == 0 + assert stats["pending"] == 0 + assert stats["queue_size"] == 0 + + @pytest.mark.asyncio + async def test_stats_after_events(self) -> None: + """Stats reflect received events and queue size.""" + handler = EventHandler() + await handler.handle_event(_state_event("a", "1")) + await handler.handle_event(_state_event("b", "2")) + + stats = handler.stats + assert stats["events_received"] == 2 + assert stats["queue_size"] == 2 + + +class TestLifecycle: + """Tests for start/stop lifecycle.""" + + @pytest.mark.asyncio + async def test_stop_flushes_remaining(self) -> None: + """stop() should drain queue and flush remaining pending events.""" + handler = EventHandler(batch_interval=999) + await handler.handle_event(_state_event("light.a", "on")) + + mock_repo = MagicMock() + mock_repo.upsert_many = AsyncMock(return_value=([], {"created": 0, "updated": 1})) + mock_session = MagicMock() + mock_session.commit = AsyncMock() + + @asynccontextmanager + async def _fake_session() -> AsyncIterator[MagicMock]: + yield mock_session + + with ( + patch("src.storage.get_session", _fake_session), + patch("src.dal.entities.EntityRepository", return_value=mock_repo), + ): + await handler.start() + await asyncio.sleep(0.01) + await handler.stop() + + mock_repo.upsert_many.assert_called_once() + + @pytest.mark.asyncio + async def test_start_creates_flush_task(self) -> None: + """start() creates the flush background task.""" + handler = EventHandler(batch_interval=999) + await handler.start() + assert handler._flush_task is not None + assert handler._running is True + handler._running = False + handler._flush_task.cancel() + with suppress(asyncio.CancelledError): + await handler._flush_task diff --git a/tests/unit/test_ha_event_stream.py b/tests/unit/test_ha_event_stream.py new file mode 100644 index 00000000..6d7d57ed --- /dev/null +++ b/tests/unit/test_ha_event_stream.py @@ -0,0 +1,368 @@ +"""Tests for HAEventStream persistent WebSocket subscription. + +Feature 35: Real-Time HA Event Stream. +Covers connection, authentication, subscription, reconnection with +exponential backoff, event dispatch, and graceful shutdown. +""" + +from __future__ import annotations + +import asyncio +import contextlib +import json +from typing import Any +from unittest.mock import AsyncMock, patch + +import pytest + +from src.ha.event_stream import ( + _BACKOFF_BASE, + _BACKOFF_FACTOR, + _BACKOFF_MAX, + HAEventStream, +) + +WS_URL = "ws://homeassistant.local:8123/api/websocket" +TOKEN = "test-token-abc123" + + +class _AsyncCtx: + """Wrap an object for ``async with ws_connect(...) as ws``.""" + + def __init__(self, obj: Any) -> None: + self._obj = obj + + async def __aenter__(self) -> Any: + return self._obj + + async def __aexit__(self, *args: Any) -> None: + pass + + +def _make_ws( + messages: list[dict[str, Any]], + *, + hang_after: int | None = None, +) -> AsyncMock: + """Mock WebSocket that yields predefined messages then optionally hangs. + + Args: + messages: Responses to return from recv(). + hang_after: After this many recv() calls, block forever (simulates disconnect). + """ + call_count = 0 + + async def _recv() -> str: + nonlocal call_count + call_count += 1 + idx = call_count - 1 + if hang_after is not None and call_count > hang_after: + await asyncio.sleep(999) + return "{}" + if idx < len(messages): + return json.dumps(messages[idx]) + raise ConnectionError("Connection closed") + + ws = AsyncMock() + ws.recv = _recv + ws.send = AsyncMock() + + async def _aiter_impl(_self: object = None) -> Any: + """Make ``async for msg in ws`` work by delegating to recv().""" + while True: + try: + yield await _recv() + except (ConnectionError, asyncio.CancelledError): + return + + ws.__aiter__ = _aiter_impl + return ws + + +class TestHAEventStreamConnect: + """Connection and authentication tests.""" + + @pytest.mark.asyncio + async def test_connect_authenticate_subscribe(self) -> None: + """Happy path: auth -> subscribe -> receive event -> dispatch.""" + received: list[dict[str, Any]] = [] + + async def handler(event: dict[str, Any]) -> None: + received.append(event) + + event_msg = { + "type": "event", + "event": { + "event_type": "state_changed", + "data": {"entity_id": "light.living_room", "new_state": {"state": "on"}}, + }, + } + + ws = _make_ws( + [ + {"type": "auth_required", "ha_version": "2025.1.0"}, + {"type": "auth_ok", "ha_version": "2025.1.0"}, + {"id": 1, "type": "result", "success": True}, + event_msg, + ] + ) + + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + + with patch("src.ha.event_stream.ws_connect", return_value=_AsyncCtx(ws)): + task = asyncio.create_task(stream.run()) + await asyncio.sleep(0.05) + await stream.stop() + await asyncio.sleep(0.01) + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + auth_call = ws.send.call_args_list[0] + auth_msg = json.loads(auth_call[0][0]) + assert auth_msg["type"] == "auth" + assert auth_msg["access_token"] == TOKEN + + sub_call = ws.send.call_args_list[1] + sub_msg = json.loads(sub_call[0][0]) + assert sub_msg["type"] == "subscribe_events" + assert sub_msg["event_type"] == "state_changed" + + assert len(received) >= 1 + assert received[0]["event_type"] == "state_changed" + + @pytest.mark.asyncio + async def test_subscribe_failure_triggers_reconnect(self) -> None: + """Subscribe failure should trigger reconnect with backoff.""" + handler = AsyncMock() + sleep_calls: list[float] = [] + + async def _fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + raise asyncio.CancelledError + + ws = _make_ws( + [ + {"type": "auth_required", "ha_version": "2025.1.0"}, + {"type": "auth_ok", "ha_version": "2025.1.0"}, + {"id": 1, "type": "result", "success": False, "error": {"message": "fail"}}, + ] + ) + + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + + with ( + patch("src.ha.event_stream.ws_connect", return_value=_AsyncCtx(ws)), + patch("asyncio.sleep", side_effect=_fake_sleep), + ): + task = asyncio.create_task(stream.run()) + with contextlib.suppress(asyncio.CancelledError): + await task + + assert len(sleep_calls) >= 1 + assert sleep_calls[0] == _BACKOFF_BASE + + +class TestHAEventStreamReconnect: + """Exponential backoff reconnection tests.""" + + @pytest.mark.asyncio + async def test_backoff_doubles_on_failure(self) -> None: + """Backoff should increase: 1s -> 2s -> 4s.""" + handler = AsyncMock() + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + + sleep_calls: list[float] = [] + + async def _fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + if len(sleep_calls) >= 3: + raise asyncio.CancelledError + + with ( + patch( + "src.ha.event_stream.ws_connect", + side_effect=ConnectionError("refused"), + ), + patch("asyncio.sleep", side_effect=_fake_sleep), + ): + task = asyncio.create_task(stream.run()) + with contextlib.suppress(asyncio.CancelledError): + await task + + assert len(sleep_calls) >= 3 + assert sleep_calls[0] == _BACKOFF_BASE + assert sleep_calls[1] == _BACKOFF_BASE * _BACKOFF_FACTOR + assert sleep_calls[2] == _BACKOFF_BASE * _BACKOFF_FACTOR * _BACKOFF_FACTOR + + @pytest.mark.asyncio + async def test_backoff_capped_at_max(self) -> None: + """Backoff should never exceed _BACKOFF_MAX (60s).""" + handler = AsyncMock() + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + + sleep_calls: list[float] = [] + + async def _fake_sleep(delay: float) -> None: + sleep_calls.append(delay) + if len(sleep_calls) >= 10: + raise asyncio.CancelledError + + with ( + patch( + "src.ha.event_stream.ws_connect", + side_effect=ConnectionError("refused"), + ), + patch("asyncio.sleep", side_effect=_fake_sleep), + ): + task = asyncio.create_task(stream.run()) + with contextlib.suppress(asyncio.CancelledError): + await task + + assert len(sleep_calls) >= 7 # enough to hit ceiling + for delay in sleep_calls: + assert delay <= _BACKOFF_MAX + + @pytest.mark.asyncio + async def test_backoff_resets_on_successful_connect(self) -> None: + """After a successful connection, backoff resets to base.""" + handler = AsyncMock() + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + assert stream._backoff == _BACKOFF_BASE + + stream._backoff = 32.0 + + ws = _make_ws( + [ + {"type": "auth_required", "ha_version": "2025.1.0"}, + {"type": "auth_ok", "ha_version": "2025.1.0"}, + {"id": 1, "type": "result", "success": True}, + ] + ) + + with patch("src.ha.event_stream.ws_connect", return_value=_AsyncCtx(ws)): + task = asyncio.create_task(stream.run()) + await asyncio.sleep(0.05) + await stream.stop() + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + assert stream._backoff == _BACKOFF_BASE + + +class TestHAEventStreamLifecycle: + """Start/stop lifecycle tests.""" + + @pytest.mark.asyncio + async def test_stop_cancels_task(self) -> None: + """stop() should cancel the background task.""" + handler = AsyncMock() + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + entered = asyncio.Event() + _real_sleep = asyncio.sleep + + async def _blocking_sleep(delay: float) -> None: + entered.set() + await _real_sleep(999) + + with ( + patch( + "src.ha.event_stream.ws_connect", + side_effect=ConnectionError("refused"), + ), + patch("asyncio.sleep", side_effect=_blocking_sleep), + ): + task = stream.start_task() + assert not task.done() + await _real_sleep(0.05) + await asyncio.wait_for(entered.wait(), timeout=2) + await stream.stop() + assert stream.is_running is False + + @pytest.mark.asyncio + async def test_start_task_returns_task(self) -> None: + """start_task() should return an asyncio.Task.""" + handler = AsyncMock() + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + entered = asyncio.Event() + _real_sleep = asyncio.sleep + + async def _blocking_sleep(delay: float) -> None: + entered.set() + await _real_sleep(999) + + with ( + patch( + "src.ha.event_stream.ws_connect", + side_effect=ConnectionError("refused"), + ), + patch("asyncio.sleep", side_effect=_blocking_sleep), + ): + task = stream.start_task() + assert isinstance(task, asyncio.Task) + await _real_sleep(0.05) + await asyncio.wait_for(entered.wait(), timeout=2) + await stream.stop() + + @pytest.mark.asyncio + async def test_is_running_property(self) -> None: + """is_running reflects stream state.""" + handler = AsyncMock() + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + assert stream.is_running is False + + entered = asyncio.Event() + _real_sleep = asyncio.sleep + + async def _blocking_sleep(delay: float) -> None: + entered.set() + await _real_sleep(999) + + with ( + patch( + "src.ha.event_stream.ws_connect", + side_effect=ConnectionError("refused"), + ), + patch("asyncio.sleep", side_effect=_blocking_sleep), + ): + stream.start_task() + await _real_sleep(0.05) + await asyncio.wait_for(entered.wait(), timeout=2) + assert stream.is_running is True + await stream.stop() + assert stream.is_running is False + + @pytest.mark.asyncio + async def test_non_event_messages_ignored(self) -> None: + """Messages that aren't type='event' should be silently ignored.""" + received: list[dict[str, Any]] = [] + + async def handler(event: dict[str, Any]) -> None: + received.append(event) + + ws = _make_ws( + [ + {"type": "auth_required", "ha_version": "2025.1.0"}, + {"type": "auth_ok", "ha_version": "2025.1.0"}, + {"id": 1, "type": "result", "success": True}, + {"type": "pong", "id": 2}, + { + "type": "event", + "event": {"event_type": "state_changed", "data": {"entity_id": "sensor.temp"}}, + }, + ] + ) + + stream = HAEventStream(WS_URL, TOKEN, handler=handler) + + with patch("src.ha.event_stream.ws_connect", return_value=_AsyncCtx(ws)): + task = asyncio.create_task(stream.run()) + await asyncio.sleep(0.05) + await stream.stop() + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + + assert len(received) == 1 + assert received[0]["event_type"] == "state_changed" diff --git a/tests/unit/test_hitl_insight_notifier.py b/tests/unit/test_hitl_insight_notifier.py new file mode 100644 index 00000000..8aadd7ec --- /dev/null +++ b/tests/unit/test_hitl_insight_notifier.py @@ -0,0 +1,269 @@ +"""Tests for InsightNotifier proactive notification logic. + +Feature 37: Proactive Insight Notifications. +Covers threshold filtering, quiet hours, batch vs single +notification, disabled state, from_settings factory, and +graceful failure when push service is unavailable. +""" + +from __future__ import annotations + +from datetime import time as dt_time +from types import SimpleNamespace +from typing import TYPE_CHECKING, cast +from unittest.mock import AsyncMock, patch + +import pytest + +from src.hitl.insight_notifier import InsightNotifier + +if TYPE_CHECKING: + from src.storage.entities.insight import Insight + + +def _insight( + impact: str = "high", + title: str = "Test insight", + confidence: float = 0.9, + insight_id: str = "ins-1", +) -> Insight: + """Create a minimal Insight-like object for testing.""" + return cast( + "Insight", + SimpleNamespace( + id=insight_id, + title=title, + impact=impact, + confidence=confidence, + ), + ) + + +class TestThresholdFiltering: + """Tests for _passes_threshold impact filtering.""" + + def test_high_passes_high(self) -> None: + n = InsightNotifier(min_impact="high") + assert n._passes_threshold("high") is True + + def test_critical_passes_high(self) -> None: + n = InsightNotifier(min_impact="high") + assert n._passes_threshold("critical") is True + + def test_medium_fails_high(self) -> None: + n = InsightNotifier(min_impact="high") + assert n._passes_threshold("medium") is False + + def test_low_fails_high(self) -> None: + n = InsightNotifier(min_impact="high") + assert n._passes_threshold("low") is False + + def test_low_passes_low(self) -> None: + n = InsightNotifier(min_impact="low") + assert n._passes_threshold("low") is True + + def test_medium_passes_medium(self) -> None: + n = InsightNotifier(min_impact="medium") + assert n._passes_threshold("medium") is True + + def test_high_fails_critical(self) -> None: + n = InsightNotifier(min_impact="critical") + assert n._passes_threshold("high") is False + + def test_unknown_impact_treated_as_zero(self) -> None: + n = InsightNotifier(min_impact="high") + assert n._passes_threshold("unknown") is False + + +class TestQuietHours: + """Tests for _is_quiet_hours logic.""" + + def test_no_quiet_hours_configured(self) -> None: + n = InsightNotifier() + assert n._is_quiet_hours() is False + + def test_within_same_day_window(self) -> None: + n = InsightNotifier(quiet_start="09:00", quiet_end="17:00") + with patch("src.hitl.insight_notifier.datetime") as mock_dt: + mock_dt.now.return_value.time.return_value = dt_time(12, 0) + assert n._is_quiet_hours() is True + + def test_outside_same_day_window(self) -> None: + n = InsightNotifier(quiet_start="09:00", quiet_end="17:00") + with patch("src.hitl.insight_notifier.datetime") as mock_dt: + mock_dt.now.return_value.time.return_value = dt_time(20, 0) + assert n._is_quiet_hours() is False + + def test_midnight_crossing_late_night(self) -> None: + """22:00-07:00 window, time is 23:30 -> quiet.""" + n = InsightNotifier(quiet_start="22:00", quiet_end="07:00") + with patch("src.hitl.insight_notifier.datetime") as mock_dt: + mock_dt.now.return_value.time.return_value = dt_time(23, 30) + assert n._is_quiet_hours() is True + + def test_midnight_crossing_early_morning(self) -> None: + """22:00-07:00 window, time is 05:00 -> quiet.""" + n = InsightNotifier(quiet_start="22:00", quiet_end="07:00") + with patch("src.hitl.insight_notifier.datetime") as mock_dt: + mock_dt.now.return_value.time.return_value = dt_time(5, 0) + assert n._is_quiet_hours() is True + + def test_midnight_crossing_afternoon(self) -> None: + """22:00-07:00 window, time is 14:00 -> not quiet.""" + n = InsightNotifier(quiet_start="22:00", quiet_end="07:00") + with patch("src.hitl.insight_notifier.datetime") as mock_dt: + mock_dt.now.return_value.time.return_value = dt_time(14, 0) + assert n._is_quiet_hours() is False + + +class TestNotifyIfActionable: + """Tests for notify_if_actionable method.""" + + @pytest.mark.asyncio + async def test_disabled_returns_zero(self) -> None: + n = InsightNotifier(enabled=False) + result = await n.notify_if_actionable([_insight()]) + assert result == 0 + + @pytest.mark.asyncio + async def test_quiet_hours_returns_zero(self) -> None: + n = InsightNotifier(quiet_start="00:00", quiet_end="23:59") + with patch("src.hitl.insight_notifier.datetime") as mock_dt: + mock_dt.now.return_value.time.return_value = dt_time(12, 0) + result = await n.notify_if_actionable([_insight()]) + assert result == 0 + + @pytest.mark.asyncio + async def test_no_actionable_returns_zero(self) -> None: + n = InsightNotifier(min_impact="critical") + result = await n.notify_if_actionable([_insight(impact="low")]) + assert result == 0 + + @pytest.mark.asyncio + async def test_single_insight_sends_detailed(self) -> None: + n = InsightNotifier(min_impact="high") + mock_send = AsyncMock(return_value={"success": True}) + + with patch( + "src.hitl.push_notification.send_insight_notification", + mock_send, + ): + result = await n.notify_if_actionable([_insight(title="Energy spike")]) + + assert result == 1 + mock_send.assert_called_once() + call_kwargs = mock_send.call_args[1] + assert call_kwargs["title"] == "Energy spike" + assert "90%" in call_kwargs["message"] + assert call_kwargs["insight_id"] == "ins-1" + + @pytest.mark.asyncio + async def test_multiple_insights_sends_batch(self) -> None: + n = InsightNotifier(min_impact="high") + mock_send = AsyncMock(return_value={"success": True}) + insights = [ + _insight(impact="high", insight_id="a"), + _insight(impact="critical", insight_id="b"), + _insight(impact="high", insight_id="c"), + ] + + with patch( + "src.hitl.push_notification.send_insight_notification", + mock_send, + ): + result = await n.notify_if_actionable(insights) + + assert result == 3 + mock_send.assert_called_once() + call_kwargs = mock_send.call_args[1] + assert "3 new insights" in call_kwargs["title"] + assert call_kwargs["insight_id"] is None + + @pytest.mark.asyncio + async def test_mixed_impacts_filters_correctly(self) -> None: + """Only insights meeting threshold are counted and notified.""" + n = InsightNotifier(min_impact="high") + mock_send = AsyncMock(return_value={"success": True}) + insights = [ + _insight(impact="low"), + _insight(impact="high"), + _insight(impact="medium"), + ] + + with patch( + "src.hitl.push_notification.send_insight_notification", + mock_send, + ): + result = await n.notify_if_actionable(insights) + + assert result == 1 # only "high" passes threshold + + @pytest.mark.asyncio + async def test_send_failure_returns_zero(self) -> None: + """If push notification fails, return 0 gracefully.""" + n = InsightNotifier(min_impact="high") + + with patch( + "src.hitl.push_notification.send_insight_notification", + side_effect=RuntimeError("push failed"), + ): + result = await n.notify_if_actionable([_insight()]) + + assert result == 0 + + +class TestFromSettings: + """Tests for the from_settings class method.""" + + @pytest.mark.asyncio + async def test_loads_from_db_settings(self) -> None: + mock_merged = { + "notifications": { + "enabled": False, + "min_impact": "critical", + "quiet_hours_start": "22:00", + "quiet_hours_end": "07:00", + } + } + + with patch( + "src.dal.app_settings.get_app_settings_merged", + new_callable=AsyncMock, + return_value=mock_merged, + ): + notifier = await InsightNotifier.from_settings() + + assert notifier.enabled is False + assert notifier.min_impact == "critical" + assert notifier.quiet_start == dt_time(22, 0) + assert notifier.quiet_end == dt_time(7, 0) + + @pytest.mark.asyncio + async def test_falls_back_to_defaults_on_error(self) -> None: + with patch( + "src.dal.app_settings.get_app_settings_merged", + new_callable=AsyncMock, + side_effect=RuntimeError("DB down"), + ): + notifier = await InsightNotifier.from_settings() + + assert notifier.enabled is True + assert notifier.min_impact == "high" + assert notifier.quiet_start is None + assert notifier.quiet_end is None + + +class TestParseTime: + """Tests for _parse_time helper.""" + + def test_valid_time(self) -> None: + assert InsightNotifier._parse_time("22:30") == dt_time(22, 30) + + def test_midnight(self) -> None: + assert InsightNotifier._parse_time("00:00") == dt_time(0, 0) + + def test_none_returns_none(self) -> None: + assert InsightNotifier._parse_time(None) is None + + def test_empty_returns_none(self) -> None: + assert InsightNotifier._parse_time("") is None diff --git a/ui/src/api/client/optimization.ts b/ui/src/api/client/optimization.ts index 9c8cf9f2..f6424a43 100644 --- a/ui/src/api/client/optimization.ts +++ b/ui/src/api/client/optimization.ts @@ -67,4 +67,12 @@ export const optimization = { body: JSON.stringify({ reason }), }, ), + + jobHistory: (status?: string, limit?: number) => { + const params = new URLSearchParams(); + if (status) params.set("status", status); + if (limit !== undefined) params.set("limit", limit.toString()); + const qs = params.toString(); + return request(`/optimize/jobs${qs ? `?${qs}` : ""}`); + }, }; diff --git a/ui/src/api/client/settings.ts b/ui/src/api/client/settings.ts index 03e670fe..4de4eeb1 100644 --- a/ui/src/api/client/settings.ts +++ b/ui/src/api/client/settings.ts @@ -1,18 +1,22 @@ import { request } from "./core"; +type SettingsValue = number | boolean | string; + export interface AppSettingsResponse { - chat: Record; - dashboard: Record; - data_science: Record; + chat: Record; + dashboard: Record; + data_science: Record; + notifications: Record; } export const appSettings = { get: () => request("/settings"), patch: (body: { - chat?: Record; - dashboard?: Record; - data_science?: Record; + chat?: Record; + dashboard?: Record; + data_science?: Record; + notifications?: Record; }) => request("/settings", { method: "PATCH", diff --git a/ui/src/api/client/system.ts b/ui/src/api/client/system.ts index be2c00b9..6cc8fdf1 100644 --- a/ui/src/api/client/system.ts +++ b/ui/src/api/client/system.ts @@ -86,6 +86,10 @@ export interface RecentTracesResponse { status: string; timestamp_ms: number; duration_ms: number | null; + run_name?: string; + job_type?: string; + title?: string; + conversation_id?: string; }>; total: number; } diff --git a/ui/src/api/hooks/index.ts b/ui/src/api/hooks/index.ts index a54c5984..cd4920c7 100644 --- a/ui/src/api/hooks/index.ts +++ b/ui/src/api/hooks/index.ts @@ -109,6 +109,7 @@ export { useRunOptimization, useAcceptSuggestion, useRejectSuggestion, + useJobHistory, } from "./optimization"; export { useEvaluationSummary, diff --git a/ui/src/api/hooks/optimization.ts b/ui/src/api/hooks/optimization.ts index 4c4b6673..95274294 100644 --- a/ui/src/api/hooks/optimization.ts +++ b/ui/src/api/hooks/optimization.ts @@ -56,3 +56,10 @@ export function useRejectSuggestion() { }, }); } + +export function useJobHistory(status?: string) { + return useQuery({ + queryKey: queryKeys.optimization.history(status), + queryFn: () => optimization.jobHistory(status), + }); +} diff --git a/ui/src/api/hooks/queryKeys.ts b/ui/src/api/hooks/queryKeys.ts index d09cdc49..86f860c9 100644 --- a/ui/src/api/hooks/queryKeys.ts +++ b/ui/src/api/hooks/queryKeys.ts @@ -156,6 +156,8 @@ export const queryKeys = { all: ["optimization"] as const, job: (jobId: string) => ["optimization", "job", jobId] as const, suggestions: ["optimization", "suggestions"] as const, + history: (status?: string) => + ["optimization", "history", status] as const, }, // ── Evaluations ──────────────────────────────────────────────────────── diff --git a/ui/src/api/hooks/settings.ts b/ui/src/api/hooks/settings.ts index c6c12ce2..6648b0e9 100644 --- a/ui/src/api/hooks/settings.ts +++ b/ui/src/api/hooks/settings.ts @@ -1,4 +1,5 @@ import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import type { AppSettingsResponse } from "../client/settings"; import { appSettings } from "../client"; import { queryKeys } from "./queryKeys"; @@ -14,8 +15,8 @@ export function usePatchSettings() { const qc = useQueryClient(); return useMutation({ mutationFn: appSettings.patch, - onSuccess: () => { - qc.invalidateQueries({ queryKey: queryKeys.appSettings.all }); + onSuccess: (data: AppSettingsResponse) => { + qc.setQueryData(queryKeys.appSettings.all, data); }, }); } @@ -24,8 +25,8 @@ export function useResetSettings() { const qc = useQueryClient(); return useMutation({ mutationFn: appSettings.reset, - onSuccess: () => { - qc.invalidateQueries({ queryKey: queryKeys.appSettings.all }); + onSuccess: (data: AppSettingsResponse) => { + qc.setQueryData(queryKeys.appSettings.all, data); }, }); } diff --git a/ui/src/components/chat/agent-activity-panel.tsx b/ui/src/components/chat/agent-activity-panel.tsx index 6c95dc44..70ae417b 100644 --- a/ui/src/components/chat/agent-activity-panel.tsx +++ b/ui/src/components/chat/agent-activity-panel.tsx @@ -1,6 +1,7 @@ import { useEffect, useMemo, useRef, useState, useCallback } from "react"; import { motion, AnimatePresence } from "framer-motion"; import { X, Brain, ChevronDown, ChevronLeft, ChevronRight, Loader2, Cpu, Clock, MessageSquare, Sparkles, BarChart3, Calendar, Webhook, FlaskConical, Compass } from "lucide-react"; +import { useNavigate } from "react-router-dom"; import { cn } from "@/lib/utils"; import { AgentTopology } from "./agent-topology"; import { TOPOLOGY_AGENT_IDS, agentColor, buildTopologyAgents } from "@/lib/agent-registry"; @@ -331,6 +332,7 @@ const JOB_TYPE_ICONS: Record = { function JobListSection() { const jobList = useJobList(); + const navigate = useNavigate(); if (jobList.length === 0) return null; const running = jobList.filter((j) => j.status === "running"); @@ -351,12 +353,14 @@ function JobListSection() { {display.map((job) => { const Icon = JOB_TYPE_ICONS[job.jobType] ?? Cpu; return ( -
navigate("/jobs")} className={cn( - "flex items-center gap-2 rounded-md px-2 py-1 text-[11px]", + "flex w-full items-center gap-2 rounded-md px-2 py-1 text-[11px] cursor-pointer transition-colors text-left", job.status === "running" - ? "bg-primary/5 text-foreground" + ? "bg-primary/5 text-foreground hover:bg-primary/10" : "text-muted-foreground hover:bg-muted/50", )} > @@ -373,7 +377,7 @@ function JobListSection() { ) : ( failed )} -
+ ); })} diff --git a/ui/src/pages/diagnostics/TracingTab.tsx b/ui/src/pages/diagnostics/TracingTab.tsx index 3f667167..57677f1a 100644 --- a/ui/src/pages/diagnostics/TracingTab.tsx +++ b/ui/src/pages/diagnostics/TracingTab.tsx @@ -37,8 +37,8 @@ export function TracingTab() { {data?.traces && data.traces.length > 0 ? (
-
- Trace ID +
+ Trace Status Duration Time @@ -46,24 +46,29 @@ export function TracingTab() { {data.traces.map((t) => (
- - {t.trace_id} - +
+ + {(t as Record).title as string || t.trace_id} + + + {t.trace_id} + +
{t.status} - + {t.duration_ms != null ? `${(t.duration_ms / 1000).toFixed(1)}s` : "N/A"} - + {new Date(t.timestamp_ms).toLocaleString()}
diff --git a/ui/src/pages/energy/ConsumptionSummary.tsx b/ui/src/pages/energy/ConsumptionSummary.tsx index dae5d25c..4e539281 100644 --- a/ui/src/pages/energy/ConsumptionSummary.tsx +++ b/ui/src/pages/energy/ConsumptionSummary.tsx @@ -6,11 +6,23 @@ import { useEntities } from "@/api/hooks"; export function ConsumptionSummary() { const { data, isLoading } = useEntities("sensor"); + const isAggregate = (e: { entity_id: string; attributes: Record }) => { + const id = e.entity_id.toLowerCase(); + if (/_total|_sum|_combined|_aggregate/.test(id)) return true; + const src = e.attributes?.source; + if (Array.isArray(src) && src.length > 0) return true; + const lastReset = e.attributes?.last_reset; + const stateClass = e.attributes?.state_class; + if (stateClass === "total" && lastReset == null) return true; + return false; + }; + const energySensors = (data?.entities ?? []).filter( (e) => - e.device_class === "energy" || - e.unit_of_measurement === "kWh" || - e.unit_of_measurement === "Wh", + (e.device_class === "energy" || + e.unit_of_measurement === "kWh" || + e.unit_of_measurement === "Wh") && + !isAggregate(e), ); const powerSensors = (data?.entities ?? []).filter( diff --git a/ui/src/pages/optimization/index.tsx b/ui/src/pages/optimization/index.tsx index 0b40990f..5c8f7433 100644 --- a/ui/src/pages/optimization/index.tsx +++ b/ui/src/pages/optimization/index.tsx @@ -9,6 +9,8 @@ import { Clock, Zap, ArrowRight, + History, + ChevronDown, } from "lucide-react"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Badge } from "@/components/ui/badge"; @@ -19,8 +21,12 @@ import { useSuggestions, useRunOptimization, useOptimizationJob, + useJobHistory, } from "@/api/hooks"; -import type { AutomationSuggestion } from "@/api/client/optimization"; +import type { + AutomationSuggestion, + OptimizationJob, +} from "@/api/client/optimization"; import { SuggestionDetail } from "./SuggestionDetail"; const ANALYSIS_TYPES = [ @@ -46,11 +52,16 @@ export function OptimizationPage() { const [hours, setHours] = useState(168); const [activeJobId, setActiveJobId] = usePersistedState("optimization:activeJobId", null); const [expandedSuggestionId, setExpandedSuggestionId] = useState(null); + const [historyOpen, setHistoryOpen] = useState(false); + const [historyFilter, setHistoryFilter] = useState(undefined); const runOpt = useRunOptimization(); const { data: jobData } = useOptimizationJob(activeJobId); const { data: suggestionsData, isLoading: loadingSuggestions } = useSuggestions(); + const { data: historyData, isLoading: loadingHistory } = useJobHistory( + historyOpen ? historyFilter : undefined, + ); const handleRun = () => { runOpt.mutate( @@ -237,6 +248,72 @@ export function OptimizationPage() { )}
+ {/* Job History */} +
+ + + {historyOpen && ( + + +
+ + +
+ + {loadingHistory ? ( +
+ {Array.from({ length: 3 }).map((_, i) => ( + + ))} +
+ ) : !historyData || historyData.length === 0 ? ( +

+ No jobs found. +

+ ) : ( +
+ {historyData.map((job) => ( + setActiveJobId(job.job_id)} + /> + ))} +
+ )} +
+
+ )} +
+ {/* Detail overlay */} {expandedSuggestion && ( void; +}) { + const cfg = STATUS_CONFIG[job.status] ?? STATUS_CONFIG.pending; + const Icon = cfg.icon; + const dateStr = job.started_at + ? new Date(job.started_at).toLocaleDateString(undefined, { + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + }) + : "—"; + + return ( + + ); +} + function SuggestionCard({ suggestion, isExpanded, diff --git a/ui/src/pages/settings/index.tsx b/ui/src/pages/settings/index.tsx index e9a23a82..9e5bdee7 100644 --- a/ui/src/pages/settings/index.tsx +++ b/ui/src/pages/settings/index.tsx @@ -6,8 +6,10 @@ import { Timer, LayoutDashboard, FlaskConical, + Bell, Loader2, CheckCircle, + AlertCircle, } from "lucide-react"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Button } from "@/components/ui/button"; @@ -158,6 +160,8 @@ const DATA_SCIENCE_FIELDS: FieldDef[] = [ // ─── Section Form Component ───────────────────────────────────────────────── +type SettingsValue = number | boolean | string; + function SectionForm({ section, fields, @@ -166,16 +170,18 @@ function SectionForm({ onReset, isSaving, isResetting, + error, }: { section: string; fields: FieldDef[]; - values: Record; - onSave: (section: string, data: Record) => void; + values: Record; + onSave: (section: string, data: Record) => void; onReset: (section: string) => void; isSaving: boolean; isResetting: boolean; + error?: string | null; }) { - const [local, setLocal] = useState>({}); + const [local, setLocal] = useState>({}); const [saved, setSaved] = useState(false); // Sync local state when server values change @@ -190,8 +196,7 @@ function SectionForm({ }); const handleSave = () => { - // Only send changed fields - const changed: Record = {}; + const changed: Record = {}; for (const f of fields) { if (local[f.key] !== values[f.key]) { changed[f.key] = local[f.key]; @@ -247,34 +252,232 @@ function SectionForm({
))} -
- - +
+
+ + +
+ {error && ( +

+ + {error} +

+ )} +
+
+ ); +} + +// ─── Notification Form Component ───────────────────────────────────────────── + +const IMPACT_OPTIONS = [ + { value: "low", label: "Low" }, + { value: "medium", label: "Medium" }, + { value: "high", label: "High" }, + { value: "critical", label: "Critical" }, +]; + +function NotificationForm({ + values, + onSave, + onReset, + isSaving, + isResetting, + error, +}: { + values: Record; + onSave: (section: string, data: Record) => void; + onReset: (section: string) => void; + isSaving: boolean; + isResetting: boolean; + error?: string | null; +}) { + const [enabled, setEnabled] = useState(!!values.enabled); + const [minImpact, setMinImpact] = useState( + String(values.min_impact || "high"), + ); + const [quietStart, setQuietStart] = useState( + String(values.quiet_hours_start || ""), + ); + const [quietEnd, setQuietEnd] = useState( + String(values.quiet_hours_end || ""), + ); + const [saved, setSaved] = useState(false); + + useEffect(() => { + setEnabled(!!values.enabled); + setMinImpact(String(values.min_impact || "high")); + setQuietStart(String(values.quiet_hours_start || "")); + setQuietEnd(String(values.quiet_hours_end || "")); + }, [values]); + + const hasChanges = + enabled !== !!values.enabled || + minImpact !== String(values.min_impact || "high") || + quietStart !== String(values.quiet_hours_start || "") || + quietEnd !== String(values.quiet_hours_end || ""); + + const handleSave = () => { + const changed: Record = {}; + if (enabled !== !!values.enabled) changed.enabled = enabled; + if (minImpact !== String(values.min_impact || "high")) + changed.min_impact = minImpact; + if (quietStart !== String(values.quiet_hours_start || "")) + changed.quiet_hours_start = quietStart || ""; + if (quietEnd !== String(values.quiet_hours_end || "")) + changed.quiet_hours_end = quietEnd || ""; + onSave("notifications", changed as Record); + setSaved(true); + setTimeout(() => setSaved(false), 2000); + }; + + return ( +
+
+
+ +

+ Send push notifications when scheduled analysis finds actionable + insights +

+
+
+ +
+
+ +
+
+ +

+ Only notify for insights at or above this impact level +

+
+
+ +
+
+ +
+
+ +

+ Suppress notifications from this time (HH:MM) +

+
+
+ setQuietStart(e.target.value)} + className="h-8 text-sm" + /> +
+
+ +
+
+ +

+ Resume notifications at this time (HH:MM) +

+
+
+ setQuietEnd(e.target.value)} + className="h-8 text-sm" + /> +
+
+ +
+
+ + +
+ {error && ( +

+ + {error} +

+ )}
); @@ -287,14 +490,21 @@ export function SettingsPage() { const patchMut = usePatchSettings(); const resetMut = useResetSettings(); + const mutError = + (patchMut.error as Error | null)?.message ?? + (resetMut.error as Error | null)?.message ?? + null; + const handleSave = ( section: string, - changed: Record, + changed: Record, ) => { + patchMut.reset(); patchMut.mutate({ [section]: changed }); }; const handleReset = (section: string) => { + resetMut.reset(); resetMut.mutate(section); }; @@ -320,10 +530,10 @@ export function SettingsPage() { - + - Chat & Streaming + Chat @@ -333,6 +543,10 @@ export function SettingsPage() { Data Science + + + Notifications + @@ -349,6 +563,7 @@ export function SettingsPage() { onReset={handleReset} isSaving={patchMut.isPending} isResetting={resetMut.isPending} + error={mutError} /> @@ -368,6 +583,7 @@ export function SettingsPage() { onReset={handleReset} isSaving={patchMut.isPending} isResetting={resetMut.isPending} + error={mutError} /> @@ -387,6 +603,27 @@ export function SettingsPage() { onReset={handleReset} isSaving={patchMut.isPending} isResetting={resetMut.isPending} + error={mutError} + /> + + + + + + + + + Insight Notifications + + + +