Coverage for /home/runner/work/nr-catalog-tools/nr-catalog-tools/nrcatalogtools/waveform.py: 90%

184 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-01 05:18 +0000

1import os 

2 

3import h5py 

4import lal 

5import numpy as np 

6from pycbc.types import TimeSeries 

7from pycbc.waveform import frequency_from_polarizations 

8from scipy.interpolate import InterpolatedUnivariateSpline 

9from scipy.stats import mode as stat_mode 

10 

11from nrcatalogtools import utils 

12from nrcatalogtools.lvc import ( 

13 check_interp_req, 

14 get_nr_to_lal_rotation_angles, 

15 get_ref_vals, 

16) 

17from sxs import WaveformModes as sxs_WaveformModes 

18from sxs.time_series import TimeSeries as SXSTimeSeries 

19from sxs.waveforms.nrar import ( 

20 h, 

21 translate_data_type_to_spin_weight, 

22 translate_data_type_to_sxs_string, 

23) 

24 

25 

26class WaveformModes(sxs_WaveformModes): 

27 def __new__( 

28 cls, 

29 data, 

30 time=None, 

31 time_axis=0, 

32 modes_axis=1, 

33 ell_min=2, 

34 ell_max=4, 

35 verbosity=0, 

36 **w_attributes, 

37 ) -> None: 

38 self = super().__new__( 

39 cls, 

40 data, 

41 time=time, 

42 time_axis=time_axis, 

43 modes_axis=modes_axis, 

44 ell_min=ell_min, 

45 ell_max=ell_max, 

46 **w_attributes, 

47 ) 

48 self._t_ref_nr = None 

49 self._filepath = None 

50 self.verbosity = verbosity 

51 return self 

52 

53 @classmethod 

54 def _load( 

55 cls, 

56 data, 

57 time=None, 

58 time_axis=0, 

59 modes_axis=1, 

60 ell_min=2, 

61 ell_max=4, 

62 verbosity=0, 

63 **w_attributes, 

64 ): 

65 if time is None: 

66 time = np.arange(0, len(data[:, 0])) 

67 return cls( 

68 data, 

69 time=time, 

70 time_axis=time_axis, 

71 modes_axis=modes_axis, 

72 ell_min=ell_min, 

73 ell_max=ell_max, 

74 verbosity=verbosity, 

75 **w_attributes, 

76 ) 

77 

78 @classmethod 

79 def load_from_h5(cls, file_path_or_open_file, metadata={}, verbosity=0): 

80 """Method to load SWSH waveform modes from RIT or MAYA catalogs 

81 from HDF5 file. 

82 

83 Args: 

84 file_path_or_open_file (str or open file): Either the path to an 

85 HDF5 file containing waveform data, or an open file pointer to 

86 the same. 

87 metadata (dict): Dictionary containing metadata (Note that keys 

88 will be NR group specific) 

89 verbosity (int, optional): Verbosity level with which to 

90 print messages during execution. Defaults to 0. 

91 

92 Raises: 

93 RuntimeError: If inputs are invalid, or if no mode found in 

94 input file. 

95 

96 Returns: 

97 WaveformModes: Object containing time-series of SWSH modes. 

98 """ 

99 import quaternionic 

100 

101 if type(file_path_or_open_file) == h5py._hl.files.File: 

102 h5_file = file_path_or_open_file 

103 close_input_file = False 

104 elif os.path.exists(file_path_or_open_file): 

105 h5_file = h5py.File(file_path_or_open_file, "r") 

106 close_input_file = True 

107 else: 

108 raise RuntimeError(f"Could not use or open {file_path_or_open_file}") 

109 

110 # Set the file path attribute 

111 cls._filepath = h5_file.filename 

112 # If _metadata is not already 

113 # a set attribute, then set 

114 # it here. 

115 

116 try: 

117 cls._metadata 

118 except AttributeError: 

119 cls._metadata = metadata 

120 

121 ELL_MIN, ELL_MAX = 2, 10 

122 ell_min, ell_max = 99, -1 

123 LM = [] 

124 t_min, t_max, dt = -1e99, 1e99, 1 

125 mode_data = {} 

126 for ell in range(ELL_MIN, ELL_MAX + 1): 

127 for em in range(-ell, ell + 1): 

128 afmt = f"amp_l{ell}_m{em}" 

129 pfmt = f"phase_l{ell}_m{em}" 

130 if afmt not in h5_file or pfmt not in h5_file: 

131 continue 

132 amp_time = h5_file[afmt]["X"][:] 

133 amp = h5_file[afmt]["Y"][:] 

134 phase_time = h5_file[pfmt]["X"][:] 

135 phase = h5_file[pfmt]["Y"][:] 

136 mode_data[(ell, em)] = [amp_time, amp, phase_time, phase] 

137 # get the minimum time and maximum time stamps for all modes 

138 t_min = max(t_min, amp_time[0], phase_time[0]) 

139 t_max = min(t_max, amp_time[-1], phase_time[-1]) 

140 dt = min( 

141 dt, 

142 stat_mode(np.diff(amp_time), keepdims=True)[0][0], 

143 stat_mode(np.diff(phase_time), keepdims=True)[0][0], 

144 ) 

145 ell_min = min(ell_min, ell) 

146 ell_max = max(ell_max, ell) 

147 LM.append([ell, em]) 

148 if close_input_file: 

149 h5_file.close() 

150 if len(LM) == 0: 

151 raise RuntimeError( 

152 "We did not find even one mode in the file. Perhaps the " 

153 "format `amp_l?_m?` and `phase_l?_m?` is not the " 

154 "nomenclature of datagroups in the input file?" 

155 ) 

156 

157 times = np.arange(t_min, t_max + 0.5 * dt, dt) 

158 data = np.empty((len(times), len(LM)), dtype=complex) 

159 for idx, (ell, em) in enumerate(LM): 

160 amp_time, amp, phase_time, phase = mode_data[(ell, em)] 

161 amp_interp = InterpolatedUnivariateSpline(amp_time, amp) 

162 phase_interp = InterpolatedUnivariateSpline(phase_time, phase) 

163 data[:, idx] = amp_interp(times) * np.exp(1j * phase_interp(times)) 

164 

165 w_attributes = {} 

166 w_attributes["metadata"] = metadata 

167 w_attributes["history"] = "" 

168 w_attributes["frame"] = quaternionic.array([[1.0, 0.0, 0.0, 0.0]]) 

169 w_attributes["frame_type"] = "inertial" 

170 w_attributes["data_type"] = h 

171 w_attributes["spin_weight"] = translate_data_type_to_spin_weight( 

172 w_attributes["data_type"] 

173 ) 

174 w_attributes["data_type"] = translate_data_type_to_sxs_string( 

175 w_attributes["data_type"] 

176 ) 

177 w_attributes["r_is_scaled_out"] = True 

178 w_attributes["m_is_scaled_out"] = True 

179 # w_attributes["ells"] = ell_min, ell_max 

180 

181 return cls( 

182 data, 

183 time=times, 

184 time_axis=0, 

185 modes_axis=1, 

186 ell_min=ell_min, 

187 ell_max=ell_max, 

188 verbosity=verbosity, 

189 **w_attributes, 

190 ) 

191 

192 @property 

193 def filepath(self): 

194 """Return the data file path""" 

195 if not self._filepath: 

196 self._filepath = self.sim_metadata["waveform_data_location"] 

197 

198 return self._filepath 

199 

200 @property 

201 def sim_metadata(self): 

202 """Return the simulation metadata dictionary""" 

203 return self._metadata["metadata"] 

204 

205 def get_mode(self, ell, em): 

206 return self[f"Y_l{ell}_m{em}.dat"] 

207 

208 @property 

209 def f_lower_at_1Msun(self): 

210 mode22 = self.get_mode(2, 2) 

211 fr22 = frequency_from_polarizations( 

212 TimeSeries(mode22[:, 1], delta_t=np.diff(self.time)[0]), 

213 TimeSeries(-1 * mode22[:, 2], delta_t=np.diff(self.time)[0]), 

214 ) 

215 return fr22[0] / lal.MTSUN_SI 

216 

217 def get_polarizations( 

218 self, inclination, coa_phase, f_ref=None, t_ref=None, tol=1e-6 

219 ): 

