init commit

This commit is contained in:
bracesproul
2025-05-09 16:19:52 -07:00
commit f0af7ec3c0
13 changed files with 2243 additions and 0 deletions
+3
View File
@@ -0,0 +1,3 @@
ANTHROPIC_API_KEY=...
TAVILY_API_KEY=...
OPENAI_API_KEY=...
+6
View File
@@ -0,0 +1,6 @@
.env
.ipynb_checkpoints
.langgraph-data
__pycache__
.langgraph_api/
*.egg-info/
+4
View File
@@ -0,0 +1,4 @@
# Open Agent Platform LangGraph Tools Agent
This is a LangGraph agent that can be used with the Open Agent Platform.
+12
View File
@@ -0,0 +1,12 @@
{
"dependencies": ["."],
"graphs": {
"agent": "./tools_agent/agent.py:graph"
},
"env": ".env",
"python_version": "3.11",
"auth": {
"path": "./tools_agent/security/auth.py:auth"
}
}
+23
View File
@@ -0,0 +1,23 @@
[project]
name = "tools_agent"
version = "0.1.0"
description = "LangGraph tools agent with MCP and a RAG tool"
authors = [
{ name = "langchain-ai" },
]
requires-python = ">=3.11.0,<3.13"
dependencies = [
"langgraph==0.4.3",
"langgraph-cli==0.2.10",
"langchain-anthropic==0.3.13",
"langchain-core==0.3.59",
"langchain-openai==0.3.16",
"pydantic==2.11.3",
"langchain==0.3.25",
"langchain-mcp-adapters==0.0.11",
"langgraph-api==0.2.20",
"supabase>=2.15.1",
]
[tool.setuptools]
packages = ["tools_agent"]
Binary file not shown.

After

Width:  |  Height:  |  Size: 131 KiB

