activity

This module implements the base classes for Activities.

class dsplab.flow.activity.Activity[source]

Bases: object

An activity is something that can be called and can provide information about itself. To get a working activity, the __call__ method must be implemented.

info(as_string=None)[source]

Deprecated.

set_descr(descr)[source]

Deprecated.

class dsplab.flow.activity.ActivityMeta(name, bases, attrs)[source]

Bases: type

Metaclass for Activity.

class_info()[source]

Return the information about activity.

Returns:Information about class of activity.
Return type:dict
class dsplab.flow.activity.Work(descr=None, worker=None)[source]

Bases: dsplab.flow.activity.Activity

Work is data processing that can be done in a variety of ways.

descr

Description of work

get_descr()[source]

Return description.

set_descr(descr)[source]

Set description.

set_worker(act)[source]

Set worker for doing work. Worker must be callable.

class dsplab.flow.activity.Worker[source]

Bases: dsplab.flow.activity.Activity

Deprecated.

add_param(name, value=None)[source]

Deprecated.

dsplab.flow.activity.get_work_from_dict(settings, params=None)[source]

Create and return Work instance described in dictionary.

online

This module implements the base class for online filters.

class dsplab.flow.online.And[source]

Bases: dsplab.flow.activity.Activity

And operation.

class dsplab.flow.online.Delayer(ntaps, fill_with=0)[source]

Bases: dsplab.flow.online.QueueFilter

Provide delay in online processing.

proc_queue()[source]

Process queue.

class dsplab.flow.online.OnlineFilter(ntaps=None, smooth_ntaps=None, fill_with=0, step=1)[source]

Bases: dsplab.flow.activity.Activity

Universal online filter.

Parameters:
  • ntaps (int) – Length of the internal queue used for accumulation of input samples. Default is None.
  • smooth_ntaps (int) – Length of the queue used for smoothing output values. Default is None.
  • fill_with (object) – Initial value of every element of queues.
  • step (int) – Step. Must be positive.
proc_queue()[source]

Process queue.

Returns:Output value.
Return type:object
proc_sample(sample)[source]

Process sample.

Parameters:sample (object) – Input sample.
Returns:Output value.
Return type:object
class dsplab.flow.online.Or[source]

Bases: dsplab.flow.activity.Activity

Or operation.

class dsplab.flow.online.QueueFilter(ntaps, fill_with=0)[source]

Bases: dsplab.flow.activity.Activity

Online filter with queue.

Parameters:
  • ntaps (int) – Length of filter.
  • fill_with (object) – Initial value of every element of queue.
proc_queue()[source]

Process queue.

dsplab.flow.online.unwrap_point(phi)[source]

Unwrap angle (for a single value).

plan

Examples

Helper module with workers

"""Workers for examples."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Activity


class Linear(Activity):
    """Activity computing the linear transformation y = k*x + b.

    The slope ``k`` and the offset ``b`` are fixed at construction time.
    """

    def __init__(self, k, b):
        super().__init__()
        self.k, self.b = k, b

    def __call__(self, x):
        """Return ``x*k + b`` for the input value ``x``."""
        return x * self.k + self.b


class Sum(Activity):
    """Activity that adds up all of its positional arguments."""

    def __call__(self, *xs):
        """Return the built-in ``sum`` of the given values."""
        return sum(xs)


class Inc(Activity):
    """Activity that increments its input by one."""

    def __init__(self):
        super().__init__()

    def __call__(self, x):
        """Return ``x + 1``."""
        return x + 1


class DoNothing(Activity):
    """Identity activity: the output is the unchanged input."""

    def __init__(self):
        super().__init__()

    def __call__(self, x):
        """Return ``x`` as is."""
        return x

Basic usage

"""Basic usage of plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan
from workers import Linear


def main():
    """Chain three linear workers in a plan, run it on 5 and print the result.

    The plan has one input (the first node) and two outputs (the third
    and the second node, in that order).
    """
    plan = Plan()

    # Linear(1, 1), Linear(2, 2), Linear(3, 3) -- created in chain order.
    first, second, third = [
        WorkNode(work=Work("Linear transformation", worker=Linear(coef, coef)))
        for coef in (1, 2, 3)
    ]

    plan.add_node(first)
    plan.add_node(second, inputs=[first])
    plan.add_node(third, inputs=[second])

    plan.inputs = [first]
    plan.outputs = [third, second]

    result = plan([5])
    print(result)


if __name__ == "__main__":
    main()

