diff --git a/README.md b/README.md index c60052f..5e24e2a 100644 --- a/README.md +++ b/README.md @@ -17,12 +17,19 @@ prisma_platform/ ├── .env ├── .env.example ├── requirements.txt +├── scenarios/ +│ ├── index.json +│ └── news_source_discovery/ +│ └── v1.json └── src/ ├── __init__.py ├── agent_os.py ├── agent_runner.py ├── main.py - └── observability.py + ├── observability.py + ├── scenario_store.py + ├── stub_tools.py + └── workflow_runner.py ``` ## Установка @@ -48,6 +55,12 @@ python -m src.main python -m src.main --message "Привет, что ты умеешь?" ``` +Режим запуска сценария (идет загрузка сценария из `scenarios/index.json`): + +```bash +python -m src.main --scenario-id news_source_discovery_v1 --workflow-input-json '{"url":"https://example.com/news"}' +``` + ## Запуск AgentOS Запуск сервера AgentOS: diff --git a/scenarios/index.json b/scenarios/index.json new file mode 100644 index 0000000..e127bfd --- /dev/null +++ b/scenarios/index.json @@ -0,0 +1,5 @@ +{ + "scenarios": { + "news_source_discovery_v1": "news_source_discovery/v1.json" + } +} diff --git a/scenarios/news_source_discovery/v1.json b/scenarios/news_source_discovery/v1.json new file mode 100644 index 0000000..deb1f91 --- /dev/null +++ b/scenarios/news_source_discovery/v1.json @@ -0,0 +1,40 @@ +{ + "schema_version": "1", + "scenario_id": "news_source_discovery_v1", + "name": "News Source Discovery V1", + "description": "Find earliest news source using sequential stub tools.", + "input_schema": { + "type": "object", + "required": [ + "url" + ], + "properties": { + "url": { + "type": "string", + "description": "URL of source news article" + } + } + }, + "steps": [ + { + "name": "search_news_sources", + "type": "tool" + }, + { + "name": "parse_articles_batch", + "type": "tool" + }, + { + "name": "extract_publication_date_batch", + "type": "tool" + }, + { + "name": "rank_sources_by_date", + "type": "tool" + }, + { + "name": "generate_summary", + "type": "tool" + } + ] +} diff --git a/src/agent_os.py 
b/src/agent_os.py index 07a9b7f..39a2505 100644 --- a/src/agent_os.py +++ b/src/agent_os.py @@ -6,13 +6,16 @@ from agno.os import AgentOS from src.agent_runner import get_agent from src.observability import init_phoenix_tracing -from src.workflow_runner import get_news_source_workflow +from src.scenario_store import load_scenario_definition +from src.workflow_runner import get_workflow_for_scenario load_dotenv() _tracing_enabled = init_phoenix_tracing() _agent = get_agent() -_workflow = get_news_source_workflow() +_default_scenario_id = "news_source_discovery_v1" +_scenario = load_scenario_definition(_default_scenario_id) +_workflow = get_workflow_for_scenario(_default_scenario_id, _scenario) _agent_os = AgentOS(agents=[_agent], workflows=[_workflow], tracing=_tracing_enabled) app = _agent_os.get_app() diff --git a/src/main.py b/src/main.py index dc8bed9..4a379d5 100644 --- a/src/main.py +++ b/src/main.py @@ -1,12 +1,13 @@ import argparse import asyncio import json +from typing import Any from dotenv import load_dotenv from src.agent_runner import run_agent from src.observability import init_phoenix_tracing -from src.workflow_runner import run_news_source_workflow +from src.workflow_runner import run_scenario_workflow def build_parser() -> argparse.ArgumentParser: @@ -18,8 +19,8 @@ def build_parser() -> argparse.ArgumentParser: help="Single message mode. 
class ScenarioStoreError(ValueError):
    """Raised when a scenario index or definition is missing, unreadable, or malformed."""


# Scenario files live in the ``scenarios/`` directory two levels above this module.
_SCENARIOS_ROOT = Path(__file__).resolve().parent.parent / "scenarios"
_INDEX_PATH = _SCENARIOS_ROOT / "index.json"


def _read_json(path: Path) -> dict[str, Any]:
    """Read *path* as UTF-8 JSON and return its top-level object.

    Raises:
        ScenarioStoreError: If the file is missing or cannot be read, is not
            valid UTF-8, is not valid JSON, or its root is not a JSON object.
    """
    try:
        raw = path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise ScenarioStoreError(f"Scenario file not found: {path}") from exc
    except (OSError, UnicodeDecodeError) as exc:
        # A directory, a permission problem or non-UTF-8 bytes must surface as
        # a store error too, so callers only have to handle one exception type.
        raise ScenarioStoreError(f"Cannot read scenario file: {path}") from exc

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ScenarioStoreError(f"Invalid JSON in file: {path}") from exc

    if not isinstance(parsed, dict):
        raise ScenarioStoreError(f"JSON root must be object: {path}")
    return parsed


