Compare commits

..

4 Commits

Author SHA1 Message Date
6a5bec73b0 feat: parser_type_7 adicionado, falta adicionar nos returns 2025-10-30 22:47:12 -01:00
d7e351909e feat: Adicionado testes 2025-10-30 22:01:44 -01:00
82912bcbe8 feat: Adicionar o parsing do tipo E
fix: ligeiras mudanças no parser de tipo 1 e mudanças de
nomes de variaveis
2025-10-30 20:55:25 -01:00
1f7075041c feat: Esqueleto do parser feito
feat: Parse do tipo 1 implementado
2025-10-30 19:37:46 -01:00
5 changed files with 255 additions and 0 deletions

View File

@@ -14,6 +14,9 @@ First, let's represent the data using Python's Pandas module and implement CRUD
- T1 a T4 -> 10 de novembro - T1 a T4 -> 10 de novembro
- (a definir) - (a definir)
## Apontamentos
Dados parecem estar no formato [Nordic](https://seisan.info/v13/node259.html)
## Bibliografia ## Bibliografia
- [Pandas lib](https://pandas.pydata.org/docs) - [Pandas lib](https://pandas.pydata.org/docs)

170
parser.py
View File

@@ -0,0 +1,170 @@
from collections import defaultdict
from datetime import datetime, time
def is_blank(l: str) -> bool:
    """Return True when *l* is empty or holds nothing but space characters.

    Only the space character is stripped, so a line containing tabs or other
    whitespace is reported as non-blank.
    """
    return not l.strip(" ")
def parse_flt(v:str) -> float | None:
try:
t = float(v)
return t
except ValueError:
return None
def parse_int(v:str) -> int | None:
try:
t = int(v)
return t
except ValueError:
return None
def parse(path: str = "dados.txt"):
    """Read a data file and parse every blank-line-separated chunk in it.

    Args:
        path: File to read. Defaults to "dados.txt", the file the original
            hard-coded version opened.

    Returns:
        A list with the value returned by ``parse_chunk`` for each chunk,
        in file order.
    """
    # The context manager closes the file even if reading fails; the file is
    # fully read before any parsing starts, so it is not held open meanwhile.
    with open(path) as fp:
        data = fp.read().split("\n")

    return [parse_chunk(data[start:end]) for start, end in boundaries(data)]
def boundaries(data: list[str]):
    """Locate the runs of consecutive non-blank lines in *data*.

    Args:
        data: Lines of the input file.

    Returns:
        A list of ``(start, end)`` index pairs, one per run, where ``start``
        is the first non-blank line and ``end`` is exclusive, so
        ``data[start:end]`` yields the run.
    """
    spans = []
    start = None
    for idx, line in enumerate(data):
        blank = is_blank(line)
        if start is None:
            if not blank:
                start = idx
        elif blank:
            spans.append((start, idx))
            start = None

    # A run that reaches the end of the input has no blank line to close it;
    # emit it here so the final chunk is not silently lost.
    if start is not None:
        spans.append((start, len(data)))
    return spans
def parse_chunk(chunk_lines: list[str]):
    """Parse the header portion of one chunk of lines.

    The header is everything before the first line whose last character is
    "7"; when no such line exists the whole chunk is treated as header.

    Args:
        chunk_lines: The non-blank lines that make up one chunk.

    Returns:
        The dictionary produced by ``parse_header`` for the header lines.
    """
    header_end = next(
        (pos for pos, line in enumerate(chunk_lines) if line[-1] == "7"),
        None,
    )
    # TODO: implement parsing of the phase lines (parse_type_7)
    return parse_header(chunk_lines[:header_end])
def parse_header(hLines: list[str]):
    """Group header lines by their type marker and parse each group.

    The last character of every line selects its type. Lines of the same
    type are collected (in input order) and handed to the matching parser
    in ``FUNCS``; the resulting dictionaries are merged into one.

    Args:
        hLines: Header lines of a chunk.

    Returns:
        A dict combining the output of every per-type parser that ran.
        Empty when *hLines* is empty.

    Raises:
        NotImplementedError: If a line ends with a type marker that has no
            parser.
        IndexError: If a line is an empty string.
    """
    # Trailing type character -> key used in the FUNCS dispatch table.
    type_keys = {"1": 1, "3": 3, "6": 6, "E": "E", "I": "I", "F": "F"}

    grouped = defaultdict(list)
    for line in hLines:
        key = type_keys.get(line[-1])
        if key is None:
            # NotImplemented is a comparison sentinel, not an exception;
            # raising it would surface as an unrelated TypeError.
            raise NotImplementedError(
                f"unsupported header line type: {line[-1]!r}"
            )
        grouped[key].append(line)

    header = {}
    for key, lines in grouped.items():
        header.update(FUNCS[key](lines))
    return header
def parse_mag(line: str):
    """Extract the magnitude entries stored in fixed columns of *line*.

    Three 8-character slots are inspected, starting at offsets 55, 63 and
    71. In each slot the first four characters are the magnitude text and
    the fifth is its type character. Slots whose magnitude text is empty or
    only spaces are skipped.

    Args:
        line: A fixed-width line; it must be long enough to index offset 75.

    Returns:
        A list of ``{"M": <magnitude text>, "T": <type char>}`` dicts, with
        the magnitude kept as the raw (unstripped) string.
    """
    found = []
    for offset in range(55, 79, 8):
        value = line[offset:offset + 4]
        kind = line[offset + 4]
        if value.strip(" "):
            found.append({"M": value, "T": kind})
    return found
def parse_type_1(data: list[str]):
    """Build the hypocenter record from the type-1 lines of a chunk.

    Date, time, location and agency are read from fixed columns of the
    first line only; magnitudes are gathered from every line in *data*.

    Args:
        data: Type-1 lines of one chunk; must contain at least one line.

    Returns:
        A dict with keys "DateTime" (ISO string), "Distance Indicator",
        "Event ID", "Lat", "Long", "Depth", "Agency" and "Magnitudes"
        (the concatenated ``parse_mag`` results for all lines).
    """
    first = data[0]

    origin = datetime(
        int(first[1:5]),            # year
        int(first[6:8]),            # month
        int(first[8:10]),           # day
        int(first[11:13]),          # hour
        int(first[13:15]),          # minute
        int(first[16:18]),          # whole seconds
        int(first[19]) * 10**5,     # single tenths digit -> microseconds
    )
    distance_indicator = first[21]
    event_id = first[22]
    latitude = float(first[23:30])
    longitude = float(first[30:38])
    depth = float(first[38:43])
    agency = first[45:48]

    magnitudes = []
    for line in data:
        magnitudes.extend(parse_mag(line))

    return {
        "DateTime": origin.isoformat(),
        "Distance Indicator": distance_indicator,
        "Event ID": event_id,
        "Lat": latitude,
        "Long": longitude,
        "Depth": depth,
        "Agency": agency,
        "Magnitudes": magnitudes,
    }
def parse_type_3(data: list[str]):
    """Collect the free-text comments carried by type-3 lines.

    Each line ends with its one-character type marker. Only that final
    character is removed before trimming surrounding whitespace, so text
    that runs right up to the marker is preserved in full.

    Args:
        data: Type-3 lines of one chunk.

    Returns:
        ``{"Comments": [...]}`` with one stripped string per input line.
    """
    return {"Comments": [line[:-1].strip() for line in data]}
def parse_type_6(data: list[str]):
    """Extract the first space-delimited token of each type-6 line.

    Args:
        data: Type-6 lines of one chunk.

    Returns:
        ``{"Wave": [...]}`` holding, for every line, the text before the
        first space once leading and trailing whitespace has been removed.
    """
    return {"Wave": [entry.strip().partition(" ")[0] for entry in data]}
def parse_type_7(data: list[str]):
    """Parse phase lines into a list of per-phase dictionaries.

    Every field is read from a fixed column range of the line (the data is
    presumed to follow the Nordic format). Optional numeric fields go
    through ``parse_flt`` / ``parse_int`` and become None when blank or
    malformed; the time, "Distance" and "CAZ" fields are mandatory and raise
    ValueError when they cannot be converted.

    Args:
        data: Phase lines to parse. NOTE(review): presumably the column
            header line that ends in "7" must be excluded by the caller,
            since its time columns are not numeric - confirm.

    Returns:
        ``{"Phases": [...]}`` with one dict per input line.
    """
    phases = []
    for l in data:
        h = int(l[18:20])
        m = int(l[20:22])
        sec = int(l[23:25])
        # Two digits after the decimal point are hundredths of a second;
        # scale them to the microseconds expected by ``time``.
        mil = int(l[26:28]) * 10**4
        t = time(h, m, sec, mil)
        phases.append({
            # NOTE(review): the key "Stat:" contains a colon and "Weigth"
            # looks misspelled; both are kept unchanged so the shape of the
            # returned dict stays the same.
            "Stat:": l[1:5],
            "Com": l[6:10],
            "I": l[9].strip(),
            "Phase": l[10:15].strip(),
            "Polarity": l[16].strip(),
            "Time": t.isoformat(),
            "Duration": parse_flt(l[29:33]),
            "Amplitude": parse_flt(l[34:40]),
            "Period": parse_flt(l[41:45]),
            "Azimuth": parse_flt(l[46:51]),
            "Velocity": parse_int(l[52:56]),
            "AIN": parse_int(l[57:60]),
            "AR": l[61:63],
            "Travel Time": parse_flt(l[63:67]),
            "Weigth": parse_int(l[67:70]),
            "Distance": float(l[71:75]),
            "CAZ": int(l[76:79]),
        })
    return {"Phases": phases}
def parse_type_e(data: list[str]):
    """Parse the error estimates from the first type-E line.

    Args:
        data: Type-E lines of one chunk; only ``data[0]`` is read.

    Returns:
        A dict with the integer "Gap" and the float fields "Origin",
        "Error_lat", "Error_long", "Error_depth", "Cov_xy", "Cov_xz" and
        "Cov_yz", each converted from its fixed column range.

    Raises:
        ValueError: If any of the column ranges is not a valid number.
    """
    line = data[0]
    # (output key, converter, start column, end column) for each field.
    layout = (
        ("Gap", int, 5, 8),
        ("Origin", float, 14, 20),
        ("Error_lat", float, 24, 30),
        ("Error_long", float, 32, 38),
        ("Error_depth", float, 38, 43),
        ("Cov_xy", float, 43, 55),
        ("Cov_xz", float, 55, 67),
        ("Cov_yz", float, 67, 79),
    )
    return {key: convert(line[lo:hi]) for key, convert, lo, hi in layout}
def parse_type_f(data: list[str]):
    """Placeholder parser for type-F lines.

    The lines are accepted but nothing is extracted from them yet.

    Args:
        data: Type-F lines of one chunk (currently ignored).

    Returns:
        An empty dict, so merging it into the header adds no keys.
    """
    return dict()
def parse_type_i(data: list[str]):
    """Parse the action/ID information from the first type-I line.

    Args:
        data: Type-I lines of one chunk; only ``data[0]`` is read.

    Returns:
        ``{"Action": <3-char code>, "Action Extra": {...}}`` where the inner
        dict holds "Date" (ISO string parsed from a ``YY-MM-DD HH:MM``
        stamp), "OP", "Status" (both stripped) and the integer "ID".

    Raises:
        ValueError: If the date stamp or the ID columns are malformed.
    """
    line = data[0]
    stamp = datetime.strptime(line[12:26], "%y-%m-%d %H:%M")
    extra = {
        "Date": stamp.isoformat(),
        "OP": line[30:35].strip(),
        "Status": line[42:57].strip(),
        "ID": int(line[60:74]),
    }
    return {"Action": line[8:11], "Action Extra": extra}
# Dispatch table used by parse_header: header line type -> parser for the
# lines of that type. parse_type_7 is not registered here because phase
# lines are not part of the header.
FUNCS = {
    1: parse_type_1,
    3: parse_type_3,
    6: parse_type_6,
    "E": parse_type_e,
    "F": parse_type_f,
    "I": parse_type_i,
}

# Only parse the data file when executed as a script; importing this module
# (as the test suite does) must not try to open the data file.
if __name__ == "__main__":
    parse()

1
requirements.txt Normal file
View File

@@ -0,0 +1 @@
pytest==8.4.2

31
test_data.txt Normal file
View File

@@ -0,0 +1,31 @@
1996 6 7 1325 29.2 L 59.846 5.130 12.0F TES 12 .60 1.9LTES 2.2CTES 2.0LNAO1
GAP=177 2.78 4.5 12.80.000 0.2239E+02 0.6258E+03 -0.2817E+03E
1996 6 7 1325 30.5 L 59.763 5.396 29.2 NAO 2 1.0 2.0LNAO1
8.3 41.0 74.7 1 F
1996-06-07-1324-51S.TEST__009 6
ACTION:SPL 14-12-11 12:04 OP:jh STATUS: ID:19960607132529 L I
STAT COM NTLO IPHASE W HHMM SS.SSS PAR1 PAR2 AGA OPE AIN RES W DIS CAZ7
EGD HHZ NS IP 4 1325 35.950 C BER jh 120.0-1.131047.70 6
EGD HHZ NS END 1325 35.950 111.0 BER jh 0.0 47.70 6
EGD HHZ NS AMP 1325 35.950 11.1 33.3 BER jh 47.70 6
EGD HHN NS ES 1325 42.030 BER jh 70.0-.8901047.70 6
BER BHZ NS00 IP 1325 38.120 C kkk AUT -.9801061.00 11
BER BHZ NS00 END 1325 38.120 55.0 BER jh 4.8 61.00 11
BER BHN NS00 ES 1325 45.440 BER jh 70.0-.9901061.00 11
BER BHZ NS00 IAML A1325 46.710 31.7 0.20 BER jh 0.4 61.00 11
KMY BHZ NS10 IP 1325 40.260 C PPP Ajh 70.0 .3301070.90 175
KMY BHZ NS10 END 1325 40.260 62.0 BER jh 70.90 175
KMY BHN NS10 ES 1325 48.740 BER jh 70.0.3001070.90 175
KMY BHZ NS10 IAML 1325 48.920 83.6 0.20 BER jh 70.90 175
ASK SHZ NS EP 2 1325 39.590 D -1.031071.10 3
ASK SHZ NS END 1325 39.590 68.0 71.10 3
ASK SHZ NS ES 1325 48.070 -1.021071.10 3
ASK SHZ NS AMP 1325 48.070 333.3 2.20 71.10 3
ASK SHZ NS IAML 1325 50.900 111.0 0.30 71.10 3
NRA0 S Z Pn A1326 19.090 50.0-.05010368.0 72
NRA0 S Z END 1326 19.090 333.0 368.0 72
NRA0 S Z BAZ-P 1326 19.090 256.9 6.9 0. 368.0 72
NRA0 S Z Pg 1326 27.940 -.64010368.0 72
NRA0 S Z BAZ 1326 27.940 253.0 7.3 -3. 368.0 72
NRA0 S Z Lg 1327 10.540 -.89010368.0 72
NRA0 S Z BAZ 1327 10.540 266.6 4.1 9. 368.0 72

50
tests.py Normal file
View File

@@ -0,0 +1,50 @@
import pytest
import parser
def test_type_1():
    """parse_type_1 builds the hypocenter record from two type-1 lines."""
    test_data = [
        " 1996 6 7 1325 29.2 L 59.846 5.130 12.0F TES 12 .60 1.9LTES 2.2CTES 2.0LNAO1",
        " 1996 6 7 1325 30.5 L 59.763 5.396 29.2 NAO 2 1.0 2.0LNAO1",
    ]
    expected = {
        "DateTime": "1996-06-07T13:25:29.200000",
        "Distance Indicator": "L",
        "Event ID": " ",
        "Lat": 59.846,
        "Long": 5.13,
        "Depth": 12.0,
        "Agency": "TES",
        "Magnitudes": [
            {"M": " 1.9", "T": "L"},
            {"M": " 2.2", "T": "C"},
            {"M": " 2.0", "T": "L"},
            {"M": " 2.0", "T": "L"},
        ],
    }
    _ret = parser.parse_type_1(test_data)
    # Compare whole dicts so a key missing from the result fails the test
    # instead of going unnoticed.
    assert _ret == expected
def test_type_3():
    """parse_type_3 returns one comment per type-3 line."""
    test_data = [
        " OP: CVUA-RM/RC 3",
        " STATUS: OK SENTIDO 3",
        " SENTIDO: II/III -Pico: S. Caetano 3",
        " PUB: NAO 3",
        " WEB: SIM 3",
        " OBS: Por ordem do CT nao foi emitido novo comunicado 3",
        " OBS: Sismo sobreposto 3",
        " REGIAO: Pico,VZ14,SZ06,FE95 405 3",
    ]
    comments = parser.parse_type_3(test_data)["Comments"]
    assert len(comments) == 8
def test_type_6():
    """parse_type_6 extracts the waveform file name from each type-6 line."""
    test_data = [
        " 1996-06-03-2002-18S.TEST__012 6",
        " 1996-06-03-1917-52S.TEST__002 6",
    ]
    expected = {
        "Wave": [
            "1996-06-03-2002-18S.TEST__012",
            "1996-06-03-1917-52S.TEST__002",
        ]
    }
    _ret = parser.parse_type_6(test_data)
    # Compare whole dicts so a key missing from the result fails the test
    # instead of going unnoticed.
    assert _ret == expected
def test_type_i():
    """parse_type_i extracts action, date, operator, status and ID."""
    test_data = [" ACTION:SPL 08-10-02 10:19 OP:jh STATUS: ID:19960603195540 I"]
    expected = {
        "Action": "SPL",
        "Action Extra": {
            "Date": "2008-10-02T10:19:00",
            "OP": "jh",
            "Status": "",
            "ID": 19960603195540,
        },
    }
    _ret = parser.parse_type_i(test_data)
    # Compare whole dicts so a key missing from the result fails the test
    # instead of going unnoticed.
    assert _ret == expected
def test_type_e():
    """parse_type_e converts the gap, error and covariance fields."""
    test_data = [" GAP=348 2.88 999.9 999.9999.9 -0.1404E+08 -0.3810E+08 0.1205E+09E"]
    expected = {
        "Gap": 348,
        "Origin": 2.88,
        "Error_lat": 999.9,
        "Error_long": 999.9,
        "Error_depth": 999.9,
        "Cov_xy": -14040000.0,
        "Cov_xz": -38100000.0,
        "Cov_yz": 120500000.0,
    }
    _ret = parser.parse_type_e(test_data)
    # Compare whole dicts so a key missing from the result fails the test
    # instead of going unnoticed.
    assert _ret == expected