Skip to content

Implement option to use dask to do image co-addition#388

Merged
astrofrog merged 20 commits into
astropy:mainfrom
astrofrog:dask-coadd
Jul 3, 2026
Merged

Implement option to use dask to do image co-addition#388
astrofrog merged 20 commits into
astropy:mainfrom
astrofrog:dask-coadd

Conversation

@astrofrog

@astrofrog astrofrog commented Sep 11, 2023

Copy link
Copy Markdown
Member

This is an experiment for now but it allows:

  • N-dimensional reprojection/co-addition
  • combine_function='median' (not implemented here but trivial to do)
  • reproject_and_coadd to optionally return a dask array (return_type='dask') which means actually delaying the computation of the co-addition. You can extract a cutout from a co-added mosaic and compute that without ever having to actually compute the full mosaic.

At the moment, setting block_size is what toggles between the old code and the new dask-backed co-addition.

Still a work in progress and currently looking into how to get the parallel mode to work correctly (and actually be fast).

@keflavich - would you be able to try this out with some of your data? To use this, you just need to set e.g. block_size=(100, 100, 100) (or whatever you think makes sense) when calling the reproject_and_coadd function. For now, I am still investigating performance with the parallel mode, so I don't recommend trying to set parallel=... yet (though you technically can). I'm just curious to know at this point if it even gets close to working for you.

@codecov

codecov Bot commented Sep 11, 2023

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 95.34884% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 90.05%. Comparing base (2a45ffc) to head (0d2af9c).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
reproject/mosaicking/_coadd.py 94.94% 10 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #388      +/-   ##
==========================================
+ Coverage   89.31%   90.05%   +0.74%     
==========================================
  Files          51       51              
  Lines        2171     2293     +122     
==========================================
+ Hits         1939     2065     +126     
+ Misses        232      228       -4     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment thread reproject/common.py Outdated

@delayed(pure=True)
def as_delayed_memmap_path(array, tmp_dir):
tmp_dir = tempfile.mkdtemp() # FIXME

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure why I didn't catch this before but I ran into an issue where the temporary directory no longer exists by the time the array is computed out of dask when using return_type='dask'.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yikes - are you saying that is a problem with the current implementation, or this line is fixing it?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a temporary fix for this PR but I need to figure out a proper solution. It is a problem in main.

@astrofrog

astrofrog commented Sep 11, 2023

Copy link
Copy Markdown
Member Author

Ok so the parallel option seems to work provided the block size is reasonable (not too big) but I'm only seeing factors of 2-3x speedup.

@keflavich

Copy link
Copy Markdown
Contributor

I get the following traceback when trying on my big cubes:

