Skip to content

Commit 708c0e0

Browse files
committed
add aggregate_status_events function
1 parent 7292dd7 commit 708c0e0

File tree

12 files changed

+412
-131
lines changed

12 files changed

+412
-131
lines changed

Gopkg.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.go
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
8+
"bitbucket.org/RocksauceStudios/standup-lambda/modules/standup"
9+
"github.com/altairsix/eventsource"
10+
"github.com/altairsix/eventsource/dynamodbstore"
11+
"github.com/apex/go-apex"
12+
"github.com/apex/go-apex/dynamo"
13+
"github.com/apex/log"
14+
jlog "github.com/apex/log/handlers/json"
15+
"github.com/aws/aws-sdk-go/aws"
16+
"github.com/aws/aws-sdk-go/aws/awserr"
17+
"github.com/aws/aws-sdk-go/aws/session"
18+
"github.com/aws/aws-sdk-go/service/dynamodb"
19+
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
20+
)
21+
22+
func main() {
23+
log.SetHandler(jlog.Default)
24+
25+
dynamo.HandleFunc(func(event *dynamo.Event, bg *apex.Context) (err error) {
26+
27+
log.Infof("event: %#v", event)
28+
29+
// create dynamodb store for events
30+
store, err := dynamodbstore.New(
31+
os.Getenv("AWS_DYNAMODB_TABLE_STATUS_EVENTS"),
32+
dynamodbstore.WithRegion(os.Getenv("AWS_REGION")),
33+
)
34+
if err != nil {
35+
return fmt.Errorf("error creating store: %v", err)
36+
}
37+
38+
// create eventsource repo
39+
repo := eventsource.New(&standup.Status{},
40+
eventsource.WithStore(store),
41+
eventsource.WithSerializer(eventsource.NewJSONSerializer(
42+
standup.StatusSubmitted{},
43+
)),
44+
)
45+
46+
for _, record := range event.Records {
47+
log.Infof("record: %#v", record)
48+
ctx := context.Background()
49+
50+
switch record.EventName {
51+
case "INSERT", "MODIFY":
52+
key := record.Dynamodb.Keys["key"]
53+
if err := aggregate(ctx, repo, *key.S); err != nil {
54+
return err
55+
}
56+
case "REMOVE":
57+
key := record.Dynamodb.Keys["key"]
58+
if err := remove(ctx, *key.S); err != nil {
59+
return err
60+
}
61+
}
62+
63+
}
64+
65+
return
66+
})
67+
}
68+
69+
func aggregate(ctx context.Context, repo *eventsource.Repository, id string) error {
70+
aggregate, err := repo.Load(ctx, id)
71+
if err != nil {
72+
log.WithField("id", id).WithError(err).Info("ignoring load error")
73+
return nil
74+
}
75+
76+
// save aggregate to a dynamodb table
77+
status := aggregate.(*standup.Status)
78+
item, err := dynamodbattribute.MarshalMap(status)
79+
if err != nil {
80+
log.WithError(err).Info("error converting aggregate to dynamodb attribute map")
81+
return err
82+
}
83+
84+
svc := dynamodb.New(session.New())
85+
_, err = svc.PutItem(&dynamodb.PutItemInput{
86+
Item: item,
87+
TableName: aws.String(os.Getenv("AWS_DYNAMODB_TABLE_STATUS_AGGREGATES")),
88+
ReturnConsumedCapacity: aws.String(dynamodb.ReturnConsumedCapacityNone),
89+
})
90+
if err != nil {
91+
log.WithError(err).Info("error with dynamodb PutItem")
92+
return err
93+
}
94+
95+
return nil
96+
}
97+
98+
func remove(_ context.Context, id string) error {
99+
svc := dynamodb.New(session.New())
100+
101+
if _, err := svc.DeleteItem(&dynamodb.DeleteItemInput{
102+
Key: map[string]*dynamodb.AttributeValue{
103+
"id": {S: aws.String(id)},
104+
},
105+
TableName: aws.String(os.Getenv("AWS_DYNAMODB_TABLE_STATUS_AGGREGATES")),
106+
}); err != nil {
107+
if aerr, ok := err.(awserr.Error); ok {
108+
switch aerr.Code() {
109+
case dynamodb.ErrCodeResourceNotFoundException:
110+
// item not in table. ignore...
111+
default:
112+
return fmt.Errorf("error with dyhnamodb DeleteItem (%v): %v", aerr.Code(), err)
113+
}
114+
} else {
115+
return fmt.Errorf("error with dyhnamodb DeleteItem: %v", err)
116+
}
117+
}
118+
119+
return nil
120+
}

functions/slack_standup/main.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
"encoding/json"
1010

11+
"net/http"
12+
1113
"bitbucket.org/RocksauceStudios/standup-lambda/modules/slack"
1214
"bitbucket.org/RocksauceStudios/standup-lambda/modules/standup"
1315
"github.com/altairsix/eventsource"
@@ -23,6 +25,12 @@ type message struct {
2325
Body string `json:"body"`
2426
}
2527

