Coverage for /home/runner/work/nr-catalog-tools/nr-catalog-tools/nrcatalogtools/maya.py: 72%
170 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-01 05:18 +0000
1import collections
2import functools
3import os
4import zipfile
6import pandas as pd
8from nrcatalogtools import catalog, utils
class MayaCatalog(catalog.CatalogBase):
    """Catalog interface for the Georgia Tech MAYA NR waveform catalog."""

    def __init__(self, catalog=None, use_cache=True, verbosity=0, **kwargs) -> None:
        """Initialize the catalog, loading metadata if none is supplied.

        Parameters
        ----------
        catalog : dict, optional
            Pre-built catalog dictionary. When omitted, the catalog is
            obtained via :meth:`load`.
        use_cache : bool
            Whether downloads may be satisfied from the local cache.
        verbosity : int
            Verbosity level; informational messages print for values > 2.
        """
        if catalog is None:
            loaded = type(self).load(verbosity=verbosity, **kwargs)
            super().__init__(loaded._dict)
        else:
            super().__init__(catalog)
        self._verbosity = verbosity
        self._dict["catalog_file_description"] = "scraped from website"
        self._dict["modified"] = {}
        self._dict["records"] = {}

        # Other info
        self.num_of_sims = 0
        self.catalog_url = utils.maya_catalog_info["url"]
        self.cache_dir = utils.maya_catalog_info["cache_dir"]
        self.use_cache = use_cache

        self.metadata = pd.DataFrame.from_dict({})
        self.metadata_url = utils.maya_catalog_info["metadata_url"]
        self.metadata_dir = utils.maya_catalog_info["metadata_dir"]

        self.waveform_data = {}
        self.waveform_data_url = utils.maya_catalog_info["data_url"]
        self.waveform_data_dir = utils.maya_catalog_info["data_dir"]

        self._add_paths_to_metadata()

        # Make sure every cache directory exists on disk.
        for directory in (self.cache_dir, self.metadata_dir, self.waveform_data_dir):
            directory.mkdir(parents=True, exist_ok=True)
