mopidy / mopidy
1 2
import contextlib
2 2
import locale
3 2
import logging
4 2
import operator
5 2
import os
6 2
import pathlib
7 2
import tempfile
8

9 2
from mopidy import backend
10 2
from mopidy.internal import path
11

12 2
from . import Extension, translator
13

14 2
logger = logging.getLogger(__name__)
15

16

17 2
def log_environment_error(message, error):
    """Log *message* followed by the ``strerror`` text carried by *error*.

    :param message: human-readable description of the failed operation
    :param error: exception exposing a ``strerror`` attribute (e.g. ``OSError``)

    If ``strerror`` is a ``bytes`` object it is decoded with the locale's
    preferred encoding before being logged; any other value is logged as-is.
    """
    detail = error.strerror
    if isinstance(detail, bytes):
        detail = detail.decode(locale.getpreferredencoding())
    logger.error("%s: %s", message, detail)
23

24

25 2
@contextlib.contextmanager
def replace(path, mode="w+b", encoding=None, errors=None):
    """Context manager that atomically replaces *path* with newly written data.

    A temporary file is created next to *path* (same directory, so the final
    move stays on one filesystem) and yielded as an open file object. When the
    ``with`` body finishes without error the data is flushed, synced to disk,
    the file is closed and then moved over *path*. If anything goes wrong the
    temporary file is removed and *path* is left untouched.

    :param path: destination as a :class:`pathlib.Path`
    :param mode: mode passed to :func:`open` for the temporary file
    :param encoding: text encoding passed to :func:`open`
    :param errors: encoding error handler passed to :func:`open`
    :raises: whatever :func:`open`, the ``with`` body, or the final move raise
    """
    (fd, tempname) = tempfile.mkstemp(dir=str(path.parent))
    tempname = pathlib.Path(tempname)
    try:
        fp = open(fd, mode, encoding=encoding, errors=errors)
    except Exception:
        # Depending on how far open() got, it may already have closed the
        # descriptor itself; a failing close/unlink here must not hide the
        # original error.
        with contextlib.suppress(OSError):
            os.close(fd)
        with contextlib.suppress(OSError):
            tempname.unlink()
        raise
    try:
        with fp:
            yield fp
            fp.flush()
            os.fsync(fd)
        # The file is closed before it is moved: required on Windows and
        # harmless elsewhere. Path.replace() overwrites an existing target on
        # every platform, unlike Path.rename().
        tempname.replace(path)
    except BaseException:
        # BaseException so the temporary file is also cleaned up on
        # KeyboardInterrupt/SystemExit/GeneratorExit; the error is re-raised.
        with contextlib.suppress(OSError):
            tempname.unlink()
        raise
45

46

