Revisiting async streams
What’s this about again?
Yeah, apparently I’d like to revisit my post on async streams 😅.
Why?
- We’ve got Promise.withResolvers now (it’s part of ES2024)
- I think I can do better than before 🤔
.withResolvers()
Sounds exciting, doesn’t it? Here’s how it works:
// Without Promise.withResolvers:
let resolve, reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
// With Promise.withResolvers:
let { promise, resolve, reject } = Promise.withResolvers()
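Where this pays off is when the code that settles a Promise lives somewhere other than the code that creates it. A minimal sketch, using a hypothetical `makeGate` helper (not a library API) whose Promise is resolved by whoever calls `open`:

```typescript
// Hypothetical helper: a one-shot "gate" whose Promise is settled from
// outside the Promise constructor, by whoever calls `open`.
function makeGate<T>() {
  const { promise, resolve } = Promise.withResolvers<T>();
  return { opened: promise, open: (value: T) => resolve(value) };
}

const gate = makeGate<string>();
// Some consumer waits for the gate...
const waiting = gate.opened.then((reason) => `gate opened: ${reason}`);
// ...and later, unrelated code opens it.
gate.open("producer is ready");
```

No executor callback and no `let` variables to smuggle `resolve` out; the same trick is what lets a stream producer hand out a Promise now and settle it later.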
Tangent, MDN docs
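I love how this MDN example turns a Node.js readable stream into an async iterable, swapping in a fresh Promise from Promise.withResolvers every time it has to wait. This is almost what I’m interested in here.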
async function* readableToAsyncIterable(stream) {
let { promise, resolve, reject } = Promise.withResolvers();
stream.on("error", (error) => reject(error));
stream.on("end", () => resolve());
stream.on("readable", () => resolve());
while (stream.readable) {
await promise;
let chunk;
while ((chunk = stream.read())) {
yield chunk;
}
({ promise, resolve, reject } = Promise.withResolvers());
}
}
A linked list
This will be our central building block:
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };
Since we’re interested in async streams, the first AsyncCell will also be wrapped in a Promise.
After all, we may need to wait for the first value.
For a sequence of values like 1,2,3,… we get a structure like this:
The last Promise is drawn as an empty box to illustrate a pending Promise.
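The same structure for 1, 2, 3 can also be written out by hand. In this sketch, `new Promise(() => {})` stands in for the empty box (a Promise that never settles), and `takeFromList` is a hypothetical helper that walks the list:

```typescript
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };

// Stand-in for the empty box: a Promise that never settles (no value yet).
const pendingTail = new Promise<AsyncCell<number>>(() => {});

// Build 1 -> 2 -> 3 -> pending, back to front.
const cell3: AsyncCell<number> = { value: 3, next: pendingTail };
const cell2: AsyncCell<number> = { value: 2, next: Promise.resolve(cell3) };
const cell1: AsyncCell<number> = { value: 1, next: Promise.resolve(cell2) };
// As with any stream, the first cell is wrapped in a Promise as well.
const head: Promise<AsyncCell<number>> = Promise.resolve(cell1);

// Hypothetical helper: read the first n values.
// Asking for a fourth value here would wait forever on pendingTail.
async function takeFromList<T>(start: Promise<AsyncCell<T>>, n: number): Promise<T[]> {
  const values: T[] = [];
  if (n <= 0) return values;
  let cell = await start;
  values.push(cell.value);
  while (values.length < n) {
    cell = await cell.next;
    values.push(cell.value);
  }
  return values;
}

const firstThree = takeFromList(head, 3);
```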
Now let’s imagine which parts of the structure the producer and the consumers would reference.
And let’s also throw in the GC for anything not otherwise referenced.
Implications
- To add a new consumer, it is enough to start from the Promise the producer is currently pending on.
- To remove a consumer, it is enough for it to stop referencing the stream.
- The progress of the slowest consumer determines what data remains in memory.
- The producer forgets the current Promise as soon as it creates the next one.
- So when no consumers are around, no data is kept.
- All consumers see values in the same order, regardless of their progress.
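A couple of these implications can be checked directly. This sketch uses a condensed copy of mkAsyncStream from the Implementation section plus a hypothetical `take` helper: a consumer that joins late starts at the Promise the producer is pending on, so it misses "a", and both consumers see the shared values in the same order.

```typescript
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };

// Condensed copy of mkAsyncStream from the Implementation section.
function mkAsyncStream<T>(): [() => AsyncGenerator<T, void, unknown>, (value: T) => void] {
  let { promise: current, resolve } = Promise.withResolvers<AsyncCell<T>>();
  const streamInput = (value: T) => {
    const resolvePrevious = resolve;
    ({ promise: current, resolve } = Promise.withResolvers<AsyncCell<T>>());
    resolvePrevious({ value, next: current });
  };
  const streamOutput = async function* () {
    let cell = await current;
    while (true) {
      yield cell.value;
      cell = await cell.next;
    }
  };
  return [streamOutput, streamInput];
}

// Hypothetical helper: collect the first n values one consumer sees.
async function take<T>(stream: AsyncIterable<T>, n: number): Promise<T[]> {
  const seen: T[] = [];
  if (n <= 0) return seen;
  for await (const value of stream) {
    seen.push(value);
    if (seen.length >= n) break;
  }
  return seen;
}

const [streamOutput, streamInput] = mkAsyncStream<string>();
const early = take(streamOutput(), 3); // joins before "a" is produced
streamInput("a");
const late = take(streamOutput(), 2); // joins at the pending Promise after "a"
streamInput("b");
streamInput("c");

Promise.all([early, late]).then(([seenEarly, seenLate]) => {
  console.log(seenEarly.join(" ")); // a b c
  console.log(seenLate.join(" ")); // b c
});
```

Each call to streamOutput() is its own consumer; no registration step is needed, because the generator simply grabs whatever Promise is current when it starts.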
Sometimes the shape of data can really imply a lot about how things behave.
4 years later, I still love this 💖.
Tangent, GC
The fact that this linked list has no cycles suggests that reference counting would also do the trick here.
So this should also be fairly doable in Rust with an Arc<T>.
Implementation
For completeness, let’s piece the implementation together again:
mkAsyncStream and types
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };
type AsyncStreamOutput<T> = () => AsyncGenerator<T, void, unknown>;
type AsyncStreamInput<T> = (value: T) => unknown;
const mkAsyncStream = <T>(): [AsyncStreamOutput<T>, AsyncStreamInput<T>] => {
let {
promise: currentCellPromise,
resolve: resolveCurrentCell
} = Promise.withResolvers<AsyncCell<T>>();
// fill in streamOutput, streamInput
return [streamOutput, streamInput];
};
To bring this machinery to life, we now need to fill in the implementations of streamInput and streamOutput.
streamInput
streamInput just needs to get a new Promise ready and then resolve the previous one with a cell pointing to it:
const streamInput: AsyncStreamInput<T> = (value: T) => {
const resolvePreviousCell = resolveCurrentCell;
({ promise: currentCellPromise, resolve: resolveCurrentCell }
= Promise.withResolvers<AsyncCell<T>>());
resolvePreviousCell({ value, next: currentCellPromise });
};
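To see what streamInput builds on its own, here is a sketch with a hypothetical `mkProducer` (not part of the post’s API): just the input half of mkAsyncStream, plus the very first Promise kept as `head` so the resulting list can be inspected. After three inputs, the last cell’s `next` is exactly the Promise the producer is currently pending on.

```typescript
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };

// Hypothetical: only the input half of mkAsyncStream, plus the first
// Promise kept as `head` so the list that streamInput builds can be inspected.
function mkProducer<T>() {
  let { promise: currentCellPromise, resolve: resolveCurrentCell } =
    Promise.withResolvers<AsyncCell<T>>();
  const head = currentCellPromise;
  const streamInput = (value: T) => {
    const resolvePreviousCell = resolveCurrentCell;
    // First get a fresh pending Promise ready...
    ({ promise: currentCellPromise, resolve: resolveCurrentCell } =
      Promise.withResolvers<AsyncCell<T>>());
    // ...then resolve the previous one with a cell that points at it.
    resolvePreviousCell({ value, next: currentCellPromise });
  };
  return { head, streamInput, current: () => currentCellPromise };
}

const producer = mkProducer<number>();
[1, 2, 3].forEach((value) => producer.streamInput(value));

// Walk the three resolved cells; the tail is the Promise still pending.
async function inspect() {
  const first = await producer.head;
  const second = await first.next;
  const third = await second.next;
  return {
    values: [first.value, second.value, third.value],
    tailIsCurrent: third.next === producer.current(),
  };
}

const inspected = inspect();
```

Creating the new Promise before resolving the old one is what makes this work: the cell handed to consumers already knows where the next value will show up.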
streamOutput
streamOutput is just an async generator over our list structure:
const streamOutput = async function* () {
let currentCell = await currentCellPromise;
while (true) {
yield currentCell.value;
currentCell = await currentCell.next;
}
};
That’s it
To use it:
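// Create an async stream of numbers:
const [streamOutput, streamInput] = mkAsyncStream<number>();
// Start processing values from the stream. Don't await the loop here:
// it only finishes once we break, and that needs the values added below.
const consumer = (async () => {
for await (const value of streamOutput()) {
console.log(value);
if (value > 3) {
// Stop processing:
break;
}
}
})();
// Add values to the stream:
[1, 2, 3, 4].forEach(value => streamInput(value));
// Wait for the consumer to finish (4 > 3 ends the loop):
await consumer;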
Really this is all we need.
Complete code
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };
type AsyncStreamOutput<T> = () => AsyncGenerator<T, void, unknown>;
type AsyncStreamInput<T> = (value: T) => unknown;
const mkAsyncStream = <T>(): [AsyncStreamOutput<T>, AsyncStreamInput<T>] => {
let { promise: currentCellPromise, resolve: resolveCurrentCell } = Promise.withResolvers<AsyncCell<T>>();
const streamInput: AsyncStreamInput<T> = (value: T) => {
const resolvePreviousCell = resolveCurrentCell;
({ promise: currentCellPromise, resolve: resolveCurrentCell } = Promise.withResolvers<AsyncCell<T>>());
resolvePreviousCell({ value, next: currentCellPromise });
};
const streamOutput = async function* () {
let currentCell = await currentCellPromise;
while (true) {
yield currentCell.value;
currentCell = await currentCell.next;
}
};
return [streamOutput, streamInput];
};