Published

~8 minutes reading đź•‘

Tue, 01 May 2018

← Back to articles

How to take advantage of AsyncIO to solve an IO Bound / CPU Bound problem?

Do you remember this SD-Card of pictures that you wanted to send by email when you had to resize them one by one so that they fit the 5MB limit?

Do you remember this migration from S3 to OpenStack where you had to download each file from a bucket, extract some metatadata and store them in a database before uploading the file back to the new bucket?

In my case I had to download videos from our NAS and encrypt them for HLS in three sizes with a M3U8 file for each resolution/bitrate couples and a master M3U8 listing the variantes for each video.

Asyncio to the rescue

AsyncIO is a Python3 feature that allow to develop a Python program around the IO loop design pattern.

At first understanding the IO loop design pattern is not such an easy task, mainly because usually computer don't work like that.

CPU bound programs

Computers are super calculators that take parameters from the memory, calculate things and store the result in memory.

When they are busy they heat and the CPU is at 100%.

We call CPU bound programs, the kind of programs that feed the CPU.

We can make such a program faster to run either by adding CPUs or by using faster CPUs.

IO bound programs

As soon as you start working with networks you realize that you often don't have enough data to give to your program to feed your CPU at 100%.

If you watch videos in streaming, you will often have to wait for the video to download, this is because the video bitrate is larger than your network capacity. Your connection doesn't allow you to download the video faster than the player consumes the data.

We call IO bound programs, the kind of programs that are limited by the network bandwith.

But it is also true when accessing a slow device, for instance Hard Drive Disks are quite slow. We say that the computer lags as soon as the memory is full and programs starts to use the HDD to store their parameters and results because they are waiting too much on reading and writting to the HDD.

As soon as the data, your feed your program with, flows slower than the calculation, you have an IO bound software.

IO loop to the rescue

An IO loop based software is a software that will trigger asynchroneous functions that we call coroutines, the call will immediately return a future which is an object that will one day be completed.

The IO loop will have a list of future and as soon as the future is completed it will resume the calculation of the function with its result.

Let say you play with your dog, you will throw a stick. If your dog wants to play, it will go fetch the stick and come back to you (sometimes).

Dog fetching a stick

Depending on how far you've thrown the stick it may take more or less time for your dog to come back but during that time you can answer your cousin question or make sure nothing is burning on the barbecue.

In that case you are working like an IO loop.

  • You started cooking on the barbecue, but you have to wait for a while before turning the meet.
  • In the meantime you've thrown the stick at your dog and you have to wait for a couple of minutes before getting it back
  • In the meantime Mark asked you something and you can also answer that question.

An IO loop helps a software to do multiple things by letting it wait for something while still handling other things at the same time.

So you can always imagine your IO loop having an array of futures and everytime it waits it will go around the list of things and see if one has completed.

If it has completed it will continue where it stopped and eventually start another async tasks to monitor.

Since Python 3.6 we are using the keyword await to wait for an async def function. We call it the async/await feature.

async def get_weather_temperature(city):
    weathercast = await get_weathercast(city)
    return weathercast['temperature']

A consumer / producer model with AsyncIO

The documentation already gives an example about it.

The idea is to have two coroutines:

  • one that will publish to a Queue
  • one that will consume the Queue

As soon as both are done, the program ends.

asyncio.gather let us wait for two coroutines to complete.

import asyncio
import random


async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break

        # process the item
        print('consuming item {}...'.format(item))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())


loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()

Download videos and encode them for HLS

youtube-dl

youtube-dl is a great tool to download videos in hight quality and then watch them offline. It supports an impressive and growing lists of website.

We will use it in our example to download our videos.

FFmpeg

FFmpeg is an OpenSource, powerful and cross-platform solution to handle video files.

You can use it to create M3U8 variants with it, out of the box, HLS support.

FFmpeg already use by default all the power of the computer by spreading the work load on all your CPUs.

asyncio example

We know that downloading videos will take time and we know that encoding them will also take time.

The former is IO bound, because downloading videos will depend on our network connection.

The latter is CPU bound, because encoding videos will depend on our computer processing power.

We will then use AsyncIO to download the videos and to encode them in parallel without one process blocking the other.

Our producer will download videos and put their ID in the queue and our consumer will take the video ID from the Queue to encode it.

How do we run either youtube-dl or ffmpeg from Python?

AsyncIO expose a subprocess module that is really handy because it will run the process on its own as an async task and will mark the future as completed for us when it exited while letting us get the stdout and stderr outputs.

To run youtube-dl we use

import asyncio
import asyncio.subprocess


async def youtube_dl(video_url, output_path):
    args = ['youtube-dl', video_url, '--no-part', '-c',
            '-o', '{}/%(id)s.mp4'.format(output_path)]
    create = asyncio.create_subprocess_exec(*args,
                                            stdout=asyncio.subprocess.PIPE,
                                            stderr=asyncio.subprocess.PIPE)
    proc = await create
    code = await proc.wait()
    if code != 0:
        print("Command '{}' failed.".format(' '.join(args)), file=sys.stderr)
        logs = await proc.stderr.read()
        raise OSError(logs)
    return await proc.stdout.read()

To run FFmpeg we use

import asyncio
import asyncio.subprocess


# mp4: x264 and aac presets
VIDEO_CODEC = "libx264"
AUDIO_CODEC = "aac"

# For file streaming between 6 and 10 is good.
# The longer the better image quality.
# The smaller the quicker to download.
SEGMENT_LENGTH = 6

# infile = 'video.mp4'
# playlist_name = 'video/360.m3u8'
# resolution = '360x360'
# bitrate = '360'
# output_name = 'video/360_%05d.ts'

