Source code for mirar.processors.photcal.photcalibrator

"""
Module for running photometric calibration
"""

import logging
import warnings
from collections.abc import Callable
from pathlib import Path

import numpy as np
from astropy.io import fits
from astropy.io.fits.verify import VerifyWarning
from astropy.table import Table

from mirar.catalog.base.base_catalog import BaseCatalog
from mirar.data import Image, ImageBatch
from mirar.paths import (
    MAGLIM_KEY,
    REF_CAT_PATH_KEY,
    ZP_KEY,
    ZP_NSTARS_KEY,
    ZP_STD_KEY,
    get_output_dir,
)
from mirar.processors.astromatic.sextractor.sextractor import sextractor_checkimg_map
from mirar.processors.astrometry.validate import get_fwhm
from mirar.processors.base_catalog_xmatch_processor import (
    BaseImageProcessor,
    BaseProcessorWithCrossMatch,
    default_image_sextractor_catalog_purifier,
    xmatch_catalogs,
)
from mirar.processors.photcal.photcal_errors import (
    PhotometryCalculationError,
    PhotometryCrossMatchError,
    PhotometryReferenceError,
    PhotometrySourceError,
)
from mirar.processors.photcal.zp_calculator.base_zp_calculator import (
    BaseZeroPointCalculator,
)
from mirar.processors.photcal.zp_calculator.outlier_rejection_zp_calculator import (
    OutlierRejectionZPCalculator,
)

logger = logging.getLogger(__name__)

# All the Sextractor parameters required for this script to run
REQUIRED_PARAMETERS = [
    "X_IMAGE",
    "Y_IMAGE",
    "FWHM_WORLD",
    "FLAGS",
    "ALPHAWIN_J2000",
    "DELTAWIN_J2000",
    "MAG_APER",
    "MAG_AUTO",
]


