Source code for pixell.fft

"""This is a convenience wrapper of pyfftw."""
from __future__ import print_function, division
import numpy as np, multiprocessing, os
from . import utils
engines = {}

# Define our engines. First a baseline numpy-based engine
[docs]class numpy_FFTW: """Minimal wrapper of numpy in order to be able to provide it as an engine. Not a full-blown interface.""" def __init__(self, a, b, axes=(-1), direction='FFTW_FORWARD', *args, **kwargs): self.a, self.b = a, b self.axes = axes self.direction = direction def __call__(self, normalise_idft=False): if self.direction == 'FFTW_FORWARD': if self.a.shape == self.b.shape: # Complex to complex self.b[:] = np.fft.fftn(self.a, axes=self.axes) else: # Real to complex self.b[:] = np.fft.rfftn(self.a, axes=self.axes) else: if self.a.shape == self.b.shape: # Complex to complex self.b[:] = np.fft.ifftn(self.a, axes=self.axes) else: self.b[:] = np.fft.irfftn(self.a, s=[self.b.shape[i] for i in self.axes], axes=self.axes) # Numpy already normalizes, so undo this if necessary if not normalise_idft: self.b *= np.prod([self.b.shape[i] for i in self.axes])
[docs]class ducc_FFTW: """Minimal wrapper of ducc in order to be able to provide it as an engine. Not a full-blown interface.""" def __init__(self, a, b, axes=(-1,), direction='FFTW_FORWARD', threads=1, *args, **kwargs): self.a, self.b = np.asarray(a), np.asarray(b) self.axes = tuple(axes) self.direction = direction self.threads = threads
[docs] def do_dct(self, kind, *args, **kwargs): # Expect format of type FFTW_REDFT00 name = {"REDFT":"DCT","RODFT":"DST"}[kind[5:10]] num = {"00":1, "10": 2, "01": 3, "11":4}[kind[10:12]] if name == "DCT": return ducc0.fft.dct(*args, type=num, **kwargs) elif name == "DST": return ducc0.fft.dst(*args, type=num, **kwargs)
def __call__(self, normalise_idft=False): if self.direction == 'FFTW_FORWARD': if self.a.shape == self.b.shape: # Complex to complex ducc0.fft.c2c(self.a, axes=self.axes, out=self.b, nthreads=self.threads) else: # Real to complex ducc0.fft.r2c(self.a, axes=self.axes, out=self.b, nthreads=self.threads) elif self.direction == "FFTW_BACKWARD": if self.a.shape == self.b.shape: # Complex to complex ducc0.fft.c2c(a=self.a, axes=self.axes, out=self.b, forward=False, inorm=2 if normalise_idft else 0, nthreads=self.threads) else: ducc0.fft.c2r(a=self.a, axes=self.axes, out=self.b, forward=False, lastsize=self.b.shape[self.axes[-1]], inorm=2 if normalise_idft else 0, nthreads=self.threads) elif _check_ducc_r2r(self.direction): # dct and dst are passed with a list with one entry per dimension of the transform, # but ducc doesn't support heterogeneous transforms like this, so just use the first element self.do_dct(self.direction[0], self.a, axes=self.axes, out=self.b, nthreads=self.threads)
def _check_ducc_r2r(direction): if isinstance(direction, str): return False for d in direction: if d != direction[0]: raise ValueError("ducc only supports homogeneous r2r transforms") return direction[0].startswith("FFTW_REDFT") or direction[0].startswith("FFTW_RODFT")
[docs]def numpy_empty_aligned(shape, dtype, n=None): """This dummy function just skips the alignment, since numpy doesn't provide an easy way to get it.""" return np.empty(shape, dtype)
[docs]class NumpyEngine: pass
numpy_engine = NumpyEngine() numpy_engine.FFTW = numpy_FFTW numpy_engine.empty_aligned = numpy_empty_aligned engines["numpy"] = numpy_engine engine = "numpy" # Then optional, faster engines try: import pyfftw engines["fftw"] = pyfftw engine = "fftw" except ImportError: pass else: # Is FFTW actually using intel MKL as a backend? Check if 1D FT crashes for 3D input. try: engines['fftw'].FFTW(np.zeros((1,1,1)), np.zeros((1,1,1), dtype=np.complex128), flags=['FFTW_ESTIMATE'], threads=1, axes=[-1]) except RuntimeError as e: engines['intel'] = engines.pop('fftw') engine = 'intel' try: import pyfftw_intel as intel engines["intel"] = intel engine = "intel" except ImportError: pass # ducc is slower than intel, but can be faster than pyfftw try: import ducc0
[docs] class DuccEngine: pass
ducc_engine = DuccEngine() ducc_engine.FFTW = ducc_FFTW ducc_engine.empty_aligned = numpy_empty_aligned engines["ducc"] = ducc_engine if engine != "intel": engine = "ducc" except ImportError: pass if len(engines) == 0: # This should not happen due to the numpy fallback raise ImportError("Could not find any fftw implementations!") try: nthread_fft = int(os.environ['OMP_NUM_THREADS']) except (KeyError, ValueError): nthread_fft=multiprocessing.cpu_count() nthread_ifft=nthread_fft default_flags=['FFTW_ESTIMATE'] alignment = 32
[docs]def set_engine(eng): global engine engine = eng
[docs]def get_engine(eng): return engine if eng == "auto" else eng
[docs]def fft(tod, ft=None, nthread=0, axes=[-1], flags=None, _direction="FFTW_FORWARD", engine="auto"): """Compute discrete fourier transform of tod, and store it in ft. What transform to do (real or complex, number of dimension etc.) is determined from the size and type of tod and ft. If ft is left out, a complex transform is assumed. The optional nthread argument specifies the number of theads to use in the fft. The default (0) uses the value specified by the OMP_NUM_THREAD environment varible if that is specified, or the total number of cores on the computer otherwise.""" tod = asfcarray(tod) if tod.size == 0: return nt = nthread or nthread_fft if flags is None: flags = default_flags if ft is None: otype = np.result_type(tod.dtype,0j) ft = empty(tod.shape, otype) tod = tod.astype(otype, copy=False) engine = get_engine(engine) if engine == 'intel': ft[:] = fft_flat(tod, ft, axes=axes, nthread=nt, flags=flags, _direction=_direction) else: plan = engines[engine].FFTW(tod, ft, flags=flags, threads=nt, axes=axes, direction=_direction) plan() return ft
[docs]def ifft(ft, tod=None, nthread=0, normalize=False, axes=[-1],flags=None, engine="auto"): """Compute inverse discrete fourier transform of ft, and store it in tod. What transform to do (real or complex, number of dimension etc.) is determined from the size and type of tod and ft. The optional nthread argument specifies the number of theads to use in the fft. The default (0) uses the value specified by the OMP_NUM_THREAD environment varible if that is specified, or the total number of cores on the computer otherwise. By default this is not normalized, meaning that fft followed by ifft will multiply the data by the length of the transform. By specifying the normalize argument, you can turn normalization on, though the normalization step will not use paralellization.""" ft = asfcarray(ft) if ft.size == 0: return nt = nthread or nthread_ifft if flags is None: flags = default_flags if tod is None: tod = empty(ft.shape, ft.dtype) engine = get_engine(engine) if engine == 'intel': tod[:] = ifft_flat(ft, tod, axes=axes, nthread=nt, flags=flags) else: plan = engines[engine].FFTW(ft, tod, flags=flags, direction='FFTW_BACKWARD', threads=nt, axes=axes) plan(normalise_idft=False) # I get a small, cumulative loss in amplitude when using # pyfftw's normalize function.. So normalize manually instead if normalize: tod /= np.prod([tod.shape[i] for i in axes]) return tod
[docs]def rfft(tod, ft=None, nthread=0, axes=[-1], flags=None, engine="auto"): """Equivalent to fft, except that if ft is not passed, it is allocated with appropriate shape and data type for a real-to-complex transform.""" tod = asfcarray(tod) if ft is None: oshape = list(tod.shape) oshape[axes[-1]] = oshape[axes[-1]]//2+1 dtype = np.result_type(tod.dtype,0j) ft = empty(oshape, dtype) return fft(tod, ft, nthread, axes, flags=flags, engine=engine)
[docs]def irfft(ft, tod=None, n=None, nthread=0, normalize=False, axes=[-1], flags=None, engine="auto"): """Equivalent to ifft, except that if tod is not passed, it is allocated with appropriate shape and data type for a complex-to-real transform. If n is specified, that is used as the length of the last transform axis of the output array. Otherwise, the length of this axis is computed assuming an even original array.""" ft = asfcarray(ft) if tod is None: oshape = list(ft.shape) oshape[axes[-1]] = n or (oshape[axes[-1]]-1)*2 dtype = np.zeros([],ft.dtype).real.dtype tod = empty(oshape, dtype) return ifft(ft, tod, nthread, normalize, axes, flags=flags, engine=engine)
[docs]def dct(tod, dt=None, nthread=0, normalize=False, axes=[-1], flags=None, type="DCT-I", engine="auto"): """Compute discrete cosine transform of tod, and store it in dt. By default it will do a DCT-I trasnform, but this can be controlled with the type argument. Even the much less common discrete sine transforms are avialble by passing e.g. type="DST-I". Valid values are DCT-I, DCT-II, DCT-III, DCT-IV, DST-I, DST-II, DST-III and DST-IV, or the raw FFTW names the correspond to (e.g. FFTW_REDFT00). If dt is not passed, it will be allocated with the same shape and data type as tod. The optional nthread argument specifies the number of theads to use in the fft. The default (0) uses the value specified by the OMP_NUM_THREAD environment varible if that is specified, or the total number of cores on the computer otherwise. Note that DCTs and DSTs were only added to pyfftw in version 13.0. The function will fail with an Invalid scheme error for older versions. """ tod = asfcarray(tod) type= _dct_names[type] if dt is None: dt = empty(tod.shape, tod.dtype) return fft(tod, dt, nthread=nthread, axes=axes, flags=flags, _direction=[type]*len(axes), engine=engine)
[docs]def idct(dt, tod=None, nthread=0, normalize=False, axes=[-1], flags=None, type="DCT-I", engine="auto"): """Compute the inverse discrete cosine transform of dt, and store it in tod. By default it will do the inverse of a DCT-I trasnform, but this can be controlled with the type argument. Even the much less common discrete sine transforms are avialble by passing e.g. type="DST-I". Valid values are DCT-I, DCT-II, DCT-III, DCT-IV, DST-I, DST-II, DST-III and DST-IV, or the raw FFTW names the correspond to (e.g. FFTW_REDFT00). If tod is not passed, it will be allocated with the same shape and data type as tod. By the default an unnormalized transform is performed. Pass normalize=True to get an actual inverse transform. This divides by a factor of 2*N+d for each axis the transform is performed along, where N is the length of the axis and d is -1 for DCT-1, +1 for DST-I and 0 for all the others. Usually it's faster to compute this factor once and combine it with other scalar factors in your math than to let this function do it, which is why it's turned off by default. Note that this function already takes care of figuring out which transform is the appropriate inverse. E.g. the inverse of b = dct(a, type="DCT-II") is idct(b, type="DCT-II", normalize=True), not idct(b, type="DCT-III", normalize=True) even though DCT-III is the inverse of DCT-II. The optional nthread argument specifies the number of theads to use in the fft. The default (0) uses the value specified by the OMP_NUM_THREAD environment varible if that is specified, or the total number of cores on the computer otherwise. Note that DCTs and DSTs were only added to pyfftw in version 13.0. The function will fail with an Invalid scheme error for older versions.""" dt = asfcarray(dt) type = _dct_inverses[_dct_names[type]] off = _dct_sizes[type] if tod is None: tod = empty(dt.shape, dt.dtype) fft(dt, tod, nthread=nthread, axes=axes, flags=flags, _direction=[type]*len(axes), engine=engine) if normalize: tod /= np.prod([2*(tod.shape[i]+off) for i in axes]) return tod
_dct_names = { "DCT-I": "FFTW_REDFT00", "FFTW_REDFT00":"FFTW_REDFT00", "DCT-II": "FFTW_REDFT10", "FFTW_REDFT10":"FFTW_REDFT10", "DCT-III": "FFTW_REDFT01", "FFTW_REDFT01":"FFTW_REDFT01", "DCT-IV": "FFTW_REDFT11", "FFTW_REDFT11":"FFTW_REDFT11", "DST-I": "FFTW_RODFT00", "FFTW_RODFT00":"FFTW_RODFT00", "DST-II": "FFTW_RODFT10", "FFTW_RODFT10":"FFTW_RODFT10", "DST-III": "FFTW_RODFT01", "FFTW_RODFT01":"FFTW_RODFT01", "DST-IV": "FFTW_RODFT11", "FFTW_RODFT11":"FFTW_RODFT11", } _dct_inverses = { "FFTW_REDFT00":"FFTW_REDFT00", "FFTW_REDFT10":"FFTW_REDFT01", "FFTW_REDFT01":"FFTW_REDFT10", "FFTW_REDFT11":"FFTW_REDFT11", "FFTW_RODFT00":"FFTW_RODFT00", "FFTW_RODFT10":"FFTW_RODFT01", "FFTW_RODFT01":"FFTW_RODFT10", "FFTW_RODFT11":"FFTW_RODFT11", } _dct_sizes = { "FFTW_REDFT00":-1, "FFTW_REDFT10":0, "FFTW_REDFT01":0, "FFTW_REDFT11":0, "FFTW_RODFT00":+1, "FFTW_RODFT10":0, "FFTW_RODFT01":0, "FFTW_RODFT11":0, }
[docs]def redft00(a, b=None, nthread=0, normalize=False, flags=None, engine="auto"): """Old brute-force work-around for missing dcts in pyfftw. Can be removed when newer versions of pyfftw become common. It's not very fast, sadly - about 5 times slower than an rfft. Transforms along the last axis.""" a = asfcarray(a) if b is None: b = empty(a.shape, a.dtype) n = a.shape[-1] tshape = a.shape[:-1] + (2*(n-1),) itmp = empty(tshape, a.dtype) itmp[...,:n] = a[...,:n] itmp[...,n:] = a[...,-2:0:-1] otmp = rfft(itmp, axes=[-1], nthread=nthread, flags=flags, engine=engine) del itmp b[...] = otmp[...,:n].real if normalize: b /= 2*(n-1) return b
[docs]def chebt(a, b=None, nthread=0, flags=None, engine="auto"): """The chebyshev transform of a, along its last dimension.""" b = redft00(a, b, nthread, normalize=True, flags=flags, engine=engine) b[1:-1] *= 2 return b
[docs]def ichebt(a, b=None, nthread=0, engine="auto"): """The inverse chebyshev transform of a, along its last dimension.""" a = asfcarray(a).copy() a[1:-1] *= 0.5 return redft00(a, b, nthread, engine=engine)
[docs]def fft_len(n, direction="below", factors=None): if factors is None: factors = [2,3,5,7,11,13] return utils.nearest_product(n, factors, direction)
[docs]def asfcarray(a): a = np.asarray(a) return np.asarray(a, np.result_type(a,0.0))
[docs]def empty(shape, dtype): return engines[engine].empty_aligned(shape, dtype=dtype, n=alignment)
[docs]def fftfreq(n, d=1.0): return np.fft.fftfreq(n, d=d)
[docs]def rfftfreq(n, d=1.0): return np.arange(n//2+1)/(n*d)
[docs]def ind2freq (n, i, d=1.0): return np.where(i < n/2, i, -n+i)/(d*n)
[docs]def int2rfreq(n, i, d=1.0): return i/(n*d)
[docs]def freq2ind(n, f, d=1.0): j = f*(d*n) return np.where(j >= 0, j, n+j)
[docs]def rfreq2ind(n, f, d=1.0): return f*(n*d)
[docs]def shift(a, shift, axes=None, nofft=False, deriv=None, engine="auto"): """Shift the array a by a (possibly fractional) number of samples "shift" to the right, along the specified axis, which defaults to the last one. shift can also be an array, in which case multiple axes are shifted together.""" a = np.asanyarray(a) ca = a+0j shift = np.atleast_1d(shift) if axes is None: axes = range(-len(shift),0) fa = fft(ca, axes=axes, engine=engine) if not nofft else ca for i, ax in enumerate(axes): ax %= ca.ndim freqs = fftfreq(ca.shape[ax]) phase = np.exp(-2j*np.pi*freqs*shift[i]) if deriv == i: phase *= -2j*np.pi*freqs fa *= phase[(None,)*ax + (slice(None),) + (None,)*(a.ndim-ax-1)] if not nofft: ifft(fa, ca, axes=axes, normalize=True, engine=engine) else: ca = fa return ca if np.iscomplexobj(a) else ca.real
[docs]def resample_fft(fa, n, out=None, axes=-1, norm=1, op=lambda a,b:b): """Given array fa[{dims}] which is the fourier transform of some array a, transform it so that that it corresponds to the fourier transform of a version of a with a different number of samples by padding or truncating the fourier space. The argument n controls the new number of samples. By default this is for the last axis, but this can be changed using the axes argument. Multiple axes can be resampled at once by specifying a tuple for axes and n. The resulting array is multiplied by the argument norm. This can be used for normalization purposes. If norm is 1, then the multiplication is skipped. The argument out can be used to specify an already allocated output array. If it is None (the default), then an array will be allocated automatically. Normally the output array is overwritten, but this can be controlled using the op argument, which should be a function (out,fa)->out""" fa = np.asanyarray(fa) # Support n and axes being either tuples or a single number, # and broadcast n to match axes try: axes = tuple(axes) except TypeError: axes = (axes,) n = np.zeros(len(axes),int)+n # Determine the shape of the output array oshape = list(fa.shape) for i, ax in enumerate(axes): oshape[ax] = n[i] oshape = tuple(oshape) # Check or allocate output array if out is None: out = np.zeros(oshape, fa.dtype) else: if out.shape != oshape: raise ValueError("out argument has wrong shape in resample. Expected %s but got %s" % (str(oshape), str(out.shape))) # This function is used to avoid paying the cost of multiplying by norm when it's one def transfer(dest, source, norm, op): if norm != 1: source = source*norm dest[:] = op(dest, source) # Loop over start and end blocks for all dimensions for bi, I in enumerate(utils.nditer([2]*len(axes))): sel = [slice(None) for n in oshape] for ai, ax in enumerate(axes): c = min(fa.shape[ax], oshape[ax]) if I[ai] == 0: sel[ax] = slice(0,c//2) else: sel[ax] = slice(-(c-c//2),None) transfer(out[sel], fa[sel], norm, op) return out
[docs]def fft_flat(tod, ft, nthread=1, axes=[-1], flags=None, _direction="FFTW_FORWARD"): """Workaround for intel FFTW wrapper. Flattens appropriate dimensions of intput and output arrays to avoid crash that otherwise happens for arrays with ndim > N + 1, where N is the dimension of the transform. If 'axes' correspond to the last dimensions of the arrays, the workaround is essentially free. If `axes` correspond to other axes, copies are made when reshaping the arrays.""" shape_ft = ft.shape naxes = np.atleast_1d(axes).size axes_new = list(range(-1, -1 - naxes, -1)) ft = utils.partial_flatten(ft, axes=axes, pos=0) tod = utils.partial_flatten(tod, axes=axes, pos=0) plan = engines[engine].FFTW(tod, ft, flags=flags, threads=nthread, axes=axes_new, direction=_direction) plan() ft = utils.partial_expand(ft, shape_ft, axes=axes, pos=0) return ft
[docs]def ifft_flat(ft, tod, nthread=1, axes=[-1], flags=None): """Same workaround as fft_flat but now for the inverse transform.""" shape_tod = tod.shape naxes = np.atleast_1d(axes).size axes_new = list(range(-1, -1 - naxes, -1)) tod = utils.partial_flatten(tod, axes=axes, pos=0) ft = utils.partial_flatten(ft, axes=axes, pos=0) plan = engines[engine].FFTW(ft, tod, flags=flags, direction='FFTW_BACKWARD', threads=nthread, axes=axes_new) plan(normalise_idft=False) tod = utils.partial_expand(tod, shape_tod, axes=axes, pos=0) return tod