async def encode_hls(infile, playlist_name, output_name, resolution, bitrate):
    """This is a simplified version of the call to FFmpeg, usually we would do two passes."""
    command_args = ['ffmpeg',
                    '-i', infile,
                    '-y',
                    '-vcodec', VIDEO_CODEC,
                    '-acodec', AUDIO_CODEC,
                    '-flags',
                    '-global_header',
                    '-f', 'segment',
                    '-segment_list', playlist_name,
                    '-segment_time', str(SEGMENT_LENGTH),
                    '-segment_format', 'mpeg_ts',
                    '-vf', 'scale={}'.format(resolution.replace('x', ':')),
                    '-b:v', '{}k'.format(bitrate),
                    output_name]
    create = asyncio.create_subprocess_exec(*command_args,
                                            stdout=asyncio.subprocess.PIPE,
                                            stderr=asyncio.subprocess.PIPE)
    proc = await create
    code = await proc.wait()
    if code != 0:
        print("Command '{}' failed.".format(
            ' '.join(command_args)), file=sys.stderr)
        logs = await proc.stderr.read()
        raise OSError(logs)

    return await proc.stdout.read()

Implementing our producer and consumer

Now we have all the needed information to build our script.

Starting the IO loop

To start with, we create an async def main() function that we will call using the asyncio loop.run_until_complete() function.

import asyncio


async def main(loop, argv):
    return 0


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    return_code = loop.run_until_complete(main(loop, sys.argv[1:]))
    loop.close()
    sys.exit(return_code)

Reading command's arguments

In our main function we might want to read a bit the arguments to find out the list of URLs:

import argparse


async def main(loop, argv):
    parser = argparse.ArgumentParser(description='Download Youtube Videos from a YAML file.')
    parser.add_argument(dest='filepath', help='URLs list file. (An URL per line)')
    parser.add_argument('--download-dir', dest='download_dir', default='downloads',
                        help='The directory where to download videos.')
    parser.add_argument('--bucket-rsync-dir', dest='bucket_dir', default="cdn-bucket/",
                        help='The mounted Storage Bucket. '
                        '(Will be used to detect if the video already exists.)')
    args = parser.parse_args(argv)

    with open(args.filepath, 'r') as f:
        data = f.readlines()
        urls = [url.strip() for url in data]

    print("Download {} videos".format(len(urls)))

    encoding_queue = asyncio.Queue(loop=loop)
    downloader = downloading_producer(args, urls, encoding_queue)
    encoder = encoding_consumer(args, encoding_queue)

    await asyncio.gather(video_producer, downloader, encoder)
    return 0

Producer : Downloading the URLs

We take our list of URLs and for each of them we download the URL and once downloaded we feed the ID to the encoding queue.

import os.path

async def downloading_producer(args, urls, encoding_queue):
    """Download videos from URLs and push the video_id on the encoding_queue once downloaded.
    """
    for video_url in urls
        video_id = parse_video_id(video_url)
        video_dirname = os.path.join(args.bucket_dir, video_id)
        download_filename = os.path.join(args.download_dir, '{}.mp4'.format(video_id))

        print("Download:", video_url, "as", download_filename)
        if not os.path.isdir(video_dirname) and not os.path.exists(download_filename):
             await youtube_dl(video_url, args.download_dir)
        await encoding_queue.put(video_id)

    # All URL have been proceeded
    await encoding_queue.put(None)

Consumer : Encoding the URLs

We listen to the queue and start encoding any URL coming in. We stop as soon as we receive None which is our signal for: end of queue.

import os
from io import StringIO

# https://developer.apple.com/library/content/technotes/tn2224/_index.html
# Figure 3: Recommended Encoding Settings for HLS Media.
BITRATES = (('360x360', '360'),
            ('720x720', '870'),  # 360/(sqrt(2)-1)
            ('1080x1080', '2100'))  # 870/(sqrt(2)-1)

async def encoding_consumer(args, encoding_queue):
    while True:
        video_id = await encoding_queue.get()
        if video_id is None:
            # All videos have been encoded
            break
        await encode_video(args, video_id)

async def encode_video(args, video_id):
    video_path = os.path.join(args.bucket_dir, video_id)
    os.makedirs(video_path, exist_ok=True)

    infile = os.path.join(args.download_dir, '{}.mp4'.format(video_id))
    master_playlist = os.path.join(args.bucket_dir, video_id, 'master.m3u8')
    master_playlist_content = StringIO()
    master_playlist_content.write('#EXTM3U\n')

    for resolution, bitrate in BITRATES:
        playlist_name = os.path.join(args.bucket_dir, video_id, '{}.m3u8'.format(bitrate))
        output_name = os.path.join(args.bucket_dir, video_id, '{}_%05d.ts'.format(bitrate))

        await encode_hls(infile, playlist_name, output_name, resolution, bitrate):

        master_playlist_content.write(
            '#EXT-X-STREAM-INF:BANDWIDTH={}000,RESOLUTION={}\n{}\n'.format(
                bitrate, resolution, os.path.basename(playlist_name)))

    with open(master_playlist, 'w') as f:
        f.write(master_playlist_content.getvalue())

    print("Encoded '{}' - {} seconds".format(infile, round(time() - start_time, 2)))

Conclusion

AsyncIO let us make the most of our computing and networking power. We can parallelize download and encoding without worrying on which one will be quicker.

We don't need to wait for all the video to be downloaded to start encoding and we don't need to wait for everything to be downloaded to start uploading. We could do the later with a second queue and another consumer for instance.

This design pattern is really efficient for numerous cases and you can also use it with a remote Queue such as Redis BLPOP or RabbitMQ to create workers that will never stop; yet as soon as a new video is uploaded to our website the worker can start encoding it.

Picture Credits

Revenir au début