jatinchowdhury18 / audio_dspy

@@ -10,6 +10,7 @@
Loading
10 10
from .delay_utils import *
11 11
from .adaptive_filt import *
12 12
from .level_detector import level_detect
13 +
from .farina import Farina
13 14
name = "audio_dspy"
14 15
15 16

@@ -16,11 +16,11 @@
Loading
16 16
                        'Log Sweep response is not flat!')
17 17
18 18
    def test_lin_sweep(self):
19 -
        sweep = adsp.sweep_lin(10, _fs_)
20 -
        sweep2 = adsp.sweep_lin(10, _fs_)
19 +
        sweep = adsp.sweep_lin(0, _fs_/2, 10, _fs_)
20 +
        sweep2 = adsp.sweep_lin(0, _fs_/2, 10, _fs_)
21 21
        h = adsp.sweep2ir(sweep, sweep2)
22 22
        self.assertTrue(self.diff_vs_imp(h) < _tolrance_,
23 -
                        'Log Sweep response is not flat!')
23 +
                        'Lin Sweep response is not flat!')
24 24
25 25
    def diff_vs_imp(self, h):
26 26
        test_h = np.zeros(len(h))

@@ -0,0 +1,120 @@
Loading
1 +
import numpy as np
2 +
import audio_dspy as adsp
3 +
import scipy.signal as signal
4 +
from functools import wraps
5 +
6 +
7 +
class Farina:
8 +
    """
9 +
    Class that implements Alberto Farina's method [1]_ for
10 +
    simultaneously measuring frequency response and harmonic
11 +
    distortion of weakly nonlinear systems.
12 +
13 +
    References
14 +
    ----------
15 +
    .. [1] A. Farina "Simultaneous Measurement of Impulse Response
16 +
           and Distortion with a Swept-Sine Technique", Audio
17 +
           Engineering Society Convention 108, Feb. 2000
18 +
    """
19 +
20 +
    def __init__(self, duration, fs, f0=20, f1=20000):
21 +
        """
22 +
        Creates an object to create and process a Farina-style measurement.
23 +
24 +
        Parameters
25 +
        ----------
26 +
        duration : float
27 +
            length [seconds] of the desired measurement signal
28 +
        fs : float
29 +
            sample rate [Hz] of the measurement signal
30 +
        f0 : float
31 +
            Frequency [Hz] at which to start the measurement
32 +
        f1 : float
33 +
            Frequency [Hz] at which to end the measurement
34 +
        """
35 +
        N = int(duration * fs)
36 +
        self.fs = fs
37 +
38 +
        # create probe and inverse probe
39 +
        self.probe = adsp.sweep_log(f0, f1, duration, fs)
40 +
        R = np.log(f1 / f0)
41 +
        k = np.exp(np.arange(N) * R / N)
42 +
        self.inv_probe = np.flip(self.probe) / k
43 +
44 +
        # @TEST: test that probe convolved with inverse has flat spectrum,
45 +
        # and impulse-like response
46 +
47 +
        # determin times to look for harmonics
48 +
        self.far_response = None
49 +
        self.harm_times = [0]
50 +
        mult = 1
51 +
        while True:
52 +
            mult += 1
53 +
            delta_n = int(N * np.log(mult) / np.log(f1/f0))
54 +
            self.harm_times.append(delta_n)
55 +
            if self.harm_times[-1] - self.harm_times[-2] < fs * 0.05:
56 +
                break
57 +
58 +
    def process_measurement(self, measurement):
59 +
        """
60 +
        Processes a measurement made using the probe signal
61 +
        for this object.
62 +
        """
63 +
        self.far_response = adsp.normalize(
64 +
            signal.convolve(measurement, self.inv_probe))
65 +
66 +
        amax = np.argmax(self.far_response)
67 +
        level = adsp.level_detect(self.far_response, self.fs)
68 +
        off = int(self.fs/10)
69 +
        amin = np.argwhere(level[amax-off:amax] < 0.05)[-1][0]
70 +
        amax = amax - (off - amin)
71 +
        end = amax + np.argwhere(level[amax:] < 10**(-60/20))[0][0]
72 +
73 +
        self.harm_responses = [self.far_response[amax:end]]
74 +
        for i in range(1, len(self.harm_times)):
75 +
            start = amax - self.harm_times[i]
76 +
            end = amax - self.harm_times[i-1]
77 +
            self.harm_responses.append(self.far_response[start:end])
78 +
79 +
    def _check_meas(func):
80 +
        """
81 +
        Decorator to make sure the measurement has been
82 +
        processed before attempting to access anything that
83 +
        depends on it.
84 +
        """
85 +
        @wraps(func)
86 +
        def checker(self, *args, **kwargs):
