more examples

This commit is contained in:
Logan Markewich
2024-10-08 22:54:26 -06:00
parent 044e768c35
commit a8d6b7a430
12 changed files with 2604 additions and 179 deletions
+2
View File
@@ -0,0 +1,2 @@
*.pyc
__pycache__
+16 -3
View File
@@ -1,5 +1,9 @@
# OpenAI Realtime API Client for Python
This is an experimental OpenAI Realtime API client for Python and LlamaIndex. It integrates with LlamaIndex's tools, allowing you to quickly build custom voice assistants.
Includes two examples that run directly in the terminal -- one using manual mode and one using Server VAD mode (which allows you to interrupt the chatbot).
## Installation
Install system deps:
@@ -11,7 +15,8 @@ brew install ffmpeg
Install python deps:
```bash
pip install -U openai pyaudio pynput pydub websockets
git clone https://github.com/run-llama/openai_realtime_client.git
pip install -e .
```
Set your openai key:
@@ -22,8 +27,16 @@ export OPENAI_API_KEY="sk-..."
## Usage
Run the interactive CLI:
Run the interactive CLI with manual VAD:
```bash
python ./cli.py
python ./examples/manual_cli.py
```
Or to use streaming mode (which allows you to interrupt the chatbot):
```bash
python ./examples/streaming_cli.py
```
**NOTE:** Streaming mode can be a little janky; it is best to use headphones in a quiet environment.
+86
View File
@@ -0,0 +1,86 @@
import asyncio
import os
from pynput import keyboard
from openai_realtime_client import RealtimeClient, InputHandler, AudioHandler
from llama_index.core.tools import FunctionTool
# Add your own tools here!
# def get_my_phone_number(name: str) -> str:
# """Get my phone number."""
# return "1234567890"
# tools = [FunctionTool.from_defaults(fn=get_my_phone_number)]
tools = []
async def main():
    """Run the manual (push-to-talk) CLI until the user presses 'q'.

    Wires an ``AudioHandler`` and an ``InputHandler`` to a ``RealtimeClient``,
    starts a keyboard listener thread, and then reacts to the commands the
    input handler places on its queue:

    * ``'r'``      -- start recording from the microphone
    * ``'space'``  -- stop recording and send the captured audio
    * ``'enter'``  -- send the typed text (ignored when empty)
    * ``'q'``      -- leave the loop

    Any exception raised while connected is printed; the listener, the audio
    handler and the client are always shut down on the way out.
    """
    audio_handler = AudioHandler()
    input_handler = InputHandler()
    # The listener thread needs the loop to hand commands over thread-safely.
    input_handler.loop = asyncio.get_running_loop()

    def show_text(text):
        return print(f"\nAssistant: {text}", end="", flush=True)

    def play(audio):
        return audio_handler.play_audio(audio)

    client = RealtimeClient(
        api_key=os.environ.get("OPENAI_API_KEY"),
        on_text_delta=show_text,
        on_audio_delta=play,
        tools=tools,
    )

    # Key presses are captured on a separate thread.
    listener = keyboard.Listener(on_press=input_handler.on_press)
    listener.start()

    try:
        await client.connect()

        # Keep a reference so the background receive task is not garbage collected.
        message_handler = asyncio.create_task(client.handle_messages())

        banner = (
            "Connected to OpenAI Realtime API!",
            "Commands:",
            "- Type your message and press Enter to send text",
            "- Press 'r' to start recording audio",
            "- Press 'space' to stop recording",
            "- Press 'q' to quit",
            "",
        )
        for line in banner:
            print(line)

        while True:
            command, data = await input_handler.command_queue.get()

            if command == 'q':
                break

            if command == 'r':
                audio_handler.start_recording()
            elif command == 'space':
                print("[About to stop recording]")
                if audio_handler.recording:
                    audio_data = audio_handler.stop_recording()
                    print("[Recording stopped]")
                    if audio_data:
                        await client.send_audio(audio_data)
                        print("[Audio sent]")
            elif command == 'enter' and data:
                await client.send_text(data)

            # Brief pause between commands, as in the original loop.
            await asyncio.sleep(0.01)
    except Exception as e:
        print(f"Error: {e}")
    finally:
        listener.stop()
        audio_handler.cleanup()
        await client.close()
