gráficos, estatísticas e filtros

This commit is contained in:
2025-12-11 14:18:03 -01:00
parent 490c88085a
commit 14dee58ab2
5 changed files with 286 additions and 69 deletions

View File

@@ -1,6 +1,5 @@
# pyright: basic
import io
from collections import defaultdict
from datetime import datetime
@@ -11,40 +10,45 @@ DIST_IND = {"L": "Local", "R": "Regional", "D": "Distante"}
TYPE = {"Q": "Quake", "V": "Volcanic", "U": "Unknown", "E": "Explosion"}
# --- helper funcs ---
# --- helper funcs ---
def is_blank(l: str) -> bool:
    """Return True when the line is empty or consists only of space characters.

    Only the space character is stripped, so a line holding a tab (or any
    other whitespace) is not considered blank.
    """
    return not l.strip(" ")
def parse_flt(v:str) -> float | None:
def parse_flt(v: str) -> float | None:
try:
t = float(v)
return t
except ValueError:
return None
def parse_int(v:str) -> int | None:
def parse_int(v: str) -> int | None:
try:
t = int(v)
return t
except ValueError:
return None
def into_dataframe(data) -> pd.DataFrame:
    """Build a single-row DataFrame from a mapping of column name to value.

    An empty mapping produces an empty DataFrame. Every value is wrapped in a
    one-element list so that it becomes the only cell of its column.
    """
    if not len(data):
        return pd.DataFrame()

    single_row = {column: [value] for column, value in data.items()}
    return pd.DataFrame(data=single_row)
def _concat(preamble, df: pd.DataFrame):
    """Add every preamble entry to ``df`` as a constant-valued column.

    Each new column is inserted immediately before the current last column,
    and the same value object is repeated on every row. ``df`` is modified in
    place and also returned.
    """
    row_count = len(df)
    for column, value in preamble.items():
        insert_at = len(df.columns) - 1
        df.insert(insert_at, column, [value] * row_count)
    return df
def validate_no_stations(expected: int, stationsDF: pd.DataFrame) -> bool:
    """Check that the number of distinct values in the "Estacao" column equals ``expected``."""
    return stationsDF["Estacao"].nunique() == expected
@@ -55,29 +59,31 @@ def parse(fname):
data = [l for l in fp.read().split("\n")]
chunks = boundaries(data)
df = pd.DataFrame()
for (idx,c) in enumerate(chunks):
a = parse_chunk(data[c[0]:c[1]])
for idx, c in enumerate(chunks):
a = parse_chunk(data[c[0] : c[1]])
aux = pd.concat([df, a], axis=0, ignore_index=True)
df = aux
fp.close()
return df
def boundaries(data: list[str]):
    """Locate the runs of consecutive non-blank lines in ``data``.

    :param data: Lines of the input, one string per line
    :returns: List of ``(start, end)`` index pairs, where ``start`` is the
        first non-blank line of a run and ``end`` is the index just past its
        last line, so ``data[start:end]`` is the run
    """
    spans = []
    start = None
    for idx, line in enumerate(data):
        blank = is_blank(line)
        if start is None:
            if not blank:
                start = idx
        elif blank:
            spans.append((start, idx))
            start = None

    # A run that reaches the end of the input without a terminating blank
    # line (e.g. no trailing newline) must still be reported.
    if start is not None:
        spans.append((start, len(data)))
    return spans
