mirror of
https://github.com/run-llama/llama_cloud_services.git
synced 2026-07-01 21:44:37 -04:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| bb79418b77 | |||
| efd4963ca6 |
@@ -8,6 +8,12 @@ import secrets
|
||||
import warnings
|
||||
import httpx
|
||||
from pydantic import BaseModel
|
||||
from tenacity import (
|
||||
retry_if_exception,
|
||||
stop_after_attempt,
|
||||
wait_exponential_jitter,
|
||||
AsyncRetrying,
|
||||
)
|
||||
from llama_cloud import (
|
||||
ExtractAgent as CloudExtractAgent,
|
||||
ExtractConfig,
|
||||
@@ -22,6 +28,7 @@ from llama_cloud import (
|
||||
PaginatedExtractRunsResponse,
|
||||
)
|
||||
from llama_cloud.client import AsyncLlamaCloud
|
||||
from llama_cloud.core.api_error import ApiError
|
||||
from llama_cloud_services.extract.utils import (
|
||||
JSONObjectType,
|
||||
augment_async_errors,
|
||||
@@ -44,6 +51,17 @@ DEFAULT_EXTRACT_CONFIG = ExtractConfig(
|
||||
)
|
||||
|
||||
|
||||
def _is_retryable_error(exception: BaseException) -> bool:
    """Return True when *exception* looks transient and the call may be retried.

    Args:
        exception: The exception raised by a single attempt.

    Returns:
        True for ``ApiError`` / ``httpx.HTTPStatusError`` carrying one of the
        transient status codes (408, 425, 502, 503, 504) and for any
        ``httpx.RequestError`` (which includes timeouts); False otherwise.
    """
    retryable_status_codes = (502, 503, 504, 425, 408)

    if isinstance(exception, ApiError):
        return exception.status_code in retryable_status_codes

    if isinstance(exception, httpx.HTTPStatusError):
        # HTTPStatusError is raised for any 4xx/5xx response. Retrying a
        # permanent client error (400, 401, 404, ...) only delays the failure,
        # so apply the same allow-list used for ApiError.
        return exception.response.status_code in retryable_status_codes

    # httpx.TimeoutException derives from httpx.RequestError; it is kept in
    # the tuple only to make the intent explicit.
    return isinstance(exception, (httpx.RequestError, httpx.TimeoutException))
|
||||
|
||||
|
||||
class SourceText:
|
||||
def __init__(
|
||||
self,
|
||||
@@ -288,35 +306,60 @@ class ExtractionAgent:
|
||||
|
||||
return await self.upload_file(source_text)
|
||||
|
||||
async def _get_job_with_retry(self, job_id: str) -> ExtractJob:
    """Fetch the extraction job, retrying when the failure looks transient.

    At most five attempts are made, separated by jittered exponential
    backoff. Because ``reraise=True``, the exception from the final attempt
    is propagated as-is once the attempts are exhausted.
    """
    retry_policy = AsyncRetrying(
        reraise=True,
        stop=stop_after_attempt(5),
        wait=wait_exponential_jitter(initial=1, max=60, jitter=5),
        retry=retry_if_exception(_is_retryable_error),
    )

    async for attempt in retry_policy:
        with attempt:
            job = await self._client.llama_extract.get_job(job_id=job_id)
            return job
|
||||
|
||||
async def _get_run_with_retry(self, job_id: str) -> ExtractRun:
    """Fetch the extraction run for a job, retrying transient failures.

    At most three attempts are made, separated by jittered exponential
    backoff. Because ``reraise=True``, the exception from the final attempt
    is propagated as-is once the attempts are exhausted.
    """
    retry_policy = AsyncRetrying(
        reraise=True,
        stop=stop_after_attempt(3),
        wait=wait_exponential_jitter(initial=1, max=20, jitter=3),
        retry=retry_if_exception(_is_retryable_error),
    )

    async for attempt in retry_policy:
        with attempt:
            run = await self._client.llama_extract.get_run_by_job_id(job_id=job_id)
            return run
|
||||
|
||||
async def _wait_for_job_result(self, job_id: str) -> Optional[ExtractRun]:
    """Poll an extraction job until it leaves PENDING and return its run.

    The job is checked every ``self.check_interval`` seconds through the
    retrying helpers, so transient API errors do not abort the wait.

    Args:
        job_id: Identifier of the extraction job to wait for.

    Returns:
        The extraction run associated with the job. For any status other
        than SUCCESS or PENDING a warning is emitted and the run is still
        fetched and returned.

    Raises:
        Exception: If the job is still PENDING after ``self.max_timeout``
            seconds, or if polling fails with a non-retryable error or
            after the retries are exhausted.
    """
    start = time.perf_counter()
    tries = 0

    while True:
        await asyncio.sleep(self.check_interval)
        tries += 1

        try:
            job = await self._get_job_with_retry(job_id)

            if job.status == StatusEnum.SUCCESS:
                return await self._get_run_with_retry(job_id)

            if job.status == StatusEnum.PENDING:
                elapsed = time.perf_counter() - start
                if elapsed > self.max_timeout:
                    raise Exception(f"Timeout while extracting the file: {job_id}")
                # Emit a progress dot on every tenth poll in verbose mode.
                if self._verbose and tries % 10 == 0:
                    print(".", end="", flush=True)
                continue

            # Any other status is treated as a failure: warn, then still
            # fetch the run so the caller receives it.
            warnings.warn(
                f"Failure in job: {job_id}, status: {job.status}, error: {job.error}"
            )
            return await self._get_run_with_retry(job_id)

        except Exception as e:
            # Reached for a non-retryable error, exhausted retries, or the
            # timeout above: report it when verbose and propagate unchanged.
            if self._verbose:
                print(f"\nError in job polling for {job_id}: {e}")
            raise
|
||||
|
||||
def save(self) -> None:
|
||||
"""Persist the extraction agent's schema and config to the database.
|
||||
|
||||
Generated
+1
-1
@@ -4623,4 +4623,4 @@ type = ["pytest-mypy"]
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.9,<4.0"
|
||||
content-hash = "a1cf7a9a94907d14f3b41c1bf65fcdc4cb1bbda816811b750f0b65290e1dc4e8"
|
||||
content-hash = "346ac2250c92965f5e63b826d34cf958f8839093258689e4c94b7717ea1cb391"
|
||||
|
||||
@@ -24,6 +24,7 @@ click = "^8.1.7"
|
||||
python-dotenv = "^1.0.1"
|
||||
eval-type-backport = {python = "<3.10", version = "^0.2.0"}
|
||||
platformdirs = "^4.3.7"
|
||||
tenacity = "^9.0.0"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pytest = "^8.0.0"
|
||||
|
||||
Reference in New Issue
Block a user