Compare commits
13 Commits
00e7ef5d11
..
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 2a81f5f58f | |||
| 3357b3c4dd | |||
| 4d037e52eb | |||
| 5ca49821ba | |||
| ad828885e3 | |||
| 93ee7aea1c | |||
| 9068b7fe07 | |||
| 0fbd7dce1a | |||
| d341941f87 | |||
| 2b0c748474 | |||
| 2111964d8b | |||
| d22db07b43 | |||
| 196e9aaf27 |
+27
-3
@@ -1,9 +1,33 @@
|
|||||||
|
# Agent
|
||||||
AGENT_ID=prisma-agent
|
AGENT_ID=prisma-agent
|
||||||
|
AGENT_MARKDOWN=false
|
||||||
|
AGENT_DEBUG_MODE=true
|
||||||
|
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
|
||||||
|
|
||||||
|
# Agent model (Ollama)
|
||||||
OLLAMA_MODEL_ID=gemma4:31b
|
OLLAMA_MODEL_ID=gemma4:31b
|
||||||
OLLAMA_HOST=http://localhost:11435
|
OLLAMA_HOST=http://localhost:11435
|
||||||
OLLAMA_TEMPERATURE=0
|
OLLAMA_TEMPERATURE=0
|
||||||
AGENT_MARKDOWN=false
|
|
||||||
AGENT_DEBUG_MODE=true
|
# API runtime
|
||||||
AGENT_INSTRUCTIONS=You are a helpful assistant. Answer briefly and clearly.
|
|
||||||
AGENT_OS_HOST=127.0.0.1
|
AGENT_OS_HOST=127.0.0.1
|
||||||
AGENT_OS_PORT=7777
|
AGENT_OS_PORT=7777
|
||||||
|
|
||||||
|
# Planner
|
||||||
|
PLANNER_ENABLED=false
|
||||||
|
PLANNER_REPAIR_ATTEMPTS=3
|
||||||
|
|
||||||
|
# Planner model (Polza)
|
||||||
|
POLZA_BASE_URL=https://api.polza.ai/v1
|
||||||
|
POLZA_MODEL_ID=google/gemma-4-31b-it
|
||||||
|
POLZA_API_KEY=key
|
||||||
|
POLZA_TEMPERATURE=0
|
||||||
|
|
||||||
|
# MCP
|
||||||
|
MCP_BASE_URL=http://127.0.0.1:8081/mcp
|
||||||
|
MCP_TIMEOUT_SECONDS=10
|
||||||
|
|
||||||
|
# Observability (Phoenix)
|
||||||
|
PHOENIX_TRACING_ENABLED=false
|
||||||
|
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
|
||||||
|
PHOENIX_PROJECT_NAME=prisma-platform
|
||||||
|
|||||||
@@ -25,3 +25,7 @@ dist/
|
|||||||
.vscode/
|
.vscode/
|
||||||
.DS_Store
|
.DS_Store
|
||||||
.cursor
|
.cursor
|
||||||
|
.claude
|
||||||
|
|
||||||
|
# Cookbook code
|
||||||
|
vendor/agno/cookbook/
|
||||||
@@ -1,8 +1,19 @@
|
|||||||
# Prisma Platform MVP
|
# Prisma Platform MVP
|
||||||
|
|
||||||
Минимальный чат-агент на Agno + Ollama с рантаймом AgentOS.
|
MVP-реализация сценарного раннера на Agno AgentOS.
|
||||||
|
|
||||||
В этом проекте AgentOS работает как HTTP API сервер (FastAPI + Uvicorn).
|
Текущая схема исполнения:
|
||||||
|
|
||||||
|
- сценарий хранится в `scenarios/*.json`;
|
||||||
|
- исполнение идет через `src/mcp_workflow_runner.py`;
|
||||||
|
- каждый шаг вызывает MCP инструмент через `src/mcp_client.py`;
|
||||||
|
- для подготовки аргументов шага используется planner-агент с моделью через `polza.ai`.
|
||||||
|
|
||||||
|
## Требования
|
||||||
|
|
||||||
|
- Python 3.10+
|
||||||
|
- MCP endpoint (по умолчанию `http://127.0.0.1:8081/mcp`)
|
||||||
|
- доступ к модели через `polza.ai` (`POLZA_API_KEY`)
|
||||||
|
|
||||||
## Текущая структура
|
## Текущая структура
|
||||||
|
|
||||||
@@ -11,11 +22,23 @@ prisma_platform/
|
|||||||
├── .env
|
├── .env
|
||||||
├── .env.example
|
├── .env.example
|
||||||
├── requirements.txt
|
├── requirements.txt
|
||||||
|
├── scenarios/
|
||||||
|
│ ├── index.json
|
||||||
|
│ └── news_source_discovery/
|
||||||
|
│ ├── v1.json
|
||||||
|
│ └── v1_planner_repair.json
|
||||||
└── src/
|
└── src/
|
||||||
├── __init__.py
|
├── __init__.py
|
||||||
|
├── api_routes.py
|
||||||
├── agent_os.py
|
├── agent_os.py
|
||||||
├── agent_runner.py
|
├── agent_runner.py
|
||||||
└── main.py
|
├── mcp_client.py
|
||||||
|
├── mcp_workflow_runner.py
|
||||||
|
├── observability.py
|
||||||
|
├── scenario_store.py
|
||||||
|
├── step_planner.py
|
||||||
|
├── template.py
|
||||||
|
└── schemas.py
|
||||||
```
|
```
|
||||||
|
|
||||||
## Установка
|
## Установка
|
||||||
@@ -29,44 +52,137 @@ cp .env.example .env
|
|||||||
|
|
||||||
## Запуск
|
## Запуск
|
||||||
|
|
||||||
Интерактивный режим чата:
|
1) Поднимите MCP stub (из соседнего репозитория):
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
python -m src.main
|
cd /home/worker/projects/docker-service/mcp-stub
|
||||||
|
docker compose up --build -d
|
||||||
```
|
```
|
||||||
|
|
||||||
Режим одного сообщения:
|
2) Запустите сервер AgentOS:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
python -m src.main --message "Привет, что ты умеешь?"
|
cd /home/worker/projects/prisma_platform
|
||||||
|
.venv/bin/python -m src.agent_os
|
||||||
```
|
```
|
||||||
|
|
||||||
## Запуск AgentOS
|
По умолчанию приложение доступно на `http://127.0.0.1:7777`.
|
||||||
|
|
||||||
Запуск сервера AgentOS:
|
Документация API:
|
||||||
|
|
||||||
```bash
|
|
||||||
python -m src.agent_os
|
|
||||||
```
|
|
||||||
|
|
||||||
По умолчанию AgentOS работает на `http://127.0.0.1:7777`.
|
|
||||||
|
|
||||||
Документация API доступна по адресам:
|
|
||||||
|
|
||||||
- `http://127.0.0.1:7777/docs`
|
- `http://127.0.0.1:7777/docs`
|
||||||
- `http://127.0.0.1:7777/redoc`
|
- `http://127.0.0.1:7777/redoc`
|
||||||
|
|
||||||
|
## HTTP API
|
||||||
|
|
||||||
|
### Запуск сценария (async)
|
||||||
|
|
||||||
|
`POST /api/runs` — планирует выполнение сценария и **сразу** возвращает `run_id`. Само выполнение идёт в фоне.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -s -X POST "http://127.0.0.1:7777/api/runs" \
|
||||||
|
-H "Content-Type: application/json" \
|
||||||
|
-d '{
|
||||||
|
"scenario_id": "news_source_discovery_v1",
|
||||||
|
"input": { "url": "https://example.com/news" }
|
||||||
|
}'
|
||||||
|
```
|
||||||
|
|
||||||
|
Ответ (`202 Accepted`):
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"run_id": "f3d9…",
|
||||||
|
"scenario_id": "news_source_discovery_v1",
|
||||||
|
"status": "queued",
|
||||||
|
"input": { "url": "..." },
|
||||||
|
"started_at": "2026-04-24T..."
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Снапшот состояния
|
||||||
|
|
||||||
|
`GET /api/runs/{run_id}` — текущее состояние: `status` (`queued|running|success|failed`), список `steps` со статусами (`success|failed|skipped|queued`), `result` и `output_summary` при завершении.
|
||||||
|
|
||||||
|
### Live-прогресс (SSE)
|
||||||
|
|
||||||
|
`GET /api/runs/{run_id}/events` — Server-Sent Events. Поздние подписчики получают replay уже накопленных событий, затем tail до завершения.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -N http://127.0.0.1:7777/api/runs/$RUN_ID/events
|
||||||
|
```
|
||||||
|
|
||||||
|
Типы событий:
|
||||||
|
|
||||||
|
- `run_started` — `{run_id, scenario_id, started_at}`
|
||||||
|
- `step_started` — `{run_id, step_name, index, started_at}`
|
||||||
|
- `step_finished` — `{run_id, step_name, index, status, started_at, finished_at, message}`
|
||||||
|
- `run_finished` — `{run_id, status, finished_at, message}` (терминальное, поток закрывается)
|
||||||
|
|
||||||
|
### Каталоги
|
||||||
|
|
||||||
|
- `GET /api/scenarios` — список сценариев с метаданными (`scenario_id`, `name`, `description`, `input_schema`).
|
||||||
|
- `GET /api/scenarios/{scenario_id}` — полное определение сценария (для визуализации графа в UI).
|
||||||
|
- `GET /api/tools` — MCP tool catalog: `[{name, description, input_schema}]` (проксируется на `MCP_BASE_URL`).
|
||||||
|
|
||||||
## Переменные окружения
|
## Переменные окружения
|
||||||
|
|
||||||
Основные переменные:
|
Agent:
|
||||||
|
|
||||||
- `AGENT_ID` (по умолчанию: `prisma-agent`)
|
- `AGENT_ID` (default: `prisma-agent`)
|
||||||
- `OLLAMA_MODEL_ID` (по умолчанию: `gemma4:31b`)
|
- `AGENT_MARKDOWN` (default: `false`)
|
||||||
- `OLLAMA_HOST` (по умолчанию: `http://localhost:11435`)
|
- `AGENT_DEBUG_MODE` (default: `true`)
|
||||||
- `OLLAMA_TEMPERATURE` (по умолчанию: `0`)
|
- `AGENT_INSTRUCTIONS`
|
||||||
- `AGENT_MARKDOWN` (по умолчанию: `false`)
|
- `OLLAMA_MODEL_ID` (default: `gemma4:31b`)
|
||||||
- `AGENT_DEBUG_MODE` (по умолчанию: `true`)
|
- `OLLAMA_HOST` (default: `http://localhost:11435`)
|
||||||
- `AGENT_INSTRUCTIONS` (по умолчанию: `You are a helpful assistant. Answer briefly and clearly.`)
|
- `OLLAMA_TEMPERATURE` (default: `0`)
|
||||||
- `AGENT_OS_HOST` (по умолчанию: `127.0.0.1`)
|
|
||||||
- `AGENT_OS_PORT` (по умолчанию: `7777`)
|
API runtime:
|
||||||
|
|
||||||
|
- `AGENT_OS_HOST` (default: `127.0.0.1`)
|
||||||
|
- `AGENT_OS_PORT` (default: `7777`)
|
||||||
|
|
||||||
|
Planner:
|
||||||
|
|
||||||
|
- `PLANNER_ENABLED` (default: `false`)
|
||||||
|
- `PLANNER_REPAIR_ATTEMPTS` (default: `3`)
|
||||||
|
|
||||||
|
Planner model (`polza.ai`):
|
||||||
|
|
||||||
|
- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
|
||||||
|
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)
|
||||||
|
- `POLZA_API_KEY` (required)
|
||||||
|
- `POLZA_TEMPERATURE` (default: `0`)
|
||||||
|
|
||||||
|
MCP:
|
||||||
|
|
||||||
|
- `MCP_BASE_URL` (default: `http://127.0.0.1:8081/mcp`)
|
||||||
|
- `MCP_TIMEOUT_SECONDS` (default: `10`)
|
||||||
|
|
||||||
|
Runtime caches:
|
||||||
|
|
||||||
|
- `WORKFLOW_CACHE_MAX_SIZE` (default: `64`) — лимит LRU кэша построенных workflow.
|
||||||
|
- `RUN_REGISTRY_MAX_SIZE` (default: `200`) — лимит LRU истории run'ов в памяти.
|
||||||
|
|
||||||
|
Phoenix tracing:
|
||||||
|
|
||||||
|
- `PHOENIX_TRACING_ENABLED` (default: `false`)
|
||||||
|
- `PHOENIX_COLLECTOR_ENDPOINT` (default: `http://localhost:6006`)
|
||||||
|
- `PHOENIX_PROJECT_NAME` (default: `prisma-platform`)
|
||||||
|
|
||||||
|
## Phoenix трассировка (локально)
|
||||||
|
|
||||||
|
1) Включите трассировку в `.env`:
|
||||||
|
|
||||||
|
```dotenv
|
||||||
|
PHOENIX_TRACING_ENABLED=true
|
||||||
|
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
|
||||||
|
PHOENIX_PROJECT_NAME=prisma-platform
|
||||||
|
```
|
||||||
|
|
||||||
|
2) Запустите приложение:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
.venv/bin/python -m src.agent_os
|
||||||
|
```
|
||||||
|
|
||||||
|
|||||||
+10
-7
@@ -1,7 +1,10 @@
|
|||||||
agno
|
agno==2.5.17
|
||||||
fastapi
|
fastapi==0.136.0
|
||||||
uvicorn
|
uvicorn==0.44.0
|
||||||
python-dotenv
|
python-dotenv==1.2.2
|
||||||
ollama
|
ollama==0.6.1
|
||||||
socksio
|
socksio==1.0.0
|
||||||
openai
|
openai==2.32.0
|
||||||
|
arize-phoenix-otel==0.15.0
|
||||||
|
openinference-instrumentation-agno==0.1.30
|
||||||
|
loguru==0.7.3
|
||||||
|
|||||||
@@ -0,0 +1,6 @@
|
|||||||
|
{
|
||||||
|
"scenarios": {
|
||||||
|
"news_source_discovery_v1": "news_source_discovery/v1.json",
|
||||||
|
"news_source_discovery_v1_planner_repair": "news_source_discovery/v1_planner_repair.json"
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,105 @@
|
|||||||
|
{
|
||||||
|
"schema_version": "1",
|
||||||
|
"scenario_id": "news_source_discovery_v1",
|
||||||
|
"name": "News Source Discovery V1",
|
||||||
|
"description": "Find earliest news source using sequential MCP tools.",
|
||||||
|
"input_schema": {
|
||||||
|
"type": "object",
|
||||||
|
"required": [
|
||||||
|
"url"
|
||||||
|
],
|
||||||
|
"properties": {
|
||||||
|
"url": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "URL of source news article"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"steps": [
|
||||||
|
{
|
||||||
|
"name": "search_news_sources",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "search_news_sources",
|
||||||
|
"input": {
|
||||||
|
"url": {
|
||||||
|
"from": "input.url"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required_input_fields": [
|
||||||
|
"url"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "parse_articles_batch",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "parse_article",
|
||||||
|
"foreach": {
|
||||||
|
"from": "steps.search_news_sources.payload.items",
|
||||||
|
"as": "item"
|
||||||
|
},
|
||||||
|
"input": {
|
||||||
|
"url": {
|
||||||
|
"from": "item.url"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"collect": {
|
||||||
|
"url": {
|
||||||
|
"from": "tool.payload.url"
|
||||||
|
},
|
||||||
|
"title": {
|
||||||
|
"from": "tool.payload.title"
|
||||||
|
},
|
||||||
|
"text": {
|
||||||
|
"from": "tool.payload.text"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"collect_key": "items"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "extract_publication_date_batch",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "extract_publication_date",
|
||||||
|
"foreach": {
|
||||||
|
"from": "steps.parse_articles_batch.payload.items",
|
||||||
|
"as": "item"
|
||||||
|
},
|
||||||
|
"input": {
|
||||||
|
"article_text": {
|
||||||
|
"from": "item.text"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"collect": {
|
||||||
|
"url": {
|
||||||
|
"from": "item.url"
|
||||||
|
},
|
||||||
|
"title": {
|
||||||
|
"from": "item.title"
|
||||||
|
},
|
||||||
|
"published_at": {
|
||||||
|
"from": "tool.payload.published_at"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"collect_key": "items"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "rank_sources_by_date",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "rank_sources_by_date",
|
||||||
|
"input": {
|
||||||
|
"items": {
|
||||||
|
"from": "steps.extract_publication_date_batch.payload.items"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "generate_summary",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "generate_summary",
|
||||||
|
"input": {
|
||||||
|
"items": {
|
||||||
|
"from": "steps.rank_sources_by_date.payload.ranked_items"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
@@ -0,0 +1,117 @@
|
|||||||
|
{
|
||||||
|
"schema_version": "1",
|
||||||
|
"scenario_id": "news_source_discovery_v1_planner_repair",
|
||||||
|
"name": "News Source Discovery V1 Planner Repair",
|
||||||
|
"description": "Test scenario with intentionally wrong input paths repaired by planner.",
|
||||||
|
"input_schema": {
|
||||||
|
"type": "object",
|
||||||
|
"required": [
|
||||||
|
"url"
|
||||||
|
],
|
||||||
|
"properties": {
|
||||||
|
"url": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "URL of source news article"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"steps": [
|
||||||
|
{
|
||||||
|
"name": "search_news_sources",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "search_news_sources",
|
||||||
|
"input": {
|
||||||
|
"url": {
|
||||||
|
"from": "input.url"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required_input_fields": [
|
||||||
|
"url"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "parse_articles_batch",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "parse_article",
|
||||||
|
"foreach": {
|
||||||
|
"from": "steps.search_news_sources.payload.items",
|
||||||
|
"as": "item"
|
||||||
|
},
|
||||||
|
"input": {
|
||||||
|
"url": {
|
||||||
|
"from": "item.link"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required_input_fields": [
|
||||||
|
"url"
|
||||||
|
],
|
||||||
|
"collect": {
|
||||||
|
"url": {
|
||||||
|
"from": "tool.payload.url"
|
||||||
|
},
|
||||||
|
"title": {
|
||||||
|
"from": "tool.payload.title"
|
||||||
|
},
|
||||||
|
"text": {
|
||||||
|
"from": "tool.payload.text"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"collect_key": "items"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "extract_publication_date_batch",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "extract_publication_date",
|
||||||
|
"foreach": {
|
||||||
|
"from": "steps.parse_articles_batch.payload.items",
|
||||||
|
"as": "item"
|
||||||
|
},
|
||||||
|
"input": {
|
||||||
|
"article_text": {
|
||||||
|
"from": "item.body"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required_input_fields": [
|
||||||
|
"article_text"
|
||||||
|
],
|
||||||
|
"collect": {
|
||||||
|
"url": {
|
||||||
|
"from": "item.url"
|
||||||
|
},
|
||||||
|
"title": {
|
||||||
|
"from": "item.title"
|
||||||
|
},
|
||||||
|
"published_at": {
|
||||||
|
"from": "tool.payload.published_at"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"collect_key": "items"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "rank_sources_by_date",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "rank_sources_by_date",
|
||||||
|
"input": {
|
||||||
|
"items": {
|
||||||
|
"from": "steps.extract_publication_date_batch.payload.items"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required_input_fields": [
|
||||||
|
"items"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "generate_summary",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "generate_summary",
|
||||||
|
"input": {
|
||||||
|
"items": {
|
||||||
|
"from": "steps.rank_sources_by_date.payload.items_ranked_typo"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required_input_fields": [
|
||||||
|
"items"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
+42
-1
@@ -1,15 +1,56 @@
|
|||||||
|
"""AgentOS entrypoint: wires the agent, REST routes and FastAPI lifespan.
|
||||||
|
|
||||||
|
Phoenix tracing is initialized from the lifespan (not at import time) so that
|
||||||
|
importing this module for tooling or tests does not spin up the tracer.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
from dotenv import load_dotenv
|
||||||
|
from fastapi import FastAPI
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
from agno.os import AgentOS
|
from agno.os import AgentOS
|
||||||
|
|
||||||
from src.agent_runner import get_agent
|
from src.agent_runner import get_agent
|
||||||
|
from src.api_routes import router as api_router
|
||||||
|
from src.observability import init_phoenix_tracing, is_phoenix_tracing_enabled
|
||||||
|
from src.run_registry import get_registry
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def _lifespan(_app: FastAPI):
|
||||||
|
init_phoenix_tracing()
|
||||||
|
logger.info("Prisma Platform API starting up")
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
active = get_registry().list_active()
|
||||||
|
if active:
|
||||||
|
logger.info("Cancelling {} active run(s) on shutdown", len(active))
|
||||||
|
for record in active:
|
||||||
|
if record.task is not None and not record.task.done():
|
||||||
|
record.task.cancel()
|
||||||
|
logger.info("Prisma Platform API shutting down")
|
||||||
|
|
||||||
|
|
||||||
_agent = get_agent()
|
_agent = get_agent()
|
||||||
_agent_os = AgentOS(agents=[_agent])
|
_base_app = FastAPI(
|
||||||
|
title="Prisma Platform API",
|
||||||
|
version="0.1.0",
|
||||||
|
lifespan=_lifespan,
|
||||||
|
)
|
||||||
|
_base_app.include_router(api_router)
|
||||||
|
_agent_os = AgentOS(
|
||||||
|
agents=[_agent],
|
||||||
|
tracing=is_phoenix_tracing_enabled(),
|
||||||
|
base_app=_base_app,
|
||||||
|
)
|
||||||
app = _agent_os.get_app()
|
app = _agent_os.get_app()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
+8
-6
@@ -1,3 +1,11 @@
|
|||||||
|
"""Lazy factory for the top-level Prisma agent.
|
||||||
|
|
||||||
|
Config is read from environment variables so the same module can be used by
|
||||||
|
the API server, CLI tools and tests without re-wiring.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from agno.agent import Agent
|
from agno.agent import Agent
|
||||||
@@ -47,9 +55,3 @@ def get_agent() -> Agent:
|
|||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
)
|
)
|
||||||
return _agent
|
return _agent
|
||||||
|
|
||||||
|
|
||||||
async def run_agent(message: str) -> str:
|
|
||||||
agent = get_agent()
|
|
||||||
response = await agent.arun(message)
|
|
||||||
return str(response.content)
|
|
||||||
|
|||||||
@@ -0,0 +1,279 @@
|
|||||||
|
"""REST routes for scenario execution, catalogs and live run events.
|
||||||
|
|
||||||
|
Runs are executed asynchronously: ``POST /api/runs`` schedules a background
|
||||||
|
task and returns immediately with a ``run_id``. Clients consume progress
|
||||||
|
via ``GET /api/runs/{run_id}/events`` (SSE) or poll
|
||||||
|
``GET /api/runs/{run_id}`` for a snapshot.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from typing import Any, AsyncIterator
|
||||||
|
|
||||||
|
from fastapi import APIRouter, HTTPException
|
||||||
|
from fastapi.responses import StreamingResponse
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
from src.mcp_client import list_mcp_tools
|
||||||
|
from src.mcp_workflow_runner import run_scenario_async
|
||||||
|
from src.run_registry import RunRecord, get_registry
|
||||||
|
from src.scenario_store import (
|
||||||
|
ScenarioStoreError,
|
||||||
|
list_scenario_summaries,
|
||||||
|
load_scenario_definition,
|
||||||
|
)
|
||||||
|
from src.schemas import (
|
||||||
|
RunSubmitResponse,
|
||||||
|
ScenarioRunRequest,
|
||||||
|
ScenarioRunResponse,
|
||||||
|
ScenarioSummary,
|
||||||
|
StepState,
|
||||||
|
ToolSummary,
|
||||||
|
)
|
||||||
|
|
||||||
|
router = APIRouter(prefix="/api", tags=["workflow"])
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Runs
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/runs",
|
||||||
|
response_model=RunSubmitResponse,
|
||||||
|
status_code=202,
|
||||||
|
summary="Schedule a scenario run",
|
||||||
|
description=(
|
||||||
|
"Creates a run record and schedules execution in the background. "
|
||||||
|
"Returns immediately with a `run_id`; poll `GET /api/runs/{run_id}` "
|
||||||
|
"or subscribe to `GET /api/runs/{run_id}/events` for progress."
|
||||||
|
),
|
||||||
|
)
|
||||||
|
async def post_run(request: ScenarioRunRequest) -> RunSubmitResponse:
|
||||||
|
registry = get_registry()
|
||||||
|
record = registry.create(
|
||||||
|
scenario_id=request.scenario_id,
|
||||||
|
input_data=request.input,
|
||||||
|
)
|
||||||
|
record.task = asyncio.create_task(run_scenario_async(record))
|
||||||
|
return RunSubmitResponse(
|
||||||
|
run_id=record.run_id,
|
||||||
|
scenario_id=record.scenario_id,
|
||||||
|
status=record.status,
|
||||||
|
input=record.input,
|
||||||
|
started_at=record.started_at,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/runs/{run_id}",
|
||||||
|
response_model=ScenarioRunResponse,
|
||||||
|
summary="Get run snapshot",
|
||||||
|
description=(
|
||||||
|
"Returns the current state of a run. For running runs the `steps` "
|
||||||
|
"list reflects progress so far; for terminal runs it is complete."
|
||||||
|
),
|
||||||
|
responses={404: {"description": "Unknown run_id"}},
|
||||||
|
)
|
||||||
|
async def get_run(run_id: str) -> ScenarioRunResponse:
|
||||||
|
record = _require_run(run_id)
|
||||||
|
if record.response is not None:
|
||||||
|
return record.response
|
||||||
|
return _snapshot_from_record(record)
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/runs/{run_id}/events",
|
||||||
|
summary="Live run progress (SSE)",
|
||||||
|
description=(
|
||||||
|
"Server-Sent Events stream. Late subscribers receive a replay of "
|
||||||
|
"buffered events first, then tail new events until `run_finished`.\n\n"
|
||||||
|
"Event types: `run_started`, `step_started`, `step_finished`, "
|
||||||
|
"`run_finished`. Each event is JSON in the SSE `data:` field."
|
||||||
|
),
|
||||||
|
responses={
|
||||||
|
200: {
|
||||||
|
"description": "SSE stream of run events",
|
||||||
|
"content": {
|
||||||
|
"text/event-stream": {
|
||||||
|
"example": (
|
||||||
|
"event: run_started\n"
|
||||||
|
'data: {"type":"run_started","run_id":"76d6903c-f520-4a40-b0fc-8fed3f7955d2",'
|
||||||
|
'"scenario_id":"news_source_discovery_v1","started_at":"2026-04-24T09:27:59.873+00:00"}\n\n'
|
||||||
|
"event: step_started\n"
|
||||||
|
'data: {"type":"step_started","run_id":"76d6903c-...","step_name":"search_news_sources",'
|
||||||
|
'"index":0,"started_at":"2026-04-24T09:27:59.875+00:00"}\n\n'
|
||||||
|
"event: step_finished\n"
|
||||||
|
'data: {"type":"step_finished","run_id":"76d6903c-...","step_name":"search_news_sources",'
|
||||||
|
'"index":0,"status":"success","started_at":"2026-04-24T09:27:59.875+00:00",'
|
||||||
|
'"finished_at":"2026-04-24T09:28:00.028+00:00","message":""}\n\n'
|
||||||
|
"event: run_finished\n"
|
||||||
|
'data: {"type":"run_finished","run_id":"76d6903c-...","status":"success",'
|
||||||
|
'"finished_at":"2026-04-24T09:28:01.750+00:00","message":""}\n\n'
|
||||||
|
)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
404: {"description": "Unknown run_id"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
async def get_run_events(run_id: str) -> StreamingResponse:
|
||||||
|
record = _require_run(run_id)
|
||||||
|
return StreamingResponse(
|
||||||
|
_event_stream(record),
|
||||||
|
media_type="text/event-stream",
|
||||||
|
headers={
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"X-Accel-Buffering": "no",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Scenario catalog
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/scenarios",
|
||||||
|
response_model=list[ScenarioSummary],
|
||||||
|
summary="List available scenarios",
|
||||||
|
description="Returns metadata (id, name, description, input schema) for every scenario in the index.",
|
||||||
|
)
|
||||||
|
async def get_scenarios() -> list[ScenarioSummary]:
|
||||||
|
return [ScenarioSummary(**s) for s in list_scenario_summaries()]
|
||||||
|
|
||||||
|
|
||||||
|
_SCENARIO_DEFINITION_EXAMPLE: dict[str, Any] = {
|
||||||
|
"schema_version": "1",
|
||||||
|
"scenario_id": "news_source_discovery_v1",
|
||||||
|
"name": "News Source Discovery V1",
|
||||||
|
"description": "Find earliest news source using sequential MCP tools.",
|
||||||
|
"input_schema": {
|
||||||
|
"type": "object",
|
||||||
|
"required": ["url"],
|
||||||
|
"properties": {
|
||||||
|
"url": {"type": "string", "description": "URL of source news article"}
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"steps": [
|
||||||
|
{
|
||||||
|
"name": "search_news_sources",
|
||||||
|
"type": "tool",
|
||||||
|
"tool": "search_news_sources",
|
||||||
|
"input": {"url": {"from": "input.url"}},
|
||||||
|
"required_input_fields": ["url"],
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/scenarios/{scenario_id}",
|
||||||
|
summary="Get full scenario definition",
|
||||||
|
description="Returns the raw scenario JSON (including the `steps` graph) for UI visualization.",
|
||||||
|
responses={
|
||||||
|
200: {
|
||||||
|
"description": "Scenario definition",
|
||||||
|
"content": {"application/json": {"example": _SCENARIO_DEFINITION_EXAMPLE}},
|
||||||
|
},
|
||||||
|
404: {"description": "Unknown scenario_id"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
async def get_scenario(scenario_id: str) -> dict[str, Any]:
|
||||||
|
try:
|
||||||
|
return load_scenario_definition(scenario_id)
|
||||||
|
except ScenarioStoreError as exc:
|
||||||
|
raise HTTPException(status_code=404, detail=str(exc)) from exc
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tool catalog
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
@router.get(
|
||||||
|
"/tools",
|
||||||
|
response_model=list[ToolSummary],
|
||||||
|
summary="List MCP tools",
|
||||||
|
description="Proxies MCP `list_tools()` and returns name, description, and input schema for each tool.",
|
||||||
|
responses={502: {"description": "MCP transport error"}},
|
||||||
|
)
|
||||||
|
async def get_tools() -> list[ToolSummary]:
|
||||||
|
try:
|
||||||
|
tools = await list_mcp_tools()
|
||||||
|
except RuntimeError as exc:
|
||||||
|
logger.warning("Failed to fetch MCP tools: {}", exc)
|
||||||
|
raise HTTPException(status_code=502, detail=str(exc)) from exc
|
||||||
|
return [ToolSummary(**t) for t in tools]
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Helpers
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
def _require_run(run_id: str) -> RunRecord:
|
||||||
|
record = get_registry().get(run_id)
|
||||||
|
if record is None:
|
||||||
|
raise HTTPException(status_code=404, detail=f"Unknown run_id: {run_id}")
|
||||||
|
return record
|
||||||
|
|
||||||
|
|
||||||
|
def _snapshot_from_record(record: RunRecord) -> ScenarioRunResponse:
|
||||||
|
"""Build a partial ScenarioRunResponse for a still-running or pre-start run."""
|
||||||
|
steps: list[StepState] = []
|
||||||
|
for event in record.events:
|
||||||
|
if event.get("type") != "step_finished":
|
||||||
|
continue
|
||||||
|
steps.append(
|
||||||
|
StepState(
|
||||||
|
node_id=str(event.get("step_name", "")),
|
||||||
|
status=event.get("status", "failed"),
|
||||||
|
started_at=event.get("started_at"),
|
||||||
|
finished_at=event.get("finished_at"),
|
||||||
|
message=str(event.get("message", "")),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return ScenarioRunResponse(
|
||||||
|
scenario_id=record.scenario_id,
|
||||||
|
status=record.status,
|
||||||
|
message=record.message,
|
||||||
|
input=record.input,
|
||||||
|
steps=steps,
|
||||||
|
run_id=record.run_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _event_stream(record: RunRecord) -> AsyncIterator[bytes]:
|
||||||
|
"""Replay buffered events, then tail a fresh subscriber queue.
|
||||||
|
|
||||||
|
The snapshot/subscribe pair runs without any intervening ``await``, so no
|
||||||
|
emitted event can slip between the replay cutoff and the subscription.
|
||||||
|
Events emitted during replay land in the queue and are drained afterwards.
|
||||||
|
"""
|
||||||
|
queue: asyncio.Queue = asyncio.Queue()
|
||||||
|
buffered = list(record.events)
|
||||||
|
record.subscribers.append(queue)
|
||||||
|
try:
|
||||||
|
for event in buffered:
|
||||||
|
yield _format_sse(event)
|
||||||
|
if record.is_terminal():
|
||||||
|
return
|
||||||
|
while True:
|
||||||
|
event = await queue.get()
|
||||||
|
if event is None:
|
||||||
|
return
|
||||||
|
yield _format_sse(event)
|
||||||
|
finally:
|
||||||
|
if queue in record.subscribers:
|
||||||
|
record.subscribers.remove(queue)
|
||||||
|
|
||||||
|
|
||||||
|
def _format_sse(event: dict[str, Any]) -> bytes:
|
||||||
|
event_type = str(event.get("type", "message"))
|
||||||
|
payload = json.dumps(event, ensure_ascii=False)
|
||||||
|
return f"event: {event_type}\ndata: {payload}\n\n".encode("utf-8")
|
||||||
-43
@@ -1,43 +0,0 @@
|
|||||||
import argparse
|
|
||||||
import asyncio
|
|
||||||
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
|
|
||||||
from src.agent_runner import run_agent
|
|
||||||
|
|
||||||
|
|
||||||
def build_parser() -> argparse.ArgumentParser:
|
|
||||||
parser = argparse.ArgumentParser(
|
|
||||||
description="Run base chat agent.",
|
|
||||||
)
|
|
||||||
parser.add_argument(
|
|
||||||
"--message",
|
|
||||||
help="Single message mode. If omitted, starts interactive chat.",
|
|
||||||
)
|
|
||||||
return parser
|
|
||||||
|
|
||||||
|
|
||||||
async def _main() -> None:
|
|
||||||
load_dotenv()
|
|
||||||
args = build_parser().parse_args()
|
|
||||||
|
|
||||||
if args.message:
|
|
||||||
result = await run_agent(args.message)
|
|
||||||
print(result)
|
|
||||||
return
|
|
||||||
|
|
||||||
print("Chat mode started. Type 'exit' or 'quit' to stop.")
|
|
||||||
while True:
|
|
||||||
user_message = input("you> ").strip()
|
|
||||||
if not user_message:
|
|
||||||
continue
|
|
||||||
if user_message.lower() in {"exit", "quit"}:
|
|
||||||
print("Bye.")
|
|
||||||
break
|
|
||||||
|
|
||||||
result = await run_agent(user_message)
|
|
||||||
print(f"agent> {result}")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
asyncio.run(_main())
|
|
||||||
@@ -0,0 +1,96 @@
|
|||||||
|
"""Thin async client for MCP tool invocation over streamable HTTP.
|
||||||
|
|
||||||
|
Opens a short-lived ``ClientSession`` per call, wraps the tool response in
|
||||||
|
a normalized dict, and raises ``RuntimeError`` on transport/tool errors.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import timedelta
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
from mcp import ClientSession
|
||||||
|
from mcp.client.streamable_http import streamablehttp_client
|
||||||
|
from mcp.types import TextContent
|
||||||
|
|
||||||
|
|
||||||
|
def _mcp_url() -> str:
|
||||||
|
return os.getenv("MCP_BASE_URL", "http://127.0.0.1:8081/mcp")
|
||||||
|
|
||||||
|
|
||||||
|
def _timeout_seconds() -> float:
|
||||||
|
value = os.getenv("MCP_TIMEOUT_SECONDS")
|
||||||
|
if value is None:
|
||||||
|
return 10.0
|
||||||
|
return float(value)
|
||||||
|
|
||||||
|
|
||||||
|
async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
    """Invoke one MCP tool over streamable HTTP and return its dict payload.

    Opens a short-lived session per call (no connection pooling). Transport
    failures and timeouts are normalized into ``RuntimeError`` so callers
    handle one exception type.

    Raises:
        RuntimeError: on timeout, transport error, tool-reported error, or
            when the tool returns no JSON-object payload.
    """
    try:
        async with streamablehttp_client(url=_mcp_url()) as session_params:
            # The client yields (read, write, ...); only the streams are
            # needed here — trailing elements of the tuple are ignored.
            read, write = session_params[0:2]
            async with ClientSession(
                read,
                write,
                read_timeout_seconds=timedelta(seconds=_timeout_seconds()),
            ) as session:
                await session.initialize()
                result = await session.call_tool(tool_name, arguments)
    except TimeoutError as exc:
        logger.warning("MCP timeout: tool={}", tool_name)
        raise RuntimeError(f"MCP timeout: {tool_name}") from exc
    except Exception as exc:
        # Broad by design: any session/transport failure becomes RuntimeError.
        logger.exception("MCP transport error: tool={}", tool_name)
        raise RuntimeError(f"MCP transport error: {tool_name}") from exc

    # `result` is only reachable on the success path above.
    if result.isError:
        raise RuntimeError(f"MCP tool error: {tool_name}")

    # Prefer the structured payload when the server provides one.
    if isinstance(result.structuredContent, dict):
        return result.structuredContent

    # Fallback: first text content item that parses as a JSON object.
    for content_item in result.content:
        if not isinstance(content_item, TextContent):
            continue
        try:
            parsed = json.loads(content_item.text)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    raise RuntimeError(f"MCP tool returned invalid payload: {tool_name}")
|
||||||
|
|
||||||
|
|
||||||
|
async def list_mcp_tools() -> list[dict[str, Any]]:
    """Fetch the MCP tool catalog as plain dicts for API serialization.

    Mirrors :func:`call_mcp_tool`: a short-lived session per call, with
    timeouts and transport failures normalized to ``RuntimeError``.
    """
    try:
        async with streamablehttp_client(url=_mcp_url()) as session_params:
            # Only the read/write streams are used from the yielded tuple.
            read, write = session_params[0:2]
            async with ClientSession(
                read,
                write,
                read_timeout_seconds=timedelta(seconds=_timeout_seconds()),
            ) as session:
                await session.initialize()
                result = await session.list_tools()
    except TimeoutError as exc:
        logger.warning("MCP list_tools timeout")
        raise RuntimeError("MCP list_tools timeout") from exc
    except Exception as exc:
        logger.exception("MCP list_tools transport error")
        raise RuntimeError("MCP list_tools transport error") from exc

    # Flatten SDK tool objects into JSON-friendly dicts.
    tools: list[dict[str, Any]] = []
    for tool in result.tools:
        tools.append(
            {
                "name": tool.name,
                "description": tool.description,
                "input_schema": tool.inputSchema,
            }
        )
    return tools
|
||||||
@@ -0,0 +1,529 @@
|
|||||||
|
"""Builds and runs Agno workflows from JSON scenario definitions.
|
||||||
|
|
||||||
|
Each scenario step is a typed MCP tool call. The runner resolves argument
|
||||||
|
templates from ``session_state``, optionally lets an LLM planner repair
|
||||||
|
missing fields, invokes the tool, and collects per-step results back into
|
||||||
|
``session_state`` for downstream steps.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
from collections import OrderedDict
|
||||||
|
from copy import deepcopy
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Any, Awaitable, Callable
|
||||||
|
|
||||||
|
from agno.workflow.step import Step, StepInput, StepOutput
|
||||||
|
from agno.workflow.workflow import Workflow
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
from src.mcp_client import call_mcp_tool
|
||||||
|
from src.run_registry import EventEmitter, RunRecord
|
||||||
|
from src.schemas import (
|
||||||
|
RunFinishedEvent,
|
||||||
|
RunStartedEvent,
|
||||||
|
ScenarioRunResponse,
|
||||||
|
StepFinishedEvent,
|
||||||
|
StepStartedEvent,
|
||||||
|
StepState,
|
||||||
|
)
|
||||||
|
from src.scenario_store import ScenarioStoreError, load_scenario_definition
|
||||||
|
from src.step_planner import plan_arguments, planner_enabled
|
||||||
|
from src.template import (
|
||||||
|
missing_required_fields,
|
||||||
|
resolve_path,
|
||||||
|
resolve_template,
|
||||||
|
validate_required_fields,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _env_int(name: str, default: int) -> int:
|
||||||
|
value = os.getenv(name)
|
||||||
|
return int(value) if value is not None else default
|
||||||
|
|
||||||
|
|
||||||
|
def _utc_now_iso() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def _build_scope(session_state: dict[str, Any]) -> dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"input": session_state.get("input", {}),
|
||||||
|
"steps": session_state.get("steps", {}),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def _prepare_arguments(
    *,
    step_name: str,
    tool_name: str,
    base_arguments: dict[str, Any],
    required_fields: list[str],
    scope: dict[str, Any],
) -> dict[str, Any]:
    """Finalize tool arguments, letting the LLM planner repair missing fields.

    Starts from a deep copy of *base_arguments* (so the caller's template
    result is never mutated). When required fields are missing and the
    planner is enabled, up to ``PLANNER_REPAIR_ATTEMPTS`` repair rounds run,
    each feeding the previous attempt's output back in. Raises via
    ``validate_required_fields`` if fields are still missing afterwards.
    """
    final_arguments = deepcopy(base_arguments)
    missing = missing_required_fields(final_arguments, required_fields)
    if missing and planner_enabled():
        max_attempts = _env_int("PLANNER_REPAIR_ATTEMPTS", 3)
        for attempt in range(1, max_attempts + 1):
            # Each attempt builds on the previous one's (partial) repair.
            final_arguments = await plan_arguments(
                step_name=step_name,
                tool_name=tool_name,
                base_arguments=final_arguments,
                required_fields=required_fields,
                scope=scope,
                missing_fields=missing,
                attempt_no=attempt,
            )
            missing = missing_required_fields(final_arguments, required_fields)
            if not missing:
                break
    # Final gate: raises if required fields are still absent.
    validate_required_fields(final_arguments, required_fields, step_name)
    return final_arguments
|
||||||
|
|
||||||
|
|
||||||
|
async def _execute_one_call(
    *,
    step_name: str,
    tool_name: str,
    required_fields: list[str],
    input_template: Any,
    scope: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
    """Resolve one step's input template, finalize arguments, and call the tool.

    Returns:
        ``(arguments, tool_response)`` — the exact arguments sent and the
        normalized MCP response dict.
    """
    resolved = resolve_template(input_template, scope)
    # Non-dict template results are discarded; tool arguments must be a mapping.
    base_arguments = resolved if isinstance(resolved, dict) else {}
    arguments = await _prepare_arguments(
        step_name=step_name,
        tool_name=tool_name,
        base_arguments=base_arguments,
        required_fields=required_fields,
        scope=scope,
    )
    tool_response = await call_mcp_tool(tool_name, arguments)
    return arguments, tool_response
|
||||||
|
|
||||||
|
|
||||||
|
def _build_tool_executor(
    step_spec: dict[str, Any],
    step_index: int = 0,
) -> Callable[[StepInput, dict[str, Any]], Awaitable[StepOutput]]:
    """Create the async executor closure for one ``tool`` step.

    The step spec is parsed once at build time (tool name, input template,
    optional ``foreach`` fan-out, ``collect`` expression, required fields);
    the returned closure is what Agno invokes with the live session_state.
    On success the step's payload is written to ``session_state["steps"]``;
    on failure an error payload is recorded and RuntimeError is raised so
    the workflow stops (fail-fast).
    """
    step_name = str(step_spec["name"])
    tool_name = str(step_spec["tool"])
    input_template = step_spec.get("input", {})
    foreach_spec = step_spec.get("foreach")
    collect_template = step_spec.get("collect")
    # Key under which fan-out results are grouped; defaults to "items".
    collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
    required_fields = [
        f for f in step_spec.get("required_input_fields", []) if isinstance(f, str)
    ]

    # foreach may be a dict ({"from": ..., "as": ...}) or a bare source string.
    if isinstance(foreach_spec, dict):
        foreach_from = str(foreach_spec.get("from", "")).strip()
        item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
    else:
        foreach_from = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
        item_alias = "item"

    async def executor(_step_input: StepInput, session_state: dict[str, Any]) -> StepOutput:
        started_at = _utc_now_iso()
        scope = _build_scope(session_state)
        # Emitter/run_id are smuggled through session_state by run_scenario;
        # both are absent when the scenario runs without SSE streaming.
        emitter: EventEmitter | None = session_state.get("_emitter")
        run_id: str = session_state.get("_run_id", "")

        if emitter is not None:
            await emitter.emit(
                StepStartedEvent(
                    run_id=run_id,
                    step_name=step_name,
                    index=step_index,
                    started_at=started_at,
                ).model_dump()
            )

        try:
            if foreach_from:
                # Fan-out: one tool call per element of the source list.
                iterable = resolve_path(scope, foreach_from)
                if not isinstance(iterable, list):
                    raise ValueError(f"{step_name}: foreach source is not list")

                collected: list[Any] = []
                iteration_requests: list[dict[str, Any]] = []
                iteration_responses: list[dict[str, Any]] = []
                last_received_at: str | None = None
                for index, item in enumerate(iterable):
                    # Expose the item under its alias AND the generic names
                    # "item"/"index" so templates can use either.
                    iteration_scope = {**scope, item_alias: item, "item": item, "index": index}
                    arguments, tool_response = await _execute_one_call(
                        step_name=step_name,
                        tool_name=tool_name,
                        required_fields=required_fields,
                        input_template=input_template,
                        scope=iteration_scope,
                    )
                    iteration_requests.append(arguments)
                    iteration_responses.append(tool_response)
                    received_at = tool_response.get("received_at")
                    if isinstance(received_at, str) and received_at:
                        last_received_at = received_at
                    if collect_template is None:
                        # Default collection: the raw payload of each call.
                        collected.append(tool_response.get("payload", {}))
                    else:
                        # Custom collection: resolve the collect template with
                        # the full tool response available as "tool".
                        collected.append(
                            resolve_template(
                                collect_template,
                                {**iteration_scope, "tool": tool_response},
                            )
                        )

                step_payload = {
                    "ok": True,
                    "tool_name": tool_name,
                    "payload": {collect_key: collected},
                    "request": {
                        "foreach_from": foreach_from,
                        "count": len(iterable),
                        "items": iteration_requests,
                    },
                    "response": {"items": iteration_responses},
                    "received_at": last_received_at,
                    "started_at": started_at,
                    "finished_at": _utc_now_iso(),
                }
            else:
                # Single call: one template resolution, one tool invocation.
                arguments, tool_response = await _execute_one_call(
                    step_name=step_name,
                    tool_name=tool_name,
                    required_fields=required_fields,
                    input_template=input_template,
                    scope=scope,
                )
                step_payload = {
                    "ok": bool(tool_response.get("ok", True)),
                    "tool_name": tool_name,
                    "payload": tool_response.get("payload", {}),
                    "request": arguments,
                    "response": tool_response,
                    "received_at": tool_response.get("received_at"),
                    "started_at": started_at,
                    "finished_at": _utc_now_iso(),
                }

            # Record the result so downstream steps can template against it.
            session_state.setdefault("steps", {})[step_name] = step_payload
            if emitter is not None:
                await emitter.emit(
                    StepFinishedEvent(
                        run_id=run_id,
                        step_name=step_name,
                        index=step_index,
                        status="success",
                        started_at=started_at,
                        finished_at=step_payload["finished_at"],
                    ).model_dump()
                )
            return StepOutput(
                content=json.dumps(step_payload, ensure_ascii=False),
                success=True,
            )
        except Exception as exc:
            # Record the failure (for step-state reporting), emit the failed
            # event, then re-raise so the workflow aborts.
            finished_at = _utc_now_iso()
            error_payload = {
                "ok": False,
                "tool_name": tool_name,
                "error": str(exc),
                "started_at": started_at,
                "finished_at": finished_at,
            }
            session_state.setdefault("steps", {})[step_name] = error_payload
            logger.exception("Step {} failed (tool={})", step_name, tool_name)
            if emitter is not None:
                await emitter.emit(
                    StepFinishedEvent(
                        run_id=run_id,
                        step_name=step_name,
                        index=step_index,
                        status="failed",
                        started_at=started_at,
                        finished_at=finished_at,
                        message=str(exc),
                    ).model_dump()
                )
            raise RuntimeError(f"{step_name} failed: {exc}") from exc

    return executor
|
||||||
|
|
||||||
|
|
||||||
|
def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
    """Translate a scenario JSON definition into an Agno Workflow.

    Raises:
        ScenarioStoreError: when the definition is structurally invalid
            (no steps, a non-object step, a non-tool step, or a step without
            a usable name/tool).
    """
    declared_steps = scenario.get("steps")
    if not isinstance(declared_steps, list) or not declared_steps:
        raise ScenarioStoreError("Scenario must contain non-empty steps list")

    built: list[Step] = []
    for position, spec in enumerate(declared_steps):
        if not isinstance(spec, dict):
            raise ScenarioStoreError("Each scenario step must be object")
        if spec.get("type") != "tool":
            raise ScenarioStoreError("This runner supports only tool steps")

        name = str(spec.get("name", "")).strip()
        tool = str(spec.get("tool", name)).strip()
        if not name or not tool:
            raise ScenarioStoreError("Each tool step must contain non-empty name and tool")

        # Fail-fast by design: the run is considered successful only when every
        # step passes. There is no per-step retry or skip policy — downstream
        # steps rely on upstream output, so on any failure the workflow stops.
        step = Step(
            name=name,
            description=str(spec.get("description", name)),
            executor=_build_tool_executor(spec, step_index=position),
            max_retries=0,
            on_error="fail",
        )
        built.append(step)

    return Workflow(
        name=scenario_id,
        description=str(scenario.get("description", "")),
        steps=built,
    )
|
||||||
|
|
||||||
|
|
||||||
|
# Upper bound on cached compiled workflows; overridable via env.
_WORKFLOW_CACHE_MAX_SIZE = _env_int("WORKFLOW_CACHE_MAX_SIZE", 64)
# LRU cache: scenario_id -> compiled Workflow (most recently used at the end).
_workflow_cache: "OrderedDict[str, Workflow]" = OrderedDict()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
    """Return a cached Workflow for *scenario_id*, building on cache miss.

    The cache is a bounded LRU: a hit refreshes recency, and inserting past
    capacity evicts the least recently used entry.
    """
    hit = _workflow_cache.get(scenario_id)
    if hit is not None:
        # Refresh recency so frequently used scenarios stay cached.
        _workflow_cache.move_to_end(scenario_id)
        return hit

    built = _build_workflow(scenario_id, scenario)
    _workflow_cache[scenario_id] = built
    if len(_workflow_cache) > _WORKFLOW_CACHE_MAX_SIZE:
        oldest_id, _ = _workflow_cache.popitem(last=False)
        logger.debug("Evicted workflow from LRU cache: {}", oldest_id)
    return built
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_output_summary(result: dict[str, Any] | None) -> str | None:
|
||||||
|
if not isinstance(result, dict):
|
||||||
|
return None
|
||||||
|
summary = result.get("summary")
|
||||||
|
if isinstance(summary, str) and summary:
|
||||||
|
return summary
|
||||||
|
payload = result.get("payload")
|
||||||
|
if isinstance(payload, dict):
|
||||||
|
payload_summary = payload.get("summary")
|
||||||
|
if isinstance(payload_summary, str) and payload_summary:
|
||||||
|
return payload_summary
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _build_step_states(
    scenario: dict[str, Any],
    steps_payloads: dict[str, Any],
) -> list[StepState]:
    """Map each declared scenario step to a StepState for the API response.

    A step with no recorded payload is reported as ``skipped`` (the workflow
    aborted before it ran); otherwise status follows the payload's ``ok`` flag.
    Steps without a usable name are ignored.
    """
    raw_steps = scenario.get("steps")
    if not isinstance(raw_steps, list):
        return []

    states: list[StepState] = []
    for raw_step in raw_steps:
        if not isinstance(raw_step, dict):
            continue
        name = str(raw_step.get("name", "")).strip()
        if not name:
            continue
        payload = steps_payloads.get(name)
        if not isinstance(payload, dict):
            # Workflow aborted before this step ran (strict fail-fast policy).
            states.append(
                StepState(
                    node_id=name,
                    status="skipped",
                    message="",
                )
            )
            continue
        ok = bool(payload.get("ok", False))
        states.append(
            StepState(
                node_id=name,
                status="success" if ok else "failed",
                # Empty timestamp strings are normalized to None.
                started_at=str(payload.get("started_at") or "") or None,
                finished_at=str(payload.get("finished_at") or "") or None,
                message="" if ok else str(payload.get("error", f"{name} failed")),
            )
        )
    return states
|
||||||
|
|
||||||
|
|
||||||
|
async def run_scenario(
    *,
    scenario_id: str,
    input_data: dict[str, Any],
    emitter: EventEmitter | None = None,
    run_id: str = "",
) -> ScenarioRunResponse:
    """Load, build, and execute one scenario; always returns a response.

    Never raises for scenario-level failures: load/build/run errors are
    folded into a ``status="failed"`` ScenarioRunResponse so HTTP callers
    get a uniform shape.
    """
    try:
        scenario = load_scenario_definition(scenario_id)
    except ScenarioStoreError as exc:
        # Unknown or invalid scenario: report failure without running.
        return ScenarioRunResponse(
            scenario_id=scenario_id,
            status="failed",
            message=str(exc),
            input=input_data,
            run_id=run_id or None,
        )

    scenario_name = str(scenario.get("name", scenario_id))
    try:
        workflow = _get_workflow(scenario_id, scenario)
    except ScenarioStoreError as exc:
        return ScenarioRunResponse(
            scenario_id=scenario_id,
            status="failed",
            message=str(exc),
            input=input_data,
            scenario_name=scenario_name,
            run_id=run_id or None,
        )

    # Fresh per-run state that Agno owns during arun(..., session_state=...).
    # The underscore keys carry SSE plumbing into the step executors.
    session_state: dict[str, Any] = {
        "input": deepcopy(input_data),
        "steps": {},
        "_emitter": emitter,
        "_run_id": run_id,
    }

    workflow_error: str | None = None
    run_output: Any = None
    try:
        run_output = await workflow.arun(
            input=input_data,
            session_state=session_state,
        )
    except Exception as exc:
        # Capture the failure; step payloads recorded so far still get reported.
        workflow_error = str(exc)
        logger.exception("Workflow {} failed", scenario_id)

    steps_payloads = session_state.get("steps", {}) or {}
    step_states = _build_step_states(scenario, steps_payloads)

    # Strict invariant: the run is a success only when every recorded step
    # payload has ok=true. Steps are built with on_error="fail" (fail-fast),
    # so this scan is a belt-and-braces guard against a step that recorded
    # ok=false without raising, plus Agno's own success flag.
    status = "success"
    if workflow_error is not None:
        status = "failed"
    else:
        for payload in steps_payloads.values():
            if isinstance(payload, dict) and not bool(payload.get("ok", False)):
                status = "failed"
                break
        if run_output is not None and not bool(getattr(run_output, "success", True)):
            status = "failed"

    # Result resolution order: workflow content (JSON-decoded when possible),
    # then the last recorded step payload, then the workflow error message.
    content = getattr(run_output, "content", None)
    if isinstance(content, str):
        try:
            content = json.loads(content)
        except json.JSONDecodeError:
            content = {"raw_content": content}
    if content is None:
        for payload in reversed(list(steps_payloads.values())):
            if isinstance(payload, dict):
                content = deepcopy(payload)
                break
    if content is None and workflow_error is not None:
        content = {"message": workflow_error}

    result = content if isinstance(content, dict) else {"raw_content": content}
    response_message = "" if status == "success" else (workflow_error or "failed")

    return ScenarioRunResponse(
        scenario_id=scenario_id,
        status=status,
        message=response_message,
        input=input_data,
        steps=step_states,
        output_summary=_extract_output_summary(result),
        scenario_name=scenario_name,
        workflow_name=workflow.name,
        result=result,
        # Prefer the caller-supplied run_id; fall back to Agno's, then None.
        run_id=run_id or str(getattr(run_output, "run_id", "")) or None,
        session_id=str(getattr(run_output, "session_id", "")) or None,
    )
|
||||||
|
|
||||||
|
|
||||||
|
async def run_scenario_async(record: RunRecord) -> None:
    """Execute a scenario inside a background task, emitting SSE events.

    Lifecycle:
    queued → running (run_started) → success|failed (run_finished) → sentinel

    The record is mutated in place (status, message, timestamps, response);
    the emitter is always closed in ``finally`` so SSE subscribers receive
    the terminal ``None`` sentinel on every path.
    """
    emitter = EventEmitter(record)
    record.status = "running"
    record.started_at = _utc_now_iso()

    try:
        await emitter.emit(
            RunStartedEvent(
                run_id=record.run_id,
                scenario_id=record.scenario_id,
                started_at=record.started_at,
            ).model_dump()
        )

        response = await run_scenario(
            scenario_id=record.scenario_id,
            input_data=record.input,
            emitter=emitter,
            run_id=record.run_id,
        )

        # Persist the outcome on the record before announcing it.
        record.response = response
        record.status = response.status
        record.message = response.message
        record.finished_at = _utc_now_iso()

        await emitter.emit(
            RunFinishedEvent(
                run_id=record.run_id,
                status=response.status,
                finished_at=record.finished_at,
                message=response.message,
            ).model_dump()
        )
    except asyncio.CancelledError:
        # Task cancellation: mark the run failed, notify subscribers, and
        # re-raise so cancellation propagates as asyncio expects.
        record.status = "failed"
        record.message = "cancelled"
        record.finished_at = _utc_now_iso()
        await emitter.emit(
            RunFinishedEvent(
                run_id=record.run_id,
                status="failed",
                finished_at=record.finished_at,
                message="cancelled",
            ).model_dump()
        )
        raise
    except Exception as exc:
        # run_scenario already absorbs scenario-level failures, so reaching
        # here means the runner itself crashed; record and report it.
        logger.exception("Run {} crashed", record.run_id)
        record.status = "failed"
        record.message = str(exc)
        record.finished_at = _utc_now_iso()
        await emitter.emit(
            RunFinishedEvent(
                run_id=record.run_id,
                status="failed",
                finished_at=record.finished_at,
                message=str(exc),
            ).model_dump()
        )
    finally:
        # Always deliver the terminal sentinel so SSE consumers exit cleanly.
        await emitter.close()
|
||||||
@@ -0,0 +1,50 @@
|
|||||||
|
"""Phoenix (Arize) OpenTelemetry tracing setup.
|
||||||
|
|
||||||
|
Tracing is initialized via the FastAPI lifespan so that import-time side effects
|
||||||
|
stay out of module load. ``is_phoenix_tracing_enabled`` is cheap and can be
|
||||||
|
consulted before the app starts (for example, to pass a flag into AgentOS).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
from phoenix.otel import register
|
||||||
|
|
||||||
|
_initialized = False
|
||||||
|
|
||||||
|
|
||||||
|
def _env_bool(name: str, default: bool) -> bool:
|
||||||
|
value = os.getenv(name)
|
||||||
|
if value is None:
|
||||||
|
return default
|
||||||
|
return value.strip().lower() in {"1", "true", "yes", "on"}
|
||||||
|
|
||||||
|
|
||||||
|
def is_phoenix_tracing_enabled() -> bool:
    """Cheap feature-flag check; safe to call before tracing is initialized."""
    return _env_bool("PHOENIX_TRACING_ENABLED", False)
|
||||||
|
|
||||||
|
|
||||||
|
def init_phoenix_tracing() -> bool:
    """Initialize Phoenix OpenTelemetry tracing once per process.

    Returns True when tracing is (or already was) initialized, False when the
    PHOENIX_TRACING_ENABLED flag is off. Idempotent via the module-level
    ``_initialized`` guard; not thread-safe — intended to be called from the
    single-threaded FastAPI lifespan.
    """
    global _initialized

    if not is_phoenix_tracing_enabled():
        return False

    if _initialized:
        return True

    # phoenix.otel reads the collector endpoint from the environment, so pin
    # it (with a local default) before calling register().
    os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = os.getenv(
        "PHOENIX_COLLECTOR_ENDPOINT",
        "http://localhost:6006",
    )

    project_name = os.getenv("PHOENIX_PROJECT_NAME", "prisma-platform")
    register(
        project_name=project_name,
        auto_instrument=True,
    )
    _initialized = True
    logger.info("Phoenix tracing initialized (project={})", project_name)
    return True
|
||||||
@@ -0,0 +1,126 @@
|
|||||||
|
"""In-memory registry of scenario runs and their event streams.
|
||||||
|
|
||||||
|
Each submitted run gets a ``RunRecord`` holding:
|
||||||
|
|
||||||
|
- mutable status / partial step state updated by the workflow runner;
|
||||||
|
- an append-only event log (used to replay history to late SSE subscribers);
|
||||||
|
- a live asyncio queue that SSE endpoints tail until a terminal ``None``
|
||||||
|
sentinel is delivered.
|
||||||
|
|
||||||
|
The registry is an LRU with ``RUN_REGISTRY_MAX_SIZE`` bound so long-running
|
||||||
|
processes do not leak run history.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import uuid
|
||||||
|
from collections import OrderedDict
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
from src.schemas import ScenarioRunResponse
|
||||||
|
|
||||||
|
|
||||||
|
_TERMINAL_STATUSES = {"success", "failed"}
|
||||||
|
|
||||||
|
|
||||||
|
def _utc_now_iso() -> str:
|
||||||
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def _env_int(name: str, default: int) -> int:
|
||||||
|
value = os.getenv(name)
|
||||||
|
return int(value) if value is not None else default
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class RunRecord:
    """Mutable bookkeeping for one submitted scenario run."""

    # Unique identifier assigned at creation (uuid4 string).
    run_id: str
    scenario_id: str
    # Raw input payload the run was submitted with.
    input: dict[str, Any]
    # Lifecycle: queued -> running -> success | failed.
    status: str = "queued"
    started_at: str = field(default_factory=_utc_now_iso)
    finished_at: str | None = None
    message: str = ""
    # Final response, set once the run completes.
    response: ScenarioRunResponse | None = None
    # Append-only event log, replayed to late SSE subscribers.
    events: list[dict[str, Any]] = field(default_factory=list)
    # Live queues of currently attached SSE consumers.
    subscribers: list[asyncio.Queue] = field(default_factory=list)
    # Background task executing the run, if any (guards LRU eviction).
    task: asyncio.Task | None = None

    def is_terminal(self) -> bool:
        """True once the run reached a final status (success or failed)."""
        return self.status in _TERMINAL_STATUSES
|
||||||
|
|
||||||
|
|
||||||
|
class EventEmitter:
    """Broadcast run events to live SSE subscribers and the replay log.

    Every emitted event is appended to the record's event history first (so
    late subscribers can replay it), then pushed to each attached
    ``asyncio.Queue``. ``close`` delivers the ``None`` sentinel that tells
    SSE consumers to stop reading.
    """

    def __init__(self, record: RunRecord) -> None:
        self._run = record

    async def emit(self, event: dict[str, Any]) -> None:
        """Record *event* and push it to every currently attached queue."""
        self._run.events.append(event)
        for subscriber in tuple(self._run.subscribers):
            subscriber.put_nowait(event)

    async def close(self) -> None:
        """Send the terminal ``None`` sentinel to all subscribers."""
        for subscriber in tuple(self._run.subscribers):
            subscriber.put_nowait(None)
|
||||||
|
|
||||||
|
|
||||||
|
class RunRegistry:
    """Bounded LRU store of RunRecords keyed by run_id."""

    def __init__(self, max_size: int | None = None) -> None:
        # OrderedDict keeps insertion/access order for LRU eviction.
        self._records: "OrderedDict[str, RunRecord]" = OrderedDict()
        self._max_size = max_size or _env_int("RUN_REGISTRY_MAX_SIZE", 200)

    def create(self, *, scenario_id: str, input_data: dict[str, Any]) -> RunRecord:
        """Register a new queued run and return its record."""
        run_id = str(uuid.uuid4())
        record = RunRecord(
            run_id=run_id,
            scenario_id=scenario_id,
            input=input_data,
        )
        self._records[run_id] = record
        self._evict_if_needed()
        logger.info("Run {} created for scenario={}", run_id, scenario_id)
        return record

    def get(self, run_id: str) -> RunRecord | None:
        """Look up a run; a hit refreshes its LRU recency."""
        record = self._records.get(run_id)
        if record is not None:
            self._records.move_to_end(run_id)
        return record

    def list_active(self) -> list[RunRecord]:
        """All runs that have not yet reached a terminal status."""
        return [r for r in self._records.values() if not r.is_terminal()]

    def _evict_if_needed(self) -> None:
        """Drop oldest records past capacity, never an in-flight run."""
        while len(self._records) > self._max_size:
            evicted_id, evicted = self._records.popitem(last=False)
            if evicted.task is not None and not evicted.task.done():
                # Refuse to silently drop an in-flight run — re-insert and stop.
                self._records[evicted_id] = evicted
                self._records.move_to_end(evicted_id, last=False)
                logger.warning(
                    "Run registry at capacity {} but oldest run is still active; "
                    "not evicting {}",
                    self._max_size,
                    evicted_id,
                )
                return
            logger.debug("Evicted run {} from registry", evicted_id)
|
||||||
|
|
||||||
|
|
||||||
|
# Process-wide singleton registry shared by API endpoints and runners.
_registry = RunRegistry()


def get_registry() -> RunRegistry:
    """Return the process-wide RunRegistry singleton."""
    return _registry
|
||||||
@@ -0,0 +1,87 @@
|
|||||||
|
"""File-backed loader for scenario definitions.
|
||||||
|
|
||||||
|
Scenarios live under ``scenarios/`` and are indexed by ``scenarios/index.json``.
|
||||||
|
Each scenario is a JSON object with a ``scenario_id`` that must match the
|
||||||
|
index key it was looked up by.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
class ScenarioStoreError(ValueError):
    """Raised when scenario definitions are missing or invalid.

    Subclasses ValueError so callers that treat bad scenario input as a
    plain value error keep working.
    """
|
||||||
|
|
||||||
|
|
||||||
|
# Scenario JSON files live next to the package, under ``scenarios/``.
_SCENARIOS_ROOT = Path(__file__).resolve().parent.parent / "scenarios"
# Catalog file mapping scenario_id -> relative scenario file path.
_INDEX_PATH = _SCENARIOS_ROOT / "index.json"
|
||||||
|
|
||||||
|
|
||||||
|
def _read_json(path: Path) -> dict[str, Any]:
|
||||||
|
try:
|
||||||
|
raw = path.read_text(encoding="utf-8")
|
||||||
|
except FileNotFoundError as exc:
|
||||||
|
raise ScenarioStoreError(f"Scenario file not found: {path}") from exc
|
||||||
|
|
||||||
|
try:
|
||||||
|
parsed = json.loads(raw)
|
||||||
|
except json.JSONDecodeError as exc:
|
||||||
|
raise ScenarioStoreError(f"Invalid JSON in file: {path}") from exc
|
||||||
|
|
||||||
|
if not isinstance(parsed, dict):
|
||||||
|
raise ScenarioStoreError(f"JSON root must be object: {path}")
|
||||||
|
return parsed
|
||||||
|
|
||||||
|
|
||||||
|
def load_scenario_index() -> dict[str, str]:
    """Return the ``scenario_id -> relative path`` mapping from index.json."""
    index = _read_json(_INDEX_PATH)
    scenarios = index.get("scenarios")
    if not isinstance(scenarios, dict):
        raise ScenarioStoreError("index.json must contain object field 'scenarios'")

    # Validate every entry up front, then hand back a defensive copy.
    for scenario_id, relative_path in scenarios.items():
        if not isinstance(scenario_id, str) or not isinstance(relative_path, str):
            raise ScenarioStoreError("index.json scenario entries must be string -> string")
    return dict(scenarios)
|
||||||
|
|
||||||
|
|
||||||
|
def load_scenario_definition(scenario_id: str) -> dict[str, Any]:
    """Load the full scenario JSON object for *scenario_id*.

    Raises:
        ScenarioStoreError: if the id is unknown, the index entry points
            outside the scenarios directory, the file is unreadable/invalid,
            or the file's declared ``scenario_id`` does not match.
    """
    index = load_scenario_index()
    relative_path = index.get(scenario_id)
    if relative_path is None:
        raise ScenarioStoreError(f"Unknown scenario_id: {scenario_id}")

    scenario_path = (_SCENARIOS_ROOT / relative_path).resolve()
    # Refuse index entries that escape the scenarios root (e.g. "../../x");
    # _SCENARIOS_ROOT is already resolved at module import time.
    if not scenario_path.is_relative_to(_SCENARIOS_ROOT):
        raise ScenarioStoreError(f"Scenario path escapes scenarios root: {relative_path}")
    scenario = _read_json(scenario_path)

    declared_id = scenario.get("scenario_id")
    # The file must agree with the key it was looked up by (see module docstring).
    if declared_id != scenario_id:
        raise ScenarioStoreError(
            "Scenario file scenario_id does not match requested scenario_id"
        )
    return scenario
|
||||||
|
|
||||||
|
|
||||||
|
def list_scenario_summaries() -> list[dict[str, Any]]:
    """Return metadata for every scenario in the index (no steps)."""
    catalog: list[dict[str, Any]] = []
    for sid in load_scenario_index():
        try:
            definition = load_scenario_definition(sid)
        except ScenarioStoreError:
            # Broken entry in the index should not take the whole catalog down.
            continue
        catalog.append(
            {
                "scenario_id": sid,
                "name": definition.get("name"),
                "description": definition.get("description"),
                "input_schema": definition.get("input_schema"),
            }
        )
    return catalog
|
||||||
+278
@@ -0,0 +1,278 @@
|
|||||||
|
"""Pydantic schemas for the scenario-run REST API."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Literal
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field
|
||||||
|
|
||||||
|
# Closed sets of lifecycle states for a whole run and for individual steps,
# and the discriminator values used by the SSE event models below.
RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
StepStatus = Literal["queued", "running", "success", "failed", "skipped", "waiting_human"]
EventType = Literal["run_started", "step_started", "step_finished", "run_finished"]
|
||||||
|
|
||||||
|
|
||||||
|
class ScenarioRunRequest(BaseModel):
    """Request body for submitting a scenario run."""

    # Default scenario used when the client omits scenario_id.
    scenario_id: str = "news_source_discovery_v1"
    # Free-form scenario input payload.
    input: dict[str, Any] = Field(default_factory=dict)

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "scenario_id": "news_source_discovery_v1",
                    "input": {"url": "https://example.com/news/article"},
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
class StepState(BaseModel):
    """Status snapshot of a single step within a scenario run."""

    node_id: str
    status: StepStatus
    # Timestamps are ISO-8601 strings; None until the step starts/finishes.
    started_at: str | None = None
    finished_at: str | None = None
    message: str = ""

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "node_id": "search_news_sources",
                    "status": "success",
                    "started_at": "2026-04-24T09:27:59.875680+00:00",
                    "finished_at": "2026-04-24T09:28:00.028730+00:00",
                    "message": "",
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
class ScenarioRunResponse(BaseModel):
    """Full state of a scenario run, including per-step states and the result."""

    scenario_id: str
    status: RunStatus
    message: str = ""
    input: dict[str, Any]
    steps: list[StepState] = Field(default_factory=list)
    output_summary: str | None = None
    scenario_name: str | None = None
    workflow_name: str | None = None
    # Raw result payload of the run; shape depends on the scenario.
    result: dict[str, Any] | None = None
    run_id: str | None = None
    session_id: str | None = None

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "scenario_id": "news_source_discovery_v1",
                    "status": "success",
                    "message": "",
                    "input": {"url": "https://example.com/news/article"},
                    "steps": [
                        {
                            "node_id": "search_news_sources",
                            "status": "success",
                            "started_at": "2026-04-24T09:27:59.875680+00:00",
                            "finished_at": "2026-04-24T09:28:00.028730+00:00",
                            "message": "",
                        },
                        {
                            "node_id": "generate_summary",
                            "status": "success",
                            "started_at": "2026-04-24T09:28:00.781744+00:00",
                            "finished_at": "2026-04-24T09:28:00.879028+00:00",
                            "message": "",
                        },
                    ],
                    "output_summary": "Самым ранним источником считается https://news-a.example/article-1",
                    "scenario_name": "News Source Discovery V1",
                    "workflow_name": "news_source_discovery_v1",
                    "result": {
                        "ok": True,
                        "tool_name": "generate_summary",
                        "payload": {
                            "input_count": 3,
                            "summary": "Самым ранним источником считается https://news-a.example/article-1",
                        },
                    },
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "session_id": None,
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
class RunSubmitResponse(BaseModel):
    """Acknowledgement returned when a run is submitted."""

    run_id: str
    scenario_id: str
    status: RunStatus
    input: dict[str, Any]
    # ISO-8601 submission timestamp.
    started_at: str

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "scenario_id": "news_source_discovery_v1",
                    "status": "queued",
                    "input": {"url": "https://example.com/news/article"},
                    "started_at": "2026-04-24T09:27:59.873049+00:00",
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
class ScenarioSummary(BaseModel):
    """Catalog entry describing a scenario (metadata only, no steps)."""

    scenario_id: str
    name: str | None = None
    description: str | None = None
    # JSON Schema for the scenario's expected input.
    input_schema: dict[str, Any] | None = None

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "scenario_id": "news_source_discovery_v1",
                    "name": "News Source Discovery V1",
                    "description": "Find earliest news source using sequential MCP tools.",
                    "input_schema": {
                        "type": "object",
                        "required": ["url"],
                        "properties": {
                            "url": {
                                "type": "string",
                                "description": "URL of source news article",
                            }
                        },
                    },
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
class ToolSummary(BaseModel):
    """Catalog entry describing an MCP tool and its input schema."""

    name: str
    description: str | None = None
    input_schema: dict[str, Any] | None = None

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "search_news_sources",
                    "description": "Search for candidate news source URLs for a given article.",
                    "input_schema": {
                        "type": "object",
                        "required": ["url"],
                        "properties": {
                            "url": {"type": "string", "title": "Url"}
                        },
                        "title": "search_news_sourcesArguments",
                    },
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# SSE event models. Client parses by the `type` field.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class RunStartedEvent(BaseModel):
    """SSE event: a run has started. Discriminated by ``type``."""

    type: Literal["run_started"] = "run_started"
    run_id: str
    scenario_id: str
    started_at: str

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "type": "run_started",
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "scenario_id": "news_source_discovery_v1",
                    "started_at": "2026-04-24T09:27:59.873397+00:00",
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
class StepStartedEvent(BaseModel):
    """SSE event: a step has started. Discriminated by ``type``."""

    type: Literal["step_started"] = "step_started"
    run_id: str
    step_name: str
    # Zero-based position of the step within the run.
    index: int
    started_at: str

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "type": "step_started",
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "step_name": "search_news_sources",
                    "index": 0,
                    "started_at": "2026-04-24T09:27:59.875680+00:00",
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
class StepFinishedEvent(BaseModel):
    """SSE event: a step has finished (in any terminal status)."""

    type: Literal["step_finished"] = "step_finished"
    run_id: str
    step_name: str
    # Zero-based position of the step within the run.
    index: int
    status: StepStatus
    started_at: str | None = None
    finished_at: str | None = None
    message: str = ""

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "type": "step_finished",
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "step_name": "search_news_sources",
                    "index": 0,
                    "status": "success",
                    "started_at": "2026-04-24T09:27:59.875680+00:00",
                    "finished_at": "2026-04-24T09:28:00.028730+00:00",
                    "message": "",
                }
            ]
        }
    }
|
||||||
|
|
||||||
|
|
||||||
|
class RunFinishedEvent(BaseModel):
    """SSE event: a run has finished (in any terminal status)."""

    type: Literal["run_finished"] = "run_finished"
    run_id: str
    status: RunStatus
    finished_at: str
    message: str = ""

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "type": "run_finished",
                    "run_id": "76d6903c-f520-4a40-b0fc-8fed3f7955d2",
                    "status": "success",
                    "finished_at": "2026-04-24T09:28:01.750206+00:00",
                    "message": "",
                }
            ]
        }
    }
|
||||||
@@ -0,0 +1,145 @@
|
|||||||
|
"""LLM-backed fallback planner for MCP tool arguments.
|
||||||
|
|
||||||
|
When a step's resolved arguments are missing required fields, this module
|
||||||
|
calls an OpenAI-compatible chat completion to fill them from the current
|
||||||
|
scope (``input`` + prior ``steps``). The planner is best-effort: on any
|
||||||
|
failure it returns the base arguments unchanged so the caller's validator
|
||||||
|
can produce a clean error.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
from openai import AsyncOpenAI
|
||||||
|
|
||||||
|
|
||||||
|
# Lazily constructed module-wide client; access via _get_client().
_planner_client: AsyncOpenAI | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def _env_float(name: str, default: float) -> float:
    """Read environment variable *name* as a float.

    Returns *default* when the variable is unset or cannot be parsed: the
    planner is best-effort (see module docstring), so a malformed override
    should degrade to the default rather than raise.
    """
    value = os.getenv(name)
    if value is None:
        return default
    try:
        return float(value)
    except ValueError:
        return default
|
||||||
|
|
||||||
|
|
||||||
|
def planner_enabled() -> bool:
    """Whether the LLM fallback planner is switched on via PLANNER_ENABLED."""
    flag = os.getenv("PLANNER_ENABLED", "false")
    return flag.strip().lower() in {"1", "true", "yes"}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_client() -> AsyncOpenAI:
    """Return the module-wide AsyncOpenAI client, creating it on first use."""
    global _planner_client
    if _planner_client is None:
        _planner_client = AsyncOpenAI(
            base_url=os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1"),
            # Prefer the Polza key; fall back to the generic OpenAI key.
            api_key=os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY"),
        )
    return _planner_client
|
||||||
|
|
||||||
|
|
||||||
|
def _response_schema(required_fields: list[str]) -> dict[str, Any]:
    """Build a strict json_schema response_format payload for the planner call."""
    # Each required field may hold any JSON value, including null.
    any_json_value = {"type": ["string", "number", "boolean", "array", "object", "null"]}
    arguments_schema = {
        "type": "object",
        "properties": {name: any_json_value for name in required_fields},
        "required": required_fields,
        "additionalProperties": True,
    }
    return {
        "name": "mcp_arguments",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {"arguments": arguments_schema},
            "required": ["arguments"],
            "additionalProperties": False,
        },
    }
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_arguments(content: Any) -> dict[str, Any]:
    """Best-effort extraction of an arguments dict from a model reply.

    Accepts a dict directly, or a string of JSON optionally wrapped in a
    markdown code fence. Returns {} when nothing usable can be extracted.
    """
    payload: Any = content
    if isinstance(payload, str):
        stripped = payload.strip()
        # Unwrap an optional ``` / ```json fence around the JSON body.
        if stripped.startswith("```"):
            stripped = stripped.strip("`").strip()
            if stripped.startswith("json"):
                stripped = stripped[4:].strip()
        try:
            payload = json.loads(stripped)
        except json.JSONDecodeError:
            return {}
    if not isinstance(payload, dict):
        return {}
    inner = payload.get("arguments")
    return inner if isinstance(inner, dict) else payload
|
||||||
|
|
||||||
|
|
||||||
|
async def plan_arguments(
    *,
    step_name: str,
    tool_name: str,
    base_arguments: dict[str, Any],
    required_fields: list[str],
    scope: dict[str, Any],
    missing_fields: list[str],
    attempt_no: int,
) -> dict[str, Any]:
    """Fallback planner: asks an LLM to fill missing required fields from context.

    Args:
        step_name: Scenario step being prepared (used in the prompt and logs).
        tool_name: MCP tool whose arguments are being planned.
        base_arguments: Arguments already resolved from the scenario template.
        required_fields: All required argument names for the tool.
        scope: Current run scope; only ``input`` and ``steps`` are forwarded.
        missing_fields: Required fields that are still empty.
        attempt_no: Repair attempt number, forwarded to the model.

    Returns:
        Merged arguments (base + planned). On any failure returns base_arguments
        unchanged — caller is responsible for validating required fields afterwards.
    """
    prompt = {
        "task": "Prepare MCP arguments for this step.",
        "step_name": step_name,
        "tool_name": tool_name,
        "required_fields": required_fields,
        "base_arguments": base_arguments,
        "missing_fields": missing_fields,
        "repair_attempt": attempt_no,
        "context": {"input": scope.get("input", {}), "steps": scope.get("steps", {})},
        "output": (
            "Return only JSON object with key 'arguments'. "
            "Fill every missing field from context."
        ),
    }
    try:
        completion = await _get_client().chat.completions.create(
            model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a tool-input planner. "
                        "Return only JSON that matches the provided schema."
                    ),
                },
                {"role": "user", "content": json.dumps(prompt, ensure_ascii=False)},
            ],
            # Constrain the reply to a strict JSON schema over the required fields.
            response_format={"type": "json_schema", "json_schema": _response_schema(required_fields)},
            temperature=_env_float("POLZA_TEMPERATURE", 0.0),
        )
        raw = completion.choices[0].message.content if completion.choices else ""
        planned = _extract_arguments(raw)
    except Exception:
        # Best-effort: a planner failure must never break the run. Attach the
        # traceback so misconfigured endpoints/models are diagnosable from logs.
        logger.opt(exception=True).warning(
            "Planner call failed for step={} tool={} attempt={}",
            step_name,
            tool_name,
            attempt_no,
        )
        planned = {}

    # Planned values win over template-resolved ones; base is never mutated.
    merged = deepcopy(base_arguments)
    if isinstance(planned, dict):
        merged.update(planned)
    return merged
