Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
Saluev committed Jan 15, 2024
1 parent 015e76b commit 3a07c41
Show file tree
Hide file tree
Showing 9 changed files with 377 additions and 10 deletions.
204 changes: 204 additions & 0 deletions suppgram/analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional, List, Dict, Set, assert_never, Awaitable, Callable

from suppgram.entities import Event, MessageKind, EventKind
from suppgram.storage import Storage


@dataclass(frozen=True)
class AgentAnalytics:
agent_id: Any
telegram_user_id: Optional[int]

total_assigned: int
total_resolved: int
average_customer_rating: float
average_assignment_to_resolution_time_min: float


@dataclass(frozen=True)
class ConversationAnalytics:
conversation_id: Any
customer_id: Any
last_assigned_agent_id: Optional[Any]
customer_rating: Optional[int]

creation_time_utc: datetime
first_assignment_time_utc: Optional[datetime]
resolve_time_utc: Optional[datetime]

last_message_kind: MessageKind
last_message_time_utc: datetime


@dataclass(frozen=True)
class Report:
agents: List[AgentAnalytics]
conversations: List[ConversationAnalytics]
events: List[Event]

average_start_to_first_response_time_min: float
average_start_to_resolution_time_min: float
average_customer_rating: float


ProgressCallback = Callable[[float], Awaitable[None]]


@dataclass
class _ConversationState:
conversation_id: Any
last_assigned_agent_id: Optional[Any] = None

creation_time_utc: Optional[datetime] = None
first_assignment_time_utc: Optional[datetime] = None
first_response_time_utc: Optional[datetime] = None
last_assignment_time_utc: Optional[datetime] = None
resolve_time_utc: Optional[datetime] = None

last_message_kind: Optional[MessageKind] = None
last_message_time_utc: Optional[datetime] = None


class Reporter:
def __init__(self, storage: Storage):
self._storage = storage

async def report(self, progress_callback: Optional[ProgressCallback]) -> Report:
conversation_states: Dict[Any, _ConversationState] = {}
agent_assignments: Dict[Any, Set[Any]] = defaultdict(set)
agent_resolutions: Dict[Any, Set[Any]] = defaultdict(set)
agent_ratings: Dict[Any, List[int]] = defaultdict(list)
agent_resolution_times: Dict[Any, List[timedelta]] = defaultdict(list)

def _conv_state(conv_id: Any) -> _ConversationState:
conversation_states.setdefault(conv_id, _ConversationState(conversation_id=conv_id))
return conversation_states[conv_id]

events: List[Event] = []
async for event in self._storage.find_all_events():
events.append(event)
conv_state = _conv_state(event.conversation_id)
match event.kind:
case EventKind.AGENT_ASSIGNED:
conv_state.last_assigned_agent_id = event.agent_id
if not conv_state.first_assignment_time_utc:
conv_state.first_assignment_time_utc = event.time_utc
conv_state.last_assignment_time_utc = event.time_utc
agent_assignments[event.agent_id].add(event.conversation_id)

case EventKind.CONVERSATION_POSTPONED:
pass

case EventKind.CONVERSATION_RATED:
# We'll only take into account final ratings stored in conversations.
# Rating will be attributed to the last agent assigned to a conversation.
pass

case EventKind.CONVERSATION_RESOLVED:
conv_state.resolve_time_utc = event.time_utc
agent_resolutions[conv_state.last_assigned_agent_id].add(event.conversation_id)
if conv_state.last_assignment_time_utc:
agent_resolution_times[conv_state.last_assigned_agent_id].append(
event.time_utc - conv_state.last_assignment_time_utc
)

case EventKind.CONVERSATION_STARTED:
conv_state.creation_time_utc = event.time_utc

case EventKind.CONVERSATION_TAG_ADDED:
pass

case EventKind.CONVERSATION_TAG_REMOVED:
pass

case EventKind.MESSAGE_SENT:
if (
event.message_kind == MessageKind.FROM_AGENT
and not conv_state.first_response_time_utc
):
conv_state.first_response_time_utc = event.time_utc
if event.message_kind in (MessageKind.FROM_AGENT, MessageKind.FROM_CUSTOMER):
conv_state.last_message_kind = event.message_kind
conv_state.last_message_time_utc = event.time_utc

case _ as unreachable:
assert_never(unreachable)