def parse_chunk(chunk_lines: list[str]):
hIdx = None
for (idx, l) in enumerate(chunk_lines):
for idx, l in enumerate(chunk_lines):
if l[-1] == "7":
hIdx = idx
break
@@ -89,6 +95,7 @@ def parse_chunk(chunk_lines: list[str]):
return _concat(preambleRet, phaseRet)
def _parse_preamble(hLines: list[str]):
aux = defaultdict(list)
@@ -111,7 +118,7 @@ def _parse_preamble(hLines: list[str]):
pass
headerDict = dict()
for (k,v) in aux.items():
for k, v in aux.items():
if len(v) != 0:
headerDict.update(FUNCS[k](v))
return headerDict
@@ -126,7 +133,7 @@ def _parse_type_1(data: list[str]):
m = int(aux[13:15])
s = int(aux[16:18])
mil = int(aux[19]) * 10**5
dt = datetime(y,mo,d,h,m,s,mil)
dt = datetime(y, mo, d, h, m, s, mil)
dist_ind = DIST_IND[aux[21]]
ev_type = TYPE[aux[22]]
@@ -135,18 +142,28 @@ def _parse_type_1(data: list[str]):
depth = float(aux[38:43])
no_stat = int(aux[48:51])
hypo = {"Data": dt.isoformat(), "Distancia": dist_ind, "Tipo Evento": ev_type, "Latitude": lat, "Longitude": long, "Profundidade": depth, "Estacoes": no_stat, "Magnitudes": list()}
hypo = {
"Data": dt.isoformat(),
"Distancia": dist_ind,
"Tipo Evento": ev_type,
"Latitude": lat,
"Longitude": long,
"Profundidade": depth,
"Estacoes": no_stat,
"Magnitudes": list(),
}
for l in data:
hypo["Magnitudes"] = hypo["Magnitudes"] + _parse_mag(l)
return hypo
def _parse_mag(line: str):
magnitudes = []
base = 55
while base < 79:
m = line[base:base+4]
mt = line[base+4]
m = line[base : base + 4]
mt = line[base + 4]
if not is_blank(m):
magnitudes.append({"Magnitude": m, "Tipo": mt})
base += 8
@@ -156,11 +173,24 @@ def _parse_mag(line: str):
def _parse_type_3(data: list[str]):
    """Extract the SENTIDO and REGIAO comments from the given lines.

    The last two characters of every line are dropped before parsing.

    * `` SENTIDO: a,b`` stores the text before the first comma under
      ``"Sentido"``.
    * `` REGIAO: r,VZnn,SZnn,FExxx`` is split on commas; tokens prefixed with
      ``SZ``/``VZ`` are stored as ints under ``"SZ"``/``"VZ"``, a token
      prefixed with ``FE`` is stored as text under ``"FZ"``, and any other
      token is stored under ``"Regiao"``.

    :param data: Comment lines to inspect; other lines are ignored
    :returns: Dict with the keys found
    :raises ValueError: If a matching line has no ``": "`` separator or an
        ``SZ``/``VZ`` suffix is not an integer
    """
    comments = {}
    for line in data:
        if line.startswith(" SENTIDO"):
            label, value = line[:-2].strip().split(": ", maxsplit=1)
            comments[label.capitalize()] = value.split(",")[0]
        elif line.startswith(" REGIAO"):
            _, values = line[:-2].strip().split(": ", maxsplit=1)
            for token in values.split(","):
                if token.startswith("SZ"):
                    comments["SZ"] = int(token[2:])
                elif token.startswith("VZ"):
                    comments["VZ"] = int(token[2:])
                elif token.startswith("FE"):
                    # NOTE(review): "FE" tokens land under the "FZ" key;
                    # confirm the key name is intentional.
                    comments["FZ"] = token[2:]
                else:
                    comments["Regiao"] = token
    return comments
@@ -173,21 +203,59 @@ def _parse_type_6(data: list[str]):
def _parse_type_7(data: list[str]):
    """Read the given lines as a fixed-width table and rename its columns.

    The first line is used as the header row by ``pandas.read_fwf``; only the
    character ranges listed in ``column_spans`` are read. Header names found
    in ``renamed`` are replaced, any other header is kept unchanged.

    :param data: Header line followed by the data lines
    :returns: DataFrame with the selected, renamed columns
    """
    column_spans = [
        (1, 5),
        (6, 8),
        (10, 15),
        (18, 20),
        (20, 22),
        (23, 28),
        (34, 38),
        (71, 75),
    ]
    renamed = {
        "STAT": "Estacao",
        "SP": "Componente",
        "PHASW": "Tipo Onda",
        "HR": "Hora",
        "MM": "Min",
        "SECON": "Seg",
        "AMPL": "Amplitude",
        " DIST": "Distancia Epicentro",
    }
    buffer = io.StringIO("\n".join(data))
    table = pd.read_fwf(buffer, colspecs=column_spans)
    return table.rename(columns=renamed)
