1 3
import collections
2 3
import logging
3 3
import time
4

5 3
from mopidy import exceptions
6 3
from mopidy.audio import tags as tags_lib
7 3
from mopidy.audio import utils
8 3
from mopidy.internal import log
9 3
from mopidy.internal.gi import Gst, GstPbutils
10

11
# GstElementFactoryListType flags (GST_ELEMENT_FACTORY_TYPE_*), combined with
# ``|`` and passed to ``Gst.ElementFactory.list_is_type()``.
_DECODER = 1 << 0  # GST_ELEMENT_FACTORY_TYPE_DECODER
_AUDIO = 1 << 50  # GST_ELEMENT_FACTORY_TYPE_MEDIA_AUDIO
_DEMUXER = 1 << 5  # GST_ELEMENT_FACTORY_TYPE_DEMUXER
_DEPAYLOADER = 1 << 8  # GST_ELEMENT_FACTORY_TYPE_DEPAYLOADER
_PARSER = 1 << 6  # GST_ELEMENT_FACTORY_TYPE_PARSER

# GstAutoplugSelectResult values returned from decodebin's autoplug-select.
_SELECT_TRY = 0  # GST_AUTOPLUG_SELECT_TRY
_SELECT_EXPOSE = 1  # GST_AUTOPLUG_SELECT_EXPOSE

# Value returned by Scanner.scan(); the fields are described in its docstring.
_Result = collections.namedtuple(
    "Result", "uri tags duration seekable mime playable"
)

logger = logging.getLogger(__name__)
27

28

29 3
def _trace(*args, **kwargs):
    """Log through the module logger at mopidy's custom TRACE level.

    Positional and keyword arguments are forwarded unchanged to
    ``logger.log`` after the level.
    """
    level = log.TRACE_LOG_LEVEL
    logger.log(level, *args, **kwargs)
31

32

33
# TODO: replace with a scan(uri, timeout=1000, proxy_config=None)?
34 3
class Scanner:

    """
    Helper to get tags and other relevant info from URIs.

    :param timeout: timeout for scanning a URI in ms
    :type timeout: int
    :param proxy_config: dictionary containing proxy config strings.
    :type proxy_config: dict
    """

    def __init__(self, timeout=1000, proxy_config=None):
        self._timeout_ms = int(timeout)
        self._proxy_config = proxy_config or {}

    def scan(self, uri, timeout=None):
        """
        Scan the given uri collecting relevant metadata.

        :param uri: URI of the resource to scan.
        :type uri: string
        :param timeout: timeout for scanning a URI in ms. Defaults to the
            ``timeout`` value used when creating the scanner (a falsy value
            such as ``0`` also falls back to that default).
        :type timeout: int
        :return: A named tuple containing
            ``(uri, tags, duration, seekable, mime, playable)``.
            ``tags`` is a dictionary of lists for all the tags we found.
            ``duration`` is the length of the URI in milliseconds, or
            :class:`None` if the URI has no duration. ``seekable`` is a
            boolean indicating if a seek would succeed. ``playable`` is
            true if a ``have-audio`` notification (audio decoder selected
            or raw audio pad exposed) was seen during the scan.
        :raises ScannerError: if the URI cannot be opened, the pipeline
            reports an error, or the scan times out.
        """
        timeout = int(timeout or self._timeout_ms)
        tags, duration, seekable, mime, have_audio = None, None, None, None, None
        pipeline, signals = _setup_pipeline(uri, self._proxy_config)

        try:
            _start_pipeline(pipeline)
            tags, mime, have_audio, duration = _process(pipeline, timeout)
            seekable = _query_seekable(pipeline)
        finally:
            # Tear down unconditionally so failed or timed-out scans do not
            # leave handlers connected or the pipeline running.
            signals.clear()
            pipeline.set_state(Gst.State.NULL)
            del pipeline

        return _Result(uri, tags, duration, seekable, mime, have_audio)
78

79

