1+ # ---license-start
2+ # eu-digital-green-certificates / dgc-testdata
3+ # ---
4+ # Copyright (C) 2021 T-Systems International GmbH and all other contributors
5+ # ---
6+ # Licensed under the Apache License, Version 2.0 (the "License");
7+ # you may not use this file except in compliance with the License.
8+ # You may obtain a copy of the License at
9+ #
10+ # http://www.apache.org/licenses/LICENSE-2.0
11+ #
12+ # Unless required by applicable law or agreed to in writing, software
13+ # distributed under the License is distributed on an "AS IS" BASIS,
14+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ # See the License for the specific language governing permissions and
16+ # limitations under the License.
17+ # ---license-end
18+
19+ import os
20+ import re
21+ import cbor2
22+ import base64
23+
24+ from zlib import decompress
25+ from base45 import b45decode
26+ from cose .messages import Sign1Message
27+ from cose .headers import Algorithm , KID
28+ from datetime import date , datetime , timezone
29+ from PIL .Image import NONE , open as image_open
30+ from pyzbar .pyzbar import decode as qrcode_decode
31+
32+
33+ # Constants
34+ TIMESTAMP_ISO8601_EXTENDED = "%Y-%m-%dT%H:%M:%S.%fZ"
35+
36+ class DccQrCode ():
37+ "Represents a DCC QR code based on a file"
38+
39+ def __init__ (self , path ):
40+ def datetime_to_string (decoder , value ):
41+ 'replace datetime objects with a string representation when loading the CBOR'
42+ return {k : v .astimezone (timezone .utc ).strftime (TIMESTAMP_ISO8601_EXTENDED ) \
43+ if isinstance (v , (date , datetime )) else v for k , v in value .items ()}
44+
45+ self .file_path = path
46+ image = image_open ( path )
47+ self .qr_code_data = qrcode_decode (image )[0 ].data .decode ()
48+ if not self .qr_code_data .startswith ('HC1:' ):
49+ raise ValueError ('Encoded data does not begin with magic number "HC1:"' )
50+ self .decompressed = decompress (b45decode (self .qr_code_data [4 :]))
51+ self .sign1Message = Sign1Message .decode (self .decompressed )
52+ self .payload = cbor2 .loads (self .sign1Message .payload , object_hook = datetime_to_string )
53+ self ._path_country = None
54+
55+ def get_key_id_base64 (self ):
56+ "returns the key ID of the COSE message"
57+ if KID in self .sign1Message .phdr :
58+ kid = self .sign1Message .phdr [KID ]
59+ else :
60+ kid = self .sign1Message .uhdr [KID ]
61+
62+ return base64 .b64encode (kid ).decode ("ascii" )
63+
64+ def get_path_schema_version (self ):
65+ """Returns the schema version that is encoded in the path (exactly 3 digits separated by dots)
66+ or None if no match is found."""
67+ _previous = None
68+ for subdir_name in self .file_path .split (os .sep ):
69+ if re .match ("^\\ d\\ .\\ d\\ .\\ d$" , subdir_name ):
70+ self ._path_country = _previous
71+ return subdir_name
72+ _previous = subdir_name
73+ return None # --> No path schema version
74+
75+ def get_path_country (self ):
76+ """Returns the country code that is encoded in the path right before the schema version"""
77+ if self ._path_country is None :
78+ self .get_path_schema_version ()
79+ return self ._path_country
80+
81+ def get_file_name (self ):
82+ return self .file_path .split (os .sep )[- 1 ]
0 commit comments