|
||||||
@@ -0,0 +1,58 @@
|
|||||||
|
"""Variable templating for scenario step inputs.
|
||||||
|
|
||||||
|
A dict of shape ``{"from": "path.to.value"}`` resolves to the value at that
|
||||||
|
dotted path in the current scope. Nested dicts/lists are resolved
|
||||||
|
recursively; plain values pass through via ``deepcopy``.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_path(scope: dict[str, Any], path: str) -> Any:
    """Walk a dotted *path* through nested dicts in *scope*.

    Segments are stripped of whitespace and empty segments are skipped.
    Returns a deep copy of the value, or None when the path cannot be
    followed (missing key, or a non-dict encountered mid-path).
    """
    current: Any = scope
    for raw_segment in path.split("."):
        segment = raw_segment.strip()
        if not segment:
            continue
        if not isinstance(current, dict):
            return None
        current = current.get(segment)
    return deepcopy(current)
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_template(template: Any, scope: dict[str, Any]) -> Any:
    """Recursively resolve ``{"from": path}`` references against *scope*.

    Only a dict whose sole key is ``"from"`` is treated as a reference;
    other dicts and lists are resolved element-wise, and plain values are
    deep-copied so callers never alias the template.
    """
    if isinstance(template, dict):
        if set(template.keys()) == {"from"}:
            return resolve_path(scope, str(template["from"]))
        return {name: resolve_template(item, scope) for name, item in template.items()}
    if isinstance(template, list):
        resolved = []
        for item in template:
            resolved.append(resolve_template(item, scope))
        return resolved
    return deepcopy(template)
|
||||||
|
|
||||||
|
|
||||||
|
def missing_required_fields(
    arguments: dict[str, Any],
    required_fields: list[str],
) -> list[str]:
    """Return required field names that are absent or effectively empty.

    A field is missing when its value is None, an empty container, or a
    string that is empty or whitespace-only. Falsy non-empty values such as
    0 and False count as present.
    """
    missing: list[str] = []
    for field in required_fields:
        value = arguments.get(field)
        if isinstance(value, str):
            # Whitespace-only strings count as missing (the original let
            # them slip through the generic emptiness check below).
            if not value.strip():
                missing.append(field)
            continue
        if value is None or value in ("", [], {}):
            missing.append(field)
    return missing
|
||||||
|
|
||||||
|
|
||||||
|
def validate_required_fields(
    arguments: dict[str, Any],
    required_fields: list[str],
    step_name: str,
) -> None:
    """Raise ValueError naming *step_name* when required fields are missing."""
    missing = missing_required_fields(arguments, required_fields)
    if not missing:
        return
    raise ValueError(f"{step_name}: missing required fields: {', '.join(missing)}")
|
||||||
Reference in New Issue
Block a user