diff --git a/endpoints/slack.py b/endpoints/slack.py index fbb731f..00235e9 100644 --- a/endpoints/slack.py +++ b/endpoints/slack.py @@ -9,7 +9,6 @@ from slack_sdk import WebClient from slack_sdk.errors import SlackApiError - # ref: https://github.com/fla9ua/markdown_to_mrkdwn class SlackMarkdownConverter: """ @@ -193,7 +192,121 @@ def _convert_line(self, line: str) -> str: class SlackEndpoint(Endpoint): CACHE_PREFIX = "thread-cache" - CACHE_DURATION = 60 * 60 * 24 # 1 day + CACHE_DURATION = 60 * 60 * 24 # 24 hour + CLEANUP_INTERVAL = 60 * 60 * 24 # 24 hours + + def _add_key_to_registry(self, key: str): + """Add key to registry""" + if key == "__keys__": # Prevent infinite loop + return + + try: + # Get current registry + registry = self._get_key_registry() + + # Add key (update time if existing) + registry[key] = int(time.time()) # last_update time + registry["__last_updated"] = int(time.time()) + + # Save registry + self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8")) + + except Exception as e: + print(f"Error adding key to registry: {e}") + + def _remove_key_from_registry(self, key: str): + """Remove key from registry""" + if key == "__keys__": + return + + try: + registry = self._get_key_registry() + + if key in registry: + del registry[key] + registry["__last_updated"] = int(time.time()) + self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8")) + + except Exception as e: + print(f"Error removing key from registry: {e}") + + def _get_key_registry(self) -> dict: + """Get key registry""" + try: + raw = self.session.storage.get("__keys__") + if raw: + return json.loads(raw.decode("utf-8")) + else: + return {"__last_updated": int(time.time())} + except Exception: + return {"__last_updated": int(time.time())} + + def _get_all_keys(self) -> List[str]: + """Get all keys (except __last_updated)""" + try: + registry = self._get_key_registry() + return [k for k in registry.keys() if k != "__last_updated"] + except Exception: + return [] + + def _cleanup_storage(self, cleanup_percentage: float = 0.5): + """Clean up storage (delete old keys)""" + try: + registry = self._get_key_registry() + + # Get keys other than __last_updated + keys_with_time = [(k, v) for k, v in registry.items() if k != "__last_updated"] + + if not keys_with_time: + return + + # Sort by time in ascending order (from oldest) + keys_with_time.sort(key=lambda x: x[1]) + + # Calculate number of keys to delete + total_keys = len(keys_with_time) + keys_to_delete = int(total_keys * cleanup_percentage) + + if keys_to_delete > 0: + # Delete from oldest keys + for i in range(keys_to_delete): + key_to_delete = keys_with_time[i][0] + try: + self.session.storage.delete(key_to_delete) + self._remove_key_from_registry(key_to_delete) + except Exception as e: + print(f"Error deleting key {key_to_delete}: {e}") + + print(f"Storage cleanup completed: deleted {keys_to_delete} keys out of {total_keys}") + + except Exception as e: + print(f"Error during storage cleanup: {e}") + + def _should_cleanup_storage(self) -> bool: + """Check if storage cleanup is needed at 24-hour intervals""" + try: + registry = self._get_key_registry() + last_cleanup = registry.get("__last_cleanup", 0) + current_time = int(time.time()) + return (current_time - last_cleanup) >= self.CLEANUP_INTERVAL + except Exception: + return False + + def _periodic_cleanup_if_needed(self): + """Perform periodic cleanup as needed (30% deletion)""" + if self._should_cleanup_storage(): + try: + # Light cleanup (30% deletion) + self._cleanup_storage(cleanup_percentage=0.3) + + # Record cleanup time + registry = self._get_key_registry() + registry["__last_cleanup"] = int(time.time()) + self.session.storage.set("__keys__", json.dumps(registry).encode("utf-8")) + + print("Periodic storage cleanup completed") + except Exception as e: + print(f"Error during periodic cleanup: {e}") def _load_cached_history(self, channel: str, thread_ts: str): key = f"{self.CACHE_PREFIX}-{channel}-{thread_ts}" @@ -212,6 +325,7 @@ def _load_cached_history(self, channel: str, thread_ts: str): data["messages"] = messages data["last_cleanup"] = now try: + self._add_key_to_registry(key) self.session.storage.set(key, json.dumps(data).encode("utf-8")) except Exception: pass @@ -235,6 +349,7 @@ def _append_thread_message(self, channel: str, thread_ts: str, message: Mapping) data["messages"].append(msg) data["last_cleanup"] = now try: + self._add_key_to_registry(key) self.session.storage.set(key, json.dumps(data).encode("utf-8")) except Exception: pass @@ -243,6 +358,9 @@ def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response: """ Invokes the endpoint with the given request. """ + # Periodic cleanup check + self._periodic_cleanup_if_needed() + # Check if this is a retry and if we should ignore it retry_num = r.headers.get("X-Slack-Retry-Num") if not settings.get("allow_retry") and ( @@ -286,6 +404,41 @@ def _invoke(self, r: Request, values: Mapping, settings: Mapping) -> Response: token = settings.get("bot_token") client = WebClient(token=token) + # Check if this is a cache deletion request + if message.strip().lower() == "delcache": + # Delete thread cache + cache_key = f"{self.CACHE_PREFIX}-{channel}-{thread_ts}" + conversation_key = f"slack-{channel}-{thread_ts}" + + try: + # Delete both cache and conversation storage + self._remove_key_from_registry(cache_key) + self.session.storage.delete(cache_key) + self._remove_key_from_registry(conversation_key) + self.session.storage.delete(conversation_key) + + # Send confirmation message + client.chat_postMessage( + channel=channel, + thread_ts=thread_ts, + text=f"✅ Thread cache has been successfully deleted. \n{cache_key} \n{conversation_key}" + ) + except Exception as e: + print(f"Error deleting cache: {e}") + # Send error message + try: + client.chat_postMessage( + channel=channel, + thread_ts=thread_ts, + text=f"❌ Error deleting cache: {str(e)}" + ) + except SlackApiError: + pass + + return Response( + status=200, response="ok", content_type="text/plain" + ) + # store the incoming app mention message self._append_thread_message( channel, @@ -543,6 +696,7 @@ def replace_id_with_name(match): answer = response.get("answer") conversation_id = response.get("conversation_id") if conversation_id: + self._add_key_to_registry(key_to_check) self.session.storage.set( key_to_check, conversation_id.encode("utf-8") ) @@ -668,9 +822,31 @@ def replace_id_with_name(match): thread_ts=thread_ts, text=f"Sorry, I'm having trouble processing your request. Please try again later. Error: {err_msg}", ) + except SlackApiError: # Failed to send error message pass + + # Check if storage size limit exceeded and cleanup if needed + if "allocated size is greater than max storage size" in err_msg.lower(): + try: + self._cleanup_storage(cleanup_percentage=0.5) + # Send cleanup notification + client.chat_postMessage( + channel=channel, + thread_ts=thread_ts, + text="🧹 Storage cleanup completed. Please try your request again.", + ) + except Exception as cleanup_error: + print(f"Error during storage cleanup: {cleanup_error}") + try: + client.chat_postMessage( + channel=channel, + thread_ts=thread_ts, + text="⚠️ Storage cleanup failed. Please contact administrator.", + ) + except SlackApiError: + pass return Response( status=200, diff --git a/manifest.yaml b/manifest.yaml index 8fc7c87..db511ae 100644 --- a/manifest.yaml +++ b/manifest.yaml @@ -33,7 +33,7 @@ resource: enabled: true storage: enabled: true - size: 1048576 + size: 10485760 plugins: endpoints: - group/slack.yaml