mopidy / mopidy
1 2
import logging
2 2
import logging.config
3 2
import logging.handlers
4 2
import platform
5

6 2
LOG_LEVELS = {
7
    -1: dict(root=logging.ERROR, mopidy=logging.WARNING),
8
    0: dict(root=logging.ERROR, mopidy=logging.INFO),
9
    1: dict(root=logging.WARNING, mopidy=logging.DEBUG),
10
    2: dict(root=logging.INFO, mopidy=logging.DEBUG),
11
    3: dict(root=logging.DEBUG, mopidy=logging.DEBUG),
12
    4: dict(root=logging.NOTSET, mopidy=logging.NOTSET),
13
}
14

15
# Custom log level which has even lower priority than DEBUG
16 2
TRACE_LOG_LEVEL = 5
17 2
logging.addLevelName(TRACE_LOG_LEVEL, "TRACE")
18

19 2
logger = logging.getLogger(__name__)
20

21

22 2
class DelayedHandler(logging.Handler):
23 2
    def __init__(self):
24 2
        logging.Handler.__init__(self)
25 2
        self._released = False
26 2
        self._buffer = []
27

28 2
    def handle(self, record):
29 0
        if not self._released:
30 0
            self._buffer.append(record)
31

32 2
    def release(self):
33 0
        self._released = True
34 0
        root = logging.getLogger("")
35 0
        while self._buffer:
36 0
            root.handle(self._buffer.pop(0))
37

38

39 2
_delayed_handler = DelayedHandler()
40

41

42 2
def bootstrap_delayed_logging():
43 0
    root = logging.getLogger("")
44 0
    root.setLevel(logging.NOTSET)
45 0
    root.addHandler(_delayed_handler)
46

47

48 2
def setup_logging(config, base_verbosity_level, args_verbosity_level):
49 0
    logging.captureWarnings(True)
50

51 0
    if config["logging"]["config_file"]:
52
        # Logging config from file must be read before other handlers are
53
        # added. If not, the other handlers will have no effect.
54 0
        try:
55 0
            path = config["logging"]["config_file"]
56 0
            logging.config.fileConfig(path, disable_existing_loggers=False)
57 0
        except Exception as e:
58
            # Catch everything as logging does not specify what can go wrong.
59 0
            logger.error("Loading logging config %r failed. %s", path, e)
60

61 0
    loglevels = config.get("loglevels", {})
62

63 0
    verbosity_level = get_verbosity_level(
64
        config, base_verbosity_level, args_verbosity_level
65
    )
66 0
    verbosity_filter = VerbosityFilter(verbosity_level, loglevels)
67

68 0
    formatter = logging.Formatter(config["logging"]["format"])
69

70 0
    if config["logging"]["color"]:
71 0
        handler = ColorizingStreamHandler(config.get("logcolors", {}))
72
    else:
73 0
        handler = logging.StreamHandler()
74 0
    handler.addFilter(verbosity_filter)
75 0
    handler.setFormatter(formatter)
76

77 0
    logging.getLogger("").addHandler(handler)
78

79 0
    _delayed_handler.release()
80

81

82 2
def get_verbosity_level(config, base_verbosity_level, args_verbosity_level):
83 0
    if args_verbosity_level:
84 0
        result = base_verbosity_level + args_verbosity_level
85
    else:
86 0
        result = base_verbosity_level + config["logging"]["verbosity"]
87

88 0
    if result < min(LOG_LEVELS.keys()):
89 0
        result = min(LOG_LEVELS.keys())
90 0
    if result > max(LOG_LEVELS.keys()):
91 0
        result = max(LOG_LEVELS.keys())
92

93 0
    return result
94

95

96 2
class VerbosityFilter(logging.Filter):
97 2
    def __init__(self, verbosity_level, loglevels):
98 0
        self.verbosity_level = verbosity_level
99 0
        self.loglevels = loglevels
100

101 2
    def filter(self, record):
102 0
        for name, required_log_level in self.loglevels.items():
103 0
            if record.name == name or record.name.startswith(name + "."):
104 0
                return record.levelno >= required_log_level
105

106 0
        if record.name.startswith("mopidy"):
107 0
            required_log_level = LOG_LEVELS[self.verbosity_level]["mopidy"]
108
        else:
109 0
            required_log_level = LOG_LEVELS[self.verbosity_level]["root"]
110 0
        return record.levelno >= required_log_level
111

112

113
#: Available log colors.
114 2
COLORS = [
115
    "black",
116
    "red",
117
    "green",
118
    "yellow",
119
    "blue",
120
    "magenta",
121
    "cyan",
122
    "white",
123
]
124

125

126 2
class ColorizingStreamHandler(logging.StreamHandler):
127

128
    """
129
    Stream handler which colorizes the log using ANSI escape sequences.
130

131
    Does nothing on Windows, which doesn't support ANSI escape sequences.
132

133
    This implementation is based upon https://gist.github.com/vsajip/758430,
134
    which is:
135

136
        Copyright (C) 2010-2012 Vinay Sajip. All rights reserved.
137
        Licensed under the new BSD license.
138
    """
139

140
    # Map logging levels to (background, foreground, bold/intense)
141 2
    level_map = {
142
        TRACE_LOG_LEVEL: (None, "blue", False),
143
        logging.DEBUG: (None, "blue", False),
144
        logging.INFO: (None, "white", False),
145
        logging.WARNING: (None, "yellow", False),
146
        logging.ERROR: (None, "red", False),
147
        logging.CRITICAL: ("red", "white", True),
148
    }
149
    # Map logger name to foreground colors
150 2
    logger_map = {}
151

152 2
    csi = "\x1b["
153 2
    reset = "\x1b[0m"
154

155 2
    is_windows = platform.system() == "Windows"
156

157 2
    def __init__(self, logger_colors):
158 0
        super().__init__()
159 0
        self.logger_map = logger_colors
160

161 2
    @property
162 1
    def is_tty(self):
163 0
        isatty = getattr(self.stream, "isatty", None)
164 0
        return isatty and isatty()
165

166 2
    def emit(self, record):
167 0
        try:
168 0
            message = self.format(record)
169 0
            self.stream.write(message)
170 0
            self.stream.write(getattr(self, "terminator", "\n"))
171 0
            self.flush()
172 0
        except Exception:
173 0
            self.handleError(record)
174

175 2
    def format(self, record):
176 0
        message = logging.StreamHandler.format(self, record)
177 0
        if not self.is_tty or self.is_windows:
178 0
            return message
179 0
        for name, color in self.logger_map.items():
180 0
            if record.name.startswith(name):
181 0
                return self.colorize(message, fg=color)
182 0
        if record.levelno in self.level_map:
183 0
            bg, fg, bold = self.level_map[record.levelno]
184 0
            return self.colorize(message, bg=bg, fg=fg, bold=bold)
185 0
        return message
186

187 2
    def colorize(self, message, bg=None, fg=None, bold=False):
188 0
        params = []
189 0
        if bg in COLORS:
190 0
            params.append(str(COLORS.index(bg) + 40))
191 0
        if fg in COLORS:
192 0
            params.append(str(COLORS.index(fg) + 30))
193 0
        if bold:
194 0
            params.append("1")
195 0
        if params:
196 0
            message = "".join(
197
                (self.csi, ";".join(params), "m", message, self.reset)
198
            )
199 0
        return message

Read our documentation on viewing source code .

Loading