Source code for plynx.plugins.executors.python.local

"""Python Operation"""
import contextlib
import inspect
import os
import pydoc
import sys
import threading
import uuid
from typing import Any, Callable, Dict

import plynx.plugins.executors.bases
import plynx.plugins.executors.local
import plynx.utils.plugin_manager
from plynx.constants import PRIMITIVE_TYPES, NodeRunningStatus, ParameterTypes
from plynx.db.node import Input, Node, Output, Parameter, ParameterCode

# Default source code placed in the `_cmd` code parameter of a new Python Operation.
# It has to stay executable Python: the executor runs the `_cmd` source and picks up
# the top-level name `operation` from it, so the line breaks below are significant.
DEFAULT_CMD = """# Python Operation
# The code of the operation have to be defined as a function
#
# A function must be named `operation`.
# The arguments are named and must represent then inputs of the PLynx Operation.
# The function must return a dictionary that map to the outputs of PLynx Operation
def operation(int_a, int_b):
    return {"sum": int_a + int_b}
"""
# Guards the check-then-create step when a stateful operation class is instantiated.
stateful_init_mutex = threading.Lock()

# Instances of stateful operation classes, keyed by the owning node's `code_hash`.
stateful_class_registry: Dict[Any, Any] = {}

# Resource manager obtained once at import time; it is used to look up the resource
# class registered for an input's or output's `file_type`.
_resource_manager = plynx.utils.plugin_manager.get_resource_manager()
def materialize_fn_or_cls(node: Node) -> Callable:
    """Resolve the Python callable (function or class) that implements ``node``.

    The callable comes from exactly one of two places:

    * ``node.code_function_location`` -- a dotted path resolved with ``pydoc.locate``.
    * the ``_cmd`` code parameter -- source text that is executed and must bind a
      top-level name ``operation``.

    Args:
        node: Node to materialize.

    Returns:
        The located callable, or the object bound to ``operation`` by the ``_cmd`` source.

    Raises:
        AssertionError: If both sources are set, or the located object is not callable
            (only when assertions are enabled).
        KeyError: If the ``_cmd`` source does not define ``operation``.
        ValueError: If neither source is set.
    """
    code_function_location = node.code_function_location
    code_parameter = node.get_parameter_by_name_safe("_cmd")
    assert not (code_function_location and code_parameter), "`code_function_location` and `_cmd` cannot be both non-null"

    if code_function_location:
        func = pydoc.locate(code_function_location)
        assert callable(func), f"The function or class `{code_function_location}` is not callable"
        return func

    if code_parameter:
        code = code_parameter.value.value
        # Execute the source in ONE namespace. With separate globals/locals dicts the
        # names bound at the top level of the source (imports, constants, helper
        # functions) land in the locals dict, while `operation` resolves free names
        # through the globals dict -- so it could not see them when called.
        # The namespace starts as a copy of this module's globals so that names that
        # were reachable before stay reachable, and the module itself is not modified.
        namespace: Dict[str, Any] = dict(globals())
        # NOTE: this runs arbitrary source taken from the node definition.
        exec(code, namespace)  # pylint: disable=W0122
        return namespace["operation"]

    raise ValueError("No function to materialize")
def assign_outputs(node: Node, output_dict: Dict[str, Any]):
    """Store the values returned by an operation on the matching outputs of ``node``.

    Every key of ``output_dict`` names an output of the node. The value is passed
    through the ``postprocess_output`` hook of the resource class registered for that
    output's ``file_type``: element by element for array outputs, as a single value
    otherwise. An empty or ``None`` ``output_dict`` leaves the node untouched.
    """
    if not output_dict:
        return

    for output_name, raw_value in output_dict.items():
        target = node.get_output_by_name(output_name)
        resource_cls = _resource_manager.kind_to_resource_class[target.file_type]
        postprocess = resource_cls.postprocess_output
        if target.is_array:
            target.values = [postprocess(item) for item in raw_value]
        else:
            target.values = [postprocess(raw_value)]
