Coverage for /home/runner/work/nr-catalog-tools/nr-catalog-tools/nrcatalogtools/waveform.py: 90%
184 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-01 05:18 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-01 05:18 +0000
1import os
3import h5py
4import lal
5import numpy as np
6from pycbc.types import TimeSeries
7from pycbc.waveform import frequency_from_polarizations
8from scipy.interpolate import InterpolatedUnivariateSpline
9from scipy.stats import mode as stat_mode
11from nrcatalogtools import utils
12from nrcatalogtools.lvc import (
13 check_interp_req,
14 get_nr_to_lal_rotation_angles,
15 get_ref_vals,
16)
17from sxs import WaveformModes as sxs_WaveformModes
18from sxs.time_series import TimeSeries as SXSTimeSeries
19from sxs.waveforms.nrar import (
20 h,
21 translate_data_type_to_spin_weight,
22 translate_data_type_to_sxs_string,
23)
26class WaveformModes(sxs_WaveformModes):
27 def __new__(
28 cls,
29 data,
30 time=None,
31 time_axis=0,
32 modes_axis=1,
33 ell_min=2,
34 ell_max=4,
35 verbosity=0,
36 **w_attributes,
37 ) -> None:
38 self = super().__new__(
39 cls,
40 data,
41 time=time,
42 time_axis=time_axis,
43 modes_axis=modes_axis,
44 ell_min=ell_min,
45 ell_max=ell_max,
46 **w_attributes,
47 )
48 self._t_ref_nr = None
49 self._filepath = None
50 self.verbosity = verbosity
51 return self
53 @classmethod
54 def _load(
55 cls,
56 data,
57 time=None,
58 time_axis=0,
59 modes_axis=1,
60 ell_min=2,
61 ell_max=4,
62 verbosity=0,
63 **w_attributes,
64 ):
65 if time is None:
66 time = np.arange(0, len(data[:, 0]))
67 return cls(
68 data,
69 time=time,
70 time_axis=time_axis,
71 modes_axis=modes_axis,
72 ell_min=ell_min,
73 ell_max=ell_max,
74 verbosity=verbosity,
75 **w_attributes,
76 )
78 @classmethod
79 def load_from_h5(cls, file_path_or_open_file, metadata={}, verbosity=0):
80 """Method to load SWSH waveform modes from RIT or MAYA catalogs
81 from HDF5 file.
83 Args:
84 file_path_or_open_file (str or open file): Either the path to an
85 HDF5 file containing waveform data, or an open file pointer to
86 the same.
87 metadata (dict): Dictionary containing metadata (Note that keys
88 will be NR group specific)
89 verbosity (int, optional): Verbosity level with which to
90 print messages during execution. Defaults to 0.
92 Raises:
93 RuntimeError: If inputs are invalid, or if no mode found in
94 input file.
96 Returns:
97 WaveformModes: Object containing time-series of SWSH modes.
98 """
99 import quaternionic
101 if type(file_path_or_open_file) == h5py._hl.files.File:
102 h5_file = file_path_or_open_file
103 close_input_file = False
104 elif os.path.exists(file_path_or_open_file):
105 h5_file = h5py.File(file_path_or_open_file, "r")
106 close_input_file = True
107 else:
108 raise RuntimeError(f"Could not use or open {file_path_or_open_file}")
110 # Set the file path attribute
111 cls._filepath = h5_file.filename
112 # If _metadata is not already
113 # a set attribute, then set
114 # it here.
116 try:
117 cls._metadata
118 except AttributeError:
119 cls._metadata = metadata
121 ELL_MIN, ELL_MAX = 2, 10
122 ell_min, ell_max = 99, -1
123 LM = []
124 t_min, t_max, dt = -1e99, 1e99, 1
125 mode_data = {}
126 for ell in range(ELL_MIN, ELL_MAX + 1):
127 for em in range(-ell, ell + 1):
128 afmt = f"amp_l{ell}_m{em}"
129 pfmt = f"phase_l{ell}_m{em}"
130 if afmt not in h5_file or pfmt not in h5_file:
131 continue
132 amp_time = h5_file[afmt]["X"][:]
133 amp = h5_file[afmt]["Y"][:]
134 phase_time = h5_file[pfmt]["X"][:]
135 phase = h5_file[pfmt]["Y"][:]
136 mode_data[(ell, em)] = [amp_time, amp, phase_time, phase]
137 # get the minimum time and maximum time stamps for all modes
138 t_min = max(t_min, amp_time[0], phase_time[0])
139 t_max = min(t_max, amp_time[-1], phase_time[-1])
140 dt = min(
141 dt,
142 stat_mode(np.diff(amp_time), keepdims=True)[0][0],
143 stat_mode(np.diff(phase_time), keepdims=True)[0][0],
144 )
145 ell_min = min(ell_min, ell)
146 ell_max = max(ell_max, ell)
147 LM.append([ell, em])
148 if close_input_file:
149 h5_file.close()
150 if len(LM) == 0:
151 raise RuntimeError(
152 "We did not find even one mode in the file. Perhaps the "
153 "format `amp_l?_m?` and `phase_l?_m?` is not the "
154 "nomenclature of datagroups in the input file?"
155 )
157 times = np.arange(t_min, t_max + 0.5 * dt, dt)
158 data = np.empty((len(times), len(LM)), dtype=complex)
159 for idx, (ell, em) in enumerate(LM):
160 amp_time, amp, phase_time, phase = mode_data[(ell, em)]
161 amp_interp = InterpolatedUnivariateSpline(amp_time, amp)
162 phase_interp = InterpolatedUnivariateSpline(phase_time, phase)
163 data[:, idx] = amp_interp(times) * np.exp(1j * phase_interp(times))
165 w_attributes = {}
166 w_attributes["metadata"] = metadata
167 w_attributes["history"] = ""
168 w_attributes["frame"] = quaternionic.array([[1.0, 0.0, 0.0, 0.0]])
169 w_attributes["frame_type"] = "inertial"
170 w_attributes["data_type"] = h
171 w_attributes["spin_weight"] = translate_data_type_to_spin_weight(
172 w_attributes["data_type"]
173 )
174 w_attributes["data_type"] = translate_data_type_to_sxs_string(
175 w_attributes["data_type"]
176 )
177 w_attributes["r_is_scaled_out"] = True
178 w_attributes["m_is_scaled_out"] = True
179 # w_attributes["ells"] = ell_min, ell_max
181 return cls(
182 data,
183 time=times,
184 time_axis=0,
185 modes_axis=1,
186 ell_min=ell_min,
187 ell_max=ell_max,
188 verbosity=verbosity,
189 **w_attributes,
190 )
192 @property
193 def filepath(self):
194 """Return the data file path"""
195 if not self._filepath:
196 self._filepath = self.sim_metadata["waveform_data_location"]
198 return self._filepath
200 @property
201 def sim_metadata(self):
202 """Return the simulation metadata dictionary"""
203 return self._metadata["metadata"]
205 def get_mode(self, ell, em):
206 return self[f"Y_l{ell}_m{em}.dat"]
208 @property
209 def f_lower_at_1Msun(self):
210 mode22 = self.get_mode(2, 2)
211 fr22 = frequency_from_polarizations(
212 TimeSeries(mode22[:, 1], delta_t=np.diff(self.time)[0]),
213 TimeSeries(-1 * mode22[:, 2], delta_t=np.diff(self.time)[0]),
214 )
215 return fr22[0] / lal.MTSUN_SI
217 def get_polarizations(
218 self, inclination, coa_phase, f_ref=None, t_ref=None, tol=1e-6
219 ):
220 """Sum over modes data and return plus and cross GW polarizations
222 Args:
223 inclination (float): Inclination angle between the line-of-sight
224 orbital angular momentum vector [radians]
225 coa_phase (float): Coalesence orbital phase [radians]
226 tol (float, optional) : The tolerance to allow for
227 floating point precision errors
228 in the computation of rotation
229 angles. Default value is 1e-6.
231 Returns:
232 Tuple(numpy.ndarray): Numpy Arrays containing polarizations
233 time-series
234 """
236 # Get angles
237 angles = self.get_angles(inclination, coa_phase, f_ref, t_ref, tol)
239 polarizations = self.evaluate([angles["theta"], angles["psi"], angles["alpha"]])
241 return polarizations
243 def get_td_waveform(
244 self,
245 total_mass,
246 distance,
247 inclination,
248 coa_phase,
249 delta_t=None,
250 f_ref=None,
251 t_ref=None,
252 k=3,
253 kind=None,
254 tol=1e-6,
255 ):
256 """Sum over modes data and return plus and cross GW polarizations,
257 rescaled appropriately for a compact-object binary with given
258 total mass and distance from GW detectors.
260 Returns:
261 Tuple(numpy.ndarray): Numpy Arrays containing polarizations
262 time-series
264 Args:
265 total_mass (_type_): _description_
266 distance (_type_): _description_
267 inclination (float): Inclination angle between the line-of-sight
268 orbital angular momentum vector [radians]
269 coa_phase (float): Coalesence orbital phase [radians]
270 delta_t (_type_, optional): _description_. Defaults to None.
271 f_ref (float, optional) : The reference frequency.
272 t_ref (float, optional) : The reference time.
273 k (int, optional) : The interpolation order to use with
274 `scipy.interpolate.InterpolatedUnivariateSpline`.
275 This is the method used by default with value 3.
276 This parameter `k` is given preference over
277 `kind` (see below).
278 kind (str, optional) : The interpolation order to use with
279 `scipy.interpolate.interp1d`
280 (`linear`, `quadratic`, `cubic`) or
281 `CubicSpline` to use `scipy.interpolate.CubicSpline`.
282 tol (float, optional) : The tolerance to allow for
283 floating point precision errors
284 in the computation of rotation
285 angles. Default value is 1e-6.
286 Returns:
287 pycbc.TimeSeries(numpy.complex128): Complex polarizations
288 stored in `pycbc` container `TimeSeries`
289 """
290 if delta_t is None:
291 delta_t = stat_mode(np.diff(self.time), keepdims=True)[0][0]
292 m_secs = utils.time_to_physical(total_mass)
293 # we assume that we generally do not sample at a rate below 128Hz.
294 # Therefore, depending on the numerical value of dt, we deduce whether
295 # dt is in dimensionless units or in seconds.
296 if delta_t > 1.0 / 128:
297 new_time = np.arange(min(self.time), max(self.time), delta_t)
298 else:
299 new_time = np.arange(min(self.time), max(self.time), delta_t / m_secs)
301 # Get angles
302 angles = self.get_angles(
303 inclination=inclination,
304 coa_phase=coa_phase,
305 f_ref=f_ref,
306 t_ref=t_ref,
307 tol=tol,
308 )
309 h = interpolate_in_amp_phase(
310 self.evaluate([angles["theta"], angles["psi"], angles["alpha"]]),
311 new_time,
312 k=k,
313 kind=kind,
314 ) * utils.amp_to_physical(total_mass, distance)
316 h.time *= m_secs
317 # Return conjugated waveform to comply with lal
318 return self.to_pycbc(np.conjugate(h))
320 def get_angles(self, inclination, coa_phase, f_ref=None, t_ref=None, tol=1e-6):
321 """Get the inclination, azimuthal and polarization angles
322 of the observer in the NR source frame.
324 Parameters
325 ----------
326 inclination : float
327 The inclination angle of the observer
328 in the LAL source frame
329 coa_phase : float
330 The coalescence phase. This will be
331 used to compute the reference orbital phase.
332 f_ref, t_ref : float, optional
333 The reference frquency and time to define the LAL source frame.
334 Defaults to the available frequency in the data file.
336 tol (float, optional) : The tolerance to allow for
337 floating point precision errors
338 in the computation of rotation
339 angles. Default value is 1e-6.
340 Returns
341 -------
342 angles : dict
343 angles : dict
344 The angular corrdinates Theta, Psi, and the rotation angle Alpha.
345 If available, this also contains the reference time and frequency.
346 """
348 # Get observer phi_ref
349 obs_phi_ref = self.get_obs_phi_ref_from_obs_coa_phase(
350 coa_phase=coa_phase, t_ref=t_ref, f_ref=f_ref
351 )
353 # Compute angles
354 with h5py.File(self.filepath) as h5_file:
355 angles = get_nr_to_lal_rotation_angles(
356 h5_file=h5_file,
357 sim_metadata=self.sim_metadata,
358 inclination=inclination,
359 phi_ref=obs_phi_ref,
360 f_ref=f_ref,
361 t_ref=t_ref,
362 tol=tol,
363 )
365 return angles
367 def to_pycbc(self, input_array=None):
368 if input_array is None:
369 input_array = self
371 delta_t = stat_mode(np.diff(input_array.time), keepdims=True)[0][0]
372 return TimeSeries(
373 np.array(input_array),
374 delta_t=delta_t,
375 dtype=self.ndarray.dtype,
376 epoch=input_array.time[0],
377 copy=True,
378 )
380 def get_nr_coa_phase(self):
381 """Get the NR coalescence orbital phase from the 2,2 mode."""
383 # Get the waveform phase.
384 phase_22 = self._get_phase(2, 2)
386 waveform_22 = self.get_mode(2, 2)[:, 1] + 1j * self.get_mode(2, 2)[:, 2]
388 # print(len(phase_22), len(waveform_22))
389 # Get the localtion of max amplitude.
391 maxloc = np.argmax(np.absolute(waveform_22))
392 # Compute the orbital phase at max amplitude.
393 coa_phase = phase_22[maxloc] / 2
395 return coa_phase
397 def get_obs_phi_ref_from_obs_coa_phase(self, coa_phase, t_ref=None, f_ref=None):
398 """Get the observer reference phase given the observer
399 coalescence phase."""
401 # Get the NR coalescence phase
402 nr_coa_phase = self.get_nr_coa_phase()
403 # Get the NR orbital phasing series
404 nr_orb_phase_ts = self._get_phase(2, 2) / 2
406 # Compute the observer reference phase from
407 # this information.
409 avail_t_ref = self.t_ref_nr
411 # Second, get the NR reference phase
412 from scipy.interpolate import interp1d
414 nr_phi_ref = interp1d(self.time, nr_orb_phase_ts, kind="cubic")(avail_t_ref)
416 # Third, compute the offset in coa_phase
417 delta_phi_ref = coa_phase - nr_coa_phase
419 # Finally compute the obserer reference phase at NR reference time.
420 obs_phi_ref = nr_phi_ref + delta_phi_ref
422 return obs_phi_ref
424 def to_lal(self):
425 raise NotImplementedError()
427 def to_astropy(self):
428 return self.to_pycbc().to_astropy()
430 def _get_phase(self, ell=2, emm=2):
431 """Get the phasing of a particular waveform mode."""
433 # Get the complex waveform.
434 wfm_array = self.get_mode(ell, emm)
435 waveform_lm_re = wfm_array[:, 1]
436 waveform_lm_im = wfm_array[:, 2]
437 waveform_lm = waveform_lm_re + 1j * waveform_lm_im
438 # Get the waveform phase.
439 phase_lm = np.unwrap(np.angle(waveform_lm))
440 return phase_lm
442 def _compute_reference_time(self):
443 """Obtain the reference time from the
444 simulation data. Interpolate and get the reference
445 time if only reference frequency is given"""
447 # To get from available data
449 with h5py.File(self.filepath) as h5_file:
450 # First, check if interp is required and get the available reference time .
451 interp, avail_t_ref = check_interp_req(
452 h5_file, self.sim_metadata, ref_time=None
453 )
455 if avail_t_ref is None:
456 ref_omega = None
457 # If the reference time is not available,
458 # compute from reference phase!
459 # Omega is the key of interest.
460 try:
461 ref_omega = get_ref_vals(self.sim_metadata, req_attrs=["Omega"])[
462 "Omega"
463 ]
464 except Exception as excep:
465 print(
466 "Reference orbital phase not found in simulation metadata."
467 "Proceeding to retrieve from the h5 file..",
468 excep,
469 )
470 with h5py.File(self.filepath) as h5_file:
471 ref_omega = get_ref_vals(h5_file, req_attrs=["Omega"])["Omega"]
472 if ref_omega is None:
473 raise KeyError("Could not compute reference omega!")
475 nr_orb_phase_ts = self._get_phase(2, 2) / 2
477 # Differentiate the phase to get orbital angular frequency
478 from waveformtools.differentiate import derivative
480 nr_omega_ts = derivative(self.time, nr_orb_phase_ts, method="FD", degree=2)
481 # Identify the location in time where nr_omega = ref_omega
482 ref_loc = np.argmin(np.absolute(nr_omega_ts - ref_omega))
483 avail_t_ref = self.time[ref_loc]
485 self._t_ref_nr = avail_t_ref
487 # print(avail_t_ref, self._t_ref_nr)
488 return avail_t_ref
490 @property
491 def t_ref_nr(self):
492 """Fetch the reference time of a simulation"""
494 if not isinstance(self._t_ref_nr, float):
495 print("Computing reference time..")
496 self._compute_reference_time()
498 return self._t_ref_nr
501def interpolate_in_amp_phase(obj, new_time, k=3, kind=None):
502 """Interpolate in amplitude and phase
503 using a variety of interpolation methods.
505 Paramters
506 ---------
507 obj: sxs.TimeSeries
508 The TimeSeries object that holds the complex
509 waveform to be interpolated.
510 new_time: array_like
511 The new time axis to interpolate onto.
513 k: int, optional
514 The order of interpolation when
515 `scipy.interpolated.InterpolatedUnivariateSpline` is used.
516 This gets preference over `kind` parameter when both are
517 specified. The default is 3.
519 kind: str, optional
520 The interpolation kind parameter when `scipy.interpolate.interp1d`
521 is used. Can be `linear`, `quadratic` or `cubic` for`scipy.interpolate.interp1d`,
522 or 'CubicSpline' to use `scipy.interpolate.CubicSpline`. Default is None
523 i.e. the parameter `k` will be used instead.
524 See Also
525 --------
526 waveformtools.waveformtools.interp_resam_wfs :
527 The function that interpolates in amplitude
528 and phases using scipy interpolators.
530 scipy.interpolate.CubicSpline:
531 One of the possible methods that can
532 be used for interpolation.
533 scipy.interpolate.interp1d:
534 Can be used in linear, quadratic and cubic mode.
535 scipy.interpolate.InterpolatedUnivariateSpline:
536 Can be used with orders k from 1 to 5.
538 Notes
539 -----
540 These interpolation methods ensure that the
541 interpolated function passes through all the
542 data points.
543 """
544 from waveformtools.waveformtools import interp_resam_wfs
546 resam_data = interp_resam_wfs(
547 wavf_data=np.array(obj),
548 old_taxis=obj.time,
549 new_taxis=new_time,
550 k=k,
551 kind=kind,
552 )
554 resam_data = SXSTimeSeries(resam_data, new_time)
556 metadata = obj._metadata.copy()
557 metadata["time"] = new_time
558 metadata["time_axis"] = obj.time_axis
560 return type(obj)(resam_data, **metadata)