Source code for plumpy.process_comms

# -*- coding: utf-8 -*-
"""Module for process level communication functions and classes"""
import asyncio
import copy
import logging

import kiwipy

from . import loaders
from . import communications
from . import futures
from . import persistence

__all__ = [
    'PAUSE_MSG',
    'PLAY_MSG',
    'KILL_MSG',
    'STATUS_MSG',
    'ProcessLauncher',
    'create_continue_body',
    'create_launch_body',
    'RemoteProcessThreadController',
    'RemoteProcessController',
]

INTENT_KEY = 'intent'
MESSAGE_KEY = 'message'


class Intent:
    """Intent constants for a process message"""
    # pylint: disable=too-few-public-methods
    PLAY = 'play'
    PAUSE = 'pause'
    KILL = 'kill'
    STATUS = 'status'


PAUSE_MSG = {INTENT_KEY: Intent.PAUSE}
PLAY_MSG = {INTENT_KEY: Intent.PLAY}
KILL_MSG = {INTENT_KEY: Intent.KILL}
STATUS_MSG = {INTENT_KEY: Intent.STATUS}

TASK_KEY = 'task'
TASK_ARGS = 'args'
PERSIST_KEY = 'persist'
# Launch
PROCESS_CLASS_KEY = 'process_class'
ARGS_KEY = 'init_args'
KWARGS_KEY = 'init_kwargs'
NOWAIT_KEY = 'nowait'
# Continue
PID_KEY = 'pid'
TAG_KEY = 'tag'
# Task types
LAUNCH_TASK = 'launch'
CONTINUE_TASK = 'continue'
CREATE_TASK = 'create'

LOGGER = logging.getLogger(__name__)


def create_launch_body(process_class, init_args=None, init_kwargs=None, persist=False, loader=None, nowait=True):
    """
    Create a message body for the launch action

    :param process_class: the class of the process to launch
    :param init_args: any initialisation positional arguments
    :param init_kwargs: any initialisation keyword arguments
    :param persist: persist this process if True, otherwise don't
    :param loader: the loader to use to load the persisted process
    :param nowait: wait for the process to finish before completing the task, otherwise just return the PID
    :return: a dictionary with the body of the message to launch the process
    :rtype: dict
    """
    if loader is None:
        loader = loaders.get_object_loader()

    msg_body = {
        TASK_KEY: LAUNCH_TASK,
        TASK_ARGS: {
            PROCESS_CLASS_KEY: loader.identify_object(process_class),
            PERSIST_KEY: persist,
            NOWAIT_KEY: nowait,
            ARGS_KEY: init_args,
            KWARGS_KEY: init_kwargs
        }
    }
    return msg_body


def create_continue_body(pid, tag=None, nowait=False):
    """
    Create a message body to continue an existing process
    :param pid: the pid of the existing process
    :param tag: the optional persistence tag
    :param nowait: wait for the process to finish before completing the task, otherwise just return the PID
    :return: a dictionary with the body of the message to continue the process
    :rtype: dict
    """
    msg_body = {TASK_KEY: CONTINUE_TASK, TASK_ARGS: {PID_KEY: pid, NOWAIT_KEY: nowait, TAG_KEY: tag}}
    return msg_body


def create_create_body(process_class, init_args=None, init_kwargs=None, persist=False, loader=None):
    """
    Create a message body to create a new process
    :param process_class: the class of the process to launch
    :param init_args: any initialisation positional arguments
    :param init_kwargs: any initialisation keyword arguments
    :param persist: persist this process if True, otherwise don't
    :param loader: the loader to use to load the persisted process
    :return: a dictionary with the body of the message to launch the process
    :rtype: dict
    """
    if loader is None:
        loader = loaders.get_object_loader()

    msg_body = {
        TASK_KEY: CREATE_TASK,
        TASK_ARGS: {
            PROCESS_CLASS_KEY: loader.identify_object(process_class),
            PERSIST_KEY: persist,
            ARGS_KEY: init_args,
            KWARGS_KEY: init_kwargs
        }
    }
    return msg_body


