Вынести сценарий в JSON и добавить динамический loader.

Переключает запуск workflow на загрузку сценария из файлового хранилища по scenario_id и собирает шаги выполнения из definition.steps вместо хардкода в раннере.
This commit is contained in:
Barabashka
2026-04-21 17:07:24 +03:00
parent 2111964d8b
commit 2b0c748474
7 changed files with 214 additions and 48 deletions
+14 -1
View File
@@ -17,12 +17,19 @@ prisma_platform/
├── .env ├── .env
├── .env.example ├── .env.example
├── requirements.txt ├── requirements.txt
├── scenarios/
│ ├── index.json
│ └── news_source_discovery/
│ └── v1.json
└── src/ └── src/
├── __init__.py ├── __init__.py
├── agent_os.py ├── agent_os.py
├── agent_runner.py ├── agent_runner.py
├── main.py ├── main.py
├── observability.py ├── observability.py
├── scenario_store.py
├── stub_tools.py
└── workflow_runner.py
``` ```
## Установка ## Установка
@@ -48,6 +55,12 @@ python -m src.main
python -m src.main --message "Привет, что ты умеешь?" python -m src.main --message "Привет, что ты умеешь?"
``` ```
Режим запуска сценария (сценарий загружается из `scenarios/index.json`):
```bash
python -m src.main --scenario-id news_source_discovery_v1 --workflow-input-json '{"url":"https://example.com/news"}'
```
## Запуск AgentOS ## Запуск AgentOS
Запуск сервера AgentOS: Запуск сервера AgentOS:
+5
View File
@@ -0,0 +1,5 @@
{
"scenarios": {
"news_source_discovery_v1": "news_source_discovery/v1.json"
}
}
+40
View File
@@ -0,0 +1,40 @@
{
"schema_version": "1",
"scenario_id": "news_source_discovery_v1",
"name": "News Source Discovery V1",
"description": "Find earliest news source using sequential stub tools.",
"input_schema": {
"type": "object",
"required": [
"url"
],
"properties": {
"url": {
"type": "string",
"description": "URL of source news article"
}
}
},
"steps": [
{
"name": "search_news_sources",
"type": "tool"
},
{
"name": "parse_articles_batch",
"type": "tool"
},
{
"name": "extract_publication_date_batch",
"type": "tool"
},
{
"name": "rank_sources_by_date",
"type": "tool"
},
{
"name": "generate_summary",
"type": "tool"
}
]
}
+5 -2
View File
@@ -6,13 +6,16 @@ from agno.os import AgentOS
from src.agent_runner import get_agent from src.agent_runner import get_agent
from src.observability import init_phoenix_tracing from src.observability import init_phoenix_tracing
from src.workflow_runner import get_news_source_workflow from src.scenario_store import load_scenario_definition
from src.workflow_runner import get_workflow_for_scenario
load_dotenv() load_dotenv()
_tracing_enabled = init_phoenix_tracing() _tracing_enabled = init_phoenix_tracing()
_agent = get_agent() _agent = get_agent()
_workflow = get_news_source_workflow() _default_scenario_id = "news_source_discovery_v1"
_scenario = load_scenario_definition(_default_scenario_id)
_workflow = get_workflow_for_scenario(_default_scenario_id, _scenario)
_agent_os = AgentOS(agents=[_agent], workflows=[_workflow], tracing=_tracing_enabled) _agent_os = AgentOS(agents=[_agent], workflows=[_workflow], tracing=_tracing_enabled)
app = _agent_os.get_app() app = _agent_os.get_app()
+18 -6
View File
@@ -1,12 +1,13 @@
import argparse import argparse
import asyncio import asyncio
import json import json
from typing import Any
from dotenv import load_dotenv from dotenv import load_dotenv
from src.agent_runner import run_agent from src.agent_runner import run_agent
from src.observability import init_phoenix_tracing from src.observability import init_phoenix_tracing
from src.workflow_runner import run_news_source_workflow from src.workflow_runner import run_scenario_workflow
def build_parser() -> argparse.ArgumentParser: def build_parser() -> argparse.ArgumentParser:
@@ -18,8 +19,8 @@ def build_parser() -> argparse.ArgumentParser:
help="Single message mode. If omitted, starts interactive chat.", help="Single message mode. If omitted, starts interactive chat.",
) )
parser.add_argument( parser.add_argument(
"--workflow-input-url", "--workflow-input-json",
help="Run workflow mode for a news URL and print run result as JSON.", help="Run workflow mode with JSON object input, for example: '{\"url\":\"https://example.com/news\"}'.",
) )
parser.add_argument( parser.add_argument(
"--scenario-id", "--scenario-id",
@@ -34,9 +35,20 @@ async def _main() -> None:
init_phoenix_tracing() init_phoenix_tracing()
args = build_parser().parse_args() args = build_parser().parse_args()
if args.workflow_input_url: workflow_input: dict[str, Any] | None = None
run_result = await run_news_source_workflow( if args.workflow_input_json:
input_url=args.workflow_input_url, try:
parsed = json.loads(args.workflow_input_json)
except json.JSONDecodeError:
print("Invalid --workflow-input-json: expected valid JSON object.")
return
if not isinstance(parsed, dict):
print("Invalid --workflow-input-json: expected JSON object.")
return
workflow_input = parsed
if workflow_input is not None:
run_result = await run_scenario_workflow(
input_data=workflow_input,
scenario_id=args.scenario_id, scenario_id=args.scenario_id,
) )
print(json.dumps(run_result, ensure_ascii=False, indent=2)) print(json.dumps(run_result, ensure_ascii=False, indent=2))
+60
View File
@@ -0,0 +1,60 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
class ScenarioStoreError(ValueError):
    """Error raised for missing, unreadable, or malformed scenario definitions."""
# Scenario definitions live in <repo root>/scenarios, one level above the
# package directory that contains this module (src/ based on the README tree).
_SCENARIOS_ROOT = Path(__file__).resolve().parent.parent / "scenarios"
# Registry file mapping scenario ids to definition paths relative to the root.
_INDEX_PATH = _SCENARIOS_ROOT / "index.json"
def _read_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 text and parse it as a JSON object.

    Raises:
        ScenarioStoreError: if the file is missing, is not valid JSON, or
            its root value is not a JSON object.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise ScenarioStoreError(f"Scenario file not found: {path}") from exc
    try:
        document = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ScenarioStoreError(f"Invalid JSON in file: {path}") from exc
    if not isinstance(document, dict):
        raise ScenarioStoreError(f"JSON root must be object: {path}")
    return document
def load_scenario_index() -> dict[str, str]:
    """Load ``scenarios/index.json`` and return its id -> relative-path map.

    Raises:
        ScenarioStoreError: if the index is missing a ``scenarios`` object
            or any entry is not a string-to-string mapping.
    """
    document = _read_json(_INDEX_PATH)
    mapping = document.get("scenarios")
    if not isinstance(mapping, dict):
        raise ScenarioStoreError("index.json must contain object field 'scenarios'")
    validated: dict[str, str] = {}
    for key, value in mapping.items():
        # Reject non-string keys/values early so later path joins are safe.
        if not (isinstance(key, str) and isinstance(value, str)):
            raise ScenarioStoreError("index.json scenario entries must be string -> string")
        validated[key] = value
    return validated
def load_scenario_definition(scenario_id: str) -> dict[str, Any]:
    """Resolve *scenario_id* through the index and return its parsed definition.

    Raises:
        ScenarioStoreError: if the id is not registered, the file cannot be
            read/parsed, or the file's declared ``scenario_id`` disagrees
            with the requested one.
    """
    relative_path = load_scenario_index().get(scenario_id)
    if relative_path is None:
        raise ScenarioStoreError(f"Unknown scenario_id: {scenario_id}")
    definition = _read_json(_SCENARIOS_ROOT / relative_path)
    # Guard against an index entry pointing at the wrong definition file.
    if definition.get("scenario_id") != scenario_id:
        raise ScenarioStoreError(
            "Scenario file scenario_id does not match requested scenario_id"
        )
    return definition
+72 -39
View File
@@ -5,6 +5,7 @@ from typing import Any
from agno.workflow.step import Step, StepInput, StepOutput from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow from agno.workflow.workflow import Workflow
from src.scenario_store import ScenarioStoreError, load_scenario_definition
from src.stub_tools import ( from src.stub_tools import (
stub_extract_publication_date, stub_extract_publication_date,
stub_generate_summary, stub_generate_summary,
@@ -13,7 +14,7 @@ from src.stub_tools import (
stub_search_news_sources, stub_search_news_sources,
) )
_workflow: Workflow | None = None _workflow_cache: dict[str, Workflow] = {}
def _json_loads(raw: str | None) -> dict[str, Any]: def _json_loads(raw: str | None) -> dict[str, Any]:
@@ -103,51 +104,82 @@ async def _generate_summary_executor(step_input: StepInput) -> StepOutput:
return _as_json_step_output(summary_result) return _as_json_step_output(summary_result)
def get_news_source_workflow() -> Workflow: _step_executors = {
global _workflow "search_news_sources": _search_news_sources_executor,
"parse_articles_batch": _parse_article_executor,
"extract_publication_date_batch": _extract_publication_date_executor,
"rank_sources_by_date": _rank_sources_by_date_executor,
"generate_summary": _generate_summary_executor,
}
if _workflow is not None:
return _workflow
_workflow = Workflow( def get_workflow_for_scenario(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
name="news_source_discovery_v1", cached_workflow = _workflow_cache.get(scenario_id)
description="Find earliest news source using sequential stub tools.", if cached_workflow is not None:
steps=[ return cached_workflow
raw_steps = scenario.get("steps")
if not isinstance(raw_steps, list) or not raw_steps:
raise ScenarioStoreError("Scenario must contain non-empty steps list")
workflow_steps: list[Step] = []
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
raise ScenarioStoreError("Each scenario step must be object")
step_name = str(raw_step.get("name", "")).strip()
if not step_name:
raise ScenarioStoreError("Each scenario step must have non-empty name")
step_executor = _step_executors.get(step_name)
if step_executor is None:
raise ScenarioStoreError(f"Unknown step executor: {step_name}")
workflow_steps.append(
Step( Step(
name="search_news_sources", name=step_name,
description="Find related source URLs for input news URL", description=str(raw_step.get("description", step_name)),
executor=_search_news_sources_executor, executor=step_executor,
),
Step(
name="parse_articles_batch",
description="Parse each found source URL",
executor=_parse_article_executor,
),
Step(
name="extract_publication_date_batch",
description="Extract publication date for each parsed article",
executor=_extract_publication_date_executor,
),
Step(
name="rank_sources_by_date",
description="Sort sources by publication date",
executor=_rank_sources_by_date_executor,
),
Step(
name="generate_summary",
description="Generate final workflow summary",
executor=_generate_summary_executor,
),
],
) )
return _workflow )
workflow = Workflow(
name=scenario_id,
description=str(scenario.get("description", "")),
steps=workflow_steps,
)
_workflow_cache[scenario_id] = workflow
return workflow
async def run_news_source_workflow( async def run_scenario_workflow(
input_url: str, input_data: dict[str, Any],
scenario_id: str = "news_source_discovery_v1", scenario_id: str = "news_source_discovery_v1",
) -> dict[str, Any]: ) -> dict[str, Any]:
workflow = get_news_source_workflow() try:
scenario = load_scenario_definition(scenario_id)
except ScenarioStoreError as exc:
return {
"scenario_id": scenario_id,
"status": "failed",
"input": input_data,
"error": {
"code": "unknown_scenario",
"message": str(exc),
},
}
input_url = str(input_data.get("url", "")).strip()
if not input_url:
return {
"scenario_id": scenario_id,
"status": "failed",
"scenario_name": str(scenario.get("name", scenario_id)),
"input": input_data,
"error": {
"code": "invalid_input",
"message": "Current scenario expects input.url as non-empty string.",
},
}
workflow = get_workflow_for_scenario(scenario_id=scenario_id, scenario=scenario)
run_output = await workflow.arun(input=input_url) run_output = await workflow.arun(input=input_url)
content: Any = run_output.content if hasattr(run_output, "content") else {} content: Any = run_output.content if hasattr(run_output, "content") else {}
@@ -160,8 +192,9 @@ async def run_news_source_workflow(
response: dict[str, Any] = { response: dict[str, Any] = {
"scenario_id": scenario_id, "scenario_id": scenario_id,
"workflow_name": workflow.name, "workflow_name": workflow.name,
"scenario_name": str(scenario.get("name", scenario_id)),
"status": "success", "status": "success",
"input": {"url": input_url}, "input": input_data,
"result": content, "result": content,
} }
if hasattr(run_output, "run_id"): if hasattr(run_output, "run_id"):