import logging from typing import Any from pymongo import MongoClient from pymongo.collection import Collection from pymongo.cursor import Cursor from pymongo.errors import ConnectionFailure try: from utilsv2.log import logger except ModuleNotFoundError: from log import logger logger = logging.getLogger(__name__) def connect(uri) -> MongoClient: try: client = MongoClient(uri) logger.info("Connected to the DB") except ConnectionFailure as e: logger.critical("Could not connect to the MongoDB") raise e return client def add_events( client: MongoClient, collection: str, data: list[dict[str, Any]] ) -> None: db = client["main"] coll = db[collection] _res = coll.insert_many(data) if _res.acknowledged: logger.info(f"Added {len(_res.inserted_ids)} events.") else: logger.info("Could not add events to the database.") def add_stations( client: MongoClient, collection: str, data: list[dict[str, Any]] ) -> None: db = client["main"] coll = db[collection] _res = coll.insert_many(data) if _res.acknowledged: logger.info(f"Added {len(_res.inserted_ids)} events.") else: logger.info("Could not add events to the database.") def get_ids(collection: Collection) -> set[Any]: return set(collection.distinct("ID")) def close(client: MongoClient) -> None: client.close() logger.info("Closed the DB.") def query_all(cli: MongoClient, collection: str) -> Any: coll: Collection = cli.main[collection] result = coll.find({}) for doc in result: print(doc) def filter_query(cli: MongoClient, collection: str, filter_by): coll: Collection = cli.main[collection] res = coll.find({""}) if __name__ == "__main__": v = connect("mongodb://localhost:27017") query_all(v, "quakes") close(v)