# Copyright (C) 2017-2021 Aleksandr Popov, Kirill Butin
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Functions for modulation and demodulation."""
from math import pi, cos
import numpy as np
from numpy import unwrap, angle, diff
import scipy.signal as sig
PI = pi
_2PI = 2 * PI
[docs]def harm(length, sample_rate, amp, freq, phi=0,
noise_a=None, noise_f=None):
"""Generate harmonic signal.
Parameters
----------
length: float
Length pf signal (sec).
sample_rate: float
Sampling frequency (Hz).
amp: float
Amplitude of signal.
freq: Object
Frequency of signal (Hz).
phi: float
Initial phase (radians).
noise_a: callable
Returns noise value added to amplitude.
noise_f: callable
Returns noise value added to frequency.
Returns
-------
: np.array
Signal values.
: np.array
Time values.
"""
times = np.arange(0, length, 1 / sample_rate)
values = []
for time_value in times:
amp_value = amp
if noise_a is not None:
amp_value += noise_a(time_value)
arg = _2PI * freq * time_value + phi
if noise_f is not None:
arg += noise_f(time_value)
values.append(amp_value * cos(arg))
return np.array(values), times
[docs]def amp_mod(length, sample_rate, func, freq, phi=0,
noise_f=None, noise_a=None):
"""Amplitude modulation.
Parameters
----------
length: float
Length pf signal (sec).
sample_rate: float
Sampling frequency (Hz).
freq: float
Frequency of signal (Hz).
phi: float
Initial phase (radians).
func: Object
Function that returns amplitude value depending on time.
noise_f: Object
Function that returns noise value added to frequency.
noise_a: Object
Function that returns noise value added to amplitude.
Returns
-------
: np.array
Signal values.
: np.array
Time values.
"""
full_phase = phi
delta_ph = _2PI * freq / sample_rate
sampling_period = 1.0 / sample_rate
times = np.arange(0, length + sampling_period, sampling_period)
values = []
for time_value in times:
if noise_f is None:
value = func(time_value) * cos(full_phase)
else:
value = func(time_value) * cos(full_phase + noise_f(time_value))
if noise_a is not None:
value += noise_a(time_value)
values.append(value)
full_phase += delta_ph
values = np.array(values)
return values, times
[docs]def freq_mod(length, sample_rate, amp, func, phi=0,
noise_f=None, noise_a=None):
"""Amplitude modulation.
Parameters
----------
length: float
Length pf signal (sec).
sample_rate: float
Sampling frequency (Hz).
amp: float
Amplitude of signal.
phi: float
Initial phase (radians).
func: Object
Function that returns frequency values (in Hz) depending on time.
noise_f: Object
Function that returns noise value added to frequency.
noise_a: Object
Function that returns noise value added to amplitude.
Returns
-------
: np.array
Signal values.
: np.array
Full phase values.
: np.array
Time values.
"""
full_phase = phi
times = np.arange(0, length + 1.0 / sample_rate, 1.0 / sample_rate)
values = []
phs = []
for time_value in times:
arg = full_phase
if noise_f is not None:
arg += noise_f(time_value)
value = amp * cos(arg)
if noise_a is not None:
value += noise_a(time_value)
values.append(value)
phs.append(full_phase)
delta_ph = _2PI * func(time_value) / sample_rate
full_phase += delta_ph
values = np.array(values)
phs = np.array(phs)
return values, phs, times
[docs]def phase_mod(length, sample_rate, amp, freq, func,
noise_f=None, noise_a=None):
"""Phase modulation.
Parameters
----------
length: float
Length pf signal (sec).
sample_rate: float
Sampling frequency (Hz).
amp: float
Amplitude of signal.
freq: float
Frequency of signal (Hz).
func: Object
Function that returns phase values (in radians) depending on time.
noise_f: Object
Function that returns noise value added to frequency.
noise_a: Object
Function that returns noise value added to amplitude.
Returns
-------
: np.array
Signal values.
: np.array
Time values.
"""
sampling_period = 1.0 / sample_rate
times = np.arange(0, length + sampling_period, sampling_period)
values = []
for time_value in times:
arg = _2PI * freq * time_value + func(time_value)
if noise_f is not None:
arg += noise_f(time_value)
value = amp * cos(arg)
if noise_a is not None:
value += noise_a(time_value)
values.append(value)
values = np.array(values)
return values, times
[docs]def freq_amp_mod(length, sample_rate, a_func, f_func, phi=0):
"""Simultaneous frequency and amplitude modulation.
Parameters
----------
length: float
Length pf signal (sec).
sample_rate: float
Sampling frequency (Hz).
a_func: Object
Function that returns amplitude value depending on time.
f_func: Object
Function that returns frequency values (in Hz) depending on time.
phi: float
Initial phase (radians).
Returns
-------
: np.array
Signal values.
: np.array
Full phase values.
: np.array
Time values.
"""
full_phase = phi
sampling_period = 1.0 / sample_rate
times = np.arange(0, length + sampling_period, sampling_period)
values = []
phs = []
for time_value in times:
arg = full_phase
values.append(a_func(time_value) * cos(arg))
delta_ph = _2PI * f_func(time_value) / sample_rate
phs.append(full_phase)
full_phase += delta_ph
values = np.array(values)
phs = np.array(phs)
return values, phs, times
[docs]def iq_demod(xdata, tdata, f_central, a_coeffs, b_coeffs):
"""Return instantaneous frequency of modulated signal using IQ processign.
Parameters
----------
xdata: array_like
Signal values.
tdata: array_like
Time values.
f_central: float
Carrier frequency.
a_coeffs: array_like
a values of filter.
b_coeffs: array_like
b values of filter.
Returns
-------
: np.ndarray of floats
Instantaneous frequency values.
: np.ndarray
Time values.
"""
muli = xdata * np.cos(_2PI * f_central * tdata)
mulq = xdata * np.sin(_2PI * f_central * tdata)
muli_low = sig.lfilter(b_coeffs, a_coeffs, muli)
mulq_low = sig.lfilter(b_coeffs, a_coeffs, mulq)
analytic = muli_low + 1j * mulq_low
phase = -unwrap(angle(analytic))
freq = diff(phase) / _2PI / (tdata[1] - tdata[0]) + f_central
return freq, tdata[:-1]
[docs]def envelope_by_extremums(xdata, sample_rate=1, tdata=None):
"""Calculate envelope by local extremums of signals.
Parameters
----------
xdata: array_like
Signal values.
sample_rate: float
Sampling frequency.
tdata: array_like
Time values. Use it for unregular discretized input signal.
Returns
--------
: np.array
Damping values.
: np.array
Time values.
"""
if tdata is None:
tdata = np.linspace(0, (len(xdata)-1)/sample_rate, len(xdata))
t_new = []
x_new = []
xabs = abs(xdata)
for x_l, x_c, x_r, t_c in zip(xabs[:-2], xabs[1:-1],
xabs[2:], tdata[1:-1]):
if (x_l < x_c) and (x_c >= x_r):
t_new.append(t_c)
x_new.append(x_c)
if xabs[-1] > xabs[-2]:
t_new.append(tdata[-1])
x_new.append(xabs[-1])
return np.array(x_new), np.array(t_new)
[docs]def digital_hilbert_filter(ntaps=101, window='hamming'):
"""Calculate digital hilbert filter.
Parameters
----------
ntaps: integer
Length of filter.
window: str
Window. Default is 'hamming'.
Returns
-------
: np.array
Filter.
"""
if ntaps % 2 == 0:
raise ValueError("ntaps of digital Hilbert filter must be odd.")
coeffs = np.zeros(ntaps)
num = ntaps // 2
for k in range(1, num+1, 2):
coeffs[num + k] = 2 / PI / k
coeffs[num - k] = -2 / PI / k
wind = sig.get_window(window, ntaps)
coeffs *= wind
return coeffs
[docs]def freq_by_extremums(xdata, sample_rate):
"""Calculate frequency of oscillating signal by extremums.
Parameters
----------
xdata: array_like
Values of input signals.
sample_rate: float
Sampling frequency (Hz).
Returns
-------
: float
Frequency.
"""
if len(xdata) < 3:
raise ValueError("Short signal")
T = len(xdata) / sample_rate
max_total = 0
min_total = 0
for prev, curr, nxt in zip(xdata[:-2], xdata[1:-1], xdata[2:]):
if (prev < curr) and (curr >= nxt):
max_total += 1
if (prev > curr) and (curr <= nxt):
min_total += 1
if xdata[0] > xdata[1]:
max_total += 1
if xdata[0] < xdata[1]:
min_total += 1
return (max_total + min_total) / 2 / T
[docs]def freq_by_zeros(xdata, sample_rate):
"""Calculate average frequency of detrended oscillating signal by
counting zeros."""
T = len(xdata) / sample_rate
zeros_total = 0
for prev, curr in zip(xdata[:-1], xdata[1:]):
if prev * curr < 0:
zeros_total += 1
elif prev != 0 and curr == 0:
zeros_total += 1
return zeros_total / 2 / T
[docs]def linint(xdata, tdata, ts_new):
"""Find values of xdata in ts_new points.
Parameters
----------
xdata: np.ndarray
Signal values.
tdata: np.ndarray
Time values.
ts_new: np.ndarray
New time values.
Returns
-------
: np.ndarray
New signal values.
"""
x_new = np.zeros(len(ts_new)) * np.nan
for x_p, t_p, x_c, t_c in zip(xdata[:-1], tdata[:-1],
xdata[1:], tdata[1:]):
k = (x_c - x_p) / (t_c - t_p)
b = x_p - k*t_p
ind = (ts_new >= t_p) & (ts_new <= t_c)
x_new[ind] = k * ts_new[ind] + b
return x_new
[docs]def wave_lens(xdata, tdata):
"""Calculate wave lengths of signal by space between zeros.
Parameters
----------
xdata: np.ndarray
Signal values.
tdata: np.ndarray
Time values.
Returns
-------
: np.ndarray
Wave lengths.
: np.ndarray
Time values.
"""
tms = []
for t_c, x_p, x_c in zip(tdata[1:], xdata[:-1], xdata[1:]):
if x_p * x_c < 0:
tms.append(t_c)
lens = np.diff(tms) * 2
t_lens = np.array(tms[1:])
return lens, t_lens
[docs]def freqs_by_wave_len(xdata, tdata, cut_nans=True):
"""Calculate frequencies using lenghs of waves and linear
interpolation.
Parameters
----------
xdata: np.ndarray
Signal values.
tdata: np.ndarray
Time values.
cut_nans: boolean
If True, the nan values at the ends of the of the produced
array will removed.
Returns
-------
: np.ndarray
Freqs values.
"""
wls, t_wl = wave_lens(xdata, tdata)
freqs = 1 / linint(wls, t_wl, tdata)
if cut_nans:
freqs_cut = []
t_cut = []
for (f, tt) in zip(freqs, tdata):
if f == f:
freqs_cut.append(f)
t_cut.append(tt)
return np.array(freqs_cut), t_cut
return freqs