From 438a3831ff900539ca0216f98fc4f16e81cb0838 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Mon, 2 Feb 2026 13:27:04 -0500 Subject: [PATCH 01/15] Improve search tool to extract resolved urls --- .../aieng/agent_evals/tools/search.py | 175 +++++++++++++++--- .../aieng/agent_evals/tools/test_search.py | 65 +++++-- 2 files changed, 198 insertions(+), 42 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/search.py b/aieng-eval-agents/aieng/agent_evals/tools/search.py index 90493de..26f7952 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/search.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/search.py @@ -1,18 +1,19 @@ """Google Search tool for knowledge-grounded QA using ADK. -This module provides the GoogleSearchTool configuration for use with -Google ADK agents, enabling explicit and traceable web search capabilities. +This module provides a search tool that returns actual URLs the agent can fetch, +enabling a proper research workflow: search → fetch → verify → answer. """ +import asyncio +import concurrent.futures import logging -from typing import TYPE_CHECKING +from typing import Any -from google.adk.tools.google_search_tool import GoogleSearchTool +from google.adk.tools.function_tool import FunctionTool +from google.genai import Client, types from pydantic import BaseModel, Field - -if TYPE_CHECKING: - pass +from .web import resolve_redirect_urls_async logger = logging.getLogger(__name__) @@ -56,28 +57,6 @@ def format_with_citations(self) -> str: return "\n".join(output_parts) -def create_google_search_tool() -> GoogleSearchTool: - """Create a GoogleSearchTool configured for use with other tools. - - This creates a GoogleSearchTool with bypass_multi_tools_limit=True, - which allows it to be used alongside other custom tools in an ADK agent. - The tool calls are explicit and visible in the agent's reasoning trace. - - Returns - ------- - GoogleSearchTool - A configured GoogleSearchTool instance. - - Examples - -------- - >>> from aieng.agent_evals.tools import create_google_search_tool - >>> search_tool = create_google_search_tool() - >>> # Use with an ADK agent - >>> agent = Agent(tools=[search_tool]) - """ - return GoogleSearchTool(bypass_multi_tools_limit=True) - - def format_response_with_citations(response: GroundedResponse) -> str: """Format a grounded response with inline citations. @@ -96,3 +75,141 @@ def format_response_with_citations(response: GroundedResponse) -> str: This is a convenience wrapper around ``response.format_with_citations()``. """ return response.format_with_citations() + + +async def _google_search_async(query: str) -> dict[str, Any]: + """Execute a Google search and return results with actual URLs. + + This function calls Gemini with Google Search grounding enabled, + extracts the grounding URLs, resolves any redirects, and returns + a structured response the agent can use for further fetching. + + Parameters + ---------- + query : str + The search query. + + Returns + ------- + dict + Search results with 'summary' text and 'sources' list containing + actual URLs the agent can fetch. 
+ """ + client = Client() + + try: + response = client.models.generate_content( + model="gemini-2.5-flash", + contents=query, + config=types.GenerateContentConfig( + tools=[types.Tool(google_search=types.GoogleSearch())], + temperature=0.0, + ), + ) + + # Extract text summary + summary = "" + if response.candidates and response.candidates[0].content and response.candidates[0].content.parts: + for part in response.candidates[0].content.parts: + if hasattr(part, "text") and part.text: + summary += part.text + + # Extract grounding URLs + sources = [] + gm = getattr(response.candidates[0], "grounding_metadata", None) if response.candidates else None + + if gm and hasattr(gm, "grounding_chunks") and gm.grounding_chunks: + redirect_urls = [] + titles = [] + for chunk in gm.grounding_chunks: + if hasattr(chunk, "web") and chunk.web: + redirect_urls.append(getattr(chunk.web, "uri", "") or "") + titles.append(getattr(chunk.web, "title", "") or "") + + # Resolve redirect URLs to actual URLs + if redirect_urls: + resolved_urls = await resolve_redirect_urls_async(redirect_urls) + for title, url in zip(titles, resolved_urls): + if url and not url.startswith("https://vertexaisearch"): + sources.append({"title": title, "url": url}) + + return { + "status": "success", + "summary": summary, + "sources": sources, + "source_count": len(sources), + } + + except Exception as e: + logger.error(f"Search failed: {e}") + return { + "status": "error", + "error": str(e), + "summary": "", + "sources": [], + } + + +def google_search(query: str) -> dict[str, Any]: + """Search Google and return results with actual URLs for fetching. + + Use this tool to find information on the web. The results include: + - A summary of what was found + - A list of source URLs that you can fetch with web_fetch to verify information + + IMPORTANT: The summary is from search snippets which may be incomplete or outdated. + Always use web_fetch on the source URLs to verify information before answering. + + Parameters + ---------- + query : str + The search query. Be specific and include key terms. + + Returns + ------- + dict + Contains 'summary' (brief overview), 'sources' (list of URLs to fetch), + and 'source_count'. On error, contains 'status': 'error' and 'error' message. + + Examples + -------- + >>> result = google_search("highest single day snowfall Toronto") + >>> # Check the sources + >>> for source in result["sources"]: + ... print(f"{source['title']}: {source['url']}") + >>> # Then fetch to verify + >>> page = web_fetch(result["sources"][0]["url"]) + """ + logger.info(f"GoogleSearch: {query}") + + # Handle being called from async context + try: + asyncio.get_running_loop() + # We're in an async context - need to run in a new thread + with concurrent.futures.ThreadPoolExecutor() as pool: + future = pool.submit(asyncio.run, _google_search_async(query)) + return future.result() + except RuntimeError: + # No running loop - we can use asyncio.run directly + return asyncio.run(_google_search_async(query)) + + +def create_google_search_tool() -> FunctionTool: + """Create a search tool that returns actual URLs for fetching. + + This tool calls Google Search, extracts grounding URLs, resolves redirects, + and returns actual URLs the agent can use with web_fetch for verification. + + Returns + ------- + FunctionTool + A search tool that returns fetchable URLs. 
+ + Examples + -------- + >>> from aieng.agent_evals.tools import create_google_search_tool + >>> search_tool = create_google_search_tool() + >>> # Use with an ADK agent + >>> agent = Agent(tools=[search_tool]) + """ + return FunctionTool(func=google_search) diff --git a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py index 72f031a..223eea2 100644 --- a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py +++ b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py @@ -1,14 +1,14 @@ """Tests for Google Search tool.""" -from unittest.mock import MagicMock, patch - import pytest from aieng.agent_evals.tools import ( GroundedResponse, GroundingChunk, create_google_search_tool, format_response_with_citations, + google_search, ) +from google.adk.tools.function_tool import FunctionTool class TestGroundingChunk: @@ -81,16 +81,13 @@ def test_format_with_citations_no_sources(self): class TestCreateGoogleSearchTool: """Tests for the create_google_search_tool function.""" - @patch("aieng.agent_evals.tools.search.GoogleSearchTool") - def test_creates_tool_with_bypass_flag(self, mock_tool_class): - """Test that the tool is created with bypass_multi_tools_limit=True.""" - mock_tool = MagicMock() - mock_tool_class.return_value = mock_tool - + def test_creates_function_tool(self): + """Test that the tool is created as a FunctionTool wrapping google_search.""" result = create_google_search_tool() - mock_tool_class.assert_called_once_with(bypass_multi_tools_limit=True) - assert result is mock_tool + assert isinstance(result, FunctionTool) + # The function tool should wrap the google_search function + assert result.func.__name__ == "google_search" class TestFormatResponseWithCitations: @@ -160,7 +157,49 @@ class TestGoogleSearchToolIntegration: """ def test_create_google_search_tool_real(self): - """Test creating a real GoogleSearchTool instance.""" + """Test creating a real FunctionTool instance wrapping google_search.""" tool = create_google_search_tool() - # The tool should be a GoogleSearchTool instance with bypass flag - assert tool is not None + # The tool should be a FunctionTool wrapping google_search + assert isinstance(tool, FunctionTool) + + def test_google_search_returns_urls(self): + """Test that google_search returns actual URLs, not redirect URLs.""" + result = google_search("capital of France") + + # Should have success status + assert result["status"] == "success" + + # Should have a summary + assert result["summary"], "Expected non-empty summary" + + # Should have sources with URLs + assert result["source_count"] > 0, "Expected at least one source" + assert len(result["sources"]) == result["source_count"] + + # Each source should have title and url + for source in result["sources"]: + assert "title" in source + assert "url" in source + # URL should be a real URL, not a redirect URL + assert source["url"].startswith("http"), f"Expected URL, got: {source['url']}" + assert "vertexaisearch" not in source["url"], "URL should not be a redirect URL" + + def test_google_search_response_structure(self): + """Test the complete response structure from google_search.""" + result = google_search("Python programming language") + + # Check all expected keys exist + assert "status" in result + assert "summary" in result + assert "sources" in result + assert "source_count" in result + + # Sources should be a list + assert isinstance(result["sources"], list) + + # If we have sources, verify their structure + if 
result["sources"]: + source = result["sources"][0] + assert isinstance(source, dict) + assert "title" in source + assert "url" in source From c2b09d25be0574b858531671f1af03716c7d665e Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Mon, 2 Feb 2026 13:29:30 -0500 Subject: [PATCH 02/15] Add web fetch tool --- .../aieng/agent_evals/tools/web.py | 488 ++++++++++++++++++ 1 file changed, 488 insertions(+) create mode 100644 aieng-eval-agents/aieng/agent_evals/tools/web.py diff --git a/aieng-eval-agents/aieng/agent_evals/tools/web.py b/aieng-eval-agents/aieng/agent_evals/tools/web.py new file mode 100644 index 0000000..feed1c8 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/tools/web.py @@ -0,0 +1,488 @@ +"""Web fetch tool for retrieving content from URLs. + +Provides the web_fetch tool which fetches content from any URL (HTML pages or PDFs) +and returns the content for the agent to analyze. Similar to Anthropic's web_fetch tool. +""" + +import asyncio +import logging +from functools import lru_cache +from io import BytesIO +from typing import Any +from urllib.parse import urljoin + +import httpx +from google.adk.tools.function_tool import FunctionTool +from html_to_markdown import convert as html_to_markdown +from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential + + +logger = logging.getLogger(__name__) + + +# Maximum content size to return (100KB for text, to avoid context overflow) +MAX_CONTENT_CHARS = 100_000 + +# Known redirect URL patterns (Vertex AI grounding redirects) +REDIRECT_URL_PATTERNS = ( + "vertexaisearch.cloud.google.com/grounding-api-redirect", + "vertexaisearch.cloud.google.com/redirect", +) + +# Cache for resolved URLs (in-memory, cleared on restart) +_redirect_cache: dict[str, str] = {} + +# Default timeouts for redirect resolution +_REDIRECT_CONNECT_TIMEOUT = 10.0 # Time to establish connection +_REDIRECT_READ_TIMEOUT = 15.0 # Time to receive response + +# User agent for redirect resolution requests +_USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +) + + +def _is_redirect_url(url: str) -> bool: + """Check if URL is a known redirect pattern.""" + return any(pattern in url for pattern in REDIRECT_URL_PATTERNS) + + +def _get_redirect_timeout() -> httpx.Timeout: + """Get timeout configuration for redirect resolution.""" + return httpx.Timeout( + connect=_REDIRECT_CONNECT_TIMEOUT, + read=_REDIRECT_READ_TIMEOUT, + write=10.0, + pool=10.0, + ) + + +def _resolve_with_head(client: httpx.Client, url: str) -> str | None: + """Try to resolve redirect using HEAD request.""" + try: + response = client.head(url, headers={"User-Agent": _USER_AGENT}) + return str(response.url) + except httpx.HTTPStatusError as e: + # Some servers return 405 Method Not Allowed for HEAD + if e.response.status_code in (405, 501): + return None # Signal to try GET + raise + except Exception: + return None + + +def _resolve_with_get(client: httpx.Client, url: str) -> str: + """Resolve redirect using GET request (fallback when HEAD fails).""" + # Use stream=True to avoid downloading the body + with client.stream("GET", url, headers={"User-Agent": _USER_AGENT}) as response: + return str(response.url) + + +@lru_cache(maxsize=256) +def resolve_redirect_url(url: str) -> str: + """Resolve a redirect URL to its final destination without downloading content. 
+ + This is useful for resolving Vertex AI grounding redirect URLs to actual URLs + before displaying them in traces, CLI output, or citations. + + Results are cached to avoid repeated HTTP calls for the same URL. + + Uses robust resolution with: + - Configurable timeouts (connect, read, total) + - HEAD request first, falls back to GET if server doesn't support HEAD + - Retries with exponential backoff for transient failures + - Realistic User-Agent to avoid blocks + + Parameters + ---------- + url : str + The URL to resolve (may be a redirect URL). + + Returns + ------- + str + The final destination URL after following redirects. + Returns the original URL if resolution fails. + """ + # Skip resolution for non-redirect URLs + if not _is_redirect_url(url): + return url + + try: + with httpx.Client(timeout=_get_redirect_timeout(), follow_redirects=True) as client: + # Try HEAD first (faster, no body download) + final_url = _resolve_with_head(client, url) + + # Fall back to GET if HEAD failed + if final_url is None: + logger.debug(f"HEAD failed for {url[:60]}..., trying GET") + final_url = _resolve_with_get(client, url) + + if final_url != url: + logger.debug(f"Resolved redirect: {url[:60]}... -> {final_url[:60]}...") + return final_url + except Exception as e: + logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(e).__name__}: {e}") + return url + + +async def _resolve_with_head_async(client: httpx.AsyncClient, url: str) -> str | None: + """Try to resolve redirect using async HEAD request.""" + try: + response = await client.head(url, headers={"User-Agent": _USER_AGENT}) + return str(response.url) + except httpx.HTTPStatusError as e: + # Some servers return 405 Method Not Allowed for HEAD + if e.response.status_code in (405, 501): + return None # Signal to try GET + raise + except Exception: + return None + + +async def _resolve_with_get_async(client: httpx.AsyncClient, url: str) -> str: + """Resolve redirect using async GET request (fallback when HEAD fails).""" + # Use stream to avoid downloading the body + async with client.stream("GET", url, headers={"User-Agent": _USER_AGENT}) as response: + return str(response.url) + + +async def _resolve_single_url_async( + client: httpx.AsyncClient, + url: str, + max_retries: int = 3, + base_delay: float = 1.0, +) -> str: + """Resolve a single URL with retries and exponential backoff. + + Parameters + ---------- + client : httpx.AsyncClient + The HTTP client to use. + url : str + The URL to resolve. + max_retries : int + Maximum number of retry attempts. + base_delay : float + Base delay between retries (doubles each retry). + + Returns + ------- + str + The resolved URL, or original URL on failure. + """ + # Skip resolution for non-redirect URLs + if not _is_redirect_url(url): + return url + + # Check cache first + if url in _redirect_cache: + return _redirect_cache[url] + + last_error: Exception | None = None + + for attempt in range(max_retries): + try: + # Try HEAD first (faster, no body download) + final_url = await _resolve_with_head_async(client, url) + + # Fall back to GET if HEAD failed + if final_url is None: + logger.debug(f"HEAD failed for {url[:60]}..., trying GET (attempt {attempt + 1})") + final_url = await _resolve_with_get_async(client, url) + + if final_url != url: + logger.debug(f"Resolved redirect: {url[:60]}... 
-> {final_url[:60]}...") + + _redirect_cache[url] = final_url + return final_url + + except (httpx.TimeoutException, httpx.ConnectError, httpx.ReadError) as e: + last_error = e + if attempt < max_retries - 1: + delay = base_delay * (2**attempt) # Exponential backoff + logger.debug(f"Retry {attempt + 1}/{max_retries} for {url[:60]}... after {delay}s: {e}") + await asyncio.sleep(delay) + continue + except Exception as e: + # Non-retryable error + last_error = e + break + + # All retries exhausted or non-retryable error + logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(last_error).__name__}: {last_error}") + _redirect_cache[url] = url # Cache failures to avoid repeated attempts + return url + + +async def resolve_redirect_url_async(url: str) -> str: + """Async version of resolve_redirect_url with caching and retries. + + Parameters + ---------- + url : str + The URL to resolve (may be a redirect URL). + + Returns + ------- + str + The final destination URL after following redirects. + """ + # Skip resolution for non-redirect URLs (fast path) + if not _is_redirect_url(url): + return url + + # Check cache first (fast path) + if url in _redirect_cache: + return _redirect_cache[url] + + async with httpx.AsyncClient( + timeout=_get_redirect_timeout(), + follow_redirects=True, + ) as client: + return await _resolve_single_url_async(client, url) + + +async def resolve_redirect_urls_async(urls: list[str]) -> list[str]: + """Resolve multiple redirect URLs in parallel. + + Resolves URLs concurrently with proper error handling per URL. + + Parameters + ---------- + urls : list[str] + List of URLs to resolve. + + Returns + ------- + list[str] + List of resolved URLs in the same order. + """ + if not urls: + return [] + + async with httpx.AsyncClient( + timeout=_get_redirect_timeout(), + follow_redirects=True, + limits=httpx.Limits(max_connections=20, max_keepalive_connections=10), + ) as client: + # Resolve all URLs in parallel + tasks = [_resolve_single_url_async(client, url) for url in urls] + return list(await asyncio.gather(*tasks)) + + +_http_retry = retry( + stop=stop_after_attempt(3), + wait=wait_exponential(multiplier=1, min=1, max=10), + retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)), + reraise=True, +) + + +@_http_retry +def _fetch_with_retry(client: httpx.Client, url: str) -> httpx.Response: + """Fetch URL with automatic retry on transient failures.""" + response = client.get(url, headers={"User-Agent": "Mozilla/5.0 (compatible; ResearchBot/1.0)"}) + response.raise_for_status() + return response + + +def _html_to_markdown(html: str, base_url: str | None = None) -> str: + """Convert HTML to Markdown, preserving links, tables, and structure. + + Parameters + ---------- + html : str + The HTML content to convert. + base_url : str, optional + Base URL for resolving relative links. + + Returns + ------- + str + Markdown-formatted text with preserved links and tables. 
+ """ + # Use html-to-markdown library for high-quality conversion + # It preserves links, tables, headings, lists, and other structure + markdown = html_to_markdown(html) + + # If base_url provided, convert relative URLs to absolute + if base_url: + import re # noqa: PLC0415 + + def make_absolute(match: re.Match) -> str: + """Convert relative URL to absolute.""" + prefix = match.group(1) # [text]( or src=" + url = match.group(2) + suffix = match.group(3) # ) or " + + # Skip if already absolute or is a data URI + if url.startswith(("http://", "https://", "data:", "mailto:", "#")): + return match.group(0) + + absolute_url = urljoin(base_url, url) + return f"{prefix}{absolute_url}{suffix}" + + # Fix markdown links: [text](url) + markdown = re.sub(r"(\[[^\]]*\]\()([^)]+)(\))", make_absolute, markdown) + + # Fix markdown images: ![alt](url) + markdown = re.sub(r"(!\[[^\]]*\]\()([^)]+)(\))", make_absolute, markdown) + + return markdown.strip() + + +def _extract_pdf_text(content: bytes, max_pages: int = 10) -> tuple[str, int]: + """Extract text from PDF bytes. + + Parameters + ---------- + content : bytes + The PDF file content. + max_pages : int + Maximum number of pages to extract. + + Returns + ------- + tuple[str, int] + The extracted text and total number of pages. + """ + from pypdf import PdfReader # noqa: PLC0415 + + pdf_file = BytesIO(content) + reader = PdfReader(pdf_file) + num_pages = len(reader.pages) + + pages_to_read = min(num_pages, max_pages) + text_parts = [] + + for i in range(pages_to_read): + page_text = reader.pages[i].extract_text() + if page_text: + text_parts.append(f"--- Page {i + 1} ---\n{page_text}") + + if pages_to_read < num_pages: + text_parts.append(f"\n[Document has {num_pages} pages. Showing first {pages_to_read}.]") + + return "\n\n".join(text_parts), num_pages + + +def _truncate_content(text: str) -> tuple[str, bool]: + """Truncate content if it exceeds the maximum length.""" + truncated = len(text) > MAX_CONTENT_CHARS + if truncated: + text = text[:MAX_CONTENT_CHARS] + "\n\n[Content truncated due to length]" + return text, truncated + + +def _make_error_response(error: str, url: str) -> dict[str, Any]: + """Create an error response dict.""" + return {"status": "error", "error": error, "url": url} + + +def _make_success_response(url: str, content: str, content_type: str, truncated: bool, **extra: Any) -> dict[str, Any]: + """Create a success response dict.""" + result = { + "status": "success", + "url": url, + "content": content, + "content_type": content_type, + "content_length": len(content), + "truncated": truncated, + } + result.update(extra) + return result + + +def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: + """Fetch content from a URL (HTML page or PDF document). + + This tool retrieves the full content from a URL for analysis. It handles + both HTML pages (converted to readable text) and PDF documents (text extracted). + + For large data files (CSV, XLSX) that need searching, use fetch_file instead. + + Parameters + ---------- + url : str + The URL to fetch. Must be a valid HTTP or HTTPS URL. + max_pages : int, optional + For PDFs, maximum number of pages to extract (default 10). + + Returns + ------- + dict + On success: 'status', 'url', 'content', 'content_type', + 'content_length', 'truncated'. For PDFs also includes: + 'num_pages', 'pages_extracted'. On error: 'status', 'error', 'url'. 
+ + Examples + -------- + >>> # Fetch an HTML page + >>> result = web_fetch("https://example.com/about") + >>> print(result["content"]) + + >>> # Fetch a PDF + >>> result = web_fetch("https://arxiv.org/pdf/2301.00234.pdf") + >>> print(f"Pages: {result['num_pages']}") + >>> print(result["content"]) + """ + logger.info(f"WebFetch: {url}") + + # Validate URL + if not url.startswith(("http://", "https://")): + return _make_error_response("Invalid URL. Must start with http:// or https://", url) + + try: + with httpx.Client(timeout=60.0, follow_redirects=True) as client: + response = _fetch_with_retry(client, url) + content_type = response.headers.get("content-type", "") + final_url = str(response.url) + + # Handle PDF documents + if "application/pdf" in content_type or url.lower().endswith(".pdf"): + return _handle_pdf_response(response.content, max_pages, final_url, url) + + # Handle HTML and text content + if "text/html" in content_type or not content_type: + text = _html_to_markdown(response.text, base_url=final_url) + else: + text = response.text + text, truncated = _truncate_content(text) + + return _make_success_response(final_url, text, content_type or "text/html", truncated) + + except httpx.HTTPStatusError as e: + logger.warning(f"HTTP error fetching {url}: {e}") + return _make_error_response(f"HTTP {e.response.status_code}: {e.response.reason_phrase}", url) + except httpx.RequestError as e: + logger.warning(f"Request error fetching {url}: {e}") + return _make_error_response(f"Request failed: {e!s}", url) + except Exception as e: + logger.exception(f"Unexpected error in web_fetch for {url}") + return _make_error_response(f"Unexpected error: {e!s}", url) + + +def _handle_pdf_response(content: bytes, max_pages: int, final_url: str, url: str) -> dict[str, Any]: + """Handle PDF content extraction and response creation.""" + try: + text, num_pages = _extract_pdf_text(content, max_pages) + text, truncated = _truncate_content(text) + + return _make_success_response( + final_url, + text, + "application/pdf", + truncated, + num_pages=num_pages, + pages_extracted=min(num_pages, max_pages), + ) + except ImportError: + return _make_error_response("PDF support requires pypdf. 
Install with: pip install pypdf", url) + except Exception as e: + return _make_error_response(f"Failed to extract PDF text: {e!s}", url) + + +def create_web_fetch_tool() -> FunctionTool: + """Create an ADK FunctionTool for fetching web content.""" + return FunctionTool(func=web_fetch) From 67bb38d5fa88a91e9f5e7f48f2ae16303193344c Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Mon, 2 Feb 2026 13:31:09 -0500 Subject: [PATCH 03/15] Add html_to_markdown dependency --- pyproject.toml | 8 +++++++- uv.lock | 26 ++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9618ee0..74c34d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,6 @@ dependencies = [ "pydantic-settings>=2.7.0", "pydantic-ai-slim[logfire]>=1.26.0", "scikit-learn>=1.7.0", - "weaviate-client>=4.18.3", "urllib3>=2.6.3", "openpyxl>=3.1.5", "authlib>=1.6.6", @@ -53,6 +52,7 @@ dev = [ "pytest-mock>=3.14.0", "ruff>=0.14.7", "transformers>=4.57.3", + "pandas-stubs>=2.3.3.260113", ] docs = [ "jinja2>=3.1.6", # Pinning version to address vulnerability GHSA-cpwx-vrp4-4pq7 @@ -160,3 +160,9 @@ filterwarnings = [ [tool.coverage.run] source=["aieng-eval-agents/aieng"] omit=["aieng-eval-agents/tests/*", "*__init__.py", "scripts/*"] + +[tool.mypy] +mypy_path = "aieng-eval-agents" +namespace_packages = true +explicit_package_bases = true +exclude = ["tests/"] diff --git a/uv.lock b/uv.lock index 504473b..5f2cbed 100644 --- a/uv.lock +++ b/uv.lock @@ -42,7 +42,6 @@ dependencies = [ { name = "tenacity" }, { name = "urllib3" }, { name = "virtualenv" }, - { name = "weaviate-client" }, ] [package.dev-dependencies] @@ -56,6 +55,7 @@ dev = [ { name = "jupyterlab" }, { name = "mypy" }, { name = "nbqa" }, + { name = "pandas-stubs" }, { name = "pip-audit" }, { name = "pre-commit" }, { name = "pytest" }, @@ -107,7 +107,6 @@ requires-dist = [ { name = "tenacity", specifier = ">=9.1.2" }, { name = "urllib3", specifier = ">=2.6.3" }, { name = "virtualenv", specifier = ">=20.36.1" }, - { name = "weaviate-client", specifier = ">=4.18.3" }, ] [package.metadata.requires-dev] @@ -121,6 +120,7 @@ dev = [ { name = "jupyterlab", specifier = ">=4.4.8" }, { name = "mypy", specifier = ">=1.19.0" }, { name = "nbqa", specifier = ">=1.9.1" }, + { name = "pandas-stubs", specifier = ">=2.3.3.260113" }, { name = "pip-audit", specifier = ">=2.9.0" }, { name = "pre-commit", specifier = ">=4.2.0" }, { name = "pytest", specifier = ">=8.3.4" }, @@ -4177,6 +4177,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/70/44/5191d2e4026f86a2a109053e194d3ba7a31a2d10a9c2348368c63ed4e85a/pandas-2.3.3-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:3869faf4bd07b3b66a9f462417d0ca3a9df29a9f6abd5d0d0dbab15dac7abe87", size = 13202175 }, ] +[[package]] +name = "pandas-stubs" +version = "2.3.3.260113" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "numpy" }, + { name = "types-pytz" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/92/5d/be23854a73fda69f1dbdda7bc10fbd6f930bd1fa87aaec389f00c901c1e8/pandas_stubs-2.3.3.260113.tar.gz", hash = "sha256:076e3724bcaa73de78932b012ec64b3010463d377fa63116f4e6850643d93800", size = 116131 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d1/c6/df1fe324248424f77b89371116dab5243db7f052c32cc9fe7442ad9c5f75/pandas_stubs-2.3.3.260113-py3-none-any.whl", hash = "sha256:ec070b5c576e1badf12544ae50385872f0631fc35d99d00dc598c2954ec564d3", size = 168246 }, +] + [[package]] name = "pandocfilters" version 
= "1.5.1" @@ -5916,6 +5929,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c5/3f/b0e8db149896005adc938a1e7f371d6d7e9eca4053a29b108978ed15e0c2/types_python_dateutil-2.9.0.20250516-py3-none-any.whl", hash = "sha256:2b2b3f57f9c6a61fba26a9c0ffb9ea5681c9b83e69cd897c6b5f668d9c0cab93", size = 14356 }, ] +[[package]] +name = "types-pytz" +version = "2025.2.0.20251108" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/40/ff/c047ddc68c803b46470a357454ef76f4acd8c1088f5cc4891cdd909bfcf6/types_pytz-2025.2.0.20251108.tar.gz", hash = "sha256:fca87917836ae843f07129567b74c1929f1870610681b4c92cb86a3df5817bdb", size = 10961 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e7/c1/56ef16bf5dcd255155cc736d276efa6ae0a5c26fd685e28f0412a4013c01/types_pytz-2025.2.0.20251108-py3-none-any.whl", hash = "sha256:0f1c9792cab4eb0e46c52f8845c8f77cf1e313cb3d68bf826aa867fe4717d91c", size = 10116 }, +] + [[package]] name = "types-requests" version = "2.32.4.20250913" From da819954e68d8860b0cf6026b0def5b410d423dd Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Mon, 2 Feb 2026 13:51:01 -0500 Subject: [PATCH 04/15] Add html_to_markdown dependency --- .../aieng/agent_evals/tools/__init__.py | 2 ++ aieng-eval-agents/pyproject.toml | 10 +++++- implementations/report_generation/evaluate.py | 13 +++++--- uv.lock | 32 ++++++++++++++++++- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/__init__.py b/aieng-eval-agents/aieng/agent_evals/tools/__init__.py index 8968cef..c32af6e 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/__init__.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/__init__.py @@ -9,6 +9,7 @@ GroundingChunk, create_google_search_tool, format_response_with_citations, + google_search, ) @@ -16,6 +17,7 @@ # Search tools "create_google_search_tool", "format_response_with_citations", + "google_search", "GroundedResponse", "GroundingChunk", ] diff --git a/aieng-eval-agents/pyproject.toml b/aieng-eval-agents/pyproject.toml index 39af54b..151dcc7 100644 --- a/aieng-eval-agents/pyproject.toml +++ b/aieng-eval-agents/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "aieng-eval-agents" -version = "0.1.0" +version = "0.2.0" description = "Agentic AI Evaluation reference implementation from Vector Institute AI Engineering" authors = [{name = "Vector AI Engineering", email = "ai_engineering@vectorinstitute.ai"}] requires-python = ">=3.12,<4.0" @@ -18,8 +18,16 @@ dependencies = [ "openinference-instrumentation-google-adk>=0.1.0", "opentelemetry-sdk>=1.20.0", "opentelemetry-exporter-otlp-proto-http>=1.20.0", + "pypdf", + "httpx>=0.27.0", + "tenacity>=8.2.0", + "python-dotenv>=1.0.0", + "html-to-markdown>=2.24.0", ] +[project.scripts] +knowledge-agent = "aieng.agent_evals.knowledge_agent.cli:main" + [dependency-groups] dev = [ "pytest>=8.3.5", diff --git a/implementations/report_generation/evaluate.py b/implementations/report_generation/evaluate.py index d5b4447..a0003f5 100644 --- a/implementations/report_generation/evaluate.py +++ b/implementations/report_generation/evaluate.py @@ -2,13 +2,13 @@ import asyncio import logging +from typing import Any import agents import click from aieng.agent_evals.async_client_manager import AsyncClientManager from dotenv import load_dotenv -from langfuse._client.datasets import DatasetItemClient -from langfuse.experiment import Evaluation +from langfuse.experiment import Evaluation, ExperimentItem from 
openai.types.responses.response_function_tool_call import ResponseFunctionToolCall from pydantic import BaseModel from tenacity import retry, stop_after_attempt, wait_exponential @@ -92,12 +92,12 @@ async def evaluate(dataset_name: str): logger.warning(f"Client manager services not closed successfully: {e}") -async def agent_task(*, item: DatasetItemClient, **kwargs) -> str | None: +async def agent_task(*, item: ExperimentItem, **kwargs: Any) -> str | None: """Run the report generation agent against an item from a Langfuse dataset. Parameters ---------- - item : DatasetItemClient + item : ExperimentItem The item from the Langfuse dataset to evaluate against. Returns @@ -108,7 +108,10 @@ async def agent_task(*, item: DatasetItemClient, **kwargs) -> str | None: """ # Define and run the report generation agent report_generation_agent = get_report_generation_agent(enable_trace=True) - result = await run_agent_with_retry(report_generation_agent, item.input) + + # Handle both LocalExperimentItem (TypedDict) and DatasetItemClient + item_input = item["input"] if isinstance(item, dict) else item.input + result = await run_agent_with_retry(report_generation_agent, item_input) # Extract the report data from the result by returning the # arguments to the write_report_to_file function call diff --git a/uv.lock b/uv.lock index 5f2cbed..c561dba 100644 --- a/uv.lock +++ b/uv.lock @@ -148,11 +148,13 @@ web-search = [ [[package]] name = "aieng-eval-agents" -version = "0.1.0" +version = "0.2.0" source = { editable = "aieng-eval-agents" } dependencies = [ { name = "google-adk" }, { name = "google-genai" }, + { name = "html-to-markdown" }, + { name = "httpx" }, { name = "kagglehub" }, { name = "langfuse" }, { name = "openinference-instrumentation-google-adk" }, @@ -161,7 +163,10 @@ dependencies = [ { name = "pandas" }, { name = "pydantic" }, { name = "pydantic-settings" }, + { name = "pypdf" }, + { name = "python-dotenv" }, { name = "rich" }, + { name = "tenacity" }, ] [package.dev-dependencies] @@ -174,6 +179,8 @@ dev = [ requires-dist = [ { name = "google-adk", specifier = ">=1.23.0" }, { name = "google-genai", specifier = ">=1.52.0" }, + { name = "html-to-markdown", specifier = ">=2.24.0" }, + { name = "httpx", specifier = ">=0.27.0" }, { name = "kagglehub", specifier = ">=0.4.1" }, { name = "langfuse", specifier = ">=3.10.3" }, { name = "openinference-instrumentation-google-adk", specifier = ">=0.1.0" }, @@ -182,7 +189,10 @@ requires-dist = [ { name = "pandas", specifier = ">=2.3.0" }, { name = "pydantic", specifier = ">=2.12.0" }, { name = "pydantic-settings", specifier = ">=2.12.0" }, + { name = "pypdf" }, + { name = "python-dotenv", specifier = ">=1.0.0" }, { name = "rich", specifier = ">=13.9.0" }, + { name = "tenacity", specifier = ">=8.2.0" }, ] [package.metadata.requires-dev] @@ -2269,6 +2279,17 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/07/c6/80c95b1b2b94682a72cbdbfb85b81ae2daffa4291fbfa1b1464502ede10d/hpack-4.1.0-py3-none-any.whl", hash = "sha256:157ac792668d995c657d93111f46b4535ed114f0c9c8d672271bbec7eae1b496", size = 34357 }, ] +[[package]] +name = "html-to-markdown" +version = "2.24.5" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/90/f77a624499fd7f95dcd10e3d56fd20f7312f6c9987064a75e1da4fb7e2b4/html_to_markdown-2.24.5.tar.gz", hash = "sha256:05459e635bca56c31bdb5bbc684a54947e83d7ed4c30da3a98485ce2b60153b6", size = 293423 } +wheels = [ + { url = 
"https://files.pythonhosted.org/packages/a6/03/c4562d06e9b3dda55d7d5cfa5ea8d38efad500c5aa51d0028b684483a388/html_to_markdown-2.24.5-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:cb7e74839d01ee397dc5662e3474004a4baddbc5d3f94aa3c517f1cba735708a", size = 8850237 }, + { url = "https://files.pythonhosted.org/packages/93/20/b5942e47a6de5f662f629db0bf8b7206e2c9352a7123914225a4d26e0a4b/html_to_markdown-2.24.5-cp310-abi3-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c7b4903aeb38795cec2e7a6f9b8e5b5d2d874e367935e9ef7ba9f60d76505066", size = 9673130 }, + { url = "https://files.pythonhosted.org/packages/e9/26/03ef12d72f0a5fbc7838f02988865e9f62ab6084bebb8a4de0ba55ae5986/html_to_markdown-2.24.5-cp310-abi3-win_amd64.whl", hash = "sha256:66d7369ee721e5e5eb043e71e104eda829da6c9f72deaa86241653c5cd00f7a2", size = 9266646 }, +] + [[package]] name = "httpcore" version = "1.0.9" @@ -4853,6 +4874,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/05/e7/df2285f3d08fee213f2d041540fa4fc9ca6c2d44cf36d3a035bf2a8d2bcc/pyparsing-3.2.3-py3-none-any.whl", hash = "sha256:a749938e02d6fd0b59b356ca504a24982314bb090c383e3cf201c95ef7e2bfcf", size = 111120 }, ] +[[package]] +name = "pypdf" +version = "6.6.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b8/bb/a44bab1ac3c54dbcf653d7b8bcdee93dddb2d3bf025a3912cacb8149a2f2/pypdf-6.6.2.tar.gz", hash = "sha256:0a3ea3b3303982333404e22d8f75d7b3144f9cf4b2970b96856391a516f9f016", size = 5281850 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7d/be/549aaf1dfa4ab4aed29b09703d2fb02c4366fc1f05e880948c296c5764b9/pypdf-6.6.2-py3-none-any.whl", hash = "sha256:44c0c9811cfb3b83b28f1c3d054531d5b8b81abaedee0d8cb403650d023832ba", size = 329132 }, +] + [[package]] name = "pytest" version = "8.3.5" From 05471dcb3fa40e501389e773a85bb8aebe0e50d0 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Mon, 2 Feb 2026 14:06:36 -0500 Subject: [PATCH 05/15] Refactor out the redirect url code --- .../aieng/agent_evals/tools/_redirect.py | 261 ++++++++++ .../aieng/agent_evals/tools/search.py | 2 +- .../aieng/agent_evals/tools/web.py | 488 ------------------ 3 files changed, 262 insertions(+), 489 deletions(-) create mode 100644 aieng-eval-agents/aieng/agent_evals/tools/_redirect.py delete mode 100644 aieng-eval-agents/aieng/agent_evals/tools/web.py diff --git a/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py b/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py new file mode 100644 index 0000000..fd8f499 --- /dev/null +++ b/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py @@ -0,0 +1,261 @@ +"""URL redirect resolution utilities. + +Provides utilities for resolving redirect URLs (especially Vertex AI grounding +redirects) to their final destinations. Used by search and web fetch tools to +display actual URLs. 
+""" + +import asyncio +import logging +from functools import lru_cache + +import httpx + + +logger = logging.getLogger(__name__) + +REDIRECT_URL_PATTERNS = ( + "vertexaisearch.cloud.google.com/grounding-api-redirect", + "vertexaisearch.cloud.google.com/redirect", +) + +_REDIRECT_CONNECT_TIMEOUT = 10.0 +_REDIRECT_READ_TIMEOUT = 15.0 +_USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +) +_redirect_cache: dict[str, str] = {} + + +def _is_redirect_url(url: str) -> bool: + """Check if URL is a known redirect pattern.""" + return any(pattern in url for pattern in REDIRECT_URL_PATTERNS) + + +def _get_redirect_timeout() -> httpx.Timeout: + """Get timeout configuration for redirect resolution.""" + return httpx.Timeout( + connect=_REDIRECT_CONNECT_TIMEOUT, + read=_REDIRECT_READ_TIMEOUT, + write=10.0, + pool=10.0, + ) + + +# ============================================================================ +# Synchronous Redirect Resolution +# ============================================================================ + + +def _resolve_with_head(client: httpx.Client, url: str) -> str | None: + """Try to resolve redirect using HEAD request.""" + try: + response = client.head(url, headers={"User-Agent": _USER_AGENT}) + return str(response.url) + except httpx.HTTPStatusError as e: + # Some servers return 405 Method Not Allowed for HEAD + if e.response.status_code in (405, 501): + return None # Signal to try GET + raise + except Exception: + return None + + +def _resolve_with_get(client: httpx.Client, url: str) -> str: + """Resolve redirect using GET request (fallback when HEAD fails).""" + # Use stream=True to avoid downloading the body + with client.stream("GET", url, headers={"User-Agent": _USER_AGENT}) as response: + return str(response.url) + + +@lru_cache(maxsize=256) +def resolve_redirect_url(url: str) -> str: + """Resolve a redirect URL to its final destination without downloading content. + + This is useful for resolving Vertex AI grounding redirect URLs to actual URLs + before displaying them in traces, CLI output, or citations. + + Results are cached to avoid repeated HTTP calls for the same URL. + + Uses robust resolution with: + - Configurable timeouts (connect, read, total) + - HEAD request first, falls back to GET if server doesn't support HEAD + - Retries with exponential backoff for transient failures + - Realistic User-Agent to avoid blocks + + Parameters + ---------- + url : str + The URL to resolve (may be a redirect URL). + + Returns + ------- + str + The final destination URL after following redirects. + Returns the original URL if resolution fails. + """ + # Skip resolution for non-redirect URLs + if not _is_redirect_url(url): + return url + + try: + with httpx.Client(timeout=_get_redirect_timeout(), follow_redirects=True) as client: + # Try HEAD first (faster, no body download) + final_url = _resolve_with_head(client, url) + + # Fall back to GET if HEAD failed + if final_url is None: + logger.debug(f"HEAD failed for {url[:60]}..., trying GET") + final_url = _resolve_with_get(client, url) + + if final_url != url: + logger.debug(f"Resolved redirect: {url[:60]}... 
-> {final_url[:60]}...") + return final_url + except Exception as e: + logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(e).__name__}: {e}") + return url + + +async def _resolve_with_head_async(client: httpx.AsyncClient, url: str) -> str | None: + """Try to resolve redirect using async HEAD request.""" + try: + response = await client.head(url, headers={"User-Agent": _USER_AGENT}) + return str(response.url) + except httpx.HTTPStatusError as e: + # Some servers return 405 Method Not Allowed for HEAD + if e.response.status_code in (405, 501): + return None # Signal to try GET + raise + except Exception: + return None + + +async def _resolve_with_get_async(client: httpx.AsyncClient, url: str) -> str: + """Resolve redirect using async GET request (fallback when HEAD fails).""" + # Use stream to avoid downloading the body + async with client.stream("GET", url, headers={"User-Agent": _USER_AGENT}) as response: + return str(response.url) + + +async def _resolve_single_url_async( + client: httpx.AsyncClient, + url: str, + max_retries: int = 3, + base_delay: float = 1.0, +) -> str: + """Resolve a single URL with retries and exponential backoff. + + Parameters + ---------- + client : httpx.AsyncClient + The HTTP client to use. + url : str + The URL to resolve. + max_retries : int + Maximum number of retry attempts. + base_delay : float + Base delay between retries (doubles each retry). + + Returns + ------- + str + The resolved URL, or original URL on failure. + """ + # Skip resolution for non-redirect URLs + if not _is_redirect_url(url): + return url + + # Check cache first + if url in _redirect_cache: + return _redirect_cache[url] + + last_error: Exception | None = None + + for attempt in range(max_retries): + try: + # Try HEAD first (faster, no body download) + final_url = await _resolve_with_head_async(client, url) + + # Fall back to GET if HEAD failed + if final_url is None: + logger.debug(f"HEAD failed for {url[:60]}..., trying GET (attempt {attempt + 1})") + final_url = await _resolve_with_get_async(client, url) + + if final_url != url: + logger.debug(f"Resolved redirect: {url[:60]}... -> {final_url[:60]}...") + + _redirect_cache[url] = final_url + return final_url + + except (httpx.TimeoutException, httpx.ConnectError, httpx.ReadError) as e: + last_error = e + if attempt < max_retries - 1: + delay = base_delay * (2**attempt) # Exponential backoff + logger.debug(f"Retry {attempt + 1}/{max_retries} for {url[:60]}... after {delay}s: {e}") + await asyncio.sleep(delay) + continue + except Exception as e: + # Non-retryable error + last_error = e + break + + # All retries exhausted or non-retryable error + logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(last_error).__name__}: {last_error}") + _redirect_cache[url] = url # Cache failures to avoid repeated attempts + return url + + +async def resolve_redirect_url_async(url: str) -> str: + """Async version of resolve_redirect_url with caching and retries. + + Parameters + ---------- + url : str + The URL to resolve (may be a redirect URL). + + Returns + ------- + str + The final destination URL after following redirects. 
+ """ + # Skip resolution for non-redirect URLs (fast path) + if not _is_redirect_url(url): + return url + + # Check cache first (fast path) + if url in _redirect_cache: + return _redirect_cache[url] + + async with httpx.AsyncClient( + timeout=_get_redirect_timeout(), + follow_redirects=True, + ) as client: + return await _resolve_single_url_async(client, url) + + +async def resolve_redirect_urls_async(urls: list[str]) -> list[str]: + """Resolve multiple redirect URLs in parallel. + + Resolves URLs concurrently with proper error handling per URL. + + Parameters + ---------- + urls : list[str] + List of URLs to resolve. + + Returns + ------- + list[str] + List of resolved URLs in the same order. + """ + if not urls: + return [] + + async with httpx.AsyncClient( + timeout=_get_redirect_timeout(), + follow_redirects=True, + limits=httpx.Limits(max_connections=20, max_keepalive_connections=10), + ) as client: + # Resolve all URLs in parallel + tasks = [_resolve_single_url_async(client, url) for url in urls] + return list(await asyncio.gather(*tasks)) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/search.py b/aieng-eval-agents/aieng/agent_evals/tools/search.py index 26f7952..cb13d7b 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/search.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/search.py @@ -13,7 +13,7 @@ from google.genai import Client, types from pydantic import BaseModel, Field -from .web import resolve_redirect_urls_async +from ._redirect import resolve_redirect_urls_async logger = logging.getLogger(__name__) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/web.py b/aieng-eval-agents/aieng/agent_evals/tools/web.py deleted file mode 100644 index feed1c8..0000000 --- a/aieng-eval-agents/aieng/agent_evals/tools/web.py +++ /dev/null @@ -1,488 +0,0 @@ -"""Web fetch tool for retrieving content from URLs. - -Provides the web_fetch tool which fetches content from any URL (HTML pages or PDFs) -and returns the content for the agent to analyze. Similar to Anthropic's web_fetch tool. 
-""" - -import asyncio -import logging -from functools import lru_cache -from io import BytesIO -from typing import Any -from urllib.parse import urljoin - -import httpx -from google.adk.tools.function_tool import FunctionTool -from html_to_markdown import convert as html_to_markdown -from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential - - -logger = logging.getLogger(__name__) - - -# Maximum content size to return (100KB for text, to avoid context overflow) -MAX_CONTENT_CHARS = 100_000 - -# Known redirect URL patterns (Vertex AI grounding redirects) -REDIRECT_URL_PATTERNS = ( - "vertexaisearch.cloud.google.com/grounding-api-redirect", - "vertexaisearch.cloud.google.com/redirect", -) - -# Cache for resolved URLs (in-memory, cleared on restart) -_redirect_cache: dict[str, str] = {} - -# Default timeouts for redirect resolution -_REDIRECT_CONNECT_TIMEOUT = 10.0 # Time to establish connection -_REDIRECT_READ_TIMEOUT = 15.0 # Time to receive response - -# User agent for redirect resolution requests -_USER_AGENT = ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" -) - - -def _is_redirect_url(url: str) -> bool: - """Check if URL is a known redirect pattern.""" - return any(pattern in url for pattern in REDIRECT_URL_PATTERNS) - - -def _get_redirect_timeout() -> httpx.Timeout: - """Get timeout configuration for redirect resolution.""" - return httpx.Timeout( - connect=_REDIRECT_CONNECT_TIMEOUT, - read=_REDIRECT_READ_TIMEOUT, - write=10.0, - pool=10.0, - ) - - -def _resolve_with_head(client: httpx.Client, url: str) -> str | None: - """Try to resolve redirect using HEAD request.""" - try: - response = client.head(url, headers={"User-Agent": _USER_AGENT}) - return str(response.url) - except httpx.HTTPStatusError as e: - # Some servers return 405 Method Not Allowed for HEAD - if e.response.status_code in (405, 501): - return None # Signal to try GET - raise - except Exception: - return None - - -def _resolve_with_get(client: httpx.Client, url: str) -> str: - """Resolve redirect using GET request (fallback when HEAD fails).""" - # Use stream=True to avoid downloading the body - with client.stream("GET", url, headers={"User-Agent": _USER_AGENT}) as response: - return str(response.url) - - -@lru_cache(maxsize=256) -def resolve_redirect_url(url: str) -> str: - """Resolve a redirect URL to its final destination without downloading content. - - This is useful for resolving Vertex AI grounding redirect URLs to actual URLs - before displaying them in traces, CLI output, or citations. - - Results are cached to avoid repeated HTTP calls for the same URL. - - Uses robust resolution with: - - Configurable timeouts (connect, read, total) - - HEAD request first, falls back to GET if server doesn't support HEAD - - Retries with exponential backoff for transient failures - - Realistic User-Agent to avoid blocks - - Parameters - ---------- - url : str - The URL to resolve (may be a redirect URL). - - Returns - ------- - str - The final destination URL after following redirects. - Returns the original URL if resolution fails. 
- """ - # Skip resolution for non-redirect URLs - if not _is_redirect_url(url): - return url - - try: - with httpx.Client(timeout=_get_redirect_timeout(), follow_redirects=True) as client: - # Try HEAD first (faster, no body download) - final_url = _resolve_with_head(client, url) - - # Fall back to GET if HEAD failed - if final_url is None: - logger.debug(f"HEAD failed for {url[:60]}..., trying GET") - final_url = _resolve_with_get(client, url) - - if final_url != url: - logger.debug(f"Resolved redirect: {url[:60]}... -> {final_url[:60]}...") - return final_url - except Exception as e: - logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(e).__name__}: {e}") - return url - - -async def _resolve_with_head_async(client: httpx.AsyncClient, url: str) -> str | None: - """Try to resolve redirect using async HEAD request.""" - try: - response = await client.head(url, headers={"User-Agent": _USER_AGENT}) - return str(response.url) - except httpx.HTTPStatusError as e: - # Some servers return 405 Method Not Allowed for HEAD - if e.response.status_code in (405, 501): - return None # Signal to try GET - raise - except Exception: - return None - - -async def _resolve_with_get_async(client: httpx.AsyncClient, url: str) -> str: - """Resolve redirect using async GET request (fallback when HEAD fails).""" - # Use stream to avoid downloading the body - async with client.stream("GET", url, headers={"User-Agent": _USER_AGENT}) as response: - return str(response.url) - - -async def _resolve_single_url_async( - client: httpx.AsyncClient, - url: str, - max_retries: int = 3, - base_delay: float = 1.0, -) -> str: - """Resolve a single URL with retries and exponential backoff. - - Parameters - ---------- - client : httpx.AsyncClient - The HTTP client to use. - url : str - The URL to resolve. - max_retries : int - Maximum number of retry attempts. - base_delay : float - Base delay between retries (doubles each retry). - - Returns - ------- - str - The resolved URL, or original URL on failure. - """ - # Skip resolution for non-redirect URLs - if not _is_redirect_url(url): - return url - - # Check cache first - if url in _redirect_cache: - return _redirect_cache[url] - - last_error: Exception | None = None - - for attempt in range(max_retries): - try: - # Try HEAD first (faster, no body download) - final_url = await _resolve_with_head_async(client, url) - - # Fall back to GET if HEAD failed - if final_url is None: - logger.debug(f"HEAD failed for {url[:60]}..., trying GET (attempt {attempt + 1})") - final_url = await _resolve_with_get_async(client, url) - - if final_url != url: - logger.debug(f"Resolved redirect: {url[:60]}... -> {final_url[:60]}...") - - _redirect_cache[url] = final_url - return final_url - - except (httpx.TimeoutException, httpx.ConnectError, httpx.ReadError) as e: - last_error = e - if attempt < max_retries - 1: - delay = base_delay * (2**attempt) # Exponential backoff - logger.debug(f"Retry {attempt + 1}/{max_retries} for {url[:60]}... after {delay}s: {e}") - await asyncio.sleep(delay) - continue - except Exception as e: - # Non-retryable error - last_error = e - break - - # All retries exhausted or non-retryable error - logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(last_error).__name__}: {last_error}") - _redirect_cache[url] = url # Cache failures to avoid repeated attempts - return url - - -async def resolve_redirect_url_async(url: str) -> str: - """Async version of resolve_redirect_url with caching and retries. 
- - Parameters - ---------- - url : str - The URL to resolve (may be a redirect URL). - - Returns - ------- - str - The final destination URL after following redirects. - """ - # Skip resolution for non-redirect URLs (fast path) - if not _is_redirect_url(url): - return url - - # Check cache first (fast path) - if url in _redirect_cache: - return _redirect_cache[url] - - async with httpx.AsyncClient( - timeout=_get_redirect_timeout(), - follow_redirects=True, - ) as client: - return await _resolve_single_url_async(client, url) - - -async def resolve_redirect_urls_async(urls: list[str]) -> list[str]: - """Resolve multiple redirect URLs in parallel. - - Resolves URLs concurrently with proper error handling per URL. - - Parameters - ---------- - urls : list[str] - List of URLs to resolve. - - Returns - ------- - list[str] - List of resolved URLs in the same order. - """ - if not urls: - return [] - - async with httpx.AsyncClient( - timeout=_get_redirect_timeout(), - follow_redirects=True, - limits=httpx.Limits(max_connections=20, max_keepalive_connections=10), - ) as client: - # Resolve all URLs in parallel - tasks = [_resolve_single_url_async(client, url) for url in urls] - return list(await asyncio.gather(*tasks)) - - -_http_retry = retry( - stop=stop_after_attempt(3), - wait=wait_exponential(multiplier=1, min=1, max=10), - retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)), - reraise=True, -) - - -@_http_retry -def _fetch_with_retry(client: httpx.Client, url: str) -> httpx.Response: - """Fetch URL with automatic retry on transient failures.""" - response = client.get(url, headers={"User-Agent": "Mozilla/5.0 (compatible; ResearchBot/1.0)"}) - response.raise_for_status() - return response - - -def _html_to_markdown(html: str, base_url: str | None = None) -> str: - """Convert HTML to Markdown, preserving links, tables, and structure. - - Parameters - ---------- - html : str - The HTML content to convert. - base_url : str, optional - Base URL for resolving relative links. - - Returns - ------- - str - Markdown-formatted text with preserved links and tables. - """ - # Use html-to-markdown library for high-quality conversion - # It preserves links, tables, headings, lists, and other structure - markdown = html_to_markdown(html) - - # If base_url provided, convert relative URLs to absolute - if base_url: - import re # noqa: PLC0415 - - def make_absolute(match: re.Match) -> str: - """Convert relative URL to absolute.""" - prefix = match.group(1) # [text]( or src=" - url = match.group(2) - suffix = match.group(3) # ) or " - - # Skip if already absolute or is a data URI - if url.startswith(("http://", "https://", "data:", "mailto:", "#")): - return match.group(0) - - absolute_url = urljoin(base_url, url) - return f"{prefix}{absolute_url}{suffix}" - - # Fix markdown links: [text](url) - markdown = re.sub(r"(\[[^\]]*\]\()([^)]+)(\))", make_absolute, markdown) - - # Fix markdown images: ![alt](url) - markdown = re.sub(r"(!\[[^\]]*\]\()([^)]+)(\))", make_absolute, markdown) - - return markdown.strip() - - -def _extract_pdf_text(content: bytes, max_pages: int = 10) -> tuple[str, int]: - """Extract text from PDF bytes. - - Parameters - ---------- - content : bytes - The PDF file content. - max_pages : int - Maximum number of pages to extract. - - Returns - ------- - tuple[str, int] - The extracted text and total number of pages. 
- """ - from pypdf import PdfReader # noqa: PLC0415 - - pdf_file = BytesIO(content) - reader = PdfReader(pdf_file) - num_pages = len(reader.pages) - - pages_to_read = min(num_pages, max_pages) - text_parts = [] - - for i in range(pages_to_read): - page_text = reader.pages[i].extract_text() - if page_text: - text_parts.append(f"--- Page {i + 1} ---\n{page_text}") - - if pages_to_read < num_pages: - text_parts.append(f"\n[Document has {num_pages} pages. Showing first {pages_to_read}.]") - - return "\n\n".join(text_parts), num_pages - - -def _truncate_content(text: str) -> tuple[str, bool]: - """Truncate content if it exceeds the maximum length.""" - truncated = len(text) > MAX_CONTENT_CHARS - if truncated: - text = text[:MAX_CONTENT_CHARS] + "\n\n[Content truncated due to length]" - return text, truncated - - -def _make_error_response(error: str, url: str) -> dict[str, Any]: - """Create an error response dict.""" - return {"status": "error", "error": error, "url": url} - - -def _make_success_response(url: str, content: str, content_type: str, truncated: bool, **extra: Any) -> dict[str, Any]: - """Create a success response dict.""" - result = { - "status": "success", - "url": url, - "content": content, - "content_type": content_type, - "content_length": len(content), - "truncated": truncated, - } - result.update(extra) - return result - - -def web_fetch(url: str, max_pages: int = 10) -> dict[str, Any]: - """Fetch content from a URL (HTML page or PDF document). - - This tool retrieves the full content from a URL for analysis. It handles - both HTML pages (converted to readable text) and PDF documents (text extracted). - - For large data files (CSV, XLSX) that need searching, use fetch_file instead. - - Parameters - ---------- - url : str - The URL to fetch. Must be a valid HTTP or HTTPS URL. - max_pages : int, optional - For PDFs, maximum number of pages to extract (default 10). - - Returns - ------- - dict - On success: 'status', 'url', 'content', 'content_type', - 'content_length', 'truncated'. For PDFs also includes: - 'num_pages', 'pages_extracted'. On error: 'status', 'error', 'url'. - - Examples - -------- - >>> # Fetch an HTML page - >>> result = web_fetch("https://example.com/about") - >>> print(result["content"]) - - >>> # Fetch a PDF - >>> result = web_fetch("https://arxiv.org/pdf/2301.00234.pdf") - >>> print(f"Pages: {result['num_pages']}") - >>> print(result["content"]) - """ - logger.info(f"WebFetch: {url}") - - # Validate URL - if not url.startswith(("http://", "https://")): - return _make_error_response("Invalid URL. 
Must start with http:// or https://", url) - - try: - with httpx.Client(timeout=60.0, follow_redirects=True) as client: - response = _fetch_with_retry(client, url) - content_type = response.headers.get("content-type", "") - final_url = str(response.url) - - # Handle PDF documents - if "application/pdf" in content_type or url.lower().endswith(".pdf"): - return _handle_pdf_response(response.content, max_pages, final_url, url) - - # Handle HTML and text content - if "text/html" in content_type or not content_type: - text = _html_to_markdown(response.text, base_url=final_url) - else: - text = response.text - text, truncated = _truncate_content(text) - - return _make_success_response(final_url, text, content_type or "text/html", truncated) - - except httpx.HTTPStatusError as e: - logger.warning(f"HTTP error fetching {url}: {e}") - return _make_error_response(f"HTTP {e.response.status_code}: {e.response.reason_phrase}", url) - except httpx.RequestError as e: - logger.warning(f"Request error fetching {url}: {e}") - return _make_error_response(f"Request failed: {e!s}", url) - except Exception as e: - logger.exception(f"Unexpected error in web_fetch for {url}") - return _make_error_response(f"Unexpected error: {e!s}", url) - - -def _handle_pdf_response(content: bytes, max_pages: int, final_url: str, url: str) -> dict[str, Any]: - """Handle PDF content extraction and response creation.""" - try: - text, num_pages = _extract_pdf_text(content, max_pages) - text, truncated = _truncate_content(text) - - return _make_success_response( - final_url, - text, - "application/pdf", - truncated, - num_pages=num_pages, - pages_extracted=min(num_pages, max_pages), - ) - except ImportError: - return _make_error_response("PDF support requires pypdf. Install with: pip install pypdf", url) - except Exception as e: - return _make_error_response(f"Failed to extract PDF text: {e!s}", url) - - -def create_web_fetch_tool() -> FunctionTool: - """Create an ADK FunctionTool for fetching web content.""" - return FunctionTool(func=web_fetch) From 5685b652f98a56000b2713cb14ac172088b986b8 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Mon, 2 Feb 2026 15:14:03 -0500 Subject: [PATCH 06/15] Remove unused synchronous version --- .../aieng/agent_evals/tools/_redirect.py | 75 ------------------- 1 file changed, 75 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py b/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py index fd8f499..488f89c 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py @@ -7,7 +7,6 @@ import asyncio import logging -from functools import lru_cache import httpx @@ -42,80 +41,6 @@ def _get_redirect_timeout() -> httpx.Timeout: ) -# ============================================================================ -# Synchronous Redirect Resolution -# ============================================================================ - - -def _resolve_with_head(client: httpx.Client, url: str) -> str | None: - """Try to resolve redirect using HEAD request.""" - try: - response = client.head(url, headers={"User-Agent": _USER_AGENT}) - return str(response.url) - except httpx.HTTPStatusError as e: - # Some servers return 405 Method Not Allowed for HEAD - if e.response.status_code in (405, 501): - return None # Signal to try GET - raise - except Exception: - return None - - -def _resolve_with_get(client: httpx.Client, url: str) -> str: - """Resolve redirect using GET request (fallback when HEAD fails).""" - # Use 
stream=True to avoid downloading the body - with client.stream("GET", url, headers={"User-Agent": _USER_AGENT}) as response: - return str(response.url) - - -@lru_cache(maxsize=256) -def resolve_redirect_url(url: str) -> str: - """Resolve a redirect URL to its final destination without downloading content. - - This is useful for resolving Vertex AI grounding redirect URLs to actual URLs - before displaying them in traces, CLI output, or citations. - - Results are cached to avoid repeated HTTP calls for the same URL. - - Uses robust resolution with: - - Configurable timeouts (connect, read, total) - - HEAD request first, falls back to GET if server doesn't support HEAD - - Retries with exponential backoff for transient failures - - Realistic User-Agent to avoid blocks - - Parameters - ---------- - url : str - The URL to resolve (may be a redirect URL). - - Returns - ------- - str - The final destination URL after following redirects. - Returns the original URL if resolution fails. - """ - # Skip resolution for non-redirect URLs - if not _is_redirect_url(url): - return url - - try: - with httpx.Client(timeout=_get_redirect_timeout(), follow_redirects=True) as client: - # Try HEAD first (faster, no body download) - final_url = _resolve_with_head(client, url) - - # Fall back to GET if HEAD failed - if final_url is None: - logger.debug(f"HEAD failed for {url[:60]}..., trying GET") - final_url = _resolve_with_get(client, url) - - if final_url != url: - logger.debug(f"Resolved redirect: {url[:60]}... -> {final_url[:60]}...") - return final_url - except Exception as e: - logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(e).__name__}: {e}") - return url - - async def _resolve_with_head_async(client: httpx.AsyncClient, url: str) -> str | None: """Try to resolve redirect using async HEAD request.""" try: From 1caea9ee6242a36f182b67ae1145f4368fdfdb05 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 3 Feb 2026 13:30:44 -0500 Subject: [PATCH 07/15] Fix typing issues --- .../agent_evals/report_generation/evaluation.py | 14 +++++++++----- implementations/aml_investigation/agent.py | 5 +++-- implementations/aml_investigation/data/cli.py | 16 +++++++++++----- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py index 83c8324..f0a851e 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py @@ -14,7 +14,7 @@ import logging from pathlib import Path -from typing import Any +from typing import Any, Union import agents from aieng.agent_evals.async_client_manager import AsyncClientManager @@ -27,7 +27,7 @@ TRAJECTORY_EVALUATOR_TEMPLATE, ) from langfuse._client.datasets import DatasetItemClient -from langfuse.experiment import Evaluation +from langfuse.experiment import Evaluation, LocalExperimentItem from openai.types.responses.response_function_tool_call import ResponseFunctionToolCall from openai.types.responses.response_output_message import ResponseOutputMessage from openai.types.responses.response_output_refusal import ResponseOutputRefusal @@ -136,12 +136,14 @@ def __init__( self.reports_output_path = reports_output_path self.langfuse_project_name = langfuse_project_name - async def run(self, *, item: DatasetItemClient, **kwargs) -> EvaluationOutput: + async def run( + self, *, item: Union[LocalExperimentItem, DatasetItemClient], **kwargs: 
dict[str, Any] + ) -> EvaluationOutput: """Run the report generation agent against an item from a Langfuse dataset. Parameters ---------- - item : DatasetItemClient + item : LocalExperimentItem | DatasetItemClient The item from the Langfuse dataset to evaluate against. Returns @@ -157,7 +159,9 @@ async def run(self, *, item: DatasetItemClient, **kwargs) -> EvaluationOutput: reports_output_path=self.reports_output_path, langfuse_project_name=self.langfuse_project_name, ) - result = await run_agent_with_retry(report_generation_agent, item.input) + # Handle both TypedDict and class access patterns + item_input = item["input"] if isinstance(item, dict) else item.input + result = await run_agent_with_retry(report_generation_agent, item_input) # Extract the report data and trajectory from the agent's response actions = [] diff --git a/implementations/aml_investigation/agent.py b/implementations/aml_investigation/agent.py index 4d48e61..8fa34b4 100644 --- a/implementations/aml_investigation/agent.py +++ b/implementations/aml_investigation/agent.py @@ -14,7 +14,7 @@ import logging import os import uuid -from functools import lru_cache +from functools import lru_cache, partial from pathlib import Path import google.genai.types @@ -216,7 +216,7 @@ async def _analyze_cases_to_jsonl( output_path.parent.mkdir(parents=True, exist_ok=True) tasks = [ - asyncio.create_task(rate_limited(lambda r=record: _safe_analyze_case(runner, r), semaphore)) for record in cases + asyncio.create_task(rate_limited(partial(_safe_analyze_case, runner, record), semaphore)) for record in cases ] analyzed_by_id: dict[str, CaseRecord] = {} @@ -280,6 +280,7 @@ async def _main() -> None: tp = fp = fn = tn = 0 for r in scored: gt = r.groundtruth.is_laundering + assert r.analysis is not None # Guaranteed by filter above pred = r.analysis.is_laundering if gt and pred: tp += 1 diff --git a/implementations/aml_investigation/data/cli.py b/implementations/aml_investigation/data/cli.py index dd9635d..a9f6f87 100644 --- a/implementations/aml_investigation/data/cli.py +++ b/implementations/aml_investigation/data/cli.py @@ -20,7 +20,7 @@ import logging import sqlite3 from pathlib import Path -from typing import Any, Callable, get_args +from typing import Any, Callable, cast, get_args import click import pandas as pd @@ -167,9 +167,12 @@ def create_db(illicit_ratio: str, transactions_size: str, ddl_file_path: Path, d db_path = ddl_file_path.parent / "aml_transactions.db" # Download datasets from Kaggle + # Cast is safe because _validate_dataset_options ensures valid values click.echo("Downloading dataset files...") - path_to_transc_csv = download_dataset_file(illicit_ratio, transactions_size, "Trans.csv") - path_to_accts_csv = download_dataset_file(illicit_ratio, transactions_size, "accounts.csv") + ratio = cast(IllicitRatios, illicit_ratio) + size = cast(TransactionsSizes, transactions_size) + path_to_transc_csv = download_dataset_file(ratio, size, "Trans.csv") + path_to_accts_csv = download_dataset_file(ratio, size, "accounts.csv") click.echo("✅ Download complete.") with sqlite3.connect(db_path) as conn: @@ -269,8 +272,11 @@ def create_cases( if lookback_days == 0: logger.warning("lookback_days=0 creates very narrow windows (can be seed timestamp only); consider >= 1.") - path_to_transc_csv = download_dataset_file(illicit_ratio, transactions_size, "Trans.csv") - path_to_patterns_txt = download_dataset_file(illicit_ratio, transactions_size, "Patterns.txt") + # Cast is safe because _validate_dataset_options ensures valid values + ratio = 
cast(IllicitRatios, illicit_ratio) + size = cast(TransactionsSizes, transactions_size) + path_to_transc_csv = download_dataset_file(ratio, size, "Trans.csv") + path_to_patterns_txt = download_dataset_file(ratio, size, "Patterns.txt") click.echo("✅ Downloaded dataset files.") transc_df = pd.read_csv(path_to_transc_csv, dtype_backend="pyarrow") From fa7a95193c774496272385678cd43a3cd98a019d Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 3 Feb 2026 23:40:54 -0500 Subject: [PATCH 08/15] Update search fn to async --- .../aieng/agent_evals/tools/search.py | 21 ++++--------------- .../aieng/agent_evals/tools/test_search.py | 10 +++++---- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/search.py b/aieng-eval-agents/aieng/agent_evals/tools/search.py index cb13d7b..93dee2f 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/search.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/search.py @@ -4,8 +4,6 @@ enabling a proper research workflow: search → fetch → verify → answer. """ -import asyncio -import concurrent.futures import logging from typing import Any @@ -150,7 +148,7 @@ async def _google_search_async(query: str) -> dict[str, Any]: } -def google_search(query: str) -> dict[str, Any]: +async def google_search(query: str) -> dict[str, Any]: """Search Google and return results with actual URLs for fetching. Use this tool to find information on the web. The results include: @@ -173,25 +171,14 @@ def google_search(query: str) -> dict[str, Any]: Examples -------- - >>> result = google_search("highest single day snowfall Toronto") + >>> result = await google_search("highest single day snowfall Toronto") >>> # Check the sources >>> for source in result["sources"]: ... print(f"{source['title']}: {source['url']}") >>> # Then fetch to verify - >>> page = web_fetch(result["sources"][0]["url"]) + >>> page = await web_fetch(result["sources"][0]["url"]) """ - logger.info(f"GoogleSearch: {query}") - - # Handle being called from async context - try: - asyncio.get_running_loop() - # We're in an async context - need to run in a new thread - with concurrent.futures.ThreadPoolExecutor() as pool: - future = pool.submit(asyncio.run, _google_search_async(query)) - return future.result() - except RuntimeError: - # No running loop - we can use asyncio.run directly - return asyncio.run(_google_search_async(query)) + return await _google_search_async(query) def create_google_search_tool() -> FunctionTool: diff --git a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py index 223eea2..2e69a45 100644 --- a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py +++ b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py @@ -162,9 +162,10 @@ def test_create_google_search_tool_real(self): # The tool should be a FunctionTool wrapping google_search assert isinstance(tool, FunctionTool) - def test_google_search_returns_urls(self): + @pytest.mark.asyncio + async def test_google_search_returns_urls(self): """Test that google_search returns actual URLs, not redirect URLs.""" - result = google_search("capital of France") + result = await google_search("capital of France") # Should have success status assert result["status"] == "success" @@ -184,9 +185,10 @@ def test_google_search_returns_urls(self): assert source["url"].startswith("http"), f"Expected URL, got: {source['url']}" assert "vertexaisearch" not in source["url"], "URL should not be a redirect URL" - def 
test_google_search_response_structure(self): + @pytest.mark.asyncio + async def test_google_search_response_structure(self): """Test the complete response structure from google_search.""" - result = google_search("Python programming language") + result = await google_search("Python programming language") # Check all expected keys exist assert "status" in result From f6792cfb3108905d85c68fedf5c10692803f2d19 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 3 Feb 2026 23:44:03 -0500 Subject: [PATCH 09/15] Use modern operator to denote union of types --- .../aieng/agent_evals/report_generation/evaluation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py index f0a851e..8f81b14 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py @@ -14,7 +14,7 @@ import logging from pathlib import Path -from typing import Any, Union +from typing import Any import agents from aieng.agent_evals.async_client_manager import AsyncClientManager @@ -137,7 +137,7 @@ def __init__( self.langfuse_project_name = langfuse_project_name async def run( - self, *, item: Union[LocalExperimentItem, DatasetItemClient], **kwargs: dict[str, Any] + self, *, item: LocalExperimentItem | DatasetItemClient, **kwargs: dict[str, Any] ) -> EvaluationOutput: """Run the report generation agent against an item from a Langfuse dataset. From 5ecb97f86c43c81fcbf15dcedbf8c14fda33e5ce Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 4 Feb 2026 04:45:02 +0000 Subject: [PATCH 10/15] [pre-commit.ci] Add auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- .../aieng/agent_evals/report_generation/evaluation.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py index 8f81b14..c4101d3 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py @@ -136,9 +136,7 @@ def __init__( self.reports_output_path = reports_output_path self.langfuse_project_name = langfuse_project_name - async def run( - self, *, item: LocalExperimentItem | DatasetItemClient, **kwargs: dict[str, Any] - ) -> EvaluationOutput: + async def run(self, *, item: LocalExperimentItem | DatasetItemClient, **kwargs: dict[str, Any]) -> EvaluationOutput: """Run the report generation agent against an item from a Langfuse dataset. 
Parameters From 1fee32a2c3e74f6daaad2ebdbc1aefb3c1972fab Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Tue, 3 Feb 2026 23:55:50 -0500 Subject: [PATCH 11/15] Use tenacity for retries --- .../report_generation/evaluation.py | 4 +- .../aieng/agent_evals/tools/_redirect.py | 67 +++++++++---------- 2 files changed, 33 insertions(+), 38 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py index 8f81b14..c4101d3 100644 --- a/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py +++ b/aieng-eval-agents/aieng/agent_evals/report_generation/evaluation.py @@ -136,9 +136,7 @@ def __init__( self.reports_output_path = reports_output_path self.langfuse_project_name = langfuse_project_name - async def run( - self, *, item: LocalExperimentItem | DatasetItemClient, **kwargs: dict[str, Any] - ) -> EvaluationOutput: + async def run(self, *, item: LocalExperimentItem | DatasetItemClient, **kwargs: dict[str, Any]) -> EvaluationOutput: """Run the report generation agent against an item from a Langfuse dataset. Parameters diff --git a/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py b/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py index 488f89c..76a9594 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/_redirect.py @@ -9,6 +9,7 @@ import logging import httpx +from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt, wait_exponential logger = logging.getLogger(__name__) @@ -70,6 +71,8 @@ async def _resolve_single_url_async( ) -> str: """Resolve a single URL with retries and exponential backoff. + Uses tenacity for automatic retry handling with exponential backoff. + Parameters ---------- client : httpx.AsyncClient @@ -84,7 +87,7 @@ async def _resolve_single_url_async( Returns ------- str - The resolved URL, or original URL on failure. + The resolved URL, or original URL if not a redirect or on failure. """ # Skip resolution for non-redirect URLs if not _is_redirect_url(url): @@ -94,40 +97,34 @@ async def _resolve_single_url_async( if url in _redirect_cache: return _redirect_cache[url] - last_error: Exception | None = None - - for attempt in range(max_retries): - try: - # Try HEAD first (faster, no body download) - final_url = await _resolve_with_head_async(client, url) - - # Fall back to GET if HEAD failed - if final_url is None: - logger.debug(f"HEAD failed for {url[:60]}..., trying GET (attempt {attempt + 1})") - final_url = await _resolve_with_get_async(client, url) - - if final_url != url: - logger.debug(f"Resolved redirect: {url[:60]}... -> {final_url[:60]}...") - - _redirect_cache[url] = final_url - return final_url - - except (httpx.TimeoutException, httpx.ConnectError, httpx.ReadError) as e: - last_error = e - if attempt < max_retries - 1: - delay = base_delay * (2**attempt) # Exponential backoff - logger.debug(f"Retry {attempt + 1}/{max_retries} for {url[:60]}... 
after {delay}s: {e}") - await asyncio.sleep(delay) - continue - except Exception as e: - # Non-retryable error - last_error = e - break - - # All retries exhausted or non-retryable error - logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(last_error).__name__}: {last_error}") - _redirect_cache[url] = url # Cache failures to avoid repeated attempts - return url + try: + async for attempt in AsyncRetrying( + stop=stop_after_attempt(max_retries), + wait=wait_exponential(multiplier=base_delay, min=base_delay, max=60.0), + retry=retry_if_exception_type((httpx.TimeoutException, httpx.ConnectError, httpx.ReadError)), + ): + with attempt: + # Try HEAD first (faster, no body download) + final_url = await _resolve_with_head_async(client, url) + + # Fall back to GET if HEAD failed + if final_url is None: + logger.debug(f"HEAD failed for {url[:60]}..., trying GET") + final_url = await _resolve_with_get_async(client, url) + + if final_url != url: + logger.debug(f"Resolved redirect: {url[:60]}... -> {final_url[:60]}...") + + _redirect_cache[url] = final_url + + # If we reach here, the retry loop succeeded + return _redirect_cache[url] + + except Exception as e: + # All retries exhausted or non-retryable error + logger.warning(f"Failed to resolve redirect URL {url[:60]}...: {type(e).__name__}: {e}") + _redirect_cache[url] = url # Cache failures to avoid repeated attempts + return url async def resolve_redirect_url_async(url: str) -> str: From 5eebe9ff060d9fc398114a31344015a84d5a12ce Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Wed, 4 Feb 2026 00:15:00 -0500 Subject: [PATCH 12/15] Fix config in test using mock --- .../tests/aieng/agent_evals/tools/test_search.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py index 2e69a45..74a940c 100644 --- a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py +++ b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py @@ -81,8 +81,11 @@ def test_format_with_citations_no_sources(self): class TestCreateGoogleSearchTool: """Tests for the create_google_search_tool function.""" - def test_creates_function_tool(self): + def test_creates_function_tool(self, monkeypatch): """Test that the tool is created as a FunctionTool wrapping google_search.""" + # Set required environment variable for Configs + monkeypatch.setenv("GOOGLE_API_KEY", "test-key") + result = create_google_search_tool() assert isinstance(result, FunctionTool) From 9a73472c7dadb8e25a30b41d885224db4326c341 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Wed, 4 Feb 2026 00:19:04 -0500 Subject: [PATCH 13/15] Improve return docstring --- .../aieng/agent_evals/tools/search.py | 88 ++++++++++++++++--- 1 file changed, 78 insertions(+), 10 deletions(-) diff --git a/aieng-eval-agents/aieng/agent_evals/tools/search.py b/aieng-eval-agents/aieng/agent_evals/tools/search.py index 93dee2f..33d18ae 100644 --- a/aieng-eval-agents/aieng/agent_evals/tools/search.py +++ b/aieng-eval-agents/aieng/agent_evals/tools/search.py @@ -7,6 +7,7 @@ import logging from typing import Any +from aieng.agent_evals.configs import Configs from google.adk.tools.function_tool import FunctionTool from google.genai import Client, types from pydantic import BaseModel, Field @@ -75,7 +76,7 @@ def format_response_with_citations(response: GroundedResponse) -> str: return response.format_with_citations() -async def _google_search_async(query: str) -> dict[str, 
Any]: +async def _google_search_async(query: str, model: str) -> dict[str, Any]: """Execute a Google search and return results with actual URLs. This function calls Gemini with Google Search grounding enabled, @@ -86,18 +87,27 @@ async def _google_search_async(query: str) -> dict[str, Any]: ---------- query : str The search query. + model : str + The Gemini model to use for search. Returns ------- dict - Search results with 'summary' text and 'sources' list containing - actual URLs the agent can fetch. + Search results with the following keys: + + - **status** (str): "success" or "error" + - **summary** (str): Brief text summary of search results + - **sources** (list[dict]): List of source dicts, each containing: + - **title** (str): Title of the webpage + - **url** (str): Actual URL that can be fetched + - **source_count** (int): Number of sources found (success case only) + - **error** (str): Error message (error case only) """ client = Client() try: response = client.models.generate_content( - model="gemini-2.5-flash", + model=model, contents=query, config=types.GenerateContentConfig( tools=[types.Tool(google_search=types.GoogleSearch())], @@ -148,7 +158,7 @@ async def _google_search_async(query: str) -> dict[str, Any]: } -async def google_search(query: str) -> dict[str, Any]: +async def google_search(query: str, model: str | None = None) -> dict[str, Any]: """Search Google and return results with actual URLs for fetching. Use this tool to find information on the web. The results include: @@ -162,12 +172,22 @@ async def google_search(query: str) -> dict[str, Any]: ---------- query : str The search query. Be specific and include key terms. + model : str, optional + The Gemini model to use for search. If not provided, uses + default_worker_model from Configs. Returns ------- dict - Contains 'summary' (brief overview), 'sources' (list of URLs to fetch), - and 'source_count'. On error, contains 'status': 'error' and 'error' message. + Search results with the following keys: + + - **status** (str): "success" or "error" + - **summary** (str): Brief text summary of search results + - **sources** (list[dict]): List of source dicts, each containing: + - **title** (str): Title of the webpage + - **url** (str): Actual URL that can be fetched + - **source_count** (int): Number of sources found (success case only) + - **error** (str): Error message (error case only) Examples -------- @@ -178,15 +198,25 @@ async def google_search(query: str) -> dict[str, Any]: >>> # Then fetch to verify >>> page = await web_fetch(result["sources"][0]["url"]) """ - return await _google_search_async(query) + if model is None: + config = Configs() # type: ignore[call-arg] + model = config.default_worker_model + + return await _google_search_async(query, model=model) -def create_google_search_tool() -> FunctionTool: +def create_google_search_tool(config: Configs | None = None) -> FunctionTool: """Create a search tool that returns actual URLs for fetching. This tool calls Google Search, extracts grounding URLs, resolves redirects, and returns actual URLs the agent can use with web_fetch for verification. + Parameters + ---------- + config : Configs, optional + Configuration settings. If not provided, creates default config. + Uses config.default_worker_model for the search model. 
+ Returns ------- FunctionTool @@ -195,8 +225,46 @@ def create_google_search_tool() -> FunctionTool: Examples -------- >>> from aieng.agent_evals.tools import create_google_search_tool - >>> search_tool = create_google_search_tool() + >>> from aieng.agent_evals.configs import Configs + >>> config = Configs() + >>> search_tool = create_google_search_tool(config=config) >>> # Use with an ADK agent >>> agent = Agent(tools=[search_tool]) """ + if config is None: + config = Configs() # type: ignore[call-arg] + + model = config.default_worker_model + + async def google_search(query: str) -> dict[str, Any]: + """Search Google and return results with actual URLs for fetching. + + Use this tool to find information on the web. The results include: + - A summary of what was found + - A list of source URLs that you can fetch with web_fetch to verify + + IMPORTANT: The summary is from search snippets which may be incomplete + or outdated. Always use web_fetch on the source URLs to verify information + before answering. + + Parameters + ---------- + query : str + The search query. Be specific and include key terms. + + Returns + ------- + dict + Search results with the following keys: + + - **status** (str): "success" or "error" + - **summary** (str): Brief text summary of search results + - **sources** (list[dict]): List of source dicts, each containing: + - **title** (str): Title of the webpage + - **url** (str): Actual URL that can be fetched + - **source_count** (int): Number of sources found (success case only) + - **error** (str): Error message (error case only) + """ + return await _google_search_async(query, model=model) + return FunctionTool(func=google_search) From 6f93ea04a8ff19c708dc89bc5cfd98decb40a248 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Wed, 4 Feb 2026 00:23:25 -0500 Subject: [PATCH 14/15] Fix test --- .../tests/aieng/agent_evals/tools/test_search.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py index 74a940c..6ca13ee 100644 --- a/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py +++ b/aieng-eval-agents/tests/aieng/agent_evals/tools/test_search.py @@ -1,5 +1,7 @@ """Tests for Google Search tool.""" +from unittest.mock import MagicMock + import pytest from aieng.agent_evals.tools import ( GroundedResponse, @@ -81,12 +83,13 @@ def test_format_with_citations_no_sources(self): class TestCreateGoogleSearchTool: """Tests for the create_google_search_tool function.""" - def test_creates_function_tool(self, monkeypatch): + def test_creates_function_tool(self): """Test that the tool is created as a FunctionTool wrapping google_search.""" - # Set required environment variable for Configs - monkeypatch.setenv("GOOGLE_API_KEY", "test-key") + # Create a mock config with the required attribute + mock_config = MagicMock() + mock_config.default_worker_model = "gemini-2.5-flash" - result = create_google_search_tool() + result = create_google_search_tool(config=mock_config) assert isinstance(result, FunctionTool) # The function tool should wrap the google_search function From 2a9f4bbb75a4d7e762275c5477dda01a7bfaba78 Mon Sep 17 00:00:00 2001 From: Amrit Krishnan Date: Wed, 4 Feb 2026 09:40:42 -0500 Subject: [PATCH 15/15] Remove use of cast, lets stop lying to the type checker --- implementations/aml_investigation/data/cli.py | 64 +++++++++++++++---- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git 
a/implementations/aml_investigation/data/cli.py b/implementations/aml_investigation/data/cli.py index a9f6f87..0ae1d27 100644 --- a/implementations/aml_investigation/data/cli.py +++ b/implementations/aml_investigation/data/cli.py @@ -20,7 +20,7 @@ import logging import sqlite3 from pathlib import Path -from typing import Any, Callable, cast, get_args +from typing import Any, Callable, get_args import click import pandas as pd @@ -104,12 +104,50 @@ def wrapper(*args, **kwargs): return wrapper -def _validate_dataset_options(illicit_ratio: str, transactions_size: str) -> None: - """Validate dataset option values.""" - if illicit_ratio not in get_args(IllicitRatios): +def _validate_illicit_ratio(value: str) -> IllicitRatios: + """Validate and narrow illicit_ratio to its Literal type. + + Parameters + ---------- + value : str + The illicit ratio value to validate. + + Returns + ------- + IllicitRatios + The validated and type-narrowed illicit ratio. + + Raises + ------ + ValueError + If value is not a valid illicit ratio. + """ + if value not in get_args(IllicitRatios): raise ValueError(f"illicit_ratio must be one of {sorted(get_args(IllicitRatios))}") - if transactions_size not in get_args(TransactionsSizes): + return value # type: ignore[return-value] + + +def _validate_transactions_size(value: str) -> TransactionsSizes: + """Validate and narrow transactions_size to its Literal type. + + Parameters + ---------- + value : str + The transactions size value to validate. + + Returns + ------- + TransactionsSizes + The validated and type-narrowed transactions size. + + Raises + ------ + ValueError + If value is not a valid transactions size. + """ + if value not in get_args(TransactionsSizes): raise ValueError(f"transactions_size must be one of {sorted(get_args(TransactionsSizes))}") + return value # type: ignore[return-value] @click.group() @@ -159,7 +197,10 @@ def create_db(illicit_ratio: str, transactions_size: str, ddl_file_path: Path, d FileNotFoundError If the DDL file does not exist. """ - _validate_dataset_options(illicit_ratio, transactions_size) + # Validate and narrow types + ratio = _validate_illicit_ratio(illicit_ratio) + size = _validate_transactions_size(transactions_size) + if not ddl_file_path.exists(): raise FileNotFoundError(f"DDL file not found: {ddl_file_path}") @@ -167,10 +208,7 @@ def create_db(illicit_ratio: str, transactions_size: str, ddl_file_path: Path, d db_path = ddl_file_path.parent / "aml_transactions.db" # Download datasets from Kaggle - # Cast is safe because _validate_dataset_options ensures valid values click.echo("Downloading dataset files...") - ratio = cast(IllicitRatios, illicit_ratio) - size = cast(TransactionsSizes, transactions_size) path_to_transc_csv = download_dataset_file(ratio, size, "Trans.csv") path_to_accts_csv = download_dataset_file(ratio, size, "accounts.csv") click.echo("✅ Download complete.") @@ -259,7 +297,10 @@ def create_cases( ValueError If any numeric argument is negative or option values are invalid. 
""" - _validate_dataset_options(illicit_ratio, transactions_size) + # Validate and narrow types + ratio = _validate_illicit_ratio(illicit_ratio) + size = _validate_transactions_size(transactions_size) + for name, value in [ ("num_laundering_cases", num_laundering_cases), ("num_normal_cases", num_normal_cases), @@ -272,9 +313,6 @@ def create_cases( if lookback_days == 0: logger.warning("lookback_days=0 creates very narrow windows (can be seed timestamp only); consider >= 1.") - # Cast is safe because _validate_dataset_options ensures valid values - ratio = cast(IllicitRatios, illicit_ratio) - size = cast(TransactionsSizes, transactions_size) path_to_transc_csv = download_dataset_file(ratio, size, "Trans.csv") path_to_patterns_txt = download_dataset_file(ratio, size, "Patterns.txt") click.echo("✅ Downloaded dataset files.")