"""
Module with utis for photometry
"""
import logging
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from astropy.io import fits
from astropy.stats import sigma_clipped_stats
from matplotlib.patches import Circle
from photutils.aperture import CircularAnnulus, CircularAperture, aperture_photometry
from mirar.data import Image
from mirar.errors import ProcessorError
from mirar.paths import GAIN_KEY
logger = logging.getLogger(__name__)
[docs]
class CutoutError(ProcessorError):
"""
Error raised when cutout generation fails
"""
[docs]
def make_cutouts(
image_paths: Path | list[Path], position: tuple, half_size: int
) -> list[np.array]:
"""
Function to make cutouts
Args:
:param: image_paths: Path or list of paths to the images
:param: position: (x,y) coordinates of the center of the cutouts
:param: half_size: half_size of the square cutouts
Returns:
:return: cutout_list: list of 2D numpy arrays
"""
if not isinstance(image_paths, list):
image_paths = [image_paths]
cutout_list = []
for image_path in image_paths:
data = fits.getdata(image_path)
y_image_size, x_image_size = np.shape(data)
x, y = position
if x < 0 or x > x_image_size or y < 0 or y > y_image_size:
raise CutoutError(
f"Cutout position {x},{y} is outside the image {image_path}"
)
if x < half_size:
xmin, xmax = 0, x + half_size + 1
x_pad_width = (half_size - x, 0)
elif x + half_size + 1 > x_image_size:
xmin, xmax = x - half_size, x_image_size
x_pad_width = (0, (half_size + x + 1) - x_image_size)
else:
xmin, xmax = x - half_size, x + half_size + 1
x_pad_width = (0, 0)
if y < half_size:
ymin, ymax = 0, y + half_size + 1
y_pad_width = (half_size - y, 0)
elif y + half_size + 1 > y_image_size:
ymin, ymax = y - half_size, y_image_size
y_pad_width = (0, (half_size + y + 1) - y_image_size)
else:
ymin, ymax = y - half_size, y + half_size + 1
y_pad_width = (0, 0)
cutout = data[ymin:ymax, xmin:xmax]
cutout = np.pad(cutout, (y_pad_width, x_pad_width), "constant")
cutout_list.append(cutout)
assert cutout.shape == (2 * half_size + 1, 2 * half_size + 1), (
f"Cutout shape is not correct for x={x} and y={y} and "
f"half_size={half_size}. Shape is {cutout.shape} Data shape is {data.shape}"
)
return cutout_list
[docs]
def psf_photometry(
image_cutout: np.ndarray,
image_unc_cutout: np.ndarray,
psfmodels: np.ndarray,
) -> tuple[float, float, float, float, float, np.ndarray]:
"""
Function to perform PSF photometry
Args:
:param: image_cutout: 2D numpy array of the image cutout
:param: image_unc_cutout: 2D numpy array of the image uncertainty cutout
:param: psfmodels: 3D numpy array of the PSF models
Returns:
:return: psf_fluxes: PSF flux
:return: psf_flux_uncs: PSF flux uncertainty
:return: chi2s: chi2 value
:return xshifts: xshift required to match PSF to the source
:return yshifts: yshift required to match PSF to the source
"""
numpsfmodels = psfmodels.shape[2]
chi2s, psf_fluxes, psf_flux_uncs = [], [], []
for ind in range(numpsfmodels):
psfmodel = psfmodels[:, :, ind]
psf_flux = np.nansum(psfmodel * image_cutout) / np.nansum(np.square(psfmodel))
psf_flux_unc = np.sqrt(
np.nansum(np.square(psfmodel) * np.square(image_unc_cutout))
) / np.nansum(np.square(psfmodel))
deg_freedom = np.size(image_cutout) - 1
chi2 = (
np.nansum(
np.square(image_cutout - psfmodel * psf_flux)
/ np.square(image_unc_cutout)
)
/ deg_freedom
)
psf_fluxes.append(psf_flux)
psf_flux_uncs.append(psf_flux_unc)
chi2s.append(chi2)
minchi2_ind = np.argmin(chi2s)
minchi2 = np.min(chi2s)
best_fit_psf_flux = psf_fluxes[minchi2_ind]
best_fit_psf_fluxunc = psf_flux_uncs[minchi2_ind]
best_fit_psfmodel = psfmodels[:, :, minchi2_ind]
ys_cen, xs_cen = np.where(best_fit_psfmodel == np.max(best_fit_psfmodel))
unshifted_psf = psfmodels[:, :, numpsfmodels // 2 + 1]
y_cen, x_cen = np.where(unshifted_psf == np.max(unshifted_psf))
yshift = (ys_cen[0] - y_cen)[0]
xshift = (xs_cen[0] - x_cen)[0]
return (
best_fit_psf_flux,
best_fit_psf_fluxunc,
minchi2,
xshift,
yshift,
best_fit_psfmodel,
)
[docs]
def make_psf_shifted_array(
psf_filename: str, cutout_size_psf_phot: int = 20, pad_psf_size: int = 60
):
"""
Function to make a shifted array from a PSF model
Args:
psf_filename:
cutout_size_psf_phot:
pad_psf_size:
Returns:
"""
psf = fits.getdata(psf_filename)
normpsf = psf / np.sum(psf)
ngrid = 81
x_crds = np.linspace(-4, 4, 9)
grid_x, grid_y = np.meshgrid(x_crds, x_crds)
grid_x = np.ndarray.flatten(grid_x)
grid_y = np.ndarray.flatten(grid_y)
psfmodel_half_size = int(psf.shape[0] / 2)
padpsfs = np.zeros((pad_psf_size, pad_psf_size, ngrid))
pad_loind = int((pad_psf_size - 2 * psfmodel_half_size) / 2)
pad_upind = pad_psf_size - pad_loind + 1
for i in range(ngrid):
padpsfs[
int(pad_loind + grid_y[i]) : int(pad_upind + grid_y[i]),
int(pad_loind + grid_x[i]) : int(pad_upind + grid_x[i]),
i,
] = normpsf
unshifted_ind = int(ngrid / 2) + 1
normpsfmax = np.max(normpsf)
xcen_1, xcen_2 = np.where(padpsfs[:, :, unshifted_ind] == normpsfmax)
xcen_1 = int(xcen_1)
xcen_2 = int(xcen_2)
psfmodels = padpsfs[
xcen_1 - cutout_size_psf_phot : xcen_1 + cutout_size_psf_phot + 1,
xcen_2 - cutout_size_psf_phot : xcen_2 + cutout_size_psf_phot + 1,
]
return psfmodels
[docs]
def aper_photometry(
image_cutout: np.ndarray,
image_unc_cutout: np.ndarray,
aper_diameter: float,
bkg_in_diameter: float,
bkg_out_diameter: float,
plot: bool = False,
plotfilename: str = None,
):
"""
Perform aperture photometry
Args:
:param: image_cutout: np.ndarray of the image cutout
:param: image_unc_cutout: np.ndarray of the image uncertainty cutout
:param: aper_diameter: aperture diameter in pixels
:param: bkg_in_diameter: inner background annulus diameter in pixels
:param: bkg_out_diameter: outer background annulus diameter in pixels
:param: plot: whether to plot the cutout
:param: plotfilename: filename to save plot to
Returns:
:return: aperture flux, aperture flux uncertainty
"""
x_crd, y_crd = int(image_cutout.shape[0] / 2), int(image_cutout.shape[1] / 2)
image_cutout_mask = np.isnan(image_cutout)
if plot:
if plotfilename is None:
raise ValueError("Please provide a filename to save the plot to.")
fig, plot_ax = plt.subplots()
mean, std = np.nanmean(image_cutout), np.nanstd(image_cutout)
plot_ax.imshow(
image_cutout,
interpolation="nearest",
cmap="gray",
vmin=mean - std,
vmax=mean + 10 * std,
origin="lower",
)
# c = Circle(xy=(x_img, y_img),radius=15)
circle = Circle(xy=(x_crd, y_crd), radius=aper_diameter / 2)
bkg_inner_annulus = Circle(xy=(x_crd, y_crd), radius=bkg_in_diameter / 2)
bkg_outer_annulus = Circle(xy=(x_crd, y_crd), radius=bkg_out_diameter / 2)
circle.set_facecolor("none")
circle.set_edgecolor("red")
bkg_inner_annulus.set_facecolor("none")
bkg_inner_annulus.set_edgecolor("red")
bkg_outer_annulus.set_facecolor("none")
bkg_outer_annulus.set_edgecolor("red")
plot_ax.add_artist(circle)
plot_ax.add_artist(bkg_inner_annulus)
plot_ax.add_artist(bkg_outer_annulus)
plot_ax.set_xlim(x_crd - 30, x_crd + 30)
plot_ax.set_ylim(y_crd - 30, y_crd + 30)
plt.savefig(plotfilename)
plt.close(fig)
aperture = CircularAperture((x_crd, y_crd), r=aper_diameter / 2)
annulus_aperture = CircularAnnulus(
(x_crd, y_crd), r_in=bkg_in_diameter / 2, r_out=bkg_out_diameter / 2
)
annulus_masks = annulus_aperture.to_mask(method="center")
annulus_data = annulus_masks.multiply(image_cutout)
mask = annulus_masks.data
annulus_data_1d = annulus_data[mask > 0]
_, bkg_median, _ = sigma_clipped_stats(annulus_data_1d, sigma=2, mask_value=np.nan)
bkg = np.zeros(image_cutout.shape) + bkg_median
aperture_mask = aperture.to_mask(method="center")
aperture_unc_data = aperture_mask.multiply(image_unc_cutout)
error = np.sqrt(np.nansum(aperture_unc_data**2))
phot_table = aperture_photometry(
data=image_cutout - bkg, apertures=aperture, mask=image_cutout_mask
)
counts = phot_table["aperture_sum"][0]
counts_err = error
return counts, counts_err
[docs]
def get_rms_image(image: Image) -> Image:
"""Get an RMS image from a regular image
:param image: An :class:`~mirar.data.image_data.Image`
:param rms: rms of the image
:return: An RMS :class:`~mirar.data.image_data.Image`
"""
image_data = image.get_data()
image_data = image_data[np.invert(np.isnan(image_data))]
rms = 0.5 * (
np.percentile(image_data[image_data != 0.0], 84.13)
- np.percentile(image_data[image_data != 0.0], 15.86)
)
gain = image[GAIN_KEY]
poisson_noise = np.copy(image.get_data()) / gain
poisson_noise[poisson_noise < 0] = 0
rms_image = Image(data=np.sqrt(poisson_noise + rms**2), header=image.get_header())
return rms_image
[docs]
def get_mags_from_fluxes(
flux_list: np.ndarray[float],
fluxunc_list: np.ndarray[float],
zeropoint: float,
zeropoint_unc: float,
) -> tuple[np.ndarray, np.ndarray]:
"""Convert fluxes and flux uncertainties to magnitudes and magnitude uncertainties
:param flux_list: List of fluxes
:param fluxunc_list: List of flux uncertainties
:param zeropoint: Zeropoint
:param zeropoint_unc: Zeropoint uncertainties
:return: magnitudes, magnitude uncertainties
"""
assert len(flux_list) == len(fluxunc_list)
magnitudes = zeropoint - 2.5 * np.log10(flux_list)
magnitudes_unc = 1.086 * fluxunc_list / flux_list
magnitudes_unc = np.sqrt(magnitudes_unc**2 + zeropoint_unc**2)
magnitudes = magnitudes.astype(object)
magnitudes_unc = magnitudes_unc.astype(object)
magnitudes[flux_list <= 0] = None
magnitudes_unc[flux_list <= 0] = None
return magnitudes, magnitudes_unc