Унифицировать ответ /api/runs и добавить статусы шагов workflow.

Введен единый JSON-контракт для success/failed с общими полями, добавлен трекинг step status (queued/running/success/failed) и output_summary, а сборка run-ответа централизована через общий helper.
This commit is contained in:
Barabashka
2026-04-22 12:28:08 +03:00
parent 9068b7fe07
commit 93ee7aea1c
4 changed files with 271 additions and 36 deletions
+67 -2
View File
@@ -26,7 +26,6 @@ prisma_platform/
├── api_routes.py ├── api_routes.py
├── agent_os.py ├── agent_os.py
├── agent_runner.py ├── agent_runner.py
├── main.py
├── observability.py ├── observability.py
├── scenario_store.py ├── scenario_store.py
├── schemas.py ├── schemas.py
@@ -87,7 +86,73 @@ curl -X POST "http://127.0.0.1:7777/api/runs" \
}' }'
``` ```
Endpoint возвращает структурированный ответ со статусом `success` или `failed`. Endpoint возвращает единый JSON-контракт. Поля одинаковые для `success` и `failed`,
а в неактуальных полях приходит `null`.
Пример успешного ответа:
```json
{
"scenario_id": "news_source_discovery_v1",
"status": "success",
"input": {
"url": "https://example.com/news"
},
"steps": [
{
"node_id": "search_news_sources",
"status": "success",
"started_at": "2026-04-22T10:00:00+00:00",
"finished_at": "2026-04-22T10:00:00+00:00",
"error": null
}
],
"output_summary": "По заглушечным данным самым ранним источником считается https://news-a.example/article-1",
"workflow_name": "news_source_discovery_v1",
"scenario_name": "News Source Discovery V1",
"result": {
"tool_name": "generate_summary",
"ok": true,
"payload": {
"input_count": 3,
"summary": "По заглушечным данным самым ранним источником считается https://news-a.example/article-1"
},
"received_at": "2026-04-22T10:00:00+00:00"
},
"error": null,
"run_id": "run_xxx",
"session_id": "session_xxx"
}
```
Пример ответа с ошибкой валидации:
```json
{
"scenario_id": "news_source_discovery_v1",
"status": "failed",
"input": {},
"steps": [
{
"node_id": "search_news_sources",
"status": "queued",
"started_at": null,
"finished_at": null,
"error": null
}
],
"output_summary": null,
"workflow_name": null,
"scenario_name": "News Source Discovery V1",
"result": null,
"error": {
"code": "invalid_input",
"message": "Input does not match scenario input_schema: ..."
},
"run_id": null,
"session_id": null
}
```
Проверка, что сервер поднят: Проверка, что сервер поднят:
+1 -3
View File
@@ -1,11 +1,9 @@
from fastapi import APIRouter from fastapi import APIRouter
from pydantic import TypeAdapter
from src.schemas import ScenarioRunRequest, ScenarioRunResponse from src.schemas import ScenarioRunRequest, ScenarioRunResponse
from src.workflow_runner import run_scenario_workflow from src.workflow_runner import run_scenario_workflow
router = APIRouter(prefix="/api", tags=["workflow"]) router = APIRouter(prefix="/api", tags=["workflow"])
_run_response_adapter = TypeAdapter(ScenarioRunResponse)
@router.post("/runs", response_model=ScenarioRunResponse) @router.post("/runs", response_model=ScenarioRunResponse)
@@ -14,4 +12,4 @@ async def run_scenario(request: ScenarioRunRequest) -> ScenarioRunResponse:
input_data=request.input, input_data=request.input,
scenario_id=request.scenario_id, scenario_id=request.scenario_id,
) )
return _run_response_adapter.validate_python(result) return ScenarioRunResponse.model_validate(result)
+18 -17
View File
@@ -4,6 +4,9 @@ from typing import Any, Literal
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
RunStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
StepStatus = Literal["queued", "running", "success", "failed", "waiting_human"]
class RunError(BaseModel): class RunError(BaseModel):
code: str code: str
@@ -15,25 +18,23 @@ class ScenarioRunRequest(BaseModel):
input: dict[str, Any] = Field(default_factory=dict) input: dict[str, Any] = Field(default_factory=dict)
class ScenarioRunBase(BaseModel): class StepState(BaseModel):
node_id: str
status: StepStatus
started_at: str | None = None
finished_at: str | None = None
error: RunError | None = None
class ScenarioRunResponse(BaseModel):
scenario_id: str scenario_id: str
status: Literal["success", "failed"] status: RunStatus
input: dict[str, Any] input: dict[str, Any]
steps: list[StepState] = Field(default_factory=list)
output_summary: str | None = None
class ScenarioRunFailed(ScenarioRunBase):
status: Literal["failed"] = "failed"
scenario_name: str | None = None scenario_name: str | None = None
error: RunError workflow_name: str | None = None
result: dict[str, Any] | None = None
error: RunError | None = None
class ScenarioRunSuccess(ScenarioRunBase):
status: Literal["success"] = "success"
workflow_name: str
scenario_name: str
result: dict[str, Any]
run_id: str | None = None run_id: str | None = None
session_id: str | None = None session_id: str | None = None
ScenarioRunResponse = ScenarioRunSuccess | ScenarioRunFailed
+184 -13
View File
@@ -1,12 +1,14 @@
from __future__ import annotations from __future__ import annotations
from contextvars import ContextVar
from datetime import datetime, timezone
import json import json
from typing import Any from typing import Any
from agno.workflow.step import Step, StepInput, StepOutput from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow from agno.workflow.workflow import Workflow
from pydantic import BaseModel, ValidationError, create_model from pydantic import BaseModel, ValidationError, create_model
from src.schemas import RunError, ScenarioRunFailed, ScenarioRunSuccess from src.schemas import RunError, RunStatus, ScenarioRunResponse, StepState
from src.scenario_store import ScenarioStoreError, load_scenario_definition from src.scenario_store import ScenarioStoreError, load_scenario_definition
from src.stub_tools import ( from src.stub_tools import (
stub_extract_publication_date, stub_extract_publication_date,
@@ -18,6 +20,10 @@ from src.stub_tools import (
_workflow_cache: dict[str, Workflow] = {} _workflow_cache: dict[str, Workflow] = {}
_workflow_input_schemas: dict[str, type[BaseModel]] = {} _workflow_input_schemas: dict[str, type[BaseModel]] = {}
_run_steps_context: ContextVar[list[StepState] | None] = ContextVar(
"run_steps_context",
default=None,
)
def _json_loads(raw: str | None) -> dict[str, Any]: def _json_loads(raw: str | None) -> dict[str, Any]:
@@ -36,6 +42,106 @@ def _as_json_step_output(payload: dict[str, Any]) -> StepOutput:
return StepOutput(content=json.dumps(payload, ensure_ascii=False)) return StepOutput(content=json.dumps(payload, ensure_ascii=False))
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _initialize_step_states(scenario: dict[str, Any]) -> list[StepState]:
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list):
return []
step_states: list[StepState] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
continue
node_id = str(raw_step.get("name", "")).strip()
if not node_id:
continue
step_states.append(StepState(node_id=node_id, status="queued"))
return step_states
def _update_step_state(
node_id: str,
status: str,
error: RunError | None = None,
) -> None:
step_states = _run_steps_context.get()
if not step_states:
return
for step_state in step_states:
if step_state.node_id != node_id:
continue
step_state.status = status
if status == "running" and step_state.started_at is None:
step_state.started_at = _utc_now_iso()
if status in {"success", "failed", "waiting_human"}:
if step_state.started_at is None:
step_state.started_at = _utc_now_iso()
step_state.finished_at = _utc_now_iso()
step_state.error = error
return
def _mark_running_steps_failed(message: str) -> None:
step_states = _run_steps_context.get()
if not step_states:
return
for step_state in step_states:
if step_state.status == "running":
step_state.status = "failed"
if step_state.started_at is None:
step_state.started_at = _utc_now_iso()
step_state.finished_at = _utc_now_iso()
step_state.error = RunError(code="workflow_error", message=message)
def _extract_output_summary(content: Any) -> str | None:
if not isinstance(content, dict):
return None
summary = content.get("summary")
if isinstance(summary, str) and summary:
return summary
payload = content.get("payload")
if isinstance(payload, dict):
payload_summary = payload.get("summary")
if isinstance(payload_summary, str) and payload_summary:
return payload_summary
return None
def _build_run_response(
*,
scenario_id: str,
input_data: dict[str, Any],
status: RunStatus,
steps: list[StepState],
scenario_name: str | None = None,
workflow_name: str | None = None,
output_summary: str | None = None,
result: dict[str, Any] | None = None,
error: RunError | None = None,
run_id: str | None = None,
session_id: str | None = None,
) -> dict[str, Any]:
return ScenarioRunResponse(
scenario_id=scenario_id,
status=status,
input=input_data,
steps=steps,
output_summary=output_summary,
scenario_name=scenario_name,
workflow_name=workflow_name,
result=result,
error=error,
run_id=run_id,
session_id=session_id,
).model_dump()
def _extract_input_url(step_input_value: Any) -> str: def _extract_input_url(step_input_value: Any) -> str:
if isinstance(step_input_value, dict): if isinstance(step_input_value, dict):
return str(step_input_value.get("url", "")).strip() return str(step_input_value.get("url", "")).strip()
@@ -70,14 +176,22 @@ def _build_input_schema_model(scenario: dict[str, Any]) -> type[BaseModel] | Non
async def _search_news_sources_executor(step_input: StepInput) -> StepOutput: async def _search_news_sources_executor(step_input: StepInput) -> StepOutput:
_update_step_state("search_news_sources", "running")
input_url = _extract_input_url(step_input.input) input_url = _extract_input_url(step_input.input)
if not input_url: if not input_url:
_update_step_state(
"search_news_sources",
"failed",
error=RunError(code="invalid_input", message="input.url is empty"),
)
return StepOutput(content="search_news_sources failed: input.url is empty", success=False) return StepOutput(content="search_news_sources failed: input.url is empty", success=False)
search_result = await stub_search_news_sources(url=input_url) search_result = await stub_search_news_sources(url=input_url)
_update_step_state("search_news_sources", "success")
return _as_json_step_output(search_result) return _as_json_step_output(search_result)
async def _parse_article_executor(step_input: StepInput) -> StepOutput: async def _parse_article_executor(step_input: StepInput) -> StepOutput:
_update_step_state("parse_articles_batch", "running")
previous_payload = _json_loads(step_input.previous_step_content) previous_payload = _json_loads(step_input.previous_step_content)
items = previous_payload.get("payload", {}).get("items", []) items = previous_payload.get("payload", {}).get("items", [])
@@ -86,9 +200,15 @@ async def _parse_article_executor(step_input: StepInput) -> StepOutput:
source_url = str(item.get("url", "")) source_url = str(item.get("url", ""))
parsed_result = await stub_parse_article(url=source_url) parsed_result = await stub_parse_article(url=source_url)
if not parsed_result.get("ok", False): if not parsed_result.get("ok", False):
_update_step_state(
"parse_articles_batch",
"failed",
error=RunError(code="tool_error", message="parse_article failed"),
)
return StepOutput(content="parse_article failed", success=False) return StepOutput(content="parse_article failed", success=False)
parsed_items.append(parsed_result.get("payload", {})) parsed_items.append(parsed_result.get("payload", {}))
_update_step_state("parse_articles_batch", "success")
return _as_json_step_output( return _as_json_step_output(
{ {
"tool_name": "parse_articles_batch", "tool_name": "parse_articles_batch",
@@ -99,6 +219,7 @@ async def _parse_article_executor(step_input: StepInput) -> StepOutput:
async def _extract_publication_date_executor(step_input: StepInput) -> StepOutput: async def _extract_publication_date_executor(step_input: StepInput) -> StepOutput:
_update_step_state("extract_publication_date_batch", "running")
previous_payload = _json_loads(step_input.previous_step_content) previous_payload = _json_loads(step_input.previous_step_content)
parsed_items = previous_payload.get("payload", {}).get("items", []) parsed_items = previous_payload.get("payload", {}).get("items", [])
@@ -107,6 +228,11 @@ async def _extract_publication_date_executor(step_input: StepInput) -> StepOutpu
article_text = str(item.get("text", "")) article_text = str(item.get("text", ""))
extract_result = await stub_extract_publication_date(article_text=article_text) extract_result = await stub_extract_publication_date(article_text=article_text)
if not extract_result.get("ok", False): if not extract_result.get("ok", False):
_update_step_state(
"extract_publication_date_batch",
"failed",
error=RunError(code="tool_error", message="extract_publication_date failed"),
)
return StepOutput(content="extract_publication_date failed", success=False) return StepOutput(content="extract_publication_date failed", success=False)
dated_items.append( dated_items.append(
@@ -119,6 +245,7 @@ async def _extract_publication_date_executor(step_input: StepInput) -> StepOutpu
} }
) )
_update_step_state("extract_publication_date_batch", "success")
return _as_json_step_output( return _as_json_step_output(
{ {
"tool_name": "extract_publication_date_batch", "tool_name": "extract_publication_date_batch",
@@ -129,16 +256,20 @@ async def _extract_publication_date_executor(step_input: StepInput) -> StepOutpu
async def _rank_sources_by_date_executor(step_input: StepInput) -> StepOutput: async def _rank_sources_by_date_executor(step_input: StepInput) -> StepOutput:
_update_step_state("rank_sources_by_date", "running")
previous_payload = _json_loads(step_input.previous_step_content) previous_payload = _json_loads(step_input.previous_step_content)
items = previous_payload.get("payload", {}).get("items", []) items = previous_payload.get("payload", {}).get("items", [])
rank_result = await stub_rank_sources_by_date(items=items) rank_result = await stub_rank_sources_by_date(items=items)
_update_step_state("rank_sources_by_date", "success")
return _as_json_step_output(rank_result) return _as_json_step_output(rank_result)
async def _generate_summary_executor(step_input: StepInput) -> StepOutput: async def _generate_summary_executor(step_input: StepInput) -> StepOutput:
_update_step_state("generate_summary", "running")
previous_payload = _json_loads(step_input.previous_step_content) previous_payload = _json_loads(step_input.previous_step_content)
ranked_items = previous_payload.get("payload", {}).get("ranked_items", []) ranked_items = previous_payload.get("payload", {}).get("ranked_items", [])
summary_result = await stub_generate_summary(items=ranked_items) summary_result = await stub_generate_summary(items=ranked_items)
_update_step_state("generate_summary", "success")
return _as_json_step_output(summary_result) return _as_json_step_output(summary_result)
@@ -198,29 +329,49 @@ async def run_scenario_workflow(
try: try:
scenario = load_scenario_definition(scenario_id) scenario = load_scenario_definition(scenario_id)
except ScenarioStoreError as exc: except ScenarioStoreError as exc:
return ScenarioRunFailed( return _build_run_response(
scenario_id=scenario_id, scenario_id=scenario_id,
input=input_data, input_data=input_data,
status="failed",
steps=[],
error=RunError(code="unknown_scenario", message=str(exc)), error=RunError(code="unknown_scenario", message=str(exc)),
).model_dump() )
step_states = _initialize_step_states(scenario)
scenario_name = str(scenario.get("name", scenario_id))
workflow = get_workflow_for_scenario(scenario_id=scenario_id, scenario=scenario) workflow = get_workflow_for_scenario(scenario_id=scenario_id, scenario=scenario)
input_schema_model = _workflow_input_schemas.get(scenario_id) input_schema_model = _workflow_input_schemas.get(scenario_id)
if input_schema_model is not None: if input_schema_model is not None:
try: try:
input_schema_model.model_validate(input_data) input_schema_model.model_validate(input_data)
except ValidationError as exc: except ValidationError as exc:
return ScenarioRunFailed( return _build_run_response(
scenario_id=scenario_id, scenario_id=scenario_id,
scenario_name=str(scenario.get("name", scenario_id)), input_data=input_data,
input=input_data, status="failed",
scenario_name=scenario_name,
steps=step_states,
error=RunError( error=RunError(
code="invalid_input", code="invalid_input",
message=f"Input does not match scenario input_schema: {exc}", message=f"Input does not match scenario input_schema: {exc}",
), ),
).model_dump() )
context_token = _run_steps_context.set(step_states)
try:
run_output = await workflow.arun(input=input_data) run_output = await workflow.arun(input=input_data)
except Exception as exc:
_mark_running_steps_failed(str(exc))
return _build_run_response(
scenario_id=scenario_id,
input_data=input_data,
status="failed",
scenario_name=scenario_name,
steps=step_states,
error=RunError(code="workflow_error", message=str(exc)),
)
finally:
_run_steps_context.reset(context_token)
content: Any = run_output.content if hasattr(run_output, "content") else {} content: Any = run_output.content if hasattr(run_output, "content") else {}
if isinstance(content, str): if isinstance(content, str):
@@ -228,6 +379,23 @@ async def run_scenario_workflow(
content = json.loads(content) content = json.loads(content)
except json.JSONDecodeError: except json.JSONDecodeError:
content = {"raw_content": content} content = {"raw_content": content}
output_summary = _extract_output_summary(content)
normalized_result = content if isinstance(content, dict) else {"raw_content": str(content)}
if hasattr(run_output, "success") and not bool(getattr(run_output, "success")):
return _build_run_response(
scenario_id=scenario_id,
input_data=input_data,
status="failed",
scenario_name=scenario_name,
steps=step_states,
output_summary=output_summary,
result=normalized_result,
error=RunError(
code="workflow_failed",
message="Workflow finished with failed status.",
),
)
run_id: str | None = None run_id: str | None = None
session_id: str | None = None session_id: str | None = None
@@ -236,12 +404,15 @@ async def run_scenario_workflow(
if hasattr(run_output, "session_id"): if hasattr(run_output, "session_id"):
session_id = str(getattr(run_output, "session_id")) session_id = str(getattr(run_output, "session_id"))
return ScenarioRunSuccess( return _build_run_response(
scenario_id=scenario_id, scenario_id=scenario_id,
input_data=input_data,
status="success",
workflow_name=workflow.name, workflow_name=workflow.name,
scenario_name=str(scenario.get("name", scenario_id)), scenario_name=scenario_name,
input=input_data, steps=step_states,
result=content if isinstance(content, dict) else {"raw_content": str(content)}, output_summary=output_summary,
result=normalized_result,
run_id=run_id, run_id=run_id,
session_id=session_id, session_id=session_id,
).model_dump(exclude_none=True) )