activity¶
This module implements the base classes for Activities.
-
class
dsplab.flow.activity.
Activity
[source]¶ Bases:
object
An activity is something that can be called and can provide information about itself. To get a working activity, the __call__ method must be implemented.
-
class
dsplab.flow.activity.
ActivityMeta
(name, bases, attrs)[source]¶ Bases:
type
Metaclass for Activity.
-
class
dsplab.flow.activity.
Work
(descr=None, worker=None)[source]¶ Bases:
dsplab.flow.activity.Activity
Work is data processing that can be done in a variety of ways.
-
descr
¶ Description of work
-
-
class
dsplab.flow.activity.
Worker
[source]¶ Bases:
dsplab.flow.activity.Activity
Deprecated.
online¶
This module implements the base class for online filters.
-
class
dsplab.flow.online.
And
[source]¶ Bases:
dsplab.flow.activity.Activity
And operation.
-
class
dsplab.flow.online.
Delayer
(ntaps, fill_with=0)[source]¶ Bases:
dsplab.flow.online.QueueFilter
Provide delay in online processing.
-
class
dsplab.flow.online.
OnlineFilter
(ntaps=None, smooth_ntaps=None, fill_with=0, step=1)[source]¶ Bases:
dsplab.flow.activity.Activity
Universal online filter.
Parameters: - ntaps (int) – Length of the internal queue used for accumulating input samples. Default is None.
- smooth_ntaps (int) – Length of the queue used for smoothing output values. Default is None.
- fill_with (object) – Initial value of every element of queues.
- step (int) – Step. Must be positive.
-
class
dsplab.flow.online.
Or
[source]¶ Bases:
dsplab.flow.activity.Activity
Or operation.
-
class
dsplab.flow.online.
QueueFilter
(ntaps, fill_with=0)[source]¶ Bases:
dsplab.flow.activity.Activity
Online filter with queue.
Parameters: - ntaps (int) – Length of filter.
- fill_with (object) – Initial value of every element of queue.
plan¶
Examples¶
Helper module with workers¶
"""Workers for examples."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Activity
class Linear(Activity):
    """Linear transformation: y = k*x + b."""

    def __init__(self, k, b):
        # k is the multiplier and b the offset of the transformation.
        super().__init__()
        self.k = k
        self.b = b

    def __call__(self, x):
        # Scale the input by k, then shift it by b.
        return x*self.k + self.b
class Sum(Activity):
    """Sum."""

    def __call__(self, *xs):
        # Add up all positional arguments.
        return sum(xs)
class Inc(Activity):
    """Add 1 to value."""

    def __init__(self):
        super().__init__()

    def __call__(self, x):
        # Return the input increased by one.
        return x + 1
class DoNothing(Activity):
    """Just pass input to output."""

    def __init__(self):
        super().__init__()

    def __call__(self, x):
        # Identity: the very same object is handed back.
        return x
Basic usage¶
"""Basic usage of plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
from workers import Linear
def main():
    """Run example."""
    plan = Plan()

    # Three nodes, each wrapping a Linear worker with its own (k, b).
    node_a = WorkNode(work=Work("Linear transformation", worker=Linear(1, 1)))
    node_b = WorkNode(work=Work("Linear transformation", worker=Linear(2, 2)))
    node_c = WorkNode(work=Work("Linear transformation", worker=Linear(3, 3)))

    # Link the nodes into a chain: a -> b -> c.
    plan.add_node(node_a)
    plan.add_node(node_b, inputs=[node_a])
    plan.add_node(node_c, inputs=[node_b])

    # One input node; two output nodes (c first, then b).
    plan.inputs = [node_a]
    plan.outputs = [node_c, node_b]

    print(plan([5]))


if __name__ == "__main__":
    main()
