-
-
Notifications
You must be signed in to change notification settings - Fork 99
Expand file tree
/
Copy pathendpoint_route_handler.py
More file actions
253 lines (219 loc) · 8.95 KB
/
endpoint_route_handler.py
File metadata and controls
253 lines (219 loc) · 8.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# Copyright 2021 Camptocamp SA
# @author: Simone Orsi <simone.orsi@camptocamp.com>
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl).
import logging
from odoo import _, api, exceptions, fields, models
ENDPOINT_ROUTE_CONSUMER_MODELS = {
# by db
}
class EndpointRouteHandler(models.AbstractModel):
_name = "endpoint.route.handler"
_inherit = "endpoint.route.sync.mixin"
_description = "Endpoint Route handler"
name = fields.Char(required=True)
route = fields.Char(
required=True,
index=True,
compute="_compute_route",
inverse="_inverse_route",
readonly=False,
store=True,
copy=False,
)
route_group = fields.Char(help="Use this to classify routes together")
route_type = fields.Selection(selection="_selection_route_type", default="http")
auth_type = fields.Selection(
selection="_selection_auth_type", default="user_endpoint"
)
request_content_type = fields.Selection(selection="_selection_request_content_type")
# TODO: this is limiting the possibility of supporting more than one method.
request_method = fields.Selection(
selection="_selection_request_method", required=True
)
# # TODO: validate params? Just for doc? Maybe use Cerberus?
# # -> For now let the implementer validate the params in the snippet.
# request_params = Serialized(help="TODO")
endpoint_hash = fields.Char(
compute="_compute_endpoint_hash", help="Identify the route with its main params"
)
csrf = fields.Boolean(default=False)
readonly = fields.Boolean(default=False)
# TODO: add flag to prevent route updates on save ->
# should be handled by specific actions + filter in a tree view + btn on form
_sql_constraints = [
(
"endpoint_route_unique",
"unique(route)",
"You can register an endpoint route only once.",
)
]
@api.constrains("route")
def _check_route_unique_across_models(self):
"""Make sure routes are unique across all models.
The SQL constraint above, works only on one specific model/table.
Here we check that routes stay unique across all models.
This is mostly to make sure admins know that the route already exists
somewhere else, because route controllers are registered only once
for the same path.
"""
# TODO: add tests registering a fake model.
# However, @simahawk tested manually and it works.
# TODO: shall we check for route existance in the registry instead?
all_models = self._get_endpoint_route_consumer_models()
routes = [x["route"] for x in self.read(["route"])]
clashing_models = []
for model in all_models:
if model != self._name and self.env[model].sudo().search_count(
[("route", "in", routes)]
):
clashing_models.append(model)
if clashing_models:
raise exceptions.UserError(
_(
"Non unique route(s): %(routes)s.\n"
"Found in model(s): %(models)s.\n"
)
% {"routes": ", ".join(routes), "models": ", ".join(clashing_models)}
)
def _get_endpoint_route_consumer_models(self):
global ENDPOINT_ROUTE_CONSUMER_MODELS
if ENDPOINT_ROUTE_CONSUMER_MODELS.get(self.env.cr.dbname):
return ENDPOINT_ROUTE_CONSUMER_MODELS.get(self.env.cr.dbname)
models = []
route_model = "endpoint.route.handler"
for model in self.env.values():
if (
model._name != route_model
and not model._abstract
and route_model in model._inherit
):
models.append(model._name)
ENDPOINT_ROUTE_CONSUMER_MODELS[self.env.cr.dbname] = models
return models
@property
def _logger(self):
return logging.getLogger(self._name)
def _selection_route_type(self):
return [("http", "HTTP"), ("json", "JSON")]
def _selection_auth_type(self):
return [("public", "Public"), ("user_endpoint", "User")]
def _selection_request_method(self):
return [
("GET", "GET"),
("POST", "POST"),
("PUT", "PUT"),
("DELETE", "DELETE"),
]
def _selection_request_content_type(self):
return [
("", "None"),
("text/plain", "Text"),
("text/csv", "CSV"),
("application/json", "JSON"),
("application/xml", "XML"),
("application/x-www-form-urlencoded", "Form"),
("application/json; charset=utf-8", "JSON_UTF8 (Deprecated)"),
]
@api.depends(lambda self: self._routing_impacting_fields())
def _compute_endpoint_hash(self):
# Do not use read to be able to play this on NewId records too
# (NewId records are classified as missing in ACL check).
# values = self.read(self._routing_impacting_fields())
values = [
{fname: rec[fname] for fname in self._routing_impacting_fields()}
for rec in self
]
for rec, vals in zip(self, values, strict=True):
vals.pop("id", None)
rec.endpoint_hash = hash(tuple(vals.values()))
def _routing_impacting_fields(self):
return ("route", "auth_type", "request_method")
@api.depends("route")
def _compute_route(self):
for rec in self:
rec.route = rec._clean_route()
def _inverse_route(self):
for rec in self:
rec.route = rec._clean_route()
# TODO: move to something better? Eg: computed field?
# Shall we use the route_group? TBD!
_endpoint_route_prefix = ""
def _clean_route(self):
route = (self.route or "").strip()
if not route.startswith("/"):
route = "/" + route
prefix = self._endpoint_route_prefix
if prefix and not route.startswith(prefix):
route = prefix + route
return route
_blacklist_routes = ("/", "/web") # TODO: what else?
@api.constrains("route")
def _check_route(self):
for rec in self:
if rec.route in self._blacklist_routes:
raise exceptions.UserError(
_("`%(name)s` uses a blacklisted routed = `%(route)s`")
% {"name": rec.name, "route": rec.route}
)
@api.constrains("request_method", "request_content_type")
def _check_request_method(self):
for rec in self:
if rec.request_method in ("POST", "PUT") and not rec.request_content_type:
raise exceptions.UserError(
_("Request content type is required for POST and PUT.")
)
def _prepare_endpoint_rules(self, options=None):
return [rec._make_controller_rule(options=options) for rec in self]
def _registered_endpoint_rule_keys(self):
return tuple([rec._endpoint_registry_unique_key() for rec in self])
def _endpoint_registry_unique_key(self):
return f"{self._name}:{self.id}"
# TODO: consider if useful or not for single records
def _register_single_controller(self, options=None, key=None, init=False):
"""Shortcut to register one single controller."""
rule = self._make_controller_rule(options=options, key=key)
self._endpoint_registry.update_rules([rule], init=init)
self.env.registry.clear_cache("routing")
self._logger.debug(
"Registered controller %s (auth: %s)", self.route, self.auth_type
)
def _make_controller_rule(self, options=None, key=None):
key = key or self._endpoint_registry_unique_key()
route, routing, endpoint_hash = self._get_routing_info()
options = options or self._default_endpoint_options()
return self._endpoint_registry.make_rule(
# fmt: off
key,
route,
options,
routing,
endpoint_hash,
route_group=self.route_group,
# fmt: on
)
def _default_endpoint_options(self):
options = {"handler": self._default_endpoint_options_handler()}
return options
def _default_endpoint_options_handler(self):
self._logger.warning(
"No specific endpoint handler options defined for: %s, falling back to "
"default",
self._name,
)
base_path = "odoo.addons.endpoint_route_handler.controllers.main"
return {
"klass_dotted_path": f"{base_path}.EndpointNotFoundController",
"method_name": "auto_not_found",
"default_pargs": (self.route,),
}
def _get_routing_info(self):
route = self.route
routing = dict(
type=self.route_type,
auth=self.auth_type,
methods=[self.request_method],
routes=[route],
csrf=self.csrf,
readonly=self.readonly,
)
return route, routing, self.endpoint_hash