1 3
import collections
2 3
import datetime
3 3
import logging
4 3
import numbers
5

6 3
from mopidy.internal import log
7 3
from mopidy.internal.gi import GLib, Gst
8 3
from mopidy.models import Album, Artist, Track
9

10 3
logger = logging.getLogger(__name__)
11

12

13 3
def convert_taglist(taglist):
14
    """Convert a :class:`Gst.TagList` to plain Python types.
15

16
    Knows how to convert:
17

18
    - Dates
19
    - Buffers
20
    - Numbers
21
    - Strings
22
    - Booleans
23

24
    Unknown types will be ignored and trace logged. Tag keys are all strings
25
    defined as part GStreamer under GstTagList_.
26

27
    .. _GstTagList: https://developer.gnome.org/gstreamer/stable/\
28
gstreamer-GstTagList.html
29

30
    :param taglist: A GStreamer taglist to be converted.
31
    :type taglist: :class:`Gst.TagList`
32
    :rtype: dictionary of tag keys with a list of values.
33
    """
34 3
    result = collections.defaultdict(list)
35

36 3
    for n in range(taglist.n_tags()):
37 3
        tag = taglist.nth_tag_name(n)
38

39 3
        for i in range(taglist.get_tag_size(tag)):
40 3
            value = taglist.get_value_index(tag, i)
41

42 3
            if isinstance(value, GLib.Date):
43 3
                try:
44 3
                    date = datetime.date(
45
                        value.get_year(), value.get_month(), value.get_day()
46
                    )
47 3
                    result[tag].append(date.isoformat())
48 3
                except ValueError:
49 3
                    logger.debug(
50
                        "Ignoring dodgy date value: %d-%d-%d",
51
                        value.get_year(),
52
                        value.get_month(),
53
                        value.get_day(),
54
                    )
55 3
            elif isinstance(value, Gst.DateTime):
56 3
                result[tag].append(value.to_iso8601_string())
57 3
            elif isinstance(value, bytes):
58 0
                result[tag].append(value.decode(errors="replace"))
59 3
            elif isinstance(value, (str, bool, numbers.Number)):
60 3
                result[tag].append(value)
61 0
            elif isinstance(value, Gst.Sample):
62 0
                data = _extract_sample_data(value)
63 0
                if data:
64 0
                    result[tag].append(data)
65
            else:
66 0
                logger.log(
67
                    log.TRACE_LOG_LEVEL,
68
                    "Ignoring unknown tag data: %r = %r",
69
                    tag,
70
                    value,
71
                )
72

73
    # TODO: dict(result) to not leak the defaultdict, or just use setdefault?
74 3
    return result
75

76

77 3
def _extract_sample_data(sample):
78 0
    buf = sample.get_buffer()
79 0
    if not buf:
80 0
        return None
81 0
    return _extract_buffer_data(buf)
82

83

84
# Fix for https://github.com/mopidy/mopidy/issues/1827
85
# Using GstBuffer.extract_dup() is a memory leak in versions of PyGObject prior
86
# to v3.36.0. As a workaround we use the GstMemory APIs instead.
87 3
def _extract_buffer_data(buf):
88 0
    mem = buf.get_all_memory()
89 0
    if not mem:
90 0
        return None
91 0
    success, info = mem.map(Gst.MapFlags.READ)
92 0
    if not success:
93 0
        return None
94 0
    data = info.data
95 0
    mem.unmap(info)
96 0
    return data
97

98

99
# TODO: split based on "stream" and "track" based conversion? i.e. handle data
100
# from radios in it's own helper instead?
101 3
def convert_tags_to_track(tags):
102
    """Convert our normalized tags to a track.
103

104
    :param  tags: dictionary of tag keys with a list of values
105
    :type tags: :class:`dict`
106
    :rtype: :class:`mopidy.models.Track`
107
    """
108 3
    album_kwargs = {}
109 3
    track_kwargs = {}
110

111 3
    track_kwargs["composers"] = _artists(tags, Gst.TAG_COMPOSER)
