1
"""
2
Everything involving asynchronicity in acquisition instruments is in this file.
3

4
In particular, this includes getting curves and continuously averaging curves.
5

6
Using the coroutine syntax introduced in python 3.4+ would make the code
7
more elegant, but it would not be compatible with python 2.7. Hence we have
8
chosen to implement all asynchronous methods such that a promise is returned (a
9
Future-object in python). The promise implements the following methods:
10

11
- await_result(): returns the acquisition result once it is ready.
12
- add_done_callback(func): the function func(value) is used as "done-callback)
13

14
All asynchronous methods also have a blocking equivalent that directly
15
returns the result once it is ready:
16

17
- curve_async  <---> curve
18
- single_async <---> single
19

20
Finally, this implmentation using standard python Futures makes it
21
possible to use transparently pyrpl asynchronous methods inside python 3.x
22
coroutines.
23

24
Example:
25

26
    This example shows a typical acquisition use case where a sequence of
27
    n aquisitions of simultaneous oscilloscope and network analyzer traces
28
    are launched
29
    ::
30

31
        from asyncio import ensure_future, event_loop
32

33
        async def my_acquisition_routine(n):
34
            for i in range(n):
35
                print("acquiring scope")
36
                fut = ensure_future(p.rp.scope.run_single())
37
                print("acquiring na")
38
                data2 = await p.networkanalyzer.run_single()
39
                # both acquisitions are carried out simultaneously
40
                data1 = await fut
41
                print("loop %i"%i, data1, data2)
42

43
        ensure_future(my_acquisition_coroutine(10))
44
        eventloop.run_until_complete()
45
"""
46 3
from copy import copy
47 3
from .module_attributes import *
48 3
from .async_utils import PyrplFuture, Future, MainThreadTimer, CancelledError
49

50

51 3
class AcquisitionError(ValueError):
52 3
    pass
53

54

55 3
class CurveFuture(PyrplFuture):
56
    """
57
    The basic acquisition of instruments is an asynchronous process:
58

59
    For instance, when the scope acquisition has been launched, we know
60
    that the curve won't be ready before duration(), but if the scope is
61
    waiting for a trigger event, this could take much longer. Of course,
62
    we want the event loop to stay alive while waiting for a pending curve.
63
    That's the purpose of this future object.
64

65
    After its creation, it will perform the following actions:
66

67
        1. stay inactive for a time given by instrument._remaining_time()
68
        2. after that, it will check every min_refresh_delay if a new curve is ready with instrument._data_ready()
69
        3. when data is ready, its result will be set with the instrument data, as returned by instrument._get_data()
70

71
    """
72

73 3
    def __init__(self, module, min_delay_ms=20):
74 0
        self._module = module
75 0
        self.min_delay_ms = min_delay_ms
76 0
        super(CurveFuture, self).__init__()
77 0
        self._init_timer()
78 0
        self._module._start_acquisition()
79

80 3
    def _init_timer(self):
81 0
        if self.min_delay_ms == 0:
82
            # make sure 1st instrument interrogation occurs before time
83 0
            delay = self._module._remaining_time() * 1000 - 1
84
        else:
85
            # 1 ms loss due to timer inaccuracy is acceptable
86 0
            delay = max(self.min_delay_ms,
87
                        self._module._remaining_time() * 1000)
88

89 0
        self._timer = MainThreadTimer(max(0, delay)) #  avoid negative times
90
        # delays
91 0
        self._timer.timeout.connect(self._set_data_as_result)
92 0
        self._timer.start()
93

94 3
    def _get_one_curve(self):
95 0
        if self._module._data_ready():
96 0
            return self._module._get_curve()
97
        else:
98 0
            return None
99

100 3
    def _set_data_as_result(self):
101 0
        data = self._get_one_curve()
102 0
        if data is not None:
103 0
            self.set_result(data)
104 0
            if self._module.running_state in ["paused", "stopped"]:
105 0
                self._module._free_up_resources()
106
        else:
107 0
            self._timer.setInterval(self.min_delay_ms)
108 0
            self._timer.start()
109

110
    def set_exception(self, exception):  # pragma: no cover
111
        self._timer.stop()
112
        super(CurveFuture, self).set_exception(exception)
113

114 3
    def cancel(self):
115 0
        self._timer.stop()
