import logging from typing import Any from pymongo import MongoClient from pymongo.collection import Collection from pymongo.errors import ConnectionFailure try: from utilsv2.log import logger except ModuleNotFoundError: from log import logger logger = logging.getLogger(__name__) def connect(uri: str) -> MongoClient: try: client = MongoClient(uri) logger.info("Connected to the DB") except ConnectionFailure as e: logger.critical("Could not connect to the MongoDB") raise e return client def add_events( client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main" ) -> None: coll: Collection = client[db][collection] _res = coll.insert_many(data) if _res.acknowledged: logger.info(f"Added {len(_res.inserted_ids)} events to {db}.{collection}") else: logger.info("Could not add events to the database.") def add_stations( client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main" ) -> None: coll: Collection = client[db][collection] _res = coll.insert_many(data) if _res.acknowledged: logger.info(f"Added {len(_res.inserted_ids)} stations to {db}.{collection}") else: logger.info("Could not add events to the database.") def get_ids(collection: Collection) -> set[Any]: return set(collection.distinct("ID")) def close(client: MongoClient) -> None: client.close() logger.info("Closed the DB.") def query_all(client: MongoClient, collection: str, db: str = "main") -> Any: coll: Collection = client[db][collection] result = coll.find({}) return list(result) def filter_query( client: MongoClient, collection: str, filter_by: dict[str, Any], db: str = "main" ): coll: Collection = client[db][collection] res = coll.find( filter_by, {"DateTime": 1, "Magnitudes": 1, "Depth": 1, "GAP": 1} ).sort({"DateTime": 1}) if not res._empty: res = list(res) logger.info(f"Retrieved {len(res)} elements.") return res if __name__ == "__main__": v = connect("mongodb://localhost:27017") query_all(v, "quakes") close(v)