Skip to content

Commit

Permalink
perf+fix+breaking+refacto: Interaction timeout, db perfs, remove SQLite
Browse files Browse the repository at this point in the history
  • Loading branch information
clemlesne committed Dec 6, 2024
1 parent 65ecc12 commit ce57ca6
Show file tree
Hide file tree
Showing 30 changed files with 465 additions and 651 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ graph LR
ada["Embedding<br>(ADA)"]
app["App<br>(Container App)"]
communication_services["Call & SMS gateway<br>(Communication Services)"]
db[("Conversations and claims<br>(Cosmos DB / SQLite)")]
db[("Conversations and claims<br>(Cosmos DB)")]
eventgrid["Broker<br>(Event Grid)"]
gpt["LLM<br>(GPT-4o)"]
queues[("Queues<br>(Azure Storage)")]
Expand Down Expand Up @@ -627,7 +627,7 @@ Conversation options are represented as features. They can be configured from Ap
| `answer_hard_timeout_sec` | The hard timeout for the bot answer in seconds. | `int` | 180 |
| `answer_soft_timeout_sec` | The soft timeout for the bot answer in seconds. | `int` | 30 |
| `callback_timeout_hour` | The timeout for a callback in hours. | `int` | 3 |
| `phone_silence_timeout_sec` | The timeout for phone silence in seconds. | `int` | 10 |
| `phone_silence_timeout_sec` | The timeout for phone silence in seconds. | `int` | 20 |
| `recognition_retry_max` | The maximum number of retries for voice recognition. | `int` | 2 |
| `recording_enabled` | Whether call recording is enabled. | `bool` | false |
| `slow_llm_for_chat` | Whether to use the slow LLM for chat. | `bool` | false |
Expand Down
161 changes: 107 additions & 54 deletions app/helpers/call_events.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from collections.abc import Awaitable, Callable
from datetime import UTC, datetime