conversations: List[ConversationAnalytics] = []
async for conv in self._storage.find_all_conversations():
conv_state = _conv_state(conv.id)
if (
not conv_state.creation_time_utc
or not conv_state.last_message_kind
or not conv_state.last_message_time_utc
):
# Not a single message within conversation — ignoring it.
continue

if conv.customer_rating is not None and conv_state.last_assigned_agent_id is not None:
agent_ratings[conv_state.last_assigned_agent_id].append(conv.customer_rating)
conversations.append(
ConversationAnalytics(
conversation_id=conv.id,
customer_id=conv.customer.id,
last_assigned_agent_id=conv_state.last_assigned_agent_id,
customer_rating=conv.customer_rating,
creation_time_utc=conv_state.creation_time_utc,
first_assignment_time_utc=conv_state.first_assignment_time_utc,
resolve_time_utc=conv_state.resolve_time_utc,
last_message_kind=conv_state.last_message_kind,
last_message_time_utc=conv_state.last_message_time_utc,
)
)

agents = [
AgentAnalytics(
agent_id=agent.id,
telegram_user_id=agent.telegram_user_id,
total_assigned=len(agent_assignments[agent.id]),
total_resolved=len(agent_resolutions[agent.id]),
average_customer_rating=sum(agent_ratings[agent.id]) / len(agent_ratings[agent.id])
if agent_ratings[agent.id]
else float("nan"),
average_assignment_to_resolution_time_min=sum(
td.total_seconds() / 60.0 for td in agent_resolution_times[agent.id]
)
/ len(agent_resolution_times[agent.id])
if agent_resolution_times[agent.id]
else float("nan"),
)
async for agent in self._storage.find_all_agents()
]

first_response_times_min = [
(conv_state.first_response_time_utc - conv_state.creation_time_utc).total_seconds()
/ 60.0
for conv_state in conversation_states.values()
if conv_state.creation_time_utc and conv_state.first_response_time_utc
]
resolution_times_min = [
(conv_state.resolve_time_utc - conv_state.creation_time_utc).total_seconds() / 60.0
for conv_state in conversation_states.values()
if conv_state.creation_time_utc and conv_state.resolve_time_utc
]
ratings = [
conv.customer_rating for conv in conversations if conv.customer_rating is not None
]

