csvpath.managers.paths.paths_manager

  1# pylint: disable=C0114
  2import os
  3import json
  4from typing import NewType
  5from json import JSONDecodeError
  6from csvpath import CsvPath
  7from csvpath.util.exceptions import InputException
  8from csvpath.util.metadata_parser import MetadataParser
  9from csvpath.util.reference_parser import ReferenceParser
 10from csvpath.util.file_readers import DataFileReader
 11from csvpath.util.file_writers import DataFileWriter
 12from csvpath.util.nos import Nos
 13from .paths_registrar import PathsRegistrar
 14from .paths_metadata import PathsMetadata
 15
# types added just for clarity
# Each alias is a typing.NewType over str. In this module they annotate
# named-paths group names, csvpath strings, and csvpath identities.
NamedPathsName = NewType("NamedPathsName", str)
"""@private"""
Csvpath = NewType("Csvpath", str)
"""@private"""
Identity = NewType("Identity", str)
"""@private"""
 23
 24
 25class PathsManager:
 26    MARKER: str = "---- CSVPATH ----"
 27
    def __init__(self, *, csvpaths, named_paths=None):
        """@private"""
        # NOTE(review): named_paths is accepted but never read here.
        # csvpaths supplies the config, logger, error_manager and ecoms that
        # the methods of this class use.
        self.csvpaths = csvpaths
        """@private"""
        # built lazily by the registrar property
        self._registrar = None
 33
 34    #
 35    # ================== publics =====================
 36    #
 37    @property
 38    def paths_root_manifest_path(self) -> str:
 39        """@private"""
 40        r = self.csvpaths.config.get(section="inputs", name="csvpaths")
 41        p = os.path.join(r, "manifest.json")
 42        if not Nos(r).dir_exists():
 43            Nos(r).makedirs()
 44        if not Nos(p).exists():
 45            with DataFileWriter(path=p) as writer:
 46                writer.write("[]")
 47        return p
 48
 49    @property
 50    def paths_root_manifest(self) -> str:
 51        """@private"""
 52        p = self.paths_root_manifest_path
 53        with DataFileReader(p) as reader:
 54            return json.load(reader.source)
 55
 56    @property
 57    def registrar(self) -> PathsRegistrar:
 58        """@private"""
 59        if self._registrar is None:
 60            self._registrar = PathsRegistrar(self.csvpaths)
 61        return self._registrar
 62
 63    def named_paths_home(self, name: NamedPathsName) -> str:
 64        """@private"""
 65        home = os.path.join(self.named_paths_dir, name)
 66        if not Nos(home).dir_exists():
 67            Nos(home).makedirs()
 68        return home
 69
 70    @property
 71    def named_paths_dir(self) -> str:
 72        """@private"""
 73        return self.csvpaths.config.inputs_csvpaths_path
 74
 75    def set_named_paths(self, np: dict[NamedPathsName, list[Csvpath]]) -> None:
 76        for name in np:
 77            if not isinstance(np[name], list):
 78                msg = f"{name} does not key a list of csvpaths"
 79                self.csvpaths.error_manager.handle_error(source=self, msg=msg)
 80                if self.csvpaths.ecoms.do_i_raise():
 81                    raise InputException(msg)
 82                return
 83        for k, v in np.items():
 84            self.add_named_paths(name=k, paths=v)
 85        self.csvpaths.logger.info("Set named-paths to %s groups", len(np))
 86
 87    def add_named_paths_from_dir(
 88        self, *, directory: str, name: NamedPathsName = None
 89    ) -> None:
 90        if directory is None:
 91            msg = "Named paths collection name needed"
 92            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
 93            if self.csvpaths.ecoms.do_i_raise():
 94                raise InputException(msg)
 95        if not Nos(directory).isfile():
 96            dlist = Nos(directory).listdir()
 97            base = directory
 98            agg = []
 99            for p in dlist:
