Добавить MVP workflow запуска сценария поиска первоисточника.

Подключает stub-инструменты и последовательный Agno workflow в CLI и AgentOS, чтобы запускать сценарий по URL и получать структурированный JSON-результат.
This commit is contained in:
Barabashka
2026-04-21 16:24:19 +03:00
parent d22db07b43
commit 2111964d8b
5 changed files with 301 additions and 1 deletions
+12
View File
@@ -4,6 +4,12 @@
В этом проекте AgentOS работает как HTTP API сервер (FastAPI + Uvicorn).
## Требования
- Python 3.11+
- Запущенный Ollama endpoint (по умолчанию: `http://localhost:11435`)
- Доступная модель в Ollama (по умолчанию: `gemma4:31b`)
## Текущая структура

```text
@@ -57,6 +63,12 @@ python -m src.agent_os
- `http://127.0.0.1:7777/docs`
- `http://127.0.0.1:7777/redoc`
Проверка, что сервер поднят:
```bash
curl -s "http://127.0.0.1:7777/docs" | grep -n "Swagger UI"
```
## Переменные окружения

Основные переменные:
+3 -1
View File
@@ -6,12 +6,14 @@ from agno.os import AgentOS
from src.agent_runner import get_agent
from src.observability import init_phoenix_tracing
from src.workflow_runner import get_news_source_workflow
load_dotenv()

_tracing_enabled = init_phoenix_tracing()
_agent = get_agent()
_workflow = get_news_source_workflow()
_agent_os = AgentOS(agents=[_agent], workflows=[_workflow], tracing=_tracing_enabled)

app = _agent_os.get_app()
+19
View File
@@ -1,10 +1,12 @@
import argparse
import asyncio
import json

from dotenv import load_dotenv

from src.agent_runner import run_agent
from src.observability import init_phoenix_tracing
from src.workflow_runner import run_news_source_workflow
def build_parser() -> argparse.ArgumentParser:
@@ -15,6 +17,15 @@ def build_parser() -> argparse.ArgumentParser:
"--message",
help="Single message mode. If omitted, starts interactive chat.",
)
parser.add_argument(
"--workflow-input-url",
help="Run workflow mode for a news URL and print run result as JSON.",
)
parser.add_argument(
"--scenario-id",
default="news_source_discovery_v1",
help="Scenario id for workflow mode.",
)
return parser
@@ -23,6 +34,14 @@ async def _main() -> None:
init_phoenix_tracing()
args = build_parser().parse_args()
if args.workflow_input_url:
run_result = await run_news_source_workflow(
input_url=args.workflow_input_url,
scenario_id=args.scenario_id,
)
print(json.dumps(run_result, ensure_ascii=False, indent=2))
return
if args.message:
result = await run_agent(args.message)
print(result)
+96
View File
@@ -0,0 +1,96 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _base_result(tool_name: str, ok: bool, payload: dict[str, Any]) -> dict[str, Any]:
    """Wrap a tool payload in the common stub-result envelope.

    Every stub tool returns this shape: the tool's name, a success flag,
    the tool-specific payload, and a UTC timestamp of when it was produced.
    """
    envelope: dict[str, Any] = {"tool_name": tool_name, "ok": ok}
    envelope["payload"] = payload
    envelope["received_at"] = _utc_now_iso()
    return envelope
async def stub_search_news_sources(url: str) -> dict[str, Any]:
    """Stub: return a fixed list of candidate source URLs for *url*."""
    fake_hits = [
        {"url": "https://news-a.example/article-1"},
        {"url": "https://news-b.example/article-2"},
        {"url": "https://news-c.example/article-3"},
    ]
    return _base_result(
        tool_name="search_news_sources",
        ok=True,
        payload={"input_url": url, "items": fake_hits},
    )
async def stub_parse_article(url: str) -> dict[str, Any]:
    """Stub: pretend to fetch and parse the article at *url*."""
    fake_article = {
        "url": url,
        "title": "Stub article title",
        "published_at": "2026-01-01T10:00:00+00:00",
        "text": "Stub parsed article content.",
    }
    return _base_result(tool_name="parse_article", ok=True, payload=fake_article)
