Compare commits

...

40 Commits

Author SHA1 Message Date
05d8de4902 Misc v2 2026-01-05 15:29:49 -01:00
9d65d4a862 misc 2026-01-04 22:35:49 -01:00
9276ff0c47 Fix in the statistics 2026-01-04 19:26:19 -01:00
c275bc7065 More things, statistics 2026-01-04 18:51:37 -01:00
1d1826ebe5 Fix to magnitude agency 2026-01-02 23:01:08 -01:00
21f5d29a53 Generate fake data for testing 2026-01-02 22:47:47 -01:00
1e6551a2b1 (feat): Insertion to mongoDB done 2025-12-28 16:22:42 -01:00
1bb945f7e6 Re-implementing the parser, yay 2025-12-23 18:40:53 -01:00
d9cca721c9 Yeet everything, rebuild from scratch poggies! 2025-12-22 19:15:52 -01:00
7a4de947c6 WHOOPS 2025-12-16 16:58:37 -01:00
c6bd1eb669 The rest of the comments 2025-12-13 18:19:38 -01:00
3e0814057f doc: Comments on every function
fix: remove dead or unnecessary code
2025-12-13 17:56:55 -01:00
96aaeed19f merging 2025-12-12 21:46:48 -01:00
smolsbs f7d1992595 Merge pull request #7 from aulojor/main
T5, 6, 7
2025-12-12 21:38:44 -01:00
Paulo Jorge Medeiros Alexandre 10075123d8 fix: correct the name of the Prof column 2025-12-12 20:53:47 -01:00
Paulo Jorge Medeiros Alexandre 2573cfaf13 fhiiiofhweoi
aaaaaaaa
2025-12-12 20:29:23 -01:00
b3d9a31792 more statistics things 2025-12-11 15:42:07 -01:00
14dee58ab2 graphs, statistics and filters 2025-12-11 15:14:38 -01:00
490c88085a a 2025-12-11 07:56:11 -01:00
aulojor 991d372baf plotly vis 2025-12-04 18:09:13 -01:00
aulojor a9839e64bf feat: replace guardar_json with utils.save_as_json
I changed save_as_json a bit and it may have turned out slightly ugly, because I did not quite know how to use the EVENT_COLS and STATION_COLS from inside utils.py
2025-11-15 22:26:26 -01:00
afef4c4d5c new json 2025-11-15 16:05:41 -01:00
047f5e25ac more things 2025-11-15 15:34:07 -01:00
827e9b5c77 feat: Implemented a custom JSON format 2025-11-13 15:18:51 -01:00
4da96e8e74 Merge 2025-11-11 14:11:20 -01:00
smolsbs ec17137f4b Merge pull request #5 from aulojor/main
Formatted JSON, row creation with bounds, bounds for row deletion, time added to TABLE_RET
2025-11-11 13:55:08 -01:00
aulojor 6826941c55 feat: create rows and bounds checks for rows, add time to the visualization 2025-11-11 13:27:29 -01:00
aulojor 12d23d2c0c feat: indent the JSON output 2025-11-11 12:34:14 -01:00
599e456fcf T1 to T4 implemented, probably 2025-11-09 22:57:16 -01:00
74dea1c653 a 2025-11-09 22:29:34 -01:00
1645645017 Change how the events are shown 2025-11-09 22:29:01 -01:00
smolsbs 47f6ff5ca4 Merge pull request #4 from aulojor/main
feat: load a JSON if the file passed as [1] is a JSON
2025-11-09 21:48:06 -01:00
aulojor bb10d808f8 feat: add editing of a row to the menu 2025-11-09 21:28:19 -01:00
aulojor 987324f7f1 feat: load a JSON if the file passed as [1] is a JSON 2025-11-09 21:06:17 -01:00
31563ed0da Merge changes 2025-11-09 20:56:41 -01:00
8fa3b1ec10 Misc 2025-11-09 20:50:28 -01:00
smolsbs ef2cdfdc6f Merge pull request #3 from aulojor/main
feat: add delete_table_row to the menu
2025-11-09 20:50:05 -01:00
aulojor d83a953a12 feat: add delete_table_row to the menu 2025-11-09 20:44:38 -01:00
d9ddd5b3ed docs: README updated 2025-11-09 18:37:04 -01:00
36f57ae3c7 fix: updated requirements list 2025-11-09 18:35:04 -01:00
18 changed files with 1519 additions and 1236 deletions

.gitignore (3 changed lines)

@@ -1,6 +1,9 @@
+*.zip
 *.json
 *.csv
+stats-*.txt
 # ---> Python
 # Byte-compiled / optimized / DLL files

README.md

@@ -1,5 +1,11 @@
 # Project I - Earthquakes
+## How to use
+Run the `earthquakes.py` file with `python earthquakes.py`.
+Make sure the data file is in the same directory as `earthquakes.py`.
+## Objectives
 First, let's represent the data using Python's Pandas module and implement CRUD operations, including JSON conversion. Then, let's implement some statistical operations with graphical representations, using Python's Matplotlib module, over the data held in the Pandas data model.
 ## Tasks:
@@ -8,16 +14,34 @@
 - T2 - Implement CRUD operations through a text menu;
 - T3 - Implement statistical operations such as average, variance, standard deviation, max, min, mode, through a text menu;
 - T4 - Convert from Pandas to JSON and save it in a text file;
-- T5 - to be continued ...
+- T5 - Compute the following statistics:
+  - Number of events per day and per month.
+  - Mean and standard deviation of the depth and of the magnitude per month.
+  - Median, 1st quartile and 3rd quartile of the depth and of the magnitude per month.
+  - Maximum and minimum of the depth and of the magnitude per month.
+- T6 - For the graphical representation:
+  - A bar chart with the number of events per day.
+  - A bar chart with the number of events per month.
+  - A line chart with the mean +/- the standard deviation of the depths per month.
+  - A line chart with the mean +/- the standard deviation of the L magnitude per month.
+  - A boxplot of the depths per month.
+  - A boxplot of the L magnitudes per month.
+- T7 - Implement the event-selection filters for the statistics and the graphical representation:
+  - Time window (start date, end date).
+  - Events with a GAP below a given value.
+  - Quality (EPI or All).
+  - SZ zones.
+  - VZ zones.
+  - Bound the L magnitudes (minimum, maximum).
+  - Bound the depths (minimum, maximum).
 ## Deadlines
 - T1 to T4 -> November 10
-- (to be decided)
+- T5 to T7 -> December 14
 ## Notes
 The data appears to be in the [Nordic](https://seisan.info/v13/node259.html) format
 ## Bibliography
 - [Pandas lib](https://pandas.pydata.org/docs)
 - [Matplotlib](https://matplotlib.org/stable/index.html)
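
For reference, the T5 aggregation boils down to a pandas groupby over a month key. A minimal sketch, assuming hypothetical column names `Data`, `Prof` and `Mag` (the repo's own DataFrame uses Portuguese headers, so treat the names as illustrative):

```python
import pandas as pd

def q1(s: pd.Series) -> float:
    return s.quantile(0.25)  # 1st quartile

def q3(s: pd.Series) -> float:
    return s.quantile(0.75)  # 3rd quartile

def monthly_stats(df: pd.DataFrame):
    # bucket events by year-month, then aggregate depth and magnitude
    month = pd.to_datetime(df["Data"]).dt.to_period("M")
    counts = df.groupby(month).size()  # number of events per month
    stats = df.groupby(month)[["Prof", "Mag"]].agg(
        ["mean", "std", "median", q1, q3, "min", "max"]
    )
    return counts, stats
```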

dados.txt (1237 changed lines)

(Diff suppressed because the file is too large.)

earthquakes.py (deleted, 185 lines)

@@ -1,185 +0,0 @@
#! /usr/bin/env python
# pyright: basic
import json
import os
import pandas as pd
from utils import parser, crud, stats
HEADER = """=== Terramotos ==="""
MENU ="""[1] Criar a base de dados
[2] Atualizar uma entrada
[3] Apagar um evento
[4] Apagar uma entrada de um evento
[5] Visualizar uma entrada
[6] Guardar como JSON
[7] Guardar como CSV
[8] Estatísticas
[Q] Sair
"""
def guardar_df(df: pd.DataFrame, fname: str) -> bool:
    # add the ".txt" suffix before opening the file
    with open(f"{fname}.txt", "w") as fp:
        try:
            fp.write(df.to_string())
        except ValueError:
            return False
    return True
def guardar_json(df: pd.DataFrame, fname: str) -> bool:
    with open(fname, "w") as fp:
        try:
            json.dump(df.to_json(), fp)
        except (ValueError, TypeError):  # no bare except
            return False
    return True
def guardar_csv(df: pd.DataFrame, fname: str):
with open(fname, "w") as fp:
try:
df.to_csv(fp, index=False)
except ValueError:
return False
return True
def main():
isRunning = True
db = None
retInfo = None
while isRunning:
        os.system("cls" if os.name == "nt" else "clear")  # clear the screen on Windows and POSIX
print(HEADER + "\n" + MENU)
usrIn = input("Opção: ").lower()
match usrIn:
case "1":
fname = _get_usr_input("Qual os dados a ler? (dados.txt por defeito): ")
if fname is None:
fname = "dados.txt"
if _file_exists(fname):
db = parser.parse(fname)
input("Base de dados populada. Enter para voltar ao menu inicial")
else:
input("Base de dados não encontrada. Por favor tenta de novo.")
case "2":
if db is not None:
continue
else:
retInfo = "Base de dados não encontrada!"
case "3":
if db is not None:
crud.read_ids(db)
choice = _get_usr_input("Escolhe o ID para apagar: ", int)
if not _event_exists(db, choice):
retInfo = "ID do event não encontrado!"
else:
db = crud.delete_event(db, choice)
input()
else:
retInfo = "Base de dados não encontrada!"
case "4":
if db is not None:
crud.read_ids(db)
eid_choice = _get_usr_input("Escolhe o ID: ", int)
if not _event_exists(db, eid_choice):
retInfo = "ID do event não encontrado!"
else:
db = crud.delete_event(db, eid_choice)
input()
else:
retInfo = "Base de dados não encontrada!"
case "5":
if db is not None:
crud.read_ids(db)
choice = _get_usr_input("Escolhe o ID para ver os dados: ", int)
if not _event_exists(db, choice):
retInfo = "ID do event não encontrado!"
else:
table = crud.get_table(db, choice)
crud.show_table(table)
input()
else:
retInfo = "Base de dados não encontrada!"
case "6":
if db is not None:
fname = _get_usr_input("Nome do ficheiro a guardar?")
if fname is None:
fname = "valores.json"
guardar_json(db, fname)
else:
retInfo = "Base de dados não encontrada!"
case "7":
if db is not None:
fname = _get_usr_input("Nome do ficheiro a guardar?")
if fname is None:
fname = "valores.csv"
guardar_csv(db, fname)
else:
retInfo = "Base de dados não encontrada!"
case "8":
if db is not None:
pass
else:
retInfo = "Base de dados não encontrada!"
case "q":
isRunning = False
continue
case _:
pass
if retInfo:
print(retInfo)
retInfo = None
input("Clique Enter para continuar")
def _file_exists(name: str) -> bool:
currFiles = os.listdir(os.getcwd())
if name in currFiles:
return True
return False
def _event_exists(df, eid) -> bool:
allEvents = set(df["ID"])
return eid in allEvents
def _get_usr_input(msg:str, asType=str):
usrIn = input(msg)
if usrIn == "":
return None
return asType(usrIn)
if __name__ == '__main__':
main()

ev2.py (new file, 172 lines)

@@ -0,0 +1,172 @@
import logging
import os
import time
from utilsv2 import mongo, parser, stats, utils
from utilsv2.log import logger
OS = os.name
MAIN_MENU = {
"1": "Adicionar novos dados",
"2": "Aplicar filtros",
"3": "Estatísticas",
"4": "Limpar filtros",
"q": "Sair",
}
FILTER_MENU = {
"1": "Data início",
"2": "Magnitudes",
"3": "Profundidade",
"4": "GAP",
"6": "Limpar filtros",
"7": "Mostrar filtros",
"q": "Voltar ao menu principal",
}
def clear_screen():
os.system("cls" if OS == "nt" else "clear")
def print_menu(menu: dict[str, str]):
clear_screen()
for k, v in menu.items():
print(f"[{k}]: {v}")
def filter_menu(old_filters):
    filters = old_filters
while True:
print_menu(FILTER_MENU)
usrIn = input()
match usrIn:
# datas
case "1":
clear_screen()
print(
"Formato da data: YYYY-MM-DD\nInserir datas de corte, separadas por uma vírgula(,)"
)
aux = input()
d1, d2 = aux.split(",", maxsplit=1)
d1 = utils.toDateTime(d1)
d2 = utils.toDateTime(d2)
filters["DateTime"] = {}
if d1 != -1:
filters["DateTime"]["$gte"] = d1
if d2 != -1:
filters["DateTime"]["$lte"] = d2
# magnitudes
case "2":
clear_screen()
print("Inserir magnitudes de corte, separadas por uma vírgula(,)")
aux = input()
d1, d2 = aux.split(",", maxsplit=1)
d1 = utils.toFloat(d1)
d2 = utils.toFloat(d2)
filters["Magnitudes.L.Magnitude"] = {}
if d1 != -1:
filters["Magnitudes.L.Magnitude"]["$gte"] = d1
if d2 != -1:
filters["Magnitudes.L.Magnitude"]["$lte"] = d2
# Profundidades
case "3":
clear_screen()
print("Inserir profundidades de corte, separadas por uma vírgula(,)")
aux = input()
d1, d2 = aux.split(",", maxsplit=1)
d1 = utils.toFloat(d1)
d2 = utils.toFloat(d2)
filters["Depth"] = {}
if d1 != -1:
filters["Depth"]["$gte"] = d1
if d2 != -1:
filters["Depth"]["$lte"] = d2
# GAP
case "4":
clear_screen()
print("Inserir GAP")
aux = input()
gap = utils.toInt(aux)
filters["GAP"] = {}
if aux:
filters["GAP"]["$lte"] = gap
case "6":
fliters = {}
case "7":
print(filters)
time.sleep(2.0)
case "q":
return filters
def graph_menu():
pass
def main():
cli = mongo.connect("mongodb://localhost:27017")
filters = {}
while True:
print_menu(MAIN_MENU)
usrIn = input()
match usrIn:
case "1":
aux = input("Ficheiro a ler:")
if utils.fileExists(aux):
logger.info(f"Parsing the file {aux}")
ev, st = parser.parse(aux)
mongo.add_events(cli, "quakes", ev, "main")
mongo.add_stations(cli, "stations", st, "main")
else:
print(f"Could not open the file {aux}")
logger.error(f"Could not open the file {aux}")
time.sleep(2.0)
case "2":
filters = filter_menu(filters)
case "3":
print(filters)
                v = mongo.filter_query(cli, "quakes", filters, "main")  # query the same db the events were inserted into
stats.stats(v)
time.sleep(2.0)
case "q":
break
mongo.close(cli)
if __name__ == "__main__":
logger = logging.getLogger(__name__)
# initialization
logger.info("Started")
main()
logger.info("Ended")

generator/gen-data.py (new file, 113 lines)

@@ -0,0 +1,113 @@
import argparse
import random
import re
import time
from datetime import datetime
PAD = re.compile(r"^0?([1-9]\d?)$")
NORDIC_7 = (
" STAT SP IPHASW D HRMM SECON CODA AMPLIT PERI AZIMU VELO AIN AR TRES W DIS CAZ7"
)
def generate_event():
ts = random.randint(946684800000, 1767225600000)
dt = datetime.fromtimestamp(ts / 1e3)
line_3 = " OBS: Dados falsos".ljust(79) + "3"
stat = generate_station(dt)
return "\n".join(
[
gen_line_1(dt),
line_3,
generate_line6(dt),
generate_line_i(dt),
NORDIC_7,
stat,
"\n",
]
)
def generate_agency(size: int = 3) -> str:
return "".join([chr(random.randint(65, 90)) for _ in range(size)])
def remove_pad(v: str) -> str:
    # strip a single leading zero, e.g. "05" -> "5"; group(1) excludes the pad
    aux = PAD.search(v)
    if aux:
        return aux.group(1)
    return ""
def fmt_date(dt: datetime) -> str:
return f"{dt.year} {dt.month:2d}{dt.day:2d} {dt.hour:2d}{dt.minute:2d} {dt.second:2d}.0"
# 1D Lerp
def generate_number(lb: float, ub: float, precision: int = 2) -> float:
x = random.random()
return round(lb * (1.0 - x) + (ub * x), precision)
def gen_line_1(dt: datetime) -> str:
lat = generate_number(-90.0, 90.0, 3)
long = generate_number(-180.0, 180.0, 3)
    ev = random.choice(("E", "V", "Q"))
depth = generate_number(0.0, 30.0, 1)
agency = generate_agency()
mag = generate_number(1.0, 6.0, 1)
return (
f" {fmt_date(dt)} L{ev}{lat: >7.3f}{long: >8.3f}{depth:>5.1f} {agency} 1 {mag: >4.1f}L{agency}{mag: >4.1f}C{agency}".ljust(
79
)
+ "1"
)
def generate_line6(dt: datetime) -> str:
return f" {dt.strftime('%Y-%m-%d-%H%M-%S')}-FAKE___001".ljust(79) + "6"
def generate_line_i(dt: datetime) -> str:
return " " * 57 + f"ID:{dt.strftime('%Y%m%d%H%M%S')} I"
def generate_station(dt: datetime) -> str:
st = generate_agency(4)
return "\n".join(
[
f" {st} EZ EP {dt.hour:2d}{dt.minute:2d} {dt.second: >5.2f}".ljust(
80
),
f" {st} EZ ES {dt.hour:2d}{dt.minute:2d} {dt.second: >5.2f}".ljust(
80
),
]
)
def main(n: int):
    # write n fake events to falsos.txt
    with open("falsos.txt", "w", newline="\n") as fp:
        for _ in range(n):
            fp.write(generate_event())
if __name__ == "__main__":
random.seed(time.time())
parser = argparse.ArgumentParser()
parser.add_argument(
"n", action="store", type=int, help="Generates n amount of events"
)
args = parser.parse_args()
if args.n:
main(args.n)
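
Since gen-data.py seeds random with the wall clock, every run produces different events. A minimal smoke test (hypothetical usage, run from the generator's own directory):

```python
# Hypothetical quick check: write five reproducible fake events.
import random

random.seed(42)  # override the time-based seed so the output is repeatable
main(5)          # writes the events to falsos.txt
```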

(binary file changed; contents not shown)

requirements.txt

@@ -1,2 +1,3 @@
-pytest==8.4.2
-pandas==2.3.3
+matplotlib==3.10.8
+numpy==2.4.0
+pymongo==4.15.5

utils/crud.py (deleted, 137 lines)

@@ -1,137 +0,0 @@
# pyright: basic
import pandas as pd
from . import parser
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 150)
HEADER_COLS = ["Data", "Distancia", "Tipo Ev", "Lat", "Long", "Prof", "Magnitudes"]
TABLE_READ_RET = ["Data", "Lat", "Long", "Distancia", "Tipo Ev"]
def _get_uniques(df) -> pd.DataFrame:
return df.get(["ID", "Data", "Regiao"]).drop_duplicates(subset="ID", keep="first")
def _show_events(df):
for (_, row) in df.iterrows():
print(f"{row["ID"]}: {row["Regiao"]}")
def read_ids(df):
ids = _get_uniques(df)
_show_events(ids)
def get_unique_events_table(df):
return df.drop_duplicates(subset="ID", keep="first")
def read_header(df, event_id):
    # information from the event's header
row = df[df["ID"] == event_id].iloc[0]
cols = list(df.columns)
# end = cols.index("ID") - 1
# header_cols = cols[:end]
    # to select every column instead of just a few
info = []
for (i, col) in enumerate(HEADER_COLS):
info.append(f"{i+1} {col}: {row[col]}")
infoString = f"Header do evento {event_id}:\n" + "\n".join(info)
return infoString
def show_table(df, retCols=TABLE_READ_RET):
print(df.loc[:,retCols])
def get_table(df, event_id):
rows = df[df["ID"] == event_id]
rows = rows.drop("ID", axis=1)
return rows
def read_table_row(df, event_id, row_number_1):
    # return one specific row of the event's table
row_number_0 = row_number_1 - 1
table = get_table(df, event_id)
if row_number_0 < 0 or row_number_0 >= len(table):
return f"Linha {row_number_1} não pertence ao evento {event_id}."
row = table.iloc[row_number_0]
cols = list(df.columns)
start = cols.index("Estacao")
tableCols = cols[start:]
info = []
for (i, col) in enumerate(tableCols):
info.append(f"{i+1} {col}: {row[col]}")
return f"Linha {row_number_1:02d} do evento {event_id}:\n" + "\n".join(info)
def update_table_row(df, event_id, row_number_1, new_data):
    # update one specific row of the event's table
row_number_0 = row_number_1 - 1
table = get_table(df, event_id)
if row_number_0 < 0 or row_number_0 >= len(table):
return f"Linha {row_number_1} não pertence ao evento {event_id}."
for key, value in new_data.items():
if key in table.columns:
df.loc[(df["ID"] == event_id) & (df.index == table.index[row_number_0]), key] = value
return f"Linha {row_number_1} do evento {event_id} atualizada com sucesso."
def update_header(df, event_id, new_data):
    # update an event's header (the first row of that event)
    header_idx = df[df["ID"] == event_id].index[0]
    for key, value in new_data.items():
        if key in df.columns:
            df.loc[header_idx, key] = value
    return f"Header do evento {event_id} atualizado com sucesso."
def delete_event(df, event_id):
    # delete a whole event (header + table)
new_df = df.drop(df[df["ID"] == event_id].index)
print(f"Evento {event_id} apagado!")
return new_df
def delete_table_row(df, event_id, row_number_1):
    # delete one specific row of the event's table
row_number_0 = row_number_1 - 1
table = get_table(df, event_id)
if row_number_0 < 0 or row_number_0 >= len(table):
return f"Linha {row_number_1} não pertence ao evento {event_id}."
new_df = df.drop(table.index[row_number_0])
return new_df
def create_blank_event(df, event_id):
    # create an empty event with a header row and one table row
df.loc[df["ID"] >= event_id, "ID"] += 1
blank_row_df = pd.DataFrame(columns=df.columns, index=[0, 1])
blank_row_df["ID"] = event_id
blank_row_df = blank_row_df.astype(df.dtypes)
new_df = pd.concat([df, blank_row_df], ignore_index=True)
new_df = new_df.sort_values(by="ID", kind="mergesort").reset_index(drop=True)
return new_df
def create_table_row(df, event_id, row_number_1):
event_rows = df[df["ID"] == event_id]
if event_rows.empty:
return df, f"Erro: Evento com ID {event_id} não encontrado."
header_idx = event_rows.index[0]
table_size = len(event_rows.index) - 1
    # validate the position of the new row
if not (1 <= row_number_1 <= table_size + 1):
return df, f"Erro: Posição {row_number_1} inválida. Evento {event_id} tem {table_size} linha(s) na tabela."
insertion_point = header_idx + row_number_1
new_row_df = pd.DataFrame(columns=df.columns, index=[0])
new_row_df['ID'] = event_id
new_row_df = new_row_df.astype(df.dtypes)
df_before = df.iloc[:insertion_point]
df_after = df.iloc[insertion_point:]
new_df = pd.concat([df_before, new_row_df, df_after], ignore_index=True)
return new_df, f"Linha inserida com sucesso na posição {row_number_1} do evento {event_id}."
def create_entire_database() -> pd.DataFrame:
pass

utils/parser.py (deleted, 194 lines)

@@ -1,194 +0,0 @@
# pyright: basic
import io
from collections import defaultdict
from datetime import datetime
import pandas as pd
# --- globals ---
DIST_IND = {"L": "Local", "R": "Regional", "D": "Distante"}
TYPE = {"Q": "Quake", "V": "Volcanic", "U": "Unknown", "E": "Explosion"}
# --- helper funcs ---
def is_blank(l: str) -> bool:
return len(l.strip(" ")) == 0
def parse_flt(v:str) -> float | None:
try:
t = float(v)
return t
except ValueError:
return None
def parse_int(v:str) -> int | None:
try:
t = int(v)
return t
except ValueError:
return None
def into_dataframe(data) -> pd.DataFrame:
if len(data) == 0:
return pd.DataFrame()
aux = {k: [] for k in data.keys()}
for (k,v) in data.items():
aux[k].append(v)
return pd.DataFrame(data=aux)
def _concat(preamble, df: pd.DataFrame):
for (k,v) in preamble.items():
df.insert(len(df.columns)-1, k, [v for _ in range(len(df))])
return df
def validate_no_stations(expected:int , stationsDF:pd.DataFrame) -> bool:
uniqueStations = stationsDF["Estacao"].nunique()
return expected == uniqueStations
# --- main ---
def parse(fname):
fp = open(fname)
    data = fp.read().split("\n")
chunks = boundaries(data)
df = pd.DataFrame()
for (idx,c) in enumerate(chunks):
a = parse_chunk(data[c[0]:c[1]])
aux = pd.concat([df, a], axis=0, ignore_index=True)
df = aux
fp.close()
return df
def boundaries(data: list[str]):
boundaries = []
start = None
for (idx,l) in enumerate(data):
if start is None:
if not is_blank(l):
start = idx
else:
if is_blank(l):
boundaries.append((start,idx))
start = None
return boundaries
def parse_chunk(chunk_lines: list[str]):
hIdx = None
for (idx, l) in enumerate(chunk_lines):
if l[-1] == "7":
hIdx = idx
break
preambleRet = _parse_preamble(chunk_lines[:hIdx])
phaseRet = _parse_type_7(chunk_lines[hIdx:])
if not validate_no_stations(preambleRet["Estacoes"], phaseRet):
pass
return _concat(preambleRet, phaseRet)
def _parse_preamble(hLines: list[str]):
aux = defaultdict(list)
for line in hLines:
match line[-1]:
case "1":
aux[1].append(line)
case "3":
aux[3].append(line)
case "6":
aux[6].append(line)
case "E":
pass
# aux["E"].append(line)
case "I":
aux["I"].append(line)
case "F":
pass
# aux["F"].append(line)
case _:
pass
headerDict = dict()
for (k,v) in aux.items():
if len(v) != 0:
headerDict.update(FUNCS[k](v))
return headerDict
def _parse_type_1(data: list[str]):
aux = data[0]
y = int(aux[1:5])
mo = int(aux[6:8])
d = int(aux[8:10])
h = int(aux[11:13])
m = int(aux[13:15])
s = int(aux[16:18])
mil = int(aux[19]) * 10**5
dt = datetime(y,mo,d,h,m,s,mil)
dist_ind = DIST_IND[aux[21]]
ev_type = TYPE[aux[22]]
lat = float(aux[23:30])
long = float(aux[30:38])
depth = float(aux[38:43])
no_stat = int(aux[48:51])
hypo = {"Data": dt.isoformat(), "Distancia": dist_ind, "Tipo Ev": ev_type, "Lat": lat, "Long": long, "Prof": depth, "Estacoes": no_stat, "Magnitudes": list()}
for l in data:
hypo["Magnitudes"] = hypo["Magnitudes"] + _parse_mag(l)
return hypo
def _parse_mag(line: str):
magnitudes = []
base = 55
while base < 79:
m = line[base:base+4]
mt = line[base+4]
if not is_blank(m):
magnitudes.append({"Magnitude": m, "Tipo": mt})
base += 8
return magnitudes
def _parse_type_3(data: list[str]):
comments = {}
for line in data:
if line.startswith(" SENTIDO") or line.startswith(" REGIAO"):
c, v = line[:-2].strip().split(": ", maxsplit=1)
comments[c.capitalize()] = v
return comments
def _parse_type_6(data: list[str]):
waves = []
for l in data:
waves.append(l.strip().split(" ")[0])
return {"Onda": waves}
def _parse_type_7(data: list[str]):
aux = io.StringIO("\n".join(data))
dados = pd.read_fwf(aux, colspecs=[(1,5), (6,8),(10,15), (18,20), (20,22), (23,28), (34,38)])
dados.rename(columns={'STAT': "Estacao", 'SP': "Componente" , 'PHASW': "Tipo Onda", 'HR': "Hora", 'MM': "Min", 'SECON': "Seg", 'AMPL': "Amplitude"}, inplace=True)
return dados
def _parse_type_e(data: list[str]):
aux = data[0]
error = {"Gap": int(aux[5:8]), "Origin": float(aux[14:20]), "Error_lat": float(aux[24:30]), "Error_long": float(aux[32:38]), "Error_depth": float(aux[38:43]), "Cov_xy": float(aux[43:55]), "Cov_xz": float(aux[55:67]), "Cov_yz": float(aux[67:79])}
return error
def _parse_type_i(data: list[str]):
aux = data[0]
return {"ID":int(aux[60:74])}
FUNCS = {1: _parse_type_1, 3: _parse_type_3, 6: _parse_type_6, "E": _parse_type_e, "I": _parse_type_i}
parse("dados.txt")

utils/stats.py (deleted, 96 lines)

@@ -1,96 +0,0 @@
# pyright: basic
import os
import pandas as pd
import numpy as np
STAT_MENU = """=== Earthquakes ===
== Estatísticas ==
[1] Média
[2] Variância
[3] Desvio padrão
[4] Máximo
[5] Mínimo
[Q] Voltar ao menu principal
"""
def stat_menu(df: pd.DataFrame):
inStats = True
while inStats:
        os.system("cls" if os.name == "nt" else "clear")  # clear the screen on Windows and POSIX
print(STAT_MENU)
usrIn = input("Opção: ").lower()
match usrIn:
case "1":
pass
case "2":
pass
case "3":
pass
case "4":
pass
case "5":
pass
case "q":
inStats = False
pass
case _:
pass
def average(df: pd.DataFrame, filter_by):
values = df[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
return np.average(values)
def variance(df, filter_by):
values = df[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
return np.var(values)
def std_dev(df, filter_by):
values = df[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
return np.std(values)
def max(df, filter_by):
values = df[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
return np.max(values)
def min(df, filter_by):
values = df[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
return np.min(values)
def _unpack_mags(arr: np.ndarray):
newVals = np.empty(0)
for v in arr:
for m in v:
newVals = np.append(newVals, np.float32(m["Magnitude"]))
return newVals

utilsv2/graphs.py (new file, 74 lines)

@@ -0,0 +1,74 @@
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
from utilsv2 import stats, utils
class Plotter:
def __init__(self) -> None:
self.x = []
self.y = []
self.figure = None
self.ax = None
def plot_bars(self, title: str, isDepth: bool = False):
self.figure, self.ax = plt.subplots()
self.ax.bar(self.x, self.y)
self.ax.set_title(title)
plt.show()
def plot_lin(self):
pass
def plot_box(self):
pass
def adjust_x(self):
pass
def add_x_values(self, xvalues):
self.x = xvalues
def add_y_values(self, yvalues):
self.y = yvalues
@staticmethod
def concat_data_day(data):
pass
@staticmethod
def concat_data_month(data):
x = []
y_vals = {"e": [], "m": [], "d": []}
currMonth: datetime = data[0]["DateTime"]
currMonth_str = utils.print_ym(currMonth)
x.append(currMonth_str)
e = 0
m = []
d = []
        idx = 0
        # assumes `data` is sorted by DateTime; buckets are keyed by
        # year-month so the same month of two years is not merged
        while idx <= len(data):
            if idx < len(data) and utils.print_ym(data[idx]["DateTime"]) == currMonth_str:
                e += 1
                m.append(data[idx]["Magnitudes"]["L"]["Magnitude"])
                d.append(data[idx]["Depth"])
                idx += 1
            else:
                # flush the finished month
                y_vals["e"].append(e)
                y_vals["m"].append(np.average(m))
                y_vals["d"].append(np.average(d))
                if idx == len(data):
                    break  # no more data; avoid indexing past the end
                currMonth = data[idx]["DateTime"]
                currMonth_str = utils.print_ym(currMonth)
                x.append(currMonth_str)
                e = 0
                m = []
                d = []
        return x, y_vals
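
Hooked up to the Plotter above, the monthly aggregation might be used like this (a sketch; `docs` stands for the filtered, DateTime-sorted event documents coming back from mongo.filter_query):

```python
# Hypothetical wiring of concat_data_month into a bar chart:
x, y_vals = Plotter.concat_data_month(docs)  # docs: sorted event dicts

p = Plotter()
p.add_x_values(x)
p.add_y_values(y_vals["e"])   # "e": events per month; "m"/"d": mean magnitude/depth
p.plot_bars("Eventos por mês")
```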

utilsv2/log.py (new file, 10 lines)

@@ -0,0 +1,10 @@
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO,
filename="ev.log",
)

utilsv2/mongo.py (new file, 91 lines)

@@ -0,0 +1,91 @@
import logging
from typing import Any
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.errors import ConnectionFailure
try:
    # importing utilsv2.log also runs logging.basicConfig for the app
    from utilsv2.log import logger
except ModuleNotFoundError:
    from log import logger
logger = logging.getLogger(__name__)  # then switch to a module-specific logger
def connect(uri: str) -> MongoClient:
try:
client = MongoClient(uri)
logger.info("Connected to the DB")
except ConnectionFailure as e:
logger.critical("Could not connect to the MongoDB")
raise e
return client
def add_events(
client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main"
) -> None:
coll: Collection = client[db][collection]
_res = coll.insert_many(data)
if _res.acknowledged:
logger.info(f"Added {len(_res.inserted_ids)} events to {db}.{collection}")
else:
logger.info("Could not add events to the database.")
def add_stations(
client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main"
) -> None:
coll: Collection = client[db][collection]
_res = coll.insert_many(data)
if _res.acknowledged:
logger.info(f"Added {len(_res.inserted_ids)} stations to {db}.{collection}")
else:
logger.info("Could not add events to the database.")
def get_ids(collection: Collection) -> set[Any]:
return set(collection.distinct("ID"))
def close(client: MongoClient) -> None:
client.close()
logger.info("Closed the DB.")
def query_all(client: MongoClient, collection: str, db: str = "main") -> Any:
coll: Collection = client[db][collection]
result = coll.find({})
return list(result)
def filter_query(
client: MongoClient, collection: str, filter_by: dict[str, Any], db: str = "main"
):
coll: Collection = client[db][collection]
    res = coll.find(
        filter_by, {"DateTime": 1, "Magnitudes": 1, "Depth": 1, "GAP": 1}
    ).sort([("DateTime", 1)])  # sort expects a list of (key, direction) pairs
    res = list(res)  # materialize the cursor; it has no public "empty" check
    logger.info(f"Retrieved {len(res)} elements.")
    return res
if __name__ == "__main__":
v = connect("mongodb://localhost:27017")
query_all(v, "quakes")
close(v)

utilsv2/nordic.py (new file, 194 lines)

@@ -0,0 +1,194 @@
import json
import logging
from collections import defaultdict
from datetime import datetime, time
from typing import Any
from utilsv2 import utils
from utilsv2.log import logger
logger = logging.getLogger(__name__)
type evtype = dict[str, Any]
type sttype = dict[str, Any]
# INFO: Don't think we really need this
class Mag:
def __init__(self, mag: float, type: str, agency: str):
self.mag = mag
self.type = type
self.agency = agency
def __str__(self):
return f"mag: {self.mag}, type: {self.type}, agency: {self.agency}"
    def toJSON(self):
        return json.dumps({"Magnitude": self.mag, "Agency": self.agency})
def parse_event(event: list[str]) -> evtype:
    # A Nordic event must always start with a type-1 line, but the type
    # marker may be omitted on that first line, hence no strict check here:
    # if event[0][-1] != "1" or event[0][-1] != " ":
    #     return {}
toParse: dict[str, list[str]] = defaultdict(list)
for line in event:
toParse[line[-1]].append(line)
_ret = {}
for k, v in toParse.items():
match k:
case "1":
aux = parse_type_1(v)
if aux:
_ret.update(aux)
case "3":
_ret.update(parse_type_3(v))
case "6":
_ret.update(parse_type_6(v))
case "E":
_ret.update(parse_type_e(v))
case "I":
_ret.update(parse_type_i(v))
case _:
pass
return _ret
def parse_stations_V1(lines: list[str], event_id: int) -> sttype:
_ret = {"ID": event_id, "stations": {}}
for st in lines:
try:
ampl = float(st[35:40])
except ValueError:
ampl = None
station = st[1:6].strip()
if station not in _ret["stations"].keys():
_ret["stations"][station] = []
_ret["stations"][station].append(
{
"Component": st[6:9].strip(),
"I": None if st[9] == " " else st[9],
"Time": parse_dt(st[18:30], True).strftime("%H:%M:%S.%f%z"),
"Phase": st[10:15].strip(),
"Weigth": None if st[15] == " " else st[15],
"Amplitude": ampl,
}
)
return _ret
def parse_type_1(lines: list[str]) -> dict[str, Any] | None:
line1 = {}
for line in lines:
if "Date" not in line1.keys():
dt = parse_dt(line[:21])
dist_ind = line[21]
event_id = line[22]
lat = float(line[23:30])
long = float(line[30:38])
depth = float(line[38:44])
mags = parse_mag(line[55:79])
line1.update(
{
"DateTime": dt,
"Distance Indicator": dist_ind,
"Event ID": event_id,
"Latitude": lat,
"Longitude": long,
"Depth": depth,
"Magnitudes": mags,
}
)
        else:
            mags = parse_mag(line[56:79])
            line1["Magnitudes"].update(mags)  # merge the extra magnitudes in
return line1
def parse_type_3(lines: list[str]) -> dict[str, Any]:
comments = {"Sentido": "", "Regiao": "", "VZ": None, "SZ": None, "FE": None}
for line in lines:
if line.startswith(" SENTIDO"):
aux = line[:-2].split(":", maxsplit=1)
comments["Sentido"] = aux[1].strip()
elif line.startswith(" REGIAO"):
aux = line[:-2].split(":", maxsplit=1)
for item in aux[1].split(","):
if item.startswith("VZ"):
comments["VZ"] = int(item[2:])
elif item.startswith("SZ"):
comments["SZ"] = int(item[2:])
elif item.startswith("FE"):
comments["FE"] = int(item[2:5])
else:
comments["Regiao"] = item[1:]
return comments
def parse_type_6(lines: list[str]) -> dict[str, list[str]]:
_ret = {"Wavename": []}
for line in lines:
_ret["Wavename"].append(line[:-2].strip())
return _ret
def parse_type_e(lines: list[str]) -> dict[str, int]:
err = {}
for line in lines:
gap = int(line[5:8])
err["GAP"] = gap
return err
def parse_type_i(lines: list[str]) -> dict[str, int]:
aux = {}
for line in lines:
aux["ID"] = int(line[60:75])
return aux
def parse_dt(_text: str, isStation=False) -> datetime | time:
if not isStation:
y = int(_text[0:5])
mo = int(_text[6:8])
d = int(_text[8:10])
h = int(_text[11:13])
m = int(_text[13:15])
s_ms = int(float(_text[16:20]) * 1000)
s = s_ms // 1000
s_ms = s_ms % 1000
dt = datetime(
year=y, month=mo, day=d, hour=h, minute=m, second=s, microsecond=s_ms
)
return dt
else:
h = int(_text[:2])
m = int(_text[2:4])
s_ms = int(float(_text[5:]) * 1000)
s = s_ms // 1000
s_ms = s_ms % 1000
dt = time(hour=h, minute=m, second=s, microsecond=s_ms)
return dt
def parse_mag(_text: str) -> dict[str, Mag]:
mags = {}
for i in range(0, 3):
mag = _text[8 * i : 8 * (i + 1) - 1] # split every 8 chars
if not utils.is_empty(mag):
mags[mag[4]] = {"Magnitude": float(mag[:4]), "Agency": mag[5:]}
return mags
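
parse_mag walks the 24-character magnitude block of a type-1 line in steps of 8 columns: four characters of magnitude, one type letter, then the agency. A worked example on a made-up block; note that the `8 * (i + 1) - 1` slice keeps only seven of the eight columns, so the agency's last letter is dropped:

```python
# Hypothetical magnitude block shaped like the tail of a type-1 line:
block = " 2.3LIPM 2.1CIPM"
parse_mag(block)
# -> {"L": {"Magnitude": 2.3, "Agency": "IP"},
#     "C": {"Magnitude": 2.1, "Agency": "IP"}}
# ("IPM" comes back as "IP" because each slice is 7 characters wide)
```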

utilsv2/parser.py (new file, 83 lines)

@@ -0,0 +1,83 @@
import logging
from io import TextIOWrapper
try:
from utilsv2 import utils
from utilsv2.log import logger
from utilsv2.nordic import evtype, parse_event, parse_stations_V1, sttype
except ModuleNotFoundError:
import utils
from log import logger
from nordic import evtype, parse_event, parse_stations_V1, sttype
logger = logging.getLogger(__name__)
def read_file(fname: str) -> TextIOWrapper | OSError:
try:
fp = open(fname, "r", newline="\n")
return fp
except FileNotFoundError:
return FileNotFoundError("Nenhum ficheiro encontrado")
except PermissionError:
return PermissionError("Sem permissões para abrir")
def find_events(fp: TextIOWrapper) -> list[tuple[int, int]]:
event_indices = []
event_start = -1
idx = 0
    for line in fp.read().split("\n"):
        if event_start == -1 and not utils.is_empty(line):
            event_start = idx  # the first non-blank line opens an event
        elif event_start != -1 and utils.is_empty(line):
            event_indices.append((event_start, idx))
            event_start = -1
        idx += 1
logger.info("Found %d events", len(event_indices))
return event_indices
def split_event(lines: list[str], start: int, end: int) -> int:
for idx in range(start, end):
if lines[idx].endswith("7"):
return idx
return -1
def extract_event(
fp: TextIOWrapper, event_bounds: list[tuple[int, int]]
) -> tuple[list[evtype], list[sttype]]:
lines = fp.read().split("\n")
events, ev_stations = [], []
for event_idx in event_bounds:
stations = split_event(lines, event_idx[0], event_idx[1])
if stations == -1:
logger.error(f"Could not parse event at pos {event_idx}")
continue
ev = parse_event(lines[event_idx[0] : stations])
events.append(ev)
ev_stations.append(
parse_stations_V1(lines[stations + 1 : event_idx[1]], ev["ID"])
)
return events, ev_stations
def parse(fname: str):
_ret = read_file(fname)
if not isinstance(_ret, TextIOWrapper):
logger.critical(_ret.__str__())
raise _ret
events = find_events(_ret)
_ret.seek(0)
evs, stations = extract_event(_ret, events)
# cleanup
_ret.close()
return evs, stations
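
For orientation, the two lists returned by parse hold dictionaries shaped roughly like the sketch below (keys assembled from parse_type_1, parse_type_e, parse_type_i and parse_stations_V1 above; the values are made up, and the exact keys depend on which line types an event carries):

```python
# One element of `evs`, as built by parse_event:
event = {
    "DateTime": ...,             # datetime parsed from the type-1 line
    "Distance Indicator": "L",
    "Event ID": "Q",
    "Latitude": 38.5,
    "Longitude": -28.6,
    "Depth": 10.0,
    "Magnitudes": {"L": {"Magnitude": 2.3, "Agency": "IP"}},
    "GAP": 120,                  # from the type-E line, when present
    "ID": 20251109210617,        # from the type-I line
}
# One element of `stations`, as built by parse_stations_V1:
station = {
    "ID": 20251109210617,
    "stations": {"PMAR": [{"Component": "EZ", "Phase": "EP"}]},  # per-station picks
}
```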

utilsv2/stats.py (new file, 83 lines)

@@ -0,0 +1,83 @@
import time
from datetime import datetime
import numpy as np
def print_filters(filters):
    _res = ""
    for k, v in filters.items():
        _res += f"{k}: {v}\n"
    return _res  # return the formatted filters instead of dropping them
def pprint(v):
return f"\tMédia: {v[0]} \u00b1 {v[1]}; 1o Quartil: {v[3]}; Mediana: {v[2]}; 3o Quartil: {v[4]}; Máximo: {v[5]}; Mínimo: {v[6]}"
def stats(data):
_stats = f"===Estatística==\nNúmero total de eventos: {len(data)}\n"
aux = []
currMonth: datetime = data[0]["DateTime"]
idx = 0
    while idx < len(data):
        # assumes data is sorted by DateTime; compare year and month together
        if (data[idx]["DateTime"].year, data[idx]["DateTime"].month) == (currMonth.year, currMonth.month):
aux.append(data[idx])
idx += 1
else:
m = calc_mag(aux)
d = calc_depth(aux)
aux = []
_stats += f"{currMonth.strftime('%Y-%m')}:\n\tMagnitude: {pprint(m)}\n\tProfundidade: {pprint(d)}\n\n"
currMonth = data[idx]["DateTime"]
m = calc_mag(aux)
d = calc_depth(aux)
_stats += f"{currMonth.strftime('%Y-%m')}:\nMagnitude: {pprint(m)}\nProfundidade: {pprint(d)}\n"
fname = f"stats-{time.time_ns()}.txt"
with open(fname, "wb") as fp:
fp.write(_stats.encode("utf-8"))
# print(_stats)
def calc_depth(data):
if len(data) == 0:
return 0
depths = np.array([v["Depth"] for v in data], dtype=float)
quantile = np.quantile(depths, [0.25, 0.75])
return list(
map(
float,
(
round(np.average(depths), 3),
round(np.std(depths), 3),
round(np.median(depths), 3),
round(quantile[0], 3),
round(quantile[1], 3),
np.min(depths),
np.max(depths),
),
)
)
def calc_mag(data):
if len(data) == 0:
return 0
mags = np.array([v["Magnitudes"]["L"]["Magnitude"] for v in data], dtype=float)
quantile = np.quantile(mags, [0.25, 0.75])
return list(
map(
float,
(
round(np.average(mags), 3),
round(np.std(mags), 3),
round(np.median(mags), 3),
round(quantile[0], 3),
round(quantile[1], 3),
np.min(mags),
np.max(mags),
),
)
)
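
calc_mag and calc_depth both return the same seven-slot list, and pprint indexes into it positionally, so the ordering matters: [mean, std, median, q1, q3, min, max]. A tiny check with two made-up events:

```python
# Hypothetical input: two events with L magnitudes 2.0 and 4.0.
data = [
    {"Magnitudes": {"L": {"Magnitude": 2.0}}},
    {"Magnitudes": {"L": {"Magnitude": 4.0}}},
]
calc_mag(data)
# -> [3.0, 1.0, 3.0, 2.5, 3.5, 2.0, 4.0]
#    mean std median q1   q3   min  max
```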

utilsv2/utils.py (new file, 48 lines)

@@ -0,0 +1,48 @@
import os
from datetime import datetime
def is_empty(_str: str) -> bool:
return len(_str.strip(" ")) == 0
def toDateTime(dt: str) -> datetime | int:
if len(dt) == 0:
return -1
try:
return datetime.strptime(dt, "%Y-%m-%d")
except ValueError:
return -1
def toFloat(v: str) -> float:
if len(v) == 0:
return -1.0
try:
return float(v)
except ValueError:
return -1.0
def toInt(v: str) -> int | None:
if len(v) == 0:
return None
try:
return int(v)
except ValueError:
return None
def print_ym(dt: datetime) -> str:
return dt.strftime("%Y-%m")
def fileExists(fname: str) -> bool:
files = set(os.listdir())
if fname not in files:
return False
if not os.path.isfile(fname):
return False
return True
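
The conversion helpers signal failure in-band: toDateTime and toFloat return -1/-1.0 and toInt returns None, so callers have to check the sentinel before using the value (as filter_menu does). A few illustrative calls:

```python
# Sentinel behaviour of the conversion helpers:
toFloat("3.5")            # -> 3.5
toFloat("")               # -> -1.0  (empty input)
toFloat("abc")            # -> -1.0  (unparsable)
toDateTime("2025-11-09")  # -> datetime(2025, 11, 9)
toInt("")                 # -> None
# Note: -1.0 is also a legal value for fields that can genuinely be
# negative, so the float sentinel is ambiguous for such fields.
```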