Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import os 

2import jinja2 

3 

4from datasette.utils import ( 

5 check_visibility, 

6 to_css_class, 

7 validate_sql_select, 

8 is_url, 

9 path_with_added_args, 

10 path_with_removed_args, 

11) 

12from datasette.utils.asgi import AsgiFileDownload, Response 

13from datasette.plugins import pm 

14 

15from .base import DatasetteError, DataView 

16 

17 

class DatabaseView(DataView):
    """Index page for one database: its tables, views and canned queries."""

    name = "database"

    async def data(self, request, database, hash, default_labels=False, _size=None):
        """Assemble the data for the database index page.

        Checks the ``view-instance`` and ``view-database`` permissions first.
        When the request carries a non-empty ``sql`` argument the statement is
        passed through ``validate_sql_select`` and the request is delegated to
        ``QueryView.data``.

        Otherwise returns a 3-tuple of (data dict, extra template context
        dict, tuple of candidate template names).  Views, tables and canned
        queries are only listed when ``check_visibility`` reports them as
        visible to ``request.actor``.
        """
        await self.check_permission(request, "view-instance")
        await self.check_permission(request, "view-database", database)
        metadata = (self.ds.metadata("databases") or {}).get(database, {})
        self.ds.update_with_inherited_metadata(metadata)

        # Arbitrary SQL is handled by QueryView rather than this page
        sql = request.args.get("sql")
        if sql:
            validate_sql_select(sql)
            query_view = QueryView(self.ds)
            return await query_view.data(
                request, database, hash, sql, _size=_size, metadata=metadata
            )

        db = self.ds.databases[database]

        counts_by_table = await db.table_counts(5)
        hidden_names = set(await db.hidden_table_names())
        foreign_keys_by_table = await db.get_all_foreign_keys()

        views = []
        for view_name in await db.view_names():
            is_visible, is_private = await check_visibility(
                self.ds, request.actor, "view-table", (database, view_name),
            )
            if not is_visible:
                continue
            views.append({"name": view_name, "private": is_private})

        tables = []
        for table_name in counts_by_table:
            is_visible, is_private = await check_visibility(
                self.ds, request.actor, "view-table", (database, table_name),
            )
            if is_visible:
                # Awaited in the same order the original dict literal used
                column_names = await db.table_columns(table_name)
                primary_keys = await db.primary_keys(table_name)
                fts_table = await db.fts_table(table_name)
                tables.append(
                    {
                        "name": table_name,
                        "columns": column_names,
                        "primary_keys": primary_keys,
                        "count": counts_by_table[table_name],
                        "hidden": table_name in hidden_names,
                        "fts_table": fts_table,
                        "foreign_keys": foreign_keys_by_table[table_name],
                        "private": is_private,
                    }
                )

        # Visible tables first, hidden ones last; alphabetical within each group
        tables.sort(key=lambda entry: (entry["hidden"], entry["name"]))

        canned_queries = []
        for query in self.ds.get_canned_queries(database):
            is_visible, is_private = await check_visibility(
                self.ds, request.actor, "view-query", (database, query["name"]),
            )
            if not is_visible:
                continue
            canned_queries.append(dict(query, private=is_private))

        page_data = {
            "database": database,
            "size": db.size,
            "tables": tables,
            "hidden_count": sum(1 for entry in tables if entry["hidden"]),
            "views": views,
            "queries": canned_queries,
            # "private" is derived from what an actor of None is allowed to do
            "private": not await self.ds.permission_allowed(
                None, "view-database", database
            ),
            "allow_execute_sql": await self.ds.permission_allowed(
                request.actor, "execute-sql", database, default=True
            ),
        }
        template_context = {
            "show_hidden": request.args.get("_show_hidden"),
            "editable": True,
            "metadata": metadata,
            "allow_download": self.ds.config("allow_download")
            and not db.is_mutable
            and database != ":memory:",
        }
        template_names = (
            "database-{}.html".format(to_css_class(database)),
            "database.html",
        )
        return page_data, template_context, template_names

