import numpy as np, os, warnings
from . import enmap, utils, wcsutils
[docs]def zeros(tile_geom, dtype=np.float64):
"""Construct a zero-initialized TileMap with the given TileGeometry and data type"""
flat = np.zeros(tile_geom.pre + (np.sum(tile_geom.npixs[tile_geom.active]),), dtype)
return TileMap(flat, tile_geom.copy())
[docs]def empty(tile_geom, dtype=np.float64):
"""Construct a zero-initialized TileMap with the given TileGeometry and data type"""
flat = np.empty(tile_geom.pre + (np.sum(tile_geom.npixs[tile_geom.active]),), dtype)
return TileMap(flat, tile_geom.copy())
[docs]def full(tile_geom, val, dtype=np.float64):
"""Construct a zero-initialized TileMap with the given TileGeometry and data type"""
flat = np.full(tile_geom.pre + (np.sum(tile_geom.npixs[tile_geom.active]),), val, dtype)
return TileMap(flat, tile_geom.copy())
[docs]def from_tiles(tiles, tile_geom):
"""Construct a TileMap from a set of a full list of tiles, both active
and inactive. Inactive tiles are indicated with None entries. The active
information in tile_geom is ignored, as is the non-pixel part of tile_geom.shape,
which is instead inferred from the tiles."""
active_tiles = []
active = []
for gi, tile in enumerate(tiles):
if tile is None: continue
active_tiles.append(tile)
active.append(gi)
return from_active_tiles(active_tiles, tile_geom.copy(active=active))
[docs]def from_active_tiles(tiles, tile_geom):
"""Construct a TileMap from a list of active tiles that should match the
active list in the provided tile geometry. The non-pixel part of tile_geom
is ignored, and is instead inferred from the tile shapes."""
if len(tiles) != tile_geom.nactive:
raise ValueError("Wrong number of tiles passed. Expected %d but got %d" % (tile_geom.nactive, len(tiles)))
if len(tiles) == 0: return zeros(tile_geom)
data = np.concatenate([tile.reshape(tile.shape[:-2]+(-1,)) for tile in tiles],-1)
return TileMap(data, tile_geom.copy(pre=data.shape[:-1]))
[docs]class TileMap(np.ndarray):
"""Implements a sparse tiled map, as described by a TileGeometry. This is effectively
a large enmap that has been split into tiles, of which only a subset is stored. This
is implemented as a subclass of ndarray instead of a list of tiles to allow us to
transparently perform math operations on it. The maps are stored stored as a single
array with all tiles concatenated along a flattened pixel dimension, in the same
order as in tile_geom.active.
Example: A TileMap covering a logical area with shape (3,100,100) with (10,10) tiles
and active tiles [7,5] will have a shape of (3,200=10*10*2) when accessed directly.
When accessed through the .tiles view, .tiles[5] will return a view of an (3,10,10) enmap,
as will .tiles[7]. For all other indices, .tiles[x] will return None. The same
two tiles can be accessed as .active_tiles[1] and .active_tiles[0] respecitvely.
Slicing the TileMap using the [] operator works. For all but the last axis, this
does what you would expect. E.g. for the above example, tile_map[0].tiles[5] would
return a view of a (10,10) enmap (so the first axis is gone). If the last axis,
which represents a flattened view of the pixels and tiles, is sliced, then the
returned object will be a plain numpy array.
"""
def __new__(cls, arr, tile_geom):
"""Constructs a TileMap object given a raw array arr[...,totpix] and
a tuple of geometries."""
obj = np.asarray(arr).view(cls)
obj.geometry = tile_geom
return obj
def __array_finalize__(self, obj):
if obj is None: return
self.geometry = getattr(obj, "geometry", None)
def __repr__(self):
return "TileMap(%s,%s)" % (np.asarray(self), str(self.geometry))
def __str__(self): return repr(self)
def __array_wrap__(self, arr, context=None):
return TileMap(arr, self.geometry)
def __getitem__(self, sel):
# Split sel into normal and wcs parts.
sel1, sel2 = utils.split_slice(sel, [self.ndim-1,1])
if len(sel2) > 1:
raise IndexError("too many indices")
elif len(sel2) == 1:
# Degrade to plain array if we index the last, special pixel/tile axis
return np.ndarray.__getitem__(self, sel)
else:
res = np.ndarray.__getitem__(self, sel)
ogeo = self.geometry.copy()
ogeo.shape = res.shape[:-1]+self.geometry.shape[-2:]
return TileMap(res, ogeo)
def __getslice__(self, a, b=None, c=None): return self[slice(a,b,c)]
[docs] def contig(self): return TileMap(np.ascontiguousarray(self), self.geometry)
@property
def pre(self): return self.geometry.pre
@property
def ntile(self): return self.geometry.ntile
@property
def nactive(self): return self.geometry.nactive
[docs] def copy(self, order='K'):
return TileMap(np.copy(self,order), self.geometry.copy())
@property
def tiles(self):
return TileView(self, active=False)
@property
def active_tiles(self):
return TileView(self, active=True)
[docs] def with_tiles(self, other, strict=False):
"""If a and b are TileMaps with the same overall tiling but different
active tile sets, then c = a.with_tiles(b) will make c a TileMap
with the union of the active tiles of a and b and the data from a
(new tiles are zero-initialized).
If strict==True, then c will have exactly the active tiles of b,
in exactly that order. Binary operations on strictly compatible arrays
should be considerably faster."""
try: active = other.geometry.active
except AttributeError: active = _parse_active(other, self.ntile)
if np.all(active == self.geometry.active):
return self.copy()
# Construct the new geometry
if strict: new_geom = self.geometry.copy(active =active)
else: new_geom = self.geometry.copy(add_active=active)
# Construct the new array
res = zeros(new_geom, dtype=self.dtype)
# Copy over data. Not optimized
for gi in res.geometry.active:
ai = self.geometry.lookup[gi]
if ai >= 0: res.tiles[gi] = self.active_tiles[ai]
return res
# Forward some properties of TileGeometry
@property
def active(self): return self.geometry.active
@property
def lookup(self): return self.geometry.lookup
@property
def nactive(self): return self.geometry.nactive
@property
def ntile(self): return self.geometry.ntile
@property
def tile_shape(self): return self.geometry.tile_shape
# General methods
[docs] def insert(self, imap, op=lambda a,b:b): return insert(self, imap, op=op)
[docs]class TileView:
"""Helper class used to implement access to the individual tiles that make up a TileMap object"""
def __init__(self, tile_map, active=True):
self.tile_map = tile_map
self.active = active
self.offs = utils.cumsum(tile_map.geometry.npixs[tile_map.geometry.active], endpoint=True)
@property
def ndim(self): return self.tile_map.ndim+1 # ndim of logical full map
@property
def shape(self): return self.tile_map.geometry.shape # shape of logical full map
def __len__(self):
if self.active: return len(self.tile_map.geometry.active)
else: return self.tile_map.geometry.ntile
def __getitem__(self, sel):
"""Get a single tile or subset of a tile from the TileMap. The first
entry in the slice must be an integer - general slicing in the tile axis is not
supported, though it could be added. The rest of the indices can be anything an
enmap will accept."""
if isinstance(sel, int):
# Optimize common use case by avoiding slice decoding.
# Doesn't save much time, really. 0.1 ms when combined with the
# sel2 check below
i = sel
sel2 = ()
else:
sel1, sel2 = utils.split_slice(sel, [1,self.tile_map.ndim+2-1])
if len(sel1) == 0: return self.tile_map
i = sel1[0]
geo = self.tile_map.geometry
if self.active:
ai, gi = i, geo.active[i]
else:
ai, gi = geo.lookup[i], i
# Return None for inactive tiles since that's what so3g does.
# But consider raising an exception instead.
if ai < 0: return None
if ai < 0 or ai >= self.tile_map.nactive:
raise IndexError("Active tile index %d (global %d) is out of bounds for TileMap with %d active tiles" % (ai, gi, self.tile_map.nactive))
tile_info = geo.tiles[gi]
tile = enmap.ndmap(self.tile_map[...,self.offs[ai]:self.offs[ai+1]].reshape(self.tile_map.pre + tile_info.shape[-2:]), tile_info.wcs)
# Apply any slicing of the tile itself
if len(sel2) > 0: tile = tile[sel2]
return tile
def __setitem__(self, sel, val):
"""Set a single tile or subset of a tile to the given value, which can be a number
or a compatibly shaped array. The first entry in the slice must be an integer -
general slicing in the tile axis is not supported, though it could be added.
The rest of the indices can be anything an enmap will accept. This relies on
slicing not producing copies, and may fail if the TileMap is not contiguous."""
sel1, sel2 = utils.split_slice(sel, [1,self.tile_map.ndim+2-1])
if len(sel1) == 0: return self.tile_map
i = sel1[0]
geo = self.tile_map.geometry
if self.active:
ai, gi = i, geo.active[i]
else:
ai, gi = geo.lookup[i], i
# Return None for inactive tiles since that's what so3g does.
# But consider raising an exception instead.
if ai < 0: raise IndexError("Tile %d is not active" % gi)
# This assumes that the slicing and reshaping won't cause copies. This should be the case if
# self.tile_map is contiguous. To be robust I should detect if a copy would be made, and fall back
# on something slower in that case
tile_info = geo.tiles[gi]
enmap.ndmap(self.tile_map[...,self.offs[ai]:self.offs[ai+1]].reshape(self.tile_map.pre + tile_info.shape[-2:]), tile_info.wcs)[sel2] = val
def __iter__(self):
"""Iterator access. Faster than __getitem__ due to not having to resolve
the more complicated slicing it supports."""
geo = self.tile_map.geometry
if self.active: items = [(ai,geo.active[ai]) for ai in range(geo.nactive)]
else: items = [(geo.lookup[gi],gi) for gi in range(geo.ntile)]
for ai, gi in items:
if ai < 0:
yield None
else:
tile_info = geo.tiles[geo.active[ai]]
yield enmap.ndmap(self.tile_map[...,self.offs[ai]:self.offs[ai+1]].reshape(self.tile_map.pre + tile_info.shape[-2:]), tile_info.wcs)
# Math operations. These are automatically supported by virtue of being a numpy subclass,
# but the functions below add support for expanding the active tiles automatically when TileMaps
# with incompatible active tiles are combined.
[docs]def make_binop(op, is_inplace=False):
if isinstance(op, str):
op = getattr(np.ndarray, op)
def binop(self, other):
if isinstance(other, TileMap): # could be replaced with a try statement for duck typing
comp = self.geometry.compatible(other.geometry)
if comp == 0:
# Not compatible
raise ValueError("operands could not be broadcast together with geometries %s and %s" % (str(self.geometry), str(other.geometry)))
elif comp == 1:
# Loosely compatible. Requires manual work. Slow, both due to looping and
# poor memory localization.
if is_inplace:
# We can't change the shape of the output array for an in-place operation
if np.any(self.geometry.lookup[other.geometry.active]<0):
raise ValueError("Cannot expand TileMap with geometry %s to include active tiles in geometry %s in in-place operation" % (str(self.geometry), str(other.geometry)))
opre = utils.broadcast_shape(self.pre, other.pre)
if opre != self.pre:
raise ValueError("operands could not be broadcast together with geometries %s and %s" % (str(self.geometry), str(other.geometry)))
for gi in other.geometry.active:
self.tiles[gi] = op(self.tiles[gi], other.tiles[gi])
return self
else:
# Not in-place, so we have more flexibility. First build the output map
oact = np.unique(np.concatenate([self.geometry.active, other.geometry.active]))
otype= np.result_type(self.dtype, other.dtype)
oshape = utils.broadcast_shape(self.pre, other.pre) + self.geometry.shape[-2:]
ogeo = geometry(oshape, self.geometry.wcs, tile_shape=self.geometry.tile_shape, active=oact)
out = zeros(ogeo, otype)
# Copy over our old values
for gi in self.geometry.active:
out.tiles[gi] = self.tiles[gi]
# Then update with valus from other
for gi in other.geometry.active:
out.tiles[gi] = op(out.tiles[gi], other.tiles[gi])
return out
else:
# Fully compatible. Handle outside
pass
# Handle fully compatible or plain array. Fast.
out = op(self, other)
out = TileMap(out, self.geometry.copy(pre=out.shape[:-1]))
return out
return binop
for op in ["__add__", "__sub__", "__mul__", "__pow__", "__truediv__", "__floordiv__",
"__lt__", "__le__", "__eq__", "__ne__", "__ge__", "__gt__",
"__and__", "__or__", "__xor__", "__lshift__", "__rshift__"]:
setattr(TileMap, op, make_binop(op))
for op in ["__iadd__", "__isub__", "__imul__", "__ipow__", "__itruediv__", "__ifloordiv__",
"__iand__", "__ior__", "__ixor__", "__ilshift__", "__irshift__"]:
setattr(TileMap, op, make_binop(op, is_inplace=True))
[docs]def insert(omap, imap, op=lambda a,b:b):
"""Insert imap into omap, returning the result. Equivalent to enmap.insert, but with the
following important differences:
* omap is not modified. Use the result is returned. (enmap both modifies and returns)
* The maps must have the same geometry, only differing by the active tiles.
This may be generalized in the future."""
binop = make_binop(op)
return binop(omap, imap)
[docs]def map_mul(mat, vec):
"""Elementwise matrix multiplication mat*vec. Result will have
the same shape as vec. Multiplication happens along the last non-pixel
indices."""
# Allow scalar product, broadcasting if necessary
mat = np.asanyarray(mat)
if mat.ndim <= 2: return mat*vec
# Otherwise we do a matrix product along the last axes
ovec = samegeo(np.einsum("...abi,...bi->...ai", mat, vec), mat, vec)
return ovec
[docs]def samegeo(arr, *args):
"""Returns arr with the same geometry information as the first tilemap among
args. If no matches are found, arr is returned as is. Will
reference, rather than copy, the underlying array data
whenever possible.
"""
for m in args:
try: return TileMap(arr, m.geometry.copy(pre=arr.shape[:-1]))
except AttributeError: pass
return arr
########################################
############ Geometry stuff ############
########################################
[docs]def geometry(shape, wcs, tile_shape=(500,500), active=[]):
"""TileGeometry constructor.
shape, wcs: The enmap geometry of the full space the tiling covers.
tile_shape: The (ny,nx) vertical and horizontal tile shape in pixels.
active: The list of active tile indices."""
shape = tuple(shape)
wcs = wcs
tile_shape = tuple(np.zeros(2,int)+tile_shape) # broadcast to len(2)
# Get the grid shape by rounding up to include any partial tiles at the edge
grid_shape = tuple([(s+ts-1)//ts for s,ts in zip(shape[-2:], tile_shape[-2:])])
ntile = grid_shape[0]*grid_shape[1]
# Compute all the individual tile shapes. Should maybe be done on-the-fly
tile_shapes= np.zeros(grid_shape + (2,), int)
tile_shapes[:, :] = tile_shape
tile_shapes[-1,:,0] = np.minimum(tile_shape[-2], (shape[-2]-1)%tile_shape[-2]+1)
tile_shapes[:,-1,1] = np.minimum(tile_shape[-1], (shape[-1]-1)%tile_shape[-1]+1)
tile_shapes = tile_shapes.reshape(-1,2)
# Number of pixels per tile
npixs = tile_shapes[:,0]*tile_shapes[:,1]
# And the active tile list and reverse lookup
active = np.array(active,int)
lookup = np.full(ntile,-1,int)
lookup[active] = np.arange(len(active))
# Call the raw constructor
return TileGeometry(shape, wcs, tile_shape, grid_shape, tile_shapes, npixs, active, lookup)
[docs]class TileGeometry:
def __init__(self, shape, wcs, tile_shape, grid_shape, tile_shapes, npixs, active, lookup):
"""Raw constructor for a TileGeometry. You normally don't want to use this. Use
tilemap.geometry() instead."""
self.shape = shape
self.wcs = wcs
self.tile_shape = tile_shape
self.grid_shape = grid_shape
self.ntile = self.grid_shape[0]*self.grid_shape[1]
self.tile_shapes= tile_shapes
self.npixs = npixs
self.active = active
self.lookup = lookup
[docs] def grid2ind(self, ty, tx):
"""Get the index of the tile wiht grid coordinates ty,tx in the full tiling"""
return ty*self.grid_shape[1]+tx
[docs] def ind2grid(self, i):
"""Get the tile grid coordinates ty, tx for tile #i in the full tiling"""
nx = self.grid_shape[-1]
return i//nx, i%nx
[docs] def copy(self, pre=None, active=None, add_active=None):
if pre is not None: shape = tuple(pre) + self.shape[-2:]
else: shape = self.shape
_active = self.active.copy()
lookup = self.lookup.copy()
if active is not None or add_active is not None:
# Allow us to override these, which will require recalculation of lookup
if active is not None:
_active = _parse_active(active, self.ntile)
lookup = np.full(self.ntile,-1,int)
lookup[_active] = np.arange(len(_active))
if add_active is not None:
add_active = _parse_active(add_active, self.ntile)
_active = np.concatenate([_active, add_active[lookup[add_active]<0]])
lookup[_active] = np.arange(len(_active))
return TileGeometry(shape, self.wcs, self.tile_shape, self.grid_shape, self.tile_shapes.copy(), self.npixs.copy(), _active, lookup)
@property
def pre(self): return self.shape[:-2]
@property
def nactive(self): return len(self.active)
@property
def size(self): return np.product(self.pre)*np.sum(self.npixs[self.active])
@property
def tiles(self):
"""Allow us to get the enmap geometry of tile #i by writing
tile_geom.tiles[i]"""
return _TileGeomHelper(self)
def __repr__(self): return "TileGeometry(%s, %s, tile_shape=%s, active=%s)" % (str(self.shape), str(self.wcs), str(self.tile_shape), str(self.active))
[docs] def compatible(self, other):
"""Return our compatibility with binary operations with other.
The return value can be 2, 1 or 0:
2: Strictly compatible. Both the logical geometry (shape, wcs), tile shape
and active tiles match. This allows for direct numpy operations without
any manual looping over tiles.
1: Loosely compatible. The logical and tile geometry match, but not the active tiles.
0. Not compatible."""
if tuple(self.shape[-2:]) != tuple(other.shape[-2:]): return 0
if tuple(self.tile_shape) != tuple(other.tile_shape): return 0
if self.nactive == other.nactive and np.all(self.active == other.active): return 2
else: return 1
class _TileGeomHelper:
def __init__(self, tile_geom):
self.tile_geom = tile_geom
def __getitem__(self, i):
"""Return the geometry of index #i in the full tiling"""
g = self.tile_geom
ty, tx = g.ind2grid(i)
y1 = ty*g.tile_shape[-2]
x1 = tx*g.tile_shape[-1]
y2 = min(y1+g.tile_shape[-2], g.shape[-2])
x2 = min(x1+g.tile_shape[-1], g.shape[-1])
return enmap.Geometry(g.shape, g.wcs)[...,y1:y2,x1:x2]
def _parse_active(active, ntile):
if utils.streq(active, "all"): return np.arange(ntile,dtype=int)
else: return np.asarray(active,int)
[docs]def to_enmap(tile_map):
omap = enmap.zeros(tile_map.geometry.shape, tile_map.geometry.wcs, tile_map.dtype)
for ai, tile in enumerate(tile_map.active_tiles):
gi = tile_map.active[ai]
gy, gx = tile_map.geometry.ind2grid(gi)
th, tw = tile_map.geometry.tile_shape
y1 = gy*th
y2 = min((gy+1)*th, omap.shape[-2])
x1 = gx*tw
x2 = min((gx+1)*tw, omap.shape[-1])
omap[...,y1:y2,x1:x2] = tile
return omap
############################################
########## Distributed TileMaps ############
############################################
# The main purpose of a TileMap is to spead the data of a huge map across many mpi tasks.
[docs]def redistribute(imap, comm, active=None, omap=None):
"""Redistirbute the data in the mpi-distributed tiles in imap into the
active tiles in omap, using the given communicator. If a tile is active in
multiple tasks in imap, it will be reduced. If it is active in multiple tiles in
omap, it will be duplicated."""
# 1. Who owns what?
iactive = np.zeros(imap.ntile,bool); iactive[imap.active]=True
iactive_all = utils.allgather(iactive, comm) # [ntask,ntile]
# 2. Who should own what? Determine automatically if not given
if omap is None:
if active is None:
iactive_any = np.nonzero(np.any(iactive_all,0))[0]
active = np.array_split(iactive_any, comm.size)[comm.rank]
omap = zeros(imap.geometry.copy(active=active), dtype=imap.dtype)
oactive = np.zeros(omap.ntile,bool); oactive[omap.active]=True
oactive_all = utils.allgather(oactive, comm) # [ntask,ntile]
# 3. Figure out who I should send and receive each of my tiles to/from
omask = oactive_all[:,imap.active] # [ntasks,iactive]
imask = iactive_all[:,omap.active] # [ntasks,oactive]
isizes = imap.geometry.npixs[imap.active] * np.product(imap.pre).astype(int)
osizes = omap.geometry.npixs[omap.active] * np.product(omap.pre).astype(int)
ioffs = utils.cumsum(isizes)
ooffs = utils.cumsum(osizes)
# 4. Build our alltoallv send info
iflat = np.ascontiguousarray(imap.T).reshape(-1)
send_sizes= np.sum(omask*isizes,1)
send_offs = utils.cumsum(send_sizes)
send_buf = [iflat[ioffs[iact]:ioffs[iact]+isizes[iact]] for rank, iact in np.argwhere(omask)]
send_buf = np.concatenate(send_buf, dtype=iflat.dtype) if len(send_buf) > 0 else np.zeros(0, iflat.dtype)
# 5. Build the alltoallv receive info
recv_sizes= np.sum(imask*osizes,1)
recv_offs = utils.cumsum(recv_sizes)
recv_buf = np.zeros(np.sum(recv_sizes), omap.dtype)
# 6. Perform the actual communication
comm.Alltoallv((send_buf, (send_sizes, send_offs)), (recv_buf, (recv_sizes, recv_offs)))
del iflat, send_buf
# 7. Copy and reduce into flattened output tiles
oflat = np.zeros(omap.size, omap.dtype)
rbuf_off = 0
for rank, oact in np.argwhere(imask):
oflat[ooffs[oact]:ooffs[oact]+osizes[oact]] += recv_buf[rbuf_off:rbuf_off+osizes[oact]]
rbuf_off += osizes[oact]
del recv_buf
# 8. And move data into our actual output map
omap[:] = oflat.reshape(omap.shape[::-1]).T
return omap
[docs]def tree_reduce(imap, comm, plan=None):
"""Given a tilemap imap that's distributed over the communicator comm,
and where each tile is potentially present in multiple tasks, sum the
duplicate tiles and assign them to a single task, such that in the end
each tile is present in at most one task. Exactly which tiles end up in
which tasks is determined automatically but deterministically based on the
tile ownership pattern in imap."""
from map_reduce import distlib
if plan is None:
plan = distlib.Logistics(imap.active, comm)
work = [None if tile is None else tile.copy() for tile in imap.tiles]
for gi, sender, receiver in plan.ops:
if comm.rank == sender:
comm.Send(work[gi], dest=receiver)
work[gi] = None
elif comm.rank == receiver:
tile = np.zeros_like(work[gi])
comm.Recv(tile, source=sender)
work[gi] += tile
omap = from_tiles(work, imap.geometry)
return omap
[docs]def get_active_distributed(tile_map, comm):
# Figure out which tiles are owned by anybody
iactive = np.zeros(tile_map.ntile,int); iactive[tile_map.active]=1
iactive = utils.allreduce(iactive, comm)
return np.nonzero(iactive)[0]
[docs]def reduce(tile_map, comm, root=0):
"""Given a distributed TileMap tile_map, collect all the tiles
on the task with rank root (default is rank 0), and return it.
Multiply owned tiles are reduced. Returns a TileMap with no
active tiles for other tasks than root."""
active_distributed = get_active_distributed(tile_map, comm)
active = active_distributed if comm.rank == root else []
return redistribute(tile_map, comm, active)
[docs]def write_map(fname, tile_map, comm):
"""Write a distributed tile_map to disk as a single enmap.
Collects all the data on a single task before writing."""
omap = reduce(tile_map, comm)
if comm.rank == 0:
omap = to_enmap(omap)
enmap.write_map(fname, omap, allow_modify=True)