activity¶
This module implements the base classes for Activities.
-
class
dsplab.flow.activity.
Activity
[source]¶ Bases:
object
An activity is something that can be called and that can provide information about itself. To get a working activity, the __call__ method must be implemented.
-
class
dsplab.flow.activity.
ActivityMeta
(name, bases, attrs)[source]¶ Bases:
type
Metaclass for Activity.
-
class
dsplab.flow.activity.
Work
(descr=None, worker=None)[source]¶ Bases:
dsplab.flow.activity.Activity
Work is data processing that can be done in a variety of ways.
-
descr
¶ Description of work
-
-
class
dsplab.flow.activity.
Worker
[source]¶ Bases:
dsplab.flow.activity.Activity
Deprecated.
online¶
This module implements the base class for online filters.
-
class
dsplab.flow.online.
And
[source]¶ Bases:
dsplab.flow.activity.Activity
And operation.
-
class
dsplab.flow.online.
Delayer
(ntaps, fill_with=0)[source]¶ Bases:
dsplab.flow.online.QueueFilter
Provide delay in online processing.
-
class
dsplab.flow.online.
OnlineFilter
(ntaps=None, smooth_ntaps=None, fill_with=0, step=1)[source]¶ Bases:
dsplab.flow.activity.Activity
Universal online filter.
Parameters: - ntaps (int) – Length of the internal queue used for accumulation of input samples. Default is None.
- smooth_ntaps (int) – Length of the queue used for smoothing output values. Default is None.
- fill_with (object) – Initial value of every element of queues.
- step (int) – Step. Must be positive.
-
class
dsplab.flow.online.
Or
[source]¶ Bases:
dsplab.flow.activity.Activity
Or operation.
-
class
dsplab.flow.online.
QueueFilter
(ntaps, fill_with=0)[source]¶ Bases:
dsplab.flow.activity.Activity
Online filter with queue.
Parameters: - ntaps (int) – Length of filter.
- fill_with (object) – Initial value of every element of queue.
plan¶
Examples¶
Helper module with workers¶
"""Workers for examples."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Activity
class Linear(Activity):
    """Linear transformation: y = k*x + b."""

    # k is the multiplier and b the additive offset applied on each call.
    def __init__(self, k, b):
        super().__init__()
        self.k, self.b = k, b

    # Apply the stored coefficients to a single input value.
    def __call__(self, x):
        return x * self.k + self.b
class Sum(Activity):
    """Sum."""

    # Takes any number of positional inputs and adds them with the
    # built-in ``sum``.
    def __call__(self, *xs):
        return sum(xs)
class Inc(Activity):
    """Add 1 to value."""

    def __init__(self):
        # No state of its own; only the base-class initialisation runs.
        super().__init__()

    # Return the input increased by one.
    def __call__(self, x):
        return x + 1
class DoNothing(Activity):
    """Just pass input to output."""

    def __init__(self):
        # No state of its own; only the base-class initialisation runs.
        super().__init__()

    # Identity: the very object received is handed back.
    def __call__(self, x):
        passthrough = x
        return passthrough
Basic usage¶
"""Basic usage of plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
from workers import Linear
def main():
    """Chain three ``Linear`` work nodes in a plan and print its result.

    The nodes are connected first -> second -> third. The plan takes its
    input at the first node and reports the third and second nodes, in
    that order, as outputs.
    """
    pipeline = Plan()
    first = WorkNode(work=Work("Linear transformation", worker=Linear(1, 1)))
    second = WorkNode(work=Work("Linear transformation", worker=Linear(2, 2)))
    third = WorkNode(work=Work("Linear transformation", worker=Linear(3, 3)))

    pipeline.add_node(first)
    pipeline.add_node(second, inputs=[first])
    pipeline.add_node(third, inputs=[second])

    pipeline.inputs = [first]
    pipeline.outputs = [third, second]

    result = pipeline([5])
    print(result)


if __name__ == "__main__":
    main()
