Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import json 

2import numbers 

3 

4from .utils import detect_json1, escape_sqlite 

5 

6 

class Filter:
    """Base class describing a single lookup that can be applied to a column.

    Subclasses set ``key`` and ``display`` and implement both clause methods.
    """

    # True when the lookup takes no value.
    no_argument = False
    # Text shown for this lookup; set by subclasses.
    display = None
    # Lookup identifier; set by subclasses.
    key = None

    def where_clause(self, table, column, value, param_counter):
        """Build the SQL fragment for this filter; subclasses must override."""
        raise NotImplementedError()

    def human_clause(self, column, value):
        """Build the readable description of this filter; subclasses must override."""
        raise NotImplementedError()

17 

18 

class TemplatedFilter(Filter):
    """Filter whose SQL fragment and readable text are produced from templates.

    ``sql_template`` is a ``str.format`` template. It is rendered with ``{c}``
    (column), ``{p}`` (parameter name) and ``{t}`` (table), or with ``{c}``
    only when ``no_argument`` is true.

    ``human_template`` is either a ``str.format`` template rendered with
    ``{c}`` and ``{v}``, or a callable ``(column, value)`` returning one.
    """

    def __init__(
        self,
        key,
        display,
        sql_template,
        human_template,
        format="{}",
        numeric=False,
        no_argument=False,
    ):
        """Store the templates and options.

        ``format`` wraps the raw value before it is bound (for example
        ``"%{}%"``). ``numeric`` asks for digit-only values to be bound as
        ``int``. ``no_argument`` marks filters that bind no value at all.
        """
        self.key = key
        self.display = display
        self.sql_template = sql_template
        self.human_template = human_template
        self.format = format
        self.numeric = numeric
        self.no_argument = no_argument

    def where_clause(self, table, column, value, param_counter):
        """Return ``(sql_fragment, parameter)`` for this filter.

        ``parameter`` is ``None`` for no-argument filters. Otherwise it is the
        formatted value, converted to ``int`` when the filter is numeric and
        the formatted value consists solely of decimal digits.
        """
        if self.no_argument:
            # Nothing is bound, and only {c} is offered to the template.
            return self.sql_template.format(c=column), None

        converted = self.format.format(value)
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as superscript digits, which int() rejects with ValueError.
        if self.numeric and converted.isdecimal():
            converted = int(converted)

        sql = self.sql_template.format(
            c=column, p="p{}".format(param_counter), t=table
        )
        return sql, converted

    def human_clause(self, column, value):
        """Return the readable description of this filter for ``column``."""
        template = self.human_template
        if callable(template):
            # Callable templates pick their wording based on the value.
            template = template(column, value)
        if self.no_argument:
            return template.format(c=column)
        return template.format(c=column, v=value)

58 

59 

class InFilter(Filter):
    """Filter matching rows whose column equals any one of several values."""

    key = "in"
    display = "in"

    def split_value(self, value):
        """Turn the raw filter value into a list of candidate values.

        A value starting with "[" is decoded as JSON; anything else is split
        on commas, with surrounding whitespace stripped from every piece.
        """
        if not value.startswith("["):
            return [piece.strip() for piece in value.split(",")]
        return json.loads(value)

    def where_clause(self, table, column, value, param_counter):
        """Return ``(sql_fragment, values)`` with one placeholder per value.

        Placeholders are numbered consecutively starting at ``param_counter``.
        """
        candidates = self.split_value(value)
        placeholders = ", ".join(
            ":p{}".format(number)
            for number in range(param_counter, param_counter + len(candidates))
        )
        clause = "{} in ({})".format(escape_sqlite(column), placeholders)
        return clause, candidates

    def human_clause(self, column, value):
        """Describe the filter, showing the candidate values as JSON."""
        listed = json.dumps(self.split_value(value))
        return "{} in {}".format(column, listed)

78 

79 

class NotInFilter(InFilter):
    """Filter matching rows whose column equals none of several values."""

    key = "notin"
    display = "not in"

    def where_clause(self, table, column, value, param_counter):
        """Return ``(sql_fragment, values)`` with one placeholder per value.

        Placeholders are numbered consecutively starting at ``param_counter``.
        """
        excluded = self.split_value(value)
        names = []
        for number, _ in enumerate(excluded, param_counter):
            names.append(":p{}".format(number))
        clause = "{} not in ({})".format(escape_sqlite(column), ", ".join(names))
        return clause, excluded

    def human_clause(self, column, value):
        """Describe the filter, showing the excluded values as JSON."""
        listed = json.dumps(self.split_value(value))
        return "{} not in {}".format(column, listed)

92 

93 