87 +
            if self.far_response is None:
88 +
                assert False, 'You must process a measurement before calling this function'
89 +
            return func(self, *args, **kwargs)
90 +
        return checker
91 +
92 +
    @_check_meas
93 +
    def get_harm_response(self, harm_num):
94 +
        """
95 +
        Returns the impulse response for a certain harmonic
96 +
        of the system. Note that the fundamental is the 1st harmonic.
97 +
        """
98 +
        assert harm_num > 0, 'Harmonic number must be greater than zero!'
99 +
        assert harm_num < len(self.harm_times), 'Harmonic number too large!'
100 +
        return self.harm_responses[harm_num-1]
101 +
102 +
    @_check_meas
103 +
    def get_IR(self):
104 +
        """
105 +
        Returns the impulse response for the linear
106 +
        part of the system.
107 +
        """
108 +
        return self.get_harm_response(1)
109 +
110 +
    @_check_meas
111 +
    def getTHD(self, harms=9):
112 +
        """
113 +
        Returns the estimated total harmonic distortion for the system.
114 +
        """
115 +
        rms_vals = np.zeros(harms)
116 +
        for idx, response in enumerate(self.harm_responses[:harms]):
117 +
            r_corr = signal.convolve(response, self.get_IR())
118 +
            rms_vals[idx] = np.sqrt(np.mean(r_corr**2))
119 +
        rms_vals /= rms_vals[0]
120 +
        return np.sqrt(np.sum(rms_vals[1:]**2)) / rms_vals[0]

@@ -30,11 +30,15 @@
Loading
30 30
    return np.cos((phase + phi)/fs)