43 @classmethod
44 @functools.lru_cache()
45 def load(cls, download=None, verbosity=0):
46 progress = True
47 utils.maya_catalog_info["cache_dir"].mkdir(parents=True, exist_ok=True)
48 catalog_url = utils.maya_catalog_info["metadata_url"]
49 cache_path = utils.maya_catalog_info["cache_dir"] / "catalog.zip"
50 if cache_path.exists():
51 if_newer = cache_path
52 else:
53 if_newer = False
55 if download or download is None:
56 # 1. Download the full txt file (zipped in flight, but auto-decompressed on arrival)
57 # 2. Zip to a temporary file (using bzip2, which is better than the in-flight compression)
58 # 3. Replace the original catalog.zip with the temporary zip file
59 # 4. Remove the full txt file
60 # 5. Make sure the temporary zip file is gone too
61 temp_txt = cache_path.with_suffix(".temp.txt")
62 temp_zip = cache_path.with_suffix(".temp.zip")
63 try:
64 try:
65 utils.download_file(
66 catalog_url, temp_txt, progress=progress, if_newer=if_newer
67 )
68 except Exception as e:
69 if download:
70 raise RuntimeError(
71 f"Failed to download '{catalog_url}'; try setting `download=False`"
72 ) from e
73 download_failed = e # We'll try the cache
74 else:
75 download_failed = False
76 if temp_txt.exists():
77 with zipfile.ZipFile(
78 temp_zip, "w", compression=zipfile.ZIP_BZIP2
79 ) as catalog_zip:
80 catalog_zip.write(temp_txt, arcname="catalog.txt")
81 temp_zip.replace(cache_path)
82 finally:
83 # The `missing_ok` argument to `unlink` would be much nicer, but was added in python 3.8
84 try:
85 temp_txt.unlink()
86 except FileNotFoundError:
87 pass
88 try:
89 temp_zip.unlink()
90 except FileNotFoundError:
91 pass
93 if not cache_path.exists():
94 if download_failed:
95 raise ValueError(
96 f"Catalog not found in '{cache_path}' and download failed"
97 ) from download_failed
98 elif (
99 download is False
100 ): # Test if it literally *is* False, rather than just casts to False
101 raise ValueError(
102 f"The catalog was not found in '{cache_path}', and downloading was turned off"
103 )
104 else:
105 raise ValueError(
106 f"Catalog not found in '{cache_path}' for unknown reasons"
107 )
109 try:
110 with zipfile.ZipFile(cache_path, "r") as catalog_zip:
111 try:
112 with catalog_zip.open("catalog.txt") as catalog_txt:
113 try:
114 catalog_df = (
115 pd.read_table(
116 catalog_txt,
117 sep="|",
118 header=0,
119 index_col=1,
120 skipinitialspace=True,
121 )
122 .dropna(axis=1, how="all")
123 .iloc[1:]
124 )
125 except Exception as e:
126 raise ValueError(
127 f"Failed to parse 'catalog.json' in '{cache_path}'"
128 ) from e
129 except Exception as e:
130 raise ValueError(
131 f"Failed to open 'catalog.txt' in '{cache_path}'"
132 ) from e
133 except Exception as e:
134 raise ValueError(f"Failed to open '{cache_path}' as a ZIP file") from e
136 # Fill in the catalog object
137 catalog_dict = {}
138 catalog_dict["GTID"] = [s.strip() for s in list(catalog_df.index)]
140 for col_name in catalog_df.columns:
141 column = list(catalog_df[col_name])
142 if "GT_Tag" in col_name:
143 catalog_dict["GT_Tag"] = [s.strip() for s in column]
144 else:
145 catalog_dict[col_name.strip()] = [
146 float(s.strip().replace("-", "NAN")) if type(s) == str else float(s)
147 for s in column
148 ]
149 catalog_df = pd.DataFrame(catalog_dict)
150 catalog = {}
151 simulations = {}
152 for idx, row in catalog_df.iterrows():
153 name = row["GTID"]
154 metadata_dict = row.to_dict()
155 simulations[name] = metadata_dict
156 catalog["simulations"] = simulations
157 return cls(catalog=catalog, verbosity=verbosity)
159 def _add_paths_to_metadata(self):
160 metadata_dict = self._dict["simulations"]
161 existing_cols = list(metadata_dict[list(metadata_dict.keys())[0]].keys())
162 new_cols = [
163 "metadata_link",
164 "metadata_location",
165 "waveform_data_link",
166 "waveform_data_location",
167 ]
169 if any([col not in existing_cols for col in new_cols]):
170 for sim_name in metadata_dict:
171 if "metadata_location" not in existing_cols:
172 metadata_dict[sim_name][
173 "metadata_location"
174 ] = self.metadata_filepath_from_simname(sim_name)
175 if "metadata_link" not in existing_cols:
176 metadata_dict[sim_name]["metadata_link"] = self.metadata_url
177 if "waveform_data_link" not in existing_cols:
178 metadata_dict[sim_name]["waveform_data_link"] = (
179 self.waveform_data_url + "/" + f"{sim_name}.h5"
180 )
181 if "waveform_data_location" not in existing_cols:
182 metadata_dict[sim_name][
183 "waveform_data_location"
184 ] = self.waveform_filepath_from_simname(sim_name)
186 @property
187 @functools.lru_cache()
188 def simulations_dataframe(self):
189 df = pd.DataFrame(self.simulations).transpose()
190 df.rename(columns={"GTID": "simulation_name"}, inplace=True)
191 return df
193 @property
194 @functools.lru_cache()
195 def files(self):
196 """Map of all file names to the corresponding file info"""
197 file_infos = {}
198 for _, row in self.simulations_dataframe.iterrows():
199 waveform_data_location = row["waveform_data_location"]
200 path_str = os.path.basename(waveform_data_location)
201 if os.path.exists(waveform_data_location):
202 file_size = os.path.getsize(waveform_data_location)
203 else:
204 file_size = 0
205 file_info = {
206 "checksum": None,
207 "filename": os.path.basename(waveform_data_location),
208 "filesize": file_size,
209 "download": row["waveform_data_link"],
210 }
211 file_infos[path_str] = file_info
213 unique_files = collections.defaultdict(list)
214 for k, v in file_infos.items():
215 unique_files[f"{v['checksum']}{v['filesize']}"].append(k)
217 original_paths = {k: min(v) for k, v in unique_files.items()}
219 for v in file_infos.values():
220 v["truepath"] = original_paths[f"{v['checksum']}{v['filesize']}"]
222 return file_infos
224 def waveform_filename_from_simname(self, sim_name):
225 return sim_name + ".h5"
227 def waveform_filepath_from_simname(self, sim_name):
228 file_path = self.waveform_data_dir / self.waveform_filename_from_simname(
229 sim_name
230 )
231 if not os.path.exists(file_path):
232 if self._verbosity > 2:
233 print(
234 f"WARNING: Could not resolve path for {sim_name}"
235 f"..best calculated path = {file_path}"
236 )
237 return file_path.as_posix()
239 def waveform_url_from_simname(self, sim_name):
240 return (
241 self.waveform_data_url + "/" + self.waveform_filename_from_simname(sim_name)
242 )
244 def metadata_filename_from_simname(self, sim_name):
245 return os.path.basename(self.metadata_filepath_from_simname(sim_name))
247 def metadata_filepath_from_simname(self, sim_name, ext="txt"):
248 return str(self.metadata_dir / f"{sim_name}.{ext}")
250 def download_waveform_data(self, sim_name, use_cache=None):
251 if use_cache is None:
252 use_cache = self.use_cache
253 file_name = self.waveform_filename_from_simname(sim_name)
254 file_path_web = self.waveform_data_url + "/" + file_name
255 local_file_path = self.waveform_data_dir / file_name
256 if (
257 use_cache
258 and os.path.exists(local_file_path)
259 and os.path.getsize(local_file_path) > 0
260 ):
261 if self._verbosity > 2:
262 print("...can read from cache: {}".format(str(local_file_path)))
263 pass
264 elif os.path.exists(local_file_path) and os.path.getsize(local_file_path) > 0:
265 pass
266 else:
267 if self._verbosity > 2:
268 print("...writing to cache: {}".format(str(local_file_path)))
269 if utils.url_exists(file_path_web):
270 if self._verbosity > 2:
271 print("...downloading {}".format(file_path_web))
272 utils.download_file(file_path_web, local_file_path)
273 else:
274 if self._verbosity > 2:
275 print(
276 "... ... but couldnt find link: {}".format(str(file_path_web))
277 )