Skip to content

Commit

Permalink
Implement basic analytical report
Browse files Browse the repository at this point in the history
  • Loading branch information
Saluev committed Jan 15, 2024
1 parent 015e76b commit ef3e9d2
Show file tree
Hide file tree
Showing 16 changed files with 532 additions and 33 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ jobs:
- name: Run tests
# TODO properly wait until postgres is ready https://stackoverflow.com/a/55835081/999858
run: |
docker run -p 5432:5432 -e POSTGRES_USER=suppgram -e POSTGRES_PASSWORD=test -e POSTGRES_DB=suppgram_test -d postgres:latest
docker run -p 27017:27017 -d mongo:latest
docker run -p 5432:5432 -e POSTGRES_USER=suppgram -e POSTGRES_PASSWORD=test -e POSTGRES_DB=suppgram_test --name suppgram_postgres -d postgres:latest
docker run -p 27017:27017 --name suppgram_mongodb -d mongo:latest
PYTHONPATH=. pytest .
216 changes: 216 additions & 0 deletions suppgram/analytics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Optional, List, Dict, Set, Awaitable, Callable

from suppgram.entities import Event, MessageKind, EventKind
from suppgram.helpers import aenumerate
from suppgram.storage import Storage


@dataclass(frozen=True)
class AgentAnalytics:
agent_id: Any
telegram_user_id: Optional[int]

total_assigned: int
total_resolved: int
average_customer_rating: float
average_assignment_to_resolution_time_min: float


@dataclass(frozen=True)
class ConversationAnalytics:
conversation_id: Any
customer_id: Any
last_assigned_agent_id: Optional[Any]
customer_rating: Optional[int]

creation_time_utc: datetime
first_assignment_time_utc: Optional[datetime]
resolve_time_utc: Optional[datetime]

last_message_kind: MessageKind
last_message_time_utc: datetime


@dataclass(frozen=True)
class Report:
agents: List[AgentAnalytics]
conversations: List[ConversationAnalytics]
events: List[Event]

average_start_to_first_response_time_min: float
average_start_to_resolution_time_min: float
average_customer_rating: float


ProgressCallback = Callable[[float], Awaitable[None]]


@dataclass
class _ConversationState:
conversation_id: Any
last_assigned_agent_id: Optional[Any] = None

creation_time_utc: Optional[datetime] = None
first_assignment_time_utc: Optional[datetime] = None
first_response_time_utc: Optional[datetime] = None
last_assignment_time_utc: Optional[datetime] = None
resolve_time_utc: Optional[datetime] = None

last_message_kind: Optional[MessageKind] = None
last_message_time_utc: Optional[datetime] = None


class Reporter:
def __init__(self, storage: Storage):
self._storage = storage

async def report(self, progress_callback: Optional[ProgressCallback]) -> Report:
if progress_callback is not None:
await progress_callback(0.0)
total_events = await self._storage.count_all_events()
total_conversations = await self._storage.count_all_conversations()
total_steps = max(1, total_events + total_conversations)
# Might also count agents, but there are probably going
# to be very few compared to conversations and events.

conversation_states: Dict[Any, _ConversationState] = {}
agent_assignments: Dict[Any, Set[Any]] = defaultdict(set)
agent_resolutions: Dict[Any, Set[Any]] = defaultdict(set)
agent_ratings: Dict[Any, List[int]] = defaultdict(list)
agent_resolution_times: Dict[Any, List[timedelta]] = defaultdict(list)

def _conv_state(conv_id: Any) -> _ConversationState:
conversation_states.setdefault(conv_id, _ConversationState(conversation_id=conv_id))
return conversation_states[conv_id]

events: List[Event] = []
async for i, event in aenumerate(self._storage.find_all_events()):
events.append(event)
conv_state = _conv_state(event.conversation_id)

if event.kind == EventKind.AGENT_ASSIGNED:
conv_state.last_assigned_agent_id = event.agent_id
if not conv_state.first_assignment_time_utc:
conv_state.first_assignment_time_utc = event.time_utc
conv_state.last_assignment_time_utc = event.time_utc
agent_assignments[event.agent_id].add(event.conversation_id)

elif event.kind == EventKind.CONVERSATION_POSTPONED:
pass

elif event.kind == EventKind.CONVERSATION_RATED:
# We'll only take into account final ratings stored in conversations.
# Rating will be attributed to the last agent assigned to a conversation.
pass

