Skip to content

Commit

Permalink
Cancel input downloads if one of them fails
Browse files Browse the repository at this point in the history
  • Loading branch information
philandstuff committed Dec 20, 2024
1 parent f2ba468 commit f7809b4
Showing 1 changed file with 23 additions and 7 deletions.
30 changes: 23 additions & 7 deletions python/cog/server/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,18 +186,34 @@ def start_prediction() -> None:
self._input_download_pool.submit(item.convert) for item in v
]
to_await += futs[k]
futures.wait(to_await, return_when=futures.FIRST_EXCEPTION)
done, not_done = futures.wait(
to_await, return_when=futures.FIRST_EXCEPTION
)

if len(not_done) > 0:
# if any future isn't done, this is because one of the
# futures raised an exception. first we cancel outstanding
# work
for fut in not_done:
fut.cancel()
# then we find an exception to raise
for fut in done:
fut.result() # raises if the future finished with an exception
# we should never get here
raise Exception(
"Internal error: lost track of exception while downloading input files"
)

# all futures are done. some might still have raised an
# exception, but when we call fut.result() that will re-raise
# and do the right thing
for k, v in futs.items():
if isinstance(v, list):
payload[k] = []
for fut in v:
# the future may not be done if and only if another
# future finished with an exception
if fut.done():
payload[k].append(fut.result())
payload[k].append(fut.result())
elif isinstance(v, Future):
if v.done():
payload[k] = v.result()
payload[k] = v.result()
# send the prediction to the child to start
self._events.send(
Envelope(
Expand Down

0 comments on commit f7809b4

Please sign in to comment.