Skip to content

Tap: subscribe to labelers#1250

Open
Teqed wants to merge 1 commit intobluesky-social:mainfrom
Teqed:tap-labelers
Open

Tap: subscribe to labelers#1250
Teqed wants to merge 1 commit intobluesky-social:mainfrom
Teqed:tap-labelers

Conversation

@Teqed
Copy link

@Teqed Teqed commented Dec 24, 2025

Adds support for using the subscribeLabels endpoint. Allows Tap to cursor through historical and live labels, verify their signatures to the labeler's public key, and emit them as a new event type. Optionally allows Tap to automatically discover labelers when they emit Identity events containing their service URL.

Summary:

Updated README.md to document new labeler endpoints, config vars, and events.

  • POST /labelers/add: add labeler DIDs to subscribe to (triggers backfill of labels)
  • POST /labelers/remove: remove labeler DIDs
  • GET /stats/labeler-count: get total number of tracked labelers
  • TAP_LABELER_PARALLELISM: maximum concurrent labeler subscriptions (default: 1000)
  • TAP_DISCOVER_LABELERS: auto-discover labeler services from identity events (default: false)
{
  "id": 12347,
  "type": "label",
  "label": {
    "live": false, // true if a label was received with a `cts` in the last minute
    "labelerDID": "did:plc:ar7c4by46qjdydhdevvrndac",
    "uri": "at://did:plc:abc123/app.bsky.feed.post/xyz",
    "val": "spam",
    "cts": "2024-10-07T12:00:00.000Z",
    "src": "did:plc:ar7c4by46qjdydhdevvrndac",
    "cid": "bafyrei...",
    "neg": false
  }
}

Updated event_manager.go with AddLabelEvent; similar to AddRecordEvents and AddIdentityEvent but for the new Label event type.

Updated firehose.go to, when configured for discovery, detect new atproto_labeler endpoints off firehose identity events and automatically register new labelers for subscription.

Updated main.go with configuration for max concurrent labelers and whether to auto-discover as described above. According to mackuba.eu there are probably 624 currently-registered labelers out of 5728 total ever registered. I believe the id count here increments whenever a labeler recreates their declaration. Not all 624 of these labelers are actually online, so I chose 1_000 as the default limit for concurrent connections to prevent someone from flooding tap services with unlimited connections to new labelers.

Updated metrics.go with three metrics for label events received total, processed total, and last sequence number of each labeler.

Updated models/models.go with Labeler, a table for subscribed labelers (did, service_url, cursor, state, error_msg, retry_count, retry_after).

Updated resyncer.go similar to firehose.go to, when configured for discovery, add labelers during resync.

Updated server.go with endpoints to add, remove, and count subscribed labelers. Added labeler cursors to cursor endpoint's response.

Added required wiring in tap.go for LabelerManager and whatnot.

Updated types.go with LabelEvt struct used for the new label event.

Created label_processor.go. Similar to firehose connections, this connects to subscribeLabels websockets, counts cursors, and processes the events it receives. Uses the same cursor saving interval configuration as firehose. Uses backoff for reconnections. Uses VerifySignature from atproto/labeling/label.go.

Regarding the TODO on label_processor.go:147; this references another existing TODO on events/consumer.go:268. These two types are functionally identical but the naming is not clear and I wanted to avoid changing that code if I didn't have to.

Created labeler_manager.go. Attempts to reconcile active labelers every 1 second, which means to attempt to resolve, reconnect, or disconnect labelers. Labelers are tracked in a table when added automatically by discovery or manually by endpoint, and the manager periodically enforces these additions/changes/retries/removals.

Reference:

AT Protocol spec for Label Distribution Endpoints
Bluesky's Moderation Architecture: Running a labeler

Updated `README.md` to document new labeler endpoints, config vars, and events.

Updated `event_manager.go` with `AddLabelEvent`; similar to `AddRecordEvents` and `AddIdentityEvent` but for the new Label event type.

Updated `firehose.go` to, when configured for discovery, detect new `atproto_labeler` endpoints off firehose identity events and automatically register new labelers for subscription.

Updated `main.go` with configuration for max concurrent labelers and whether to auto-discover as described above. According to [mackuba.eu](https://blue.mackuba.eu/labellers/) there are probably 624 currently-registered labelers out of 5728 total ever registered. I believe the `id` count here increments whenever a labeler recreates their declaration. Not all 624 of these labelers are actually online, so I chose 1_000 as the default limit for concurrent connections to prevent someone from flooding tap services with unlimited connections to new labelers.

Updated `metrics.go` with three metrics for label events received total, processed total, and last sequence number of each labeler.

Updated `models/models.go` with `Labeler`, a table for subscribed labelers (did, service_url, cursor, state, error_msg, retry_count, retry_after).

Updated `resyncer.go` similar to `firehose.go` to, when configured for discovery, add labelers during resync.

Updated `server.go` with endpoints to add, remove, and count subscribed labelers. Added labeler cursors to cursor endpoint's response.

Added required wiring in `tap.go` for `LabelerManager` and whatnot.

Updated `types.go` with `LabelEvt` struct used for the new label event.

Created `label_processor.go`. Similar to firehose connections, this connects to `subscribeLabels` websockets, counts cursors, and processes the events it receives. Uses the same cursor saving interval configuration as firehose. Uses backoff for reconnections. Uses `VerifySignature` from `atproto/labeling/label.go`.

Regarding the TODO on `label_processor.go:147`; this references another existing TODO on `events/consumer.go:268`. These two types are functionally identical but the naming is not clear and I wanted to avoid changing that code if I didn't have to.

Created `labeler_manager.go`. Attempts to reconcile active labelers every 1 second, which means to attempt to resolve, reconnect, or disconnect labelers. Labelers are tracked in a table when added automatically by discovery or manually by endpoint, and the manager periodically enforces these additions/changes/retries/removals.
@ThisIsMissEm
Copy link
Contributor

You'd probably not want to discover labellers based on the presence of the atproto_labeler but rather based on the presence of a lexicon specific announcement, e.g., app.bsky.labeler.service, such that the labels you subscribe to matter for your lexicon

@Teqed
Copy link
Author

Teqed commented Dec 24, 2025

The app.bsky.labeler.service record can be created independently of the DID document, and in either order, but we need the service URL to connect to the websocket, which we only receive from the DID doc at #atproto_labeler.

Both the labeler record and the DID doc can be updated, and we need to reinitialize connections when and only if the service URL from the DID doc changes.

If you were looking to signal discovery of labelers based off the existence of a lexicon, I expect TAP_SIGNAL_COLLECTION might be sufficient, e.g., you could signal off app.bsky.labeler.service to discover the repos of Bluesky labelers but you could signal off com.example.labeler.service for another lexicon's labelers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants