1 6
import re
2 6
from functools import reduce
3 6
from pyparsing import (
4
    infixNotation,
5
    opAssoc,
6
    Optional,
7
    Literal,
8
    CharsNotIn,
9
    ParseException,
10
)
11

12 6
import six
13

14 6
import django
15

16 6
from cyborgbackup.main.utils.common import get_search_fields
17

18 6
__all__ = ['SmartFilter']
19

20

21 6
def string_to_type(t):
    """Convert a textual token to a bool, int or float when it looks like one.

    ``u'true'`` / ``u'false'`` (exact, lowercase) become the matching boolean.
    An optionally signed run of digits becomes an ``int``; an optionally
    signed ``digits.digits`` string becomes a ``float``.  Anything else is
    returned unchanged.
    """
    # Boolean literals are checked first and must match exactly.
    for literal, value in ((u'true', True), (u'false', False)):
        if t == literal:
            return value

    # Numeric forms, tried in order: integer first, then decimal.
    numeric_forms = (
        (r'^[-+]?[0-9]+$', int),
        (r'^[-+]?[0-9]+\.[0-9]+$', float),
    )
    for pattern, convert in numeric_forms:
        if re.search(pattern, t):
            return convert(t)

    return t
34

35

36 6
def get_model(name):
    """Return the model called *name* from the ``main`` app.

    The lookup is delegated to ``django.apps.apps.get_model``.
    """
    registry = django.apps.apps
    return registry.get_model('main', name)
38

39

40 6
class SmartFilter(object):
    """Turn a textual filter expression into a filtered set of ``host`` objects.

    An expression is made of ``key=value`` terms combined with ``and`` / ``or``.
    The nested classes are pyparsing parse actions; the public entry point is
    :meth:`query_from_string`.
    """

    class BoolOperand(object):
        """Parse action for one ``key=value`` term.

        After construction, ``self.result`` holds whatever
        ``Host.objects.filter(...)`` returned for the term.
        """

        def __init__(self, t):
            """Build ``self.result`` from the tokens *t* of a single term."""
            k, v = self._extract_key_value(t)
            k, v = self._json_path_to_contains(k, v)

            Host = get_model('host')
            search_kwargs = self._expand_search(k, v)
            if search_kwargs:
                # A "search" term matches when ANY of the expanded fields
                # contains the value, so the per-field Q objects are OR-ed.
                conditions = [
                    django.db.models.Q(**{u'%s__contains' % _k: _v})
                    for _k, _v in search_kwargs.items()
                ]
                q = reduce(lambda x, y: x | y, conditions)
                self.result = Host.objects.filter(q)
            else:
                # Plain term: pass the key straight through as a field lookup.
                self.result = Host.objects.filter(**{k: v})

        def strip_quotes_traditional_logic(self, v):
            """Return *v* without its surrounding double quotes, if it is a quoted text value."""
            if isinstance(v, six.text_type) and v.startswith('"') and v.endswith('"'):
                return v[1:-1]
            return v

        def strip_quotes_json_logic(self, v):
            """Like :meth:`strip_quotes_traditional_logic`, but leave ``"null"`` quoted."""
            if (isinstance(v, six.text_type)
                    and v.startswith('"') and v.endswith('"')
                    and v != u'"null"'):
                return v[1:-1]
            return v

        def _json_path_to_contains(self, k, v):
            """Return ``(k, v)`` with surrounding quotes stripped from *v*; *k* is untouched."""
            v = self.strip_quotes_traditional_logic(v)
            return (k, v)

        def _extract_key_value(self, t):
            """Split the token sequence *t* of one term into ``(key, value)``.

            Token layouts handled (each quote character is its own token):

            * ``key = value``            -> value converted by ``string_to_type``
            * ``key = " value "``        -> value kept as text, re-wrapped in quotes
            * ``key = " "``              -> empty string
            * ``" key " = ...``          -> same as above with a quoted key
            """
            t_len = len(t)

            # key: either `"something"=` (three tokens) or `something=` (one).
            # The length guard must cover index 2, which is read here.
            if t_len > 2 and t[0] == '"' and t[2] == '"':
                k = t[1]
                v_offset = 4
            else:
                k = t[0]
                v_offset = 2

            # value
            if t_len > (v_offset + 2) and t[v_offset] == '"' and t[v_offset + 2] == '"':
                # ="something": keep the quotes so callers can tell it was quoted.
                v = u'"' + six.text_type(t[v_offset + 1]) + u'"'
            elif t_len > (v_offset + 1):
                # ="" : opening and closing quote with nothing in between.
                v = u""
            else:
                # Unquoted value.
                # NOTE(review): a term with no value at all (`key=`) raises
                # IndexError here - confirm whether that should be accepted.
                v = string_to_type(t[v_offset])

            return (k, v)

        def _expand_search(self, k, v):
            """Expand a ``search`` / ``<relation>__search`` key into per-field lookups.

            Returns ``None`` when *k* does not contain ``'search'``.  Otherwise
            returns a dict mapping each field reported by ``get_search_fields``
            (prefixed with ``<relation>__`` for related searches) to *v*; the
            dict is empty when *k* is neither ``search`` nor ends in
            ``__search``.

            Raises ``ParseException`` when the relation does not name a model.
            """
            if 'search' not in k:
                return None

            model, relation = None, None
            if k == 'search':
                model = get_model('host')
            elif k.endswith('__search'):
                relation = k.split('__')[0]
                try:
                    model = get_model(relation)
                except LookupError:
                    raise ParseException('No related field named %s' % relation)

            search_kwargs = {}
            if model is not None:
                for field in get_search_fields(model):
                    if relation is not None:
                        lookup = '{0}__{1}'.format(relation, field)
                    else:
                        lookup = field
                    search_kwargs[lookup] = v
            return search_kwargs

    class BoolBinOp(object):
        """Base parse action for a chain of operands joined by one binary operator.

        ``t[0]`` is expected to alternate operand, operator, operand, ...;
        operands expose a ``result`` attribute.  Subclasses supply
        ``execute_logic(left, right)``.  ``self.result`` stays ``None`` when
        fewer than three tokens are present.
        """

        def __init__(self, t):
            """Fold the operands of ``t[0]`` left to right into ``self.result``."""
            tokens = t[0]
            self.result = None
            if len(tokens) < 3:
                return

            # Seed the accumulator exactly once.  Testing its truthiness on
            # every iteration instead would re-seed it with the first operand
            # whenever an intermediate combination happened to be empty,
            # producing e.g. `A & C` for `A and B and C` when `A & B` is empty.
            combined = tokens[0].result
            for index in range(2, len(tokens), 2):
                combined = self.execute_logic(combined, tokens[index].result)
            self.result = combined

    class BoolAnd(BoolBinOp):
        """``and`` operator: combines operands with ``&``."""

        def execute_logic(self, left, right):
            return left & right

    class BoolOr(BoolBinOp):
        """``or`` operator: combines operands with ``|``."""

        def execute_logic(self, left, right):
            return left | right

    @classmethod
    def query_from_string(cls, filter_string):
        """Parse *filter_string* and return the combined result of its terms.

        Raises ``RuntimeError`` when the string cannot be parsed or parsing
        yields no result.
        """
        filter_string_raw = filter_string
        filter_string = six.text_type(filter_string)

        # Atoms end at any whitespace character present in the input, or at
        # one of the structural characters of the grammar.
        unicode_spaces = list(set(six.text_type(c) for c in filter_string if c.isspace()))
        unicode_spaces_other = unicode_spaces + [u'(', u')', u'=', u'"']
        atom = CharsNotIn(unicode_spaces_other)
        atom_inside_quotes = CharsNotIn(u'"')
        atom_quoted = Literal('"') + Optional(atom_inside_quotes) + Literal('"')
        EQUAL = Literal('=')

        # One term: key, '=', optional value; either side may be quoted.
        grammar = ((atom_quoted | atom) + EQUAL + Optional((atom_quoted | atom)))
        grammar.setParseAction(cls.BoolOperand)

        boolExpr = infixNotation(grammar, [
            ("and", 2, opAssoc.LEFT, cls.BoolAnd),
            ("or", 2, opAssoc.LEFT, cls.BoolOr),
        ])

        try:
            # The whole expression is parsed as one parenthesised group.
            res = boolExpr.parseString('(' + filter_string + ')')
        except ParseException:
            raise RuntimeError(u"Invalid query %s" % filter_string_raw)

        if len(res) > 0:
            return res[0].result

        raise RuntimeError("Parsing the filter_string %s went terribly wrong" % filter_string)

Read our documentation on viewing source code .

Loading