"""All of the endpoints related to the Nodes or simial DB structures"""
from __future__ import absolute_import
import json
from typing import Optional
import bson.objectid
from flask import g, request
import plynx.base.hub
import plynx.db.node_collection_manager
import plynx.db.run_cancellation_manager
import plynx.utils.plugin_manager
from plynx.constants import Collections, IAMPolicies, NodeClonePolicy, NodePostAction, NodePostStatus, NodeRunningStatus, NodeStatus, NodeVirtualCollection
from plynx.db.node import Node
from plynx.utils import node_utils
from plynx.utils.common import to_object_id
from plynx.utils.thumbnails import apply_thumbnails
from plynx.web.common import app, handle_errors, logger, make_fail_response, make_permission_denied, make_success_response, requires_auth
[docs]node_collection_managers = {
collection: plynx.db.node_collection_manager.NodeCollectionManager(collection=collection)
for collection in [Collections.TEMPLATES, Collections.RUNS]
}
[docs]run_cancellation_manager = plynx.db.run_cancellation_manager.RunCancellationManager()
[docs]operation_manager = plynx.utils.plugin_manager.get_operation_manager()
[docs]hub_manager = plynx.utils.plugin_manager.get_hub_manager()
[docs]workflow_manager = plynx.utils.plugin_manager.get_workflow_manager()
[docs]executor_manager = plynx.utils.plugin_manager.get_executor_manager()
[docs]PLUGINS_DICT = plynx.utils.plugin_manager.get_plugins_dict()
@app.route('/plynx/api/v0/search_<collection>', methods=['POST'])
@handle_errors
@requires_auth
[docs]def post_search_nodes(collection: str):
"""Create a search request in templates or runs"""
query = json.loads(request.data)
logger.debug(request.data)
query['user_id'] = to_object_id(g.user._id)
virtual_collection = query.pop('virtual_collection', None)
if len(query.keys() - PAGINATION_QUERY_KEYS):
return make_fail_response(f"Unknown keys: `{query.keys() - PAGINATION_QUERY_KEYS}`"), 400
if collection == 'in_hubs':
hub = query.pop('hub')
res = hub_manager.kind_to_hub_class[hub].search(plynx.base.hub.Query(**query))
else:
if virtual_collection == NodeVirtualCollection.OPERATIONS:
query['node_kinds'] = list(operation_manager.kind_to_operation_dict.keys())
elif virtual_collection == NodeVirtualCollection.WORKFLOWS:
query['node_kinds'] = list(workflow_manager.kind_to_workflow_dict.keys())
res = node_collection_managers[collection].get_db_objects(**query)
return make_success_response({
'items': res['list'],
'total_count': res['metadata'][0]['total'] if res['metadata'] else 0,
'plugins_dict': PLUGINS_DICT,
})
@app.route('/plynx/api/v0/<collection>/<node_link>', methods=['GET'])
@handle_errors
@requires_auth
[docs]def get_nodes(collection: str, node_link: Optional[str] = None):
"""Get the Node based on its ID or kind"""
# pylint: disable=too-many-locals,too-many-return-statements,too-many-branches
user_id = to_object_id(g.user._id)
can_view_others_operations = g.user.check_role(IAMPolicies.CAN_VIEW_OTHERS_OPERATIONS)
can_view_others_workflows = g.user.check_role(IAMPolicies.CAN_VIEW_OTHERS_WORKFLOWS)
can_view_operations = g.user.check_role(IAMPolicies.CAN_VIEW_OPERATIONS)
can_view_workflows = g.user.check_role(IAMPolicies.CAN_VIEW_WORKFLOWS)
can_create_operations = g.user.check_role(IAMPolicies.CAN_CREATE_OPERATIONS)
can_create_workflows = g.user.check_role(IAMPolicies.CAN_CREATE_WORKFLOWS)
if node_link in executor_manager.kind_to_executor_class and collection == Collections.TEMPLATES:
# if node_link is a base node
# i.e. /templates/basic-bash
kind = node_link
if kind in workflow_manager.kind_to_workflow_dict and (not can_view_workflows or not can_create_workflows):
return make_permission_denied()
if kind in operation_manager.kind_to_operation_dict and (not can_view_operations or not can_create_operations):
return make_permission_denied()
node: Node = executor_manager.kind_to_executor_class[kind].get_default_node(
is_workflow=kind in workflow_manager.kind_to_workflow_dict
)
if isinstance(node, tuple):
data = node[0].to_dict()
tour_steps = node[1]
else:
data = node.to_dict()
tour_steps = []
node.author = user_id
node.save()
data['kind'] = kind
return make_success_response({
'node': data,
'tour_steps': tour_steps,
'plugins_dict': PLUGINS_DICT,
})
else:
# when node_link is an id of the object
try:
node_id = to_object_id(node_link)
except bson.objectid.InvalidId: # type: ignore
return make_fail_response('Invalid ID'), 404
node_dict = node_collection_managers[collection].get_db_node(node_id, user_id)
if not node_dict:
return make_fail_response(f"Node `{node_link}` was not found"), 404
node = Node.from_dict(node_dict)
is_owner = node.author == user_id
if node.kind in workflow_manager.kind_to_workflow_dict and not can_view_workflows:
return make_permission_denied()
if node.kind in operation_manager.kind_to_operation_dict and not can_view_operations:
return make_permission_denied()
if node.kind in workflow_manager.kind_to_workflow_dict and not can_view_others_workflows and not is_owner:
return make_permission_denied()
if node.kind in operation_manager.kind_to_operation_dict and not can_view_others_operations and not is_owner:
return make_permission_denied()
latest_run_id = node.latest_run_id
last_run_is_in_finished_status = None
if collection == Collections.TEMPLATES and latest_run_id:
node_in_run_dict = node_collection_managers[Collections.RUNS].get_db_node(latest_run_id, user_id)
if node_in_run_dict:
node_in_run = Node.from_dict(node_in_run_dict)
node_utils.augment_node_with_cache(node, node_in_run)
last_run_is_in_finished_status = NodeRunningStatus.is_finished(node_in_run.node_running_status)
else:
logger.warning(f"Failed to load a run with id `{latest_run_id}`")
apply_thumbnails(node)
return make_success_response({
"node": node.to_dict(),
"plugins_dict": PLUGINS_DICT,
"last_run_is_in_finished_status": last_run_is_in_finished_status,
})
@app.route('/plynx/api/v0/<collection>', methods=['POST'])
@handle_errors
@requires_auth
[docs]def post_node(collection: str):
"""Post a Node with an action"""
# TODO: fix disables
# pylint: disable=too-many-return-statements,too-many-branches,too-many-statements,too-many-locals
logger.debug(request.data)
data = json.loads(request.data)
node: Node = Node.from_dict(data['node'])
node.starred = False
action = data['action']
user_id = g.user._id
db_node = node_collection_managers[collection].get_db_node(node._id, user_id)
if db_node:
if not node.author:
node.author = db_node['author']
if node.author != db_node['author']:
raise Exception("Author of the node does not match the one in the database")
is_author = db_node['author'] == g.user._id
else:
# assign the author
node.author = g.user._id
is_author = True
is_admin = g.user.check_role(IAMPolicies.IS_ADMIN)
is_workflow = node.kind in workflow_manager.kind_to_workflow_dict
can_create_operations = g.user.check_role(IAMPolicies.CAN_CREATE_OPERATIONS)
can_create_workflows = g.user.check_role(IAMPolicies.CAN_CREATE_WORKFLOWS)
can_modify_others_workflows = g.user.check_role(IAMPolicies.CAN_MODIFY_OTHERS_WORKFLOWS)
can_run_workflows = g.user.check_role(IAMPolicies.CAN_RUN_WORKFLOWS)
executor = executor_manager.kind_to_executor_class[node.kind](node)
logger.info(f"User `{user_id}`(is_admin {is_admin}) is trying to {action} node `{node._id}`")
if action == NodePostAction.SAVE:
if (is_workflow and not can_create_workflows) or (not is_workflow and not can_create_operations):
return make_permission_denied('You do not have permission to save this object')
if node.node_status != NodeStatus.CREATED:
return make_fail_response(f"Cannot save node with status `{node.node_status}`")
if not (is_author or is_admin or (is_workflow and can_modify_others_workflows)):
return make_permission_denied('Only the owners or users with CAN_MODIFY_OTHERS_WORKFLOWS role can save it')
if node.auto_run and is_workflow:
node_in_run, new_node_in_run = node_utils.construct_new_run(node, user_id)
node_utils.remove_auto_run_disabled(new_node_in_run)
need_to_run = False
if node_in_run is None:
need_to_run = True
else:
for new_subnode, subnode in node_utils.traverse_left_join(new_node_in_run, node_in_run):
if subnode is None:
need_to_run = True
logger.info("Auto run because subnode not found")
break
if not node_utils.node_inputs_and_params_are_identical(new_subnode, subnode):
need_to_run = True
logger.info("Auto run because inputs or params are not identical")
break
logger.info(f"Auto run: {need_to_run}")
if need_to_run:
# TODO check permissions
executor._update_node(new_node_in_run)
validation_error = executor.validate()
if validation_error:
logger.info("Validation failed. Won't start the run")
else:
executor.launch()
if node.latest_run_id:
run_cancellation_manager.cancel_run(node.latest_run_id)
node.latest_run_id = new_node_in_run._id
node.save(force=True)
elif action == NodePostAction.APPROVE:
if is_workflow:
return make_fail_response('Invalid action for a workflow'), 400
if node.node_status != NodeStatus.CREATED:
return make_fail_response(f"Node status `{NodeStatus.CREATED}` expected. Found `{node.node_status}`")
validation_error = executor.validate()
if validation_error:
return make_success_response({
'status': NodePostStatus.VALIDATION_FAILED,
'message': 'Node validation failed',
'validation_error': validation_error.to_dict()
})
node.node_status = NodeStatus.READY
if is_author or is_admin:
node.save(force=True)
else:
return make_permission_denied()
elif action == NodePostAction.CREATE_RUN:
if not is_workflow:
return make_fail_response('Invalid action for an operation'), 400
if node.node_status != NodeStatus.CREATED:
return make_fail_response(f"Node status `{NodeStatus.CREATED}` expected. Found `{node.node_status}`")
validation_error = executor.validate()
if validation_error:
return make_success_response({
'status': NodePostStatus.VALIDATION_FAILED,
'message': 'Node validation failed',
'validation_error': validation_error.to_dict()
})
_, new_node_in_run = node_utils.construct_new_run(node, user_id)
new_node_in_run.author = g.user._id
if is_admin or can_run_workflows:
executor._update_node(new_node_in_run)
executor.launch()
else:
return make_permission_denied('You do not have CAN_RUN_WORKFLOWS role')
node.latest_run_id = new_node_in_run._id
node.save(force=True)
return make_success_response({
'status': NodePostStatus.SUCCESS,
'message': f"Run(_id=`{new_node_in_run._id}`) successfully created",
'run_id': str(new_node_in_run._id),
'url': f"/{Collections.RUNS}/{new_node_in_run._id}",
})
elif action == NodePostAction.CREATE_RUN_FROM_SCRATCH:
if not is_workflow:
return make_fail_response('Invalid action for an operation'), 400
if node.node_status != NodeStatus.CREATED:
return make_fail_response(f"Node status `{NodeStatus.CREATED}` expected. Found `{node.node_status}`")
validation_error = executor.validate()
if validation_error:
return make_success_response({
'status': NodePostStatus.VALIDATION_FAILED,
'message': 'Node validation failed',
'validation_error': validation_error.to_dict()
})
node_in_run = node.clone(NodeClonePolicy.NODE_TO_RUN)
node_in_run.author = g.user._id
assert db_node, "db_node should be present"
db_node_obj = Node.from_dict(db_node)
db_node_obj.latest_run_id = node_in_run._id
db_node_obj.save(force=True)
if is_admin or can_run_workflows:
executor._update_node(node_in_run)
executor.launch()
else:
return make_permission_denied('You do not have CAN_RUN_WORKFLOWS role')
return make_success_response({
'status': NodePostStatus.SUCCESS,
'message': f"Run(_id=`{node_in_run._id}`) successfully created",
'run_id': str(node_in_run._id),
'url': f"/{Collections.RUNS}/{node_in_run._id}",
})
elif action == NodePostAction.CLONE:
if (is_workflow and not can_create_workflows) or (not is_workflow and not can_create_operations):
return make_permission_denied('You do not have the role to create an object')
node_clone_policy: int
if collection == Collections.TEMPLATES:
node_clone_policy = NodeClonePolicy.NODE_TO_NODE
elif collection == Collections.RUNS:
node_clone_policy = NodeClonePolicy.RUN_TO_NODE
else:
raise ValueError(f"Unknown or unexpeted collection `{collection}`")
node = node.clone(node_clone_policy)
node.save(collection=Collections.TEMPLATES)
return make_success_response({
'message': f"Node(_id=`{node._id}`) successfully created",
'node_id': str(node._id),
'url': f"/{Collections.TEMPLATES}/{node._id}",
})
elif action == NodePostAction.VALIDATE:
validation_error = executor.validate()
if validation_error:
return make_success_response({
'status': NodePostStatus.VALIDATION_FAILED,
'message': 'Node validation failed',
'validation_error': validation_error.to_dict()
})
elif action == NodePostAction.DEPRECATE:
if node.node_status == NodeStatus.CREATED:
return make_fail_response(f"Node status `{node.node_status}` not expected.")
node.node_status = NodeStatus.DEPRECATED
if is_author or is_admin:
node.save(force=True)
else:
return make_permission_denied('You are not an author to deprecate it')
elif action == NodePostAction.MANDATORY_DEPRECATE:
if node.node_status == NodeStatus.CREATED:
return make_fail_response(f"Node status `{node.node_status}` not expected.")
node.node_status = NodeStatus.MANDATORY_DEPRECATED
if is_author or is_admin:
node.save(force=True)
else:
return make_permission_denied('You are not an author to deprecate it')
elif action == NodePostAction.PREVIEW_CMD:
return make_success_response({
'message': 'Successfully created preview',
'preview_text': executor.run(preview=True)
})
elif action == NodePostAction.REARRANGE_NODES:
node_utils.arrange_auto_layout(node)
return make_success_response({
'message': 'Successfully created preview',
'node': node.to_dict(),
})
elif action == NodePostAction.UPGRADE_NODES:
upd = node_collection_managers[collection].upgrade_sub_nodes(node)
return make_success_response({
'message': 'Successfully updated nodes',
'node': node.to_dict(),
'upgraded_nodes_count': upd,
})
elif action == NodePostAction.CANCEL:
if is_author or is_admin:
run_cancellation_manager.cancel_run(node._id)
else:
return make_permission_denied('You are not an author to cancel the run')
elif action == NodePostAction.GENERATE_CODE:
raise NotImplementedError()
else:
return make_fail_response(f"Unknown action `{action}`")
return make_success_response({
'message': f"Node(_id=`{node._id}`) successfully updated"
})