# Script entry point: announce startup, then run the async CLI loop.
if __name__ == "__main__":
    # Requires the pyaudio, pynput, pydub and websockets packages:
    # pip install pyaudio pynput pydub websockets
    print("Starting Realtime API CLI...")
    asyncio.run(main())
+61
View File
@@ -0,0 +1,61 @@
import asyncio
import os
from pynput import keyboard
from openai_realtime_client import RealtimeClient, AudioHandler, InputHandler, TurnDetectionMode
from llama_index.core.tools import FunctionTool
# Add your own tools here!
# Example tool; it is wrapped with FunctionTool.from_defaults right below.
# NOTE(review): `name` is accepted but never used -- the same number is
# returned for every caller. The docstring is presumably what the model sees
# as the tool description, so keep it short and accurate -- confirm.
def get_my_phone_number(name: str) -> str:
    """Get my phone number."""
    return "1234567890"
tools = [FunctionTool.from_defaults(fn=get_my_phone_number)]
# tools = []
async def main():
    """Run the Server VAD streaming CLI until the user presses 'q'.

    Connects a ``RealtimeClient`` configured with
    ``TurnDetectionMode.SERVER_VAD``, starts forwarding microphone audio via
    ``AudioHandler.start_streaming`` and then waits for the quit command from
    the keyboard listener.

    Any exception raised while connected is printed. On exit the keyboard
    listener is stopped, streaming is halted and its task awaited, and the
    audio handler and client are closed.
    """
    audio_handler = AudioHandler()
    input_handler = InputHandler()
    # The listener thread needs the loop to hand commands over thread-safely.
    input_handler.loop = asyncio.get_running_loop()

    client = RealtimeClient(
        api_key=os.environ.get("OPENAI_API_KEY"),
        on_text_delta=lambda text: print(f"\nAssistant: {text}", end="", flush=True),
        on_audio_delta=lambda audio: audio_handler.play_audio(audio),
        turn_detection_mode=TurnDetectionMode.SERVER_VAD,
        tools=tools,
    )

    # Start keyboard listener in a separate thread
    listener = keyboard.Listener(on_press=input_handler.on_press)
    listener.start()

    # Defined up front so the cleanup code can tell whether streaming started.
    streaming_task = None

    try:
        await client.connect()

        # Keep a reference so the background receive task is not garbage collected.
        message_handler = asyncio.create_task(client.handle_messages())

        print("Connected to OpenAI Realtime API!")
        print("Audio streaming will start automatically.")
        print("Press 'q' to quit")
        print("")

        # Start continuous audio streaming
        streaming_task = asyncio.create_task(audio_handler.start_streaming(client))

        # Simple input loop for quit command
        while True:
            command, _ = await input_handler.command_queue.get()
            if command == 'q':
                break
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Stop the listener thread, matching the manual example's cleanup.
        listener.stop()
        audio_handler.stop_streaming()
        if streaming_task is not None:
            # Make sure the streaming coroutine has finished before PyAudio
            # is torn down by cleanup().
            streaming_task.cancel()
            await asyncio.gather(streaming_task, return_exceptions=True)
        audio_handler.cleanup()
        await client.close()
# Script entry point: announce startup, then run the async streaming loop.
if __name__ == "__main__":
    print("Starting Realtime API CLI with Server VAD...")
    asyncio.run(main())
