Compare commits

..

13 Commits

Author SHA1 Message Date
Barabashka 2a81f5f58f Async выполнение сценариев с SSE-прогрессом + каталоги tools/scenarios
POST /api/runs теперь планирует исполнение в фоновой asyncio.Task и
   возвращает run_id (202 Accepted) — UI больше не блокируется на время
   всего workflow.

   Новый модуль src/run_registry.py держит in-memory LRU (лимит
   RUN_REGISTRY_MAX_SIZE, default 200) с RunRecord на каждый запуск:
   append-only буфер событий для replay + список подписчиков-очередей
   для live tail. EventEmitter пишет в буфер и фан-аутит по очередям.

   Новые endpoints:
   - GET /api/runs/{run_id}           снапшот состояния (частичный для running)
   - GET /api/runs/{run_id}/events    SSE: run_started, step_started,
                                      step_finished, run_finished
   - GET /api/scenarios               список сценариев с метаданными
   - GET /api/scenarios/{id}          полное определение для UI-графа
   - GET /api/tools                   проксирование MCP list_tools

   mcp_workflow_runner дополнен хуком emitter'а в session_state и
   обёрткой run_scenario_async, которая управляет лайфсайклом RunRecord:
   queued → running → success/failed + terminal sentinel в очереди
   подписчиков. На shutdown lifespan отменяет активные таски.

   Все модели в schemas.py и dict-endpoints получили реалистичные
   examples для /docs вместо дефолтного additionalProp1.
2026-04-24 12:40:49 +03:00
Barabashka 3357b3c4dd Усилить надёжность: логирование, lifespan, LRU-кэш и fail-fast семантика
Подключить loguru и заменить молчаливые except на warning/exception

в step_planner, mcp_client и mcp_workflow_runner — раньше ошибки

терялись в пустых дикт-возвратах.\n

Перенести Phoenix tracing из module-level в FastAPI lifespan, чтобы

импорт agent_os не поднимал трейсер в тестах и тулах.\n

Заменить неограниченный dict _workflow_cache на OrderedDict-LRU

с лимитом WORKFLOW_CACHE_MAX_SIZE (default 64) — чтобы кэш не рос

бесконечно при разных scenario_id.\n

Зафиксировать инвариант fail-fast: шаги, не дошедшие до исполнения

из-за падения upstream, возвращаются со статусом skipped (для UI),

а не queued; run помечается success только если все payload.ok.\n

Добавить module docstrings во все модули src/ по STYLE_GUIDE cookbook.

