Skip to content
2 changes: 1 addition & 1 deletion .codegen.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{ "engineHash": "a3ec39e", "specHash": "8402463", "version": "1.15.0" }
{ "engineHash": "b8e7dbb", "specHash": "8402463", "version": "1.15.0" }
70 changes: 44 additions & 26 deletions box_sdk_gen/box/jwt_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@

from box_sdk_gen.internal.utils import JwtAlgorithm

from box_sdk_gen.internal.utils import PrivateKeyDecryptor

from box_sdk_gen.internal.utils import DefaultPrivateKeyDecryptor

from box_sdk_gen.managers.authorization import AuthorizationManager

from box_sdk_gen.box.errors import BoxSDKError
Expand Down Expand Up @@ -161,7 +165,8 @@ def __init__(
enterprise_id: Optional[str] = None,
user_id: Optional[str] = None,
algorithm: Optional[JwtAlgorithm] = JwtAlgorithm.RS256,
token_storage: TokenStorage = None
token_storage: TokenStorage = None,
private_key_decryptor: PrivateKeyDecryptor = None
):
"""
:param client_id: App client ID
Expand All @@ -181,6 +186,8 @@ def __init__(
"""
if token_storage is None:
token_storage = InMemoryTokenStorage()
if private_key_decryptor is None:
private_key_decryptor = DefaultPrivateKeyDecryptor()
self.client_id = client_id
self.client_secret = client_secret
self.jwt_key_id = jwt_key_id
Expand All @@ -190,10 +197,14 @@ def __init__(
self.user_id = user_id
self.algorithm = algorithm
self.token_storage = token_storage
self.private_key_decryptor = private_key_decryptor

@staticmethod
def from_config_json_string(
config_json_string: str, *, token_storage: Optional[TokenStorage] = None
config_json_string: str,
*,
token_storage: Optional[TokenStorage] = None,
private_key_decryptor: Optional[PrivateKeyDecryptor] = None
) -> 'JWTConfig':
"""
Create an auth instance as defined by a string content of JSON file downloaded from the Box Developer Console.
Expand All @@ -202,39 +213,41 @@ def from_config_json_string(

:param config_json_string: String content of JSON file containing the configuration.
:type config_json_string: str
:param token_storage: Object responsible for storing token. If no custom implementation provided, the token will be stored in memory.g, defaults to None
:param token_storage: Object responsible for storing token. If no custom implementation provided, the token will be stored in memory, defaults to None
:type token_storage: Optional[TokenStorage], optional
:param private_key_decryptor: Object responsible for decrypting private key for jwt auth. If no custom implementation provided, the DefaultPrivateKeyDecryptor will be used., defaults to None
:type private_key_decryptor: Optional[PrivateKeyDecryptor], optional
"""
config_json: JwtConfigFile = deserialize(
json_to_serialized_data(config_json_string), JwtConfigFile
)
new_config: 'JWTConfig' = (
JWTConfig(
client_id=config_json.box_app_settings.client_id,
client_secret=config_json.box_app_settings.client_secret,
enterprise_id=config_json.enterprise_id,
user_id=config_json.user_id,
jwt_key_id=config_json.box_app_settings.app_auth.public_key_id,
private_key=config_json.box_app_settings.app_auth.private_key,
private_key_passphrase=config_json.box_app_settings.app_auth.passphrase,
token_storage=token_storage,
)
if not token_storage == None
else JWTConfig(
client_id=config_json.box_app_settings.client_id,
client_secret=config_json.box_app_settings.client_secret,
enterprise_id=config_json.enterprise_id,
user_id=config_json.user_id,
jwt_key_id=config_json.box_app_settings.app_auth.public_key_id,
private_key=config_json.box_app_settings.app_auth.private_key,
private_key_passphrase=config_json.box_app_settings.app_auth.passphrase,
)
token_storage_to_use: Optional[TokenStorage] = (
InMemoryTokenStorage() if token_storage == None else token_storage
)
private_key_decryptor_to_use: Optional[PrivateKeyDecryptor] = (
DefaultPrivateKeyDecryptor()
if private_key_decryptor == None
else private_key_decryptor
)
new_config: 'JWTConfig' = JWTConfig(
client_id=config_json.box_app_settings.client_id,
client_secret=config_json.box_app_settings.client_secret,
enterprise_id=config_json.enterprise_id,
user_id=config_json.user_id,
jwt_key_id=config_json.box_app_settings.app_auth.public_key_id,
private_key=config_json.box_app_settings.app_auth.private_key,
private_key_passphrase=config_json.box_app_settings.app_auth.passphrase,
token_storage=token_storage_to_use,
private_key_decryptor=private_key_decryptor_to_use,
)
return new_config

@staticmethod
def from_config_file(
config_file_path: str, *, token_storage: Optional[TokenStorage] = None
config_file_path: str,
*,
token_storage: Optional[TokenStorage] = None,
private_key_decryptor: Optional[PrivateKeyDecryptor] = None
) -> 'JWTConfig':
"""
Create an auth instance as defined by a JSON file downloaded from the Box Developer Console.
Expand All @@ -245,10 +258,14 @@ def from_config_file(
:type config_file_path: str
:param token_storage: Object responsible for storing token. If no custom implementation provided, the token will be stored in memory., defaults to None
:type token_storage: Optional[TokenStorage], optional
:param private_key_decryptor: Object responsible for decrypting private key for jwt auth. If no custom implementation provided, the DefaultPrivateKeyDecryptor will be used., defaults to None
:type private_key_decryptor: Optional[PrivateKeyDecryptor], optional
"""
config_json_string: str = read_text_from_file(config_file_path)
return JWTConfig.from_config_json_string(
config_json_string, token_storage=token_storage
config_json_string,
token_storage=token_storage,
private_key_decryptor=private_key_decryptor,
)


Expand Down Expand Up @@ -298,6 +315,7 @@ def refresh_token(
issuer=self.config.client_id,
jwtid=get_uuid(),
keyid=self.config.jwt_key_id,
private_key_decryptor=self.config.private_key_decryptor,
)
jwt_key: JwtKey = JwtKey(
key=self.config.private_key, passphrase=self.config.private_key_passphrase
Expand Down
54 changes: 35 additions & 19 deletions box_sdk_gen/internal/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from io import SEEK_CUR, SEEK_END, SEEK_SET, BufferedIOBase, BytesIO
from typing import Any, Callable, Dict, Iterable, Optional, TypeVar, BinaryIO

from abc import abstractmethod
from typing import Any

try:
import jwt
from cryptography.hazmat.backends import default_backend
Expand Down Expand Up @@ -270,6 +273,31 @@ def get_value_from_object_raw_data(obj: BaseObject, key: str) -> Any:
return value


class PrivateKeyDecryptor:
"""Class used for private key decryption in JWT auth."""

@abstractmethod
def decrypt_private_key(self, encryptedPrivateKey: str, passphrase: str) -> Any:
"""Decrypts private key using a passphrase."""
pass


class DefaultPrivateKeyDecryptor(PrivateKeyDecryptor):
def decrypt_private_key(self, encryptedPrivateKey: str, passphrase: str) -> Any:
if default_backend is None or serialization is None:
raise ImportError(
'Missing `cryptography` dependency. `cryptography` library is required to create JWT assertion.'
)
encoded_private_key = encode_str_ascii_or_raise(encryptedPrivateKey)
encoded_passphrase = encode_str_ascii_or_raise(passphrase)

return serialization.load_pem_private_key(
encoded_private_key,
password=encoded_passphrase,
backend=default_backend(),
)


class JwtAlgorithm(str, Enum):
HS256 = 'HS256'
HS384 = 'HS384'
Expand All @@ -296,6 +324,7 @@ def __init__(
subject: Optional[str] = None,
jwtid: Optional[str] = None,
keyid: Optional[str] = None,
private_key_decryptor: Optional[PrivateKeyDecryptor] = None,
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -308,6 +337,11 @@ def __init__(
self.subject = subject
self.jwtid = jwtid
self.keyid = keyid
self.private_key_decryptor = (
private_key_decryptor
if private_key_decryptor is not None
else DefaultPrivateKeyDecryptor()
)


class JwtKey(BaseObject):
Expand All @@ -326,24 +360,6 @@ def encode_str_ascii_or_raise(passphrase: str) -> bytes:
) from unicode_error


def get_rsa_private_key(
private_key: str,
passphrase: str,
) -> Any:
if default_backend is None or serialization is None:
raise ImportError(
'Missing `cryptography` dependency. `cryptography` library is required to create JWT assertion.'
)
encoded_private_key = encode_str_ascii_or_raise(private_key)
encoded_passphrase = encode_str_ascii_or_raise(passphrase)

return serialization.load_pem_private_key(
encoded_private_key,
password=encoded_passphrase,
backend=default_backend(),
)


def create_jwt_assertion(claims: dict, key: JwtKey, options: JwtSignOptions) -> str:
if jwt is None:
raise ImportError(
Expand All @@ -358,7 +374,7 @@ def create_jwt_assertion(claims: dict, key: JwtKey, options: JwtSignOptions) ->
'jti': options.jwtid,
'exp': claims['exp'],
},
get_rsa_private_key(key.key, key.passphrase),
options.private_key_decryptor.decrypt_private_key(key.key, key.passphrase),
algorithm=options.algorithm,
headers={'kid': options.keyid},
)
Expand Down
Loading