Source code for EMToolKit.EMToolKit

import copy, importlib, astropy, uuid, numpy as np
from sunpy.map import Map
from ndcube import NDCube, NDCubeSequence, NDCollection
from astropy.coordinates import SkyCoord
from astropy.nddata import StdDevUncertainty, UnknownUncertainty

from EMToolKit.schemas.basic_schemas import basic_source, basic_detector

from scipy.interpolate import interp1d
from scipy.integrate import trapezoid

def em_data(maps, errs, logts, tresps, channels=None):
    cubes = []
    for i in range(len(maps)):
        mapi = copy.copy(maps[i])
        if 'CHANNEL' not in mapi.meta:
            if channels is None:
                mapi.meta['CHANNEL'] = mapi.meta['DETECTOR'] + mapi.meta['WAVE_STR']
            else:
                mapi.meta['CHANNEL'] = channels[i]
        mapi.meta['LOGT'] = logts[i]
        mapi.meta['TRESP'] = tresps[i]
        mapi.meta['SHAPE'] = np.array(mapi.data.shape)
        if 'SCHEMA' not in mapi.meta:
            mapi.meta['SCHEMA'] = basic_detector(mapi.meta)
        cubes.append(NDCube(mapi, uncertainty=errs[i]))
    return NDCubeSequence(cubes)