220 """Sum over modes data and return plus and cross GW polarizations 

221 

222 Args: 

223 inclination (float): Inclination angle between the line-of-sight 

224 orbital angular momentum vector [radians] 

225 coa_phase (float): Coalesence orbital phase [radians] 

226 tol (float, optional) : The tolerance to allow for 

227 floating point precision errors 

228 in the computation of rotation 

229 angles. Default value is 1e-6. 

230 

231 Returns: 

232 Tuple(numpy.ndarray): Numpy Arrays containing polarizations 

233 time-series 

234 """ 

235 

236 # Get angles 

237 angles = self.get_angles(inclination, coa_phase, f_ref, t_ref, tol) 

238 

239 polarizations = self.evaluate([angles["theta"], angles["psi"], angles["alpha"]]) 

240 

241 return polarizations 

242 

243 def get_td_waveform( 

244 self, 

245 total_mass, 

246 distance, 

247 inclination, 

248 coa_phase, 

249 delta_t=None, 

250 f_ref=None, 

251 t_ref=None, 

252 k=3, 

253 kind=None, 

254 tol=1e-6, 

255 ): 

256 """Sum over modes data and return plus and cross GW polarizations, 

257 rescaled appropriately for a compact-object binary with given 

258 total mass and distance from GW detectors. 

259 

260 Returns: 

261 Tuple(numpy.ndarray): Numpy Arrays containing polarizations 

262 time-series 

263 

264 Args: 

265 total_mass (_type_): _description_ 

266 distance (_type_): _description_ 

267 inclination (float): Inclination angle between the line-of-sight 

268 orbital angular momentum vector [radians] 

269 coa_phase (float): Coalesence orbital phase [radians] 

270 delta_t (_type_, optional): _description_. Defaults to None. 

271 f_ref (float, optional) : The reference frequency. 

272 t_ref (float, optional) : The reference time. 

273 k (int, optional) : The interpolation order to use with 

274 `scipy.interpolate.InterpolatedUnivariateSpline`. 

275 This is the method used by default with value 3. 

276 This parameter `k` is given preference over 

277 `kind` (see below). 

278 kind (str, optional) : The interpolation order to use with 

279 `scipy.interpolate.interp1d` 

280 (`linear`, `quadratic`, `cubic`) or 

281 `CubicSpline` to use `scipy.interpolate.CubicSpline`. 

282 tol (float, optional) : The tolerance to allow for 

283 floating point precision errors 

284 in the computation of rotation 

285 angles. Default value is 1e-6. 

286 Returns: 

287 pycbc.TimeSeries(numpy.complex128): Complex polarizations 

288 stored in `pycbc` container `TimeSeries` 

289 """ 

290 if delta_t is None: 

291 delta_t = stat_mode(np.diff(self.time), keepdims=True)[0][0] 

292 m_secs = utils.time_to_physical(total_mass) 

293 # we assume that we generally do not sample at a rate below 128Hz. 

294 # Therefore, depending on the numerical value of dt, we deduce whether 

295 # dt is in dimensionless units or in seconds. 

296 if delta_t > 1.0 / 128: 

297 new_time = np.arange(min(self.time), max(self.time), delta_t) 

298 else: 

299 new_time = np.arange(min(self.time), max(self.time), delta_t / m_secs) 

300 

301 # Get angles 

302 angles = self.get_angles( 

303 inclination=inclination, 

304 coa_phase=coa_phase, 

305 f_ref=f_ref, 

306 t_ref=t_ref, 

307 tol=tol, 

308 ) 

309 h = interpolate_in_amp_phase( 

310 self.evaluate([angles["theta"], angles["psi"], angles["alpha"]]), 

311 new_time, 

312 k=k, 

313 kind=kind, 

314 ) * utils.amp_to_physical(total_mass, distance) 

315 

316 h.time *= m_secs 

317 # Return conjugated waveform to comply with lal 

318 return self.to_pycbc(np.conjugate(h)) 

319 

320 def get_angles(self, inclination, coa_phase, f_ref=None, t_ref=None, tol=1e-6): 

321 """Get the inclination, azimuthal and polarization angles 

322 of the observer in the NR source frame. 

323 

324 Parameters 

325 ---------- 

326 inclination : float 

327 The inclination angle of the observer 

328 in the LAL source frame 

329 coa_phase : float 

330 The coalescence phase. This will be 

331 used to compute the reference orbital phase. 

332 f_ref, t_ref : float, optional 

333 The reference frquency and time to define the LAL source frame. 

334 Defaults to the available frequency in the data file. 

335 

336 tol (float, optional) : The tolerance to allow for 

337 floating point precision errors 

338 in the computation of rotation 

339 angles. Default value is 1e-6. 

340 Returns 

341 ------- 

342 angles : dict 

343 angles : dict 

344 The angular corrdinates Theta, Psi, and the rotation angle Alpha. 

345 If available, this also contains the reference time and frequency. 

346 """ 