116 0
        super(CurveFuture, self).cancel()
117

118

119 3
class RunFuture(PyrplFuture):
120
    """
121
    Uses several CurveFuture to perform an average.
122

123
    2 extra functions are provided to control the acquisition:
124

125
    pause(): stalls the acquisition
126

127
    start(): (re-)starts the acquisition (needs to be called at the beginning)
128

129
    The format for curves are:
130

131
    - Scope:
132
        - data_x  : self.times
133
        - data_avg: np.array((ch1, ch2))
134
    - Specan or NA:
135
        - data_x  : frequencies
136
        - data_avg: np.array(y_complex)
137
    """
138

139 3
    def __init__(self, module, min_delay_ms):
140 0
        self._run_continuous = False
141 0
        self._module = module
142 0
        self._min_delay_ms = min_delay_ms
143 0
        super(RunFuture, self).__init__()
144 0
        self.data_avg = None
145 0
        self.data_x = copy(self._module.data_x) #  in case it is saved later
146 0
        self._fut = None
147 0
        self.current_avg = 0
148 0
        self._paused = True
149

150 3
    def _new_curve_arrived(self, curve):
151 0
        try:
152 0
            result = curve.result()
153 0
        except (AcquisitionError, CancelledError):
154 0
            if self._module.running_state in ["running_continuous",
155
                                              "running_single"]:
156 0
                return
157
            else:
158 0
                self.cancel()
159 0
        if self._module.running_state in ["running_continuous",
160
                                          "running_single"]:
161 0
            self.current_avg = min(self.current_avg + 1,
162
                                   self._module.trace_average)
163

164 0
            if self.data_avg is None:
165 0
                self.data_avg = result
166
            else:
167 0
                self.data_avg = (self.data_avg * (self.current_avg - 1) +
168
                                 result) / self.current_avg
169

170 0
            self._module._emit_signal_by_name('display_curve',
171
                                              [self._module.data_x,
172
                                               self.data_avg])
173

174 0
            if self._is_run_over():
175 0
                if not self.done():
176 0
                    self.set_result(self.data_avg)
177 0
                    self._module.running_state = "stopped" # should be 'paused'
178
                    # if we want to average over the single run, but for
179
                    # scope and specan, it is more convenient to restart
180
                    # averaging (basically saves the button stop in the GUI)
181
            else:
182 0
                if not self._paused:
183 0
                    self.start()
184

185 3
    def _is_run_over(self):
186 0
        if self._run_continuous:
187 0
            return False
188
        else:
189 0
            return self.current_avg >= self._module.trace_average
190

191 3
    def cancel(self):
192 0
        self.pause()
193 0
        super(RunFuture, self).cancel()
194

195 3
    def pause(self):
196 0
        self._paused = True
197 0
        self._module._free_up_resources()
198 0
        if self._fut is not None:
199 0
            self._fut.cancel()
200

201 3
    def start(self):
202 0
        self._paused = False
203 0
        if self._fut is not None:
204 0
            self._fut.cancel()
205 0
        self._fut = self._module._curve_async(self._min_delay_ms)
206 0
        self._fut.add_done_callback(self._new_curve_arrived)
207

208 3
    def _set_run_continuous(self):
209
        """
210
        Makes the RunFuture continuous (used when setting "running_continuous")
211
        """
212 0
        self._run_continuous = True
213 0
        self._min_delay_ms = self._module.MIN_DELAY_CONTINUOUS_MS
214

215

216 3
class RunningStateProperty(SelectProperty):
217 3
    def __init__(self, options=["running_single", "running_continuous", "paused", "stopped"], **kwargs):
218
        """
219
        A property to indicate whether the instrument is currently running or not.
220

221
        Changing the running_state performs the necessary actions to enable the
222
        selected state. The state can be one of the following:
223

224
        - 'running_single': takes a single acquisition (trace_average averages). Acquisitions are automatically restarted until the desired number of averages is acquired.
225
        - 'running_continuous': continuously takes a acquisitions, eternally averages and restarts automatically.
226
        - 'paused': acquisition interrupted, but no need to restart averaging at next call of running_continous.
227
        - 'stopped': acquisition interrupted, averaging will restart at next call of running_continuous.
228
        """
229 3
        super(RunningStateProperty, self).__init__(options=options, **kwargs)