31 31
32 32
33 -
def sweep_lin(duration, fs):
33 +
def sweep_lin(f0, f1, duration, fs):
34 34
    """Generates a linear sine sweep
35 35
36 36
    Parameters
37 37
    ----------
38 +
    f0: float
39 +
        The frequency [Hz] at which to begin the sine sweep
40 +
    f1: float
41 +
        The frequency [Hz] at which to stop the sine sweep
38 42
    duration: float
39 43
        The length of time [seconds] over which to sweep the signal
40 44
    fs: float
@@ -48,7 +52,7 @@
Loading
48 52
    N = int(duration * fs)
49 53
    n = np.arange(N)
50 54
51 -
    phase = 2 * np.pi * (((fs/2)/N) * n * n / 2)
55 +
    phase = 2 * np.pi * (f0 * n + (((f1 - f0)/N) * n * n / 2))
52 56
    phi = np.pi / 180
53 57
54 58
    return np.cos((phase + phi)/fs)

@@ -0,0 +1,124 @@
Loading
1 +
from unittest import TestCase
2 +
import numpy as np
3 +
import scipy.signal as signal
4 +
5 +
import audio_dspy as adsp
6 +
7 +
_fs_ = 44100
8 +
9 +
10 +
class TestFarina(TestCase):
11 +
    def setUp(self):
12 +
        g = adsp.delay_feedback_gain_for_t60(1, _fs_, 0.5)
13 +
        self.N = int(0.6 * _fs_)
14 +
15 +
        np.random.seed(0x2345)
16 +
        self.test_filt = adsp.normalize(
17 +
            np.random.randn(self.N) * g**np.arange(self.N))
18 +
        self.lin_sys = lambda sig: signal.convolve(sig, self.test_filt)
19 +
        self.nonlin_sys = lambda sig: signal.convolve(
20 +
            adsp.soft_clipper(1.1*sig, deg=9), self.test_filt)
21 +
22 +
    def test_inverse_probe_freq(self):
23 +
        far = adsp.Farina(10, _fs_)
24 +
        delta = signal.convolve(far.probe, far.inv_probe)
25 +
26 +
        freqs = np.fft.rfftfreq(len(delta), 1.0/_fs_)
27 +
        start = np.argwhere(freqs > 100)[0][0]
28 +
        end = np.argwhere(freqs < 16000)[-1][0]
29 +
30 +
        delta_fft = 20 * \
31 +
            np.log10(adsp.normalize(np.abs(np.fft.rfft(delta))))[start:end]
32 +
        rangeDB = np.max(delta_fft) - np.min(delta_fft)
33 +
        self.assertTrue(
34 +
            rangeDB < 2.0, 'Inverse probe does not have correct frequency response!')
35 +
36 +
    def test_inverse_probe_imp(self):
37 +
        far = adsp.Farina(10, _fs_)
38 +
        delta = signal.convolve(far.probe, far.inv_probe)
39 +
        delta_sort = np.sort(np.abs(delta))
40 +
        ratio = delta_sort[-1] / delta_sort[-2]
41 +
42 +
        self.assertTrue(
43 +
            ratio > 5, 'Probe and inverse probe does not convolve to a delta!')
44 +
45 +
    def test_freq_response_lin(self):
46 +
        far = adsp.Farina(20, _fs_)
47 +
        meas = self.lin_sys(far.probe)
48 +
        far.process_measurement(meas)
49 +
50 +
        freqs = np.fft.rfftfreq(self.N, 1.0/_fs_)
51 +
        start = np.argwhere(freqs > 100)[0][0]
52 +
        end = np.argwhere(freqs < 16000)[-1][0]
53 +
54 +
        test_fft = 20 * \
55 +
            np.log10(adsp.normalize(np.abs(np.fft.rfft(self.test_filt))))[
56 +
                start:end]
57 +
        far_fft = 20 * \
58 +
            np.log10(adsp.normalize(np.abs(np.fft.rfft(far.get_IR()[:self.N]))))[
59 +
                start:end]
60 +
        error = np.max(np.abs(test_fft - far_fft))
61 +
62 +
        self.assertTrue(
63 +
            error < 10, 'Incorrect frequency response for linear system!')
64 +
65 +
    def test_THD_lin(self):
66 +
        far = adsp.Farina(20, _fs_)
67 +
        meas = self.lin_sys(far.probe)
68 +
        far.process_measurement(meas)
69 +
70 +
        thd = far.getTHD()
71 +
        self.assertTrue(thd < 0.1, 'Incorrect THD for linear system!')
72 +
73 +
    def test_freq_response_nonlin(self):
74 +
        far = adsp.Farina(20, _fs_)
75 +
        meas = self.nonlin_sys(far.probe)
76 +
        far.process_measurement(meas)
77 +
78 +
        freqs = np.fft.rfftfreq(self.N, 1.0/_fs_)
79 +
        start = np.argwhere(freqs > 100)[0][0]
80 +
        end = np.argwhere(freqs < 16000)[-1][0]
81 +
82 +
        test_fft = 20 * \
83 +
            np.log10(adsp.normalize(np.abs(np.fft.rfft(self.test_filt))))[
84 +
                start:end]
85 +
        far_fft = 20 * \
86 +
            np.log10(adsp.normalize(np.abs(np.fft.rfft(far.get_IR()[:self.N]))))[
87 +
                start:end]
88 +
        error = np.max(np.abs(test_fft - far_fft))
89 +
90 +
        self.assertTrue(
91 +
            error < 10, 'Incorrect frequency response for nonlinear system!')
92 +
93 +
    def test_THD_nonlin(self):
94 +
        # get reference THD from sine wave test
95 +
        N = _fs_*2
96 +
        freq = 1000
97 +
        sine = np.sin(2 * np.pi * np.arange(N) * freq / _fs_)
98 +
        y = self.nonlin_sys(sine)
99 +
100 +
        V_rms = np.zeros(9)
101 +
        for n in range(1, 10):
102 +
            low_freq = freq*n - 30
103 +
            b_hpf, a_hpf = signal.butter(
104 +
                4, low_freq, btype='highpass', analog=False, fs=_fs_)
105 +
106 +
            high_freq = freq*n + 30
107 +
            b_lpf, a_lpf = signal.butter(
108 +
                4, high_freq, btype='lowpass', analog=False, fs=_fs_)
109 +
110 +
            harmonic = signal.lfilter(
111 +
                b_hpf, a_hpf, (signal.lfilter(b_lpf, a_lpf, y)))
112 +
            V_rms[n-1] = np.sqrt(np.mean(harmonic**2))
113 +
114 +
        sine_thd = np.sqrt(np.sum(V_rms[1:]**2)) / V_rms[0]
115 +
116 +
        # get measured THD from Farina
117 +
        far = adsp.Farina(20, _fs_)
118 +
        meas = self.nonlin_sys(far.probe)
119 +
        far.process_measurement(meas)
120 +
        far_thd = far.getTHD()
121 +
122 +
        error = np.abs(far_thd - sine_thd)
123 +
        self.assertTrue(
124 +
            error < 0.1, f'Incorrect THD! Sine: {sine_thd}, Far: {far_thd}')
Files Coverage
audio_dspy 84.42%
tests 100.00%
setup.py 100.00%
Project Totals (25 files) 90.64%
Sunburst
The inner-most circle is the entire project, moving away from the center are folders then, finally, a single file. The size and color of each slice is representing the number of statements and the coverage, respectively.
Icicle
The top section represents the entire project. Proceeding with folders and finally individual files. The size and color of each slice is representing the number of statements and the coverage, respectively.
Grid
Each block represents a single file in the project. The size and color of each block is represented by the number of statements and the coverage, respectively.
Loading