From 14dee58ab2550783ebe522d3a6e954d7f2ab6f0c Mon Sep 17 00:00:00 2001
From: Shino
Date: Thu, 11 Dec 2025 14:18:03 -0100
Subject: [PATCH] graphs, statistics and filters

---
 README.md       |  21 ++++++-
 utils/parser.py | 118 ++++++++++++++++++++++++++++--------
 utils/plot.py   |  34 ++++-------
 utils/stats.py  | 155 +++++++++++++++++++++++++++++++++++++++++++-----
 utils/utils.py  |  27 +++++++--
 5 files changed, 286 insertions(+), 69 deletions(-)

diff --git a/README.md b/README.md
index d42f72d..fd4d1d4 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,26 @@ First, let's represent the data using Python's Pandas module and implement CRUD
 - T2 - Implement CRUD operations through a text menu;
 - T3 - Implement statistical operations such as: average, variance, standard deviation, max, min, mode; through a text menu;
 - T4 - Convert from Pandas to JSON and save it in a text file;
-- T5 - to be continued ...
+- T5 - Compute the following statistics:
+  - Number of events per day and per month.
+  - Mean and standard deviation of the depth and of the magnitude per month.
+  - Median, 1st quartile and 3rd quartile of the depth and of the magnitude per month.
+  - Maximum and minimum of the depth and of the magnitude per month.
+- T6 - For the graphical representation:
+  - A bar chart with the number of events per day.
+  - A bar chart with the number of events per month.
+  - A line chart with the mean +/- the standard deviation of the depths per month.
+  - A line chart with the mean +/- the standard deviation of the magnitudes L per month.
+  - A "boxplot" chart with the depths per month.
+  - A "boxplot" chart with the magnitudes L per month.
+- T7 - Implement the event-selection filters for the statistics / plots:
+  - Time period (start date, end date).
+  - Events with a GAP below a given value.
+  - Quality (EPI or All).
+  - SZ zones.
+  - VZ zones.
+  - Bound the magnitudes L (minimum, maximum).
+  - Bound the depths (minimum, maximum).
 
 ## Deadlines
 - T1 to T4 -> November 10
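
Note: the per-month statistics in T5 map directly onto pandas' groupby/agg.
The sketch below is not part of the patch; it assumes the parser's "Data"
column (ISO strings) and "Profundidade" column, and the helper name
monthly_summary is hypothetical:

    import pandas as pd

    def monthly_summary(df: pd.DataFrame) -> pd.DataFrame:
        # Group events by the year-month derived from the ISO date strings.
        months = pd.to_datetime(df["Data"]).dt.to_period("M")
        # Mean, std, median, quartiles, max and min of depth per month (T5).
        return df.groupby(months)["Profundidade"].agg(
            media="mean",
            desvio="std",
            mediana="median",
            q1=lambda s: s.quantile(0.25),
            q3=lambda s: s.quantile(0.75),
            maximo="max",
            minimo="min",
        )

The same call with a per-event magnitude L column instead of "Profundidade"
covers the magnitude half of T5.
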
diff --git a/utils/parser.py b/utils/parser.py
index 1e80b15..ecaba9b 100644
--- a/utils/parser.py
+++ b/utils/parser.py
@@ -1,6 +1,5 @@
 # pyright: basic
 import io
-
 from collections import defaultdict
 from datetime import datetime
 
@@ -11,40 +10,45 @@
 DIST_IND = {"L": "Local", "R": "Regional", "D": "Distante"}
 TYPE = {"Q": "Quake", "V": "Volcanic", "U": "Unknown", "E": "Explosion"}
 
-# --- helper funcs --- 
+# --- helper funcs ---
 def is_blank(l: str) -> bool:
     return len(l.strip(" ")) == 0
 
-def parse_flt(v:str) -> float | None:
+
+def parse_flt(v: str) -> float | None:
     try:
         t = float(v)
         return t
     except ValueError:
         return None
 
-def parse_int(v:str) -> int | None:
+
+def parse_int(v: str) -> int | None:
     try:
         t = int(v)
         return t
     except ValueError:
         return None
 
+
 def into_dataframe(data) -> pd.DataFrame:
     if len(data) == 0:
         return pd.DataFrame()
 
     aux = {k: [] for k in data.keys()}
-    for (k,v) in data.items():
+    for k, v in data.items():
         aux[k].append(v)
     return pd.DataFrame(data=aux)
 
+
 def _concat(preamble, df: pd.DataFrame):
-    for (k,v) in preamble.items():
-        df.insert(len(df.columns)-1, k, [v for _ in range(len(df))])
+    for k, v in preamble.items():
+        df.insert(len(df.columns) - 1, k, [v for _ in range(len(df))])
     return df
 
-def validate_no_stations(expected:int , stationsDF:pd.DataFrame) -> bool:
+
+def validate_no_stations(expected: int, stationsDF: pd.DataFrame) -> bool:
     uniqueStations = stationsDF["Estacao"].nunique()
     return expected == uniqueStations
 
 
@@ -55,29 +59,31 @@ def parse(fname):
     data = [l for l in fp.read().split("\n")]
     chunks = boundaries(data)
     df = pd.DataFrame()
-    for (idx,c) in enumerate(chunks):
-        a = parse_chunk(data[c[0]:c[1]])
+    for idx, c in enumerate(chunks):
+        a = parse_chunk(data[c[0] : c[1]])
         aux = pd.concat([df, a], axis=0, ignore_index=True)
         df = aux
     fp.close()
     return df
 
+
 def boundaries(data: list[str]):
     boundaries = []
     start = None
-    for (idx,l) in enumerate(data):
+    for idx, l in enumerate(data):
         if start is None:
             if not is_blank(l):
                 start = idx
         else:
             if is_blank(l):
-                boundaries.append((start,idx))
+                boundaries.append((start, idx))
                 start = None
     return boundaries
 
+
 def parse_chunk(chunk_lines: list[str]):
     hIdx = None
-    for (idx, l) in enumerate(chunk_lines):
+    for idx, l in enumerate(chunk_lines):
         if l[-1] == "7":
             hIdx = idx
             break
@@ -89,6 +95,7 @@ def parse_chunk(chunk_lines: list[str]):
 
     return _concat(preambleRet, phaseRet)
 
+
 def _parse_preamble(hLines: list[str]):
     aux = defaultdict(list)
 
@@ -111,7 +118,7 @@ def _parse_preamble(hLines: list[str]):
             pass
 
     headerDict = dict()
-    for (k,v) in aux.items():
+    for k, v in aux.items():
         if len(v) != 0:
             headerDict.update(FUNCS[k](v))
     return headerDict
@@ -126,7 +133,7 @@ def _parse_type_1(data: list[str]):
     m = int(aux[13:15])
     s = int(aux[16:18])
     mil = int(aux[19]) * 10**5
-    dt = datetime(y,mo,d,h,m,s,mil)
+    dt = datetime(y, mo, d, h, m, s, mil)
 
     dist_ind = DIST_IND[aux[21]]
     ev_type = TYPE[aux[22]]
@@ -135,18 +142,28 @@ def _parse_type_1(data: list[str]):
     depth = float(aux[38:43])
     no_stat = int(aux[48:51])
 
-    hypo = {"Data": dt.isoformat(), "Distancia": dist_ind, "Tipo Evento": ev_type, "Latitude": lat, "Longitude": long, "Profundidade": depth, "Estacoes": no_stat, "Magnitudes": list()}
+    hypo = {
+        "Data": dt.isoformat(),
+        "Distancia": dist_ind,
+        "Tipo Evento": ev_type,
+        "Latitude": lat,
+        "Longitude": long,
+        "Profundidade": depth,
+        "Estacoes": no_stat,
+        "Magnitudes": list(),
+    }
 
     for l in data:
         hypo["Magnitudes"] = hypo["Magnitudes"] + _parse_mag(l)
     return hypo
 
+
 def _parse_mag(line: str):
     magnitudes = []
     base = 55
     while base < 79:
-        m = line[base:base+4]
-        mt = line[base+4]
+        m = line[base : base + 4]
+        mt = line[base + 4]
         if not is_blank(m):
             magnitudes.append({"Magnitude": m, "Tipo": mt})
         base += 8
@@ -156,11 +173,23 @@ def _parse_mag(line: str):
 
 def _parse_type_3(data: list[str]):
     comments = {}
     for line in data:
-        if line.startswith(" SENTIDO") or line.startswith(" REGIAO"):
+        if line.startswith(" SENTIDO"):
             c, v = line[:-2].strip().split(": ", maxsplit=1)
             v = v.split(",")[0]
             comments[c.capitalize()] = v
+        elif line.startswith(" REGIAO"):
+            c, vals = line[:-2].strip().split(": ", maxsplit=1)
+            for v in vals.split(","):
+                if v.startswith("SZ"):
+                    comments["SZ"] = int(v[2:])
+                elif v.startswith("VZ"):
+                    comments["VZ"] = int(v[2:])
+                elif v.startswith("FE"):
+                    comments["FZ"] = v[2:]
+                else:
+                    comments["Regiao"] = v
+
     return comments
 
@@ -173,21 +202,59 @@ def _parse_type_6(data: list[str]):
 
 def _parse_type_7(data: list[str]):
     aux = io.StringIO("\n".join(data))
-    dados = pd.read_fwf(aux, colspecs=[(1,5), (6,8),(10,15), (18,20), (20,22), (23,28), (34,38), (71,75)])
-    dados.rename(columns={'STAT': "Estacao", 'SP': "Componente" , 'PHASW': "Tipo Onda", 'HR': "Hora", 'MM': "Min", 'SECON': "Seg", 'AMPL': "Amplitude", " DIST": "Distancia Epicentro"}, inplace=True)
+    dados = pd.read_fwf(
+        aux,
+        colspecs=[
+            (1, 5),
+            (6, 8),
+            (10, 15),
+            (18, 20),
+            (20, 22),
+            (23, 28),
+            (34, 38),
+            (71, 75),
+        ],
+    )
+    dados.rename(
+        columns={
+            "STAT": "Estacao",
+            "SP": "Componente",
+            "PHASW": "Tipo Onda",
+            "HR": "Hora",
+            "MM": "Min",
+            "SECON": "Seg",
+            "AMPL": "Amplitude",
+            " DIST": "Distancia Epicentro",
+        },
+        inplace=True,
+    )
     return dados
 
 
 def _parse_type_e(data: list[str]):
     aux = data[0]
-    error = {"Gap": int(aux[5:8]), "Origin": float(aux[14:20]), "Error_lat": float(aux[24:30]), "Error_long": float(aux[32:38]), "Error_depth": float(aux[38:43]), "Cov_xy": float(aux[43:55]), "Cov_xz": float(aux[55:67]), "Cov_yz": float(aux[67:79])}
+    error = {
+        "Gap": int(aux[5:8]),
+        "Origin": float(aux[14:20]),
+        "Error_lat": float(aux[24:30]),
+        "Error_long": float(aux[32:38]),
+        "Error_depth": float(aux[38:43]),
+        "Cov_xy": float(aux[43:55]),
+        "Cov_xz": float(aux[55:67]),
+        "Cov_yz": float(aux[67:79]),
+    }
     return error
 
 
 def _parse_type_i(data: list[str]):
     aux = data[0]
-    return {"ID":int(aux[60:74])}
+    return {"ID": int(aux[60:74])}
 
 
-FUNCS = {1: _parse_type_1, 3: _parse_type_3, 6: _parse_type_6, "E": _parse_type_e, "I": _parse_type_i}
-
+FUNCS = {
+    1: _parse_type_1,
+    3: _parse_type_3,
+    6: _parse_type_6,
+    "E": _parse_type_e,
+    "I": _parse_type_i,
+}
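
Note: a short usage sketch of the parser's intended flow (parse splits the
file into blank-line-delimited chunks via boundaries, parses each chunk, and
returns one row per picked phase). Run from inside utils/; the file name is
illustrative:

    import parser

    df = parser.parse("dados.txt")
    events = df.drop_duplicates(subset="ID", keep="first")  # one row per event
    print(events[["ID", "Data", "Profundidade", "Gap"]].head())
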
diff --git a/utils/plot.py b/utils/plot.py
index 5c3f08d..0c41f3b 100644
--- a/utils/plot.py
+++ b/utils/plot.py
@@ -1,6 +1,8 @@
 import collections
 import datetime
 
+import numpy as np
+import stats
 from matplotlib import pyplot as plt
 
 
@@ -13,7 +15,7 @@ class Plotter:
         pass
 
     def plot_events_day(self):
-        values = collections.Counter(self._preprare_days())
+        values = collections.Counter(stats._prepare_days(self.raw_data))
 
         x = list(values.keys())
         y = list(values.values())
@@ -23,7 +25,7 @@ class Plotter:
         plt.show()
 
     def plot_events_month(self):
-        values = collections.Counter(self._preprare_months())
+        values = collections.Counter(stats._prepare_months(self.raw_data))
 
         x = list(values.keys())
         y = list(values.values())
@@ -32,26 +34,6 @@ class Plotter:
         ax.bar(x, y)
         plt.show()
 
-    def _preprare_days(self):
-        c = self.raw_data.Data.to_list()
-        for idx, d in enumerate(c):
-            aux = datetime.datetime.fromisoformat(d)
-            c[idx] = datetime.datetime.strftime(aux, "%Y-%m-%d")
-
-        return c
-
-    def _preprare_months(self):
-        c = self.raw_data.Data.to_list()
-        for idx, d in enumerate(c):
-            aux = datetime.datetime.fromisoformat(d)
-            c[idx] = datetime.datetime.strftime(aux, "%Y-%m")
-
-        return c
-
-    def _prepare_mags(self):
-        pass
-        # c = self.raw_data.
-
 
 if __name__ == "__main__":
     import parser
@@ -59,4 +41,10 @@ if __name__ == "__main__":
 
     asdf = parser.parse("../dados.txt")
     a = Plotter(asdf)
-    print(a.raw_data.dtypes)
+    # b = stats.filter_mags(a.raw_data, more_than=2.5, less_than=2.9)
+    c = stats.filter_date(
+        a.raw_data,
+        after=datetime.datetime(year=2014, month=1, day=6),
+        before=datetime.datetime(year=2014, month=1, day=12),
+    )
+    print(c)
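
Note: plot.py still lacks the T6 line chart of the monthly mean +/- standard
deviation. A minimal sketch under the same column assumptions as above
(plot_monthly_mean_std is a hypothetical name, not in the patch):

    import numpy as np
    import pandas as pd
    from matplotlib import pyplot as plt

    def plot_monthly_mean_std(df: pd.DataFrame, column: str = "Profundidade"):
        months = pd.to_datetime(df["Data"]).dt.to_period("M").astype(str)
        grouped = df.groupby(months)[column]
        mean, std = grouped.mean(), grouped.std()

        x = np.arange(len(mean))
        fig, ax = plt.subplots()
        ax.plot(x, mean, label="media")
        # Shade one standard deviation around the monthly mean.
        ax.fill_between(x, mean - std, mean + std, alpha=0.3, label="+/- desvio")
        ax.set_xticks(x)
        ax.set_xticklabels(mean.index)
        ax.legend()
        plt.show()
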
diff --git a/utils/stats.py b/utils/stats.py
index 8ef25cd..949ab8c 100644
--- a/utils/stats.py
+++ b/utils/stats.py
@@ -1,13 +1,15 @@
 # pyright: basic
+import datetime
 import os
 import sys
 
-import pandas as pd
 import numpy as np
+import pandas as pd
+import utils
 
 
-STAT_HEADER ="""=== Terramotos ===
-  == Estatísticas ==  
+STAT_HEADER = """=== Terramotos ===
+  == Estatísticas ==
 """
 
 STAT_MENU = """[1] Média
@@ -26,7 +28,7 @@ FILTER_CHOICES = """[1] Magnitudes
 """
 
-CHOICE = {"1": "Magnitudes", "2": "Distancia","3": "Prof"}
+CHOICE = {"1": "Magnitudes", "2": "Distancia", "3": "Prof"}
 
 
 def filter_submenu(type: str):
@@ -124,7 +126,7 @@ def stat_menu(df: pd.DataFrame):
 
 
 def average(df: pd.DataFrame, filter_by):
-    events = df.drop_duplicates(subset="ID", keep='first')
+    events = df.drop_duplicates(subset="ID", keep="first")
     values = events[filter_by].to_numpy()
 
     if filter_by == "Magnitudes":
@@ -136,7 +138,7 @@ def average(df: pd.DataFrame, filter_by):
 
 
 def variance(df, filter_by):
-    events = df.drop_duplicates(subset="ID", keep='first')
+    events = df.drop_duplicates(subset="ID", keep="first")
     values = events[filter_by].to_numpy()
 
     if filter_by == "Magnitudes":
@@ -149,12 +151,12 @@ def variance(df, filter_by):
 
 
 def std_dev(df, filter_by):
-    events = df.drop_duplicates(subset="ID", keep='first')
+    events = df.drop_duplicates(subset="ID", keep="first")
     values = events[filter_by].to_numpy()
 
     if filter_by == "Magnitudes":
         values = _unpack_mags(values)
-    
+
     try:
         return np.std(values)
     except:
@@ -162,27 +164,27 @@ def std_dev(df, filter_by):
 
 
 def max_v(df, filter_by):
-    events = df.drop_duplicates(subset="ID", keep='first')
+    events = df.drop_duplicates(subset="ID", keep="first")
     values = events[filter_by].to_numpy()
 
     if filter_by == "Magnitudes":
         values = _unpack_mags(values)
-    
+
     return np.max(values)
 
 
 def min_v(df, filter_by):
-    events = df.drop_duplicates(subset="ID", keep='first')
+    events = df.drop_duplicates(subset="ID", keep="first")
     values = events[filter_by].to_numpy()
 
     if filter_by == "Magnitudes":
         values = _unpack_mags(values)
-    
+
     return np.min(values)
 
 
 def moda(df, filter_by):
-    events = df.drop_duplicates(subset="ID", keep='first')
+    events = df.drop_duplicates(subset="ID", keep="first")
     values = events[filter_by].to_numpy()
 
     if filter_by == "Magnitudes":
@@ -191,7 +193,7 @@ def moda(df, filter_by):
     uniques, count = np.unique(values, return_counts=True)
     uniques_list = list(zip(uniques, count))
 
-    return sorted(uniques_list, reverse=True ,key=lambda x: x[1])[0][0]
+    return sorted(uniques_list, reverse=True, key=lambda x: x[1])[0][0]
 
 
 def _unpack_mags(arr: np.ndarray):
@@ -201,3 +203,128 @@ def _unpack_mags(arr: np.ndarray):
             newVals = np.append(newVals, float(m["Magnitude"]))
 
     return newVals
+
+def filter_mags(data, more_than=None, less_than=None):
+    """Filters a DataFrame by magnitude L into a new DataFrame
+
+    :param data: Raw pandas DataFrame
+    :param more_than(optional): Filter for magnitudes above the threshold
+    :param less_than(optional): Filter for magnitudes below the threshold
+    :returns: Returns a filtered pandas DataFrame
+    """
+    v = data.drop_duplicates(subset="ID", keep="first")
+    _dict = {"Data": [], "MagL": []}
+    for idx, c in v.iterrows():
+        _dict["Data"].append(str(c.Data))
+        _dict["MagL"].append(utils.extract_mag_l(c.Magnitudes))
+
+    _df = pd.DataFrame.from_dict(_dict)
+    if more_than is not None:
+        _df = _df[_df["MagL"] >= more_than]
+
+    if less_than is not None:
+        _df = _df[_df["MagL"] <= less_than]
+    return _df
+
+
+def filter_date(
+    data: pd.DataFrame,
+    before: datetime.datetime | None = None,
+    after: datetime.datetime | None = None,
+) -> pd.DataFrame:
+    """Filters a DataFrame by date into a new DataFrame
+
+    :param data: Raw pandas DataFrame
+    :param before(optional): Filter for dates before the set date
+    :param after(optional): Filter for dates after the set date
+    :returns: Returns a filtered pandas DataFrame
+    """
+    v = data.copy()
+    for idx, c in v.iterrows():
+        v.at[idx, "Data"] = datetime.datetime.fromisoformat(c.Data)
+
+    if after is not None:
+        v = v[v["Data"] >= after]
+
+    if before is not None:
+        v = v[v["Data"] <= before]
+    return v
+
+
+def filter_depth(
+    data: pd.DataFrame,
+    less_than: float | None = None,
+    more_than: float | None = None,
+) -> pd.DataFrame:
+    """Filters a DataFrame by depth into a new DataFrame
+
+    :param data: Raw pandas DataFrame
+    :param less_than(optional): Filter for depths shallower than the threshold
+    :param more_than(optional): Filter for depths deeper than the threshold
+    :returns: Returns a filtered pandas DataFrame
+    """
+    v = data.drop_duplicates(subset="ID", keep="first")
+
+    if more_than is not None:
+        v = v[v["Profundidade"] >= more_than]
+
+    if less_than is not None:
+        v = v[v["Profundidade"] <= less_than]
+    return v
+
+
+def filter_gap(
+    data: pd.DataFrame,
+    threshold: int,
+) -> pd.DataFrame:
+    """Filters a DataFrame by azimuthal GAP into a new DataFrame
+
+    :param data: Raw pandas DataFrame
+    :param threshold: Filter for GAPs below the threshold
+    :returns: Returns a filtered pandas DataFrame
+    """
+    v = data.drop_duplicates(subset="ID", keep="first")
+    v = v[v["Gap"] <= threshold]
+    return v
+
+
+def filter_sz(
+    data: pd.DataFrame,
+) -> pd.DataFrame:
+    """Filters a DataFrame down to events with an SZ zone
+
+    :param data: Raw pandas DataFrame
+    :returns: Returns a filtered pandas DataFrame
+    """
+    v = data[data["SZ"].notna()]
+    return v
+
+
+def filter_vz(
+    data: pd.DataFrame,
+) -> pd.DataFrame:
+    """Filters a DataFrame down to events with a VZ zone
+
+    :param data: Raw pandas DataFrame
+    :returns: Returns a filtered pandas DataFrame
+    """
+    v = data[data["VZ"].notna()]
+    return v
+
+
+def _prepare_days(data):
+    c = data.Data.to_list()
+    for idx, d in enumerate(c):
+        aux = datetime.datetime.fromisoformat(d)
+        c[idx] = datetime.datetime.strftime(aux, "%Y-%m-%d")
+
+    return c
+
+
+def _prepare_months(data):
+    c = data.Data.to_list()
+    for idx, d in enumerate(c):
+        aux = datetime.datetime.fromisoformat(d)
+        c[idx] = datetime.datetime.strftime(aux, "%Y-%m")
+
+    return c
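
Note: each T7 filter returns a new DataFrame, so they compose by plain
chaining. A usage sketch with illustrative thresholds; only filter_date is
order-sensitive, since it expects the raw ISO strings in "Data":

    import datetime

    import parser
    import stats

    df = parser.parse("dados.txt")
    df = stats.filter_date(
        df,
        after=datetime.datetime(2014, 1, 1),
        before=datetime.datetime(2014, 12, 31),
    )
    df = stats.filter_gap(df, threshold=180)  # keep well-constrained epicentres
    df = stats.filter_depth(df, more_than=0.0, less_than=30.0)
    print(len(df), "events after filtering")
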
diff --git a/utils/utils.py b/utils/utils.py
index 2bca49b..dcfb9ef 100644
--- a/utils/utils.py
+++ b/utils/utils.py
@@ -1,20 +1,30 @@
 #!/usr/bin/env python
 # pyright: basic
-from datetime import time
 import json
+from datetime import time
 from math import modf
 from typing import Any
 
+import numpy as np
 import pandas as pd
 
 
+def extract_mag_l(data) -> np.float64:
+    for v in data:
+        if v["Tipo"] == "L":
+            return np.float64(v["Magnitude"])
+
+    return np.float64(0.0)
+
+
 def save_as_json(info: dict[str, Any]) -> bool:
     with open("test.json", "w") as fp:
         json.dump(info, fp)
-    
+
     return True
 
+
 # TODO: pass the column names in, so there are no problems in the future if the dataframe column names change
 def create_dict_struct(df: pd.DataFrame, event_cols, station_cols) -> dict[str, Any]:
     # get all events by their id
@@ -51,15 +61,20 @@ def create_stations_info_1(info: pd.DataFrame) -> dict[str, Any]:
         aux = info.iloc[idx]
 
         micro, sec = tuple(map(int, modf(aux["Seg"])))
-        hms = time(hour=aux["Hora"],minute=aux["Min"], second=sec, microsecond=micro).strftime("%H:%M:%S.%f")
-        station = {"Componente": aux["Componente"], "Hora": hms, "Distancia": float(aux["DIS"])}
+        hms = time(
+            hour=aux["Hora"], minute=aux["Min"], second=sec, microsecond=micro
+        ).strftime("%H:%M:%S.%f")
+        station = {
+            "Componente": aux["Componente"],
+            "Hora": hms,
+            "Distancia": float(aux["DIS"]),
+        }
 
         if type(aux["Tipo Onda"]) != float:
             station.update({"Tipo Onda": aux["Tipo Onda"]})
             if aux["Tipo Onda"] == "IAML":
                 station.update({"Amplitude": float(aux["Amplitude"])})
-
         if aux["Estacao"] not in stationsDict.keys():
             stationsDict[aux["Estacao"]] = [station]
         else:
@@ -74,7 +89,7 @@ def create_mag_info(magnitudes):
 
     return mags
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     import parser
 
     df = parser.parse("dados.txt")
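
Note: a small sketch of the new utils.extract_mag_l helper; the magnitude
dicts follow _parse_mag's output shape, and the example values are made up:

    import utils

    mags = [{"Magnitude": "2.8", "Tipo": "C"}, {"Magnitude": "3.1", "Tipo": "L"}]
    print(utils.extract_mag_l(mags))  # 3.1, the first magnitude of type "L"
    print(utils.extract_mag_l([]))    # 0.0 fallback when no "L" magnitude exists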