def load_scenario_index(index_path: Path | None = None) -> dict[str, str]:
    """Load the mapping of scenario ids to scenario file paths.

    Args:
        index_path: Index file to read. Defaults to the module-level
            ``index.json`` inside the scenarios directory.

    Returns:
        A new dict mapping each scenario id to its path, exactly as written
        in the index (paths are interpreted relative to the scenarios root).

    Raises:
        ScenarioStoreError: If the index cannot be read, has no object field
            ``scenarios``, or contains a non-string id or path.
    """
    index = _read_json(_INDEX_PATH if index_path is None else Path(index_path))
    scenarios = index.get("scenarios")
    if not isinstance(scenarios, dict):
        raise ScenarioStoreError("index.json must contain object field 'scenarios'")

    for scenario_id, relative_path in scenarios.items():
        if not isinstance(scenario_id, str) or not isinstance(relative_path, str):
            raise ScenarioStoreError("index.json scenario entries must be string -> string")
    # Return a copy so callers cannot mutate the parsed index object.
    return dict(scenarios)


def load_scenario_definition(
    scenario_id: str,
    *,
    scenarios_root: Path | None = None,
) -> dict[str, Any]:
    """Load and return the definition registered for *scenario_id*.

    Args:
        scenario_id: Key to look up in the scenario index.
        scenarios_root: Directory holding ``index.json`` and the scenario
            files. Defaults to the module-level scenarios directory.

    Returns:
        The parsed scenario JSON object.

    Raises:
        ScenarioStoreError: If the id is not in the index, the indexed path
            points outside the scenarios directory, the file cannot be read
            or parsed, or its ``scenario_id`` field differs from the
            requested id.
    """
    if scenarios_root is None:
        root = _SCENARIOS_ROOT
        index = load_scenario_index()
    else:
        root = Path(scenarios_root)
        index = load_scenario_index(root / "index.json")

    relative_path = index.get(scenario_id)
    if relative_path is None:
        raise ScenarioStoreError(f"Unknown scenario_id: {scenario_id}")

    # Keep lookups inside the scenarios directory: joining an anchored
    # (absolute / drive-qualified) path discards the root, and ".." climbs
    # out of it. The check is lexical, so symlinks inside the root still work.
    candidate = Path(relative_path)
    if candidate.anchor or ".." in candidate.parts:
        raise ScenarioStoreError(
            f"Scenario path escapes scenarios directory: {relative_path}"
        )

    scenario = _read_json(root / candidate)

    # The file must declare the same id it is indexed under, which catches
    # index entries that point at the wrong file.
    if scenario.get("scenario_id") != scenario_id:
        raise ScenarioStoreError(
            "Scenario file scenario_id does not match requested scenario_id"
        )
    return scenario
# Step names a scenario file may reference, mapped to the executor function
# (defined earlier in this module) that runs the step.
_step_executors = {
    "search_news_sources": _search_news_sources_executor,
    "parse_articles_batch": _parse_article_executor,
    "extract_publication_date_batch": _extract_publication_date_executor,
    "rank_sources_by_date": _rank_sources_by_date_executor,
    "generate_summary": _generate_summary_executor,
}


def get_workflow_for_scenario(scenario_id: str, scenario: dict[str, Any]) -> Workflow:
    """Return the workflow for *scenario_id*, building it from *scenario* on first use.

    The result is cached per ``scenario_id``: once a workflow has been built
    for an id, later calls return it without looking at *scenario* again.

    Args:
        scenario_id: Cache key; also used as the workflow name.
        scenario: Parsed scenario definition. Its ``steps`` entry must be a
            non-empty list of objects whose ``name`` is a key of
            ``_step_executors``; ``description`` fields are optional.

    Returns:
        A ``Workflow`` with one ``Step`` per scenario step, in file order.

    Raises:
        ScenarioStoreError: If ``steps`` is missing, empty or not a list, a
            step is not an object, a step name is missing / blank / not a
            string, or the name has no registered executor.
    """
    cached_workflow = _workflow_cache.get(scenario_id)
    if cached_workflow is not None:
        return cached_workflow

    raw_steps = scenario.get("steps")
    if not isinstance(raw_steps, list) or not raw_steps:
        raise ScenarioStoreError("Scenario must contain non-empty steps list")

    workflow_steps: list[Step] = []
    for raw_step in raw_steps:
        if not isinstance(raw_step, dict):
            raise ScenarioStoreError("Each scenario step must be object")

        # Only real strings count as names: str() would turn JSON null into
        # the literal "None" and slip past the emptiness check.
        raw_name = raw_step.get("name")
        step_name = raw_name.strip() if isinstance(raw_name, str) else ""
        if not step_name:
            raise ScenarioStoreError("Each scenario step must have non-empty name")

        step_executor = _step_executors.get(step_name)
        if step_executor is None:
            raise ScenarioStoreError(f"Unknown step executor: {step_name}")

        # Fall back to the step name when no usable description is given,
        # instead of stringifying null into "None".
        raw_description = raw_step.get("description")
        step_description = raw_description if isinstance(raw_description, str) else step_name
        workflow_steps.append(
            Step(
                name=step_name,
                description=step_description,
                executor=step_executor,
            )
        )

    scenario_description = scenario.get("description")
    workflow = Workflow(
        name=scenario_id,
        description=scenario_description if isinstance(scenario_description, str) else "",
        steps=workflow_steps,
    )
    _workflow_cache[scenario_id] = workflow
    return workflow