Source code for mirar.processors.avro.base_avro_exporter

"""
Module with classes to make avro alert packets
"""

import logging
import time
from pathlib import Path

import fastavro
from fastavro.schema import load_schema
from fastavro.types import Schema

from mirar.data import SourceBatch, SourceTable
from mirar.paths import get_output_dir
from mirar.processors.base_processor import BaseSourceProcessor

logger = logging.getLogger(__name__)


[docs] class BaseAvroExporter(BaseSourceProcessor): """Class to generate Avro Packets from a dataframe of candidates. Attributes: output_sub_dir (str): output data path. base_name (str): 4-letter code for telescope. save_local (bool): save avro packets to out_sub_dir. broadcast (bool): send to brokers at IPAC. """ base_key = "AVRO" def __init__( self, base_name: str, avro_schema_path: Path | str, output_sub_dir: str = "avro", save_local: bool = True, broadcast: bool = False, ): super().__init__() self.output_sub_dir = output_sub_dir self.base_name = base_name self.avro_schema_path = Path(avro_schema_path) self.save_local = save_local self.broadcast = broadcast assert ( self.avro_schema_path.exists() ), f"Schema file {self.avro_schema_path} does not exist" self.schema = load_schema(self.avro_schema_path)
[docs] def description(self) -> str: return ( f"Creates avros with '{self.avro_schema_path.name}' schema, " f" saves to '{self.output_sub_dir}' directory. Broadcast: {self.broadcast}" )
def _apply_to_sources( self, batch: SourceBatch, ) -> SourceBatch: for source_table in batch: new_alerts, save_path, topic_name = self.make_alerts(source_table) self.process_alerts(new_alerts, save_path, topic_name) return batch
[docs] def make_alerts( self, source_table: SourceTable ) -> tuple[list[dict], Path, str | None]: """ Make avro alerts from a source table :param source_table: input source table :return: list of avro alerts, path to save avro alerts to, topic name """ raise NotImplementedError
[docs] def get_sub_output_dir(self): """Returns path of output subdirectory.""" return get_output_dir(self.output_sub_dir, self.night_sub_dir)
[docs] @staticmethod def save_alert_packets(packets: list[dict], schema: Schema, save_path: Path | str): """ Saves packets to output path. :param packets: list of packets to save :param schema: schema of the packets :param save_path: path to save packets to """ with open(save_path, "wb") as out: fastavro.writer(out, schema, packets)
@staticmethod def _send_alert(topicname, records, schema): """ Function to send alert to Kafka broker :param topicname: Name of the topic to send to :param records: Records to send :param schema: Schema of the records :return: None """ raise NotImplementedError
[docs] def broadcast_single_alert_packet(self, packet, schema, topic_name): """ Sends avro-formatted packets to specified topicname using Kafka. Modified from https://github.com/dekishalay/pgirdps Args: packet (dict): candidate data in avro packed dict format. schema (dict): schema definition. topic_name (str): name of the topic sending to, e.g. ztf_20191221. Returns: (int): 1 if broadcast successful or -1 if candidate not sent. """ try: self._send_alert(topic_name, [packet], schema) return 1 except OSError: logger.warning(f"Could not send candid {packet['candidate']['candid']}") return -1
[docs] def process_alerts( self, alerts: list[dict], save_path: Path, topic_name: str | None = None ): """ Top level method to process avro alerts. :param alerts: list of avro alerts :param save_path: path to save avro alerts to :param topic_name: name of the topic sending to, e.g. ztf_20191221 :return: None """ if self.save_local: # Save avro packets to local directory save_path.parent.mkdir(parents=True, exist_ok=True) logger.debug(f"Saving {len(alerts)} alerts to {save_path}") self.save_alert_packets(alerts, self.schema, save_path) if self.broadcast: t_start = time.time() if topic_name is None: raise ValueError("topic_name must be specified if broadcast is True") logger.debug(f"Broadcasting {len(alerts)} alerts to {topic_name}") # Export avro packets to Kafka broker successes = 0 # successful kafka producing of a candidate for cand in alerts: flag = self.broadcast_single_alert_packet(cand, self.schema, topic_name) if flag > 0: successes += 1 t_end = time.time() logger.debug( f"Took {(t_end - t_start):.2f} seconds to process. " f"{successes} of {len(alerts)} successfully broadcast." )