A short experiment with async streams


Problem description

The problem was characterized by the following constraints:

  1. There is an async producer that generates inputs to some kind of stream/queue.
  2. We want consumers to consume the stream contents via async generators.
  3. Multiple consumers should be able to consume the contents of this stream. If something passes through the stream it should be visible to all consumers.
  4. It should be possible to add consumers at any time, and each consumer should be able to stop consuming at will.
  5. If the stream has no consumers, queued items should be dropped rather than kept around forever.

Implementation

The first thing that came to my mind here was that it would be helpful to have some kind of Promise that can be resolved from a separate function:

type AsyncStreamInput<T> = (value: T) => unknown;

const mkPromiseAndInput = <T>(): [
  Promise<T>,
  AsyncStreamInput<T>
] => {
  // The Promise executor runs synchronously inside the constructor,
  // so streamInput is assigned before we return.
  let streamInput!: AsyncStreamInput<T>;
  const promise = new Promise<T>((resolve) => {
    streamInput = resolve as AsyncStreamInput<T>;
  });

  return [promise, streamInput];
};
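
As a quick check of how this pair behaves, here is a minimal sketch that resolves the promise from outside its executor. The name demoPromiseAndInput is made up for this illustration, and the helper is repeated only so that the snippet runs on its own.

```typescript
type AsyncStreamInput<T> = (value: T) => unknown;

const mkPromiseAndInput = <T>(): [Promise<T>, AsyncStreamInput<T>] => {
  let streamInput!: AsyncStreamInput<T>;
  const promise = new Promise<T>((resolve) => {
    streamInput = resolve as AsyncStreamInput<T>;
  });
  return [promise, streamInput];
};

// Hypothetical demo, not part of the original code.
const demoPromiseAndInput = async (): Promise<string> => {
  const [promise, input] = mkPromiseAndInput<string>();
  // Deliver the value from a separate microtask, standing in for a producer.
  Promise.resolve().then(() => {
    input("hello");
    input("ignored"); // a promise settles only once, so this call has no effect
  });
  return promise;
};
```

Awaiting demoPromiseAndInput() yields "hello". Because a promise settles only once, each promise created this way can carry exactly one value.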

Now if we want to produce a sequence of async values this appears to work well with linked lists. Each cell of our linked list holds a value: T of the type that we want to queue and a Promise that resolves to the next cell once it becomes available.

type AsyncCell<T> = {
  value: T;
  next: Promise<AsyncCell<T>>;
};
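
To make the shape of this list concrete, the following sketch builds two cells by hand and walks them by awaiting each next promise. The helpers openEnd, takeValues and demoHandBuiltList are hypothetical names used only here. A promise that never settles stands in for the part of the list that has not been produced yet.

```typescript
type AsyncCell<T> = {
  value: T;
  next: Promise<AsyncCell<T>>;
};

// Hypothetical helper: a promise that never settles marks the open end of the list.
const openEnd = <T>(): Promise<AsyncCell<T>> =>
  new Promise<AsyncCell<T>>(() => {});

// Hypothetical helper: collect `count` values, awaiting each next pointer in turn.
const takeValues = async <T>(
  head: AsyncCell<T>,
  count: number
): Promise<T[]> => {
  const values: T[] = [];
  let cell = head;
  for (let i = 0; i < count; i++) {
    values.push(cell.value);
    // Do not await past the last requested cell, the tail may never resolve.
    if (i < count - 1) cell = await cell.next;
  }
  return values;
};

const demoHandBuiltList = (): Promise<number[]> => {
  const second: AsyncCell<number> = { value: 2, next: openEnd<number>() };
  const first: AsyncCell<number> = { value: 1, next: Promise.resolve(second) };
  return takeValues(first, 2);
};
```

Awaiting demoHandBuiltList() yields [1, 2]. Asking for a third value would wait forever, which is the behaviour a consumer needs while the producer has nothing new.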

Since values are produced asynchronously we cannot start with one, and so instead of having a cell directly we use a promise of an AsyncCell, which also happens to be the next value of a cell itself. Hence we get:

let [
  currentCellPromise,
  resolveCurrentCell,
] = mkPromiseAndInput<AsyncCell<T>>();

Given a currentCellPromise we can construct an async generator streamOutput (constraint 2):

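// AsyncGenerator takes its type parameters in the order
// <yielded value, return value, value passed to next()>.
type AsyncStreamOutput<T> = () => AsyncGenerator<
  T,
  void,
  unknown
>;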

const streamOutput = async function* () {
  let currentCell = await currentCellPromise;
  while (true) {
    yield currentCell.value;
    currentCell = await currentCell.next;
  }
};

A nice property of streamOutput is that consumers can start consuming the stream at any point (constraint 4). A consumer reads currentCellPromise once, when it first requests a value, and from that point onwards it simply follows the next promises through the linked list, so every consumer sees every value produced after it joined (constraint 3). Consumers can also stop consuming by simply discontinuing use of the async generator. If currentCellPromise is replaced with a different promise, this does not affect consumers that are already running. Replacing currentCellPromise does make previous cells unavailable to later consumers, though. Hence, once a cell has been visited by all existing consumers, nothing references it any more and it can be garbage collected (constraint 5).
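
The joining and stopping behaviour described above can be tried out with a small sketch. It fixes T to number, and it already includes an input function that replaces currentCellPromise on every value, in the way the article constructs it in the next step. The name demoLateConsumer is hypothetical, and typing the generator as AsyncGenerator<number, void, unknown> is an assumption of this sketch.

```typescript
type AsyncStreamInput<T> = (value: T) => unknown;
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };

const mkPromiseAndInput = <T>(): [Promise<T>, AsyncStreamInput<T>] => {
  let streamInput!: AsyncStreamInput<T>;
  const promise = new Promise<T>((resolve) => {
    streamInput = resolve as AsyncStreamInput<T>;
  });
  return [promise, streamInput];
};

let [currentCellPromise, resolveCurrentCell] =
  mkPromiseAndInput<AsyncCell<number>>();

// Producer side: open a fresh cell promise, then resolve the previous one.
const streamInput = (value: number): void => {
  const resolvePreviousCell = resolveCurrentCell;
  [currentCellPromise, resolveCurrentCell] =
    mkPromiseAndInput<AsyncCell<number>>();
  resolvePreviousCell({ value, next: currentCellPromise });
};

const streamOutput = async function* (): AsyncGenerator<number, void, unknown> {
  let currentCell = await currentCellPromise;
  while (true) {
    yield currentCell.value;
    currentCell = await currentCell.next;
  }
};

// Hypothetical demo: a consumer that joins late and stops early.
const demoLateConsumer = async (): Promise<number[]> => {
  streamInput(1); // produced before any consumer exists, so nobody sees it
  const seen: number[] = [];
  const consumer = (async () => {
    for await (const value of streamOutput()) {
      seen.push(value);
      if (seen.length === 2) break; // stop consuming at will
    }
  })();
  streamInput(2);
  streamInput(3);
  streamInput(4); // produced, but the consumer stops before reaching it
  await consumer;
  return seen;
};
```

Awaiting demoLateConsumer() yields [2, 3]: the value 1 was produced before the consumer joined, and 4 is never read because the consumer leaves the loop after two values.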

Now we can construct the streamInput like this:

const streamInput: AsyncStreamInput<T> = (value: T) => {
  const resolvePreviousCell = resolveCurrentCell;
  [
    currentCellPromise,
    resolveCurrentCell,
  ] = mkPromiseAndInput<AsyncCell<T>>();
  resolvePreviousCell({
    value,
    next: currentCellPromise,
  });
};

Every time an input value arrives we replace currentCellPromise with a new one. After that, the previous promise can be resolved with a cell that holds the value and points to the new currentCellPromise through its next field.

By combining mkPromiseAndInput, streamInput and streamOutput we can construct mkAsyncStream like this:

const mkAsyncStream = <T>(): [
  AsyncStreamOutput<T>,
  AsyncStreamInput<T>
] => {
  let [
    currentCellPromise,
    resolveCurrentCell,
  ] = mkPromiseAndInput<AsyncCell<T>>();

  const streamInput: AsyncStreamInput<T> = (value: T) => {
    const resolvePreviousCell = resolveCurrentCell;
    [
      currentCellPromise,
      resolveCurrentCell,
    ] = mkPromiseAndInput<AsyncCell<T>>();
    resolvePreviousCell({
      value,
      next: currentCellPromise,
    });
  };

  const streamOutput = async function* () {
    let currentCell = await currentCellPromise;
    while (true) {
      yield currentCell.value;
      currentCell = await currentCell.next;
    }
  };

  return [streamOutput, streamInput];
};
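
The following usage sketch checks constraint 3, namely that every consumer sees every value. The helpers collect and demoBroadcast are hypothetical names introduced for this illustration. mkAsyncStream is repeated in compact form so the snippet runs on its own, and the sketch types the generator as AsyncGenerator<T, void, unknown> so that consumers receive values of type T, which is an assumption of this example.

```typescript
type AsyncStreamInput<T> = (value: T) => unknown;
type AsyncStreamOutput<T> = () => AsyncGenerator<T, void, unknown>;
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };

const mkPromiseAndInput = <T>(): [Promise<T>, AsyncStreamInput<T>] => {
  let streamInput!: AsyncStreamInput<T>;
  const promise = new Promise<T>((resolve) => {
    streamInput = resolve as AsyncStreamInput<T>;
  });
  return [promise, streamInput];
};

const mkAsyncStream = <T>(): [AsyncStreamOutput<T>, AsyncStreamInput<T>] => {
  let [currentCellPromise, resolveCurrentCell] =
    mkPromiseAndInput<AsyncCell<T>>();
  const streamInput: AsyncStreamInput<T> = (value: T) => {
    const resolvePreviousCell = resolveCurrentCell;
    [currentCellPromise, resolveCurrentCell] =
      mkPromiseAndInput<AsyncCell<T>>();
    resolvePreviousCell({ value, next: currentCellPromise });
  };
  const streamOutput = async function* (): AsyncGenerator<T, void, unknown> {
    let currentCell = await currentCellPromise;
    while (true) {
      yield currentCell.value;
      currentCell = await currentCell.next;
    }
  };
  return [streamOutput, streamInput];
};

// Hypothetical helper: collect the first `count` values a consumer sees.
const collect = async <T>(
  values: AsyncGenerator<T, void, unknown>,
  count: number
): Promise<T[]> => {
  const seen: T[] = [];
  if (count <= 0) return seen;
  for await (const value of values) {
    seen.push(value);
    if (seen.length >= count) break;
  }
  return seen;
};

// Hypothetical demo: two consumers join before any value is produced.
const demoBroadcast = async (): Promise<[string[], string[]]> => {
  const [output, input] = mkAsyncStream<string>();
  const first = collect(output(), 3);
  const second = collect(output(), 2);
  for (const word of ["a", "b", "c"]) input(word);
  return Promise.all([first, second]);
};
```

Awaiting demoBroadcast() yields [["a", "b", "c"], ["a", "b"]]. Both consumers read from the same cells, and the second one stops after two values without disturbing the first.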

The thing that fascinated me here was how using a linked list in a garbage-collected language can provide the desired properties for a problem. Adding and removing consumers works fine with garbage collection simply by asking what data is referenced where. The case for the removal of items from the linked list is similar: items at the start of the list that can no longer be reached by any consumer can be collected.

The code produced here is also available as a gist and on the TypeScript playground.