80
# Turns out it's _much_ faster to just create a new pipeline for every as
81
# decodebins and other elements don't seem to take well to being reused.
82 3
def _setup_pipeline(uri, proxy_config=None):
    """
    Build a fresh scanning pipeline around a source element for ``uri``.

    Proxy settings, when given, are applied to the source element. If the
    source already has source pads, the typefind/decodebin chain is attached
    right away. Otherwise, if it advertises "sometimes" source pads,
    attaching is deferred to the source's ``pad-added`` signal.

    :return: ``(pipeline, signals)``; ``signals`` holds the connected
        handlers and is cleared by the caller once scanning is done.
    :raises ScannerError: if no source element can be made for ``uri``, or
        the source has neither existing nor dynamic source pads.
    """
    source = Gst.Element.make_from_uri(Gst.URIType.SRC, uri)
    if not source:
        raise exceptions.ScannerError(f"GStreamer can not open: {uri}")

    if proxy_config:
        utils.setup_proxy(source, proxy_config)

    signals = utils.Signals()
    pipeline = Gst.ElementFactory.make("pipeline")
    pipeline.add(source)

    if _has_src_pads(source):
        static_pad = source.get_static_pad("src")
        _setup_decodebin(source, static_pad, pipeline, signals)
        return pipeline, signals

    if _has_dynamic_src_pad(source):
        signals.connect(source, "pad-added", _setup_decodebin, pipeline, signals)
        return pipeline, signals

    raise exceptions.ScannerError("No pads found in source element.")
102

103

104 3
def _has_src_pads(element):
    """Return whether ``element`` currently has at least one source pad."""
    found = []
    pad_iterator = element.iterate_src_pads()
    pad_iterator.foreach(found.append)
    return len(found) > 0
108

109

110 3
def _has_dynamic_src_pad(element):
    """Return whether ``element`` has a SRC pad template with SOMETIMES presence.

    Such templates mean source pads appear later, announced via ``pad-added``.
    """
    return any(
        template.direction == Gst.PadDirection.SRC
        and template.presence == Gst.PadPresence.SOMETIMES
        for template in element.get_pad_template_list()
    )
116

117

118 3
def _setup_decodebin(element, pad, pipeline, signals):
    """
    Link ``pad`` into a new ``typefind ! decodebin`` chain inside ``pipeline``.

    This is called directly for sources with a static ``src`` pad. It is
    also connected as the ``pad-added`` handler for sources with dynamic
    pads, which is why it takes signal-style ``(element, pad, ...)``
    arguments. ``element`` itself is not used.

    Handlers connected through ``signals``:

    - typefind ``have-type`` -> :func:`_have_type`
    - decodebin ``pad-added`` -> :func:`_pad_added`
    - decodebin ``autoplug-select`` -> :func:`_autoplug_select`
    """
    typefind = Gst.ElementFactory.make("typefind")
    decodebin = Gst.ElementFactory.make("decodebin")

    # Distinct loop name so the ``element`` parameter is not shadowed.
    for child in (typefind, decodebin):
        pipeline.add(child)
        # Bring the new element to the pipeline's current state; when run
        # from pad-added the pipeline is already changing state.
        child.sync_state_with_parent()

    pad.link(typefind.get_static_pad("sink"))
    typefind.link(decodebin)

    signals.connect(typefind, "have-type", _have_type, decodebin)
    signals.connect(decodebin, "pad-added", _pad_added, pipeline)
    signals.connect(decodebin, "autoplug-select", _autoplug_select)
132

133

134 3
def _have_type(element, probability, caps, decodebin):
    """
    Handle typefind's ``have-type`` signal.

    Pins decodebin's ``sink-caps`` to the detected ``caps``. Then posts a
    ``have-type`` application message on the element's bus, carrying the
    first caps structure under the ``caps`` key. ``probability`` is unused.
    """
    decodebin.set_property("sink-caps", caps)

    payload = Gst.Structure.new_empty("have-type")
    payload.set_value("caps", caps.get_structure(0))

    bus = element.get_bus()
    message = Gst.Message.new_application(element, payload)
    bus.post(message)
139

140