class redirect_to_plynx_logs:  # pylint: disable=invalid-name
    """Context manager that captures stdout and stderr into logs of a PLynx node.

    While the context is active, ``sys.stdout`` and ``sys.stderr`` are redirected into
    two local files with random (uuid4) names. On exit, each non-empty file is written
    through ``plynx.utils.file_handler`` and its name becomes the single value of the
    matching node log; empty files are deleted.

    Args:
        node: Node whose logs receive the captured output.
        stdout: Name of the node log that receives the captured stdout.
        stderr: Name of the node log that receives the captured stderr.
    """
    # pylint: disable=too-many-instance-attributes

    def __init__(self, node: Node, stdout: str, stderr: str):
        self.stdout_filename = str(uuid.uuid4())
        self.stderr_filename = str(uuid.uuid4())

        self.stdout = stdout
        self.stderr = stderr
        self.node = node

        # (local file name, node log name) pairs processed on exit.
        self.names_map = [
            (self.stdout_filename, self.stdout),
            (self.stderr_filename, self.stderr),
        ]

        self.stdout_file = None
        self.stderr_file = None

        self.stdout_redirect = None
        self.stderr_redirect = None

        # Owns the open files and active redirections between __enter__ and __exit__.
        self._exit_stack = None

    def __enter__(self):
        """Open the capture files and start redirecting; returns ``self``."""
        # Acquire everything through an ExitStack: if any step below raises, the
        # steps already done (opened file, active redirect) are rolled back instead
        # of being leaked. On success the stack is detached with pop_all() and kept
        # for __exit__.
        with contextlib.ExitStack() as stack:
            self.stdout_file = stack.enter_context(open(self.stdout_filename, 'w'))
            self.stderr_file = stack.enter_context(open(self.stderr_filename, 'w'))

            self.stdout_redirect = contextlib.redirect_stdout(self.stdout_file)
            self.stderr_redirect = contextlib.redirect_stderr(self.stderr_file)

            stack.enter_context(self.stdout_redirect)
            stack.enter_context(self.stderr_redirect)

            self._exit_stack = stack.pop_all()
        return self

    def __exit__(self, *args):
        """Stop redirecting, then publish non-empty captures and delete empty ones.

        Returns ``None``, so an exception raised inside the ``with`` body propagates.
        """
        # Restore sys.stdout/sys.stderr and close both files first, so that the
        # content is flushed before the file sizes are checked.
        cleanup, self._exit_stack = self._exit_stack, None
        cleanup.close()

        for filename, logs_name in self.names_map:
            if os.stat(filename).st_size > 0:
                with plynx.utils.file_handler.open(filename, "w") as f:
                    with open(filename) as fi:
                        f.write(fi.read())
                # NOTE(review): the local copy stays in the working directory after it
                # has been written through file_handler -- confirm whether it should
                # be removed here.
                output = self.node.get_log_by_name(name=logs_name)
                output.values = [filename]
            else:
                os.remove(filename)
def prep_args(node: Node) -> Dict[str, Any]:
    """Build the keyword arguments for the operation callable from ``node``.

    Inputs are converted with the ``preprocess_input`` hook of the resource class
    registered for their ``file_type``:

    * array inputs become a list with every value converted;
    * non-array inputs with exactly one value become that converted value;
    * non-array inputs with no value fall back to ``primitive_override`` when their
      ``file_type`` is one of ``PRIMITIVE_TYPES``.

    Parameters that have a widget are then added via
    ``plynx.plugins.executors.local.prepare_parameters_for_python``.

    Raises:
        ValueError: If a non-array input matches none of the cases above.
    """
    kwargs = {}
    for node_input in node.inputs:
        preprocess = _resource_manager.kind_to_resource_class[node_input.file_type].preprocess_input

        if node_input.is_array:
            kwargs[node_input.name] = [preprocess(value) for value in node_input.values]
            continue

        value_count = len(node_input.values)
        if value_count == 1:
            kwargs[node_input.name] = preprocess(node_input.values[0])
        elif value_count == 0 and node_input.file_type in PRIMITIVE_TYPES:
            kwargs[node_input.name] = preprocess(node_input.primitive_override)
        else:
            raise ValueError(f"Input {node_input.name} is not array but has multiple values")

    # TODO smater way to determine what parameters to pass
    visible_parameters = [param for param in node.parameters if param.widget is not None]
    parameter_kwargs = plynx.plugins.executors.local.prepare_parameters_for_python(visible_parameters)
    kwargs.update(parameter_kwargs)
    return kwargs
class PythonNode(plynx.plugins.executors.bases.PLynxSyncExecutor):
    """Local Python executor: materializes the node's callable and calls it directly."""

    def run(self, preview: bool = False) -> str:
        """Execute the node and return ``NodeRunningStatus.SUCCESS``.

        If the materialized object is a class, one instance per ``code_hash`` is
        created (under ``stateful_init_mutex``) and reused; the instance is what gets
        called. Output printed during construction goes to the ``init_stdout`` /
        ``init_stderr`` logs, output printed during the call to ``stdout`` / ``stderr``.
        ``preview`` is accepted but not used here.
        """
        assert self.node, "Executor memeber `node` is not defined"
        node = self.node

        target = materialize_fn_or_cls(node)
        if inspect.isclass(target):
            with stateful_init_mutex:
                # Construct the stateful object only the first time this code is seen.
                if node.code_hash not in stateful_class_registry:
                    with redirect_to_plynx_logs(node, "init_stdout", "init_stderr"):
                        stateful_class_registry[node.code_hash] = target()
            target = stateful_class_registry[node.code_hash]

        with redirect_to_plynx_logs(node, "stdout", "stderr"):
            result = target(**prep_args(node))

        assign_outputs(node, result)
        return NodeRunningStatus.SUCCESS

    def kill(self):
        """Not supported by this executor; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    @classmethod
    def get_default_node(cls, is_workflow: bool) -> Node:
        """Create the default Node of this executor: adds two ints and returns ``sum``.

        Raises:
            Exception: If ``is_workflow`` is true.
        """
        if is_workflow:
            raise Exception('This class cannot be a workflow')

        node = super().get_default_node(is_workflow)

        default_inputs = [
            Input(name=input_name, file_type="int")
            for input_name in ("int_a", "int_b")
        ]
        node.inputs.extend(default_inputs)

        default_outputs = [Output(name="sum", file_type="int")]
        node.outputs.extend(default_outputs)

        # The source code of the operation lives in the `_cmd` code parameter.
        code_parameter = Parameter(
            name="_cmd",
            parameter_type=ParameterTypes.CODE,
            value=ParameterCode(
                mode="python",
                value=DEFAULT_CMD,
            ),
            mutable_type=False,
            publicable=False,
            removable=False,
        )
        node.parameters.extend([code_parameter])

        return node