async def stub_extract_publication_date(article_text: str) -> dict[str, Any]:
    """Stub: pretend to extract a publication date from *article_text*."""
    fake_extraction = {
        "text_size": len(article_text),
        "published_at": "2026-01-01T10:00:00+00:00",
        "confidence": 0.77,
    }
    return _base_result(
        tool_name="extract_publication_date",
        ok=True,
        payload=fake_extraction,
    )
async def stub_rank_sources_by_date(items: list[dict[str, Any]]) -> dict[str, Any]:
    """Stub: order *items* by their 'published_at' string, earliest first."""

    def _date_key(entry: dict[str, Any]) -> str:
        # Missing dates sort first as empty strings.
        return str(entry.get("published_at", ""))

    ordered = sorted(items, key=_date_key)
    return _base_result(
        tool_name="rank_sources_by_date",
        ok=True,
        payload={"input_count": len(items), "ranked_items": ordered},
    )
async def stub_generate_summary(items: list[dict[str, Any]]) -> dict[str, Any]:
    """Stub: build a one-line Russian summary naming the first (earliest) item."""
    top_url = str(items[0].get("url", "")) if items else ""
    summary_text = (
        "По заглушечным данным самым ранним источником считается " + top_url
    )
    return _base_result(
        tool_name="generate_summary",
        ok=True,
        payload={"input_count": len(items), "summary": summary_text},
    )
# Registry mapping tool names to their async stub implementations; lets the
# workflow layer look tools up by name without real integrations wired in.
STUB_TOOLS: dict[str, Any] = {
    "search_news_sources": stub_search_news_sources,
    "parse_article": stub_parse_article,
    "extract_publication_date": stub_extract_publication_date,
    "rank_sources_by_date": stub_rank_sources_by_date,
    "generate_summary": stub_generate_summary,
}
+171
View File
@@ -0,0 +1,171 @@
from __future__ import annotations
import json
from typing import Any
from agno.workflow.step import Step, StepInput, StepOutput
from agno.workflow.workflow import Workflow
from src.stub_tools import (
stub_extract_publication_date,
stub_generate_summary,
stub_parse_article,
stub_rank_sources_by_date,
stub_search_news_sources,
)
# Process-wide lazily-built singleton; populated by get_news_source_workflow().
_workflow: Workflow | None = None
def _json_loads(raw: str | None) -> dict[str, Any]:
if not raw:
return {}
try:
parsed = json.loads(raw)
except json.JSONDecodeError:
return {}
if isinstance(parsed, dict):
return parsed
return {}
def _as_json_step_output(payload: dict[str, Any]) -> StepOutput:
    """Serialize *payload* to a JSON string and wrap it in a StepOutput."""
    serialized = json.dumps(payload, ensure_ascii=False)
    return StepOutput(content=serialized)
async def _search_news_sources_executor(step_input: StepInput) -> StepOutput:
    """First workflow step: find candidate sources for the input news URL."""
    url = str(step_input.input)
    search_result = await stub_search_news_sources(url=url)
    return _as_json_step_output(search_result)
async def _parse_article_executor(step_input: StepInput) -> StepOutput:
    """Parse every URL found by the search step, failing fast on the first error."""
    search_payload = _json_loads(step_input.previous_step_content)
    found_items = search_payload.get("payload", {}).get("items", [])

    collected: list[dict[str, Any]] = []
    for entry in found_items:
        parse_result = await stub_parse_article(url=str(entry.get("url", "")))
        if not parse_result.get("ok", False):
            # Abort the whole batch on any single parse failure.
            return StepOutput(content="parse_article failed", success=False)
        collected.append(parse_result.get("payload", {}))

    batch = {
        "tool_name": "parse_articles_batch",
        "ok": True,
        "payload": {"items": collected},
    }
    return _as_json_step_output(batch)
