from __future__ import annotations import os from datetime import datetime, timezone from typing import Any from mcp.server.fastmcp import FastMCP def _utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _base_result(tool_name: str, ok: bool, payload: dict[str, Any]) -> dict[str, Any]: return { "tool_name": tool_name, "ok": ok, "payload": payload, "received_at": _utc_now_iso(), } mcp = FastMCP( "prisma_mcp_stub", host=os.getenv("MCP_STUB_HOST", "0.0.0.0"), port=int(os.getenv("MCP_STUB_PORT", "8081")), streamable_http_path="/mcp", ) @mcp.tool() def search_news_sources(url: str) -> dict[str, Any]: return _base_result( tool_name="search_news_sources", ok=True, payload={ "input_url": str(url), "items": [ {"url": "https://news-a.example/article-1"}, {"url": "https://news-b.example/article-2"}, {"url": "https://news-c.example/article-3"}, ], }, ) @mcp.tool() def parse_article(url: str) -> dict[str, Any]: return _base_result( tool_name="parse_article", ok=True, payload={ "url": str(url), "title": "Stub article title", "published_at": "2026-01-01T10:00:00+00:00", "text": "Stub parsed article content.", }, ) @mcp.tool() def extract_publication_date(article_text: str) -> dict[str, Any]: return _base_result( tool_name="extract_publication_date", ok=True, payload={ "text_size": len(article_text), "published_at": "2026-01-01T10:00:00+00:00", "confidence": 0.77, }, ) @mcp.tool() def rank_sources_by_date(items: list[dict[str, Any]]) -> dict[str, Any]: ranked = sorted(items, key=lambda item: str(item.get("published_at", ""))) return _base_result( tool_name="rank_sources_by_date", ok=True, payload={ "input_count": len(items), "ranked_items": ranked, }, ) @mcp.tool() def generate_summary(items: list[dict[str, Any]]) -> dict[str, Any]: first_url = "" if items: first_url = str(items[0].get("url", "")) return _base_result( tool_name="generate_summary", ok=True, payload={ "input_count": len(items), "summary": "По заглушечным данным самым ранним источником считается " + first_url, }, ) if __name__ == "__main__": mcp.run(transport="streamable-http")