From d05a064649890fd931e5980a766f5619269e7ce7 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 15 Oct 2025 14:04:27 +0000 Subject: [PATCH] update snapshotter tests --- tests/unit/_autoscaling/test_snapshotter.py | 316 +++++++++++--------- 1 file changed, 168 insertions(+), 148 deletions(-) diff --git a/tests/unit/_autoscaling/test_snapshotter.py b/tests/unit/_autoscaling/test_snapshotter.py index cf6682bf31..06a5682a32 100644 --- a/tests/unit/_autoscaling/test_snapshotter.py +++ b/tests/unit/_autoscaling/test_snapshotter.py @@ -2,36 +2,59 @@ from datetime import datetime, timedelta, timezone from logging import getLogger -from typing import cast +from typing import TYPE_CHECKING, cast from unittest.mock import MagicMock import pytest from crawlee import service_locator from crawlee._autoscaling import Snapshotter -from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, EventLoopSnapshot, Snapshot +from crawlee._autoscaling._types import ClientSnapshot, CpuSnapshot, MemorySnapshot from crawlee._autoscaling.snapshotter import SortedSnapshotList from crawlee._utils.byte_size import ByteSize from crawlee._utils.system import CpuInfo, MemoryInfo from crawlee.configuration import Configuration +from crawlee.events import LocalEventManager from crawlee.events._types import Event, EventSystemInfoData +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + @pytest.fixture -def snapshotter() -> Snapshotter: +async def event_manager() -> AsyncGenerator[LocalEventManager, None]: + # Use a long interval to avoid interference from periodic system info events during tests + async with LocalEventManager(system_info_interval=timedelta(hours=9999)) as event_manager: + yield event_manager + + +@pytest.fixture +async def snapshotter(event_manager: LocalEventManager) -> AsyncGenerator[Snapshotter, None]: config = Configuration(available_memory_ratio=0.25) - return Snapshotter.from_config(config) + service_locator.set_event_manager(event_manager) + async with Snapshotter.from_config(config) as snapshotter: + yield snapshotter @pytest.fixture -def event_system_data_info() -> EventSystemInfoData: +def default_cpu_info() -> CpuInfo: + return CpuInfo(used_ratio=0.5) + + +@pytest.fixture +def default_memory_info() -> MemoryInfo: + return MemoryInfo( + total_size=ByteSize.from_gb(8), + current_size=ByteSize.from_gb(4), + system_wide_used_size=ByteSize.from_gb(5), + ) + + +@pytest.fixture +def event_system_data_info(default_cpu_info: CpuInfo, default_memory_info: MemoryInfo) -> EventSystemInfoData: return EventSystemInfoData( - cpu_info=CpuInfo(used_ratio=0.5), - memory_info=MemoryInfo( - total_size=ByteSize.from_gb(8), - current_size=ByteSize.from_gb(4), - system_wide_used_size=ByteSize.from_gb(5), - ), + cpu_info=default_cpu_info, + memory_info=default_memory_info, ) @@ -42,19 +65,29 @@ async def test_start_stop_lifecycle() -> None: pass -def test_snapshot_cpu(snapshotter: Snapshotter, event_system_data_info: EventSystemInfoData) -> None: - snapshotter._snapshot_cpu(event_system_data_info) - assert len(snapshotter._cpu_snapshots) == 1 - assert snapshotter._cpu_snapshots[0].used_ratio == event_system_data_info.cpu_info.used_ratio +async def test_snapshot_cpu( + snapshotter: Snapshotter, event_system_data_info: EventSystemInfoData, event_manager: LocalEventManager +) -> None: + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_system_data_info) + await event_manager.wait_for_all_listeners_to_complete() + cpu_snapshots = cast('list[CpuSnapshot]', snapshotter.get_cpu_sample()) + assert len(cpu_snapshots) == 1 + assert cpu_snapshots[0].used_ratio == event_system_data_info.cpu_info.used_ratio -def test_snapshot_memory(snapshotter: Snapshotter, event_system_data_info: EventSystemInfoData) -> None: - snapshotter._snapshot_memory(event_system_data_info) - assert len(snapshotter._memory_snapshots) == 1 - assert snapshotter._memory_snapshots[0].current_size == event_system_data_info.memory_info.current_size +async def test_snapshot_memory( + snapshotter: Snapshotter, event_system_data_info: EventSystemInfoData, event_manager: LocalEventManager +) -> None: + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_system_data_info) + await event_manager.wait_for_all_listeners_to_complete() + memory_snapshots = cast('list[MemorySnapshot]', snapshotter.get_memory_sample()) + assert len(memory_snapshots) == 1 + assert memory_snapshots[0].current_size == event_system_data_info.memory_info.current_size -def test_snapshot_memory_with_memory_info_sets_system_wide_fields(snapshotter: Snapshotter) -> None: +async def test_snapshot_memory_with_memory_info_sets_system_wide_fields( + snapshotter: Snapshotter, event_manager: LocalEventManager +) -> None: memory_info = MemoryInfo( total_size=ByteSize.from_gb(16), current_size=ByteSize.from_gb(4), @@ -66,10 +99,13 @@ def test_snapshot_memory_with_memory_info_sets_system_wide_fields(snapshotter: S memory_info=memory_info, ) - snapshotter._snapshot_memory(event_data) + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) + await event_manager.wait_for_all_listeners_to_complete() + + memory_snapshots = cast('list[MemorySnapshot]', snapshotter.get_memory_sample()) - assert len(snapshotter._memory_snapshots) == 1 - memory_snapshot = snapshotter._memory_snapshots[0] + assert len(memory_snapshots) == 1 + memory_snapshot = memory_snapshots[0] # Test that system-wide fields are properly set assert memory_snapshot.system_wide_used_size == memory_info.system_wide_used_size @@ -77,19 +113,15 @@ def test_snapshot_memory_with_memory_info_sets_system_wide_fields(snapshotter: S def test_snapshot_event_loop(snapshotter: Snapshotter) -> None: - snapshotter._event_loop_snapshots = Snapshotter._get_sorted_list_by_created_at( - [ - EventLoopSnapshot(delay=timedelta(milliseconds=100), max_delay=timedelta(milliseconds=500)), - ] - ) - - snapshotter._snapshot_event_loop() - assert len(snapshotter._event_loop_snapshots) == 2 + # A first event loop snapshot is created when an instance is created. + event_loop_snapshots = snapshotter.get_event_loop_sample() + assert len(event_loop_snapshots) == 1 def test_snapshot_client(snapshotter: Snapshotter) -> None: - snapshotter._snapshot_client() - assert len(snapshotter._client_snapshots) == 1 + # A first client snapshot is created when an instance is created. + client_snapshots = snapshotter.get_client_sample() + assert len(client_snapshots) == 1 def test_snapshot_client_overloaded() -> None: @@ -99,35 +131,42 @@ def test_snapshot_client_overloaded() -> None: assert ClientSnapshot(error_count=7, new_error_count=3, max_error_count=2).is_overloaded -async def test_get_cpu_sample(snapshotter: Snapshotter) -> None: +async def test_get_cpu_sample( + snapshotter: Snapshotter, event_manager: LocalEventManager, default_memory_info: MemoryInfo +) -> None: now = datetime.now(timezone.utc) - cpu_snapshots = Snapshotter._get_sorted_list_by_created_at( - [ - CpuSnapshot(used_ratio=0.5, max_used_ratio=0.95, created_at=now - timedelta(hours=delta)) - for delta in range(5, 0, -1) - ] - ) - snapshotter._cpu_snapshots = cpu_snapshots + snapshotter._SNAPSHOT_HISTORY = timedelta(hours=10) # Extend history for testing - async with snapshotter: - # When no sample duration is provided it should return all snapshots - samples = snapshotter.get_cpu_sample() - assert len(samples) == len(cpu_snapshots) + events_data = [ + EventSystemInfoData( + cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)), + memory_info=default_memory_info, + ) + for delta in range(5, 0, -1) + ] + for event_data in events_data: + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) + await event_manager.wait_for_all_listeners_to_complete() + + # When no sample duration is provided it should return all snapshots + samples = snapshotter.get_cpu_sample() + assert len(samples) == len(events_data) - duration = timedelta(hours=0.5) - samples = snapshotter.get_cpu_sample(duration) - assert len(samples) == 1 + duration = timedelta(hours=0.5) + samples = snapshotter.get_cpu_sample(duration) + assert len(samples) == 1 - duration = timedelta(hours=2.5) - samples = snapshotter.get_cpu_sample(duration) - assert len(samples) == 3 + duration = timedelta(hours=2.5) + samples = snapshotter.get_cpu_sample(duration) + assert len(samples) == 3 - duration = timedelta(hours=10) - samples = snapshotter.get_cpu_sample(duration) - assert len(samples) == len(cpu_snapshots) + duration = timedelta(hours=10) + samples = snapshotter.get_cpu_sample(duration) + assert len(samples) == len(events_data) -async def test_methods_raise_error_when_not_active(snapshotter: Snapshotter) -> None: +async def test_methods_raise_error_when_not_active() -> None: + snapshotter = Snapshotter.from_config(Configuration(available_memory_ratio=0.25)) assert snapshotter.active is False with pytest.raises(RuntimeError, match=r'Snapshotter is not active.'): @@ -155,119 +194,105 @@ async def test_methods_raise_error_when_not_active(snapshotter: Snapshotter) -> assert snapshotter.active is True -def test_snapshot_pruning_removes_outdated_records(snapshotter: Snapshotter) -> None: +async def test_snapshot_pruning_removes_outdated_records( + snapshotter: Snapshotter, event_manager: LocalEventManager, default_memory_info: MemoryInfo +) -> None: # Set the snapshot history to 2 hours snapshotter._SNAPSHOT_HISTORY = timedelta(hours=2) # Create timestamps for testing now = datetime.now(timezone.utc) - two_hours_ago = now - timedelta(hours=2) - three_hours_ago = now - timedelta(hours=3) - five_hours_ago = now - timedelta(hours=5) - - # Create mock snapshots with varying creation times - snapshots = Snapshotter._get_sorted_list_by_created_at( - [ - CpuSnapshot(used_ratio=0.5, max_used_ratio=0.95, created_at=five_hours_ago), - CpuSnapshot(used_ratio=0.6, max_used_ratio=0.95, created_at=three_hours_ago), - CpuSnapshot(used_ratio=0.7, max_used_ratio=0.95, created_at=two_hours_ago), - CpuSnapshot(used_ratio=0.8, max_used_ratio=0.95, created_at=now), - ] - ) - - # Assign these snapshots to one of the lists (e.g., CPU snapshots) - snapshotter._cpu_snapshots = snapshots - - # Prune snapshots older than 2 hours - snapshots_casted = cast('list[Snapshot]', snapshotter._cpu_snapshots) - snapshotter._prune_snapshots(snapshots_casted, now) - - # Check that only the last two snapshots remain - assert len(snapshotter._cpu_snapshots) == 2 - assert snapshotter._cpu_snapshots[0].created_at == two_hours_ago - assert snapshotter._cpu_snapshots[1].created_at == now + events_data = [ + EventSystemInfoData( + cpu_info=CpuInfo(used_ratio=0.5, created_at=now - timedelta(hours=delta)), + memory_info=default_memory_info, + ) + for delta in [5, 3, 2, 0] + ] -def test_pruning_empty_snapshot_list_remains_empty(snapshotter: Snapshotter) -> None: - now = datetime.now(timezone.utc) - snapshotter._cpu_snapshots = Snapshotter._get_sorted_list_by_created_at(list[CpuSnapshot]()) - snapshots_casted = cast('list[Snapshot]', snapshotter._cpu_snapshots) - snapshotter._prune_snapshots(snapshots_casted, now) - assert snapshotter._cpu_snapshots == [] - - -def test_snapshot_pruning_keeps_recent_records_unaffected(snapshotter: Snapshotter) -> None: - snapshotter._SNAPSHOT_HISTORY = timedelta(hours=2) - - # Create timestamps for testing - now = datetime.now(timezone.utc) - one_hour_ago = now - timedelta(hours=1) - - # Create mock snapshots with varying creation times - snapshots = Snapshotter._get_sorted_list_by_created_at( - [ - CpuSnapshot(used_ratio=0.7, max_used_ratio=0.95, created_at=one_hour_ago), - CpuSnapshot(used_ratio=0.8, max_used_ratio=0.95, created_at=now), - ] - ) - - # Assign these snapshots to one of the lists (e.g., CPU snapshots) - snapshotter._cpu_snapshots = snapshots + for event_data in events_data: + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) + await event_manager.wait_for_all_listeners_to_complete() - # Prune snapshots older than 2 hours - snapshots_casted = cast('list[Snapshot]', snapshotter._cpu_snapshots) - snapshotter._prune_snapshots(snapshots_casted, now) + cpu_snapshots = cast('list[CpuSnapshot]', snapshotter.get_cpu_sample()) # Check that only the last two snapshots remain - assert len(snapshotter._cpu_snapshots) == 2 - assert snapshotter._cpu_snapshots[0].created_at == one_hour_ago - assert snapshotter._cpu_snapshots[1].created_at == now + assert len(cpu_snapshots) == 2 + assert cpu_snapshots[0].created_at == now - timedelta(hours=2) + assert cpu_snapshots[1].created_at == now -def test_memory_load_evaluation_logs_warning_on_high_usage(caplog: pytest.LogCaptureFixture) -> None: - config = Configuration(memory_mbytes=ByteSize.from_gb(8).to_mb()) +async def test_memory_load_evaluation_logs_warning_on_high_usage( + caplog: pytest.LogCaptureFixture, + event_manager: LocalEventManager, + default_cpu_info: CpuInfo, +) -> None: + config = Configuration(memory_mbytes=8192) + service_locator.set_event_manager(event_manager) snapshotter = Snapshotter.from_config(config) high_memory_usage = ByteSize.from_gb(8) * 0.95 # 95% of 8 GB - snapshotter._evaluate_memory_load( - current_memory_usage_size=high_memory_usage, - snapshot_timestamp=datetime.now(timezone.utc), + event_data = EventSystemInfoData( + cpu_info=default_cpu_info, + memory_info=MemoryInfo( + total_size=ByteSize.from_gb(8), + current_size=high_memory_usage, + system_wide_used_size=ByteSize.from_gb(7), + ), ) - assert len(caplog.records) == 1 - assert caplog.records[0].levelname.lower() == 'warning' - assert 'Memory is critically overloaded' in caplog.records[0].msg + async with snapshotter: + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) + await event_manager.wait_for_all_listeners_to_complete() - # It should not log again, since the last log was short time ago - snapshotter._evaluate_memory_load( - current_memory_usage_size=high_memory_usage, - snapshot_timestamp=datetime.now(timezone.utc), - ) + # Filter log records to only include those from snapshotter + log_records = [record for record in caplog.records if 'snapshotter' in record.pathname.lower()] + + assert len(log_records) == 1 + assert log_records[0].levelname.lower() == 'warning' + assert 'Memory is critically overloaded' in log_records[0].msg + + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) + await event_manager.wait_for_all_listeners_to_complete() + + log_records = [record for record in caplog.records if 'snapshotter' in record.pathname.lower()] - assert len(caplog.records) == 1 + assert len(log_records) == 1 -def test_memory_load_evaluation_silent_on_acceptable_usage( +async def test_memory_load_evaluation_silent_on_acceptable_usage( monkeypatch: pytest.MonkeyPatch, - snapshotter: Snapshotter, + event_manager: LocalEventManager, + default_cpu_info: CpuInfo, ) -> None: mock_logger_warn = MagicMock() monkeypatch.setattr(getLogger('crawlee.autoscaling.snapshotter'), 'warning', mock_logger_warn) - snapshotter._max_memory_size = ByteSize.from_gb(8) + + service_locator.set_event_manager(event_manager) + snapshotter = Snapshotter.from_config(Configuration(memory_mbytes=8192)) low_memory_usage = ByteSize.from_gb(8) * 0.8 # 80% of 8 GB - snapshotter._evaluate_memory_load( - current_memory_usage_size=low_memory_usage, - snapshot_timestamp=datetime.now(timezone.utc), + event_data = EventSystemInfoData( + cpu_info=default_cpu_info, + memory_info=MemoryInfo( + total_size=ByteSize.from_gb(8), + current_size=low_memory_usage, + system_wide_used_size=ByteSize.from_gb(7), + ), ) - assert mock_logger_warn.call_count == 0 + async with snapshotter: + event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_data) + await event_manager.wait_for_all_listeners_to_complete() + + assert mock_logger_warn.call_count == 0 -async def test_snapshots_time_ordered(snapshotter: Snapshotter) -> None: +async def test_snapshots_time_ordered(snapshotter: Snapshotter, event_manager: LocalEventManager) -> None: # All internal snapshot list should be ordered by creation time in ascending order. # Scenario where older emitted event arrives after newer event. # Snapshotter should not trust the event order and check events' times. @@ -285,21 +310,16 @@ def create_event_data(creation_time: datetime) -> EventSystemInfoData: ), ) - async with ( - service_locator.get_event_manager() as event_manager, - snapshotter, - ): - event_manager.emit(event=Event.SYSTEM_INFO, event_data=create_event_data(time_new)) - await event_manager.wait_for_all_listeners_to_complete() - event_manager.emit(event=Event.SYSTEM_INFO, event_data=create_event_data(time_old)) - await event_manager.wait_for_all_listeners_to_complete() + event_manager.emit(event=Event.SYSTEM_INFO, event_data=create_event_data(time_new)) + event_manager.emit(event=Event.SYSTEM_INFO, event_data=create_event_data(time_old)) + await event_manager.wait_for_all_listeners_to_complete() - memory_samples = snapshotter.get_memory_sample() - cpu_samples = snapshotter.get_cpu_sample() - assert memory_samples[0].created_at == time_old - assert cpu_samples[0].created_at == time_old - assert memory_samples[1].created_at == time_new - assert cpu_samples[1].created_at == time_new + memory_samples = snapshotter.get_memory_sample() + cpu_samples = snapshotter.get_cpu_sample() + assert memory_samples[0].created_at == time_old + assert cpu_samples[0].created_at == time_old + assert memory_samples[1].created_at == time_new + assert cpu_samples[1].created_at == time_new def test_sorted_snapshot_list_add_maintains_order() -> None: