Source code for mirar.pipelines.winter.load_winter_image

"""
Module for loading raw WINTER images and ensuring they have the correct format
"""

import logging
import os
import warnings
from pathlib import Path

import astropy
import numpy as np
from astropy.io import fits
from astropy.stats import sigma_clipped_stats
from astropy.time import Time
from astropy.utils.exceptions import AstropyWarning

from mirar.data import Image, ImageBatch
from mirar.io import (
    ExtensionParsingError,
    open_fits,
    open_mef_fits,
    open_mef_image,
    open_raw_image,
    tag_mef_extension_file_headers,
)
from mirar.paths import (
    BASE_NAME_KEY,
    COADD_KEY,
    GAIN_KEY,
    OBSCLASS_KEY,
    PROC_FAIL_KEY,
    PROC_HISTORY_KEY,
    RAW_IMG_KEY,
    SATURATE_KEY,
    TARGET_KEY,
    core_fields,
)
from mirar.pipelines.winter.constants import (
    all_winter_board_ids,
    imgtype_dict,
    palomar_observer,
    sncosmo_filters,
    subdets,
    winter_filters_map,
)
from mirar.pipelines.winter.models import DEFAULT_FIELD, default_program, itid_dict
from mirar.processors.skyportal import SNCOSMO_KEY
from mirar.processors.utils.image_loader import BadImageError

logger = logging.getLogger(__name__)


