diff --git a/README.md b/README.md index 269e594..1cfd61e 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,6 @@ prisma_platform/ ├── api_routes.py ├── agent_os.py ├── agent_runner.py - ├── main.py ├── observability.py ├── scenario_store.py ├── schemas.py @@ -87,7 +86,73 @@ curl -X POST "http://127.0.0.1:7777/api/runs" \ }' ``` -Endpoint возвращает структурированный ответ со статусом `success` или `failed`. +Endpoint возвращает единый JSON-контракт. Поля одинаковые для `success` и `failed`, +а в неактуальных полях приходит `null`. + +Пример успешного ответа: + +```json +{ + "scenario_id": "news_source_discovery_v1", + "status": "success", + "input": { + "url": "https://example.com/news" + }, + "steps": [ + { + "node_id": "search_news_sources", + "status": "success", + "started_at": "2026-04-22T10:00:00+00:00", + "finished_at": "2026-04-22T10:00:00+00:00", + "error": null + } + ], + "output_summary": "По заглушечным данным самым ранним источником считается https://news-a.example/article-1", + "workflow_name": "news_source_discovery_v1", + "scenario_name": "News Source Discovery V1", + "result": { + "tool_name": "generate_summary", + "ok": true, + "payload": { + "input_count": 3, + "summary": "По заглушечным данным самым ранним источником считается https://news-a.example/article-1" + }, + "received_at": "2026-04-22T10:00:00+00:00" + }, + "error": null, + "run_id": "run_xxx", + "session_id": "session_xxx" +} +``` + +Пример ответа с ошибкой валидации: + +```json +{ + "scenario_id": "news_source_discovery_v1", + "status": "failed", + "input": {}, + "steps": [ + { + "node_id": "search_news_sources", + "status": "queued", + "started_at": null, + "finished_at": null, + "error": null + } + ], + "output_summary": null, + "workflow_name": null, + "scenario_name": "News Source Discovery V1", + "result": null, + "error": { + "code": "invalid_input", + "message": "Input does not match scenario input_schema: ..." + }, + "run_id": null, + "session_id": null +} +``` Проверка, что сервер поднят: diff --git a/src/api_routes.py b/src/api_routes.py index 8b183bb..c03cd5e 100644 --- a/src/api_routes.py +++ b/src/api_routes.py @@ -1,11 +1,9 @@ from fastapi import APIRouter -from pydantic import TypeAdapter from src.schemas import ScenarioRunRequest, ScenarioRunResponse from src.workflow_runner import run_scenario_workflow router = APIRouter(prefix="/api", tags=["workflow"]) -_run_response_adapter = TypeAdapter(ScenarioRunResponse) @router.post("/runs", response_model=ScenarioRunResponse) @@ -14,4 +12,4 @@ async def run_scenario(request: ScenarioRunRequest) -> ScenarioRunResponse: input_data=request.input, scenario_id=request.scenario_id, ) - return _run_response_adapter.validate_python(result) + return ScenarioRunResponse.model_validate(result) diff --git a/src/schemas.py b/src/schemas.py index 9a4f95d..3d3b2d0 100644 --- a/src/schemas.py +++ b/src/schemas.py @@ -4,6 +4,9 @@ from typing import Any, Literal from pydantic import BaseModel, Field +RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"] +StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"] + class RunError(BaseModel): code: str @@ -15,25 +18,23 @@ class ScenarioRunRequest(BaseModel): input: dict[str, Any] = Field(default_factory=dict) -class ScenarioRunBase(BaseModel): +class StepState(BaseModel): + node_id: str + status: StepStatus + started_at: str | None = None + finished_at: str | None = None + error: RunError | None = None + + +class ScenarioRunResponse(BaseModel): scenario_id: str - status: Literal["success", "failed"] + status: RunStatus input: dict[str, Any] - - -class ScenarioRunFailed(ScenarioRunBase): - status: Literal["failed"] = "failed" + steps: list[StepState] = Field(default_factory=list) + output_summary: str | None = None scenario_name: str | None = None - error: RunError - - -class ScenarioRunSuccess(ScenarioRunBase): - status: Literal["success"] = "success" - workflow_name: str - scenario_name: str - result: dict[str, Any] + workflow_name: str | None = None + result: dict[str, Any] | None = None + error: RunError | None = None run_id: str | None = None session_id: str | None = None - - -ScenarioRunResponse = ScenarioRunSuccess | ScenarioRunFailed diff --git a/src/workflow_runner.py b/src/workflow_runner.py index 23fe570..d6cadbd 100644 --- a/src/workflow_runner.py +++ b/src/workflow_runner.py @@ -1,12 +1,14 @@ from __future__ import annotations +from contextvars import ContextVar +from datetime import datetime, timezone import json from typing import Any from agno.workflow.step import Step, StepInput, StepOutput from agno.workflow.workflow import Workflow from pydantic import BaseModel, ValidationError, create_model -from src.schemas import RunError, ScenarioRunFailed, ScenarioRunSuccess +from src.schemas import RunError, RunStatus, ScenarioRunResponse, StepState from src.scenario_store import ScenarioStoreError, load_scenario_definition from src.stub_tools import ( stub_extract_publication_date, @@ -18,6 +20,10 @@ from src.stub_tools import ( _workflow_cache: dict[str, Workflow] = {} _workflow_input_schemas: dict[str, type[BaseModel]] = {} +_run_steps_context: ContextVar[list[StepState] | None] = ContextVar( + "run_steps_context", + default=None, +) def _json_loads(raw: str | None) -> dict[str, Any]: @@ -36,6 +42,106 @@ def _as_json_step_output(payload: dict[str, Any]) -> StepOutput: return StepOutput(content=json.dumps(payload, ensure_ascii=False)) +def _utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _initialize_step_states(scenario: dict[str, Any]) -> list[StepState]: + raw_steps = scenario.get("steps") + if not isinstance(raw_steps, list): + return [] + + step_states: list[StepState] = [] + for raw_step in raw_steps: + if not isinstance(raw_step, dict): + continue + node_id = str(raw_step.get("name", "")).strip() + if not node_id: + continue + step_states.append(StepState(node_id=node_id, status="queued")) + return step_states + + +def _update_step_state( + node_id: str, + status: str, + error: RunError | None = None, +) -> None: + step_states = _run_steps_context.get() + if not step_states: + return + + for step_state in step_states: + if step_state.node_id != node_id: + continue + step_state.status = status + if status == "running" and step_state.started_at is None: + step_state.started_at = _utc_now_iso() + if status in {"success", "failed", "waiting_human"}: + if step_state.started_at is None: + step_state.started_at = _utc_now_iso() + step_state.finished_at = _utc_now_iso() + step_state.error = error + return + + +def _mark_running_steps_failed(message: str) -> None: + step_states = _run_steps_context.get() + if not step_states: + return + + for step_state in step_states: + if step_state.status == "running": + step_state.status = "failed" + if step_state.started_at is None: + step_state.started_at = _utc_now_iso() + step_state.finished_at = _utc_now_iso() + step_state.error = RunError(code="workflow_error", message=message) + + +def _extract_output_summary(content: Any) -> str | None: + if not isinstance(content, dict): + return None + summary = content.get("summary") + if isinstance(summary, str) and summary: + return summary + payload = content.get("payload") + if isinstance(payload, dict): + payload_summary = payload.get("summary") + if isinstance(payload_summary, str) and payload_summary: + return payload_summary + return None + + +def _build_run_response( + *, + scenario_id: str, + input_data: dict[str, Any], + status: RunStatus, + steps: list[StepState], + scenario_name: str | None = None, + workflow_name: str | None = None, + output_summary: str | None = None, + result: dict[str, Any] | None = None, + error: RunError | None = None, + run_id: str | None = None, + session_id: str | None = None, +) -> dict[str, Any]: + return ScenarioRunResponse( + scenario_id=scenario_id, + status=status, + input=input_data, + steps=steps, + output_summary=output_summary, + scenario_name=scenario_name, + workflow_name=workflow_name, + result=result, + error=error, + run_id=run_id, + session_id=session_id, + ).model_dump() + + def _extract_input_url(step_input_value: Any) -> str: if isinstance(step_input_value, dict): return str(step_input_value.get("url", "")).strip() @@ -70,14 +176,22 @@ def _build_input_schema_model(scenario: dict[str, Any]) -> type[BaseModel] | Non async def _search_news_sources_executor(step_input: StepInput) -> StepOutput: + _update_step_state("search_news_sources", "running") input_url = _extract_input_url(step_input.input) if not input_url: + _update_step_state( + "search_news_sources", + "failed", + error=RunError(code="invalid_input", message="input.url is empty"), + ) return StepOutput(content="search_news_sources failed: input.url is empty", success=False) search_result = await stub_search_news_sources(url=input_url) + _update_step_state("search_news_sources", "success") return _as_json_step_output(search_result) async def _parse_article_executor(step_input: StepInput) -> StepOutput: + _update_step_state("parse_articles_batch", "running") previous_payload = _json_loads(step_input.previous_step_content) items = previous_payload.get("payload", {}).get("items", []) @@ -86,9 +200,15 @@ async def _parse_article_executor(step_input: StepInput) -> StepOutput: source_url = str(item.get("url", "")) parsed_result = await stub_parse_article(url=source_url) if not parsed_result.get("ok", False): + _update_step_state( + "parse_articles_batch", + "failed", + error=RunError(code="tool_error", message="parse_article failed"), + ) return StepOutput(content="parse_article failed", success=False) parsed_items.append(parsed_result.get("payload", {})) + _update_step_state("parse_articles_batch", "success") return _as_json_step_output( { "tool_name": "parse_articles_batch", @@ -99,6 +219,7 @@ async def _parse_article_executor(step_input: StepInput) -> StepOutput: async def _extract_publication_date_executor(step_input: StepInput) -> StepOutput: + _update_step_state("extract_publication_date_batch", "running") previous_payload = _json_loads(step_input.previous_step_content) parsed_items = previous_payload.get("payload", {}).get("items", []) @@ -107,6 +228,11 @@ async def _extract_publication_date_executor(step_input: StepInput) -> StepOutpu article_text = str(item.get("text", "")) extract_result = await stub_extract_publication_date(article_text=article_text) if not extract_result.get("ok", False): + _update_step_state( + "extract_publication_date_batch", + "failed", + error=RunError(code="tool_error", message="extract_publication_date failed"), + ) return StepOutput(content="extract_publication_date failed", success=False) dated_items.append( @@ -119,6 +245,7 @@ async def _extract_publication_date_executor(step_input: StepInput) -> StepOutpu } ) + _update_step_state("extract_publication_date_batch", "success") return _as_json_step_output( { "tool_name": "extract_publication_date_batch", @@ -129,16 +256,20 @@ async def _extract_publication_date_executor(step_input: StepInput) -> StepOutpu async def _rank_sources_by_date_executor(step_input: StepInput) -> StepOutput: + _update_step_state("rank_sources_by_date", "running") previous_payload = _json_loads(step_input.previous_step_content) items = previous_payload.get("payload", {}).get("items", []) rank_result = await stub_rank_sources_by_date(items=items) + _update_step_state("rank_sources_by_date", "success") return _as_json_step_output(rank_result) async def _generate_summary_executor(step_input: StepInput) -> StepOutput: + _update_step_state("generate_summary", "running") previous_payload = _json_loads(step_input.previous_step_content) ranked_items = previous_payload.get("payload", {}).get("ranked_items", []) summary_result = await stub_generate_summary(items=ranked_items) + _update_step_state("generate_summary", "success") return _as_json_step_output(summary_result) @@ -198,29 +329,49 @@ async def run_scenario_workflow( try: scenario = load_scenario_definition(scenario_id) except ScenarioStoreError as exc: - return ScenarioRunFailed( + return _build_run_response( scenario_id=scenario_id, - input=input_data, + input_data=input_data, + status="failed", + steps=[], error=RunError(code="unknown_scenario", message=str(exc)), - ).model_dump() + ) + step_states = _initialize_step_states(scenario) + scenario_name = str(scenario.get("name", scenario_id)) workflow = get_workflow_for_scenario(scenario_id=scenario_id, scenario=scenario) input_schema_model = _workflow_input_schemas.get(scenario_id) if input_schema_model is not None: try: input_schema_model.model_validate(input_data) except ValidationError as exc: - return ScenarioRunFailed( + return _build_run_response( scenario_id=scenario_id, - scenario_name=str(scenario.get("name", scenario_id)), - input=input_data, + input_data=input_data, + status="failed", + scenario_name=scenario_name, + steps=step_states, error=RunError( code="invalid_input", message=f"Input does not match scenario input_schema: {exc}", ), - ).model_dump() + ) - run_output = await workflow.arun(input=input_data) + context_token = _run_steps_context.set(step_states) + try: + run_output = await workflow.arun(input=input_data) + except Exception as exc: + _mark_running_steps_failed(str(exc)) + return _build_run_response( + scenario_id=scenario_id, + input_data=input_data, + status="failed", + scenario_name=scenario_name, + steps=step_states, + error=RunError(code="workflow_error", message=str(exc)), + ) + finally: + _run_steps_context.reset(context_token) content: Any = run_output.content if hasattr(run_output, "content") else {} if isinstance(content, str): @@ -228,6 +379,23 @@ async def run_scenario_workflow( content = json.loads(content) except json.JSONDecodeError: content = {"raw_content": content} + output_summary = _extract_output_summary(content) + normalized_result = content if isinstance(content, dict) else {"raw_content": str(content)} + + if hasattr(run_output, "success") and not bool(getattr(run_output, "success")): + return _build_run_response( + scenario_id=scenario_id, + input_data=input_data, + status="failed", + scenario_name=scenario_name, + steps=step_states, + output_summary=output_summary, + result=normalized_result, + error=RunError( + code="workflow_failed", + message="Workflow finished with failed status.", + ), + ) run_id: str | None = None session_id: str | None = None @@ -236,12 +404,15 @@ async def run_scenario_workflow( if hasattr(run_output, "session_id"): session_id = str(getattr(run_output, "session_id")) - return ScenarioRunSuccess( + return _build_run_response( scenario_id=scenario_id, + input_data=input_data, + status="success", workflow_name=workflow.name, - scenario_name=str(scenario.get("name", scenario_id)), - input=input_data, - result=content if isinstance(content, dict) else {"raw_content": str(content)}, + scenario_name=scenario_name, + steps=step_states, + output_summary=output_summary, + result=normalized_result, run_id=run_id, session_id=session_id, - ).model_dump(exclude_none=True) + )