Source code for dask_image.ndinterp._map_coordinates

# -*- coding: utf-8 -*-

from dask import delayed
import dask.array as da
import numpy as np
from dask.base import tokenize
from scipy.ndimage import map_coordinates as ndimage_map_coordinates
from scipy.ndimage import labeled_comprehension as\
    ndimage_labeled_comprehension

from ..dispatch._utils import get_type


def _map_single_coordinates_array_chunk(
        input, coordinates, order=3, mode='constant',
        cval=0.0, prefilter=False):
    """
    Central helper function for implementing map_coordinates.

    Receives 'input' as a dask array and computed coordinates.

    Implementation details and steps:
    1) associate each coordinate in coordinates with the chunk
       it maps to in the input
    2) for each input chunk that has been associated to at least one
       coordinate, calculate the minimal slice required to map
       all coordinates that are associated to it (note that resulting slice
       coordinates can lie outside of the coordinate's chunk)
    3) for each previously obtained slice and its associated coordinates,
       define a dask task and apply ndimage.map_coordinates
    4) outputs of ndimage.map_coordinates are rearranged to match input order
    """

    # STEP 1: Associate each coordinate in coordinates with
    # the chunk it maps to in the input array

    # get the input chunks each coordinate maps onto
    coords_input_chunk_locations = coordinates.T // np.array(input.chunksize)

    # map out-of-bounds chunk locations to valid input chunks
    coords_input_chunk_locations = np.clip(
        coords_input_chunk_locations, 0, np.array(input.numblocks) - 1
    )

    # all input chunk locations
    input_chunk_locations = np.array([i for i in np.ndindex(input.numblocks)])

    # linearize input chunk locations
    coords_input_chunk_locations_linear = np.sum(
        coords_input_chunk_locations * np.array(
            [np.prod(input.numblocks[:dim])
                for dim in range(input.ndim)])[::-1],
        axis=1, dtype=np.int64)

    # determine the input chunks that have coords associated and
    # count how many coords map onto each input chunk
    chunk_indices_count = np.bincount(coords_input_chunk_locations_linear,
                                      minlength=np.prod(input.numblocks))
    required_input_chunk_indices = np.where(chunk_indices_count > 0)[0]
    required_input_chunks = input_chunk_locations[required_input_chunk_indices]
    coord_rc_count = chunk_indices_count[required_input_chunk_indices]

    # inverse mapping: input chunks to coordinates
    required_input_chunk_coords_indices = \
        [np.where(coords_input_chunk_locations_linear == irc)[0]
            for irc in required_input_chunk_indices]

    # STEP 2: for each input chunk that has been associated to at least
    # one coordinate, calculate the minimal slice required to map all
    # coordinates that are associated to it (note that resulting slice
    # coordinates can lie outside of the coordinate's chunk)

    # determine the slices of the input array that are required for
    # mapping all coordinates associated to a given input chunk.
    # Note that this slice can be larger than the given chunk when coords
    # lie at chunk borders.
    # (probably there's a more efficient way to do this)
    input_slices_lower = np.array([np.clip(
            ndimage_labeled_comprehension(
                np.floor(coordinates[dim] - order // 2),
                coords_input_chunk_locations_linear,
                required_input_chunk_indices,
                np.min, np.int64, 0),
            0, input.shape[dim] - 1)
        for dim in range(input.ndim)])

    input_slices_upper = np.array([np.clip(
            ndimage_labeled_comprehension(
                np.ceil(coordinates[dim] + order // 2) + 1,
                coords_input_chunk_locations_linear,
                required_input_chunk_indices,
                np.max, np.int64, 0),
            0, input.shape[dim])
        for dim in range(input.ndim)])

    input_slices = np.array([input_slices_lower, input_slices_upper])\
        .swapaxes(1, 2)

    # STEP 3: For each previously obtained slice and its associated
    # coordinates, define a dask task and apply ndimage.map_coordinates

    # prepare building dask graph
    # define one task per associated input chunk
    name = "map_coordinates_chunk-%s" % tokenize(
        input,
        coordinates,
        order,
        mode,
        cval,
        prefilter
        )

    keys = [(name, i) for i in range(len(required_input_chunks))]

    # pair map_coordinates calls with input slices and mapped coordinates
    values = []
    for irc in range(len(required_input_chunks)):

        ric_slice = [slice(
            input_slices[0][irc][dim],
            input_slices[1][irc][dim])
            for dim in range(input.ndim)]
        ric_offset = input_slices[0][irc]

        values.append((
            ndimage_map_coordinates,
            input[tuple(ric_slice)],
            coordinates[:, required_input_chunk_coords_indices[irc]]
            - ric_offset[:, None],
            None,
            order,
            mode,
            cval,
            prefilter
        ))

    # build dask graph
    dsk = dict(zip(keys, values))
    ar = da.Array(dsk, name, tuple([list(coord_rc_count)]), input.dtype)

    # STEP 4: rearrange outputs of ndimage.map_coordinates
    # to match input order
    orig_order = np.argsort(
        [ic for ric_ci in required_input_chunk_coords_indices
            for ic in ric_ci])

    # compute result and reorder
    # (ordering first would probably unnecessarily inflate the task graph)
    return ar.compute()[orig_order]


[docs]def map_coordinates(input, coordinates, order=3, mode='constant', cval=0.0, prefilter=False): """ Wraps ndimage.map_coordinates. Both the input and coordinate arrays can be dask arrays. GPU arrays are not supported. For each chunk in the coordinates array, the coordinates are computed and mapped onto the required slices of the input array. One task is is defined for each input array chunk that has been associated to at least one coordinate. The outputs of the tasks are then rearranged to match the input order. For more details see the docstring of '_map_single_coordinates_array_chunk'. Using this function together with schedulers that support parallelism (threads, processes, distributed) makes sense in the case of either a large input array or a large coordinates array. When both arrays are large, it is recommended to use the single-threaded scheduler. A scheduler can be specified using e.g. `with dask.config.set(scheduler='threads'): ...`. input : array_like The input array. coordinates : array_like The coordinates at which to sample the input. order : int, optional The order of the spline interpolation, default is 3. The order has to be in the range 0-5. mode : boundary behavior mode, optional cval : float, optional Value to fill past edges of input if mode is 'constant'. Default is 0.0 prefilter : bool, optional If True, prefilter the input before interpolation. Default is False. Warning: prefilter is True by default in `scipy.ndimage.map_coordinates`. Prefiltering here is performed on a chunk-by-chunk basis, which may lead to different results than `scipy.ndimage.map_coordinates` in case of chunked input arrays and order > 1. Note: prefilter is not necessary when: - You are using nearest neighbour interpolation, by setting order=0 - You are using linear interpolation, by setting order=1, or - You have already prefiltered the input array, using the spline_filter or spline_filter1d functions. Comments: - in case of a small coordinate array, it might make sense to rechunk it into a single chunk - note the different default for `prefilter` compared to `scipy.ndimage.map_coordinates`, which is True by default. """ if "cupy" in str(get_type(input)) or "cupy" in str(get_type(coordinates)): raise NotImplementedError( "GPU cupy arrays are not supported by " "dask_image.ndinterp.map_overlap") # if coordinate array is not a dask array, convert it into one if type(coordinates) is not da.Array: coordinates = da.from_array(coordinates, chunks=coordinates.shape) else: # make sure indices are not split across chunks, i.e. that there's # no chunking along the first dimension if len(coordinates.chunks[0]) > 1: coordinates = da.rechunk( coordinates, (-1,) + coordinates.chunks[1:]) # if the input array is not a dask array, convert it into one if type(input) is not da.Array: input = da.from_array(input, chunks=input.shape) # Map each chunk of the coordinates array onto the entire input array. # 'input' is passed to `_map_single_coordinates_array_chunk` using a bit of # a dirty trick: it is split into its components and passed as a delayed # object, which reconstructs the original array when the task is # executed. Therefore two `compute` calls are required to obtain the # final result, one of which is peformed by # `_map_single_coordinates_array_chunk` # Discussion https://dask.discourse.group/t/passing-dask-objects-to-delayed-computations-without-triggering-compute/1441 # noqa: E501 output = da.map_blocks( _map_single_coordinates_array_chunk, delayed(da.Array)(input.dask, input.name, input.chunks, input.dtype), coordinates, order=order, mode=mode, cval=cval, prefilter=prefilter, dtype=input.dtype, chunks=coordinates.chunks[1:], drop_axis=0, ) return output