+5
View File
@@ -0,0 +1,5 @@
from .agent import graph
__all__ = ["graph"]
__version__ = "0.1.0"
+172
View File
@@ -0,0 +1,172 @@
from langchain_core.runnables import RunnableConfig
from typing import Optional, List
from pydantic import BaseModel, Field
from langgraph.prebuilt import create_react_agent
from tools_agent.utils.tools import create_rag_tool
from langchain.chat_models import init_chat_model
from langchain_mcp_adapters.client import MultiServerMCPClient
from tools_agent.utils.token import fetch_tokens
class RagConfig(BaseModel):
rag_url: Optional[str] = None
"""The URL of the rag server"""
collection: Optional[str] = None
"""The collection to use for rag"""
class MCPConfig(BaseModel):
url: Optional[str] = Field(
default=None,
optional=True,
)
"""The URL of the MCP server"""
tools: Optional[List[str]] = Field(
default=None,
optional=True,
)
"""The tools to make available to the LLM"""
class GraphConfigPydantic(BaseModel):
model_name: Optional[str] = Field(
default="anthropic:claude-3-7-sonnet-latest",
metadata={
"x_lg_ui_config": {
"type": "select",
"default": "anthropic:claude-3-7-sonnet-latest",
"description": "The model to use in all generations",
"options": [
{
"label": "Claude 3.7 Sonnet",
"value": "anthropic:claude-3-7-sonnet-latest",
},
{
"label": "Claude 3.5 Sonnet",
"value": "anthropic:claude-3-5-sonnet-latest",
},
{"label": "GPT 4o", "value": "openai:gpt-4o"},
{"label": "GPT 4o mini", "value": "openai:gpt-4o-mini"},
{"label": "GPT 4.1", "value": "openai:gpt-4.1"},
],
}
}
)
temperature: Optional[float] = Field(
default=0.7,
metadata={
"x_lg_ui_config": {
"type": "slider",
"default": 0.7,
"min": 0,
"max": 2,
"step": 0.1,
"description": "Controls randomness (0 = deterministic, 2 = creative)",
}
}
)
max_tokens: Optional[int] = Field(
default=4000,
metadata={
"x_lg_ui_config": {
"type": "number",
"default": 4000,
"min": 1,
"description": "The maximum number of tokens to generate",
}
}
)
system_prompt: Optional[str] = Field(
default=None,
metadata={
"x_lg_ui_config": {
"type": "textarea",
"placeholder": "Enter a system prompt...",
"description": "The system prompt to use in all generations",
}
}
)
mcp_config: Optional[MCPConfig] = Field(
default=None,
optional=True,
metadata={
"x_lg_ui_config": {
"type": "mcp",
# Here is where you would set the default tools.
# "default": {
# "tools": ["Math_Divide", "Math_Mod"]
# }
}
}
)
rag: Optional[RagConfig] = Field(
default=None,
optional=True,
metadata={
"x_lg_ui_config": {
"type": "rag",
# Here is where you would set the default collection.
# "default": {
# "collections": ["python", "langgraph docs"]
# }
}
}
)
async def graph2(config: RunnableConfig):
cfg = GraphConfigPydantic(**config.get("configurable", {}))
tools = []
# Add RAG tool if configured
if cfg.rag and cfg.rag.rag_url and cfg.rag.collection:
rag_tool = create_rag_tool(cfg.rag.rag_url, cfg.rag.collection)
print("\n\n\n\nFOUND RAG TOOL!!!\n\n\n\n")
tools.append(rag_tool)
# Add MCP tools if configured
if cfg.mcp_config and cfg.mcp_config.url:
mcp_tokens = fetch_tokens(config)
if not mcp_tokens:
print("Failed to fetch MCP tokens")
return
# Create MCP client and connect to the server
async with MultiServerMCPClient() as mcp_client:
# Connect to the server using SSE transport
await mcp_client.connect_to_server(
"mcp_server",
transport="streamable_http",
url=cfg.mcp_config.url,
headers={
"Authorization": f"Bearer {mcp_tokens['access_token']}"
}
)
# Get all tools from the server
all_mcp_tools = mcp_client.get_tools()
# Filter tools based on the config
if cfg.mcp_config.tools:
# Only include tools specified in the config
filtered_tools = []
for tool in all_mcp_tools:
if tool.name in cfg.mcp_config.tools:
filtered_tools.append(tool)
tools.extend(filtered_tools)
else:
# If no specific tools are specified, include all tools
tools.extend(all_mcp_tools)
# Initialize the model
model = init_chat_model(cfg.model_name, temperature=cfg.temperature, max_tokens=cfg.max_tokens)
print("RETURNING AGENT")
return create_react_agent(
prompt=cfg.system_prompt,
model=model,
tools=tools,
config_schema=GraphConfigPydantic,
)
graph = create_react_agent(
model="openai:gpt-4o",
tools=[],
config_schema=GraphConfigPydantic,
)
+141
View File
@@ -0,0 +1,141 @@
import os
import asyncio
from langgraph_sdk import Auth
from supabase import create_client, Client
from typing import Optional, Any
supabase_url = os.environ.get("SUPABASE_URL")
supabase_key = os.environ.get("SUPABASE_KEY")
supabase: Optional[Client] = None
if supabase_url and supabase_key:
supabase = create_client(supabase_url, supabase_key)
# The "Auth" object is a container that LangGraph will use to mark our authentication function
auth = Auth()
# The `authenticate` decorator tells LangGraph to call this function as middleware
# for every request. This will determine whether the request is allowed or not
@auth.authenticate
async def get_current_user(authorization: str | None) -> Auth.types.MinimalUserDict:
"""Check if the user's JWT token is valid using Supabase."""
# Ensure we have authorization header
if not authorization:
raise Auth.exceptions.HTTPException(
status_code=401, detail="Authorization header missing"
)
# Parse the authorization header
try:
scheme, token = authorization.split()
assert scheme.lower() == "bearer"
except (ValueError, AssertionError):
raise Auth.exceptions.HTTPException(
status_code=401, detail="Invalid authorization header format"
)
# Ensure Supabase client is initialized
if not supabase:
raise Auth.exceptions.HTTPException(
status_code=500, detail="Supabase client not initialized"
)
try:
# Verify the JWT token with Supabase using asyncio.to_thread to avoid blocking
# This will decode and verify the JWT token in a separate thread
async def verify_token() -> dict[str, Any]:
response = await asyncio.to_thread(supabase.auth.get_user, token)
return response
response = await verify_token()
user = response.user
if not user:
raise Auth.exceptions.HTTPException(
status_code=401, detail="Invalid token or user not found"
)
# Return user info if valid
return {
"identity": user.id,
}
except Exception as e:
# Handle any errors from Supabase
raise Auth.exceptions.HTTPException(
status_code=401, detail=f"Authentication error: {str(e)}"
)
@auth.on.threads.create
async def on_thread_create(
ctx: Auth.types.AuthContext,
value: Auth.types.on.threads.create.value,
):
"""Add owner when creating threads.
This handler runs when creating new threads and does two things:
1. Sets metadata on the thread being created to track ownership
2. Returns a filter that ensures only the creator can access it
"""
# Add owner metadata to the thread being created
# This metadata is stored with the thread and persists
metadata = value.setdefault("metadata", {})
metadata["owner"] = ctx.user.identity
# Return filter to restrict access to just the creator
return {"owner": ctx.user.identity}
@auth.on.threads.read
@auth.on.threads.delete
@auth.on.threads.update
@auth.on.threads.search
async def on_thread_read(
ctx: Auth.types.AuthContext,
value: Auth.types.on.threads.read.value,
):
"""Only let users read their own threads.
This handler runs on read operations. We don't need to set
metadata since the thread already exists - we just need to
return a filter to ensure users can only see their own threads.
"""
return {"owner": ctx.user.identity}
@auth.on.assistants.create
async def on_assistants_create(
ctx: Auth.types.AuthContext,
value: Auth.types.on.assistants.create.value,
):
# Add owner metadata to the assistant being created
# This metadata is stored with the assistant and persists
metadata = value.setdefault("metadata", {})
metadata["owner"] = ctx.user.identity
# Return filter to restrict access to just the creator
return {"owner": ctx.user.identity}
@auth.on.assistants.read
@auth.on.assistants.delete
@auth.on.assistants.update
@auth.on.assistants.search
async def on_assistants_read(
ctx: Auth.types.AuthContext,
value: Auth.types.on.assistants.read.value,
):
"""Only let users read their own assistants.
This handler runs on read operations. We don't need to set
metadata since the assistant already exists - we just need to
return a filter to ensure users can only see their own assistants.
"""
return {"owner": ctx.user.identity}
@auth.on.store()
async def authorize_store(ctx: Auth.types.AuthContext, value: dict):
# The "namespace" field for each store item is a tuple you can think of as the directory of an item.
namespace: tuple = value["namespace"]
assert namespace[0] == ctx.user.identity, "Not authorized"
View File
+118
View File
@@ -0,0 +1,118 @@
import logging
import os
import requests
from typing import Dict, Optional, Any
from langgraph.store.base import BaseStore
from langchain_core.runnables import RunnableConfig
from langgraph.config import get_store
def get_mcp_access_token(
supabase_token: str,
base_token_exchange_url: str,
) -> Optional[Dict[str, Any]]:
"""
Exchange a Supabase token for an MCP access token.
Args:
supabase_token: The Supabase token to exchange
base_token_exchange_url: The base URL for the token exchange service
Returns:
The token data as a dictionary if successful, None otherwise
"""
try:
# Exchange Supabase token for MCP access token
form_data = {
"client_id": "mcp_default",
"subject_token": supabase_token,
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"resource": f"{base_token_exchange_url}/mcp",
"subject_token_type": "urn:ietf:params:oauth:token-type:access_token",
}
token_response = requests.post(
f"{base_token_exchange_url}/oauth/token",
headers={"Content-Type": "application/x-www-form-urlencoded"},
data=form_data
)
if token_response.status_code == 200:
token_data = token_response.json()
return token_data
else:
logging.error(f"Token exchange failed: {token_response.text}")
except Exception as e:
logging.error(f"Error during token exchange: {e}")
return None
def get_tokens(config: RunnableConfig) :
store = get_store()
thread_id = config.get("configurable", {}).get("thread_id")
if not thread_id:
print("Thread ID not found in config")
return None
user_id = config.get("metadata", {}).get("owner")
if not user_id:
print("User ID not found in metadata")
return None
tokens = store.get([user_id, "tokens", thread_id], "data")
if not tokens:
return None
return tokens.value
def set_tokens(config: RunnableConfig, tokens: dict[str, Any]):
store = get_store()
thread_id = config.get("configurable", {}).get("thread_id")
if not thread_id:
print("Thread ID not found in config")
return
user_id = config.get("metadata", {}).get("owner")
if not user_id:
print("User ID not found in metadata")
return
store.put([user_id, "tokens", thread_id], "data", tokens)
return
def fetch_tokens(config: RunnableConfig) -> dict[str, Any]:
"""
Fetch MCP access token if it doesn't already exist in the store.
Args:
config: The runnable configuration
Raises:
ValueError: If required configuration is missing
"""
base_mcp_token_exchange_url = os.environ.get("BASE_MCP_TOKEN_EXCHANGE_URL")
if not base_mcp_token_exchange_url:
print("MCP token exchange URL is required")
return None
current_tokens = get_tokens(config)
if current_tokens:
return current_tokens
supabase_token = config.get("x-supabase-access-token")
if not supabase_token:
print("Supabase access token is required")
return None
mcp_config = config.get("configurable", {}).get("mcp_config")
if not mcp_config:
print("MCP config is required")
return None
mcp_tokens = get_mcp_access_token(
supabase_token,
base_mcp_token_exchange_url
)
set_tokens(config, mcp_tokens)
return mcp_tokens
+78
View File
@@ -0,0 +1,78 @@
from langchain_core.tools import StructuredTool
import requests
from pydantic import BaseModel, Field
def create_rag_tool(rag_url: str, collection: str):
"""Create a RAG tool for a specific collection.
Args:
rag_url: The base URL for the RAG API server
collection: The name of the collection to query
Returns:
A structured tool that can be used to query the RAG collection
"""
if rag_url.endswith('/'):
rag_url = rag_url[:-1]
collection_endpoint = f"{rag_url}/collections/{collection}"
try:
response = requests.get(collection_endpoint)
response.raise_for_status()
collection_data = response.json()
collection_name = collection_data.get("name", collection)
raw_description = collection_data.get("metadata", {}).get("description")
if not raw_description:
collection_description = "Search your collection of documents for results semantically similar to the input query"
else:
collection_description = f"Search your collection of documents for results semantically similar to the input query. Collection description: {raw_description}"
def get_documents(query: str) -> str:
"""Search for documents in the collection based on the query.
Args:
query: The search query string
Returns:
A formatted string containing all documents in XML-like format
"""
search_endpoint = f"{rag_url}/collections/{collection}/documents/search"
payload = {
"query": query,
"limit": 10
}
try:
search_response = requests.post(search_endpoint, json=payload)
search_response.raise_for_status()
documents = search_response.json()
formatted_docs = "<all-documents>\n"
for doc in documents:
doc_id = doc.get("id", "unknown")
content = doc.get("content", "")
formatted_docs += f" <document id=\"{doc_id}\">\n {content}\n </document>\n"
formatted_docs += "</all-documents>"
return formatted_docs
except Exception as e:
return f"<all-documents>\n <error>{str(e)}</error>\n</all-documents>"
class SearchArgs(BaseModel):
query: str = Field(description="The search query to find relevant documents")
return StructuredTool.from_function(
func=get_documents,
name=collection_name,
description=collection_description,
args_schema=SearchArgs,
return_direct=True,
)
except Exception as e:
raise Exception(f"Failed to create RAG tool: {str(e)}")
Generated
+1681
View File
File diff suppressed because it is too large Load Diff