Using start and stop hooks¶
"""Start and stop hooks."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
def func(x):
    """Return ``x`` increased by one (the worker used in this example)."""
    incremented = x + 1
    return incremented
def start_handler(node):
    """Print a message saying that the work of ``node`` has started.

    The message quotes ``node.work.descr``.
    """
    description = node.work.descr
    message = "'{}' started".format(description)
    print(message)
def stop_handler(node):
    """Print a message saying that the work of ``node`` has finished.

    The message quotes ``node.work.descr``.
    """
    description = node.work.descr
    message = "'{}' finished".format(description)
    print(message)
def progress_handler():
    """Print a fixed notice that one node has been calculated."""
    notice = "Calculated one node."
    print(notice)
def main():
    """Run a one-node plan with start, stop and progress hooks attached.

    The module docstring is printed first. The node's start and stop
    hooks both receive the node itself as their argument.
    """
    print(__doc__)

    increment = Work("Increment", worker=func)
    node = WorkNode(work=increment)
    node.set_start_hook(start_handler, node)
    node.set_stop_hook(stop_handler, node)

    plan = Plan()
    plan.add_node(node)
    plan.set_progress_hook(progress_handler)
    plan.inputs = [node]
    plan.outputs = [node]

    # The result is deliberately not printed; only the hooks report.
    plan([5])


if __name__ == "__main__":
    main()
MapNode (applying work for iterable input)¶
"""Mapping."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import MapNode, WorkNode, Plan
from workers import Sum, DoNothing
def main():
    """Feed two pass-through nodes into a ``MapNode`` wrapping ``Sum``.

    The plan is called with two equally long lists and the result is
    printed.
    """
    plan = Plan()
    first_source = WorkNode(Work("Pass", worker=DoNothing()))
    second_source = WorkNode(Work("Pass", worker=DoNothing()))
    mapper = MapNode(
        work=Work("Transformation", worker=Sum()),
        inputs=[first_source, second_source],
    )

    for node in (first_source, second_source, mapper):
        plan.add_node(node)

    plan.inputs = [first_source, second_source]
    plan.outputs = [mapper]

    samples = [
        [1, 1, 1],
        [2, 2, 2],
    ]
    outputs = plan(samples)
    print("Outputs:", outputs)


if __name__ == "__main__":
    main()
PackNode (pack inputs to list)¶
"""Pack inputs to list."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, PackNode, Plan
from workers import DoNothing
def main():
    """Route three pass-through nodes into two ``PackNode`` instances.

    The module docstring is printed first. The first pack node takes
    nodes 1 and 2 as inputs, the second takes nodes 2 and 3.
    """
    print(__doc__)

    plan = Plan()
    first = WorkNode(Work("Pass", worker=DoNothing()))
    second = WorkNode(Work("Pass", worker=DoNothing()))
    third = WorkNode(Work("Pass", worker=DoNothing()))
    left_pack = PackNode()
    right_pack = PackNode()

    for node in (first, second, third):
        plan.add_node(node)
    plan.add_node(left_pack, inputs=[first, second])
    plan.add_node(right_pack, inputs=[second, third])

    plan.inputs = [first, second, third]
    plan.outputs = [left_pack, right_pack]

    packed = plan([1, 2, 3])
    print(packed)


if __name__ == "__main__":
    main()
SelectNode¶
"""Using of SelectNode with multiple input."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import SelectNode, WorkNode, Plan
from workers import DoNothing
def main():
    """Use ``SelectNode(index=0)`` with two inputs and with one input.

    Both select nodes are plan outputs; the result of calling the plan
    with two lists is printed.
    """
    plan = Plan()
    first_source = WorkNode(Work(descr="Pass", worker=DoNothing()))
    second_source = WorkNode(Work(descr="Pass", worker=DoNothing()))
    multi_select = SelectNode(index=0)
    single_select = SelectNode(index=0)

    plan.add_node(first_source)
    plan.add_node(second_source)
    plan.add_node(multi_select, inputs=[first_source, second_source])
    plan.add_node(single_select, inputs=[first_source])

    plan.inputs = [first_source, second_source]
    plan.outputs = [multi_select, single_select]

    samples = [
        [1, 2, 3],
        [2, 3, 4],
    ]
    outputs = plan(samples)
    print("Outputs: ", outputs)


if __name__ == "__main__":
    main()
Node-generator¶
A ‘node-generator’ is a node that has no inputs.
"""Node may not have inputs."""
import os
import sys
from random import randint
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
def gen():
    """Draw a random integer from 1 to 10, report it and return it."""
    number = randint(1, 10)
    report = "gen -> {}".format(number)
    print(report)
    return number
def inc(x):
    """Return ``x + 1`` after printing the input and the result."""
    result = x + 1
    report = "{} -> inc -> {}".format(x, result)
    print(report)
    return result
def plus(x1, x2):
    """Return ``x1 + x2`` after printing both operands and the total."""
    total = x1 + x2
    report = "{}, {} -> plus -> {}".format(x1, x2, total)
    print(report)
    return total
def main():
    """Run a plan in which one node is registered without any inputs.

    The ``gen`` node is added with no inputs and is not a plan input.
    The summation node takes the ``gen`` and ``inc`` nodes as inputs.
    The plan input and the plan result are both printed.
    """
    plan = Plan()
    generator = WorkNode(Work("Generate random number", gen))
    incrementer = WorkNode(Work("Add 1", inc))
    adder = WorkNode(Work("Summation", plus))

    plan.add_node(generator)
    plan.add_node(incrementer)
    plan.add_node(adder, inputs=[generator, incrementer])

    plan.inputs = [incrementer]
    plan.outputs = [adder]

    data = [1]
    print(data)
    result = plan(data)
    print(result)


if __name__ == "__main__":
    main()
Get Plan instance from dict¶
"""Get plan from dictionary."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import get_plan_from_dict
# Plan description handed to get_plan_from_dict in this example.
# Each entry of "nodes" carries an id and a node class; the "inputs" lists
# refer to other nodes by id.
SETTINGS = {
    "nodes": [
        # Node "a": worker given as a class path plus constructor params.
        {
            "id": "a",
            "class": "WorkNode",
            "work": {
                "descr": "First step",
                "worker": {
                    "class": "workers.Linear",
                    "params": {
                        "k": 1,
                        "b": 1,
                    },
                },
            },
        },
        # Node "b": worker given as a function path; fed by "a".
        {
            "id": "b",
            "class": "WorkNode",
            "work": {
                "descr": "Second step",
                "worker": {
                    "function": "numpy.exp",
                },
            },
            "inputs": ["a"],
        },
        # Node "c": worker given as a class path without params; fed by "b".
        {
            "id": "c",
            "class": "WorkNode",
            "work": {
                "descr": "Third step",
                "worker": {
                    "class": "workers.Inc",
                },
            },
            "inputs": ["b"],
        },
        # Node "d": a PackNode fed by "b" and "c".
        {
            "id": "d",
            "class": "PackNode",
            "inputs": ["b", "c"],
            "result": "Result value",
        },
    ],
    "inputs": ["a"],
    "outputs": ["d"],
}
def main():
    """Build a plan from ``SETTINGS``, call it with ``[1]``, print the result."""
    plan = get_plan_from_dict(SETTINGS)
    sample = 1
    result = plan([sample])
    print(result)


if __name__ == "__main__":
    main()
Quick plan for on-line processing¶
"""Online plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import Plan, WorkNode
from dsplab.flow.activity import Work
from workers import Inc
def main():
    """Push samples one by one through a quick plan of three ``Inc`` steps.

    The plan is created with ``quick=True`` and ``reduce_calls()`` is
    invoked once before the loop. For each sample, the first plan output
    is printed next to the input.
    """
    head = WorkNode(work=Work("Step 1", worker=Inc()))
    middle = WorkNode(work=Work("Step 2", worker=Inc()))
    tail = WorkNode(work=Work("Step 3", worker=Inc()))

    plan = Plan(quick=True)
    plan.add_node(head)
    plan.add_node(middle, inputs=[head])
    plan.add_node(tail, inputs=[middle])
    plan.inputs = [head]
    plan.outputs = [tail]
    plan.reduce_calls()

    for sample in [1, 2, 3, 4, 5]:
        outputs = plan([sample])
        print("{} -> {}".format(sample, outputs[0]))


if __name__ == "__main__":
    main()