class Filters:
    """Turn ``(key, value)`` pairs into SQL where-clauses and readable text.

    A key is either a bare column name, which implies the ``exact`` lookup,
    or ``column__lookup``. Pairs whose lookup is not registered are ignored
    by both the SQL builder and the readable description.
    """

    # Registered filters, in the order they are reported by lookups().
    # "arraycontains" is only registered when detect_json1() is truthy.
    _filters = (
        [
            # key, display, sql_template, human_template, format=, numeric=, no_argument=
            TemplatedFilter(
                "exact",
                "=",
                '"{c}" = :{p}',
                lambda c, v: "{c} = {v}" if v.isdigit() else '{c} = "{v}"',
            ),
            TemplatedFilter(
                "not",
                "!=",
                '"{c}" != :{p}',
                lambda c, v: "{c} != {v}" if v.isdigit() else '{c} != "{v}"',
            ),
            TemplatedFilter(
                "contains",
                "contains",
                '"{c}" like :{p}',
                '{c} contains "{v}"',
                format="%{}%",
            ),
            TemplatedFilter(
                "endswith",
                "ends with",
                '"{c}" like :{p}',
                '{c} ends with "{v}"',
                format="%{}",
            ),
            TemplatedFilter(
                "startswith",
                "starts with",
                '"{c}" like :{p}',
                '{c} starts with "{v}"',
                format="{}%",
            ),
            TemplatedFilter("gt", ">", '"{c}" > :{p}', "{c} > {v}", numeric=True),
            TemplatedFilter(
                "gte", "\u2265", '"{c}" >= :{p}', "{c} \u2265 {v}", numeric=True
            ),
            TemplatedFilter("lt", "<", '"{c}" < :{p}', "{c} < {v}", numeric=True),
            TemplatedFilter(
                "lte", "\u2264", '"{c}" <= :{p}', "{c} \u2264 {v}", numeric=True
            ),
            TemplatedFilter("like", "like", '"{c}" like :{p}', '{c} like "{v}"'),
            TemplatedFilter(
                "notlike", "not like", '"{c}" not like :{p}', '{c} not like "{v}"'
            ),
            TemplatedFilter("glob", "glob", '"{c}" glob :{p}', '{c} glob "{v}"'),
            InFilter(),
            NotInFilter(),
        ]
        + (
            [
                TemplatedFilter(
                    "arraycontains",
                    "array contains",
                    """rowid in (
            select {t}.rowid from {t}, json_each({t}.{c}) j
            where j.value = :{p}
        )""",
                    '{c} contains "{v}"',
                )
            ]
            if detect_json1()
            else []
        )
        + [
            TemplatedFilter(
                "date", "date", 'date("{c}") = :{p}', '"{c}" is on date {v}'
            ),
            TemplatedFilter(
                "isnull", "is null", '"{c}" is null', "{c} is null", no_argument=True
            ),
            TemplatedFilter(
                "notnull",
                "is not null",
                '"{c}" is not null',
                "{c} is not null",
                no_argument=True,
            ),
            TemplatedFilter(
                "isblank",
                "is blank",
                '("{c}" is null or "{c}" = "")',
                "{c} is blank",
                no_argument=True,
            ),
            TemplatedFilter(
                "notblank",
                "is not blank",
                '("{c}" is not null and "{c}" != "")',
                "{c} is not blank",
                no_argument=True,
            ),
        ]
    )
    # Lookup key -> filter instance, for resolving the lookup part of a key.
    _filters_by_key = {f.key: f for f in _filters}

    def __init__(self, pairs, units=None, ureg=None):
        """Store the filter pairs and optional unit-conversion settings.

        ``pairs`` is an iterable of ``(key, value)`` tuples. ``units`` maps a
        column name to its unit string. ``ureg`` is called on unit strings and
        values in ``convert_unit`` (presumably a pint ``UnitRegistry`` --
        confirm against callers).
        """
        self.pairs = pairs
        # A None sentinel instead of a ``{}`` default: a dict literal default
        # is created once and would be shared by every instance.
        self.units = {} if units is None else units
        self.ureg = ureg

    def lookups(self):
        "Yields (lookup, display, no_argument) tuples"
        for registered in self._filters:
            yield registered.key, registered.display, registered.no_argument

    def human_description_en(self, extra=None):
        """Return an English ``where ...`` summary of the active filters.

        ``extra`` is an optional iterable of already-rendered clauses that are
        placed before the filter clauses. Returns ``""`` when the joined
        description is empty.
        """
        bits = []
        if extra:
            bits.extend(extra)
        for column, lookup, value in self.selections():
            matched = self._filters_by_key.get(lookup, None)
            if matched:
                bits.append(matched.human_clause(column, value))
        # Comma separated, with an ' and ' at the end
        and_bits = []
        commas, tail = bits[:-1], bits[-1:]
        if commas:
            and_bits.append(", ".join(commas))
        if tail:
            and_bits.append(tail[0])
        s = " and ".join(and_bits)
        if not s:
            return ""
        return "where {}".format(s)

    def selections(self):
        "Yields (column, lookup, value) tuples"
        for key, value in self.pairs:
            if "__" in key:
                # Split on the last "__" so column names may contain "__".
                column, lookup = key.rsplit("__", 1)
            else:
                column = key
                lookup = "exact"
            yield column, lookup, value

    def has_selections(self):
        """Return True when at least one filter pair was supplied."""
        return bool(self.pairs)

    def convert_unit(self, column, value):
        "If the user has provided a unit in the query, convert it into the column unit, if present."
        if column not in self.units:
            return value

        # Try to interpret the value as a unit
        value = self.ureg(value)
        if isinstance(value, numbers.Number):
            # It's just a bare number, assume it's the column unit
            return value

        column_unit = self.ureg(self.units[column])
        return value.to(column_unit).magnitude

    def build_where_clauses(self, table):
        """Return ``(sql_bits, params)`` for every recognised filter pair.

        ``sql_bits`` is a list of SQL fragments. ``params`` maps parameter
        names ``p0``, ``p1``, ... to the values to bind. The counter only
        advances when a value is bound, so a filter's placeholders line up
        with the names assigned here.
        """
        sql_bits = []
        params = {}
        i = 0
        for column, lookup, value in self.selections():
            matched = self._filters_by_key.get(lookup, None)
            if not matched:
                continue
            sql_bit, param = matched.where_clause(
                table, column, self.convert_unit(column, value), i
            )
            sql_bits.append(sql_bit)
            if param is None:
                continue
            # Filters return either a single value or a list of values.
            if not isinstance(param, list):
                param = [param]
            for individual_param in param:
                params["p{}".format(i)] = individual_param
                i += 1
        return sql_bits, params