An Asynchronous Task
Following on from Funsize fun times we're going to make the task run asynchronously. This won't save us CPU time, as the work still needs to be done, but it should save us wall-clock time. We'll be done in fewer minutes, but we'll be doing more work in those minutes.
Anatomy of a partials task
The main loop for work in a funsize task follows this pseudocode:
```
for every partial we've been told to build:
    Set up a working directory, including downloading tools like 'mar' and 'mbsdiff'
    Download the 'from' MAR
    Verify its signature
    Unpack the MAR
    Virus scan the unpacked contents
    Download the 'to' MAR
    Verify its signature
    Unpack the MAR
    Virus scan the unpacked contents
    Set up some metadata
    Generate the partial using 'make_incremental_update.sh'
    Copy the resulting file to the artifacts area
    Clean up
```
Because each task generates multiple partials, we have what's known as an embarrassingly parallel task: each iteration of that for loop could easily run at the same time, as long as each is given its own working environment.
Starting out
We know we'll be doing web requests, so let's add the 'aiohttp' library to our virtual environment, and get to work.
Let's keep the set-up in 'main', but move the work out into different functions. This will make things easier to read in any case. Setting up an event loop in place of the earlier 'for' loop:
```python
loop = asyncio.get_event_loop()
manifest = loop.run_until_complete(async_main(args, signing_certs))
loop.close()
```
`async_main` can now contain the `for` loop that will iterate over the partials we want to create, but instead of running them sequentially, we can create them as futures and then `await` them all. We can move the bulk of the work into a new `manage_partial` function.
```python
async def async_main(args, signing_certs):
    tasks = []
    master_env = WorkEnv()
    await master_env.setup()
    for definition in task["extra"]["funsize"]["partials"]:
        workenv = WorkEnv()
        await workenv.clone(master_env)
        tasks.append(asyncio.ensure_future(manage_partial(...all the args...)))
    manifest = await asyncio.gather(*tasks)
    master_env.cleanup()
    return manifest
```
We could have just called `return await asyncio.gather(*tasks)` if we didn't want to clean up the working environments. Instead, since setting up a work environment involves downloading some tools, we optimise a bit by only doing that once and then cloning it locally for each task.
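To make the cloning idea concrete, here's a minimal sketch of what `WorkEnv` might look like. The class and method names come from the code above; the tool list and the directory handling are assumptions, and the real class does more (permissions, verification, and so on):

```python
import os
import shutil
import tempfile


class WorkEnv:
    """A disposable working directory holding the tools a partial needs.

    A sketch only, not the real implementation.
    """

    TOOLS = ('mar', 'mbsdiff')  # assumed, per the task anatomy above

    def __init__(self):
        self.workdir = tempfile.mkdtemp()

    async def setup(self):
        # Download the tools into self.workdir. This is network I/O,
        # which is why it's a coroutine; details omitted here.
        pass

    async def clone(self, master_env):
        # A local copy is far cheaper than re-downloading the tools.
        for tool in self.TOOLS:
            shutil.copy(os.path.join(master_env.workdir, tool), self.workdir)

    def cleanup(self):
        shutil.rmtree(self.workdir)
```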
The heavy(ish) lifting
Most of the work is done in called processes, so we need to migrate away from using the `sh` module, useful though it is. Perhaps in the future `sh` can be async-aware.
Thankfully, the `asyncio` module has `create_subprocess_shell`, which works in a similar way and is easy to use. We can call it with the command we're given, still set up the process's environment and current working directory, and capture the output. The result is the `run_command` function.
```python
async def run_command(cmd, cwd='/', env=None, label=None, silent=False):
    if not env:
        env = dict()
    # stderr is redirected into stdout, so both streams come back
    # interleaved in the order the process produced them.
    process = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
        cwd=cwd, env=env)
    stdout, stderr = await process.communicate()
    await process.wait()

    if silent:
        return

    if not stderr:
        stderr = ""
    if not stdout:
        stdout = ""

    if label:
        label = "{}: ".format(label)
    else:
        label = ''

    for line in stdout.splitlines():
        log.debug("%s%s", label, line.decode('utf-8'))
    for line in stderr.splitlines():
        log.warn("%s%s", label, line.decode('utf-8'))
```
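Calling it reads much like the old `sh` invocations, just with an `await` in front. A hypothetical example; the script arguments and the `workenv.workdir` attribute here are illustrative, not the real task's:

```python
# Hypothetical invocation: build a partial inside this task's work
# environment, labelling the logged output so interleaved tasks stay readable.
await run_command(
    'make_incremental_update.sh partial.mar from_dir to_dir',
    cwd=workenv.workdir,
    label='make_incremental_update',
)
```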
We do the same for download functions, and things Just Work.
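For example, a download helper built on `aiohttp` might look like the sketch below. The function name and chunk size are our own choices, and retry and error handling are omitted:

```python
import aiohttp


async def download_file(url, dest):
    # Stream the response to disk in chunks, so we never hold a whole
    # MAR file in memory while several downloads run concurrently.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            response.raise_for_status()
            with open(dest, 'wb') as f:
                while True:
                    chunk = await response.content.read(64 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
```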
Task Durations
We've done well with the overall task time, using the same amount of CPU in a shorter span of wall-clock time. We still have more optimisation to do within the partial generation itself, though.