Skip to content

Commit a53f8a8

Browse files
committed
Cleanup local logs after cloud compute log manager is done uploading them to cloud stoage
Fixed an issue where schedule > Insert changelog entry or delete this section.
1 parent 9ce5277 commit a53f8a8

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
from abc import abstractmethod
66
from collections import defaultdict
77
from contextlib import contextmanager
8+
import sys
89
from typing import IO, Iterator, Optional, Sequence, Tuple
10+
from dagster._utils.error import serializable_error_info_from_exc_info
11+
912

1013
from dagster._core.instance import T_DagsterInstance
1114
from dagster._core.storage.compute_log_manager import (
@@ -87,6 +90,12 @@ def open_log_stream(
8790
def _on_capture_complete(self, log_key: Sequence[str]):
8891
self.upload_to_cloud_storage(log_key, ComputeIOType.STDOUT)
8992
self.upload_to_cloud_storage(log_key, ComputeIOType.STDERR)
93+
try:
94+
self.local_manager.delete_logs(log_key=log_key)
95+
except Exception as e:
96+
sys.stderr.write(
97+
f"Exception deleting local logs after capture complete: {serializable_error_info_from_exc_info(sys.exc_info())}\n"
98+
)
9099

91100
def is_capture_complete(self, log_key: Sequence[str]) -> bool:
92101
if self.local_manager.is_capture_complete(log_key):

python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_compute_log_manager.py

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@ def simple():
6969
event = capture_events[0]
7070
file_key = event.logs_captured_data.file_key
7171
log_key = manager.build_log_key_for_run(result.run_id, file_key)
72+
73+
# verify locally cached logs are deleted after they are captured
74+
local_path = manager._local_manager.get_captured_local_path( # noqa: SLF001
75+
log_key, IO_TYPE_EXTENSION[ComputeIOType.STDOUT]
76+
)
77+
assert not os.path.exists(local_path)
78+
7279
log_data = manager.get_log_data(log_key)
7380
stdout = log_data.stdout.decode("utf-8") # pyright: ignore[reportOptionalMemberAccess]
7481
assert stdout == HELLO_WORLD + SEPARATOR
@@ -85,16 +92,11 @@ def simple():
8592
for expected in EXPECTED_LOGS:
8693
assert expected in stderr_s3
8794

88-
# Check download behavior by deleting locally cached logs
89-
local_dir = os.path.dirname(
90-
manager._local_manager.get_captured_local_path( # noqa: SLF001
91-
log_key, IO_TYPE_EXTENSION[ComputeIOType.STDOUT]
92-
)
93-
)
94-
for filename in os.listdir(local_dir):
95-
os.unlink(os.path.join(local_dir, filename))
96-
9795
log_data = manager.get_log_data(log_key)
96+
97+
# Re-downloads the data to the local filesystem again
98+
assert os.path.exists(local_path)
99+
98100
stdout = log_data.stdout.decode("utf-8") # pyright: ignore[reportOptionalMemberAccess]
99101
assert stdout == HELLO_WORLD + SEPARATOR
100102

0 commit comments

Comments
 (0)