Source code for mirar.processors.skyportal.skyportal_candidate

"""
Module for sending candidates to Fritz.
"""

import logging
from datetime import datetime

from astropy.time import Time

from mirar.paths import SOURCE_NAME_KEY
from mirar.processors.skyportal.skyportal_source import SkyportalSourceUploader

logger = logging.getLogger(__name__)


[docs] class SkyportalCandidateUploader(SkyportalSourceUploader): """ Processor for sending candidates to Fritz. """ def __init__( self, *args, stream_id: int, fritz_filter_id: int, annotation_keys: list[str] | None = None, **kwargs, ): super().__init__(*args, **kwargs) self.stream_id = stream_id self.fritz_filter_id = fritz_filter_id self.annotation_keys = annotation_keys
[docs] def description(self) -> str: return ( f"Sending candidates via API to {self.skyportal_client.base_url} " f"(filter={self.fritz_filter_id})" )
[docs] def skyportal_post_candidate(self, alert): """ Post a candidate on SkyPortal. Creates new candidate(s) (one per filter) :param alert: dict of alert info :return: None """ data = { "id": alert[SOURCE_NAME_KEY], "ra": alert["ra"], "dec": alert["dec"], "filter_ids": [self.fritz_filter_id], "passing_alert_id": self.fritz_filter_id, "passed_at": Time(datetime.utcnow()).isot, "origin": self.origin, } logger.debug( f"Posting metadata of {alert[SOURCE_NAME_KEY]} " f"{alert['candid']} to SkyPortal" ) response = self.api("POST", "candidates", data) if response.json()["status"] == "success": logger.debug( f"Posted {alert[SOURCE_NAME_KEY]} {alert['candid']} " f"metadata to SkyPortal" ) else: logger.error( f"Failed to post {alert[SOURCE_NAME_KEY]} {alert['candid']} " f"metadata to SkyPortal" ) logger.error(response.json())
[docs] def get_annotations(self, alert) -> dict: """ Retrieve annotations from alert data. :param alert: Alert data :return: Annotations """ data = {} if self.annotation_keys is not None: for key in self.annotation_keys: if key in alert: data[key] = alert[key] return data
[docs] def skyportal_post_annotation(self, alert): """ Post an annotation. Works for both candidates and sources. :param alert: alert data :return: None """ data = self.get_annotations(alert) payload = {"origin": self.origin, "data": data, "group_ids": self.group_ids} path = f"sources/{str(alert[SOURCE_NAME_KEY])}/annotations" response = self.api("POST", path, payload) if response.json()["status"] == "success": logger.debug(f"Posted {alert[SOURCE_NAME_KEY]} annotation to SkyPortal") else: logger.error( f"Failed to post {alert[SOURCE_NAME_KEY]} annotation to SkyPortal" ) logger.error(response.json())
[docs] def skyportal_put_annotation(self, source): """ Retrieve an annotation to check if it exists already. :param source: detection data :return: None """ response = self.api( "GET", f"sources/{str(source[SOURCE_NAME_KEY])}/annotations", ) if response.json()["status"] == "success": logger.debug(f"Got {source[SOURCE_NAME_KEY]} annotations from SkyPortal") else: logger.debug( f"Failed to get {source[SOURCE_NAME_KEY]} annotations from SkyPortal" ) logger.debug(response.json()) return existing_annotations = { annotation["origin"]: { "annotation_id": annotation["id"], "author_id": annotation["author_id"], } for annotation in response.json()["data"] } # no previous annotation exists on SkyPortal for this object? just post then if self.origin not in existing_annotations: self.skyportal_post_annotation(source) # annotation from this(WNTR) origin exists else: # annotation data data = self.get_annotations(source) new_annotation = { "author_id": existing_annotations[self.origin]["author_id"], "obj_id": source[SOURCE_NAME_KEY], "origin": self.origin, "data": data, "group_ids": self.group_ids, } logger.debug( f"Putting annotation for {source[SOURCE_NAME_KEY]} {source['candid']} " f"to SkyPortal", ) response = self.api( "PUT", f"sources/{source[SOURCE_NAME_KEY]}" f"/annotations/{existing_annotations[self.origin]['annotation_id']}", new_annotation, ) if response.json()["status"] == "success": logger.debug( f"Posted updated {source[SOURCE_NAME_KEY]} annotation to SkyPortal" ) else: logger.error( f"Failed to post updated {source[SOURCE_NAME_KEY]} annotation " f"to SkyPortal" ) logger.error(response.json())
[docs] def export_to_skyportal(self, alert): """ Posts a candidate to SkyPortal. :param alert: _description_ :type alert: _type_ """ # check if candidate exists in SkyPortal logger.debug(f"Checking if {alert[SOURCE_NAME_KEY]} is candidate in SkyPortal") response = self.api("HEAD", f"candidates/{alert[SOURCE_NAME_KEY]}") if response.status_code not in [200, 404]: response.raise_for_status() is_candidate = response.status_code == 200 logger.debug( f"{alert[SOURCE_NAME_KEY]} {'is' if is_candidate else 'is not'} " f"candidate in SkyPortal" ) # check if source exists in SkyPortal logger.debug(f"Checking if {alert[SOURCE_NAME_KEY]} is source in SkyPortal") response = self.api("HEAD", f"sources/{alert[SOURCE_NAME_KEY]}") if response.status_code not in [200, 404]: response.raise_for_status() is_source = response.status_code == 200 logger.debug( f"{alert[SOURCE_NAME_KEY]} " f"{'is' if is_source else 'is not'} source in SkyPortal" ) # object does not exist in SkyPortal: neither cand nor source if (not is_candidate) and (not is_source): # post candidate self.skyportal_post_candidate(alert) # post annotations self.skyportal_post_annotation(alert) # post full light curve logger.debug(f"Using stream_id={self.stream_id}") self.skyportal_put_photometry(alert) # post thumbnails self.skyportal_post_thumbnails(alert) # obj already exists in SkyPortal else: # post candidate with new filter ids self.skyportal_post_candidate(alert) # put (*not* post) annotations self.skyportal_put_annotation(alert) # exists in SkyPortal & already saved as a source if is_source: # get info on the corresponding groups: logger.debug( f"Getting source groups info on " f"{alert[SOURCE_NAME_KEY]} from SkyPortal", ) response = self.api( "GET", f"sources/{alert[SOURCE_NAME_KEY]}/groups", ) if response.json()["status"] == "success": existing_groups = response.json()["data"] existing_group_ids = [g["id"] for g in existing_groups] for existing_gid in existing_group_ids: if existing_gid in self.group_ids: self.skyportal_post_source(alert, [existing_gid]) else: logger.error( f"Failed to get source groups info on {alert[SOURCE_NAME_KEY]}" ) # post alert photometry in single call to /api/photometry logger.debug(f"Using stream_id={self.stream_id}") self.skyportal_put_photometry(alert) if self.update_thumbnails: self.skyportal_post_thumbnails(alert) logger.debug(f"SendToSkyportal Manager complete for {alert[SOURCE_NAME_KEY]}")