def _parse_type_e(data: list[str]):
    """Parse the fixed-column error fields from the first line of ``data``.

    Each entry of ``layout`` gives the output key, the converter and the
    character slice ``[start:end]`` read from the line.

    :param data: Lines of this type; only ``data[0]`` is read
    :returns: Dict with the gap, origin, error and covariance values
    :raises ValueError: If any field cannot be converted
    """
    line = data[0]
    layout = (
        ("Gap", int, 5, 8),
        ("Origin", float, 14, 20),
        ("Error_lat", float, 24, 30),
        ("Error_long", float, 32, 38),
        ("Error_depth", float, 38, 43),
        ("Cov_xy", float, 43, 55),
        ("Cov_xz", float, 55, 67),
        ("Cov_yz", float, 67, 79),
    )
    return {key: convert(line[lo:hi]) for key, convert, lo, hi in layout}
def _parse_type_i(data: list[str]):
    """Read the integer stored in columns 60-73 of the first line as the "ID".

    :param data: Lines of this type; only ``data[0]`` is read
    :returns: Dict with the single key ``"ID"``
    :raises ValueError: If the slice is not a valid integer
    """
    first_line = data[0]
    event_id = int(first_line[60:74])
    return {"ID": event_id}
FUNCS = {1: _parse_type_1, 3: _parse_type_3, 6: _parse_type_6, "E": _parse_type_e, "I": _parse_type_i}
FUNCS = {
1: _parse_type_1,
3: _parse_type_3,
6: _parse_type_6,
"E": _parse_type_e,
"I": _parse_type_i,
}

View File

@@ -1,6 +1,8 @@
import collections
import datetime
import numpy as np
import stats
from matplotlib import pyplot as plt
@@ -13,7 +15,7 @@ class Plotter:
pass
def plot_events_day(self):
values = collections.Counter(self._preprare_days())
values = collections.Counter(stats._preprare_days(self.raw_data))
x = list(values.keys())
y = list(values.values())
@@ -23,7 +25,7 @@ class Plotter:
plt.show()
def plot_events_month(self):
values = collections.Counter(self._preprare_months())
values = collections.Counter(stats._preprare_months(self.raw_data))
x = list(values.keys())
y = list(values.values())
@@ -32,26 +34,6 @@ class Plotter:
ax.bar(x, y)
plt.show()
def _preprare_days(self):
c = self.raw_data.Data.to_list()
for idx, d in enumerate(c):
aux = datetime.datetime.fromisoformat(d)
c[idx] = datetime.datetime.strftime(aux, "%Y-%m-%d")
return c
def _preprare_months(self):
c = self.raw_data.Data.to_list()
for idx, d in enumerate(c):
aux = datetime.datetime.fromisoformat(d)
c[idx] = datetime.datetime.strftime(aux, "%Y-%m")
return c
def _prepare_mags(self):
pass
# c = self.raw_data.
if __name__ == "__main__":
import parser
@@ -59,4 +41,10 @@ if __name__ == "__main__":
asdf = parser.parse("../dados.txt")
a = Plotter(asdf)
print(a.raw_data.dtypes)
# b = stats._filter_mags(a.raw_data, more_than=2.5, less_than=2.9)
c = stats.filter_date(
a.raw_data,
after=datetime.datetime(year=2014, month=1, day=6),
before=datetime.datetime(year=2014, month=1, day=12),
)
print(c)

View File

