Files
prog-team-proj/utilsv2/parser.py

89 lines
2.4 KiB
Python

import logging
from io import TextIOWrapper
from typing import Any
from utilsv2 import utils
from utilsv2.log import logger
from utilsv2.nordic import evtype, parse_event, parse_stations_V1, sttype
# NOTE(review): this rebinds `logger`, shadowing the name imported from
# `utilsv2.log` above — confirm which logger this module is meant to use.
logger = logging.getLogger(__name__)
def read_file(fname: str) -> TextIOWrapper | OSError:
try:
fp = open(fname, "r", newline="\n")
return fp
except FileNotFoundError:
return FileNotFoundError("Nenhum ficheiro encontrado")
except PermissionError:
return PermissionError("Sem permissões para abrir")
def find_events(fp: TextIOWrapper) -> list[tuple[int, int]]:
    """Locate the line ranges of the events contained in *fp*.

    The remaining content of *fp* is read in one go and split on newline
    characters. A line for which ``utils.is_empty`` is true acts as an
    event separator; every run of consecutive non-empty lines is one event.

    Compared with a naive scan, two edge cases are handled explicitly:

    * zero-length ranges (consecutive separators, or the empty string left
      after the final newline) are not reported, since they contain no
      lines to parse;
    * a final event that is not followed by a separator is still reported.

    Args:
        fp: Open text stream positioned where scanning should start. The
            stream is consumed; the caller must ``seek`` to re-read it.

    Returns:
        A list of ``(start, end)`` line-index pairs. ``start`` is the index
        of the event's first line; ``end`` is the index of the separator
        line that closes it (or the total number of lines for an
        unterminated final event), i.e. an exclusive upper bound.
    """
    event_indices: list[tuple[int, int]] = []
    event_start = -1  # -1 means "not currently inside an event"
    lines = fp.read().split("\n")

    for idx, line in enumerate(lines):
        if utils.is_empty(line):
            # Only close an event that was actually opened; this skips
            # repeated separators instead of emitting empty ranges.
            if event_start != -1:
                event_indices.append((event_start, idx))
                event_start = -1
        elif event_start == -1:
            event_start = idx

    # Flush an event that reaches end-of-file without a separator.
    if event_start != -1:
        event_indices.append((event_start, len(lines)))

    logger.info("Found %d events", len(event_indices))
    return event_indices
def split_event(lines: list[str], start: int, end: int) -> int:
    """Find the first line in ``lines[start:end]`` that ends with ``"7"``.

    Args:
        lines: All lines of the file.
        start: Index of the first line to inspect (inclusive).
        end: Index at which to stop inspecting (exclusive).

    Returns:
        The index of the first matching line, or ``-1`` when no line in
        the range ends with ``"7"``.
    """
    matches = (pos for pos in range(start, end) if lines[pos].endswith("7"))
    # The generator is lazy, so scanning stops at the first hit.
    return next(matches, -1)
def extract_event(
    fp: TextIOWrapper, event_bounds: list[tuple[int, int]]
) -> tuple[list[evtype], list[sttype]]:
    """Parse every event delimited by *event_bounds* out of *fp*.

    The remaining content of *fp* is read and split on newline characters.
    For each ``(start, end)`` pair, ``split_event`` locates the line that
    divides the event in two: the lines before it are handed to
    ``parse_event`` and the lines after it (up to ``end``) are handed to
    ``parse_stations_V1`` together with the parsed event's ``"ID"``.
    Events for which no dividing line is found are logged and skipped.

    Args:
        fp: Open text stream positioned at the start of the data.
        event_bounds: ``(start, end)`` line-index pairs, one per event.

    Returns:
        A pair ``(events, stations)`` of equally long lists, where
        ``stations[i]`` is the station data parsed for ``events[i]``.
    """
    all_lines = fp.read().split("\n")
    parsed_events = []
    parsed_stations = []

    for bounds in event_bounds:
        first, last = bounds[0], bounds[1]
        divider = split_event(all_lines, first, last)

        if divider == -1:
            logger.error(f"Could not parse event at pos {bounds}")
        else:
            event = parse_event(all_lines[first:divider])
            parsed_events.append(event)
            # The divider line itself belongs to neither part.
            station_lines = all_lines[divider + 1 : last]
            parsed_stations.append(parse_stations_V1(station_lines, event["ID"]))

    return parsed_events, parsed_stations
def massage_magnitudes(data: list[evtype]) -> list[evtype]:
    """Replace each magnitude object with the result of its ``toJSON()``.

    The conversion is done in place: every entry of each event's
    ``"Magnitudes"`` list is overwritten, and the same *data* list that was
    passed in is returned.

    Args:
        data: Events whose ``"Magnitudes"`` entries expose ``toJSON()``.

    Returns:
        The same list object, with its magnitudes converted.
    """
    for event in data:
        magnitudes = event["Magnitudes"]
        for pos, magnitude in enumerate(magnitudes):
            magnitudes[pos] = magnitude.toJSON()
    return data
def parse(fname: str):
    """Read *fname* and return the events and station data it contains.

    The file is scanned once by ``find_events`` to locate the event
    boundaries, rewound, and then scanned again by ``extract_event`` to
    parse each event.

    Args:
        fname: Path of the file to parse.

    Returns:
        The ``(events, stations)`` pair produced by ``extract_event``.

    Raises:
        OSError: The exception instance returned by ``read_file`` when the
            file could not be opened (it is logged as critical first).
    """
    _ret = read_file(fname)
    if not isinstance(_ret, TextIOWrapper):
        logger.critical(str(_ret))
        raise _ret

    # Using the handle as a context manager guarantees it is closed even
    # when one of the parsing steps raises.
    with _ret as fp:
        events = find_events(fp)
        # find_events consumed the stream; rewind before the second pass.
        fp.seek(0)
        evs, stations = extract_event(fp, events)

    return evs, stations