csvpath

The CsvPath Framework makes it easy to pre-board external data files. It sits between Managed File Transfer and the data lake or applications. CsvPath provides durable dataset identification, validation and canonicalization, and stages data for internal use as a known-good raw data source. The goal is to automate the pre-boarding process.

CsvPath Language is the core validation and canonicalization engine of the Framework. The validation files can be developed without coding using the CLI. When you are ready to automate pre-boarding you will use the classes documented here, in particular csvpath.CsvPath, csvpath.CsvPaths, and the managers in csvpath.managers:

- csvpath.managers.files.file_manager
- csvpath.managers.paths.paths_manager
- csvpath.managers.results.results_manager

You access the managers from a csvpath.CsvPaths instance. You should not construct your own.

There are many other classes you could potentially use in some specific and narrow cases, such as building a new integration, a new function, or a new type of printer. But for 99% of automation use cases these classes are all you need.

"""
The CsvPath Framework makes it easy to pre-board external data files. It sits between Managed File Transfer and the data lake or applications. CsvPath provides durable dataset identification, validation and canonicalization, and stages data for internal use as a known-good raw data source. The goal is to automate the pre-boarding process.

CsvPath Language is the core validation and canonicalization engine of the Framework. The validation files can be developed without coding using the CLI. When you are ready to automate pre-boarding you will use the classes documented here, in particular csvpath.CsvPath, csvpath.CsvPaths, and the managers in csvpath.managers:

- csvpath.managers.files.file_manager
- csvpath.managers.paths.paths_manager
- csvpath.managers.results.results_manager

You access the managers from a csvpath.CsvPaths instance. You should not construct your own.

There are many other classes you could potentially use in some specific and narrow cases, such as building a new integration, a new function, or a new type of printer. But for 99% of automation use cases these classes are all you need.
"""

from csvpath.csvpath import CsvPath
from csvpath.csvpaths import CsvPaths

__all__ = ["CsvPath", "CsvPaths"]

class CsvPath(csvpath.managers.errors.error_collector.ErrorCollector, csvpath.util.printer.Printer):
  39class CsvPath(ErrorCollector, Printer):  # pylint: disable=R0902, R0904
  40    """CsvPath represents a csvpath string that contains a reference to
  41    a file, scanning instructions, and rules for matching lines.
  42    """
  43
  44    # re R0902, R0904: reasonable, but not a priority
  45
  46    def __init__(  # pylint: disable=R0913
  47        self,
  48        *,
  49        csvpaths=None,
  50        delimiter=",",
  51        quotechar='"',
  52        skip_blank_lines=True,
  53        print_default=True,
  54        config=None,
  55        error_manager=None,
  56    ):
  57        # re: R0913: all reasonable pass-ins with sensible defaults
  58        #
  59        # the config.ini file loaded as a ConfigParser instance
  60        #
  61        # definitely do not want this coming from CsvPaths because
  62        # we want to be able to override config.ini specifically for
  63        # this instance, if needed; however, we do want to be able
  64        # to pass in a config object that has been configured in some
  65        # way.
  66        self._config = config
  67        self.scanner = None
  68        """ @private """
  69        self.matcher = None
  70        """ @private """
  71        #
  72        # a parent CsvPaths may manage a CsvPath instance. if so, it will enable
  73        # the use of named files and named paths, print capture, error handling,
  74        # results collection, reference handling, etc. if a CsvPaths is not present
  75        # the CsvPath instance is responsible for all its own upkeep and does not
  76        # have some of those capabilities.
  77        #
  78        self.csvpaths = csvpaths
  79        """ @private """
  80        #
  81        # there are two logger components, one for CsvPath and one for CsvPaths.
  82        # the default levels are set in config.ini. to change the levels pass LogUtility
  83        # your component instance and the logging level. e.g.:
  84        # LogUtility.logger(csvpath, "debug")
  85        #
  86        self.logger = LogUtility.logger(self)
  87        """ @private """
  88        self.logger.info("initialized CsvPath")
  89        #
  90        # if we don't have a csvpaths these will both be None
  91        #
  92        self.named_paths_name = None
  93        """ @private """
  94        self.named_file_name = None
  95        """ @private """
  96        #
  97        # all errors come to our manager if they occur during matching. we use
  98        # the CsvPaths manager if possible. Otherwise, we just make our own that
  99        # only knows how to collect errors, not distribute them.
 100        #
 101        self.ecoms = ErrorCommunications(csvpath=self)
 102        """ @private """
 103        self.error_manager = ErrorManager(csvpath=self)
 104        if csvpaths is not None:
 105            self.error_manager.add_internal_listener(csvpaths.error_manager)
 106        #
 107        # modes are set in external comments
 108        #
 109        self.modes = ModeController(self)
 110        """ @private """
 111        #
 112        # captures the number of lines up front and tracks line stats as the
 113        # run progresses
 114        #
 115        self._line_monitor = None
 116        #
 117        # the scanning part of the csvpath. e.g. $test.csv[*]
 118        #
 119        self.scan = None
 120        """ @private """
 121        #
 122        # the matching part of the csvpath. e.g. [yes()]
 123        #
 124        self.match = None
 125        """ @private """
 126        #
 127        # when True the lines that do not match are returned from next()
 128        # and collect(). this effectively inverts the question CsvPath asks;
 129        # we do not create an OR expression in this case. in the default, we say:
 130        #     are all of these things true?
 131        # but when collect_when_not_matched is True we ask:
 132        #     are any of these things not true?
 133        #
 134        # self._when_not_matched = False
 135        self._headers = None
 136        self.variables: Dict[str, Any] = {}
 137        self.delimiter = delimiter
 138        self.quotechar = quotechar
 139        #
 140        # a blank line has no headers. it has no data. physically it is 2 \n with
 141        # nothing but whitespace between them. any data or any delimiters would make
 142        # the line non-blank.
 143        #
 144        self.skip_blank_lines = skip_blank_lines
 145        #
 146        # in the case of a [*] scan where the last line is blank we would miss firing
 147        # last() unless we take steps. instead, we allow that line to match, but we
 148        # do not return a line to the caller of next() and we freeze the variables.
 149        # there is room for side effects to make changes, but that is a reasonable
 150        # compromise between missing last() and allowing unwanted changes. we
 151        # definitely do not freeze is_valid or stop, which can be useful signaling,
 152        # even in an inconsistent state.
 153        #
 154        self._freeze_path = False
 155        #
 156        # counts are 1-based
 157        #
 158        self.scan_count = 0
 159        self.match_count = 0
 160        #
 161        # used by stop() and advance(). a stopped CsvPath halts without finishing
 162        # its run. an advancing CsvPath doesn't consider the match part of the
 163        # csvpath and does not incur any side effects as it progresses through the
 164        # rows the advance skips. the skip() function has the same effect as
 165        # advance(1) but without any guarantee that the other match components on
 166        # the line will be considered before skipping ahead. there are likely
 167        # corner cases where an onmatch qualifier or some other constraint will
 168        # trigger match components that would otherwise be skipped so the ability
 169        # to shortcut some of the match should not be relied on for anything
 170        # critical.
 171        #
 172        self.stopped = False
 173        self._advance = 0
 174        #
 175        # the lines var will hold a reference to a LineSpooler instance during a
 176        # run, if lines are being collected by the collect() method. if the user
 177        # is using this CsvPath instance directly the LineSpooler is only there
 178        # as a proxy to the list of lines being collected -- we don't spool to
 179        # disk, at least atm.
 180        #
 181        self.lines = None
 182        """ @private """
 183        #
 184        # set by fail()
 185        #
 186        self._is_valid = True
 187        #
 188        # basic timing for the CsvPath instance only. if the CsvPath is managed
 189        # by a CsvPaths the timings for a run may include time spent by other
 190        # CsvPath instances.
 191        #
 192        self.last_row_time = -1
 193        """ @private """
 194        self.rows_time = -1
 195        """ @private """
 196        self.total_iteration_time = -1
 197        """ @private """
 198        #
 199        # limiting collection means returning fewer headers (values in the
 200        # line, a.k.a. columns) than are available. limiting headers returned
 201        # can impact named results, reset_headers(), and other considerations.
 202        #
 203        self._limit_collection_to = []
 204        #
 205        # error collecting is at the CsvPath instance by default. CsvPath
 206        # instances that are managed by a CsvPaths have their errors collected
 207        # by their Results as well. Result handles persistence.
 208        #
 209        # error policies are set in config.ini at CsvPath and CsvPaths levels.
 210        #
 211        self._errors: List[Error] = []
 212        #
 213        # saves the scan and match parts of paths for reference. mainly helpful
 214        # for testing the CsvPath library itself; not used by end users. the run
 215        # name becomes the file name of the saved path parts.
 216        #
 217        self._save_scan_dir = None
 218        self._save_match_dir = None
 219        self._run_name = None
 220        #
 221        # metadata is collected from "outer" csvpath comments. outer comments are
 222        # separate from the comments within the match part of the csvpath.
 223        # the keys are words with colons. e.g. ~ name: my new csvpath ~
 224        #
 225        self.metadata: Dict[str, Any] = {}
 226        #
 227        # holds the current match count while we're in the middle of a match
 228        # so that anyone who wants to can increase the match count using
 229        # raise_match_count_if(). it is important to do the raise asap so that
 230        # components that are onmatched have the right match count available.
 231        #
 232        self._current_match_count = 0
 233        #
 234        # printers receive print lines from the print function. the default
 235        # printer prints to standard out. a CsvPath that is managed by a
 236        # CsvPaths has its Results as a printer, as well as having
 237        # the default printer.
 238        #
 239        self.printers = []
 240        """ @private """
 241        if print_default:
 242            self.printers.append(StdOutPrinter())
 243        #
 244        # _function_times_match collects the time a function spends doing its matches()
 245        #
 246        self._function_times_match = {}
 247        #
 248        # _function_times_value collects the time a function spends doing its to_value()
 249        #
 250        self._function_times_value = {}
 251        self._created_at = datetime.now(timezone.utc)
 252        self._run_started_at = None
 253        self._collecting = False
 254        #
 255        # holds the unmatched lines when lines are being collected and
 256        # _unmatched_available is True. it is analogous to the lines returned
 257        # by collect(), but is the lines not returned by collect().
 258        #
 259        self._unmatched = None
 260
 261    #
 262    # this method saves and reloads the config. if you don't want that use
 263    # CsvPath.config.add_to_config() directly.
 264    #
 265    def add_to_config(self, section, key, value) -> None:
 266        """@private"""
 267        self.config.add_to_config(section=section, key=key, value=value)
 268        self.config.save_config()
 269        self.config.reload()
 270
 271    @property
 272    def data_from_preceding(self) -> bool:
 273        """@private"""
 274        return self.modes.source_mode.value
 275
 276    @data_from_preceding.setter
 277    def data_from_preceding(self, dfp: bool) -> None:
 278        """@private"""
 279        self.modes.source_mode.value = dfp
 280
 281    @property
 282    def unmatched(self) -> list[list[Any]]:
 283        """@private"""
 284        return self._unmatched
 285
 286    @unmatched.setter
 287    def unmatched(self, lines: list[list[Any]]) -> None:
 288        """@private"""
 289        self._unmatched = lines
 290
 291    @property
 292    def collecting(self) -> bool:
 293        """@private"""
 294        return self._collecting
 295
 296    @collecting.setter
 297    def collecting(self, c: bool) -> None:
 298        """@private"""
 299        self._collecting = c
 300
 301    @property
 302    def unmatched_available(self) -> bool:
 303        """@private"""
 304        return self.modes.unmatched_mode.value
 305
 306    @unmatched_available.setter
 307    def unmatched_available(self, ua: bool) -> None:
 308        """@private"""
 309        self.modes.unmatched_mode.value = ua
 310
 311    @property
 312    def created_at(self) -> datetime:
 313        """@private"""
 314        return self._created_at
 315
 316    @property
 317    def run_started_at(self) -> datetime:
 318        """@private"""
 319        return self._run_started_at
 320
 321    @property
 322    def will_run(self) -> bool:
 323        """@private"""
 324        return self.modes.run_mode.value
 325
 326    @will_run.setter
 327    def will_run(self, mode) -> None:
 328        """@private"""
 329        self.modes.run_mode.value = mode
 330
 331    #
 332    # increases the total accumulated time spent doing c.matches() by t
 333    #
 334    def up_function_time_match(self, c, t) -> None:
 335        """@private"""
 336        if c not in self.function_times_match:
 337            self.function_times_match[c] = 0
 338        st = self.function_times_match[c]
 339        st += t
 340        self.function_times_match[c] = st
 341
 342    @property
 343    def function_times_match(self) -> dict:
 344        """@private"""
 345        return self._function_times_match
 346
 347    #
 348    # increases the total accumulated time spent doing c.to_value() by t
 349    #
 350    def up_function_time_value(self, c, t) -> None:
 351        """@private"""
 352        if c not in self.function_times_value:
 353            self.function_times_value[c] = 0
 354        st = self.function_times_value[c]
 355        st += t
 356        self.function_times_value[c] = st
 357
 358    @property
 359    def function_times_value(self) -> dict:
 360        """@private"""
 361        return self._function_times_value
 362
 363    def do_i_raise(self) -> bool:
 364        """@private"""
 365        return self.ecoms.do_i_raise()
 366
 367    @property
 368    def advance_count(self) -> int:  # pragma: no cover
 369        """@private"""
 370        return self._advance
 371
 372    @advance_count.setter
 373    def advance_count(self, lines: int) -> None:
 374        """@private"""
 375        self._advance = lines
 376
 377    @property
 378    def headers(self) -> List[str]:
 379        """@private"""
 380        if self._headers is None:
 381            self.get_total_lines_and_headers()
 382        return self._headers
 383
 384    @headers.setter
 385    def headers(self, headers: List[str]) -> None:
 386        """@private"""
 387        self._headers = headers
 388
 389    @property
 390    def line_monitor(self) -> LineMonitor:
 391        """@private"""
 392        if self._line_monitor is None:
 393            self.get_total_lines_and_headers()
 394        return self._line_monitor
 395
 396    @line_monitor.setter
 397    def line_monitor(self, lm) -> None:
 398        """@private"""
 399        self._line_monitor = lm
 400
 401    @property
 402    def AND(self) -> bool:  # pylint: disable=C0103
 403        return self.modes.logic_mode.value
 404
 405    @AND.setter
 406    def AND(self, a: bool) -> None:  # pylint: disable=C0103
 407        self.modes.logic_mode.value = a
 408
 409    @property
 410    def OR(self) -> bool:  # pylint: disable=C0103
 411        return not self.modes.logic_mode.value
 412
 413    @OR.setter
 414    def OR(self, a: bool) -> None:  # pylint: disable=C0103
 415        self.modes.logic_mode.value = not a
 416
 417    @property
 418    def identity(self) -> str:
 419        """returns id or name if found in metadata.
 420
 421        the id or name gets into metadata primarily if found
 422        in an "external" comment in the csvpath. "external"
 423        meaning outside the []s. comments are keyword:comment.
 424        we take id, Id, ID and name, Name, NAME.
 425
 426        id is preferred over name. E.g. in:
 427        ~ name: my path description: an example id: this value wins ~
 428        the id becomes the identity of the instance.
 429
 430        we prefer in this order: all-lower most, Initial-caps,
 431        ALL-CAPS least
 432
 433        the ordering is relied on in Result and possibly
 434        elsewhere.
 435        """
 436        ret = None
 437        if not self.metadata:
 438            ret = ""
 439        if "NAME" in self.metadata:
 440            ret = self.metadata["NAME"]
 441        if "Name" in self.metadata:
 442            ret = self.metadata["Name"]
 443        if "name" in self.metadata:
 444            ret = self.metadata["name"]
 445        if "ID" in self.metadata:
 446            ret = self.metadata["ID"]
 447        if "Id" in self.metadata:
 448            ret = self.metadata["Id"]
 449        if "id" in self.metadata:
 450            ret = self.metadata["id"]
 451        return ret
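
The precedence described in the `identity` docstring can be sketched outside the class. This is a minimal illustration, not the library's code: `pick_identity` and `IDENTITY_KEYS` are hypothetical names, and a plain dict stands in for the instance's metadata.

```python
from typing import Any, Dict, Optional

# Hypothetical constant: keys listed from most preferred to least preferred,
# following the order the identity docstring describes.
IDENTITY_KEYS = ("id", "Id", "ID", "name", "Name", "NAME")


def pick_identity(metadata: Dict[str, Any]) -> Optional[str]:
    """Return the preferred id or name found in metadata.

    Mirrors the behavior shown in the listing: empty metadata gives "",
    and metadata with none of the known keys gives None.
    """
    if not metadata:
        return ""
    for key in IDENTITY_KEYS:
        if key in metadata:
            return metadata[key]
    return None


# The example from the docstring: id is preferred over name.
meta = {
    "name": "my path",
    "description": "an example",
    "id": "this value wins",
}
print(pick_identity(meta))
```

Scanning the keys in preference order and returning on the first hit gives the same result as the chain of `if` statements in the property, where later assignments overwrite earlier ones.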
 452
 453    @property
 454    def config(self) -> Config:  # pylint: disable=C0116
 455        """@private"""
 456        if not self._config:
 457            self._config = Config()
 458        return self._config
 459
 460    # ==========================
 461    # Errors
 462    # <thinking> if we have a csvpaths people should look at the result to find errors
 463    # but we give access to metadata, vars, etc. from the csvpath, so we should
 464    # give errors too. that means we need to have our own listener. ultimately we'd
 465    # just be adding pointers, not dup the original error data.
 466    #
 467    def metadata_update(self, mdata: Metadata) -> None:
 468        """@private"""
 469        if isinstance(mdata, Error):
 470            self.collect_error(mdata)
 471
 472    @property
 473    def errors(self) -> List[Error]:  # pylint: disable=C0116
 474        return self._errors
 475
 476    @property
 477    def errors_count(self) -> int:  # pylint: disable=C0116
 478        return len(self.errors)
 479
 480    def collect_error(self, error: Error) -> None:  # pylint: disable=C0116
 481        """@private"""
 482        self.errors.append(error)
 483
 484    def has_errors(self) -> bool:
 485        return self.errors_count > 0
 486
 487    @property
 488    def stop_on_validation_errors(self) -> bool:
 489        """@private"""
 490        return self.modes.validation_mode.stop_on_validation_errors
 491
 492    @property
 493    def fail_on_validation_errors(self) -> bool:
 494        """@private"""
 495        return self.modes.validation_mode.fail_on_validation_errors
 496
 497    @property
 498    def print_validation_errors(self) -> bool:
 499        """@private"""
 500        return self.modes.validation_mode.print_validation_errors
 501
 502    @property
 503    def log_validation_errors(self) -> bool:
 504        """@private"""
 505        return self.modes.validation_mode.log_validation_errors
 506
 507    @property
 508    def raise_validation_errors(self) -> bool:
 509        """@private"""
 510        return self.modes.validation_mode.raise_validation_errors
 511
 512    @property
 513    def match_validation_errors(self) -> bool:
 514        """@private"""
 515        return self.modes.validation_mode.match_validation_errors
 516
 517    @property
 518    def collect_validation_errors(self) -> bool:
 519        """@private"""
 520        return self.modes.validation_mode.collect_validation_errors
 521
 522    def add_printer(self, printer) -> None:  # pylint: disable=C0116
 523        """@private"""
 524        if printer not in self.printers:
 525            self.printers.append(printer)
 526
 527    def set_printers(self, printers: List) -> None:  # pylint: disable=C0116
 528        """@private"""
 529        self.printers = printers
 530
 531    @property
 532    def has_default_printer(self) -> bool:
 533        """@private"""
 534        if not self.printers:
 535            self.printers = []
 536        for p in self.printers:
 537            if isinstance(p, StdOutPrinter):
 538                return True
 539        return False
 540
 541    def print(self, string: str) -> None:  # pylint: disable=C0116
 542        """@private"""
 543        for p in self.printers:
 544            p.print(string)
 545
 546    def print_to(self, name: str, string: str) -> None:
 547        """@private"""
 548        for p in self.printers:
 549            p.print_to(name, string)
 550
 551    @property
 552    def last_line(self):
 553        """@private
 554        this method only returns the default printer's last_line"""
 555        if not self.printers or len(self.printers) == 0:
 556            return None
 557        return self.printers[0].last_line
 558
 559    @property
 560    def lines_printed(self) -> int:
 561        """@private
 562        this method only returns the default printer's lines printed"""
 563        if not self.printers or len(self.printers) == 0:
 564            return -1
 565        return self.printers[0].lines_printed
 566
 567    @property
 568    def is_frozen(self) -> bool:
 569        """@private
 570        True if the instance is matching on its last row only to
 571        allow last()s to run; in which case, no variable updates
 572        are allowed, along with other limitations."""
 573        return self._freeze_path
 574
 575    @is_frozen.setter
 576    def is_frozen(self, freeze: bool) -> None:
 577        """@private"""
 578        self._freeze_path = freeze
 579
 580    @property
 581    def explain(self) -> bool:
 582        """@private
 583        when this property is True CsvPath dumps a match explanation
 584        to INFO. this can be expensive. a 25% performance hit wouldn't
 585        be unexpected.
 586        """
 587        return self.modes.explain_mode.value
 588
 589    @explain.setter
 590    def explain(self, yesno: bool) -> None:
 591        """@private"""
 592        self.modes.explain_mode.value = yesno
 593
 594    @property
 595    def collect_when_not_matched(self) -> bool:
 596        """@private
 597        when this property is True CsvPath returns the lines that do not
 598        match the matcher's match components"""
 599        return self.modes.return_mode.collect_when_not_matched
 600
 601    @collect_when_not_matched.setter
 602    def collect_when_not_matched(self, yesno: bool) -> None:
 603        """@private
 604        when collect_when_not_matched is True we return the lines that failed
 605        to match, rather than the default behavior of returning the matches.
 606        """
 607        self.modes.return_mode.collect_when_not_matched = yesno
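
The effect of `collect_when_not_matched` can be sketched with plain Python predicates. This is an illustration only: `collect_lines`, `Line`, and `Predicate` are hypothetical names, and the predicates stand in for the match components of a csvpath. By default a line is kept when all predicates are true; with the flag set, a line is kept when at least one predicate is not true.

```python
from typing import Callable, Iterable, List

Line = List[str]
Predicate = Callable[[Line], bool]


def collect_lines(
    lines: Iterable[Line],
    predicates: List[Predicate],
    collect_when_not_matched: bool = False,
) -> List[Line]:
    """Collect matching lines, or the non-matching ones when the flag is set."""
    collected = []
    for line in lines:
        # default question: are all of these things true?
        matched = all(p(line) for p in predicates)
        # with the flag set we keep the line only when that answer is no
        if matched != collect_when_not_matched:
            collected.append(line)
    return collected


rows = [["a", "1"], ["b", ""], ["", "3"]]
both_filled = [lambda l: l[0] != "", lambda l: l[1] != ""]

print(collect_lines(rows, both_filled))        # [['a', '1']]
print(collect_lines(rows, both_filled, True))  # [['b', ''], ['', '3']]
```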
 608
 609    def parse(self, csvpath, disposably=False):
 610        """@private
 611        disposably is True when a Matcher is needed for some purpose other than
 612        the run we were created to do. could be that a match component wanted a
 613        parsed csvpath for its own purposes. when True, we create and return the
 614        Matcher, but then forget it ever existed.
 615
 616        when disposably is False we build the scanner and return this CsvPath instance
 617        """
 618        #
 619        # strip off any comments and collect any metadata
 620        # CsvPaths will do this earlier but it stripped off
 621        # the comments so we won't find them again
 622        #
 623        csvpath = MetadataParser(self).extract_metadata(instance=self, csvpath=csvpath)
 624        self.update_settings_from_metadata()
 625        #
 626        #
 627        # exp!
 628        if disposably is False:
 629            csvpath = self._update_file_path(csvpath)
 630        #
 631        #
 632        #
 633        s, mat = self._find_scan_and_match_parts(csvpath)
 634        #
 635        # a disposable matcher still needs the match part
 636        #
 637        self.match = mat
 638        if disposably:
 639            pass
 640        else:
 641            self.scan = s
 642            self.scanner = Scanner(csvpath=self)
 643            self.scanner.parse(s)
 644        #
 645        # we build a matcher to see if it builds without error.
 646        # in principle we could keep this as the actual matcher.
 647        # atm, tho, just create a dry-run copy. in some possible
 648        # unit tests we may not have a parsable match part.
 649        #
 650        if disposably:
 651            matcher = None
 652            if mat:
 653                matcher = Matcher(csvpath=self, data=mat, line=None, headers=None)
 654            #
 655            # if the matcher was requested for some reason beyond our own needs
 656            # we just return it and forget it existed.
 657            #
 658            return matcher
 659        if self.scanner.filename is None:
 660            raise FileException("Cannot proceed without a filename")
 661        self.get_total_lines_and_headers()
 662        return self
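
`parse()` relies on splitting the csvpath string into a scan part such as `$test.csv[*]` and a match part such as `[yes()]`. The sketch below shows that split using only the steps visible in this listing: cut after the first `]`. It is not the library's implementation: `split_csvpath` is a hypothetical name, and `ValueError` stands in for the library's `InputException`.

```python
from typing import Tuple


def split_csvpath(csvpath: str) -> Tuple[str, str]:
    """Split a csvpath string into (scan part, match part)."""
    if not isinstance(csvpath, str):
        raise ValueError("Not a csvpath string")
    data = csvpath.strip()
    # the scan part ends at the first closing bracket
    i = data.find("]")
    if i < 0:
        raise ValueError(f"Cannot find the scan part of this csvpath: {data}")
    scan = data[: i + 1].strip()
    match = data[i + 1 :].strip()
    # a csvpath that ends with its scan part has nothing to match with
    if match == "":
        raise ValueError(f"There must be a match part of this csvpath: {data}")
    return scan, match


scan, match = split_csvpath("$test.csv[*][yes()]")
print(scan)   # $test.csv[*]
print(match)  # [yes()]
```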
 663
 664    def update_settings_from_metadata(self) -> None:
 665        """@private"""
 666        #
 667        # settings:
 668        #   - logic-mode: AND | OR
 669        #   - return-mode: matches | no-matches
 670        #   - print-mode: default | no-default
 671        #   - validation-mode: (no-)print | log | (no-)raise | quiet | (no-)match
 672        #   - run-mode: no-run | run
 673        #   - unmatched-mode: no-keep | keep
 674        #   - source-mode: preceding | origin
 675        #   - files-mode: all | no-data | no-unmatched | no-printouts | data | unmatched | errors | meta | vars | printouts
 676        #
 677        self.modes.update()
 678
 679    # =====================
 680    # in principle the modes should come through the mode controller like:
 681    #      self.modes.transfer_mode.value
 682    # not wading into that today. low value.
 683    #
 684    @property
 685    def transfer_mode(self) -> str:
 686        """@private"""
 687        return self.metadata.get("transfer-mode")
 688
 689    @property
 690    def source_mode(self) -> str:
 691        """@private"""
 692        return self.metadata.get("source-mode")
 693
 694    @property
 695    def error_mode(self) -> str:
 696        """@private"""
 697        return self.metadata.get("error-mode")
 698
 699    @property
 700    def files_mode(self) -> str:
 701        """@private"""
 702        return self.metadata.get("files-mode")
 703
 704    @property
 705    def validation_mode(self) -> str:
 706        """@private"""
 707        return self.metadata.get("validation-mode")
 708
 709    @property
 710    def run_mode(self) -> str:
 711        """@private"""
 712        return self.metadata.get("run-mode")
 713
 714    @property
 715    def logic_mode(self) -> str:
 716        """@private"""
 717        return self.metadata.get("logic-mode")
 718
 719    @property
 720    def return_mode(self) -> str:
 721        """@private"""
 722        return self.metadata.get("return-mode")
 723
 724    @property
 725    def explain_mode(self) -> str:
 726        """@private"""
 727        return self.metadata.get("explain-mode")
 728
 729    @property
 730    def print_mode(self) -> str:
 731        """@private"""
 732        return self.metadata.get("print-mode")
 733
 734    @property
 735    def unmatched_mode(self) -> str:
 736        """@private"""
 737        return self.metadata.get("unmatched-mode")
 738
 739    # =====================
 740
 741    @property
 742    def transfers(self) -> list[tuple[str, str]]:
 743        """@private"""
 744        return self.modes.transfer_mode.transfers
 745
 746    @property
 747    def all_expected_files(self) -> list[str]:
 748        """@private"""
 749        return self.modes.files_mode.all_expected_files
 750
 751    @all_expected_files.setter
 752    def all_expected_files(self, efs: list[str]) -> None:
 753        """@private"""
 754        self.modes.files_mode.all_expected_files = efs
 755
 756    def _pick_named_path(self, name, *, specific=None) -> str:
 757        """@private"""
 758        if not self.csvpaths:
 759            raise CsvPathsException("No CsvPaths object available")
 760        np = self.csvpaths.paths_manager.get_named_paths(name)
 761        if np is None:
 762            raise CsvPathsException(f"Named-paths '{name}' not found")
 763        if len(np) == 0:
 764            raise CsvPathsException(f"Named-paths '{name}' has no csvpaths")
 765        if len(np) == 1:
 766            return np[0]
 767        if specific is None:
 768            self.logger.warning(
 769                "Parse_named_path %s has %s csvpaths. Using just the first one.",
 770                name,
 771                len(np),
 772            )
 773            return np[0]
 774        for p in np:
 775            # this ends up being redundant to the caller. we do it 1x so it's not
 776            # a big lift and is consistent.
 777            c = CsvPath(csvpaths=self.csvpaths)
 778            MetadataParser(c).extract_metadata(instance=c, csvpath=p)
 779            if c.identity == specific:
 780                return p
 781        self.logger.error(
 782            "Cannot find csvpath identified as %s in named-paths %s", specific, name
 783        )
 784        raise ParsingException(f"Cannot find path '{specific}' in named-paths '{name}'")
 785
 786    def parse_named_path(self, name, *, disposably=False, specific=None):
 787        """@private
 788        disposably is True when a Matcher is needed for some purpose other than
 789        the run we were created to do. could be that a match component wanted a
 790        parsed csvpath for its own purposes. import() uses this method.
 791        when True, we create and return the Matcher, but then forget it ever existed.
 792        also note: the path must have a name or full filename. $[*] is not enough.
 793        """
 794        if not self.csvpaths:
 795            raise CsvPathsException("No CsvPaths object available")
 796
 797        path = self._pick_named_path(name, specific=specific)
 798        c = CsvPath(csvpaths=self.csvpaths)
 799        path = MetadataParser(c).extract_metadata(instance=c, csvpath=path)
 800        #
 801        # exp. oddly this seems to be superfluous
 802        # if disposably is False:
 803        #    path = c._update_file_path(path)
 804        #
 805        dis = c.parse(path, disposably=disposably)
 806        if disposably is True:
 807            return dis
 808        return None
 809
 810    def _update_file_path(self, data: str):
 811        """@private
 812        this method replaces a name (i.e. name in: $name[*][yes()]) with
 813        a file system path, if that name is registered with csvpaths's file
 814        manager. if there is no csvpaths no replace happens. if there is a
 815        csvpaths but the file manager doesn't know the name, no replace
 816        happens.
 817        """
 818        if data is None:
 819            raise InputException("The csvpath string cannot be None")
 820        if self.csvpaths is None:
 821            return data
 822        name = self._get_name(data)
 823        path = self.csvpaths.file_manager.get_named_file(name)
 824        if path is None:
 825            return data
 826        if path == name:
 827            return data
 828        return data.replace(name, path)
 829
 830    def _get_name(self, data: str):
 831        if self.csvpaths is None:
 832            return data
 833        data = data.strip()
 834        if data[0] == "$":
 835            name = data[1 : data.find("[")]
 836            return name
 837        raise FormatException(f"Must start with '$', not {data[0]}")
 838
 839    def _find_scan_and_match_parts(self, data):
 840        if data is None or not isinstance(data, str):
 841            raise InputException("Not a csvpath string")
 842        scan = ""
 843        matches = ""
 844        data = data.strip()
 845        i = data.find("]")
 846        if i < 0:
 847            raise InputException(f"Cannot find the scan part of this csvpath: {data}")
 848        if i == len(data) - 1:
 849            raise InputException(
 850                f"The scan part of this csvpath cannot be last: {data}"
 851            )
 852
 853        scan = data[0 : i + 1]
 854        scan = scan.strip()
 855
 856        ndata = data[i + 1 :]
 857        ndata = ndata.strip()
 858
 859        if ndata == "":
 860            raise InputException(f"There must be a match part of this csvpath: {data}")
 861        if ndata[0] != "[":
 862            raise InputException(f"Cannot find the match part of this csvpath: {data}")
 863        if ndata[len(ndata) - 1] != "]":
 864            raise InputException(f"The match part of this csvpath is incorrect: {data}")
 865        matches = ndata
 866        #
 867        # if we're given directory(s) to save to, save the parts
 868        #
 869        self._save_parts_if(scan, matches)
 870        return scan, matches
 871
 872    def _save_parts_if(self, scan, match):
 873        if self._save_scan_dir and self._run_name:
 874            with open(
 875                os.path.join(self._save_scan_dir, f"{self._run_name}.txt"),
 876                "w",
 877                encoding="utf-8",
 878            ) as f:
 879                f.write(scan)
 880        if self._save_match_dir and self._run_name:
 881            with open(
 882                os.path.join(self._save_match_dir, f"{self._run_name}.txt"),
 883                "w",
 884                encoding="utf-8",
 885            ) as f:
 886                f.write(match)
 887
 888    def __str__(self):
 889        return f"""
 890            path: {self.scanner.path if self.scanner else None}
 891            identity: {self.identity}
 892            parsers: [scanner=Ply, matcher=Lark, print=Lark]
 893            from_line: {self.scanner.from_line if self.scanner else None}
 894            to_line: {self.scanner.to_line if self.scanner else None}
 895            all_lines: {self.scanner.all_lines if self.scanner else None}
 896            these: {self.scanner.these if self.scanner else None}
 897            matcher: {self.matcher}
 898            variables: {len(self.variables)}
 899            metadata: {len(self.metadata)}
 900        """
 901
 902    @property
 903    def is_valid(self) -> bool:  # pragma: no cover
 904        """Csvpaths can flag a CSV file as invalid using the fail() function"""
 905        return self._is_valid
 906
 907    @is_valid.setter
 908    def is_valid(self, tf: bool) -> None:
 909        self._is_valid = tf
 910
 911    @property
 912    def completed(self) -> bool:
 913        if not self.scanner or not self.line_monitor:
 914            return False
 915        if self.scanner.is_last(self.line_monitor.physical_line_number):
 916            return True
 917        return False
 918
 919    @property
 920    def from_line(self):  # pragma: no cover pylint: disable=C0116
 921        """@private"""
 922        if self.scanner is None:
 923            raise ParsingException("No scanner available. Have you parsed a csvpath?")
 924        return self.scanner.from_line
 925
 926    @property
 927    def to_line(self):  # pragma: no cover pylint: disable=C0116
 928        """@private"""
 929        if self.scanner is None:
 930            raise ParsingException("No scanner available. Have you parsed a csvpath?")
 931        return self.scanner.to_line
 932
 933    @property
 934    def all_lines(self):  # pragma: no cover pylint: disable=C0116
 935        """@private"""
 936        if self.scanner is None:
 937            raise ParsingException("No scanner available. Have you parsed a csvpath?")
 938        return self.scanner.all_lines
 939
 940    @property
 941    def path(self):  # pragma: no cover pylint: disable=C0116
 942        """@private"""
 943        if self.scanner is None:
 944            raise ParsingException("No scanner available. Have you parsed a csvpath?")
 945        return self.scanner.path
 946
 947    @property
 948    def these(self):  # pragma: no cover pylint: disable=C0116
 949        """@private"""
 950        if self.scanner is None:
 951            raise ParsingException("No scanner available. Have you parsed a csvpath?")
 952        return self.scanner.these
 953
 954    @property
 955    def limit_collection_to(self) -> List[int]:
 956        """@private
 957        returns the list of headers to collect when a line matches. by default
 958        this list is empty and all headers are collected.
 959        """
 960        return self._limit_collection_to
 961
 962    @limit_collection_to.setter
 963    def limit_collection_to(self, indexes: List[int]) -> None:
 964        """@private"""
 965        self._limit_collection_to = indexes
 966        self.logger.warning("Setting a limit on headers collected: %s", indexes)
 967
 968    def stop(self) -> None:
 969        """@private"""
 970        self.stopped = True
 971
 972    #
 973    #
 974    # collect(), fast_forward(), and next() are the central methods of CsvPath.
 975    #
 976    # remember that since next() is a public method and collect() and fast_forward()
 977    # rely on it, we have to cut off exceptions at next(). users will not see
 978    # fast_forward and collect in the stack trace. probably that's not a big deal.
 979    # if it seems confusing we can work around the problem.
 980    #
 981    def collect(
 982        self, csvpath: str = None, *, nexts: int = -1, lines=None
 983    ) -> List[List[Any]] | LineSpooler:
 984        """Runs the csvpath forward and returns the matching lines seen as
 985        a list of lists. This method does not hold lines locally as
 986        accessible attributes: lines are not kept on the instance after the
 987        run completes and the collected lines are returned.
 988
 989        the optional lines argument may be an instance of any class that has an append(obj)
 990        method. if lines is None, a list is returned.
 991        """
 992        if self.scanner is None and csvpath is not None:
 993            self.parse(csvpath)
 994        if nexts < -1:
 995            raise ProcessingException(
 996                "Input must be >= -1. -1 means collect to the end of the file."
 997            )
 998        self.collecting = True
 999        #
1000        # we're going to use the passed in lines object. if it exists
1001        # it is a LineSpooler (we presume). we'll associate it with the
1002        # csvpath temporarily so anyone interested can get it. if it
1003        # doesn't exist we create a list for the lines var. we create a
1004        # ListLineSpooler to handle any inquiries during the run. we
1005        # do all our line collecting business with the local lines var
1006        # which has an append method regardless of if it is list or
1007        # LineSpooler. when we're done we break the link with the csvpath
1008        # and return the local variable. if it is a list it is going to
1009        # a csvpath user. if it is a LineSpooler it is going to a
1010        # CsvPaths instance.
1011        #
1012        self.lines = lines
1013        if lines is None:
1014            lines = []
1015            self.lines = ListLineSpooler(lines=lines)
1016        for _ in self.next():
1017            _ = _[:]
1018            self.lines.append(_)
1019            if nexts == -1:
1020                continue
1021            if nexts > 1:
1022                nexts -= 1
1023            else:
1024                break
1025        # we don't want to hold on to data more than needed. but
1026        # we do want to return data if we're not spooling. the
1027        # way we do that is to keep the local var available with the
1028        # list and/or the spooler. the caller needs to be aware of
1029        # both possibilities, but both offer __len__ and append.
1030        #
1031        # we keep the self.lines if it is not a list because that
1032        # makes it available to the runtime data collector so we can
1033        # see the line count in the metadata, saving opening a
1034        # potentially large data.csv to find out how many lines.
1035        if isinstance(self.lines, list):
1036            self.lines = None
1037        return lines
1038
1039    def fast_forward(self, csvpath=None):
1040        """Scans to the end of the CSV file. All scanned rows will be
1041        considered for matching, so variables are set and side effects
1042        happen, but no rows are returned or stored. If you do not pass
1043        the csvpath string here you must first call the parse method.
1044        Returns this CsvPath instance."""
1045        if self.scanner is None and csvpath is not None:
1046            self.parse(csvpath)
1047        for _ in self.next():
1048            pass
1049        return self
1050
1051    #
1052    # dont_raise is for the use of fast_forward and collect who
1053    # may want to handle raising errors themselves.
1054    #
1055    def next(self, csvpath=None):
1056        """A generator function that steps through the CSV file returning
1057        matching rows."""
1058        try:
1059            if self.scanner is None and csvpath is not None:
1060                self.parse(csvpath)
1061            start = time.time()
1062            if self.will_run is True:
1063                for line in self._next_line():
1064                    b = self._consider_line(line)
1065                    if b:
1066                        line = self.limit_collection(line)
1067                        if line is None:
1068                            msg = "Line cannot be None"
1069                            self.logger.error(msg)
1070                            raise MatchException(msg)
1071                        if len(line) == 0:
1072                            msg = "Line cannot be len() == 0"
1073                            self.logger.error(msg)
1074                            raise MatchException(msg)
1075                        yield line
1076                    elif self.collecting and self.unmatched_available:
1077                        if self.unmatched is None:
1078                            self.unmatched = []
1079                        line = self.limit_collection(line)
1080                        # we aren't None and 0 checking as above. needed?
1081                        self.unmatched.append(line)
1082                    if self.stopped:
1083                        self.logger.info(
1084                            "CsvPath has been stopped at line %s",
1085                            self.line_monitor.physical_line_number,
1086                        )
1087                        break
1088            else:
1089                self.logger.warning(
1090                    f"Csvpath identified as {self.identity} is disabled by run-mode:no-run"
1091                )
1092            self.finalize()
1093            end = time.time()
1094            self.total_iteration_time = end - start
1095            self.logger.info("Run against %s is complete.", self.scanner.filename)
1096            self.logger.info(
1097                "Iteration time was %s", round(self.total_iteration_time, 2)
1098            )
1099            self.logger.info(
1100                "%s per line",
1101                round(
1102                    self.total_iteration_time
1103                    / self.line_monitor.physical_end_line_count,
1104                    2,
1105                ),
1106            )
1107        except Exception as e:
1108            if not self.ecoms.do_i_quiet():
1109                self.logger.error(e, exc_info=True)
1110            if self.ecoms.do_i_raise():
1111                raise
1112
1113    def _next_line(self) -> List[Any]:
1114        """@private"""
1115        self.logger.info("beginning to scan file: %s", self.scanner.filename)
1116        #
1117        # this exception will blow up a standalone CsvPath but should be
1118        # caught and handled if there is a CsvPaths.
1119        #
1120        # but when would it happen? shouldn't we just let Python's exception
1121        # handle it should it really occur?
1122        #
1123        if self.scanner.filename is None:
1124            raise FileException("There is no filename")
1125        #
1126        # DataFileReader is abstract. instantiating it results in a concrete subclass.
1127        # pylint doesn't like that just because it doesn't see what we're doing.
1128        # otoh, is this a bad way to do it? not sure but it works fine.
1129        #
1130        reader = DataFileReader(  # pylint: disable=E0110
1131            self.scanner.filename, delimiter=self.delimiter, quotechar=self.quotechar
1132        )
1133        for line in reader.next():
1134            self.track_line(line=line)
1135            yield line
1136        self.finalize()
1137
1138    def finalize(self) -> None:
1139        """@private
1140        clears caches, etc. this is an internal method, but not _ because
1141        it is part of the lifecycle and we might find a reason to call it
1142        from higher up.
1143        """
1144        # this method can run multiple times without a problem, but that
1145        # shouldn't happen anyway.
1146        self._freeze_path = True
1147        if self.matcher:
1148            self.matcher.clear_caches()
1149
1150    def track_line(self, line) -> None:
1151        """@private
1152        csvpaths needs to handle some of the iteration logic, and we don't want
1153        to lose track of line number monitoring or repeat the code up there,
1154        so we need this method to give csvpaths a way to tap in.
1155        """
1156        last_line = None
1157        if self.matcher:
1158            last_line = self.matcher.line
1159        self.line_monitor.next_line(last_line=last_line, data=line)
1160        if self.line_monitor.physical_line_number == 0:
1161            self._run_started_at = datetime.now(timezone.utc)
1162
1163    def _consider_line(self, line):  # pylint: disable=R0912, R0911
1164        """@private"""
1165        # re: R0912: this method has already been refactored but maybe
1166        # there is more we can do?
1167        #
1168        # we always look at the last line so that last() has a
1169        # chance to run
1170        #
1171        # if we're empty, but last, we need to make sure the
1172        # matcher runs a final time so that any last() can run.
1173        #
1174        if self.line_monitor.is_last_line_and_blank(line):
1175            # if self.line_monitor.is_last_line_and_empty(line):
1176            self.logger.info("last line is empty. freezing, matching, returning false")
1177            self._freeze_path = True
1178            self.matches(line)
1179            return False
1180        if self.skip_blank_lines and len(line) == 0:
1181            self.logger.info(
1182                "Skipping line %s because blank", self.line_monitor.physical_line_number
1183            )
1184            return False
1185        if self.scanner.includes(self.line_monitor.physical_line_number):
1186            self.logger.debug("Scanner includes line")
1187            self.scan_count = self.scan_count + 1
1188            matches = None
1189            self._current_match_count = self.match_count
1190            if self.advance_count > 0:
1191                self.advance_count -= 1
1192                matches = False
1193                self.logger.debug(
1194                    f"Advancing one line with {self.advance_count} more skips to go"
1195                )
1196            else:
1197                self.logger.debug("Starting matching")
1198                startmatch = time.perf_counter_ns()
1199                matches = self.matches(line)
1200                endmatch = time.perf_counter_ns()
1201                t = (endmatch - startmatch) / 1000000
1202                self.last_row_time = t
1203                self.rows_time += t
1204                self.logger.debug(
1205                    "CsvPath.matches:703: %s: matches: %s", self.identity, matches
1206                )
1207            #
1208            # if we are done scanning we can stop
1209            #
1210            if self.scanner.is_last(self.line_monitor.physical_line_number):
1211                self.stop()
1212            if matches is True:
1213                #
1214                # _current_match_count is a placeholder that
1215                # allows anyone to call a match early and update
1216                # the count. this is important when there is
1217                # an onmatch component that needs to use the
1218                # match_count. e.g. an onmatch print statement.
1219                # we would want the onmatch to propagate asap. we
1220                # can accept that there could be a variable set to
1221                # match count prior to the onmatch upping the
1222                # count. that wouldn't be great for explainability,
1223                # but order is important -- match components
1224                # impact each other left to right, top to bottom.
1225                #
1226                self.raise_match_count_if()
1227                if self.collect_when_not_matched:
1228                    return False
1229                return True
1230            if self.collect_when_not_matched:
1231                return True
1232            return False
1233        return False
1234
1235    def raise_match_count_if(self):
1236        """@private
1237        if the match count has already been raised earlier in the matching
1238        process than the caller we don't raise it; otherwise, we raise."""
1239        if self._current_match_count == self.match_count:
1240            self.match_count += 1
1241        else:
1242            self.logger.debug("Match count was already raised, so not doing it again")
1243
1244    def limit_collection(self, line: List[Any]) -> List[Any]:
1245        """@private
1246        this method creates a line based on the given line that holds only the headers
1247        that the csvpath says to collect. headers for collection are indicated using
1248        the collect() function.
1249        """
1250        if len(self.limit_collection_to) == 0:
1251            return line
1252        ls = []
1253        for k in self.limit_collection_to:
1254            if k is None or k >= len(line):
1255                raise InputException(
1256                    f"[{self.identity}] Line {self.line_monitor.physical_line_number}: unknown header name: {k}"
1257                )
1258            ls.append(line[k])
1259        return ls
1260
1261    def advance(self, ff: int = -1) -> None:
1262        """@private
1263        Advances the iteration by ff rows. The rows will be seen but not matched."""
1264        if ff is None:
1265            raise InputException("Input to advance must not be None")
1266        if self.line_monitor.physical_end_line_number is None:
1267            raise ProcessingException(
1268                "The last line number must be known (physical_end_line_number)"
1269            )
1270        if ff == -1:
1271            a = self.advance_count
1272            a = (
1273                self.line_monitor.physical_end_line_number
1274                - self.line_monitor.physical_line_number
1275                - a
1276            )
1277            self.advance_count = a
1278        else:
1279            self.advance_count += ff
1280        self.advance_count = min(
1281            self.advance_count, self.line_monitor.physical_end_line_number
1282        )
1283
1284    def get_total_lines(self) -> int:  # pylint: disable=C0116
1285        """@private"""
1286        if (
1287            self.line_monitor.physical_end_line_number is None
1288            or self.line_monitor.physical_end_line_number == 0
1289        ):
1290            self.get_total_lines_and_headers()
1291        return self.line_monitor.physical_end_line_number
1292
1293    def get_total_lines_and_headers(self) -> None:  # pylint: disable=C0116
1294        """@private"""
1295        if not self.scanner or not self.scanner.filename:
1296            self.logger.error(
1297                "Csvpath identified as %s has no filename. Since we could be error handling an exception is not raised.",
1298                self.identity,
1299            )
1300            # exp
1301            return
1302            # end exp
1303        #
1304        # there are times, e.g. when using Lambda, when it may be better to
1305        # not use a cache. in the case of Lambda the reason is to avoid working
1306        # around the read-only filesystem.
1307        #
1308        use_cache = self.csvpaths is not None
1309        if use_cache:
1310            uc = self.csvpaths.config.get(section="cache", name="use_cache")
1311            use_cache = uc is None or uc.strip().lower() != "no"
1312        if self.csvpaths and use_cache is True:
1313            self.line_monitor = self.csvpaths.file_manager.cacher.get_new_line_monitor(
1314                self.scanner.filename
1315            )
1316            self.headers = self.csvpaths.file_manager.cacher.get_original_headers(
1317                self.scanner.filename
1318            )
1319        else:
1320            lc = LineCounter(self)
1321            lm, headers = lc.get_lines_and_headers(self.scanner.filename)
1322            self.line_monitor = lm
1323            self.headers = headers
1324
1325    @property
1326    def current_scan_count(self) -> int:  # pylint: disable=C0116
1327        """@private"""
1328        return self.scan_count
1329
1330    @property
1331    def current_match_count(self) -> int:  # pylint: disable=C0116
1332        """@private"""
1333        return self.match_count
1334
1335    def matches(self, line) -> bool:  # pylint: disable=C0116
1336        """@private"""
1337        if not self.match:
1338            return True
1339        #
1340        # when we first consider a line we don't have a matcher. we build
1341        # it on the fly. later, we just reset the matcher for the new lines.
1342        #
1343        # when we originally call parse we're just parsing for the scanner:
1344        #
1345        #   path = CsvPath()
1346        #   path.parse ("$file[*][yes()]")
1347        #   path.fast_forward()
1348        #
1349        # "find_file" would be a more intuitive method name. we don't create
1350        # the path's matcher until the 3rd line. by then we're on the 3rd parser
1351        # and 4 parse.
1352        #
1353        if self.matcher is None:
1354            h = hashlib.sha256(self.match.encode("utf-8")).hexdigest()
1355            self.logger.info("Loading matcher with data. match part hash: %s", h)
1356            self.matcher = Matcher(
1357                csvpath=self, data=self.match, line=line, headers=self.headers, myid=h
1358            )
1359            self.matcher.AND = self.AND
1360            #
1361            # we need to register all the Expressions as error listeners. not
1362            # sure it matters if we do it here or allow the Matcher to do it.
1363            # since the Matcher is responsible for its Expressions, has a handle
1364            # on this CsvPath, and through it has the error_manager, let's let it
1365            # register the expressions.
1366            #
1367        else:
1368            self.logger.debug("Resetting and reloading matcher")
1369            self.matcher.reset()
1370            self.matcher.line = line
1371        matched = self.matcher.matches()
1372        return matched
1373
1374    def set_variable(self, name: str, *, value: Any, tracking: Any = None) -> None:
1375        """@private
1376        sets a variable and the tracking variable as a key within
1377        it, if a tracking value is provided."""
1378        if self._freeze_path:
1379            self.logger.warning(
1380                "Run is ending, variables are frozen. Cannot set %s to %s.", name, value
1381            )
1382            return
1383        if not name:
1384            raise VariableException(
1385                f"Name cannot be None: name: {name}, tracking: {tracking}, value: {value}"
1386            )
1387        if name.strip() == "":
1388            raise VariableException(
1389                f"""Name cannot be the empty string:
1390                    name: {name}, tracking: {tracking}, value: {value}"""
1391            )
1392        if tracking is not None and f"{tracking}".strip() == "":
1393            raise VariableException(
1394                f"""Tracking value cannot be empty.
1395                    name: {name}, tracking: {tracking}, value: {value}"""
1396            )
1397        if tracking is not None:
1398            if name not in self.variables:
1399                self.variables[name] = {}
1400            instances = self.variables[name]
1401            instances[tracking] = value
1402        else:
1403            self.variables[name] = value
1404
1405    def get_variable(  # pylint: disable=R0912
1406        self, name: str, *, tracking: Any = None, set_if_none: Any = None
1407    ) -> Any:
1408        """@private
1409        gets a variable by name. uses the tracking value as a key to get
1410        the value if the variable is a dictionary."""
1411        #
1412        # re: R0912: totally true. this is a scary method. plan to refactor.
1413        #
1414        if not name:
1415            raise VariableException("Name cannot be None")
1416        if self._freeze_path:
1417            #
1418            # run is ending, no more changes
1419            #
1420            set_if_none = None
1421        thevalue = None
1422        if tracking is not None:
1423            thedict = None
1424            thevalue = None
1425            if name in self.variables:
1426                thedict = self.variables[name]
1427                if not thedict:
1428                    thedict = {}
1429                    self.variables[name] = thedict
1430                    thedict[tracking] = set_if_none
1431            else:
1432                thedict = {}
1433                thedict[tracking] = set_if_none
1434                self.variables[name] = thedict
1435            if isinstance(thedict, dict):
1436                thevalue = thedict.get(tracking)
1437            if not thevalue and set_if_none is not None:
1438                thedict[tracking] = set_if_none
1439                thevalue = set_if_none
1440        else:
1441            if name not in self.variables:
1442                if set_if_none is not None:
1443                    self.variables[name] = set_if_none
1444                    thevalue = set_if_none
1445            else:
1446                thevalue = self.variables[name]
1447        if self._freeze_path:
1448            if isinstance(thevalue, list):
1449                #
1450                # run is ending, no more changes
1451                #
1452                thevalue = tuple(thevalue[:])
1453                self.logger.debug(
1454                    "Returning %s for frozen variable %s.%s", thevalue, name, tracking
1455                )
1456        return thevalue
1457
1458    def line_numbers(self) -> Iterator[int | str]:
1459        """@private
1460        returns all the line numbers the scanner will scan during
1461        the run of a csvpath"""
1462        these = self.scanner.these
1463        from_line = self.scanner.from_line
1464        to_line = self.scanner.to_line
1465        all_lines = self.scanner.all_lines
1466        return self._line_numbers(
1467            these=these, from_line=from_line, to_line=to_line, all_lines=all_lines
1468        )
1469
1470    def _line_numbers(
1471        self,
1472        *,
1473        these: List[int] = None,
1474        from_line: int = None,
1475        to_line: int = None,
1476        all_lines: bool = None,
1477    ) -> Iterator[int | str]:
1478        """@private"""
1479        if these is None:
1480            these = []
1481        if len(these) > 0:
1482            yield from these
1483        else:
1484            if from_line is not None and to_line is not None and from_line > to_line:
1485                yield from range(to_line, from_line + 1)
1486            elif from_line is not None and to_line is not None:
1487                yield from range(from_line, to_line + 1)
1488            elif from_line is not None:
1489                if all_lines:
1490                    yield f"{from_line}..."
1491                else:
1492                    yield from_line
1493            elif to_line is not None:
1494                yield f"0..{to_line}"
1495
1496    def collect_line_numbers(self) -> List[int | str]:  # pylint: disable=C0116
1497        """@private"""
1498        if self.scanner is None:
1499            raise ParsingException("No scanner available. Have you parsed a csvpath?")
1500        these = self.scanner.these
1501        from_line = self.scanner.from_line
1502        to_line = self.scanner.to_line
1503        all_lines = self.scanner.all_lines
1504        return self._collect_line_numbers(
1505            these=these, from_line=from_line, to_line=to_line, all_lines=all_lines
1506        )
1507
1508    def _collect_line_numbers(
1509        self,
1510        *,
1511        these: List[int] = None,
1512        from_line: int = None,
1513        to_line: int = None,
1514        all_lines: bool = None,
1515    ) -> List[int | str]:
1516        """@private"""
1517        collect = []
1518        if these is None:
1519            these = []
1520        for i in self._line_numbers(
1521            these=these, from_line=from_line, to_line=to_line, all_lines=all_lines
1522        ):
1523            collect.append(i)
1524        return collect
1525
1526    def header_index(self, name: str) -> int:  # pylint: disable=C0116
1527        """@private"""
1528        if not self.headers:
1529            return None
1530        for i, n in enumerate(self.headers):
1531            if n == name:
1532                return i
1533        return None
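
The branching in `_line_numbers` and `collect_line_numbers` in the listing above can be easier to follow in isolation. The following is a minimal standalone sketch, not the library's code: the free function `line_numbers` is a hypothetical name, and it takes the scanner's `these`, `from_line`, `to_line`, and `all_lines` values as plain arguments instead of reading them from a scanner.

```python
from typing import Iterator, List, Optional, Union


def line_numbers(
    *,
    these: Optional[List[int]] = None,
    from_line: Optional[int] = None,
    to_line: Optional[int] = None,
    all_lines: bool = False,
) -> Iterator[Union[int, str]]:
    """Hypothetical standalone restatement of the branching in _line_numbers."""
    if these:
        # An explicit list of line numbers wins over any range settings.
        yield from these
    elif from_line is not None and to_line is not None:
        # A closed range. A reversed range is reported in ascending order.
        low, high = sorted((from_line, to_line))
        yield from range(low, high + 1)
    elif from_line is not None:
        # Only a start: "N..." when scanning to the end, otherwise just N.
        yield f"{from_line}..." if all_lines else from_line
    elif to_line is not None:
        # Only an end: reported as a string range starting at 0.
        yield f"0..{to_line}"


print(list(line_numbers(these=[1, 3, 5])))
print(list(line_numbers(from_line=4, to_line=2)))
print(list(line_numbers(from_line=3, all_lines=True)))
print(list(line_numbers(to_line=7)))
```

Open-ended scans are reported as strings because the last line number is not known from the scan instructions alone, which is why the return type mixes `int` and `str`.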

CsvPath represents a csvpath string that contains a reference to a file, scanning instructions, and rules for matching lines.
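
To make the three parts concrete, here is a minimal sketch of how a csvpath string divides into a file name, a scan part, and a match part. It loosely follows the checks in `_get_name` and `_find_scan_and_match_parts` but is not the library's implementation: `split_csvpath` is a hypothetical helper, `orders.csv` is a made-up file name, and `ValueError` stands in for the framework's own exception classes.

```python
from typing import Tuple


def split_csvpath(csvpath: str) -> Tuple[str, str, str]:
    """Return (name, scan part, match part) for a csvpath string (illustrative only)."""
    data = csvpath.strip()
    if not data.startswith("$"):
        raise ValueError("A csvpath must start with '$'")
    start_of_scan = data.find("[")
    end_of_scan = data.find("]")
    if start_of_scan < 0 or end_of_scan < start_of_scan:
        raise ValueError("Cannot find the scan part of this csvpath")
    # The name is everything between the leading '$' and the first '['.
    name = data[1:start_of_scan]
    # The scan part runs through the first ']'.
    scan = data[: end_of_scan + 1].strip()
    # Whatever follows must be a bracketed match part.
    match = data[end_of_scan + 1 :].strip()
    if not (match.startswith("[") and match.endswith("]")):
        raise ValueError("Cannot find the match part of this csvpath")
    return name, scan, match


name, scan, match = split_csvpath("$orders.csv[*][yes()]")
print(name)
print(scan)
print(match)
```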

CsvPath(*, csvpaths=None, delimiter=',', quotechar='"', skip_blank_lines=True, print_default=True, config=None, error_manager=None)
    def __init__(  # pylint: disable=R0913
        self,
        *,
        csvpaths=None,
        delimiter=",",
        quotechar='"',
        skip_blank_lines=True,
        print_default=True,
        config=None,
        error_manager=None,
    ):
        # re: R0913: all reasonable pass-ins with sensible defaults
        #
        # the config.ini file loaded as a ConfigParser instance
        #
        # definitely do not want this coming from CsvPaths because
        # we want to be able to override config.ini specifically for
        # this instance, if needed; however, we do want to be able
        # to pass in a config object that has been configured in some
        # way.
        self._config = config
        self.scanner = None
        """ @private """
        self.matcher = None
        """ @private """
        #
        # a parent CsvPaths may manage a CsvPath instance. if so, it will enable
        # the use of named files and named paths, print capture, error handling,
        # results collection, reference handling, etc. if a CsvPaths is not present
        # the CsvPath instance is responsible for all its own upkeep and does not
        # have some of those capabilities.
        #
        self.csvpaths = csvpaths
        """ @private """
        #
        # there are two logger components one for CsvPath and one for CsvPaths.
        # the default levels are set in config.ini. to change the levels pass LogUtility
        # your component instance and the logging level. e.g.:
        # LogUtility.logger(csvpath, "debug")
        #
        self.logger = LogUtility.logger(self)
        """ @private """
        self.logger.info("initialized CsvPath")
        #
        # if we don't have a csvpaths these will both be None
        #
        self.named_paths_name = None
        """ @private """
        self.named_file_name = None
        """ @private """
        #
        # all errors come to our manager if they occur during matching. we use
        # the CsvPaths manager if possible. Otherwise, we just make our own that
        # only knows how to collect errors, not distribute them.
        #
        self.ecoms = ErrorCommunications(csvpath=self)
        """ @private """
        self.error_manager = ErrorManager(csvpath=self)
        if csvpaths is not None:
            self.error_manager.add_internal_listener(csvpaths.error_manager)
        #
        # modes are set in external comments
        #
        self.modes = ModeController(self)
        """ @private """
        #
        # captures the number of lines up front and tracks line stats as the
        # run progresses
        #
        self._line_monitor = None
        #
        # the scanning part of the csvpath. e.g. $test.csv[*]
        #
        self.scan = None
        """ @private """
        #
        # the matching part of the csvpath. e.g. [yes()]
        #
        self.match = None
        """ @private """
        #
        # when True the lines that do not match are returned from next()
        # and collect(). this effectively turns the match into an OR
        # expression. in the default, we say:
        #     are all of these things true?
        # but when collect_when_not_matched is True we ask:
        #     are any of these things not true?
        #
        # self._when_not_matched = False
        self._headers = None
        self.variables: Dict[str, Any] = {}
        self.delimiter = delimiter
        self.quotechar = quotechar
        #
        # a blank line has no headers. it has no data. physically it is 2 \n with
        # nothing but whitespace between them. any data or any delimiters would make
        # the line non-blank.
        #
        self.skip_blank_lines = skip_blank_lines
        #
        # in the case of a [*] scan where the last line is blank we would miss firing
        # last() unless we take steps. instead, we allow that line to match, but we
        # do not return a line to the caller of next() and we freeze the variables.
        # there is still room for side effects to make changes, but that is a
        # reasonable compromise between missing last() and allowing unwanted changes.
        # we definitely do not freeze is_valid or stop, which can be useful
        # signaling, even in an inconsistent state.
        #
        self._freeze_path = False
        #
        # counts are 1-based
        #
        self.scan_count = 0
        self.match_count = 0
        #
        # used by stop() and advance(). a stopped CsvPath halts without finishing
        # its run. an advancing CsvPath doesn't consider the match part of the
        # csvpath and does not incur any side effects as it progresses through the
        # rows the advance skips. the skip() function has the same effect as
        # advance(1) but without any guarantee that the other match components on
        # the line will be considered before skipping ahead. there are likely
        # corner cases where an onmatch qualifier or some other constraint will
        # trigger match components that would otherwise be skipped so the ability
        # to shortcut some of the match should not be relied on for anything
        # critical.
        #
        self.stopped = False
        self._advance = 0
        #
        # the lines var will hold a reference to a LineSpooler instance during a
        # run, if lines are being collected by the collect() method. if the user
        # is using this CsvPath instance directly the LineSpooler is only there
        # as a proxy to the list of lines being collected -- we don't spool to
        # disk, at least atm.
        #
        self.lines = None
        """ @private """
        #
        # set by fail()
        #
        self._is_valid = True
        #
        # basic timing for the CsvPath instance only. if the CsvPath is managed
        # by a CsvPaths the timings for a run may include time spent by other
        # CsvPath instances.
        #
        self.last_row_time = -1
        """ @private """
        self.rows_time = -1
        """ @private """
        self.total_iteration_time = -1
        """ @private """
        #
        # limiting collection means returning fewer headers (values in the
        # line, a.k.a. columns) than are available. limiting headers returned
        # can impact named results, reset_headers(), and other considerations.
        #
        self._limit_collection_to = []
        #
        # error collecting is at the CsvPath instance by default. CsvPath
        # instances that are managed by a CsvPaths have their errors collected
        # by their Results as well. Result handles persistence.
        #
        # error policies are set in config.ini at CsvPath and CsvPaths levels.
        #
        self._errors: List[Error] = []
        #
        # saves the scan and match parts of paths for reference. mainly helpful
        # for testing the CsvPath library itself; not used by end users. the run
        # name becomes the file name of the saved path parts.
        #
        self._save_scan_dir = None
        self._save_match_dir = None
        self._run_name = None
        #
        # metadata is collected from "outer" csvpath comments. outer comments
        # are separate from the comments within the match part of the csvpath.
        # the keys are words with colons. e.g. ~ name: my new csvpath ~
        #
        self.metadata: Dict[str, Any] = {}
        #
        # holds the current match count while we're in the middle of a match
        # so that anyone who wants to can increase the match count using
        # raise_match_count_if(). it is important to do the raise asap so that
        # components that are onmatched have the right match count available.
        #
        self._current_match_count = 0
        #
        # printers receive print lines from the print function. the default
        # printer prints to standard out. a CsvPath that is managed by a
        # CsvPaths has its Results as a printer, as well as having
        # the default printer.
        #
        self.printers = []
        """ @private """
        if print_default:
            self.printers.append(StdOutPrinter())
        #
        # _function_times_match collects the time a function spends doing its matches()
        #
        self._function_times_match = {}
        #
        # _function_times_value collects the time a function spends doing its to_value()
        #
        self._function_times_value = {}
        self._created_at = datetime.now(timezone.utc)
        self._run_started_at = None
        self._collecting = False
        #
        # holds the unmatched lines when lines are being collected and
        # _unmatched_available is True. it is analogous to the lines returned
        # by collect(), but is the lines not returned by collect().
        #
        self._unmatched = None
error_manager
variables: Dict[str, Any]
delimiter
quotechar
skip_blank_lines
scan_count
match_count
stopped
metadata: Dict[str, Any]
AND: bool
    @property
    def AND(self) -> bool:  # pylint: disable=C0103
        return self.modes.logic_mode.value
OR: bool
    @property
    def OR(self) -> bool:  # pylint: disable=C0103
        return not self.modes.logic_mode.value
identity: str
    @property
    def identity(self) -> str:
        """Returns the id or name if found in metadata.

        The id or name gets into metadata primarily if found
        in an "external" comment in the csvpath, "external"
        meaning outside the []s. Comments are keyword:comment.
        We take id, Id, ID and name, Name, NAME.

        id is preferred over name. E.g. in:
        ~ name: my path description: an example id: this value wins ~
        the id becomes the identity of the instance.

        Within each keyword we prefer, in this order: all-lowercase
        most, Initial-caps next, ALL-CAPS least.

        The ordering is relied on in Result and possibly
        elsewhere.
        """
        ret = None
        if not self.metadata:
            ret = ""
        if "NAME" in self.metadata:
            ret = self.metadata["NAME"]
        if "Name" in self.metadata:
            ret = self.metadata["Name"]
        if "name" in self.metadata:
            ret = self.metadata["name"]
        if "ID" in self.metadata:
            ret = self.metadata["ID"]
        if "Id" in self.metadata:
            ret = self.metadata["Id"]
        if "id" in self.metadata:
            ret = self.metadata["id"]
        return ret

Returns the id or name if found in metadata.

The id or name gets into metadata primarily if found in an "external" comment in the csvpath, "external" meaning outside the []s. Comments are keyword:comment. We take id, Id, ID and name, Name, NAME.

id is preferred over name. E.g. in: ~ name: my path description: an example id: this value wins ~ the id becomes the identity of the instance.

Within each keyword we prefer, in this order: all-lowercase most, Initial-caps next, ALL-CAPS least.

The ordering is relied on in Result and possibly elsewhere.
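
The precedence described above can be sketched without the library. This is a minimal illustration, not the framework's code: `identity_from` and `_IDENTITY_KEYS` are hypothetical names, and the function only mirrors the lookup order visible in the `identity` property, where a later, more preferred key overwrites an earlier one.

```python
from typing import Any, Dict, Optional

# Keys ordered from least to most preferred; a later hit overwrites an earlier one.
_IDENTITY_KEYS = ("NAME", "Name", "name", "ID", "Id", "id")


def identity_from(metadata: Dict[str, Any]) -> Optional[str]:
    """Hypothetical helper that mirrors the lookup order of CsvPath.identity."""
    # Empty metadata gives "", non-empty metadata without a key gives None.
    ret = None if metadata else ""
    for key in _IDENTITY_KEYS:
        if key in metadata:
            ret = metadata[key]
    return ret


meta = {"name": "my path", "description": "an example", "id": "this value wins"}
print(identity_from(meta))                         # this value wins
print(identity_from({"NAME": "a", "Name": "b"}))   # b
```

As in the property shown above, empty metadata yields an empty string, while non-empty metadata with no id or name key yields None.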

errors: List[csvpath.managers.errors.error.Error]
    @property
    def errors(self) -> List[Error]:  # pylint: disable=C0116
        return self._errors
errors_count: int
    @property
    def errors_count(self) -> int:  # pylint: disable=C0116
        return len(self.errors)
def has_errors(self) -> bool:
    def has_errors(self) -> bool:
        return self.errors_count > 0
is_valid: bool
    @property
    def is_valid(self) -> bool:  # pragma: no cover
        """Csvpaths can flag a CSV file as invalid using the fail() function"""
        return self._is_valid

Csvpaths can flag a CSV file as invalid using the fail() function

completed: bool
    @property
    def completed(self) -> bool:
        if not self.scanner or not self.line_monitor:
            return False
        if self.scanner.is_last(self.line_monitor.physical_line_number):
            return True
        return False
def collect( self, csvpath: str = None, *, nexts: int = -1, lines=None) -> Union[List[List[Any]], csvpath.util.line_spooler.LineSpooler]:
    def collect(
        self, csvpath: str = None, *, nexts: int = -1, lines=None
    ) -> List[List[Any]] | LineSpooler:
        """Runs the csvpath forward and returns the matching lines seen as
        a list of lists. This method holds lines locally, not as
        accessible attributes. Lines are not kept after the run completes
        and the collected lines are returned.

        nexts caps the number of lines collected; -1, the default, means
        collect to the end of the file.

        The optional lines argument may be an instance of any class that has an append(obj)
        method. If lines is None, a list is returned.
        """
        if self.scanner is None and csvpath is not None:
            self.parse(csvpath)
        if nexts < -1:
            raise ProcessingException(
                "Input must be >= -1. -1 means collect to the end of the file."
            )
        self.collecting = True
        #
        # we're going to use the passed in lines object. if it exists
        # it is a LineSpooler (we presume). we'll associate it with the
        # csvpath temporarily so anyone interested can get it. if it
        # doesn't exist we create a list for the lines var. we create a
        # ListLineSpooler to handle any inquiries during the run. we
        # do all our line collecting business with the local lines var
        # which has an append method regardless of if it is list or
        # LineSpooler. when we're done we break the link with the csvpath
        # and return the local variable. if it is a list it is going to
        # a csvpath user. if it is a LineSpooler it is going to a
        # CsvPaths instance.
        #
        self.lines = lines
        if lines is None:
            lines = []
            self.lines = ListLineSpooler(lines=lines)
        for line in self.next():
            # copy the line so later changes to it do not alter what we collected
            line = line[:]
            self.lines.append(line)
            if nexts == -1:
                continue
            if nexts > 1:
                nexts -= 1
            else:
                break
        # we don't want to hold on to data more than needed. but
        # we do want to return data if we're not spooling. the
        # way we do that is to keep the local var available with the
        # list and/or the spooler. the caller needs to be aware of
        # both possibilities, but both offer __len__ and append.
        #
        # we keep the self.lines if it is not a list because that
        # makes it available to the runtime data collector so we can
        # see the line count in the metadata, saving opening a
        # potentially large data.csv to find out how many lines.
        if isinstance(self.lines, list):
            self.lines = None
        return lines

Runs the csvpath forward and returns the matching lines seen as a list of lists. This method holds lines locally, not as accessible attributes. Lines are not kept after the run completes and the collected lines are returned.

nexts caps the number of lines collected; -1, the default, means collect to the end of the file.

The optional lines argument may be an instance of any class that has an append(obj) method. If lines is None, a list is returned.
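
The interplay of `nexts` and the append-able `lines` argument can be sketched in isolation. This is only an illustration under stated assumptions: `collect_rows` is a hypothetical stand-in, `rows` plays the role of the generator returned by `next()`, and a `ValueError` replaces the library's `ProcessingException`.

```python
from collections import deque
from typing import Any, Iterable, List


def collect_rows(rows: Iterable[List[Any]], *, nexts: int = -1, lines=None):
    """Hypothetical stand-in for the loop inside CsvPath.collect()."""
    if nexts < -1:
        raise ValueError("Input must be >= -1. -1 means collect to the end of the file.")
    if lines is None:
        lines = []  # default sink is a plain list
    for row in rows:
        lines.append(row[:])  # copy so later changes to row do not leak in
        if nexts == -1:
            continue  # no cap: keep going to the end
        if nexts > 1:
            nexts -= 1
        else:
            break
    return lines


data = [["a", "1"], ["b", "2"], ["c", "3"]]
print(collect_rows(data))                      # [['a', '1'], ['b', '2'], ['c', '3']]
print(collect_rows(data, nexts=2))             # [['a', '1'], ['b', '2']]
# Any object with append() works as a sink; a bounded deque keeps the last two rows.
print(list(collect_rows(data, lines=deque(maxlen=2))))  # [['b', '2'], ['c', '3']]
```

Note that with this loop shape `nexts=0` still collects one row, because the cap is checked only after a row has been appended.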

def fast_forward(self, csvpath=None):
    def fast_forward(self, csvpath=None):
        """Scans to the end of the CSV file. All scanned rows will be
        considered for a match, and variables and side effects will happen,
        but no rows will be returned or stored. If you do not pass the
        csvpath string here you must first use the parse method."""
        if self.scanner is None and csvpath is not None:
            self.parse(csvpath)
        for _ in self.next():
            pass
        return self

Scans to the end of the CSV file. All scanned rows will be considered for a match, and variables and side effects will happen, but no rows will be returned or stored. If you do not pass the csvpath string here you must first use the parse method.
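
A short sketch of the drain-for-side-effects pattern may help here. `RowCounter` is a hypothetical class, not part of the framework; it only shows how consuming a generator without keeping its rows still lets per-row side effects accumulate, and why returning `self` allows chaining.

```python
from typing import Dict, Iterator, List


class RowCounter:
    """Hypothetical object with a next() generator and a fast_forward() that drains it."""

    def __init__(self, rows: List[List[str]]):
        self.rows = rows
        self.variables: Dict[str, int] = {"count": 0}

    def next(self) -> Iterator[List[str]]:
        for row in self.rows:
            self.variables["count"] += 1  # side effect happens for every row
            yield row

    def fast_forward(self) -> "RowCounter":
        # Consume every row but keep none of them.
        for _ in self.next():
            pass
        return self  # returning self lets callers chain attribute access


counter = RowCounter([["a"], ["b"], ["c"]]).fast_forward()
print(counter.variables["count"])  # 3
```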

def next(self, csvpath=None):
    def next(self, csvpath=None):
        """A generator function that steps through the CSV file returning
        matching rows."""
        try:
            if self.scanner is None and csvpath is not None:
                self.parse(csvpath)
            start = time.time()
            if self.will_run is True:
                for line in self._next_line():
                    b = self._consider_line(line)
                    if b:
                        line = self.limit_collection(line)
                        if line is None:
                            msg = "Line cannot be None"
                            self.logger.error(msg)
                            raise MatchException(msg)
                        if len(line) == 0:
                            msg = "Line cannot be len() == 0"
                            self.logger.error(msg)
                            raise MatchException(msg)
                        yield line
                    elif self.collecting and self.unmatched_available:
                        if self.unmatched is None:
                            self.unmatched = []
                        line = self.limit_collection(line)
                        # we aren't None and 0 checking as above. needed?
                        self.unmatched.append(line)
                    if self.stopped:
                        self.logger.info(
                            "CsvPath has been stopped at line %s",
                            self.line_monitor.physical_line_number,
                        )
                        break
            else:
                # pass the identity as a logging argument so it is interpolated
                self.logger.warning(
                    "Csvpath identified as %s is disabled by run-mode:no-run",
                    self.identity,
                )
            self.finalize()
            end = time.time()
            self.total_iteration_time = end - start
            self.logger.info("Run against %s is complete.", self.scanner.filename)
            self.logger.info(
                "Iteration time was %s", round(self.total_iteration_time, 2)
            )
            self.logger.info(
                "%s per line",
                round(
                    self.total_iteration_time
                    / self.line_monitor.physical_end_line_count,
                    2,
                ),
            )
        except Exception as e:
            if not self.ecoms.do_i_quiet():
                self.logger.error(e, exc_info=True)
            if self.ecoms.do_i_raise():
                raise

A generator function that steps through the CSV file returning matching rows.
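
The control flow of the generator can be sketched with a much-reduced model. `MiniPath`, its `matches` callable, and `keep_unmatched` are hypothetical names for illustration; the sketch assumes only what the source above shows: matching rows are yielded, non-matching rows may be set aside, and a stop flag is checked after each row.

```python
from typing import Callable, Iterable, Iterator, List, Optional


class MiniPath:
    """Hypothetical, much-reduced model of the loop in CsvPath.next()."""

    def __init__(
        self, matches: Callable[[List[str]], bool], *, keep_unmatched: bool = False
    ):
        self.matches = matches
        self.keep_unmatched = keep_unmatched
        self.unmatched: Optional[List[List[str]]] = None
        self.stopped = False

    def next(self, rows: Iterable[List[str]]) -> Iterator[List[str]]:
        for row in rows:
            if self.matches(row):
                yield row
            elif self.keep_unmatched:
                if self.unmatched is None:
                    self.unmatched = []
                self.unmatched.append(row)
            if self.stopped:  # checked after the row has been handled
                break


rows = [["1", "ok"], ["2", "bad"], ["3", "ok"], ["4", "ok"]]
path = MiniPath(lambda r: r[1] == "ok", keep_unmatched=True)
matched = []
for row in path.next(rows):
    matched.append(row)
    if row[0] == "3":
        path.stopped = True  # the generator halts when it resumes

print(matched)         # [['1', 'ok'], ['3', 'ok']]
print(path.unmatched)  # [['2', 'bad']]
```

In the framework the stop signal comes from the csvpath itself during matching; here the caller sets the flag only to keep the sketch short.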

class CsvPaths(csvpath.csvpaths.CsvPathsCoordinator, csvpath.managers.errors.error_collector.ErrorCollector):
class CsvPaths(CsvPathsCoordinator, ErrorCollector):
    """
    a CsvPaths instance manages applying any number of csvpaths
    to any number of files. CsvPaths applies sets of csvpaths
    to a given file, on demand. Think of CsvPaths as a session
    object. It gives you a way to manage files, csvpaths, and
    the results generated by applying paths to files. It is not
    intended for concurrent use. If you need multiple threads,
    create multiple CsvPaths instances.
    """

    # pylint: disable=too-many-instance-attributes

    def __init__(
        self,
        *,
        delimiter=",",
        quotechar='"',
        skip_blank_lines=True,
        print_default=True,
        config: Config = None,
    ):
        self._config = Config() if config is None else config
        #
        # managers centralize activities, offer async potential, and
        # are where integrations hook in. ErrorManager functionality
        # must be available in CsvPath too. The others are CsvPaths
        # only.
        #
        self._paths_manager = None
        self._file_manager = None
        self._results_manager = None
        self._ecoms = None
        self._error_manager = None
        self._set_managers()
        #
        # TODO:
        # self.print_manager = ... <<<=== should we do this?
        #
        #
        self.print_default = print_default
        """ @private """
        self.delimiter = delimiter
        """ @private """
        self.quotechar = quotechar
        """ @private """
        self.skip_blank_lines = skip_blank_lines
        """ @private """
        self.current_matcher: CsvPath = None
        """ @private """
        self.logger = LogUtility.logger(self)
        """ @private """
        self._errors = []
        # coordinator attributes
        self._stop_all = False
        self._fail_all = False
        self._skip_all = False
        self._advance_all = 0
        self._current_run_time = None
        self._run_time_str = None
        self.named_paths_name = None
        """ @private """
        self.named_file_name = None
        """ @private """
        #
        # metrics is for OTLP OpenTelemetry. it should only
        # be used by the OTLP listener. it is here because
        # the integration may need a long-lived presence. if
        # needed, the first OTLP listener will set it up
        # before spinning up a thread. any other OTLP
        # listener threads that need to use a long-lived metric
        # will work with this property.
        #
        self.metrics = None
        """ @private """
        self.logger.info("initialized CsvPaths")

    def _set_managers(self) -> None:
        self.paths_manager = PathsManager(csvpaths=self)
        self.file_manager = FileManager(csvpaths=self)
        self.results_manager = ResultsManager(csvpaths=self)
        self.ecoms = ErrorCommunications(csvpaths=self)
        self.error_manager = ErrorManager(csvpaths=self)

    @property
    def ecoms(self) -> ErrorCommunications:
        """@private"""
        return self._ecoms

    @ecoms.setter
    def ecoms(self, ec: ErrorCommunications) -> None:
        """@private"""
        self._ecoms = ec

    @property
    def file_manager(self) -> FileManager:
        return self._file_manager

    @file_manager.setter
    def file_manager(self, m: FileManager) -> None:
        self._file_manager = m

    @property
    def results_manager(self) -> ResultsManager:
        return self._results_manager

    @results_manager.setter
    def results_manager(self, m: ResultsManager) -> None:
        self._results_manager = m

    @property
    def paths_manager(self) -> PathsManager:
        return self._paths_manager

    @paths_manager.setter
    def paths_manager(self, m: PathsManager) -> None:
        self._paths_manager = m

    @property
    def error_manager(self) -> ErrorManager:
        return self._error_manager

    @error_manager.setter
    def error_manager(self, em: ErrorManager) -> None:
        if em.csvpaths is None:
            raise Exception("CsvPaths cannot be None")
        self._error_manager = em

    def run_time_str(self, pathsname=None) -> str:
        """@private
        adds the stringified current run time to the named-paths
        group home_dir to create the run_dir"""
        if self._run_time_str is None and pathsname is None:
            raise CsvPathsException(
                "Cannot have None in both run_time_str and pathsname"
            )
        if self._run_time_str is None:
            self._run_time_str = self.results_manager.get_run_time_str(
                pathsname, self.current_run_time
            )
        return self._run_time_str

    @property
    def current_run_time(self) -> datetime:
        """@private
        gets the time marking the start of the run. used to create the run home directory."""
        if self._current_run_time is None:
            self._current_run_time = datetime.now(timezone.utc)
        return self._current_run_time

    def clear_run_coordination(self) -> None:
        """@private
        run coordination is the set of signals that csvpaths send to affect
        one another through the CsvPaths instance"""
        self._stop_all = False
        self._fail_all = False
        self._skip_all = False
        self._advance_all = 0
        self._current_run_time = None
        self._run_time_str = None
        self.logger.debug("Cleared run coordination")

    def csvpath(self) -> CsvPath:
        """Gets a CsvPath object primed with a reference to this CsvPaths"""
        path = CsvPath(
            csvpaths=self,
            delimiter=self.delimiter,
            quotechar=self.quotechar,
            skip_blank_lines=self.skip_blank_lines,
            #
            # in the usual case we don't want csvpaths and its csvpath children
            # to share the same config. sharing doesn't offer much. the flexibility
            # of having separate configs is valuable.
            #
            config=None,
            print_default=self.print_default,
            error_manager=self.error_manager,
        )
        return path

    def stop_all(self) -> None:  # pragma: no cover
        self._stop_all = True

    def fail_all(self) -> None:  # pragma: no cover
        self._fail_all = True

    def skip_all(self) -> None:  # pragma: no cover
        self._skip_all = True

    def advance_all(self, lines: int) -> None:  # pragma: no cover
        self._advance_all = lines

    @property
    def errors(self) -> List[Error]:  # pylint: disable=C0116
        """@private
        generally you should be looking at results_manager or error_manager for errors.
        """
        return self._errors

    def collect_error(self, error: Error) -> None:  # pylint: disable=C0116
        """@private"""
        self._errors.append(error)

    def has_errors(self) -> bool:  # pylint: disable=C0116
        """@private
        generally you should be looking at results_manager or error_manager for errors.
        """
        return len(self._errors) > 0

    @property
    def config(self) -> Config:  # pylint: disable=C0116
        """@private"""
        if not self._config:
            self._config = Config()  # pragma: no cover
        return self._config

    #
    # this is the preferred way to update config. it is preferred because
    # csvpath and csvpaths work off the same config file, even though they,
    # in some cases, have separate keys. if you update the config directly
    # before a run starts using the CsvPaths's Config you have to remember
    # to save and reload for it to affect both CsvPaths and CsvPath. this
    # method does the save and reload every time.
    #
    def add_to_config(self, section, key, value) -> None:
        """@private"""
        self.config.add_to_config(section=section, key=key, value=value)
        self.config.save_config()
        self.config.reload()
        self._set_managers()

    def clean(self, *, paths) -> None:
        """@private
        at this time we do not recommend reusing CsvPaths, but it is doable.
        you should clean before reuse unless you want to accumulate results."""
        self.results_manager.clean_named_results(paths)
        self.clear_run_coordination()
        # self.error_manager.reset()
        self._errors = []
        self.named_paths_name = None
        self.named_file_name = None

    def collect_paths(self, *, pathsname, filename) -> None:
        """
        Sequentially does a CsvPath.collect() on filename for every named path. lines are collected into results."""
        paths = self.paths_manager.get_named_paths(pathsname)
        if paths is None:
            raise InputException(f"No named-paths found for {pathsname}")
        if len(paths) == 0:
            raise InputException(f"Named-paths group {pathsname} is empty")
        if "" in paths:
            raise InputException(
                f"Named-paths group {pathsname} has one or more empty csvpaths"
            )
        file = self.file_manager.get_named_file(filename)
        if file is None:
            raise InputException(f"No named-file found for {filename}")
        self.logger.info("Prepping %s and %s", filename, pathsname)
        self.clean(paths=pathsname)
        self.logger.info(
            "Beginning collect_paths %s with %s paths", pathsname, len(paths)
        )
        crt = self.run_time_str(pathsname)
        results = []
        #
        # run starts here
        #
        self.results_manager.start_run(
            run_dir=crt, pathsname=pathsname, filename=filename
        )
        #
        #
        #
        for i, path in enumerate(paths):
            csvpath = self.csvpath()
            if not csvpath.will_run:
                continue
            result = Result(
                csvpath=csvpath,
                file_name=filename,
                paths_name=pathsname,
                run_index=i,
                run_time=self.current_run_time,
                run_dir=crt,
            )
            # casting a broad net because if "raise" not in the error policy we
            # want to never fail during a run
            try:
                self._load_csvpath(
                    csvpath=csvpath,
                    path=path,
                    file=file,
                    pathsname=pathsname,
                    filename=filename,
                    crt=crt,
                )
                #
                # if run-mode: no-run we skip ahead without saving results
                #
                if not csvpath.will_run:
                    continue
                #
                # the add has to come after _load_csvpath because we need the identity or index
                # to be stable and the identity is found in load, if it exists.
                #
                self.results_manager.add_named_result(result)
                lines = result.lines
                self.logger.debug("Collecting lines using a %s", type(lines))
                csvpath.collect(lines=lines)
                if lines is None:
                    self.logger.error(  # pragma: no cover
                        "Unexpected None for lines after collect_paths: file: %s, match: %s",
                        file,
                        csvpath.match,
                    )
                #
                # this is obviously not a good idea for very large files!
                #
                result.unmatched = csvpath.unmatched
            except Exception as ex:  # pylint: disable=W0718
                # ex.trace = traceback.format_exc()
                # ex.source = self
                if self.error_manager.csvpaths is None:
                    raise Exception("ErrorManager's CsvPaths cannot be None")
                self.error_manager.handle_error(source=self, msg=f"{ex}")
                if self.ecoms.do_i_raise():
                    self.results_manager.save(result)
                    raise
            self.results_manager.save(result)
            results.append(result)
        #
        # run ends here
        #
        self.results_manager.complete_run(
            run_dir=crt, pathsname=pathsname, results=results
        )
        #
        # update/write run manifests here
        #  - validity (are all paths valid)
        #  - paths-completeness (did they all run and complete)
        #  - method (collect, fast_forward, next)
        #  - timestamp
        #
        self.clear_run_coordination()
        self.logger.info(
            "Completed collect_paths %s with %s paths", pathsname, len(paths)
        )

    def _load_csvpath(
        self,
        *,
        csvpath: CsvPath,
        path: str,
        file: str,
        pathsname: str = None,
        filename,
        by_line: bool = False,
        crt: str,
    ) -> None:
        """@private"""
        # file is the physical file (+/- if preceding mode) filename is the named-file name
        self.logger.debug("Beginning to load csvpath %s with file %s", path, file)
        csvpath.named_paths_name = pathsname
        self.named_paths_name = pathsname
        csvpath.named_file_name = filename
        self.named_file_name = filename
        #
        # by_line==True means we are starting a run that is breadth-first ultimately using
        # next_by_line(). by_line runs cannot be source-mode preceding and have different
        # semantics around csvpaths influencing one another.
        #
        # we strip comments from above the path so we need to extract them first
        path = MetadataParser(self).extract_metadata(instance=csvpath, csvpath=path)
        identity = csvpath.identity
        self.logger.debug("Csvpath %s after metadata extract: %s", identity, path)
        # update the settings using the metadata fields we just collected
        csvpath.update_settings_from_metadata()
        #
        # if we have a reference, resolve it. we may not actually use the file
        # if we're in source-mode: preceding, but that doesn't matter from the
        # pov of the reference.
        #
        if filename.startswith("$"):
            self.logger.debug(
                "File name is a reference: %s. Replacing the path passed in with the reffed data file path.",
                filename,
            )
            file = self.results_manager.data_file_for_reference(filename, not_name=crt)
        #
        #
        #
        if csvpath.data_from_preceding is True:
            if by_line is True:
                raise CsvPathsException(
                    "Breadth-first runs do not support source-mode preceding because each line of data flows through each csvpath in order already"
                )
            #
            # we are in source-mode: preceding
            # that means we ignore the original data file path and
            # instead use the data.csv from the preceding csvpath. that is,
            # assuming there is a preceding csvpath and it created and
            # saved data.
            #
            # find the preceding csvpath in the named-paths
            #
            # we may be in a file reference like: $sourcemode.csvpaths.source2:from. if so
            # we just need the named-paths name.
            #
            if pathsname.startswith("$"):
                self.logger.debug(
                    "Named-paths name is a reference: %s. Stripping it down to just the actual named-paths name.",
                    pathsname,
                )
                pathsname = pathsname[1 : pathsname.find(".")]
            result = self.results_manager.get_last_named_result(
                name=pathsname, before=csvpath.identity
            )
            if result is not None:
                # get its data.csv path for this present run
                # swap in that path for the regular origin path
                file = result.data_file_path
                csvpath.metadata["source-mode-source"] = file
                self.logger.info(
                    "Csvpath identified as %s uses last csvpath's data.csv at %s as source",
                    csvpath.identity,
                    file,
                )
            else:
                self.logger.warning(
                    "Cannot find a preceding data file to use for csvpath identified as %s running in source-mode",
                    csvpath.identity,
                )
        f = path.find("[")
        self.logger.debug("Csvpath matching part starts at char # %s", f)
        apath = f"${file}{path[f:]}"
        self.logger.info("Parsing csvpath %s", apath)
        csvpath.parse(apath)
        #
        # ready to run. time to register the run. this is separate from
        # the run_register.py (ResultsRegister) event
        #
        # crt = self.run_time_str(pathsname)
        # fingerprint = self.file_manager.get_fingerprint_for_name(filename)
        #
        self.logger.debug("Done loading csvpath")

    def fast_forward_paths(self, *, pathsname, filename):
        """Sequentially does a CsvPath.fast_forward() on filename for every named path. No matches are collected."""
        paths = self.paths_manager.get_named_paths(pathsname)
        file = self.file_manager.get_named_file(filename)
        self.logger.info("Prepping %s and %s", filename, pathsname)
        self.clean(paths=pathsname)
        self.logger.info(
            "Beginning FF %s with %s paths against file %s. No match results will be held.",
            pathsname,
            len(paths),
            filename,
        )
        crt = self.run_time_str(pathsname)
        #
        # run starts here
        #
        self.results_manager.start_run(
            run_dir=crt, pathsname=pathsname, filename=filename
        )
        #
        #
        #
        results = []
        for i, path in enumerate(paths):
            csvpath = self.csvpath()
            self.logger.debug("Beginning to FF CsvPath instance: %s", csvpath)
            result = Result(
                csvpath=csvpath,
                file_name=filename,
                paths_name=pathsname,
                run_index=i,
                run_time=self.current_run_time,
                run_dir=crt,
            )
            try:
                self._load_csvpath(
                    csvpath=csvpath,
                    path=path,
                    file=file,
                    pathsname=pathsname,
                    filename=filename,
                    crt=crt,
                )
                #
                # if run-mode: no-run we skip ahead without saving results
                #
                if not csvpath.will_run:
                    continue
                #
                # the add has to come after _load_csvpath because we need the identity or index
                # to be stable and the identity is found in load, if it exists.
                #
                self.results_manager.add_named_result(result)
                self.logger.info(
                    "Parsed csvpath %s pointed at %s and starting to fast-forward",
                    i,
                    file,
                )
                csvpath.fast_forward()
                self.logger.info(
                    "Completed fast forward of csvpath %s against %s", i, file
                )
            except Exception as ex:  # pylint: disable=W0718
                # ex.trace = traceback.format_exc()
                # ex.source = self
                self.error_manager.handle_error(source=self, msg=f"{ex}")
                if self.ecoms.do_i_raise():
                    self.results_manager.save(result)
                    raise
            self.results_manager.save(result)
            results.append(result)
        #
        # run ends here
        #
        self.results_manager.complete_run(
            run_dir=crt, pathsname=pathsname, results=results
        )
        self.clear_run_coordination()
        self.logger.info(
            "Completed fast_forward_paths %s with %s paths", pathsname, len(paths)
        )

    def next_paths(
        self, *, pathsname, filename, collect: bool = False
    ):  # pylint: disable=R0914
        """Does a CsvPath.next() on filename for every line against every named path in sequence"""
        paths = self.paths_manager.get_named_paths(pathsname)
        file = self.file_manager.get_named_file(filename)
        self.logger.info("Prepping %s and %s", filename, pathsname)
        self.clean(paths=pathsname)
        self.logger.info("Beginning next_paths with %s paths", len(paths))
        crt = self.run_time_str(pathsname)
        #
        # run starts here
        #
        self.results_manager.start_run(
            run_dir=crt, pathsname=pathsname, filename=filename
        )
        #
        #
        #
        results = []
        for i, path in enumerate(paths):
            if self._skip_all:
                skip_err = (
                    "Found the skip-all signal set. skip_all() is"
                    " only for breadth-first runs using the"
                    " '_by_line' methods. It has the same"
                    " effect as skip() in a"
                    " serial run like this one."
                )
                self.logger.error(skip_err)
            if self._stop_all:
                self.logger.warning("Stop-all set. Shutting down run.")
                break
            if self._advance_all > 0:
                advance_err = (
                    "Found the advance-all signal set. advance_all() is"
                    " only for breadth-first runs using the"
                    " '_by_line' methods. It has the same"
                    " effect as advance() in a"
                    " serial run like this one."
                )
                self.logger.error(advance_err)
            csvpath = self.csvpath()
            result = Result(
                csvpath=csvpath,
                file_name=filename,
                paths_name=pathsname,
                run_index=i,
                run_time=self.current_run_time,
                run_dir=crt,
            )
            if self._fail_all:
                self.logger.warning(
                    "Fail-all set. Failing all remaining CsvPath instances in the run."
                )
                csvpath.is_valid = False
            try:
                self._load_csvpath(
                    csvpath=csvpath,
                    path=path,
                    file=file,
                    pathsname=pathsname,
                    filename=filename,
                    crt=crt,
                )
                #
                # if run-mode: no-run we skip ahead without saving results
                #
                if not csvpath.will_run:
                    continue
                #
                # the add has to come after _load_csvpath because we need the identity or index
                # to be stable and the identity is found in load, if it exists.
                #
                self.results_manager.add_named_result(result)
                for line in csvpath.next():
                    #
                    # removed dec 1. why was this? it doesn't seem to make sense and
                    # removing it doesn't break any unit tests. was it a mistake?
                    #
                    # line.append(result)
                    if collect:
                        result.append(line)
                        result.unmatched = csvpath.unmatched
                    yield line
            except Exception as ex:  # pylint: disable=W0718
                self.error_manager.handle_error(source=self, msg=f"{ex}")
                if self.ecoms.do_i_raise():
                    self.results_manager.save(result)
                    raise
            self.results_manager.save(result)
            results.append(result)
        #
        # run ends here
        #
        self.results_manager.complete_run(
            run_dir=crt, pathsname=pathsname, results=results
        )
        self.clear_run_coordination()

    # =============== breadth first processing ================

    def collect_by_line(
        self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False
    ):
        """Does a CsvPath.collect() on filename where each row is considered
        by every named path before the next row starts.

        See next_by_line for if_all_agree and collect_when_not_matched.
        """
        self.logger.info(
            "Starting collect_by_line for paths: %s and file: %s", pathsname, filename
        )
        lines = []
        for line in self.next_by_line(  # pylint: disable=W0612
            pathsname=pathsname,
            filename=filename,
            collect=True,
            if_all_agree=if_all_agree,
            collect_when_not_matched=collect_when_not_matched,
        ):
            # re: W0612: we need 'line' in order to do the iteration. we have to iterate.
            lines.append(line)
        self.logger.info(
            "Completed collect_by_line for paths: %s and file: %s", pathsname, filename
        )
        #
        # each result has the lines its own CsvPath captured, but since we're
        # doing if_all_agree T/F we should return the union here. for some
        # files this obviously makes the data in memory problem even bigger, but
        # it's the operator's responsibility to know if that will be a problem
        # for their use case.
        #
        return lines

    def fast_forward_by_line(
        self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False
    ):
        """Does a CsvPath.fast_forward() on filename where each row is
        considered by every named path before the next row starts.

        See next_by_line for if_all_agree and collect_when_not_matched.
        """
        self.logger.info(
            "Starting fast_forward_by_line for paths: %s and file: %s",
            pathsname,
            filename,
        )
        for line in self.next_by_line(  # pylint: disable=W0612
            pathsname=pathsname,
            filename=filename,
            collect=False,
            if_all_agree=if_all_agree,
            collect_when_not_matched=collect_when_not_matched,
        ):
            # re: W0612: we need 'line' in order to do the iteration. we have to iterate.
            pass
        self.logger.info(
            "Completed fast_forward_by_line for paths: %s and file: %s",
            pathsname,
            filename,
        )

    def next_by_line(  # pylint: disable=R0912,R0915,R0914
        self,
        *,
        pathsname,
        filename,
        collect: bool = False,
        if_all_agree=False,
        collect_when_not_matched=False,
    ) -> List[Any]:
        """Does a CsvPath.next() on filename where each row is considered
        by every named path before the next row starts.

        if_all_agree=True means all the CsvPath instances must match for
        the line to be kept. However, every CsvPath instance keeps its
        own matches in its results, whether or not the line was
        returned to the caller by CsvPaths.

        collect_when_not_matched=True inverts the match so that lines
        which did not match are returned, rather than the default behavior.
        """
        # re: R0912 -- absolutely. plan to refactor.
        self.logger.info("Prepping %s and %s", filename, pathsname)
        self.clean(paths=pathsname)
        fn = self.file_manager.get_named_file(filename)
        paths = self.paths_manager.get_named_paths(pathsname)
        if (
            paths is None or not isinstance(paths, list) or len(paths) == 0
        ):  # pragma: no cover
            raise InputException(
                f"Pathsname '{pathsname}' must name a list of csvpaths"
            )
        #
        # experiment!
        #
        crt = self.run_time_str(pathsname)
        #
        # also use of crt below
        #
        csvpath_objects = self._load_csvpath_objects(
            paths=paths,
            named_file=fn,
            collect_when_not_matched=collect_when_not_matched,
            filename=filename,
            pathsname=pathsname,
            crt=crt,
        )
        #
        # prep has to come after _load_csvpath_objects because we need the identity or
        # indexes to be stable and the identity is found in the load, if it exists.
        #
        self._prep_csvpath_results(
            csvpath_objects=csvpath_objects,
            filename=filename,
            pathsname=pathsname,
            crt=crt,
        )
        #
        # setting fn into the csvpath is less obviously useful at CsvPaths
        # but we'll do it for consistency.
        #
        self.logger.info("Beginning next_by_line with %s paths", len(csvpath_objects))
        reader = FileManager.get_reader(
            fn, delimiter=self.delimiter, quotechar=self.quotechar
        )
        stopped_count: List[int] = []
        for line in reader.next():
            # for line in reader:  # pylint: disable=R1702
            # question to self: should this default be in a central place
            # so that we can switch to OR, in part by changing the default?
            keep = if_all_agree
            self._skip_all = False
            self._advance_all = 0
            try:
                # p is a (CsvPath, List[List[str]]) where the second item is
                # the line-by-line results of the first item's matching
                for p in csvpath_objects:
                    #
                    # if run-mode: no-run we skip ahead without saving results
                    #
                    if not p[0].will_run:
                        continue
                    self.current_matcher = p[0]
                    if self._fail_all:
                        self.logger.warning(
                            "Fail-all set. Setting CsvPath is_valid to False."
                        )
                        self.current_matcher.is_valid = False
                    if self._stop_all:
                        self.logger.warning("Stop-all set. Shutting down run.")
                        self.current_matcher.stopped = True
                        continue
                    if self._skip_all:
                        self.logger.warning("Skip-all set. Continuing to next.")
                        #
                        # all following CsvPaths must have their
                        # line_monitors incremented
                        #
                        self.current_matcher.track_line(line)
                        continue
                    if self._advance_all > 0:
                        self.logger.info(
                            "Advance-all set. Setting advance. "
                            "CsvPath and its Matcher will handle the advancing."
                        )
                        #
                        # CsvPath will handle advancing so we don't need to do
                        # anything, including track_line(line). we just need to
                        # see if we're setting advance or increasing it.
                        #
                        a = self.current_matcher.advance_count
                        if self._advance_all > a:
                            self.current_matcher.advance_count = self._advance_all
                        #
                        # all following CsvPaths must have their
                        # advance incremented -- with the advance not being simply
                        # additive, have to be mindful of any existing advance
                        # count!
                        #
                    if self.current_matcher.stopped:  # pylint: disable=R1724
                        continue

                    #
                    # allowing the match to happen regardless of keep
                    # because we may want side-effects or to have different
                    # results in different named-results, as well as the
                    # union
                    #
                    self.logger.debug(
                        "considering line with csvpath identified as: %s",
                        self.current_matcher.identity,
                    )
                    matched = False
                    self.current_matcher.track_line(line)
                    #
                    # re: W0212: treating _consider_line something like package private
                    #
                    matched = (
                        self.current_matcher._consider_line(  # pylint:disable=W0212
                            line
                        )
                    )
                    if self.current_matcher.stopped:
                        stopped_count.append(1)
                    if if_all_agree:
                        keep = keep and matched
                    else:
                        keep = keep or matched
                    #
                    # not doing continue if we have if_all_agree and not keep as we
                    # used to do allows individual results to have lines that in
                    # aggregate we do not keep.
                    #
                    if matched and collect:
                        line = self.current_matcher.limit_collection(line)
                        p[1].append(line)
            except Exception as ex:  # pylint: disable=W0718
                # ex.trace = traceback.format_exc()
                # ex.source = self
                self.error_manager.handle_error(source=self, msg=f"{ex}")
                if self.ecoms.do_i_raise():
                    for r in csvpath_objects:
                        result = r[1]
                        result.unmatched = r[0].unmatched
                        self.results_manager.save(result)
                    raise
            # we yield even if we stopped in this iteration.
            # caller needs to see what we stopped on.
            #
            # ! we only yield if keep is True
            #
            if keep:
                yield line
            if sum(stopped_count) == len(csvpath_objects):
                break
        results = []
        for r in csvpath_objects:
            result = r[1]
            results.append(result)
            result.unmatched = r[0].unmatched
            self.results_manager.save(result)

        #
        # run ends here
        #
        self.results_manager.complete_run(
            run_dir=results[0].run_dir, pathsname=pathsname, results=results
        )
        self.clear_run_coordination()

    def _load_csvpath_objects(
        self,
        *,
        paths: List[str],
        named_file: str,
        collect_when_not_matched=False,
        filename,
        pathsname,
        crt: str,
    ):
        """@private"""
        csvpath_objects = []
        for path in paths:
            csvpath = self.csvpath()
            csvpath.collect_when_not_matched = collect_when_not_matched
            try:
                self._load_csvpath(
                    csvpath=csvpath,
                    path=path,
                    file=named_file,
                    filename=filename,
                    pathsname=pathsname,
                    by_line=True,
                    crt=crt,
                )
                if csvpath.data_from_preceding is True:
                    # this exception raise may be redundant, but I'm leaving it for now for good measure.
                    # the message is an f-string so that the identity is actually interpolated.
                    raise CsvPathsException(
                        f"Csvpath identified as {csvpath.identity} is set to use preceding data, but CsvPaths's by_line methods do not permit that"
                    )
                csvpath_objects.append([csvpath, []])
            except Exception as ex:  # pylint: disable=W0718
                ex.trace = traceback.format_exc()
                ex.source = self
                # the error collector is the Results. it registers itself with
                # the csvpath as the error collector. not as straightforward but
                # effectively same as we do above
                self.error_manager.handle_error(source=self, msg=f"{ex}")
        return csvpath_objects

    def _prep_csvpath_results(self, *, csvpath_objects, filename, pathsname, crt: str):
        """@private"""
        #
        # run starts here
        #
        self.results_manager.start_run(
            run_dir=crt, pathsname=pathsname, filename=filename
        )
        #
        #
        #
        for i, csvpath in enumerate(csvpath_objects):
            try:
                #
                # Result will set itself into its CsvPath as error collector
                # printer, etc.
                #
                result = Result(
                    csvpath=csvpath[0],
                    file_name=filename,
                    paths_name=pathsname,
                    lines=csvpath[1],
                    run_index=i,
                    run_time=self.current_run_time,
                    run_dir=crt,
                    by_line=True,
                )
                csvpath[1] = result
                #
                # the add has to come after _load_csvpath because we need the identity or index
                # to be stable and the identity is found in load, if it exists.
                #
                self.results_manager.add_named_result(result)
            except Exception as ex:  # pylint: disable=W0718
                """
                ex.trace = traceback.format_exc()
                ex.source = self
                ErrorHandler(csvpaths=self, error_collector=csvpath).handle_error(ex)
                """
                self.error_manager.handle_error(source=self, msg=f"{ex}")
                #
                # keep this comment for modelines avoidance
                #
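
The breadth-first methods in the listing above (next_by_line and the two methods that wrap it) decide whether to return a line by combining the match result of every csvpath: with if_all_agree=True the results are ANDed, otherwise they are ORed. The following is a minimal sketch of that keep rule only. It does not use the csvpath package; the `breadth_first` function and the plain predicates standing in for csvpaths are hypothetical names chosen for illustration.

```python
from typing import Callable, Iterable, Iterator, List

Row = List[str]
# Hypothetical stand-in for a csvpath: a predicate that says whether a row matches.
Matcher = Callable[[Row], bool]


def breadth_first(
    rows: Iterable[Row], matchers: List[Matcher], if_all_agree: bool = False
) -> Iterator[Row]:
    """Yield each row that the matchers, taken together, decide to keep."""
    for row in rows:
        # Start from True when ANDing and from False when ORing.
        keep = if_all_agree
        for matches in matchers:
            # Every matcher sees every row, even if keep is already decided.
            matched = matches(row)
            keep = (keep and matched) if if_all_agree else (keep or matched)
        if keep:
            yield row


rows = [["1", "a"], ["2", "b"], ["3", ""], ["4", ""]]


def has_value(row: Row) -> bool:
    return row[1] != ""


def is_odd(row: Row) -> bool:
    return int(row[0]) % 2 == 1


any_rows = list(breadth_first(rows, [has_value, is_odd]))
all_rows = list(breadth_first(rows, [has_value, is_odd], if_all_agree=True))
```

In this sketch `any_rows` holds the first three rows, because each matches at least one predicate, while `all_rows` holds only the first row, which matches both.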

A CsvPaths instance manages applying any number of csvpaths to any number of files. It applies sets of csvpaths to a given file, on demand. Think of CsvPaths as a session object: it gives you a way to manage files, csvpaths, and the results generated by applying paths to files. It is not intended for concurrent use. If you need multiple threads, create a separate CsvPaths instance for each thread.
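
The one-session-per-thread advice can be sketched as follows. This example does not import csvpath; `Session` is a hypothetical stand-in for a CsvPaths instance, and `run_batch` and the batch names are illustrative assumptions. The point is only that each thread constructs its own session rather than sharing one.

```python
import threading
from typing import Dict, List

Rows = List[List[str]]


class Session:
    """Hypothetical stand-in for a CsvPaths instance; not the real class."""

    def __init__(self) -> None:
        self.collected: Rows = []

    def collect(self, rows: Rows) -> Rows:
        # Mutable per-run state like this is why a session should not be shared.
        self.collected.extend(row for row in rows if row)
        return self.collected


def run_batch(name: str, rows: Rows, out: Dict[str, Rows]) -> None:
    session = Session()  # one session per thread, never shared
    out[name] = session.collect(rows)


batches: Dict[str, Rows] = {
    "orders": [["1", "a"], []],
    "refunds": [["2", "b"]],
}
results: Dict[str, Rows] = {}
threads = [
    threading.Thread(target=run_batch, args=(name, rows, results))
    for name, rows in batches.items()
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```

Each thread writes to its own key in `results`, so the only shared object is the output dictionary, and no session state crosses threads.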

CsvPaths( *, delimiter=',', quotechar='"', skip_blank_lines=True, print_default=True, config: csvpath.util.config.Config = None)
    def __init__(
        self,
        *,
        delimiter=",",
        quotechar='"',
        skip_blank_lines=True,
        print_default=True,
        config: Config = None,
    ):
        self._config = Config() if config is None else config
        #
        # managers centralize activities, offer async potential, and
        # are where integrations hook in. ErrorManager functionality
        # must be available in CsvPath too. The others are CsvPaths
        # only.
        #
        self._paths_manager = None
        self._file_manager = None
        self._results_manager = None
        self._ecoms = None
        self._error_manager = None
        self._set_managers()
        #
        # TODO:
        # self.print_manager = ... <<<=== should we do this?
        #
        #
        self.print_default = print_default
        """ @private """
        self.delimiter = delimiter
        """ @private """
        self.quotechar = quotechar
        """ @private """
        self.skip_blank_lines = skip_blank_lines
        """ @private """
        self.current_matcher: CsvPath = None
        """ @private """
        self.logger = LogUtility.logger(self)
        """ @private """
        self._errors = []
        # coordinator attributes
        self._stop_all = False
        self._fail_all = False
        self._skip_all = False
        self._advance_all = 0
        self._current_run_time = None
        self._run_time_str = None
        self.named_paths_name = None
        """ @private """
        self.named_file_name = None
        """ @private """
        #
        # metrics is for OTLP OpenTelemetry. it should only
        # be used by the OTLP listener. it is here because
        # the integration may need a long-lived presence. if
        # needed, the first OTLP listener will set it up
        # before spinning up a thread. any other OTLP
        # listener threads that need to use a long-lived metric
        # will work with this property.
        #
        self.metrics = None
        """ @private """
        self.logger.info("initialized CsvPaths")
153    @property
154    def file_manager(self) -> FileManager:
155        return self._file_manager
161    @property
162    def results_manager(self) -> ResultsManager:
163        return self._results_manager
169    @property
170    def paths_manager(self) -> PathsManager:
171        return self._paths_manager
error_manager: csvpath.managers.errors.error_manager.ErrorManager
177    @property
178    def error_manager(self) -> ErrorManager:
179        return self._error_manager
def csvpath(self) -> CsvPath:
221    def csvpath(self) -> CsvPath:
222        """Gets a CsvPath object primed with a reference to this CsvPaths"""
223        path = CsvPath(
224            csvpaths=self,
225            delimiter=self.delimiter,
226            quotechar=self.quotechar,
227            skip_blank_lines=self.skip_blank_lines,
228            #
229            # in the usual case we don't want csvpaths and its csvpath children
230            # to share the same config. sharing doesn't offer much. the flexibility
231            # of having separate configs is valuable.
232            #
233            config=None,
234            print_default=self.print_default,
235            error_manager=self.error_manager,
236        )
237        return path

Gets a CsvPath object primed with a reference to this CsvPaths

def collect_paths(self, *, pathsname, filename) -> None:
301    def collect_paths(self, *, pathsname, filename) -> None:
302        """
303        Sequentially does a CsvPath.collect() on filename for every named path. Lines are collected into results."""
304        paths = self.paths_manager.get_named_paths(pathsname)
305        if paths is None:
306            raise InputException(f"No named-paths found for {pathsname}")
307        if len(paths) == 0:
308            raise InputException(f"Named-paths group {pathsname} is empty")
309        if "" in paths:
310            raise InputException(
311                f"Named-paths group {pathsname} has one or more empty csvpaths"
312            )
313        file = self.file_manager.get_named_file(filename)
314        if file is None:
315            raise InputException(f"No named-file found for {filename}")
316        self.logger.info("Prepping %s and %s", filename, pathsname)
317        self.clean(paths=pathsname)
318        self.logger.info(
319            "Beginning collect_paths %s with %s paths", pathsname, len(paths)
320        )
321        crt = self.run_time_str(pathsname)
322        results = []
323        #
324        # run starts here
325        #
326        self.results_manager.start_run(
327            run_dir=crt, pathsname=pathsname, filename=filename
328        )
329        #
330        #
331        #
332        for i, path in enumerate(paths):
333            csvpath = self.csvpath()
334            if not csvpath.will_run:
335                continue
336            result = Result(
337                csvpath=csvpath,
338                file_name=filename,
339                paths_name=pathsname,
340                run_index=i,
341                run_time=self.current_run_time,
342                run_dir=crt,
343            )
344            # casting a broad net because if "raise" not in the error policy we
345            # want to never fail during a run
346            try:
347                self._load_csvpath(
348                    csvpath=csvpath,
349                    path=path,
350                    file=file,
351                    pathsname=pathsname,
352                    filename=filename,
353                    crt=crt,
354                )
355                #
356                # if run-mode: no-run we skip ahead without saving results
357                #
358                if not csvpath.will_run:
359                    continue
360                #
361                # the add has to come after _load_csvpath because we need the identity or index
362                # to be stable and the identity is found in load, if it exists.
363                #
364                self.results_manager.add_named_result(result)
365                lines = result.lines
366                self.logger.debug("Collecting lines using a %s", type(lines))
367                csvpath.collect(lines=lines)
368                if lines is None:
369                    self.logger.error(  # pragma: no cover
370                        "Unexpected None for lines after collect_paths: file: %s, match: %s",
371                        file,
372                        csvpath.match,
373                    )
374                #
375                # this is obviously not a good idea for very large files!
376                #
377                result.unmatched = csvpath.unmatched
378            except Exception as ex:  # pylint: disable=W0718
379                # ex.trace = traceback.format_exc()
380                # ex.source = self
381                if self.error_manager.csvpaths is None:
382                    raise Exception("ErrorManager's CsvPaths cannot be None")
383                self.error_manager.handle_error(source=self, msg=f"{ex}")
384                if self.ecoms.do_i_raise():
385                    self.results_manager.save(result)
386                    raise
387            self.results_manager.save(result)
388            results.append(result)
389        #
390        # run ends here
391        #
392        self.results_manager.complete_run(
393            run_dir=crt, pathsname=pathsname, results=results
394        )
395        #
396        # update/write run manifests here
397        #  - validity (are all paths valid)
398        #  - paths-completeness (did they all run and complete)
399        #  - method (collect, fast_forward, next)
400        #  - timestamp
401        #
402        self.clear_run_coordination()
403        self.logger.info(
404            "Completed collect_paths %s with %s paths", pathsname, len(paths)
405        )

Sequentially does a CsvPath.collect() on filename for every named path. Lines are collected into results.
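
The serial order described above can be modeled in a few lines. This is a minimal sketch, not the Framework's code: `collect_serially` and the lambda matchers are hypothetical stand-ins for named csvpaths, and a plain list of rows stands in for the named file.

```python
from typing import Callable, Dict, List

Line = List[str]
Matcher = Callable[[Line], bool]  # hypothetical stand-in for one csvpath


def collect_serially(
    paths: Dict[str, Matcher], rows: List[Line]
) -> Dict[str, List[Line]]:
    """Run each matcher over the whole file before starting the next one."""
    results: Dict[str, List[Line]] = {}
    for name, matches in paths.items():
        # One full pass over the rows per path, as in a serial run.
        results[name] = [row for row in rows if matches(row)]
    return results


rows = [["1", "apple"], ["2", "pear"], ["3", "apple"]]
paths = {
    "apples": lambda row: row[1] == "apple",
    "high_ids": lambda row: int(row[0]) >= 2,
}
collected = collect_serially(paths, rows)
```

Each path ends up with its own list of matched lines, which mirrors how the real method stores lines per result rather than returning them to the caller.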

def fast_forward_paths(self, *, pathsname, filename):
505    def fast_forward_paths(self, *, pathsname, filename):
506        """Sequentially does a CsvPath.fast_forward() on filename for every named path. No matches are collected."""
507        paths = self.paths_manager.get_named_paths(pathsname)
508        file = self.file_manager.get_named_file(filename)
509        self.logger.info("Prepping %s and %s", filename, pathsname)
510        self.clean(paths=pathsname)
511        self.logger.info(
512            "Beginning FF %s with %s paths against file %s. No match results will be held.",
513            pathsname,
514            len(paths),
515            filename,
516        )
517        crt = self.run_time_str(pathsname)
518        #
519        # run starts here
520        #
521        self.results_manager.start_run(
522            run_dir=crt, pathsname=pathsname, filename=filename
523        )
524        #
525        #
526        #
527        results = []
528        for i, path in enumerate(paths):
529            csvpath = self.csvpath()
530            self.logger.debug("Beginning to FF CsvPath instance: %s", csvpath)
531            result = Result(
532                csvpath=csvpath,
533                file_name=filename,
534                paths_name=pathsname,
535                run_index=i,
536                run_time=self.current_run_time,
537                run_dir=crt,
538            )
539            try:
540                self._load_csvpath(
541                    csvpath=csvpath,
542                    path=path,
543                    file=file,
544                    pathsname=pathsname,
545                    filename=filename,
546                    crt=crt,
547                )
548                #
549                # if run-mode: no-run we skip ahead without saving results
550                #
551                if not csvpath.will_run:
552                    continue
553                #
554                # the add has to come after _load_csvpath because we need the identity or index
555                # to be stable and the identity is found in load, if it exists.
556                #
557                self.results_manager.add_named_result(result)
558                self.logger.info(
559                    "Parsed csvpath %s pointed at %s and starting to fast-forward",
560                    i,
561                    file,
562                )
563                csvpath.fast_forward()
564                self.logger.info(
565                    "Completed fast forward of csvpath %s against %s", i, file
566                )
567            except Exception as ex:  # pylint: disable=W0718
568                # ex.trace = traceback.format_exc()
569                # ex.source = self
570                self.error_manager.handle_error(source=self, msg=f"{ex}")
571                if self.ecoms.do_i_raise():
572                    self.results_manager.save(result)
573                    raise
574            self.results_manager.save(result)
575            results.append(result)
576        #
577        # run ends here
578        #
579        self.results_manager.complete_run(
580            run_dir=crt, pathsname=pathsname, results=results
581        )
582        self.clear_run_coordination()
583        self.logger.info(
584            "Completed fast_forward_paths %s with %s paths", pathsname, len(paths)
585        )

Sequentially does a CsvPath.fast_forward() on filename for every named path. No matches are collected.
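
A fast-forward style run evaluates every row for its side effects and keeps no lines. The sketch below illustrates that idea with a validity flag as the only outcome. The names `validate_serially` and the rules are assumptions made for illustration; they are not part of the csvpath API.

```python
from typing import Callable, Dict, List

Line = List[str]
Rule = Callable[[Line], bool]  # hypothetical: returns True when a row passes


def validate_serially(rules: Dict[str, Rule], rows: List[Line]) -> Dict[str, bool]:
    """Scan the whole file once per rule and keep only a verdict, no lines."""
    verdicts: Dict[str, bool] = {}
    for name, passes in rules.items():
        is_valid = True
        for row in rows:
            if not passes(row):
                # The row itself is discarded; only the flag survives.
                is_valid = False
        verdicts[name] = is_valid
    return verdicts


rows = [["1", "apple"], ["x", "pear"]]
rules = {
    "has_two_columns": lambda row: len(row) == 2,
    "numeric_id": lambda row: row[0].isdigit(),
}
verdicts = validate_serially(rules, rows)
```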

def next_paths(self, *, pathsname, filename, collect: bool = False):
587    def next_paths(
588        self, *, pathsname, filename, collect: bool = False
589    ):  # pylint: disable=R0914
590        """Does a CsvPath.next() on filename for every line against every named path in sequence"""
591        paths = self.paths_manager.get_named_paths(pathsname)
592        file = self.file_manager.get_named_file(filename)
593        self.logger.info("Prepping %s and %s", filename, pathsname)
594        self.clean(paths=pathsname)
595        self.logger.info("Beginning next_paths with %s paths", len(paths))
596        crt = self.run_time_str(pathsname)
597        #
598        # run starts here
599        #
600        self.results_manager.start_run(
601            run_dir=crt, pathsname=pathsname, filename=filename
602        )
603        #
604        #
605        #
606        results = []
607        for i, path in enumerate(paths):
608            if self._skip_all:
609                skip_err = "Found the skip-all signal set. skip_all() is"
610                skip_err = f"{skip_err} only for breadth-first runs using the"
611                skip_err = f"{skip_err} '_by_line' methods. It has the same"
612                skip_err = f"{skip_err} effect as skip() in a"
613                skip_err = f"{skip_err} serial run like this one."
614                self.logger.error(skip_err)
615            if self._stop_all:
616                self.logger.warning("Stop-all set. Shutting down run.")
617                break
618            if self._advance_all > 0:
619                advance_err = "Found the advance-all signal set. advance_all() is"
620                advance_err = f"{advance_err} only for breadth-first runs using the"
621                advance_err = f"{advance_err} '_by_line' methods. It has the same"
622                advance_err = f"{advance_err} effect as advance() in a"
623                advance_err = f"{advance_err} serial run like this one."
624                self.logger.error(advance_err)
625            csvpath = self.csvpath()
626            result = Result(
627                csvpath=csvpath,
628                file_name=filename,
629                paths_name=pathsname,
630                run_index=i,
631                run_time=self.current_run_time,
632                run_dir=crt,
633            )
634            if self._fail_all:
635                self.logger.warning(
636                    "Fail-all set. Failing all remaining CsvPath instances in the run."
637                )
638                csvpath.is_valid = False
639            try:
640                self._load_csvpath(
641                    csvpath=csvpath,
642                    path=path,
643                    file=file,
644                    pathsname=pathsname,
645                    filename=filename,
646                    crt=crt,
647                )
648                #
649                # if run-mode: no-run we skip ahead without saving results
650                #
651                if not csvpath.will_run:
652                    continue
653                #
654                # the add has to come after _load_csvpath because we need the identity or index
655                # to be stable and the identity is found in load, if it exists.
656                #
657                self.results_manager.add_named_result(result)
658                for line in csvpath.next():
659                    #
660                    # removed dec 1. why was this? it doesn't seem to make sense and
661                    # removing it doesn't break any unit tests. was it a mistake?
662                    #
663                    # line.append(result)
664                    if collect:
665                        result.append(line)
666                        result.unmatched = csvpath.unmatched
667                    yield line
668            except Exception as ex:  # pylint: disable=W0718
669                self.error_manager.handle_error(source=self, msg=f"{ex}")
670                if self.ecoms.do_i_raise():
671                    self.results_manager.save(result)
672                    raise
673            self.results_manager.save(result)
674            results.append(result)
675        #
676        # run ends here
677        #
678        self.results_manager.complete_run(
679            run_dir=crt, pathsname=pathsname, results=results
680        )
681        self.clear_run_coordination()

Does a CsvPath.next() on filename for every line against every named path in sequence
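
Because the method body contains `yield`, it is a generator: nothing runs until the caller iterates, and the end-of-run steps happen only once iteration is exhausted. The sketch below shows that behavior in isolation. `next_serially`, `log`, and the matcher are hypothetical names, not Framework code.

```python
from typing import Callable, Iterator, List

Line = List[str]
Matcher = Callable[[Line], bool]  # hypothetical stand-in for one csvpath

log: List[str] = []


def next_serially(
    matchers: List[Matcher],
    rows: List[Line],
    collected: List[Line],
    *,
    collect: bool = False,
) -> Iterator[Line]:
    log.append("run started")  # deferred until the first next() call
    for matches in matchers:
        for row in rows:
            if matches(row):
                if collect:
                    collected.append(row)
                yield row
    log.append("run completed")  # reached only when the generator is exhausted


rows = [["1", "apple"], ["2", "pear"], ["3", "apple"]]
collected: List[Line] = []
gen = next_serially([lambda row: row[1] == "apple"], rows, collected, collect=True)
log_before_iteration = list(log)  # still empty: creating the generator ran nothing
first = next(gen)
rest = list(gen)
```

A caller that wants only the side effects still has to drain the generator, for example with `for _ in gen: pass`.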

def collect_by_line( self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False):
685    def collect_by_line(
686        self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False
687    ):
688        """Does a CsvPath.collect() on filename where each row is considered
689        by every named path before the next row starts
690
691        See next_by_line for if_all_agree and collect_when_not_matched.
692        """
693        self.logger.info(
694            "Starting collect_by_line for paths: %s and file: %s", pathsname, filename
695        )
696        lines = []
697        for line in self.next_by_line(  # pylint: disable=W0612
698            pathsname=pathsname,
699            filename=filename,
700            collect=True,
701            if_all_agree=if_all_agree,
702            collect_when_not_matched=collect_when_not_matched,
703        ):
704            # re: W0612: we need 'line' in order to do the iteration. we have to iterate.
705            lines.append(line)
706        self.logger.info(
707            "Completed collect_by_line for paths: %s and file: %s", pathsname, filename
708        )
709        #
710        # each result holds the lines its own CsvPath captured, but
711        # since we're doing if_all_agree T/F we should return the union here. for some
712        # files this obviously makes the data-in-memory problem even bigger, but it's
713        # the operator's responsibility to know if that will be a problem for their use
714        # case.
715        #
716        return lines

Does a CsvPath.collect() on filename where each row is considered by every named path before the next row starts

See next_by_line for if_all_agree and collect_when_not_matched.
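
The difference between the serial `_paths` methods and the breadth-first `_by_line` methods is the order in which (path, row) pairs are visited. The following sketch lists that order for both strategies. The function names are hypothetical and the pairs are plain indexes, not Framework objects.

```python
from typing import List, Tuple

Visit = Tuple[int, int]  # (path index, row index)


def serial_order(n_paths: int, n_rows: int) -> List[Visit]:
    """Each path sees the whole file before the next path starts."""
    return [(p, r) for p in range(n_paths) for r in range(n_rows)]


def by_line_order(n_paths: int, n_rows: int) -> List[Visit]:
    """Each row is seen by every path before the next row starts."""
    return [(p, r) for r in range(n_rows) for p in range(n_paths)]


serial = serial_order(2, 3)
by_line = by_line_order(2, 3)
```

The breadth-first order is what lets one path affect how the remaining paths treat the same row, which is why the source logs skip-all and advance-all as meaningful only for the `_by_line` methods.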

def fast_forward_by_line( self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False):
718    def fast_forward_by_line(
719        self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False
720    ):
721        """Does a CsvPath.fast_forward() on filename where each row is
722        considered by every named path before the next row starts
723
724        See next_by_line for if_all_agree and collect_when_not_matched.
725        """
726        self.logger.info(
727            "Starting fast_forward_by_line for paths: %s and file: %s",
728            pathsname,
729            filename,
730        )
731        for line in self.next_by_line(  # pylint: disable=W0612
732            pathsname=pathsname,
733            filename=filename,
734            collect=False,
735            if_all_agree=if_all_agree,
736            collect_when_not_matched=collect_when_not_matched,
737        ):
738            # re: W0612: we need 'line' in order to do the iteration. we have to iterate.
739            pass
740        self.logger.info(
741            "Completed fast_forward_by_line for paths: %s and file: %s",
742            pathsname,
743            filename,
744        )

Does a CsvPath.fast_forward() on filename where each row is considered by every named path before the next row starts

See next_by_line for if_all_agree and collect_when_not_matched.

def next_by_line( self, *, pathsname, filename, collect: bool = False, if_all_agree=False, collect_when_not_matched=False) -> List[Any]:
746    def next_by_line(  # pylint: disable=R0912,R0915,R0914
747        self,
748        *,
749        pathsname,
750        filename,
751        collect: bool = False,
752        if_all_agree=False,
753        collect_when_not_matched=False,
754    ) -> List[Any]:
755        """Does a CsvPath.next() on filename where each row is considered
756        by every named path before the next row starts.
757
758        if_all_agree=True means all the CsvPath instances must match for
759        the line to be kept. However, every CsvPath instance will keep its
760        own matches in its results, whether or not each of those lines
761        was returned to the caller by CsvPaths.
762
763        collect_when_not_matched=True inverts the match so that lines
764        which did not match are returned, rather than the default behavior.
765        """
766        # re: R0912 -- absolutely. plan to refactor.
767        self.logger.info("Prepping %s and %s", filename, pathsname)
768        self.clean(paths=pathsname)
769        fn = self.file_manager.get_named_file(filename)
770        paths = self.paths_manager.get_named_paths(pathsname)
771        if (
772            paths is None or not isinstance(paths, list) or len(paths) == 0
773        ):  # pragma: no cover
774            raise InputException(
775                f"Pathsname '{pathsname}' must name a list of csvpaths"
776            )
777        #
778        # experiment!
779        #
780        crt = self.run_time_str(pathsname)
781        #
782        # also use of crt below
783        #
784        csvpath_objects = self._load_csvpath_objects(
785            paths=paths,
786            named_file=fn,
787            collect_when_not_matched=collect_when_not_matched,
788            filename=filename,
789            pathsname=pathsname,
790            crt=crt,
791        )
792        #
793        # prep has to come after _load_csvpath_objects because we need the identity or
794        # indexes to be stable and the identity is found in the load, if it exists.
795        #
796        self._prep_csvpath_results(
797            csvpath_objects=csvpath_objects,
798            filename=filename,
799            pathsname=pathsname,
800            crt=crt,
801        )
802        #
803        # setting fn into the csvpath is less obviously useful at CsvPaths
804        # but we'll do it for consistency.
805        #
806        self.logger.info("Beginning next_by_line with %s paths", len(csvpath_objects))
807        reader = FileManager.get_reader(
808            fn, delimiter=self.delimiter, quotechar=self.quotechar
809        )
810        stopped_count: List[int] = []
811        for line in reader.next():
812            # for line in reader:  # pylint: disable=R1702
813            # question to self: should this default be in a central place
814            # so that we can switch to OR, in part by changing the default?
815            keep = if_all_agree
816            self._skip_all = False
817            self._advance_all = 0
818            try:
819                # p is a (CsvPath, List[List[str]]) where the second item is
820                # the line-by-line results of the first item's matching
821                for p in csvpath_objects:
822                    #
823                    # if run-mode: no-run we skip ahead without saving results
824                    #
825                    if not p[0].will_run:
826                        continue
827                    self.current_matcher = p[0]
828                    if self._fail_all:
829                        self.logger.warning(
830                            "Fail-all set. Setting CsvPath is_valid to False."
831                        )
832                        self.current_matcher.is_valid = False
833                    if self._stop_all:
834                        self.logger.warning("Stop-all set. Shutting down run.")
835                        self.current_matcher.stopped = True
836                        continue
837                    if self._skip_all:
838                        self.logger.warning("Skip-all set. Continuing to next.")
839                        #
840                        # all following CsvPaths must have their
841                        # line_monitors incremented
842                        #
843                        self.current_matcher.track_line(line)
844                        continue
845                    if self._advance_all > 0:
846                        logtxt = "Advance-all set. Setting advance. "
847                        logtxt = f"{logtxt}CsvPath and its Matcher will handle the advancing."
848                        self.logger.info(logtxt)
849                        #
850                        # CsvPath will handle advancing so we don't need to do
851                        # anything, including track_line(line). we just need to
852                        # see if we're setting advance or increasing it.
853                        #
854                        a = self.current_matcher.advance_count
855                        if self._advance_all > a:
856                            self.current_matcher.advance_count = self._advance_all
857                        #
858                        # all following CsvPaths must have their
859                        # advance incremented -- with the advance not being simply
860                        # additive, have to be mindful of any existing advance
861                        # count!
862                        #
863                    if self.current_matcher.stopped:  # pylint: disable=R1724
864                        continue
865
866                    #
867                    # allowing the match to happen regardless of keep
868                    # because we may want side-effects or to have different
869                    # results in different named-results, as well as the
870                    # union
871                    #
872                    self.logger.debug(
873                        "considering line with csvpath identified as: %s",
874                        self.current_matcher.identity,
875                    )
876                    matched = False
877                    self.current_matcher.track_line(line)
878                    #
879                    # re: W0212: treating _consider_line something like package private
880                    #
881                    matched = (
882                        self.current_matcher._consider_line(  # pylint:disable=W0212
883                            line
884                        )
885                    )
886                    if self.current_matcher.stopped:
887                        stopped_count.append(1)
888                    if if_all_agree:
889                        keep = keep and matched
890                    else:
891                        keep = keep or matched
892                    #
893                    # not doing continue if we have if_all_agree and not keep as we
894                    # used to do allows individual results to have lines that in
895                    # aggregate we do not keep.
896                    #
897                    if matched and collect:
898                        line = self.current_matcher.limit_collection(line)
899                        p[1].append(line)
900            except Exception as ex:  # pylint: disable=W0718
901                # ex.trace = traceback.format_exc()
902                # ex.source = self
903                self.error_manager.handle_error(source=self, msg=f"{ex}")
904                if self.ecoms.do_i_raise():
905                    for r in csvpath_objects:
906                        result = r[1]
907                        result.unmatched = r[0].unmatched
908                        self.results_manager.save(result)
909                    raise
910            # we yield even if we stopped in this iteration.
911            # caller needs to see what we stopped on.
912            #
913            # ! we only yield if keep is True
914            #
915            if keep:
916                yield line
917            if sum(stopped_count) == len(csvpath_objects):
918                break
919        results = []
920        for r in csvpath_objects:
921            result = r[1]
922            results.append(result)
923            result.unmatched = r[0].unmatched
924            self.results_manager.save(result)
925
926        #
927        # run ends here
928        #
929        self.results_manager.complete_run(
930            run_dir=results[0].run_dir, pathsname=pathsname, results=results
931        )
932        self.clear_run_coordination()

Does a CsvPath.next() on filename where each row is considered by every named path before the next row starts.

if_all_agree=True means all the CsvPath instances must match for the line to be kept. However, every CsvPath instance will keep its own matches in its results, whether or not each of those lines was returned to the caller by CsvPaths.

collect_when_not_matched=True inverts the match so that lines which did not match are returned, rather than the default behavior.
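
The keep decision in the source starts `keep` at the value of `if_all_agree` and then folds each path's match into it with AND or OR. The sketch below reproduces that fold in isolation. It is a simplified model: `next_by_line_sketch` and the matchers are hypothetical, and applying `collect_when_not_matched` as a per-path inversion is an assumption based on the flag being handed to the loader in the source.

```python
from typing import Callable, Iterator, List, Sequence

Line = List[str]
Matcher = Callable[[Line], bool]  # hypothetical stand-in for one csvpath


def next_by_line_sketch(
    matchers: Sequence[Matcher],
    rows: Sequence[Line],
    per_path: List[List[Line]],
    *,
    if_all_agree: bool = False,
    collect_when_not_matched: bool = False,
) -> Iterator[Line]:
    for row in rows:
        # True is the identity for AND, False is the identity for OR.
        keep = if_all_agree
        for i, matches in enumerate(matchers):
            matched = matches(row)
            if collect_when_not_matched:
                matched = not matched  # assumption: inversion happens per path
            keep = (keep and matched) if if_all_agree else (keep or matched)
            if matched:
                # Each path keeps its own matches whatever the final verdict.
                per_path[i].append(row)
        if keep:
            yield row


rows = [["1", "apple"], ["2", "pear"], ["3", "apple"]]
matchers = [lambda row: row[1] == "apple", lambda row: int(row[0]) >= 2]

any_results: List[List[Line]] = [[], []]
kept_any = list(next_by_line_sketch(matchers, rows, any_results))

all_results: List[List[Line]] = [[], []]
kept_all = list(next_by_line_sketch(matchers, rows, all_results, if_all_agree=True))

inv_results: List[List[Line]] = [[], []]
kept_inverted = list(
    next_by_line_sketch(matchers, rows, inv_results, collect_when_not_matched=True)
)
```

With `if_all_agree=True` only the third row reaches the caller, yet each per-path list still holds two rows, which is the behavior the docstring describes.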