Coverage for datasette/views/database.py : 98%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import os
2import jinja2
4from datasette.utils import (
5 check_visibility,
6 to_css_class,
7 validate_sql_select,
8 is_url,
9 path_with_added_args,
10 path_with_removed_args,
11)
12from datasette.utils.asgi import AsgiFileDownload, Response
13from datasette.plugins import pm
15from .base import DatasetteError, DataView
class DatabaseView(DataView):
    """Index view for a single database.

    Gathers the tables, views and canned queries that the requesting actor
    is allowed to see. When the request carries a ``sql`` argument the work
    is delegated to ``QueryView`` instead.
    """

    name = "database"

    async def data(self, request, database, hash, default_labels=False, _size=None):
        """Build the payload for the database page.

        Returns whatever ``QueryView.data`` returns when ``?sql=`` is
        supplied; otherwise a ``(data, extra_template_context, templates)``
        tuple.
        """
        await self.check_permission(request, "view-instance")
        await self.check_permission(request, "view-database", database)
        metadata = (self.ds.metadata("databases") or {}).get(database, {})
        self.ds.update_with_inherited_metadata(metadata)

        # Custom SQL is validated here and then handled entirely by QueryView.
        custom_sql = request.args.get("sql")
        if custom_sql:
            validate_sql_select(custom_sql)
            return await QueryView(self.ds).data(
                request, database, hash, custom_sql, _size=_size, metadata=metadata
            )

        db = self.ds.databases[database]

        table_counts = await db.table_counts(5)
        hidden_table_names = set(await db.hidden_table_names())
        all_foreign_keys = await db.get_all_foreign_keys()

        # Views: keep only the ones this actor may see.
        views = []
        for view_name in await db.view_names():
            can_see, is_private = await check_visibility(
                self.ds, request.actor, "view-table", (database, view_name)
            )
            if not can_see:
                continue
            views.append({"name": view_name, "private": is_private})

        # Tables: same visibility filter, plus per-table details.
        tables = []
        for table_name in table_counts:
            can_see, is_private = await check_visibility(
                self.ds, request.actor, "view-table", (database, table_name)
            )
            if not can_see:
                continue
            columns = await db.table_columns(table_name)
            primary_keys = await db.primary_keys(table_name)
            fts_table = await db.fts_table(table_name)
            tables.append(
                {
                    "name": table_name,
                    "columns": columns,
                    "primary_keys": primary_keys,
                    "count": table_counts[table_name],
                    "hidden": table_name in hidden_table_names,
                    "fts_table": fts_table,
                    "foreign_keys": all_foreign_keys[table_name],
                    "private": is_private,
                }
            )
        # Non-hidden tables first, then hidden ones; by name within each group.
        tables.sort(key=lambda entry: (entry["hidden"], entry["name"]))

        # Canned queries: each entry is copied with a "private" flag added.
        canned_queries = []
        for query in self.ds.get_canned_queries(database):
            can_see, is_private = await check_visibility(
                self.ds, request.actor, "view-query", (database, query["name"])
            )
            if not can_see:
                continue
            canned_queries.append(dict(query, private=is_private))

        data = {
            "database": database,
            "size": db.size,
            "tables": tables,
            "hidden_count": sum(1 for entry in tables if entry["hidden"]),
            "views": views,
            "queries": canned_queries,
            # Private means an actor of None is not allowed to view it.
            "private": not await self.ds.permission_allowed(
                None, "view-database", database
            ),
            "allow_execute_sql": await self.ds.permission_allowed(
                request.actor, "execute-sql", database, default=True
            ),
        }
        extra_template_context = {
            "show_hidden": request.args.get("_show_hidden"),
            "editable": True,
            "metadata": metadata,
            "allow_download": self.ds.config("allow_download")
            and not db.is_mutable
            and database != ":memory:",
        }
        templates = (
            "database-{}.html".format(to_css_class(database)),
            "database.html",
        )
        return data, extra_template_context, templates
