API Reference
Process
Implement this API to support a new Process.
- class
plumpy.processes.
Process
(*args, **kwargs)[source]The Process class is the base for any unit of work in plumpy.
A process can be in one of the following states:
CREATED
RUNNING
WAITING
FINISHED
EXCEPTED
KILLED
as defined in the
ProcessState
enum.
                  ___
                 |   v
CREATED (x) --- RUNNING (x) --- FINISHED (o)
                 |   ^          /
                 v   |         /
                WAITING (x) --
                 |   ^
                  ---

* -- EXCEPTED (o)
* -- KILLED (o)
(o): terminal state
(x): non terminal state
When a Process enters a state it always receives a corresponding message, e.g. on entering RUNNING it will receive the on_run message. These messages are always called immediately after the state is entered but before it is executed.
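The transitions in the diagram above can be written down as a small lookup table. This is a minimal stand-alone sketch that does not import plumpy: the enum values, the `ALLOWED` table and the `can_transition` helper are assumptions made for illustration, read off the diagram rather than taken from the library.

```python
from enum import Enum


class ProcessState(Enum):
    """Stand-in for plumpy's ProcessState enum (values are illustrative)."""
    CREATED = "created"
    RUNNING = "running"
    WAITING = "waiting"
    FINISHED = "finished"
    EXCEPTED = "excepted"
    KILLED = "killed"


# Terminal states are the ones marked (o) in the diagram.
TERMINAL = {ProcessState.FINISHED, ProcessState.EXCEPTED, ProcessState.KILLED}

# Transitions as read off the diagram (an interpretation, not library code).
ALLOWED = {
    ProcessState.CREATED: {ProcessState.RUNNING},
    ProcessState.RUNNING: {
        ProcessState.RUNNING, ProcessState.WAITING, ProcessState.FINISHED,
    },
    ProcessState.WAITING: {
        ProcessState.RUNNING, ProcessState.WAITING, ProcessState.FINISHED,
    },
}

# "* --" in the diagram: every non-terminal state may end up EXCEPTED or KILLED.
for _targets in ALLOWED.values():
    _targets.update({ProcessState.EXCEPTED, ProcessState.KILLED})


def can_transition(source, target):
    """Return True if the diagram permits moving from source to target."""
    return target in ALLOWED.get(source, set())
```

Terminal states have no entry in `ALLOWED`, so `can_transition` returns False for any transition out of them.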
add_cleanup
(cleanup)[source] Add a cleanup callback which will be run when the process is being closed.
add_process_listener
(listener)[source] Add a process listener to the process. The listener defines the actions to take when the process triggers specific state conditions.
- Parameters
listener (
plumpy.ProcessListener
) – a process listener.
broadcast_receive
(_comm, body, sender, subject, correlation_id)[source]Coroutine called when the process receives a message from the communicator
- Parameters
_comm (
kiwipy.Communicator
) – the communicator that sent the message
msg – the message
call_soon
(callback, *args, **kwargs)[source]Schedule a callback to what is considered an internal process function (this needn’t be a method). If it raises an exception it will cause the process to fail.
close
()[source] Calling this method indicates that this process should not run anymore and will trigger any runtime resources (such as the communicator connection) to be cleaned up. The state of the process will still be accessible.
It is safe to call this method multiple times.
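The interplay between add_cleanup and a close that is safe to call repeatedly can be sketched as follows. `ClosableResource` is a hypothetical stand-in, not plumpy's Process, and the order in which cleanups run here (most recently added first) is an assumption.

```python
class ClosableResource:
    """Hypothetical stand-in, not plumpy's Process: shows an idempotent close()."""

    def __init__(self):
        self._cleanups = []
        self._closed = False

    def add_cleanup(self, cleanup):
        """Register a zero-argument callable to run when close() is called."""
        self._cleanups.append(cleanup)

    def close(self):
        """Run the registered cleanups once; later calls are no-ops."""
        if self._closed:
            return
        self._closed = True
        # Assumed order: most recently added first; plumpy's order may differ.
        while self._cleanups:
            self._cleanups.pop()()


calls = []
resource = ClosableResource()
resource.add_cleanup(lambda: calls.append("disconnect"))
resource.close()
resource.close()  # safe: the cleanup is not run a second time
```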
create_initial_state
()[source] This method overrides that of its superclass. It automatically enters the CREATED state when the process is created.
- Returns
A Created state of type
plumpy.Created
- property
creation_time
The creation time of this Process, as returned by time.time() when it was instantiated.
- Returns
The creation time
- Return type
float
- classmethod
current
()[source]Get the currently running process i.e. the one at the top of the stack
- Returns
the currently running process
- Return type
plumpy.Process
decode_input_args
(encoded)[source]Decode saved input arguments as they came from the saved instance state
plumpy.Bundle
. The decoded inputs should contain no reference to the encoded inputs that were passed in. This often will mean making a deepcopy of the encoded input dictionary.
- Parameters
encoded –
- Returns
The decoded input args
- classmethod
define
(_spec)[source] The method that defines the specification of the process. It should normally be overridden by the subclass.
done
()[source] Return True if the process was successfully killed or finished running.
- Return type
bool
encode_input_args
(inputs)[source]Encode input arguments such that they may be saved in a
plumpy.Bundle
. The encoded inputs should contain no reference to the inputs that were passed in. This often will mean making a deepcopy of the input dictionary.
- Parameters
inputs – A mapping of the inputs as passed to the process
- Returns
The encoded inputs
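The "no shared references" requirement on encode_input_args and decode_input_args can be met with a deep copy, as the descriptions suggest. The two functions below are a stand-alone sketch using plain dictionaries; they are not the library's implementations, and the example inputs are made up.

```python
import copy


def encode_input_args(inputs):
    """Return a deep copy so the encoded form shares nothing with `inputs`."""
    return copy.deepcopy(dict(inputs))


def decode_input_args(encoded):
    """Return a deep copy so the decoded form shares nothing with `encoded`."""
    return copy.deepcopy(dict(encoded))


inputs = {"structure": {"atoms": ["H", "H"]}, "cutoff": 30.0}
encoded = encode_input_args(inputs)
inputs["structure"]["atoms"].append("O")  # mutate the original afterwards
decoded = decode_input_args(encoded)
```

Because of the deep copy, the later mutation of `inputs` does not leak into `encoded` or `decoded`.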
exception
()[source] Return the exception if the process terminated in the EXCEPTED state.
execute
()[source]Execute the process. This will return if the process terminates or is paused.
- Returns
None if not terminated, otherwise self.outputs
fail
(exception, trace_back=None)[source] Fail the process in response to an exception.
- Parameters
exception – The exception that caused the failure
trace_back – Optional exception traceback
future
()[source] Return a savable future representing the eventual result of an asynchronous operation. The result is set when the process reaches a terminal state.
- Return type
plumpy.SavableFuture
- classmethod
get_description
()[source]Get a human readable description of what this
Process
does.
- Returns
The description.
- Return type
dict
- classmethod
get_name
()[source] Return the process name, i.e. the name of the class that inherits from Process.
- Returns
The process name
- Return type
str
- classmethod
get_states
()[source]Return all allowed states of the process.
get_status_info
(out_status_info)[source]Get updated status information of process.
- Parameters
out_status_info (dict) – the old status
has_terminated
()[source] Return whether the process has terminated.
- Return type
bool. True if the process is in a terminal state.
- property
inputs
Get the parsed inputs.
- property
is_successful
Return whether the result of the process is considered successful.
- Returns
boolean, True if the process is in Finished state with successful attribute set to True
kill
(msg: Optional[str] = None) → Union[bool, _asyncio.Future][source] Kill the process.
- Parameters
msg – An optional kill message
killed
()[source]Return whether the process is killed.
killed_msg
()[source] Get the kill message.
launch
(process_class, inputs=None, pid=None, logger=None)[source] Start running the nested process asynchronously without blocking other tasks in the event loop.
load_instance_state
(saved_state, load_context)[source]Load the process from its saved instance state.
- Parameters
saved_state – A bundle to load the state from
load_context – The load context
log_with_pid
(level, msg)[source] Log a message together with the process pid.
- property
logger
Get the logger for this class. If not set, return the default logger.
- Returns
The logger.
- Return type
logging.Logger
- property
loop
Return the event loop of the process.
message_receive
(_comm, msg)[source]Coroutine called when the process receives a message from the communicator
- Parameters
_comm (
kiwipy.Communicator
) – the communicator that sent the message
msg – the message
- Returns
the outcome of processing the message, the return value will be sent back as a response to the sender
on_output_emitting
(output_port, value)[source]Output is about to be emitted.
on_terminated
()[source] Called when a terminal state is reached.
out
(output_port, value)[source]Record an output value for a specific output port. If the output port matches an explicitly defined Port it will be validated against that. If not it will be validated against the PortNamespace, which means it will be checked for dynamicity and whether the type of the value is valid
- Parameters
output_port (str) – the name of the output port, can be namespaced
value – the value for the output port
- Raises
ValueError if the output value is not validated against the port
- property
outputs
Get the current outputs emitted by the Process. These may grow over time as the process runs.
- Returns
A mapping of {output_port: value} outputs
- Return type
dict
pause
(msg: Optional[str] = None) → Union[bool, _asyncio.Future][source]Pause the process. Returns True if after this call the process is paused, False otherwise
- Parameters
msg – an optional message to set as the status. The current status will be saved in the private _pre_paused_status attribute, such that it can be restored when the process is played again.
- property
paused
Return whether the process is paused.
- Return type
bool
- property
pid
Get the pid of the process
play
()[source]Play a process. Returns True if after this call the process is playing, False otherwise
- Returns
True if playing, False otherwise
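The status handling described for pause and play (save the current status on pause, restore it on play) can be sketched as follows. `PausableSketch` is a hypothetical, purely synchronous stand-in: the real methods may also return a future, which this sketch does not model.

```python
class PausableSketch:
    """Hypothetical stand-in illustrating pause()/play() status handling."""

    def __init__(self, status=None):
        self.status = status
        self.paused = False
        self._pre_paused_status = None

    def pause(self, msg=None):
        """Pause, remembering the current status; return True if paused."""
        if not self.paused:
            self._pre_paused_status = self.status
            self.status = msg
            self.paused = True
        return self.paused

    def play(self):
        """Resume, restoring the pre-pause status; return True if playing."""
        if self.paused:
            self.status = self._pre_paused_status
            self._pre_paused_status = None
            self.paused = False
        return not self.paused


proc = PausableSketch(status="Waiting for job")
proc.pause("Paused by user")   # status is now "Paused by user"
proc.play()                    # status is restored to "Waiting for job"
```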
- property
raw_inputs
The AttributesFrozendict of raw inputs, if the inputs are not None.
- classmethod
recreate_from
(saved_state, load_context=None)[source]Recreate a process from a saved state, passing any positional and keyword arguments on to load_instance_state
- Parameters
saved_state – The saved state to load from
load_context (
persistence.LoadSaveContext
) – The load context to use
- Returns
An instance of the object with its state loaded from the save state.
- Return type
Process
recreate_state
(saved_state)[source]Create a state object from a saved state
- Parameters
saved_state (
Bundle
) – The saved state
- Returns
An instance of the object with its state loaded from the save state.
remove_process_listener
(listener)[source] Remove a process listener from the process.
- Parameters
listener (
plumpy.ProcessListener
) – a process listener.
result
()[source] Get the result from the process if it is finished. If the process was killed, a KilledError will be raised. If the process has excepted, the failing exception will be raised. In any other state, an InvalidStateError will be raised.
- Returns
The result of the process
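The four outcomes of result described above can be summarised in a small dispatch function. This is a stand-alone sketch: the exception classes are local stand-ins defined here, the string state names are illustrative, and the free function `result` only mimics the documented behaviour of the method.

```python
class KilledError(Exception):
    """Local stand-in for the error raised when a process was killed."""


class InvalidStateError(Exception):
    """Local stand-in for the error raised in a non-terminal state."""


def result(state, outputs=None, exception=None, killed_msg=None):
    """Mimic the documented behaviour of result() for a given state name."""
    if state == "FINISHED":
        return outputs
    if state == "KILLED":
        raise KilledError(killed_msg)
    if state == "EXCEPTED":
        # Re-raise the exception that caused the process to fail.
        raise exception
    raise InvalidStateError(f"no result available in state {state}")
```

A caller that cannot be sure the process has finished would typically check the state first, or wrap the call in a try/except for these errors.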
resume
(*args)[source]Start running the process again
run
()[source] This function is run when the process is triggered. It should be overridden by the subclass.
save_instance_state
(out_state, save_context)[source]Ask the process to save its current instance state.
- Parameters
out_state (
plumpy.Bundle
) – A bundle to save the state to
save_context – The save context
set_logger
(logger)[source]Set the logger of the process.
set_status
(status)[source]Set the status message of the process.
- property
status
Get the status message of the process.
step
()[source] Run a single step. A step runs synchronously within its own process and asynchronously with respect to steps in other processes. What is executed in this method depends on the current state of the process.
- async
step_until_terminated
()[source] This is the function that is actually run by the event loop. If the process has not terminated, run the current step and wait until the step has finished.
successful
()[source] Returns whether the result of the process is considered successful. Will raise if the process is not in the FINISHED state.
- property
uuid
Get the uuid of the process
Finite State Machine
Implement this API to support a new state machine.
- class
plumpy.base.state_machine.
State
(state_machine)[source]
execute
()[source]Execute the state, performing the actions that this state is responsible for. Return a state to transition to or None if finished.
- property
label
Convenience property to get the state label
- class
plumpy.base.state_machine.
StateMachine
(*args, **kwargs)[source]
add_state_event_callback
(hook, callback)[source]Add a callback to be called on a particular state event hook. The callback should have form fn(state_machine, hook, state)
- Parameters
hook – The state event hook
callback – The callback function
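The callback form fn(state_machine, hook, state) can be illustrated with a small registry. `HookRegistry`, its `fire` method and the hook name "entered" are hypothetical and exist only for this sketch; only the callback signature is taken from the description above.

```python
from collections import defaultdict


class HookRegistry:
    """Hypothetical stand-in for the callback bookkeeping of a state machine."""

    def __init__(self):
        self._callbacks = defaultdict(list)

    def add_state_event_callback(self, hook, callback):
        """Register callback(state_machine, hook, state) for the given hook."""
        self._callbacks[hook].append(callback)

    def fire(self, hook, state):
        """Call every callback registered for this hook (illustrative helper)."""
        for callback in self._callbacks[hook]:
            callback(self, hook, state)


seen = []
machine = HookRegistry()
machine.add_state_event_callback(
    "entered", lambda sm, hook, state: seen.append((hook, state))
)
machine.fire("entered", "RUNNING")
```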
- static
transition_failed
(initial_state, final_state, exception, trace)[source] Called when a state transition fails. This method can be overridden to change the default behaviour, which is to raise the exception.
- Parameters
exception (
Exception
) – The transition failed exception
WorkChain
Implement this API to support a new WorkChain.
- class
plumpy.workchains.
WorkChain
(*args, **kwargs)[source]A WorkChain is a series of instructions carried out with the ability to save state in between.
load_instance_state
(saved_state, load_context)[source]Load the process from its saved instance state.
- Parameters
saved_state – A bundle to load the state from
load_context – The load context
run
()[source] This function is run when the process is triggered. It should be overridden by the subclass.
save_instance_state
(out_state, save_context)[source]Ask the process to save its current instance state.
- Parameters
out_state (
plumpy.Bundle
) – A bundle to save the state to
save_context – The save context
to_context
(**kwargs)[source]This is a convenience method that provides syntactic sugar, for a user to add multiple intersteps that will assign a certain value to the corresponding key in the context of the workchain
Controller
-
class
plumpy.process_comms.
RemoteProcessThreadController
(communicator)[source] A class that can be used to control and launch remote processes
-
execute_process
(process_class, init_args=None, init_kwargs=None, loader=None, nowait=False, no_reply=False)[source] Execute a process. This call will first send a create task and then a continue task over the communicator. This means that if communicator messages are durable then the process will run until the end even if this interpreter instance ceases to exist.
- Parameters
process_class – the process class to execute
init_args – the positional arguments to the class constructor
init_kwargs – the keyword arguments to the class constructor
loader – the class loader to use
nowait – if True, don’t wait for the process to send a response
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns
the result of executing the process
-
get_status
(pid)[source] Get the status of a process with the given PID.
- Parameters
pid – the process id
- Returns
the status response from the process
-
kill_all
(msg)[source] Kill all processes that are subscribed to the same communicator
- Parameters
msg – an optional kill message
-
kill_process
(pid, msg=None)[source] Kill the process
- Parameters
pid – the pid of the process to kill
msg – optional kill message
- Returns
a response future from the process to be killed
- Return type
kiwipy.Future
-
launch_process
(process_class, init_args=None, init_kwargs=None, persist=False, loader=None, nowait=False, no_reply=False)[source] Launch the process
- Parameters
process_class – the process class to launch
init_args – positional arguments to the process constructor
init_kwargs – keyword arguments to the process constructor
persist – should the process be persisted
loader – the class loader to use
nowait – if True, return without waiting for the process to finish
no_reply – don’t send a reply to the sender
- Returns
the pid of the created process or the outputs (if nowait=False)
-
pause_all
(msg)[source] Pause all processes that are subscribed to the same communicator
- Parameters
msg – an optional pause message
-
pause_process
(pid, msg=None)[source] Pause the process
- Parameters
pid – the pid of the process to pause
msg – optional pause message
- Returns
a response future from the process to be paused
- Return type
kiwipy.Future
-
play_all
()[source] Play all processes that are subscribed to the same communicator
-
play_process
(pid)[source] Play the process
- Parameters
pid – the pid of the process to play
- Returns
a response future from the process to be played
- Return type
kiwipy.Future
-
task_send
(message, no_reply=False)[source] Send a task to be performed using the communicator
- Parameters
message – the task message
no_reply – if True, this call will be fire-and-forget, i.e. no return value
- Returns
the response from the remote side (if no_reply=False)