"""LLM-backed fallback planner for MCP tool arguments.

When a step's resolved arguments are missing required fields, this module calls
an OpenAI-compatible chat completion to fill them from the current scope
(``input`` + prior ``steps``). The planner is best-effort: on any failure it
returns the base arguments unchanged so the caller's validator can produce a
clean error.
"""

from __future__ import annotations

from copy import deepcopy
import json
import os
from typing import Any

from loguru import logger
from openai import AsyncOpenAI

# Lazily created client shared by every planner call; see ``_get_client``.
_planner_client: AsyncOpenAI | None = None


def _env_float(name: str, default: float) -> float:
    """Read environment variable *name* as a float.

    Returns *default* when the variable is unset or cannot be parsed as a
    float. A malformed value is logged and ignored rather than raised, so a
    configuration typo does not make every planner call fail.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    try:
        return float(raw)
    except ValueError:
        logger.warning(
            "Ignoring non-numeric value {!r} for {}; using default {}",
            raw,
            name,
            default,
        )
        return default


def planner_enabled() -> bool:
    """Return True when ``PLANNER_ENABLED`` is set to ``1``, ``true`` or ``yes``.

    The comparison ignores case and surrounding whitespace; an unset variable
    counts as disabled.
    """
    return os.getenv("PLANNER_ENABLED", "false").strip().lower() in {"1", "true", "yes"}


def _get_client() -> AsyncOpenAI:
    """Return the module-wide ``AsyncOpenAI`` client, creating it on first use.

    The base URL comes from ``POLZA_BASE_URL`` (default
    ``https://api.polza.ai/v1``). The API key comes from ``POLZA_API_KEY``,
    falling back to ``OPENAI_API_KEY`` when the former is unset or empty.
    """
    global _planner_client
    if _planner_client is not None:
        return _planner_client
    _planner_client = AsyncOpenAI(
        base_url=os.getenv("POLZA_BASE_URL", "https://api.polza.ai/v1"),
        api_key=os.getenv("POLZA_API_KEY") or os.getenv("OPENAI_API_KEY"),
    )
    return _planner_client


def _response_schema(required_fields: list[str]) -> dict[str, Any]:
    """Build the ``json_schema`` response format for the planner completion.

    The schema describes an object with a single required key ``arguments``,
    itself an object in which every name in *required_fields* is required and
    may hold any JSON type (including ``null``).
    """
    value_schema = {"type": ["string", "number", "boolean", "array", "object", "null"]}
    # NOTE(review): OpenAI documents that strict structured outputs require
    # ``additionalProperties: false`` on every object. Whether the configured
    # provider enforces that for the nested object below is not visible from
    # this module -- confirm against the provider before changing it.
    return {
        "name": "mcp_arguments",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "arguments": {
                    "type": "object",
                    "properties": {field: value_schema for field in required_fields},
                    # Copy so the schema does not alias the caller's list.
                    "required": list(required_fields),
                    "additionalProperties": True,
                }
            },
            "required": ["arguments"],
            "additionalProperties": False,
        },
    }


def _extract_arguments(content: Any) -> dict[str, Any]:
    """Extract the planned arguments dict from a completion's message content.

    Accepts either an already-parsed object or a JSON string, optionally
    wrapped in a Markdown code fence with a ``json`` language tag (any case).
    When the parsed object holds a dict under ``"arguments"`` that inner dict
    is returned; any other dict is returned as-is. Everything else, including
    unparseable text and ``None``, yields an empty dict.
    """
    candidate: Any = content
    if isinstance(candidate, str):
        text = candidate.strip()
        if text.startswith("```"):
            # Drop the surrounding backticks, then the optional language tag.
            text = text.strip("`").strip()
            if text[:4].lower() == "json":
                text = text[4:].strip()
        try:
            candidate = json.loads(text)
        except json.JSONDecodeError:
            return {}
    if isinstance(candidate, dict):
        if isinstance(candidate.get("arguments"), dict):
            return candidate["arguments"]
        return candidate
    return {}


async def plan_arguments(
    *,
    step_name: str,
    tool_name: str,
    base_arguments: dict[str, Any],
    required_fields: list[str],
    scope: dict[str, Any],
    missing_fields: list[str],
    attempt_no: int,
) -> dict[str, Any]:
    """Fallback planner: asks an LLM to fill missing required fields from context.

    Args:
        step_name: Name of the step being prepared; sent in the prompt and logs.
        tool_name: Name of the MCP tool; sent in the prompt and logs.
        base_arguments: Arguments resolved so far. Never mutated.
        required_fields: Field names the response schema marks as required.
        scope: Current scope; only its ``input`` and ``steps`` entries are
            sent to the model.
        missing_fields: Names of the fields the model is asked to fill.
        attempt_no: Repair attempt counter; sent in the prompt and logs.

    Returns:
        A deep copy of *base_arguments* updated with the planned values. A
        planned ``None`` never replaces an existing non-``None`` base value.
        If the completion call or response parsing fails, the copy is returned
        without planned values -- the caller is responsible for validating
        required fields afterwards.
    """
    prompt = {
        "task": "Prepare MCP arguments for this step.",
        "step_name": step_name,
        "tool_name": tool_name,
        "required_fields": required_fields,
        "base_arguments": base_arguments,
        "missing_fields": missing_fields,
        "repair_attempt": attempt_no,
        "context": {"input": scope.get("input", {}), "steps": scope.get("steps", {})},
        "output": (
            "Return only JSON object with key 'arguments'. "
            "Fill every missing field from context."
        ),
    }
    try:
        completion = await _get_client().chat.completions.create(
            model=os.getenv("POLZA_MODEL_ID", "google/gemma-4-31b-it"),
            messages=[
                {
                    "role": "system",
                    "content": (
                        "You are a tool-input planner. "
                        "Return only JSON that matches the provided schema."
                    ),
                },
                {
                    "role": "user",
                    # default=str is deliberate: the prompt is read by a model,
                    # so a lossy string form of a non-JSON value (datetime,
                    # Decimal, ...) beats failing the whole planner call.
                    "content": json.dumps(prompt, ensure_ascii=False, default=str),
                },
            ],
            response_format={
                "type": "json_schema",
                "json_schema": _response_schema(required_fields),
            },
            temperature=_env_float("POLZA_TEMPERATURE", 0.0),
        )
        raw = completion.choices[0].message.content if completion.choices else ""
        planned = _extract_arguments(raw)
    except Exception as exc:
        # Deliberately broad: the planner is best-effort and must never take
        # the step down. The error is logged so failures stay diagnosable.
        logger.warning(
            "Planner call failed for step={} tool={} attempt={} error={!r}",
            step_name,
            tool_name,
            attempt_no,
            exc,
        )
        planned = {}

    merged = deepcopy(base_arguments)
    for key, value in planned.items():
        # The schema lets the model answer null for values it could not find;
        # such a null must not erase an argument that was already resolved.
        if value is None and merged.get(key) is not None:
            continue
        merged[key] = value
    return merged