Coverage for /home/runner/work/nr-catalog-tools/nr-catalog-tools/nrcatalogtools/maya.py: 72%

170 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-01 05:18 +0000

1import collections 

2import functools 

3import os 

4import zipfile 

5 

6import pandas as pd 

7 

8from nrcatalogtools import catalog, utils 

9 

10 

class MayaCatalog(catalog.CatalogBase):
    """Catalog of MAYA (Georgia Tech) numerical-relativity simulations.

    Wraps the scraped MAYA catalog metadata and knows where waveform/metadata
    files live on disk and on the web (locations come from
    ``utils.maya_catalog_info``).
    """

    def __init__(self, catalog=None, use_cache=True, verbosity=0, **kwargs) -> None:
        """Initialize from an existing catalog dict, or scrape/load one.

        Parameters
        ----------
        catalog : dict, optional
            Pre-built catalog dictionary. When None, ``load()`` is invoked
            (possibly downloading the catalog) to obtain one.
        use_cache : bool
            Whether downloads should prefer locally cached files.
        verbosity : int
            Verbosity level; values > 2 enable diagnostic printing.
        **kwargs
            Forwarded to ``load()`` when ``catalog`` is None.
        """
        if catalog is None:
            loaded = type(self).load(verbosity=verbosity, **kwargs)
            super().__init__(loaded._dict)
        else:
            super().__init__(catalog)
        self._verbosity = verbosity
        self._dict["catalog_file_description"] = "scraped from website"
        self._dict["modified"] = {}
        self._dict["records"] = {}

        # Other info
        self.num_of_sims = 0
        self.catalog_url = utils.maya_catalog_info["url"]
        self.cache_dir = utils.maya_catalog_info["cache_dir"]
        self.use_cache = use_cache

        self.metadata = pd.DataFrame.from_dict({})
        self.metadata_url = utils.maya_catalog_info["metadata_url"]
        self.metadata_dir = utils.maya_catalog_info["metadata_dir"]

        self.waveform_data = {}
        self.waveform_data_url = utils.maya_catalog_info["data_url"]
        self.waveform_data_dir = utils.maya_catalog_info["data_dir"]

        self._add_paths_to_metadata()

        # Make sure all working directories exist before any file I/O.
        for required_dir in (self.cache_dir, self.metadata_dir, self.waveform_data_dir):
            required_dir.mkdir(parents=True, exist_ok=True)

43 @classmethod 

44 @functools.lru_cache() 

45 def load(cls, download=None, verbosity=0): 

46 progress = True 

47 utils.maya_catalog_info["cache_dir"].mkdir(parents=True, exist_ok=True) 

48 catalog_url = utils.maya_catalog_info["metadata_url"] 

49 cache_path = utils.maya_catalog_info["cache_dir"] / "catalog.zip" 

50 if cache_path.exists(): 

51 if_newer = cache_path 

52 else: 

53 if_newer = False 

54 

55 if download or download is None: 

56 # 1. Download the full txt file (zipped in flight, but auto-decompressed on arrival) 

57 # 2. Zip to a temporary file (using bzip2, which is better than the in-flight compression) 

58 # 3. Replace the original catalog.zip with the temporary zip file 

59 # 4. Remove the full txt file 

60 # 5. Make sure the temporary zip file is gone too 

61 temp_txt = cache_path.with_suffix(".temp.txt") 

62 temp_zip = cache_path.with_suffix(".temp.zip") 

63 try: 

64 try: 

65 utils.download_file( 

66 catalog_url, temp_txt, progress=progress, if_newer=if_newer 

67 ) 

68 except Exception as e: 

69 if download: 

70 raise RuntimeError( 

71 f"Failed to download '{catalog_url}'; try setting `download=False`" 

72 ) from e 

73 download_failed = e # We'll try the cache 

74 else: 

75 download_failed = False 

76 if temp_txt.exists(): 

