holoviz / panel
1
"""
2
Renders Streamz Stream objects.
3
"""
4 6
import sys
5

6 6
import param
7

8 6
from .base import ReplacementPane
9

10

11 6
class Streamz(ReplacementPane):
    """
    Pane that renders the values emitted by a streamz ``Stream``.

    The pane subscribes to ``object`` through a derived stream and
    forwards every emitted value to ``self._update_inner``. The derived
    stream is kept in ``self._stream`` so it can be destroyed again.
    """

    always_watch = param.Boolean(default=False, doc="""
        Whether to watch even when not displayed.""")

    rate_limit = param.Number(default=0.1, bounds=(0, None), doc="""
        The minimum interval between events.""")

    # Both parameters are mapped to None in the rename table.
    _rename = {'rate_limit': None, 'always_watch': None}

    def __init__(self, object=None, **params):
        """
        Initialize the pane and, when ``always_watch`` is enabled,
        subscribe to the stream straight away.
        """
        super().__init__(object, **params)
        # Handle to the derived stream; None while not subscribed.
        self._stream = None
        if self.always_watch:
            self._setup_stream()

    @param.depends('always_watch', 'object', 'rate_limit', watch=True)
    def _setup_stream(self):
        """
        (Re)create the subscription to ``self.object``.

        Called on changes to ``always_watch``, ``object`` and
        ``rate_limit`` and explicitly from ``__init__`` and
        ``_get_model``.

        NOTE(review): when ``always_watch`` is enabled and a stream
        already exists this is a no-op, so a changed ``object`` or
        ``rate_limit`` does not rebuild the subscription in that
        case - confirm this is intended.
        """
        if self.object is None:
            return

        if self._stream:
            if self.always_watch:
                # An existing subscription is kept as-is.
                return
            # Drop the previous subscription before deciding whether
            # a new one is needed.
            self._stream.destroy()
            self._stream = None

        # Only subscribe while at least one model exists, unless the
        # user asked to always watch.
        if not self._pane._models and not self.always_watch:
            return

        latest = self.object.latest()
        throttled = latest.rate_limit(self.rate_limit)
        self._stream = throttled.gather()
        self._stream.sink(self._update_inner)

    def _get_model(self, doc, root=None, parent=None, comm=None):
        """
        Build the model via the parent class, then run
        ``_setup_stream`` so the subscription reflects the new model.
        """
        result = super()._get_model(doc, root, parent, comm)
        self._setup_stream()
        return result

    def _cleanup(self, root=None):
        """
        Run the parent cleanup and destroy the subscription once no
        models remain on the inner pane.
        """
        super()._cleanup(root)
        if self._pane._models or not self._stream:
            return
        self._stream.destroy()
        self._stream = None

    #----------------------------------------------------------------
    # Public API
    #----------------------------------------------------------------

    @classmethod
    def applies(cls, obj):
        """
        Return True if ``obj`` is a streamz ``Stream``.

        streamz is only consulted when it has already been imported
        (i.e. it is present in ``sys.modules``); otherwise the answer
        is False without importing it.
        """
        if 'streamz' not in sys.modules:
            return False
        from streamz import Stream
        return isinstance(obj, Stream)

Read our documentation on viewing source code.

Loading