100                if p[0] == ".":
101                    continue
102                if p.find(".") == -1:
103                    continue
104                ext = p[p.rfind(".") + 1 :].strip().lower()
105                if ext not in self.csvpaths.config.csvpath_file_extensions:
106                    continue
107                path = os.path.join(base, p)
108                if name is None:
109                    #
110                    # add files one by one under their own names
111                    #
112                    aname = self._name_from_name_part(p)
113                    self.add_named_paths_from_file(name=aname, file_path=path)
114                else:
115                    #
116                    # if a name, aggregate all the files
117                    #
118                    _ = self._get_csvpaths_from_file(path)
119
120                    #
121                    # try to find a run-index: N metadata and use it
122                    # to try to impose order? we could do this, but it would
123                    # be messy and a work-around to avoid making people
124                    # use the ordered ways of creating named-paths that
125                    # already exist: JSON and all-in-ones
126                    #
127                    """
128                    c = self.csvpaths.csvpath()
129                    MetadataParser(c).extract_metadata(instance=c, csvpath=path)
130                    """
131                    agg += _
132            if len(agg) > 0:
133                self.add_named_paths(name=name, paths=agg, source_path=directory)
134        else:
135            msg = "Dirname must point to a directory"
136            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
137            if self.csvpaths.ecoms.do_i_raise():
138                raise InputException(msg)
139
140    def add_named_paths_from_file(
141        self, *, name: NamedPathsName, file_path: str
142    ) -> None:
143        self.csvpaths.logger.debug("Reading csvpaths file at %s", file_path)
144        _ = self._get_csvpaths_from_file(file_path)
145        self.add_named_paths(name=name, paths=_, source_path=file_path)
146
147    def add_named_paths_from_json(self, file_path: str) -> None:
148        try:
149            self.csvpaths.logger.debug("Opening JSON file at %s", file_path)
150            with open(file_path, encoding="utf-8") as f:
151                j = json.load(f)
152                self.csvpaths.logger.debug("Found JSON file with %s keys", len(j))
153                for k in j:
154                    self.store_json_paths_file(k, file_path)
155                    v = j[k]
156                    paths = []
157                    for f in v:
158                        _ = self._get_csvpaths_from_file(f)
159                        paths += _
160                    self.add_named_paths(name=k, paths=paths, source_path=file_path)
161        except (OSError, ValueError, TypeError, JSONDecodeError) as ex:
162            self.csvpaths.error_manager.handle_error(source=self, msg=f"{ex}")
163            if self.csvpaths.ecoms.do_i_raise():
164                raise
165
166    def add_named_paths(
167        self,
168        *,
169        name: NamedPathsName,
170        paths: list[Csvpath] = None,
171        from_file: str = None,
172        from_dir: str = None,
173        from_json: str = None,
174        source_path: str = None,
175    ) -> None:
176        if from_file is not None:
177            return self.add_named_paths_from_file(name=name, file_path=from_file)
178        elif from_dir is not None:
179            return self.add_named_paths_from_dir(name=name, directory=from_dir)
180        elif from_json is not None:
181            return self.add_named_paths_from_json(file_path=from_json)
182        if not isinstance(paths, list):
183            msg = """Paths must be a list of csvpaths.
184                    If you want to load a file use add_named_paths_from_file or
185                    set_named_paths_from_json."""
186            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
187            if self.csvpaths.ecoms.do_i_raise():
188                raise InputException(msg)
189            return
190        self.csvpaths.logger.debug("Adding csvpaths to named-paths group %s", name)
191        for _ in paths:
192            self.csvpaths.logger.debug("Adding %s to %s", _, name)
193        s = self._str_from_list(paths)
194        t = self._copy_in(name, s)
195        grp_paths = self.get_identified_paths_in(name, paths=paths)
196        ids = [t[0] for t in grp_paths]
197        for i, t in enumerate(ids):
198            if t is None or t.strip() == "":
199                ids[i] = f"{i}"
200        mdata = PathsMetadata(self.csvpaths.config)
201        mdata.archive_name = self.csvpaths.config.archive_name
202        mdata.named_paths_name = name
203        sep = Nos(mdata.named_paths_root).sep
204        mdata.named_paths_home = f"{mdata.named_paths_root}{sep}{name}"
205        mdata.group_file_path = f"{mdata.named_paths_home}{sep}group.csvpaths"
206        mdata.named_paths = paths
207        mdata.named_paths_identities = ids
208        mdata.named_paths_count = len(ids)
209        mdata.source_path = source_path
210        self.registrar.register_complete(mdata)
211
212    #
213    # adding ref handling for the form: $many.csvpaths.food
214    # which is equiv to: many#food
215    #
216    def get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
217        self.csvpaths.logger.info("Getting named-paths for %s", name)
218        ret = None
219        npn = None
220        identity = None
221        if name.startswith("$"):
222            ref = ReferenceParser(name)
223            if ref.datatype != ReferenceParser.CSVPATHS:
224                raise InputException(
225                    f"Reference datatype must be {ReferenceParser.CSVPATHS}"
226                )
227            npn = ref.root_major
228            identity = ref.name_one
229        else:
230            npn, identity = self._paths_name_path(name)
231        if identity is None and self.has_named_paths(npn):
232            ret = self._get_named_paths(npn)
233        elif identity is not None and identity.find(":") == -1:
234            ret = [self._find_one(npn, identity)]
235        #
236        # we need to be able to grab paths up to and starting from like this:
237        #   $many.csvpaths.food:to
238        #   $many.csvpaths.food:from
239        #
240        elif identity is not None:
241            i = identity.find(":")
242            directive = identity[i:]
243            identity = identity[0:i]
244            if directive == ":to":
245                ret = self._get_to(npn, identity)
246            elif directive == ":from":
247                ret = self._get_from(npn, identity)
248            else:
249                self.csvpaths.logger.error(
250                    "Incorrect reference directive: name: %s, paths-name: %, identity: %",
251                    name,
252                    npn,
253                    identity,
254                )
255                raise InputException(
256                    f"Reference directive must be :to or :from, not {directive}"
257                )
258        return ret
259
260    def store_json_paths_file(self, name: str, jsonpath: str) -> None:
261        """@private"""
262        home = self.named_paths_home(name)
263        j = ""
264        with DataFileReader(jsonpath) as file:
265            j = file.read()
266        p = os.path.join(home, "definition.json")
267        with DataFileWriter(path=p) as writer:
268            writer.write(j)
269
270    @property
271    def named_paths_names(self) -> list[str]:
272        """@private"""
273        path = self.named_paths_dir
274        # names = [n for n in Nos(path).listdir() if (not n.startswith(".") and not n == "manifest.json")]
275        names = [
276            n for n in Nos(path).listdir() if not Nos(os.path.join(path, n)).isfile()
277        ]
278        return names
279
280    def remove_named_paths(self, name: NamedPathsName, strict: bool = False) -> None:
281        """@private"""
282        if not self.has_named_paths(name) and strict is True:
283            raise InputException(f"Named-paths name {name} not found")
284        if not self.has_named_paths(name):
285            return
286        home = self.named_paths_home(name)
287        Nos(home).remove()
288
289    def remove_all_named_paths(self) -> None:
290        """@private"""
291        names = self.named_paths_names
292        for name in names:
293            self.remove_named_paths(name)
294
295    def has_named_paths(self, name: NamedPathsName) -> bool:
296        """@private"""
297        path = os.path.join(self.named_paths_dir, name)
298        return Nos(path).dir_exists()
299
300    def number_of_named_paths(self, name: NamedPathsName) -> int:
301        """@private"""
302        return len(self._get_named_paths(name))
303
304    def total_named_paths(self) -> bool:
305        """@private"""
306        return len(self.named_paths_names)  # pragma: no cover
307
308    #
309    # ================== internals =====================
310    #
311
312    def _get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
313        if not self.has_named_paths(name):
314            return None
315        s = ""
316        path = self.named_paths_home(name)
317        grp = os.path.join(path, "group.csvpaths")
318        if Nos(grp).exists():
319            with DataFileReader(grp) as reader:
320                s = reader.read()
321        cs = s.split("---- CSVPATH ----")
322        cs = [s for s in cs if s.strip() != ""]
323        #
324        # this update may not happen. it depends on if the group.csvpaths file has changed.
325        # if someone put a new group.csvpaths file by hand we want to capture its fingerprint
326        # for future reference. this shouldn't happen, but it probably will happen.
327        #
328        self.registrar.update_manifest_if(name=name, group_file_path=grp, paths=cs)
329        return cs
330
331    def _str_from_list(self, paths: list[Csvpath]) -> str:
332        """@private"""
333        f = ""
334        for _ in paths:
335            f = f"{f}\n\n---- CSVPATH ----\n\n{_}"
336        return f
337
338    def _copy_in(self, name, csvpathstr) -> None:
339        temp = self._group_file_path(name)
340        #
341        # TODO: use a DataFileWriter that supports S3 and local to write.
342        #
343        with DataFileWriter(path=temp, mode="w") as writer:
344            writer.append(csvpathstr)
345        return temp
346
347    def _group_file_path(self, name: NamedPathsName) -> str:
348        temp = os.path.join(self.named_paths_home(name), "group.csvpaths")
349        return temp
350
351    def _get_csvpaths_from_file(self, file_path: str) -> list[str]:
352        #
353        # TODO: use DataFileReader to support S3 and local
354        #
355        with DataFileReader(file_path) as reader:
356            cp = reader.read()
357            _ = [
358                apath.strip()
359                for apath in cp.split(PathsManager.MARKER)
360                if apath.strip() != ""
361            ]
362            self.csvpaths.logger.debug("Found %s csvpaths in file", len(_))
363            return _
364
365    def _paths_name_path(self, pathsname) -> tuple[NamedPathsName, Identity]:
366        specificpath = None
367        i = pathsname.find("#")
368        if i > 0:
369            specificpath = pathsname[i + 1 :]
370            pathsname = pathsname[0:i]
371        return (pathsname, specificpath)
372
373    def _get_to(self, npn: NamedPathsName, identity: Identity) -> list[Csvpath]:
374        ps = []
375        paths = self.get_identified_paths_in(npn)
376        for path in paths:
377            ps.append(path[1])
378            if path[0] == identity:
379                break
380        return ps
381
382    def _get_from(self, npn: NamedPathsName, identity: Identity) -> list[Csvpath]:
383        ps = []
384        paths = self.get_identified_paths_in(npn)
385        for path in paths:
386            if path[0] != identity and len(ps) == 0:
387                continue
388            ps.append(path[1])
389        return ps
390
391    def get_identified_paths_in(
392        self, nps: NamedPathsName, paths: list[Csvpath] = None
393    ) -> list[tuple[Identity, Csvpath]]:
394        """@private"""
395        # used by PathsRegistrar
396        if paths is None:
397            paths = self.get_named_paths(nps)
398        idps = []
399        for path in paths:
400            #
401            # we can get this from our self.csvpath, should we?
402            #
403            c = CsvPath()
404            MetadataParser(c).extract_metadata(instance=c, csvpath=path)
405            idps.append((c.identity, path))
406        return idps
407
408    def _find_one(self, npn: NamedPathsName, identity: Identity) -> Csvpath:
409        if npn is not None:
410            paths = self.get_identified_paths_in(npn)
411            for path in paths:
412                if path[0] == identity:
413                    return path[1]
414        raise InputException(
415            f"Path identified as '{identity}' must be in the group identitied as '{npn}'"
416        )
417
418    def _name_from_name_part(self, name):
419        i = name.rfind(".")
420        if i == -1:
421            pass
422        else:
423            name = name[0:i]
424        return name
class PathsManager:
 26class PathsManager:
 27    MARKER: str = "---- CSVPATH ----"
 28
 29    def __init__(self, *, csvpaths, named_paths=None):
 30        """@private"""
 31        self.csvpaths = csvpaths
 32        """@private"""
 33        self._registrar = None
 34
 35    #
 36    # ================== publics =====================
 37    #
 38    @property
 39    def paths_root_manifest_path(self) -> str:
 40        """@private"""
 41        r = self.csvpaths.config.get(section="inputs", name="csvpaths")
 42        p = os.path.join(r, "manifest.json")
 43        if not Nos(r).dir_exists():
 44            Nos(r).makedirs()
 45        if not Nos(p).exists():
 46            with DataFileWriter(path=p) as writer:
 47                writer.write("[]")
 48        return p
 49
 50    @property
 51    def paths_root_manifest(self) -> str:
 52        """@private"""
 53        p = self.paths_root_manifest_path
 54        with DataFileReader(p) as reader:
 55            return json.load(reader.source)
 56
 57    @property
 58    def registrar(self) -> PathsRegistrar:
 59        """@private"""
 60        if self._registrar is None:
 61            self._registrar = PathsRegistrar(self.csvpaths)
 62        return self._registrar
 63
 64    def named_paths_home(self, name: NamedPathsName) -> str:
 65        """@private"""
 66        home = os.path.join(self.named_paths_dir, name)
 67        if not Nos(home).dir_exists():
 68            Nos(home).makedirs()
 69        return home
 70
 71    @property
 72    def named_paths_dir(self) -> str:
 73        """@private"""
 74        return self.csvpaths.config.inputs_csvpaths_path
 75
 76    def set_named_paths(self, np: dict[NamedPathsName, list[Csvpath]]) -> None:
 77        for name in np:
 78            if not isinstance(np[name], list):
 79                msg = f"{name} does not key a list of csvpaths"
 80                self.csvpaths.error_manager.handle_error(source=self, msg=msg)
 81                if self.csvpaths.ecoms.do_i_raise():
 82                    raise InputException(msg)
 83                return
 84        for k, v in np.items():
 85            self.add_named_paths(name=k, paths=v)
 86        self.csvpaths.logger.info("Set named-paths to %s groups", len(np))
 87
 88    def add_named_paths_from_dir(
 89        self, *, directory: str, name: NamedPathsName = None
 90    ) -> None:
 91        if directory is None:
 92            msg = "Named paths collection name needed"
 93            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
 94            if self.csvpaths.ecoms.do_i_raise():
 95                raise InputException(msg)
 96        if not Nos(directory).isfile():
 97            dlist = Nos(directory).listdir()
 98            base = directory
 99            agg = []