47 2
class M3UPlaylistsProvider(backend.PlaylistsProvider):
    """Playlists provider that keeps each playlist in an M3U file.

    Playlist files are looked up in ``self._playlists_dir``. The separate
    ``self._base_dir`` is handed to ``translator.load_items`` when a playlist
    is read; it defaults to the playlists directory when not configured.
    """

    def __init__(self, backend, config):
        """Read the extension's settings from *config*.

        :param backend: backend instance forwarded to the base class
        :param config: configuration mapping; the section named
            ``Extension.ext_name`` supplies ``playlists_dir``, ``base_dir``,
            ``default_encoding`` and ``default_extension``
        """
        super().__init__(backend)

        ext_config = config[Extension.ext_name]
        if ext_config["playlists_dir"] is None:
            self._playlists_dir = Extension.get_data_dir(config)
        else:
            self._playlists_dir = path.expand_path(ext_config["playlists_dir"])
        if ext_config["base_dir"] is None:
            self._base_dir = self._playlists_dir
        else:
            self._base_dir = path.expand_path(ext_config["base_dir"])
        self._default_encoding = ext_config["default_encoding"]
        self._default_extension = ext_config["default_extension"]

    def as_list(self):
        """Return refs for all ``.m3u``/``.m3u8`` files, sorted by name.

        Only regular files directly inside the playlists directory are listed.
        """
        result = []
        for entry in self._playlists_dir.iterdir():
            # Check the suffix first so unrelated entries are skipped without
            # touching the filesystem again.
            if entry.suffix not in [".m3u", ".m3u8"]:
                continue
            if not entry.is_file():
                continue
            playlist_path = entry.relative_to(self._playlists_dir)
            result.append(translator.path_to_ref(playlist_path))
        result.sort(key=operator.attrgetter("name"))
        return result

    def create(self, name):
        """Create an empty playlist file for *name*.

        :returns: the new playlist, or ``None`` if an ``OSError`` occurred
            (the error is logged)
        """
        path = translator.path_from_name(name.strip(), self._default_extension)
        try:
            # Opening for writing and closing again leaves an empty file.
            with self._open(path, "w"):
                pass
            mtime = self._abspath(path).stat().st_mtime
        except OSError as e:
            log_environment_error(f"Error creating playlist {name!r}", e)
            return None
        else:
            return translator.playlist(path, [], mtime)

    def delete(self, uri):
        """Delete the playlist file referenced by *uri*.

        :returns: ``True`` on success; ``False`` if the path lies outside the
            playlists directory or removing the file failed
        """
        path = translator.uri_to_path(uri)
        if not self._is_in_basedir(path):
            logger.debug("Ignoring path outside playlist dir: %s", uri)
            return False
        try:
            self._abspath(path).unlink()
        except OSError as e:
            log_environment_error(f"Error deleting playlist {uri!r}", e)
            return False
        else:
            return True

    def get_items(self, uri):
        """Return the items stored in the playlist referenced by *uri*.

        :returns: whatever ``translator.load_items`` produces, or ``None`` if
            the path lies outside the playlists directory or reading failed
        """
        path = translator.uri_to_path(uri)
        if not self._is_in_basedir(path):
            logger.debug("Ignoring path outside playlist dir: %s", uri)
            return None
        try:
            with self._open(path, "r") as fp:
                items = translator.load_items(fp, self._base_dir)
        except OSError as e:
            log_environment_error(f"Error reading playlist {uri!r}", e)
            return None
        else:
            return items

    def lookup(self, uri):
        """Load the playlist referenced by *uri*, including its mtime.

        :returns: the playlist, or ``None`` if the path lies outside the
            playlists directory or reading failed
        """
        path = translator.uri_to_path(uri)
        if not self._is_in_basedir(path):
            logger.debug("Ignoring path outside playlist dir: %s", uri)
            return None
        try:
            with self._open(path, "r") as fp:
                items = translator.load_items(fp, self._base_dir)
            mtime = self._abspath(path).stat().st_mtime
        except OSError as e:
            log_environment_error(f"Error reading playlist {uri!r}", e)
            return None
        else:
            return translator.playlist(path, items, mtime)

    def refresh(self):
        """Do nothing; playlists are always read straight from disk."""
        pass  # nothing to do

    def save(self, playlist):
        """Write *playlist* to disk, renaming its file if the name changed.

        :returns: the saved playlist, or ``None`` if the path lies outside the
            playlists directory or an ``OSError`` occurred
        """
        path = translator.uri_to_path(playlist.uri)
        if not self._is_in_basedir(path):
            logger.debug("Ignoring path outside playlist dir: %s", playlist.uri)
            return None
        name = translator.name_from_path(path)
        try:
            with self._open(path, "w") as fp:
                translator.dump_items(playlist.tracks, fp)
            if playlist.name and playlist.name != name:
                # The playlist was renamed: move the file, keeping the
                # original file extension.
                orig_path = path
                path = translator.path_from_name(playlist.name.strip())
                path = path.with_suffix(orig_path.suffix)
                self._abspath(orig_path).rename(self._abspath(path))
            mtime = self._abspath(path).stat().st_mtime
        except OSError as e:
            log_environment_error(f"Error saving playlist {playlist.uri!r}", e)
            return None
        else:
            return translator.playlist(path, playlist.tracks, mtime)

    def _abspath(self, path):
        """Return *path* unchanged if absolute, else joined to the playlists dir."""
        if not path.is_absolute():
            return self._playlists_dir / path
        else:
            return path

    def _is_in_basedir(self, local_path):
        """Return whether *local_path* lies inside the playlists directory."""
        local_path = self._abspath(local_path)
        return path.is_path_inside_base_dir(local_path, self._playlists_dir)

    def _open(self, path, mode="r"):
        """Open the playlist file at *path* with the appropriate encoding.

        ``.m3u8`` files are always UTF-8; everything else uses the configured
        default encoding. Undecodable/unencodable data is replaced rather than
        raising. Modes containing ``"w"`` go through :func:`replace`, so the
        target is only swapped in once writing has finished.

        :raises Exception: if *path* is not inside the playlists directory
        """
        if path.suffix == ".m3u8":
            encoding = "utf-8"
        else:
            encoding = self._default_encoding
        if not path.is_absolute():
            path = self._abspath(path)
        if not self._is_in_basedir(path):
            raise Exception(
                f"Path {path!r} is not inside playlist dir {self._playlists_dir!r}"
            )
        if "w" in mode:
            return replace(path, mode, encoding=encoding, errors="replace")
        else:
            return path.open(mode, encoding=encoding, errors="replace")

Read our documentation on viewing source code.

Loading