Coverage for datasette/views/index.py : 96%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import hashlib
2import json
4from datasette.utils import check_visibility, CustomJSONEncoder
5from datasette.utils.asgi import Response, Forbidden
6from datasette.version import __version__
8from .base import BaseView
# Maximum number of tables (topped up with views) listed per database
# on the homepage:
TRUNCATE_AT = 5

# Mutable databases are only row-counted when their size in bytes is below
# this limit (immutable databases are always counted):
COUNT_DB_SIZE_LIMIT = 100 * 1024 * 1024
class IndexView(BaseView):
    """Homepage view: summarises every database the current actor may see."""

    name = "index"

    def __init__(self, datasette):
        self.ds = datasette

    async def get(self, request, as_format):
        """Build the per-database summary and return it as JSON or HTML.

        The "view-instance" permission is enforced first via
        ``self.check_permission``. Databases, tables and views the actor is
        not allowed to see (per ``check_visibility``) are left out.

        Args:
            request: The incoming request; ``request.actor`` and
                ``request.args`` are read.
            as_format: When truthy, a JSON ``Response`` keyed by database name
                is returned; otherwise ``index.html`` is rendered.

        Returns:
            A JSON ``Response`` or the result of ``self.render(...)``.
        """
        await self.check_permission(request, "view-instance")
        databases = []
        for name, db in self.ds.databases.items():
            visible, database_private = await check_visibility(
                self.ds,
                request.actor,
                "view-database",
                name,
            )
            if not visible:
                continue
            table_names = await db.table_names()
            hidden_table_names = set(await db.hidden_table_names())

            views = []
            for view_name in await db.view_names():
                visible, private = await check_visibility(
                    self.ds,
                    request.actor,
                    "view-table",
                    (name, view_name),
                )
                if visible:
                    views.append({"name": view_name, "private": private})

            # Perform counts only for immutable databases, or for mutable ones
            # smaller than COUNT_DB_SIZE_LIMIT bytes
            table_counts = {}
            if not db.is_mutable or db.size < COUNT_DB_SIZE_LIMIT:
                # NOTE(review): 10 is presumably a per-table time limit -
                # confirm against db.table_counts()
                table_counts = await db.table_counts(10)
                # If any of these are None it means at least one timed out -
                # ignore them all
                if any(v is None for v in table_counts.values()):
                    table_counts = {}

            tables = {}
            for table in table_names:
                visible, private = await check_visibility(
                    self.ds,
                    request.actor,
                    "view-table",
                    (name, table),
                )
                if not visible:
                    continue
                table_columns = await db.table_columns(table)
                tables[table] = {
                    "name": table,
                    "columns": table_columns,
                    "primary_keys": await db.primary_keys(table),
                    "count": table_counts.get(table),
                    "hidden": table in hidden_table_names,
                    "fts_table": await db.fts_table(table),
                    "num_relationships_for_sorting": 0,
                    "private": private,
                }

            if request.args.get("_sort") == "relationships" or not table_counts:
                # We will be sorting by number of relationships, so populate
                # that field
                all_foreign_keys = await db.get_all_foreign_keys()
                for table, foreign_keys in all_foreign_keys.items():
                    table_info = tables.get(table)
                    if table_info is None:
                        # Table was filtered out above because the actor may
                        # not view it; indexing tables[table] would raise
                        # KeyError here.
                        continue
                    table_info["num_relationships_for_sorting"] = len(
                        foreign_keys["incoming"]
                    ) + len(foreign_keys["outgoing"])

            hidden_tables = [t for t in tables.values() if t["hidden"]]
            visible_tables = [t for t in tables.values() if not t["hidden"]]

            # Most-connected, then largest, then reverse-alphabetical tables
            # first, capped at TRUNCATE_AT entries
            tables_and_views_truncated = sorted(
                visible_tables,
                key=lambda t: (
                    t["num_relationships_for_sorting"],
                    t["count"] or 0,
                    t["name"],
                ),
                reverse=True,
            )[:TRUNCATE_AT]

            # Only add views if there are fewer than TRUNCATE_AT tables listed
            num_views_to_add = TRUNCATE_AT - len(tables_and_views_truncated)
            if num_views_to_add > 0:
                tables_and_views_truncated.extend(views[:num_views_to_add])

            # Databases without a content hash get a colour derived from
            # their name instead
            if db.hash:
                color = db.hash[:6]
            else:
                color = hashlib.md5(name.encode("utf8")).hexdigest()[:6]

            databases.append(
                {
                    "name": name,
                    "hash": db.hash,
                    "color": color,
                    "path": self.database_url(name),
                    "tables_and_views_truncated": tables_and_views_truncated,
                    "tables_and_views_more": (len(visible_tables) + len(views))
                    > TRUNCATE_AT,
                    "tables_count": len(visible_tables),
                    "table_rows_sum": sum((t["count"] or 0) for t in visible_tables),
                    "show_table_row_counts": bool(table_counts),
                    "hidden_table_rows_sum": sum(
                        t["count"] for t in hidden_tables if t["count"] is not None
                    ),
                    "hidden_tables_count": len(hidden_tables),
                    "views_count": len(views),
                    "private": database_private,
                }
            )

        if as_format:
            headers = {}
            if self.ds.cors:
                headers["Access-Control-Allow-Origin"] = "*"
            return Response(
                json.dumps({db["name"]: db for db in databases}, cls=CustomJSONEncoder),
                content_type="application/json; charset=utf-8",
                headers=headers,
            )
        else:
            return await self.render(
                ["index.html"],
                request=request,
                context={
                    "databases": databases,
                    "metadata": self.ds.metadata(),
                    "datasette_version": __version__,
                    # The instance counts as private when an anonymous actor
                    # (None) is not allowed to view it
                    "private": not await self.ds.permission_allowed(
                        None, "view-instance", default=True
                    ),
                },
            )