100            for p in dlist:
101                if p[0] == ".":
102                    continue
103                if p.find(".") == -1:
104                    continue
105                ext = p[p.rfind(".") + 1 :].strip().lower()
106                if ext not in self.csvpaths.config.csvpath_file_extensions:
107                    continue
108                path = os.path.join(base, p)
109                if name is None:
110                    #
111                    # add files one by one under their own names
112                    #
113                    aname = self._name_from_name_part(p)
114                    self.add_named_paths_from_file(name=aname, file_path=path)
115                else:
116                    #
117                    # if a name, aggregate all the files
118                    #
119                    _ = self._get_csvpaths_from_file(path)
120
121                    #
122                    # try to find a run-index: N metadata and use it
123                    # to try to impose order? we could do this, but it would
124                    # be messy and a work-around to avoid making people
125                    # use the ordered ways of creating named-paths that
126                    # already exist: JSON and all-in-ones
127                    #
128                    """
129                    c = self.csvpaths.csvpath()
130                    MetadataParser(c).extract_metadata(instance=c, csvpath=path)
131                    """
132                    agg += _
133            if len(agg) > 0:
134                self.add_named_paths(name=name, paths=agg, source_path=directory)
135        else:
136            msg = "Dirname must point to a directory"
137            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
138            if self.csvpaths.ecoms.do_i_raise():
139                raise InputException(msg)
140
141    def add_named_paths_from_file(
142        self, *, name: NamedPathsName, file_path: str
143    ) -> None:
144        self.csvpaths.logger.debug("Reading csvpaths file at %s", file_path)
145        _ = self._get_csvpaths_from_file(file_path)
146        self.add_named_paths(name=name, paths=_, source_path=file_path)
147
148    def add_named_paths_from_json(self, file_path: str) -> None:
149        try:
150            self.csvpaths.logger.debug("Opening JSON file at %s", file_path)
151            with open(file_path, encoding="utf-8") as f:
152                j = json.load(f)
153                self.csvpaths.logger.debug("Found JSON file with %s keys", len(j))
154                for k in j:
155                    self.store_json_paths_file(k, file_path)
156                    v = j[k]
157                    paths = []
158                    for f in v:
159                        _ = self._get_csvpaths_from_file(f)
160                        paths += _
161                    self.add_named_paths(name=k, paths=paths, source_path=file_path)
162        except (OSError, ValueError, TypeError, JSONDecodeError) as ex:
163            self.csvpaths.error_manager.handle_error(source=self, msg=f"{ex}")
164            if self.csvpaths.ecoms.do_i_raise():
165                raise
166
167    def add_named_paths(
168        self,
169        *,
170        name: NamedPathsName,
171        paths: list[Csvpath] = None,
172        from_file: str = None,
173        from_dir: str = None,
174        from_json: str = None,
175        source_path: str = None,
176    ) -> None:
177        if from_file is not None:
178            return self.add_named_paths_from_file(name=name, file_path=from_file)
179        elif from_dir is not None:
180            return self.add_named_paths_from_dir(name=name, directory=from_dir)
181        elif from_json is not None:
182            return self.add_named_paths_from_json(file_path=from_json)
183        if not isinstance(paths, list):
184            msg = """Paths must be a list of csvpaths.
185                    If you want to load a file use add_named_paths_from_file or
186                    set_named_paths_from_json."""
187            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
188            if self.csvpaths.ecoms.do_i_raise():
189                raise InputException(msg)
190            return
191        self.csvpaths.logger.debug("Adding csvpaths to named-paths group %s", name)
192        for _ in paths:
193            self.csvpaths.logger.debug("Adding %s to %s", _, name)
194        s = self._str_from_list(paths)
195        t = self._copy_in(name, s)
196        grp_paths = self.get_identified_paths_in(name, paths=paths)
197        ids = [t[0] for t in grp_paths]
198        for i, t in enumerate(ids):
199            if t is None or t.strip() == "":
200                ids[i] = f"{i}"
201        mdata = PathsMetadata(self.csvpaths.config)
202        mdata.archive_name = self.csvpaths.config.archive_name
203        mdata.named_paths_name = name
204        sep = Nos(mdata.named_paths_root).sep
205        mdata.named_paths_home = f"{mdata.named_paths_root}{sep}{name}"
206        mdata.group_file_path = f"{mdata.named_paths_home}{sep}group.csvpaths"
207        mdata.named_paths = paths
208        mdata.named_paths_identities = ids
209        mdata.named_paths_count = len(ids)
210        mdata.source_path = source_path
211        self.registrar.register_complete(mdata)
212
213    #
214    # adding ref handling for the form: $many.csvpaths.food
215    # which is equiv to: many#food
216    #
217    def get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
218        self.csvpaths.logger.info("Getting named-paths for %s", name)
219        ret = None
220        npn = None
221        identity = None
222        if name.startswith("$"):
223            ref = ReferenceParser(name)
224            if ref.datatype != ReferenceParser.CSVPATHS:
225                raise InputException(
226                    f"Reference datatype must be {ReferenceParser.CSVPATHS}"
227                )
228            npn = ref.root_major
229            identity = ref.name_one
230        else:
231            npn, identity = self._paths_name_path(name)
232        if identity is None and self.has_named_paths(npn):
233            ret = self._get_named_paths(npn)
234        elif identity is not None and identity.find(":") == -1:
235            ret = [self._find_one(npn, identity)]
236        #
237        # we need to be able to grab paths up to and starting from like this:
238        #   $many.csvpaths.food:to
239        #   $many.csvpaths.food:from
240        #
241        elif identity is not None:
242            i = identity.find(":")
243            directive = identity[i:]
244            identity = identity[0:i]
245            if directive == ":to":
246                ret = self._get_to(npn, identity)
247            elif directive == ":from":
248                ret = self._get_from(npn, identity)
249            else:
250                self.csvpaths.logger.error(
251                    "Incorrect reference directive: name: %s, paths-name: %, identity: %",
252                    name,
253                    npn,
254                    identity,
255                )
256                raise InputException(
257                    f"Reference directive must be :to or :from, not {directive}"
258                )
259        return ret
260
261    def store_json_paths_file(self, name: str, jsonpath: str) -> None:
262        """@private"""
263        home = self.named_paths_home(name)
264        j = ""
265        with DataFileReader(jsonpath) as file:
266            j = file.read()
267        p = os.path.join(home, "definition.json")
268        with DataFileWriter(path=p) as writer:
269            writer.write(j)
270
271    @property
272    def named_paths_names(self) -> list[str]:
273        """@private"""
274        path = self.named_paths_dir
275        # names = [n for n in Nos(path).listdir() if (not n.startswith(".") and not n == "manifest.json")]
276        names = [
277            n for n in Nos(path).listdir() if not Nos(os.path.join(path, n)).isfile()
278        ]
279        return names
280
281    def remove_named_paths(self, name: NamedPathsName, strict: bool = False) -> None:
282        """@private"""
283        if not self.has_named_paths(name) and strict is True:
284            raise InputException(f"Named-paths name {name} not found")
285        if not self.has_named_paths(name):
286            return
287        home = self.named_paths_home(name)
288        Nos(home).remove()
289
290    def remove_all_named_paths(self) -> None:
291        """@private"""
292        names = self.named_paths_names
293        for name in names:
294            self.remove_named_paths(name)
295
296    def has_named_paths(self, name: NamedPathsName) -> bool:
297        """@private"""
298        path = os.path.join(self.named_paths_dir, name)
299        return Nos(path).dir_exists()
300
301    def number_of_named_paths(self, name: NamedPathsName) -> int:
302        """@private"""
303        return len(self._get_named_paths(name))
304
305    def total_named_paths(self) -> bool:
306        """@private"""
307        return len(self.named_paths_names)  # pragma: no cover
308
309    #
310    # ================== internals =====================
311    #
312
313    def _get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
314        if not self.has_named_paths(name):
315            return None
316        s = ""
317        path = self.named_paths_home(name)
318        grp = os.path.join(path, "group.csvpaths")
319        if Nos(grp).exists():
320            with DataFileReader(grp) as reader:
321                s = reader.read()
322        cs = s.split("---- CSVPATH ----")
323        cs = [s for s in cs if s.strip() != ""]
324        #
325        # this update may not happen. it depends on if the group.csvpaths file has changed.
326        # if someone put a new group.csvpaths file by hand we want to capture its fingerprint
327        # for future reference. this shouldn't happen, but it probably will happen.
328        #
329        self.registrar.update_manifest_if(name=name, group_file_path=grp, paths=cs)
330        return cs
331
332    def _str_from_list(self, paths: list[Csvpath]) -> str:
333        """@private"""
334        f = ""
335        for _ in paths:
336            f = f"{f}\n\n---- CSVPATH ----\n\n{_}"
337        return f
338
339    def _copy_in(self, name, csvpathstr) -> None:
340        temp = self._group_file_path(name)
341        #
342        # TODO: use a DataFileWriter that supports S3 and local to write.
343        #
344        with DataFileWriter(path=temp, mode="w") as writer:
345            writer.append(csvpathstr)
346        return temp
347
348    def _group_file_path(self, name: NamedPathsName) -> str:
349        temp = os.path.join(self.named_paths_home(name), "group.csvpaths")
350        return temp
351
352    def _get_csvpaths_from_file(self, file_path: str) -> list[str]:
353        #
354        # TODO: use DataFileReader to support S3 and local
355        #
356        with DataFileReader(file_path) as reader:
357            cp = reader.read()
358            _ = [
359                apath.strip()
360                for apath in cp.split(PathsManager.MARKER)
361                if apath.strip() != ""
362            ]
363            self.csvpaths.logger.debug("Found %s csvpaths in file", len(_))
364            return _
365
366    def _paths_name_path(self, pathsname) -> tuple[NamedPathsName, Identity]:
367        specificpath = None
368        i = pathsname.find("#")
369        if i > 0:
370            specificpath = pathsname[i + 1 :]
371            pathsname = pathsname[0:i]
372        return (pathsname, specificpath)
373
374    def _get_to(self, npn: NamedPathsName, identity: Identity) -> list[Csvpath]:
375        ps = []
376        paths = self.get_identified_paths_in(npn)
377        for path in paths:
378            ps.append(path[1])
379            if path[0] == identity:
380                break
381        return ps
382
383    def _get_from(self, npn: NamedPathsName, identity: Identity) -> list[Csvpath]:
384        ps = []
385        paths = self.get_identified_paths_in(npn)
386        for path in paths:
387            if path[0] != identity and len(ps) == 0:
388                continue
389            ps.append(path[1])
390        return ps
391
392    def get_identified_paths_in(
393        self, nps: NamedPathsName, paths: list[Csvpath] = None
394    ) -> list[tuple[Identity, Csvpath]]:
395        """@private"""
396        # used by PathsRegistrar
397        if paths is None:
398            paths = self.get_named_paths(nps)
399        idps = []
400        for path in paths:
401            #
402            # we can get this from our self.csvpath, should we?
403            #
404            c = CsvPath()
405            MetadataParser(c).extract_metadata(instance=c, csvpath=path)
406            idps.append((c.identity, path))
407        return idps
408
409    def _find_one(self, npn: NamedPathsName, identity: Identity) -> Csvpath:
410        if npn is not None:
411            paths = self.get_identified_paths_in(npn)
412            for path in paths:
413                if path[0] == identity:
414                    return path[1]
415        raise InputException(
416            f"Path identified as '{identity}' must be in the group identitied as '{npn}'"
417        )
418
419    def _name_from_name_part(self, name):
420        i = name.rfind(".")
421        if i == -1:
422            pass
423        else:
424            name = name[0:i]
425        return name
MARKER: str = '---- CSVPATH ----'
def set_named_paths( self, np: dict[csvpath.managers.paths.paths_manager.NamedPathsName, list[csvpath.managers.paths.paths_manager.Csvpath]]) -> None:
76    def set_named_paths(self, np: dict[NamedPathsName, list[Csvpath]]) -> None:
77        for name in np:
78            if not isinstance(np[name], list):
79                msg = f"{name} does not key a list of csvpaths"
80                self.csvpaths.error_manager.handle_error(source=self, msg=msg)
81                if self.csvpaths.ecoms.do_i_raise():
82                    raise InputException(msg)
83                return
84        for k, v in np.items():
85            self.add_named_paths(name=k, paths=v)
86        self.csvpaths.logger.info("Set named-paths to %s groups", len(np))
def add_named_paths_from_dir( self, *, directory: str, name: csvpath.managers.paths.paths_manager.NamedPathsName = None) -> None:
 88    def add_named_paths_from_dir(
 89        self, *, directory: str, name: NamedPathsName = None
 90    ) -> None:
 91        if directory is None:
 92            msg = "Named paths collection name needed"
 93            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
 94            if self.csvpaths.ecoms.do_i_raise():
 95                raise InputException(msg)
 96        if not Nos(directory).isfile():
 97            dlist = Nos(directory).listdir()
 98            base = directory
 99            agg = []
