activity

This module implements the base classes for Activities.

class dsplab.flow.activity.Activity[source]

Bases: object

An activity is something that may be called and that can provide information about itself. To get a working activity, the __call__ method must be implemented.

info(as_string=None)[source]

Deprecated.

set_descr(descr)[source]

Deprecated.

class dsplab.flow.activity.ActivityMeta(name, bases, attrs)[source]

Bases: type

Metaclass for Activity.

class_info()[source]

Return the information about activity.

Returns:Information about class of activity.
Return type:dict
class dsplab.flow.activity.Work(descr=None, worker=None)[source]

Bases: dsplab.flow.activity.Activity

Work is data processing that can be done in a variety of ways.

descr

Description of work

get_descr()[source]

Return description.

set_descr(descr)[source]

Set description.

set_worker(act)[source]

Set worker for doing work. Worker must be callable.

class dsplab.flow.activity.Worker[source]

Bases: dsplab.flow.activity.Activity

Deprecated.

add_param(name, value=None)[source]

Deprecated.

dsplab.flow.activity.get_work_from_dict(settings, params=None)[source]

Create and return Work instance described in dictionary.

online

This module implements the base class for online filters.

class dsplab.flow.online.And[source]

Bases: dsplab.flow.activity.Activity

And operation.

class dsplab.flow.online.Delayer(ntaps, fill_with=0)[source]

Bases: dsplab.flow.online.QueueFilter

Provide delay in online processing.

proc_queue()[source]

Process queue.

class dsplab.flow.online.OnlineFilter(ntaps=None, smooth_ntaps=None, fill_with=0, step=1)[source]

Bases: dsplab.flow.activity.Activity

Universal online filter.

Parameters:
  • ntaps (int) – Length of the internal queue used for accumulating input samples. Default is None.
  • smooth_ntaps (int) – Length of the queue used for smoothing output values. Default is None.
  • fill_with (object) – Initial value of every element of queues.
  • step (int) – Step. Must be positive.
proc_queue()[source]

Process queue.

Returns:Output value.
Return type:object
proc_sample(sample)[source]

Process sample.

Parameters:sample (object) – Input sample.
Returns:Output value.
Return type:object
class dsplab.flow.online.Or[source]

Bases: dsplab.flow.activity.Activity

Or operation.

class dsplab.flow.online.QueueFilter(ntaps, fill_with=0)[source]

Bases: dsplab.flow.activity.Activity

Online filter with queue.

Parameters:
  • ntaps (int) – Length of filter.
  • fill_with (object) – Initial value of every element of queue.
proc_queue()[source]

Process queue.

dsplab.flow.online.unwrap_point(phi)[source]

Unwrap angle (for a single value).

plan

Examples

Helper module with workers

"""Workers for examples."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Activity


class Linear(Activity):
    """Linear transformation: y = k*x + b."""
    def __init__(self, k, b):
        super().__init__()
        # k is the multiplier and b the additive term used by __call__.
        self.k, self.b = k, b

    def __call__(self, x):
        # The input is the left operand of the product, then b is added.
        return x * self.k + self.b


class Sum(Activity):
    """Sum."""
    def __call__(self, *xs):
        # All positional arguments are added up with the built-in sum().
        return sum(xs)


class Inc(Activity):
    """Add 1 to value."""
    def __init__(self):
        super().__init__()

    def __call__(self, x):
        # Return the input increased by one.
        return x + 1


class DoNothing(Activity):
    """Just pass input to output."""
    # Identity worker: calling an instance returns its argument unchanged.
    def __init__(self):
        super().__init__()

    def __call__(self, x):
        # No transformation is applied; the same object is handed back.
        return x

Basic usage

"""Basic usage of plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
from workers import Linear


def main():
    """Chain three linear workers into a plan, run it once and print the result.

    The nodes are connected a -> b -> c. The plan receives its input at
    ``node_a`` and exposes ``node_c`` and ``node_b`` (in that order) as outputs.
    """
    plan = Plan()

    # One node per coefficient; Linear(n, n) is built for n = 1, 2, 3 in order.
    node_a, node_b, node_c = [
        WorkNode(work=Work("Linear transformation", worker=Linear(n, n)))
        for n in (1, 2, 3)
    ]

    # The first node has no predecessor; each later node is fed by the
    # node registered just before it.
    plan.add_node(node_a)
    for upstream, node in ((node_a, node_b), (node_b, node_c)):
        plan.add_node(node, inputs=[upstream])

    plan.inputs = [node_a]
    plan.outputs = [node_c, node_b]

    result = plan([5])
    print(result)