class RemoteProcessController:
    """
    Control remote processes using coroutines that will send messages and wait
    (in a non-blocking way) for their response
    """

    def __init__(self, communicator):
        self._communicator = communicator

    async def get_status(self, pid):
        """
        Get the status of a process with the given PID
        :param pid: the process id
        :return: the status response from the process
        """
        future = self._communicator.rpc_send(pid, STATUS_MSG)
        result = await asyncio.wrap_future(future)
        return result

    async def pause_process(self, pid, msg=None):
        """
        Pause the process

        :param pid: the pid of the process to pause
        :param msg: optional pause message
        :return: True if paused, False otherwise
        """
        message = copy.copy(PAUSE_MSG)
        if msg is not None:
            message[MESSAGE_KEY] = msg

        pause_future = self._communicator.rpc_send(pid, message)
        # rpc_send return a thread future from communicator
        future = await asyncio.wrap_future(pause_future)
        # future is just returned from rpc call which return a kiwipy future
        result = await asyncio.wrap_future(future)
        return result

    async def play_process(self, pid):
        """
        Play the process

        :param pid: the pid of the process to play
        :return: True if played, False otherwise
        """
        play_future = self._communicator.rpc_send(pid, PLAY_MSG)
        future = await asyncio.wrap_future(play_future)
        result = await asyncio.wrap_future(future)
        return result

    async def kill_process(self, pid, msg=None):
        """
        Kill the process

        :param pid: the pid of the process to kill
        :param msg: optional kill message
        :return: True if killed, False otherwise
        """
        message = copy.copy(KILL_MSG)
        if msg is not None:
            message[MESSAGE_KEY] = msg

        # Wait for the communication to go through
        kill_future = self._communicator.rpc_send(pid, message)
        future = await asyncio.wrap_future(kill_future)
        # Now wait for the kill to be enacted
        result = await asyncio.wrap_future(future)

        return result

    async def continue_process(self, pid, tag=None, nowait=False, no_reply=False):
        """
        Continue the process

        :param _communicator: the communicator
        :param pid: the pid of the process to continue
        :param tag: the checkpoint tag to continue from
        """
        message = create_continue_body(pid=pid, tag=tag, nowait=nowait)
        # Wait for the communication to go through
        continue_future = self._communicator.task_send(message, no_reply=no_reply)
        future = await asyncio.wrap_future(continue_future)

        if no_reply:
            return

        # Now wait for the result of the task
        result = await asyncio.wrap_future(future)
        return result

    async def launch_process(
        self,
        process_class,
        init_args=None,
        init_kwargs=None,
        persist=False,
        loader=None,
        nowait=False,
        no_reply=False
    ):
        """
        Launch a process given the class and constructor arguments

        :param process_class: the class of the process to launch
        :param init_args: the constructor positional arguments
        :param init_kwargs: the constructor keyword arguments
        :param persist: should the process be persisted
        :param loader: the classloader to use
        :param nowait: if True, don't wait for the process to send a response, just return the pid
        :param no_reply: if True, this call will be fire-and-forget, i.e. no return value
        :return: the result of launching the process
        """
        # pylint: disable=too-many-arguments
        message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait)
        launch_future = self._communicator.task_send(message, no_reply=no_reply)
        future = await asyncio.wrap_future(launch_future)

        if no_reply:
            return

        result = await asyncio.wrap_future(future)
        return result

    async def execute_process(
        self, process_class, init_args=None, init_kwargs=None, loader=None, nowait=False, no_reply=False
    ):
        """
        Execute a process.  This call will first send a create task and then a continue task over
        the communicator.  This means that if communicator messages are durable then the process
        will run until the end even if this interpreter instance ceases to exist.

        :param process_class: the process class to execute
        :param init_args: the positional arguments to the class constructor
        :param init_kwargs: the keyword arguments to the class constructor
        :param loader: the class loader to use
        :param nowait: if True, don't wait for the process to send a response
        :param no_reply: if True, this call will be fire-and-forget, i.e. no return value
        :return: the result of executing the process
        """
        # pylint: disable=too-many-arguments
        message = create_create_body(process_class, init_args, init_kwargs, persist=True, loader=loader)

        create_future = self._communicator.task_send(message)
        future = await asyncio.wrap_future(create_future)
        pid = await asyncio.wrap_future(future)

        message = create_continue_body(pid, nowait=nowait)
        continue_future = self._communicator.task_send(message, no_reply=no_reply)
        future = await asyncio.wrap_future(continue_future)

        if no_reply:
            return

        result = await asyncio.wrap_future(future)
        return result