230

231
    #  Changing running_state is handled here instead of inside _setup()
232
    # (with a call_setup=True option) because the precise actions to be
233
    # taken depend on the previous state of running_state. Such a behavior
234
    # would not be straightforward to implement in _setup()
235 3
    def set_value(self, obj, val):
236
        """
237
        This is the master property: changing this value triggers all the logic
238
        to change the acquisition mode
239
        """
240
        # touching the running_state cancels the pending curve_future object
241
        # (no effect if future is already done)
242 0
        obj._curve_future.cancel()
243 0
        previous_state = obj.running_state
244 0
        SelectProperty.set_value(self, obj, val)
245 0
        if val == "running_single":
246
            # acquire as fast as possible trace_average curves
247 0
            obj.setup()
248 0
        elif val == "running_continuous":
249 0
            if previous_state == 'stopped':  #  restart averaging...
250 0
                obj.setup()
251
            else:
252 0
                obj._run_future._set_run_continuous() # if previous run was
253
                # "running_single" keep averaging in the same run, simply make
254
                # it continuous
255 0
                obj._run_future.start()
256 0
        elif val in ["paused", "stopped"]:
257 0
            if hasattr(obj, '_run_future'):
258 0
                obj._run_future.cancel() #  single cannot be resumed
259
                #  on the other hand, continuous can still be started again
260
                #  eventhough it is cancelled. Basically, the result will never
261
                #  be set, but the acquisition can still be going on indefinitely.
262

263

264 3
class SignalLauncherAcquisitionModule(SignalLauncher):
265
    """ class that takes care of emitting signals to update all possible
266
    displays"""
267

268 3
    display_curve = QtCore.Signal(list)  # This signal is emitted when
269
    # curves need to be displayed the argument is [array(times),
270
    # array(curve1), array(curve2)] or [times, None, array(curve2)]
271 3
    autoscale_x = QtCore.Signal()
272

273
    # For now, the following signals are only implemented with NA.
274 3
    update_point = QtCore.Signal(int)  #  used in NA only
275 3
    scan_finished = QtCore.Signal()  #  used in NA only
276 3
    clear_curve = QtCore.Signal()  #  NA only
277 3
    x_log_toggled = QtCore.Signal() #  logscale changed
278

279
    # Following signal only implemented in spec an
280 3
    unit_changed = QtCore.Signal()
281

282

283 3
class AcquisitionModule(Module):
284
    """
285
    The asynchronous mode is supported by a sub-object "run"
286
    of the module. When an asynchronous acquisition is running
287
    and the widget is visible, the current averaged data are
288
    automatically displayed. Also, the run object provides a
289
    function save_curve to store the current averaged curve
290
    on the hard-drive.
291

292
    The full API of the "run" object is the following.
293

294
    Methods:
295
        *(All methods return immediately)*
296
        single(): performs an asynchronous acquisition of trace_average curves.
297
            The function returns a promise of the result:
298
            an object with a ready() function, and a get() function that
299
            blocks until data is ready.
300
        continuous(): continuously acquires curves, and performs a
301
            moving average over the trace_average last ones.
302
        pause(): stops the current acquisition without restarting the
303
            averaging
304
        stop(): stops the current acquisition and restarts the averaging.
305
        save_curve(): saves the currently averaged curve (or curves for scope)
306
        curve(): the currently averaged curve
307

308
    Attributes:
309
        curve_name (str): name of the curve to create upon saving
310
        trace_average (int): number of averages in single (not to confuse with
311
            averaging per point)
312
        data_avg (array of numbers): array containing the current averaged curve
313
        current_avg (int): current number of averages
314
    """
315
    # The averaged data are stored in a RunFuture object _run_future
316
    #
317
    # _setup() recreates from scratch _run_future by calling _new_run_future()
318
    #
319
    # It is necessary to setup the AcquisitionModule on startup to start
320
    # with clean arrays
321
    #
322
    # Changing any attribute in callback_attribute (mostly every
323
    # setup_attribute except running_state) will force a restart of the
324
    # averaging by calling setup
325
    #
326
    # On the other hand, "running_state" has a customized behavior: it will
327
    # only call setup() when needed and perform customized actions otherwise:
328
    #  - paused/stopped -> running_single: start acquisition on new future
