Executing q.all promises in batches

At KoalaSafe, we have a process (let's call it an orderbot), which wakes up each morning, pulls all of our orders from Stripe and Paypal, merges the orders into a common format, then sends them to our fulfilment partner as a CSV.

I wanted to add a step to subscribe customers to mailchimp for shipping notifications. That's simple enough, since mailchimp provides an API, but the API is limited to 10 concurrent connections.

The orderbot is in nodejs, and is based around the idea of tasks that get chained together, adding their results to a state object as they go, which subsequent tasks may or may not use. For example:
node orderbot extract email subscribe --since=last --outputFile=orders.csv --mailto=orders@supplier.com
will extract all the orders to a file, email them to me, and subscribe all the emails in those orders to our mailchimp shipping list.

Based on the arguments passed in, tasks (a function pointer plus the relevant parsed args) are created and then executed.
if (contains_task('extract', argv)) {
    chain(merge_charges);
    chain(extract_charges);
}
if (contains_task('subscribe', argv)) {
    chain(mailchimp.subscribe);
}
if (contains_task('stats', argv)) {
    chain(stats);
}

The chain function queues a method for execution.

function chain(func) {
    var $q = q.defer();
    if (!promise_chain) {
        promise_chain = func($q, state);
    } else {
        promise_chain = promise_chain.then(function (state) {
            return func($q, state);
        });
    }
}

Functions have the signature function method($q, state) and always return a $q.promise.
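As a sketch of what a task conforming to that signature might look like (count_orders is a hypothetical example of mine, and the defer shim is a stand-in for q.defer() built on native Promises, since the q setup isn't shown here):

```javascript
// Stand-in for q.defer(): wraps a native Promise in a deferred object.
function defer() {
    var deferred = {};
    deferred.promise = new Promise(function (resolve, reject) {
        deferred.resolve = resolve;
        deferred.reject = reject;
    });
    return deferred;
}

// A task in the orderbot style: takes ($q, state), augments state,
// and always returns $q.promise. (count_orders is hypothetical.)
function count_orders($q, state) {
    setTimeout(function () {
        state.order_count = (state.orders || []).length;
        $q.resolve(state); // pass the augmented state on to the next task
    }, 10);
    return $q.promise;
}

count_orders(defer(), { orders: ['a', 'b', 'c'] }).then(function (state) {
    console.log(state.order_count + " orders counted"); // prints "3 orders counted"
});
```

Because every task resolves with the (augmented) state, the .then in chain can feed it straight into the next task.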

The idea is that any new tasks can be added independently of the other tasks and thus avoid the 'async christmas tree'.


OK, so now it was a simple matter of adding the subscribe task. However, doing a q.all() wouldn't work, as we had to restrict to 10 concurrent connections to mailchimp. Resource limiting isn't a new idea, but I hadn't done it using promises before.

I ended up settling on a pattern where a Task object contains the function pointer and some state, plus a batching mechanism that does a q.all on each batch and waits for it to resolve before continuing. So, we need a task:

function Task(fn, arg) {
    this.fn = fn;
    this.arg = arg;
}

Some state:

var state = {};
state.in_progress = 0;
state.started = 0;
state.finished = 0;
var tasks = [];
We then add some tasks to execute:

for (var i = 0; i < 17; i++) {
    var fn = function hello_async(obj) {
        return hello(q.defer(), state, obj);
    };
    var task = new Task(fn, i);
    tasks.push(task);
}

And then we execute them using the q_batch function.

var promise = q_batch(q.defer(), tasks, 5);
promise.then(function (data) {
    console.log(state.started + " functions started");
    console.log(state.finished + " functions finished");
});

And so the point of the post... batching up promises:

function q_batch($q, tasks, batch_size) {
    //Finished?
    if (!tasks || tasks.length == 0) {
        $q.resolve(tasks);
        console.log("Finished");
        return $q.promise;
    }
    //Call the function batch_size times to get the promises
    var promise_arr = [];
    for (var i = 0; (i < batch_size) && (i < tasks.length); i++) {
        try {
            var task = tasks[i];
            console.log("Calling " + task.fn.name + " with " + task.arg);
            promise_arr.push(task.fn.call(this, task.arg));
        } catch (e) {
            $q.reject("Batch failed. " + e);
        }
    }
    //Execute the batch, then go again
    var promise_batch = q.all(promise_arr);
    promise_batch.then(function (results) {
        try {
            //Remove the first batch_size tasks and go again
            tasks.splice(0, batch_size);
            q_batch($q, tasks, batch_size);
        } catch (err) {
            console.error(err.stack ? err.stack : err);
        }
    }, function (err) {
        $q.reject("Batch failed. " + err);
    });
    return $q.promise;
}

Given a test function hello to simulate a callback style of function:

function hello($q, state, my_num) {
    state.in_progress++;
    state.started++;
    setTimeout(function () {
        console.log("Hello world. " + state.in_progress + " in progress. I am: " + my_num);
        state.in_progress--;
        state.finished++;
        $q.resolve(state);
    }, Math.random() * 1000);
    return $q.promise;
}

We get this:

node test.js
17 functions to execute
Calling hello_async with 0
Calling hello_async with 1
Calling hello_async with 2
Calling hello_async with 3
Calling hello_async with 4
Hello world. 5 in progress. I am: 0
Hello world. 4 in progress. I am: 4
Hello world. 3 in progress. I am: 1
Hello world. 2 in progress. I am: 3
Hello world. 1 in progress. I am: 2
Calling hello_async with 5
Calling hello_async with 6
Calling hello_async with 7
Calling hello_async with 8
Calling hello_async with 9
Hello world. 5 in progress. I am: 8
Hello world. 4 in progress. I am: 9
Hello world. 3 in progress. I am: 6
Hello world. 2 in progress. I am: 7
Hello world. 1 in progress. I am: 5
Calling hello_async with 10
Calling hello_async with 11
Calling hello_async with 12
Calling hello_async with 13
Calling hello_async with 14
Hello world. 5 in progress. I am: 12
Hello world. 4 in progress. I am: 13
Hello world. 3 in progress. I am: 11
Hello world. 2 in progress. I am: 10
Hello world. 1 in progress. I am: 14
Calling hello_async with 15
Calling hello_async with 16
Hello world. 2 in progress. I am: 15
Hello world. 1 in progress. I am: 16
Finished
17 functions started
17 functions finished

It works. We're limiting invocations of hello to 5 at a time, while still using the q style of deferred execution.

It uses recursion, so it's not suitable for very large numbers of tasks (ever wondered how big the javascript stack is?), but if you just need to limit the number of promises executing at a time, this is one approach.
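For what it's worth, the stack concern goes away if the batching loop is written iteratively. Here's a minimal sketch of the same batch-and-wait idea using native Promises and async/await rather than q (run_in_batches is my name, not part of the orderbot), reusing the same { fn, arg } task shape:

```javascript
// Iterative batching: start batch_size tasks, wait for all of them to
// resolve, then move on to the next slice. No recursion, no stack growth.
async function run_in_batches(tasks, batch_size) {
    var results = [];
    for (var i = 0; i < tasks.length; i += batch_size) {
        var batch = tasks.slice(i, i + batch_size).map(function (task) {
            return task.fn.call(null, task.arg);
        });
        // Same semantics as the q.all in q_batch: the next batch only
        // starts once every promise in the current one has resolved.
        results = results.concat(await Promise.all(batch));
    }
    return results;
}
```

The trade-off is the same as with q_batch: a slow task holds up its whole batch, so at times fewer than batch_size connections are in flight.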

A self contained gist showing it in action is here: https://gist.github.com/stevenpack/8ed09cfbe886c0ecf785