@@ -1,13 +1,15 @@
# pyright: basic
import datetime
import os
import sys
import pandas as pd
import numpy as np
import pandas as pd
import utils
STAT_HEADER ="""=== Terramotos ===
== Estatísticas ==
STAT_HEADER = """=== Terramotos ===
== Estatísticas ==
"""
STAT_MENU = """[1] Média
@@ -26,7 +28,7 @@ FILTER_CHOICES = """[1] Magnitudes
"""
CHOICE = {"1": "Magnitudes", "2": "Distancia","3": "Prof"}
CHOICE = {"1": "Magnitudes", "2": "Distancia", "3": "Prof"}
def filter_submenu(type: str):
@@ -124,7 +126,7 @@ def stat_menu(df: pd.DataFrame):
def average(df: pd.DataFrame, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
events = df.drop_duplicates(subset="ID", keep="first")
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
@@ -136,7 +138,7 @@ def average(df: pd.DataFrame, filter_by):
def variance(df, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
events = df.drop_duplicates(subset="ID", keep="first")
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
@@ -149,12 +151,12 @@ def variance(df, filter_by):
def std_dev(df, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
events = df.drop_duplicates(subset="ID", keep="first")
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
values = _unpack_mags(values)
try:
return np.std(values)
except:
@@ -162,27 +164,27 @@ def std_dev(df, filter_by):
def max_v(df, filter_by):
    """Return the largest value of a column, counting each event ID once.

    :param df: DataFrame with an "ID" column and the ``filter_by`` column
    :param filter_by: Name of the column to inspect; "Magnitudes" values are
        first expanded through ``_unpack_mags``
    :returns: Maximum of the selected values
    """
    unique_events = df.drop_duplicates(subset="ID", keep="first")
    column = unique_events[filter_by].to_numpy()
    samples = _unpack_mags(column) if filter_by == "Magnitudes" else column
    return np.max(samples)
def min_v(df, filter_by):
    """Return the smallest value of a column, counting each event ID once.

    :param df: DataFrame with an "ID" column and the ``filter_by`` column
    :param filter_by: Name of the column to inspect; "Magnitudes" values are
        first expanded through ``_unpack_mags``
    :returns: Minimum of the selected values
    """
    unique_events = df.drop_duplicates(subset="ID", keep="first")
    column = unique_events[filter_by].to_numpy()
    samples = _unpack_mags(column) if filter_by == "Magnitudes" else column
    return np.min(samples)
def moda(df, filter_by):
events = df.drop_duplicates(subset="ID", keep='first')
events = df.drop_duplicates(subset="ID", keep="first")
values = events[filter_by].to_numpy()
if filter_by == "Magnitudes":
@@ -191,7 +193,7 @@ def moda(df, filter_by):
uniques, count = np.unique(values, return_counts=True)
uniques_list = list(zip(uniques, count))
return sorted(uniques_list, reverse=True ,key=lambda x: x[1])[0][0]
return sorted(uniques_list, reverse=True, key=lambda x: x[1])[0][0]
def _unpack_mags(arr: np.ndarray):
@@ -201,3 +203,128 @@ def _unpack_mags(arr: np.ndarray):
newVals = np.append(newVals, float(m["Magnitude"]))
return newVals
def filter_mags(data, more_than=None, less_than=None):
    """Filters by magnitudes a DataFrame into a new Dataframe

    Each event ID is considered once (first row wins). The result holds only
    two columns: "Data" (the event date as text) and "MagL" (the value
    returned by ``utils.extract_mag_l`` for the event's magnitudes).

    :param data: Raw pandas DataFrame
    :param more_than(optional): Keep magnitudes greater than or equal to this threshold
    :param less_than(optional): Keep magnitudes less than or equal to this threshold
    :returns: Returns a filtered pandas DataFrame
    """
    events = data.drop_duplicates(subset="ID", keep="first")
    table = pd.DataFrame(
        {
            "Data": [str(date) for date in events["Data"]],
            "MagL": [utils.extract_mag_l(mags) for mags in events["Magnitudes"]],
        }
    )

    # Compare against None explicitly so that a threshold of 0 is honoured.
    if more_than is not None:
        table = table[table["MagL"] >= more_than]
    if less_than is not None:
        table = table[table["MagL"] <= less_than]
    return table
def filter_date(
data: pd.DataFrame,
before: datetime.datetime | None = None,
after: datetime.datetime | None = None,
) -> pd.DataFrame:
"""Filters by date a DataFrame into a new Dataframe
:param data: Raw pandas DataFrame
:param before(optional): Filter for dates before set date
:param after(optional): Filters for dates after set date
:returns: Returns a filtered pandas DataFrame
"""
v = data
for idx, c in v.iterrows():
v.at[idx, "Data"] = datetime.datetime.fromisoformat(c.Data)
if after:
v = v[v["Data"] >= after]
if before:
v = v[v["Data"] >= before]
return v
def filter_depth(
data: pd.DataFrame,
less_than: float | None = None,
more_than: float | None = None,
) -> pd.DataFrame:
"""Filters by the depth a DataFrame into a new Dataframe
:param data: Raw pandas DataFrame
:param less_than(optional): Filter for depths below the threshold
:param after(optional): Filters for depths deeper than threshold
:returns: Returns a filtered pandas DataFrame
"""
v = data.drop_duplicates(subset="ID", keep="first")
if more_than:
v = v[v["Profundidade"] >= more_than]
if less_than:
v = v[v["Profundidade"] >= less_than]
return v
def filter_gap(
    data: pd.DataFrame,
    threshold: int,
) -> pd.DataFrame:
    """Filters by the gap a DataFrame into a new Dataframe

    Each event ID is considered once (first row wins).

    :param data: Raw pandas DataFrame
    :param threshold: Keep events whose "Gap" is less than or equal to the threshold
    :returns: Returns a filtered pandas DataFrame
    """
    events = data.drop_duplicates(subset="ID", keep="first")
    within_threshold = events["Gap"] <= threshold
    return events[within_threshold]
def filter_sz(
    data: pd.DataFrame,
) -> pd.DataFrame:
    """Keep only the rows that have a value in the "SZ" column

    :param data: Raw pandas DataFrame
    :returns: Returns a filtered pandas DataFrame
    """
    missing = data["SZ"].isna()
    return data[~missing]
def filter_vz(
    data: pd.DataFrame,
) -> pd.DataFrame:
    """Keep only the rows that have a value in the "VZ" column

    :param data: Raw pandas DataFrame
    :returns: Returns a filtered pandas DataFrame
    """
    missing = data["VZ"].isna()
    return data[~missing]
def _preprare_days(data):
    """Return the "Data" column as a list of ``YYYY-MM-DD`` strings.

    :param data: Object whose ``Data`` attribute holds ISO-format date strings
    :returns: One day label per row, in row order
    """
    return [
        datetime.datetime.fromisoformat(stamp).strftime("%Y-%m-%d")
        for stamp in data.Data.to_list()
    ]
def _preprare_months(data):
    """Return the "Data" column as a list of ``YYYY-MM`` strings.

    :param data: Object whose ``Data`` attribute holds ISO-format date strings
    :returns: One month label per row, in row order
    """
    return [
        datetime.datetime.fromisoformat(stamp).strftime("%Y-%m")
        for stamp in data.Data.to_list()
    ]

View File

@@ -1,20 +1,30 @@
#! /usr/bin/env python
# pyright: basic
from datetime import time
import json
from datetime import time
from math import modf
from typing import Any
import numpy as np
import pandas as pd
def extract_mag_l(data) -> np.float64:
    """Return the first magnitude whose "Tipo" is "L", or 0.0 when there is none.

    :param data: Iterable of dicts carrying "Magnitude" and "Tipo" keys
    :returns: The matching magnitude converted to ``np.float64``
    """
    first_local = next(
        (entry["Magnitude"] for entry in data if entry["Tipo"] == "L"),
        0.0,
    )
    return np.float64(first_local)
def save_as_json(info: dict[str, Any], path: str = "test.json") -> bool:
    """Serialize ``info`` as JSON into a file.

    :param info: JSON-serializable dictionary to write
    :param path: Destination file; defaults to "test.json" in the current
        working directory, as before
    :returns: True once the file has been written
    :raises TypeError: If ``info`` holds values ``json`` cannot serialize
    :raises OSError: If the file cannot be opened for writing
    """
    with open(path, "w", encoding="utf-8") as fp:
        json.dump(info, fp)
    return True
# TODO: passar os nomes das colunas, para não haver problemas no futuro, caso se altere os nomes da dataframe
def create_dict_struct(df: pd.DataFrame, event_cols, station_cols) -> dict[str, Any]:
# get all events by their id
@@ -51,15 +61,20 @@ def create_stations_info_1(info: pd.DataFrame) -> dict[str, Any]:
aux = info.iloc[idx]
micro, sec = tuple(map(int, modf(aux["Seg"])))
hms = time(hour=aux["Hora"],minute=aux["Min"], second=sec, microsecond=micro).strftime("%H:%M:%S.%f")
station = {"Componente": aux["Componente"], "Hora": hms, "Distancia": float(aux["DIS"])}
hms = time(
hour=aux["Hora"], minute=aux["Min"], second=sec, microsecond=micro
).strftime("%H:%M:%S.%f")
station = {
"Componente": aux["Componente"],
"Hora": hms,
"Distancia": float(aux["DIS"]),
}
if type(aux["Tipo Onda"]) != float:
station.update({"Tipo Onda": aux["Tipo Onda"]})
if aux["Tipo Onda"] == "IAML":
station.update({"Amplitude": float(aux["Amplitude"])})
if aux["Estacao"] not in stationsDict.keys():
stationsDict[aux["Estacao"]] = [station]
else:
@@ -74,7 +89,7 @@ def create_mag_info(magnitudes):
return mags
if __name__ == '__main__':
if __name__ == "__main__":
import parser
df = parser.parse("dados.txt")