141 3
def _pad_added(element, pad, pipeline):
    """
    Terminate a newly exposed decodebin pad in a ``fakesink``.

    The sink has ``sync`` disabled. If the pad's caps are a subset of
    ``audio/x-raw``, a ``have-audio`` application message is also posted
    on the element's bus.
    """
    fakesink = Gst.ElementFactory.make("fakesink")
    fakesink.set_property("sync", False)

    pipeline.add(fakesink)
    fakesink.sync_state_with_parent()
    pad.link(fakesink.get_static_pad("sink"))

    is_raw_audio = pad.query_caps().is_subset(Gst.Caps.from_string("audio/x-raw"))
    if not is_raw_audio:
        return

    # Probably won't happen due to autoplug-select fix, but lets play it
    # safe until we've tested more.
    notice = Gst.Structure.new_empty("have-audio")
    bus = element.get_bus()
    bus.post(Gst.Message.new_application(element, notice))
154

155

156 3
def _autoplug_select(element, pad, caps, factory):
    """
    Decide how decodebin should treat ``factory`` (``autoplug-select``).

    If the factory matches ``_DECODER | _AUDIO``, a ``have-audio``
    application message is posted on the element's bus. Factories matching
    ``_DEMUXER | _DEPAYLOADER | _PARSER`` are tried (plugged). Any other
    factory causes the pad to be exposed instead.
    """
    if factory.list_is_type(_DECODER | _AUDIO):
        notice = Gst.Structure.new_empty("have-audio")
        bus = element.get_bus()
        bus.post(Gst.Message.new_application(element, notice))

    should_plug = factory.list_is_type(_DEMUXER | _DEPAYLOADER | _PARSER)
    return _SELECT_TRY if should_plug else _SELECT_EXPOSE
163

164

165 3
def _start_pipeline(pipeline):
    """
    Set ``pipeline`` to PAUSED, continuing on to PLAYING on ``NO_PREROLL``.

    GStreamer reports ``NO_PREROLL`` for pipelines that cannot preroll in
    PAUSED (live sources), so those are started playing instead.
    """
    outcome = pipeline.set_state(Gst.State.PAUSED)
    if outcome != Gst.StateChangeReturn.NO_PREROLL:
        return
    pipeline.set_state(Gst.State.PLAYING)
169

170

