Skip to content

Commit 35bde95

Browse files
authored
Merge pull request #773 from tobilg/feature/cf-queues
Cloudflare Queues endpoint implementation
2 parents ed481ff + f5b678d commit 35bde95

4 files changed

Lines changed: 158 additions & 3 deletions

File tree

go.mod

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@ require (
2323
github.com/tidwall/buntdb v1.2.9
2424
github.com/tidwall/expr v0.13.0
2525
github.com/tidwall/geojson v1.4.5
26-
github.com/tidwall/gjson v1.14.3
26+
github.com/tidwall/gjson v1.18.0
2727
github.com/tidwall/hashmap v1.6.1
2828
github.com/tidwall/limiter v0.4.0
2929
github.com/tidwall/match v1.1.1
30-
github.com/tidwall/pretty v1.2.0
30+
github.com/tidwall/pretty v1.2.1
3131
github.com/tidwall/redbench v0.1.0
3232
github.com/tidwall/redcon v1.4.4
3333
github.com/tidwall/resp v0.1.1
3434
github.com/tidwall/rtree v1.9.2
35-
github.com/tidwall/sjson v1.2.4
35+
github.com/tidwall/sjson v1.2.5
3636
github.com/tidwall/tinylru v1.2.1
3737
github.com/xdg-go/scram v1.1.2
3838
github.com/yuin/gopher-lua v1.1.0
@@ -62,6 +62,7 @@ require (
6262
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
6363
github.com/beorn7/perks v1.0.1 // indirect
6464
github.com/cespare/xxhash/v2 v2.2.0 // indirect
65+
github.com/cloudflare/cloudflare-go/v4 v4.6.0 // indirect
6566
github.com/davecgh/go-spew v1.1.1 // indirect
6667
github.com/devigned/tab v0.1.1 // indirect
6768
github.com/eapache/go-resiliency v1.3.0 // indirect

go.sum

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
112112
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
113113
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
114114
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
115+
github.com/cloudflare/cloudflare-go/v4 v4.6.0 h1:ZaWwXjHFR5NoY8UEf4QFY0g3KTi72kqqEXpajV610/o=
116+
github.com/cloudflare/cloudflare-go/v4 v4.6.0/go.mod h1:XcYpLe7Mf6FN87kXzEWVnJ6z+vskW/k6eUqgqfhFE9k=
115117
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
116118
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
117119
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
@@ -377,8 +379,11 @@ github.com/tidwall/geoindex v1.7.0/go.mod h1:rvVVNEFfkJVWGUdEfU8QaoOg/9zFX0h9ofW
377379
github.com/tidwall/geojson v1.4.5 h1:BFVb5Pr7WZJMqFXy1LVudt5hPEWR3g4uhjk5Ezc3GzA=
378380
github.com/tidwall/geojson v1.4.5/go.mod h1:1cn3UWfSYCJOq53NZoQ9rirdw89+DM0vw+ZOAVvuReg=
379381
github.com/tidwall/gjson v1.12.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
382+
github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
380383
github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw=
381384
github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
385+
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
386+
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
382387
github.com/tidwall/grect v0.1.4 h1:dA3oIgNgWdSspFzn1kS4S/RDpZFLrIxAZOdJKjYapOg=
383388
github.com/tidwall/grect v0.1.4/go.mod h1:9FBsaYRaR0Tcy4UwefBX/UDcDcDy9V5jUcxHzv2jd5Q=
384389
github.com/tidwall/hashmap v1.6.1 h1:FIAHjKwcyOo1Y3/orsQO08floKhInbEX2VQv7CQRNuw=
@@ -391,6 +396,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
391396
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
392397
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
393398
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
399+
github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
400+
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
394401
github.com/tidwall/redbench v0.1.0 h1:UZYUMhwMMObQRq5xU4SA3lmlJRztXzqtushDii+AmPo=
395402
github.com/tidwall/redbench v0.1.0/go.mod h1:zxcRGCq/JcqV48YjK9WxBNJL7JSpMzbLXaHvMcnanKQ=
396403
github.com/tidwall/redcon v1.4.4 h1:N3ZwZx6n5dqNxB3cfmj9D/8zNboFia5FAv1wt+azwyU=
@@ -404,6 +411,8 @@ github.com/tidwall/rtree v1.9.2 h1:6HiSU/bf4a7l2smEC+fEum/WloHMFCIQKWHjahm0Do8=
404411
github.com/tidwall/rtree v1.9.2/go.mod h1:iDJQ9NBRtbfKkzZu02za+mIlaP+bjYPnunbSNidpbCQ=
405412
github.com/tidwall/sjson v1.2.4 h1:cuiLzLnaMeBhRmEv00Lpk3tkYrcxpmbU81tAY4Dw0tc=
406413
github.com/tidwall/sjson v1.2.4/go.mod h1:098SZ494YoMWPmMO6ct4dcFnqxwj9r/gF0Etp19pSNM=
414+
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
415+
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
407416
github.com/tidwall/tinylru v1.2.1 h1:VgBr72c2IEr+V+pCdkPZUwiQ0KJknnWIYbhxAVkYfQk=
408417
github.com/tidwall/tinylru v1.2.1/go.mod h1:9bQnEduwB6inr2Y7AkBP7JPgCkyrhTV/ZpX0oOOpBI4=
409418
github.com/tidwall/tinyqueue v0.1.1 h1:SpNEvEggbpyN5DIReaJ2/1ndroY8iyEGxPYxoSaymYE=

internal/endpoint/cfqueue.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package endpoint
2+
3+
import (
4+
"context"
5+
"sync"
6+
"time"
7+
8+
"github.com/cloudflare/cloudflare-go/v4"
9+
"github.com/cloudflare/cloudflare-go/v4/option"
10+
"github.com/cloudflare/cloudflare-go/v4/queues"
11+
)
12+
13+
const cfqueueExpiresAfter = time.Second * 30
14+
15+
// CFQueueConn is an endpoint connection
16+
type CFQueueConn struct {
17+
mu sync.Mutex
18+
ep Endpoint
19+
client *cloudflare.Client
20+
ex bool
21+
t time.Time
22+
}
23+
24+
// Expired returns true if the connection has expired
25+
func (conn *CFQueueConn) Expired() bool {
26+
conn.mu.Lock()
27+
defer conn.mu.Unlock()
28+
if !conn.ex {
29+
if time.Since(conn.t) > cfqueueExpiresAfter {
30+
conn.close()
31+
conn.ex = true
32+
}
33+
}
34+
return conn.ex
35+
}
36+
37+
// ExpireNow forces the connection to expire
38+
func (conn *CFQueueConn) ExpireNow() {
39+
conn.mu.Lock()
40+
defer conn.mu.Unlock()
41+
conn.close()
42+
conn.ex = true
43+
}
44+
45+
func (conn *CFQueueConn) close() {
46+
if conn.client != nil {
47+
conn.client = nil
48+
}
49+
}
50+
51+
// Send sends a message
52+
func (conn *CFQueueConn) Send(msg string) error {
53+
conn.mu.Lock()
54+
defer conn.mu.Unlock()
55+
56+
if conn.ex {
57+
return errExpired
58+
}
59+
conn.t = time.Now()
60+
61+
// Initialize client if not already done
62+
if conn.client == nil {
63+
conn.client = cloudflare.NewClient(
64+
option.WithAPIToken(conn.ep.CFQueue.APIToken),
65+
)
66+
}
67+
68+
// Push message to CF Queue
69+
_, err := conn.client.Queues.Messages.Push(
70+
context.TODO(),
71+
conn.ep.CFQueue.QueueID,
72+
queues.MessagePushParams{
73+
AccountID: cloudflare.String(conn.ep.CFQueue.AccountID),
74+
Body: queues.MessagePushParamsBodyMqQueueMessageText{
75+
Body: cloudflare.String(msg),
76+
ContentType: cloudflare.F(queues.MessagePushParamsBodyMqQueueMessageTextContentTypeText),
77+
},
78+
},
79+
)
80+
if err != nil {
81+
return err
82+
}
83+
84+
return nil
85+
}
86+
87+
func newCFQueueConn(ep Endpoint) *CFQueueConn {
88+
return &CFQueueConn{
89+
ep: ep,
90+
t: time.Now(),
91+
}
92+
}

internal/endpoint/endpoint.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ const (
4242
NATS = Protocol("nats")
4343
// EventHub protocol
4444
EventHub = Protocol("sb")
45+
// CFQueue protocol
46+
CFQueue = Protocol("cf-queue")
4547
)
4648

4749
// Endpoint represents an endpoint.
@@ -134,6 +136,11 @@ type Endpoint struct {
134136
EventHub struct {
135137
ConnectionString string
136138
}
139+
CFQueue struct {
140+
AccountID string
141+
QueueID string
142+
APIToken string
143+
}
137144
Local struct {
138145
Channel string
139146
}
@@ -241,6 +248,8 @@ func (epc *Manager) Send(endpoint, msg string) error {
241248
conn = newLocalConn(ep, epc.publisher)
242249
case EventHub:
243250
conn = newEventHubConn(ep)
251+
case CFQueue:
252+
conn = newCFQueueConn(ep)
244253
}
245254
epc.conns[endpoint] = conn
246255
}
@@ -298,6 +307,8 @@ func parseEndpoint(s string) (Endpoint, error) {
298307
endpoint.Protocol = NATS
299308
case strings.HasPrefix(s, "Endpoint="):
300309
endpoint.Protocol = EventHub
310+
case strings.HasPrefix(s, "cf-queue:"):
311+
endpoint.Protocol = CFQueue
301312
}
302313

303314
s = s[strings.Index(s, ":")+1:]
@@ -817,6 +828,48 @@ func parseEndpoint(s string) (Endpoint, error) {
817828
endpoint.EventHub.ConnectionString = endpoint.Original
818829
}
819830

831+
// Basic CF Queue connection strings in HOOKS interface
832+
// cf-queue://<account_id>/<queue_id>?token=<api_token>
833+
//
834+
// params are:
835+
//
836+
// token - API token
837+
if endpoint.Protocol == CFQueue {
838+
// Parse account_id/queue_id from the path parts
839+
if len(sp) < 2 {
840+
return endpoint, errors.New("invalid CF Queue format, should be account_id/queue_id")
841+
}
842+
endpoint.CFQueue.AccountID = sp[0]
843+
endpoint.CFQueue.QueueID = sp[1]
844+
845+
// Parse query parameters for API token
846+
if len(sqp) > 1 {
847+
m, err := url.ParseQuery(sqp[1])
848+
if err != nil {
849+
return endpoint, errors.New("invalid CF Queue url")
850+
}
851+
for key, val := range m {
852+
if len(val) == 0 {
853+
continue
854+
}
855+
switch key {
856+
case "token":
857+
endpoint.CFQueue.APIToken = val[0]
858+
}
859+
}
860+
}
861+
862+
if endpoint.CFQueue.AccountID == "" {
863+
return endpoint, errors.New("missing CF Queue account ID")
864+
}
865+
if endpoint.CFQueue.QueueID == "" {
866+
return endpoint, errors.New("missing CF Queue queue ID")
867+
}
868+
if endpoint.CFQueue.APIToken == "" {
869+
return endpoint, errors.New("missing CF Queue API token")
870+
}
871+
}
872+
820873
return endpoint, nil
821874
}
822875

0 commit comments

Comments
 (0)