csvpath
The CsvPath Framework makes it easy to pre-board external data files. It sits between Managed File Transfer and the data lake or applications. CsvPath provides durable dataset identification, validation and canonicalization, and stages data for internal use as a known-good raw data source. The goal is to automate the pre-boarding process.
CsvPath Language is the core validation and canonicalization engine of the Framework. Validation files can be developed without writing code by using the CLI. When you are ready to automate pre-boarding, you will use the classes documented here, in particular csvpath.CsvPath, csvpath.CsvPaths, and the managers in csvpath.managers:
- csvpath.managers.files.file_manager
- csvpath.managers.paths.paths_manager
- csvpath.managers.results.results_manager
You access the managers from a csvpath.CsvPaths instance; you should not construct manager instances yourself.
There are many other classes you could potentially use in specific, narrow cases, such as building a new integration, a new function, or a new type of printer. But for 99% of automation use cases, these classes are all you need.
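A minimal sketch of both entry points follows. The file name orders.csv is illustrative only, and the manager attribute names mirror the package layout listed above; the methods each manager offers are not shown here:

```python
from csvpath import CsvPath, CsvPaths

# Run one csvpath statement directly against a local file.
path = CsvPath()
path.fast_forward("$orders.csv[*][yes()]")
print(path.is_valid)

# For automation, a CsvPaths instance owns the managers; access them as
# attributes rather than constructing them yourself.
paths = CsvPaths()
file_mgr = paths.file_manager        # named files
paths_mgr = paths.paths_manager      # named csvpath statements (named-paths)
results_mgr = paths.results_manager  # results of named-paths runs
```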
1""" 2 The CsvPath Framework makes it easy to pre-board external data files. It sits between Managed File Transfer and the data lake or applications. CsvPath provides durable dataset identification, validation and canonicalization, and stages data for internal use as a known-good raw data source. The goal is to automate the pre-boarding process. 3 4CsvPath Language is the core validation and canonicalization engine of the Framework. The validation files can be developed without coding using the CLI. When you are ready to automate pre-boarding you will use the classes documented here, in particular csvpath.CsvPath, csvpath.CsvPaths, and the managers in csvpath.managers: 5 6- csvpath.managers.files.file_manager 7- csvpath.managers.paths.paths_manager 8- csvpath.managers.results.results_manager 9 10You access the managers from a csvpath.CsvPaths instance. You should not construct your own. 11 12There are many other classes you could potentially use in some specific and narrow cases, such as building a new integration, a new function, or a new type of printer. But for 99% of automation use cases these classes are all you need. 13""" 14 15from csvpath.csvpath import CsvPath 16from csvpath.csvpaths import CsvPaths 17 18__all__ = ["CsvPath", "CsvPaths"]
39class CsvPath(ErrorCollector, Printer): # pylint: disable=R0902, R0904 40 """CsvPath represents a csvpath string that contains a reference to 41 a file, scanning instructions, and rules for matching lines. 42 """ 43 44 # re R0902, R0904: reasonable, but not a priority 45 46 def __init__( # pylint: disable=R0913 47 self, 48 *, 49 csvpaths=None, 50 delimiter=",", 51 quotechar='"', 52 skip_blank_lines=True, 53 print_default=True, 54 config=None, 55 error_manager=None, 56 ): 57 # re: R0913: all reasonable pass-ins with sensible defaults 58 # 59 # the config.ini file loaded as a ConfigParser instance 60 # 61 # definitely do not want this coming from CsvPaths because 62 # we want to be able to override config.ini specifically for 63 # this instance, if needed; however, we do want to be able 64 # to pass in a config object that has been configured in some 65 # way. 66 self._config = config 67 self.scanner = None 68 """ @private """ 69 self.matcher = None 70 """ @private """ 71 # 72 # a parent CsvPaths may manage a CsvPath instance. if so, it will enable 73 # the use of named files and named paths, print capture, error handling, 74 # results collection, reference handling, etc. if a CsvPaths is not present 75 # the CsvPath instance is responsible for all its own upkeep and does not 76 # have some of those capabilities. 77 # 78 self.csvpaths = csvpaths 79 """ @private """ 80 # 81 # there are two logger components one for CsvPath and one for CsvPaths. 82 # the default levels are set in config.ini. to change the levels pass LogUtility 83 # your component instance and the logging level. e.g.: 84 # LogUtility.logger(csvpath, "debug") 85 # 86 self.logger = LogUtility.logger(self) 87 """ @private """ 88 self.logger.info("initialized CsvPath") 89 # 90 # if we don't have a csvpaths these will both be None 91 # 92 self.named_paths_name = None 93 """ @private """ 94 self.named_file_name = None 95 """ @private """ 96 # 97 # all errors come to our manager if they occur during matching. we use 98 # the CsvPaths manager if possible. Otherwise, we just make our own that 99 # only knows how to collect errors, not distribute them. 100 # 101 self.ecoms = ErrorCommunications(csvpath=self) 102 """ @private """ 103 self.error_manager = ErrorManager(csvpath=self) 104 if csvpaths is not None: 105 self.error_manager.add_internal_listener(csvpaths.error_manager) 106 # 107 # modes are set in external comments 108 # 109 self.modes = ModeController(self) 110 """ @private """ 111 # 112 # captures the number of lines up front and tracks line stats as the 113 # run progresses 114 # 115 self._line_monitor = None 116 # 117 # the scanning part of the csvpath. e.g. $test.csv[*] 118 # 119 self.scan = None 120 """ @private """ 121 # 122 # the matching part of the csvpath. e.g. [yes()] 123 # 124 self.match = None 125 """ @private """ 126 # 127 # when True the lines that do not match are returned from next() 128 # and collect(). this effectively switches CsvPath from being an 129 # create an OR expression in this case. in the default, we say: 130 # are all of these things true? 131 # but when collect_when_not_matched is True we ask: 132 # are any of these things not true? 133 # 134 # self._when_not_matched = False 135 self._headers = None 136 self.variables: Dict[str, Any] = {} 137 self.delimiter = delimiter 138 self.quotechar = quotechar 139 # 140 # a blank line has no headers. it has no data. physically it is 2 \n with 141 # nothing but whitespace between them. any data or any delimiters would make 142 # the line non-blank. 
143 # 144 self.skip_blank_lines = skip_blank_lines 145 # 146 # in the case of a [*] scan where the last line is blank we would miss firing 147 # last() unless we take steps. instead, we allow that line to match, but we 148 # do not return a line to the caller of next() and we freeze the variables 149 # there is room for side effects make changes, but that a reasonable compromise 150 # between missing last and allowing unwanted changes. we definitely do not 151 # freeze is_valid or stop, which can be useful signaling, even in an 152 # inconsistent state. 153 # 154 self._freeze_path = False 155 # 156 # counts are 1-based 157 # 158 self.scan_count = 0 159 self.match_count = 0 160 # 161 # used by stop() and advance(). a stopped CsvPath halts without finishing 162 # its run. an advancing CsvPath doesn't consider the match part of the 163 # csvpath and does not incur any side effects as it progresses through the 164 # rows the advance skips. the skip() function has the same effect as 165 # advance(1) but without any guarantee that the other match components on 166 # the line will be considered before skipping ahead. there are likely 167 # corner cases where an onmatch qualifier or some other constraint will 168 # trigger match components that would otherwise be skipped so the ability 169 # to shortcut some of the match should not be relied on for anything 170 # critical. 171 # 172 self.stopped = False 173 self._advance = 0 174 # 175 # the lines var will hold a reference to a LineSpooler instance during a 176 # run, if lines are being collected by the collect() method. if the user 177 # is using this CsvPath instance directly the LineSpooler is only there 178 # as a proxy to the list of lines being collected -- we don't spool to 179 # disk, at least atm. 180 # 181 self.lines = None 182 """ @private """ 183 # 184 # set by fail() 185 # 186 self._is_valid = True 187 # 188 # basic timing for the CsvPath instance only. if the CsvPath is managed 189 # by a CsvPaths the timings for a run may include time spent by other 190 # CsvPath instances. 191 # 192 self.last_row_time = -1 193 """ @private """ 194 self.rows_time = -1 195 """ @private """ 196 self.total_iteration_time = -1 197 """ @private """ 198 # 199 # limiting collection means returning fewer headers (values in the 200 # line, a.k.a columns) then are available. limiting headers returned 201 # can impact named results, reset_headers(), and other considerations. 202 # 203 self._limit_collection_to = [] 204 # 205 # error collecting is at the CsvPath instance by default. CsvPath 206 # instances that are managed by a CsvPaths have their errors collected 207 # by their Results as well. Result handles persistence. 208 # 209 # errors policies are set in config.ini at CsvPath and CsvPaths levels. 210 # 211 self._errors: List[Error] = [] 212 # 213 # saves the scan and match parts of paths for reference. mainly helpful 214 # for testing the CsvPath library itself; not used end users. the run 215 # name becomes the file name of the saved path parts. 216 # 217 self._save_scan_dir = None 218 self._save_match_dir = None 219 self._run_name = None 220 # 221 # metadata is collected from "outer" csvpath comments. outer comments 222 # separate from the comments within the match part of the csvpath. 223 # the keys are words with colons. e.g. 
~ name: my new csvpath ~ 224 # 225 self.metadata: Dict[str, Any] = {} 226 # 227 # holds the current match count while we're in the middle of a match 228 # so that anyone who wants to can increase the match count using 229 # raise_match_count_if(). it is important to do the raise asap so that 230 # components that are onmatched have the right match count available. 231 # 232 self._current_match_count = 0 233 # 234 # printers receive print lines from the print function. the default 235 # printer prints to standard out. a CsvPath that is managed by a 236 # CsvPaths has its Results as a printer, as well as having 237 # the default printer. 238 # 239 self.printers = [] 240 """ @private """ 241 if print_default: 242 self.printers.append(StdOutPrinter()) 243 # 244 # _function_times_match collects the time a function spends doing its matches() 245 # 246 self._function_times_match = {} 247 # 248 # _function_times_value collects the time a function spends doing its to_value() 249 # 250 self._function_times_value = {} 251 self._created_at = datetime.now(timezone.utc) 252 self._run_started_at = None 253 self._collecting = False 254 # 255 # holds the unmatched lines when lines are being collected and 256 # _unmatched_available is True. it is analogous to the lines returned 257 # by collect(), but is the lines not returned by collect(). 258 # 259 self._unmatched = None 260 261 # 262 # this method saves and reloads the config. if you don't want that use 263 # CsvPath.config.save_to_config(). 264 # 265 def add_to_config(self, section, key, value) -> None: 266 """@private""" 267 self.config.add_to_config(section=section, key=key, value=value) 268 self.config.save_config() 269 self.config.reload() 270 271 @property 272 def data_from_preceding(self) -> bool: 273 """@private""" 274 return self.modes.source_mode.value 275 276 @data_from_preceding.setter 277 def data_from_preceding(self, dfp: bool) -> None: 278 """@private""" 279 self.modes.source_mode.value = dfp 280 281 @property 282 def unmatched(self) -> list[list[Any]]: 283 """@private""" 284 return self._unmatched 285 286 @unmatched.setter 287 def unmatched(self, lines: list[list[Any]]) -> None: 288 """@private""" 289 self._unmatched = lines 290 291 @property 292 def collecting(self) -> bool: 293 """@private""" 294 return self._collecting 295 296 @collecting.setter 297 def collecting(self, c: bool) -> None: 298 """@private""" 299 self._collecting = c 300 301 @property 302 def unmatched_available(self) -> bool: 303 """@private""" 304 return self.modes.unmatched_mode.value 305 306 @unmatched_available.setter 307 def unmatched_available(self, ua: bool) -> None: 308 """@private""" 309 self.modes.unmatched_mode.value = ua 310 311 @property 312 def created_at(self) -> datetime: 313 """@private""" 314 return self._created_at 315 316 @property 317 def run_started_at(self) -> datetime: 318 """@private""" 319 return self._run_started_at 320 321 @property 322 def will_run(self) -> bool: 323 """@private""" 324 return self.modes.run_mode.value 325 326 @will_run.setter 327 def will_run(self, mode) -> None: 328 """@private""" 329 self.modes.run_mode.value = mode 330 331 # 332 # increases the total accumulated time spent doing c.matches() by t 333 # 334 def up_function_time_match(self, c, t) -> None: 335 """@private""" 336 if c not in self.function_times_match: 337 self.function_times_match[c] = 0 338 st = self.function_times_match[c] 339 st += t 340 self.function_times_match[c] = st 341 342 @property 343 def function_times_match(self) -> int: 344 """@private""" 345 
return self._function_times_match 346 347 # 348 # increases the total accumulated time spent doing c.to_value() by t 349 # 350 def up_function_time_value(self, c, t) -> None: 351 """@private""" 352 if c not in self.function_times_value: 353 self.function_times_value[c] = 0 354 st = self.function_times_value[c] 355 st += t 356 self.function_times_value[c] = st 357 358 @property 359 def function_times_value(self) -> int: 360 """@private""" 361 return self._function_times_value 362 363 def do_i_raise(self) -> bool: 364 """@private""" 365 return self.ecoms.do_i_raise() 366 367 @property 368 def advance_count(self) -> int: # pragma: no cover 369 """@private""" 370 return self._advance 371 372 @advance_count.setter 373 def advance_count(self, lines: int) -> None: 374 """@private""" 375 self._advance = lines 376 377 @property 378 def headers(self) -> List[str]: 379 """@private""" 380 if self._headers is None: 381 self.get_total_lines_and_headers() 382 return self._headers 383 384 @headers.setter 385 def headers(self, headers: List[str]) -> None: 386 """@private""" 387 self._headers = headers 388 389 @property 390 def line_monitor(self) -> LineMonitor: 391 """@private""" 392 if self._line_monitor is None: 393 self.get_total_lines_and_headers() 394 return self._line_monitor 395 396 @line_monitor.setter 397 def line_monitor(self, lm) -> None: 398 """@private""" 399 self._line_monitor = lm 400 401 @property 402 def AND(self) -> bool: # pylint: disable=C0103 403 return self.modes.logic_mode.value 404 405 @AND.setter 406 def AND(self, a: bool) -> bool: # pylint: disable=C0103 407 self.modes.logic_mode.value = a 408 409 @property 410 def OR(self) -> bool: # pylint: disable=C0103 411 return not self.modes.logic_mode.value 412 413 @OR.setter 414 def OR(self, a: bool) -> bool: # pylint: disable=C0103 415 self.modes.logic_mode.value = not a 416 417 @property 418 def identity(self) -> str: 419 """returns id or name if found in metadata. 420 421 the id or name gets into metadata primarily if found 422 in an "external" comment in the csvpath. "external" 423 meaning outside the []s. comments are keyword:comment. 424 we take id, Id, ID and name, Name, NAME. 425 426 id is preferred over name. E.g. in: 427 ~ name: my path description: an example id: this value wins ~ 428 the id becomes the identity of the instance. 429 430 we prefer in this order: all-lower most, Initial-caps, 431 ALL-CAPS least 432 433 the ordering is relied on in Result and possibly 434 elsewhere. 435 """ 436 ret = None 437 if not self.metadata: 438 ret = "" 439 if "NAME" in self.metadata: 440 ret = self.metadata["NAME"] 441 if "Name" in self.metadata: 442 ret = self.metadata["Name"] 443 if "name" in self.metadata: 444 ret = self.metadata["name"] 445 if "ID" in self.metadata: 446 ret = self.metadata["ID"] 447 if "Id" in self.metadata: 448 ret = self.metadata["Id"] 449 if "id" in self.metadata: 450 ret = self.metadata["id"] 451 return ret 452 453 @property 454 def config(self) -> Config: # pylint: disable=C0116 455 """@private""" 456 if not self._config: 457 self._config = Config() 458 return self._config 459 460 # ========================== 461 # Errors 462 # <thinking> if we have a csvpaths people should look at the result to find errors 463 # but we give access to metadata, vars, etc. from the csvpath, so we should 464 # give errors too. that means we need to have our own listener. ultimately we'd 465 # just be adding pointers, not dup the original error data. 
466 # 467 def metadata_update(self, mdata: Metadata) -> None: 468 """@private""" 469 if isinstance(mdata, Error): 470 self.collect_error(mdata) 471 472 @property 473 def errors(self) -> List[Error]: # pylint: disable=C0116 474 return self._errors 475 476 @property 477 def errors_count(self) -> int: # pylint: disable=C0116 478 return len(self.errors) 479 480 def collect_error(self, error: Error) -> None: # pylint: disable=C0116 481 """@private""" 482 self.errors.append(error) 483 484 def has_errors(self) -> bool: 485 return self.errors_count > 0 486 487 @property 488 def stop_on_validation_errors(self) -> bool: 489 """@private""" 490 return self.modes.validation_mode.stop_on_validation_errors 491 492 @property 493 def fail_on_validation_errors(self) -> bool: 494 """@private""" 495 return self.modes.validation_mode.fail_on_validation_errors 496 497 @property 498 def print_validation_errors(self) -> bool: 499 """@private""" 500 return self.modes.validation_mode.print_validation_errors 501 502 @property 503 def log_validation_errors(self) -> bool: 504 """@private""" 505 return self.modes.validation_mode.log_validation_errors 506 507 @property 508 def raise_validation_errors(self) -> bool: 509 """@private""" 510 return self.modes.validation_mode.raise_validation_errors 511 512 @property 513 def match_validation_errors(self) -> bool: 514 """@private""" 515 return self.modes.validation_mode.match_validation_errors 516 517 @property 518 def collect_validation_errors(self) -> bool: 519 """@private""" 520 return self.modes.validation_mode.collect_validation_errors 521 522 def add_printer(self, printer) -> None: # pylint: disable=C0116 523 """@private""" 524 if printer not in self.printers: 525 self.printers.append(printer) 526 527 def set_printers(self, printers: List) -> None: # pylint: disable=C0116 528 """@private""" 529 self.printers = printers 530 531 @property 532 def has_default_printer(self) -> bool: 533 """@private""" 534 if not self.printers: 535 self.printers = [] 536 for p in self.printers: 537 if isinstance(p, StdOutPrinter): 538 return True 539 return False 540 541 def print(self, string: str) -> None: # pylint: disable=C0116 542 """@private""" 543 for p in self.printers: 544 p.print(string) 545 546 def print_to(self, name: str, string: str) -> None: 547 """@private""" 548 for p in self.printers: 549 p.print_to(name, string) 550 551 @property 552 def last_line(self): 553 """@private 554 this method only returns the default printer's last_line""" 555 if not self.printers or len(self.printers) == 0: 556 return None 557 return self.printers[0].last_line 558 559 @property 560 def lines_printed(self) -> int: 561 """@private 562 this method only returns the default printer's lines printed""" 563 if not self.printers or len(self.printers) == 0: 564 return -1 565 return self.printers[0].lines_printed 566 567 @property 568 def is_frozen(self) -> bool: 569 """@private 570 True if the instance is matching on its last row only to 571 allow last()s to run; in which case, no variable updates 572 are allowed, along with other limitations.""" 573 return self._freeze_path 574 575 @is_frozen.setter 576 def is_frozen(self, freeze: bool) -> None: 577 """@private""" 578 self._freeze_path = freeze 579 580 @property 581 def explain(self) -> bool: 582 """@private 583 when this property is True CsvPath dumps a match explaination 584 to INFO. this can be expensive. a 25% performance hit wouldn't 585 be unexpected. 
586 """ 587 return self.modes.explain_mode.value 588 589 @explain.setter 590 def explain(self, yesno: bool) -> None: 591 """@private""" 592 self.modes.explain_mode.value = yesno 593 594 @property 595 def collect_when_not_matched(self) -> bool: 596 """@private 597 when this property is True CsvPath returns the lines that do not 598 match the matchers match components""" 599 return self.modes.return_mode.collect_when_not_matched 600 601 @collect_when_not_matched.setter 602 def collect_when_not_matched(self, yesno: bool) -> None: 603 """@private 604 when c ollect_when_not_matched is True we return the lines that failed 605 to match, rather than the default behavior of returning the matches. 606 """ 607 self.modes.return_mode.collect_when_not_matched = yesno 608 609 def parse(self, csvpath, disposably=False): 610 """@private 611 displosably is True when a Matcher is needed for some purpose other than 612 the run we were created to do. could be that a match component wanted a 613 parsed csvpath for its own purposes. when True, we create and return the 614 Matcher, but then forget it ever existed. 615 616 when disposably is False we build the scanner and return that 617 """ 618 # 619 # strip off any comments and collect any metadata 620 # CsvPaths will do this earlier but it stripped off 621 # the comments so we won't find them again 622 # 623 csvpath = MetadataParser(self).extract_metadata(instance=self, csvpath=csvpath) 624 self.update_settings_from_metadata() 625 # 626 # 627 # exp! 628 if disposably is False: 629 csvpath = self._update_file_path(csvpath) 630 # 631 # 632 # 633 s, mat = self._find_scan_and_match_parts(csvpath) 634 # 635 # a disposable matcher still needs the match part 636 # 637 self.match = mat 638 if disposably: 639 pass 640 else: 641 self.scan = s 642 self.scanner = Scanner(csvpath=self) 643 self.scanner.parse(s) 644 # 645 # we build a matcher to see if it builds without error. 646 # in principle we could keep this as the actual matcher. 647 # atm, tho, just create a dry-run copy. in some possible 648 # unit tests we may not have a parsable match part. 649 # 650 if disposably: 651 matcher = None 652 if mat: 653 matcher = Matcher(csvpath=self, data=mat, line=None, headers=None) 654 # 655 # if the matcher was requested for some reason beyond our own needs 656 # we just return it and forget it existed. 657 # 658 return matcher 659 if self.scanner.filename is None: 660 raise FileException("Cannot proceed without a filename") 661 self.get_total_lines_and_headers() 662 return self 663 664 def update_settings_from_metadata(self) -> None: 665 """@private""" 666 # 667 # settings: 668 # - logic-mode: AND | OR 669 # - return-mode: matches | no-matches 670 # - print-mode: default | no-default 671 # - validation-mode: (no-)print | log | (no-)raise | quiet | (no-)match 672 # - run-mode: no-run | run 673 # - unmatched-mode: no-keep | keep 674 # - source-mode: preceding | origin 675 # - files-mode: all | no-data | no-unmatched | no-printouts | data | unmatched | errors | meta | vars | printouts 676 # 677 self.modes.update() 678 679 # ===================== 680 # in principle the modes should come through the mode controller like: 681 # self.modes.transfer_mode.value 682 # not wading into that today. low value. 
683 # 684 @property 685 def transfer_mode(self) -> str: 686 """@private""" 687 return self.metadata.get("transfer-mode") 688 689 @property 690 def source_mode(self) -> str: 691 """@private""" 692 return self.metadata.get("source-mode") 693 694 @property 695 def error_mode(self) -> str: 696 """@private""" 697 return self.metadata.get("error-mode") 698 699 @property 700 def files_mode(self) -> str: 701 """@private""" 702 return self.metadata.get("files-mode") 703 704 @property 705 def validation_mode(self) -> str: 706 """@private""" 707 return self.metadata.get("validation-mode") 708 709 @property 710 def run_mode(self) -> str: 711 """@private""" 712 return self.metadata.get("run-mode") 713 714 @property 715 def logic_mode(self) -> str: 716 """@private""" 717 return self.metadata.get("logic-mode") 718 719 @property 720 def return_mode(self) -> str: 721 """@private""" 722 return self.modes.get("return-mode") 723 724 @property 725 def explain_mode(self) -> str: 726 """@private""" 727 return self.metadata.get("explain-mode") 728 729 @property 730 def print_mode(self) -> str: 731 """@private""" 732 return self.metadata.get("print-mode") 733 734 @property 735 def unmatched_mode(self) -> str: 736 """@private""" 737 return self.metadata.get("unmatched-mode") 738 739 # ===================== 740 741 @property 742 def transfers(self) -> list[tuple[str, str]]: 743 """@private""" 744 return self.modes.transfer_mode.transfers 745 746 @property 747 def all_expected_files(self) -> list[str]: 748 """@private""" 749 return self.modes.files_mode.all_expected_files 750 751 @all_expected_files.setter 752 def all_expected_files(self, efs: list[str]) -> None: 753 """@private""" 754 self.modes.files_mode.all_expected_files = efs 755 756 def _pick_named_path(self, name, *, specific=None) -> str: 757 """@private""" 758 if not self.csvpaths: 759 raise CsvPathsException("No CsvPaths object available") 760 np = self.csvpaths.paths_manager.get_named_paths(name) 761 if not np: 762 raise CsvPathsException(f"Named-paths '{name}' not found") 763 if len(np) == 0: 764 raise CsvPathsException(f"Named-paths '{name}' has no csvpaths") 765 if len(np) == 1: 766 return np[0] 767 if specific is None: 768 self.logger.warning( 769 "Parse_named_path %s has %s csvpaths. Using just the first one.", 770 name, 771 len(np), 772 ) 773 return np[0] 774 for p in np: 775 # this ends up being redundant to the caller. we do it 1x so it's not 776 # a big lift and is consistent. 777 c = CsvPath(csvpaths=self.csvpaths) 778 MetadataParser(c).extract_metadata(instance=c, csvpath=p) 779 if c.identity == specific: 780 return p 781 self.logger.error( 782 "Cannot find csvpath identified as %s in named-paths %s", specific, name 783 ) 784 raise ParsingException(f"Cannot find path '{specific}' in named-paths '{name}'") 785 786 def parse_named_path(self, name, *, disposably=False, specific=None): 787 """@private 788 disposably is True when a Matcher is needed for some purpose other than 789 the run we were created to do. could be that a match component wanted a 790 parsed csvpath for its own purposes. import() uses this method. 791 when True, we create and return the Matcher, but then forget it ever existed. 792 also note: the path must have a name or full filename. $[*] is not enough. 
793 """ 794 if not self.csvpaths: 795 raise CsvPathsException("No CsvPaths object available") 796 797 path = self._pick_named_path(name, specific=specific) 798 c = CsvPath(csvpaths=self.csvpaths) 799 path = MetadataParser(c).extract_metadata(instance=c, csvpath=path) 800 # 801 # exp. oddly this seems to be superfluous 802 # if disposably is False: 803 # path = c._update_file_path(path) 804 # 805 dis = c.parse(path, disposably=disposably) 806 if disposably is True: 807 return dis 808 return None 809 810 def _update_file_path(self, data: str): 811 """@private 812 this method replaces a name (i.e. name in: $name[*[][yes()]) with 813 a file system path, if that name is registered with csvpaths's file 814 manager. if there is no csvpaths no replace happens. if there is a 815 csvpaths but the file manager doesn't know the name, no replace 816 happens. 817 """ 818 if data is None: 819 raise InputException("The csvpath string cannot be None") 820 if self.csvpaths is None: 821 return data 822 name = self._get_name(data) 823 path = self.csvpaths.file_manager.get_named_file(name) 824 if path is None: 825 return data 826 if path == name: 827 return data 828 return data.replace(name, path) 829 830 def _get_name(self, data: str): 831 if self.csvpaths is None: 832 return data 833 data = data.strip() 834 if data[0] == "$": 835 name = data[1 : data.find("[")] 836 return name 837 raise FormatException(f"Must start with '$', not {data[0]}") 838 839 def _find_scan_and_match_parts(self, data): 840 if data is None or not isinstance(data, str): 841 raise InputException("Not a csvpath string") 842 scan = "" 843 matches = "" 844 data = data.strip() 845 i = data.find("]") 846 if i < 0: 847 raise InputException(f"Cannot find the scan part of this csvpath: {data}") 848 if i == len(data) - 1: 849 raise InputException( 850 f"The scan part of this csvpath cannot be last: {data}" 851 ) 852 853 scan = data[0 : i + 1] 854 scan = scan.strip() 855 856 ndata = data[i + 1 :] 857 ndata = ndata.strip() 858 859 if ndata == "": 860 raise InputException(f"There must be a match part of this csvpath: {data}") 861 if ndata[0] != "[": 862 raise InputException(f"Cannot find the match part of this csvpath: {data}") 863 if ndata[len(ndata) - 1] != "]": 864 raise InputException(f"The match part of this csvpath is incorrect: {data}") 865 matches = ndata 866 # 867 # if we're given directory(s) to save to, save the parts 868 # 869 self._save_parts_if(scan, matches) 870 return scan, matches 871 872 def _save_parts_if(self, scan, match): 873 if self._save_scan_dir and self._run_name: 874 with open( 875 os.path.join(self._save_scan_dir, f"{self._run_name}.txt"), 876 "w", 877 encoding="utf-8", 878 ) as f: 879 f.write(scan) 880 if self._save_match_dir and self._run_name: 881 with open( 882 os.path.join(self._save_match_dir, f"{self._run_name}.txt"), 883 "w", 884 encoding="utf-8", 885 ) as f: 886 f.write(match) 887 888 def __str__(self): 889 return f""" 890 path: {self.scanner.path if self.scanner else None} 891 identity: {self.identity} 892 parsers: [scanner=Ply, matcher=Lark, print=Lark] 893 from_line: {self.scanner.from_line if self.scanner else None} 894 to_line: {self.scanner.to_line if self.scanner else None} 895 all_lines: {self.scanner.all_lines if self.scanner else None} 896 these: {self.scanner.these if self.scanner else None} 897 matcher: {self.matcher} 898 variables: {len(self.variables)} 899 metadata: {len(self.metadata)} 900 """ 901 902 @property 903 def is_valid(self) -> bool: # pragma: no cover 904 """Csvpaths can flag a CSV file as 
invalid using the fail() function""" 905 return self._is_valid 906 907 @is_valid.setter 908 def is_valid(self, tf: bool) -> None: 909 self._is_valid = tf 910 911 @property 912 def completed(self) -> bool: 913 if not self.scanner or not self.line_monitor: 914 return False 915 if self.scanner.is_last(self.line_monitor.physical_line_number): 916 return True 917 return False 918 919 @property 920 def from_line(self): # pragma: no cover pylint: disable=C0116 921 """@private""" 922 if self.scanner is None: 923 raise ParsingException("No scanner available. Have you parsed a csvpath?") 924 return self.scanner.from_line 925 926 @property 927 def to_line(self): # pragma: no cover pylint: disable=C0116 928 """@private""" 929 if self.scanner is None: 930 raise ParsingException("No scanner available. Have you parsed a csvpath?") 931 return self.scanner.to_line 932 933 @property 934 def all_lines(self): # pragma: no cover pylint: disable=C0116 935 """@private""" 936 if self.scanner is None: 937 raise ParsingException("No scanner available. Have you parsed a csvpath?") 938 return self.scanner.all_lines 939 940 @property 941 def path(self): # pragma: no cover pylint: disable=C0116 942 """@private""" 943 if self.scanner is None: 944 raise ParsingException("No scanner available. Have you parsed a csvpath?") 945 return self.scanner.path 946 947 @property 948 def these(self): # pragma: no cover pylint: disable=C0116 949 """@private""" 950 if self.scanner is None: 951 raise ParsingException("No scanner available. Have you parsed a csvpath?") 952 return self.scanner.these 953 954 @property 955 def limit_collection_to(self) -> List[int]: 956 """@private 957 returns the list of headers to collect when a line matches. by default 958 this list is empty and all headers are collected. 959 """ 960 return self._limit_collection_to 961 962 @limit_collection_to.setter 963 def limit_collection_to(self, indexes: List[int]) -> None: 964 """@private""" 965 self._limit_collection_to = indexes 966 self.logger.warning("Setting a limit on headers collected: %s", indexes) 967 968 def stop(self) -> None: 969 """@private""" 970 self.stopped = True 971 972 # 973 # 974 # collect(), fast_forward(), and next() are the central methods of CsvPath. 975 # 976 # remember that since next() is a public method and collect() and fast_forward() 977 # rely on it, we have to cut off exceptions at next(). users will not see 978 # fast_forward and collect in the stack trace. probably that's not a big deal. 979 # if it seems confusing we can work around the problem. 980 # 981 def collect( 982 self, csvpath: str = None, *, nexts: int = -1, lines=None 983 ) -> List[List[Any]] | LineSpooler: 984 """Runs the csvpath forward and returns the matching lines seen as 985 a list of lists. this method does not holds lines locally, not as 986 accessible attributes. lines are not kept after the run completes 987 and the collected lines are returned. 988 989 the optional lines argument may be an instance of any class that has an append(obj) 990 method. if lines is None, a list is returned. 991 """ 992 if self.scanner is None and csvpath is not None: 993 self.parse(csvpath) 994 if nexts < -1: 995 raise ProcessingException( 996 "Input must be >= -1. -1 means collect to the end of the file." 997 ) 998 self.collecting = True 999 # 1000 # we're going to use the passed in lines object. if it exists 1001 # it is a LineSpooler (we presume). we'll associate it with the 1002 # csvpath temporarily so anyone interested can get it. 
if it 1003 # doesn't exist we create a list for the lines var. we create a 1004 # list LinesSpooler to handle any inquiries during the run we 1005 # do all our line collecting busness with the local lines var 1006 # which has an append method regardless of if it is list or 1007 # LineSpooler. when we're done we break the link with the csvpath 1008 # and return the local variable. if it is a list it is going to 1009 # a csvpath user. if it is a LineSpooler it is going to a 1010 # CsvPaths instance. 1011 # 1012 self.lines = lines 1013 if lines is None: 1014 lines = [] 1015 self.lines = ListLineSpooler(lines=lines) 1016 for _ in self.next(): 1017 _ = _[:] 1018 self.lines.append(_) 1019 if nexts == -1: 1020 continue 1021 if nexts > 1: 1022 nexts -= 1 1023 else: 1024 break 1025 # we don't want to hold on to data more than needed. but 1026 # we do want to return data if we're not spooling. the 1027 # way we do that is to keep the local var available with the 1028 # list and/or the spooler. the caller needs to be aware of 1029 # both possibilities, but both offer __len__ and append. 1030 # 1031 # we keep the self.lines if it is not a list because that 1032 # makes it available to the runtime data collector so we can 1033 # see the line count in the metadata, saving opening a 1034 # potentially large data.csv to find out how many lines. 1035 if isinstance(self.lines, list): 1036 self.lines = None 1037 return lines 1038 1039 def fast_forward(self, csvpath=None): 1040 """Scans to the end of the CSV file. All scanned rows will be 1041 considered for match and variables and side effects will happen, 1042 but no rows will be returned or stored. -1 means to the end of 1043 the file. If you do not pass the csvpath string here you must first 1044 use the parse method.""" 1045 if self.scanner is None and csvpath is not None: 1046 self.parse(csvpath) 1047 for _ in self.next(): 1048 pass 1049 return self 1050 1051 # 1052 # dont_raise is for the use of fast_forward and collect who 1053 # may want to handle raising errors themselves. 1054 # 1055 def next(self, csvpath=None): 1056 """A generator function that steps through the CSV file returning 1057 matching rows.""" 1058 try: 1059 if self.scanner is None and csvpath is not None: 1060 self.parse(csvpath) 1061 start = time.time() 1062 if self.will_run is True: 1063 for line in self._next_line(): 1064 b = self._consider_line(line) 1065 if b: 1066 line = self.limit_collection(line) 1067 if line is None: 1068 msg = "Line cannot be None" 1069 self.logger.error(msg) 1070 raise MatchException(msg) 1071 if len(line) == 0: 1072 msg = "Line cannot be len() == 0" 1073 self.logger.error(msg) 1074 raise MatchException(msg) 1075 yield line 1076 elif self.collecting and self.unmatched_available: 1077 if self.unmatched is None: 1078 self.unmatched = [] 1079 line = self.limit_collection(line) 1080 # we aren't None and 0 checking as above. needed? 
1081 self.unmatched.append(line) 1082 if self.stopped: 1083 self.logger.info( 1084 "CsvPath has been stopped at line %s", 1085 self.line_monitor.physical_line_number, 1086 ) 1087 break 1088 else: 1089 self.logger.warning( 1090 "Csvpath identified as {self.identity} is disabled by run-mode:no-run" 1091 ) 1092 self.finalize() 1093 end = time.time() 1094 self.total_iteration_time = end - start 1095 self.logger.info("Run against %s is complete.", self.scanner.filename) 1096 self.logger.info( 1097 "Iteration time was %s", round(self.total_iteration_time, 2) 1098 ) 1099 self.logger.info( 1100 "%s per line", 1101 round( 1102 self.total_iteration_time 1103 / self.line_monitor.physical_end_line_count, 1104 2, 1105 ), 1106 ) 1107 except Exception as e: 1108 if not self.ecoms.do_i_quiet(): 1109 self.logger.error(e, exc_info=True) 1110 if self.ecoms.do_i_raise(): 1111 raise 1112 1113 def _next_line(self) -> List[Any]: 1114 """@private""" 1115 self.logger.info("beginning to scan file: %s", self.scanner.filename) 1116 # 1117 # this exception will blow up a standalone CsvPath but should be 1118 # caught and handled if there is a CsvPaths. 1119 # 1120 # but when would it happen? shouldn't we just let Python's exception 1121 # handle it should it really occur? 1122 # 1123 if self.scanner.filename is None: 1124 raise FileException("There is no filename") 1125 # 1126 # DataFileReader is abstract. instantiating it results in a concrete subclass. 1127 # pylint doesn't like that just because it doesn't see what we're doing. 1128 # otoh, is this a bad way to do it? not sure but it works fine. 1129 # 1130 reader = DataFileReader( # pylint: disable=E0110 1131 self.scanner.filename, delimiter=self.delimiter, quotechar=self.quotechar 1132 ) 1133 for line in reader.next(): 1134 self.track_line(line=line) 1135 yield line 1136 self.finalize() 1137 1138 def finalize(self) -> None: 1139 """@private 1140 clears caches, etc. this is an internal method, but not _ because 1141 it is part of the lifecycle and we might find a reason to call it 1142 from higher up. 1143 """ 1144 # this method can run multiple times w/np, but that 1145 # shouldn't happen anyway. 1146 self._freeze_path = True 1147 if self.matcher: 1148 self.matcher.clear_caches() 1149 1150 def track_line(self, line) -> None: 1151 """@private 1152 csvpaths needs to handle some of the iteration logic, and we don't want 1153 to lose track of line number monitoring or repeat the code up there, 1154 so we need this method to give csvpaths a way to tap in. 1155 """ 1156 last_line = None 1157 if self.matcher: 1158 last_line = self.matcher.line 1159 self.line_monitor.next_line(last_line=last_line, data=line) 1160 if self.line_monitor.physical_line_number == 0: 1161 self._run_started_at = datetime.now(timezone.utc) 1162 1163 def _consider_line(self, line): # pylint: disable=R0912, R0911 1164 """@private""" 1165 # re: R0912: this method has already been refactored but maybe 1166 # there is more we can do? 1167 # 1168 # we always look at the last line so that last() has a 1169 # chance to run 1170 # 1171 # if we're empty, but last, we need to make sure the 1172 # matcher runs a final time so that any last() can run. 1173 # 1174 if self.line_monitor.is_last_line_and_blank(line): 1175 # if self.line_monitor.is_last_line_and_empty(line): 1176 self.logger.info("last line is empty. 
freezing, matching, returning false") 1177 self._freeze_path = True 1178 self.matches(line) 1179 return False 1180 if self.skip_blank_lines and len(line) == 0: 1181 self.logger.info( 1182 "Skipping line %s because blank", self.line_monitor.physical_line_number 1183 ) 1184 return False 1185 if self.scanner.includes(self.line_monitor.physical_line_number): 1186 self.logger.debug("Scanner includes line") 1187 self.scan_count = self.scan_count + 1 1188 matches = None 1189 self._current_match_count = self.match_count 1190 if self.advance_count > 0: 1191 self.advance_count -= 1 1192 matches = False 1193 self.logger.debug( 1194 "Advancing one line with {self.advance_count} more skips to go" 1195 ) 1196 else: 1197 self.logger.debug("Starting matching") 1198 startmatch = time.perf_counter_ns() 1199 matches = self.matches(line) 1200 endmatch = time.perf_counter_ns() 1201 t = (endmatch - startmatch) / 1000000 1202 self.last_row_time = t 1203 self.rows_time += t 1204 self.logger.debug( 1205 "CsvPath.matches:703: %s: matches: %s", self.identity, matches 1206 ) 1207 # 1208 # if we are done scanning we can stop 1209 # 1210 if self.scanner.is_last(self.line_monitor.physical_line_number): 1211 self.stop() 1212 if matches is True: 1213 # 1214 # _current_match_count is a placeholder that 1215 # allows anyone to call a match early and update 1216 # the count. this is important when there is 1217 # an onmatch component that needs to use the 1218 # match_count. e.g. an onmatch print statement. 1219 # we would want the onmatch to propagate asap. we 1220 # can accept that there could be a variable set to 1221 # match count prior to the onmatch upping the 1222 # count. that wouldn't be great for explainability, 1223 # but order is important -- match components 1224 # impact each other left to right, top to bottom. 1225 # 1226 self.raise_match_count_if() 1227 if self.collect_when_not_matched: 1228 return False 1229 return True 1230 if self.collect_when_not_matched: 1231 return True 1232 return False 1233 return False 1234 1235 def raise_match_count_if(self): 1236 """@private 1237 if the match count has already been raised earlier in the matching 1238 process than the caller we don't raise it; otherwise, we raise.""" 1239 if self._current_match_count == self.match_count: 1240 self.match_count += 1 1241 else: 1242 self.logger.debug("Match count was already raised, so not doing it again") 1243 1244 def limit_collection(self, line: List[Any]) -> List[Any]: 1245 """@private 1246 this method creates a line based on the given line that holds only the headers 1247 that the csvpath says to collect. headers for collection are indicated using 1248 the collect() function. 1249 """ 1250 if len(self.limit_collection_to) == 0: 1251 return line 1252 ls = [] 1253 for k in self.limit_collection_to: 1254 if k is None or k >= len(line): 1255 raise InputException( 1256 f"[{self.identity}] Line {self.line_monitor.physical_line_number}: unknown header name: {k}" 1257 ) 1258 ls.append(line[k]) 1259 return ls 1260 1261 def advance(self, ff: int = -1) -> None: 1262 """@private 1263 Advances the iteration by ff rows. 
The rows will be seen but not matched.""" 1264 if ff is None: 1265 raise InputException("Input to advance must not be None") 1266 if self.line_monitor.physical_end_line_number is None: 1267 raise ProcessingException( 1268 "The last line number must be known (physical_end_line_number)" 1269 ) 1270 if ff == -1: 1271 a = self.advance_count 1272 a = ( 1273 self.line_monitor.physical_end_line_number 1274 - self.line_monitor.physical_line_number 1275 - a 1276 ) 1277 self.advance_count = a 1278 else: 1279 self.advance_count += ff 1280 self.advance_count = min( 1281 self.advance_count, self.line_monitor.physical_end_line_number 1282 ) 1283 1284 def get_total_lines(self) -> int: # pylint: disable=C0116 1285 """@private""" 1286 if ( 1287 self.line_monitor.physical_end_line_number is None 1288 or self.line_monitor.physical_end_line_number == 0 1289 ): 1290 self.get_total_lines_and_headers() 1291 return self.line_monitor.physical_end_line_number 1292 1293 def get_total_lines_and_headers(self) -> None: # pylint: disable=C0116 1294 """@private""" 1295 if not self.scanner or not self.scanner.filename: 1296 self.logger.error( 1297 "Csvpath identified as %s has no filename. Since we could be error handling an exception is not raised.", 1298 self.identity, 1299 ) 1300 # exp 1301 return 1302 # end exp 1303 # 1304 # there are times, e.g. when using Lambda, when it may be better to 1305 # not use a cache. in the case of Lambda the reason is to avoid working 1306 # around the read-only filesystem. 1307 # 1308 use_cache = self.csvpaths is not None 1309 if use_cache: 1310 uc = self.csvpaths.config.get(section="cache", name="use_cache") 1311 use_cache = uc is None or uc.strip().lower() != "no" 1312 if self.csvpaths and use_cache is True: 1313 self.line_monitor = self.csvpaths.file_manager.cacher.get_new_line_monitor( 1314 self.scanner.filename 1315 ) 1316 self.headers = self.csvpaths.file_manager.cacher.get_original_headers( 1317 self.scanner.filename 1318 ) 1319 else: 1320 lc = LineCounter(self) 1321 lm, headers = lc.get_lines_and_headers(self.scanner.filename) 1322 self.line_monitor = lm 1323 self.headers = headers 1324 1325 @property 1326 def current_scan_count(self) -> int: # pylint: disable=C0116 1327 """@private""" 1328 return self.scan_count 1329 1330 @property 1331 def current_match_count(self) -> int: # pylint: disable=C0116 1332 """@private""" 1333 return self.match_count 1334 1335 def matches(self, line) -> bool: # pylint: disable=C0116 1336 """@private""" 1337 if not self.match: 1338 return True 1339 # 1340 # when we first consider a line we don't have a matcher. we build 1341 # it on the fly. later, we just reset the matcher for the new lines. 1342 # 1343 # when we originally call parse we're just parsing for the scanner: 1344 # 1345 # path = CsvPath() 1346 # path.parse ("$file[*][yes()]") 1347 # path.fast_forward() 1348 # 1349 # "find_file" would be a more intuitive method name. we don't create 1350 # the path's matcher until the 3rd line. by then we're on the 3rd parser 1351 # and 4 parse. 1352 # 1353 if self.matcher is None: 1354 h = hashlib.sha256(self.match.encode("utf-8")).hexdigest() 1355 self.logger.info("Loading matcher with data. match part hash: %s", h) 1356 self.matcher = Matcher( 1357 csvpath=self, data=self.match, line=line, headers=self.headers, myid=h 1358 ) 1359 self.matcher.AND = self.AND 1360 # 1361 # we need to register all the Expressions as error listeners. not 1362 # sure it matters if we do it here or allow the Matcher to do it. 
1363 # since the Matcher is responsible for its Expressions, has a handle 1364 # this CsvPath, and through it has the error_manager let's let it 1365 # register the expressions. 1366 # 1367 else: 1368 self.logger.debug("Resetting and reloading matcher") 1369 self.matcher.reset() 1370 self.matcher.line = line 1371 matched = self.matcher.matches() 1372 return matched 1373 1374 def set_variable(self, name: str, *, value: Any, tracking: Any = None) -> None: 1375 """@private 1376 sets a variable and the tracking variable as a key within 1377 it, if a tracking value is provided.""" 1378 if self._freeze_path: 1379 self.logger.warning( 1380 "Run is ending, variables are frozen. Cannot set %s to %s.", name, value 1381 ) 1382 return 1383 if not name: 1384 raise VariableException( 1385 f"Name cannot be None: name: {name}, tracking: {tracking}, value: {value}" 1386 ) 1387 if name.strip() == "": 1388 raise VariableException( 1389 f"""Name cannot be the empty string: 1390 name: {name}, tracking: {tracking}, value: {value}""" 1391 ) 1392 if tracking is not None and f"{tracking}".strip() == "": 1393 raise VariableException( 1394 f"""Tracking value cannot be empty. 1395 name: {name}, tracking: {tracking}, value: {value}""" 1396 ) 1397 if tracking is not None: 1398 if name not in self.variables: 1399 self.variables[name] = {} 1400 instances = self.variables[name] 1401 instances[tracking] = value 1402 else: 1403 self.variables[name] = value 1404 1405 def get_variable( # pylint: disable=R0912 1406 self, name: str, *, tracking: Any = None, set_if_none: Any = None 1407 ) -> Any: 1408 """@private 1409 gets a variable by name. uses the tracking value as a key to get 1410 the value if the variable is a dictionary.""" 1411 # 1412 # re: R0912: totally true. this is a scary method. plan to refactor. 
1413 # 1414 if not name: 1415 raise VariableException("Name cannot be None") 1416 if self._freeze_path: 1417 # 1418 # run is ending, no more changes 1419 # 1420 set_if_none = None 1421 thevalue = None 1422 if tracking is not None: 1423 thedict = None 1424 thevalue = None 1425 if name in self.variables: 1426 thedict = self.variables[name] 1427 if not thedict: 1428 thedict = {} 1429 self.variables[name] = thedict 1430 thedict[tracking] = set_if_none 1431 else: 1432 thedict = {} 1433 thedict[tracking] = set_if_none 1434 self.variables[name] = thedict 1435 if isinstance(thedict, dict): 1436 thevalue = thedict.get(tracking) 1437 if not thevalue and set_if_none is not None: 1438 thedict[tracking] = set_if_none 1439 thevalue = set_if_none 1440 else: 1441 if name not in self.variables: 1442 if set_if_none is not None: 1443 self.variables[name] = set_if_none 1444 thevalue = set_if_none 1445 else: 1446 thevalue = self.variables[name] 1447 if self._freeze_path: 1448 if isinstance(thevalue, list): 1449 # 1450 # run is ending, no more changes 1451 # 1452 thevalue = tuple(thevalue[:]) 1453 self.logger.debug( 1454 "Returning %s for frozen variable %s.%s", thevalue, name, tracking 1455 ) 1456 return thevalue 1457 1458 def line_numbers(self) -> Iterator[int | str]: 1459 """@private 1460 returns all the line numbers the scanner will scan during 1461 the run of a csvpath""" 1462 these = self.scanner.these 1463 from_line = self.scanner.from_line 1464 to_line = self.scanner.to_line 1465 all_lines = self.scanner.all_lines 1466 return self._line_numbers( 1467 these=these, from_line=from_line, to_line=to_line, all_lines=all_lines 1468 ) 1469 1470 def _line_numbers( 1471 self, 1472 *, 1473 these: List[int] = None, 1474 from_line: int = None, 1475 to_line: int = None, 1476 all_lines: bool = None, 1477 ) -> Iterator[int | str]: 1478 """@private""" 1479 if these is None: 1480 these = [] 1481 if len(these) > 0: 1482 yield from these 1483 else: 1484 if from_line is not None and to_line is not None and from_line > to_line: 1485 yield from range(to_line, from_line + 1) 1486 elif from_line is not None and to_line is not None: 1487 yield from range(from_line, to_line + 1) 1488 elif from_line is not None: 1489 if all_lines: 1490 yield f"{from_line}..." 1491 else: 1492 yield from_line 1493 elif to_line is not None: 1494 yield f"0..{to_line}" 1495 1496 def collect_line_numbers(self) -> List[int | str]: # pylint: disable=C0116 1497 """@private""" 1498 if self.scanner is None: 1499 raise ParsingException("No scanner available. Have you parsed a csvpath?") 1500 these = self.scanner.these 1501 from_line = self.scanner.from_line 1502 to_line = self.scanner.to_line 1503 all_lines = self.scanner.all_lines 1504 return self._collect_line_numbers( 1505 these=these, from_line=from_line, to_line=to_line, all_lines=all_lines 1506 ) 1507 1508 def _collect_line_numbers( 1509 self, 1510 *, 1511 these: List[int] = None, 1512 from_line: int = None, 1513 to_line: int = None, 1514 all_lines: bool = None, 1515 ) -> List[int | str]: 1516 """@private""" 1517 collect = [] 1518 if these is None: 1519 these = [] 1520 for i in self._line_numbers( 1521 these=these, from_line=from_line, to_line=to_line, all_lines=all_lines 1522 ): 1523 collect.append(i) 1524 return collect 1525 1526 def header_index(self, name: str) -> int: # pylint: disable=C0116 1527 """@private""" 1528 if not self.headers: 1529 return None 1530 for i, n in enumerate(self.headers): 1531 if n == name: 1532 return i 1533 return None
CsvPath represents a csvpath string that contains a reference to a file, scanning instructions, and rules for matching lines.
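For example, a complete csvpath string combines those three parts. The breakdown below mirrors the examples in the source comments above; the file name is illustrative:

```python
# $test.csv   -- the reference to a file
# [*]         -- the scanning instructions (scan every line)
# [yes()]     -- the matching rules (match every scanned line)
csvpath_str = "$test.csv[*][yes()]"
```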
All constructor arguments are keyword-only and have sensible defaults:

```python
def __init__(
    self,
    *,
    csvpaths=None,
    delimiter=",",
    quotechar='"',
    skip_blank_lines=True,
    print_default=True,
    config=None,
    error_manager=None,
):
```
417 @property 418 def identity(self) -> str: 419 """returns id or name if found in metadata. 420 421 the id or name gets into metadata primarily if found 422 in an "external" comment in the csvpath. "external" 423 meaning outside the []s. comments are keyword:comment. 424 we take id, Id, ID and name, Name, NAME. 425 426 id is preferred over name. E.g. in: 427 ~ name: my path description: an example id: this value wins ~ 428 the id becomes the identity of the instance. 429 430 we prefer in this order: all-lower most, Initial-caps, 431 ALL-CAPS least 432 433 the ordering is relied on in Result and possibly 434 elsewhere. 435 """ 436 ret = None 437 if not self.metadata: 438 ret = "" 439 if "NAME" in self.metadata: 440 ret = self.metadata["NAME"] 441 if "Name" in self.metadata: 442 ret = self.metadata["Name"] 443 if "name" in self.metadata: 444 ret = self.metadata["name"] 445 if "ID" in self.metadata: 446 ret = self.metadata["ID"] 447 if "Id" in self.metadata: 448 ret = self.metadata["Id"] 449 if "id" in self.metadata: 450 ret = self.metadata["id"] 451 return ret
Returns the id or name if found in metadata.
The id or name gets into metadata primarily when it appears in an "external" comment in the csvpath, "external" meaning outside the []s. Comments hold keyword: value pairs. We accept id, Id, ID and name, Name, NAME.
id is preferred over name. E.g. in ~ name: my path description: an example id: this value wins ~ the id becomes the identity of the instance.
Within each keyword we prefer, in this order: all-lowercase most, Initial-caps next, ALL-CAPS least.
This ordering is relied on in Result and possibly elsewhere.
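As a rough illustration of that precedence (a sketch only: the file name test.csv and the always-true yes() rule are placeholders, and it assumes parse() extracts the outer comment the same way managed runs do):

    from csvpath import CsvPath

    path = CsvPath()
    # the outer comment, outside the []s, carries both a name and an id;
    # per the precedence above, the id wins.
    path.parse("~ name: my path id: orders-check ~ $test.csv[*][yes()]")
    print(path.identity)  # expected: orders-check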
902 @property 903 def is_valid(self) -> bool: # pragma: no cover 904 """Csvpaths can flag a CSV file as invalid using the fail() function""" 905 return self._is_valid
A csvpath can flag a CSV file as invalid using the fail() function.
981 def collect( 982 self, csvpath: str = None, *, nexts: int = -1, lines=None 983 ) -> List[List[Any]] | LineSpooler: 984 """Runs the csvpath forward and returns the matching lines seen as 985 a list of lists. this method does not holds lines locally, not as 986 accessible attributes. lines are not kept after the run completes 987 and the collected lines are returned. 988 989 the optional lines argument may be an instance of any class that has an append(obj) 990 method. if lines is None, a list is returned. 991 """ 992 if self.scanner is None and csvpath is not None: 993 self.parse(csvpath) 994 if nexts < -1: 995 raise ProcessingException( 996 "Input must be >= -1. -1 means collect to the end of the file." 997 ) 998 self.collecting = True 999 # 1000 # we're going to use the passed in lines object. if it exists 1001 # it is a LineSpooler (we presume). we'll associate it with the 1002 # csvpath temporarily so anyone interested can get it. if it 1003 # doesn't exist we create a list for the lines var. we create a 1004 # list LinesSpooler to handle any inquiries during the run we 1005 # do all our line collecting busness with the local lines var 1006 # which has an append method regardless of if it is list or 1007 # LineSpooler. when we're done we break the link with the csvpath 1008 # and return the local variable. if it is a list it is going to 1009 # a csvpath user. if it is a LineSpooler it is going to a 1010 # CsvPaths instance. 1011 # 1012 self.lines = lines 1013 if lines is None: 1014 lines = [] 1015 self.lines = ListLineSpooler(lines=lines) 1016 for _ in self.next(): 1017 _ = _[:] 1018 self.lines.append(_) 1019 if nexts == -1: 1020 continue 1021 if nexts > 1: 1022 nexts -= 1 1023 else: 1024 break 1025 # we don't want to hold on to data more than needed. but 1026 # we do want to return data if we're not spooling. the 1027 # way we do that is to keep the local var available with the 1028 # list and/or the spooler. the caller needs to be aware of 1029 # both possibilities, but both offer __len__ and append. 1030 # 1031 # we keep the self.lines if it is not a list because that 1032 # makes it available to the runtime data collector so we can 1033 # see the line count in the metadata, saving opening a 1034 # potentially large data.csv to find out how many lines. 1035 if isinstance(self.lines, list): 1036 self.lines = None 1037 return lines
Runs the csvpath forward and returns the matching lines as a list of lists. This method does not hold lines locally as accessible attributes; lines are not kept after the run completes and the collected lines are returned.
The optional lines argument may be an instance of any class that has an append(obj) method. If lines is None, a plain list is returned.
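For example, a minimal direct use of collect() might look like the sketch below (test.csv and the always-true yes() rule are placeholder values):

    from csvpath import CsvPath

    path = CsvPath()
    # collect every matching line of test.csv as a list of lists;
    # yes() matches all lines, so this returns every data line.
    lines = path.collect("$test.csv[*][yes()]")
    print(len(lines))

    # nexts limits how many matching lines are collected; the default
    # of -1 means collect to the end of the file.
    first_ten = CsvPath().collect("$test.csv[*][yes()]", nexts=10)

The lines argument can likewise be given any append()-capable object when you want matches spooled somewhere other than a plain list.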
1039 def fast_forward(self, csvpath=None): 1040 """Scans to the end of the CSV file. All scanned rows will be 1041 considered for match and variables and side effects will happen, 1042 but no rows will be returned or stored. -1 means to the end of 1043 the file. If you do not pass the csvpath string here you must first 1044 use the parse method.""" 1045 if self.scanner is None and csvpath is not None: 1046 self.parse(csvpath) 1047 for _ in self.next(): 1048 pass 1049 return self
Scans to the end of the CSV file. All scanned rows will be considered for matching; variables will be set and side effects will happen, but no rows will be returned or stored. If you do not pass the csvpath string here you must first use the parse method.
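A sketch of running a csvpath purely for its side effects (placeholder file and rule again):

    from csvpath import CsvPath

    path = CsvPath()
    # run the whole file without keeping any lines; variables, validity,
    # and print output are still produced as side effects.
    path.fast_forward("$test.csv[*][yes()]")
    print(path.is_valid)   # False if the csvpath called fail()
    print(path.variables)  # any variables set during the run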
1055 def next(self, csvpath=None): 1056 """A generator function that steps through the CSV file returning 1057 matching rows.""" 1058 try: 1059 if self.scanner is None and csvpath is not None: 1060 self.parse(csvpath) 1061 start = time.time() 1062 if self.will_run is True: 1063 for line in self._next_line(): 1064 b = self._consider_line(line) 1065 if b: 1066 line = self.limit_collection(line) 1067 if line is None: 1068 msg = "Line cannot be None" 1069 self.logger.error(msg) 1070 raise MatchException(msg) 1071 if len(line) == 0: 1072 msg = "Line cannot be len() == 0" 1073 self.logger.error(msg) 1074 raise MatchException(msg) 1075 yield line 1076 elif self.collecting and self.unmatched_available: 1077 if self.unmatched is None: 1078 self.unmatched = [] 1079 line = self.limit_collection(line) 1080 # we aren't None and 0 checking as above. needed? 1081 self.unmatched.append(line) 1082 if self.stopped: 1083 self.logger.info( 1084 "CsvPath has been stopped at line %s", 1085 self.line_monitor.physical_line_number, 1086 ) 1087 break 1088 else: 1089 self.logger.warning( 1090 "Csvpath identified as {self.identity} is disabled by run-mode:no-run" 1091 ) 1092 self.finalize() 1093 end = time.time() 1094 self.total_iteration_time = end - start 1095 self.logger.info("Run against %s is complete.", self.scanner.filename) 1096 self.logger.info( 1097 "Iteration time was %s", round(self.total_iteration_time, 2) 1098 ) 1099 self.logger.info( 1100 "%s per line", 1101 round( 1102 self.total_iteration_time 1103 / self.line_monitor.physical_end_line_count, 1104 2, 1105 ), 1106 ) 1107 except Exception as e: 1108 if not self.ecoms.do_i_quiet(): 1109 self.logger.error(e, exc_info=True) 1110 if self.ecoms.do_i_raise(): 1111 raise
A generator function that steps through the CSV file returning matching rows.
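Because next() is a generator, a run can be ended early simply by breaking out of the loop. A minimal sketch, with the same placeholder file and rule:

    from csvpath import CsvPath

    path = CsvPath()
    # each yielded line is a matching row, as a list of values
    for line in path.next("$test.csv[*][yes()]"):
        print(line)

Unlike collect(), nothing is stored on the instance; each matching line is seen exactly once as it streams past.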
59class CsvPaths(CsvPathsCoordinator, ErrorCollector): 60 """ 61 a CsvPaths instance manages applying any number of csvpaths 62 to any number of files. CsvPaths applies sets of csvpaths 63 to a given file, on demand. Think of CsvPaths as a session 64 object. It gives you a way to manage files, csvpaths, and 65 the results generated by applying paths to files. It is not 66 intended for concurrent use. If you need multiple threads, 67 create multiple CsvPaths instances. 68 """ 69 70 # pylint: disable=too-many-instance-attributes 71 72 def __init__( 73 self, 74 *, 75 delimiter=",", 76 quotechar='"', 77 skip_blank_lines=True, 78 print_default=True, 79 config: Config = None, 80 ): 81 self._config = Config() if config is None else config 82 # 83 # managers centralize activities, offer async potential, and 84 # are where integrations hook in. ErrorManager functionality 85 # must be available in CsvPath too. The others are CsvPaths 86 # only. 87 # 88 self._paths_manager = None 89 self._file_manager = None 90 self._results_manager = None 91 self._ecoms = None 92 self._error_manager = None 93 self._set_managers() 94 # 95 # TODO: 96 # self.print_manager = ... <<<=== should we do this? 97 # 98 # 99 self.print_default = print_default 100 """ @private """ 101 self.delimiter = delimiter 102 """ @private """ 103 self.quotechar = quotechar 104 """ @private """ 105 self.skip_blank_lines = skip_blank_lines 106 """ @private """ 107 self.current_matcher: CsvPath = None 108 """ @private """ 109 self.logger = LogUtility.logger(self) 110 """ @private """ 111 self._errors = [] 112 # coordinator attributes 113 self._stop_all = False 114 self._fail_all = False 115 self._skip_all = False 116 self._advance_all = 0 117 self._current_run_time = None 118 self._run_time_str = None 119 self.named_paths_name = None 120 """ @private """ 121 self.named_file_name = None 122 """ @private """ 123 # 124 # metrics is for OTLP OpenTelemetry. it should only 125 # be used by the OTLP listener. it is here because 126 # the integration may need a long-lived presence. if 127 # needed, the first OTLP listener will set it up 128 # before spinning up a thread. any other OTLP 129 # listener threads that need to use a long-lived metric 130 # will work with this property. 
131 # 132 self.metrics = None 133 """ @private """ 134 self.logger.info("initialized CsvPaths") 135 136 def _set_managers(self) -> None: 137 self.paths_manager = PathsManager(csvpaths=self) 138 self.file_manager = FileManager(csvpaths=self) 139 self.results_manager = ResultsManager(csvpaths=self) 140 self.ecoms = ErrorCommunications(csvpaths=self) 141 self.error_manager = ErrorManager(csvpaths=self) 142 143 @property 144 def ecoms(self) -> ErrorCommunications: 145 """@private""" 146 return self._ecoms 147 148 @ecoms.setter 149 def ecoms(self, ec: ErrorCommunications) -> None: 150 """@private""" 151 self._ecoms = ec 152 153 @property 154 def file_manager(self) -> FileManager: 155 return self._file_manager 156 157 @file_manager.setter 158 def file_manager(self, m: FileManager) -> None: 159 self._file_manager = m 160 161 @property 162 def results_manager(self) -> ResultsManager: 163 return self._results_manager 164 165 @results_manager.setter 166 def results_manager(self, m: ResultsManager) -> None: 167 self._results_manager = m 168 169 @property 170 def paths_manager(self) -> PathsManager: 171 return self._paths_manager 172 173 @paths_manager.setter 174 def paths_manager(self, m: PathsManager) -> None: 175 self._paths_manager = m 176 177 @property 178 def error_manager(self) -> ErrorManager: 179 return self._error_manager 180 181 @error_manager.setter 182 def error_manager(self, em: ErrorManager) -> None: 183 if em.csvpaths is None: 184 raise Exception("CsvPaths cannot be None") 185 self._error_manager = em 186 187 def run_time_str(self, pathsname=None) -> str: 188 """@private 189 adds the stringified current run time to the named-paths 190 group home_dir to create the run_dir""" 191 if self._run_time_str is None and pathsname is None: 192 raise CsvPathsException( 193 "Cannot have None in both run_time_str and pathsname" 194 ) 195 if self._run_time_str is None: 196 self._run_time_str = self.results_manager.get_run_time_str( 197 pathsname, self.current_run_time 198 ) 199 return self._run_time_str 200 201 @property 202 def current_run_time(self) -> datetime: 203 """@private 204 gets the time marking the start of the run. used to create the run home directory.""" 205 if self._current_run_time is None: 206 self._current_run_time = datetime.now(timezone.utc) 207 return self._current_run_time 208 209 def clear_run_coordination(self) -> None: 210 """@private 211 run coordination is the set of signals that csvpaths send to affect 212 one another through the CsvPaths instance""" 213 self._stop_all = False 214 self._fail_all = False 215 self._skip_all = False 216 self._advance_all = 0 217 self._current_run_time = None 218 self._run_time_str = None 219 self.logger.debug("Cleared run coordination") 220 221 def csvpath(self) -> CsvPath: 222 """Gets a CsvPath object primed with a reference to this CsvPaths""" 223 path = CsvPath( 224 csvpaths=self, 225 delimiter=self.delimiter, 226 quotechar=self.quotechar, 227 skip_blank_lines=self.skip_blank_lines, 228 # 229 # in the usual case we don't want csvpaths and its csvpath children 230 # to share the same config. sharing doesn't offer much. the flexibility 231 # of having separate configs is valuable. 
232 # 233 config=None, 234 print_default=self.print_default, 235 error_manager=self.error_manager, 236 ) 237 return path 238 239 def stop_all(self) -> None: # pragma: no cover 240 self._stop_all = True 241 242 def fail_all(self) -> None: # pragma: no cover 243 self._fail_all = True 244 245 def skip_all(self) -> None: # pragma: no cover 246 self._skip_all = True 247 248 def advance_all(self, lines: int) -> None: # pragma: no cover 249 self._advance_all = lines 250 251 @property 252 def errors(self) -> List[Error]: # pylint: disable=C0116 253 """@private 254 generally you should be looking at results_manager or error_manager for errors. 255 """ 256 return self._errors 257 258 def collect_error(self, error: Error) -> None: # pylint: disable=C0116 259 """@private""" 260 self._errors.append(error) 261 262 def has_errors(self) -> bool: # pylint: disable=C0116 263 """@private 264 generally you should be looking at results_manager or error_manager for errors. 265 """ 266 return len(self._errors) > 0 267 268 @property 269 def config(self) -> Config: # pylint: disable=C0116 270 """@private""" 271 if not self._config: 272 self._config = Config() # pragma: no cover 273 return self._config 274 275 # 276 # this is the preferred way to update config. it is preferred because 277 # csvpath and csvpaths work off the same config file, even though they, 278 # in some cases, have separate keys. if you update the config directly 279 # before a run starts using the CsvPaths's Config you have to remember 280 # to save and reload for it to effect both CsvPaths and CsvPath. this 281 # method does the save and reload every time. 282 # 283 def add_to_config(self, section, key, value) -> None: 284 """@private""" 285 self.config.add_to_config(section=section, key=key, value=value) 286 self.config.save_config() 287 self.config.reload() 288 self._set_managers() 289 290 def clean(self, *, paths) -> None: 291 """@private 292 at this time we do not recommend reusing CsvPaths, but it is doable 293 you should clean before reuse unless you want to accumulate results.""" 294 self.results_manager.clean_named_results(paths) 295 self.clear_run_coordination() 296 # self.error_manager.reset() 297 self._errors = [] 298 self.named_paths_name = None 299 self.named_file_name = None 300 301 def collect_paths(self, *, pathsname, filename) -> None: 302 """ 303 Sequentially does a CsvPath.collect() on filename for every named path. 
lines are collected into results.""" 304 paths = self.paths_manager.get_named_paths(pathsname) 305 if paths is None: 306 raise InputException(f"No named-paths found for {pathsname}") 307 if len(paths) == 0: 308 raise InputException(f"Named-paths group {pathsname} is empty") 309 if "" in paths: 310 raise InputException( 311 f"Named-paths group {pathsname} has one or more empty csvpaths" 312 ) 313 file = self.file_manager.get_named_file(filename) 314 if file is None: 315 raise InputException(f"No named-file found for {filename}") 316 self.logger.info("Prepping %s and %s", filename, pathsname) 317 self.clean(paths=pathsname) 318 self.logger.info( 319 "Beginning collect_paths %s with %s paths", pathsname, len(paths) 320 ) 321 crt = self.run_time_str(pathsname) 322 results = [] 323 # 324 # run starts here 325 # 326 self.results_manager.start_run( 327 run_dir=crt, pathsname=pathsname, filename=filename 328 ) 329 # 330 # 331 # 332 for i, path in enumerate(paths): 333 csvpath = self.csvpath() 334 if not csvpath.will_run: 335 continue 336 result = Result( 337 csvpath=csvpath, 338 file_name=filename, 339 paths_name=pathsname, 340 run_index=i, 341 run_time=self.current_run_time, 342 run_dir=crt, 343 ) 344 # casting a broad net because if "raise" not in the error policy we 345 # want to never fail during a run 346 try: 347 self._load_csvpath( 348 csvpath=csvpath, 349 path=path, 350 file=file, 351 pathsname=pathsname, 352 filename=filename, 353 crt=crt, 354 ) 355 # 356 # if run-mode: no-run we skip ahead without saving results 357 # 358 if not csvpath.will_run: 359 continue 360 # 361 # the add has to come after _load_csvpath because we need the identity or index 362 # to be stable and the identity is found in load, if it exists. 363 # 364 self.results_manager.add_named_result(result) 365 lines = result.lines 366 self.logger.debug("Collecting lines using a %s", type(lines)) 367 csvpath.collect(lines=lines) 368 if lines is None: 369 self.logger.error( # pragma: no cover 370 "Unexpected None for lines after collect_paths: file: %s, match: %s", 371 file, 372 csvpath.match, 373 ) 374 # 375 # this is obviously not a good idea for very large files! 
376 # 377 result.unmatched = csvpath.unmatched 378 except Exception as ex: # pylint: disable=W0718 379 # ex.trace = traceback.format_exc() 380 # ex.source = self 381 if self.error_manager.csvpaths is None: 382 raise Exception("ErrorManager's CsvPaths cannot be None") 383 self.error_manager.handle_error(source=self, msg=f"{ex}") 384 if self.ecoms.do_i_raise(): 385 self.results_manager.save(result) 386 raise 387 self.results_manager.save(result) 388 results.append(result) 389 # 390 # run ends here 391 # 392 self.results_manager.complete_run( 393 run_dir=crt, pathsname=pathsname, results=results 394 ) 395 # 396 # update/write run manifests here 397 # - validity (are all paths valid) 398 # - paths-completeness (did they all run and complete) 399 # - method (collect, fast_forward, next) 400 # - timestamp 401 # 402 self.clear_run_coordination() 403 self.logger.info( 404 "Completed collect_paths %s with %s paths", pathsname, len(paths) 405 ) 406 407 def _load_csvpath( 408 self, 409 *, 410 csvpath: CsvPath, 411 path: str, 412 file: str, 413 pathsname: str = None, 414 filename, 415 by_line: bool = False, 416 crt: str, 417 ) -> None: 418 """@private""" 419 # file is the physical file (+/- if preceding mode) filename is the named-file name 420 self.logger.debug("Beginning to load csvpath %s with file %s", path, file) 421 csvpath.named_paths_name = pathsname 422 self.named_paths_name = pathsname 423 csvpath.named_file_name = filename 424 self.named_file_name = filename 425 # 426 # by_line==True means we are starting a run that is breadth-first ultimately using 427 # next_by_line(). by_line runs cannot be source-mode preceding and have different 428 # semantics around csvpaths influencing one another. 429 # 430 # we strip comments from above the path so we need to extract them first 431 path = MetadataParser(self).extract_metadata(instance=csvpath, csvpath=path) 432 identity = csvpath.identity 433 self.logger.debug("Csvpath %s after metadata extract: %s", identity, path) 434 # update the settings using the metadata fields we just collected 435 csvpath.update_settings_from_metadata() 436 # 437 # if we have a reference, resolve it. we may not actually use the file 438 # if we're in source-mode: preceding, but that doesn't matter from the 439 # pov of the reference. 440 # 441 if filename.startswith("$"): 442 self.logger.debug( 443 "File name is a reference: %s. Replacing the path passed in with the reffed data file path.", 444 filename, 445 ) 446 file = self.results_manager.data_file_for_reference(filename, not_name=crt) 447 # 448 # 449 # 450 if csvpath.data_from_preceding is True: 451 if by_line is True: 452 raise CsvPathsException( 453 "Breadth-first runs do not support source-mode preceding because each line of data flows through each csvpath in order already" 454 ) 455 # 456 # we are in source-mode: preceding 457 # that means we ignore the original data file path and 458 # instead use the data.csv from the preceding csvpath. that is, 459 # assuming there is a preceding csvpath and it created and 460 # saved data. 461 # 462 # find the preceding csvpath in the named-paths 463 # 464 # we may be in a file reference like: $sourcemode.csvpaths.source2:from. if so 465 # we just need the named-paths name. 466 # 467 if pathsname.startswith("$"): 468 self.logger.debug( 469 "Named-paths name is a reference: %s. 
Stripping it down to just the actual named-paths name.", 470 pathsname, 471 ) 472 pathsname = pathsname[1 : pathsname.find(".")] 473 result = self.results_manager.get_last_named_result( 474 name=pathsname, before=csvpath.identity 475 ) 476 if result is not None: 477 # get its data.csv path for this present run 478 # swap in that path for the regular origin path 479 file = result.data_file_path 480 csvpath.metadata["source-mode-source"] = file 481 self.logger.info( 482 "Csvpath identified as %s uses last csvpath's data.csv at %s as source", 483 csvpath.identity, 484 file, 485 ) 486 else: 487 self.logger.warning( 488 "Cannot find a preceding data file to use for csvpath identified as %s running in source-mode", 489 csvpath.identity, 490 ) 491 f = path.find("[") 492 self.logger.debug("Csvpath matching part starts at char # %s", f) 493 apath = f"${file}{path[f:]}" 494 self.logger.info("Parsing csvpath %s", apath) 495 csvpath.parse(apath) 496 # 497 # ready to run. time to register the run. this is separate from 498 # the run_register.py (ResultsRegister) event 499 # 500 # crt = self.run_time_str(pathsname) 501 # fingerprint = self.file_manager.get_fingerprint_for_name(filename) 502 # 503 self.logger.debug("Done loading csvpath") 504 505 def fast_forward_paths(self, *, pathsname, filename): 506 """Sequentially does a CsvPath.fast_forward() on filename for every named path. No matches are collected.""" 507 paths = self.paths_manager.get_named_paths(pathsname) 508 file = self.file_manager.get_named_file(filename) 509 self.logger.info("Prepping %s and %s", filename, pathsname) 510 self.clean(paths=pathsname) 511 self.logger.info( 512 "Beginning FF %s with %s paths against file %s. No match results will be held.", 513 pathsname, 514 len(paths), 515 filename, 516 ) 517 crt = self.run_time_str(pathsname) 518 # 519 # run starts here 520 # 521 self.results_manager.start_run( 522 run_dir=crt, pathsname=pathsname, filename=filename 523 ) 524 # 525 # 526 # 527 results = [] 528 for i, path in enumerate(paths): 529 csvpath = self.csvpath() 530 self.logger.debug("Beginning to FF CsvPath instance: %s", csvpath) 531 result = Result( 532 csvpath=csvpath, 533 file_name=filename, 534 paths_name=pathsname, 535 run_index=i, 536 run_time=self.current_run_time, 537 run_dir=crt, 538 ) 539 try: 540 self._load_csvpath( 541 csvpath=csvpath, 542 path=path, 543 file=file, 544 pathsname=pathsname, 545 filename=filename, 546 crt=crt, 547 ) 548 # 549 # if run-mode: no-run we skip ahead without saving results 550 # 551 if not csvpath.will_run: 552 continue 553 # 554 # the add has to come after _load_csvpath because we need the identity or index 555 # to be stable and the identity is found in load, if it exists. 
556 # 557 self.results_manager.add_named_result(result) 558 self.logger.info( 559 "Parsed csvpath %s pointed at %s and starting to fast-forward", 560 i, 561 file, 562 ) 563 csvpath.fast_forward() 564 self.logger.info( 565 "Completed fast forward of csvpath %s against %s", i, file 566 ) 567 except Exception as ex: # pylint: disable=W0718 568 # ex.trace = traceback.format_exc() 569 # ex.source = self 570 self.error_manager.handle_error(source=self, msg=f"{ex}") 571 if self.ecoms.do_i_raise(): 572 self.results_manager.save(result) 573 raise 574 self.results_manager.save(result) 575 results.append(result) 576 # 577 # run ends here 578 # 579 self.results_manager.complete_run( 580 run_dir=crt, pathsname=pathsname, results=results 581 ) 582 self.clear_run_coordination() 583 self.logger.info( 584 "Completed fast_forward_paths %s with %s paths", pathsname, len(paths) 585 ) 586 587 def next_paths( 588 self, *, pathsname, filename, collect: bool = False 589 ): # pylint: disable=R0914 590 """Does a CsvPath.next() on filename for every line against every named path in sequence""" 591 paths = self.paths_manager.get_named_paths(pathsname) 592 file = self.file_manager.get_named_file(filename) 593 self.logger.info("Prepping %s and %s", filename, pathsname) 594 self.clean(paths=pathsname) 595 self.logger.info("Beginning next_paths with %s paths", len(paths)) 596 crt = self.run_time_str(pathsname) 597 # 598 # run starts here 599 # 600 self.results_manager.start_run( 601 run_dir=crt, pathsname=pathsname, filename=filename 602 ) 603 # 604 # 605 # 606 results = [] 607 for i, path in enumerate(paths): 608 if self._skip_all: 609 skip_err = "Found the skip-all signal set. skip_all() is" 610 skip_err = f"{skip_err} only for breadth-first runs using the" 611 skip_err = f"{skip_err} '_by_line' methods. It has the same" 612 skip_err = f"{skip_err} effect as skip() in a" 613 skip_err = f"{skip_err} serial run like this one." 614 self.logger.error(skip_err) 615 if self._stop_all: 616 self.logger.warning("Stop-all set. Shutting down run.") 617 break 618 if self._advance_all > 0: 619 advance_err = "Found the advance-all signal set. advance_all() is" 620 advance_err = f"{advance_err} only for breadth-first runs using the" 621 advance_err = f"{advance_err} '_by_line' methods. It has the same" 622 advance_err = f"{advance_err} effect as advance() in a" 623 advance_err = f"{advance_err} serial run like this one." 624 self.logger.error(advance_err) 625 csvpath = self.csvpath() 626 result = Result( 627 csvpath=csvpath, 628 file_name=filename, 629 paths_name=pathsname, 630 run_index=i, 631 run_time=self.current_run_time, 632 run_dir=crt, 633 ) 634 if self._fail_all: 635 self.logger.warning( 636 "Fail-all set. Failing all remaining CsvPath instances in the run." 637 ) 638 csvpath.is_valid = False 639 try: 640 self._load_csvpath( 641 csvpath=csvpath, 642 path=path, 643 file=file, 644 pathsname=pathsname, 645 filename=filename, 646 crt=crt, 647 ) 648 # 649 # if run-mode: no-run we skip ahead without saving results 650 # 651 if not csvpath.will_run: 652 continue 653 # 654 # the add has to come after _load_csvpath because we need the identity or index 655 # to be stable and the identity is found in load, if it exists. 656 # 657 self.results_manager.add_named_result(result) 658 for line in csvpath.next(): 659 # 660 # removed dec 1. why was this? it doesn't seem to make sense and 661 # removing it doesn't break any unit tests. was it a mistake? 
662 # 663 # line.append(result) 664 if collect: 665 result.append(line) 666 result.unmatched = csvpath.unmatched 667 yield line 668 except Exception as ex: # pylint: disable=W0718 669 self.error_manager.handle_error(source=self, msg=f"{ex}") 670 if self.ecoms.do_i_raise(): 671 self.results_manager.save(result) 672 raise 673 self.results_manager.save(result) 674 results.append(result) 675 # 676 # run ends here 677 # 678 self.results_manager.complete_run( 679 run_dir=crt, pathsname=pathsname, results=results 680 ) 681 self.clear_run_coordination() 682 683 # =============== breadth first processing ================ 684 685 def collect_by_line( 686 self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False 687 ): 688 """Does a CsvPath.collect() on filename where each row is considered 689 by every named path before the next row starts 690 691 next_by_line for if_all_agree and collect_when_not_matched. 692 """ 693 self.logger.info( 694 "Starting collect_by_line for paths: %s and file: %s", pathsname, filename 695 ) 696 lines = [] 697 for line in self.next_by_line( # pylint: disable=W0612 698 pathsname=pathsname, 699 filename=filename, 700 collect=True, 701 if_all_agree=if_all_agree, 702 collect_when_not_matched=collect_when_not_matched, 703 ): 704 # re: W0612: we need 'line' in order to do the iteration. we have to iterate. 705 lines.append(line) 706 self.logger.info( 707 "Completed collect_by_line for paths: %s and file: %s", pathsname, filename 708 ) 709 # 710 # the results have all the lines according to what CsvPath captured them, but 711 # since we're doing if_all_agree T/F we should return the union here. for some 712 # files this obviously makes the data in memory problem even bigger, but it's 713 # operator's responsibility to know if that will be a problem for their use 714 # case. 715 # 716 return lines 717 718 def fast_forward_by_line( 719 self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False 720 ): 721 """Does a CsvPath.fast_forward() on filename where each row is 722 considered by every named path before the next row starts 723 724 next_by_line for if_all_agree and collect_when_not_matched. 725 """ 726 self.logger.info( 727 "Starting fast_forward_by_line for paths: %s and file: %s", 728 pathsname, 729 filename, 730 ) 731 for line in self.next_by_line( # pylint: disable=W0612 732 pathsname=pathsname, 733 filename=filename, 734 collect=False, 735 if_all_agree=if_all_agree, 736 collect_when_not_matched=collect_when_not_matched, 737 ): 738 # re: W0612: we need 'line' in order to do the iteration. we have to iterate. 739 pass 740 self.logger.info( 741 "Completed fast_forward_by_line for paths: %s and file: %s", 742 pathsname, 743 filename, 744 ) 745 746 def next_by_line( # pylint: disable=R0912,R0915,R0914 747 self, 748 *, 749 pathsname, 750 filename, 751 collect: bool = False, 752 if_all_agree=False, 753 collect_when_not_matched=False, 754 ) -> List[Any]: 755 """Does a CsvPath.next() on filename where each row is considered 756 by every named path before the next row starts. 757 758 if_all_agree=True means all the CsvPath instances must match for 759 the line to be kept. However, every CsvPath instance will keep its 760 own matches in its results regardless of if every line kept was 761 returned to the caller by CsvPaths. 762 763 collect_when_not_matched=True inverts the match so that lines 764 which did not match are returned, rather than the default behavior. 765 """ 766 # re: R0912 -- absolutely. plan to refactor. 
767 self.logger.info("Prepping %s and %s", filename, pathsname) 768 self.clean(paths=pathsname) 769 fn = self.file_manager.get_named_file(filename) 770 paths = self.paths_manager.get_named_paths(pathsname) 771 if ( 772 paths is None or not isinstance(paths, list) or len(paths) == 0 773 ): # pragma: no cover 774 raise InputException( 775 f"Pathsname '{pathsname}' must name a list of csvpaths" 776 ) 777 # 778 # experiment! 779 # 780 crt = self.run_time_str(pathsname) 781 # 782 # also use of crt below 783 # 784 csvpath_objects = self._load_csvpath_objects( 785 paths=paths, 786 named_file=fn, 787 collect_when_not_matched=collect_when_not_matched, 788 filename=filename, 789 pathsname=pathsname, 790 crt=crt, 791 ) 792 # 793 # prep has to come after _load_csvpath_objects because we need the identity or 794 # indexes to be stable and the identity is found in the load, if it exists. 795 # 796 self._prep_csvpath_results( 797 csvpath_objects=csvpath_objects, 798 filename=filename, 799 pathsname=pathsname, 800 crt=crt, 801 ) 802 # 803 # setting fn into the csvpath is less obviously useful at CsvPaths 804 # but we'll do it for consistency. 805 # 806 self.logger.info("Beginning next_by_line with %s paths", len(csvpath_objects)) 807 reader = FileManager.get_reader( 808 fn, delimiter=self.delimiter, quotechar=self.quotechar 809 ) 810 stopped_count: List[int] = [] 811 for line in reader.next(): 812 # for line in reader: # pylint: disable=R1702 813 # question to self: should this default be in a central place 814 # so that we can switch to OR, in part by changing the default? 815 keep = if_all_agree 816 self._skip_all = False 817 self._advance_all = 0 818 try: 819 # p is a (CsvPath, List[List[str]]) where the second item is 820 # the line-by-line results of the first item's matching 821 for p in csvpath_objects: 822 # 823 # if run-mode: no-run we skip ahead without saving results 824 # 825 if not p[0].will_run: 826 continue 827 self.current_matcher = p[0] 828 if self._fail_all: 829 self.logger.warning( 830 "Fail-all set. Setting CsvPath is_valid to False." 831 ) 832 self.current_matcher.is_valid = False 833 if self._stop_all: 834 self.logger.warning("Stop-all set. Shutting down run.") 835 self.current_matcher.stopped = True 836 continue 837 if self._skip_all: 838 self.logger.warning("Skip-all set. Continuing to next.") 839 # 840 # all following CsvPaths must have their 841 # line_monitors incremented 842 # 843 self.current_matcher.track_line(line) 844 continue 845 if self._advance_all > 0: 846 logtxt = "Advance-all set. Setting advance. " 847 logtxt = f"{logtxt}CsvPath and its Matcher will handle the advancing." 848 self.logger.info(logtxt) 849 # 850 # CsvPath will handle advancing so we don't need to do 851 # anything, including track_line(line). we just need to 852 # see if we're setting advance or increasing it. 853 # 854 a = self.current_matcher.advance_count 855 if self._advance_all > a: 856 self.current_matcher.advance_count = self._advance_all 857 # 858 # all following CsvPaths must have their 859 # advance incremented -- with the advance not being simply 860 # additive, have to be mindful of any existing advance 861 # count! 
862 # 863 if self.current_matcher.stopped: # pylint: disable=R1724 864 continue 865 866 # 867 # allowing the match to happen regardless of keep 868 # because we may want side-effects or to have different 869 # results in different named-results, as well as the 870 # union 871 # 872 self.logger.debug( 873 "considering line with csvpath identified as: %s", 874 self.current_matcher.identity, 875 ) 876 matched = False 877 self.current_matcher.track_line(line) 878 # 879 # re: W0212: treating _consider_line something like package private 880 # 881 matched = ( 882 self.current_matcher._consider_line( # pylint:disable=W0212 883 line 884 ) 885 ) 886 if self.current_matcher.stopped: 887 stopped_count.append(1) 888 if if_all_agree: 889 keep = keep and matched 890 else: 891 keep = keep or matched 892 # 893 # not doing continue if we have if_all_agree and not keep as we 894 # used to do allows individual results to have lines that in 895 # aggregate we do not keep. 896 # 897 if matched and collect: 898 line = self.current_matcher.limit_collection(line) 899 p[1].append(line) 900 except Exception as ex: # pylint: disable=W0718 901 # ex.trace = traceback.format_exc() 902 # ex.source = self 903 self.error_manager.handle_error(source=self, msg=f"{ex}") 904 if self.ecoms.do_i_raise(): 905 for r in csvpath_objects: 906 result = r[1] 907 result.unmatched = r[0].unmatched 908 self.results_manager.save(result) 909 raise 910 # we yield even if we stopped in this iteration. 911 # caller needs to see what we stopped on. 912 # 913 # ! we only yield if keep is True 914 # 915 if keep: 916 yield line 917 if sum(stopped_count) == len(csvpath_objects): 918 break 919 results = [] 920 for r in csvpath_objects: 921 result = r[1] 922 results.append(result) 923 result.unmatched = r[0].unmatched 924 self.results_manager.save(result) 925 926 # 927 # run ends here 928 # 929 self.results_manager.complete_run( 930 run_dir=results[0].run_dir, pathsname=pathsname, results=results 931 ) 932 self.clear_run_coordination() 933 934 def _load_csvpath_objects( 935 self, 936 *, 937 paths: List[str], 938 named_file: str, 939 collect_when_not_matched=False, 940 filename, 941 pathsname, 942 crt: str, 943 ): 944 """@private""" 945 csvpath_objects = [] 946 for path in paths: 947 csvpath = self.csvpath() 948 csvpath.collect_when_not_matched = collect_when_not_matched 949 try: 950 self._load_csvpath( 951 csvpath=csvpath, 952 path=path, 953 file=named_file, 954 filename=filename, 955 pathsname=pathsname, 956 by_line=True, 957 crt=crt, 958 ) 959 if csvpath.data_from_preceding is True: 960 # this exception raise may be redundant, but I'm leaving it for now for good measure. 961 raise CsvPathsException( 962 "Csvpath identified as {csvpath.identity} is set to use preceding data, but CsvPaths's by_line methods do not permit that" 963 ) 964 csvpath_objects.append([csvpath, []]) 965 except Exception as ex: # pylint: disable=W0718 966 ex.trace = traceback.format_exc() 967 ex.source = self 968 # the error collector is the Results. it registers itself with 969 # the csvpath as the error collector. 
not as straightforward but 970 # effectively same as we do above 971 self.error_manager.handle_error(source=self, msg=f"{ex}") 972 return csvpath_objects 973 974 def _prep_csvpath_results(self, *, csvpath_objects, filename, pathsname, crt: str): 975 """@private""" 976 # 977 # run starts here 978 # 979 self.results_manager.start_run( 980 run_dir=crt, pathsname=pathsname, filename=filename 981 ) 982 # 983 # 984 # 985 for i, csvpath in enumerate(csvpath_objects): 986 try: 987 # 988 # Result will set itself into its CsvPath as error collector 989 # printer, etc. 990 # 991 result = Result( 992 csvpath=csvpath[0], 993 file_name=filename, 994 paths_name=pathsname, 995 lines=csvpath[1], 996 run_index=i, 997 run_time=self.current_run_time, 998 run_dir=crt, 999 by_line=True, 1000 ) 1001 csvpath[1] = result 1002 # 1003 # the add has to come after _load_csvpath because we need the identity or index 1004 # to be stable and the identity is found in load, if it exists. 1005 # 1006 self.results_manager.add_named_result(result) 1007 except Exception as ex: # pylint: disable=W0718 1008 """ 1009 ex.trace = traceback.format_exc() 1010 ex.source = self 1011 ErrorHandler(csvpaths=self, error_collector=csvpath).handle_error(ex) 1012 """ 1013 self.error_manager.handle_error(source=self, msg=f"{ex}") 1014 # 1015 # keep this comment for modelines avoidance 1016 #
a CsvPaths instance manages applying any number of csvpaths to any number of files. CsvPaths applies sets of csvpaths to a given file, on demand. Think of CsvPaths as a session object. It gives you a way to manage files, csvpaths, and the results generated by applying paths to files. It is not intended for concurrent use. If you need multiple threads, create multiple CsvPaths instances.
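A sketch of the session pattern. The name "orders" is a placeholder for a named-file and a named-paths group that are assumed to have been registered already through the managers or the CLI:

    from csvpath import CsvPaths

    paths = CsvPaths()
    # the managers come ready-made on the instance
    print(paths.file_manager.get_named_file("orders"))    # the registered data file
    print(paths.paths_manager.get_named_paths("orders"))  # the csvpaths in the group

Because a CsvPaths instance is not intended for concurrent use, give each thread its own instance.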
72 def __init__( 73 self, 74 *, 75 delimiter=",", 76 quotechar='"', 77 skip_blank_lines=True, 78 print_default=True, 79 config: Config = None, 80 ): 81 self._config = Config() if config is None else config 82 # 83 # managers centralize activities, offer async potential, and 84 # are where integrations hook in. ErrorManager functionality 85 # must be available in CsvPath too. The others are CsvPaths 86 # only. 87 # 88 self._paths_manager = None 89 self._file_manager = None 90 self._results_manager = None 91 self._ecoms = None 92 self._error_manager = None 93 self._set_managers() 94 # 95 # TODO: 96 # self.print_manager = ... <<<=== should we do this? 97 # 98 # 99 self.print_default = print_default 100 """ @private """ 101 self.delimiter = delimiter 102 """ @private """ 103 self.quotechar = quotechar 104 """ @private """ 105 self.skip_blank_lines = skip_blank_lines 106 """ @private """ 107 self.current_matcher: CsvPath = None 108 """ @private """ 109 self.logger = LogUtility.logger(self) 110 """ @private """ 111 self._errors = [] 112 # coordinator attributes 113 self._stop_all = False 114 self._fail_all = False 115 self._skip_all = False 116 self._advance_all = 0 117 self._current_run_time = None 118 self._run_time_str = None 119 self.named_paths_name = None 120 """ @private """ 121 self.named_file_name = None 122 """ @private """ 123 # 124 # metrics is for OTLP OpenTelemetry. it should only 125 # be used by the OTLP listener. it is here because 126 # the integration may need a long-lived presence. if 127 # needed, the first OTLP listener will set it up 128 # before spinning up a thread. any other OTLP 129 # listener threads that need to use a long-lived metric 130 # will work with this property. 131 # 132 self.metrics = None 133 """ @private """ 134 self.logger.info("initialized CsvPaths")
221 def csvpath(self) -> CsvPath: 222 """Gets a CsvPath object primed with a reference to this CsvPaths""" 223 path = CsvPath( 224 csvpaths=self, 225 delimiter=self.delimiter, 226 quotechar=self.quotechar, 227 skip_blank_lines=self.skip_blank_lines, 228 # 229 # in the usual case we don't want csvpaths and its csvpath children 230 # to share the same config. sharing doesn't offer much. the flexibility 231 # of having separate configs is valuable. 232 # 233 config=None, 234 print_default=self.print_default, 235 error_manager=self.error_manager, 236 ) 237 return path
Gets a CsvPath object primed with a reference to this CsvPaths
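For example (a sketch with placeholder values), a managed instance can be driven directly while its errors and print output flow back through the parent's managers:

    from csvpath import CsvPaths

    paths = CsvPaths()
    path = paths.csvpath()  # a CsvPath wired to this CsvPaths
    lines = path.collect("$test.csv[*][yes()]")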
301 def collect_paths(self, *, pathsname, filename) -> None: 302 """ 303 Sequentially does a CsvPath.collect() on filename for every named path. lines are collected into results.""" 304 paths = self.paths_manager.get_named_paths(pathsname) 305 if paths is None: 306 raise InputException(f"No named-paths found for {pathsname}") 307 if len(paths) == 0: 308 raise InputException(f"Named-paths group {pathsname} is empty") 309 if "" in paths: 310 raise InputException( 311 f"Named-paths group {pathsname} has one or more empty csvpaths" 312 ) 313 file = self.file_manager.get_named_file(filename) 314 if file is None: 315 raise InputException(f"No named-file found for {filename}") 316 self.logger.info("Prepping %s and %s", filename, pathsname) 317 self.clean(paths=pathsname) 318 self.logger.info( 319 "Beginning collect_paths %s with %s paths", pathsname, len(paths) 320 ) 321 crt = self.run_time_str(pathsname) 322 results = [] 323 # 324 # run starts here 325 # 326 self.results_manager.start_run( 327 run_dir=crt, pathsname=pathsname, filename=filename 328 ) 329 # 330 # 331 # 332 for i, path in enumerate(paths): 333 csvpath = self.csvpath() 334 if not csvpath.will_run: 335 continue 336 result = Result( 337 csvpath=csvpath, 338 file_name=filename, 339 paths_name=pathsname, 340 run_index=i, 341 run_time=self.current_run_time, 342 run_dir=crt, 343 ) 344 # casting a broad net because if "raise" not in the error policy we 345 # want to never fail during a run 346 try: 347 self._load_csvpath( 348 csvpath=csvpath, 349 path=path, 350 file=file, 351 pathsname=pathsname, 352 filename=filename, 353 crt=crt, 354 ) 355 # 356 # if run-mode: no-run we skip ahead without saving results 357 # 358 if not csvpath.will_run: 359 continue 360 # 361 # the add has to come after _load_csvpath because we need the identity or index 362 # to be stable and the identity is found in load, if it exists. 363 # 364 self.results_manager.add_named_result(result) 365 lines = result.lines 366 self.logger.debug("Collecting lines using a %s", type(lines)) 367 csvpath.collect(lines=lines) 368 if lines is None: 369 self.logger.error( # pragma: no cover 370 "Unexpected None for lines after collect_paths: file: %s, match: %s", 371 file, 372 csvpath.match, 373 ) 374 # 375 # this is obviously not a good idea for very large files! 376 # 377 result.unmatched = csvpath.unmatched 378 except Exception as ex: # pylint: disable=W0718 379 # ex.trace = traceback.format_exc() 380 # ex.source = self 381 if self.error_manager.csvpaths is None: 382 raise Exception("ErrorManager's CsvPaths cannot be None") 383 self.error_manager.handle_error(source=self, msg=f"{ex}") 384 if self.ecoms.do_i_raise(): 385 self.results_manager.save(result) 386 raise 387 self.results_manager.save(result) 388 results.append(result) 389 # 390 # run ends here 391 # 392 self.results_manager.complete_run( 393 run_dir=crt, pathsname=pathsname, results=results 394 ) 395 # 396 # update/write run manifests here 397 # - validity (are all paths valid) 398 # - paths-completeness (did they all run and complete) 399 # - method (collect, fast_forward, next) 400 # - timestamp 401 # 402 self.clear_run_coordination() 403 self.logger.info( 404 "Completed collect_paths %s with %s paths", pathsname, len(paths) 405 )
Sequentially does a CsvPath.collect() on filename for every named path. The matching lines are collected into the run's results.
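In use it might look like the sketch below. The "invoices" names are placeholders for a registered named-file and named-paths group, and get_named_results is assumed here to be the results manager's accessor for a completed run:

    from csvpath import CsvPaths

    paths = CsvPaths()
    paths.collect_paths(pathsname="invoices", filename="invoices")
    # each csvpath in the group has its own Result holding its matched lines
    for result in paths.results_manager.get_named_results("invoices"):
        print(len(result.lines))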
505 def fast_forward_paths(self, *, pathsname, filename): 506 """Sequentially does a CsvPath.fast_forward() on filename for every named path. No matches are collected.""" 507 paths = self.paths_manager.get_named_paths(pathsname) 508 file = self.file_manager.get_named_file(filename) 509 self.logger.info("Prepping %s and %s", filename, pathsname) 510 self.clean(paths=pathsname) 511 self.logger.info( 512 "Beginning FF %s with %s paths against file %s. No match results will be held.", 513 pathsname, 514 len(paths), 515 filename, 516 ) 517 crt = self.run_time_str(pathsname) 518 # 519 # run starts here 520 # 521 self.results_manager.start_run( 522 run_dir=crt, pathsname=pathsname, filename=filename 523 ) 524 # 525 # 526 # 527 results = [] 528 for i, path in enumerate(paths): 529 csvpath = self.csvpath() 530 self.logger.debug("Beginning to FF CsvPath instance: %s", csvpath) 531 result = Result( 532 csvpath=csvpath, 533 file_name=filename, 534 paths_name=pathsname, 535 run_index=i, 536 run_time=self.current_run_time, 537 run_dir=crt, 538 ) 539 try: 540 self._load_csvpath( 541 csvpath=csvpath, 542 path=path, 543 file=file, 544 pathsname=pathsname, 545 filename=filename, 546 crt=crt, 547 ) 548 # 549 # if run-mode: no-run we skip ahead without saving results 550 # 551 if not csvpath.will_run: 552 continue 553 # 554 # the add has to come after _load_csvpath because we need the identity or index 555 # to be stable and the identity is found in load, if it exists. 556 # 557 self.results_manager.add_named_result(result) 558 self.logger.info( 559 "Parsed csvpath %s pointed at %s and starting to fast-forward", 560 i, 561 file, 562 ) 563 csvpath.fast_forward() 564 self.logger.info( 565 "Completed fast forward of csvpath %s against %s", i, file 566 ) 567 except Exception as ex: # pylint: disable=W0718 568 # ex.trace = traceback.format_exc() 569 # ex.source = self 570 self.error_manager.handle_error(source=self, msg=f"{ex}") 571 if self.ecoms.do_i_raise(): 572 self.results_manager.save(result) 573 raise 574 self.results_manager.save(result) 575 results.append(result) 576 # 577 # run ends here 578 # 579 self.results_manager.complete_run( 580 run_dir=crt, pathsname=pathsname, results=results 581 ) 582 self.clear_run_coordination() 583 self.logger.info( 584 "Completed fast_forward_paths %s with %s paths", pathsname, len(paths) 585 )
Sequentially does a CsvPath.fast_forward() on filename for every named path. No matches are collected.
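For example (a sketch; "orders" is a placeholder for a registered named-file and named-paths group):

    from csvpath import CsvPaths

    paths = CsvPaths()
    # validate without holding matched lines in memory; validity, errors,
    # and print output are still captured with the run's results.
    paths.fast_forward_paths(pathsname="orders", filename="orders")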
587 def next_paths( 588 self, *, pathsname, filename, collect: bool = False 589 ): # pylint: disable=R0914 590 """Does a CsvPath.next() on filename for every line against every named path in sequence""" 591 paths = self.paths_manager.get_named_paths(pathsname) 592 file = self.file_manager.get_named_file(filename) 593 self.logger.info("Prepping %s and %s", filename, pathsname) 594 self.clean(paths=pathsname) 595 self.logger.info("Beginning next_paths with %s paths", len(paths)) 596 crt = self.run_time_str(pathsname) 597 # 598 # run starts here 599 # 600 self.results_manager.start_run( 601 run_dir=crt, pathsname=pathsname, filename=filename 602 ) 603 # 604 # 605 # 606 results = [] 607 for i, path in enumerate(paths): 608 if self._skip_all: 609 skip_err = "Found the skip-all signal set. skip_all() is" 610 skip_err = f"{skip_err} only for breadth-first runs using the" 611 skip_err = f"{skip_err} '_by_line' methods. It has the same" 612 skip_err = f"{skip_err} effect as skip() in a" 613 skip_err = f"{skip_err} serial run like this one." 614 self.logger.error(skip_err) 615 if self._stop_all: 616 self.logger.warning("Stop-all set. Shutting down run.") 617 break 618 if self._advance_all > 0: 619 advance_err = "Found the advance-all signal set. advance_all() is" 620 advance_err = f"{advance_err} only for breadth-first runs using the" 621 advance_err = f"{advance_err} '_by_line' methods. It has the same" 622 advance_err = f"{advance_err} effect as advance() in a" 623 advance_err = f"{advance_err} serial run like this one." 624 self.logger.error(advance_err) 625 csvpath = self.csvpath() 626 result = Result( 627 csvpath=csvpath, 628 file_name=filename, 629 paths_name=pathsname, 630 run_index=i, 631 run_time=self.current_run_time, 632 run_dir=crt, 633 ) 634 if self._fail_all: 635 self.logger.warning( 636 "Fail-all set. Failing all remaining CsvPath instances in the run." 637 ) 638 csvpath.is_valid = False 639 try: 640 self._load_csvpath( 641 csvpath=csvpath, 642 path=path, 643 file=file, 644 pathsname=pathsname, 645 filename=filename, 646 crt=crt, 647 ) 648 # 649 # if run-mode: no-run we skip ahead without saving results 650 # 651 if not csvpath.will_run: 652 continue 653 # 654 # the add has to come after _load_csvpath because we need the identity or index 655 # to be stable and the identity is found in load, if it exists. 656 # 657 self.results_manager.add_named_result(result) 658 for line in csvpath.next(): 659 # 660 # removed dec 1. why was this? it doesn't seem to make sense and 661 # removing it doesn't break any unit tests. was it a mistake? 662 # 663 # line.append(result) 664 if collect: 665 result.append(line) 666 result.unmatched = csvpath.unmatched 667 yield line 668 except Exception as ex: # pylint: disable=W0718 669 self.error_manager.handle_error(source=self, msg=f"{ex}") 670 if self.ecoms.do_i_raise(): 671 self.results_manager.save(result) 672 raise 673 self.results_manager.save(result) 674 results.append(result) 675 # 676 # run ends here 677 # 678 self.results_manager.complete_run( 679 run_dir=crt, pathsname=pathsname, results=results 680 ) 681 self.clear_run_coordination()
Does a CsvPath.next() on filename against every named path in sequence, yielding matching lines as it goes.
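A sketch of streaming matches serially (placeholder names as above):

    from csvpath import CsvPaths

    paths = CsvPaths()
    # yields every matching line from the first csvpath in the group,
    # then every matching line from the second, and so on.
    for line in paths.next_paths(pathsname="orders", filename="orders"):
        print(line)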
685 def collect_by_line( 686 self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False 687 ): 688 """Does a CsvPath.collect() on filename where each row is considered 689 by every named path before the next row starts 690 691 next_by_line for if_all_agree and collect_when_not_matched. 692 """ 693 self.logger.info( 694 "Starting collect_by_line for paths: %s and file: %s", pathsname, filename 695 ) 696 lines = [] 697 for line in self.next_by_line( # pylint: disable=W0612 698 pathsname=pathsname, 699 filename=filename, 700 collect=True, 701 if_all_agree=if_all_agree, 702 collect_when_not_matched=collect_when_not_matched, 703 ): 704 # re: W0612: we need 'line' in order to do the iteration. we have to iterate. 705 lines.append(line) 706 self.logger.info( 707 "Completed collect_by_line for paths: %s and file: %s", pathsname, filename 708 ) 709 # 710 # the results have all the lines according to what CsvPath captured them, but 711 # since we're doing if_all_agree T/F we should return the union here. for some 712 # files this obviously makes the data in memory problem even bigger, but it's 713 # operator's responsibility to know if that will be a problem for their use 714 # case. 715 # 716 return lines
Does a CsvPath.collect() on filename where each row is considered by every named path before the next row starts.
See next_by_line for the meaning of if_all_agree and collect_when_not_matched.
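For example (a sketch with placeholder names), to keep only the rows that every csvpath in the group matched:

    from csvpath import CsvPaths

    paths = CsvPaths()
    # breadth-first: every csvpath sees row 1, then every csvpath sees row 2, ...
    # with if_all_agree=True the returned list is the intersection of matches;
    # the default is the union.
    lines = paths.collect_by_line(
        pathsname="orders", filename="orders", if_all_agree=True
    )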
718 def fast_forward_by_line( 719 self, *, pathsname, filename, if_all_agree=False, collect_when_not_matched=False 720 ): 721 """Does a CsvPath.fast_forward() on filename where each row is 722 considered by every named path before the next row starts 723 724 next_by_line for if_all_agree and collect_when_not_matched. 725 """ 726 self.logger.info( 727 "Starting fast_forward_by_line for paths: %s and file: %s", 728 pathsname, 729 filename, 730 ) 731 for line in self.next_by_line( # pylint: disable=W0612 732 pathsname=pathsname, 733 filename=filename, 734 collect=False, 735 if_all_agree=if_all_agree, 736 collect_when_not_matched=collect_when_not_matched, 737 ): 738 # re: W0612: we need 'line' in order to do the iteration. we have to iterate. 739 pass 740 self.logger.info( 741 "Completed fast_forward_by_line for paths: %s and file: %s", 742 pathsname, 743 filename, 744 )
Does a CsvPath.fast_forward() on filename where each row is considered by every named path before the next row starts.
See next_by_line for the meaning of if_all_agree and collect_when_not_matched.
746 def next_by_line( # pylint: disable=R0912,R0915,R0914 747 self, 748 *, 749 pathsname, 750 filename, 751 collect: bool = False, 752 if_all_agree=False, 753 collect_when_not_matched=False, 754 ) -> List[Any]: 755 """Does a CsvPath.next() on filename where each row is considered 756 by every named path before the next row starts. 757 758 if_all_agree=True means all the CsvPath instances must match for 759 the line to be kept. However, every CsvPath instance will keep its 760 own matches in its results regardless of if every line kept was 761 returned to the caller by CsvPaths. 762 763 collect_when_not_matched=True inverts the match so that lines 764 which did not match are returned, rather than the default behavior. 765 """ 766 # re: R0912 -- absolutely. plan to refactor. 767 self.logger.info("Prepping %s and %s", filename, pathsname) 768 self.clean(paths=pathsname) 769 fn = self.file_manager.get_named_file(filename) 770 paths = self.paths_manager.get_named_paths(pathsname) 771 if ( 772 paths is None or not isinstance(paths, list) or len(paths) == 0 773 ): # pragma: no cover 774 raise InputException( 775 f"Pathsname '{pathsname}' must name a list of csvpaths" 776 ) 777 # 778 # experiment! 779 # 780 crt = self.run_time_str(pathsname) 781 # 782 # also use of crt below 783 # 784 csvpath_objects = self._load_csvpath_objects( 785 paths=paths, 786 named_file=fn, 787 collect_when_not_matched=collect_when_not_matched, 788 filename=filename, 789 pathsname=pathsname, 790 crt=crt, 791 ) 792 # 793 # prep has to come after _load_csvpath_objects because we need the identity or 794 # indexes to be stable and the identity is found in the load, if it exists. 795 # 796 self._prep_csvpath_results( 797 csvpath_objects=csvpath_objects, 798 filename=filename, 799 pathsname=pathsname, 800 crt=crt, 801 ) 802 # 803 # setting fn into the csvpath is less obviously useful at CsvPaths 804 # but we'll do it for consistency. 805 # 806 self.logger.info("Beginning next_by_line with %s paths", len(csvpath_objects)) 807 reader = FileManager.get_reader( 808 fn, delimiter=self.delimiter, quotechar=self.quotechar 809 ) 810 stopped_count: List[int] = [] 811 for line in reader.next(): 812 # for line in reader: # pylint: disable=R1702 813 # question to self: should this default be in a central place 814 # so that we can switch to OR, in part by changing the default? 815 keep = if_all_agree 816 self._skip_all = False 817 self._advance_all = 0 818 try: 819 # p is a (CsvPath, List[List[str]]) where the second item is 820 # the line-by-line results of the first item's matching 821 for p in csvpath_objects: 822 # 823 # if run-mode: no-run we skip ahead without saving results 824 # 825 if not p[0].will_run: 826 continue 827 self.current_matcher = p[0] 828 if self._fail_all: 829 self.logger.warning( 830 "Fail-all set. Setting CsvPath is_valid to False." 831 ) 832 self.current_matcher.is_valid = False 833 if self._stop_all: 834 self.logger.warning("Stop-all set. Shutting down run.") 835 self.current_matcher.stopped = True 836 continue 837 if self._skip_all: 838 self.logger.warning("Skip-all set. Continuing to next.") 839 # 840 # all following CsvPaths must have their 841 # line_monitors incremented 842 # 843 self.current_matcher.track_line(line) 844 continue 845 if self._advance_all > 0: 846 logtxt = "Advance-all set. Setting advance. " 847 logtxt = f"{logtxt}CsvPath and its Matcher will handle the advancing." 
848 self.logger.info(logtxt) 849 # 850 # CsvPath will handle advancing so we don't need to do 851 # anything, including track_line(line). we just need to 852 # see if we're setting advance or increasing it. 853 # 854 a = self.current_matcher.advance_count 855 if self._advance_all > a: 856 self.current_matcher.advance_count = self._advance_all 857 # 858 # all following CsvPaths must have their 859 # advance incremented -- with the advance not being simply 860 # additive, have to be mindful of any existing advance 861 # count! 862 # 863 if self.current_matcher.stopped: # pylint: disable=R1724 864 continue 865 866 # 867 # allowing the match to happen regardless of keep 868 # because we may want side-effects or to have different 869 # results in different named-results, as well as the 870 # union 871 # 872 self.logger.debug( 873 "considering line with csvpath identified as: %s", 874 self.current_matcher.identity, 875 ) 876 matched = False 877 self.current_matcher.track_line(line) 878 # 879 # re: W0212: treating _consider_line something like package private 880 # 881 matched = ( 882 self.current_matcher._consider_line( # pylint:disable=W0212 883 line 884 ) 885 ) 886 if self.current_matcher.stopped: 887 stopped_count.append(1) 888 if if_all_agree: 889 keep = keep and matched 890 else: 891 keep = keep or matched 892 # 893 # not doing continue if we have if_all_agree and not keep as we 894 # used to do allows individual results to have lines that in 895 # aggregate we do not keep. 896 # 897 if matched and collect: 898 line = self.current_matcher.limit_collection(line) 899 p[1].append(line) 900 except Exception as ex: # pylint: disable=W0718 901 # ex.trace = traceback.format_exc() 902 # ex.source = self 903 self.error_manager.handle_error(source=self, msg=f"{ex}") 904 if self.ecoms.do_i_raise(): 905 for r in csvpath_objects: 906 result = r[1] 907 result.unmatched = r[0].unmatched 908 self.results_manager.save(result) 909 raise 910 # we yield even if we stopped in this iteration. 911 # caller needs to see what we stopped on. 912 # 913 # ! we only yield if keep is True 914 # 915 if keep: 916 yield line 917 if sum(stopped_count) == len(csvpath_objects): 918 break 919 results = [] 920 for r in csvpath_objects: 921 result = r[1] 922 results.append(result) 923 result.unmatched = r[0].unmatched 924 self.results_manager.save(result) 925 926 # 927 # run ends here 928 # 929 self.results_manager.complete_run( 930 run_dir=results[0].run_dir, pathsname=pathsname, results=results 931 ) 932 self.clear_run_coordination()
Does a CsvPath.next() on filename where each row is considered by every named path before the next row starts.
if_all_agree=True means all the CsvPath instances must match for a line to be kept. However, every CsvPath instance keeps its own matches in its results regardless of whether a given line was returned to the caller by CsvPaths.
collect_when_not_matched=True inverts the match so that lines which did not match are returned, rather than the matching lines returned by default.
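A sketch of the breadth-first generator (placeholder names as above):

    from csvpath import CsvPaths

    paths = CsvPaths()
    # each row is offered to every csvpath before the next row is read.
    # by default a row is yielded if any csvpath matched it; pass
    # if_all_agree=True to require that all of them matched.
    for line in paths.next_by_line(pathsname="orders", filename="orders"):
        print(line)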