mirar.processors.avro package

Module for producing, and broadcasting, avro alerts

Submodules

mirar.processors.avro.base_avro_exporter module

Module with classes to make avro alert packets

class mirar.processors.avro.base_avro_exporter.BaseAvroExporter(base_name: str, avro_schema_path: Path | str, output_sub_dir: str = 'avro', save_local: bool = True, broadcast: bool = False)[source]

Bases: BaseSourceProcessor

Class to generate Avro Packets from a dataframe of candidates.

Attributes:

output_sub_dir (str): output data path. base_name (str): 4-letter code for telescope. save_local (bool): save avro packets to out_sub_dir. broadcast (bool): send to brokers at IPAC.

base_key = 'AVRO'
broadcast_single_alert_packet(packet, schema, topic_name)[source]

Sends avro-formatted packets to specified topicname using Kafka. Modified from https://github.com/dekishalay/pgirdps

Args:

packet (dict): candidate data in avro packed dict format. schema (dict): schema definition. topic_name (str): name of the topic sending to, e.g. ztf_20191221.

Returns:

(int): 1 if broadcast successful or -1 if candidate not sent.

description() str[source]

Return a description of the processor

Returns:

A description of the processor

get_sub_output_dir()[source]

Returns path of output subdirectory.

make_alerts(source_table: SourceTable) tuple[list[dict], Path, str | None][source]

Make avro alerts from a source table

Parameters:

source_table – input source table

Returns:

list of avro alerts, path to save avro alerts to, topic name

process_alerts(alerts: list[dict], save_path: Path, topic_name: str | None = None)[source]

Top level method to process avro alerts.

Parameters:
  • alerts – list of avro alerts

  • save_path – path to save avro alerts to

  • topic_name – name of the topic sending to, e.g. ztf_20191221

Returns:

None

static save_alert_packets(packets: list[dict], schema: str | List[Any] | Dict[Any, Any], save_path: Path | str)[source]

Saves packets to output path.

Parameters:
  • packets – list of packets to save

  • schema – schema of the packets

  • save_path – path to save packets to

mirar.processors.avro.ipac_avro_exporter module

Module with classes to make avro alert packets

class mirar.processors.avro.ipac_avro_exporter.IPACAvroExporter(*args, topic_prefix: str | None = None, **kwargs)[source]

Bases: BaseAvroExporter

Class to generate Avro Packets from a dataframe of candidates.

Attributes:

output_sub_dir (str): output data path. base_name (str): 4-letter code for telescope. save_local (bool): save avro packets to out_sub_dir. broadcast (bool): send to brokers at IPAC.

static fill_schema(schema: str | List[Any] | Dict[Any, Any], row: Series, metadata: dict) dict[source]

Fill an avro schema with data from a row of a pandas dataframe

Parameters:
  • schema – Schema to fill

  • row – Row of pandas dataframe

  • metadata – Metadata to fill

Returns:

Dictionary of filled schema

get_topic_name() str[source]

Get the topic name for a source table

Returns:

topic name

make_alerts(source_table: SourceTable) tuple[list[dict], Path, str | None][source]

Make avro alerts from a source table

Parameters:

source_table – input source table

Returns:

list of avro alerts