Revisiting async streams

What’s this about again?

Yeah, apparently I’d like to revisit my post on async streams 😅.

Why?

.withResolvers()

Sounds exciting, doesn’t it? Here’s how it works:

// Without Promise.withResolvers:
let resolve, reject;
const promise = new Promise((res, rej) => {
  resolve = res;
  reject = rej;
});

// With Promise.withResolvers:
let { promise, resolve, reject } = Promise.withResolvers()
tangent, MDN docs

I love how this MDN example turns a stream into an async iterable by swapping in a fresh Promise after every batch of chunks. This is almost what I’m interested in here.

async function* readableToAsyncIterable(stream) {
  let { promise, resolve, reject } = Promise.withResolvers();
  stream.on("error", (error) => reject(error));
  stream.on("end", () => resolve());
  stream.on("readable", () => resolve());

  while (stream.readable) {
    await promise;
    let chunk;
    while ((chunk = stream.read())) {
      yield chunk;
    }
    ({ promise, resolve, reject } = Promise.withResolvers());
  }
}

A linked list

This will be our central building block:

type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };

Since we’re interested in async streams, the first AsyncCell will also be wrapped in a Promise. After all, we may need to wait for the first value.

For a sequence of values like 1,2,3,… we get a structure like this:

[Figure: a linked list of 4 Promises containing AsyncCells (values 1, 2, 3), where each AsyncCell references the next Promise.]

The last Promise is drawn as an empty box to illustrate a pending Promise.
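To make the shape a bit more tangible, here is a minimal sketch that builds exactly this structure by hand. The names pendingTail, head, second, third and the helper takeValues are made up for illustration and are not part of the implementation further down.

```typescript
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };

// The tail never settles in this sketch; it stands in for "no value produced yet".
const pendingTail = new Promise<AsyncCell<number>>(() => {});

// Build the list back to front so that each cell can reference the next Promise.
const third: Promise<AsyncCell<number>> = Promise.resolve({ value: 3, next: pendingTail });
const second: Promise<AsyncCell<number>> = Promise.resolve({ value: 2, next: third });
const head: Promise<AsyncCell<number>> = Promise.resolve({ value: 1, next: second });

// Hypothetical helper: read `count` values, starting at the given Promise.
async function takeValues<T>(start: Promise<AsyncCell<T>>, count: number): Promise<T[]> {
  const values: T[] = [];
  let cellPromise = start;
  for (let i = 0; i < count; i++) {
    const cell = await cellPromise;
    values.push(cell.value);
    cellPromise = cell.next;
  }
  return values;
}

// Walking from the head yields 1, 2, 3; walking from `second` yields 2, 3.
const fromHead = takeValues(head, 3);
const fromSecond = takeValues(second, 2);
```

Asking takeValues for a fourth value from head would simply wait on pendingTail, which is the "empty box" in the drawing.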

Now let’s imagine which parts of the structure producer and consumers would reference.

And let’s also throw in the GC for stuff not otherwise referenced.

[Figure: the same linked list of 4 Promises of AsyncCells, with additional boxes labeled Producer, Consumer (three of them) and GC identifying different parts of the queue.]

Implications

Sometimes the shape of data can really imply a lot about how things behave.

4 years later I still love this 💖.

Tangent, GC

The fact that this linked list has no cycles suggests that reference counting would also do the trick here.

To me that means this should also be fairly doable in Rust with an Arc<T>.

Implementation

For completeness let’s piece together the implementation again:

mkAsyncStream and types

type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };
// AsyncGenerator's type parameters are <yielded value, return value, next() argument>.
type AsyncStreamOutput<T> = () => AsyncGenerator<T, void, unknown>;
type AsyncStreamInput<T> = (value: T) => unknown;

const mkAsyncStream = <T>(): [AsyncStreamOutput<T>, AsyncStreamInput<T>] => {
  let {
    promise: currentCellPromise,
    resolve: resolveCurrentCell,
  } = Promise.withResolvers<AsyncCell<T>>();

  // fill in streamOutput, streamInput

  return [streamOutput, streamInput];
};

To bring this machinery to life we now need to fill in implementations for streamInput and streamOutput.

streamInput

streamInput just needs to resolve the old promise and get a new one ready:

const streamInput: AsyncStreamInput<T> = (value: T) => {
  const resolvePreviousCell = resolveCurrentCell;
  ({ promise: currentCellPromise, resolve: resolveCurrentCell } =
    Promise.withResolvers<AsyncCell<T>>());
  resolvePreviousCell({ value, next: currentCellPromise });
};

streamOutput

streamOutput is just an async generator over our list structure:

const streamOutput = async function* () {
  let currentCell = await currentCellPromise;
  while (true) {
    yield currentCell.value;
    currentCell = await currentCell.next;
  }
};

That’s it

To use it:

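// Create async stream:
const [streamOutput, streamInput] = mkAsyncStream<number>();

// Start processing values from the stream.
// The loop lives in an async function that is not awaited here,
// otherwise the code below would never get to add any values:
(async () => {
  for await (const value of streamOutput()) {
    if (value > 3) {
      // Stop processing:
      break;
    }
  }
})();

// Add values to the stream (the 4 triggers the break above):
[1, 2, 3, 4].forEach(value => streamInput(value));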

Really this is all we need.
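One thing the picture with several consumers promises is that each consumer walks the list on its own. Here is a rough sketch of that, under a few assumptions: deferred is a made-up stand-in for Promise.withResolvers (so the snippet also runs on runtimes that lack it), and collect and demo are hypothetical helpers that exist only for this example.

```typescript
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };

// Hypothetical stand-in for Promise.withResolvers (resolve only).
const deferred = <T>() => {
  let resolve!: (value: T) => void;
  const promise = new Promise<T>((res) => { resolve = res; });
  return { promise, resolve };
};

const mkAsyncStream = <T>() => {
  let { promise: currentCellPromise, resolve: resolveCurrentCell } = deferred<AsyncCell<T>>();
  const streamInput = (value: T) => {
    const resolvePreviousCell = resolveCurrentCell;
    ({ promise: currentCellPromise, resolve: resolveCurrentCell } = deferred<AsyncCell<T>>());
    resolvePreviousCell({ value, next: currentCellPromise });
  };
  const streamOutput = async function* (): AsyncGenerator<T, void, unknown> {
    let currentCell = await currentCellPromise;
    while (true) {
      yield currentCell.value;
      currentCell = await currentCell.next;
    }
  };
  return [streamOutput, streamInput] as const;
};

// Hypothetical helper: start a fresh consumer, collect `count` values, then stop.
const collect = async <T>(
  output: () => AsyncGenerator<T, void, unknown>,
  count: number,
): Promise<T[]> => {
  const values: T[] = [];
  for await (const value of output()) {
    values.push(value);
    if (values.length >= count) break;
  }
  return values;
};

const demo = async () => {
  const [streamOutput, streamInput] = mkAsyncStream<number>();
  const early = collect(streamOutput, 4); // starts at the very first Promise
  streamInput(1);
  streamInput(2);
  const late = collect(streamOutput, 2); // starts at the currently pending Promise
  streamInput(3);
  streamInput(4);
  return { early: await early, late: await late };
};
```

In this sketch the early consumer ends up with 1, 2, 3, 4, while the late one only sees 3 and 4. A consumer picks up the list at whatever Promise is pending when it starts, and nothing keeps older cells alive for it.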

Complete code

type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };
// AsyncGenerator's type parameters are <yielded value, return value, next() argument>.
type AsyncStreamOutput<T> = () => AsyncGenerator<T, void, unknown>;
type AsyncStreamInput<T> = (value: T) => unknown;

const mkAsyncStream = <T>(): [AsyncStreamOutput<T>, AsyncStreamInput<T>] => {
  let { promise: currentCellPromise, resolve: resolveCurrentCell } =
    Promise.withResolvers<AsyncCell<T>>();

  const streamInput: AsyncStreamInput<T> = (value: T) => {
    const resolvePreviousCell = resolveCurrentCell;
    ({ promise: currentCellPromise, resolve: resolveCurrentCell } =
      Promise.withResolvers<AsyncCell<T>>());
    resolvePreviousCell({ value, next: currentCellPromise });
  };

  const streamOutput = async function* () {
    let currentCell = await currentCellPromise;
    while (true) {
      yield currentCell.value;
      currentCell = await currentCell.next;
    }
  };

  return [streamOutput, streamInput];
};