Перевести исполнение сценариев на MCP workflow runner.

Удален legacy workflow_runner со stub-инструментами, добавлен mcp_client и новый mcp_workflow_runner с planner-моделью через polza.ai, обновлены сценарий, API/AgentOS wiring и документация под текущий контур запуска.
This commit is contained in:
Barabashka
2026-04-22 16:36:42 +03:00
parent 93ee7aea1c
commit ad828885e3
9 changed files with 746 additions and 638 deletions
+7 -1
View File
@@ -4,9 +4,15 @@ OLLAMA_HOST=http://localhost:11435
OLLAMA_TEMPERATURE=0
AGENT_MARKDOWN=false
AGENT_DEBUG_MODE=true
AGENT_INSTRUCTIONS=You are a helpful assistant. Answer briefly and clearly.
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
AGENT_OS_HOST=127.0.0.1
AGENT_OS_PORT=7777
POLZA_BASE_URL=https://api.polza.ai/v1
POLZA_MODEL_ID=google/gemma-4-31b-it
POLZA_API_KEY=key
POLZA_TEMPERATURE=0
MCP_BASE_URL=http://127.0.0.1:8081/mcp
MCP_TIMEOUT_SECONDS=10
PHOENIX_TRACING_ENABLED=false
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
PHOENIX_PROJECT_NAME=prisma-platform
+65 -110
View File
@@ -1,14 +1,19 @@
# Prisma Platform MVP
Минимальный чат-агент на Agno + Ollama с рантаймом AgentOS.
MVP-реализация сценарного раннера на Agno AgentOS.
В этом проекте AgentOS работает как HTTP API сервер (FastAPI + Uvicorn).
Текущая схема исполнения:
- сценарий хранится в `scenarios/*.json`;
- исполнение идет через `src/mcp_workflow_runner.py`;
- каждый шаг вызывает MCP инструмент через `src/mcp_client.py`;
- для подготовки аргументов шага используется planner-агент с моделью через `polza.ai`.
## Требования
- Python 3.10+
- Запущенный Ollama endpoint (по умолчанию: `http://localhost:11435`)
- Доступная модель в Ollama (по умолчанию: `gemma4:31b`)
- MCP endpoint (по умолчанию `http://127.0.0.1:8081/mcp`)
- доступ к модели через `polza.ai` (`POLZA_API_KEY`)
## Текущая структура
@@ -26,11 +31,11 @@ prisma_platform/
├── api_routes.py
├── agent_os.py
├── agent_runner.py
├── mcp_client.py
├── mcp_workflow_runner.py
├── observability.py
├── scenario_store.py
── schemas.py
├── stub_tools.py
└── workflow_runner.py
── schemas.py
```
## Установка
@@ -44,25 +49,32 @@ cp .env.example .env
## Запуск
Запуск сервера AgentOS:
1) Поднимите MCP stub (из соседнего репозитория):
```bash
python -m src.agent_os
cd /home/worker/projects/docker-service/mcp-stub
docker compose up --build -d
```
По умолчанию AgentOS работает на `http://127.0.0.1:7777`.
2) Запустите сервер AgentOS:
Документация API доступна по адресам:
```bash
cd /home/worker/projects/prisma_platform
.venv/bin/python -m src.agent_os
```
По умолчанию приложение доступно на `http://127.0.0.1:7777`.
Документация API:
- `http://127.0.0.1:7777/docs`
- `http://127.0.0.1:7777/redoc`
Верхний слой сервиса реализован как кастомные FastAPI роуты (`src/api_routes.py`), подключенные через `AgentOS(base_app=...)`.
### Запуск сценария через HTTP
## Запуск сценария через HTTP
- `POST http://127.0.0.1:7777/api/runs`
- Тело запроса (JSON):
Тело запроса:
```json
{
@@ -73,10 +85,10 @@ python -m src.agent_os
}
```
Пример запроса:
Пример:
```bash
curl -X POST "http://127.0.0.1:7777/api/runs" \
curl -s -X POST "http://127.0.0.1:7777/api/runs" \
-H "Content-Type: application/json" \
-d '{
"scenario_id": "news_source_discovery_v1",
@@ -86,106 +98,45 @@ curl -X POST "http://127.0.0.1:7777/api/runs" \
}'
```
Endpoint возвращает единый JSON-контракт. Поля одинаковые для `success` и `failed`,
а в неактуальных полях приходит `null`.
Успешный ответ содержит:
Пример успешного ответа:
```json
{
"scenario_id": "news_source_discovery_v1",
"status": "success",
"input": {
"url": "https://example.com/news"
},
"steps": [
{
"node_id": "search_news_sources",
"status": "success",
"started_at": "2026-04-22T10:00:00+00:00",
"finished_at": "2026-04-22T10:00:00+00:00",
"error": null
}
],
"output_summary": "По заглушечным данным самым ранним источником считается https://news-a.example/article-1",
"workflow_name": "news_source_discovery_v1",
"scenario_name": "News Source Discovery V1",
"result": {
"tool_name": "generate_summary",
"ok": true,
"payload": {
"input_count": 3,
"summary": "По заглушечным данным самым ранним источником считается https://news-a.example/article-1"
},
"received_at": "2026-04-22T10:00:00+00:00"
},
"error": null,
"run_id": "run_xxx",
"session_id": "session_xxx"
}
```
Пример ответа с ошибкой валидации:
```json
{
"scenario_id": "news_source_discovery_v1",
"status": "failed",
"input": {},
"steps": [
{
"node_id": "search_news_sources",
"status": "queued",
"started_at": null,
"finished_at": null,
"error": null
}
],
"output_summary": null,
"workflow_name": null,
"scenario_name": "News Source Discovery V1",
"result": null,
"error": {
"code": "invalid_input",
"message": "Input does not match scenario input_schema: ..."
},
"run_id": null,
"session_id": null
}
```
Проверка, что сервер поднят:
```bash
curl -s "http://127.0.0.1:7777/docs" | grep -n "Swagger UI"
```
- `status=success`
- список `steps` со статусами шагов
- `output_summary`
- `result` итогового шага
## Переменные окружения
Основные переменные:
Основные:
- `AGENT_ID` (по умолчанию: `prisma-agent`)
- `OLLAMA_MODEL_ID` (по умолчанию: `gemma4:31b`)
- `OLLAMA_HOST` (по умолчанию: `http://localhost:11435`)
- `OLLAMA_TEMPERATURE` (по умолчанию: `0`)
- `AGENT_MARKDOWN` (по умолчанию: `false`)
- `AGENT_DEBUG_MODE` (по умолчанию: `true`)
- `AGENT_INSTRUCTIONS` (по умолчанию: `You are a helpful assistant. Answer briefly and clearly.`)
- `AGENT_OS_HOST` (по умолчанию: `127.0.0.1`)
- `AGENT_OS_PORT` (по умолчанию: `7777`)
- `PHOENIX_TRACING_ENABLED` (по умолчанию: `false`)
- `PHOENIX_COLLECTOR_ENDPOINT` (по умолчанию: `http://localhost:6006`)
- `PHOENIX_PROJECT_NAME` (по умолчанию: `prisma-platform`)
- `AGENT_ID` (default: `prisma-agent`)
- `AGENT_MARKDOWN` (default: `false`)
- `AGENT_DEBUG_MODE` (default: `true`)
- `AGENT_INSTRUCTIONS`
- `AGENT_OS_HOST` (default: `127.0.0.1`)
- `AGENT_OS_PORT` (default: `7777`)
Planner-модель (`polza.ai`):
- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)
- `POLZA_API_KEY` (required)
- `POLZA_TEMPERATURE` (default: `0`)
MCP:
- `MCP_BASE_URL` (default: `http://127.0.0.1:8081/mcp`)
- `MCP_TIMEOUT_SECONDS` (default: `10`)
Phoenix tracing:
- `PHOENIX_TRACING_ENABLED` (default: `false`)
- `PHOENIX_COLLECTOR_ENDPOINT` (default: `http://localhost:6006`)
- `PHOENIX_PROJECT_NAME` (default: `prisma-platform`)
## Phoenix трассировка (локально)
1. Установите зависимости:
```bash
pip install -r requirements.txt
```
2. Поднимите Phoenix (см. `docker-service/docker-compose.yml`) и включите трассировку в `.env`:
1) Включите трассировку в `.env`:
```dotenv
PHOENIX_TRACING_ENABLED=true
@@ -193,5 +144,9 @@ PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
PHOENIX_PROJECT_NAME=prisma-platform
```
3. Запустите приложение как обычно (`python -m src.agent_os`).
2) Запустите приложение:
```bash
.venv/bin/python -m src.agent_os
```
+71 -6
View File
@@ -2,7 +2,7 @@
"schema_version": "1",
"scenario_id": "news_source_discovery_v1",
"name": "News Source Discovery V1",
"description": "Find earliest news source using sequential stub tools.",
"description": "Find earliest news source using sequential MCP tools.",
"input_schema": {
"type": "object",
"required": [
@@ -18,23 +18,88 @@
"steps": [
{
"name": "search_news_sources",
"type": "tool"
"type": "tool",
"tool": "search_news_sources",
"input": {
"url": {
"from": "input.url"
}
},
"required_input_fields": [
"url"
]
},
{
"name": "parse_articles_batch",
"type": "tool"
"type": "tool",
"tool": "parse_article",
"foreach": {
"from": "steps.search_news_sources.payload.items",
"as": "item"
},
"input": {
"url": {
"from": "item.url"
}
},
"collect": {
"url": {
"from": "tool.payload.url"
},
"title": {
"from": "tool.payload.title"
},
"text": {
"from": "tool.payload.text"
}
},
"collect_key": "items"
},
{
"name": "extract_publication_date_batch",
"type": "tool"
"type": "tool",
"tool": "extract_publication_date",
"foreach": {
"from": "steps.parse_articles_batch.payload.items",
"as": "item"
},
"input": {
"article_text": {
"from": "item.text"
}
},
"collect": {
"url": {
"from": "item.url"
},
"title": {
"from": "item.title"
},
"published_at": {
"from": "tool.payload.published_at"
}
},
"collect_key": "items"
},
{
"name": "rank_sources_by_date",
"type": "tool"
"type": "tool",
"tool": "rank_sources_by_date",
"input": {
"items": {
"from": "steps.extract_publication_date_batch.payload.items"
}
}
},
{
"name": "generate_summary",
"type": "tool"
"type": "tool",
"tool": "generate_summary",
"input": {
"items": {
"from": "steps.rank_sources_by_date.payload.ranked_items"
}
}
}
]
}
-6
View File
@@ -8,16 +8,11 @@ from agno.os import AgentOS
from src.api_routes import router as api_router
from src.agent_runner import get_agent
from src.observability import init_phoenix_tracing
from src.scenario_store import load_scenario_definition
from src.workflow_runner import get_workflow_for_scenario
load_dotenv()
_tracing_enabled = init_phoenix_tracing()
_agent = get_agent()
_default_scenario_id = "news_source_discovery_v1"
_scenario = load_scenario_definition(_default_scenario_id)
_workflow = get_workflow_for_scenario(_default_scenario_id, _scenario)
_base_app = FastAPI(
title="Prisma Platform API",
version="0.1.0",
@@ -25,7 +20,6 @@ _base_app = FastAPI(
_base_app.include_router(api_router)
_agent_os = AgentOS(
agents=[_agent],
workflows=[_workflow],
tracing=_tracing_enabled,
base_app=_base_app,
)
+1 -1
View File
@@ -1,7 +1,7 @@
from fastapi import APIRouter
from src.mcp_workflow_runner import run_scenario_workflow
from src.schemas import ScenarioRunRequest, ScenarioRunResponse
from src.workflow_runner import run_scenario_workflow
router = APIRouter(prefix="/api", tags=["workflow"])
+56
View File
@@ -0,0 +1,56 @@
from __future__ import annotations
from datetime import timedelta
import json
import os
from typing import Any
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import TextContent
def _mcp_url() -> str:
return os.getenv("MCP_BASE_URL", "http://127.0.0.1:8081/mcp")
def _timeout_seconds() -> float:
value = os.getenv("MCP_TIMEOUT_SECONDS")
if value is None:
return 10.0
return float(value)
async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]:
try:
async with streamablehttp_client(url=_mcp_url()) as session_params:
read, write = session_params[0:2]
async with ClientSession(
read,
write,
read_timeout_seconds=timedelta(seconds=_timeout_seconds()),
) as session:
await session.initialize()
result = await session.call_tool(tool_name, arguments)
except TimeoutError as exc:
raise RuntimeError(f"MCP timeout: {tool_name}") from exc
except Exception as exc:
raise RuntimeError(f"MCP transport error: {tool_name}") from exc
if result.isError:
raise RuntimeError(f"MCP tool error: {tool_name}")
if isinstance(result.structuredContent, dict):
return result.structuredContent
for content_item in result.content:
if not isinstance(content_item, TextContent):
continue
try:
parsed = json.loads(content_item.text)
except json.JSONDecodeError:
continue
if isinstance(parsed, dict):
return parsed
raise RuntimeError(f"MCP tool returned invalid payload: {tool_name}")
+546
View File
@@ -0,0 +1,546 @@
from __future__ import annotations
from contextvars import ContextVar
from copy import deepcopy
from datetime import datetime, timezone
import json
import os
from typing import Any
from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow
from pydantic import BaseModel, Field
from src.mcp_client import call_mcp_tool
from src.schemas import RunError, ScenarioRunResponse, StepState
from src.scenario_store import ScenarioStoreError, load_scenario_definition
class McpArgumentsPlan(BaseModel):
"""Structured planner output for one MCP tool call."""
arguments: dict[str, Any] = Field(default_factory=dict)
_planner_agent: Agent | None = None
def _env_float(name: str, default: float) -> float:
value = os.getenv(name)
if value is None:
return default
return float(value)
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def get_shared_step_planner_agent() -> Agent:
"""
Create one reusable planner agent for all workflow steps.
This agent never calls MCP directly. It only prepares arguments
for a fixed MCP method selected by the workflow step.
"""
global _planner_agent
if _planner_agent is not None:
return _planner_agent
model_id = os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it")
polza_base_url = os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1")
polza_api_key = os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY")
temperature = _env_float("POLZA_TEMPERATURE", 0.0)
llm = OpenAIChat(
id=model_id,
api_key=polza_api_key,
base_url=polza_base_url,
temperature=temperature,
)
_planner_agent = Agent(
id="workflow-step-planner",
model=llm,
output_schema=McpArgumentsPlan,
markdown=False,
debug_mode=False,
instructions=[
"You are a strict tool-input planner.",
"You receive step metadata and current workflow context.",
"Return only arguments that should be sent to MCP tool.",
"Do not add extra keys that are unrelated to the tool.",
"Do not invent values if they are absent in context.",
],
)
return _planner_agent
def _resolve_path(scope: dict[str, Any], path: str) -> Any:
value: Any = scope
for segment in path.split("."):
key = segment.strip()
if not key:
continue
if not isinstance(value, dict):
return None
value = value.get(key)
return deepcopy(value)
def _resolve_template(template: Any, scope: dict[str, Any]) -> Any:
if isinstance(template, dict):
if set(template.keys()) == {"from"}:
return _resolve_path(scope, str(template["from"]))
return {key: _resolve_template(value, scope) for key, value in template.items()}
if isinstance(template, list):
return [_resolve_template(item, scope) for item in template]
return deepcopy(template)
def _validate_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
step_name: str,
) -> None:
for field in required_fields:
value = arguments.get(field)
if isinstance(value, str) and value.strip():
continue
if value not in (None, "", [], {}):
continue
raise ValueError(f"{step_name}: input.{field} is empty")
class McpWorkflowRunner:
"""
Minimal workflow runner:
- fixed step order from scenario
- same planner agent in every step
- MCP call executed by code, not by the agent
- request/response persisted in run context
"""
def __init__(self, planner_agent: Agent | None = None) -> None:
self._planner_agent = planner_agent or get_shared_step_planner_agent()
self._workflow_cache: dict[str, Workflow] = {}
self._run_state_ctx: ContextVar[dict[str, Any] | None] = ContextVar(
"mcp_workflow_run_state",
default=None,
)
def _get_run_state(self) -> dict[str, Any]:
run_state = self._run_state_ctx.get()
if run_state is None:
raise RuntimeError("run state is not initialized")
return run_state
def _build_scope(self) -> dict[str, Any]:
run_state = self._get_run_state()
return {
"input": run_state.get("input", {}),
"steps": run_state.get("steps", {}),
}
async def _plan_arguments(
self,
*,
step_name: str,
tool_name: str,
base_arguments: dict[str, Any],
required_fields: list[str],
scope: dict[str, Any],
) -> dict[str, Any]:
prompt = {
"task": "Prepare MCP arguments for this step.",
"step_name": step_name,
"tool_name": tool_name,
"required_fields": required_fields,
"base_arguments": base_arguments,
"context": {
"input": scope.get("input", {}),
"steps": scope.get("steps", {}),
},
"output": "Return arguments object only.",
}
run_output = await self._planner_agent.arun(json.dumps(prompt, ensure_ascii=False))
content = run_output.content if hasattr(run_output, "content") else {}
if isinstance(content, McpArgumentsPlan):
planned = content.arguments
elif isinstance(content, dict):
planned = content.get("arguments", {})
else:
planned = {}
if not isinstance(planned, dict):
planned = {}
# Allow planner to override/fill base arguments while keeping known defaults.
merged = deepcopy(base_arguments)
merged.update(planned)
return merged
def _build_tool_step_executor(self, step_spec: dict[str, Any]):
step_name = str(step_spec["name"])
tool_name = str(step_spec["tool"])
input_template = step_spec.get("input", {})
foreach_spec = step_spec.get("foreach")
collect_template = step_spec.get("collect")
collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
required_fields_raw = step_spec.get("required_input_fields", [])
required_fields = (
[field for field in required_fields_raw if isinstance(field, str)]
if isinstance(required_fields_raw, list)
else []
)
if isinstance(foreach_spec, dict):
source_path = str(foreach_spec.get("from", "")).strip()
item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
else:
source_path = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
item_alias = "item"
async def _executor(_step_input: StepInput) -> StepOutput:
run_state = self._get_run_state()
scope = self._build_scope()
step_started_at = _utc_now_iso()
try:
tool_calls = run_state.setdefault("tool_calls", [])
if not isinstance(tool_calls, list):
tool_calls = []
run_state["tool_calls"] = tool_calls
if source_path:
iterable = _resolve_path(scope, source_path)
if not isinstance(iterable, list):
raise ValueError(f"{step_name}: foreach source is not list")
collected_items: list[Any] = []
for index, item in enumerate(iterable):
iteration_scope = dict(scope)
iteration_scope[item_alias] = item
iteration_scope["item"] = item
iteration_scope["index"] = index
resolved = _resolve_template(input_template, iteration_scope)
base_arguments = resolved if isinstance(resolved, dict) else {}
final_arguments = await self._plan_arguments(
step_name=step_name,
tool_name=tool_name,
base_arguments=base_arguments,
required_fields=required_fields,
scope=iteration_scope,
)
_validate_required_fields(final_arguments, required_fields, step_name)
tool_response = await call_mcp_tool(tool_name, final_arguments)
tool_calls.append(
{
"step_name": step_name,
"tool_name": tool_name,
"attempt": index + 1,
"request": final_arguments,
"ok": True,
"response": tool_response,
}
)
if collect_template is None:
collected_items.append(tool_response.get("payload", {}))
else:
collected_items.append(
_resolve_template(
collect_template,
{**iteration_scope, "tool": tool_response},
)
)
step_payload = {
"ok": True,
"tool_name": step_name,
"payload": {collect_key: collected_items},
"request": {"foreach_from": source_path, "count": len(iterable)},
"received_at": _utc_now_iso(),
"started_at": step_started_at,
"finished_at": _utc_now_iso(),
}
else:
resolved = _resolve_template(input_template, scope)
base_arguments = resolved if isinstance(resolved, dict) else {}
final_arguments = await self._plan_arguments(
step_name=step_name,
tool_name=tool_name,
base_arguments=base_arguments,
required_fields=required_fields,
scope=scope,
)
_validate_required_fields(final_arguments, required_fields, step_name)
tool_response = await call_mcp_tool(tool_name, final_arguments)
step_payload = {
"ok": bool(tool_response.get("ok", True)),
"tool_name": tool_name,
"payload": tool_response.get("payload", {}),
"request": final_arguments,
"response": tool_response,
"received_at": tool_response.get("received_at"),
"started_at": step_started_at,
"finished_at": _utc_now_iso(),
}
tool_calls.append(
{
"step_name": step_name,
"tool_name": tool_name,
"request": final_arguments,
"ok": True,
"response": tool_response,
}
)
run_state.setdefault("steps", {})[step_name] = step_payload
return StepOutput(
content=json.dumps(step_payload, ensure_ascii=False),
success=True,
)
except Exception as exc:
error_payload = {
"ok": False,
"tool_name": tool_name,
"request": {},
"error": str(exc),
"started_at": step_started_at,
"finished_at": _utc_now_iso(),
}
run_state.setdefault("steps", {})[step_name] = error_payload
run_state.setdefault("tool_calls", []).append(
{
"step_name": step_name,
"tool_name": tool_name,
"request": {},
"ok": False,
"error": str(exc),
}
)
return StepOutput(
content=json.dumps(error_payload, ensure_ascii=False),
success=False,
)
return _executor
def get_workflow(self, scenario_id: str, scenario: dict[str, Any]) -> Workflow:
cached = self._workflow_cache.get(scenario_id)
if cached is not None:
return cached
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list) or not raw_steps:
raise ScenarioStoreError("Scenario must contain non-empty steps list")
workflow_steps: list[Step] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
raise ScenarioStoreError("Each scenario step must be object")
if raw_step.get("type") != "tool":
raise ScenarioStoreError("This minimal runner supports only tool steps")
step_name = str(raw_step.get("name", "")).strip()
tool_name = str(raw_step.get("tool", step_name)).strip()
if not step_name or not tool_name:
raise ScenarioStoreError("Each tool step must contain non-empty name and tool")
executor = self._build_tool_step_executor(raw_step)
workflow_steps.append(
Step(
name=step_name,
description=str(raw_step.get("description", step_name)),
executor=executor,
)
)
workflow = Workflow(
name=scenario_id,
description=str(scenario.get("description", "")),
steps=workflow_steps,
)
self._workflow_cache[scenario_id] = workflow
return workflow
async def run(self, *, scenario_id: str, input_data: dict[str, Any]) -> dict[str, Any]:
scenario = load_scenario_definition(scenario_id)
workflow = self.get_workflow(scenario_id, scenario)
initial_state = {
"input": deepcopy(input_data),
"steps": {},
"tool_calls": [],
}
token = self._run_state_ctx.set(initial_state)
run_state = initial_state
run_output: Any = None
try:
run_output = await workflow.arun(input=input_data)
finally:
captured = self._run_state_ctx.get()
if isinstance(captured, dict):
run_state = deepcopy(captured)
self._run_state_ctx.reset(token)
content = run_output.content if hasattr(run_output, "content") else None
if isinstance(content, str):
try:
content = json.loads(content)
except json.JSONDecodeError:
content = {"raw_content": content}
return {
"scenario_id": scenario_id,
"workflow_name": workflow.name,
"status": "success"
if getattr(run_output, "success", True)
else "failed",
"input": input_data,
"final_result": content if isinstance(content, dict) else {"raw_content": content},
"steps": run_state.get("steps", {}),
"tool_calls": run_state.get("tool_calls", []),
"run_id": str(getattr(run_output, "run_id", "")) or None,
"session_id": str(getattr(run_output, "session_id", "")) or None,
}
_default_runner: McpWorkflowRunner | None = None
def get_mcp_workflow_runner() -> McpWorkflowRunner:
global _default_runner
if _default_runner is not None:
return _default_runner
_default_runner = McpWorkflowRunner()
return _default_runner
def _extract_output_summary(content: Any) -> str | None:
if not isinstance(content, dict):
return None
summary = content.get("summary")
if isinstance(summary, str) and summary:
return summary
payload = content.get("payload")
if isinstance(payload, dict):
payload_summary = payload.get("summary")
if isinstance(payload_summary, str) and payload_summary:
return payload_summary
return None
def _build_step_states_from_minimal(
*,
scenario: dict[str, Any],
minimal_steps: dict[str, Any],
) -> list[StepState]:
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list):
return []
step_states: list[StepState] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
continue
step_name = str(raw_step.get("name", "")).strip()
if not step_name:
continue
payload = minimal_steps.get(step_name)
if not isinstance(payload, dict):
step_states.append(StepState(node_id=step_name, status="queued"))
continue
ok = bool(payload.get("ok", False))
step_states.append(
StepState(
node_id=step_name,
status="success" if ok else "failed",
started_at=str(payload.get("started_at") or "") or None,
finished_at=str(payload.get("finished_at") or "") or None,
error=RunError(
code="tool_error",
message=str(payload.get("error", f"{step_name} failed")),
)
if not ok
else None,
)
)
return step_states
async def run_scenario_workflow(
input_data: dict[str, Any],
scenario_id: str = "news_source_discovery_v1",
) -> dict[str, Any]:
try:
scenario = load_scenario_definition(scenario_id)
except ScenarioStoreError as exc:
return ScenarioRunResponse(
scenario_id=scenario_id,
status="failed",
input=input_data,
steps=[],
error=RunError(code="unknown_scenario", message=str(exc)),
).model_dump()
runner = get_mcp_workflow_runner()
scenario_name = str(scenario.get("name", scenario_id))
try:
minimal_result = await runner.run(
scenario_id=scenario_id,
input_data=input_data,
)
except Exception as exc:
return ScenarioRunResponse(
scenario_id=scenario_id,
status="failed",
input=input_data,
scenario_name=scenario_name,
steps=[],
error=RunError(code="workflow_error", message=str(exc)),
).model_dump()
minimal_steps = minimal_result.get("steps", {})
steps = (
minimal_steps
if isinstance(minimal_steps, dict)
else {}
)
step_states = _build_step_states_from_minimal(
scenario=scenario,
minimal_steps=steps,
)
final_result = minimal_result.get("final_result")
normalized_result = (
final_result if isinstance(final_result, dict) else {"raw_content": str(final_result)}
)
status = "success"
for payload in steps.values():
if isinstance(payload, dict) and not bool(payload.get("ok", False)):
status = "failed"
break
return ScenarioRunResponse(
scenario_id=scenario_id,
status=status,
input=input_data,
steps=step_states,
output_summary=_extract_output_summary(normalized_result),
scenario_name=scenario_name,
workflow_name=str(minimal_result.get("workflow_name") or scenario_id),
result=normalized_result,
error=None
if status == "success"
else RunError(code="workflow_failed", message="Workflow finished with failed status."),
run_id=minimal_result.get("run_id"),
session_id=minimal_result.get("session_id"),
).model_dump()
-96
View File
@@ -1,96 +0,0 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _base_result(tool_name: str, ok: bool, payload: dict[str, Any]) -> dict[str, Any]:
return {
"tool_name": tool_name,
"ok": ok,
"payload": payload,
"received_at": _utc_now_iso(),
}
async def stub_search_news_sources(url: str) -> dict[str, Any]:
return _base_result(
tool_name="search_news_sources",
ok=True,
payload={
"input_url": url,
"items": [
{"url": "https://news-a.example/article-1"},
{"url": "https://news-b.example/article-2"},
{"url": "https://news-c.example/article-3"},
],
},
)
async def stub_parse_article(url: str) -> dict[str, Any]:
return _base_result(
tool_name="parse_article",
ok=True,
payload={
"url": url,
"title": "Stub article title",
"published_at": "2026-01-01T10:00:00+00:00",
"text": "Stub parsed article content.",
},
)
async def stub_extract_publication_date(article_text: str) -> dict[str, Any]:
return _base_result(
tool_name="extract_publication_date",
ok=True,
payload={
"text_size": len(article_text),
"published_at": "2026-01-01T10:00:00+00:00",
"confidence": 0.77,
},
)
async def stub_rank_sources_by_date(items: list[dict[str, Any]]) -> dict[str, Any]:
ranked = sorted(items, key=lambda item: str(item.get("published_at", "")))
return _base_result(
tool_name="rank_sources_by_date",
ok=True,
payload={
"input_count": len(items),
"ranked_items": ranked,
},
)
async def stub_generate_summary(items: list[dict[str, Any]]) -> dict[str, Any]:
first_url = ""
if items:
first_url = str(items[0].get("url", ""))
return _base_result(
tool_name="generate_summary",
ok=True,
payload={
"input_count": len(items),
"summary": (
"По заглушечным данным самым ранним источником считается "
+ first_url
),
},
)
STUB_TOOLS: dict[str, Any] = {
"search_news_sources": stub_search_news_sources,
"parse_article": stub_parse_article,
"extract_publication_date": stub_extract_publication_date,
"rank_sources_by_date": stub_rank_sources_by_date,
"generate_summary": stub_generate_summary,
}
-418
View File
@@ -1,418 +0,0 @@
from __future__ import annotations
from contextvars import ContextVar
from datetime import datetime, timezone
import json
from typing import Any
from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow
from pydantic import BaseModel, ValidationError, create_model
from src.schemas import RunError, RunStatus, ScenarioRunResponse, StepState
from src.scenario_store import ScenarioStoreError, load_scenario_definition
from src.stub_tools import (
stub_extract_publication_date,
stub_generate_summary,
stub_parse_article,
stub_rank_sources_by_date,
stub_search_news_sources,
)
_workflow_cache: dict[str, Workflow] = {}
_workflow_input_schemas: dict[str, type[BaseModel]] = {}
_run_steps_context: ContextVar[list[StepState] | None] = ContextVar(
"run_steps_context",
default=None,
)
def _json_loads(raw: str | None) -> dict[str, Any]:
if not raw:
return {}
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return {}
if isinstance(parsed, dict):
return parsed
return {}
def _as_json_step_output(payload: dict[str, Any]) -> StepOutput:
return StepOutput(content=json.dumps(payload, ensure_ascii=False))
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _initialize_step_states(scenario: dict[str, Any]) -> list[StepState]:
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list):
return []
step_states: list[StepState] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
continue
node_id = str(raw_step.get("name", "")).strip()
if not node_id:
continue
step_states.append(StepState(node_id=node_id, status="queued"))
return step_states
def _update_step_state(
node_id: str,
status: str,
error: RunError | None = None,
) -> None:
step_states = _run_steps_context.get()
if not step_states:
return
for step_state in step_states:
if step_state.node_id != node_id:
continue
step_state.status = status
if status == "running" and step_state.started_at is None:
step_state.started_at = _utc_now_iso()
if status in {"success", "failed", "waiting_human"}:
if step_state.started_at is None:
step_state.started_at = _utc_now_iso()
step_state.finished_at = _utc_now_iso()
step_state.error = error
return
def _mark_running_steps_failed(message: str) -> None:
step_states = _run_steps_context.get()
if not step_states:
return
for step_state in step_states:
if step_state.status == "running":
step_state.status = "failed"
if step_state.started_at is None:
step_state.started_at = _utc_now_iso()
step_state.finished_at = _utc_now_iso()
step_state.error = RunError(code="workflow_error", message=message)
def _extract_output_summary(content: Any) -> str | None:
if not isinstance(content, dict):
return None
summary = content.get("summary")
if isinstance(summary, str) and summary:
return summary
payload = content.get("payload")
if isinstance(payload, dict):
payload_summary = payload.get("summary")
if isinstance(payload_summary, str) and payload_summary:
return payload_summary
return None
def _build_run_response(
*,
scenario_id: str,
input_data: dict[str, Any],
status: RunStatus,
steps: list[StepState],
scenario_name: str | None = None,
workflow_name: str | None = None,
output_summary: str | None = None,
result: dict[str, Any] | None = None,
error: RunError | None = None,
run_id: str | None = None,
session_id: str | None = None,
) -> dict[str, Any]:
return ScenarioRunResponse(
scenario_id=scenario_id,
status=status,
input=input_data,
steps=steps,
output_summary=output_summary,
scenario_name=scenario_name,
workflow_name=workflow_name,
result=result,
error=error,
run_id=run_id,
session_id=session_id,
).model_dump()
def _extract_input_url(step_input_value: Any) -> str:
if isinstance(step_input_value, dict):
return str(step_input_value.get("url", "")).strip()
return str(step_input_value).strip()
def _build_input_schema_model(scenario: dict[str, Any]) -> type[BaseModel] | None:
input_schema = scenario.get("input_schema")
if not isinstance(input_schema, dict):
return None
properties = input_schema.get("properties")
if not isinstance(properties, dict):
return None
required_raw = input_schema.get("required", [])
required_fields = set(required_raw) if isinstance(required_raw, list) else set()
fields: dict[str, tuple[type[str], Any]] = {}
for field_name, field_schema in properties.items():
if not isinstance(field_name, str) or not isinstance(field_schema, dict):
continue
if field_schema.get("type") != "string":
continue
default_value = ... if field_name in required_fields else ""
fields[field_name] = (str, default_value)
if not fields:
return None
return create_model(f"{scenario.get('scenario_id', 'Scenario')}Input", **fields)
async def _search_news_sources_executor(step_input: StepInput) -> StepOutput:
_update_step_state("search_news_sources", "running")
input_url = _extract_input_url(step_input.input)
if not input_url:
_update_step_state(
"search_news_sources",
"failed",
error=RunError(code="invalid_input", message="input.url is empty"),
)
return StepOutput(content="search_news_sources failed: input.url is empty", success=False)
search_result = await stub_search_news_sources(url=input_url)
_update_step_state("search_news_sources", "success")
return _as_json_step_output(search_result)
async def _parse_article_executor(step_input: StepInput) -> StepOutput:
_update_step_state("parse_articles_batch", "running")
previous_payload = _json_loads(step_input.previous_step_content)
items = previous_payload.get("payload", {}).get("items", [])
parsed_items: list[dict[str, Any]] = []
for item in items:
source_url = str(item.get("url", ""))
parsed_result = await stub_parse_article(url=source_url)
if not parsed_result.get("ok", False):
_update_step_state(
"parse_articles_batch",
"failed",
error=RunError(code="tool_error", message="parse_article failed"),
)
return StepOutput(content="parse_article failed", success=False)
parsed_items.append(parsed_result.get("payload", {}))
_update_step_state("parse_articles_batch", "success")
return _as_json_step_output(
{
"tool_name": "parse_articles_batch",
"ok": True,
"payload": {"items": parsed_items},
}
)
async def _extract_publication_date_executor(step_input: StepInput) -> StepOutput:
_update_step_state("extract_publication_date_batch", "running")
previous_payload = _json_loads(step_input.previous_step_content)
parsed_items = previous_payload.get("payload", {}).get("items", [])
dated_items: list[dict[str, Any]] = []
for item in parsed_items:
article_text = str(item.get("text", ""))
extract_result = await stub_extract_publication_date(article_text=article_text)
if not extract_result.get("ok", False):
_update_step_state(
"extract_publication_date_batch",
"failed",
error=RunError(code="tool_error", message="extract_publication_date failed"),
)
return StepOutput(content="extract_publication_date failed", success=False)
dated_items.append(
{
"url": str(item.get("url", "")),
"title": str(item.get("title", "")),
"published_at": str(
extract_result.get("payload", {}).get("published_at", "")
),
}
)
_update_step_state("extract_publication_date_batch", "success")
return _as_json_step_output(
{
"tool_name": "extract_publication_date_batch",
"ok": True,
"payload": {"items": dated_items},
}
)
async def _rank_sources_by_date_executor(step_input: StepInput) -> StepOutput:
_update_step_state("rank_sources_by_date", "running")
previous_payload = _json_loads(step_input.previous_step_content)
items = previous_payload.get("payload", {}).get("items", [])
rank_result = await stub_rank_sources_by_date(items=items)
_update_step_state("rank_sources_by_date", "success")
return _as_json_step_output(rank_result)
async def _generate_summary_executor(step_input: StepInput) -> StepOutput:
_update_step_state("generate_summary", "running")
previous_payload = _json_loads(step_input.previous_step_content)
ranked_items = previous_payload.get("payload", {}).get("ranked_items", [])
summary_result = await stub_generate_summary(items=ranked_items)
_update_step_state("generate_summary", "success")
return _as_json_step_output(summary_result)
_step_executors = {
"search_news_sources": _search_news_sources_executor,
"parse_articles_batch": _parse_article_executor,
"extract_publication_date_batch": _extract_publication_date_executor,
"rank_sources_by_date": _rank_sources_by_date_executor,
"generate_summary": _generate_summary_executor,
}
def get_workflow_for_scenario(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
cached_workflow = _workflow_cache.get(scenario_id)
if cached_workflow is not None:
return cached_workflow
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list) or not raw_steps:
raise ScenarioStoreError("Scenario must contain non-empty steps list")
workflow_steps: list[Step] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
raise ScenarioStoreError("Each scenario step must be object")
step_name = str(raw_step.get("name", "")).strip()
if not step_name:
raise ScenarioStoreError("Each scenario step must have non-empty name")
step_executor = _step_executors.get(step_name)
if step_executor is None:
raise ScenarioStoreError(f"Unknown step executor: {step_name}")
workflow_steps.append(
Step(
name=step_name,
description=str(raw_step.get("description", step_name)),
executor=step_executor,
)
)
input_schema_model = _build_input_schema_model(scenario)
workflow = Workflow(
name=scenario_id,
description=str(scenario.get("description", "")),
steps=workflow_steps,
input_schema=input_schema_model,
)
if input_schema_model is not None:
_workflow_input_schemas[scenario_id] = input_schema_model
_workflow_cache[scenario_id] = workflow
return workflow
async def run_scenario_workflow(
input_data: dict[str, Any],
scenario_id: str = "news_source_discovery_v1",
) -> dict[str, Any]:
try:
scenario = load_scenario_definition(scenario_id)
except ScenarioStoreError as exc:
return _build_run_response(
scenario_id=scenario_id,
input_data=input_data,
status="failed",
steps=[],
error=RunError(code="unknown_scenario", message=str(exc)),
)
step_states = _initialize_step_states(scenario)
scenario_name = str(scenario.get("name", scenario_id))
workflow = get_workflow_for_scenario(scenario_id=scenario_id, scenario=scenario)
input_schema_model = _workflow_input_schemas.get(scenario_id)
if input_schema_model is not None:
try:
input_schema_model.model_validate(input_data)
except ValidationError as exc:
return _build_run_response(
scenario_id=scenario_id,
input_data=input_data,
status="failed",
scenario_name=scenario_name,
steps=step_states,
error=RunError(
code="invalid_input",
message=f"Input does not match scenario input_schema: {exc}",
),
)
context_token = _run_steps_context.set(step_states)
try:
run_output = await workflow.arun(input=input_data)
except Exception as exc:
_mark_running_steps_failed(str(exc))
return _build_run_response(
scenario_id=scenario_id,
input_data=input_data,
status="failed",
scenario_name=scenario_name,
steps=step_states,
error=RunError(code="workflow_error", message=str(exc)),
)
finally:
_run_steps_context.reset(context_token)
content: Any = run_output.content if hasattr(run_output, "content") else {}
if isinstance(content, str):
try:
content = json.loads(content)
except json.JSONDecodeError:
content = {"raw_content": content}
output_summary = _extract_output_summary(content)
normalized_result = content if isinstance(content, dict) else {"raw_content": str(content)}
if hasattr(run_output, "success") and not bool(getattr(run_output, "success")):
return _build_run_response(
scenario_id=scenario_id,
input_data=input_data,
status="failed",
scenario_name=scenario_name,
steps=step_states,
output_summary=output_summary,
result=normalized_result,
error=RunError(
code="workflow_failed",
message="Workflow finished with failed status.",
),
)
run_id: str | None = None
session_id: str | None = None
if hasattr(run_output, "run_id"):
run_id = str(getattr(run_output, "run_id"))
if hasattr(run_output, "session_id"):
session_id = str(getattr(run_output, "session_id"))
return _build_run_response(
scenario_id=scenario_id,
input_data=input_data,
status="success",
workflow_name=workflow.name,
scenario_name=scenario_name,
steps=step_states,
output_summary=output_summary,
result=normalized_result,
run_id=run_id,
session_id=session_id,
)