Skip to content

Tap Go Library#1241

Open
jcalabro wants to merge 19 commits intomainfrom
jc/tap-lib
Open

Tap Go Library#1241
jcalabro wants to merge 19 commits intomainfrom
jc/tap-lib

Conversation

@jcalabro
Copy link
Member

This complements #1170 and bluesky-social/atproto#4290 by creating a new go library for working with the tap websocket.

This explicitly does not create HTTP handler helpers, but the tap.Event type can be used to support the plain HTTP request/response paradigm supported by tap.

It's pretty important that the websocket reconnection and message retry logic be robust, if ever want to say replace inferno with this library, so I went ahead and did what feels like over-engineering, but I don't actually think it is. I would not want to ship this to our prod appview backend without very robust retry logic that mimics what we already do internally (infinite retry on transient processing failures).

There are mocked tests here, and I'm using this library in a new system I'm working on and it appears to be working well (not public yet, but it will be! 👀)

@jcalabro jcalabro requested review from bnewbold and dholms December 22, 2025 14:19
@jcalabro
Copy link
Member Author

Note that I just updated make build to fix the build here. I want to have the tap executable named tap, and I also want a top-level tap package for consumers to use. I figure spitting out the executables in their corresponding cmd directory is a good way to ensure we can do both. Open to feedback though; I personally don't think I would ever use make build for anything other than CI.

}

func (e Event) MarshalJSON() ([]byte, error) {
event := struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the Event type here?

readTimeout time.Duration
writeTimeout time.Duration

handler WebsocketHandlerFunc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something I wondered when working with our firehose processing code is why do we use callbacks/handlers? These seems like a pretty natural use case for channels.

In this particular case, it seems like we miss out on parallel processing of events since we call the handler serially

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also interested in channel-based APIs for stream processing. there are some complexities:

  • error and ACK handling can be simple with callbacks
  • you lose ordering guarantees (eg, per DID) unless you add some sharding or locking mechanism (eg, like the parallel scheduler jaz implemented for firehose)
  • might be some perf overhead around GC? but you get concurrency which seems like it should be an overall win

I know that why generally avoids channel APIs, and I think that is the reason we don't have many. I think we probably want to keep a callback/handler API for most of our packages; i'd just also like the option to use channels. message queing packages (kafka, nsq, nats, etc) have good examples.

Copy link
Contributor

@dholms dholms left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice this looks good to me! Thanks for putting this together.

Before merging, we should add at least a barebones README. It could just have the example from doc.go in it. But more importantly, it should link to ./cmd/tap which has the fuller documentation for tap.

I can just see devs looking for tap, coming across this first (since it's in the root dir) and not knowing what to do with it. Actually to that end, maybe we rename this package tap-client or similar?

Copy link
Collaborator

@bnewbold bnewbold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for hacking on this!

I think especially for a package like this (vs a daemon), it is good/fine to do just doc.go, which renders nicely on pkg.go.dev. Might be worth a small README which just links to that, I think the current doc.go could be improved though: link to the tap command README, or even the announcement blog post. Have a section talking about how to do clean shutdown. Maybe talk about some design properties, like how backpressure is expected to work, what kind of performance (evt/sec) is expected with/without ACKs, etc.

I'm not super in to having the Makefile build the binaries in each cmd directory. I'm open to rethinking how it works, but I don't think the current setup is obviously wrong, and think we should discuss it in a separate PR not here.

I think the module should be named tapclient, not tap. I'm kind of ambivalent about all the options of where to put it: top level is fine ok but gets busy; cmd/tap/tapclient isn't idiomatic or accessible but feels a bit tidier; maybe pkg/tapclient but if we do that we should be more consistent. top-level is probably best for now.

I'd be curious to iterate on this more together, but also think this is good and helpful as is and don't think we need to block merging.

// A thin error wrapper that indicates to the tap client consumer loop that a message
// should not be retried (i.e. invalid user input that will surely fail again on retry).
type NonRetryableError struct {
err error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not expose the inner error? I feel like there is probably an idiom around this sort of error wrapper which i'm not remembering.

readTimeout time.Duration
writeTimeout time.Duration

handler WebsocketHandlerFunc
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm also interested in channel-based APIs for stream processing. there are some complexities:

  • error and ACK handling can be simple with callbacks
  • you lose ordering guarantees (eg, per DID) unless you add some sharding or locking mechanism (eg, like the parallel scheduler jaz implemented for firehose)
  • might be some perf overhead around GC? but you get concurrency which seems like it should be an overall win

I know that why generally avoids channel APIs, and I think that is the reason we don't have many. I think we probably want to keep a callback/handler API for most of our packages; i'd just also like the option to use channels. message queing packages (kafka, nsq, nats, etc) have good examples.

type WebsocketHandlerFunc func(context.Context, *Event) error

// Initializes a tap websocket consumer
func NewWebsocket(addr string, handler WebsocketHandlerFunc, opts ...WebsocketOption) (*Websocket, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love Websocket as the noun in these APIs. Consumer? Channel?

// if WithAcks() is provided.
type WebsocketHandlerFunc func(context.Context, *Event) error

// Initializes a tap websocket consumer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doc comment should explain that addr is expected to be a wss:// URL. naming it websocketURL might also help

return ws, nil
}

// Connects to and beings the main tap websocket consumer loop
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"begins"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants