activity¶
This module implements the base classes for Activities.
-
class
dsplab.flow.activity.Activity[source]¶ Bases:
object
An activity is something that can be called and can provide information about itself. To get a working activity, the __call__ method must be implemented.
-
class
dsplab.flow.activity.ActivityMeta(name, bases, attrs)[source]¶ Bases:
type
Metaclass for Activity.
-
class
dsplab.flow.activity.Work(descr=None, worker=None)[source]¶ Bases:
dsplab.flow.activity.Activity
Work is data processing that can be done in a variety of ways.
-
descr¶ Description of work
-
-
class
dsplab.flow.activity.Worker[source]¶ Bases:
dsplab.flow.activity.Activity
Deprecated.
online¶
This module implements the base class for online filters.
-
class
dsplab.flow.online.And[source]¶ Bases:
dsplab.flow.activity.Activity
And operation.
-
class
dsplab.flow.online.Delayer(ntaps, fill_with=0)[source]¶ Bases:
dsplab.flow.online.QueueFilter
Provides delay in online processing.
-
class
dsplab.flow.online.OnlineFilter(ntaps=None, smooth_ntaps=None, fill_with=0, step=1)[source]¶ Bases:
dsplab.flow.activity.Activity
Universal online filter.
Parameters: - ntaps (int) – Length of the internal queue used for accumulation of input samples. Default is None.
- smooth_ntaps (int) – Length of the queue used for smoothing output values. Default is None.
- fill_with (object) – Initial value of every element of queues.
- step (int) – Step. Must be positive.
-
class
dsplab.flow.online.Or[source]¶ Bases:
dsplab.flow.activity.Activity
Or operation.
-
class
dsplab.flow.online.QueueFilter(ntaps, fill_with=0)[source]¶ Bases:
dsplab.flow.activity.Activity
Online filter with queue.
Parameters: - ntaps (int) – Length of filter.
- fill_with (object) – Initial value of every element of queue.
plan¶
Examples¶
Helper module with workers¶
"""Workers for examples."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Activity
class Linear(Activity):
    """Linear transformation: y = k*x + b."""

    def __init__(self, k, b):
        """Store the coefficients of the transformation.

        Parameters
        ----------
        k
            Multiplier applied to the input.
        b
            Offset added after the multiplication.
        """
        super().__init__()
        self.k = k
        self.b = b

    def __call__(self, x):
        """Return ``x*k + b`` for the input ``x``."""
        y = x*self.k + self.b
        return y
class Sum(Activity):
    """Sum."""

    def __call__(self, *xs):
        """Return the sum of all positional inputs (built-in ``sum``)."""
        y = sum(xs)
        return y
class Inc(Activity):
    """Add 1 to value."""

    def __init__(self):
        super().__init__()

    def __call__(self, x):
        """Return ``x + 1``."""
        y = x + 1
        return y
class DoNothing(Activity):
    """Just pass input to output."""

    def __init__(self):
        super().__init__()

    def __call__(self, x):
        """Return the input ``x`` unchanged."""
        return x
Basic usage¶
"""Basic usage of plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
from workers import Linear
def main():
    """Run example.

    Builds three WorkNodes, each wrapping a ``Linear`` worker, links them
    as a -> b -> c, calls the plan with the single input value 5 and
    prints whatever the plan returns for its outputs (c and b).
    """
    plan = Plan()

    node_a = WorkNode(work=Work("Linear transformation", worker=Linear(1, 1)))
    node_b = WorkNode(work=Work("Linear transformation", worker=Linear(2, 2)))
    node_c = WorkNode(work=Work("Linear transformation", worker=Linear(3, 3)))

    # Each node takes the previous one as its only input.
    plan.add_node(node_a)
    plan.add_node(node_b, inputs=[node_a])
    plan.add_node(node_c, inputs=[node_b])

    plan.inputs = [node_a]
    plan.outputs = [node_c, node_b]

    print(plan([5]))


if __name__ == "__main__":
    main()
Using start and stop hooks¶
"""Start and stop hooks."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
def func(x):
    """Worker."""
    # Plain function used as the worker of the node: returns x + 1.
    return x + 1
def start_handler(node):
    """Node start handler.

    Prints the description of the work attached to ``node``
    (``node.work.descr``) followed by the word "started".
    """
    print("'{}' started".format(node.work.descr))
def stop_handler(node):
    """Node stop handler.

    Prints the description of the work attached to ``node``
    (``node.work.descr``) followed by the word "finished".
    """
    print("'{}' finished".format(node.work.descr))
def progress_handler():
    """Progress handler.

    Takes no arguments and prints a fixed message; it is registered on
    the plan with ``set_progress_hook`` in ``main``.
    """
    print("Calculated one node.")
def main():
    """Entry point.

    Prints the module docstring, creates a single WorkNode whose worker
    is ``func``, attaches the start/stop hooks to the node and the
    progress hook to the plan, then runs the plan with the input 5.
    The plan result itself is not used; the example is about the
    messages printed by the hooks.
    """
    print(__doc__)

    node = WorkNode(work=Work("Increment", worker=func))
    # The node is passed as the extra argument of each hook
    # (presumably forwarded to the handler when the hook fires).
    node.set_start_hook(start_handler, node)
    node.set_stop_hook(stop_handler, node)

    plan = Plan()
    plan.add_node(node)
    plan.set_progress_hook(progress_handler)

    # The single node is both the input and the output of the plan.
    plan.inputs = [node]
    plan.outputs = [node]

    plan([5])


if __name__ == "__main__":
    main()
MapNode (applying work for iterable input)¶
"""Mapping."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import MapNode, WorkNode, Plan
from workers import Sum, DoNothing
def main():
    """Run example.

    Two pass-through nodes feed one MapNode whose work is ``Sum``. The
    plan is called with two lists (one per input node) and the returned
    outputs are printed.
    """
    plan = Plan()

    pass_node_1 = WorkNode(
        Work("Pass", worker=DoNothing())
    )
    pass_node_2 = WorkNode(
        Work("Pass", worker=DoNothing())
    )

    # Here the inputs of the MapNode are given to its constructor rather
    # than to plan.add_node().
    map_node = MapNode(
        work=Work("Transformation", worker=Sum()),
        inputs=[pass_node_1, pass_node_2]
    )

    plan.add_node(pass_node_1)
    plan.add_node(pass_node_2)
    plan.add_node(map_node)

    plan.inputs = [pass_node_1, pass_node_2]
    plan.outputs = [map_node]

    res = plan([
        [1, 1, 1],
        [2, 2, 2],
    ])
    print("Outputs:", res)


if __name__ == "__main__":
    main()
PackNode (pack inputs to list)¶
"""Pack inputs to list."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, PackNode, Plan
from workers import DoNothing
def main():
    """Run example.

    Prints the module docstring, builds three pass-through nodes and two
    PackNodes (one fed by nodes 1 and 2, the other by nodes 2 and 3),
    calls the plan with the inputs 1, 2, 3 and prints the result.
    """
    print(__doc__)
    plan = Plan()

    node_1 = WorkNode(Work("Pass", worker=DoNothing()))
    node_2 = WorkNode(Work("Pass", worker=DoNothing()))
    node_3 = WorkNode(Work("Pass", worker=DoNothing()))
    pack_node_1 = PackNode()
    pack_node_2 = PackNode()

    plan.add_node(node_1)
    plan.add_node(node_2)
    plan.add_node(node_3)
    # node_2 is shared: it is an input of both pack nodes.
    plan.add_node(pack_node_1, inputs=[node_1, node_2])
    plan.add_node(pack_node_2, inputs=[node_2, node_3])

    plan.inputs = [node_1, node_2, node_3]
    plan.outputs = [pack_node_1, pack_node_2]

    print(plan([1, 2, 3]))


if __name__ == "__main__":
    main()
SelectNode¶
"""Using of SelectNode with multiple input."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import SelectNode, WorkNode, Plan
from workers import DoNothing
def main():
    """Run example.

    Builds two pass-through nodes and two SelectNodes, both created with
    ``index=0``: ``select_node_m`` has two inputs, ``select_node_s`` has
    one. The plan is called with two lists and the outputs are printed.
    """
    plan = Plan()

    pass_node_1 = WorkNode(Work(descr="Pass", worker=DoNothing()))
    pass_node_2 = WorkNode(Work(descr="Pass", worker=DoNothing()))
    select_node_m = SelectNode(index=0)  # multiple inputs
    select_node_s = SelectNode(index=0)  # single input

    plan.add_node(pass_node_1)
    plan.add_node(pass_node_2)
    plan.add_node(select_node_m,
                  inputs=[pass_node_1, pass_node_2])
    plan.add_node(select_node_s,
                  inputs=[pass_node_1])

    plan.inputs = [pass_node_1, pass_node_2]
    plan.outputs = [select_node_m, select_node_s]

    res = plan([
        [1, 2, 3],
        [2, 3, 4]
    ])
    print("Outputs: ", res)


