mopidy / mopidy
1 2
import logging
2 2
import pathlib
3 2
import re
4 2
import urllib
5

6 2
from mopidy.internal import xdg
7

8 2
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Mapping of XDG variable names to directories, as returned by
# xdg.get_dirs(). expand_path() replaces each "$<name>" occurrence in a path
# with the corresponding value.
XDG_DIRS = xdg.get_dirs()
12

13

14 2
def get_or_create_dir(dir_path):
    """
    Return the expanded ``dir_path``, creating the dir if it does not exist.

    The path is first passed through :func:`expand_path`. If nothing exists
    at the resulting location, the dir is created together with any missing
    parents; the new dir is requested with mode ``0o755``.

    Raises :exc:`OSError` if a file already exists at the expanded path.

    Note that :func:`expand_path` returns ``None`` for paths containing an
    unresolved ``$`` variable; such input makes the ``is_file()`` call below
    fail with :exc:`AttributeError`.
    """
    dir_path = expand_path(dir_path)
    if dir_path.is_file():
        raise OSError(
            f"A file with the same name as the desired dir, "
            f"{dir_path!r}, already exists."
        )
    elif not dir_path.is_dir():
        logger.info(f"Creating dir {dir_path.as_uri()}")
        dir_path.mkdir(mode=0o755, parents=True)
    return dir_path
25

26

27 2
def get_or_create_file(file_path, mkdir=True, content=None):
    """
    Return the expanded ``file_path``, creating the file if it does not exist.

    :param file_path: path of the file; passed through :func:`expand_path`
    :param mkdir: if true, make sure the parent dir exists first by calling
        :func:`get_or_create_dir` on it
    :param content: optional initial content. A ``str`` is converted to
        bytes with ``str.encode()``; anything else is handed to
        ``write_bytes()`` unchanged. It is only written when the file is
        created by this call, so an existing file is never modified.
    :returns: the expanded path
    """
    file_path = expand_path(file_path)
    if isinstance(content, str):
        content = content.encode()
    if mkdir:
        get_or_create_dir(file_path.parent)
    if not file_path.is_file():
        logger.info(f"Creating file {file_path.as_uri()}")
        # exist_ok=False: fail loudly if something appeared at this path
        # after the is_file() check instead of silently reusing it.
        file_path.touch(exist_ok=False)
        if content is not None:
            file_path.write_bytes(content)
    return file_path
39

40

41 2
def get_unix_socket_path(socket_path):
    """
    Extract the path part of a ``unix:<path>`` socket specification.

    Returns everything following the leading ``unix:`` prefix (up to the
    first newline, if any), or ``None`` if ``socket_path`` does not start
    with ``unix:``.
    """
    match = re.search("^unix:(.*)", socket_path)
    if not match:
        return None
    return match.group(1)
46

47

48 2
def path_to_uri(path):
    """
    Convert OS specific path to file:// URI.

    Accepts either unicode strings or bytestrings. Bytestrings are decoded
    with the ``surrogateescape`` error handler, so bytes that are not valid
    text are carried through and end up as percent-escapes in the URI, from
    which :func:`uri_to_path` can recover them.

    Returns a file:// URI as a unicode string. The path must be absolute;
    ``pathlib`` raises :exc:`ValueError` for relative paths.
    """
    if isinstance(path, bytes):
        # pathlib.Path() rejects bytes with a TypeError, so decode first,
        # the same way the other helpers in this module do.
        path = path.decode(errors="surrogateescape")
    return pathlib.Path(path).as_uri()
59

60

61 2
def uri_to_path(uri):
    """
    Convert an URI to a OS specific path.

    Only the path component of the URI is used. Percent-escapes are decoded
    to raw bytes, which are then turned into text with the
    ``surrogateescape`` error handler so that bytes which are not valid text
    are preserved rather than rejected.

    Returns a :class:`pathlib.Path`.
    """
    # A bare ``import urllib`` does not guarantee that the ``parse``
    # submodule is loaded, so import the needed names explicitly.
    from urllib.parse import unquote_to_bytes, urlsplit

    bytes_path = unquote_to_bytes(urlsplit(uri).path)
    unicode_path = bytes_path.decode(errors="surrogateescape")
    return pathlib.Path(unicode_path)
68

69

70 2
def expand_path(path):
    """
    Expand XDG variables and ``~`` in ``path`` and make it absolute.

    Bytestrings are decoded with the ``surrogateescape`` error handler.
    Every ``$<name>`` for a name in ``XDG_DIRS`` is replaced with the
    matching dir, then ``~`` is expanded and the path is resolved.

    Returns a :class:`pathlib.Path`, or ``None`` if a ``$`` is still present
    after substitution (i.e. the path refers to an unknown variable).
    """
    if isinstance(path, bytes):
        path = path.decode(errors="surrogateescape")
    # Round-trip through pathlib to get a normalized string to substitute in.
    path = str(pathlib.Path(path))

    for xdg_var, xdg_dir in XDG_DIRS.items():
        path = path.replace("$" + xdg_var, str(xdg_dir))
    if "$" in path:
        # An unrecognized variable is left over; signal failure to the caller.
        return None

    return pathlib.Path(path).expanduser().resolve()
81

82

83 2
def is_path_inside_base_dir(path, base_path):
    """
    Check whether ``path`` is ``base_path`` itself or located beneath it.

    Both arguments may be strings, bytestrings (decoded with the
    ``surrogateescape`` error handler) or path objects. Both are resolved
    before comparing, so ``..`` segments and symlinks are taken into
    account. If ``path`` is an existing file, its containing dir is what
    gets compared against ``base_path``.

    Returns ``True`` or ``False``.
    """
    if isinstance(path, bytes):
        path = path.decode(errors="surrogateescape")
    if isinstance(base_path, bytes):
        base_path = base_path.decode(errors="surrogateescape")

    path = pathlib.Path(path).resolve()
    base_path = pathlib.Path(base_path).resolve()

    if path.is_file():
        # Use dir of file for prefix comparison, so we don't accept
        # /tmp/foo.m3u as being inside /tmp/foo, simply because they have a
        # common prefix, /tmp/foo, which matches the base path, /tmp/foo.
        path = path.parent

    # Check if dir of file is the base path or a subdir
    try:
        path.relative_to(base_path)
    except ValueError:
        return False
    return True

Read our documentation on viewing source code.

Loading