Quickstart Examples
Creating and running a basic Process
A Plumpy process can be created and run with:
Copy and paste the following code block into a new file called helloWorld.py:
# -*- coding: utf-8 -*-
import plumpy
class HelloWorld(plumpy.Process):
    """Minimal process that emits a greeting built from its ``name`` input."""

    @classmethod
    def define(cls, spec):
        """Declare the ``name`` input and the ``greeting`` output on ``spec``."""
        super().define(spec)
        spec.input('name', default='World', required=True)
        spec.output('greeting', valid_type=str)

    def run(self):
        """Emit the ``greeting`` output and return a ``plumpy.Stop`` command."""
        message = 'Hello {:}!'.format(self.inputs.name)
        self.out('greeting', message)
        return plumpy.Stop(None, True)
def launch():
    """Execute ``HelloWorld`` with an explicit name, then with default inputs.

    The process state is printed before and after the first execution, and
    the ``greeting`` output is printed after each execution.
    """
    state_template = 'Process State: {:}'

    proc = HelloWorld(inputs={'name': 'foobar'})
    print(state_template.format(proc.state))
    proc.execute()
    print(state_template.format(proc.state))
    print('{:}'.format(proc.outputs['greeting']))

    # default inputs
    proc = HelloWorld()
    proc.execute()
    print('{:}'.format(proc.outputs['greeting']))


if __name__ == '__main__':
    launch()
Run the process:
(venv) $ python helloWorld.py
A Process can wait, pause, play and resume
The example below shows how the process state transitions in response to different actions:
# -*- coding: utf-8 -*-
from kiwipy import rmq
import plumpy
class WaitForResumeProc(plumpy.Process):
    """Process whose ``run`` step reports its state and then returns a wait."""

    def run(self):
        """Print the current state and return ``plumpy.Wait``.

        ``after_resume_and_exec`` is handed to ``plumpy.Wait`` as its argument.
        """
        current_state = self.state
        print('Now I am running: {:}'.format(current_state))
        return plumpy.Wait(self.after_resume_and_exec)

    def after_resume_and_exec(self):
        """Print the state the process is in when this step is invoked."""
        current_state = self.state
        print('After resume from waiting state: {:}'.format(current_state))
# Keyword arguments later unpacked into ``rmq.RmqThreadCommunicator.connect``.
# The message exchange, task exchange and task queue all share the same name.
kwargs = {
    'connection_params': {'url': 'amqp://guest:guest@127.0.0.1:5672/'},
    **{
        option: '{}.{}'.format('WaitForResume', 'uuid-0')
        for option in ('message_exchange', 'task_exchange', 'task_queue')
    },
}
if __name__ == '__main__':
    # Open the communicator with the settings in ``kwargs``; the ``with``
    # block closes it again on exit.
    with rmq.RmqThreadCommunicator.connect(**kwargs) as communicator:
        proc = WaitForResumeProc(communicator=communicator)
        process_controller = plumpy.RemoteProcessThreadController(communicator)

        # Query the status before any action is sent.
        print(process_controller.get_status(proc.pid).result())  # pause: False

        # Ask the process to pause, then query the status again.
        process_controller.pause_process(proc.pid)
        print(process_controller.get_status(proc.pid).result())  # pause: True

        # Ask the process to play, then query the status once more.
        process_controller.play_process(proc.pid)
        print(process_controller.get_status(proc.pid).result())  # pause: False
Creating and running a basic WorkChain
The WorkChain is a special process that can string different small functions together into an independent process.
See the example below:
# -*- coding: utf-8 -*-
import plumpy
class AddAndMulWF(plumpy.WorkChain):
    """Work chain that computes ``(ini + add) * mul`` in two outline steps."""

    @classmethod
    def define(cls, spec):
        """Declare the inputs, the ``result`` output and the step outline."""
        super().define(spec)
        spec.input('ini', valid_type=float, default=1.0)
        # Both operands are declared the same way: required integers.
        for operand in ('add', 'mul'):
            spec.input(operand, valid_type=int, required=True)
        spec.output('result', valid_type=float)
        spec.outline(cls.add, cls.mul)

    def add(self):
        """Store ``ini + add`` in the context as ``addresult``."""
        inputs = self.inputs
        self.ctx.addresult = inputs.ini + inputs.add

    def mul(self):
        """Multiply the stored sum by ``mul`` and emit it as ``result``."""
        self.out('result', self.ctx.addresult * self.inputs.mul)
def launch():
    """Execute ``AddAndMulWF`` with fixed inputs and print its ``result``."""
    inputs = {'ini': 10.0, 'add': 1, 'mul': 2}
    workchain = AddAndMulWF(inputs=inputs)
    workchain.execute()
    print(workchain.outputs['result'])  # prints 22.0


if __name__ == '__main__':
    launch()