93 lines
2.2 KiB
Python
93 lines
2.2 KiB
Python
import logging
|
|
from typing import Any
|
|
|
|
from pymongo import MongoClient
|
|
from pymongo.collection import Collection
|
|
from pymongo.cursor import Cursor
|
|
from pymongo.errors import ConnectionFailure
|
|
|
|
try:
|
|
from utilsv2.log import logger
|
|
except ModuleNotFoundError:
|
|
from log import logger
|
|
|
|
# NOTE(review): this rebinds `logger`, replacing the object imported from
# `utilsv2.log` / `log` by the try/except above, so every function in this
# module logs through the standard-library logger named after the module.
# Confirm which of the two loggers is actually intended.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def connect(uri: str) -> MongoClient:
    """Create a MongoDB client for ``uri`` and verify the server answers.

    Args:
        uri: MongoDB connection string.

    Returns:
        A client whose server responded to a ``ping`` command.

    Raises:
        ConnectionFailure: If the server cannot be reached. The failure is
            logged at critical level and the half-open client is closed
            before the exception propagates.
    """
    client = None
    try:
        client = MongoClient(uri)
        # Constructing the client does not wait for a connection, so issue a
        # cheap command to force a round trip; otherwise an unreachable
        # server would only be noticed on the first real operation.
        client.admin.command("ping")
    except ConnectionFailure:
        logger.critical("Could not connect to the MongoDB")
        if client is not None:
            # Release the client's background resources before re-raising.
            client.close()
        raise

    logger.info("Connected to the DB")
    return client
|
|
|
|
|
|
def add_events(
    client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main"
) -> None:
    """Insert event documents into ``collection`` of database ``db``.

    Args:
        client: Connected MongoDB client.
        collection: Name of the target collection.
        data: Documents to insert. An empty list is a no-op.
        db: Name of the database holding the collection.
    """
    if not data:
        # insert_many rejects an empty document list, so skip the call
        # instead of letting a "nothing to do" batch raise.
        logger.info("No events to add.")
        return

    coll: Collection = client[db][collection]
    result = coll.insert_many(data)

    if result.acknowledged:
        logger.info("Added %d events.", len(result.inserted_ids))
    else:
        # An unacknowledged write is a problem worth surfacing above info.
        logger.warning("Could not add events to the database.")
|
|
|
|
|
|
def add_stations(
    client: MongoClient, collection: str, data: list[dict[str, Any]], db: str = "main"
) -> None:
    """Insert station documents into ``collection`` of database ``db``.

    Args:
        client: Connected MongoDB client.
        collection: Name of the target collection.
        data: Documents to insert. An empty list is a no-op.
        db: Name of the database holding the collection.
    """
    if not data:
        # insert_many rejects an empty document list, so skip the call
        # instead of letting a "nothing to do" batch raise.
        logger.info("No stations to add.")
        return

    coll: Collection = client[db][collection]
    result = coll.insert_many(data)

    # The log messages name stations (not events) so that the two insert
    # helpers can be told apart in the log output.
    if result.acknowledged:
        logger.info("Added %d stations.", len(result.inserted_ids))
    else:
        # An unacknowledged write is a problem worth surfacing above info.
        logger.warning("Could not add stations to the database.")
|
|
|
|
|
|
def get_ids(collection: Collection) -> set[Any]:
    """Return the distinct values of the ``ID`` field in ``collection``.

    Args:
        collection: Collection to query.

    Returns:
        A set built from the result of ``collection.distinct("ID")``.
    """
    distinct_ids = collection.distinct("ID")
    return set(distinct_ids)
|
|
|
|
|
|
def close(client: MongoClient) -> None:
    """Close ``client`` and log that the database connection was shut down.

    Args:
        client: The MongoDB client to close.
    """
    client.close()
    logger.info("Closed the DB.")
|
|
|
|
|
|
def query_all(client: MongoClient, collection: str, db: str = "main") -> Any:
    """Return every document of ``collection`` in database ``db`` as a list.

    Args:
        client: Connected MongoDB client.
        collection: Name of the collection to read.
        db: Name of the database holding the collection.

    Returns:
        A list holding all documents produced by an unfiltered ``find``.
    """
    # An empty filter matches all documents; list() drains the cursor.
    return list(client[db][collection].find({}))
|
|
|
|
|
|
def filter_query(
    client: MongoClient, collection: str, filter_by: dict[str, Any], db: str = "main"
) -> list[dict[str, Any]]:
    """Return the documents matching ``filter_by``, sorted by ``DateTime``.

    The query projects the ``DateTime``, ``Magnitudes``, ``Depth`` and
    ``GAP`` fields and sorts ascending on ``DateTime``.

    Args:
        client: Connected MongoDB client.
        collection: Name of the collection to query.
        filter_by: MongoDB filter document.
        db: Name of the database holding the collection.

    Returns:
        The matching documents as a list; empty when nothing matches.
    """
    coll: Collection = client[db][collection]
    projection = {"DateTime": 1, "Magnitudes": 1, "Depth": 1, "GAP": 1}

    # (key, direction) is the form of sort() accepted by every PyMongo
    # release; the mapping form is only understood by newer ones.
    cursor = coll.find(filter_by, projection).sort("DateTime", 1)

    # Always materialize the cursor so callers get one return type, without
    # relying on the cursor's private `_empty` attribute.
    docs = list(cursor)
    logger.info(f"Retrieved {len(docs)} elements.")
    return docs
|
|
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run against a MongoDB instance on localhost.
    client = connect("mongodb://localhost:27017")
    try:
        query_all(client, "quakes")
    finally:
        # Close the client even when the query raises.
        close(client)
|