async def _extract_publication_date_executor(step_input: StepInput) -> StepOutput:
    """Attach an extracted publication date to each parsed article."""
    parse_payload = _json_loads(step_input.previous_step_content)
    articles = parse_payload.get("payload", {}).get("items", [])

    dated: list[dict[str, Any]] = []
    for article in articles:
        extraction = await stub_extract_publication_date(
            article_text=str(article.get("text", ""))
        )
        if not extraction.get("ok", False):
            # Abort the whole batch on any single extraction failure.
            return StepOutput(content="extract_publication_date failed", success=False)
        published_at = str(extraction.get("payload", {}).get("published_at", ""))
        dated.append(
            {
                "url": str(article.get("url", "")),
                "title": str(article.get("title", "")),
                "published_at": published_at,
            }
        )

    batch = {
        "tool_name": "extract_publication_date_batch",
        "ok": True,
        "payload": {"items": dated},
    }
    return _as_json_step_output(batch)
async def _rank_sources_by_date_executor(step_input: StepInput) -> StepOutput:
    """Rank the dated sources produced by the previous step, earliest first."""
    dated_payload = _json_loads(step_input.previous_step_content)
    dated_items = dated_payload.get("payload", {}).get("items", [])
    ranking = await stub_rank_sources_by_date(items=dated_items)
    return _as_json_step_output(ranking)
async def _generate_summary_executor(step_input: StepInput) -> StepOutput:
    """Final step: summarize the ranked sources from the previous step."""
    rank_payload = _json_loads(step_input.previous_step_content)
    ranked = rank_payload.get("payload", {}).get("ranked_items", [])
    summary = await stub_generate_summary(items=ranked)
    return _as_json_step_output(summary)
def get_news_source_workflow() -> Workflow:
    """Return the process-wide news-source-discovery workflow, building it lazily.

    The workflow runs five sequential steps backed by stub tools:
    search -> parse -> extract dates -> rank -> summarize.
    """
    global _workflow
    if _workflow is None:
        steps = [
            Step(
                name="search_news_sources",
                description="Find related source URLs for input news URL",
                executor=_search_news_sources_executor,
            ),
            Step(
                name="parse_articles_batch",
                description="Parse each found source URL",
                executor=_parse_article_executor,
            ),
            Step(
                name="extract_publication_date_batch",
                description="Extract publication date for each parsed article",
                executor=_extract_publication_date_executor,
            ),
            Step(
                name="rank_sources_by_date",
                description="Sort sources by publication date",
                executor=_rank_sources_by_date_executor,
            ),
            Step(
                name="generate_summary",
                description="Generate final workflow summary",
                executor=_generate_summary_executor,
            ),
        ]
        _workflow = Workflow(
            name="news_source_discovery_v1",
            description="Find earliest news source using sequential stub tools.",
            steps=steps,
        )
    return _workflow
async def run_news_source_workflow(
    input_url: str,
    scenario_id: str = "news_source_discovery_v1",
) -> dict[str, Any]:
    """Run the news-source-discovery workflow for *input_url*.

    Args:
        input_url: News article URL that seeds the search step.
        scenario_id: Identifier echoed back in the response envelope.

    Returns:
        A JSON-serializable dict with scenario/workflow metadata and the
        final step's content under "result"; includes "run_id"/"session_id"
        only when the run output actually carries them.
    """
    workflow = get_news_source_workflow()
    run_output = await workflow.arun(input=input_url)

    # The final step emits a JSON string; decode it, falling back to
    # wrapping undecodable text so the response stays a dict.
    content: Any = getattr(run_output, "content", {})
    if isinstance(content, str):
        try:
            content = json.loads(content)
        except json.JSONDecodeError:
            content = {"raw_content": content}

    response: dict[str, Any] = {
        "scenario_id": scenario_id,
        "workflow_name": workflow.name,
        # NOTE(review): status is hard-coded; a failed step (success=False)
        # would still be reported as "success" here — confirm this is intended.
        "status": "success",
        "input": {"url": input_url},
        "result": content,
    }

    # Fix: the previous hasattr + str(getattr(...)) pattern turned a None
    # run_id/session_id into the literal string "None"; emit only real ids.
    run_id = getattr(run_output, "run_id", None)
    if run_id is not None:
        response["run_id"] = str(run_id)
    session_id = getattr(run_output, "session_id", None)
    if session_id is not None:
        response["session_id"] = str(session_id)
    return response