
Commit 6c43fc4
MFGs, MTempFGs and SharedMFGs now keep the order of filenames passed in when calling their callback

This fixes a long-standing gotcha when doing
def do_it(files):
    files[0].write_text(...)

and files[0] not being the first file passed to
MultiFileGeneratingJob(['b','B1']), but the alphabetically
first one.

The long-term fix, of course, is to use
MultiFileGeneratingJob({'key': 'b', 'other_key': 'B1'}) instead.
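
A minimal sketch of both forms (assuming the usual 'import pypipegraph2 as ppg',
an initialized pipegraph, e.g. via ppg.new(), and that the dict form hands the
callback a key-to-Path mapping; the callback names and the 'c'/'C1' filenames
are illustrative):

import pypipegraph2 as ppg

def do_it(files):
    # Since this commit, files arrive in the order they were passed in:
    assert files[0].name == "b"  # previously "B1", the alphabetically first
    files[0].write_text("first")
    files[1].write_text("second")

ppg.MultiFileGeneratingJob(["b", "B1"], do_it)

def do_it_by_key(files):
    # The dict form sidesteps ordering entirely - access is by key:
    files["key"].write_text("first")
    files["other_key"].write_text("second")

ppg.MultiFileGeneratingJob({"key": "c", "other_key": "C1"}, do_it_by_key)
ppg.run()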
TyberiusPrime committed May 3, 2024
1 parent a6a3a9f commit 6c43fc4
Showing 6 changed files with 87 additions and 31 deletions.
4 changes: 1 addition & 3 deletions python/pypipegraph2/graph.py
@@ -30,8 +30,6 @@
escape_logging,
)
from . import util
-from rich.logging import RichHandler
-from rich.console import Console


logger.level("JT", no=6, color="<yellow>", icon="🐍")
@@ -239,7 +237,7 @@ def smart_format(x):
str(next_number)
)
log_position_lookup_file.write(
"{} | {:>15}:{:>4} | {}\n".format(
"{} | {:>15}:{:>4} | {}\n".format(
log_position_lookup[key],
x["file"],
x["line"],
50 changes: 38 additions & 12 deletions python/pypipegraph2/jobs.py
@@ -616,7 +616,13 @@ def __getitem__(self, key):
raise ValueError(
f"{self.job_id} has no lookup dictionary - files was not a dict, and key was not an integer index(into files)"
)
-return self._map_filename(self._lookup[key])
+try:
+    query = self._lookup[key]
+except TypeError:
+    raise ValueError(
+        "Can not access a MultiFileGeneratingJob with a string if it was passed a list of files to generate. Either use an integer index, or pass in a dict"
+    )
+return self._map_filename(query)

@staticmethod
def _validate_func_argument(func):
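
Sketched in isolation, the new guard above turns the old TypeError into a
telling ValueError (a sketch, assuming integer indices keep working on
list-based jobs as the error message suggests; job setup is illustrative):

import pypipegraph2 as ppg

job = ppg.MultiFileGeneratingJob(
    ["b", "B1"], lambda ofs: [of.write_text("x") for of in ofs]
)
job[0]  # fine: integer index, in passed-in order as of this commit
try:
    job["b"]  # string key, but files was a list
except ValueError:
    pass  # "...use an integer index, or pass in a dict"

dict_job = ppg.MultiFileGeneratingJob(
    {"key": "c"}, lambda ofs: ofs["key"].write_text("x")
)
dict_job["key"]  # fine: dict-based jobs resolve string keys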
@@ -664,6 +670,10 @@ def _validate_files_argument(files, allow_absolute=False):
abs_files = [_normalize_path(x) for x in path_files]
if lookup:
lookup = {lookup[ii]: abs_files[ii] for ii in range(len(lookup))}
+else:
+    temp = sorted([(f, ii) for (ii, f) in enumerate(abs_files)])
+    temp = {ii: f for (f, ii) in temp}
+    lookup = [temp[ii] for ii in range(len(temp))]
return sorted(abs_files), lookup

def readd(self):
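
In isolation, the added branch builds an order-preserving lookup like this
(a sketch; plain strings stand in for the normalized Path objects):

abs_files = ["b", "B1"]  # normalized files, in the order the caller passed
# Pair each file with its original position and sort by filename,
# mirroring the sorted(abs_files) returned alongside the lookup.
temp = sorted([(f, ii) for (ii, f) in enumerate(abs_files)])  # [("B1", 1), ("b", 0)]
# Invert into original-position -> file ...
temp = {ii: f for (f, ii) in temp}  # {1: "B1", 0: "b"}
# ... so the lookup lists the files in passed-in order,
# while the job itself keeps storing them sorted.
lookup = [temp[ii] for ii in range(len(temp))]
assert lookup == ["b", "B1"]  # caller's order
assert sorted(abs_files) == ["B1", "b"]  # storage order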
@@ -760,7 +770,8 @@ def run(self, runner, historical_output): # noqa:C901
"w+",
)
exception_out = open(
runner.job_graph.dir_config.run_dir / f"{self.job_number}.exception", "w+b"
runner.job_graph.dir_config.run_dir / f"{self.job_number}.exception",
"w+b",
) # note the binary!

def aborted(sig, stack):
@@ -2405,10 +2416,17 @@ def __init__(
(self.output_dir_prefix / "__never_placed_here__" / f) for f in self.files
]
if self._lookup:
-self._lookup = {
-    k: self.org_files[self.files.index(v)]
-    for (k, v) in self._lookup.items()
-}
+if isinstance(self._lookup, list):
+    self._lookup_for_org_order = [
+        (self.output_dir_prefix / "__never_placed_here__" / f)
+        for f in self._lookup
+    ]
+    self._lookup = None
+else:
+    self._lookup = {
+        k: self.org_files[self.files.index(v)]
+        for (k, v) in self._lookup.items()
+    }
self.files = self.org_files[:]

if len(self.files) != len(set(self.files)):
@@ -2557,6 +2575,8 @@ def run(self, runner, historical_output):
# self._raise_partial_result_exception()
missing = [x for x in fns if not x.exists()]
if missing: # pragma: no cover - defensive
+for mm in sorted(missing):
+    log_error(f"Job '{self.job_id}' - missing output: '{mm}'")
raise ValueError(
"missing output files - did somebody go and delete them?!", missing
)
@@ -2633,7 +2653,8 @@ def run(self, runner, historical_output):
if did_build or not self.job_id in runner.history:
self._cleanup(runner)
lock_file = (
-    global_pipegraph.dir_config.history_dir / SharedMultiFileGeneratingJob.log_filename
+    global_pipegraph.dir_config.history_dir
+    / SharedMultiFileGeneratingJob.log_filename
).with_suffix(".lock")
lock = filelock.FileLock(lock_file, timeout=random.randint(8, 20))
try:
@@ -2656,7 +2677,10 @@ def _log_local_usage(self, key):
and find the files"""
from . import global_pipegraph

-fn = global_pipegraph.dir_config.history_dir / SharedMultiFileGeneratingJob.log_filename
+fn = (
+    global_pipegraph.dir_config.history_dir
+    / SharedMultiFileGeneratingJob.log_filename
+)
with _SharedMultiFileGeneratingJob_log_local_lock:
if fn.exists():
keys = json.loads(fn.read_text())
@@ -2698,16 +2722,17 @@ def _map_filename(self, filename):
return Path(*out_parts)

def get_input(self): # todo: fold in?
-# if self._single_file:
-# return self._map_filename(self.files[0])
-# else:
if self._lookup:
return (
{k: self._map_filename(f) for (k, f) in self._lookup.items()},
self.target_folder,
)
else:
-    return ([self._map_filename(f) for f in self.files], self.target_folder)
+    # return ([self._map_filename(f) for f in self.files], self.target_folder)
+    return (
+        [self._map_filename(f) for f in self._lookup_for_org_order],
+        self.target_folder,
+    )

def output_needed(self, _runner):
# output needed is called at the very beginning
@@ -2804,6 +2829,7 @@ def find_file(self, output_filename): # for compatibility with ppg1.
return self[output_filename]

def __getitem__(self, key):
+print(self._lookup)
if not self._lookup:
if isinstance(key, int):
return self._map_filename(self.org_files[key])
20 changes: 10 additions & 10 deletions tests/test_basics.py
@@ -1559,27 +1559,27 @@ def test_log_retention(self):
assert len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*.log"))) == 0
ppg.run()
assert (
-    len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*")))
-    == 1 + 1 + 1 # for latest
+    len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*messages*")))
+    == 1 + 1 # for latest
) # runtimes
ppg.run()
assert (
-    len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*")))
-    == 2 + 1 + 1 # for latest
+    len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*messages*")))
+    == 2 + 1 # for latest
) # runtimes
ppg.new(log_retention=2)
ppg.run()
-prior = list(ppg.global_pipegraph.dir_config.log_dir.glob("*"))
+prior = list(ppg.global_pipegraph.dir_config.log_dir.glob("*messages*"))
assert (
-    len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*")))
-    == 3 + 1 + 1 # for latest
+    len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*messages*")))
+    == 3 + 1 # for latest
) # runtimes
# no new.. still new log file please
ppg.run()
-after = list(ppg.global_pipegraph.dir_config.log_dir.glob("*"))
+after = list(ppg.global_pipegraph.dir_config.log_dir.glob("*messages*"))
assert (
-    len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*")))
-    == 3 + 1 + 1 # for latest
+    len(list(ppg.global_pipegraph.dir_config.log_dir.glob("*messages*")))
+    == 3 + 1 # for latest
) # runtimes
assert set([x.name for x in prior]) != set([x.name for x in after])

25 changes: 23 additions & 2 deletions tests/test_jobs.py
@@ -834,6 +834,16 @@ def test_single_stre(self):
with pytest.raises(TypeError):
ppg.MultiFileGeneratingJob("A", lambda of: write("out/A", param))

+def test_order_of_files_is_kept_for_callback(self):
+    def do_b(ofs):
+        assert ofs[0].name == "b"
+        assert ofs[1].name == "B1"
+        ofs[0].write_text("B")
+        ofs[1].write_text("B")
+
+    ppg.MultiFileGeneratingJob(["b", "B1"], do_b)
+    ppg.run()


test_modifies_shared_global = []
shared_value = ""
@@ -1392,7 +1402,6 @@ def cache():
ppg.CachedAttributeLoadingJob("out/A", o2, "a", cache)

def test_no_swapping_callbacks(self):
-
o = Dummy()
ppg.AttributeLoadingJob("out/A", o, "a", lambda: 55, depend_on_function=False)

@@ -1402,7 +1411,6 @@
)

def test_no_swapping_callbacks_cached(self):
-
o = Dummy()
ppg.CachedAttributeLoadingJob(
"out/A", o, "a", lambda: 55, depend_on_function=False
Expand Down Expand Up @@ -1933,6 +1941,19 @@ def do_a(ofs):
assert Path("a").read_text() == "4"
assert Path("c").read_text() == "1"

+def test_order_of_files_is_kept_for_callback(self):
+    def do_b(ofs):
+        assert ofs[0].name == "b"
+        assert ofs[1].name == "B1"
+        ofs[0].write_text("B")
+        ofs[1].write_text("B")
+
+    a = ppg.MultiTempFileGeneratingJob(["b", "B1"], do_b)
+    b = ppg.FileGeneratingJob("c", lambda of: of.write_text("b"))
+    b.depends_on(a)
+
+    ppg.run()


@pytest.mark.usefixtures("create_out_dir")
@pytest.mark.usefixtures("ppg2_per_test")
8 changes: 4 additions & 4 deletions tests/test_other.py
@@ -401,10 +401,10 @@ def fail(of):
3 + 1,
]
assert lc == [
-    1,
-    2,
-    3, # latest excluded by *.log
-    3,
+    1 * 2, # includes messages.log and lookup.log
+    2 * 2,
+    3 * 2, # latest excluded by *.log
+    3 * 2,
]


11 changes: 11 additions & 0 deletions tests/test_shared_jobs.py
@@ -681,3 +681,14 @@ def doit(output_files, prefix):
ppg.SharedMultiFileGeneratingJob("out", ["a"], doit)
with pytest.raises(ppg.JobOutputConflict):
ppg.SharedMultiFileGeneratingJob("out", ["b"], doit)

+def test_file_order(self):
+    def doit(ofs, prefix):
+        print(ofs)
+        assert ofs[0].name == "b"
+        assert ofs[1].name == "B1"
+        ofs[0].write_text("B")
+        ofs[1].write_text("B")
+
+    ppg.SharedMultiFileGeneratingJob("out", ["b", "B1"], doit)
+    ppg.run()
