Compare commits

..

11 Commits

Author SHA1 Message Date
Barabashka 4d037e52eb Упрощение MCP workflow runner и обновить контракт /api/runs.
Перенесены planner/template хелперы в отдельные модули, выровнен формат статусов и сообщений в ответе, а также обновлены .env.example и README под текущие переменные и поведение API.
2026-04-23 12:41:33 +03:00
Barabashka 5ca49821ba Промежуточный вариант: ужесточить planner recovery и fail-fast workflow.
Перевел планирование аргументов на строгий json_schema response_format, добавил сценарий с битыми полями для проверки восстановления и остановку workflow на первой ошибке шага. Сейчас используется Polza.ai.
2026-04-22 17:45:17 +03:00
Barabashka ad828885e3 Перевести исполнение сценариев на MCP workflow runner.
Удален legacy workflow_runner со stub-инструментами, добавлен mcp_client и новый mcp_workflow_runner с planner-моделью через polza.ai, обновлены сценарий, API/AgentOS wiring и документация под текущий контур запуска.
2026-04-22 16:37:17 +03:00
Barabashka 93ee7aea1c Унифицировать ответ /api/runs и добавить статусы шагов workflow.
Введен единый JSON-контракт для success/failed с общими полями, добавлен трекинг step status (queued/running/success/failed) и output_summary, а сборка run-ответа централизована через общий helper.
2026-04-22 12:28:47 +03:00
Barabashka 9068b7fe07 Удалить CLI entrypoint и оставить HTTP-only запуск через AgentOS.
Убран неиспользуемый run_agent/main.py и обновлен README, чтобы запуск и документация соответствовали текущей FastAPI архитектуре.
2026-04-21 17:42:50 +03:00
Barabashka 0fbd7dce1a Добавить FastAPI endpoint запуска сценария через AgentOS base_app.
Подключен верхний HTTP-слой с POST /api/runs и обновлены схемы/README, чтобы запуск сценариев шел через единый API-контракт поверх Agno workflow.
2026-04-21 17:38:03 +03:00
Barabashka d341941f87 Улучшить валидацию входа workflow и вынести схемы ответов.
Подключена pydantic-валидация input_schema для сценария, а модели успешного и ошибочного результата запуска вынесены в отдельный модуль для более явных boundary-контрактов.
2026-04-21 17:21:35 +03:00
Barabashka 2b0c748474 Вынести сценарий в JSON и добавить динамический loader.
Переключает запуск workflow на загрузку сценария из файлового хранилища по scenario_id и собирает шаги выполнения из definition.steps вместо хардкода в раннере.
2026-04-21 17:08:20 +03:00
Barabashka 2111964d8b Добавить MVP workflow запуска сценария поиска первоисточника.
Подключает stub-инструменты и последовательный Agno workflow в CLI и AgentOS, чтобы запускать сценарий по URL и получать структурированный JSON-результат.
2026-04-21 16:24:52 +03:00
Barabashka d22db07b43 Подключить локальную трассировку Phoenix для запусков агента.
Добавлена инициализация Phoenix/OpenInference в CLI и AgentOS, а также обновлены зависимости и документация, чтобы трассировка включалась через переменные окружения.
2026-04-21 15:10:27 +03:00
Barabashka 196e9aaf27 Обновление .gitignore 2026-04-21 13:40:36 +03:00
18 changed files with 1167 additions and 80 deletions
+27 -3
View File
@@ -1,9 +1,33 @@
# Agent
AGENT_ID=prisma-agent AGENT_ID=prisma-agent
AGENT_MARKDOWN=false
AGENT_DEBUG_MODE=true
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
# Agent model (Ollama)
OLLAMA_MODEL_ID=gemma4:31b OLLAMA_MODEL_ID=gemma4:31b
OLLAMA_HOST=http://localhost:11435 OLLAMA_HOST=http://localhost:11435
OLLAMA_TEMPERATURE=0 OLLAMA_TEMPERATURE=0
AGENT_MARKDOWN=false
AGENT_DEBUG_MODE=true # API runtime
AGENT_INSTRUCTIONS=You are a helpful assistant. Answer briefly and clearly.
AGENT_OS_HOST=127.0.0.1 AGENT_OS_HOST=127.0.0.1
AGENT_OS_PORT=7777 AGENT_OS_PORT=7777
# Planner
PLANNER_ENABLED=false
PLANNER_REPAIR_ATTEMPTS=3
# Planner model (Polza)
POLZA_BASE_URL=https://api.polza.ai/v1
POLZA_MODEL_ID=google/gemma-4-31b-it
POLZA_API_KEY=key
POLZA_TEMPERATURE=0
# MCP
MCP_BASE_URL=http://127.0.0.1:8081/mcp
MCP_TIMEOUT_SECONDS=10
# Observability (Phoenix)
PHOENIX_TRACING_ENABLED=false
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
PHOENIX_PROJECT_NAME=prisma-platform
+3
View File
@@ -25,3 +25,6 @@ dist/
.vscode/ .vscode/
.DS_Store .DS_Store
.cursor .cursor
# Cookbook code
vendor/agno/cookbook/
+127 -27
View File
@@ -1,8 +1,19 @@
# Prisma Platform MVP # Prisma Platform MVP
Минимальный чат-агент на Agno + Ollama с рантаймом AgentOS. MVP-реализация сценарного раннера на Agno AgentOS.
В этом проекте AgentOS работает как HTTP API сервер (FastAPI + Uvicorn). Текущая схема исполнения:
- сценарий хранится в `scenarios/*.json`;
- исполнение идет через `src/mcp_workflow_runner.py`;
- каждый шаг вызывает MCP инструмент через `src/mcp_client.py`;
- для подготовки аргументов шага используется planner-агент с моделью через `polza.ai`.
## Требования
- Python 3.10+
- MCP endpoint (по умолчанию `http://127.0.0.1:8081/mcp`)
- доступ к модели через `polza.ai` (`POLZA_API_KEY`)
## Текущая структура ## Текущая структура
@@ -11,11 +22,23 @@ prisma_platform/
├── .env ├── .env
├── .env.example ├── .env.example
├── requirements.txt ├── requirements.txt
├── scenarios/
│ ├── index.json
│ └── news_source_discovery/
│ ├── v1.json
│ └── v1_planner_repair.json
└── src/ └── src/
├── __init__.py ├── __init__.py
├── api_routes.py
├── agent_os.py ├── agent_os.py
├── agent_runner.py ├── agent_runner.py
── main.py ── mcp_client.py
├── mcp_workflow_runner.py
├── observability.py
├── scenario_store.py
├── step_planner.py
├── template.py
└── schemas.py
``` ```
## Установка ## Установка
@@ -29,44 +52,121 @@ cp .env.example .env
## Запуск ## Запуск
Интерактивный режим чата: 1) Поднимите MCP stub (из соседнего репозитория):
```bash ```bash
python -m src.main cd /home/worker/projects/docker-service/mcp-stub
docker compose up --build -d
``` ```
Режим одного сообщения: 2) Запустите сервер AgentOS:
```bash ```bash
python -m src.main --message "Привет, что ты умеешь?" cd /home/worker/projects/prisma_platform
.venv/bin/python -m src.agent_os
``` ```
## Запуск AgentOS По умолчанию приложение доступно на `http://127.0.0.1:7777`.
Запуск сервера AgentOS: Документация API:
```bash
python -m src.agent_os
```
По умолчанию AgentOS работает на `http://127.0.0.1:7777`.
Документация API доступна по адресам:
- `http://127.0.0.1:7777/docs` - `http://127.0.0.1:7777/docs`
- `http://127.0.0.1:7777/redoc` - `http://127.0.0.1:7777/redoc`
## Запуск сценария через HTTP
- `POST http://127.0.0.1:7777/api/runs`
Тело запроса:
```json
{
"scenario_id": "news_source_discovery_v1",
"input": {
"url": "https://example.com/news"
}
}
```
Пример:
```bash
curl -s -X POST "http://127.0.0.1:7777/api/runs" \
-H "Content-Type: application/json" \
-d '{
"scenario_id": "news_source_discovery_v1",
"input": {
"url": "https://example.com/news"
}
}'
```
Успешный ответ содержит:
- `status=success`
- `message=""`
- список `steps` со статусами и временем шагов
- `output_summary`
- `result` итогового шага
При ошибке:
- `status=failed`
- `message` содержит текст ошибки
## Переменные окружения ## Переменные окружения
Основные переменные: Agent:
- `AGENT_ID` (по умолчанию: `prisma-agent`) - `AGENT_ID` (default: `prisma-agent`)
- `OLLAMA_MODEL_ID` (по умолчанию: `gemma4:31b`) - `AGENT_MARKDOWN` (default: `false`)
- `OLLAMA_HOST` (по умолчанию: `http://localhost:11435`) - `AGENT_DEBUG_MODE` (default: `true`)
- `OLLAMA_TEMPERATURE` (по умолчанию: `0`) - `AGENT_INSTRUCTIONS`
- `AGENT_MARKDOWN` (по умолчанию: `false`) - `OLLAMA_MODEL_ID` (default: `gemma4:31b`)
- `AGENT_DEBUG_MODE` (по умолчанию: `true`) - `OLLAMA_HOST` (default: `http://localhost:11435`)
- `AGENT_INSTRUCTIONS` (по умолчанию: `You are a helpful assistant. Answer briefly and clearly.`) - `OLLAMA_TEMPERATURE` (default: `0`)
- `AGENT_OS_HOST` (по умолчанию: `127.0.0.1`)
- `AGENT_OS_PORT` (по умолчанию: `7777`) API runtime:
- `AGENT_OS_HOST` (default: `127.0.0.1`)
- `AGENT_OS_PORT` (default: `7777`)
Planner:
- `PLANNER_ENABLED` (default: `false`)
- `PLANNER_REPAIR_ATTEMPTS` (default: `3`)
Planner model (`polza.ai`):
- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)
- `POLZA_API_KEY` (required)
- `POLZA_TEMPERATURE` (default: `0`)
MCP:
- `MCP_BASE_URL` (default: `http://127.0.0.1:8081/mcp`)
- `MCP_TIMEOUT_SECONDS` (default: `10`)
Phoenix tracing:
- `PHOENIX_TRACING_ENABLED` (default: `false`)
- `PHOENIX_COLLECTOR_ENDPOINT` (default: `http://localhost:6006`)
- `PHOENIX_PROJECT_NAME` (default: `prisma-platform`)
## Phoenix трассировка (локально)
1) Включите трассировку в `.env`:
```dotenv
PHOENIX_TRACING_ENABLED=true
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
PHOENIX_PROJECT_NAME=prisma-platform
```
2) Запустите приложение:
```bash
.venv/bin/python -m src.agent_os
```
+2
View File
@@ -5,3 +5,5 @@ python-dotenv
ollama ollama
socksio socksio
openai openai
arize-phoenix-otel
openinference-instrumentation-agno
+6
View File
@@ -0,0 +1,6 @@
{
"scenarios": {
"news_source_discovery_v1": "news_source_discovery/v1.json",
"news_source_discovery_v1_planner_repair": "news_source_discovery/v1_planner_repair.json"
}
}
+105
View File
@@ -0,0 +1,105 @@
{
"schema_version": "1",
"scenario_id": "news_source_discovery_v1",
"name": "News Source Discovery V1",
"description": "Find earliest news source using sequential MCP tools.",
"input_schema": {
"type": "object",
"required": [
"url"
],
"properties": {
"url": {
"type": "string",
"description": "URL of source news article"
}
}
},
"steps": [
{
"name": "search_news_sources",
"type": "tool",
"tool": "search_news_sources",
"input": {
"url": {
"from": "input.url"
}
},
"required_input_fields": [
"url"
]
},
{
"name": "parse_articles_batch",
"type": "tool",
"tool": "parse_article",
"foreach": {
"from": "steps.search_news_sources.payload.items",
"as": "item"
},
"input": {
"url": {
"from": "item.url"
}
},
"collect": {
"url": {
"from": "tool.payload.url"
},
"title": {
"from": "tool.payload.title"
},
"text": {
"from": "tool.payload.text"
}
},
"collect_key": "items"
},
{
"name": "extract_publication_date_batch",
"type": "tool",
"tool": "extract_publication_date",
"foreach": {
"from": "steps.parse_articles_batch.payload.items",
"as": "item"
},
"input": {
"article_text": {
"from": "item.text"
}
},
"collect": {
"url": {
"from": "item.url"
},
"title": {
"from": "item.title"
},
"published_at": {
"from": "tool.payload.published_at"
}
},
"collect_key": "items"
},
{
"name": "rank_sources_by_date",
"type": "tool",
"tool": "rank_sources_by_date",
"input": {
"items": {
"from": "steps.extract_publication_date_batch.payload.items"
}
}
},
{
"name": "generate_summary",
"type": "tool",
"tool": "generate_summary",
"input": {
"items": {
"from": "steps.rank_sources_by_date.payload.ranked_items"
}
}
}
]
}
@@ -0,0 +1,117 @@
{
"schema_version": "1",
"scenario_id": "news_source_discovery_v1_planner_repair",
"name": "News Source Discovery V1 Planner Repair",
"description": "Test scenario with intentionally wrong input paths repaired by planner.",
"input_schema": {
"type": "object",
"required": [
"url"
],
"properties": {
"url": {
"type": "string",
"description": "URL of source news article"
}
}
},
"steps": [
{
"name": "search_news_sources",
"type": "tool",
"tool": "search_news_sources",
"input": {
"url": {
"from": "input.url"
}
},
"required_input_fields": [
"url"
]
},
{
"name": "parse_articles_batch",
"type": "tool",
"tool": "parse_article",
"foreach": {
"from": "steps.search_news_sources.payload.items",
"as": "item"
},
"input": {
"url": {
"from": "item.link"
}
},
"required_input_fields": [
"url"
],
"collect": {
"url": {
"from": "tool.payload.url"
},
"title": {
"from": "tool.payload.title"
},
"text": {
"from": "tool.payload.text"
}
},
"collect_key": "items"
},
{
"name": "extract_publication_date_batch",
"type": "tool",
"tool": "extract_publication_date",
"foreach": {
"from": "steps.parse_articles_batch.payload.items",
"as": "item"
},
"input": {
"article_text": {
"from": "item.body"
}
},
"required_input_fields": [
"article_text"
],
"collect": {
"url": {
"from": "item.url"
},
"title": {
"from": "item.title"
},
"published_at": {
"from": "tool.payload.published_at"
}
},
"collect_key": "items"
},
{
"name": "rank_sources_by_date",
"type": "tool",
"tool": "rank_sources_by_date",
"input": {
"items": {
"from": "steps.extract_publication_date_batch.payload.items"
}
},
"required_input_fields": [
"items"
]
},
{
"name": "generate_summary",
"type": "tool",
"tool": "generate_summary",
"input": {
"items": {
"from": "steps.rank_sources_by_date.payload.items_ranked_typo"
}
},
"required_input_fields": [
"items"
]
}
]
}
+14 -1
View File
@@ -1,15 +1,28 @@
import os import os
from dotenv import load_dotenv from dotenv import load_dotenv
from fastapi import FastAPI
from agno.os import AgentOS from agno.os import AgentOS
from src.api_routes import router as api_router
from src.agent_runner import get_agent from src.agent_runner import get_agent
from src.observability import init_phoenix_tracing
load_dotenv() load_dotenv()
_tracing_enabled = init_phoenix_tracing()
_agent = get_agent() _agent = get_agent()
_agent_os = AgentOS(agents=[_agent]) _base_app = FastAPI(
title="Prisma Platform API",
version="0.1.0",
)
_base_app.include_router(api_router)
_agent_os = AgentOS(
agents=[_agent],
tracing=_tracing_enabled,
base_app=_base_app,
)
app = _agent_os.get_app() app = _agent_os.get_app()
-6
View File
@@ -47,9 +47,3 @@ def get_agent() -> Agent:
debug_mode=debug_mode, debug_mode=debug_mode,
) )
return _agent return _agent
async def run_agent(message: str) -> str:
agent = get_agent()
response = await agent.arun(message)
return str(response.content)
+14
View File
@@ -0,0 +1,14 @@
from fastapi import APIRouter
from src.mcp_workflow_runner import run_scenario
from src.schemas import ScenarioRunRequest, ScenarioRunResponse
router = APIRouter(prefix="/api", tags=["workflow"])
@router.post("/runs", response_model=ScenarioRunResponse)
async def post_run(request: ScenarioRunRequest) -> ScenarioRunResponse:
return await run_scenario(
scenario_id=request.scenario_id,
input_data=request.input,
)
-43
View File
@@ -1,43 +0,0 @@
import argparse
import asyncio
from dotenv import load_dotenv
from src.agent_runner import run_agent
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Run base chat agent.",
)
parser.add_argument(
"--message",
help="Single message mode. If omitted, starts interactive chat.",
)
return parser
async def _main() -> None:
load_dotenv()
args = build_parser().parse_args()
if args.message:
result = await run_agent(args.message)
print(result)
return
print("Chat mode started. Type 'exit' or 'quit' to stop.")
while True:
user_message = input("you> ").strip()
if not user_message:
continue
if user_message.lower() in {"exit", "quit"}:
print("Bye.")
break
result = await run_agent(user_message)
print(f"agent> {result}")
if __name__ == "__main__":
asyncio.run(_main())
+56
View File
@@ -0,0 +1,56 @@
from __future__ import annotations
from datetime import timedelta
import json
import os
from typing import Any
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import TextContent
def _mcp_url() -> str:
return os.getenv("MCP_BASE_URL", "http://127.0.0.1:8081/mcp")
def _timeout_seconds() -> float:
value = os.getenv("MCP_TIMEOUT_SECONDS")
if value is None:
return 10.0
return float(value)
async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
try:
async with streamablehttp_client(url=_mcp_url()) as session_params:
read, write = session_params[0:2]
async with ClientSession(
read,
write,
read_timeout_seconds=timedelta(seconds=_timeout_seconds()),
) as session:
await session.initialize()
result = await session.call_tool(tool_name, arguments)
except TimeoutError as exc:
raise RuntimeError(f"MCP timeout: {tool_name}") from exc
except Exception as exc:
raise RuntimeError(f"MCP transport error: {tool_name}") from exc
if result.isError:
raise RuntimeError(f"MCP tool error: {tool_name}")
if isinstance(result.structuredContent, dict):
return result.structuredContent
for content_item in result.content:
if not isinstance(content_item, TextContent):
continue
try:
parsed = json.loads(content_item.text)
except json.JSONDecodeError:
continue
if isinstance(parsed, dict):
return parsed
raise RuntimeError(f"MCP tool returned invalid payload: {tool_name}")
+385
View File
@@ -0,0 +1,385 @@
from __future__ import annotations
from copy import deepcopy
from datetime import datetime, timezone
import json
import os
from typing import Any, Awaitable, Callable
from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow
from src.mcp_client import call_mcp_tool
from src.schemas import ScenarioRunResponse, StepState
from src.scenario_store import ScenarioStoreError, load_scenario_definition
from src.step_planner import plan_arguments, planner_enabled
from src.template import (
missing_required_fields,
resolve_path,
resolve_template,
validate_required_fields,
)
def _env_int(name: str, default: int) -> int:
value = os.getenv(name)
return int(value) if value is not None else default
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _build_scope(session_state: dict[str, Any]) -> dict[str, Any]:
return {
"input": session_state.get("input", {}),
"steps": session_state.get("steps", {}),
}
async def _prepare_arguments(
*,
step_name: str,
tool_name: str,
base_arguments: dict[str, Any],
required_fields: list[str],
scope: dict[str, Any],
) -> dict[str, Any]:
final_arguments = deepcopy(base_arguments)
missing = missing_required_fields(final_arguments, required_fields)
if missing and planner_enabled():
max_attempts = _env_int("PLANNER_REPAIR_ATTEMPTS", 3)
for attempt in range(1, max_attempts + 1):
final_arguments = await plan_arguments(
step_name=step_name,
tool_name=tool_name,
base_arguments=final_arguments,
required_fields=required_fields,
scope=scope,
missing_fields=missing,
attempt_no=attempt,
)
missing = missing_required_fields(final_arguments, required_fields)
if not missing:
break
validate_required_fields(final_arguments, required_fields, step_name)
return final_arguments
async def _execute_one_call(
*,
step_name: str,
tool_name: str,
required_fields: list[str],
input_template: Any,
scope: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
resolved = resolve_template(input_template, scope)
base_arguments = resolved if isinstance(resolved, dict) else {}
arguments = await _prepare_arguments(
step_name=step_name,
tool_name=tool_name,
base_arguments=base_arguments,
required_fields=required_fields,
scope=scope,
)
tool_response = await call_mcp_tool(tool_name, arguments)
return arguments, tool_response
def _build_tool_executor(
step_spec: dict[str, Any],
) -> Callable[[StepInput, dict[str, Any]], Awaitable[StepOutput]]:
step_name = str(step_spec["name"])
tool_name = str(step_spec["tool"])
input_template = step_spec.get("input", {})
foreach_spec = step_spec.get("foreach")
collect_template = step_spec.get("collect")
collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
required_fields = [
f for f in step_spec.get("required_input_fields", []) if isinstance(f, str)
]
if isinstance(foreach_spec, dict):
foreach_from = str(foreach_spec.get("from", "")).strip()
item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
else:
foreach_from = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
item_alias = "item"
async def executor(_step_input: StepInput, session_state: dict[str, Any]) -> StepOutput:
started_at = _utc_now_iso()
scope = _build_scope(session_state)
try:
if foreach_from:
iterable = resolve_path(scope, foreach_from)
if not isinstance(iterable, list):
raise ValueError(f"{step_name}: foreach source is not list")
collected: list[Any] = []
iteration_requests: list[dict[str, Any]] = []
iteration_responses: list[dict[str, Any]] = []
last_received_at: str | None = None
for index, item in enumerate(iterable):
iteration_scope = {**scope, item_alias: item, "item": item, "index": index}
arguments, tool_response = await _execute_one_call(
step_name=step_name,
tool_name=tool_name,
required_fields=required_fields,
input_template=input_template,
scope=iteration_scope,
)
iteration_requests.append(arguments)
iteration_responses.append(tool_response)
received_at = tool_response.get("received_at")
if isinstance(received_at, str) and received_at:
last_received_at = received_at
if collect_template is None:
collected.append(tool_response.get("payload", {}))
else:
collected.append(
resolve_template(
collect_template,
{**iteration_scope, "tool": tool_response},
)
)
step_payload = {
"ok": True,
"tool_name": tool_name,
"payload": {collect_key: collected},
"request": {
"foreach_from": foreach_from,
"count": len(iterable),
"items": iteration_requests,
},
"response": {"items": iteration_responses},
"received_at": last_received_at,
"started_at": started_at,
"finished_at": _utc_now_iso(),
}
else:
arguments, tool_response = await _execute_one_call(
step_name=step_name,
tool_name=tool_name,
required_fields=required_fields,
input_template=input_template,
scope=scope,
)
step_payload = {
"ok": bool(tool_response.get("ok", True)),
"tool_name": tool_name,
"payload": tool_response.get("payload", {}),
"request": arguments,
"response": tool_response,
"received_at": tool_response.get("received_at"),
"started_at": started_at,
"finished_at": _utc_now_iso(),
}
session_state.setdefault("steps", {})[step_name] = step_payload
return StepOutput(
content=json.dumps(step_payload, ensure_ascii=False),
success=True,
)
except Exception as exc:
finished_at = _utc_now_iso()
error_payload = {
"ok": False,
"tool_name": tool_name,
"error": str(exc),
"started_at": started_at,
"finished_at": finished_at,
}
session_state.setdefault("steps", {})[step_name] = error_payload
raise RuntimeError(f"{step_name} failed: {exc}") from exc
return executor
def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list) or not raw_steps:
raise ScenarioStoreError("Scenario must contain non-empty steps list")
workflow_steps: list[Step] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
raise ScenarioStoreError("Each scenario step must be object")
if raw_step.get("type") != "tool":
raise ScenarioStoreError("This runner supports only tool steps")
step_name = str(raw_step.get("name", "")).strip()
tool_name = str(raw_step.get("tool", step_name)).strip()
if not step_name or not tool_name:
raise ScenarioStoreError("Each tool step must contain non-empty name and tool")
workflow_steps.append(
Step(
name=step_name,
description=str(raw_step.get("description", step_name)),
executor=_build_tool_executor(raw_step),
max_retries=0,
on_error="fail",
)
)
return Workflow(
name=scenario_id,
description=str(scenario.get("description", "")),
steps=workflow_steps,
)
_workflow_cache: dict[str, Workflow] = {}
def _get_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
cached = _workflow_cache.get(scenario_id)
if cached is not None:
return cached
workflow = _build_workflow(scenario_id, scenario)
_workflow_cache[scenario_id] = workflow
return workflow
def _extract_output_summary(result: dict[str, Any] | None) -> str | None:
if not isinstance(result, dict):
return None
summary = result.get("summary")
if isinstance(summary, str) and summary:
return summary
payload = result.get("payload")
if isinstance(payload, dict):
payload_summary = payload.get("summary")
if isinstance(payload_summary, str) and payload_summary:
return payload_summary
return None
def _build_step_states(
scenario: dict[str, Any],
steps_payloads: dict[str, Any],
) -> list[StepState]:
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list):
return []
states: list[StepState] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
continue
name = str(raw_step.get("name", "")).strip()
if not name:
continue
payload = steps_payloads.get(name)
if not isinstance(payload, dict):
states.append(
StepState(
node_id=name,
status="queued",
message="",
)
)
continue
ok = bool(payload.get("ok", False))
states.append(
StepState(
node_id=name,
status="success" if ok else "failed",
started_at=str(payload.get("started_at") or "") or None,
finished_at=str(payload.get("finished_at") or "") or None,
message="" if ok else str(payload.get("error", f"{name} failed")),
)
)
return states
async def run_scenario(
*,
scenario_id: str,
input_data: dict[str, Any],
) -> ScenarioRunResponse:
try:
scenario = load_scenario_definition(scenario_id)
except ScenarioStoreError as exc:
return ScenarioRunResponse(
scenario_id=scenario_id,
status="failed",
message=str(exc),
input=input_data,
)
scenario_name = str(scenario.get("name", scenario_id))
try:
workflow = _get_workflow(scenario_id, scenario)
except ScenarioStoreError as exc:
return ScenarioRunResponse(
scenario_id=scenario_id,
status="failed",
message=str(exc),
input=input_data,
scenario_name=scenario_name,
)
# Fresh per-run state that Agno owns during arun(..., session_state=...).
session_state: dict[str, Any] = {
"input": deepcopy(input_data),
"steps": {},
}
workflow_error: str | None = None
run_output: Any = None
try:
run_output = await workflow.arun(
input=input_data,
session_state=session_state,
)
except Exception as exc:
workflow_error = str(exc)
steps_payloads = session_state.get("steps", {}) or {}
step_states = _build_step_states(scenario, steps_payloads)
status = "success"
if workflow_error is not None:
status = "failed"
else:
for payload in steps_payloads.values():
if isinstance(payload, dict) and not bool(payload.get("ok", False)):
status = "failed"
break
if run_output is not None and not bool(getattr(run_output, "success", True)):
status = "failed"
content = getattr(run_output, "content", None)
if isinstance(content, str):
try:
content = json.loads(content)
except json.JSONDecodeError:
content = {"raw_content": content}
if content is None:
for payload in reversed(list(steps_payloads.values())):
if isinstance(payload, dict):
content = deepcopy(payload)
break
if content is None and workflow_error is not None:
content = {"message": workflow_error}
result = content if isinstance(content, dict) else {"raw_content": content}
response_message = "" if status == "success" else (workflow_error or "failed")
return ScenarioRunResponse(
scenario_id=scenario_id,
status=status,
message=response_message,
input=input_data,
steps=step_states,
output_summary=_extract_output_summary(result),
scenario_name=scenario_name,
workflow_name=workflow.name,
result=result,
run_id=str(getattr(run_output, "run_id", "")) or None,
session_id=str(getattr(run_output, "session_id", "")) or None,
)
+36
View File
@@ -0,0 +1,36 @@
import os
from phoenix.otel import register
_initialized = False
def _env_bool(name: str, default: bool) -> bool:
value = os.getenv(name)
if value is None:
return default
return value.strip().lower() in {"1", "true", "yes", "on"}
def init_phoenix_tracing() -> bool:
global _initialized
enabled = _env_bool("PHOENIX_TRACING_ENABLED", False)
if not enabled:
return False
if _initialized:
return True
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = os.getenv(
"PHOENIX_COLLECTOR_ENDPOINT",
"http://localhost:6006",
)
project_name = os.getenv("PHOENIX_PROJECT_NAME", "prisma-platform")
register(
project_name=project_name,
auto_instrument=True,
)
_initialized = True
return True
+60
View File
@@ -0,0 +1,60 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
class ScenarioStoreError(ValueError):
"""Raised when scenario definitions are missing or invalid."""
_SCENARIOS_ROOT = Path(__file__).resolve().parent.parent / "scenarios"
_INDEX_PATH = _SCENARIOS_ROOT / "index.json"
def _read_json(path: Path) -> dict[str, Any]:
try:
raw = path.read_text(encoding="utf-8")
except FileNotFoundError as exc:
raise ScenarioStoreError(f"Scenario file not found: {path}") from exc
try:
parsed = json.loads(raw)
except json.JSONDecodeError as exc:
raise ScenarioStoreError(f"Invalid JSON in file: {path}") from exc
if not isinstance(parsed, dict):
raise ScenarioStoreError(f"JSON root must be object: {path}")
return parsed
def load_scenario_index() -> dict[str, str]:
index = _read_json(_INDEX_PATH)
scenarios = index.get("scenarios")
if not isinstance(scenarios, dict):
raise ScenarioStoreError("index.json must contain object field 'scenarios'")
normalized: dict[str, str] = {}
for scenario_id, relative_path in scenarios.items():
if not isinstance(scenario_id, str) or not isinstance(relative_path, str):
raise ScenarioStoreError("index.json scenario entries must be string -> string")
normalized[scenario_id] = relative_path
return normalized
def load_scenario_definition(scenario_id: str) -> dict[str, Any]:
index = load_scenario_index()
relative_path = index.get(scenario_id)
if relative_path is None:
raise ScenarioStoreError(f"Unknown scenario_id: {scenario_id}")
scenario_path = _SCENARIOS_ROOT / relative_path
scenario = _read_json(scenario_path)
declared_id = scenario.get("scenario_id")
if declared_id != scenario_id:
raise ScenarioStoreError(
"Scenario file scenario_id does not match requested scenario_id"
)
return scenario
+35
View File
@@ -0,0 +1,35 @@
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
class ScenarioRunRequest(BaseModel):
scenario_id: str = "news_source_discovery_v1"
input: dict[str, Any] = Field(default_factory=dict)
class StepState(BaseModel):
node_id: str
status: StepStatus
started_at: str | None = None
finished_at: str | None = None
message: str = ""
class ScenarioRunResponse(BaseModel):
scenario_id: str
status: RunStatus
message: str = ""
input: dict[str, Any]
steps: list[StepState] = Field(default_factory=list)
output_summary: str | None = None
scenario_name: str | None = None
workflow_name: str | None = None
result: dict[str, Any] | None = None
run_id: str | None = None
session_id: str | None = None
+129
View File
@@ -0,0 +1,129 @@
from __future__ import annotations
from copy import deepcopy
import json
import os
from typing import Any
from openai import AsyncOpenAI
_planner_client: AsyncOpenAI | None = None
def _env_float(name: str, default: float) -> float:
value = os.getenv(name)
if value is None:
return default
return float(value)
def planner_enabled() -> bool:
return os.getenv("PLANNER_ENABLED", "false").strip().lower() in {"1", "true", "yes"}
def _get_client() -> AsyncOpenAI:
global _planner_client
if _planner_client is not None:
return _planner_client
_planner_client = AsyncOpenAI(
base_url=os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1"),
api_key=os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY"),
)
return _planner_client
def _response_schema(required_fields: list[str]) -> dict[str, Any]:
value_schema = {"type": ["string", "number", "boolean", "array", "object", "null"]}
return {
"name": "mcp_arguments",
"strict": True,
"schema": {
"type": "object",
"properties": {
"arguments": {
"type": "object",
"properties": {f: value_schema for f in required_fields},
"required": required_fields,
"additionalProperties": True,
}
},
"required": ["arguments"],
"additionalProperties": False,
},
}
def _extract_arguments(content: Any) -> dict[str, Any]:
candidate: Any = content
if isinstance(candidate, str):
text = candidate.strip()
if text.startswith("```"):
text = text.strip("`").strip()
if text.startswith("json"):
text = text[4:].strip()
try:
candidate = json.loads(text)
except json.JSONDecodeError:
return {}
if isinstance(candidate, dict):
if isinstance(candidate.get("arguments"), dict):
return candidate["arguments"]
return candidate
return {}
async def plan_arguments(
*,
step_name: str,
tool_name: str,
base_arguments: dict[str, Any],
required_fields: list[str],
scope: dict[str, Any],
missing_fields: list[str],
attempt_no: int,
) -> dict[str, Any]:
"""Fallback planner: asks an LLM to fill missing required fields from context.
Returns merged arguments (base + planned). On any failure returns base_arguments
unchanged — caller is responsible for validating required fields afterwards.
"""
prompt = {
"task": "Prepare MCP arguments for this step.",
"step_name": step_name,
"tool_name": tool_name,
"required_fields": required_fields,
"base_arguments": base_arguments,
"missing_fields": missing_fields,
"repair_attempt": attempt_no,
"context": {"input": scope.get("input", {}), "steps": scope.get("steps", {})},
"output": (
"Return only JSON object with key 'arguments'. "
"Fill every missing field from context."
),
}
try:
completion = await _get_client().chat.completions.create(
model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
messages=[
{
"role": "system",
"content": (
"You are a tool-input planner. "
"Return only JSON that matches the provided schema."
),
},
{"role": "user", "content": json.dumps(prompt, ensure_ascii=False)},
],
response_format={"type": "json_schema", "json_schema": _response_schema(required_fields)},
temperature=_env_float("POLZA_TEMPERATURE", 0.0),
)
raw = completion.choices[0].message.content if completion.choices else ""
planned = _extract_arguments(raw)
except Exception:
planned = {}
merged = deepcopy(base_arguments)
if isinstance(planned, dict):
merged.update(planned)
return merged
+51
View File
@@ -0,0 +1,51 @@
from __future__ import annotations
from copy import deepcopy
from typing import Any
def resolve_path(scope: dict[str, Any], path: str) -> Any:
value: Any = scope
for segment in path.split("."):
key = segment.strip()
if not key:
continue
if not isinstance(value, dict):
return None
value = value.get(key)
return deepcopy(value)
def resolve_template(template: Any, scope: dict[str, Any]) -> Any:
if isinstance(template, dict):
if set(template.keys()) == {"from"}:
return resolve_path(scope, str(template["from"]))
return {key: resolve_template(value, scope) for key, value in template.items()}
if isinstance(template, list):
return [resolve_template(item, scope) for item in template]
return deepcopy(template)
def missing_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
) -> list[str]:
missing: list[str] = []
for field in required_fields:
value = arguments.get(field)
if isinstance(value, str) and value.strip():
continue
if value not in (None, "", [], {}):
continue
missing.append(field)
return missing
def validate_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
step_name: str,
) -> None:
missing = missing_required_fields(arguments, required_fields)
if missing:
raise ValueError(f"{step_name}: missing required fields: {', '.join(missing)}")