Compare commits

...

23 Commits

Author SHA1 Message Date
05d8de4902 Misc v2 2026-01-05 15:29:49 -01:00
9d65d4a862 misc 2026-01-04 22:35:49 -01:00
9276ff0c47 Fix in the statistics 2026-01-04 19:26:19 -01:00
c275bc7065 More things, statistics 2026-01-04 18:51:37 -01:00
1d1826ebe5 Fix to magnitude agency 2026-01-02 23:01:08 -01:00
21f5d29a53 Generate fake data for testing 2026-01-02 22:47:47 -01:00
1e6551a2b1 (feat): Insertion to mongoDB done 2025-12-28 16:22:42 -01:00
1bb945f7e6 Re-implementing the parser, yay 2025-12-23 18:40:53 -01:00
d9cca721c9 Yeet everything, rebuild from scratch poggies! 2025-12-22 19:15:52 -01:00
7a4de947c6 WHOOPS 2025-12-16 16:58:37 -01:00
c6bd1eb669 The rest of the comments 2025-12-13 18:19:38 -01:00
3e0814057f doc: Comments on every function
fix: remove dead or unnecessary code
2025-12-13 17:56:55 -01:00
96aaeed19f merging 2025-12-12 21:46:48 -01:00
smolsbs
f7d1992595 Merge pull request #7 from aulojor/main
T5, 6, 7
2025-12-12 21:38:44 -01:00
Paulo Jorge Medeiros Alexandre
10075123d8 fix: correct the Prof column name 2025-12-12 20:53:47 -01:00
Paulo Jorge Medeiros Alexandre
2573cfaf13 fhiiiofhweoi
aaaaaaaa
2025-12-12 20:29:23 -01:00
b3d9a31792 more statistics stuff 2025-12-11 15:42:07 -01:00
14dee58ab2 graphs, statistics and filters 2025-12-11 15:14:38 -01:00
490c88085a a 2025-12-11 07:56:11 -01:00
aulojor
991d372baf plotly vis 2025-12-04 18:09:13 -01:00
aulojor
a9839e64bf feat: replace guardar_json with utils.save_as_json
I changed save_as_json a bit and it may have turned out a little ugly, because I wasn't quite sure how to make use of EVENT_COLS and STATION_COLS from inside utils.py
2025-11-15 22:26:26 -01:00
afef4c4d5c new json 2025-11-15 16:05:41 -01:00
047f5e25ac more things 2025-11-15 15:34:07 -01:00
19 changed files with 1514 additions and 1514 deletions

3
.gitignore vendored

@@ -1,6 +1,9 @@
*.zip
*.json
*.csv
stats-*.txt
# ---> Python
# Byte-compiled / optimized / DLL files


@@ -2,6 +2,7 @@
## How to use
Run the `earthquakes.py` file with `python earthquakes.py`
Make sure the data file is in the same directory as `earthquakes.py`
## Objectives
@@ -13,11 +14,30 @@ First, let's represent the data using Python's Pandas module and implement CRUD
- T2 - Implement CRUD operations through a text menu;
- T3 - Implement statistical operations such as: average, variance, standard deviation, max, min, mode; through a text menu;
- T4 - Convert from Pandas to JSON and save it in a text file;
- T5 - to be continued ...
- T5 - Compute the following statistics (see the sketch after this list):
  - Number of events per day and per month.
  - Mean and standard deviation of the depth and of the magnitude per month.
  - Median, 1st quartile and 3rd quartile of the depth and of the magnitude per month.
  - Maximum and minimum of the depth and of the magnitude per month.
- T6 - For the graphical representation:
  - A bar chart with the number of events per day.
  - A bar chart with the number of events per month.
  - A line chart with the mean +/- the standard deviation of the depths per month.
  - A line chart with the mean +/- the standard deviation of the L magnitudes per month.
  - A boxplot of the depths per month.
  - A boxplot of the L magnitudes per month.
- T7 - Implement the event-selection filters for the calculations / graphical representation:
  - Time period (start date, end date).
  - Events with a GAP below a given value.
  - Quality (EPI or All).
  - SZ zones.
  - VZ zones.
  - Limit by L magnitudes (minimum, maximum).
  - Limit depths (minimum, maximum).
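
A minimal pandas sketch of the T5 statistics, assuming a DataFrame with one row per event and columns `Data` (ISO datetime), `Prof` (depth) and a numeric `Magnitude` — the names mirror this repo's parser, but the exact frame layout is an assumption:

```python
import pandas as pd

# Hypothetical event frame: one row per event, "Data" holds ISO datetimes.
df["Data"] = pd.to_datetime(df["Data"])
monthly = df.set_index("Data")[["Prof", "Magnitude"]].resample("ME")

events_per_day = df.set_index("Data").resample("D").size()
events_per_month = monthly.size()
summary = monthly.agg(["mean", "std", "median", "max", "min"])
quartiles = monthly.quantile([0.25, 0.75])  # 1st and 3rd quartiles
```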
## Deadlines
- T1 to T4 -> November 10
- (to be defined)
- T5 to T7 -> December 14
## Notes
The data appears to be in the [Nordic](https://seisan.info/v13/node259.html) format
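
The fake-data generator under `generator/gen-data.py` can emit sample events in this format for inspection (it writes them to `falsos.txt`):

```sh
python generator/gen-data.py 10
```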

1237
dados.txt

File diff suppressed because it is too large


@@ -1,247 +0,0 @@
#! /usr/bin/env python
# pyright: basic
import os
import sys
from datetime import datetime
import pandas as pd
from utils import parser, crud, stats
HEADER = """=== Terramotos ==="""
MENU ="""[1] Criar a base de dados
[] Atualizar uma entrada (Removido)
[3] Apagar um evento
[4] Apagar uma entrada de um evento
[5] Visualizar um evento
[6] Guardar como JSON
[7] Guardar como CSV
[8] Estatísticas
[9] Criar uma entrada
[Q] Sair
"""
def guardar_df(df: pd.DataFrame, fname: str) -> bool:
    # append the extension before opening, not after the file is already open
    with open(f"{fname}.txt", "w") as fp:
        try:
            fp.write(df.to_string())
        except ValueError:
            return False
    return True
def guardar_json(df: pd.DataFrame, fname: str) -> bool:
    with open(fname, "w") as fp:
        try:
            df.to_json(fp, indent=4)
        except ValueError:
            return False
    return True
def guardar_csv(df: pd.DataFrame, fname: str) -> bool:
    with open(fname, "w") as fp:
        try:
            df.to_csv(fp, index=False)
        except ValueError:
            return False
    return True
def main():
isRunning = True
db = None
retInfo = None
while isRunning:
        os.system("cls" if sys.platform == "win32" else "clear")
print(HEADER + "\n" + MENU)
usrIn = input("Opção: ").lower()
match usrIn:
case "1":
fname = _get_usr_input("Qual os dados a ler? (dados.txt por defeito): ")
if fname is None:
fname = "dados.txt"
if _file_exists(fname) and fname.endswith(".json"):
db = pd.read_json(fname)
print("Base de dados populada.")
elif _file_exists(fname):
db = parser.parse(fname)
input("Base de dados populada. Enter para voltar ao menu inicial")
else:
input("Base de dados não encontrada. Por favor tenta de novo.")
case "2":
pass
# if db is not None:
# crud.read_ids(db)
# eid_choice = _get_usr_input("Escolhe o ID: ", int)
#
# if not _event_exists(db, eid_choice):
# retInfo = "ID do event não encontrado!"
#
# else:
# table = crud.get_table(db, eid_choice)
# crud.show_table(table)
# row_choice = _get_usr_input("Escolhe a linha a atualizar: ", int)
# new_data = {}
# for col in crud.TABLE_READ_RET:
# val = _get_usr_input(f"Novo valor para {col} (Enter para manter o valor atual): ")
# if val is not None:
# new_data[col] = val
# crud.update_table_row(db, row_choice, new_data)
case "3":
if db is not None:
crud.read_ids(db)
choice = _get_usr_input("Escolhe o ID para apagar: ", int)
if not _event_exists(db, choice):
retInfo = "ID do event não encontrado!"
else:
db = crud.delete_event(db, choice)
input()
else:
retInfo = "Base de dados não encontrada!"
case "4":
if db is not None:
crud.read_ids(db)
eid_choice = _get_usr_input("Escolhe o ID: ", int)
if not _event_exists(db, eid_choice):
retInfo = "ID do event não encontrado!"
else:
                        os.system("cls" if sys.platform == "win32" else "clear")
table = crud.get_table(db, eid_choice)
_prettify_event(table)
crud.show_table(table)
row_choice = _get_usr_input("Escolhe a linha a apagar:", int)
# TODO: balizar a escolha para apenas as linhas do evento em questao
db, msg = crud.delete_table_row(db, eid_choice, row_choice)
new_table = crud.get_table(db, eid_choice)
crud.show_table(new_table)
print(msg)
input()
else:
retInfo = "Base de dados não encontrada!"
case "5":
if db is not None:
crud.read_ids(db)
choice = _get_usr_input("Escolhe o ID para ver os dados: ", int)
if not _event_exists(db, choice):
retInfo = "ID do event não encontrado!"
else:
                        os.system("cls" if sys.platform == "win32" else "clear")
table = crud.get_table(db, choice)
_prettify_event(table)
crud.show_table(table)
input()
else:
retInfo = "Base de dados não encontrada!"
case "6":
if db is not None:
fname = _get_usr_input("Nome do ficheiro a guardar? ")
if fname is None:
fname = "valores.json"
guardar_json(db, fname)
else:
retInfo = "Base de dados não encontrada!"
case "7":
if db is not None:
fname = _get_usr_input("Nome do ficheiro a guardar? ")
if fname is None:
fname = "valores.csv"
guardar_csv(db, fname)
else:
retInfo = "Base de dados não encontrada!"
case "8":
if db is not None:
stats.stat_menu(db)
else:
retInfo = "Base de dados não encontrada!"
case "9":
if db is not None:
crud.read_ids(db)
eid_choice = _get_usr_input("Escolhe o ID: ", int)
if not _event_exists(db, eid_choice):
retInfo = "ID do event não encontrado!"
else:
                        os.system("cls" if sys.platform == "win32" else "clear")
table = crud.get_table(db, eid_choice)
_prettify_event(table)
crud.show_table(table)
insertion_point = _get_usr_input("Posição da nova linha: ", int)
# TODO: balizar a escolha para apenas as linhas do evento em questao
db, msg = crud.create_table_row(db, eid_choice, insertion_point)
new_table = crud.get_table(db, eid_choice)
crud.show_table(new_table)
print(msg)
input()
else:
retInfo = "Base de dados não encontrada!"
case "q":
isRunning = False
continue
case _:
pass
if retInfo:
print(retInfo)
retInfo = None
input("Clique Enter para continuar")
def _file_exists(name: str) -> bool:
currFiles = os.listdir(os.getcwd())
if name in currFiles:
return True
return False
def _event_exists(df, eid) -> bool:
allEvents = set(df["ID"])
return eid in allEvents
def _get_usr_input(msg: str, asType=str):
    usrIn = input(msg)
    if usrIn == "":
        return None
    try:
        return asType(usrIn)
    except ValueError:
        # an invalid cast (e.g. non-numeric text for an int) counts as no input
        return None
def _prettify_event(df):
preambleInfo = df.drop_duplicates(subset="ID", keep="first")
stations = df[["Estacao", "Componente", "Tipo Onda", "Amplitude"]]
info = df.drop_duplicates(subset="Data", keep="first")
data = datetime.fromisoformat(info.Data.values[0]).strftime("%c")
print(f"Região: {info["Regiao"].values[0]}\nData: {data}\nLatitude: {info.Lat.values[0]}\nLongitude: {info.Long.values[0]}"
+ f"\nProfundidade: {info.Prof.values[0]}\nTipo de evento: {info['Tipo Ev'].values[0]}\n")
if __name__ == '__main__':
main()

172
ev2.py Normal file

@@ -0,0 +1,172 @@
import logging
import os
import time
from utilsv2 import mongo, parser, stats, utils
from utilsv2.log import logger
OS = os.name
MAIN_MENU = {
"1": "Adicionar novos dados",
"2": "Aplicar filtros",
"3": "Estatísticas",
"4": "Limpar filtros",
"q": "Sair",
}
FILTER_MENU = {
"1": "Data início",
"2": "Magnitudes",
"3": "Profundidade",
"4": "GAP",
"6": "Limpar filtros",
"7": "Mostrar filtros",
"q": "Voltar ao menu principal",
}
def clear_screen():
os.system("cls" if OS == "nt" else "clear")
def print_menu(menu: dict[str, str]):
clear_screen()
for k, v in menu.items():
print(f"[{k}]: {v}")
def filter_menu(old_filters):
    filters = old_filters
while True:
print_menu(FILTER_MENU)
usrIn = input()
match usrIn:
# datas
case "1":
clear_screen()
print(
"Formato da data: YYYY-MM-DD\nInserir datas de corte, separadas por uma vírgula(,)"
)
aux = input()
d1, d2 = aux.split(",", maxsplit=1)
d1 = utils.toDateTime(d1)
d2 = utils.toDateTime(d2)
filters["DateTime"] = {}
if d1 != -1:
filters["DateTime"]["$gte"] = d1
if d2 != -1:
filters["DateTime"]["$lte"] = d2
# magnitudes
case "2":
clear_screen()
print("Inserir magnitudes de corte, separadas por uma vírgula(,)")
aux = input()
d1, d2 = aux.split(",", maxsplit=1)
d1 = utils.toFloat(d1)
d2 = utils.toFloat(d2)
filters["Magnitudes.L.Magnitude"] = {}
if d1 != -1:
filters["Magnitudes.L.Magnitude"]["$gte"] = d1
if d2 != -1:
filters["Magnitudes.L.Magnitude"]["$lte"] = d2
# Profundidades
case "3":
clear_screen()
print("Inserir profundidades de corte, separadas por uma vírgula(,)")
aux = input()
d1, d2 = aux.split(",", maxsplit=1)
d1 = utils.toFloat(d1)
d2 = utils.toFloat(d2)
filters["Depth"] = {}
if d1 != -1:
filters["Depth"]["$gte"] = d1
if d2 != -1:
filters["Depth"]["$lte"] = d2
# GAP
case "4":
clear_screen()
print("Inserir GAP")
aux = input()
                gap = utils.toInt(aux)
                filters["GAP"] = {}
                if gap is not None:
                    filters["GAP"]["$lte"] = gap
case "6":
fliters = {}
case "7":
print(filters)
time.sleep(2.0)
case "q":
return filters
def graph_menu():
pass
def main():
cli = mongo.connect("mongodb://localhost:27017")
filters = {}
while True:
print_menu(MAIN_MENU)
usrIn = input()
match usrIn:
case "1":
aux = input("Ficheiro a ler:")
if utils.fileExists(aux):
logger.info(f"Parsing the file {aux}")
ev, st = parser.parse(aux)
mongo.add_events(cli, "quakes", ev, "main")
mongo.add_stations(cli, "stations", st, "main")
else:
print(f"Could not open the file {aux}")
logger.error(f"Could not open the file {aux}")
time.sleep(2.0)
case "2":
filters = filter_menu(filters)
case "3":
print(filters)
v = mongo.filter_query(cli, "quakes", filters, "test")
stats.stats(v)
time.sleep(2.0)
case "q":
break
mongo.close(cli)
if __name__ == "__main__":
logger = logging.getLogger(__name__)
# initialization
logger.info("Started")
main()
logger.info("Ended")

113
generator/gen-data.py Normal file

@@ -0,0 +1,113 @@
import argparse
import random
import re
import time
from datetime import datetime
PAD = re.compile(r"^0?([1-9]\d?)$")
NORDIC_7 = (
" STAT SP IPHASW D HRMM SECON CODA AMPLIT PERI AZIMU VELO AIN AR TRES W DIS CAZ7"
)
def generate_event():
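    # Assemble one fake Nordic event: a type 1 header, a type 3 comment,
    # a type 6 waveform line, a type I ID line, the type 7 column header
    # and two station picks.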
ts = random.randint(946684800000, 1767225600000)
dt = datetime.fromtimestamp(ts / 1e3)
line_3 = " OBS: Dados falsos".ljust(79) + "3"
stat = generate_station(dt)
return "\n".join(
[
gen_line_1(dt),
line_3,
generate_line6(dt),
generate_line_i(dt),
NORDIC_7,
stat,
"\n",
]
)
def generate_agency(size: int = 3) -> str:
return "".join([chr(random.randint(65, 90)) for _ in range(size)])
def remove_pad(v: str) -> str:
    aux = PAD.search(v)
    if aux:
        # group(1) holds the digits without the leading zero
        return aux.group(1)
    return ""
def fmt_date(dt: datetime) -> str:
return f"{dt.year} {dt.month:2d}{dt.day:2d} {dt.hour:2d}{dt.minute:2d} {dt.second:2d}.0"
# 1D Lerp
def generate_number(lb: float, ub: float, precision: int = 2) -> float:
x = random.random()
return round(lb * (1.0 - x) + (ub * x), precision)
def gen_line_1(dt: datetime) -> str:
lat = generate_number(-90.0, 90.0, 3)
long = generate_number(-180.0, 180.0, 3)
    ev = random.choice(("E", "V", "Q"))
depth = generate_number(0.0, 30.0, 1)
agency = generate_agency()
mag = generate_number(1.0, 6.0, 1)
return (
f" {fmt_date(dt)} L{ev}{lat: >7.3f}{long: >8.3f}{depth:>5.1f} {agency} 1 {mag: >4.1f}L{agency}{mag: >4.1f}C{agency}".ljust(
79
)
+ "1"
)
def generate_line6(dt: datetime) -> str:
return f" {dt.strftime('%Y-%m-%d-%H%M-%S')}-FAKE___001".ljust(79) + "6"
def generate_line_i(dt: datetime) -> str:
return " " * 57 + f"ID:{dt.strftime('%Y%m%d%H%M%S')} I"
def generate_station(dt: datetime) -> str:
st = generate_agency(4)
return "\n".join(
[
f" {st} EZ EP {dt.hour:2d}{dt.minute:2d} {dt.second: >5.2f}".ljust(
80
),
f" {st} EZ ES {dt.hour:2d}{dt.minute:2d} {dt.second: >5.2f}".ljust(
80
),
]
)
def main(argv: int):
    with open("falsos.txt", "w", newline="\n") as fp:
        for _ in range(argv):
            fp.write(generate_event())
if __name__ == "__main__":
random.seed(time.time())
parser = argparse.ArgumentParser()
    parser.add_argument(
        "n", action="store", type=int, help="Generate n events"
    )
args = parser.parse_args()
if args.n:
main(args.n)

Binary file not shown.


@@ -1,2 +1,3 @@
numpy==2.3.4
pandas==2.3.3
matplotlib==3.10.8
numpy==2.4.0
pymongo==4.15.5


@@ -1,171 +0,0 @@
# pyright: basic
import pandas as pd
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 150)
# -- globals
HEADER_COLS = ["Data", "Distancia", "Tipo Ev", "Lat", "Long", "Prof", "Magnitudes"]
TABLE_READ_RET = ["Estacao", "Hora", "Min", "Seg", "Componente", "Amplitude"]
# -- helper funcs
def _get_uniques(df) -> pd.DataFrame:
return df.get(["ID", "Data", "Regiao"]).drop_duplicates(subset="ID", keep="first")
def _show_events(df):
    for (_, row) in df.iterrows():
        print(f"{row['ID']}: {row['Regiao']}")
# -- main
def read_ids(df):
ids = _get_uniques(df)
_show_events(ids)
def get_unique_events_table(df):
return df.drop_duplicates(subset="ID", keep="first")
def read_header(df, event_id):
    # event header information
row = df[df["ID"] == event_id].iloc[0]
cols = list(df.columns)
# end = cols.index("ID") - 1
# header_cols = cols[:end]
    # To select all the columns instead of just a few
info = []
for (i, col) in enumerate(HEADER_COLS):
info.append(f"{i+1} {col}: {row[col]}")
infoString = f"Header do evento {event_id}:\n" + "\n".join(info)
return infoString
def show_table(df, retCols=TABLE_READ_RET):
print(df.loc[:,retCols])
def get_table(df, event_id):
rows = df[df["ID"] == event_id]
return rows
def read_table_row(df, event_id, row_number_1):
    # returns a specific row of the table
row_number_0 = row_number_1 - 1
table = get_table(df, event_id)
if row_number_0 < 0 or row_number_0 >= len(table):
return f"Linha {row_number_1} não pertence ao evento {event_id}."
row = table.iloc[row_number_0]
cols = list(df.columns)
start = cols.index("Estacao")
tableCols = cols[start:]
info = []
for (i, col) in enumerate(tableCols):
info.append(f"{i+1} {col}: {row[col]}")
return f"Linha {row_number_1:02d} do evento {event_id}:\n" + "\n".join(info)
def update_table_row(df, row_line, new_data):
for key, value in new_data.items():
if key in df.columns:
df.loc[row_line, key] = value
return f"Linha {row_line} do evento atualizada com sucesso."
def update_header(df, event_id, new_data):
    # updates an event's header (its first row)
    header_idx = df.index[df["ID"] == event_id][0]
    for key, value in new_data.items():
        if key in df.columns:
            df.loc[header_idx, key] = value
    return f"Header do evento {event_id} atualizado com sucesso."
def delete_event(df, event_id):
    # Deletes a whole event (header + table)
new_df = df.drop(df[df["ID"] == event_id].index)
print(f"Evento {event_id} apagado!")
return new_df
def delete_table_row(df, event_id, row_number):
    # Deletes a specific row from the event's table
    matching_indices = df.index[df['ID'] == event_id].tolist()
    first_event_row = matching_indices[0]
    last_event_row = matching_indices[-1]
    if row_number < first_event_row or row_number > last_event_row:
        return df, f"Erro: A posição a apagar, {row_number}, está fora do intervalo permitido para o evento {event_id}."
    new_df = df.drop([row_number]).reset_index(drop=True)
    return new_df, f"Linha {row_number} apagada com sucesso!"
def create_blank_event(df, event_id):
    # Creates an empty event with a header row and one table row
df.loc[df["ID"] >= event_id, "ID"] += 1
blank_row_df = pd.DataFrame(columns=df.columns, index=[0, 1])
blank_row_df["ID"] = event_id
blank_row_df = blank_row_df.astype(df.dtypes)
new_df = pd.concat([df, blank_row_df], ignore_index=True)
new_df = new_df.sort_values(by="ID", kind="mergesort").reset_index(drop=True)
return new_df
def create_table_row(df, event_id, insertion_point):
    # Creates a new blank row in the dataframe at position insertion_point
matching_indices = df.index[df['ID'] == event_id].tolist()
first_event_row = matching_indices[0]
last_event_row = matching_indices[-1]
if insertion_point < first_event_row or insertion_point > last_event_row + 1:
return df, f"Erro: A posição de inserção {insertion_point} está fora do intervalo permitido para o evento {event_id}"
new_row_df = pd.DataFrame(columns=df.columns, index=[0])
new_row_df['ID'] = event_id
new_row_df = new_row_df.fillna(0)
new_row_df = new_row_df.astype(df.dtypes)
df_before = df.iloc[:insertion_point]
df_after = df.iloc[insertion_point:]
new_df = pd.concat([df_before, new_row_df, df_after], ignore_index=True)
return new_df, f"Linha inserida com sucesso na posição {insertion_point}"
def create_entire_database() -> pd.DataFrame:
    # TODO: not implemented yet
    pass
def create_table_row_old(df, event_id, row_number_1):
event_rows = df[df["ID"] == event_id]
if event_rows.empty:
return df, f"Erro: Evento com ID {event_id} não encontrado."
header_idx = event_rows.index[0]
table_size = len(event_rows.index) - 1
    # Validate the position of the new row
if not (1 <= row_number_1 <= table_size + 1):
return df, f"Erro: Posição {row_number_1} inválida. Evento {event_id} tem {table_size} linha(s) na tabela."
insertion_point = header_idx + row_number_1
new_row_df = pd.DataFrame(columns=df.columns, index=[0])
new_row_df['ID'] = event_id
new_row_df = new_row_df.astype(df.dtypes)
df_before = df.iloc[:insertion_point]
df_after = df.iloc[insertion_point:]
new_df = pd.concat([df_before, new_row_df, df_after], ignore_index=True)
return new_df, f"Linha inserida com sucesso na posição {row_number_1} do evento {event_id}."


@@ -1,194 +0,0 @@
# pyright: basic
import io
from collections import defaultdict
from datetime import datetime
import pandas as pd
# --- globals ---
DIST_IND = {"L": "Local", "R": "Regional", "D": "Distante"}
TYPE = {"Q": "Quake", "V": "Volcanic", "U": "Unknown", "E": "Explosion"}
# --- helper funcs ---
def is_blank(l: str) -> bool:
return len(l.strip(" ")) == 0
def parse_flt(v:str) -> float | None:
try:
t = float(v)
return t
except ValueError:
return None
def parse_int(v:str) -> int | None:
try:
t = int(v)
return t
except ValueError:
return None
def into_dataframe(data) -> pd.DataFrame:
if len(data) == 0:
return pd.DataFrame()
aux = {k: [] for k in data.keys()}
for (k,v) in data.items():
aux[k].append(v)
return pd.DataFrame(data=aux)
def _concat(preamble, df: pd.DataFrame):
for (k,v) in preamble.items():
df.insert(len(df.columns)-1, k, [v for _ in range(len(df))])
return df
def validate_no_stations(expected:int , stationsDF:pd.DataFrame) -> bool:
uniqueStations = stationsDF["Estacao"].nunique()
return expected == uniqueStations
# --- main ---
def parse(fname):
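    # Split the file into blank-line-delimited chunks, parse each one and
    # concatenate the per-event frames.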
fp = open(fname)
data = [l for l in fp.read().split("\n")]
chunks = boundaries(data)
df = pd.DataFrame()
for (idx,c) in enumerate(chunks):
a = parse_chunk(data[c[0]:c[1]])
aux = pd.concat([df, a], axis=0, ignore_index=True)
df = aux
fp.close()
return df
def boundaries(data: list[str]):
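    # returns (start, end) line-index pairs for each run of non-blank lines (one event per run)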
boundaries = []
start = None
for (idx,l) in enumerate(data):
if start is None:
if not is_blank(l):
start = idx
else:
if is_blank(l):
boundaries.append((start,idx))
start = None
return boundaries
def parse_chunk(chunk_lines: list[str]):
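    # the type 7 header line splits the chunk: preamble above it, phase/station table below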
hIdx = None
for (idx, l) in enumerate(chunk_lines):
if l[-1] == "7":
hIdx = idx
break
preambleRet = _parse_preamble(chunk_lines[:hIdx])
phaseRet = _parse_type_7(chunk_lines[hIdx:])
if not validate_no_stations(preambleRet["Estacoes"], phaseRet):
pass
return _concat(preambleRet, phaseRet)
def _parse_preamble(hLines: list[str]):
aux = defaultdict(list)
for line in hLines:
match line[-1]:
case "1":
aux[1].append(line)
case "3":
aux[3].append(line)
case "6":
aux[6].append(line)
case "E":
pass
# aux["E"].append(line)
case "I":
aux["I"].append(line)
case "F":
pass
# aux["F"].append(line)
case _:
pass
headerDict = dict()
for (k,v) in aux.items():
if len(v) != 0:
headerDict.update(FUNCS[k](v))
return headerDict
def _parse_type_1(data: list[str]):
aux = data[0]
y = int(aux[1:5])
mo = int(aux[6:8])
d = int(aux[8:10])
h = int(aux[11:13])
m = int(aux[13:15])
s = int(aux[16:18])
mil = int(aux[19]) * 10**5
dt = datetime(y,mo,d,h,m,s,mil)
dist_ind = DIST_IND[aux[21]]
ev_type = TYPE[aux[22]]
lat = float(aux[23:30])
long = float(aux[30:38])
depth = float(aux[38:43])
no_stat = int(aux[48:51])
hypo = {"Data": dt.isoformat(), "Distancia": dist_ind, "Tipo Ev": ev_type, "Lat": lat, "Long": long, "Prof": depth, "Estacoes": no_stat, "Magnitudes": list()}
for l in data:
hypo["Magnitudes"] = hypo["Magnitudes"] + _parse_mag(l)
return hypo
def _parse_mag(line: str):
magnitudes = []
base = 55
while base < 79:
m = line[base:base+4]
mt = line[base+4]
if not is_blank(m):
magnitudes.append({"Magnitude": m, "Tipo": mt})
base += 8
return magnitudes
def _parse_type_3(data: list[str]):
comments = {}
for line in data:
if line.startswith(" SENTIDO") or line.startswith(" REGIAO"):
c, v = line[:-2].strip().split(": ", maxsplit=1)
v = v.split(",")[0]
comments[c.capitalize()] = v
return comments
def _parse_type_6(data: list[str]):
waves = []
for l in data:
waves.append(l.strip().split(" ")[0])
return {"Onda": waves}
def _parse_type_7(data: list[str]):
aux = io.StringIO("\n".join(data))
dados = pd.read_fwf(aux, colspecs=[(1,5), (6,8),(10,15), (18,20), (20,22), (23,28), (34,38), (71,75)])
dados.rename(columns={'STAT': "Estacao", 'SP': "Componente" , 'PHASW': "Tipo Onda", 'HR': "Hora", 'MM': "Min", 'SECON': "Seg", 'AMPL': "Amplitude", " DIST": "Dist. Epi"}, inplace=True)
return dados
def _parse_type_e(data: list[str]):
aux = data[0]
error = {"Gap": int(aux[5:8]), "Origin": float(aux[14:20]), "Error_lat": float(aux[24:30]), "Error_long": float(aux[32:38]), "Error_depth": float(aux[38:43]), "Cov_xy": float(aux[43:55]), "Cov_xz": float(aux[55:67]), "Cov_yz": float(aux[67:79])}
return error
def _parse_type_i(data: list[str]):
aux = data[0]
return {"ID":int(aux[60:74])}
FUNCS = {1: _parse_type_1, 3: _parse_type_3, 6: _parse_type_6, "E": _parse_type_e, "I": _parse_type_i}


@@ -1,203 +0,0 @@
# pyright: basic
import os
import sys
import pandas as pd
import numpy as np
STAT_HEADER ="""=== Terramotos ===
== Estatísticas ==
"""
STAT_MENU = """[1] Média
[2] Variância
[3] Desvio padrão
[4] Máximo
[5] Mínimo
[6] Moda
[Q] Voltar ao menu principal
"""
FILTER_CHOICES = """[1] Magnitudes
[2] Distância
[3] Profundidade
"""
CHOICE = {"1": "Magnitudes", "2": "Distancia","3": "Prof"}
def filter_submenu(type: str):
    os.system("cls" if sys.platform == "win32" else "clear")
print(f"{STAT_HEADER}\n = {type} = ")
print(FILTER_CHOICES)
choice = input("Qual dos valores: ")
try:
usrChoice = CHOICE[choice]
return usrChoice
except KeyError:
return None
def stat_menu(df: pd.DataFrame):
inStats = True
while inStats:
        os.system("cls" if sys.platform == "win32" else "clear")
print(STAT_HEADER + "\n" + STAT_MENU)
usrIn = input("Opção: ").lower()
match usrIn:
case "1":
c = filter_submenu("Média")
if c is not None:
retValue = average(df, c)
                    if retValue is not None:
print(f"A média de {c} é {retValue}")
else:
print("Um erro aconteceu. Nada a apresentar de momento.")
else:
continue
case "2":
c = filter_submenu("Variância")
if c is not None:
retValue = variance(df, c)
                    if retValue is not None:
print(f"A variância dos dados de {c} é {retValue}")
else:
print("Um erro aconteceu. Nada a apresentar de momento.")
else:
continue
case "3":
c = filter_submenu("Desvio Padrão")
if c is not None:
retValue = std_dev(df, c)
                    if retValue is not None:
print(f"O desvio padrão de {c} é {retValue}")
else:
print("Um erro aconteceu. Nada a apresentar de momento.")
else:
continue
case "4":
c = filter_submenu("Máximo")
if c is not None:
retValue = max_v(df, c)
print(f"O valor máximo em {c} é {retValue}")
else:
continue
case "5":
c = filter_submenu("Mínimo")
if c is not None:
retValue = min_v(df, c)
print(f"O valor mínimo em {c} é {retValue}")
else:
continue
case "6":
c = filter_submenu("Mínimo")
if c is not None:
retValue = moda(df, c)
print(f"O valor moda em {c} é {retValue}")
else:
continue
case "q":
inStats = False
continue
case _:
pass
input("Clica `Enter` para continuar")
def average(df: pd.DataFrame, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
    try:
        return np.average(values)
    except (TypeError, ValueError):
        return None
def variance(df, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
    try:
        return np.var(values)
    except (TypeError, ValueError):
        return None
def std_dev(df, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
    try:
        return np.std(values)
    except (TypeError, ValueError):
        return None
def max_v(df, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
return np.max(values)
def min_v(df, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
return np.min(values)
def moda(df, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
uniques, count = np.unique(values, return_counts=True)
uniques_list = list(zip(uniques, count))
return sorted(uniques_list, reverse=True ,key=lambda x: x[1])[0][0]
def _unpack_mags(arr: np.ndarray):
newVals = np.empty(0)
for v in arr:
for m in v:
newVals = np.append(newVals, float(m["Magnitude"]))
return newVals


@@ -1,76 +0,0 @@
#! /usr/bin/env python
# pyright: basic
from datetime import time
import json
from math import modf
from typing import Any
from numpy import nan
import pandas as pd
def save_as_json(info: dict[str, Any]) -> bool:
with open("test.json", "w") as fp:
json.dump(info, fp)
return True
# TODO: pass the column names in, so there are no problems in the future if the dataframe's column names change
def create_dict_struct(df: pd.DataFrame, event_cols, station_cols) -> dict[str, Any]:
# get all events by their id
uniqueIds = df["ID"].unique()
allEvents = {}
for id in uniqueIds:
filteredDf = df.loc[df["ID"] == id]
first_row = filteredDf.head(1)
allEvents[int(id)] = create_event_info(first_row)
allEvents[int(id)].update(create_stations_info_1(filteredDf))
return allEvents
def create_event_info(info: pd.DataFrame) -> dict[str, Any]:
return {"DataHora": info.iloc[0]["Data"], "Lat": float(info.iloc[0]["Lat"]), "Long": float(info.iloc[0]["Long"]),
"Profundidade": float(info.iloc[0]["Prof"]), "Tipo Evento": info.iloc[0]["Tipo Ev"],
"Magnitude": create_mag_info(info.iloc[0]["Magnitudes"]), "Regiao": info.iloc[0]["Regiao"],
"Sentido": info.iloc[0]["Sentido"]}
def create_stations_info_1(info: pd.DataFrame) -> dict[str, Any]:
    stationsDict = {}
    for idx in range(len(info)):
        aux = info.iloc[idx]
        # modf returns (fractional, integral); scale the fraction to microseconds
        frac, sec = modf(aux["Seg"])
        hms = time(hour=aux["Hora"], minute=aux["Min"], second=int(sec),
                   microsecond=int(frac * 1e6)).strftime("%H:%M:%S.%f")
        station = {"Componente": aux["Componente"], "Hora": hms, "Distancia": float(aux["DIS"])}
        if not isinstance(aux["Tipo Onda"], float):  # NaN (a float) means no wave type
            station.update({"Tipo Onda": aux["Tipo Onda"]})
            if aux["Tipo Onda"] == "IAML":
                station.update({"Amplitude": float(aux["Amplitude"])})
        if aux["Estacao"] not in stationsDict:
            stationsDict[aux["Estacao"]] = [station]
        else:
            stationsDict[aux["Estacao"]].append(station)
    return {"Estacoes": stationsDict}
def create_mag_info(magnitudes):
mags = {}
for value in magnitudes:
mags[value["Tipo"]] = value["Magnitude"]
return mags
if __name__ == '__main__':
import parser
df = parser.parse("dados.txt")
a = create_dict_struct(df, None, None)
save_as_json(a)

74
utilsv2/graphs.py Normal file

@@ -0,0 +1,74 @@
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
from utilsv2 import stats, utils
class Plotter:
def __init__(self) -> None:
self.x = []
self.y = []
self.figure = None
self.ax = None
def plot_bars(self, title: str, isDepth: bool = False):
self.figure, self.ax = plt.subplots()
self.ax.bar(self.x, self.y)
self.ax.set_title(title)
plt.show()
def plot_lin(self):
pass
def plot_box(self):
pass
def adjust_x(self):
pass
def add_x_values(self, xvalues):
self.x = xvalues
def add_y_values(self, yvalues):
self.y = yvalues
@staticmethod
def concat_data_day(data):
pass
@staticmethod
def concat_data_month(data):
x = []
y_vals = {"e": [], "m": [], "d": []}
currMonth: datetime = data[0]["DateTime"]
currMonth_str = utils.print_ym(currMonth)
x.append(currMonth_str)
e = 0
m = []
d = []
        idx = 0
        while idx < len(data):
            dt = data[idx]["DateTime"]
            # compare year and month, so the same month of different years isn't merged
            if (dt.year, dt.month) == (currMonth.year, currMonth.month):
                e += 1
                m.append(data[idx]["Magnitudes"]["L"]["Magnitude"])
                d.append(data[idx]["Depth"])
                idx += 1
            else:
                y_vals["e"].append(e)
                y_vals["m"].append(np.average(m))
                y_vals["d"].append(np.average(d))
                currMonth = dt
                currMonth_str = utils.print_ym(currMonth)
                x.append(currMonth_str)
                e = 0
                m = []
                d = []
        # flush the last month, which is still buffered
        y_vals["e"].append(e)
        y_vals["m"].append(np.average(m))
        y_vals["d"].append(np.average(d))
        return x, y_vals

10
utilsv2/log.py Normal file

@@ -0,0 +1,10 @@
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
level=logging.INFO,
filename="ev.log",
)

91
utilsv2/mongo.py Normal file

@@ -0,0 +1,91 @@
import logging
from typing import Any
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.errors import ConnectionFailure
try:
    from utilsv2.log import logger  # imported for its logging configuration side effect
except ModuleNotFoundError:
    from log import logger
logger = logging.getLogger(__name__)
def connect(uri: str) -> MongoClient:
try:
client = MongoClient(uri)
logger.info("Connected to the DB")
except ConnectionFailure as e:
logger.critical("Could not connect to the MongoDB")
raise e
return client
def add_events(
client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main"
) -> None:
coll: Collection = client[db][collection]
_res = coll.insert_many(data)
if _res.acknowledged:
logger.info(f"Added {len(_res.inserted_ids)} events to {db}.{collection}")
else:
logger.info("Could not add events to the database.")
def add_stations(
client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main"
) -> None:
coll: Collection = client[db][collection]
_res = coll.insert_many(data)
if _res.acknowledged:
logger.info(f"Added {len(_res.inserted_ids)} stations to {db}.{collection}")
else:
logger.info("Could not add events to the database.")
def get_ids(collection: Collection) -> set[Any]:
return set(collection.distinct("ID"))
def close(client: MongoClient) -> None:
client.close()
logger.info("Closed the DB.")
def query_all(client: MongoClient, collection: str, db: str = "main") -> Any:
coll: Collection = client[db][collection]
result = coll.find({})
return list(result)
def filter_query(
    client: MongoClient, collection: str, filter_by: dict[str, Any], db: str = "main"
):
    coll: Collection = client[db][collection]
    cursor = coll.find(
        filter_by, {"DateTime": 1, "Magnitudes": 1, "Depth": 1, "GAP": 1}
    ).sort("DateTime", 1)
    # materialize the cursor instead of poking at its private attributes
    res = list(cursor)
    logger.info(f"Retrieved {len(res)} elements.")
    return res
if __name__ == "__main__":
v = connect("mongodb://localhost:27017")
query_all(v, "quakes")
close(v)

194
utilsv2/nordic.py Normal file

@@ -0,0 +1,194 @@
import json
import logging
from collections import defaultdict
from datetime import datetime, time
from typing import Any
from utilsv2 import utils
from utilsv2.log import logger
logger = logging.getLogger(__name__)
type evtype = dict[str, Any]
type sttype = dict[str, Any]
# INFO: Don't think we really need this
class Mag:
def __init__(self, mag: float, type: str, agency: str):
self.mag = mag
self.type = type
self.agency = agency
def __str__(self):
return f"mag: {self.mag}, type: {self.type}, agency: {self.agency}"
    def toJSON(self):
        return json.dumps({"Magnitude": self.mag, "Agency": self.agency})
def parse_event(event: list[str]) -> evtype:
    # a Nordic event must always start with a type 1 line,
    # but a type 1 line may have the type id omitted when it is the first line
# if event[0][-1] != "1" or event[0][-1] != " ":
# return {}
toParse: dict[str, list[str]] = defaultdict(list)
for line in event:
toParse[line[-1]].append(line)
_ret = {}
for k, v in toParse.items():
match k:
case "1":
aux = parse_type_1(v)
if aux:
_ret.update(aux)
case "3":
_ret.update(parse_type_3(v))
case "6":
_ret.update(parse_type_6(v))
case "E":
_ret.update(parse_type_e(v))
case "I":
_ret.update(parse_type_i(v))
case _:
pass
return _ret
def parse_stations_V1(lines: list[str], event_id: int) -> sttype:
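    # Station lines are fixed-width: cols 1-6 station, 6-9 component, 9 onset,
    # 10-15 phase, 15 weight, 18-30 arrival time, 35-40 amplitude.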
_ret = {"ID": event_id, "stations": {}}
for st in lines:
try:
ampl = float(st[35:40])
except ValueError:
ampl = None
station = st[1:6].strip()
if station not in _ret["stations"].keys():
_ret["stations"][station] = []
_ret["stations"][station].append(
{
"Component": st[6:9].strip(),
"I": None if st[9] == " " else st[9],
"Time": parse_dt(st[18:30], True).strftime("%H:%M:%S.%f%z"),
"Phase": st[10:15].strip(),
"Weigth": None if st[15] == " " else st[15],
"Amplitude": ampl,
}
)
return _ret
def parse_type_1(lines: list[str]) -> dict[str, Any] | None:
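    # Type 1 is fixed-width: date/time in cols 0-20, distance indicator at 21,
    # event type at 22, latitude 23-30, longitude 30-38, depth 38-44,
    # magnitudes from col 55 onwards.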
line1 = {}
for line in lines:
if "Date" not in line1.keys():
dt = parse_dt(line[:21])
dist_ind = line[21]
event_id = line[22]
lat = float(line[23:30])
long = float(line[30:38])
depth = float(line[38:44])
mags = parse_mag(line[55:79])
line1.update(
{
"DateTime": dt,
"Distance Indicator": dist_ind,
"Event ID": event_id,
"Latitude": lat,
"Longitude": long,
"Depth": depth,
"Magnitudes": mags,
}
)
        else:
            mags = parse_mag(line[56:79])
            line1["Magnitudes"].update(mags)
return line1
def parse_type_3(lines: list[str]) -> dict[str, Any]:
comments = {"Sentido": "", "Regiao": "", "VZ": None, "SZ": None, "FE": None}
for line in lines:
if line.startswith(" SENTIDO"):
aux = line[:-2].split(":", maxsplit=1)
comments["Sentido"] = aux[1].strip()
elif line.startswith(" REGIAO"):
aux = line[:-2].split(":", maxsplit=1)
for item in aux[1].split(","):
if item.startswith("VZ"):
comments["VZ"] = int(item[2:])
elif item.startswith("SZ"):
comments["SZ"] = int(item[2:])
elif item.startswith("FE"):
comments["FE"] = int(item[2:5])
else:
comments["Regiao"] = item[1:]
return comments
def parse_type_6(lines: list[str]) -> dict[str, list[str]]:
_ret = {"Wavename": []}
for line in lines:
_ret["Wavename"].append(line[:-2].strip())
return _ret
def parse_type_e(lines: list[str]) -> dict[str, int]:
err = {}
for line in lines:
gap = int(line[5:8])
err["GAP"] = gap
return err
def parse_type_i(lines: list[str]) -> dict[str, int]:
aux = {}
for line in lines:
aux["ID"] = int(line[60:75])
return aux
def parse_dt(_text: str, isStation=False) -> datetime | time:
    if not isStation:
        y = int(_text[0:5])
        mo = int(_text[6:8])
        d = int(_text[8:10])
        h = int(_text[11:13])
        m = int(_text[13:15])
        s_ms = int(float(_text[16:20]) * 1000)
        s = s_ms // 1000
        micro = (s_ms % 1000) * 1000  # the remainder is in milliseconds
        return datetime(
            year=y, month=mo, day=d, hour=h, minute=m, second=s, microsecond=micro
        )
    else:
        h = int(_text[:2])
        m = int(_text[2:4])
        s_ms = int(float(_text[5:]) * 1000)
        s = s_ms // 1000
        micro = (s_ms % 1000) * 1000  # the remainder is in milliseconds
        return time(hour=h, minute=m, second=s, microsecond=micro)
def parse_mag(_text: str) -> dict[str, Mag]:
    mags = {}
    for i in range(0, 3):
        mag = _text[8 * i : 8 * (i + 1)]  # each magnitude takes 8 chars: value(4), type(1), agency(3)
        if not utils.is_empty(mag):
            mags[mag[4]] = {"Magnitude": float(mag[:4]), "Agency": mag[5:]}
    return mags

83
utilsv2/parser.py Normal file

@@ -0,0 +1,83 @@
import logging
from io import TextIOWrapper
try:
from utilsv2 import utils
from utilsv2.log import logger
from utilsv2.nordic import evtype, parse_event, parse_stations_V1, sttype
except ModuleNotFoundError:
import utils
from log import logger
from nordic import evtype, parse_event, parse_stations_V1, sttype
logger = logging.getLogger(__name__)
def read_file(fname: str) -> TextIOWrapper | OSError:
try:
fp = open(fname, "r", newline="\n")
return fp
except FileNotFoundError:
return FileNotFoundError("Nenhum ficheiro encontrado")
except PermissionError:
return PermissionError("Sem permissões para abrir")
def find_events(fp: TextIOWrapper) -> list[tuple[int, int]]:
    event_indices = []
    event_start = -1
    idx = 0
    for line in fp.read().split("\n"):
        if event_start == -1:
            # an event starts at the first non-blank line
            if not utils.is_empty(line):
                event_start = idx
        elif utils.is_empty(line):
            event_indices.append((event_start, idx))
            event_start = -1
        idx += 1
    if event_start != -1:
        # the file ended without a trailing blank line; close the last event
        event_indices.append((event_start, idx))
    logger.info("Found %d events", len(event_indices))
    return event_indices
def split_event(lines: list[str], start: int, end: int) -> int:
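    # returns the index of the type 7 column-header line, which marks where the station picks begin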
for idx in range(start, end):
if lines[idx].endswith("7"):
return idx
return -1
def extract_event(
fp: TextIOWrapper, event_bounds: list[tuple[int, int]]
) -> tuple[list[evtype], list[sttype]]:
lines = fp.read().split("\n")
events, ev_stations = [], []
for event_idx in event_bounds:
stations = split_event(lines, event_idx[0], event_idx[1])
if stations == -1:
logger.error(f"Could not parse event at pos {event_idx}")
continue
ev = parse_event(lines[event_idx[0] : stations])
events.append(ev)
ev_stations.append(
parse_stations_V1(lines[stations + 1 : event_idx[1]], ev["ID"])
)
return events, ev_stations
def parse(fname: str):
_ret = read_file(fname)
if not isinstance(_ret, TextIOWrapper):
        logger.critical(str(_ret))
raise _ret
events = find_events(_ret)
_ret.seek(0)
evs, stations = extract_event(_ret, events)
# cleanup
_ret.close()
return evs, stations

83
utilsv2/stats.py Normal file

@@ -0,0 +1,83 @@
import time
from datetime import datetime
import numpy as np
def print_filters(filters):
    _res = ""
    for k, v in filters.items():
        _res += f"{k}: {v}\n"
    return _res
def pprint(v):
    # v = (mean, std, median, q1, q3, min, max)
    return f"\tMédia: {v[0]} \u00b1 {v[1]}; 1o Quartil: {v[3]}; Mediana: {v[2]}; 3o Quartil: {v[4]}; Máximo: {v[6]}; Mínimo: {v[5]}"
def stats(data):
    _stats = f"=== Estatística ===\nNúmero total de eventos: {len(data)}\n"
    aux = []
    currMonth: datetime = data[0]["DateTime"]
    idx = 0
    while idx < len(data):
        dt = data[idx]["DateTime"]
        # compare year and month, so the same month of different years isn't merged
        if (dt.year, dt.month) == (currMonth.year, currMonth.month):
            aux.append(data[idx])
            idx += 1
        else:
            m = calc_mag(aux)
            d = calc_depth(aux)
            aux = []
            _stats += f"{currMonth.strftime('%Y-%m')}:\n\tMagnitude: {pprint(m)}\n\tProfundidade: {pprint(d)}\n\n"
            currMonth = dt
    # the last month is still buffered in aux
    m = calc_mag(aux)
    d = calc_depth(aux)
    _stats += f"{currMonth.strftime('%Y-%m')}:\n\tMagnitude: {pprint(m)}\n\tProfundidade: {pprint(d)}\n"
fname = f"stats-{time.time_ns()}.txt"
with open(fname, "wb") as fp:
fp.write(_stats.encode("utf-8"))
# print(_stats)
def calc_depth(data):
if len(data) == 0:
return 0
depths = np.array([v["Depth"] for v in data], dtype=float)
quantile = np.quantile(depths, [0.25, 0.75])
return list(
map(
float,
(
round(np.average(depths), 3),
round(np.std(depths), 3),
round(np.median(depths), 3),
round(quantile[0], 3),
round(quantile[1], 3),
np.min(depths),
np.max(depths),
),
)
)
def calc_mag(data):
if len(data) == 0:
return 0
mags = np.array([v["Magnitudes"]["L"]["Magnitude"] for v in data], dtype=float)
quantile = np.quantile(mags, [0.25, 0.75])
return list(
map(
float,
(
round(np.average(mags), 3),
round(np.std(mags), 3),
round(np.median(mags), 3),
round(quantile[0], 3),
round(quantile[1], 3),
np.min(mags),
np.max(mags),
),
)
)

48
utilsv2/utils.py Normal file

@@ -0,0 +1,48 @@
import os
from datetime import datetime
def is_empty(_str: str) -> bool:
return len(_str.strip(" ")) == 0
def toDateTime(dt: str) -> datetime | int:
if len(dt) == 0:
return -1
try:
return datetime.strptime(dt, "%Y-%m-%d")
except ValueError:
return -1
def toFloat(v: str) -> float:
if len(v) == 0:
return -1.0
try:
return float(v)
except ValueError:
return -1.0
def toInt(v: str) -> int | None:
if len(v) == 0:
return None
try:
return int(v)
except ValueError:
return None
def print_ym(dt: datetime) -> str:
return dt.strftime("%Y-%m")
def fileExists(fname: str) -> bool:
files = set(os.listdir())
if fname not in files:
return False
if not os.path.isfile(fname):
return False
return True