csvpath.managers.paths.paths_manager
# pylint: disable=C0114
import os
import json
from typing import NewType
from json import JSONDecodeError
from csvpath import CsvPath
from csvpath.util.exceptions import InputException
from csvpath.util.metadata_parser import MetadataParser
from csvpath.util.reference_parser import ReferenceParser
from csvpath.util.file_readers import DataFileReader
from csvpath.util.file_writers import DataFileWriter
from csvpath.util.nos import Nos
from .paths_registrar import PathsRegistrar
from .paths_metadata import PathsMetadata

# types added just for clarity
NamedPathsName = NewType("NamedPathsName", str)
"""@private"""
Csvpath = NewType("Csvpath", str)
"""@private"""
Identity = NewType("Identity", str)
"""@private"""


class PathsManager:
    """Manage named-paths groups: named, ordered lists of csvpath strings.

    Each group has its own directory under the configured csvpaths inputs
    directory. A group's csvpaths are stored together in one
    ``group.csvpaths`` file, separated by ``MARKER``.
    """

    MARKER: str = "---- CSVPATH ----"

    def __init__(self, *, csvpaths, named_paths=None):
        """@private"""
        # named_paths is accepted for interface compatibility; it is not used
        # by this constructor.
        self.csvpaths = csvpaths
        """@private"""
        # the registrar is created lazily by the registrar property
        self._registrar = None

    #
    # ================== publics =====================
    #
    @property
    def paths_root_manifest_path(self) -> str:
        """Return the path of the root manifest.json, creating it if needed.

        The inputs directory is created when missing and a missing manifest
        is seeded with an empty JSON list.

        @private
        """
        root = self.csvpaths.config.get(section="inputs", name="csvpaths")
        manifest = os.path.join(root, "manifest.json")
        if not Nos(root).dir_exists():
            Nos(root).makedirs()
        if not Nos(manifest).exists():
            with DataFileWriter(path=manifest) as writer:
                writer.write("[]")
        return manifest

    @property
    def paths_root_manifest(self) -> list:
        """Return the parsed JSON content of the root manifest.

        The manifest is seeded with ``[]`` when this class creates it.

        @private
        """
        with DataFileReader(self.paths_root_manifest_path) as reader:
            return json.load(reader.source)

    @property
    def registrar(self) -> PathsRegistrar:
        """Return the PathsRegistrar, creating it on first access.

        @private
        """
        if self._registrar is None:
            self._registrar = PathsRegistrar(self.csvpaths)
        return self._registrar

    def named_paths_home(self, name: NamedPathsName) -> str:
        """Return the directory of the named-paths group, creating it if missing.

        @private
        """
        home = os.path.join(self.named_paths_dir, name)
        if not Nos(home).dir_exists():
            Nos(home).makedirs()
        return home

    @property
    def named_paths_dir(self) -> str:
        """Return the configured directory that holds all named-paths groups.

        @private
        """
        return self.csvpaths.config.inputs_csvpaths_path

    def set_named_paths(self, np: dict[NamedPathsName, list[Csvpath]]) -> None:
        """Add every group in a mapping of group name to list of csvpaths.

        All values are validated before anything is added. The first value
        that is not a list is reported to the error manager; nothing is added
        in that case.

        Raises:
            InputException: if a value is not a list and the error policy
                says to raise.
        """
        for name, paths in np.items():
            if not isinstance(paths, list):
                msg = f"{name} does not key a list of csvpaths"
                self.csvpaths.error_manager.handle_error(source=self, msg=msg)
                if self.csvpaths.ecoms.do_i_raise():
                    raise InputException(msg)
                return
        for name, paths in np.items():
            self.add_named_paths(name=name, paths=paths)
        self.csvpaths.logger.info("Set named-paths to %s groups", len(np))

    def add_named_paths_from_dir(
        self, *, directory: str, name: NamedPathsName = None
    ) -> None:
        """Load csvpath files found directly in a directory.

        Files whose names start with a dot, have no extension, or have an
        extension not listed in the configured csvpath file extensions are
        skipped. When name is None each file becomes its own group, named
        after the file without its extension. Otherwise the csvpaths of all
        files are aggregated into one group called name, in the order the
        directory listing returns them.

        Raises:
            InputException: if directory is None or points to a file and the
                error policy says to raise.
        """
        if directory is None:
            msg = "Named paths collection name needed"
            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
            if self.csvpaths.ecoms.do_i_raise():
                raise InputException(msg)
            # nothing can be listed without a directory
            return
        if Nos(directory).isfile():
            msg = "Dirname must point to a directory"
            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
            if self.csvpaths.ecoms.do_i_raise():
                raise InputException(msg)
            return
        agg = []
        for filename in Nos(directory).listdir():
            if filename.startswith(".") or "." not in filename:
                continue
            ext = filename[filename.rfind(".") + 1 :].strip().lower()
            if ext not in self.csvpaths.config.csvpath_file_extensions:
                continue
            path = os.path.join(directory, filename)
            if name is None:
                #
                # add files one by one under their own names
                #
                aname = self._name_from_name_part(filename)
                self.add_named_paths_from_file(name=aname, file_path=path)
            else:
                #
                # if a name, aggregate all the files. no order is imposed
                # here; JSON definitions and all-in-one files are the
                # ordered ways of creating named-paths.
                #
                agg += self._get_csvpaths_from_file(path)
        if len(agg) > 0:
            self.add_named_paths(name=name, paths=agg, source_path=directory)

    def add_named_paths_from_file(
        self, *, name: NamedPathsName, file_path: str
    ) -> None:
        """Read the csvpaths in one file and add them as the group name."""
        self.csvpaths.logger.debug("Reading csvpaths file at %s", file_path)
        paths = self._get_csvpaths_from_file(file_path)
        self.add_named_paths(name=name, paths=paths, source_path=file_path)

    def add_named_paths_from_json(self, file_path: str) -> None:
        """Add one group per key of a JSON file mapping names to csvpath files.

        For each key the JSON file is copied into the group's home, the
        csvpaths of every listed file are collected in order, and the group
        is added. OSError, ValueError, TypeError and JSONDecodeError are
        reported to the error manager and re-raised only when the error
        policy says to raise.
        """
        try:
            self.csvpaths.logger.debug("Opening JSON file at %s", file_path)
            with open(file_path, encoding="utf-8") as handle:
                definitions = json.load(handle)
            self.csvpaths.logger.debug(
                "Found JSON file with %s keys", len(definitions)
            )
            for group_name in definitions:
                self.store_json_paths_file(group_name, file_path)
                paths = []
                for member in definitions[group_name]:
                    paths += self._get_csvpaths_from_file(member)
                self.add_named_paths(
                    name=group_name, paths=paths, source_path=file_path
                )
        except (OSError, ValueError, TypeError, JSONDecodeError) as ex:
            self.csvpaths.error_manager.handle_error(source=self, msg=f"{ex}")
            if self.csvpaths.ecoms.do_i_raise():
                raise

    def add_named_paths(
        self,
        *,
        name: NamedPathsName,
        paths: list[Csvpath] = None,
        from_file: str = None,
        from_dir: str = None,
        from_json: str = None,
        source_path: str = None,
    ) -> None:
        """Create or overwrite a named-paths group and register it.

        If from_file, from_dir or from_json is given, the call is delegated
        to the matching loader, checked in that order. Otherwise paths must
        be a list of csvpath strings; it is written to the group file and a
        PathsMetadata describing the group is passed to the registrar.

        Raises:
            InputException: if paths is not a list and the error policy says
                to raise.
        """
        if from_file is not None:
            return self.add_named_paths_from_file(name=name, file_path=from_file)
        if from_dir is not None:
            return self.add_named_paths_from_dir(name=name, directory=from_dir)
        if from_json is not None:
            return self.add_named_paths_from_json(file_path=from_json)
        if not isinstance(paths, list):
            msg = (
                "Paths must be a list of csvpaths. "
                "If you want to load a file use add_named_paths_from_file or "
                "add_named_paths_from_json."
            )
            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
            if self.csvpaths.ecoms.do_i_raise():
                raise InputException(msg)
            return
        self.csvpaths.logger.debug("Adding csvpaths to named-paths group %s", name)
        for path in paths:
            self.csvpaths.logger.debug("Adding %s to %s", path, name)
        self._copy_in(name, self._str_from_list(paths))
        grp_paths = self.get_identified_paths_in(name, paths=paths)
        # a csvpath without a usable identity is identified by its position
        ids = [
            f"{i}" if identity is None or identity.strip() == "" else identity
            for i, (identity, _) in enumerate(grp_paths)
        ]
        mdata = PathsMetadata(self.csvpaths.config)
        mdata.archive_name = self.csvpaths.config.archive_name
        mdata.named_paths_name = name
        sep = Nos(mdata.named_paths_root).sep
        mdata.named_paths_home = f"{mdata.named_paths_root}{sep}{name}"
        mdata.group_file_path = f"{mdata.named_paths_home}{sep}group.csvpaths"
        mdata.named_paths = paths
        mdata.named_paths_identities = ids
        mdata.named_paths_count = len(ids)
        mdata.source_path = source_path
        self.registrar.register_complete(mdata)

    #
    # adding ref handling for the form: $many.csvpaths.food
    # which is equiv to: many#food
    #
    def get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
        """Return the csvpaths selected by a name or a reference.

        Accepted forms are a group name, ``group#identity``, and references
        starting with ``$`` such as ``$many.csvpaths.food``. An identity may
        end with ``:to`` or ``:from`` to select the paths up to and including,
        or starting from, the identified path.

        Returns:
            The selected csvpaths, or None when no identity is given and the
            group does not exist.

        Raises:
            InputException: if a reference is not a csvpaths reference, the
                directive is not :to or :from, or the identity is not found.
        """
        self.csvpaths.logger.info("Getting named-paths for %s", name)
        if name.startswith("$"):
            ref = ReferenceParser(name)
            if ref.datatype != ReferenceParser.CSVPATHS:
                raise InputException(
                    f"Reference datatype must be {ReferenceParser.CSVPATHS}"
                )
            npn = ref.root_major
            identity = ref.name_one
        else:
            npn, identity = self._paths_name_path(name)
        if identity is None:
            return self._get_named_paths(npn) if self.has_named_paths(npn) else None
        selector, colon, directive = identity.partition(":")
        if not colon:
            return [self._find_one(npn, identity)]
        #
        # we need to be able to grab paths up to and starting from like this:
        #   $many.csvpaths.food:to
        #   $many.csvpaths.food:from
        #
        if directive == "to":
            return self._get_to(npn, selector)
        if directive == "from":
            return self._get_from(npn, selector)
        self.csvpaths.logger.error(
            "Incorrect reference directive: name: %s, paths-name: %s, identity: %s",
            name,
            npn,
            selector,
        )
        raise InputException(
            f"Reference directive must be :to or :from, not :{directive}"
        )

    def store_json_paths_file(self, name: str, jsonpath: str) -> None:
        """Copy a JSON definition file into the group's home as definition.json.

        @private
        """
        home = self.named_paths_home(name)
        with DataFileReader(jsonpath) as file:
            content = file.read()
        target = os.path.join(home, "definition.json")
        with DataFileWriter(path=target) as writer:
            writer.write(content)

    @property
    def named_paths_names(self) -> list[str]:
        """Return the names of all entries in the named-paths dir that are not files.

        @private
        """
        path = self.named_paths_dir
        return [
            n for n in Nos(path).listdir() if not Nos(os.path.join(path, n)).isfile()
        ]

    def remove_named_paths(self, name: NamedPathsName, strict: bool = False) -> None:
        """Remove a named-paths group.

        A missing group is ignored unless strict is True.

        Raises:
            InputException: if the group does not exist and strict is True.

        @private
        """
        # check first: named_paths_home() would create the missing directory
        if not self.has_named_paths(name):
            if strict is True:
                raise InputException(f"Named-paths name {name} not found")
            return
        Nos(self.named_paths_home(name)).remove()

    def remove_all_named_paths(self) -> None:
        """Remove every named-paths group.

        @private
        """
        for name in self.named_paths_names:
            self.remove_named_paths(name)

    def has_named_paths(self, name: NamedPathsName) -> bool:
        """Return True if a directory exists for the group.

        @private
        """
        path = os.path.join(self.named_paths_dir, name)
        return Nos(path).dir_exists()

    def number_of_named_paths(self, name: NamedPathsName) -> int:
        """Return the number of csvpaths in the group, or 0 if it does not exist.

        @private
        """
        paths = self._get_named_paths(name)
        return 0 if paths is None else len(paths)

    def total_named_paths(self) -> int:
        """Return the number of named-paths groups.

        @private
        """
        return len(self.named_paths_names)  # pragma: no cover

    #
    # ================== internals =====================
    #

    def _get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
        """Return the csvpaths in the group's file, or None if there is no group.

        The file content is split on MARKER and blank segments are dropped;
        segments are returned without stripping.
        """
        if not self.has_named_paths(name):
            return None
        content = ""
        grp = self._group_file_path(name)
        if Nos(grp).exists():
            with DataFileReader(grp) as reader:
                content = reader.read()
        cs = [c for c in content.split(PathsManager.MARKER) if c.strip() != ""]
        #
        # this update may not happen. it depends on if the group.csvpaths file has changed.
        # if someone put a new group.csvpaths file by hand we want to capture its fingerprint
        # for future reference. this shouldn't happen, but it probably will happen.
        #
        self.registrar.update_manifest_if(name=name, group_file_path=grp, paths=cs)
        return cs

    def _str_from_list(self, paths: list[Csvpath]) -> str:
        """Return the paths as one string, each preceded by a MARKER line.

        @private
        """
        return "".join(f"\n\n{PathsManager.MARKER}\n\n{path}" for path in paths)

    def _copy_in(self, name, csvpathstr) -> str:
        """Write the csvpaths string to the group's file and return the file path."""
        temp = self._group_file_path(name)
        with DataFileWriter(path=temp, mode="w") as writer:
            writer.append(csvpathstr)
        return temp

    def _group_file_path(self, name: NamedPathsName) -> str:
        """Return the path of the group's group.csvpaths file."""
        return os.path.join(self.named_paths_home(name), "group.csvpaths")

    def _get_csvpaths_from_file(self, file_path: str) -> list[str]:
        """Return the stripped, non-blank csvpaths found in a MARKER-separated file."""
        with DataFileReader(file_path) as reader:
            cp = reader.read()
            paths = [
                apath.strip()
                for apath in cp.split(PathsManager.MARKER)
                if apath.strip() != ""
            ]
            self.csvpaths.logger.debug("Found %s csvpaths in file", len(paths))
            return paths

    def _paths_name_path(self, pathsname) -> tuple[NamedPathsName, Identity]:
        """Split ``name#identity`` into (name, identity).

        The identity is None when there is no ``#`` after the first character.
        """
        specificpath = None
        i = pathsname.find("#")
        if i > 0:
            specificpath = pathsname[i + 1 :]
            pathsname = pathsname[0:i]
        return (pathsname, specificpath)

    def _get_to(self, npn: NamedPathsName, identity: Identity) -> list[Csvpath]:
        """Return the group's paths up to and including the identified path.

        All paths are returned if the identity is not found.
        """
        ps = []
        for path_identity, path in self.get_identified_paths_in(npn):
            ps.append(path)
            if path_identity == identity:
                break
        return ps

    def _get_from(self, npn: NamedPathsName, identity: Identity) -> list[Csvpath]:
        """Return the group's paths starting from the identified path.

        An empty list is returned if the identity is not found.
        """
        ps = []
        for path_identity, path in self.get_identified_paths_in(npn):
            # skip until the first match; after that take everything
            if path_identity != identity and len(ps) == 0:
                continue
            ps.append(path)
        return ps

    def get_identified_paths_in(
        self, nps: NamedPathsName, paths: list[Csvpath] = None
    ) -> list[tuple[Identity, Csvpath]]:
        """Return (identity, csvpath) pairs for the given paths or for a group.

        When paths is None the group's paths are looked up by name; a group
        that cannot be found yields an empty list.

        @private
        """
        # used by PathsRegistrar
        if paths is None:
            paths = self.get_named_paths(nps)
        if paths is None:
            return []
        idps = []
        for path in paths:
            #
            # we can get this from our self.csvpath, should we?
            #
            c = CsvPath()
            MetadataParser(c).extract_metadata(instance=c, csvpath=path)
            idps.append((c.identity, path))
        return idps

    def _find_one(self, npn: NamedPathsName, identity: Identity) -> Csvpath:
        """Return the csvpath with the given identity in the group.

        Raises:
            InputException: if npn is None or no path has that identity.
        """
        if npn is not None:
            for path_identity, path in self.get_identified_paths_in(npn):
                if path_identity == identity:
                    return path
        raise InputException(
            f"Path identified as '{identity}' must be in the group identitied as '{npn}'"
        )

    def _name_from_name_part(self, name):
        """Return name without its last extension, if it has one."""
        i = name.rfind(".")
        return name if i == -1 else name[0:i]
