Drinking from the Bluesky Firehose
I recently built a Bluesky bot called Link Notifier Link Notifier (@linknotifier.bsky.social) Follow me to receive a DM whenever someone posts a link to your website! Made by @jakelazaroff.com bsky.app/profile/linknotifier.bsky.social that sends you a DM whenever someone posts a link to your website. To build it, I had to dig into the Bluesky firehose. That seems like a pretty common entry point for people looking to build on top of Bluesky, so I figured I’d share what I learned.
There are a couple ways to get at the Bluesky firehose:
- Consume it directly. This is pretty complex, involving binary WebSocket messages containing CBOR-encoded Merkle Search Tree blocks. If reading that makes you feel adrift in a sea of jargon, you’re not alone!
- Use Jetstream GitHub - bluesky-social/jetstream: A simplified JSON event stream for AT Proto A simplified JSON event stream for AT Proto. Contribute to bluesky-social/jetstream development by creating an account on GitHub. github.com/bluesky-social/jetstream , a first-party Bluesky service that converts the firehose into normal JSON.1 You can self-host it if you want, but Bluesky provides official instances that you can connect to.
You might be tempted — as I was at first — to avoid all the gory details and just reach for a library like @skyware/jetstream
@skyware/jetstream A fully typed client for the Bluesky Jetstream (https://github.com/bluesky-social/jetstream) service.. Latest version: 0.2.0, last published: a month ago. Start using @skyware/jetstream in your project by running `npm i @skyware/jetstream`. There are no other projects in the npm registry using @skyware/jetstream. www.npmjs.com/package/@skyware/jetstream .
Don’t be intimidated! The Jetstream API is actually remarkably simple, and you can easily consume it without adding a dependency to your project.
Here’s a small example running in the browser that consumes the Bluesky Jetstream: a web component that shows the latest post every second. (This is totally unfiltered; I’m sorry if anything unsavory shows up here.)
The full code of this component is less than 40 lines — including the templating and all the web component boilerplate! The code that reads from the Jetstream takes up about six. There are no dependencies outside of the browser’s standard library.
Before we look at any code, though, let’s take a quick detour through the AT Protocol and Jetstream API.
The Jetstream API
Jetstream is a WebSocket server: we connect via a WebSocket connection, and it sends events as WebSocket messages encoded in JSON. You can host a Jetstream instance yourself, but as of today Bluesky hosts official instances that you can use without authentication.
The connection string for a Jetstream instance looks like this:
wss://jetstream2.us-west.bsky.network/subscribe
Once you’re connected, Jetstream will start sending events. They look like this:
{
"did": "did:plc:eygmaihciaxprqvxpfvl6flk",
"time_us": 1725911162329308,
"kind": "commit",
"commit": {
"rev": "3l3qo2vutsw2b",
"operation": "create",
"collection": "app.bsky.feed.like",
"rkey": "3l3qo2vuowo2b",
"record": {
"$type": "app.bsky.feed.like",
"createdAt": "2024-09-09T19:46:02.102Z",
"subject": {
"cid": "bafyreidc6sydkkbchcyg62v77wbhzvb2mvytlmsychqgwf2xojjtirmzj4",
"uri": "at://did:plc:wa7b35aakoll7hugkrjtf3xf/app.bsky.feed.post/3l3pte3p2e325"
}
},
"cid": "bafyreidwaivazkwu67xztlmuobx35hs2lnfh3kolmgfmucldvhd3sgzcqi"
}
}
That’s the full event of a post being liked.
It’s pretty dense! There are bunch of terms like “collection” and “did” that are idiosyncratic to AT Protocol. Most of them can be found in the glossary Glossary of terms - AT Protocol A collection of terminology used in the AT Protocol and their definitions. atproto.com/guides/glossary , but I’ll try to define them in my own words as they come up as well.
In AT Protocol, everything a user does is found in a repo.
Each repo has a DID: a Decentralized ID that uniquely identifies it.
The did
property at the root of the event object is a reference to the repo of the user who took the action (in this case, the user who liked the post).
The kind
property disambiguates between three types of events:
commit
for events that create, update or delete something in a repo.identity
for events that describe some change to the repo itself (not quite sure which — I assume changing a handle would be one example).account
for events that describe a change in account status (e.g. from “active” to “deactivated”).
For our purposes, we’re only worried about commit
events.
Those events all have a nested commit
object with an operation
property: create
, update
or delete
.
I’ll let you guess what those mean.
Each commit object also has a collection
property.
This is way to “group” events across repos.
For example, to listen to all new posts, we’d ignore all events in collections other than app.bsky.feed.post
.
If we do all the filtering in the client, we’d be receiving a ton of data we don’t need.
Jetstream provides a way to avoid this: append a wantedCollections
query string parameter to the connection string.2
Say we’re only interested in new posts and likes. We’d connect to this long URL:
wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.like
That wouldn’t absolve us of the need to filter on the client — we’d still need to branch between new posts and likes within our app — but it would prevent us from sifting through a ton of other events we don’t care about.
We can also use asterisks as “wildcards” to filter through multiple collections at once.
For example, to get events in all feed collections, we’d set wantedCollections
to app.bsky.feed.*
.
On create
and update
commits, the record
is the “contents” of it — either the thing that was just created, or the thing with which to replace the previous record.
As a reminder, here’s what the record looked like in the example event:
{
"$type": "app.bsky.feed.like",
"createdAt": "2024-09-09T19:46:02.102Z",
"subject": {
"cid": "bafyreidc6sydkkbchcyg62v77wbhzvb2mvytlmsychqgwf2xojjtirmzj4",
"uri": "at://did:plc:wa7b35aakoll7hugkrjtf3xf/app.bsky.feed.post/3l3pte3p2e325"
}
}
And here’s what a record might look like for posts:
{
"$type": "app.bsky.feed.post",
"text": "Hello World!",
"createdAt": "2023-08-07T05:31:12.156888Z"
}
This is a minimal example; Bluesky’s documentation Posts | Bluesky This is an in-depth dive into how creating a post works on Bluesky. We'll use Python below, without a SDK, so you can see how it works behind the scenes. docs.bsky.app/docs/advanced-guides/posts details how to handle links, quotes and so forth.
Notice that in some cases (such as the text
property of a post record) the record itself contains information, while in others (such as the subject
property of a like record) it contains references to other records.
The Client
The simplest possible Jetstream client looks something like this:
const jetstream = new WebSocket("wss://jetstream1.us-east.bsky.network/subscribe");
jetstream.onmessage = e => console.log(e.data);
Voilà: two lines of code and every event from the Bluesky firehose gets logged to the console!
With a little elbow grease, we can come up with something a little more ergonomic.
Let’s write a client that mimics the @skyware/jetstream
API:
const jetstream = new Jetstream();
jetstream.onCreate("app.bsky.graph.follow", event => {
// ...
});
jetstream.onDelete("app.bsky.feed.post", event => {
// ...
});
jetstream.start();
This is still a pretty simple client that doesn’t cover everything we might ever want to do with Jetstream, but it’s more than enough to get us started.
We’ll start by writing a Jetstream
class:
class Jetstream {
endpoint = "jetstream1.us-east.bsky.network";
emitters = new Map<string, EventTarget>();
ws?: WebSocket;
constructor(options: { endpoint?: string } = {}) {
this.endpoint = options.endpoint ?? this.endpoint;
}
}
By default, our client connects to the Bluesky-hosted Jetstream instance at jetstream1.us-east.bsky.network
.
The user can override that by passing an endpoint into the constructor.
We also see two additional members:
emitters
, which holds a map ofEventTarget
s keyed by the collection names.ws
, which will hold the WebSocket client when we connect to the Jetstream instance.
First, we’ll write a private #listen
method that calls an event listener when the client receives an in a given collection with a specific operation:
class Jetstream {
// ...
#listen(collection: string, operation: string, listener: (event: unknown) => void) {
const emitter = this.emitters.get(collection) || new EventTarget();
this.emitters.set(collection, emitter);
emitter.addEventListener(operation, event => listener(event.detail));
}
}
It gets an EventTarget
from the map at the given collection key — creating one if it doesn’t exist — and attaches an event listener for events matching the given commit operation.3
When we’re dispatching the events later, we’ll use CustomEvent
s, which allow you to include arbitrary data in their detail
property.
Since the use of CustomEvent
s is an implementation detail, we’ll just pass that property to the listener, rather than the whole event.
From here, we can make public wrapper methods for each of those commit operations:
class Jetstream {
// ...
onCreate(collection: string, listener: (event: unknown) => void) {
this.#listen(collection, "create", listener);
}
onUpdate(collection: string, listener: (event: unknown) => void) {
this.#listen(collection, "update", listener);
}
onDelete(collection: string, listener: (event: unknown) => void) {
this.#listen(collection, "delete", listener);
}
}
These don’t really do much other than make that #listen
method slightly more convenient to use.
Next, let’s take a look at the start
method:
class Jetstream {
// ...
start() {
if (this.ws) this.ws.close();
this.ws = new WebSocket(this.url);
this.ws.onmessage = ev => {
const data = JSON.parse(ev.data);
if (data.kind !== "commit") return;
const emitter = this.emitters.get(data.commit.collection);
if (!emitter) return;
emitter.dispatch(new CustomEvent(data.commit.operation, { detail: data }));
};
}
}
This looks pretty familiar: it’s a thin abstraction over the barebones client we saw earlier.
- First, if there’s already an open WebSocket connection, close it.
- Next, set up a new WebSocket connection at the appropriate URL.
- When we receive a message, parse the data into JSON.
- Discard any non-
commit
events. - Get the event emitter corresponding to the event’s collection.
- Dispatch the event using the commit operation as a key.
Sharp-eyed readers might notice that we haven’t defined the class’s url
member yet:
class Jetstream {
// ...
get url() {
const url = new URL(`wss://${this.endpoint}/subscribe`);
for (const collection of this.emitters.keys()) {
url.searchParams.append("wantedCollections", collection);
}
return url.toString();
}
}
It’s a getter that constructs the WebSocket URL, adding wantedCollections
query string parameters for any collections in which we’re listening for events.
That way, we’ll only receive the slice of the Jetstream containing the collections we care about.
For posterity, here’s the full code:
class Jetstream {
endpoint = "jetstream1.us-east.bsky.network";
emitters = new Map<string, EventTaret>();
ws?: WebSocket;
get url() {
const url = new URL(`wss://${this.endpoint}/subscribe`);
for (const collection of this.emitters.keys()) {
url.searchParams.append("wantedCollections", collection);
}
return url.toString();
}
constructor(options: { endpoint?: string } = {}) {
this.endpoint = options.endpoint ?? this.endpoint;
}
#listen(collection: string, operation: string, listener: (event: unknown) => void) {
const emitter = this.emitters.get(collection) || new EventTarget();
this.emitters.set(collection, emitter);
emitter.addEventListener(operation, listener);
}
onCreate(collection: string, listener: (event: unknown) => void) {
this.#listen(collection, "create", listener);
}
onUpdate(collection: string, listener: (event: unknown) => void) {
this.#listen(collection, "update", listener);
}
onDelete(collection: string, listener: (event: unknown) => void) {
this.#listen(collection, "delete", listener);
}
start() {
if (this.ws) this.ws.close();
this.ws = new WebSocket(this.url);
this.ws.onmessage = ev => {
const data = JSON.parse(ev.data);
if (data.kind !== "commit") return;
const emitter = this.emitters.get(data.commit.collection);
if (!emitter) return;
emitter.dispatch(new CustomEvent(data.commit.operation, { detail: data }));
};
}
}
40 lines of code and we’ve replicated a significant portion of @skyware/jetstream
!
Use it, modify it and make something cool.
Footnotes
-
There’s an official blog post Introducing Jetstream | Bluesky One of most popular aspects of atproto for developers is the firehose: an aggregated stream of all the public data updates in the network. Independent developers have used the firehose to build real-time monitoring tools (like Firesky), feed generators, labeling services, bots, entire applications, and more. docs.bsky.app/blog/jetstream announcing it, as well as a more in-depth explanation on the original author Jaz’s blog Jetstream: Shrinking the AT Proto Firehose by >99% Jetstream is a streaming service that consumes an AT Proto Sync Firehose and converts it into lightweight, filterable, friendly JSON, allowing us to live tail all posts on Bluesky for as little as ~850 MB/day jazco.dev/2024/09/24/jetstream/ . Both are worth reading, but not required to understand the rest of this article. ↩
-
There are a bunch of other options as well. For instance, you can have Jetstream only send events for specific repos by including a
wantedDids
query string parameter. All the options are listed in the GitHub readme. ↩ -
Why does the listener take an event of type
unknown
? Technically, Jetstream could send anything over the wire!@skyware/jetstream
provides a typed event definition, but they just cast the type than actually checking that it’s correct. If you want type safety, you should parse the incoming events using a library like Valibot Valibot: The modular and type safe schema library Validate unknown data with Valibot, the open source schema library with bundle size, type safety and developer experience in mind. valibot.dev . ↩