diff --git a/config/memory_statistics.py b/config/memory_statistics.py new file mode 100644 index 0000000000..4dbd3f3ad4 --- /dev/null +++ b/config/memory_statistics.py @@ -0,0 +1,114 @@ +import click +from swsscommon.swsscommon import ConfigDBConnector +# from utilities_common.cli import AbbreviationGroup + + +class AbbreviationGroup(click.Group): + def get_command(self, ctx, cmd_name): + return super().get_command(ctx, cmd_name) + + +@click.group(cls=AbbreviationGroup, name="memory-statistics") +def memory_statistics(): + """Configure the Memory Statistics feature""" + pass + + +def check_memory_statistics_table_existence(memory_statistics_table): + """Checks whether the 'MEMORY_STATISTICS' table is configured in Config DB.""" + if not memory_statistics_table: + click.echo("Unable to retrieve 'MEMORY_STATISTICS' table from Config DB.", err=True) + return False + + if "memory_statistics" not in memory_statistics_table: + click.echo("Unable to retrieve key 'memory_statistics' from MEMORY_STATISTICS table.", err=True) + return False + + return True + + +def get_memory_statistics_table(db): + """Get the MEMORY_STATISTICS table from the database.""" + return db.get_table("MEMORY_STATISTICS") + + +@memory_statistics.command(name="enable", short_help="Enable the Memory Statistics feature") +def memory_statistics_enable(): + """Enable the Memory Statistics feature""" + db = ConfigDBConnector() + db.connect() + + memory_statistics_table = get_memory_statistics_table(db) + if not check_memory_statistics_table_existence(memory_statistics_table): + return # Exit gracefully on error + + try: + db.mod_entry("MEMORY_STATISTICS", "memory_statistics", {"enabled": "true", "disabled": "false"}) + click.echo("Memory Statistics feature enabled.") + except Exception as e: + click.echo(f"Error enabling Memory Statistics feature: {str(e)}", err=True) + + click.echo("Save SONiC configuration using 'config save' to persist the changes.") + + +@memory_statistics.command(name="disable", short_help="Disable the Memory Statistics feature") +def memory_statistics_disable(): + """Disable the Memory Statistics feature""" + db = ConfigDBConnector() + db.connect() + + memory_statistics_table = get_memory_statistics_table(db) + if not check_memory_statistics_table_existence(memory_statistics_table): + return # Exit gracefully on error + + try: + db.mod_entry("MEMORY_STATISTICS", "memory_statistics", {"enabled": "false", "disabled": "true"}) + click.echo("Memory Statistics feature disabled.") + except Exception as e: + click.echo(f"Error disabling Memory Statistics feature: {str(e)}", err=True) + + click.echo("Save SONiC configuration using 'config save' to persist the changes.") + + +@memory_statistics.command(name="retention-period", short_help="Configure the retention period for Memory Statistics") +@click.argument('retention_period', metavar='', required=True, type=int) +def memory_statistics_retention_period(retention_period): + """Set the retention period for Memory Statistics""" + db = ConfigDBConnector() + db.connect() + + memory_statistics_table = get_memory_statistics_table(db) + if not check_memory_statistics_table_existence(memory_statistics_table): + return # Exit gracefully on error + + try: + db.mod_entry("MEMORY_STATISTICS", "memory_statistics", {"retention_period": retention_period}) + click.echo(f"Memory Statistics retention period set to {retention_period} days.") + except Exception as e: + click.echo(f"Error setting retention period: {str(e)}", err=True) + + click.echo("Save SONiC configuration using 'config save' to persist the changes.") + + +@memory_statistics.command(name="sampling-interval", short_help="Configure the sampling interval for Memory Statistics") +@click.argument('sampling_interval', metavar='', required=True, type=int) +def memory_statistics_sampling_interval(sampling_interval): + """Set the sampling interval for Memory Statistics""" + db = ConfigDBConnector() + db.connect() + + memory_statistics_table = get_memory_statistics_table(db) + if not check_memory_statistics_table_existence(memory_statistics_table): + return # Exit gracefully on error + + try: + db.mod_entry("MEMORY_STATISTICS", "memory_statistics", {"sampling_interval": sampling_interval}) + click.echo(f"Memory Statistics sampling interval set to {sampling_interval} minutes.") + except Exception as e: + click.echo(f"Error setting sampling interval: {str(e)}", err=True) + + click.echo("Save SONiC configuration using 'config save' to persist the changes.") + + +if __name__ == "__main__": + memory_statistics() diff --git a/show/memory_statistics.py b/show/memory_statistics.py new file mode 100644 index 0000000000..9a5aed69b9 --- /dev/null +++ b/show/memory_statistics.py @@ -0,0 +1,100 @@ +import click +from tabulate import tabulate + +import utilities_common.cli as clicommon +from swsscommon.swsscommon import ConfigDBConnector + + +# +# 'memory-statistics' group (show memory-statistics ...) +# +@click.group(cls=clicommon.AliasedGroup, name="memory-statistics") +def memory_statistics(): + """Show memory statistics configuration and logs""" + pass + + +def get_memory_statistics_config(field_name): + """Fetches the configuration of memory_statistics from `CONFIG_DB`. + + Args: + field_name: A string containing the field name in the sub-table of 'memory_statistics'. + + Returns: + field_value: If field name was found, then returns the corresponding value. + Otherwise, returns "Unknown". + """ + field_value = "Unknown" + config_db = ConfigDBConnector() + config_db.connect() + memory_statistics_table = config_db.get_table("MEMORY_STATISTICS") + if (memory_statistics_table and + "memory_statistics" in memory_statistics_table and + field_name in memory_statistics_table["config"]): + field_value = memory_statistics_table["memory_statistics"][field_name] + + return field_value + + +@memory_statistics.command(name="memory_statitics", short_help="Show the configuration of memory statistics") +def config(): + admin_mode = "Disabled" + admin_enabled = get_memory_statistics_config("enabled") + if admin_enabled == "true": + admin_mode = "Enabled" + + click.echo("Memory Statistics administrative mode: {}".format(admin_mode)) + + retention_time = get_memory_statistics_config("retention_time") + click.echo("Memory Statistics retention time (days): {}".format(retention_time)) + + sampling_interval = get_memory_statistics_config("sampling_interval") + click.echo("Memory Statistics sampling interval (minutes): {}".format(sampling_interval)) + + +def fetch_memory_statistics(starting_time=None, ending_time=None, select=None): + """Fetch memory statistics from the database. + + Args: + starting_time: The starting time for filtering the statistics. + ending_time: The ending time for filtering the statistics. + additional_options: Any additional options for filtering or formatting. + + Returns: + A list of memory statistics entries. + """ + config_db = ConfigDBConnector() + config_db.connect() + + memory_statistics_table = config_db.get_table("MEMORY_STATISTICS") + filtered_statistics = [] + + for key, entry in memory_statistics_table.items(): + # Add filtering logic here based on starting_time, ending_time, and select + if (not starting_time or entry.get("time") >= starting_time) and \ + (not ending_time or entry.get("time") <= ending_time): + # Implement additional filtering based on select if needed + filtered_statistics.append(entry) + + return filtered_statistics + + +@memory_statistics.command(name="logs", short_help="Show memory statistics logs with optional filtering") +@click.argument('starting_time', required=False) +@click.argument('ending_time', required=False) +@click.argument('additional_options', required=False, nargs=-1) +def show_memory_statistics_logs(starting_time, ending_time, select): + """Show memory statistics logs with optional filtering by time and select.""" + + # Fetch memory statistics + memory_statistics = fetch_memory_statistics(starting_time, ending_time, select) + + if not memory_statistics: + click.echo("No memory statistics available for the given parameters.") + return + + # Display the memory statistics + headers = ["Time", "Statistic", "Value"] # Adjust according to the actual fields + table_data = [[entry.get("time"), entry.get("statistic"), entry.get("value")] for entry in memory_statistics] + + click.echo(tabulate(table_data, headers=headers, tablefmt="grid")) diff --git a/tests/memory_statistics_test.py b/tests/memory_statistics_test.py new file mode 100644 index 0000000000..aa2fb6e2c4 --- /dev/null +++ b/tests/memory_statistics_test.py @@ -0,0 +1,122 @@ +import pytest +from unittest.mock import patch +from click.testing import CliRunner +from utilities_common.cli import AbbreviationGroup +from config.memory_statistics import ( + memory_statistics_enable, + memory_statistics_disable, + memory_statistics_retention_period, + memory_statistics_sampling_interval, + get_memory_statistics_table, + check_memory_statistics_table_existence, +) + + +@pytest.fixture +def mock_db(): + """Fixture for the mock database.""" + with patch("config.memory_statistics.ConfigDBConnector") as MockConfigDBConnector: + mock_db_instance = MockConfigDBConnector.return_value + yield mock_db_instance + + +def test_memory_statistics_enable(mock_db): + """Test enabling the Memory Statistics feature.""" + mock_db.get_table.return_value = {"memory_statistics": {"enabled": "false"}} + runner = CliRunner() + + with patch("click.echo") as mock_echo: + result = runner.invoke(memory_statistics_enable) + assert result.exit_code == 0 # Ensure the command exits without error + assert mock_echo.call_count == 2 # Check if the echo function was called twice + mock_db.mod_entry.assert_called_once_with( + "MEMORY_STATISTICS", "memory_statistics", + {"enabled": "true", "disabled": "false"} + ) + + +def test_memory_statistics_disable(mock_db): + """Test disabling the Memory Statistics feature.""" + mock_db.get_table.return_value = {"memory_statistics": {"enabled": "true"}} + runner = CliRunner() + + with patch("click.echo") as mock_echo: + result = runner.invoke(memory_statistics_disable) + assert result.exit_code == 0 + assert mock_echo.call_count == 2 + mock_db.mod_entry.assert_called_once_with( + "MEMORY_STATISTICS", "memory_statistics", + {"enabled": "false", "disabled": "true"} + ) + + +def test_memory_statistics_retention_period(mock_db): + """Test setting the retention period for Memory Statistics.""" + mock_db.get_table.return_value = {"memory_statistics": {}} + runner = CliRunner() + retention_period_value = 30 + + with patch("click.echo") as mock_echo: + result = runner.invoke(memory_statistics_retention_period, [str(retention_period_value)]) + assert result.exit_code == 0 + assert mock_echo.call_count == 2 + mock_db.mod_entry.assert_called_once_with( + "MEMORY_STATISTICS", "memory_statistics", + {"retention_period": retention_period_value} + ) + + +def test_memory_statistics_sampling_interval(mock_db): + """Test setting the sampling interval for Memory Statistics.""" + mock_db.get_table.return_value = {"memory_statistics": {}} + runner = CliRunner() + sampling_interval_value = 10 + + with patch("click.echo") as mock_echo: + result = runner.invoke(memory_statistics_sampling_interval, [str(sampling_interval_value)]) + assert result.exit_code == 0 + assert mock_echo.call_count == 2 + mock_db.mod_entry.assert_called_once_with( + "MEMORY_STATISTICS", "memory_statistics", + {"sampling_interval": sampling_interval_value} + ) + + +def test_check_memory_statistics_table_existence(): + """Test existence check for MEMORY_STATISTICS table.""" + assert check_memory_statistics_table_existence({"memory_statistics": {}}) is True + assert check_memory_statistics_table_existence({}) is False + + +def test_get_memory_statistics_table(mock_db): + """Test getting MEMORY_STATISTICS table.""" + mock_db.get_table.return_value = {"memory_statistics": {}} + + result = get_memory_statistics_table(mock_db) + assert result == {"memory_statistics": {}} + + +def test_abbreviation_group_get_command_existing_command(): + """Test AbbreviationGroup's get_command method with an existing command.""" + # Create an instance of AbbreviationGroup with a sample command. + group = AbbreviationGroup() + + # Invoke get_command with the name of the existing command. + command = group.get_command(ctx=None, cmd_name="existing_command") + + # Check that the correct command is returned. + assert command is None + + +def test_check_memory_statistics_table_existence_missing_key(): + """Test check_memory_statistics_table_existence when 'memory_statistics' key is missing.""" + with patch("click.echo") as mock_echo: + result = check_memory_statistics_table_existence({"another_key": {}}) + + # Ensure the function returns False when 'memory_statistics' key is missing. + assert result is False + + # Check that the specific error message was outputted. + mock_echo.assert_called_once_with( + "Unable to retrieve key 'memory_statistics' from MEMORY_STATISTICS table.", err=True + )