Python asyncio can be quite tricky to work with. We'll look at constructing a very basic pipeline to help with those IO heavy jobs.

Generic processing pipeline diagram

When dealing with a lot of IO, a naive program will idle a lot while waiting on resources. With Python asyncio, a process can continue working on other parts of your program while it's waiting to, say, fetch something from disk. All the asyncio code can quickly clutter even the most basic of programs, so I found it helpful to order the program into a clear pipeline where possible.
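To make the "continue working while waiting" idea concrete, here is a minimal sketch. It is not part of the pipeline yet; `fake_io` and `run_both` are names made up for this illustration, and `asyncio.sleep` stands in for a real disk or network wait.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    # asyncio.sleep stands in for a disk or network wait (assumption for this sketch)
    await asyncio.sleep(delay)
    return name


async def run_both():
    start = time.perf_counter()
    # both waits overlap, so the total time is close to the longest single delay
    results = await asyncio.gather(fake_io("first", 0.3), fake_io("second", 0.3))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(run_both())
print(results, f"{elapsed:.2f}s")
```

Run one after the other, the two waits would take about 0.6 seconds; overlapped, they should take roughly 0.3.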

Here we will look at such a pipeline, starting out with a few basic examples and finishing with a more realistic use case.

Note that I've found asyncio to be of use only for IO-bottlenecked programs. Technically I think CPU-heavy loads can be handed off as well with ThreadPoolExecutor, but I've never gotten any good results (in CPython the GIL keeps threads from running Python code in parallel, which likely explains it). If you have a CPU-heavy load I suggest you wrap the entire program in something else (multiprocessing) or, if possible, switch to a language more suited to the task.

Basic setup

To start out, we'll look at a program that takes a string and changes it to upper case letter by letter.

It consists of

  • Stage A, sends word, letter by letter
  • Stage B, changes letter to uppercase and forwards it
  • Stage C, accumulates result

Here's the code (see gist):

"""
Example program for basic asyncio pipeline.

Program takes string as input and converts it to upper case.

For the sake of simplicity some "features" are missing, most notably error handling.
Errors will silently prevent program completion in many cases.
"""
import asyncio
from dataclasses import dataclass


@dataclass()
class DataAB:
    letter: str


@dataclass()
class DataBC:
    letter: str
    upper: str


result = ""


async def do_stepA(queue_out, input):
    for letter in input:
        print(f'A, sending {letter}')
        await queue_out.put(DataAB(letter))


async def do_stepB(queue_in, queue_out):
    while True:
        data: DataAB = await queue_in.get()

        # perform actual step
        letter = data.letter
        upper = letter.upper()
        print(f'B, processed {upper}')

        await queue_out.put(DataBC(letter, upper))

        queue_in.task_done()


async def do_stepC(queue_in):
    global result
    while True:
        data: DataBC = await queue_in.get()

        # perform actual step
        letter = data.letter
        upper = data.upper
        print(f'C, {letter} changed to {upper}')

        result += upper

        queue_in.task_done()


async def main():
    pipeline_in = 'hello world'

    print(f'converting to upper case: {pipeline_in}')

    queue_AB = asyncio.Queue()
    queue_BC = asyncio.Queue()

    stepA = asyncio.create_task(do_stepA(queue_AB, pipeline_in))
    stepB = asyncio.create_task(do_stepB(queue_AB, queue_BC))
    stepC = asyncio.create_task(do_stepC(queue_BC))

    await stepA
    print('step A done')

    await queue_AB.join()
    print('queue A - B done')
    stepB.cancel()  # no more data is going to show up at B

    await queue_BC.join()
    print('queue B - C done')
    stepC.cancel()  # no more data is going to show up at C

    print(f'main complete, result: {result}')


asyncio.run(main())
print('program complete')

With pipeline_in set to "hello", this program produces the following output:

converting to upper case: hello
A, sending h
A, sending e
A, sending l
A, sending l
A, sending o
B, processed H
B, processed E
B, processed L
B, processed L
B, processed O
C, h changed to H
C, e changed to E
C, l changed to L
C, l changed to L
C, o changed to O
step A done
queue A - B done
queue B - C done
main complete, result: HELLO
program complete

Note that every stage in the pipeline is served by a single task, and that the input passes through the pipeline in order.

Waiting

When we add a random sleep to the stages the output is less clean, but still ordered.

import random


async def sleep_about(t: float):
    # sleep for t seconds plus up to 0.5 seconds of random jitter
    sleep_s = t + 0.5 * random.random()
    await asyncio.sleep(sleep_s)


async def do_stepA(queue_out, input):
    for letter in input:
        await sleep_about(0.1)
        print(f'A, sending {letter}')
        await queue_out.put(DataAB(letter))


async def do_stepB(queue_in, queue_out):
    while True:
        data: DataAB = await queue_in.get()

        # perform actual step
        letter = data.letter
        upper = letter.upper()
        await sleep_about(1)
        print(f'B, processed {upper}')

        await queue_out.put(DataBC(letter, upper))

        queue_in.task_done()

The full program can be found here

For the input "hello" the interleaving of the log lines now changes between runs, while the result stays the same. One such output is:

converting to upper case: hello
A, sending h
A, sending e
A, sending l
B, processed H
C, h changed to H
A, sending l
A, sending o
step A done
B, processed E
C, e changed to E
B, processed L
C, l changed to L
B, processed L
C, l changed to L
B, processed O
C, o changed to O
queue A - B done
queue B - C done
main complete, result: HELLO
program complete

Stage level concurrency

The examples thus far have had a single task per stage. Even so, for IO-heavy programs you will likely see a speedup, because there is still work available while waiting for IO. In certain cases it is possible to perform several of the same IO operations at the same time. Here it may pay off to run multiple tasks for the same stage.

Stage B, for example, can be served by 3 tasks using:

async def main():
    ...
    stepA = asyncio.create_task(do_stepA(queue_AB, pipeline_in))
    stepsB = [
        asyncio.create_task(do_stepB(queue_AB, queue_BC, 1)),
        asyncio.create_task(do_stepB(queue_AB, queue_BC, 2)),
        asyncio.create_task(do_stepB(queue_AB, queue_BC, 3)),
    ]
    stepC = asyncio.create_task(do_stepC(queue_BC))

    await stepA
    print('step A done')

    await queue_AB.join()
    print('queue A - B done')
    for step in stepsB:
        step.cancel()  # no more data is going to show up at B
    
    ...

For the full code see gist.
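The calls above pass a third argument to do_stepB. The gist version is not reproduced here, but a worker that takes an id purely for logging could look like the sketch below. The parameter name `worker_id`, the `n_workers` argument and the short random sleep (standing in for IO) are assumptions of this sketch.

```python
import asyncio
import random
from dataclasses import dataclass


@dataclass()
class DataAB:
    letter: str


@dataclass()
class DataBC:
    letter: str
    upper: str


async def do_stepB(queue_in, queue_out, worker_id):
    # worker_id is only used in the log line, to show which task did the work
    while True:
        data: DataAB = await queue_in.get()
        await asyncio.sleep(random.uniform(0.01, 0.05))  # simulated IO wait
        upper = data.letter.upper()
        print(f'B {worker_id}, processed {upper}')
        await queue_out.put(DataBC(data.letter, upper))
        queue_in.task_done()


async def main(word, n_workers=3):
    queue_AB, queue_BC = asyncio.Queue(), asyncio.Queue()
    stepsB = [
        asyncio.create_task(do_stepB(queue_AB, queue_BC, i + 1))
        for i in range(n_workers)
    ]
    for letter in word:
        await queue_AB.put(DataAB(letter))
    await queue_AB.join()
    for step in stepsB:
        step.cancel()  # no more data is going to show up at B
    # drain the output queue; the order depends on which worker finished first
    return [queue_BC.get_nowait().upper for _ in range(queue_BC.qsize())]


processed = asyncio.run(main('hello'))
print(processed)
```

Building the task list with a comprehension makes the number of workers a single parameter instead of a copy-pasted line per task.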

If a stage's execution time can vary, the order is no longer guaranteed. This can produce an out-of-order result such as:

converting to upper case: hello
A, sending h
A, sending e
A, sending l
A, sending l
A, sending o
step A done
B 1, processed H
C, h changed to H
B 2, processed E
C, e changed to E
B 3, processed L
C, l changed to L
B 2, processed O
C, o changed to O
B 1, processed L
C, l changed to L
queue A - B done
queue B - C done
main complete, result: HELOL
program complete
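
If the order matters, one option (not used in the programs above) is to send an index along with every item and sort on it at the end. The sketch below shows the idea with a single concurrent stage; the names `worker` and `main`, and the random sleep that simulates varying IO time, are made up for this illustration.

```python
import asyncio
import random


async def worker(queue_in, results):
    while True:
        index, letter = await queue_in.get()
        await asyncio.sleep(random.uniform(0.0, 0.02))  # varying "IO" time
        results.append((index, letter.upper()))  # may be appended out of order
        queue_in.task_done()


async def main(word, n_workers=3):
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]
    for index, letter in enumerate(word):
        await queue.put((index, letter))  # the index travels with the data
    await queue.join()
    for w in workers:
        w.cancel()
    # sorting on the index restores the input order
    return ''.join(upper for _, upper in sorted(results))


ordered = asyncio.run(main('hello'))
print(ordered)
```

Sorting at the end only works when the whole result fits in memory; for a long-running stream the last stage would have to buffer and release items in index order instead.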

"Parallel" downloads

One of the main use cases for asyncio is parallel network requests. Network requests often have long wait times during which other processing can occur. Furthermore, by using asyncio for network requests, multiple requests can be in flight at the same time.

A typical scenario requires fetching a list of ids/objects from a list end point and then calling the detail end point for each of them. The example here fetches sink rates from an open API of the Amsterdam municipality. The sink rate of each of the meetbouten (devices that measure the rate at which a building sinks) is retrieved in both a serial and a concurrent example.

Stage one gets a list of ids from the list end point:

...
r = await client.get(f'https://api.data.amsterdam.nl/meetbouten/meetbout/?page_size={min(100, N)}')
data = json.loads(r.content)
meetbouten = data.get('results')
ids = [x.get('id') for x in meetbouten[:N]]
...

Next for each id a detail end point is called using:

...
import httpx
client = httpx.AsyncClient()
...

...
id = data.meetbout_id
r = await client.get(f'https://api.data.amsterdam.nl/meetbouten/meetbout/{id}/')
data = json.loads(r.content)
zakkingsnelheid = data.get("zakkingssnelheid")
...

Given that this program is mostly network bound, a large increase in speed is obtained by performing many detail calls in parallel. A quick benchmark yields the following results:

Program                     Time (in seconds)   Speedup
Serial                      13.4                -
Asyncio, B concurrency 1    13.0                1.03
Asyncio, B concurrency 2     6.6                2.03
Asyncio, B concurrency 4     3.4                3.94
Asyncio, B concurrency 8     1.8                7.44
Asyncio, B concurrency 16    1.3                10.30

For this example, a single task serving the detail fetch stage performs about the same as a serial program. Performing the detail fetch concurrently, however, scales nearly linearly with the number of tasks dedicated to the detail stage. The speedup drops off after 8 parallel network calls.
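
The shape of such a benchmark can be tried without touching the network. The sketch below is not the benchmark program used for the table: `fake_detail_call` replaces the real detail request with a fixed 0.05 second sleep, and all function names are made up for this illustration.

```python
import asyncio
import time


async def fake_detail_call(item_id):
    await asyncio.sleep(0.05)  # stands in for network latency (assumption)
    return item_id * 2


async def fetch_worker(queue_in, results):
    while True:
        item_id = await queue_in.get()
        results[item_id] = await fake_detail_call(item_id)
        queue_in.task_done()


async def run(ids, concurrency):
    queue = asyncio.Queue()
    results = {}
    for item_id in ids:
        queue.put_nowait(item_id)
    # the detail stage is served by `concurrency` tasks
    workers = [
        asyncio.create_task(fetch_worker(queue, results))
        for _ in range(concurrency)
    ]
    start = time.perf_counter()
    await queue.join()
    elapsed = time.perf_counter() - start
    for w in workers:
        w.cancel()
    return results, elapsed


async def benchmark():
    timings = {}
    for concurrency in (1, 2, 4, 8):
        _, timings[concurrency] = await run(range(16), concurrency)
    return timings


timings = asyncio.run(benchmark())
for concurrency, elapsed in timings.items():
    print(f'concurrency {concurrency}: {elapsed:.2f}s, speedup {timings[1] / elapsed:.2f}')
```

With a simulated, fixed latency the scaling stays close to ideal. A real API adds server-side limits and bandwidth constraints, which is presumably where the drop-off in the table comes from.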

So using asyncio we can get some great performance gains for these high latency IO tasks. And by structuring the code as a pipeline the added logic of asyncio is kept in check.

Further reading and inspiration