Optimizing resource utilization of an asynchronous pipeline using resource-based throttling

In the previous post, we introduced the concept of creating an asynchronous pipeline using the asyncio module to tackle a computational problem. One of the potential pitfalls of using an asynchronous pipeline for a non-trivial problem is that each call to an asynchronous function results in a new addition to the asynchronous equivalent of the "stack". In a traditional synchronous program, the size of the stack is predefined by the call structure, and heap allocations will generally fall within a pretty well understood range depending on the program structure. We might know for example that a given function will need to create a copy of its input data, but once that function completes, it will discard the extra copy and the memory will be available again for the next stage of processing.

When developing a threaded program, we might have to be more careful, but we still have a good concept of how much memory we might need to tackle a given problem. If we create a thread pool of ten threads, and each one requires a copy of the original data to work on, the total memory required will be on the order of ten times the size of the structure. As long as we can handle that, and the individual threads aren't going too crazy with other allocations, we don't have a problem.

In an asynchronous program, though, if we're dealing with large in-memory data structures, we have to be cognizant of the program flow and how it might push our memory limits. Take the following video processing example.

"""
Mock up a video feed pipeline
"""
import asyncio
import logging
import sys

import cv2

logging.basicConfig(format="[%(thread)-5d]%(asctime)s: %(message)s")
logger = logging.getLogger('async')
logger.setLevel(logging.INFO)

async def process_video(filename):
    cap = cv2.VideoCapture(filename)
    tasks = list()
    frame_ind = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # schedule the frame for processing without waiting for it to finish
        tasks.append(asyncio.ensure_future(process_frame(frame, frame_ind)))
        frame_ind += 1
        # hand control back to the event loop so the scheduled tasks can start
        await asyncio.sleep(0)
    # gather expects the futures as separate arguments, so unpack the list
    await asyncio.gather(*tasks)

async def process_frame(frame, frame_ind):
    logger.info("Processing frame {}".format(frame_ind))
    await asyncio.sleep(15.0)
    logger.info("Finished processing frame {}".format(frame_ind))

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_video(sys.argv[1]))
    logger.info("Completed")

if __name__ == '__main__':
    main()

This test case uses OpenCV to open a video file and asynchronously process each frame of the video. The "processing" in this case is slow, mocked up as a 15-second asynchronous sleep standing in for background operations.

Before we continue, note that we've used a couple of new patterns here. The first is the call to ensure_future. Calling a coroutine function (like "process_frame") only returns a coroutine object (the terminology is confusing here, as both the function and the object returned can be casually referred to as a "coroutine"). Creating a coroutine object does not begin or schedule execution of the coroutine. To do that, you must either await the coroutine object or explicitly schedule it, which ensure_future does. In this case we do not want to immediately await the coroutine, since that would result in the pipeline being executed synchronously: read a frame, process it, wait for the result to complete, and then read the next frame. So instead we pass it to ensure_future.
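
To make that concrete, here's a tiny standalone illustration (the function names are made up for this demo and aren't part of the pipeline):

async def say_hello():
    logger.info("hello")

async def demo():
    coro = say_hello()                   # a coroutine object; nothing has run yet
    task = asyncio.ensure_future(coro)   # now it's scheduled on the event loop
    await asyncio.sleep(0)               # give the loop a chance to run it
    await task                           # wait for it to finish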

Once we call ensure_future, we then call asyncio.sleep(0). This is a pattern for relinquishing control back to the event loop, which allows the asynchronous processing of the video frame we just read to begin. The pattern might look familiar to multithreaded programmers in, say, C++, where an explicit sleep(0) call from a thread might (depending on the platform) cause it to relinquish the remainder of its time slice to another thread that is ready to run. In this case, however, it's absolutely necessary: short of awaiting some other Future, there is no mechanism for giving up control of the event loop. If this call weren't here, the entire video file would be read before any processing of the frames began. This pattern might seem ugly or unintuitive, but alternatives have been discussed and this one has been explicitly approved by Guido.
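
A quick way to see the effect (a contrived demo, not part of the pipeline):

async def noisy_task(name):
    logger.info("{} started".format(name))
    await asyncio.sleep(0.1)
    logger.info("{} finished".format(name))

async def scheduling_demo():
    for i in range(3):
        asyncio.ensure_future(noisy_task("task {}".format(i)))
        logger.info("scheduled task {}".format(i))
        # remove this line and all three "scheduled" messages appear before
        # any task logs "started"
        await asyncio.sleep(0)
    # give the tasks time to finish before the demo returns
    await asyncio.sleep(0.2)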

The memory problem

Going back to the primary logic of our video processing pipeline, you might already guess what the issue is going to be. Our individual frame processing is very slow, and we have made no attempt to throttle the reading of a video input of arbitrary length and size. If we attempt to run our pipeline on this 4K honey bee video, our memory usage starts to spiral out of control:

The virtual size of the process balloons up to almost 20 gigs of memory until frame processing starts to finish at the 15 second mark. This is already enough to blow out the physical memory constraints of most machines.

Process oriented solution: the asynchronous semaphore

A process oriented solution requires looking at the details of what we're trying to do and setting a reasonable limit on our parallelism. In this case we're primarily motivated by memory utilization, but explicit throttling might have other motivations — for example, when crawling a web site, we may want to explicitly throttle the number of active connections to the server to prevent overwhelming it. The asyncio module provides a Semaphore object to accomplish this.
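
As an aside, the crawling case maps naturally onto the same primitive. Here's a hypothetical sketch (aiohttp is assumed purely for illustration and isn't used elsewhere in this post) that caps the number of in-flight requests to a server at five:

import aiohttp

fetch_sem = asyncio.Semaphore(5)

async def fetch(session, url):
    # the semaphore is released when the block exits, even on an exception
    async with fetch_sem:
        async with session.get(url) as resp:
            return await resp.read()

async def crawl(urls):
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, url) for url in urls))

Applying the same idea to our video pipeline: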

sem = asyncio.Semaphore(10)

async def process_video(filename):
    cap = cv2.VideoCapture(filename)
    tasks = list()
    frame_ind = 0
    while cap.isOpened():
        # wait for one of the slots to free up before reading another frame
        await sem.acquire()
        ret, frame = cap.read()
        if not ret:
            break
        tasks.append(asyncio.ensure_future(process_frame(frame, frame_ind)))
        frame_ind += 1
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)

async def process_frame(frame, frame_ind):
    logger.info("Processing frame {}".format(frame_ind))
    await asyncio.sleep(15.0)
    logger.info("Finished processing frame {}".format(frame_ind))
    # free the slot so the reader can pull in another frame
    sem.release()

By limiting the number of frames being processed at a time, we've bounded the effective memory utilization.

However, we've had to explicitly decide on the number of concurrent frames to process. In a more complicated pipeline we might have to make tradeoffs between different asynchronous methods, and apply some guesswork as to whether they should share the same semaphore or be throttled separately, and how many concurrent operations should be allowed. If the limiting factor truly is memory, why not focus on those allocations instead?

Resource oriented solution: the asynchronous allocator

If we have well-defined entrypoints where large memory allocations occur, we can marshal those allocations through an asynchronous allocator which will limit our total memory consumption.

from collections import deque

class AsyncAllocator(object):
    def __init__(self, max_size):
        self._max_size = max_size
        self._allocated_size = 0
        self._waiting_allocations = deque()

    async def allocate_data(self, size):
        if size > self._max_size:
            raise Exception('Requested size exceeds total allowed allocation size')
        if not self.can_allocate_now(size):
            logger.info("Can't allocate {} bytes right now, waiting for memory to be freed".format(size))
            future = asyncio.Future()
            self._waiting_allocations.append((future, size))
            # deallocate_data counts the size against the budget on our behalf
            # before resolving this future, so there is nothing more to do here
            await future
            return
        self._allocated_size += size

    def deallocate_data(self, size):
        self._allocated_size -= size
        still_waiting = deque()
        for future, waiting_size in self._waiting_allocations:
            if self.can_allocate_now(waiting_size):
                # count the grant immediately so a single freed chunk can't be
                # promised to several waiters in the same pass
                self._allocated_size += waiting_size
                future.set_result(0)
            else:
                still_waiting.append((future, waiting_size))
        self._waiting_allocations = still_waiting

    def can_allocate_now(self, size):
        return self._allocated_size + size <= self._max_size

By plugging our video feed into the allocator, we can set a specific memory limit on tracked allocations without having to worry about explicitly throttling the processing of the video feed.

allocator = AsyncAllocator(2e9)
async def process_video(filename):
    cap = cv2.VideoCapture(filename)
    tasks = list()
    frame_ind = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        # charge the frame's size against the memory budget before scheduling it
        await allocator.allocate_data(frame.nbytes)
        tasks.append(asyncio.ensure_future(process_frame(frame, frame_ind)))
        frame_ind += 1
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)

async def process_frame(frame, frame_ind):
    logger.info("Processing frame {}".format(frame_ind))
    await asyncio.sleep(15.0)
    logger.info("Finished processing frame {}".format(frame_ind))
    # return the frame's bytes to the allocator's budget
    allocator.deallocate_data(frame.nbytes)

For simplicity we've been a little sloppy here, only "allocating" the memory after we've read from the video stream (since we don't know a priori what the size will be, though we could certainly guess/assume after reading the first frame), so we'll actually go a bit above our defined memory limit.
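
For completeness, here's a rough sketch of how a stricter version might look. It's hypothetical (not the variant measured below) and assumes every frame is the same size as the first one:

async def process_video_reserving(filename):
    cap = cv2.VideoCapture(filename)
    tasks = list()
    frame_ind = 0
    frame_size = None
    while cap.isOpened():
        if frame_size is not None:
            # reserve the expected size *before* reading, so we never hold an
            # unbudgeted frame in memory
            await allocator.allocate_data(frame_size)
        ret, frame = cap.read()
        if not ret:
            if frame_size is not None:
                # give back the reservation made for a frame that never arrived
                allocator.deallocate_data(frame_size)
            break
        if frame_size is None:
            # first frame: learn the size, then charge it against the budget
            frame_size = frame.nbytes
            await allocator.allocate_data(frame_size)
        tasks.append(asyncio.ensure_future(process_frame(frame, frame_ind)))
        frame_ind += 1
        await asyncio.sleep(0)
    await asyncio.gather(*tasks)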

Tracking the virtual size confirms that we've again successfully limited our memory utilization:

Side note: tracking memory utilization

You may have noticed that we set our allocation limit to 2 gigs, but our virtual size topped out at 3.5 gigs. You may have even noticed the discrepancy in the semaphore example: even with 4K video, storing 10 frames should not require anywhere near the 2 gigabytes of memory reported in the virtual size. What happened? Is there a lot more memory allocation happening in the background of OpenCV that we're missing? Is the numpy array representation significantly larger than suggested by the nbytes attribute? Or is the use of virtual size as an indicator of memory utilization misleading?

Accurate tracking of memory utilization is a topic for another blog post, but in this case we're mostly being misled by "virtual size". The virtual size (in this case tracked using ps: "ps -C python -o vsz=") measures the total size of the address space being used by the process, which can include, for example, memory-mapped files that will not in general be fully loaded into physical memory. Using the memory_profiler python module on the memory allocator example likely gives us a more accurate representation of actual memory utilization:
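
If you want to reproduce the measurement, a hypothetical invocation might look like the following. The script and video file names are placeholders; the mprof command and the profile decorator both ship with the memory_profiler package:

# Sample the process's resident memory over time, then plot it:
#
#   mprof run async_video.py bees_4k.mp4
#   mprof plot
#
# For coarse line-by-line numbers, a synchronous entry point can also be decorated:
from memory_profiler import profile

@profile
def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(process_video(sys.argv[1]))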

Side note: deadlocks

By introducing some cleverness into our asynchronous pipeline, we've also introduced the possibility of deadlock. We've been sloppy with exception handling, which in our semaphore example could lead to the semaphore never being released. And perhaps worse, we've introduced the potential for deadlock in our memory allocator even without an unexpected error. While the allocator makes sure we don't attempt to allocate a single chunk larger than the max allocation size, it happily lets us attempt to allocate two large chunks that, together, exceed it. If the second chunk is requested in the same part of the pipeline as the first, such that the first won't be released until after the second is allocated, our event loop will never complete.
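
As a contrived illustration of that second case (the chunk sizes are arbitrary):

async def deadlocks():
    allocator = AsyncAllocator(100)
    await allocator.allocate_data(60)   # fine: 60 of 100 bytes in use
    # 60 + 60 = 120 exceeds the limit, and the only coroutine that could free
    # the first chunk is the one stuck waiting here, so this never returns
    await allocator.allocate_data(60)
    allocator.deallocate_data(60)
    allocator.deallocate_data(60)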

We can work around this by, for example, creating sub-allocator objects that decrease the max allocation size when creating an allocation, but the point to keep in mind is that even though we haven't even introduced multithreading or multiprocessing into this example, we have introduced the potential for deadlocks by going beyond the "async def" and "await" syntax into manual handling of semaphores and futures. Proceed with caution.
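
For concreteness, here's one possible reading of that sub-allocator workaround, sketched with hypothetical names. It only guards a single scope against deadlocking itself, and it isn't part of the pipeline above:

class ScopedAllocator(object):
    def __init__(self, parent, budget):
        self._parent = parent
        # how much this scope may still request before a further allocation
        # could never succeed while its current chunks are held
        self._remaining = budget

    async def allocate_data(self, size):
        if size > self._remaining:
            raise Exception('Allocation could never succeed while this scope holds its current chunks')
        await self._parent.allocate_data(size)
        self._remaining -= size

    def deallocate_data(self, size):
        self._parent.deallocate_data(size)
        self._remaining += size

If the deadlocking coroutine above allocated through ScopedAllocator(allocator, 100) instead, the second 60-byte request would raise immediately rather than hang forever.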

Side note: the "async with" syntax

The "async with" syntax introduced in Python 3.5 didn't really fit our model example use case, but it's worth mentioning. Asynchronous context managers can be created using the new __aenter__ and __aexit__ magic methods.

class AsyncArrayTracker(object):
    def __init__(self, array, allocator):
        self._allocator = allocator
        self._array = array
        self._array_copy = None

    async def __aenter__(self):
        # reserve the copy's size from the allocator before making the copy
        await self._allocator.allocate_data(self._array.nbytes)
        self._array_copy = self._array.copy()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # drop the copy and return its memory to the allocator's budget
        self._allocator.deallocate_data(self._array.nbytes)
        self._array_copy = None

    @property
    def array(self):
        assert self._array_copy is not None, "Array has not been allocated (use in 'async with' block)"
        return self._array_copy

This helper class creates a way to work on copies of Numpy arrays using the asynchronous allocator we previously developed.

import numpy as np

async def run_allocations():
    allocator = AsyncAllocator(100)
    base_array = np.ones(10, dtype=np.float32)  # nbytes = 40 (10 floats x 4 bytes each)
    await asyncio.gather(
        do_work_on_array_copy("Operation 1", base_array, allocator),
        do_work_on_array_copy("Operation 2", base_array, allocator),
        do_work_on_array_copy("Operation 3", base_array, allocator))

async def do_work_on_array_copy(opname, array, allocator):
    logger.info("Attempting to allocate {} bytes for {}".format(array.nbytes, opname))
    async with AsyncArrayTracker(array, allocator):
        logger.info("Allocated {} bytes for {}".format(array.nbytes, opname))
        await asyncio.sleep(3)
        logger.info("Finished working on {}, deallocating {} bytes".format(opname, array.nbytes))
    logger.info("Deallocation on {} finished".format(opname))

Since copying the input array 3 times would require 120 bytes of allocations, and the allocator is set to max out at 100 bytes, we'll only run two concurrent operations.

2015-12-06 14:18:45,912: Attempting to allocate 40 bytes for Operation 1
2015-12-06 14:18:45,912: Allocating 40 bytes
2015-12-06 14:18:45,912: Allocated 40 bytes for Operation 1
2015-12-06 14:18:45,912: Attempting to allocate 40 bytes for Operation 3
2015-12-06 14:18:45,912: Allocating 40 bytes
2015-12-06 14:18:45,912: Allocated 40 bytes for Operation 3
2015-12-06 14:18:45,912: Attempting to allocate 40 bytes for Operation 2
2015-12-06 14:18:45,912: Allocating 40 bytes
2015-12-06 14:18:45,912: Can't allocate 40 bytes right now, waiting for memory to be freed
2015-12-06 14:18:48,916: Finished working on Operation 1, deallocating 40 bytes
2015-12-06 14:18:48,916: Deallocating 40 bytes
2015-12-06 14:18:48,916: Deallocation on Operation 1 finished
2015-12-06 14:18:48,916: Finished working on Operation 3, deallocating 40 bytes
2015-12-06 14:18:48,916: Deallocating 40 bytes
2015-12-06 14:18:48,917: Deallocation on Operation 3 finished
2015-12-06 14:18:48,917: Allocated 40 bytes for Operation 2
2015-12-06 14:18:51,920: Finished working on Operation 2, deallocating 40 bytes
2015-12-06 14:18:51,920: Deallocating 40 bytes
2015-12-06 14:18:51,921: Deallocation on Operation 2 finished

The benefits of asynchronous resource-based throttling

In this post we introduced both process-based and resource-based throttling for asynchronous pipelines. Process-based throttling will be familiar to anyone who has worked on high-throughput multithreaded applications before, but resource-based throttling is frequently more flexible. Process-based throttling generally requires some guesswork: given some knowledge of the problem at hand and the kind of machine it will run on, we think we will probably want no more than X concurrent executions of this method. Resource-based throttling instead defines the actual known resource requirements and lets the asynchronous pipeline figure out the rest. This can lead to some especially interesting use cases when we expand the available resources beyond just memory and CPU utilization, for example by introducing some GPUs into the mix, which have their own separate processing and memory limits.
