Quickstart Examples

Creating and running a basic Process

A Plumpy process can be created and run as follows:

  1. Copy and paste the following code block into a new file called helloWorld.py:

# -*- coding: utf-8 -*-
import plumpy


class HelloWorld(plumpy.Process):
    """Minimal process that turns a ``name`` input into a ``greeting`` output."""

    @classmethod
    def define(cls, spec):
        """Declare the ports: input ``name`` (default 'World'), output ``greeting`` (str)."""
        super().define(spec)
        spec.input('name', required=True, default='World')
        spec.output('greeting', valid_type=str)

    def run(self):
        """Emit the greeting built from the ``name`` input, then stop."""
        greeting = 'Hello {:}!'.format(self.inputs.name)
        self.out('greeting', greeting)
        # NOTE(review): presumably Stop(result=None, successful=True) -- confirm
        # the argument meaning against the plumpy API.
        return plumpy.Stop(None, True)


def launch():
    """Run HelloWorld twice: once with an explicit name, once with the default.

    Prints the process state before and after the first execution, followed by
    the ``greeting`` output of each run.
    """
    state_template = 'Process State: {:}'

    # First run: pass the 'name' input explicitly.
    process = HelloWorld(inputs={'name': 'foobar'})
    print(state_template.format(process.state))
    process.execute()
    print(state_template.format(process.state))

    greeting = process.outputs['greeting']
    print('{:}'.format(greeting))

    # Second run: no inputs given, so the default declared for 'name' is used.
    process = HelloWorld()
    process.execute()
    greeting = process.outputs['greeting']
    print('{:}'.format(greeting))


# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    launch()
  2. Run the process:

    (venv) $ python helloWorld.py
    

A Process can wait, pause, play and resume

The example below shows how the process state transitions in response to different actions:

# -*- coding: utf-8 -*-
from kiwipy import rmq
import plumpy


class WaitForResumeProc(plumpy.Process):
    """Process whose ``run`` step hands back a ``plumpy.Wait`` command."""

    def run(self):
        """Print the current state and return a Wait naming the follow-up step."""
        message = 'Now I am running: {:}'.format(self.state)
        print(message)
        # The Wait command is given the method to continue with afterwards.
        return plumpy.Wait(self.after_resume_and_exec)

    def after_resume_and_exec(self):
        """Print the current state; this is the step passed to ``plumpy.Wait``."""
        message = 'After resume from waiting state: {:}'.format(self.state)
        print(message)


# Keyword arguments for the communicator connection: an AMQP URL pointing at
# localhost with guest credentials, plus exchange and queue names that all
# share the same '<name>.<id>' identifier.
kwargs = dict(
    connection_params=dict(url='amqp://guest:guest@127.0.0.1:5672/'),
    message_exchange='{}.{}'.format('WaitForResume', 'uuid-0'),
    task_exchange='{}.{}'.format('WaitForResume', 'uuid-0'),
    task_queue='{}.{}'.format('WaitForResume', 'uuid-0'),
)

# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    with rmq.RmqThreadCommunicator.connect(**kwargs) as communicator:
        # The process and the controller share the same communicator.
        proc = WaitForResumeProc(communicator=communicator)
        controller = plumpy.RemoteProcessThreadController(communicator)

        # Each controller call returns a future; .result() blocks until it resolves.
        print(controller.get_status(proc.pid).result())  # expected: paused False

        controller.pause_process(proc.pid)
        print(controller.get_status(proc.pid).result())  # expected: paused True

        controller.play_process(proc.pid)
        print(controller.get_status(proc.pid).result())  # expected: paused False

Remote controlled process

process start.

script to kill that process

Creating and running a basic WorkChain

A WorkChain is a special process that can string several small functions together into a single independent process.

See the example below:

# -*- coding: utf-8 -*-
import plumpy


class AddAndMulWF(plumpy.WorkChain):
    """Work chain computing ``(ini + add) * mul`` in two outlined steps."""

    @classmethod
    def define(cls, spec):
        """Declare the inputs, the ``result`` output and the step outline.

        Inputs: ``ini`` (float, default 1.0), ``add`` (int, required) and
        ``mul`` (int, required). Output: ``result`` (float).
        """
        super().define(spec)
        spec.input('ini', default=1.0, valid_type=float)
        spec.input('add', required=True, valid_type=int)
        spec.input('mul', required=True, valid_type=int)
        spec.output('result', valid_type=float)
        # Steps are listed in order: add first, then mul.
        spec.outline(cls.add, cls.mul)

    def add(self):
        """Store ``ini + add`` in the context for the next step."""
        start = self.inputs.ini
        increment = self.inputs.add
        self.ctx.addresult = start + increment

    def mul(self):
        """Multiply the stored sum by ``mul`` and emit it as ``result``."""
        factor = self.inputs.mul
        self.out('result', self.ctx.addresult * factor)


def launch():
    """Execute AddAndMulWF with ini=10.0, add=1, mul=2 and print its result."""
    inputs = {'ini': 10.0, 'add': 1, 'mul': 2}
    workchain = AddAndMulWF(inputs=inputs)
    workchain.execute()
    result = workchain.outputs['result']
    print(result)  # (10.0 + 1) * 2 -> prints 22.0


# Run the example only when this file is executed as a script.
if __name__ == '__main__':
    launch()