Refactor thread cleanup tool to use LangGraph SDK

- Replace aiohttp with official langgraph-sdk for thread operations
- Add accurate run count tracking via client.runs.list()
- Optimize API calls by caching thread categorization (16x reduction)
- Update README with current functionality and examples
- Improve error handling and user experience
This commit is contained in:
Liam Bush
2025-10-07 13:51:13 -07:00
parent cf579b3f4c
commit ae4edf523a
2 changed files with 126 additions and 174 deletions
+30 -20
View File
@@ -5,13 +5,13 @@ Interactive cleanup utility for managing LangGraph threads across deployments. P
## Prerequisites
- Python 3.7+
- `aiohttp` library
- `langgraph-sdk` library
## Installation
Install required dependency:
```bash
pip install aiohttp
pip install langgraph-sdk
```
## Usage
@@ -90,31 +90,30 @@ Your API key must have permissions to:
- Check that the URL matches your actual LangGraph deployment
- Ensure the API key belongs to the correct organization/tenant
### "None of the thread endpoints worked"
- Confirm your LangGraph server is running and accessible
- Verify the URL format is correct
- Check API key permissions
- Ensure network connectivity to the deployment
### Connection Issues
- Verify the deployment URL is reachable
- Check for any firewall or network restrictions
- Confirm the API key is valid and not expired
- Ensure your LangGraph server is running and accessible
## Example Session
```
Discovering threads...
Connecting to: https://your-deployment.us.langgraph.app
Found working endpoint: POST /threads/search
Found: 15 threads
Total threads found: 15
By Status:
├─ idle: 10
├─ success: 3
├─ error: 2
├─ completed: 3
├─ failed: 2
By Runs:
├─ 0 runs: 5
├─ 1 run: 7
├─ 3 runs: 3
What would you like to delete?
1. Delete by TIME
@@ -127,19 +126,30 @@ What would you like to delete?
Select option (1-7): 2
Delete by STATUS
Select Status:
1. idle (10 threads)
2. success (3 threads)
3. error (2 threads)
4. Review all status categories
5. Back to main menu
2. completed (3 threads)
3. failed (2 threads)
4. Go back to main menu
Select option (1-5): 1
Select status (1-4): 1
You're about to delete 10 idle threads. This cannot be undone!
Do you want to continue? (yes/no): yes
Found 10 threads with status "idle".
Successfully deleted 10 threads
Do you want to:
1. Review threads before deleting
2. Delete immediately
3. Go back to status menu
Select option (1-3): 2
Deleting 10 threads...
Are you sure you want to delete 10 threads? (yes/no): yes
Deleted: 10/10
Summary: 10 deleted, 0 failed
Cleanup completed. Total threads deleted: 10
```
## Support
+96 -154
View File
@@ -8,26 +8,24 @@ Provides better observability and control over what gets deleted
import sys
import argparse
import asyncio
import aiohttp
import json
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from urllib.parse import urlparse
from langgraph_sdk import get_client
class ThreadCleanup:
def __init__(self, base_url: str, api_key: Optional[str] = None):
self.base_url = base_url
self.api_key = api_key
self.headers = {'Content-Type': 'application/json'}
if api_key:
self.headers['X-Api-Key'] = api_key
self.client = get_client(url=base_url, api_key=api_key)
def ask_question(self, question: str) -> str:
"""Ask user for input"""
return input(question)
def categorize_threads(self, threads: List[Dict]) -> Dict:
async def categorize_threads(self, threads: List[Dict]) -> Dict:
"""Categorize threads by status, runs, and graph ID"""
categories = {
'byGraph': {},
@@ -35,8 +33,23 @@ class ThreadCleanup:
'byRuns': {}
}
for thread in threads:
run_count = len(thread.get('runs', []))
print('\n🔍 Fetching run counts...')
# Fetch run counts for all threads
for i, thread in enumerate(threads):
thread_id = thread.get('thread_id')
# Get run count from API
try:
runs = await self.client.runs.list(thread_id)
run_count = len(runs) if runs else 0
except:
run_count = 0
# Store run count in thread for later display
thread['_run_count'] = run_count
print(f'Analyzing thread {i+1}/{len(threads)}...', end='\r')
status = thread.get('status', 'unknown')
# Graph categorization
@@ -69,6 +82,7 @@ class ThreadCleanup:
categories['byRuns'][runs_category] = []
categories['byRuns'][runs_category].append(thread)
print(' ' * 50, end='\r') # Clear the progress line
# Add allThreads for easy access
categories['allThreads'] = threads
return categories
@@ -84,7 +98,7 @@ class ThreadCleanup:
pass
status = thread.get('status', 'unknown')
run_count = len(thread.get('runs', []))
run_count = thread.get('_run_count', 0)
metadata = json.dumps(thread.get('metadata', {})) if thread.get('metadata') else 'None'
return f""" ID: {thread.get('thread_id', 'Unknown')}
@@ -142,19 +156,17 @@ class ThreadCleanup:
choice = self.ask_question('\nSelect option (1-7): ')
if choice == '1':
return await self.select_by_time(all_threads)
return await self.select_by_time(all_threads, categories)
elif choice == '2':
return await self.select_by_status(categories['byStatus'], all_threads)
return await self.select_by_status(categories['byStatus'], all_threads, categories)
elif choice == '3':
categories_with_runs = self.categorize_threads(all_threads)
return await self.select_by_runs(categories_with_runs['byRuns'], all_threads)
return await self.select_by_runs(categories['byRuns'], all_threads, categories)
elif choice == '4':
categories_with_graph = self.categorize_threads(all_threads)
return await self.select_by_graph(categories_with_graph['byGraph'], all_threads)
return await self.select_by_graph(categories['byGraph'], all_threads, categories)
elif choice == '5':
return await self.preview_all_threads(all_threads)
return await self.preview_all_threads(all_threads, categories)
elif choice == '6':
return await self.confirm_delete_all(all_threads)
return await self.confirm_delete_all(all_threads, categories)
elif choice == '7':
print('Exiting without deleting anything.')
return None
@@ -162,7 +174,7 @@ class ThreadCleanup:
print('Invalid choice. Exiting.')
return None
async def preview_all_threads(self, all_threads: List[Dict]) -> List[Dict]:
async def preview_all_threads(self, all_threads: List[Dict], categories: Dict) -> List[Dict]:
"""Preview all threads without filtering"""
print(f'\n👁️ Previewing all {len(all_threads)} threads:')
@@ -170,8 +182,7 @@ class ThreadCleanup:
print('No threads found.')
print('1. 🚪 Go back to main menu')
self.ask_question('\nSelect option (1): ')
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
threads_per_page = 5
start_index = 0
@@ -196,8 +207,7 @@ class ThreadCleanup:
start_index = end_index
continue
elif choice == '2':
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
else:
start_index = end_index
continue
@@ -205,12 +215,11 @@ class ThreadCleanup:
print('\n--- End of all threads ---')
print('1. 🚪 Go back to main menu')
self.ask_question('\nSelect option (1): ')
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
return []
async def select_by_time(self, all_threads: List[Dict]) -> Optional[List[Dict]]:
async def select_by_time(self, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]:
"""Select threads by time"""
print('\n⏰ Delete threads created:')
print('1. Within the last hour')
@@ -237,13 +246,12 @@ class ThreadCleanup:
start_time = 0 # All time
end_time = now.timestamp()
elif choice == '5':
return await self.select_custom_date_range(all_threads)
return await self.select_custom_date_range(all_threads, categories)
elif choice == '6':
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
else:
print('Invalid choice. Going back.')
return await self.select_by_time(all_threads)
return await self.select_by_time(all_threads, categories)
# Filter threads by time range
threads_to_delete = []
@@ -269,7 +277,7 @@ class ThreadCleanup:
if len(threads_to_delete) == 0:
print('No threads match your time criteria.')
return await self.select_by_time(all_threads)
return await self.select_by_time(all_threads, categories)
# Ask if they want to review before deleting
print('\nDo you want to:')
@@ -280,15 +288,15 @@ class ThreadCleanup:
review_choice = self.ask_question('\nSelect option (1-3): ')
if review_choice == '1':
return await self.review_threads(threads_to_delete, time_range_desc, all_threads)
return await self.review_threads(threads_to_delete, time_range_desc, all_threads, categories)
elif review_choice == '2':
return threads_to_delete
elif review_choice == '3':
return await self.select_by_time(all_threads)
return await self.select_by_time(all_threads, categories)
else:
return threads_to_delete
async def select_custom_date_range(self, all_threads: List[Dict]) -> Optional[List[Dict]]:
async def select_custom_date_range(self, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]:
"""Select threads by custom cutoff date"""
print('\n📅 Delete threads created before a specific date:')
print('Enter date in format: YYYY-MM-DD HH:MM (24-hour format)')
@@ -306,11 +314,11 @@ class ThreadCleanup:
if cutoff_time > datetime.now().timestamp():
print('❌ Cutoff date cannot be in the future.')
return await self.select_custom_date_range(all_threads)
return await self.select_custom_date_range(all_threads, categories)
except ValueError:
print('❌ Invalid date format. Please use YYYY-MM-DD or YYYY-MM-DD HH:MM')
return await self.select_custom_date_range(all_threads)
return await self.select_custom_date_range(all_threads, categories)
# Filter threads created before the cutoff date
threads_to_delete = []
@@ -335,9 +343,9 @@ class ThreadCleanup:
choice = self.ask_question('\nSelect option (1-2): ')
if choice == '1':
return await self.select_custom_date_range(all_threads)
return await self.select_custom_date_range(all_threads, categories)
else:
return await self.select_by_time(all_threads)
return await self.select_by_time(all_threads, categories)
# Ask if they want to review before deleting
print('\nDo you want to:')
@@ -349,17 +357,17 @@ class ThreadCleanup:
review_choice = self.ask_question('\nSelect option (1-4): ')
if review_choice == '1':
return await self.review_threads(threads_to_delete, f'created before {cutoff_str}', all_threads)
return await self.review_threads(threads_to_delete, f'created before {cutoff_str}', all_threads, categories)
elif review_choice == '2':
return threads_to_delete
elif review_choice == '3':
return await self.select_custom_date_range(all_threads)
return await self.select_custom_date_range(all_threads, categories)
elif review_choice == '4':
return await self.select_by_time(all_threads)
return await self.select_by_time(all_threads, categories)
else:
return threads_to_delete
async def review_threads(self, threads: List[Dict], description: str = '', all_threads: Optional[List[Dict]] = None) -> List[Dict]:
async def review_threads(self, threads: List[Dict], description: str = '', all_threads: Optional[List[Dict]] = None, categories: Optional[Dict] = None) -> List[Dict]:
"""Review threads before deletion"""
description_text = f' {description}' if description else ''
print(f'\n👁️ Reviewing {len(threads)} threads{description_text}:')
@@ -390,9 +398,8 @@ class ThreadCleanup:
elif choice == '2':
return threads
elif choice == '3':
if all_threads:
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
if all_threads and categories:
return await self.select_threads_to_delete(categories, all_threads)
return []
else:
start_index = end_index
@@ -407,16 +414,15 @@ class ThreadCleanup:
if choice == '1':
return threads
elif choice == '2':
if all_threads:
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
if all_threads and categories:
return await self.select_threads_to_delete(categories, all_threads)
return []
else:
return threads
return threads
async def confirm_delete_all(self, all_threads: List[Dict]) -> List[Dict]:
async def confirm_delete_all(self, all_threads: List[Dict], categories: Dict) -> List[Dict]:
"""Confirm deletion of all threads"""
print(f'\n⚠️ WARNING: You are about to delete ALL {len(all_threads)} threads!')
print('This action cannot be undone.')
@@ -426,24 +432,21 @@ class ThreadCleanup:
initial_choice = self.ask_question('\nSelect option (1-2): ')
if initial_choice != '1':
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
confirm1 = self.ask_question('\nType "DELETE ALL" to confirm: ')
if confirm1 != 'DELETE ALL':
print('Confirmation failed. Returning to main menu.')
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
confirm2 = self.ask_question(f'\nFinal confirmation: Delete all {len(all_threads)} threads? (yes/no): ')
if confirm2.lower() != 'yes':
print('Deletion cancelled. Returning to main menu.')
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
return all_threads
async def select_by_status(self, by_status: Dict, all_threads: List[Dict]) -> Optional[List[Dict]]:
async def select_by_status(self, by_status: Dict, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]:
"""Select threads by status"""
print('\n📝 Select Status:')
statuses = list(by_status.keys())
@@ -477,21 +480,20 @@ class ThreadCleanup:
review_choice = self.ask_question('\nSelect option (1-3): ')
if review_choice == '1':
return await self.review_threads(threads_to_delete, f'with status "{selected_status}"', all_threads)
return await self.review_threads(threads_to_delete, f'with status "{selected_status}"', all_threads, categories)
elif review_choice == '2':
return threads_to_delete
elif review_choice == '3':
return await self.select_by_status(by_status, all_threads)
return await self.select_by_status(by_status, all_threads, categories)
else:
return threads_to_delete
elif index == len(statuses):
# Go back to main menu
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
return []
async def select_by_runs(self, by_runs: Dict, all_threads: List[Dict]) -> Optional[List[Dict]]:
async def select_by_runs(self, by_runs: Dict, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]:
"""Select threads by runs count"""
print('\n🏃 Select by Runs Count:')
@@ -544,21 +546,20 @@ class ThreadCleanup:
review_choice = self.ask_question('\nSelect option (1-3): ')
if review_choice == '1':
return await self.review_threads(threads_to_delete, f'with {selected_category}', all_threads)
return await self.review_threads(threads_to_delete, f'with {selected_category}', all_threads, categories)
elif review_choice == '2':
return threads_to_delete
elif review_choice == '3':
return await self.select_by_runs(by_runs, all_threads)
return await self.select_by_runs(by_runs, all_threads, categories)
else:
return threads_to_delete
elif index == len(runs_categories):
# Go back to main menu
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
return []
async def select_by_graph(self, by_graph: Dict, all_threads: List[Dict]) -> Optional[List[Dict]]:
async def select_by_graph(self, by_graph: Dict, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]:
"""Select threads by graph ID"""
print('\n🔧 Select by Graph ID:')
graphs = list(by_graph.keys())
@@ -584,17 +585,16 @@ class ThreadCleanup:
review_choice = self.ask_question('\nSelect option (1-3): ')
if review_choice == '1':
return await self.review_threads(threads_to_delete, f'for graph "{selected_graph}"', all_threads)
return await self.review_threads(threads_to_delete, f'for graph "{selected_graph}"', all_threads, categories)
elif review_choice == '2':
return threads_to_delete
elif review_choice == '3':
return await self.select_by_graph(by_graph, all_threads)
return await self.select_by_graph(by_graph, all_threads, categories)
else:
return threads_to_delete
elif index == len(graphs):
# Go back to main menu
full_categories = self.categorize_threads(all_threads)
return await self.select_threads_to_delete(full_categories, all_threads)
return await self.select_threads_to_delete(categories, all_threads)
return []
@@ -613,20 +613,14 @@ class ThreadCleanup:
deleted = 0
failed = 0
async with aiohttp.ClientSession(headers=self.headers) as session:
for thread in threads_to_delete:
try:
delete_url = f"{self.base_url}/threads/{thread['thread_id']}"
async with session.delete(delete_url) as response:
if not response.ok:
print(f"Failed to delete thread {thread['thread_id']}: {response.status} {response.reason}")
failed += 1
else:
deleted += 1
print(f"✅ Deleted: {deleted}/{len(threads_to_delete)}", end='\r')
except Exception as delete_error:
print(f"❌ Error deleting thread {thread['thread_id']}: {delete_error}")
failed += 1
for thread in threads_to_delete:
try:
await self.client.threads.delete(thread['thread_id'])
deleted += 1
print(f"✅ Deleted: {deleted}/{len(threads_to_delete)}", end='\r')
except Exception as delete_error:
print(f"Error deleting thread {thread['thread_id']}: {delete_error}")
failed += 1
print(f'\n\n📈 Summary: {deleted} deleted, {failed} failed')
return deleted
@@ -637,87 +631,35 @@ class ThreadCleanup:
print('🔍 Discovering threads...')
print(f'📡 Connecting to: {self.base_url}')
# Try different endpoint variations to find the correct one
endpoints_to_try = [
{'url': f'{self.base_url}/threads/search', 'method': 'POST', 'body': {'limit': 1000, 'offset': 0}},
{'url': f'{self.base_url}/threads', 'method': 'GET', 'body': None},
{'url': f'{self.base_url}/threads?limit=1000', 'method': 'GET', 'body': None}
]
# Get all threads using SDK
all_threads = []
offset = 0
limit = 100
search_response = None
working_endpoint = None
async with aiohttp.ClientSession(headers=self.headers) as session:
for endpoint in endpoints_to_try:
print(f"🔍 Trying {endpoint['method']} {endpoint['url']}")
try:
if endpoint['method'] == 'POST':
async with session.post(endpoint['url'], json=endpoint['body']) as response:
if response.ok:
search_response = response
working_endpoint = endpoint
print(f"✅ Found working endpoint: {endpoint['method']} {endpoint['url']}")
break
else:
print(f"{endpoint['method']} {endpoint['url']} failed: {response.status} {response.reason}")
if response.status in [401, 403]:
error_text = await response.text()
print(f" Error details: {error_text}")
else:
async with session.get(endpoint['url']) as response:
if response.ok:
search_response = response
working_endpoint = endpoint
print(f"✅ Found working endpoint: {endpoint['method']} {endpoint['url']}")
break
else:
print(f"{endpoint['method']} {endpoint['url']} failed: {response.status} {response.reason}")
if response.status in [401, 403]:
error_text = await response.text()
print(f" Error details: {error_text}")
except Exception as fetch_error:
print(f"{endpoint['method']} {endpoint['url']} error: {fetch_error}")
if not search_response or not working_endpoint:
print('\n❌ None of the thread endpoints worked. Please check:')
while True:
try:
threads = await self.client.threads.search(limit=limit, offset=offset)
if not threads or len(threads) == 0:
break
all_threads.extend(threads)
offset += len(threads)
print(f"Found: {len(all_threads)} threads", end='\r')
except Exception as search_error:
print(f'\n❌ Error searching threads: {search_error}')
print('Please check:')
print('1. Your server URL is correct')
print('2. Your API key has the right permissions')
print('3. The server is running and accessible')
raise Exception('Could not find a working threads endpoint')
# Get all threads first
all_threads = []
has_more = True
offset = 0
while has_more:
if working_endpoint['method'] == 'POST':
request_body = {'limit': 1000, 'offset': offset}
async with session.post(working_endpoint['url'], json=request_body) as response:
if not response.ok:
raise Exception(f"Search request failed: {response.status} {response.reason}")
threads = await response.json()
else:
url = f"{working_endpoint['url'].split('?')[0]}?limit=1000&offset={offset}" if offset > 0 else working_endpoint['url']
async with session.get(url) as response:
if not response.ok:
raise Exception(f"Search request failed: {response.status} {response.reason}")
threads = await response.json()
if not threads or len(threads) == 0:
has_more = False
else:
all_threads.extend(threads)
offset += len(threads)
print(f"Found: {len(all_threads)} threads", end='\r')
raise
if len(all_threads) == 0:
print('\n📋 No threads found.')
return
# Categorize threads
categories = self.categorize_threads(all_threads)
print(f'\n✅ Found {len(all_threads)} threads')
# Categorize threads ONCE and cache it
categories = await self.categorize_threads(all_threads)
self.display_categories(categories)
# Let user select what to delete (always pass fresh categories)