Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,24 @@ locals {
arn => try(trust.Statement, [])
}

# updated_statements[arn]: ensure exactly one statement with this Sid
updated_statements = {
for arn, stmts in local.original_statements :
arn => concat(
[
for s in stmts :
s if !(can(s.Sid) && s.Sid == local.lambda_trust_statement.Sid)
],
[local.lambda_trust_statement]
)
}

# concatenated_trust_object[arn] = full updated policy for each role
# = original (Version + Statements) + our lambda_trust_statement
concatenated_trust_object = {
for arn, trust in local.original_trust :
arn => {
Version = try(trust.Version, "2012-10-17")
Statement = concat(
local.original_statements[arn],
[local.lambda_trust_statement]
)
Version = try(trust.Version, "2012-10-17")
Statement = local.updated_statements[arn]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ def build_session_policy_for_tenants(tenant_role_arns: List[str]) -> str:
{
'Sid': 'AllowAssumeTenantRolesForValidation',
'Effect': 'Allow',
'Action': 'sts:AssumeRole',
'Action': [
'sts:AssumeRole',
'sts:TagSession',
],
'Resource': tenant_role_arns,
}
],
Expand Down Expand Up @@ -120,36 +123,63 @@ def validate_forge_role_against_tenants(
f"Attempting to assume Tenant role: {tenant_arn} from Forge role: {forge_role_arn}")
tenant_entry = {
'tenant_role_arn': tenant_arn,
'success': False,
'error': None,
'assume_role_success': False,
'assume_role_error': None,
'tag_session_success': False,
'tag_session_error': None
}

# --- Test 1: Basic AssumeRole (no tags) ---
try:
tenant_resp = sts_as_forge.assume_role(
sts_as_forge.assume_role(
RoleArn=tenant_arn,
RoleSessionName=f"TenantValidation-{int(time.time())}",
)

# Optional: verify the tenant creds actually work
tenant_creds = tenant_resp['Credentials']
sts_as_tenant = boto3.client(
'sts',
aws_access_key_id=tenant_creds['AccessKeyId'],
aws_secret_access_key=tenant_creds['SecretAccessKey'],
aws_session_token=tenant_creds['SessionToken'],
RoleSessionName=f"TenantValidation-Basic-{int(time.time())}",
)
identity = sts_as_tenant.get_caller_identity()
LOG.info(
f"Successfully assumed Tenant role: {tenant_arn}. Identity: {identity['Arn']}")

tenant_entry['success'] = True
LOG.info(f"Basic AssumeRole successful for {tenant_arn}")
tenant_entry['assume_role_success'] = True
except ClientError as e:
LOG.error(
f"ClientError assuming Tenant role {tenant_arn}: {e}")
tenant_entry['error'] = str(e)
LOG.error(f"Basic AssumeRole failed for {tenant_arn}: {e}")
tenant_entry['assume_role_error'] = str(e)
except Exception as e:
LOG.error(
f"Unexpected error assuming Tenant role {tenant_arn}: {e}")
tenant_entry['error'] = f"Unexpected error assuming tenant role: {e}"
f"Unexpected error in Basic AssumeRole for {tenant_arn}: {e}")
tenant_entry['assume_role_error'] = f"Unexpected error: {e}"

# --- Test 2: AssumeRole WITH Tags (only if basic succeeded) ---
if tenant_entry['assume_role_success']:
try:
tenant_resp = sts_as_forge.assume_role(
RoleArn=tenant_arn,
RoleSessionName=f"TenantValidation-Tags-{int(time.time())}",
Tags=[
{'Key': 'CreatedBy', 'Value': 'ForgeTrustValidator'},
{'Key': 'Validation', 'Value': 'True'}
]
)

# Optional: verify the tenant creds actually work
tenant_creds = tenant_resp['Credentials']
sts_as_tenant = boto3.client(
'sts',
aws_access_key_id=tenant_creds['AccessKeyId'],
aws_secret_access_key=tenant_creds['SecretAccessKey'],
aws_session_token=tenant_creds['SessionToken'],
)
identity = sts_as_tenant.get_caller_identity()
LOG.info(
f"AssumeRole WITH Tags successful for {tenant_arn}. Identity: {identity['Arn']}")

tenant_entry['tag_session_success'] = True
except ClientError as e:
LOG.error(
f"AssumeRole WITH Tags failed for {tenant_arn}: {e}")
tenant_entry['tag_session_error'] = str(e)
except Exception as e:
LOG.error(
f"Unexpected error in AssumeRole WITH Tags for {tenant_arn}: {e}")
tenant_entry['tag_session_error'] = f"Unexpected error: {e}"
else:
tenant_entry['tag_session_error'] = 'Skipped because basic AssumeRole failed'

result['tenant_results'].append(tenant_entry)

Expand Down