class
PathsManager:
class PathsManager:
    """Manage named-paths groups: named, ordered lists of csvpath strings.

    Each group has its own directory under the configured csvpaths inputs
    directory. A group's csvpaths are stored together in one
    ``group.csvpaths`` file, separated by ``MARKER``.
    """

    MARKER: str = "---- CSVPATH ----"

    def __init__(self, *, csvpaths, named_paths=None):
        """@private"""
        # named_paths is accepted for interface compatibility; it is not used
        # by this constructor.
        self.csvpaths = csvpaths
        """@private"""
        # the registrar is created lazily by the registrar property
        self._registrar = None

    #
    # ================== publics =====================
    #
    @property
    def paths_root_manifest_path(self) -> str:
        """Return the path of the root manifest.json, creating it if needed.

        The inputs directory is created when missing and a missing manifest
        is seeded with an empty JSON list.

        @private
        """
        root = self.csvpaths.config.get(section="inputs", name="csvpaths")
        manifest = os.path.join(root, "manifest.json")
        if not Nos(root).dir_exists():
            Nos(root).makedirs()
        if not Nos(manifest).exists():
            with DataFileWriter(path=manifest) as writer:
                writer.write("[]")
        return manifest

    @property
    def paths_root_manifest(self) -> list:
        """Return the parsed JSON content of the root manifest.

        The manifest is seeded with ``[]`` when this class creates it.

        @private
        """
        with DataFileReader(self.paths_root_manifest_path) as reader:
            return json.load(reader.source)

    @property
    def registrar(self) -> PathsRegistrar:
        """Return the PathsRegistrar, creating it on first access.

        @private
        """
        if self._registrar is None:
            self._registrar = PathsRegistrar(self.csvpaths)
        return self._registrar

    def named_paths_home(self, name: NamedPathsName) -> str:
        """Return the directory of the named-paths group, creating it if missing.

        @private
        """
        home = os.path.join(self.named_paths_dir, name)
        if not Nos(home).dir_exists():
            Nos(home).makedirs()
        return home

    @property
    def named_paths_dir(self) -> str:
        """Return the configured directory that holds all named-paths groups.

        @private
        """
        return self.csvpaths.config.inputs_csvpaths_path

    def set_named_paths(self, np: dict[NamedPathsName, list[Csvpath]]) -> None:
        """Add every group in a mapping of group name to list of csvpaths.

        All values are validated before anything is added. The first value
        that is not a list is reported to the error manager; nothing is added
        in that case.

        Raises:
            InputException: if a value is not a list and the error policy
                says to raise.
        """
        for name, paths in np.items():
            if not isinstance(paths, list):
                msg = f"{name} does not key a list of csvpaths"
                self.csvpaths.error_manager.handle_error(source=self, msg=msg)
                if self.csvpaths.ecoms.do_i_raise():
                    raise InputException(msg)
                return
        for name, paths in np.items():
            self.add_named_paths(name=name, paths=paths)
        self.csvpaths.logger.info("Set named-paths to %s groups", len(np))

    def add_named_paths_from_dir(
        self, *, directory: str, name: NamedPathsName = None
    ) -> None:
        """Load csvpath files found directly in a directory.

        Files whose names start with a dot, have no extension, or have an
        extension not listed in the configured csvpath file extensions are
        skipped. When name is None each file becomes its own group, named
        after the file without its extension. Otherwise the csvpaths of all
        files are aggregated into one group called name, in the order the
        directory listing returns them.

        Raises:
            InputException: if directory is None or points to a file and the
                error policy says to raise.
        """
        if directory is None:
            msg = "Named paths collection name needed"
            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
            if self.csvpaths.ecoms.do_i_raise():
                raise InputException(msg)
            # nothing can be listed without a directory
            return
        if Nos(directory).isfile():
            msg = "Dirname must point to a directory"
            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
            if self.csvpaths.ecoms.do_i_raise():
                raise InputException(msg)
            return
        agg = []
        for filename in Nos(directory).listdir():
            if filename.startswith(".") or "." not in filename:
                continue
            ext = filename[filename.rfind(".") + 1 :].strip().lower()
            if ext not in self.csvpaths.config.csvpath_file_extensions:
                continue
            path = os.path.join(directory, filename)
            if name is None:
                #
                # add files one by one under their own names
                #
                aname = self._name_from_name_part(filename)
                self.add_named_paths_from_file(name=aname, file_path=path)
            else:
                #
                # if a name, aggregate all the files. no order is imposed
                # here; JSON definitions and all-in-one files are the
                # ordered ways of creating named-paths.
                #
                agg += self._get_csvpaths_from_file(path)
        if len(agg) > 0:
            self.add_named_paths(name=name, paths=agg, source_path=directory)

    def add_named_paths_from_file(
        self, *, name: NamedPathsName, file_path: str
    ) -> None:
        """Read the csvpaths in one file and add them as the group name."""
        self.csvpaths.logger.debug("Reading csvpaths file at %s", file_path)
        paths = self._get_csvpaths_from_file(file_path)
        self.add_named_paths(name=name, paths=paths, source_path=file_path)

    def add_named_paths_from_json(self, file_path: str) -> None:
        """Add one group per key of a JSON file mapping names to csvpath files.

        For each key the JSON file is copied into the group's home, the
        csvpaths of every listed file are collected in order, and the group
        is added. OSError, ValueError, TypeError and JSONDecodeError are
        reported to the error manager and re-raised only when the error
        policy says to raise.
        """
        try:
            self.csvpaths.logger.debug("Opening JSON file at %s", file_path)
            with open(file_path, encoding="utf-8") as handle:
                definitions = json.load(handle)
            self.csvpaths.logger.debug(
                "Found JSON file with %s keys", len(definitions)
            )
            for group_name in definitions:
                self.store_json_paths_file(group_name, file_path)
                paths = []
                for member in definitions[group_name]:
                    paths += self._get_csvpaths_from_file(member)
                self.add_named_paths(
                    name=group_name, paths=paths, source_path=file_path
                )
        except (OSError, ValueError, TypeError, JSONDecodeError) as ex:
            self.csvpaths.error_manager.handle_error(source=self, msg=f"{ex}")
            if self.csvpaths.ecoms.do_i_raise():
                raise

    def add_named_paths(
        self,
        *,
        name: NamedPathsName,
        paths: list[Csvpath] = None,
        from_file: str = None,
        from_dir: str = None,
        from_json: str = None,
        source_path: str = None,
    ) -> None:
        """Create or overwrite a named-paths group and register it.

        If from_file, from_dir or from_json is given, the call is delegated
        to the matching loader, checked in that order. Otherwise paths must
        be a list of csvpath strings; it is written to the group file and a
        PathsMetadata describing the group is passed to the registrar.

        Raises:
            InputException: if paths is not a list and the error policy says
                to raise.
        """
        if from_file is not None:
            return self.add_named_paths_from_file(name=name, file_path=from_file)
        if from_dir is not None:
            return self.add_named_paths_from_dir(name=name, directory=from_dir)
        if from_json is not None:
            return self.add_named_paths_from_json(file_path=from_json)
        if not isinstance(paths, list):
            msg = (
                "Paths must be a list of csvpaths. "
                "If you want to load a file use add_named_paths_from_file or "
                "add_named_paths_from_json."
            )
            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
            if self.csvpaths.ecoms.do_i_raise():
                raise InputException(msg)
            return
        self.csvpaths.logger.debug("Adding csvpaths to named-paths group %s", name)
        for path in paths:
            self.csvpaths.logger.debug("Adding %s to %s", path, name)
        self._copy_in(name, self._str_from_list(paths))
        grp_paths = self.get_identified_paths_in(name, paths=paths)
        # a csvpath without a usable identity is identified by its position
        ids = [
            f"{i}" if identity is None or identity.strip() == "" else identity
            for i, (identity, _) in enumerate(grp_paths)
        ]
        mdata = PathsMetadata(self.csvpaths.config)
        mdata.archive_name = self.csvpaths.config.archive_name
        mdata.named_paths_name = name
        sep = Nos(mdata.named_paths_root).sep
        mdata.named_paths_home = f"{mdata.named_paths_root}{sep}{name}"
        mdata.group_file_path = f"{mdata.named_paths_home}{sep}group.csvpaths"
        mdata.named_paths = paths
        mdata.named_paths_identities = ids
        mdata.named_paths_count = len(ids)
        mdata.source_path = source_path
        self.registrar.register_complete(mdata)

    #
    # adding ref handling for the form: $many.csvpaths.food
    # which is equiv to: many#food
    #
    def get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
        """Return the csvpaths selected by a name or a reference.

        Accepted forms are a group name, ``group#identity``, and references
        starting with ``$`` such as ``$many.csvpaths.food``. An identity may
        end with ``:to`` or ``:from`` to select the paths up to and including,
        or starting from, the identified path.

        Returns:
            The selected csvpaths, or None when no identity is given and the
            group does not exist.

        Raises:
            InputException: if a reference is not a csvpaths reference, the
                directive is not :to or :from, or the identity is not found.
        """
        self.csvpaths.logger.info("Getting named-paths for %s", name)
        if name.startswith("$"):
            ref = ReferenceParser(name)
            if ref.datatype != ReferenceParser.CSVPATHS:
                raise InputException(
                    f"Reference datatype must be {ReferenceParser.CSVPATHS}"
                )
            npn = ref.root_major
            identity = ref.name_one
        else:
            npn, identity = self._paths_name_path(name)
        if identity is None:
            return self._get_named_paths(npn) if self.has_named_paths(npn) else None
        selector, colon, directive = identity.partition(":")
        if not colon:
            return [self._find_one(npn, identity)]
        #
        # we need to be able to grab paths up to and starting from like this:
        #   $many.csvpaths.food:to
        #   $many.csvpaths.food:from
        #
        if directive == "to":
            return self._get_to(npn, selector)
        if directive == "from":
            return self._get_from(npn, selector)
        self.csvpaths.logger.error(
            "Incorrect reference directive: name: %s, paths-name: %s, identity: %s",
            name,
            npn,
            selector,
        )
        raise InputException(
            f"Reference directive must be :to or :from, not :{directive}"
        )

    def store_json_paths_file(self, name: str, jsonpath: str) -> None:
        """Copy a JSON definition file into the group's home as definition.json.

        @private
        """
        home = self.named_paths_home(name)
        with DataFileReader(jsonpath) as file:
            content = file.read()
        target = os.path.join(home, "definition.json")
        with DataFileWriter(path=target) as writer:
            writer.write(content)

    @property
    def named_paths_names(self) -> list[str]:
        """Return the names of all entries in the named-paths dir that are not files.

        @private
        """
        path = self.named_paths_dir
        return [
            n for n in Nos(path).listdir() if not Nos(os.path.join(path, n)).isfile()
        ]

    def remove_named_paths(self, name: NamedPathsName, strict: bool = False) -> None:
        """Remove a named-paths group.

        A missing group is ignored unless strict is True.

        Raises:
            InputException: if the group does not exist and strict is True.

        @private
        """
        # check first: named_paths_home() would create the missing directory
        if not self.has_named_paths(name):
            if strict is True:
                raise InputException(f"Named-paths name {name} not found")
            return
        Nos(self.named_paths_home(name)).remove()

    def remove_all_named_paths(self) -> None:
        """Remove every named-paths group.

        @private
        """
        for name in self.named_paths_names:
            self.remove_named_paths(name)

    def has_named_paths(self, name: NamedPathsName) -> bool:
        """Return True if a directory exists for the group.

        @private
        """
        path = os.path.join(self.named_paths_dir, name)
        return Nos(path).dir_exists()

    def number_of_named_paths(self, name: NamedPathsName) -> int:
        """Return the number of csvpaths in the group, or 0 if it does not exist.

        @private
        """
        paths = self._get_named_paths(name)
        return 0 if paths is None else len(paths)

    def total_named_paths(self) -> int:
        """Return the number of named-paths groups.

        @private
        """
        return len(self.named_paths_names)  # pragma: no cover

    #
    # ================== internals =====================
    #

    def _get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
        """Return the csvpaths in the group's file, or None if there is no group.

        The file content is split on MARKER and blank segments are dropped;
        segments are returned without stripping.
        """
        if not self.has_named_paths(name):
            return None
        content = ""
        grp = self._group_file_path(name)
        if Nos(grp).exists():
            with DataFileReader(grp) as reader:
                content = reader.read()
        cs = [c for c in content.split(PathsManager.MARKER) if c.strip() != ""]
        #
        # this update may not happen. it depends on if the group.csvpaths file has changed.
        # if someone put a new group.csvpaths file by hand we want to capture its fingerprint
        # for future reference. this shouldn't happen, but it probably will happen.
        #
        self.registrar.update_manifest_if(name=name, group_file_path=grp, paths=cs)
        return cs

    def _str_from_list(self, paths: list[Csvpath]) -> str:
        """Return the paths as one string, each preceded by a MARKER line.

        @private
        """
        return "".join(f"\n\n{PathsManager.MARKER}\n\n{path}" for path in paths)

    def _copy_in(self, name, csvpathstr) -> str:
        """Write the csvpaths string to the group's file and return the file path."""
        temp = self._group_file_path(name)
        with DataFileWriter(path=temp, mode="w") as writer:
            writer.append(csvpathstr)
        return temp

    def _group_file_path(self, name: NamedPathsName) -> str:
        """Return the path of the group's group.csvpaths file."""
        return os.path.join(self.named_paths_home(name), "group.csvpaths")

    def _get_csvpaths_from_file(self, file_path: str) -> list[str]:
        """Return the stripped, non-blank csvpaths found in a MARKER-separated file."""
        with DataFileReader(file_path) as reader:
            cp = reader.read()
            paths = [
                apath.strip()
                for apath in cp.split(PathsManager.MARKER)
                if apath.strip() != ""
            ]
            self.csvpaths.logger.debug("Found %s csvpaths in file", len(paths))
            return paths

    def _paths_name_path(self, pathsname) -> tuple[NamedPathsName, Identity]:
        """Split ``name#identity`` into (name, identity).

        The identity is None when there is no ``#`` after the first character.
        """
        specificpath = None
        i = pathsname.find("#")
        if i > 0:
            specificpath = pathsname[i + 1 :]
            pathsname = pathsname[0:i]
        return (pathsname, specificpath)

    def _get_to(self, npn: NamedPathsName, identity: Identity) -> list[Csvpath]:
        """Return the group's paths up to and including the identified path.

        All paths are returned if the identity is not found.
        """
        ps = []
        for path_identity, path in self.get_identified_paths_in(npn):
            ps.append(path)
            if path_identity == identity:
                break
        return ps

    def _get_from(self, npn: NamedPathsName, identity: Identity) -> list[Csvpath]:
        """Return the group's paths starting from the identified path.

        An empty list is returned if the identity is not found.
        """
        ps = []
        for path_identity, path in self.get_identified_paths_in(npn):
            # skip until the first match; after that take everything
            if path_identity != identity and len(ps) == 0:
                continue
            ps.append(path)
        return ps

    def get_identified_paths_in(
        self, nps: NamedPathsName, paths: list[Csvpath] = None
    ) -> list[tuple[Identity, Csvpath]]:
        """Return (identity, csvpath) pairs for the given paths or for a group.

        When paths is None the group's paths are looked up by name; a group
        that cannot be found yields an empty list.

        @private
        """
        # used by PathsRegistrar
        if paths is None:
            paths = self.get_named_paths(nps)
        if paths is None:
            return []
        idps = []
        for path in paths:
            #
            # we can get this from our self.csvpath, should we?
            #
            c = CsvPath()
            MetadataParser(c).extract_metadata(instance=c, csvpath=path)
            idps.append((c.identity, path))
        return idps

    def _find_one(self, npn: NamedPathsName, identity: Identity) -> Csvpath:
        """Return the csvpath with the given identity in the group.

        Raises:
            InputException: if npn is None or no path has that identity.
        """
        if npn is not None:
            for path_identity, path in self.get_identified_paths_in(npn):
                if path_identity == identity:
                    return path
        raise InputException(
            f"Path identified as '{identity}' must be in the group identitied as '{npn}'"
        )

    def _name_from_name_part(self, name):
        """Return name without its last extension, if it has one."""
        i = name.rfind(".")
        return name if i == -1 else name[0:i]
