From ae4edf523a073cfcc56a7535e737a60f0a2bef6a Mon Sep 17 00:00:00 2001 From: Liam Bush Date: Tue, 7 Oct 2025 13:51:13 -0700 Subject: [PATCH] Refactor thread cleanup tool to use LangGraph SDK - Replace aiohttp with official langgraph-sdk for thread operations - Add accurate run count tracking via client.runs.list() - Optimize API calls by caching thread categorization (16x reduction) - Update README with current functionality and examples - Improve error handling and user experience --- thread-cleanup/README.md | 50 ++++---- thread-cleanup/delete.py | 250 +++++++++++++++------------------------ 2 files changed, 126 insertions(+), 174 deletions(-) diff --git a/thread-cleanup/README.md b/thread-cleanup/README.md index 5247dd1..2279530 100644 --- a/thread-cleanup/README.md +++ b/thread-cleanup/README.md @@ -5,13 +5,13 @@ Interactive cleanup utility for managing LangGraph threads across deployments. P ## Prerequisites - Python 3.7+ -- `aiohttp` library +- `langgraph-sdk` library ## Installation Install required dependency: ```bash -pip install aiohttp +pip install langgraph-sdk ``` ## Usage @@ -90,31 +90,30 @@ Your API key must have permissions to: - Check that the URL matches your actual LangGraph deployment - Ensure the API key belongs to the correct organization/tenant -### "None of the thread endpoints worked" -- Confirm your LangGraph server is running and accessible -- Verify the URL format is correct -- Check API key permissions -- Ensure network connectivity to the deployment - ### Connection Issues - Verify the deployment URL is reachable - Check for any firewall or network restrictions - Confirm the API key is valid and not expired +- Ensure your LangGraph server is running and accessible ## Example Session ``` Discovering threads... Connecting to: https://your-deployment.us.langgraph.app -Found working endpoint: POST /threads/search Found: 15 threads Total threads found: 15 By Status: ├─ idle: 10 -├─ success: 3 -├─ error: 2 +├─ completed: 3 +├─ failed: 2 + +By Runs: +├─ 0 runs: 5 +├─ 1 run: 7 +├─ 3 runs: 3 What would you like to delete? 1. Delete by TIME @@ -127,19 +126,30 @@ What would you like to delete? Select option (1-7): 2 -Delete by STATUS +Select Status: 1. idle (10 threads) -2. success (3 threads) -3. error (2 threads) -4. Review all status categories -5. Back to main menu +2. completed (3 threads) +3. failed (2 threads) +4. Go back to main menu -Select option (1-5): 1 +Select status (1-4): 1 -You're about to delete 10 idle threads. This cannot be undone! -Do you want to continue? (yes/no): yes +Found 10 threads with status "idle". -Successfully deleted 10 threads +Do you want to: +1. Review threads before deleting +2. Delete immediately +3. Go back to status menu + +Select option (1-3): 2 + +Deleting 10 threads... +Are you sure you want to delete 10 threads? (yes/no): yes +Deleted: 10/10 + +Summary: 10 deleted, 0 failed + +Cleanup completed. Total threads deleted: 10 ``` ## Support diff --git a/thread-cleanup/delete.py b/thread-cleanup/delete.py index 4140819..c1b67c5 100644 --- a/thread-cleanup/delete.py +++ b/thread-cleanup/delete.py @@ -8,26 +8,24 @@ Provides better observability and control over what gets deleted import sys import argparse import asyncio -import aiohttp import json from datetime import datetime, timezone from typing import Dict, List, Optional, Any from urllib.parse import urlparse +from langgraph_sdk import get_client class ThreadCleanup: def __init__(self, base_url: str, api_key: Optional[str] = None): self.base_url = base_url self.api_key = api_key - self.headers = {'Content-Type': 'application/json'} - if api_key: - self.headers['X-Api-Key'] = api_key + self.client = get_client(url=base_url, api_key=api_key) def ask_question(self, question: str) -> str: """Ask user for input""" return input(question) - def categorize_threads(self, threads: List[Dict]) -> Dict: + async def categorize_threads(self, threads: List[Dict]) -> Dict: """Categorize threads by status, runs, and graph ID""" categories = { 'byGraph': {}, @@ -35,8 +33,23 @@ class ThreadCleanup: 'byRuns': {} } - for thread in threads: - run_count = len(thread.get('runs', [])) + print('\n🔍 Fetching run counts...') + # Fetch run counts for all threads + for i, thread in enumerate(threads): + thread_id = thread.get('thread_id') + + # Get run count from API + try: + runs = await self.client.runs.list(thread_id) + run_count = len(runs) if runs else 0 + except: + run_count = 0 + + # Store run count in thread for later display + thread['_run_count'] = run_count + + print(f'Analyzing thread {i+1}/{len(threads)}...', end='\r') + status = thread.get('status', 'unknown') # Graph categorization @@ -69,6 +82,7 @@ class ThreadCleanup: categories['byRuns'][runs_category] = [] categories['byRuns'][runs_category].append(thread) + print(' ' * 50, end='\r') # Clear the progress line # Add allThreads for easy access categories['allThreads'] = threads return categories @@ -84,7 +98,7 @@ class ThreadCleanup: pass status = thread.get('status', 'unknown') - run_count = len(thread.get('runs', [])) + run_count = thread.get('_run_count', 0) metadata = json.dumps(thread.get('metadata', {})) if thread.get('metadata') else 'None' return f""" ID: {thread.get('thread_id', 'Unknown')} @@ -142,19 +156,17 @@ class ThreadCleanup: choice = self.ask_question('\nSelect option (1-7): ') if choice == '1': - return await self.select_by_time(all_threads) + return await self.select_by_time(all_threads, categories) elif choice == '2': - return await self.select_by_status(categories['byStatus'], all_threads) + return await self.select_by_status(categories['byStatus'], all_threads, categories) elif choice == '3': - categories_with_runs = self.categorize_threads(all_threads) - return await self.select_by_runs(categories_with_runs['byRuns'], all_threads) + return await self.select_by_runs(categories['byRuns'], all_threads, categories) elif choice == '4': - categories_with_graph = self.categorize_threads(all_threads) - return await self.select_by_graph(categories_with_graph['byGraph'], all_threads) + return await self.select_by_graph(categories['byGraph'], all_threads, categories) elif choice == '5': - return await self.preview_all_threads(all_threads) + return await self.preview_all_threads(all_threads, categories) elif choice == '6': - return await self.confirm_delete_all(all_threads) + return await self.confirm_delete_all(all_threads, categories) elif choice == '7': print('Exiting without deleting anything.') return None @@ -162,7 +174,7 @@ class ThreadCleanup: print('Invalid choice. Exiting.') return None - async def preview_all_threads(self, all_threads: List[Dict]) -> List[Dict]: + async def preview_all_threads(self, all_threads: List[Dict], categories: Dict) -> List[Dict]: """Preview all threads without filtering""" print(f'\n👁️ Previewing all {len(all_threads)} threads:') @@ -170,8 +182,7 @@ class ThreadCleanup: print('No threads found.') print('1. 🚪 Go back to main menu') self.ask_question('\nSelect option (1): ') - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) threads_per_page = 5 start_index = 0 @@ -196,8 +207,7 @@ class ThreadCleanup: start_index = end_index continue elif choice == '2': - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) else: start_index = end_index continue @@ -205,12 +215,11 @@ class ThreadCleanup: print('\n--- End of all threads ---') print('1. 🚪 Go back to main menu') self.ask_question('\nSelect option (1): ') - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) return [] - async def select_by_time(self, all_threads: List[Dict]) -> Optional[List[Dict]]: + async def select_by_time(self, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]: """Select threads by time""" print('\n⏰ Delete threads created:') print('1. Within the last hour') @@ -237,13 +246,12 @@ class ThreadCleanup: start_time = 0 # All time end_time = now.timestamp() elif choice == '5': - return await self.select_custom_date_range(all_threads) + return await self.select_custom_date_range(all_threads, categories) elif choice == '6': - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) else: print('Invalid choice. Going back.') - return await self.select_by_time(all_threads) + return await self.select_by_time(all_threads, categories) # Filter threads by time range threads_to_delete = [] @@ -269,7 +277,7 @@ class ThreadCleanup: if len(threads_to_delete) == 0: print('No threads match your time criteria.') - return await self.select_by_time(all_threads) + return await self.select_by_time(all_threads, categories) # Ask if they want to review before deleting print('\nDo you want to:') @@ -280,15 +288,15 @@ class ThreadCleanup: review_choice = self.ask_question('\nSelect option (1-3): ') if review_choice == '1': - return await self.review_threads(threads_to_delete, time_range_desc, all_threads) + return await self.review_threads(threads_to_delete, time_range_desc, all_threads, categories) elif review_choice == '2': return threads_to_delete elif review_choice == '3': - return await self.select_by_time(all_threads) + return await self.select_by_time(all_threads, categories) else: return threads_to_delete - async def select_custom_date_range(self, all_threads: List[Dict]) -> Optional[List[Dict]]: + async def select_custom_date_range(self, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]: """Select threads by custom cutoff date""" print('\n📅 Delete threads created before a specific date:') print('Enter date in format: YYYY-MM-DD HH:MM (24-hour format)') @@ -306,11 +314,11 @@ class ThreadCleanup: if cutoff_time > datetime.now().timestamp(): print('❌ Cutoff date cannot be in the future.') - return await self.select_custom_date_range(all_threads) + return await self.select_custom_date_range(all_threads, categories) except ValueError: print('❌ Invalid date format. Please use YYYY-MM-DD or YYYY-MM-DD HH:MM') - return await self.select_custom_date_range(all_threads) + return await self.select_custom_date_range(all_threads, categories) # Filter threads created before the cutoff date threads_to_delete = [] @@ -335,9 +343,9 @@ class ThreadCleanup: choice = self.ask_question('\nSelect option (1-2): ') if choice == '1': - return await self.select_custom_date_range(all_threads) + return await self.select_custom_date_range(all_threads, categories) else: - return await self.select_by_time(all_threads) + return await self.select_by_time(all_threads, categories) # Ask if they want to review before deleting print('\nDo you want to:') @@ -349,17 +357,17 @@ class ThreadCleanup: review_choice = self.ask_question('\nSelect option (1-4): ') if review_choice == '1': - return await self.review_threads(threads_to_delete, f'created before {cutoff_str}', all_threads) + return await self.review_threads(threads_to_delete, f'created before {cutoff_str}', all_threads, categories) elif review_choice == '2': return threads_to_delete elif review_choice == '3': - return await self.select_custom_date_range(all_threads) + return await self.select_custom_date_range(all_threads, categories) elif review_choice == '4': - return await self.select_by_time(all_threads) + return await self.select_by_time(all_threads, categories) else: return threads_to_delete - async def review_threads(self, threads: List[Dict], description: str = '', all_threads: Optional[List[Dict]] = None) -> List[Dict]: + async def review_threads(self, threads: List[Dict], description: str = '', all_threads: Optional[List[Dict]] = None, categories: Optional[Dict] = None) -> List[Dict]: """Review threads before deletion""" description_text = f' {description}' if description else '' print(f'\n👁️ Reviewing {len(threads)} threads{description_text}:') @@ -390,9 +398,8 @@ class ThreadCleanup: elif choice == '2': return threads elif choice == '3': - if all_threads: - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + if all_threads and categories: + return await self.select_threads_to_delete(categories, all_threads) return [] else: start_index = end_index @@ -407,16 +414,15 @@ class ThreadCleanup: if choice == '1': return threads elif choice == '2': - if all_threads: - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + if all_threads and categories: + return await self.select_threads_to_delete(categories, all_threads) return [] else: return threads return threads - async def confirm_delete_all(self, all_threads: List[Dict]) -> List[Dict]: + async def confirm_delete_all(self, all_threads: List[Dict], categories: Dict) -> List[Dict]: """Confirm deletion of all threads""" print(f'\n⚠️ WARNING: You are about to delete ALL {len(all_threads)} threads!') print('This action cannot be undone.') @@ -426,24 +432,21 @@ class ThreadCleanup: initial_choice = self.ask_question('\nSelect option (1-2): ') if initial_choice != '1': - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) confirm1 = self.ask_question('\nType "DELETE ALL" to confirm: ') if confirm1 != 'DELETE ALL': print('Confirmation failed. Returning to main menu.') - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) confirm2 = self.ask_question(f'\nFinal confirmation: Delete all {len(all_threads)} threads? (yes/no): ') if confirm2.lower() != 'yes': print('Deletion cancelled. Returning to main menu.') - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) return all_threads - async def select_by_status(self, by_status: Dict, all_threads: List[Dict]) -> Optional[List[Dict]]: + async def select_by_status(self, by_status: Dict, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]: """Select threads by status""" print('\n📝 Select Status:') statuses = list(by_status.keys()) @@ -477,21 +480,20 @@ class ThreadCleanup: review_choice = self.ask_question('\nSelect option (1-3): ') if review_choice == '1': - return await self.review_threads(threads_to_delete, f'with status "{selected_status}"', all_threads) + return await self.review_threads(threads_to_delete, f'with status "{selected_status}"', all_threads, categories) elif review_choice == '2': return threads_to_delete elif review_choice == '3': - return await self.select_by_status(by_status, all_threads) + return await self.select_by_status(by_status, all_threads, categories) else: return threads_to_delete elif index == len(statuses): # Go back to main menu - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) return [] - async def select_by_runs(self, by_runs: Dict, all_threads: List[Dict]) -> Optional[List[Dict]]: + async def select_by_runs(self, by_runs: Dict, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]: """Select threads by runs count""" print('\n🏃 Select by Runs Count:') @@ -544,21 +546,20 @@ class ThreadCleanup: review_choice = self.ask_question('\nSelect option (1-3): ') if review_choice == '1': - return await self.review_threads(threads_to_delete, f'with {selected_category}', all_threads) + return await self.review_threads(threads_to_delete, f'with {selected_category}', all_threads, categories) elif review_choice == '2': return threads_to_delete elif review_choice == '3': - return await self.select_by_runs(by_runs, all_threads) + return await self.select_by_runs(by_runs, all_threads, categories) else: return threads_to_delete elif index == len(runs_categories): # Go back to main menu - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) return [] - async def select_by_graph(self, by_graph: Dict, all_threads: List[Dict]) -> Optional[List[Dict]]: + async def select_by_graph(self, by_graph: Dict, all_threads: List[Dict], categories: Dict) -> Optional[List[Dict]]: """Select threads by graph ID""" print('\n🔧 Select by Graph ID:') graphs = list(by_graph.keys()) @@ -584,17 +585,16 @@ class ThreadCleanup: review_choice = self.ask_question('\nSelect option (1-3): ') if review_choice == '1': - return await self.review_threads(threads_to_delete, f'for graph "{selected_graph}"', all_threads) + return await self.review_threads(threads_to_delete, f'for graph "{selected_graph}"', all_threads, categories) elif review_choice == '2': return threads_to_delete elif review_choice == '3': - return await self.select_by_graph(by_graph, all_threads) + return await self.select_by_graph(by_graph, all_threads, categories) else: return threads_to_delete elif index == len(graphs): # Go back to main menu - full_categories = self.categorize_threads(all_threads) - return await self.select_threads_to_delete(full_categories, all_threads) + return await self.select_threads_to_delete(categories, all_threads) return [] @@ -613,20 +613,14 @@ class ThreadCleanup: deleted = 0 failed = 0 - async with aiohttp.ClientSession(headers=self.headers) as session: - for thread in threads_to_delete: - try: - delete_url = f"{self.base_url}/threads/{thread['thread_id']}" - async with session.delete(delete_url) as response: - if not response.ok: - print(f"❌ Failed to delete thread {thread['thread_id']}: {response.status} {response.reason}") - failed += 1 - else: - deleted += 1 - print(f"✅ Deleted: {deleted}/{len(threads_to_delete)}", end='\r') - except Exception as delete_error: - print(f"❌ Error deleting thread {thread['thread_id']}: {delete_error}") - failed += 1 + for thread in threads_to_delete: + try: + await self.client.threads.delete(thread['thread_id']) + deleted += 1 + print(f"✅ Deleted: {deleted}/{len(threads_to_delete)}", end='\r') + except Exception as delete_error: + print(f"❌ Error deleting thread {thread['thread_id']}: {delete_error}") + failed += 1 print(f'\n\n📈 Summary: {deleted} deleted, {failed} failed') return deleted @@ -637,87 +631,35 @@ class ThreadCleanup: print('🔍 Discovering threads...') print(f'📡 Connecting to: {self.base_url}') - # Try different endpoint variations to find the correct one - endpoints_to_try = [ - {'url': f'{self.base_url}/threads/search', 'method': 'POST', 'body': {'limit': 1000, 'offset': 0}}, - {'url': f'{self.base_url}/threads', 'method': 'GET', 'body': None}, - {'url': f'{self.base_url}/threads?limit=1000', 'method': 'GET', 'body': None} - ] + # Get all threads using SDK + all_threads = [] + offset = 0 + limit = 100 - search_response = None - working_endpoint = None - - async with aiohttp.ClientSession(headers=self.headers) as session: - for endpoint in endpoints_to_try: - print(f"🔍 Trying {endpoint['method']} {endpoint['url']}") - - try: - if endpoint['method'] == 'POST': - async with session.post(endpoint['url'], json=endpoint['body']) as response: - if response.ok: - search_response = response - working_endpoint = endpoint - print(f"✅ Found working endpoint: {endpoint['method']} {endpoint['url']}") - break - else: - print(f"❌ {endpoint['method']} {endpoint['url']} failed: {response.status} {response.reason}") - if response.status in [401, 403]: - error_text = await response.text() - print(f" Error details: {error_text}") - else: - async with session.get(endpoint['url']) as response: - if response.ok: - search_response = response - working_endpoint = endpoint - print(f"✅ Found working endpoint: {endpoint['method']} {endpoint['url']}") - break - else: - print(f"❌ {endpoint['method']} {endpoint['url']} failed: {response.status} {response.reason}") - if response.status in [401, 403]: - error_text = await response.text() - print(f" Error details: {error_text}") - except Exception as fetch_error: - print(f"❌ {endpoint['method']} {endpoint['url']} error: {fetch_error}") - - if not search_response or not working_endpoint: - print('\n❌ None of the thread endpoints worked. Please check:') + while True: + try: + threads = await self.client.threads.search(limit=limit, offset=offset) + if not threads or len(threads) == 0: + break + all_threads.extend(threads) + offset += len(threads) + print(f"Found: {len(all_threads)} threads", end='\r') + except Exception as search_error: + print(f'\n❌ Error searching threads: {search_error}') + print('Please check:') print('1. Your server URL is correct') print('2. Your API key has the right permissions') print('3. The server is running and accessible') - raise Exception('Could not find a working threads endpoint') - - # Get all threads first - all_threads = [] - has_more = True - offset = 0 - - while has_more: - if working_endpoint['method'] == 'POST': - request_body = {'limit': 1000, 'offset': offset} - async with session.post(working_endpoint['url'], json=request_body) as response: - if not response.ok: - raise Exception(f"Search request failed: {response.status} {response.reason}") - threads = await response.json() - else: - url = f"{working_endpoint['url'].split('?')[0]}?limit=1000&offset={offset}" if offset > 0 else working_endpoint['url'] - async with session.get(url) as response: - if not response.ok: - raise Exception(f"Search request failed: {response.status} {response.reason}") - threads = await response.json() - - if not threads or len(threads) == 0: - has_more = False - else: - all_threads.extend(threads) - offset += len(threads) - print(f"Found: {len(all_threads)} threads", end='\r') + raise if len(all_threads) == 0: print('\n📋 No threads found.') return - # Categorize threads - categories = self.categorize_threads(all_threads) + print(f'\n✅ Found {len(all_threads)} threads') + + # Categorize threads ONCE and cache it + categories = await self.categorize_threads(all_threads) self.display_categories(categories) # Let user select what to delete (always pass fresh categories)