100            for p in dlist:
101                if p[0] == ".":
102                    continue
103                if p.find(".") == -1:
104                    continue
105                ext = p[p.rfind(".") + 1 :].strip().lower()
106                if ext not in self.csvpaths.config.csvpath_file_extensions:
107                    continue
108                path = os.path.join(base, p)
109                if name is None:
110                    #
111                    # add files one by one under their own names
112                    #
113                    aname = self._name_from_name_part(p)
114                    self.add_named_paths_from_file(name=aname, file_path=path)
115                else:
116                    #
117                    # if a name, aggregate all the files
118                    #
119                    _ = self._get_csvpaths_from_file(path)
120
121                    #
122                    # try to find a run-index: N metadata and use it
123                    # to try to impose order? we could do this, but it would
124                    # be messy and a work-around to avoid making people
125                    # use the ordered ways of creating named-paths that
126                    # already exist: JSON and all-in-ones
127                    #
128                    """
129                    c = self.csvpaths.csvpath()
130                    MetadataParser(c).extract_metadata(instance=c, csvpath=path)
131                    """
132                    agg += _
133            if len(agg) > 0:
134                self.add_named_paths(name=name, paths=agg, source_path=directory)
135        else:
136            msg = "Dirname must point to a directory"
137            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
138            if self.csvpaths.ecoms.do_i_raise():
139                raise InputException(msg)
def add_named_paths_from_file( self, *, name: csvpath.managers.paths.paths_manager.NamedPathsName, file_path: str) -> None:
141    def add_named_paths_from_file(
142        self, *, name: NamedPathsName, file_path: str
143    ) -> None:
144        self.csvpaths.logger.debug("Reading csvpaths file at %s", file_path)
145        _ = self._get_csvpaths_from_file(file_path)
146        self.add_named_paths(name=name, paths=_, source_path=file_path)
def add_named_paths_from_json(self, file_path: str) -> None:
148    def add_named_paths_from_json(self, file_path: str) -> None:
149        try:
150            self.csvpaths.logger.debug("Opening JSON file at %s", file_path)
151            with open(file_path, encoding="utf-8") as f:
152                j = json.load(f)
153                self.csvpaths.logger.debug("Found JSON file with %s keys", len(j))
154                for k in j:
155                    self.store_json_paths_file(k, file_path)
156                    v = j[k]
157                    paths = []
158                    for f in v:
159                        _ = self._get_csvpaths_from_file(f)
160                        paths += _
161                    self.add_named_paths(name=k, paths=paths, source_path=file_path)
162        except (OSError, ValueError, TypeError, JSONDecodeError) as ex:
163            self.csvpaths.error_manager.handle_error(source=self, msg=f"{ex}")
164            if self.csvpaths.ecoms.do_i_raise():
165                raise
def add_named_paths( self, *, name: csvpath.managers.paths.paths_manager.NamedPathsName, paths: list[csvpath.managers.paths.paths_manager.Csvpath] = None, from_file: str = None, from_dir: str = None, from_json: str = None, source_path: str = None) -> None:
167    def add_named_paths(
168        self,
169        *,
170        name: NamedPathsName,
171        paths: list[Csvpath] = None,
172        from_file: str = None,
173        from_dir: str = None,
174        from_json: str = None,
175        source_path: str = None,
176    ) -> None:
177        if from_file is not None:
178            return self.add_named_paths_from_file(name=name, file_path=from_file)
179        elif from_dir is not None:
180            return self.add_named_paths_from_dir(name=name, directory=from_dir)
181        elif from_json is not None:
182            return self.add_named_paths_from_json(file_path=from_json)
183        if not isinstance(paths, list):
184            msg = """Paths must be a list of csvpaths.
185                    If you want to load a file use add_named_paths_from_file or
186                    set_named_paths_from_json."""
187            self.csvpaths.error_manager.handle_error(source=self, msg=msg)
188            if self.csvpaths.ecoms.do_i_raise():
189                raise InputException(msg)
190            return
191        self.csvpaths.logger.debug("Adding csvpaths to named-paths group %s", name)
192        for _ in paths:
193            self.csvpaths.logger.debug("Adding %s to %s", _, name)
194        s = self._str_from_list(paths)
195        t = self._copy_in(name, s)
196        grp_paths = self.get_identified_paths_in(name, paths=paths)
197        ids = [t[0] for t in grp_paths]
198        for i, t in enumerate(ids):
199            if t is None or t.strip() == "":
200                ids[i] = f"{i}"
201        mdata = PathsMetadata(self.csvpaths.config)
202        mdata.archive_name = self.csvpaths.config.archive_name
203        mdata.named_paths_name = name
204        sep = Nos(mdata.named_paths_root).sep
205        mdata.named_paths_home = f"{mdata.named_paths_root}{sep}{name}"
206        mdata.group_file_path = f"{mdata.named_paths_home}{sep}group.csvpaths"
207        mdata.named_paths = paths
208        mdata.named_paths_identities = ids
209        mdata.named_paths_count = len(ids)
210        mdata.source_path = source_path
211        self.registrar.register_complete(mdata)
def get_named_paths( self, name: csvpath.managers.paths.paths_manager.NamedPathsName) -> list[csvpath.managers.paths.paths_manager.Csvpath]:
217    def get_named_paths(self, name: NamedPathsName) -> list[Csvpath]:
218        self.csvpaths.logger.info("Getting named-paths for %s", name)
219        ret = None
220        npn = None
221        identity = None
222        if name.startswith("$"):
223            ref = ReferenceParser(name)
224            if ref.datatype != ReferenceParser.CSVPATHS:
225                raise InputException(
226                    f"Reference datatype must be {ReferenceParser.CSVPATHS}"
227                )
228            npn = ref.root_major
229            identity = ref.name_one
230        else:
231            npn, identity = self._paths_name_path(name)
232        if identity is None and self.has_named_paths(npn):
233            ret = self._get_named_paths(npn)
234        elif identity is not None and identity.find(":") == -1:
235            ret = [self._find_one(npn, identity)]
236        #
237        # we need to be able to grab paths up to and starting from like this:
238        #   $many.csvpaths.food:to
239        #   $many.csvpaths.food:from
240        #
241        elif identity is not None:
242            i = identity.find(":")
243            directive = identity[i:]
244            identity = identity[0:i]
245            if directive == ":to":
246                ret = self._get_to(npn, identity)
247            elif directive == ":from":
248                ret = self._get_from(npn, identity)
249            else:
250                self.csvpaths.logger.error(
251                    "Incorrect reference directive: name: %s, paths-name: %, identity: %",
252                    name,
253                    npn,
254                    identity,
255                )
256                raise InputException(
257                    f"Reference directive must be :to or :from, not {directive}"
258                )
259        return ret