if __name__ == "__main__":
    main()

Using start and stop hooks

"""Start and stop hooks."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan


def func(x):
    """Worker used by the example: return the input increased by one."""
    incremented = x + 1
    return incremented


def start_handler(node):
    """Print a message saying that the work of ``node`` has started.

    The message quotes ``node.work.descr``.
    """
    message = "'{}' started".format(node.work.descr)
    print(message)


def stop_handler(node):
    """Print a message saying that the work of ``node`` has finished.

    The message quotes ``node.work.descr``.
    """
    message = "'{}' finished".format(node.work.descr)
    print(message)


def progress_handler():
    """Print a fixed notice; main() registers it as the plan's progress hook."""
    notice = "Calculated one node."
    print(notice)


def main():
    """Run a one-node plan with start, stop and progress hooks attached.

    The module docstring is printed first, then the plan is executed once
    with the input 5. The returned value of the plan is not used.
    """
    print(__doc__)

    work = Work("Increment", worker=func)
    node = WorkNode(work=work)
    # Each node hook receives the node itself as its argument.
    node.set_start_hook(start_handler, node)
    node.set_stop_hook(stop_handler, node)

    plan = Plan()
    plan.add_node(node)
    plan.set_progress_hook(progress_handler)
    # The single node is both the entry and the exit of the plan.
    plan.inputs = [node]
    plan.outputs = [node]

    plan([5])


if __name__ == "__main__":
    main()

MapNode (applying work for iterable input)

"""Mapping."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import MapNode, WorkNode, Plan
from workers import Sum, DoNothing


def main():
    """Feed two sequences through pass-through nodes into a MapNode.

    The MapNode wraps the ``Sum`` worker and takes both pass-through nodes
    as inputs. The result returned by the plan is printed.
    """
    plan = Plan()

    # Two independent pass-through nodes, created one after the other.
    pass_node_1, pass_node_2 = [
        WorkNode(Work("Pass", worker=DoNothing())) for _ in range(2)
    ]
    map_node = MapNode(
        work=Work("Transformation", worker=Sum()),
        inputs=[pass_node_1, pass_node_2],
    )

    for node in (pass_node_1, pass_node_2, map_node):
        plan.add_node(node)
    plan.inputs = [pass_node_1, pass_node_2]
    plan.outputs = [map_node]

    # One sequence per plan input.
    data = [[1, 1, 1], [2, 2, 2]]
    res = plan(data)
    print("Outputs:", res)


if __name__ == "__main__":
    main()

PackNode (pack inputs to list)

"""Pack inputs to list."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, PackNode, Plan
from workers import DoNothing


def main():
    """Pack overlapping pairs of three pass-through nodes and print the result.

    ``pack_node_1`` collects nodes 1 and 2, ``pack_node_2`` collects nodes 2
    and 3. The module docstring is printed before the plan is built.
    """
    print(__doc__)
    plan = Plan()

    # Three pass-through nodes followed by two pack nodes, in creation order.
    node_1, node_2, node_3 = [
        WorkNode(Work("Pass", worker=DoNothing())) for _ in range(3)
    ]
    pack_node_1, pack_node_2 = PackNode(), PackNode()

    for node in (node_1, node_2, node_3):
        plan.add_node(node)
    plan.add_node(pack_node_1, inputs=[node_1, node_2])
    plan.add_node(pack_node_2, inputs=[node_2, node_3])

    plan.inputs = [node_1, node_2, node_3]
    plan.outputs = [pack_node_1, pack_node_2]

    result = plan([1, 2, 3])
    print(result)


if __name__ == "__main__":
    main()

SelectNode

