Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions scripts/aws/ec2.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,26 @@ def get_token_url(cls) -> str:
@classmethod
def get_meta_url(cls) -> str:
return f"http://{cls.AWS_METADATA}/latest/dynamic/instance-identity/document"



# Default URLs for core/optout by identity scope + environment
# CloudFormation template injects these secrets, but this is a fallback in case
# of a manual AMI_only upgrade
DEFAULT_BASE_URLS = {
"uid2": {
"integ": {"core_base_url": "https://core-integ.uidapi.com",
"optout_base_url": "https://optout-integ.uidapi.com"},
"prod": {"core_base_url": "https://core-prod.uidapi.com",
"optout_base_url": "https://optout-prod.uidapi.com"},
},
"euid": {
"integ": {"core_base_url": "https://core.integ.euid.eu",
"optout_base_url": "https://optout.integ.euid.eu"},
"prod": {"core_base_url": "https://core.prod.euid.eu",
"optout_base_url": "https://optout.prod.euid.eu"},
}
}


class EC2(ConfidentialCompute):

Expand Down Expand Up @@ -112,6 +131,7 @@ def add_defaults(configs: Dict[str, any]) -> AWSConfidentialComputeConfig:
configs.setdefault("debug_mode", False)
configs.setdefault("core_api_token", configs.get("operator_key"))
configs.setdefault("optout_api_token", configs.get("operator_key"))
self.__set_base_url_defaults(configs)
return configs

region = self.__get_current_region()
Expand All @@ -129,6 +149,24 @@ def add_defaults(configs: Dict[str, any]) -> AWSConfidentialComputeConfig:
except ClientError as _:
raise OperatorKeyNotFoundError(self.__class__.__name__, f"Secret Manager {secret_identifier} in {region}")

@staticmethod
def __get_identity_scope() -> str:
"""Reads the identity scope (uid2/euid) baked into the AMI at build time."""
with open("/opt/uid2operator/identity_scope.txt") as file:
return file.read().strip().lower()

def __set_base_url_defaults(self, configs: Dict[str, any]) -> None:
"""Fills core_base_url/optout_base_url from environment + identity scope when the
secret omits them. No-op when a value is already present, or when scope/environment
is unrecognised (validate_configuration then surfaces the appropriate error)."""
scope = self.__get_identity_scope()
environment = configs.get("environment")
defaults = DEFAULT_BASE_URLS.get(scope, {}).get(environment, {})
for key, url in defaults.items():
if not configs.get(key):
logging.info(f"{key} not provided in secret; defaulting to {url} ({scope}/{environment})")
configs[key] = url

@staticmethod
def __get_max_capacity():
try:
Expand Down Expand Up @@ -255,10 +293,8 @@ def __get_secret_name_from_userdata(self) -> str:
response = requests.get(AuxiliaryConfig.get_user_data_url(), headers={"X-aws-ec2-metadata-token": token})
user_data = response.text

with open("/opt/uid2operator/identity_scope.txt") as file:
identity_scope = file.read().strip()

default_name = f"{identity_scope.lower()}-operator-config-key"
identity_scope = self.__get_identity_scope()
default_name = f"{identity_scope}-operator-config-key"
Comment thread
cYKatherine marked this conversation as resolved.
hardcoded_value = f"{identity_scope.upper()}_CONFIG_SECRET_KEY"
match = re.search(rf'^export {hardcoded_value}="(.+?)"$', user_data, re.MULTILINE)
return match.group(1) if match else default_name
Expand Down
Loading