329
    #  - paused -> running_continuous: start acquisition on same future + set
330
    #  future to run_continuous (irreversible)
331
    #  - stopped -> running_continuous: start acquisition on new future +
332
    # set future to run_continuous (irreversible) == call setup()
333
    #  - running_single/running_continuous -> pause/stop: pause acquisition
334

335 3
    _gui_attributes = ['trace_average', 'curve_name']
336

337 3
    _setup_on_load = True #  acquisition_modules need to be setup() once
338
    # they are loaded
339 3
    _signal_launcher = SignalLauncherAcquisitionModule
340 3
    _setup_attributes = ['running_state', 'trace_average', 'curve_name']
341 3
    _run_future_cls = RunFuture
342 3
    _curve_future_cls = CurveFuture
343

344 3
    MIN_DELAY_SINGLE_MS = 0  # async acquisition should be as fast as
345
    # possible
346 3
    MIN_DELAY_CONTINUOUS_MS = 40  # leave time for the event loop in
347
    # continuous
348

349 3
    running_state = RunningStateProperty(
350
        default='stopped',
351
        doc="Indicates whether the instrument is running acquisitions or not. "
352
            "See :class:`RunningStateProperty` for available options. ")
353

354 3
    trace_average = IntProperty(doc="number of curves to average in single mode. In "
355
                           "continuous mode, a moving window average is "
356
                           "performed.",
357
                           default=1,
358
                           min=1)
359 3
    curve_name = StringProperty(doc="name of the curve to save.")
360

361 3
    def __init__(self, parent, name=None):
362
        # The curve promise is initialized with a dummy Future, because
363
        # instantiating CurveFuture launches a curve acquisition
364 0
        self._curve_future = Future()
365

366 0
        super(AcquisitionModule, self).__init__(parent, name=name)
367 0
        self.curve_name = self.name + " curve"
368 0
        self._run_future = self._run_future_cls(self,
369
                                                min_delay_ms=self.MIN_DELAY_SINGLE_MS)
370
        # On the other hand, RunFuture has a start method and is not started
371
        # at instanciation.
372

373

374 3
    def _new_curve_future(self, min_delay_ms):
375 0
        self._curve_future.cancel()
376 0
        self._curve_future = self._curve_future_cls(self,
377
                                                    min_delay_ms=min_delay_ms)
378

379 3
    def _new_run_future(self):
380 0
        if hasattr(self, "_run_future"):
381 0
            self._run_future.cancel()
382 0
        if self.running_state == "running_continuous":
383 0
            self._run_future = self._run_future_cls(self,
384
                                    min_delay_ms=self.MIN_DELAY_CONTINUOUS_MS)
385 0
            self._run_future._set_run_continuous()
386
        else:
387 0
            self._run_future = self._run_future_cls(self,
388
                                    min_delay_ms=self.MIN_DELAY_SINGLE_MS)
389

390 3
    def _emit_signal_by_name(self, signal_name, *args, **kwds):
391
        """Let's the module's signal_launcher emit signal name"""
392 0
        self._signal_launcher.emit_signal_by_name(signal_name, *args, **kwds)
393

394 3
    def _curve_async(self, min_delay_ms):
395
        """
396
        Same as curve_async except this function can be used in any
397
        running_state.
398
        """
399 0
        self._start_acquisition()
400 0
        self._new_curve_future(min_delay_ms=min_delay_ms)
401 0
        return self._curve_future
402

403 3
    def curve_async(self):
404
        """
405
        Launches the acquisition for one curve with the current parameters.
406

407
        - If running_state is not "stopped", stops the current acquisition.
408
        - If rolling_mode is True, raises an exception.
409
        - Immediately returns a future object representing the curve.
410
        - The curve can be retrieved by calling result(timeout) on the future object.
411
        - The future is cancelled if the instrument's state is changed before the end of the acquisition, or another call to curve_async() or curve() is made on the same instrument.
412
        """
413 0
        if self.running_state is not "stopped":
414 0
            self.stop()
415 0
        return self._curve_async(0)
416

417 3
    def curve(self, timeout=None):
418
        """
419
        Same as curve_async, except:
420

421
        - the function will not return until the curve is ready or timeout occurs.
422
        - the function directly returns an array with the curve instead of a future object
423
        """