Writing
[                                        ] | 0% Completed | 548.30 ms
Traceback (most recent call last):
  File "<ipython-input-3-9471e55417d0>", line 1, in <module>
    make_downsampled_cube(f'{basepath}/mosaics/cubes/CS21_CubeMosaic.fits',
  File "/orange/adamginsburg/ACES/reduction_ACES/aces/imaging/make_mosaic.py", line 721, in make_downsampled_cube
    dscube.write(outcubename, overwrite=overwrite)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py", line 1382, in write
    super().write(*args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/core.py", line 131, in __call__
    registry.write(self._instance, *args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/registry/compat.py", line 52, in wrapper
    return getattr(registry, method_name)(*args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/registry/core.py", line 383, in write
    return writer(data, *args, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/io/fits.py", line 280, in write_fits_cube
    hdulist.writeto(filename, overwrite=overwrite)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/hdulist.py", line 1021, in writeto
    hdu._writeto(hdulist._file)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 710, in _writeto
    self._writeto_internal(fileobj, inplace, copy)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 716, in _writeto_internal
    data_offset, data_size = self._writedata(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/base.py", line 647, in _writedata
    size += self._writedata_internal(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/image.py", line 649, in _writedata_internal
    return self._writeinternal_dask(fileobj)
  File "/blue/adamginsburg/adamginsburg/repos/astropy/astropy/io/fits/hdu/image.py", line 740, in _writeinternal_dask
    output.store(outarr, lock=True, compute=True)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1770, in store
    r = store([self], [target], **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1238, in store
    compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 342, in compute_as_if_collection
    return schedule(dsk2, keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 557, in get_sync
    return get_async(
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception

  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 542, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 229, in execute_task
    result = pack_exception(e, dumps)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 224, in execute_task

  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 122, in getter
    c = a[b]
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/dask_spectral_cube.py", line 199, in __getitem__
    return self._mask._filled(data=self._data,
  File "/blue/adamginsburg/adamginsburg/repos/spectral-cube/spectral_cube/masks.py", line 240, in _filled
    return np.ma.masked_array(sliced_data, mask=ex).filled(fill)
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/site-packages/numpy/ma/core.py", line 2826, in __new__
    _data = np.array(data, dtype=dtype, copy=copy,
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1702, in __array__
    x = self.compute()
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 315, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/base.py", line 600, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 557, in get_sync
    return get_async(
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 500, in get_async
    for key, res_info, failed in queue_get(queue).result():
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/orange/adamginsburg/miniconda3/envs/python39/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 542, in submit
    fut.set_result(fn(*args, **kwargs))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in batch_execute_tasks
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 238, in <listcomp>
    return [execute_task(*a) for a in it]
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 229, in execute_task
    result = pack_exception(e, dumps)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in <genexpr>
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 1284, in finalize
    return concatenate3(results)
  File "/blue/adamginsburg/adamginsburg/repos/dask/dask/array/core.py", line 5289, in concatenate3
    result = np.empty(shape=shape, dtype=dtype(deepfirst(arrays)))
MemoryError: Unable to allocate 634. GiB for an array with shape (300, 10200, 27800) and data type >f8

I might have copy-pasted the traceback a bit incorrectly, it's very deep

@astrofrog

Copy link
Copy Markdown
Member Author

@keflavich - just to check, is that how big you would expect the output cube to be? Or is that incorrect? What is the command you are using?

If it is what you expect, have you tried using return_type='dask'? Alternatively, you could try and pass a memory mapped array as output_array?

@keflavich

Copy link
Copy Markdown
Contributor

that's the size of the input cube, not the output.

Call was this:

dscube = cube.reproject(hdr, return_type='dask', filled=False, parallel=True, block_size=[1,1000,1000])

Yes, passing the memmaped array as output sounds like a good idea - can try that next

@astrofrog

Copy link
Copy Markdown
Member Author

Which branch of spectral-cube are you using?

@keflavich

Copy link
Copy Markdown
Contributor

I'm on cubewcs_mosaic_and_dask_reproject, https://github.com/keflavich/spectral-cube/tree/cubewcs_mosaic_and_dask_reproject. It's not really a stable branch... I'm doing all sorts of experiments in parallel and getting a bit lost along the way.

@astrofrog

Copy link
Copy Markdown
Member Author

This is going to require some non-trivial rebasing

@astrofrog astrofrog added this to the v1.0 milestone Nov 2, 2025
@astrofrog

Copy link
Copy Markdown
Member Author

This has been rebased/re-written

astrofrog added 7 commits July 2, 2026 22:29
…s each image lazily, pads it onto the output grid, and combines the images with a single nan-aware reduction so the whole coaddition is computed in one deferred dask graph
…es the numpy path exactly for every combine function, support input_weights, add an unweighted median that is only available for the dask path, and cover these by parameterizing the existing coaddition tests over return_type
… deferred coaddition does not blow up the task count as the number of images grows, and divide the mean by a footprint whose zeros are replaced by one so the lazily evaluated division does not warn on uncovered pixels
…actually returns an uncomputed dask array before computing it, so that a silent numpy fallback would be caught rather than passing trivially through np.asarray
…ions so each output chunk is a single plane, which keeps the memory bounded to one plane per image and lets the reprojection and coaddition stream plane by plane rather than chunking the reprojected dimensions and holding the whole non-reprojected extent at once
…hunking when one is given, falling back to one plane per chunk along the non-reprojected dimensions otherwise
…ask' in the reproject dispatcher, since the footprint is returned as a lazy dask array in that case and allocating it in full defeats the deferred computation and can exhaust memory for large outputs
astrofrog added 7 commits July 2, 2026 23:21
… footprints and blanking, validate return_type and reject output_array, output_footprint and intermediate_memmap with return_type='dask', return a blank mosaic when no input overlaps the output, normalize block_sizes once so a single tuple or 'auto' works everywhere, bound the median chunk memory and avoid its all-NaN warnings, and forward dask_method in reproject_exact and reproject_adaptive
…ligned to the output chunking, computed once before the loop, so the rechunk before stacking only merges chunks at the cutout edges and the deferred coaddition task count scales linearly with the number of images instead of blowing up through dense split-and-recombine layers
…array_utils as pad_dask_array_to_grid since it is a generic dask array utility with no mosaicking dependencies
…enerator consumed by separate eager and deferred co-addition drivers, so the parsing, weights handling and cutout computation exist exactly once while each driver reads linearly without interleaved branching
… calling it constructs an iterator rather than performing the iteration
…o return_type='numpy' or return_type='dask' and use the return_type values consistently instead of eager and deferred
…ngs into comments so numpydoc validation does not require full parameter sections, removing the now unused ndim_out variable, and applying black formatting to a long test assertion
astrofrog added 3 commits July 3, 2026 09:41
…e a name, since dask deduplicates them in the combined graph and silently uses one input's data for both
…stead of using pytest.warns, which re-emits unrelated warnings on exit and fails under the warnings-as-errors filter when the oldest dependencies emit a DeprecationWarning
…, dtype preservation, padding-free flush and full-extent bounds, and the chunk boundaries being the target grid edges plus at most cuts at the array bounds so the final rechunk only merges chunks
@astrofrog astrofrog marked this pull request as ready for review July 3, 2026 10:22
astrofrog added 3 commits July 3, 2026 11:27
…in the coaddition, since a block size covering only the reprojected dimensions was prepended with entries from those reprojected dimensions, and pass the block size through to the per-image weights reprojection which otherwise raises NotImplementedError with non_reprojected_dims
…extent case since block sizes smaller than the output along the reprojected dimensions are not supported yet on this branch
@astrofrog

Copy link
Copy Markdown
Member Author

This seems to work well - as with all things dask it can take a bit of fine tuning to figure out what e.g. a good block size is, but that's inherent to dask. Having return_type='dask' means we can avoid relying on intermediate memmaps for instance. I'll go ahead and merge since there are a few other big PRs coming, and once they are all in I'll assess where we are and will write some docs about all this.

@astrofrog astrofrog merged commit 920f9c0 into astropy:main Jul 3, 2026
19 of 20 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants