Compare commits

...

1 Commits

Author SHA1 Message Date
Barabashka 4d037e52eb Упрощение MCP workflow runner и обновить контракт /api/runs.
Перенесены planner/template хелперы в отдельные модули, выровнен формат статусов и сообщений в ответе, а также обновлены .env.example и README под текущие переменные и поведение API.
2026-04-23 12:41:33 +03:00
7 changed files with 504 additions and 594 deletions
+18 -3
View File
@@ -1,18 +1,33 @@
# Agent
AGENT_ID=prisma-agent AGENT_ID=prisma-agent
OLLAMA_MODEL_ID=gemma4:31b
OLLAMA_HOST=http://localhost:11435
OLLAMA_TEMPERATURE=0
AGENT_MARKDOWN=false AGENT_MARKDOWN=false
AGENT_DEBUG_MODE=true AGENT_DEBUG_MODE=true
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly." AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
# Agent model (Ollama)
OLLAMA_MODEL_ID=gemma4:31b
OLLAMA_HOST=http://localhost:11435
OLLAMA_TEMPERATURE=0
# API runtime
AGENT_OS_HOST=127.0.0.1 AGENT_OS_HOST=127.0.0.1
AGENT_OS_PORT=7777 AGENT_OS_PORT=7777
# Planner
PLANNER_ENABLED=false
PLANNER_REPAIR_ATTEMPTS=3
# Planner model (Polza)
POLZA_BASE_URL=https://api.polza.ai/v1 POLZA_BASE_URL=https://api.polza.ai/v1
POLZA_MODEL_ID=google/gemma-4-31b-it POLZA_MODEL_ID=google/gemma-4-31b-it
POLZA_API_KEY=key POLZA_API_KEY=key
POLZA_TEMPERATURE=0 POLZA_TEMPERATURE=0
# MCP
MCP_BASE_URL=http://127.0.0.1:8081/mcp MCP_BASE_URL=http://127.0.0.1:8081/mcp
MCP_TIMEOUT_SECONDS=10 MCP_TIMEOUT_SECONDS=10
# Observability (Phoenix)
PHOENIX_TRACING_ENABLED=false PHOENIX_TRACING_ENABLED=false
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006 PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
PHOENIX_PROJECT_NAME=prisma-platform PHOENIX_PROJECT_NAME=prisma-platform
+24 -4
View File
@@ -25,7 +25,8 @@ prisma_platform/
├── scenarios/ ├── scenarios/
│ ├── index.json │ ├── index.json
│ └── news_source_discovery/ │ └── news_source_discovery/
── v1.json ── v1.json
│ └── v1_planner_repair.json
└── src/ └── src/
├── __init__.py ├── __init__.py
├── api_routes.py ├── api_routes.py
@@ -35,6 +36,8 @@ prisma_platform/
├── mcp_workflow_runner.py ├── mcp_workflow_runner.py
├── observability.py ├── observability.py
├── scenario_store.py ├── scenario_store.py
├── step_planner.py
├── template.py
└── schemas.py └── schemas.py
``` ```
@@ -101,22 +104,39 @@ curl -s -X POST "http://127.0.0.1:7777/api/runs" \
Успешный ответ содержит: Успешный ответ содержит:
- `status=success` - `status=success`
- список `steps` со статусами шагов - `message=""`
- список `steps` со статусами и временем шагов
- `output_summary` - `output_summary`
- `result` итогового шага - `result` итогового шага
При ошибке:
- `status=failed`
- `message` содержит текст ошибки
## Переменные окружения ## Переменные окружения
Основные: Agent:
- `AGENT_ID` (default: `prisma-agent`) - `AGENT_ID` (default: `prisma-agent`)
- `AGENT_MARKDOWN` (default: `false`) - `AGENT_MARKDOWN` (default: `false`)
- `AGENT_DEBUG_MODE` (default: `true`) - `AGENT_DEBUG_MODE` (default: `true`)
- `AGENT_INSTRUCTIONS` - `AGENT_INSTRUCTIONS`
- `OLLAMA_MODEL_ID` (default: `gemma4:31b`)
- `OLLAMA_HOST` (default: `http://localhost:11435`)
- `OLLAMA_TEMPERATURE` (default: `0`)
API runtime:
- `AGENT_OS_HOST` (default: `127.0.0.1`) - `AGENT_OS_HOST` (default: `127.0.0.1`)
- `AGENT_OS_PORT` (default: `7777`) - `AGENT_OS_PORT` (default: `7777`)
Planner-модель (`polza.ai`): Planner:
- `PLANNER_ENABLED` (default: `false`)
- `PLANNER_REPAIR_ATTEMPTS` (default: `3`)
Planner model (`polza.ai`):
- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`) - `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`) - `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)
+4 -5
View File
@@ -1,15 +1,14 @@
from fastapi import APIRouter from fastapi import APIRouter
from src.mcp_workflow_runner import run_scenario_workflow from src.mcp_workflow_runner import run_scenario
from src.schemas import ScenarioRunRequest, ScenarioRunResponse from src.schemas import ScenarioRunRequest, ScenarioRunResponse
router = APIRouter(prefix="/api", tags=["workflow"]) router = APIRouter(prefix="/api", tags=["workflow"])
@router.post("/runs", response_model=ScenarioRunResponse) @router.post("/runs", response_model=ScenarioRunResponse)
async def run_scenario(request: ScenarioRunRequest) -> ScenarioRunResponse: async def post_run(request: ScenarioRunRequest) -> ScenarioRunResponse:
result = await run_scenario_workflow( return await run_scenario(
input_data=request.input,
scenario_id=request.scenario_id, scenario_id=request.scenario_id,
input_data=request.input,
) )
return ScenarioRunResponse.model_validate(result)
+276 -575
View File
@@ -1,574 +1,256 @@
from __future__ import annotations from __future__ import annotations
from contextvars import ContextVar
from copy import deepcopy from copy import deepcopy
from datetime import datetime, timezone from datetime import datetime, timezone
import json import json
import os import os
from typing import Any from typing import Any, Awaitable, Callable
from agno.workflow.step import Step, StepInput, StepOutput from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow from agno.workflow.workflow import Workflow
from openai import AsyncOpenAI
from src.mcp_client import call_mcp_tool from src.mcp_client import call_mcp_tool
from src.schemas import RunError, ScenarioRunResponse, StepState from src.schemas import ScenarioRunResponse, StepState
from src.scenario_store import ScenarioStoreError, load_scenario_definition from src.scenario_store import ScenarioStoreError, load_scenario_definition
from src.step_planner import plan_arguments, planner_enabled
_planner_client: AsyncOpenAI | None = None from src.template import (
missing_required_fields,
resolve_path,
def _env_float(name: str, default: float) -> float: resolve_template,
value = os.getenv(name) validate_required_fields,
if value is None: )
return default
return float(value)
def _env_int(name: str, default: int) -> int: def _env_int(name: str, default: int) -> int:
value = os.getenv(name) value = os.getenv(name)
if value is None: return int(value) if value is not None else default
return default
return int(value)
def _utc_now_iso() -> str: def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat()
def get_shared_step_planner_client() -> AsyncOpenAI: def _build_scope(session_state: dict[str, Any]) -> dict[str, Any]:
global _planner_client return {
if _planner_client is not None: "input": session_state.get("input", {}),
return _planner_client "steps": session_state.get("steps", {}),
}
polza_base_url = os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1")
polza_api_key = os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY")
_planner_client = AsyncOpenAI(
base_url=polza_base_url,
api_key=polza_api_key,
)
return _planner_client
def _resolve_path(scope: dict[str, Any], path: str) -> Any: async def _prepare_arguments(
value: Any = scope *,
for segment in path.split("."):
key = segment.strip()
if not key:
continue
if not isinstance(value, dict):
return None
value = value.get(key)
return deepcopy(value)
def _resolve_template(template: Any, scope: dict[str, Any]) -> Any:
if isinstance(template, dict):
if set(template.keys()) == {"from"}:
return _resolve_path(scope, str(template["from"]))
return {key: _resolve_template(value, scope) for key, value in template.items()}
if isinstance(template, list):
return [_resolve_template(item, scope) for item in template]
return deepcopy(template)
def _validate_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
step_name: str, step_name: str,
) -> None: tool_name: str,
missing_fields: list[str] = [] base_arguments: dict[str, Any],
for field in required_fields: required_fields: list[str],
value = arguments.get(field) scope: dict[str, Any],
if isinstance(value, str) and value.strip(): ) -> dict[str, Any]:
continue final_arguments = deepcopy(base_arguments)
if value not in (None, "", [], {}): missing = missing_required_fields(final_arguments, required_fields)
continue if missing and planner_enabled():
missing_fields.append(field) max_attempts = _env_int("PLANNER_REPAIR_ATTEMPTS", 3)
if missing_fields: for attempt in range(1, max_attempts + 1):
fields_str = ", ".join(missing_fields) final_arguments = await plan_arguments(
raise ValueError(f"{step_name}: missing required fields: {fields_str}") step_name=step_name,
tool_name=tool_name,
base_arguments=final_arguments,
def _missing_required_fields(arguments: dict[str, Any], required_fields: list[str]) -> list[str]: required_fields=required_fields,
missing_fields: list[str] = [] scope=scope,
for field in required_fields: missing_fields=missing,
value = arguments.get(field) attempt_no=attempt,
if isinstance(value, str) and value.strip():
continue
if value not in (None, "", [], {}):
continue
missing_fields.append(field)
return missing_fields
def _build_arguments_schema(required_fields: list[str]) -> dict[str, Any]:
properties = {field: {"type": "any"} for field in required_fields}
return {
"type": "object",
"required": required_fields,
"properties": properties,
}
def _build_polza_response_schema(required_fields: list[str]) -> dict[str, Any]:
value_schema: dict[str, Any] = {
"type": ["string", "number", "boolean", "array", "object", "null"]
}
arguments_properties = {field: value_schema for field in required_fields}
return {
"name": "mcp_arguments",
"strict": True,
"schema": {
"type": "object",
"properties": {
"arguments": {
"type": "object",
"properties": arguments_properties,
"required": required_fields,
"additionalProperties": True,
}
},
"required": ["arguments"],
"additionalProperties": False,
},
}
def _extract_planned_arguments(content: Any) -> dict[str, Any]:
candidate: Any = content
if isinstance(candidate, str):
text = candidate.strip()
if text.startswith("```"):
text = text.strip("`").strip()
if text.startswith("json"):
text = text[4:].strip()
try:
candidate = json.loads(text)
except json.JSONDecodeError:
return {}
if isinstance(candidate, dict):
if isinstance(candidate.get("arguments"), dict):
return candidate["arguments"]
# Some models return the arguments object directly.
return candidate
return {}
class McpWorkflowRunner:
"""
Minimal workflow runner:
- fixed step order from scenario
- same planner agent in every step
- MCP call executed by code, not by the agent
- request/response persisted in run context
"""
def __init__(self) -> None:
self._workflow_cache: dict[str, Workflow] = {}
self._planner_repair_attempts = _env_int("PLANNER_REPAIR_ATTEMPTS", 3)
self._run_state_ctx: ContextVar[dict[str, Any] | None] = ContextVar(
"mcp_workflow_run_state",
default=None,
)
def _get_run_state(self) -> dict[str, Any]:
run_state = self._run_state_ctx.get()
if run_state is None:
raise RuntimeError("run state is not initialized")
return run_state
def _build_scope(self) -> dict[str, Any]:
run_state = self._get_run_state()
return {
"input": run_state.get("input", {}),
"steps": run_state.get("steps", {}),
}
async def _plan_arguments(
self,
*,
step_name: str,
tool_name: str,
base_arguments: dict[str, Any],
required_fields: list[str],
scope: dict[str, Any],
planner_cache: dict[str, dict[str, Any]] | None = None,
missing_fields: list[str] | None = None,
attempt_no: int = 1,
) -> dict[str, Any]:
cache_key: str | None = None
if planner_cache is not None:
try:
cache_payload = {
"tool_name": tool_name,
"base_arguments": base_arguments,
"required_fields": required_fields,
"missing_fields": missing_fields or [],
"attempt_no": attempt_no,
}
cache_key = json.dumps(cache_payload, sort_keys=True, ensure_ascii=False)
except TypeError:
cache_key = None
if cache_key is not None and cache_key in planner_cache:
return deepcopy(planner_cache[cache_key])
planner_context = {
"input": scope.get("input", {}),
"steps": scope.get("steps", {}),
}
for key, value in scope.items():
if key in {"input", "steps"}:
continue
planner_context[key] = value
prompt = {
"task": "Prepare MCP arguments for this step.",
"step_name": step_name,
"tool_name": tool_name,
"required_fields": required_fields,
"base_arguments": base_arguments,
"missing_fields": missing_fields or [],
"repair_attempt": attempt_no,
"arguments_schema": _build_arguments_schema(required_fields),
"context": planner_context,
"response_contract": {
"must_return": {"arguments": "object"},
"must_include_fields": missing_fields or [],
"forbidden": "extra unrelated keys",
},
"output": (
"Return only JSON object with key 'arguments'. "
"If missing_fields is not empty, fill every missing field from context."
),
}
prompt_json = json.dumps(prompt, ensure_ascii=False)
planned: dict[str, Any] = {}
# Primary path: strict structured output via Polza response_format/json_schema.
try:
completion = await get_shared_step_planner_client().chat.completions.create(
model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
messages=[
{
"role": "system",
"content": (
"You are a tool-input planner. "
"Return only JSON that matches the provided schema."
),
},
{"role": "user", "content": prompt_json},
],
response_format={
"type": "json_schema",
"json_schema": _build_polza_response_schema(required_fields),
},
temperature=_env_float("POLZA_TEMPERATURE", 0.0),
) )
raw_content = completion.choices[0].message.content if completion.choices else "" missing = missing_required_fields(final_arguments, required_fields)
planned = _extract_planned_arguments(raw_content) if not missing:
except Exception: break
planned = {} validate_required_fields(final_arguments, required_fields, step_name)
return final_arguments
if not isinstance(planned, dict):
planned = {}
# Allow planner to override/fill base arguments while keeping known defaults. async def _execute_one_call(
merged = deepcopy(base_arguments) *,
merged.update(planned) step_name: str,
if planner_cache is not None and cache_key is not None: tool_name: str,
planner_cache[cache_key] = deepcopy(merged) required_fields: list[str],
return merged input_template: Any,
scope: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Any]]:
resolved = resolve_template(input_template, scope)
base_arguments = resolved if isinstance(resolved, dict) else {}
arguments = await _prepare_arguments(
step_name=step_name,
tool_name=tool_name,
base_arguments=base_arguments,
required_fields=required_fields,
scope=scope,
)
tool_response = await call_mcp_tool(tool_name, arguments)
return arguments, tool_response
def _build_tool_step_executor(self, step_spec: dict[str, Any]):
step_name = str(step_spec["name"])
tool_name = str(step_spec["tool"])
input_template = step_spec.get("input", {})
foreach_spec = step_spec.get("foreach")
collect_template = step_spec.get("collect")
collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
required_fields_raw = step_spec.get("required_input_fields", [])
required_fields = (
[field for field in required_fields_raw if isinstance(field, str)]
if isinstance(required_fields_raw, list)
else []
)
if isinstance(foreach_spec, dict):
source_path = str(foreach_spec.get("from", "")).strip()
item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
else:
source_path = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
item_alias = "item"
async def _executor(_step_input: StepInput) -> StepOutput: def _build_tool_executor(
run_state = self._get_run_state() step_spec: dict[str, Any],
scope = self._build_scope() ) -> Callable[[StepInput, dict[str, Any]], Awaitable[StepOutput]]:
step_started_at = _utc_now_iso() step_name = str(step_spec["name"])
planner_cache: dict[str, dict[str, Any]] = {} tool_name = str(step_spec["tool"])
input_template = step_spec.get("input", {})
foreach_spec = step_spec.get("foreach")
collect_template = step_spec.get("collect")
collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
required_fields = [
f for f in step_spec.get("required_input_fields", []) if isinstance(f, str)
]
async def _prepare_arguments( if isinstance(foreach_spec, dict):
*, foreach_from = str(foreach_spec.get("from", "")).strip()
local_scope: dict[str, Any], item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
local_base_arguments: dict[str, Any], else:
) -> dict[str, Any]: foreach_from = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
final_arguments = deepcopy(local_base_arguments) item_alias = "item"
for repair_attempt in range(1, self._planner_repair_attempts + 1):
missing_fields = _missing_required_fields(final_arguments, required_fields) async def executor(_step_input: StepInput, session_state: dict[str, Any]) -> StepOutput:
if not missing_fields: started_at = _utc_now_iso()
break scope = _build_scope(session_state)
final_arguments = await self._plan_arguments(
try:
if foreach_from:
iterable = resolve_path(scope, foreach_from)
if not isinstance(iterable, list):
raise ValueError(f"{step_name}: foreach source is not list")
collected: list[Any] = []
iteration_requests: list[dict[str, Any]] = []
iteration_responses: list[dict[str, Any]] = []
last_received_at: str | None = None
for index, item in enumerate(iterable):
iteration_scope = {**scope, item_alias: item, "item": item, "index": index}
arguments, tool_response = await _execute_one_call(
step_name=step_name, step_name=step_name,
tool_name=tool_name, tool_name=tool_name,
base_arguments=final_arguments,
required_fields=required_fields, required_fields=required_fields,
scope=local_scope, input_template=input_template,
planner_cache=planner_cache, scope=iteration_scope,
missing_fields=missing_fields,
attempt_no=repair_attempt,
) )
_validate_required_fields(final_arguments, required_fields, step_name) iteration_requests.append(arguments)
return final_arguments iteration_responses.append(tool_response)
received_at = tool_response.get("received_at")
async def _call_tool_with_repair( if isinstance(received_at, str) and received_at:
*, last_received_at = received_at
initial_arguments: dict[str, Any], if collect_template is None:
) -> tuple[dict[str, Any], dict[str, Any]]: collected.append(tool_response.get("payload", {}))
final_arguments = deepcopy(initial_arguments) else:
tool_response = await call_mcp_tool(tool_name, final_arguments) collected.append(
return tool_response, final_arguments resolve_template(
collect_template,
try: {**iteration_scope, "tool": tool_response},
tool_calls = run_state.setdefault("tool_calls", [])
if not isinstance(tool_calls, list):
tool_calls = []
run_state["tool_calls"] = tool_calls
if source_path:
iterable = _resolve_path(scope, source_path)
if not isinstance(iterable, list):
raise ValueError(f"{step_name}: foreach source is not list")
collected_items: list[Any] = []
for index, item in enumerate(iterable):
iteration_scope = dict(scope)
iteration_scope[item_alias] = item
iteration_scope["item"] = item
iteration_scope["index"] = index
resolved = _resolve_template(input_template, iteration_scope)
base_arguments = resolved if isinstance(resolved, dict) else {}
final_arguments = await _prepare_arguments(
local_scope=iteration_scope,
local_base_arguments=base_arguments,
)
tool_response, final_arguments = await _call_tool_with_repair(
initial_arguments=final_arguments,
)
tool_calls.append(
{
"step_name": step_name,
"tool_name": tool_name,
"attempt": index + 1,
"request": final_arguments,
"ok": True,
"response": tool_response,
}
)
if collect_template is None:
collected_items.append(tool_response.get("payload", {}))
else:
collected_items.append(
_resolve_template(
collect_template,
{**iteration_scope, "tool": tool_response},
)
) )
)
step_payload = { step_payload = {
"ok": True, "ok": True,
"tool_name": tool_name,
"payload": {collect_key: collected_items},
"request": {"foreach_from": source_path, "count": len(iterable)},
"received_at": _utc_now_iso(),
"started_at": step_started_at,
"finished_at": _utc_now_iso(),
}
else:
resolved = _resolve_template(input_template, scope)
base_arguments = resolved if isinstance(resolved, dict) else {}
final_arguments = await _prepare_arguments(
local_scope=scope,
local_base_arguments=base_arguments,
)
tool_response, final_arguments = await _call_tool_with_repair(
initial_arguments=final_arguments,
)
step_payload = {
"ok": bool(tool_response.get("ok", True)),
"tool_name": tool_name,
"payload": tool_response.get("payload", {}),
"request": final_arguments,
"response": tool_response,
"received_at": tool_response.get("received_at"),
"started_at": step_started_at,
"finished_at": _utc_now_iso(),
}
tool_calls.append(
{
"step_name": step_name,
"tool_name": tool_name,
"request": final_arguments,
"ok": True,
"response": tool_response,
}
)
run_state.setdefault("steps", {})[step_name] = step_payload
return StepOutput(
content=json.dumps(step_payload, ensure_ascii=False),
success=True,
)
except Exception as exc:
error_payload = {
"ok": False,
"tool_name": tool_name, "tool_name": tool_name,
"request": {}, "payload": {collect_key: collected},
"error": str(exc), "request": {
"started_at": step_started_at, "foreach_from": foreach_from,
"count": len(iterable),
"items": iteration_requests,
},
"response": {"items": iteration_responses},
"received_at": last_received_at,
"started_at": started_at,
"finished_at": _utc_now_iso(), "finished_at": _utc_now_iso(),
} }
run_state.setdefault("steps", {})[step_name] = error_payload else:
run_state.setdefault("tool_calls", []).append( arguments, tool_response = await _execute_one_call(
{ step_name=step_name,
"step_name": step_name, tool_name=tool_name,
"tool_name": tool_name, required_fields=required_fields,
"request": {}, input_template=input_template,
"ok": False, scope=scope,
"error": str(exc),
}
) )
raise RuntimeError(f"{step_name} failed: {exc}") from exc step_payload = {
"ok": bool(tool_response.get("ok", True)),
"tool_name": tool_name,
"payload": tool_response.get("payload", {}),
"request": arguments,
"response": tool_response,
"received_at": tool_response.get("received_at"),
"started_at": started_at,
"finished_at": _utc_now_iso(),
}
return _executor session_state.setdefault("steps", {})[step_name] = step_payload
return StepOutput(
def get_workflow(self, scenario_id: str, scenario: dict[str, Any]) -> Workflow: content=json.dumps(step_payload, ensure_ascii=False),
cached = self._workflow_cache.get(scenario_id) success=True,
if cached is not None:
return cached
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list) or not raw_steps:
raise ScenarioStoreError("Scenario must contain non-empty steps list")
workflow_steps: list[Step] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
raise ScenarioStoreError("Each scenario step must be object")
if raw_step.get("type") != "tool":
raise ScenarioStoreError("This minimal runner supports only tool steps")
step_name = str(raw_step.get("name", "")).strip()
tool_name = str(raw_step.get("tool", step_name)).strip()
if not step_name or not tool_name:
raise ScenarioStoreError("Each tool step must contain non-empty name and tool")
executor = self._build_tool_step_executor(raw_step)
workflow_steps.append(
Step(
name=step_name,
description=str(raw_step.get("description", step_name)),
executor=executor,
max_retries=0,
on_error="fail",
)
) )
workflow = Workflow(
name=scenario_id,
description=str(scenario.get("description", "")),
steps=workflow_steps,
)
self._workflow_cache[scenario_id] = workflow
return workflow
async def run(self, *, scenario_id: str, input_data: dict[str, Any]) -> dict[str, Any]:
scenario = load_scenario_definition(scenario_id)
workflow = self.get_workflow(scenario_id, scenario)
initial_state = {
"input": deepcopy(input_data),
"steps": {},
"tool_calls": [],
}
token = self._run_state_ctx.set(initial_state)
run_state = initial_state
run_output: Any = None
workflow_error: str | None = None
try:
run_output = await workflow.arun(input=input_data)
except Exception as exc: except Exception as exc:
workflow_error = str(exc) finished_at = _utc_now_iso()
finally: error_payload = {
captured = self._run_state_ctx.get() "ok": False,
if isinstance(captured, dict): "tool_name": tool_name,
run_state = deepcopy(captured) "error": str(exc),
self._run_state_ctx.reset(token) "started_at": started_at,
"finished_at": finished_at,
}
session_state.setdefault("steps", {})[step_name] = error_payload
raise RuntimeError(f"{step_name} failed: {exc}") from exc
content = run_output.content if hasattr(run_output, "content") else None return executor
if isinstance(content, str):
try:
content = json.loads(content)
except json.JSONDecodeError:
content = {"raw_content": content}
if content is None:
step_payloads = run_state.get("steps", {})
if isinstance(step_payloads, dict):
for payload in reversed(list(step_payloads.values())):
if isinstance(payload, dict) and not bool(payload.get("ok", True)):
content = deepcopy(payload)
break
if content is None and workflow_error is not None:
content = {"error": workflow_error}
status = "success"
if workflow_error is not None:
status = "failed"
elif run_output is not None and not bool(getattr(run_output, "success", True)):
status = "failed"
return {
"scenario_id": scenario_id,
"workflow_name": workflow.name,
"status": status,
"input": input_data,
"final_result": content if isinstance(content, dict) else {"raw_content": content},
"steps": run_state.get("steps", {}),
"tool_calls": run_state.get("tool_calls", []),
"run_id": str(getattr(run_output, "run_id", "")) or None,
"session_id": str(getattr(run_output, "session_id", "")) or None,
}
_default_runner: McpWorkflowRunner | None = None def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list) or not raw_steps:
raise ScenarioStoreError("Scenario must contain non-empty steps list")
workflow_steps: list[Step] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
raise ScenarioStoreError("Each scenario step must be object")
if raw_step.get("type") != "tool":
raise ScenarioStoreError("This runner supports only tool steps")
step_name = str(raw_step.get("name", "")).strip()
tool_name = str(raw_step.get("tool", step_name)).strip()
if not step_name or not tool_name:
raise ScenarioStoreError("Each tool step must contain non-empty name and tool")
workflow_steps.append(
Step(
name=step_name,
description=str(raw_step.get("description", step_name)),
executor=_build_tool_executor(raw_step),
max_retries=0,
on_error="fail",
)
)
return Workflow(
name=scenario_id,
description=str(scenario.get("description", "")),
steps=workflow_steps,
)
def get_mcp_workflow_runner() -> McpWorkflowRunner: _workflow_cache: dict[str, Workflow] = {}
global _default_runner
if _default_runner is not None:
return _default_runner
_default_runner = McpWorkflowRunner()
return _default_runner
def _extract_output_summary(content: Any) -> str | None: def _get_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
if not isinstance(content, dict): cached = _workflow_cache.get(scenario_id)
if cached is not None:
return cached
workflow = _build_workflow(scenario_id, scenario)
_workflow_cache[scenario_id] = workflow
return workflow
def _extract_output_summary(result: dict[str, Any] | None) -> str | None:
if not isinstance(result, dict):
return None return None
summary = content.get("summary") summary = result.get("summary")
if isinstance(summary, str) and summary: if isinstance(summary, str) and summary:
return summary return summary
payload = content.get("payload") payload = result.get("payload")
if isinstance(payload, dict): if isinstance(payload, dict):
payload_summary = payload.get("summary") payload_summary = payload.get("summary")
if isinstance(payload_summary, str) and payload_summary: if isinstance(payload_summary, str) and payload_summary:
@@ -576,109 +258,128 @@ def _extract_output_summary(content: Any) -> str | None:
return None return None
def _build_step_states_from_minimal( def _build_step_states(
*,
scenario: dict[str, Any], scenario: dict[str, Any],
minimal_steps: dict[str, Any], steps_payloads: dict[str, Any],
) -> list[StepState]: ) -> list[StepState]:
raw_steps = scenario.get("steps") raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list): if not isinstance(raw_steps, list):
return [] return []
step_states: list[StepState] = [] states: list[StepState] = []
for raw_step in raw_steps: for raw_step in raw_steps:
if not isinstance(raw_step, dict): if not isinstance(raw_step, dict):
continue continue
step_name = str(raw_step.get("name", "")).strip() name = str(raw_step.get("name", "")).strip()
if not step_name: if not name:
continue continue
payload = minimal_steps.get(step_name) payload = steps_payloads.get(name)
if not isinstance(payload, dict): if not isinstance(payload, dict):
step_states.append(StepState(node_id=step_name, status="queued")) states.append(
StepState(
node_id=name,
status="queued",
message="",
)
)
continue continue
ok = bool(payload.get("ok", False)) ok = bool(payload.get("ok", False))
step_states.append( states.append(
StepState( StepState(
node_id=step_name, node_id=name,
status="success" if ok else "failed", status="success" if ok else "failed",
started_at=str(payload.get("started_at") or "") or None, started_at=str(payload.get("started_at") or "") or None,
finished_at=str(payload.get("finished_at") or "") or None, finished_at=str(payload.get("finished_at") or "") or None,
error=RunError( message="" if ok else str(payload.get("error", f"{name} failed")),
code="tool_error",
message=str(payload.get("error", f"{step_name} failed")),
)
if not ok
else None,
) )
) )
return step_states return states
async def run_scenario_workflow( async def run_scenario(
*,
scenario_id: str,
input_data: dict[str, Any], input_data: dict[str, Any],
scenario_id: str = "news_source_discovery_v1", ) -> ScenarioRunResponse:
) -> dict[str, Any]:
try: try:
scenario = load_scenario_definition(scenario_id) scenario = load_scenario_definition(scenario_id)
except ScenarioStoreError as exc: except ScenarioStoreError as exc:
return ScenarioRunResponse( return ScenarioRunResponse(
scenario_id=scenario_id, scenario_id=scenario_id,
status="failed", status="failed",
message=str(exc),
input=input_data, input=input_data,
steps=[], )
error=RunError(code="unknown_scenario", message=str(exc)),
).model_dump()
runner = get_mcp_workflow_runner()
scenario_name = str(scenario.get("name", scenario_id)) scenario_name = str(scenario.get("name", scenario_id))
try: try:
minimal_result = await runner.run( workflow = _get_workflow(scenario_id, scenario)
scenario_id=scenario_id, except ScenarioStoreError as exc:
input_data=input_data,
)
except Exception as exc:
return ScenarioRunResponse( return ScenarioRunResponse(
scenario_id=scenario_id, scenario_id=scenario_id,
status="failed", status="failed",
message=str(exc),
input=input_data, input=input_data,
scenario_name=scenario_name, scenario_name=scenario_name,
steps=[], )
error=RunError(code="workflow_error", message=str(exc)),
).model_dump()
minimal_steps = minimal_result.get("steps", {}) # Fresh per-run state that Agno owns during arun(..., session_state=...).
steps = ( session_state: dict[str, Any] = {
minimal_steps "input": deepcopy(input_data),
if isinstance(minimal_steps, dict) "steps": {},
else {} }
)
step_states = _build_step_states_from_minimal( workflow_error: str | None = None
scenario=scenario, run_output: Any = None
minimal_steps=steps, try:
) run_output = await workflow.arun(
input=input_data,
session_state=session_state,
)
except Exception as exc:
workflow_error = str(exc)
steps_payloads = session_state.get("steps", {}) or {}
step_states = _build_step_states(scenario, steps_payloads)
final_result = minimal_result.get("final_result")
normalized_result = (
final_result if isinstance(final_result, dict) else {"raw_content": str(final_result)}
)
status = "success" status = "success"
for payload in steps.values(): if workflow_error is not None:
if isinstance(payload, dict) and not bool(payload.get("ok", False)): status = "failed"
else:
for payload in steps_payloads.values():
if isinstance(payload, dict) and not bool(payload.get("ok", False)):
status = "failed"
break
if run_output is not None and not bool(getattr(run_output, "success", True)):
status = "failed" status = "failed"
break
content = getattr(run_output, "content", None)
if isinstance(content, str):
try:
content = json.loads(content)
except json.JSONDecodeError:
content = {"raw_content": content}
if content is None:
for payload in reversed(list(steps_payloads.values())):
if isinstance(payload, dict):
content = deepcopy(payload)
break
if content is None and workflow_error is not None:
content = {"message": workflow_error}
result = content if isinstance(content, dict) else {"raw_content": content}
response_message = "" if status == "success" else (workflow_error or "failed")
return ScenarioRunResponse( return ScenarioRunResponse(
scenario_id=scenario_id, scenario_id=scenario_id,
status=status, status=status,
message=response_message,
input=input_data, input=input_data,
steps=step_states, steps=step_states,
output_summary=_extract_output_summary(normalized_result), output_summary=_extract_output_summary(result),
scenario_name=scenario_name, scenario_name=scenario_name,
workflow_name=str(minimal_result.get("workflow_name") or scenario_id), workflow_name=workflow.name,
result=normalized_result, result=result,
error=None run_id=str(getattr(run_output, "run_id", "")) or None,
if status == "success" session_id=str(getattr(run_output, "session_id", "")) or None,
else RunError(code="workflow_failed", message="Workflow finished with failed status."), )
run_id=minimal_result.get("run_id"),
session_id=minimal_result.get("session_id"),
).model_dump()
+2 -7
View File
@@ -8,11 +8,6 @@ RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"] StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
class RunError(BaseModel):
code: str
message: str
class ScenarioRunRequest(BaseModel): class ScenarioRunRequest(BaseModel):
scenario_id: str = "news_source_discovery_v1" scenario_id: str = "news_source_discovery_v1"
input: dict[str, Any] = Field(default_factory=dict) input: dict[str, Any] = Field(default_factory=dict)
@@ -23,18 +18,18 @@ class StepState(BaseModel):
status: StepStatus status: StepStatus
started_at: str | None = None started_at: str | None = None
finished_at: str | None = None finished_at: str | None = None
error: RunError | None = None message: str = ""
class ScenarioRunResponse(BaseModel): class ScenarioRunResponse(BaseModel):
scenario_id: str scenario_id: str
status: RunStatus status: RunStatus
message: str = ""
input: dict[str, Any] input: dict[str, Any]
steps: list[StepState] = Field(default_factory=list) steps: list[StepState] = Field(default_factory=list)
output_summary: str | None = None output_summary: str | None = None
scenario_name: str | None = None scenario_name: str | None = None
workflow_name: str | None = None workflow_name: str | None = None
result: dict[str, Any] | None = None result: dict[str, Any] | None = None
error: RunError | None = None
run_id: str | None = None run_id: str | None = None
session_id: str | None = None session_id: str | None = None
+129
View File
@@ -0,0 +1,129 @@
from __future__ import annotations
from copy import deepcopy
import json
import os
from typing import Any
from openai import AsyncOpenAI
_planner_client: AsyncOpenAI | None = None
def _env_float(name: str, default: float) -> float:
value = os.getenv(name)
if value is None:
return default
return float(value)
def planner_enabled() -> bool:
return os.getenv("PLANNER_ENABLED", "false").strip().lower() in {"1", "true", "yes"}
def _get_client() -> AsyncOpenAI:
global _planner_client
if _planner_client is not None:
return _planner_client
_planner_client = AsyncOpenAI(
base_url=os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1"),
api_key=os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY"),
)
return _planner_client
def _response_schema(required_fields: list[str]) -> dict[str, Any]:
value_schema = {"type": ["string", "number", "boolean", "array", "object", "null"]}
return {
"name": "mcp_arguments",
"strict": True,
"schema": {
"type": "object",
"properties": {
"arguments": {
"type": "object",
"properties": {f: value_schema for f in required_fields},
"required": required_fields,
"additionalProperties": True,
}
},
"required": ["arguments"],
"additionalProperties": False,
},
}
def _extract_arguments(content: Any) -> dict[str, Any]:
candidate: Any = content
if isinstance(candidate, str):
text = candidate.strip()
if text.startswith("```"):
text = text.strip("`").strip()
if text.startswith("json"):
text = text[4:].strip()
try:
candidate = json.loads(text)
except json.JSONDecodeError:
return {}
if isinstance(candidate, dict):
if isinstance(candidate.get("arguments"), dict):
return candidate["arguments"]
return candidate
return {}
async def plan_arguments(
*,
step_name: str,
tool_name: str,
base_arguments: dict[str, Any],
required_fields: list[str],
scope: dict[str, Any],
missing_fields: list[str],
attempt_no: int,
) -> dict[str, Any]:
"""Fallback planner: asks an LLM to fill missing required fields from context.
Returns merged arguments (base + planned). On any failure returns base_arguments
unchanged — caller is responsible for validating required fields afterwards.
"""
prompt = {
"task": "Prepare MCP arguments for this step.",
"step_name": step_name,
"tool_name": tool_name,
"required_fields": required_fields,
"base_arguments": base_arguments,
"missing_fields": missing_fields,
"repair_attempt": attempt_no,
"context": {"input": scope.get("input", {}), "steps": scope.get("steps", {})},
"output": (
"Return only JSON object with key 'arguments'. "
"Fill every missing field from context."
),
}
try:
completion = await _get_client().chat.completions.create(
model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
messages=[
{
"role": "system",
"content": (
"You are a tool-input planner. "
"Return only JSON that matches the provided schema."
),
},
{"role": "user", "content": json.dumps(prompt, ensure_ascii=False)},
],
response_format={"type": "json_schema", "json_schema": _response_schema(required_fields)},
temperature=_env_float("POLZA_TEMPERATURE", 0.0),
)
raw = completion.choices[0].message.content if completion.choices else ""
planned = _extract_arguments(raw)
except Exception:
planned = {}
merged = deepcopy(base_arguments)
if isinstance(planned, dict):
merged.update(planned)
return merged
+51
View File
@@ -0,0 +1,51 @@
from __future__ import annotations
from copy import deepcopy
from typing import Any
def resolve_path(scope: dict[str, Any], path: str) -> Any:
value: Any = scope
for segment in path.split("."):
key = segment.strip()
if not key:
continue
if not isinstance(value, dict):
return None
value = value.get(key)
return deepcopy(value)
def resolve_template(template: Any, scope: dict[str, Any]) -> Any:
if isinstance(template, dict):
if set(template.keys()) == {"from"}:
return resolve_path(scope, str(template["from"]))
return {key: resolve_template(value, scope) for key, value in template.items()}
if isinstance(template, list):
return [resolve_template(item, scope) for item in template]
return deepcopy(template)
def missing_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
) -> list[str]:
missing: list[str] = []
for field in required_fields:
value = arguments.get(field)
if isinstance(value, str) and value.strip():
continue
if value not in (None, "", [], {}):
continue
missing.append(field)
return missing
def validate_required_fields(
arguments: dict[str, Any],
required_fields: list[str],
step_name: str,
) -> None:
missing = missing_required_fields(arguments, required_fields)
if missing:
raise ValueError(f"{step_name}: missing required fields: {', '.join(missing)}")