From 3e0814057fbaabc6345ea5309619b1747f521b6e Mon Sep 17 00:00:00 2001 From: Shino Date: Sat, 13 Dec 2025 12:16:20 -0100 Subject: [PATCH] =?UTF-8?q?doc:=20Coment=C3=A1rios=20a=20cada=20fun=C3=A7?= =?UTF-8?q?=C3=A3o=20fix:=20remover=20c=C3=B3digo=20morto=20ou=20desnecess?= =?UTF-8?q?=C3=A1rio?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 3 +- earthquakes.py | 163 +++++++++++++------ requirements.txt | 5 +- utils/crud.py | 389 +++++++++++++++++++++++++++++++------------- utils/filters.py | 141 ++++++++++++---- utils/parser.py | 409 ++++++++++++++++++++++++++++++++++++----------- utils/plot.py | 49 ------ utils/stats.py | 328 ++++++++++++++++++++++++++++--------- utils/utils.py | 119 +++++++++++--- utils/visuals.py | 149 +++++++++++++---- 10 files changed, 1287 insertions(+), 468 deletions(-) delete mode 100644 utils/plot.py diff --git a/README.md b/README.md index fd4d1d4..37e3a99 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ ## Como utilizar Correr o ficheiro `earthquakes.py` usando `python earthquakes.py` +Garantir que o ficheiro de dados está no mesmo diretório que o ficheiro `earthquakes.py` ## Objectivos @@ -36,7 +37,7 @@ First, let's represent the data using Python's Pandas module and implement CRUD ## Prazos - T1 a T4 -> 10 de novembro -- (a definir) +- T5 a T7 -> 14 de dezembro ## Apontamentos Dados parecem estar no formato [Nordic](https://seisan.info/v13/node259.html) diff --git a/earthquakes.py b/earthquakes.py index 4acf493..dce1c37 100644 --- a/earthquakes.py +++ b/earthquakes.py @@ -5,43 +5,63 @@ import json import os import sys from datetime import datetime +from typing import Any import pandas as pd -from utils import parser, crud, stats, utils, visuals, filters +from utils import crud, filters, parser, stats, utils, visuals HEADER = """=== Terramotos ===""" -EVENT_COLS = ["Data", "Latitude", "Longitude", "Profundidade", "Tipo Evento", "Gap", "Magnitudes", "Regiao", "Sentido", "Pub", "SZ", "VZ"] -STATION_COLS = ["Estacao", "Hora", "Min", "Seg", "Componente", "Distancia Epicentro", "Tipo Onda"] +EVENT_COLS = [ + "Data", + "Latitude", + "Longitude", + "Profundidade", + "Tipo Evento", + "Gap", + "Magnitudes", + "Regiao", + "Sentido", + "Pub", + "SZ", + "VZ", +] +STATION_COLS = [ + "Estacao", + "Hora", + "Min", + "Seg", + "Componente", + "Distancia Epicentro", + "Tipo Onda", +] -MENU ="""[1] Criar a base de dados -[3] Apagar um evento -[4] Apagar uma entrada de um evento -[5] Visualizar um evento -[6] Guardar como JSON -[7] Guardar como CSV -[8] Estatísticas -[9] Criar uma entrada -[10] Gráficos -[11] Filtros (T7) +MENU = """[1] Criar a base de dados +[2] Apagar um evento +[3] Apagar uma entrada de um evento +[4] Visualizar um evento +[5] Guardar como JSON +[6] Guardar como CSV +[7] Estatísticas +[8] Criar uma entrada +[9] Gráficos +[10] Filtros (T7) [Q] Sair """ -def guardar_json(df: pd.DataFrame, fname: str) -> bool: - _retValues = utils.create_dict_struct(df, EVENT_COLS, None) - - with open(fname , "w") as fp: - try: - json.dump(_retValues, fp) - except: - return False - return True - - def guardar_csv(df: pd.DataFrame, fname: str): + """Guarda uma DataFrame num ficheiro csv + + Args: + df (pd.DataFrame): Dataframe com os dados + fname (str): nome do ficheiro csv + + Returns: + bool: Retorna se a operação foi bem sucedida ou não + """ with open(fname, "w") as fp: try: df.to_csv(fp, index=False) @@ -51,10 +71,16 @@ def guardar_csv(df: pd.DataFrame, fname: str): def main(): + """Ponto de entrada do programa. + + Constituido por um while loop a correr um menu onde o utilizador pode + interagir com os vários módulos implementados. + + """ isRunning = True db = None original_db = None - + retInfo = None while isRunning: @@ -79,10 +105,10 @@ def main(): else: input("Base de dados não encontrada. Por favor tenta de novo.") - case "3": + case "2": if db is not None: crud.read_ids(db) - choice = _get_usr_input("Escolhe o ID para apagar: ", int) + choice: int = _get_usr_input("Escolhe o ID para apagar: ", int) if not _event_exists(db, choice): retInfo = "ID do event não encontrado!" @@ -94,11 +120,10 @@ def main(): else: retInfo = "Base de dados não encontrada!" - - case "4": + case "3": if db is not None: crud.read_ids(db) - eid_choice = _get_usr_input("Escolhe o ID: ", int) + eid_choice: int = _get_usr_input("Escolhe o ID: ", int) if not _event_exists(db, eid_choice): retInfo = "ID do event não encontrado!" @@ -119,7 +144,7 @@ def main(): else: retInfo = "Base de dados não encontrada!" - case "5": + case "4": if db is not None: crud.read_ids(db) choice = _get_usr_input("Escolhe o ID para ver os dados: ", int) @@ -137,16 +162,16 @@ def main(): else: retInfo = "Base de dados não encontrada!" - case "6": + case "5": if db is not None: fname = _get_usr_input("Nome do ficheiro a guardar? ") if fname is None: fname = "valores.json" - utils.save_as_json(db, fname, EVENT_COLS, STATION_COLS) + utils.save_as_json(db, fname, EVENT_COLS) else: retInfo = "Base de dados não encontrada!" - case "7": + case "6": if db is not None: fname = _get_usr_input("Nome do ficheiro a guardar? ") if fname is None: @@ -155,13 +180,13 @@ def main(): else: retInfo = "Base de dados não encontrada!" - case "8": + case "7": if db is not None: stats.stat_menu(db) else: retInfo = "Base de dados não encontrada!" - - case "9": + + case "8": if db is not None: crud.read_ids(db) eid_choice = _get_usr_input("Escolhe o ID: ", int) @@ -176,7 +201,6 @@ def main(): crud.show_table(table) insertion_point = _get_usr_input("Posição da nova linha: ", int) - # TODO: balizar a escolha para apenas as linhas do evento em questao db, msg = crud.create_table_row(db, eid_choice, insertion_point) new_table = crud.get_table(db, eid_choice) @@ -185,14 +209,14 @@ def main(): input() else: retInfo = "Base de dados não encontrada!" - - case "10": + + case "9": if db is not None: visuals.visual_menu(db) else: retInfo = "Base de dados não encontrada!" - case "11": + case "10": if db is not None: # Passa db e original_db para o menu de filtros # Retorna a nova db ativa (filtrada ou redefinida) @@ -213,30 +237,71 @@ def main(): def _file_exists(name: str) -> bool: + """Verifica se um ficheiro existe no diretório onde o programa correntemente + corre, através de os.getcwd() + + Args: + name (str): Nome do ficheiro a verificar + + Returns: + bool: True se existe, False caso contrário + """ currFiles = os.listdir(os.getcwd()) if name in currFiles: return True return False -def _event_exists(df, eid) -> bool: + +def _event_exists(df: pd.DataFrame, eid: int) -> bool: + """Função privada de verificação de eventos + + Verifica se um certo ID de evento existe ou não dentro de uma DataFrame + + Args: + df (pd.DataFrame): DataFrame a pesquisar + eid (int): Evento específico a pesquisar + + Returns: + bool: True se evento existe dentro da DataFrame, False caso contrário + """ allEvents = set(df["ID"]) return eid in allEvents -def _get_usr_input(msg:str, asType=str): +def _get_usr_input(msg: str, asType: Any = str) -> Any: + """Modifica o stdin do utilizador para o tipo especificado. Por defeito retorna uma str. + + Args: + msg (str): String a ser alterada + asType (Any): tipo no qual msg deverá ser intepretado como (default: `str`) + + Returns: + [type]: [description] + """ usrIn = input(msg) if usrIn == "": return None return asType(usrIn) -def _prettify_event(df): - preambleInfo = df.drop_duplicates(subset="ID", keep="first") - stations = df[["Estacao", "Componente", "Tipo Onda", "Amplitude"]] + +def _prettify_event(df: pd.DataFrame) -> None: + """Função privada para utilização na visualização de um evento singular através + do menu de `Visualizar um evento` + + Args: + df (pd.DataFrame): DataFrame com os dados do evento + """ + # preambleInfo = df.drop_duplicates(subset="ID", keep="first") + # stations = df[["Estacao", "Componente", "Tipo Onda", "Amplitude"]] info = df.drop_duplicates(subset="Data", keep="first") data = datetime.fromisoformat(info.Data.values[0]).strftime("%c") - print(f"Região: {info["Regiao"].values[0]}\nData: {data}\nLatitude: {info['Latitude'].values[0]}\nLongitude: {info['Longitude'].values[0]}" - + f"\nProfundidade: {info['Profundidade'].values[0]}\nTipo de evento: {info['Tipo Evento'].values[0]}\n") + print( + f"Região: {info['Regiao'].values[0]}\nData: {data}\nLatitude: {info['Latitude'].values[0]}\nLongitude: {info['Longitude'].values[0]}" + + f"\nProfundidade: {info['Profundidade'].values[0]}\nTipo de evento: {info['Tipo Evento'].values[0]}\n" + ) -if __name__ == '__main__': + +# entry point +if __name__ == "__main__": main() diff --git a/requirements.txt b/requirements.txt index ebb4734..0ac4274 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,5 @@ -numpy==2.3.4 +dash==3.3.0 +matplotlib==3.10.8 +numpy==2.3.5 pandas==2.3.3 +plotly==6.5.0 diff --git a/utils/crud.py b/utils/crud.py index e8b0ffb..c885c05 100644 --- a/utils/crud.py +++ b/utils/crud.py @@ -1,92 +1,231 @@ # pyright: basic +from typing import Any + import pandas as pd -pd.set_option('display.max_rows', 500) -pd.set_option('display.max_columns', 500) -pd.set_option('display.width', 150) +pd.set_option("display.max_rows", 500) +pd.set_option("display.max_columns", 500) +pd.set_option("display.width", 150) # -- globals -HEADER_COLS = ["Data", "Distancia", "Tipo Evento", "Latitude", "Longitude", "Profundidade", "Magnitudes"] +HEADER_COLS = [ + "Data", + "Distancia", + "Tipo Evento", + "Latitude", + "Longitude", + "Profundidade", + "Magnitudes", +] TABLE_READ_RET = ["Estacao", "Hora", "Min", "Seg", "Componente", "Amplitude"] # -- helper funcs -def _get_uniques(df) -> pd.DataFrame: + +def _get_uniques(df: pd.DataFrame) -> pd.DataFrame: + """Funcao privada que retorna os eventos unicos, removendo duplicados. + + Mantem o primeiro evento de cada ID unico, removendo entradas com o ID igual + + Args: + df (pd.DataFrame): DataFrame com os dados + + Returns: + pd.DataFrame: Nova DataFrame com IDs duplicados removidos + """ return df.get(["ID", "Data", "Regiao"]).drop_duplicates(subset="ID", keep="first") -def _show_events(df): - for (_, row) in df.iterrows(): - print(f"{row["ID"]}: {row["Regiao"]}") +def _show_events(df: pd.DataFrame) -> None: + """Funcao privada para print de cada evendo e a respectiva Regiao + + Args: + df (pd.DataFrame): DataFrame com os dados + """ + for _, row in df.iterrows(): + print(f"{row['ID']}: {row['Regiao']}") + # -- main -def read_ids(df): + +def read_ids(df: pd.DataFrame) -> None: + """Mostra, por print(), os eventos disponiveis em df. + + Args: + df (pd.DataFrame): DataFrame com os dados + """ ids = _get_uniques(df) _show_events(ids) -def get_unique_events_table(df): +def get_unique_events_table(df: pd.DataFrame) -> pd.DataFrame: + """Retorna uma nova DataFrame com eventos de ID unico. + + Remove, para cada ID unico, os duplicados, mantendo o primeiro. + + Args: + df (pd.DataFrame): DataFrame com os dados + + Returns: + pd.DataFrame: Nova DataFrame filtrada + """ return df.drop_duplicates(subset="ID", keep="first") -def read_header(df, event_id): - # Obtém a informação da primeira linha do evento (cabeçalho) +def read_header(df: pd.DataFrame, event_id: int) -> str: + """Lê a primeira entrada com ID `event_id` + + Obtém a informação da primeira linha do evento (cabeçalho) e + constrói a string formatada " : " + + Args: + df (pd.DataFrame): DataFrame com os dados + event_id (int): ID do evento + + Returns: + str: str com os dados do evento + """ row = df[df["ID"] == event_id].iloc[0] - cols = list(df.columns) info = [] - for (i, col) in enumerate(HEADER_COLS): - # Constrói a string formatada "Índice Nome: Valor" - info.append(f"{i+1} {col}: {row[col]}") - infoString = f"Header do evento {event_id}:\n" + "\n".join(info) + for i, col in enumerate(HEADER_COLS): + info.append(f"{i + 1} {col}: {row[col]}") + + infoString = f"Header do evento {event_id}:\n" + "\n".join(info) return infoString -def show_table(df, retCols=TABLE_READ_RET): - print(df.loc[:,retCols]) +def show_table(df: pd.DataFrame, retCols: list[str] = TABLE_READ_RET) -> None: + """print() da DataFrame total, filtrada por colunas. Por defeito, faz print + de apenas da Estação, HMS, Componente e Amplitude registada + + Args: + df (pd.DataFrame): DataFrame com os dados + retCols (list[str]): Filtro de colunas a fazer print() (default: `TABLE_READ_RET`) + """ + print(df.loc[:, retCols]) -def get_table(df, event_id): - rows = df[df["ID"] == event_id] +def get_table(df: pd.DataFrame, event_id: int) -> pd.DataFrame: + """Retorna uma DataFrame apenas com o evento `event_id` + + Args: + df (pd.DataFrame): DataFrame com os dados + event_id (int): ID do evento + + Returns: + pd.DataFrame: Nova DataFrame com todos os dados do evento `event_id` + """ + rows: pd.DataFrame = df[df["ID"] == event_id] # type: ignore return rows -def read_table_row(df, event_id, row_number_1): - # Retorna uma linha específica da tabela de estações - # row_number_1 é o índice dado pelo utilizador (começa em 1) - # row_number_0 é o índice real da lista (começa em 0) - row_number_0 = row_number_1 - 1 - table = get_table(df, event_id) - - # Verifica se a linha pedida existe dentro das linhas deste evento - if row_number_0 < 0 or row_number_0 >= len(table): - return f"Linha {row_number_1} não pertence ao evento {event_id}." - - row = table.iloc[row_number_0] - cols = list(df.columns) - - # Encontra onde começam as colunas da estação para mostrar apenas os dados relevantes - start = cols.index("Estacao") - tableCols = cols[start:] - - info = [] - for (i, col) in enumerate(tableCols): - info.append(f"{i+1} {col}: {row[col]}") - return f"Linha {row_number_1:02d} do evento {event_id}:\n" + "\n".join(info) +def delete_event(df: pd.DataFrame, event_id: int) -> pd.DataFrame: + """Apaga um evento da DataFrame, retornando a DataFrame atualizada + + Args: + df (pd.DataFrame): DataFrame com os dados + event_id (int): ID do evento a apagar + + Returns: + pd.DataFrame: DataFrame sem o evento. + """ + new_df = df.drop(df[df["ID"] == event_id].index) + print(f"Evento {event_id} apagado!") + return new_df -def update_table_row(df, row_line, new_data): - for key, value in new_data.items(): - if key in df.columns: - df.loc[row_line, key] = value - return f"Linha {row_line} do evento atualizada com sucesso." +def delete_table_row(df: pd.DataFrame, event_id: int, row_number: int) -> pd.DataFrame: + """Apaga uma linha específica relativa ao evento `event_id` + + Args: + df (pd.DataFrame): DataFrame com os dados + event_id ([type]): [description] + row_number ([type]): [description] + + Returns: + pd.DataFrame: [description] + """ + matching_indices = df.index[df["ID"] == event_id].tolist() + + first_event_row = matching_indices[0] + last_event_row = matching_indices[-1] + + # Garante que não estamos a apagar uma linha que pertence a outro evento + if row_number < first_event_row or row_number > last_event_row: + print( + f"Erro: A posição a apagar, {row_number} está fora do intervalo permitido para o evento {event_id}." + ) + return df + + new_df = df.drop([row_number]).reset_index(drop=True) + print(f"Linha {row_number} apagada com sucesso!") + return new_df + + +def create_table_row( + df: pd.DataFrame, event_id: int, insertion_point: int +) -> pd.DataFrame: + """Insere uma nova linha vazia numa posição específica dentro do evento `event_id` + + Args: + df (pd.DataFrame): DataFrame com os dados + event_id (int): ID do evento + insertion_point (int): [description] + + Returns: + tuple: [description] + """ + # + + # Encontra os limites (início e fim) do evento atual + matching_indices = df.index[df["ID"] == event_id].tolist() + + first_event_row = matching_indices[0] + last_event_row = matching_indices[-1] + + # Valida se o ponto de inserção é válido para este evento + if insertion_point < first_event_row or insertion_point > last_event_row + 1: + print( + f"Erro: A posição de inserção {insertion_point} está fora do intervalo permitido para o evento {event_id}" + ) + return df + + # Cria a nova linha + new_row_df = pd.DataFrame(columns=df.columns, index=[0]) + new_row_df["ID"] = event_id + new_row_df = new_row_df.fillna(0) + new_row_df = new_row_df.astype(df.dtypes) + + # Parte o dataframe em dois (antes e depois do ponto de inserção) e mete a nova linha no meio + df_before = df.iloc[:insertion_point] + df_after = df.iloc[insertion_point:] + + new_df = pd.concat([df_before, new_row_df, df_after], ignore_index=True) + print(f"Linha inserida com sucesso na posição {insertion_point}") + + return new_df + + +# -- Deprecated def update_header(df, event_id, new_data): - # Atualiza o cabeçalho de um evento com os novos dados + """>OBSOLETO< + + Atualiza o cabeçalho de um evento com os novos dados + + Args: + df (pd.DataFrame): DataFrame com os dados + event_id (int): ID do evento + new_data (dict[str, Any]): Novos dados para substituir + + Returns: + str: AWK de atualização do cabeçalho + """ for key, value in new_data.items(): if key in df.columns: # Atualiza todas as linhas deste evento (ID == event_id) com o novo valor @@ -94,32 +233,61 @@ def update_header(df, event_id, new_data): return f"Header do evento {event_id} atualizado com sucesso." -def delete_event(df, event_id): - # Apaga um evento inteiro (header + tabela) - new_df = df.drop(df[df["ID"] == event_id].index) - print(f"Evento {event_id} apagado!") +def create_table_row_old( + df: pd.DataFrame, event_id: int, row_number_1: int +) -> pd.DataFrame: + """>OBSOLETO< + + Cria uma linha na DataFrame + + Args: + df (pd.DataFrame): DataFrame com os dados + event_id (int): ID do evento + row_number_1 (int): posição livre onde inserir + + Returns: + pd.DataFrame: Nova DataFrame + """ + event_rows = df[df["ID"] == event_id] + if event_rows.empty: + print(f"Erro: Evento com ID {event_id} não encontrado.") + return df + header_idx: int = event_rows.index[0] # type: ignore + table_size = len(event_rows.index) - 1 + + # Validar posição da nova linha + if not (1 <= row_number_1 <= table_size + 1): + print( + f"Erro: Posição {row_number_1} inválida. Evento {event_id} tem {table_size} linha(s) na tabela." + ) + return df + + insertion_point = header_idx + row_number_1 + + new_row_df = pd.DataFrame(columns=df.columns, index=[0]) + new_row_df["ID"] = event_id + new_row_df = new_row_df.astype(df.dtypes) + df_before = df.iloc[:insertion_point] + df_after = df.iloc[insertion_point:] + + new_df = pd.concat([df_before, new_row_df, df_after], ignore_index=True) + + print(f"Linha inserida com sucesso na posição {row_number_1} do evento {event_id}.") return new_df -def delete_table_row(df, event_id, row_number): - # Apaga uma linha específica da tabela de estações de um evento - - # Encontra todos os índices (números de linha no DataFrame que pertencem a este evento - matching_indices = df.index[df['ID'] == event_id].tolist() +def create_blank_event(df: pd.DataFrame, event_id: int) -> pd.DataFrame: + """>OBSOLETO< - first_event_row = matching_indices[0] - last_event_row = matching_indices[-1] + Criano um novo evento com valores vazios - # Garante que não estamos a apagar uma linha que pertence a outro evento - if row_number < first_event_row or row_number > last_event_row: - return df, f"Erro: A posição a apagar, {row_number} está fora do intervalo permitido para o evento {event_id}." - - new_df = df.drop([row_number]).reset_index(drop=True) - return new_df, f"Linha {row_number} apagada com sucesso!" + Args: + df (pd.DataFrame): DataFrame com os dados + event_id (int): ID do novo evento - -def create_blank_event(df, event_id): - # Cria um novo evento vazio + Returns: + pd.DataFrame: Nova DataFrame + """ # Primeiro, avança os IDs de todos os eventos seguintes para arranjar espaço df.loc[df["ID"] >= event_id, "ID"] += 1 @@ -136,56 +304,53 @@ def create_blank_event(df, event_id): return new_df -def create_table_row(df, event_id, insertion_point): - # Insere uma nova linha vazia numa posição específica dentro do evento - - # Encontra os limites (início e fim) do evento atual - matching_indices = df.index[df['ID'] == event_id].tolist() +def update_table_row(df: pd.DataFrame, row_line: int, new_data: dict[str, Any]) -> str: + """Atualiza uma linha de `df` com novos dados - first_event_row = matching_indices[0] - last_event_row = matching_indices[-1] + Args: + df (pd.DataFrame): DataFrame com os dados + row_line (int): linha a atualizar + new_data (dict[str, Any]): novos dados a substituir na linha - # Valida se o ponto de inserção é válido para este evento - if insertion_point < first_event_row or insertion_point > last_event_row + 1: - return df, f"Erro: A posição de inserção {insertion_point} está fora do intervalo permitido para o evento {event_id}" + Returns: + str: AWK de atualização da linha + """ + for key, value in new_data.items(): + if key in df.columns: + df.loc[row_line, key] = value + return f"Linha {row_line} do evento atualizada com sucesso." - # Cria a nova linha - new_row_df = pd.DataFrame(columns=df.columns, index=[0]) - new_row_df['ID'] = event_id - new_row_df = new_row_df.fillna(0) - new_row_df = new_row_df.astype(df.dtypes) - - # Parte o dataframe em dois (antes e depois do ponto de inserção) e mete a nova linha no meio - df_before = df.iloc[:insertion_point] - df_after = df.iloc[insertion_point:] - new_df = pd.concat([df_before, new_row_df, df_after], ignore_index=True) +def read_table_row(df: pd.DataFrame, event_id: int, row_number_1: int) -> str: + """Retorna uma str com todos os valores de uma linha de `df`, relativa ao + evento `event_id`. - return new_df, f"Linha inserida com sucesso na posição {insertion_point}" + Args: + df (pd.DataFrame): DataFrame com os dados + event_id (int): ID do evento + row_number_1 (int): Linha a imprimir -def create_entire_database() -> pd.DataFrame: - pass + Returns: + str: String formatada com os dados. + """ + # Retorna uma linha específica da tabela de estações + # row_number_1 é o índice dado pelo utilizador (começa em 1) + # row_number_0 é o índice real da lista (começa em 0) + row_number_0 = row_number_1 - 1 + table = get_table(df, event_id) -def create_table_row_old(df, event_id, row_number_1): - event_rows = df[df["ID"] == event_id] - if event_rows.empty: - return df, f"Erro: Evento com ID {event_id} não encontrado." + # Verifica se a linha pedida existe dentro das linhas deste evento + if row_number_0 < 0 or row_number_0 >= len(table): + return f"Linha {row_number_1} não pertence ao evento {event_id}." - header_idx = event_rows.index[0] - table_size = len(event_rows.index) - 1 + row = table.iloc[row_number_0] + cols = list(df.columns) - # Validar posição da nova linha - if not (1 <= row_number_1 <= table_size + 1): - return df, f"Erro: Posição {row_number_1} inválida. Evento {event_id} tem {table_size} linha(s) na tabela." - insertion_point = header_idx + row_number_1 - - new_row_df = pd.DataFrame(columns=df.columns, index=[0]) - new_row_df['ID'] = event_id - new_row_df = new_row_df.astype(df.dtypes) - df_before = df.iloc[:insertion_point] - df_after = df.iloc[insertion_point:] - - new_df = pd.concat([df_before, new_row_df, df_after], ignore_index=True) - - return new_df, f"Linha inserida com sucesso na posição {row_number_1} do evento {event_id}." + # Encontra onde começam as colunas da estação para mostrar apenas os dados relevantes + start = cols.index("Estacao") + tableCols = cols[start:] + info = [] + for i, col in enumerate(tableCols): + info.append(f"{i + 1} {col}: {row[col]}") + return f"Linha {row_number_1:02d} do evento {event_id}:\n" + "\n".join(info) diff --git a/utils/filters.py b/utils/filters.py index b29fc88..4302cf0 100644 --- a/utils/filters.py +++ b/utils/filters.py @@ -1,43 +1,117 @@ -import pandas as pd -from datetime import datetime import os import sys +from datetime import datetime + +import pandas as pd def filter_by_date(df: pd.DataFrame, start_date: str, end_date: str) -> pd.DataFrame: - # filtra o dataframe por intervalo de datas (strings em formato ISO) - mask = (df['Data'] >= start_date) & (df['Data'] <= end_date) + """Retorna uma nova DataFrame filtrada por datas de inicio e fim + + Args: + df (pd.DataFrame): DataFrame a filtrar + start_date (str): data de inicio, em formato ISO + end_date (str): data de fim, em formato ISO + + Returns: + pd.DataFrame: DataFrame filtrada + """ + # FIX: filtragem por datas usando datetime + mask = (df["Data"] >= start_date) & (df["Data"] <= end_date) return df.loc[mask] -def filter_by_depth(df: pd.DataFrame, min_depth: float, max_depth: float) -> pd.DataFrame: - mask = (df['Profundidade'] >= min_depth) & (df['Profundidade'] <= max_depth) + +def filter_by_depth( + df: pd.DataFrame, min_depth: float, max_depth: float +) -> pd.DataFrame: + """Retorna uma nova DataFrame, filtrada entre um intervalo de profundidades + + Args: + df (pd.DataFrame): DataFrame a filtrar + min_depth (float): profundidade minima + max_depth (float): profundidade maxima + + Returns: + pd.DataFrame: DataFrame filtrada + """ + mask = (df["Profundidade"] >= min_depth) & (df["Profundidade"] <= max_depth) return df.loc[mask] -def filter_by_magnitude(df: pd.DataFrame, min_mag: float, max_mag: float, mag_type: str = 'L') -> pd.DataFrame: - def filter_mag(mags): + +def filter_by_magnitude( + df: pd.DataFrame, min_mag: float, max_mag: float, mag_type: str = "L" +) -> pd.DataFrame: + """Retorna uma nova DataFrame, filtrada entre um intervalo de magnitudes. + + [description] + + Args: + df (pd.DataFrame): DataFrame a filtrar + min_mag (float): magnitude minima + max_mag (float): magnitude maxima + mag_type (str): Tipo de magnitude a filtrar (default: `'L'`) + + Returns: + pd.DataFrame: DataFrame filtrada + """ + + def _filter_mag(mags): # Filtrar por tipo de magnitude específico - vals = [float(m['Magnitude']) for m in mags if m.get('Tipo') == mag_type] - if not vals: + vals = [float(m["Magnitude"]) for m in mags if m.get("Tipo") == mag_type] + if not vals: return False # Se houver múltiplas magnitudes do mesmo tipo, usa o máximo para filtragem mx = max(vals) return min_mag <= mx <= max_mag - mask = df['Magnitudes'].apply(filter_mag) + mask = df["Magnitudes"].apply(_filter_mag) return df.loc[mask] + # -- t7 filters + def filter_by_gap(df: pd.DataFrame, max_gap: float) -> pd.DataFrame: + """Retorna uma nova DataFrame, filtrada por valores do GAP inferiores a `max_gap` + + Args: + df (pd.DataFrame): DataFrame a filtrar + max_gap (float): valor GAP maximo + + Returns: + pd.DataFrame: DataFrame filtrada + """ # Filtra onde Gap <= max_gap - return df[df['Gap'] <= max_gap] + return df[df["Gap"] <= max_gap] + def filter_by_quality(df: pd.DataFrame, quality: str) -> pd.DataFrame: - return df[df['Pub'] == quality] + """Retorna uma nova DataFrame para eventos apenas com qualidade especificada + + Args: + df (pd.DataFrame): DataFrame a filtrar + quality (str): Qualidade a filtrar + + Returns: + pd.DataFrame: DataFrame filtrada + """ + return df[df["Pub"] == quality] + def filter_by_zone(df: pd.DataFrame, zone_type: str, zone_val: str) -> pd.DataFrame: + """Retorna uma nova DataFrame para eventos de uma certa zona + + Args: + df (pd.DataFrame): DataFrame a filtrar + zone_type (str): Tipo da zona, (ex: VZ, SZ) + zone_val (str): Valor da zona + + Returns: + pd.DataFrame: DataFrame filtrada + """ return df[df[zone_type] == zone_val] + FILTER_MENU = """[1] Filtrar por Data (Inicio:Fim) [2] Filtrar por Gap (< Valor) [3] Filtrar por Qualidade (EPI) @@ -50,39 +124,50 @@ FILTER_MENU = """[1] Filtrar por Data (Inicio:Fim) [Q] Voltar """ -def filter_menu(db: pd.DataFrame, original_db: pd.DataFrame): + +def filter_menu(db: pd.DataFrame, original_db: pd.DataFrame) -> pd.DataFrame: + """Menu de filtragem da DataFrame, com base em datas, magnitudes, profundidades, zonas, GAP e qualidades, + com opcao para reverter para a DataFrame original, para remocao dos filtros aplicados + + Args: + db (pd.DataFrame): DataFrame a ser filtrada + original_db (pd.DataFrame): DataFrame de origem, para reversao + + Returns: + pd.DataFrame: Retorna a DataFrame com os filtros aplicados, ou a original sem qualquer filtro aplicado. + """ currDb = db - + while True: os.system("cls" if sys.platform == "windows" else "clear") print("=== T7: Filtros ===") print(f"Linhas actuais: {len(currDb)}") print(FILTER_MENU) usrIn = input("Opção: ").lower() - - + match usrIn: case "1": start = input("Data Inicio (YYYY-MM-DD): ") end = input("Data Fim (YYYY-MM-DD): ") currDb = filter_by_date(currDb, start, end) - + case "2": val = float(input("Gap Máximo: ")) currDb = filter_by_gap(currDb, val) - - + case "3": - confirm = input("Filtrar apenas eventos com Qualidade EPI? (s/n): ").lower() - if confirm == 's': + confirm = input( + "Filtrar apenas eventos com Qualidade EPI? (s/n): " + ).lower() + if confirm == "s": currDb = filter_by_quality(currDb, "EPI") else: print("Filtro não aplicado.") - + case "4": val = input("Zona SZ (ex: SZ31): ") currDb = filter_by_zone(currDb, "SZ", val) - + case "5": val = input("Zona VZ (ex: VZ14): ") currDb = filter_by_zone(currDb, "VZ", val) @@ -91,18 +176,18 @@ def filter_menu(db: pd.DataFrame, original_db: pd.DataFrame): print("Filtrar por Magnitude Tipo 'L'") min_m = float(input("Min Mag L: ")) max_m = float(input("Max Mag L: ")) - + currDb = filter_by_magnitude(currDb, min_m, max_m, "L") case "7": min_d = float(input("Min Profundidade: ")) max_d = float(input("Max Profundidade: ")) currDb = filter_by_depth(currDb, min_d, max_d) - + case "r": currDb = original_db.copy() - + case "q": return currDb case _: - pass \ No newline at end of file + pass diff --git a/utils/parser.py b/utils/parser.py index 1d1c4e7..72a60dd 100644 --- a/utils/parser.py +++ b/utils/parser.py @@ -1,165 +1,292 @@ import io - from collections import defaultdict from datetime import datetime +from typing import Any import pandas as pd +"""Parser de dados + +A dataframe retornada tera multiplas linhas referentes ao mesmo evento +visto que se esta a guardar por linha cada estacao que registou o evento em questa +logo cada linha tem sempre a mesma informacao duplicada que se encontra no preambulo +para cada estacao + +""" + + # --- variáveis globais --- DIST_IND = {"L": "Local", "R": "Regional", "D": "Distante"} TYPE = {"Q": "Quake", "V": "Volcanic", "U": "Unknown", "E": "Explosion"} -# --- funções auxiliares --- -def is_blank(l: str) -> bool: - return len(l.strip(" ")) == 0 +# --- funções auxiliares --- +def is_blank(_str: str) -> bool: + """Verifica se uma string tem ou nao conteudo -def parse_flt(v:str) -> float | None: + Args: + _str (str): str a verificar se esta vazia + + Returns: + bool: True se str tem conteudo, False caso contrario + """ + return len(_str.strip(" ")) == 0 + + +def parse_flt(value: str) -> float | None: + """Formata str como float + + Args: + value (str): nro em string para ser formatado + + Returns: + float | None: Retorna um float se bem sucedido, None se excepcao + """ try: - t = float(v) - return t + return float(value) except ValueError: return None -def parse_int(v:str) -> int | None: + +def parse_int(value: str) -> int | None: + """Formata str como int + + Args: + value (str): nro em string para ser formatado + + Returns: + int | None: Retorna um int se bem sucedido, None se excepcao + """ try: - t = int(v) - return t + return int(value) except ValueError: return None -def into_dataframe(data) -> pd.DataFrame: + +def into_dataframe(data: dict[str, Any]) -> pd.DataFrame: + """Transforma uma dict numa DataFrame + + Args: + data (dict[str, Any]): [description] + + Returns: + pd.DataFrame: DataFrame + """ if len(data) == 0: return pd.DataFrame() aux = {k: [] for k in data.keys()} - for (k,v) in data.items(): + for k, v in data.items(): aux[k].append(v) return pd.DataFrame(data=aux) -def _concat(preamble, df: pd.DataFrame): - for (k,v) in preamble.items(): - df.insert(len(df.columns)-1, k, [v for _ in range(len(df))]) + +def _concat(preamble: dict[str, Any], df: pd.DataFrame) -> pd.DataFrame: + """Junta o preambulo, uma dict, na DataFrame + + Args: + preamble (dict[str, Any]): Preambulo do evento a inserir + df (pd.DataFrame): DataFrame com eventos + + Returns: + [type]: Nova DataFrame com o preambulo adicionado + """ + for k, v in preamble.items(): + df.insert(len(df.columns) - 1, k, [v for _ in range(len(df))]) return df -def validate_no_stations(expected:int , stationsDF:pd.DataFrame) -> bool: - uniqueStations = stationsDF["Estacao"].nunique() - return expected == uniqueStations - # --- principal --- -def parse(fname): +def parse(fname: str) -> pd.DataFrame: + """Faz o parse de todos os eventos no ficheiro. + + A funcao separa em eventos singulares, e transforma cada evento numa DataFrame, + que sera concatenada com uma DataFrame que contem todos os eventos existentes + + Args: + fname (str): nome do ficheiro que contem os dados + + Returns: + pd.DataFrame: DataFrame com os eventos formatados + """ fp = open(fname) - data = [l for l in fp.read().split("\n")] + data = [line for line in fp.read().split("\n")] chunks = boundaries(data) df = pd.DataFrame() - for (idx,c) in enumerate(chunks): - a = parse_chunk(data[c[0]:c[1]]) + for c in chunks: + a = parse_chunk(data[c[0] : c[1]]) aux = pd.concat([df, a], axis=0, ignore_index=True) df = aux fp.close() return df -def boundaries(data: list[str]): + +def boundaries(data: list[str]) -> list[tuple[int, int]]: + """Procura e guarda a posicao de cada evento. + + O ficheiro tem os eventos separados por uma linha em branco + + Args: + data (list[str]): lista dos dados + + Returns: + list[tuple[int, int]]: lista com tuples dos indices de inicio + e fim de cada evento + """ boundaries = [] - start = None - for (idx,l) in enumerate(data): - if start is None: - if not is_blank(l): - start = idx + eventStart = None + for idx, line in enumerate(data): + if eventStart is None: + if not is_blank(line): + eventStart = idx else: - if is_blank(l): - boundaries.append((start,idx)) - start = None + if is_blank(line): + boundaries.append((eventStart, idx)) + eventStart = None return boundaries -def parse_chunk(chunk_lines: list[str]): - hIdx = None - for (idx, l) in enumerate(chunk_lines): - if l[-1] == "7": - hIdx = idx - break - preambleRet = _parse_preamble(chunk_lines[:hIdx]) - phaseRet = _parse_type_7(chunk_lines[hIdx:]) - if not validate_no_stations(preambleRet["Estacoes"], phaseRet): - pass +def parse_chunk(chunk_lines: list[str]) -> pd.DataFrame: + """Parse de um evento no formato Nordic, separando num preambulo, e nas estacoes + Ambos sao enviados para as suas funcoes privadas para serem parsed + + Args: + chunk_lines (list[str]): lista de str do evento, como slice da lista de todos os eventos + + Returns: + pd.DataFrame: DataFrame do evento + """ + separatorIdx = None + for idx, line in enumerate(chunk_lines): + if line[-1] == "7": + separatorIdx = idx + break + preambleRet = _parse_preamble(chunk_lines[:separatorIdx]) + phaseRet = _parse_type_7(chunk_lines[separatorIdx:]) return _concat(preambleRet, phaseRet) -def _parse_preamble(hLines: list[str]): - aux = defaultdict(list) + +def _parse_preamble(hLines: list[str]) -> dict[str, Any]: + """Transforma o preambulo numa dict com os valores que precisamos + + Verifica cada linha e separa dentro de uma dict, com a chave sendo o tipo de linha + + Args: + hLines (list[str]): slice da lista com apenas o preambulo + + Returns: + dict[str, Any]: dict com os valores necessarios + """ + lineTypes = defaultdict(list) for line in hLines: match line[-1]: case "1": - aux[1].append(line) + lineTypes[1].append(line) case "3": - aux[3].append(line) + lineTypes[3].append(line) case "6": - aux[6].append(line) + lineTypes[6].append(line) case "E": - aux["E"].append(line) + lineTypes["E"].append(line) case "I": - aux["I"].append(line) - case "F": - pass - # aux["F"].append(line) + lineTypes["I"].append(line) case _: pass headerDict = dict() - for (k,v) in aux.items(): + for k, v in lineTypes.items(): if len(v) != 0: + # FUNCS[k] retorna o handle de cada funcao para cada tipo de linha headerDict.update(FUNCS[k](v)) return headerDict -def _parse_type_1(data: list[str]): - aux = data[0] - y = int(aux[1:5]) - mo = int(aux[6:8]) - d = int(aux[8:10]) - h = int(aux[11:13]) - m = int(aux[13:15]) - s = int(aux[16:18]) - mil = int(aux[19]) * 10**5 - dt = datetime(y,mo,d,h,m,s,mil) +def _parse_type_1(data: list[str]) -> dict[str, Any]: + """Transforma linhas tipo 1 (data, hora, latitude, longitude, profundidade + agencia, magnitudes e tipos e nro de estacoes que registaram o evento) - dist_ind = DIST_IND[aux[21]] - ev_type = TYPE[aux[22]] - lat = float(aux[23:30]) - long = float(aux[30:38]) - depth = float(aux[38:43]) - no_stat = int(aux[48:51]) + Args: + data (list[str]): lista de linhas tipo 1 - hypo = {"Data": dt.isoformat(), "Distancia": dist_ind, "Tipo Evento": ev_type, "Latitude": lat, "Longitude": long, "Profundidade": depth, "Estacoes": no_stat, "Magnitudes": list()} - for l in data: - hypo["Magnitudes"] = hypo["Magnitudes"] + _parse_mag(l) + Returns: + dict[str, Any]: dict com os valores necessarios + """ + y = int(data[0][1:5]) + mo = int(data[0][6:8]) + d = int(data[0][8:10]) + h = int(data[0][11:13]) + m = int(data[0][13:15]) + s = int(data[0][16:18]) + mil = int(data[0][19]) * 10**5 + dt = datetime(y, mo, d, h, m, s, mil) + + dist_ind = DIST_IND[data[0][21]] + ev_type = TYPE[data[0][22]] + lat = float(data[0][23:30]) + long = float(data[0][30:38]) + depth = float(data[0][38:43]) + no_stat = int(data[0][48:51]) + + hypo = { + # NOTE: ANTES ERA UMA STRING, AGORA E O OBJECTO DATETIME + "Data": dt, + "Distancia": dist_ind, + "Tipo Evento": ev_type, + "Latitude": lat, + "Longitude": long, + "Profundidade": depth, + "Estacoes": no_stat, + "Magnitudes": [], + } + for line in data: + hypo["Magnitudes"] = hypo["Magnitudes"] + _parse_mag(line) return hypo -def _parse_mag(line: str): + +def _parse_mag(line: str) -> list[dict[str, Any]]: + """Transforma nos varios tipos de magnitudes + + Args: + line (str): str das linhas tipo 1 + + Returns: + list[dict[str, Any]]: dict com os valores das magnitudes e o seu tipo + """ magnitudes = [] base = 55 while base < 79: - m = line[base:base+4] - mt = line[base+4] + m = line[base : base + 4] + mt = line[base + 4] if not is_blank(m): magnitudes.append({"Magnitude": m, "Tipo": mt}) base += 8 return magnitudes -def _parse_type_3(data: list[str]): +def _parse_type_3(data: list[str]) -> dict[str, Any]: + """Transforma linhas tipo 3 (observacoes) + + Args: + data (list[str]): lista com linhas tipo 3 + + Returns: + dict[str, Any]: dict com valores necessarios + """ comments = {} for line in data: - if line.startswith(" SENTIDO") or line.startswith(" REGIAO") or line.startswith(" PUB"): - c, v = line[:-2].strip().split(": ", maxsplit=1) - - if c == "REGIAO": - parts = v.split(",") + if ( + line.startswith(" SENTIDO") + or line.startswith(" REGIAO") + or line.startswith(" PUB") + ): + chave, valor = line[:-2].strip().split(": ", maxsplit=1) + + if chave == "REGIAO": + parts = valor.split(",") comments["Regiao"] = parts[0].strip() for p in parts[1:]: p = p.strip() @@ -167,38 +294,128 @@ def _parse_type_3(data: list[str]): comments["SZ"] = p elif "VZ" in p: comments["VZ"] = p - elif c == "PUB": - comments["Pub"] = v.strip() + elif chave == "PUB": + comments["Pub"] = valor.strip() else: - comments[c.capitalize()] = v.split(",")[0] + comments[chave.capitalize()] = valor.split(",")[0] return comments -def _parse_type_6(data: list[str]): +def _parse_type_6(data: list[str]) -> dict[str, list[str]]: + """Transforma linhas tipo 6 (nome de onda) + + [description] + + Args: + data (list[str]): lista de linhas tipo 6 + + Returns: + dict[str, list[str]]: lista de nomes dos ficheiros das ondas + """ waves = [] - for l in data: - waves.append(l.strip().split(" ")[0]) + for line in data: + waves.append(line.strip().split(" ")[0]) return {"Onda": waves} -def _parse_type_7(data: list[str]): +def _parse_type_7(data: list[str]) -> pd.DataFrame: + """Transforma linhas tipo 7 (estacoes) + + Args: + data (list[str]): linhas tipo 7 + + Returns: + pd.DataFrame: DataFrame com as informacoes de cada estacao + """ aux = io.StringIO("\n".join(data)) - dados = pd.read_fwf(aux, colspecs=[(1,5), (6,8),(10,15), (18,20), (20,22), (23,28), (34,38), (71,75)]) - dados.rename(columns={'STAT': "Estacao", 'SP': "Componente" , 'PHASW': "Tipo Onda", 'HR': "Hora", 'MM': "Min", 'SECON': "Seg", 'AMPL': "Amplitude", " DIST": "Distancia Epicentro"}, inplace=True) + dados = pd.read_fwf( + aux, + colspecs=[ + (1, 5), + (6, 8), + (10, 15), + (18, 20), + (20, 22), + (23, 28), + (34, 38), + (71, 75), + ], + ) + dados.rename( + columns={ + "STAT": "Estacao", + "SP": "Componente", + "PHASW": "Tipo Onda", + "HR": "Hora", + "MM": "Min", + "SECON": "Seg", + "AMPL": "Amplitude", + " DIST": "Distancia Epicentro", + }, + inplace=True, + ) return dados -def _parse_type_e(data: list[str]): - aux = data[0] - error = {"Gap": int(aux[5:8]), "Origin": float(aux[14:20]), "Error_lat": float(aux[24:30]), "Error_long": float(aux[32:38]), "Error_depth": float(aux[38:43]), "Cov_xy": float(aux[43:55]), "Cov_xz": float(aux[55:67]), "Cov_yz": float(aux[67:79])} +def _parse_type_e(data: list[str]) -> dict[str, Any]: + """Transformar linhas tipo E (erros) + + Args: + data (list[str]): linhas tipo E + + Returns: + dict[str, Any]: dict com os valores necessarios + """ + error = { + "Gap": int(data[0][5:8]), + "Origin": float(data[0][14:20]), + "Error_lat": float(data[0][24:30]), + "Error_long": float(data[0][32:38]), + "Error_depth": float(data[0][38:43]), + "Cov_xy": float(data[0][43:55]), + "Cov_xz": float(data[0][55:67]), + "Cov_yz": float(data[0][67:79]), + } return error -def _parse_type_i(data: list[str]): +def _parse_type_i(data: list[str]) -> dict[str, int]: + """Transforma linhas tipo I(ID do evento) + + Args: + data (list[str]): linhas tipo I + + Returns: + dict[str, int]: dict com o valor do ID + """ aux = data[0] - return {"ID":int(aux[60:74])} + return {"ID": int(aux[60:74])} -FUNCS = {1: _parse_type_1, 3: _parse_type_3, 6: _parse_type_6, "E": _parse_type_e, "I": _parse_type_i} +FUNCS = { + 1: _parse_type_1, + 3: _parse_type_3, + 6: _parse_type_6, + "E": _parse_type_e, + "I": _parse_type_i, +} + +# -- Deprecated + + +def validate_station_numbers(expected: int, stationsDF: pd.DataFrame) -> bool: + """[summary] + + [description] + + Args: + expected (int): [description] + stationsDF (pd.DataFrame): [description] + + Returns: + bool: [description] + """ + uniqueStations = stationsDF["Estacao"].nunique() + return expected == uniqueStations diff --git a/utils/plot.py b/utils/plot.py deleted file mode 100644 index b5ccad1..0000000 --- a/utils/plot.py +++ /dev/null @@ -1,49 +0,0 @@ -import collections -import datetime - -import stats -from matplotlib import pyplot as plt - - -class Plotter: - def __init__(self, data): - self.raw_data = data - pass - - def extract_info(self): - pass - - def plot_events_day(self): - values = collections.Counter(stats._preprare_days(self.raw_data)) - - x = list(values.keys()) - y = list(values.values()) - fig, ax = plt.subplots(layout="constrained") - - ax.bar(x, y) - plt.show() - - def plot_events_month(self): - values = collections.Counter(stats._preprare_months(self.raw_data)) - - x = list(values.keys()) - y = list(values.values()) - fig, ax = plt.subplots(layout="constrained") - - ax.bar(x, y) - plt.show() - - -if __name__ == "__main__": - import parser - - asdf = parser.parse("../dados.txt") - - a = Plotter(asdf) - # b = stats._filter_mags(a.raw_data, more_than=2.5, less_than=2.9) - c = stats.filter_date( - a.raw_data, - after=datetime.datetime(year=2014, month=1, day=6), - before=datetime.datetime(year=2014, month=1, day=12), - ) - print(c) diff --git a/utils/stats.py b/utils/stats.py index 904bd1c..69a8eff 100644 --- a/utils/stats.py +++ b/utils/stats.py @@ -1,11 +1,14 @@ import os import sys +from typing import Any, Iterable, TypeAlias -import pandas as pd import numpy as np +import pandas as pd -STAT_HEADER ="""=== Terramotos === - == Estatísticas == +from utils.utils import extract_mag_depth + +STAT_HEADER = """=== Terramotos === + == Estatísticas == """ STAT_MENU = """[1] Média @@ -14,6 +17,7 @@ STAT_MENU = """[1] Média [4] Máximo [5] Mínimo [6] Moda +[7] Print de todas as estatísticas [T] Estatísticas Temporais (T5) [Q] Voltar ao menu principal @@ -25,10 +29,20 @@ FILTER_CHOICES = """[1] Magnitudes """ -CHOICE = {"1": "Magnitudes", "2": "Distancia","3": "Profundidade"} +CHOICE = {"1": "Magnitudes", "2": "Distancia", "3": "Profundidade"} def filter_submenu(type: str): + """[summary] + + [description] + + Args: + type (str): [description] + + Returns: + [type]: [description] + """ os.system("cls" if sys.platform == "windows" else "clear") print(f"{STAT_HEADER}\n = {type} = ") print(FILTER_CHOICES) @@ -42,70 +56,110 @@ def filter_submenu(type: str): return None - # -- t5 funcs + def _get_unique_events(df: pd.DataFrame) -> pd.DataFrame: - return df.drop_duplicates(subset="ID", keep='first') + """Função privada que retorna os eventos únicos -def convert_to_datetime(df: pd.DataFrame) -> pd.DataFrame: - # Converte coluna Data para objetos datetime - df = df.copy() - df['Data'] = pd.to_datetime(df['Data'], format='mixed') - return df + (Ler docstring do `parser.py` para o porquê de se fazer isto) -def events_per_period(df: pd.DataFrame, period: str): + Args: + df (pd.DataFrame): Dataframe com todos os eventos + + Returns: + pd.DataFrame: Dataframe com apenas uma linha por evento + """ + return df.drop_duplicates(subset="ID", keep="first") + + +def events_per_period(df: pd.DataFrame, period: str) -> tuple[Iterable, Iterable]: + """Retorna os eventos por período, seja por dia, seja por mês + + Args: + df (pd.DataFrame): Dataframe com valores + period (str): tipo de período. `D` para dia, `M` para mês + + Returns: + tuple[Iterable, Iterable]: tuple com iteradores dos indices e valores + """ # Calcula o número de eventos por dia ('D') ou mês ('M') - df = convert_to_datetime(df) events = _get_unique_events(df) - if period == 'M': - period = 'ME' - - res = events.set_index('Data').resample(period).size() - return res.index, res.values + if period == "M": + period = "ME" + + res = events.set_index("Data").resample(period).size() + return (res.index, res.values) + + +def stats_depth_month(df: pd.DataFrame) -> pd.DataFrame: + """Estatisticas de profundidade de sismos, por mes + + Args: + df (pd.DataFrame): DataFrame com eventos + + Returns: + [type]: Dataframe com as estatisticas de profundidade, por mes + """ + events = _get_unique_events(df) + + grouped = events.set_index("Data").resample("ME")["Profundidade"] + + stats_df = pd.DataFrame( + { + "Mean": grouped.mean(), + "Std": grouped.std(), + "Median": grouped.median(), + "Q1": grouped.quantile(0.25), + "Q3": grouped.quantile(0.75), + "Min": grouped.min(), + "Max": grouped.max(), + } + ) + return stats_df -def stats_depth_month(df: pd.DataFrame): - # Calcula estatísticas de Profundidade por Mês - df = convert_to_datetime(df) - events = _get_unique_events(df) - - grouped = events.set_index('Data').resample('ME')['Profundidade'] - - stats_df = pd.DataFrame({ - 'Mean': grouped.mean(), - 'Std': grouped.std(), - 'Median': grouped.median(), - 'Q1': grouped.quantile(0.25), - 'Q3': grouped.quantile(0.75), - 'Min': grouped.min(), - 'Max': grouped.max() - }) - return stats_df def stats_mag_month(df: pd.DataFrame): + """Estatisticas de magnitude dos sismos, por mes + + Args: + df (pd.DataFrame): DataFrame com eventos + + Returns: + [type]: Dataframe com as estatisticas de magnitude, por mes + """ # Calcula estatísticas de Magnitude por Mês - df = convert_to_datetime(df) events = _get_unique_events(df) - - def get_max_mag(mags): - vals = [float(m['Magnitude']) for m in mags if 'Magnitude' in m] + + def _get_max_mag(mags: pd.Series): + """Funcao aplicadora à df, para encontrar a maior magnitude + + Args: + mags (pd.Series): Serie com as magnitudes + + Returns: + pd.Series: Serie com a magnitude maxima + """ + vals = [float(m["Magnitude"]) for m in mags if "Magnitude" in m] return max(vals) if vals else np.nan events = events.copy() - events['MaxMag'] = events['Magnitudes'].apply(get_max_mag) - - grouped = events.set_index('Data').resample('ME')['MaxMag'] - - stats_df = pd.DataFrame({ - 'Mean': grouped.mean(), - 'Std': grouped.std(), - 'Median': grouped.median(), - 'Q1': grouped.quantile(0.25), - 'Q3': grouped.quantile(0.75), - 'Min': grouped.min(), - 'Max': grouped.max() - }) + events["MaxMag"] = events["Magnitudes"].apply(_get_max_mag) + + grouped = events.set_index("Data").resample("ME")["MaxMag"] + + stats_df = pd.DataFrame( + { + "Mean": grouped.mean(), + "Std": grouped.std(), + "Median": grouped.median(), + "Q1": grouped.quantile(0.25), + "Q3": grouped.quantile(0.75), + "Min": grouped.min(), + "Max": grouped.max(), + } + ) return stats_df @@ -119,22 +173,36 @@ T5_MENU = """[1] Número de eventos por dia [Q] Voltar """ + def t5_menu(df: pd.DataFrame): + """Menu de estatisticas das magnitudes e profundidades por mes + + Args: + df (pd.DataFrame): Dataframe com os eventos + """ while True: os.system("cls" if sys.platform == "windows" else "clear") print(STAT_HEADER + "\n" + " == T5: Estatísticas Temporais ==\n" + T5_MENU) usrIn = input("Opção: ").lower() - + match usrIn: case "1": - dates, counts = events_per_period(df, 'D') + dates, counts = events_per_period(df, "D") print("\nEventos por Dia:") - print(pd.DataFrame({'Data': dates, 'Contagem': counts}).to_string(index=False)) - + print( + pd.DataFrame({"Data": dates, "Contagem": counts}).to_string( + index=False + ) + ) + case "2": - dates, counts = events_per_period(df, 'M') + dates, counts = events_per_period(df, "M") print("\nEventos por Mês:") - print(pd.DataFrame({'Data': dates, 'Contagem': counts}).to_string(index=False)) + print( + pd.DataFrame({"Data": dates, "Contagem": counts}).to_string( + index=False + ) + ) case "3": st = stats_depth_month(df) @@ -145,18 +213,25 @@ def t5_menu(df: pd.DataFrame): st = stats_mag_month(df) print("\nEstatísticas Magnitude por Mês:") print(st.to_string()) - + case "q": return case _: pass - + input("\n[Enter] para continuar...") # -- stat menu + def stat_menu(df: pd.DataFrame): + """Menu de estatísticas + + + Args: + df (pd.DataFrame): Dataframe com eventos + """ inStats = True while inStats: os.system("cls" if sys.platform == "windows" else "clear") @@ -231,17 +306,64 @@ def stat_menu(df: pd.DataFrame): else: continue + case "7": + m, d = _mag_depth(df) + + print("\t\tMagnitude\tProfundidade") + for a, b in zip(m, d): + print(f"{a[0]}\t{round(a[1], 4)}\t\t{round(b[1], 4)}") + case "q": inStats = False continue case _: pass - + input("Clica `Enter` para continuar") -def average(df: pd.DataFrame, filter_by): +type tuples = tuple[list[tuple[str, Any]], list[tuple[str, Any]]] + + +def _mag_depth(df: pd.DataFrame) -> tuples: + """Cria uma lista com cada estatística para as magnitudes e profundidades, + de forma a ser possivel fazer print de tudo de uma só vez + + + Args: + df (pd.DataFrame): Dataframe com valores + + Returns: + tuples: lista com estatisticas das magnitudes e profundidades + """ + data = extract_mag_depth(df) + + mag_array = data.Magnitudes.values + depth_array = data.Profundidade.values + + mags = [] + dep = [] + for a, b in zip( + ["Media\t", "Desvio-Padrao", "Variancia", "Valor Maximo", "Valor Minimo"], + [np.average, np.std, np.var, np.max, np.min], + ): + mags.append((a, b(mag_array))) + dep.append((a, b(depth_array))) + + return (mags, dep) + + +def average(df: pd.DataFrame, filter_by) -> np.float64 | None: + """Calculo da média para o tipo especifico + + Args: + df (pd.DataFrame): Dataframe com valores + filter_by (str): Valor para calculo da media + + Returns: + np.float64 | None: média + """ events = _get_unique_events(df) values = events[filter_by].to_numpy() @@ -249,11 +371,20 @@ def average(df: pd.DataFrame, filter_by): values = _unpack_mags(values) try: return np.average(values) - except: + except Exception: return None -def variance(df, filter_by): +def variance(df: pd.DataFrame, filter_by: str) -> np.float64 | None: + """calcula a variancia para o tipo especificado + + Args: + df (pd.DataFrame): Dataframe com valores + filter_by (str): Valor para calculo da variancia + + Returns: + np.float64 | None: variancia + """ events = _get_unique_events(df) values = events[filter_by].to_numpy() @@ -262,44 +393,81 @@ def variance(df, filter_by): try: return np.var(values) - except: + except Exception: return None -def std_dev(df, filter_by): +def std_dev(df: pd.DataFrame, filter_by: str) -> np.float64 | None: + """calcula o desvio-padrao para o tipo especificado + + Args: + df (pd.DataFrame): Dataframe com valores + filter_by (str): Valor para calculo do desvio-padrao + + Returns: + np.float64 | None: desvio-padrao + """ events = _get_unique_events(df) values = events[filter_by].to_numpy() if filter_by == "Magnitudes": values = _unpack_mags(values) - + try: return np.std(values) - except: + except Exception: return None -def max_v(df, filter_by): +def max_v(df: pd.DataFrame, filter_by: str) -> np.floating: + """Retorna o valor maximo num array + + Args: + df (pd.DataFrame): Dataframe com valores + filter_by (str): Coluna para o valor maximo + + Returns: + np.floating: valor maximo + """ events = _get_unique_events(df) values = events[filter_by].to_numpy() if filter_by == "Magnitudes": values = _unpack_mags(values) - + return np.max(values) -def min_v(df, filter_by): +def min_v(df, filter_by) -> np.floating: + """Retorna o valor minimo num array + + Args: + df (pd.DataFrame): Dataframe com valores + filter_by (str): Coluna para o valor minimo + + Returns: + np.floating: valor minimo + """ events = _get_unique_events(df) values = events[filter_by].to_numpy() if filter_by == "Magnitudes": values = _unpack_mags(values) - + return np.min(values) -def moda(df, filter_by): +def moda(df, filter_by) -> np.floating: + """Calcula a moda para um array de valores + + + Args: + df (pd.DataFrame): Dataframe com valores + filter_by (str): Coluna para o calculo da moda + + Returns: + np.floating: moda + """ events = _get_unique_events(df) values = events[filter_by].to_numpy() @@ -309,13 +477,21 @@ def moda(df, filter_by): uniques, count = np.unique(values, return_counts=True) uniques_list = list(zip(uniques, count)) - return sorted(uniques_list, reverse=True ,key=lambda x: x[1])[0][0] + return sorted(uniques_list, reverse=True, key=lambda x: x[1])[0][0] -def _unpack_mags(arr: np.ndarray): +def _unpack_mags(arr: np.ndarray) -> np.ndarray: + """Funcao privada para facilitar o calculo das magnitudes + + + Args: + arr (np.ndarray): Lista dos tipos de magnitudes + + Returns: + np.ndarray: magnitudes + """ newVals = np.empty(0) for v in arr: for m in v: newVals = np.append(newVals, float(m["Magnitude"])) return newVals - diff --git a/utils/utils.py b/utils/utils.py index 1e04fd7..67acbdf 100644 --- a/utils/utils.py +++ b/utils/utils.py @@ -1,24 +1,71 @@ #! /usr/bin/env python # pyright: basic -from datetime import time import json +from datetime import time from math import modf from typing import Any import pandas as pd -def save_as_json(df: pd.DataFrame, fname, event_cols, station_cols) -> bool: - info = create_dict_struct(df, event_cols, station_cols) +def extract_mag_depth(df: pd.DataFrame) -> pd.DataFrame: + """Extrai as magnitudes e profundidades. + + Nas magnitudes, apenas deixa o tipo L + + Args: + df (pd.DataFrame): Dataframe com eventos + + Returns: + pd.DataFrame: Dataframe com apenas magnitudes e profundidades + """ + _df = df.drop_duplicates(subset="ID", keep="first")[ + ["Magnitudes", "Profundidade"] + ].reset_index(drop=True) + mags = [] + + for _, value in _df.iterrows(): + for mag in value.Magnitudes: + if mag["Tipo"] == "L": + mags.append(float(mag["Magnitude"])) + break + _df = _df.drop(columns=["Magnitudes"]) + aux = pd.DataFrame.from_dict({"Magnitudes": mags}) + return pd.concat([aux, _df], axis=1) + + +def save_as_json(df: pd.DataFrame, fname: str, event_cols: list[str]) -> bool: + """Guarda a dataframe como um ficheiro JSON + + + Args: + df (pd.DataFrame): Dataframe com eventos + fname (str): nome do ficheiro a guardar + event_cols (list[str]): lista com os nomes das colunas presentes em `df` + + Returns: + bool: Sucesso da operacao + """ + info = _create_dict_struct(df, event_cols) with open(fname, "w") as fp: json.dump(info, fp, indent=4) - + return True -# TODO: passar os nomes das colunas, para não haver problemas no futuro, caso se altere os nomes da dataframe -def create_dict_struct(df: pd.DataFrame, event_cols, station_cols) -> dict[str, Any]: - # get all events by their id + +def _create_dict_struct(df: pd.DataFrame, event_cols) -> dict[str, Any]: + """Funcao privada para ajuda a guardar como ficheiro JSON + + [description] + + Args: + df (pd.DataFrame): [description] + event_cols ([type]): [description] + + Returns: + dict[str, Any]: [description] + """ uniqueIds = df["ID"].unique() allEvents = {} @@ -26,17 +73,30 @@ def create_dict_struct(df: pd.DataFrame, event_cols, station_cols) -> dict[str, for id in uniqueIds: filteredDf = df.loc[df["ID"] == id] first_row = filteredDf.head(1) - allEvents[int(id)] = create_event_info(first_row, event_cols) + allEvents[int(id)] = _create_event_info(first_row, event_cols) allEvents[int(id)].update(create_stations_info_1(filteredDf)) return allEvents -def create_event_info(info: pd.DataFrame, cols) -> dict[str, Any]: +def _create_event_info(info: pd.DataFrame, cols) -> dict[str, Any]: + """Funcao privada para criar a estrutura dict pretendida + no ficheiro JSOn + + + Args: + info (pd.DataFrame): dataframe com eventos + cols ([type]): lista com nomes das colunas + + Returns: + dict[str, Any]: dict com o formato pretendido + """ informacoes = dict() for v in cols: - if v == "Magnitudes": + if v == "Data": + informacoes[v] = info.iloc[0][v].isoformat() + elif v == "Magnitudes": informacoes[v] = create_mag_info(info.iloc[0][v]) elif v in {"Latitude", "Longitude", "Profundidade", "Gap"}: informacoes[v] = float(info.iloc[0][v]) @@ -47,20 +107,33 @@ def create_event_info(info: pd.DataFrame, cols) -> dict[str, Any]: def create_stations_info_1(info: pd.DataFrame) -> dict[str, Any]: + """Funcao privada para ajuda de formatacao no guardar como JSON + + Args: + info (pd.DataFrame): dataframe com eventos + + Returns: + dict[str, Any]: dict com o formato pretendido + """ stationsDict = {} for idx in range(len(info)): aux = info.iloc[idx] micro, sec = tuple(map(int, modf(aux["Seg"]))) - hms = time(hour=aux["Hora"],minute=aux["Min"], second=sec, microsecond=micro).strftime("%H:%M:%S.%f") - station = {"Componente": aux["Componente"], "Hora": hms, "Distancia": float(aux["DIS"])} + hms = time( + hour=aux["Hora"], minute=aux["Min"], second=sec, microsecond=micro + ).strftime("%H:%M:%S.%f") + station = { + "Componente": aux["Componente"], + "Hora": hms, + "Distancia": float(aux["DIS"]), + } - if type(aux["Tipo Onda"]) != float: + if type(aux["Tipo Onda"]) is float: station.update({"Tipo Onda": aux["Tipo Onda"]}) if aux["Tipo Onda"] == "IAML": station.update({"Amplitude": float(aux["Amplitude"])}) - if aux["Estacao"] not in stationsDict.keys(): stationsDict[aux["Estacao"]] = [station] else: @@ -68,16 +141,16 @@ def create_stations_info_1(info: pd.DataFrame) -> dict[str, Any]: return {"Estacoes": stationsDict} -def create_mag_info(magnitudes): +def create_mag_info(magnitudes: list[dict[str, Any]]) -> dict[str, Any]: + """Funcao privada para parsing das magnitudes + + Args: + magnitudes (list[dict[str, Any]]): [description] + + Returns: + dict[str, Any]: dict com o formato pretendido + """ mags = {} for value in magnitudes: mags[value["Tipo"]] = value["Magnitude"] return mags - - -if __name__ == '__main__': - import parser - - df = parser.parse("dados.txt") - a = create_dict_struct(df, None, None) - save_as_json(a) diff --git a/utils/visuals.py b/utils/visuals.py index b88f644..5ab163e 100644 --- a/utils/visuals.py +++ b/utils/visuals.py @@ -1,14 +1,25 @@ -import matplotlib.pyplot as plt -import pandas as pd -import sys import os +import sys + +import matplotlib.pyplot as plt import numpy as np +import pandas as pd from utils import stats # -- helpers + def plot_bar(x, y, xLabel, yLabel, title): + """Funcao para efetuar o plot de um grafico de barras + + Args: + x ([]): valores em x + y ([type]): valor y correspondente a cada valor x + xLabel ([type]): [description] + yLabel ([type]): [description] + title ([type]): [description] + """ plt.figure(figsize=(10, 6)) plt.bar(x, y) plt.xlabel(xLabel) @@ -18,9 +29,22 @@ def plot_bar(x, y, xLabel, yLabel, title): plt.tight_layout() plt.show() + def plot_linear_with_std(x, mean, std, xLabel, yLabel, title): + """[summary] + + [description] + + Args: + x ([type]): [description] + mean ([type]): [description] + std ([type]): [description] + xLabel ([type]): [description] + yLabel ([type]): [description] + title ([type]): [description] + """ plt.figure(figsize=(10, 6)) - plt.errorbar(x, mean, yerr=std, fmt='-o', capsize=5, ecolor='red') + plt.errorbar(x, mean, yerr=std, fmt="-o", capsize=5, ecolor="red") plt.xlabel(xLabel) plt.ylabel(yLabel) plt.title(title) @@ -29,11 +53,23 @@ def plot_linear_with_std(x, mean, std, xLabel, yLabel, title): plt.tight_layout() plt.show() + def plot_boxplot(dataList, labels, xLabel, yLabel, title): + """[summary] + + [description] + + Args: + dataList ([type]): [description] + labels ([type]): [description] + xLabel ([type]): [description] + yLabel ([type]): [description] + title ([type]): [description] + """ # dataList: lista de arrays/series, um para cada etiqueta # labels: lista de etiquetas correspondentes a dataList plt.figure(figsize=(10, 6)) - plt.boxplot(dataList, labels=labels) + plt.boxplot(dataList, label=labels) plt.xlabel(xLabel) plt.ylabel(yLabel) plt.title(title) @@ -41,56 +77,95 @@ def plot_boxplot(dataList, labels, xLabel, yLabel, title): plt.tight_layout() plt.show() + # -- t6 logic + def viz_events_per_period(df: pd.DataFrame, period: str, title_suffix: str): + """[summary] + + [description] + + Args: + df (pd.DataFrame): [description] + period (str): [description] + title_suffix (str): [description] + """ dates, counts = stats.events_per_period(df, period) # Formatar datas para melhor leitura no gráfico - if period == 'D': - # dates é um DatetimeIndex - labels = [d.strftime('%Y-%m-%d') for d in dates] + if period == "D": + # dates é um DatetimeIndex + labels = [d.strftime("%Y-%m-%d") for d in dates] else: - labels = [d.strftime('%Y-%m') for d in dates] - + labels = [d.strftime("%Y-%m") for d in dates] + plot_bar(labels, counts, "Data", "Número de Eventos", f"Eventos por {title_suffix}") + def viz_linear_stats(df: pd.DataFrame, target: str): + """[summary] + + [description] + + Args: + df (pd.DataFrame): [description] + target (str): [description] + """ # Média +/- Desvio Padrão - if target == 'Profundidade': + if target == "Profundidade": st = stats.stats_depth_month(df) unit = "km" - else: # Magnitude + else: # Magnitude st = stats.stats_mag_month(df) unit = "Magn" - - labels = [d.strftime('%Y-%m') for d in st.index] - - plot_linear_with_std(labels, st['Mean'], st['Std'], "Mês", f"{target} ({unit})", f"Média e Desvio Padrão de {target} por Mês") + + labels = [d.strftime("%Y-%m") for d in st.index] + + plot_linear_with_std( + labels, + st["Mean"], + st["Std"], + "Mês", + f"{target} ({unit})", + f"Média e Desvio Padrão de {target} por Mês", + ) + def viz_boxplot(df: pd.DataFrame, target: str): - df = stats.convert_to_datetime(df) + """[summary] + + [description] + + Args: + df (pd.DataFrame): [description] + target (str): [description] + + Returns: + [type]: [description] + """ events = stats._get_unique_events(df) - + # Agrupar por mês - grouped = events.set_index('Data').resample('ME') - + grouped = events.set_index("Data").resample("ME") + data_to_plot = [] labels = [] - + for name, group in grouped: - if target == 'Profundidade': - vals = group['Profundidade'].dropna().values + if target == "Profundidade": + vals = group["Profundidade"].dropna().values else: - # Extrair magnitudes máximas - def get_max_mag(mags): - vals = [float(m['Magnitude']) for m in mags if 'Magnitude' in m] + # Extrair magnitudes máximas + def get_max_mag(mags): + vals = [float(m["Magnitude"]) for m in mags if "Magnitude" in m] return max(vals) if vals else np.nan - vals = group['Magnitudes'].apply(get_max_mag).dropna().values - + + vals = group["Magnitudes"].apply(get_max_mag).dropna().values + if len(vals) > 0: data_to_plot.append(vals) - labels.append(name.strftime('%Y-%m')) - + labels.append(name.strftime("%Y-%m")) + plot_boxplot(data_to_plot, labels, "Mês", target, f"Boxplot de {target} por Mês") @@ -108,17 +183,25 @@ VISUALS_MENU = """[1] Gráfico Barras: Eventos por Dia HEADER = "=== T6: Representação Gráfica ===" + def visual_menu(df: pd.DataFrame): + """ + + [description] + + Args: + df (pd.DataFrame): [description] + """ while True: os.system("cls" if sys.platform == "windows" else "clear") print(HEADER + "\n" + VISUALS_MENU) usrIn = input("Opção: ").lower() - + match usrIn: case "1": - viz_events_per_period(df, 'D', "Dia") + viz_events_per_period(df, "D", "Dia") case "2": - viz_events_per_period(df, 'M', "Mês") + viz_events_per_period(df, "M", "Mês") case "3": viz_linear_stats(df, "Profundidade") case "4":