Source code for pixell.lensing

import numpy as np
from . import enmap, utils, powspec
try: from . import interpol
except ImportError: pass

####### Flat sky lensing #######

[docs]def lens_map(imap, grad_phi, order=3, mode="spline", border="cyclic", trans=False, deriv=False, h=1e-7): """Lens map imap[{pre},ny,nx] according to grad_phi[2,ny,nx], where phi is the lensing potential, and grad_phi, which can be computed as enmap.grad(phi), simply is the coordinate displacement for each pixel. order, mode and border specify details of the interpolation used. See enlib.interpol.map_coordinates for details. If trans is true, the transpose operation is performed. This is NOT equivalent to delensing. If the same lensing field needs to be reused repeatedly, then higher efficiency can be gotten from calling displace_map directly with precomputed pixel positions.""" # Converting from grad_phi to pix has roughly the same cost as calling displace_map. # So almost a factor 2 in speed can be won from calling displace_map directly. pos = imap.posmap() + grad_phi pix = imap.sky2pix(pos, safe=False) if not deriv: return displace_map(imap, pix, order=order, mode=mode, border=border, trans=trans) else: # displace_map deriv gives us ndim,{pre},ny,nx dlens_pix = displace_map(imap, pix, order=order, mode=mode, border=border, trans=trans, deriv=True) res = dlens_pix[0]*0 pad = (slice(None),)+(None,)*(imap.ndim-2)+(slice(None),slice(None)) for i in range(2): pos2 = pos.copy() pos2[i] += h pix2 = imap.sky2pix(pos2, safe=False) dpix = (pix2-pix)/h res += np.sum(dlens_pix * dpix[pad],0) return res
[docs]def delens_map(imap, grad_phi, nstep=3, order=3, mode="spline", border="cyclic"): """The inverse of lens_map, such that delens_map(lens_map(imap, dpos), dpos) = imap for well-behaved fields. The inverse does not always exist, in which case the equation above will only be approximately fulfilled. The inverse is computed by iteration, with the number of steps in the iteration controllable through the nstep parameter. See enlib.interpol.map_coordinates for details on the other parameters.""" grad_phi = delens_grad(grad_phi, nstep=nstep, order=order, mode=mode, border=border) return lens_map(imap, -grad_phi, order=order, mode=mode, border=border)
[docs]def delens_grad(grad_phi, nstep=3, order=3, mode="spline", border="cyclic"): """Helper function for delens_map. Attempts to find the undisplaced gradient given one that has been displaced by itself.""" alpha = grad_phi for i in range(nstep): alpha = lens_map(grad_phi, -alpha, order=order, mode=mode, border=border) return alpha
[docs]def displace_map(imap, pix, order=3, mode="spline", border="cyclic", trans=False, deriv=False): """Displace map m[{pre},ny,nx] by pix[2,ny,nx], where pix indicates the location in the input map each output pixel should get its value from (float). The output is [{pre},ny,nx].""" if not deriv: omap = imap.copy() else: omap = enmap.empty((2,)+imap.shape, imap.wcs, imap.dtype) if not trans: interpol.map_coordinates(imap, pix, omap, order=order, mode=mode, border=border, trans=trans, deriv=deriv) else: interpol.map_coordinates(omap, pix, imap, order=order, mode=mode, border=border, trans=trans, deriv=deriv) return omap
# Compatibility function. Not quite equivalent lens_map above due to taking phi rather than # its gradient as an argument.
[docs]def lens_map_flat(cmb_map, phi_map): raw_pix = cmb_map.pixmap() + enmap.grad_pix(phi_map) # And extract the interpolated values. Because of a bug in map_pixels with # mode="wrap", we must handle wrapping ourselves. npad = int(np.ceil(max(np.max(-raw_pix),np.max(raw_pix-np.array(cmb_map.shape[-2:])[:,None,None])))) pmap = enmap.pad(cmb_map, npad, wrap=True) return enmap.samewcs(utils.interpol(pmap, raw_pix+npad, order=4, mode="wrap"), cmb_map)
######## Curved sky lensing ########
[docs]def phi_to_kappa(phi_alm,phi_ainfo=None): """Convert lensing potential alms phi_alm to lensing convergence alms kappa_alm, i.e. phi_alm * l * (l+1) / 2 Args: phi_alm: (...,N) ndarray of spherical harmonic alms of lensing potential phi_ainfo: If ainfo is provided, it is an alm_info describing the layout of the input alm. Otherwise it will be inferred from the alm itself. Returns: kappa_alm: The filtered alms phi_alm * l * (l+1) / 2 """ from . import curvedsky return curvedsky.almxfl(alm=phi_alm,lfilter=lambda x: x*(x+1)/2,ainfo=phi_ainfo)
[docs]def lens_map_curved(shape, wcs, phi_alm, cmb_alm, phi_ainfo=None, maplmax=None, dtype=np.float64, spin=[0,2], output="l", geodesic=True, verbose=False, delta_theta=None): from . import curvedsky # Restrict to target number of components oshape = shape[-3:] if len(oshape) == 2: shape = (1,)+tuple(shape) if delta_theta is None: bsize = shape[-2] else: bsize = utils.nint(abs(delta_theta/utils.degree/wcs.wcs.cdelt[1])) # Adjust bsize so we don't get any tiny blocks at the end nblock= shape[-2]//bsize bsize = int(shape[-2]/(nblock+0.5)) # Allocate output maps if "p" in output: phi_map = enmap.empty(shape[-2:], wcs, dtype=dtype) if "k" in output: kappa_map = enmap.empty(shape[-2:], wcs, dtype=dtype) kappa_alm = phi_to_kappa(phi_alm,phi_ainfo=phi_ainfo) for i1 in range(0, shape[-2], bsize): curvedsky.alm2map(kappa_alm, kappa_map[...,i1:i1+bsize,:]) del kappa_alm if "a" in output: grad_map = enmap.empty((2,)+shape[-2:], wcs, dtype=dtype) if "u" in output: cmb_raw = enmap.empty(shape, wcs, dtype=dtype) if "l" in output: cmb_obs = enmap.empty(shape, wcs, dtype=dtype) # Then loop over dec bands for i1 in range(0, shape[-2], bsize): i2 = min(i1+bsize, shape[-2]) lshape, lwcs = enmap.slice_geometry(shape, wcs, (slice(i1,i2),slice(None))) if "p" in output: if verbose: print("Computing phi map") curvedsky.alm2map(phi_alm, phi_map[...,i1:i2,:]) if verbose: print("Computing grad map") if "a" in output: grad = grad_map[...,i1:i2,:] else: grad = enmap.zeros((2,)+lshape[-2:], lwcs, dtype=dtype) curvedsky.alm2map(phi_alm, grad, deriv=True) if "l" not in output: continue if verbose: print("Computing observed coordinates") obs_pos = enmap.posmap(lshape, lwcs) if verbose: print("Computing alpha map") raw_pos = enmap.samewcs(offset_by_grad(obs_pos, grad, pol=shape[-3]>1, geodesic=geodesic), obs_pos) del obs_pos, grad if "u" in output: if verbose: print("Computing unlensed map") curvedsky.alm2map(cmb_alm, cmb_raw[...,i1:i2,:], spin=spin) if verbose: print("Computing lensed map") cmb_obs[...,i1:i2,:] = curvedsky.alm2map_pos(cmb_alm, raw_pos[:2], spin=spin) if raw_pos.shape[0] > 2 and np.any(raw_pos[2]): if verbose: print("Rotating polarization") cmb_obs[...,i1:i2,:] = enmap.rotate_pol(cmb_obs[...,i1:i2,:], raw_pos[2]) del raw_pos del cmb_alm, phi_alm # Output in same order as specified in output argument res = [] for c in output: if c == "l": res.append(cmb_obs.reshape(oshape)) elif c == "u": res.append(cmb_raw.reshape(oshape)) elif c == "p": res.append(phi_map) elif c == "k": res.append(kappa_map) elif c == "a": res.append(grad_map) return tuple(res)
[docs]def rand_alm(ps_lensinput, lmax=None, dtype=np.float64, seed=None, phi_seed=None, verbose=False, ncomp=None): from . import curvedsky ctype = np.result_type(dtype,0j) if ncomp is not None: ps_lensinput = ps_lensinput[:1+ncomp,:1+ncomp] # First draw a random lensing field, and use it to compute the undeflected positions if verbose: print("Generating alms") if phi_seed is None: alm, ainfo = curvedsky.rand_alm(ps_lensinput, lmax=lmax, seed=seed, dtype=ctype, return_ainfo=True) else: # We want separate seeds for cmb and phi. This means we have to do things a bit more manually wps, ainfo = curvedsky.prepare_ps(ps_lensinput, lmax=lmax) alm = np.empty([1+ncomp,ainfo.nelem],ctype) curvedsky.rand_alm_white(ainfo, alm=alm[:1], seed=phi_seed) curvedsky.rand_alm_white(ainfo, alm=alm[1:], seed=seed) ps12 = enmap.multi_pow(wps, 0.5) ainfo.lmul(alm, (ps12/2**0.5).astype(dtype), alm) alm[:,:ainfo.lmax].imag = 0 alm[:,:ainfo.lmax].real *= 2**0.5 del wps, ps12 # Return phi_alm and cmb_alm return alm[0], alm[1:], ainfo
[docs]def rand_map(shape, wcs, ps_lensinput, lmax=None, maplmax=None, dtype=np.float64, seed=None, phi_seed=None, spin=[0,2], output="l", geodesic=True, verbose=False, delta_theta=None): # Restrict to target number of components oshape = shape[-3:] if len(oshape) == 2: shape = (1,)+tuple(shape) ncomp = shape[-3] phi_alm, cmb_alm, ainfo = rand_alm(ps_lensinput=ps_lensinput, lmax=lmax, dtype=dtype, seed=seed, phi_seed=phi_seed, verbose=verbose, ncomp=ncomp) # Truncate alm if we want a smoother map. In taylens, it was necessary to truncate # to a lower lmax for the map than for phi, to avoid aliasing. The appropriate lmax # for the cmb was the one that fits the resolution. FIXME: Can't slice alm this way. #if maplmax: cmb_alm = cmb_alm[:,:maplmax] return lens_map_curved(shape=shape, wcs=wcs, phi_alm=phi_alm, cmb_alm=cmb_alm, phi_ainfo=ainfo, maplmax=maplmax, dtype=dtype, spin=spin, output=output, geodesic=geodesic, verbose=verbose, delta_theta=delta_theta)
[docs]def offset_by_grad(ipos, grad, geodesic=True, pol=None): """Given a set of coordinates ipos[{dec,ra},...] and a gradient grad[{ddec,dphi/cos(dec)},...] (as returned by curvedsky.alm2map(deriv=True)), returns opos = ipos + grad, while properly parallel transporting on the sphere. If geodesic=False is specified, then an much faster approximation is used, which is still very accurate unless one is close to the poles.""" ncomp = 2 if pol is False or pol is None and ipos.shape[0] <= 2 else 3 opos = np.empty((ncomp,)+ipos.shape[1:]) iflat = ipos.reshape(ipos.shape[0],-1) # Oflat is a flattened view of opos, so changes to oflat # are visible in our return value opos oflat = opos.reshape(opos.shape[0],-1) gflat = grad.reshape(grad.shape[0],-1) if geodesic: # Loop over chunks in order to conserve memory step = 0x100000 for i in range(0, iflat.shape[1], step): # The helper function assumes zenith coordinates small_grad = gflat[:,i:i+step].copy(); small_grad[0] = -small_grad[0] small_ipos = iflat[:,i:i+step].copy(); small_ipos[0] = np.pi/2-small_ipos[0] # Compute the offset position and polarization rotation, the latter in the # form of cos and sin of the polarization rotation angle small_opos, small_orot = offset_by_grad_helper(small_ipos, small_grad, ncomp>2) oflat[0,i:i+step] = np.pi/2 - small_opos[0] oflat[1,i:i+step] = small_opos[1] # Handle rotation if necessary if oflat.shape[0] > 2: # Recover angle from cos and sin oflat[2,i:i+step] = np.arctan2(small_orot[1],small_orot[0]) if iflat.shape[0] > 2: oflat[2,i:i+step] += iflat[2,i:i+step] else: oflat[0] = iflat[0] + gflat[0] oflat[1] = iflat[1] + gflat[1]/np.cos(iflat[0]) oflat[:2] = pole_wrap(oflat[:2]) if oflat.shape[0] > 2: oflat[2] = 0 return opos
[docs]def offset_by_grad_helper(ipos, grad, pol): """Find the new position and induced rotation from offseting the input positions ipos[2,nsamp] by grad[2,nsamp].""" grad = np.array(grad) # Decompose grad into direction and length. # Fix zero-length gradients first, to avoid # division by zero. grad[:,np.all(grad==0,0)] = 1e-20 d = np.sum(grad**2,0)**0.5 grad /=d # Perform the position offset using spherical # trigonometry cosd, sind = np.cos(d), np.sin(d) cost, sint = np.cos(ipos[0]), np.sin(ipos[0]) ocost = cosd*cost-sind*sint*grad[0] osint = (1-ocost**2)**0.5 ophi = ipos[1] + np.arcsin(sind*grad[1]/osint) if not pol: return np.array([np.arccos(ocost), ophi]), None # Compute the induced polarization rotation. with utils.nowarn(): # If sint is zero then we get a divide by zero here, but # the final result is still finite A = grad[1]/(sind*cost/sint+grad[0]*cosd) nom1 = grad[0]+grad[1]*A denom = 1+A**2 cosgam = 2*nom1**2/denom-1 singam = 2*nom1*(grad[1]-grad[0]*A)/denom res = np.array([np.arccos(ocost), ophi]), np.array([cosgam,singam]) return res
[docs]def pole_wrap(pos): """Handle pole wraparound.""" a = np.array(pos) bad = np.where(a[0] > np.pi/2) a[0,bad] = np.pi - a[0,bad] a[1,bad] = a[1,bad]+np.pi bad = np.where(a[0] < -np.pi/2) a[0,bad] = -np.pi - a[0,bad] a[1,bad] = a[1,bad]+np.pi return a