From 3357b3c4dda094a5619458c94f08feb021160ace Mon Sep 17 00:00:00 2001 From: Barabashka Date: Fri, 24 Apr 2026 11:56:37 +0300 Subject: [PATCH] =?UTF-8?q?=D0=A3=D1=81=D0=B8=D0=BB=D0=B8=D1=82=D1=8C=20?= =?UTF-8?q?=D0=BD=D0=B0=D0=B4=D1=91=D0=B6=D0=BD=D0=BE=D1=81=D1=82=D1=8C:?= =?UTF-8?q?=20=D0=BB=D0=BE=D0=B3=D0=B8=D1=80=D0=BE=D0=B2=D0=B0=D0=BD=D0=B8?= =?UTF-8?q?=D0=B5,=20lifespan,=20LRU-=D0=BA=D1=8D=D1=88=20=D0=B8=20fail-fa?= =?UTF-8?q?st=20=D1=81=D0=B5=D0=BC=D0=B0=D0=BD=D1=82=D0=B8=D0=BA=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Подключить loguru и заменить молчаливые except на warning/exception в step_planner, mcp_client и mcp_workflow_runner — раньше ошибки терялись в пустых дикт-возвратах.\n Перенести Phoenix tracing из module-level в FastAPI lifespan, чтобы импорт agent_os не поднимал трейсер в тестах и тулах.\n Заменить неограниченный dict _workflow_cache на OrderedDict-LRU с лимитом WORKFLOW_CACHE_MAX_SIZE (default 64) — чтобы кэш не рос бесконечно при разных scenario_id.\n Зафиксировать инвариант fail-fast: шаги, не дошедшие до исполнения из-за падения upstream, возвращаются со статусом skipped (для UI), а не queued; run помечается success только если все payload.ok.\n Добавить module docstrings во все модули src/ по STYLE_GUIDE cookbook. Запинить версии зависимостей в requirements.txt. 
--- .gitignore | 1 + requirements.txt | 19 ++++++++++--------- src/agent_os.py | 29 +++++++++++++++++++++++++---- src/agent_runner.py | 8 ++++++++ src/api_routes.py | 8 ++++++++ src/mcp_client.py | 9 +++++++++ src/mcp_workflow_runner.py | 28 ++++++++++++++++++++++++++-- src/observability.py | 18 ++++++++++++++++-- src/scenario_store.py | 7 +++++++ src/schemas.py | 4 +++- src/step_planner.py | 16 ++++++++++++++++ src/template.py | 7 +++++++ 12 files changed, 136 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 43a5c0b..66aa998 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ dist/ .vscode/ .DS_Store .cursor +.claude # Cookbook code vendor/agno/cookbook/ \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index ef9f881..695724b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,9 +1,10 @@ -agno -fastapi -uvicorn -python-dotenv -ollama -socksio -openai -arize-phoenix-otel -openinference-instrumentation-agno +agno==2.5.17 +fastapi==0.136.0 +uvicorn==0.44.0 +python-dotenv==1.2.2 +ollama==0.6.1 +socksio==1.0.0 +openai==2.32.0 +arize-phoenix-otel==0.15.0 +openinference-instrumentation-agno==0.1.30 +loguru==0.7.3 diff --git a/src/agent_os.py b/src/agent_os.py index 994fbee..3840953 100644 --- a/src/agent_os.py +++ b/src/agent_os.py @@ -1,26 +1,47 @@ +"""AgentOS entrypoint: wires the agent, REST routes and FastAPI lifespan. + +Phoenix tracing is initialized from the lifespan (not at import time) so that +importing this module for tooling or tests does not spin up the tracer. 
+""" + +from __future__ import annotations + import os +from contextlib import asynccontextmanager from dotenv import load_dotenv from fastapi import FastAPI +from loguru import logger from agno.os import AgentOS -from src.api_routes import router as api_router from src.agent_runner import get_agent -from src.observability import init_phoenix_tracing +from src.api_routes import router as api_router +from src.observability import init_phoenix_tracing, is_phoenix_tracing_enabled load_dotenv() -_tracing_enabled = init_phoenix_tracing() + + +@asynccontextmanager +async def _lifespan(_app: FastAPI): + init_phoenix_tracing() + logger.info("Prisma Platform API starting up") + try: + yield + finally: + logger.info("Prisma Platform API shutting down") + _agent = get_agent() _base_app = FastAPI( title="Prisma Platform API", version="0.1.0", + lifespan=_lifespan, ) _base_app.include_router(api_router) _agent_os = AgentOS( agents=[_agent], - tracing=_tracing_enabled, + tracing=is_phoenix_tracing_enabled(), base_app=_base_app, ) app = _agent_os.get_app() diff --git a/src/agent_runner.py b/src/agent_runner.py index 441ee62..9bd3371 100644 --- a/src/agent_runner.py +++ b/src/agent_runner.py @@ -1,3 +1,11 @@ +"""Lazy factory for the top-level Prisma agent. + +Config is read from environment variables so the same module can be used by +the API server, CLI tools and tests without re-wiring. +""" + +from __future__ import annotations + import os from agno.agent import Agent diff --git a/src/api_routes.py b/src/api_routes.py index 08c9383..505281b 100644 --- a/src/api_routes.py +++ b/src/api_routes.py @@ -1,3 +1,11 @@ +"""REST routes for scenario execution. + +These endpoints live on the FastAPI ``base_app`` that AgentOS composes with +its own routes, so the prefix ``/api`` does not collide with AgentOS paths. 
+""" + +from __future__ import annotations + from fastapi import APIRouter from src.mcp_workflow_runner import run_scenario diff --git a/src/mcp_client.py b/src/mcp_client.py index 89c61d7..26cccd3 100644 --- a/src/mcp_client.py +++ b/src/mcp_client.py @@ -1,3 +1,9 @@ +"""Thin async client for MCP tool invocation over streamable HTTP. + +Opens a short-lived ``ClientSession`` per call, wraps the tool response in +a normalized dict, and raises ``RuntimeError`` on transport/tool errors. +""" + from __future__ import annotations from datetime import timedelta @@ -5,6 +11,7 @@ import json import os from typing import Any +from loguru import logger from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client from mcp.types import TextContent @@ -33,8 +40,10 @@ async def call_mcp_tool(tool_name: str, arguments: dict[str, Any]) -> dict[str, await session.initialize() result = await session.call_tool(tool_name, arguments) except TimeoutError as exc: + logger.warning("MCP timeout: tool={}", tool_name) raise RuntimeError(f"MCP timeout: {tool_name}") from exc except Exception as exc: + logger.exception("MCP transport error: tool={}", tool_name) raise RuntimeError(f"MCP transport error: {tool_name}") from exc if result.isError: diff --git a/src/mcp_workflow_runner.py b/src/mcp_workflow_runner.py index 1904baa..d9868bd 100644 --- a/src/mcp_workflow_runner.py +++ b/src/mcp_workflow_runner.py @@ -1,5 +1,14 @@ +"""Builds and runs Agno workflows from JSON scenario definitions. + +Each scenario step is a typed MCP tool call. The runner resolves argument +templates from ``session_state``, optionally lets an LLM planner repair +missing fields, invokes the tool, and collects per-step results back into +``session_state`` for downstream steps. 
+""" + from __future__ import annotations +from collections import OrderedDict from copy import deepcopy from datetime import datetime, timezone import json @@ -8,6 +17,7 @@ from typing import Any, Awaitable, Callable from agno.workflow.step import Step, StepInput, StepOutput from agno.workflow.workflow import Workflow +from loguru import logger from src.mcp_client import call_mcp_tool from src.schemas import ScenarioRunResponse, StepState @@ -193,6 +203,7 @@ def _build_tool_executor( "finished_at": finished_at, } session_state.setdefault("steps", {})[step_name] = error_payload + logger.exception("Step {} failed (tool={})", step_name, tool_name) raise RuntimeError(f"{step_name} failed: {exc}") from exc return executor @@ -215,6 +226,9 @@ def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow: if not step_name or not tool_name: raise ScenarioStoreError("Each tool step must contain non-empty name and tool") + # Fail-fast by design: the run is considered successful only when every + # step passes. There is no per-step retry or skip policy — downstream + # steps rely on upstream output, so on any failure the workflow stops. 
 workflow_steps.append( Step( name=step_name, @@ -232,15 +246,20 @@ def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow: ) -_workflow_cache: dict[str, Workflow] = {} +_WORKFLOW_CACHE_MAX_SIZE = _env_int("WORKFLOW_CACHE_MAX_SIZE", 64) +_workflow_cache: "OrderedDict[str, Workflow]" = OrderedDict() def _get_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow: cached = _workflow_cache.get(scenario_id) if cached is not None: + _workflow_cache.move_to_end(scenario_id) return cached workflow = _build_workflow(scenario_id, scenario) _workflow_cache[scenario_id] = workflow + if len(_workflow_cache) > _WORKFLOW_CACHE_MAX_SIZE: + evicted_id, _ = _workflow_cache.popitem(last=False) + logger.debug("Evicted workflow from LRU cache: {}", evicted_id) return workflow @@ -275,10 +294,11 @@ def _build_step_states( continue payload = steps_payloads.get(name) if not isinstance(payload, dict): + # Workflow aborted before this step ran (strict fail-fast policy). states.append( StepState( node_id=name, - status="queued", + status="skipped", message="", ) ) @@ -338,10 +358,14 @@ async def run_scenario( ) except Exception as exc: workflow_error = str(exc) + logger.exception("Workflow {} failed", scenario_id) steps_payloads = session_state.get("steps", {}) or {} step_states = _build_step_states(scenario, steps_payloads) + # Strict invariant: run is success only when every recorded step payload + # has ok=true. Execution is fail-fast, so a workflow error or any non-ok + # step payload marks the whole run as failed. status = "success" if workflow_error is not None: status = "failed" diff --git a/src/observability.py b/src/observability.py index 0659b95..853f060 100644 --- a/src/observability.py +++ b/src/observability.py @@ -1,5 +1,15 @@ +"""Phoenix (Arize) OpenTelemetry tracing setup. + +Tracing is initialized via the FastAPI lifespan so that import-time side effects +stay out of module load. 
