[Feat] Add support for returning images with gemini/gemini-2.5-flash-image-preview with /chat/completions (#13983)

* add gemini-2.5-flash-image-preview

* add gemini-2.5-flash-image-preview

* add image in ChatCompletionResponseMessage

* test_gemini_image_generation_async

* Revert "Merge pull request #13394 from Deviad/feature/enhance_logging_for_containers"

This reverts commit 539b94ad4e, reversing
changes made to 71af7bcf9c.

* include `image` in Delta

* fix _process_candidates should show the image response

* fix: _handle_special_delta_attributes

* test_gemini_image_generation_async_stream

* image_generation_chat

* UI - allow looking at generated images from /chat/completions

* _create_streaming_choice

* fix import StreamingChoices

* fix ChatCompletionResponseMessage

* test_gemini_image_generation

* add gemini img migration

* fix _extract_candidate_metadata

* ui fix

* fix batch endpoint test
This commit is contained in:
Ishaan Jaff
2025-08-27 16:16:19 -07:00
committed by GitHub
parent 9acf80b8ad
commit 04dc1a5351
17 changed files with 991 additions and 815 deletions
-1
View File
@@ -10,4 +10,3 @@ tests
*.tgz
log.txt
docker/Dockerfile.*
*.whl
+1 -2
View File
@@ -95,5 +95,4 @@ test.py
litellm_config.yaml
.cursor
.vscode/launch.json
*.whl
litellm/proxy/to_delete_loadtest_work/*
litellm/proxy/to_delete_loadtest_work/*
@@ -0,0 +1,232 @@
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';
# Image Generation in Chat Completions, Responses API
This guide covers how to generate images when using the `chat/completions`. Note - if you want this on Responses API please file a Feature Request [here](https://github.com/BerriAI/litellm/issues/new).
:::info
Requires LiteLLM v1.76.1+
:::
Supported Providers:
- Google AI Studio (`gemini`)
- Vertex AI (`vertex_ai/`)
LiteLLM will standardize the `image` response in the assistant message for models that support image generation during chat completions.
```python title="Example response from litellm"
"message": {
...
"content": "Here's the image you requested:",
"image": {
"url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
"detail": "auto"
}
}
```
## Quick Start
<Tabs>
<TabItem value="sdk" label="SDK">
```python showLineNumbers title="Image generation with chat completion"
from litellm import completion
import os
os.environ["GEMINI_API_KEY"] = "your-api-key"
response = completion(
model="gemini/gemini-2.5-flash-image-preview",
messages=[
{"role": "user", "content": "Generate an image of a banana wearing a costume that says LiteLLM"}
],
)
print(response.choices[0].message.content) # Text response
print(response.choices[0].message.image) # Image data
```
</TabItem>
<TabItem value="proxy" label="PROXY">
1. Setup config.yaml
```yaml showLineNumbers title="config.yaml"
model_list:
- model_name: gemini-image-gen
litellm_params:
model: gemini/gemini-2.5-flash-image-preview
api_key: os.environ/GEMINI_API_KEY
```
2. Run proxy server
```bash showLineNumbers title="Start the proxy"
litellm --config config.yaml
# RUNNING on http://0.0.0.0:4000
```
3. Test it!
```bash showLineNumbers title="Make request"
curl http://0.0.0.0:4000/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $LITELLM_KEY" \
-d '{
"model": "gemini-image-gen",
"messages": [
{
"role": "user",
"content": "Generate an image of a banana wearing a costume that says LiteLLM"
}
]
}'
```
</TabItem>
</Tabs>
**Expected Response**
```bash
{
"id": "chatcmpl-3b66124d79a708e10c603496b363574c",
"choices": [
{
"finish_reason": "stop",
"index": 0,
"message": {
"content": "Here's the image you requested:",
"role": "assistant",
"image": {
"url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
"detail": "auto"
}
}
}
],
"created": 1723323084,
"model": "gemini/gemini-2.5-flash-image-preview",
"object": "chat.completion",
"usage": {
"completion_tokens": 12,
"prompt_tokens": 16,
"total_tokens": 28
}
}
```
## Streaming Support
<Tabs>
<TabItem value="sdk" label="SDK">
```python showLineNumbers title="Streaming image generation"
from litellm import completion
import os
os.environ["GEMINI_API_KEY"] = "your-api-key"
response = completion(
model="gemini/gemini-2.5-flash-image-preview",
messages=[
{"role": "user", "content": "Generate an image of a banana wearing a costume that says LiteLLM"}
],
stream=True,
)
for chunk in response:
if hasattr(chunk.choices[0].delta, "image") and chunk.choices[0].delta.image is not None:
print("Generated image:", chunk.choices[0].delta.image["url"])
break
```
</TabItem>
<TabItem value="proxy" label="PROXY">
```bash showLineNumbers title="Streaming request"
curl http://0.0.0.0:4000/v1/chat/completions \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $LITELLM_KEY" \
-d '{
"model": "gemini-image-gen",
"messages": [
{
"role": "user",
"content": "Generate an image of a banana wearing a costume that says LiteLLM"
}
],
"stream": true
}'
```
</TabItem>
</Tabs>
**Expected Streaming Response**
```bash
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1723323084,"model":"gemini/gemini-2.5-flash-image-preview","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1723323084,"model":"gemini/gemini-2.5-flash-image-preview","choices":[{"index":0,"delta":{"content":"Here's the image you requested:"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1723323084,"model":"gemini/gemini-2.5-flash-image-preview","choices":[{"index":0,"delta":{"image":{"url":"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...","detail":"auto"}},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1723323084,"model":"gemini/gemini-2.5-flash-image-preview","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
```
## Async Support
```python showLineNumbers title="Async image generation"
from litellm import acompletion
import asyncio
import os
os.environ["GEMINI_API_KEY"] = "your-api-key"
async def generate_image():
response = await acompletion(
model="gemini/gemini-2.5-flash-image-preview",
messages=[
{"role": "user", "content": "Generate an image of a banana wearing a costume that says LiteLLM"}
],
)
print(response.choices[0].message.content) # Text response
print(response.choices[0].message.image) # Image data
return response
# Run the async function
asyncio.run(generate_image())
```
## Supported Models
| Provider | Model |
|----------|--------|
| Google AI Studio | `gemini/gemini-2.5-flash-image-preview` |
| Vertex AI | `vertex_ai/gemini-2.5-flash-image-preview` |
## Spec
The `image` field in the response follows this structure:
```python
"image": {
"url": "data:image/png;base64,<base64_encoded_image>",
"detail": "auto"
}
```
- `url` - str: Base64 encoded image data in data URI format
- `detail` - str: Image detail level (always "auto" for generated images)
The image is returned as a base64-encoded data URI that can be directly used in HTML `<img>` tags or saved to a file.
@@ -0,0 +1,201 @@
# Gemini Image Generation Migration Guide
## Who is impacted by this change?
Anyone using the following models with /chat/completions:
- `gemini/gemini-2.0-flash-exp-image-generation`
- `vertex_ai/gemini-2.5-flash-image-preview`
## Key Change
Gemini models now support image generation through chat completions. Images are returned in `response.choices[0].message.image` with base64 data URLs.
## Before and After
### Before
```python
from litellm import completion
response = completion(
model="gemini/gemini-2.0-flash-exp-image-generation",
messages=[{"role": "user", "content": "Generate an image of a cat"}],
modalities=["image", "text"],
)
base_64_image_data = response.choices[0].message.content
```
### After
```python
from litellm import completion
response = completion(
model="gemini/gemini-2.0-flash-exp-image-generation",
messages=[{"role": "user", "content": "Generate an image of a cat"}],
modalities=["image", "text"],
)
# Image is now available in the response
image_url = response.choices[0].message.image["url"] # "data:image/png;base64,..."
```
## Usage
### Using the Python SDK
**Key Change:**
```diff
# Before
-- base_64_image_data = response.choices[0].message.content
# After
++ image_url = response.choices[0].message.image["url"]
```
#### Basic Image Generation
```python
from litellm import completion
import os
# Set your API key
os.environ["GEMINI_API_KEY"] = "your-api-key"
# Generate an image
response = completion(
model="gemini/gemini-2.0-flash-exp-image-generation",
messages=[{"role": "user", "content": "Generate an image of a cat"}],
modalities=["image", "text"],
)
# Access the generated image
print(response.choices[0].message.content) # Text response (if any)
print(response.choices[0].message.image) # Image data
```
#### Response Format
The image is returned in the `message.image` field:
```python
{
"url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
"detail": "auto"
}
```
### Using the LiteLLM Proxy Server
**Key Change:**
```diff
# Before
-- "content": "base64-image-data..."
# After
++ "image": {
++ "url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
++ "detail": "auto"
++ }
```
#### Configuration Setup
1. **Configure your models in `config.yaml`:**
```yaml
model_list:
- model_name: gemini-image-gen
litellm_params:
model: gemini/gemini-2.0-flash-exp-image-generation
api_key: os.environ/GEMINI_API_KEY
- model_name: vertex-image-gen
litellm_params:
model: vertex_ai/gemini-2.5-flash-image-preview
vertex_project: your-project-id
vertex_location: us-central1
general_settings:
master_key: sk-1234 # Your proxy API key
```
2. **Start the proxy server:**
```bash
litellm --config /path/to/config.yaml
# RUNNING on http://0.0.0.0:4000
```
#### Making Requests
**Using OpenAI SDK:**
```python
from openai import OpenAI
# Point to your proxy server
client = OpenAI(
api_key="sk-1234", # Your proxy API key
base_url="http://0.0.0.0:4000"
)
response = client.chat.completions.create(
model="gemini-image-gen",
messages=[{"role": "user", "content": "Generate an image of a cat"}],
extra_body={"modalities": ["image", "text"]}
)
# Access the generated image
print(response.choices[0].message.content) # Text response (if any)
print(response.choices[0].message.image) # Image data
```
**Using curl:**
```bash
curl -X POST 'http://0.0.0.0:4000/v1/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"model": "gemini-image-gen",
"messages": [
{
"role": "user",
"content": "Generate an image of a cat"
}
],
"modalities": ["image", "text"]
}'
```
**Response format from proxy:**
```json
{
"id": "chatcmpl-123",
"object": "chat.completion",
"created": 1704089632,
"model": "gemini-image-gen",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "Here's an image of a cat for you!",
"image": {
"url": "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAA...",
"detail": "auto"
}
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 10,
"completion_tokens": 8,
"total_tokens": 18
}
}
```
+1
View File
@@ -493,6 +493,7 @@ const sidebars = {
"guides/finetuned_models",
"guides/security_settings",
"completion/audio",
"completion/image_generation_chat",
"completion/web_search",
"completion/document_understanding",
"completion/vision",
+6 -43
View File
@@ -4,41 +4,21 @@ import os
import sys
from datetime import datetime
from logging import Formatter
set_verbose = False
def __strtobool(val: str) -> bool:
"""Convert a string representation of truth to true (1) or false (0).
True values are 'y', 'yes', 't', 'true', 'on', and '1'; false values
are 'n', 'no', 'f', 'false', 'off', and '0'. Raises ValueError if
'val' is anything else.
"""
val = val.lower()
if val in ('y', 'yes', 't', 'true', 'on', '1'):
return True
elif val in ('n', 'no', 'f', 'false', 'off', '0'):
return False
else:
raise ValueError(f"invalid truth value {val!r}")
if set_verbose is True:
logging.warning(
"`litellm.set_verbose` is deprecated. Please set `os.environ['LITELLM_LOG'] = 'DEBUG'` for debug logs."
)
json_logs = __strtobool(os.getenv("JSON_LOGS", "False"))
json_logs = bool(os.getenv("JSON_LOGS", False))
# Create a handler for the logger (you may need to adapt this based on your needs)
log_level = os.getenv("LITELLM_LOG", "DEBUG")
numeric_level: str = getattr(logging, log_level.upper())
handler = logging.StreamHandler()
handler.setLevel(numeric_level)
log_file = os.getenv("LITELLM_LOG_FILE", "")
file_handler = None
if log_file:
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(numeric_level)
class JsonFormatter(Formatter):
def __init__(self):
super(JsonFormatter, self).__init__()
@@ -60,7 +40,6 @@ class JsonFormatter(Formatter):
return json.dumps(json_record)
json_formatter = JsonFormatter()
# Function to set up exception handlers for JSON logging
def _setup_json_exception_handlers(formatter):
@@ -110,10 +89,8 @@ def _setup_json_exception_handlers(formatter):
# Create a formatter and set it for the handler
if json_logs:
handler.setFormatter(json_formatter)
if file_handler:
file_handler.setFormatter(json_formatter)
_setup_json_exception_handlers(json_formatter)
handler.setFormatter(JsonFormatter())
_setup_json_exception_handlers(JsonFormatter())
else:
formatter = logging.Formatter(
"\033[92m%(asctime)s - %(name)s:%(levelname)s\033[0m: %(filename)s:%(lineno)s - %(message)s",
@@ -121,18 +98,11 @@ else:
)
handler.setFormatter(formatter)
if file_handler:
file_handler.setFormatter(formatter)
verbose_proxy_logger = logging.getLogger("LiteLLM Proxy")
verbose_router_logger = logging.getLogger("LiteLLM Router")
verbose_logger = logging.getLogger("LiteLLM")
# Set logger levels
verbose_proxy_logger.setLevel(numeric_level)
verbose_router_logger.setLevel(numeric_level)
verbose_logger.setLevel(numeric_level)
# Add the handler to the logger
verbose_router_logger.addHandler(handler)
verbose_proxy_logger.addHandler(handler)
@@ -155,13 +125,6 @@ def _suppress_loggers():
# Call the suppression function
_suppress_loggers()
if file_handler:
verbose_router_logger.addHandler(file_handler)
verbose_proxy_logger.addHandler(file_handler)
verbose_logger.addHandler(file_handler)
ALL_LOGGERS = [
logging.getLogger(),
verbose_logger,
@@ -190,10 +153,10 @@ def _turn_on_json():
- Adds a JSON formatter to all loggers
"""
handler = logging.StreamHandler()
handler.setFormatter(json_formatter)
handler.setFormatter(JsonFormatter())
_initialize_loggers_with_handler(handler)
# Set up exception handlers
_setup_json_exception_handlers(json_formatter)
_setup_json_exception_handlers(JsonFormatter())
def _turn_on_debug():
+73 -19
View File
@@ -20,7 +20,9 @@ from litellm.litellm_core_utils.redact_messages import LiteLLMLoggingObject
from litellm.litellm_core_utils.thread_pool_executor import executor
from litellm.types.llms.openai import ChatCompletionChunk
from litellm.types.router import GenericLiteLLMParams
from litellm.types.utils import Delta
from litellm.types.utils import (
Delta,
)
from litellm.types.utils import GenericStreamingChunk as GChunk
from litellm.types.utils import (
ModelResponse,
@@ -35,6 +37,12 @@ from .exception_mapping_utils import exception_type
from .llm_response_utils.get_api_base import get_api_base
from .rules import Rules
# Constants for special delta attribute names
AUDIO_ATTRIBUTE = "audio"
IMAGE_ATTRIBUTE = "image"
TOOL_CALLS_ATTRIBUTE = "tool_calls"
FUNCTION_CALL_ATTRIBUTE = "function_call"
def is_async_iterable(obj: Any) -> bool:
"""
@@ -766,6 +774,66 @@ class CustomStreamWrapper:
model_response.choices[0].delta = Delta(**_initial_delta)
return model_response
def _has_special_delta_content(self, model_response: ModelResponseStream) -> bool:
"""
Check if the delta contains special content types (tool_calls, function_call, audio, or image).
"""
if len(model_response.choices) == 0:
return False
delta = model_response.choices[0].delta
# Check for tool_calls or function_call
if getattr(delta, TOOL_CALLS_ATTRIBUTE, None) is not None or getattr(delta, FUNCTION_CALL_ATTRIBUTE, None) is not None:
return True
# Check for audio
if hasattr(delta, AUDIO_ATTRIBUTE) and getattr(delta, AUDIO_ATTRIBUTE, None) is not None:
return True
# Check for image
if hasattr(delta, IMAGE_ATTRIBUTE) and getattr(delta, IMAGE_ATTRIBUTE, None) is not None:
return True
return False
def _handle_special_delta_content(self, model_response: ModelResponseStream) -> ModelResponseStream:
"""
Handle special delta content types by stripping role and returning the response.
"""
return self.strip_role_from_delta(model_response)
def _has_special_delta_attribute(self, delta, attribute_name: str) -> bool:
"""
Check if delta has a specific attribute and it's not None.
"""
return delta is not None and getattr(delta, attribute_name, None) is not None
def _copy_delta_attribute(self, source_delta, target_delta, attribute_name: str) -> None:
"""
Copy a specific attribute from source delta to target delta.
"""
setattr(target_delta, attribute_name, getattr(source_delta, attribute_name))
def _has_any_special_delta_attributes(self, delta) -> bool:
"""
Check if delta has any special attributes (audio, image).
"""
special_attributes = [AUDIO_ATTRIBUTE, IMAGE_ATTRIBUTE]
for attribute in special_attributes:
if self._has_special_delta_attribute(delta, attribute):
return True
return False
def _handle_special_delta_attributes(self, delta, model_response: "ModelResponseStream") -> None:
"""
Handle special delta attributes (audio, image) by copying them to model_response.
"""
special_attributes = [AUDIO_ATTRIBUTE, IMAGE_ATTRIBUTE]
for attribute in special_attributes:
if self._has_special_delta_attribute(delta, attribute):
self._copy_delta_attribute(delta, model_response.choices[0].delta, attribute)
def return_processed_chunk_logic( # noqa
self,
completion_obj: Dict[str, Any],
@@ -888,20 +956,8 @@ class CustomStreamWrapper:
self.sent_last_chunk = True
return model_response
elif (
model_response.choices[0].delta.tool_calls is not None
or model_response.choices[0].delta.function_call is not None
):
model_response = self.strip_role_from_delta(model_response)
return model_response
elif (
len(model_response.choices) > 0
and hasattr(model_response.choices[0].delta, "audio")
and model_response.choices[0].delta.audio is not None
):
model_response = self.strip_role_from_delta(model_response)
return model_response
elif self._has_special_delta_content(model_response):
return self._handle_special_delta_content(model_response)
else:
if hasattr(model_response, "usage"):
self.chunks.append(model_response)
@@ -1374,10 +1430,8 @@ class CustomStreamWrapper:
)
)
model_response.choices[0].delta = Delta()
elif (
delta is not None and getattr(delta, "audio", None) is not None
):
model_response.choices[0].delta.audio = delta.audio
elif self._has_any_special_delta_attributes(delta):
self._handle_special_delta_attributes(delta, model_response)
else:
try:
delta = (
@@ -46,6 +46,7 @@ from litellm.types.llms.openai import (
ChatCompletionToolCallChunk,
ChatCompletionToolCallFunctionChunk,
ChatCompletionToolParamFunctionChunk,
ImageURLObject,
OpenAIChatCompletionFinishReason,
)
from litellm.types.llms.vertex_ai import (
@@ -89,11 +90,12 @@ from .transformation import (
if TYPE_CHECKING:
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.types.utils import ModelResponseStream
from litellm.types.utils import ModelResponseStream, StreamingChoices
LoggingClass = LiteLLMLoggingObj
else:
LoggingClass = Any
StreamingChoices = Any
class VertexAIBaseConfig:
@@ -774,8 +776,9 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
elif "inlineData" in part:
mime_type = part["inlineData"]["mimeType"]
data = part["inlineData"]["data"]
# Check if inline data is audio - if so, exclude from text content
if mime_type.startswith("audio/"):
# Check if inline data is audio or image - if so, exclude from text content
# Images and audio are now handled separately in their respective response fields
if mime_type.startswith("audio/") or mime_type.startswith("image/"):
continue
_content_str += "data:{};base64,{}".format(mime_type, data)
@@ -790,6 +793,23 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
content_str += _content_str
return content_str, reasoning_content_str
def _extract_image_response_from_parts(
self, parts: List[HttpxPartType]
) -> Optional[ImageURLObject]:
"""Extract image response from parts if present"""
for part in parts:
if "inlineData" in part:
mime_type = part["inlineData"]["mimeType"]
data = part["inlineData"]["data"]
if mime_type.startswith("image/"):
# Convert base64 data to data URI format
data_uri = f"data:{mime_type};base64,{data}"
return ImageURLObject(
url=data_uri,
detail="auto"
)
return None
def _extract_audio_response_from_parts(
self, parts: List[HttpxPartType]
@@ -1108,6 +1128,75 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
elif web_search_queries:
web_search_requests = len(grounding_metadata)
return web_search_requests
@staticmethod
def _create_streaming_choice(
chat_completion_message: ChatCompletionResponseMessage,
candidate: Candidates,
idx: int,
tools: Optional[List[ChatCompletionToolCallChunk]],
functions: Optional[ChatCompletionToolCallFunctionChunk],
chat_completion_logprobs: Optional[ChoiceLogprobs],
image_response: Optional[ImageURLObject],
) -> StreamingChoices:
"""
Helper method to create a streaming choice object for Vertex AI
"""
from litellm.types.utils import Delta, StreamingChoices
# create a streaming choice object
choice = StreamingChoices(
finish_reason=VertexGeminiConfig._check_finish_reason(
chat_completion_message, candidate.get("finishReason")
),
index=candidate.get("index", idx),
delta=Delta(
content=chat_completion_message.get("content"),
reasoning_content=chat_completion_message.get(
"reasoning_content"
),
tool_calls=tools,
image=image_response,
function_call=functions,
),
logprobs=chat_completion_logprobs,
enhancements=None,
)
return choice
@staticmethod
def _extract_candidate_metadata(candidate: Candidates) -> Tuple[List[dict], List[dict], List, List]:
"""
Extract metadata from a single candidate response.
Returns:
grounding_metadata: List[dict]
url_context_metadata: List[dict]
safety_ratings: List
citation_metadata: List
"""
grounding_metadata: List[dict] = []
url_context_metadata: List[dict] = []
safety_ratings: List = []
citation_metadata: List = []
if "groundingMetadata" in candidate:
if isinstance(candidate["groundingMetadata"], list):
grounding_metadata.extend(candidate["groundingMetadata"]) # type: ignore
else:
grounding_metadata.append(candidate["groundingMetadata"]) # type: ignore
if "safetyRatings" in candidate:
safety_ratings.append(candidate["safetyRatings"])
if "citationMetadata" in candidate:
citation_metadata.append(candidate["citationMetadata"])
if "urlContextMetadata" in candidate:
# Add URL context metadata to grounding metadata
url_context_metadata.append(cast(dict, candidate["urlContextMetadata"]))
return grounding_metadata, url_context_metadata, safety_ratings, citation_metadata
@staticmethod
def _process_candidates(
@@ -1131,6 +1220,7 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
grounding_metadata: List[dict] = []
url_context_metadata: List[dict] = []
image_response: Optional[ImageURLObject] = None
safety_ratings: List = []
citation_metadata: List = []
chat_completion_message: ChatCompletionResponseMessage = {"role": "assistant"}
@@ -1143,21 +1233,18 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
if "content" not in candidate:
continue
if "groundingMetadata" in candidate:
if isinstance(candidate["groundingMetadata"], list):
grounding_metadata.extend(candidate["groundingMetadata"]) # type: ignore
else:
grounding_metadata.append(candidate["groundingMetadata"]) # type: ignore
if "safetyRatings" in candidate:
safety_ratings.append(candidate["safetyRatings"])
if "citationMetadata" in candidate:
citation_metadata.append(candidate["citationMetadata"])
if "urlContextMetadata" in candidate:
# Add URL context metadata to grounding metadata
url_context_metadata.append(cast(dict, candidate["urlContextMetadata"]))
# Extract metadata using helper function
(
candidate_grounding_metadata,
candidate_url_context_metadata,
candidate_safety_ratings,
candidate_citation_metadata,
) = VertexGeminiConfig._extract_candidate_metadata(candidate)
grounding_metadata.extend(candidate_grounding_metadata)
url_context_metadata.extend(candidate_url_context_metadata)
safety_ratings.extend(candidate_safety_ratings)
citation_metadata.extend(candidate_citation_metadata)
if "parts" in candidate["content"]:
(
@@ -1172,18 +1259,25 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
parts=candidate["content"]["parts"]
)
)
image_response = (
VertexGeminiConfig()._extract_image_response_from_parts(
parts=candidate["content"]["parts"]
)
)
if audio_response is not None:
cast(Dict[str, Any], chat_completion_message)[
"audio"
] = audio_response
chat_completion_message["content"] = None # OpenAI spec
elif image_response is not None:
# Handle image response - combine with text content into structured format
cast(Dict[str, Any], chat_completion_message)["image"] = image_response
elif content is not None:
chat_completion_message["content"] = content
if reasoning_content is not None:
chat_completion_message["reasoning_content"] = reasoning_content
(
functions,
tools,
@@ -1206,24 +1300,14 @@ class VertexGeminiConfig(VertexAIBaseConfig, BaseConfig):
chat_completion_message["function_call"] = functions
if isinstance(model_response, ModelResponseStream):
from litellm.types.utils import Delta, StreamingChoices
# create a streaming choice object
choice = StreamingChoices(
finish_reason=VertexGeminiConfig._check_finish_reason(
chat_completion_message, candidate.get("finishReason")
),
index=candidate.get("index", idx),
delta=Delta(
content=chat_completion_message.get("content"),
reasoning_content=chat_completion_message.get(
"reasoning_content"
),
tool_calls=tools,
function_call=functions,
),
logprobs=chat_completion_logprobs,
enhancements=None,
choice = VertexGeminiConfig._create_streaming_choice(
chat_completion_message=chat_completion_message,
candidate=candidate,
idx=idx,
tools=tools,
functions=functions,
chat_completion_logprobs=chat_completion_logprobs,
image_response=image_response
)
model_response.choices.append(choice)
elif isinstance(model_response, ModelResponse):
+17
View File
@@ -51,6 +51,7 @@ from .llms.openai import (
ChatCompletionUsageBlock,
FileSearchTool,
FineTuningJob,
ImageURLObject,
OpenAIChatCompletionChunk,
OpenAIFileObject,
OpenAIRealtimeStreamList,
@@ -572,6 +573,7 @@ class Message(OpenAIObject):
tool_calls: Optional[List[ChatCompletionMessageToolCall]]
function_call: Optional[FunctionCall]
audio: Optional[ChatCompletionAudioResponse] = None
image: Optional[ImageURLObject] = None
reasoning_content: Optional[str] = None
thinking_blocks: Optional[
List[Union[ChatCompletionThinkingBlock, ChatCompletionRedactedThinkingBlock]]
@@ -588,6 +590,7 @@ class Message(OpenAIObject):
function_call=None,
tool_calls: Optional[list] = None,
audio: Optional[ChatCompletionAudioResponse] = None,
image: Optional[ImageURLObject] = None,
provider_specific_fields: Optional[Dict[str, Any]] = None,
reasoning_content: Optional[str] = None,
thinking_blocks: Optional[
@@ -621,6 +624,9 @@ class Message(OpenAIObject):
if audio is not None:
init_values["audio"] = audio
if image is not None:
init_values["image"] = image
if thinking_blocks is not None:
init_values["thinking_blocks"] = thinking_blocks
@@ -640,6 +646,10 @@ class Message(OpenAIObject):
# OpenAI compatible APIs like mistral API will raise an error if audio is passed in
if hasattr(self, "audio"):
del self.audio
if image is None:
if hasattr(self, "image"):
del self.image
if annotations is None:
# ensure default response matches OpenAI spec
@@ -693,6 +703,7 @@ class Delta(OpenAIObject):
function_call=None,
tool_calls=None,
audio: Optional[ChatCompletionAudioResponse] = None,
image: Optional[ImageURLObject] = None,
reasoning_content: Optional[str] = None,
thinking_blocks: Optional[
List[
@@ -710,6 +721,7 @@ class Delta(OpenAIObject):
self.function_call: Optional[Union[FunctionCall, Any]] = None
self.tool_calls: Optional[List[Union[ChatCompletionDeltaToolCall, Any]]] = None
self.audio: Optional[ChatCompletionAudioResponse] = None
self.image: Optional[ImageURLObject] = None
self.annotations: Optional[List[ChatCompletionAnnotation]] = None
if reasoning_content is not None:
@@ -729,6 +741,11 @@ class Delta(OpenAIObject):
self.annotations = annotations
else:
del self.annotations
if image is not None:
self.image = image
else:
del self.image
if function_call is not None and isinstance(function_call, dict):
self.function_call = FunctionCall(**function_call)
+54 -1
View File
@@ -261,7 +261,13 @@ def test_gemini_image_generation():
messages=[{"role": "user", "content": "Generate an image of a cat"}],
modalities=["image", "text"],
)
assert response.choices[0].message.content is not None
#########################################################
# Important: Validate we did get an image in the response
#########################################################
assert response.choices[0].message.image is not None
assert response.choices[0].message.image["url"] is not None
assert response.choices[0].message.image["url"].startswith("data:image/png;base64,")
def test_gemini_thinking():
@@ -571,3 +577,50 @@ def test_gemini_tool_use():
stop_reason = chunk.choices[0].finish_reason
assert stop_reason is not None
assert stop_reason == "tool_calls"
@pytest.mark.asyncio
async def test_gemini_image_generation_async():
#litellm._turn_on_debug()
response = await litellm.acompletion(
messages=[{"role": "user", "content": "Generate an image of a banana wearing a costume that says LiteLLM"}],
model="gemini/gemini-2.5-flash-image-preview",
)
CONTENT = response.choices[0].message.content
IMAGE_URL = response.choices[0].message.image
print("IMAGE_URL: ", IMAGE_URL)
assert CONTENT is not None
assert IMAGE_URL is not None
assert IMAGE_URL["url"] is not None
assert IMAGE_URL["url"].startswith("data:image/png;base64,")
@pytest.mark.asyncio
async def test_gemini_image_generation_async_stream():
#litellm._turn_on_debug()
response = await litellm.acompletion(
messages=[{"role": "user", "content": "Generate an image of a banana wearing a costume that says LiteLLM"}],
model="gemini/gemini-2.5-flash-image-preview",
stream=True,
)
print("RESPONSE: ", response)
model_response_image = None
async for chunk in response:
print("CHUNK: ", chunk)
if hasattr(chunk.choices[0].delta, "image") and chunk.choices[0].delta.image is not None:
model_response_image = chunk.choices[0].delta.image
print("MODEL_RESPONSE_IMAGE: ", model_response_image)
assert model_response_image is not None
assert model_response_image["url"].startswith("data:image/png;base64,")
break
#########################################################
# Important: Validate we did get an image in the response
#########################################################
assert model_response_image is not None
assert model_response_image["url"].startswith("data:image/png;base64,")
@@ -1 +1 @@
{"custom_id": "ae006110bb364606||/workspace/saved_models/meta-llama/Meta-Llama-3.1-8B-Instruct", "method": "POST", "url": "/v1/chat/completions", "body": {"model": "gpt-4o-mini", "temperature": 0, "max_tokens": 1024, "response_format": {"type": "json_object"}, "messages": [{"role": "user", "content": "# Instruction \n\nYou are an expert evaluator. Your task is to evaluate the quality of the responses generated by AI models. \nWe will provide you with the user query and an AI-generated responses.\nYo must respond in json"}]}}
{"custom_id": "ae006110bb364606||/workspace/saved_models/meta-llama/Meta-Llama-3.1-8B-Instruct", "method": "POST", "url": "/chat/completions", "body": {"model": "gpt-4o-mini", "temperature": 0, "max_tokens": 1024, "response_format": {"type": "json_object"}, "messages": [{"role": "user", "content": "# Instruction \n\nYou are an expert evaluator. Your task is to evaluate the quality of the responses generated by AI models. \nWe will provide you with the user query and an AI-generated responses.\nYo must respond in json"}]}}
+1 -70
View File
@@ -3,23 +3,9 @@
import importlib
import os
import sys
import tempfile
import random
import string
import pytest
# Set up a temporary log directory and file BEFORE importing litellm
temp_dir = tempfile.mkdtemp(prefix="litellm_test_")
test_log_file = os.path.join(temp_dir, "test_litellm.log")
# Store original log file for cleanup
orig_log_file = os.getenv("LITELLM_LOG_FILE")
# Set environment variables to use temporary files BEFORE importing litellm
os.environ["LITELLM_LOG_FILE"] = test_log_file
# Import litellm after setting up the environment
sys.path.insert(
0, os.path.abspath("../..")
) # Adds the parent directory to the system path
@@ -27,61 +13,6 @@ import asyncio
import litellm
@pytest.fixture(scope="function")
def temp_log_file():
"""
Creates a temporary log file in /tmp/litellm<random_number>.log for testing.
Returns the path to the temporary log file and cleans it up after the test.
"""
# Generate a random number for the log file
random_number = ''.join(random.choices(string.digits, k=8))
log_file_path = f"/tmp/litellm{random_number}.log"
# Set the environment variable for litellm to use this temporary log file
original_log_file = os.environ.get("LITELLM_LOG_FILE")
os.environ["LITELLM_LOG_FILE"] = log_file_path
yield log_file_path
# Cleanup: Restore original environment variable and remove the temporary file
if original_log_file is not None:
os.environ["LITELLM_LOG_FILE"] = original_log_file
else:
os.environ.pop("LITELLM_LOG_FILE", None)
# Remove the temporary log file if it exists
if os.path.exists(log_file_path):
try:
os.remove(log_file_path)
except OSError:
pass # Ignore errors if file can't be removed
@pytest.fixture(scope="session", autouse=True)
def cleanup_temp_log_dir():
"""
Cleans up the temporary log directory created at module import time.
This runs once per test session after all tests are complete.
"""
yield
if orig_log_file is not None:
os.environ["LITELLM_LOG_FILE"] = orig_log_file
else:
os.environ.pop("LITELLM_LOG_FILE", None)
# Cleanup: Remove the temporary directory created at module import time
if os.path.exists(temp_dir):
try:
# Remove the test log file first
if os.path.exists(test_log_file):
os.remove(test_log_file)
# Remove the temporary directory
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except OSError:
pass # Ignore errors if cleanup fails
@pytest.fixture(scope="session")
def event_loop():
@@ -94,6 +25,7 @@ def event_loop():
@pytest.fixture(scope="function", autouse=True)
def setup_and_teardown():
"""
@@ -145,4 +77,3 @@ def pytest_collection_modifyitems(config, items):
# Reorder the items list
items[:] = custom_logger_tests + other_tests
@@ -15,7 +15,10 @@ from typing import Optional
import litellm
from litellm.litellm_core_utils.litellm_logging import Logging
from litellm.litellm_core_utils.streaming_handler import CustomStreamWrapper
from litellm.litellm_core_utils.streaming_handler import (
AUDIO_ATTRIBUTE,
CustomStreamWrapper,
)
from litellm.types.utils import (
CompletionTokensDetailsWrapper,
Delta,
@@ -813,3 +816,220 @@ def test_optional_combine_thinking_block_with_none_content(
assert final_response.choices[0].delta.content == "</think>The answer is 42"
assert initialized_custom_stream_wrapper.sent_last_thinking_block is True
assert not hasattr(final_response.choices[0].delta, "reasoning_content")
def test_has_special_delta_content(
initialized_custom_stream_wrapper: CustomStreamWrapper,
):
"""Test the _has_special_delta_content helper method"""
# Test empty choices
empty_response = ModelResponseStream(
id="test", created=1742056047, model=None, choices=[]
)
assert not initialized_custom_stream_wrapper._has_special_delta_content(empty_response)
# Test with tool_calls (simulate with mock object)
tool_call_response = ModelResponseStream(
id="test", created=1742056047, model=None,
choices=[
StreamingChoices(
finish_reason=None, index=0,
delta=Delta(content=None, tool_calls=[{"id": "test"}])
)
]
)
assert initialized_custom_stream_wrapper._has_special_delta_content(tool_call_response)
# Test with function_call (simulate with mock object)
function_call_response = ModelResponseStream(
id="test", created=1742056047, model=None,
choices=[
StreamingChoices(
finish_reason=None, index=0,
delta=Delta(content=None, function_call={"name": "test_func"})
)
]
)
assert initialized_custom_stream_wrapper._has_special_delta_content(function_call_response)
# Test with audio (simulate by adding audio attribute)
audio_response = ModelResponseStream(
id="test", created=1742056047, model=None,
choices=[
StreamingChoices(
finish_reason=None, index=0,
delta=Delta(content=None)
)
]
)
# Manually add audio attribute to delta
audio_response.choices[0].delta.audio = {"transcript": "test"}
assert initialized_custom_stream_wrapper._has_special_delta_content(audio_response)
# Test with image (simulate by adding image attribute)
image_response = ModelResponseStream(
id="test", created=1742056047, model=None,
choices=[
StreamingChoices(
finish_reason=None, index=0,
delta=Delta(content=None)
)
]
)
# Manually add image attribute to delta
image_response.choices[0].delta.image = {"url": "test.jpg"}
assert initialized_custom_stream_wrapper._has_special_delta_content(image_response)
# Test with regular content (should return False)
regular_response = ModelResponseStream(
id="test", created=1742056047, model=None,
choices=[
StreamingChoices(
finish_reason=None, index=0,
delta=Delta(content="Hello world")
)
]
)
assert not initialized_custom_stream_wrapper._has_special_delta_content(regular_response)
def test_handle_special_delta_content(
initialized_custom_stream_wrapper: CustomStreamWrapper,
):
"""Test the _handle_special_delta_content helper method"""
test_response = ModelResponseStream(
id="test", created=1742056047, model=None,
choices=[
StreamingChoices(
finish_reason=None, index=0,
delta=Delta(content="test", role="assistant")
)
]
)
# The method should call strip_role_from_delta
result = initialized_custom_stream_wrapper._handle_special_delta_content(test_response)
# Should return the same response object (modified)
assert result is test_response
# Should have set sent_first_chunk to True
assert initialized_custom_stream_wrapper.sent_first_chunk is True
def test_has_any_special_delta_attributes(
initialized_custom_stream_wrapper: CustomStreamWrapper,
):
"""Test the _has_any_special_delta_attributes helper method"""
# Test with delta that has audio attribute
class MockDelta:
def __init__(self):
self.audio = {"transcript": "Hello world"}
audio_delta = MockDelta()
result = initialized_custom_stream_wrapper._has_any_special_delta_attributes(audio_delta)
assert result is True
# Test with delta that has image attribute
class MockDeltaImage:
def __init__(self):
self.image = {"url": "test.jpg"}
image_delta = MockDeltaImage()
result = initialized_custom_stream_wrapper._has_any_special_delta_attributes(image_delta)
assert result is True
# Test with delta that has no special attributes
class MockDeltaRegular:
def __init__(self):
self.content = "regular content"
regular_delta = MockDeltaRegular()
result = initialized_custom_stream_wrapper._has_any_special_delta_attributes(regular_delta)
assert result is False
def test_handle_special_delta_attributes(
initialized_custom_stream_wrapper: CustomStreamWrapper,
):
"""Test the _handle_special_delta_attributes helper method"""
# Create a model response
model_response = ModelResponseStream(
id="test", created=1742056047, model=None,
choices=[
StreamingChoices(
finish_reason=None, index=0,
delta=Delta(content="test")
)
]
)
# Test with delta that has audio attribute
class MockDelta:
def __init__(self):
self.audio = {"transcript": "Hello world"}
audio_delta = MockDelta()
initialized_custom_stream_wrapper._handle_special_delta_attributes(audio_delta, model_response)
# Should copy the audio attribute
assert hasattr(model_response.choices[0].delta, "audio")
assert model_response.choices[0].delta.audio == {"transcript": "Hello world"}
# Test with delta that has image attribute
class MockDeltaImage:
def __init__(self):
self.image = {"url": "test.jpg"}
image_delta = MockDeltaImage()
model_response2 = ModelResponseStream(
id="test", created=1742056047, model=None,
choices=[
StreamingChoices(
finish_reason=None, index=0,
delta=Delta(content="test")
)
]
)
initialized_custom_stream_wrapper._handle_special_delta_attributes(image_delta, model_response2)
# Should copy the image attribute
assert hasattr(model_response2.choices[0].delta, "image")
assert model_response2.choices[0].delta.image == {"url": "test.jpg"}
def test_has_special_delta_attribute(
initialized_custom_stream_wrapper: CustomStreamWrapper,
):
"""Test the _has_special_delta_attribute helper method"""
# Test with None delta
assert not initialized_custom_stream_wrapper._has_special_delta_attribute(None, "audio")
# Test with delta that has the attribute
class MockDelta:
def __init__(self):
self.audio = {"transcript": "test"}
delta_with_audio = MockDelta()
assert initialized_custom_stream_wrapper._has_special_delta_attribute(delta_with_audio, "audio")
# Test with delta that doesn't have the attribute
class MockDeltaNoAudio:
def __init__(self):
self.content = "test"
delta_without_audio = MockDeltaNoAudio()
assert not initialized_custom_stream_wrapper._has_special_delta_attribute(delta_without_audio, "audio")
# Test with delta that has the attribute but it's None
class MockDeltaNone:
def __init__(self):
self.audio = None
delta_with_none = MockDeltaNone()
assert not initialized_custom_stream_wrapper._has_special_delta_attribute(delta_with_none, "audio")
-638
View File
@@ -1,638 +0,0 @@
import os
import tempfile
import re
import json
from pathlib import Path
from datetime import datetime
import pytest
# Import the loggers from litellm._logging
from litellm._logging import verbose_logger, verbose_proxy_logger, verbose_router_logger
class TestLoggingBehavior:
"""Test suite to verify logging behavior for all LiteLLM loggers."""
def read_log_file_contents(self, log_file_path):
"""Helper method to read and return contents of log file."""
if not os.path.exists(log_file_path):
return ""
with open(log_file_path, 'r') as f:
return f.read()
@pytest.fixture(autouse=True)
def setup_log_file(self, temp_log_file):
"""Use the temp_log_file fixture to ensure proper isolation."""
self.temp_log_path = temp_log_file
# Set environment variable before importing/reloading
original_log_file = os.environ.get("LITELLM_LOG_FILE")
os.environ["LITELLM_LOG_FILE"] = temp_log_file
# Force reload of the logging module to pick up new environment variable
import importlib
import litellm._logging
importlib.reload(litellm._logging)
yield
# Cleanup: Restore original environment variable
if original_log_file is not None:
os.environ["LITELLM_LOG_FILE"] = original_log_file
else:
os.environ.pop("LITELLM_LOG_FILE", None)
# Reload again to restore original state
importlib.reload(litellm._logging)
def test_verbose_logger_info_level(self):
"""Test that verbose_logger writes to file with INFO level."""
test_message = "INFO level test message from verbose_logger"
# Log at INFO level
verbose_logger.info(test_message)
# Force flush all handlers to ensure they write to disk
for handler in verbose_logger.handlers:
if hasattr(handler, 'flush'):
handler.flush()
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert test_message in log_contents, f"Message '{test_message}' should be found in log file"
def test_verbose_logger_debug_level(self):
"""Test that verbose_logger writes to file with DEBUG level."""
test_message = "DEBUG level test message from verbose_logger"
# Log at DEBUG level
verbose_logger.debug(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert test_message in log_contents, f"Message '{test_message}' should be found in log file"
def test_verbose_proxy_logger_info_level(self):
"""Test that verbose_proxy_logger writes to file with INFO level."""
test_message = "INFO level test message from verbose_proxy_logger"
# Log at INFO level
verbose_proxy_logger.info(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert test_message in log_contents, f"Message '{test_message}' should be found in log file"
def test_verbose_proxy_logger_debug_level(self):
"""Test that verbose_proxy_logger writes to file with DEBUG level."""
test_message = "DEBUG level test message from verbose_proxy_logger"
# Log at DEBUG level
verbose_proxy_logger.debug(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert test_message in log_contents, f"Message '{test_message}' should be found in log file"
def test_verbose_router_logger_info_level(self):
"""Test that verbose_router_logger writes to file with INFO level."""
test_message = "INFO level test message from verbose_router_logger"
# Log at INFO level
verbose_router_logger.info(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert test_message in log_contents, f"Message '{test_message}' should be found in log file"
def test_verbose_router_logger_debug_level(self):
"""Test that verbose_router_logger writes to file with DEBUG level."""
test_message = "DEBUG level test message from verbose_router_logger"
# Log at DEBUG level
verbose_router_logger.debug(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert test_message in log_contents, f"Message '{test_message}' should be found in log file"
def test_log_format_includes_timestamp_and_level(self):
"""Test that log entries include timestamp and level information."""
test_message = "Format test message"
# Log at INFO level
verbose_logger.info(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
# Check for timestamp format (should be in HH:MM:SS format based on _logging.py)
assert re.search(r'\d{2}:\d{2}:\d{2}', log_contents), "Log should contain timestamp in HH:MM:SS format"
# Check for level information
assert 'INFO' in log_contents, "Log should contain INFO level indicator"
# Check for logger name
assert 'LiteLLM' in log_contents, "Log should contain LiteLLM logger name"
def test_multiple_loggers_write_to_same_file(self):
"""Test that all loggers write to the same file."""
messages = {
'verbose_logger': "Message from verbose_logger",
'verbose_proxy_logger': "Message from verbose_proxy_logger",
'verbose_router_logger': "Message from verbose_router_logger"
}
# Log messages from different loggers
verbose_logger.info(messages['verbose_logger'])
verbose_proxy_logger.info(messages['verbose_proxy_logger'])
verbose_router_logger.info(messages['verbose_router_logger'])
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
# Verify all messages are in the same file
for message in messages.values():
assert message in log_contents, f"Message '{message}' should be found in log file"
def test_log_file_is_not_empty(self):
"""Test that the log file is not empty after logging."""
# Log a message
verbose_logger.info("Test message to ensure file is not empty")
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
# Verify file is not empty
assert len(log_contents.strip()) > 0, "Log file should not be empty after logging"
class TestJSONLoggingBehavior:
"""Test suite to verify JSON logging behavior for all LiteLLM loggers."""
def read_log_file_contents(self, log_file_path):
"""Helper method to read and return contents of log file."""
if not os.path.exists(log_file_path):
return ""
with open(log_file_path, 'r') as f:
return f.read()
@pytest.fixture(autouse=True)
def setup_json_logging(self, temp_log_file):
"""Set up JSON logging environment and ensure proper isolation."""
self.temp_log_path = temp_log_file
# Store original environment variables
original_log_file = os.environ.get("LITELLM_LOG_FILE")
original_json_logs = os.environ.get("JSON_LOGS")
# Set environment variables for JSON logging
os.environ["LITELLM_LOG_FILE"] = temp_log_file
os.environ["JSON_LOGS"] = "True"
# Force reload of the logging module to pick up new environment variables
import importlib
import litellm._logging
importlib.reload(litellm._logging)
yield
# Cleanup: Restore original environment variables
if original_log_file is not None:
os.environ["LITELLM_LOG_FILE"] = original_log_file
else:
os.environ.pop("LITELLM_LOG_FILE", None)
if original_json_logs is not None:
os.environ["JSON_LOGS"] = original_json_logs
else:
os.environ.pop("JSON_LOGS", None)
# Reload again to restore original state
importlib.reload(litellm._logging)
def test_verbose_logger_json_info_level(self):
"""Test that verbose_logger writes JSON formatted logs at INFO level."""
test_message = "JSON INFO level test message from verbose_logger"
# Log at INFO level
verbose_logger.info(test_message)
# Force flush all handlers to ensure they write to disk
for handler in verbose_logger.handlers:
if hasattr(handler, 'flush'):
handler.flush()
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse JSON and verify structure
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
assert len(log_lines) > 0, "Should have at least one log line"
# Find the line containing our test message
target_log = None
for line in log_lines:
try:
parsed = json.loads(line)
if parsed.get("message") == test_message:
target_log = parsed
break
except json.JSONDecodeError:
continue
assert target_log is not None, f"Could not find JSON log entry with message: {test_message}"
# Verify JSON structure
assert "message" in target_log, "JSON log should contain 'message' field"
assert "level" in target_log, "JSON log should contain 'level' field"
assert "timestamp" in target_log, "JSON log should contain 'timestamp' field"
# Verify content
assert target_log["message"] == test_message
assert target_log["level"] == "INFO"
# Verify timestamp is in ISO 8601 format
timestamp_str = target_log["timestamp"]
try:
datetime.fromisoformat(timestamp_str)
except ValueError:
pytest.fail(f"Timestamp '{timestamp_str}' is not in valid ISO 8601 format")
def test_verbose_logger_json_debug_level(self):
"""Test that verbose_logger writes JSON formatted logs at DEBUG level."""
test_message = "JSON DEBUG level test message from verbose_logger"
# Log at DEBUG level
verbose_logger.debug(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse JSON and verify structure
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
# Find the line containing our test message
target_log = None
for line in log_lines:
try:
parsed = json.loads(line)
if parsed.get("message") == test_message:
target_log = parsed
break
except json.JSONDecodeError:
continue
assert target_log is not None, f"Could not find JSON log entry with message: {test_message}"
assert target_log["level"] == "DEBUG"
def test_verbose_proxy_logger_json_info_level(self):
"""Test that verbose_proxy_logger writes JSON formatted logs at INFO level."""
test_message = "JSON INFO level test message from verbose_proxy_logger"
# Log at INFO level
verbose_proxy_logger.info(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse JSON and verify structure
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
# Find the line containing our test message
target_log = None
for line in log_lines:
try:
parsed = json.loads(line)
if parsed.get("message") == test_message:
target_log = parsed
break
except json.JSONDecodeError:
continue
assert target_log is not None, f"Could not find JSON log entry with message: {test_message}"
# Verify JSON structure and content
assert target_log["message"] == test_message
assert target_log["level"] == "INFO"
# Verify timestamp is in ISO 8601 format
timestamp_str = target_log["timestamp"]
try:
datetime.fromisoformat(timestamp_str)
except ValueError:
pytest.fail(f"Timestamp '{timestamp_str}' is not in valid ISO 8601 format")
def test_verbose_proxy_logger_json_debug_level(self):
"""Test that verbose_proxy_logger writes JSON formatted logs at DEBUG level."""
test_message = "JSON DEBUG level test message from verbose_proxy_logger"
# Log at DEBUG level
verbose_proxy_logger.debug(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse JSON and verify structure
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
# Find the line containing our test message
target_log = None
for line in log_lines:
try:
parsed = json.loads(line)
if parsed.get("message") == test_message:
target_log = parsed
break
except json.JSONDecodeError:
continue
assert target_log is not None, f"Could not find JSON log entry with message: {test_message}"
assert target_log["level"] == "DEBUG"
def test_verbose_router_logger_json_info_level(self):
"""Test that verbose_router_logger writes JSON formatted logs at INFO level."""
test_message = "JSON INFO level test message from verbose_router_logger"
# Log at INFO level
verbose_router_logger.info(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse JSON and verify structure
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
# Find the line containing our test message
target_log = None
for line in log_lines:
try:
parsed = json.loads(line)
if parsed.get("message") == test_message:
target_log = parsed
break
except json.JSONDecodeError:
continue
assert target_log is not None, f"Could not find JSON log entry with message: {test_message}"
# Verify JSON structure and content
assert target_log["message"] == test_message
assert target_log["level"] == "INFO"
# Verify timestamp is in ISO 8601 format
timestamp_str = target_log["timestamp"]
try:
datetime.fromisoformat(timestamp_str)
except ValueError:
pytest.fail(f"Timestamp '{timestamp_str}' is not in valid ISO 8601 format")
def test_verbose_router_logger_json_debug_level(self):
"""Test that verbose_router_logger writes JSON formatted logs at DEBUG level."""
test_message = "JSON DEBUG level test message from verbose_router_logger"
# Log at DEBUG level
verbose_router_logger.debug(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse JSON and verify structure
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
# Find the line containing our test message
target_log = None
for line in log_lines:
try:
parsed = json.loads(line)
if parsed.get("message") == test_message:
target_log = parsed
break
except json.JSONDecodeError:
continue
assert target_log is not None, f"Could not find JSON log entry with message: {test_message}"
assert target_log["level"] == "DEBUG"
def test_json_output_is_valid_json(self):
"""Test that all JSON log output can be parsed as valid JSON."""
test_messages = [
"JSON test message 1",
"JSON test message 2",
"JSON test message 3"
]
# Log messages from all loggers
verbose_logger.info(test_messages[0])
verbose_proxy_logger.info(test_messages[1])
verbose_router_logger.info(test_messages[2])
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse each line as JSON
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
parsed_logs = []
for line in log_lines:
try:
parsed = json.loads(line)
parsed_logs.append(parsed)
except json.JSONDecodeError as e:
pytest.fail(f"Failed to parse JSON log line: {line}. Error: {e}")
assert len(parsed_logs) >= len(test_messages), f"Should have at least {len(test_messages)} parsed log entries"
# Verify each parsed log has required fields
for parsed_log in parsed_logs:
assert isinstance(parsed_log, dict), "Parsed log should be a dictionary"
assert "message" in parsed_log, "Each log should have a 'message' field"
assert "level" in parsed_log, "Each log should have a 'level' field"
assert "timestamp" in parsed_log, "Each log should have a 'timestamp' field"
def test_json_timestamp_iso8601_format(self):
"""Test that JSON log timestamps are in ISO 8601 format."""
test_message = "Timestamp format test message"
# Log a message
verbose_logger.info(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse JSON and verify timestamp format
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
# Find the line containing our test message
target_log = None
for line in log_lines:
try:
parsed = json.loads(line)
if parsed.get("message") == test_message:
target_log = parsed
break
except json.JSONDecodeError:
continue
assert target_log is not None, f"Could not find JSON log entry with message: {test_message}"
timestamp_str = target_log["timestamp"]
# Verify timestamp can be parsed as ISO 8601
try:
parsed_timestamp = datetime.fromisoformat(timestamp_str)
assert isinstance(parsed_timestamp, datetime), "Parsed timestamp should be a datetime object"
except ValueError as e:
pytest.fail(f"Timestamp '{timestamp_str}' is not in valid ISO 8601 format. Error: {e}")
# Verify timestamp format matches expected pattern (YYYY-MM-DDTHH:MM:SS.ffffff)
import re
iso8601_pattern = r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?$'
assert re.match(iso8601_pattern, timestamp_str), f"Timestamp '{timestamp_str}' does not match ISO 8601 pattern"
def test_json_logs_contain_expected_fields(self):
"""Test that JSON logs contain all expected fields with correct types."""
test_message = "Field validation test message"
# Log a message
verbose_logger.info(test_message)
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse JSON and verify fields
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
# Find the line containing our test message
target_log = None
for line in log_lines:
try:
parsed = json.loads(line)
if parsed.get("message") == test_message:
target_log = parsed
break
except json.JSONDecodeError:
continue
assert target_log is not None, f"Could not find JSON log entry with message: {test_message}"
# Verify required fields exist and have correct types
assert "message" in target_log, "JSON log should contain 'message' field"
assert "level" in target_log, "JSON log should contain 'level' field"
assert "timestamp" in target_log, "JSON log should contain 'timestamp' field"
assert isinstance(target_log["message"], str), "'message' field should be a string"
assert isinstance(target_log["level"], str), "'level' field should be a string"
assert isinstance(target_log["timestamp"], str), "'timestamp' field should be a string"
# Verify field values
assert target_log["message"] == test_message
assert target_log["level"] in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], "Level should be a valid log level"
def test_multiple_json_loggers_write_to_same_file(self):
"""Test that all loggers write JSON formatted logs to the same file."""
messages = {
'verbose_logger': "JSON message from verbose_logger",
'verbose_proxy_logger': "JSON message from verbose_proxy_logger",
'verbose_router_logger': "JSON message from verbose_router_logger"
}
# Log messages from different loggers
verbose_logger.info(messages['verbose_logger'])
verbose_proxy_logger.info(messages['verbose_proxy_logger'])
verbose_router_logger.info(messages['verbose_router_logger'])
# Read log file contents
log_file_path = os.environ.get("LITELLM_LOG_FILE")
assert log_file_path is not None, "LITELLM_LOG_FILE environment variable should be set"
log_contents = self.read_log_file_contents(log_file_path)
assert log_contents.strip(), "Log file should not be empty"
# Parse all JSON logs
log_lines = [line.strip() for line in log_contents.strip().split('\n') if line.strip()]
parsed_logs = []
for line in log_lines:
try:
parsed = json.loads(line)
parsed_logs.append(parsed)
except json.JSONDecodeError:
continue
# Find logs for each message
found_messages = set()
for parsed_log in parsed_logs:
message = parsed_log.get("message", "")
if message in messages.values():
found_messages.add(message)
# Verify all messages are found in JSON format
for message in messages.values():
assert message in found_messages, f"Message '{message}' should be found in JSON logs"
@@ -450,6 +450,38 @@ const ChatUI: React.FC<ChatUIProps> = ({
]);
};
const updateChatImageUI = (imageUrl: string, model?: string) => {
setChatHistory((prev) => {
const last = prev[prev.length - 1];
// If the last message is from assistant and has content, add image to it
if (last && last.role === "assistant" && !last.isImage) {
const updated = {
...last,
image: {
url: imageUrl,
detail: "auto"
},
model: last.model ?? model
};
return [...prev.slice(0, -1), updated];
} else {
// Otherwise create a new assistant message with just the image
return [
...prev,
{
role: "assistant",
content: "",
model,
image: {
url: imageUrl,
detail: "auto"
}
}
];
}
});
};
const handleKeyDown = (event: React.KeyboardEvent<HTMLTextAreaElement>) => {
if (event.key === 'Enter' && !event.shiftKey) {
event.preventDefault(); // Prevent default to avoid newline
@@ -611,7 +643,8 @@ const ChatUI: React.FC<ChatUIProps> = ({
traceId,
selectedVectorStores.length > 0 ? selectedVectorStores : undefined,
selectedGuardrails.length > 0 ? selectedGuardrails : undefined,
selectedMCPTools // Pass the selected tool directly
selectedMCPTools, // Pass the selected tool directly
updateChatImageUI // Pass the image callback
);
} else if (endpointType === EndpointType.IMAGE) {
// For image generation
@@ -1057,6 +1090,18 @@ const ChatUI: React.FC<ChatUIProps> = ({
>
{typeof message.content === "string" ? message.content : ""}
</ReactMarkdown>
{/* Show generated image from chat completions */}
{message.image && (
<div className="mt-3">
<img
src={message.image.url}
alt="Generated image"
className="max-w-full rounded-md border border-gray-200 shadow-sm"
style={{ maxHeight: '500px' }}
/>
</div>
)}
</>
)}
@@ -17,7 +17,8 @@ export async function makeOpenAIChatCompletionRequest(
traceId?: string,
vector_store_ids?: string[],
guardrails?: string[],
selectedMCPTool?: string
selectedMCPTool?: string,
onImageGenerated?: (imageUrl: string, model?: string) => void
) {
// base url should be the current base_url
const isLocal = process.env.NODE_ENV === "development";
@@ -103,6 +104,12 @@ export async function makeOpenAIChatCompletionRequest(
fullResponseContent += content;
}
// Process image generation if present
if (delta && delta.image && onImageGenerated) {
console.log("Image generated:", delta.image);
onImageGenerated(delta.image.url, chunk.model);
}
// Process reasoning content if present - using type assertion
if (delta && delta.reasoning_content) {
const reasoningContent = delta.reasoning_content;
@@ -7,6 +7,10 @@ export interface Delta {
audio?: any;
refusal?: any;
provider_specific_fields?: any;
image?: {
url: string;
detail: string;
};
}
export interface CompletionTokensDetails {
@@ -67,6 +71,10 @@ export interface MessageType {
};
toolName?: string;
imagePreviewUrl?: string; // For storing image preview URL in chat history
image?: {
url: string;
detail: string;
};
}
export interface MultimodalContent {