Moved files, changed some things

2025-11-09 15:59:56 -01:00
parent b7719295ab
commit 5eed5fbe7a
8 changed files with 710 additions and 61 deletions

utils/crud.py (new file, +135 lines)

@@ -0,0 +1,135 @@
# pyright: basic
import pandas as pd
from . import parser
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 150)
HEADER_COLS = ["Data", "Distancia", "Tipo Ev", "Lat", "Long", "Prof", "Magnitudes"]
TABLE_READ_RET = ["Data", "Lat", "Long", "Distancia", "Tipo Ev"]
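# Assumed data layout (inferred from the helpers below): each event occupies
# consecutive rows that share one "ID"; the event's first row acts as its
# header and the remaining rows are its phase table.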
def _get_uniques(df) -> pd.DataFrame:
    return df.get(["ID", "Data", "Regiao"]).drop_duplicates(subset="ID", keep="first")

def _show_events(df):
    for (_, row) in df.iterrows():
        print(f"{row['ID']}: {row['Regiao']}")

def read_ids(df):
    ids = _get_uniques(df)
    _show_events(ids)
def read_header(df, event_id):
    # Event header information
    row = df[df["ID"] == event_id].iloc[0]
    cols = list(df.columns)
    # end = cols.index("ID") - 1
    # header_cols = cols[:end]
    # To select all the columns instead of just a few
    info = []
    for (i, col) in enumerate(HEADER_COLS):
        info.append(f"{i+1} {col}: {row[col]}")
    info_string = f"Header of event {event_id}:\n" + "\n".join(info)
    return info_string
def show_table(df, ret_cols=TABLE_READ_RET):
    print(df.loc[:, ret_cols])

def get_table(df, event_id):
    rows = df[df["ID"] == event_id]
    rows = rows.drop("ID", axis=1)
    return rows
def read_table_row(df, event_id, row_number_1):
    # Return a specific row of the event's table
    row_number_0 = row_number_1 - 1
    table = get_table(df, event_id)
    if row_number_0 < 0 or row_number_0 >= len(table):
        return f"Row {row_number_1} does not belong to event {event_id}."
    row = table.iloc[row_number_0]
    cols = list(df.columns)
    start = cols.index("Estacao")
    table_cols = cols[start:]
    info = []
    for (i, col) in enumerate(table_cols):
        info.append(f"{i+1} {col}: {row[col]}")
    return f"Row {row_number_1:02d} of event {event_id}:\n" + "\n".join(info)
def update_table_row(df, event_id, row_number_1, new_data):
    # Update a specific row of the event's table
    row_number_0 = row_number_1 - 1
    table = get_table(df, event_id)
    if row_number_0 < 0 or row_number_0 >= len(table):
        return f"Row {row_number_1} does not belong to event {event_id}."
    for key, value in new_data.items():
        if key in table.columns:
            df.loc[(df["ID"] == event_id) & (df.index == table.index[row_number_0]), key] = value
    return f"Row {row_number_1} of event {event_id} updated successfully."
def update_header(df, event_id, new_data):
    # Update an event's header (the event's first row)
    header_idx = df[df["ID"] == event_id].index[0]
    for key, value in new_data.items():
        if key in df.columns:
            df.loc[header_idx, key] = value
    return f"Header of event {event_id} updated successfully."
def delete_event(df, event_id):
    # Delete an entire event (header + table)
    new_df = df.drop(df[df["ID"] == event_id].index)
    print(f"Event {event_id} deleted!")
    return new_df

def delete_table_row(df, event_id, row_number_1):
    # Delete a specific row of the event's table
    row_number_0 = row_number_1 - 1
    table = get_table(df, event_id)
    if row_number_0 < 0 or row_number_0 >= len(table):
        return f"Row {row_number_1} does not belong to event {event_id}."
    new_df = df.drop(table.index[row_number_0])
    return new_df
def create_blank_event(df, event_id):
    # Create an empty event with one header row and one table row
    df.loc[df["ID"] >= event_id, "ID"] += 1
    blank_row_df = pd.DataFrame(columns=df.columns, index=[0, 1])
    blank_row_df["ID"] = event_id
    blank_row_df = blank_row_df.astype(df.dtypes)
    new_df = pd.concat([df, blank_row_df], ignore_index=True)
    new_df = new_df.sort_values(by="ID", kind="mergesort").reset_index(drop=True)
    return new_df
def create_table_row(df, event_id, row_number_1):
    event_rows = df[df["ID"] == event_id]
    if event_rows.empty:
        return df, f"Error: event with ID {event_id} not found."
    header_idx = event_rows.index[0]
    table_size = len(event_rows.index) - 1
    # Validate the position of the new row
    if not (1 <= row_number_1 <= table_size + 1):
        return df, f"Error: position {row_number_1} is invalid. Event {event_id} has {table_size} table row(s)."
    insertion_point = header_idx + row_number_1
    new_row_df = pd.DataFrame(columns=df.columns, index=[0])
    new_row_df["ID"] = event_id
    new_row_df = new_row_df.astype(df.dtypes)
    df_before = df.iloc[:insertion_point]
    df_after = df.iloc[insertion_point:]
    new_df = pd.concat([df_before, new_row_df, df_after], ignore_index=True)
    return new_df, f"Row inserted successfully at position {row_number_1} of event {event_id}."
def create_entire_database() -> pd.DataFrame:
    # TODO: not implemented yet
    raise NotImplementedError
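
For context, a minimal usage sketch of the CRUD layer above. This is a sketch under assumptions, not part of the commit: it assumes utils/ is importable as a package, and the event ID is only an example value taken from the ID line in utils/test_data.txt.

# Minimal sketch (hypothetical driver script at the repository root).
from utils import crud, parser

df = parser.parse("dados.txt")    # same input file the module's own entry point uses
event_id = 19960607132529         # example ID, taken from utils/test_data.txt
print(crud.read_header(df, event_id))
print(crud.read_table_row(df, event_id, row_number_1=1))
# Destructive helpers return a new DataFrame; callers must rebind it.
df = crud.delete_table_row(df, event_id, row_number_1=1)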

utils/parser.py (new file, +195 lines)

@@ -0,0 +1,195 @@
# pyright: basic
import io
import warnings
from collections import defaultdict
from datetime import datetime
import pandas as pd
# --- globals ---
DIST_IND = {"L": "Local", "R": "Regional", "D": "Distante"}
TYPE = {"Q": "Quake", "V": "Volcanic", "U": "Unknown", "E": "Explosion"}
# --- helper functions ---
def is_blank(l: str) -> bool:
    return len(l.strip(" ")) == 0

def parse_flt(v: str) -> float | None:
    try:
        return float(v)
    except ValueError:
        return None

def parse_int(v: str) -> int | None:
    try:
        return int(v)
    except ValueError:
        return None
def into_dataframe(data) -> pd.DataFrame:
    if len(data) == 0:
        return pd.DataFrame()
    return pd.DataFrame(data={k: [v] for (k, v) in data.items()})

def _concat(preamble, df: pd.DataFrame):
    # Attach each preamble (header) field to every row of the phase table
    for (k, v) in preamble.items():
        df.insert(len(df.columns) - 1, k, [v for _ in range(len(df))])
    return df

def validate_no_stations(expected: int, stations_df: pd.DataFrame) -> bool:
    unique_stations = stations_df["Estacao"].nunique()
    return expected == unique_stations
# --- main entry point ---
def parse(fname):
    with open(fname) as fp:
        data = fp.read().split("\n")
    chunks = boundaries(data)
    df = pd.DataFrame()
    for c in chunks:
        chunk_df = parse_chunk(data[c[0]:c[1]])
        df = pd.concat([df, chunk_df], axis=0, ignore_index=True)
    return df
def boundaries(data: list[str]):
    # (start, end) index pairs of the blank-line-delimited chunks
    boundaries = []
    start = None
    for (idx, l) in enumerate(data):
        if start is None:
            if not is_blank(l):
                start = idx
        else:
            if is_blank(l):
                boundaries.append((start, idx))
                start = None
    # Close the final chunk if the input does not end with a blank line
    if start is not None:
        boundaries.append((start, len(data)))
    return boundaries
def parse_chunk(chunk_lines: list[str]):
    h_idx = None
    for (idx, l) in enumerate(chunk_lines):
        if l[-1] == "7":
            h_idx = idx
            break
    preamble = _parse_preamble(chunk_lines[:h_idx])
    phases = _parse_type_7(chunk_lines[h_idx:])
    if not validate_no_stations(preamble["Estacoes"], phases):
        warnings.warn(f"Station count mismatch for event {preamble.get('ID')}")
    return _concat(preamble, phases)
def _parse_preamble(h_lines: list[str]):
    aux = defaultdict(list)
    for line in h_lines:
        match line[-1]:
            case "1":
                aux[1].append(line)
            case "3":
                aux[3].append(line)
            case "6":
                aux[6].append(line)
            case "E":
                pass
                # aux["E"].append(line)
            case "I":
                aux["I"].append(line)
            case "F":
                pass
                # aux["F"].append(line)
            case _:
                pass
    header_dict = dict()
    for (k, v) in aux.items():
        if len(v) != 0:
            header_dict.update(FUNCS[k](v))
    return header_dict
def _parse_type_1(data: list[str]):
    # Fixed-width fields of a type-1 (hypocenter) line
    aux = data[0]
    y = int(aux[1:5])
    mo = int(aux[6:8])
    d = int(aux[8:10])
    h = int(aux[11:13])
    m = int(aux[13:15])
    s = int(aux[16:18])
    micro = int(aux[19]) * 10**5  # tenths of a second -> microseconds
    dt = datetime(y, mo, d, h, m, s, micro)
    dist_ind = DIST_IND[aux[21]]
    ev_type = TYPE[aux[22]]
    lat = float(aux[23:30])
    long = float(aux[30:38])
    depth = float(aux[38:43])
    no_stat = int(aux[48:51])
    hypo = {"Data": dt.isoformat(), "Distancia": dist_ind, "Tipo Ev": ev_type,
            "Lat": lat, "Long": long, "Prof": depth, "Estacoes": no_stat,
            "Magnitudes": list()}
    for l in data:
        hypo["Magnitudes"] = hypo["Magnitudes"] + _parse_mag(l)
    return hypo
def _parse_mag(line: str):
    # Up to three magnitude/type pairs live in fixed-width slots at the end of the line
    magnitudes = []
    base = 55
    while base < 79:
        m = line[base:base + 4]
        mt = line[base + 4:base + 5]
        if not is_blank(m):
            magnitudes.append({"Magnitude": m, "Tipo": mt})
        base += 8
    return magnitudes
def _parse_type_3(data: list[str]):
    comments = {}
    for line in data:
        if line.startswith(" SENTIDO") or line.startswith(" REGIAO"):
            c, v = line[:-2].strip().split(": ", maxsplit=1)
            comments[c.capitalize()] = v
    return comments

def _parse_type_6(data: list[str]):
    waves = []
    for l in data:
        waves.append(l.strip().split(" ")[0])
    return {"Onda": waves}
def _parse_type_7(data: list[str]):
    aux = io.StringIO("\n".join(data))
    dados = pd.read_fwf(aux, colspecs=[(1, 5), (6, 8), (10, 15), (18, 20), (20, 22), (23, 28), (34, 38)])
    dados.rename(columns={"STAT": "Estacao", "SP": "Componente", "PHASW": "Tipo Onda",
                          "HR": "Hora", "MM": "Min", "SECON": "Seg", "AMPL": "Amplitude"},
                 inplace=True)
    return dados

def _parse_type_e(data: list[str]):
    aux = data[0]
    error = {"Gap": int(aux[5:8]), "Origin": float(aux[14:20]),
             "Error_lat": float(aux[24:30]), "Error_long": float(aux[32:38]),
             "Error_depth": float(aux[38:43]), "Cov_xy": float(aux[43:55]),
             "Cov_xz": float(aux[55:67]), "Cov_yz": float(aux[67:79])}
    return error

def _parse_type_i(data: list[str]):
    aux = data[0]
    return {"ID": int(aux[60:74])}

FUNCS = {1: _parse_type_1, 3: _parse_type_3, 6: _parse_type_6, "E": _parse_type_e, "I": _parse_type_i}
parse("dados.txt")

utils/test_data.txt (new file, +31 lines)

@@ -0,0 +1,31 @@
1996 6 7 1325 29.2 L 59.846 5.130 12.0F TES 12 .60 1.9LTES 2.2CTES 2.0LNAO1
GAP=177 2.78 4.5 12.80.000 0.2239E+02 0.6258E+03 -0.2817E+03E
1996 6 7 1325 30.5 L 59.763 5.396 29.2 NAO 2 1.0 2.0LNAO1
8.3 41.0 74.7 1 F
1996-06-07-1324-51S.TEST__009 6
ACTION:SPL 14-12-11 12:04 OP:jh STATUS: ID:19960607132529 L I
STAT COM NTLO IPHASE W HHMM SS.SSS PAR1 PAR2 AGA OPE AIN RES W DIS CAZ7
EGD HHZ NS IP 4 1325 35.950 C BER jh 120.0-1.131047.70 6
EGD HHZ NS END 1325 35.950 111.0 BER jh 0.0 47.70 6
EGD HHZ NS AMP 1325 35.950 11.1 33.3 BER jh 47.70 6
EGD HHN NS ES 1325 42.030 BER jh 70.0-.8901047.70 6
BER BHZ NS00 IP 1325 38.120 C kkk AUT -.9801061.00 11
BER BHZ NS00 END 1325 38.120 55.0 BER jh 4.8 61.00 11
BER BHN NS00 ES 1325 45.440 BER jh 70.0-.9901061.00 11
BER BHZ NS00 IAML A1325 46.710 31.7 0.20 BER jh 0.4 61.00 11
KMY BHZ NS10 IP 1325 40.260 C PPP Ajh 70.0 .3301070.90 175
KMY BHZ NS10 END 1325 40.260 62.0 BER jh 70.90 175
KMY BHN NS10 ES 1325 48.740 BER jh 70.0.3001070.90 175
KMY BHZ NS10 IAML 1325 48.920 83.6 0.20 BER jh 70.90 175
ASK SHZ NS EP 2 1325 39.590 D -1.031071.10 3
ASK SHZ NS END 1325 39.590 68.0 71.10 3
ASK SHZ NS ES 1325 48.070 -1.021071.10 3
ASK SHZ NS AMP 1325 48.070 333.3 2.20 71.10 3
ASK SHZ NS IAML 1325 50.900 111.0 0.30 71.10 3
NRA0 S Z Pn A1326 19.090 50.0-.05010368.0 72
NRA0 S Z END 1326 19.090 333.0 368.0 72
NRA0 S Z BAZ-P 1326 19.090 256.9 6.9 0. 368.0 72
NRA0 S Z Pg 1326 27.940 -.64010368.0 72
NRA0 S Z BAZ 1326 27.940 253.0 7.3 -3. 368.0 72
NRA0 S Z Lg 1327 10.540 -.89010368.0 72
NRA0 S Z BAZ 1327 10.540 266.6 4.1 9. 368.0 72

utils/tests.py (new file, +50 lines)

@@ -0,0 +1,50 @@
import parser

def test_type_1():
    test_data = [" 1996 6 7 1325 29.2 L 59.846 5.130 12.0F TES 12 .60 1.9LTES 2.2CTES 2.0LNAO1",
                 " 1996 6 7 1325 30.5 L 59.763 5.396 29.2 NAO 2 1.0 2.0LNAO1"]
    expected = {"DateTime": "1996-06-07T13:25:29.200000", "Distance Indicator": "L", "Event ID": " ",
                "Lat": 59.846, "Long": 5.13, "Depth": 12.0, "Agency": "TES",
                "Magnitudes": [{"M": " 1.9", "T": "L"}, {"M": " 2.2", "T": "C"},
                               {"M": " 2.0", "T": "L"}, {"M": " 2.0", "T": "L"}]}
    _ret = parser._parse_type_1(test_data)
    for (k, v) in _ret.items():
        assert _ret[k] == expected[k]
def test_type_3():
    test_data = [" OP: CVUA-RM/RC 3",
                 " STATUS: OK SENTIDO 3",
                 " SENTIDO: II/III -Pico: S. Caetano 3",
                 " PUB: NAO 3",
                 " WEB: SIM 3",
                 " OBS: Por ordem do CT nao foi emitido novo comunicado 3",
                 " OBS: Sismo sobreposto 3",
                 " REGIAO: Pico,VZ14,SZ06,FE95 405 3"]
    _ret = parser._parse_type_3(test_data)
    # Only the SENTIDO and REGIAO lines are kept
    assert len(_ret) == 2
def test_type_6():
    test_data = [" 1996-06-03-2002-18S.TEST__012 6",
                 " 1996-06-03-1917-52S.TEST__002 6"]
    expected = {"Onda": ["1996-06-03-2002-18S.TEST__012", "1996-06-03-1917-52S.TEST__002"]}
    _ret = parser._parse_type_6(test_data)
    for (k, v) in _ret.items():
        assert _ret[k] == expected[k]
def test_type_i():
    test_data = [" ACTION:SPL 08-10-02 10:19 OP:jh STATUS: ID:19960603195540 I"]
    expected = {"ID": 19960603195540}
    _ret = parser._parse_type_i(test_data)
    for (k, v) in _ret.items():
        assert _ret[k] == expected[k]
def test_type_e():
    test_data = [" GAP=348 2.88 999.9 999.9999.9 -0.1404E+08 -0.3810E+08 0.1205E+09E"]
    expected = {"Gap": 348, "Origin": 2.88, "Error_lat": 999.9, "Error_long": 999.9,
                "Error_depth": 999.9, "Cov_xy": -14040000.0, "Cov_xz": -38100000.0,
                "Cov_yz": 120500000.0}
    _ret = parser._parse_type_e(test_data)
    for (k, v) in _ret.items():
        assert _ret[k] == expected[k]

utils/vis.py (new file, empty)