``is_phoenix_tracing_enabled`` is cheap and can be +consulted before the app starts (for example, to pass a flag into AgentOS). +""" + +from __future__ import annotations + import os +from loguru import logger from phoenix.otel import register _initialized = False @@ -12,11 +22,14 @@ def _env_bool(name: str, default: bool) -> bool: return value.strip().lower() in {"1", "true", "yes", "on"} +def is_phoenix_tracing_enabled() -> bool: + return _env_bool("PHOENIX_TRACING_ENABLED", False) + + def init_phoenix_tracing() -> bool: global _initialized - enabled = _env_bool("PHOENIX_TRACING_ENABLED", False) - if not enabled: + if not is_phoenix_tracing_enabled(): return False if _initialized: @@ -33,4 +46,5 @@ def init_phoenix_tracing() -> bool: auto_instrument=True, ) _initialized = True + logger.info("Phoenix tracing initialized (project={})", project_name) return True diff --git a/src/scenario_store.py b/src/scenario_store.py index bc09e29..6f9c8d5 100644 --- a/src/scenario_store.py +++ b/src/scenario_store.py @@ -1,3 +1,10 @@ +"""File-backed loader for scenario definitions. + +Scenarios live under ``scenarios/`` and are indexed by ``scenarios/index.json``. +Each scenario is a JSON object with a ``scenario_id`` that must match the +index key it was looked up by. 
+""" + from __future__ import annotations import json diff --git a/src/schemas.py b/src/schemas.py index 8297d9f..a04bcfe 100644 --- a/src/schemas.py +++ b/src/schemas.py @@ -1,3 +1,5 @@ +"""Pydantic schemas for the scenario-run REST API.""" + from __future__ import annotations from typing import Any, Literal @@ -5,7 +7,7 @@ from typing import Any, Literal from pydantic import BaseModel, Field RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"] -StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"] +StepStatus = Literal["queued", "running", "success", "failed", "skipped", "waiting_human"] class ScenarioRunRequest(BaseModel): diff --git a/src/step_planner.py b/src/step_planner.py index 66cee20..a0e7b02 100644 --- a/src/step_planner.py +++ b/src/step_planner.py @@ -1,3 +1,12 @@ +"""LLM-backed fallback planner for MCP tool arguments. + +When a step's resolved arguments are missing required fields, this module +calls an OpenAI-compatible chat completion to fill them from the current +scope (``input`` + prior ``steps``). The planner is best-effort: on any +failure it returns the base arguments unchanged so the caller's validator +can produce a clean error. +""" + from __future__ import annotations from copy import deepcopy @@ -5,6 +14,7 @@ import json import os from typing import Any +from loguru import logger from openai import AsyncOpenAI @@ -121,6 +131,12 @@ async def plan_arguments( raw = completion.choices[0].message.content if completion.choices else "" planned = _extract_arguments(raw) except Exception: + logger.warning( + "Planner call failed for step={} tool={} attempt={}", + step_name, + tool_name, + attempt_no, + ) planned = {} merged = deepcopy(base_arguments) diff --git a/src/template.py b/src/template.py index a56394b..0485d99 100644 --- a/src/template.py +++ b/src/template.py @@ -1,3 +1,10 @@ +"""Variable templating for scenario step inputs. 
+ +A dict of shape ``{"from": "path.to.value"}`` resolves to the value at that +dotted path in the current scope. Nested dicts/lists are resolved +recursively; plain values pass through via ``deepcopy``. +""" + from __future__ import annotations from copy import deepcopy