# pyright: basic import io from collections import defaultdict from datetime import datetime import pandas as pd # --- globals --- DIST_IND = {"L": "Local", "R": "Regional", "D": "Distante"} TYPE = {"Q": "Quake", "V": "Volcanic", "U": "Unknown", "E": "Explosion"} # --- helper funcs --- def is_blank(l: str) -> bool: return len(l.strip(" ")) == 0 def parse_flt(v: str) -> float | None: try: t = float(v) return t except ValueError: return None def parse_int(v: str) -> int | None: try: t = int(v) return t except ValueError: return None def into_dataframe(data) -> pd.DataFrame: if len(data) == 0: return pd.DataFrame() aux = {k: [] for k in data.keys()} for k, v in data.items(): aux[k].append(v) return pd.DataFrame(data=aux) def _concat(preamble, df: pd.DataFrame): for k, v in preamble.items(): df.insert(len(df.columns) - 1, k, [v for _ in range(len(df))]) return df def validate_no_stations(expected: int, stationsDF: pd.DataFrame) -> bool: uniqueStations = stationsDF["Estacao"].nunique() return expected == uniqueStations # --- principal --- def parse(fname): fp = open(fname) data = [l for l in fp.read().split("\n")] chunks = boundaries(data) df = pd.DataFrame() for idx, c in enumerate(chunks): a = parse_chunk(data[c[0] : c[1]]) aux = pd.concat([df, a], axis=0, ignore_index=True) df = aux fp.close() return df def boundaries(data: list[str]): boundaries = [] start = None for idx, l in enumerate(data): if start is None: if not is_blank(l): start = idx else: if is_blank(l): boundaries.append((start, idx)) start = None return boundaries def parse_chunk(chunk_lines: list[str]): hIdx = None for idx, l in enumerate(chunk_lines): if l[-1] == "7": hIdx = idx break preambleRet = _parse_preamble(chunk_lines[:hIdx]) phaseRet = _parse_type_7(chunk_lines[hIdx:]) if not validate_no_stations(preambleRet["Estacoes"], phaseRet): pass return _concat(preambleRet, phaseRet) def _parse_preamble(hLines: list[str]): aux = defaultdict(list) for line in hLines: match line[-1]: case "1": aux[1].append(line) case "3": aux[3].append(line) case "6": aux[6].append(line) case "E": aux["E"].append(line) case "I": aux["I"].append(line) case "F": pass # aux["F"].append(line) case _: pass headerDict = dict() for k, v in aux.items(): if len(v) != 0: headerDict.update(FUNCS[k](v)) return headerDict def _parse_type_1(data: list[str]): aux = data[0] y = int(aux[1:5]) mo = int(aux[6:8]) d = int(aux[8:10]) h = int(aux[11:13]) m = int(aux[13:15]) s = int(aux[16:18]) mil = int(aux[19]) * 10**5 dt = datetime(y, mo, d, h, m, s, mil) dist_ind = DIST_IND[aux[21]] ev_type = TYPE[aux[22]] lat = float(aux[23:30]) long = float(aux[30:38]) depth = float(aux[38:43]) no_stat = int(aux[48:51]) hypo = { "Data": dt.isoformat(), "Distancia": dist_ind, "Tipo Evento": ev_type, "Latitude": lat, "Longitude": long, "Profundidade": depth, "Estacoes": no_stat, "Magnitudes": list(), } for l in data: hypo["Magnitudes"] = hypo["Magnitudes"] + _parse_mag(l) return hypo def _parse_mag(line: str): magnitudes = [] base = 55 while base < 79: m = line[base : base + 4] mt = line[base + 4] if not is_blank(m): magnitudes.append({"Magnitude": m, "Tipo": mt}) base += 8 return magnitudes def _parse_type_3(data: list[str]): comments = {} for line in data: if line.startswith(" SENTIDO"): c, v = line[:-2].strip().split(": ", maxsplit=1) v = v.split(",")[0] comments[c.capitalize()] = v elif line.startswith(" REGIAO"): c, vals = line[:-2].strip().split(": ", maxsplit=1) _d = {} for v in vals.split(","): if v.startswith("SZ"): comments["SZ"] = int(v[2:]) elif v.startswith("VZ"): comments["VZ"] = int(v[2:]) elif v.startswith("FE"): comments["FZ"] = v[2:] else: comments["Regiao"] = v return comments def _parse_type_6(data: list[str]): waves = [] for l in data: waves.append(l.strip().split(" ")[0]) return {"Onda": waves} def _parse_type_7(data: list[str]): aux = io.StringIO("\n".join(data)) dados = pd.read_fwf( aux, colspecs=[ (1, 5), (6, 8), (10, 15), (18, 20), (20, 22), (23, 28), (34, 38), (71, 75), ], ) dados.rename( columns={ "STAT": "Estacao", "SP": "Componente", "PHASW": "Tipo Onda", "HR": "Hora", "MM": "Min", "SECON": "Seg", "AMPL": "Amplitude", " DIST": "Distancia Epicentro", }, inplace=True, ) return dados def _parse_type_e(data: list[str]): aux = data[0] error = { "Gap": int(aux[5:8]), "Origin": float(aux[14:20]), "Error_lat": float(aux[24:30]), "Error_long": float(aux[32:38]), "Error_depth": float(aux[38:43]), "Cov_xy": float(aux[43:55]), "Cov_xz": float(aux[55:67]), "Cov_yz": float(aux[67:79]), } return error def _parse_type_i(data: list[str]): aux = data[0] return {"ID": int(aux[60:74])} FUNCS = { 1: _parse_type_1, 3: _parse_type_3, 6: _parse_type_6, "E": _parse_type_e, "I": _parse_type_i, }