if __name__ == "__main__":
    main()
Node-generator¶
A ‘node-generator’ is a node that has no inputs.
"""Node may not have inputs."""
import os
import sys
from random import randint
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
def gen():
    """Generate random number."""
    # randint is inclusive on both ends: the value is in 1..10.
    y = randint(1, 10)
    print("gen -> {}".format(y))
    return y
def inc(x):
    """Increment."""
    y = x + 1
    # Trace the call so the order of node evaluation is visible.
    print("{} -> inc -> {}".format(x, y))
    return y
def plus(x1, x2):
    """Sum of two numbers."""
    y = x1 + x2
    # Trace the call so the order of node evaluation is visible.
    print("{}, {} -> plus -> {}".format(x1, x2, y))
    return y
def main():
    """Run example.

    Node ``g`` is added without inputs and is not listed among the plan
    inputs, so it plays the role of the generator. Node ``a`` is the
    only plan input, and node ``b`` receives the values of ``g`` and
    ``a``. The input list and the plan result are both printed.
    """
    p = Plan()

    g = WorkNode(Work("Generate random number", gen))
    a = WorkNode(Work("Add 1", inc))
    b = WorkNode(Work("Summation", plus))

    p.add_node(g)
    p.add_node(a)
    p.add_node(b, inputs=[g, a])

    p.inputs = [a]
    p.outputs = [b]

    x = [1]
    print(x)
    y = p(x)
    print(y)


if __name__ == "__main__":
    main()