104 

105 

class DatabaseDownload(DataView):
    """Returns a database's file as an ``application/octet-stream`` download."""

    name = "database_download"

    async def view_get(self, request, database, hash, correct_hash_present, **kwargs):
        """Return an ``AsgiFileDownload`` for the named database.

        Checks the ``view-instance``, ``view-database`` and
        ``view-database-download`` permissions, in that order, before looking
        the database up.

        Raises ``DatasetteError`` with status 404 when the database name is
        unknown, the database is in-memory, or it has no path; and with status
        403 when the ``allow_download`` config is falsy or the database is
        mutable.
        """
        await self.check_permission(request, "view-instance")
        await self.check_permission(request, "view-database", database)
        await self.check_permission(request, "view-database-download", database)

        known_databases = self.ds.databases
        if database not in known_databases:
            raise DatasetteError("Invalid database", status=404)
        db = known_databases[database]

        if db.is_memory:
            raise DatasetteError("Cannot download :memory: database", status=404)

        # Downloads need the config switch on and an immutable database
        download_permitted = self.ds.config("allow_download") and not db.is_mutable
        if not download_permitted:
            raise DatasetteError("Database download is forbidden", status=403)

        file_path = db.path
        if not file_path:
            raise DatasetteError("Cannot download database", status=404)

        return AsgiFileDownload(
            file_path,
            filename=os.path.basename(file_path),
            content_type="application/octet-stream",
        )

128 

129 

