Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions modules/integrations/splunk_cloud_conf_shared/transforms_lambda.tf
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,37 @@ resource "splunk_configs_conf" "forgecicd_extra_lambda_tenant_fields" {
}
}

# Splunk transforms.conf stanza that extracts ForgeCI/CD tenant fields for the
# EC2 Lambdas from the event's source path, which is expected to look like:
#   <aws_region>:/aws/lambda/<tenant>-<region_alias>-<vpc_alias>-<log_type>
# where <log_type> is one of the three EC2 Lambda names matched below.
resource "splunk_configs_conf" "forgecicd_extra_lambda_ec2_tenant_fields" {
  name = "transforms/forgecicd_extra_lambda_ec2_tenant_fields"

  variables = {
    # Named capture groups; FORMAT references them positionally as $1..$5.
    "REGEX" = "(?<aws_region>[^:]+):\\/aws\\/lambda\\/(?<forgecicd_tenant>[a-z0-9]+)-(?<forgecicd_region_alias>[a-z0-9]+)-(?<forgecicd_vpc_alias>[a-z0-9]+)-(?<forgecicd_log_type>ec2-redrive-deadletter|ec2-update-runner-ssm-ami|ec2-update-runner-tags)"
    "FORMAT" = "aws_region::$1 forgecicd_tenant::$2 forgecicd_region_alias::$3 forgecicd_vpc_alias::$4 forgecicd_log_type::$5 forgecicd_type::ec2"
    # Apply the REGEX to the event's "source" field rather than _raw.
    "SOURCE_KEY" = "source"
    "CLEAN_KEYS" = "0"
  }
  acl {
    app     = var.splunk_conf.acl.app
    owner   = var.splunk_conf.acl.owner
    sharing = var.splunk_conf.acl.sharing
    read    = var.splunk_conf.acl.read
    write   = var.splunk_conf.acl.write
  }
  lifecycle {
    # Splunk fills in these defaults server-side; ignoring them prevents a
    # perpetual diff on every plan.
    ignore_changes = [
      variables["CAN_OPTIMIZE"],
      variables["DEFAULT_VALUE"],
      variables["DEPTH_LIMIT"],
      variables["DEST_KEY"],
      variables["KEEP_EMPTY_VALS"],
      variables["LOOKAHEAD"],
      variables["MATCH_LIMIT"],
      variables["MV_ADD"],
      variables["WRITE_META"],
      variables["disabled"]
    ]
  }
}

