Pipelining with core.async

There are many techniques for parallelizing the execution of programs. This post looks at just one: pipelining. I’ve put together a small idiom you can follow when this design technique is appropriate.

Pipelining is a method that can be applied to tasks that meet two criteria:

1. The task can be broken into subtasks.

2. No single subtask may have more than one instance executing at the same time.

The Wikipedia article on pipelining as a concept is a good introduction. Alternatively, just keep reading; if the idea doesn’t make sense yet, it probably will after the examples.

Here are three functions that we chain together. Very straightforward:
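The post's original listing isn't reproduced here. The following is a minimal sketch, assuming plain pure functions. Only the names m, n and o come from the text; their bodies are hypothetical stand-ins.

```clojure
;; Three plain functions. The bodies are hypothetical; only the names
;; m, n and o come from the post.
(defn m [x] (inc x))
(defn n [x] (* x 2))
(defn o [x] (str "result: " x))

;; Thread a value through m, then n, then o: (o (n (m x))).
(defn chain [x]
  (-> x m n o))

(println (chain 1))
```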

There’s no need to parallelize that. Let’s imagine that those functions perform side effects that take a while:
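Here is a hedged sketch of such slow versions, using `Thread/sleep` as a stand-in for time-consuming side effects. The delays (500 ms, 1 s and 2 s) are assumptions chosen only so that they add up to 3.5 seconds. They are not taken from the post.

```clojure
;; Hypothetical slow versions: each sleep stands in for a side effect
;; that takes a while. The delays are assumptions totalling 3.5 seconds.
(defn m [x] (Thread/sleep 500) (inc x))
(defn n [x] (Thread/sleep 1000) (* x 2))
(defn o [x] (Thread/sleep 2000) (str "result: " x))

;; `time` prints the elapsed wall-clock time. Each function runs to
;; completion before the next one starts, so the delays simply add up.
(time (-> 1 m n o))
```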

It takes 3.5 seconds to execute this chain of functions.

What if function ‘m’ could execute in parallel with ‘n’ and ‘o’? More work would get done in the same amount of time. The only thing we want to avoid is ‘m’ running more than once at any given moment, and the same constraint applies to ‘n’ and ‘o’.
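This is a sketch of a hand-wired three-stage version with core.async, assuming the core.async library is on the classpath. It reuses the hypothetical delays of 500 ms, 1 s and 2 s, which are assumptions and not the post's values. The channel names `m-in`, `n-in`, `o-in` and `results` are illustrative.

```clojure
(require '[clojure.core.async :refer [chan go go-loop <! >! <!!]])

;; Hypothetical slow functions (the delays are assumptions).
(defn m [x] (Thread/sleep 500) (inc x))
(defn n [x] (Thread/sleep 1000) (* x 2))
(defn o [x] (Thread/sleep 2000) (str "result: " x))

;; One channel in front of each stage, plus one for the final results.
(def m-in (chan))
(def n-in (chan))
(def o-in (chan))
(def results (chan))

;; One endlessly looping go block per stage. Each loop handles a single
;; value at a time, so m never runs twice at once (likewise n and o),
;; yet all three stages can be busy simultaneously.
(go-loop [] (>! n-in (m (<! m-in))) (recur))
(go-loop [] (>! o-in (n (<! n-in))) (recur))
(go-loop [] (>! results (o (<! o-in))) (recur))

;; Feed three values from a separate go block, then read three results.
(go (doseq [x [1 2 3]] (>! m-in x)))

(time
  (dotimes [_ 3]
    (println (<!! results))))
```

Because these stage functions block via `Thread/sleep`, each busy stage occupies one thread from core.async's go-block pool while it works. For genuinely blocking work, `clojure.core.async/thread` is the usual alternative to `go`. Once the pipeline is full, the slowest stage (here the assumed 2-second ‘o’) sets the pace.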

The output shows how function ‘m’ gets through a lot more work, with function ‘o’ catching up at the end:

It works by creating a channel for each function, then spinning up one infinitely looping go-block per function that waits for a value on that function's channel. The function is applied to the value, and the result is placed on the next function's channel. The pipeline function returns the head channel, so we have a reference to the channel to feed values into.
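That idea generalizes to any list of functions. The following is a minimal sketch, not the author's template. `build-pipeline` is a hypothetical name, and it takes an explicit `out` channel so the final results can be read. One further change from the described approach: the loops stop when their input channel is closed, and they pass the close downstream instead of running forever, so the example can finish cleanly.

```clojure
(require '[clojure.core.async :as async :refer [chan go go-loop <! >! <!!]])

(defn build-pipeline
  "Hypothetical helper. Starts one go-loop per function in fs, wires each
  stage's output to the next stage's input, sends the final stage's
  results to out, and returns the head channel to feed values into.
  Assumes fs is non-empty."
  [out fs]
  (let [ins  (vec (repeatedly (count fs) chan))
        ;; Stage i writes to stage i+1's input; the last stage writes to out.
        outs (conj (subvec ins 1) out)]
    (doseq [[f in nxt] (map vector fs ins outs)]
      ;; A single loop per stage, so f never runs twice at the same time.
      (go-loop []
        (if-some [v (<! in)]
          (do (>! nxt (f v))
              (recur))
          ;; The input channel was closed: propagate the close downstream.
          (async/close! nxt))))
    (first ins)))

;; Usage: a three-stage pipeline with quick, pure stand-in functions.
(def out (chan))
(def head (build-pipeline out [inc #(* 2 %) #(str "result: " %)]))

;; Feed some values, then close the head so the close ripples through.
(go (doseq [x [1 2 3]] (>! head x))
    (async/close! head))

;; async/into collects everything from out once it has been closed.
(println (<!! (async/into [] out)))
```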

Pull out this template as needed. It’s on GitHub. Tweet at @MichaelDrogalis.
