-
Notifications
You must be signed in to change notification settings - Fork 77
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resend Broadcast #1267
Resend Broadcast #1267
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -50,11 +50,15 @@ def execute(self, event_id: Text, **kwargs): | |||||||||||||||||||||||||||||||||||||||||||
reference_id = None | ||||||||||||||||||||||||||||||||||||||||||||
status = EVENT_STATUS.FAIL.value | ||||||||||||||||||||||||||||||||||||||||||||
exception = None | ||||||||||||||||||||||||||||||||||||||||||||
is_resend = kwargs.get('is_resend', False) | ||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||
config, reference_id = self.__retrieve_config(event_id) | ||||||||||||||||||||||||||||||||||||||||||||
config, reference_id = self.__retrieve_config(event_id, is_resend) | ||||||||||||||||||||||||||||||||||||||||||||
broadcast = MessageBroadcastFactory.get_instance(config["connector_type"]).from_config(config, event_id, reference_id) | ||||||||||||||||||||||||||||||||||||||||||||
recipients = broadcast.get_recipients() | ||||||||||||||||||||||||||||||||||||||||||||
broadcast.send(recipients) | ||||||||||||||||||||||||||||||||||||||||||||
if is_resend: | ||||||||||||||||||||||||||||||||||||||||||||
broadcast.resend_broadcast() | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+53
to
+58
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace
- def execute(self, event_id: Text, **kwargs):
+ def execute(self, event_id: str, **kwargs): The addition of the
|
||||||||||||||||||||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||||||||||||||||||||
recipients = broadcast.get_recipients() | ||||||||||||||||||||||||||||||||||||||||||||
broadcast.send(recipients) | ||||||||||||||||||||||||||||||||||||||||||||
status = EVENT_STATUS.COMPLETED.value | ||||||||||||||||||||||||||||||||||||||||||||
except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||
logger.exception(e) | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -99,6 +103,16 @@ def _add_schedule(self, config: Dict): | |||||||||||||||||||||||||||||||||||||||||||
MessageBroadcastProcessor.delete_task(msg_broadcast_id, self.bot) | ||||||||||||||||||||||||||||||||||||||||||||
raise AppException(e) | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
def _resend_broadcast(self, msg_broadcast_id: Text): | ||||||||||||||||||||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||||||||||||||||||||
payload = {'bot': self.bot, 'user': self.user, | ||||||||||||||||||||||||||||||||||||||||||||
"event_id": msg_broadcast_id, "is_resend": True} | ||||||||||||||||||||||||||||||||||||||||||||
Utility.request_event_server(EventClass.message_broadcast, payload) | ||||||||||||||||||||||||||||||||||||||||||||
return msg_broadcast_id | ||||||||||||||||||||||||||||||||||||||||||||
except Exception as e: | ||||||||||||||||||||||||||||||||||||||||||||
logger.error(e) | ||||||||||||||||||||||||||||||||||||||||||||
raise e | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+106
to
+114
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace
- def _resend_broadcast(self, msg_broadcast_id: Text):
+ def _resend_broadcast(self, msg_broadcast_id: str): Committable suggestion
Suggested change
ToolsRuff
|
||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
def _update_schedule(self, msg_broadcast_id: Text, config: Dict): | ||||||||||||||||||||||||||||||||||||||||||||
settings_updated = False | ||||||||||||||||||||||||||||||||||||||||||||
current_settings = {} | ||||||||||||||||||||||||||||||||||||||||||||
|
@@ -130,13 +144,15 @@ def delete_schedule(self, msg_broadcast_id: Text): | |||||||||||||||||||||||||||||||||||||||||||
logger.error(e) | ||||||||||||||||||||||||||||||||||||||||||||
raise e | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
def __retrieve_config(self, event_id: Text): | ||||||||||||||||||||||||||||||||||||||||||||
reference_id = ObjectId().__str__() | ||||||||||||||||||||||||||||||||||||||||||||
config = MessageBroadcastProcessor.get_settings(event_id, self.bot) | ||||||||||||||||||||||||||||||||||||||||||||
def __retrieve_config(self, event_id: Text, is_resend: bool): | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
reference_id = MessageBroadcastProcessor.get_reference_id_from_broadcasting_logs(event_id) \ | ||||||||||||||||||||||||||||||||||||||||||||
if is_resend else ObjectId().__str__() | ||||||||||||||||||||||||||||||||||||||||||||
config = MessageBroadcastProcessor.get_settings(event_id, self.bot, is_resend=is_resend) | ||||||||||||||||||||||||||||||||||||||||||||
bot_settings = MongoProcessor.get_bot_settings(self.bot, self.user) | ||||||||||||||||||||||||||||||||||||||||||||
config["pyscript_timeout"] = bot_settings["dynamic_broadcast_execution_timeout"] | ||||||||||||||||||||||||||||||||||||||||||||
MessageBroadcastProcessor.add_event_log( | ||||||||||||||||||||||||||||||||||||||||||||
self.bot, MessageBroadcastLogType.common.value, reference_id, user=self.user, config=config, | ||||||||||||||||||||||||||||||||||||||||||||
status=EVENT_STATUS.INPROGRESS.value, event_id=event_id, is_new_log=True | ||||||||||||||||||||||||||||||||||||||||||||
status=EVENT_STATUS.INPROGRESS.value, event_id=event_id, is_new_log=True, is_resend=is_resend | ||||||||||||||||||||||||||||||||||||||||||||
Comment on lines
+147
to
+156
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace
- def __retrieve_config(self, event_id: Text, is_resend: bool):
+ def __retrieve_config(self, event_id: str, is_resend: bool): The addition of the Committable suggestion
Suggested change
ToolsRuff
|
||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
return config, reference_id |
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -15,7 +15,8 @@ def enqueue(self, event_request_type: Text, **kwargs): | |||||||||||
request_implementation = { | ||||||||||||
EventRequestType.trigger_async.value: self._trigger_async, | ||||||||||||
EventRequestType.add_schedule.value: self._add_schedule, | ||||||||||||
EventRequestType.update_schedule.value: self._update_schedule | ||||||||||||
EventRequestType.update_schedule.value: self._update_schedule, | ||||||||||||
EventRequestType.resend_broadcast.value: self._resend_broadcast | ||||||||||||
Comment on lines
+18
to
+19
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove Use - if event_request_type not in request_implementation.keys():
+ if event_request_type not in request_implementation: Committable suggestion
Suggested change
|
||||||||||||
} | ||||||||||||
if event_request_type not in request_implementation.keys(): | ||||||||||||
raise AppException(f"'{event_request_type}' is not a valid event server request!") | ||||||||||||
|
@@ -34,6 +35,10 @@ def _add_schedule(self, config: Dict): | |||||||||||
def _update_schedule(self, msg_broadcast_id: Text, config: Dict): | ||||||||||||
raise NotImplementedError("Provider not implemented") | ||||||||||||
|
||||||||||||
@abstractmethod | ||||||||||||
def _resend_broadcast(self, msg_broadcast_id: Text): | ||||||||||||
raise NotImplementedError("Provider not implemented") | ||||||||||||
Comment on lines
+39
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace
- def _resend_broadcast(self, msg_broadcast_id: Text):
+ def _resend_broadcast(self, msg_broadcast_id: str): Committable suggestion
Suggested change
ToolsRuff
|
||||||||||||
|
||||||||||||
@abstractmethod | ||||||||||||
def delete_schedule(self, msg_broadcast_id: Text): | ||||||||||||
raise NotImplementedError("Provider not implemented") |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -18,9 +18,10 @@ | |||||||||||||||||||||||||
class MessageBroadcastProcessor: | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def get_settings(notification_id: Text, bot: Text): | ||||||||||||||||||||||||||
def get_settings(notification_id: Text, bot: Text, **kwargs): | ||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||
settings = MessageBroadcastSettings.objects(id=notification_id, bot=bot, status=True).get() | ||||||||||||||||||||||||||
status = not kwargs.get("is_resend", False) | ||||||||||||||||||||||||||
settings = MessageBroadcastSettings.objects(id=notification_id, bot=bot, status=status).get() | ||||||||||||||||||||||||||
Comment on lines
+21
to
+24
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace
- def get_settings(notification_id: Text, bot: Text, **kwargs):
+ def get_settings(notification_id: str, bot: str, **kwargs): The addition of the Committable suggestion
Suggested change
ToolsRuff
|
||||||||||||||||||||||||||
settings = settings.to_mongo().to_dict() | ||||||||||||||||||||||||||
settings["_id"] = settings["_id"].__str__() | ||||||||||||||||||||||||||
return settings | ||||||||||||||||||||||||||
|
@@ -81,10 +82,12 @@ def delete_task(notification_id: Text, bot: Text, delete_permanently: bool = Tru | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def add_event_log(bot: Text, log_type: Text, reference_id: Text = None, status: Text = None, **kwargs): | ||||||||||||||||||||||||||
event_completion_states = [EVENT_STATUS.FAIL.value, EVENT_STATUS.COMPLETED.value] | ||||||||||||||||||||||||||
is_new_log = log_type in {MessageBroadcastLogType.send.value, MessageBroadcastLogType.self.value} or kwargs.pop("is_new_log", None) | ||||||||||||||||||||||||||
is_resend = kwargs.pop("is_resend", False) | ||||||||||||||||||||||||||
event_completion_states = [] if is_resend else [EVENT_STATUS.FAIL.value, EVENT_STATUS.COMPLETED.value] | ||||||||||||||||||||||||||
is_new_log = log_type in {MessageBroadcastLogType.send.value, | ||||||||||||||||||||||||||
MessageBroadcastLogType.self.value} or kwargs.pop("is_new_log", None) | ||||||||||||||||||||||||||
Comment on lines
84
to
+88
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace
- def add_event_log(bot: Text, log_type: Text, reference_id: Text = None, status: Text = None, **kwargs):
+ def add_event_log(bot: str, log_type: str, reference_id: str = None, status: str = None, **kwargs): The addition of the Committable suggestion
Suggested change
ToolsRuff
|
||||||||||||||||||||||||||
try: | ||||||||||||||||||||||||||
if is_new_log: | ||||||||||||||||||||||||||
if is_new_log and not is_resend: | ||||||||||||||||||||||||||
raise DoesNotExist() | ||||||||||||||||||||||||||
log = MessageBroadcastLogs.objects(bot=bot, reference_id=reference_id, log_type=log_type, | ||||||||||||||||||||||||||
status__nin=event_completion_states).get() | ||||||||||||||||||||||||||
|
@@ -108,18 +111,34 @@ def get_broadcast_logs(bot: Text, start_idx: int = 0, page_size: int = 10, **kwa | |||||||||||||||||||||||||
logs = json.loads(logs) | ||||||||||||||||||||||||||
return logs, total_count | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def get_reference_id_from_broadcasting_logs(event_id): | ||||||||||||||||||||||||||
log = MessageBroadcastLogs.objects(event_id=event_id, log_type=MessageBroadcastLogType.common.value).get() | ||||||||||||||||||||||||||
return log.reference_id | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def get_recent_broadcasting_logs(message_broadcast_logs): | ||||||||||||||||||||||||||
recent_logs, max_resend_count = [], 0 | ||||||||||||||||||||||||||
if message_broadcast_logs: | ||||||||||||||||||||||||||
max_resend_count = max(log["resend_count"] for log in message_broadcast_logs) | ||||||||||||||||||||||||||
recent_logs = [log for log in message_broadcast_logs if log["resend_count"] == max_resend_count] | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
return recent_logs, max_resend_count | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def extract_message_ids_from_broadcast_logs(reference_id: Text): | ||||||||||||||||||||||||||
message_broadcast_logs = MessageBroadcastLogs.objects(reference_id=reference_id, | ||||||||||||||||||||||||||
log_type=MessageBroadcastLogType.send.value) | ||||||||||||||||||||||||||
broadcast_logs = MessageBroadcastLogs.objects(reference_id=reference_id, | ||||||||||||||||||||||||||
log_type=MessageBroadcastLogType.send.value) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
message_broadcast_logs, resend_count = MessageBroadcastProcessor.get_recent_broadcasting_logs(broadcast_logs) | ||||||||||||||||||||||||||
broadcast_logs = { | ||||||||||||||||||||||||||
message['id']: log | ||||||||||||||||||||||||||
for log in message_broadcast_logs | ||||||||||||||||||||||||||
if log.api_response and log.api_response.get('messages', []) | ||||||||||||||||||||||||||
for message in log.api_response['messages'] | ||||||||||||||||||||||||||
if message['id'] | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
return broadcast_logs | ||||||||||||||||||||||||||
return broadcast_logs, resend_count | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def get_db_client(bot: Text): | ||||||||||||||||||||||||||
|
@@ -146,7 +165,8 @@ def log_broadcast_in_conversation_history(template_id, contact: Text, template_p | |||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def __add_broadcast_logs_status_and_errors(reference_id: Text, campaign_name: Text, broadcast_logs: Dict[Text, Document]): | ||||||||||||||||||||||||||
def __add_broadcast_logs_status_and_errors(reference_id: Text, campaign_name: Text, | ||||||||||||||||||||||||||
broadcast_logs: Dict[Text, Document], resend_count: int = 0): | ||||||||||||||||||||||||||
message_ids = list(broadcast_logs.keys()) | ||||||||||||||||||||||||||
channel_logs = ChannelLogs.objects(message_id__in=message_ids, type=ChannelTypes.WHATSAPP.value) | ||||||||||||||||||||||||||
for log in channel_logs: | ||||||||||||||||||||||||||
|
@@ -155,23 +175,28 @@ def __add_broadcast_logs_status_and_errors(reference_id: Text, campaign_name: Te | |||||||||||||||||||||||||
client = MessageBroadcastProcessor.get_db_client(broadcast_log['bot']) | ||||||||||||||||||||||||||
if log['errors']: | ||||||||||||||||||||||||||
status = "Failed" | ||||||||||||||||||||||||||
broadcast_log.update(errors=log['errors'], status="Failed") | ||||||||||||||||||||||||||
errors = log['errors'] | ||||||||||||||||||||||||||
else: | ||||||||||||||||||||||||||
status = "Success" | ||||||||||||||||||||||||||
errors = [] | ||||||||||||||||||||||||||
broadcast_log.update(errors=errors, status=status) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
MessageBroadcastProcessor.log_broadcast_in_conversation_history( | ||||||||||||||||||||||||||
template_id=broadcast_log['template_name'], contact=broadcast_log['recipient'], | ||||||||||||||||||||||||||
template_params=broadcast_log['template_params'], template=broadcast_log['template'], | ||||||||||||||||||||||||||
status=status, mongo_client=client | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
ChannelLogs.objects(message_id__in=message_ids, type=ChannelTypes.WHATSAPP.value).update(campaign_id=reference_id, campaign_name=campaign_name) | ||||||||||||||||||||||||||
ChannelLogs.objects(message_id__in=message_ids, type=ChannelTypes.WHATSAPP.value).update( | ||||||||||||||||||||||||||
campaign_id=reference_id, campaign_name=campaign_name, resend_count=resend_count | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def insert_status_received_on_channel_webhook(reference_id: Text, broadcast_name: Text): | ||||||||||||||||||||||||||
broadcast_logs = MessageBroadcastProcessor.extract_message_ids_from_broadcast_logs(reference_id) | ||||||||||||||||||||||||||
broadcast_logs, resend_count = MessageBroadcastProcessor.extract_message_ids_from_broadcast_logs(reference_id) | ||||||||||||||||||||||||||
if broadcast_logs: | ||||||||||||||||||||||||||
MessageBroadcastProcessor.__add_broadcast_logs_status_and_errors(reference_id, broadcast_name, broadcast_logs) | ||||||||||||||||||||||||||
MessageBroadcastProcessor.__add_broadcast_logs_status_and_errors(reference_id, broadcast_name, | ||||||||||||||||||||||||||
broadcast_logs, resend_count) | ||||||||||||||||||||||||||
Comment on lines
+196
to
+199
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Replace
- def insert_status_received_on_channel_webhook(reference_id: Text, broadcast_name: Text):
+ def insert_status_received_on_channel_webhook(reference_id: str, broadcast_name: str):
|
||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
@staticmethod | ||||||||||||||||||||||||||
def get_channel_metrics(channel_type: Text, bot: Text): | ||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid function call
Security
in argument defaults.Move the
Security
function call inside the function.Committable suggestion
Tools
Ruff