Using start and stop hooks¶
"""Start and stop hooks."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
def func(x):
    """Worker."""
    # Plain function used as a worker: returns the input plus one.
    return x + 1
def start_handler(node):
    """Node start handler.

    Print the description of the node's work followed by "started".
    """
    print("'{}' started".format(node.work.descr))
def stop_handler(node):
    """Node stop handler.

    Print the description of the node's work followed by "finished".
    """
    print("'{}' finished".format(node.work.descr))
def progress_handler():
    """Progress handler.

    Print a fixed message; takes no arguments.
    """
    print("Calculated one node.")
def main():
    """Entry point."""
    print(__doc__)

    # Single node whose worker is the plain function `func`.
    node = WorkNode(work=Work("Increment", worker=func))
    # The node itself is passed along as the argument for both hooks.
    node.set_start_hook(start_handler, node)
    node.set_stop_hook(stop_handler, node)

    plan = Plan()
    plan.add_node(node)
    plan.set_progress_hook(progress_handler)

    # The same node is both the input and the output of the plan.
    plan.inputs = [node]
    plan.outputs = [node]

    # The result is not used here; the hooks do the printing.
    plan([5])


if __name__ == "__main__":
    main()
MapNode (applying work for iterable input)¶
"""Mapping."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import MapNode, WorkNode, Plan
from workers import Sum, DoNothing
def main():
    """Run example."""
    plan = Plan()

    # Two pass-through nodes feed the mapping node.
    pass_node_1 = WorkNode(
        Work("Pass", worker=DoNothing())
    )
    pass_node_2 = WorkNode(
        Work("Pass", worker=DoNothing())
    )
    map_node = MapNode(
        work=Work("Transformation", worker=Sum()),
        inputs=[pass_node_1, pass_node_2]
    )

    plan.add_node(pass_node_1)
    plan.add_node(pass_node_2)
    plan.add_node(map_node)

    plan.inputs = [pass_node_1, pass_node_2]
    plan.outputs = [map_node]

    # One iterable per input node.
    res = plan([
        [1, 1, 1],
        [2, 2, 2],
    ])
    print("Outputs:", res)


if __name__ == "__main__":
    main()
PackNode (pack inputs to list)¶
"""Pack inputs to list."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, PackNode, Plan
from workers import DoNothing
def main():
    """Run example."""
    print(__doc__)
    plan = Plan()

    # Three pass-through source nodes.
    node_1 = WorkNode(Work("Pass", worker=DoNothing()))
    node_2 = WorkNode(Work("Pass", worker=DoNothing()))
    node_3 = WorkNode(Work("Pass", worker=DoNothing()))

    pack_node_1 = PackNode()
    pack_node_2 = PackNode()

    plan.add_node(node_1)
    plan.add_node(node_2)
    plan.add_node(node_3)
    # node_2 is shared: it is an input of both pack nodes.
    plan.add_node(pack_node_1, inputs=[node_1, node_2])
    plan.add_node(pack_node_2, inputs=[node_2, node_3])

    plan.inputs = [node_1, node_2, node_3]
    plan.outputs = [pack_node_1, pack_node_2]

    print(plan([1, 2, 3]))


if __name__ == "__main__":
    main()
SelectNode¶
"""Using of SelectNode with multiple input."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import SelectNode, WorkNode, Plan
from workers import DoNothing
def main():
    """Run example."""
    plan = Plan()

    pass_node_1 = WorkNode(Work(descr="Pass", worker=DoNothing()))
    pass_node_2 = WorkNode(Work(descr="Pass", worker=DoNothing()))

    # Both selectors use index 0; one has two inputs, the other a single one.
    select_node_m = SelectNode(index=0)
    select_node_s = SelectNode(index=0)

    plan.add_node(pass_node_1)
    plan.add_node(pass_node_2)
    plan.add_node(select_node_m,
                  inputs=[pass_node_1, pass_node_2])
    plan.add_node(select_node_s,
                  inputs=[pass_node_1])

    plan.inputs = [pass_node_1, pass_node_2]
    plan.outputs = [select_node_m, select_node_s]

    res = plan([
        [1, 2, 3],
        [2, 3, 4]
    ])
    print("Outputs: ", res)


if __name__ == "__main__":
    main()
Node-generator¶
‘Node-generator’ means a node with no inputs.
"""Node may not have inputs."""
import os
import sys
from random import randint
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
def gen():
    """Generate random number."""
    # Random integer from 1 to 10, both ends included.
    y = randint(1, 10)
    print("gen -> {}".format(y))
    return y
def inc(x):
    """Increment."""
    y = x + 1
    # Trace the input and the output so the data flow is visible.
    print("{} -> inc -> {}".format(x, y))
    return y
def plus(x1, x2):
    """Sum of two numbers."""
    y = x1 + x2
    # Trace both inputs and the output so the data flow is visible.
    print("{}, {} -> plus -> {}".format(x1, x2, y))
    return y
def main():
    """Run example."""
    p = Plan()

    # g has no inputs and is not a plan input: it is the node-generator.
    g = WorkNode(Work("Generate random number", gen))
    a = WorkNode(Work("Add 1", inc))
    b = WorkNode(Work("Summation", plus))

    p.add_node(g)
    p.add_node(a)
    p.add_node(b, inputs=[g, a])

    # Only `a` is a plan input; `b` is the only output.
    p.inputs = [a]
    p.outputs = [b]

    x = [1]
    print(x)
    y = p(x)
    print(y)


if __name__ == "__main__":
    main()
