Compare commits

...

2 Commits

Author SHA1 Message Date
Neeraj Pradhan bb79418b77 add lock file 2025-06-23 13:01:03 -07:00
Neeraj Pradhan efd4963ca6 Make job fetching more robust to connection errors 2025-06-23 12:24:20 -07:00
3 changed files with 66 additions and 22 deletions
+64 -21
View File
@@ -8,6 +8,12 @@ import secrets
import warnings
import httpx
from pydantic import BaseModel
from tenacity import (
retry_if_exception,
stop_after_attempt,
wait_exponential_jitter,
AsyncRetrying,
)
from llama_cloud import (
ExtractAgent as CloudExtractAgent,
ExtractConfig,
@@ -22,6 +28,7 @@ from llama_cloud import (
PaginatedExtractRunsResponse,
)
from llama_cloud.client import AsyncLlamaCloud
from llama_cloud.core.api_error import ApiError
from llama_cloud_services.extract.utils import (
JSONObjectType,
augment_async_errors,
@@ -44,6 +51,17 @@ DEFAULT_EXTRACT_CONFIG = ExtractConfig(
)
def _is_retryable_error(exception: BaseException) -> bool:
"""Check if an exception is retryable."""
if isinstance(exception, ApiError):
return exception.status_code in (502, 503, 504, 425, 408)
elif isinstance(
exception, (httpx.HTTPStatusError, httpx.RequestError, httpx.TimeoutException)
):
return True
return False
class SourceText:
def __init__(
self,
@@ -288,35 +306,60 @@ class ExtractionAgent:
return await self.upload_file(source_text)
async def _get_job_with_retry(self, job_id: str) -> ExtractJob:
"""Get job with retry logic for transient errors."""
async for attempt in AsyncRetrying(
retry=retry_if_exception(_is_retryable_error),
stop=stop_after_attempt(5),
wait=wait_exponential_jitter(initial=1, max=60, jitter=5),
reraise=True,
):
with attempt:
return await self._client.llama_extract.get_job(job_id=job_id)
async def _get_run_with_retry(self, job_id: str) -> ExtractRun:
"""Get extraction run with retry logic for transient errors."""
async for attempt in AsyncRetrying(
retry=retry_if_exception(_is_retryable_error),
stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=1, max=20, jitter=3),
reraise=True,
):
with attempt:
return await self._client.llama_extract.get_run_by_job_id(job_id=job_id)
async def _wait_for_job_result(self, job_id: str) -> Optional[ExtractRun]:
"""Wait for and return the results of an extraction job."""
start = time.perf_counter()
tries = 0
while True:
await asyncio.sleep(self.check_interval)
tries += 1
job = await self._client.llama_extract.get_job(
job_id=job_id,
)
if job.status == StatusEnum.SUCCESS:
return await self._client.llama_extract.get_run_by_job_id(
job_id=job_id,
)
elif job.status == StatusEnum.PENDING:
end = time.perf_counter()
if end - start > self.max_timeout:
raise Exception(f"Timeout while extracting the file: {job_id}")
if self._verbose and tries % 10 == 0:
print(".", end="", flush=True)
continue
else:
warnings.warn(
f"Failure in job: {job_id}, status: {job.status}, error: {job.error}"
)
return await self._client.llama_extract.get_run_by_job_id(
job_id=job_id,
)
try:
job = await self._get_job_with_retry(job_id)
if job.status == StatusEnum.SUCCESS:
return await self._get_run_with_retry(job_id)
elif job.status == StatusEnum.PENDING:
end = time.perf_counter()
if end - start > self.max_timeout:
raise Exception(f"Timeout while extracting the file: {job_id}")
if self._verbose and tries % 10 == 0:
print(".", end="", flush=True)
continue
else:
warnings.warn(
f"Failure in job: {job_id}, status: {job.status}, error: {job.error}"
)
return await self._get_run_with_retry(job_id)
except Exception as e:
# If we get a non-retryable error or all retries are exhausted, re-raise
if self._verbose:
print(f"\nError in job polling for {job_id}: {e}")
raise e
def save(self) -> None:
"""Persist the extraction agent's schema and config to the database.
Generated
+1 -1
View File
@@ -4623,4 +4623,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.9,<4.0"
content-hash = "a1cf7a9a94907d14f3b41c1bf65fcdc4cb1bbda816811b750f0b65290e1dc4e8"
content-hash = "346ac2250c92965f5e63b826d34cf958f8839093258689e4c94b7717ea1cb391"
+1
View File
@@ -24,6 +24,7 @@ click = "^8.1.7"
python-dotenv = "^1.0.1"
eval-type-backport = {python = "<3.10", version = "^0.2.0"}
platformdirs = "^4.3.7"
tenacity = "^9.0.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.0.0"