elif event.kind == EventKind.CONVERSATION_RESOLVED:
conv_state.resolve_time_utc = event.time_utc
agent_resolutions[conv_state.last_assigned_agent_id].add(event.conversation_id)
if conv_state.last_assignment_time_utc:
agent_resolution_times[conv_state.last_assigned_agent_id].append(
event.time_utc - conv_state.last_assignment_time_utc
)

elif event.kind == EventKind.CONVERSATION_STARTED:
conv_state.creation_time_utc = event.time_utc

elif event.kind == EventKind.CONVERSATION_TAG_ADDED:
pass

elif event.kind == EventKind.CONVERSATION_TAG_REMOVED:
pass

elif event.kind == EventKind.MESSAGE_SENT:
if (
event.message_kind == MessageKind.FROM_AGENT
and not conv_state.first_response_time_utc
):
conv_state.first_response_time_utc = event.time_utc
if event.message_kind in (MessageKind.FROM_AGENT, MessageKind.FROM_CUSTOMER):
conv_state.last_message_kind = event.message_kind
conv_state.last_message_time_utc = event.time_utc

if progress_callback is not None:
await progress_callback((i + 1) / total_steps)

conversations: List[ConversationAnalytics] = []
async for i, conv in aenumerate(self._storage.find_all_conversations()):
conv_state = _conv_state(conv.id)
if (
not conv_state.creation_time_utc
or not conv_state.last_message_kind
or not conv_state.last_message_time_utc
):
# Not a single message within conversation — ignoring it.
continue

if conv.customer_rating is not None and conv_state.last_assigned_agent_id is not None:
agent_ratings[conv_state.last_assigned_agent_id].append(conv.customer_rating)
conversations.append(
ConversationAnalytics(
conversation_id=conv.id,
customer_id=conv.customer.id,
last_assigned_agent_id=conv_state.last_assigned_agent_id,
customer_rating=conv.customer_rating,
creation_time_utc=conv_state.creation_time_utc,
first_assignment_time_utc=conv_state.first_assignment_time_utc,
resolve_time_utc=conv_state.resolve_time_utc,
last_message_kind=conv_state.last_message_kind,
last_message_time_utc=conv_state.last_message_time_utc,
)
)

if progress_callback is not None:
await progress_callback((total_events + i + 1) / total_steps)

agents = [
AgentAnalytics(
agent_id=agent.id,
telegram_user_id=agent.telegram_user_id,
total_assigned=len(agent_assignments[agent.id]),
total_resolved=len(agent_resolutions[agent.id]),
average_customer_rating=sum(agent_ratings[agent.id]) / len(agent_ratings[agent.id])
if agent_ratings[agent.id]
else float("nan"),
average_assignment_to_resolution_time_min=sum(
td.total_seconds() / 60.0 for td in agent_resolution_times[agent.id]
)
/ len(agent_resolution_times[agent.id])
if agent_resolution_times[agent.id]
else float("nan"),
)
async for agent in self._storage.find_all_agents()
]

first_response_times_min = [
(conv_state.first_response_time_utc - conv_state.creation_time_utc).total_seconds()
/ 60.0
for conv_state in conversation_states.values()
if conv_state.creation_time_utc and conv_state.first_response_time_utc
]
resolution_times_min = [
(conv_state.resolve_time_utc - conv_state.creation_time_utc).total_seconds() / 60.0
for conv_state in conversation_states.values()
if conv_state.creation_time_utc and conv_state.resolve_time_utc
]
ratings = [
conv.customer_rating for conv in conversations if conv.customer_rating is not None
]