171 3
def _query_duration(pipeline):
    """
    Query the pipeline's duration in ``Gst.Format.TIME``.

    :return: ``(success, duration)``. ``duration`` is in whole milliseconds,
        or :class:`None` if the query failed or reported a negative value
        (stream without duration).
    """
    success, duration_ns = pipeline.query_duration(Gst.Format.TIME)
    if success and duration_ns >= 0:
        return success, int(duration_ns // Gst.MSECOND)
    return success, None
180

181

182 3
def _query_seekable(pipeline):
    """Return the ``seekable`` flag of a TIME-format seeking query on ``pipeline``."""
    seeking_query = Gst.Query.new_seeking(Gst.Format.TIME)
    pipeline.query(seeking_query)
    parsed = seeking_query.parse_seeking()
    # parse_seeking() yields (format, seekable, start, end); keep seekable.
    seekable = parsed[1]
    return seekable
186

187

188 3
def _process(pipeline, timeout_ms):
    """
    Pump the pipeline's bus until enough scan metadata has been collected.

    Handles element (missing-plugin), application (``have-type`` /
    ``have-audio`` posted by this module's signal handlers), error, EOS,
    async-done, duration-changed and tag messages.

    :param pipeline: pipeline to read from; ``Scanner.scan`` starts it first.
    :param timeout_ms: overall time budget for the scan, in milliseconds.
    :return: ``(tags, mime, have_audio, duration)``; ``duration`` is in
        milliseconds or :class:`None`.
    :raises ScannerError: on a pipeline error, unless a missing-plugin
        message was seen before any type was detected (then the missing
        caps name is returned as ``mime``), or when the budget runs out.
    """
    bus = pipeline.get_bus()
    tags = {}
    mime = None
    have_audio = False
    missing_message = None
    duration = None

    types = (
        Gst.MessageType.ELEMENT
        | Gst.MessageType.APPLICATION
        | Gst.MessageType.ERROR
        | Gst.MessageType.EOS
        | Gst.MessageType.ASYNC_DONE
        | Gst.MessageType.DURATION_CHANGED
        | Gst.MessageType.TAG
    )

    # Measure elapsed time on a monotonic clock: wall-clock time can jump
    # (NTP sync, manual changes), which would stretch or cut short the
    # timeout.
    timeout = timeout_ms
    start = time.monotonic()
    while timeout > 0:
        msg = bus.timed_pop_filtered(timeout * Gst.MSECOND, types)
        if msg is None:
            # Nothing arrived within the remaining budget.
            break

        if logger.isEnabledFor(log.TRACE_LOG_LEVEL) and msg.get_structure():
            debug_text = msg.get_structure().to_string()
            if len(debug_text) > 77:
                debug_text = debug_text[:77] + "..."
            _trace("element %s: %s", msg.src.get_name(), debug_text)

        if msg.type == Gst.MessageType.ELEMENT:
            # Remember missing-plugin notices; they explain a later ERROR.
            if GstPbutils.is_missing_plugin_message(msg):
                missing_message = msg
        elif msg.type == Gst.MessageType.APPLICATION:
            if msg.get_structure().get_name() == "have-type":
                mime = msg.get_structure().get_value("caps").get_name()
                # Text and XML content is reported by MIME type only.
                if mime and (
                    mime.startswith("text/") or mime == "application/xml"
                ):
                    return tags, mime, have_audio, duration
            elif msg.get_structure().get_name() == "have-audio":
                have_audio = True
        elif msg.type == Gst.MessageType.ERROR:
            error, _debug = msg.parse_error()
            if missing_message and not mime:
                caps = missing_message.get_structure().get_value("detail")
                mime = caps.get_structure(0).get_name()
                return tags, mime, have_audio, duration
            raise exceptions.ScannerError(str(error))
        elif msg.type == Gst.MessageType.EOS:
            return tags, mime, have_audio, duration
        elif msg.type == Gst.MessageType.ASYNC_DONE:
            success, duration = _query_duration(pipeline)
            if tags and success:
                return tags, mime, have_audio, duration

            # Don't try workaround for non-seekable sources such as mmssrc:
            if not _query_seekable(pipeline):
                return tags, mime, have_audio, duration

            # Workaround for upstream bug which causes tags/duration to arrive
            # after pre-roll. We get around this by starting to play the track
            # and then waiting for a duration change.
            # https://bugzilla.gnome.org/show_bug.cgi?id=763553
            logger.debug("Using workaround for duration missing before play.")
            result = pipeline.set_state(Gst.State.PLAYING)
            if result == Gst.StateChangeReturn.FAILURE:
                return tags, mime, have_audio, duration

        elif msg.type == Gst.MessageType.DURATION_CHANGED and tags:
            # VBR formats sometimes seem to not have a duration by the time we
            # go back to paused. So just try to get it right away.
            success, duration = _query_duration(pipeline)
            pipeline.set_state(Gst.State.PAUSED)
            if success:
                return tags, mime, have_audio, duration
        elif msg.type == Gst.MessageType.TAG:
            taglist = msg.parse_tag()
            # Note that this will only keep the last tag.
            tags.update(tags_lib.convert_taglist(taglist))

        elapsed_ms = int((time.monotonic() - start) * 1000)
        timeout = timeout_ms - elapsed_ms

    raise exceptions.ScannerError(f"Timeout after {timeout_ms:d}ms")
273

274

275 3
if __name__ == "__main__":
    # Ad-hoc command line tool: scan every argument with TRACE logging on and
    # print the result fields and tags.
    import os
    import sys

    from mopidy.internal import path

    logging.basicConfig(
        format="%(asctime)-15s %(levelname)s %(message)s",
        level=log.TRACE_LOG_LEVEL,
    )

    scanner = Scanner(5000)
    for arg in sys.argv[1:]:
        # Arguments that are not valid URIs are treated as filesystem paths.
        if Gst.uri_is_valid(arg):
            uri = arg
        else:
            uri = path.path_to_uri(os.path.abspath(arg))

        try:
            result = scanner.scan(uri)
        except exceptions.ScannerError as error:
            print(f"{uri}: {error}")
            continue

        for field in ("uri", "mime", "duration", "playable", "seekable"):
            print(f"{field:<20}   {getattr(result, field)}")
        print("tags")
        for tag, value in result.tags.items():
            line = f"{tag:<20}   {value}"
            print(line if len(line) <= 77 else line[:77] + "...")

Read our documentation on viewing source code .

Loading