Source code for amoeba.Util.pipeline_util

import numpy as np
import astropy.units as u
import astropy.constants as const
from speclite.filters import (
    load_filter,
    load_filters,
)
import speclite
from amoeba.Util.util import convolve_signal_with_transfer_function

# todo: add pipeline support for microlensing


[docs]def intrinsic_signal_propagation_pipeline_for_agn( AGN, intrinsic_light_curve=None, time_axis=None, observer_frame_wavelengths_in_nm=None, speclite_filter=None, return_components=False, **kwargs, ): """run the pipeline to generate the full AGN intrinsic signal :param AGN: The AGN object to propagate the signal through. :param intrinsic_light_curve: the signal to propogate though the AGN model. Must be in units of days and sampled daily if time_axis is not given. :param time_axis: the time stamps of the driving signal to be specified if the driving signal is not evenly sampled every day. :param observer_frame_wavelengths_in_nm: a wavelength or list of wavelengths in nm. :param speclite_filter: a speclite filter, list of speclite filters, or list of speclite filter names. :param return_components: a bool which allows the return of each component light curve in addition to the combined light curve. :param output_cadence: float/int representing the output cadence in days :return: a list with the length of wavelengths/filters of light curves represented by lists of time axes and amplitudes. If return_components, then the BLR light curves are returned as a dictionary with the key being the BLR index. """ if observer_frame_wavelengths_in_nm is None and speclite_filter is None: print("please provide a range of wavelengths or a speclite filter to use") return False if observer_frame_wavelengths_in_nm is not None and speclite_filter is not None: print("only provide a range of wavelengths or a speclite filter to use") return False if speclite_filter is not None: if isinstance(speclite_filter, str): try: current_filters = [load_filter(speclite_filter)] except: current_filters = load_filters(speclite_filter) elif isinstance(speclite_filter, speclite.filters.FilterResponse): current_filters = [speclite_filter] elif isinstance(speclite_filter, list): successful_filters = [] for item in speclite_filter: if isinstance(item, str): try: current_filter = load_filter(item) successful_filters.append(current_filter) except: continue elif isinstance(item, speclite.filters.FilterResponse): successful_filters.append(item) else: print(f"{item} not recognized") if len(successful_filters) == 0: print("no filters loaded, no propagation required") return False current_filters = successful_filters mean_wavelengths = [] wavelength_ranges = [] for band in current_filters: mean_wavelengths.append(band.effective_wavelength.to(u.nm).value) min_wavelength = band.wavelength[np.argmax(band.response > 0.01)] / 10 total_wavelengths = np.sum(band.response > 0.01) wavelength_ranges.append( [ int(min_wavelength), int((min_wavelength + total_wavelengths / 10)), ] ) else: if isinstance(observer_frame_wavelengths_in_nm, (int, float)): mean_wavelengths = [observer_frame_wavelengths_in_nm] wavelength_ranges = [ [ observer_frame_wavelengths_in_nm - 20, observer_frame_wavelengths_in_nm + 20, ] ] elif isinstance(observer_frame_wavelengths_in_nm, (list, np.ndarray)): mean_wavelengths = [] wavelength_ranges = [] for band in observer_frame_wavelengths_in_nm: if isinstance(band, (int, float)): mean_wavelengths.append(band) wavelength_ranges.append([band - 20, band + 20]) elif isinstance(band, (list, np.ndarray)): mean_wavelengths.append(np.mean(band)) wavelength_ranges.append([np.min(band), np.max(band)]) if len(mean_wavelengths) == 0: print( "please provide a speclite filter, wavelength, wavelength range, or list containing \n previously mentioned types" ) return False if "accretion_disk" not in AGN.components.keys(): print( "please add an accretion disk model to this agn, other components require the variable continuum." ) return False if intrinsic_light_curve is None: if AGN.intrinsic_light_curve is None: try: AGN.generate_intrinsic_signal(len(AGN.frequencies)) except: print( "please provide a psd and set of frequencies, or a driving light curve" ) return False intrinsic_light_curve = AGN.intrinsic_light_curve.copy() time_axis = AGN.intrinsic_light_curve_time_axis.copy() reprocessed_signals = [] for wavelength in mean_wavelengths: current_tf = AGN.components[ "accretion_disk" ].construct_accretion_disk_transfer_function(wavelength) if "diffuse_continuum" in AGN.components.keys(): current_dc_mean_lag_increase = AGN.components[ "diffuse_continuum" ].get_diffuse_continuum_lag_contribution(wavelength) lag_increase = np.zeros(int(current_dc_mean_lag_increase)) current_tf = np.concatenate((lag_increase, current_tf)) t_ax, current_signal = convolve_signal_with_transfer_function( smbh_mass_exp=AGN.smbh_mass_exp, driving_signal=intrinsic_light_curve, initial_time_axis=time_axis, transfer_function=current_tf, redshift_source=0, desired_cadence_in_days=0.1, ) reprocessed_signals.append([t_ax, current_signal]) output_signals = reprocessed_signals.copy() blr_signals = {} if len(AGN.blr_indicies) > 0: for index in AGN.blr_indicies: blr_signals[str(index)] = [] for jj, wavelength_range in enumerate(wavelength_ranges): weighting_factor, current_blr_tf = AGN.components[ "blr_" + str(index) ].calculate_blr_emission_line_transfer_function( AGN.inclination_angle, observed_wavelength_range_in_nm=wavelength_range, ) t_ax, contaminated_signals = convolve_signal_with_transfer_function( smbh_mass_exp=AGN.smbh_mass_exp, driving_signal=reprocessed_signals[jj][1], initial_time_axis=reprocessed_signals[jj][0], transfer_function=current_blr_tf, redshift_source=0, desired_cadence_in_days=0.1, ) contaminated_signals -= np.mean(contaminated_signals) if np.std(contaminated_signals) != 0: contaminated_signals /= np.std(contaminated_signals) blr_signals[str(index)].append( [t_ax, contaminated_signals, weighting_factor] ) for jj, wavelength_range in enumerate(wavelength_ranges): current_weighting = 1 original_mean = np.mean(reprocessed_signals[jj][1]) original_std = np.std(reprocessed_signals[jj][1]) current_signal = reprocessed_signals[jj][1] - original_mean if original_std != 0: current_signal /= original_std if len(AGN.blr_indicies) > 0: for index in AGN.blr_indicies: if isinstance(blr_signals[str(index)], list): if not isinstance( blr_signals[str(index)][jj], list ): # pragma: no cover continue current_weighting += blr_signals[str(index)][jj][-1] current_blr_signal = blr_signals[str(index)][jj][1] current_blr_signal -= np.mean(current_blr_signal) if np.std(current_blr_signal) != 0: current_blr_signal /= np.std(current_blr_signal) current_signal += ( current_blr_signal * blr_signals[str(index)][jj][2] ) output_signals[jj] = [ reprocessed_signals[jj][0] * (1 + AGN.redshift_source), current_signal, ] if return_components is True: return [reprocessed_signals, blr_signals, output_signals] return output_signals
[docs]def visualization_pipeline( AGN, inclination_angle=None, observer_frame_wavelengths_in_nm=None, speclite_filter=None, return_components=None, **kwargs, ): """This pipeline produces a flux projection object of the combined emission of each agn component. Note that depending on flux ratios between various components, one may easily dominate the flux distribution. :param AGN: The AGN object to project into the source plane :param inclination_angle: the inclination of the source w.r.t. the observer :param observer_frame_wavelengths_in_nm: a [list of] wavelength[s] to project the AGN into. Cannot be used with speclite_filter :param speclite_filter: a [list of] speclite filter[s] to project the AGN into. Cannot be used with observer_frame_wavelengths_in_nm. :param return_components: a boolean toggle to return each component as individual FluxProjection objects before adding them together. :return: A list of FluxProjection objects for each wavelength range or speclite filter given. """ if inclination_angle is not None: update_output = AGN.update_inclination(inclination_angle) if update_output is False: return False if observer_frame_wavelengths_in_nm is None and speclite_filter is None: print("please provide a range of wavelengths or a speclite filter to use") return False if observer_frame_wavelengths_in_nm is not None and speclite_filter is not None: print("only provide a range of wavelengths or a speclite filter to use") return False if speclite_filter is not None: if isinstance(speclite_filter, str): try: current_filters = [load_filter(speclite_filter)] except: current_filters = load_filters(speclite_filter) elif isinstance(speclite_filter, speclite.filters.FilterResponse): current_filters = [speclite_filter] elif isinstance(speclite_filter, list): successful_filters = [] for item in speclite_filter: if isinstance(item, str): try: current_filter = load_filter(item) successful_filters.append(current_filter) except: continue elif isinstance(item, speclite.filters.FilterResponse): successful_filters.append(item) else: print(f"{item} not recognized") if len(successful_filters) == 0: print("no filters loaded, no visualization required") return False current_filters = successful_filters mean_wavelengths = [] wavelength_ranges = [] for band in current_filters: mean_wavelengths.append(band.effective_wavelength.to(u.nm).value) min_wavelength = band.wavelength[np.argmax(band.response > 0.01)] / 10 total_wavelengths = np.sum(band.response > 0.01) wavelength_ranges.append( [ int(min_wavelength), int((min_wavelength + total_wavelengths / 10)), ] ) else: if isinstance(observer_frame_wavelengths_in_nm, (int, float)): mean_wavelengths = [observer_frame_wavelengths_in_nm] wavelength_ranges = [ [ observer_frame_wavelengths_in_nm - 20, observer_frame_wavelengths_in_nm + 20, ] ] elif isinstance(observer_frame_wavelengths_in_nm, (list, np.ndarray)): mean_wavelengths = [] wavelength_ranges = [] for band in observer_frame_wavelengths_in_nm: if isinstance(band, (int, float)): mean_wavelengths.append(band) wavelength_ranges.append([band - 20, band + 20]) elif isinstance(band, (list, np.ndarray)): mean_wavelengths.append(np.mean(band)) wavelength_ranges.append([np.min(band), np.max(band)]) if len(mean_wavelengths) == 0: # pragma: no cover print( "please provide a speclite filter, wavelength, wavelength range, or list containing \n previously mentioned types" ) return False if len(AGN.components.keys()) == 0: print( "please add at least one component model to this agn. You cannot project nothing!" ) return False list_of_wavelength_resolved_projections = [] for jj, wavelength in enumerate(mean_wavelengths): list_of_projections = [] if "accretion_disk" in AGN.components.keys(): current_img = AGN.components[ "accretion_disk" ].calculate_surface_intensity_map(wavelength) list_of_projections.append(current_img) reference_flux = current_img.total_flux reference_px_size = current_img.pixel_size if "diffuse_continuum" in AGN.components.keys(): current_img = AGN.components[ "diffuse_continuum" ].get_diffuse_continuum_emission(wavelength) current_img.total_flux *= reference_flux current_img.flux_array *= current_img.total_flux / np.sum( current_img.flux_array * current_img.pixel_size**2 ) list_of_projections.append(current_img) output_blr_projection = None if len(AGN.blr_indicies) > 0: for kk, index in enumerate(AGN.blr_indicies): current_projection = AGN.components[ "blr_" + str(index) ].project_blr_intensity_over_velocity_range( AGN.inclination_angle, observed_wavelength_range_in_nm=[wavelength_ranges[jj]], ) if current_projection.total_flux > 0: current_projection.total_flux *= ( AGN.components["blr_" + str(index)].line_strength * reference_flux / current_projection.total_flux ) current_projection.flux_array *= current_projection.total_flux / ( np.sum( current_projection.flux_array * current_projection.pixel_size**2 ) ) if output_blr_projection is None: output_blr_projection = current_projection else: output_blr_projection.add_flux_projection(current_projection) if output_blr_projection is not None: list_of_projections.append(output_blr_projection) list_of_wavelength_resolved_projections.append(list_of_projections) if return_components: return list_of_wavelength_resolved_projections output_projections = [] for list_of_projections in list_of_wavelength_resolved_projections: output_projections.append(list_of_projections[0]) for jj in range(len(list_of_projections) - 1): output_projections[-1].add_flux_projection(list_of_projections[jj + 1]) return output_projections