Coverage for datasette/filters.py : 94%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import json
2import numbers
4from .utils import detect_json1, escape_sqlite
7class Filter:
8 key = None
9 display = None
10 no_argument = False
12 def where_clause(self, table, column, value, param_counter):
13 raise NotImplementedError
15 def human_clause(self, column, value):
16 raise NotImplementedError
19class TemplatedFilter(Filter):
20 def __init__(
21 self,
22 key,
23 display,
24 sql_template,
25 human_template,
26 format="{}",
27 numeric=False,
28 no_argument=False,
29 ):
30 self.key = key
31 self.display = display
32 self.sql_template = sql_template
33 self.human_template = human_template
34 self.format = format
35 self.numeric = numeric
36 self.no_argument = no_argument
38 def where_clause(self, table, column, value, param_counter):
39 converted = self.format.format(value)
40 if self.numeric and converted.isdigit():
41 converted = int(converted)
42 if self.no_argument:
43 kwargs = {"c": column}
44 converted = None
45 else:
46 kwargs = {"c": column, "p": "p{}".format(param_counter), "t": table}
47 return self.sql_template.format(**kwargs), converted
49 def human_clause(self, column, value):
50 if callable(self.human_template):
51 template = self.human_template(column, value)
52 else:
53 template = self.human_template
54 if self.no_argument:
55 return template.format(c=column)
56 else:
57 return template.format(c=column, v=value)
60class InFilter(Filter):
61 key = "in"
62 display = "in"
64 def split_value(self, value):
65 if value.startswith("["):
66 return json.loads(value)
67 else:
68 return [v.strip() for v in value.split(",")]
70 def where_clause(self, table, column, value, param_counter):
71 values = self.split_value(value)
72 params = [":p{}".format(param_counter + i) for i in range(len(values))]
73 sql = "{} in ({})".format(escape_sqlite(column), ", ".join(params))
74 return sql, values
76 def human_clause(self, column, value):
77 return "{} in {}".format(column, json.dumps(self.split_value(value)))
80class NotInFilter(InFilter):
81 key = "notin"
82 display = "not in"
84 def where_clause(self, table, column, value, param_counter):
85 values = self.split_value(value)
86 params = [":p{}".format(param_counter + i) for i in range(len(values))]
87 sql = "{} not in ({})".format(escape_sqlite(column), ", ".join(params))
88 return sql, values
90 def human_clause(self, column, value):
91 return "{} not in {}".format(column, json.dumps(self.split_value(value)))
94class Filters:
95 _filters = (
96 [
97 # key, display, sql_template, human_template, format=, numeric=, no_argument=
98 TemplatedFilter(
99 "exact",
100 "=",
101 '"{c}" = :{p}',
102 lambda c, v: "{c} = {v}" if v.isdigit() else '{c} = "{v}"',
103 ),
104 TemplatedFilter(
105 "not",
106 "!=",
107 '"{c}" != :{p}',
108 lambda c, v: "{c} != {v}" if v.isdigit() else '{c} != "{v}"',
109 ),
110 TemplatedFilter(
111 "contains",
112 "contains",
113 '"{c}" like :{p}',
114 '{c} contains "{v}"',
115 format="%{}%",
116 ),
117 TemplatedFilter(
118 "endswith",
119 "ends with",
120 '"{c}" like :{p}',
121 '{c} ends with "{v}"',
122 format="%{}",
123 ),
124 TemplatedFilter(
125 "startswith",
126 "starts with",
127 '"{c}" like :{p}',
128 '{c} starts with "{v}"',
129 format="{}%",
130 ),
131 TemplatedFilter("gt", ">", '"{c}" > :{p}', "{c} > {v}", numeric=True),
132 TemplatedFilter(
133 "gte", "\u2265", '"{c}" >= :{p}', "{c} \u2265 {v}", numeric=True
134 ),
135 TemplatedFilter("lt", "<", '"{c}" < :{p}', "{c} < {v}", numeric=True),
136 TemplatedFilter(
137 "lte", "\u2264", '"{c}" <= :{p}', "{c} \u2264 {v}", numeric=True
138 ),
139 TemplatedFilter("like", "like", '"{c}" like :{p}', '{c} like "{v}"'),
140 TemplatedFilter(
141 "notlike", "not like", '"{c}" not like :{p}', '{c} not like "{v}"'
142 ),
143 TemplatedFilter("glob", "glob", '"{c}" glob :{p}', '{c} glob "{v}"'),
144 InFilter(),
145 NotInFilter(),
146 ]
147 + (
148 [
149 TemplatedFilter(
150 "arraycontains",
151 "array contains",
152 """rowid in (
153 select {t}.rowid from {t}, json_each({t}.{c}) j
154 where j.value = :{p}
155 )""",
156 '{c} contains "{v}"',
157 )
158 ]
159 if detect_json1()
160 else []
161 )
162 + [
163 TemplatedFilter(
164 "date", "date", 'date("{c}") = :{p}', '"{c}" is on date {v}'
165 ),
166 TemplatedFilter(
167 "isnull", "is null", '"{c}" is null', "{c} is null", no_argument=True
168 ),
169 TemplatedFilter(
170 "notnull",
171 "is not null",
172 '"{c}" is not null',
173 "{c} is not null",
174 no_argument=True,
175 ),
176 TemplatedFilter(
177 "isblank",
178 "is blank",
179 '("{c}" is null or "{c}" = "")',
180 "{c} is blank",
181 no_argument=True,
182 ),
183 TemplatedFilter(
184 "notblank",
185 "is not blank",
186 '("{c}" is not null and "{c}" != "")',
187 "{c} is not blank",
188 no_argument=True,
189 ),
190 ]
191 )
192 _filters_by_key = {f.key: f for f in _filters}
194 def __init__(self, pairs, units={}, ureg=None):
195 self.pairs = pairs
196 self.units = units
197 self.ureg = ureg
199 def lookups(self):
200 "Yields (lookup, display, no_argument) pairs"
201 for filter in self._filters:
202 yield filter.key, filter.display, filter.no_argument
204 def human_description_en(self, extra=None):
205 bits = []
206 if extra:
207 bits.extend(extra)
208 for column, lookup, value in self.selections():
209 filter = self._filters_by_key.get(lookup, None)
210 if filter:
211 bits.append(filter.human_clause(column, value))
212 # Comma separated, with an ' and ' at the end
213 and_bits = []
214 commas, tail = bits[:-1], bits[-1:]
215 if commas:
216 and_bits.append(", ".join(commas))
217 if tail:
218 and_bits.append(tail[0])
219 s = " and ".join(and_bits)
220 if not s:
221 return ""
222 return "where {}".format(s)
224 def selections(self):
225 "Yields (column, lookup, value) tuples"
226 for key, value in self.pairs:
227 if "__" in key:
228 column, lookup = key.rsplit("__", 1)
229 else:
230 column = key
231 lookup = "exact"
232 yield column, lookup, value
234 def has_selections(self):
235 return bool(self.pairs)
237 def convert_unit(self, column, value):
238 "If the user has provided a unit in the query, convert it into the column unit, if present."
239 if column not in self.units:
240 return value
242 # Try to interpret the value as a unit
243 value = self.ureg(value)
244 if isinstance(value, numbers.Number):
245 # It's just a bare number, assume it's the column unit
246 return value
248 column_unit = self.ureg(self.units[column])
249 return value.to(column_unit).magnitude
251 def build_where_clauses(self, table):
252 sql_bits = []
253 params = {}
254 i = 0
255 for column, lookup, value in self.selections():
256 filter = self._filters_by_key.get(lookup, None)
257 if filter:
258 sql_bit, param = filter.where_clause(
259 table, column, self.convert_unit(column, value), i
260 )
261 sql_bits.append(sql_bit)
262 if param is not None:
263 if not isinstance(param, list):
264 param = [param]
265 for individual_param in param:
266 param_id = "p{}".format(i)
267 params[param_id] = individual_param
268 i += 1
269 return sql_bits, params