Eager processing for faster and more continuous image delivery #160

Open
gerritholl opened this issue Oct 5, 2022 · 12 comments · Fixed by #168
@gerritholl
Member

Feature Request

NB: some aspects of this feature request affect multiple Pytroll packages. I've put this issue here because I think it affects trollflow2 the most. It's actually two feature requests, but they serve the same goal.

Is your feature request related to a problem? Please describe.

The way we produce images and deliver them to our client software is inefficient.

  1. In Pytroll, we wait until all parts of the input data are there (via geographic gatherer or segment gatherer), then send a message to trollflow2 to start processing. The images would be produced faster if we could start processing each segment as it comes in.
  2. In several parts of trollflow2, images are only finalised once all products have been produced. For example, if we use the file publisher plugin (or any other plugin that runs after save_datasets), it is only called after save_datasets has completed for all products. If we use a staging zone, all files are moved from the staging zone to the output directory only once all images have been produced. When we produce many images, many minutes can pass between the finalisation of the first and the last one. In that case, it would be beneficial to announce each image (or move it to the output directory) as soon as it is finished.

Both of those aspects increase the latency of our image production. This may become more critical with FCI, because of its much larger image size compared to SEVIRI. It also means all our images are delivered to our client software at the same time, which creates bottlenecks when the images are imported there; those bottlenecks would be avoided, or at least reduced, if the images were delivered one by one.

Describe the solution you'd like

I would like a solution that minimises the latency of image delivery to clients without sacrificing quality. How to do this is of secondary concern. Problem (1) is difficult to solve if we are doing any resampling. Problem (2) is in theory easier to solve, but may need significant changes in trollflow2. Instead of calling the plugins sequentially, it might need to call the filepublisher plugin many times in parallel, while communicating with the save_datasets plugin to know when each file is ready. Alternatively, the save_datasets plugin would need to do the publishing itself (or move the final files into place when using a staging zone or temporary files).
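
To make the parallel-publishing option a bit more concrete, here is a minimal, runnable sketch of the data flow only, entirely decoupled from trollflow2's real plugin machinery; `save_one()` and `publish_one()` are hypothetical stand-ins for save_datasets and the file publisher plugin:

```python
import queue
import threading
import time


def save_one(filename):
    """Stand-in for producing one image file."""
    time.sleep(1)
    print(f"saved {filename}")


def publish_one(filename):
    """Stand-in for announcing one file (posttroll message, or a move out of the staging zone)."""
    print(f"published {filename}")


def save_all(filenames, done_queue):
    for filename in filenames:
        save_one(filename)
        done_queue.put(filename)  # announce this file immediately
    done_queue.put(None)          # sentinel: no more files coming


done = queue.Queue()
publisher = threading.Thread(
    target=lambda: [publish_one(fn) for fn in iter(done.get, None)])
publisher.start()
save_all(["overview_eurol.tif", "airmass_eurol.tif"], done)
publisher.join()
```

The only point is the shape of the flow: the saving side announces each finished file on a queue, and a separate worker publishes it right away, instead of a single sequential pass over all plugins at the end.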

Describe any changes to existing user workflow

I think both problems should be solvable without breaking existing user workflows, although each is large enough that the restructuring needed to solve it may accidentally break backwards compatibility.

Additional context

For point (1), it might be possible to skip the segment gatherer and send each segment immediately to trollflow2, producing images in the native projection, which are then all read back, resampled, and stored again at the end. I think such a setup could be built without writing new code. For point (2), we could write directly to the final output directory, but because our file distribution software is external, it has no way of knowing when a file is finished (I think). This leads to the file distribution software copying files before they are complete, in particular large files that take a while to produce.
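
For the "external software picks up unfinished files" part, one generic workaround (not specific to any Pytroll package; the paths below are made up) is to write each file under a temporary name on the output filesystem and rename it atomically once it is complete, so the distribution software never sees a partial file:

```python
import os

tmp = "/data/output/.overview_eurol.tif.part"   # hidden/temporary name
final = "/data/output/overview_eurol.tif"

with open(tmp, "wb") as fp:        # stand-in for the real image writer
    fp.write(b"...image bytes...")

os.replace(tmp, final)             # atomic as long as tmp and final share a filesystem
```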

@gerritholl gerritholl self-assigned this Oct 5, 2022
@mraspaud
Member

mraspaud commented Oct 5, 2022

Thanks for the writeup!

Some thoughts that popped into my mind while reading this:

  1. Using the futures interface with dask, if we know in advance which files we are expecting, we could in theory create a dask graph before the files are received, and the computations for each chunk would be triggered as soon as the corresponding file is there (a rough sketch follows this list): https://docs.dask.org/en/stable/futures.html
  2. In theory the files are written in parallel, so they should all be ready approximately at the same time. But in practice that may not be the case, so we would need to trigger a message before dask is done...
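
For point 1, a rough sketch of what the futures-based variant could look like, assuming we know the expected segment filenames up front; `process_segment()` and `wait_for_file()` are hypothetical stand-ins for the real per-segment processing:

```python
import os
import time

from dask.distributed import Client, as_completed


def wait_for_file(path, timeout=600):
    """Block until the segment file shows up on disk."""
    start = time.time()
    while not os.path.exists(path):
        if time.time() - start > timeout:
            raise TimeoutError(path)
        time.sleep(1)
    return path


def process_segment(path):
    """Hypothetical stand-in: read, composite, resample and save one segment."""
    wait_for_file(path)
    return path + ".tif"


client = Client()
expected = [f"segment_{i:04d}.nc" for i in range(1, 41)]
futures = [client.submit(process_segment, path) for path in expected]
for future in as_completed(futures):
    print("ready:", future.result())  # e.g. send a posttroll message here
```

This only sketches the triggering mechanism; resampling across segment boundaries is exactly what makes point 1 hard, as noted above.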

@gerritholl
Member Author

> 1. In theory the files are written in parallel, so they should all be ready approximately at the same time.

Only if they all take the same time to produce. If we use the same trollflow2 instance with different areas, there can be large differences between when they are ready. And are the files always all written in parallel? Or only the ones with the same priority as defined in the trollflow2 configuration?

@mraspaud
Member

mraspaud commented Oct 5, 2022

If they have different priorities, they publish their messages at different times, IIRC.

@pnuu
Member

pnuu commented Oct 6, 2022

> If they have different priorities, they publish their messages at different times, IIRC.

Exactly. The areas are processed sequentially, lowest priority number first: the finalized images for each priority are published before processing starts on the next priority.

I'll read the issue again tomorrow and see if I come up with other points.

@gerritholl
Member Author

Could we somehow trigger a posttroll message for each successfully created file, as soon as it has been created? Where would such a message-sending fit? Maybe inside a dask.delayed function somewhere, such that it's only sent upon computation?
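
One possible shape for that, as a sketch only: wrap the save and the message sending in a single dask.delayed function, so the message goes out only once the file has actually been written. The posttroll usage here is from memory, and the writer is a plain xarray `to_netcdf` stand-in, not trollflow2's save_datasets:

```python
import dask
import numpy as np
import xarray as xr
from posttroll.message import Message
from posttroll.publisher import Publish


@dask.delayed
def save_and_publish(data, filename):
    """Write one file, then announce it; only runs at compute time."""
    data.to_netcdf(filename)                  # stand-in for the real writer
    with Publish("image_producer", 0) as pub:
        pub.send(str(Message("/image/ready", "file", {"uri": filename})))
    return filename


if __name__ == "__main__":
    data = xr.DataArray(np.zeros((10, 10)), dims=("y", "x"))
    tasks = [save_and_publish(data, f"/tmp/test_{i}.nc") for i in range(3)]
    dask.compute(*tasks)  # each message goes out as its own task finishes
```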

@gerritholl
Member Author

A callback could be added to trollimage.xrimage.delayed_pil_save, pyninjotiff.pyninjotiff.tiffwrite, and similar writer "exit points"?

@gerritholl
Member Author

As pointed out here, such a callback could work if the callback gets passed the result to be returned, i.e. where we currently have `objs.append(obj)`, we would instead write `objs.append(callback(obj))`, where it would be the responsibility of the callback to accept `obj` and return `obj` again (and probably we would want to pass `fmat_config['filename']` as well).
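
A sketch of one way such a callback could behave (names are illustrative, and this is not necessarily the callback API trollflow2 ended up with): it wraps the writer result in another delayed task, so the side effect only runs once the file is actually finished, and the wrapped result takes obj's place in objs:

```python
import logging

import dask

logger = logging.getLogger(__name__)


def announce_when_written(obj, filename):
    """Wrap a writer result so that finishing the file triggers an announcement."""
    @dask.delayed
    def _announce(result):
        # here one could also send a posttroll message, or move the file
        # out of the staging zone
        logger.info("finished writing %s", filename)
        return result
    return _announce(obj)


# in the save_datasets loop, instead of objs.append(obj):
# objs.append(announce_when_written(obj, fmat_config["filename"]))
```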

@gerritholl
Member Author

Reopening, because #168 only solves part of the problem.

@gerritholl
Member Author

I've set up my trollstalker + trollflow2 to process one chunk/segment at a time. A problem is that this increases the overhead considerably. The dask graph for processing a single chunk looks like this:

[image: dask task graph for processing a single chunk]

and that's only for FDHSI. Source:

```python
import hdf5plugin
import os
from satpy import Scene
from pyresample import create_area_def
from satpy.utils import debug_on; debug_on()
from sattools.io import plotdir
from dask.diagnostics import Profiler, ResourceProfiler, visualize
from satpy.writers import compute_writer_results
import dask.config
from satpy.tests.utils import CustomScheduler

def main():
    ar1 = create_area_def("dyn_eqc_1km", 4087, resolution=1000)
    ar2 = create_area_def("dyn_eqc_2km", 4087, resolution=2000)
    fci_file = "/media/nas/x23352/MTG/FCI/L1c/2023/12/04/W_XX-EUMETSAT-Darmstadt,IMG+SAT,MTI1+FCI-1C-RRAD-FDHSI-FD--CHK-BODY--DIS-NC4E_C_EUMT_20231204094727_IDPFI_OPE_20231204094258_20231204094347_N_JLS_C_0059_0016.nc"
    chans2 = ['dwd_ir105', 'dwd_ir123', 'dwd_ir133', 'dwd_ir38', 'dwd_ir87',
              'dwd_ir97', 'dwd_wv63', 'dwd_wv73', "airmass"]
    chans1 = ['dwd_nir08', 'dwd_nir09', 'dwd_nir13', 'dwd_nir16', 'dwd_nir22',
              'dwd_vis04', 'dwd_vis05', 'dwd_vis06', "true_color"]
    sc = Scene(filenames={"fci_l1c_nc": [fci_file]})
    sc.load(chans1 + chans2, pad_data=False)
    ls1 = sc.resample(ar1, cache_dir="/data/gholl/cache/satpy", reduce_data=False)
    ls2 = sc.resample(ar2, cache_dir="/data/gholl/cache/satpy", reduce_data=False)
    cmps = []
    for ch in chans1:
        cmps.append(ls1.save_dataset(ch,
            filename=os.fspath(plotdir() /
                           "{start_time:%Y%m%d%H%M}-{platform_name}-{sensor}-{area.area_id}-{name}.tif"),
            compute=False))
    for ch in chans2:
        cmps.append(ls2.save_dataset(ch,
            filename=os.fspath(plotdir() /
                           "{start_time:%Y%m%d%H%M}-{platform_name}-{sensor}-{area.area_id}-{name}.tif"),
            compute=False))
    compute_writer_results(cmps)

with Profiler() as prof, ResourceProfiler(dt=0.25) as rprof:
#with dask.config.set(scheduler=CustomScheduler(max_computes=1)):
    main()
visualize([prof, rprof], show=False, save=True, filename="/tmp/dask-prof.html")
```

That's nearly 11 seconds of overhead per chunk, or around 7 minutes per full disc (40 chunks × ~11 s ≈ 7.3 min). Unless we can bring this down significantly or find a workaround, processing one chunk at a time in this way is not a feasible solution.

@pnuu
Member

pnuu commented Dec 11, 2023

This is actually pretty interesting.

I tested with a single FDHSI segment and compared with all 40+40 FDHSI and HRFI segments. The initial "dead" time of creating the Scene, loading channels, and resampling (single area, gradient search) is approx. 8 s and 16 s, respectively. Of this, Scene creation takes ~0.1 s / 4 s, composite loading 3 s / 6 s, and resampling 2 s for both (same padded composites in both cases).

For you, the lazy saving adds something extra. Without compute_writer_results(cmps) you should see only the "dead" portion at the beginning.

@gerritholl
Member Author

gerritholl commented Dec 11, 2023

Timing results, estimating the overhead (using regular area definitions corresponding to the ones resulting from the dynamic ones):

One segment

| Step | Total (s) | Delta (s) |
|---|---|---|
| Startup and imports | 1.38 | 1.38 |
| Area creation | 1.43 | 0.05 |
| Scene creation | 2.01 | 0.58 |
| Loading | 3.1 | 1.1 |
| Resampling (from cache) | 4.1 | 1.0 |
| Saving | 7.4 | 3.0 |
| Computing | 15.1 | 6.7 |

Overhead: 7.4 seconds

Ten segments

| Step | Total (s) | Delta (s) | Per segment (s) |
|---|---|---|---|
| Startup and imports | 1.37 | 1.37 | |
| Area creation | 1.43 | 0.06 | |
| Scene creation | 4.0 | 1.6 | 0.16 |
| Loading | 7.3 | 3.3 | 0.33 |
| Resampling (from cache) | 10.4 | 3.1 | 0.31 |
| Saving | 15.0 | 4.6 | 0.46 |
| Computing | 74.6 | 59.6 | 5.96 |

Overhead: 15.0 seconds (1.5 seconds per segment)

40 segments

| Step | Total (s) | Delta (s) | Per segment (s) |
|---|---|---|---|
| Startup and imports | 1.36 | 1.36 | |
| Area creation | 1.38 | 0.02 | |
| Scene creation | 12.4 | 11.0 | 0.27 |
| Loading | 23.1 | 10.7 | 0.27 |
| Resampling (from cache) | 34.8 | 11.7 | 0.29 |
| Saving | 38.5 | 3.7 | 0.09 |
| Computing | 306 | 267 | 6.7 |

Overhead: 38.5 seconds (0.96 seconds per segment)

NB: I also tried swapping between nearest and gradient_search, but I did not see much difference.

@gerritholl
Member Author

This appears to work in principle:

```python
import dask
import dask.array as da
import time
import netCDF4
import xarray as xr
import datetime
from satpy import Scene
import satpy.resample
from satpy.utils import debug_on; debug_on()
from satpy.writers import compute_writer_results


def get_value(fn, max_tries=10, wait=1):
    """Wait for the file to appear, then read the vis_06 array."""
    for _ in range(max_tries):
        try:
            nc = netCDF4.Dataset(fn, "r")
        except FileNotFoundError:
            time.sleep(wait)
        else:
            break
    else:
        raise TimeoutError("File failed to materialise")
    return nc["vis_06"][:, :]


# build the graph before the file exists; reading only happens at compute time
value = dask.delayed(get_value)("/tmp/test.nc")
array = da.from_delayed(value, (22272, 22272), dtype="float32")
area = satpy.resample.get_area_def("mtg_fci_fdss_500m")
data_arr = xr.DataArray(
        array, dims=("y", "x"),
        attrs={"area": area, "start_time": datetime.datetime(2025, 1, 1)})
sc = Scene()
sc["vis_06"] = data_arr
ls = sc.resample("eurol")
res = ls.save_datasets(compute=False)
compute_writer_results(res)
```
