# Copyright (C) 2017-2021 Aleksandr Popov, Kirill Butin
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""This module implements the base class for online filters."""
from math import pi
from collections import deque
import numpy as np
from dsplab.flow.activity import Activity
PI = pi
PI2 = 2 * PI
[docs]def unwrap_point(phi):
"""Unwrap angle (for signle value)."""
if phi < -pi:
return phi + (1 + int(phi / -pi)) * pi
if phi > pi:
return phi - (1 + int(phi / pi)) * pi
return phi
[docs]class QueueFilter(Activity):
"""Online filter with queue.
Parameters
----------
ntaps: int
Lenght of filter.
fill_with: object
Initial value of every element of queue.
"""
def __init__(self, ntaps, fill_with=0):
super().__init__()
self.queue = deque([fill_with]*ntaps, maxlen=ntaps)
self.ntaps = ntaps
def __call__(self, *args, **kwargs):
"""Add sample to queue."""
return self.__call(*args, **kwargs)
def __call(self, sample):
self.queue.append(sample)
return self.proc_queue()
[docs] def proc_queue(self):
"""Process queue."""
raise NotImplementedError
[docs]class Delayer(QueueFilter):
"""Provide delay in online processing."""
[docs] def proc_queue(self):
return self.queue[0]
[docs]class And(Activity):
"""And operation."""
def __call__(self, *args, **kwargs):
"""Do operation.
Parameters
----------
sample: array_like of floats
Input values.
"""
return self.__call(*args, **kwargs)
def __call(self, sample):
res = 1
for value in sample:
res *= value
return res
[docs]class Or(Activity):
"""Or operation."""
def __call__(self, sample):
"""Do operation.
Parameters
----------
sample: array_like of floats
Input values.
"""
res = 0
for value in sample:
res += value * (1 - res)
return res
[docs]class OnlineFilter(Activity):
"""Universal online filter.
Parameters
----------
ntaps: int
Length of internal queue using for accumulation of input
samples. Default is None.
smooth_ntaps: int
Length of queue using for smoothing output values. Default
id None.
fill_with: object
Initial value of every element of queues.
step: int
Step. Must be positive.
"""
def __init__(self, ntaps=None, smooth_ntaps=None, fill_with=0, step=1):
super().__init__()
self.add_sample_func = None
if (ntaps is None) and (smooth_ntaps is None):
self.add_sample_func = self.__add_sample_simple
elif (ntaps is not None) and (smooth_ntaps is None):
self.add_sample_func = self.__add_sample_only_queue
elif (ntaps is None) and (smooth_ntaps is not None):
self.add_sample_func = self.__add_sample_only_smooth
else:
self.add_sample_func = self.__add_sample_full
if ntaps is not None:
self.queue = deque([fill_with]*ntaps, maxlen=ntaps)
if smooth_ntaps is not None:
self.smooth_queue = deque(
[fill_with]*smooth_ntaps,
maxlen=smooth_ntaps
)
wind = np.hamming(smooth_ntaps)
self.wind = wind / sum(wind)
self.step = step
self.steps = 0
self.ntaps = ntaps
self.smooth_ntaps = smooth_ntaps
def __call__(self, *args, **kwargs):
"""Add input sample to filter and return output value.
Parameters
----------
sample: object
Input sample.
Returns
-------
: object
Output value.
"""
self.__call(*args, **kwargs)
def __call(self, sample):
return self.add_sample_func(sample)
def __add_sample_simple(self, sample):
"""Add sample without using queues."""
self.steps += 1
if self.steps == self.step:
self.steps = 0
return self.proc_sample(sample)
return None
def __add_sample_only_queue(self, sample):
"""Add sample with no smoothing."""
self.steps += 1
self.queue.append(sample)
if self.steps == self.step:
self.steps = 0
return self.proc_queue()
return None
def __add_sample_only_smooth(self, sample):
"""Add sample with not internal queue but with smoothed
ouput."""
self.steps += 1
if self.steps == self.step:
self.steps = 0
self.smooth_queue.append(self.proc_sample(sample))
resm = np.dot(np.array(self.smooth_queue), self.wind)
return resm
return None
def __add_sample_full(self, sample):
"""Add sample with internal queue and smoothing of ouput
values."""
self.steps += 1
self.queue.append(sample)
if self.steps == self.step:
self.steps = 0
self.smooth_queue.append(self.proc_queue())
resm = np.dot(np.array(self.smooth_queue), self.wind)
return resm
return None
[docs] def proc_queue(self):
"""Process queue.
Returns
-------
: object
Ouput value.
"""
[docs] def proc_sample(self, sample):
"""Process sample.
Parameters
----------
sample: object
Input sample.
Returns
-------
: object
Output value.
"""