Compare commits

...

4 Commits

Author SHA1 Message Date
Neeraj Pradhan d246a6ea3c also add file ids to Sourcetext 2025-10-21 11:55:43 -07:00
Neeraj Pradhan 1acc52e9ea Simplify names 2025-10-21 11:22:35 -07:00
Neeraj Pradhan c9caa7e439 address comments 2025-10-20 12:41:04 -07:00
Neeraj Pradhan 6f03b6a1b3 Add common SourceText class for classify/extract text inputs 2025-10-20 12:17:53 -07:00
8 changed files with 353 additions and 133 deletions
+3 -1
View File
@@ -1,5 +1,6 @@
from llama_cloud_services.parse import LlamaParse
from llama_cloud_services.extract import LlamaExtract, ExtractionAgent, SourceText
from llama_cloud_services.extract import LlamaExtract, ExtractionAgent
from llama_cloud_services.utils import SourceText, FileInput
from llama_cloud_services.constants import EU_BASE_URL
from llama_cloud_services.index import (
LlamaCloudCompositeRetriever,
@@ -12,6 +13,7 @@ __all__ = [
"LlamaExtract",
"ExtractionAgent",
"SourceText",
"FileInput",
"EU_BASE_URL",
"LlamaCloudIndex",
"LlamaCloudRetriever",
@@ -0,0 +1,10 @@
from llama_cloud_services.beta.classifier.client import ClassifyClient
from llama_cloud_services.beta.classifier.types import ClassifyJobResultsWithFiles
from llama_cloud_services.utils import SourceText, FileInput
__all__ = [
"ClassifyClient",
"ClassifyJobResultsWithFiles",
"SourceText",
"FileInput",
]
+145 -27
View File
@@ -1,6 +1,7 @@
import asyncio
import time
from typing import Optional
import warnings
from typing import Optional, List, Union
from pydantic import BaseModel
from llama_cloud.client import AsyncLlamaCloud
from llama_cloud.types import (
@@ -14,7 +15,11 @@ from llama_cloud.types import (
from llama_cloud.resources.classifier.client import OMIT
from llama_cloud_services.files.client import FileClient
from llama_cloud_services.constants import POLLING_TIMEOUT_SECONDS
from llama_cloud_services.utils import is_terminal_status, augment_async_errors
from llama_cloud_services.utils import (
is_terminal_status,
augment_async_errors,
FileInput,
)
from llama_index.core.async_utils import DEFAULT_NUM_WORKERS, run_jobs
from llama_cloud_services.beta.classifier.types import (
ClassifyJobResultsWithFiles,
@@ -166,6 +171,98 @@ class ClassifyClient:
)
)
async def aclassify(
self,
rules: list[ClassifierRule],
files: Union[FileInput, List[FileInput]],
parsing_configuration: Optional[ClassifyParsingConfiguration] = None,
raise_on_error: bool = True,
workers: int = DEFAULT_NUM_WORKERS,
show_progress: bool = False,
) -> ClassifyJobResultsWithFiles:
"""
Classify one or more files from various input types.
Args:
rules: The rules to use for classification.
files: The file(s) to classify. Can be a single file or list of files. Each can be:
- str/Path: File path
- SourceText: Text content or file with explicit filename
- File: Already uploaded file
- BufferedIOBase: File-like object
parsing_configuration: The parsing configuration to use for classification.
raise_on_error: Whether to raise an error if the classification job fails.
workers: Number of parallel workers for uploading files.
show_progress: Whether to show progress bars.
Returns:
The results of the classification job with file metadata.
"""
# Normalize to list
if not isinstance(files, list):
files = [files]
# Upload all files
coroutines = [
self.file_client.upload_content(file_input) for file_input in files
]
uploaded_files: List[File] = await run_jobs(
coroutines,
show_progress=show_progress,
workers=workers,
desc="Uploading files for classification",
)
# Classify
results = await self.aclassify_file_ids(
rules,
[file.id for file in uploaded_files],
parsing_configuration,
raise_on_error,
)
return ClassifyJobResultsWithFiles.from_classify_job_results(
results, uploaded_files
)
def classify(
self,
rules: list[ClassifierRule],
files: Union[FileInput, List[FileInput]],
parsing_configuration: Optional[ClassifyParsingConfiguration] = None,
raise_on_error: bool = True,
workers: int = DEFAULT_NUM_WORKERS,
show_progress: bool = False,
) -> ClassifyJobResultsWithFiles:
"""
Classify one or more files from various input types (synchronous version).
Args:
rules: The rules to use for classification.
files: The file(s) to classify. Can be a single file or list of files. Each can be:
- str/Path: File path
- SourceText: Text content or file with explicit filename
- File: Already uploaded file
- BufferedIOBase: File-like object
parsing_configuration: The parsing configuration to use for classification.
raise_on_error: Whether to raise an error if the classification job fails.
workers: Number of parallel workers for uploading files.
show_progress: Whether to show progress bars.
Returns:
The results of the classification job with file metadata.
"""
with augment_async_errors():
return asyncio.run(
self.aclassify(
rules,
files,
parsing_configuration,
raise_on_error,
workers,
show_progress,
)
)
async def aclassify_file_path(
self,
rules: list[ClassifierRule],
@@ -173,11 +270,17 @@ class ClassifyClient:
parsing_configuration: Optional[ClassifyParsingConfiguration] = None,
raise_on_error: bool = True,
) -> ClassifyJobResultsWithFiles:
file = await self.file_client.upload_file(file_input_path)
results = await self.aclassify_file_ids(
rules, [file.id], parsing_configuration, raise_on_error
"""
Deprecated: Use aclassify() instead.
"""
warnings.warn(
"aclassify_file_path is deprecated, use aclassify() instead",
DeprecationWarning,
stacklevel=2,
)
return await self.aclassify(
rules, file_input_path, parsing_configuration, raise_on_error
)
return ClassifyJobResultsWithFiles.from_classify_job_results(results, [file])
def classify_file_path(
self,
@@ -186,12 +289,17 @@ class ClassifyClient:
parsing_configuration: Optional[ClassifyParsingConfiguration] = None,
raise_on_error: bool = True,
) -> ClassifyJobResultsWithFiles:
with augment_async_errors():
return asyncio.run(
self.aclassify_file_path(
rules, file_input_path, parsing_configuration, raise_on_error
)
)
"""
Deprecated: Use classify() instead.
"""
warnings.warn(
"classify_file_path is deprecated, use classify() instead",
DeprecationWarning,
stacklevel=2,
)
return self.classify(
rules, file_input_path, parsing_configuration, raise_on_error
)
async def aclassify_file_paths(
self,
@@ -202,17 +310,22 @@ class ClassifyClient:
workers: int = DEFAULT_NUM_WORKERS,
show_progress: bool = False,
) -> ClassifyJobResultsWithFiles:
coroutines = [self.file_client.upload_file(path) for path in file_input_paths]
files: list[File] = await run_jobs(
coroutines,
show_progress=show_progress,
workers=workers,
desc="Uploading files for classification",
"""
Deprecated: Use aclassify() instead.
"""
warnings.warn(
"aclassify_file_paths is deprecated, use aclassify() instead",
DeprecationWarning,
stacklevel=2,
)
results = await self.aclassify_file_ids(
rules, [file.id for file in files], parsing_configuration, raise_on_error
return await self.aclassify(
rules,
file_input_paths,
parsing_configuration,
raise_on_error,
workers,
show_progress,
)
return ClassifyJobResultsWithFiles.from_classify_job_results(results, files)
def classify_file_paths(
self,
@@ -221,12 +334,17 @@ class ClassifyClient:
parsing_configuration: Optional[ClassifyParsingConfiguration] = None,
raise_on_error: bool = True,
) -> ClassifyJobResultsWithFiles:
with augment_async_errors():
return asyncio.run(
self.aclassify_file_paths(
rules, file_input_paths, parsing_configuration, raise_on_error
)
)
"""
Deprecated: Use classify() instead.
"""
warnings.warn(
"classify_file_paths is deprecated, use classify() instead",
DeprecationWarning,
stacklevel=2,
)
return self.classify(
rules, file_input_paths, parsing_configuration, raise_on_error
)
async def wait_for_job_completion(self, job_id: str) -> ClassifyJob:
"""
+2 -1
View File
@@ -2,15 +2,16 @@ from llama_cloud_services.extract.extract import (
LlamaExtract,
ExtractConfig,
ExtractionAgent,
SourceText,
ExtractTarget,
ExtractMode,
)
from llama_cloud_services.utils import SourceText, FileInput
__all__ = [
"LlamaExtract",
"ExtractionAgent",
"SourceText",
"FileInput",
"ExtractConfig",
"ExtractTarget",
"ExtractMode",
+7 -100
View File
@@ -2,10 +2,9 @@ import asyncio
import base64
import os
import time
from io import BufferedIOBase, BufferedReader, BytesIO, TextIOWrapper
from io import BufferedIOBase, TextIOWrapper
from pathlib import Path
from typing import List, Optional, Type, Union, Coroutine, Any, TypeVar
import secrets
import warnings
import httpx
from pydantic import BaseModel
@@ -33,7 +32,8 @@ from llama_cloud_services.extract.utils import (
JSONObjectType,
ExperimentalWarning,
)
from llama_cloud_services.utils import augment_async_errors
from llama_cloud_services.utils import augment_async_errors, SourceText, FileInput
from llama_cloud_services.files.client import FileClient
from llama_index.core.schema import BaseComponent
from llama_index.core.async_utils import run_jobs
from llama_index.core.bridge.pydantic import Field, PrivateAttr
@@ -188,46 +188,6 @@ async def _wait_for_job_result(
)
class SourceText:
def __init__(
self,
*,
file: Union[bytes, BufferedIOBase, TextIOWrapper, str, Path, None] = None,
text_content: Optional[str] = None,
filename: Optional[str] = None,
):
self.file = file
self.filename = filename
self.text_content = text_content
self._validate()
def _validate(self) -> None:
"""Ensure filename is provided when needed."""
if not ((self.file is None) ^ (self.text_content is None)):
raise ValueError("Either file or text_content must be provided.")
if self.text_content is not None:
if not self.filename:
random_hex = secrets.token_hex(4)
self.filename = f"text_input_{random_hex}.txt"
return
if isinstance(self.file, (bytes, BufferedIOBase, TextIOWrapper)):
if not self.filename and hasattr(self.file, "name"):
self.filename = os.path.basename(str(self.file.name))
elif not hasattr(self.file, "name") and self.filename is None:
raise ValueError(
"filename must be provided when file is bytes or a file-like object without a name"
)
elif isinstance(self.file, (str, Path)):
if not self.filename:
self.filename = os.path.basename(str(self.file))
else:
raise ValueError(f"Unsupported file type: {type(self.file)}")
FileInput = Union[str, Path, BufferedIOBase, SourceText, File]
def run_in_thread(
coro: Coroutine[Any, Any, T],
thread_pool: ThreadPoolExecutor,
@@ -320,6 +280,7 @@ class ExtractionAgent:
self._thread_pool = ThreadPoolExecutor(
max_workers=min(10, (os.cpu_count() or 1) + 4)
)
self._file_client = FileClient(client, project_id, organization_id)
@property
def id(self) -> str:
@@ -369,65 +330,11 @@ class ExtractionAgent:
ValueError: If filename is not provided for bytes input or for file-like objects
without a name attribute.
"""
file_contents: Optional[Union[BufferedIOBase, BytesIO]] = None
try:
if file_input.text_content is not None:
# Handle direct text content
file_contents = BytesIO(file_input.text_content.encode("utf-8"))
elif isinstance(file_input.file, TextIOWrapper):
# Handle text-based IO objects
file_contents = BytesIO(file_input.file.read().encode("utf-8"))
elif isinstance(file_input.file, (str, Path)):
# Handle file paths
file_contents = open(file_input.file, "rb")
elif isinstance(file_input.file, bytes):
# Handle bytes
file_contents = BytesIO(file_input.file)
elif isinstance(file_input.file, BufferedIOBase):
# Handle binary IO objects
file_contents = file_input.file
else:
raise ValueError(f"Unsupported file type: {type(file_input.file)}")
# Add name attribute to file object if needed
if not hasattr(file_contents, "name"):
file_contents.name = file_input.filename # type: ignore
return await self._client.files.upload_file(
project_id=self._project_id, upload_file=file_contents
)
finally:
if file_contents is not None and isinstance(
file_contents, (BufferedReader, BytesIO)
):
file_contents.close()
return await self._file_client.upload_content(file_input)
async def _upload_file(self, file_input: FileInput) -> File:
source_text = None
if isinstance(file_input, File):
return file_input
if isinstance(file_input, SourceText):
source_text = file_input
elif isinstance(file_input, (str, Path)):
path = Path(file_input)
source_text = SourceText(file=path, filename=path.name)
else:
# Try to get filename from the file object if not provided
filename = None
if hasattr(file_input, "name"):
filename = os.path.basename(str(file_input.name))
if filename is None:
raise ValueError(
"Use SourceText to provide filename when uploading bytes or file-like objects."
)
warnings.warn(
"Use SourceText instead of bytes or file-like objects",
DeprecationWarning,
)
source_text = SourceText(file=file_input, filename=filename)
return await self.upload_file(source_text)
"""Upload a file from various input types using FileClient."""
return await self._file_client.upload_content(file_input)
async def _wait_for_job_result(self, job_id: str) -> Optional[ExtractRun]:
"""Wait for and return the results of an extraction job."""
+82
View File
@@ -1,9 +1,11 @@
from io import BytesIO
from typing import BinaryIO
import os
from pathlib import Path
from llama_cloud.client import AsyncLlamaCloud
from llama_cloud.types import File, FileCreate
from typing import Optional
from llama_cloud_services.utils import SourceText, FileInput
class FileClient:
@@ -95,3 +97,83 @@ class FileClient:
project_id=self.project_id,
organization_id=self.organization_id,
)
async def upload_content(
self, file_input: FileInput, external_file_id: Optional[str] = None
) -> File:
"""
Upload content from various input types or fetch an already-uploaded file.
Args:
file_input: The content to upload. Can be:
- File: Already uploaded file (returned as-is)
- str/Path: Path to a file on disk
- SourceText: Text content, file, or file_id with explicit filename
- BufferedIOBase: File-like binary object
external_file_id: Optional external identifier for the file
Returns:
File: The uploaded (or fetched) file object
Raises:
ValueError: If the input type is not supported or required info is missing
"""
# If already a File object, return it
if isinstance(file_input, File):
return file_input
# Handle SourceText
if isinstance(file_input, SourceText):
# If file_id is provided, fetch the file object
if file_input.file_id is not None:
return await self.get_file(file_input.file_id)
elif file_input.text_content is not None:
# Handle direct text content
text_bytes = file_input.text_content.encode("utf-8")
return await self.upload_bytes(
text_bytes, external_file_id or file_input.filename or "file"
)
elif isinstance(file_input.file, (str, Path)):
# Handle file paths using the existing upload_file method
return await self.upload_file(
str(file_input.file), external_file_id or file_input.filename
)
elif isinstance(file_input.file, bytes):
# Handle bytes
return await self.upload_bytes(
file_input.file, external_file_id or file_input.filename or "file"
)
elif hasattr(file_input.file, "read"):
# Handle any file-like object (TextIOWrapper, BytesIO, BufferedReader, BufferedIOBase, etc.)
content = file_input.file.read() # type: ignore
if isinstance(content, str):
content = content.encode("utf-8")
return await self.upload_bytes(
content, external_file_id or file_input.filename or "file"
)
else:
raise ValueError(f"Unsupported file type: {type(file_input.file)}")
# Handle string/Path directly
elif isinstance(file_input, (str, Path)):
return await self.upload_file(str(file_input), external_file_id)
# Handle raw file-like objects
elif hasattr(file_input, "read"):
if hasattr(file_input, "name"):
filename = os.path.basename(str(file_input.name))
else:
filename = external_file_id or "file"
# Read content to determine size
content = file_input.read()
if isinstance(content, str):
content = content.encode("utf-8")
return await self.upload_bytes(content, external_file_id or filename)
else:
raise ValueError(
f"Unsupported file input type: {type(file_input)}. "
f"Supported types: str, Path, SourceText, BufferedIOBase, or File."
)
+102 -2
View File
@@ -3,11 +3,14 @@ import importlib.metadata
from contextlib import contextmanager
from typing import Generator
import difflib
from llama_cloud.types import StatusEnum
from llama_cloud.types import StatusEnum, File
import httpx
import packaging.version
from pydantic import BaseModel
from typing import Any, Dict, List, Tuple, Type
from typing import Any, Dict, List, Tuple, Type, Union, Optional
from io import BufferedIOBase, TextIOWrapper
from pathlib import Path
import secrets
# Asyncio error messages
nest_asyncio_err = "cannot be called from a running event loop"
@@ -104,3 +107,100 @@ def augment_async_errors() -> Generator[None, None, None]:
if nest_asyncio_err in str(e):
raise RuntimeError(nest_asyncio_msg)
raise
class SourceText:
"""
A wrapper class for providing text or file input with optional filename specification.
This class allows you to provide input in multiple ways:
- Direct text content via text_content parameter
- File paths as strings or Path objects
- Raw bytes
- File-like objects (BufferedIOBase, TextIOWrapper)
- Already-uploaded file ID via file_id parameter
Args:
file: The file input (bytes, file-like object, str path, or Path).
Mutually exclusive with text_content and file_id.
text_content: Raw text content to process. Mutually exclusive with file and file_id.
file_id: ID of an already-uploaded file. Mutually exclusive with file and text_content.
filename: Optional filename. Required for bytes/file-like objects without names.
If not provided, will be auto-generated for text_content or inferred from paths.
Examples:
# Direct text input
source = SourceText(text_content="Hello world")
# File path
source = SourceText(file="document.pdf")
# Bytes with filename
source = SourceText(file=b"...", filename="document.pdf")
# File-like object (will read from current position)
with open("document.pdf", "rb") as f:
source = SourceText(file=f)
# Already-uploaded file
source = SourceText(file_id="file_abc123")
"""
def __init__(
self,
*,
file: Union[bytes, BufferedIOBase, TextIOWrapper, str, Path, None] = None,
text_content: Optional[str] = None,
file_id: Optional[str] = None,
filename: Optional[str] = None,
):
self.file = file
self.filename = filename
self.text_content = text_content
self.file_id = file_id
self._validate()
def _validate(self) -> None:
"""Ensure filename is provided when needed."""
# Check that exactly one of file, text_content, or file_id is provided
provided = sum(
[
self.file is not None,
self.text_content is not None,
self.file_id is not None,
]
)
if provided == 0:
raise ValueError("One of file, text_content, or file_id must be provided.")
elif provided > 1:
raise ValueError(
"Only one of file, text_content, or file_id can be provided."
)
# If file_id is provided, we don't need filename validation
if self.file_id is not None:
return
if self.text_content is not None:
if not self.filename:
random_hex = secrets.token_hex(4)
self.filename = f"text_input_{random_hex}.txt"
return
if isinstance(self.file, (bytes, BufferedIOBase, TextIOWrapper)):
if not self.filename and hasattr(self.file, "name"):
self.filename = os.path.basename(str(self.file.name))
elif self.filename is None and not hasattr(self.file, "name"):
raise ValueError(
"filename must be provided when file is bytes or a file-like object without a name"
)
elif isinstance(self.file, (str, Path)):
if not self.filename:
self.filename = os.path.basename(str(self.file))
else:
raise ValueError(f"Unsupported file type: {type(self.file)}")
# Type alias for file input that can be used across services
FileInput = Union[str, Path, BufferedIOBase, SourceText, File]
Generated
+2 -2
View File
@@ -1,5 +1,5 @@
version = 1
revision = 3
revision = 2
requires-python = ">=3.9, <4.0"
resolution-markers = [
"python_full_version >= '3.14'",
@@ -1596,7 +1596,7 @@ wheels = [
[[package]]
name = "llama-cloud-services"
version = "0.6.73"
version = "0.6.76"
source = { editable = "." }
dependencies = [
{ name = "click", version = "8.1.8", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" },