Get Plan instance from dict¶
"""Get plan from dictionary."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import get_plan_from_dict
# Plan description consumed by get_plan_from_dict: a list of node
# settings plus the ids of the plan's input and output nodes.
SETTINGS = {
    'nodes': [
        # Worker given as a class name with constructor parameters.
        {
            'id': 'a',
            'class': 'WorkNode',
            'work': {
                'descr': "First step",
                'worker': {
                    'class': "workers.Linear",
                    'params': {
                        'k': 1,
                        'b': 1,
                    }
                }
            }
        },
        # Worker given as a function name.
        {
            'id': 'b',
            'class': 'WorkNode',
            'work': {
                'descr': "Second step",
                'worker': {
                    'function': "numpy.exp"
                }
            },
            'inputs': ['a'],
        },
        # Worker given as a class name without parameters.
        {
            'id': 'c',
            'class': 'WorkNode',
            'work': {
                'descr': "Third step",
                'worker': {
                    'class': "workers.Inc"
                }
            },
            'inputs': ['b'],
        },
        # Node without work: takes 'b' and 'c' as inputs.
        {
            'id': 'd',
            'class': 'PackNode',
            'inputs': ['b', 'c'],
            'result': 'Result value',
        },
    ],
    'inputs': ['a'],
    'outputs': ['d'],
}
def main():
    """Run example."""
    # Build the plan from the SETTINGS dictionary.
    plan = get_plan_from_dict(SETTINGS)
    x = 1
    # The plan is called with a list holding one value per input node.
    y = plan([x])
    print(y)


if __name__ == "__main__":
    main()
Quick plan for on-line processing¶
"""Online plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import Plan, WorkNode
from dsplab.flow.activity import Work
from workers import Inc
def main():
    """Run example."""
    node_1 = WorkNode(work=Work("Step 1", worker=Inc()))
    node_2 = WorkNode(work=Work("Step 2", worker=Inc()))
    node_3 = WorkNode(work=Work("Step 3", worker=Inc()))

    # Chain of three increments in a plan created with quick=True.
    plan = Plan(quick=True)
    plan.add_node(node_1)
    plan.add_node(node_2, inputs=[node_1])
    plan.add_node(node_3, inputs=[node_2])
    plan.inputs = [node_1]
    plan.outputs = [node_3]
    plan.reduce_calls()

    # Feed the samples one at a time, as in on-line processing.
    xs = [1, 2, 3, 4, 5]
    for x in xs:
        # The plan returns one value per output node; take the only one.
        y = plan([x])[0]
        print("{} -> {}".format(x, y))


if __name__ == "__main__":
    main()
Members¶
This module implements the Node and Plan classes. A node can be understood as the workplace for a worker. A node can have inputs that are also nodes. A plan is a system of linked nodes.
-
class
dsplab.flow.plan.
MapNode
(work=None, inputs=None)[source]¶ Bases:
dsplab.flow.plan.WorkNode
Apply work to all components of iterable input and build iterable output.
-
class
dsplab.flow.plan.
Node
(inputs=None)[source]¶ Bases:
dsplab.flow.activity.Activity
Base class for nodes.
-
inputs
¶ Return inputs.
-
node_id
¶ ID of node.
-
result_info
¶ Information about result
-
-
class
dsplab.flow.plan.
PackNode
(inputs=None)[source]¶ Bases:
dsplab.flow.plan.Node
Pack input to output.
-
class
dsplab.flow.plan.
PassNode
(inputs=None)[source]¶ Bases:
dsplab.flow.plan.Node
Pass input to output.
-
class
dsplab.flow.plan.
Plan
(descr=None, quick=False)[source]¶ Bases:
dsplab.flow.activity.Activity
The plan. Plan is the system of linked nodes.
-
descr
¶ Description of plan
-
inputs
¶ The nodes which are inputs.
-
outputs
¶ The nodes which are outputs.
-
-
class
dsplab.flow.plan.
SelectNode
(index, inputs=None)[source]¶ Bases:
dsplab.flow.plan.Node
Select component of output.
-
class
dsplab.flow.plan.
WorkNode
(work=None, inputs=None)[source]¶ Bases:
dsplab.flow.plan.Node
Node with work.
-
work
¶ Work in node
-
-
dsplab.flow.plan.
get_plan_from_dict
(settings, params=None)[source]¶ Create and return instance of Plan described in dictionary.
Parameters: - settings (dict) – Dictionary with plan.
- params (dict) – Dictionary with parameters like “$name” for plan.
Returns: - Plan – The instance of Plan.
- **Keys in settings**
- - ‘descr’ - description of the plan (optional)
- - ‘nodes’ - list of dicts with nodes settings
- - ‘inputs’ - list of inputs nodes ids
- - ‘outputs’ - list of output nodes ids
- **Common settings for nodes**
- - ‘id’ - id of node
- - ‘inputs’ - list of ids of input nodes for this node
- - ‘result’ - result description
- **Settings for WorkNode and MapNode**
- - ‘work’ - dict with work settings
- **Settings for SelectNode**
- - ‘index’ - index of selected item