Source code for time_domain_astronomy_sandbox.observation

"""Observation class."""
import numpy as np
from .backend import Backend
from .pulse import Pulse
from .rfim import RFIm


[docs]class Observation(): """Observation class.""" def __init__(self, backend:Backend, length:int = 1, # length in second t0:float = 0. ): """Initialise Observation class. Parameters ---------- backend : Backend An instance of Backend class length : int Length of the observation (in second) t0 : float Starting time of the observation (in second) """ self.backend = backend self.length = length self.t0 = t0 self.window = np.random.normal(0, 1, (self.backend.n_channels, int(self.length*backend.samples_per_second))) # self.window += np.abs(np.min(self.window)) self.noise_median = np.median(self.window).copy() self.noise_std = np.std(self.window).copy() # Time self.time_to_index = lambda t_i : np.ceil((t_i-self.t0)/self.backend.sampling_time).astype(int) self.index_to_time = lambda index : index * self.backend.sampling_time + self.t0 self.next_time = lambda i : i * self.backend.sampling_time self.times = np.array([self.next_time(i) for i in range(int(self.backend.samples_per_second * self.length))]) self.time_indices = np.array([self.time_to_index(t) for t in self.times]) # self.snr = lambda snr, area : (self.noise_median + snr * self.noise_std) / np.sqrt(area) self.snr = lambda snr, area : snr / np.sqrt(area) @property def window(self): return self._window @window.setter def window(self, window): self._window = window
[docs] def time_cleaning(self, window=[], n_iter=1, threshold=3.25, symetric=False, keep_state=False): """RFI mitigation (cleaning) in time domain. Parameters ---------- window : (list | Numpy.array) An observation window (to clean a specific instance of window). If empty, cleans self.window n_iter : int Number of cleaning iteration keep_state : bool Save result of cleaning to self.window Returns ------- self.window : Numpy.array The cleaned window. """ if len(window) == 0: window = self.window if keep_state: window = RFIm().tdsc_amber(window, threshold=threshold, n_iter=n_iter, symetric=symetric) else: return RFIm().tdsc_amber(window, threshold=threshold, n_iter=n_iter, symetric=symetric) self.window = window return self.window
[docs] def frequency_cleaning(self, window=[], n_iter=1, bin_size=32, threshold=2.75, symetric=False, keep_state=False): """RFI mitigation (cleaning) in frequency domain. Parameters ---------- window : (list | Numpy.array) An observation window (to clean a specific instance of window). If empty, cleans self.window n_iter : int Number of cleaning iteration keep_state : bool Save result of cleaning to self.window Returns ------- self.window : Numpy.array The cleaned window. """ if len(window) == 0: window = self.window if keep_state: window = RFIm().fdsc_amber(window, n_iter=n_iter, bin_size=bin_size, threshold=threshold, symetric=symetric) else: return RFIm().fdsc_amber(window, n_iter=n_iter, bin_size=bin_size, threshold=threshold, symetric=symetric) self.window = window return self.window
def dm0_cleaning(self, window=[], threshold=3.25, keep_state=False): if len(window) == 0: window = self.window if keep_state: window = RFIm().dm0clean(window, threshold=threshold) else: return RFIm().dm0clean(window, threshold=threshold) self.window = window return self.window
[docs] def dedisperse(self, dm, window=[]): """Dedisperse an observation window for a given dispersion measure (DM). Parameters ---------- dm : int Dispersion measure to use for dedispersion window : (list | Numpy.array) An observation window (to clean a specific instance of window). If empty, cleans self.window Returns ------- dedispersed_window : Numpy.array The dedispersed window. """ if len(window) == 0: window = self.window return np.array([ np.roll(window[i].copy(), -r) for i, r in zip( range(window.shape[0]), self.time_to_index(Pulse(self.backend).delays(dm)) ) ])
def add_signal(self, signal_value, x_t0, x_t1, y_t0, y_t1): self.window[x_t0:x_t1, y_t0:y_t1] += signal_value def add_dispersed_pulse(self, dm, width, pulse_t0, snr=100, verbose=False): pulse = Pulse(self.backend, width=width) pulse_t_start = self.index_to_time(index=self.time_to_index(pulse_t0).astype(int)) t_idx = self.time_to_index(t_i=pulse_t_start+pulse.delays(dm=dm)) area = np.sqrt(self.backend.n_channels*pulse.width/self.backend.sampling_time) value = self.snr(snr, area) for i in range(t_idx.shape[0]): self.add_signal(value, i, i+1, t_idx[i], t_idx[i]+self.time_to_index(pulse.width)) if verbose: print ("snr:", snr, "value: ", value, "area: ", area) def add_rfi(self, t_start=0., t_stop=0.5, t_step=0.03, t_width=0.003, f_start=200, f_stop=250, snr=50, verbose=False ): start = self.time_to_index(t_start) stop = self.time_to_index(t_stop) step = self.time_to_index(t_step) width = self.time_to_index(t_width) height = np.abs(f_start-f_stop) # area = np.sqrt((stop-start)/(step*width)) area = np.sqrt(width*height) value = self.snr(snr, area) for t in range(start, stop, step): self.add_signal(value, f_start, f_stop, t, t+width) if verbose: print ("snr:", snr, "value: ", value)