Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 37 additions & 42 deletions basyx/aas/adapter/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,11 +490,13 @@ def _get_obj_ts(self, identifier: model.Identifier, type_: Type[model.provider._
identifiable = self.object_store.get(identifier)
if not isinstance(identifiable, type_):
raise NotFound(f"No {type_.__name__} with {identifier} found!")
identifiable.update()
return identifiable

def _get_all_obj_of_type(self, type_: Type[model.provider._IT]) -> Iterator[model.provider._IT]:
for obj in self.object_store:
if isinstance(obj, type_):
obj.update()
yield obj

def _resolve_reference(self, reference: model.ModelReference[model.base._RT]) -> model.base._RT:
Expand Down Expand Up @@ -559,6 +561,28 @@ def _get_submodel_reference(cls, aas: model.AssetAdministrationShell, submodel_i
return ref
raise NotFound(f"The AAS {aas!r} doesn't have a submodel reference to {submodel_id!r}!")

def _get_shells(self, request: Request) -> Iterator[model.AssetAdministrationShell]:
aas: Iterator[model.AssetAdministrationShell] = self._get_all_obj_of_type(model.AssetAdministrationShell)

id_short = request.args.get("idShort")
if id_short is not None:
aas = filter(lambda shell: shell.id_short == id_short, aas)

asset_ids = request.args.getlist("assetIds")
if asset_ids is not None:
# Decode and instantiate SpecificAssetIds
# This needs to be a list, otherwise we can only iterate it once.
specific_asset_ids: List[model.SpecificAssetId] = list(
map(lambda asset_id: HTTPApiDecoder.base64urljson(asset_id, model.SpecificAssetId, False), asset_ids))
# Filter AAS based on these SpecificAssetIds
aas = filter(lambda shell: all(specific_asset_id in shell.asset_information.specific_asset_id
for specific_asset_id in specific_asset_ids), aas)

return aas

def _get_shell(self, url_args: Dict) -> model.AssetAdministrationShell:
return self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)

def _get_submodels(self, request: Request) -> Iterator[model.Submodel]:
submodels: Iterator[model.Submodel] = self._get_all_obj_of_type(model.Submodel)
id_short = request.args.get("idShort")
Expand All @@ -572,9 +596,7 @@ def _get_submodels(self, request: Request) -> Iterator[model.Submodel]:
return submodels

def _get_submodel(self, url_args: Dict) -> model.Submodel:
submodel = self._get_obj_ts(url_args["submodel_id"], model.Submodel)
submodel.update()
return submodel
return self._get_obj_ts(url_args["submodel_id"], model.Submodel)

def _get_submodel_submodel_elements_id_short_path(self, url_args: Dict) \
-> model.SubmodelElement:
Expand Down Expand Up @@ -605,23 +627,7 @@ def handle_request(self, request: Request):
# ------ AAS REPO ROUTES -------
def get_aas_all(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
aas: Iterator[model.AssetAdministrationShell] = self._get_all_obj_of_type(model.AssetAdministrationShell)

id_short = request.args.get("idShort")
if id_short is not None:
aas = filter(lambda shell: shell.id_short == id_short, aas)

asset_ids = request.args.getlist("assetIds")
if asset_ids is not None:
# Decode and instantiate SpecificAssetIds
# This needs to be a list, otherwise we can only iterate it once.
specific_asset_ids: List[model.SpecificAssetId] = list(
map(lambda asset_id: HTTPApiDecoder.base64urljson(asset_id, model.SpecificAssetId, False), asset_ids))
# Filter AAS based on these SpecificAssetIds
aas = filter(lambda shell: all(specific_asset_id in shell.asset_information.specific_asset_id
for specific_asset_id in specific_asset_ids), aas)

return response_t(list(aas))
return response_t(list(self._get_shells(request)))

def post_aas(self, request: Request, url_args: Dict, map_adapter: MapAdapter) -> Response:
response_t = get_response_type(request)
Expand All @@ -638,52 +644,45 @@ def post_aas(self, request: Request, url_args: Dict, map_adapter: MapAdapter) ->

def delete_aas(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
self.object_store.remove(self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell))
self.object_store.remove(self._get_shell(url_args))
return response_t()

# --------- AAS ROUTES ---------
def get_aas(self, request: Request, url_args: Dict, **_kwargs) -> Response:
# TODO: support content parameter
response_t = get_response_type(request)
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
return response_t(aas, stripped=is_stripped_request(request))

def put_aas(self, request: Request, url_args: Dict, **_kwargs) -> Response:
# TODO: support content parameter
response_t = get_response_type(request)
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
aas.update_from(HTTPApiDecoder.request_body(request, model.AssetAdministrationShell,
is_stripped_request(request)))
aas.commit()
return response_t()

def get_aas_asset_information(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
return response_t(aas.asset_information)

def put_aas_asset_information(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
aas.asset_information = HTTPApiDecoder.request_body(request, model.AssetInformation, False)
aas.commit()
return response_t()

def get_aas_submodel_refs(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
return response_t(list(aas.submodel))

def post_aas_submodel_refs(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
aas_identifier = url_args["aas_id"]
aas = self._get_obj_ts(aas_identifier, model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
sm_ref = HTTPApiDecoder.request_body(request, model.ModelReference, False)
if sm_ref in aas.submodel:
raise Conflict(f"{sm_ref!r} already exists!")
Expand All @@ -693,16 +692,14 @@ def post_aas_submodel_refs(self, request: Request, url_args: Dict, **_kwargs) ->

def delete_aas_submodel_refs_specific(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
aas.submodel.remove(self._get_submodel_reference(aas, url_args["submodel_id"]))
aas.commit()
return response_t()

def put_aas_submodel_refs_submodel(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
sm_ref = self._get_submodel_reference(aas, url_args["submodel_id"])
submodel = self._resolve_reference(sm_ref)
new_submodel = HTTPApiDecoder.request_body(request, model.Submodel, is_stripped_request(request))
Expand All @@ -719,8 +716,7 @@ def put_aas_submodel_refs_submodel(self, request: Request, url_args: Dict, **_kw

def delete_aas_submodel_refs_submodel(self, request: Request, url_args: Dict, **_kwargs) -> Response:
response_t = get_response_type(request)
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
sm_ref = self._get_submodel_reference(aas, url_args["submodel_id"])
submodel = self._resolve_reference(sm_ref)
self.object_store.remove(submodel)
Expand All @@ -729,8 +725,7 @@ def delete_aas_submodel_refs_submodel(self, request: Request, url_args: Dict, **
return response_t()

def aas_submodel_refs_redirect(self, request: Request, url_args: Dict, map_adapter: MapAdapter) -> Response:
aas = self._get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
aas = self._get_shell(url_args)
# the following makes sure the reference exists
self._get_submodel_reference(aas, url_args["submodel_id"])
redirect_url = map_adapter.build(self.get_submodel, {
Expand Down