Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions vertexai/preview/reasoning_engines/templates/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,188 @@ async def cancel(
)


def to_a2a(
agent_engine_app: object,
agent_card: "AgentCard",
user_id: str = None,
task_store_builder: Callable[..., "TaskStore"] = None,
task_store_kwargs: Optional[Mapping[str, Any]] = None,
agent_executor_kwargs: Optional[Mapping[str, Any]] = None,
agent_executor_builder: Optional[Callable[..., "AgentExecutor"]] = None,
request_handler_kwargs: Optional[Mapping[str, Any]] = None,
request_handler_builder: Optional[Callable[..., "RequestHandler"]] = None,
extended_agent_card: "AgentCard" = None,
):
"""Converts an existing Agent Engine application to be compatible with A2A.

This function wraps an `agent_engine_app` with A2A functionalities, allowing it
to handle A2A protocol requests. It augments the app's setup and operation
registration to include A2A-specific handlers.

Args:
agent_engine_app (object): The Agent Engine application instance.
agent_card (AgentCard): The AgentCard describing the agent.
user_id (str): The user ID.
task_store_builder (Callable[..., TaskStore], optional): A callable to build the
TaskStore.
task_store_kwargs (Optional[Mapping[str, Any]], optional):
Keyword arguments for the TaskStore builder.
agent_executor_kwargs (Optional[Mapping[str, Any]], optional):
Keyword arguments for the AgentExecutor builder.
agent_executor_builder (Optional[Callable[..., AgentExecutor]], optional):
A callable to build the AgentExecutor.
If not provided, a default `AdkAgentExecutor` will be used.
request_handler_kwargs (Optional[Mapping[str, Any]], optional):
Keyword arguments for the RequestHandler builder.
request_handler_builder (Optional[Callable[..., RequestHandler]], optional):
A callable to build the RequestHandler.
extended_agent_card (AgentCard, optional): An extended AgentCard.

Returns:
object: The augmented Agent Engine application instance.
"""
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.server.tasks import TaskUpdater
from google.genai import types
from a2a.types import (
TextPart,
FilePart,
FileWithBytes,
FileWithUri,
Part,
)

class DefaultAgentExecutor(AgentExecutor):
"""Agent Executor for adapting an AE application to the A2A protocol."""

def __init__(self, adk_app_instance):
self.adk_app = adk_app_instance

def convert_genai_parts_to_a2a(
self, parts: list[types.Part]
) -> list[Part]:
"""Convert a list of Google Gen AI Part types into a list of A2A Part types."""
return [
self.convert_genai_part_to_a2a(part)
for part in parts
if (part.text or part.file_data or part.inline_data)
]

def convert_genai_part_to_a2a(self, part: types.Part) -> Part:
"""Convert a single Google Gen AI Part type into an A2A Part type."""
if part.text:
return TextPart(text=part.text)
if part.file_data:
return FilePart(
file=FileWithUri(
uri=part.file_data.file_uri,
mime_type=part.file_data.mime_type,
)
)
if part.inline_data:
return Part(
root=FilePart(
file=FileWithBytes(
bytes=part.inline_data.data,
mime_type=part.inline_data.mime_type,
)
)
)
raise ValueError(f"Unsupported part type: {part}")

async def execute(
self, context: RequestContext, event_queue: EventQueue
) -> None:
from google.adk.events.event import Event

query = context.get_user_input()

updater = TaskUpdater(
event_queue, context.task_id, context.context_id
)

if not context.current_task:
await updater.submit()

await updater.start_work()

parts = []
for stream_event in self.adk_app.stream_query(
user_id=user_id, message=query
):
event = Event(**stream_event)
parts.extend(
self.convert_genai_parts_to_a2a(event.content.parts)
)

await updater.add_artifact(
parts,
name="result",
)
await updater.complete()

async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
raise Exception("Cancel not supported for this ADK agent")

if agent_executor_builder:
agent_executor_kwargs["adk_app_instance"] = agent_engine_app
a2a_mixin = A2aAgent(
agent_card=agent_card,
task_store_builder=task_store_builder,
task_store_kwargs=task_store_kwargs,
agent_executor_builder=agent_executor_builder,
agent_executor_kwargs=agent_executor_kwargs,
request_handler_builder=request_handler_builder,
request_handler_kwargs=request_handler_kwargs,
extended_agent_card=extended_agent_card,
)
else:
a2a_mixin = A2aAgent(
agent_card=agent_card,
agent_executor_builder=DefaultAgentExecutor,
agent_executor_kwargs={"adk_app_instance": agent_engine_app},
)

def augmented_register_operations():
routes = agent_engine_app.register_operations()
a2a_routes = a2a_mixin.register_operations()
for group, ops in a2a_routes.items():
if group not in routes:
routes[group] = []
for op in ops:
if op not in routes[group]:
routes[group].append(op)
return routes

original_setup = agent_engine_app.set_up

def augmented_setup():
original_setup()
a2a_mixin.set_up()

agent_engine_app.set_up = augmented_setup

agent_engine_app.register_operations = augmented_register_operations
agent_engine_app.a2a_mixin = a2a_mixin

_original_getattr = agent_engine_app.__class__.__getattribute__

def __getattr__(instance, name):
try:
return _original_getattr(instance, name)
except AttributeError:
if hasattr(instance.a2a_mixin, name):
return getattr(instance.a2a_mixin, name)
raise

agent_engine_app.__class__.__getattr__ = __getattr__

return agent_engine_app


class A2aAgent:
"""A class to initialize and set up an Agent-to-Agent application."""

Expand Down
9 changes: 9 additions & 0 deletions vertexai/preview/reasoning_engines/templates/adk.py
Original file line number Diff line number Diff line change
Expand Up @@ -1534,3 +1534,12 @@ def _warn_if_telemetry_api_disabled(self):
r = session.post("https://telemetry.googleapis.com/v1/traces", data=None)
if "Telemetry API has not been used in project" in r.text:
_warn(_TELEMETRY_API_DISABLED_WARNING % (project, project))

def to_a2a(self, agent_card: "AgentCard"):
"""Converts an existing ADK application to be compatible with A2A."""

from vertexai.preview.reasoning_engines.templates import a2a

return a2a.to_a2a(
agent_engine_app=self, agent_card=agent_card, user_id="123"
)
9 changes: 9 additions & 0 deletions vertexai/preview/reasoning_engines/templates/langgraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,3 +658,12 @@ def register_operations(self) -> Mapping[str, Sequence[str]]:
"": ["query", "get_state", "update_state"],
"stream": ["stream_query", "get_state_history"],
}

def to_a2a(self, agent_card: "AgentCard"):
"""Converts an existing Langraph application to be compatible with A2A."""

from vertexai.preview.reasoning_engines.templates import a2a

return a2a.to_a2a(
agent_engine_app=self, agent_card=agent_card
)
Loading