from __future__ import print_function
import numpy as np, scipy.ndimage, warnings, astropy.io.fits, sys, time, os, contextlib
from . import utils, wcsutils, powspec, fft as enfft
# Things that could be improved:
# 1. We assume exactly 2 WCS axes in spherical projection in {dec,ra} order.
# It would be nice to support other configurations too. I have for example
# needed [det,ra] or even [time,det,ra]. Adding support for this would
# probably necessitate breaking backwards compatibility due to units.
# WCS uses the units specified in the fits file, but I use radians.
# Once we allos non-degree axes, the simple pi/180 conversion I use
# won't work for all axes. It is simpler to just go with the flow and
# use the same units as wcs. I need to think about how this would
# interact with fourier units. Also, reordering or removing axes
# can be difficult. I disallow that now, but for > 2 wcs dimensions,
# these would be useful operations.
# 2. Passing around shape, wcs, dtype all the time is tedious. A simple
# geometry object would make this less tedious, as long as it is
# simple to override individual properties.
# Python 2/3 compatibility
try: basestring
except NameError: basestring = str
mute = {
"polconv_fix": True,
}
# PyFits uses row-major ordering, i.e. C ordering, while the fits file
# itself uses column-major ordering. So an array which is (ncomp,ny,nx)
# will be (nx,ny,ncomp) in the file. This means that the axes in the ndmap
# will be in the opposite order of those in the wcs object.
[docs]class ndmap(np.ndarray):
"""Implements (stacks of) flat, rectangular, 2-dimensional maps as a dense
numpy array with a fits WCS. The axes have the reverse ordering as in the
fits file, and hence the WCS object. This class is usually constructed by
using one of the functions following it, much like numpy arrays. We assume
that the WCS only has two axes with unit degrees. The ndmap itself uses
radians for everything."""
def __new__(cls, arr, wcs):
"""Wraps a numpy and a wcslib world coordinate system object into an ndmap."""
obj = np.asarray(arr).view(cls)
obj.wcs = wcs.deepcopy()
return obj
def __array_finalize__(self, obj):
if obj is None: return
self.wcs = getattr(obj, "wcs", None)
def __repr__(self):
return "ndmap(%s,%s)" % (np.asarray(self), wcsutils.describe(self.wcs))
def __str__(self): return repr(self)
def __array_wrap__(self, arr, context=None):
if arr.ndim < 2: return arr
return ndmap(arr, self.wcs)
def __reduce__(self):
reconstructor, args, state = super(ndmap, self).__reduce__()
state += (self.wcs.to_header_string(),)
return reconstructor, args, state
def __setstate__(self, state):
wcs = wcsutils.WCS(header=state[-1])
super(ndmap, self).__setstate__(state[:-1])
self.wcs = wcs
[docs] def copy(self, order='K'):
return ndmap(np.copy(self,order), self.wcs)
[docs] def sky2pix(self, coords, safe=True, corner=False): return sky2pix(self.shape, self.wcs, coords, safe, corner)
[docs] def pix2sky(self, pix, safe=True, corner=False): return pix2sky(self.shape, self.wcs, pix, safe, corner)
[docs] def l2pix(self, ls): return l2pix(self.shape, self.wcs, ls)
[docs] def pix2l(self, pix): return pix2l(self.shape, self.wcs, pix)
[docs] def contains(self, pos, unit="coord"): return contains(self.shape, self.wcs, pos, unit=unit)
[docs] def corners(self, npoint=10, corner=True): return corners(self.shape, self.wcs, npoint=npoint, corner=corner)
[docs] def box(self, npoint=10, corner=True): return box(self.shape, self.wcs, npoint=npoint, corner=corner)
[docs] def pixbox_of(self,oshape,owcs): return pixbox_of(self.wcs, oshape,owcs)
[docs] def posmap(self, safe=True, corner=False, separable="auto", dtype=np.float64): return posmap(self.shape, self.wcs, safe=safe, corner=corner, separable=separable, dtype=dtype)
[docs] def posaxes(self, safe=True, corner=False, dtype=np.float64): return posaxes(self.shape, self.wcs, safe=safe, corner=corner, dtype=dtype)
[docs] def pixmap(self): return pixmap(self.shape, self.wcs)
[docs] def laxes(self, oversample=1, method="auto"): return laxes(self.shape, self.wcs, oversample=oversample, method=method)
[docs] def lmap(self, oversample=1): return lmap(self.shape, self.wcs, oversample=oversample)
[docs] def modlmap(self, oversample=1, min=0): return modlmap(self.shape, self.wcs, oversample=oversample, min=min)
[docs] def modrmap(self, ref="center", safe=True, corner=False): return modrmap(self.shape, self.wcs, ref=ref, safe=safe, corner=corner)
[docs] def lbin(self, bsize=None, brel=1.0, return_nhit=False, return_bins=False): return lbin(self, bsize=bsize, brel=brel, return_nhit=return_nhit, return_bins=return_bins)
[docs] def rbin(self, center=[0,0], bsize=None, brel=1.0, return_nhit=False, return_bins=False): return rbin(self, center=center, bsize=bsize, brel=brel, return_nhit=return_nhit, return_bins=return_bins)
[docs] def area(self): return area(self.shape, self.wcs)
[docs] def pixsize(self): return pixsize(self.shape, self.wcs)
[docs] def pixshape(self, signed=False): return pixshape(self.shape, self.wcs, signed=signed)
[docs] def pixsizemap(self, separable="auto", broadcastable=False): return pixsizemap(self.shape, self.wcs, separable=separable, broadcastable=broadcastable)
[docs] def pixshapemap(self, separable="auto", signed=False): return pixshapemap(self.shape, self.wcs, separable=separable, signed=signed)
[docs] def lpixsize(self, signed=False, method="auto"): return lpixsize(self.shape, self.wcs, signed=signed, method=method)
[docs] def lpixshape(self, signed=False, method="auto"): return lpixshape(self.shape, self.wcs, signed=signed, method=method)
[docs] def extent(self, method="auto", signed=False): return extent(self.shape, self.wcs, method=method, signed=signed)
@property
def preflat(self):
"""Returns a view of the map with the non-pixel dimensions flattened."""
return self.reshape(-1, self.shape[-2], self.shape[-1])
@property
def npix(self): return np.prod(self.shape[-2:])
@property
def geometry(self): return self.shape, self.wcs
[docs] def resample(self, oshape, off=(0,0), method="fft", mode="wrap", corner=False, order=3): return resample(self, oshape, off=off, method=method, mode=mode, corner=corner, order=order)
[docs] def project(self, shape, wcs, order=3, mode="constant", cval=0, prefilter=True, mask_nan=False, safe=True): return project(self, shape, wcs, order, mode=mode, cval=cval, prefilter=prefilter, mask_nan=mask_nan, safe=safe)
[docs] def insert(self, imap, wrap="auto", op=lambda a,b:b, cval=0, iwcs=None): return insert(self, imap, wrap=wrap, op=op, cval=cval, iwcs=iwcs)
[docs] def insert_at(self, pix, imap, wrap="auto", op=lambda a,b:b, cval=0, iwcs=None): return insert_at(self, pix, imap, wrap=wrap, op=op, cval=cval, iwcs=iwcs)
[docs] def at(self, pos, order=3, mode="constant", cval=0.0, unit="coord", prefilter=True, mask_nan=False, safe=True): return at(self, pos, order, mode=mode, cval=0, unit=unit, prefilter=prefilter, mask_nan=mask_nan, safe=safe)
[docs] def argmax(self, unit="coord"): return argmax(self, unit=unit)
[docs] def autocrop(self, method="plain", value="auto", margin=0, factors=None, return_info=False): return autocrop(self, method, value, margin, factors, return_info)
[docs] def apod(self, width, profile="cos", fill="zero"): return apod(self, width, profile=profile, fill=fill)
[docs] def stamps(self, pos, shape, aslist=False): return stamps(self, pos, shape, aslist=aslist)
[docs] def distance_from(self, points, omap=None, odomains=None, domains=False, method="cellgrid", rmax=None, step=1024): return distance_from(self.shape, self.wcs, points, omap=omap, odomains=odomains, domains=domains, method=method, rmax=rmax, step=step)
@property
def plain(self): return ndmap(self, wcsutils.WCS(naxis=2))
[docs] def padslice(self, box, default=np.nan): return padslice(self, box, default=default)
[docs] def center(self): return center(self.shape,self.wcs)
[docs] def downgrade(self, factor, op=np.mean, ref=None, off=None): return downgrade(self, factor, op=op, ref=ref, off=off)
[docs] def upgrade(self, factor, off=None, oshape=None, inclusive=False): return upgrade(self, factor, off=off, oshape=oshape, inclusive=inclusive)
[docs] def fillbad(self, val=0, inplace=False): fillbad(self, val=val, inplace=inplace)
[docs] def to_healpix(self, nside=0, order=3, omap=None, chunk=100000, destroy_input=False):
return to_healpix(self, nside=nside, order=order, omap=omap, chunk=chunk, destroy_input=destroy_input)
[docs] def to_flipper(self, omap=None, unpack=True): return to_flipper(self, omap=omap, unpack=unpack)
def __getitem__(self, sel):
# Split sel into normal and wcs parts.
sel1, sel2 = utils.split_slice(sel, [self.ndim-2,2])
# If any wcs-associated indices are None, then we don't know how to update the
# wcs, and assume the user knows what it's doing
if any([s is None for s in sel2]):
return ndmap(np.ndarray.__getitem__(self, sel), self.wcs)
if len(sel2) > 2:
raise IndexError("too many indices")
# If the wcs slice includes direct indexing, so that wcs
# axes are lost, then degrade to a normal numpy array,
# since this class assumes that the two last axes are
# wcs axes.
if any([type(s) is not slice for s in sel2]):
return np.asarray(self)[sel]
# Otherwise we will return a full ndmap, including a
# (possibly) sliced wcs.
_, wcs = slice_geometry(self.shape[-2:], self.wcs, sel2)
return ndmap(np.ndarray.__getitem__(self, sel), wcs)
def __getslice__(self, a, b=None, c=None): return self[slice(a,b,c)]
[docs] def submap(self, box, mode=None, wrap="auto"):
"""Extract the part of the map inside the given coordinate box
box : array_like
The [[fromy,fromx],[toy,tox]] coordinate box to select.
The resulting map will have bottom-left and top-right corners
as close as possible to this, but will differ slightly due to
the finite pixel size.
mode : str
How to handle partially selected pixels:
"round": round bounds using standard rules
"floor": both upper and lower bounds will be rounded down
"ceil": both upper and lower bounds will be rounded up
"inclusive": lower bounds are rounded down, and upper bounds up
"exclusive": lower bounds are rounded up, and upper bounds down"""
return submap(self, box, mode=mode, wrap=wrap)
[docs] def subinds(self, box, mode=None, cap=True):
return subinds(self.shape, self.wcs, box=box, mode=mode, cap=cap)
[docs] def write(self, fname, fmt=None):
write_map(fname, self, fmt=fmt)
[docs]def submap(map, box, mode=None, wrap="auto", iwcs=None):
"""Extract the part of the map inside the given coordinate box
box : array_like
The [[fromy,fromx],[toy,tox]] coordinate box to select.
The resulting map will have corners as close
as possible to this, but will differ slightly due to
the finite pixel size.
mode : str
How to handle partially selected pixels:
"round": round bounds using standard rules
"floor": both upper and lower bounds will be rounded down
"ceil": both upper and lower bounds will be rounded up
"inclusive": lower bounds are rounded down, and upper bounds up
"exclusive": lower bounds are rounded up, and upper bounds down
The iwcs argument allows the wcs to be overriden. This is usually
not necessary."""
if iwcs is None: iwcs = map.wcs
ibox = subinds(map.shape, iwcs, box, mode=mode, cap=False)
def helper(b):
if b[2] >= 0: return False, slice(b[0],b[1],b[2])
else: return True, slice(b[1]-b[2],b[0]-b[2],-b[2])
yflip, yslice = helper(ibox[:,0])
xflip, xslice = helper(ibox[:,1])
oshape, owcs = slice_geometry(map.shape, iwcs, (yslice, xslice), nowrap=True)
omap = extract(map, oshape, owcs, wrap=wrap, iwcs=iwcs)
# Unflip if neccessary
if yflip: omap = omap[...,::-1,:]
if xflip: omap = omap[...,:,::-1]
return omap
[docs]def subinds(shape, wcs, box, mode=None, cap=True, noflip=False):
"""Helper function for submap. Translates the coordinate box provided
into a pixel units.
When box is translated into pixels, the result will in general have
fractional pixels, which need to be rounded before we can do any slicing.
To get as robust results as possible, we want
1. two boxes that touch should results in iboxses that also touch.
This means that upper and lower bounds must be handled consistently.
inclusive and exclusive modes break this, and should be used with caution.
2. tiny floating point errors should not usually be able to cause
the ibox to change. Most boxes will have some simple fraction of
a whole degree, and most have pixels with centers at a simple fraction
of a whole degree. Hence, it is likely that box edges will fall
almost exactly on an integer pixel value. floor and ceil will
then move us around by a whole pixel based on tiny numerical
jitter around this value. Hence these should be used with caution.
These concerns leave us with mode = "round" as the only generally
safe alternative, which is why it's default.
"""
if mode is None: mode = "round"
box = np.asarray(box)
# Translate the box to pixels
bpix = skybox2pixbox(shape, wcs, box, include_direction=True)
if noflip:
for b in bpix.T:
if b[2] < 0: b[:] = [b[1],b[0],-b[2]]
if mode == "round": bpix = np.round(bpix)
elif mode == "floor": bpix = np.floor(bpix)
elif mode == "ceil": bpix = np.ceil(bpix)
elif mode == "inclusive": bpix = [np.floor(bpix[0]),np.ceil (bpix[1]), bpix[2]]
elif mode == "exclusive": bpix = [np.ceil (bpix[0]),np.floor(bpix[1]), bpix[2]]
else: raise ValueError("Unrecognized mode '%s' in subinds" % str(mode))
bpix = np.array(bpix, int)
if cap:
# Make sure we stay inside our map bounds
for b, n in zip(bpix.T,shape[-2:]):
if b[2] > 0: b[:2] = [max(b[0], 0),min(b[1], n)]
else: b[:2] = [min(b[0],n-1),max(b[1],-1)]
return bpix
[docs]def slice_geometry(shape, wcs, sel, nowrap=False):
"""Slice a geometry specified by shape and wcs according to the
slice sel. Returns a tuple of the output shape and the correponding
wcs."""
wcs = wcs.deepcopy()
pre, shape = shape[:-2], shape[-2:]
oshape = np.array(shape)
# The wcs object has the indices in reverse order
for i,s in enumerate(sel[-2:]):
s = utils.expand_slice(s, shape[i], nowrap=nowrap)
j = -1-i
start = s.start if s.step > 0 else s.start + 1
wcs.wcs.crpix[j] -= start+0.5
wcs.wcs.crpix[j] /= s.step
wcs.wcs.cdelt[j] *= s.step
wcs.wcs.crpix[j] += 0.5
oshape[i] = (s.stop-s.start+s.step-np.sign(s.step))//s.step
return tuple(pre)+tuple(oshape), wcs
[docs]def scale_geometry(shape, wcs, scale):
"""Scale the geometry so that the number of pixels is scaled
by the factor `scale`.
"""
scale = np.zeros(2)+scale
oshape = tuple(shape[:-2])+tuple(utils.nint(shape[-2:]*scale))
owcs = wcsutils.scale(wcs, scale, rowmajor=True)
return oshape, owcs
[docs]def get_unit(wcs):
return utils.degree
[docs]def npix(shape): return shape[-2]*shape[-1]
[docs]class Geometry:
def __init__(self, shape, wcs=None):
try: self.shape, self.wcs = tuple(shape.shape), shape.wcs
except AttributeError: self.shape, self.wcs = tuple(shape), wcs
assert wcs is not None, "Geometry __init__ needs either a Geometry object or a shape, wcs pair"
@property
def npix(self): return self.shape[-2]*self.shape[-1]
# Make it behave a bit like a tuple, so we can use it interchangably with a shape, wcs pair
# for compatibility
def __len__(self): return 2
def __iter__(self):
yield self.shape
yield self.wcs
def __getitem__(self, sel):
if not isinstance(sel,tuple): sel = (sel,)
shape, wcs = slice_geometry(self.shape, self.wcs, sel)
return Geometry(shape, wcs)
def __repr__(self):
return "Geometry(" + str(self.shape) + ","+str(self.wcs)+")"
@property
def nopre(self): return Geometry(self.shape[-2:], self.wcs)
[docs] def with_pre(self, pre): return Geometry(tuple(pre) + self.shape[-2:], self.wcs)
[docs] def submap(self, box=None, pixbox=None, mode=None, wrap="auto", noflip=False):
if pixbox is None:
pixbox = subinds(self.shape, self.wcs, box, mode=mode, cap=False, noflip=noflip)
def helper(b):
if len(b) < 3: return False, slice(b[0],b[1],1)
elif b[2] >= 0: return False, slice(b[0],b[1],b[2])
else: return True, slice(b[1]-b[2],b[0]-b[2],-b[2])
yflip, yslice = helper(pixbox[:,0])
xflip, xslice = helper(pixbox[:,1])
shape, wcs = slice_geometry(self.shape, self.wcs, (yslice, xslice), nowrap=True)
res = Geometry(shape,wcs)
# Unflip if neccessary
if yflip: res = res[::-1,:]
if xflip: res = res[:,::-1]
return res
[docs] def scale(self, scale):
shape, wcs = scale_geometry(self.shape, self.wcs, scale)
return Geometry(shape, wcs)
[docs] def downgrade(self, factor, op=np.mean):
shape, wcs = downgrade_geometry(self.shape, self.wcs, factor, op=op)
return Geometry(shape, wcs)
[docs] def copy(self):
return Geometry(tuple(self.shape), self.wcs.deepcopy())
[docs] def sky2pix(self, coords, safe=True, corner=False): return sky2pix(self.shape, self.wcs, coords, safe, corner)
[docs] def pix2sky(self, pix, safe=True, corner=False): return pix2sky(self.shape, self.wcs, pix, safe, corner)
[docs] def l2pix(self, ls): return l2pix(self.shape, self.wcs, ls)
[docs] def pix2l(self, pix): return pix2l(self.shape, self.wcs, pix)
[docs]def corners(shape, wcs, npoint=10, corner=True):
"""Return the coordinates of the bottom left and top right corners of the
geometry given by shape, wcs.
If corner==True it is similar to
enmap.pix2sky([[-0.5,shape[-2]-0.5],[-0.5,shape[-1]-0.5]]). That is, it
return sthe coordinate of the bottom left corner of the bottom left pixel and
the top right corner of the top right pixel. If corner==False, then it
instead returns the corresponding pixel centers.
It differs from the simple pix2sky calls above by handling 2*pi wrapping
ambiguities differently. enmap.corners ensures that the coordinates returned
are on the same side of the wrapping cut so that the coordinates of the
two corners can be compared without worrying about wrapping. It does this
by evaluating a set of intermediate points between the corners and counting
and undoing any sudden jumps in coordinates it finds. This is controlled by
the npoint option. The default of 10 should be more than enough.
Returns [{bottom left,top right},{dec,ra}] in radians
(or equivalent for other coordinate systems).
e.g. an array of the form [[dec_min, ra_min ], [dec_max, ra_max]]."""
# Because of wcs's wrapping, we need to evaluate several
# extra pixels to make our unwinding unambiguous.
# Could reduce code duplication a bit here, but I think it's clearer
# when written like this
if corner:
pix = np.array([
np.linspace(-0.5,shape[-2]-0.5,num=npoint,endpoint=True),
np.linspace(-0.5,shape[-1]-0.5,num=npoint,endpoint=True)])
else:
pix = np.array([
np.linspace(0,shape[-2]-1,num=npoint,endpoint=True),
np.linspace(0,shape[-1]-1,num=npoint,endpoint=True)])
coords = wcsutils.nobcheck(wcs).wcs_pix2world(pix[1],pix[0],0)[::-1]
if wcsutils.is_plain(wcs):
return np.array(coords).T[[0,-1]]*get_unit(wcs)
else:
return utils.unwind(np.array(coords)*get_unit(wcs),refmode="middle").T[[0,-1]]
[docs]def box(shape, wcs, npoint=10, corner=True):
"""Alias for corners."""
return corners(shape, wcs, npoint=npoint, corner=corner)
[docs]def enmap(arr, wcs=None, dtype=None, copy=True):
"""Construct an ndmap from data.
Parameters
----------
arr : array_like
The data to initialize the map with.
Must be at least two-dimensional.
wcs : WCS object
dtype : data-type, optional
The data type of the map.
Default: Same as arr.
copy : boolean
If true, arr is copied. Otherwise, a referance is kept."""
def has_wcs(m):
try:
m.wcs
return True
except AttributeError:
return False
if wcs is None:
if has_wcs(arr):
wcs = arr.wcs
elif isinstance(arr, list) and len(arr) > 0 and has_wcs(arr[0]):
wcs = arr[0].wcs
else:
wcs = wcsutils.WCS(naxis=2)
if copy:
arr = np.asanyarray(arr, dtype=dtype).copy()
return ndmap(arr, wcs)
[docs]def empty(shape, wcs=None, dtype=None):
"""
Return an enmap with entries uninitialized (like numpy.empty).
"""
return enmap(np.empty(shape, dtype=dtype), wcs, copy=False)
[docs]def zeros(shape, wcs=None, dtype=None):
"""
Return an enmap with entries initialized to zero (like
numpy.zeros).
"""
return enmap(np.zeros(shape, dtype=dtype), wcs, copy=False)
[docs]def ones(shape, wcs=None, dtype=None):
"""
Return an enmap with entries initialized to one (like numpy.ones).
"""
return enmap(np.ones(shape, dtype=dtype), wcs, copy=False)
[docs]def full(shape, wcs, val, dtype=None):
"""
Return an enmap with entries initialized to val (like numpy.full).
"""
return enmap(np.full(shape, val, dtype=dtype), wcs, copy=False)
[docs]def posmap(shape, wcs, safe=True, corner=False, separable="auto", dtype=np.float64, bsize=1e6, bcheck=False):
"""Return an enmap where each entry is the coordinate of that entry,
such that posmap(shape,wcs)[{0,1},j,k] is the {y,x}-coordinate of
pixel (j,k) in the map. Results are returned in radians, and
if safe is true (default), then sharp coordinate edges will be
avoided. separable controls whether a fast calculation that assumes that
ra is only a function of x and dec is only a function of y is used.
The default is "auto", which determines this based on the wcs, but
True or False can also be passed to control this manually.
For even greater speed, and to save memory, consider using posaxes directly
for cases where you know that the wcs will be separable. For separable cases,
separable=True is typically 15-20x faster than separable=False, while posaxes
is 1000x faster.
"""
res = zeros((2,)+tuple(shape[-2:]), wcs, dtype)
if separable == "auto": separable = wcsutils.is_cyl(wcs)
if separable:
# If posmap could return a (dec,ra) tuple instead of an ndmap,
# we could have returned np.broadcast_arrays(dec, ra) instead.
# That would have been as fast and memory-saving as broadcast-arrays.
dec, ra = posaxes(shape, wcs, safe=safe, corner=corner, bcheck=bcheck)
res[0] = dec[:,None]
res[1] = ra[None,:]
else:
rowstep = int((bsize+shape[-1]-1)//shape[-1])
for i1 in range(0, shape[-2], rowstep):
i2 = min(i1+rowstep, shape[-2])
pix = np.mgrid[i1:i2,:shape[-1]]
res[:,i1:i2,:] = pix2sky(shape, wcs, pix, safe, corner, bcheck=bcheck)
return res
[docs]def posmap_old(shape, wcs, safe=True, corner=False):
pix = np.mgrid[:shape[-2],:shape[-1]]
return ndmap(pix2sky(shape, wcs, pix, safe, corner), wcs)
[docs]def posaxes(shape, wcs, safe=True, corner=False, dtype=np.float64, bcheck=False):
y = np.arange(shape[-2])
x = np.arange(shape[-1])
dec = pix2sky(shape, wcs, np.array([y,y*0]), safe=safe, corner=corner, bcheck=bcheck)[0].astype(dtype, copy=False)
ra = pix2sky(shape, wcs, np.array([x*0,x]), safe=safe, corner=corner, bcheck=bcheck)[1].astype(dtype, copy=False)
return dec, ra
[docs]def pixmap(shape, wcs=None):
"""Return an enmap where each entry is the pixel coordinate of that entry."""
res = np.mgrid[:shape[-2],:shape[-1]]
return res if wcs is None else ndmap(res,wcs)
[docs]def pix2sky(shape, wcs, pix, safe=True, corner=False, bcheck=False):
"""Given an array of pixel coordinates [{y,x},...],
return sky coordinates in the same ordering."""
pix = np.asarray(pix).astype(float)
if corner: pix -= 0.5
pflat = pix.reshape(pix.shape[0], -1)
if not bcheck: wcs = wcsutils.nobcheck(wcs)
coords = np.asarray(wcs.wcs_pix2world(*(tuple(pflat)[::-1]+(0,)))[::-1])*get_unit(wcs)
coords = coords.reshape(pix.shape)
if safe and not wcsutils.is_plain(wcs):
coords = utils.unwind(coords, refmode="middle")
return coords
[docs]def sky2pix(shape, wcs, coords, safe=True, corner=False, bcheck=False):
"""Given an array of coordinates [{dec,ra},...], return
pixel coordinates with the same ordering. The corner argument
specifies whether pixel coordinates start at pixel corners
or pixel centers. This represents a shift of half a pixel.
If corner is False, then the integer pixel closest to a position
is round(sky2pix(...)). Otherwise, it is floor(sky2pix(...))."""
coords = np.asarray(coords)/get_unit(wcs)
cflat = coords.reshape(coords.shape[0], -1)
# Quantities with a w prefix are in wcs ordering (ra,dec)
if not bcheck: wcs = wcsutils.nobcheck(wcs)
wpix = np.asarray(wcs.wcs_world2pix(*tuple(cflat)[::-1]+(0,)))
if corner: wpix += 0.5
if safe and not wcsutils.is_plain(wcs):
wshape = shape[-2:][::-1]
# Put the angle cut as far away from the map as possible.
# We do this by putting the reference point in the middle
# of the map.
wrefpix = np.array(wshape)/2.
if corner: wrefpix += 0.5
for i in range(len(wpix)):
wn = np.abs(360./wcs.wcs.cdelt[i])
if safe == 1:
wpix[i] = utils.rewind(wpix[i], wrefpix[i], wn)
else:
wpix[i] = utils.unwind(wpix[i], period=wn, ref=wrefpix[i])
return wpix[::-1].reshape(coords.shape)
[docs]def pix2l(shape, wcs, pix):
"""Given an array of fourier-pixel coordinates [{y,x},...], returns
the 2d fourier coordinates [{ly,lx},...]."""
pix = np.asanyarray(pix)
pshape = pixshape(shape, wcs, signed=True)
return enfft.ind2freq(np.array(shape[-2:]).T, pix.T, pshape.T/(2*np.pi)).T
[docs]def l2pix(shape, wcs, ls):
"""Given an array of fourier-pixel coordinates [{y,x},...], returns
the 2d fourier coordinates [{ly,lx},...]."""
ls = np.asanyarray(ls)
pshape = pixshape(shape, wcs, signed=True)
return enfft.freq2ind(np.array(shape[-2:]).T, ls.T, pshape.T/(2*np.pi)).T
[docs]def skybox2pixbox(shape, wcs, skybox, npoint=10, corner=False, include_direction=False):
"""Given a coordinate box [{from,to},{dec,ra}], compute a
corresponding pixel box [{from,to},{y,x}]. We avoid
wrapping issues by evaluating a number of subpoints."""
coords = np.array([
np.linspace(skybox[0,0],skybox[1,0],num=npoint,endpoint=True),
np.linspace(skybox[0,1],skybox[1,1],num=npoint,endpoint=True)])
pix = sky2pix(shape, wcs, coords, corner=corner, safe=2)
dir = np.sign(pix[:,1]-pix[:,0])
res = pix[:,[0,-1]].T
if include_direction: res = np.concatenate([res,dir[None]],0)
return res
[docs]def pixbox2skybox(shape, wcs, pixbox):
return pix2sky(shape, wcs, np.asanyarray(pixbox).T).T
[docs]def contains(shape, wcs, pos, unit="coord"):
"""For the points with coordinates pos[{dec,ra},...] return whether
each is inside the geometry given by shape, wcs"""
if unit == "coord": pix = sky2pix(shape, wcs, pos)
else: pix = pos
return np.all((pix>=0)&(pix.T<shape[-2:]).T,0)
[docs]def project(map, shape, wcs, order=3, mode="constant", cval=0.0, force=False, prefilter=True, mask_nan=False, safe=True, bsize=1000):
"""Project the map into a new map given by the specified
shape and wcs, interpolating as necessary. Handles nan
regions in the map by masking them before interpolating.
This uses local interpolation, and will lose information
when downgrading compared to averaging down."""
# Skip expensive operation if map is compatible
if not force:
if wcsutils.equal(map.wcs, wcs) and tuple(shape[-2:]) == tuple(shape[-2:]):
return map.copy()
elif wcsutils.is_compatible(map.wcs, wcs) and mode == "constant":
return extract(map, shape, wcs, cval=cval)
omap = zeros(map.shape[:-2]+shape[-2:], wcs, map.dtype)
# Save memory by looping over rows
for i1 in range(0, shape[-2], bsize):
i2 = min(i1+bsize, shape[-2])
somap = omap[...,i1:i2,:]
pix = map.sky2pix(somap.posmap(), safe=safe)
y1 = max(np.min(pix[0]).astype(int)-3,0)
y2 = min(np.max(pix[0]).astype(int)+3,map.shape[-2])
if y2-y1 <= 0: continue
pix[0] -= y1
somap[:] = utils.interpol(map[...,y1:y2,:], pix, order=order, mode=mode, cval=cval, prefilter=prefilter, mask_nan=mask_nan)
return omap
[docs]def pixbox_of(iwcs,oshape,owcs):
"""Obtain the pixbox which when extracted from a map with WCS=iwcs
returns a map that has geometry oshape,owcs.
"""
# First check that our wcs is compatible
assert wcsutils.is_compatible(iwcs, owcs), "Incompatible wcs in enmap.extract: %s vs. %s" % (str(iwcs), str(owcs))
# Find the pixel bounding box of the output in terms of the input.
# This is simple because our wcses are compatible, so they
# can only differ by a simple pixel offset. Here pixoff is
# pos_input - pos_output. This is equivalent to the coordinates of
pixoff = utils.nint((iwcs.wcs.crpix-owcs.wcs.crpix) - (iwcs.wcs.crval-owcs.wcs.crval)/iwcs.wcs.cdelt)[::-1]
pixbox = np.array([pixoff,pixoff+np.array(oshape[-2:])])
return pixbox
[docs]def insert(omap, imap, wrap="auto", op=lambda a,b:b, cval=0, iwcs=None):
"""Insert imap into omap based on their world coordinate systems, which
must be compatible. Essentially the reverse of extract."""
extract(omap, imap.shape, imap.wcs, imap, wrap="auto", op=op, cval=0, iwcs=None, reverse=True)
return omap
[docs]def insert_at(omap, pix, imap, wrap="auto", op=lambda a,b:b, cval=0, iwcs=None):
"""Insert imap into omap at the position given by pix. If pix is [y,x], then
[0:ny,0:nx] in imap will be copied into [y:y+ny,x:x+nx] in omap. If pix is
[{from,to,[stride]},{y,x}], then this specifies the omap pixbox into which to
copy imap. Wrapping is handled the same way as in extract."""
pixbox = np.array(pix)
if pixbox.ndim == 1: pixbox = np.array([pixbox,pixbox+imap.shape[-2:]])
extract_pixbox(omap, pixbox, imap, wrap=wrap, op=op, cval=cval, iwcs=iwcs, reverse=True)
return omap
[docs]def overlap(shape, wcs, shape2_or_pixbox, wcs2=None, wrap="auto"):
"""Compute the overlap between the given geometry (shape, wcs) and another *compatible*
geometry. This can be either another shape, wcs pair or a pixbox[{from,to},{y,x}].
Returns the geometry of the overlapping region."""
# Is it a shape or a pixbox
tmp = np.asarray(shape2_or_pixbox)
if tmp.ndim == 1: pixbox = pixbox_of(wcs, shape2_or_pixbox, wcs2)
elif tmp.ndim == 2: pixbox = shape2_or_pixbox
else: raise ValueError("3rd argument of overlap should be an enmap, a shape tuple or a pixbox")
# Handle wrapping
nphi = utils.nint(360/np.abs(wcs.wcs.cdelt[0]))
# If our map is wider than the wrapping length, assume we're a lower-spin field
nphi *= (nphi+shape[-1]-1)//nphi
if utils.streq(wrap, "auto"):
wrap = [0,0] if wcsutils.is_plain(wcs) else [0,nphi]
# Looks like the sbox stuff in utils doesn't do this, so do it ourself.
for i in range(2):
# If pixbox goes below 0, truncate it unless it goes negative
# enough to reach our wrapped end.
if pixbox[0,i] < 0 and (not wrap[i] or pixbox[0,i]+wrap[i] >= shape[-2+i]):
pixbox[0,i] = 0
# Similarly, if it goes above our end, truncate it unless it
# goes far enough to reach our wrapped beginning
if pixbox[1,i] > shape[-2+i] and (not wrap[i] or pixbox[1,i]-wrap[i] <= 0):
pixbox[1,i] = shape[-2+i]
# This will ensure that we get a zero shape instead of a negative one if
# there is no overlap
pixbox[1] = np.maximum(pixbox[1],pixbox[0])
# Good, we now have the capped, wrapped pixbox
oshape = tuple(pixbox[1]-pixbox[0])
owcs = wcs.deepcopy()
owcs.wcs.crpix -= pixbox[0,1::-1]
return oshape, owcs
[docs]def neighborhood_pixboxes(shape, wcs, poss, r):
"""Given a set of positions poss[npos,{dec,ra}] in radians and a distance r in radians,
return pixboxes[npos][{from,to},{y,x}] corresponding to the regions within a
distance of r from each entry in poss."""
if wcsutils.is_plain(wcs):
rpix = r/pixsize(shape, wcs)
centers = sky2pix(poss.T).T
return np.moveaxis([centers-rpix,center+rpix+1],0,1)
poss, r = utils.broadcast_arrays(poss, r, npost=(1,0))
res = np.zeros(poss.shape[:-1]+(2,2))
for I in utils.nditer(poss.shape[:-1]):
pos, r_ = poss[I], r[I]
# Find the coordinate box we need
dec, ra = pos[:2]
dec1, dec2 = max(dec-r_,-np.pi/2), min(dec+r_,np.pi/2)
with utils.nowarn():
scale = 1/min(np.cos(dec1), np.cos(dec2))
dra = min(r_*scale, np.pi)
ra1, ra2 = ra-dra, ra+dra
box = np.array([[dec1,ra1],[dec2,ra2]])
# And get the corresponding pixbox
res[I] = skybox2pixbox(shape, wcs, box)
# Turn ranges into from-inclusive, to-exclusive integers.
res = utils.nint(res)
res = np.sort(res, -2)
res[...,1,:] += 1
return res
[docs]def at(map, pos, order=3, mode="constant", cval=0.0, unit="coord", prefilter=True, mask_nan=False, safe=True):
if unit != "pix": pos = sky2pix(map.shape, map.wcs, pos, safe=safe)
return utils.interpol(map, pos, order=order, mode=mode, cval=cval, prefilter=prefilter, mask_nan=mask_nan)
[docs]def argmax(map, unit="coord"):
"""Return the coordinates of the maximum value in the specified map.
If map has multiple components, the maximum value for each is returned
separately, with the last axis being the position. If unit is "pix",
the position will be given in pixels. Otherwise it will be in physical
coordinates."""
return _arghelper(map, np.argmax, unit)
[docs]def argmin(map, unit="coord"):
"""Return the coordinates of the minimum value in the specified map.
See argmax for details."""
return _arghelper(map, np.argmin, unit)
def _arghelper(map, func, unit):
res = func(map.reshape(-1,map.npix),-1)
res = np.array([np.unravel_index(r, map.shape[-2:]) for r in res])
res = res.reshape(map.shape[:-2]+(2,))
if unit == "coord": res = pix2sky(map.shape, map.wcs, res.T).T
return res
[docs]def rand_map(shape, wcs, cov, scalar=False, seed=None, pixel_units=False, iau=False, spin=[0,2]):
"""Generate a standard flat-sky pixel-space CMB map in TQU convention based on
the provided power spectrum. If cov.ndim is 4, 2D power is assumed else 1D
power is assumed. If pixel_units is True, the 2D power spectra is assumed
to be in pixel units, not in steradians."""
if seed is not None: np.random.seed(seed)
kmap = rand_gauss_iso_harm(shape, wcs, cov, pixel_units)
if scalar:
return ifft(kmap,normalize=True).real
else:
return harm2map(kmap, iau=iau, spin=spin)
[docs]def rand_gauss(shape, wcs, dtype=None):
"""Generate a map with random gaussian noise in pixel space."""
return ndmap(np.random.standard_normal(shape), wcs).astype(dtype,copy=False)
[docs]def rand_gauss_harm(shape, wcs):
"""Mostly equivalent to np.fft.fft2(np.random.standard_normal(shape)),
but avoids the fft by generating the numbers directly in frequency
domain. Does not enforce the symmetry requried for a real map. If box is
passed, the result will be an enmap."""
return ndmap(np.random.standard_normal(shape)+1j*np.random.standard_normal(shape),wcs)
[docs]def rand_gauss_iso_harm(shape, wcs, cov, pixel_units=False):
"""Generates a random map with component covariance
cov in harmonic space, where cov is a (comp,comp,l) array or a
(comp,comp,Ny,Nx) array. Despite the name, the map doesn't need
to be isotropic since 2D power spectra are allowed.
If cov.ndim is 4, cov is assumed to be an array of 2D power spectra.
else cov is assumed to be an array of 1D power spectra.
If pixel_units is True, the 2D power spectra is assumed to be in pixel units,
not in steradians."""
if cov.ndim==4:
if not(pixel_units): cov = cov * np.prod(shape[-2:])/area(shape,wcs )
covsqrt = multi_pow(cov, 0.5)
else:
covsqrt = spec2flat(shape, wcs, massage_spectrum(cov, shape), 0.5, mode="constant")
data = map_mul(covsqrt, rand_gauss_harm(shape, wcs))
return ndmap(data, wcs)
[docs]def massage_spectrum(cov, shape):
"""given a spectrum cov[nl] or cov[n,n,nl] and a shape
(stokes,ny,nx) or (ny,nx), return a new ocov that has
a shape compatible with shape, padded with zeros if necessary.
If shape is scalar (ny,nx), then ocov will be scalar (nl).
If shape is (stokes,ny,nx), then ocov will be (stokes,stokes,nl)."""
cov = np.asarray(cov)
if cov.ndim == 1: cov = cov[None,None]
if len(shape) == 2: return cov[0,0]
ocov = np.zeros((shape[0],shape[0])+cov.shape[2:])
nmin = min(cov.shape[0],ocov.shape[0])
ocov[:nmin,:nmin] = cov[:nmin,:nmin]
return ocov
[docs]def extent(shape, wcs, nsub=None, signed=False, method="auto"):
"""Returns the area of a patch with the given shape
and wcs, in steradians."""
if method == "auto":
if wcsutils.is_plain(wcs): method = "intermediate"
elif wcsutils.is_cyl(wcs): method = "cylindrical"
else: method = "subgrid"
if method in ["inter","intermediate"]: return extent_intermediate(shape, wcs, signed=signed)
elif method in ["cyl", "cylindrical"]: return extent_cyl(shape, wcs, signed=signed)
elif method in ["sub", "subgrid"]: return extent_subgrid(shape, wcs, nsub=nsub, signed=signed)
else: raise ValueError("Unrecognized method '%s' in extent()" % method)
# Approximations to physical box size and area are needed
# for transforming to l-space. We can do this by dividing
# our map into a set of rectangles and computing the
# coordinates of their corners. The rectangles are assumed
# to be small, so cos(dec) is constant across them, letting
# us rescale RA by cos(dec) inside each. We also assume each
# rectangle to be .. a rectangle (:D), so area is given by
# two side lengths.
# The total length in each direction could be computed by
# 1. Average of top and bottom length
# 2. Mean of all row lengths
# 3. Area-weighted mean of row lengths
# 4. Some sort of compromise that makes length*height = area.
# To construct the coarser system, slicing won't do, as it
# shaves off some of our area. Instead, we must modify
# cdelt to match our new pixels: cdelt /= nnew/nold
[docs]def extent_subgrid(shape, wcs, nsub=None, safe=True, signed=False):
"""Returns an estimate of the "physical" extent of the
patch given by shape and wcs as [height,width] in
radians. That is, if the patch were on a sphere with
radius 1 m, then this function returns approximately how many meters
tall and wide the patch is. These are defined such that
their product equals the physical area of the patch.
Obs: Has trouble with areas near poles."""
total_area = area(shape, wcs)
if nsub is None: nsub = 17
# Create a new wcs with (nsub,nsub) pixels
wcs = wcs.deepcopy()
step = (np.asfarray(shape[-2:])/nsub)[::-1]
wcs.wcs.crpix -= 0.5
wcs.wcs.cdelt *= step
wcs.wcs.crpix /= step
wcs.wcs.crpix += 0.5
# We need a representative cos(dec) for each pixel. Will use the center
coss = np.cos(posmap([nsub,nsub], wcs, safe=False)[0])
# Get the length of each row in the image. This will be the distance
# from the middle of the left edge of the left-most pixel to the middle
# of the right edge of the right-most pixel
pixs = np.mgrid[:nsub,:nsub+1].astype(float)
pixs[1] -= 0.5
decs, ras = pix2sky(nsub, wcs, pixs, safe=False)
pix_lengths = (utils.rewind(decs[:,1:]-decs[:,:-1])**2 + (utils.rewind(ras[:,1:]-ras[:,:-1])*coss)**2)**0.5
# Get the height of each col in the image
pixs = np.mgrid[:nsub+1,:nsub].astype(float)
pixs[0] -= 0.5
decs, ras = pix2sky(nsub, wcs, pixs, safe=False)
pix_heights = (utils.rewind(decs[1:,:]-decs[:-1,:])**2 + (utils.rewind(ras[1:,:]-ras[:-1,:])*coss)**2)**0.5
# The area is the sum of their product
pix_areas = pix_lengths*pix_heights
# The extent should be a compromise between the different lengths and heights
# that gives the right area.
mean_length = np.mean(pix_lengths)*nsub
mean_height = np.mean(pix_heights)*nsub
# Then scale both to ensure we get the right area
correction = (total_area/(mean_length*mean_height))**0.5
mean_length *= correction
mean_height *= correction
ext = np.array([mean_height, mean_length])
if signed: ext *= np.sign(wcs.wcs.cdelt[::-1])
return ext
[docs]def extent_cyl(shape, wcs, signed=False):
"""Extent specialized for a cylindrical projection.
Vertical: ny*cdelt[1]
Horizontal: Each row is nx*cdelt[0]*cos(dec), but we
want a single representative number, which will be
some kind of average, and we're free to choose which. We choose
the one that makes the product equal the true area.
Area = nx*ny*cdelt[0]*cdelt[1]*mean(cos(dec)) = vertical*(nx*cdelt[0]*mean(cos)),
so horizontal = nx*cdelt[0]*mean(cos)"""
dec1, dec2 = pix2sky(shape, wcs, [[-0.5,shape[-2]-1+0.5],[0,0]], safe=False)[0]
if dec1 <= dec2: ysign = 1
else: dec1, dec2, ysign = dec2, dec1, -1
dec1, dec2 = max(-np.pi/2, dec1), min(np.pi/2, dec2)
mean_cos = (np.sin(dec2)-np.sin(dec1))/(dec2-dec1)
ext = np.array([(dec2-dec1)*ysign, shape[-1]*wcs.wcs.cdelt[0]*mean_cos*get_unit(wcs)])
if not signed: ext = np.abs(ext)
return ext
[docs]def area(shape, wcs, nsamp=1000, method="auto"):
"""Returns the area of a patch with the given shape
and wcs, in steradians."""
if method == "auto":
if wcsutils.is_plain(wcs): method = "intermediate"
elif wcsutils.is_cyl(wcs): method = "cylindrical"
else: method = "contour"
if method in ["inter","intermediate"]: return area_intermediate(shape, wcs)
elif method in ["cyl", "cylindrical"]: return area_cyl(shape, wcs)
elif method in ["cont", "contour"]: return area_contour(shape, wcs, nsamp=nsamp)
else: raise ValueError("Unrecognized method '%s' in area()" % method)
[docs]def area_cyl(shape, wcs):
"""Get the area of a cylindrical projection. Fast and exact."""
dec1, dec2 = np.sort(pix2sky(shape, wcs, [[-0.5,shape[-2]-1+0.5],[0,0]], safe=False)[0])
dec1, dec2 = max(-np.pi/2, dec1), min(np.pi/2, dec2)
return (np.sin(dec2)-np.sin(dec1))*abs(wcs.wcs.cdelt[0])*shape[-1]*get_unit(wcs)
[docs]def area_contour(shape, wcs, nsamp=1000):
"""Get the area of the map by doing a contour integral (1-sin(dec)) d(RA)
over the closed path (dec(t), ra(t)) that bounds the valid region of
the map, so it only works for projections where we can figure out this
boundary. Using only d(RA) in the integral corresponds to doing a top-hat
integral instead of something trapezoidal, but this method is fast enough
that we can afford many points to compensate.
The present implementation works for cases where the valid
region of the map runs through the centers of the pixels on
each edge or through the outer edge of those pixels (this
detail can be different for each edge). The former case is
needed in the full-sky cylindrical projections that have
pixels centered exactly on the poles.
"""
n2, n1 = shape[-2:]
row_lims, col_lims = [], []
# Ideally we want to integrate around the real outer edge
# of our patch, which is displaced by half a pixel coordinate
# from the pixel centers, but sometimes those coordinates are
# not valid. The nobcheck should avoid that, but we still include
# them to be safe. For the case where nobcheck avoids invalid values,
# the clip() later cuts off the parts of the pixels that go outside
# bounds. This differs from using the backup points by also handling
# pixels that are offset from the poles by a non-half-integer amount.
for dest_list, test_points in [
(col_lims, [( -0.5, 0.0), ( 0.0, 0.0)]),
(col_lims, [(n1-0.5, 0.0), (n1-1.0, 0.0)]),
(row_lims, [(0.0, -0.5), (0.0, 0.0)]),
(row_lims, [(0.0, n2-0.5), (0.0, n2-1.0)]),
]:
for t in test_points:
if not np.any(np.isnan(wcsutils.nobcheck(wcs).wcs_pix2world([t], 0))):
dest_list.append(np.array(t, float))
break
else:
raise ValueError("Could not identify map_boundary; last test point was %s" % t)
# We want to draw a closed patch connecting the four corners
# of the boundary.
col_lims = [_c[0] for _c in col_lims]
row_lims = [_r[1] for _r in row_lims]
vertices = np.array([
(col_lims[0], row_lims[0]),
(col_lims[1], row_lims[0]),
(col_lims[1], row_lims[1]),
(col_lims[0], row_lims[1]),
(col_lims[0], row_lims[0])])
total = 0
tot_dra = 0
for v0, v1 in zip(vertices[:-1], vertices[1:]):
line_pix = np.linspace(0, 1, nsamp)[:,None] * (v1 - v0) + v0
line = wcsutils.nobcheck(wcs).wcs_pix2world(line_pix, 0)
# Stay within valid dec values. Used for pixels at the poles
line[:,1] = np.clip(line[:,1], -90, 90)
dec = (line[1:,1] + line[:-1,1]) / 2 # average dec
dra = line[1:,0] - line[:-1,0] # delta RA
dra = (dra+180) % 360 - 180 # safetyize branch crossing.
total += ((1-np.sin(dec*utils.degree)) * dra).sum()*utils.degree
return abs(total)
[docs]def pixsize(shape, wcs):
"""Returns the average pixel area, in steradians."""
return area(shape, wcs)/np.prod(shape[-2:])
[docs]def pixshape(shape, wcs, signed=False):
"""Returns the average pixel height and width, in radians."""
return extent(shape, wcs, signed=signed)/shape[-2:]
[docs]def pixshapemap(shape, wcs, bsize=1000, separable="auto", signed=False, bcheck=False):
"""Returns the physical width and heigh of each pixel in the map in radians.
Heavy for big maps. Much faster approaches are possible for known pixelizations."""
if wcsutils.is_plain(wcs):
cdelt = wcs.wcs.cdelt
pshape = np.zeros([2])
pshape[0] = wcs.wcs.cdelt[1]*get_unit(wcs)
pshape[1] = wcs.wcs.cdelt[0]*get_unit(wcs)
if not signed: pshape = np.abs(pshape)
pshape = np.broadcast_to(pshape[:,None,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
elif separable == True or (separable == "auto" and wcsutils.is_cyl(wcs)):
pshape = pixshapes_cyl(shape, wcs, signed=signed, bcheck=bcheck)
pshape = np.broadcast_to(pshape[:,:,None], (2,)+shape[-2:])
return ndmap(pshape, wcs)
else:
pshape = zeros((2,)+shape[-2:], wcs)
# Loop over blocks in y to reduce memory usage
for i1 in range(0, shape[-2], bsize):
i2 = min(i1+bsize, shape[-2])
pix = np.mgrid[i1:i2+1,:shape[-1]+1]
with utils.nowarn():
y, x = pix2sky(shape, wcs, pix, safe=True, corner=True, bcheck=bcheck)
del pix
dy = y[1:,1:]-y[:-1,:-1]
dx = x[1:,1:]-x[:-1,:-1]
if not signed: dy, dx = np.abs(dy), np.abs(dx)
cy = np.cos(y)
bad= cy<= 0
cy[bad] = np.mean(cy[~bad])
dx *= 0.5*(cy[1:,1:]+cy[:-1,:-1])
del y, x, cy
# Due to wcs fragility, we may have some nans at wraparound points.
# Fill these with the mean non-nan value. Since most maps will be cylindrical,
# it makes sense to do this by row
bad = ~np.isfinite(dy)
dy[bad] = np.mean(dy[~bad])
bad = ~np.isfinite(dx)
dx[bad] = np.mean(dx[~bad])
# Copy over to our output array
pshape[0,i1:i2,:] = dy
pshape[1,i1:i2,:] = dx
del dx, dy
return pshape
[docs]def pixshapes_cyl(shape, wcs, signed=False, bcheck=False):
"""Returns the physical width and height of pixels for each row of a cylindrical
map with the given shape, wcs, in radians, as an array [{height,width},ny]. All
pixels in a row have the same shape in a cylindrical projection."""
res = np.zeros([2,shape[-2]])
ny = shape[-2]
# Get the dec of all the pixel edges, and use that to get the heights.
y = np.arange(ny+1)-0.5
x = y*0
dec, ra = pix2sky(shape, wcs, [y,x], safe=False, bcheck=bcheck)
if not np.isfinite(dec[0]): dec[0] = -np.pi/2 if wcs.wcs.cdelt[1] >= 0 else np.pi/2
if not np.isfinite(dec[-1]): dec[-1] = np.pi/2 if wcs.wcs.cdelt[1] >= 0 else -np.pi/2
dec = np.clip(dec, -np.pi/2, np.pi/2)
heights = dec[1:]-dec[:-1]
# A pixel that goes from dec1 to dec2 with a RA interval of dRA has an area of
# (sin(dec2)-sin(dec1))*dRA. We will assign a width of area/height to each pixel,
# such that we can simply define pixsize in terms of pixshape. That is,
# width = dRA * (sin(dec2)-sin(dec1))/(dec2-dec1)
dRA = wcs.wcs.cdelt[0]*utils.degree
sdec = np.sin(dec)
widths= dRA * (sdec[1:]-sdec[:-1])/heights
res[0] = heights
res[1] = widths
if not signed:
res = np.abs(res)
return res
[docs]def pixsizemap(shape, wcs, separable="auto", broadcastable=False, bsize=1000, bcheck=False):
"""Returns the physical area of each pixel in the map in steradians.
If separable is True, then the map will be assumed to be in a cylindircal
projection, where the pixel size is only a function of declination.
This makes the calculation dramatically faster, and the resulting array
will also use much less memory due to numpy striding tricks. The default,
separable=auto", determines whether to use this shortcut based on the
properties of the wcs.
Normally the function returns a ndmap of shape [ny,nx]. If broadcastable
is True, then it is allowed to return a smaller array that would still
broadcast correctly to the right shape. This can be useful to save work
if one is going to be doing additional manipulation of the pixel size
before using it. For a cylindrical map, the result would have shape [ny,1]
if broadcastable is True.
BUG: This function assumes parallelogram-shaped pixels. This breaks for
non-cylindrical projections!
"""
if wcsutils.is_plain(wcs):
return full(shape[-2:], wcs, np.abs(wcs.wcs.cdelt[0]*wcs.wcs.cdelt[1])*utils.degree**2)
elif separable == True or (separable == "auto" and wcsutils.is_cyl(wcs)):
psize = np.prod(pixshapes_cyl(shape, wcs, bcheck=bcheck),0)[:,None]
# Expand to full shape unless we are willing to accept an array
# with smaller size that is still broadcastable to the right result
if not broadcastable:
psize = np.broadcast_to(psize, shape[-2:])
return ndmap(psize, wcs)
else:
return pixsizemap_contour(shape, wcs, bsize=bsize, bcheck=bcheck)
[docs]def pixsizemap_contour(shape, wcs, bsize=1000, bcheck=False):
# Loop to save memory. Numba-candidate?
psizes = zeros(shape[-2:], wcs)
for y1 in range(0, shape[-2], bsize):
y2 = min(y1+bsize, shape[-2])
# Get the pixel coordinates our pixels' corners, and
# turn them into dec,ra
pixs = np.mgrid[y1:y2+1,:shape[-1]+1]-0.5
poss = pix2sky(shape, wcs, pixs, bcheck=bcheck)
del pixs
# Avoid impossible coordinates
poss[0] = np.clip(poss[0], -np.pi/2, np.pi/2)
dec, ra = poss
# Integrate (1-sin(dec))*dRA from [0,0] to [1,0], using the
# average msin value along this path
msin = 1-np.sin(dec)
areas = (ra[ 1:,:-1]-ra[:-1,:-1])*(msin[ 1:,:-1]+msin[:-1,:-1])/2
# [1,0] to [1,1]
areas += (ra[ 1:, 1:]-ra[ 1:,:-1])*(msin[ 1:, 1:]+msin[ 1:,:-1])/2
# [1,1] to [0,1]
areas += (ra[:-1, 1:]-ra[ 1:, 1:])*(msin[:-1, 1:]+msin[ 1:, 1:])/2
# [0,1] to [0,0]
areas += (ra[:-1,:-1]-ra[:-1, 1:])*(msin[:-1,:-1]+msin[:-1, 1:])/2
psizes[y1:y2] = np.abs(areas)
return psizes
[docs]def pixshapebounds(shape, wcs, separable="auto"):
"""Return the minimum and maximum pixel height and width for the given
geometry, in the form [{min,max},{y,x}]. Fast for separable geometries like
cylindrical ones, which it will try to recognize, but this can be forced by
setting separable to True (or disabled with False). Heavy in the general case."""
if separable == True or (separable == "auto" and wcsutils.is_cyl(wcs)):
return utils.minmax(pixshapes_cyl(shape, wcs),1)
else:
return utils.minmax(pixshapemap(shape, wcs))
[docs]def lmap(shape, wcs, oversample=1, method="auto"):
"""Return a map of all the wavenumbers in the fourier transform
of a map with the given shape and wcs."""
ly, lx = laxes(shape, wcs, oversample=oversample, method=method)
data = np.empty((2,ly.size,lx.size))
data[0] = ly[:,None]
data[1] = lx[None,:]
return ndmap(data, wcs)
[docs]def modlmap(shape, wcs, oversample=1, method="auto", min=0):
"""Return a map of all the abs wavenumbers in the fourier transform
of a map with the given shape and wcs."""
slmap = lmap(shape,wcs,oversample=oversample, method=method)
l = np.sum(slmap**2,0)**0.5
if min > 0: l = np.maximum(l, min)
return l
[docs]def center(shape,wcs):
cpix = (np.array(shape[-2:])-1)/2.
return pix2sky(shape,wcs,cpix)
[docs]def modrmap(shape, wcs, ref="center", safe=True, corner=False):
"""Return an enmap where each entry is the distance from center
of that entry. Results are returned in radians, and if safe is true
(default), then sharp coordinate edges will be avoided."""
slmap = posmap(shape,wcs,safe=safe,corner=corner)
if isinstance(ref,basestring):
if ref=="center": ref = center(shape,wcs)
else: raise ValueError
ref = np.array(ref)[:,None,None]
if wcsutils.is_plain(wcs): return np.sum((slmap-ref)**2,0)**0.5
return ndmap(utils.angdist(slmap[::-1],ref[::-1],zenith=False),wcs)
[docs]def laxes(shape, wcs, oversample=1, method="auto"):
oversample = int(oversample)
step = extent(shape, wcs, signed=True, method=method)/shape[-2:]
ly = np.fft.fftfreq(shape[-2]*oversample, step[0])*2*np.pi
lx = np.fft.fftfreq(shape[-1]*oversample, step[1])*2*np.pi
if oversample > 1:
# When oversampling, we want even coverage of fourier-space
# pixels. Because the pixel value indicates the *center* l
# of that pixel, we must shift ls when oversampling.
# E.g. [0,100,200,...] oversample 2 => [-25,25,75,125,175,...],
# not [0,50,100,150,200,...].
# And [0,100,200,...] os 3 => [-33,0,33,66,100,133,...]
# In general [0,a,2a,3a,...] os n => a*(-1+(2*i+1)/n)/2
# Since fftfreq generates a*i, the difference is a/2*(-1+1/n)
def shift(l,a,n): return l+a/2*(-1+1./n)
ly = shift(ly,ly[oversample],oversample)
lx = shift(lx,lx[oversample],oversample)
return ly, lx
[docs]def lrmap(shape, wcs, oversample=1):
"""Return a map of all the wavenumbers in the fourier transform
of a map with the given shape and wcs."""
return lmap(shape, wcs, oversample=oversample)[...,:shape[-1]//2+1]
[docs]def lpixsize(shape, wcs, signed=False, method="auto"):
return np.prod(lpixshape(shape, wcs, signed=signed, method=method))
[docs]def lpixshape(shape, wcs, signed=False, method="auto"):
return 2*np.pi/extent(shape,wcs, signed=signed, method=method)
[docs]def fft(emap, omap=None, nthread=0, normalize=True, adjoint_ifft=False, dct=False):
"""Performs the 2d FFT of the enmap pixels, returning a complex enmap.
If normalize is "phy", "phys" or "physical", then an additional normalization
is applied such that the binned square of the fourier transform can
be directly compared to theory (apart from mask corrections)
, i.e., pixel area factors are corrected for.
"""
if dct: res = samewcs(enfft.dct(emap,omap,axes=[-2,-1],nthread=nthread), emap)
else: res = samewcs(enfft.fft(emap,omap,axes=[-2,-1],nthread=nthread), emap)
norm = 1
if normalize:
if dct: norm /= np.prod(2*np.array(emap.shape[-2:])-1)**0.5
else: norm /= np.prod(emap.shape[-2:])**0.5
if normalize in ["phy","phys","physical"]:
if adjoint_ifft: norm /= emap.pixsize()**0.5
else: norm *= emap.pixsize()**0.5
if norm != 1: res *= norm
return res
[docs]def ifft(emap, omap=None, nthread=0, normalize=True, adjoint_fft=False, dct=False):
"""Performs the 2d iFFT of the complex enmap given, and returns a pixel-space enmap."""
if dct: res = samewcs(enfft.idct(emap,omap,axes=[-2,-1],nthread=nthread, normalize=False), emap)
else: res = samewcs(enfft.ifft(emap,omap,axes=[-2,-1],nthread=nthread, normalize=False), emap)
norm = 1
if normalize:
if dct: norm /= np.prod(2*np.array(emap.shape[-2:])-1)**0.5
else: norm /= np.prod(emap.shape[-2:])**0.5
if normalize in ["phy","phys","physical"]:
if adjoint_fft: norm *= emap.pixsize()**0.5
else: norm /= emap.pixsize()**0.5
if norm != 1: res *= norm
return res
[docs]def dct(emap, omap=None, nthread=0, normalize=True):
return fft(emap, omap=omap, nthread=nthread, normalize=normalize, dct=True)
[docs]def idct(emap, omap=None, nthread=0, normalize=True):
return ifft(emap, omap=omap, nthread=nthread, normalize=normalize, dct=True)
[docs]def fft_adjoint(emap, omap=None, nthread=0, normalize=True):
return ifft(emap, omap=omap, nthread=nthread, normalize=normalize, adjoint_fft=True)
[docs]def ifft_adjoint(emap, omap=None, nthread=0, normalize=True):
return fft(emap, omap=omap, nthread=nthread, normalize=normalize, adjoint_ifft=True)
[docs]def idct_adjoint(emap, omap=None, nthread=0, normalize=True):
return fft(emap, omap=omap, nthread=nthread, normalize=normalize, adjoint_ifft=True, dct=True)
[docs]def dct_adjoint(emap, omap=None, nthread=0, normalize=True):
return ifft(emap, omap=omap, nthread=nthread, normalize=normalize, adjoint_fft=True, dct=True)
# These are shortcuts for transforming from T,Q,U real-space maps to
# T,E,B hamonic maps. They are not the most efficient way of doing this.
# It would be better to precompute the rotation matrix and buffers, and
# use real transforms.
[docs]def map2harm(emap, nthread=0, normalize=True, iau=False, spin=[0,2], adjoint_harm2map=False):
"""Performs the 2d FFT of the enmap pixels, returning a complex enmap.
If normalize starts with "phy" (for physical), then an additional normalization
is applied such that the binned square of the fourier transform can
be directly compared to theory (apart from mask corrections)
, i.e., pixel area factors are corrected for.
"""
emap = samewcs(fft(emap,nthread=nthread,normalize=normalize, adjoint_ifft=adjoint_harm2map), emap)
if emap.ndim > 2:
rot, s0 = None, None
for s, i1, i2 in spin_helper(spin, emap.shape[-3]):
if s == 0: continue
if s != s0: s0, rot = s, queb_rotmat(emap.lmap(), iau=iau, spin=s)
emap[...,i1:i2,:,:] = map_mul(rot, emap[...,i1:i2,:,:])
return emap
[docs]def harm2map(emap, nthread=0, normalize=True, iau=False, spin=[0,2], keep_imag=False, adjoint_map2harm=False):
if emap.ndim > 2:
emap = emap.copy()
rot, s0 = None, None
for s, i1, i2 in spin_helper(spin, emap.shape[-3]):
if s == 0: continue
if s != s0: s0, rot = s, queb_rotmat(emap.lmap(), iau=iau, spin=s, inverse=True)
emap[...,i1:i2,:,:] = map_mul(rot, emap[...,i1:i2,:,:])
res = samewcs(ifft(emap,nthread=nthread,normalize=normalize, adjoint_fft=adjoint_map2harm), emap)
if not keep_imag: res = res.real
return res
[docs]def map2harm_adjoint(emap, nthread=0, normalize=True, iau=False, spin=[0,2], keep_imag=False):
return harm2map(emap, nthread=nthread, normalize=normalize, iau=iau, spin=spin, keep_imag=keep_imag, adjoint_map2harm=True)
[docs]def harm2map_adjoint(emap, nthread=0, normalize=True, iau=False, spin=[0,2]):
return map2harm(emap, nthread=nthread, normalize=normalize, iau=iau, spin=spin, adjoint_harm2map=True)
[docs]def queb_rotmat(lmap, inverse=False, iau=False, spin=2):
# atan2(x,y) instead of (y,x) because Qr points in the
# tangential direction, not radial. This matches flipperpol too.
# This corresponds to the Healpix convention. To get IAU,
# flip the sign of a.
sgn = 1 if iau else -1
if not mute["polconv_fix"]:
warnings.warn("enmap polarization convention changed. The old iau was actually healpix, and vice versa. This has been fixed now, and the enmap meaning of iau and healpix now matches that of curvedsky, and the rest of the world. If you suddenly get gibberish EE spectra, then you will need to change the value of the iau flag you pass to harm2map, map2harm, etc.. To mute this warnings, set enmapl.mute[\"polconv_fix\"] = True")
a = sgn*spin*np.arctan2(-lmap[1], lmap[0])
c, s = np.cos(a), np.sin(a)
if inverse: s = -s
return samewcs(np.array([[c,-s],[s,c]]),lmap)
[docs]def rotate_pol(emap, angle, comps=[-2,-1], spin=2, axis=-3):
"""Rotate the polarization of the given enmap "emap" by angle
(in radians) along the given components (the last two by default)
of the given axis (the 3rd-last axis by default). In standard enmaps
the 3rd-last axis is holds the Stokes components of the map in the order
T, Q, U. The spin argument controls the spin, and defaults to spin-2.
This function is flexible enough to work with non-enmaps too."""
if spin == 0: return emap
axis %= emap.ndim
c, s = np.cos(spin*angle), np.sin(spin*angle)
res = emap.copy()
pre = (slice(None),)*axis
res[pre+(comps[0],)] = c*emap[pre+(comps[0],)] - s*emap[pre+(comps[1],)]
res[pre+(comps[1],)] = s*emap[pre+(comps[0],)] + c*emap[pre+(comps[1],)]
return res
[docs]def map_mul(mat, vec):
"""Elementwise matrix multiplication mat*vec. Result will have
the same shape as vec. Multiplication happens along the last non-pixel
indices."""
# Allow scalar product, broadcasting if necessary
mat = np.asanyarray(mat)
if mat.ndim <= 3: return mat*vec
# Otherwise we do a matrix product along the last axes
ovec = samewcs(np.einsum("...abyx,...byx->...ayx", mat, vec), mat, vec)
return ovec
[docs]def smooth_gauss(emap, sigma):
"""Smooth the map given as the first argument with a gaussian beam
with the given standard deviation sigma in radians. If sigma is negative,
then the complement of the smoothed map will be returned instead (so
it will be a highpass filter)."""
if np.all(sigma == 0): return emap.copy()
f = fft(emap)
x2 = np.sum(emap.lmap()**2*sigma**2,0)
if sigma >= 0: f *= np.exp(-0.5*x2)
else: f *= 1-np.exp(-0.5*x2)
return ifft(f).real
[docs]def inpaint(map, mask, method="nearest"):
"""Inpaint regions in emap where mask==True based on the nearest unmasked pixels.
Uses scipy.interpolate.griddata internally. See its documentation for the meaning of
method. Note that if method is not "nearest", then areas where the mask touches the edge
will be filled with NaN instead of sensible values.
The purpose of this function is mainly to allow inapinting bad values with a
continuous signal with the right order of magnitude, for example to allow fourier
operations of masked data with large values near the edge of the mask (e.g. a
galactic mask). Its goal is not to inpaint with something realistic-looking. For
that heavier methods are needed.
FIXME: This function is slow and not very good. Fix or remove.
"""
from scipy import interpolate
# Find innermost good pixels at border of mask. These are the pixels the interpolation
# will actually be based on, so isolating them makes things much faster than just sending
# in every valid pixel
border = scipy.ndimage.distance_transform_edt(~mask)==1
pix = map.pixmap()
pix_good = pix[:,border].reshape(2,-1).T
pix_bad = pix[:,mask].reshape(2,-1).T
if pix_good.size == 0: return map*0
omap = map.copy()
# Loop over each scalar component of omap
for m in omap.preflat:
val_good = m[border].reshape(-1)
val_ipol = interpolate.griddata(pix_good, val_good, pix_bad, method=method)
m[pix_bad[:,0],pix_bad[:,1]] = val_ipol
return omap
[docs]def calc_window(shape, order=0, scale=1):
"""Compute fourier-space pixel window function. Since the
window function is separable, it is returned as an x and y part,
such that window = wy[:,None]*wx[None,:]. By default the pixel
window for interpolation order 0 mapmaking (nearest neighbor)
is returned. Pass 1 for bilinear mapmaking's pixel window.
The scale argument can be used to calculate the pixel window
at non-native resolutions. For example, with scale=2 you will
get the pixwin for a map with twice the resolution"""
wy = utils.pixwin_1d(np.fft.fftfreq(shape[-2], scale), order=order)
wx = utils.pixwin_1d(np.fft.fftfreq(shape[-1], scale), order=order)
return wy, wx
[docs]def apply_window(emap, pow=1.0, order=0, scale=1, nofft=False):
"""Apply the pixel window function to the specified power to the map,
returning a modified copy. Use pow=-1 to unapply the pixel window.
By default the pixel window for interpolation order 0 mapmaking
(nearest neighbor) is applied. Pass 1 for bilinear mapmaking's pixel window."""
wy, wx = calc_window(emap.shape, order=order, scale=scale)
if not nofft: emap = fft(emap)
else: emap = emap.copy()
emap *= wy[:,None]**pow
emap *= wx[None,:]**pow
if not nofft: emap = ifft(emap).real
return emap
[docs]def unapply_window(emap, pow=1.0, order=0, scale=1, nofft=False):
"""The inverse of apply_window. Equivalent to just flipping the sign of the pow argument."""
return apply_window(emap, pow=-pow, order=order, scale=scale, nofft=nofft)
[docs]def samewcs(arr, *args):
"""Returns arr with the same wcs information as the first enmap among
args. If no matches are found, arr is returned as is. Will
reference, rather than copy, the underlying array data
whenever possible.
"""
for m in args:
try: return ndmap(arr, m.wcs)
except AttributeError: pass
return arr
# Idea: Make geometry a class with .shape and .wcs members.
# Make a function that takes (foo,bar) and returns a geometry,
# there (foo,bar) can either be (shape,wcs) or (geometry,None).
# Use that to make everything that currently accepts shape, wcs
# transparently accept geometry. This will free us from having
# to drag around a shape, wcs pair all the time.
[docs]def geometry(pos, res=None, shape=None, proj="car", deg=False, pre=(), force=False, ref=None, **kwargs):
"""Consruct a shape,wcs pair suitable for initializing enmaps.
pos can be either a {dec,ra} center position or a [{from,to},{dec,ra}]
array giving the bottom-left and top-right corners of the desired geometry.
At least one of res or shape must be specified. If res is specified, it
must either be a number, in which the same resolution is used in each direction,
or {dec,ra}. If shape is specified, it must be at least [2]. All angles
are given in radians.
The projection type is chosen with the proj argument. The default is "car",
corresponding to the equirectangular plate carree projection. Other valid
projections are "cea", "zea", "gnom", etc. See wcsutils for details.
By default the geometry is tweaked so that a standard position, typically
ra=0,dec=0, would be at an integer logical pixel position (even if that position is
outside the physical map). This makes it easier to generate maps that are compatible
up to an integer pixel offset, as well as maps that are compatible with the predefined
spherical harmonics transform ring weights. The cost of this tweaking is that the
resulting corners can differ by a fraction of a pixel from the one requested.
To force the geometry to exactly match the corners provided you can pass force=True.
It is also possible to manually choose the reference point via the ref argument, which
must be a dec,ra coordinate pair (in radians)."""
# TODO: This function should be generalized to support fejer1.
# This is problematic because we can't assume that ra=0,dec=0 will be a pixel
# center in a Fejer1 geometry. Actually we can't even assume that for CC.
# For both cases it will happen for odd ny, but odd ny is normal for CC but
# rare for fejer1. For fejer1 the norm will instead be to have a pixel edge
# at ra=0,dec=0. In general the safest approach is to at least conceptually
# start from a standardized fullsky geometry and then crop it to the target,
# rather than to start from a standard point on the sky and then grow it
# to the required size. This will require a complete rework of this function
# though.
# We use radians by default, while wcslib uses degrees, so need to rescale.
# The exception is when we are using a plain, non-spherical wcs, in which case
# both are unitless. So undo the scaling in this case.
scale = 1 if deg else 1/utils.degree
pos = np.asarray(pos)*scale
if res is not None: res = np.asarray(res)*scale
# Apply a standard reference points unless one is manually specified, or we
# want to force the corners to exactly match the input.
try:
# if it's a (dec,ra) tuple in radians, make it (ra,dec) in degrees.
ref = (ref[1] * scale, ref[0] * scale)
assert(len(ref) == 2)
except (TypeError, ValueError):
pass
if ref is None and not force: ref = "standard"
wcs = wcsutils.build(pos, res, shape, rowmajor=True, system=proj, ref=ref, **kwargs)
if shape is None:
# Infer shape. WCS does not allow us to wrap around the
# sky, so shape mustn't be large enough to make that happen.
# Our relevant pixel coordinates go from (-0.5,-0.5) to
# shape-(0.5,0.5). We assume that wcs.build has already
# assured the former. Our job is to find shape that puts
# the top edge close to the requested value, while still
# being valid. If we always round down, we should be safe:
nearedge = wcsutils.nobcheck(wcs).wcs_world2pix(pos[0:1,::-1],0)[0,::-1]
faredge = wcsutils.nobcheck(wcs).wcs_world2pix(pos[1:2,::-1],0)[0,::-1]
shape = tuple(np.round(np.abs(faredge-nearedge)).astype(int))
return pre+tuple(shape), wcs
[docs]def fullsky_geometry(res=None, shape=None, dims=(), proj="car", variant="CC"):
"""Build an enmap covering the full sky, with the outermost pixel centers
at the poles and wrap-around points. Only the car projection is
supported for now, but the variants CC and fejer1 can be selected using
the variant keyword. This currently defaults to CC, but will likely
change to fejer1 in the future."""
assert proj == "car", "Only CAR fullsky geometry implemented"
# Handle the CAR variants
if variant.lower() == "cc": yo = 1
elif variant.lower() == "fejer1": yo = 0
else: raise ValueError("Unrecognized CAR variant '%s'" % str(variant))
# Set up the shape/resolution
if shape is None:
res = np.zeros(2)+res
shape = utils.nint(([1*np.pi,2*np.pi]/res) + (yo,0))
else:
res = np.array([1*np.pi,2*np.pi])/(np.array(shape)-(yo,0))
ny, nx = shape
assert abs(res[0]*(ny-yo)- np.pi) < 1e-8, "Vertical resolution does not evenly divide the sky; this is required for SHTs."
assert abs(res[1]*nx -2*np.pi) < 1e-8, "Horizontal resolution does not evenly divide the sky; this is required for SHTs."
wcs = wcsutils.WCS(naxis=2)
# Note the reference point is shifted by half a pixel to keep
# the grid in bounds, from ra=180+cdelt/2 to ra=-180+cdelt/2.
wcs.wcs.crval = [res[1]/2/utils.degree,0]
wcs.wcs.cdelt = [-360./nx,180./(ny-yo)]
wcs.wcs.crpix = [nx//2+0.5,(ny+1)/2]
wcs.wcs.ctype = ["RA---CAR","DEC--CAR"]
return dims+(ny,nx), wcs
[docs]def band_geometry(dec_cut,res=None, shape=None, dims=(), proj="car", variant="CC"):
"""Return a geometry corresponding to a sky that had a full-sky
geometry but to which a declination cut was applied. If dec_cut
is a single number, the declination range will be (-dec_cut,dec_cut)
radians, and if specified with two components, it is interpreted as
(dec_cut_min,dec_cut_max). The remaining arguments are the same as
fullsky_geometry and pertain to the geometry before cropping to the
cut-sky.
"""
dec_cut = np.atleast_1d(dec_cut)
if dec_cut.size == 1:
dec_cut_min = -dec_cut[0]
dec_cut_max = dec_cut[0]
assert dec_cut_max>0
elif dec_cut.size == 2:
dec_cut_min,dec_cut_max = dec_cut
assert dec_cut_max>dec_cut_min
else:
raise ValueError
ishape,iwcs = fullsky_geometry(res=res, shape=shape, dims=dims, proj=proj, variant=variant)
start = sky2pix(ishape,iwcs,(dec_cut_min,0))[0]
stop = sky2pix(ishape,iwcs,(dec_cut_max,0))[0]
Ny,_ = ishape[-2:]
start = max(int(np.round(start)),0); stop = min(int(np.round(stop)),Ny)
assert start>=0 and start<Ny
assert stop>=0 and stop<Ny
return slice_geometry(ishape,iwcs,np.s_[start:stop,:])
[docs]def thumbnail_geometry(r=None, res=None, shape=None, dims=(), proj="tan"):
"""Build a geometry in the given projection centered on (0,0), which will
be exactly at a pixel center.
r: The radius from the center to the edges of the patch, in radians.
res: The resolution of the patch, in radians.
shape: The target shape of the patch. Will be forced to odd numbers if necessary.
Any two out of these three arguments must be specified. The most common usage
will probably be to specify r and res, e.g.
shape, wcs = enmap.thumbnail_geometry(r=1*utils.degree, res=0.5*utils.arcmin)
The purpose of this function is to provide a geometry appropriate for object
stacking, etc. Ideally enmap.geometry would do this, but this specialized function
makes it easier to ensure that the center of the coordinate system will be at
excactly the pixel index (y,x) = shape//2+1, which was a commonly requested feature
(even though which pixel is at the center shouldn't really matter as long as one
takes into account the actual coordinates of each pixel).
"""
if wcsutils.is_plain(proj):
ctype = ["",""]
dirs = [1,1]
else:
ctype = ["RA---%s" % proj.upper(), "DEC--%s" % proj.upper()]
dirs = [1,-1]
if r is None: # res and shape given
assert res is not None and shape is not None, "Two of r, res and shape must be given"
res = wcsutils.expand_res(res, dirs)
shape = utils.nint(np.zeros(2)+shape[-2:]) # Broadcast and make sure it's an integer
shape = shape//2*2+1 # Force odd shape
wcs = wcsutils.explicit(ctype=ctype, crval=[0,0], cdelt=res[::-1]/utils.degree, crpix=shape[::-1]//2+1)
elif shape is None: # res and r given
assert res is not None and r is not None, "Two of r, res and shape must be given"
res = wcsutils.expand_res(res, dirs)
r = np.zeros(2)+r
wcs = wcsutils.explicit(ctype=ctype, crval=[0,0], cdelt=res[::-1]/utils.degree, crpix=[1,1])
rpix = utils.nint(np.abs(wcsutils.nobcheck(wcs).wcs_world2pix(r[None,::-1]/utils.degree,0)[0,::-1]))
shape = 2*rpix+1
wcs.wcs.crpix = shape[::-1]//2+1
else: # r and shape given
assert r is not None and shape is not None, "Two of r, res and shape must be given"
shape = utils.nint(np.zeros(2)+shape[-2:]) # Broadcast and make sure it's an integer
shape = shape//2*2+1 # Force odd shape
r = np.zeros(2)+r
wcs = wcsutils.explicit(ctype=ctype, crval=[0,0], crpix=[1,1])
rpix = np.abs(wcsutils.nobcheck(wcs).wcs_world2pix(r[None,::-1]/utils.degree,0)[0,::-1])
res_ratio = (shape-1)/(2*rpix)*dirs
wcs.wcs.cdelt /= res_ratio[::-1]
wcs.wcs.crpix = shape[::-1]//2+1
shape = dims+tuple(shape)
return shape, wcs
[docs]def union_geometry(geometries):
"""Given a list of compatible geometries, return a new geometry that's the union
if the inputs, in the sense that it contains all the pixels that the individual ones
contain"""
ref = geometries[0]
pixboxes = [pixbox_of(ref[1],shape,wcs) for shape, wcs in geometries]
bbox = utils.bounding_box(pixboxes)
oshape = tuple(bbox[1]-bbox[0])
owcs = ref[1].deepcopy()
owcs.wcs.crpix -= bbox[0,::-1]
return oshape, owcs
[docs]def create_wcs(shape, box=None, proj="cea"):
if box is None:
box = np.array([[-1,-1],[1,1]])*0.5*10
box *= utils.degree
return wcsutils.build(box, shape=shape, rowmajor=True, system=proj)
[docs]def spec2flat(shape, wcs, cov, exp=1.0, mode="constant", oversample=1, smooth="auto"):
"""Given a (ncomp,ncomp,l) power spectrum, expand it to harmonic map space,
returning (ncomp,ncomp,y,x). This involves a rescaling which converts from
power in terms of multipoles, to power in terms of 2d frequency.
The optional exp argument controls the exponent of the rescaling factor.
To use this with the inverse power spectrum, pass exp=-1, for example.
If apply_exp is True, the power spectrum will be taken to the exp'th
power. Otherwise, it is assumed that this has already been done, and
the exp argument only controls the normalization of the result.
It is irritating that this function needs to know what kind of matrix
it is expanding, but I can't see a way to avoid it. Changing the
units of harmonic space is not sufficient, as the following demonstrates:
m = harm2map(map_mul(spec2flat(s, b, multi_pow(ps, 0.5), 0.5), map2harm(rand_gauss(s,b))))
The map m is independent of the units of harmonic space, and will be wrong unless
the spectrum is properly scaled. Since this scaling depends on the shape of
the map, this is the appropriate place to do so, ugly as it is."""
cov = np.asarray(cov)
oshape = cov.shape[:-1] + tuple(shape)[-2:]
if cov.ndim == 1: cov = cov[None,None]
ls = np.sum(lmap(oshape, wcs, oversample=oversample)**2,0)**0.5
if smooth == "auto":
# Determine appropriate fourier-scale smoothing based on 2d fourer
# space resolution. We wish to smooth by about this width to approximate
# averaging over sub-grid modes
smooth = 0.5*(ls[1,0]+ls[0,1])
smooth /= 3.41 # 3.41 is an empirical factor
if smooth > 0:
cov = smooth_spectrum(cov, kernel="gauss", weight="mode", width=smooth)
# Translate from steradians to pixels
cov = cov * np.prod(shape[-2:])/area(shape,wcs)
if exp != 1.0: cov = multi_pow(cov, exp)
cov[~np.isfinite(cov)] = 0
# Use order 1 because we will perform very short interpolation, and to avoid negative
# values in spectra that must be positive (and it's faster)
res = ndmap(utils.interpol(cov, np.reshape(ls,(1,)+ls.shape),mode=mode, mask_nan=False, order=1),wcs)
res = downgrade(res, oversample)
res = res.reshape(oshape[:-2]+res.shape[-2:])
return res
[docs]def spec2flat_corr(shape, wcs, cov, exp=1.0, mode="constant"):
cov = np.asarray(cov)
oshape = cov.shape[:-1] + tuple(shape)[-2:]
if cov.ndim == 1: cov = cov[None,None]
if exp != 1.0: cov = multi_pow(cov, exp)
cov[~np.isfinite(cov)] = 0
# Convert power spectrum to correlation
ext = extent(shape,wcs)
rmax = np.sum(ext**2)**0.5
res = np.max(ext/shape[-2:])
nr = rmax/res
r = np.arange(nr)*rmax/nr
corrfun = powspec.spec2corr(cov, r)
# Interpolate it 2d. First get the pixel positions
# (remember to move to the corner because this is
# a correlation function)
dpos = posmap(shape, wcs)
dpos -= dpos[:,None,None,dpos.shape[-2]//2,dpos.shape[-1]//2]
ipos = np.arccos(np.cos(dpos[0])*np.cos(dpos[1]))*nr/rmax
corr2d = utils.interpol(corrfun, ipos.reshape((-1,)+ipos.shape), mode=mode, mask_nan=False, order=1)
corr2d = np.roll(corr2d, -corr2d.shape[-2]//2, -2)
corr2d = np.roll(corr2d, -corr2d.shape[-1]//2, -1)
corr2d = ndmap(corr2d, wcs)
return fft(corr2d).real * np.prod(shape[-2:])**0.5
[docs]def smooth_spectrum(ps, kernel="gauss", weight="mode", width=1.0):
"""Smooth the spectrum ps with the given kernel, using the given weighting."""
ps = np.asanyarray(ps)
pflat = ps.reshape(-1,ps.shape[-1])
nspec,nl = pflat.shape
# Set up the kernel array
K = np.zeros((nspec,nl))
l = np.arange(nl)
if isinstance(kernel, basestring):
if kernel == "gauss":
K[:] = np.exp(-0.5*(l/width)**2)
elif kernel == "step":
K[:,:int(width)] = 1
else:
raise ValueError("Unknown kernel type %s in smooth_spectrum" % kernel)
else:
tmp = np.atleast_2d(kernel)
K[:,:tmp.shape[-1]] = tmp[:,:K.shape[-1]]
# Set up the weighting scheme
W = np.zeros((nspec,nl))
if isinstance(weight, basestring):
if weight == "mode":
W[:] = l[None,:]**2
elif weight == "uniform":
W[:] = 1
else:
raise ValueError("Unknown weighting scheme %s in smooth_spectrum" % weight)
else:
tmp = np.atleast_2d(weight)
assert tmp.shape[-1] == W.shape[-1], "Spectrum weight must have the same length as spectrum"
pWK = _convolute_sym(pflat*W, K)
WK = _convolute_sym(W, K)
res = pWK/WK
return res.reshape(ps.shape)
[docs]def calc_ps2d(harm, harm2=None):
"""Compute the 2d power spectrum of the harmonic-space enmap "harm", as output by
map2harm. Use map2harm with norm="phys" to get physical units in this spectrum.
If harm2 is specified, then the cross-spectrum between harm and harm2 is computed
instead.
Some example usage, where the notation a[{x,y,z},n,m] specifies that the array
a has shape [3,n,m], and the 3 entries in the first axis should be interpreted
as x, y and z respectively.
1. cl[nl] = calc_ps2d(harm[ny,nx])
This just computes the standard power spectrum of the given harm, resulting in
a single 2d enmap.
2. cl[nl] = calc_ps2d(harm1[ny,nx], harm2[ny,nx])
This compues the 1d cross-spectrum between the 2d enmaps harm1 and harm2.
3. cl[{T,E,B},{T,E,B},nl] = calc_ps2d(harm[{T,E,B},None,ny,nx], harm[None,{T,E,B},ny,nx])
This computes the 3x3 polarization auto-spectrum for a 3d polarized harmonic enmap.
4. cl[{T,E,B},{T,E,B},nl] = calc_ps2d(harm1[{T,E,B},None,ny,nx], harm2[None,{T,E,B},ny,nx])
As above, but gives the 3x3 polarization cross-spectrum between two 3d harmonic enmaps.
The output is in the shape one would expect from numpy broadcasting. For example,
in the last example, the TE power spectrum would be found in cl[0,1], and the
ET power spectrum (which is different for the cross-spectrum case) is in cl[1,0]."""
harm = np.asanyarray(harm)
harm2 = np.asanyarray(harm2) if harm2 is not None else harm
# Unify dtypes
dtype = np.result_type(harm.real, harm2.real)
def getaddr(a): return a.__array_interface__["data"][0]
harm, harm2 = [samewcs(a, harm) for a in np.broadcast_arrays(harm, harm2)]
# We set the writable flags not because we intend to write, but to silience a
# false positive warning from numpy
harm.flags["WRITEABLE"] = harm2.flags["WRITEABLE"] = True
# I used to flatten here to make looping simple, but that caused a copy to be made
# when combined with np.broadcast. So instead I will use manual raveling
pshape = harm.shape[:-2]
npre = int(np.prod(pshape))
# A common use case is to compute TEBxTEB auto-cross spectra, where
# e.g. TE === ET since harm1 is the same array as harm2. To avoid duplicate
# calculations in this case we use a cache, which skips computing the
# cross-spectrum of any given pair of arrays more than once.
cache = {}
ps2d = empty(harm.shape, harm.wcs, dtype)
# We will loop over individual spectra
for i in range(npre):
I = np.unravel_index(i, pshape)
# Avoid duplicate calculation
key = tuple(sorted([getaddr(harm[I]), getaddr(harm2[I])]))
if key in cache:
ps2d[I] = cache[key]
else:
ps2d[I] = (harm[I]*np.conj(harm2[I])).real
cache[key] = ps2d[I]
return ps2d
def _convolute_sym(a,b):
sa = np.concatenate([a,a[:,-2:0:-1]],-1)
sb = np.concatenate([b,b[:,-2:0:-1]],-1)
fa = enfft.rfft(sa)
fb = enfft.rfft(sb)
sa = enfft.ifft(fa*fb,sa,normalize=True)
return sa[:,:a.shape[-1]]
[docs]def multi_pow(mat, exp, axes=[0,1]):
"""Raise each sub-matrix of mat (ncomp,ncomp,...) to
the given exponent in eigen-space."""
return samewcs(utils.eigpow(mat, exp, axes=axes), mat)
[docs]def get_downgrade_offset(shape, wcs, factor, ref=None):
"""Get the pixel offset required to keep a map downgraded by the given factor
aligned with the reference point."""
factor = np.zeros(2, int)+factor
if ref is None: return np.zeros(2,int)
else: return utils.nint(sky2pix(shape, wcs, ref))%factor
[docs]def downgrade(emap, factor, op=np.mean, ref=None, off=None, inclusive=False):
"""Returns enmap "emap" downgraded by the given integer factor
(may be a list for each direction, or just a number) by averaging
inside pixels."""
factor = np.zeros(2, int)+factor
# Optionally apply an offset to keep different downgraded maps pixel-compatible.
# This can be either manually specified, or inferred from reference coordinates.
if off is None: off = get_downgrade_offset(emap.shape, emap.wcs, factor, ref)
else: off = np.zeros(2, int)+off
# Do the averaging
omap = utils.block_reduce(emap, factor[0], off=off[0], axis=-2, inclusive=inclusive, op=op)
omap = utils.block_reduce(omap, factor[1], off=off[1], axis=-1, inclusive=inclusive, op=op)
# Update the wcs
wcs = emap[...,off[0]::factor[0],off[1]::factor[1]].wcs
wcs.wcs.crpix += (off[1::-1]>0)*inclusive # extra downgraded pixel in front if inclusive with offset
omap = ndmap(omap, wcs)
return omap
[docs]def upgrade(emap, factor, off=None, oshape=None, inclusive=False):
"""Upgrade emap to a larger size using nearest neighbor interpolation,
returning the result. More advanced interpolation can be had using
enmap.interpolate."""
factor = np.zeros(2, int)+factor
off = np.zeros(2,int)+(0 if off is None else off)
# If no output shape specified, guess one that had no pixels left over at the end
if oshape is None: oshape = (np.array(emap.shape[-2:])-(off>0)*inclusive)*factor+off
omap = utils.block_expand(emap, factor[0], oshape[-2], off=off[0], axis=-2, inclusive=inclusive)
omap = utils.block_expand(omap, factor[1], oshape[-1], off=off[1], axis=-1, inclusive=inclusive)
# Correct the WCS information
omap = ndmap(omap, emap.wcs.copy())
for j in range(2):
omap.wcs.wcs.crpix[j] -= 0.5 + (off[1-j]>0)*inclusive
omap.wcs.wcs.crpix[j] *= factor[1-j]
omap.wcs.wcs.cdelt[j] /= factor[1-j]
omap.wcs.wcs.crpix[j] += 0.5 + off[1-j]
return omap
[docs]def downgrade_geometry(shape, wcs, factor):
"""Returns the oshape, owcs corresponding to a map with geometry
shape, wcs that has been downgraded by the given factor. Similar
to scale_geometry, but truncates the same way as downgrade, and only
supports integer factors."""
factor = np.full(2, 1, dtype=int)*factor
oshape = tuple(shape[-2:]//factor)
owcs = wcsutils.scale(wcs, 1.0/factor)
return oshape, owcs
[docs]def upgrade_geometry(shape, wcs, factor):
return scale_geometry(shape, wcs, factor)
[docs]def distance_from(shape, wcs, points, omap=None, odomains=None, domains=False, method="cellgrid", rmax=None, step=1024):
"""Find the distance from each pixel in the geometry (shape, wcs) to the
nearest of the points[{dec,ra},npoint], returning a [ny,nx] map of distances.
If domains==True, then it will also return a [ny,nx] map of the index of the point
that was closest to each pixel. If rmax is specified and the method is "cellgrid" or "bubble", then
distances will only be computed up to rmax. Beyond that distance will be set to rmax
and domains to -1. This can be used to speed up the calculation when one only cares
about nearby areas."""
from pixell import distances
if wcsutils.is_plain(wcs): warnings.warn("Distance functions are not tested on plain coordinate systems.")
if omap is None: omap = empty(shape[-2:], wcs)
if domains and odomains is None: odomains = empty(shape[-2:], wcs, np.int32)
points = np.asarray(points)
assert points.ndim == 2 and len(points) == 2, "points must be [{dec,ra},npoint]"
# Handle case where no points are specified
if points.size == 0:
if rmax is None: rmax = np.inf
omap[:] = rmax
if domains: odomains[:] = -1
return (omap, odomains) if domains else omap
# Ok, we have at least one point, use the normal stuff
if wcsutils.is_cyl(wcs):
dec, ra = posaxes(shape, wcs)
if method == "bubble":
point_pix = utils.nint(sky2pix(shape, wcs, points))
return distances.distance_from_points_bubble_separable(dec, ra, points, point_pix, rmax=rmax, omap=omap, odomains=odomains, domains=domains)
elif method == "cellgrid":
point_pix = utils.nint(sky2pix(shape, wcs, points))
return distances.distance_from_points_cellgrid(dec, ra, points, point_pix, rmax=rmax, omap=omap, odomains=odomains, domains=domains)
elif method == "simple":
return distances.distance_from_points_simple_separable(dec, ra, points, omap=omap, odomains=odomains, domains=domains)
else: raise ValueError("Unknown method '%s'" % str(method))
else:
# We have a general geometry, so we need the full posmap. But to avoid wasting memory we
# can loop over chunks of the posmap.
if method == "bubble":
# Not sure how to slice bubble. Just do it in one go for now
pos = posmap(shape, wcs, safe=False)
point_pix = utils.nint(sky2pix(shape, wcs, points))
return distances.distance_from_points_bubble(pos, points, point_pix, rmax=rmax, omap=omap, odomains=odomains, domains=domains)
elif method == "cellgrid":
pos = posmap(shape, wcs, safe=False)
point_pix = utils.nint(sky2pix(shape, wcs, points))
return distances.distance_from_points_cellgrid(pos[0], pos[1], points, point_pix, rmax=rmax, omap=omap, odomains=odomains, domains=domains)
elif method == "simple":
geo = Geometry(shape, wcs)
for y in range(0, shape[-2], step):
sub_geo = geo[y:y+step]
pos = posmap(*sub_geo, safe=False)
if domains:
distances.distance_from_points_simple(pos, points, omap=omap[y:y+step], odomains=odomains[y:y+step], domains=True)
else:
distances.distance_from_points_simple(pos, points, omap=omap[y:y+step])
if domains: return omap, odomains
else: return omap
[docs]def distance_from_healpix(nside, points, omap=None, odomains=None, domains=False, rmax=None, method="bubble"):
"""Find the distance from each pixel in healpix map with nside nside to the
nearest of the points[{dec,ra},npoint], returning a [ny,nx] map of distances.
If domains==True, then it will also return a [ny,nx] map of the index of the point
that was closest to each pixel. If rmax is specified, then distances will only be
computed up to rmax. Beyond that distance will be set to rmax and domains to -1.
This can be used to speed up the calculation when one only cares about nearby areas."""
import healpy
from pixell import distances
info = distances.healpix_info(nside)
if omap is None: omap = np.empty(info.npix)
if domains and odomains is None: odomains = np.empty(info.npix, np.int32)
pixs = utils.nint(healpy.ang2pix(nside, np.pi/2-points[0], points[1]))
return distances.distance_from_points_healpix(info, points, pixs, rmax=rmax, omap=omap, odomains=odomains, domains=domains, method=method)
[docs]def grow_mask(mask, r):
"""Grow the True part of boolean mask "mask" by a distance of r radians"""
return (~mask).distance_transform(rmax=r) < r
[docs]def shrink_mask(mask, r):
"""Shrink the True part of boolean mask "mask" by a distance of r radians"""
return mask.distance_transform(rmax=r) >= r
[docs]def pad(emap, pix, return_slice=False, wrap=False):
"""Pad enmap "emap", creating a larger map with zeros filled in on the sides.
How much to pad is controlled via pix, which har format [{from,to},{y,x}],
[{y,x}] or just a single number to apply on all sides. E.g. pix=5 would pad
by 5 on all sides, and pix=[[1,2],[3,4]] would pad by 1 on the bottom,
2 on the left, 3 on the top and 4 on the right."""
pix = np.asarray(pix,dtype=int)
if pix.ndim == 0:
pix = np.array([[pix,pix],[pix,pix]])
elif pix.ndim == 1:
pix = np.array([pix,pix])
# Exdend the wcs in each direction.
w = emap.wcs.deepcopy()
w.wcs.crpix += pix[0,::-1]
# Construct a slice between the new and old map
res = zeros(emap.shape[:-2]+tuple([s+sum(p) for s,p in zip(emap.shape[-2:],pix.T)]),wcs=w, dtype=emap.dtype)
mslice = (Ellipsis,slice(pix[0,0],res.shape[-2]-pix[1,0]),slice(pix[0,1],res.shape[-1]-pix[1,1]))
res[mslice] = emap
if wrap:
res[...,:pix[0,0],:] = res[...,-pix[0,0]-pix[1,0]:-pix[1,0],:]
res[...,-pix[1,0]:,:] = res[...,pix[0,0]:pix[0,0]+pix[1,0],:]
res[...,:,:pix[0,1]] = res[...,:,-pix[0,1]-pix[1,1]:-pix[1,1]]
res[...,:,-pix[1,1]:] = res[...,:,pix[0,1]:pix[0,1]+pix[1,1]]
return (res,mslice) if return_slice else res
[docs]def find_blank_edges(m, value="auto"):
"""Returns blanks[{front,back},{y,x}], the size of the blank area
at the beginning and end of each axis of the map, where the argument
"value" determines which value is considered blank. Can be a float value,
or the strings "auto" or "none". Auto will choose the value that maximizes
the edge area considered blank. None will result in nothing being consideered blank."""
if value == "auto":
# Find the median value along each edge
medians = [np.median(m[...,:,i],-1) for i in [0,-1]] + [np.median(m[...,i,:],-1) for i in [0,-1]]
bs = [find_blank_edges(m, med) for med in medians]
nb = [np.prod(np.sum(b,0)) for b in bs]
blanks = bs[np.argmax(nb)]
return blanks
elif value == "none":
# Don't use any values for cropping, so no cropping is done
return np.zeros([2,2],dtype=int)
else:
value = np.asarray(value)
# Find which rows and cols consist entirely of the given value
hitmask = np.all(np.isclose(m.T, value.T, equal_nan=True, rtol=1e-6, atol=0).T,axis=tuple(range(m.ndim-2)))
hitrows = np.all(hitmask,1)
hitcols = np.all(hitmask,0)
# Find the first and last row and col which aren't all the value
blanks = np.array([
np.where(~hitrows)[0][[0,-1]],
np.where(~hitcols)[0][[0,-1]]]
).T
blanks[1] = m.shape[-2:]-blanks[1]-1
return blanks
[docs]def autocrop(m, method="plain", value="auto", margin=0, factors=None, return_info=False):
"""Adjust the size of m to be more fft-friendly. If possible,
blank areas at the edge of the map are cropped to bring us to a nice
length. If there there aren't enough blank areas, the map is padded
instead. If value="none" no values are considered blank, so no cropping
will happen. This can be used to autopad for fourier-friendliness."""
blanks = find_blank_edges(m, value=value)
nblank = np.sum(blanks,0)
# Find the first good sizes larger than the unblank lengths
minshape = m.shape[-2:]-nblank+margin
if method == "plain":
goodshape = minshape
elif method == "fft":
goodshape = np.array([enfft.fft_len(l, direction="above", factors=None) for l in minshape])
else:
raise ValueError("Unknown autocrop method %s!" % method)
# Pad if necessary
adiff = np.maximum(0,goodshape-m.shape[-2:])
padding = [[0,0],[0,0]]
if any(adiff>0):
padding = [adiff,[0,0]]
m = pad(m, padding)
blanks[0] += adiff
nblank = np.sum(blanks,0)
# Then crop to goodshape
tocrop = m.shape[-2:]-goodshape
lower = np.minimum(tocrop,blanks[0])
upper = tocrop-lower
s = (Ellipsis,slice(lower[0],m.shape[-2]-upper[0]),slice(lower[1],m.shape[-1]-upper[1]))
class PadcropInfo:
slice = s
pad = padding
if return_info:
return m[s], PadcropInfo
else:
return m[s]
[docs]def padcrop(m, info):
return pad(m, info.pad)[info.slice]
[docs]def grad(m):
"""Returns the gradient of the map m as [2,...]."""
return ifft(fft(m)*_widen(m.lmap(),m.ndim+1)*1j).real
[docs]def grad_pix(m):
"""The gradient of map m expressed in units of pixels.
Not the same as the gradient of m with resepect to pixels.
Useful for avoiding sky2pix-calls for e.g. lensing,
and removes the complication of axes that increase in
nonstandard directions."""
return grad(m)*(m.shape[-2:]/m.extent(signed=True))[(slice(None),)+(None,)*m.ndim]
[docs]def div(m):
"""Returns the divergence of the map m[2,...] as [...]."""
return ifft(np.sum(fft(m)*_widen(m.lmap(),m.ndim)*1j,0)).real
def _widen(map,n):
"""Helper for gard and div. Adds degenerate axes between the first
and the last two to give the map a total dimensionality of n."""
return map[(slice(None),) + (None,)*(n-3) + (slice(None),slice(None))]
[docs]def laplace(m):
return -ifft(fft(m)*np.sum(m.lmap()**2,0)).real
[docs]def apod(m, width, profile="cos", fill="zero"):
"""Apodize the provided map. Currently only cosine apodization is
implemented.
Args:
imap: (...,Ny,Nx) or (Ny,Nx) ndarray to be apodized
width: The width in pixels of the apodization on each edge.
profile: The shape of the apodization. Only "cos" is supported.
"""
width = np.minimum(np.zeros(2)+width,m.shape[-2:]).astype(np.int32)
if profile == "cos":
a = [0.5*(1-np.cos(np.linspace(0,np.pi,w))) for w in width]
else:
raise ValueError("Unknown apodization profile %s" % profile)
res = m.copy()
if fill == "mean":
offset = np.asarray(np.mean(res,(-2,-1)))[...,None,None]
res -= offset
elif fill == "median":
offset = np.asarray(np.median(res,(-2,-1)))[...,None,None]
res -= offset
if width[0] > 0:
res[...,:width[0],:] *= a[0][:,None]
res[...,-width[0]:,:] *= a[0][::-1,None]
if width[1] > 0:
res[...,:,:width[1]] *= a[1][None,:]
res[...,:,-width[1]:] *= a[1][None,::-1]
if fill == "mean" or fill == "median":
res += offset
return res
[docs]def apod_profile_lin(x): return x
[docs]def apod_profile_cos(x): return 0.5*(1-np.cos(np.pi*x))
[docs]def apod_mask(mask, width=1*utils.degree, edge=True, profile=apod_profile_cos):
"""Given an enmap mask that's 0 in bad regions and 1 in good regions, return an
apodization map that's still 0 in bad regions, but transitions smoothly
to 1 in the good region over the given width in radians. The transition
profile is controlled by the profile argument. Regions outside the
image are considered to be bad."""
if edge:
mask = mask.copy()
mask[..., 0,:] = False; mask[...,:, 0] = False
mask[...,-1,:] = False; mask[...,:,-1] = False
r = mask.distance_transform(rmax=width)
return profile(r/width)
[docs]def lwcs(shape, wcs):
"""Build world coordinate system for l-space"""
lres = 2*np.pi/extent(shape, wcs, signed=True)
ny, nx = shape[-2:]
owcs = wcsutils.explicit(crpix=[nx//2+1,ny//2+1], crval=[0,0], cdelt=lres[::-1])
return owcs
[docs]def rbin(map, center=[0,0], bsize=None, brel=1.0, return_nhit=False, return_bins=False):
"""Radially bin map around the given center point ([0,0] by default).
If bsize it given it will be the constant bin width. This defaults to
the pixel size. brel can be used to scale up the bin size. This is
mostly useful when using automatic bsize.
Returns bvals[...,nbin], r[nbin], where bvals is the mean
of the map in each radial bin and r is the mid-point of each bin
"""
r = map.modrmap(ref=center)
if bsize is None:
bsize = np.min(map.extent()/map.shape[-2:])
return _bin_helper(map, r, bsize*brel, return_nhit=return_nhit, return_bins=return_bins)
[docs]def lbin(map, bsize=None, brel=1.0, return_nhit=False, return_bins=False):
"""Like rbin, but for fourier space. Returns b(l),l"""
l = map.modlmap()
if bsize is None: bsize = min(abs(l[0,1]),abs(l[1,0]))
return _bin_helper(map, l, bsize*brel, return_nhit=return_nhit, return_bins=return_bins)
def _bin_helper(map, r, bsize, return_nhit=False, return_bins=False):
"""This is very similar to a function in utils, but was sufficiently different
that it didn't make sense to reuse that one. This is often the case with the
binning in utils. I should clean that up, and probably base one of the new
functions on this one."""
# Get the number of bins
n = int(np.max(r/bsize))
rinds = utils.floor((r/bsize).reshape(-1))
# Ok, rebin the map. We do this using bincount, which can be a bit slow
mflat = map.reshape((-1,)+map.shape[-2:])
mout = np.zeros((len(mflat),n))
nhit = np.bincount(rinds)[:n]
for i, m in enumerate(mflat):
mout[i] = np.bincount(rinds, weights=m.reshape(-1))[:n]/nhit
mout = mout.reshape(map.shape[:-2]+mout.shape[1:])
# What r should we assign to each bin? We could just use the bin center,
# but since we're averaging point samples in each bin, it makes more sense
# to assign the same average of the r values
orads = np.bincount(rinds, weights=r.reshape(-1))[:n]/nhit
if return_bins:
edges = np.arange(len(orads)+1)*bsize
orads = np.array([orads,edges[:-1],edges[1:]])
if return_nhit: return mout, orads, nhit
else: return mout, orads
[docs]def radial_average(map, center=[0,0], step=1.0):
warnings.warn("radial_average has been renamed to rbin", DeprecationWarning)
return rbin(map, center=center, brel=step)
[docs]def padslice(map, box, default=np.nan):
"""Equivalent to map[...,box[0,0]:box[1,0],box[0,1]:box[1,1]], except that
pixels outside the map are treated as actually being present, but filled with
the value given by "default". Hence, ther esult will always have size box[1]-box[0]."""
box = np.asarray(box).astype(int)
# Construct our output map
wcs = map.wcs.deepcopy()
wcs.wcs.crpix -= box[0,::-1]
res = full(map.shape[:-2]+tuple(box[1]-box[0]), wcs, default, map.dtype)
# Get the (possibly smaller) box for the valid pixels of the input map
ibox = np.maximum(0,np.minimum(np.array(map.shape[-2:])[None],box))
# Copy over the relevant region
o, w = ibox[0]-box[0], ibox[1]-ibox[0]
res[...,o[0]:o[0]+w[0],o[1]:o[1]+w[1]] = map[...,ibox[0,0]:ibox[1,0],ibox[0,1]:ibox[1,1]]
return res
[docs]def tile_maps(maps):
"""Given a 2d list of enmaps representing contiguous tiles in the
same global pixelization, stack them into a total map and return it.
E.g. if maps = [[a,b],[c,d]], then the result would be
c d
map = a b
"""
# First stack the actual data:
m = np.concatenate([np.concatenate(row,-1) for row in maps],-2)
# Then figure out the wcs of the result. crpix counts from the
# lower left corner, so a and the total map should have the same wcs
m = samewcs(m, maps[0][0])
return m
[docs]def stamps(map, pos, shape, aslist=False):
"""Given a map, extract a set of identically shaped postage stamps with corners
at pos[ntile,2]. The result will be an enmap with shape [ntile,...,ny,nx]
and a wcs appropriate for the *first* tile only. If that is not the
behavior wanted, you can specify aslist=True, in which case the result
will be a list of enmaps, each with the correct wcs."""
shape = np.zeros(2)+shape
pos = np.asarray(pos)
res = []
for p in pos:
res.append(padslice(map, [p,p+shape]))
if aslist: return res
res = samewcs(np.array(res),res[0])
return res
[docs]def to_healpix(imap, omap=None, nside=0, order=3, chunk=100000, destroy_input=False):
"""Project the enmap "imap" onto the healpix pixelization. If omap is given,
the output will be written to it. Otherwise, a new healpix map will be constructed.
The healpix map must be in RING order. nside controls the resolution of the output map.
If 0, nside is chosen such that the output map is higher resolution than the input.
This is needed to avoid losing information. To go to a lower-resolution output map,
you should first degrade the input map. The chunk argument affects the speed/memory
tradeoff of the function. Higher values use more memory, and might (and might not)
give higher speed. If destroy_input is True, then the input map will be prefiltered
in-place, which saves memory but modifies its values."""
warnings.warn("enmap.to_healpix is deprecated. Reprojecting this way is error-prone due to the potential loss of information, and the (very small) loss of high-l power due to the use of spline interpolation. Use reproject.map2healpix instead. And read its docstring!")
import healpy
if not destroy_input and order > 1: imap = imap.copy()
if order > 1:
imap = utils.interpol_prefilter(imap, order=order, inplace=True)
if omap is None:
# Generate an output map
if not nside:
npix_full_cyl = 4*np.pi/imap.pixsize()
nside = 2**int(np.floor(np.log2((npix_full_cyl/12)**0.5)))
npix = 12*nside**2
omap = np.zeros(imap.shape[:-2]+(npix,),imap.dtype)
else:
nside = healpy.npix2nside(omap.shape[-1])
npix = omap.shape[-1]
# Interpolate values at output pixel positions
for i in range(0, npix, chunk):
pos = np.array(healpy.pix2ang(nside, np.arange(i, min(npix,i+chunk))))
# Healpix uses polar angle, not dec
pos[0] = np.pi/2 - pos[0]
omap[...,i:i+chunk] = imap.at(pos, order=order, mask_nan=False, prefilter=False)
return omap
[docs]def to_flipper(imap, omap=None, unpack=True):
"""Convert the enmap "imap" into a flipper map with the same geometry. If
omap is given, the output will be written to it. Otherwise, a an array of
flipper maps will be constructed. If the input map has dimensions
[a,b,c,ny,nx], then the output will be an [a,b,c] array with elements
that are flipper maps with dimension [ny,nx]. The exception is for
a 2d enmap, which is returned as a plain flipper map, not a
0-dimensional array of flipper maps. To avoid this unpacking, pass
Flipper needs cdelt0 to be in decreasing order. This function ensures that,
at the cost of losing the original orientation. Hence to_flipper followed
by from_flipper does not give back an exactly identical map to the one
on started with.
"""
import flipper.liteMap
if imap.wcs.wcs.cdelt[0] > 0: imap = imap[...,::-1]
# flipper wants a different kind of wcs object than we have.
header = imap.wcs.to_header(relax=True)
header['NAXIS'] = 2
header['NAXIS1'] = imap.shape[-1]
header['NAXIS2'] = imap.shape[-2]
flipwcs = flipper.liteMap.astLib.astWCS.WCS(header, mode="pyfits")
iflat = imap.preflat
if omap is None:
omap = np.empty(iflat.shape[:-2],dtype=object)
for i, m in enumerate(iflat):
omap[i] = flipper.liteMap.liteMapFromDataAndWCS(iflat[i], flipwcs)
omap = omap.reshape(imap.shape[:-2])
if unpack and omap.ndim == 0: return omap.reshape(-1)[0]
else: return omap
[docs]def from_flipper(imap, omap=None):
"""Construct an enmap from a flipper map or array of flipper maps imap.
If omap is specified, it must have the correct shape, and the data will
be written there."""
imap = np.asarray(imap)
first = imap.reshape(-1)[0]
# flipper and enmap wcs objects come from different wcs libraries, so
# they must be converted
wcs = wcsutils.WCS(first.wcs.header).sub(2)
if omap is None:
omap = empty(imap.shape + first.data.shape, wcs, first.data.dtype)
# Copy over all components
iflat = imap.reshape(-1)
for im, om in zip(iflat, omap.preflat):
om[:] = im.data
omap = fix_endian(omap)
return omap
############
# File I/O #
############
[docs]def write_map(fname, emap, fmt=None, address=None, extra={}, allow_modify=False):
"""Writes an enmap to file. If fmt is not passed,
the file type is inferred from the file extension, and can
be either fits or hdf. This can be overriden by
passing fmt with either 'fits' or 'hdf' as argument.
The other arguments are passed to write_fits and/or write_hdf."""
if fmt == None:
if fname.endswith(".hdf"): fmt = "hdf"
elif fname.endswith(".fits"): fmt = "fits"
elif fname.endswith(".fits.gz"): fmt = "fits"
else: fmt = "fits"
if fmt == "fits":
write_fits(fname, emap, extra=extra, allow_modify=allow_modify)
elif fmt == "hdf":
write_hdf(fname, emap, address=address, extra=extra)
else:
raise ValueError
[docs]def read_map(fname, fmt=None, sel=None, box=None, pixbox=None, geometry=None, wrap="auto", mode=None, sel_threshold=10e6, wcs=None, hdu=None, delayed=False, verbose=False, address=None):
"""Read an enmap from file. The file type is inferred
from the file extension, unless fmt is passed.
fmt must be one of 'fits' and 'hdf'.
The sel, box, pixbox, geometry, wrap, mode, and delayed arguments
are all used by read_helper to (optionally) select a subregion of
the map or change how it is wrapped on the sky.
The hdu and verbose arguments are only used for FITS (see
read_fits). The address argument is only used for HDF (see
read_hdf)."""
toks = fname.split(":")
fname = toks[0]
if fmt == None:
if fname.endswith(".hdf"): fmt = "hdf"
elif fname.endswith(".npy"): fmt = "npy"
elif fname.endswith(".fits"): fmt = "fits"
elif fname.endswith(".fits.gz"): fmt = "fits"
else: fmt = "fits"
if fmt == "fits":
res = read_fits(fname, sel=sel, box=box, pixbox=pixbox, geometry=geometry, wrap=wrap, mode=mode, sel_threshold=sel_threshold, wcs=wcs, hdu=hdu, delayed=delayed, verbose=verbose)
elif fmt == "hdf":
res = read_hdf(fname, sel=sel, box=box, pixbox=pixbox, geometry=geometry, wrap=wrap, mode=mode, sel_threshold=sel_threshold, wcs=wcs, delayed=delayed, hdu=hdu, address=address)
elif fmt == "npy":
res = read_npy(fname, sel=sel, box=box, pixbox=pixbox, geometry=geometry, wrap=wrap, mode=mode, sel_threshold=sel_threshold, wcs=wcs, delayed=delayed, hdu=hdu, address=address)
else:
raise ValueError
if len(toks) > 1:
res = eval("res"+":".join(toks[1:]))
return res
[docs]def read_map_geometry(fname, fmt=None, hdu=None, address=None):
"""Read an enmap geometry from file. The file type is inferred
from the file extension, unless fmt is passed.
fmt must be one of 'fits' and 'hdf'."""
toks = fname.split(":")
fname = toks[0]
if fmt == None:
if fname.endswith(".hdf"): fmt = "hdf"
elif fname.endswith(".fits"): fmt = "fits"
elif fname.endswith(".fits.gz"): fmt = "fits.gz"
else: fmt = "fits"
if fmt == "fits":
shape, wcs = read_fits_geometry(fname, hdu=hdu)
elif fmt == "fits.gz":
shape, wcs = read_fits_geometry(fname, hdu=hdu, quick=False)
elif fmt == "hdf":
shape, wcs = read_hdf_geometry(fname, address=address)
else:
raise ValueError
if len(toks) > 1:
sel = eval("utils.sliceeval"+":".join(toks[1:]))[-2:]
shape, wcs = slice_geometry(shape, wcs, sel)
return shape, wcs
[docs]def write_map_geometry(fname, shape, wcs, fmt=None):
"""Write an enmap geometry to file. The file type is inferred
from the file extension, unless fmt is passed.
fmt must be one of 'fits' and 'hdf'. Only fits is supported for now, though."""
toks = fname.split(":")
fname = toks[0]
if fmt == None:
if fname.endswith(".hdf"): fmt = "hdf"
elif fname.endswith(".fits"): fmt = "fits"
else: fmt = "fits"
if fmt == "fits":
write_fits_geometry(fname, shape, wcs)
elif fmt == "hdf":
raise NotImplementedError("Write write_hdf_geometry not implemented yet")
else:
raise ValueError
[docs]def write_fits(fname, emap, extra={}, allow_modify=False):
"""Write an enmap to a fits file."""
# The fits write routines may attempt to modify
# the map. So make a copy.
if not allow_modify:
emap = enmap(emap, copy=True)
# Get our basic wcs header
header = emap.wcs.to_header(relax=True)
# Add our map headers
header['NAXIS'] = emap.ndim
for i,n in enumerate(emap.shape[::-1]):
header['NAXIS%d'%(i+1)] = n
for key, val in extra.items():
header[key] = val
hdus = astropy.io.fits.HDUList([astropy.io.fits.PrimaryHDU(emap, header)])
if isinstance(fname, str):
utils.mkdir(os.path.dirname(fname))
with warnings.catch_warnings():
warnings.filterwarnings('ignore')
hdus.writeto(fname, overwrite=True)
[docs]def write_fits_geometry(fname, shape, wcs):
"""Write just the geometry to a fits file that will only contain the header"""
header = wcs.to_header(relax=True)
header.insert(0, ("SIMPLE",True))
header.insert(1, ("BITPIX",-32))
header.insert(2, ("NAXIS",len(shape)))
for i, s in enumerate(shape[::-1]):
header.insert(3+i, ("NAXIS%d"%(i+1),s))
# Dummy, but must be present
utils.mkdir(os.path.dirname(fname))
header.tofile(fname, overwrite=True)
[docs]def read_fits(fname, hdu=None, sel=None, box=None, pixbox=None, geometry=None, wrap="auto", mode=None, sel_threshold=10e6, wcs=None, delayed=False, verbose=False):
"""Read an enmap from the specified fits file. By default,
the map and coordinate system will be read from HDU 0. Use
the hdu argument to change this. The map must be stored as
a fits image. If sel is specified, it should be a slice
that will be applied to the image before reading. This avoids
reading more of the image than necessary. Instead of sel,
a coordinate box [[yfrom,xfrom],[yto,xto]] can be specified."""
if hdu is None: hdu = 0
hdu = astropy.io.fits.open(fname)[hdu]
ndim = len(hdu.shape)
if hdu.header["NAXIS"] < 2:
raise ValueError("%s is not an enmap (only %d axes)" % (str(fname), hdu.header["NAXIS"]))
if wcs is None:
with warnings.catch_warnings():
wcs = wcsutils.WCS(hdu.header).sub(2)
proxy = ndmap_proxy_fits(hdu, wcs, fname=fname, threshold=sel_threshold, verbose=verbose)
return read_helper(proxy, sel=sel, box=box, pixbox=pixbox, geometry=geometry, wrap=wrap, mode=mode, delayed=delayed)
[docs]def read_fits_geometry(fname, hdu=None, quick=True):
"""Read an enmap wcs from the specified fits file. By default,
the map and coordinate system will be read from HDU 0. Use
the hdu argument to change this. The map must be stored as
a fits image."""
if hdu is None: hdu = 0
if hdu == 0 and quick:
# Read header only, without body
if isinstance(fname, str):
with open(fname, "rb") as ifile:
header = astropy.io.fits.Header.fromstring(ifile.read(2880))
else:
# Handle the case where the user already has a file object
header = astropy.io.fits.Header.fromstring(fname.read(2880))
else:
with utils.nowarn():
header = astropy.io.fits.open(fname)[hdu].header
if header["NAXIS"] < 2:
raise ValueError("%s is not an enmap (only %d axes)" % (str(fname), header["NAXIS"]))
with warnings.catch_warnings():
wcs = wcsutils.WCS(header).sub(2)
shape = tuple([header["NAXIS%d"%(i+1)] for i in range(header["NAXIS"])[::-1]])
return shape, wcs
[docs]def write_hdf(fname, emap, address=None, extra={}):
"""Write an enmap as an hdf file, preserving all the WCS
metadata.
Args:
fname (str or h5py.Group): Filename or open h5py handle.
emap (ndmap): Object to store.
address (str): Group address within the HDF file to place
the result. If None, the data are written at root level
after truncating the file.
extra (dict): additional data to write into the output.
Notes:
If address is None, the output file will be replaced if it
exists. If address is a string, and the target file exists,
the file will not be reset but anything living at that
address will be replaced with the encoded emap."""
import h5py
emap = enmap(emap, copy=False)
if isinstance(fname, h5py.Group):
context = contextlib.nullcontext(fname)
else:
utils.mkdir(os.path.dirname(fname))
mode = "w" if address is None else "a"
context = h5py.File(fname, mode)
with context as hfile:
if address is not None:
if address in hfile:
del hfile[address]
hfile = hfile.create_group(address)
hfile["data"] = emap
header = emap.wcs.to_header()
for key in header:
hfile["wcs/"+key] = header[key]
for key, val in extra.items():
hfile[key] = val
[docs]def read_hdf(fname, hdu=None, sel=None, box=None, pixbox=None, geometry=None, wrap="auto", mode=None, sel_threshold=10e6, wcs=None, delayed=False, address=None):
"""Read an enmap from the specified hdf file. Two formats
are supported. The old enmap format, which simply used
a bounding box to specify the coordinates, and the new
format, which uses WCS properties. The latter is used if
available. With the old format, plate carree projection
is assumed. Note: some of the old files have a slightly
buggy wcs, which can result in 1-pixel errors.
If address is a string, the map will be loaded from that group
address within fname.
Note fname can be passed in as an h5py.Group (e.g. an open
h5py.File) instead of a string, and the map will be read from that
handle."""
import h5py
if isinstance(fname, h5py.Group):
context = contextlib.nullcontext(fname)
else:
context = h5py.File(fname, "r")
with context as hfile:
if address is not None:
hfile = hfile[address]
data = hfile["data"][()]
hwcs = hfile["wcs"]
header = astropy.io.fits.Header()
for key in hwcs:
header[key] = fix_python3(hwcs[key][()])
if wcs is None:
wcs = wcsutils.WCS(header).sub(2)
proxy = ndmap_proxy_hdf(data, wcs, fname=fname, threshold=sel_threshold)
return read_helper(proxy, sel=sel, box=box, pixbox=pixbox, geometry=geometry, wrap=wrap, mode=mode, delayed=delayed)
[docs]def read_hdf_geometry(fname, address=None):
"""Read an enmap wcs from the specified hdf file."""
import h5py
with h5py.File(fname,"r") as hfile:
if address is not None:
hfile = hfile[address]
hwcs = hfile["wcs"]
header = astropy.io.fits.Header()
for key in hwcs:
header[key] = hwcs[key][()]
wcs = wcsutils.WCS(header).sub(2)
shape = hfile["data"].shape
return shape, wcs
[docs]def read_npy(fname, hdu=None, sel=None, box=None, pixbox=None, geometry=None, wrap="auto", mode=None, sel_threshold=10e6, wcs=None, delayed=False, address=None):
"""Read an enmap from the specified npy file. Only minimal support.
No wcs information."""
return enmap(np.load(fname), wcs)
[docs]def fix_python3(s):
"""Convert "bytes" to string in python3, while leaving other types unmolested.
Python3 string handling is stupid."""
try:
if isinstance(s, bytes): return s.decode("utf-8")
else: return s
except TypeError: return s
[docs]def read_helper(data, sel=None, box=None, pixbox=None, geometry=None, wrap="auto", mode=None, delayed=False):
"""Helper function for map reading. Handles the slicing, sky-wrapping and capping, etc."""
if delayed: return data # Slicing not supported yet when we want to return a proxy object
if geometry is not None: data = extract(data, *geometry, wrap=wrap)
if box is not None: data = submap(data, box, wrap=wrap)
if pixbox is not None: data = extract_pixbox(data, pixbox, wrap=wrap)
if sel is not None: data = data[sel]
data = data[:] # Get rid of the wrapper if it still remains
data = data.copy()
return data
# These wrapper classes are there to let us reuse the normal map
# extract and submap operations on fits and hdf maps without needing
# to read in all the data.
[docs]class ndmap_proxy:
def __init__(self, shape, wcs, dtype, fname="<none>", threshold=1e7):
self.fname, self.shape, self.wcs, self.dtype = fname, shape, wcs, dtype
self.threshold = threshold
@property
def ndim(self): return len(self.shape)
@property
def geometry(self): return self.shape, self.wcs
@property
def npix(self): return self.shape[-2]*self.shape[-1]
def __str__(self): return repr(self)
def __repr__(self): return "ndmap_proxy(fname=%s, shape=%s, wcs=%s, dtype=%s)" % (str(self.fname), str(self.shape), str(self.wcs), str(self.dtype))
def __getslice__(self, a, b=None, c=None): return self[slice(a,b,c)]
def __getitem__(self, sel): raise NotImplementedError("ndmap_proxy must be subclassed")
[docs] def submap(self, box, mode=None, wrap="auto"):
return submap(self, box, mode=mode, wrap=wrap)
[docs] def stamps(self, pos, shape, aslist=False): return stamps(self, pos, shape, aslist=aslist)
# Copy over some methos from ndmap
for name in ["sky2pix", "pix2sky", "box", "pixbox_of", "posmap", "pixmap", "lmap", "modlmap", "modrmap", "area", "pixsize", "pixshape",
"pixsizemap", "pixshapemap", "extent", "distance_from", "center", "extract", "extract_pixbox"]:
setattr(ndmap_proxy, name, getattr(ndmap, name))
[docs]class ndmap_proxy_fits(ndmap_proxy):
def __init__(self, hdu, wcs, fname="<none>", threshold=1e7, verbose=False):
self.hdu = hdu
self.verbose = verbose
# Note that 'section' is not part of some HDU types, such as CompImageHDU.
self.use_section = hasattr(hdu, 'section')
if self.use_section:
dtype = fix_endian(hdu.section[(slice(0,1),)*hdu.header["NAXIS"]]).dtype
else:
dtype = fix_endian(hdu.data[(slice(0,1),)*hdu.header["NAXIS"]]).dtype
self.stokes_flips = get_stokes_flips(hdu)
def slist(vals):
return ",".join([str(v) for v in vals])
if verbose and np.any(self.stokes_flips >= 0):
print("Converting index %s for Stokes axis %s from IAU to COSMO in %s" % (
slist(self.stokes_flips[self.stokes_flips >= 0]),
slist(np.where(self.stokes_flips >= 0)[0]),
str(fname)))
ndmap_proxy.__init__(self, hdu.shape, wcs, dtype, fname=fname, threshold=threshold)
def __getitem__(self, sel):
_, psel = utils.split_slice(sel, [len(self.shape)-2,2])
if len(psel) > 2: raise IndexError("too many indices")
_, wcs = slice_geometry(self.shape[-2:], self.wcs, psel)
if (self.hdu.size > self.threshold) and self.use_section:
sel1, sel2 = utils.split_slice(sel, [len(self.shape)-1,1])
res = self.hdu.section[sel1][(Ellipsis,)+sel2]
else: res = self.hdu.data[sel]
# Apply stokes flips if necessary. This is a bit complicated because we have to
# take into account that slicing might have already been done. The simplest way
# to do this is to make a sign array with the same shape as all the pre-dimensions
# of the raw map, and then slice that the same way.
if np.any(self.stokes_flips >= 0):
signs = np.full(self.shape[:-2], 1, int)
for i, ind in enumerate(self.stokes_flips):
if ind >= 0 and ind < self.shape[i]:
signs[(slice(None),)*i + (ind,)] *= -1
sel1, sel2 = utils.split_slice(sel, [len(self.shape)-2,2])
res *= signs[sel1][...,None,None]
return ndmap(fix_endian(res), wcs)
def __repr__(self): return "ndmap_proxy_fits(fname=%s, shape=%s, wcs=%s, dtype=%s)" % (str(self.fname), str(self.shape), str(self.wcs), str(self.dtype))
[docs]class ndmap_proxy_hdf(ndmap_proxy):
def __init__(self, dset, wcs, fname="<none>", threshold=1e7):
self.dset = dset
ndmap_proxy.__init__(self, dset.shape, wcs, dset.dtype, fname=fname, threshold=threshold)
def __getitem__(self, sel):
_, psel = utils.split_slice(sel, [self.ndim-2,2])
if len(psel) > 2: raise IndexError("too many indices")
_, wcs = slice_geometry(self.shape[-2:], self.wcs, psel)
if self.dset.size > self.threshold:
sel1, sel2 = utils.split_slice(sel, [len(self.shape)-1,1])
res = self.dset[sel1][(Ellipsis,)+sel2]
else:
res = self.dset[sel]
return ndmap(fix_endian(res), wcs)
def __repr__(self): return "ndmap_proxy_hdf(fname=%s, shape=%s, wcs=%s, dtype=%s)" % (self.fname, str(self.shape), str(self.wcs), str(self.dtype))
[docs]def fix_endian(map):
"""Make endianness of array map match the current machine.
Returns the result."""
if map.dtype.byteorder not in ['=','<' if sys.byteorder == 'little' else '>']:
map = map.byteswap(True).newbyteorder()
map.dtype = utils.fix_dtype_mpi4py(map.dtype)
return map
[docs]def get_stokes_flips(hdu):
"""Given a FITS HDU, parse its header to determine which, if any, axes
need to have their sign flip to get them in the COSMO polarization convention.
Returns an array of length ndim, with each entry being the index of the axis
that should be flipped, or -1 if none should be flipped."""
ndim = hdu.header["NAXIS"]
# First find which index of each axis is U
inds = np.full(ndim, -1, int)
noflip = np.full(ndim, -1, int)
def get(name, ndim, i, default=None):
nfull = name + "%d" % (ndim-i)
return hdu.header[nfull] if nfull in hdu.header else default
for i in range(ndim):
ctype = get("CTYPE", ndim, i, "")
if ctype.strip() == "STOKES":
crpix = get("CRPIX", ndim, i, 1.0)
crval = get("CRVAL", ndim, i, 1.0)
cdelt = get("CDELT", ndim, i, 1.0)
U_ind = utils.nint((3-crval)/cdelt+crpix)
inds[i] = U_ind - 1
# If there are no U indices (for example because there was no Stokes axis),
# then there is nothing to flip
if np.all(inds == -1): return noflip
# Otherwise, check the polarization convention
if "POLCCONV" in hdu.header: polconv = hdu.header["POLCCONV"].strip()
elif "POLCONV" in hdu.header: polconv = hdu.header["POLCONV" ].strip()
else:
warnings.warn("FITS file has stokes axis, but no POLCCONV is specified. Assuming IAU")
return inds
if polconv == "COSMO": return noflip
elif polconv == "IAU": return inds
else:
warnings.warn("Unrecognized POLCCONV '%s', assuming COSMO" % polconv)
return noflip
[docs]def shift(map, off, inplace=False, keepwcs=False):
"""Cyclicly shift the pixels in map such that a pixel at
position (i,j) ends up at position (i+off[0],j+off[1])"""
if not inplace: map = map.copy()
off = np.atleast_1d(off)
for i, o in enumerate(off):
if o != 0:
map[:] = np.roll(map, o, -len(off)+i)
if not keepwcs:
map.wcs.wcs.crpix += off[::-1]
return map
[docs]def fractional_shift(map, off, keepwcs=False, nofft=False):
"""Shift map cyclically by a non-integer amount off [{y_off,x_off}]"""
omap = samewcs(enfft.shift(map, off, nofft=nofft), map)
if not keepwcs:
omap.wcs.wcs.crpix += off[::-1]
return omap
[docs]def fftshift(map, inplace=False):
if not inplace: map = map.copy()
map[:] = np.fft.fftshift(map, axes=[-2,-1])
return map
[docs]def ifftshift(map, inplace=False):
if not inplace: map = map.copy()
map[:] = np.fft.ifftshift(map, axes=[-2,-1])
return map
[docs]def fillbad(map, val=0, inplace=False):
return np.nan_to_num(map, copy=not inplace, nan=val, posinf=val, neginf=val)
[docs]def resample(map, oshape, off=(0,0), method="fft", mode="wrap", corner=False, order=3):
"""Resample the input map such that it covers the same area of the sky
with a different number of pixels given by oshape."""
# Construct the output shape and wcs
oshape = map.shape[:-2] + tuple(oshape)[-2:]
if method == "fft":
omap = ifft(resample_fft(fft(map, normalize=False), oshape, off=off, corner=corner, norm=1/map.npix), normalize=False).real
elif method == "spline":
owcs = wcsutils.scale(map.wcs, np.array(oshape[-2:],float)/map.shape[-2:], rowmajor=True, corner=corner)
off = np.zeros(2)+off
if not corner:
off -= 0.5 - 0.5*np.array(oshape[-2:],float)/map.shape[-2:] # in output units
opix = pixmap(oshape) - off[:,None,None]
ipix = opix * (np.array(map.shape[-2:],float)/oshape[-2:])[:,None,None]
omap = ndmap(map.at(ipix, unit="pix", mode=mode, order=order), owcs)
else:
raise ValueError("Invalid resample method '%s'" % method)
return omap
[docs]def resample_fft(fimap, oshape, fomap=None, off=(0,0), corner=False, norm="pix", op=lambda a,b:b, dummy=False):
"""Like resample, but takes a fourier-space map as input and outputs a fourier-space map.
unit specifies which fourier-space unit is used. "pix" corresponds to
the standard enmap normalization (normalize=True in enmap.fft). "phys" corresponds
to physical normalization (normalize="phys"). The fourier-units matter because some
fourier-space units need rescaline when going from one resolution to another.
"""
# Construct the output shape and wcs
oshape = fimap.shape[:-2] + tuple(oshape)[-2:]
off = np.zeros(2)+off
if not corner:
# Apply phase shift to realign with pixel centers. This can be seen as a half pixel shift to
# the left in the original pixelization followed by a half pixel shift to the right in the new
# pixelization.
off -= 0.5 - 0.5*np.array(oshape[-2:],float)/fimap.shape[-2:] # in output units
if fomap is None:
owcs = wcsutils.scale(fimap.wcs, np.array(oshape[-2:],float)/fimap.shape[-2:], rowmajor=True, corner=corner)
if dummy: return oshape, owcs
fomap = zeros(oshape, owcs, fimap.dtype)
if dummy: return oshape, owcs
# We sadly need to care about fourier-space normalization when doing this, since
# different-size fourier spaces can have different units. First handle explicit normalization,
# where the factor to multiply is given directly.
try: norm = float(norm)
except (TypeError, ValueError):
# Then handle various normalization conventions.
if norm is None: norm = 1 # Don't do anything if None is passed. Cost free
elif norm == "plain": norm = fomap.npix/fimap.npix # Corresponds to normalize=False in enmap.ifft
elif norm == "pix": norm = (fomap.npix/fimap.npix)**0.5 # Corresponds to normalize=True, enmap.fft default
elif norm == "phys": norm = 1 # Corresponds to normalize="phys"
else: raise ValueError("Unrecognized fourier unit '%s'" % str(unit))
# copy over all 4 quadrants. This would have been a single operation if the
# fourier center had been in the middle. This could be acieved using fftshift,
# but that would require two extra full-array shifts
cny, cnx = np.minimum(fimap.shape[-2:], oshape[-2:])
hny, hnx = cny//2, cnx//2
# This function is used to avoid paying the cost of multiplying by norm when it's one
def transfer(dest, source, norm, op):
if norm != 1: source = source*norm
dest[:] = op(dest, source)
transfer(fomap[...,:hny, :hnx ],fimap[...,:hny, :hnx ], norm, op)
transfer(fomap[...,:hny, -(cnx-hnx):],fimap[...,:hny, -(cnx-hnx):], norm, op)
transfer(fomap[...,-(cny-hny):,:hnx ],fimap[...,-(cny-hny):,:hnx ], norm, op)
transfer(fomap[...,-(cny-hny):,-(cnx-hnx):],fimap[...,-(cny-hny):,-(cnx-hnx):], norm, op)
if np.any(off != 0):
# It's fastest to do this here when downsampling, but when upsampling
# it's faster to do so in the fimap. And for a mix it's bad to do it both places.
fomap[:] = enfft.shift(fomap, off, axes=(-2,-1), nofft=True)
return fomap
[docs]def spin_helper(spin, n):
spin = np.array(spin).reshape(-1)
scomp = 1+(spin!=0)
ci, i1 = 0, 0
while True:
i2 = min(i1+scomp[ci],n)
if i2-i1 != scomp[ci]: raise IndexError("Unpaired component in spin transform")
yield spin[ci], i1, i2
if i2 == n: break
i1 = i2
ci = (ci+1)%len(spin)
[docs]def spin_pre_helper(spin, pre):
"""Like spin_helper, but also handles looping over pre-dimensions"""
# Make spin a 1d array. This will be used to
# interpret the last axis in pre
spin = np.array(spin).reshape(-1)
scomp = 1+(spin!=0)
# Make pre an array that's at least 1d
pre = np.array(pre).reshape(-1)
# Handle empty pre-dimentions
if len(pre) == 0:
yield 0, (None,)
return
n = pre[-1]
# Loop over pre-dimensions
for Ipre in utils.nditer(pre[:-1]):
ci, i1 = 0, 0
while True:
i2 = min(i1+scomp[ci],n)
if i2-i1 != scomp[ci]: raise IndexError("Unpaired component in spin transform")
Itot = Ipre + (slice(i1,i2),)
yield spin[ci], Itot
if i2 == n: break
i1 = i2
ci = (ci+1)%len(spin)
# It's often useful to be able to loop over padded tiles, do some operation on them,
# and then stitch them back together with crossfading. If would be handy to have a way to
# hide all this complexity. How about an iterator that iterates over padded tiles?
# E.g.
# for itile, otile in zip(ipadtiles(imap), opadtiles(omap)):
# otile[:] = fancy_stuff(itile)
# The input tile iterator is straightforward. The output iterator would
# zero out omap at the start, and then at the start of each next()
# would paste the previously yielded tile back into omap with crossfading weights applied.
# A problem with this formulation where ipadtiles and opadtiles are separate
# functions, though, is that padding arguments need to be duplicated, which can get
# tedious. Padding argument must always be consistent when iterating over input
# and output maps, so probably better to have a single function that processes
# multiple maps.
#
# for itile, otile in padtiles(imap, omap, tshape=512, pad=30, apod=30):
# otile[:] = foo(itile[:])
#
# When multiple maps are involved, how should it know which ones
# are output and input maps? Default:
# 1 map: input
# 2 maps: input, output
# N maps: input, input, input, ..., output
# But have an optional argument that lets us specify the types.
#
# 3rd alternative which is cleaner but less convenient:
# padtile = Padtiler(tshape=512, pad=30, apod=30)
# for itile, otile in zip(padtile.in(imap), padtile.out(omap)):
# otile[:] = foo(itile)
# This one has the advantage that it can be built once and then
# passed to other functions as a single argument. It can easily be used to
# implement #2, so can get both cheaply. #3 is less convenient in
# most cases, so #2 would be the typcial interface.
[docs]def padtiles(*maps, tshape=600, pad=60, margin=60, mode="auto", start=0, step=1):
"""Iterate over padded tiles in one or more maps. The tiling
will have a logical tile shape of tshape, but each yielded tile
will be expanded with some data from its neighbors. The extra
area consists of two parts: The padding and the margin. For
a read-iterator these are equivalent, but for a write-iterator
the margin will be ignored (and so can be used for apodization),
while the padding will be used for crossfading when mergin the
tiles together.
Typical usage:
for itile, otile in padtiles(imap, imap, margin=60):
itile = apod(itile, 60)
otile[:] = some_filter(itile)
This would iterate over tiles of imap and omap, with the default
padding and a margin of 60 pixels. The margin region is used for
apodization, and some filter is then applied to the tile, writing
the result to the output tile. Note the use of [:] to actually write
to otile instead of just rebinding the variable name!
It's also possible to iterate over fewer or more maps at once.
See the "mode" argument.
If the tile shape does not evenly divide the map shape, then the
last tile in each row and column will extend beyond the edge of the
map. These pixels will be treated as enmap.extract does, with the
potential of sky wrapping. Warning: Write-iterators for a map that
goes all the way around the sky while the tile shape does not divide
the map shape will incorrectly weight the wrapped tiles, so avoid this.
Arguments:
* *maps: The maps to iterate over. Must have the same pixel dimensions.
* tshape: The tile shape. Either an integer or a (yshape,xshape) tuple.
Default: 600 pixels.
* pad: The padding. Either an integer or a (ypad,xpad) tuple. Used to
implement context and crossfading. Cannot be larger than half of tshape.
Default: 60 pixels.
* margin: The margin size. Either an integer or a (ymargin,xmargin) tuple.
Ignored in write-iterators, so suitable for apodization.
Default 60 pixels.
* mode: Specifies which maps should be read-iterated vs. write-iterated.
A read-iterated map will yield padded tiles from the corresponding map.
Writes to these tiles are discarded. A write-iterated map yields zeroed
tiles of the same shape as the read-iterator. Writes to these tiles are
used to update the corresponding map, including crossfading the overlapping
regions (due to the padding) such that there aren't any sharp tile boundaries
in the output map. mode can be either "auto" or a string of the same length
as maps consisting of "r" and "w" characters. If the nth character is r/w
then the corresponding map will be read/write-iterated. If the string is
"auto", then the last map will be output-iterated and all the others input-
iterated, unless there's only a single map in which case it will be input-
iterated. Default: "auto".
* start: Flattened tile offset to start at. Useful for mpi loops. Default: 0.
* step: Flattened tile stride. Useful for mpi loops. Default: 1
"""
if mode == "auto":
if len(maps) == 0: mode = ""
elif len(maps) == 1: mode = "r"
else: mode = "r"*(len(maps)-1)+"w"
tiler = Padtiler(tshape=tshape, pad=pad, margin=margin)
iters = []
for map, io in zip(maps, mode):
if io == "r": iters.append(tiler.read (map))
elif io == "w": iters.append(tiler.write(map))
else: raise ValueError("Invalid mode character '%s'" % str(io))
# Can't just return zip(*iters) because zip gives up when just
# one iterator raises StopIteration. This doesn't allow the other
# iterators to finish. Maybe this is hacky
return utils.zip2(*iters)
[docs]class Padtiler:
"""Helper class used to implement padtiles. See its docstring for details."""
def __init__(self, tshape=600, pad=60, margin=60, start=0, step=1):
self.tshape = tuple(np.broadcast_to(tshape, 2).astype(int))
self.pad = tuple(np.broadcast_to(pad, 2).astype(int))
self.margin = tuple(np.broadcast_to(margin, 2).astype(int))
oly, olx = 2*np.array(self.pad,int) # overlap region size
self.wy = (np.arange(oly)+1)/(oly+1)
self.wx = (np.arange(olx)+1)/(olx+1)
self.start = start
self.step = step
def _tbound(self, tile, tsize, n):
pix1 = tile*tsize
pix2 = (tile+1)*tsize
return pix1, pix2
[docs] def read (self, imap): return self._it_helper(imap, mode="read")
[docs] def write(self, omap): return self._it_helper(omap, mode="write")
def _it_helper(self, map, mode):
# Loop over tiles
nty, ntx = (np.array(map.shape[-2:],int)+self.tshape-1)//self.tshape
growy, growx = np.array(self.pad) + self.margin
oly, olx = 2*np.array(self.pad) # overlap region size
for ti in range(self.start, nty*ntx, self.step):
ty = ti // ntx
tx = ti % ntx
y1, y2 = self._tbound(ty, self.tshape[-2], map.shape[-2])
x1, x2 = self._tbound(tx, self.tshape[-1], map.shape[-1])
# Construct padded pixel box and extract it
pixbox = np.array([[y1-growy,x1-growx],[y2+growy,x2+growx]])
tile = map.extract_pixbox(pixbox).copy()
if mode == "read":
yield tile
else:
tile[:] = 0
yield tile
# Before the next iteration, take the changes the user
# made to tile, cop off the margin, and apply crossfading
# weights so the overlapping pad regions add up to 1, and
# then add to the output map
tile = tile[...,self.margin[-2]:tile.shape[-2]-self.margin[-2],self.margin[-1]:tile.shape[-1]-self.margin[-1]]
# Apply crossfade weights
if ty > 0: tile[...,:oly,:] *= self.wy[:,None]
if tx > 0: tile[...,:,:olx] *= self.wx[None,:]
if ty < nty-1: tile[...,tile.shape[-2]-oly:,:] *= self.wy[::-1,None]
if tx < ntx-1: tile[...,:,tile.shape[-1]-olx:] *= self.wx[None,::-1]
# And add into output map
map.insert(tile, op=lambda a,b:a+b)