"""
Module to run the IR reference building pipeline on WINTER images
"""
import argparse
import logging
import sys
from pathlib import Path
import numpy as np
from astropy.io import fits
from wintertoo.data import winter_fields
from mirar.data import Dataset, Image, ImageBatch
from mirar.data.utils import get_corners_ra_dec_from_header, plot_fits_image
from mirar.paths import BASE_NAME_KEY, core_fields, get_output_dir
from mirar.pipelines.winter.winter_pipeline import WINTERPipeline
from mirar.processors.split import SUB_ID_KEY
logger = logging.getLogger(__name__)
winter_subfields_file = get_output_dir(
dir_root="cache",
sub_dir="winter",
).joinpath("WINTER_subfields.txt")
winter_subfields_file.parent.mkdir(parents=True, exist_ok=True)
[docs]
def dummy_split_image_batch_generator(
cent_ra: float,
cent_dec: float,
fieldid: int,
nx: int = 3,
ny: int = 4,
full_ra_size_deg: float = 1.0,
full_dec_size_deg: float = 1.2,
) -> ImageBatch:
"""
Make a dummy image batch by splitting an image with the specified dimensions
Args:
cent_ra: Center RA of the parent image
cent_dec: Center Dec of the parent image
fieldid: field ID
nx: Number of subimages in the x direction
ny: Number of subimages in the y direction
full_ra_size_deg: RA-Size of parent image in degrees
full_dec_size_deg: Dec-Size of parent image in degrees
Returns:
"""
subimg_half_ra_deg = full_ra_size_deg / (2 * nx)
subimg_half_dec_deg = full_dec_size_deg / (2 * ny)
all_images = []
for iind, i in enumerate(np.arange(-nx + 1, nx + 1, 2)):
for jind, j in enumerate(np.arange(-ny + 1, ny + 1, 2)):
ra = cent_ra + subimg_half_ra_deg * i / np.cos(cent_dec * np.pi / 180)
if ra < 0:
ra += 360
dec = cent_dec + subimg_half_dec_deg * j
hdu = fits.PrimaryHDU()
hdu.header["NAXIS"] = 2
hdu.header["NAXIS1"] = 3
hdu.header["NAXIS2"] = 3
hdu.header["CTYPE1"] = "RA--TAN"
hdu.header["CTYPE2"] = "DEC-TAN"
hdu.header["CRVAL1"] = ra
hdu.header["CRVAL2"] = dec
hdu.header["CRPIX1"] = 1.5
hdu.header["CRPIX2"] = 1.5
hdu.header["FILTER"] = "J"
hdu.header["CD1_1"] = (
2 * subimg_half_ra_deg / hdu.header["NAXIS1"]
) / np.cos(cent_dec * np.pi / 180)
hdu.header["CD2_2"] = (2 * subimg_half_dec_deg) / hdu.header["NAXIS2"]
data = np.zeros((3, 3))
for k in core_fields:
hdu.header[k] = ""
hdu.header["FIELDID"] = int(fieldid)
subdet = int(iind * len(np.arange(-ny + 1, ny + 1, 2)) + jind)
hdu.header[SUB_ID_KEY] = subdet
hdu.header[BASE_NAME_KEY] = f"field{fieldid}_subdet{subdet}.fits"
image = Image(header=hdu.header, data=data)
all_images.append(image)
del hdu
image_batch = ImageBatch(all_images)
return image_batch
[docs]
def write_subfields_file(
subfields_filename: Path,
full_ra_size_deg: float = 1.0,
full_dec_size_deg: float = 1.2,
nx: int = 3,
ny: int = 4,
):
"""
Write a subfields file for the specified fields file
Args:
subfields_filename: Path to the subfields file
full_ra_size_deg: RA-Size of the field in degrees
full_dec_size_deg: Dec-Size of the field in degrees
nx: Number of sub-fields in the x direction
ny: Number of sub-fields in the y direction
Returns:
None
"""
subimg_half_ra_deg = full_ra_size_deg / (2 * nx)
subimg_half_dec_deg = full_dec_size_deg / (2 * ny)
with open(subfields_filename, "w", encoding="utf8") as subfield_f:
subfield_f.write(
"FieldID, SubdetID, RA_cent, Dec_cent, RA0_0, Dec0_0, "
"RA0_1, Dec0_1, RA1_0, Dec1_0, RA1_1, Dec1_1\n"
)
for _, row in winter_fields.iterrows():
fieldid = row["ID"]
cent_ra = row["RA"]
cent_dec = row["Dec"]
for iind, i in enumerate(np.arange(-nx + 1, nx + 1, 2)):
for jind, j in enumerate(np.arange(-ny + 1, ny + 1, 2)):
subdetid = int(iind * len(np.arange(-ny + 1, ny + 1, 2)) + jind)
ra = cent_ra + subimg_half_ra_deg * i / np.cos(cent_dec * np.pi / 180)
if ra < 0:
ra += 360
dec = cent_dec + subimg_half_dec_deg * j
half_delta_ra = subimg_half_ra_deg / np.cos(cent_dec * np.pi / 180)
half_delta_dec = subimg_half_dec_deg
ra0_0 = ra - half_delta_ra
ra1_0 = ra - half_delta_ra
ra0_1 = ra + half_delta_ra
ra1_1 = ra + half_delta_ra
dec0_0 = dec - half_delta_dec
dec0_1 = dec - half_delta_dec
dec1_0 = dec + half_delta_dec
dec1_1 = dec + half_delta_dec
with open(subfields_filename, "a", encoding="utf8") as subfield_f:
subfield_f.write(
f"{int(fieldid)}, {subdetid}, "
f"{ra}, {dec}, "
f"{ra0_0}, {dec0_0}, {ra0_1}, {dec0_1}, "
f"{ra1_0}, {dec1_0}, {ra1_1}, {dec1_1}\n"
)
[docs]
def run_winter_reference_build_pipeline(
nx: int = 3,
ny: int = 4,
full_ra_size_deg: float = 1.0,
full_dec_size_deg: float = 1.2,
field_id: int | None = None,
subdet_id: int | None = None,
catch_all_errors: bool = True,
):
"""
Run the reference build pipeline on the winter fields
Args:
nx: Number of sub-fields in the x direction
ny: Number of sub-fields in the y direction
full_ra_size_deg: Full right ascension size of the field in degrees
full_dec_size_deg: Full declination size of the field in degrees
field_id: Run only for this fieldid (for debugging)
subdet_id: Run only for this subdetid (for debugging)
catch_all_errors: Catch all errors and continue
Returns:
"""
if not winter_subfields_file.exists():
logger.info(f"Writing subfields file to {winter_subfields_file}")
write_subfields_file(winter_subfields_file)
winter_northern_fields = winter_fields[
(winter_fields["Dec"] > -40) & (winter_fields["Dec"] < 60)
].reset_index(drop=True)
if field_id is not None:
winter_northern_fields = winter_northern_fields[
winter_northern_fields["ID"] == field_id
].reset_index(drop=True)
pipeline = WINTERPipeline(night="references", selected_configurations="refbuild")
res, errorstack = [], []
for ind, _ in winter_northern_fields.iterrows():
cent_ra, cent_dec = (
winter_northern_fields.loc[ind]["RA"],
winter_northern_fields.loc[ind]["Dec"],
)
split_image_batch = dummy_split_image_batch_generator(
cent_ra=float(cent_ra),
cent_dec=float(cent_dec),
fieldid=int(winter_northern_fields.loc[ind]["ID"]),
nx=nx,
ny=ny,
full_ra_size_deg=full_ra_size_deg,
full_dec_size_deg=full_dec_size_deg,
)
subdetids = np.array([x.header[SUB_ID_KEY] for x in split_image_batch])
if subdet_id is not None:
split_image_batch = [
split_image_batch[np.where(subdetids == subdet_id)[0][0]]
]
subdetids = np.array([x.header[SUB_ID_KEY] for x in split_image_batch])
dataset = Dataset([ImageBatch(x) for x in split_image_batch])
res, errorstack = pipeline.reduce_images(
dataset=dataset,
catch_all_errors=catch_all_errors,
)
if len(res) == 0:
logger.error(
f"Something went wrong for field {ind}, " f"returned res with len 0"
)
continue
if len(errorstack.failed_images) < len(split_image_batch):
plots_dir = get_output_dir(
dir_root="plots",
sub_dir="winter/references",
)
if not plots_dir.exists():
plots_dir.mkdir(parents=True)
for res_image in res[0]:
subdet_id = np.where(subdetids == int(res_image.header[SUB_ID_KEY]))[0][
0
]
split_image = split_image_batch[subdet_id]
corner_wcs_coords = get_corners_ra_dec_from_header(split_image.header)
plot_savepath = plots_dir / res_image.header[BASE_NAME_KEY].replace(
".fits", ".png"
)
if not plot_savepath.exists():
plot_fits_image(
res_image,
savedir=plots_dir,
regions_wcs_coords=corner_wcs_coords,
)
return res, errorstack
if __name__ == "__main__":
logger = logging.getLogger("mirar")
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter(
"%(asctime)s %(name)s [l %(lineno)d] - %(levelname)s - %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument("-fieldid", type=int, default=None)
parser.add_argument("-subdetid", type=int, default=None)
args = parser.parse_args()
run_winter_reference_build_pipeline(
field_id=args.fieldid,
subdet_id=args.subdetid,
)