77 with zipfile.ZipFile( 

78 temp_zip, "w", compression=zipfile.ZIP_BZIP2 

79 ) as catalog_zip: 

80 catalog_zip.write(temp_txt, arcname="catalog.txt") 

81 temp_zip.replace(cache_path) 

82 finally: 

83 # The `missing_ok` argument to `unlink` would be much nicer, but was added in python 3.8 

84 try: 

85 temp_txt.unlink() 

86 except FileNotFoundError: 

87 pass 

88 try: 

89 temp_zip.unlink() 

90 except FileNotFoundError: 

91 pass 

92 

93 if not cache_path.exists(): 

94 if download_failed: 

95 raise ValueError( 

96 f"Catalog not found in '{cache_path}' and download failed" 

97 ) from download_failed 

98 elif ( 

99 download is False 

100 ): # Test if it literally *is* False, rather than just casts to False 

101 raise ValueError( 

102 f"The catalog was not found in '{cache_path}', and downloading was turned off" 

103 ) 

104 else: 

105 raise ValueError( 

106 f"Catalog not found in '{cache_path}' for unknown reasons" 

107 ) 

108 

109 try: 

110 with zipfile.ZipFile(cache_path, "r") as catalog_zip: 

111 try: 

112 with catalog_zip.open("catalog.txt") as catalog_txt: 

113 try: 

114 catalog_df = ( 

115 pd.read_table( 

116 catalog_txt, 

117 sep="|", 

118 header=0, 

119 index_col=1, 

120 skipinitialspace=True, 

121 ) 

122 .dropna(axis=1, how="all") 

123 .iloc[1:] 

124 ) 

125 except Exception as e: 

126 raise ValueError( 

127 f"Failed to parse 'catalog.json' in '{cache_path}'" 

128 ) from e 

129 except Exception as e: 

130 raise ValueError( 

131 f"Failed to open 'catalog.txt' in '{cache_path}'" 

132 ) from e 

133 except Exception as e: 

134 raise ValueError(f"Failed to open '{cache_path}' as a ZIP file") from e 

135 

136 # Fill in the catalog object 

137 catalog_dict = {} 

138 catalog_dict["GTID"] = [s.strip() for s in list(catalog_df.index)] 

139 

140 for col_name in catalog_df.columns: 

141 column = list(catalog_df[col_name]) 

142 if "GT_Tag" in col_name: 

143 catalog_dict["GT_Tag"] = [s.strip() for s in column] 

144 else: 

145 catalog_dict[col_name.strip()] = [ 

146 float(s.strip().replace("-", "NAN")) if type(s) == str else float(s) 

147 for s in column 

148 ] 

149 catalog_df = pd.DataFrame(catalog_dict) 

150 catalog = {} 

151 simulations = {} 

152 for idx, row in catalog_df.iterrows(): 

153 name = row["GTID"] 

154 metadata_dict = row.to_dict() 

155 simulations[name] = metadata_dict 

156 catalog["simulations"] = simulations 

157 return cls(catalog=catalog, verbosity=verbosity) 

158 

159 def _add_paths_to_metadata(self): 

160 metadata_dict = self._dict["simulations"] 

161 existing_cols = list(metadata_dict[list(metadata_dict.keys())[0]].keys()) 

162 new_cols = [ 

163 "metadata_link", 

164 "metadata_location", 

165 "waveform_data_link", 

166 "waveform_data_location", 

167 ] 

168 

169 if any([col not in existing_cols for col in new_cols]): 

170 for sim_name in metadata_dict: 

171 if "metadata_location" not in existing_cols: 

172 metadata_dict[sim_name][ 

173 "metadata_location" 

174 ] = self.metadata_filepath_from_simname(sim_name) 

175 if "metadata_link" not in existing_cols: 

176 metadata_dict[sim_name]["metadata_link"] = self.metadata_url 

177 if "waveform_data_link" not in existing_cols: 

178 metadata_dict[sim_name]["waveform_data_link"] = ( 

179 self.waveform_data_url + "/" + f"{sim_name}.h5" 

180 ) 

