Optimizing resource utilization of an asynchronous pipeline using resource-based throttling

In the previous post, we introduced the concept of creating an asynchronous pipeline using the asyncio module to tackle a computational problem. One of the potential pitfalls of using an asynchronous pipeline for a non-trivial problem is that each call to an asynchronous function results in a new addition to the asynchronous equivalent of the "stack". In a traditional synchronous program, the size of the stack is predefined by the call structure, and heap allocations will generally fall within a pretty well understood range depending on the program structure. We might know for example that a given function will need to create a copy of its input data, but once that function completes, it will discard the extra copy and the memory will be available again for the next stage of processing.

When developing a threaded program, we might have to be more careful, but we still have a good concept of how much memory we might need to tackle a given problem. If we create a thread pool of ten threads, and each one requires a copy of the original data to work on, the total memory required will be on the order of ten times the size of the structure. As long as we can handle that, and the individual threads aren't going too crazy with other allocations, we don't have a problem.

In an asynchronous program, though, if we're dealing with large in-memory data structures, we have to be cognizant of the program flow and how it might push our memory limits. Take the following video processing example.

"""
Mock up a video feed pipeline
"""
import asyncio
import logging
import sys

import cv2

logging.basicConfig(format="[%(thread)-5d]%(asctime)s: %(message)s")
logger = logging.getLogger('async')
logger.setLevel(logging.INFO)

async def process_video(filename):
    cap = cv2.VideoCapture(filename)
    tasks = list()
    frame_ind = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        tasks.append(asyncio.ensure_future(process_frame(frame, frame_ind)))
        frame_ind += 1
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)

async def process_frame(frame, frame_ind):
    logger.info("Processing frame {}".format(frame_ind))
    await asyncio.sleep(15.0)
    logger.info("Finished processing frame {}".format(frame_ind))

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_video(sys.argv[1]))
    logger.info("Completed")

if __name__ == '__main__':
    main()

This test case uses OpenCV to open a video file and asynchronously process each frame of the video. The "processing" in this case is slow, mocked up as a 15-second asynchronous sleep standing in for background operations.

Before we continue, note that we've used a couple of new patterns here. The first is the call to ensure_future. Calling a coroutine function (like "process_frame") only returns a coroutine object (the terminology is confusing here, as both the function and the object returned can be casually referred to as a "coroutine"). Creating a coroutine object does not begin or schedule execution of the coroutine. To do that, you must either await the coroutine object or explicitly schedule it, which is what ensure_future does. In this case we do not want to immediately await the coroutine, since that would result in the pipeline being executed synchronously: read a frame, process it, wait for the result to complete, and then read the next frame. So instead we pass it to ensure_future.
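To make the distinction concrete, here's a minimal sketch (the names are illustrative, not part of the pipeline):

async def work():
    await asyncio.sleep(1)
    return "done"

async def schedule_work():
    coro = work()                       # just a coroutine object; nothing is running yet
    task = asyncio.ensure_future(coro)  # wrap it in a Task and schedule it on the event loop
    # ...do other work while the task runs in the background...
    return await task                   # collect the result only when we actually need it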

After the call to ensure_future, we await asyncio.sleep(0). This is a pattern to relinquish control back to the event loop, which allows the asynchronous processing of the video frame we just read to begin. The pattern might look familiar to multithreaded programmers in, say, C++, where an explicit sleep(0) call from a thread might (depending on the platform) cause it to relinquish the remainder of its time slice to another thread that is ready to run. In this case, however, it's absolutely necessary: the only way to relinquish control of the event loop is to explicitly await something. If this call weren't here, the entire reading of the video file would complete before any processing of the frames began. The pattern might seem ugly or unintuitive, but alternatives have been discussed and this one has been explicitly approved by Guido.
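A stripped-down sketch of the pattern (hypothetical names, not the video pipeline):

async def producer():
    tasks = []
    for i in range(3):
        tasks.append(asyncio.ensure_future(consumer(i)))  # schedule, but don't run yet
        await asyncio.sleep(0)  # yield to the event loop so consumer(i) can start now
        # without the sleep(0), none of the consumers would begin executing
        # until this loop had finished scheduling all of them
    await asyncio.gather(*tasks)

async def consumer(i):
    logger.info("consuming item {}".format(i))
    await asyncio.sleep(1)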

The memory problem

Going back to the primary logic of our video processing pipeline, you might already guess what the issue is going to be. Our individual frame processing is very slow, and we have made no attempt to throttle the reading of a video input of arbitrary length and size. If we attempt to run our pipeline on this 4K honey bee video, our memory usage starts to spiral out of control:

The virtual size of the process balloons up to almost 20 gigs of memory until frame processing starts to finish at the 15 second mark. This is already enough to blow out the physical memory constraints of most machines.

Process oriented solution: the asynchronous semaphore

A process oriented solution requires looking at the details of what we're trying to do and setting a reasonable limit on our parallelism. In this case we're primarily motivated by memory utilization, but explicit throttling might have other motivations — for example, when crawling a web site, we may want to explicitly throttle the number of active connections to the server to prevent overwhelming it. The asyncio module provides a Semaphore object to accomplish this.

sem = asyncio.Semaphore(10)

async def process_video(filename):
    cap = cv2.VideoCapture(filename)
    tasks = list()
    frame_ind = 0
    while cap.isOpened():
        await sem.acquire()
        ret, frame = cap.read()
        if not ret:
            break
        tasks.append(asyncio.ensure_future(process_frame(frame, frame_ind)))
        frame_ind += 1
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)

async def process_frame(frame, frame_ind):
    logger.info("Processing frame {}".format(frame_ind))
    await asyncio.sleep(15.0)
    logger.info("Finished processing frame {}".format(frame_ind))
    sem.release()

By limiting the number of frames being processed at a time, we've limited the effective memory utilization.

However, we've had to explicitly decide on the number of concurrent frames to process. In a more complicated pipeline we might have to make tradeoffs between different asynchronous methods and apply some guesswork as to whether they should share the same semaphore or be separately throttled, and how many concurrent operations should be allowed. If the limiting factor truly is memory, why not focus on those allocations instead?

Resource oriented solution: the asynchronous allocator

If we have well-defined entrypoints where large memory allocations occur, we can marshal those allocations through an asynchronous allocator which will limit our total memory consumption.

from collections import deque

class AsyncAllocator(object):
    def __init__(self, max_size):
        self._max_size = max_size
        self._allocated_size = 0
        self._waiting_allocations = deque()

    async def allocate_data(self, size):
        if size > self._max_size:
            raise Exception('Requested size exceeds total allowed allocation size')
        if not self.can_allocate_now(size):
            logger.info("Can't allocate {} bytes right now, waiting for memory to be freed".format(size))
            future = asyncio.Future()
            self._waiting_allocations.append((future, size))
            await future
        self._allocated_size += size

    def deallocate_data(self, size):
        self._allocated_size -= size
        # wake up any waiting allocations that can now be satisfied
        new_deque = deque()
        for future, waiting_size in self._waiting_allocations:
            if self.can_allocate_now(waiting_size):
                future.set_result(0)
            else:
                new_deque.append((future, waiting_size))
        self._waiting_allocations = new_deque

    def can_allocate_now(self, size):
        return self._allocated_size + size <= self._max_size

By plugging our video feed into the allocator, we can set a specific memory limit on tracked allocations without having to worry about explicitly throttling the processing of the video feed.

allocator = AsyncAllocator(2e9)
async def process_video(filename):
    cap = cv2.VideoCapture(filename)
    tasks = list()
    frame_ind = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        await allocator.allocate_data(frame.nbytes)
        tasks.append(asyncio.ensure_future(process_frame(frame, frame_ind)))
        frame_ind += 1
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)

async def process_frame(frame, frame_ind):
    logger.info("Processing frame {}".format(frame_ind))
    await asyncio.sleep(15.0)
    logger.info("Finished processing frame {}".format(frame_ind))
    allocator.deallocate_data(frame.nbytes)

For simplicity we've been a little sloppy here, only "allocating" the memory after we've read from the video stream (since we don't know a priori what the size will be, though we could certainly guess/assume after reading the first frame), so we'll actually go a bit above our defined memory limit.
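If we wanted to be stricter, one option (a sketch, assuming every frame is the same size as the first frame we read) is to reserve the memory before each subsequent read:

async def process_video(filename):
    cap = cv2.VideoCapture(filename)
    tasks = list()
    frame_ind = 0
    frame_nbytes = None
    while cap.isOpened():
        if frame_nbytes is not None:
            # after the first frame we know (assume) the frame size,
            # so reserve the memory before reading the next frame
            await allocator.allocate_data(frame_nbytes)
        ret, frame = cap.read()
        if not ret:
            if frame_nbytes is not None:
                allocator.deallocate_data(frame_nbytes)  # return the unused reservation
            break
        if frame_nbytes is None:
            frame_nbytes = frame.nbytes
            await allocator.allocate_data(frame_nbytes)  # first frame: allocate after the fact
        tasks.append(asyncio.ensure_future(process_frame(frame, frame_ind)))
        frame_ind += 1
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)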

Tracking the virtual size confirms that we've again successfully limited our memory utilization:

Side note: tracking memory utilization

You may have noticed that we set our allocation limit to 2 gigs, but our virtual size topped out at 3.5 gigs. You may have even noticed the discrepancy with the semaphore example — even with 4K video, storing 10 frames should not require anywhere near the 2 gigabytes of memory reported in the virtual size. What happened? Is there a lot more memory allocation happening in the background of OpenCV that we're missing? Is the numpy array representation significantly larger than suggested by the nbytes attribute? Or is the use of virtual size as an indicator of memory utilization misleading?

Accurate tracking of memory utilization is another topic for another blog post, but in this case we're mostly being misled by "virtual size". The virtual size attribute (in this case being tracked using ps: "ps -C python -o vsz=") measures the total size of the address space being used by the process, which can include, for example, memory-mapped files, which will not in general be fully loaded into physical memory. Using the memory_profiler Python module on the memory allocator example probably gives us a more accurate representation of actual memory utilization.
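As a rough sketch of that kind of measurement (assuming the memory_usage helper from the memory_profiler package; the mprof run / mprof plot command line tools are another option):

from memory_profiler import memory_usage

def run_pipeline():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_video(sys.argv[1]))

# sample the resident memory of this process every half second while the pipeline runs
samples = memory_usage((run_pipeline, (), {}), interval=0.5)
logger.info("Peak memory usage: {:.1f} MiB".format(max(samples)))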

Side note: deadlocks

By introducing some cleverness into our asynchronous pipeline we've also introduced the possibility of deadlock. We've been sloppy with exception handling, which in our semaphore example might lead to the semaphore never being released. Perhaps worse, we've introduced the potential for deadlock in our memory allocator even without an unexpected error. While the allocator makes sure we don't attempt to allocate a single chunk larger than the max allocation size, it allows us to attempt to allocate two large chunks that, together, exceed the max allocation size. If the second chunk is allocated in the same part of the pipeline as the first, such that the first will not be released until after the second is allocated, our event loop will never complete.

We can work around this by, for example, creating sub-allocator objects that decrease the max allocation size when creating an allocation (sketched below). But the point to keep in mind is that even though we haven't introduced multithreading or multiprocessing into this example, we have introduced the potential for deadlocks by going beyond the "async def" and "await" syntax into manual handling of semaphores and futures. Proceed with caution.
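A very rough sketch of that idea (hypothetical; the bookkeeping between parent and child allocators is glossed over here, and run_downstream_stages is an assumed downstream stage): after allocating a chunk, hand any nested allocations a child allocator whose budget excludes that chunk, so a nested request that can never be satisfied while the chunk is held raises immediately instead of waiting forever.

async def process_frame(frame, frame_ind):
    await allocator.allocate_data(frame.nbytes)
    try:
        # nested allocations are limited to whatever budget remains once this
        # frame's memory is accounted for, so an impossible request fails fast
        child_allocator = AsyncAllocator(allocator._max_size - frame.nbytes)
        await run_downstream_stages(frame, child_allocator)  # hypothetical downstream work
    finally:
        allocator.deallocate_data(frame.nbytes)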

Side note: the "async with" syntax

The "async with" syntax introduced in Python 3.5 didn't really fit our model example use case, but it's worth mentioning. Asynchronous context managers can be created using the new __aenter__ and __aexit__ magic methods.

class AsyncArrayTracker(object):
    def __init__(self, array, allocator):
        self._allocator = allocator
        self._array = array
        self._array_copy = None

    async def __aenter__(self):
        await self._allocator.allocate_data(self._array.nbytes)
        self._array_copy = self._array.copy()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self._allocator.deallocate_data(self._array.nbytes)
        self._array_copy = None

    @property
    def array(self):
        assert self._array_copy is not None, "Array has not been allocated (use in 'async with' block)"
        return self._array_copy

This helper class creates a way to work on copies of Numpy arrays using the asynchronous allocator we previously developed.

import numpy as np

async def run_allocations():
    allocator = AsyncAllocator(100)
    base_array = np.ones(10, dtype=np.float32)  # nbytes = 40
    await asyncio.gather(
        do_work_on_array_copy("Operation 1", base_array, allocator),
        do_work_on_array_copy("Operation 2", base_array, allocator),
        do_work_on_array_copy("Operation 3", base_array, allocator))

async def do_work_on_array_copy(opname, array, allocator):
    logger.info("Attempting to allocate {} bytes for {}".format(array.nbytes, opname))
    async with AsyncArrayTracker(array, allocator):
        logger.info("Allocated {} bytes for {}".format(array.nbytes, opname))
        await asyncio.sleep(3)
        logger.info("Finished working on {}, deallocating {} bytes".format(opname, array.nbytes))
    logger.info("Deallocation on {} finished".format(opname))

Since copying the input array 3 times would require 120 bytes of allocations, and the allocator is set to max out at 100 bytes, we'll only run two concurrent operations.

2015-12-06 14:18:45,912: Attempting to allocate 40 bytes for Operation 1
2015-12-06 14:18:45,912: Allocating 40 bytes
2015-12-06 14:18:45,912: Allocated 40 bytes for Operation 1
2015-12-06 14:18:45,912: Attempting to allocate 40 bytes for Operation 3
2015-12-06 14:18:45,912: Allocating 40 bytes
2015-12-06 14:18:45,912: Allocated 40 bytes for Operation 3
2015-12-06 14:18:45,912: Attempting to allocate 40 bytes for Operation 2
2015-12-06 14:18:45,912: Allocating 40 bytes
2015-12-06 14:18:45,912: Can't allocate 40 bytes right now, waiting for memory to be freed
2015-12-06 14:18:48,916: Finished working on Operation 1, deallocating 40 bytes
2015-12-06 14:18:48,916: Deallocating 40 bytes
2015-12-06 14:18:48,916: Deallocation on Operation 1 finished
2015-12-06 14:18:48,916: Finished working on Operation 3, deallocating 40 bytes
2015-12-06 14:18:48,916: Deallocating 40 bytes
2015-12-06 14:18:48,917: Deallocation on Operation 3 finished
2015-12-06 14:18:48,917: Allocated 40 bytes for Operation 2
2015-12-06 14:18:51,920: Finished working on Operation 2, deallocating 40 bytes
2015-12-06 14:18:51,920: Deallocating 40 bytes
2015-12-06 14:18:51,921: Deallocation on Operation 2 finished

The benefits of asynchronous resource-based throttling

In this post we introduced both process-based and resource-based throttling for asynchronous pipelines. Process-based throttling will be familiar to anyone who has worked on high-throughput multithreaded applications before, but resource-based throttling is frequently more flexible. Process-based throttling generally requires some guesswork: given what we know about the problem at hand and the machines it will run on, we decide we probably want no more than X concurrent executions of a given method. Resource-based throttling instead declares the actual known resource requirements and lets the asynchronous pipeline figure out the rest. This can lead to some especially interesting use cases when we expand the available resources beyond just memory and CPU utilization, for example by introducing some GPUs into the mix, which have their own separate processing and memory limits.
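As a sketch of what that might look like (purely illustrative; the second allocator, the budgets, and run_gpu_kernel are assumptions, not part of the examples above), nothing stops us from maintaining one allocator per resource and reserving from each of them before kicking off a stage:

host_allocator = AsyncAllocator(2e9)  # host RAM budget
gpu_allocator = AsyncAllocator(4e9)   # hypothetical GPU memory budget

async def process_frame_on_gpu(frame, frame_ind):
    # reserve from every resource the stage needs, always in the same order,
    # to avoid the kind of deadlock discussed earlier
    await host_allocator.allocate_data(frame.nbytes)
    await gpu_allocator.allocate_data(frame.nbytes)
    try:
        await run_gpu_kernel(frame)  # hypothetical coroutine wrapping the GPU work
    finally:
        gpu_allocator.deallocate_data(frame.nbytes)
        host_allocator.deallocate_data(frame.nbytes)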


Developing a computational pipeline using the asyncio module in Python 3

Asynchronous Programming

"Asynchronous programming" is a vague concept that will likely bring to mind a range of related ideas  multithreading, callbacks, event-driven programming, network I/O, hardware interrupts, etc. Fundamentally, asynchronous programming refers to a separation between the conceptual control flow of a program or subroutine and its actual flow of execution  that there's an input that needs to be processed, but there might be a delay in retrieving that input, and there are other things that might need to be done while we're waiting for that input to be ready.

This concept of "there's more than one thing to be done" is so common in most applications that we tend to think more in terms of the patterns we use for solving it than the actual problem itself. A UI will need to have callbacks to deal with user-triggered events like "button clicked". A background worker will have to have its own thread which (say) schedules itself to review its tasks on a timed basis.

But the usage of callbacks or explicit worker threads quickly gets ugly when attempting to maintain a data processing pipeline. In this context, a set of data might need to be delegated to multiple sub-processes, and only once those processes are finished can the next stage of data processing take place. A serial approach (executing each task one after another) will suffer from a loss of performance by not taking advantage of potential parallelism. A callback approach will have to juggle passing along all data that might be needed further down the pipeline, and furthermore will have to be negotiated by some logical process that knows when all of the data needed for the next step is available. A dedicated thread approach will have to intelligently decide how to delegate tasks to threads: are we okay with creating a new thread for every single subprocess, or could it overwhelm the threading system? If we create a thread pool, are we setting ourselves up for a potential deadlock when we have a bunch of sleeping threads waiting on the result of jobs that are waiting in the queue? Can we properly engineer a solution that will handle the potentially high variation in the number and complexity of jobs that enter the pipeline?

Asynchronous Frameworks and the asyncio Module

The hallmark of an asynchronous framework is that it provides the ability to write a pipeline in a "synchronous-ish" looking way while defining the context-switching points where the framework will re-delegate control to another asynchronous method until the data required to continue is available. In Python 3.5 this is provided by the asyncio module and a new type of function, the coroutine, defined using the new "async" and "await" keywords.

Note: In Python 3.4, coroutines were created using a modified form of generators. This is confusing because generators are fundamentally the opposite of coroutines: as explained by David Beazley, generators produce data while coroutines consume data. While it's possible to use the "yield" syntax as a consumer of data and not just a producer, I prefer not to mix the concepts. Any of the following examples can be run in 3.4, however, by replacing the "async def" syntax with the @asyncio.coroutine decorator, and the "await <X>" syntax with "yield from <X>".
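As a concrete illustration of that substitution, the print_after coroutine from the example below would look like this in 3.4 (a sketch; the behavior should be identical):

@asyncio.coroutine
def print_after(msg, wait_secs):
    yield from asyncio.sleep(wait_secs)
    logger.info(msg)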

"""
Trivial async example
"""
import asyncio
import logging

logging.basicConfig(format="[%(thread)-5d]%(asctime)s: %(message)s")
logger = logging.getLogger('async')
logger.setLevel(logging.INFO)

async def print_after(msg, wait_secs):
    await asyncio.sleep(wait_secs)
    logger.info(msg)

async def test_async():
    await asyncio.gather(
        print_after("One second", 1),
        print_after("Two seconds", 2),
        print_after("Three seconds", 3))
    logger.info("finished")

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test_async())

if __name__ == '__main__':
    main()

Output:

[10388]2015-11-22 18:38:50,129: One second
[10388]2015-11-22 18:38:51,130: Two seconds
[10388]2015-11-22 18:38:52,130: Three seconds
[10388]2015-11-22 18:38:52,131: finished

What we've done here is create three asynchronous jobs that print a message after 1, 2, and 3 seconds respectively. The most important thing to note is that there is no threading involved: the asynchronous behavior is mediated by an event loop, which is triggered by an "I/O" event (in this case, just a timed event).

The "gather" statement simply groups together a group of coroutines  "awaiting" on the gather call returns execution (and, if relevant, results) to that asynchronous method once all of those coroutines have finished.

Where does the I/O come in?

I'm not a fan of the name "asyncio" simply because I/O, to most people, implies a very specific set of low-level operations (specifically network I/O and disk I/O), while an asynchronous paradigm, though certainly useful in those cases, is much more versatile. Let's jump straight to a mocked up computational pipeline example.

"""
Demonstration of the asyncio module in Python 3.5

This simulation is composed of three layers, each of which will split up the
data into some different subsets, pass the subsets off to the next layer, wait
for results, and then do some non-trivial processing to return to the previous
layer (in this case, sleeping for a few seconds). The expensive operations are
offloaded to a ThreadPoolExecutor, which maintains a pool of processing
threads, allowing for the utilization of multiple cores (hypothetically).
"""
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(format="[%(thread)-5d]%(asctime)s: %(message)s")
logger = logging.getLogger('async')
logger.setLevel(logging.INFO)

executor = ThreadPoolExecutor(max_workers=10)
loop = asyncio.get_event_loop()

def cpu_bound_op(exec_time, *data):
    """
    Simulation of a long-running CPU-bound operation
    :param exec_time: how long this operation will take
    :param data: data to "process" (sum it up)
    :return: the processed result
    """
    logger.info("Running cpu-bound op on {} for {} seconds".format(data, exec_time))
    time.sleep(exec_time)
    return sum(data)

async def process_pipeline(data):
    # just pass the data along to level_a and return the results
    results = await level_a(data)
    return results

async def level_a(data):
    # tweak the data a few different ways and pass them each to level b.
    level_b_inputs = data, 2*data, 3*data
    results = await asyncio.gather(*[level_b(val) for val in level_b_inputs])
    # we've now effectively called level_b(...) three times with three inputs,
    # and (once the await returns) they've all finished, so now we'll take
    # the results and pass them along to our own long-running CPU-bound
    # process via the thread pool.
    # Note the signature of run_in_executor: (executor, func, *args)
    # The third argument and beyond will be passed to cpu_bound_op when it is called.
    result = await loop.run_in_executor(executor, cpu_bound_op, 3, *results)
    # level_a processing is now done, pass back the results
    return result

async def level_b(data):
    # similar to level a
    level_c_inputs = data/2, data/4, data/7
    results = await asyncio.gather(*[level_c(val) for val in level_c_inputs])
    result = await loop.run_in_executor(executor, cpu_bound_op, 2, *results)
    return result

async def level_c(data):
    # final level - queue up the long-running CPU-bound process in the
    # thread pool immediately
    result = await loop.run_in_executor(executor, cpu_bound_op, 1, data)
    return result

def main():
    start_time = time.time()
    result = loop.run_until_complete(process_pipeline(2.5))
    logger.info("Completed ({}) in {} seconds".format(result, time.time() - start_time))

if __name__ == '__main__':
    main()

For this example we've mocked up a three-level pipeline, where each layer is modifying the input data and passing it along in multiple forms to a lower layer. Once the lower layers have finished processing the data, the layer then passes off its own CPU-heavy computation to a separate (synchronous) function, which might be a numpy/scipy method or a C algorithm (presumably something that releases the GIL so we can take advantage of parallelism). Note that we use time.sleep here intentionally in this mock-up rather than asyncio.sleep because it is meant to simulate an expensive synchronous operation.

In this example, data will flow down quickly to the lowest layer, level_c, where processing will begin.

Once we hit level C, CPU-heavy computation (defined to take 1 second for this layer) will be offloaded to the thread pool, which will return execution to the asynchronous level_c method once completed. Without the thread pool, each invocation of level C would be run sequentially; since our computation is an actual CPU-bound computation and not an asynchronous wait event like an I/O read, we need the thread pool in order to achieve parallelism. As these computations complete, results will be bubbled back up to level B (defined to take 2 seconds), which will again offload work and defer execution until those computations complete, and likewise bubble back up to level A.

Since we have 10 workers available in the thread pool, all 9 of the level C invocations will begin "computing" (i.e., sleeping) immediately, while the 10th slot remains unused since there's nothing for it to do. These will all complete after 1 second, at which point all of the B computations will initiate. Likewise, after 2 seconds, they will all complete and be passed along back up to the A layer (which is defined to take up 3 seconds of CPU time). Output (note the timestamps):

[11416]2015-11-23 00:47:14,407: Running cpu-bound op on (0.7142857142857143,) for 1 seconds
[5676 ]2015-11-23 00:47:14,408: Running cpu-bound op on (1.25,) for 1 seconds
[13120]2015-11-23 00:47:14,408: Running cpu-bound op on (2.5,) for 1 seconds
[13624]2015-11-23 00:47:14,408: Running cpu-bound op on (1.0714285714285714,) for 1 seconds
[8740 ]2015-11-23 00:47:14,408: Running cpu-bound op on (1.875,) for 1 seconds
[14820]2015-11-23 00:47:14,408: Running cpu-bound op on (3.75,) for 1 seconds
[1820 ]2015-11-23 00:47:14,409: Running cpu-bound op on (0.625,) for 1 seconds
[11860]2015-11-23 00:47:14,409: Running cpu-bound op on (1.25,) for 1 seconds
[13124]2015-11-23 00:47:14,409: Running cpu-bound op on (0.35714285714285715,) for 1 seconds
[11416]2015-11-23 00:47:15,409: Running cpu-bound op on (2.5, 1.25, 0.7142857142857143) for 2 seconds
[13120]2015-11-23 00:47:15,409: Running cpu-bound op on (3.75, 1.875, 1.0714285714285714) for 2 seconds
[5676 ]2015-11-23 00:47:15,410: Running cpu-bound op on (1.25, 0.625, 0.35714285714285715) for 2 seconds
[5676 ]2015-11-23 00:47:17,411: Running cpu-bound op on (2.232142857142857, 4.464285714285714, 6.696428571428571) for 3 seconds
[13828]2015-11-23 00:47:20,412: Completed (13.392857142857142) in 6.005390644073486 seconds

As expected, total execution time is 6 seconds (1 second for all of level C, 2 seconds for level B, and 3 seconds for level A).

Well that's neat

The elegant thing about this solution is that all we really needed to know about the problem we want to solve is which parts of the pipeline are the computationally expensive bits we want to offload to a thread pool. Otherwise we can handle just about any computational pipeline with the same pattern: there might be 10 or 20 or 100 layers, or layer A might invoke layer B a hundred times per call, or we might have a case where A only calls B once and B only calls C once. In any case, all of our worker threads are only ever used for actual computations while active (they're never blocking on another thread), we're always using them as much as possible as defined by the pipeline, we're never instantiating more threads than needed, and the overall flow of the pipeline is always being handled by the main (event loop) thread.

Furthermore, we didn't really have to worry about how the main loop figures out which part of the pipeline is ready to go based on which tasks have completed. For simplicity in this example we made each layer take up a pre-defined amount of time, which leads to a pretty predictable ordering of events, but in reality there might be much greater variation in execution time for each processing event: maybe C6, C1, and C7 finish first, in which case we do not yet have enough data to begin processing anything at layer B. But that's fine: the results from those computations will wait while the other threads continue to chug away, and once enough data has been computed to begin a layer B computation, it will get kicked off immediately (assuming the main event loop isn't busy doing anything else).

In fact perhaps the biggest downside to this architecture is that it allows you to shoot yourself in the foot by trying to maintain too many active portions of the pipeline. Say instead of passing single floats around, we're processing a large N dimensional numpy dataset. We could exhaust our memory resources by jumping around too much in the pipeline while maintaining state for all the different bits we've started processing on, and might have to instead restrict execution at some level. More on that later...

Supplemental Material

David Beazley's slides on coroutines
And his examples
PEP 3156: Asynchronous IO Support Rebooted: the "asyncio" Module
PEP 492: Coroutines with async and await syntax
Guido on Tulip/asyncio
