forge-embed-mcp/server.py

226 lines
6.9 KiB
Python
Executable file
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env -S uv run --quiet --script
# /// script
# requires-python = ">=3.11"
# dependencies = [
# "mcp[cli]>=1.0.0",
# "httpx",
# ]
# ///
"""
forge-embed MCP server.
Exposes semantic search over forge-embed namespaces to Claude Code.
Configuration (env vars):
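FORGE_EMBED_URL — forge-embed API URL (default: https://embed.ideeealprojects.com)
FORGE_EMBED_TOKEN — optional bearer token sent in the Authorization header (default: empty, no auth header)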
"""
import os
from typing import Optional
import httpx
from mcp.server.fastmcp import FastMCP
# Base URL of the forge-embed API; override with the FORGE_EMBED_URL env var.
FORGE_URL = os.environ.get("FORGE_EMBED_URL", "https://embed.ideeealprojects.com")
# Optional bearer token read from FORGE_EMBED_TOKEN; an empty string means "no auth".
FORGE_TOKEN = os.environ.get("FORGE_EMBED_TOKEN", "")
# Headers passed to every httpx client in this file; empty when no token is configured.
HEADERS = {"Authorization": f"Bearer {FORGE_TOKEN}"} if FORGE_TOKEN else {}
# FastMCP server instance; functions are registered on it via @mcp.tool().
mcp = FastMCP("forge-embed")
@mcp.tool()
async def list_namespaces() -> str:
    """
    List all available namespaces in forge-embed with their entry counts.
    Use this to discover what projects are indexed and available for search.
    """
    # One short-lived client per call; a non-2xx answer raises before parsing.
    async with httpx.AsyncClient(timeout=10, headers=HEADERS) as http:
        response = await http.get(f"{FORGE_URL}/namespaces")
        response.raise_for_status()
        namespaces = response.json()

    # Alphabetical listing; namespaces reporting zero entries are left out.
    rows = [
        f" {name}: {count} entries"
        for name, info in sorted(namespaces.items())
        if (count := info.get("entries", 0)) > 0
    ]
    return "\n".join(["Available namespaces:", *rows])
@mcp.tool()
async def search(
    namespace: str,
    query: str,
    top_k: int = 5,
    layer: Optional[str] = None,
) -> str:
    """
    Semantic search within a single project namespace.

    Args:
        namespace: Project namespace (e.g. "1c-buh", "cloudpos", "servicebitrix")
        query: Search query in natural language (any language)
        top_k: Number of results to return (default 5, clamped to the range 1..20)
        layer: Filter by layer type. Options:
            - "profile": only object profiles (for "what attributes does X have?" queries)
            - "code": only code chunks (for "how does X work?" queries)
            - "relationship": only relationships (for "what uses X?" queries)
            - None: search all layers (default)

    Returns formatted results with scores, paths, and text snippets.

    Raises:
        httpx.HTTPStatusError: if the API responds with a 4xx/5xx status.

    Examples:
        search("1c-buh", "какие реквизиты у документа реализация", layer="profile")
        search("1c-buh", "как рассчитывается НДС")
        search("cloudpos", "authentication token dev mode")
    """
    payload = {
        "namespace": namespace,
        "query": query,
        # Clamp on both sides: zero or negative values are meaningless as a
        # result count and must not be forwarded to the API.
        "top_k": max(1, min(top_k, 20)),
    }
    if layer:
        payload["metadata_filter"] = {"layer": layer}

    async with httpx.AsyncClient(timeout=30, headers=HEADERS) as client:
        resp = await client.post(f"{FORGE_URL}/search", json=payload)
        resp.raise_for_status()
        data = resp.json()

    # `or` fallbacks (here and below) also cover keys that are present but
    # null in the JSON, which `.get(key, default)` alone does not.
    results = data.get("results") or []
    if not results:
        return f"No results found in '{namespace}' for query: {query}"

    lines = [
        f"Found {len(results)} results in '{namespace}' (total indexed: {data.get('total_indexed', 0)})",
        "",
    ]
    for i, r in enumerate(results, 1):
        meta = r.get("metadata") or {}
        r_layer = meta.get("layer", "?")
        score = r.get("score") or 0
        rid = r.get("id", "")
        text = r.get("text") or ""
        lines.append(f"--- Result {i} ---")
        lines.append(f"Score: {score:.3f} | Layer: {r_layer}")
        lines.append(f"ID: {rid}")
        if path := meta.get("path"):
            lines.append(f"Path: {path}")
        if warnings := meta.get("warnings"):
            # A bare string would otherwise be joined character by character,
            # and non-string items would make str.join raise.
            if isinstance(warnings, str):
                warnings = [warnings]
            lines.append(f"Warnings: {', '.join(str(w) for w in warnings)}")
        lines.append("")
        # Cap each snippet so one long chunk cannot dominate the tool output.
        lines.append(text[:1500])
        lines.append("")
    return "\n".join(lines)
@mcp.tool()
async def cross_search(
    query: str,
    top_k: int = 5,
    namespaces: Optional[list[str]] = None,
) -> str:
    """
    Semantic search across multiple project namespaces.
    Useful for finding similar issues, patterns, or solutions across projects.

    Args:
        query: Search query in natural language
        top_k: Number of results to return (default 5, clamped to the range 1..20)
        namespaces: Specific namespaces to search. If None, searches all.

    Returns formatted results with namespace, score, layer, ID, and a text snippet.

    Raises:
        httpx.HTTPStatusError: if the API responds with a 4xx/5xx status.

    Example:
        cross_search("authentication token not returned")
        cross_search("проведение документа", namespaces=["1c-buh"])
    """
    payload = {
        "query": query,
        # Clamp on both sides: zero or negative values are meaningless as a
        # result count and must not be forwarded to the API.
        "top_k": max(1, min(top_k, 20)),
    }
    # The key is omitted for None or an empty list, so no namespace
    # restriction is sent in either case.
    if namespaces:
        payload["namespaces"] = namespaces

    async with httpx.AsyncClient(timeout=30, headers=HEADERS) as client:
        resp = await client.post(f"{FORGE_URL}/cross-search", json=payload)
        resp.raise_for_status()
        data = resp.json()

    # `or` fallbacks (here and below) also cover keys that are present but
    # null in the JSON, which `.get(key, default)` alone does not.
    results = data.get("results") or []
    if not results:
        return f"No results found for query: {query}"

    lines = [f"Cross-project results for: {query}", ""]
    for i, r in enumerate(results, 1):
        ns = r.get("namespace", "?")
        meta = r.get("metadata") or {}
        r_layer = meta.get("layer", "?")
        score = r.get("score") or 0
        rid = r.get("id", "")
        text = r.get("text") or ""
        lines.append(f"--- Result {i} [{ns}] ---")
        lines.append(f"Score: {score:.3f} | Layer: {r_layer}")
        lines.append(f"ID: {rid}")
        lines.append("")
        # Cap each snippet so one long chunk cannot dominate the tool output.
        lines.append(text[:1000])
        lines.append("")
    return "\n".join(lines)
@mcp.tool()
async def record_outcome(
    namespace: str,
    outcome_type: str,
    task: str,
    approach: str,
    result: str,
    user: str = "",
    level: str = "",
    tags: Optional[list[str]] = None,
    context: str = "",
) -> str:
    """
    Record a task outcome (win/fail/pivot) in forge-embed for future learning.
    Call this after completing a task to build a knowledge base of what works.

    Args:
        namespace: Project namespace
        outcome_type: "win" (success), "fail" (failure), "pivot" (changed approach)
        task: Short description of the task
        approach: What was tried
        result: What happened
        user: Who did it (optional)
        level: User skill level (observer/communicator/operator/architect)
        tags: List of tags for categorization
        context: Additional context about the situation

    Example:
        record_outcome("1c-buh", "win",
            task="Add new calculation procedure for НДС",
            approach="Created extension module with ОбщегоНазначения reuse",
            result="Tests passed, deployed to staging")
    """
    # Arguments are forwarded as given; a missing/empty tag list is sent as [].
    body = dict(
        namespace=namespace,
        outcome_type=outcome_type,
        task=task,
        approach=approach,
        result=result,
        user=user,
        level=level,
        tags=tags if tags else [],
        context=context,
    )

    async with httpx.AsyncClient(timeout=30, headers=HEADERS) as http:
        response = await http.post(f"{FORGE_URL}/outcome", json=body)
        response.raise_for_status()
        # Report whatever "id" the API returned for the stored outcome.
        outcome_id = response.json().get("id")

    return f"Outcome recorded: {outcome_id}"
# Start the FastMCP server only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    mcp.run()