The current state and future of asyncio

Andrew Svetlov

@andrew_svetlov
andrew.svetlov@gmail.com
https://asvetlov.github.io/ua-pycon-2018/

Bio

  • Work at Ocean S.A. (https://ocean.io/)
  • aio-libs, aiohttp etc.
  • Python Core Developer

asyncio

Why?

Microservices

Websockets

Non-HTTP transports

Long-running tasks

Basic API

  • async/await
  • create_task()
  • gather()
  • locks
  • queues
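
A minimal sketch of how the pieces listed above fit together. The names `worker` and `main` are hypothetical and used only for illustration; the lock is not strictly needed for a single `append`, it is there to show the syntax.

```python
import asyncio


async def worker(name, lock, results):
    # The lock serializes access to the shared list,
    # much as a threading.Lock would between threads.
    async with lock:
        results.append(name)
    return name.upper()


async def main():
    lock = asyncio.Lock()
    results = []
    # create_task() schedules the coroutine to run concurrently.
    first = asyncio.create_task(worker("a", lock, results))
    # gather() waits for all awaitables and returns their results
    # in the order the arguments were passed.
    values = await asyncio.gather(first, worker("b", lock, results))
    return values, results


if __name__ == "__main__":
    print(asyncio.run(main()))
```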

Third parties


  async def fetch(session, url):
      async with session.get(url) as response:
          return await response.text()
	    

asyncio is

A kind of concurrent programming

asyncio task is a lightweight thread

Apply what you already know from multithreaded programming

Use locks, events, queues, etc.

No low-level futures
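
The thread-like primitives can be used much as their `threading` and `queue` counterparts. A small producer/consumer sketch, assuming a `None` sentinel to signal the end of the stream (an illustrative convention, not something asyncio prescribes):

```python
import asyncio


async def producer(queue, items):
    for item in items:
        # put() waits while the bounded queue is full.
        await queue.put(item)
    await queue.put(None)  # sentinel: tells the consumer to stop


async def consumer(queue, done):
    seen = []
    while True:
        item = await queue.get()
        if item is None:
            break
        seen.append(item)
    done.set()  # asyncio.Event behaves like threading.Event
    return seen


async def main():
    queue = asyncio.Queue(maxsize=2)
    done = asyncio.Event()
    consumer_task = asyncio.create_task(consumer(queue, done))
    await producer(queue, [1, 2, 3])
    await done.wait()
    return await consumer_task


if __name__ == "__main__":
    print(asyncio.run(main()))
```

No `Future` object is created by hand here; tasks, queues and events cover the coordination.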

asyncio is not

A way to speed up Django app

Don't run asyncio from WSGI app


def view(request):
    loop = asyncio.get_event_loop()
    tasks = [asyncio.create_task(fetch(url))
             for url in urls]
    loop.run_until_complete(asyncio.wait(tasks))
	    

Don't mix sync and async code

Async entry point


async def main():
    await func()

asyncio.run(main())
	    

Ugly


async def f():
    ...

loop.create_task(f())
loop.run_forever()
	    

Don't reinvent the wheel


def run(main, *, debug=False):
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()


def _cancel_all_tasks(loop):
    to_cancel = [task for task in tasks.all_tasks(loop)
                 if not task.done()]
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    loop.run_until_complete(
        tasks.gather(*to_cancel, loop=loop, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        if task.exception() is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': task.exception(),
                'task': task,
            })
	    

Common mistakes

Fire-and-forget


async def process(url):
    ...

async def process_all():
    for url in urls:
        asyncio.create_task(process(url))
	    

Problems

  • Errors are not handled
  • Number of spawned tasks is not controlled
  • Graceful shutdown is impossible
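
One possible way to address the first two problems, sketched under assumptions: `process` is a hypothetical stand-in that fails for one input, and `limit` is an illustrative parameter. The semaphore caps how many `process()` calls run at once (the task objects themselves are still created up front), and awaiting `gather()` means every error is returned to the caller.

```python
import asyncio


async def process(url):
    # Hypothetical stand-in for real work; fails for "bad" inputs
    # so the error handling path is visible.
    await asyncio.sleep(0)
    if "bad" in url:
        raise ValueError(url)
    return len(url)


async def process_all(urls, limit=2):
    sem = asyncio.Semaphore(limit)

    async def bounded(url):
        # At most `limit` coroutines are inside this block at a time.
        async with sem:
            return await process(url)

    # return_exceptions=True collects exceptions as results
    # instead of dropping them silently.
    return await asyncio.gather(
        *(bounded(u) for u in urls), return_exceptions=True)


if __name__ == "__main__":
    print(asyncio.run(process_all(["a", "bad", "ccc"])))
```

Because the caller awaits all the work, cancelling `process_all()` also cancels the children, which makes an orderly shutdown feasible.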

IO in constructor


class A:
    def __init__(self):
        self.data = fetch(url)
	    

Solution: Factory method


class A:
    def __init__(self):
        self.data = None

    @classmethod
    async def create(cls):
        self = cls()
        self.data = await fetch(url)
        return self

a = await A.create()
	    

Resource cleanup


async def fetch(db):
    cursor = await db.execute("SELECT * from tbl")
    ret = []
    async for rec in cursor:
        ret.append(rec)
    return ret
	    

Finalizers

  • __del__
  • await obj.close()
  • async with obj: ...
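
A sketch of the last two options combined: an explicit `close()` coroutine, wrapped in `async with` so it runs even when the body raises. `Connection` is a hypothetical class used only for illustration.

```python
import asyncio


class Connection:
    """Hypothetical resource, for illustration only."""

    def __init__(self):
        self.closed = False

    async def close(self):
        # Real cleanup usually needs to await I/O,
        # which __del__ cannot do.
        await asyncio.sleep(0)
        self.closed = True

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()


async def use_connection():
    conn = Connection()
    try:
        async with conn:
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    # close() ran despite the exception inside the block.
    return conn.closed


if __name__ == "__main__":
    print(asyncio.run(use_connection()))
```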

Timeouts

  • Read
  • Write
  • Close
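
A minimal sketch of putting a timeout on a read using `asyncio.wait_for()`. `slow_read` is a hypothetical stand-in for a stalled peer; the same wrapper can be applied to write and close operations.

```python
import asyncio


async def slow_read():
    # Hypothetical stand-in for a peer that never answers in time.
    await asyncio.sleep(10)
    return b"data"


async def read_with_timeout(timeout=0.01):
    try:
        # wait_for() cancels slow_read() once the timeout expires.
        return await asyncio.wait_for(slow_read(), timeout)
    except asyncio.TimeoutError:
        return None


if __name__ == "__main__":
    print(asyncio.run(read_with_timeout()))
```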

Task Cancellation: Naive approach


async def handler(request):
    await request.config['db'].execute("UPDATE ...")
    return web.Response(text="OK")
	    

Shielded execution


async def handler(request):
    await asyncio.shield(request.config['db'].execute("UPDATE ..."))
    return web.Response(text="OK")
	    
  • aiojobs
  • armor
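
A small sketch of what `asyncio.shield()` changes: cancelling the handler task no longer cancels the shielded coroutine. `update` is a hypothetical stand-in for the database call, and the sleep durations are arbitrary.

```python
import asyncio


async def update(log):
    # Hypothetical stand-in for db.execute("UPDATE ...").
    await asyncio.sleep(0.01)
    log.append("committed")


async def handler(log):
    await asyncio.shield(update(log))


async def main():
    log = []
    task = asyncio.create_task(handler(log))
    await asyncio.sleep(0)     # let the handler start the update
    task.cancel()              # e.g. the client disconnected
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.05)  # give the shielded update time to finish
    return task.cancelled(), log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that after the cancellation nothing awaits the shielded update any more, so its errors and its completion at shutdown are left unmanaged; this is presumably the gap that helper libraries such as aiojobs aim to fill.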

No futures

  • Future is for library writers
  • Writing error-free code is hard
  • asyncio had bugs in locks and queues
  • aiohttp had bugs in connection pools

Modern asyncio

async/await everywhere

Context variables


import contextvars
var = contextvars.ContextVar('var', default='default')

async def inner():
    log.debug("User name: %s", var.get())

@routes.get('/{name}')
async def handler(request):
    assert var.get() == 'default'
    var.set(request.match_info['name'])
    await inner()
    ...
	    

Context var usage

  • Logging and tracing
  • DB and request storage
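
A self-contained sketch of why context variables suit per-request storage: each task gets its own copy of the context, so a value set in one handler is not visible in another. The names `user`, `handle` and `inner` are hypothetical.

```python
import asyncio
import contextvars

user = contextvars.ContextVar("user", default="anonymous")


async def inner():
    await asyncio.sleep(0)  # yield so the two handlers interleave
    return user.get()


async def handle(name):
    user.set(name)  # visible only within this task's context
    return await inner()


async def main():
    # gather() wraps each coroutine in its own task,
    # and each task runs in a copy of the current context.
    names = await asyncio.gather(handle("alice"), handle("bob"))
    return names, user.get()


if __name__ == "__main__":
    print(asyncio.run(main()))
```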

Top-level functions

  • run()
  • create_task()
  • current_task()
  • all_tasks()
  • get_running_loop()
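
A brief sketch that exercises these module-level functions together, with no explicit loop object passed around. `child` is a hypothetical coroutine.

```python
import asyncio


async def child():
    await asyncio.sleep(0.01)


async def main():
    loop = asyncio.get_running_loop()   # raises RuntimeError outside a loop
    me = asyncio.current_task()         # the task running main()
    task = asyncio.create_task(child())
    pending = asyncio.all_tasks()       # unfinished tasks of the running loop
    await task
    return loop.is_running(), me in pending, task in pending


if __name__ == "__main__":
    print(asyncio.run(main()))
```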

New features

  • SSL upgrade: loop.start_tls()
  • Sendfile support: loop.sendfile()
  • server.serve_forever()

asyncio future plans

Hidden hard work

  • TLS reimplementation
  • Alternative TCP stream

Task groups


async with TaskGroup() as tg:
    tg.create_task(task_1())
    tg.create_task(task_2())
    tg.call_later(5.0, callback)
	    

Questions?

Andrew Svetlov

@andrew_svetlov
andrew.svetlov@gmail.com
http://asvetlov.github.io/ua-pycon-2018/