Compare commits

...

9 Commits

Author SHA1 Message Date
Cursor Agent e80e75f2f9 Add retry utilities for Temporal heartbeat timeout failures
Add isHeartbeatTimeoutError/is_heartbeat_timeout_error to detect Temporal
activity heartbeat timeout errors, and retryOnHeartbeatTimeout/
retry_on_heartbeat_timeout to retry activities ONLY when they fail from
heartbeat timeout (not other error types).

Works by inspecting Temporal's ActivityFailure > TimeoutFailure chain
for HEARTBEAT timeout type, with fallback string matching on error messages.

Both TypeScript and Python implementations included with full test coverage.

Co-authored-by: George He <georgewho96@gmail.com>
2026-02-17 22:56:35 +00:00
Neeraj Pradhan 5ea758b853 More robust extract tests with pytest xdist (#1117) 2026-02-16 16:16:15 -08:00
dependabot[bot] 208b6f2fa5 build(deps): bump slackapi/slack-github-action from 1.27.0 to 2.1.1 (#1092)
Bumps [slackapi/slack-github-action](https://github.com/slackapi/slack-github-action) from 1.27.0 to 2.1.1.
- [Release notes](https://github.com/slackapi/slack-github-action/releases)
- [Commits](https://github.com/slackapi/slack-github-action/compare/v1.27.0...v2.1.1)

---
updated-dependencies:
- dependency-name: slackapi/slack-github-action
  dependency-version: 2.1.1
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-02-14 21:03:05 -06:00
github-actions[bot] e1b9143f79 chore: version packages (#1116)
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
2026-02-13 15:29:09 -08:00
Neeraj Pradhan 232c55bd6a Bump up patch version (#1115) 2026-02-13 15:20:52 -08:00
Neeraj Pradhan ab6f2f8da5 Allows xlsx files in the sdk for extract (#1114) 2026-02-13 14:44:25 -08:00
github-actions[bot] 66c2639ec8 chore: version packages (#1112) 2026-02-11 15:18:43 -06:00
Logan da1916c69f more loudly deprecate ancient llama-parse package (#1111) 2026-02-11 15:16:01 -06:00
Neeraj Pradhan 345e272573 Lower frequency for e2e tests (#1110) 2026-02-11 09:07:15 -08:00
22 changed files with 791 additions and 102 deletions
+4 -4
View File
@@ -1,8 +1,8 @@
name: Hourly Extract E2E Tests
name: Extract E2E Tests (every 4 hours)
on:
schedule:
- cron: "18 * * * *"
- cron: "0 */4 * * *"
workflow_dispatch:
# Allows manual triggering
inputs:
@@ -29,7 +29,7 @@ env:
jobs:
extract-e2e:
name: "Hourly Extract E2E Tests (${{ matrix.environment }})"
name: "Extract E2E Tests (${{ matrix.environment }})"
runs-on: ubuntu-latest
timeout-minutes: 30
concurrency:
@@ -149,7 +149,7 @@ jobs:
- name: Post to Extract Slack channel
id: slack
if: (failure() || cancelled()) && steps.runtime.outputs.notify_slack == 'true'
uses: slackapi/slack-github-action@v1.27.0
uses: slackapi/slack-github-action@v2.1.1
with:
channel-id: ${{ env.SLACK_CHANNEL_ID }}
slack-message: |
+12
View File
@@ -1,5 +1,17 @@
# llama-cloud-services-py
## 0.6.94
### Patch Changes
- 232c55b: Include xlsx files in extract input
## 0.6.93
### Patch Changes
- da1916c: Add more warnings
## 0.6.92
### Patch Changes
+10
View File
@@ -4,6 +4,16 @@
# Llama Cloud Services
> **⚠️ DEPRECATION NOTICE**
>
> This repository and its packages are deprecated and will be maintained until **May 1, 2026**.
>
> **Please migrate to the new packages:**
> - **Python**: `pip install llama-cloud>=1.0` ([GitHub](https://github.com/run-llama/llama-cloud-py))
> - **TypeScript**: `npm install @llamaindex/llama-cloud` ([GitHub](https://github.com/run-llama/llama-cloud-ts))
>
> The new packages provide the same functionality with improved performance, better support, and active development.
This repository contains the code for hand-written SDKs and clients for interacting with LlamaCloud.
This includes:
@@ -806,6 +806,7 @@ class LlamaExtract(BaseComponent):
# Document files
".pdf": "application/pdf",
".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
# Image files
".png": "image/png",
".jpg": "image/jpeg",
+12 -1
View File
@@ -4,5 +4,16 @@ from llama_cloud_services.parse.base import (
ParsingMode,
FailedPageMode,
)
from llama_cloud_services.parse.temporal import (
is_heartbeat_timeout_error,
retry_on_heartbeat_timeout,
)
__all__ = ["LlamaParse", "ResultType", "ParsingMode", "FailedPageMode"]
__all__ = [
"LlamaParse",
"ResultType",
"ParsingMode",
"FailedPageMode",
"is_heartbeat_timeout_error",
"retry_on_heartbeat_timeout",
]
+130
View File
@@ -0,0 +1,130 @@
"""Utilities for retrying LlamaParse activities in Temporal
specifically when they fail due to activity heartbeat timeout.
These utilities detect Temporal's ActivityError wrapping a TimeoutError
with timeout_type HEARTBEAT, without requiring a direct dependency on the
Temporal SDK.
Usage in a Temporal workflow::
from temporalio import workflow
from llama_cloud_services.parse.temporal import retry_on_heartbeat_timeout
@workflow.defn
class ParseWorkflow:
@workflow.run
async def run(self, input: ParseInput) -> ParseResult:
parse_activity = workflow.start_activity(
"llama_parse",
input,
start_to_close_timeout=timedelta(minutes=30),
heartbeat_timeout=timedelta(seconds=60),
retry_policy=RetryPolicy(maximum_attempts=1), # disable built-in retry
)
return await retry_on_heartbeat_timeout(parse_activity)
"""
import logging
from typing import Any, Awaitable, Callable, Optional, TypeVar
T = TypeVar("T")
logger = logging.getLogger(__name__)
# Temporal TimeoutType enum value for HEARTBEAT (from temporalio proto)
_TIMEOUT_TYPE_HEARTBEAT = 4
def is_heartbeat_timeout_error(error: BaseException) -> bool:
"""Check if an error is a Temporal activity heartbeat timeout error.
Detects both the structured Temporal error types (ActivityError wrapping
TimeoutError with timeout_type HEARTBEAT) and fallback string matching
on the error message.
Works with the ``temporalio`` Python SDK error types without requiring
a direct import.
Args:
error: The exception to check.
Returns:
True if the error represents a heartbeat timeout.
"""
error_type_name = type(error).__name__
# Check for Temporal's ActivityError → TimeoutError chain
if error_type_name in ("ActivityError", "ActivityFailure"):
cause = getattr(error, "cause", None) or error.__cause__
if cause is not None:
cause_type_name = type(cause).__name__
if cause_type_name in ("TimeoutError", "TimeoutFailure"):
timeout_type = getattr(cause, "type", None) or getattr(
cause, "timeout_type", None
)
if timeout_type is not None:
# Handle both enum and int representations
timeout_type_val = (
timeout_type.value
if hasattr(timeout_type, "value")
else timeout_type
)
return timeout_type_val == _TIMEOUT_TYPE_HEARTBEAT
# Fallback: match on error message
message = str(error).lower()
return "heartbeat timeout" in message or "heartbeat_timeout" in message
async def retry_on_heartbeat_timeout(
fn: Callable[..., Awaitable[T]],
*args: Any,
max_retries: int = 3,
on_retry: Optional[Callable[[int, BaseException], None]] = None,
**kwargs: Any,
) -> T:
"""Retry an async function only when it fails with a heartbeat timeout error.
All other errors are raised immediately without retry.
Designed for use in Temporal workflows to wrap activity calls so that
transient heartbeat timeouts (e.g. from SIGTERM, event-loop blocking)
are automatically retried while genuine failures propagate immediately.
Args:
fn: The async function to execute (typically a Temporal activity call).
*args: Positional arguments passed to ``fn``.
max_retries: Maximum number of retry attempts after the initial try
(default: 3).
on_retry: Optional callback invoked before each retry with the attempt
number and the error.
**kwargs: Keyword arguments passed to ``fn``.
Returns:
The result of the function.
Raises:
The original exception if it is not a heartbeat timeout or retries
are exhausted.
"""
last_error: Optional[BaseException] = None
for attempt in range(max_retries + 1):
try:
return await fn(*args, **kwargs)
except BaseException as err:
last_error = err
if is_heartbeat_timeout_error(err) and attempt < max_retries:
logger.warning(
"LlamaParse activity failed with heartbeat timeout "
"(attempt %d/%d), retrying...",
attempt + 1,
max_retries,
)
if on_retry:
on_retry(attempt + 1, err)
continue
raise
# Should not reach here, but satisfy type checker
assert last_error is not None
raise last_error
+16
View File
@@ -1,5 +1,21 @@
# llama_parse
## 0.6.94
### Patch Changes
- 232c55b: Include xlsx files in extract input
- Updated dependencies [232c55b]
- llama-cloud-services-py@0.6.94
## 0.6.93
### Patch Changes
- da1916c: Add more warnings
- Updated dependencies [da1916c]
- llama-cloud-services-py@0.6.93
## 0.6.92
### Patch Changes
+10
View File
@@ -1,5 +1,15 @@
# LlamaParse
> **⚠️ DEPRECATION NOTICE**
>
> This repository and its packages are deprecated and will be maintained until **May 1, 2026**.
>
> **Please migrate to the new packages:**
> - **Python**: `pip install llama-cloud>=1.0` ([GitHub](https://github.com/run-llama/llama-cloud-py))
> - **TypeScript**: `npm install @llamaindex/llama-cloud` ([GitHub](https://github.com/run-llama/llama-cloud-ts))
>
> The new packages provide the same functionality with improved performance, better support, and active development.
[![PyPI - Downloads](https://img.shields.io/pypi/dm/llama-parse)](https://pypi.org/project/llama-parse/)
[![GitHub contributors](https://img.shields.io/github/contributors/run-llama/llama_parse)](https://github.com/run-llama/llama_parse/graphs/contributors)
[![Discord](https://img.shields.io/discord/1059199217496772688)](https://discord.gg/dGcwcsnxhU)
+11 -1
View File
@@ -1,8 +1,18 @@
from llama_cloud_services.parse import (
import warnings
from llama_cloud_services.parse import ( # type: ignore[attr-defined]
LlamaParse,
ResultType,
ParsingMode,
FailedPageMode,
)
warnings.warn(
"The 'llama-parse' package is deprecated and will no longer receive updates. "
"Please migrate to the new unified SDK. "
"See https://developers.llamaindex.ai/python/cloud/llamaparse/getting_started/ "
"and https://github.com/run-llama/llama-cloud-py/blob/main/README.md for migration instructions.",
DeprecationWarning,
stacklevel=2,
)
__all__ = ["LlamaParse", "ResultType", "ParsingMode", "FailedPageMode"]
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "llama_parse",
"version": "0.6.92",
"version": "0.6.94",
"description": "",
"main": "index.js",
"private": false,
+2 -2
View File
@@ -11,13 +11,13 @@ dev = [
[project]
name = "llama-parse"
version = "0.6.92"
version = "0.6.94"
description = "Parse files into RAG-Optimized formats."
authors = [{name = "Logan Markewich", email = "logan@llamaindex.ai"}]
requires-python = ">=3.9,<4.0"
readme = "README.md"
license = "MIT"
dependencies = ["llama-cloud-services>=0.6.92"]
dependencies = ["llama-cloud-services>=0.6.94"]
[project.scripts]
llama-parse = "llama_parse.cli.main:parse"
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "llama-cloud-services-py",
"version": "0.6.92",
"version": "0.6.94",
"private": false,
"license": "MIT",
"scripts": {},
+1 -1
View File
@@ -23,7 +23,7 @@ dev = [
[project]
name = "llama-cloud-services"
version = "0.6.92"
version = "0.6.94"
description = "Tailored SDK clients for LlamaCloud services."
authors = [{name = "Logan Markewich", email = "logan@runllama.ai"}]
requires-python = ">=3.9,<4.0"
+1 -40
View File
@@ -1,5 +1,4 @@
import os
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, Optional, Union
from llama_cloud.core.api_error import ApiError
from llama_cloud.types import ExtractConfig
@@ -13,9 +12,6 @@ from tenacity import (
from llama_cloud_services.extract import ExtractionAgent, LlamaExtract
# Global storage for agents to cleanup
_TEST_AGENTS_TO_CLEANUP: List[str] = []
def _is_rate_limit_error(exception: BaseException) -> bool:
"""Check if the exception is a rate limit error (429)."""
@@ -42,38 +38,3 @@ def pytest_configure(config):
"""Register custom markers for extract tests."""
config.addinivalue_line("markers", "agent_name: custom agent name for test")
config.addinivalue_line("markers", "agent_schema: custom agent schema for test")
def pytest_sessionfinish(session, exitstatus):
"""Hook that runs after all tests complete - cleanup agents here"""
print(
f"pytest_sessionfinish hook called! Agents to cleanup: {_TEST_AGENTS_TO_CLEANUP}"
)
if _TEST_AGENTS_TO_CLEANUP:
print("Creating cleanup client...")
# Create a fresh client just for cleanup
cleanup_client = LlamaExtract(
api_key=os.getenv("LLAMA_CLOUD_API_KEY"),
base_url=os.getenv("LLAMA_CLOUD_BASE_URL"),
project_id=os.getenv("LLAMA_CLOUD_PROJECT_ID"),
verbose=True,
)
for agent_id in _TEST_AGENTS_TO_CLEANUP:
try:
print(f"Deleting agent {agent_id}...")
cleanup_client.delete_agent(agent_id)
print(f"Cleaned up agent {agent_id}")
except Exception as e:
print(f"Warning: Failed to delete agent {agent_id}: {e}")
_TEST_AGENTS_TO_CLEANUP.clear()
print("Agent cleanup completed")
else:
print("No agents to cleanup")
def register_agent_for_cleanup(agent_id: str):
"""Register an agent ID for cleanup at the end of the test session"""
_TEST_AGENTS_TO_CLEANUP.append(agent_id)
+49 -35
View File
@@ -1,4 +1,6 @@
import os
import shutil
import uuid
import pytest
from pathlib import Path
from pydantic import BaseModel
@@ -6,7 +8,7 @@ from pydantic import BaseModel
from llama_cloud_services.extract import LlamaExtract, ExtractionAgent, SourceText
from llama_cloud.types import ExtractConfig, ExtractMode, ExtractRun
from tests.extract.util import load_test_dotenv
from .conftest import register_agent_for_cleanup, create_agent_with_retry
from .conftest import create_agent_with_retry
load_test_dotenv()
@@ -59,17 +61,27 @@ def test_schema_dict():
@pytest.fixture
def test_agent(llama_extract, test_agent_name, test_schema_dict, request):
"""Creates a test agent and collects it for cleanup at the end of all tests"""
test_id = request.node.nodeid
test_hash = hex(hash(test_id))[-8:]
base_name = test_agent_name
def unique_test_pdf(tmp_path):
"""Copy test PDF to a unique path to avoid file deduplication across parallel tests.
Uses a UUID in the filename so that external_file_id is unique regardless of
whether the full path or just the filename is sent to the backend.
"""
unique_name = f"{TEST_PDF.stem}-{uuid.uuid4().hex[:8]}{TEST_PDF.suffix}"
unique_pdf = tmp_path / unique_name
shutil.copy2(TEST_PDF, unique_pdf)
return unique_pdf
@pytest.fixture
def test_agent(llama_extract, test_agent_name, test_schema_dict, request):
"""Creates a test agent with a unique name and cleans it up after the test."""
unique_id = uuid.uuid4().hex[:8]
base_name = next(
(marker.args[0] for marker in request.node.iter_markers("agent_name")),
base_name,
test_agent_name,
)
name = f"{base_name}_{test_hash}"
name = f"{base_name}_{unique_id}"
schema = next(
(
@@ -79,25 +91,20 @@ def test_agent(llama_extract, test_agent_name, test_schema_dict, request):
test_schema_dict,
)
# Cleanup existing agent
try:
for agent in llama_extract.list_agents():
if agent.name == name:
llama_extract.delete_agent(agent.id)
except Exception as e:
print(f"Warning: Failed to cleanup existing agent: {e}")
# Use config with cache invalidation to ensure fresh results in tests
config = ExtractConfig(invalidate_cache=True)
agent = create_agent_with_retry(
llama_extract, name=name, data_schema=schema, config=config
)
# Add agent to cleanup list via conftest helper
register_agent_for_cleanup(agent.id)
yield agent
# Inline cleanup -- each worker cleans up its own agents
try:
llama_extract.delete_agent(agent.id)
except Exception as e:
print(f"Warning: Failed to cleanup agent {agent.id}: {e}")
class TestLlamaExtract:
def test_init_without_api_key(self):
@@ -138,34 +145,38 @@ class TestLlamaExtract:
class TestExtractionAgent:
@pytest.mark.asyncio
async def test_extract_single_file(self, test_agent):
result = await test_agent.aextract(TEST_PDF)
async def test_extract_single_file(self, test_agent, unique_test_pdf):
result = await test_agent.aextract(unique_test_pdf)
assert result.status == "SUCCESS"
assert result.data is not None
assert isinstance(result.data, dict)
assert "title" in result.data
assert "summary" in result.data
def test_sync_extract_single_file(self, test_agent):
result = test_agent.extract(TEST_PDF)
def test_sync_extract_single_file(self, test_agent, unique_test_pdf):
result = test_agent.extract(unique_test_pdf)
assert result.status == "SUCCESS"
assert result.data is not None
assert isinstance(result.data, dict)
assert "title" in result.data
assert "summary" in result.data
def test_extract_file_from_buffered_io(self, test_agent):
result = test_agent.extract(SourceText(file=open(TEST_PDF, "rb")))
def test_extract_file_from_buffered_io(self, test_agent, unique_test_pdf):
result = test_agent.extract(
SourceText(file=open(unique_test_pdf, "rb"), filename=unique_test_pdf.name)
)
assert result.status == "SUCCESS"
assert result.data is not None
assert isinstance(result.data, dict)
assert "title" in result.data
assert "summary" in result.data
def test_extract_file_from_bytes(self, test_agent):
with open(TEST_PDF, "rb") as f:
def test_extract_file_from_bytes(self, test_agent, unique_test_pdf):
with open(unique_test_pdf, "rb") as f:
file_bytes = f.read()
result = test_agent.extract(SourceText(file=file_bytes, filename=TEST_PDF.name))
result = test_agent.extract(
SourceText(file=file_bytes, filename=unique_test_pdf.name)
)
assert result.status == "SUCCESS"
assert result.data is not None
assert isinstance(result.data, dict)
@@ -181,7 +192,10 @@ class TestExtractionAgent:
weight for 8 to 13 km (58 miles).[3] The name llama (also historically spelled
"glama") was adopted by European settlers from native Peruvians.
"""
result = test_agent.extract(SourceText(text_content=TEST_TEXT))
unique_name = f"text-{uuid.uuid4().hex[:8]}.txt"
result = test_agent.extract(
SourceText(text_content=TEST_TEXT, filename=unique_name)
)
assert result.status == "SUCCESS"
assert result.data is not None
assert isinstance(result.data, dict)
@@ -189,8 +203,8 @@ class TestExtractionAgent:
assert "summary" in result.data
@pytest.mark.asyncio
async def test_extract_multiple_files(self, test_agent):
files = [TEST_PDF, TEST_PDF] # Using same file twice for testing
async def test_extract_multiple_files(self, test_agent, unique_test_pdf):
files = [unique_test_pdf, unique_test_pdf] # Using same file twice for testing
response = await test_agent.aextract(files)
assert len(response) == 2
@@ -219,15 +233,15 @@ class TestExtractionAgent:
updated_agent = llama_extract.get_agent(name=test_agent.name)
assert "new_field" in updated_agent.data_schema["properties"]
def test_list_extraction_runs(self, test_agent: ExtractionAgent):
def test_list_extraction_runs(self, test_agent: ExtractionAgent, unique_test_pdf):
assert test_agent.list_extraction_runs().total == 0
test_agent.extract(TEST_PDF)
test_agent.extract(unique_test_pdf)
runs = test_agent.list_extraction_runs()
assert runs.total > 0
def test_delete_extraction_run(self, test_agent: ExtractionAgent):
def test_delete_extraction_run(self, test_agent: ExtractionAgent, unique_test_pdf):
assert test_agent.list_extraction_runs().total == 0
run: ExtractRun = test_agent.extract(TEST_PDF)
run: ExtractRun = test_agent.extract(unique_test_pdf)
test_agent.delete_extraction_run(run.id)
runs = test_agent.list_extraction_runs()
assert runs.total == 0
+7 -15
View File
@@ -10,7 +10,7 @@ import uuid
from llama_cloud.types import ExtractConfig, ExtractMode
from deepdiff import DeepDiff
from tests.extract.util import json_subset_match_score, load_test_dotenv
from .conftest import register_agent_for_cleanup, create_agent_with_retry
from .conftest import create_agent_with_retry
load_test_dotenv()
@@ -109,32 +109,24 @@ def extractor():
@pytest.fixture
def extraction_agent(test_case: ExtractionTestCase, extractor: LlamaExtract):
"""Fixture to create and cleanup extraction agent for each test."""
# Create unique name with random UUID (important for CI to avoid conflicts)
unique_id = uuid.uuid4().hex[:8]
agent_name = f"{test_case.name}_{unique_id}"
with open(test_case.schema_path, "r") as f:
schema = json.load(f)
# Clean up any existing agents with this name
try:
agents = extractor.list_agents()
for agent in agents:
if agent.name == agent_name:
extractor.delete_agent(agent.id)
except Exception as e:
print(f"Warning: Failed to cleanup existing agent: {str(e)}")
# Create new agent with retry logic for rate limiting
agent = create_agent_with_retry(
extractor, name=agent_name, data_schema=schema, config=test_case.config
)
# Register agent for cleanup at the end of the test session
register_agent_for_cleanup(agent.id)
yield agent
# Inline cleanup -- each worker cleans up its own agents
try:
extractor.delete_agent(agent.id)
except Exception as e:
print(f"Warning: Failed to cleanup agent {agent.id}: {e}")
@pytest.mark.skipif(
os.environ.get("LLAMA_CLOUD_API_KEY", "") == "",
+191
View File
@@ -0,0 +1,191 @@
import pytest
from llama_cloud_services.parse.temporal import (
is_heartbeat_timeout_error,
retry_on_heartbeat_timeout,
)
class FakeTimeoutError(Exception):
"""Mimics temporalio.exceptions.TimeoutError with a type attribute."""
def __init__(self, timeout_type):
super().__init__("timeout")
self.type = timeout_type
class FakeTimeoutType:
"""Mimics temporalio.exceptions.TimeoutType enum."""
HEARTBEAT = 4
START_TO_CLOSE = 1
class FakeActivityError(Exception):
"""Mimics temporalio.exceptions.ActivityError with a cause."""
def __init__(self, cause=None):
super().__init__("activity error")
self.__cause__ = cause
# Make the class names match what the code checks
FakeTimeoutError.__name__ = "TimeoutError"
FakeActivityError.__name__ = "ActivityError"
class TestIsHeartbeatTimeoutError:
def test_detects_activity_error_with_heartbeat_timeout(self):
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
activity_err = FakeActivityError(cause=timeout_err)
assert is_heartbeat_timeout_error(activity_err) is True
def test_rejects_activity_error_with_start_to_close_timeout(self):
timeout_err = FakeTimeoutError(FakeTimeoutType.START_TO_CLOSE)
activity_err = FakeActivityError(cause=timeout_err)
assert is_heartbeat_timeout_error(activity_err) is False
def test_rejects_activity_error_with_non_timeout_cause(self):
other_err = ValueError("some other error")
activity_err = FakeActivityError(cause=other_err)
assert is_heartbeat_timeout_error(activity_err) is False
def test_detects_heartbeat_timeout_from_message(self):
err = Exception("activity Heartbeat timeout")
assert is_heartbeat_timeout_error(err) is True
def test_detects_heartbeat_timeout_underscore_from_message(self):
err = Exception("Failed due to heartbeat_timeout")
assert is_heartbeat_timeout_error(err) is True
def test_rejects_non_heartbeat_errors(self):
err = Exception("Connection refused")
assert is_heartbeat_timeout_error(err) is False
def test_handles_none_error(self):
assert is_heartbeat_timeout_error(ValueError()) is False
def test_detects_heartbeat_timeout_with_enum_value_attr(self):
"""Test with an enum-like type that has .value."""
class EnumLike:
value = 4
timeout_err = FakeTimeoutError(EnumLike())
activity_err = FakeActivityError(cause=timeout_err)
assert is_heartbeat_timeout_error(activity_err) is True
class TestRetryOnHeartbeatTimeout:
@pytest.mark.asyncio
async def test_returns_result_on_success(self):
async def success():
return "ok"
result = await retry_on_heartbeat_timeout(success)
assert result == "ok"
@pytest.mark.asyncio
async def test_retries_on_heartbeat_timeout(self):
call_count = 0
async def flaky():
nonlocal call_count
call_count += 1
if call_count == 1:
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
raise FakeActivityError(cause=timeout_err)
return "recovered"
result = await retry_on_heartbeat_timeout(flaky, max_retries=3)
assert result == "recovered"
assert call_count == 2
@pytest.mark.asyncio
async def test_does_not_retry_on_non_heartbeat_errors(self):
call_count = 0
async def always_fail():
nonlocal call_count
call_count += 1
raise ValueError("Application error")
with pytest.raises(ValueError, match="Application error"):
await retry_on_heartbeat_timeout(always_fail)
assert call_count == 1
@pytest.mark.asyncio
async def test_exhausts_retries_on_persistent_heartbeat_timeout(self):
call_count = 0
async def always_heartbeat_timeout():
nonlocal call_count
call_count += 1
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
raise FakeActivityError(cause=timeout_err)
with pytest.raises(FakeActivityError):
await retry_on_heartbeat_timeout(
always_heartbeat_timeout, max_retries=2
)
assert call_count == 3 # initial + 2 retries
@pytest.mark.asyncio
async def test_calls_on_retry_callback(self):
retries_seen = []
def on_retry(attempt, error):
retries_seen.append(attempt)
call_count = 0
async def flaky():
nonlocal call_count
call_count += 1
if call_count <= 2:
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
raise FakeActivityError(cause=timeout_err)
return "ok"
result = await retry_on_heartbeat_timeout(
flaky, max_retries=3, on_retry=on_retry
)
assert result == "ok"
assert retries_seen == [1, 2]
@pytest.mark.asyncio
async def test_defaults_to_3_retries(self):
call_count = 0
async def always_heartbeat_timeout():
nonlocal call_count
call_count += 1
timeout_err = FakeTimeoutError(FakeTimeoutType.HEARTBEAT)
raise FakeActivityError(cause=timeout_err)
with pytest.raises(FakeActivityError):
await retry_on_heartbeat_timeout(always_heartbeat_timeout)
assert call_count == 4 # initial + 3 retries
@pytest.mark.asyncio
async def test_passes_args_and_kwargs(self):
async def add(a, b, extra=0):
return a + b + extra
result = await retry_on_heartbeat_timeout(add, 1, 2, extra=10)
assert result == 13
@pytest.mark.asyncio
async def test_retries_on_message_match(self):
call_count = 0
async def flaky():
nonlocal call_count
call_count += 1
if call_count == 1:
raise Exception("activity Heartbeat timeout")
return "recovered"
result = await retry_on_heartbeat_timeout(flaky, max_retries=3)
assert result == "recovered"
assert call_count == 2
+13 -1
View File
@@ -25,7 +25,8 @@
"./parse",
"./beta/agent",
"./extract",
"./classify"
"./classify",
"./temporal"
],
"exports": {
"./openapi.json": "./openapi.json",
@@ -95,6 +96,17 @@
},
"default": "./classify/dist/index.js"
},
"./temporal": {
"require": {
"types": "./temporal/dist/index.d.cts",
"default": "./temporal/dist/index.cjs"
},
"import": {
"types": "./temporal/dist/index.d.ts",
"default": "./temporal/dist/index.js"
},
"default": "./temporal/dist/index.js"
},
".": {
"require": {
"types": "./dist/index.d.cts",
+5
View File
@@ -24,3 +24,8 @@ export type {
ClassifyJobResults,
ClassifyParsingConfiguration,
} from "./classify.js";
export {
isHeartbeatTimeoutError,
retryOnHeartbeatTimeout,
type RetryOnHeartbeatTimeoutOptions,
} from "./temporal.js";
+111
View File
@@ -0,0 +1,111 @@
/**
* Utilities for retrying LlamaParse activities in Temporal
* specifically when they fail due to activity heartbeat timeout.
*
* These utilities detect Temporal's ActivityFailure wrapping a TimeoutFailure
* with timeoutType HEARTBEAT, without requiring a direct dependency on the
* Temporal SDK.
*
* Usage in a Temporal workflow:
*
* ```ts
* import { retryOnHeartbeatTimeout } from 'llama-cloud-services/temporal';
* import { proxyActivities } from '@temporalio/workflow';
*
* const { llamaParse } = proxyActivities<typeof activities>({
* startToCloseTimeout: '30m',
* heartbeatTimeout: '60s',
* retry: { maximumAttempts: 1 }, // disable built-in retry
* });
*
* export async function parseWorkflow(input: ParseInput): Promise<ParseResult> {
* return retryOnHeartbeatTimeout(() => llamaParse(input), { maxRetries: 3 });
* }
* ```
*/
// Temporal TimeoutType enum value for HEARTBEAT (from @temporalio/common proto)
const TIMEOUT_TYPE_HEARTBEAT = 4;
/**
* Checks if an error is a Temporal activity heartbeat timeout error.
*
* Detects both the structured Temporal error types (ActivityFailure wrapping
* TimeoutFailure with timeoutType HEARTBEAT) and fallback string matching
* on the error message.
*/
export function isHeartbeatTimeoutError(error: unknown): boolean {
if (!error || typeof error !== "object") return false;
const err = error as Record<string, unknown>;
// Check for Temporal's ActivityFailure → TimeoutFailure chain
if (err.name === "ActivityFailure" || err.name === "ActivityError") {
const cause =
(err.cause as Record<string, unknown>) ??
((error as { __cause__?: unknown }).__cause__ as Record<string, unknown>);
if (cause && typeof cause === "object") {
const causeName = (cause as Record<string, unknown>).name;
if (causeName === "TimeoutFailure" || causeName === "TimeoutError") {
const timeoutType = (cause as Record<string, unknown>).timeoutType;
return (
timeoutType === TIMEOUT_TYPE_HEARTBEAT ||
timeoutType === "HEARTBEAT" ||
timeoutType === "TimeoutType.HEARTBEAT"
);
}
}
}
// Fallback: match on error message
const message = String(
(err as { message?: string }).message ?? String(err),
).toLowerCase();
return (
message.includes("heartbeat timeout") ||
message.includes("heartbeat_timeout")
);
}
export interface RetryOnHeartbeatTimeoutOptions {
/** Maximum number of retry attempts after the initial try (default: 3). */
maxRetries?: number;
/** Optional callback invoked before each retry with the attempt number and error. */
onRetry?: (attempt: number, error: unknown) => void;
}
/**
* Retries an async function only when it fails with a heartbeat timeout error.
* All other errors are thrown immediately without retry.
*
* Designed for use in Temporal workflows to wrap activity calls so that
* transient heartbeat timeouts (e.g. from SIGTERM, event-loop blocking)
* are automatically retried while genuine failures propagate immediately.
*
* @param fn - The async function to execute (typically a Temporal activity call)
* @param options - Retry configuration
* @returns The result of the function
*/
export async function retryOnHeartbeatTimeout<T>(
fn: () => Promise<T>,
options: RetryOnHeartbeatTimeoutOptions = {},
): Promise<T> {
const { maxRetries = 3, onRetry } = options;
let lastError: unknown;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (err) {
lastError = err;
if (isHeartbeatTimeoutError(err) && attempt < maxRetries) {
onRetry?.(attempt + 1, err);
continue;
}
throw err;
}
}
throw lastError;
}
@@ -0,0 +1,8 @@
{
"type": "module",
"main": "./dist/index.cjs",
"module": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": "./dist/index.js",
"private": true
}
@@ -0,0 +1,195 @@
import { describe, it, expect, vi } from "vitest";
import {
isHeartbeatTimeoutError,
retryOnHeartbeatTimeout,
} from "../src/temporal.js";
describe("Temporal Heartbeat Timeout Utilities", () => {
describe("isHeartbeatTimeoutError", () => {
it("should detect ActivityFailure wrapping TimeoutFailure with HEARTBEAT type (numeric)", () => {
const error = {
name: "ActivityFailure",
message: "activity failed",
cause: {
name: "TimeoutFailure",
message: "timeout",
timeoutType: 4,
},
};
expect(isHeartbeatTimeoutError(error)).toBe(true);
});
it("should detect ActivityFailure wrapping TimeoutFailure with HEARTBEAT type (string)", () => {
const error = {
name: "ActivityFailure",
message: "activity failed",
cause: {
name: "TimeoutFailure",
message: "timeout",
timeoutType: "HEARTBEAT",
},
};
expect(isHeartbeatTimeoutError(error)).toBe(true);
});
it("should detect ActivityError wrapping TimeoutError with HEARTBEAT type", () => {
const error = {
name: "ActivityError",
message: "activity error",
cause: {
name: "TimeoutError",
message: "timeout",
timeoutType: 4,
},
};
expect(isHeartbeatTimeoutError(error)).toBe(true);
});
it("should reject ActivityFailure with non-heartbeat timeout type", () => {
const error = {
name: "ActivityFailure",
message: "activity failed",
cause: {
name: "TimeoutFailure",
message: "timeout",
timeoutType: 1, // START_TO_CLOSE
},
};
expect(isHeartbeatTimeoutError(error)).toBe(false);
});
it("should reject ActivityFailure with non-timeout cause", () => {
const error = {
name: "ActivityFailure",
message: "activity failed",
cause: {
name: "ApplicationFailure",
message: "app error",
},
};
expect(isHeartbeatTimeoutError(error)).toBe(false);
});
it("should detect heartbeat timeout from error message", () => {
const error = new Error("activity Heartbeat timeout");
expect(isHeartbeatTimeoutError(error)).toBe(true);
});
it("should detect heartbeat_timeout in error message", () => {
const error = new Error("Failed due to heartbeat_timeout");
expect(isHeartbeatTimeoutError(error)).toBe(true);
});
it("should reject non-heartbeat errors", () => {
const error = new Error("Connection refused");
expect(isHeartbeatTimeoutError(error)).toBe(false);
});
it("should handle null/undefined", () => {
expect(isHeartbeatTimeoutError(null)).toBe(false);
expect(isHeartbeatTimeoutError(undefined)).toBe(false);
});
it("should handle non-object errors", () => {
expect(isHeartbeatTimeoutError("string error")).toBe(false);
expect(isHeartbeatTimeoutError(42)).toBe(false);
});
});
describe("retryOnHeartbeatTimeout", () => {
it("should return result on success", async () => {
const fn = vi.fn().mockResolvedValue("success");
const result = await retryOnHeartbeatTimeout(fn);
expect(result).toBe("success");
expect(fn).toHaveBeenCalledTimes(1);
});
it("should retry on heartbeat timeout error", async () => {
const heartbeatError = Object.assign(new Error("timeout"), {
name: "ActivityFailure",
cause: { name: "TimeoutFailure", timeoutType: 4 },
});
const fn = vi
.fn()
.mockRejectedValueOnce(heartbeatError)
.mockResolvedValue("recovered");
const result = await retryOnHeartbeatTimeout(fn, { maxRetries: 3 });
expect(result).toBe("recovered");
expect(fn).toHaveBeenCalledTimes(2);
});
it("should not retry on non-heartbeat errors", async () => {
const appError = new Error("Application error");
const fn = vi.fn().mockRejectedValue(appError);
await expect(retryOnHeartbeatTimeout(fn)).rejects.toThrow(
"Application error",
);
expect(fn).toHaveBeenCalledTimes(1);
});
it("should exhaust retries on persistent heartbeat timeout", async () => {
const heartbeatError = Object.assign(new Error("timeout"), {
name: "ActivityFailure",
cause: { name: "TimeoutFailure", timeoutType: 4 },
});
const fn = vi.fn().mockRejectedValue(heartbeatError);
await expect(
retryOnHeartbeatTimeout(fn, { maxRetries: 2 }),
).rejects.toBe(heartbeatError);
expect(fn).toHaveBeenCalledTimes(3); // initial + 2 retries
});
it("should call onRetry callback before each retry", async () => {
const heartbeatError = Object.assign(new Error("timeout"), {
name: "ActivityFailure",
cause: { name: "TimeoutFailure", timeoutType: 4 },
});
const fn = vi
.fn()
.mockRejectedValueOnce(heartbeatError)
.mockRejectedValueOnce(heartbeatError)
.mockResolvedValue("ok");
const onRetry = vi.fn();
const result = await retryOnHeartbeatTimeout(fn, {
maxRetries: 3,
onRetry,
});
expect(result).toBe("ok");
expect(onRetry).toHaveBeenCalledTimes(2);
expect(onRetry).toHaveBeenCalledWith(1, heartbeatError);
expect(onRetry).toHaveBeenCalledWith(2, heartbeatError);
});
it("should default to 3 retries", async () => {
const heartbeatError = Object.assign(new Error("timeout"), {
name: "ActivityFailure",
cause: { name: "TimeoutFailure", timeoutType: 4 },
});
const fn = vi.fn().mockRejectedValue(heartbeatError);
await expect(retryOnHeartbeatTimeout(fn)).rejects.toBe(heartbeatError);
expect(fn).toHaveBeenCalledTimes(4); // initial + 3 retries
});
it("should detect heartbeat timeout from message and retry", async () => {
const messageError = new Error("activity Heartbeat timeout");
const fn = vi
.fn()
.mockRejectedValueOnce(messageError)
.mockResolvedValue("recovered");
const result = await retryOnHeartbeatTimeout(fn);
expect(result).toBe("recovered");
expect(fn).toHaveBeenCalledTimes(2);
});
});
});