"""Templates for PLynx Executors and utils."""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional, Union
from plynx.constants import PRIMITIVE_TYPES, NodeStatus, SpecialNodeId, ValidationCode, ValidationTargetType
from plynx.db.node import Node, NodeRunningStatus, Parameter, ParameterListOfNodes, ParameterTypes
from plynx.db.validation_error import ValidationError
@dataclass
class RunningStatus:
    """Async job running status.

    A small value object returned by `BaseExecutor.launch()` and
    `BaseExecutor.get_running_status()` to report the state of an execution.
    """

    # Running status of the node, stored as a string.
    node_running_status: str
class BaseExecutor(ABC):
    """Base Executor class.

    An executor wraps a single `Node` and defines how that node is run,
    launched, monitored, killed and validated. Concrete executors must
    implement `run()`, `launch()` and `kill()`.

    Attributes:
        IS_GRAPH: When true, `get_default_node()` adds a `_nodes` parameter
            holding a list of sub-nodes to the generated node.
        node: The node this executor operates on; may be None until assigned.
    """

    # Default for executors that do not declare the flag themselves, so that
    # `get_default_node()` does not fail with AttributeError on them.
    IS_GRAPH: bool = False

    def __init__(self, node: Optional[Node] = None):
        self.node = node

    def _update_node(self, node):
        """Replace the node this executor operates on."""
        self.node = node

    @abstractmethod
    def run(self, preview: bool = False) -> str:
        """Main execution function.

        - Workdir has been initialized.
        - Inputs are not preprocessed.
        - Outputs should be manually postprocessed.
        - It is OK to raise an exception in this function.

        Returns:
            str: a `NodeRunningStatus` value describing the outcome.
        """

    @abstractmethod
    def launch(self) -> RunningStatus:
        """Launch the Node on the backend.

        The difference between `launch()` and `run()` is that:
        - `run()` assumes synchronous execution.
        - `launch()` does not necessarily have this assumption. Use
          `get_running_status()` to get the status of the execution.

        Returns:
            RunningStatus
        """
        raise NotImplementedError()

    def get_running_status(self) -> RunningStatus:
        """Returns the status of the execution.

        Async executions should sync with the remote and return the result immediately.

        Returns:
            RunningStatus: wraps the current `node_running_status` of the node.
        """
        assert self.node, "Node must be difened by this stage"
        return RunningStatus(self.node.node_running_status)

    @abstractmethod
    def kill(self):
        """Force to kill the process.

        The reason can be the fact it was working too long or parent executor canceled it.
        """

    def is_updated(self) -> bool:
        """Function that is regularly called by a Worker.

        The function is running in a separate thread and does not block execution of `run()`.

        Returns:
            (bool): True if worker needs to update DB else False
        """
        return True

    @classmethod
    def get_default_node(cls, is_workflow: bool) -> Node:
        """Generate a new default Node for this executor.

        Args:
            is_workflow: When false and the executor is a graph, special
                `Input` and `Output` nodes are added to the `_nodes` list.

        Returns:
            Node: a fresh node; for graph executors it carries a `_nodes`
            parameter.
        """
        node = Node()
        if cls.IS_GRAPH:
            nodes_parameter = Parameter(
                name='_nodes',
                parameter_type=ParameterTypes.LIST_NODE,
                value=ParameterListOfNodes(),
                mutable_type=False,
                publicable=False,
                removable=False,
            )
            if not is_workflow:
                # need to add inputs and outputs
                nodes_parameter.value.value.extend(
                    [
                        Node(
                            _id=SpecialNodeId.INPUT,
                            title='Input',
                            kind='dummy',
                            node_running_status=NodeRunningStatus.SPECIAL,
                            node_status=NodeStatus.READY,
                        ),
                        Node(
                            _id=SpecialNodeId.OUTPUT,
                            title='Output',
                            kind='dummy',
                            node_running_status=NodeRunningStatus.SPECIAL,
                            node_status=NodeStatus.READY,
                        ),
                    ]
                )
            node.parameters.extend([
                nodes_parameter,
            ])
        return node

    def init_executor(self):
        """Initialize environment for the executor. No-op by default."""

    def clean_up_executor(self):
        """Clean up the environment created by executor. No-op by default."""

    def validate(self, ignore_inputs: bool = True) -> Union[ValidationError, None]:
        """Validate Node.

        Args:
            ignore_inputs: When true, the input-reference checks are skipped.

        Return:
            (ValidationError) Validation error if found; else None. The
            individual violations are attached as `children` of the result.
        """
        assert self.node, "Attribute `node` is not assigned"
        violations = []

        if self.node.title == '':
            violations.append(
                ValidationError(
                    target=ValidationTargetType.PROPERTY,
                    object_id='title',
                    validation_code=ValidationCode.MISSING_PARAMETER
                ))

        # Meaning the node is in the graph. Otherwise shouldn't be in the validation step
        if not ignore_inputs:
            for input_item in self.node.inputs:
                # Non-array inputs need exactly one reference to count as provided.
                min_count = input_item.min_count if input_item.is_array else 1
                if len(input_item.input_references) < min_count and input_item.file_type not in PRIMITIVE_TYPES:
                    violations.append(
                        ValidationError(
                            target=ValidationTargetType.INPUT,
                            object_id=input_item.name,
                            validation_code=ValidationCode.MISSING_INPUT
                        ))

            # NOTE(review): nesting was reconstructed from a de-indented
            # listing; confirm this check belongs under `not ignore_inputs`.
            if self.node.node_status == NodeStatus.MANDATORY_DEPRECATED:
                violations.append(
                    ValidationError(
                        target=ValidationTargetType.NODE,
                        object_id=str(self.node._id),
                        validation_code=ValidationCode.DEPRECATED_NODE
                    ))

        if not violations:
            return None

        return ValidationError(
            target=ValidationTargetType.NODE,
            object_id=str(self.node._id),
            validation_code=ValidationCode.IN_DEPENDENTS,
            children=violations
        )
class Dummy(BaseExecutor):
    """Dummy Executor. Used for static Operations.

    Every operation of this executor raises `NotImplementedError`.
    """

    def run(self, preview=False) -> str:
        """Not Implemented"""
        raise NotImplementedError()

    def launch(self):
        """Not Implemented"""
        # Overrides the abstract `BaseExecutor.launch`; without this override
        # the class stays abstract and cannot be instantiated.
        raise NotImplementedError()

    def status(self):
        """Not Implemented"""
        raise NotImplementedError()

    def kill(self):
        """Not Implemented"""
        raise NotImplementedError()

    @classmethod
    def get_default_node(cls, is_workflow: bool) -> Node:
        """Not Implemented"""
        raise NotImplementedError()