Using start and stop hooks

"""Start and stop hooks."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan


def func(x):
    """Worker used by the example: return the input increased by one."""
    incremented = x + 1
    return incremented


def start_handler(node):
    """Start hook: print the description of the node's work as started."""
    message = "'{}' started".format(node.work.descr)
    print(message)


def stop_handler(node):
    """Stop hook: print the description of the node's work as finished."""
    message = "'{}' finished".format(node.work.descr)
    print(message)


def progress_handler():
    """Progress hook: print a fixed message; takes no arguments."""
    message = "Calculated one node."
    print(message)


def main():
    """Run a one-node plan with start, stop and progress hooks attached.

    The module docstring is printed first; the plan is then called with
    the single input value 5 and its return value is discarded.
    """
    print(__doc__)

    inc_node = WorkNode(work=Work("Increment", worker=func))
    # The node itself is passed as the extra argument of both hooks.
    inc_node.set_start_hook(start_handler, inc_node)
    inc_node.set_stop_hook(stop_handler, inc_node)

    hooked_plan = Plan()
    hooked_plan.add_node(inc_node)
    hooked_plan.set_progress_hook(progress_handler)
    hooked_plan.inputs = [inc_node]
    hooked_plan.outputs = [inc_node]

    hooked_plan([5])


if __name__ == "__main__":
    main()

MapNode (applying work for iterable input)

"""Mapping."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import MapNode, WorkNode, Plan
from workers import Sum, DoNothing


def main():
    """Feed two lists through pass-through nodes into a MapNode and print.

    The MapNode carries a ``Sum`` worker and takes both pass-through
    nodes as inputs; it is the only output of the plan.
    """
    plan = Plan()

    first_pass = WorkNode(Work("Pass", worker=DoNothing()))
    second_pass = WorkNode(Work("Pass", worker=DoNothing()))
    mapper = MapNode(
        work=Work("Transformation", worker=Sum()),
        inputs=[first_pass, second_pass],
    )

    for node in (first_pass, second_pass, mapper):
        plan.add_node(node)

    plan.inputs = [first_pass, second_pass]
    plan.outputs = [mapper]

    data = [[1, 1, 1], [2, 2, 2]]
    res = plan(data)
    print("Outputs:", res)


if __name__ == "__main__":
    main()

PackNode (pack inputs to list)

"""Pack inputs to list."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, PackNode, Plan
from workers import DoNothing


def main():
    """Pack the outputs of pass-through nodes pairwise and print the result.

    Three pass-through nodes receive 1, 2 and 3; the first PackNode takes
    nodes 1 and 2 as inputs, the second takes nodes 2 and 3.
    """
    print(__doc__)
    plan = Plan()

    src_a = WorkNode(Work("Pass", worker=DoNothing()))
    src_b = WorkNode(Work("Pass", worker=DoNothing()))
    src_c = WorkNode(Work("Pass", worker=DoNothing()))
    packer_ab = PackNode()
    packer_bc = PackNode()

    for source in (src_a, src_b, src_c):
        plan.add_node(source)
    plan.add_node(packer_ab, inputs=[src_a, src_b])
    plan.add_node(packer_bc, inputs=[src_b, src_c])

    plan.inputs = [src_a, src_b, src_c]
    plan.outputs = [packer_ab, packer_bc]

    packed = plan([1, 2, 3])
    print(packed)


if __name__ == "__main__":
    main()

SelectNode

