From 2111964d8bdb86785fb9b58981d45a193775a4dd Mon Sep 17 00:00:00 2001 From: Barabashka Date: Tue, 21 Apr 2026 16:24:19 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20MVP=20workflow=20=D0=B7=D0=B0=D0=BF=D1=83=D1=81=D0=BA?= =?UTF-8?q?=D0=B0=20=D1=81=D1=86=D0=B5=D0=BD=D0=B0=D1=80=D0=B8=D1=8F=20?= =?UTF-8?q?=D0=BF=D0=BE=D0=B8=D1=81=D0=BA=D0=B0=20=D0=BF=D0=B5=D1=80=D0=B2?= =?UTF-8?q?=D0=BE=D0=B8=D1=81=D1=82=D0=BE=D1=87=D0=BD=D0=B8=D0=BA=D0=B0.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Подключает stub-инструменты и последовательный Agno workflow в CLI и AgentOS, чтобы запускать сценарий по URL и получать структурированный JSON-результат. --- README.md | 12 +++ src/agent_os.py | 4 +- src/main.py | 19 +++++ src/stub_tools.py | 96 +++++++++++++++++++++++ src/workflow_runner.py | 171 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 301 insertions(+), 1 deletion(-) create mode 100644 src/stub_tools.py create mode 100644 src/workflow_runner.py diff --git a/README.md b/README.md index ed6d15a..c60052f 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,12 @@ В этом проекте AgentOS работает как HTTP API сервер (FastAPI + Uvicorn). +## Требования + +- Python 3.11+ +- Запущенный Ollama endpoint (по умолчанию: `http://localhost:11435`) +- Доступная модель в Ollama (по умолчанию: `gemma4:31b`) + ## Текущая структура ```text @@ -57,6 +63,12 @@ python -m src.agent_os - `http://127.0.0.1:7777/docs` - `http://127.0.0.1:7777/redoc` +Проверка, что сервер поднят: + +```bash +curl -s "http://127.0.0.1:7777/docs" | grep -n "Swagger UI" +``` + ## Переменные окружения Основные переменные: diff --git a/src/agent_os.py b/src/agent_os.py index 1aebe0a..07a9b7f 100644 --- a/src/agent_os.py +++ b/src/agent_os.py @@ -6,12 +6,14 @@ from agno.os import AgentOS from src.agent_runner import get_agent from src.observability import init_phoenix_tracing +from src.workflow_runner import get_news_source_workflow load_dotenv() _tracing_enabled = init_phoenix_tracing() _agent = get_agent() -_agent_os = AgentOS(agents=[_agent], tracing=_tracing_enabled) +_workflow = get_news_source_workflow() +_agent_os = AgentOS(agents=[_agent], workflows=[_workflow], tracing=_tracing_enabled) app = _agent_os.get_app() diff --git a/src/main.py b/src/main.py index 69ac1ea..dc8bed9 100644 --- a/src/main.py +++ b/src/main.py @@ -1,10 +1,12 @@ import argparse import asyncio +import json from dotenv import load_dotenv from src.agent_runner import run_agent from src.observability import init_phoenix_tracing +from src.workflow_runner import run_news_source_workflow def build_parser() -> argparse.ArgumentParser: @@ -15,6 +17,15 @@ def build_parser() -> argparse.ArgumentParser: "--message", help="Single message mode. If omitted, starts interactive chat.", ) + parser.add_argument( + "--workflow-input-url", + help="Run workflow mode for a news URL and print run result as JSON.", + ) + parser.add_argument( + "--scenario-id", + default="news_source_discovery_v1", + help="Scenario id for workflow mode.", + ) return parser @@ -23,6 +34,14 @@ async def _main() -> None: init_phoenix_tracing() args = build_parser().parse_args() + if args.workflow_input_url: + run_result = await run_news_source_workflow( + input_url=args.workflow_input_url, + scenario_id=args.scenario_id, + ) + print(json.dumps(run_result, ensure_ascii=False, indent=2)) + return + if args.message: result = await run_agent(args.message) print(result) diff --git a/src/stub_tools.py b/src/stub_tools.py new file mode 100644 index 0000000..4d35eb9 --- /dev/null +++ b/src/stub_tools.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + + +def _utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _base_result(tool_name: str, ok: bool, payload: dict[str, Any]) -> dict[str, Any]: + return { + "tool_name": tool_name, + "ok": ok, + "payload": payload, + "received_at": _utc_now_iso(), + } + + +async def stub_search_news_sources(url: str) -> dict[str, Any]: + return _base_result( + tool_name="search_news_sources", + ok=True, + payload={ + "input_url": url, + "items": [ + {"url": "https://news-a.example/article-1"}, + {"url": "https://news-b.example/article-2"}, + {"url": "https://news-c.example/article-3"}, + ], + }, + ) + + +async def stub_parse_article(url: str) -> dict[str, Any]: + return _base_result( + tool_name="parse_article", + ok=True, + payload={ + "url": url, + "title": "Stub article title", + "published_at": "2026-01-01T10:00:00+00:00", + "text": "Stub parsed article content.", + }, + ) + + +async def stub_extract_publication_date(article_text: str) -> dict[str, Any]: + return _base_result( + tool_name="extract_publication_date", + ok=True, + payload={ + "text_size": len(article_text), + "published_at": "2026-01-01T10:00:00+00:00", + "confidence": 0.77, + }, + ) + + +async def stub_rank_sources_by_date(items: list[dict[str, Any]]) -> dict[str, Any]: + ranked = sorted(items, key=lambda item: str(item.get("published_at", ""))) + return _base_result( + tool_name="rank_sources_by_date", + ok=True, + payload={ + "input_count": len(items), + "ranked_items": ranked, + }, + ) + + +async def stub_generate_summary(items: list[dict[str, Any]]) -> dict[str, Any]: + first_url = "" + if items: + first_url = str(items[0].get("url", "")) + + return _base_result( + tool_name="generate_summary", + ok=True, + payload={ + "input_count": len(items), + "summary": ( + "По заглушечным данным самым ранним источником считается " + + first_url + ), + }, + ) + + +STUB_TOOLS: dict[str, Any] = { + "search_news_sources": stub_search_news_sources, + "parse_article": stub_parse_article, + "extract_publication_date": stub_extract_publication_date, + "rank_sources_by_date": stub_rank_sources_by_date, + "generate_summary": stub_generate_summary, +} diff --git a/src/workflow_runner.py b/src/workflow_runner.py new file mode 100644 index 0000000..9b3af72 --- /dev/null +++ b/src/workflow_runner.py @@ -0,0 +1,171 @@ +from __future__ import annotations + +import json +from typing import Any + +from agno.workflow.step import Step, StepInput, StepOutput +from agno.workflow.workflow import Workflow +from src.stub_tools import ( + stub_extract_publication_date, + stub_generate_summary, + stub_parse_article, + stub_rank_sources_by_date, + stub_search_news_sources, +) + +_workflow: Workflow | None = None + + +def _json_loads(raw: str | None) -> dict[str, Any]: + if not raw: + return {} + try: + parsed = json.loads(raw) + except json.JSONDecodeError: + return {} + if isinstance(parsed, dict): + return parsed + return {} + + +def _as_json_step_output(payload: dict[str, Any]) -> StepOutput: + return StepOutput(content=json.dumps(payload, ensure_ascii=False)) + + +async def _search_news_sources_executor(step_input: StepInput) -> StepOutput: + input_url = str(step_input.input) + search_result = await stub_search_news_sources(url=input_url) + return _as_json_step_output(search_result) + + +async def _parse_article_executor(step_input: StepInput) -> StepOutput: + previous_payload = _json_loads(step_input.previous_step_content) + items = previous_payload.get("payload", {}).get("items", []) + + parsed_items: list[dict[str, Any]] = [] + for item in items: + source_url = str(item.get("url", "")) + parsed_result = await stub_parse_article(url=source_url) + if not parsed_result.get("ok", False): + return StepOutput(content="parse_article failed", success=False) + parsed_items.append(parsed_result.get("payload", {})) + + return _as_json_step_output( + { + "tool_name": "parse_articles_batch", + "ok": True, + "payload": {"items": parsed_items}, + } + ) + + +async def _extract_publication_date_executor(step_input: StepInput) -> StepOutput: + previous_payload = _json_loads(step_input.previous_step_content) + parsed_items = previous_payload.get("payload", {}).get("items", []) + + dated_items: list[dict[str, Any]] = [] + for item in parsed_items: + article_text = str(item.get("text", "")) + extract_result = await stub_extract_publication_date(article_text=article_text) + if not extract_result.get("ok", False): + return StepOutput(content="extract_publication_date failed", success=False) + + dated_items.append( + { + "url": str(item.get("url", "")), + "title": str(item.get("title", "")), + "published_at": str( + extract_result.get("payload", {}).get("published_at", "") + ), + } + ) + + return _as_json_step_output( + { + "tool_name": "extract_publication_date_batch", + "ok": True, + "payload": {"items": dated_items}, + } + ) + + +async def _rank_sources_by_date_executor(step_input: StepInput) -> StepOutput: + previous_payload = _json_loads(step_input.previous_step_content) + items = previous_payload.get("payload", {}).get("items", []) + rank_result = await stub_rank_sources_by_date(items=items) + return _as_json_step_output(rank_result) + + +async def _generate_summary_executor(step_input: StepInput) -> StepOutput: + previous_payload = _json_loads(step_input.previous_step_content) + ranked_items = previous_payload.get("payload", {}).get("ranked_items", []) + summary_result = await stub_generate_summary(items=ranked_items) + return _as_json_step_output(summary_result) + + +def get_news_source_workflow() -> Workflow: + global _workflow + + if _workflow is not None: + return _workflow + + _workflow = Workflow( + name="news_source_discovery_v1", + description="Find earliest news source using sequential stub tools.", + steps=[ + Step( + name="search_news_sources", + description="Find related source URLs for input news URL", + executor=_search_news_sources_executor, + ), + Step( + name="parse_articles_batch", + description="Parse each found source URL", + executor=_parse_article_executor, + ), + Step( + name="extract_publication_date_batch", + description="Extract publication date for each parsed article", + executor=_extract_publication_date_executor, + ), + Step( + name="rank_sources_by_date", + description="Sort sources by publication date", + executor=_rank_sources_by_date_executor, + ), + Step( + name="generate_summary", + description="Generate final workflow summary", + executor=_generate_summary_executor, + ), + ], + ) + return _workflow + + +async def run_news_source_workflow( + input_url: str, + scenario_id: str = "news_source_discovery_v1", +) -> dict[str, Any]: + workflow = get_news_source_workflow() + run_output = await workflow.arun(input=input_url) + + content: Any = run_output.content if hasattr(run_output, "content") else {} + if isinstance(content, str): + try: + content = json.loads(content) + except json.JSONDecodeError: + content = {"raw_content": content} + + response: dict[str, Any] = { + "scenario_id": scenario_id, + "workflow_name": workflow.name, + "status": "success", + "input": {"url": input_url}, + "result": content, + } + if hasattr(run_output, "run_id"): + response["run_id"] = str(getattr(run_output, "run_id")) + if hasattr(run_output, "session_id"): + response["session_id"] = str(getattr(run_output, "session_id")) + return response