return Report(
agents=agents,
conversations=conversations,
events=events,
average_start_to_first_response_time_min=sum(first_response_times_min)
/ len(first_response_times_min)
if first_response_times_min
else float("nan"),
average_start_to_resolution_time_min=sum(resolution_times_min)
/ len(resolution_times_min)
if resolution_times_min
else float("nan"),
average_customer_rating=sum(ratings) / len(ratings) if ratings else float("nan"),
)
77 changes: 77 additions & 0 deletions suppgram/backends/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
ConversationTagEvent,
Customer,
TagEvent,
Event,
EventKind,
)
from suppgram.errors import PermissionDenied, AgentDeactivated
from suppgram.helpers import flat_gather
Expand Down Expand Up @@ -123,6 +125,14 @@ async def _process_message_from_user(self, conversation: Conversation, message:
conversation.messages.append(message)
if len(conversation.messages) == 1:
await self.on_new_conversation.trigger(ConversationEvent(conversation))
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_STARTED,
time_utc=message.time_utc,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)
if conversation.assigned_agent and conversation.assigned_workplace:
await self.on_new_message_for_agent.trigger(
NewMessageForAgentEvent(
Expand All @@ -135,6 +145,16 @@ async def _process_message_from_user(self, conversation: Conversation, message:
await self.on_new_unassigned_message_from_customer.trigger(
NewUnassignedMessageFromCustomerEvent(message=message, conversation=conversation)
)
await self._storage.save_event(
Event(
kind=EventKind.MESSAGE_SENT,
time_utc=message.time_utc,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
message_kind=message.kind,
message_media_kind=message.media_kind,
)
)

async def _process_message_from_agent(self, conversation: Conversation, message: Message):
if conversation.assigned_agent and conversation.assigned_agent.deactivated:
Expand All @@ -147,6 +167,16 @@ async def _process_message_from_agent(self, conversation: Conversation, message:
message=message,
)
)
await self._storage.save_event(
Event(
kind=EventKind.MESSAGE_SENT,
time_utc=message.time_utc,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
message_kind=message.kind,
message_media_kind=message.media_kind,
)
)

async def _process_internal_message(self, conversation: Conversation, message: Message):
await self._storage.save_message(conversation, message)
Expand Down Expand Up @@ -179,6 +209,14 @@ async def assign_agent(self, assigner: Agent, assignee: Agent, conversation_id:
)
conversation = await self._storage.get_agent_conversation(workplace.identification)
await self.on_conversation_assignment.trigger(ConversationEvent(conversation=conversation))
await self._storage.save_event(
Event(
kind=EventKind.AGENT_ASSIGNED,
agent_id=assignee.id,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)

async def get_conversations(
self, conversation_ids: List[Any], with_messages: bool = False
Expand All @@ -196,6 +234,14 @@ async def add_tag_to_conversation(self, conversation: Conversation, tag: Tag):
await self.on_conversation_tag_added.trigger(
ConversationTagEvent(conversation=conversation, tag=tag)
)
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_TAG_ADDED,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
tag_id=tag.id,
)
)

async def remove_tag_from_conversation(self, conversation: Conversation, tag: Tag):
await self._storage.update_conversation(
Expand All @@ -205,13 +251,28 @@ async def remove_tag_from_conversation(self, conversation: Conversation, tag: Ta
await self.on_conversation_tag_removed.trigger(
ConversationTagEvent(conversation=conversation, tag=tag)
)
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_TAG_REMOVED,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
tag_id=tag.id,
)
)

async def rate_conversation(self, conversation: Conversation, rating: int):
await self._storage.update_conversation(
conversation.id, ConversationDiff(customer_rating=rating)
)
conversation = await self.get_conversation(conversation.id)
await self.on_conversation_rated.trigger(ConversationEvent(conversation=conversation))
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_RATED,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)

async def postpone_conversation(self, postponer: Agent, conversation: Conversation):
if postponer != conversation.assigned_agent:
Expand All @@ -233,6 +294,14 @@ async def postpone_conversation(self, postponer: Agent, conversation: Conversati
tags=conversation.tags,
)
await self.on_new_conversation.trigger(ConversationEvent(conversation=conversation))
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_POSTPONED,
agent_id=postponer.id,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)

async def resolve_conversation(self, resolver: Agent, conversation: Conversation):
if resolver != conversation.assigned_agent:
Expand All @@ -258,6 +327,14 @@ async def resolve_conversation(self, resolver: Agent, conversation: Conversation
tags=conversation.tags,
)
await self.on_conversation_resolution.trigger(ConversationEvent(conversation=conversation))
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_POSTPONED,
agent_id=resolver.id,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)

async def _choose_workplace(self, agent: Agent) -> Workplace:
existing_workplaces = await self._storage.get_agent_workplaces(agent)
Expand Down
17 changes: 15 additions & 2 deletions suppgram/entities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from typing import Optional, Any, List, Union
from uuid import UUID
Expand Down Expand Up @@ -314,8 +314,18 @@ def assigned_workplace_identification(self) -> Optional[WorkplaceIdentification]
return WorkplaceIdentification(id=self.assigned_workplace_id)


# Data classes for analytics


class EventKind(str, Enum):
AGENT_ASSIGNED = "agent_assigned"
CONVERSATION_POSTPONED = "conversation_postponed"
CONVERSATION_RATED = "conversation_rated"
CONVERSATION_RESOLVED = "conversation_resolved"
CONVERSATION_STARTED = "conversation_started"
CONVERSATION_TAG_ADDED = "conversation_tag_added"
CONVERSATION_TAG_REMOVED = "conversation_tag_removed"
MESSAGE_SENT = "message_sent"


class MessageMediaKind(str, Enum):
Expand All @@ -329,7 +339,7 @@ class Event:
"""Describes arbitrary event within Suppgram application, with all relevant entities linked by their IDs."""

kind: EventKind
time_utc: datetime
time_utc: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

agent_id: Optional[Any] = None
conversation_id: Optional[Any] = None
Expand All @@ -340,6 +350,9 @@ class Event:
workplace_id: Optional[Any] = None


# Data classes for observables


@dataclass(frozen=True)
class ConversationEvent:
conversation: Conversation
Expand Down
Loading

0 comments on commit 3a07c41

Please sign in to comment.