[docs] def em_data(maps, errs, logts, tresps, channels=None): cubes = [] for i in range(len(maps)): mapi = copy.copy(maps[i]) if 'CHANNEL' not in mapi.meta: if channels is None: mapi.meta['CHANNEL'] = mapi.meta['DETECTOR'] + mapi.meta['WAVE_STR'] else: mapi.meta['CHANNEL'] = channels[i] mapi.meta['LOGT'] = logts[i] mapi.meta['TRESP'] = tresps[i] mapi.meta['SHAPE'] = np.array(mapi.data.shape) if 'SCHEMA' not in mapi.meta: mapi.meta['SCHEMA'] = basic_detector(mapi.meta) cubes.append(NDCube(mapi, uncertainty=errs[i])) return NDCubeSequence(cubes)
[docs] def dem_model(coeffs, logts, bases, coord_info, algorithm, wrapper, meta=None, wrapargs={}): nd = len(coeffs) dem_sequence = [] if isinstance(coord_info, astropy.wcs.wcs.WCS): wcs = coord_info else: meta = coord_info if not isinstance(coord_info, dict): print('Warning in EMToolKit dem_model: coord_info is not a wcs or dict') wcs = meta.get('wcs', None) if meta is None: if wcs is not None: meta = dict(wcs.to_header()) else: print('Warning in EMToolKit dem_model: need wcs or image meta') if 'LOGT' not in meta: meta['LOGT'] = logts[0] if 'SCHEMA' not in meta: meta['SCHEMA'] = basic_source([Map(coeffs[0], meta)]) for i in range(nd): logt0 = np.median(logts[i][np.where(bases[i] == np.max(bases[i]))]) metai = { 'BASIS': bases[i], 'LOGT': logts[i], 'LOGT0': logt0, 'ALGORITHM': algorithm, 'WRAPPER': wrapper, 'WRAPARGS': wrapargs, } dem_sequence.append(NDCube(coeffs[i], wcs=wcs, meta={**meta, **metai})) return NDCubeSequence(dem_sequence, meta={'ALGORITHM': algorithm, 'WRAPPER': wrapper, 'WRAPARGS': wrapargs}) if meta is None: if wcs is not None: meta = dict(wcs.to_header()) else: print('Warning in EMToolKit dem_model: need wcs or image meta') print(meta) if 'LOGT' not in meta: meta['LOGT'] = logts[0] if 'SCHEMA' not in meta: meta['SCHEMA'] = basic_source([Map(coeffs[0], meta)]) for i in range(nd): logt0 = np.median(logts[i][np.where(bases[i] == np.max(bases[i]))]) metai = { 'BASIS': bases[i], 'LOGT': logts[i], 'LOGT0': logt0, 'ALGORITHM': algorithm, 'WRAPPER': wrapper, 'WRAPARGS': wrapargs, } dem_sequence.append(NDCube(coeffs[i], wcs=wcs, meta={**meta, **metai})) return NDCubeSequence(dem_sequence, meta={'ALGORITHM': algorithm, 'WRAPPER': wrapper, 'WRAPARGS': wrapargs})
[docs] class em_collection: def __init__(self, datasequence): self.collection = NDCollection([("data", datasequence), ("models", [])]) self.precomputed_interpolations = None def data(self): return self.collection['data'] def __init__(self, datasequence): self.collection = NDCollection([("data", datasequence), ("models", [])]) self.precomputed_interpolations = None
[docs] def data(self): return self.collection['data']
[docs] def add_model(self, modelsequence): pairs = [(k, self.collection.get(k)) for k in self.collection.keys()] # Safely retrieve the algorithm name, with a fallback to 'algorithm' algorithm_name = modelsequence.meta.get('ALGORITHM', modelsequence.meta.get('algorithm')) pairs.append((algorithm_name, modelsequence)) # Clear and recreate the collection with the updated pairs self.collection.clear() self.collection = NDCollection(pairs) # Append the algorithm name to the models list if 'models' not in self.collection: self.collection['models'] = [] self.collection['models'].append(algorithm_name)
[docs] def precompute_interpolations(self): #TODO call this function at the correct time! """Precompute interpolation functions for all pixels.""" if self.precomputed_interpolations is None: self.precomputed_interpolations = {} for algorithm in self.collection['models']: if algorithm not in self.precomputed_interpolations.keys(): # print(f"Precomputing {algorithm}", end="\n") model = self.collection[algorithm] interpolations = [] for component in model: complogt = component.meta.get('LOGT', component.meta.get('logt')) compbasis = component.meta.get('BASIS', component.meta.get('basis')) interp_func = interp1d(complogt, compbasis, fill_value=0.0, bounds_error=False) interpolations.append(interp_func) self.precomputed_interpolations[algorithm] = interpolations
[docs] def compute_dem(self, i, j, logt=None, algorithm=None): if algorithm is None: algorithm = self.collection['models'][0] model = self.collection[algorithm] if logt is None: logt = model[0].meta.get('LOGT', model[0].meta.get('logt')) dem = np.zeros(logt.size) interpolations = self.precomputed_interpolations[algorithm] for component, interp_func in zip(model, interpolations): if j <= component.data.shape[1] and i <= component.data.shape[0]: dem += component.data[i, j] * interp_func(logt) return logt, dem
[docs] def compute_dem_all(self, logt=None, algorithm=None): if algorithm is None: algorithm = self.collection['models'][0] model = self.collection[algorithm] if logt is None: logt = model[0].meta.get('LOGT', model[0].meta.get('logt')) dem = np.zeros([model[0].data.shape[0], model[0].data.shape[1], logt.size]) interpolations = self.precomputed_interpolations[algorithm] for component, interp_func in zip(model, interpolations): dem += np.expand_dims(component.data, -1) * interp_func(logt) return logt, dem
def synthesize_data(self, logts, tresps, algorithm=None, channels=None, ilo=0, ihi=-1, jlo=0, jhi=-1, meta=None): if logts[0].size == 1: [logts, tresps] = [[logts], [tresps]] ndata = len(logts) if algorithm is None: algorithm = self.collection['models'][0] if channels is None: channels = ['SYNTHDATA' + str(i) for i in range(ndata)] model = self.collection[algorithm] [synthmaps, syntherrs] = [[], []] self.precompute_interpolations() for i in range(ndata): synthdata = np.zeros(model[0].data[ilo:ihi, jlo:jhi].shape) syntherrs.append(UnknownUncertainty(np.zeros(model[0].data.shape) - 1)) synthmap = copy.deepcopy(model[0])[ilo:ihi, jlo:jhi] synthmap.meta['NAXIS'] = 2 synthmap.meta['ALGORITHM'] = algorithm synthmap.meta['CHANNEL'] = channels[i] datainterp = interp1d(logts[i], tresps[i], fill_value=0.0, bounds_error=False) for ind, component in enumerate(model): basisinterp = self.precomputed_interpolations[algorithm][ind] #[model.index(component)] logt = np.unique(np.hstack([component.meta['LOGT'], logts[i]])) coupling = trapezoid(datainterp(logt) * basisinterp(logt), x=logt) synthdata += coupling * component.data[ilo:ihi, jlo:jhi] synthmap.data[:] = synthdata[:] synthmaps.append(synthmap)
[docs] def synthesize_data(self, logts, tresps, algorithm=None, channels=None, ilo=0, ihi=-1, jlo=0, jhi=-1, meta=None): if logts[0].size == 1: [logts, tresps] = [[logts], [tresps]] ndata = len(logts) if algorithm is None: algorithm = self.collection['models'][0] if channels is None: channels = ['SYNTHDATA' + str(i) for i in range(ndata)] model = self.collection[algorithm] [synthmaps, syntherrs] = [[], []] self.precompute_interpolations() for i in range(ndata): synthdata = np.zeros(model[0].data[ilo:ihi, jlo:jhi].shape) syntherrs.append(UnknownUncertainty(np.zeros(model[0].data.shape) - 1)) synthmap = copy.deepcopy(model[0])[ilo:ihi, jlo:jhi] synthmap.meta['NAXIS'] = 2 synthmap.meta['ALGORITHM'] = algorithm synthmap.meta['CHANNEL'] = channels[i] datainterp = interp1d(logts[i], tresps[i], fill_value=0.0, bounds_error=False) for ind, component in enumerate(model): basisinterp = self.precomputed_interpolations[algorithm][ind] #[model.index(component)] logt = np.unique(np.hstack([component.meta['LOGT'], logts[i]])) coupling = trapezoid(datainterp(logt) * basisinterp(logt), x=logt) synthdata += coupling * component.data[ilo:ihi, jlo:jhi] synthmap.data[:] = synthdata[:] synthmaps.append(synthmap) return em_data(synthmaps, syntherrs, logts, tresps, channels=channels) return em_data(synthmaps, syntherrs, logts, tresps, channels=channels)
[docs] def synthesize_map(self, map, logt=None, tresp=None, algorithm=None, channel=None): if 'CHANNEL' not in map.meta: if channel is None: map.meta['CHANNEL'] = map.meta['DETECTOR'] + map.meta['WAVE_STR'] else: map.meta['CHANNEL'] = channel if 'SCHEMA' not in map.meta: map.meta['LOGT'] = logt map.meta['TRESP'] = tresp map.meta['SHAPE'] = np.array(map.data.shape) map.meta['SCHEMA'] = basic_detector(map.meta) if algorithm is None: algorithm = self.collection['models'][0] source = self.collection[algorithm][0].meta['SCHEMA'] coeffs = np.array([data.data for data in self.collection[algorithm].data]).flatten() output_map = copy.deepcopy(map) output_map.meta['CHANNEL'] += '_SYNTHETIC' output_map.data[:] = (((map.meta['SCHEMA']).fwdop(source)) * coeffs).reshape(map.data.shape) return output_map
# Beta DEM uncertainty estimation code. First, a lengthy preamble: # The DEM forward problem is given by $M_j = \sum_j R_{ij}c_j$ with data $D_i$ # and associated uncertainties $\sigma_i$, where the DEM is determined by $c_j$ # e.g., as $\sum_j B_j(T)c_j$ and $R_{ij} = \int R_i(T)B_j(T)dT$ ($R_i$ being the # continuous response functions provided for the observations in question): solve # for $c_j$ minimizing $\chi^2=\sum_i\frac{(D_i-M_i)^2}{\sigma_i^2}$, while # satisfying constraints like $\vec{c} > 0$ and minizing some regularization # operator like $\sum{ij}c_i\Lambda_{ij}c_j$. # The uncertainty problem is similar: 'Given a solution for which $M_i=D_i$, # how much can the $c_j$ change (resulting in $M'_i = \sum_j R_{ij}(c_j + \Delta # c_j)$) before the resulting $\chi'^2$ is a nominal threshold value (usually # this is the number of data points, $n_\mathrm{data}$)? # This question has multiple answers depending on interpretation. Do we mean # 'how much can an individual $c_j$ change?', or all of them? Do they change # in the same way, randomly/incoherently, or do they change in such a way as # to counterbalance each other? The uncertainty will depend drastically on the # version of this question -- in the last version, it can easily be inifite or # undefined. If they all change in the same way, the resulting uncertainty is # fairly small. If only one is changing, then the uncertainty is larger (in # proportion to the number of $c_j$). Randomly falls between by the square # root of the number of parameters. We use the 'one at a time' estimate since # it is the most conservative of the ones that are straightforward to compute, # as a nod to the potential ill-posedness of the fully inqualified question. # Conveniently, this uncertainty estimate doesn't require knowledge of the # solution and is independent of it! It does, however, depend on the definition # of a basis element; we will assume temperature bins of width 0.1 in # $\log_{10}(T)$ (1 Dex), see below. # If $\Delta c_j$ is the only source of deviation from agreement (i.e, none of # the other c in the coefficient vector are changing) then $\chi'^2=n_{data}$ # implies that $\sum_i \frac{(R_{ij}\Delta c_j)^2}{\sigma_i^2} = n_\mathrm{data}$ # Therefore the uncertainty estimate is just # $\Delta c_j = \frac{\sqrt(n_\mathrm{data})}{\sqrt{\sum_i R_{ij}^2/\sigma_i^2}}$ # In terms of the original temperature response functions, # $R_{ij} = \int R_i(T) B_j(T) dT$. For purposes of this uncertainty estimate, # we take the basis functions to be top hats of width $\Delta logT=0.1$ at # temperature $T_j$. Therefore $R_{ij} = R_i(T_j)\Delta logT$ and the # uncertainty in a DEM component with that characteristic width is # $\sigma E_j = \frac{\sqrt(n_\mathrm{data}}{\sqrt{\sum_i (R_i(T_j) \Delta lotT)^2/\sigma_i^2}}$ # The input temperatures to this may have any spacing, irrespective of # $\Delta logT$. The output therefore is an independent uncertainty # estimate for an effective temperature bin of width $\Delta logT$ at that # temperature.
[docs] def estimate_uncertainty(self, logt, dlogt=0.1, project_map=None): data = self.data() if(project_map is None): project_map = data[0] output_cube = np.zeros([project_map.data.shape[0],project_map.data.shape[1],len(logt)]) for i in range(0,len(data)): dat_tresp = data[i].meta['TRESP'] dat_logt = data[i].meta['LOGT'] dati = copy.deepcopy(data[i]) dati.data[:] = dati.uncertainty.array tresp = np.interp(logt,dat_logt,dat_tresp,right=0,left=0) data_reproj = dati.reproject_to(project_map.wcs) for j in range(0,len(logt)): output_cube[:,:,j] += (tresp[j]*dlogt/data_reproj.data)**2 return np.sqrt(len(data)/output_cube)