Упрощение MCP workflow runner и обновить контракт /api/runs.
Перенесены planner/template хелперы в отдельные модули, выровнен формат статусов и сообщений в ответе, а также обновлены .env.example и README под текущие переменные и поведение API.
This commit is contained in:
+18
-3
@@ -1,18 +1,33 @@
|
|||||||
|
# Agent
|
||||||
AGENT_ID=prisma-agent
|
AGENT_ID=prisma-agent
|
||||||
OLLAMA_MODEL_ID=gemma4:31b
|
|
||||||
OLLAMA_HOST=http://localhost:11435
|
|
||||||
OLLAMA_TEMPERATURE=0
|
|
||||||
AGENT_MARKDOWN=false
|
AGENT_MARKDOWN=false
|
||||||
AGENT_DEBUG_MODE=true
|
AGENT_DEBUG_MODE=true
|
||||||
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
|
AGENT_INSTRUCTIONS="You are a helpful assistant. Answer briefly and clearly."
|
||||||
|
|
||||||
|
# Agent model (Ollama)
|
||||||
|
OLLAMA_MODEL_ID=gemma4:31b
|
||||||
|
OLLAMA_HOST=http://localhost:11435
|
||||||
|
OLLAMA_TEMPERATURE=0
|
||||||
|
|
||||||
|
# API runtime
|
||||||
AGENT_OS_HOST=127.0.0.1
|
AGENT_OS_HOST=127.0.0.1
|
||||||
AGENT_OS_PORT=7777
|
AGENT_OS_PORT=7777
|
||||||
|
|
||||||
|
# Planner
|
||||||
|
PLANNER_ENABLED=false
|
||||||
|
PLANNER_REPAIR_ATTEMPTS=3
|
||||||
|
|
||||||
|
# Planner model (Polza)
|
||||||
POLZA_BASE_URL=https://api.polza.ai/v1
|
POLZA_BASE_URL=https://api.polza.ai/v1
|
||||||
POLZA_MODEL_ID=google/gemma-4-31b-it
|
POLZA_MODEL_ID=google/gemma-4-31b-it
|
||||||
POLZA_API_KEY=key
|
POLZA_API_KEY=key
|
||||||
POLZA_TEMPERATURE=0
|
POLZA_TEMPERATURE=0
|
||||||
|
|
||||||
|
# MCP
|
||||||
MCP_BASE_URL=http://127.0.0.1:8081/mcp
|
MCP_BASE_URL=http://127.0.0.1:8081/mcp
|
||||||
MCP_TIMEOUT_SECONDS=10
|
MCP_TIMEOUT_SECONDS=10
|
||||||
|
|
||||||
|
# Observability (Phoenix)
|
||||||
PHOENIX_TRACING_ENABLED=false
|
PHOENIX_TRACING_ENABLED=false
|
||||||
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
|
PHOENIX_COLLECTOR_ENDPOINT=http://localhost:6006
|
||||||
PHOENIX_PROJECT_NAME=prisma-platform
|
PHOENIX_PROJECT_NAME=prisma-platform
|
||||||
|
|||||||
@@ -25,7 +25,8 @@ prisma_platform/
|
|||||||
├── scenarios/
|
├── scenarios/
|
||||||
│ ├── index.json
|
│ ├── index.json
|
||||||
│ └── news_source_discovery/
|
│ └── news_source_discovery/
|
||||||
│ └── v1.json
|
│ ├── v1.json
|
||||||
|
│ └── v1_planner_repair.json
|
||||||
└── src/
|
└── src/
|
||||||
├── __init__.py
|
├── __init__.py
|
||||||
├── api_routes.py
|
├── api_routes.py
|
||||||
@@ -35,6 +36,8 @@ prisma_platform/
|
|||||||
├── mcp_workflow_runner.py
|
├── mcp_workflow_runner.py
|
||||||
├── observability.py
|
├── observability.py
|
||||||
├── scenario_store.py
|
├── scenario_store.py
|
||||||
|
├── step_planner.py
|
||||||
|
├── template.py
|
||||||
└── schemas.py
|
└── schemas.py
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -101,22 +104,39 @@ curl -s -X POST "http://127.0.0.1:7777/api/runs" \
|
|||||||
Успешный ответ содержит:
|
Успешный ответ содержит:
|
||||||
|
|
||||||
- `status=success`
|
- `status=success`
|
||||||
- список `steps` со статусами шагов
|
- `message=""`
|
||||||
|
- список `steps` со статусами и временем шагов
|
||||||
- `output_summary`
|
- `output_summary`
|
||||||
- `result` итогового шага
|
- `result` итогового шага
|
||||||
|
|
||||||
|
При ошибке:
|
||||||
|
|
||||||
|
- `status=failed`
|
||||||
|
- `message` содержит текст ошибки
|
||||||
|
|
||||||
## Переменные окружения
|
## Переменные окружения
|
||||||
|
|
||||||
Основные:
|
Agent:
|
||||||
|
|
||||||
- `AGENT_ID` (default: `prisma-agent`)
|
- `AGENT_ID` (default: `prisma-agent`)
|
||||||
- `AGENT_MARKDOWN` (default: `false`)
|
- `AGENT_MARKDOWN` (default: `false`)
|
||||||
- `AGENT_DEBUG_MODE` (default: `true`)
|
- `AGENT_DEBUG_MODE` (default: `true`)
|
||||||
- `AGENT_INSTRUCTIONS`
|
- `AGENT_INSTRUCTIONS`
|
||||||
|
- `OLLAMA_MODEL_ID` (default: `gemma4:31b`)
|
||||||
|
- `OLLAMA_HOST` (default: `http://localhost:11435`)
|
||||||
|
- `OLLAMA_TEMPERATURE` (default: `0`)
|
||||||
|
|
||||||
|
API runtime:
|
||||||
|
|
||||||
- `AGENT_OS_HOST` (default: `127.0.0.1`)
|
- `AGENT_OS_HOST` (default: `127.0.0.1`)
|
||||||
- `AGENT_OS_PORT` (default: `7777`)
|
- `AGENT_OS_PORT` (default: `7777`)
|
||||||
|
|
||||||
Planner-модель (`polza.ai`):
|
Planner:
|
||||||
|
|
||||||
|
- `PLANNER_ENABLED` (default: `false`)
|
||||||
|
- `PLANNER_REPAIR_ATTEMPTS` (default: `3`)
|
||||||
|
|
||||||
|
Planner model (`polza.ai`):
|
||||||
|
|
||||||
- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
|
- `POLZA_BASE_URL` (default: `https://api.polza.ai/v1`)
|
||||||
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)
|
- `POLZA_MODEL_ID` (default: `google/gemma-4-31b-it`)
|
||||||
|
|||||||
+4
-5
@@ -1,15 +1,14 @@
|
|||||||
from fastapi import APIRouter
|
from fastapi import APIRouter
|
||||||
|
|
||||||
from src.mcp_workflow_runner import run_scenario_workflow
|
from src.mcp_workflow_runner import run_scenario
|
||||||
from src.schemas import ScenarioRunRequest, ScenarioRunResponse
|
from src.schemas import ScenarioRunRequest, ScenarioRunResponse
|
||||||
|
|
||||||
router = APIRouter(prefix="/api", tags=["workflow"])
|
router = APIRouter(prefix="/api", tags=["workflow"])
|
||||||
|
|
||||||
|
|
||||||
@router.post("/runs", response_model=ScenarioRunResponse)
|
@router.post("/runs", response_model=ScenarioRunResponse)
|
||||||
async def run_scenario(request: ScenarioRunRequest) -> ScenarioRunResponse:
|
async def post_run(request: ScenarioRunRequest) -> ScenarioRunResponse:
|
||||||
result = await run_scenario_workflow(
|
return await run_scenario(
|
||||||
input_data=request.input,
|
|
||||||
scenario_id=request.scenario_id,
|
scenario_id=request.scenario_id,
|
||||||
|
input_data=request.input,
|
||||||
)
|
)
|
||||||
return ScenarioRunResponse.model_validate(result)
|
|
||||||
|
|||||||
+202
-501
@@ -1,387 +1,145 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from contextvars import ContextVar
|
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
from typing import Any
|
from typing import Any, Awaitable, Callable
|
||||||
|
|
||||||
from agno.workflow.step import Step, StepInput, StepOutput
|
from agno.workflow.step import Step, StepInput, StepOutput
|
||||||
from agno.workflow.workflow import Workflow
|
from agno.workflow.workflow import Workflow
|
||||||
from openai import AsyncOpenAI
|
|
||||||
|
|
||||||
from src.mcp_client import call_mcp_tool
|
from src.mcp_client import call_mcp_tool
|
||||||
from src.schemas import RunError, ScenarioRunResponse, StepState
|
from src.schemas import ScenarioRunResponse, StepState
|
||||||
from src.scenario_store import ScenarioStoreError, load_scenario_definition
|
from src.scenario_store import ScenarioStoreError, load_scenario_definition
|
||||||
|
from src.step_planner import plan_arguments, planner_enabled
|
||||||
_planner_client: AsyncOpenAI | None = None
|
from src.template import (
|
||||||
|
missing_required_fields,
|
||||||
|
resolve_path,
|
||||||
def _env_float(name: str, default: float) -> float:
|
resolve_template,
|
||||||
value = os.getenv(name)
|
validate_required_fields,
|
||||||
if value is None:
|
)
|
||||||
return default
|
|
||||||
return float(value)
|
|
||||||
|
|
||||||
|
|
||||||
def _env_int(name: str, default: int) -> int:
|
def _env_int(name: str, default: int) -> int:
|
||||||
value = os.getenv(name)
|
value = os.getenv(name)
|
||||||
if value is None:
|
return int(value) if value is not None else default
|
||||||
return default
|
|
||||||
return int(value)
|
|
||||||
|
|
||||||
|
|
||||||
def _utc_now_iso() -> str:
|
def _utc_now_iso() -> str:
|
||||||
return datetime.now(timezone.utc).isoformat()
|
return datetime.now(timezone.utc).isoformat()
|
||||||
|
|
||||||
|
|
||||||
def get_shared_step_planner_client() -> AsyncOpenAI:
|
def _build_scope(session_state: dict[str, Any]) -> dict[str, Any]:
|
||||||
global _planner_client
|
|
||||||
if _planner_client is not None:
|
|
||||||
return _planner_client
|
|
||||||
|
|
||||||
polza_base_url = os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1")
|
|
||||||
polza_api_key = os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY")
|
|
||||||
_planner_client = AsyncOpenAI(
|
|
||||||
base_url=polza_base_url,
|
|
||||||
api_key=polza_api_key,
|
|
||||||
)
|
|
||||||
return _planner_client
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve_path(scope: dict[str, Any], path: str) -> Any:
|
|
||||||
value: Any = scope
|
|
||||||
for segment in path.split("."):
|
|
||||||
key = segment.strip()
|
|
||||||
if not key:
|
|
||||||
continue
|
|
||||||
if not isinstance(value, dict):
|
|
||||||
return None
|
|
||||||
value = value.get(key)
|
|
||||||
return deepcopy(value)
|
|
||||||
|
|
||||||
|
|
||||||
def _resolve_template(template: Any, scope: dict[str, Any]) -> Any:
|
|
||||||
if isinstance(template, dict):
|
|
||||||
if set(template.keys()) == {"from"}:
|
|
||||||
return _resolve_path(scope, str(template["from"]))
|
|
||||||
return {key: _resolve_template(value, scope) for key, value in template.items()}
|
|
||||||
if isinstance(template, list):
|
|
||||||
return [_resolve_template(item, scope) for item in template]
|
|
||||||
return deepcopy(template)
|
|
||||||
|
|
||||||
|
|
||||||
def _validate_required_fields(
|
|
||||||
arguments: dict[str, Any],
|
|
||||||
required_fields: list[str],
|
|
||||||
step_name: str,
|
|
||||||
) -> None:
|
|
||||||
missing_fields: list[str] = []
|
|
||||||
for field in required_fields:
|
|
||||||
value = arguments.get(field)
|
|
||||||
if isinstance(value, str) and value.strip():
|
|
||||||
continue
|
|
||||||
if value not in (None, "", [], {}):
|
|
||||||
continue
|
|
||||||
missing_fields.append(field)
|
|
||||||
if missing_fields:
|
|
||||||
fields_str = ", ".join(missing_fields)
|
|
||||||
raise ValueError(f"{step_name}: missing required fields: {fields_str}")
|
|
||||||
|
|
||||||
|
|
||||||
def _missing_required_fields(arguments: dict[str, Any], required_fields: list[str]) -> list[str]:
|
|
||||||
missing_fields: list[str] = []
|
|
||||||
for field in required_fields:
|
|
||||||
value = arguments.get(field)
|
|
||||||
if isinstance(value, str) and value.strip():
|
|
||||||
continue
|
|
||||||
if value not in (None, "", [], {}):
|
|
||||||
continue
|
|
||||||
missing_fields.append(field)
|
|
||||||
return missing_fields
|
|
||||||
|
|
||||||
|
|
||||||
def _build_arguments_schema(required_fields: list[str]) -> dict[str, Any]:
|
|
||||||
properties = {field: {"type": "any"} for field in required_fields}
|
|
||||||
return {
|
return {
|
||||||
"type": "object",
|
"input": session_state.get("input", {}),
|
||||||
"required": required_fields,
|
"steps": session_state.get("steps", {}),
|
||||||
"properties": properties,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def _build_polza_response_schema(required_fields: list[str]) -> dict[str, Any]:
|
async def _prepare_arguments(
|
||||||
value_schema: dict[str, Any] = {
|
|
||||||
"type": ["string", "number", "boolean", "array", "object", "null"]
|
|
||||||
}
|
|
||||||
arguments_properties = {field: value_schema for field in required_fields}
|
|
||||||
return {
|
|
||||||
"name": "mcp_arguments",
|
|
||||||
"strict": True,
|
|
||||||
"schema": {
|
|
||||||
"type": "object",
|
|
||||||
"properties": {
|
|
||||||
"arguments": {
|
|
||||||
"type": "object",
|
|
||||||
"properties": arguments_properties,
|
|
||||||
"required": required_fields,
|
|
||||||
"additionalProperties": True,
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"required": ["arguments"],
|
|
||||||
"additionalProperties": False,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def _extract_planned_arguments(content: Any) -> dict[str, Any]:
|
|
||||||
candidate: Any = content
|
|
||||||
if isinstance(candidate, str):
|
|
||||||
text = candidate.strip()
|
|
||||||
if text.startswith("```"):
|
|
||||||
text = text.strip("`").strip()
|
|
||||||
if text.startswith("json"):
|
|
||||||
text = text[4:].strip()
|
|
||||||
try:
|
|
||||||
candidate = json.loads(text)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
return {}
|
|
||||||
|
|
||||||
if isinstance(candidate, dict):
|
|
||||||
if isinstance(candidate.get("arguments"), dict):
|
|
||||||
return candidate["arguments"]
|
|
||||||
# Some models return the arguments object directly.
|
|
||||||
return candidate
|
|
||||||
|
|
||||||
return {}
|
|
||||||
|
|
||||||
|
|
||||||
class McpWorkflowRunner:
|
|
||||||
"""
|
|
||||||
Minimal workflow runner:
|
|
||||||
- fixed step order from scenario
|
|
||||||
- same planner agent in every step
|
|
||||||
- MCP call executed by code, not by the agent
|
|
||||||
- request/response persisted in run context
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
self._workflow_cache: dict[str, Workflow] = {}
|
|
||||||
self._planner_repair_attempts = _env_int("PLANNER_REPAIR_ATTEMPTS", 3)
|
|
||||||
self._run_state_ctx: ContextVar[dict[str, Any] | None] = ContextVar(
|
|
||||||
"mcp_workflow_run_state",
|
|
||||||
default=None,
|
|
||||||
)
|
|
||||||
|
|
||||||
def _get_run_state(self) -> dict[str, Any]:
|
|
||||||
run_state = self._run_state_ctx.get()
|
|
||||||
if run_state is None:
|
|
||||||
raise RuntimeError("run state is not initialized")
|
|
||||||
return run_state
|
|
||||||
|
|
||||||
def _build_scope(self) -> dict[str, Any]:
|
|
||||||
run_state = self._get_run_state()
|
|
||||||
return {
|
|
||||||
"input": run_state.get("input", {}),
|
|
||||||
"steps": run_state.get("steps", {}),
|
|
||||||
}
|
|
||||||
|
|
||||||
async def _plan_arguments(
|
|
||||||
self,
|
|
||||||
*,
|
*,
|
||||||
step_name: str,
|
step_name: str,
|
||||||
tool_name: str,
|
tool_name: str,
|
||||||
base_arguments: dict[str, Any],
|
base_arguments: dict[str, Any],
|
||||||
required_fields: list[str],
|
required_fields: list[str],
|
||||||
scope: dict[str, Any],
|
scope: dict[str, Any],
|
||||||
planner_cache: dict[str, dict[str, Any]] | None = None,
|
) -> dict[str, Any]:
|
||||||
missing_fields: list[str] | None = None,
|
final_arguments = deepcopy(base_arguments)
|
||||||
attempt_no: int = 1,
|
missing = missing_required_fields(final_arguments, required_fields)
|
||||||
) -> dict[str, Any]:
|
if missing and planner_enabled():
|
||||||
cache_key: str | None = None
|
max_attempts = _env_int("PLANNER_REPAIR_ATTEMPTS", 3)
|
||||||
if planner_cache is not None:
|
for attempt in range(1, max_attempts + 1):
|
||||||
try:
|
final_arguments = await plan_arguments(
|
||||||
cache_payload = {
|
step_name=step_name,
|
||||||
"tool_name": tool_name,
|
tool_name=tool_name,
|
||||||
"base_arguments": base_arguments,
|
base_arguments=final_arguments,
|
||||||
"required_fields": required_fields,
|
required_fields=required_fields,
|
||||||
"missing_fields": missing_fields or [],
|
scope=scope,
|
||||||
"attempt_no": attempt_no,
|
missing_fields=missing,
|
||||||
}
|
attempt_no=attempt,
|
||||||
cache_key = json.dumps(cache_payload, sort_keys=True, ensure_ascii=False)
|
|
||||||
except TypeError:
|
|
||||||
cache_key = None
|
|
||||||
if cache_key is not None and cache_key in planner_cache:
|
|
||||||
return deepcopy(planner_cache[cache_key])
|
|
||||||
|
|
||||||
planner_context = {
|
|
||||||
"input": scope.get("input", {}),
|
|
||||||
"steps": scope.get("steps", {}),
|
|
||||||
}
|
|
||||||
for key, value in scope.items():
|
|
||||||
if key in {"input", "steps"}:
|
|
||||||
continue
|
|
||||||
planner_context[key] = value
|
|
||||||
|
|
||||||
prompt = {
|
|
||||||
"task": "Prepare MCP arguments for this step.",
|
|
||||||
"step_name": step_name,
|
|
||||||
"tool_name": tool_name,
|
|
||||||
"required_fields": required_fields,
|
|
||||||
"base_arguments": base_arguments,
|
|
||||||
"missing_fields": missing_fields or [],
|
|
||||||
"repair_attempt": attempt_no,
|
|
||||||
"arguments_schema": _build_arguments_schema(required_fields),
|
|
||||||
"context": planner_context,
|
|
||||||
"response_contract": {
|
|
||||||
"must_return": {"arguments": "object"},
|
|
||||||
"must_include_fields": missing_fields or [],
|
|
||||||
"forbidden": "extra unrelated keys",
|
|
||||||
},
|
|
||||||
"output": (
|
|
||||||
"Return only JSON object with key 'arguments'. "
|
|
||||||
"If missing_fields is not empty, fill every missing field from context."
|
|
||||||
),
|
|
||||||
}
|
|
||||||
prompt_json = json.dumps(prompt, ensure_ascii=False)
|
|
||||||
planned: dict[str, Any] = {}
|
|
||||||
|
|
||||||
# Primary path: strict structured output via Polza response_format/json_schema.
|
|
||||||
try:
|
|
||||||
completion = await get_shared_step_planner_client().chat.completions.create(
|
|
||||||
model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
|
|
||||||
messages=[
|
|
||||||
{
|
|
||||||
"role": "system",
|
|
||||||
"content": (
|
|
||||||
"You are a tool-input planner. "
|
|
||||||
"Return only JSON that matches the provided schema."
|
|
||||||
),
|
|
||||||
},
|
|
||||||
{"role": "user", "content": prompt_json},
|
|
||||||
],
|
|
||||||
response_format={
|
|
||||||
"type": "json_schema",
|
|
||||||
"json_schema": _build_polza_response_schema(required_fields),
|
|
||||||
},
|
|
||||||
temperature=_env_float("POLZA_TEMPERATURE", 0.0),
|
|
||||||
)
|
)
|
||||||
raw_content = completion.choices[0].message.content if completion.choices else ""
|
missing = missing_required_fields(final_arguments, required_fields)
|
||||||
planned = _extract_planned_arguments(raw_content)
|
if not missing:
|
||||||
except Exception:
|
break
|
||||||
planned = {}
|
validate_required_fields(final_arguments, required_fields, step_name)
|
||||||
|
return final_arguments
|
||||||
|
|
||||||
if not isinstance(planned, dict):
|
|
||||||
planned = {}
|
|
||||||
|
|
||||||
# Allow planner to override/fill base arguments while keeping known defaults.
|
async def _execute_one_call(
|
||||||
merged = deepcopy(base_arguments)
|
*,
|
||||||
merged.update(planned)
|
step_name: str,
|
||||||
if planner_cache is not None and cache_key is not None:
|
tool_name: str,
|
||||||
planner_cache[cache_key] = deepcopy(merged)
|
required_fields: list[str],
|
||||||
return merged
|
input_template: Any,
|
||||||
|
scope: dict[str, Any],
|
||||||
|
) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||||
|
resolved = resolve_template(input_template, scope)
|
||||||
|
base_arguments = resolved if isinstance(resolved, dict) else {}
|
||||||
|
arguments = await _prepare_arguments(
|
||||||
|
step_name=step_name,
|
||||||
|
tool_name=tool_name,
|
||||||
|
base_arguments=base_arguments,
|
||||||
|
required_fields=required_fields,
|
||||||
|
scope=scope,
|
||||||
|
)
|
||||||
|
tool_response = await call_mcp_tool(tool_name, arguments)
|
||||||
|
return arguments, tool_response
|
||||||
|
|
||||||
def _build_tool_step_executor(self, step_spec: dict[str, Any]):
|
|
||||||
|
def _build_tool_executor(
|
||||||
|
step_spec: dict[str, Any],
|
||||||
|
) -> Callable[[StepInput, dict[str, Any]], Awaitable[StepOutput]]:
|
||||||
step_name = str(step_spec["name"])
|
step_name = str(step_spec["name"])
|
||||||
tool_name = str(step_spec["tool"])
|
tool_name = str(step_spec["tool"])
|
||||||
input_template = step_spec.get("input", {})
|
input_template = step_spec.get("input", {})
|
||||||
foreach_spec = step_spec.get("foreach")
|
foreach_spec = step_spec.get("foreach")
|
||||||
collect_template = step_spec.get("collect")
|
collect_template = step_spec.get("collect")
|
||||||
collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
|
collect_key = str(step_spec.get("collect_key", "items")).strip() or "items"
|
||||||
required_fields_raw = step_spec.get("required_input_fields", [])
|
required_fields = [
|
||||||
required_fields = (
|
f for f in step_spec.get("required_input_fields", []) if isinstance(f, str)
|
||||||
[field for field in required_fields_raw if isinstance(field, str)]
|
]
|
||||||
if isinstance(required_fields_raw, list)
|
|
||||||
else []
|
|
||||||
)
|
|
||||||
if isinstance(foreach_spec, dict):
|
if isinstance(foreach_spec, dict):
|
||||||
source_path = str(foreach_spec.get("from", "")).strip()
|
foreach_from = str(foreach_spec.get("from", "")).strip()
|
||||||
item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
|
item_alias = str(foreach_spec.get("as", "item")).strip() or "item"
|
||||||
else:
|
else:
|
||||||
source_path = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
|
foreach_from = str(foreach_spec).strip() if isinstance(foreach_spec, str) else ""
|
||||||
item_alias = "item"
|
item_alias = "item"
|
||||||
|
|
||||||
async def _executor(_step_input: StepInput) -> StepOutput:
|
async def executor(_step_input: StepInput, session_state: dict[str, Any]) -> StepOutput:
|
||||||
run_state = self._get_run_state()
|
started_at = _utc_now_iso()
|
||||||
scope = self._build_scope()
|
scope = _build_scope(session_state)
|
||||||
step_started_at = _utc_now_iso()
|
|
||||||
planner_cache: dict[str, dict[str, Any]] = {}
|
|
||||||
|
|
||||||
async def _prepare_arguments(
|
|
||||||
*,
|
|
||||||
local_scope: dict[str, Any],
|
|
||||||
local_base_arguments: dict[str, Any],
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
final_arguments = deepcopy(local_base_arguments)
|
|
||||||
for repair_attempt in range(1, self._planner_repair_attempts + 1):
|
|
||||||
missing_fields = _missing_required_fields(final_arguments, required_fields)
|
|
||||||
if not missing_fields:
|
|
||||||
break
|
|
||||||
final_arguments = await self._plan_arguments(
|
|
||||||
step_name=step_name,
|
|
||||||
tool_name=tool_name,
|
|
||||||
base_arguments=final_arguments,
|
|
||||||
required_fields=required_fields,
|
|
||||||
scope=local_scope,
|
|
||||||
planner_cache=planner_cache,
|
|
||||||
missing_fields=missing_fields,
|
|
||||||
attempt_no=repair_attempt,
|
|
||||||
)
|
|
||||||
_validate_required_fields(final_arguments, required_fields, step_name)
|
|
||||||
return final_arguments
|
|
||||||
|
|
||||||
async def _call_tool_with_repair(
|
|
||||||
*,
|
|
||||||
initial_arguments: dict[str, Any],
|
|
||||||
) -> tuple[dict[str, Any], dict[str, Any]]:
|
|
||||||
final_arguments = deepcopy(initial_arguments)
|
|
||||||
tool_response = await call_mcp_tool(tool_name, final_arguments)
|
|
||||||
return tool_response, final_arguments
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
tool_calls = run_state.setdefault("tool_calls", [])
|
if foreach_from:
|
||||||
if not isinstance(tool_calls, list):
|
iterable = resolve_path(scope, foreach_from)
|
||||||
tool_calls = []
|
|
||||||
run_state["tool_calls"] = tool_calls
|
|
||||||
|
|
||||||
if source_path:
|
|
||||||
iterable = _resolve_path(scope, source_path)
|
|
||||||
if not isinstance(iterable, list):
|
if not isinstance(iterable, list):
|
||||||
raise ValueError(f"{step_name}: foreach source is not list")
|
raise ValueError(f"{step_name}: foreach source is not list")
|
||||||
|
|
||||||
collected_items: list[Any] = []
|
collected: list[Any] = []
|
||||||
|
iteration_requests: list[dict[str, Any]] = []
|
||||||
|
iteration_responses: list[dict[str, Any]] = []
|
||||||
|
last_received_at: str | None = None
|
||||||
for index, item in enumerate(iterable):
|
for index, item in enumerate(iterable):
|
||||||
iteration_scope = dict(scope)
|
iteration_scope = {**scope, item_alias: item, "item": item, "index": index}
|
||||||
iteration_scope[item_alias] = item
|
arguments, tool_response = await _execute_one_call(
|
||||||
iteration_scope["item"] = item
|
step_name=step_name,
|
||||||
iteration_scope["index"] = index
|
tool_name=tool_name,
|
||||||
|
required_fields=required_fields,
|
||||||
resolved = _resolve_template(input_template, iteration_scope)
|
input_template=input_template,
|
||||||
base_arguments = resolved if isinstance(resolved, dict) else {}
|
scope=iteration_scope,
|
||||||
final_arguments = await _prepare_arguments(
|
|
||||||
local_scope=iteration_scope,
|
|
||||||
local_base_arguments=base_arguments,
|
|
||||||
)
|
)
|
||||||
tool_response, final_arguments = await _call_tool_with_repair(
|
iteration_requests.append(arguments)
|
||||||
initial_arguments=final_arguments,
|
iteration_responses.append(tool_response)
|
||||||
)
|
received_at = tool_response.get("received_at")
|
||||||
tool_calls.append(
|
if isinstance(received_at, str) and received_at:
|
||||||
{
|
last_received_at = received_at
|
||||||
"step_name": step_name,
|
|
||||||
"tool_name": tool_name,
|
|
||||||
"attempt": index + 1,
|
|
||||||
"request": final_arguments,
|
|
||||||
"ok": True,
|
|
||||||
"response": tool_response,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
if collect_template is None:
|
if collect_template is None:
|
||||||
collected_items.append(tool_response.get("payload", {}))
|
collected.append(tool_response.get("payload", {}))
|
||||||
else:
|
else:
|
||||||
collected_items.append(
|
collected.append(
|
||||||
_resolve_template(
|
resolve_template(
|
||||||
collect_template,
|
collect_template,
|
||||||
{**iteration_scope, "tool": tool_response},
|
{**iteration_scope, "tool": tool_response},
|
||||||
)
|
)
|
||||||
@@ -390,75 +148,57 @@ class McpWorkflowRunner:
|
|||||||
step_payload = {
|
step_payload = {
|
||||||
"ok": True,
|
"ok": True,
|
||||||
"tool_name": tool_name,
|
"tool_name": tool_name,
|
||||||
"payload": {collect_key: collected_items},
|
"payload": {collect_key: collected},
|
||||||
"request": {"foreach_from": source_path, "count": len(iterable)},
|
"request": {
|
||||||
"received_at": _utc_now_iso(),
|
"foreach_from": foreach_from,
|
||||||
"started_at": step_started_at,
|
"count": len(iterable),
|
||||||
|
"items": iteration_requests,
|
||||||
|
},
|
||||||
|
"response": {"items": iteration_responses},
|
||||||
|
"received_at": last_received_at,
|
||||||
|
"started_at": started_at,
|
||||||
"finished_at": _utc_now_iso(),
|
"finished_at": _utc_now_iso(),
|
||||||
}
|
}
|
||||||
else:
|
else:
|
||||||
resolved = _resolve_template(input_template, scope)
|
arguments, tool_response = await _execute_one_call(
|
||||||
base_arguments = resolved if isinstance(resolved, dict) else {}
|
step_name=step_name,
|
||||||
final_arguments = await _prepare_arguments(
|
tool_name=tool_name,
|
||||||
local_scope=scope,
|
required_fields=required_fields,
|
||||||
local_base_arguments=base_arguments,
|
input_template=input_template,
|
||||||
)
|
scope=scope,
|
||||||
tool_response, final_arguments = await _call_tool_with_repair(
|
|
||||||
initial_arguments=final_arguments,
|
|
||||||
)
|
)
|
||||||
step_payload = {
|
step_payload = {
|
||||||
"ok": bool(tool_response.get("ok", True)),
|
"ok": bool(tool_response.get("ok", True)),
|
||||||
"tool_name": tool_name,
|
"tool_name": tool_name,
|
||||||
"payload": tool_response.get("payload", {}),
|
"payload": tool_response.get("payload", {}),
|
||||||
"request": final_arguments,
|
"request": arguments,
|
||||||
"response": tool_response,
|
"response": tool_response,
|
||||||
"received_at": tool_response.get("received_at"),
|
"received_at": tool_response.get("received_at"),
|
||||||
"started_at": step_started_at,
|
"started_at": started_at,
|
||||||
"finished_at": _utc_now_iso(),
|
"finished_at": _utc_now_iso(),
|
||||||
}
|
}
|
||||||
tool_calls.append(
|
|
||||||
{
|
|
||||||
"step_name": step_name,
|
|
||||||
"tool_name": tool_name,
|
|
||||||
"request": final_arguments,
|
|
||||||
"ok": True,
|
|
||||||
"response": tool_response,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
run_state.setdefault("steps", {})[step_name] = step_payload
|
session_state.setdefault("steps", {})[step_name] = step_payload
|
||||||
return StepOutput(
|
return StepOutput(
|
||||||
content=json.dumps(step_payload, ensure_ascii=False),
|
content=json.dumps(step_payload, ensure_ascii=False),
|
||||||
success=True,
|
success=True,
|
||||||
)
|
)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
finished_at = _utc_now_iso()
|
||||||
error_payload = {
|
error_payload = {
|
||||||
"ok": False,
|
"ok": False,
|
||||||
"tool_name": tool_name,
|
"tool_name": tool_name,
|
||||||
"request": {},
|
|
||||||
"error": str(exc),
|
"error": str(exc),
|
||||||
"started_at": step_started_at,
|
"started_at": started_at,
|
||||||
"finished_at": _utc_now_iso(),
|
"finished_at": finished_at,
|
||||||
}
|
}
|
||||||
run_state.setdefault("steps", {})[step_name] = error_payload
|
session_state.setdefault("steps", {})[step_name] = error_payload
|
||||||
run_state.setdefault("tool_calls", []).append(
|
|
||||||
{
|
|
||||||
"step_name": step_name,
|
|
||||||
"tool_name": tool_name,
|
|
||||||
"request": {},
|
|
||||||
"ok": False,
|
|
||||||
"error": str(exc),
|
|
||||||
}
|
|
||||||
)
|
|
||||||
raise RuntimeError(f"{step_name} failed: {exc}") from exc
|
raise RuntimeError(f"{step_name} failed: {exc}") from exc
|
||||||
|
|
||||||
return _executor
|
return executor
|
||||||
|
|
||||||
def get_workflow(self, scenario_id: str, scenario: dict[str, Any]) -> Workflow:
|
|
||||||
cached = self._workflow_cache.get(scenario_id)
|
|
||||||
if cached is not None:
|
|
||||||
return cached
|
|
||||||
|
|
||||||
|
def _build_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
|
||||||
raw_steps = scenario.get("steps")
|
raw_steps = scenario.get("steps")
|
||||||
if not isinstance(raw_steps, list) or not raw_steps:
|
if not isinstance(raw_steps, list) or not raw_steps:
|
||||||
raise ScenarioStoreError("Scenario must contain non-empty steps list")
|
raise ScenarioStoreError("Scenario must contain non-empty steps list")
|
||||||
@@ -468,107 +208,49 @@ class McpWorkflowRunner:
|
|||||||
if not isinstance(raw_step, dict):
|
if not isinstance(raw_step, dict):
|
||||||
raise ScenarioStoreError("Each scenario step must be object")
|
raise ScenarioStoreError("Each scenario step must be object")
|
||||||
if raw_step.get("type") != "tool":
|
if raw_step.get("type") != "tool":
|
||||||
raise ScenarioStoreError("This minimal runner supports only tool steps")
|
raise ScenarioStoreError("This runner supports only tool steps")
|
||||||
|
|
||||||
step_name = str(raw_step.get("name", "")).strip()
|
step_name = str(raw_step.get("name", "")).strip()
|
||||||
tool_name = str(raw_step.get("tool", step_name)).strip()
|
tool_name = str(raw_step.get("tool", step_name)).strip()
|
||||||
if not step_name or not tool_name:
|
if not step_name or not tool_name:
|
||||||
raise ScenarioStoreError("Each tool step must contain non-empty name and tool")
|
raise ScenarioStoreError("Each tool step must contain non-empty name and tool")
|
||||||
|
|
||||||
executor = self._build_tool_step_executor(raw_step)
|
|
||||||
workflow_steps.append(
|
workflow_steps.append(
|
||||||
Step(
|
Step(
|
||||||
name=step_name,
|
name=step_name,
|
||||||
description=str(raw_step.get("description", step_name)),
|
description=str(raw_step.get("description", step_name)),
|
||||||
executor=executor,
|
executor=_build_tool_executor(raw_step),
|
||||||
max_retries=0,
|
max_retries=0,
|
||||||
on_error="fail",
|
on_error="fail",
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
workflow = Workflow(
|
return Workflow(
|
||||||
name=scenario_id,
|
name=scenario_id,
|
||||||
description=str(scenario.get("description", "")),
|
description=str(scenario.get("description", "")),
|
||||||
steps=workflow_steps,
|
steps=workflow_steps,
|
||||||
)
|
)
|
||||||
self._workflow_cache[scenario_id] = workflow
|
|
||||||
|
|
||||||
|
_workflow_cache: dict[str, Workflow] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_workflow(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
|
||||||
|
cached = _workflow_cache.get(scenario_id)
|
||||||
|
if cached is not None:
|
||||||
|
return cached
|
||||||
|
workflow = _build_workflow(scenario_id, scenario)
|
||||||
|
_workflow_cache[scenario_id] = workflow
|
||||||
return workflow
|
return workflow
|
||||||
|
|
||||||
async def run(self, *, scenario_id: str, input_data: dict[str, Any]) -> dict[str, Any]:
|
|
||||||
scenario = load_scenario_definition(scenario_id)
|
|
||||||
workflow = self.get_workflow(scenario_id, scenario)
|
|
||||||
|
|
||||||
initial_state = {
|
def _extract_output_summary(result: dict[str, Any] | None) -> str | None:
|
||||||
"input": deepcopy(input_data),
|
if not isinstance(result, dict):
|
||||||
"steps": {},
|
|
||||||
"tool_calls": [],
|
|
||||||
}
|
|
||||||
token = self._run_state_ctx.set(initial_state)
|
|
||||||
run_state = initial_state
|
|
||||||
run_output: Any = None
|
|
||||||
workflow_error: str | None = None
|
|
||||||
try:
|
|
||||||
run_output = await workflow.arun(input=input_data)
|
|
||||||
except Exception as exc:
|
|
||||||
workflow_error = str(exc)
|
|
||||||
finally:
|
|
||||||
captured = self._run_state_ctx.get()
|
|
||||||
if isinstance(captured, dict):
|
|
||||||
run_state = deepcopy(captured)
|
|
||||||
self._run_state_ctx.reset(token)
|
|
||||||
|
|
||||||
content = run_output.content if hasattr(run_output, "content") else None
|
|
||||||
if isinstance(content, str):
|
|
||||||
try:
|
|
||||||
content = json.loads(content)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
content = {"raw_content": content}
|
|
||||||
if content is None:
|
|
||||||
step_payloads = run_state.get("steps", {})
|
|
||||||
if isinstance(step_payloads, dict):
|
|
||||||
for payload in reversed(list(step_payloads.values())):
|
|
||||||
if isinstance(payload, dict) and not bool(payload.get("ok", True)):
|
|
||||||
content = deepcopy(payload)
|
|
||||||
break
|
|
||||||
if content is None and workflow_error is not None:
|
|
||||||
content = {"error": workflow_error}
|
|
||||||
|
|
||||||
status = "success"
|
|
||||||
if workflow_error is not None:
|
|
||||||
status = "failed"
|
|
||||||
elif run_output is not None and not bool(getattr(run_output, "success", True)):
|
|
||||||
status = "failed"
|
|
||||||
return {
|
|
||||||
"scenario_id": scenario_id,
|
|
||||||
"workflow_name": workflow.name,
|
|
||||||
"status": status,
|
|
||||||
"input": input_data,
|
|
||||||
"final_result": content if isinstance(content, dict) else {"raw_content": content},
|
|
||||||
"steps": run_state.get("steps", {}),
|
|
||||||
"tool_calls": run_state.get("tool_calls", []),
|
|
||||||
"run_id": str(getattr(run_output, "run_id", "")) or None,
|
|
||||||
"session_id": str(getattr(run_output, "session_id", "")) or None,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
_default_runner: McpWorkflowRunner | None = None
|
|
||||||
|
|
||||||
|
|
||||||
def get_mcp_workflow_runner() -> McpWorkflowRunner:
|
|
||||||
global _default_runner
|
|
||||||
if _default_runner is not None:
|
|
||||||
return _default_runner
|
|
||||||
_default_runner = McpWorkflowRunner()
|
|
||||||
return _default_runner
|
|
||||||
|
|
||||||
|
|
||||||
def _extract_output_summary(content: Any) -> str | None:
|
|
||||||
if not isinstance(content, dict):
|
|
||||||
return None
|
return None
|
||||||
summary = content.get("summary")
|
summary = result.get("summary")
|
||||||
if isinstance(summary, str) and summary:
|
if isinstance(summary, str) and summary:
|
||||||
return summary
|
return summary
|
||||||
payload = content.get("payload")
|
payload = result.get("payload")
|
||||||
if isinstance(payload, dict):
|
if isinstance(payload, dict):
|
||||||
payload_summary = payload.get("summary")
|
payload_summary = payload.get("summary")
|
||||||
if isinstance(payload_summary, str) and payload_summary:
|
if isinstance(payload_summary, str) and payload_summary:
|
||||||
@@ -576,109 +258,128 @@ def _extract_output_summary(content: Any) -> str | None:
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _build_step_states_from_minimal(
|
def _build_step_states(
|
||||||
*,
|
|
||||||
scenario: dict[str, Any],
|
scenario: dict[str, Any],
|
||||||
minimal_steps: dict[str, Any],
|
steps_payloads: dict[str, Any],
|
||||||
) -> list[StepState]:
|
) -> list[StepState]:
|
||||||
raw_steps = scenario.get("steps")
|
raw_steps = scenario.get("steps")
|
||||||
if not isinstance(raw_steps, list):
|
if not isinstance(raw_steps, list):
|
||||||
return []
|
return []
|
||||||
|
|
||||||
step_states: list[StepState] = []
|
states: list[StepState] = []
|
||||||
for raw_step in raw_steps:
|
for raw_step in raw_steps:
|
||||||
if not isinstance(raw_step, dict):
|
if not isinstance(raw_step, dict):
|
||||||
continue
|
continue
|
||||||
step_name = str(raw_step.get("name", "")).strip()
|
name = str(raw_step.get("name", "")).strip()
|
||||||
if not step_name:
|
if not name:
|
||||||
continue
|
continue
|
||||||
payload = minimal_steps.get(step_name)
|
payload = steps_payloads.get(name)
|
||||||
if not isinstance(payload, dict):
|
if not isinstance(payload, dict):
|
||||||
step_states.append(StepState(node_id=step_name, status="queued"))
|
states.append(
|
||||||
|
StepState(
|
||||||
|
node_id=name,
|
||||||
|
status="queued",
|
||||||
|
message="",
|
||||||
|
)
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
ok = bool(payload.get("ok", False))
|
ok = bool(payload.get("ok", False))
|
||||||
step_states.append(
|
states.append(
|
||||||
StepState(
|
StepState(
|
||||||
node_id=step_name,
|
node_id=name,
|
||||||
status="success" if ok else "failed",
|
status="success" if ok else "failed",
|
||||||
started_at=str(payload.get("started_at") or "") or None,
|
started_at=str(payload.get("started_at") or "") or None,
|
||||||
finished_at=str(payload.get("finished_at") or "") or None,
|
finished_at=str(payload.get("finished_at") or "") or None,
|
||||||
error=RunError(
|
message="" if ok else str(payload.get("error", f"{name} failed")),
|
||||||
code="tool_error",
|
|
||||||
message=str(payload.get("error", f"{step_name} failed")),
|
|
||||||
)
|
|
||||||
if not ok
|
|
||||||
else None,
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return step_states
|
return states
|
||||||
|
|
||||||
|
|
||||||
async def run_scenario_workflow(
|
async def run_scenario(
|
||||||
|
*,
|
||||||
|
scenario_id: str,
|
||||||
input_data: dict[str, Any],
|
input_data: dict[str, Any],
|
||||||
scenario_id: str = "news_source_discovery_v1",
|
) -> ScenarioRunResponse:
|
||||||
) -> dict[str, Any]:
|
|
||||||
try:
|
try:
|
||||||
scenario = load_scenario_definition(scenario_id)
|
scenario = load_scenario_definition(scenario_id)
|
||||||
except ScenarioStoreError as exc:
|
except ScenarioStoreError as exc:
|
||||||
return ScenarioRunResponse(
|
return ScenarioRunResponse(
|
||||||
scenario_id=scenario_id,
|
scenario_id=scenario_id,
|
||||||
status="failed",
|
status="failed",
|
||||||
|
message=str(exc),
|
||||||
input=input_data,
|
input=input_data,
|
||||||
steps=[],
|
)
|
||||||
error=RunError(code="unknown_scenario", message=str(exc)),
|
|
||||||
).model_dump()
|
|
||||||
|
|
||||||
runner = get_mcp_workflow_runner()
|
|
||||||
scenario_name = str(scenario.get("name", scenario_id))
|
scenario_name = str(scenario.get("name", scenario_id))
|
||||||
try:
|
try:
|
||||||
minimal_result = await runner.run(
|
workflow = _get_workflow(scenario_id, scenario)
|
||||||
scenario_id=scenario_id,
|
except ScenarioStoreError as exc:
|
||||||
input_data=input_data,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
return ScenarioRunResponse(
|
return ScenarioRunResponse(
|
||||||
scenario_id=scenario_id,
|
scenario_id=scenario_id,
|
||||||
status="failed",
|
status="failed",
|
||||||
|
message=str(exc),
|
||||||
input=input_data,
|
input=input_data,
|
||||||
scenario_name=scenario_name,
|
scenario_name=scenario_name,
|
||||||
steps=[],
|
|
||||||
error=RunError(code="workflow_error", message=str(exc)),
|
|
||||||
).model_dump()
|
|
||||||
|
|
||||||
minimal_steps = minimal_result.get("steps", {})
|
|
||||||
steps = (
|
|
||||||
minimal_steps
|
|
||||||
if isinstance(minimal_steps, dict)
|
|
||||||
else {}
|
|
||||||
)
|
|
||||||
step_states = _build_step_states_from_minimal(
|
|
||||||
scenario=scenario,
|
|
||||||
minimal_steps=steps,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
final_result = minimal_result.get("final_result")
|
# Fresh per-run state that Agno owns during arun(..., session_state=...).
|
||||||
normalized_result = (
|
session_state: dict[str, Any] = {
|
||||||
final_result if isinstance(final_result, dict) else {"raw_content": str(final_result)}
|
"input": deepcopy(input_data),
|
||||||
|
"steps": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
workflow_error: str | None = None
|
||||||
|
run_output: Any = None
|
||||||
|
try:
|
||||||
|
run_output = await workflow.arun(
|
||||||
|
input=input_data,
|
||||||
|
session_state=session_state,
|
||||||
)
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
workflow_error = str(exc)
|
||||||
|
|
||||||
|
steps_payloads = session_state.get("steps", {}) or {}
|
||||||
|
step_states = _build_step_states(scenario, steps_payloads)
|
||||||
|
|
||||||
status = "success"
|
status = "success"
|
||||||
for payload in steps.values():
|
if workflow_error is not None:
|
||||||
|
status = "failed"
|
||||||
|
else:
|
||||||
|
for payload in steps_payloads.values():
|
||||||
if isinstance(payload, dict) and not bool(payload.get("ok", False)):
|
if isinstance(payload, dict) and not bool(payload.get("ok", False)):
|
||||||
status = "failed"
|
status = "failed"
|
||||||
break
|
break
|
||||||
|
if run_output is not None and not bool(getattr(run_output, "success", True)):
|
||||||
|
status = "failed"
|
||||||
|
|
||||||
|
content = getattr(run_output, "content", None)
|
||||||
|
if isinstance(content, str):
|
||||||
|
try:
|
||||||
|
content = json.loads(content)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
content = {"raw_content": content}
|
||||||
|
if content is None:
|
||||||
|
for payload in reversed(list(steps_payloads.values())):
|
||||||
|
if isinstance(payload, dict):
|
||||||
|
content = deepcopy(payload)
|
||||||
|
break
|
||||||
|
if content is None and workflow_error is not None:
|
||||||
|
content = {"message": workflow_error}
|
||||||
|
|
||||||
|
result = content if isinstance(content, dict) else {"raw_content": content}
|
||||||
|
response_message = "" if status == "success" else (workflow_error or "failed")
|
||||||
|
|
||||||
return ScenarioRunResponse(
|
return ScenarioRunResponse(
|
||||||
scenario_id=scenario_id,
|
scenario_id=scenario_id,
|
||||||
status=status,
|
status=status,
|
||||||
|
message=response_message,
|
||||||
input=input_data,
|
input=input_data,
|
||||||
steps=step_states,
|
steps=step_states,
|
||||||
output_summary=_extract_output_summary(normalized_result),
|
output_summary=_extract_output_summary(result),
|
||||||
scenario_name=scenario_name,
|
scenario_name=scenario_name,
|
||||||
workflow_name=str(minimal_result.get("workflow_name") or scenario_id),
|
workflow_name=workflow.name,
|
||||||
result=normalized_result,
|
result=result,
|
||||||
error=None
|
run_id=str(getattr(run_output, "run_id", "")) or None,
|
||||||
if status == "success"
|
session_id=str(getattr(run_output, "session_id", "")) or None,
|
||||||
else RunError(code="workflow_failed", message="Workflow finished with failed status."),
|
)
|
||||||
run_id=minimal_result.get("run_id"),
|
|
||||||
session_id=minimal_result.get("session_id"),
|
|
||||||
).model_dump()
|
|
||||||
|
|||||||
+2
-7
@@ -8,11 +8,6 @@ RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
|
|||||||
StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
|
StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
|
||||||
|
|
||||||
|
|
||||||
class RunError(BaseModel):
|
|
||||||
code: str
|
|
||||||
message: str
|
|
||||||
|
|
||||||
|
|
||||||
class ScenarioRunRequest(BaseModel):
|
class ScenarioRunRequest(BaseModel):
|
||||||
scenario_id: str = "news_source_discovery_v1"
|
scenario_id: str = "news_source_discovery_v1"
|
||||||
input: dict[str, Any] = Field(default_factory=dict)
|
input: dict[str, Any] = Field(default_factory=dict)
|
||||||
@@ -23,18 +18,18 @@ class StepState(BaseModel):
|
|||||||
status: StepStatus
|
status: StepStatus
|
||||||
started_at: str | None = None
|
started_at: str | None = None
|
||||||
finished_at: str | None = None
|
finished_at: str | None = None
|
||||||
error: RunError | None = None
|
message: str = ""
|
||||||
|
|
||||||
|
|
||||||
class ScenarioRunResponse(BaseModel):
|
class ScenarioRunResponse(BaseModel):
|
||||||
scenario_id: str
|
scenario_id: str
|
||||||
status: RunStatus
|
status: RunStatus
|
||||||
|
message: str = ""
|
||||||
input: dict[str, Any]
|
input: dict[str, Any]
|
||||||
steps: list[StepState] = Field(default_factory=list)
|
steps: list[StepState] = Field(default_factory=list)
|
||||||
output_summary: str | None = None
|
output_summary: str | None = None
|
||||||
scenario_name: str | None = None
|
scenario_name: str | None = None
|
||||||
workflow_name: str | None = None
|
workflow_name: str | None = None
|
||||||
result: dict[str, Any] | None = None
|
result: dict[str, Any] | None = None
|
||||||
error: RunError | None = None
|
|
||||||
run_id: str | None = None
|
run_id: str | None = None
|
||||||
session_id: str | None = None
|
session_id: str | None = None
|
||||||
|
|||||||
@@ -0,0 +1,129 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from openai import AsyncOpenAI
|
||||||
|
|
||||||
|
|
||||||
|
_planner_client: AsyncOpenAI | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def _env_float(name: str, default: float) -> float:
|
||||||
|
value = os.getenv(name)
|
||||||
|
if value is None:
|
||||||
|
return default
|
||||||
|
return float(value)
|
||||||
|
|
||||||
|
|
||||||
|
def planner_enabled() -> bool:
|
||||||
|
return os.getenv("PLANNER_ENABLED", "false").strip().lower() in {"1", "true", "yes"}
|
||||||
|
|
||||||
|
|
||||||
|
def _get_client() -> AsyncOpenAI:
|
||||||
|
global _planner_client
|
||||||
|
if _planner_client is not None:
|
||||||
|
return _planner_client
|
||||||
|
_planner_client = AsyncOpenAI(
|
||||||
|
base_url=os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1"),
|
||||||
|
api_key=os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY"),
|
||||||
|
)
|
||||||
|
return _planner_client
|
||||||
|
|
||||||
|
|
||||||
|
def _response_schema(required_fields: list[str]) -> dict[str, Any]:
|
||||||
|
value_schema = {"type": ["string", "number", "boolean", "array", "object", "null"]}
|
||||||
|
return {
|
||||||
|
"name": "mcp_arguments",
|
||||||
|
"strict": True,
|
||||||
|
"schema": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"arguments": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {f: value_schema for f in required_fields},
|
||||||
|
"required": required_fields,
|
||||||
|
"additionalProperties": True,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["arguments"],
|
||||||
|
"additionalProperties": False,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_arguments(content: Any) -> dict[str, Any]:
|
||||||
|
candidate: Any = content
|
||||||
|
if isinstance(candidate, str):
|
||||||
|
text = candidate.strip()
|
||||||
|
if text.startswith("```"):
|
||||||
|
text = text.strip("`").strip()
|
||||||
|
if text.startswith("json"):
|
||||||
|
text = text[4:].strip()
|
||||||
|
try:
|
||||||
|
candidate = json.loads(text)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return {}
|
||||||
|
if isinstance(candidate, dict):
|
||||||
|
if isinstance(candidate.get("arguments"), dict):
|
||||||
|
return candidate["arguments"]
|
||||||
|
return candidate
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
async def plan_arguments(
|
||||||
|
*,
|
||||||
|
step_name: str,
|
||||||
|
tool_name: str,
|
||||||
|
base_arguments: dict[str, Any],
|
||||||
|
required_fields: list[str],
|
||||||
|
scope: dict[str, Any],
|
||||||
|
missing_fields: list[str],
|
||||||
|
attempt_no: int,
|
||||||
|
) -> dict[str, Any]:
|
||||||
|
"""Fallback planner: asks an LLM to fill missing required fields from context.
|
||||||
|
|
||||||
|
Returns merged arguments (base + planned). On any failure returns base_arguments
|
||||||
|
unchanged — caller is responsible for validating required fields afterwards.
|
||||||
|
"""
|
||||||
|
prompt = {
|
||||||
|
"task": "Prepare MCP arguments for this step.",
|
||||||
|
"step_name": step_name,
|
||||||
|
"tool_name": tool_name,
|
||||||
|
"required_fields": required_fields,
|
||||||
|
"base_arguments": base_arguments,
|
||||||
|
"missing_fields": missing_fields,
|
||||||
|
"repair_attempt": attempt_no,
|
||||||
|
"context": {"input": scope.get("input", {}), "steps": scope.get("steps", {})},
|
||||||
|
"output": (
|
||||||
|
"Return only JSON object with key 'arguments'. "
|
||||||
|
"Fill every missing field from context."
|
||||||
|
),
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
completion = await _get_client().chat.completions.create(
|
||||||
|
model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
|
||||||
|
messages=[
|
||||||
|
{
|
||||||
|
"role": "system",
|
||||||
|
"content": (
|
||||||
|
"You are a tool-input planner. "
|
||||||
|
"Return only JSON that matches the provided schema."
|
||||||
|
),
|
||||||
|
},
|
||||||
|
{"role": "user", "content": json.dumps(prompt, ensure_ascii=False)},
|
||||||
|
],
|
||||||
|
response_format={"type": "json_schema", "json_schema": _response_schema(required_fields)},
|
||||||
|
temperature=_env_float("POLZA_TEMPERATURE", 0.0),
|
||||||
|
)
|
||||||
|
raw = completion.choices[0].message.content if completion.choices else ""
|
||||||
|
planned = _extract_arguments(raw)
|
||||||
|
except Exception:
|
||||||
|
planned = {}
|
||||||
|
|
||||||
|
merged = deepcopy(base_arguments)
|
||||||
|
if isinstance(planned, dict):
|
||||||
|
merged.update(planned)
|
||||||
|
return merged
|
||||||
@@ -0,0 +1,51 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from copy import deepcopy
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_path(scope: dict[str, Any], path: str) -> Any:
|
||||||
|
value: Any = scope
|
||||||
|
for segment in path.split("."):
|
||||||
|
key = segment.strip()
|
||||||
|
if not key:
|
||||||
|
continue
|
||||||
|
if not isinstance(value, dict):
|
||||||
|
return None
|
||||||
|
value = value.get(key)
|
||||||
|
return deepcopy(value)
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_template(template: Any, scope: dict[str, Any]) -> Any:
|
||||||
|
if isinstance(template, dict):
|
||||||
|
if set(template.keys()) == {"from"}:
|
||||||
|
return resolve_path(scope, str(template["from"]))
|
||||||
|
return {key: resolve_template(value, scope) for key, value in template.items()}
|
||||||
|
if isinstance(template, list):
|
||||||
|
return [resolve_template(item, scope) for item in template]
|
||||||
|
return deepcopy(template)
|
||||||
|
|
||||||
|
|
||||||
|
def missing_required_fields(
|
||||||
|
arguments: dict[str, Any],
|
||||||
|
required_fields: list[str],
|
||||||
|
) -> list[str]:
|
||||||
|
missing: list[str] = []
|
||||||
|
for field in required_fields:
|
||||||
|
value = arguments.get(field)
|
||||||
|
if isinstance(value, str) and value.strip():
|
||||||
|
continue
|
||||||
|
if value not in (None, "", [], {}):
|
||||||
|
continue
|
||||||
|
missing.append(field)
|
||||||
|
return missing
|
||||||
|
|
||||||
|
|
||||||
|
def validate_required_fields(
|
||||||
|
arguments: dict[str, Any],
|
||||||
|
required_fields: list[str],
|
||||||
|
step_name: str,
|
||||||
|
) -> None:
|
||||||
|
missing = missing_required_fields(arguments, required_fields)
|
||||||
|
if missing:
|
||||||
|
raise ValueError(f"{step_name}: missing required fields: {', '.join(missing)}")
|
||||||
Reference in New Issue
Block a user