return Report(
agents=agents,
conversations=conversations,
events=events,
average_start_to_first_response_time_min=sum(first_response_times_min)
/ len(first_response_times_min)
if first_response_times_min
else float("nan"),
average_start_to_resolution_time_min=sum(resolution_times_min)
/ len(resolution_times_min)
if resolution_times_min
else float("nan"),
average_customer_rating=sum(ratings) / len(ratings) if ratings else float("nan"),
)
77 changes: 77 additions & 0 deletions suppgram/backends/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
ConversationTagEvent,
Customer,
TagEvent,
Event,
EventKind,
)
from suppgram.errors import PermissionDenied, AgentDeactivated
from suppgram.helpers import flat_gather
Expand Down Expand Up @@ -123,6 +125,14 @@ async def _process_message_from_user(self, conversation: Conversation, message:
conversation.messages.append(message)
if len(conversation.messages) == 1:
await self.on_new_conversation.trigger(ConversationEvent(conversation))
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_STARTED,
time_utc=message.time_utc,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)
if conversation.assigned_agent and conversation.assigned_workplace:
await self.on_new_message_for_agent.trigger(
NewMessageForAgentEvent(
Expand All @@ -135,6 +145,16 @@ async def _process_message_from_user(self, conversation: Conversation, message:
await self.on_new_unassigned_message_from_customer.trigger(
NewUnassignedMessageFromCustomerEvent(message=message, conversation=conversation)
)
await self._storage.save_event(
Event(
kind=EventKind.MESSAGE_SENT,
time_utc=message.time_utc,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
message_kind=message.kind,
message_media_kind=message.media_kind,
)
)

async def _process_message_from_agent(self, conversation: Conversation, message: Message):
if conversation.assigned_agent and conversation.assigned_agent.deactivated:
Expand All @@ -147,6 +167,16 @@ async def _process_message_from_agent(self, conversation: Conversation, message:
message=message,
)
)
await self._storage.save_event(
Event(
kind=EventKind.MESSAGE_SENT,
time_utc=message.time_utc,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
message_kind=message.kind,
message_media_kind=message.media_kind,
)
)

async def _process_internal_message(self, conversation: Conversation, message: Message):
await self._storage.save_message(conversation, message)
Expand Down Expand Up @@ -179,6 +209,14 @@ async def assign_agent(self, assigner: Agent, assignee: Agent, conversation_id:
)
conversation = await self._storage.get_agent_conversation(workplace.identification)
await self.on_conversation_assignment.trigger(ConversationEvent(conversation=conversation))
await self._storage.save_event(
Event(
kind=EventKind.AGENT_ASSIGNED,
agent_id=assignee.id,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)

async def get_conversations(
self, conversation_ids: List[Any], with_messages: bool = False
Expand All @@ -196,6 +234,14 @@ async def add_tag_to_conversation(self, conversation: Conversation, tag: Tag):
await self.on_conversation_tag_added.trigger(
ConversationTagEvent(conversation=conversation, tag=tag)
)
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_TAG_ADDED,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
tag_id=tag.id,
)
)

async def remove_tag_from_conversation(self, conversation: Conversation, tag: Tag):
await self._storage.update_conversation(
Expand All @@ -205,13 +251,28 @@ async def remove_tag_from_conversation(self, conversation: Conversation, tag: Ta
await self.on_conversation_tag_removed.trigger(
ConversationTagEvent(conversation=conversation, tag=tag)
)
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_TAG_REMOVED,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
tag_id=tag.id,
)
)

async def rate_conversation(self, conversation: Conversation, rating: int):
await self._storage.update_conversation(
conversation.id, ConversationDiff(customer_rating=rating)
)
conversation = await self.get_conversation(conversation.id)
await self.on_conversation_rated.trigger(ConversationEvent(conversation=conversation))
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_RATED,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)

async def postpone_conversation(self, postponer: Agent, conversation: Conversation):
if postponer != conversation.assigned_agent:
Expand All @@ -233,6 +294,14 @@ async def postpone_conversation(self, postponer: Agent, conversation: Conversati
tags=conversation.tags,
)
await self.on_new_conversation.trigger(ConversationEvent(conversation=conversation))
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_POSTPONED,
agent_id=postponer.id,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)

async def resolve_conversation(self, resolver: Agent, conversation: Conversation):
if resolver != conversation.assigned_agent:
Expand All @@ -258,6 +327,14 @@ async def resolve_conversation(self, resolver: Agent, conversation: Conversation
tags=conversation.tags,
)
await self.on_conversation_resolution.trigger(ConversationEvent(conversation=conversation))
await self._storage.save_event(
Event(
kind=EventKind.CONVERSATION_POSTPONED,
agent_id=resolver.id,
conversation_id=conversation.id,
customer_id=conversation.customer.id,
)
)

async def _choose_workplace(self, agent: Agent) -> Workplace:
existing_workplaces = await self._storage.get_agent_workplaces(agent)
Expand Down
2 changes: 2 additions & 0 deletions suppgram/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Optional, List, Any, Iterable, cast, Union
from uuid import UUID, uuid4

from suppgram.analytics import Reporter
from suppgram.backend import WorkplaceManager, Backend
from suppgram.entities import AgentIdentification
from suppgram.errors import NoStorageSpecified, NoFrontendSpecified
Expand Down Expand Up @@ -320,6 +321,7 @@ def _build_manager_frontend(self) -> Optional[ManagerFrontend]:
backend=self._build_backend(),
helper=self._build_telegram_helper(),
storage=self._build_telegram_storage(),
reporter=Reporter(self._build_storage()),
texts=self._build_texts(),
)

Expand Down
Loading

0 comments on commit ef3e9d2

Please sign in to comment.