Source code for dask_image.ndinterp
# -*- coding: utf-8 -*-
from itertools import product
import numpy as np
import dask.array as da
from dask.base import tokenize
from dask.highlevelgraph import HighLevelGraph
from scipy.ndimage import affine_transform as ndimage_affine_transform
import warnings
from ..dispatch._dispatch_ndinterp import (
dispatch_affine_transform,
dispatch_asarray,
)
__all__ = [
"affine_transform",
]
[docs]def affine_transform(
image,
matrix,
offset=None,
output_shape=None,
order=1,
output_chunks=None,
**kwargs
):
"""Apply an affine transform using Dask. For every
output chunk, only the slice containing the relevant part
of the image is processed. Chunkwise processing is performed
either using `ndimage.affine_transform` or
`cupyx.scipy.ndimage.affine_transform`, depending on the input type.
Notes
-----
Differences to `ndimage.affine_transformation`:
- currently, prefiltering is not supported
(affecting the output in case of interpolation `order > 1`)
- default order is 1
- modes 'reflect', 'mirror' and 'wrap' are not supported
Arguments equal to `ndimage.affine_transformation`,
except for `output_chunks`.
Parameters
----------
image : array_like (Numpy Array, Cupy Array, Dask Array...)
The image array.
matrix : array (ndim,), (ndim, ndim), (ndim, ndim+1) or (ndim+1, ndim+1)
Transformation matrix.
offset : array (ndim,)
Transformation offset.
output_shape : array (ndim,), optional
The size of the array to be returned.
order : int, optional
The order of the spline interpolation. Note that for order>1
scipy's affine_transform applies prefiltering, which is not
yet supported and skipped in this implementation.
output_chunks : array (ndim,), optional
The chunks of the output Dask Array.
Returns
-------
affine_transform : Dask Array
A dask array representing the transformed output
"""
if not type(image) == da.core.Array:
image = da.from_array(image)
if output_shape is None:
output_shape = image.shape
if output_chunks is None:
output_chunks = image.shape
# Perform test run to ensure parameter validity.
ndimage_affine_transform(np.zeros([0] * image.ndim),
matrix,
offset)
# Make sure parameters contained in matrix and offset
# are not overlapping, i.e. that the offset is valid as
# it needs to be modified for each chunk.
# Further parameter checks are performed directly by
# `ndimage.affine_transform`.
matrix = np.asarray(matrix)
offset = np.asarray(offset).squeeze()
# these lines were copied and adapted from `ndimage.affine_transform`
if (matrix.ndim == 2 and matrix.shape[1] == image.ndim + 1 and
(matrix.shape[0] in [image.ndim, image.ndim + 1])):
# assume input is homogeneous coordinate transformation matrix
offset = matrix[:image.ndim, image.ndim]
matrix = matrix[:image.ndim, :image.ndim]
# process kwargs
# prefilter is not yet supported
if 'prefilter' in kwargs:
if kwargs['prefilter'] and order > 1:
warnings.warn('Currently, `dask_image.ndinterp.affine_transform` '
'doesn\'t support `prefilter=True`. Proceeding with'
' `prefilter=False`, which if order > 1 can lead '
'to the output containing more blur than with '
'prefiltering.', UserWarning)
del kwargs['prefilter']
if 'mode' in kwargs:
if kwargs['mode'] in ['wrap', 'reflect', 'mirror']:
raise(NotImplementedError("Mode %s is not currently supported."
% kwargs['mode']))
n = image.ndim
image_shape = image.shape
# calculate output array properties
normalized_chunks = da.core.normalize_chunks(output_chunks, output_shape)
block_indices = product(*(range(len(bds)) for bds in normalized_chunks))
block_offsets = [np.cumsum((0,) + bds[:-1]) for bds in normalized_chunks]
# use dispatching mechanism to determine backend
affine_transform_method = dispatch_affine_transform(image)
asarray_method = dispatch_asarray(image)
# construct dask graph for output array
# using unique and deterministic identifier
output_name = 'affine_transform-' + tokenize(image, matrix, offset,
output_shape, output_chunks,
kwargs)
output_layer = {}
rel_images = []
for ib, block_ind in enumerate(block_indices):
out_chunk_shape = [normalized_chunks[dim][block_ind[dim]]
for dim in range(n)]
out_chunk_offset = [block_offsets[dim][block_ind[dim]]
for dim in range(n)]
out_chunk_edges = np.array([i for i in np.ndindex(tuple([2] * n))])\
* np.array(out_chunk_shape) + np.array(out_chunk_offset)
# map output chunk edges onto input image coordinates
# to define the input region relevant for the current chunk
if matrix.ndim == 1 and len(matrix) == image.ndim:
rel_image_edges = matrix * out_chunk_edges + offset
else:
rel_image_edges = np.dot(matrix, out_chunk_edges.T).T + offset
rel_image_i = np.min(rel_image_edges, 0)
rel_image_f = np.max(rel_image_edges, 0)
# Calculate edge coordinates required for the footprint of the
# spline kernel according to
# https://github.com/scipy/scipy/blob/9c0d08d7d11fc33311a96d2ac3ad73c8f6e3df00/scipy/ndimage/src/ni_interpolation.c#L412-L419 # noqa: E501
# Also see this discussion:
# https://github.com/dask/dask-image/issues/24#issuecomment-706165593 # noqa: E501
for dim in range(n):
if order % 2 == 0:
rel_image_i[dim] += 0.5
rel_image_f[dim] += 0.5
rel_image_i[dim] = np.floor(rel_image_i[dim]) - order // 2
rel_image_f[dim] = np.floor(rel_image_f[dim]) - order // 2 + order
if order == 0: # required for consistency with scipy.ndimage
rel_image_i[dim] -= 1
# clip image coordinates to image extent
for dim, s in zip(range(n), image_shape):
rel_image_i[dim] = np.clip(rel_image_i[dim], 0, s - 1)
rel_image_f[dim] = np.clip(rel_image_f[dim], 0, s - 1)
rel_image_slice = tuple([slice(int(rel_image_i[dim]),
int(rel_image_f[dim]) + 1)
for dim in range(n)])
rel_image = image[rel_image_slice]
"""Block comment for future developers explaining how `offset` is
transformed into `offset_prime` for each output chunk.
Modify offset to point into cropped image.
y = Mx + o
Coordinate substitution:
y' = y - y0(min_coord_px)
x' = x - x0(chunk_offset)
Then:
y' = Mx' + o + Mx0 - y0
M' = M
o' = o + Mx0 - y0
"""
offset_prime = offset + np.dot(matrix, out_chunk_offset) - rel_image_i
output_layer[(output_name,) + block_ind] = (
affine_transform_method,
(da.core.concatenate3, rel_image.__dask_keys__()),
asarray_method(matrix),
offset_prime,
tuple(out_chunk_shape), # output_shape
None, # out
order,
'constant' if 'mode' not in kwargs else kwargs['mode'],
0. if 'cval' not in kwargs else kwargs['cval'],
False # prefilter
)
rel_images.append(rel_image)
graph = HighLevelGraph.from_collections(output_name, output_layer,
dependencies=[image] + rel_images)
meta = dispatch_asarray(image)([0]).astype(image.dtype)
transformed = da.Array(graph,
output_name,
shape=output_shape,
# chunks=output_chunks,
chunks=normalized_chunks,
meta=meta)
return transformed