diff --git a/utilsv2/nordic.py b/utilsv2/nordic.py index e69de29..8d9b8c4 100644 --- a/utilsv2/nordic.py +++ b/utilsv2/nordic.py @@ -0,0 +1,142 @@ +import json +import logging +import textwrap +from collections import defaultdict +from datetime import datetime, time +from typing import Any + +from utilsv2 import utils +from utilsv2.log import logger + +logger = logging.getLogger(__name__) + + +class Mag: + def __init__(self, mag: float, type: str, agency: str): + self.mag = mag + self.type = type + self.agency = agency + + def __str__(self): + return f"mag: {self.mag}, type: {self.type}, agency: {self.agency}" + + def toJSON(self): + return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4) + + +def parse(event: list[str]) -> dict[str, Any]: + # nordic must always have the first line a type 1 line + # but a type 1 line can have the id ommited if it's the first line + # if event[0][-1] != "1" or event[0][-1] != " ": + # return {} + + toParse: dict[str, list[str]] = defaultdict(list) + + for line in event: + toParse[line[-1]].append(line) + + for k, v in toParse.items(): + match k: + case "1": + parse_type_1(v) + case "3": + parse_type_3(v) + case _: + pass + + return {} + + +def parse_type_1(lines: list[str]) -> dict[str, Any] | None: + line1 = {} + for line in lines: + if "Date" not in line1.keys(): + dt = parse_dt(line[:21]) + dist_ind = line[20] + event_id = line[21] + lat = float(line[24:31]) + long = float(line[30:39]) + depth = float(line[38:44]) + mags = parse_mag(line[56:79]) + line1.update( + { + "DateTime": dt, + "Distance Indicator": dist_ind, + "Event ID": event_id, + "Latitude": lat, + "Longitude": long, + "Depth": depth, + "Magnitudes": mags, + } + ) + else: + mags = parse_mag(line[56:79]) + line1["Magnitudes"] += mags + + return line1 + + +def parse_type_3(lines: list[str]) -> dict[str, Any]: + comments = {"Sentido": "", "Regiao": "", "VZ": None, "SZ": None, "FE": None} + for line in lines: + if line.startswith(" SENTIDO"): + aux = line[:-2].split(":", maxsplit=1) + comments["Sentido"] = aux[1].strip() + elif line.startswith(" REGIAO"): + aux = line[:-2].split(":", maxsplit=1) + for item in aux[1].split(","): + if item.startswith("VZ"): + comments["VZ"] = int(item[2:]) + elif item.startswith("SZ"): + comments["SZ"] = int(item[2:]) + elif item.startswith("FE"): + comments["FE"] = int(item[2:5]) + else: + comments["Regiao"] = item[1:] + + print(comments) + return comments + + +def parse_type_6(): + pass + + +def parse_type_e(): + pass + + +def parse_type_7(): + pass + + +def parse_dt(_text: str, isStation=False) -> datetime | time: + if not isStation: + y = int(_text[0:5]) + mo = int(_text[6:8]) + d = int(_text[8:10]) + h = int(_text[11:13]) + m = int(_text[13:15]) + s_ms = int(float(_text[16:20]) * 1000) + s = s_ms // 1000 + s_ms = s_ms % 1000 + dt = datetime( + year=y, month=mo, day=d, hour=h, minute=m, second=s, microsecond=s_ms + ) + return dt + else: + h = int(_text[11:13]) + m = int(_text[13:15]) + s_ms = int(float(_text[16:20]) * 1000) + s = s_ms // 1000 + s_ms = s_ms % 1000 + dt = time(hour=h, minute=m, second=s, microsecond=s_ms) + return dt + + +def parse_mag(_text: str) -> list[Mag]: + mags = [] + for mag in textwrap.wrap(_text, 8): + if not utils.is_empty(mag): + mags.append(Mag(float(mag[:3]), mag[3], mag[4:])) + return mags diff --git a/utilsv2/parser.py b/utilsv2/parser.py index 72cc555..6938679 100644 --- a/utilsv2/parser.py +++ b/utilsv2/parser.py @@ -1,7 +1,9 @@ import logging from io import TextIOWrapper +from utilsv2 import utils from utilsv2.log import logger +from utilsv2.nordic import parse as n_parse logger = logging.getLogger(__name__) @@ -24,7 +26,7 @@ def find_events(fp: TextIOWrapper) -> list[tuple[int, int]]: for line in fp.read().split("\n"): if event_start == -1: event_start = idx - if is_empty(line): + if utils.is_empty(line): event_indices.append((event_start, idx)) event_start = -1 idx += 1 @@ -33,12 +35,13 @@ def find_events(fp: TextIOWrapper) -> list[tuple[int, int]]: return event_indices -def is_empty(_str: str) -> bool: - return len(_str.strip(" ")) == 0 - - def extract_event(fp: TextIOWrapper, event_bounds: list[tuple[int, int]]): - pass + lines = fp.read().split("\n") + print(len(lines)) + print(event_bounds) + + for event_idx in event_bounds: + n_parse(lines[event_idx[0] : event_idx[1]]) def parse(fname: str) -> None: @@ -50,3 +53,6 @@ def parse(fname: str) -> None: events = find_events(_ret) _ret.seek(0) extract_event(_ret, events) + + # cleanup + _ret.close() diff --git a/utilsv2/utils.py b/utilsv2/utils.py new file mode 100644 index 0000000..53d9d20 --- /dev/null +++ b/utilsv2/utils.py @@ -0,0 +1,2 @@ +def is_empty(_str: str) -> bool: + return len(_str.strip(" ")) == 0