resource "splunk_configs_conf" "forgecicd_trust_validation" {
name = "transforms/forgecicd_trust_validation"
Expand Down
20 changes: 20 additions & 0 deletions modules/platform/ec2_deployment/ec2_redrive_deadletter.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Wires the ec2_redrive_deadletter submodule into the EC2 deployment: it
# provisions a scheduled Lambda that moves dead-lettered build messages back
# onto each runner's main queue.
module "ec2_redrive_deadletter" {
  source = "./ec2_redrive_deadletter"

  providers = {
    aws = aws
  }

  prefix                    = var.runner_configs.prefix
  logging_retention_in_days = var.runner_configs.logging_retention_in_days
  log_level                 = var.runner_configs.log_level
  tags                      = var.tenant_configs.tags

  # One DLQ/main queue-name pair per runner spec; the names must match the
  # queues created by the runner deployment ("<prefix>-<key>-queued-builds").
  sqs_map = {
    for key in keys(var.runner_configs.runner_specs) :
    key => {
      dlq  = "${var.runner_configs.prefix}-${key}-queued-builds_dead_letter"
      main = "${var.runner_configs.prefix}-${key}-queued-builds"
    }
  }
}
56 changes: 56 additions & 0 deletions modules/platform/ec2_deployment/ec2_redrive_deadletter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<!-- BEGIN_TF_DOCS -->
<!-- NOTE(review): the previous content of this README documented the
github_webhook_relay module (secrets/KMS/API resources) and did not match
this module at all. Regenerate with terraform-docs to confirm the tables
below, which were reconstructed from main.tf / variables.tf / data.tf. -->
## Modules

| Name | Source | Version |
|------|--------|---------|
| <a name="module_ec2_redrive_deadletter_lambda"></a> [ec2\_redrive\_deadletter\_lambda](#module\_ec2\_redrive\_deadletter\_lambda) | terraform-aws-modules/lambda/aws | 8.1.2 |

## Resources

| Name | Type |
|------|------|
| [aws_cloudwatch_event_rule.ec2_redrive_deadletter_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_rule) | resource |
| [aws_cloudwatch_event_target.ec2_redrive_deadletter_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_event_target) | resource |
| [aws_cloudwatch_log_group.ec2_redrive_deadletter_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/cloudwatch_log_group) | resource |
| [aws_lambda_permission.ec2_redrive_deadletter_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lambda_permission) | resource |
| [aws_caller_identity.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/caller_identity) | data source |
| [aws_iam_policy_document.ec2_redrive_deadletter_lambda](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/iam_policy_document) | data source |
| [aws_region.current](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/data-sources/region) | data source |

## Inputs

| Name | Description | Type | Default | Required |
|------|-------------|------|---------|:--------:|
| <a name="input_log_level"></a> [log\_level](#input\_log\_level) | Log level for application logging (e.g., INFO, DEBUG, WARN, ERROR) | `string` | `"INFO"` | no |
| <a name="input_logging_retention_in_days"></a> [logging\_retention\_in\_days](#input\_logging\_retention\_in\_days) | Retention in days for CloudWatch Log Group for the Lambdas. | `number` | `30` | no |
| <a name="input_prefix"></a> [prefix](#input\_prefix) | Prefix for all resources | `string` | n/a | yes |
| <a name="input_sqs_map"></a> [sqs\_map](#input\_sqs\_map) | Map of runner SQS queue names. | <pre>map(object({<br/>    main = string<br/>    dlq  = string<br/>  }))</pre> | n/a | yes |
| <a name="input_tags"></a> [tags](#input\_tags) | Tags to apply to created resources. | `map(string)` | `{}` | no |

## Outputs

No outputs.
<!-- END_TF_DOCS -->
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Account ID of the caller; used to build SQS queue ARNs in main.tf.
data "aws_caller_identity" "current" {}

# Region of the configured provider; used to build SQS queue ARNs in main.tf.
data "aws_region" "current" {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import json
import logging
import os
import time
from typing import Dict, List

import boto3

LOG = logging.getLogger()
# LOG_LEVEL env var selects verbosity; unknown values fall back to INFO.
level_str = os.environ.get('LOG_LEVEL', 'INFO').upper()
LOG.setLevel(getattr(logging, level_str, logging.INFO))

# Module-level client so it is reused across warm Lambda invocations.
sqs = boto3.client('sqs')


def parse_sqs_map(raw: str) -> List[Dict[str, str]]:
    """Parse the SQS_MAP environment variable into a list of queue mappings.

    Expected SQS_MAP env var (from Terraform map):
        {
          "runner-a": {"main": "queue-a-main", "dlq": "queue-a-dlq"},
          "runner-b": {"main": "queue-b-main", "dlq": "queue-b-dlq"}
        }

    Returns a list of:
        [{"key": "runner-a", "main": "...", "dlq": "..."}, ...]

    Raises:
        ValueError: if the value is not valid JSON, is not a JSON object,
            or any entry is missing the required 'main'/'dlq' keys.
            (ValueError is a subclass of Exception, so existing callers
            catching Exception still work.)
    """
    if not raw.strip():
        return []

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as e:
        # ValueError is the idiomatic type for malformed input.
        raise ValueError(f"Invalid SQS_MAP JSON: {e}. Value: {raw}") from e

    if not isinstance(parsed, dict):
        raise ValueError(
            f"SQS_MAP must be a JSON object/map, got: {type(parsed)}")

    mappings: List[Dict[str, str]] = []
    for key, value in parsed.items():
        if not isinstance(value, dict):
            raise ValueError(
                f"SQS_MAP['{key}'] must be an object with 'main' and 'dlq'")
        if 'main' not in value or 'dlq' not in value:
            raise ValueError(
                f"SQS_MAP['{key}'] missing 'main' or 'dlq' keys: {value}")
        mappings.append(
            {
                'key': key,
                'main': str(value['main']),
                'dlq': str(value['dlq']),
            }
        )

    return mappings


def resolve_queue_url(sqs_client, name_or_url: str) -> str:
    """Resolve a queue identifier to a full queue URL.

    A value that already starts with 'https://' is returned unchanged;
    anything else is treated as a queue name and looked up via the SQS
    GetQueueUrl API.
    """
    if name_or_url.startswith('https://'):
        return name_or_url
    return sqs_client.get_queue_url(QueueName=name_or_url)['QueueUrl']


def drain_dlq(sqs_client, dlq_url: str, main_url: str) -> int:
    """Drain DLQ → main queue, returns number of moved messages.

    Receives batches of up to 10 messages from the DLQ, re-sends each to
    the main queue, then deletes it from the DLQ. Message attributes are
    explicitly requested and forwarded so redriven messages keep their
    metadata (the previous implementation silently dropped them).
    """
    moved = 0

    while True:
        resp = sqs_client.receive_message(
            QueueUrl=dlq_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=0,
            VisibilityTimeout=30,
            # Without this, SQS omits MessageAttributes from the response
            # and the redriven message would lose its metadata.
            MessageAttributeNames=['All'],
        )

        messages = resp.get('Messages', [])
        if not messages:
            break

        for msg in messages:
            send_kwargs = {
                'QueueUrl': main_url,
                'MessageBody': msg['Body'],
            }
            # Forward message attributes when present.
            if msg.get('MessageAttributes'):
                send_kwargs['MessageAttributes'] = msg['MessageAttributes']
            sqs_client.send_message(**send_kwargs)
            # Only delete after a successful send so a failed send leaves
            # the message on the DLQ for the next run.
            sqs_client.delete_message(
                QueueUrl=dlq_url,
                ReceiptHandle=msg['ReceiptHandle'],
            )
            moved += 1

        # small throttle to avoid hammering SQS too hard
        time.sleep(0.1)

    return moved


def lambda_handler(event, context):
    """Entry point: redrive every configured DLQ back to its main queue.

    Reads the SQS_MAP environment variable, resolves each DLQ/main queue
    name to a URL, drains each DLQ into its main queue, and returns a
    per-mapping summary of how many messages were moved.
    """
    mappings = parse_sqs_map(os.getenv('SQS_MAP', ''))

    if not mappings:
        LOG.warning('SQS_MAP is empty; nothing to do.')
        return {'status': 'noop', 'message': 'SQS_MAP is empty', 'results': []}

    LOG.info('Starting DLQ drain for %d mapping(s)', len(mappings))

    results = []
    for mapping in mappings:
        name = mapping['key']
        dlq = mapping['dlq']
        main = mapping['main']

        LOG.info('Processing SQS mapping key=%s dlq=%s main=%s',
                 name, dlq, main)

        # Resolve DLQ first, then main, matching the original ordering.
        count = drain_dlq(
            sqs,
            resolve_queue_url(sqs, dlq),
            resolve_queue_url(sqs, main),
        )

        LOG.info(
            'Finished SQS mapping key=%s dlq=%s main=%s moved=%d',
            name,
            dlq,
            main,
            count,
        )

        results.append({'key': name, 'dlq': dlq, 'main': main, 'moved': count})

    return {'status': 'ok', 'results': results}
106 changes: 106 additions & 0 deletions modules/platform/ec2_deployment/ec2_redrive_deadletter/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
# Packages and deploys the DLQ-redrive Lambda from ./lambda using the
# community lambda module; invoked on a schedule (see the event rule below).
module "ec2_redrive_deadletter_lambda" {
  source  = "terraform-aws-modules/lambda/aws"
  version = "8.1.2"

  function_name = "${var.prefix}-ec2-redrive-deadletter"
  handler       = "ec2_redrive_deadletter.lambda_handler"
  runtime       = "python3.12"
  # Max Lambda timeout: draining large DLQs can take a while.
  timeout       = 900
  architectures = ["x86_64"]

  source_path = [{
    path = "${path.module}/lambda"
  }]

  # Use the explicitly-managed log group below instead of letting the
  # module create its own.
  logging_log_group                 = aws_cloudwatch_log_group.ec2_redrive_deadletter_lambda.name
  use_existing_cloudwatch_log_group = true

  # Re-package only when source content changes, not on every timestamp bump.
  trigger_on_package_timestamp = false

  environment_variables = {
    # JSON-encoded map consumed by parse_sqs_map() in the Lambda.
    SQS_MAP   = jsonencode(var.sqs_map)
    LOG_LEVEL = var.log_level
  }

  attach_policy_json = true

  policy_json = data.aws_iam_policy_document.ec2_redrive_deadletter_lambda.json

  function_tags = var.tags
  role_tags     = var.tags
  tags          = var.tags

  depends_on = [aws_cloudwatch_log_group.ec2_redrive_deadletter_lambda]
}

# Least-privilege policy for the redrive Lambda: read/delete on every DLQ,
# send on every main queue, scoped to the queues listed in var.sqs_map.
# NOTE(review): partition is hardcoded to "aws" — confirm this module is
# never deployed to GovCloud/China partitions.
data "aws_iam_policy_document" "ec2_redrive_deadletter_lambda" {
  statement {
    sid    = "SQSReceiveFromDLQ"
    effect = "Allow"

    actions = [
      "sqs:ReceiveMessage",
      "sqs:GetQueueUrl",
      "sqs:GetQueueAttributes",
      "sqs:DeleteMessage",
    ]

    resources = [
      for key, cfg in var.sqs_map :
      "arn:aws:sqs:${data.aws_region.current.region}:${data.aws_caller_identity.current.account_id}:${cfg.dlq}"
    ]
  }

  statement {
    sid    = "SQSSendToMainQueue"
    effect = "Allow"

    actions = [
      "sqs:SendMessage",
      "sqs:GetQueueUrl",
      "sqs:GetQueueAttributes",
    ]

    resources = [
      for key, cfg in var.sqs_map :
      "arn:aws:sqs:${data.aws_region.current.region}:${data.aws_caller_identity.current.account_id}:${cfg.main}"
    ]
  }
}



# Explicit log group so retention and tags are managed by Terraform instead
# of being created implicitly by the Lambda on first invocation.
# Fix: removed `tags_all` — it is a computed (read-only) attribute on AWS
# provider v5+ and may not be set in configuration.
resource "aws_cloudwatch_log_group" "ec2_redrive_deadletter_lambda" {
  name              = "/aws/lambda/${var.prefix}-ec2-redrive-deadletter"
  retention_in_days = var.logging_retention_in_days
  tags              = var.tags
}

# Scheduled trigger: fires the redrive Lambda every 10 minutes.
# Fix: removed `tags_all` — it is a computed (read-only) attribute on AWS
# provider v5+ and may not be set in configuration.
resource "aws_cloudwatch_event_rule" "ec2_redrive_deadletter_lambda" {
  name                = "${var.prefix}-ec2-redrive-deadletter"
  description         = "Trigger Lambda every 10 minutes"
  schedule_expression = "cron(*/10 * * * ? *)"

  tags = var.tags

  depends_on = [module.ec2_redrive_deadletter_lambda]
}

# Wires the schedule rule to the Lambda. The `arn` reference already creates
# an implicit dependency on the module; the explicit depends_on is redundant
# but harmless.
resource "aws_cloudwatch_event_target" "ec2_redrive_deadletter_lambda" {
  rule = aws_cloudwatch_event_rule.ec2_redrive_deadletter_lambda.name
  arn  = module.ec2_redrive_deadletter_lambda.lambda_function_arn

  depends_on = [module.ec2_redrive_deadletter_lambda]
}

# Allows EventBridge (the schedule rule above) to invoke the redrive Lambda.
resource "aws_lambda_permission" "ec2_redrive_deadletter_lambda" {
  action = "lambda:InvokeFunction"
  # Use the module output instead of re-deriving the name from var.prefix:
  # one source of truth, and the implicit dependency replaces the explicit
  # depends_on the original needed.
  function_name = module.ec2_redrive_deadletter_lambda.lambda_function_name
  principal     = "events.amazonaws.com"
  statement_id  = "AllowExecutionFromCloudWatch"
  source_arn    = aws_cloudwatch_event_rule.ec2_redrive_deadletter_lambda.arn
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Name prefix applied to every resource created by this module.
variable "prefix" {
  description = "Prefix for all resources"
  type        = string
}

# Propagated to the log group, event rule, and (via the lambda module) the
# function and its IAM role.
variable "tags" {
  description = "Tags to apply to created resources."
  type        = map(string)
  default     = {}
}

variable "logging_retention_in_days" {
  description = "Retention in days for CloudWatch Log Group for the Lambdas."
  type        = number
  default     = 30
}

# Passed to the Lambda as the LOG_LEVEL environment variable; the handler
# upper-cases it and falls back to INFO for unknown values.
variable "log_level" {
  type        = string
  description = "Log level for application logging (e.g., INFO, DEBUG, WARN, ERROR)"
  default     = "INFO"
}

# Keyed by runner spec name; each entry names the main queue and its DLQ.
# JSON-encoded into the Lambda's SQS_MAP environment variable.
variable "sqs_map" {
  description = "Map of runner SQS queue names."
  type = map(object({
    main = string
    dlq  = string
  }))
}
Loading