def
set_named_paths( self, np: dict[csvpath.managers.paths.paths_manager.NamedPathsName, list[csvpath.managers.paths.paths_manager.Csvpath]]) -> None:
def set_named_paths(self, np: dict[NamedPathsName, list[Csvpath]]) -> None:
    """Add every group in a mapping of group name to list of csvpaths.

    Every value is checked before any group is added. If a value is not a
    list, the first offending key is reported to the error manager and
    nothing is added.

    Raises:
        InputException: if a value is not a list and the error policy says
            to raise.
    """
    not_lists = [key for key, value in np.items() if not isinstance(value, list)]
    if not_lists:
        msg = f"{not_lists[0]} does not key a list of csvpaths"
        self.csvpaths.error_manager.handle_error(source=self, msg=msg)
        if self.csvpaths.ecoms.do_i_raise():
            raise InputException(msg)
        return
    for group_name, group_paths in np.items():
        self.add_named_paths(name=group_name, paths=group_paths)
    self.csvpaths.logger.info("Set named-paths to %s groups", len(np))
def
add_named_paths_from_dir( self, *, directory: str, name: csvpath.managers.paths.paths_manager.NamedPathsName = None) -> None:
def add_named_paths_from_dir(
    self, *, directory: str, name: NamedPathsName = None
) -> None:
    """Load csvpath files found directly in a directory.

    Files whose names start with a dot, have no extension, or have an
    extension not listed in the configured csvpath file extensions are
    skipped. When name is None each file becomes its own group, named after
    the file without its extension. Otherwise the csvpaths of all files are
    aggregated into one group called name, in the order the directory
    listing returns them.

    Raises:
        InputException: if directory is None or points to a file and the
            error policy says to raise.
    """
    if directory is None:
        msg = "Named paths collection name needed"
        self.csvpaths.error_manager.handle_error(source=self, msg=msg)
        if self.csvpaths.ecoms.do_i_raise():
            raise InputException(msg)
        # nothing can be listed without a directory; without this return the
        # code below would try to inspect None
        return
    if Nos(directory).isfile():
        msg = "Dirname must point to a directory"
        self.csvpaths.error_manager.handle_error(source=self, msg=msg)
        if self.csvpaths.ecoms.do_i_raise():
            raise InputException(msg)
        return
    agg = []
    for filename in Nos(directory).listdir():
        if filename.startswith(".") or "." not in filename:
            continue
        ext = filename[filename.rfind(".") + 1 :].strip().lower()
        if ext not in self.csvpaths.config.csvpath_file_extensions:
            continue
        path = os.path.join(directory, filename)
        if name is None:
            #
            # add files one by one under their own names
            #
            aname = self._name_from_name_part(filename)
            self.add_named_paths_from_file(name=aname, file_path=path)
        else:
            #
            # if a name, aggregate all the files. no order is imposed here;
            # JSON definitions and all-in-one files are the ordered ways of
            # creating named-paths.
            #
            agg += self._get_csvpaths_from_file(path)
    if len(agg) > 0:
        self.add_named_paths(name=name, paths=agg, source_path=directory)
def
add_named_paths_from_file( self, *, name: csvpath.managers.paths.paths_manager.NamedPathsName, file_path: str) -> None:
def
add_named_paths_from_json(self, file_path: str) -> None:
def add_named_paths_from_json(self, file_path: str) -> None:
    """Add one group per key of a JSON file mapping names to csvpath files.

    For each key the JSON file is stored with the group, the csvpaths of
    every listed file are collected in order, and the group is added.
    OSError, ValueError, TypeError and JSONDecodeError are reported to the
    error manager and re-raised only when the error policy says to raise.
    """
    try:
        self.csvpaths.logger.debug("Opening JSON file at %s", file_path)
        with open(file_path, encoding="utf-8") as handle:
            definitions = json.load(handle)
            self.csvpaths.logger.debug(
                "Found JSON file with %s keys", len(definitions)
            )
            for group_name in definitions:
                self.store_json_paths_file(group_name, file_path)
                members = definitions[group_name]
                collected = []
                for member in members:
                    collected += self._get_csvpaths_from_file(member)
                self.add_named_paths(
                    name=group_name, paths=collected, source_path=file_path
                )
    except (OSError, ValueError, TypeError, JSONDecodeError) as ex:
        self.csvpaths.error_manager.handle_error(source=self, msg=f"{ex}")
        if self.csvpaths.ecoms.do_i_raise():
            raise
def
add_named_paths( self, *, name: csvpath.managers.paths.paths_manager.NamedPathsName, paths: list[csvpath.managers.paths.paths_manager.Csvpath] = None, from_file: str = None, from_dir: str = None, from_json: str = None, source_path: str = None) -> None:
def add_named_paths(
    self,
    *,
    name: NamedPathsName,
    paths: list[Csvpath] = None,
    from_file: str = None,
    from_dir: str = None,
    from_json: str = None,
    source_path: str = None,
) -> None:
    """Create or overwrite a named-paths group and register it.

    If from_file, from_dir or from_json is given, the call is delegated to
    the matching loader, checked in that order. Otherwise paths must be a
    list of csvpath strings; it is written to the group file and a
    PathsMetadata describing the group is passed to the registrar.

    Raises:
        InputException: if paths is not a list and the error policy says to
            raise.
    """
    if from_file is not None:
        return self.add_named_paths_from_file(name=name, file_path=from_file)
    if from_dir is not None:
        return self.add_named_paths_from_dir(name=name, directory=from_dir)
    if from_json is not None:
        return self.add_named_paths_from_json(file_path=from_json)
    if not isinstance(paths, list):
        # the JSON loader is add_named_paths_from_json; the message used to
        # point at a method that does not exist
        msg = (
            "Paths must be a list of csvpaths. "
            "If you want to load a file use add_named_paths_from_file or "
            "add_named_paths_from_json."
        )
        self.csvpaths.error_manager.handle_error(source=self, msg=msg)
        if self.csvpaths.ecoms.do_i_raise():
            raise InputException(msg)
        return
    self.csvpaths.logger.debug("Adding csvpaths to named-paths group %s", name)
    for path in paths:
        self.csvpaths.logger.debug("Adding %s to %s", path, name)
    self._copy_in(name, self._str_from_list(paths))
    grp_paths = self.get_identified_paths_in(name, paths=paths)
    # a csvpath without a usable identity is identified by its position
    ids = [
        f"{i}" if identity is None or identity.strip() == "" else identity
        for i, (identity, _) in enumerate(grp_paths)
    ]
    mdata = PathsMetadata(self.csvpaths.config)
    mdata.archive_name = self.csvpaths.config.archive_name
    mdata.named_paths_name = name
    sep = Nos(mdata.named_paths_root).sep
    mdata.named_paths_home = f"{mdata.named_paths_root}{sep}{name}"
    mdata.group_file_path = f"{mdata.named_paths_home}{sep}group.csvpaths"
    mdata.named_paths = paths
    mdata.named_paths_identities = ids
    mdata.named_paths_count = len(ids)
    mdata.source_path = source_path
    self.registrar.register_complete(mdata)