class DatabaseDownload(DataView):
    """Serve a database's file as a binary download."""

    name = "database_download"

    async def view_get(self, request, database, hash, correct_hash_present, **kwargs):
        """Return an ``AsgiFileDownload`` for ``database``.

        Raises ``DatasetteError`` with status 404 when the database is
        unknown, in-memory or has no path, and with status 403 when the
        ``allow_download`` setting is off or the database is mutable.
        """
        await self.check_permission(request, "view-instance")
        for action in ("view-database", "view-database-download"):
            await self.check_permission(request, action, database)

        known_databases = self.ds.databases
        if database not in known_databases:
            raise DatasetteError("Invalid database", status=404)
        db = known_databases[database]

        if db.is_memory:
            raise DatasetteError("Cannot download :memory: database", status=404)
        # Downloads need the setting switched on AND an immutable database.
        if not (self.ds.config("allow_download") and not db.is_mutable):
            raise DatasetteError("Database download is forbidden", status=403)

        download_path = db.path
        if not download_path:
            raise DatasetteError("Cannot download database", status=404)
        return AsgiFileDownload(
            download_path,
            filename=os.path.basename(download_path),
            content_type="application/octet-stream",
        )
class QueryView(DataView):
    """Run a SQL query, arbitrary or canned, and prepare it for rendering."""

    async def data(
        self,
        request,
        database,
        hash,
        sql,
        editable=True,
        canned_query=None,
        metadata=None,
        _size=None,
        named_parameters=None,
        write=False,
    ):
        """Execute ``sql`` against ``database`` and build the page payload.

        Parameters:
            request: the incoming request; ``args``, ``actor``, ``method``,
                ``path`` and ``post_vars()`` are read from it.
            database: name of the database to query.
            hash: accepted for interface compatibility; not used here.
            sql: the SQL text to run.
            editable: passed through to the template context.
            canned_query: name of the canned query, if any. Switches the
                permission check from ``execute-sql`` to ``view-query``.
            metadata: optional mapping; the write path reads the
                ``on_success_*`` / ``on_error_*`` keys from it.
            _size: optional page size forwarded to ``execute``.
            named_parameters: optional explicit list of parameter names;
                otherwise they are extracted from ``sql``.
            write: when true the query is run with ``execute_write`` on POST,
                and a blank form payload is returned for other methods.

        Returns:
            A ``(data, extra_template, templates)`` tuple, or the redirect
            produced by ``self.redirect`` after a write query is POSTed.

        Raises:
            DatasetteError: status 400 when ``_timelimit`` is not an integer.
        """
        # ``metadata`` may be None; the write path does key lookups on it.
        query_metadata = metadata or {}

        params = {key: request.args.get(key) for key in request.args}
        params.pop("sql", None)
        params.pop("_shape", None)

        # Respect canned query permissions
        await self.check_permission(request, "view-instance")
        await self.check_permission(request, "view-database", database)
        private = False
        if canned_query:
            await self.check_permission(request, "view-query", (database, canned_query))
            private = not await self.ds.permission_allowed(
                None, "view-query", (database, canned_query), default=True
            )
        else:
            await self.check_permission(request, "execute-sql", database)

        # Extract any :named parameters
        named_parameters = named_parameters or self.re_named_parameter.findall(sql)
        named_parameter_values = {
            named_parameter: params.get(named_parameter) or ""
            for named_parameter in named_parameters
        }

        # Set to blank string if missing from params
        for named_parameter in named_parameters:
            params.setdefault(named_parameter, "")

        extra_args = {}
        if params.get("_timelimit"):
            try:
                extra_args["custom_time_limit"] = int(params["_timelimit"])
            except ValueError:
                # Report bad user input as a client error, not a bare ValueError.
                raise DatasetteError(
                    "_timelimit must be an integer", status=400
                ) from None
        if _size:
            extra_args["page_size"] = _size

        templates = ["query-{}.html".format(to_css_class(database)), "query.html"]

        # Execute query - as write or as read
        if write:
            if request.method == "POST":
                params = await request.post_vars()
                # Deliberately broad: any failure is reported back to the user
                # as a message followed by a redirect, not as an error page.
                try:
                    cursor = await self.ds.databases[database].execute_write(
                        sql, params, block=True
                    )
                    message = query_metadata.get(
                        "on_success_message"
                    ) or "Query executed, {} row{} affected".format(
                        cursor.rowcount, "" if cursor.rowcount == 1 else "s"
                    )
                    message_type = self.ds.INFO
                    redirect_url = query_metadata.get("on_success_redirect")
                except Exception as e:
                    message = query_metadata.get("on_error_message") or str(e)
                    message_type = self.ds.ERROR
                    redirect_url = query_metadata.get("on_error_redirect")
                self.ds.add_message(request, message, message_type)
                return self.redirect(request, redirect_url or request.path)
            else:

                async def extra_template():
                    return {
                        "request": request,
                        "path_with_added_args": path_with_added_args,
                        "path_with_removed_args": path_with_removed_args,
                        "named_parameter_values": named_parameter_values,
                        "canned_query": canned_query,
                        "success_message": request.args.get("_success") or "",
                        "canned_write": True,
                    }

                # Nothing has been executed yet, so the result set is empty.
                return (
                    {
                        "database": database,
                        "rows": [],
                        "truncated": False,
                        "columns": [],
                        "query": {"sql": sql, "params": params},
                        "private": private,
                    },
                    extra_template,
                    templates,
                )
        else:  # Not a write
            results = await self.ds.execute(
                database, sql, params, truncate=True, **extra_args
            )
            columns = [r[0] for r in results.description]

            if canned_query:
                # Most specific template first: one per database + canned query.
                templates.insert(
                    0,
                    "query-{}-{}.html".format(
                        to_css_class(database), to_css_class(canned_query)
                    ),
                )

            async def extra_template():
                return {
                    "display_rows": self._display_rows(results, database),
                    "custom_sql": True,
                    "named_parameter_values": named_parameter_values,
                    "editable": editable,
                    "canned_query": canned_query,
                    "metadata": metadata,
                    "config": self.ds.config_dict(),
                    "request": request,
                    "path_with_added_args": path_with_added_args,
                    "path_with_removed_args": path_with_removed_args,
                    "hide_sql": "_hide_sql" in params,
                }

            return (
                {
                    "database": database,
                    "query_name": canned_query,
                    "rows": results.rows,
                    "truncated": results.truncated,
                    "columns": columns,
                    "query": {"sql": sql, "params": params},
                    "private": private,
                    "allow_execute_sql": await self.ds.permission_allowed(
                        request.actor, "execute-sql", database, default=True
                    ),
                },
                extra_template,
                templates,
            )

    def _display_rows(self, results, database):
        """Return ``results.rows`` converted to per-cell display values."""
        # NOTE(review): jinja2.Markup / jinja2.escape are aliases of the
        # markupsafe names and are not available in newer Jinja2 releases -
        # confirm against the Jinja2 version this project pins.
        display_rows = []
        for row in results.rows:
            display_row = []
            for column, value in zip(results.columns, row):
                # Let the plugins have a go
                # pylint: disable=no-member
                plugin_value = pm.hook.render_cell(
                    value=value,
                    column=column,
                    table=None,
                    database=database,
                    datasette=self.ds,
                )
                if plugin_value is not None:
                    display_value = plugin_value
                elif value in ("", None):
                    # Explicit entity so an empty cell is visibly non-empty markup.
                    display_value = jinja2.Markup("&nbsp;")
                else:
                    display_value = value
                    # Test and link the same stripped text, so a non-string
                    # value can never reach a .strip() call.
                    text = str(value).strip()
                    if is_url(text):
                        display_value = jinja2.Markup(
                            '<a href="{url}">{url}</a>'.format(
                                url=jinja2.escape(text)
                            )
                        )
                display_row.append(display_value)
            display_rows.append(display_row)
        return display_rows