Skip to content

Commit

Permalink
Merge pull request #788 from skalenetwork/repair-sync-node
Browse files Browse the repository at this point in the history
Add sync-node repair command
  • Loading branch information
badrogger authored Oct 16, 2024
2 parents 3cdc4f6 + 110b498 commit ccbfa96
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 17 deletions.
43 changes: 42 additions & 1 deletion node_cli/cli/sync_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import click

from node_cli.core.node import init_sync, update_sync
from node_cli.core.node import init_sync, update_sync, repair_sync
from node_cli.utils.helper import (
abort_if_false,
error_exit,
Expand Down Expand Up @@ -95,3 +95,44 @@ def _init_sync(env_file, archive, catchup, historic_state, snapshot_from: Option
@streamed_cmd
def _update_sync(env_file, unsafe_ok):
update_sync(env_file)


@sync_node.command('repair', help='Start sync node from empty database')
@click.option(
    '--yes',
    is_flag=True,
    callback=abort_if_false,
    expose_value=False,
    prompt='Are you sure you want to start sync node from empty database?'
)
@click.option(
    '--archive',
    help=TEXTS['init']['archive'],
    is_flag=True
)
@click.option(
    '--catchup',
    help=TEXTS['init']['catchup'],
    is_flag=True
)
@click.option(
    '--historic-state',
    help=TEXTS['init']['historic_state'],
    is_flag=True
)
@click.option(
    '--snapshot-from',
    type=IP_TYPE,
    default=None,
    hidden=True,
    help='Ip of the node from to download snapshot from'
)
@streamed_cmd
def _repair_sync(
    archive: bool,
    catchup: bool,
    historic_state: bool,
    snapshot_from: Optional[str] = None
) -> None:
    """Click handler for the ``repair`` sub-command of the ``sync_node`` group.

    The ``--yes`` confirmation is consumed by its callback
    (``expose_value=False``), so it never reaches this function.

    Args:
        archive: Value of the ``--archive`` flag.
        catchup: Value of the ``--catchup`` flag.
        historic_state: Value of the ``--historic-state`` flag.
        snapshot_from: Value of the hidden ``--snapshot-from`` option, or
            ``None`` when it was not supplied.
    """
    # The three flags are declared with is_flag=True, so click passes booleans
    # here; the annotations now agree with core ``repair_sync``.
    repair_sync(
        archive=archive,
        catchup=catchup,
        historic_state=historic_state,
        snapshot_from=snapshot_from
    )
22 changes: 22 additions & 0 deletions node_cli/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
turn_on_op,
restore_op,
init_sync_op,
repair_sync_op,
update_sync_op
)
from node_cli.utils.print_formatters import (
Expand Down Expand Up @@ -234,6 +235,27 @@ def update_sync(env_filepath: str, unsafe_ok: bool = False) -> None:
logger.info('Node update finished')


@check_inited
@check_user
def repair_sync(
    archive: bool,
    catchup: bool,
    historic_state: bool,
    snapshot_from: str
) -> None:
    """Run the sync-node repair operation for the configured schain.

    The schain name is taken from the ``SCHAIN_NAME`` entry of the env params
    extracted from ``INIT_ENV_FILEPATH`` in sync-node mode; every other
    argument is forwarded unchanged to ``repair_sync_op``.

    Args:
        archive: Forwarded to the repair operation.
        catchup: Forwarded to the repair operation.
        historic_state: Forwarded to the repair operation.
        snapshot_from: Forwarded to the repair operation.
    """
    sync_env = extract_env_params(INIT_ENV_FILEPATH, sync_node=True)
    repair_sync_op(
        schain_name=sync_env['SCHAIN_NAME'],
        archive=archive,
        catchup=catchup,
        historic_state=historic_state,
        snapshot_from=snapshot_from
    )
    logger.info('Schain was started from scratch')


def get_node_env(
env_filepath,
inited_node=False,
Expand Down
59 changes: 53 additions & 6 deletions node_cli/core/schains.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import glob
import logging
import os
import pprint
Expand All @@ -11,7 +12,8 @@
ALLOCATION_FILEPATH,
NODE_CONFIG_PATH,
NODE_CLI_STATUS_FILENAME,
SCHAIN_NODE_DATA_PATH
SCHAIN_NODE_DATA_PATH,
SCHAINS_MNT_DIR_SYNC
)
from node_cli.configs.env import get_env_config

Expand Down Expand Up @@ -94,20 +96,37 @@ def get_node_cli_schain_status_filepath(schain_name: str) -> str:
return os.path.join(SCHAIN_NODE_DATA_PATH, schain_name, NODE_CLI_STATUS_FILENAME)


def update_node_cli_schain_status(
    schain_name: str,
    repair_ts: Optional[int] = None,
    snapshot_from: Optional[str] = None
) -> None:
    """Write the node-cli status file of a schain.

    If a status file already exists, its content is loaded and only the
    ``repair_ts`` and ``snapshot_from`` entries are overwritten; otherwise a
    fresh status dict is created. The parent directory is created on demand.

    Args:
        schain_name: Name of the schain whose status file is written.
        repair_ts: Value stored under ``repair_ts`` (``None`` is stored as-is).
        snapshot_from: Value stored under ``snapshot_from`` (``None`` is
            stored as-is).
    """
    path = get_node_cli_schain_status_filepath(schain_name)
    # The status path points at a JSON file, so the existence check must be
    # isfile(); isdir() would never match and the previous status would
    # always be discarded.
    if os.path.isfile(path):
        status = get_node_cli_schain_status(schain_name=schain_name)
        status.update({'repair_ts': repair_ts, 'snapshot_from': snapshot_from})
    else:
        status = {
            'schain_name': schain_name,
            'repair_ts': repair_ts,
            'snapshot_from': snapshot_from
        }
    os.makedirs(os.path.dirname(path), exist_ok=True)
    save_json(path, status)


def get_node_cli_schain_status(schain_name: str) -> dict:
    """Load and return the node-cli status JSON stored for ``schain_name``."""
    return read_json(get_node_cli_schain_status_filepath(schain_name))


def toggle_schain_repair_mode(
    schain: str,
    snapshot_from: Optional[str] = None
) -> None:
    """Mark a schain for repair by stamping its node-cli status file.

    Args:
        schain: Name of the schain to mark.
        snapshot_from: Stored under ``snapshot_from`` in the status file;
            ``None`` when no source was requested.
    """
    # The current unix time (whole seconds) is recorded as the repair timestamp.
    ts = int(time.time())
    # Use keyword arguments: the status helper takes the individual fields,
    # not a prebuilt status dict.
    update_node_cli_schain_status(schain_name=schain, repair_ts=ts, snapshot_from=snapshot_from)
    print('Schain has been set for repair')


Expand Down Expand Up @@ -168,6 +187,10 @@ def make_btrfs_snapshot(src: str, dst: str) -> None:
run_cmd(['btrfs', 'subvolume', 'snapshot', src, dst])


def rm_btrfs_subvolume(subvolume: str) -> None:
    """Run ``btrfs subvolume delete`` for the given subvolume path."""
    delete_cmd = ['btrfs', 'subvolume', 'delete', subvolume]
    run_cmd(delete_cmd)


def fillin_snapshot_folder(src_path: str, block_number: int) -> None:
snapshots_dirname = 'snapshots'
snapshot_folder_path = os.path.join(
Expand Down Expand Up @@ -224,3 +247,27 @@ def ensure_schain_volume(schain: str, schain_type: str, env_type: str) -> None:
ensure_volume(schain, size)
else:
logger.warning('Volume %s already exists', schain)


def cleanup_sync_datadir(schain_name: str, base_path: str = SCHAINS_MNT_DIR_SYNC) -> None:
    """Remove the whole data directory of a sync-node schain.

    Cleanup happens in three passes:

    1. every entry of ``<base_path>/<schain_name>`` except ``snapshots`` is
       removed;
    2. every directory matching ``snapshots/*/*`` is deleted through
       ``rm_btrfs_subvolume`` (plain files at that depth are unlinked);
    3. the remaining tree, including the schain directory itself, is removed.

    A missing schain directory is not an error.

    Args:
        schain_name: Name of the schain; used as the directory name.
        base_path: Directory that contains the per-schain data directories.
    """
    base_path = os.path.join(base_path, schain_name)
    snapshots_dirname = 'snapshots'

    logger.info('Removing regular folders')
    if os.path.isdir(base_path):
        # Compare names explicitly: a glob such as '[!snapshots]*' is a
        # character class and would skip every entry whose first character is
        # one of s, n, a, p, h, o, t.
        for entry in os.listdir(base_path):
            if entry == snapshots_dirname:
                continue
            filepath = os.path.join(base_path, entry)
            # rmtree refuses to operate on symlinks, so unlink those instead.
            if os.path.isdir(filepath) and not os.path.islink(filepath):
                logger.debug('Removing recursively %s', filepath)
                shutil.rmtree(filepath)
            else:
                os.remove(filepath)

    logger.info('Removing subvolumes')
    # Escape the base path so glob metacharacters in it are taken literally.
    subvolumes_pattern = os.path.join(glob.escape(base_path), snapshots_dirname, '*', '*')
    for filepath in glob.glob(subvolumes_pattern):
        logger.debug('Deleting subvolume %s', filepath)
        if os.path.isdir(filepath):
            rm_btrfs_subvolume(filepath)
        else:
            os.remove(filepath)

    # Note: this removes the whole schain directory, not only 'snapshots'.
    logger.info('Cleaning up snapshots folder')
    if os.path.isdir(base_path):
        shutil.rmtree(base_path)
3 changes: 2 additions & 1 deletion node_cli/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
update_sync as update_sync_op,
turn_off as turn_off_op,
turn_on as turn_on_op,
restore as restore_op
restore as restore_op,
repair_sync as repair_sync_op
)
31 changes: 29 additions & 2 deletions node_cli/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@
from node_cli.operations.skale_node import download_skale_node, sync_skale_node, update_images
from node_cli.core.checks import CheckType, run_checks as run_host_checks
from node_cli.core.iptables import configure_iptables
from node_cli.core.schains import update_node_cli_schain_status
from node_cli.core.schains import update_node_cli_schain_status, cleanup_sync_datadir
from node_cli.utils.docker_utils import (
compose_rm,
compose_up,
docker_cleanup,
remove_dynamic_containers
remove_dynamic_containers,
remove_schain_container,
start_admin,
stop_admin
)
from node_cli.utils.meta import get_meta_info, update_meta
from node_cli.utils.print_formatters import print_failed_requirements_checks
Expand Down Expand Up @@ -344,3 +347,27 @@ def restore(env, backup_path, config_only=False):
print_failed_requirements_checks(failed_checks)
return False
return True


def repair_sync(
    schain_name: str,
    archive: bool,
    catchup: bool,
    historic_state: bool,
    snapshot_from: Optional[str]
) -> None:
    """Restart a sync-node schain from an empty data directory.

    Steps, in order: stop the sync admin container, remove the schain
    container, wipe the schain data directory, update the node options,
    record ``snapshot_from`` in the node-cli status file and start the sync
    admin container again.

    Args:
        schain_name: Name of the schain to repair.
        archive: New value for ``NodeOptions.archive``.
        catchup: New value for ``NodeOptions.catchup``.
        historic_state: New value for ``NodeOptions.historic_state``.
        snapshot_from: Stored in the schain's node-cli status file; may be
            ``None``.
    """
    stop_admin(sync_node=True)
    remove_schain_container(schain_name=schain_name)

    logger.info('Cleaning up sync node data directory')
    cleanup_sync_datadir(schain_name=schain_name)

    logger.info('Updating node options')
    # NOTE(review): presumably NodeOptions persists attribute assignments;
    # confirm against its definition.
    node_options = NodeOptions()
    node_options.archive = archive
    node_options.catchup = catchup
    node_options.historic_state = historic_state

    logger.info('Updating cli status')
    update_node_cli_schain_status(schain_name, snapshot_from=snapshot_from)
    start_admin(sync_node=True)
50 changes: 50 additions & 0 deletions node_cli/utils/docker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import itertools
import os
import logging
from typing import Optional

import docker
from docker.client import DockerClient
Expand All @@ -39,6 +40,7 @@

logger = logging.getLogger(__name__)

ADMIN_REMOVE_TIMEOUT = 60
SCHAIN_REMOVE_TIMEOUT = 300
IMA_REMOVE_TIMEOUT = 20
TELEGRAF_REMOVE_TIMEOUT = 20
Expand Down Expand Up @@ -131,6 +133,54 @@ def safe_rm(container: Container, timeout=DOCKER_DEFAULT_STOP_TIMEOUT, **kwargs)
logger.info(f'Container removed: {container_name}')


def stop_container(
    container_name: str,
    timeout: int = DOCKER_DEFAULT_STOP_TIMEOUT,
    dclient: Optional[DockerClient] = None
) -> None:
    """Stop the container with the given name.

    Args:
        container_name: Name used to look the container up.
        timeout: Passed as ``timeout`` to the container's ``stop`` call.
        dclient: Docker client to use; when falsy a client is obtained from
            ``docker_client()``.
    """
    if not dclient:
        dclient = docker_client()
    target = dclient.containers.get(container_name)
    logger.info('Stopping container: %s, timeout: %s', container_name, timeout)
    target.stop(timeout=timeout)


def rm_container(
    container_name: str,
    timeout: int = DOCKER_DEFAULT_STOP_TIMEOUT,
    dclient: Optional[DockerClient] = None
) -> None:
    """Remove the named container if it is among the known containers.

    Nothing happens when no container returned by ``get_containers()`` has
    this name.

    Args:
        container_name: Name of the container to remove.
        timeout: Forwarded to ``safe_rm`` as its ``timeout`` argument.
        dclient: Docker client to use; when falsy a client is obtained from
            ``docker_client()``.
    """
    dc = dclient or docker_client()
    known_names = {container.name for container in get_containers()}
    if container_name not in known_names:
        return
    container = dc.containers.get(container_name)
    # Forward the caller's timeout; previously it was accepted but ignored,
    # so safe_rm always ran with its own default.
    safe_rm(container, timeout=timeout)


def start_container(
    container_name: str,
    dclient: Optional[DockerClient] = None
) -> None:
    """Start the container with the given name.

    Args:
        container_name: Name used to look the container up.
        dclient: Docker client to use; when falsy a client is obtained from
            ``docker_client()``.
    """
    if not dclient:
        dclient = docker_client()
    target = dclient.containers.get(container_name)
    logger.info('Starting container %s', container_name)
    target.start()


def start_admin(sync_node: bool = False, dclient: Optional[DockerClient] = None) -> None:
    """Start the admin container.

    The ``skale_sync_admin`` container is started when ``sync_node`` is true,
    ``skale_admin`` otherwise.
    """
    if sync_node:
        admin_name = 'skale_sync_admin'
    else:
        admin_name = 'skale_admin'
    start_container(container_name=admin_name, dclient=dclient)


def stop_admin(sync_node: bool = False, dclient: Optional[DockerClient] = None) -> None:
    """Stop the admin container using ``ADMIN_REMOVE_TIMEOUT`` as the timeout.

    The ``skale_sync_admin`` container is stopped when ``sync_node`` is true,
    ``skale_admin`` otherwise.
    """
    if sync_node:
        admin_name = 'skale_sync_admin'
    else:
        admin_name = 'skale_admin'
    stop_container(
        container_name=admin_name,
        timeout=ADMIN_REMOVE_TIMEOUT,
        dclient=dclient
    )


def remove_schain_container(schain_name: str, dclient: Optional[DockerClient] = None) -> None:
    """Remove the ``skale_schain_<schain_name>`` container via ``rm_container``.

    ``SCHAIN_REMOVE_TIMEOUT`` is passed as the removal timeout.
    """
    rm_container(
        f'skale_schain_{schain_name}',
        timeout=SCHAIN_REMOVE_TIMEOUT,
        dclient=dclient
    )


def backup_container_logs(
container: Container,
head: int = DOCKER_DEFAULT_HEAD_LINES,
Expand Down
13 changes: 11 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from node_cli.utils.docker_utils import docker_client
from node_cli.utils.global_config import generate_g_config_file

from tests.helper import TEST_META_V1, TEST_META_V2, TEST_META_V3
from tests.helper import TEST_META_V1, TEST_META_V2, TEST_META_V3, TEST_SCHAINS_MNT_DIR_SYNC


TEST_ENV_PARAMS = """
Expand Down Expand Up @@ -307,8 +307,17 @@ def tmp_config_dir():

@pytest.fixture
def tmp_schains_dir():
    """Provide ``SCHAIN_NODE_DATA_PATH`` as a directory for the test.

    The directory is created if missing and removed recursively on teardown.
    """
    # exist_ok=True keeps setup idempotent when the directory is already there.
    os.makedirs(SCHAIN_NODE_DATA_PATH, exist_ok=True)
    try:
        yield SCHAIN_NODE_DATA_PATH
    finally:
        shutil.rmtree(SCHAIN_NODE_DATA_PATH)


@pytest.fixture
def tmp_sync_datadir():
    """Provide ``TEST_SCHAINS_MNT_DIR_SYNC`` as a directory for the test.

    The directory is created if missing and removed recursively on teardown.
    """
    sync_dir = TEST_SCHAINS_MNT_DIR_SYNC
    os.makedirs(sync_dir, exist_ok=True)
    try:
        yield sync_dir
    finally:
        shutil.rmtree(sync_dir)
13 changes: 11 additions & 2 deletions tests/core/core_node_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from node_cli.configs import NODE_DATA_PATH
from node_cli.configs.resource_allocation import RESOURCE_ALLOCATION_FILEPATH
from node_cli.core.node import BASE_CONTAINERS_AMOUNT, is_base_containers_alive
from node_cli.core.node import init, pack_dir, update, is_update_safe
from node_cli.core.node import init, pack_dir, update, is_update_safe, repair_sync

from tests.helper import response_mock, safe_update_api_response, subprocess_run_mock
from tests.resources_test import BIG_DISK_SIZE
Expand Down Expand Up @@ -169,7 +169,9 @@ def test_update_node(mocked_g_config, resource_file):
), mock.patch('node_cli.core.resources.get_disk_size', return_value=BIG_DISK_SIZE), mock.patch(
'node_cli.core.host.init_data_dir'
):
with mock.patch('node_cli.utils.helper.requests.get', return_value=safe_update_api_response()): # noqa
with mock.patch(
'node_cli.utils.helper.requests.get', return_value=safe_update_api_response()
): # noqa
result = update(env_filepath, pull_config_for_schain=None)
assert result is None

Expand All @@ -183,3 +185,10 @@ def test_is_update_safe():
'node_cli.utils.helper.requests.get', return_value=safe_update_api_response(safe=False)
):
assert not is_update_safe()


def test_repair_sync(tmp_sync_datadir, mocked_g_config, resource_file):
    """Smoke test: ``repair_sync`` runs with btrfs and docker start/stop patched out."""
    with mock.patch('node_cli.core.schains.rm_btrfs_subvolume'):
        with mock.patch('node_cli.utils.docker_utils.stop_container'):
            with mock.patch('node_cli.utils.docker_utils.start_container'):
                repair_sync(
                    archive=True,
                    catchup=True,
                    historic_state=True,
                    snapshot_from='127.0.0.1'
                )
Loading

0 comments on commit ccbfa96

Please sign in to comment.