mirror of
https://github.com/run-llama/llama_cloud_services.git
synced 2026-07-01 21:44:37 -04:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e80e75f2f9 | |||
| 5ea758b853 | |||
| 208b6f2fa5 | |||
| e1b9143f79 | |||
| 232c55bd6a | |||
| ab6f2f8da5 | |||
| 66c2639ec8 | |||
| da1916c69f | |||
| 345e272573 |
@@ -1,8 +1,8 @@
|
||||
name: Hourly Extract E2E Tests
|
||||
name: Extract E2E Tests (every 4 hours)
|
||||
|
||||
on:
|
||||
schedule:
|
||||
- cron: "18 * * * *"
|
||||
- cron: "0 */4 * * *"
|
||||
workflow_dispatch:
|
||||
# Allows manual triggering
|
||||
inputs:
|
||||
@@ -29,7 +29,7 @@ env:
|
||||
|
||||
jobs:
|
||||
extract-e2e:
|
||||
name: "Hourly Extract E2E Tests (${{ matrix.environment }})"
|
||||
name: "Extract E2E Tests (${{ matrix.environment }})"
|
||||
runs-on: ubuntu-latest
|
||||
timeout-minutes: 30
|
||||
concurrency:
|
||||
@@ -149,7 +149,7 @@ jobs:
|
||||
- name: Post to Extract Slack channel
|
||||
id: slack
|
||||
if: (failure() || cancelled()) && steps.runtime.outputs.notify_slack == 'true'
|
||||
uses: slackapi/slack-github-action@v1.27.0
|
||||
uses: slackapi/slack-github-action@v2.1.1
|
||||
with:
|
||||
channel-id: ${{ env.SLACK_CHANNEL_ID }}
|
||||
slack-message: |
|
||||
|
||||
@@ -1,5 +1,17 @@
|
||||
# llama-cloud-services-py
|
||||
|
||||
## 0.6.94
|
||||
|
||||
### Patch Changes
|
||||
|
||||
- 232c55b: Include xlsx files in extract input
|
||||
|
||||
## 0.6.93
|
||||
|
||||
### Patch Changes
|
||||
|
||||
- da1916c: Add more warnings
|
||||
|
||||
## 0.6.92
|
||||
|
||||
### Patch Changes
|
||||
|
||||
@@ -4,6 +4,16 @@
|
||||
|
||||
# Llama Cloud Services
|
||||
|
||||
> **⚠️ DEPRECATION NOTICE**
|
||||
>
|
||||
> This repository and its packages are deprecated and will be maintained until **May 1, 2026**.
|
||||
>
|
||||
> **Please migrate to the new packages:**
|
||||
> - **Python**: `pip install llama-cloud>=1.0` ([GitHub](https://github.com/run-llama/llama-cloud-py))
|
||||
> - **TypeScript**: `npm install @llamaindex/llama-cloud` ([GitHub](https://github.com/run-llama/llama-cloud-ts))
|
||||
>
|
||||
> The new packages provide the same functionality with improved performance, better support, and active development.
|
||||
|
||||
This repository contains the code for hand-written SDKs and clients for interacting with LlamaCloud.
|
||||
|
||||
This includes:
|
||||
|
||||
@@ -806,6 +806,7 @@ class LlamaExtract(BaseComponent):
|
||||
# Document files
|
||||
".pdf": "application/pdf",
|
||||
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
|
||||
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
|
||||
# Image files
|
||||
".png": "image/png",
|
||||
".jpg": "image/jpeg",
|
||||
|
||||
@@ -4,5 +4,16 @@ from llama_cloud_services.parse.base import (
|
||||
ParsingMode,
|
||||
FailedPageMode,
|
||||
)
|
||||
from llama_cloud_services.parse.temporal import (
|
||||
is_heartbeat_timeout_error,
|
||||
retry_on_heartbeat_timeout,
|
||||
)
|
||||
|
||||
__all__ = ["LlamaParse", "ResultType", "ParsingMode", "FailedPageMode"]
|
||||
__all__ = [
|
||||
"LlamaParse",
|
||||
"ResultType",
|
||||
"ParsingMode",
|
||||
"FailedPageMode",
|
||||
"is_heartbeat_timeout_error",
|
||||
"retry_on_heartbeat_timeout",
|
||||
]
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
"""Utilities for retrying LlamaParse activities in Temporal
|
||||
specifically when they fail due to activity heartbeat timeout.
|
||||
|
||||
These utilities detect Temporal's ActivityError wrapping a TimeoutError
|
||||
with timeout_type HEARTBEAT, without requiring a direct dependency on the
|
||||
Temporal SDK.
|
||||
|
||||
Usage in a Temporal workflow::
|
||||
|
||||
from temporalio import workflow
|
||||
from llama_cloud_services.parse.temporal import retry_on_heartbeat_timeout
|
||||
|
||||
@workflow.defn
|
||||
class ParseWorkflow:
|
||||
@workflow.run
|
||||
async def run(self, input: ParseInput) -> ParseResult:
|
||||
parse_activity = workflow.start_activity(
|
||||
"llama_parse",
|
||||
input,
|
||||
start_to_close_timeout=timedelta(minutes=30),
|
||||
heartbeat_timeout=timedelta(seconds=60),
|
||||
retry_policy=RetryPolicy(maximum_attempts=1), # disable built-in retry
|
||||
)
|
||||
return await retry_on_heartbeat_timeout(parse_activity)
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Any, Awaitable, Callable, Optional, TypeVar
|
||||
|
||||
T = TypeVar("T")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Temporal TimeoutType enum value for HEARTBEAT (from temporalio proto)
|
||||
_TIMEOUT_TYPE_HEARTBEAT = 4
|
||||
|
||||
|
||||
def is_heartbeat_timeout_error(error: BaseException) -> bool:
|
||||
"""Check if an error is a Temporal activity heartbeat timeout error.
|
||||
|
||||
Detects both the structured Temporal error types (ActivityError wrapping
|
||||
TimeoutError with timeout_type HEARTBEAT) and fallback string matching
|
||||
on the error message.
|
||||
|
||||
Works with the ``temporalio`` Python SDK error types without requiring
|
||||
a direct import.
|
||||
|
||||
Args:
|
||||
error: The exception to check.
|
||||
|
||||
Returns:
|
||||
True if the error represents a heartbeat timeout.
|
||||
"""
|
||||
error_type_name = type(error).__name__
|
||||
|
||||
# Check for Temporal's ActivityError → TimeoutError chain
|
||||
if error_type_name in ("ActivityError", "ActivityFailure"):
|
||||
cause = getattr(error, "cause", None) or error.__cause__
|
||||
if cause is not None:
|
||||
cause_type_name = type(cause).__name__
|
||||
if cause_type_name in ("TimeoutError", "TimeoutFailure"):
|
||||
timeout_type = getattr(cause, "type", None) or getattr(
|
||||
cause, "timeout_type", None
|
||||
)
|
||||
if timeout_type is not None:
|
||||
# Handle both enum and int representations
|
||||
timeout_type_val = (
|
||||
timeout_type.value
|
||||
if hasattr(timeout_type, "value")
|
||||
else timeout_type
|
||||
)
|
||||
return timeout_type_val == _TIMEOUT_TYPE_HEARTBEAT
|
||||
|
||||
# Fallback: match on error message
|
||||
message = str(error).lower()
|
||||
return "heartbeat timeout" in message or "heartbeat_timeout" in message
|
||||
|
||||
|
||||
async def retry_on_heartbeat_timeout(
|
||||
fn: Callable[..., Awaitable[T]],
|
||||
*args: Any,
|
||||
max_retries: int = 3,
|
||||
on_retry: Optional[Callable[[int, BaseException], None]] = None,
|
||||
**kwargs: Any,
|
||||
) -> T:
|
||||
"""Retry an async function only when it fails with a heartbeat timeout error.
|
||||
|
||||
All other errors are raised immediately without retry.
|
||||
|
||||
Designed for use in Temporal workflows to wrap activity calls so that
|
||||
transient heartbeat timeouts (e.g. from SIGTERM, event-loop blocking)
|
||||
are automatically retried while genuine failures propagate immediately.
|
||||
|
||||
Args:
|
||||
fn: The async function to execute (typically a Temporal activity call).
|
||||
*args: Positional arguments passed to ``fn``.
|
||||
max_retries: Maximum number of retry attempts after the initial try
|
||||
(default: 3).
|
||||
on_retry: Optional callback invoked before each retry with the attempt
|
||||
number and the error.
|
||||
**kwargs: Keyword arguments passed to ``fn``.
|
||||
|
||||
Returns:
|
||||
The result of the function.
|
||||
|
||||
Raises:
|
||||
The original exception if it is not a heartbeat timeout or retries
|
||||
are exhausted.
|
||||
"""
|
||||
last_error: Optional[BaseException] = None
|
||||
|
||||
for attempt in range(max_retries + 1):
|
||||
try:
|
||||
return await fn(*args, **kwargs)
|
||||
except BaseException as err:
|
||||
last_error = err
|
||||
if is_heartbeat_timeout_error(err) and attempt < max_retries:
|
||||
logger.warning(
|
||||
"LlamaParse activity failed with heartbeat timeout "
|
||||
"(attempt %d/%d), retrying...",
|
||||
attempt + 1,
|
||||
max_retries,
|
||||
)
|
||||
if on_retry:
|
||||
on_retry(attempt + 1, err)
|
||||
continue
|
||||
raise
|
||||
|
||||
# Should not reach here, but satisfy type checker
|
||||
assert last_error is not None
|
||||
raise last_error
|
||||
@@ -1,5 +1,21 @@
|
||||
# llama_parse
|
||||
|
||||
## 0.6.94
|
||||
|
||||
### Patch Changes
|
||||
|
||||
- 232c55b: Include xlsx files in extract input
|
||||
- Updated dependencies [232c55b]
|
||||
- llama-cloud-services-py@0.6.94
|
||||
|
||||
## 0.6.93
|
||||
|
||||
### Patch Changes
|
||||
|
||||
- da1916c: Add more warnings
|
||||
- Updated dependencies [da1916c]
|
||||
- llama-cloud-services-py@0.6.93
|
||||
|
||||
## 0.6.92
|
||||
|
||||
### Patch Changes
|
||||
|
||||
@@ -1,5 +1,15 @@
|
||||
# LlamaParse
|
||||
|
||||
> **⚠️ DEPRECATION NOTICE**
|
||||
>
|
||||
> This repository and its packages are deprecated and will be maintained until **May 1, 2026**.
|
||||
>
|
||||
> **Please migrate to the new packages:**
|
||||
> - **Python**: `pip install llama-cloud>=1.0` ([GitHub](https://github.com/run-llama/llama-cloud-py))
|
||||
> - **TypeScript**: `npm install @llamaindex/llama-cloud` ([GitHub](https://github.com/run-llama/llama-cloud-ts))
|
||||
>
|
||||
> The new packages provide the same functionality with improved performance, better support, and active development.
|
||||
|
||||
[](https://pypi.org/project/llama-parse/)
|
||||
[](https://github.com/run-llama/llama_parse/graphs/contributors)
|
||||
[](https://discord.gg/dGcwcsnxhU)
|
||||
|
||||
@@ -1,8 +1,18 @@
|
||||
from llama_cloud_services.parse import (
|
||||
import warnings
|
||||
from llama_cloud_services.parse import ( # type: ignore[attr-defined]
|
||||
LlamaParse,
|
||||
ResultType,
|
||||
ParsingMode,
|
||||
FailedPageMode,
|
||||
)
|
||||
|
||||
warnings.warn(
|
||||
"The 'llama-parse' package is deprecated and will no longer receive updates. "
|
||||
"Please migrate to the new unified SDK. "
|
||||
"See https://developers.llamaindex.ai/python/cloud/llamaparse/getting_started/ "
|
||||
"and https://github.com/run-llama/llama-cloud-py/blob/main/README.md for migration instructions.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
|
||||
__all__ = ["LlamaParse", "ResultType", "ParsingMode", "FailedPageMode"]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "llama_parse",
|
||||
"version": "0.6.92",
|
||||
"version": "0.6.94",
|
||||
"description": "",
|
||||
"main": "index.js",
|
||||
"private": false,
|
||||
|
||||
@@ -11,13 +11,13 @@ dev = [
|
||||
|
||||
[project]
|
||||
name = "llama-parse"
|
||||
version = "0.6.92"
|
||||
version = "0.6.94"
|
||||
description = "Parse files into RAG-Optimized formats."
|
||||
authors = [{name = "Logan Markewich", email = "logan@llamaindex.ai"}]
|
||||
requires-python = ">=3.9,<4.0"
|
||||
readme = "README.md"
|
||||
license = "MIT"
|
||||
dependencies = ["llama-cloud-services>=0.6.92"]
|
||||
dependencies = ["llama-cloud-services>=0.6.94"]
|
||||
|
||||
[project.scripts]
|
||||
llama-parse = "llama_parse.cli.main:parse"
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "llama-cloud-services-py",
|
||||
"version": "0.6.92",
|
||||
"version": "0.6.94",
|
||||
"private": false,
|
||||
"license": "MIT",
|
||||
"scripts": {},
|
||||
|
||||
+1
-1
@@ -23,7 +23,7 @@ dev = [
|
||||
|
||||
[project]
|
||||
name = "llama-cloud-services"
|
||||
version = "0.6.92"
|
||||
version = "0.6.94"
|
||||
description = "Tailored SDK clients for LlamaCloud services."
|
||||
authors = [{name = "Logan Markewich", email = "logan@runllama.ai"}]
|
||||
requires-python = ">=3.9,<4.0"
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from typing import Any, Dict, Optional, Union
|
||||
|
||||
from llama_cloud.core.api_error import ApiError
|
||||
from llama_cloud.types import ExtractConfig
|
||||
@@ -13,9 +12,6 @@ from tenacity import (
|
||||
|
||||
from llama_cloud_services.extract import ExtractionAgent, LlamaExtract
|
||||
|
||||
# Global storage for agents to cleanup
|
||||
_TEST_AGENTS_TO_CLEANUP: List[str] = []
|
||||
|
||||
|
||||
def _is_rate_limit_error(exception: BaseException) -> bool:
|
||||
"""Check if the exception is a rate limit error (429)."""
|
||||
@@ -42,38 +38,3 @@ def pytest_configure(config):
|
||||
"""Register custom markers for extract tests."""
|
||||
config.addinivalue_line("markers", "agent_name: custom agent name for test")
|
||||
config.addinivalue_line("markers", "agent_schema: custom agent schema for test")
|
||||
|
||||
|
||||
def pytest_sessionfinish(session, exitstatus):
|
||||
"""Hook that runs after all tests complete - cleanup agents here"""
|
||||
print(
|
||||
f"pytest_sessionfinish hook called! Agents to cleanup: {_TEST_AGENTS_TO_CLEANUP}"
|
||||
)
|
||||
|
||||
if _TEST_AGENTS_TO_CLEANUP:
|
||||
print("Creating cleanup client...")
|
||||
# Create a fresh client just for cleanup
|
||||
cleanup_client = LlamaExtract(
|
||||
api_key=os.getenv("LLAMA_CLOUD_API_KEY"),
|
||||
base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
|
||||
project_id=os.getenv("LLAMA_CLOUD_PROJECT_ID"),
|
||||
verbose=True,
|
||||
)
|
||||
|
||||
for agent_id in _TEST_AGENTS_TO_CLEANUP:
|
||||
try:
|
||||
print(f"Deleting agent {agent_id}...")
|
||||
cleanup_client.delete_agent(agent_id)
|
||||
print(f"Cleaned up agent {agent_id}")
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to delete agent {agent_id}: {e}")
|
||||
|
||||
_TEST_AGENTS_TO_CLEANUP.clear()
|
||||
print("Agent cleanup completed")
|
||||
else:
|
||||
print("No agents to cleanup")
|
||||
|
||||
|
||||
def register_agent_for_cleanup(agent_id: str):
|
||||
"""Register an agent ID for cleanup at the end of the test session"""
|
||||
_TEST_AGENTS_TO_CLEANUP.append(agent_id)
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import os
|
||||
import shutil
|
||||
import uuid
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from pydantic import BaseModel
|
||||
@@ -6,7 +8,7 @@ from pydantic import BaseModel
|
||||
from llama_cloud_services.extract import LlamaExtract, ExtractionAgent, SourceText
|
||||
from llama_cloud.types import ExtractConfig, ExtractMode, ExtractRun
|
||||
from tests.extract.util import load_test_dotenv
|
||||
from .conftest import register_agent_for_cleanup, create_agent_with_retry
|
||||
from .conftest import create_agent_with_retry
|
||||
|
||||
load_test_dotenv()
|
||||
|
||||
@@ -59,17 +61,27 @@ def test_schema_dict():
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_agent(llama_extract, test_agent_name, test_schema_dict, request):
|
||||
"""Creates a test agent and collects it for cleanup at the end of all tests"""
|
||||
test_id = request.node.nodeid
|
||||
test_hash = hex(hash(test_id))[-8:]
|
||||
base_name = test_agent_name
|
||||
def unique_test_pdf(tmp_path):
|
||||
"""Copy test PDF to a unique path to avoid file deduplication across parallel tests.
|
||||
|
||||
Uses a UUID in the filename so that external_file_id is unique regardless of
|
||||
whether the full path or just the filename is sent to the backend.
|
||||
"""
|
||||
unique_name = f"{TEST_PDF.stem}-{uuid.uuid4().hex[:8]}{TEST_PDF.suffix}"
|
||||
unique_pdf = tmp_path / unique_name
|
||||
shutil.copy2(TEST_PDF, unique_pdf)
|
||||
return unique_pdf
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_agent(llama_extract, test_agent_name, test_schema_dict, request):
|
||||
"""Creates a test agent with a unique name and cleans it up after the test."""
|
||||
unique_id = uuid.uuid4().hex[:8]
|
||||
base_name = next(
|
||||
(marker.args[0] for marker in request.node.iter_markers("agent_name")),
|
||||
base_name,
|
||||
test_agent_name,
|
||||
)
|
||||
name = f"{base_name}_{test_hash}"
|
||||
name = f"{base_name}_{unique_id}"
|
||||
|
||||
schema = next(
|
||||
(
|
||||
@@ -79,25 +91,20 @@ def test_agent(llama_extract, test_agent_name, test_schema_dict, request):
|
||||
test_schema_dict,
|
||||
)
|
||||
|
||||
# Cleanup existing agent
|
||||
try:
|
||||
for agent in llama_extract.list_agents():
|
||||
if agent.name == name:
|
||||
llama_extract.delete_agent(agent.id)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to cleanup existing agent: {e}")
|
||||
|
||||
# Use config with cache invalidation to ensure fresh results in tests
|
||||
config = ExtractConfig(invalidate_cache=True)
|
||||
agent = create_agent_with_retry(
|
||||
llama_extract, name=name, data_schema=schema, config=config
|
||||
)
|
||||
|
||||
# Add agent to cleanup list via conftest helper
|
||||
register_agent_for_cleanup(agent.id)
|
||||
|
||||
yield agent
|
||||
|
||||
# Inline cleanup -- each worker cleans up its own agents
|
||||
try:
|
||||
llama_extract.delete_agent(agent.id)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to cleanup agent {agent.id}: {e}")
|
||||
|
||||
|
||||
class TestLlamaExtract:
|
||||
def test_init_without_api_key(self):
|
||||
@@ -138,34 +145,38 @@ class TestLlamaExtract:
|
||||
|
||||
class TestExtractionAgent:
|
||||
@pytest.mark.asyncio
|
||||
async def test_extract_single_file(self, test_agent):
|
||||
result = await test_agent.aextract(TEST_PDF)
|
||||
async def test_extract_single_file(self, test_agent, unique_test_pdf):
|
||||
result = await test_agent.aextract(unique_test_pdf)
|
||||
assert result.status == "SUCCESS"
|
||||
assert result.data is not None
|
||||
assert isinstance(result.data, dict)
|
||||
assert "title" in result.data
|
||||
assert "summary" in result.data
|
||||
|
||||
def test_sync_extract_single_file(self, test_agent):
|
||||
result = test_agent.extract(TEST_PDF)
|
||||
def test_sync_extract_single_file(self, test_agent, unique_test_pdf):
|
||||
result = test_agent.extract(unique_test_pdf)
|
||||
assert result.status == "SUCCESS"
|
||||
assert result.data is not None
|
||||
assert isinstance(result.data, dict)
|
||||
assert "title" in result.data
|
||||
assert "summary" in result.data
|
||||
|
||||
def test_extract_file_from_buffered_io(self, test_agent):
|
||||
result = test_agent.extract(SourceText(file=open(TEST_PDF, "rb")))
|
||||
def test_extract_file_from_buffered_io(self, test_agent, unique_test_pdf):
|
||||
result = test_agent.extract(
|
||||
SourceText(file=open(unique_test_pdf, "rb"), filename=unique_test_pdf.name)
|
||||
)
|
||||
assert result.status == "SUCCESS"
|
||||
assert result.data is not None
|
||||
assert isinstance(result.data, dict)
|
||||
assert "title" in result.data
|
||||
assert "summary" in result.data
|
||||
|
||||
def test_extract_file_from_bytes(self, test_agent):
|
||||
with open(TEST_PDF, "rb") as f:
|
||||
def test_extract_file_from_bytes(self, test_agent, unique_test_pdf):
|
||||
with open(unique_test_pdf, "rb") as f:
|
||||
file_bytes = f.read()
|
||||
result = test_agent.extract(SourceText(file=file_bytes, filename=TEST_PDF.name))
|
||||
result = test_agent.extract(
|
||||
SourceText(file=file_bytes, filename=unique_test_pdf.name)
|
||||
)
|
||||
assert result.status == "SUCCESS"
|
||||
assert result.data is not None
|
||||
assert isinstance(result.data, dict)
|
||||
@@ -181,7 +192,10 @@ class TestExtractionAgent:
|
||||
weight for 8 to 13 km (5–8 miles).[3] The name llama (also historically spelled
|
||||
"glama") was adopted by European settlers from native Peruvians.
|
||||
"""
|
||||
result = test_agent.extract(SourceText(text_content=TEST_TEXT))
|
||||
unique_name = f"text-{uuid.uuid4().hex[:8]}.txt"
|
||||
result = test_agent.extract(
|
||||
SourceText(text_content=TEST_TEXT, filename=unique_name)
|
||||
)
|
||||
assert result.status == "SUCCESS"
|
||||
assert result.data is not None
|
||||
assert isinstance(result.data, dict)
|
||||
@@ -189,8 +203,8 @@ class TestExtractionAgent:
|
||||
assert "summary" in result.data
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_extract_multiple_files(self, test_agent):
|
||||
files = [TEST_PDF, TEST_PDF] # Using same file twice for testing
|
||||
async def test_extract_multiple_files(self, test_agent, unique_test_pdf):
|
||||
files = [unique_test_pdf, unique_test_pdf] # Using same file twice for testing
|
||||
response = await test_agent.aextract(files)
|
||||
|
||||
assert len(response) == 2
|
||||
@@ -219,15 +233,15 @@ class TestExtractionAgent:
|
||||
updated_agent = llama_extract.get_agent(name=test_agent.name)
|
||||
assert "new_field" in updated_agent.data_schema["properties"]
|
||||
|
||||
def test_list_extraction_runs(self, test_agent: ExtractionAgent):
|
||||
def test_list_extraction_runs(self, test_agent: ExtractionAgent, unique_test_pdf):
|
||||
assert test_agent.list_extraction_runs().total == 0
|
||||
test_agent.extract(TEST_PDF)
|
||||
test_agent.extract(unique_test_pdf)
|
||||
runs = test_agent.list_extraction_runs()
|
||||
assert runs.total > 0
|
||||
|
||||
def test_delete_extraction_run(self, test_agent: ExtractionAgent):
|
||||
def test_delete_extraction_run(self, test_agent: ExtractionAgent, unique_test_pdf):
|
||||
assert test_agent.list_extraction_runs().total == 0
|
||||
run: ExtractRun = test_agent.extract(TEST_PDF)
|
||||
run: ExtractRun = test_agent.extract(unique_test_pdf)
|
||||
test_agent.delete_extraction_run(run.id)
|
||||
runs = test_agent.list_extraction_runs()
|
||||
assert runs.total == 0
|
||||
|
||||
@@ -10,7 +10,7 @@ import uuid
|
||||
from llama_cloud.types import ExtractConfig, ExtractMode
|
||||
from deepdiff import DeepDiff
|
||||
from tests.extract.util import json_subset_match_score, load_test_dotenv
|
||||
from .conftest import register_agent_for_cleanup, create_agent_with_retry
|
||||
from .conftest import create_agent_with_retry
|
||||
|
||||
load_test_dotenv()
|
||||
|
||||
@@ -109,32 +109,24 @@ def extractor():
|
||||
@pytest.fixture
|
||||
def extraction_agent(test_case: ExtractionTestCase, extractor: LlamaExtract):
|
||||
"""Fixture to create and cleanup extraction agent for each test."""
|
||||
# Create unique name with random UUID (important for CI to avoid conflicts)
|
||||
unique_id = uuid.uuid4().hex[:8]
|
||||
agent_name = f"{test_case.name}_{unique_id}"
|
||||
|
||||
with open(test_case.schema_path, "r") as f:
|
||||
schema = json.load(f)
|
||||
|
||||
# Clean up any existing agents with this name
|
||||
try:
|
||||
agents = extractor.list_agents()
|
||||
for agent in agents:
|
||||
if agent.name == agent_name:
|
||||
extractor.delete_agent(agent.id)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to cleanup existing agent: {str(e)}")
|
||||
|
||||
# Create new agent with retry logic for rate limiting
|
||||
agent = create_agent_with_retry(
|
||||
extractor, name=agent_name, data_schema=schema, config=test_case.config
|
||||
)
|
||||
|
||||
# Register agent for cleanup at the end of the test session
|
||||
register_agent_for_cleanup(agent.id)
|
||||
|
||||
yield agent
|
||||
|
||||
# Inline cleanup -- each worker cleans up its own agents
|
||||
try:
|
||||
extractor.delete_agent(agent.id)
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to cleanup agent {agent.id}: {e}")
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
os.environ.get("LLAMA_CLOUD_API_KEY", "") == "",
|
||||
|
||||
@@ -0,0 +1,191 @@
|
||||
import pytest
|
||||
|
||||
from llama_cloud_services.parse.temporal import (
|
||||
is_heartbeat_timeout_error,
|
||||
retry_on_heartbeat_timeout,
|
||||
)
|
||||
|
||||
|
||||
class FakeTimeoutError(Exception):
|
||||
"""Mimics temporalio.exceptions.TimeoutError with a type attribute."""
|
||||
|
||||
def __init__(self, timeout_type):
|
||||
super().__init__("timeout")
|
||||
self.type = timeout_type
|
||||
|
||||
|
||||
class FakeTimeoutType:
|
||||
"""Mimics temporalio.exceptions.TimeoutType enum."""
|
||||
|
||||
HEARTBEAT = 4
|
||||
START_TO_CLOSE = 1
|
||||
|
||||
|
||||
class FakeActivityError(Exception):
|
||||
"""Mimics temporalio.exceptions.ActivityError with a cause."""
|
||||
|
||||
def __init__(self, cause=None):
|
||||
super().__init__("activity error")
|
||||
self.__cause__ = cause
|
||||
|
||||
|
||||
# Make the class names match what the code checks
|
||||
FakeTimeoutError.__name__ = "TimeoutError"
|
||||
FakeActivityError.__name__ = "ActivityError"
|
||||
|
||||
|
||||
class TestIsHeartbeatTimeoutError:
|
||||
def test_detects_activity_error_with_heartbeat_timeout(self):
|
||||
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
|
||||
activity_err = FakeActivityError(cause=timeout_err)
|
||||
assert is_heartbeat_timeout_error(activity_err) is True
|
||||
|
||||
def test_rejects_activity_error_with_start_to_close_timeout(self):
|
||||
timeout_err = FakeTimeoutError(FakeTimeoutType.START_TO_CLOSE)
|
||||
activity_err = FakeActivityError(cause=timeout_err)
|
||||
assert is_heartbeat_timeout_error(activity_err) is False
|
||||
|
||||
def test_rejects_activity_error_with_non_timeout_cause(self):
|
||||
other_err = ValueError("some other error")
|
||||
activity_err = FakeActivityError(cause=other_err)
|
||||
assert is_heartbeat_timeout_error(activity_err) is False
|
||||
|
||||
def test_detects_heartbeat_timeout_from_message(self):
|
||||
err = Exception("activity Heartbeat timeout")
|
||||
assert is_heartbeat_timeout_error(err) is True
|
||||
|
||||
def test_detects_heartbeat_timeout_underscore_from_message(self):
|
||||
err = Exception("Failed due to heartbeat_timeout")
|
||||
assert is_heartbeat_timeout_error(err) is True
|
||||
|
||||
def test_rejects_non_heartbeat_errors(self):
|
||||
err = Exception("Connection refused")
|
||||
assert is_heartbeat_timeout_error(err) is False
|
||||
|
||||
def test_handles_none_error(self):
|
||||
assert is_heartbeat_timeout_error(ValueError()) is False
|
||||
|
||||
def test_detects_heartbeat_timeout_with_enum_value_attr(self):
|
||||
"""Test with an enum-like type that has .value."""
|
||||
|
||||
class EnumLike:
|
||||
value = 4
|
||||
|
||||
timeout_err = FakeTimeoutError(EnumLike())
|
||||
activity_err = FakeActivityError(cause=timeout_err)
|
||||
assert is_heartbeat_timeout_error(activity_err) is True
|
||||
|
||||
|
||||
class TestRetryOnHeartbeatTimeout:
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_result_on_success(self):
|
||||
async def success():
|
||||
return "ok"
|
||||
|
||||
result = await retry_on_heartbeat_timeout(success)
|
||||
assert result == "ok"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_retries_on_heartbeat_timeout(self):
|
||||
call_count = 0
|
||||
|
||||
async def flaky():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
|
||||
raise FakeActivityError(cause=timeout_err)
|
||||
return "recovered"
|
||||
|
||||
result = await retry_on_heartbeat_timeout(flaky, max_retries=3)
|
||||
assert result == "recovered"
|
||||
assert call_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_does_not_retry_on_non_heartbeat_errors(self):
|
||||
call_count = 0
|
||||
|
||||
async def always_fail():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
raise ValueError("Application error")
|
||||
|
||||
with pytest.raises(ValueError, match="Application error"):
|
||||
await retry_on_heartbeat_timeout(always_fail)
|
||||
assert call_count == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_exhausts_retries_on_persistent_heartbeat_timeout(self):
|
||||
call_count = 0
|
||||
|
||||
async def always_heartbeat_timeout():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
|
||||
raise FakeActivityError(cause=timeout_err)
|
||||
|
||||
with pytest.raises(FakeActivityError):
|
||||
await retry_on_heartbeat_timeout(
|
||||
always_heartbeat_timeout, max_retries=2
|
||||
)
|
||||
assert call_count == 3 # initial + 2 retries
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_calls_on_retry_callback(self):
|
||||
retries_seen = []
|
||||
|
||||
def on_retry(attempt, error):
|
||||
retries_seen.append(attempt)
|
||||
|
||||
call_count = 0
|
||||
|
||||
async def flaky():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count <= 2:
|
||||
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
|
||||
raise FakeActivityError(cause=timeout_err)
|
||||
return "ok"
|
||||
|
||||
result = await retry_on_heartbeat_timeout(
|
||||
flaky, max_retries=3, on_retry=on_retry
|
||||
)
|
||||
assert result == "ok"
|
||||
assert retries_seen == [1, 2]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_defaults_to_3_retries(self):
|
||||
call_count = 0
|
||||
|
||||
async def always_heartbeat_timeout():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
|
||||
raise FakeActivityError(cause=timeout_err)
|
||||
|
||||
with pytest.raises(FakeActivityError):
|
||||
await retry_on_heartbeat_timeout(always_heartbeat_timeout)
|
||||
assert call_count == 4 # initial + 3 retries
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_passes_args_and_kwargs(self):
|
||||
async def add(a, b, extra=0):
|
||||
return a + b + extra
|
||||
|
||||
result = await retry_on_heartbeat_timeout(add, 1, 2, extra=10)
|
||||
assert result == 13
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_retries_on_message_match(self):
|
||||
call_count = 0
|
||||
|
||||
async def flaky():
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
raise Exception("activity Heartbeat timeout")
|
||||
return "recovered"
|
||||
|
||||
result = await retry_on_heartbeat_timeout(flaky, max_retries=3)
|
||||
assert result == "recovered"
|
||||
assert call_count == 2
|
||||
@@ -25,7 +25,8 @@
|
||||
"./parse",
|
||||
"./beta/agent",
|
||||
"./extract",
|
||||
"./classify"
|
||||
"./classify",
|
||||
"./temporal"
|
||||
],
|
||||
"exports": {
|
||||
"./openapi.json": "./openapi.json",
|
||||
@@ -95,6 +96,17 @@
|
||||
},
|
||||
"default": "./classify/dist/index.js"
|
||||
},
|
||||
"./temporal": {
|
||||
"require": {
|
||||
"types": "./temporal/dist/index.d.cts",
|
||||
"default": "./temporal/dist/index.cjs"
|
||||
},
|
||||
"import": {
|
||||
"types": "./temporal/dist/index.d.ts",
|
||||
"default": "./temporal/dist/index.js"
|
||||
},
|
||||
"default": "./temporal/dist/index.js"
|
||||
},
|
||||
".": {
|
||||
"require": {
|
||||
"types": "./dist/index.d.cts",
|
||||
|
||||
@@ -24,3 +24,8 @@ export type {
|
||||
ClassifyJobResults,
|
||||
ClassifyParsingConfiguration,
|
||||
} from "./classify.js";
|
||||
export {
|
||||
isHeartbeatTimeoutError,
|
||||
retryOnHeartbeatTimeout,
|
||||
type RetryOnHeartbeatTimeoutOptions,
|
||||
} from "./temporal.js";
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
/**
|
||||
* Utilities for retrying LlamaParse activities in Temporal
|
||||
* specifically when they fail due to activity heartbeat timeout.
|
||||
*
|
||||
* These utilities detect Temporal's ActivityFailure wrapping a TimeoutFailure
|
||||
* with timeoutType HEARTBEAT, without requiring a direct dependency on the
|
||||
* Temporal SDK.
|
||||
*
|
||||
* Usage in a Temporal workflow:
|
||||
*
|
||||
* ```ts
|
||||
* import { retryOnHeartbeatTimeout } from 'llama-cloud-services/temporal';
|
||||
* import { proxyActivities } from '@temporalio/workflow';
|
||||
*
|
||||
* const { llamaParse } = proxyActivities<typeof activities>({
|
||||
* startToCloseTimeout: '30m',
|
||||
* heartbeatTimeout: '60s',
|
||||
* retry: { maximumAttempts: 1 }, // disable built-in retry
|
||||
* });
|
||||
*
|
||||
* export async function parseWorkflow(input: ParseInput): Promise<ParseResult> {
|
||||
* return retryOnHeartbeatTimeout(() => llamaParse(input), { maxRetries: 3 });
|
||||
* }
|
||||
* ```
|
||||
*/
|
||||
|
||||
// Temporal TimeoutType enum value for HEARTBEAT (from @temporalio/common proto)
|
||||
const TIMEOUT_TYPE_HEARTBEAT = 4;
|
||||
|
||||
/**
|
||||
* Checks if an error is a Temporal activity heartbeat timeout error.
|
||||
*
|
||||
* Detects both the structured Temporal error types (ActivityFailure wrapping
|
||||
* TimeoutFailure with timeoutType HEARTBEAT) and fallback string matching
|
||||
* on the error message.
|
||||
*/
|
||||
export function isHeartbeatTimeoutError(error: unknown): boolean {
|
||||
if (!error || typeof error !== "object") return false;
|
||||
|
||||
const err = error as Record<string, unknown>;
|
||||
|
||||
// Check for Temporal's ActivityFailure → TimeoutFailure chain
|
||||
if (err.name === "ActivityFailure" || err.name === "ActivityError") {
|
||||
const cause =
|
||||
(err.cause as Record<string, unknown>) ??
|
||||
((error as { __cause__?: unknown }).__cause__ as Record<string, unknown>);
|
||||
|
||||
if (cause && typeof cause === "object") {
|
||||
const causeName = (cause as Record<string, unknown>).name;
|
||||
if (causeName === "TimeoutFailure" || causeName === "TimeoutError") {
|
||||
const timeoutType = (cause as Record<string, unknown>).timeoutType;
|
||||
return (
|
||||
timeoutType === TIMEOUT_TYPE_HEARTBEAT ||
|
||||
timeoutType === "HEARTBEAT" ||
|
||||
timeoutType === "TimeoutType.HEARTBEAT"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: match on error message
|
||||
const message = String(
|
||||
(err as { message?: string }).message ?? String(err),
|
||||
).toLowerCase();
|
||||
return (
|
||||
message.includes("heartbeat timeout") ||
|
||||
message.includes("heartbeat_timeout")
|
||||
);
|
||||
}
|
||||
|
||||
export interface RetryOnHeartbeatTimeoutOptions {
|
||||
/** Maximum number of retry attempts after the initial try (default: 3). */
|
||||
maxRetries?: number;
|
||||
/** Optional callback invoked before each retry with the attempt number and error. */
|
||||
onRetry?: (attempt: number, error: unknown) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retries an async function only when it fails with a heartbeat timeout error.
|
||||
* All other errors are thrown immediately without retry.
|
||||
*
|
||||
* Designed for use in Temporal workflows to wrap activity calls so that
|
||||
* transient heartbeat timeouts (e.g. from SIGTERM, event-loop blocking)
|
||||
* are automatically retried while genuine failures propagate immediately.
|
||||
*
|
||||
* @param fn - The async function to execute (typically a Temporal activity call)
|
||||
* @param options - Retry configuration
|
||||
* @returns The result of the function
|
||||
*/
|
||||
export async function retryOnHeartbeatTimeout<T>(
|
||||
fn: () => Promise<T>,
|
||||
options: RetryOnHeartbeatTimeoutOptions = {},
|
||||
): Promise<T> {
|
||||
const { maxRetries = 3, onRetry } = options;
|
||||
let lastError: unknown;
|
||||
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
try {
|
||||
return await fn();
|
||||
} catch (err) {
|
||||
lastError = err;
|
||||
if (isHeartbeatTimeoutError(err) && attempt < maxRetries) {
|
||||
onRetry?.(attempt + 1, err);
|
||||
continue;
|
||||
}
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
|
||||
throw lastError;
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
{
|
||||
"type": "module",
|
||||
"main": "./dist/index.cjs",
|
||||
"module": "./dist/index.js",
|
||||
"types": "./dist/index.d.ts",
|
||||
"exports": "./dist/index.js",
|
||||
"private": true
|
||||
}
|
||||
@@ -0,0 +1,195 @@
|
||||
import { describe, it, expect, vi } from "vitest";
|
||||
import {
|
||||
isHeartbeatTimeoutError,
|
||||
retryOnHeartbeatTimeout,
|
||||
} from "../src/temporal.js";
|
||||
|
||||
describe("Temporal Heartbeat Timeout Utilities", () => {
|
||||
describe("isHeartbeatTimeoutError", () => {
|
||||
it("should detect ActivityFailure wrapping TimeoutFailure with HEARTBEAT type (numeric)", () => {
|
||||
const error = {
|
||||
name: "ActivityFailure",
|
||||
message: "activity failed",
|
||||
cause: {
|
||||
name: "TimeoutFailure",
|
||||
message: "timeout",
|
||||
timeoutType: 4,
|
||||
},
|
||||
};
|
||||
expect(isHeartbeatTimeoutError(error)).toBe(true);
|
||||
});
|
||||
|
||||
it("should detect ActivityFailure wrapping TimeoutFailure with HEARTBEAT type (string)", () => {
|
||||
const error = {
|
||||
name: "ActivityFailure",
|
||||
message: "activity failed",
|
||||
cause: {
|
||||
name: "TimeoutFailure",
|
||||
message: "timeout",
|
||||
timeoutType: "HEARTBEAT",
|
||||
},
|
||||
};
|
||||
expect(isHeartbeatTimeoutError(error)).toBe(true);
|
||||
});
|
||||
|
||||
it("should detect ActivityError wrapping TimeoutError with HEARTBEAT type", () => {
|
||||
const error = {
|
||||
name: "ActivityError",
|
||||
message: "activity error",
|
||||
cause: {
|
||||
name: "TimeoutError",
|
||||
message: "timeout",
|
||||
timeoutType: 4,
|
||||
},
|
||||
};
|
||||
expect(isHeartbeatTimeoutError(error)).toBe(true);
|
||||
});
|
||||
|
||||
it("should reject ActivityFailure with non-heartbeat timeout type", () => {
|
||||
const error = {
|
||||
name: "ActivityFailure",
|
||||
message: "activity failed",
|
||||
cause: {
|
||||
name: "TimeoutFailure",
|
||||
message: "timeout",
|
||||
timeoutType: 1, // START_TO_CLOSE
|
||||
},
|
||||
};
|
||||
expect(isHeartbeatTimeoutError(error)).toBe(false);
|
||||
});
|
||||
|
||||
it("should reject ActivityFailure with non-timeout cause", () => {
|
||||
const error = {
|
||||
name: "ActivityFailure",
|
||||
message: "activity failed",
|
||||
cause: {
|
||||
name: "ApplicationFailure",
|
||||
message: "app error",
|
||||
},
|
||||
};
|
||||
expect(isHeartbeatTimeoutError(error)).toBe(false);
|
||||
});
|
||||
|
||||
it("should detect heartbeat timeout from error message", () => {
|
||||
const error = new Error("activity Heartbeat timeout");
|
||||
expect(isHeartbeatTimeoutError(error)).toBe(true);
|
||||
});
|
||||
|
||||
it("should detect heartbeat_timeout in error message", () => {
|
||||
const error = new Error("Failed due to heartbeat_timeout");
|
||||
expect(isHeartbeatTimeoutError(error)).toBe(true);
|
||||
});
|
||||
|
||||
it("should reject non-heartbeat errors", () => {
|
||||
const error = new Error("Connection refused");
|
||||
expect(isHeartbeatTimeoutError(error)).toBe(false);
|
||||
});
|
||||
|
||||
it("should handle null/undefined", () => {
|
||||
expect(isHeartbeatTimeoutError(null)).toBe(false);
|
||||
expect(isHeartbeatTimeoutError(undefined)).toBe(false);
|
||||
});
|
||||
|
||||
it("should handle non-object errors", () => {
|
||||
expect(isHeartbeatTimeoutError("string error")).toBe(false);
|
||||
expect(isHeartbeatTimeoutError(42)).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("retryOnHeartbeatTimeout", () => {
|
||||
it("should return result on success", async () => {
|
||||
const fn = vi.fn().mockResolvedValue("success");
|
||||
const result = await retryOnHeartbeatTimeout(fn);
|
||||
expect(result).toBe("success");
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("should retry on heartbeat timeout error", async () => {
|
||||
const heartbeatError = Object.assign(new Error("timeout"), {
|
||||
name: "ActivityFailure",
|
||||
cause: { name: "TimeoutFailure", timeoutType: 4 },
|
||||
});
|
||||
|
||||
const fn = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(heartbeatError)
|
||||
.mockResolvedValue("recovered");
|
||||
|
||||
const result = await retryOnHeartbeatTimeout(fn, { maxRetries: 3 });
|
||||
expect(result).toBe("recovered");
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("should not retry on non-heartbeat errors", async () => {
|
||||
const appError = new Error("Application error");
|
||||
const fn = vi.fn().mockRejectedValue(appError);
|
||||
|
||||
await expect(retryOnHeartbeatTimeout(fn)).rejects.toThrow(
|
||||
"Application error",
|
||||
);
|
||||
expect(fn).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("should exhaust retries on persistent heartbeat timeout", async () => {
|
||||
const heartbeatError = Object.assign(new Error("timeout"), {
|
||||
name: "ActivityFailure",
|
||||
cause: { name: "TimeoutFailure", timeoutType: 4 },
|
||||
});
|
||||
|
||||
const fn = vi.fn().mockRejectedValue(heartbeatError);
|
||||
|
||||
await expect(
|
||||
retryOnHeartbeatTimeout(fn, { maxRetries: 2 }),
|
||||
).rejects.toBe(heartbeatError);
|
||||
expect(fn).toHaveBeenCalledTimes(3); // initial + 2 retries
|
||||
});
|
||||
|
||||
it("should call onRetry callback before each retry", async () => {
|
||||
const heartbeatError = Object.assign(new Error("timeout"), {
|
||||
name: "ActivityFailure",
|
||||
cause: { name: "TimeoutFailure", timeoutType: 4 },
|
||||
});
|
||||
|
||||
const fn = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(heartbeatError)
|
||||
.mockRejectedValueOnce(heartbeatError)
|
||||
.mockResolvedValue("ok");
|
||||
|
||||
const onRetry = vi.fn();
|
||||
const result = await retryOnHeartbeatTimeout(fn, {
|
||||
maxRetries: 3,
|
||||
onRetry,
|
||||
});
|
||||
|
||||
expect(result).toBe("ok");
|
||||
expect(onRetry).toHaveBeenCalledTimes(2);
|
||||
expect(onRetry).toHaveBeenCalledWith(1, heartbeatError);
|
||||
expect(onRetry).toHaveBeenCalledWith(2, heartbeatError);
|
||||
});
|
||||
|
||||
it("should default to 3 retries", async () => {
|
||||
const heartbeatError = Object.assign(new Error("timeout"), {
|
||||
name: "ActivityFailure",
|
||||
cause: { name: "TimeoutFailure", timeoutType: 4 },
|
||||
});
|
||||
|
||||
const fn = vi.fn().mockRejectedValue(heartbeatError);
|
||||
|
||||
await expect(retryOnHeartbeatTimeout(fn)).rejects.toBe(heartbeatError);
|
||||
expect(fn).toHaveBeenCalledTimes(4); // initial + 3 retries
|
||||
});
|
||||
|
||||
it("should detect heartbeat timeout from message and retry", async () => {
|
||||
const messageError = new Error("activity Heartbeat timeout");
|
||||
const fn = vi
|
||||
.fn()
|
||||
.mockRejectedValueOnce(messageError)
|
||||
.mockResolvedValue("recovered");
|
||||
|
||||
const result = await retryOnHeartbeatTimeout(fn);
|
||||
expect(result).toBe("recovered");
|
||||
expect(fn).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user