Most async iterator libraries currently available are prone to causing memory leaks through normal usage. Repeaters use the following design principles to prevent them:
## Repeaters execute lazily
There are several existing async iterator libraries which provide tightly-coupled wrappers around event emitters, streams, or other callback-based APIs. Almost all of them make the critical mistake of registering callbacks eagerly, i.e. when the iterator is created. Consider the following naive async iterator returning function:
```js
function listen(target, name) {
  const events = [];
  const nexts = [];
  function listener(ev) {
    const next = nexts.shift();
    if (next == null) {
      events.push(ev);
    } else {
      next({ value: ev, done: false });
    }
  }

  console.log("adding listener!");
  target.addEventListener(name, listener);
  return {
    next() {
      const ev = events.shift();
      if (ev == null) {
        return new Promise((next) => nexts.push(next));
      }

      return Promise.resolve({ value: ev, done: false });
    },
    return() {
      nexts.forEach((next) => next({ done: true }));
      console.log("removing listener!");
      target.removeEventListener(name, listener);
      return Promise.resolve({ done: true });
    },
    [Symbol.asyncIterator]() {
      return this;
    },
  };
}
```
The `listen` function returns an async iterator of events and cleans up after itself when `return` is called. However, there is no guarantee that `return` will be called in normal usage, causing a memory leak in the form of unremoved event listeners. Consider the following usage of the `listen` function above with an async generator:
```js
async function* positions(clicks) {
  for await (const c of clicks) {
    yield {
      x: c.clientX,
      y: c.clientY,
    };
  }
}

(async function() {
  const clicks = listen(window, "click"); // adding listener!
  const pos = positions(clicks);
  // never mind, we’re not interested in the positions of clicks.
  pos.return(); // 💭💭💭 clicks.return is never called.
})();
```
The `positions` async generator takes an async iterator of click events and yields x/y coordinates. However, because the example code calls `pos.return` immediately, the `for await…of` loop inside the `positions` generator never runs. Consequently, `clicks.return` is never called, and the event listener registered inside `listen` is never cleaned up. To make the code safe, we would have to ensure that either every `positions` generator is iterated at least once, or that every `listen` iterator is manually returned. This logic is difficult to enforce and indicative of a leaky abstraction: we have to treat `listen`-based async iterators differently than async generator objects, which can be safely created and ignored.
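For instance, to make the earlier example safe, the consumer would have to return the `listen` iterator by hand (a sketch, not library code):

```js
(async function() {
  const clicks = listen(window, "click"); // adding listener!
  const pos = positions(clicks);
  try {
    // never mind, we’re still not interested in the positions of clicks.
    await pos.return();
  } finally {
    // pos was never iterated, so clicks must be returned manually.
    await clicks.return(); // removing listener!
  }
})();
```

Every consumer of `listen`-based iterators has to remember this extra step; an unused async generator, by contrast, requires no cleanup at all.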
Repeaters solve this problem by executing lazily. In other words, the executor passed to the `Repeater` constructor does not run until the first time `next` is called. Here’s the same `listen` function written with repeaters:
```js
function listen(target, name) {
  return new Repeater(async (push, stop) => {
    const listener = (ev) => push(ev);
    console.log("adding listener!");
    target.addEventListener(name, listener);
    await stop;
    console.log("removing listener!");
    target.removeEventListener(name, listener);
  });
}
```
If we swap in this repeater-based `listen` function for the one above, neither `target.addEventListener` nor `target.removeEventListener` is called, and the `clicks` repeater can be safely ignored.
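For instance, re-running the problematic example from above with the repeater-based `listen` is now leak-free:

```js
(async function() {
  const clicks = listen(window, "click"); // nothing is logged!
  const pos = positions(clicks);
  // never mind, we’re not interested in the positions of clicks.
  pos.return(); // 💭 next was never called, so no listener was ever added.
})();
```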
Because repeaters execute lazily, the contract for safely consuming them is simple: if you call `next`, you must call `return`. This happens automatically when using `for await…of` loops and is easy to enforce when calling `next` manually using `try`/`finally`.
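For instance, a manual consumer of the repeater-based `listen` can honor this contract with a `try`/`finally` block (a minimal sketch):

```js
(async function() {
  const clicks = listen(window, "click");
  try {
    // Calling next for the first time starts the executor.
    const { value: ev } = await clicks.next(); // adding listener!
    console.log(ev.clientX, ev.clientY);
  } finally {
    // We called next, so we must call return.
    await clicks.return(); // removing listener!
  }
})();
```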
## Repeaters respond to backpressure
The naive `listen` function has an additional, more insidious problem: it pushes events onto an unbounded array. Imagine, for instance, using the naive `listen` function to create an async iterator which listens for scroll events. It is easy to think of a situation where the rate at which these scroll events are pushed outpaces the rate at which they are pulled from the iterator. In this situation, the `events` array created by the naive `listen` function would grow without bound, eventually degrading application performance. This is often referred to as the “fast producer, slow consumer” problem, and while it might not seem like a big issue for short-lived browser sessions, it is crucial to deal with when writing long-running server processes with Node.js.

Inspired by Clojure’s `core.async` library, repeaters provide three solutions for dealing with slow consumers:
### 1. Waiting for pushes to resolve
The `push` function passed to the executor returns a promise which resolves when `next` is called, so that you can write code as follows:
```js
const numbers = new Repeater(async (push, stop) => {
  console.log("starting...");
  for (let i = 1; i <= 4; i++) {
    console.log(`pushing ${i}`);
    await push(i);
  }

  console.log("stopping...");
  stop();
});

(async function() {
  // starting...
  console.log(await numbers.next());
  // pushing 1
  // { value: 1, done: false }
  console.log(await numbers.next());
  // pushing 2
  // { value: 2, done: false }
  console.log(await numbers.next());
  // pushing 3
  // { value: 3, done: false }
  console.log(await numbers.next());
  // pushing 4
  // { value: 4, done: false }
  // stopping...
  console.log(await numbers.next());
  // { done: true }
})();
```
By awaiting `push`, code in the executor can wait for values to be consumed, and the repeater becomes a simple synchronization mechanism between producers and consumers.
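The same synchronization happens implicitly with a `for await…of` loop; had we consumed `numbers` this way instead, each iteration of the loop would have resolved one pending `push`:

```js
(async function() {
  // starting...
  for await (const num of numbers) {
    console.log(num); // 1, 2, 3, 4
  }
  // stopping...
})();
```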
### 2. Throwing errors
When using callback-based APIs, it is often inconvenient to await `push` calls because the callbacks run frequently and synchronously. Therefore, repeaters allow you to call `push` in a fire-and-forget manner, with the caveat that `push` will begin throwing synchronous errors when there are too many pending pushes.
```js
const ys = new Repeater(async (push, stop) => {
  const listener = () => push(window.scrollY); // ⚠️ Might throw an error!
  window.addEventListener("scroll", listener);
  await stop;
  window.removeEventListener("scroll", listener);
});

ys.next();
```
This behavior is desirable because it allows developers to quickly surface bottlenecks and deadlocks when and where they happen, rather than when the process runs out of memory.
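To see this in action, consider a producer with no consumer (a sketch; `ticks` is a hypothetical name, and the exact pending-push limit is an implementation detail):

```js
const ticks = new Repeater(async (push) => {
  try {
    while (true) {
      push(Date.now()); // fire-and-forget; the pending-push queue grows
    }
  } catch (err) {
    // push throws synchronously once too many pushes are pending,
    // surfacing the missing consumer immediately.
    console.error(err);
  }
});

ticks.next(); // starts the executor; nothing ever calls next again
```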
### 3. Buffering and dropping values
If you neither wish to await `push` calls nor want to deal with errors, one last option is to have the repeater store values in a buffer and drop them when the buffer reaches capacity. The `Repeater` constructor optionally takes a `RepeaterBuffer` instance as its second argument. For example, by passing in a `SlidingBuffer` instance, we can make it so that the repeater only retains the twenty latest scroll positions. If values are pulled at a slower rate than they are pushed, older values in the buffer are simply dropped.
```js
import { Repeater, SlidingBuffer } from "@repeaterjs/repeater";

const ys = new Repeater(async (push, stop) => {
  const listener = () => push(window.scrollY); // 🙂 will never throw
  window.addEventListener("scroll", listener);
  await stop;
  window.removeEventListener("scroll", listener);
}, new SlidingBuffer(20));

ys.next();
```
The `@repeaterjs/repeater` module exports three buffer classes: `FixedBuffer`, `DroppingBuffer`, and `SlidingBuffer`. `FixedBuffer` allows repeaters to push a set number of values without pushes waiting or throwing errors, but preserves the waiting/error-throwing behavior described above when the buffer is full. Alternatively, `DroppingBuffer` drops the latest values when the buffer is full, and `SlidingBuffer` drops the earliest values. Because `DroppingBuffer` and `SlidingBuffer` instances never fill up, pushes to repeaters with these buffers never throw overflow errors, and the promises returned from `push` always resolve immediately. You can define custom buffer classes to give repeaters more complex buffering behaviors.
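As a rough sketch of what that might look like, the following hypothetical `DedupingSlidingBuffer` behaves like `SlidingBuffer` but also skips consecutive duplicate values; it assumes the buffer contract matches the shape of the built-in classes (`empty`/`full` getters plus `add`/`remove` methods):

```js
import { Repeater } from "@repeaterjs/repeater";

// Hypothetical custom buffer: like SlidingBuffer, but it also skips
// values equal to the most recently buffered value. Assumes the
// buffer contract used by the built-in classes: empty/full getters
// and add/remove methods.
class DedupingSlidingBuffer {
  constructor(capacity) {
    this.capacity = capacity;
    this.values = [];
  }

  get empty() {
    return this.values.length === 0;
  }

  // Never reports full, so pushes never wait or throw.
  get full() {
    return false;
  }

  add(value) {
    if (this.values[this.values.length - 1] === value) {
      return; // drop consecutive duplicates
    }

    this.values.push(value);
    if (this.values.length > this.capacity) {
      this.values.shift(); // drop the earliest value, like SlidingBuffer
    }
  }

  remove() {
    return this.values.shift();
  }
}

const zs = new Repeater(async (push, stop) => {
  const listener = () => push(Math.round(window.scrollY));
  window.addEventListener("scroll", listener);
  await stop;
  window.removeEventListener("scroll", listener);
}, new DedupingSlidingBuffer(20));
```

Because `full` always reports `false` here, pushes to a repeater using this buffer never wait or throw, just as with the built-in `SlidingBuffer`.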