diff --git a/README.md b/README.md index 5c165a3..152f5b9 100644 --- a/README.md +++ b/README.md @@ -73,46 +73,57 @@ cd /home/worker/projects/prisma_platform - `http://127.0.0.1:7777/docs` - `http://127.0.0.1:7777/redoc` -## Запуск сценария через HTTP +## HTTP API -- `POST http://127.0.0.1:7777/api/runs` +### Запуск сценария (async) -Тело запроса: - -```json -{ - "scenario_id": "news_source_discovery_v1", - "input": { - "url": "https://example.com/news" - } -} -``` - -Пример: +`POST /api/runs` — планирует выполнение сценария и **сразу** возвращает `run_id`. Само выполнение идёт в фоне. ```bash curl -s -X POST "http://127.0.0.1:7777/api/runs" \ -H "Content-Type: application/json" \ -d '{ "scenario_id": "news_source_discovery_v1", - "input": { - "url": "https://example.com/news" - } + "input": { "url": "https://example.com/news" } }' ``` -Успешный ответ содержит: +Ответ (`202 Accepted`): -- `status=success` -- `message=""` -- список `steps` со статусами и временем шагов -- `output_summary` -- `result` итогового шага +```json +{ + "run_id": "f3d9…", + "scenario_id": "news_source_discovery_v1", + "status": "queued", + "input": { "url": "..." }, + "started_at": "2026-04-24T..." +} +``` -При ошибке: +### Снапшот состояния -- `status=failed` -- `message` содержит текст ошибки +`GET /api/runs/{run_id}` — текущее состояние: `status` (`queued|running|success|failed`), список `steps` со статусами (`success|failed|skipped|queued`), `result` и `output_summary` при завершении. + +### Live-прогресс (SSE) + +`GET /api/runs/{run_id}/events` — Server-Sent Events. Поздние подписчики получают replay уже накопленных событий, затем tail до завершения. 
+ +```bash +curl -N http://127.0.0.1:7777/api/runs/$RUN_ID/events +``` + +Типы событий: + +- `run_started` — `{run_id, scenario_id, started_at}` +- `step_started` — `{run_id, step_name, index, started_at}` +- `step_finished` — `{run_id, step_name, index, status, started_at, finished_at, message}` +- `run_finished` — `{run_id, status, finished_at, message}` (терминальное, поток закрывается) + +### Каталоги + +- `GET /api/scenarios` — список сценариев с метаданными (`scenario_id`, `name`, `description`, `input_schema`). +- `GET /api/scenarios/{scenario_id}` — полное определение сценария (для визуализации графа в UI). +- `GET /api/tools` — MCP tool catalog: `[{name, description, input_schema}]` (проксируется на `MCP_BASE_URL`). ## Переменные окружения @@ -148,6 +159,11 @@ MCP: - `MCP_BASE_URL` (default: `http://127.0.0.1:8081/mcp`) - `MCP_TIMEOUT_SECONDS` (default: `10`) +Runtime caches: + +- `WORKFLOW_CACHE_MAX_SIZE` (default: `64`) — лимит LRU кэша построенных workflow. +- `RUN_REGISTRY_MAX_SIZE` (default: `200`) — лимит LRU истории run'ов в памяти. 
+ Phoenix tracing: - `PHOENIX_TRACING_ENABLED` (default: `false`) diff --git a/src/agent_os.py b/src/agent_os.py index 3840953..4b37f49 100644 --- a/src/agent_os.py +++ b/src/agent_os.py @@ -18,6 +18,7 @@ from agno.os import AgentOS from src.agent_runner import get_agent from src.api_routes import router as api_router from src.observability import init_phoenix_tracing, is_phoenix_tracing_enabled +from src.run_registry import get_registry load_dotenv() @@ -29,6 +30,12 @@ async def _lifespan(_app: FastAPI): try: yield finally: + active = get_registry().list_active() + if active: + logger.info("Cancelling {} active run(s) on shutdown", len(active)) + for record in active: + if record.task is not None and not record.task.done(): + record.task.cancel() logger.info("Prisma Platform API shutting down") diff --git a/src/api_routes.py b/src/api_routes.py index 505281b..2d1c65d 100644 --- a/src/api_routes.py +++ b/src/api_routes.py @@ -1,22 +1,279 @@ -"""REST routes for scenario execution. +"""REST routes for scenario execution, catalogs and live run events. -These endpoints live on the FastAPI ``base_app`` that AgentOS composes with -its own routes, so the prefix ``/api`` does not collide with AgentOS paths. +Runs are executed asynchronously: ``POST /api/runs`` schedules a background +task and returns immediately with a ``run_id``. Clients consume progress +via ``GET /api/runs/{run_id}/events`` (SSE) or poll +``GET /api/runs/{run_id}`` for a snapshot. 
""" from __future__ import annotations -from fastapi import APIRouter +import asyncio +import json +from typing import Any, AsyncIterator -from src.mcp_workflow_runner import run_scenario -from src.schemas import ScenarioRunRequest, ScenarioRunResponse +from fastapi import APIRouter, HTTPException +from fastapi.responses import StreamingResponse +from loguru import logger + +from src.mcp_client import list_mcp_tools +from src.mcp_workflow_runner import run_scenario_async +from src.run_registry import RunRecord, get_registry +from src.scenario_store import ( + ScenarioStoreError, + list_scenario_summaries, + load_scenario_definition, +) +from src.schemas import ( + RunSubmitResponse, + ScenarioRunRequest, + ScenarioRunResponse, + ScenarioSummary, + StepState, + ToolSummary, +) router = APIRouter(prefix="/api", tags=["workflow"]) -@router.post("/runs", response_model=ScenarioRunResponse) -async def post_run(request: ScenarioRunRequest) -> ScenarioRunResponse: - return await run_scenario( +# --------------------------------------------------------------------------- +# Runs +# --------------------------------------------------------------------------- + + +@router.post( + "/runs", + response_model=RunSubmitResponse, + status_code=202, + summary="Schedule a scenario run", + description=( + "Creates a run record and schedules execution in the background. " + "Returns immediately with a `run_id`; poll `GET /api/runs/{run_id}` " + "or subscribe to `GET /api/runs/{run_id}/events` for progress." 
+ ), +) +async def post_run(request: ScenarioRunRequest) -> RunSubmitResponse: + registry = get_registry() + record = registry.create( scenario_id=request.scenario_id, input_data=request.input, ) + record.task = asyncio.create_task(run_scenario_async(record)) + return RunSubmitResponse( + run_id=record.run_id, + scenario_id=record.scenario_id, + status=record.status, + input=record.input, + started_at=record.started_at, + ) + + +@router.get( + "/runs/{run_id}", + response_model=ScenarioRunResponse, + summary="Get run snapshot", + description=( + "Returns the current state of a run. For running runs the `steps` " + "list reflects progress so far; for terminal runs it is complete." + ), + responses={404: {"description": "Unknown run_id"}}, +) +async def get_run(run_id: str) -> ScenarioRunResponse: + record = _require_run(run_id) + if record.response is not None: + return record.response + return _snapshot_from_record(record) + + +@router.get( + "/runs/{run_id}/events", + summary="Live run progress (SSE)", + description=( + "Server-Sent Events stream. Late subscribers receive a replay of " + "buffered events first, then tail new events until `run_finished`.\n\n" + "Event types: `run_started`, `step_started`, `step_finished`, " + "`run_finished`. Each event is JSON in the SSE `data:` field." 
+ ), + responses={ + 200: { + "description": "SSE stream of run events", + "content": { + "text/event-stream": { + "example": ( + "event: run_started\n" + 'data: {"type":"run_started","run_id":"76d6903c-f520-4a40-b0fc-8fed3f7955d2",' + '"scenario_id":"news_source_discovery_v1","started_at":"2026-04-24T09:27:59.873+00:00"}\n\n' + "event: step_started\n" + 'data: {"type":"step_started","run_id":"76d6903c-...","step_name":"search_news_sources",' + '"index":0,"started_at":"2026-04-24T09:27:59.875+00:00"}\n\n' + "event: step_finished\n" + 'data: {"type":"step_finished","run_id":"76d6903c-...","step_name":"search_news_sources",' + '"index":0,"status":"success","started_at":"2026-04-24T09:27:59.875+00:00",' + '"finished_at":"2026-04-24T09:28:00.028+00:00","message":""}\n\n' + "event: run_finished\n" + 'data: {"type":"run_finished","run_id":"76d6903c-...","status":"success",' + '"finished_at":"2026-04-24T09:28:01.750+00:00","message":""}\n\n' + ) + } + }, + }, + 404: {"description": "Unknown run_id"}, + }, +) +async def get_run_events(run_id: str) -> StreamingResponse: + record = _require_run(run_id) + return StreamingResponse( + _event_stream(record), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + +# --------------------------------------------------------------------------- +# Scenario catalog +# --------------------------------------------------------------------------- + + +@router.get( + "/scenarios", + response_model=list[ScenarioSummary], + summary="List available scenarios", + description="Returns metadata (id, name, description, input schema) for every scenario in the index.", +) +async def get_scenarios() -> list[ScenarioSummary]: + return [ScenarioSummary(**s) for s in list_scenario_summaries()] + + +_SCENARIO_DEFINITION_EXAMPLE: dict[str, Any] = { + "schema_version": "1", + "scenario_id": "news_source_discovery_v1", + "name": "News Source Discovery V1", + "description": "Find earliest news 
source using sequential MCP tools.", + "input_schema": { + "type": "object", + "required": ["url"], + "properties": { + "url": {"type": "string", "description": "URL of source news article"} + }, + }, + "steps": [ + { + "name": "search_news_sources", + "type": "tool", + "tool": "search_news_sources", + "input": {"url": {"from": "input.url"}}, + "required_input_fields": ["url"], + } + ], +} + + +@router.get( + "/scenarios/{scenario_id}", + summary="Get full scenario definition", + description="Returns the raw scenario JSON (including the `steps` graph) for UI visualization.", + responses={ + 200: { + "description": "Scenario definition", + "content": {"application/json": {"example": _SCENARIO_DEFINITION_EXAMPLE}}, + }, + 404: {"description": "Unknown scenario_id"}, + }, +) +async def get_scenario(scenario_id: str) -> dict[str, Any]: + try: + return load_scenario_definition(scenario_id) + except ScenarioStoreError as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + + +# --------------------------------------------------------------------------- +# Tool catalog +# --------------------------------------------------------------------------- + + +@router.get( + "/tools", + response_model=list[ToolSummary], + summary="List MCP tools", + description="Proxies MCP `list_tools()` and returns name, description, and input schema for each tool.", + responses={502: {"description": "MCP transport error"}}, +) +async def get_tools() -> list[ToolSummary]: + try: + tools = await list_mcp_tools() + except RuntimeError as exc: + logger.warning("Failed to fetch MCP tools: {}", exc) + raise HTTPException(status_code=502, detail=str(exc)) from exc + return [ToolSummary(**t) for t in tools] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _require_run(run_id: str) -> RunRecord: + record = get_registry().get(run_id) + if record is None: 
async def _event_stream(record: RunRecord) -> AsyncIterator[bytes]:
    """Replay buffered events, then tail a fresh subscriber queue.

    The replay snapshot, the "already finished?" check and the subscription
    all happen before the first ``yield``/``await``, so every event is either
    part of the replay or lands in this subscriber's queue - never neither.

    Args:
        record: The run whose ``events`` are replayed and whose
            ``subscribers`` list receives this stream's queue.

    Yields:
        SSE-framed event chunks, in emission order.

    If the run was already terminal when the client subscribed, the replay
    buffer is the complete history and the stream ends after replaying it.
    Otherwise the stream keeps reading its queue until the ``None`` sentinel
    arrives. This matters when the run finishes while replay is suspended at
    a ``yield``: the remaining events (including ``run_finished``) are then
    sitting in the queue and must still be delivered, not dropped.
    """
    queue: asyncio.Queue = asyncio.Queue()
    # Decide up front: a run that is terminal *now* will never feed this
    # queue, so waiting on it would hang the response forever.
    # NOTE(review): assumes the runner publishes the terminal event and the
    # sentinel without yielding control after setting the terminal status.
    finished_before_subscribe = record.is_terminal()
    replay = list(record.events)
    record.subscribers.append(queue)
    try:
        for event in replay:
            yield _format_sse(event)
        if finished_before_subscribe:
            return
        while True:
            event = await queue.get()
            if event is None:  # terminal sentinel: the run is over
                return
            yield _format_sse(event)
    finally:
        # Always unsubscribe, including on client disconnect, so the emitter
        # does not keep feeding an abandoned queue.
        if queue in record.subscribers:
            record.subscribers.remove(queue)


def _format_sse(event: dict[str, Any]) -> bytes:
    """Encode one event dict as a UTF-8 Server-Sent Events frame.

    The SSE ``event:`` field is taken from ``event["type"]`` (``"message"``
    when absent) and the ``data:`` field carries the whole event as JSON.
    """
    event_type = str(event.get("type", "message"))
    payload = json.dumps(event, ensure_ascii=False)
    return f"event: {event_type}\ndata: {payload}\n\n".encode("utf-8")
-8,6 +8,7 @@ missing fields, invokes the tool, and collects per-step results back into from __future__ import annotations +import asyncio from collections import OrderedDict from copy import deepcopy from datetime import datetime, timezone @@ -20,7 +21,15 @@ from agno.workflow.workflow import Workflow from loguru import logger from src.mcp_client import call_mcp_tool -from src.schemas import ScenarioRunResponse, StepState +from src.run_registry import EventEmitter, RunRecord +from src.schemas import ( + RunFinishedEvent, + RunStartedEvent, + ScenarioRunResponse, + StepFinishedEvent, + StepStartedEvent, + StepState, +) from src.scenario_store import ScenarioStoreError, load_scenario_definition from src.step_planner import plan_arguments, planner_enabled from src.template import ( @@ -99,6 +108,7 @@ async def _execute_one_call( def _build_tool_executor( step_spec: dict[str, Any], + step_index: int = 0, ) -> Callable[[StepInput, dict[str, Any]], Awaitable[StepOutput]]: step_name = str(step_spec["name"]) tool_name = str(step_spec["tool"]) @@ -120,6 +130,18 @@ def _build_tool_executor( async def executor(_step_input: StepInput, session_state: dict[str, Any]) -> StepOutput: started_at = _utc_now_iso() scope = _build_scope(session_state) + emitter: EventEmitter | None = session_state.get("_emitter") + run_id: str = session_state.get("_run_id", "") + + if emitter is not None: + await emitter.emit( + StepStartedEvent( + run_id=run_id, + step_name=step_name, + index=step_index, + started_at=started_at, + ).model_dump() + ) try: if foreach_from: @@ -189,6 +211,17 @@ def _build_tool_executor( } session_state.setdefault("steps", {})[step_name] = step_payload + if emitter is not None: + await emitter.emit( + StepFinishedEvent( + run_id=run_id, + step_name=step_name, + index=step_index, + status="success", + started_at=started_at, + finished_at=step_payload["finished_at"], + ).model_dump() + ) return StepOutput( content=json.dumps(step_payload, ensure_ascii=False), success=True, 
@@ -204,6 +237,18 @@ def _build_tool_executor( } session_state.setdefault("steps", {})[step_name] = error_payload logger.exception("Step {} failed (tool={})", step_name, tool_name) + if emitter is not None: + await emitter.emit( + StepFinishedEvent( + run_id=run_id, + step_name=step_name, + index=step_index, + status="failed", + started_at=started_at, + finished_at=finished_at, + message=str(exc), + ).model_dump() + ) raise RuntimeError(f"{step_name} failed: {exc}") from exc return executor @@ -215,7 +260,7 @@ def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow: raise ScenarioStoreError("Scenario must contain non-empty steps list") workflow_steps: list[Step] = [] - for raw_step in raw_steps: + for step_index, raw_step in enumerate(raw_steps): if not isinstance(raw_step, dict): raise ScenarioStoreError("Each scenario step must be object") if raw_step.get("type") != "tool": @@ -233,7 +278,7 @@ def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow: Step( name=step_name, description=str(raw_step.get("description", step_name)), - executor=_build_tool_executor(raw_step), + executor=_build_tool_executor(raw_step, step_index=step_index), max_retries=0, on_error="fail", ) @@ -320,6 +365,8 @@ async def run_scenario( *, scenario_id: str, input_data: dict[str, Any], + emitter: EventEmitter | None = None, + run_id: str = "", ) -> ScenarioRunResponse: try: scenario = load_scenario_definition(scenario_id) @@ -329,6 +376,7 @@ async def run_scenario( status="failed", message=str(exc), input=input_data, + run_id=run_id or None, ) scenario_name = str(scenario.get("name", scenario_id)) @@ -341,12 +389,15 @@ async def run_scenario( message=str(exc), input=input_data, scenario_name=scenario_name, + run_id=run_id or None, ) # Fresh per-run state that Agno owns during arun(..., session_state=...). 
session_state: dict[str, Any] = { "input": deepcopy(input_data), "steps": {}, + "_emitter": emitter, + "_run_id": run_id, } workflow_error: str | None = None @@ -404,6 +455,75 @@ async def run_scenario( scenario_name=scenario_name, workflow_name=workflow.name, result=result, - run_id=str(getattr(run_output, "run_id", "")) or None, + run_id=run_id or str(getattr(run_output, "run_id", "")) or None, session_id=str(getattr(run_output, "session_id", "")) or None, ) + + +async def run_scenario_async(record: RunRecord) -> None: + """Execute a scenario inside a background task, emitting SSE events. + + Lifecycle: + queued → running (run_started) → success|failed (run_finished) → sentinel + """ + emitter = EventEmitter(record) + record.status = "running" + record.started_at = _utc_now_iso() + + try: + await emitter.emit( + RunStartedEvent( + run_id=record.run_id, + scenario_id=record.scenario_id, + started_at=record.started_at, + ).model_dump() + ) + + response = await run_scenario( + scenario_id=record.scenario_id, + input_data=record.input, + emitter=emitter, + run_id=record.run_id, + ) + + record.response = response + record.status = response.status + record.message = response.message + record.finished_at = _utc_now_iso() + + await emitter.emit( + RunFinishedEvent( + run_id=record.run_id, + status=response.status, + finished_at=record.finished_at, + message=response.message, + ).model_dump() + ) + except asyncio.CancelledError: + record.status = "failed" + record.message = "cancelled" + record.finished_at = _utc_now_iso() + await emitter.emit( + RunFinishedEvent( + run_id=record.run_id, + status="failed", + finished_at=record.finished_at, + message="cancelled", + ).model_dump() + ) + raise + except Exception as exc: + logger.exception("Run {} crashed", record.run_id) + record.status = "failed" + record.message = str(exc) + record.finished_at = _utc_now_iso() + await emitter.emit( + RunFinishedEvent( + run_id=record.run_id, + status="failed", + 
def _env_int(name: str, default: int) -> int:
    """Read an integer setting from the process environment.

    Args:
        name: Environment variable to read.
        default: Value returned when the variable is unset or blank.

    Returns:
        The parsed integer, or ``default``.

    Raises:
        ValueError: If the variable is set to something that is not an
            integer. The message names the variable so a bad deployment
            setting is easy to locate.
    """
    raw = os.getenv(name)
    # A present-but-empty value (``NAME=`` in a .env file) is treated the
    # same as "unset" rather than crashing on ``int("")``.
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError as exc:
        raise ValueError(
            f"Environment variable {name} must be an integer, got {raw!r}"
        ) from exc
class RunRegistry:
    """Bounded in-memory store of run records, keyed by ``run_id``.

    Records are kept in least-recently-used order: ``create`` appends and
    ``get`` moves the looked-up record to the newest position. When the
    store grows past ``max_size`` the oldest *finished* records are dropped;
    records whose run is still in flight are never evicted.
    """

    def __init__(self, max_size: int | None = None) -> None:
        """Create an empty registry.

        Args:
            max_size: Maximum number of records to retain. When omitted
                (or falsy) the ``RUN_REGISTRY_MAX_SIZE`` environment
                variable is used, defaulting to 200.
        """
        self._records: "OrderedDict[str, RunRecord]" = OrderedDict()
        self._max_size = max_size or _env_int("RUN_REGISTRY_MAX_SIZE", 200)

    def create(self, *, scenario_id: str, input_data: dict[str, Any]) -> RunRecord:
        """Register a new run under a fresh UUID and return its record."""
        run_id = str(uuid.uuid4())
        record = RunRecord(
            run_id=run_id,
            scenario_id=scenario_id,
            input=input_data,
        )
        self._records[run_id] = record
        self._evict_if_needed()
        logger.info("Run {} created for scenario={}", run_id, scenario_id)
        return record

    def get(self, run_id: str) -> RunRecord | None:
        """Return the record for ``run_id`` (marking it recently used), or ``None``."""
        record = self._records.get(run_id)
        if record is not None:
            self._records.move_to_end(run_id)
        return record

    def list_active(self) -> list[RunRecord]:
        """Return every stored record whose status is not terminal."""
        return [r for r in self._records.values() if not r.is_terminal()]

    @staticmethod
    def _is_evictable(record: RunRecord) -> bool:
        """Tell whether dropping ``record`` cannot lose an in-flight run."""
        task = record.task
        if task is not None:
            return task.done()
        # No task attached: either the run was just created (still queued,
        # must be kept) or it already reached a terminal status.
        return record.is_terminal()

    def _oldest_evictable_id(self) -> str | None:
        """Return the id of the least-recently-used evictable record, if any."""
        for run_id, record in self._records.items():
            if self._is_evictable(record):
                return run_id
        return None

    def _evict_if_needed(self) -> None:
        """Drop finished records, oldest first, until within ``max_size``.

        Active runs are skipped rather than ending the scan, so a single
        long-running run at the old end of the LRU order cannot stop
        finished runs behind it from being evicted.
        """
        while len(self._records) > self._max_size:
            victim_id = self._oldest_evictable_id()
            if victim_id is None:
                logger.warning(
                    "Run registry over capacity {} but all {} stored run(s) "
                    "are still active; not evicting",
                    self._max_size,
                    len(self._records),
                )
                return
            del self._records[victim_id]
            logger.debug("Evicted run {} from registry", victim_id)
+ continue + summaries.append( + { + "scenario_id": scenario_id, + "name": scenario.get("name"), + "description": scenario.get("description"), + "input_schema": scenario.get("input_schema"), + } + ) + return summaries diff --git a/src/schemas.py b/src/schemas.py index a04bcfe..d3083c9 100644 --- a/src/schemas.py +++ b/src/schemas.py @@ -8,12 +8,24 @@ from pydantic import BaseModel, Field RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"] StepStatus = Literal["queued", "running", "success", "failed", "skipped", "waiting_human"] +EventType = Literal["run_started", "step_started", "step_finished", "run_finished"] class ScenarioRunRequest(BaseModel): scenario_id: str = "news_source_discovery_v1" input: dict[str, Any] = Field(default_factory=dict) + model_config = { + "json_schema_extra": { + "examples": [ + { + "scenario_id": "news_source_discovery_v1", + "input": {"url": "https://example.com/news/article"}, + } + ] + } + } + class StepState(BaseModel): node_id: str @@ -22,6 +34,20 @@ class StepState(BaseModel): finished_at: str | None = None message: str = "" + model_config = { + "json_schema_extra": { + "examples": [ + { + "node_id": "search_news_sources", + "status": "success", + "started_at": "2026-04-24T09:27:59.875680+00:00", + "finished_at": "2026-04-24T09:28:00.028730+00:00", + "message": "", + } + ] + } + } + class ScenarioRunResponse(BaseModel): scenario_id: str @@ -35,3 +61,218 @@ class ScenarioRunResponse(BaseModel): result: dict[str, Any] | None = None run_id: str | None = None session_id: str | None = None + + model_config = { + "json_schema_extra": { + "examples": [ + { + "scenario_id": "news_source_discovery_v1", + "status": "success", + "message": "", + "input": {"url": "https://example.com/news/article"}, + "steps": [ + { + "node_id": "search_news_sources", + "status": "success", + "started_at": "2026-04-24T09:27:59.875680+00:00", + "finished_at": "2026-04-24T09:28:00.028730+00:00", + "message": "", + }, + { + "node_id": 
"generate_summary", + "status": "success", + "started_at": "2026-04-24T09:28:00.781744+00:00", + "finished_at": "2026-04-24T09:28:00.879028+00:00", + "message": "", + }, + ], + "output_summary": "Самым ранним источником считается https://news-a.example/article-1", + "scenario_name": "News Source Discovery V1", + "workflow_name": "news_source_discovery_v1", + "result": { + "ok": True, + "tool_name": "generate_summary", + "payload": { + "input_count": 3, + "summary": "Самым ранним источником считается https://news-a.example/article-1", + }, + }, + "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2", + "session_id": None, + } + ] + } + } + + +class RunSubmitResponse(BaseModel): + run_id: str + scenario_id: str + status: RunStatus + input: dict[str, Any] + started_at: str + + model_config = { + "json_schema_extra": { + "examples": [ + { + "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2", + "scenario_id": "news_source_discovery_v1", + "status": "queued", + "input": {"url": "https://example.com/news/article"}, + "started_at": "2026-04-24T09:27:59.873049+00:00", + } + ] + } + } + + +class ScenarioSummary(BaseModel): + scenario_id: str + name: str | None = None + description: str | None = None + input_schema: dict[str, Any] | None = None + + model_config = { + "json_schema_extra": { + "examples": [ + { + "scenario_id": "news_source_discovery_v1", + "name": "News Source Discovery V1", + "description": "Find earliest news source using sequential MCP tools.", + "input_schema": { + "type": "object", + "required": ["url"], + "properties": { + "url": { + "type": "string", + "description": "URL of source news article", + } + }, + }, + } + ] + } + } + + +class ToolSummary(BaseModel): + name: str + description: str | None = None + input_schema: dict[str, Any] | None = None + + model_config = { + "json_schema_extra": { + "examples": [ + { + "name": "search_news_sources", + "description": "Search for candidate news source URLs for a given article.", + "input_schema": { + "type": 
"object", + "required": ["url"], + "properties": { + "url": {"type": "string", "title": "Url"} + }, + "title": "search_news_sourcesArguments", + }, + } + ] + } + } + + +# --------------------------------------------------------------------------- +# SSE event models. Client parses by the `type` field. +# --------------------------------------------------------------------------- + + +class RunStartedEvent(BaseModel): + type: Literal["run_started"] = "run_started" + run_id: str + scenario_id: str + started_at: str + + model_config = { + "json_schema_extra": { + "examples": [ + { + "type": "run_started", + "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2", + "scenario_id": "news_source_discovery_v1", + "started_at": "2026-04-24T09:27:59.873397+00:00", + } + ] + } + } + + +class StepStartedEvent(BaseModel): + type: Literal["step_started"] = "step_started" + run_id: str + step_name: str + index: int + started_at: str + + model_config = { + "json_schema_extra": { + "examples": [ + { + "type": "step_started", + "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2", + "step_name": "search_news_sources", + "index": 0, + "started_at": "2026-04-24T09:27:59.875680+00:00", + } + ] + } + } + + +class StepFinishedEvent(BaseModel): + type: Literal["step_finished"] = "step_finished" + run_id: str + step_name: str + index: int + status: StepStatus + started_at: str | None = None + finished_at: str | None = None + message: str = "" + + model_config = { + "json_schema_extra": { + "examples": [ + { + "type": "step_finished", + "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2", + "step_name": "search_news_sources", + "index": 0, + "status": "success", + "started_at": "2026-04-24T09:27:59.875680+00:00", + "finished_at": "2026-04-24T09:28:00.028730+00:00", + "message": "", + } + ] + } + } + + +class RunFinishedEvent(BaseModel): + type: Literal["run_finished"] = "run_finished" + run_id: str + status: RunStatus + finished_at: str + message: str = "" + + model_config = { + "json_schema_extra": { + 
"examples": [ + { + "type": "run_finished", + "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2", + "status": "success", + "finished_at": "2026-04-24T09:28:01.750206+00:00", + "message": "", + } + ] + } + }