"""Using of SelectNode with multiple input."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import SelectNode, WorkNode, Plan
from workers import DoNothing


def main():
    """Run two SelectNode instances (index 0) and print the plan's outputs.

    One SelectNode has both pass-through nodes as inputs, the other has
    only the first one.
    """
    plan = Plan()

    left = WorkNode(Work(descr="Pass", worker=DoNothing()))
    right = WorkNode(Work(descr="Pass", worker=DoNothing()))
    multi_select = SelectNode(index=0)
    single_select = SelectNode(index=0)

    plan.add_node(left)
    plan.add_node(right)
    plan.add_node(multi_select, inputs=[left, right])
    plan.add_node(single_select, inputs=[left])

    plan.inputs = [left, right]
    plan.outputs = [multi_select, single_select]

    data = [[1, 2, 3], [2, 3, 4]]
    res = plan(data)
    print("Outputs: ", res)


if __name__ == "__main__":
    main()

Node-generator

‘Node-generator’ means a node with no inputs.

"""Node may not have inputs."""
import os
import sys
from random import randint

sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.activity import Work
from dsplab.flow.plan import WorkNode, Plan


def gen():
    """Draw a random integer from 1 to 10 inclusive, print it, return it."""
    number = randint(1, 10)
    message = "gen -> {}".format(number)
    print(message)
    return number


def inc(x):
    """Return ``x + 1`` and print a trace of the input and the output."""
    result = x + 1
    message = "{} -> inc -> {}".format(x, result)
    print(message)
    return result


def plus(x1, x2):
    """Return ``x1 + x2`` and print a trace of both inputs and the output."""
    total = x1 + x2
    message = "{}, {} -> plus -> {}".format(x1, x2, total)
    print(message)
    return total


def main():
    """Run a plan in which one node has no inputs at all.

    The generator node is added without inputs and is not listed among
    the plan inputs; the summation node takes the generator node and the
    increment node as inputs. The input list and the plan's result are
    printed.
    """
    plan = Plan()

    generator = WorkNode(Work("Generate random number", gen))
    incrementer = WorkNode(Work("Add 1", inc))
    adder = WorkNode(Work("Summation", plus))

    plan.add_node(generator)
    plan.add_node(incrementer)
    plan.add_node(adder, inputs=[generator, incrementer])

    plan.inputs = [incrementer]
    plan.outputs = [adder]

    data = [1]
    print(data)
    result = plan(data)
    print(result)


if __name__ == "__main__":
    main()

Get Plan instance from dict

"""Get plan from dictionary."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import get_plan_from_dict


# Plan description passed to get_plan_from_dict() in main().
# Top-level keys: 'nodes' (list of node settings), 'inputs' and 'outputs'
# (ids of the plan's input and output nodes).
SETTINGS = {
    'nodes': [
        # Node 'a': worker given as a class path plus 'params'
        # (presumably forwarded to the class -- verify against
        # get_work_from_dict).
        {
            'id': 'a',
            'class': 'WorkNode',
            'work': {
                'descr': "First step",
                'worker': {
                    'class': "workers.Linear",
                    'params': {
                        'k': 1,
                        'b': 1,
                    }
                }
            }
        },

        # Node 'b': worker given as a function path; its input is node 'a'.
        {
            'id': 'b',
            'class': 'WorkNode',
            'work': {
                'descr': "Second step",
                'worker': {
                    'function': "numpy.exp"
                }
            },
            'inputs': ['a'],
        },

        # Node 'c': worker class without params; its input is node 'b'.
        {
            'id': 'c',
            'class': 'WorkNode',
            'work': {
                'descr': "Third step",
                'worker': {
                    'class': "workers.Inc"
                }
            },
            'inputs': ['b'],
        },

        # Node 'd': PackNode with inputs 'b' and 'c'; 'result' holds the
        # description of the result.
        {
            'id': 'd',
            'class': 'PackNode',
            'inputs': ['b', 'c'],
            'result': 'Result value'
        }
    ],

    # Ids of the nodes used as plan inputs and plan outputs.
    'inputs': ['a'],
    'outputs': ['d'],
}


def main():
    """Build a plan from SETTINGS, run it on the value 1 and print the result."""
    plan_from_dict = get_plan_from_dict(SETTINGS)
    value = 1
    result = plan_from_dict([value])
    print(result)


if __name__ == "__main__":
    main()

Quick plan for on-line processing

"""Online plan."""
import os
import sys
sys.path.insert(0, os.path.abspath('.'))
from dsplab.flow.plan import Plan, WorkNode
from dsplab.flow.activity import Work
from workers import Inc


def main():
    """Run a quick plan of three chained Inc workers sample by sample.

    The plan is created with ``quick=True`` and ``reduce_calls()`` is
    invoked before the loop; each of the samples 1..5 is fed separately
    and printed together with the first element of the plan's output.
    """
    step_1 = WorkNode(work=Work("Step 1", worker=Inc()))
    step_2 = WorkNode(work=Work("Step 2", worker=Inc()))
    step_3 = WorkNode(work=Work("Step 3", worker=Inc()))

    online_plan = Plan(quick=True)
    online_plan.add_node(step_1)
    online_plan.add_node(step_2, inputs=[step_1])
    online_plan.add_node(step_3, inputs=[step_2])
    online_plan.inputs = [step_1]
    online_plan.outputs = [step_3]

    online_plan.reduce_calls()

    samples = [1, 2, 3, 4, 5]
    for sample in samples:
        outputs = online_plan([sample])
        print("{} -> {}".format(sample, outputs[0]))