112 3
    track_kwargs["performers"] = _artists(tags, Gst.TAG_PERFORMER)
113 3
    track_kwargs["artists"] = _artists(
114
        tags, Gst.TAG_ARTIST, "musicbrainz-artistid", "musicbrainz-sortname"
115
    )
116 3
    album_kwargs["artists"] = _artists(
117
        tags, Gst.TAG_ALBUM_ARTIST, "musicbrainz-albumartistid"
118
    )
119

120 3
    track_kwargs["genre"] = "; ".join(tags.get(Gst.TAG_GENRE, []))
121 3
    track_kwargs["name"] = "; ".join(tags.get(Gst.TAG_TITLE, []))
122 3
    if not track_kwargs["name"]:
123 3
        track_kwargs["name"] = "; ".join(tags.get(Gst.TAG_ORGANIZATION, []))
124

125 3
    track_kwargs["comment"] = "; ".join(tags.get("comment", []))
126 3
    if not track_kwargs["comment"]:
127 3
        track_kwargs["comment"] = "; ".join(tags.get(Gst.TAG_LOCATION, []))
128 3
    if not track_kwargs["comment"]:
129 3
        track_kwargs["comment"] = "; ".join(tags.get(Gst.TAG_COPYRIGHT, []))
130

131 3
    track_kwargs["track_no"] = tags.get(Gst.TAG_TRACK_NUMBER, [None])[0]
132 3
    track_kwargs["disc_no"] = tags.get(Gst.TAG_ALBUM_VOLUME_NUMBER, [None])[0]
133 3
    track_kwargs["bitrate"] = tags.get(Gst.TAG_BITRATE, [None])[0]
134 3
    track_kwargs["musicbrainz_id"] = tags.get("musicbrainz-trackid", [None])[0]
135

136 3
    album_kwargs["name"] = tags.get(Gst.TAG_ALBUM, [None])[0]
137 3
    album_kwargs["num_tracks"] = tags.get(Gst.TAG_TRACK_COUNT, [None])[0]
138 3
    album_kwargs["num_discs"] = tags.get(Gst.TAG_ALBUM_VOLUME_COUNT, [None])[0]
139 3
    album_kwargs["musicbrainz_id"] = tags.get("musicbrainz-albumid", [None])[0]
140

141 3
    album_kwargs["date"] = tags.get(Gst.TAG_DATE, [None])[0]
142 3
    if not album_kwargs["date"]:
143 3
        datetime = tags.get(Gst.TAG_DATE_TIME, [None])[0]
144 3
        if datetime is not None:
145 3
            album_kwargs["date"] = datetime.split("T")[0]
146 3
    track_kwargs["date"] = album_kwargs["date"]
147

148
    # Clear out any empty values we found
149 3
    track_kwargs = {k: v for k, v in track_kwargs.items() if v}
150 3
    album_kwargs = {k: v for k, v in album_kwargs.items() if v}
151

152
    # Only bother with album if we have a name to show.
153 3
    if album_kwargs.get("name"):
154 3
        track_kwargs["album"] = Album(**album_kwargs)
155

156 3
    return Track(**track_kwargs)
157

158

159 3
def _artists(tags, artist_name, artist_id=None, artist_sortname=None):
160
    # Name missing, don't set artist
161 3
    if not tags.get(artist_name):
162 3
        return None
163

164
    # One artist name and either id or sortname, include all available fields
165 3
    if len(tags[artist_name]) == 1 and (
166
        artist_id in tags or artist_sortname in tags
167
    ):
168 3
        attrs = {"name": tags[artist_name][0]}
169 3
        if artist_id in tags:
170 3
            attrs["musicbrainz_id"] = tags[artist_id][0]
171 3
        if artist_sortname in tags:
172 3
            attrs["sortname"] = tags[artist_sortname][0]
173 3
        return [Artist(**attrs)]
174

175
    # Multiple artist, provide artists with name only to avoid ambiguity.
176 3
    return [Artist(name=name) for name in tags[artist_name]]

Read our documentation on viewing source code .

Loading