1 3
import logging
2 3
import os
3 3
import threading
4

5 3
import pykka
6

7 3
from mopidy import exceptions
8 3
from mopidy.audio import tags as tags_lib
9 3
from mopidy.audio import utils
10 3
from mopidy.audio.constants import PlaybackState
11 3
from mopidy.audio.listener import AudioListener
12 3
from mopidy.internal import process
13 3
from mopidy.internal.gi import GLib, GObject, Gst, GstPbutils
14

15 3
logger = logging.getLogger(__name__)

# This logger is only meant for debug logging of low level GStreamer info such
# as callbacks, events, messages and direct interaction with GStreamer such as
# set_state() on a pipeline.
gst_logger = logging.getLogger("mopidy.audio.gst")

# Bit values written to playbin's "flags" property. AUDIO is always set;
# DOWNLOAD is OR-ed in by Audio.set_uri() when download buffering is requested.
_GST_PLAY_FLAGS_AUDIO = 0x02
_GST_PLAY_FLAGS_DOWNLOAD = 0x80
24

25 3
# Translation from raw GStreamer pipeline states to Mopidy playback states.
# Gst.State.READY has no entry on purpose: the state-change handler returns
# early for READY before indexing this mapping, and a READY target state
# resolves to None when looked up with .get().
_GST_STATE_MAPPING = {
    Gst.State.PLAYING: PlaybackState.PLAYING,
    Gst.State.PAUSED: PlaybackState.PAUSED,
    Gst.State.NULL: PlaybackState.STOPPED,
}
30

31

32
# TODO: expose this as a property on audio?
33 3
class _Appsrc:
    """Helper class for dealing with appsrc based playback.

    Caps and callbacks are stored by :meth:`prepare` and applied to the real
    element by :meth:`configure` once an appsrc instance is handed to us.
    """

    def __init__(self):
        self._signals = utils.Signals()
        self.reset()

    def reset(self):
        """Reset the helper.

        Should be called whenever the source changes and we are not setting up
        a new appsrc.
        """
        self.prepare(None, None, None, None)

    def prepare(self, caps, need_data, enough_data, seek_data):
        """Store info we will need when the appsrc element gets installed.

        Signal handlers connected for any previous source are cleared and the
        reference to that source is dropped.
        """
        self._signals.clear()
        self._source = None
        self._caps = caps
        self._need_data_callback = need_data
        self._seek_data_callback = seek_data
        self._enough_data_callback = enough_data

    def configure(self, source):
        """Configure the supplied source for use.

        Should be called whenever we get a new appsrc.
        """
        source.set_property("caps", self._caps)
        source.set_property("format", "time")
        source.set_property("stream-type", "seekable")
        source.set_property("max-bytes", 1 << 20)  # 1MB
        source.set_property("min-percent", 50)

        if self._need_data_callback:
            self._signals.connect(
                source, "need-data", self._on_signal, self._need_data_callback
            )
        if self._seek_data_callback:
            self._signals.connect(
                source, "seek-data", self._on_signal, self._seek_data_callback
            )
        if self._enough_data_callback:
            # The extra None fills the ``clocktime`` slot of _on_signal for a
            # signal that does not supply a value of its own.
            self._signals.connect(
                source,
                "enough-data",
                self._on_signal,
                None,
                self._enough_data_callback,
            )

        self._source = source

    def push(self, buffer_):
        """Push ``buffer_`` to the appsrc, or end-of-stream if it is None.

        Returns True when GStreamer reports ``Gst.FlowReturn.OK``. Returns
        False otherwise, including when no appsrc is currently configured.
        """
        if self._source is None:
            return False

        if buffer_ is None:
            gst_logger.debug("Sending appsrc end-of-stream event.")
            result = self._source.emit("end-of-stream")
        else:
            result = self._source.emit("push-buffer", buffer_)
        return result == Gst.FlowReturn.OK

    def _on_signal(self, element, clocktime, func):
        # This shim is used to ensure we always return true, and also handles
        # that not all the callbacks have a time argument.
        if clocktime is None:
            func()
        else:
            func(utils.clocktime_to_millisecond(clocktime))
        return True
