This repository was archived by the owner on Aug 15, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathconvert.go
More file actions
95 lines (76 loc) · 1.9 KB
/
convert.go
File metadata and controls
95 lines (76 loc) · 1.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"fmt"
"os"
"strings"
"time"
"github.com/m-mizutani/rlogs"
"github.com/pkg/errors"
)
// partitionKey groups flow-log records by source AWS account and calendar
// day; convert keeps one Parquet writer (and uploads one object) per key.
type partitionKey struct {
	// Calendar date of the record, taken in the location carried by the
	// record's timestamp (NOTE(review): presumably UTC — confirm in rlogs).
	year, month, day int

	// accountID is copied verbatim from FlowLog.AccountID.
	accountID string
}

// newPartitionKey builds the key for flog from its timestamp ts. The date
// components are read in ts's own location; no time-zone conversion is done.
func newPartitionKey(ts time.Time, flog *FlowLog) partitionKey {
	y, m, d := ts.Date()
	return partitionKey{year: y, month: int(m), day: d, accountID: flog.AccountID}
}

// String renders the key as a path fragment: "logs/<accountID>/YYYY/MM/DD".
func (k partitionKey) String() string {
	return "logs/" + k.accountID + "/" + k.joinDate("/")
}

// Date renders the key's calendar day as "YYYY-MM-DD".
func (k partitionKey) Date() string {
	return k.joinDate("-")
}

// joinDate formats year, month and day zero-padded to at least 4, 2 and 2
// digits respectively, separated by sep.
func (k partitionKey) joinDate(sep string) string {
	return fmt.Sprintf("%04d%s%02d%s%02d", k.year, sep, k.month, sep, k.day)
}
// result summarises one convert call.
type result struct {
	// partitionKeys lists every partition that received at least one record
	// and was uploaded. Order follows map iteration and is not deterministic.
	partitionKeys []partitionKey
}

// convert reads the VPC flow-log object at src, splits its records into
// per-account, per-day partitions (see partitionKey), writes each partition to
// its own Parquet file and uploads every file to dst.
//
// The destination key for a partition is
//
//	dst.S3Key + pkey.String() + "/" + <last path segment of src.S3Key> + ".parquet"
//
// so dst.S3Key is concatenated without a separator (NOTE(review): callers
// presumably pass a prefix ending in "/" — not checked here).
//
// Records whose entity is not a *FlowLog are skipped. The first read, writer
// creation, close, open or upload error aborts the conversion and is returned;
// partitions uploaded before the failure are not rolled back.
func convert(src, dst S3Location) (*result, error) {
	wmap := map[partitionKey]*ParquetFlowLogWriter{}

	ch := rlogs.Read(src.S3Region, src.S3Bucket, src.S3Key,
		&rlogs.S3GzipLines{}, &VpcFlowLogParser{})

	for q := range ch {
		if q.Error != nil {
			// NOTE(review): returning here stops draining ch and leaves the
			// writers in wmap unclosed; whether the rlogs producer can block
			// as a result depends on rlogs.Read — confirm.
			return nil, q.Error
		}

		flowlog, ok := q.Record.Entity.(*FlowLog)
		if !ok {
			continue
		}

		pkey := newPartitionKey(q.Record.Timestamp, flowlog)
		w, ok := wmap[pkey]
		if !ok {
			logger.WithField("pkey", pkey).Info("create a new writer")
			newWriter, err := NewParquetFlowLogWriter()
			if err != nil {
				return nil, err
			}
			w = &newWriter
			wmap[pkey] = w
		}
		// NOTE(review): any value returned by Write is discarded here.
		w.Write(flowlog)
	}

	// Only the last path segment of the source key names the output object.
	srcPath := strings.Split(src.S3Key, "/")
	srcFileName := srcPath[len(srcPath)-1]

	res := result{}
	for pkey, w := range wmap {
		// Close the writer before its file is read back for upload.
		if err := w.Close(); err != nil {
			return nil, errors.Wrap(err, "Fail to close a Parquet file")
		}

		dstKey := fmt.Sprintf("%s%s/%s.parquet", dst.S3Key, pkey, srcFileName)

		fd, err := os.Open(w.FileName)
		if err != nil {
			return nil, errors.Wrap(err, "Fail to open a Parquet file")
		}
		err = uploadS3(dst.S3Region, dst.S3Bucket, dstKey, fd)
		// Release the descriptor on both the success and failure paths so one
		// is not leaked per partition. The handle is read-only, so a close
		// error cannot lose data (and is expected if uploadS3 already closed it).
		_ = fd.Close()
		if err != nil {
			return nil, errors.Wrap(err, "Fail to upload a Parquet file")
		}

		res.partitionKeys = append(res.partitionKeys, pkey)
	}

	return &res, nil
}