Source code for plynx.db.node_cache_manager

"""Cache Manager and utils."""
import datetime
import logging
from typing import Any, Dict, List, Optional, Union

from plynx.constants import NodeRunningStatus
from plynx.db.node import Node
from plynx.db.node_cache import NodeCache
from plynx.utils.common import ObjectId
from plynx.utils.db_connector import get_db_connector


[docs]class NodeCacheManager: """The Node cache interface. The cache is defined by Node's - original_node_id - inputs - parameters """ @staticmethod
[docs] def get(node: Node) -> Optional[NodeCache]: """Pull NodeCache if exists. Args: node (Node): Node object Return: (NodeCache) NodeCache or None """ key = NodeCache.generate_key(node) db_node_cache = get_db_connector().node_cache.find({ 'key': key, 'removed': {'$ne': True} }).sort('insertion_date', -1).limit(1) caches = list(db_node_cache) if len(caches) > 0: return NodeCache.from_dict(caches[0]) else: return None
@staticmethod
[docs] def post(node: Node, run_id: ObjectId) -> bool: """Create NodeCache instance in the database. Args: node (Node): Node object run_id (ObjectId, str): Run ID Return: True if cache saved else False """ assert node.node_running_status == NodeRunningStatus.SUCCESS, \ 'Only Nodes with status SUCCESS can be cached' node_cache = NodeCache.instantiate(node=node, run_id=run_id) try: node_cache.save() except Exception as e: # pylint: disable=broad-except logging.error(f"Could not save cache: `{e}`") return False return True
@staticmethod
[docs] def _make_query( start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, non_protected_only: bool = False, ) -> Dict[str, Any]: """Make sample query. Args: start_datetime (datetime, None): Start datetime or None if selecting from beginning end_datetime (datetime, None): End datetime or None if selecting until now Return: Iterator on the list of dict-like objects """ and_query: List[Dict[str, Dict[str, Union[bool, datetime.datetime]]]] = [] insertion_query: Dict[str, Union[bool, datetime.datetime]] = {} if start_datetime: insertion_query['$gte'] = start_datetime if end_datetime: insertion_query['$lt'] = end_datetime if insertion_query: and_query.append({'insertion_date': insertion_query}) if non_protected_only: and_query.append({'protected': {'$ne': True}}) return {'$and': and_query} if and_query else {}
@staticmethod
[docs] def get_list( start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, non_protected_only: bool = False ): """List of NodeCache objects. Args: start_datetime (datetime, None): Start datetime or None if selecting from beginning end_datetime (datetime, None): End datetime or None if selecting until now Return: Iterator on the list of dict-like objects """ return get_db_connector().node_cache.find(NodeCacheManager._make_query(start_datetime, end_datetime, non_protected_only))
@staticmethod
[docs] def clean_up(): """Remove NodeCache objects with flag `removed` set """ return get_db_connector().node_cache.remove({'removed': True})