Compare commits

..

3 Commits

Author SHA1 Message Date
Barabashka 2a81f5f58f Async scenario execution with SSE progress + tools/scenarios catalogs
POST /api/runs now schedules execution in a background asyncio.Task and
   returns a run_id (202 Accepted), so the UI no longer blocks for the
   duration of the whole workflow.

   The new module src/run_registry.py keeps an in-memory LRU (bounded by
   RUN_REGISTRY_MAX_SIZE, default 200) with a RunRecord per run: an
   append-only event buffer for replay, plus a list of subscriber queues
   for live tailing. EventEmitter writes to the buffer and fans out to
   the queues.

   New endpoints:
   - GET /api/runs/{run_id}           state snapshot (partial while running)
   - GET /api/runs/{run_id}/events    SSE: run_started, step_started,
                                      step_finished, run_finished
   - GET /api/scenarios               scenario list with metadata
   - GET /api/scenarios/{id}          full definition for the UI graph
   - GET /api/tools                   proxies MCP list_tools

   mcp_workflow_runner gains an emitter hook in session_state and a
   run_scenario_async wrapper that manages the RunRecord lifecycle:
   queued → running → success/failed, plus a terminal sentinel in the
   subscriber queues. On shutdown the lifespan cancels active tasks.

   All models in schemas.py and the dict endpoints now carry realistic
   examples for /docs instead of the default additionalProp1.
2026-04-24 12:40:49 +03:00
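The SSE stream this commit introduces is plain `text/event-stream`; as a minimal sketch, a client could split the stream on blank lines and decode the `data:` payloads like this (the sample events below are illustrative, not captured output):

```python
import json

def parse_sse(raw: str) -> list[dict]:
    """Parse a text/event-stream buffer into {'event': ..., 'data': ...} dicts."""
    events = []
    for block in raw.strip().split("\n\n"):  # events are separated by blank lines
        payload = {}
        for line in block.splitlines():
            if line.startswith("event: "):
                payload["event"] = line[len("event: "):]
            elif line.startswith("data: "):
                payload["data"] = json.loads(line[len("data: "):])
        if payload:
            events.append(payload)
    return events

raw = (
    'event: run_started\n'
    'data: {"type": "run_started", "run_id": "abc"}\n\n'
    'event: run_finished\n'
    'data: {"type": "run_finished", "run_id": "abc", "status": "success"}\n\n'
)
events = parse_sse(raw)
```

A real consumer would read the response incrementally instead of buffering the whole stream, but the framing rules are the same.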
Barabashka 3357b3c4dd Harden reliability: logging, lifespan, LRU cache and fail-fast semantics
Wire in loguru and replace silent excepts with warning/exception logging
in step_planner, mcp_client and mcp_workflow_runner; previously errors
were lost in empty dict returns.

Move Phoenix tracing from module level into the FastAPI lifespan so that
importing agent_os does not spin up the tracer in tests and tools.

Replace the unbounded _workflow_cache dict with an OrderedDict LRU
bounded by WORKFLOW_CACHE_MAX_SIZE (default 64) so the cache cannot
grow without limit across distinct scenario_id values.

Pin down the fail-fast invariant: steps that never ran because an
upstream step failed are returned with status skipped (for the UI),
not queued; a run is marked success only if every payload.ok is true.

Add module docstrings to all modules in src/ per the STYLE_GUIDE cookbook.

Pin dependency versions in requirements.txt.
2026-04-24 12:00:00 +03:00
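The OrderedDict-based LRU this commit describes can be sketched in a few lines; this is a simplified stand-in, not the actual `_workflow_cache` code, with the 64-entry default mirroring `WORKFLOW_CACHE_MAX_SIZE`:

```python
from collections import OrderedDict

class LruCache:
    def __init__(self, max_size: int = 64):  # default mirrors WORKFLOW_CACHE_MAX_SIZE
        self._data: OrderedDict = OrderedDict()
        self._max_size = max_size

    def get(self, key):
        if key not in self._data:
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self._max_size:
            self._data.popitem(last=False)  # evict the least recently used entry

cache = LruCache(max_size=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # "a" becomes most recently used
cache.put("c", 3)  # capacity exceeded: evicts "b", the least recently used
```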
Barabashka 4d037e52eb Simplify the MCP workflow runner and update the /api/runs contract.
Planner/template helpers were moved into separate modules, the format of statuses and messages in the response was aligned, and .env.example and README were updated to match the current variables and API behavior.
2026-04-23 12:41:33 +03:00
15 changed files with 1479 additions and 633 deletions
+18 -3
@@ -1,18 +1,33 @@
# Agent
AGENT_ID=prisma-agent
AGENT_MARKDOWN=false
AGENT_DEBUG_MODE=true
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
# Agent model (Ollama)
OLLAMA_MODEL_ID=gemma4:31b
OLLAMA_HOST=http://localhost:11435
OLLAMA_TEMPERATURE=0
# API runtime
AGENT_OS_HOST=127.0.0.1
AGENT_OS_PORT=7777
# Planner
PLANNER_ENABLED=false
PLANNER_REPAIR_ATTEMPTS=3
# Planner model (Polza)
POLZA_BASE_URL=https://api.polza.ai/v1
POLZA_MODEL_ID=google/gemma-4-31b-it
POLZA_API_KEY=key
POLZA_TEMPERATURE=0
# MCP
MCP_BASE_URL=http://127.0.0.1:8081/mcp
MCP_TIMEOUT_SECONDS=10
# Observability (Phoenix)
PHOENIX_TRACING_ENABLED=false
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
PHOENIX_PROJECT_NAME=prisma-platform
+1
@@ -25,6 +25,7 @@ dist/
.vscode/
.DS_Store
.cursor
.claude
# Cookbook code
vendor/agno/cookbook/
+61 -25
@@ -25,7 +25,8 @@ prisma_platform/
├── scenarios/
│   ├── index.json
│   └── news_source_discovery/
│       ├── v1.json
│       └── v1_planner_repair.json
└── src/
    ├── __init__.py
    ├── api_routes.py
@@ -35,6 +36,8 @@ prisma_platform/
    ├── mcp_workflow_runner.py
    ├── observability.py
    ├── scenario_store.py
    ├── step_planner.py
    ├── template.py
    └── schemas.py
```
@@ -70,53 +73,81 @@ cd /home/worker/projects/prisma_platform
- `http://127.0.0.1:7777/docs`
- `http://127.0.0.1:7777/redoc`

## HTTP API

### Starting a scenario run (async)

`POST /api/runs` schedules scenario execution and returns a `run_id` **immediately**. The run itself executes in the background.

```bash
curl -s -X POST "http://127.0.0.1:7777/api/runs" \
  -H "Content-Type: application/json" \
  -d '{
    "scenario_id": "news_source_discovery_v1",
    "input": { "url": "https://example.com/news" }
  }'
```

Response (`202 Accepted`):

```json
{
  "run_id": "f3d9…",
  "scenario_id": "news_source_discovery_v1",
  "status": "queued",
  "input": { "url": "..." },
  "started_at": "2026-04-24T..."
}
```

### State snapshot

`GET /api/runs/{run_id}` returns the current state: `status` (`queued|running|success|failed`), the list of `steps` with per-step statuses (`success|failed|skipped|queued`), plus `result` and `output_summary` once the run finishes.

### Live progress (SSE)

`GET /api/runs/{run_id}/events` serves Server-Sent Events. Late subscribers first get a replay of the events accumulated so far, then a live tail until completion.

```bash
curl -N http://127.0.0.1:7777/api/runs/$RUN_ID/events
```

Event types:

- `run_started`: `{run_id, scenario_id, started_at}`
- `step_started`: `{run_id, step_name, index, started_at}`
- `step_finished`: `{run_id, step_name, index, status, started_at, finished_at, message}`
- `run_finished`: `{run_id, status, finished_at, message}` (terminal; the stream closes)

### Catalogs

- `GET /api/scenarios`: list of scenarios with metadata (`scenario_id`, `name`, `description`, `input_schema`).
- `GET /api/scenarios/{scenario_id}`: full scenario definition (for graph visualization in the UI).
- `GET /api/tools`: MCP tool catalog, `[{name, description, input_schema}]` (proxied to `MCP_BASE_URL`).

## Environment variables

Agent:

- `AGENT_ID` (default: `prisma-agent`)
- `AGENT_MARKDOWN` (default: `false`)
- `AGENT_DEBUG_MODE` (default: `true`)
- `AGENT_INSTRUCTIONS`
- `OLLAMA_MODEL_ID` (default: `gemma4:31b`)
- `OLLAMA_HOST` (default: `http://localhost:11435`)
- `OLLAMA_TEMPERATURE` (default: `0`)

API runtime:

- `AGENT_OS_HOST` (default: `127.0.0.1`)
- `AGENT_OS_PORT` (default: `7777`)

Planner:

- `PLANNER_ENABLED` (default: `false`)
- `PLANNER_REPAIR_ATTEMPTS` (default: `3`)

Planner model (`polza.ai`):

- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)

@@ -128,6 +159,11 @@ MCP:
- `MCP_BASE_URL` (default: `http://127.0.0.1:8081/mcp`)
- `MCP_TIMEOUT_SECONDS` (default: `10`)

Runtime caches:

- `WORKFLOW_CACHE_MAX_SIZE` (default: `64`): bound on the LRU cache of built workflows.
- `RUN_REGISTRY_MAX_SIZE` (default: `200`): bound on the in-memory LRU run history.

Phoenix tracing:

- `PHOENIX_TRACING_ENABLED` (default: `false`)
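The submit-then-poll flow from the API section above can be sketched as a small client helper. This is a hypothetical illustration (the `http_post`/`http_get` callables are injected so the sketch stays transport-agnostic and testable; they are not part of the project):

```python
from typing import Callable

def submit_and_poll(
    http_post: Callable[[str, dict], dict],
    http_get: Callable[[str], dict],
    base_url: str = "http://127.0.0.1:7777",
) -> dict:
    """POST /api/runs, then poll GET /api/runs/{run_id} until the run is terminal."""
    submitted = http_post(
        f"{base_url}/api/runs",
        {"scenario_id": "news_source_discovery_v1",
         "input": {"url": "https://example.com/news"}},
    )
    run_id = submitted["run_id"]
    while True:
        # A real client would sleep between polls (e.g. time.sleep(0.5)).
        snapshot = http_get(f"{base_url}/api/runs/{run_id}")
        if snapshot["status"] in {"success", "failed"}:
            return snapshot

# Stub transport standing in for a real HTTP client:
polls = {"count": 0}

def fake_post(url: str, body: dict) -> dict:
    return {"run_id": "demo-run", "status": "queued", "input": body["input"]}

def fake_get(url: str) -> dict:
    polls["count"] += 1
    return {"status": "running"} if polls["count"] < 3 else {"status": "success"}

final = submit_and_poll(fake_post, fake_get)
```

For live progress the SSE endpoint is usually preferable to polling; the helper above is the fallback for clients without an event-stream reader.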
+10 -9
@@ -1,9 +1,10 @@
agno==2.5.17
fastapi==0.136.0
uvicorn==0.44.0
python-dotenv==1.2.2
ollama==0.6.1
socksio==1.0.0
openai==2.32.0
arize-phoenix-otel==0.15.0
openinference-instrumentation-agno==0.1.30
loguru==0.7.3
+32 -4
@@ -1,26 +1,54 @@
"""AgentOS entrypoint: wires the agent, REST routes and FastAPI lifespan.

Phoenix tracing is initialized from the lifespan (not at import time) so that
importing this module for tooling or tests does not spin up the tracer.
"""

from __future__ import annotations

import os
from contextlib import asynccontextmanager

from dotenv import load_dotenv
from fastapi import FastAPI
from loguru import logger

from agno.os import AgentOS

from src.agent_runner import get_agent
from src.api_routes import router as api_router
from src.observability import init_phoenix_tracing, is_phoenix_tracing_enabled
from src.run_registry import get_registry

load_dotenv()


@asynccontextmanager
async def _lifespan(_app: FastAPI):
    init_phoenix_tracing()
    logger.info("Prisma Platform API starting up")
    try:
        yield
    finally:
        active = get_registry().list_active()
        if active:
            logger.info("Cancelling {} active run(s) on shutdown", len(active))
            for record in active:
                if record.task is not None and not record.task.done():
                    record.task.cancel()
        logger.info("Prisma Platform API shutting down")


_agent = get_agent()
_base_app = FastAPI(
    title="Prisma Platform API",
    version="0.1.0",
    lifespan=_lifespan,
)
_base_app.include_router(api_router)
_agent_os = AgentOS(
    agents=[_agent],
    tracing=is_phoenix_tracing_enabled(),
    base_app=_base_app,
)
app = _agent_os.get_app()
+8
@@ -1,3 +1,11 @@
"""Lazy factory for the top-level Prisma agent.

Config is read from environment variables so the same module can be used by
the API server, CLI tools and tests without re-wiring.
"""

from __future__ import annotations

import os

from agno.agent import Agent
+273 -9
@@ -1,15 +1,279 @@
"""REST routes for scenario execution, catalogs and live run events.

Runs are executed asynchronously: ``POST /api/runs`` schedules a background
task and returns immediately with a ``run_id``. Clients consume progress
via ``GET /api/runs/{run_id}/events`` (SSE) or poll
``GET /api/runs/{run_id}`` for a snapshot.
"""

from __future__ import annotations

import asyncio
import json
from typing import Any, AsyncIterator

from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from loguru import logger

from src.mcp_client import list_mcp_tools
from src.mcp_workflow_runner import run_scenario_async
from src.run_registry import RunRecord, get_registry
from src.scenario_store import (
    ScenarioStoreError,
    list_scenario_summaries,
    load_scenario_definition,
)
from src.schemas import (
    RunSubmitResponse,
    ScenarioRunRequest,
    ScenarioRunResponse,
    ScenarioSummary,
    StepState,
    ToolSummary,
)

router = APIRouter(prefix="/api", tags=["workflow"])

# ---------------------------------------------------------------------------
# Runs
# ---------------------------------------------------------------------------


@router.post(
    "/runs",
    response_model=RunSubmitResponse,
    status_code=202,
    summary="Schedule a scenario run",
    description=(
        "Creates a run record and schedules execution in the background. "
        "Returns immediately with a `run_id`; poll `GET /api/runs/{run_id}` "
        "or subscribe to `GET /api/runs/{run_id}/events` for progress."
    ),
)
async def post_run(request: ScenarioRunRequest) -> RunSubmitResponse:
    registry = get_registry()
    record = registry.create(
        scenario_id=request.scenario_id,
        input_data=request.input,
    )
    record.task = asyncio.create_task(run_scenario_async(record))
    return RunSubmitResponse(
        run_id=record.run_id,
        scenario_id=record.scenario_id,
        status=record.status,
        input=record.input,
        started_at=record.started_at,
    )


@router.get(
    "/runs/{run_id}",
    response_model=ScenarioRunResponse,
    summary="Get run snapshot",
    description=(
        "Returns the current state of a run. For running runs the `steps` "
        "list reflects progress so far; for terminal runs it is complete."
    ),
    responses={404: {"description": "Unknown run_id"}},
)
async def get_run(run_id: str) -> ScenarioRunResponse:
    record = _require_run(run_id)
    if record.response is not None:
        return record.response
    return _snapshot_from_record(record)


@router.get(
    "/runs/{run_id}/events",
    summary="Live run progress (SSE)",
    description=(
        "Server-Sent Events stream. Late subscribers receive a replay of "
        "buffered events first, then tail new events until `run_finished`.\n\n"
        "Event types: `run_started`, `step_started`, `step_finished`, "
        "`run_finished`. Each event is JSON in the SSE `data:` field."
    ),
    responses={
        200: {
            "description": "SSE stream of run events",
            "content": {
                "text/event-stream": {
                    "example": (
                        "event: run_started\n"
                        'data: {"type":"run_started","run_id":"76d6903c-f520-4a40-b0fc-8fed3f7955d2",'
                        '"scenario_id":"news_source_discovery_v1","started_at":"2026-04-24T09:27:59.873+00:00"}\n\n'
                        "event: step_started\n"
                        'data: {"type":"step_started","run_id":"76d6903c-...","step_name":"search_news_sources",'
                        '"index":0,"started_at":"2026-04-24T09:27:59.875+00:00"}\n\n'
                        "event: step_finished\n"
                        'data: {"type":"step_finished","run_id":"76d6903c-...","step_name":"search_news_sources",'
                        '"index":0,"status":"success","started_at":"2026-04-24T09:27:59.875+00:00",'
                        '"finished_at":"2026-04-24T09:28:00.028+00:00","message":""}\n\n'
                        "event: run_finished\n"
                        'data: {"type":"run_finished","run_id":"76d6903c-...","status":"success",'
                        '"finished_at":"2026-04-24T09:28:01.750+00:00","message":""}\n\n'
                    )
                }
            },
        },
        404: {"description": "Unknown run_id"},
    },
)
async def get_run_events(run_id: str) -> StreamingResponse:
    record = _require_run(run_id)
    return StreamingResponse(
        _event_stream(record),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )


# ---------------------------------------------------------------------------
# Scenario catalog
# ---------------------------------------------------------------------------


@router.get(
    "/scenarios",
    response_model=list[ScenarioSummary],
    summary="List available scenarios",
    description="Returns metadata (id, name, description, input schema) for every scenario in the index.",
)
async def get_scenarios() -> list[ScenarioSummary]:
    return [ScenarioSummary(**s) for s in list_scenario_summaries()]


_SCENARIO_DEFINITION_EXAMPLE: dict[str, Any] = {
    "schema_version": "1",
    "scenario_id": "news_source_discovery_v1",
    "name": "News Source Discovery V1",
    "description": "Find earliest news source using sequential MCP tools.",
    "input_schema": {
        "type": "object",
        "required": ["url"],
        "properties": {
            "url": {"type": "string", "description": "URL of source news article"}
        },
    },
    "steps": [
        {
            "name": "search_news_sources",
            "type": "tool",
            "tool": "search_news_sources",
            "input": {"url": {"from": "input.url"}},
            "required_input_fields": ["url"],
        }
    ],
}


@router.get(
    "/scenarios/{scenario_id}",
    summary="Get full scenario definition",
    description="Returns the raw scenario JSON (including the `steps` graph) for UI visualization.",
    responses={
        200: {
            "description": "Scenario definition",
            "content": {"application/json": {"example": _SCENARIO_DEFINITION_EXAMPLE}},
        },
        404: {"description": "Unknown scenario_id"},
    },
)
async def get_scenario(scenario_id: str) -> dict[str, Any]:
    try:
        return load_scenario_definition(scenario_id)
    except ScenarioStoreError as exc:
        raise HTTPException(status_code=404, detail=str(exc)) from exc


# ---------------------------------------------------------------------------
# Tool catalog
# ---------------------------------------------------------------------------


@router.get(
    "/tools",
    response_model=list[ToolSummary],
    summary="List MCP tools",
    description="Proxies MCP `list_tools()` and returns name, description, and input schema for each tool.",
    responses={502: {"description": "MCP transport error"}},
)
async def get_tools() -> list[ToolSummary]:
    try:
        tools = await list_mcp_tools()
    except RuntimeError as exc:
        logger.warning("Failed to fetch MCP tools: {}", exc)
        raise HTTPException(status_code=502, detail=str(exc)) from exc
    return [ToolSummary(**t) for t in tools]


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _require_run(run_id: str) -> RunRecord:
    record = get_registry().get(run_id)
    if record is None:
        raise HTTPException(status_code=404, detail=f"Unknown run_id: {run_id}")
    return record


def _snapshot_from_record(record: RunRecord) -> ScenarioRunResponse:
    """Build a partial ScenarioRunResponse for a still-running or pre-start run."""
    steps: list[StepState] = []
    for event in record.events:
        if event.get("type") != "step_finished":
            continue
        steps.append(
            StepState(
                node_id=str(event.get("step_name", "")),
                status=event.get("status", "failed"),
                started_at=event.get("started_at"),
                finished_at=event.get("finished_at"),
                message=str(event.get("message", "")),
            )
        )
    return ScenarioRunResponse(
        scenario_id=record.scenario_id,
        status=record.status,
        message=record.message,
        input=record.input,
        steps=steps,
        run_id=record.run_id,
    )


async def _event_stream(record: RunRecord) -> AsyncIterator[bytes]:
    """Replay buffered events, then tail a fresh subscriber queue.

    The snapshot/subscribe pair runs without any intervening ``await``, so no
    emitted event can slip between the replay cutoff and the subscription.
    Events emitted during replay land in the queue and are drained afterwards.
    """
    queue: asyncio.Queue = asyncio.Queue()
    buffered = list(record.events)
    record.subscribers.append(queue)
    try:
        for event in buffered:
            yield _format_sse(event)
        if record.is_terminal():
            return
        while True:
            event = await queue.get()
            if event is None:
                return
            yield _format_sse(event)
    finally:
        if queue in record.subscribers:
            record.subscribers.remove(queue)


def _format_sse(event: dict[str, Any]) -> bytes:
    event_type = str(event.get("type", "message"))
    payload = json.dumps(event, ensure_ascii=False)
    return f"event: {event_type}\ndata: {payload}\n\n".encode("utf-8")
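The replay-then-tail guarantee of `_event_stream` can be illustrated with a toy reproduction; the classes below are simplified local stand-ins (not the production `RunRecord`), kept just close enough to show the snapshot-then-subscribe ordering:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass
class ToyRecord:
    events: list = field(default_factory=list)
    subscribers: list = field(default_factory=list)

async def toy_stream(record: ToyRecord) -> list:
    """Snapshot the buffer and subscribe atomically (no await in between)."""
    queue: asyncio.Queue = asyncio.Queue()
    buffered = list(record.events)    # replay cutoff
    record.subscribers.append(queue)  # events after the cutoff land in the queue
    out = list(buffered)
    while True:
        event = await queue.get()
        if event is None:             # terminal sentinel, as in EventEmitter.close()
            return out
        out.append(event)

async def main() -> list:
    record = ToyRecord(events=[{"type": "run_started"}])  # pre-buffered history
    task = asyncio.create_task(toy_stream(record))
    await asyncio.sleep(0)            # let the subscriber register
    for queue in record.subscribers:  # emitter-style fan-out
        queue.put_nowait({"type": "run_finished"})
        queue.put_nowait(None)
    return await task

events = asyncio.run(main())
```

The subscriber sees the buffered history first and the live event after it, with no event lost between the cutoff and the subscription.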
+40
@@ -1,3 +1,9 @@
"""Thin async client for MCP tool invocation over streamable HTTP.

Opens a short-lived ``ClientSession`` per call, wraps the tool response in
a normalized dict, and raises ``RuntimeError`` on transport/tool errors.
"""

from __future__ import annotations

from datetime import timedelta
@@ -5,6 +11,7 @@ import json
import os
from typing import Any

from loguru import logger
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import TextContent
@@ -33,8 +40,10 @@ async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str,
        await session.initialize()
        result = await session.call_tool(tool_name, arguments)
    except TimeoutError as exc:
        logger.warning("MCP timeout: tool={}", tool_name)
        raise RuntimeError(f"MCP timeout: {tool_name}") from exc
    except Exception as exc:
        logger.exception("MCP transport error: tool={}", tool_name)
        raise RuntimeError(f"MCP transport error: {tool_name}") from exc

    if result.isError:
@@ -54,3 +63,34 @@ async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str,
            return parsed
    raise RuntimeError(f"MCP tool returned invalid payload: {tool_name}")


async def list_mcp_tools() -> list[dict[str, Any]]:
    """Fetch the MCP tool catalog as plain dicts for API serialization."""
    try:
        async with streamablehttp_client(url=_mcp_url()) as session_params:
            read, write = session_params[0:2]
            async with ClientSession(
                read,
                write,
                read_timeout_seconds=timedelta(seconds=_timeout_seconds()),
            ) as session:
                await session.initialize()
                result = await session.list_tools()
    except TimeoutError as exc:
        logger.warning("MCP list_tools timeout")
        raise RuntimeError("MCP list_tools timeout") from exc
    except Exception as exc:
        logger.exception("MCP list_tools transport error")
        raise RuntimeError("MCP list_tools transport error") from exc

    tools: list[dict[str, Any]] = []
    for tool in result.tools:
        tools.append(
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema,
            }
        )
    return tools
+341 -496
File diff suppressed because it is too large.
+16 -2
@@ -1,5 +1,15 @@
"""Phoenix (Arize) OpenTelemetry tracing setup.

Tracing is initialized via the FastAPI lifespan so that import-time side effects
stay out of module load. ``is_phoenix_tracing_enabled`` is cheap and can be
consulted before the app starts (for example, to pass a flag into AgentOS).
"""

from __future__ import annotations

import os

from loguru import logger
from phoenix.otel import register

_initialized = False
@@ -12,11 +22,14 @@ def _env_bool(name: str, default: bool) -> bool:
    return value.strip().lower() in {"1", "true", "yes", "on"}


def is_phoenix_tracing_enabled() -> bool:
    return _env_bool("PHOENIX_TRACING_ENABLED", False)


def init_phoenix_tracing() -> bool:
    global _initialized
    if not is_phoenix_tracing_enabled():
        return False

    if _initialized:
@@ -33,4 +46,5 @@ def init_phoenix_tracing() -> bool:
        auto_instrument=True,
    )
    _initialized = True
    logger.info("Phoenix tracing initialized (project={})", project_name)
    return True
+126
@@ -0,0 +1,126 @@
"""In-memory registry of scenario runs and their event streams.

Each submitted run gets a ``RunRecord`` holding:

- mutable status / partial step state updated by the workflow runner;
- an append-only event log (used to replay history to late SSE subscribers);
- a live asyncio queue that SSE endpoints tail until a terminal ``None``
  sentinel is delivered.

The registry is an LRU with ``RUN_REGISTRY_MAX_SIZE`` bound so long-running
processes do not leak run history.
"""

from __future__ import annotations

import asyncio
import os
import uuid
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

from loguru import logger

from src.schemas import ScenarioRunResponse

_TERMINAL_STATUSES = {"success", "failed"}


def _utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def _env_int(name: str, default: int) -> int:
    value = os.getenv(name)
    return int(value) if value is not None else default


@dataclass
class RunRecord:
    run_id: str
    scenario_id: str
    input: dict[str, Any]
    status: str = "queued"
    started_at: str = field(default_factory=_utc_now_iso)
    finished_at: str | None = None
    message: str = ""
    response: ScenarioRunResponse | None = None
    events: list[dict[str, Any]] = field(default_factory=list)
    subscribers: list[asyncio.Queue] = field(default_factory=list)
    task: asyncio.Task | None = None

    def is_terminal(self) -> bool:
        return self.status in _TERMINAL_STATUSES


class EventEmitter:
    """Fans events out to every live SSE subscriber plus the replay buffer.

    Subscribers are ``asyncio.Queue`` instances registered by SSE endpoints;
    ``None`` is used as a terminal sentinel so consumers can exit cleanly.
    """

    def __init__(self, record: RunRecord) -> None:
        self._record = record

    async def emit(self, event: dict[str, Any]) -> None:
        self._record.events.append(event)
        for queue in list(self._record.subscribers):
            queue.put_nowait(event)

    async def close(self) -> None:
        for queue in list(self._record.subscribers):
            queue.put_nowait(None)


class RunRegistry:
    def __init__(self, max_size: int | None = None) -> None:
        self._records: "OrderedDict[str, RunRecord]" = OrderedDict()
        self._max_size = max_size or _env_int("RUN_REGISTRY_MAX_SIZE", 200)

    def create(self, *, scenario_id: str, input_data: dict[str, Any]) -> RunRecord:
        run_id = str(uuid.uuid4())
        record = RunRecord(
            run_id=run_id,
            scenario_id=scenario_id,
            input=input_data,
        )
        self._records[run_id] = record
        self._evict_if_needed()
        logger.info("Run {} created for scenario={}", run_id, scenario_id)
        return record

    def get(self, run_id: str) -> RunRecord | None:
        record = self._records.get(run_id)
        if record is not None:
            self._records.move_to_end(run_id)
        return record

    def list_active(self) -> list[RunRecord]:
        return [r for r in self._records.values() if not r.is_terminal()]

    def _evict_if_needed(self) -> None:
        while len(self._records) > self._max_size:
            evicted_id, evicted = self._records.popitem(last=False)
            if evicted.task is not None and not evicted.task.done():
                # Refuse to silently drop an in-flight run — re-insert and stop.
                self._records[evicted_id] = evicted
                self._records.move_to_end(evicted_id, last=False)
                logger.warning(
                    "Run registry at capacity {} but oldest run is still active; "
                    "not evicting {}",
                    self._max_size,
                    evicted_id,
                )
                return
            logger.debug("Evicted run {} from registry", evicted_id)


_registry = RunRegistry()


def get_registry() -> RunRegistry:
    return _registry
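The eviction guard above (never drop an in-flight run) can be reproduced with a toy registry; this is a simplified stand-in in which a bool flag replaces the `asyncio.Task` done-check:

```python
from collections import OrderedDict

class ToyRegistry:
    def __init__(self, max_size: int) -> None:
        self._records: OrderedDict[str, bool] = OrderedDict()  # run_id -> is_active
        self._max_size = max_size

    def add(self, run_id: str, active: bool) -> None:
        self._records[run_id] = active
        while len(self._records) > self._max_size:
            oldest_id, is_active = self._records.popitem(last=False)
            if is_active:
                # Same invariant as RunRegistry: re-insert the active run at
                # the front and stop evicting rather than drop it silently.
                self._records[oldest_id] = is_active
                self._records.move_to_end(oldest_id, last=False)
                return

reg = ToyRegistry(max_size=2)
reg.add("r1", active=True)
reg.add("r2", active=False)
reg.add("r3", active=False)  # over capacity, but "r1" is still active: no eviction
```

The trade-off is that the registry can temporarily exceed its bound when the oldest entries are all still running, which the production code surfaces via a warning log.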
+27
@@ -1,3 +1,10 @@
"""File-backed loader for scenario definitions.

Scenarios live under ``scenarios/`` and are indexed by ``scenarios/index.json``.
Each scenario is a JSON object with a ``scenario_id`` that must match the
index key it was looked up by.
"""

from __future__ import annotations

import json
@@ -58,3 +65,23 @@ def load_scenario_definition(scenario_id: str) -> dict[str, Any]:
            "Scenario file scenario_id does not match requested scenario_id"
        )
    return scenario


def list_scenario_summaries() -> list[dict[str, Any]]:
    """Return metadata for every scenario in the index (no steps)."""
    summaries: list[dict[str, Any]] = []
    for scenario_id in load_scenario_index().keys():
        try:
            scenario = load_scenario_definition(scenario_id)
        except ScenarioStoreError:
            # Broken entry in the index should not take the whole catalog down.
            continue
        summaries.append(
            {
                "scenario_id": scenario_id,
                "name": scenario.get("name"),
                "description": scenario.get("description"),
                "input_schema": scenario.get("input_schema"),
            }
        )
    return summaries
+246 -8
@@ -1,3 +1,5 @@
"""Pydantic schemas for the scenario-run REST API."""

from __future__ import annotations

from typing import Any, Literal
@@ -5,36 +7,272 @@ from typing import Any, Literal
from pydantic import BaseModel, Field

RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
StepStatus = Literal["queued", "running", "success", "failed", "skipped", "waiting_human"]
EventType = Literal["run_started", "step_started", "step_finished", "run_finished"]


class ScenarioRunRequest(BaseModel):
    scenario_id: str = "news_source_discovery_v1"
    input: dict[str, Any] = Field(default_factory=dict)

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "scenario_id": "news_source_discovery_v1",
                    "input": {"url": "https://example.com/news/article"},
                }
            ]
        }
    }


class StepState(BaseModel):
    node_id: str
    status: StepStatus
    started_at: str | None = None
    finished_at: str | None = None
    message: str = ""

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "node_id": "search_news_sources",
                    "status": "success",
                    "started_at": "2026-04-24T09:27:59.875680+00:00",
                    "finished_at": "2026-04-24T09:28:00.028730+00:00",
                    "message": "",
                }
            ]
        }
    }


class ScenarioRunResponse(BaseModel):
    scenario_id: str
    status: RunStatus
    message: str = ""
    input: dict[str, Any]
    steps: list[StepState] = Field(default_factory=list)
    output_summary: str | None = None
    scenario_name: str | None = None
    workflow_name: str | None = None
    result: dict[str, Any] | None = None
    run_id: str | None = None
    session_id: str | None = None

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "scenario_id": "news_source_discovery_v1",
                    "status": "success",
                    "message": "",
                    "input": {"url": "https://example.com/news/article"},
                    "steps": [
                        {
                            "node_id": "search_news_sources",
                            "status": "success",
                            "started_at": "2026-04-24T09:27:59.875680+00:00",
                            "finished_at": "2026-04-24T09:28:00.028730+00:00",
                            "message": "",
                        },
                        {
                            "node_id": "generate_summary",
                            "status": "success",
                            "started_at": "2026-04-24T09:28:00.781744+00:00",
                            "finished_at": "2026-04-24T09:28:00.879028+00:00",
                            "message": "",
                        },
                    ],
                    "output_summary": "Самым ранним источником считается https://news-a.example/article-1",
                    "scenario_name": "News Source Discovery V1",
                    "workflow_name": "news_source_discovery_v1",
                    "result": {
                        "ok": True,
                        "tool_name": "generate_summary",
                        "payload": {
                            "input_count": 3,
                            "summary": "Самым ранним источником считается https://news-a.example/article-1",
                        },
                    },
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "session_id": None,
                }
            ]
        }
    }


class RunSubmitResponse(BaseModel):
    run_id: str
    scenario_id: str
    status: RunStatus
    input: dict[str, Any]
    started_at: str

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "scenario_id": "news_source_discovery_v1",
                    "status": "queued",
                    "input": {"url": "https://example.com/news/article"},
                    "started_at": "2026-04-24T09:27:59.873049+00:00",
                }
            ]
        }
    }


class ScenarioSummary(BaseModel):
    scenario_id: str
    name: str | None = None
    description: str | None = None
    input_schema: dict[str, Any] | None = None

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "scenario_id": "news_source_discovery_v1",
                    "name": "News Source Discovery V1",
                    "description": "Find earliest news source using sequential MCP tools.",
                    "input_schema": {
                        "type": "object",
                        "required": ["url"],
                        "properties": {
                            "url": {
                                "type": "string",
                                "description": "URL of source news article",
                            }
                        },
                    },
                }
            ]
        }
    }


class ToolSummary(BaseModel):
    name: str
    description: str | None = None
    input_schema: dict[str, Any] | None = None

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "search_news_sources",
                    "description": "Search for candidate news source URLs for a given article.",
                    "input_schema": {
                        "type": "object",
                        "required": ["url"],
                        "properties": {
                            "url": {"type": "string", "title": "Url"}
                        },
                        "title": "search_news_sourcesArguments",
                    },
                }
            ]
        }
    }


# ---------------------------------------------------------------------------
# SSE event models. Client parses by the `type` field.
# ---------------------------------------------------------------------------


class RunStartedEvent(BaseModel):
    type: Literal["run_started"] = "run_started"
    run_id: str
    scenario_id: str
    started_at: str

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "type": "run_started",
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "scenario_id": "news_source_discovery_v1",
                    "started_at": "2026-04-24T09:27:59.873397+00:00",
                }
            ]
        }
    }


class StepStartedEvent(BaseModel):
    type: Literal["step_started"] = "step_started"
    run_id: str
    step_name: str
    index: int
    started_at: str
model_config = {
"json_schema_extra": {
"examples": [
{
"type": "step_started",
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"step_name": "search_news_sources",
"index": 0,
"started_at": "2026-04-24T09:27:59.875680+00:00",
}
]
}
}
class StepFinishedEvent(BaseModel):
type: Literal["step_finished"] = "step_finished"
run_id: str
step_name: str
index: int
status: StepStatus
started_at: str | None = None
finished_at: str | None = None
message: str = ""
model_config = {
"json_schema_extra": {
"examples": [
{
"type": "step_finished",
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"step_name": "search_news_sources",
"index": 0,
"status": "success",
"started_at": "2026-04-24T09:27:59.875680+00:00",
"finished_at": "2026-04-24T09:28:00.028730+00:00",
"message": "",
}
]
}
}
class RunFinishedEvent(BaseModel):
type: Literal["run_finished"] = "run_finished"
run_id: str
status: RunStatus
finished_at: str
message: str = ""
model_config = {
"json_schema_extra": {
"examples": [
{
"type": "run_finished",
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"status": "success",
"finished_at": "2026-04-24T09:28:01.750206+00:00",
"message": "",
}
]
}
}
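On the client side these events form a tagged union keyed by `type`. A minimal sketch of dispatching one decoded SSE `data:` payload (the rendering strings here are illustrative, not part of the API):

```python
import json

def handle_event(data_line: str) -> str:
    """Dispatch an SSE data payload on its `type` tag.
    Field names match the event models above; output format is illustrative."""
    event = json.loads(data_line)
    kind = event["type"]
    if kind == "step_finished":
        return f"[{event['index']}] {event['step_name']}: {event['status']}"
    if kind == "run_finished":
        return f"run {event['run_id']} finished: {event['status']}"
    return kind  # run_started / step_started: nothing extra to render here

line = '{"type": "step_finished", "run_id": "76d6903c", "step_name": "search_news_sources", "index": 0, "status": "success"}'
print(handle_event(line))  # → [0] search_news_sources: success
```

A real consumer would read the `/api/runs/{run_id}/events` stream line by line and feed each `data:` payload through a dispatcher like this.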
@@ -0,0 +1,145 @@
"""LLM-backed fallback planner for MCP tool arguments.
When a step's resolved arguments are missing required fields, this module
calls an OpenAI-compatible chat completion to fill them from the current
scope (``input`` + prior ``steps``). The planner is best-effort: on any
failure it returns the base arguments unchanged so the caller's validator
can produce a clean error.
"""
from __future__ import annotations
from copy import deepcopy
import json
import os
from typing import Any
from loguru import logger
from openai import AsyncOpenAI
_planner_client: AsyncOpenAI | None = None
def _env_float(name: str, default: float) -> float:
value = os.getenv(name)
if value is None:
return default
return float(value)
def planner_enabled() -> bool:
return os.getenv("PLANNER_ENABLED", "false").strip().lower() in {"1", "true", "yes"}
def _get_client() -> AsyncOpenAI:
global _planner_client
if _planner_client is not None:
return _planner_client
_planner_client = AsyncOpenAI(
base_url=os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1"),
api_key=os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY"),
)
return _planner_client
def _response_schema(required_fields: list[str]) -> dict[str, Any]:
value_schema = {"type": ["string", "number", "boolean", "array", "object", "null"]}
return {
"name": "mcp_arguments",
"strict": True,
"schema": {
"type": "object",
"properties": {
"arguments": {
"type": "object",
"properties": {f: value_schema for f in required_fields},
"required": required_fields,
"additionalProperties": True,
}
},
"required": ["arguments"],
"additionalProperties": False,
},
}
def _extract_arguments(content: Any) -> dict[str, Any]:
candidate: Any = content
if isinstance(candidate, str):
text = candidate.strip()
if text.startswith("```"):
text = text.strip("`").strip()
if text.startswith("json"):
text = text[4:].strip()
try:
candidate = json.loads(text)
except json.JSONDecodeError:
return {}
if isinstance(candidate, dict):
if isinstance(candidate.get("arguments"), dict):
return candidate["arguments"]
return candidate
return {}
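The normalization above accepts three shapes: a (possibly fenced) JSON string, a dict wrapped in an `arguments` key, or a bare dict. A condensed standalone mirror of that logic, duplicated here only so the fenced case can be shown end to end:

```python
import json

def extract_arguments(content):
    """Condensed mirror of _extract_arguments above, for illustration."""
    if isinstance(content, str):
        text = content.strip()
        if text.startswith("```"):
            # Strip the code fence and an optional `json` language tag.
            text = text.strip("`").strip()
            if text.startswith("json"):
                text = text[4:].strip()
        try:
            content = json.loads(text)
        except json.JSONDecodeError:
            return {}
    if isinstance(content, dict):
        inner = content.get("arguments")
        return inner if isinstance(inner, dict) else content
    return {}

fenced = '```json\n{"arguments": {"url": "https://example.com/news/article"}}\n```'
print(extract_arguments(fenced))  # → {'url': 'https://example.com/news/article'}
```

Anything unparseable collapses to `{}`, which downstream merging treats as "planner produced nothing".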
async def plan_arguments(
*,
step_name: str,
tool_name: str,
base_arguments: dict[str, Any],
required_fields: list[str],
scope: dict[str, Any],
missing_fields: list[str],
attempt_no: int,
) -> dict[str, Any]:
"""Fallback planner: asks an LLM to fill missing required fields from context.
Returns merged arguments (base + planned). On any failure returns base_arguments
unchanged — caller is responsible for validating required fields afterwards.
"""
prompt = {
"task": "Prepare MCP arguments for this step.",
"step_name": step_name,
"tool_name": tool_name,
"required_fields": required_fields,
"base_arguments": base_arguments,
"missing_fields": missing_fields,
"repair_attempt": attempt_no,
"context": {"input": scope.get("input", {}), "steps": scope.get("steps", {})},
"output": (
"Return only JSON object with key 'arguments'. "
"Fill every missing field from context."
),
}
try:
completion = await _get_client().chat.completions.create(
model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
messages=[
{
"role": "system",
"content": (
"You are a tool-input planner. "
"Return only JSON that matches the provided schema."
),
},
{"role": "user", "content": json.dumps(prompt, ensure_ascii=False)},
],
response_format={"type": "json_schema", "json_schema": _response_schema(required_fields)},
temperature=_env_float("POLZA_TEMPERATURE", 0.0),
)
raw = completion.choices[0].message.content if completion.choices else ""
planned = _extract_arguments(raw)
except Exception:
logger.warning(
"Planner call failed for step={} tool={} attempt={}",
step_name,
tool_name,
attempt_no,
)
planned = {}
merged = deepcopy(base_arguments)
if isinstance(planned, dict):
merged.update(planned)
return merged
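The tail of `plan_arguments` encodes the fail-safe contract: planned values overlay the base arguments, and the empty dict produced on any failure leaves the base untouched for the caller's validator. Sketched in isolation:

```python
from copy import deepcopy

def merge_planned(base: dict, planned: dict) -> dict:
    """Mirror of plan_arguments' merge step: planned overlays base; an
    empty planned dict (the failure path) leaves base unchanged."""
    merged = deepcopy(base)
    if isinstance(planned, dict):
        merged.update(planned)
    return merged

base = {"url": "https://example.com/news/article"}
assert merge_planned(base, {}) == base  # LLM call failed: caller revalidates base as-is
assert merge_planned(base, {"query": "earliest source"})["query"] == "earliest source"
```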
@@ -0,0 +1,58 @@
"""Variable templating for scenario step inputs.
A dict of shape ``{"from": "path.to.value"}`` resolves to the value at that
dotted path in the current scope. Nested dicts/lists are resolved
recursively; plain values pass through via ``deepcopy``.
"""
from __future__ import annotations
from copy import deepcopy
from typing import Any
def resolve_path(scope: dict[str, Any], path: str) -> Any:
value: Any = scope
for segment in path.split("."):
key = segment.strip()
if not key:
continue
if not isinstance(value, dict):
return None
value = value.get(key)
return deepcopy(value)
def resolve_template(template: Any, scope: dict[str, Any]) -> Any:
if isinstance(template, dict):
if set(template.keys()) == {"from"}:
return resolve_path(scope, str(template["from"]))
return {key: resolve_template(value, scope) for key, value in template.items()}
if isinstance(template, list):
return [resolve_template(item, scope) for item in template]
return deepcopy(template)
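A worked example of the `{"from": "path"}` convention against a typical scope (the two resolvers are duplicated here verbatim so the snippet runs standalone; the scope and step names are illustrative):

```python
from copy import deepcopy
from typing import Any

def resolve_path(scope: dict[str, Any], path: str) -> Any:
    value: Any = scope
    for segment in path.split("."):
        key = segment.strip()
        if not key:
            continue
        if not isinstance(value, dict):
            return None
        value = value.get(key)
    return deepcopy(value)

def resolve_template(template: Any, scope: dict[str, Any]) -> Any:
    if isinstance(template, dict):
        if set(template.keys()) == {"from"}:
            return resolve_path(scope, str(template["from"]))
        return {k: resolve_template(v, scope) for k, v in template.items()}
    if isinstance(template, list):
        return [resolve_template(item, scope) for item in template]
    return deepcopy(template)

scope = {
    "input": {"url": "https://example.com/news/article"},
    "steps": {"search_news_sources": {"payload": {"count": 3}}},
}
template = {
    "url": {"from": "input.url"},
    "found": {"from": "steps.search_news_sources.payload.count"},
    "limit": 5,  # plain value: passes through via deepcopy
}
print(resolve_template(template, scope))
# → {'url': 'https://example.com/news/article', 'found': 3, 'limit': 5}
```

A path that walks off the scope (missing key or a non-dict intermediate) resolves to `None`, which `missing_required_fields` below then flags.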
def missing_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
) -> list[str]:
missing: list[str] = []
for field in required_fields:
    value = arguments.get(field)
    if isinstance(value, str) and value.strip():
        continue  # non-blank string: present
    if not isinstance(value, str) and value not in (None, [], {}):
        continue  # non-empty non-string value (0 and False still count as present)
    # Absent, None, blank/whitespace-only string, or empty container.
    missing.append(field)
return missing
def validate_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
step_name: str,
) -> None:
missing = missing_required_fields(arguments, required_fields)
if missing:
raise ValueError(f"{step_name}: missing required fields: {', '.join(missing)}")