class QueryView(DataView):
    """Runs a SQL statement (ad-hoc or canned) and prepares its results for display."""

    async def data(
        self,
        request,
        database,
        hash,
        sql,
        editable=True,
        canned_query=None,
        metadata=None,
        _size=None,
        named_parameters=None,
        write=False,
    ):
        """Execute ``sql`` against ``database`` and build the response data.

        Parameters:
            request: incoming request; its ``args`` supply the query parameters.
            database: name of the database to run against.
            hash: accepted as part of the view signature; not read by this method.
            sql: the SQL text to run.
            editable: passed through to the template context on the read path.
            canned_query: name of the canned query, if any.  When set, the
                ``view-query`` permission is checked instead of ``execute-sql``
                and a query-specific template name is tried first.
            metadata: optional mapping.  The write path reads
                ``on_success_message``, ``on_success_redirect``,
                ``on_error_message`` and ``on_error_redirect`` from it; the
                read path passes it to the template unchanged.
            _size: when truthy, forwarded to ``execute`` as ``page_size``.
            named_parameters: explicit list of ``:named`` parameters; when
                falsy they are extracted from ``sql`` with
                ``self.re_named_parameter``.
            write: when true, a POST executes the statement with
                ``execute_write`` and redirects; any other method renders the
                page without executing anything.

        Returns:
            For a write POST, the result of ``self.redirect``.  Otherwise a
            3-tuple of (data dict, async callable returning extra template
            context, list of template names).
        """
        params = {key: request.args.get(key) for key in request.args}
        # These two arguments are not passed to the query as parameters
        for reserved in ("sql", "_shape"):
            params.pop(reserved, None)

        # Respect canned query permissions
        await self.check_permission(request, "view-instance")
        await self.check_permission(request, "view-database", database)
        private = False
        if canned_query:
            await self.check_permission(request, "view-query", (database, canned_query))
            private = not await self.ds.permission_allowed(
                None, "view-query", (database, canned_query), default=True
            )
        else:
            await self.check_permission(request, "execute-sql", database)

        # Extract any :named parameters
        named_parameters = named_parameters or self.re_named_parameter.findall(sql)
        named_parameter_values = {
            named_parameter: params.get(named_parameter) or ""
            for named_parameter in named_parameters
        }

        # Set to blank string if missing from params
        for named_parameter in named_parameters:
            params.setdefault(named_parameter, "")

        extra_args = {}
        if params.get("_timelimit"):
            # NOTE(review): a non-numeric _timelimit raises ValueError here
            extra_args["custom_time_limit"] = int(params["_timelimit"])
        if _size:
            extra_args["page_size"] = _size

        templates = ["query-{}.html".format(to_css_class(database)), "query.html"]

        # Execute query - as write or as read
        if write:
            if request.method == "POST":
                # metadata defaults to None; fall back to an empty mapping so
                # the message/redirect lookups below cannot raise AttributeError
                # (previously that happened inside the except handler as well).
                write_settings = metadata or {}
                params = await request.post_vars()
                try:
                    cursor = await self.ds.databases[database].execute_write(
                        sql, params, block=True
                    )
                    message = write_settings.get(
                        "on_success_message"
                    ) or "Query executed, {} row{} affected".format(
                        cursor.rowcount, "" if cursor.rowcount == 1 else "s"
                    )
                    message_type = self.ds.INFO
                    redirect_url = write_settings.get("on_success_redirect")
                except Exception as e:
                    # Deliberately broad: any failure is reported to the user
                    # as a message rather than propagated.
                    message = write_settings.get("on_error_message") or str(e)
                    message_type = self.ds.ERROR
                    redirect_url = write_settings.get("on_error_redirect")
                self.ds.add_message(request, message, message_type)
                return self.redirect(request, redirect_url or request.path)
            else:

                async def extra_template():
                    return {
                        "request": request,
                        "path_with_added_args": path_with_added_args,
                        "path_with_removed_args": path_with_removed_args,
                        "named_parameter_values": named_parameter_values,
                        "canned_query": canned_query,
                        "success_message": request.args.get("_success") or "",
                        "canned_write": True,
                    }

                # Nothing is executed for a non-POST write request
                return (
                    {
                        "database": database,
                        "rows": [],
                        "truncated": False,
                        "columns": [],
                        "query": {"sql": sql, "params": params},
                        "private": private,
                    },
                    extra_template,
                    templates,
                )
        else:  # Not a write
            results = await self.ds.execute(
                database, sql, params, truncate=True, **extra_args
            )
            columns = [r[0] for r in results.description]

            if canned_query:
                # Most specific template name goes first
                templates.insert(
                    0,
                    "query-{}-{}.html".format(
                        to_css_class(database), to_css_class(canned_query)
                    ),
                )

            async def extra_template():
                display_rows = []
                for row in results.rows:
                    display_row = []
                    for column, value in zip(results.columns, row):
                        display_value = value
                        # Let the plugins have a go
                        # pylint: disable=no-member
                        plugin_value = pm.hook.render_cell(
                            value=value,
                            column=column,
                            table=None,
                            database=database,
                            datasette=self.ds,
                        )
                        if plugin_value is not None:
                            display_value = plugin_value
                        elif value in ("", None):
                            # Empty cells are emitted as a non-breaking space entity
                            display_value = jinja2.Markup("&nbsp;")
                        else:
                            # Test and escape the same stringified, stripped value
                            candidate_url = str(value).strip()
                            if is_url(candidate_url):
                                display_value = jinja2.Markup(
                                    '<a href="{url}">{url}</a>'.format(
                                        url=jinja2.escape(candidate_url)
                                    )
                                )
                        display_row.append(display_value)
                    display_rows.append(display_row)
                return {
                    "display_rows": display_rows,
                    "custom_sql": True,
                    "named_parameter_values": named_parameter_values,
                    "editable": editable,
                    "canned_query": canned_query,
                    "metadata": metadata,
                    "config": self.ds.config_dict(),
                    "request": request,
                    "path_with_added_args": path_with_added_args,
                    "path_with_removed_args": path_with_removed_args,
                    "hide_sql": "_hide_sql" in params,
                }

            return (
                {
                    "database": database,
                    "query_name": canned_query,
                    "rows": results.rows,
                    "truncated": results.truncated,
                    "columns": columns,
                    "query": {"sql": sql, "params": params},
                    "private": private,
                    "allow_execute_sql": await self.ds.permission_allowed(
                        request.actor, "execute-sql", database, default=True
                    ),
                },
                extra_template,
                templates,
            )