diff --git a/.gitignore b/.gitignore index a91137e..edda771 100644 --- a/.gitignore +++ b/.gitignore @@ -143,3 +143,13 @@ prototype_json.json registered_users.json student_indexes.txt registration_codes.json + +# Plugins +plugins/* +!plugins/example +plugins/packages/* +!plugins/packages/ +!plugins/packages/install.sh +plugins/requirements/* +!plugins/requirements/ +!plugins/requirements/install.sh diff --git a/Dockerfile b/Dockerfile index 5635dc5..777dcff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -10,6 +10,25 @@ COPY requirements.txt . # Install the dependencies RUN python -m pip install -r requirements.txt +# Copy the plugin packages into the container +COPY plugins/packages/ /app/plugins/packages/ + +# Give the packages/install.sh file execute permissions and run it. +# After installation, remove the packages directory and the apt cache +RUN chmod +x /app/plugins/packages/install.sh && \ + bash -x /app/plugins/packages/install.sh && \ + rm -rf /var/lib/apt/lists/* && \ + rm -rf /app/plugins/packages + +# Copy the plugin requirements into the container +COPY plugins/requirements/ /app/plugins/requirements/ + +# Give the requirements/install.sh file execute permissions, run it +# and remove the requirements directory after installation +RUN chmod +x /app/plugins/requirements/install.sh && \ + bash -x /app/plugins/requirements/install.sh && \ + rm -rf /app/plugins/requirements + # Copy the local files into the container COPY . /app/ diff --git a/plugins/example/example.py b/plugins/example/example.py new file mode 100644 index 0000000..8900f8d --- /dev/null +++ b/plugins/example/example.py @@ -0,0 +1,37 @@ +# SPDX-License-Identifier: MIT +"""An example plugin cog. + +This cog is an example of how to create a plugin cog. +It is ignored and cannot be loaded by the bot. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import nextcord +from nextcord.ext import commands +from nextcord.interactions import Interaction + +if TYPE_CHECKING: + from sggwbot.sggw_bot import SGGWBot + + +class ExampleCog(commands.Cog): + """An example plugin cog.""" + + __slots__ = ("bot",) + + _bot: SGGWBot + + def __init__(self, bot: SGGWBot): + self._bot = bot + + @nextcord.slash_command(name="example", description="Example command") + async def _example(self, interaction: Interaction) -> None: + await interaction.response.send_message("Hello, world!", ephemeral=True) + + +def setup(bot: SGGWBot): + """Loads the ExampleCog cog.""" + bot.add_cog(ExampleCog(bot)) diff --git a/plugins/example/plugin.json b/plugins/example/plugin.json new file mode 100644 index 0000000..c26ec9a --- /dev/null +++ b/plugins/example/plugin.json @@ -0,0 +1,3 @@ +{ + "enabled": true +} \ No newline at end of file diff --git a/plugins/packages/install.sh b/plugins/packages/install.sh new file mode 100644 index 0000000..8baa8e2 --- /dev/null +++ b/plugins/packages/install.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +# This script will install all packages listed +# in the files in the plugins/packages directory. +# The files should contain a list of packages, one per line. +# The files should be named the same as the plugin +# they are associated with and have a .txt extension. + +apt-get update + +for file in ./plugins/packages/*; do + + # Skip this file + if [ "$file" == "./plugins/packages/install.sh" ]; then + continue + fi + + # Check if the file is a regular file and not a directory + if [ -f "$file" ]; then + # Ensure the file has LF line endings + sed -i 's/\r$//' "$file" + + echo "Installing packages for $file" + apt-get --no-install-recommends install -y $(cat "$file") + + if [ $? -eq 0 ]; then + echo "Packages from $file installed successfully." + else + echo "Failed to install packages from $file." >&2 + fi + else + echo "$file is not a valid file and will be skipped." >&2 + fi + +done \ No newline at end of file diff --git a/plugins/requirements/install.sh b/plugins/requirements/install.sh new file mode 100644 index 0000000..852fe6b --- /dev/null +++ b/plugins/requirements/install.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +# This script will install all packages listed +# in the files in the plugins/requirements directory. +# The files should contain a list of packages, one per line. +# The files should be named the same as the plugin +# they are associated with and have a .txt extension. + +for file in ./plugins/requirements/*; do + + # Skip this file + if [ "$file" == "./plugins/requirements/install.sh" ]; then + continue + fi + + # Check if the file is a regular file and not a directory + if [ -f "$file" ]; then + echo "Installing Python packages for $file" + + # Install the packages listed in the current file using pip + python -m pip install -r "$file" + + # Check if the pip installation was successful + if [ $? -eq 0 ]; then + echo "Packages from $file installed successfully." + else + echo "Failed to install packages from $file." >&2 + fi + else + echo "$file is not a valid file and will be skipped." >&2 + fi + +done diff --git a/requirements.txt b/requirements.txt index 703128c..61ebc7c 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/sggwbot/errors.py b/sggwbot/errors.py index 6f6fac6..a8fe497 100644 --- a/sggwbot/errors.py +++ b/sggwbot/errors.py @@ -34,6 +34,26 @@ class MissingPermission(SGGWBotError): """Missing permission.""" +class PluginError(SGGWBotError): + """Plugin error.""" + + +class PluginNotFoundError(PluginError): + """Plugin not found.""" + + __slots__ = ("plugin_name",) + + plugin_name: str + + def __init__(self, plugin_name: str) -> None: + self.plugin_name = plugin_name + super().__init__(f"Plugin '{plugin_name}' not found.") + + +class PluginOperationError(PluginError): + """Plugin operation error.""" + + @dataclass class ExceptionData: """Exception data with attributes to be passed to the error handler. diff --git a/sggwbot/plugins.py b/sggwbot/plugins.py new file mode 100644 index 0000000..97267d2 --- /dev/null +++ b/sggwbot/plugins.py @@ -0,0 +1,437 @@ +# SPDX-License-Identifier: MIT +"""A module to control the plugins. + +The plugins are used to extend the functionality of the bot. + +Plugins are loaded from the `plugins` directory. +They can be enabled, disabled, and reloaded using the `/plugins` command. + +Directories: `example`, `packages`, `requirements` and directories starting with `_` are ignored. +The `example` directory is an example plugin and cannot be loaded by the bot. +The `packages` and `requirements` directories +are used to install system packages and Python dependencies. + +Enabled plugins are loaded when the bot starts. + +CREATE A NEW PLUGIN +-------------------- +1. Copy the plugin structure from the `plugins/example` directory. +2. Rename the directory and the files (the main file must have the same name as the directory). +3. Rerun the bot to load the new plugin. + +PLUGIN STRUCTURE +----------------- +- `plugins/` + - `example` + - `example.py` (Main file of example plugin) + - `plugin.json` (Settings file for example plugin) + - `packages/` + - `install.sh` (Script to install system packages - do not edit) + - `your_plugin.txt` (List of system packages for your plugin, if needed) + - `requirements/` + - `install.sh` (Script to install Python dependencies - do not edit) + - `your_plugin.txt` (List of Python requirements for your plugin, if needed) + - `your_plugin/` + - `your_plugin.py` (Main file of your plugin) + - `plugin.json` (Settings file for your plugin) + +NOTES +----- +- The plugin main file must have the same name as the directory. +- The plugin settings file must be named `plugin.json`. + It must contain the `enabled` key with a boolean value. + If the file doesn't exist, it will be created with the `enabled` key set to `False`. +- In your plugin directory, you can create more directories and files as needed. +- It is recommended to name required system packages + and Python dependencies files as `your_plugin.txt`. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum, auto +import json +from pathlib import Path +from typing import TYPE_CHECKING, Any, ClassVar + +import nextcord +from nextcord.ext import commands, tasks +from nextcord.interactions import Interaction + +from sggwbot.console import Console, FontColour +from sggwbot.errors import ( + ExceptionData, + InvalidSettingsFile, + PluginError, + PluginNotFoundError, + PluginOperationError, +) +from sggwbot.utils import InteractionUtils + +if TYPE_CHECKING: + from sggw_bot import SGGWBot + +IGNORED_DIRECTORIES = {"example", "packages", "requirements"} + + +def _console_message( + message: str, + /, + _type: str = "PLUGINS", + *, + colour: FontColour = FontColour.GREY, + bold_text: bool = True, + bold_type: bool = True, +) -> None: + Console.specific( + message, + _type, + colour=colour, + bold_text=bold_text, + bold_type=bold_type, + ) + + +class PluginsCog(commands.Cog): + """A cog to control the plugins.""" + + __slots__ = ( + "_bot", + "_list", + ) + + _bot: SGGWBot + _list: list[Plugin] + + _DIR: ClassVar[str] = "plugins" + + def __init__(self, bot: SGGWBot) -> None: + """Initializes the cog.""" + self._bot = bot + self._list = [] + self._load_plugins() + self._plugins_info.start() # pylint: disable=no-member + + @tasks.loop(count=1) + async def _plugins_info(self): + await self._bot.wait_until_ready() + + enabled = [plugin.name for plugin in self._list if plugin.is_enabled] + disabled = [plugin.name for plugin in self._list if plugin.is_disabled] + invalid = [plugin.name for plugin in self._list if plugin.is_invalid] + + messages = [ + "Enabled plugins: " + (", ".join(enabled) if enabled else "-"), + "Disabled plugins: " + (", ".join(disabled) if disabled else "-"), + "Invalid plugins: " + (", ".join(invalid) if invalid else "-"), + ] + + for message in messages: + _console_message(message) + + @nextcord.slash_command( + name="plugins", + description="The plugins.", + dm_permission=False, + ) + async def _plugins(self, *_) -> None: + """The placeholder for the plugins command.""" + + @_plugins.subcommand( + name="list", + description="Shows the list of plugins.", + ) + @InteractionUtils.with_log() + async def _plugins_list(self, interaction: Interaction) -> None: + """Shows the list of plugins. + + Parameters + ---------- + interaction : Interaction + The interaction. + """ + + embed = nextcord.Embed( + title="Plugins", + colour=nextcord.Colour.light_grey(), + ) + + for status in PluginStatus: + plugins = [plugin.name for plugin in self._list if plugin.status == status] + + embed.add_field( + name=status.name.title() + ":", + value="- " + ", ".join(plugins) if plugins else "-", + inline=False, + ) + + await interaction.response.send_message(embed=embed, ephemeral=True) + + @_plugins.subcommand( + name="enable", + description="Enables a plugin.", + ) + @InteractionUtils.with_info( + before="Enabling the '{name}' plugin.", + after="The '{name}' plugin has been enabled.", + catch_exceptions=[ + ExceptionData( + PluginNotFoundError, + with_traceback_in_response=False, + with_traceback_in_log=False, + ), + ExceptionData( + ValueError, + with_traceback_in_response=False, + with_traceback_in_log=False, + ), + PluginOperationError, + ], + ) + @InteractionUtils.with_log() + async def _plugins_enable(self, _: Interaction, name: str) -> None: + """Enables a plugin. + + Parameters + ---------- + interaction : Interaction + The interaction. + name : str + The name of the plugin to enable. + """ + + plugin = self._find_plugin(name) + + if plugin.is_enabled: + raise ValueError(f"Plugin '{name}' is already enabled") + + if not self._bot.load_cog(plugin.extension_name): + raise PluginOperationError(f"Plugin '{name}' couldn't be enabled") + + plugin.enable() + + _console_message(f"Plugin {plugin.name} has been enabled.") + + @_plugins.subcommand( + name="disable", + description="Disables a plugin.", + ) + @InteractionUtils.with_info( + before="Disabling the '{name}' plugin.", + after="The '{name}' plugin has been disabled.", + catch_exceptions=[ + ExceptionData( + PluginNotFoundError, + with_traceback_in_response=False, + with_traceback_in_log=False, + ), + ExceptionData( + ValueError, + with_traceback_in_response=False, + with_traceback_in_log=False, + ), + PluginOperationError, + ], + ) + @InteractionUtils.with_log() + async def _plugins_disable(self, _: Interaction, name: str) -> None: + """Disables a plugin. + + Parameters + ---------- + interaction : Interaction + The interaction. + name : str + The name of the plugin to disable. + """ + + plugin = self._find_plugin(name) + + if plugin.is_disabled: + raise ValueError(f"Plugin '{name}' is already disabled") + + if not self._bot.unload_cog(plugin.extension_name): + raise PluginOperationError(f"Plugin '{name}' couldn't be disabled") + + plugin.disable() + _console_message(f"Plugin {plugin.name} has been disabled.") + + @_plugins.subcommand( + name="reload", + description="Reloads a plugin.", + ) + @InteractionUtils.with_info( + before="Reloading the '{name}' plugin.", + after="The '{name}' plugin has been reloaded.", + catch_exceptions=[ + ExceptionData( + PluginNotFoundError, + with_traceback_in_response=False, + with_traceback_in_log=False, + ), + PluginOperationError, + ], + ) + @InteractionUtils.with_log() + async def _plugins_reload(self, _: Interaction, name: str) -> None: + """Reloads the plugins. + + Parameters + ---------- + interaction : Interaction + The interaction. + name : str + The name of the plugin to reload. + """ + + plugin = self._find_plugin(name) + + method = self._bot.reload_cog if plugin.is_enabled else self._bot.load_cog + if not method(plugin.extension_name): + raise PluginOperationError(f"Plugin '{name}' couldn't be reloaded") + + _console_message(f"Plugin {plugin.name} has been reloaded.") + + def _load_plugins(self) -> None: + """Loads the plugins.""" + plugins: list[Plugin] = [] + + for plugin_dir in Path(self._DIR).iterdir(): + if plugin_dir.name in IGNORED_DIRECTORIES: + continue + + if plugin_dir.is_dir() and not plugin_dir.name.startswith("_"): + plugin = Plugin(plugin_dir.name, plugin_dir) + plugins.append(plugin) + + for plugin in plugins: + try: + plugin.load_settings_or_create_default() + except InvalidSettingsFile as e: + Console.error( + f"Plugin {plugin.name} has invalid settings file and cannot be loaded.", + exception=e, + ) + plugin.status = PluginStatus.INVALID + continue + + if plugin.is_enabled: + if not self._bot.load_cog(plugin.extension_name): + Console.error(f"Plugin {plugin.name} couldn't be loaded.") + plugin.status = PluginStatus.INVALID + + self._list = plugins + + def _find_plugin(self, name: str) -> Plugin: + """Finds a plugin by name. + + Parameters + ---------- + name : str + The name of the plugin. + + Returns + ------- + Plugin + The plugin. + """ + try: + return next(plugin for plugin in self._list if plugin.name == name) + except StopIteration as e: + raise PluginNotFoundError(name) from e + + +class PluginStatus(Enum): + """An enumeration to represent the plugin status.""" + + ENABLED = auto() + DISABLED = auto() + INVALID = auto() + + +@dataclass(slots=True) +class Plugin: + """A class to represent a plugin.""" + + name: str + directory: Path + status: PluginStatus = field(init=False) + + @property + def is_enabled(self) -> bool: + """Whether the plugin is enabled or not.""" + return self.status == PluginStatus.ENABLED + + @property + def is_disabled(self) -> bool: + """Whether the plugin is disabled or not.""" + return self.status == PluginStatus.DISABLED + + @property + def is_invalid(self) -> bool: + """Whether the plugin is invalid or not.""" + return self.status == PluginStatus.INVALID + + @property + def extension_name(self) -> str: + """The name of the extension.""" + return str(self.directory / self.name).replace("\\", ".").replace("/", ".") + + @property + def _settings_file(self) -> Path: + """The path to the settings file.""" + return self.directory / "plugin.json" + + def load_settings_or_create_default(self) -> None: + """Loads the settings or creates the default settings.""" + try: + if (file := self._settings_file).exists(): + with file.open("r") as file: + data: dict[str, Any] = json.load(file) + status: bool | None = data.get("enabled") + if status is None: + self.status = PluginStatus.INVALID + elif status: + self.status = PluginStatus.ENABLED + else: + self.status = PluginStatus.DISABLED + else: + data = {"enabled": False} + with file.open("w") as file: + json.dump(data, file, indent=4) + self.status = PluginStatus.DISABLED + except json.JSONDecodeError as e: + raise InvalidSettingsFile(file) from e + except OSError as e: + raise PluginError(f"Couldn't load the settings file {file}") from e + + def enable(self) -> None: + """Enables the plugin.""" + settings_file = self._settings_file + + with settings_file.open("r") as file: + data: dict[str, Any] = json.load(file) + data["enabled"] = True + + with settings_file.open("w") as file: + json.dump(data, file, indent=4) + + self.status = PluginStatus.ENABLED + + def disable(self) -> None: + """Disables the plugin.""" + settings_file = self._settings_file + + with settings_file.open("r") as file: + data: dict[str, Any] = json.load(file) + data["enabled"] = False + + with settings_file.open("w") as file: + json.dump(data, file, indent=4) + + self.status = PluginStatus.DISABLED + + +def setup(bot: SGGWBot): + """Loads the PluginsCog.""" + bot.add_cog(PluginsCog(bot)) diff --git a/sggwbot/sggw_bot.py b/sggwbot/sggw_bot.py index 41f9cb6..3c0194f 100644 --- a/sggwbot/sggw_bot.py +++ b/sggwbot/sggw_bot.py @@ -20,7 +20,6 @@ def setup(bot: SGGWBot) -> None: import json import os -import sys import time from pathlib import Path @@ -53,6 +52,18 @@ class SGGWBot(commands.Bot): _prefix: str _bot_channel_id: int + _cog_names = [ + "sggwbot.role_assignment", + "sggwbot.information", + "sggwbot.project", + "sggwbot.calendar", + "sggwbot.status", + "sggwbot.registration", + "sggwbot.messaging", + "sggwbot.voice_channel_manager", + "sggwbot.plugins", + ] + def __init__(self) -> None: intents = Intents.all() intents.members = True @@ -67,7 +78,9 @@ def __init__(self) -> None: ) dotenv.load_dotenv() - self._load_cogs() + + for cog_name in self._cog_names: + self.load_cog(cog_name) lines_of_code = ProjectUtils.lines_of_code() Console.info(f"Linijek kodu: {lines_of_code}") @@ -123,37 +136,95 @@ def get_bot_channel(self) -> TextChannel: assert isinstance(channel, TextChannel) return channel - def _load_cogs(self) -> None: - paths = [ - "sggwbot.role_assignment.py", - "sggwbot.information.py", - "sggwbot.project.py", - "sggwbot.calendar.py", - "sggwbot.status.py", - "sggwbot.registration.py", - "sggwbot.messaging.py", - "sggwbot.voice_channel_manager.py", - ] - - for path in paths: - cog_name = str(path)[:-3] - start_time = time.time() - - try: - cog_path = str(path)[:-3].replace( - "\\" if sys.platform != "linux" else "/", "." - ) - self.load_extension(cog_path) - load_time = (time.time() - start_time) * 1000 - Console.info(f"Cog '{cog_name}' has been loaded! ({load_time:.2f}ms)") - except ( - commands.ExtensionError, - ModuleNotFoundError, - nextcord.errors.HTTPException, - ) as e: - Console.important_error( - f"Cog '{cog_name}' couldn't be loaded!", exception=e - ) + def load_cog(self, cog_name: str) -> bool: + """Loads the cog. + + Parameters + ---------- + cog_name : str + The name of the cog to load. + + Returns + ------- + bool + Whether the cog has been loaded successfully. + """ + start_time = time.time() + + try: + self.load_extension(cog_name) + load_time = (time.time() - start_time) * 1000 + Console.info(f"Cog '{cog_name}' has been loaded! ({load_time:.2f}ms)") + return True + except ( + commands.ExtensionError, + ModuleNotFoundError, + nextcord.errors.HTTPException, + ) as e: + Console.important_error( + f"Cog '{cog_name}' couldn't be loaded!", exception=e + ) + return False + + def unload_cog(self, cog_name: str) -> bool: + """Unloads the cog. + + Parameters + ---------- + cog_name : str + The name of the cog to unload. + + Returns + ------- + bool + Whether the cog has been unloaded successfully. + """ + start_time = time.time() + + try: + self.unload_extension(cog_name) + load_time = (time.time() - start_time) * 1000 + Console.info(f"Cog '{cog_name}' has been unloaded! ({load_time:.2f}ms)") + return True + except ( + commands.ExtensionError, + ModuleNotFoundError, + nextcord.errors.HTTPException, + ) as e: + Console.important_error( + f"Cog '{cog_name}' couldn't be unloaded!", exception=e + ) + return False + + def reload_cog(self, cog_name: str) -> bool: + """Reloads the cog. + + Parameters + ---------- + cog_name : str + The name of the cog to reload. + + Returns + ------- + bool + Whether the cog has been reloaded successfully. + """ + start_time = time.time() + + try: + self.reload_extension(cog_name) + load_time = (time.time() - start_time) * 1000 + Console.info(f"Cog '{cog_name}' has been reloaded! ({load_time:.2f}ms)") + return True + except ( + commands.ExtensionError, + ModuleNotFoundError, + nextcord.errors.HTTPException, + ) as e: + Console.important_error( + f"Cog '{cog_name}' couldn't be reloaded!", exception=e + ) + return False def main(self) -> None: """Runs the bot using `BOT_TOKEN` received from `.env` file."""