Запинить версии зависимостей в requirements.txt.
2026-04-24 12:00:00 +03:00
Barabashka 4d037e52eb Упрощение MCP workflow runner и обновить контракт /api/runs.
Перенесены planner/template хелперы в отдельные модули, выровнен формат статусов и сообщений в ответе, а также обновлены .env.example и README под текущие переменные и поведение API.
2026-04-23 12:41:33 +03:00
Barabashka 5ca49821ba Промежуточный вариант: ужесточить planner recovery и fail-fast workflow.
Перевел планирование аргументов на строгий json_schema response_format, добавил сценарий с битыми полями для проверки восстановления и остановку workflow на первой ошибке шага. Сейчас используется Polza.ai.
2026-04-22 17:45:17 +03:00
Barabashka ad828885e3 Перевести исполнение сценариев на MCP workflow runner.
Удален legacy workflow_runner со stub-инструментами, добавлен mcp_client и новый mcp_workflow_runner с planner-моделью через polza.ai, обновлены сценарий, API/AgentOS wiring и документация под текущий контур запуска.
2026-04-22 16:37:17 +03:00
Barabashka 93ee7aea1c Унифицировать ответ /api/runs и добавить статусы шагов workflow.
Введен единый JSON-контракт для success/failed с общими полями, добавлен трекинг step status (queued/running/success/failed) и output_summary, а сборка run-ответа централизована через общий helper.
2026-04-22 12:28:47 +03:00
Barabashka 9068b7fe07 Удалить CLI entrypoint и оставить HTTP-only запуск через AgentOS.
Убран неиспользуемый run_agent/main.py и обновлен README, чтобы запуск и документация соответствовали текущей FastAPI архитектуре.
2026-04-21 17:42:50 +03:00
Barabashka 0fbd7dce1a Добавить FastAPI endpoint запуска сценария через AgentOS base_app.
Подключен верхний HTTP-слой с POST /api/runs и обновлены схемы/README, чтобы запуск сценариев шел через единый API-контракт поверх Agno workflow.
2026-04-21 17:38:03 +03:00
Barabashka d341941f87 Улучшить валидацию входа workflow и вынести схемы ответов.
Подключена pydantic-валидация input_schema для сценария, а модели успешного и ошибочного результата запуска вынесены в отдельный модуль для более явных boundary-контрактов.
2026-04-21 17:21:35 +03:00
Barabashka 2b0c748474 Вынести сценарий в JSON и добавить динамический loader.
Переключает запуск workflow на загрузку сценария из файлового хранилища по scenario_id и собирает шаги выполнения из definition.steps вместо хардкода в раннере.
2026-04-21 17:08:20 +03:00
Barabashka 2111964d8b Добавить MVP workflow запуска сценария поиска первоисточника.
Подключает stub-инструменты и последовательный Agno workflow в CLI и AgentOS, чтобы запускать сценарий по URL и получать структурированный JSON-результат.
2026-04-21 16:24:52 +03:00
Barabashka d22db07b43 Подключить локальную трассировку Phoenix для запусков агента.
Добавлена инициализация Phoenix/OpenInference в CLI и AgentOS, а также обновлены зависимости и документация, чтобы трассировка включалась через переменные окружения.
2026-04-21 15:10:27 +03:00
Barabashka 196e9aaf27 Обновление .gitignore 2026-04-21 13:40:36 +03:00
19 changed files with 2110 additions and 87 deletions
+27 -3
View File
@@ -1,9 +1,33 @@
# Agent
AGENT_ID=prisma-agent AGENT_ID=prisma-agent
AGENT_MARKDOWN=false
AGENT_DEBUG_MODE=true
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
# Agent model (Ollama)
OLLAMA_MODEL_ID=gemma4:31b OLLAMA_MODEL_ID=gemma4:31b
OLLAMA_HOST=http://localhost:11435 OLLAMA_HOST=http://localhost:11435
OLLAMA_TEMPERATURE=0 OLLAMA_TEMPERATURE=0
AGENT_MARKDOWN=false
AGENT_DEBUG_MODE=true # API runtime
AGENT_INSTRUCTIONS=You are a helpful assistant. Answer briefly and clearly.
AGENT_OS_HOST=127.0.0.1 AGENT_OS_HOST=127.0.0.1
AGENT_OS_PORT=7777 AGENT_OS_PORT=7777
# Planner
PLANNER_ENABLED=false
PLANNER_REPAIR_ATTEMPTS=3
# Planner model (Polza)
POLZA_BASE_URL=https://api.polza.ai/v1
POLZA_MODEL_ID=google/gemma-4-31b-it
POLZA_API_KEY=key
POLZA_TEMPERATURE=0
# MCP
MCP_BASE_URL=http://127.0.0.1:8081/mcp
MCP_TIMEOUT_SECONDS=10
# Observability (Phoenix)
PHOENIX_TRACING_ENABLED=false
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
PHOENIX_PROJECT_NAME=prisma-platform
+4
View File
@@ -25,3 +25,7 @@ dist/
.vscode/ .vscode/
.DS_Store .DS_Store
.cursor .cursor
.claude
# Cookbook code
vendor/agno/cookbook/
+143 -27
View File
@@ -1,8 +1,19 @@
# Prisma Platform MVP # Prisma Platform MVP
Минимальный чат-агент на Agno + Ollama с рантаймом AgentOS. MVP-реализация сценарного раннера на Agno AgentOS.
В этом проекте AgentOS работает как HTTP API сервер (FastAPI + Uvicorn). Текущая схема исполнения:
- сценарий хранится в `scenarios/*.json`;
- исполнение идет через `src/mcp_workflow_runner.py`;
- каждый шаг вызывает MCP инструмент через `src/mcp_client.py`;
- для подготовки аргументов шага используется planner-агент с моделью через `polza.ai`.
## Требования
- Python 3.10+
- MCP endpoint (по умолчанию `http://127.0.0.1:8081/mcp`)
- доступ к модели через `polza.ai` (`POLZA_API_KEY`)
## Текущая структура ## Текущая структура
@@ -11,11 +22,23 @@ prisma_platform/
├── .env ├── .env
├── .env.example ├── .env.example
├── requirements.txt ├── requirements.txt
├── scenarios/
│ ├── index.json
│ └── news_source_discovery/
│ ├── v1.json
│ └── v1_planner_repair.json
└── src/ └── src/
├── __init__.py ├── __init__.py
├── api_routes.py
├── agent_os.py ├── agent_os.py
├── agent_runner.py ├── agent_runner.py
── main.py ── mcp_client.py
├── mcp_workflow_runner.py
├── observability.py
├── scenario_store.py
├── step_planner.py
├── template.py
└── schemas.py
``` ```
## Установка ## Установка
@@ -29,44 +52,137 @@ cp .env.example .env
## Запуск ## Запуск
Интерактивный режим чата: 1) Поднимите MCP stub (из соседнего репозитория):
```bash ```bash
python -m src.main cd /home/worker/projects/docker-service/mcp-stub
docker compose up --build -d
``` ```
Режим одного сообщения: 2) Запустите сервер AgentOS:
```bash ```bash
python -m src.main --message "Привет, что ты умеешь?" cd /home/worker/projects/prisma_platform
.venv/bin/python -m src.agent_os
``` ```
## Запуск AgentOS По умолчанию приложение доступно на `http://127.0.0.1:7777`.
Запуск сервера AgentOS: Документация API:
```bash
python -m src.agent_os
```
По умолчанию AgentOS работает на `http://127.0.0.1:7777`.
Документация API доступна по адресам:
- `http://127.0.0.1:7777/docs` - `http://127.0.0.1:7777/docs`
- `http://127.0.0.1:7777/redoc` - `http://127.0.0.1:7777/redoc`
## HTTP API
### Запуск сценария (async)
`POST /api/runs` — планирует выполнение сценария и **сразу** возвращает `run_id`. Само выполнение идёт в фоне.
```bash
curl -s -X POST "http://127.0.0.1:7777/api/runs" \
-H "Content-Type: application/json" \
-d '{
"scenario_id": "news_source_discovery_v1",
"input": { "url": "https://example.com/news" }
}'
```
Ответ (`202 Accepted`):
```json
{
"run_id": "f3d9…",
"scenario_id": "news_source_discovery_v1",
"status": "queued",
"input": { "url": "..." },
"started_at": "2026-04-24T..."
}
```
### Снапшот состояния
`GET /api/runs/{run_id}` — текущее состояние: `status` (`queued|running|success|failed`), список `steps` со статусами (`success|failed|skipped|queued`), `result` и `output_summary` при завершении.
### Live-прогресс (SSE)
`GET /api/runs/{run_id}/events` — Server-Sent Events. Поздние подписчики получают replay уже накопленных событий, затем tail до завершения.
```bash
curl -N http://127.0.0.1:7777/api/runs/$RUN_ID/events
```
Типы событий:
- `run_started``{run_id, scenario_id, started_at}`
- `step_started``{run_id, step_name, index, started_at}`
- `step_finished``{run_id, step_name, index, status, started_at, finished_at, message}`
- `run_finished``{run_id, status, finished_at, message}` (терминальное, поток закрывается)
### Каталоги
- `GET /api/scenarios` — список сценариев с метаданными (`scenario_id`, `name`, `description`, `input_schema`).
- `GET /api/scenarios/{scenario_id}` — полное определение сценария (для визуализации графа в UI).
- `GET /api/tools` — MCP tool catalog: `[{name, description, input_schema}]` (проксируется на `MCP_BASE_URL`).
## Переменные окружения ## Переменные окружения
Основные переменные: Agent:
- `AGENT_ID` (по умолчанию: `prisma-agent`) - `AGENT_ID` (default: `prisma-agent`)
- `OLLAMA_MODEL_ID` (по умолчанию: `gemma4:31b`) - `AGENT_MARKDOWN` (default: `false`)
- `OLLAMA_HOST` (по умолчанию: `http://localhost:11435`) - `AGENT_DEBUG_MODE` (default: `true`)
- `OLLAMA_TEMPERATURE` (по умолчанию: `0`) - `AGENT_INSTRUCTIONS`
- `AGENT_MARKDOWN` (по умолчанию: `false`) - `OLLAMA_MODEL_ID` (default: `gemma4:31b`)
- `AGENT_DEBUG_MODE` (по умолчанию: `true`) - `OLLAMA_HOST` (default: `http://localhost:11435`)
- `AGENT_INSTRUCTIONS` (по умолчанию: `You are a helpful assistant. Answer briefly and clearly.`) - `OLLAMA_TEMPERATURE` (default: `0`)
- `AGENT_OS_HOST` (по умолчанию: `127.0.0.1`)
- `AGENT_OS_PORT` (по умолчанию: `7777`) API runtime:
- `AGENT_OS_HOST` (default: `127.0.0.1`)
- `AGENT_OS_PORT` (default: `7777`)
Planner:
- `PLANNER_ENABLED` (default: `false`)
- `PLANNER_REPAIR_ATTEMPTS` (default: `3`)
Planner model (`polza.ai`):
- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)
- `POLZA_API_KEY` (required)
- `POLZA_TEMPERATURE` (default: `0`)
MCP:
- `MCP_BASE_URL` (default: `http://127.0.0.1:8081/mcp`)
- `MCP_TIMEOUT_SECONDS` (default: `10`)
Runtime caches:
- `WORKFLOW_CACHE_MAX_SIZE` (default: `64`) — лимит LRU кэша построенных workflow.
- `RUN_REGISTRY_MAX_SIZE` (default: `200`) — лимит LRU истории run'ов в памяти.
Phoenix tracing:
- `PHOENIX_TRACING_ENABLED` (default: `false`)
- `PHOENIX_COLLECTOR_ENDPOINT` (default: `http://localhost:6006`)
- `PHOENIX_PROJECT_NAME` (default: `prisma-platform`)
## Phoenix трассировка (локально)
1) Включите трассировку в `.env`:
```dotenv
PHOENIX_TRACING_ENABLED=true
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
PHOENIX_PROJECT_NAME=prisma-platform
```
2) Запустите приложение:
```bash
.venv/bin/python -m src.agent_os
```
+10 -7
View File
@@ -1,7 +1,10 @@
agno agno==2.5.17
fastapi fastapi==0.136.0
uvicorn uvicorn==0.44.0
python-dotenv python-dotenv==1.2.2
ollama ollama==0.6.1
socksio socksio==1.0.0
openai openai==2.32.0
arize-phoenix-otel==0.15.0
openinference-instrumentation-agno==0.1.30
loguru==0.7.3
+6
View File
@@ -0,0 +1,6 @@
{
"scenarios": {
"news_source_discovery_v1": "news_source_discovery/v1.json",
"news_source_discovery_v1_planner_repair": "news_source_discovery/v1_planner_repair.json"
}
}
+105
View File
@@ -0,0 +1,105 @@
{
"schema_version": "1",
"scenario_id": "news_source_discovery_v1",
"name": "News Source Discovery V1",
"description": "Find earliest news source using sequential MCP tools.",
"input_schema": {
"type": "object",
"required": [
"url"
],
"properties": {
"url": {
"type": "string",
"description": "URL of source news article"
}
}
},
"steps": [
{
"name": "search_news_sources",
"type": "tool",
"tool": "search_news_sources",
"input": {
"url": {
"from": "input.url"
}
},
"required_input_fields": [
"url"
]
},
{
"name": "parse_articles_batch",
"type": "tool",
"tool": "parse_article",
"foreach": {
"from": "steps.search_news_sources.payload.items",
"as": "item"
},
"input": {
"url": {
"from": "item.url"
}
},
"collect": {
"url": {
"from": "tool.payload.url"
},
"title": {
"from": "tool.payload.title"
},
"text": {
"from": "tool.payload.text"
}
},
"collect_key": "items"
},
{
"name": "extract_publication_date_batch",
"type": "tool",
"tool": "extract_publication_date",
"foreach": {
"from": "steps.parse_articles_batch.payload.items",
"as": "item"
},
"input": {
"article_text": {
"from": "item.text"
}
},
"collect": {
"url": {
"from": "item.url"
},
"title": {
"from": "item.title"
},
"published_at": {
"from": "tool.payload.published_at"
}
},
"collect_key": "items"
},
{
"name": "rank_sources_by_date",
"type": "tool",
"tool": "rank_sources_by_date",
"input": {
"items": {
"from": "steps.extract_publication_date_batch.payload.items"
}
}
},
{
"name": "generate_summary",
"type": "tool",
"tool": "generate_summary",
"input": {
"items": {
"from": "steps.rank_sources_by_date.payload.ranked_items"
}
}
}
]
}
@@ -0,0 +1,117 @@
{
"schema_version": "1",
"scenario_id": "news_source_discovery_v1_planner_repair",
"name": "News Source Discovery V1 Planner Repair",
"description": "Test scenario with intentionally wrong input paths repaired by planner.",
"input_schema": {
"type": "object",
"required": [
"url"
],
"properties": {
"url": {
"type": "string",
"description": "URL of source news article"
}
}
},
"steps": [
{
"name": "search_news_sources",
"type": "tool",
"tool": "search_news_sources",
"input": {
"url": {
"from": "input.url"
}
},
"required_input_fields": [
"url"
]
},
{
"name": "parse_articles_batch",
"type": "tool",
"tool": "parse_article",
"foreach": {
"from": "steps.search_news_sources.payload.items",
"as": "item"
},
"input": {
"url": {
"from": "item.link"
}
},
"required_input_fields": [
"url"
],
"collect": {
"url": {
"from": "tool.payload.url"
},
"title": {
"from": "tool.payload.title"
},
"text": {
"from": "tool.payload.text"
}
},
"collect_key": "items"
},
{
"name": "extract_publication_date_batch",
"type": "tool",
"tool": "extract_publication_date",
"foreach": {
"from": "steps.parse_articles_batch.payload.items",
"as": "item"
},
"input": {
"article_text": {
"from": "item.body"
}
},
"required_input_fields": [
"article_text"
],
"collect": {
"url": {
"from": "item.url"
},
"title": {
"from": "item.title"
},
"published_at": {
"from": "tool.payload.published_at"
}
},
"collect_key": "items"
},
{
"name": "rank_sources_by_date",
"type": "tool",
"tool": "rank_sources_by_date",
"input": {
"items": {
"from": "steps.extract_publication_date_batch.payload.items"
}
},
"required_input_fields": [
"items"
]
},
{
"name": "generate_summary",
"type": "tool",
"tool": "generate_summary",
"input": {
"items": {
"from": "steps.rank_sources_by_date.payload.items_ranked_typo"
}
},
"required_input_fields": [
"items"
]
}
]
}
+42 -1
View File
@@ -1,15 +1,56 @@
"""AgentOS entrypoint: wires the agent, REST routes and FastAPI lifespan.
Phoenix tracing is initialized from the lifespan (not at import time) so that
importing this module for tooling or tests does not spin up the tracer.
"""
from __future__ import annotations
import os import os
from contextlib import asynccontextmanager
from dotenv import load_dotenv from dotenv import load_dotenv
from fastapi import FastAPI
from loguru import logger
from agno.os import AgentOS from agno.os import AgentOS
from src.agent_runner import get_agent from src.agent_runner import get_agent
from src.api_routes import router as api_router
from src.observability import init_phoenix_tracing, is_phoenix_tracing_enabled
from src.run_registry import get_registry
load_dotenv() load_dotenv()
@asynccontextmanager
async def _lifespan(_app: FastAPI):
init_phoenix_tracing()
logger.info("Prisma Platform API starting up")
try:
yield
finally:
active = get_registry().list_active()
if active:
logger.info("Cancelling {} active run(s) on shutdown", len(active))
for record in active:
if record.task is not None and not record.task.done():
record.task.cancel()
logger.info("Prisma Platform API shutting down")
_agent = get_agent() _agent = get_agent()
_agent_os = AgentOS(agents=[_agent]) _base_app = FastAPI(
title="Prisma Platform API",
version="0.1.0",
lifespan=_lifespan,
)
_base_app.include_router(api_router)
_agent_os = AgentOS(
agents=[_agent],
tracing=is_phoenix_tracing_enabled(),
base_app=_base_app,
)
app = _agent_os.get_app() app = _agent_os.get_app()
+8 -6
View File
@@ -1,3 +1,11 @@
"""Lazy factory for the top-level Prisma agent.
Config is read from environment variables so the same module can be used by
the API server, CLI tools and tests without re-wiring.
"""
from __future__ import annotations
import os import os
from agno.agent import Agent from agno.agent import Agent
@@ -47,9 +55,3 @@ def get_agent() -> Agent:
debug_mode=debug_mode, debug_mode=debug_mode,
) )
return _agent return _agent
async def run_agent(message: str) -> str:
agent = get_agent()
response = await agent.arun(message)
return str(response.content)
+279
View File
@@ -0,0 +1,279 @@
"""REST routes for scenario execution, catalogs and live run events.
Runs are executed asynchronously: ``POST /api/runs`` schedules a background
task and returns immediately with a ``run_id``. Clients consume progress
via ``GET /api/runs/{run_id}/events`` (SSE) or poll
``GET /api/runs/{run_id}`` for a snapshot.
"""
from __future__ import annotations
import asyncio
import json
from typing import Any, AsyncIterator
from fastapi import APIRouter, HTTPException
from fastapi.responses import StreamingResponse
from loguru import logger
from src.mcp_client import list_mcp_tools
from src.mcp_workflow_runner import run_scenario_async
from src.run_registry import RunRecord, get_registry
from src.scenario_store import (
ScenarioStoreError,
list_scenario_summaries,
load_scenario_definition,
)
from src.schemas import (
RunSubmitResponse,
ScenarioRunRequest,
ScenarioRunResponse,
ScenarioSummary,
StepState,
ToolSummary,
)
router = APIRouter(prefix="/api", tags=["workflow"])
# ---------------------------------------------------------------------------
# Runs
# ---------------------------------------------------------------------------
@router.post(
"/runs",
response_model=RunSubmitResponse,
status_code=202,
summary="Schedule a scenario run",
description=(
"Creates a run record and schedules execution in the background. "
"Returns immediately with a `run_id`; poll `GET /api/runs/{run_id}` "
"or subscribe to `GET /api/runs/{run_id}/events` for progress."
),
)
async def post_run(request: ScenarioRunRequest) -> RunSubmitResponse:
registry = get_registry()
record = registry.create(
scenario_id=request.scenario_id,
input_data=request.input,
)
record.task = asyncio.create_task(run_scenario_async(record))
return RunSubmitResponse(
run_id=record.run_id,
scenario_id=record.scenario_id,
status=record.status,
input=record.input,
started_at=record.started_at,
)
@router.get(
"/runs/{run_id}",
response_model=ScenarioRunResponse,
summary="Get run snapshot",
description=(
"Returns the current state of a run. For running runs the `steps` "
"list reflects progress so far; for terminal runs it is complete."
),
responses={404: {"description": "Unknown run_id"}},
)
async def get_run(run_id: str) -> ScenarioRunResponse:
record = _require_run(run_id)
if record.response is not None:
return record.response
return _snapshot_from_record(record)
@router.get(
"/runs/{run_id}/events",
summary="Live run progress (SSE)",
description=(
"Server-Sent Events stream. Late subscribers receive a replay of "
"buffered events first, then tail new events until `run_finished`.\n\n"
"Event types: `run_started`, `step_started`, `step_finished`, "
"`run_finished`. Each event is JSON in the SSE `data:` field."
),
responses={
200: {
"description": "SSE stream of run events",
"content": {
"text/event-stream": {
"example": (
"event: run_started\n"
'data: {"type":"run_started","run_id":"76d6903c-f520-4a40-b0fc-8fed3f7955d2",'
'"scenario_id":"news_source_discovery_v1","started_at":"2026-04-24T09:27:59.873+00:00"}\n\n'
"event: step_started\n"
'data: {"type":"step_started","run_id":"76d6903c-...","step_name":"search_news_sources",'
'"index":0,"started_at":"2026-04-24T09:27:59.875+00:00"}\n\n'
"event: step_finished\n"
'data: {"type":"step_finished","run_id":"76d6903c-...","step_name":"search_news_sources",'
'"index":0,"status":"success","started_at":"2026-04-24T09:27:59.875+00:00",'
'"finished_at":"2026-04-24T09:28:00.028+00:00","message":""}\n\n'
"event: run_finished\n"
'data: {"type":"run_finished","run_id":"76d6903c-...","status":"success",'
'"finished_at":"2026-04-24T09:28:01.750+00:00","message":""}\n\n'
)
}
},
},
404: {"description": "Unknown run_id"},
},
)
async def get_run_events(run_id: str) -> StreamingResponse:
record = _require_run(run_id)
return StreamingResponse(
_event_stream(record),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
# ---------------------------------------------------------------------------
# Scenario catalog
# ---------------------------------------------------------------------------
@router.get(
"/scenarios",
response_model=list[ScenarioSummary],
summary="List available scenarios",
description="Returns metadata (id, name, description, input schema) for every scenario in the index.",
)
async def get_scenarios() -> list[ScenarioSummary]:
return [ScenarioSummary(**s) for s in list_scenario_summaries()]
_SCENARIO_DEFINITION_EXAMPLE: dict[str, Any] = {
"schema_version": "1",
"scenario_id": "news_source_discovery_v1",
"name": "News Source Discovery V1",
"description": "Find earliest news source using sequential MCP tools.",
"input_schema": {
"type": "object",
"required": ["url"],
"properties": {
"url": {"type": "string", "description": "URL of source news article"}
},
},
"steps": [
{
"name": "search_news_sources",
"type": "tool",
"tool": "search_news_sources",
"input": {"url": {"from": "input.url"}},
"required_input_fields": ["url"],
}
],
}
@router.get(
"/scenarios/{scenario_id}",
summary="Get full scenario definition",
description="Returns the raw scenario JSON (including the `steps` graph) for UI visualization.",
responses={
200: {
"description": "Scenario definition",
"content": {"application/json": {"example": _SCENARIO_DEFINITION_EXAMPLE}},
},
404: {"description": "Unknown scenario_id"},
},
)
async def get_scenario(scenario_id: str) -> dict[str, Any]:
try:
return load_scenario_definition(scenario_id)
except ScenarioStoreError as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
# ---------------------------------------------------------------------------
# Tool catalog
# ---------------------------------------------------------------------------
@router.get(
"/tools",
response_model=list[ToolSummary],
summary="List MCP tools",
description="Proxies MCP `list_tools()` and returns name, description, and input schema for each tool.",
responses={502: {"description": "MCP transport error"}},
)
async def get_tools() -> list[ToolSummary]:
try:
tools = await list_mcp_tools()
except RuntimeError as exc:
logger.warning("Failed to fetch MCP tools: {}", exc)
raise HTTPException(status_code=502, detail=str(exc)) from exc
return [ToolSummary(**t) for t in tools]
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _require_run(run_id: str) -> RunRecord:
record = get_registry().get(run_id)
if record is None:
raise HTTPException(status_code=404, detail=f"Unknown run_id: {run_id}")
return record
def _snapshot_from_record(record: RunRecord) -> ScenarioRunResponse:
"""Build a partial ScenarioRunResponse for a still-running or pre-start run."""
steps: list[StepState] = []
for event in record.events:
if event.get("type") != "step_finished":
continue
steps.append(
StepState(
node_id=str(event.get("step_name", "")),
status=event.get("status", "failed"),
started_at=event.get("started_at"),
finished_at=event.get("finished_at"),
message=str(event.get("message", "")),
)
)
return ScenarioRunResponse(
scenario_id=record.scenario_id,
status=record.status,
message=record.message,
input=record.input,
steps=steps,
run_id=record.run_id,
)
async def _event_stream(record: RunRecord) -> AsyncIterator[bytes]:
"""Replay buffered events, then tail a fresh subscriber queue.
The snapshot/subscribe pair runs without any intervening ``await``, so no
emitted event can slip between the replay cutoff and the subscription.
Events emitted during replay land in the queue and are drained afterwards.
"""
queue: asyncio.Queue = asyncio.Queue()
buffered = list(record.events)
record.subscribers.append(queue)
try:
for event in buffered:
yield _format_sse(event)
if record.is_terminal():
return
while True:
event = await queue.get()
if event is None:
return
yield _format_sse(event)
finally:
if queue in record.subscribers:
record.subscribers.remove(queue)
def _format_sse(event: dict[str, Any]) -> bytes:
event_type = str(event.get("type", "message"))
payload = json.dumps(event, ensure_ascii=False)
return f"event: {event_type}\ndata: {payload}\n\n".encode("utf-8")
-43
View File
@@ -1,43 +0,0 @@
import argparse
import asyncio
from dotenv import load_dotenv
from src.agent_runner import run_agent
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Run base chat agent.",
)
parser.add_argument(
"--message",
help="Single message mode. If omitted, starts interactive chat.",
)
return parser
async def _main() -> None:
load_dotenv()
args = build_parser().parse_args()
if args.message:
result = await run_agent(args.message)
print(result)
return
print("Chat mode started. Type 'exit' or 'quit' to stop.")
while True:
user_message = input("you> ").strip()
if not user_message:
continue
if user_message.lower() in {"exit", "quit"}:
print("Bye.")
break
result = await run_agent(user_message)
print(f"agent> {result}")
if __name__ == "__main__":
asyncio.run(_main())
+96
View File
@@ -0,0 +1,96 @@
"""Thin async client for MCP tool invocation over streamable HTTP.
Opens a short-lived ``ClientSession`` per call, wraps the tool response in
a normalized dict, and raises ``RuntimeError`` on transport/tool errors.
"""
from __future__ import annotations
from datetime import timedelta
import json
import os
from typing import Any
from loguru import logger
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import TextContent
def _mcp_url() -> str:
return os.getenv("MCP_BASE_URL", "http://127.0.0.1:8081/mcp")
def _timeout_seconds() -> float:
value = os.getenv("MCP_TIMEOUT_SECONDS")
if value is None:
return 10.0
return float(value)
async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
try:
async with streamablehttp_client(url=_mcp_url()) as session_params:
read, write = session_params[0:2]
async with ClientSession(
read,
write,
read_timeout_seconds=timedelta(seconds=_timeout_seconds()),
) as session:
await session.initialize()
result = await session.call_tool(tool_name, arguments)
except TimeoutError as exc:
logger.warning("MCP timeout: tool={}", tool_name)
raise RuntimeError(f"MCP timeout: {tool_name}") from exc
except Exception as exc:
logger.exception("MCP transport error: tool={}", tool_name)
raise RuntimeError(f"MCP transport error: {tool_name}") from exc
if result.isError:
raise RuntimeError(f"MCP tool error: {tool_name}")
if isinstance(result.structuredContent, dict):
return result.structuredContent
for content_item in result.content:
if not isinstance(content_item, TextContent):
continue
try:
parsed = json.loads(content_item.text)
except json.JSONDecodeError:
continue
if isinstance(parsed, dict):
return parsed
raise RuntimeError(f"MCP tool returned invalid payload: {tool_name}")
async def list_mcp_tools() -> list[dict[str, Any]]:
"""Fetch the MCP tool catalog as plain dicts for API serialization."""
try:
async with streamablehttp_client(url=_mcp_url()) as session_params:
read, write = session_params[0:2]
async with ClientSession(
read,
write,
read_timeout_seconds=timedelta(seconds=_timeout_seconds()),
) as session:
await session.initialize()
result = await session.list_tools()
except TimeoutError as exc:
logger.warning("MCP list_tools timeout")
raise RuntimeError("MCP list_tools timeout") from exc
except Exception as exc:
logger.exception("MCP list_tools transport error")
raise RuntimeError("MCP list_tools transport error") from exc
tools: list[dict[str, Any]] = []
for tool in result.tools:
tools.append(
{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema,
}
)
return tools
+529
View File
@@ -0,0 +1,529 @@
"""Builds and runs Agno workflows from JSON scenario definitions.
Each scenario step is a typed MCP tool call. The runner resolves argument
templates from ``session_state``, optionally lets an LLM planner repair
missing fields, invokes the tool, and collects per-step results back into
``session_state`` for downstream steps.
"""
from __future__ import annotations
import asyncio
from collections import OrderedDict
from copy import deepcopy
from datetime import datetime, timezone
import json
import os
from typing import Any, Awaitable, Callable
from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow
from loguru import logger
from src.mcp_client import call_mcp_tool
from src.run_registry import EventEmitter, RunRecord
from src.schemas import (
RunFinishedEvent,
RunStartedEvent,
ScenarioRunResponse,
StepFinishedEvent,
StepStartedEvent,
StepState,
)
from src.scenario_store import ScenarioStoreError, load_scenario_definition
from src.step_planner import plan_arguments, planner_enabled
from src.template import (
missing_required_fields,
resolve_path,
resolve_template,
validate_required_fields,
)
def _env_int(name: str, default: int) -> int:
value = os.getenv(name)
return int(value) if value is not None else default
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _build_scope(session_state: dict[str, Any]) -> dict[str, Any]:
return {
"input": session_state.get("input", {}),
"steps": session_state.get("steps", {}),
}
async def _prepare_arguments(
*,
step_name: str,
tool_name: str,
base_arguments: dict[str, Any],
required_fields: list[str],
scope: dict[str, Any],
) -> dict[str, Any]:
final_arguments = deepcopy(base_arguments)
missing = missing_required_fields(final_arguments, required_fields)
if missing and planner_enabled():
max_attempts = _env_int("PLANNER_REPAIR_ATTEMPTS", 3)
for attempt in range(1, max_attempts + 1):
final_arguments = await plan_arguments(
step_name=step_name,
tool_name=tool_name,
base_arguments=final_arguments,
required_fields=required_fields,
scope=scope,
missing_fields=missing,
attempt_no=attempt,
)
missing = missing_required_fields(final_arguments, required_fields)
if not missing:
break
validate_required_fields(final_arguments, required_fields, step_name)
return final_arguments
async def _execute_one_call(
*,
step_name: str,
tool_name: str,
required_fields: list[str],
input_template: Any,
scope: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
resolved = resolve_template(input_template, scope)
base_arguments = resolved if isinstance(resolved, dict) else {}
arguments = await _prepare_arguments(
step_name=step_name,
tool_name=tool_name,
base_arguments=base_arguments,
required_fields=required_fields,
scope=scope,
)
tool_response = await call_mcp_tool(tool_name, arguments)
return arguments, tool_response
def _build_tool_executor(
step_spec: dict[str, Any],
step_index: int = 0,
) -> Callable[[StepInput, dict[str, Any]], Awaitable[StepOutput]]:
step_name = str(step_spec["name"])
tool_name = str(step_spec["tool"])
input_template = step_spec.get("input", {})
foreach_spec = step_spec.get("foreach")
collect_template = step_spec.get("collect")
collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
required_fields = [
f for f in step_spec.get("required_input_fields", []) if isinstance(f, str)
]
if isinstance(foreach_spec, dict):
foreach_from = str(foreach_spec.get("from", "")).strip()
item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
else:
foreach_from = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
item_alias = "item"
async def executor(_step_input: StepInput, session_state: dict[str, Any]) -> StepOutput:
started_at = _utc_now_iso()
scope = _build_scope(session_state)
emitter: EventEmitter | None = session_state.get("_emitter")
run_id: str = session_state.get("_run_id", "")
if emitter is not None:
await emitter.emit(
StepStartedEvent(
run_id=run_id,
step_name=step_name,
index=step_index,
started_at=started_at,
).model_dump()
)
try:
if foreach_from:
iterable = resolve_path(scope, foreach_from)
if not isinstance(iterable, list):
raise ValueError(f"{step_name}: foreach source is not list")
collected: list[Any] = []
iteration_requests: list[dict[str, Any]] = []
iteration_responses: list[dict[str, Any]] = []
last_received_at: str | None = None
for index, item in enumerate(iterable):
iteration_scope = {**scope, item_alias: item, "item": item, "index": index}
arguments, tool_response = await _execute_one_call(
step_name=step_name,
tool_name=tool_name,
required_fields=required_fields,
input_template=input_template,
scope=iteration_scope,
)
iteration_requests.append(arguments)
iteration_responses.append(tool_response)
received_at = tool_response.get("received_at")
if isinstance(received_at, str) and received_at:
last_received_at = received_at
if collect_template is None:
collected.append(tool_response.get("payload", {}))
else:
collected.append(
resolve_template(
collect_template,
{**iteration_scope, "tool": tool_response},
)
)
step_payload = {
"ok": True,
"tool_name": tool_name,
"payload": {collect_key: collected},
"request": {
"foreach_from": foreach_from,
"count": len(iterable),
"items": iteration_requests,
},
"response": {"items": iteration_responses},
"received_at": last_received_at,
"started_at": started_at,
"finished_at": _utc_now_iso(),
}
else:
arguments, tool_response = await _execute_one_call(
step_name=step_name,
tool_name=tool_name,
required_fields=required_fields,
input_template=input_template,
scope=scope,
)
step_payload = {
"ok": bool(tool_response.get("ok", True)),
"tool_name": tool_name,
"payload": tool_response.get("payload", {}),
"request": arguments,
"response": tool_response,
"received_at": tool_response.get("received_at"),
"started_at": started_at,
"finished_at": _utc_now_iso(),
}
session_state.setdefault("steps", {})[step_name] = step_payload
if emitter is not None:
await emitter.emit(
StepFinishedEvent(
run_id=run_id,
step_name=step_name,
index=step_index,
status="success",
started_at=started_at,
finished_at=step_payload["finished_at"],
).model_dump()
)
return StepOutput(
content=json.dumps(step_payload, ensure_ascii=False),
success=True,
)
except Exception as exc:
finished_at = _utc_now_iso()
error_payload = {
"ok": False,
"tool_name": tool_name,
"error": str(exc),
"started_at": started_at,
"finished_at": finished_at,
}
session_state.setdefault("steps", {})[step_name] = error_payload
logger.exception("Step {} failed (tool={})", step_name, tool_name)
if emitter is not None:
await emitter.emit(
StepFinishedEvent(
run_id=run_id,
step_name=step_name,
index=step_index,
status="failed",
started_at=started_at,
finished_at=finished_at,
message=str(exc),
).model_dump()
)
raise RuntimeError(f"{step_name} failed: {exc}") from exc
return executor
def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list) or not raw_steps:
raise ScenarioStoreError("Scenario must contain non-empty steps list")
workflow_steps: list[Step] = []
for step_index, raw_step in enumerate(raw_steps):
if not isinstance(raw_step, dict):
raise ScenarioStoreError("Each scenario step must be object")
if raw_step.get("type") != "tool":
raise ScenarioStoreError("This runner supports only tool steps")
step_name = str(raw_step.get("name", "")).strip()
tool_name = str(raw_step.get("tool", step_name)).strip()
if not step_name or not tool_name:
raise ScenarioStoreError("Each tool step must contain non-empty name and tool")
# Fail-fast by design: the run is considered successful only when every
# step passes. There is no per-step retry or skip policy — downstream
# steps rely on upstream output, so on any failure the workflow stops.
workflow_steps.append(
Step(
name=step_name,
description=str(raw_step.get("description", step_name)),
executor=_build_tool_executor(raw_step, step_index=step_index),
max_retries=0,
on_error="fail",
)
)
return Workflow(
name=scenario_id,
description=str(scenario.get("description", "")),
steps=workflow_steps,
)
_WORKFLOW_CACHE_MAX_SIZE = _env_int("WORKFLOW_CACHE_MAX_SIZE", 64)
_workflow_cache: "OrderedDict[str, Workflow]" = OrderedDict()
def _get_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
cached = _workflow_cache.get(scenario_id)
if cached is not None:
_workflow_cache.move_to_end(scenario_id)
return cached
workflow = _build_workflow(scenario_id, scenario)
_workflow_cache[scenario_id] = workflow
if len(_workflow_cache) > _WORKFLOW_CACHE_MAX_SIZE:
evicted_id, _ = _workflow_cache.popitem(last=False)
logger.debug("Evicted workflow from LRU cache: {}", evicted_id)
return workflow
def _extract_output_summary(result: dict[str, Any] | None) -> str | None:
if not isinstance(result, dict):
return None
summary = result.get("summary")
if isinstance(summary, str) and summary:
return summary
payload = result.get("payload")
if isinstance(payload, dict):
payload_summary = payload.get("summary")
if isinstance(payload_summary, str) and payload_summary:
return payload_summary
return None
def _build_step_states(
scenario: dict[str, Any],
steps_payloads: dict[str, Any],
) -> list[StepState]:
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list):
return []
states: list[StepState] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
continue
name = str(raw_step.get("name", "")).strip()
if not name:
continue
payload = steps_payloads.get(name)
if not isinstance(payload, dict):
# Workflow aborted before this step ran (strict fail-fast policy).
states.append(
StepState(
node_id=name,
status="skipped",
message="",
)
)
continue
ok = bool(payload.get("ok", False))
states.append(
StepState(
node_id=name,
status="success" if ok else "failed",
started_at=str(payload.get("started_at") or "") or None,
finished_at=str(payload.get("finished_at") or "") or None,
message="" if ok else str(payload.get("error", f"{name} failed")),
)
)
return states
async def run_scenario(
*,
scenario_id: str,
input_data: dict[str, Any],
emitter: EventEmitter | None = None,
run_id: str = "",
) -> ScenarioRunResponse:
try:
scenario = load_scenario_definition(scenario_id)
except ScenarioStoreError as exc:
return ScenarioRunResponse(
scenario_id=scenario_id,
status="failed",
message=str(exc),
input=input_data,
run_id=run_id or None,
)
scenario_name = str(scenario.get("name", scenario_id))
try:
workflow = _get_workflow(scenario_id, scenario)
except ScenarioStoreError as exc:
return ScenarioRunResponse(
scenario_id=scenario_id,
status="failed",
message=str(exc),
input=input_data,
scenario_name=scenario_name,
run_id=run_id or None,
)
# Fresh per-run state that Agno owns during arun(..., session_state=...).
session_state: dict[str, Any] = {
"input": deepcopy(input_data),
"steps": {},
"_emitter": emitter,
"_run_id": run_id,
}
workflow_error: str | None = None
run_output: Any = None
try:
run_output = await workflow.arun(
input=input_data,
session_state=session_state,
)
except Exception as exc:
workflow_error = str(exc)
logger.exception("Workflow {} failed", scenario_id)
steps_payloads = session_state.get("steps", {}) or {}
step_states = _build_step_states(scenario, steps_payloads)
# Strict invariant: run is success only when every recorded step payload
# has ok=true. `on_error: skip` lets downstream steps keep running after a
# failure, but it does NOT whitewash the overall run status.
status = "success"
if workflow_error is not None:
status = "failed"
else:
for payload in steps_payloads.values():
if isinstance(payload, dict) and not bool(payload.get("ok", False)):
status = "failed"
break
if run_output is not None and not bool(getattr(run_output, "success", True)):
status = "failed"
content = getattr(run_output, "content", None)
if isinstance(content, str):
try:
content = json.loads(content)
except json.JSONDecodeError:
content = {"raw_content": content}
if content is None:
for payload in reversed(list(steps_payloads.values())):
if isinstance(payload, dict):
content = deepcopy(payload)
break
if content is None and workflow_error is not None:
content = {"message": workflow_error}
result = content if isinstance(content, dict) else {"raw_content": content}
response_message = "" if status == "success" else (workflow_error or "failed")
return ScenarioRunResponse(
scenario_id=scenario_id,
status=status,
message=response_message,
input=input_data,
steps=step_states,
output_summary=_extract_output_summary(result),
scenario_name=scenario_name,
workflow_name=workflow.name,
result=result,
run_id=run_id or str(getattr(run_output, "run_id", "")) or None,
session_id=str(getattr(run_output, "session_id", "")) or None,
)
async def run_scenario_async(record: RunRecord) -> None:
"""Execute a scenario inside a background task, emitting SSE events.
Lifecycle:
queued → running (run_started) → success|failed (run_finished) → sentinel
"""
emitter = EventEmitter(record)
record.status = "running"
record.started_at = _utc_now_iso()
try:
await emitter.emit(
RunStartedEvent(
run_id=record.run_id,
scenario_id=record.scenario_id,
started_at=record.started_at,
).model_dump()
)
response = await run_scenario(
scenario_id=record.scenario_id,
input_data=record.input,
emitter=emitter,
run_id=record.run_id,
)
record.response = response
record.status = response.status
record.message = response.message
record.finished_at = _utc_now_iso()
await emitter.emit(
RunFinishedEvent(
run_id=record.run_id,
status=response.status,
finished_at=record.finished_at,
message=response.message,
).model_dump()
)
except asyncio.CancelledError:
record.status = "failed"
record.message = "cancelled"
record.finished_at = _utc_now_iso()
await emitter.emit(
RunFinishedEvent(
run_id=record.run_id,
status="failed",
finished_at=record.finished_at,
message="cancelled",
).model_dump()
)
raise
except Exception as exc:
logger.exception("Run {} crashed", record.run_id)
record.status = "failed"
record.message = str(exc)
record.finished_at = _utc_now_iso()
await emitter.emit(
RunFinishedEvent(
run_id=record.run_id,
status="failed",
finished_at=record.finished_at,
message=str(exc),
).model_dump()
)
finally:
await emitter.close()
+50
View File
@@ -0,0 +1,50 @@
"""Phoenix (Arize) OpenTelemetry tracing setup.
Tracing is initialized via the FastAPI lifespan so that import-time side effects
stay out of module load. ``is_phoenix_tracing_enabled`` is cheap and can be
consulted before the app starts (for example, to pass a flag into AgentOS).
"""
from __future__ import annotations
import os
from loguru import logger
from phoenix.otel import register
_initialized = False
def _env_bool(name: str, default: bool) -> bool:
value = os.getenv(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def is_phoenix_tracing_enabled() -> bool:
return _env_bool("PHOENIX_TRACING_ENABLED", False)
def init_phoenix_tracing() -> bool:
global _initialized
if not is_phoenix_tracing_enabled():
return False
if _initialized:
return True
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = os.getenv(
"PHOENIX_COLLECTOR_ENDPOINT",
"http://localhost:6006",
)
project_name = os.getenv("PHOENIX_PROJECT_NAME", "prisma-platform")
register(
project_name=project_name,
auto_instrument=True,
)
_initialized = True
logger.info("Phoenix tracing initialized (project={})", project_name)
return True
+126
View File
@@ -0,0 +1,126 @@
"""In-memory registry of scenario runs and their event streams.
Each submitted run gets a ``RunRecord`` holding:
- mutable status / partial step state updated by the workflow runner;
- an append-only event log (used to replay history to late SSE subscribers);
- a live asyncio queue that SSE endpoints tail until a terminal ``None``
sentinel is delivered.
The registry is an LRU with ``RUN_REGISTRY_MAX_SIZE`` bound so long-running
processes do not leak run history.
"""
from __future__ import annotations
import asyncio
import os
import uuid
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from loguru import logger
from src.schemas import ScenarioRunResponse
_TERMINAL_STATUSES = {"success", "failed"}
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _env_int(name: str, default: int) -> int:
value = os.getenv(name)
return int(value) if value is not None else default
@dataclass
class RunRecord:
run_id: str
scenario_id: str
input: dict[str, Any]
status: str = "queued"
started_at: str = field(default_factory=_utc_now_iso)
finished_at: str | None = None
message: str = ""
response: ScenarioRunResponse | None = None
events: list[dict[str, Any]] = field(default_factory=list)
subscribers: list[asyncio.Queue] = field(default_factory=list)
task: asyncio.Task | None = None
def is_terminal(self) -> bool:
return self.status in _TERMINAL_STATUSES
class EventEmitter:
"""Fans events out to every live SSE subscriber plus the replay buffer.
Subscribers are ``asyncio.Queue`` instances registered by SSE endpoints;
``None`` is used as a terminal sentinel so consumers can exit cleanly.
"""
def __init__(self, record: RunRecord) -> None:
self._record = record
async def emit(self, event: dict[str, Any]) -> None:
self._record.events.append(event)
for queue in list(self._record.subscribers):
queue.put_nowait(event)
async def close(self) -> None:
for queue in list(self._record.subscribers):
queue.put_nowait(None)
class RunRegistry:
def __init__(self, max_size: int | None = None) -> None:
self._records: "OrderedDict[str, RunRecord]" = OrderedDict()
self._max_size = max_size or _env_int("RUN_REGISTRY_MAX_SIZE", 200)
def create(self, *, scenario_id: str, input_data: dict[str, Any]) -> RunRecord:
run_id = str(uuid.uuid4())
record = RunRecord(
run_id=run_id,
scenario_id=scenario_id,
input=input_data,
)
self._records[run_id] = record
self._evict_if_needed()
logger.info("Run {} created for scenario={}", run_id, scenario_id)
return record
def get(self, run_id: str) -> RunRecord | None:
record = self._records.get(run_id)
if record is not None:
self._records.move_to_end(run_id)
return record
def list_active(self) -> list[RunRecord]:
return [r for r in self._records.values() if not r.is_terminal()]
def _evict_if_needed(self) -> None:
while len(self._records) > self._max_size:
evicted_id, evicted = self._records.popitem(last=False)
if evicted.task is not None and not evicted.task.done():
# Refuse to silently drop an in-flight run — re-insert and stop.
self._records[evicted_id] = evicted
self._records.move_to_end(evicted_id, last=False)
logger.warning(
"Run registry at capacity {} but oldest run is still active; "
"not evicting {}",
self._max_size,
evicted_id,
)
return
logger.debug("Evicted run {} from registry", evicted_id)
_registry = RunRegistry()
def get_registry() -> RunRegistry:
return _registry
+87
View File
@@ -0,0 +1,87 @@
"""File-backed loader for scenario definitions.
Scenarios live under ``scenarios/`` and are indexed by ``scenarios/index.json``.
Each scenario is a JSON object with a ``scenario_id`` that must match the
index key it was looked up by.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
class ScenarioStoreError(ValueError):
"""Raised when scenario definitions are missing or invalid."""
_SCENARIOS_ROOT = Path(__file__).resolve().parent.parent / "scenarios"
_INDEX_PATH = _SCENARIOS_ROOT / "index.json"
def _read_json(path: Path) -> dict[str, Any]:
try:
raw = path.read_text(encoding="utf-8")
except FileNotFoundError as exc:
raise ScenarioStoreError(f"Scenario file not found: {path}") from exc
try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
raise ScenarioStoreError(f"Invalid JSON in file: {path}") from exc
if not isinstance(parsed, dict):
raise ScenarioStoreError(f"JSON root must be object: {path}")
return parsed
def load_scenario_index() -> dict[str, str]:
index = _read_json(_INDEX_PATH)
scenarios = index.get("scenarios")
if not isinstance(scenarios, dict):
raise ScenarioStoreError("index.json must contain object field 'scenarios'")
normalized: dict[str, str] = {}
for scenario_id, relative_path in scenarios.items():
if not isinstance(scenario_id, str) or not isinstance(relative_path, str):
raise ScenarioStoreError("index.json scenario entries must be string -> string")
normalized[scenario_id] = relative_path
return normalized
def load_scenario_definition(scenario_id: str) -> dict[str, Any]:
index = load_scenario_index()
relative_path = index.get(scenario_id)
if relative_path is None:
raise ScenarioStoreError(f"Unknown scenario_id: {scenario_id}")
scenario_path = _SCENARIOS_ROOT / relative_path
scenario = _read_json(scenario_path)
declared_id = scenario.get("scenario_id")
if declared_id != scenario_id:
raise ScenarioStoreError(
"Scenario file scenario_id does not match requested scenario_id"
)
return scenario
def list_scenario_summaries() -> list[dict[str, Any]]:
"""Return metadata for every scenario in the index (no steps)."""
summaries: list[dict[str, Any]] = []
for scenario_id in load_scenario_index().keys():
try:
scenario = load_scenario_definition(scenario_id)
except ScenarioStoreError:
# Broken entry in the index should not take the whole catalog down.
continue
summaries.append(
{
"scenario_id": scenario_id,
"name": scenario.get("name"),
"description": scenario.get("description"),
"input_schema": scenario.get("input_schema"),
}
)
return summaries
+278
View File
@@ -0,0 +1,278 @@
"""Pydantic schemas for the scenario-run REST API."""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
StepStatus = Literal["queued", "running", "success", "failed", "skipped", "waiting_human"]
EventType = Literal["run_started", "step_started", "step_finished", "run_finished"]
class ScenarioRunRequest(BaseModel):
scenario_id: str = "news_source_discovery_v1"
input: dict[str, Any] = Field(default_factory=dict)
model_config = {
"json_schema_extra": {
"examples": [
{
"scenario_id": "news_source_discovery_v1",
"input": {"url": "https://example.com/news/article"},
}
]
}
}
class StepState(BaseModel):
node_id: str
status: StepStatus
started_at: str | None = None
finished_at: str | None = None
message: str = ""
model_config = {
"json_schema_extra": {
"examples": [
{
"node_id": "search_news_sources",
"status": "success",
"started_at": "2026-04-24T09:27:59.875680+00:00",
"finished_at": "2026-04-24T09:28:00.028730+00:00",
"message": "",
}
]
}
}
class ScenarioRunResponse(BaseModel):
scenario_id: str
status: RunStatus
message: str = ""
input: dict[str, Any]
steps: list[StepState] = Field(default_factory=list)
output_summary: str | None = None
scenario_name: str | None = None
workflow_name: str | None = None
result: dict[str, Any] | None = None
run_id: str | None = None
session_id: str | None = None
model_config = {
"json_schema_extra": {
"examples": [
{
"scenario_id": "news_source_discovery_v1",
"status": "success",
"message": "",
"input": {"url": "https://example.com/news/article"},
"steps": [
{
"node_id": "search_news_sources",
"status": "success",
"started_at": "2026-04-24T09:27:59.875680+00:00",
"finished_at": "2026-04-24T09:28:00.028730+00:00",
"message": "",
},
{
"node_id": "generate_summary",
"status": "success",
"started_at": "2026-04-24T09:28:00.781744+00:00",
"finished_at": "2026-04-24T09:28:00.879028+00:00",
"message": "",
},
],
"output_summary": "Самым ранним источником считается https://news-a.example/article-1",
"scenario_name": "News Source Discovery V1",
"workflow_name": "news_source_discovery_v1",
"result": {
"ok": True,
"tool_name": "generate_summary",
"payload": {
"input_count": 3,
"summary": "Самым ранним источником считается https://news-a.example/article-1",
},
},
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"session_id": None,
}
]
}
}
class RunSubmitResponse(BaseModel):
run_id: str
scenario_id: str
status: RunStatus
input: dict[str, Any]
started_at: str
model_config = {
"json_schema_extra": {
"examples": [
{
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"scenario_id": "news_source_discovery_v1",
"status": "queued",
"input": {"url": "https://example.com/news/article"},
"started_at": "2026-04-24T09:27:59.873049+00:00",
}
]
}
}
class ScenarioSummary(BaseModel):
scenario_id: str
name: str | None = None
description: str | None = None
input_schema: dict[str, Any] | None = None
model_config = {
"json_schema_extra": {
"examples": [
{
"scenario_id": "news_source_discovery_v1",
"name": "News Source Discovery V1",
"description": "Find earliest news source using sequential MCP tools.",
"input_schema": {
"type": "object",
"required": ["url"],
"properties": {
"url": {
"type": "string",
"description": "URL of source news article",
}
},
},
}
]
}
}
class ToolSummary(BaseModel):
name: str
description: str | None = None
input_schema: dict[str, Any] | None = None
model_config = {
"json_schema_extra": {
"examples": [
{
"name": "search_news_sources",
"description": "Search for candidate news source URLs for a given article.",
"input_schema": {
"type": "object",
"required": ["url"],
"properties": {
"url": {"type": "string", "title": "Url"}
},
"title": "search_news_sourcesArguments",
},
}
]
}
}
# ---------------------------------------------------------------------------
# SSE event models. Client parses by the `type` field.
# ---------------------------------------------------------------------------
class RunStartedEvent(BaseModel):
type: Literal["run_started"] = "run_started"
run_id: str
scenario_id: str
started_at: str
model_config = {
"json_schema_extra": {
"examples": [
{
"type": "run_started",
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"scenario_id": "news_source_discovery_v1",
"started_at": "2026-04-24T09:27:59.873397+00:00",
}
]
}
}
class StepStartedEvent(BaseModel):
type: Literal["step_started"] = "step_started"
run_id: str
step_name: str
index: int
started_at: str
model_config = {
"json_schema_extra": {
"examples": [
{
"type": "step_started",
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"step_name": "search_news_sources",
"index": 0,
"started_at": "2026-04-24T09:27:59.875680+00:00",
}
]
}
}
class StepFinishedEvent(BaseModel):
type: Literal["step_finished"] = "step_finished"
run_id: str
step_name: str
index: int
status: StepStatus
started_at: str | None = None
finished_at: str | None = None
message: str = ""
model_config = {
"json_schema_extra": {
"examples": [
{
"type": "step_finished",
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"step_name": "search_news_sources",
"index": 0,
"status": "success",
"started_at": "2026-04-24T09:27:59.875680+00:00",
"finished_at": "2026-04-24T09:28:00.028730+00:00",
"message": "",
}
]
}
}
class RunFinishedEvent(BaseModel):
type: Literal["run_finished"] = "run_finished"
run_id: str
status: RunStatus
finished_at: str
message: str = ""
model_config = {
"json_schema_extra": {
"examples": [
{
"type": "run_finished",
"run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
"status": "success",
"finished_at": "2026-04-24T09:28:01.750206+00:00",
"message": "",
}
]
}
}
+145
View File
@@ -0,0 +1,145 @@
"""LLM-backed fallback planner for MCP tool arguments.
When a step's resolved arguments are missing required fields, this module
calls an OpenAI-compatible chat completion to fill them from the current
scope (``input`` + prior ``steps``). The planner is best-effort: on any
failure it returns the base arguments unchanged so the caller's validator
can produce a clean error.
"""
from __future__ import annotations
from copy import deepcopy
import json
import os
from typing import Any
from loguru import logger
from openai import AsyncOpenAI
_planner_client: AsyncOpenAI | None = None
def _env_float(name: str, default: float) -> float:
value = os.getenv(name)
if value is None:
return default
return float(value)
def planner_enabled() -> bool:
return os.getenv("PLANNER_ENABLED", "false").strip().lower() in {"1", "true", "yes"}
def _get_client() -> AsyncOpenAI:
global _planner_client
if _planner_client is not None:
return _planner_client
_planner_client = AsyncOpenAI(
base_url=os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1"),
api_key=os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY"),
)
return _planner_client
def _response_schema(required_fields: list[str]) -> dict[str, Any]:
value_schema = {"type": ["string", "number", "boolean", "array", "object", "null"]}
return {
"name": "mcp_arguments",
"strict": True,
"schema": {
"type": "object",
"properties": {
"arguments": {
"type": "object",
"properties": {f: value_schema for f in required_fields},
"required": required_fields,
"additionalProperties": True,
}
},
"required": ["arguments"],
"additionalProperties": False,
},
}
def _extract_arguments(content: Any) -> dict[str, Any]:
candidate: Any = content
if isinstance(candidate, str):
text = candidate.strip()
if text.startswith("```"):
text = text.strip("`").strip()
if text.startswith("json"):
text = text[4:].strip()
try:
candidate = json.loads(text)
except json.JSONDecodeError:
return {}
if isinstance(candidate, dict):
if isinstance(candidate.get("arguments"), dict):
return candidate["arguments"]
return candidate
return {}
async def plan_arguments(
*,
step_name: str,
tool_name: str,
base_arguments: dict[str, Any],
required_fields: list[str],
scope: dict[str, Any],
missing_fields: list[str],
attempt_no: int,
) -> dict[str, Any]:
"""Fallback planner: asks an LLM to fill missing required fields from context.
Returns merged arguments (base + planned). On any failure returns base_arguments
unchanged — caller is responsible for validating required fields afterwards.
"""
prompt = {
"task": "Prepare MCP arguments for this step.",
"step_name": step_name,
"tool_name": tool_name,
"required_fields": required_fields,
"base_arguments": base_arguments,
"missing_fields": missing_fields,
"repair_attempt": attempt_no,
"context": {"input": scope.get("input", {}), "steps": scope.get("steps", {})},
"output": (
"Return only JSON object with key 'arguments'. "
"Fill every missing field from context."
),
}
try:
completion = await _get_client().chat.completions.create(
model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
messages=[
{
"role": "system",
"content": (
"You are a tool-input planner. "
"Return only JSON that matches the provided schema."
),
},
{"role": "user", "content": json.dumps(prompt, ensure_ascii=False)},
],
response_format={"type": "json_schema", "json_schema": _response_schema(required_fields)},
temperature=_env_float("POLZA_TEMPERATURE", 0.0),
)
raw = completion.choices[0].message.content if completion.choices else ""
planned = _extract_arguments(raw)
except Exception:
logger.warning(
"Planner call failed for step={} tool={} attempt={}",
step_name,
tool_name,
attempt_no,
)
planned = {}
merged = deepcopy(base_arguments)
if isinstance(planned, dict):
merged.update(planned)
return merged
+58
View File
@@ -0,0 +1,58 @@
"""Variable templating for scenario step inputs.
A dict of shape ``{"from": "path.to.value"}`` resolves to the value at that
dotted path in the current scope. Nested dicts/lists are resolved
recursively; plain values pass through via ``deepcopy``.
"""
from __future__ import annotations
from copy import deepcopy
from typing import Any
def resolve_path(scope: dict[str, Any], path: str) -> Any:
value: Any = scope
for segment in path.split("."):
key = segment.strip()
if not key:
continue
if not isinstance(value, dict):
return None
value = value.get(key)
return deepcopy(value)
def resolve_template(template: Any, scope: dict[str, Any]) -> Any:
if isinstance(template, dict):
if set(template.keys()) == {"from"}:
return resolve_path(scope, str(template["from"]))
return {key: resolve_template(value, scope) for key, value in template.items()}
if isinstance(template, list):
return [resolve_template(item, scope) for item in template]
return deepcopy(template)
def missing_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
) -> list[str]:
missing: list[str] = []
for field in required_fields:
value = arguments.get(field)
if isinstance(value, str) and value.strip():
continue
if value not in (None, "", [], {}):
continue
missing.append(field)
return missing
def validate_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
step_name: str,
) -> None:
missing = missing_required_fields(arguments, required_fields)
if missing:
raise ValueError(f"{step_name}: missing required fields: {', '.join(missing)}")