API Reference

Process

Implement this API to support a new Process.

class plumpy.processes.Process(*args, **kwargs)[source]

The Process class is the base for any unit of work in plumpy.

A process can be in one of the following states:

  • CREATED

  • RUNNING

  • WAITING

  • FINISHED

  • EXCEPTED

  • KILLED

as defined in the ProcessState enum.

                  ___
                 |   v
CREATED (x) --- RUNNING (x) --- FINISHED (o)
                 |   ^          /
                 v   |         /
                WAITING (x) --
                 |   ^
                  ---

* -- EXCEPTED (o)
* -- KILLED (o)
  • (o): terminal state

  • (x): non-terminal state

When a Process enters a state it always gets a corresponding message, e.g. on entering RUNNING it will receive the on_run message. These messages are always delivered immediately after the state is entered but before the state is executed.
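The diagram and the message convention above can be sketched in plain Python. This is an illustration only, not plumpy's implementation: the transition table is read off the diagram, the `*` rows are assumed to mean "from any non-terminal state", and `can_transition`, `ToyProcess` and the recorded message strings are hypothetical names introduced for this example.

```python
from enum import Enum, auto


class ProcessState(Enum):
    """Toy copy of the six states listed above (values are arbitrary here)."""
    CREATED = auto()
    RUNNING = auto()
    WAITING = auto()
    FINISHED = auto()
    EXCEPTED = auto()
    KILLED = auto()


# Terminal states are the ones marked (o) in the diagram.
TERMINAL = {ProcessState.FINISHED, ProcessState.EXCEPTED, ProcessState.KILLED}

# Transitions read off the diagram. EXCEPTED and KILLED are handled separately
# because "*" is assumed to mean "reachable from any non-terminal state".
ALLOWED = {
    ProcessState.CREATED: {ProcessState.RUNNING},
    ProcessState.RUNNING: {ProcessState.RUNNING, ProcessState.WAITING, ProcessState.FINISHED},
    ProcessState.WAITING: {ProcessState.WAITING, ProcessState.RUNNING, ProcessState.FINISHED},
}


def can_transition(current, target):
    """Return True if the diagram allows moving from current to target."""
    if current in TERMINAL:
        return False
    if target in (ProcessState.EXCEPTED, ProcessState.KILLED):
        return True
    return target in ALLOWED[current]


class ToyProcess:
    """Hypothetical stand-in that records a message on each state entry."""

    def __init__(self):
        self.state = ProcessState.CREATED
        self.messages = []

    def transition_to(self, target):
        if not can_transition(self.state, target):
            raise ValueError(f"{self.state.name} -> {target.name} is not allowed")
        self.state = target
        # The message is recorded right after the state is entered,
        # before any work belonging to that state would be executed.
        self.messages.append(f"entered {target.name}")
```

A real Process dispatches to handler methods such as on_run rather than appending strings; the list is used here only to make the ordering visible.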

add_cleanup(cleanup)[source]

Add a cleanup callback which will be run when the process is being closed.

add_process_listener(listener)[source]

Add a process listener to the process. The listener defines the actions taken when the process triggers a specific state condition.

Parameters

listener (plumpy.ProcessListener) – a process listener.

broadcast_receive(_comm, body, sender, subject, correlation_id)[source]

Coroutine called when the process receives a message from the communicator

Parameters
  • _comm (kiwipy.Communicator) – the communicator that sent the message

  • body – the message body

call_soon(callback, *args, **kwargs)[source]

Schedule a callback to what is considered an internal process function (this needn’t be a method). If it raises an exception it will cause the process to fail.

close()[source]

Calling this method indicates that this process should not run anymore and will trigger any runtime resources (such as the communicator connection) to be cleaned up. The state of the process will still be accessible.

It is safe to call this method multiple times.
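How cleanup callbacks and a repeatable close() can fit together is sketched below. This is a minimal illustration under stated assumptions, not plumpy's code: `ToyClosable` is a hypothetical class, and the order in which callbacks run is an arbitrary choice made for the example.

```python
class ToyClosable:
    """Hypothetical stand-in showing an idempotent close with cleanup callbacks."""

    def __init__(self):
        self._cleanups = []
        self._closed = False

    def add_cleanup(self, cleanup):
        """Register a zero-argument callable to run when the object is closed."""
        self._cleanups.append(cleanup)

    def close(self):
        """Run the registered cleanups once; later calls do nothing."""
        if self._closed:
            return
        for cleanup in self._cleanups:
            cleanup()
        self._cleanups = []
        self._closed = True


released = []
toy = ToyClosable()
toy.add_cleanup(lambda: released.append("communicator connection"))
toy.close()
toy.close()  # safe: the callback is not run a second time
```

Guarding on a closed flag is what makes the second call harmless; the process state itself is left untouched by closing.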

create_initial_state()[source]

This method overrides that of its superclass. The process automatically enters the CREATED state when it is created.

Returns

A Created state of type plumpy.Created

property creation_time

The creation time of this Process, as returned by time.time() when it was instantiated.

Returns

The creation time

Return type

float

classmethod current()[source]

Get the currently running process i.e. the one at the top of the stack

Returns

the currently running process

Return type

plumpy.Process

decode_input_args(encoded)[source]

Decode saved input arguments as they came from the saved instance state plumpy.Bundle. The decoded inputs should contain no reference to the encoded inputs that were passed in. This often will mean making a deepcopy of the encoded input dictionary.

Parameters

encoded – the encoded input arguments, as they came from the saved instance state

Returns

The decoded input args

classmethod define(_spec)[source]

The method which defines the specification of the process. It should normally be overridden by the subclass.

done()[source]

Return True if the process was successfully killed or finished running.

Return type

bool

encode_input_args(inputs)[source]

Encode input arguments such that they may be saved in a plumpy.Bundle. The encoded inputs should contain no reference to the inputs that were passed in. This often will mean making a deepcopy of the input dictionary.

Parameters

inputs – A mapping of the inputs as passed to the process

Returns

The encoded inputs
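The "no shared references" requirement on encode_input_args and decode_input_args can be met with a deep copy, as the descriptions suggest. The following is a minimal sketch using module-level functions rather than methods, assuming the inputs are a plain mapping of deep-copyable values; a subclass with inputs that cannot be deep-copied or serialised directly would need its own encoding.

```python
import copy


def encode_input_args(inputs):
    """Return a deep copy of the inputs that is safe to store in a bundle."""
    return copy.deepcopy(dict(inputs))


def decode_input_args(encoded):
    """Return a deep copy of the stored inputs, independent of the stored copy."""
    return copy.deepcopy(encoded)


inputs = {"values": [1, 2]}
encoded = encode_input_args(inputs)
inputs["values"].append(3)  # mutating the original does not affect the encoded copy
decoded = decode_input_args(encoded)
```

Because both directions copy, neither the caller's dictionary nor the saved state can be changed through the other.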

exception()[source]

Return the exception if the process terminated in the EXCEPTED state.

execute()[source]

Execute the process. This will return if the process terminates or is paused.

Returns

None if not terminated, otherwise self.outputs

fail(exception, trace_back=None)[source]

Fail the process in response to an exception.

Parameters
  • exception – The exception that caused the failure

  • trace_back – Optional exception traceback

future()[source]

Return a savable future representing the eventual result of an asynchronous operation. The result is set when the process reaches a terminal state.

Return type

plumpy.SavableFuture

classmethod get_description()[source]

Get a human readable description of what this Process does.

Returns

The description.

Return type

dict

classmethod get_name()[source]

Return the process name, i.e. the name of the class that inherits from Process.

Returns

The process name

Return type

str

classmethod get_states()[source]

Return all allowed states of the process.

get_status_info(out_status_info)[source]

Get updated status information of process.

Parameters

out_status_info (dict) – the old status

has_terminated()[source]

Return whether the process has terminated.

Returns

True if the process is in a terminal state

Return type

bool

property inputs

Get the parsed inputs.

property is_successful

Return whether the result of the process is considered successful.

Returns

boolean, True if the process is in Finished state with successful attribute set to True

kill(msg: Optional[str] = None) → Union[bool, _asyncio.Future][source]

Kill the process.

Parameters

msg – An optional kill message

killed()[source]

Return whether the process is killed.

killed_msg()[source]

Get the killed message.

launch(process_class, inputs=None, pid=None, logger=None)[source]

Start running the nested process asynchronously without blocking other tasks in the event loop.

load_instance_state(saved_state, load_context)[source]

Load the process from its saved instance state.

Parameters
  • saved_state – A bundle to load the state from

  • load_context – The load context

log_with_pid(level, msg)[source]

Log a message together with the process pid.

property logger

Get the logger for this class. If not set, return the default logger.

Returns

The logger.

Return type

logging.Logger

property loop

Return the event loop of the process.

message_receive(_comm, msg)[source]

Coroutine called when the process receives a message from the communicator

Parameters
  • _comm (kiwipy.Communicator) – the communicator that sent the message

  • msg – the message

Returns

the outcome of processing the message, the return value will be sent back as a response to the sender

on_output_emitting(output_port, value)[source]

Output is about to be emitted.

on_terminated()[source]

Called when a terminal state is reached.

out(output_port, value)[source]

Record an output value for a specific output port. If the output port matches an explicitly defined Port, it will be validated against that. If not, it will be validated against the PortNamespace, which means it will be checked for dynamicity and whether the type of the value is valid.

Parameters
  • output_port (str) – the name of the output port, can be namespaced

  • value – the value for the output port

Raises

ValueError if the output value is not validated against the port

property outputs

Get the current outputs emitted by the Process. These may grow over time as the process runs.

Returns

A mapping of {output_port: value} outputs

Return type

dict

pause(msg: Optional[str] = None) → Union[bool, _asyncio.Future][source]

Pause the process. Returns True if after this call the process is paused, False otherwise

Parameters

msg – an optional message to set as the status. The current status will be saved in the private _pre_paused_status attribute, such that it can be restored when the process is played again.
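The status bookkeeping described for msg can be sketched as follows. This is a simplified, synchronous illustration: `ToyPausable` is a hypothetical class, and it always returns a bool, whereas the signature above shows that the real method may also return a future.

```python
class ToyPausable:
    """Hypothetical stand-in for the pause/play status handling."""

    def __init__(self):
        self.status = None
        self.paused = False
        self._pre_paused_status = None

    def pause(self, msg=None):
        """Pause, remembering the current status so play() can restore it."""
        if not self.paused:
            self._pre_paused_status = self.status
            self.status = msg
            self.paused = True
        return self.paused

    def play(self):
        """Resume, restoring the status that was current before the pause."""
        if self.paused:
            self.status = self._pre_paused_status
            self._pre_paused_status = None
            self.paused = False
        return not self.paused


toy = ToyPausable()
toy.status = "waiting for child"
toy.pause("paused by operator")
status_while_paused = toy.status
toy.play()
status_after_play = toy.status
```

Pausing an already paused toy leaves the saved status alone, so a second pause message cannot overwrite the status that play() will restore.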

property paused

Return whether the process is paused.

Return type

bool

property pid

Get the pid of the process

play()[source]

Play a process. Returns True if after this call the process is playing, False otherwise

Returns

True if playing, False otherwise

property raw_inputs

The raw inputs, as an AttributesFrozendict if they are not None.

classmethod recreate_from(saved_state, load_context=None)[source]

Recreate a process from a saved state, passing any positional and keyword arguments on to load_instance_state

Parameters
  • saved_state – The saved state to load from

  • load_context (persistence.LoadSaveContext) – The load context to use

Returns

An instance of the object with its state loaded from the save state.

Return type

Process

recreate_state(saved_state)[source]

Create a state object from a saved state

Parameters

saved_state (Bundle) – The saved state

Returns

An instance of the object with its state loaded from the save state.

remove_process_listener(listener)[source]

Remove a process listener from the process.

Parameters

listener (plumpy.ProcessListener) – a process listener.

result()[source]

Get the result from the process if it is finished. If the process was killed then a KilledError will be raised. If the process has excepted then the failing exception will be raised. If in any other state this will raise an InvalidStateError.

Returns

The result of the process
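The four outcomes of result() can be summarised in a small sketch. Everything here is a stand-in written for illustration: `ToyOutcome` is hypothetical, the two exception classes are local definitions that only borrow the names used above, and states are represented as plain strings.

```python
class KilledError(Exception):
    """Local stand-in for the error raised when the process was killed."""


class InvalidStateError(Exception):
    """Local stand-in for the error raised in any non-terminal state."""


class ToyOutcome:
    """Hypothetical holder of a state name plus whatever that state carries."""

    def __init__(self, state, outputs=None, exception=None, killed_msg=None):
        self.state = state
        self.outputs = outputs
        self.exception = exception
        self.killed_msg = killed_msg

    def result(self):
        if self.state == "FINISHED":
            return self.outputs
        if self.state == "KILLED":
            raise KilledError(self.killed_msg)
        if self.state == "EXCEPTED":
            # Re-raise the exception that caused the failure.
            raise self.exception
        raise InvalidStateError(f"no result available in state {self.state}")


finished = ToyOutcome("FINISHED", outputs={"total": 3})
value = finished.result()
```

A caller that cannot be sure the process has terminated would typically check has_terminated() first, or be prepared to catch the invalid-state error.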

resume(*args)[source]

Start running the process again

run()[source]

This function will be run when the process is triggered. It should be overridden by the subclass.

save_instance_state(out_state, save_context)[source]

Ask the process to save its current instance state.

Parameters
  • out_state (plumpy.Bundle) – A bundle to save the state to

  • save_context – The save context

set_logger(logger)[source]

Set the logger of the process.

set_status(status)[source]

Set the status message of the process.

property status

Get the status message of the process.

step()[source]

Run one step of the process. A step runs synchronously within its own process and asynchronously with respect to steps in other processes. The execute function that is run by this method depends on the current state of the process.

async step_until_terminated()[source]

This is the function that is actually run by the event loop. If the process has not terminated, run the current step and wait until the step has finished.
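The relationship between step() and step_until_terminated() can be illustrated with plain asyncio. This is a sketch, not plumpy's implementation: `ToyStepper` is hypothetical, each "step" merely decrements a counter, and `asyncio.sleep(0)` stands in for the point where a real step would yield to the event loop.

```python
import asyncio


class ToyStepper:
    """Hypothetical stand-in that terminates after a fixed number of steps."""

    def __init__(self, steps):
        self._remaining = steps
        self.executed = 0

    def has_terminated(self):
        return self._remaining == 0

    async def step(self):
        # Yield control so that other steppers on the same loop can advance.
        await asyncio.sleep(0)
        self._remaining -= 1
        self.executed += 1

    async def step_until_terminated(self):
        # Within one stepper, steps run strictly one after another.
        while not self.has_terminated():
            await self.step()


async def main():
    first, second = ToyStepper(2), ToyStepper(3)
    # Steps of different steppers interleave on the shared event loop.
    await asyncio.gather(first.step_until_terminated(), second.step_until_terminated())
    return first.executed, second.executed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each stepper waits for its own step to finish before starting the next, while the event loop is free to advance the other stepper in between.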

successful()[source]

Returns whether the result of the process is considered successful. Will raise if the process is not in the FINISHED state.

property uuid

Get the uuid of the process

Finite State Machine

Implement this API to support a new state machine.

class plumpy.base.state_machine.State(state_machine)[source]

execute()[source]

Execute the state, performing the actions that this state is responsible for. Return a state to transition to or None if finished.

property label

Convenience property to get the state label

class plumpy.base.state_machine.StateMachine(*args, **kwargs)[source]

add_state_event_callback(hook, callback)[source]

Add a callback to be called on a particular state event hook. The callback should have form fn(state_machine, hook, state)

Parameters
  • hook – The state event hook

  • callback – The callback function
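The callback form fn(state_machine, hook, state) can be illustrated with a small registry. This is a sketch under assumptions: `ToyStateMachine`, its `_fire` helper and the hook value "entered" are hypothetical and chosen only for the example; the real hook values are defined by the library.

```python
class ToyStateMachine:
    """Hypothetical stand-in that stores callbacks per hook and fires them."""

    def __init__(self):
        self._callbacks = {}

    def add_state_event_callback(self, hook, callback):
        """Register callback(state_machine, hook, state) for the given hook."""
        self._callbacks.setdefault(hook, []).append(callback)

    def _fire(self, hook, state):
        # Call every callback registered for this hook with the documented form.
        for callback in self._callbacks.get(hook, []):
            callback(self, hook, state)


seen = []


def record(state_machine, hook, state):
    """Example callback matching the fn(state_machine, hook, state) form."""
    seen.append((hook, state))


machine = ToyStateMachine()
machine.add_state_event_callback("entered", record)
machine._fire("entered", "RUNNING")
machine._fire("exiting", "RUNNING")  # no callback registered for this hook
```

Passing the state machine itself as the first argument lets a single callback be shared between several machines.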

static transition_failed(initial_state, final_state, exception, trace)[source]

Called when a state transition fails. This method can be overridden to change the default behaviour, which is to raise the exception.

Parameters

exception (Exception) – The transition failed exception

WorkChain

Implement this API to support a new WorkChain.

class plumpy.workchains.WorkChain(*args, **kwargs)[source]

A WorkChain is a series of instructions carried out with the ability to save state in between.

load_instance_state(saved_state, load_context)[source]

Load the process from its saved instance state.

Parameters
  • saved_state – A bundle to load the state from

  • load_context – The load context

run()[source]

This function will be run when the process is triggered. It should be overridden by the subclass.

save_instance_state(out_state, save_context)[source]

Ask the process to save its current instance state.

Parameters
  • out_state (plumpy.Bundle) – A bundle to save the state to

  • save_context – The save context

to_context(**kwargs)[source]

This is a convenience method that provides syntactic sugar for a user to add multiple intersteps, each of which will assign a certain value to the corresponding key in the context of the workchain.

Controller

class plumpy.process_comms.RemoteProcessThreadController(communicator)[source]

A class that can be used to control and launch remote processes

execute_process(process_class, init_args=None, init_kwargs=None, loader=None, nowait=False, no_reply=False)[source]

Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.

Parameters
  • process_class – the process class to execute

  • init_args – the positional arguments to the class constructor

  • init_kwargs – the keyword arguments to the class constructor

  • loader – the class loader to use

  • nowait – if True, don’t wait for the process to send a response

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the result of executing the process

get_status(pid)[source]

Get the status of a process with the given PID.

Parameters

pid – the process id

Returns

the status response from the process

kill_all(msg)[source]

Kill all processes that are subscribed to the same communicator

Parameters

msg – an optional kill message

kill_process(pid, msg=None)[source]

Kill the process

Parameters
  • pid – the pid of the process to kill

  • msg – optional kill message

Returns

a response future from the process to be killed

Return type

kiwipy.Future

launch_process(process_class, init_args=None, init_kwargs=None, persist=False, loader=None, nowait=False, no_reply=False)[source]

Launch the process

Parameters
  • process_class – the process class to launch

  • init_args – positional arguments to the process constructor

  • init_kwargs – keyword arguments to the process constructor

  • persist – should the process be persisted

  • loader – the class loader to use

  • nowait – if True, return without waiting for the process to finish

  • no_reply – don’t send a reply to the sender

Returns

the pid of the created process or the outputs (if nowait=False)

pause_all(msg)[source]

Pause all processes that are subscribed to the same communicator

Parameters

msg – an optional pause message

pause_process(pid, msg=None)[source]

Pause the process

Parameters
  • pid – the pid of the process to pause

  • msg – optional pause message

Returns

a response future from the process to be paused

Return type

kiwipy.Future

play_all()[source]

Play all processes that are subscribed to the same communicator

play_process(pid)[source]

Play the process

Parameters

pid – the pid of the process to play

Returns

a response future from the process to be played

Return type

kiwipy.Future

task_send(message, no_reply=False)[source]

Send a task to be performed using the communicator

Parameters
  • message – the task message

  • no_reply – if True, this call will be fire-and-forget, i.e. no return value

Returns

the response from the remote side (if no_reply=False)