28+
type response struct {
29+
StatusCode int `json:"statusCode"`
30+
Headers map[string]string `json:"headers"`
31+
Body string `json:"body"`
32+
}
33+
2634
type Event struct {
2735
ChannelID string `schema:"channel_id"`
2836
ChannelName string `schema:"channel_name"`
@@ -68,8 +76,7 @@ func main() {
6876
dynamodbstore.WithRegion(os.Getenv("AWS_REGION")),
6977
)
7078
if err != nil {
71-
log.Errorf("error creating store: %v", err)
72-
return nil, err
79+
return nil, fmt.Errorf("error creating store: %v", err)
7380
}
7481

7582
// create eventsource repo
@@ -90,14 +97,12 @@ func main() {
9097
Text: event.Text,
9198
})
9299
if err != nil {
93-
log.Errorf("error dispatching command: %v", err)
94-
return nil, err
100+
return nil, fmt.Errorf("error dispatching command: %v", err)
95101
}
96102

97103
aggregate, err := repo.Load(ctx, id.String())
98104
if err != nil {
99-
log.Errorf("error loading aggregate: %v", err)
100-
return nil, err
105+
return nil, fmt.Errorf("error loading aggregate: %v", err)
101106
}
102107

103108
// Send slack message to `SLACK_STANDUP_CHANNEL`
@@ -106,10 +111,20 @@ func main() {
106111
os.Getenv("SLACK_STANDUP_CHANNEL"),
107112
aggregate.(*standup.Status),
108113
); err != nil {
109-
log.Errorf("error posting to Slack: %v", err)
110-
return nil, err
114+
return nil, fmt.Errorf("error posting to Slack: %v", err)
115+
}
116+
117+
var body string = ""
118+
if event.ChannelID != os.Getenv("SLACK_STANDUP_CHANNEL") {
119+
body = `{"response_type": "ephemeral", "text": "status submitted in #standup channel"}`
111120
}
112121

113-
return nil, nil
122+
return response{
123+
StatusCode: http.StatusOK,
124+
Headers: map[string]string{
125+
"Content-Type": "application/json",
126+
},
127+
Body: body,
128+
}, nil
114129
})
115130
}
Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
# lambda function name (injected by apex)
2-
variable "apex_function_slack_standup" {}
3-
41
# api resource for slack slash commands
52
resource "aws_api_gateway_resource" "Slack" {
63
rest_api_id = "${aws_api_gateway_rest_api.api.id}"

infrastructure/dev/dynamodb.tf

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#
44
resource "aws_dynamodb_table" "status_events_table" {
55
name = "${var.project}-${var.apex_environment}-status-events"
6-
read_capacity = 10
6+
read_capacity = 5
77
write_capacity = 10
88
hash_key = "key"
99
range_key = "partition"
@@ -21,7 +21,7 @@ resource "aws_dynamodb_table" "status_events_table" {
2121
}
2222

2323
tags {
24-
Name = "${var.project}-${var.apex_environment}-dynamodb-table-status_events"
24+
Name = "${var.project}-${var.apex_environment}-dynamodb-table-status-events"
2525
Environment = "${var.apex_environment}"
2626
Project = "${var.project}"
2727
}
@@ -30,10 +30,28 @@ resource "aws_dynamodb_table" "status_events_table" {
3030
#
3131
# dynamodb stream mapping for status_events aggregates
3232
#
33-
//resource "aws_lambda_event_source_mapping" "status_events_source_mapping" {
34-
// batch_size = 1
35-
// event_source_arn = "${aws_dynamodb_table.status_events_table.stream_arn}"
36-
// enabled = true
37-
// function_name = "${var.apex_function_aggregate_status_events}"
38-
// starting_position = "TRIM_HORIZON"
39-
//}
33+
resource "aws_lambda_event_source_mapping" "status_events_source_mapping" {
34+
batch_size = 1
35+
event_source_arn = "${aws_dynamodb_table.status_events_table.stream_arn}"
36+
enabled = true
37+
function_name = "${var.apex_function_aggregate_status_events}"
38+
starting_position = "TRIM_HORIZON"
39+
}
40+
41+
resource "aws_dynamodb_table" "status_aggregates_table" {
42+
name = "${var.project}-${var.apex_environment}-status-aggregates"
43+
read_capacity = 10
44+
write_capacity = 5
45+
hash_key = "id"
46+
47+
attribute {
48+
name = "id"
49+
type = "S"
50+
}
51+
52+
tags {
53+
Name = "${var.project}-${var.apex_environment}-dynamodb-table-status-aggregates"
54+
Environment = "${var.apex_environment}"
55+
Project = "${var.project}"
56+
}
57+
}

infrastructure/dev/main.tf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ variable "apex_environment" {
1313
default = "dev"
1414
}
1515

16+
# lambda function name (injected by apex)
17+
variable "apex_function_slack_standup" {}
18+
variable "apex_function_aggregate_status_events" {}
19+
1620
#
1721
# private subnet ranges
1822
#

infrastructure/dev/outputs.tf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,7 @@ output "vpc_id" {
2626
output "dynamodb_status_events_tablename" {
2727
value = "${aws_dynamodb_table.status_events_table.id}"
2828
}
29+
30+
output "dynamodb_status_aggregates_tablename" {
31+
value = "${aws_dynamodb_table.status_aggregates_table.id}"
32+
}

0 commit comments

Comments
 (0)