Source code for muse_origin.steps

import inspect
import itertools
import logging
import os
import shutil
import time
import warnings
from collections import OrderedDict
from datetime import datetime
from enum import Enum

import numpy as np
from astropy.io import fits
from astropy.table import Column, Table, vstack
from mpdaf.obj import Cube, Image, Spectrum
from scipy import ndimage as ndi
from scipy.spatial import cKDTree

from .lib_origin import (
    Compute_GreedyPCA_area,
    Compute_PCA_threshold,
    Compute_threshold_purity,
    Correlation_GLR_test,
    O2test,
    add_tglr_stat,
    area_growing,
    area_segmentation_convex_fusion,
    area_segmentation_final,
    area_segmentation_sources_fusion,
    area_segmentation_square_fusion,
    compute_local_max,
    compute_segmap_gauss,
    create_masks,
    dct_residual,
    estimation_line,
    merge_similar_lines,
    phot_deblend_sources,
    purity_estimation,
    spatiospectral_merging,
    unique_sources,
)

__all__ = (
    'Preprocessing',
    'CreateAreas',
    'ComputePCAThreshold',
    'ComputeGreedyPCA',
    'ComputeTGLR',
    'ComputePurityThreshold',
    'Detection',
    'ComputeSpectra',
    'CleanResults',
    'CreateMasks',
    'SaveSources',
    'Status',
    'Step',
    'STEPS',
)


def _format_cat(cat):
    columns = {
        '.1f': ('flux',),
        '.2f': ('lbda', 'T_GLR', 'STD'),
        '.3f': ('ra', 'dec', 'residual', 'purity'),
    }
    for fmt, colnames in columns.items():
        for name in colnames:
            if name in cat.colnames:
                cat[name].format = fmt
                if name in ('STD', 'T_GLR') and np.ma.is_masked(cat[name]):
                    cat[name].fill_value = np.nan
    return cat


def save_spectra(spectra, outname):
    """ The spectra are saved to a FITS file with two extension per
    spectrum, a DATA<ID> one and a STAT<ID> one.
    """
    hdulist = []
    for spec_id, sp in spectra.items():
        hdu = sp.get_data_hdu(name='DATA%d' % spec_id, savemask='nan')
        hdulist.append(hdu)
        hdu = sp.get_stat_hdu(name='STAT%d' % spec_id)
        if hdu is not None:
            hdulist.append(hdu)

    hdulist = fits.HDUList([fits.PrimaryHDU()] + hdulist)
    hdulist.writeto(outname, overwrite=True)


def load_spectra(filename):
    spectra = OrderedDict()
    with fits.open(filename) as hdul:
        for i in range(1, len(hdul), 2):
            spec_id = int(hdul[i].name[4:])
            spectra[spec_id] = Spectrum(hdulist=hdul, ext=(i, i + 1))
    return spectra
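

# Illustrative sketch (assumption, not part of the original code): a round trip
# through the DATA<ID>/STAT<ID> layout produced by save_spectra() and read back
# by load_spectra(). The wavelength solution and values are arbitrary.
def _example_spectra_roundtrip(outname='spectra_example.fits'):
    from mpdaf.obj import WaveCoord

    wave = WaveCoord(cdelt=1.25, crval=4750.0, cunit='Angstrom')
    spectra = OrderedDict()
    spectra[42] = Spectrum(data=np.arange(10.0), var=np.ones(10), wave=wave)
    save_spectra(spectra, outname)  # writes DATA42 and STAT42 extensions
    loaded = load_spectra(outname)  # IDs recovered from the DATA<ID> ext names
    assert list(loaded) == [42]
    return loaded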


class LogMixin:
    def _logdebug(self, *args):
        self.logger.debug(*args)

    def _loginfo(self, *args):
        self.logger.info(*args)

    def _logwarning(self, *args):
        self.logger.warning(*args)


class Status(Enum):
    """Step processing status."""

    NOTRUN = 'not run yet'
    RUN = 'run'
    DUMPED = 'dumped outputs'
    FAILED = 'failed'


class DataObj:
    """Descriptor used to load the data on demand.

    The "kind" of data is stored (cube, image, array, etc.) and is used to
    load the data when the attribute is accessed. This requires that the
    path of the file associated with the data is set as its value.
    """

    def __init__(self, kind):
        # Note: self.label is set by the metaclass
        self.kind = kind

    def __get__(self, obj, owner=None):
        if obj is None:
            return
        try:
            val = obj.__dict__[self.label]
        except KeyError:
            return
        if isinstance(val, str):
            if os.path.isfile(val):
                kind = self.kind
                if kind == 'cube':
                    val = Cube(val)
                elif kind == 'image':
                    val = Image(val)
                elif kind == 'table':
                    val = _format_cat(Table.read(val))
                elif kind == 'array':
                    val = np.loadtxt(val, ndmin=1)
                elif kind == 'spectra':
                    val = load_spectra(val)
                obj.__dict__[self.label] = val
            else:
                val = None
        return val

    def __set__(self, obj, val):
        obj.__dict__[self.label] = val


class StepMeta(type):
    """Metaclass used to manage DataObj descriptors.

    The metaclass does two things:

    - It sets the "label" attribute on the descriptors, which allows using
      ``cube_std = DataObj('cube')`` instead of
      ``cube_std = DataObj('cube_std', 'cube')``, as would be needed if the
      label had to be set manually.
    - It stores the list of all the descriptors in a ``_dataobjs`` attribute
      on the class.

    """

    def __new__(cls, name, bases, attrs):
        descr = []
        for n, inst in attrs.items():
            if isinstance(inst, DataObj):
                inst.label = n
                descr.append((n, inst.kind))
        attrs['_dataobjs'] = descr
        return super().__new__(cls, name, bases, attrs)
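

# Illustrative sketch (assumption, not part of the original code): how StepMeta
# collects DataObj descriptors and how assigning a file path to the attribute
# triggers a lazy load on first access. The file name is arbitrary.
class _ExampleHolder(metaclass=StepMeta):
    values = DataObj('array')


def _example_lazy_load(path='values_example.txt'):
    np.savetxt(path, np.array([1.0, 2.0, 3.0]))
    holder = _ExampleHolder()
    holder.values = path  # only the path is stored at this point
    arr = holder.values   # first access runs np.loadtxt() and caches the array
    assert _ExampleHolder._dataobjs == [('values', 'array')]
    return arr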


class Step(LogMixin, metaclass=StepMeta):
    """Define a processing step.

    Parameters
    ----------
    orig : `muse_origin.ORIGIN`
        The ORIGIN instance.
    idx : int
        The step number.
    param : dict
        Dictionary of parameters for the step.

    """

    name = None
    """Name of the function to run the step."""

    desc = None
    """Description of the step."""

    require = None
    """List of required steps (that must be run before)."""

    def __init__(self, orig, idx, param):
        self.logger = logging.getLogger(__name__)
        self.orig = orig
        self.idx = idx
        self.method_name = 'step%02d_%s' % (idx, self.name)
        # when a session is reloaded, use its param dict (don't overwrite it)
        self.meta = param.setdefault(self.name, {})
        self.meta.setdefault('stepidx', idx)
        self.param = self.meta.setdefault('params', {})

    def __repr__(self):
        return 'Step {:02d}: <{}(status: {})>'.format(
            self.idx, self.__class__.__name__, self.status.name
        )

    @property
    def status(self):
        """Processing status (`muse_origin.Status`):

        - NOTRUN: The step has not been run.
        - RUN: The step has been run but not saved.
        - DUMPED: The step has been run and its outputs saved to disk.
        - FAILED: The step has been run but it failed.

        """
        return self.meta.get('status', Status.NOTRUN)

    @status.setter
    def status(self, val):
        self.meta['status'] = val

    def __call__(self, *args, **kwargs):
        """Run a step, calling its ``run`` method.

        This method is the one that is called from the ORIGIN object. It
        calls the ``run`` method of the step and also does a few additional
        things like storing the parameters, execution time and date.

        """
        t0 = time.time()
        self._loginfo('Step %02d - %s', self.idx, self.desc)

        # Use the step's signature to 1) store the parameter values in the
        # `self.param` dict and 2) to print the default and actual values.
        sig = inspect.signature(self.run)
        for name, p in sig.parameters.items():
            if name == 'orig':  # hide the orig param
                continue
            annotation = (' - ' + p.annotation) if p.annotation is not p.empty else ''
            default = p.default if p.default is not p.empty else ''
            msg = ' - %s = %r (default: %r)%s'
            self._logdebug(msg, name, kwargs.get(name, ''), default, annotation)
            self.param[name] = kwargs.get(name, p.default)

        # Manage dependencies between steps
        if self.require is not None:
            for req in self.require:
                step = self.orig.steps[req]
                if step.status not in (Status.RUN, Status.DUMPED):
                    raise RuntimeError(f'step {step.idx:02d} must be run before')

        try:
            self.run(self.orig, *args, **kwargs)
        except Exception:
            self.status = Status.FAILED
            raise
        else:
            self.status = Status.RUN

        self.meta['runtime'] = tot = time.time() - t0
        self.meta['execution_date'] = datetime.now().isoformat()
        self._loginfo('%02d Done - %.2f sec.', self.idx, tot)

    def store_cube(self, name, data, **kwargs):
        """Create a MPDAF Cube and store it as an attribute."""
        cube = Cube(
            data=data,
            wave=self.orig.wave,
            wcs=self.orig.wcs,
            mask=np.ma.nomask,
            copy=False,
            **kwargs,
        )
        setattr(self, name, cube)

    def store_image(self, name, data, **kwargs):
        """Create a MPDAF Image and store it as an attribute."""
        im = Image(data=data, wcs=self.orig.wcs, copy=False, **kwargs)
        setattr(self, name, im)

    def dump(self, outpath):
        """Save the attributes that have been created by the steps, and
        unload them to free memory.
        """
        if self.status is not Status.RUN:
            # If the step was not run, there is nothing to dump
            return

        self.logger.debug('%s - DUMP', self.method_name)
        for name, kind in self._dataobjs:
            obj = getattr(self, name)
            if obj is not None:
                ext = 'txt' if kind == 'array' else 'fits'
                outf = f'{outpath}/{name}.{ext}'
                self.logger.debug(' - %s [%s]', name, kind)
                if kind in ('cube', 'image'):
                    try:
                        obj.write(outf, convert_float32=False)
                    except TypeError:
                        warnings.warn(
                            'MPDAF version too old to support the new type '
                            'conversion parameter, data will be saved as float32.'
                        )
                        obj.write(outf)
                elif kind in ('table',):
                    obj.write(outf, overwrite=True)
                elif kind in ('array',):
                    np.savetxt(outf, obj)
                elif kind in ('spectra',):
                    save_spectra(obj, outf)

                # now set the value to the path of the written file, which
                # frees memory, and allow the object to reload automatically
                # the data if needed.
                setattr(self, name, outf)

        self.status = Status.DUMPED

    def load(self, outpath):
        """Recreate the attributes of a step. The data is not really loaded:
        only the file path is set, and files are loaded into memory only
        when needed.
        """
        if self.status is not Status.DUMPED:
            # If the step was not dumped previously, there is nothing to load
            return

        self.logger.debug('%s - LOAD', self.method_name)
        for name, kind in self._dataobjs:
            # we just set the paths here, the data will be loaded only if
            # the attribute is accessed.
            ext = 'txt' if kind == 'array' else 'fits'
            setattr(self, name, f'{outpath}/{name}.{ext}')
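

# Illustrative sketch (assumption, not part of the original code): the shape of
# a concrete Step subclass. The DataObj attribute lets dump()/load() write the
# result to '<outpath>/noise_image.fits' and reload it lazily. This example
# class is only defined here and is not registered in STEPS.
class _ExampleStep(Step):
    """Toy step storing a per-spaxel noise image."""

    name = 'example_noise'
    desc = 'Example noise image'
    require = ('preprocessing',)

    noise_image = DataObj('image')

    def run(self, orig, scale=1.0):
        # store_image() wraps the array in an MPDAF Image using the cube WCS
        self.store_image('noise_image', scale * np.sqrt(orig.var).mean(axis=0))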


class Preprocessing(Step):
    """Preparation of the data for the following steps:

    - Continuum subtraction with a DCT filter (the continuum cube is stored
      in ``cont_dct``).
    - Standardization of the data (stored in ``cube_std``).
    - Computation of the local maxima and minima of the std cube
      (``cube_std_local_max`` and ``cube_std_local_min``).
    - Segmentation based on the continuum (``segmap_cont``).
    - Segmentation based on the residual image (``ima_std``), merged with
      the previous one which gives ``segmap_merged``.

    Parameters
    ----------
    dct_order : int
        The number of atoms to keep for the DCT decomposition, defaults
        to 10.
    dct_approx : bool
        If True, the DCT computation does not take the variance into account
        for the computation of the DCT coefficients. Defaults to False.
    pfasegcont : float
        PFA for the segmentation based on the continuum, defaults to 0.01.
    pfasegres : float
        PFA for the segmentation based on the residual, defaults to 0.01.
    local_max_size : int
        Connectivity of contiguous voxels per axis, for the maximum filter,
        defaults to 3.
    bins : str
        Method for computing bins for the segmentation of the continuum and
        of the residual images (see `numpy.histogram_bin_edges`), defaults
        to 'fd'.

    Returns
    -------
    self.cube_std : `~mpdaf.obj.Cube`
        Standardized data for the PCA.
    self.cont_dct : `~mpdaf.obj.Cube`
        Continuum estimated with a DCT.
    self.ima_std : `~mpdaf.obj.Image`
        White-light image of the standardized data cube.
    self.ima_dct : `~mpdaf.obj.Image`
        White-light image of the DCT continuum cube.
    self.cube_std_local_max : `~mpdaf.obj.Cube`
        Local maxima of ``cube_std``.
    self.cube_std_local_min : `~mpdaf.obj.Cube`
        Local maxima of minus ``cube_std``.
    self.segmap_cont : `~mpdaf.obj.Image`
        Segmentation map computed on the white-light image.
    self.segmap_merged : `~mpdaf.obj.Image`
        Segmentation map merged from the continuum one and another one
        computed on the residual.

    """

    name = 'preprocessing'
    desc = 'Preprocessing'

    cube_std = DataObj('cube')
    cont_dct = DataObj('cube')
    ima_std = DataObj('image')
    ima_dct = DataObj('image')
    segmap_cont = DataObj('image')
    segmap_merged = DataObj('image')
    cube_std_local_min = DataObj('cube')
    cube_std_local_max = DataObj('cube')

    def run(
        self,
        orig,
        dct_order=10,
        dct_approx=False,
        pfasegcont=0.01,
        pfasegres=0.01,
        local_max_size=3,
        bins='fd',
    ):
        self._loginfo('DCT computation')
        cont_dct = dct_residual(
            orig.cube_raw, dct_order, orig.var, dct_approx, orig.mask
        )
        data = orig.cube_raw - cont_dct  # compute faint_dct
        data[orig.mask] = np.nan

        # compute standardized data
        self._loginfo('Data standardizing')
        std = np.sqrt(orig.var)
        cont_dct /= std
        mean = np.nanmean(data, axis=(1, 2))
        # orig.var[orig.mask] = np.inf
        data -= mean[:, np.newaxis, np.newaxis]
        data /= std
        data[orig.mask] = 0

        self._loginfo('Std signal saved in self.cube_std and self.ima_std')
        self.store_cube('cube_std', data)
        self.store_image('ima_std', data.mean(axis=0))

        self._loginfo('Compute local maximum of std cube values')
        cube_local_max, cube_local_min = compute_local_max(
            data, data, orig.mask, local_max_size
        )
        self._loginfo(
            'Save self.cube_std_local_max/self.cube_std_local_min '
            'from max/min correlations'
        )
        self.store_cube('cube_std_local_max', cube_local_max)
        self.store_cube('cube_std_local_min', cube_local_min)

        self._loginfo('DCT continuum saved in self.cont_dct and self.ima_dct')
        cont_dct = cont_dct.astype(np.float32)
        self.store_cube('cont_dct', cont_dct)
        self.store_image('ima_dct', cont_dct.mean(axis=0))

        mean_fwhm = int(np.ceil(np.mean(self.orig.FWHM_PSF)))

        self._loginfo('Segmentation based on the continuum')
        # Test statistic for each spaxel - use log here as it gives a more
        # gaussian distribution
        map1 = np.log10(np.sum(cont_dct ** 2, axis=0))
        thresh, map_cont = compute_segmap_gauss(map1, pfasegcont, mean_fwhm, bins=bins)
        self._loginfo(
            'Found %d regions, threshold=%.2f', len(np.unique(map_cont)) - 1, thresh
        )
        self.store_image('segmap_cont', map_cont)

        self._loginfo('Segmentation based on the residual')
        map2 = O2test(data)
        thresh, map_res = compute_segmap_gauss(map2, pfasegres, mean_fwhm, bins=bins)
        self._loginfo(
            'Found %d regions, threshold=%.2f', len(np.unique(map_res)) - 1, thresh
        )

        self._loginfo('Merging both maps')
        segmap, nlabels = ndi.label((map_cont > 0) | (map_res > 0))
        self._loginfo('Segmap saved in self.segmap_merged (%d regions)', nlabels)
        self.store_image('segmap_merged', segmap)
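

# Conceptual sketch (assumption, not part of the original code) of the DCT
# continuum estimation used above: each spectrum is projected on the first
# low-order DCT atoms and the reconstruction is kept as the continuum. The
# real dct_residual() additionally handles the variance (unless dct_approx is
# True) and the mask.
def _example_dct_continuum(spectrum, dct_order=10):
    from scipy.fft import dct, idct

    coeffs = dct(spectrum, type=2, norm='ortho')
    coeffs[dct_order + 1:] = 0  # keep only the low-order atoms
    return idct(coeffs, type=2, norm='ortho')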


class CreateAreas(Step):
    """Creation of areas to split the work.

    This allows splitting the cube into sub-cubes to distribute the
    following steps on multiple processes. The merged segmap computed
    previously is used to avoid cutting objects.

    Parameters
    ----------
    pfa : float
        PFA of the segmentation test to estimate sources with strong
        continuum, defaults to 0.2.
    minsize : int
        Length in pixels of the side of the typical area wanted; it must be
        big enough to satisfy the PCA, defaults to 100.
    maxsize : int
        Length in pixels of the side of the maximum area wanted, defaults
        to None.

    Returns
    -------
    self.nbAreas : int
        Number of areas.
    self.areamap : `~mpdaf.obj.Image`
        The map of areas.

    """

    name = 'areas'
    desc = 'Areas creation'

    areamap = DataObj('image')

    def run(self, orig, pfa=0.2, minsize=100, maxsize=None):
        nexpmap = (np.sum(~orig.mask, axis=0) > 0).astype(int)
        NbSubcube = np.maximum(1, int(np.sqrt(np.sum(nexpmap) / (minsize ** 2))))
        if NbSubcube > 1:
            if maxsize is None:
                maxsize = minsize * 2

            MinSize = minsize ** 2
            MaxSize = maxsize ** 2

            self._loginfo('First segmentation of %d^2 square', NbSubcube)
            self._logdebug('Squares segmentation and fusion')
            square_cut_fus = area_segmentation_square_fusion(
                nexpmap, MinSize, MaxSize, NbSubcube, orig.Ny, orig.Nx
            )

            self._logdebug('Sources fusion')
            square_src_fus, src = area_segmentation_sources_fusion(
                orig.segmap_merged._data, square_cut_fus, pfa, orig.Ny, orig.Nx
            )

            self._logdebug('Convex envelope')
            convex_lab = area_segmentation_convex_fusion(square_src_fus, src)

            self._logdebug('Areas dilation')
            Grown_label = area_growing(convex_lab, nexpmap)

            self._logdebug('Fusion of small area')
            self._logdebug('Minimum Size: %d px', MinSize)
            self._logdebug('Maximum Size: %d px', MaxSize)
            areamap = area_segmentation_final(Grown_label, MinSize, MaxSize)
        elif NbSubcube == 1:
            areamap = nexpmap

        areamap = areamap.astype(int)
        labels = np.unique(areamap)
        if 0 in labels:  # expmap=0
            nbAreas = len(labels) - 1
        else:
            nbAreas = len(labels)

        orig.param['nbareas'] = nbAreas

        self.store_image('areamap', areamap)
        self._loginfo('Save the map of areas in self.areamap')
        self._loginfo('%d areas generated', nbAreas)


class ComputePCAThreshold(Step):
    """Loop on each area and estimate the threshold for the PCA.

    Parameters
    ----------
    pfa_test : float
        Threshold of the test (default=0.01).

    Returns
    -------
    self.testO2 : list of arrays (one per PCA area)
        Result of the O2 test.
    self.histO2 : list of arrays (one per PCA area)
        PCA histogram.
    self.binO2 : list of arrays (one per PCA area)
        Bins for the PCA histogram.
    self.thresO2 : list of float
        For each area, threshold value.
    self.meaO2 : list of float
        Location parameter of the Gaussian fit used to estimate the
        threshold.
    self.stdO2 : list of float
        Scale parameter of the Gaussian fit used to estimate the threshold.

    """

    name = 'compute_PCA_threshold'
    desc = 'PCA threshold computation'

    thresO2 = DataObj('array')
    meaO2 = DataObj('array')
    stdO2 = DataObj('array')
    # FIXME: testO2, histO2 and binO2 are lists of arrays with variable
    # sizes so they cannot be managed by the Step class currently
    # testO2 = DataObj('array')
    # histO2 = DataObj('array')
    # binO2 = DataObj('array')

    require = ('preprocessing', 'areas')

    def run(self, orig, pfa_test=0.01):
        results = []
        for area_ind in range(1, orig.nbAreas + 1):
            # limits of each spatial zone
            ksel = orig.areamap._data == area_ind

            # Data in this spatio-spectral zone
            cube_temp = orig.cube_std._data[:, ksel]

            res = Compute_PCA_threshold(cube_temp, pfa_test)
            results.append(res)
            msg = 'Area %d, estimation mean/std/threshold: %f/%f/%f'
            self._loginfo(msg, area_ind, res[4], res[5], res[3])

        (
            orig.testO2,
            orig.histO2,
            orig.binO2,
            self.thresO2,
            self.meaO2,
            self.stdO2,
        ) = zip(*results)


class ComputeGreedyPCA(Step):
    """Loop on each area and compute the greedy PCA.

    The O2 test and the thresholds (``threshold_list``) define the part of
    each zone of the cube to segment into nuisance and background. A
    fraction of the background part (``1 / Noise_population %``) is used to
    compute a mean background, a signature. The nuisance part is
    orthogonalized to this signature in order not to lose it during the
    greedy process. An SVD is performed on the nuisance part in order to
    model it, and the principal eigenvector (only one) is used to project
    the whole data set: nuisance and background. The nuisance spectra which
    satisfy the test are added back to the background computation, so the
    background is progressively cleaned from the source signatures. The
    iterations stop when all the spectra satisfy the criterion.

    Parameters
    ----------
    Noise_population : float
        Fraction of spectra used to estimate the background signature, as a
        percentage of the number of spectra. Defaults to 50.
    itermax : int
        Maximum number of iterations, defaults to 100.
    threshold_list : list
        List of thresholds (not PFA) to apply to each area. Before using
        this option make sure to have a good correspondence between the
        areas and the thresholds in the list. Use ``self.plot_areas()`` to
        be sure. By default the thresholds computed in the previous step
        are used.

    Returns
    -------
    self.cube_faint : `~mpdaf.obj.Cube`
        Projection on the eigenvectors associated to the lower eigenvalues
        of the data cube (representing the faint signal).
    self.mapO2 : `~mpdaf.obj.Image`
        The numbers of iterations used by testO2 for each spaxel.

    """

    name = 'compute_greedy_PCA'
    desc = 'Greedy PCA computation'

    cube_faint = DataObj('cube')
    mapO2 = DataObj('image')

    require = ('preprocessing', 'areas', 'compute_PCA_threshold')

    def run(self, orig, Noise_population=50, itermax=100, threshold_list=None):
        thr = orig.thresO2 if threshold_list is None else threshold_list
        orig.param['threshold_list'] = thr
        self._loginfo(' - List of threshold = %s', ' '.join("%.2f" % x for x in thr))
        self._loginfo('Compute greedy PCA on each zone')

        faint, mapO2, nstop = Compute_GreedyPCA_area(
            orig.nbAreas,
            orig.cube_std._data,
            orig.areamap._data,
            Noise_population,
            thr,
            itermax,
            orig.testO2,
        )
        if nstop > 0:
            msg = 'The iterations have reached the limit of %d in %d cases'
            self._logwarning(msg, itermax, nstop)

        self._loginfo('Save the faint signal in self.cube_faint')
        self.store_cube('cube_faint', faint)
        self._loginfo(
            'Save numbers of iterations used by testO2 for each spaxel in self.mapO2'
        )
        self.store_image('mapO2', mapO2)


class ComputeTGLR(Step):
    """Compute the cube of GLR test values.

    The test is done on the cube containing the faint signal
    (``self.cube_faint``) and it uses the PSF and the spectral profiles.
    Then the p-values of the local maxima of the correlation values are
    computed.

    Parameters
    ----------
    size : int
        Connectivity of contiguous voxels per axis for the maximum filter.
    ncpu : int
        Number of CPUs used, defaults to 1.
    pcut : float
        Cut applied to the profiles to limit their width (defaults to 1e-8).
    pmeansub : bool
        Subtract the mean of the profiles (defaults to True).

    Returns
    -------
    self.cube_correl : `~mpdaf.obj.Cube`
        Cube of T_GLR values.
    self.cube_correl_min : `~mpdaf.obj.Cube`
        Cube of T_GLR values of minimum correlation.
    self.cube_profile : `~mpdaf.obj.Cube` (type int)
        Number of the profile associated to the T_GLR.
    self.maxmap : `~mpdaf.obj.Image`
        Map of maximum correlations along the wavelength axis.
    self.minmap : `~mpdaf.obj.Image`
        Map of minimum correlations along the wavelength axis.
    self.cube_local_max : `~mpdaf.obj.Cube`
        Local maxima from max correlation.
    self.cube_local_min : `~mpdaf.obj.Cube`
        Local maxima from minus min correlation.

    """

    name = 'compute_TGLR'
    desc = 'GLR test'

    cube_correl = DataObj('cube')
    cube_correl_min = DataObj('cube')
    cube_profile = DataObj('cube')
    cube_local_min = DataObj('cube')
    cube_local_max = DataObj('cube')
    maxmap = DataObj('image')
    minmap = DataObj('image')

    require = ('compute_greedy_PCA',)

    def run(self, orig, size=3, ncpu=1, pcut=1e-8, pmeansub=True):
        if ncpu > 1:
            try:
                import mkl_fft  # noqa
            except ImportError:
                pass
            else:
                warnings.warn(
                    'using multiprocessing (ncpu>1) is not possible '
                    'with the mkl_fft package, it will crash'
                )

        # TGLR computing (normalized correlations)
        self._loginfo('Correlation')
        correl, profile, correl_min = Correlation_GLR_test(
            orig.cube_faint._data,
            orig.PSF,
            orig.wfields,
            orig.profiles,
            nthreads=ncpu,
            pcut=pcut,
            pmeansub=pmeansub,
        )

        self._loginfo('Save the TGLR value in self.cube_correl')
        correl[orig.mask] = 0
        self.store_cube('cube_correl', correl)
        self.store_cube('cube_correl_min', correl_min)

        self._loginfo(
            'Save the number of profile associated to the TGLR in self.cube_profile'
        )
        profile[orig.mask] = 0
        self.store_cube('cube_profile', profile)

        self._loginfo('Save the map of maxima in self.maxmap')
        self.store_image('maxmap', np.amax(correl, axis=0))
        self.store_image('minmap', np.amin(correl_min, axis=0))

        self._loginfo('Compute p-values of local maximum of correlation values')
        cube_local_max, cube_local_min = compute_local_max(
            correl, correl_min, orig.mask, size
        )

        self._loginfo('Save self.cube_local_max from max correlations')
        self.store_cube('cube_local_max', cube_local_max)
        self._loginfo('Save self.cube_local_min from min correlations')
        self.store_cube('cube_local_min', cube_local_min)
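

# Illustrative sketch (assumption, not part of the original code) of the kind
# of local-maxima detection performed by compute_local_max(): voxels that are
# not the maximum of their neighbourhood (`size` voxels per axis) are set to
# zero. The real function also handles the mask and builds the companion
# local-minima cube from its second input in the same way.
def _example_local_max(cube, size=3):
    filtered = ndi.maximum_filter(cube, size=size)
    return np.where(cube == filtered, cube, 0.0)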


class ComputePurityThreshold(Step):
    """Find the threshold for a given purity.

    Parameters
    ----------
    purity : float
        Purity to automatically compute the threshold, defaults to 0.9.
    purity_std : float
        Purity to automatically compute the threshold on the cube_std. If
        None, the previous purity is used.
    threshlist : list
        List of thresholds to compute the purity. If not provided it is
        computed from the data with 50 values between 1.1*median and the
        max value.
    pfasegfinal : float
        PFA for the segmentation based on the maxmap, defaults to 1e-5.
    bins : str
        Method for computing bins for the maxmap segmap (see
        `numpy.histogram_bin_edges`). Defaults to 'fd'.

    Returns
    -------
    self.threshold_correl : float
        Estimated threshold used to detect on the correl.
    self.threshold_std : float
        Estimated threshold used to detect complementary lines on the std
        cube.
    self.segmap_purity : `~mpdaf.obj.Image`
        Combines self.segmap_merged and a segmentation on the maxmap.
    self.Pval : `astropy.table.Table`
        Table with the purity results for each threshold:

        - Pval_r : the purity function
        - Tval_r : index value to plot
        - Det_m : number of detections (-DATA)
        - Det_M : number of detections (+DATA)
    self.Pval_comp : `astropy.table.Table`
        Table with the purity results on cube_std, same columns as Pval.

    """

    name = 'compute_purity_threshold'
    desc = 'Compute Purity threshold'

    Pval = DataObj('table')
    Pval_comp = DataObj('table')
    segmap_purity = DataObj('image')

    require = ('compute_TGLR',)

    def run(
        self,
        orig,
        purity=0.9,
        purity_std=None,
        threshlist=None,
        pfasegfinal=1e-5,
        bins='fd',
    ):
        if purity_std is None:
            purity_std = purity
        orig.param.update(dict(purity=purity, purity_std=purity_std))

        # compute another segmap on the maxmap and merge with
        # self.segmap_merged
        thresh, map_res = compute_segmap_gauss(
            self.orig.maxmap._data, pfasegfinal, 0, bins=bins
        )
        segmap, nlabels = ndi.label((map_res > 0) | (orig.segmap_merged._data > 0))
        self.store_image('segmap_purity', segmap)

        self._loginfo('Estimation of threshold with purity = %.2f', purity)
        # purity computed on the background area using the segmap
        threshold, self.Pval = Compute_threshold_purity(
            purity,
            orig.cube_local_max._data,
            orig.cube_local_min._data,
            segmap,
            threshlist=threshlist,
        )
        orig.param['threshold'] = threshold
        self._loginfo('Threshold: %.2f ', threshold)

        self._loginfo('Estimation of threshold std with purity = %.2f', purity_std)
        threshold_std, self.Pval_comp = Compute_threshold_purity(
            purity_std,
            orig.cube_std_local_max._data,
            orig.cube_std_local_min._data,
            threshlist=threshlist,
        )
        orig.param['threshold_std'] = threshold_std
        self._loginfo('Threshold: %.2f ', threshold_std)
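

# Toy sketch (assumption, not part of the original code) of the purity idea
# behind Compute_threshold_purity(): for a threshold t, detections on the
# negated data (the local-minima cube) estimate the false detections, so the
# purity is roughly 1 - N_min(t) / N_max(t). The real function also restricts
# the counts using the segmentation map and derives from the resulting curve
# the threshold reaching the requested purity.
def _example_purity_curve(local_max_values, local_min_values, thresholds):
    purity = []
    for t in thresholds:
        n_max = np.count_nonzero(local_max_values > t)
        n_min = np.count_nonzero(local_min_values > t)
        purity.append((1.0 - n_min / n_max) if n_max > 0 else 1.0)
    return np.array(purity)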


class Detection(Step):
    """Detections on local maxima from the correlation and std cubes, and
    spatio-spectral merging in order to create the first catalog.

    Parameters
    ----------
    threshold : float
        User threshold if the estimated threshold from the previous step is
        not good.
    threshold_std : float
        User threshold if the estimated cube_std threshold is not good.
    tol_spat : int
        Tolerance for the spatial merging (distance in pixels), defaults
        to 3. TODO: should depend on the FWHM.
    tol_spec : int
        Tolerance for the spectral merging (distance in pixels), defaults
        to 5.
    segmap : str
        Optional segmap to use instead of the one computed automatically on
        the continuum image.

    Returns
    -------
    self.Cat0 : `astropy.table.Table`
        Catalog with all detections (correl and comp). Columns:
        x0 y0 z0 comp STD T_GLR profile
    self.Cat1 : `astropy.table.Table`
        Catalog with filtered and matched detections. Columns:
        ID ra dec lbda x0 y0 z0 comp STD T_GLR profile seg_label
    self.segmap_label : `~mpdaf.obj.Image`
        Segmentation map used for the catalog, either the one given as
        input, otherwise self.segmap_cont.

    """

    name = 'detection'
    desc = 'Thresholding and spatio-spectral merging'

    Cat0 = DataObj('table')
    Cat1 = DataObj('table')
    segmap_label = DataObj('image')

    def det_correl_min(self, thresh=None):
        """3D positions of detections in correl_min."""
        thresh = thresh or self.orig.param['threshold']
        zm, ym, xm = np.where(self.orig.cube_local_min._data > thresh)
        return zm, ym, xm

    def run(
        self,
        orig,
        threshold=None,
        threshold_std=None,
        tol_spat=3,
        tol_spec=5,
        maxdist_lines=2.5,
        segmap=None,
    ):
        if threshold is not None:
            orig.threshold_correl = threshold
        if threshold_std is not None:
            orig.threshold_std = threshold_std

        # detection on the correl cube
        self._loginfo('Thresholding correl (>%.2f)', orig.threshold_correl)
        z, y, x = np.where(orig.cube_local_max._data > orig.threshold_correl)
        cat = Table([x, y, z], names=('x0', 'y0', 'z0'))
        cat['comp'] = 0
        cat['STD'] = np.nan
        cat['T_GLR'] = orig.cube_local_max._data[z, y, x]
        cat['profile'] = orig.cube_profile._data[z, y, x]
        self._loginfo('%d detected lines', len(cat))

        # detection on the std cube
        self._loginfo('Thresholding std (>%.2f)', orig.threshold_std)
        z, y, x = np.where(orig.cube_std_local_max._data > orig.threshold_std)
        cat_std = Table([x, y, z], names=('x0', 'y0', 'z0'))
        cat_std['comp'] = 1
        cat_std['STD'] = orig.cube_std_local_max._data[z, y, x]
        cat_std['T_GLR'] = np.nan
        cat_std['profile'] = 0
        self._loginfo('%d detected lines', len(cat_std))

        # Store the raw detection catalog in Cat0.
        # Merge tables. vstack creates a masked Table, masking the missing
        # values. But a bug with Astropy/Numpy 1.14 is causing the Cat1
        # table to be modified later by Cat2 operations, if it was not
        # dumped before. So for now we transform the table to a non-masked
        # one.
        self.Cat0 = _format_cat(vstack([cat, cat_std]).filled())

        # merging: remove detections in std close to the ones in correl
        kdt_cor = cKDTree(np.array([cat['x0'], cat['y0'], cat['z0']]).T)
        kdt_std = cKDTree(np.array([cat_std['x0'], cat_std['y0'], cat_std['z0']]).T)
        # find matches in std that are close to correl detections
        matched = set(
            itertools.chain.from_iterable(
                kdt_cor.query_ball_tree(kdt_std, maxdist_lines)
            )
        )
        unmatched = list(set(range(len(cat_std))) - matched)
        cat_std = cat_std[unmatched]
        self._loginfo('kept %d lines from std after filtering', len(unmatched))

        if segmap is not None:
            self.logger.info('Overriding segmap_cont with the given one')
            self.segmap_label = Image(segmap)
            if self.segmap_label.shape != orig.shape[1:]:
                raise ValueError(
                    'segmap does not have the same shape as the processed cube'
                )
        else:
            self.logger.info('Using segmap_cont with an additional deblending step')
            self.segmap_label = phot_deblend_sources(
                orig.ima_dct, orig.segmap_cont.data, npixels=5, mode='linear'
            )

        cat = _format_cat(vstack([cat, cat_std]).filled())
        cat['area'] = self.segmap_label._data[cat['y0'], cat['x0']]

        self.logger.info('Spatio-spectral merging...')
        cat = spatiospectral_merging(cat, tol_spat, tol_spec)

        # add real coordinates and other useful info
        z, y, x = cat['z0'].data, cat['y0'].data, cat['x0'].data
        dec, ra = orig.wcs.pix2sky(np.stack((y, x)).T).T
        cat.add_column(Column(name='ra', data=ra), index=0)
        cat.add_column(Column(name='dec', data=dec), index=1)
        cat.add_column(Column(name='lbda', data=orig.wave.coord(z)), index=2)
        cat.rename_column('area', 'seg_label')

        # Start ID numbering at 1
        cat['imatch'] += 1
        cat['imatch2'] += 1

        # Relabel IDs sequentially
        oldIDs = np.unique(cat['imatch'])
        idmap = np.zeros(oldIDs.max() + 1, dtype=int)
        idmap[oldIDs] = np.arange(1, len(oldIDs) + 1)
        cat.add_column(Column(name='ID', data=idmap[cat['imatch']]), index=0)
        cat.sort('ID')

        self._loginfo('Purity estimation')
        cat = purity_estimation(cat, orig.Pval, orig.Pval_comp)

        cat_comp = cat[cat['comp'] == 1]
        ns = len(set(cat['ID']))
        ds = len(set(cat_comp['ID']) - set(cat['ID']))
        nl = len(cat)
        dl = len(cat_comp)

        self.Cat1 = cat
        msg = 'Save the catalog in self.Cat1 (%d [+%s] sources, %d [+%d] lines)'
        self._loginfo(msg, ns, ds, nl, dl)


class ComputeSpectra(Step):
    """Compute the estimated emission line and the optimal coordinates.

    For each detected line in a spatio-spectral grid, the line is estimated
    with the deconvolution model::

        subcube = FSF*line -> line_est = subcube*fsf/(fsf^2)

    via the PCA LS or denoised PCA LS method.

    Parameters
    ----------
    grid_dxy : int
        Maximum spatial shift for the grid, defaults to 0.
    spectrum_size_fwhm : float
        The length of the spectrum to keep around each line, as a factor of
        the fitted line FWHM, defaults to 6.

    Returns
    -------
    self.Cat2 : `astropy.table.Table`
        Catalog of parameters of detected emission lines. Columns:
        ra dec lbda x0 x y0 y z0 z T_GLR profile residual flux num_line
        purity
    self.spectra : list of `~mpdaf.obj.Spectrum`
        Estimated lines.

    """

    name = 'compute_spectra'
    desc = 'Lines estimation'

    Cat2 = DataObj('table')
    spectra = DataObj('spectra')

    require = ('detection',)

    def run(self, orig, grid_dxy=0, spectrum_size_fwhm=6):
        self.Cat2, line_est, line_var = estimation_line(
            orig.Cat1,
            orig.cube_raw,
            orig.var,
            orig.PSF,
            orig.wfields,
            orig.wcs,
            orig.wave,
            size_grid=grid_dxy,
            criteria='flux',
            order_dct=30,
            horiz_psf=1,
            horiz=5,
        )
        _format_cat(self.Cat2)
        self._loginfo(
            'Save the updated catalog in self.Cat2 (%d lines)', len(self.Cat2)
        )

        # Radius for spectrum trimming
        radius = np.ceil(np.array(orig.FWHM_profiles) * spectrum_size_fwhm / 2)
        radius = radius.astype(int)

        self.spectra = OrderedDict()
        for row, data, vari in zip(self.Cat2, line_est, line_var):
            profile, z, num_line = row['profile', 'z', 'num_line']
            z_min = z - radius[profile]
            z_max = z + radius[profile]
            if len(data) > 1:  # RB test for bug in GridAnalysis
                sp = Spectrum(
                    data=data, var=vari, wave=orig.wave, mask=np.ma.nomask, copy=False
                )
                self.spectra[num_line] = sp.subspec(z_min, z_max, unit=None)

        self._loginfo('Save estimated spectrum of each line in self.spectra')


class CleanResults(Step):
    """Clean the various results.

    This step does several things to “clean” the results of ORIGIN:

    - Some lines are associated to the same source but are very near
      considering their z positions. The lines are all marked as merged in
      the brightest line of the group (but are kept in the line table).
    - A table of unique sources is created.
    - Statistical detection info is added on the 2 resulting catalogs.

    Attributes added to the ORIGIN object:

    - `Cat3_lines`: clean table of lines;
    - `Cat3_sources`: table of unique sources.

    Parameters
    ----------
    merge_lines_z_threshold : int
        z axis pixel threshold used when merging similar lines, defaults
        to 5.

    """

    name = 'clean_results'
    desc = 'Results cleaning'

    Cat3_lines = DataObj('table')
    Cat3_sources = DataObj('table')

    require = ('compute_spectra',)

    def run(self, orig, merge_lines_z_threshold=5):
        self.Cat3_lines = merge_similar_lines(
            orig.Cat2, z_pix_threshold=merge_lines_z_threshold
        )
        self.Cat3_sources = unique_sources(orig.Cat3_lines)

        # add some statistics to Cat3_lines and Cat3_sources
        self.Cat3_sources = add_tglr_stat(
            self.Cat3_sources,
            self.Cat3_lines,
            orig.cube_correl.data,
            orig.cube_std.data,
        )

        self._loginfo(
            'Save the unique source catalog in self.Cat3_sources (%d sources)',
            len(orig.Cat3_sources),
        )
        self._loginfo(
            'Save the cleaned lines in self.Cat3_lines (%d lines)',
            len(orig.Cat3_lines),
        )
        nb_line_merged = np.sum(orig.Cat3_lines['merged_in'] != -9999)
        if nb_line_merged:
            self._loginfo('%d lines were merged in nearby lines', nb_line_merged)


class CreateMasks(Step):
    """Create source masks and sky masks.

    This step creates the mask and sky mask for each source.

    Parameters
    ----------
    path : str
        Path where the masks will be saved. By default this is
        ``<output_dir>/masks``.
    overwrite : bool
        Overwrite the folder if it already exists, True by default.
    mask_size : int
        Minimal width in pixels for the square masks. The mask size must be
        odd. If this parameter is even, 1 will be added to it when creating
        the masks. Defaults to 25.
    min_sky_npixels : int
        Minimum number of sky pixels in the mask, defaults to 100.
    seg_thres_factor : float
        Factor applied to the detection threshold to get the threshold used
        for mask creation, defaults to 0.5.
    fwhm_factor : float
        When creating a source, for each line a disk with a diameter of the
        FWHM multiplied by this factor is added to the source mask,
        defaults to 2.
    plot_problems : bool
        If True, when the mask of a source seems dubious, some diagnostic
        plots about the source and its lines are saved in the output
        folder. Note that this may be problematic when running the code
        inside a notebook, hence the option is False by default.

    """

    name = 'create_masks'
    desc = 'Mask creation'

    require = ('clean_results',)

    def run(
        self,
        orig,
        path=None,
        overwrite=True,
        mask_size=25,
        min_sky_npixels=100,
        seg_thres_factor=0.5,
        fwhm_factor=2,
        plot_problems=False,
    ):
        if path is None:
            out_dir = '%s/masks' % orig.outpath
        else:
            if not os.path.exists(path):
                raise ValueError(f"Invalid path: {path}")

            path = os.path.normpath(path)
            out_dir = f'{path}/{orig.name}/masks'

        if overwrite:
            shutil.rmtree(out_dir, ignore_errors=True)
        os.makedirs(out_dir, exist_ok=True)

        orig.param['mask_filename_tpl'] = f"{out_dir}/source-mask-%0.5d.fits"
        orig.param['skymask_filename_tpl'] = f"{out_dir}/sky-mask-%0.5d.fits"

        create_masks(
            line_table=orig.Cat3_lines,
            source_table=orig.Cat3_sources,
            profile_fwhm=orig.FWHM_profiles,
            cube_correl=orig.cube_correl,
            threshold_correl=orig.threshold_correl,
            cube_std=orig.cube_std,
            threshold_std=orig.threshold_std,
            segmap=orig.segmap_label,
            fwhm=orig.LBDA_FWHM_PSF,
            out_dir=out_dir,
            mask_size=mask_size,
            min_sky_npixels=min_sky_npixels,
            seg_thres_factor=seg_thres_factor,
            fwhm_factor=fwhm_factor,
            plot_problems=plot_problems,
        )


class SaveSources(Step):
    """Create the source file for each source.

    Parameters
    ----------
    version : str
        Version number of the source files, for the SRC_V FITS keyword.
    path : str
        Path where the sources will be saved, by default this is the output
        directory of the ORIGIN object.
    n_jobs : int
        Number of jobs for parallel processing, defaults to 1.
    author : str
        Name of the author to add in the sources.
    nb_fwhm : float
        Factor multiplying the FWHM of a line to compute the width of the
        associated narrow band image, defaults to 2.
    expmap_filename : str
        Name of the file containing the exposure map to add to the source.
    overwrite : bool
        Overwrite the folder if it already exists.

    """

    name = 'save_sources'
    desc = 'Save sources'

    def run(
        self,
        orig,
        version,
        *,
        path=None,
        n_jobs=1,
        author="",
        nb_fwhm=2,
        expmap_filename=None,
        overwrite=True,
    ):
        if path is None:
            outpath = orig.outpath
        else:
            if not os.path.exists(path):
                raise ValueError(f"Invalid path: {path}")
            outpath = os.path.join(os.path.normpath(path), orig.name)
        out_dir = os.path.join(outpath, 'sources')
        if overwrite:
            shutil.rmtree(out_dir, ignore_errors=True)
        os.makedirs(out_dir, exist_ok=True)

        # We need the correlation cube and the spectrum FITS files saved to disk
        orig.write()

        from .source_creation import create_all_sources

        create_all_sources(
            cat3_sources=orig.Cat3_sources,
            cat3_lines=orig.Cat3_lines,
            origin_params=orig.param,
            cube_cor_filename=os.path.join(outpath, 'cube_correl.fits'),
            cube_std_filename=os.path.join(outpath, 'cube_std.fits'),
            mask_filename_tpl=orig.param['mask_filename_tpl'],
            skymask_filename_tpl=orig.param['skymask_filename_tpl'],
            spectra_fits_filename=os.path.join(outpath, 'spectra.fits'),
            segmaps={
                "LABEL": os.path.join(outpath, 'segmap_label.fits'),
                "MERGED": os.path.join(outpath, 'segmap_merged.fits'),
            },
            version=version,
            profile_fwhm=orig.FWHM_profiles,
            out_tpl=os.path.join(out_dir, 'source-%0.5d.fits'),
            n_jobs=n_jobs,
            author=author,
            nb_fwhm=nb_fwhm,
            expmap_filename=expmap_filename,
        )
"""This defines the list of all processing steps.""" STEPS = [ Preprocessing, CreateAreas, ComputePCAThreshold, ComputeGreedyPCA, ComputeTGLR, ComputePurityThreshold, Detection, ComputeSpectra, CleanResults, CreateMasks, SaveSources, ]