from azure.communication.callautomation import (
AzureBlobContainerRecordingStorage,
Expand Down Expand Up @@ -76,19 +77,17 @@ async def on_new_call(
return True

except ClientAuthenticationError:
logger.error(
"Authentication error with Communication Services, check the credentials",
exc_info=True,
logger.exception(
"Authentication error with Communication Services, check the credentials"
)

except HttpResponseError as e:
if "lifetime validation of the signed http request failed" in e.message.lower():
logger.debug("Old call event received, ignoring")
else:
logger.error(
logger.exception(
"Unknown error answering call with %s",
phone_number,
exc_info=True,
)

return False
Expand All @@ -101,23 +100,25 @@ async def on_call_connected(
server_call_id: str,
) -> None:
logger.info("Call connected, asking for language")
call.recognition_retry = 0 # Reset recognition retry counter
call.in_progress = True
call.messages.append(
MessageModel(
action=MessageActionEnum.CALL,
content="",
persona=MessagePersonaEnum.HUMAN,

# Add define the call as in progress
async with _db.call_transac(call):
call.in_progress = True
call.recognition_retry = 0
call.messages.append(
MessageModel(
action=MessageActionEnum.CALL,
content="",
persona=MessagePersonaEnum.HUMAN,
)
)
)

# Execute business logic
await asyncio.gather(
_handle_ivr_language(
call=call,
client=client,
), # First, every time a call is answered, confirm the language
_db.call_aset(
call
), # Second, save in DB allowing SMS answers to be more "in-sync", should be quick enough to be in sync with the next message
_handle_recording(
call=call,
client=client,
Expand Down Expand Up @@ -172,7 +173,10 @@ async def on_recognize_error(
) -> None:
# Retry IVR recognition
if contexts and CallContextEnum.IVR_LANG_SELECT in contexts:
# Enrich span
span_attribute(CallAttributes.CALL_CHANNEL, "ivr")

# Retry IVR recognition
if call.recognition_retry < await recognition_retry_max():
logger.info(
"Timeout, retrying language selection (%s/%s)",
Expand All @@ -183,13 +187,16 @@ async def on_recognize_error(
call=call,
client=client,
)
else: # IVR retries are exhausted, end call

# IVR retries are exhausted, end call
else:
logger.info("Timeout, ending call")
await _handle_goodbye(
call=call,
client=client,
post_callback=post_callback,
)

return

# Voice retries are exhausted, end call
Expand All @@ -203,7 +210,8 @@ async def on_recognize_error(
return

# Increment the recognition retry counter
call.recognition_retry += 1
async with _db.call_transac(call):
call.recognition_retry += 1

# Play a timeout prompt
await handle_play_text(
Expand Down Expand Up @@ -236,6 +244,20 @@ async def _handle_goodbye(
)


@tracer.start_as_current_span("on_play_started")
async def on_play_started(
call: CallStateModel,
) -> None:
logger.debug("Play started")

# Enrich span
span_attribute(CallAttributes.CALL_CHANNEL, "voice")

# Update last interaction
async with _db.call_transac(call):
call.last_interaction_at = datetime.now(UTC)


@tracer.start_as_current_span("on_play_completed")
async def on_play_completed(
call: CallStateModel,
Expand All @@ -244,35 +266,52 @@ async def on_play_completed(
post_callback: Callable[[CallStateModel], Awaitable[None]],
) -> None:
logger.debug("Play completed")

# Enrich span
span_attribute(CallAttributes.CALL_CHANNEL, "voice")

# Update last interaction
async with _db.call_transac(call):
call.last_interaction_at = datetime.now(UTC)

# Skip if no context data
if not contexts:
return

# Call ended context
if (
CallContextEnum.TRANSFER_FAILED in contexts
or CallContextEnum.GOODBYE in contexts
): # Call ended
):
logger.info("Ending call")
await _handle_hangup(
call=call,
client=client,
post_callback=post_callback,
)
return

elif CallContextEnum.CONNECT_AGENT in contexts: # Call transfer
# Call transfer context
if CallContextEnum.CONNECT_AGENT in contexts:
logger.info("Initiating transfer call initiated")
await handle_transfer(
call=call,
client=client,
target=call.initiate.agent_phone_number,
)
return

logger.warning("Unknown context %s", contexts)


@tracer.start_as_current_span("on_play_error")
async def on_play_error(error_code: int) -> None:
logger.debug("Play failed")

# Enrich span
span_attribute(CallAttributes.CALL_CHANNEL, "voice")

# Suppress known errors
# See: https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/communication-services/how-tos/call-automation/play-action.md
match error_code:
case 8535:
Expand Down Expand Up @@ -300,9 +339,12 @@ async def on_ivr_recognized(
label: str,
) -> None:
logger.info("IVR recognized: %s", label)

# Enrich span
span_attribute(CallAttributes.CALL_CHANNEL, "ivr")
span_attribute(CallAttributes.CALL_MESSAGE, label)
call.recognition_retry = 0 # Reset recognition retry counter

# Parse language from label
try:
lang = next(
(x for x in call.initiate.lang.availables if x.short_code == label),
Expand All @@ -313,21 +355,21 @@ async def on_ivr_recognized(
return

logger.info("Setting call language to %s", lang)
call.lang = lang.short_code
persist_coro = _db.call_aset(call)
async with _db.call_transac(call):
call.lang = lang.short_code
call.recognition_retry = 0

if len(call.messages) <= 1: # First call, or only the call action
await asyncio.gather(
handle_play_text(
call=call,
client=client,
text=await CONFIG.prompts.tts.hello(call),
), # First, greet the user
persist_coro, # Second, persist language change for next messages, should be quick enough to be in sync with the next message
), # First, greet the userwith the next message
start_audio_streaming(
call=call,
client=client,
), # Third, the conversation with the LLM should start
), # Second, the conversation with the LLM should start
) # All in parallel to lower the response latency

else: # Returning call
Expand All @@ -338,11 +380,10 @@ async def on_ivr_recognized(
style=MessageStyleEnum.CHEERFUL,
text=await CONFIG.prompts.tts.welcome_back(call),
), # First, welcome back the user
persist_coro, # Second, persist language change for next messages, should be quick enough to be in sync with the next message
start_audio_streaming(
call=call,
client=client,
), # Third, the conversation with the LLM should start
), # Second, the conversation with the LLM should start
)


Expand Down Expand Up @@ -373,18 +414,26 @@ async def on_sms_received(
message: str,
) -> bool:
logger.info("SMS received from %s: %s", call.initiate.phone_number, message)

# Enrich span
span_attribute(CallAttributes.CALL_CHANNEL, "sms")
span_attribute(CallAttributes.CALL_MESSAGE, message)
call.messages.append(
MessageModel(
action=MessageActionEnum.SMS,
content=message,
persona=MessagePersonaEnum.HUMAN,

# Add the SMS to the call history
async with _db.call_transac(call):
call.messages.append(
MessageModel(
action=MessageActionEnum.SMS,
content=message,
persona=MessagePersonaEnum.HUMAN,
)
)
)
await _db.call_aset(call) # save in DB allowing SMS answers to be more "in-sync"

# If the call is not in progress, answer with SMS
if not call.in_progress:
logger.info("Call not in progress, answering with SMS")

# If the call is in progress, answer with voice
else:
logger.info("Call in progress, answering with voice")
# TODO: Reimplement SMS answers in voice
Expand All @@ -393,6 +442,7 @@ async def on_sms_received(
# client=client,
# post_callback=post_callback,
# )

return True


Expand All @@ -402,14 +452,17 @@ async def _handle_hangup(
post_callback: Callable[[CallStateModel], Awaitable[None]],
) -> None:
await handle_hangup(client=client, call=call)
call.in_progress = False
call.messages.append(
MessageModel(
action=MessageActionEnum.HANGUP,
content="",
persona=MessagePersonaEnum.HUMAN,

async with _db.call_transac(call):
call.in_progress = False
call.messages.append(
MessageModel(
action=MessageActionEnum.HANGUP,
content="",
persona=MessagePersonaEnum.HUMAN,
)
)
)

await post_callback(call)


Expand Down Expand Up @@ -467,21 +520,21 @@ def _validate(req: str | None) -> tuple[bool, str | None, str | None]:
):
if not number:
continue
res = await _sms.asend(content, number)
res = await _sms.send(content, number)
if not res:
logger.warning("Failed sending SMS report to %s", number)
continue
success = True

if success:
call.messages.append(
MessageModel(
action=MessageActionEnum.SMS,
content=content,
persona=MessagePersonaEnum.ASSISTANT,
async with _db.call_transac(call):
call.messages.append(
MessageModel(
action=MessageActionEnum.SMS,
content=content,
persona=MessagePersonaEnum.ASSISTANT,
)
)
)
await _db.call_aset(call)


async def _intelligence_synthesis(call: CallStateModel) -> None:
Expand Down Expand Up @@ -511,8 +564,8 @@ def _validate(
return

logger.info("Synthesis: %s", model)
call.synthesis = model
await _db.call_aset(call)
async with _db.call_transac(call):
call.synthesis = model


async def _intelligence_next(call: CallStateModel) -> None:
Expand Down Expand Up @@ -542,8 +595,8 @@ def _validate(
return

logger.info("Next action: %s", model)
call.next = model
await _db.call_aset(call)
async with _db.call_transac(call):
call.next = model


async def _handle_ivr_language(
Expand Down
Loading

0 comments on commit ce57ca6

Please sign in to comment.