[docs]class RemoteProcessThreadController: """ A class that can be used to control and launch remote processes """ def __init__(self, communicator): """ Create a new process controller :param communicator: the communicator to use :type communicator: :class:`kiwipy.Communicator` """ self._communicator = communicator
[docs] def get_status(self, pid): """ Get the status of a process with the given PID :param pid: the process id :return: the status response from the process """ return self._communicator.rpc_send(pid, STATUS_MSG)
[docs] def pause_process(self, pid, msg=None): """ Pause the process :param pid: the pid of the process to pause :param msg: optional pause message :return: a response future from the process to be paused :rtype: :class:`kiwipy.Future` """ message = copy.copy(PAUSE_MSG) if msg is not None: message[MESSAGE_KEY] = msg return self._communicator.rpc_send(pid, message)
[docs] def pause_all(self, msg): """ Pause all processes that are subscribed to the same communicator :param msg: an optional pause message """ self._communicator.broadcast_send(msg, subject=Intent.PAUSE)
[docs] def play_process(self, pid): """ Play the process :param pid: the pid of the process to pause :return: a response future from the process to be played :rtype: :class:`kiwipy.Future` """ return self._communicator.rpc_send(pid, PLAY_MSG)
[docs] def play_all(self): """ Play all processes that are subscribed to the same communicator """ self._communicator.broadcast_send(None, subject=Intent.PLAY)
[docs] def kill_process(self, pid, msg=None): """ Kill the process :param pid: the pid of the process to kill :param msg: optional kill message :return: a response future from the process to be killed :rtype: :class:`kiwipy.Future` """ message = copy.copy(KILL_MSG) if msg is not None: message[MESSAGE_KEY] = msg return self._communicator.rpc_send(pid, message)
[docs] def kill_all(self, msg): """ Kill all processes that are subscribed to the same communicator :param msg: an optional pause message """ self._communicator.broadcast_send(msg, subject=Intent.KILL)
def continue_process(self, pid, tag=None, nowait=False, no_reply=False): message = create_continue_body(pid=pid, tag=tag, nowait=nowait) return self.task_send(message, no_reply=no_reply)
[docs] def launch_process( self, process_class, init_args=None, init_kwargs=None, persist=False, loader=None, nowait=False, no_reply=False ): # pylint: disable=too-many-arguments """ Launch the process :param process_class: the process class to launch :param init_args: positional arguments to the process constructor :param init_kwargs: keyword arguments to the process constructor :param persist: should the process be persisted :param loader: the class loader to use :param nowait: if True only return when the process finishes :param no_reply: don't send a reply to the sender :return: the pid of the created process or the outputs (if nowait=False) """ message = create_launch_body(process_class, init_args, init_kwargs, persist, loader, nowait) return self.task_send(message, no_reply=no_reply)
[docs] def execute_process( self, process_class, init_args=None, init_kwargs=None, loader=None, nowait=False, no_reply=False ): """ Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist. :param process_class: the process class to execute :param init_args: the positional arguments to the class constructor :param init_kwargs: the keyword arguments to the class constructor :param loader: the class loader to use :param nowait: if True, don't wait for the process to send a response :param no_reply: if True, this call will be fire-and-forget, i.e. no return value :return: the result of executing the process """ # pylint: disable=too-many-arguments message = create_create_body(process_class, init_args, init_kwargs, persist=True, loader=loader) execute_future = kiwipy.Future() create_future = futures.unwrap_kiwi_future(self._communicator.task_send(message)) def on_created(_): with kiwipy.capture_exceptions(execute_future): pid = create_future.result() continue_future = self.continue_process(pid, nowait=nowait, no_reply=no_reply) kiwipy.chain(continue_future, execute_future) create_future.add_done_callback(on_created) return execute_future
[docs] def task_send(self, message, no_reply=False): """ Send a task to be performed using the communicator :param message: the task message :param no_reply: if True, this call will be fire-and-forget, i.e. no return value :return: the response from the remote side (if no_reply=False) """ return self._communicator.task_send(message, no_reply=no_reply)
class ProcessLauncher: """ Takes incoming task messages and uses them to launch processes. Expected format of task: For launch:: { 'task': <LAUNCH_TASK> 'process_class': <Process class to launch> 'args': <tuple of positional args for process constructor> 'kwargs': <dict of keyword args for process constructor>. 'nowait': True or False } For continue:: { 'task': <CONTINUE_TASK> 'pid': <Process ID> 'nowait': True or False } """ def __init__(self, loop=None, persister=None, load_context=None, loader=None): self._loop = loop self._persister = persister self._load_context = load_context if load_context is not None else persistence.LoadSaveContext() if loader is not None: self._loader = loader self._load_context = self._load_context.copyextend(loader=loader) else: self._loader = loaders.get_object_loader() async def __call__(self, communicator, task): """ Receive a task. :param task: The task message """ task_type = task[TASK_KEY] if task_type == LAUNCH_TASK: return await self._launch(communicator, **task.get(TASK_ARGS, {})) if task_type == CONTINUE_TASK: return await self._continue(communicator, **task.get(TASK_ARGS, {})) if task_type == CREATE_TASK: return await self._create(communicator, **task.get(TASK_ARGS, {})) raise communications.TaskRejected async def _launch(self, _communicator, process_class, persist, nowait, init_args=None, init_kwargs=None): """ Launch the process :param _communicator: the communicator :param process_class: the process class to launch :param persist: should the process be persisted :param nowait: if True only return when the process finishes :param init_args: positional arguments to the process constructor :param init_kwargs: keyword arguments to the process constructor :return: the pid of the created process or the outputs (if nowait=False) """ if persist and not self._persister: raise communications.TaskRejected('Cannot persist process, no persister') if init_args is None: init_args = () if init_kwargs is None: init_kwargs = {} proc_class = self._loader.load_object(process_class) proc = proc_class(*init_args, **init_kwargs) if persist: self._persister.save_checkpoint(proc) if nowait: asyncio.ensure_future(proc.step_until_terminated()) return proc.pid await proc.step_until_terminated() return proc.future().result() async def _continue(self, _communicator, pid, nowait, tag=None): """ Continue the process :param _communicator: the communicator :param pid: the pid of the process to continue :param nowait: if True don't wait for the process to complete :param tag: the checkpoint tag to continue from """ if not self._persister: LOGGER.warning('rejecting task: cannot continue process<%d> because no persister is available', pid) raise communications.TaskRejected('Cannot continue process, no persister') # Do not catch exceptions here, because if these operations fail, the continue task should except and bubble up saved_state = self._persister.load_checkpoint(pid, tag) proc = saved_state.unbundle(self._load_context) if nowait: asyncio.ensure_future(proc.step_until_terminated()) return proc.pid await proc.step_until_terminated() return proc.future().result() async def _create(self, _communicator, process_class, persist, init_args=None, init_kwargs=None): """ Create the process :param _communicator: the communicator :param process_class: the process class to create :param persist: should the process be persisted :param init_args: positional arguments to the process constructor :param init_kwargs: keyword arguments to the process constructor :return: the pid of the created process """ if persist and not self._persister: raise communications.TaskRejected('Cannot persist process, no persister') if init_args is None: init_args = () if init_kwargs is None: init_kwargs = {} proc_class = self._loader.load_object(process_class) proc = proc_class(*init_args, **init_kwargs) if persist: self._persister.save_checkpoint(proc) return proc.pid