Re-implemnting the parser, yay

This commit is contained in:
2025-12-23 00:19:13 -01:00
parent d9cca721c9
commit 1bb945f7e6
3 changed files with 156 additions and 6 deletions

View File

@@ -0,0 +1,142 @@
import json
import logging
import textwrap
from collections import defaultdict
from datetime import datetime, time
from typing import Any
from utilsv2 import utils
from utilsv2.log import logger
logger = logging.getLogger(__name__)
class Mag:
def __init__(self, mag: float, type: str, agency: str):
self.mag = mag
self.type = type
self.agency = agency
def __str__(self):
return f"mag: {self.mag}, type: {self.type}, agency: {self.agency}"
def toJSON(self):
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)
def parse(event: list[str]) -> dict[str, Any]:
# nordic must always have the first line a type 1 line
# but a type 1 line can have the id ommited if it's the first line
# if event[0][-1] != "1" or event[0][-1] != " ":
# return {}
toParse: dict[str, list[str]] = defaultdict(list)
for line in event:
toParse[line[-1]].append(line)
for k, v in toParse.items():
match k:
case "1":
parse_type_1(v)
case "3":
parse_type_3(v)
case _:
pass
return {}
def parse_type_1(lines: list[str]) -> dict[str, Any] | None:
line1 = {}
for line in lines:
if "Date" not in line1.keys():
dt = parse_dt(line[:21])
dist_ind = line[20]
event_id = line[21]
lat = float(line[24:31])
long = float(line[30:39])
depth = float(line[38:44])
mags = parse_mag(line[56:79])
line1.update(
{
"DateTime": dt,
"Distance Indicator": dist_ind,
"Event ID": event_id,
"Latitude": lat,
"Longitude": long,
"Depth": depth,
"Magnitudes": mags,
}
)
else:
mags = parse_mag(line[56:79])
line1["Magnitudes"] += mags
return line1
def parse_type_3(lines: list[str]) -> dict[str, Any]:
comments = {"Sentido": "", "Regiao": "", "VZ": None, "SZ": None, "FE": None}
for line in lines:
if line.startswith(" SENTIDO"):
aux = line[:-2].split(":", maxsplit=1)
comments["Sentido"] = aux[1].strip()
elif line.startswith(" REGIAO"):
aux = line[:-2].split(":", maxsplit=1)
for item in aux[1].split(","):
if item.startswith("VZ"):
comments["VZ"] = int(item[2:])
elif item.startswith("SZ"):
comments["SZ"] = int(item[2:])
elif item.startswith("FE"):
comments["FE"] = int(item[2:5])
else:
comments["Regiao"] = item[1:]
print(comments)
return comments
def parse_type_6():
pass
def parse_type_e():
pass
def parse_type_7():
pass
def parse_dt(_text: str, isStation=False) -> datetime | time:
if not isStation:
y = int(_text[0:5])
mo = int(_text[6:8])
d = int(_text[8:10])
h = int(_text[11:13])
m = int(_text[13:15])
s_ms = int(float(_text[16:20]) * 1000)
s = s_ms // 1000
s_ms = s_ms % 1000
dt = datetime(
year=y, month=mo, day=d, hour=h, minute=m, second=s, microsecond=s_ms
)
return dt
else:
h = int(_text[11:13])
m = int(_text[13:15])
s_ms = int(float(_text[16:20]) * 1000)
s = s_ms // 1000
s_ms = s_ms % 1000
dt = time(hour=h, minute=m, second=s, microsecond=s_ms)
return dt
def parse_mag(_text: str) -> list[Mag]:
mags = []
for mag in textwrap.wrap(_text, 8):
if not utils.is_empty(mag):
mags.append(Mag(float(mag[:3]), mag[3], mag[4:]))
return mags

View File

@@ -1,7 +1,9 @@
import logging import logging
from io import TextIOWrapper from io import TextIOWrapper
from utilsv2 import utils
from utilsv2.log import logger from utilsv2.log import logger
from utilsv2.nordic import parse as n_parse
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -24,7 +26,7 @@ def find_events(fp: TextIOWrapper) -> list[tuple[int, int]]:
for line in fp.read().split("\n"): for line in fp.read().split("\n"):
if event_start == -1: if event_start == -1:
event_start = idx event_start = idx
if is_empty(line): if utils.is_empty(line):
event_indices.append((event_start, idx)) event_indices.append((event_start, idx))
event_start = -1 event_start = -1
idx += 1 idx += 1
@@ -33,12 +35,13 @@ def find_events(fp: TextIOWrapper) -> list[tuple[int, int]]:
return event_indices return event_indices
def is_empty(_str: str) -> bool:
return len(_str.strip(" ")) == 0
def extract_event(fp: TextIOWrapper, event_bounds: list[tuple[int, int]]): def extract_event(fp: TextIOWrapper, event_bounds: list[tuple[int, int]]):
pass lines = fp.read().split("\n")
print(len(lines))
print(event_bounds)
for event_idx in event_bounds:
n_parse(lines[event_idx[0] : event_idx[1]])
def parse(fname: str) -> None: def parse(fname: str) -> None:
@@ -50,3 +53,6 @@ def parse(fname: str) -> None:
events = find_events(_ret) events = find_events(_ret)
_ret.seek(0) _ret.seek(0)
extract_event(_ret, events) extract_event(_ret, events)
# cleanup
_ret.close()

2
utilsv2/utils.py Normal file
View File

@@ -0,0 +1,2 @@
def is_empty(_str: str) -> bool:
return len(_str.strip(" ")) == 0