+5
View File
@@ -0,0 +1,5 @@
from .client.realtime_client import RealtimeClient, TurnDetectionMode
from .handlers.audio_handler import AudioHandler
from .handlers.input_handler import InputHandler
__all__ = ["RealtimeClient", "TurnDetectionMode", "AudioHandler", "InputHandler"]
@@ -0,0 +1,3 @@
from .realtime_client import RealtimeClient
__all__ = ["RealtimeClient"]
@@ -1,11 +1,13 @@
import websockets
import json
import base64
import asyncio
import io
from typing import Optional, Callable, List, Dict, Any
from enum import Enum
from pydub import AudioSegment
import io
from llama_index.core.tools import BaseTool, AsyncBaseTool, ToolSelection, adapt_to_async_tool, acall_tool_with_selection
class TurnDetectionMode(Enum):
@@ -13,15 +15,34 @@ class TurnDetectionMode(Enum):
MANUAL = "manual"
class RealtimeClient:
"""
A client for interacting with the OpenAI Realtime API.
This class provides methods to connect to the Realtime API, send text and audio data,
handle responses, and manage the WebSocket connection.
Attributes:
api_key (str): The API key for authentication.
model (str): The model to use for text and audio processing.
voice (str): The voice to use for audio output.
instructions (str): The instructions for the chatbot.
turn_detection_mode (TurnDetectionMode): The mode for turn detection.
tools (List[BaseTool]): The tools to use for function calling.
on_text_delta (Callable[[str], None]): Callback for text delta events.
on_audio_delta (Callable[[bytes], None]): Callback for audio delta events.
extra_event_handlers (Dict[str, Callable[[Dict[str, Any]], None]]): Additional event handlers.
"""
def __init__(
self,
api_key: str,
model: str = "gpt-4o-realtime-preview-2024-10-01",
voice: str = "alloy",
instructions: str = "You are a helpful assistant",
turn_detection_mode: TurnDetectionMode = TurnDetectionMode.MANUAL,
tools: Optional[List[BaseTool]] = None,
on_text_delta: Optional[Callable[[str], None]] = None,
on_audio_delta: Optional[Callable[[bytes], None]] = None,
on_function_call: Optional[Callable[[Dict[str, Any]], None]] = None
extra_event_handlers: Optional[Dict[str, Callable[[Dict[str, Any]], None]]] = None
):
self.api_key = api_key
self.model = model
@@ -29,10 +50,15 @@ class RealtimeClient:
self.ws = None
self.on_text_delta = on_text_delta
self.on_audio_delta = on_audio_delta
self.on_function_call = on_function_call
self.instructions = instructions
self.base_url = "wss://api.openai.com/v1/realtime"
self.conversation_history = []
self.extra_event_handlers = extra_event_handlers or {}
self.turn_detection_mode = turn_detection_mode
tools = tools or []
for i, tool in enumerate(tools):
tools[i] = adapt_to_async_tool(tool)
self.tools: List[AsyncBaseTool] = tools
async def connect(self) -> None:
"""Establish WebSocket connection with the Realtime API."""
@@ -45,20 +71,47 @@ class RealtimeClient:
self.ws = await websockets.connect(url, extra_headers=headers)
# Set up default session configuration
await self.update_session({
"modalities": ["text", "audio"],
"instructions": self.instructions,
"voice": self.voice,
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_transcription": {
"model": "whisper-1"
},
"tools": [],
"tool_choice": "auto",
"temperature": 0.8,
}
)
tools = [t.metadata.to_openai_tool()['function'] for t in self.tools]
for t in tools:
t['type'] = 'function' # TODO: OpenAI docs didn't say this was needed, but it was
if self.turn_detection_mode == TurnDetectionMode.MANUAL:
await self.update_session({
"modalities": ["text", "audio"],
"instructions": self.instructions,
"voice": self.voice,
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_transcription": {
"model": "whisper-1"
},
"tools": tools,
"tool_choice": "auto",
"temperature": 0.8,
})
elif self.turn_detection_mode == TurnDetectionMode.SERVER_VAD:
await self.update_session({
"modalities": ["text", "audio"],
"instructions": self.instructions,
"voice": self.voice,
"input_audio_format": "pcm16",
"output_audio_format": "pcm16",
"input_audio_transcription": {
"model": "whisper-1"
},
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 500,
"silence_duration_ms": 200
},
"tools": tools,
"tool_choice": "auto",
"temperature": 0.8,
})
else:
raise ValueError(f"Invalid turn detection mode: {self.turn_detection_mode}")
async def update_session(self, config: Dict[str, Any]) -> None:
"""Update session configuration."""
@@ -105,10 +158,21 @@ class RealtimeClient:
await self.ws.send(json.dumps(commit_event))
# In manual mode, we need to explicitly request a response
await self.create_response()
if self.turn_detection_mode == TurnDetectionMode.MANUAL:
await self.create_response()
async def stream_audio(self, audio_chunk: bytes) -> None:
    """Append one chunk of raw PCM audio to the API's input audio buffer.

    The chunk is base64-encoded and sent over the WebSocket as an
    ``input_audio_buffer.append`` event.

    Args:
        audio_chunk: Raw audio bytes to forward.
    """
    payload = json.dumps({
        "type": "input_audio_buffer.append",
        "audio": base64.b64encode(audio_chunk).decode(),
    })
    await self.ws.send(payload)