108

109

110
# TODO: expose this as a property on audio when #790 gets further along.
111 3
class _Outputs(Gst.Bin):
    """Bin exposing a single ``sink`` pad that feeds a ``tee`` element.

    Each output added through :meth:`add_output` is linked to the tee via its
    own ``queue``.
    """

    def __init__(self):
        Gst.Bin.__init__(self)
        # TODO gst1: Set 'outputs' as the Bin name for easier debugging

        self._tee = Gst.ElementFactory.make("tee")
        self.add(self._tee)

        ghost_pad = Gst.GhostPad.new("sink", self._tee.get_static_pad("sink"))
        self.add_pad(ghost_pad)

    def add_output(self, description):
        """Build an output bin from ``description`` and attach it to the tee.

        :param description: GStreamer bin description of the output
        :raises exceptions.AudioException: if the description cannot be parsed
        """
        # XXX This only works for pipelines not in use until #790 gets done.
        try:
            output = Gst.parse_bin_from_description(
                description, ghost_unlinked_pads=True
            )
        except GLib.GError as ex:
            logger.error(
                'Failed to create audio output "%s": %s', description, ex
            )
            # str(), not bytes(): bytes() on an exception object raises
            # TypeError on Python 3 and would mask the real parse error.
            raise exceptions.AudioException(str(ex)) from ex

        self._add(output)
        logger.info('Audio output set to "%s"', description)

    def _add(self, element):
        # Add the element behind a queue of its own and hook the queue to the
        # tee.
        queue = Gst.ElementFactory.make("queue")
        self.add(element)
        self.add(queue)
        queue.link(element)
        self._tee.link(queue)
143

144

145 3
class SoftwareMixer:
    """Volume and mute control backed by a GStreamer element.

    Volume and mute are read from and written to the element passed to
    :meth:`setup`; every change is reported back to the wrapped ``mixer``.
    """

    def __init__(self, mixer):
        self._mixer = mixer
        self._element = None
        self._last_volume = None
        self._last_mute = None
        self._signals = utils.Signals()

    def setup(self, element, mixer_ref):
        """Remember the element to control and forward ``mixer_ref``."""
        self._element = element
        self._mixer.setup(mixer_ref)

    def teardown(self):
        """Clear connected signals and tear down the wrapped mixer."""
        self._signals.clear()
        self._mixer.teardown()

    def get_volume(self):
        """Return the element's ``volume`` property scaled by 100, as int."""
        return int(round(self._element.get_property("volume") * 100))

    def set_volume(self, volume):
        """Set the element volume to ``volume / 100`` and notify the mixer."""
        self._element.set_property("volume", volume / 100.0)
        # Report the value read back from the element, not the requested one.
        self._mixer.trigger_volume_changed(self.get_volume())

    def get_mute(self):
        """Return the element's ``mute`` property."""
        return self._element.get_property("mute")

    def set_mute(self, mute):
        """Set the element's ``mute`` property and notify the mixer."""
        self._element.set_property("mute", bool(mute))
        self._mixer.trigger_mute_changed(self.get_mute())
174

175

