Source code for plynx.service.worker

"""Main PLynx worker service and utils"""

import logging
import socket
import sys
import threading
import uuid
from typing import Dict, Optional, Set

from plynx.base.executor import BaseExecutor
from plynx.db.worker_state import WorkerState
from plynx.utils.common import ObjectId
from plynx.utils.config import WorkerConfig, get_worker_config
from plynx.utils.executor import DBJobExecutor, materialize_executor, post_request


class Worker:
    # TODO update docstring
    """Worker main class.

    On the high level, the Worker distributes Jobs over all available Workers
    and updates statuses of the Graphs in the database.

    The Worker performs several roles:
        * Pull Graphs in status READY from the database.
        * Create Schedulers for each Graph.
        * Populate the queue of the Jobs.
        * Distribute Jobs across Workers.
        * Keep track of Jobs' statuses.
        * Process CANCEL requests.
        * Update Graphs' statuses.
        * Track worker status and last response.
    """

    # pylint: disable=too-many-instance-attributes

    # Sync with database timeout
    SDB_STATUS_UPDATE_TIMEOUT: int = 1
    # Worker State update timeout
    WORKER_STATE_UPDATE_TIMEOUT: int = 1

    def __init__(self, worker_config: WorkerConfig, worker_id: Optional[str]):
        self.worker_id = worker_id if worker_id else str(uuid.uuid1())
        self.kinds = worker_config.kinds
        self.host = socket.gethostname()
        self._stop_event = threading.Event()

        # Mapping to keep track of the executors of currently running jobs
        self._run_id_to_executor: Dict[ObjectId, BaseExecutor] = {}
        self._run_id_to_executor_lock = threading.Lock()

        # Start new threads
        self._thread_db_status_update = threading.Thread(target=self._run_db_status_update, args=())
        self._thread_db_status_update.start()

        self._thread_worker_state_update = threading.Thread(target=self._run_worker_state_update, args=())
        self._thread_worker_state_update.start()

        self._killed_run_ids: Set[ObjectId] = set()
    def serve_forever(self):
        """
        Run the worker.
        """
        self._stop_event.wait()
    def execute_job(self, executor: BaseExecutor):
        """Run a single job in the executor."""
        assert executor.node, "Executor has no `node` attribute defined"
        db_executor = DBJobExecutor(executor)
        db_executor.run()

        with self._run_id_to_executor_lock:
            del self._run_id_to_executor[executor.node._id]
    def _run_db_status_update(self):
        """Sync with the database: pick new runs and start executors for them."""
        try:
            while not self._stop_event.is_set():
                response = post_request("pick_run", data={"kinds": self.kinds})
                if response:
                    node = response["node"]
                else:
                    node = None
                    logging.error("Failed to pick a run.")

                if node:
                    logging.info(f"New node found: {node['_id']} {node['node_running_status']} {node['title']}")
                    executor = materialize_executor(node)
                    with self._run_id_to_executor_lock:
                        self._run_id_to_executor[executor.node._id] = executor
                    thread = threading.Thread(target=self.execute_job, args=(executor, ))
                    thread.start()
                else:
                    self._stop_event.wait(timeout=Worker.SDB_STATUS_UPDATE_TIMEOUT)
        except Exception:
            self.stop()
            raise
        finally:
            logging.info(f"Exit {self._run_db_status_update.__name__}")
    def _run_worker_state_update(self):
        """Sync with the database: apply cancelation requests and push the worker state."""
        try:
            while not self._stop_event.is_set():
                # TODO: move CANCEL handling to a separate thread
                run_ids = list(self._run_id_to_executor.keys())
                if run_ids:
                    response = post_request("get_run_cancelations", data={"run_ids": run_ids}, num_retries=1)
                    runs_to_kill = [] if not response else response["run_ids_to_cancel"]
                    for run_id in runs_to_kill:
                        self._run_id_to_executor[run_id].kill()

                runs = []
                with self._run_id_to_executor_lock:
                    for executor in self._run_id_to_executor.values():
                        runs.append(executor.node.to_dict())

                worker_state = WorkerState(
                    worker_id=self.worker_id,
                    host=self.host,
                    runs=runs,
                    kinds=self.kinds,
                )
                post_request("push_worker_state", data={"worker_state": worker_state.to_dict()}, num_retries=1)
                self._stop_event.wait(timeout=Worker.WORKER_STATE_UPDATE_TIMEOUT)
        except Exception:
            self.stop()
            raise
        finally:
            logging.info(f"Exit {self._run_worker_state_update.__name__}")
    def stop(self):
        """Stop worker."""
        self._stop_event.set()
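
# Illustrative sketch (hypothetical helper, not part of the original module):
# it exercises the Worker lifecycle described in the class docstring -- build a
# Worker from a WorkerConfig, block in serve_forever(), and release it via stop().
# It assumes a reachable PLynx backend, since the worker threads call post_request().
def _example_run_worker_briefly(seconds: float = 5.0, worker_id: Optional[str] = None):
    worker = Worker(get_worker_config(), worker_id)
    # stop() sets the internal event, which unblocks serve_forever() and lets the
    # status-update and state-update threads exit on their next loop iteration.
    threading.Timer(seconds, worker.stop).start()
    worker.serve_forever()
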
def run_worker(worker_id: Optional[str] = None):
    """Run the worker daemon. It runs in the calling thread."""
    logging.info('Init Worker')
    worker_config = get_worker_config()
    logging.info(worker_config)
    worker = Worker(worker_config, worker_id)

    # Serve forever; this keeps running until the program is interrupted with Ctrl-C
    try:
        logging.info("Start serving")
        worker.serve_forever()
    except KeyboardInterrupt:
        worker.stop()
        sys.exit(0)
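
# Illustrative entry point (an assumption, not part of the original module; PLynx
# normally launches the worker through its own CLI). It only sketches how
# run_worker() could be invoked directly, e.g. for local debugging.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run_worker()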