Compare commits
10 Commits
00e7ef5d11
...
5ca49821ba
| Author | SHA1 | Date | |
|---|---|---|---|
| 5ca49821ba | |||
| ad828885e3 | |||
| 93ee7aea1c | |||
| 9068b7fe07 | |||
| 0fbd7dce1a | |||
| d341941f87 | |||
| 2b0c748474 | |||
| 2111964d8b | |||
| d22db07b43 | |||
| 196e9aaf27 |
+10
-1
@@ -4,6 +4,15 @@ OLLAMA_HOST=http://localhost:11435
|
||||
OLLAMA_TEMPERATURE=0
|
||||
AGENT_MARKDOWN=false
|
||||
AGENT_DEBUG_MODE=true
|
||||
AGENT_INSTRUCTIONS=You are a helpful assistant. Answer briefly and clearly.
|
||||
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
|
||||
AGENT_OS_HOST=127.0.0.1
|
||||
AGENT_OS_PORT=7777
|
||||
POLZA_BASE_URL=https://api.polza.ai/v1
|
||||
POLZA_MODEL_ID=google/gemma-4-31b-it
|
||||
POLZA_API_KEY=key
|
||||
POLZA_TEMPERATURE=0
|
||||
MCP_BASE_URL=http://127.0.0.1:8081/mcp
|
||||
MCP_TIMEOUT_SECONDS=10
|
||||
PHOENIX_TRACING_ENABLED=false
|
||||
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
|
||||
PHOENIX_PROJECT_NAME=prisma-platform
|
||||
|
||||
@@ -25,3 +25,6 @@ dist/
|
||||
.vscode/
|
||||
.DS_Store
|
||||
.cursor
|
||||
|
||||
# Cookbook code
|
||||
vendor/agno/cookbook/
|
||||
@@ -1,8 +1,19 @@
|
||||
# Prisma Platform MVP
|
||||
|
||||
Минимальный чат-агент на Agno + Ollama с рантаймом AgentOS.
|
||||
MVP-реализация сценарного раннера на Agno AgentOS.
|
||||
|
||||
В этом проекте AgentOS работает как HTTP API сервер (FastAPI + Uvicorn).
|
||||
Текущая схема исполнения:
|
||||
|
||||
- сценарий хранится в `scenarios/*.json`;
|
||||
- исполнение идет через `src/mcp_workflow_runner.py`;
|
||||
- каждый шаг вызывает MCP инструмент через `src/mcp_client.py`;
|
||||
- для подготовки аргументов шага используется planner-агент с моделью через `polza.ai`.
|
||||
|
||||
## Требования
|
||||
|
||||
- Python 3.10+
|
||||
- MCP endpoint (по умолчанию `http://127.0.0.1:8081/mcp`)
|
||||
- доступ к модели через `polza.ai` (`POLZA_API_KEY`)
|
||||
|
||||
## Текущая структура
|
||||
|
||||
@@ -11,11 +22,20 @@ prisma_platform/
|
||||
├── .env
|
||||
├── .env.example
|
||||
├── requirements.txt
|
||||
├── scenarios/
|
||||
│ ├── index.json
|
||||
│ └── news_source_discovery/
|
||||
│ └── v1.json
|
||||
└── src/
|
||||
├── __init__.py
|
||||
├── api_routes.py
|
||||
├── agent_os.py
|
||||
├── agent_runner.py
|
||||
└── main.py
|
||||
├── mcp_client.py
|
||||
├── mcp_workflow_runner.py
|
||||
├── observability.py
|
||||
├── scenario_store.py
|
||||
└── schemas.py
|
||||
```
|
||||
|
||||
## Установка
|
||||
@@ -29,44 +49,104 @@ cp .env.example .env
|
||||
|
||||
## Запуск
|
||||
|
||||
Интерактивный режим чата:
|
||||
1) Поднимите MCP stub (из соседнего репозитория):
|
||||
|
||||
```bash
|
||||
python -m src.main
|
||||
cd /home/worker/projects/docker-service/mcp-stub
|
||||
docker compose up --build -d
|
||||
```
|
||||
|
||||
Режим одного сообщения:
|
||||
2) Запустите сервер AgentOS:
|
||||
|
||||
```bash
|
||||
python -m src.main --message "Привет, что ты умеешь?"
|
||||
cd /home/worker/projects/prisma_platform
|
||||
.venv/bin/python -m src.agent_os
|
||||
```
|
||||
|
||||
## Запуск AgentOS
|
||||
По умолчанию приложение доступно на `http://127.0.0.1:7777`.
|
||||
|
||||
Запуск сервера AgentOS:
|
||||
|
||||
```bash
|
||||
python -m src.agent_os
|
||||
```
|
||||
|
||||
По умолчанию AgentOS работает на `http://127.0.0.1:7777`.
|
||||
|
||||
Документация API доступна по адресам:
|
||||
Документация API:
|
||||
|
||||
- `http://127.0.0.1:7777/docs`
|
||||
- `http://127.0.0.1:7777/redoc`
|
||||
|
||||
## Запуск сценария через HTTP
|
||||
|
||||
- `POST http://127.0.0.1:7777/api/runs`
|
||||
|
||||
Тело запроса:
|
||||
|
||||
```json
|
||||
{
|
||||
"scenario_id": "news_source_discovery_v1",
|
||||
"input": {
|
||||
"url": "https://example.com/news"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Пример:
|
||||
|
||||
```bash
|
||||
curl -s -X POST "http://127.0.0.1:7777/api/runs" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{
|
||||
"scenario_id": "news_source_discovery_v1",
|
||||
"input": {
|
||||
"url": "https://example.com/news"
|
||||
}
|
||||
}'
|
||||
```
|
||||
|
||||
Успешный ответ содержит:
|
||||
|
||||
- `status=success`
|
||||
- список `steps` со статусами шагов
|
||||
- `output_summary`
|
||||
- `result` итогового шага
|
||||
|
||||
## Переменные окружения
|
||||
|
||||
Основные переменные:
|
||||
Основные:
|
||||
|
||||
- `AGENT_ID` (по умолчанию: `prisma-agent`)
|
||||
- `OLLAMA_MODEL_ID` (по умолчанию: `gemma4:31b`)
|
||||
- `OLLAMA_HOST` (по умолчанию: `http://localhost:11435`)
|
||||
- `OLLAMA_TEMPERATURE` (по умолчанию: `0`)
|
||||
- `AGENT_MARKDOWN` (по умолчанию: `false`)
|
||||
- `AGENT_DEBUG_MODE` (по умолчанию: `true`)
|
||||
- `AGENT_INSTRUCTIONS` (по умолчанию: `You are a helpful assistant. Answer briefly and clearly.`)
|
||||
- `AGENT_OS_HOST` (по умолчанию: `127.0.0.1`)
|
||||
- `AGENT_OS_PORT` (по умолчанию: `7777`)
|
||||
- `AGENT_ID` (default: `prisma-agent`)
|
||||
- `AGENT_MARKDOWN` (default: `false`)
|
||||
- `AGENT_DEBUG_MODE` (default: `true`)
|
||||
- `AGENT_INSTRUCTIONS`
|
||||
- `AGENT_OS_HOST` (default: `127.0.0.1`)
|
||||
- `AGENT_OS_PORT` (default: `7777`)
|
||||
|
||||
Planner-модель (`polza.ai`):
|
||||
|
||||
- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
|
||||
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)
|
||||
- `POLZA_API_KEY` (required)
|
||||
- `POLZA_TEMPERATURE` (default: `0`)
|
||||
|
||||
MCP:
|
||||
|
||||
- `MCP_BASE_URL` (default: `http://127.0.0.1:8081/mcp`)
|
||||
- `MCP_TIMEOUT_SECONDS` (default: `10`)
|
||||
|
||||
Phoenix tracing:
|
||||
|
||||
- `PHOENIX_TRACING_ENABLED` (default: `false`)
|
||||
- `PHOENIX_COLLECTOR_ENDPOINT` (default: `http://localhost:6006`)
|
||||
- `PHOENIX_PROJECT_NAME` (default: `prisma-platform`)
|
||||
|
||||
## Phoenix трассировка (локально)
|
||||
|
||||
1) Включите трассировку в `.env`:
|
||||
|
||||
```dotenv
|
||||
PHOENIX_TRACING_ENABLED=true
|
||||
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
|
||||
PHOENIX_PROJECT_NAME=prisma-platform
|
||||
```
|
||||
|
||||
2) Запустите приложение:
|
||||
|
||||
```bash
|
||||
.venv/bin/python -m src.agent_os
|
||||
```
|
||||
|
||||
|
||||
@@ -5,3 +5,5 @@ python-dotenv
|
||||
ollama
|
||||
socksio
|
||||
openai
|
||||
arize-phoenix-otel
|
||||
openinference-instrumentation-agno
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"scenarios": {
|
||||
"news_source_discovery_v1": "news_source_discovery/v1.json",
|
||||
"news_source_discovery_v1_planner_repair": "news_source_discovery/v1_planner_repair.json"
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,105 @@
|
||||
{
|
||||
"schema_version": "1",
|
||||
"scenario_id": "news_source_discovery_v1",
|
||||
"name": "News Source Discovery V1",
|
||||
"description": "Find earliest news source using sequential MCP tools.",
|
||||
"input_schema": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"url"
|
||||
],
|
||||
"properties": {
|
||||
"url": {
|
||||
"type": "string",
|
||||
"description": "URL of source news article"
|
||||
}
|
||||
}
|
||||
},
|
||||
"steps": [
|
||||
{
|
||||
"name": "search_news_sources",
|
||||
"type": "tool",
|
||||
"tool": "search_news_sources",
|
||||
"input": {
|
||||
"url": {
|
||||
"from": "input.url"
|
||||
}
|
||||
},
|
||||
"required_input_fields": [
|
||||
"url"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "parse_articles_batch",
|
||||
"type": "tool",
|
||||
"tool": "parse_article",
|
||||
"foreach": {
|
||||
"from": "steps.search_news_sources.payload.items",
|
||||
"as": "item"
|
||||
},
|
||||
"input": {
|
||||
"url": {
|
||||
"from": "item.url"
|
||||
}
|
||||
},
|
||||
"collect": {
|
||||
"url": {
|
||||
"from": "tool.payload.url"
|
||||
},
|
||||
"title": {
|
||||
"from": "tool.payload.title"
|
||||
},
|
||||
"text": {
|
||||
"from": "tool.payload.text"
|
||||
}
|
||||
},
|
||||
"collect_key": "items"
|
||||
},
|
||||
{
|
||||
"name": "extract_publication_date_batch",
|
||||
"type": "tool",
|
||||
"tool": "extract_publication_date",
|
||||
"foreach": {
|
||||
"from": "steps.parse_articles_batch.payload.items",
|
||||
"as": "item"
|
||||
},
|
||||
"input": {
|
||||
"article_text": {
|
||||
"from": "item.text"
|
||||
}
|
||||
},
|
||||
"collect": {
|
||||
"url": {
|
||||
"from": "item.url"
|
||||
},
|
||||
"title": {
|
||||
"from": "item.title"
|
||||
},
|
||||
"published_at": {
|
||||
"from": "tool.payload.published_at"
|
||||
}
|
||||
},
|
||||
"collect_key": "items"
|
||||
},
|
||||
{
|
||||
"name": "rank_sources_by_date",
|
||||
"type": "tool",
|
||||
"tool": "rank_sources_by_date",
|
||||
"input": {
|
||||
"items": {
|
||||
"from": "steps.extract_publication_date_batch.payload.items"
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "generate_summary",
|
||||
"type": "tool",
|
||||
"tool": "generate_summary",
|
||||
"input": {
|
||||
"items": {
|
||||
"from": "steps.rank_sources_by_date.payload.ranked_items"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,117 @@
|
||||
{
|
||||
"schema_version": "1",
|
||||
"scenario_id": "news_source_discovery_v1_planner_repair",
|
||||
"name": "News Source Discovery V1 Planner Repair",
|
||||
"description": "Test scenario with intentionally wrong input paths repaired by planner.",
|
||||
"input_schema": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"url"
|
||||
],
|
||||
"properties": {
|
||||
"url": {
|
||||
"type": "string",
|
||||
"description": "URL of source news article"
|
||||
}
|
||||
}
|
||||
},
|
||||
"steps": [
|
||||
{
|
||||
"name": "search_news_sources",
|
||||
"type": "tool",
|
||||
"tool": "search_news_sources",
|
||||
"input": {
|
||||
"url": {
|
||||
"from": "input.url"
|
||||
}
|
||||
},
|
||||
"required_input_fields": [
|
||||
"url"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "parse_articles_batch",
|
||||
"type": "tool",
|
||||
"tool": "parse_article",
|
||||
"foreach": {
|
||||
"from": "steps.search_news_sources.payload.items",
|
||||
"as": "item"
|
||||
},
|
||||
"input": {
|
||||
"url": {
|
||||
"from": "item.link"
|
||||
}
|
||||
},
|
||||
"required_input_fields": [
|
||||
"url"
|
||||
],
|
||||
"collect": {
|
||||
"url": {
|
||||
"from": "tool.payload.url"
|
||||
},
|
||||
"title": {
|
||||
"from": "tool.payload.title"
|
||||
},
|
||||
"text": {
|
||||
"from": "tool.payload.text"
|
||||
}
|
||||
},
|
||||
"collect_key": "items"
|
||||
},
|
||||
{
|
||||
"name": "extract_publication_date_batch",
|
||||
"type": "tool",
|
||||
"tool": "extract_publication_date",
|
||||
"foreach": {
|
||||
"from": "steps.parse_articles_batch.payload.items",
|
||||
"as": "item"
|
||||
},
|
||||
"input": {
|
||||
"article_text": {
|
||||
"from": "item.body"
|
||||
}
|
||||
},
|
||||
"required_input_fields": [
|
||||
"article_text"
|
||||
],
|
||||
"collect": {
|
||||
"url": {
|
||||
"from": "item.url"
|
||||
},
|
||||
"title": {
|
||||
"from": "item.title"
|
||||
},
|
||||
"published_at": {
|
||||
"from": "tool.payload.published_at"
|
||||
}
|
||||
},
|
||||
"collect_key": "items"
|
||||
},
|
||||
{
|
||||
"name": "rank_sources_by_date",
|
||||
"type": "tool",
|
||||
"tool": "rank_sources_by_date",
|
||||
"input": {
|
||||
"items": {
|
||||
"from": "steps.extract_publication_date_batch.payload.items"
|
||||
}
|
||||
},
|
||||
"required_input_fields": [
|
||||
"items"
|
||||
]
|
||||
},
|
||||
{
|
||||
"name": "generate_summary",
|
||||
"type": "tool",
|
||||
"tool": "generate_summary",
|
||||
"input": {
|
||||
"items": {
|
||||
"from": "steps.rank_sources_by_date.payload.items_ranked_typo"
|
||||
}
|
||||
},
|
||||
"required_input_fields": [
|
||||
"items"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
+14
-1
@@ -1,15 +1,28 @@
|
||||
import os
|
||||
|
||||
from dotenv import load_dotenv
|
||||
from fastapi import FastAPI
|
||||
|
||||
from agno.os import AgentOS
|
||||
|
||||
from src.api_routes import router as api_router
|
||||
from src.agent_runner import get_agent
|
||||
from src.observability import init_phoenix_tracing
|
||||
|
||||
load_dotenv()
|
||||
_tracing_enabled = init_phoenix_tracing()
|
||||
|
||||
_agent = get_agent()
|
||||
_agent_os = AgentOS(agents=[_agent])
|
||||
_base_app = FastAPI(
|
||||
title="Prisma Platform API",
|
||||
version="0.1.0",
|
||||
)
|
||||
_base_app.include_router(api_router)
|
||||
_agent_os = AgentOS(
|
||||
agents=[_agent],
|
||||
tracing=_tracing_enabled,
|
||||
base_app=_base_app,
|
||||
)
|
||||
app = _agent_os.get_app()
|
||||
|
||||
|
||||
|
||||
@@ -47,9 +47,3 @@ def get_agent() -> Agent:
|
||||
debug_mode=debug_mode,
|
||||
)
|
||||
return _agent
|
||||
|
||||
|
||||
async def run_agent(message: str) -> str:
|
||||
agent = get_agent()
|
||||
response = await agent.arun(message)
|
||||
return str(response.content)
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
from fastapi import APIRouter
|
||||
|
||||
from src.mcp_workflow_runner import run_scenario_workflow
|
||||
from src.schemas import ScenarioRunRequest, ScenarioRunResponse
|
||||
|
||||
router = APIRouter(prefix="/api", tags=["workflow"])
|
||||
|
||||
|
||||
@router.post("/runs", response_model=ScenarioRunResponse)
|
||||
async def run_scenario(request: ScenarioRunRequest) -> ScenarioRunResponse:
|
||||
result = await run_scenario_workflow(
|
||||
input_data=request.input,
|
||||
scenario_id=request.scenario_id,
|
||||
)
|
||||
return ScenarioRunResponse.model_validate(result)
|
||||
-43
@@ -1,43 +0,0 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from src.agent_runner import run_agent
|
||||
|
||||
|
||||
def build_parser() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Run base chat agent.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--message",
|
||||
help="Single message mode. If omitted, starts interactive chat.",
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
async def _main() -> None:
|
||||
load_dotenv()
|
||||
args = build_parser().parse_args()
|
||||
|
||||
if args.message:
|
||||
result = await run_agent(args.message)
|
||||
print(result)
|
||||
return
|
||||
|
||||
print("Chat mode started. Type 'exit' or 'quit' to stop.")
|
||||
while True:
|
||||
user_message = input("you> ").strip()
|
||||
if not user_message:
|
||||
continue
|
||||
if user_message.lower() in {"exit", "quit"}:
|
||||
print("Bye.")
|
||||
break
|
||||
|
||||
result = await run_agent(user_message)
|
||||
print(f"agent> {result}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(_main())
|
||||
@@ -0,0 +1,56 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
import json
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
from mcp import ClientSession
|
||||
from mcp.client.streamable_http import streamablehttp_client
|
||||
from mcp.types import TextContent
|
||||
|
||||
|
||||
def _mcp_url() -> str:
|
||||
return os.getenv("MCP_BASE_URL", "http://127.0.0.1:8081/mcp")
|
||||
|
||||
|
||||
def _timeout_seconds() -> float:
|
||||
value = os.getenv("MCP_TIMEOUT_SECONDS")
|
||||
if value is None:
|
||||
return 10.0
|
||||
return float(value)
|
||||
|
||||
|
||||
async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
|
||||
try:
|
||||
async with streamablehttp_client(url=_mcp_url()) as session_params:
|
||||
read, write = session_params[0:2]
|
||||
async with ClientSession(
|
||||
read,
|
||||
write,
|
||||
read_timeout_seconds=timedelta(seconds=_timeout_seconds()),
|
||||
) as session:
|
||||
await session.initialize()
|
||||
result = await session.call_tool(tool_name, arguments)
|
||||
except TimeoutError as exc:
|
||||
raise RuntimeError(f"MCP timeout: {tool_name}") from exc
|
||||
except Exception as exc:
|
||||
raise RuntimeError(f"MCP transport error: {tool_name}") from exc
|
||||
|
||||
if result.isError:
|
||||
raise RuntimeError(f"MCP tool error: {tool_name}")
|
||||
|
||||
if isinstance(result.structuredContent, dict):
|
||||
return result.structuredContent
|
||||
|
||||
for content_item in result.content:
|
||||
if not isinstance(content_item, TextContent):
|
||||
continue
|
||||
try:
|
||||
parsed = json.loads(content_item.text)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if isinstance(parsed, dict):
|
||||
return parsed
|
||||
|
||||
raise RuntimeError(f"MCP tool returned invalid payload: {tool_name}")
|
||||
@@ -0,0 +1,684 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from contextvars import ContextVar
|
||||
from copy import deepcopy
|
||||
from datetime import datetime, timezone
|
||||
import json
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
from agno.workflow.step import Step, StepInput, StepOutput
|
||||
from agno.workflow.workflow import Workflow
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
from src.mcp_client import call_mcp_tool
|
||||
from src.schemas import RunError, ScenarioRunResponse, StepState
|
||||
from src.scenario_store import ScenarioStoreError, load_scenario_definition
|
||||
|
||||
_planner_client: AsyncOpenAI | None = None
|
||||
|
||||
|
||||
def _env_float(name: str, default: float) -> float:
|
||||
value = os.getenv(name)
|
||||
if value is None:
|
||||
return default
|
||||
return float(value)
|
||||
|
||||
|
||||
def _env_int(name: str, default: int) -> int:
|
||||
value = os.getenv(name)
|
||||
if value is None:
|
||||
return default
|
||||
return int(value)
|
||||
|
||||
|
||||
def _utc_now_iso() -> str:
|
||||
return datetime.now(timezone.utc).isoformat()
|
||||
|
||||
|
||||
def get_shared_step_planner_client() -> AsyncOpenAI:
|
||||
global _planner_client
|
||||
if _planner_client is not None:
|
||||
return _planner_client
|
||||
|
||||
polza_base_url = os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1")
|
||||
polza_api_key = os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY")
|
||||
_planner_client = AsyncOpenAI(
|
||||
base_url=polza_base_url,
|
||||
api_key=polza_api_key,
|
||||
)
|
||||
return _planner_client
|
||||
|
||||
|
||||
def _resolve_path(scope: dict[str, Any], path: str) -> Any:
|
||||
value: Any = scope
|
||||
for segment in path.split("."):
|
||||
key = segment.strip()
|
||||
if not key:
|
||||
continue
|
||||
if not isinstance(value, dict):
|
||||
return None
|
||||
value = value.get(key)
|
||||
return deepcopy(value)
|
||||
|
||||
|
||||
def _resolve_template(template: Any, scope: dict[str, Any]) -> Any:
|
||||
if isinstance(template, dict):
|
||||
if set(template.keys()) == {"from"}:
|
||||
return _resolve_path(scope, str(template["from"]))
|
||||
return {key: _resolve_template(value, scope) for key, value in template.items()}
|
||||
if isinstance(template, list):
|
||||
return [_resolve_template(item, scope) for item in template]
|
||||
return deepcopy(template)
|
||||
|
||||
|
||||
def _validate_required_fields(
|
||||
arguments: dict[str, Any],
|
||||
required_fields: list[str],
|
||||
step_name: str,
|
||||
) -> None:
|
||||
missing_fields: list[str] = []
|
||||
for field in required_fields:
|
||||
value = arguments.get(field)
|
||||
if isinstance(value, str) and value.strip():
|
||||
continue
|
||||
if value not in (None, "", [], {}):
|
||||
continue
|
||||
missing_fields.append(field)
|
||||
if missing_fields:
|
||||
fields_str = ", ".join(missing_fields)
|
||||
raise ValueError(f"{step_name}: missing required fields: {fields_str}")
|
||||
|
||||
|
||||
def _missing_required_fields(arguments: dict[str, Any], required_fields: list[str]) -> list[str]:
|
||||
missing_fields: list[str] = []
|
||||
for field in required_fields:
|
||||
value = arguments.get(field)
|
||||
if isinstance(value, str) and value.strip():
|
||||
continue
|
||||
if value not in (None, "", [], {}):
|
||||
continue
|
||||
missing_fields.append(field)
|
||||
return missing_fields
|
||||
|
||||
|
||||
def _build_arguments_schema(required_fields: list[str]) -> dict[str, Any]:
|
||||
properties = {field: {"type": "any"} for field in required_fields}
|
||||
return {
|
||||
"type": "object",
|
||||
"required": required_fields,
|
||||
"properties": properties,
|
||||
}
|
||||
|
||||
|
||||
def _build_polza_response_schema(required_fields: list[str]) -> dict[str, Any]:
|
||||
value_schema: dict[str, Any] = {
|
||||
"type": ["string", "number", "boolean", "array", "object", "null"]
|
||||
}
|
||||
arguments_properties = {field: value_schema for field in required_fields}
|
||||
return {
|
||||
"name": "mcp_arguments",
|
||||
"strict": True,
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"arguments": {
|
||||
"type": "object",
|
||||
"properties": arguments_properties,
|
||||
"required": required_fields,
|
||||
"additionalProperties": True,
|
||||
}
|
||||
},
|
||||
"required": ["arguments"],
|
||||
"additionalProperties": False,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _extract_planned_arguments(content: Any) -> dict[str, Any]:
|
||||
candidate: Any = content
|
||||
if isinstance(candidate, str):
|
||||
text = candidate.strip()
|
||||
if text.startswith("```"):
|
||||
text = text.strip("`").strip()
|
||||
if text.startswith("json"):
|
||||
text = text[4:].strip()
|
||||
try:
|
||||
candidate = json.loads(text)
|
||||
except json.JSONDecodeError:
|
||||
return {}
|
||||
|
||||
if isinstance(candidate, dict):
|
||||
if isinstance(candidate.get("arguments"), dict):
|
||||
return candidate["arguments"]
|
||||
# Some models return the arguments object directly.
|
||||
return candidate
|
||||
|
||||
return {}
|
||||
|
||||
|
||||
class McpWorkflowRunner:
|
||||
"""
|
||||
Minimal workflow runner:
|
||||
- fixed step order from scenario
|
||||
- same planner agent in every step
|
||||
- MCP call executed by code, not by the agent
|
||||
- request/response persisted in run context
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._workflow_cache: dict[str, Workflow] = {}
|
||||
self._planner_repair_attempts = _env_int("PLANNER_REPAIR_ATTEMPTS", 3)
|
||||
self._run_state_ctx: ContextVar[dict[str, Any] | None] = ContextVar(
|
||||
"mcp_workflow_run_state",
|
||||
default=None,
|
||||
)
|
||||
|
||||
def _get_run_state(self) -> dict[str, Any]:
|
||||
run_state = self._run_state_ctx.get()
|
||||
if run_state is None:
|
||||
raise RuntimeError("run state is not initialized")
|
||||
return run_state
|
||||
|
||||
def _build_scope(self) -> dict[str, Any]:
|
||||
run_state = self._get_run_state()
|
||||
return {
|
||||
"input": run_state.get("input", {}),
|
||||
"steps": run_state.get("steps", {}),
|
||||
}
|
||||
|
||||
async def _plan_arguments(
|
||||
self,
|
||||
*,
|
||||
step_name: str,
|
||||
tool_name: str,
|
||||
base_arguments: dict[str, Any],
|
||||
required_fields: list[str],
|
||||
scope: dict[str, Any],
|
||||
planner_cache: dict[str, dict[str, Any]] | None = None,
|
||||
missing_fields: list[str] | None = None,
|
||||
attempt_no: int = 1,
|
||||
) -> dict[str, Any]:
|
||||
cache_key: str | None = None
|
||||
if planner_cache is not None:
|
||||
try:
|
||||
cache_payload = {
|
||||
"tool_name": tool_name,
|
||||
"base_arguments": base_arguments,
|
||||
"required_fields": required_fields,
|
||||
"missing_fields": missing_fields or [],
|
||||
"attempt_no": attempt_no,
|
||||
}
|
||||
cache_key = json.dumps(cache_payload, sort_keys=True, ensure_ascii=False)
|
||||
except TypeError:
|
||||
cache_key = None
|
||||
if cache_key is not None and cache_key in planner_cache:
|
||||
return deepcopy(planner_cache[cache_key])
|
||||
|
||||
planner_context = {
|
||||
"input": scope.get("input", {}),
|
||||
"steps": scope.get("steps", {}),
|
||||
}
|
||||
for key, value in scope.items():
|
||||
if key in {"input", "steps"}:
|
||||
continue
|
||||
planner_context[key] = value
|
||||
|
||||
prompt = {
|
||||
"task": "Prepare MCP arguments for this step.",
|
||||
"step_name": step_name,
|
||||
"tool_name": tool_name,
|
||||
"required_fields": required_fields,
|
||||
"base_arguments": base_arguments,
|
||||
"missing_fields": missing_fields or [],
|
||||
"repair_attempt": attempt_no,
|
||||
"arguments_schema": _build_arguments_schema(required_fields),
|
||||
"context": planner_context,
|
||||
"response_contract": {
|
||||
"must_return": {"arguments": "object"},
|
||||
"must_include_fields": missing_fields or [],
|
||||
"forbidden": "extra unrelated keys",
|
||||
},
|
||||
"output": (
|
||||
"Return only JSON object with key 'arguments'. "
|
||||
"If missing_fields is not empty, fill every missing field from context."
|
||||
),
|
||||
}
|
||||
prompt_json = json.dumps(prompt, ensure_ascii=False)
|
||||
planned: dict[str, Any] = {}
|
||||
|
||||
# Primary path: strict structured output via Polza response_format/json_schema.
|
||||
try:
|
||||
completion = await get_shared_step_planner_client().chat.completions.create(
|
||||
model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
|
||||
messages=[
|
||||
{
|
||||
"role": "system",
|
||||
"content": (
|
||||
"You are a tool-input planner. "
|
||||
"Return only JSON that matches the provided schema."
|
||||
),
|
||||
},
|
||||
{"role": "user", "content": prompt_json},
|
||||
],
|
||||
response_format={
|
||||
"type": "json_schema",
|
||||
"json_schema": _build_polza_response_schema(required_fields),
|
||||
},
|
||||
temperature=_env_float("POLZA_TEMPERATURE", 0.0),
|
||||
)
|
||||
raw_content = completion.choices[0].message.content if completion.choices else ""
|
||||
planned = _extract_planned_arguments(raw_content)
|
||||
except Exception:
|
||||
planned = {}
|
||||
|
||||
if not isinstance(planned, dict):
|
||||
planned = {}
|
||||
|
||||
# Allow planner to override/fill base arguments while keeping known defaults.
|
||||
merged = deepcopy(base_arguments)
|
||||
merged.update(planned)
|
||||
if planner_cache is not None and cache_key is not None:
|
||||
planner_cache[cache_key] = deepcopy(merged)
|
||||
return merged
|
||||
|
||||
def _build_tool_step_executor(self, step_spec: dict[str, Any]):
|
||||
step_name = str(step_spec["name"])
|
||||
tool_name = str(step_spec["tool"])
|
||||
input_template = step_spec.get("input", {})
|
||||
foreach_spec = step_spec.get("foreach")
|
||||
collect_template = step_spec.get("collect")
|
||||
collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
|
||||
required_fields_raw = step_spec.get("required_input_fields", [])
|
||||
required_fields = (
|
||||
[field for field in required_fields_raw if isinstance(field, str)]
|
||||
if isinstance(required_fields_raw, list)
|
||||
else []
|
||||
)
|
||||
if isinstance(foreach_spec, dict):
|
||||
source_path = str(foreach_spec.get("from", "")).strip()
|
||||
item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
|
||||
else:
|
||||
source_path = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
|
||||
item_alias = "item"
|
||||
|
||||
async def _executor(_step_input: StepInput) -> StepOutput:
|
||||
run_state = self._get_run_state()
|
||||
scope = self._build_scope()
|
||||
step_started_at = _utc_now_iso()
|
||||
planner_cache: dict[str, dict[str, Any]] = {}
|
||||
|
||||
async def _prepare_arguments(
|
||||
*,
|
||||
local_scope: dict[str, Any],
|
||||
local_base_arguments: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
final_arguments = deepcopy(local_base_arguments)
|
||||
for repair_attempt in range(1, self._planner_repair_attempts + 1):
|
||||
missing_fields = _missing_required_fields(final_arguments, required_fields)
|
||||
if not missing_fields:
|
||||
break
|
||||
final_arguments = await self._plan_arguments(
|
||||
step_name=step_name,
|
||||
tool_name=tool_name,
|
||||
base_arguments=final_arguments,
|
||||
required_fields=required_fields,
|
||||
scope=local_scope,
|
||||
planner_cache=planner_cache,
|
||||
missing_fields=missing_fields,
|
||||
attempt_no=repair_attempt,
|
||||
)
|
||||
_validate_required_fields(final_arguments, required_fields, step_name)
|
||||
return final_arguments
|
||||
|
||||
async def _call_tool_with_repair(
|
||||
*,
|
||||
initial_arguments: dict[str, Any],
|
||||
) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
final_arguments = deepcopy(initial_arguments)
|
||||
tool_response = await call_mcp_tool(tool_name, final_arguments)
|
||||
return tool_response, final_arguments
|
||||
|
||||
try:
|
||||
tool_calls = run_state.setdefault("tool_calls", [])
|
||||
if not isinstance(tool_calls, list):
|
||||
tool_calls = []
|
||||
run_state["tool_calls"] = tool_calls
|
||||
|
||||
if source_path:
|
||||
iterable = _resolve_path(scope, source_path)
|
||||
if not isinstance(iterable, list):
|
||||
raise ValueError(f"{step_name}: foreach source is not list")
|
||||
|
||||
collected_items: list[Any] = []
|
||||
for index, item in enumerate(iterable):
|
||||
iteration_scope = dict(scope)
|
||||
iteration_scope[item_alias] = item
|
||||
iteration_scope["item"] = item
|
||||
iteration_scope["index"] = index
|
||||
|
||||
resolved = _resolve_template(input_template, iteration_scope)
|
||||
base_arguments = resolved if isinstance(resolved, dict) else {}
|
||||
final_arguments = await _prepare_arguments(
|
||||
local_scope=iteration_scope,
|
||||
local_base_arguments=base_arguments,
|
||||
)
|
||||
tool_response, final_arguments = await _call_tool_with_repair(
|
||||
initial_arguments=final_arguments,
|
||||
)
|
||||
tool_calls.append(
|
||||
{
|
||||
"step_name": step_name,
|
||||
"tool_name": tool_name,
|
||||
"attempt": index + 1,
|
||||
"request": final_arguments,
|
||||
"ok": True,
|
||||
"response": tool_response,
|
||||
}
|
||||
)
|
||||
|
||||
if collect_template is None:
|
||||
collected_items.append(tool_response.get("payload", {}))
|
||||
else:
|
||||
collected_items.append(
|
||||
_resolve_template(
|
||||
collect_template,
|
||||
{**iteration_scope, "tool": tool_response},
|
||||
)
|
||||
)
|
||||
|
||||
step_payload = {
|
||||
"ok": True,
|
||||
"tool_name": tool_name,
|
||||
"payload": {collect_key: collected_items},
|
||||
"request": {"foreach_from": source_path, "count": len(iterable)},
|
||||
"received_at": _utc_now_iso(),
|
||||
"started_at": step_started_at,
|
||||
"finished_at": _utc_now_iso(),
|
||||
}
|
||||
else:
|
||||
resolved = _resolve_template(input_template, scope)
|
||||
base_arguments = resolved if isinstance(resolved, dict) else {}
|
||||
final_arguments = await _prepare_arguments(
|
||||
local_scope=scope,
|
||||
local_base_arguments=base_arguments,
|
||||
)
|
||||
tool_response, final_arguments = await _call_tool_with_repair(
|
||||
initial_arguments=final_arguments,
|
||||
)
|
||||
step_payload = {
|
||||
"ok": bool(tool_response.get("ok", True)),
|
||||
"tool_name": tool_name,
|
||||
"payload": tool_response.get("payload", {}),
|
||||
"request": final_arguments,
|
||||
"response": tool_response,
|
||||
"received_at": tool_response.get("received_at"),
|
||||
"started_at": step_started_at,
|
||||
"finished_at": _utc_now_iso(),
|
||||
}
|
||||
tool_calls.append(
|
||||
{
|
||||
"step_name": step_name,
|
||||
"tool_name": tool_name,
|
||||
"request": final_arguments,
|
||||
"ok": True,
|
||||
"response": tool_response,
|
||||
}
|
||||
)
|
||||
|
||||
run_state.setdefault("steps", {})[step_name] = step_payload
|
||||
return StepOutput(
|
||||
content=json.dumps(step_payload, ensure_ascii=False),
|
||||
success=True,
|
||||
)
|
||||
except Exception as exc:
|
||||
error_payload = {
|
||||
"ok": False,
|
||||
"tool_name": tool_name,
|
||||
"request": {},
|
||||
"error": str(exc),
|
||||
"started_at": step_started_at,
|
||||
"finished_at": _utc_now_iso(),
|
||||
}
|
||||
run_state.setdefault("steps", {})[step_name] = error_payload
|
||||
run_state.setdefault("tool_calls", []).append(
|
||||
{
|
||||
"step_name": step_name,
|
||||
"tool_name": tool_name,
|
||||
"request": {},
|
||||
"ok": False,
|
||||
"error": str(exc),
|
||||
}
|
||||
)
|
||||
raise RuntimeError(f"{step_name} failed: {exc}") from exc
|
||||
|
||||
return _executor
|
||||
|
||||
def get_workflow(self, scenario_id: str, scenario: dict[str, Any]) -> Workflow:
|
||||
cached = self._workflow_cache.get(scenario_id)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
raw_steps = scenario.get("steps")
|
||||
if not isinstance(raw_steps, list) or not raw_steps:
|
||||
raise ScenarioStoreError("Scenario must contain non-empty steps list")
|
||||
|
||||
workflow_steps: list[Step] = []
|
||||
for raw_step in raw_steps:
|
||||
if not isinstance(raw_step, dict):
|
||||
raise ScenarioStoreError("Each scenario step must be object")
|
||||
if raw_step.get("type") != "tool":
|
||||
raise ScenarioStoreError("This minimal runner supports only tool steps")
|
||||
|
||||
step_name = str(raw_step.get("name", "")).strip()
|
||||
tool_name = str(raw_step.get("tool", step_name)).strip()
|
||||
if not step_name or not tool_name:
|
||||
raise ScenarioStoreError("Each tool step must contain non-empty name and tool")
|
||||
|
||||
executor = self._build_tool_step_executor(raw_step)
|
||||
workflow_steps.append(
|
||||
Step(
|
||||
name=step_name,
|
||||
description=str(raw_step.get("description", step_name)),
|
||||
executor=executor,
|
||||
max_retries=0,
|
||||
on_error="fail",
|
||||
)
|
||||
)
|
||||
|
||||
workflow = Workflow(
|
||||
name=scenario_id,
|
||||
description=str(scenario.get("description", "")),
|
||||
steps=workflow_steps,
|
||||
)
|
||||
self._workflow_cache[scenario_id] = workflow
|
||||
return workflow
|
||||
|
||||
async def run(self, *, scenario_id: str, input_data: dict[str, Any]) -> dict[str, Any]:
|
||||
scenario = load_scenario_definition(scenario_id)
|
||||
workflow = self.get_workflow(scenario_id, scenario)
|
||||
|
||||
initial_state = {
|
||||
"input": deepcopy(input_data),
|
||||
"steps": {},
|
||||
"tool_calls": [],
|
||||
}
|
||||
token = self._run_state_ctx.set(initial_state)
|
||||
run_state = initial_state
|
||||
run_output: Any = None
|
||||
workflow_error: str | None = None
|
||||
try:
|
||||
run_output = await workflow.arun(input=input_data)
|
||||
except Exception as exc:
|
||||
workflow_error = str(exc)
|
||||
finally:
|
||||
captured = self._run_state_ctx.get()
|
||||
if isinstance(captured, dict):
|
||||
run_state = deepcopy(captured)
|
||||
self._run_state_ctx.reset(token)
|
||||
|
||||
content = run_output.content if hasattr(run_output, "content") else None
|
||||
if isinstance(content, str):
|
||||
try:
|
||||
content = json.loads(content)
|
||||
except json.JSONDecodeError:
|
||||
content = {"raw_content": content}
|
||||
if content is None:
|
||||
step_payloads = run_state.get("steps", {})
|
||||
if isinstance(step_payloads, dict):
|
||||
for payload in reversed(list(step_payloads.values())):
|
||||
if isinstance(payload, dict) and not bool(payload.get("ok", True)):
|
||||
content = deepcopy(payload)
|
||||
break
|
||||
if content is None and workflow_error is not None:
|
||||
content = {"error": workflow_error}
|
||||
|
||||
status = "success"
|
||||
if workflow_error is not None:
|
||||
status = "failed"
|
||||
elif run_output is not None and not bool(getattr(run_output, "success", True)):
|
||||
status = "failed"
|
||||
return {
|
||||
"scenario_id": scenario_id,
|
||||
"workflow_name": workflow.name,
|
||||
"status": status,
|
||||
"input": input_data,
|
||||
"final_result": content if isinstance(content, dict) else {"raw_content": content},
|
||||
"steps": run_state.get("steps", {}),
|
||||
"tool_calls": run_state.get("tool_calls", []),
|
||||
"run_id": str(getattr(run_output, "run_id", "")) or None,
|
||||
"session_id": str(getattr(run_output, "session_id", "")) or None,
|
||||
}
|
||||
|
||||
|
||||
_default_runner: McpWorkflowRunner | None = None
|
||||
|
||||
|
||||
def get_mcp_workflow_runner() -> McpWorkflowRunner:
|
||||
global _default_runner
|
||||
if _default_runner is not None:
|
||||
return _default_runner
|
||||
_default_runner = McpWorkflowRunner()
|
||||
return _default_runner
|
||||
|
||||
|
||||
def _extract_output_summary(content: Any) -> str | None:
|
||||
if not isinstance(content, dict):
|
||||
return None
|
||||
summary = content.get("summary")
|
||||
if isinstance(summary, str) and summary:
|
||||
return summary
|
||||
payload = content.get("payload")
|
||||
if isinstance(payload, dict):
|
||||
payload_summary = payload.get("summary")
|
||||
if isinstance(payload_summary, str) and payload_summary:
|
||||
return payload_summary
|
||||
return None
|
||||
|
||||
|
||||
def _build_step_states_from_minimal(
|
||||
*,
|
||||
scenario: dict[str, Any],
|
||||
minimal_steps: dict[str, Any],
|
||||
) -> list[StepState]:
|
||||
raw_steps = scenario.get("steps")
|
||||
if not isinstance(raw_steps, list):
|
||||
return []
|
||||
|
||||
step_states: list[StepState] = []
|
||||
for raw_step in raw_steps:
|
||||
if not isinstance(raw_step, dict):
|
||||
continue
|
||||
step_name = str(raw_step.get("name", "")).strip()
|
||||
if not step_name:
|
||||
continue
|
||||
payload = minimal_steps.get(step_name)
|
||||
if not isinstance(payload, dict):
|
||||
step_states.append(StepState(node_id=step_name, status="queued"))
|
||||
continue
|
||||
ok = bool(payload.get("ok", False))
|
||||
step_states.append(
|
||||
StepState(
|
||||
node_id=step_name,
|
||||
status="success" if ok else "failed",
|
||||
started_at=str(payload.get("started_at") or "") or None,
|
||||
finished_at=str(payload.get("finished_at") or "") or None,
|
||||
error=RunError(
|
||||
code="tool_error",
|
||||
message=str(payload.get("error", f"{step_name} failed")),
|
||||
)
|
||||
if not ok
|
||||
else None,
|
||||
)
|
||||
)
|
||||
return step_states
|
||||
|
||||
|
||||
async def run_scenario_workflow(
|
||||
input_data: dict[str, Any],
|
||||
scenario_id: str = "news_source_discovery_v1",
|
||||
) -> dict[str, Any]:
|
||||
try:
|
||||
scenario = load_scenario_definition(scenario_id)
|
||||
except ScenarioStoreError as exc:
|
||||
return ScenarioRunResponse(
|
||||
scenario_id=scenario_id,
|
||||
status="failed",
|
||||
input=input_data,
|
||||
steps=[],
|
||||
error=RunError(code="unknown_scenario", message=str(exc)),
|
||||
).model_dump()
|
||||
|
||||
runner = get_mcp_workflow_runner()
|
||||
scenario_name = str(scenario.get("name", scenario_id))
|
||||
try:
|
||||
minimal_result = await runner.run(
|
||||
scenario_id=scenario_id,
|
||||
input_data=input_data,
|
||||
)
|
||||
except Exception as exc:
|
||||
return ScenarioRunResponse(
|
||||
scenario_id=scenario_id,
|
||||
status="failed",
|
||||
input=input_data,
|
||||
scenario_name=scenario_name,
|
||||
steps=[],
|
||||
error=RunError(code="workflow_error", message=str(exc)),
|
||||
).model_dump()
|
||||
|
||||
minimal_steps = minimal_result.get("steps", {})
|
||||
steps = (
|
||||
minimal_steps
|
||||
if isinstance(minimal_steps, dict)
|
||||
else {}
|
||||
)
|
||||
step_states = _build_step_states_from_minimal(
|
||||
scenario=scenario,
|
||||
minimal_steps=steps,
|
||||
)
|
||||
|
||||
final_result = minimal_result.get("final_result")
|
||||
normalized_result = (
|
||||
final_result if isinstance(final_result, dict) else {"raw_content": str(final_result)}
|
||||
)
|
||||
status = "success"
|
||||
for payload in steps.values():
|
||||
if isinstance(payload, dict) and not bool(payload.get("ok", False)):
|
||||
status = "failed"
|
||||
break
|
||||
|
||||
return ScenarioRunResponse(
|
||||
scenario_id=scenario_id,
|
||||
status=status,
|
||||
input=input_data,
|
||||
steps=step_states,
|
||||
output_summary=_extract_output_summary(normalized_result),
|
||||
scenario_name=scenario_name,
|
||||
workflow_name=str(minimal_result.get("workflow_name") or scenario_id),
|
||||
result=normalized_result,
|
||||
error=None
|
||||
if status == "success"
|
||||
else RunError(code="workflow_failed", message="Workflow finished with failed status."),
|
||||
run_id=minimal_result.get("run_id"),
|
||||
session_id=minimal_result.get("session_id"),
|
||||
).model_dump()
|
||||
@@ -0,0 +1,36 @@
|
||||
import os
|
||||
|
||||
from phoenix.otel import register
|
||||
|
||||
_initialized = False
|
||||
|
||||
|
||||
def _env_bool(name: str, default: bool) -> bool:
|
||||
value = os.getenv(name)
|
||||
if value is None:
|
||||
return default
|
||||
return value.strip().lower() in {"1", "true", "yes", "on"}
|
||||
|
||||
|
||||
def init_phoenix_tracing() -> bool:
|
||||
global _initialized
|
||||
|
||||
enabled = _env_bool("PHOENIX_TRACING_ENABLED", False)
|
||||
if not enabled:
|
||||
return False
|
||||
|
||||
if _initialized:
|
||||
return True
|
||||
|
||||
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = os.getenv(
|
||||
"PHOENIX_COLLECTOR_ENDPOINT",
|
||||
"http://localhost:6006",
|
||||
)
|
||||
|
||||
project_name = os.getenv("PHOENIX_PROJECT_NAME", "prisma-platform")
|
||||
register(
|
||||
project_name=project_name,
|
||||
auto_instrument=True,
|
||||
)
|
||||
_initialized = True
|
||||
return True
|
||||
@@ -0,0 +1,60 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ScenarioStoreError(ValueError):
|
||||
"""Raised when scenario definitions are missing or invalid."""
|
||||
|
||||
|
||||
_SCENARIOS_ROOT = Path(__file__).resolve().parent.parent / "scenarios"
|
||||
_INDEX_PATH = _SCENARIOS_ROOT / "index.json"
|
||||
|
||||
|
||||
def _read_json(path: Path) -> dict[str, Any]:
|
||||
try:
|
||||
raw = path.read_text(encoding="utf-8")
|
||||
except FileNotFoundError as exc:
|
||||
raise ScenarioStoreError(f"Scenario file not found: {path}") from exc
|
||||
|
||||
try:
|
||||
parsed = json.loads(raw)
|
||||
except json.JSONDecodeError as exc:
|
||||
raise ScenarioStoreError(f"Invalid JSON in file: {path}") from exc
|
||||
|
||||
if not isinstance(parsed, dict):
|
||||
raise ScenarioStoreError(f"JSON root must be object: {path}")
|
||||
return parsed
|
||||
|
||||
|
||||
def load_scenario_index() -> dict[str, str]:
|
||||
index = _read_json(_INDEX_PATH)
|
||||
scenarios = index.get("scenarios")
|
||||
if not isinstance(scenarios, dict):
|
||||
raise ScenarioStoreError("index.json must contain object field 'scenarios'")
|
||||
|
||||
normalized: dict[str, str] = {}
|
||||
for scenario_id, relative_path in scenarios.items():
|
||||
if not isinstance(scenario_id, str) or not isinstance(relative_path, str):
|
||||
raise ScenarioStoreError("index.json scenario entries must be string -> string")
|
||||
normalized[scenario_id] = relative_path
|
||||
return normalized
|
||||
|
||||
|
||||
def load_scenario_definition(scenario_id: str) -> dict[str, Any]:
|
||||
index = load_scenario_index()
|
||||
relative_path = index.get(scenario_id)
|
||||
if relative_path is None:
|
||||
raise ScenarioStoreError(f"Unknown scenario_id: {scenario_id}")
|
||||
|
||||
scenario_path = _SCENARIOS_ROOT / relative_path
|
||||
scenario = _read_json(scenario_path)
|
||||
|
||||
declared_id = scenario.get("scenario_id")
|
||||
if declared_id != scenario_id:
|
||||
raise ScenarioStoreError(
|
||||
"Scenario file scenario_id does not match requested scenario_id"
|
||||
)
|
||||
return scenario
|
||||
@@ -0,0 +1,40 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
|
||||
StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
|
||||
|
||||
|
||||
class RunError(BaseModel):
|
||||
code: str
|
||||
message: str
|
||||
|
||||
|
||||
class ScenarioRunRequest(BaseModel):
|
||||
scenario_id: str = "news_source_discovery_v1"
|
||||
input: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class StepState(BaseModel):
|
||||
node_id: str
|
||||
status: StepStatus
|
||||
started_at: str | None = None
|
||||
finished_at: str | None = None
|
||||
error: RunError | None = None
|
||||
|
||||
|
||||
class ScenarioRunResponse(BaseModel):
|
||||
scenario_id: str
|
||||
status: RunStatus
|
||||
input: dict[str, Any]
|
||||
steps: list[StepState] = Field(default_factory=list)
|
||||
output_summary: str | None = None
|
||||
scenario_name: str | None = None
|
||||
workflow_name: str | None = None
|
||||
result: dict[str, Any] | None = None
|
||||
error: RunError | None = None
|
||||
run_id: str | None = None
|
||||
session_id: str | None = None
|
||||
Reference in New Issue
Block a user