async def create_response(self, functions: Optional[List[Dict[str, Any]]] = None) -> None:
"""Request a response from the API."""
"""Request a response from the API. Needed when using manual mode."""
event = {
"type": "response.create",
"response": {
@@ -120,24 +184,36 @@ class RealtimeClient:
await self.ws.send(json.dumps(event))
async def send_function_result(self, call_id: str, result: Any) -> None:
    """Send a function call result back to the API and request a response.

    Creates a ``function_call_output`` conversation item for ``call_id`` and
    then calls ``create_response()``, because function outputs need a manual
    response request.

    Args:
        call_id: Identifier of the function call being answered.
        result: The function's result. Strings are sent unchanged; any other
            value is serialized to JSON, falling back to ``str(result)`` when
            it is not JSON-serializable.
    """
    if isinstance(result, str):
        output = result
    else:
        try:
            output = json.dumps(result)
        except (TypeError, ValueError):
            output = str(result)

    event = {
        "type": "conversation.item.create",
        "item": {
            "type": "function_call_output",
            "call_id": call_id,
            "output": output,
        },
    }
    await self.ws.send(json.dumps(event))

    # functions need a manual response
    await self.create_response()
async def cancel_response(self) -> None:
    """Ask the API to cancel the response currently in progress.

    Sends a ``response.cancel`` event over the WebSocket.
    """
    await self.ws.send(json.dumps({"type": "response.cancel"}))
async def call_tool(self, call_id: str, tool_name: str, tool_arguments: Dict[str, Any]) -> None:
    """Run the tool the model asked for and send its result back to the API.

    Args:
        call_id: Identifier of the function call, echoed back with the result.
        tool_name: Name of the tool to invoke; looked up in ``self.tools``.
        tool_arguments: Keyword arguments for the tool.
    """
    tool_selection = ToolSelection(
        # Use the real call id rather than a fixed placeholder so the
        # selection can be traced back to the originating function call.
        tool_id=call_id,
        tool_name=tool_name,
        tool_kwargs=tool_arguments,
    )

    tool_result = await acall_tool_with_selection(tool_selection, self.tools)
    # The result is stringified before it is sent back.
    await self.send_function_result(call_id, str(tool_result))
async def handle_messages(self) -> None:
"""Main message handling loop."""
@@ -146,8 +222,6 @@ class RealtimeClient:
event = json.loads(message)
event_type = event.get("type")
print(f"Event: {event_type}")
if event_type == "error":
print(f"Error: {event['error']}")
continue
@@ -162,11 +236,11 @@ class RealtimeClient:
self.on_audio_delta(audio_bytes)
elif event_type == "response.function_call_arguments.done":
if self.on_function_call:
self.on_function_call({
"name": event["name"],
"arguments": event["arguments"]
})
print(f"Function call arguments done: {event}")
await self.call_tool(event["call_id"], event['name'], json.loads(event['arguments']))
elif event_type in self.extra_event_handlers:
self.extra_event_handlers[event_type](event)
except websockets.exceptions.ConnectionClosed:
print("Connection closed")
@@ -177,40 +251,3 @@ class RealtimeClient:
"""Close the WebSocket connection."""
if self.ws:
await self.ws.close()
# Example usage:
async def main():
def on_text(text: str):
print(f"Text received: {text}")
def on_audio(audio: bytes):
# Handle audio chunks (e.g., play them or save to file)
pass
def on_function_call(func_data: Dict[str, Any]):
print(f"Function call: {func_data}")
client = RealtimeClient(
api_key="your-api-key",
on_text_delta=on_text,
on_audio_delta=on_audio,
on_function_call=on_function_call
)
try:
await client.connect()
# Start message handling in the background
message_handler = asyncio.create_task(client.handle_messages())
# Send a text message
await client.send_text("Hello! How are you today?")
# Wait for a while to receive responses
await asyncio.sleep(5)
finally:
await client.close()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,4 @@
from .input_handler import InputHandler
from .audio_handler import AudioHandler
__all__ = ["InputHandler", "AudioHandler"]
@@ -3,16 +3,37 @@ import pyaudio
import wave
import queue
import io
import os
from typing import Optional
from pydub import AudioSegment
from pynput import keyboard
import threading
from realtime_client import RealtimeClient
from ..client.realtime_client import RealtimeClient
class AudioHandler:
"""
Handles audio input and output for the chatbot.
Uses PyAudio for audio input and output, and runs a separate thread for recording and playing audio.
When playing audio, it uses a buffer to store audio data and plays it continuously to ensure smooth playback.
Attributes:
format (int): The audio format (paInt16).
channels (int): The number of audio channels (1).
rate (int): The sample rate (24000).
chunk (int): The size of the audio buffer (1024).
audio (pyaudio.PyAudio): The PyAudio object.
recording_stream (pyaudio.Stream): The stream for recording audio.
recording_thread (threading.Thread): The thread for recording audio.
recording (bool): Whether the audio is currently being recorded.
streaming (bool): Whether the audio is currently being streamed.
stream (pyaudio.Stream): The stream for streaming audio.
playback_stream (pyaudio.Stream): The stream for playing audio.
playback_buffer (queue.Queue): The buffer for playing audio.
stop_playback (bool): Whether the audio playback should be stopped.
"""
def __init__(self):
# Audio parameters
self.format = pyaudio.paInt16
@@ -20,12 +41,17 @@ class AudioHandler:
self.rate = 24000
self.chunk = 1024
# Recording params
self.audio = pyaudio.PyAudio()
# Recording params
self.recording_stream: Optional[pyaudio.Stream] = None
self.recording_thread = None
self.recording = False
# streaming params
self.streaming = False
self.stream = None
# Playback params
self.playback_stream = None
self.playback_buffer = queue.Queue(maxsize=20)
@@ -90,6 +116,41 @@ class AudioHandler:
wav_buffer.seek(0)
return wav_buffer.read()
async def start_streaming(self, client: "RealtimeClient"):
    """Continuously read microphone audio and forward it to ``client``.

    Opens an input stream and, while ``self.streaming`` is set, reads
    ``self.chunk`` frames at a time and passes the raw bytes to
    ``client.stream_audio``. Returns immediately if streaming is already
    active. The loop ends when ``stop_streaming()`` clears the flag or when
    reading/sending raises, in which case the error is printed.

    Whatever ends the loop (flag cleared, error, cancellation), the handler
    is reset through ``stop_streaming()`` so streaming can be started again.

    Args:
        client: The client whose ``stream_audio`` coroutine receives each chunk.
    """
    if self.streaming:
        return

    # Open the device before setting the flag: if opening fails, the handler
    # must not be left stuck in the "streaming" state.
    self.stream = self.audio.open(
        format=self.format,
        channels=self.channels,
        rate=self.rate,
        input=True,
        frames_per_buffer=self.chunk,
    )
    self.streaming = True

    print("\nStreaming audio... Press 'q' to stop.")

    try:
        while self.streaming:
            try:
                # Read raw PCM data.
                # NOTE(review): this read is synchronous and runs on the
                # event loop thread -- confirm that is acceptable.
                data = self.stream.read(self.chunk, exception_on_overflow=False)

                # Stream directly without trying to decode
                await client.stream_audio(data)
            except Exception as e:
                print(f"Error streaming: {e}")
                break

            await asyncio.sleep(0.01)
    finally:
        # Clears the flag and closes the stream if it is still open; a no-op
        # when stop_streaming() was already called from outside.
        self.stop_streaming()
def stop_streaming(self):
    """Clear the streaming flag and release the input stream, if one is open."""
    self.streaming = False

    active = self.stream
    if not active:
        return

    active.stop_stream()
    active.close()
    self.stream = None
def play_audio(self, audio_data: bytes):
"""Add audio data to the buffer"""
try:
@@ -154,109 +215,9 @@ class AudioHandler:
if self.recording_stream:
self.recording_stream.stop_stream()
self.recording_stream.close()
self.audio.terminate()
class InputHandler:
def __init__(self):
self.text_input = ""
self.text_ready = asyncio.Event()
self.command_queue = asyncio.Queue()
self.loop = None
def on_press(self, key):
try:
if key == keyboard.Key.space:
self.loop.call_soon_threadsafe(
self.command_queue.put_nowait, ('space', None)
)
elif key == keyboard.Key.enter:
self.loop.call_soon_threadsafe(
self.command_queue.put_nowait, ('enter', self.text_input)
)
self.text_input = ""
elif key == keyboard.KeyCode.from_char('r'):
self.loop.call_soon_threadsafe(
self.command_queue.put_nowait, ('r', None)
)
elif key == keyboard.KeyCode.from_char('q'):
self.loop.call_soon_threadsafe(
self.command_queue.put_nowait, ('q', None)
)
elif hasattr(key, 'char'):
if key == keyboard.Key.backspace:
self.text_input = self.text_input[:-1]
else:
self.text_input += key.char
except AttributeError:
pass
async def main():
# Initialize handlers
audio_handler = AudioHandler()
input_handler = InputHandler()
input_handler.loop = asyncio.get_running_loop()
# Initialize the realtime client
client = RealtimeClient(
api_key=os.environ.get("OPENAI_API_KEY"),
on_text_delta=lambda text: print(f"\nAssistant: {text}", end="", flush=True),
on_audio_delta=lambda audio: audio_handler.play_audio(audio)
)
# Start keyboard listener in a separate thread
listener = keyboard.Listener(on_press=input_handler.on_press)
listener.start()
try:
# Connect to the API
await client.connect()
# Start message handling in the background
message_handler = asyncio.create_task(client.handle_messages())
print("Connected to OpenAI Realtime API!")
print("Commands:")
print("- Type your message and press Enter to send text")
print("- Press 'r' to start recording audio")
print("- Press 'space' to stop recording")
print("- Press 'q' to quit")
print("")
while True:
# Wait for commands from the input handler
command, data = await input_handler.command_queue.get()
if command == 'q':
break
elif command == 'r':
# Start recording
audio_handler.start_recording()
elif command == 'space':
print("[About to stop recording]")
if audio_handler.recording:
# Stop recording and get audio data
audio_data = audio_handler.stop_recording()
print("[Recording stopped]")
if audio_data:
await client.send_audio(audio_data)
print("[Audio sent]")
elif command == 'enter' and data:
# Send text message
await client.send_text(data)
if self.stream:
self.stream.stop_stream()
self.stream.close()
await asyncio.sleep(0.01)
except Exception as e:
print(f"Error: {e}")
finally:
# Clean up
listener.stop()
audio_handler.cleanup()
await client.close()
if __name__ == "__main__":
# Install required packages:
# pip install pyaudio pynput pydub websockets
print("Starting Realtime API CLI...")
asyncio.run(main())
self.audio.terminate()
@@ -0,0 +1,48 @@
import asyncio
from pynput import keyboard
class InputHandler:
    """
    Handles keyboard input for the chatbot.

    This class is responsible for capturing keyboard input and translating it into commands for the chatbot.
    ``on_press`` is meant to be used as the ``on_press`` callback of a ``pynput`` keyboard
    listener; commands are handed to the event loop as ``(command, data)`` tuples on
    ``command_queue``.

    Attributes:
        text_input (str): The current text input from the user.
        text_ready (asyncio.Event): An event for signalling finished text input (not set anywhere in this class).
        command_queue (asyncio.Queue): A queue that stores commands for the chatbot.
        loop (asyncio.AbstractEventLoop): The event loop for the input handler. Must be
            assigned by the caller before key presses can be queued.
    """

    def __init__(self):
        self.text_input = ""
        self.text_ready = asyncio.Event()
        self.command_queue = asyncio.Queue()
        self.loop = None

    def _enqueue(self, command, data=None):
        """Queue ``(command, data)`` on the event loop from the listener thread."""
        self.loop.call_soon_threadsafe(
            self.command_queue.put_nowait, (command, data)
        )

    def on_press(self, key):
        """Translate a single key press into a command or a text-buffer edit.

        * space -> ``('space', None)``
        * enter -> ``('enter', <typed text>)``, then the text buffer is cleared
        * backspace -> removes the last buffered character
        * ``'r'`` / ``'q'`` -> ``('r', None)`` / ``('q', None)``
        * any other key that carries a character -> appended to the buffer

        Args:
            key: The key object passed in by the keyboard listener.
        """
        try:
            if key == keyboard.Key.space:
                self._enqueue('space')
            elif key == keyboard.Key.enter:
                # Hand over the text typed so far, then start a fresh buffer.
                self._enqueue('enter', self.text_input)
                self.text_input = ""
            elif key == keyboard.Key.backspace:
                # Checked at the top level: special keys carry no `char`, so
                # nesting this under a `char` test made it unreachable.
                self.text_input = self.text_input[:-1]
            elif key == keyboard.KeyCode.from_char('r'):
                self._enqueue('r')
            elif key == keyboard.KeyCode.from_char('q'):
                self._enqueue('q')
            else:
                # Some keys have no printable character (`char` missing or
                # None); appending None would raise TypeError.
                char = getattr(key, 'char', None)
                if isinstance(char, str):
                    self.text_input += char
        except AttributeError:
            # Best effort, as before: e.g. `loop` has not been assigned yet.
            pass
Generated
+2185
View File
File diff suppressed because it is too large Load Diff
+20
View File
@@ -0,0 +1,20 @@
[tool.poetry]
name = "openai-realtime-client"
version = "0.1.0"
description = "A python-based client for OpenAI's Realtime API"
authors = ["Logan Markewich <logan@runllama.ai>"]
license = "MIT"
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.10"
llama-index-core = "^0.11.17"
pyaudio = "^0.2.14"
pynput = "^1.7.7"
pydub = "^0.25.1"
websockets = "^13.1"
# NOTE(review): `wave` is a Python standard-library module; this entry
# presumably installs an unrelated PyPI package of the same name -- confirm
# it is actually needed.
wave = "^0.0.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"