Get Plan instance from dict¶
"""Get plan from dictionary."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import get_plan_from_dict
# Description of the plan consumed by get_plan_from_dict(): four nodes
# chained as a -> b -> c, with d collecting the results of b and c.
SETTINGS = {
    'nodes': [
        # Worker given as a class name plus constructor parameters.
        {
            'id': 'a',
            'class': 'WorkNode',
            'work': {
                'descr': "First step",
                'worker': {
                    'class': "workers.Linear",
                    'params': {
                        'k': 1,
                        'b': 1,
                    }
                }
            }
        },
        # Worker given as a dotted function name.
        {
            'id': 'b',
            'class': 'WorkNode',
            'work': {
                'descr': "Second step",
                'worker': {
                    'function': "numpy.exp"
                }
            },
            'inputs': ['a'],
        },
        # Worker given as a class name without parameters.
        {
            'id': 'c',
            'class': 'WorkNode',
            'work': {
                'descr': "Third step",
                'worker': {
                    'class': "workers.Inc"
                }
            },
            'inputs': ['b'],
        },
        # Node without work; 'result' carries the result description.
        {
            'id': 'd',
            'class': 'PackNode',
            'inputs': ['b', 'c'],
            'result': 'Result value'
        }
    ],

    'inputs': ['a'],
    'outputs': ['d'],
}
def main():
    """Run example.

    Builds a plan from the module-level ``SETTINGS`` dictionary, calls
    it with the single input value 1 and prints the result.
    """
    plan = get_plan_from_dict(SETTINGS)
    x = 1
    # The plan takes a list with one value per input node.
    y = plan([x])
    print(y)


if __name__ == "__main__":
    main()
Quick plan for on-line processing¶
"""Online plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import Plan, WorkNode
from dsplab.flow.activity import Work
from workers import Inc
def main():
    """Run example.

    Chains three ``Inc`` nodes in a plan created with ``quick=True`` and
    feeds the samples 1..5 through it one at a time, printing each input
    next to the first (and only) plan output.
    """
    node_1 = WorkNode(work=Work("Step 1", worker=Inc()))
    node_2 = WorkNode(work=Work("Step 2", worker=Inc()))
    node_3 = WorkNode(work=Work("Step 3", worker=Inc()))

    plan = Plan(quick=True)
    plan.add_node(node_1)
    plan.add_node(node_2, inputs=[node_1])
    plan.add_node(node_3, inputs=[node_2])
    plan.inputs = [node_1]
    plan.outputs = [node_3]
    # NOTE(review): presumably an optimisation step for repeated calls;
    # its exact effect is not visible here - confirm in Plan.reduce_calls.
    plan.reduce_calls()

    xs = [1, 2, 3, 4, 5]
    for x in xs:
        # The plan returns one value per output node; take the first.
        y = plan([x])[0]
        print("{} -> {}".format(x, y))


if __name__ == "__main__":
    main()
Members¶
This module implements the Node and Plan classes. Node can be understood as the workplace for worker. Node can have inputs that are also nodes. Plan is the system of linked nodes.
-
class
dsplab.flow.plan.MapNode(work=None, inputs=None)[source]¶ Bases:
dsplab.flow.plan.WorkNode
Apply work to all components of iterable input and build iterable output.
-
class
dsplab.flow.plan.Node(inputs=None)[source]¶ Bases:
dsplab.flow.activity.Activity
Base class for nodes.
-
inputs¶ Return inputs.
-
node_id¶ ID of node.
-
result_info¶ Information about result
-
-
class
dsplab.flow.plan.PackNode(inputs=None)[source]¶ Bases:
dsplab.flow.plan.Node
Pack input to output.
-
class
dsplab.flow.plan.PassNode(inputs=None)[source]¶ Bases:
dsplab.flow.plan.Node
Pass input to output.
-
class
dsplab.flow.plan.Plan(descr=None, quick=False)[source]¶ Bases:
dsplab.flow.activity.Activity
The plan. Plan is the system of linked nodes.
-
descr¶ Description of plan
-
inputs¶ The nodes which are inputs.
-
outputs¶ The nodes which are outputs.
-
-
class
dsplab.flow.plan.SelectNode(index, inputs=None)[source]¶ Bases:
dsplab.flow.plan.Node
Select component of output.
-
class
dsplab.flow.plan.WorkNode(work=None, inputs=None)[source]¶ Bases:
dsplab.flow.plan.Node
Node with work.
-
work¶ Work in node
-
-
dsplab.flow.plan.get_plan_from_dict(settings, params=None)[source]¶ Create and return instance of Plan described in dictionary.
Parameters: - settings (dict) – Dictionary with plan.
- params (dict) – Dictionary with parameters like “$name” for plan.
Returns: - Plan – The instance of Plan.
- **Keys in settings**
- - ‘descr’ - description of the plan (optional)
- - ‘nodes’ - list of dicts with nodes settings
- - ‘inputs’ - list of inputs nodes ids
- - ‘outputs’ - list of output nodes ids
- **Common settings for nodes**
- - ‘id’ - id of node
- - ‘inputs’ - list of ids of input nodes for this node
- - ‘result’ - result description
- **Settings for WorkNode and MapNode**
- - ‘work’ - dict with work settings
- **Settings for SelectNode**
- - ‘index’ - index of selected item