Files
prog-team-proj/utilsv2/mongo.py
2026-01-04 18:51:37 -01:00

93 lines
2.2 KiB
Python

import logging
from typing import Any
from pymongo import MongoClient
from pymongo.collection import Collection
from pymongo.cursor import Cursor
from pymongo.errors import ConnectionFailure
try:
from utilsv2.log import logger
except ModuleNotFoundError:
from log import logger
logger = logging.getLogger(__name__)
def connect(uri: str) -> MongoClient:
    """Open a MongoDB client for *uri* and verify the server is reachable.

    ``MongoClient(uri)`` returns immediately without contacting the server,
    so constructing it alone never proves that a connection exists.  A cheap
    ``ping`` command is issued to force server selection and surface
    connection problems here instead of at the first real query.

    Args:
        uri: MongoDB connection string, e.g. ``"mongodb://localhost:27017"``.

    Returns:
        A client on which a ``ping`` has succeeded.

    Raises:
        ConnectionFailure: if the server cannot be reached (includes server
            selection timeouts, which subclass ``ConnectionFailure``).
    """
    client = MongoClient(uri)
    try:
        # Forces a round trip; the constructor alone does not connect.
        client.admin.command("ping")
    except ConnectionFailure:
        logger.critical("Could not connect to the MongoDB")
        # Release the background monitoring resources of the unusable client.
        client.close()
        raise
    logger.info("Connected to the DB")
    return client
def add_events(
    client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main"
) -> None:
    """Bulk-insert event documents into ``client[db][collection]``.

    Args:
        client: Connected MongoDB client.
        collection: Name of the target collection.
        data: Documents to insert; an empty list is a no-op.
        db: Name of the database holding the collection.
    """
    if not data:
        # insert_many() rejects an empty batch, so skip the call entirely.
        logger.info("No events to add.")
        return

    coll: Collection = client[db][collection]
    result = coll.insert_many(data)
    if result.acknowledged:
        logger.info(f"Added {len(result.inserted_ids)} events.")
    else:
        # An unacknowledged write is a problem worth surfacing above INFO.
        logger.warning("Could not add events to the database.")
def add_stations(
    client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main"
) -> None:
    """Bulk-insert station documents into ``client[db][collection]``.

    Args:
        client: Connected MongoDB client.
        collection: Name of the target collection.
        data: Documents to insert; an empty list is a no-op.
        db: Name of the database holding the collection.
    """
    if not data:
        # insert_many() rejects an empty batch, so skip the call entirely.
        logger.info("No stations to add.")
        return

    coll: Collection = client[db][collection]
    result = coll.insert_many(data)
    if result.acknowledged:
        # Messages previously said "events" (copied from add_events).
        logger.info(f"Added {len(result.inserted_ids)} stations.")
    else:
        logger.warning("Could not add stations to the database.")
def get_ids(collection: Collection) -> set[Any]:
    """Return the distinct values of the ``ID`` field in *collection* as a set."""
    distinct_ids = collection.distinct("ID")
    return set(distinct_ids)
def close(client: MongoClient) -> None:
    """Close *client* and log that the database connection was shut down."""
    client.close()
    logger.info("Closed the DB.")
def query_all(client: MongoClient, collection: str, db: str = "main") -> Any:
    """Fetch every document of ``client[db][collection]`` as a list.

    Args:
        client: Connected MongoDB client.
        collection: Name of the collection to read.
        db: Name of the database holding the collection.

    Returns:
        A list with all documents matched by an empty filter.
    """
    target: Collection = client[db][collection]
    return list(target.find({}))
def filter_query(
    client: MongoClient, collection: str, filter_by: dict[str, Any], db: str = "main"
) -> list[dict[str, Any]]:
    """Return the documents matching *filter_by*, ordered by ``DateTime``.

    Only the ``DateTime``, ``Magnitudes``, ``Depth`` and ``GAP`` fields are
    requested in the projection.

    Args:
        client: Connected MongoDB client.
        collection: Name of the collection to query.
        filter_by: MongoDB filter document passed to ``find``.
        db: Name of the database holding the collection.

    Returns:
        The matching documents sorted by ascending ``DateTime``; an empty
        list when nothing matches.
    """
    coll: Collection = client[db][collection]
    projection = {"DateTime": 1, "Magnitudes": 1, "Depth": 1, "GAP": 1}
    # (key, direction) form of sort(); 1 means ascending.
    cursor = coll.find(filter_by, projection).sort("DateTime", 1)
    # Always materialize: the cursor's private ``_empty`` flag is not a
    # public API and does not indicate whether the query matched anything.
    docs = list(cursor)
    logger.info(f"Retrieved {len(docs)} elements.")
    return docs
if __name__ == "__main__":
    # Manual smoke test against a local MongoDB instance: connect, read the
    # "quakes" collection, and always release the client afterwards.
    v = connect("mongodb://localhost:27017")
    try:
        query_all(v, "quakes")
    finally:
        # Close the client even if the query raises.
        close(v)