diff --git a/.env.example b/.env.example index 48e2b5b..ae3c861 100644 --- a/.env.example +++ b/.env.example @@ -4,9 +4,15 @@ OLLAMA_HOST=http://localhost:11435 OLLAMA_TEMPERATURE=0 AGENT_MARKDOWN=false AGENT_DEBUG_MODE=true -AGENT_INSTRUCTIONS=You are a helpful assistant. Answer briefly and clearly. +AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly." AGENT_OS_HOST=127.0.0.1 AGENT_OS_PORT=7777 +POLZA_BASE_URL=https://api.polza.ai/v1 +POLZA_MODEL_ID=google/gemma-4-31b-it +POLZA_API_KEY=key +POLZA_TEMPERATURE=0 +MCP_BASE_URL=http://127.0.0.1:8081/mcp +MCP_TIMEOUT_SECONDS=10 PHOENIX_TRACING_ENABLED=false PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006 PHOENIX_PROJECT_NAME=prisma-platform diff --git a/README.md b/README.md index 1cfd61e..5175d53 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,19 @@ # Prisma Platform MVP -Минимальный чат-агент на Agno + Ollama с рантаймом AgentOS. +MVP-реализация сценарного раннера на Agno AgentOS. -В этом проекте AgentOS работает как HTTP API сервер (FastAPI + Uvicorn). +Текущая схема исполнения: + +- сценарий хранится в `scenarios/*.json`; +- исполнение идет через `src/mcp_workflow_runner.py`; +- каждый шаг вызывает MCP инструмент через `src/mcp_client.py`; +- для подготовки аргументов шага используется planner-агент с моделью через `polza.ai`. ## Требования - Python 3.10+ -- Запущенный Ollama endpoint (по умолчанию: `http://localhost:11435`) -- Доступная модель в Ollama (по умолчанию: `gemma4:31b`) +- MCP endpoint (по умолчанию `http://127.0.0.1:8081/mcp`) +- доступ к модели через `polza.ai` (`POLZA_API_KEY`) ## Текущая структура @@ -26,11 +31,11 @@ prisma_platform/ ├── api_routes.py ├── agent_os.py ├── agent_runner.py + ├── mcp_client.py + ├── mcp_workflow_runner.py ├── observability.py ├── scenario_store.py - ├── schemas.py - ├── stub_tools.py - └── workflow_runner.py + └── schemas.py ``` ## Установка @@ -44,25 +49,32 @@ cp .env.example .env ## Запуск -Запуск сервера AgentOS: +1) Поднимите MCP stub (из соседнего репозитория): ```bash -python -m src.agent_os +cd /home/worker/projects/docker-service/mcp-stub +docker compose up --build -d ``` -По умолчанию AgentOS работает на `http://127.0.0.1:7777`. +2) Запустите сервер AgentOS: -Документация API доступна по адресам: +```bash +cd /home/worker/projects/prisma_platform +.venv/bin/python -m src.agent_os +``` + +По умолчанию приложение доступно на `http://127.0.0.1:7777`. + +Документация API: - `http://127.0.0.1:7777/docs` - `http://127.0.0.1:7777/redoc` -Верхний слой сервиса реализован как кастомные FastAPI роуты (`src/api_routes.py`), подключенные через `AgentOS(base_app=...)`. - -### Запуск сценария через HTTP +## Запуск сценария через HTTP - `POST http://127.0.0.1:7777/api/runs` -- Тело запроса (JSON): + +Тело запроса: ```json { @@ -73,10 +85,10 @@ python -m src.agent_os } ``` -Пример запроса: +Пример: ```bash -curl -X POST "http://127.0.0.1:7777/api/runs" \ +curl -s -X POST "http://127.0.0.1:7777/api/runs" \ -H "Content-Type: application/json" \ -d '{ "scenario_id": "news_source_discovery_v1", @@ -86,106 +98,45 @@ curl -X POST "http://127.0.0.1:7777/api/runs" \ }' ``` -Endpoint возвращает единый JSON-контракт. Поля одинаковые для `success` и `failed`, -а в неактуальных полях приходит `null`. +Успешный ответ содержит: -Пример успешного ответа: - -```json -{ - "scenario_id": "news_source_discovery_v1", - "status": "success", - "input": { - "url": "https://example.com/news" - }, - "steps": [ - { - "node_id": "search_news_sources", - "status": "success", - "started_at": "2026-04-22T10:00:00+00:00", - "finished_at": "2026-04-22T10:00:00+00:00", - "error": null - } - ], - "output_summary": "По заглушечным данным самым ранним источником считается https://news-a.example/article-1", - "workflow_name": "news_source_discovery_v1", - "scenario_name": "News Source Discovery V1", - "result": { - "tool_name": "generate_summary", - "ok": true, - "payload": { - "input_count": 3, - "summary": "По заглушечным данным самым ранним источником считается https://news-a.example/article-1" - }, - "received_at": "2026-04-22T10:00:00+00:00" - }, - "error": null, - "run_id": "run_xxx", - "session_id": "session_xxx" -} -``` - -Пример ответа с ошибкой валидации: - -```json -{ - "scenario_id": "news_source_discovery_v1", - "status": "failed", - "input": {}, - "steps": [ - { - "node_id": "search_news_sources", - "status": "queued", - "started_at": null, - "finished_at": null, - "error": null - } - ], - "output_summary": null, - "workflow_name": null, - "scenario_name": "News Source Discovery V1", - "result": null, - "error": { - "code": "invalid_input", - "message": "Input does not match scenario input_schema: ..." - }, - "run_id": null, - "session_id": null -} -``` - -Проверка, что сервер поднят: - -```bash -curl -s "http://127.0.0.1:7777/docs" | grep -n "Swagger UI" -``` +- `status=success` +- список `steps` со статусами шагов +- `output_summary` +- `result` итогового шага ## Переменные окружения -Основные переменные: +Основные: -- `AGENT_ID` (по умолчанию: `prisma-agent`) -- `OLLAMA_MODEL_ID` (по умолчанию: `gemma4:31b`) -- `OLLAMA_HOST` (по умолчанию: `http://localhost:11435`) -- `OLLAMA_TEMPERATURE` (по умолчанию: `0`) -- `AGENT_MARKDOWN` (по умолчанию: `false`) -- `AGENT_DEBUG_MODE` (по умолчанию: `true`) -- `AGENT_INSTRUCTIONS` (по умолчанию: `You are a helpful assistant. Answer briefly and clearly.`) -- `AGENT_OS_HOST` (по умолчанию: `127.0.0.1`) -- `AGENT_OS_PORT` (по умолчанию: `7777`) -- `PHOENIX_TRACING_ENABLED` (по умолчанию: `false`) -- `PHOENIX_COLLECTOR_ENDPOINT` (по умолчанию: `http://localhost:6006`) -- `PHOENIX_PROJECT_NAME` (по умолчанию: `prisma-platform`) +- `AGENT_ID` (default: `prisma-agent`) +- `AGENT_MARKDOWN` (default: `false`) +- `AGENT_DEBUG_MODE` (default: `true`) +- `AGENT_INSTRUCTIONS` +- `AGENT_OS_HOST` (default: `127.0.0.1`) +- `AGENT_OS_PORT` (default: `7777`) + +Planner-модель (`polza.ai`): + +- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`) +- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`) +- `POLZA_API_KEY` (required) +- `POLZA_TEMPERATURE` (default: `0`) + +MCP: + +- `MCP_BASE_URL` (default: `http://127.0.0.1:8081/mcp`) +- `MCP_TIMEOUT_SECONDS` (default: `10`) + +Phoenix tracing: + +- `PHOENIX_TRACING_ENABLED` (default: `false`) +- `PHOENIX_COLLECTOR_ENDPOINT` (default: `http://localhost:6006`) +- `PHOENIX_PROJECT_NAME` (default: `prisma-platform`) ## Phoenix трассировка (локально) -1. Установите зависимости: - -```bash -pip install -r requirements.txt -``` - -2. Поднимите Phoenix (см. `docker-service/docker-compose.yml`) и включите трассировку в `.env`: +1) Включите трассировку в `.env`: ```dotenv PHOENIX_TRACING_ENABLED=true @@ -193,5 +144,9 @@ PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006 PHOENIX_PROJECT_NAME=prisma-platform ``` -3. Запустите приложение как обычно (`python -m src.agent_os`). +2) Запустите приложение: + +```bash +.venv/bin/python -m src.agent_os +``` diff --git a/scenarios/news_source_discovery/v1.json b/scenarios/news_source_discovery/v1.json index deb1f91..a19165d 100644 --- a/scenarios/news_source_discovery/v1.json +++ b/scenarios/news_source_discovery/v1.json @@ -2,7 +2,7 @@ "schema_version": "1", "scenario_id": "news_source_discovery_v1", "name": "News Source Discovery V1", - "description": "Find earliest news source using sequential stub tools.", + "description": "Find earliest news source using sequential MCP tools.", "input_schema": { "type": "object", "required": [ @@ -18,23 +18,88 @@ "steps": [ { "name": "search_news_sources", - "type": "tool" + "type": "tool", + "tool": "search_news_sources", + "input": { + "url": { + "from": "input.url" + } + }, + "required_input_fields": [ + "url" + ] }, { "name": "parse_articles_batch", - "type": "tool" + "type": "tool", + "tool": "parse_article", + "foreach": { + "from": "steps.search_news_sources.payload.items", + "as": "item" + }, + "input": { + "url": { + "from": "item.url" + } + }, + "collect": { + "url": { + "from": "tool.payload.url" + }, + "title": { + "from": "tool.payload.title" + }, + "text": { + "from": "tool.payload.text" + } + }, + "collect_key": "items" }, { "name": "extract_publication_date_batch", - "type": "tool" + "type": "tool", + "tool": "extract_publication_date", + "foreach": { + "from": "steps.parse_articles_batch.payload.items", + "as": "item" + }, + "input": { + "article_text": { + "from": "item.text" + } + }, + "collect": { + "url": { + "from": "item.url" + }, + "title": { + "from": "item.title" + }, + "published_at": { + "from": "tool.payload.published_at" + } + }, + "collect_key": "items" }, { "name": "rank_sources_by_date", - "type": "tool" + "type": "tool", + "tool": "rank_sources_by_date", + "input": { + "items": { + "from": "steps.extract_publication_date_batch.payload.items" + } + } }, { "name": "generate_summary", - "type": "tool" + "type": "tool", + "tool": "generate_summary", + "input": { + "items": { + "from": "steps.rank_sources_by_date.payload.ranked_items" + } + } } ] } diff --git a/src/agent_os.py b/src/agent_os.py index 7e5f648..994fbee 100644 --- a/src/agent_os.py +++ b/src/agent_os.py @@ -8,16 +8,11 @@ from agno.os import AgentOS from src.api_routes import router as api_router from src.agent_runner import get_agent from src.observability import init_phoenix_tracing -from src.scenario_store import load_scenario_definition -from src.workflow_runner import get_workflow_for_scenario load_dotenv() _tracing_enabled = init_phoenix_tracing() _agent = get_agent() -_default_scenario_id = "news_source_discovery_v1" -_scenario = load_scenario_definition(_default_scenario_id) -_workflow = get_workflow_for_scenario(_default_scenario_id, _scenario) _base_app = FastAPI( title="Prisma Platform API", version="0.1.0", @@ -25,7 +20,6 @@ _base_app = FastAPI( _base_app.include_router(api_router) _agent_os = AgentOS( agents=[_agent], - workflows=[_workflow], tracing=_tracing_enabled, base_app=_base_app, ) diff --git a/src/api_routes.py b/src/api_routes.py index c03cd5e..5ba5f9e 100644 --- a/src/api_routes.py +++ b/src/api_routes.py @@ -1,7 +1,7 @@ from fastapi import APIRouter +from src.mcp_workflow_runner import run_scenario_workflow from src.schemas import ScenarioRunRequest, ScenarioRunResponse -from src.workflow_runner import run_scenario_workflow router = APIRouter(prefix="/api", tags=["workflow"]) diff --git a/src/mcp_client.py b/src/mcp_client.py new file mode 100644 index 0000000..89c61d7 --- /dev/null +++ b/src/mcp_client.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from datetime import timedelta +import json +import os +from typing import Any + +from mcp import ClientSession +from mcp.client.streamable_http import streamablehttp_client +from mcp.types import TextContent + + +def _mcp_url() -> str: + return os.getenv("MCP_BASE_URL", "http://127.0.0.1:8081/mcp") + + +def _timeout_seconds() -> float: + value = os.getenv("MCP_TIMEOUT_SECONDS") + if value is None: + return 10.0 + return float(value) + + +async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str, Any]: + try: + async with streamablehttp_client(url=_mcp_url()) as session_params: + read, write = session_params[0:2] + async with ClientSession( + read, + write, + read_timeout_seconds=timedelta(seconds=_timeout_seconds()), + ) as session: + await session.initialize() + result = await session.call_tool(tool_name, arguments) + except TimeoutError as exc: + raise RuntimeError(f"MCP timeout: {tool_name}") from exc + except Exception as exc: + raise RuntimeError(f"MCP transport error: {tool_name}") from exc + + if result.isError: + raise RuntimeError(f"MCP tool error: {tool_name}") + + if isinstance(result.structuredContent, dict): + return result.structuredContent + + for content_item in result.content: + if not isinstance(content_item, TextContent): + continue + try: + parsed = json.loads(content_item.text) + except json.JSONDecodeError: + continue + if isinstance(parsed, dict): + return parsed + + raise RuntimeError(f"MCP tool returned invalid payload: {tool_name}") diff --git a/src/mcp_workflow_runner.py b/src/mcp_workflow_runner.py new file mode 100644 index 0000000..9730347 --- /dev/null +++ b/src/mcp_workflow_runner.py @@ -0,0 +1,546 @@ +from __future__ import annotations + +from contextvars import ContextVar +from copy import deepcopy +from datetime import datetime, timezone +import json +import os +from typing import Any + +from agno.agent import Agent +from agno.models.openai import OpenAIChat +from agno.workflow.step import Step, StepInput, StepOutput +from agno.workflow.workflow import Workflow +from pydantic import BaseModel, Field + +from src.mcp_client import call_mcp_tool +from src.schemas import RunError, ScenarioRunResponse, StepState +from src.scenario_store import ScenarioStoreError, load_scenario_definition + + +class McpArgumentsPlan(BaseModel): + """Structured planner output for one MCP tool call.""" + + arguments: dict[str, Any] = Field(default_factory=dict) + + +_planner_agent: Agent | None = None + + +def _env_float(name: str, default: float) -> float: + value = os.getenv(name) + if value is None: + return default + return float(value) + + +def _utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def get_shared_step_planner_agent() -> Agent: + """ + Create one reusable planner agent for all workflow steps. + + This agent never calls MCP directly. It only prepares arguments + for a fixed MCP method selected by the workflow step. + """ + global _planner_agent + if _planner_agent is not None: + return _planner_agent + + model_id = os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it") + polza_base_url = os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1") + polza_api_key = os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY") + temperature = _env_float("POLZA_TEMPERATURE", 0.0) + + llm = OpenAIChat( + id=model_id, + api_key=polza_api_key, + base_url=polza_base_url, + temperature=temperature, + ) + _planner_agent = Agent( + id="workflow-step-planner", + model=llm, + output_schema=McpArgumentsPlan, + markdown=False, + debug_mode=False, + instructions=[ + "You are a strict tool-input planner.", + "You receive step metadata and current workflow context.", + "Return only arguments that should be sent to MCP tool.", + "Do not add extra keys that are unrelated to the tool.", + "Do not invent values if they are absent in context.", + ], + ) + return _planner_agent + + +def _resolve_path(scope: dict[str, Any], path: str) -> Any: + value: Any = scope + for segment in path.split("."): + key = segment.strip() + if not key: + continue + if not isinstance(value, dict): + return None + value = value.get(key) + return deepcopy(value) + + +def _resolve_template(template: Any, scope: dict[str, Any]) -> Any: + if isinstance(template, dict): + if set(template.keys()) == {"from"}: + return _resolve_path(scope, str(template["from"])) + return {key: _resolve_template(value, scope) for key, value in template.items()} + if isinstance(template, list): + return [_resolve_template(item, scope) for item in template] + return deepcopy(template) + + +def _validate_required_fields( + arguments: dict[str, Any], + required_fields: list[str], + step_name: str, +) -> None: + for field in required_fields: + value = arguments.get(field) + if isinstance(value, str) and value.strip(): + continue + if value not in (None, "", [], {}): + continue + raise ValueError(f"{step_name}: input.{field} is empty") + + +class McpWorkflowRunner: + """ + Minimal workflow runner: + - fixed step order from scenario + - same planner agent in every step + - MCP call executed by code, not by the agent + - request/response persisted in run context + """ + + def __init__(self, planner_agent: Agent | None = None) -> None: + self._planner_agent = planner_agent or get_shared_step_planner_agent() + self._workflow_cache: dict[str, Workflow] = {} + self._run_state_ctx: ContextVar[dict[str, Any] | None] = ContextVar( + "mcp_workflow_run_state", + default=None, + ) + + def _get_run_state(self) -> dict[str, Any]: + run_state = self._run_state_ctx.get() + if run_state is None: + raise RuntimeError("run state is not initialized") + return run_state + + def _build_scope(self) -> dict[str, Any]: + run_state = self._get_run_state() + return { + "input": run_state.get("input", {}), + "steps": run_state.get("steps", {}), + } + + async def _plan_arguments( + self, + *, + step_name: str, + tool_name: str, + base_arguments: dict[str, Any], + required_fields: list[str], + scope: dict[str, Any], + ) -> dict[str, Any]: + prompt = { + "task": "Prepare MCP arguments for this step.", + "step_name": step_name, + "tool_name": tool_name, + "required_fields": required_fields, + "base_arguments": base_arguments, + "context": { + "input": scope.get("input", {}), + "steps": scope.get("steps", {}), + }, + "output": "Return arguments object only.", + } + run_output = await self._planner_agent.arun(json.dumps(prompt, ensure_ascii=False)) + content = run_output.content if hasattr(run_output, "content") else {} + + if isinstance(content, McpArgumentsPlan): + planned = content.arguments + elif isinstance(content, dict): + planned = content.get("arguments", {}) + else: + planned = {} + + if not isinstance(planned, dict): + planned = {} + + # Allow planner to override/fill base arguments while keeping known defaults. + merged = deepcopy(base_arguments) + merged.update(planned) + return merged + + def _build_tool_step_executor(self, step_spec: dict[str, Any]): + step_name = str(step_spec["name"]) + tool_name = str(step_spec["tool"]) + input_template = step_spec.get("input", {}) + foreach_spec = step_spec.get("foreach") + collect_template = step_spec.get("collect") + collect_key = str(step_spec.get("collect_key", "items")).strip() or "items" + required_fields_raw = step_spec.get("required_input_fields", []) + required_fields = ( + [field for field in required_fields_raw if isinstance(field, str)] + if isinstance(required_fields_raw, list) + else [] + ) + if isinstance(foreach_spec, dict): + source_path = str(foreach_spec.get("from", "")).strip() + item_alias = str(foreach_spec.get("as", "item")).strip() or "item" + else: + source_path = str(foreach_spec).strip() if isinstance(foreach_spec, str) else "" + item_alias = "item" + + async def _executor(_step_input: StepInput) -> StepOutput: + run_state = self._get_run_state() + scope = self._build_scope() + step_started_at = _utc_now_iso() + + try: + tool_calls = run_state.setdefault("tool_calls", []) + if not isinstance(tool_calls, list): + tool_calls = [] + run_state["tool_calls"] = tool_calls + + if source_path: + iterable = _resolve_path(scope, source_path) + if not isinstance(iterable, list): + raise ValueError(f"{step_name}: foreach source is not list") + + collected_items: list[Any] = [] + for index, item in enumerate(iterable): + iteration_scope = dict(scope) + iteration_scope[item_alias] = item + iteration_scope["item"] = item + iteration_scope["index"] = index + + resolved = _resolve_template(input_template, iteration_scope) + base_arguments = resolved if isinstance(resolved, dict) else {} + + final_arguments = await self._plan_arguments( + step_name=step_name, + tool_name=tool_name, + base_arguments=base_arguments, + required_fields=required_fields, + scope=iteration_scope, + ) + _validate_required_fields(final_arguments, required_fields, step_name) + + tool_response = await call_mcp_tool(tool_name, final_arguments) + tool_calls.append( + { + "step_name": step_name, + "tool_name": tool_name, + "attempt": index + 1, + "request": final_arguments, + "ok": True, + "response": tool_response, + } + ) + + if collect_template is None: + collected_items.append(tool_response.get("payload", {})) + else: + collected_items.append( + _resolve_template( + collect_template, + {**iteration_scope, "tool": tool_response}, + ) + ) + + step_payload = { + "ok": True, + "tool_name": step_name, + "payload": {collect_key: collected_items}, + "request": {"foreach_from": source_path, "count": len(iterable)}, + "received_at": _utc_now_iso(), + "started_at": step_started_at, + "finished_at": _utc_now_iso(), + } + else: + resolved = _resolve_template(input_template, scope) + base_arguments = resolved if isinstance(resolved, dict) else {} + + final_arguments = await self._plan_arguments( + step_name=step_name, + tool_name=tool_name, + base_arguments=base_arguments, + required_fields=required_fields, + scope=scope, + ) + _validate_required_fields(final_arguments, required_fields, step_name) + + tool_response = await call_mcp_tool(tool_name, final_arguments) + step_payload = { + "ok": bool(tool_response.get("ok", True)), + "tool_name": tool_name, + "payload": tool_response.get("payload", {}), + "request": final_arguments, + "response": tool_response, + "received_at": tool_response.get("received_at"), + "started_at": step_started_at, + "finished_at": _utc_now_iso(), + } + tool_calls.append( + { + "step_name": step_name, + "tool_name": tool_name, + "request": final_arguments, + "ok": True, + "response": tool_response, + } + ) + + run_state.setdefault("steps", {})[step_name] = step_payload + return StepOutput( + content=json.dumps(step_payload, ensure_ascii=False), + success=True, + ) + except Exception as exc: + error_payload = { + "ok": False, + "tool_name": tool_name, + "request": {}, + "error": str(exc), + "started_at": step_started_at, + "finished_at": _utc_now_iso(), + } + run_state.setdefault("steps", {})[step_name] = error_payload + run_state.setdefault("tool_calls", []).append( + { + "step_name": step_name, + "tool_name": tool_name, + "request": {}, + "ok": False, + "error": str(exc), + } + ) + return StepOutput( + content=json.dumps(error_payload, ensure_ascii=False), + success=False, + ) + + return _executor + + def get_workflow(self, scenario_id: str, scenario: dict[str, Any]) -> Workflow: + cached = self._workflow_cache.get(scenario_id) + if cached is not None: + return cached + + raw_steps = scenario.get("steps") + if not isinstance(raw_steps, list) or not raw_steps: + raise ScenarioStoreError("Scenario must contain non-empty steps list") + + workflow_steps: list[Step] = [] + for raw_step in raw_steps: + if not isinstance(raw_step, dict): + raise ScenarioStoreError("Each scenario step must be object") + if raw_step.get("type") != "tool": + raise ScenarioStoreError("This minimal runner supports only tool steps") + + step_name = str(raw_step.get("name", "")).strip() + tool_name = str(raw_step.get("tool", step_name)).strip() + if not step_name or not tool_name: + raise ScenarioStoreError("Each tool step must contain non-empty name and tool") + + executor = self._build_tool_step_executor(raw_step) + workflow_steps.append( + Step( + name=step_name, + description=str(raw_step.get("description", step_name)), + executor=executor, + ) + ) + + workflow = Workflow( + name=scenario_id, + description=str(scenario.get("description", "")), + steps=workflow_steps, + ) + self._workflow_cache[scenario_id] = workflow + return workflow + + async def run(self, *, scenario_id: str, input_data: dict[str, Any]) -> dict[str, Any]: + scenario = load_scenario_definition(scenario_id) + workflow = self.get_workflow(scenario_id, scenario) + + initial_state = { + "input": deepcopy(input_data), + "steps": {}, + "tool_calls": [], + } + token = self._run_state_ctx.set(initial_state) + run_state = initial_state + run_output: Any = None + try: + run_output = await workflow.arun(input=input_data) + finally: + captured = self._run_state_ctx.get() + if isinstance(captured, dict): + run_state = deepcopy(captured) + self._run_state_ctx.reset(token) + + content = run_output.content if hasattr(run_output, "content") else None + if isinstance(content, str): + try: + content = json.loads(content) + except json.JSONDecodeError: + content = {"raw_content": content} + + return { + "scenario_id": scenario_id, + "workflow_name": workflow.name, + "status": "success" + if getattr(run_output, "success", True) + else "failed", + "input": input_data, + "final_result": content if isinstance(content, dict) else {"raw_content": content}, + "steps": run_state.get("steps", {}), + "tool_calls": run_state.get("tool_calls", []), + "run_id": str(getattr(run_output, "run_id", "")) or None, + "session_id": str(getattr(run_output, "session_id", "")) or None, + } + + +_default_runner: McpWorkflowRunner | None = None + + +def get_mcp_workflow_runner() -> McpWorkflowRunner: + global _default_runner + if _default_runner is not None: + return _default_runner + _default_runner = McpWorkflowRunner() + return _default_runner + + +def _extract_output_summary(content: Any) -> str | None: + if not isinstance(content, dict): + return None + summary = content.get("summary") + if isinstance(summary, str) and summary: + return summary + payload = content.get("payload") + if isinstance(payload, dict): + payload_summary = payload.get("summary") + if isinstance(payload_summary, str) and payload_summary: + return payload_summary + return None + + +def _build_step_states_from_minimal( + *, + scenario: dict[str, Any], + minimal_steps: dict[str, Any], +) -> list[StepState]: + raw_steps = scenario.get("steps") + if not isinstance(raw_steps, list): + return [] + + step_states: list[StepState] = [] + for raw_step in raw_steps: + if not isinstance(raw_step, dict): + continue + step_name = str(raw_step.get("name", "")).strip() + if not step_name: + continue + payload = minimal_steps.get(step_name) + if not isinstance(payload, dict): + step_states.append(StepState(node_id=step_name, status="queued")) + continue + ok = bool(payload.get("ok", False)) + step_states.append( + StepState( + node_id=step_name, + status="success" if ok else "failed", + started_at=str(payload.get("started_at") or "") or None, + finished_at=str(payload.get("finished_at") or "") or None, + error=RunError( + code="tool_error", + message=str(payload.get("error", f"{step_name} failed")), + ) + if not ok + else None, + ) + ) + return step_states + + +async def run_scenario_workflow( + input_data: dict[str, Any], + scenario_id: str = "news_source_discovery_v1", +) -> dict[str, Any]: + try: + scenario = load_scenario_definition(scenario_id) + except ScenarioStoreError as exc: + return ScenarioRunResponse( + scenario_id=scenario_id, + status="failed", + input=input_data, + steps=[], + error=RunError(code="unknown_scenario", message=str(exc)), + ).model_dump() + + runner = get_mcp_workflow_runner() + scenario_name = str(scenario.get("name", scenario_id)) + try: + minimal_result = await runner.run( + scenario_id=scenario_id, + input_data=input_data, + ) + except Exception as exc: + return ScenarioRunResponse( + scenario_id=scenario_id, + status="failed", + input=input_data, + scenario_name=scenario_name, + steps=[], + error=RunError(code="workflow_error", message=str(exc)), + ).model_dump() + + minimal_steps = minimal_result.get("steps", {}) + steps = ( + minimal_steps + if isinstance(minimal_steps, dict) + else {} + ) + step_states = _build_step_states_from_minimal( + scenario=scenario, + minimal_steps=steps, + ) + + final_result = minimal_result.get("final_result") + normalized_result = ( + final_result if isinstance(final_result, dict) else {"raw_content": str(final_result)} + ) + status = "success" + for payload in steps.values(): + if isinstance(payload, dict) and not bool(payload.get("ok", False)): + status = "failed" + break + + return ScenarioRunResponse( + scenario_id=scenario_id, + status=status, + input=input_data, + steps=step_states, + output_summary=_extract_output_summary(normalized_result), + scenario_name=scenario_name, + workflow_name=str(minimal_result.get("workflow_name") or scenario_id), + result=normalized_result, + error=None + if status == "success" + else RunError(code="workflow_failed", message="Workflow finished with failed status."), + run_id=minimal_result.get("run_id"), + session_id=minimal_result.get("session_id"), + ).model_dump() diff --git a/src/stub_tools.py b/src/stub_tools.py deleted file mode 100644 index 4d35eb9..0000000 --- a/src/stub_tools.py +++ /dev/null @@ -1,96 +0,0 @@ -from __future__ import annotations - -from datetime import datetime, timezone -from typing import Any - - -def _utc_now_iso() -> str: - return datetime.now(timezone.utc).isoformat() - - -def _base_result(tool_name: str, ok: bool, payload: dict[str, Any]) -> dict[str, Any]: - return { - "tool_name": tool_name, - "ok": ok, - "payload": payload, - "received_at": _utc_now_iso(), - } - - -async def stub_search_news_sources(url: str) -> dict[str, Any]: - return _base_result( - tool_name="search_news_sources", - ok=True, - payload={ - "input_url": url, - "items": [ - {"url": "https://news-a.example/article-1"}, - {"url": "https://news-b.example/article-2"}, - {"url": "https://news-c.example/article-3"}, - ], - }, - ) - - -async def stub_parse_article(url: str) -> dict[str, Any]: - return _base_result( - tool_name="parse_article", - ok=True, - payload={ - "url": url, - "title": "Stub article title", - "published_at": "2026-01-01T10:00:00+00:00", - "text": "Stub parsed article content.", - }, - ) - - -async def stub_extract_publication_date(article_text: str) -> dict[str, Any]: - return _base_result( - tool_name="extract_publication_date", - ok=True, - payload={ - "text_size": len(article_text), - "published_at": "2026-01-01T10:00:00+00:00", - "confidence": 0.77, - }, - ) - - -async def stub_rank_sources_by_date(items: list[dict[str, Any]]) -> dict[str, Any]: - ranked = sorted(items, key=lambda item: str(item.get("published_at", ""))) - return _base_result( - tool_name="rank_sources_by_date", - ok=True, - payload={ - "input_count": len(items), - "ranked_items": ranked, - }, - ) - - -async def stub_generate_summary(items: list[dict[str, Any]]) -> dict[str, Any]: - first_url = "" - if items: - first_url = str(items[0].get("url", "")) - - return _base_result( - tool_name="generate_summary", - ok=True, - payload={ - "input_count": len(items), - "summary": ( - "По заглушечным данным самым ранним источником считается " - + first_url - ), - }, - ) - - -STUB_TOOLS: dict[str, Any] = { - "search_news_sources": stub_search_news_sources, - "parse_article": stub_parse_article, - "extract_publication_date": stub_extract_publication_date, - "rank_sources_by_date": stub_rank_sources_by_date, - "generate_summary": stub_generate_summary, -} diff --git a/src/workflow_runner.py b/src/workflow_runner.py deleted file mode 100644 index d6cadbd..0000000 --- a/src/workflow_runner.py +++ /dev/null @@ -1,418 +0,0 @@ -from __future__ import annotations - -from contextvars import ContextVar -from datetime import datetime, timezone -import json -from typing import Any - -from agno.workflow.step import Step, StepInput, StepOutput -from agno.workflow.workflow import Workflow -from pydantic import BaseModel, ValidationError, create_model -from src.schemas import RunError, RunStatus, ScenarioRunResponse, StepState -from src.scenario_store import ScenarioStoreError, load_scenario_definition -from src.stub_tools import ( - stub_extract_publication_date, - stub_generate_summary, - stub_parse_article, - stub_rank_sources_by_date, - stub_search_news_sources, -) - -_workflow_cache: dict[str, Workflow] = {} -_workflow_input_schemas: dict[str, type[BaseModel]] = {} -_run_steps_context: ContextVar[list[StepState] | None] = ContextVar( - "run_steps_context", - default=None, -) - - -def _json_loads(raw: str | None) -> dict[str, Any]: - if not raw: - return {} - try: - parsed = json.loads(raw) - except json.JSONDecodeError: - return {} - if isinstance(parsed, dict): - return parsed - return {} - - -def _as_json_step_output(payload: dict[str, Any]) -> StepOutput: - return StepOutput(content=json.dumps(payload, ensure_ascii=False)) - - -def _utc_now_iso() -> str: - return datetime.now(timezone.utc).isoformat() - - -def _initialize_step_states(scenario: dict[str, Any]) -> list[StepState]: - raw_steps = scenario.get("steps") - if not isinstance(raw_steps, list): - return [] - - step_states: list[StepState] = [] - for raw_step in raw_steps: - if not isinstance(raw_step, dict): - continue - node_id = str(raw_step.get("name", "")).strip() - if not node_id: - continue - step_states.append(StepState(node_id=node_id, status="queued")) - return step_states - - -def _update_step_state( - node_id: str, - status: str, - error: RunError | None = None, -) -> None: - step_states = _run_steps_context.get() - if not step_states: - return - - for step_state in step_states: - if step_state.node_id != node_id: - continue - step_state.status = status - if status == "running" and step_state.started_at is None: - step_state.started_at = _utc_now_iso() - if status in {"success", "failed", "waiting_human"}: - if step_state.started_at is None: - step_state.started_at = _utc_now_iso() - step_state.finished_at = _utc_now_iso() - step_state.error = error - return - - -def _mark_running_steps_failed(message: str) -> None: - step_states = _run_steps_context.get() - if not step_states: - return - - for step_state in step_states: - if step_state.status == "running": - step_state.status = "failed" - if step_state.started_at is None: - step_state.started_at = _utc_now_iso() - step_state.finished_at = _utc_now_iso() - step_state.error = RunError(code="workflow_error", message=message) - - -def _extract_output_summary(content: Any) -> str | None: - if not isinstance(content, dict): - return None - summary = content.get("summary") - if isinstance(summary, str) and summary: - return summary - payload = content.get("payload") - if isinstance(payload, dict): - payload_summary = payload.get("summary") - if isinstance(payload_summary, str) and payload_summary: - return payload_summary - return None - - -def _build_run_response( - *, - scenario_id: str, - input_data: dict[str, Any], - status: RunStatus, - steps: list[StepState], - scenario_name: str | None = None, - workflow_name: str | None = None, - output_summary: str | None = None, - result: dict[str, Any] | None = None, - error: RunError | None = None, - run_id: str | None = None, - session_id: str | None = None, -) -> dict[str, Any]: - return ScenarioRunResponse( - scenario_id=scenario_id, - status=status, - input=input_data, - steps=steps, - output_summary=output_summary, - scenario_name=scenario_name, - workflow_name=workflow_name, - result=result, - error=error, - run_id=run_id, - session_id=session_id, - ).model_dump() - - -def _extract_input_url(step_input_value: Any) -> str: - if isinstance(step_input_value, dict): - return str(step_input_value.get("url", "")).strip() - return str(step_input_value).strip() - - -def _build_input_schema_model(scenario: dict[str, Any]) -> type[BaseModel] | None: - input_schema = scenario.get("input_schema") - if not isinstance(input_schema, dict): - return None - - properties = input_schema.get("properties") - if not isinstance(properties, dict): - return None - - required_raw = input_schema.get("required", []) - required_fields = set(required_raw) if isinstance(required_raw, list) else set() - fields: dict[str, tuple[type[str], Any]] = {} - - for field_name, field_schema in properties.items(): - if not isinstance(field_name, str) or not isinstance(field_schema, dict): - continue - if field_schema.get("type") != "string": - continue - default_value = ... if field_name in required_fields else "" - fields[field_name] = (str, default_value) - - if not fields: - return None - - return create_model(f"{scenario.get('scenario_id', 'Scenario')}Input", **fields) - - -async def _search_news_sources_executor(step_input: StepInput) -> StepOutput: - _update_step_state("search_news_sources", "running") - input_url = _extract_input_url(step_input.input) - if not input_url: - _update_step_state( - "search_news_sources", - "failed", - error=RunError(code="invalid_input", message="input.url is empty"), - ) - return StepOutput(content="search_news_sources failed: input.url is empty", success=False) - search_result = await stub_search_news_sources(url=input_url) - _update_step_state("search_news_sources", "success") - return _as_json_step_output(search_result) - - -async def _parse_article_executor(step_input: StepInput) -> StepOutput: - _update_step_state("parse_articles_batch", "running") - previous_payload = _json_loads(step_input.previous_step_content) - items = previous_payload.get("payload", {}).get("items", []) - - parsed_items: list[dict[str, Any]] = [] - for item in items: - source_url = str(item.get("url", "")) - parsed_result = await stub_parse_article(url=source_url) - if not parsed_result.get("ok", False): - _update_step_state( - "parse_articles_batch", - "failed", - error=RunError(code="tool_error", message="parse_article failed"), - ) - return StepOutput(content="parse_article failed", success=False) - parsed_items.append(parsed_result.get("payload", {})) - - _update_step_state("parse_articles_batch", "success") - return _as_json_step_output( - { - "tool_name": "parse_articles_batch", - "ok": True, - "payload": {"items": parsed_items}, - } - ) - - -async def _extract_publication_date_executor(step_input: StepInput) -> StepOutput: - _update_step_state("extract_publication_date_batch", "running") - previous_payload = _json_loads(step_input.previous_step_content) - parsed_items = previous_payload.get("payload", {}).get("items", []) - - dated_items: list[dict[str, Any]] = [] - for item in parsed_items: - article_text = str(item.get("text", "")) - extract_result = await stub_extract_publication_date(article_text=article_text) - if not extract_result.get("ok", False): - _update_step_state( - "extract_publication_date_batch", - "failed", - error=RunError(code="tool_error", message="extract_publication_date failed"), - ) - return StepOutput(content="extract_publication_date failed", success=False) - - dated_items.append( - { - "url": str(item.get("url", "")), - "title": str(item.get("title", "")), - "published_at": str( - extract_result.get("payload", {}).get("published_at", "") - ), - } - ) - - _update_step_state("extract_publication_date_batch", "success") - return _as_json_step_output( - { - "tool_name": "extract_publication_date_batch", - "ok": True, - "payload": {"items": dated_items}, - } - ) - - -async def _rank_sources_by_date_executor(step_input: StepInput) -> StepOutput: - _update_step_state("rank_sources_by_date", "running") - previous_payload = _json_loads(step_input.previous_step_content) - items = previous_payload.get("payload", {}).get("items", []) - rank_result = await stub_rank_sources_by_date(items=items) - _update_step_state("rank_sources_by_date", "success") - return _as_json_step_output(rank_result) - - -async def _generate_summary_executor(step_input: StepInput) -> StepOutput: - _update_step_state("generate_summary", "running") - previous_payload = _json_loads(step_input.previous_step_content) - ranked_items = previous_payload.get("payload", {}).get("ranked_items", []) - summary_result = await stub_generate_summary(items=ranked_items) - _update_step_state("generate_summary", "success") - return _as_json_step_output(summary_result) - - -_step_executors = { - "search_news_sources": _search_news_sources_executor, - "parse_articles_batch": _parse_article_executor, - "extract_publication_date_batch": _extract_publication_date_executor, - "rank_sources_by_date": _rank_sources_by_date_executor, - "generate_summary": _generate_summary_executor, -} - - -def get_workflow_for_scenario(scenario_id: str, scenario: dict[str, Any]) -> Workflow: - cached_workflow = _workflow_cache.get(scenario_id) - if cached_workflow is not None: - return cached_workflow - - raw_steps = scenario.get("steps") - if not isinstance(raw_steps, list) or not raw_steps: - raise ScenarioStoreError("Scenario must contain non-empty steps list") - - workflow_steps: list[Step] = [] - for raw_step in raw_steps: - if not isinstance(raw_step, dict): - raise ScenarioStoreError("Each scenario step must be object") - step_name = str(raw_step.get("name", "")).strip() - if not step_name: - raise ScenarioStoreError("Each scenario step must have non-empty name") - step_executor = _step_executors.get(step_name) - if step_executor is None: - raise ScenarioStoreError(f"Unknown step executor: {step_name}") - workflow_steps.append( - Step( - name=step_name, - description=str(raw_step.get("description", step_name)), - executor=step_executor, - ) - ) - - input_schema_model = _build_input_schema_model(scenario) - workflow = Workflow( - name=scenario_id, - description=str(scenario.get("description", "")), - steps=workflow_steps, - input_schema=input_schema_model, - ) - if input_schema_model is not None: - _workflow_input_schemas[scenario_id] = input_schema_model - _workflow_cache[scenario_id] = workflow - return workflow - - -async def run_scenario_workflow( - input_data: dict[str, Any], - scenario_id: str = "news_source_discovery_v1", -) -> dict[str, Any]: - try: - scenario = load_scenario_definition(scenario_id) - except ScenarioStoreError as exc: - return _build_run_response( - scenario_id=scenario_id, - input_data=input_data, - status="failed", - steps=[], - error=RunError(code="unknown_scenario", message=str(exc)), - ) - - step_states = _initialize_step_states(scenario) - scenario_name = str(scenario.get("name", scenario_id)) - workflow = get_workflow_for_scenario(scenario_id=scenario_id, scenario=scenario) - input_schema_model = _workflow_input_schemas.get(scenario_id) - if input_schema_model is not None: - try: - input_schema_model.model_validate(input_data) - except ValidationError as exc: - return _build_run_response( - scenario_id=scenario_id, - input_data=input_data, - status="failed", - scenario_name=scenario_name, - steps=step_states, - error=RunError( - code="invalid_input", - message=f"Input does not match scenario input_schema: {exc}", - ), - ) - - context_token = _run_steps_context.set(step_states) - try: - run_output = await workflow.arun(input=input_data) - except Exception as exc: - _mark_running_steps_failed(str(exc)) - return _build_run_response( - scenario_id=scenario_id, - input_data=input_data, - status="failed", - scenario_name=scenario_name, - steps=step_states, - error=RunError(code="workflow_error", message=str(exc)), - ) - finally: - _run_steps_context.reset(context_token) - - content: Any = run_output.content if hasattr(run_output, "content") else {} - if isinstance(content, str): - try: - content = json.loads(content) - except json.JSONDecodeError: - content = {"raw_content": content} - output_summary = _extract_output_summary(content) - normalized_result = content if isinstance(content, dict) else {"raw_content": str(content)} - - if hasattr(run_output, "success") and not bool(getattr(run_output, "success")): - return _build_run_response( - scenario_id=scenario_id, - input_data=input_data, - status="failed", - scenario_name=scenario_name, - steps=step_states, - output_summary=output_summary, - result=normalized_result, - error=RunError( - code="workflow_failed", - message="Workflow finished with failed status.", - ), - ) - - run_id: str | None = None - session_id: str | None = None - if hasattr(run_output, "run_id"): - run_id = str(getattr(run_output, "run_id")) - if hasattr(run_output, "session_id"): - session_id = str(getattr(run_output, "session_id")) - - return _build_run_response( - scenario_id=scenario_id, - input_data=input_data, - status="success", - workflow_name=workflow.name, - scenario_name=scenario_name, - steps=step_states, - output_summary=output_summary, - result=normalized_result, - run_id=run_id, - session_id=session_id, - )