[docs] def get_maglim( bkg_rms_image_path: str | Path, zeropoint: float | list[float], aperture_radius_pixels: float | list[float], ) -> float: """ Function to calculate limiting magnitude Args: bkg_rms_image_path: zeropoint: aperture_radius_pixels: Returns: """ if isinstance(zeropoint, float): zeropoint = [zeropoint] if isinstance(aperture_radius_pixels, float): aperture_radius_pixels = [aperture_radius_pixels] zeropoint = np.array(zeropoint, dtype=float) aperture_radius_pixels = np.array(aperture_radius_pixels, dtype=float) logger.debug(aperture_radius_pixels) bkg_rms_image = fits.getdata(bkg_rms_image_path) bkg_rms_med = np.nanmedian(bkg_rms_image) noise = bkg_rms_med * np.sqrt(np.pi * aperture_radius_pixels**2) maglim = -2.5 * np.log10(5 * noise) + zeropoint logger.debug(f"Aperture radii: {aperture_radius_pixels}") logger.debug(f"Calculated maglim: {maglim}") return maglim
[docs] class PhotCalibrator(BaseProcessorWithCrossMatch): """ Photometric calibrator processor Attributes: num_matches_threshold: minimum number of matches required for photometric calibration for outlier rejection. If a ist is provided, the list is sorted and stepped through in order with increasing thresholds until the specified number of matches is reached. zp_calculator: Zero point calculator object zp_column_name: Name of the column in the photometric catalog that will be used to assign the value of "ZP_KEY" in the header of the output image. Default is "MAG_AUTO" """ base_key = "photcalibrator" def __init__( self, ref_catalog_generator: Callable[[Image], BaseCatalog], temp_output_sub_dir: str = "phot", catalogs_purifier: Callable[ [Table, Table, Image], [Table, Table] ] = default_image_sextractor_catalog_purifier, num_matches_threshold: int = 5, crossmatch_radius_arcsec: float = 1.0, write_regions: bool = False, cache: bool = False, zp_calculator: BaseZeroPointCalculator = OutlierRejectionZPCalculator(), zp_column_name: str = "MAG_AUTO", ): super().__init__( ref_catalog_generator=ref_catalog_generator, temp_output_sub_dir=temp_output_sub_dir, crossmatch_radius_arcsec=crossmatch_radius_arcsec, catalogs_purifier=catalogs_purifier, write_regions=write_regions, cache=cache, required_parameters=REQUIRED_PARAMETERS, ) self.num_matches_threshold = num_matches_threshold self.zp_calculator = zp_calculator self.zp_column_name = zp_column_name
[docs] def description(self) -> str: return "Processor to perform photometric calibration."
[docs] def get_phot_output_dir(self): """ Return the :return: """ return get_output_dir(self.temp_output_sub_dir, self.night_sub_dir)
def _apply_to_images( self, batch: ImageBatch, ) -> ImageBatch: phot_output_dir = self.get_phot_output_dir() phot_output_dir.mkdir(parents=True, exist_ok=True) apertures = self.get_sextractor_apertures() # aperture diameters for image in batch: ref_cat, _, cleaned_img_cat = self.setup_catalogs(image) fwhm_med, _, fwhm_std, med_fwhm_pix, _, _ = get_fwhm(cleaned_img_cat) header_map = { "FWHM_MED": fwhm_med, "FWHM_STD": fwhm_std, "FWHM_PIX": med_fwhm_pix, } for key, value in header_map.items(): if np.isnan(value): value = -999.0 image.header[key] = value if len(ref_cat) < self.num_matches_threshold: err = ( f"Not enough sources ({len(ref_cat)} found in reference catalog " f"to calculate a reliable zeropoint. " f"Require at least {self.num_matches_threshold} matches." ) logger.error(err) raise PhotometryReferenceError(err) logger.debug(f"Found {len(cleaned_img_cat)} clean sources in image.") if len(cleaned_img_cat) < self.num_matches_threshold: err = ( f"Not enough sources ({len(cleaned_img_cat)} " f"found in source catalog " f"to calculate a reliable zeropoint. " f"Require at least {self.num_matches_threshold} matches." ) logger.error(err) raise PhotometrySourceError(err) matched_img_cat, matched_ref_cat, _ = xmatch_catalogs( ref_cat=ref_cat, image_cat=cleaned_img_cat, crossmatch_radius_arcsec=self.crossmatch_radius_arcsec, ) logger.debug( f"Cross-matched {len(matched_img_cat)} sources from catalog to " "the image." ) if len(matched_img_cat) < self.num_matches_threshold: err = ( "Not enough cross-matched sources " "found to calculate a reliable zeropoint. " f"Only found {len(matched_img_cat)} crossmatches, " f"while {self.num_matches_threshold} are required. " f"Used {len(ref_cat)} reference sources and " f"{len(cleaned_img_cat)} image sources." ) logger.error(err) raise PhotometryCrossMatchError(err) # Add columns to image catalog for each aperture colnames = [] for ind, aperture in enumerate(apertures): matched_img_cat[f"MAGAPER_{aperture}"] = matched_img_cat["MAG_APER"][ :, ind ] matched_img_cat[f"MAGERRAPER_{aperture}"] = matched_img_cat[ "MAGERR_APER" ][:, ind] colnames.append(f"MAGAPER_{aperture}") if "MAG_AUTO" in matched_img_cat.colnames: colnames.append("MAG_AUTO") if "MAG_PSF" in matched_img_cat.colnames: colnames.append("MAG_PSF") if "MAG_POINTSOURCE" in matched_img_cat.colnames: colnames.append("MAG_POINTSOURCE") image = self.zp_calculator.calculate_zeropoint( image=image, matched_ref_cat=matched_ref_cat, matched_img_cat=matched_img_cat, colnames=colnames, ) aperture_diameters = [] zp_values = [] with warnings.catch_warnings(record=True): warnings.simplefilter("ignore", category=VerifyWarning) for col in colnames: aper = col.split("_")[-1] # Check if the right zeropoint keys are in the image header for key in [f"ZP_{aper}", f"ZP_{aper}_std", f"ZP_{aper}_nstars"]: assert ( key in image.header.keys() ), f"Zeropoint key {key} not found in image header." zp_values.append(image[f"ZP_{aper}"]) if col in ["MAG_AUTO", "MAG_PSF", "MAG_POINTSOURCE"]: aperture_diameters.append(med_fwhm_pix * 2) else: aperture_diameters.append(float(aper)) if sextractor_checkimg_map["BACKGROUND_RMS"] in image.header.keys(): logger.debug( "Calculating limiting magnitudes from background RMS file" ) limmags = get_maglim( image[sextractor_checkimg_map["BACKGROUND_RMS"]], zp_values, np.array(aperture_diameters) / 2.0, ) else: limmags = [-99] * len(aperture_diameters) for ind, diam in enumerate(aperture_diameters[:-1]): image[f"MAGLIM_{np.rint(diam)}"] = limmags[ind] image[MAGLIM_KEY] = limmags[-1] assert self.zp_column_name in colnames, ( f"You requested {ZP_KEY} be calculated using column " f"{self.zp_column_name}, which was not found in the image " "catalog." ) zp_key = self.zp_column_name.split("_")[-1].upper() if image[f"ZP_{zp_key}"] == -99.0: err = f"Zeropoint calculation failed using {zp_key} column. " raise PhotometryCalculationError(err) image[ZP_KEY] = image[f"ZP_{zp_key}"] image[ZP_STD_KEY] = image[f"ZP_{zp_key}_STD"] image[ZP_NSTARS_KEY] = image[f"ZP_{zp_key}_NSTARS"] image["MAGSYS"] = "AB" return batch
[docs] class ImageBatchReferenceCatalogDownloader(BaseImageProcessor): """ Base processor for downloading a reference catalog for an imagebatch and save it to a specified location. """ base_key = "imagebatchrefcatdownloader" def __init__( self, ref_catalog_generator: Callable[[Image], BaseCatalog], temp_output_sub_dir: str = "phot", ref_cat_header_key: str = REF_CAT_PATH_KEY, ): super().__init__() self.ref_catalog_generator = ref_catalog_generator self.temp_output_sub_dir = temp_output_sub_dir self.ref_cat_header_key = ref_cat_header_key
[docs] def description(self) -> str: return "Processor to download reference catalog for an ImageBatch."
[docs] def get_output_dir(self): """ Return the :return: """ return get_output_dir(self.temp_output_sub_dir, self.night_sub_dir)
def _apply_to_images( self, batch: ImageBatch, ) -> ImageBatch: for image in batch: if self.ref_cat_header_key not in image.header: err = ( f"Reference catalog path not found in image header. " f"Expected key: {self.ref_cat_header_key}" ) raise KeyError(err) ref_cat_paths = [image[self.ref_cat_header_key] for image in batch] assert len(np.unique(ref_cat_paths)) == 1, ( "All images in the batch must have the same reference catalog path, " f"specified by the {self.ref_cat_header_key} key in the image header." ) ref_cat_path = Path(ref_cat_paths[0]) if not ref_cat_path.exists(): logger.debug( f"Reference catalog not found at {ref_cat_path}. " "Generating reference catalog." ) ref_catalog = self.ref_catalog_generator(batch[0]) output_dir = self.get_output_dir() _ = ref_catalog.write_catalog(batch[0], output_dir=output_dir) else: logger.debug(f"Reference catalog found at {ref_cat_path}. Using it.") return batch