def
get_named_paths( self, name: csvpath.managers.paths.paths_manager.NamedPathsName) -> list[csvpath.managers.paths.paths_manager.Csvpath]:
def get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
    """Return the csvpaths selected by a name or a reference.

    Accepted forms are a group name, ``group#identity``, and references
    starting with ``$`` such as ``$many.csvpaths.food``. An identity may end
    with ``:to`` or ``:from`` to select the paths up to and including, or
    starting from, the identified path.

    Returns:
        The selected csvpaths, or None when no identity is given and the
        group does not exist.

    Raises:
        InputException: if a reference is not a csvpaths reference, or the
            directive is not :to or :from.
    """
    self.csvpaths.logger.info("Getting named-paths for %s", name)
    if name.startswith("$"):
        ref = ReferenceParser(name)
        if ref.datatype != ReferenceParser.CSVPATHS:
            raise InputException(
                f"Reference datatype must be {ReferenceParser.CSVPATHS}"
            )
        npn = ref.root_major
        identity = ref.name_one
    else:
        npn, identity = self._paths_name_path(name)
    if identity is None:
        return self._get_named_paths(npn) if self.has_named_paths(npn) else None
    selector, colon, directive = identity.partition(":")
    if not colon:
        return [self._find_one(npn, identity)]
    #
    # we need to be able to grab paths up to and starting from like this:
    #   $many.csvpaths.food:to
    #   $many.csvpaths.food:from
    #
    if directive == "to":
        return self._get_to(npn, selector)
    if directive == "from":
        return self._get_from(npn, selector)
    # every placeholder must be a complete %s, otherwise the logging call
    # itself fails while formatting the record
    self.csvpaths.logger.error(
        "Incorrect reference directive: name: %s, paths-name: %s, identity: %s",
        name,
        npn,
        selector,
    )
    raise InputException(
        f"Reference directive must be :to or :from, not :{directive}"
    )