347 

348 # Get observer phi_ref 

349 obs_phi_ref = self.get_obs_phi_ref_from_obs_coa_phase( 

350 coa_phase=coa_phase, t_ref=t_ref, f_ref=f_ref 

351 ) 

352 

353 # Compute angles 

354 with h5py.File(self.filepath) as h5_file: 

355 angles = get_nr_to_lal_rotation_angles( 

356 h5_file=h5_file, 

357 sim_metadata=self.sim_metadata, 

358 inclination=inclination, 

359 phi_ref=obs_phi_ref, 

360 f_ref=f_ref, 

361 t_ref=t_ref, 

362 tol=tol, 

363 ) 

364 

365 return angles 

366 

367 def to_pycbc(self, input_array=None): 

368 if input_array is None: 

369 input_array = self 

370 

371 delta_t = stat_mode(np.diff(input_array.time), keepdims=True)[0][0] 

372 return TimeSeries( 

373 np.array(input_array), 

374 delta_t=delta_t, 

375 dtype=self.ndarray.dtype, 

376 epoch=input_array.time[0], 

377 copy=True, 

378 ) 

379 

380 def get_nr_coa_phase(self): 

381 """Get the NR coalescence orbital phase from the 2,2 mode.""" 

382 

383 # Get the waveform phase. 

384 phase_22 = self._get_phase(2, 2) 

385 

386 waveform_22 = self.get_mode(2, 2)[:, 1] + 1j * self.get_mode(2, 2)[:, 2] 

387 

388 # print(len(phase_22), len(waveform_22)) 

389 # Get the localtion of max amplitude. 

390 

391 maxloc = np.argmax(np.absolute(waveform_22)) 

392 # Compute the orbital phase at max amplitude. 

393 coa_phase = phase_22[maxloc] / 2 

394 

395 return coa_phase 

396 

397 def get_obs_phi_ref_from_obs_coa_phase(self, coa_phase, t_ref=None, f_ref=None): 

398 """Get the observer reference phase given the observer 

399 coalescence phase.""" 

400 

401 # Get the NR coalescence phase 

402 nr_coa_phase = self.get_nr_coa_phase() 

403 # Get the NR orbital phasing series 

404 nr_orb_phase_ts = self._get_phase(2, 2) / 2 

405 

406 # Compute the observer reference phase from 

407 # this information. 

408 

409 avail_t_ref = self.t_ref_nr 

410 

411 # Second, get the NR reference phase 

412 from scipy.interpolate import interp1d 

413 

414 nr_phi_ref = interp1d(self.time, nr_orb_phase_ts, kind="cubic")(avail_t_ref) 

415 

416 # Third, compute the offset in coa_phase 

417 delta_phi_ref = coa_phase - nr_coa_phase 

418 

419 # Finally compute the obserer reference phase at NR reference time. 

420 obs_phi_ref = nr_phi_ref + delta_phi_ref 

421 

422 return obs_phi_ref 

423 

424 def to_lal(self): 

425 raise NotImplementedError() 

426 

427 def to_astropy(self): 

428 return self.to_pycbc().to_astropy() 

429 

430 def _get_phase(self, ell=2, emm=2): 

431 """Get the phasing of a particular waveform mode.""" 

432 

433 # Get the complex waveform. 

434 wfm_array = self.get_mode(ell, emm) 

435 waveform_lm_re = wfm_array[:, 1] 

436 waveform_lm_im = wfm_array[:, 2] 

437 waveform_lm = waveform_lm_re + 1j * waveform_lm_im 

438 # Get the waveform phase. 

439 phase_lm = np.unwrap(np.angle(waveform_lm)) 

440 return phase_lm 

441 

442 def _compute_reference_time(self): 

443 """Obtain the reference time from the 

444 simulation data. Interpolate and get the reference 

445 time if only reference frequency is given""" 

446 

447 # To get from available data 

