1 2
import base64
2 2
import logging
3 2
import os
4

5 2
from mopidy import backend, exceptions, models
6 2
from mopidy.audio import scan, tags
7 2
from mopidy.exceptions import ScannerError
8 2
from mopidy.internal import path
9 2
from mopidy.models import Image
10

11 2
logger = logging.getLogger(__name__)
12

13

14 2
class FileLibraryProvider(backend.LibraryProvider):
15
    """Library for browsing local files."""
16

17
    # TODO: get_images that can pull from metadata and/or .folder.png etc?
18
    # TODO: handle playlists?
19

20 2
    @property
21 1
    def root_directory(self):
22 0
        if not self._media_dirs:
23 0
            return None
24 0
        elif len(self._media_dirs) == 1:
25 0
            uri = path.path_to_uri(self._media_dirs[0]["path"])
26
        else:
27 0
            uri = "file:root"
28 0
        return models.Ref.directory(name="Files", uri=uri)
29

30 2
    def __init__(self, backend, config):
31 2
        super().__init__(backend)
32 2
        self._media_dirs = list(self._get_media_dirs(config))
33 2
        self._show_dotfiles = config["file"]["show_dotfiles"]
34 2
        self._excluded_file_extensions = tuple(
35
            file_ext.lower()
36
            for file_ext in config["file"]["excluded_file_extensions"]
37
        )
38 2
        self._follow_symlinks = config["file"]["follow_symlinks"]
39

40 2
        self._scanner = scan.Scanner(timeout=config["file"]["metadata_timeout"])
41

42 2
        self._folder_image_names = config["file"].get("folder_image_names", [])
43

44 2
    def browse(self, uri):
45 0
        logger.debug("Browsing files at: %s", uri)
46 0
        result = []
47 0
        local_path = path.uri_to_path(uri)
48

49 0
        if str(local_path) == "root":
50 0
            return list(self._get_media_dirs_refs())
51

52 0
        if not self._is_in_basedir(local_path):
53 0
            logger.warning(
54
                "Rejected attempt to browse path (%s) outside dirs defined "
55
                "in file/media_dirs config.",
56
                uri,
57
            )
58 0
            return []
59

60 0
        for dir_entry in local_path.iterdir():
61 0
            child_path = dir_entry.resolve()
62 0
            uri = path.path_to_uri(child_path)
63

64 0
            if not self._show_dotfiles and dir_entry.name.startswith("."):
65 0
                continue
66

67 0
            if (
68
                self._excluded_file_extensions
69
                and dir_entry.suffix in self._excluded_file_extensions
70
            ):
71 0
                continue
72

73 0
            if child_path.is_symlink() and not self._follow_symlinks:
74 0
                logger.debug("Ignoring symlink: %s", uri)
75 0
                continue
76

77 0
            if not self._is_in_basedir(child_path):
78 0
                logger.debug("Ignoring symlink to outside base dir: %s", uri)
79 0
                continue
80

81 0
            if child_path.is_dir():
82 0
                result.append(
83
                    models.Ref.directory(name=dir_entry.name, uri=uri)
84
                )
85 0
            elif child_path.is_file():
86 0
                result.append(models.Ref.track(name=dir_entry.name, uri=uri))
87

88 0
        def order(item):
89 0
            return (item.type != models.Ref.DIRECTORY, item.name)
90

91 0
        result.sort(key=order)
92

93 0
        return result
94

95 2
    def lookup(self, uri):
96 2
        logger.debug("Looking up file URI: %s", uri)
97 2
        local_path = path.uri_to_path(uri)
98

99 2
        try:
100 2
            result = self._scanner.scan(uri)
101 2
            track = tags.convert_tags_to_track(result.tags).replace(
102
                uri=uri, length=result.duration
103
            )
104 0
        except exceptions.ScannerError as e:
105 0
            logger.warning("Failed looking up %s: %s", uri, e)
106 0
            track = models.Track(uri=uri)
107

108 2
        if not track.name:
109 2
            track = track.replace(name=local_path.name)
110

111 2
        return [track]
112

113 2
    def _get_media_dirs(self, config):
114 2
        for entry in config["file"]["media_dirs"]:
115 0
            media_dir = {}
116 0
            media_dir_split = entry.split("|", 1)
117 0
            local_path = path.expand_path(media_dir_split[0])
118

119 0
            if local_path is None:
120 0
                logger.debug(
121
                    "Failed expanding path (%s) from file/media_dirs config "
122
                    "value.",
123
                    media_dir_split[0],
124
                )
125 0
                continue
126 0
            elif not local_path.is_dir():
127 0
                logger.warning(
128
                    "%s is not a directory. Please create the directory or "
129
                    "update the file/media_dirs config value.",
130
                    local_path,
131
                )
132 0
                continue
133

134 0
            media_dir["path"] = local_path
135 0
            if len(media_dir_split) == 2:
136 0
                media_dir["name"] = media_dir_split[1]
137
            else:
138
                # TODO Mpd client should accept / in dir name
139 0
                media_dir["name"] = media_dir_split[0].replace(os.sep, "+")
140

141 0
            yield media_dir
142

143 2
    def _get_media_dirs_refs(self):
144 0
        for media_dir in self._media_dirs:
145 0
            yield models.Ref.directory(
146
                name=media_dir["name"], uri=path.path_to_uri(media_dir["path"])
147
            )
148

149 2
    def _is_in_basedir(self, local_path):
150 0
        return any(
151
            path.is_path_inside_base_dir(local_path, media_dir["path"])
152
            for media_dir in self._media_dirs
153
        )
154

155 2
    def get_images(self, uris):
156 0
        images = {}
157 0
        for uri in uris:
158 0
            images[uri] = ()
159 0
            try:
160 0
                result = self._scanner.scan(uri)
161 0
            except ScannerError as e:
162 0
                logger.warning(str(e))
163 0
                continue
164 0
            if len(result.tags.get("image", [])) > 0:
165 0
                image = result.tags["image"][0]
166 0
                images[uri] = (
167
                    Image(
168
                        uri=f"data:;base64, {base64.b64encode(image).decode('utf-8')}"
169
                    ),
170
                )
171 0
                continue
172 0
            for i in self._folder_image_names:
173 0
                if (path.uri_to_path(uri).parent / i).exists():
174 0
                    with open(path.uri_to_path(uri).parent / i, "rb",) as f:
175 0
                        images[uri] = (
176
                            Image(
177
                                uri=f"data:;base64,"
178
                                f"{base64.b64encode(f.read()).decode('utf-8')}"
179
                            ),
180
                        )
181 0
                    break
182 0
        return images

Read our documentation on viewing source code .

Loading