if __name__ == "__main__":
    main()

Members

This module implements the Node and Plan classes. A node can be understood as a workplace for a worker. A node can have inputs that are also nodes. A plan is a system of linked nodes.

class dsplab.flow.plan.MapNode(work=None, inputs=None)[source]

Bases: dsplab.flow.plan.WorkNode

Apply work to all components of iterable input and build iterable output.

class dsplab.flow.plan.Node(inputs=None)[source]

Bases: dsplab.flow.activity.Activity

Base class for nodes.

clear_result()[source]

Clear the result.

get_id()[source]

Return ID of node.

get_inputs()[source]

Return inputs.

get_result()[source]

Return the calculated data.

get_result_info()[source]

Return result info.

inputs

Return inputs.

is_inputs_ready()[source]

Check if data in all inputs is ready.

is_output_ready()[source]

Check if the calculation in the node is finished.

node_id

ID of node.

reset()[source]

Deprecated.

result_info

Information about result

run_start_hook()[source]

Run function associated with start hook.

run_stop_hook()[source]

Run function associated with stop hook.

set_id(value)[source]

Set ID for node.

set_inputs(inputs)[source]

Set inputs.

set_result_info(info)[source]

Append to info the description of the output data.

set_start_hook(func, *args, **kwargs)[source]

Set start hook.

set_stop_hook(func, *args, **kwargs)[source]

Set stop hook.

class dsplab.flow.plan.PackNode(inputs=None)[source]

Bases: dsplab.flow.plan.Node

Pack input to output.

class dsplab.flow.plan.PassNode(inputs=None)[source]

Bases: dsplab.flow.plan.Node

Pass input to output.

class dsplab.flow.plan.Plan(descr=None, quick=False)[source]

Bases: dsplab.flow.activity.Activity

The plan. Plan is the system of linked nodes.

add_node(node, inputs=None)[source]

Add node to plan.

clear()[source]

Clear plan.

descr

Description of plan

get_descr()[source]

Return description of plan.

get_inputs()[source]

Return input nodes.

get_nodes()[source]

Return the list of nodes.

get_outputs()[source]

Return output nodes.

inputs

The nodes which are inputs.

outputs

The nodes which are outputs.

quick_run(data)[source]

Sequential execution of plan with no hooks (for on-line quick processing).

reduce_calls()[source]

Reduce call chains for all nodes. Recommended before running quick plans.

remove_node(node)[source]

Remove node from plan.

run(data)[source]

Run plan.

set_descr(descr)[source]

Set description of plan.

set_inputs(inputs)[source]

Set input nodes.

set_outputs(outputs)[source]

Set output nodes.

set_progress_hook(func)[source]

Set progress handler.

set_quick(value=True)[source]

Make plan quick (for online with no hooks) or not.

verify()[source]

Verify plan.

Returns:
  • bool – True if success, False otherwise.
  • str – Empty string or description of error.
class dsplab.flow.plan.SelectNode(index, inputs=None)[source]

Bases: dsplab.flow.plan.Node

Select component of output.

class dsplab.flow.plan.WorkNode(work=None, inputs=None)[source]

Bases: dsplab.flow.plan.Node

Node with work.

get_work()[source]

Return work of the node.

reduce_call()[source]

Try to reduce call chain.

set_work(work)[source]

Set work for the node.

work

Work in node

dsplab.flow.plan.get_plan_from_dict(settings, params=None)[source]

Create and return instance of Plan described in dictionary.

Parameters:
  • settings (dict) – Dictionary with plan.
  • params (dict) – Dictionary with parameters like “$name” for plan.
Returns:

  • Plan – The instance of Plan.
  • **Keys in settings**
  • - ‘descr’ - description of the plan (optional)
  • - ‘nodes’ - list of dicts with nodes settings
  • - ‘inputs’ - list of inputs nodes ids
  • - ‘outputs’ - list of output nodes ids
  • **Common settings for nodes**
  • - ‘id’ - id of node
  • - ‘inputs’ - list of ids of input nodes for this node
  • - ‘result’ - result description
  • **Settings for WorkNode and MapNode**
  • - ‘work’ - dict with work settings
  • **Settings for SelectNode**
  • - ‘index’ - index of selected item

Verification

Verification of the plan.

exception dsplab.flow.verify.VerifyError[source]

Bases: Exception

Verification error.

dsplab.flow.verify.check_plan(plan_dict)[source]

Check plan’s dictionary.