448 

449 with h5py.File(self.filepath) as h5_file: 

450 # First, check if interp is required and get the available reference time . 

451 interp, avail_t_ref = check_interp_req( 

452 h5_file, self.sim_metadata, ref_time=None 

453 ) 

454 

455 if avail_t_ref is None: 

456 ref_omega = None 

457 # If the reference time is not available, 

458 # compute from reference phase! 

459 # Omega is the key of interest. 

460 try: 

461 ref_omega = get_ref_vals(self.sim_metadata, req_attrs=["Omega"])[ 

462 "Omega" 

463 ] 

464 except Exception as excep: 

465 print( 

466 "Reference orbital phase not found in simulation metadata." 

467 "Proceeding to retrieve from the h5 file..", 

468 excep, 

469 ) 

470 with h5py.File(self.filepath) as h5_file: 

471 ref_omega = get_ref_vals(h5_file, req_attrs=["Omega"])["Omega"] 

472 if ref_omega is None: 

473 raise KeyError("Could not compute reference omega!") 

474 

475 nr_orb_phase_ts = self._get_phase(2, 2) / 2 

476 

477 # Differentiate the phase to get orbital angular frequency 

478 from waveformtools.differentiate import derivative 

479 

480 nr_omega_ts = derivative(self.time, nr_orb_phase_ts, method="FD", degree=2) 

481 # Identify the location in time where nr_omega = ref_omega 

482 ref_loc = np.argmin(np.absolute(nr_omega_ts - ref_omega)) 

483 avail_t_ref = self.time[ref_loc] 

484 

485 self._t_ref_nr = avail_t_ref 

486 

487 # print(avail_t_ref, self._t_ref_nr) 

488 return avail_t_ref 

489 

490 @property 

491 def t_ref_nr(self): 

492 """Fetch the reference time of a simulation""" 

493 

494 if not isinstance(self._t_ref_nr, float): 

495 print("Computing reference time..") 

496 self._compute_reference_time() 

497 

498 return self._t_ref_nr 

499 

500 

501def interpolate_in_amp_phase(obj, new_time, k=3, kind=None): 

502 """Interpolate in amplitude and phase 

503 using a variety of interpolation methods. 

504 

505 Paramters 

506 --------- 

507 obj: sxs.TimeSeries 

508 The TimeSeries object that holds the complex 

509 waveform to be interpolated. 

510 new_time: array_like 

511 The new time axis to interpolate onto. 

512 

513 k: int, optional 

514 The order of interpolation when 

515 `scipy.interpolated.InterpolatedUnivariateSpline` is used. 

516 This gets preference over `kind` parameter when both are 

517 specified. The default is 3. 

518 

519 kind: str, optional 

520 The interpolation kind parameter when `scipy.interpolate.interp1d` 

521 is used. Can be `linear`, `quadratic` or `cubic` for`scipy.interpolate.interp1d`, 

522 or 'CubicSpline' to use `scipy.interpolate.CubicSpline`. Default is None 

523 i.e. the parameter `k` will be used instead. 

524 See Also 

525 -------- 

526 waveformtools.waveformtools.interp_resam_wfs : 

527 The function that interpolates in amplitude 

528 and phases using scipy interpolators. 

529 

530 scipy.interpolate.CubicSpline: 

531 One of the possible methods that can 

532 be used for interpolation. 

533 scipy.interpolate.interp1d: 

534 Can be used in linear, quadratic and cubic mode. 

535 scipy.interpolate.InterpolatedUnivariateSpline: 

536 Can be used with orders k from 1 to 5. 

537 

538 Notes 

539 ----- 

540 These interpolation methods ensure that the 

541 interpolated function passes through all the 

542 data points. 

543 """ 

544 from waveformtools.waveformtools import interp_resam_wfs 

545 

546 resam_data = interp_resam_wfs( 

547 wavf_data=np.array(obj), 

548 old_taxis=obj.time, 

549 new_taxis=new_time, 

550 k=k, 

551 kind=kind, 

552 ) 

553 

554 resam_data = SXSTimeSeries(resam_data, new_time) 

555 

556 metadata = obj._metadata.copy() 

557 metadata["time"] = new_time 

558 metadata["time_axis"] = obj.time_axis 

559 

560 return type(obj)(resam_data, **metadata)