Skip to content

Update main.py #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 24 additions & 39 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,19 @@

@app.get("/", response_class=JSONResponse)
async def index_page():
    """Root endpoint: log the access and return a fixed status message."""
    print("Root endpoint accessed.")
    status = {"message": "Twilio Media Stream Server is running!"}
    return status

@app.api_route("/incoming-call", methods=["GET", "POST"])
async def handle_incoming_call(request: Request):
"""Handle incoming call and return TwiML response to connect to Media Stream."""
print("Incoming call received.")
response = VoiceResponse()
# <Say> punctuation to improve text-to-speech flow
response.say("Please wait while we connect your call to the A. I. voice assistant, powered by Twilio and the Open-A.I. Realtime API")
response.pause(length=1)
response.say("O.K. you can start talking!")
host = request.url.hostname
print(f"Connecting to media stream at wss://{host}/media-stream")
connect = Connect()
connect.stream(url=f'wss://{host}/media-stream')
response.append(connect)
Expand All @@ -55,7 +57,7 @@ async def handle_incoming_call(request: Request):
@app.websocket("/media-stream")
async def handle_media_stream(websocket: WebSocket):
"""Handle WebSocket connections between Twilio and OpenAI."""
print("Client connected")
print("WebSocket client connected.")
await websocket.accept()

async with websockets.connect(
Expand All @@ -65,9 +67,9 @@ async def handle_media_stream(websocket: WebSocket):
"OpenAI-Beta": "realtime=v1"
}
) as openai_ws:
print("Connected to OpenAI WebSocket.")
await initialize_session(openai_ws)

# Connection specific state
stream_sid = None
latest_media_timestamp = 0
last_assistant_item = None
Expand All @@ -77,41 +79,50 @@ async def handle_media_stream(websocket: WebSocket):
async def receive_from_twilio():
"""Receive audio data from Twilio and send it to the OpenAI Realtime API."""
nonlocal stream_sid, latest_media_timestamp
print("Listening for messages from Twilio WebSocket...")
try:
async for message in websocket.iter_text():
data = json.loads(message)
print(f"Received message from Twilio: {data}")

if data['event'] == 'media' and openai_ws.open:
latest_media_timestamp = int(data['media']['timestamp'])
print(f"Received audio payload at timestamp {latest_media_timestamp}")
audio_append = {
"type": "input_audio_buffer.append",
"audio": data['media']['payload']
}
await openai_ws.send(json.dumps(audio_append))
elif data['event'] == 'start':
stream_sid = data['start']['streamSid']
print(f"Incoming stream has started {stream_sid}")
print(f"Stream started with SID: {stream_sid}")
response_start_timestamp_twilio = None
latest_media_timestamp = 0
last_assistant_item = None
elif data['event'] == 'mark':
print("Received mark event from Twilio.")
if mark_queue:
mark_queue.pop(0)
except WebSocketDisconnect:
print("Client disconnected.")
print("Twilio WebSocket disconnected.")
if openai_ws.open:
await openai_ws.close()

async def send_to_twilio():
"""Receive events from the OpenAI Realtime API, send audio back to Twilio."""
nonlocal stream_sid, last_assistant_item, response_start_timestamp_twilio
print("Listening for messages from OpenAI WebSocket...")
try:
async for openai_message in openai_ws:
response = json.loads(openai_message)
print(f"Received message from OpenAI: {response}")

if response['type'] in LOG_EVENT_TYPES:
print(f"Received event: {response['type']}", response)
print(f"Processing event: {response['type']}")

if response.get('type') == 'response.audio.delta' and 'delta' in response:
audio_payload = base64.b64encode(base64.b64decode(response['delta'])).decode('utf-8')
print(f"Sending audio response to Twilio (stream SID: {stream_sid})")
audio_delta = {
"event": "media",
"streamSid": stream_sid,
Expand All @@ -123,20 +134,17 @@ async def send_to_twilio():

if response_start_timestamp_twilio is None:
response_start_timestamp_twilio = latest_media_timestamp
if SHOW_TIMING_MATH:
print(f"Setting start timestamp for new response: {response_start_timestamp_twilio}ms")
print(f"Set response start timestamp: {response_start_timestamp_twilio}ms")

# Update last_assistant_item safely
if response.get('item_id'):
last_assistant_item = response['item_id']

await send_mark(websocket, stream_sid)

# Trigger an interruption. Your use case might work better using `input_audio_buffer.speech_stopped`, or combining the two.
if response.get('type') == 'input_audio_buffer.speech_started':
print("Speech started detected.")
if last_assistant_item:
print(f"Interrupting response with id: {last_assistant_item}")
print(f"Interrupting assistant response (ID: {last_assistant_item})")
await handle_speech_started_event()
except Exception as e:
print(f"Error in send_to_twilio: {e}")
Expand All @@ -147,13 +155,10 @@ async def handle_speech_started_event():
print("Handling speech started event.")
if mark_queue and response_start_timestamp_twilio is not None:
elapsed_time = latest_media_timestamp - response_start_timestamp_twilio
if SHOW_TIMING_MATH:
print(f"Calculating elapsed time for truncation: {latest_media_timestamp} - {response_start_timestamp_twilio} = {elapsed_time}ms")
print(f"Elapsed time for truncation: {elapsed_time}ms")

if last_assistant_item:
if SHOW_TIMING_MATH:
print(f"Truncating item with ID: {last_assistant_item}, Truncated at: {elapsed_time}ms")

print(f"Truncating response ID {last_assistant_item} at {elapsed_time}ms")
truncate_event = {
"type": "conversation.item.truncate",
"item_id": last_assistant_item,
Expand All @@ -173,6 +178,7 @@ async def handle_speech_started_event():

async def send_mark(connection, stream_sid):
if stream_sid:
print(f"Sending mark event to Twilio (stream SID: {stream_sid})")
mark_event = {
"event": "mark",
"streamSid": stream_sid,
Expand All @@ -183,25 +189,6 @@ async def send_mark(connection, stream_sid):

await asyncio.gather(receive_from_twilio(), send_to_twilio())

async def send_initial_conversation_item(openai_ws):
    """Send the opening conversation item used when the AI talks first.

    Two JSON-encoded events go out through ``openai_ws.send`` in order: a
    ``conversation.item.create`` holding a user-role text message with the
    greeting instruction, then a ``response.create``.
    """
    greeting_prompt = (
        "Greet the user with 'Hello there! I am an AI voice assistant powered by "
        "Twilio and the OpenAI Realtime API. You can ask me for facts, jokes, or "
        "anything you can imagine. How can I help you?'"
    )
    outgoing_events = (
        {
            "type": "conversation.item.create",
            "item": {
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": greeting_prompt}],
            },
        },
        {"type": "response.create"},
    )
    for event in outgoing_events:
        await openai_ws.send(json.dumps(event))


async def initialize_session(openai_ws):
"""Control initial session with OpenAI."""
session_update = {
Expand All @@ -216,12 +203,10 @@ async def initialize_session(openai_ws):
"temperature": 0.8,
}
}
print('Sending session update:', json.dumps(session_update))
print('Initializing OpenAI session:', json.dumps(session_update))
await openai_ws.send(json.dumps(session_update))

# Uncomment the next line to have the AI speak first
# await send_initial_conversation_item(openai_ws)

if __name__ == "__main__":
    # Script entry point: announce the port, then serve the app with uvicorn.
    print(f"Starting FastAPI server on port {PORT}...")
    import uvicorn

    # Listen on all interfaces on the configured port.
    server_options = {"host": "0.0.0.0", "port": PORT}
    uvicorn.run(app, **server_options)