"""Using of SelectNode with multiple input."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import SelectNode, WorkNode, Plan
from workers import DoNothing


def main():
    """Run two SelectNode instances, one with two inputs and one with one.

    Both select nodes are created with ``index=0``. ``select_node_m`` is fed
    by both pass-through nodes and ``select_node_s`` by the first one only.
    The result returned by the plan is printed.
    """
    plan = Plan()

    pass_node_1, pass_node_2 = [
        WorkNode(Work(descr="Pass", worker=DoNothing())) for _ in range(2)
    ]
    select_node_m, select_node_s = [SelectNode(index=0) for _ in range(2)]

    for node in (pass_node_1, pass_node_2):
        plan.add_node(node)
    plan.add_node(select_node_m, inputs=[pass_node_1, pass_node_2])
    plan.add_node(select_node_s, inputs=[pass_node_1])
    plan.inputs = [pass_node_1, pass_node_2]
    plan.outputs = [select_node_m, select_node_s]

    # One sequence per plan input.
    data = [[1, 2, 3], [2, 3, 4]]
    res = plan(data)
    print("Outputs: ", res)


if __name__ == "__main__":
    main()

Node-generator

A ‘node-generator’ is a node that has no inputs.

"""Node may not have inputs."""
import os
import sys
from random import randint

sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan


def gen():
    """Draw a random integer from 1 to 10 inclusive, print it and return it."""
    value = randint(1, 10)
    message = "gen -> {}".format(value)
    print(message)
    return value


def inc(x):
    """Return ``x + 1`` after printing the input and the result."""
    result = x + 1
    message = "{} -> inc -> {}".format(x, result)
    print(message)
    return result


def plus(x1, x2):
    """Return ``x1 + x2`` after printing both operands and the result."""
    total = x1 + x2
    message = "{}, {} -> plus -> {}".format(x1, x2, total)
    print(message)
    return total


def main():
    """Combine an input-less generator node with a regular node in one plan.

    The generator node wraps ``gen`` and is registered without inputs. The
    increment node is the plan's only input. The summation node is fed by
    both and is the plan's only output. The input list and the plan's
    result are printed.
    """
    plan = Plan()
    generator_node = WorkNode(Work("Generate random number", gen))
    increment_node = WorkNode(Work("Add 1", inc))
    sum_node = WorkNode(Work("Summation", plus))

    # Neither of the first two nodes is given inputs at registration.
    for node in (generator_node, increment_node):
        plan.add_node(node)
    plan.add_node(sum_node, inputs=[generator_node, increment_node])
    plan.inputs = [increment_node]
    plan.outputs = [sum_node]

    values = [1]
    print(values)
    result = plan(values)
    print(result)


if __name__ == "__main__":
    main()

Get Plan instance from dict

"""Get plan from dictionary."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import get_plan_from_dict


# Declarative description of the plan that main() passes to
# get_plan_from_dict(): a list of nodes plus the ids of the plan's
# input and output nodes.
SETTINGS = {
    "nodes": [
        # 'a': class-based worker constructed with parameters k and b.
        {
            "id": "a",
            "class": "WorkNode",
            "work": {
                "descr": "First step",
                "worker": {
                    "class": "workers.Linear",
                    "params": {"k": 1, "b": 1},
                },
            },
        },
        # 'b': worker given as a dotted function name; takes input from 'a'.
        {
            "id": "b",
            "class": "WorkNode",
            "work": {
                "descr": "Second step",
                "worker": {"function": "numpy.exp"},
            },
            "inputs": ["a"],
        },
        # 'c': class-based worker without parameters; takes input from 'b'.
        {
            "id": "c",
            "class": "WorkNode",
            "work": {
                "descr": "Third step",
                "worker": {"class": "workers.Inc"},
            },
            "inputs": ["b"],
        },
        # 'd': pack node with inputs 'b' and 'c'.
        {
            "id": "d",
            "class": "PackNode",
            "inputs": ["b", "c"],
            "result": "Result value",
        },
    ],
    "inputs": ["a"],
    "outputs": ["d"],
}


def main():
    """Build a plan from ``SETTINGS``, run it on the input 1 and print the result."""
    plan = get_plan_from_dict(SETTINGS)
    value = 1
    # The plan is called with a list of input values.
    result = plan([value])
    print(result)


if __name__ == "__main__":
    main()

Quick plan for on-line processing

"""Online plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import Plan, WorkNode
from dsplab.flow.activity import Work
from workers import Inc


def main():
    """Push the samples 1..5 one at a time through a three-step quick plan.

    The plan is created with ``quick=True`` and ``reduce_calls()`` is invoked
    once before the loop. For every sample the first output of the plan is
    printed next to the input.
    """
    # Three chained increment steps, created in order.
    node_1, node_2, node_3 = [
        WorkNode(work=Work(descr, worker=Inc()))
        for descr in ("Step 1", "Step 2", "Step 3")
    ]

    plan = Plan(quick=True)
    plan.add_node(node_1)
    for upstream, node in ((node_1, node_2), (node_2, node_3)):
        plan.add_node(node, inputs=[upstream])
    plan.inputs = [node_1]
    plan.outputs = [node_3]

    plan.reduce_calls()
    for x in range(1, 6):
        outputs = plan([x])
        print("{} -> {}".format(x, outputs[0]))


if __name__ == "__main__":
    main()

Members

Verification of plan