181 if "waveform_data_location" not in existing_cols: 

182 metadata_dict[sim_name][ 

183 "waveform_data_location" 

184 ] = self.waveform_filepath_from_simname(sim_name) 

185 

186 @property 

187 @functools.lru_cache() 

188 def simulations_dataframe(self): 

189 df = pd.DataFrame(self.simulations).transpose() 

190 df.rename(columns={"GTID": "simulation_name"}, inplace=True) 

191 return df 

192 

193 @property 

194 @functools.lru_cache() 

195 def files(self): 

196 """Map of all file names to the corresponding file info""" 

197 file_infos = {} 

198 for _, row in self.simulations_dataframe.iterrows(): 

199 waveform_data_location = row["waveform_data_location"] 

200 path_str = os.path.basename(waveform_data_location) 

201 if os.path.exists(waveform_data_location): 

202 file_size = os.path.getsize(waveform_data_location) 

203 else: 

204 file_size = 0 

205 file_info = { 

206 "checksum": None, 

207 "filename": os.path.basename(waveform_data_location), 

208 "filesize": file_size, 

209 "download": row["waveform_data_link"], 

210 } 

211 file_infos[path_str] = file_info 

212 

213 unique_files = collections.defaultdict(list) 

214 for k, v in file_infos.items(): 

215 unique_files[f"{v['checksum']}{v['filesize']}"].append(k) 

216 

217 original_paths = {k: min(v) for k, v in unique_files.items()} 

218 

219 for v in file_infos.values(): 

220 v["truepath"] = original_paths[f"{v['checksum']}{v['filesize']}"] 

221 

222 return file_infos 

223 

224 def waveform_filename_from_simname(self, sim_name): 

225 return sim_name + ".h5" 

226 

227 def waveform_filepath_from_simname(self, sim_name): 

228 file_path = self.waveform_data_dir / self.waveform_filename_from_simname( 

229 sim_name 

230 ) 

231 if not os.path.exists(file_path): 

232 if self._verbosity > 2: 

233 print( 

234 f"WARNING: Could not resolve path for {sim_name}" 

235 f"..best calculated path = {file_path}" 

236 ) 

237 return file_path.as_posix() 

238 

239 def waveform_url_from_simname(self, sim_name): 

240 return ( 

241 self.waveform_data_url + "/" + self.waveform_filename_from_simname(sim_name) 

242 ) 

243 

244 def metadata_filename_from_simname(self, sim_name): 

245 return os.path.basename(self.metadata_filepath_from_simname(sim_name)) 

246 

247 def metadata_filepath_from_simname(self, sim_name, ext="txt"): 

248 return str(self.metadata_dir / f"{sim_name}.{ext}") 

249 

250 def download_waveform_data(self, sim_name, use_cache=None): 

251 if use_cache is None: 

252 use_cache = self.use_cache 

253 file_name = self.waveform_filename_from_simname(sim_name) 

254 file_path_web = self.waveform_data_url + "/" + file_name 

255 local_file_path = self.waveform_data_dir / file_name 

256 if ( 

257 use_cache 

258 and os.path.exists(local_file_path) 

259 and os.path.getsize(local_file_path) > 0 

260 ): 

261 if self._verbosity > 2: 

262 print("...can read from cache: {}".format(str(local_file_path))) 

263 pass 

264 elif os.path.exists(local_file_path) and os.path.getsize(local_file_path) > 0: 

265 pass 

266 else: 

267 if self._verbosity > 2: 

268 print("...writing to cache: {}".format(str(local_file_path))) 

269 if utils.url_exists(file_path_web): 

270 if self._verbosity > 2: 

271 print("...downloading {}".format(file_path_web)) 

272 utils.download_file(file_path_web, local_file_path) 

273 else: 

274 if self._verbosity > 2: 

275 print( 

276 "... ... but couldnt find link: {}".format(str(file_path_web)) 

277 )