424 0
        return self.curve_async().await_result(timeout)
425

426 3
    def single_async(self):
427
        """
428
        Performs an asynchronous acquisition of trace_average curves.
429

430
        - If running_state is not stop, stops the current acquisition.
431
        - Immediately returns a future object representing the curve.
432
        - The curve can be retrieved by calling result(timeout) on the future object.
433
        - The future is cancelled if the instrument's state is changed before the end of the acquisition.
434
        """
435 0
        self.running_state = 'running_single'
436 0
        return self._run_future
437

438 3
    def single(self, timeout=None):
439
        """
440
        Same as single_async, except:
441
            - the function will not return until the averaged curve is ready or timeout occurs.
442
            - the function directly returns an array with the curve instead of a future object.
443
        """
444 0
        return self.single_async().await_result(timeout)
445

446 3
    def continuous(self):
447
        """
448
        continuously acquires curves, and performs a moving
449
        average over the trace_average last ones.
450
        """
451 0
        self.running_state = 'running_continuous'
452
        # return self._continuous_future
453

454 3
    def pause(self):
455
        """
456
        Stops the current acquisition without restarting the averaging
457
        """
458 0
        self.running_state = 'paused'
459

460 3
    def stop(self):
461
        """
462
        Stops the current acquisition and averaging will be restarted
463
        at next run.
464
        """
465 0
        self.running_state = 'stopped'
466

467 3
    def save_curve(self):
468
        """
469
        Saves the curve(s) that is (are) currently displayed in the gui in
470
        the db_system. Also, returns the list [curve_ch1, curve_ch2]...
471
        """
472 0
        params = self.setup_attributes
473 0
        params.update(name=self.curve_name)
474 0
        curve = self._save_curve(self._run_future.data_x,
475
                                 self._run_future.data_avg,
476
                                 **params)
477 0
        return curve
478

479 3
    def _clear(self):
480 0
        super(AcquisitionModule, self)._clear()
481 0
        self._curve_future.cancel()
482 0
        self._run_future.cancel()
483

484 3
    def _setup(self):
485
        # the _run_future is renewed to match the requested type of run (
486
        # rolling_mode or triggered)
487
        # This is how we make sure changing duration or rolling_mode won't
488
        # freeze the acquisition.
489 0
        self._new_run_future()
490 0
        if self.running_state in ["running_single", "running_continuous"]:
491 0
            self._run_future.start()
492 0
            self._emit_signal_by_name("autoscale_x")
493

494
    # Methods to implement in derived class:
495
    # --------------------------------------
496

497 3
    def _remaining_time(self):
498
        """
499
        remaining time (in seconds) until the data has a chance to be ready.
500
        In the case of scope, where trigger might delay the acquisition,
501
        this is the minimum time to wait in the "best case scenario" where
502
        the acquisition would have started immediately after setup().
503
        """
504
        raise NotImplementedError("To implement in derived class")  # pragma: no cover
505

506 3
    def _data_ready(self):
507
        """
508
        :return: True or False
509
        """
510
        raise NotImplementedError('To implement in derived class')  # pragma: no cover
511

512 3
    def _get_curve(self):
513
        """
514
        get the curve from the instrument.
515
          a 1D array for single channel instruments
516
          a 2*n array for the scope
517
        """
518
        raise NotImplementedError  # pragma: no cover
519

520 3
    @property
521
    def data_x(self):
522
        """
523
        x-axis of the curves to plot.
524
        :return:
525
        """
526
        raise NotImplementedError("To implement in derived class")  # pragma: no cover
527

528 3
    def _start_acquisition(self):
529
        """
530
        If anything has to be communicated to the hardware (such as make
531
        trigger ready...) to start the acquisition, it should be done here.
532
        This function will be called only be called by the init-function of
533
        the _curve_future()
534
        Only non-blocking operations are allowed.
535
        """
536
        pass  # pragma: no cover
537

538 3
    def _free_up_resources(self):
539
        pass # pragma: no cover
540

541
    # Shortcut to the RunFuture data (for plotting):
542
    # ----------------------------------------------
543

544 3
    @property
545
    def data_avg(self):
546 0
        return self._run_future.data_avg
547

548 3
    @property
549
    def current_avg(self):
550 0
        return self._run_future.current_avg

Read our documentation on viewing source code .

Loading