176 3
class _Handler:
    """Dispatch GStreamer bus messages and pad events for an ``Audio`` actor.

    Messages and events are turned into updates of the actor's attributes and
    into ``AudioListener`` events.
    """

    def __init__(self, audio):
        self._audio = audio
        self._element = None
        self._pad = None
        self._message_handler_id = None
        self._event_handler_id = None

    def setup_message_handling(self, element):
        """Start receiving bus messages from ``element``."""
        self._element = element
        bus = element.get_bus()
        bus.add_signal_watch()
        self._message_handler_id = bus.connect("message", self.on_message)

    def setup_event_handling(self, pad):
        """Install a probe on ``pad`` that forwards events to us."""
        self._pad = pad
        self._event_handler_id = pad.add_probe(
            Gst.PadProbeType.EVENT_BOTH, self.on_pad_event
        )

    def teardown_message_handling(self):
        """Undo :meth:`setup_message_handling`."""
        bus = self._element.get_bus()
        bus.remove_signal_watch()
        bus.disconnect(self._message_handler_id)
        self._message_handler_id = None

    def teardown_event_handling(self):
        """Undo :meth:`setup_event_handling`."""
        self._pad.remove_probe(self._event_handler_id)
        self._event_handler_id = None

    def on_message(self, bus, msg):
        """Route a bus message to the matching ``on_*`` method."""
        if msg.type == Gst.MessageType.STATE_CHANGED:
            # Only state changes of the watched element itself are of interest.
            if msg.src != self._element:
                return
            old_state, new_state, pending_state = msg.parse_state_changed()
            self.on_playbin_state_changed(old_state, new_state, pending_state)
        elif msg.type == Gst.MessageType.BUFFERING:
            self.on_buffering(msg.parse_buffering(), msg.get_structure())
        elif msg.type == Gst.MessageType.EOS:
            self.on_end_of_stream()
        elif msg.type == Gst.MessageType.ERROR:
            error, debug = msg.parse_error()
            self.on_error(error, debug)
        elif msg.type == Gst.MessageType.WARNING:
            error, debug = msg.parse_warning()
            self.on_warning(error, debug)
        elif msg.type == Gst.MessageType.ASYNC_DONE:
            self.on_async_done()
        elif msg.type == Gst.MessageType.TAG:
            taglist = msg.parse_tag()
            self.on_tag(taglist)
        elif msg.type == Gst.MessageType.ELEMENT:
            if GstPbutils.is_missing_plugin_message(msg):
                self.on_missing_plugin(msg)
        elif msg.type == Gst.MessageType.STREAM_START:
            self.on_stream_start()

    def on_pad_event(self, pad, pad_probe_info):
        """Pad probe callback; reacts to SEGMENT events and lets all pass."""
        event = pad_probe_info.get_event()
        if event.type == Gst.EventType.SEGMENT:
            self.on_segment(event.parse_segment())
        return Gst.PadProbeReturn.OK

    def on_playbin_state_changed(self, old_state, new_state, pending_state):
        """Update ``Audio.state`` and emit ``state_changed`` when settled."""
        gst_logger.debug(
            "Got STATE_CHANGED bus message: old=%s new=%s pending=%s",
            old_state.value_name,
            new_state.value_name,
            pending_state.value_name,
        )

        if new_state == Gst.State.READY and pending_state == Gst.State.NULL:
            # XXX: We're not called on the last state change when going down to
            # NULL, so we rewrite the second to last call to get the expected
            # behavior.
            new_state = Gst.State.NULL
            pending_state = Gst.State.VOID_PENDING

        if pending_state != Gst.State.VOID_PENDING:
            return  # Ignore intermediate state changes

        if new_state == Gst.State.READY:
            return  # Ignore READY state as it's GStreamer specific

        new_state = _GST_STATE_MAPPING[new_state]
        old_state, self._audio.state = self._audio.state, new_state

        target_state = _GST_STATE_MAPPING.get(self._audio._target_state)
        if target_state is None:
            # XXX: Workaround for #1430, to be fixed properly by #1222.
            logger.warning("Race condition happened. See #1222 and #1430.")
            return
        if target_state == new_state:
            # Target reached; listeners get target_state=None in that case.
            target_state = None

        logger.debug(
            "Audio event: state_changed(old_state=%s, new_state=%s, "
            "target_state=%s)",
            old_state,
            new_state,
            target_state,
        )
        AudioListener.send(
            "state_changed",
            old_state=old_state,
            new_state=new_state,
            target_state=target_state,
        )
        if new_state == PlaybackState.STOPPED:
            logger.debug("Audio event: stream_changed(uri=None)")
            AudioListener.send("stream_changed", uri=None)

        if "GST_DEBUG_DUMP_DOT_DIR" in os.environ:
            Gst.debug_bin_to_dot_file(
                self._audio._playbin, Gst.DebugGraphDetails.ALL, "mopidy"
            )

    def on_buffering(self, percent, structure=None):
        """Pause while the buffer is nearly empty; resume once it is full."""
        if self._audio._target_state < Gst.State.PAUSED:
            gst_logger.debug("Skip buffering during track change.")
            return

        if structure is not None and structure.has_field("buffering-mode"):
            buffering_mode = structure.get_enum(
                "buffering-mode", Gst.BufferingMode
            )
            if buffering_mode == Gst.BufferingMode.LIVE:
                return  # Live sources stall in paused.

        # Routine progress is logged at TRACE; the two transitions below are
        # promoted to DEBUG.
        level = logging.getLevelName("TRACE")
        if percent < 10 and not self._audio._buffering:
            self._audio._playbin.set_state(Gst.State.PAUSED)
            self._audio._buffering = True
            level = logging.DEBUG
        if percent == 100:
            self._audio._buffering = False
            if self._audio._target_state == Gst.State.PLAYING:
                self._audio._playbin.set_state(Gst.State.PLAYING)
            level = logging.DEBUG

        gst_logger.log(
            level, "Got BUFFERING bus message: percent=%d%%", percent
        )

    def on_end_of_stream(self):
        """Clear the current tags and emit ``reached_end_of_stream``."""
        gst_logger.debug("Got EOS (end of stream) bus message.")
        logger.debug("Audio event: reached_end_of_stream()")
        self._audio._tags = {}
        AudioListener.send("reached_end_of_stream")

    def on_error(self, error, debug):
        """Log a GStreamer error and stop playback."""
        gst_logger.error("GStreamer error: %s", error.message)
        gst_logger.debug(
            "Got ERROR bus message: error=%r debug=%r", error, debug
        )

        # TODO: is this needed?
        self._audio.stop_playback()

    def on_warning(self, error, debug):
        """Log a GStreamer warning."""
        gst_logger.warning("GStreamer warning: %s", error.message)
        gst_logger.debug(
            "Got WARNING bus message: error=%r debug=%r", error, debug
        )

    def on_async_done(self):
        """Log that an ASYNC_DONE message arrived."""
        gst_logger.debug("Got ASYNC_DONE bus message.")

    def on_tag(self, taglist):
        """Merge new tags and emit ``tags_changed`` for keys that changed."""
        tags = tags_lib.convert_taglist(taglist)
        gst_logger.debug("Got TAG bus message: tags=%r", dict(tags))

        # Postpone emitting tags until stream start.
        if self._audio._pending_tags is not None:
            self._audio._pending_tags.update(tags)
            return

        # TODO: Add proper tests for only emitting changed tags.
        # Sentinel that never equals a real tag value, so a key that is not
        # stored yet always counts as changed.
        unique = object()
        changed = []
        for key, value in tags.items():
            # Update any tags that changed, and store changed keys.
            if self._audio._tags.get(key, unique) != value:
                self._audio._tags[key] = value
                changed.append(key)

        if changed:
            logger.debug("Audio event: tags_changed(tags=%r)", changed)
            AudioListener.send("tags_changed", tags=changed)

    def on_missing_plugin(self, msg):
        """Log which plugin is missing and, if supported, how to install it."""
        desc = GstPbutils.missing_plugin_message_get_description(msg)
        debug = GstPbutils.missing_plugin_message_get_installer_detail(msg)
        gst_logger.debug("Got missing-plugin bus message: description=%r", desc)
        logger.warning("Could not find a %s to handle media.", desc)
        if GstPbutils.install_plugins_supported():
            logger.info(
                "You might be able to fix this by running: "
                'gst-installer "%s"',
                debug,
            )
        # TODO: store the missing plugins installer info in a file so we can
        # can provide a 'mopidy install-missing-plugins' if the system has the
        # required helper installed?

    def on_stream_start(self):
        """Emit ``stream_changed`` and flush postponed tags and metadata."""
        gst_logger.debug("Got STREAM_START bus message")
        uri = self._audio._pending_uri
        logger.debug("Audio event: stream_changed(uri=%r)", uri)
        AudioListener.send("stream_changed", uri=uri)

        # Emit any postponed tags that we got after about-to-finish.
        tags, self._audio._pending_tags = self._audio._pending_tags, None
        self._audio._tags = tags or {}

        if tags:
            logger.debug("Audio event: tags_changed(tags=%r)", tags.keys())
            AudioListener.send("tags_changed", tags=tags.keys())

        if self._audio._pending_metadata:
            self._audio._playbin.send_event(self._audio._pending_metadata)
            self._audio._pending_metadata = None

    def on_segment(self, segment):
        """Emit ``position_changed`` with the segment position in ms."""
        gst_logger.debug(
            "Got SEGMENT pad event: "
            "rate=%(rate)s format=%(format)s start=%(start)s stop=%(stop)s "
            "position=%(position)s",
            {
                "rate": segment.rate,
                "format": Gst.Format.get_name(segment.format),
                "start": segment.start,
                "stop": segment.stop,
                "position": segment.position,
            },
        )
        position_ms = segment.position // Gst.MSECOND
        logger.debug("Audio event: position_changed(position=%r)", position_ms)
        AudioListener.send("position_changed", position=position_ms)
