Examples

While developing TaskFlow, the team has worked hard to make sure the various concepts are explained by relevant examples. Here are a few selected examples to get you started (ordered by perceived complexity).

To explore more of these examples, please check out the examples directory in the TaskFlow source tree.

Note

If the examples provided are not satisfactory (or not up to your standards), contributions to improve them are welcome and very much appreciated. The higher the quality and the clearer the examples are, the more useful they are for everyone.

Hello world

Note

Full source located at hello_world.

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13from taskflow import engines
14from taskflow.patterns import linear_flow as lf
15from taskflow.patterns import unordered_flow as uf
16from taskflow import task
17
18
19# INTRO: This is the de facto hello world equivalent for taskflow; it shows how
20# an overly simplistic workflow can be created and then run using different
21# engines with different styles of execution (all can be used to run in
22# parallel if a workflow is provided that is parallelizable).
23
24class PrinterTask(task.Task):
25    def __init__(self, name, show_name=True, inject=None):
26        super(PrinterTask, self).__init__(name, inject=inject)
27        self._show_name = show_name
28
29    def execute(self, output):
30        if self._show_name:
31            print("%s: %s" % (self.name, output))
32        else:
33            print(output)
34
35
36# This will be the work that we want done, which for this example is just to
37# print 'hello world' (like a song) using different tasks and different
38# execution models.
39song = lf.Flow("beats")
40
41# Unordered flows, when run, can be run in parallel; and a chorus is everyone
42# singing at once of course!
43hi_chorus = uf.Flow('hello')
44world_chorus = uf.Flow('world')
45for (name, hello, world) in [('bob', 'hello', 'world'),
46                             ('joe', 'hellooo', 'worllllld'),
47                             ('sue', "helloooooo!", 'wooorllld!')]:
48    hi_chorus.add(PrinterTask("%s@hello" % name,
49                              # This will be passed to the execute() method of
50                              # the task as the argument named 'output' (which
51                              # allows us to print the phrase we want).
52                              inject={'output': hello}))
53    world_chorus.add(PrinterTask("%s@world" % name,
54                                 inject={'output': world}))
55
56# The composition starts with the conductor and then runs in sequence with
57# the chorus running in parallel, but no matter what the 'hello' chorus must
58# always run before the 'world' chorus (otherwise the world will fall apart).
59song.add(PrinterTask("conductor@begin",
60                     show_name=False, inject={'output': "*ding*"}),
61         hi_chorus,
62         world_chorus,
63         PrinterTask("conductor@end",
64                     show_name=False, inject={'output': "*dong*"}))
65
66# Run in parallel using eventlet green threads...
67try:
68    import eventlet as _eventlet  # noqa
69except ImportError:
70    # No eventlet currently active, skip running with it...
71    pass
72else:
73    print("-- Running in parallel using eventlet --")
74    e = engines.load(song, executor='greenthreaded', engine='parallel',
75                     max_workers=1)
76    e.run()
77
78
79# Run in parallel using real threads...
80print("-- Running in parallel using threads --")
81e = engines.load(song, executor='threaded', engine='parallel',
82                 max_workers=1)
83e.run()
84
85
86# Run in parallel using external processes...
87print("-- Running in parallel using processes --")
88e = engines.load(song, executor='processes', engine='parallel',
89                 max_workers=1)
90e.run()
91
92
93# Run serially (aka, if the workflow could have been run in parallel, it will
94# not be when run in this mode)...
95print("-- Running serially --")
96e = engines.load(song, engine='serial')
97e.run()
98print("-- Statistics gathered --")
99print(e.statistics)
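
The load()/run() pair used above can also be collapsed into the single-call helper that several of the later examples use; a minimal sketch (same flow, same engine options as the threaded run above):

# Build an engine for 'song' and run it immediately; keyword arguments are
# passed through to the engine loader just like with engines.load().
engines.run(song, engine='parallel', executor='threaded', max_workers=1)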

Passing values from and to tasks

Note

Full source located at simple_linear_pass.

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8self_dir = os.path.abspath(os.path.dirname(__file__))
 9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10                                       os.pardir,
11                                       os.pardir))
12sys.path.insert(0, top_dir)
13sys.path.insert(0, self_dir)
14
15from taskflow import engines
16from taskflow.patterns import linear_flow
17from taskflow import task
18
19# INTRO: This example shows how a task (in a linear/serial workflow) can
20# produce an output that can then be consumed/used by a downstream task.
21
22
23class TaskA(task.Task):
24    default_provides = 'a'
25
26    def execute(self):
27        print("Executing '%s'" % (self.name))
28        return 'a'
29
30
31class TaskB(task.Task):
32    def execute(self, a):
33        print("Executing '%s'" % (self.name))
34        print("Got input '%s'" % (a))
35
36
37print("Constructing...")
38wf = linear_flow.Flow("pass-from-to")
39wf.add(TaskA('a'), TaskB('b'))
40
41print("Loading...")
42e = engines.load(wf)
43
44print("Compiling...")
45e.compile()
46
47print("Preparing...")
48e.prepare()
49
50print("Running...")
51e.run()
52
53print("Done...")

Using listeners

Note

Full source located at echo_listener.

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.DEBUG)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13from taskflow import engines
14from taskflow.listeners import logging as logging_listener
15from taskflow.patterns import linear_flow as lf
16from taskflow import task
17
18# INTRO: This example walks through a miniature workflow which will do a
19# simple echo operation; during this execution a listener is associated with
20# the engine to receive all notifications about what the flow has performed;
21# this example dumps that output to stdout for viewing (at debug level
22# to show all the information that is available).
23
24
25class Echo(task.Task):
26    def execute(self):
27        print(self.name)
28
29
30# Generate the work to be done (but don't do it yet).
31wf = lf.Flow('abc')
32wf.add(Echo('a'))
33wf.add(Echo('b'))
34wf.add(Echo('c'))
35
36# This will associate the listener with the engine (the listener
37# will automatically register for notifications with the engine and deregister
38# when the context is exited).
39e = engines.load(wf)
40with logging_listener.DynamicLoggingListener(e):
41    e.run()
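
If full debug logging is more output than you want, other listeners can be attached in the same with-block style; for example the printing listener (also used by the 'Creating a volume (in parallel)' example further below). A small sketch:

from taskflow.listeners import printing

e = engines.load(wf)
# Prints the engine's state transitions straight to stdout instead of
# routing them through the logging module.
with printing.PrintingListener(e):
    e.run()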

Using listeners (to watch a phone call)

Note

Full source located at simple_linear_listening.

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13import taskflow.engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import task
16from taskflow.types import notifier
17
18ANY = notifier.Notifier.ANY
19
20# INTRO: In this example we create two tasks (this time as functions instead
21# of task subclasses as in the simple_linear.py example), each of which ~calls~
22# a given ~phone~ number (provided as a function input) in a linear fashion
23# (one after the other).
24#
25# For a workflow which is serial this shows an extremely simple way
26# of structuring your tasks (the code that does the work) into a linear
27# sequence (the flow) and then passing the work off to an engine, with some
28# initial data to be run in a reliable manner.
29#
30# This example shows a basic usage of the taskflow structures without involving
31# the complexity of persistence. Using the structures that taskflow provides
32# via tasks and flows makes it possible for you to easily at a later time
33# hook in a persistence layer (and then gain the functionality that offers)
34# when you decide the complexity of adding that layer in is 'worth it' for your
35# application's usage pattern (which some applications may not need).
36#
37# It **also** adds on to the simple_linear.py example by adding a set of
38# callback functions which the engine will call when a flow state transition
39# or task state transition occurs. These types of functions are useful for
40# updating task or flow progress, or for debugging, sending notifications to
41# external systems, or for other yet unknown future usage that you may create!
42
43
44def call_jim(context):
45    print("Calling jim.")
46    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
47
48
49def call_joe(context):
50    print("Calling joe.")
51    print("Context = %s" % (sorted(context.items(), key=lambda x: x[0])))
52
53
54def flow_watch(state, details):
55    print('Flow => %s' % state)
56
57
58def task_watch(state, details):
59    print('Task %s => %s' % (details.get('task_name'), state))
60
61
62# Wrap your functions into a task type that knows how to treat your functions
63# as tasks. There was previous work done to just allow a function to be
64# directly passed, but in Python 3 there is no easy way to capture an
65# instance method, so this wrapping approach was decided upon instead which
66# can attach to instance methods (if that's desired).
67flow = lf.Flow("Call-them")
68flow.add(task.FunctorTask(execute=call_jim))
69flow.add(task.FunctorTask(execute=call_joe))
70
71# Now load (but do not run) the flow using the provided initial data.
72engine = taskflow.engines.load(flow, store={
73    'context': {
74        "joe_number": 444,
75        "jim_number": 555,
76    }
77})
78
79# This is where we attach our callback functions to the 2 different
80# notification objects that an engine exposes. The usage of ANY (kleene star)
81# here means that we want to be notified on all state changes; if you want to
82# restrict to a specific state change, just register that instead.
83engine.notifier.register(ANY, flow_watch)
84engine.atom_notifier.register(ANY, task_watch)
85
86# And now run!
87engine.run()
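
As the comment above notes, registration does not have to use ANY; to hear about only one particular transition, register that state's name instead. A minimal sketch (assuming the state constants from taskflow.states):

from taskflow import states

# Only fire the callbacks when the flow (or a task) reaches the SUCCESS
# state, instead of on every state change.
engine.notifier.register(states.SUCCESS, flow_watch)
engine.atom_notifier.register(states.SUCCESS, task_watch)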

Dumping an in-memory backend

Note

Full source located at dump_memory_backend.

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8self_dir = os.path.abspath(os.path.dirname(__file__))
 9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
10                                       os.pardir,
11                                       os.pardir))
12sys.path.insert(0, top_dir)
13sys.path.insert(0, self_dir)
14
15from taskflow import engines
16from taskflow.patterns import linear_flow as lf
17from taskflow import task
18
19# INTRO: in this example we create a dummy flow with a few dummy tasks, run
20# it using an in-memory backend, and pre/post run we dump out the contents
21# of the in-memory backend's tree structure (which can be quite useful to
22# look at for debugging or other analysis).
23
24
25class PrintTask(task.Task):
26    def execute(self):
27        print("Running '%s'" % self.name)
28
29# Make a little flow and run it...
30f = lf.Flow('root')
31for alpha in ['a', 'b', 'c']:
32    f.add(PrintTask(alpha))
33
34e = engines.load(f)
35e.compile()
36e.prepare()
37
38# After prepare the storage layer + backend can now be accessed safely...
39backend = e.storage.backend
40
41print("----------")
42print("Before run")
43print("----------")
44print(backend.memory.pformat())
45print("----------")
46
47e.run()
48
49print("---------")
50print("After run")
51print("---------")
52for path in backend.memory.ls_r(backend.memory.root_path, absolute=True):
53    value = backend.memory[path]
54    if value:
55        print("%s -> %s" % (path, value))
56    else:
57        print("%s" % (path))

Making phone calls

Note

Full source located at simple_linear.

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13import taskflow.engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import task
16
17# INTRO: In this example we create two tasks, each of which ~calls~ a given
18# ~phone~ number (provided as a function input) in a linear fashion (one after
19# the other). For a workflow which is serial this shows an extremely simple way
20# of structuring your tasks (the code that does the work) into a linear
21# sequence (the flow) and then passing the work off to an engine, with some
22# initial data to be run in a reliable manner.
23#
24# NOTE(harlowja): This example shows a basic usage of the taskflow structures
25# without involving the complexity of persistence. Using the structures that
26# taskflow provides via tasks and flows makes it possible for you to easily at
27# a later time hook in a persistence layer (and then gain the functionality
28# that offers) when you decide the complexity of adding that layer in
29# is 'worth it' for your application's usage pattern (which certain
30# applications may not need).
31
32
33class CallJim(task.Task):
34    def execute(self, jim_number, *args, **kwargs):
35        print("Calling jim %s." % jim_number)
36
37
38class CallJoe(task.Task):
39    def execute(self, joe_number, *args, **kwargs):
40        print("Calling joe %s." % joe_number)
41
42
43# Create your flow and associated tasks (the work to be done).
44flow = lf.Flow('simple-linear').add(
45    CallJim(),
46    CallJoe()
47)
48
49# Now run that flow using the provided initial data (store below).
50taskflow.engines.run(flow, store=dict(joe_number=444,
51                                      jim_number=555))

Making phone calls (automatically reverting)

Note

Full source located at reverting_linear.

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13import taskflow.engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import task
16
17# INTRO: In this example we create three tasks, each of which ~calls~ a given
18# number (provided as a function input); one of those tasks *fails* calling a
19# given number (the suzzie calling); this causes the workflow to enter the
20# reverting process, which activates the revert methods of the previous two
21# phone ~calls~.
22#
23# This simulated calling makes it appear like all three calls occur or all
24# three don't occur (transaction-like capabilities). No persistence layer is
25# used here so reverting and executing will *not* be tolerant of process
26# failure.
27
28
29class CallJim(task.Task):
30    def execute(self, jim_number, *args, **kwargs):
31        print("Calling jim %s." % jim_number)
32
33    def revert(self, jim_number, *args, **kwargs):
34        print("Calling %s and apologizing." % jim_number)
35
36
37class CallJoe(task.Task):
38    def execute(self, joe_number, *args, **kwargs):
39        print("Calling joe %s." % joe_number)
40
41    def revert(self, joe_number, *args, **kwargs):
42        print("Calling %s and apologizing." % joe_number)
43
44
45class CallSuzzie(task.Task):
46    def execute(self, suzzie_number, *args, **kwargs):
47        raise IOError("Suzzie not home right now.")
48
49
50# Create your flow and associated tasks (the work to be done).
51flow = lf.Flow('simple-linear').add(
52    CallJim(),
53    CallJoe(),
54    CallSuzzie()
55)
56
57try:
58    # Now run that flow using the provided initial data (store below).
59    taskflow.engines.run(flow, store=dict(joe_number=444,
60                                          jim_number=555,
61                                          suzzie_number=666))
62except Exception as e:
63    # NOTE(harlowja): This exception will be the exception that came out of the
64# 'CallSuzzie' task instead of a different exception; this is useful since
65    # typically surrounding code wants to handle the original exception and not
66    # a wrapped or altered one.
67    #
68    # *WARNING* If this flow was multi-threaded and multiple active tasks threw
69    # exceptions then the above exception would be wrapped into a combined
70    # exception (the object has methods to iterate over the contained
71    # exceptions). See: exceptions.py and the class 'WrappedFailure' to look at
72    # how to deal with multiple tasks failing while running.
73    #
74    # You will also note that this is not a problem in this case since no
75    # parallelism is involved; this is ensured by the usage of a linear flow
76# and the default engine type which is 'serial' rather than 'parallel'.
77    print("Flow failed: %s" % e)

Building a car

Note

Full source located at build_a_car.

  1
  2import logging
  3import os
  4import sys
  5
  6
  7logging.basicConfig(level=logging.ERROR)
  8
  9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 10                                       os.pardir,
 11                                       os.pardir))
 12sys.path.insert(0, top_dir)
 13
 14
 15import taskflow.engines
 16from taskflow.patterns import graph_flow as gf
 17from taskflow.patterns import linear_flow as lf
 18from taskflow import task
 19from taskflow.types import notifier
 20
 21ANY = notifier.Notifier.ANY
 22
 23import example_utils as eu  # noqa
 24
 25
 26# INTRO: This example shows how a graph flow and linear flow can be used
 27# together to execute dependent & non-dependent tasks by going through the
 28# steps required to build a simplistic car (an assembly line if you will). It
 29# also shows how raw functions can be wrapped into a task object instead of
 30# being forced to use the more *heavy* task base class. This is useful in
 31# scenarios where pre-existing code has functions that you want to easily
 32# plug in to taskflow, without requiring a large amount of code changes.
 33
 34
 35def build_frame():
 36    return 'steel'
 37
 38
 39def build_engine():
 40    return 'honda'
 41
 42
 43def build_doors():
 44    return '2'
 45
 46
 47def build_wheels():
 48    return '4'
 49
 50
 51# These just return true to indicate success; in the real world they would
 52# do more than just that.
 53
 54def install_engine(frame, engine):
 55    return True
 56
 57
 58def install_doors(frame, windows_installed, doors):
 59    return True
 60
 61
 62def install_windows(frame, doors):
 63    return True
 64
 65
 66def install_wheels(frame, engine, engine_installed, wheels):
 67    return True
 68
 69
 70def trash(**kwargs):
 71    eu.print_wrapped("Throwing away pieces of car!")
 72
 73
 74def startup(**kwargs):
 75    # If you want to see the rollback function being activated try uncommenting
 76    # the following line.
 77    #
 78    # raise ValueError("Car not verified")
 79    return True
 80
 81
 82def verify(spec, **kwargs):
 83    # If the car is not what we ordered throw away the car (trigger reversion).
 84    for key, value in kwargs.items():
 85        if spec[key] != value:
 86            raise Exception("Car doesn't match spec!")
 87    return True
 88
 89
 90# These two functions connect into the state transition notification emission
 91# points that the engine outputs; they can be used to log state transitions
 92# that are occurring, or they can be used to suspend the engine (or perform
 93# other useful activities).
 94def flow_watch(state, details):
 95    print('Flow => %s' % state)
 96
 97
 98def task_watch(state, details):
 99    print('Task %s => %s' % (details.get('task_name'), state))
100
101
102flow = lf.Flow("make-auto").add(
103    task.FunctorTask(startup, revert=trash, provides='ran'),
104    # A graph flow allows automatic dependency-based ordering; the ordering
105    # is determined by analyzing the symbols required and provided and ordering
106    # execution based on a working order (if one exists).
107    gf.Flow("install-parts").add(
108        task.FunctorTask(build_frame, provides='frame'),
109        task.FunctorTask(build_engine, provides='engine'),
110        task.FunctorTask(build_doors, provides='doors'),
111        task.FunctorTask(build_wheels, provides='wheels'),
112        # These *_installed outputs allow for other tasks to depend on certain
113        # actions being performed (aka the components were installed); another
114        # way to do this is to link() the tasks manually instead of creating
115        # an 'artificial' data dependency that accomplishes the same goal as
116        # the manual linking would (see the sketch after this example).
117        task.FunctorTask(install_engine, provides='engine_installed'),
118        task.FunctorTask(install_doors, provides='doors_installed'),
119        task.FunctorTask(install_windows, provides='windows_installed'),
120        task.FunctorTask(install_wheels, provides='wheels_installed')),
121    task.FunctorTask(verify, requires=['frame',
122                                       'engine',
123                                       'doors',
124                                       'wheels',
125                                       'engine_installed',
126                                       'doors_installed',
127                                       'windows_installed',
128                                       'wheels_installed']))
129
130# This dictionary will be provided to the tasks as a specification for what
131# the tasks should produce; in this example this specification will influence
132# what those tasks do and what output they create. Different tasks depend on
133# different information from this specification, all of which will be provided
134# automatically by the engine to those tasks.
135spec = {
136    "frame": 'steel',
137    "engine": 'honda',
138    "doors": '2',
139    "wheels": '4',
140    # These are used to compare the result product, a car without the pieces
141    # installed is not a car after all.
142    "engine_installed": True,
143    "doors_installed": True,
144    "windows_installed": True,
145    "wheels_installed": True,
146}
147
148
149engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
150
151# This registers all (ANY) state transitions to trigger a call to the
152# flow_watch function for flow state transitions, and registers the
153# same all (ANY) state transitions for task state transitions.
154engine.notifier.register(ANY, flow_watch)
155engine.atom_notifier.register(ANY, task_watch)
156
157eu.print_wrapped("Building a car")
158engine.run()
159
160# Alter the specification and ensure that the reverting logic gets triggered,
161# since the car that gets built will have 2 doors (produced by the build_doors
162# function) and not the 5 now required; this will cause the verification
163# task to mark the car that is produced as not matching the desired spec.
164spec['doors'] = 5
165
166engine = taskflow.engines.load(flow, store={'spec': spec.copy()})
167engine.notifier.register(ANY, flow_watch)
168engine.atom_notifier.register(ANY, task_watch)
169
170eu.print_wrapped("Building a wrong car that doesn't match specification")
171try:
172    engine.run()
173except Exception as e:
174    eu.print_wrapped("Flow failed: %s" % e)

Iterating over the alphabet (using processes)

Note

Full source located at alphabet_soup.

 1
 2import fractions
 3import functools
 4import logging
 5import os
 6import string
 7import sys
 8import time
 9
10logging.basicConfig(level=logging.ERROR)
11
12self_dir = os.path.abspath(os.path.dirname(__file__))
13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
14                                       os.pardir,
15                                       os.pardir))
16sys.path.insert(0, top_dir)
17sys.path.insert(0, self_dir)
18
19from taskflow import engines
20from taskflow import exceptions
21from taskflow.patterns import linear_flow
22from taskflow import task
23
24
25# In this example we show how a simple linear set of tasks can be executed
26# using local processes (and not threads or remote workers) with minimal (if
27# any) modification to those tasks to make them safe to run in this mode.
28#
29# This is useful since it allows further scaling up your workflows when thread
30# execution starts to become a bottleneck (which it can start to be due to the
31# GIL in python). It also offers an intermediary scalable runner that can be
32# used when the scale and/or setup of remote workers is not desirable.
33
34
35def progress_printer(task, event_type, details):
36    # This callback, attached to each task, will be called in the local
37    # process (not the child processes)...
38    progress = details.pop('progress')
39    progress = int(progress * 100.0)
40    print("Task '%s' reached %d%% completion" % (task.name, progress))
41
42
43class AlphabetTask(task.Task):
44    # Second delay between each progress part.
45    _DELAY = 0.1
46
47    # This task will run in X main stages (each with a different progress
48    # report that will be delivered back to the running process...). The
49    # initial 0% and 100% are triggered automatically by the engine when
50    # a task is started and finished (so that's why those are not emitted
51    # here).
52    _PROGRESS_PARTS = [fractions.Fraction("%s/5" % x) for x in range(1, 5)]
53
54    def execute(self):
55        for p in self._PROGRESS_PARTS:
56            self.update_progress(p)
57            time.sleep(self._DELAY)
58
59
60print("Constructing...")
61soup = linear_flow.Flow("alphabet-soup")
62for letter in string.ascii_lowercase:
63    abc = AlphabetTask(letter)
64    abc.notifier.register(task.EVENT_UPDATE_PROGRESS,
65                          functools.partial(progress_printer, abc))
66    soup.add(abc)
67try:
68    print("Loading...")
69    e = engines.load(soup, engine='parallel', executor='processes')
70    print("Compiling...")
71    e.compile()
72    print("Preparing...")
73    e.prepare()
74    print("Running...")
75    e.run()
76    print("Done: %s" % e.statistics)
77except exceptions.NotImplementedError as e:
78    print(e)

Watching execution timing

Note

Full source located at timing_listener.

 1
 2import logging
 3import os
 4import random
 5import sys
 6import time
 7
 8logging.basicConfig(level=logging.ERROR)
 9
10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11                                       os.pardir,
12                                       os.pardir))
13sys.path.insert(0, top_dir)
14
15from taskflow import engines
16from taskflow.listeners import timing
17from taskflow.patterns import linear_flow as lf
18from taskflow import task
19
20# INTRO: in this example we will attach a listener to an engine and have
21# tasks with variable run times execute, showing how the listener will print
22# out how long those tasks took (when they started and when they finished).
23#
24# This shows how timing metrics can be gathered (or attached onto an engine)
25# after a workflow has been constructed, making it easy to gather metrics
26# dynamically for situations where this kind of information is applicable (or
27# even adding this information on at a later point in the future when your
28# application starts to slow down).
29
30
31class VariableTask(task.Task):
32    def __init__(self, name):
33        super(VariableTask, self).__init__(name)
34        self._sleepy_time = random.random()
35
36    def execute(self):
37        time.sleep(self._sleepy_time)
38
39
40f = lf.Flow('root')
41f.add(VariableTask('a'), VariableTask('b'), VariableTask('c'))
42e = engines.load(f)
43with timing.PrintingDurationListener(e):
44    e.run()

Distance calculator

Note

Full source located at distance_calculator.

 1
 2import collections
 3import math
 4import os
 5import sys
 6
 7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 8                                       os.pardir,
 9                                       os.pardir))
10sys.path.insert(0, top_dir)
11
12from taskflow import engines
13from taskflow.patterns import linear_flow
14from taskflow import task
15
16# INTRO: This shows how to use a task's/atom's ability to take requirements from
17# its execute function's default parameters, and shows how to provide those
18# via different methods when needed, in this case to influence those parameters
19# and calculate the distance between two points in 2D space.
20
21# A 2D point.
22Point = collections.namedtuple("Point", "x,y")
23
24
25def is_near(val, expected, tolerance=0.001):
26    # Floats don't really provide equality...
27    if val > (expected + tolerance):
28        return False
29    if val < (expected - tolerance):
30        return False
31    return True
32
33
34class DistanceTask(task.Task):
35    # See: http://en.wikipedia.org/wiki/Distance#Distance_in_Euclidean_space
36
37    default_provides = 'distance'
38
39    def execute(self, a=Point(0, 0), b=Point(0, 0)):
40        return math.sqrt(math.pow(b.x - a.x, 2) + math.pow(b.y - a.y, 2))
41
42
43if __name__ == '__main__':
44    # For these we rely on the execute() method's points by default being
45    # at the origin (and we override them with store values when we want) at
46    # execution time (which then influences what is calculated).
47    any_distance = linear_flow.Flow("origin").add(DistanceTask())
48    results = engines.run(any_distance)
49    print(results)
50    print("%s is near-enough to %s: %s" % (results['distance'],
51                                           0.0,
52                                           is_near(results['distance'], 0.0)))
53
54    results = engines.run(any_distance, store={'a': Point(1, 1)})
55    print(results)
56    print("%s is near-enough to %s: %s" % (results['distance'],
57                                           1.4142,
58                                           is_near(results['distance'],
59                                                   1.4142)))
60
61    results = engines.run(any_distance, store={'a': Point(10, 10)})
62    print(results)
63    print("%s is near-enough to %s: %s" % (results['distance'],
64                                           14.14199,
65                                           is_near(results['distance'],
66                                                   14.14199)))
67
68    results = engines.run(any_distance,
69                          store={'a': Point(5, 5), 'b': Point(10, 10)})
70    print(results)
71    print("%s is near-enough to %s: %s" % (results['distance'],
72                                           7.07106,
73                                           is_near(results['distance'],
74                                                   7.07106)))
75
76    # For this we use the ability to override at task creation time the
77    # optional arguments so that we don't need to continue to send them
78    # in via the 'store' argument like in the above (and we fix the new
79    # starting point 'a' at (10, 10) instead of (0, 0))...
80
81    ten_distance = linear_flow.Flow("ten")
82    ten_distance.add(DistanceTask(inject={'a': Point(10, 10)}))
83    results = engines.run(ten_distance, store={'b': Point(10, 10)})
84    print(results)
85    print("%s is near-enough to %s: %s" % (results['distance'],
86                                           0.0,
87                                           is_near(results['distance'], 0.0)))
88
89    results = engines.run(ten_distance)
90    print(results)
91    print("%s is near-enough to %s: %s" % (results['distance'],
92                                           14.14199,
93                                           is_near(results['distance'],
94                                                   14.14199)))

Table multiplier (in parallel)

Note

Full source located at parallel_table_multiply.

  1
  2import csv
  3import logging
  4import os
  5import random
  6import sys
  7
  8logging.basicConfig(level=logging.ERROR)
  9
 10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                       os.pardir,
 12                                       os.pardir))
 13sys.path.insert(0, top_dir)
 14
 15import futurist
 16from six.moves import range as compat_range
 17
 18from taskflow import engines
 19from taskflow.patterns import unordered_flow as uf
 20from taskflow import task
 21
 22# INTRO: This example walks through a miniature workflow which does a parallel
 23# table modification where each row in the table gets adjusted by a thread, or
 24# green thread (if eventlet is available) in parallel and then the result
 25# is reformed into a new table and some verifications are performed on it
 26# to ensure everything went as expected.
 27
 28
 29MULTIPLER = 10
 30
 31
 32class RowMultiplier(task.Task):
 33    """Performs a modification of an input row, creating a output row."""
 34
 35    def __init__(self, name, index, row, multiplier):
 36        super(RowMultiplier, self).__init__(name=name)
 37        self.index = index
 38        self.multiplier = multiplier
 39        self.row = row
 40
 41    def execute(self):
 42        return [r * self.multiplier for r in self.row]
 43
 44
 45def make_flow(table):
 46    # This creation will allow for parallel computation (since the flow here
 47    # is specifically unordered; and when things are unordered they have
 48    # no dependencies and when things have no dependencies they can just be
 49    # run at the same time, limited in concurrency by the executor or max
 50    # workers of that executor...)
 51    f = uf.Flow("root")
 52    for i, row in enumerate(table):
 53        f.add(RowMultiplier("m-%s" % i, i, row, MULTIPLER))
 54    # NOTE(harlowja): at this point nothing has run, the above is just
 55    # defining what should be done (but not actually doing it) and associating
 56    # the ordering dependencies that should be enforced (the flow pattern used
 57    # forces this); the engine in the later main() function will actually
 58    # perform this work...
 59    return f
 60
 61
 62def main():
 63    if len(sys.argv) == 2:
 64        tbl = []
 65        with open(sys.argv[1], 'rb') as fh:
 66            reader = csv.reader(fh)
 67            for row in reader:
 68                tbl.append([float(r) if r else 0.0 for r in row])
 69    else:
 70        # Make some random table out of thin air...
 71        tbl = []
 72        cols = random.randint(1, 100)
 73        rows = random.randint(1, 100)
 74        for _i in compat_range(0, rows):
 75            row = []
 76            for _j in compat_range(0, cols):
 77                row.append(random.random())
 78            tbl.append(row)
 79
 80    # Generate the work to be done.
 81    f = make_flow(tbl)
 82
 83    # Now run it (using the specified executor)...
 84    try:
 85        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
 86    except RuntimeError:
 87        # No eventlet currently active, use real threads instead.
 88        executor = futurist.ThreadPoolExecutor(max_workers=5)
 89    try:
 90        e = engines.load(f, engine='parallel', executor=executor)
 91        for st in e.run_iter():
 92            print(st)
 93    finally:
 94        executor.shutdown()
 95
 96    # Find the old rows and put them into place...
 97    #
 98    # TODO(harlowja): probably easier just to sort instead of search...
 99    computed_tbl = []
100    for i in compat_range(0, len(tbl)):
101        for t in f:
102            if t.index == i:
103                computed_tbl.append(e.storage.get(t.name))
104
105    # Do some basic validation (which causes the return code of this process
106    # to be different if things were not as expected...)
107    if len(computed_tbl) != len(tbl):
108        return 1
109    else:
110        return 0
111
112
113if __name__ == "__main__":
114    sys.exit(main())
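
Following the TODO in main(), the nested search over the flow's tasks could be replaced by a single sort on the index each task was constructed with; a small sketch:

# Recover the computed rows in their original order by sorting the tasks on
# their stored index before fetching each result from storage.
computed_tbl = [e.storage.get(t.name)
                for t in sorted(f, key=lambda t: t.index)]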

Linear equation solver (explicit dependencies)

Note

Full source located at calculate_linear.

  1
  2import logging
  3import os
  4import sys
  5
  6logging.basicConfig(level=logging.ERROR)
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13import taskflow.engines
 14from taskflow.patterns import linear_flow as lf
 15from taskflow import task
 16
 17
 18# INTRO: In this example a linear flow is used to group four tasks to calculate
 19# a value. A single task type (the adder) is added twice, showing how this can
 20# be done, with each addition taking in different bound values. In the first
 21# case it uses default parameters ('x' and 'y') and in the second case arguments
 22# are bound with ('z', 'd') keys from the engine's internal storage mechanism.
 23#
 24# A multiplier task uses a binding that another task also provides, but this
 25# example explicitly shows that the 'z' parameter is bound with the 'a' key.
 26# This shows that if a task depends on a key named the same as a key provided
 27# from another task the name can be remapped to take the desired key from a
 28# different origin.
 29
 30
 31# This task provides some values as a result of execution; this can be
 32# useful when you want to provide values from a static set to other tasks that
 33# depend on those values existing before those tasks can run.
 34#
 35# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
 36# that just provides those values on engine running by prepopulating the
 37# storage backend before your tasks are run (which accomplishes a similar goal
 38# in a more uniform manner).
 39class Provider(task.Task):
 40
 41    def __init__(self, name, *args, **kwargs):
 42        super(Provider, self).__init__(name=name, **kwargs)
 43        self._provide = args
 44
 45    def execute(self):
 46        return self._provide
 47
 48
 49# This task adds two input variables and returns the result.
 50#
 51# Note that since this task does not have a revert() function (since addition
 52# is a stateless operation) there are no side-effects that this function needs
 53# to undo if some later operation fails.
 54class Adder(task.Task):
 55    def execute(self, x, y):
 56        return x + y
 57
 58
 59# This task multiplies an input variable by a multiplier and returns the
 60# result.
 61#
 62# Note that since this task does not have a revert() function (since
 63# multiplication is a stateless operation) there are no side-effects that
 64# this function needs to undo if some later operation fails.
 65class Multiplier(task.Task):
 66    def __init__(self, name, multiplier, provides=None, rebind=None):
 67        super(Multiplier, self).__init__(name=name, provides=provides,
 68                                         rebind=rebind)
 69        self._multiplier = multiplier
 70
 71    def execute(self, z):
 72        return z * self._multiplier
 73
 74
 75# Note here that the ordering is established so that the correct sequence
 76# of operations occurs where the adding and multiplying is done according
 77# to the expected and typical mathematical model. A graph flow could also be
 78# used here to automatically infer & ensure the correct ordering.
 79flow = lf.Flow('root').add(
 80    # Provide the initial values for other tasks to depend on.
 81    #
 82    # x = 2, y = 3, d = 5
 83    Provider("provide-adder", 2, 3, 5, provides=('x', 'y', 'd')),
 84    # z = x+y = 5
 85    Adder("add-1", provides='z'),
 86    # a = z+d = 10
 87    Adder("add-2", provides='a', rebind=['z', 'd']),
 88    # Calculate 'r = a*3 = 30'
 89    #
 90    # Note here that the 'z' argument of the execute() function will not be
 91    # bound to the 'z' value provided by the 'add-1' task above, but
 92    # instead the 'z' argument will be taken from the 'a' value provided
 93    # by the 'add-2' task listed above.
 94    Multiplier("multi", 3, provides='r', rebind={'z': 'a'})
 95)
 96
 97# The result here will be all results (from all tasks) which is stored in an
 98# in-memory storage location that backs this engine since it is not configured
 99# with persistence storage.
100results = taskflow.engines.run(flow)
101print(results)

Linear equation solver (inferred dependencies)

Source: graph_flow.py

  1
  2import logging
  3import os
  4import sys
  5
  6logging.basicConfig(level=logging.ERROR)
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13import taskflow.engines
 14from taskflow.patterns import graph_flow as gf
 15from taskflow.patterns import linear_flow as lf
 16from taskflow import task
 17
 18
 19# In this example there are complex *inferred* dependencies between tasks that
 20# are used to perform a simple set of linear equations.
 21#
 22# As you will see below the tasks just define what they require as input
 23# and produce as output (named values). Then the user doesn't care about
 24# ordering the tasks (in this case the tasks calculate pieces of the overall
 25# equation).
 26#
 27# As you will notice a graph flow resolves dependencies automatically using the
 28# tasks' symbol requirements and provided symbol values and no ordering
 29# dependency has to be manually created.
 30#
 31# Also notice that flows of any types can be nested into a graph flow; showing
 32# that subflow dependencies (and associated ordering) will be inferred too.
 33
 34
 35class Adder(task.Task):
 36
 37    def execute(self, x, y):
 38        return x + y
 39
 40
 41flow = gf.Flow('root').add(
 42    lf.Flow('nested_linear').add(
 43        # x2 = y3+y4 = 12
 44        Adder("add2", provides='x2', rebind=['y3', 'y4']),
 45        # x1 = y1+y2 = 4
 46        Adder("add1", provides='x1', rebind=['y1', 'y2'])
 47    ),
 48    # x5 = x1+x3 = 20
 49    Adder("add5", provides='x5', rebind=['x1', 'x3']),
 50    # x3 = x1+x2 = 16
 51    Adder("add3", provides='x3', rebind=['x1', 'x2']),
 52    # x4 = x2+y5 = 21
 53    Adder("add4", provides='x4', rebind=['x2', 'y5']),
 54    # x6 = x5+x4 = 41
 55    Adder("add6", provides='x6', rebind=['x5', 'x4']),
 56    # x7 = x6+x6 = 82
 57    Adder("add7", provides='x7', rebind=['x6', 'x6']))
 58
 59# Provide the initial variable inputs using a storage dictionary.
 60store = {
 61    "y1": 1,
 62    "y2": 3,
 63    "y3": 5,
 64    "y4": 7,
 65    "y5": 9,
 66}
 67
 69# These are the expected values that should be created.
 69unexpected = 0
 70expected = [
 71    ('x1', 4),
 72    ('x2', 12),
 73    ('x3', 16),
 74    ('x4', 21),
 75    ('x5', 20),
 76    ('x6', 41),
 77    ('x7', 82),
 78]
 79
 80result = taskflow.engines.run(
 81    flow, engine='serial', store=store)
 82
 83print("Single threaded engine result %s" % result)
 84for (name, value) in expected:
 85    actual = result.get(name)
 86    if actual != value:
 87        sys.stderr.write("%s != %s\n" % (actual, value))
 88        unexpected += 1
 89
 90result = taskflow.engines.run(
 91    flow, engine='parallel', store=store)
 92
 93print("Multi threaded engine result %s" % result)
 94for (name, value) in expected:
 95    actual = result.get(name)
 96    if actual != value:
 97        sys.stderr.write("%s != %s\n" % (actual, value))
 98        unexpected += 1
 99
100if unexpected:
101    sys.exit(1)

Linear equation solver (in parallel)

Note

Full source located at calculate_in_parallel.

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13import taskflow.engines
14from taskflow.patterns import linear_flow as lf
15from taskflow.patterns import unordered_flow as uf
16from taskflow import task
17
18# INTRO: This example shows how a linear flow and an unordered flow can be
19# used together to execute calculations in parallel and then use the
20# result for the next task/s. The adder task is used for all calculations
21# and argument bindings are used to set correct parameters for each task.
22
23
24# This task provides some values as a result of execution; this can be
25# useful when you want to provide values from a static set to other tasks that
26# depend on those values existing before those tasks can run.
27#
28# NOTE(harlowja): this usage is *deprecated* in favor of a simpler mechanism
29# that provides those values on engine running by prepopulating the storage
30# backend before your tasks are run (which accomplishes a similar goal in a
31# more uniform manner).
32class Provider(task.Task):
33    def __init__(self, name, *args, **kwargs):
34        super(Provider, self).__init__(name=name, **kwargs)
35        self._provide = args
36
37    def execute(self):
38        return self._provide
39
40
41# This task adds two input variables and returns the result of that addition.
42#
43# Note that since this task does not have a revert() function (since addition
44# is a stateless operation) there are no side-effects that this function needs
45# to undo if some later operation fails.
46class Adder(task.Task):
47    def execute(self, x, y):
48        return x + y
49
50
51flow = lf.Flow('root').add(
52    # Provide the initial values for other tasks to depend on.
53    #
54    # x1 = 2, y1 = 3, x2 = 5, y2 = 8
55    Provider("provide-adder", 2, 3, 5, 8,
56             provides=('x1', 'y1', 'x2', 'y2')),
57    # Note here that we define the flow that contains the 2 adders to be an
58    # unordered flow since the order in which these execute does not matter,
59    # another way to solve this would be to use a graph_flow pattern, which
60    # also can run in parallel (since they have no ordering dependencies).
61    uf.Flow('adders').add(
62        # Calculate 'z1 = x1+y1 = 5'
63        #
64        # Rebind here means that the execute() function x argument will be
65        # satisfied from a previous output named 'x1', and the y argument
66        # of execute() will be populated from the previous output named 'y1'
67        #
68        # The output (result of adding) will be mapped into a variable named
69        # 'z1' which can then be referred to and depended on by other tasks.
70        Adder(name="add", provides='z1', rebind=['x1', 'y1']),
71        # z2 = x2+y2 = 13
72        Adder(name="add-2", provides='z2', rebind=['x2', 'y2']),
73    ),
74    # r = z1+z2 = 18
75    Adder(name="sum-1", provides='r', rebind=['z1', 'z2']))
76
77
78# The result here will be all results (from all tasks) which is stored in an
79# in-memory storage location that backs this engine since it is not configured
80# with persistence storage.
81result = taskflow.engines.run(flow, engine='parallel')
82print(result)

Creating a volume (in parallel)

Note

Full source located at create_parallel_volume.

 1
 2import contextlib
 3import logging
 4import os
 5import random
 6import sys
 7import time
 8
 9logging.basicConfig(level=logging.ERROR)
10
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12                                       os.pardir,
13                                       os.pardir))
14sys.path.insert(0, top_dir)
15
16from oslo_utils import reflection
17
18from taskflow import engines
19from taskflow.listeners import printing
20from taskflow.patterns import unordered_flow as uf
21from taskflow import task
22
23# INTRO: This example shows how unordered_flow can be used to create a large
24# number of fake volumes in parallel (or serially, depending on a constant that
25# can be easily changed).
26
27
28@contextlib.contextmanager
29def show_time(name):
30    start = time.time()
31    yield
32    end = time.time()
33    print(" -- %s took %0.3f seconds" % (name, end - start))
34
35
36# This affects how many volumes to create and how much time to *simulate*
37# passing for that volume to be created.
38MAX_CREATE_TIME = 3
39VOLUME_COUNT = 5
40
41# This will be used to determine if all the volumes are created in parallel
42# or whether the volumes are created serially (in an undefined order since
43# an unordered flow is used). Note that there is a disconnection between the
44# ordering and the concept of parallelism (since unordered items can still be
45# run in a serial ordering). A typical use-case for offering both is to allow
46# for debugging using a serial approach, while when running at a larger scale
47# one would likely want to use the parallel approach.
48#
49# If you switch this flag from serial to parallel you can see the overall
50# time difference that this causes.
51SERIAL = False
52if SERIAL:
53    engine = 'serial'
54else:
55    engine = 'parallel'
56
57
58class VolumeCreator(task.Task):
59    def __init__(self, volume_id):
60        # Note here that the volume name is composed of the name of the class
61        # along with the volume id that is being created; since the name of a
62        # task uniquely identifies that task in storage, it is important that
63        # the name be relevant and identifiable if the task is recreated for
64        # subsequent resumption (if applicable).
65        #
66        # UUIDs are *not* used as they can not be tied back to a previous task's
67        # state on resumption (since they are unique and will vary for each
68        # task that is created). A name based off the volume id that is to be
69        # created is more easily tied back to the original task so that the
70        # volume create can be resumed/reverted, and is much easier to use for
71        # audit and tracking purposes.
72        base_name = reflection.get_callable_name(self)
73        super(VolumeCreator, self).__init__(name="%s-%s" % (base_name,
74                                                            volume_id))
75        self._volume_id = volume_id
76
77    def execute(self):
78        print("Making volume %s" % (self._volume_id))
79        time.sleep(random.random() * MAX_CREATE_TIME)
80        print("Finished making volume %s" % (self._volume_id))
81
82
83# Assume there is no ordering dependency between volumes.
84flow = uf.Flow("volume-maker")
85for i in range(0, VOLUME_COUNT):
86    flow.add(VolumeCreator(volume_id="vol-%s" % (i)))
87
88
89# Show how much time the overall engine loading and running takes.
90with show_time(name=flow.name.title()):
91    eng = engines.load(flow, engine=engine)
92    # This context manager automatically adds (and automatically removes) a
93    # helpful set of state transition notification printing helper utilities
94    # that show you exactly what transitions the engine is going through
95    # while running the various volume create tasks.
96    with printing.PrintingListener(eng):
97        eng.run()

Summation mapper(s) and reducer (in parallel)

Note

Full source located at simple_map_reduce.

  1
  2import logging
  3import os
  4import sys
  5
  6logging.basicConfig(level=logging.ERROR)
  7
  8self_dir = os.path.abspath(os.path.dirname(__file__))
  9top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 10                                       os.pardir,
 11                                       os.pardir))
 12sys.path.insert(0, top_dir)
 13sys.path.insert(0, self_dir)
 14
 15# INTRO: This example shows a simplistic map/reduce implementation where
 16# a set of mapper(s) will sum a series of input numbers (in parallel) and
 17# return their individual summed result. A reducer will then use those
 18# produced values and perform a final summation and this result will then be
 19# printed (and verified to ensure the calculation was as expected).
 20
 21import six
 22
 23from taskflow import engines
 24from taskflow.patterns import linear_flow
 25from taskflow.patterns import unordered_flow
 26from taskflow import task
 27
 28
 29class SumMapper(task.Task):
 30    def execute(self, inputs):
 31        # Sums some set of provided inputs.
 32        return sum(inputs)
 33
 34
 35class TotalReducer(task.Task):
 36    def execute(self, *args, **kwargs):
 37        # Reduces all mapped summed outputs into a single value.
 38        total = 0
 39        for (k, v) in six.iteritems(kwargs):
 40            # If any other kwargs were passed in, we don't want to use those
 41            # in the calculation of the total...
 42            if k.startswith('reduction_'):
 43                total += v
 44        return total
 45
 46
 47def chunk_iter(chunk_size, upperbound):
 48    """Yields back chunk size pieces from zero to upperbound - 1."""
 49    chunk = []
 50    for i in range(0, upperbound):
 51        chunk.append(i)
 52        if len(chunk) == chunk_size:
 53            yield chunk
 54            chunk = []
 55
 56
 57# Upper bound of numbers to sum for example purposes...
 58UPPER_BOUND = 10000
 59
 60# How many mappers we want to have.
 61SPLIT = 10
 62
 63# How big of a chunk we want to give each mapper.
 64CHUNK_SIZE = UPPER_BOUND // SPLIT
 65
 66# This will be the workflow we will compose and run.
 67w = linear_flow.Flow("root")
 68
 69# The mappers will run in parallel.
 70store = {}
 71provided = []
 72mappers = unordered_flow.Flow('map')
 73for i, chunk in enumerate(chunk_iter(CHUNK_SIZE, UPPER_BOUND)):
 74    mapper_name = 'mapper_%s' % i
 75    # Give that mapper some information to compute.
 76    store[mapper_name] = chunk
 77    # The reducer uses all of the outputs of the mappers, so it needs
 78    # to be recorded that it needs access to them (under a specific name).
 79    provided.append("reduction_%s" % i)
 80    mappers.add(SumMapper(name=mapper_name,
 81                          rebind={'inputs': mapper_name},
 82                          provides=provided[-1]))
 83w.add(mappers)
 84
 85# The reducer will run last (after all the mappers).
 86w.add(TotalReducer('reducer', requires=provided))
 87
 88# Now go!
 89e = engines.load(w, engine='parallel', store=store, max_workers=4)
 90print("Running a parallel engine with options: %s" % e.options)
 91e.run()
 92
 93# Now get the result the reducer created.
 94total = e.storage.get('reducer')
 95print("Calculated result = %s" % total)
 96
 97# Calculate it manually to verify that it worked...
 98calc_total = sum(range(0, UPPER_BOUND))
 99if calc_total != total:
100    sys.exit(1)

Sharing a thread pool executor (in parallel)

Note

Full source located at share_engine_thread.

 1
 2import logging
 3import os
 4import random
 5import sys
 6import time
 7
 8logging.basicConfig(level=logging.ERROR)
 9
10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
11                                       os.pardir,
12                                       os.pardir))
13sys.path.insert(0, top_dir)
14
15import futurist
16import six
17
18from taskflow import engines
19from taskflow.patterns import unordered_flow as uf
20from taskflow import task
21from taskflow.utils import threading_utils as tu
22
23# INTRO: in this example we create 2 dummy flow(s), each with 2 dummy task(s),
24# and run them using a shared thread pool executor to show how a single executor
25# can be used with more than one engine (sharing the execution thread pool
26# between them); this allows for saving resources and reusing threads in
27# situations where this is beneficial.
28
29
30class DelayedTask(task.Task):
31    def __init__(self, name):
32        super(DelayedTask, self).__init__(name=name)
33        self._wait_for = random.random()
34
35    def execute(self):
36        print("Running '%s' in thread '%s'" % (self.name, tu.get_ident()))
37        time.sleep(self._wait_for)
38
39
40f1 = uf.Flow("f1")
41f1.add(DelayedTask("f1-1"))
42f1.add(DelayedTask("f1-2"))
43
44f2 = uf.Flow("f2")
45f2.add(DelayedTask("f2-1"))
46f2.add(DelayedTask("f2-2"))
47
48# Run them all using the same futures (thread-pool based) executor...
49with futurist.ThreadPoolExecutor() as ex:
50    e1 = engines.load(f1, engine='parallel', executor=ex)
51    e2 = engines.load(f2, engine='parallel', executor=ex)
52    iters = [e1.run_iter(), e2.run_iter()]
53    # Iterate over a copy (so we can remove from the source list).
54    cloned_iters = list(iters)
55    while iters:
56        # Run a single 'step' of each iterator, forcing each engine to perform
57        # some work, then yield, and repeat until each iterator is consumed
58        # and there is no more engine work to be done.
59        for it in cloned_iters:
60            try:
61                six.next(it)
62            except StopIteration:
63                try:
64                    iters.remove(it)
65                except ValueError:
66                    pass
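
The core of the sharing shown above, without the interleaved stepping, is simply handing the same executor object to each engine. A minimal sketch (assuming futurist and taskflow are installed; Noop and the flow names are made up for illustration):

import futurist

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Noop(task.Task):
    def execute(self):
        print("Running %s" % self.name)


f1 = lf.Flow('a').add(Noop('a-1'), Noop('a-2'))
f2 = lf.Flow('b').add(Noop('b-1'), Noop('b-2'))

# Both engines borrow threads from the same pool instead of creating their
# own, which keeps the total thread count bounded.
with futurist.ThreadPoolExecutor(max_workers=2) as ex:
    for flow in (f1, f2):
        engines.load(flow, engine='parallel', executor=ex).run()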

Storing & emitting a bill

Note

Full source located at fake_billing

  1
  2import json
  3import logging
  4import os
  5import sys
  6import time
  7
  8logging.basicConfig(level=logging.ERROR)
  9
 10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                       os.pardir,
 12                                       os.pardir))
 13sys.path.insert(0, top_dir)
 14
 15from oslo_utils import uuidutils
 16
 17from taskflow import engines
 18from taskflow.listeners import printing
 19from taskflow.patterns import graph_flow as gf
 20from taskflow.patterns import linear_flow as lf
 21from taskflow import task
 22from taskflow.utils import misc
 23
 24# INTRO: This example walks through a miniature workflow which simulates
 25# the reception of an API request, creation of a database entry, driver
 26# activation (which invokes a 'fake' webservice) and final completion.
 27#
 28# This example also shows how a function/object (in this case the url
 29# sending) that occurs during driver activation can update the progress of a
 30# task without being aware of the internals of how to do this, by associating
 31# a callback that the url sending updates as the sending progresses from 0%
 32# complete to 100% complete.
 33
 34
 35class DB(object):
 36    def query(self, sql):
 37        print("Querying with: %s" % (sql))
 38
 39
 40class UrlCaller(object):
 41    def __init__(self):
 42        self._send_time = 0.5
 43        self._chunks = 25
 44
 45    def send(self, url, data, status_cb=None):
 46        sleep_time = float(self._send_time) / self._chunks
 47        for i in range(0, len(data)):
 48            time.sleep(sleep_time)
 49            # As we send the data, each chunk we 'fake' send will progress
 50            # the sending progress that much further to 100%.
 51            if status_cb:
 52                status_cb(float(i) / len(data))
 53
 54
 55# Since engines save the output of tasks to an optional persistent storage
 56# backend, resources have to be dealt with in a slightly different manner
 57# since resources are transient and can *not* be persisted (or serialized).
 58# For tasks that require access to a set of resources it is a common pattern
 59# to provide an object (in this case this object) to those tasks via the
 60# task constructor.
 61class ResourceFetcher(object):
 62    def __init__(self):
 63        self._db_handle = None
 64        self._url_handle = None
 65
 66    @property
 67    def db_handle(self):
 68        if self._db_handle is None:
 69            self._db_handle = DB()
 70        return self._db_handle
 71
 72    @property
 73    def url_handle(self):
 74        if self._url_handle is None:
 75            self._url_handle = UrlCaller()
 76        return self._url_handle
 77
 78
 79class ExtractInputRequest(task.Task):
 80    def __init__(self, resources):
 81        super(ExtractInputRequest, self).__init__(provides="parsed_request")
 82        self._resources = resources
 83
 84    def execute(self, request):
 85        return {
 86            'user': request.user,
 87            'user_id': misc.as_int(request.id),
 88            'request_id': uuidutils.generate_uuid(),
 89        }
 90
 91
 92class MakeDBEntry(task.Task):
 93    def __init__(self, resources):
 94        super(MakeDBEntry, self).__init__()
 95        self._resources = resources
 96
 97    def execute(self, parsed_request):
 98        db_handle = self._resources.db_handle
 99        db_handle.query("INSERT %s INTO mydb" % (parsed_request))
100
101    def revert(self, result, parsed_request):
102        db_handle = self._resources.db_handle
103        db_handle.query("DELETE %s FROM mydb IF EXISTS" % (parsed_request))
104
105
106class ActivateDriver(task.Task):
107    def __init__(self, resources):
108        super(ActivateDriver, self).__init__(provides='sent_to')
109        self._resources = resources
110        self._url = "http://blahblah.com"
111
112    def execute(self, parsed_request):
113        print("Sending billing data to %s" % (self._url))
114        url_sender = self._resources.url_handle
115        # Note that here we attach our update_progress function (which is a
116        # function that the engine also 'binds' to) to the progress function
117        # that the url sending helper class uses. This allows the task progress
118        # to be tied to the url sending progress, which is very useful for
119        # downstream systems to be aware of what a task is doing at any time.
120        url_sender.send(self._url, json.dumps(parsed_request),
121                        status_cb=self.update_progress)
122        return self._url
123
124    def update_progress(self, progress, **kwargs):
125        # Override the parent method to also print out the status.
126        super(ActivateDriver, self).update_progress(progress, **kwargs)
127        print("%s is %0.2f%% done" % (self.name, progress * 100))
128
129
130class DeclareSuccess(task.Task):
131    def execute(self, sent_to):
132        print("Done!")
133        print("All data processed and sent to %s" % (sent_to))
134
135
136class DummyUser(object):
137    def __init__(self, user, id_):
138        self.user = user
139        self.id = id_
140
141
142# Resources (db handles and similar) of course can *not* be persisted so we
143# need to make sure that we pass this resource fetcher to the task constructors
144# so that the tasks have access to any needed resources (the resources are
145# lazily loaded so that they are only created when they are used).
146resources = ResourceFetcher()
147flow = lf.Flow("initialize-me")
148
149# 1. First we extract the api request into a usable format.
150# 2. Then we go ahead and make a database entry for our request.
151flow.add(ExtractInputRequest(resources), MakeDBEntry(resources))
152
153# 3. Then we activate our payment method and finally declare success.
154sub_flow = gf.Flow("after-initialize")
155sub_flow.add(ActivateDriver(resources), DeclareSuccess())
156flow.add(sub_flow)
157
158# Initially populate the storage with the following request object;
159# prepopulating this allows the tasks that depend on the 'request' variable
160# to start processing (in this case this is the ExtractInputRequest task).
161store = {
162    'request': DummyUser(user="bob", id_="1.35"),
163}
164eng = engines.load(flow, engine='serial', store=store)
165
166# This context manager automatically adds (and later removes) a helpful set
167# of state transition notification printing helpers that show you exactly
168# what transitions the engine is going through while running the various
169# billing related tasks.
170with printing.PrintingListener(eng):
171    eng.run()
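
The progress-reporting technique used by ActivateDriver can be reused on its own: any long-running execute() can call self.update_progress() as it goes, and the override can also print the percentage, mirroring the example above. A minimal, self-contained sketch (SlowCopy, 'copy-it' and 'copier' are made-up names for illustration):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class SlowCopy(task.Task):
    def execute(self, items):
        copied = []
        for i, item in enumerate(items):
            copied.append(item)
            # Report how far along we are (0.0 -> 1.0).
            self.update_progress(float(i + 1) / len(items))
        return copied

    def update_progress(self, progress, **kwargs):
        # Mirror ActivateDriver above: let the parent record the progress and
        # also print it out for humans watching the run.
        super(SlowCopy, self).update_progress(progress, **kwargs)
        print("%s is %0.2f%% done" % (self.name, progress * 100))


flow = lf.Flow('copy-it').add(SlowCopy('copier', provides='copied'))
engines.run(flow, store={'items': list(range(5))})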

Suspending a workflow & resuming

Note

Full source located at resume_from_backend

  1
  2import contextlib
  3import logging
  4import os
  5import sys
  6
  7logging.basicConfig(level=logging.ERROR)
  8
  9self_dir = os.path.abspath(os.path.dirname(__file__))
 10top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 11                                       os.pardir,
 12                                       os.pardir))
 13sys.path.insert(0, top_dir)
 14sys.path.insert(0, self_dir)
 15
 16from oslo_utils import uuidutils
 17
 18import taskflow.engines
 19from taskflow.patterns import linear_flow as lf
 20from taskflow.persistence import models
 21from taskflow import task
 22
 23import example_utils as eu  # noqa
 24
 25# INTRO: In this example linear_flow is used to group three tasks, one of
 26# which will suspend the future work the engine may do. This suspended engine
 27# is then discarded and the workflow is reloaded from the persisted data and
 28# resumed from where it was suspended. This allows you to see how to start an
 29# engine, have a task stop the engine from doing future work (if a
 30# multi-threaded engine is being used, the currently active work is not
 31# preempted) and then resume the work later.
 32#
 33# Usage:
 34#
 35#   With a filesystem directory as backend
 36#
 37#     python taskflow/examples/resume_from_backend.py
 38#
 39#   With ZooKeeper as backend
 40#
 41#     python taskflow/examples/resume_from_backend.py \
 42#       zookeeper://127.0.0.1:2181/taskflow/resume_from_backend/
 43
 44
 45# UTILITY FUNCTIONS #########################################
 46
 47
 48def print_task_states(flowdetail, msg):
 49    eu.print_wrapped(msg)
 50    print("Flow '%s' state: %s" % (flowdetail.name, flowdetail.state))
 51    # Sort by these so that our test validation doesn't get confused by the
 52    # order in which the items appear in the flow detail.
 53    items = sorted((td.name, td.version, td.state, td.results)
 54                   for td in flowdetail)
 55    for item in items:
 56        print(" %s==%s: %s, result=%s" % item)
 57
 58
 59def find_flow_detail(backend, lb_id, fd_id):
 60    conn = backend.get_connection()
 61    lb = conn.get_logbook(lb_id)
 62    return lb.find(fd_id)
 63
 64
 65# CREATE FLOW ###############################################
 66
 67
 68class InterruptTask(task.Task):
 69    def execute(self):
 70        # DO NOT TRY THIS AT HOME
 71        engine.suspend()
 72
 73
 74class TestTask(task.Task):
 75    def execute(self):
 76        print('executing %s' % self)
 77        return 'ok'
 78
 79
 80def flow_factory():
 81    return lf.Flow('resume from backend example').add(
 82        TestTask(name='first'),
 83        InterruptTask(name='boom'),
 84        TestTask(name='second'))
 85
 86
 87# INITIALIZE PERSISTENCE ####################################
 88
 89with eu.get_backend() as backend:
 90
 91    # Create a place where the persistence information will be stored.
 92    book = models.LogBook("example")
 93    flow_detail = models.FlowDetail("resume from backend example",
 94                                    uuid=uuidutils.generate_uuid())
 95    book.add(flow_detail)
 96    with contextlib.closing(backend.get_connection()) as conn:
 97        conn.save_logbook(book)
 98
 99    # CREATE AND RUN THE FLOW: FIRST ATTEMPT ####################
100
101    flow = flow_factory()
102    engine = taskflow.engines.load(flow, flow_detail=flow_detail,
103                                   book=book, backend=backend)
104
105    print_task_states(flow_detail, "At the beginning, there is no state")
106    eu.print_wrapped("Running")
107    engine.run()
108    print_task_states(flow_detail, "After running")
109
110    # RE-CREATE, RESUME, RUN ####################################
111
112    eu.print_wrapped("Resuming and running again")
113
114    # NOTE(harlowja): reload the flow detail from the backend; this will
115    # allow us to resume the flow from its suspended state, but first we need
116    # to search for the right flow details in the correct logbook where things
117    # are stored.
118    #
119    # We could avoid re-loading the engine and just do engine.run() again, but
120    # this example shows how another process may unsuspend a given flow and
121    # start it again for situations where this is useful to do (say the process
122    # running the above flow crashes).
123    flow2 = flow_factory()
124    flow_detail_2 = find_flow_detail(backend, book.uuid, flow_detail.uuid)
125    engine2 = taskflow.engines.load(flow2,
126                                    flow_detail=flow_detail_2,
127                                    backend=backend, book=book)
128    engine2.run()
129    print_task_states(flow_detail_2, "At the end")
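
The persistence plumbing used above boils down to a few calls: create (or fetch) a logbook and flow detail, save them through a backend connection, and hand both to engines.load(). A minimal sketch, assuming the in-memory persistence backend (connection string 'memory://') is available; the example itself uses a filesystem or ZooKeeper backend via example_utils, and the Hello task and flow names are made up for illustration:

import contextlib

from oslo_utils import uuidutils

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow.persistence import backends as persistence_backends
from taskflow.persistence import models
from taskflow import task


class Hello(task.Task):
    def execute(self):
        print('hello')


backend = persistence_backends.fetch({'connection': 'memory://'})
book = models.LogBook('example')
flow_detail = models.FlowDetail('hello-flow', uuid=uuidutils.generate_uuid())
book.add(flow_detail)
with contextlib.closing(backend.get_connection()) as conn:
    # Make sure the backend structures exist, then record the logbook.
    conn.upgrade()
    conn.save_logbook(book)

# Any later run that loads the same flow detail (from the same backend)
# can resume from whatever state was previously recorded.
flow = lf.Flow('hello-flow').add(Hello('hi'))
engine = engines.load(flow, flow_detail=flow_detail, book=book,
                      backend=backend)
engine.run()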

Creating a virtual machine (resumable)

Note

Full source located at resume_vm_boot

  1
  2import contextlib
  3import hashlib
  4import logging
  5import os
  6import random
  7import sys
  8import time
  9
 10logging.basicConfig(level=logging.ERROR)
 11
 12self_dir = os.path.abspath(os.path.dirname(__file__))
 13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                       os.pardir,
 15                                       os.pardir))
 16sys.path.insert(0, top_dir)
 17sys.path.insert(0, self_dir)
 18
 19import futurist
 20from oslo_utils import uuidutils
 21
 22from taskflow import engines
 23from taskflow import exceptions as exc
 24from taskflow.patterns import graph_flow as gf
 25from taskflow.patterns import linear_flow as lf
 26from taskflow.persistence import models
 27from taskflow import task
 28
 29import example_utils as eu  # noqa
 30
 31# INTRO: This example shows how a hierarchy of flows can be used to create a
 32# vm in a reliable & resumable manner using taskflow + a miniature version of
 33# what nova does while booting a vm.
 34
 35
 36@contextlib.contextmanager
 37def slow_down(how_long=0.5):
 38    try:
 39        yield how_long
 40    finally:
 41        if len(sys.argv) > 1:
 42            # Only bother to do this if user input was provided.
 43            print("** Ctrl-c me please!!! **")
 44            time.sleep(how_long)
 45
 46
 47class PrintText(task.Task):
 48    """Just inserts some text print outs in a workflow."""
 49    def __init__(self, print_what, no_slow=False):
 50        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 51        super(PrintText, self).__init__(name="Print: %s" % (content_hash))
 52        self._text = print_what
 53        self._no_slow = no_slow
 54
 55    def execute(self):
 56        if self._no_slow:
 57            eu.print_wrapped(self._text)
 58        else:
 59            with slow_down():
 60                eu.print_wrapped(self._text)
 61
 62
 63class DefineVMSpec(task.Task):
 64    """Defines a vm specification to be."""
 65    def __init__(self, name):
 66        super(DefineVMSpec, self).__init__(provides='vm_spec', name=name)
 67
 68    def execute(self):
 69        return {
 70            'type': 'kvm',
 71            'disks': 2,
 72            'vcpu': 1,
 73            'ips': 1,
 74            'volumes': 3,
 75        }
 76
 77
 78class LocateImages(task.Task):
 79    """Locates where the vm images are."""
 80    def __init__(self, name):
 81        super(LocateImages, self).__init__(provides='image_locations',
 82                                           name=name)
 83
 84    def execute(self, vm_spec):
 85        image_locations = {}
 86        for i in range(0, vm_spec['disks']):
 87            url = "http://www.yahoo.com/images/%s" % (i)
 88            image_locations[url] = "/tmp/%s.img" % (i)
 89        return image_locations
 90
 91
 92class DownloadImages(task.Task):
 93    """Downloads all the vm images."""
 94    def __init__(self, name):
 95        super(DownloadImages, self).__init__(provides='download_paths',
 96                                             name=name)
 97
 98    def execute(self, image_locations):
 99        for src, loc in image_locations.items():
100            with slow_down(1):
101                print("Downloading from %s => %s" % (src, loc))
102        return sorted(image_locations.values())
103
104
105class CreateNetworkTpl(task.Task):
106    """Generates the network settings file to be placed in the images."""
107    SYSCONFIG_CONTENTS = """DEVICE=eth%s
108BOOTPROTO=static
109IPADDR=%s
110ONBOOT=yes"""
111
112    def __init__(self, name):
113        super(CreateNetworkTpl, self).__init__(provides='network_settings',
114                                               name=name)
115
116    def execute(self, ips):
117        settings = []
118        for i, ip in enumerate(ips):
119            settings.append(self.SYSCONFIG_CONTENTS % (i, ip))
120        return settings
121
122
123class AllocateIP(task.Task):
124    """Allocates the ips for the given vm."""
125    def __init__(self, name):
126        super(AllocateIP, self).__init__(provides='ips', name=name)
127
128    def execute(self, vm_spec):
129        ips = []
130        for _i in range(0, vm_spec.get('ips', 0)):
131            ips.append("192.168.0.%s" % (random.randint(1, 254)))
132        return ips
133
134
135class WriteNetworkSettings(task.Task):
136    """Writes all the network settings into the downloaded images."""
137    def execute(self, download_paths, network_settings):
138        for j, path in enumerate(download_paths):
139            with slow_down(1):
140                print("Mounting %s to /tmp/%s" % (path, j))
141            for i, setting in enumerate(network_settings):
142                filename = ("/tmp/etc/sysconfig/network-scripts/"
143                            "ifcfg-eth%s" % (i))
144                with slow_down(1):
145                    print("Writing to %s" % (filename))
146                    print(setting)
147
148
149class BootVM(task.Task):
150    """Fires off the vm boot operation."""
151    def execute(self, vm_spec):
152        print("Starting vm!")
153        with slow_down(1):
154            print("Created: %s" % (vm_spec))
155
156
157class AllocateVolumes(task.Task):
158    """Allocates the volumes for the vm."""
159    def execute(self, vm_spec):
160        volumes = []
161        for i in range(0, vm_spec['volumes']):
162            with slow_down(1):
163                volumes.append("/dev/vda%s" % (i + 1))
164                print("Allocated volume %s" % volumes[-1])
165        return volumes
166
167
168class FormatVolumes(task.Task):
169    """Formats the volumes for the vm."""
170    def execute(self, volumes):
171        for v in volumes:
172            print("Formatting volume %s" % v)
173            with slow_down(1):
174                pass
175            print("Formatted volume %s" % v)
176
177
178def create_flow():
179    # Setup the set of things to do (mini-nova).
180    flow = lf.Flow("root").add(
181        PrintText("Starting vm creation.", no_slow=True),
182        lf.Flow('vm-maker').add(
183            # First create a specification for the final vm to-be.
184            DefineVMSpec("define_spec"),
185            # This does all the image stuff.
186            gf.Flow("img-maker").add(
187                LocateImages("locate_images"),
188                DownloadImages("download_images"),
189            ),
190            # This does all the network stuff.
191            gf.Flow("net-maker").add(
192                AllocateIP("get_my_ips"),
193                CreateNetworkTpl("fetch_net_settings"),
194                WriteNetworkSettings("write_net_settings"),
195            ),
196            # This does all the volume stuff.
197            gf.Flow("volume-maker").add(
198                AllocateVolumes("allocate_my_volumes", provides='volumes'),
199                FormatVolumes("volume_formatter"),
200            ),
201            # Finally boot it all.
202            BootVM("boot-it"),
203        ),
204        # Ya it worked!
205        PrintText("Finished vm create.", no_slow=True),
206        PrintText("Instance is running!", no_slow=True))
207    return flow
208
209eu.print_wrapped("Initializing")
210
211# Setup the persistence & resumption layer.
212with eu.get_backend() as backend:
213
214    # Try to find a previously passed in tracking id...
215    try:
216        book_id, flow_id = sys.argv[2].split("+", 1)
217        if not uuidutils.is_uuid_like(book_id):
218            book_id = None
219        if not uuidutils.is_uuid_like(flow_id):
220            flow_id = None
221    except (IndexError, ValueError):
222        book_id = None
223        flow_id = None
224
225    # Set up how we want our engine to run, serial, parallel...
226    try:
227        executor = futurist.GreenThreadPoolExecutor(max_workers=5)
228    except RuntimeError:
229        # No eventlet installed, just let the default be used instead.
230        executor = None
231
232    # Create/fetch a logbook that will track the workflow's work.
233    book = None
234    flow_detail = None
235    if all([book_id, flow_id]):
236        # Try to find in a prior logbook and flow detail...
237        with contextlib.closing(backend.get_connection()) as conn:
238            try:
239                book = conn.get_logbook(book_id)
240                flow_detail = book.find(flow_id)
241            except exc.NotFound:
242                pass
243    if book is None and flow_detail is None:
244        book = models.LogBook("vm-boot")
245        with contextlib.closing(backend.get_connection()) as conn:
246            conn.save_logbook(book)
247        engine = engines.load_from_factory(create_flow,
248                                           backend=backend, book=book,
249                                           engine='parallel',
250                                           executor=executor)
251        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
252                                                   engine.storage.flow_uuid))
253        print("!! Please submit this on later runs for tracking purposes")
254    else:
255        # Attempt to load from a previously partially completed flow.
256        engine = engines.load_from_detail(flow_detail, backend=backend,
257                                          engine='parallel', executor=executor)
258
259    # Make me my vm please!
260    eu.print_wrapped('Running')
261    engine.run()
262
263# How to use.
264#
265# 1. $ python me.py "sqlite:////tmp/nova.db"
266# 2. ctrl-c before this finishes
267# 3. Find the tracking id (search for 'Your tracking id is')
268# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
269# 5. Watch it pick up where it left off.
270# 6. Profit!

Creating a volume (resumable)

Note

Full source located at resume_volume_create

  1
  2import contextlib
  3import hashlib
  4import logging
  5import os
  6import random
  7import sys
  8import time
  9
 10logging.basicConfig(level=logging.ERROR)
 11
 12self_dir = os.path.abspath(os.path.dirname(__file__))
 13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                       os.pardir,
 15                                       os.pardir))
 16sys.path.insert(0, top_dir)
 17sys.path.insert(0, self_dir)
 18
 19from oslo_utils import uuidutils
 20
 21from taskflow import engines
 22from taskflow.patterns import graph_flow as gf
 23from taskflow.patterns import linear_flow as lf
 24from taskflow.persistence import models
 25from taskflow import task
 26
 27import example_utils  # noqa
 28
 29# INTRO: This example shows how a hierarchy of flows can be used to create a
 30# pseudo-volume in a reliable & resumable manner using taskflow + a miniature
 31# version of what cinder does while creating a volume (very miniature).
 32
 33
 34@contextlib.contextmanager
 35def slow_down(how_long=0.5):
 36    try:
 37        yield how_long
 38    finally:
 39        print("** Ctrl-c me please!!! **")
 40        time.sleep(how_long)
 41
 42
 43def find_flow_detail(backend, book_id, flow_id):
 44    # NOTE(harlowja): this is used to attempt to find a given logbook with
 45    # a given id and a given flow detail inside that logbook; we need this
 46    # reference so that we can resume the correct flow (as a logbook tracks
 47    # flows and a flow detail tracks an individual flow).
 48    #
 49    # Without a reference to the logbook and the flow details in that logbook
 50    # we will not know exactly what we should resume and that would mean we
 51    # can't resume what we don't know.
 52    with contextlib.closing(backend.get_connection()) as conn:
 53        lb = conn.get_logbook(book_id)
 54        return lb.find(flow_id)
 55
 56
 57class PrintText(task.Task):
 58    def __init__(self, print_what, no_slow=False):
 59        content_hash = hashlib.md5(print_what.encode('utf-8')).hexdigest()[0:8]
 60        super(PrintText, self).__init__(name="Print: %s" % (content_hash))
 61        self._text = print_what
 62        self._no_slow = no_slow
 63
 64    def execute(self):
 65        if self._no_slow:
 66            print("-" * (len(self._text)))
 67            print(self._text)
 68            print("-" * (len(self._text)))
 69        else:
 70            with slow_down():
 71                print("-" * (len(self._text)))
 72                print(self._text)
 73                print("-" * (len(self._text)))
 74
 75
 76class CreateSpecForVolumes(task.Task):
 77    def execute(self):
 78        volumes = []
 79        for i in range(0, random.randint(1, 10)):
 80            volumes.append({
 81                'type': 'disk',
 82                'location': "/dev/vda%s" % (i + 1),
 83            })
 84        return volumes
 85
 86
 87class PrepareVolumes(task.Task):
 88    def execute(self, volume_specs):
 89        for v in volume_specs:
 90            with slow_down():
 91                print("Dusting off your hard drive %s" % (v))
 92            with slow_down():
 93                print("Taking a well deserved break.")
 94            print("Your drive %s has been certified." % (v))
 95
 96
 97# Setup the set of things to do (mini-cinder).
 98flow = lf.Flow("root").add(
 99    PrintText("Starting volume create", no_slow=True),
100    gf.Flow('maker').add(
101        CreateSpecForVolumes("volume_specs", provides='volume_specs'),
102        PrintText("I need a nap, it took me a while to build those specs."),
103        PrepareVolumes(),
104    ),
105    PrintText("Finished volume create", no_slow=True))
106
107# Setup the persistence & resumption layer.
108with example_utils.get_backend() as backend:
109    try:
110        book_id, flow_id = sys.argv[2].split("+", 1)
111    except (IndexError, ValueError):
112        book_id = None
113        flow_id = None
114
115    if not all([book_id, flow_id]):
116        # If no 'tracking id' (think a fedex or ups tracking id) is provided
117        # then we create one by creating a logbook (where flow details are
118        # stored) and creating a flow detail (where flow and task state is
119        # stored). The combination of these 2 objects' unique ids (uuids) allows
120        # the users of taskflow to reassociate the workflows that were
121        # potentially running (and which may have partially completed) back
122        # with taskflow so that those workflows can be resumed (or reverted)
123        # after a process/thread/engine has failed in some way.
124        book = models.LogBook('resume-volume-create')
125        flow_detail = models.FlowDetail("root", uuid=uuidutils.generate_uuid())
126        book.add(flow_detail)
127        with contextlib.closing(backend.get_connection()) as conn:
128            conn.save_logbook(book)
129        print("!! Your tracking id is: '%s+%s'" % (book.uuid,
130                                                   flow_detail.uuid))
131        print("!! Please submit this on later runs for tracking purposes")
132    else:
133        flow_detail = find_flow_detail(backend, book_id, flow_id)
134
135    # Load and run.
136    engine = engines.load(flow,
137                          flow_detail=flow_detail,
138                          backend=backend, engine='serial')
139    engine.run()
140
141# How to use.
142#
143# 1. $ python me.py "sqlite:////tmp/cinder.db"
144# 2. ctrl-c before this finishes
145# 3. Find the tracking id (search for 'Your tracking id is')
146# 4. $ python me.py "sqlite:////tmp/cinder.db" "$tracking_id"
147# 5. Profit!

Running engines via iteration

Note

Full source located at run_by_iter

 1
 2import logging
 3import os
 4import sys
 5
 6import six
 7
 8logging.basicConfig(level=logging.ERROR)
 9
10self_dir = os.path.abspath(os.path.dirname(__file__))
11top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
12                                       os.pardir,
13                                       os.pardir))
14sys.path.insert(0, top_dir)
15sys.path.insert(0, self_dir)
16
17
18from taskflow import engines
19from taskflow.patterns import linear_flow as lf
20from taskflow import task
21
22
23# INTRO: This example shows how to run a set of flows at the same time, each
24# in its own engine, using a single thread of control to iterate over each
25# engine (which causes that engine to advance to its next state during each
26# iteration).
27
28
29class EchoTask(task.Task):
30    def execute(self, value):
31        print(value)
32        return chr(ord(value) + 1)
33
34
35def make_alphabet_flow(i):
36    f = lf.Flow("alphabet_%s" % (i))
37    start_value = 'A'
38    end_value = 'Z'
39    curr_value = start_value
40    while ord(curr_value) <= ord(end_value):
41        next_value = chr(ord(curr_value) + 1)
42        if curr_value != end_value:
43            f.add(EchoTask(name="echoer_%s" % curr_value,
44                           rebind={'value': curr_value},
45                           provides=next_value))
46        else:
47            f.add(EchoTask(name="echoer_%s" % curr_value,
48                           rebind={'value': curr_value}))
49        curr_value = next_value
50    return f
51
52
53# Adjust this number to change how many engines/flows run at once.
54flow_count = 1
55flows = []
56for i in range(0, flow_count):
57    f = make_alphabet_flow(i + 1)
58    flows.append(f)
59engine_iters = []
60for f in flows:
61    e = engines.load(f)
62    e.compile()
63    e.storage.inject({'A': 'A'})
64    e.prepare()
65    engine_iters.append(e.run_iter())
66while engine_iters:
67    for it in list(engine_iters):
68        try:
69            print(six.next(it))
70        except StopIteration:
71            engine_iters.remove(it)
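
run_iter() also works with a single engine; each value it yields reflects the state the engine has transitioned into, which can be useful for logging or for deciding when to suspend. A minimal sketch (Greet and the flow name are made up for illustration):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task


class Greet(task.Task):
    def execute(self):
        print('hello from %s' % self.name)


engine = engines.load(lf.Flow('greetings').add(Greet('one'), Greet('two')))
# Each iteration advances the engine and yields the state it entered;
# exhausting the iterator completes the run.
for state in engine.run_iter():
    print('engine state: %s' % state)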

Controlling retries using a retry controller

Note

Full source located at retry_flow

 1
 2import logging
 3import os
 4import sys
 5
 6logging.basicConfig(level=logging.ERROR)
 7
 8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 9                                       os.pardir,
10                                       os.pardir))
11sys.path.insert(0, top_dir)
12
13import taskflow.engines
14from taskflow.patterns import linear_flow as lf
15from taskflow import retry
16from taskflow import task
17
18# INTRO: In this example we create a retry controller that receives a phone
19# directory and tries different phone numbers. The next task tries to call Jim
20# using the given number. If it is not Jim's number, the task raises an
21# exception and the retry controller takes the next number from the phone
22# directory and retries the call.
23#
24# This example shows a basic usage of retry controllers in a flow.
25# Retry controllers allow reverting and retrying a failed subflow with new
26# parameters.
27
28
29class CallJim(task.Task):
30    def execute(self, jim_number):
31        print("Calling jim %s." % jim_number)
32        if jim_number != 555:
33            raise Exception("Wrong number!")
34        else:
35            print("Hello Jim!")
36
37    def revert(self, jim_number, **kwargs):
38        print("Wrong number, apologizing.")
39
40
41# Create your flow and associated tasks (the work to be done).
42flow = lf.Flow('retrying-linear',
43               retry=retry.ParameterizedForEach(
44                   rebind=['phone_directory'],
45                   provides='jim_number')).add(CallJim())
46
47# Now run that flow using the provided initial data (store below).
48taskflow.engines.run(flow, store={'phone_directory': [333, 444, 555, 666]})
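
ParameterizedForEach is only one of the bundled retry controllers; for plain "try the same subflow again up to N times" behavior, retry.Times can be attached to a flow in the same way. A small sketch, assuming a task that fails intermittently (Flaky and the flow name are made up for illustration):

import random

import taskflow.engines
from taskflow.patterns import linear_flow as lf
from taskflow import retry
from taskflow import task


class Flaky(task.Task):
    def execute(self):
        if random.random() < 0.5:
            raise IOError("transient failure")
        print("Succeeded!")


# Retry the subflow (here a single task) up to 3 times before giving up,
# at which point the failure is re-raised by the engine.
flow = lf.Flow('retrying', retry=retry.Times(3)).add(Flaky())
taskflow.engines.run(flow)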

Distributed execution (simple)

Note

Full source located at wbe_simple_linear

  1
  2import json
  3import logging
  4import os
  5import sys
  6import tempfile
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13from taskflow import engines
 14from taskflow.engines.worker_based import worker
 15from taskflow.patterns import linear_flow as lf
 16from taskflow.tests import utils
 17from taskflow.utils import threading_utils
 18
 19import example_utils  # noqa
 20
 21# INTRO: This example walks through a miniature workflow which shows how to
 22# start up a number of workers (these workers will process task execution and
 23# reversion requests using any provided input data) and then use an engine
 24# with a set of tasks and flows that those workers are *capable* of running
 25# (the engine can not run tasks that the workers are not able to run; that
 26# would end in failure), executing that workflow seamlessly by using the
 27# workers to perform the actual execution.
 28#
 29# NOTE(harlowja): this example simulates the expected larger number of workers
 30# by using a set of threads (which in this example simulate the remote workers
 31# that would typically be running on other external machines).
 32
 33# A filesystem can also be used as the queue transport (useful as a simple
 34# transport type that does not involve setting up a larger mq system). If this
 35# is false then the memory transport is used instead; both work in standalone
 36# setups.
 37USE_FILESYSTEM = False
 38BASE_SHARED_CONF = {
 39    'exchange': 'taskflow',
 40}
 41
 42# Until https://github.com/celery/kombu/issues/398 is resolved it is not
 43# recommended to run many worker threads in this example due to the types
 44# of errors mentioned in that issue.
 45MEMORY_WORKERS = 2
 46FILE_WORKERS = 1
 47WORKER_CONF = {
 48    # These are the tasks the worker can execute; they *must* be importable, and
 49    # typically this list is used to restrict what workers may execute to
 50    # a smaller set of *allowed* tasks that are known to be safe (one would
 51    # not want to allow all python code to be executed).
 52    'tasks': [
 53        'taskflow.tests.utils:TaskOneArgOneReturn',
 54        'taskflow.tests.utils:TaskMultiArgOneReturn'
 55    ],
 56}
 57
 58
 59def run(engine_options):
 60    flow = lf.Flow('simple-linear').add(
 61        utils.TaskOneArgOneReturn(provides='result1'),
 62        utils.TaskMultiArgOneReturn(provides='result2')
 63    )
 64    eng = engines.load(flow,
 65                       store=dict(x=111, y=222, z=333),
 66                       engine='worker-based', **engine_options)
 67    eng.run()
 68    return eng.storage.fetch_all()
 69
 70
 71if __name__ == "__main__":
 72    logging.basicConfig(level=logging.ERROR)
 73
 74    # Setup our transport configuration and merge it into the worker and
 75    # engine configuration so that both of those use it correctly.
 76    shared_conf = dict(BASE_SHARED_CONF)
 77
 78    tmp_path = None
 79    if USE_FILESYSTEM:
 80        worker_count = FILE_WORKERS
 81        tmp_path = tempfile.mkdtemp(prefix='wbe-example-')
 82        shared_conf.update({
 83            'transport': 'filesystem',
 84            'transport_options': {
 85                'data_folder_in': tmp_path,
 86                'data_folder_out': tmp_path,
 87                'polling_interval': 0.1,
 88            },
 89        })
 90    else:
 91        worker_count = MEMORY_WORKERS
 92        shared_conf.update({
 93            'transport': 'memory',
 94            'transport_options': {
 95                'polling_interval': 0.1,
 96            },
 97        })
 98    worker_conf = dict(WORKER_CONF)
 99    worker_conf.update(shared_conf)
100    engine_options = dict(shared_conf)
101    workers = []
102    worker_topics = []
103
104    try:
105        # Create a set of workers to simulate actual remote workers.
106        print('Running %s workers.' % (worker_count))
107        for i in range(0, worker_count):
108            worker_conf['topic'] = 'worker-%s' % (i + 1)
109            worker_topics.append(worker_conf['topic'])
110            w = worker.Worker(**worker_conf)
111            runner = threading_utils.daemon_thread(w.run)
112            runner.start()
113            w.wait()
114            workers.append((runner, w.stop))
115
116        # Now use those workers to do something.
117        print('Executing some work.')
118        engine_options['topics'] = worker_topics
119        result = run(engine_options)
120        print('Execution finished.')
121        # This is done so that the test examples can work correctly
122        # even when the keys change order (which will happen in various
123        # python versions).
124        print("Result = %s" % json.dumps(result, sort_keys=True))
125    finally:
126        # And cleanup.
127        print('Stopping workers.')
128        while workers:
129            r, stopper = workers.pop()
130            stopper()
131            r.join()
132        if tmp_path:
133            example_utils.rm_path(tmp_path)

Distributed notification (simple)

Note

Full source located at wbe_event_sender

  1
  2import logging
  3import os
  4import string
  5import sys
  6import time
  7
  8top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  9                                       os.pardir,
 10                                       os.pardir))
 11sys.path.insert(0, top_dir)
 12
 13from six.moves import range as compat_range
 14
 15from taskflow import engines
 16from taskflow.engines.worker_based import worker
 17from taskflow.patterns import linear_flow as lf
 18from taskflow import task
 19from taskflow.types import notifier
 20from taskflow.utils import threading_utils
 21
 22ANY = notifier.Notifier.ANY
 23
 24# INTRO: This example shows how to use a remote worker's event notification
 25# attribute to proxy back task event notifications to the controlling process.
 26#
 27# In this case a simple set of events is triggered by a worker running a
 28# task (simulated to be remote by using a kombu memory transport and threads).
 29# Those events that the 'remote worker' produces will then be proxied back to
 30# the task that the engine is running 'remotely', and then they will be emitted
 31# back to the original callbacks that exist in the originating engine
 32# process/thread. This creates a one-way *notification* channel that can
 33# transparently be used in-process or outside-of-process (using remote workers
 34# and so on) and that allows a task to signal to its controlling process some
 35# sort of action that has occurred that the task may need to tell others about
 36# (for example to trigger some type of response when the task reaches 50%...).
 37
 38
 39def event_receiver(event_type, details):
 40    """This is the callback that (in this example) doesn't do much..."""
 41    print("Recieved event '%s'" % event_type)
 42    print("Details = %s" % details)
 43
 44
 45class EventReporter(task.Task):
 46    """This is the task that will be running 'remotely' (not really remote)."""
 47
 48    EVENTS = tuple(string.ascii_uppercase)
 49    EVENT_DELAY = 0.1
 50
 51    def execute(self):
 52        for i, e in enumerate(self.EVENTS):
 53            details = {
 54                'leftover': self.EVENTS[i:],
 55            }
 56            self.notifier.notify(e, details)
 57            time.sleep(self.EVENT_DELAY)
 58
 59
 60BASE_SHARED_CONF = {
 61    'exchange': 'taskflow',
 62    'transport': 'memory',
 63    'transport_options': {
 64        'polling_interval': 0.1,
 65    },
 66}
 67
 68# Until https://github.com/celery/kombu/issues/398 is resolved it is not
 69# recommended to run many worker threads in this example due to the types
 70# of errors mentioned in that issue.
 71MEMORY_WORKERS = 1
 72WORKER_CONF = {
 73    'tasks': [
 74        # Used to locate which tasks we can run (we don't want to allow
 75        # arbitrary code/tasks to be run by any worker since that would
 76        # open up a variety of vulnerabilities).
 77        '%s:EventReporter' % (__name__),
 78    ],
 79}
 80
 81
 82def run(engine_options):
 83    reporter = EventReporter()
 84    reporter.notifier.register(ANY, event_receiver)
 85    flow = lf.Flow('event-reporter').add(reporter)
 86    eng = engines.load(flow, engine='worker-based', **engine_options)
 87    eng.run()
 88
 89
 90if __name__ == "__main__":
 91    logging.basicConfig(level=logging.ERROR)
 92
 93    # Setup our transport configuration and merge it into the worker and
 94    # engine configuration so that both of those objects use it correctly.
 95    worker_conf = dict(WORKER_CONF)
 96    worker_conf.update(BASE_SHARED_CONF)
 97    engine_options = dict(BASE_SHARED_CONF)
 98    workers = []
 99
100    # These topics will be used to request worker information on; those
101    # workers will respond with their capabilities which the executing engine
102    # will use to match pending tasks to a matching worker; this will cause
103    # the task to be sent for execution, and the engine will wait until it
104    # is finished (a response is received) and then the engine will either
105    # continue with other tasks, do some retry/failure resolution logic or
106    # stop (and potentially re-raise the remote worker's failure)...
107    worker_topics = []
108
109    try:
110        # Create a set of worker threads to simulate actual remote workers...
111        print('Running %s workers.' % (MEMORY_WORKERS))
112        for i in compat_range(0, MEMORY_WORKERS):
113            # Give each one its own unique topic name so that they can
114            # correctly communicate with the engine (they will all share the
115            # same exchange).
116            worker_conf['topic'] = 'worker-%s' % (i + 1)
117            worker_topics.append(worker_conf['topic'])
118            w = worker.Worker(**worker_conf)
119            runner = threading_utils.daemon_thread(w.run)
120            runner.start()
121            w.wait()
122            workers.append((runner, w.stop))
123
124        # Now use those workers to do something.
125        print('Executing some work.')
126        engine_options['topics'] = worker_topics
127        result = run(engine_options)
128        print('Execution finished.')
129    finally:
130        # And cleanup.
131        print('Stopping workers.')
132        while workers:
133            r, stopper = workers.pop()
134            stopper()
135            r.join()
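
Notifications are not specific to worker-based engines; a task always has a notifier of its own (as EventReporter shows), and callbacks registered on it fire just as well when the flow runs in-process. A minimal sketch registering the same kind of callback directly on a task (Pinger, the 'ping' event and the flow name are made up for illustration):

from taskflow import engines
from taskflow.patterns import linear_flow as lf
from taskflow import task
from taskflow.types import notifier

ANY = notifier.Notifier.ANY


class Pinger(task.Task):
    def execute(self):
        # Emit a custom event that any registered callbacks will receive.
        self.notifier.notify('ping', {'from': self.name})


def on_event(event_type, details):
    print("Got '%s' with %s" % (event_type, details))


pinger = Pinger('pinger')
pinger.notifier.register(ANY, on_event)
engines.load(lf.Flow('events').add(pinger)).run()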

Distributed mandelbrot (complex)

Note

Full source located at wbe_mandelbrot

Output

Generated mandelbrot fractal

Code

  1
  2import logging
  3import math
  4import os
  5import sys
  6
  7top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
  8                                       os.pardir,
  9                                       os.pardir))
 10sys.path.insert(0, top_dir)
 11
 12from six.moves import range as compat_range
 13
 14from taskflow import engines
 15from taskflow.engines.worker_based import worker
 16from taskflow.patterns import unordered_flow as uf
 17from taskflow import task
 18from taskflow.utils import threading_utils
 19
 20# INTRO: This example walks through a workflow that will in parallel compute
 21# a mandelbrot result set (using X 'remote' workers) and then combine their
 22# results together to form a final mandelbrot fractal image. It shows a usage
 23# of taskflow to perform a well-known embarrassingly parallel problem that has
 24# the added benefit of also being an elegant visualization.
 25#
 26# NOTE(harlowja): this example simulates the expected larger number of workers
 27# by using a set of threads (which in this example simulate the remote workers
 28# that would typically be running on other external machines).
 29#
 30# NOTE(harlowja): to have it produce an image run (after installing pillow):
 31#
 32# $ python taskflow/examples/wbe_mandelbrot.py output.png
 33
 34BASE_SHARED_CONF = {
 35    'exchange': 'taskflow',
 36}
 37WORKERS = 2
 38WORKER_CONF = {
 39    # These are the tasks the worker can execute; they *must* be importable, and
 40    # typically this list is used to restrict what workers may execute to
 41    # a smaller set of *allowed* tasks that are known to be safe (one would
 42    # not want to allow all python code to be executed).
 43    'tasks': [
 44        '%s:MandelCalculator' % (__name__),
 45    ],
 46}
 47ENGINE_CONF = {
 48    'engine': 'worker-based',
 49}
 50
 51# Mandelbrot & image settings...
 52IMAGE_SIZE = (512, 512)
 53CHUNK_COUNT = 8
 54MAX_ITERATIONS = 25
 55
 56
 57class MandelCalculator(task.Task):
 58    def execute(self, image_config, mandelbrot_config, chunk):
 59        """Returns the number of iterations before the computation "escapes".
 60
 61        Given the real and imaginary parts of a complex number, determine if it
 62        is a candidate for membership in the mandelbrot set given a fixed
 63        number of iterations.
 64        """
 65
 66        # Parts borrowed from (credit to mark harris and benoît mandelbrot).
 67        #
 68        # http://nbviewer.ipython.org/gist/harrism/f5707335f40af9463c43
 69        def mandelbrot(x, y, max_iters):
 70            c = complex(x, y)
 71            z = 0.0j
 72            for i in compat_range(max_iters):
 73                z = z * z + c
 74                if (z.real * z.real + z.imag * z.imag) >= 4:
 75                    return i
 76            return max_iters
 77
 78        min_x, max_x, min_y, max_y, max_iters = mandelbrot_config
 79        height, width = image_config['size']
 80        pixel_size_x = (max_x - min_x) / width
 81        pixel_size_y = (max_y - min_y) / height
 82        block = []
 83        for y in compat_range(chunk[0], chunk[1]):
 84            row = []
 85            imag = min_y + y * pixel_size_y
 86            for x in compat_range(0, width):
 87                real = min_x + x * pixel_size_x
 88                row.append(mandelbrot(real, imag, max_iters))
 89            block.append(row)
 90        return block
 91
 92
 93def calculate(engine_conf):
 94    # Subdivide the work into X pieces, then request each worker to calculate
 95    # one of those chunks and then later we will write these chunks out to
 96    # an image bitmap file.
 97
 98    # An unordered flow is used here since the mandelbrot calculation is an
 99    # example of an embarrassingly parallel computation that we can scatter
100    # across as many workers as possible.
101    flow = uf.Flow("mandelbrot")
102
103    # These symbols will be automatically given to tasks as input to their
104    # execute method; in this case these are constants used in the mandelbrot
105    # calculation.
106    store = {
107        'mandelbrot_config': [-2.0, 1.0, -1.0, 1.0, MAX_ITERATIONS],
108        'image_config': {
109            'size': IMAGE_SIZE,
110        }
111    }
112
113    # We need the task names to be in the right order so that we can extract
114    # the final results in the right order (we don't care about the order when
115    # executing).
116    task_names = []
117
118    # Compose our workflow.
119    height, _width = IMAGE_SIZE
120    chunk_size = int(math.ceil(height / float(CHUNK_COUNT)))
121    for i in compat_range(0, CHUNK_COUNT):
122        chunk_name = 'chunk_%s' % i
123        task_name = "calculation_%s" % i
124        # Break the calculation up into chunk size pieces.
125        rows = [i * chunk_size, i * chunk_size + chunk_size]
126        flow.add(
127            MandelCalculator(task_name,
128                             # This ensures the storage symbol with name
129                             # 'chunk_name' is sent into the task's local
130                             # symbol 'chunk'. This is how we give each
131                             # calculator its own correct sequence of rows
132                             # to work on.
133                             rebind={'chunk': chunk_name}))
134        store[chunk_name] = rows
135        task_names.append(task_name)
136
137    # Now execute it.
138    eng = engines.load(flow, store=store, engine_conf=engine_conf)
139    eng.run()
140
141    # Gather all the results and order them for further processing.
142    gather = []
143    for name in task_names:
144        gather.extend(eng.storage.get(name))
145    points = []
146    for y, row in enumerate(gather):
147        for x, color in enumerate(row):
148            points.append(((x, y), color))
149    return points
150
151
152def write_image(results, output_filename=None):
153    print("Gathered %s results that represents a mandelbrot"
154          " image (using %s chunks that are computed jointly"
155          " by %s workers)." % (len(results), CHUNK_COUNT, WORKERS))
156    if not output_filename:
157        return
158
159    # Pillow (the PIL fork) saves us from writing our own image writer...
160    try:
161        from PIL import Image
162    except ImportError as e:
163        # To currently get this (may change in the future),
164        # $ pip install Pillow
165        raise RuntimeError("Pillow is required to write image files: %s" % e)
166
167    # Limit to 255, find the max and normalize to that...
168    color_max = 0
169    for _point, color in results:
170        color_max = max(color, color_max)
171
172    # Use gray scale since we don't really have other colors.
173    img = Image.new('L', IMAGE_SIZE, "black")
174    pixels = img.load()
175    for (x, y), color in results:
176        if color_max == 0:
177            color = 0
178        else:
179            color = int((float(color) / color_max) * 255.0)
180        pixels[x, y] = color
181    img.save(output_filename)
182
183
184def create_fractal():
185    logging.basicConfig(level=logging.ERROR)
186
187    # Setup our transport configuration and merge it into the worker and
188    # engine configuration so that both of those use it correctly.
189    shared_conf = dict(BASE_SHARED_CONF)
190    shared_conf.update({
191        'transport': 'memory',
192        'transport_options': {
193            'polling_interval': 0.1,
194        },
195    })
196
197    if len(sys.argv) >= 2:
198        output_filename = sys.argv[1]
199    else:
200        output_filename = None
201
202    worker_conf = dict(WORKER_CONF)
203    worker_conf.update(shared_conf)
204    engine_conf = dict(ENGINE_CONF)
205    engine_conf.update(shared_conf)
206    workers = []
207    worker_topics = []
208
209    print('Calculating your mandelbrot fractal of size %sx%s.' % IMAGE_SIZE)
210    try:
211        # Create a set of workers to simulate actual remote workers.
212        print('Running %s workers.' % (WORKERS))
213        for i in compat_range(0, WORKERS):
214            worker_conf['topic'] = 'calculator_%s' % (i + 1)
215            worker_topics.append(worker_conf['topic'])
216            w = worker.Worker(**worker_conf)
217            runner = threading_utils.daemon_thread(w.run)
218            runner.start()
219            w.wait()
220            workers.append((runner, w.stop))
221
222        # Now use those workers to do something.
223        engine_conf['topics'] = worker_topics
224        results = calculate(engine_conf)
225        print('Execution finished.')
226    finally:
227        # And cleanup.
228        print('Stopping workers.')
229        while workers:
230            r, stopper = workers.pop()
231            stopper()
232            r.join()
233    print("Writing image...")
234    write_image(results, output_filename=output_filename)
235
236
237if __name__ == "__main__":
238    create_fractal()

Jobboard producer/consumer (simple)

Note

Full source located at jobboard_produce_consume_colors

  1
  2import collections
  3import contextlib
  4import logging
  5import os
  6import random
  7import sys
  8import threading
  9import time
 10
 11logging.basicConfig(level=logging.ERROR)
 12
 13top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 14                                       os.pardir,
 15                                       os.pardir))
 16sys.path.insert(0, top_dir)
 17
 18import six
 19from six.moves import range as compat_range
 20from zake import fake_client
 21
 22from taskflow import exceptions as excp
 23from taskflow.jobs import backends
 24from taskflow.utils import threading_utils
 25
 26# In this example we show how a jobboard can be used to post work for other
 27# entities to work on. This example creates a set of jobs using one producer
 28# thread (typically this would be split across many machines) and then has
 29# other worker threads, with their own jobboards, select work using given
 30# filters [red/blue] and then perform that work (consuming or abandoning the
 31# job after it has been completed or failed).
 32
 33# Things to note:
 34# - No persistence layer is used (or logbook), just the job details are used
 35#   to determine if a job should be selected by a worker or not.
 36# - This example runs in a single process (this is expected to be atypical
 37#   but this example shows that it can be done if needed, for testing...)
 38# - The iterjobs(), claim(), consume()/abandon() worker workflow.
 39# - The post() producer workflow.
 40
 41SHARED_CONF = {
 42    'path': "/taskflow/jobs",
 43    'board': 'zookeeper',
 44}
 45
 46# How many workers and producers of work will be created (as threads).
 47PRODUCERS = 3
 48WORKERS = 5
 49
 50# How many units of work each producer will create.
 51PRODUCER_UNITS = 10
 52
 53# How many units of work are expected to be produced (used so workers can
 54# know when to stop running and shut down; typically this would not be a
 55# known value but we have to limit this example's execution time to be less
 56# than infinity).
 57EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
 58
 59# Delay between producing/consuming more work.
 60WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
 61
 62# To ensure threads don't trample other threads output.
 63STDOUT_LOCK = threading.Lock()
 64
 65
 66def dispatch_work(job):
 67    # This is where the job's contained work *would* be done
 68    time.sleep(1.0)
 69
 70
 71def safe_print(name, message, prefix=""):
 72    with STDOUT_LOCK:
 73        if prefix:
 74            print("%s %s: %s" % (prefix, name, message))
 75        else:
 76            print("%s: %s" % (name, message))
 77
 78
 79def worker(ident, client, consumed):
 80    # Create a personal board (using the same client so that it works in
 81    # the same process) and start looking for jobs on the board that we want
 82    # to perform.
 83    name = "W-%s" % (ident)
 84    safe_print(name, "started")
 85    claimed_jobs = 0
 86    consumed_jobs = 0
 87    abandoned_jobs = 0
 88    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
 89        while len(consumed) != EXPECTED_UNITS:
 90            favorite_color = random.choice(['blue', 'red'])
 91            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
 92                # See if we should even bother with it...
 93                if job.details.get('color') != favorite_color:
 94                    continue
 95                safe_print(name, "'%s' [attempting claim]" % (job))
 96                try:
 97                    board.claim(job, name)
 98                    claimed_jobs += 1
 99                    safe_print(name, "'%s' [claimed]" % (job))
100                except (excp.NotFound, excp.UnclaimableJob):
101                    safe_print(name, "'%s' [claim unsuccessful]" % (job))
102                else:
103                    try:
104                        dispatch_work(job)
105                        board.consume(job, name)
106                        safe_print(name, "'%s' [consumed]" % (job))
107                        consumed_jobs += 1
108                        consumed.append(job)
109                    except Exception:
110                        board.abandon(job, name)
111                        abandoned_jobs += 1
112                        safe_print(name, "'%s' [abandoned]" % (job))
113            time.sleep(WORKER_DELAY)
114    safe_print(name,
115               "finished (claimed %s jobs, consumed %s jobs,"
116               " abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
117                                        abandoned_jobs), prefix=">>>")
118
119
120def producer(ident, client):
121    # Create a personal board (using the same client so that it works in
122    # the same process) and start posting jobs on the board that we want
123    # some entity to perform.
124    name = "P-%s" % (ident)
125    safe_print(name, "started")
126    with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
127        for i in compat_range(0, PRODUCER_UNITS):
128            job_name = "%s-%s" % (name, i)
129            details = {
130                'color': random.choice(['red', 'blue']),
131            }
132            job = board.post(job_name, book=None, details=details)
133            safe_print(name, "'%s' [posted]" % (job))
134            time.sleep(PRODUCER_DELAY)
135    safe_print(name, "finished", prefix=">>>")
136
137
138def main():
139    if six.PY3:
140        # TODO(harlowja): Hack to make eventlet work right, remove when the
141        # following is fixed: https://github.com/eventlet/eventlet/issues/230
142        from taskflow.utils import eventlet_utils as _eu  # noqa
143        try:
144            import eventlet as _eventlet  # noqa
145        except ImportError:
146            pass
147    with contextlib.closing(fake_client.FakeClient()) as c:
148        created = []
149        for i in compat_range(0, PRODUCERS):
150            p = threading_utils.daemon_thread(producer, i + 1, c)
151            created.append(p)
152            p.start()
153        consumed = collections.deque()
154        for i in compat_range(0, WORKERS):
155            w = threading_utils.daemon_thread(worker, i + 1, c, consumed)
156            created.append(w)
157            w.start()
158        while created:
159            t = created.pop()
160            t.join()
161        # At the end there should be nothing left over; let's verify that.
162        board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
163        board.connect()
164        with contextlib.closing(board):
165            if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
166                return 1
167            return 0
168
169
170if __name__ == "__main__":
171    sys.exit(main())
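
The essential jobboard interaction in the example above reduces to a small post/claim/consume cycle. The following condensed sketch (the board name, path, owner and job details are illustrative, not taken from the example) shows just that cycle, using the same zake fake client so it can run without a real zookeeper server:

    import contextlib

    from zake import fake_client

    from taskflow.jobs import backends as job_backends

    # Illustrative configuration; a real deployment would point the jobboard
    # at an actual zookeeper cluster instead of a zake fake client.
    conf = {
        'board': 'zookeeper',
        'path': '/taskflow/sketch',
    }

    with contextlib.closing(fake_client.FakeClient()) as client:
        with job_backends.backend('sketch', conf.copy(),
                                  client=client) as board:
            # Post a unit of work for some (possibly remote) entity to do.
            board.post("job-1", book=None, details={'color': 'red'})
            # Find unclaimed work, take ownership of it, then mark it done.
            for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
                board.claim(job, 'me')
                board.consume(job, 'me')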

Conductor simulating a CI pipeline

Note

Full source located at tox_conductor.

  1
  2import contextlib
  3import itertools
  4import logging
  5import os
  6import shutil
  7import socket
  8import sys
  9import tempfile
 10import threading
 11import time
 12
 13logging.basicConfig(level=logging.ERROR)
 14
 15top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 16                                       os.pardir,
 17                                       os.pardir))
 18sys.path.insert(0, top_dir)
 19
 20from oslo_utils import timeutils
 21from oslo_utils import uuidutils
 22import six
 23from zake import fake_client
 24
 25from taskflow.conductors import backends as conductors
 26from taskflow import engines
 27from taskflow.jobs import backends as boards
 28from taskflow.patterns import linear_flow
 29from taskflow.persistence import backends as persistence
 30from taskflow.persistence import models
 31from taskflow import task
 32from taskflow.utils import threading_utils
 33
 34# INTRO: This example shows how a worker/producer can post desired work (jobs)
 35# to a jobboard and a conductor can consume that work (jobs) from that jobboard
 36# and execute those jobs in a reliable & async manner (for example, if the
 37# conductor were to crash then the job would be released back onto the jobboard
 38# and another conductor could attempt to finish it, from wherever that job last
 39# left off).
 40#
 41# In this example an in-memory jobboard (and in-memory storage) is created
 42# and used to simulate how this would be done at a larger scale (it is an
 43# example after all).
 44
 45# Restrict how long this example runs for...
 46RUN_TIME = 5
 47REVIEW_CREATION_DELAY = 0.5
 48SCAN_DELAY = 0.1
 49NAME = "%s_%s" % (socket.getfqdn(), os.getpid())
 50
 51# This won't really use zookeeper; instead it uses a local stand-in provided
 52# by the zake library, which mimics an actual zookeeper cluster using threads
 53# and an in-memory data structure.
 54JOBBOARD_CONF = {
 55    'board': 'zookeeper://localhost?path=/taskflow/tox/jobs',
 56}
 57
 58
 59class RunReview(task.Task):
 60    # A dummy task that clones the review and runs tox...
 61
 62    def _clone_review(self, review, temp_dir):
 63        print("Cloning review '%s' into %s" % (review['id'], temp_dir))
 64
 65    def _run_tox(self, temp_dir):
 66        print("Running tox in %s" % temp_dir)
 67
 68    def execute(self, review, temp_dir):
 69        self._clone_review(review, temp_dir)
 70        self._run_tox(temp_dir)
 71
 72
 73class MakeTempDir(task.Task):
 74    # A task that creates a temporary dir (and destroys it on failure).
 75    #
 76    # It provides the location of the temporary dir for other tasks to use
 77    # as they see fit.
 78
 79    default_provides = 'temp_dir'
 80
 81    def execute(self):
 82        return tempfile.mkdtemp()
 83
 84    def revert(self, *args, **kwargs):
 85        temp_dir = kwargs.get(task.REVERT_RESULT)
 86        if temp_dir:
 87            shutil.rmtree(temp_dir)
 88
 89
 90class CleanResources(task.Task):
 91    # A task that cleans up any workflow resources.
 92
 93    def execute(self, temp_dir):
 94        print("Removing %s" % temp_dir)
 95        shutil.rmtree(temp_dir)
 96
 97
 98def review_iter():
 99    """Makes reviews (never-ending iterator/generator)."""
100    review_id_gen = itertools.count(0)
101    while True:
102        review_id = six.next(review_id_gen)
103        review = {
104            'id': review_id,
105        }
106        yield review
107
108
109# It is important that this lives at the module namespace level, since it must
110# be accessible from a conductor dispatching an engine; if it were a lambda
111# function, for example, it would not be re-importable and the conductor would
112# be unable to reference it when creating the workflow to run.
113def create_review_workflow():
114    """Factory method used to create a review workflow to run."""
115    f = linear_flow.Flow("tester")
116    f.add(
117        MakeTempDir(name="maker"),
118        RunReview(name="runner"),
119        CleanResources(name="cleaner")
120    )
121    return f
122
123
124def generate_reviewer(client, saver, name=NAME):
125    """Creates a review producer thread with the given name prefix."""
126    real_name = "%s_reviewer" % name
127    no_more = threading.Event()
128    jb = boards.fetch(real_name, JOBBOARD_CONF,
129                      client=client, persistence=saver)
130
131    def make_save_book(saver, review_id):
132        # Record what we want to happen (sometime in the future).
133        book = models.LogBook("book_%s" % review_id)
134        detail = models.FlowDetail("flow_%s" % review_id,
135                                   uuidutils.generate_uuid())
136        book.add(detail)
137        # Associate the factory method we want to be called (in the future)
138        # with the book, so that the conductor will be able to call into
139        # that factory to retrieve the workflow objects that represent the
140        # work.
141        #
142        # These args and kwargs *can* be used to save any specific parameters
143        # into the factory when it is being called to create the workflow
144        # objects (typically used to tell a factory how to create a unique
145        # workflow that represents this review).
146        factory_args = ()
147        factory_kwargs = {}
148        engines.save_factory_details(detail, create_review_workflow,
149                                     factory_args, factory_kwargs)
150        with contextlib.closing(saver.get_connection()) as conn:
151            conn.save_logbook(book)
152            return book
153
154    def run():
155        """Periodically publishes 'fake' reviews to analyze."""
156        jb.connect()
157        review_generator = review_iter()
158        with contextlib.closing(jb):
159            while not no_more.is_set():
160                review = six.next(review_generator)
161                details = {
162                    'store': {
163                        'review': review,
164                    },
165                }
166                job_name = "%s_%s" % (real_name, review['id'])
167                print("Posting review '%s'" % review['id'])
168                jb.post(job_name,
169                        book=make_save_book(saver, review['id']),
170                        details=details)
171                time.sleep(REVIEW_CREATION_DELAY)
172
173    # Return the unstarted thread, and a callback that can be used to
174    # shut down that thread (to avoid it running forever).
175    return (threading_utils.daemon_thread(target=run), no_more.set)
176
177
178def generate_conductor(client, saver, name=NAME):
179    """Creates a conductor thread with the given name prefix."""
180    real_name = "%s_conductor" % name
181    jb = boards.fetch(name, JOBBOARD_CONF,
182                      client=client, persistence=saver)
183    conductor = conductors.fetch("blocking", real_name, jb,
184                                 engine='parallel', wait_timeout=SCAN_DELAY)
185
186    def run():
187        jb.connect()
188        with contextlib.closing(jb):
189            conductor.run()
190
191    # Return the unstarted thread, and a callback that can be used to
192    # shut down that thread (to avoid it running forever).
193    return (threading_utils.daemon_thread(target=run), conductor.stop)
194
195
196def main():
197    # Need to share the same backend, so that data can be shared...
198    persistence_conf = {
199        'connection': 'memory',
200    }
201    saver = persistence.fetch(persistence_conf)
202    with contextlib.closing(saver.get_connection()) as conn:
203        # This ensures that the needed backend setup/data directories/schema
204        # upgrades and so on... exist before they are used...
205        conn.upgrade()
206    fc1 = fake_client.FakeClient()
207    # Done like this to share the same client storage location so the correct
208    # zookeeper features work across clients...
209    fc2 = fake_client.FakeClient(storage=fc1.storage)
210    entities = [
211        generate_reviewer(fc1, saver),
212        generate_conductor(fc2, saver),
213    ]
214    for t, stopper in entities:
215        t.start()
216    try:
217        watch = timeutils.StopWatch(duration=RUN_TIME)
218        watch.start()
219        while not watch.expired():
220            time.sleep(0.1)
221    finally:
222        for t, stopper in reversed(entities):
223            stopper()
224            t.join()
225
226
227if __name__ == '__main__':
228    main()
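
Stripped of the reviewer thread and the shutdown plumbing, the conductor side of the example above wires together three pieces: a persistence backend, a jobboard and a blocking conductor. The following minimal sketch (names and the jobboard path are illustrative, not taken from the example) shows just that wiring:

    import contextlib

    from zake import fake_client

    from taskflow.conductors import backends as conductors
    from taskflow.jobs import backends as boards
    from taskflow.persistence import backends as persistence

    # In-memory persistence, as in the example; make sure any needed
    # schema/directories exist before use.
    saver = persistence.fetch({'connection': 'memory'})
    with contextlib.closing(saver.get_connection()) as conn:
        conn.upgrade()

    # A zake fake client keeps the "zookeeper" jobboard entirely in-process.
    client = fake_client.FakeClient()
    board = boards.fetch('sketch-conductor',
                         {'board': 'zookeeper://localhost?path=/taskflow/sketch'},
                         client=client, persistence=saver)
    board.connect()
    with contextlib.closing(board):
        cond = conductors.fetch('blocking', 'sketch-conductor', board,
                                engine='parallel')
        # Blocks, claiming and running posted jobs until cond.stop() is
        # called from elsewhere (e.g. another thread).
        cond.run()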

Conductor running 99 bottles of beer song requests

Note

Full source located at 99_bottles.

  1
  2import contextlib
  3import functools
  4import logging
  5import os
  6import sys
  7import time
  8import traceback
  9
 10from kazoo import client
 11
 12top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
 13                                       os.pardir,
 14                                       os.pardir))
 15sys.path.insert(0, top_dir)
 16
 17from taskflow.conductors import backends as conductor_backends
 18from taskflow import engines
 19from taskflow.jobs import backends as job_backends
 20from taskflow import logging as taskflow_logging
 21from taskflow.patterns import linear_flow as lf
 22from taskflow.persistence import backends as persistence_backends
 23from taskflow.persistence import models
 24from taskflow import task
 25
 26from oslo_utils import timeutils
 27from oslo_utils import uuidutils
 28
 29# Instructions!
 30#
 31# 1. Install zookeeper (or change host listed below)
 32# 2. Download this example, place in file '99_bottles.py'
 33# 3. Run `python 99_bottles.py p` to place a song request onto the jobboard
 34# 4. Run `python 99_bottles.py c` a few times (in different shells)
 35# 5. At will, kill any of the processes created in (4) and watch
 36#    the work resume on another process (and repeat)
 37# 6. Keep enough workers alive to eventually finish the song (if desired).
 38
 39ME = os.getpid()
 40ZK_HOST = "localhost:2181"
 41JB_CONF = {
 42    'hosts': ZK_HOST,
 43    'board': 'zookeeper',
 44    'path': '/taskflow/99-bottles-demo',
 45}
 46PERSISTENCE_URI = r"sqlite:////tmp/bottles.db"
 47TAKE_DOWN_DELAY = 1.0
 48PASS_AROUND_DELAY = 3.0
 49HOW_MANY_BOTTLES = 99
 50
 51
 52class TakeABottleDown(task.Task):
 53    def execute(self, bottles_left):
 54        sys.stdout.write('Take one down, ')
 55        sys.stdout.flush()
 56        time.sleep(TAKE_DOWN_DELAY)
 57        return bottles_left - 1
 58
 59
 60class PassItAround(task.Task):
 61    def execute(self):
 62        sys.stdout.write('pass it around, ')
 63        sys.stdout.flush()
 64        time.sleep(PASS_AROUND_DELAY)
 65
 66
 67class Conclusion(task.Task):
 68    def execute(self, bottles_left):
 69        sys.stdout.write('%s bottles of beer on the wall...\n' % bottles_left)
 70        sys.stdout.flush()
 71
 72
 73def make_bottles(count):
 74    # This is the function that will be called to generate the workflow
 75    # and will also be called to regenerate it on resumption so that work
 76    # can continue from where it last left off...
 77
 78    s = lf.Flow("bottle-song")
 79
 80    take_bottle = TakeABottleDown("take-bottle-%s" % count,
 81                                  inject={'bottles_left': count},
 82                                  provides='bottles_left')
 83    pass_it = PassItAround("pass-%s-around" % count)
 84    next_bottles = Conclusion("next-bottles-%s" % (count - 1))
 85    s.add(take_bottle, pass_it, next_bottles)
 86
 87    for bottle in reversed(list(range(1, count))):
 88        take_bottle = TakeABottleDown("take-bottle-%s" % bottle,
 89                                      provides='bottles_left')
 90        pass_it = PassItAround("pass-%s-around" % bottle)
 91        next_bottles = Conclusion("next-bottles-%s" % (bottle - 1))
 92        s.add(take_bottle, pass_it, next_bottles)
 93
 94    return s
 95
 96
 97def run_conductor(only_run_once=False):
 98    # This continuously runs consumers until it is stopped via ctrl-c or
 99    # another kill signal...
100    event_watches = {}
101
102    # This will be triggered by the conductor doing various activities
103    # with engines, and makes it possible to see the various timing
104    # segments (which is useful for debugging, watching, or figuring out
105    # where to optimize).
106    def on_conductor_event(cond, event, details):
107        print("Event '%s' has been received..." % event)
108        print("Details = %s" % details)
109        if event.endswith("_start"):
110            w = timeutils.StopWatch()
111            w.start()
112            base_event = event[0:-len("_start")]
113            event_watches[base_event] = w
114        if event.endswith("_end"):
115            base_event = event[0:-len("_end")]
116            try:
117                w = event_watches.pop(base_event)
118                w.stop()
119                print("It took %0.3f seconds for event '%s' to finish"
120                      % (w.elapsed(), base_event))
121            except KeyError:
122                pass
123        if event == 'running_end' and only_run_once:
124            cond.stop()
125
126    print("Starting conductor with pid: %s" % ME)
127    my_name = "conductor-%s" % ME
128    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
129    with contextlib.closing(persist_backend):
130        with contextlib.closing(persist_backend.get_connection()) as conn:
131            conn.upgrade()
132        job_backend = job_backends.fetch(my_name, JB_CONF,
133                                         persistence=persist_backend)
134        job_backend.connect()
135        with contextlib.closing(job_backend):
136            cond = conductor_backends.fetch('blocking', my_name, job_backend,
137                                            persistence=persist_backend)
138            on_conductor_event = functools.partial(on_conductor_event, cond)
139            cond.notifier.register(cond.notifier.ANY, on_conductor_event)
140            # Run forever, and kill -9 or ctrl-c me...
141            try:
142                cond.run()
143            finally:
144                cond.stop()
145                cond.wait()
146
147
148def run_poster():
149    # This just posts a single job and then ends...
150    print("Starting poster with pid: %s" % ME)
151    my_name = "poster-%s" % ME
152    persist_backend = persistence_backends.fetch(PERSISTENCE_URI)
153    with contextlib.closing(persist_backend):
154        with contextlib.closing(persist_backend.get_connection()) as conn:
155            conn.upgrade()
156        job_backend = job_backends.fetch(my_name, JB_CONF,
157                                         persistence=persist_backend)
158        job_backend.connect()
159        with contextlib.closing(job_backend):
160            # Create information in the persistence backend about the
161            # unit of work we want to complete, and the factory that
162            # can be called to create the tasks needed to complete that
163            # unit of work.
164            lb = models.LogBook("post-from-%s" % my_name)
165            fd = models.FlowDetail("song-from-%s" % my_name,
166                                   uuidutils.generate_uuid())
167            lb.add(fd)
168            with contextlib.closing(persist_backend.get_connection()) as conn:
169                conn.save_logbook(lb)
170            engines.save_factory_details(fd, make_bottles,
171                                         [HOW_MANY_BOTTLES], {},
172                                         backend=persist_backend)
173            # Post, and be done with it!
174            jb = job_backend.post("song-from-%s" % my_name, book=lb)
175            print("Posted: %s" % jb)
176            print("Goodbye...")
177
178
179def main_local():
180    # Run locally; typically this is activated during unit testing, when all
181    # the examples are verified to still function correctly...
182    global TAKE_DOWN_DELAY
183    global PASS_AROUND_DELAY
184    global JB_CONF
185    # Make everything go much faster (so that this finishes quickly).
186    PASS_AROUND_DELAY = 0.01
187    TAKE_DOWN_DELAY = 0.01
188    JB_CONF['path'] = JB_CONF['path'] + "-" + uuidutils.generate_uuid()
189    run_poster()
190    run_conductor(only_run_once=True)
191
192
193def check_for_zookeeper(timeout=1):
194    sys.stderr.write("Testing for the existence of a zookeeper server...\n")
195    sys.stderr.write("Please wait....\n")
196    with contextlib.closing(client.KazooClient()) as test_client:
197        try:
198            test_client.start(timeout=timeout)
199        except test_client.handler.timeout_exception:
200            sys.stderr.write("Zookeeper is needed for running this example!\n")
201            traceback.print_exc()
202            return False
203        else:
204            test_client.stop()
205            return True
206
207
208def main():
209    if not check_for_zookeeper():
210        return
211    if len(sys.argv) == 1:
212        main_local()
213    elif sys.argv[1] in ('p', 'c'):
214        if sys.argv[-1] == "v":
215            logging.basicConfig(level=taskflow_logging.TRACE)
216        else:
217            logging.basicConfig(level=logging.ERROR)
218        if sys.argv[1] == 'p':
219            run_poster()
220        else:
221            run_conductor()
222    else:
223        sys.stderr.write("%s p|c (v?)\n" % os.path.basename(sys.argv[0]))
224
225
226if __name__ == '__main__':
227    main()
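
The key piece of run_poster() above is that a posted job only carries a reference to the work: the logbook/flow detail pair records which factory function to call to rebuild the flow, which is what lets a different conductor process pick the song back up after a crash. A condensed sketch of that pattern follows (function and book names are illustrative; make_bottles is assumed to be the module-level factory defined in the example):

    import contextlib

    from oslo_utils import uuidutils

    from taskflow import engines
    from taskflow.persistence import models

    def post_song(job_backend, persist_backend, how_many=99):
        # Record the unit of work: a logbook holding one flow detail that
        # points at the (importable) factory used to recreate the flow.
        lb = models.LogBook("song-book-sketch")
        fd = models.FlowDetail("song-flow-sketch", uuidutils.generate_uuid())
        lb.add(fd)
        with contextlib.closing(persist_backend.get_connection()) as conn:
            conn.save_logbook(lb)
        engines.save_factory_details(fd, make_bottles, [how_many], {},
                                     backend=persist_backend)
        # Whichever conductor claims this job will call make_bottles() again
        # to recreate the flow and run (or resume) it.
        return job_backend.post("song-sketch", book=lb)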