Source code for mirar.pipelines.winter.run

"""
Module containing WINTER CLI commands
"""

import argparse
import logging
import sys
import tempfile
from pathlib import Path

import numpy as np
from astropy.time import Time
from sqlalchemy import Select, text
from sqlalchemy.sql import func

from mirar.data import Dataset, ImageBatch, cache
from mirar.database.transactions.select import run_select
from mirar.io import open_raw_image
from mirar.paths import TEMP_DIR
from mirar.pipelines.winter.models import ExposuresTable, RawsTable, StacksTable
from mirar.pipelines.winter.winter_pipeline import WINTERPipeline
from mirar.processors.utils.image_loader import load_from_list

logger = logging.getLogger(__name__)


[docs] def run_winter( config: str, dataset: Dataset | None = None, log_level: str = "INFO", subdir: str = None, ): """ Run a WINTER pipeline :param config: :param dataset: :param log_level: :param subdir: :return: """ # Set up logging log = logging.getLogger("mirar") handler = logging.StreamHandler(sys.stdout) formatter = logging.Formatter( "%(name)s [l %(lineno)d] - %(levelname)s - %(message)s" ) handler.setFormatter(formatter) log.addHandler(handler) log.setLevel(log_level) pipe = WINTERPipeline( selected_configurations=config, night=subdir, ) _, errorstack = pipe.reduce_images( dataset=dataset, catch_all_errors=True, ) errorstack.summarise_error_stack()
[docs] def run_stack_of_stacks(): """ CLI wrapper for stacking pre-existing stack images of a target :return: """ parser = argparse.ArgumentParser(description="Stack of stacks") parser.add_argument("-t", "--target", help="Target to stack", type=str) parser.add_argument("-f", "--fieldid", help="Field ID", type=str) parser.add_argument("--level", help="Logging level", type=str, default="INFO") parser.add_argument( "--bindays", help="Window, in days, for bin", type=int, default=None ) parser.add_argument( "-s", "--startdate", help="Start date (e.g., 20240820)", type=str, default=None ) args = parser.parse_args() if args.target is not None: db_constraint = f"targname = '{args.target}'" subdir = args.target if args.fieldid is not None: err = "Cannot specify both target and fieldid" logger.error(err) raise ValueError(err) elif args.fieldid is not None: db_constraint = f"fieldid = '{args.fieldid}'" subdir = args.fieldid else: err = "Must specify either target or fieldid" logger.error(err) raise ValueError(err) sel = ( Select( StacksTable.savepath, func.min(ExposuresTable.utctime).label("utctime"), ) .join(StacksTable.raw) .join(RawsTable.exposure_ids) .group_by(StacksTable.savepath) .where(text(db_constraint)) ) df = run_select(sel, StacksTable) df.sort_values(by="utctime", inplace=True) times = np.array( [ Time(str(x["utctime"].to_datetime64()), format="isot").mjd for _, x in df.iterrows() ] ) if args.startdate is not None: print(f"Only including images after {args.startdate}") start_date = Time.strptime(args.startdate, "%Y%m%d") mask = times >= start_date.mjd df = df[mask] times = times[mask] df.reset_index(drop=True, inplace=True) with tempfile.TemporaryDirectory(dir=TEMP_DIR) as temp_dir_path: print(f"Using cache {temp_dir_path}") cache.set_cache_dir(temp_dir_path) savepaths = [] for x in df["savepath"]: path = Path(x) if not path.exists(): err = f"Stack file {path} does not exist" logger.warning(err) savepaths.append(path) print(f"Found {len(savepaths)} stack images") savepaths = [Path(x) for x in df["savepath"]] img_batch = load_from_list(savepaths, open_raw_image) if args.bindays is not None: bins = np.arange(min(times), max(times) + args.bindays, step=args.bindays) new_batch = ImageBatch() images = np.array(img_batch.get_batch()) for i, t_start in enumerate(bins[:-1]): t_end = bins[i + 1] mask = (times >= t_start) & (times < t_end) for img in images[mask]: targ_name = f"{subdir}_{i}" img["TARGNAME"] = targ_name img["RESTACKN"] = i img["BINSTART"] = t_start img["BINEND"] = t_end new_batch.append(img) img_batch = new_batch dataset = Dataset(img_batch) run_winter( config="stack_stacks_db", dataset=dataset, log_level=args.level, subdir=f"restacks/{subdir}", )