[docs] def clean_header(header: fits.Header) -> fits.Header: """ Function to clean the header of an image, adding in missing keys and correcting values where necessary :param header: Header to clean :return: Updated header """ header[GAIN_KEY] = 1.0 header[SATURATE_KEY] = 40000.0 if "READOUTM" not in header.keys(): header["READOUTM"] = None if "READOUTV" not in header.keys(): header["READOUTV"] = None elif header["READOUTV"] is not None: header["READOUTV"] = str(header["READOUTV"]) header["UTCTIME"] = Time(header["UTCISO"], format="iso").isot date_t = Time(header["UTCTIME"]) header["MJD-OBS"] = date_t.mjd header["JD"] = date_t.jd header["DATE-OBS"] = date_t.isot header[OBSCLASS_KEY] = header["OBSTYPE"].lower().strip() # Check to ensure that biases and darks are tagged appropriately if header["EXPTIME"] == 0.0: if header[OBSCLASS_KEY] not in ["corrupted", "bias"]: header[OBSCLASS_KEY] = "bias" if header["FILTERID"] == "dark": if header[OBSCLASS_KEY] not in ["dark", "bias", "test", "science", "corrupted"]: header[OBSCLASS_KEY] = "test" else: header[OBSCLASS_KEY] = "dark" # Discard pre-sunset, post-sunset darks if header[OBSCLASS_KEY] == "dark": sun_pos = palomar_observer.sun_altaz(Time(header["UTCTIME"])) if sun_pos.alt.to_value("deg") > -20.0: header[OBSCLASS_KEY] = "test" # Sometimes darks come with wrong fieldids if header[OBSCLASS_KEY] == "dark": header["FIELDID"] = DEFAULT_FIELD # Set targname to dark header["TARGNAME"] = "dark" # Tag dome flats as flats if header[OBSCLASS_KEY].lower() == "domeflat": header[OBSCLASS_KEY] = "flat" header[TARGET_KEY] = "flat" # Mirror cover should be open for science images, and open or closed for darks if "MIRCOVER" in header.keys(): bad_mirror_cover = False if header[OBSCLASS_KEY] in ["dark", "bias"]: if not header["MIRCOVER"].lower() in ["open", "closed"]: bad_mirror_cover = True elif header[OBSCLASS_KEY] not in ["corrupted", "test"]: if not header["MIRCOVER"].lower() == "open": bad_mirror_cover = True if bad_mirror_cover: logger.error( f"Bad MIRCOVER value: {header['MIRCOVER']} " f"(img class={header[OBSCLASS_KEY]})" ) header[OBSCLASS_KEY] = "corrupted" else: header["MIRCOVER"] = "open" if float(header["EXPTIME"]) >= 1.0: header["EXPTIME"] = np.rint(header["EXPTIME"]) else: header["EXPTIME"] = np.round(header["EXPTIME"], 2) # Set up the target name target = f"field_{header['FIELDID']}" if ("SCHDNAME" in header.keys()) & ("OBHISTID" in header.keys()): if header["SCHDNAME"] != "": target = f"{header['SCHDNAME']}_{header['OBHISTID']}" elif TARGET_KEY in header.keys(): if header[TARGET_KEY] != "": target = header[TARGET_KEY] # If the observation is dark/bias/focus/pointing/flat/corrupted, # enforce TARGET_KEY is also dark/bias as the TARGET_KEY is used for CalHunter. # Currently they may not come # with the correct TARGET_KEY. if header[OBSCLASS_KEY].lower() in [ "dark", "bias", "focus", "pointing", "flat", "test", "corrupted", ]: target = header[OBSCLASS_KEY].lower() header[TARGET_KEY] = target if "TARGNAME" in header.keys(): if header["TARGNAME"] == "": header["TARGNAME"] = None else: header["TARGNAME"] = None header["RA"] = header["RADEG"] header["DEC"] = header["DECDEG"] header["EXPID"] = int((date_t.mjd - 59000.0) * 86400.0) # seconds since 60000 MJD if COADD_KEY not in header.keys(): logger.debug(f"No {COADD_KEY} entry. Setting coadds to 1.") header[COADD_KEY] = 1 header[PROC_HISTORY_KEY] = "" header[PROC_FAIL_KEY] = False # Make sure filter is a keyword that the pipeline recognizes filter_dict = {"J": 1, "H": 2, "Ks": 3} if "FILTERID" not in header.keys(): header["FILTERID"] = filter_dict[header["FILTER"]] header["FILTER"] = header["FILTERID"] if header["FILTER"] == "Hs": header["FILTER"] = "H" if header["FILTERID"] in winter_filters_map: header["FID"] = int(winter_filters_map[header["FILTERID"]]) else: header["FID"] = -99 # Set default values if field details seem incorrect if "FIELDID" not in header.keys(): header["FIELDID"] = DEFAULT_FIELD if header["FIELDID"] < 0: header["FIELDID"] = DEFAULT_FIELD # Set default values if program is not correct if "PROGPI" not in header.keys(): header["PROGPI"] = default_program.pi_name if "PROGID" not in header.keys(): header["PROGID"] = default_program.progid # If PROGNAME is not present or is empty, set it to default here. # Otherwise, it gets set to default in the insert_entry for exposures. if "PROGNAME" not in header: header["PROGNAME"] = default_program.progname if header["PROGNAME"] == "": header["PROGNAME"] = default_program.progname if len(header["PROGNAME"]) != 8: logger.warning( f"PROGNAME {header['PROGNAME']} is not 8 characters long. " f"Replacing with default." ) header["PROGNAME"] = default_program.progname with warnings.catch_warnings(): warnings.simplefilter("ignore", AstropyWarning) header["NIGHTDATE"] = date_t.to_datetime().strftime("%Y-%m-%d") header["IMGTYPE"] = header[OBSCLASS_KEY] if header["IMGTYPE"] == "test": header["IMGTYPE"] = "other" if not header["IMGTYPE"] in imgtype_dict: logger.error(f"Unknown image type: {header['IMGTYPE']}") header["ITID"] = itid_dict[imgtype_dict["corrupted"]] else: header["ITID"] = itid_dict[imgtype_dict[header["IMGTYPE"]]] header["EXPMJD"] = header["MJD-OBS"] if header["FILTER"].lower() in ["y", "j", "h"]: header[SNCOSMO_KEY] = sncosmo_filters[header["FILTER"].lower()] if "GAINCOLT" not in header.keys(): header["GAINCOLT"] = "[]" if "GAINCOLB" not in header.keys(): header["GAINCOLB"] = "[]" if "GAINROW" not in header.keys(): header["GAINROW"] = "[]" if "NUMDITHS" not in header.keys(): header["NUMDITHS"] = None elif header["NUMDITHS"] is not None: header["NUMDITHS"] = int(header["NUMDITHS"]) if "DITHNUM" not in header.keys(): header["DITHNUM"] = None elif header["DITHNUM"] is not None: header["DITHNUM"] = int(header["DITHNUM"]) if "DITHSTEP" not in header.keys(): header["DITHSTEP"] = None elif header["DITHSTEP"] is not None: header["DITHSTEP"] = float(header["DITHSTEP"]) try: header["BOARD_ID"] = int(header["BOARD_ID"]) assert ( header["BOARD_ID"] in all_winter_board_ids ), f"Board ID {header['BOARD_ID']} not in {all_winter_board_ids}" except KeyError: pass header["LABFLATV"] = "0.1" return header
[docs] def load_winter_stack( path: str | Path, ) -> Image: """ Load proc image :param path: Path to image :return: data and header """ logger.debug(f"Loading {path}") data, header = open_fits(path) dirname = path.split("/winter/")[0] + "/winter/" wghtpath = header["WGHTPATH"] weight_pathname = wghtpath.split("/winter/")[-1] new_weightpath = Path(dirname) / weight_pathname header["WGHTPATH"] = new_weightpath.as_posix() header["SAVEPATH"] = path if "PSFCAT" in header.keys(): new_psfpath = Path(dirname) / header["PSFCAT"].split("/winter/")[-1] header["PSFCAT"] = new_psfpath.as_posix() if "RFCTPATH" in header.keys(): new_catpath = Path(dirname) / header["RFCTPATH"].split("/winter/")[-1] header["RFCTPATH"] = new_catpath.as_posix() if TARGET_KEY not in header.keys(): if "TARGNAME" in header.keys(): header[TARGET_KEY] = header["TARGNAME"] if SNCOSMO_KEY not in header.keys(): if header["FILTER"].lower() in ["y", "j", "h"]: header[SNCOSMO_KEY] = sncosmo_filters[header["FILTER"].lower()] return Image(data=data, header=header)
[docs] def load_astrometried_winter_image(path: str | Path) -> Image: """ Load astrometried image :param path: Path to image :return: Image object """ logger.debug(f"Loading {path}") data, header = open_fits(path) dirname = path.split("/winter/")[0] + "/winter/" scamp_path = header["SCMPHEAD"] new_scamp_path = Path(dirname) / scamp_path.split("/winter/")[-1] header["SCMPHEAD"] = new_scamp_path.as_posix() return Image(data=data, header=header)
[docs] def load_stacked_winter_image( path: str | Path, ) -> tuple[np.array, fits.Header]: """ Load proc image :param path: Path to image :return: data and header """ logger.debug(f"Loading {path}") data, header = open_fits(path) if "weight" in path: header[OBSCLASS_KEY] = "weight" header["COADDS"] = 1 header["CALSTEPS"] = "" header["PROCFAIL"] = 1 header["RAWPATH"] = "" header["BASENAME"] = os.path.basename(path) header[TARGET_KEY] = "weight" if "UTCTIME" not in header.keys(): header["UTCTIME"] = "2023-06-14T00:00:00" if TARGET_KEY not in header.keys(): header[TARGET_KEY] = header["TARGNAME"] return data, header
[docs] def load_test_winter_image( path: str | Path, ) -> Image: """ Load test WINTER image :param path: Path to image :return: Image object """ image = open_raw_image(path) header = clean_header(image.header) image.set_header(header) return image
[docs] def load_raw_winter_mef( path: str, ) -> tuple[astropy.io.fits.Header, list[np.array], list[astropy.io.fits.Header]]: """ Load mef image. :param path: Path to image :return: Primary header, list of data arrays, list of headers """ primary_header, split_data, split_headers = open_mef_fits(path) img_name = Path(path).name primary_header[BASE_NAME_KEY] = img_name primary_header[RAW_IMG_KEY] = path corrupted = False try: primary_header = clean_header(primary_header) except KeyError as exc: logger.error( f"Could not clean header for {path}: '{exc.args[0]}'. " f"Marking as corrupted." ) corrupted = True try: primary_header["OBSTYPE"] = "CORRUPTED" primary_header["FILTERID"] = "dark" primary_header["FIELDID"] = DEFAULT_FIELD primary_header["RADEG"] = 180.0 primary_header["DECDEG"] = 0.0 primary_header["AZIMUTH"] = 180.0 primary_header["ALTITUDE"] = 0.0 primary_header["DITHNUM"] = 0 primary_header["NUMDITHS"] = 1 primary_header["EXPTIME"] = 0.0 [date, time, milliseconds] = img_name.split("_")[1].split("-") dateiso = ( f"{date[:4]}-{date[4:6]}-{date[6:]} " f"{time[:2]}:{time[2:4]}:{time[4:6]}.{milliseconds}" ) primary_header["UTCISO"] = dateiso primary_header["TARGNAME"] = "CORRUPTED" for field in core_fields: if field not in primary_header.keys(): primary_header[field] = -99.0 primary_header = clean_header(primary_header) except KeyError as exc2: err = f"Could not mark {path} as corrupted. Failing entirely: {exc2}" logger.error(err) raise BadImageError(err) from exc2 try: split_headers = tag_mef_extension_file_headers( primary_header=primary_header, extension_headers=split_headers, extension_key="BOARD_ID", ) except ExtensionParsingError: logger.error(f"Could not parse extensions for {path}. Marking as corrupted.") corrupted = True split_headers = tag_mef_extension_file_headers( primary_header=primary_header, extension_headers=split_headers, ) for i, split_header in enumerate(split_headers): split_header["BOARD_ID"] = i split_header["T_ROIC"] = -99.0 # Mark final headers as corrupted regardless # of which bit of the header was corrupted for split_header in split_headers: if corrupted: split_header["OBSTYPE"] = "CORRUPTED" split_header["TARGNAME"] = "CORRUPTED" clean_header(split_header) # Sometimes there are exptime keys for board_header in split_headers: if "EXPTIME" in board_header.keys(): del board_header["EXPTIME"] # This is especially annoying if "BOARD_ID" in board_header.keys(): board_header["BOARD_ID"] = int(board_header["BOARD_ID"]) return primary_header, split_data, split_headers
[docs] def load_winter_mef_image( path: str | Path, ) -> list[Image]: """ Function to load winter mef images :param path: Path to image :return: list of images """ images = open_mef_image(path, load_raw_winter_mef, extension_key="BOARD_ID") return images
[docs] def annotate_winter_subdet_headers(batch: ImageBatch) -> ImageBatch: """ Annotate winter header with information on the subdetector :param batch: ImageBatch to annotate :return: ImageBatch where images have the updated header """ new_batch = [] for image in batch: data = image.get_data() with warnings.catch_warnings(): warnings.simplefilter("ignore", AstropyWarning) _, med, std = sigma_clipped_stats(data, sigma=3.0, maxiters=5) image["MEDCOUNT"] = med image["STDDEV"] = std subnx, subny, subnxtot, subnytot = ( image["SUBNX"], image["SUBNY"], image["SUBNXTOT"], image["SUBNYTOT"], ) mask = ( (subdets["nx"] == subnx) & (subdets["ny"] == subny) & (subdets["nxtot"] == subnxtot) & (subdets["nytot"] == subnytot) & (subdets["boardid"] == image["BOARD_ID"]) ) assert np.sum(mask) == 1, ( f"Subdet not found for nx={subnx}, ny={subny}, " f"nxtot={subnxtot}, nytot={subnytot} and boardid={image['BOARD_ID']}" ) image["SUBDETID"] = int(subdets[mask]["subdetid"].iloc[0]) image["RAWID"] = int(f"{image['EXPID']}_{str(image['SUBDETID']).rjust(2, '0')}") image["USTACKID"] = None if "DATASEC" in image.keys(): del image["DATASEC"] # TODO: Write a little snippet to estimate the central RA/Dec from the pointing # RA/Dec, BOARD_ID, SUBCOORD, and PA new_batch.append(image) new_batch = ImageBatch(new_batch) return new_batch
[docs] def get_raw_winter_mask(image: Image) -> np.ndarray: """ Get mask for raw winter image. """ data = image.get_data() header = image.header mask = np.zeros(data.shape) if header["BOARD_ID"] == 0: # Mask the outage in the bottom center mask[:500, 700:1600] = 1.0 mask[1075:, :] = 1.0 mask[:, 1950:] = 1.0 mask[:20, :] = 1.0 if header["BOARD_ID"] == 1: mask[:, 344:347] = 1.0 mask[:, 998:1000] = 1.0 mask[:, 1006:1008] = 1.0 mask[260:262, :] = 1.0 # Mask entire striped area to the right of the chip # mask[:, 1655:] = 1.0 # Mask the low sensitivity regions around edges mask[1070:, :] = 1.0 mask[:20, :] = 1.0 mask[:, :75] = 1.0 mask[:, 1961:] = 1.0 if header["BOARD_ID"] == 2: mask[1060:, :] = 1.0 mask[:, 1970:] = 1.0 mask[:55, :] = 1.0 mask[:, :20] = 1.0 mask[:475, 406:419] = 1.0 mask[350:352, :] = 1.0 mask[260:287, :66] = 1.0 mask[:, 1564:1567] = 1.0 mask[:, 1931:] = 1.0 if header["BOARD_ID"] == 3: mask[1085:, :] = 1.0 mask[:, 1970:] = 1.0 mask[:55, :] = 1.0 mask[:, :20] = 1.0 # Mask outages on top right and bottom right mask[:180, 1725:] = 1.0 mask[1030:, 1800:] = 1.0 if header["BOARD_ID"] == 4: # # Mask the region to the top left mask[610:, :250] = 1.0 # # There seems to be a dead spot in the middle of the image mask[503:518, 384:405] = 1.0 # Mask the edges with low sensitivity due to masking mask[:, 1948:] = 1.0 mask[:, :61] = 1.0 mask[:20, :] = 1.0 mask[1060:, :] = 1.0 # Mask a vertical strip mask[:, 992:1002] = 1.0 # Mask another vertical strip mask[:, 1266:1273] = 1.0 # Mask the outage to the right mask[145:, 1735:] = 1.0 # mask[data > 40000] = 1.0 # Mask random vertical strip mask[:, 1080:1085] = 1.0 if header["BOARD_ID"] == 5: # Mask the outage in the top-right. mask[700:, 1200:1900] = 1.0 mask[1072:, :] = 1.0 mask[:, 1940:] = 1.0 mask[:15, :] = 1.0 return mask.astype(bool)