tools = [
{
"name": "list_events",
"description": "List events in the user's Google Calendar",
"input_schema": {
"type": "object",
"properties": {
"calendar_id": {
"type": "string",
"description": "The ID of the calendar to list events from",
},
"days": {
"type": "number",
"description": "The number of days to list events for",
},
"max_results": {
"type": "number",
"description": "The maximum number of events to return",
},
},
"required": [],
},
},
{
"name": "create_event",
"description": "Create an event in the user's Google Calendar",
"input_schema": {
"type": "object",
"properties": {
"calendar_id": {
"type": "string",
"description": "The ID of the calendar to create the event in",
},
"summary": {
"type": "string",
"description": "The summary of the event",
},
"start_datetime": {
"type": "string",
"description": "The start date and time of the event",
},
"end_datetime": {
"type": "string",
"description": "The end date and time of the event",
},
"description": {
"type": "string",
"description": "The description of the event",
},
"location": {
"type": "string",
"description": "The location of the event",
},
"attendees": {
"type": "array",
"description": "The attendees of the event",
"items": {
"type": "string",
"description": "The email addresses of the attendee",
},
},
},
"required": ["summary", "start_datetime", "end_datetime"],
},
},
]
my_config = {
"GUMLOOP_GUMCP_API_KEY": "",
"GUMLOOP_GUMCP_SERVER_ID": "",
"ANTHROPIC_API_KEY": "",
}
gumloop_gumcp_api_key = my_config["GUMLOOP_GUMCP_API_KEY"]
gumloop_gumcp_server_id = my_config["GUMLOOP_GUMCP_SERVER_ID"]
anthropic_api_key = my_config["ANTHROPIC_API_KEY"]
gumcp_url = f"https://mcp.gumloop.com/gcalendar/{gumloop_gumcp_server_id}%3A{gumloop_gumcp_api_key}"
# @title MCP client
# Based on : https://github.com/gumloop/guMCP/blob/main/tests/clients/RemoteMCPTestClient.py
import asyncio
import argparse
import traceback
from typing import Optional, List, Dict, Any
from contextlib import AsyncExitStack
from anthropic import Anthropic
from mcp import ClientSession
from mcp.client.sse import sse_client
class RemoteMCPClient:
def __init__(self):
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.llm_client = Anthropic(api_key=anthropic_api_key)
async def connect_to_server(self, sse_endpoint: str):
"""Connect to a remote MCP server via SSE
Args:
sse_endpoint: Full SSE endpoint URL (e.g., "http://localhost:8000/simple-tools-server")
"""
print(f"Connecting to server at {sse_endpoint}")
read_stream, write_stream = await self.exit_stack.enter_async_context(
sse_client(sse_endpoint)
)
self.session = await self.exit_stack.enter_async_context(
ClientSession(read_stream, write_stream)
)
print("Initializing Client Session...")
await self.session.initialize()
print("Session initialized!")
print("Listing tools...")
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])
async def list_tools(self) -> None:
"""List all available tools from the server"""
if not self.session:
raise ValueError("Session not initialized")
try:
return await self.session.list_tools()
except Exception as e:
print(f"Error listing tools: {e}")
async def list_resources(self) -> None:
"""List all available resources from the server"""
if not self.session:
raise ValueError("Session not initialized")
try:
return await self.session.list_resources()
except Exception as e:
print(f"Error listing resources: {e}")
print(f"Stacktrace: {traceback.format_exc()}")
async def read_resource(self, uri: str) -> None:
"""Read a specific resource from the server
Args:
uri: URI of the resource to read
"""
if not self.session:
raise ValueError("Session not initialized")
try:
return await self.session.read_resource(uri)
except Exception as e:
print(f"Error reading resource: {e}")
async def cleanup(self):
"""Clean up resources"""
# TODO: Fix errors during cleanup when running test clients
try:
# Close the session first if it exists
print("Closing session...")
if self.session:
# Create a detached task for session cleanup if needed
if hasattr(self.session, "close"):
await self.session.close()
self.session = None
# Then close the exit stack
print("Closing exit stack...")
if self.exit_stack:
# Manually close each context in the stack to avoid task context issues
while True:
try:
# Pop and close each context manager one by one
cm = self.exit_stack._exit_callbacks.pop()
await cm(None, None, None)
except IndexError:
# No more callbacks
break
except Exception as e:
print(f"\nError during cleanup: {e}")
except Exception as e:
print(f"Cleanup error: {e}")
# @title Chatbot
class Chatbot:
def __init__(self, api_key: str, mcp_url: str):
self.llm_client = Anthropic(api_key=api_key)
self.llm_model = "claude-3-7-sonnet-latest"
self.mcp_url = mcp_url
self.mcp_client = None
async def process_query(self, query: str) -> str:
"""Process a query using Claude and available tools"""
messages: List[Dict[str, Any]] = [{"role": "user", "content": query}]
if self.mcp_client.session is None:
raise ValueError("Session not initialized")
response = await self.mcp_client.session.list_tools()
available_tools = [
{
"name": tool.name,
"description": tool.description,
"input_schema": tool.inputSchema,
}
for tool in response.tools
]
# Initial Claude API call
response = self.llm_client.messages.create(
model=self.llm_model,
max_tokens=1000,
messages=messages,
tools=available_tools,
)
# Process response and handle tool calls
final_text = []
assistant_message_content = []
for content in response.content:
if content.type == "text":
final_text.append(content.text)
assistant_message_content.append(content)
elif content.type == "tool_use":
tool_name = content.name
tool_args = content.input
# Execute tool call
result = await self.mcp_client.session.call_tool(tool_name, tool_args)
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")
print(f"Tool Call Result: {result}")
assistant_message_content.append(content)
messages.append(
{"role": "assistant", "content": assistant_message_content}
)
messages.append(
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": content.id,
"content": result.content,
}
],
}
)
# Get next response from Claude
response = self.llm_client.messages.create(
model=self.llm_model,
max_tokens=1000,
messages=messages,
tools=available_tools,
)
if (
response.content
and len(response.content) > 0
and hasattr(response.content[0], "text")
):
final_text.append(response.content[0].text)
else:
for content_item in result.content:
if hasattr(content_item, "text"):
final_text.append(content_item.text)
return "\n".join(final_text)
async def chat_loop(self):
"""Run an interactive chat loop"""
print("Chat loop started!")
print("Type your queries or 'quit' to exit.")
while True:
try:
query = input("\nQuery: ").strip()
if query.lower() == "quit":
break
response = await self.process_query(query)
print("\nASSISTANT RESPONSE:\n" + response)
except Exception as e:
print(f"\nError: {str(e)}")
async def run_chatbot(self):
self.mcp_client = RemoteMCPClient()
try:
await self.mcp_client.connect_to_server(sse_endpoint=self.mcp_url)
await self.chat_loop()
finally:
await self.mcp_client.cleanup()
test_mcp_client = RemoteMCPClient()
await test_mcp_client.connect_to_server(sse_endpoint=gumcp_url)
import json
available_tools = await test_mcp_client.list_tools()
if available_tools and available_tools.tools:
print("Number of tools", len(available_tools.tools))
for item in available_tools.tools:
print("\nTool name:", item.name)
print("Tool description:", item.description)
input_schema = item.inputSchema
if input_schema:
print("Tool input schema:", json.dumps(input_schema, indent=4))
available_resources = await test_mcp_client.list_resources()
if available_resources and available_resources.resources:
print("Number of resources", len(available_resources.resources))
for item in available_resources.resources:
print(item)
chatbot = Chatbot(anthropic_api_key, gumcp_url)
await chatbot.run_chatbot()