From 400c9286342e65ef885669b71e8ede33ae0fde7e Mon Sep 17 00:00:00 2001 From: edersonbrilhante Date: Mon, 1 Dec 2025 16:49:31 +0100 Subject: [PATCH] feat: add support to validate if tenant role allows session tag --- .../forge_trust_validator/forge_roles.tf | 20 +++-- .../lambda/forge_trust_validator.py | 78 +++++++++++++------ 2 files changed, 68 insertions(+), 30 deletions(-) diff --git a/modules/platform/forge_runners/forge_trust_validator/forge_roles.tf b/modules/platform/forge_runners/forge_trust_validator/forge_roles.tf index 65ad80b6..6d72880b 100644 --- a/modules/platform/forge_runners/forge_trust_validator/forge_roles.tf +++ b/modules/platform/forge_runners/forge_trust_validator/forge_roles.tf @@ -27,16 +27,24 @@ locals { arn => try(trust.Statement, []) } + # updated_statements[arn]: ensure exactly one statement with this Sid + updated_statements = { + for arn, stmts in local.original_statements : + arn => concat( + [ + for s in stmts : + s if !(can(s.Sid) && s.Sid == local.lambda_trust_statement.Sid) + ], + [local.lambda_trust_statement] + ) + } + # concatenated_trust_object[arn] = full updated policy for each role - # = original (Version + Statements) + our lambda_trust_statement concatenated_trust_object = { for arn, trust in local.original_trust : arn => { - Version = try(trust.Version, "2012-10-17") - Statement = concat( - local.original_statements[arn], - [local.lambda_trust_statement] - ) + Version = try(trust.Version, "2012-10-17") + Statement = local.updated_statements[arn] } } diff --git a/modules/platform/forge_runners/forge_trust_validator/lambda/forge_trust_validator.py b/modules/platform/forge_runners/forge_trust_validator/lambda/forge_trust_validator.py index e3c7d73d..e126653f 100644 --- a/modules/platform/forge_runners/forge_trust_validator/lambda/forge_trust_validator.py +++ b/modules/platform/forge_runners/forge_trust_validator/lambda/forge_trust_validator.py @@ -40,7 +40,10 @@ def build_session_policy_for_tenants(tenant_role_arns: List[str]) -> str: { 'Sid': 'AllowAssumeTenantRolesForValidation', 'Effect': 'Allow', - 'Action': 'sts:AssumeRole', + 'Action': [ + 'sts:AssumeRole', + 'sts:TagSession', + ], 'Resource': tenant_role_arns, } ], @@ -120,36 +123,63 @@ def validate_forge_role_against_tenants( f"Attempting to assume Tenant role: {tenant_arn} from Forge role: {forge_role_arn}") tenant_entry = { 'tenant_role_arn': tenant_arn, - 'success': False, - 'error': None, + 'assume_role_success': False, + 'assume_role_error': None, + 'tag_session_success': False, + 'tag_session_error': None } + + # --- Test 1: Basic AssumeRole (no tags) --- try: - tenant_resp = sts_as_forge.assume_role( + sts_as_forge.assume_role( RoleArn=tenant_arn, - RoleSessionName=f"TenantValidation-{int(time.time())}", - ) - - # Optional: verify the tenant creds actually work - tenant_creds = tenant_resp['Credentials'] - sts_as_tenant = boto3.client( - 'sts', - aws_access_key_id=tenant_creds['AccessKeyId'], - aws_secret_access_key=tenant_creds['SecretAccessKey'], - aws_session_token=tenant_creds['SessionToken'], + RoleSessionName=f"TenantValidation-Basic-{int(time.time())}", ) - identity = sts_as_tenant.get_caller_identity() - LOG.info( - f"Successfully assumed Tenant role: {tenant_arn}. Identity: {identity['Arn']}") - - tenant_entry['success'] = True + LOG.info(f"Basic AssumeRole successful for {tenant_arn}") + tenant_entry['assume_role_success'] = True except ClientError as e: - LOG.error( - f"ClientError assuming Tenant role {tenant_arn}: {e}") - tenant_entry['error'] = str(e) + LOG.error(f"Basic AssumeRole failed for {tenant_arn}: {e}") + tenant_entry['assume_role_error'] = str(e) except Exception as e: LOG.error( - f"Unexpected error assuming Tenant role {tenant_arn}: {e}") - tenant_entry['error'] = f"Unexpected error assuming tenant role: {e}" + f"Unexpected error in Basic AssumeRole for {tenant_arn}: {e}") + tenant_entry['assume_role_error'] = f"Unexpected error: {e}" + + # --- Test 2: AssumeRole WITH Tags (only if basic succeeded) --- + if tenant_entry['assume_role_success']: + try: + tenant_resp = sts_as_forge.assume_role( + RoleArn=tenant_arn, + RoleSessionName=f"TenantValidation-Tags-{int(time.time())}", + Tags=[ + {'Key': 'CreatedBy', 'Value': 'ForgeTrustValidator'}, + {'Key': 'Validation', 'Value': 'True'} + ] + ) + + # Optional: verify the tenant creds actually work + tenant_creds = tenant_resp['Credentials'] + sts_as_tenant = boto3.client( + 'sts', + aws_access_key_id=tenant_creds['AccessKeyId'], + aws_secret_access_key=tenant_creds['SecretAccessKey'], + aws_session_token=tenant_creds['SessionToken'], + ) + identity = sts_as_tenant.get_caller_identity() + LOG.info( + f"AssumeRole WITH Tags successful for {tenant_arn}. Identity: {identity['Arn']}") + + tenant_entry['tag_session_success'] = True + except ClientError as e: + LOG.error( + f"AssumeRole WITH Tags failed for {tenant_arn}: {e}") + tenant_entry['tag_session_error'] = str(e) + except Exception as e: + LOG.error( + f"Unexpected error in AssumeRole WITH Tags for {tenant_arn}: {e}") + tenant_entry['tag_session_error'] = f"Unexpected error: {e}" + else: + tenant_entry['tag_session_error'] = 'Skipped because basic AssumeRole failed' result['tenant_results'].append(tenant_entry)