import logging
from io import TextIOWrapper

try:
    from utilsv2 import utils
    from utilsv2.log import logger
    from utilsv2.nordic import evtype, parse_event, parse_stations_V1, sttype
except ModuleNotFoundError:
    # Fallback for running from inside the package directory, where the
    # sibling modules are importable without the `utilsv2.` prefix.
    import utils
    from log import logger
    from nordic import evtype, parse_event, parse_stations_V1, sttype

# NOTE(review): this rebinds the `logger` name imported above, so the imported
# object is never used by this module. Kept as-is; confirm whether the import
# is only wanted for its side effects.
logger = logging.getLogger(__name__)


def read_file(fname: str) -> TextIOWrapper | OSError:
    """Open *fname* for reading in text mode.

    The file is opened with ``newline="\\n"``, so only ``"\\n"`` terminates a
    line and no newline translation is performed.

    Returns:
        The open file object on success. If the file does not exist or cannot
        be opened for lack of permissions, the corresponding exception
        instance is *returned* (not raised) so the caller can decide how to
        report it.

    Other ``OSError`` subclasses raised by ``open`` propagate unchanged.
    """
    try:
        fp = open(fname, "r", newline="\n")
        return fp
    except FileNotFoundError:
        return FileNotFoundError("Nenhum ficheiro encontrado")
    except PermissionError:
        return PermissionError("Sem permissões para abrir")


def find_events(fp: TextIOWrapper) -> list[tuple[int, int]]:
    """Locate the line ranges of the events contained in *fp*.

    The remaining content of *fp* is read and split on ``"\\n"``. An event is
    a run of consecutive lines for which ``utils.is_empty`` is false; it is
    closed by the first following line for which ``utils.is_empty`` is true,
    or by the end of the input.

    Returns:
        A list of ``(start, end)`` line indices, ``start`` inclusive and
        ``end`` exclusive (``end`` is the index of the terminating blank line,
        or the total number of lines for an unterminated final event).
    """
    lines = fp.read().split("\n")
    event_indices: list[tuple[int, int]] = []
    event_start = -1  # -1 means "not currently inside an event"

    for idx, line in enumerate(lines):
        if utils.is_empty(line):
            # Only close an event that is actually open: repeated blank lines
            # (and the empty trailing chunk produced by split) must not be
            # recorded as zero-length events.
            if event_start != -1:
                event_indices.append((event_start, idx))
                event_start = -1
        elif event_start == -1:
            event_start = idx

    # The last event may not be followed by a blank line; keep it anyway.
    if event_start != -1:
        event_indices.append((event_start, len(lines)))

    logger.info("Found %d events", len(event_indices))
    return event_indices


def split_event(lines: list[str], start: int, end: int) -> int:
    """Return the index of the first line in ``lines[start:end]`` ending in "7".

    Returns ``-1`` when no such line exists in the range.
    """
    # NOTE(review): presumably the trailing "7" marks the header line that
    # separates an event's header from its station lines -- confirm against
    # the `nordic` module.
    for idx in range(start, end):
        if lines[idx].endswith("7"):
            return idx
    return -1


def extract_event(
    fp: TextIOWrapper, event_bounds: list[tuple[int, int]]
) -> tuple[list[evtype], list[sttype]]:
    """Parse every event delimited by *event_bounds* out of *fp*.

    The remaining content of *fp* is read and split on ``"\\n"``; each
    ``(start, end)`` pair in *event_bounds* indexes into that list. For each
    event, the lines before the separator found by ``split_event`` are passed
    to ``parse_event`` and the lines after it to ``parse_stations_V1``
    together with the parsed event's ``"ID"``.

    Events for which no separator line is found are logged and skipped.

    Returns:
        A pair ``(events, stations)`` of equal length, in input order.
    """
    lines = fp.read().split("\n")
    events, ev_stations = [], []

    for bounds in event_bounds:
        start, end = bounds
        separator = split_event(lines, start, end)
        if separator == -1:
            logger.error("Could not parse event at pos %s", bounds)
            continue

        ev = parse_event(lines[start:separator])
        events.append(ev)
        ev_stations.append(parse_stations_V1(lines[separator + 1 : end], ev["ID"]))

    return events, ev_stations


def parse(fname: str) -> tuple[list[evtype], list[sttype]]:
    """Read *fname* and return its parsed events and station data.

    Raises:
        FileNotFoundError, PermissionError: when ``read_file`` reports that
            the file could not be opened; the error is logged at CRITICAL
            level before being raised.
    """
    opened = read_file(fname)
    if not isinstance(opened, TextIOWrapper):
        logger.critical(str(opened))
        raise opened

    # The context manager guarantees the file is closed even if parsing fails.
    with opened:
        bounds = find_events(opened)
        # find_events consumed the stream; rewind before the second pass.
        opened.seek(0)
        evs, stations = extract_event(opened, bounds)

    return evs, stations