415

416

417
# TODO: create a player class which replaces the actors internals
418 3
class Audio(pykka.ThreadingActor):
    """
    Audio output through `GStreamer <https://gstreamer.freedesktop.org/>`_.
    """

    #: The GStreamer state mapped to :class:`mopidy.audio.PlaybackState`
    state = PlaybackState.STOPPED

    #: The software mixing interface :class:`mopidy.audio.actor.SoftwareMixer`
    mixer = None

    def __init__(self, config, mixer):
        super().__init__()

        self._config = config
        self._target_state = Gst.State.NULL
        self._buffering = False
        self._live_stream = False
        self._tags = {}
        self._pending_uri = None
        self._pending_tags = None
        self._pending_metadata = None

        self._playbin = None
        self._outputs = None
        self._queue = None
        self._about_to_finish_callback = None

        self._handler = _Handler(self)
        self._appsrc = _Appsrc()
        self._signals = utils.Signals()

        if mixer and self._config["audio"]["mixer"] == "software":
            self.mixer = pykka.traversable(SoftwareMixer(mixer))

    def on_start(self):
        # Remember the actor thread so _on_about_to_finish can detect being
        # invoked from it.
        self._thread = threading.current_thread()
        try:
            self._setup_preferences()
            self._setup_playbin()
            self._setup_outputs()
            self._setup_audio_sink()
        except GLib.GError as ex:
            logger.exception(ex)
            process.exit_process()

    def on_stop(self):
        self._teardown_mixer()
        self._teardown_playbin()

    def _setup_preferences(self):
        # TODO: move out of audio actor?
        # Fix for https://github.com/mopidy/mopidy/issues/604
        registry = Gst.Registry.get()
        jacksink = registry.find_feature("jackaudiosink", Gst.ElementFactory)
        if jacksink:
            jacksink.set_rank(Gst.Rank.SECONDARY)

    def _setup_playbin(self):
        playbin = Gst.ElementFactory.make("playbin")
        playbin.set_property("flags", _GST_PLAY_FLAGS_AUDIO)

        # TODO: turn into config values...
        playbin.set_property("buffer-size", 5 << 20)  # 5MB
        playbin.set_property("buffer-duration", 5 * Gst.SECOND)

        self._signals.connect(playbin, "source-setup", self._on_source_setup)
        self._signals.connect(
            playbin, "about-to-finish", self._on_about_to_finish
        )

        self._playbin = playbin
        self._handler.setup_message_handling(playbin)

    def _teardown_playbin(self):
        self._handler.teardown_message_handling()
        self._handler.teardown_event_handling()
        self._signals.disconnect(self._playbin, "about-to-finish")
        self._signals.disconnect(self._playbin, "source-setup")
        self._playbin.set_state(Gst.State.NULL)

    def _setup_outputs(self):
        # We don't want to use outputs for regular testing, so just install
        # an unsynced fakesink when someone asks for a 'testoutput'.
        if self._config["audio"]["output"] == "testoutput":
            self._outputs = Gst.ElementFactory.make("fakesink")
        else:
            self._outputs = _Outputs()
            try:
                self._outputs.add_output(self._config["audio"]["output"])
            except exceptions.AudioException:
                process.exit_process()  # TODO: move this up the chain

        self._handler.setup_event_handling(self._outputs.get_static_pad("sink"))

    def _setup_audio_sink(self):
        audio_sink = Gst.ElementFactory.make("bin", "audio-sink")
        queue = Gst.ElementFactory.make("queue")
        volume = Gst.ElementFactory.make("volume")

        # Queue element to buy us time between the about-to-finish event and
        # the actual switch, i.e. about to switch can block for longer thanks
        # to this queue.

        # TODO: See if settings should be set to minimize latency. Previous
        # setting breaks appsrc, and settings before that broke on a few
        # systems. So leave the default to play it safe.
        buffer_time = self._config["audio"]["buffer_time"]
        if buffer_time is not None and buffer_time > 0:
            queue.set_property("max-size-time", buffer_time * Gst.MSECOND)

        audio_sink.add(queue)
        audio_sink.add(self._outputs)
        audio_sink.add(volume)

        # Resulting chain inside the bin: queue -> volume -> outputs.
        queue.link(volume)
        volume.link(self._outputs)

        if self.mixer:
            self.mixer.setup(volume, self.actor_ref.proxy().mixer)

        ghost_pad = Gst.GhostPad.new("sink", queue.get_static_pad("sink"))
        audio_sink.add_pad(ghost_pad)

        self._playbin.set_property("audio-sink", audio_sink)
        self._queue = queue

    def _teardown_mixer(self):
        if self.mixer:
            self.mixer.teardown()

    def _on_about_to_finish(self, element):
        # Refuse to run the callback when invoked on the actor's own thread.
        if self._thread == threading.current_thread():
            logger.error(
                "about-to-finish in actor, aborting to avoid deadlock."
            )
            return

        gst_logger.debug("Got about-to-finish event.")
        if self._about_to_finish_callback:
            logger.debug("Running about-to-finish callback.")
            self._about_to_finish_callback()

    def _on_source_setup(self, element, source):
        gst_logger.debug(
            "Got source-setup signal: element=%s", source.__class__.__name__
        )

        if source.get_factory().get_name() == "appsrc":
            self._appsrc.configure(source)
        else:
            self._appsrc.reset()

        if self._live_stream and hasattr(source.props, "is_live"):
            gst_logger.debug("Enabling live stream mode")
            source.set_live(True)

        utils.setup_proxy(source, self._config["proxy"])

    def set_uri(self, uri, live_stream=False, download=False):
        """
        Set URI of audio to be played.

        You *MUST* call :meth:`prepare_change` before calling this method.

        :param uri: the URI to play
        :type uri: string
        :param live_stream: disables buffering, reducing latency for stream,
            and discarding data when paused
        :type live_stream: bool
        :param download: enables "download" buffering mode
        :type download: bool
        """

        # XXX: Hack to workaround issue on Mac OS X where volume level
        # does not persist between track changes. mopidy/mopidy#886
        if self.mixer is not None:
            current_volume = self.mixer.get_volume()
        else:
            current_volume = None

        flags = _GST_PLAY_FLAGS_AUDIO
        if download:
            flags |= _GST_PLAY_FLAGS_DOWNLOAD

        logger.debug("Flags: %s", flags)
        if live_stream and download:
            logger.warning(
                "Ambiguous buffering flags: "
                "'is_live' and 'should_download' should not both be set."
            )

        self._pending_uri = uri
        self._pending_tags = {}
        self._live_stream = live_stream
        self._playbin.set_property("flags", flags)
        self._playbin.set_property("uri", uri)

        if self.mixer is not None and current_volume is not None:
            self.mixer.set_volume(current_volume)

    def set_appsrc(
        self, caps, need_data=None, enough_data=None, seek_data=None
    ):
        """
        Switch to using appsrc for getting audio to be played.

        You *MUST* call :meth:`prepare_change` before calling this method.

        :param caps: GStreamer caps string describing the audio format to
            expect
        :type caps: string
        :param need_data: callback for when appsrc needs data
        :type need_data: callable which takes data length hint in ms
        :param enough_data: callback for when appsrc has enough data
        :type enough_data: callable
        :param seek_data: callback for when data from a new position is needed
            to continue playback
        :type seek_data: callable which takes time position in ms
        """
        self._appsrc.prepare(
            Gst.Caps.from_string(caps), need_data, enough_data, seek_data
        )
        uri = "appsrc://"
        self._pending_uri = uri
        self._playbin.set_property("uri", uri)

    def emit_data(self, buffer_):
        """
        Call this to deliver raw audio data to be played.

        If the buffer is :class:`None`, the end-of-stream token is put on the
        playbin. We will get a GStreamer message when the stream playback
        reaches the token, and can then do any end-of-stream related tasks.

        Note that the URI must be set to ``appsrc://`` for this to work.

        Returns :class:`True` if data was delivered.

        :param buffer_: buffer to pass to appsrc
        :type buffer_: :class:`Gst.Buffer` or :class:`None`
        :rtype: boolean
        """
        return self._appsrc.push(buffer_)

    def set_about_to_finish_callback(self, callback):
        """
        Configure audio to use an about-to-finish callback.

        This should be used to achieve gapless playback. For this to work the
        callback *MUST* call :meth:`set_uri` with the new URI to play and
        block until this call has been made. :meth:`prepare_change` is not
        needed before :meth:`set_uri` in this one special case.

        :param callable callback: Callback to run when we need the next URI.
        """
        self._about_to_finish_callback = callback

    def get_position(self):
        """
        Get position in milliseconds.

        Returns 0 if the position query fails.

        :rtype: int
        """
        success, position = self._playbin.query_position(Gst.Format.TIME)

        if not success:
            # TODO: take state into account for this and possibly also return
            # None as the unknown value instead of zero?
            logger.debug("Position query failed")
            return 0

        return utils.clocktime_to_millisecond(position)

    def set_position(self, position):
        """
        Set position in milliseconds.

        :param position: the position in milliseconds
        :type position: int
        :rtype: :class:`True` if successful, else :class:`False`
        """
        # TODO: double check seek flags in use.
        gst_position = utils.millisecond_to_clocktime(position)
        gst_logger.debug("Sending flushing seek: position=%r", gst_position)
        # Send seek event to the queue not the playbin. The default behavior
        # for bins is to forward this event to all sinks. Which results in
        # duplicate seek events making it to appsrc. Since elements are not
        # allowed to act on the seek event, only modify it, this should be safe
        # to do.
        result = self._queue.seek_simple(
            Gst.Format.TIME, Gst.SeekFlags.FLUSH, gst_position
        )
        return result

    def start_playback(self):
        """
        Notify GStreamer that it should start playback.

        :rtype: :class:`True` if successful, else :class:`False`
        """
        return self._set_state(Gst.State.PLAYING)

    def pause_playback(self):
        """
        Notify GStreamer that it should pause playback.

        :rtype: :class:`True` if successful, else :class:`False`
        """
        return self._set_state(Gst.State.PAUSED)

    def prepare_change(self):
        """
        Notify GStreamer that we are about to change state of playback.

        This function *MUST* be called before changing URIs or doing
        changes like updating data that is being pushed. The reason for this
        is that GStreamer will reset all its state when it changes to
        :attr:`Gst.State.READY`.
        """
        return self._set_state(Gst.State.READY)

    def stop_playback(self):
        """
        Notify GStreamer that it should stop playback.

        :rtype: :class:`True` if successful, else :class:`False`
        """
        return self._set_state(Gst.State.NULL)

    def wait_for_state_change(self):
        """Block until any pending state changes are complete.

        Should only be used by tests.
        """
        self._playbin.get_state(timeout=Gst.CLOCK_TIME_NONE)

    def enable_sync_handler(self):
        """Enable manual processing of messages from bus.

        Should only be used by tests.
        """

        def sync_handler(bus, message):
            self._handler.on_message(bus, message)
            return Gst.BusSyncReply.DROP

        bus = self._playbin.get_bus()
        bus.set_sync_handler(sync_handler)

    def _set_state(self, state):
        """
        Internal method for setting the raw GStreamer state.

        .. digraph:: gst_state_transitions

            graph [rankdir="LR"];
            node [fontsize=10];

            "NULL" -> "READY"
            "PAUSED" -> "PLAYING"
            "PAUSED" -> "READY"
            "PLAYING" -> "PAUSED"
            "READY" -> "NULL"
            "READY" -> "PAUSED"

        :param state: State to set playbin to. One of: `Gst.State.NULL`,
            `Gst.State.READY`, `Gst.State.PAUSED` and `Gst.State.PLAYING`.
        :type state: :class:`Gst.State`
        :rtype: :class:`True` if successful, else :class:`False`
        """
        if state < Gst.State.PAUSED:
            self._buffering = False

        self._target_state = state
        result = self._playbin.set_state(state)
        gst_logger.debug(
            "Changing state to %s: result=%s",
            state.value_name,
            result.value_name,
        )

        if result == Gst.StateChangeReturn.FAILURE:
            logger.warning(
                "Setting GStreamer state to %s failed", state.value_name
            )
            return False
        # TODO: at this point we could already emit stopped event instead
        # of faking it in the message handling when result=OK
        return True

    # TODO: bake this into setup appsrc perhaps?
    def set_metadata(self, track):
        """
        Set track metadata for currently playing song.

        Only needs to be called by sources such as ``appsrc`` which do not
        already inject tags in playbin, e.g. when using :meth:`emit_data` to
        deliver raw audio data to GStreamer.

        :param track: the current track
        :type track: :class:`mopidy.models.Track`
        """
        taglist = Gst.TagList.new_empty()
        artists = [a for a in (track.artists or []) if a.name]

        def set_value(tag, value):
            gobject_value = GObject.Value()
            gobject_value.init(GObject.TYPE_STRING)
            gobject_value.set_string(value)
            taglist.add_value(Gst.TagMergeMode.REPLACE, tag, gobject_value)

        # Default to blank data to trick shoutcast into clearing any previous
        # values it might have.
        # TODO: Verify if this works at all, likely it doesn't.
        set_value(Gst.TAG_ARTIST, " ")
        set_value(Gst.TAG_TITLE, " ")
        set_value(Gst.TAG_ALBUM, " ")

        if artists:
            set_value(Gst.TAG_ARTIST, ", ".join(a.name for a in artists))

        if track.name:
            set_value(Gst.TAG_TITLE, track.name)

        if track.album and track.album.name:
            set_value(Gst.TAG_ALBUM, track.album.name)

        gst_logger.debug(
            "Sending TAG event for track %r: %r", track.uri, taglist.to_string()
        )
        event = Gst.Event.new_tag(taglist)
        if self._pending_uri:
            # Held back here; the STREAM_START handler sends it to playbin.
            self._pending_metadata = event
        else:
            self._playbin.send_event(event)

    def get_current_tags(self):
        """
        Get the currently playing media's tags.

        If no tags have been found, or nothing is playing this returns an empty
        dictionary. For each set of tags we collect a tags_changed event is
        emitted with the keys of the changed tags. After such calls users may
        call this function to get the updated values.

        :rtype: {key: [values]} dict for the current media.
        """
        # TODO: should this be a (deep) copy? most likely yes
        # TODO: should we return None when stopped?
        # TODO: support only fetching keys we care about?
        return self._tags

Read our documentation on viewing source code .

Loading