Safe aiohttp

Andrew Svetlov

@andrew_svetlov
andrew.svetlov@gmail.com
https://asvetlov.github.io/pycon-ru-2018/

Bio

  • aio-libs, aiohttp etc.
  • Python Core Developer
  • asyncio

async http

Why?

Microservices

Websockets

Non-HTTP transports

Long-running tasks

asyncio is

A kind of concurrent programming

An asyncio task is a lightweight thread

Apply what you know about multithreaded programming

Use locks, events, queues, etc.
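
To illustrate the thread-like primitives, here is a small producer/consumer sketch built on asyncio.Queue. The doubling "work" and the number of workers are placeholders chosen for the example, not something from the talk.

```python
import asyncio


async def producer(queue: asyncio.Queue, items):
    for item in items:
        await queue.put(item)          # waits if the queue is full


async def consumer(queue: asyncio.Queue, results: list):
    while True:
        item = await queue.get()       # suspends until an item is available
        try:
            results.append(item * 2)   # placeholder for real work
        finally:
            queue.task_done()          # lets queue.join() track progress


async def main(items, workers=3):
    queue = asyncio.Queue(maxsize=10)  # bounded: back-pressure on the producer
    results = []
    tasks = [asyncio.create_task(consumer(queue, results))
             for _ in range(workers)]
    await producer(queue, items)
    await queue.join()                 # every queued item has been processed
    for task in tasks:
        task.cancel()                  # consumers loop forever, so stop them
    await asyncio.gather(*tasks, return_exceptions=True)
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(main(range(5))))  # [0, 2, 4, 6, 8]
```

The bounded queue plays the same role as a bounded queue between threads: a fast producer is slowed down instead of exhausting memory.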

No low-level API

No futures

  • Future is for library writers
  • Writing error-free code is hard
  • asyncio had bugs in locks and queues
  • aiohttp had bugs in connection pools

aiohttp is not

A way to speed up a Django app

Don't run asyncio from WSGI app


import asyncio

# Anti-pattern: a synchronous WSGI view driving an event loop
def view(request):
    loop = asyncio.get_event_loop()
    tasks = [loop.create_task(fetch(url))
             for url in urls]
    loop.run_until_complete(asyncio.wait(tasks))

Don't mix sync and async code
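
Instead of starting a loop inside a synchronous view, a fully async program enters the event loop once, at the top, and fans out with asyncio.gather. A minimal sketch, assuming fetch is a hypothetical coroutine that only simulates I/O:

```python
import asyncio


async def fetch(url: str) -> str:
    # Hypothetical stand-in for a real HTTP request.
    await asyncio.sleep(0.01)
    return f"body of {url}"


async def fetch_all(urls):
    # All fetches run concurrently inside the already running loop;
    # gather returns the results in the order of the input URLs.
    return await asyncio.gather(*(fetch(url) for url in urls))


def main():
    urls = ["http://a.example", "http://b.example"]
    # Single entry point: the loop is created and closed exactly once.
    return asyncio.run(fetch_all(urls))


if __name__ == "__main__":
    print(main())
```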

Common mistakes

Fire-and-forget


async def process(url):
    ...

async def process_all():
    for url in urls:
        # The task object is discarded: nobody awaits it,
        # sees its exceptions or cancels it on shutdown
        asyncio.create_task(process(url))

Problems

  • Errors are not handled
  • The number of spawned tasks is not controlled
  • Graceful shutdown is impossible

aiojobs
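
aiojobs addresses all three problems. Without relying on its exact API, the idea can be sketched with the standard library alone: keep a reference to every task, cap concurrency with a semaphore, record failures, and cancel leftovers on close. TaskScheduler and its methods are hypothetical names for this sketch, not aiojobs classes.

```python
import asyncio
import logging

log = logging.getLogger(__name__)


class TaskScheduler:
    """Stdlib-only sketch of a job scheduler (hypothetical, not aiojobs)."""

    def __init__(self, limit: int = 100):
        self._sem = asyncio.Semaphore(limit)  # caps concurrently running jobs
        self._tasks = set()                   # strong refs: tasks are not lost
        self.errors = []

    async def spawn(self, coro):
        await self._sem.acquire()             # waits here while at the limit
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task):
        self._tasks.discard(task)
        self._sem.release()
        if not task.cancelled() and task.exception() is not None:
            # Errors are recorded and logged instead of silently dropped.
            self.errors.append(task.exception())
            log.error("job failed", exc_info=task.exception())

    async def close(self):
        # Graceful shutdown: cancel what is still running and wait for it.
        for task in list(self._tasks):
            task.cancel()
        await asyncio.gather(*self._tasks, return_exceptions=True)
```

Create the scheduler from inside a coroutine, spawn jobs through it, and await close() on shutdown.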

IO in constructor


class Cls:
    def __init__(self):
        self.client = aiohttp.ClientSession()

Solution: Factory method


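class Cls:
    def __init__(self):
        self.client = None  # no I/O in the constructor

    @classmethod
    async def create(cls):
        self = cls()
        # ClientSession is not awaitable: just create it
        # inside a coroutine, where the loop is running
        self.client = aiohttp.ClientSession()
        return self

obj = await Cls.create()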
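
A runnable, stdlib-only variant of the same pattern, which also adds async context manager support for cleanup. Client and _connect are hypothetical names; _connect merely simulates I/O with asyncio.sleep.

```python
import asyncio


class Client:
    """Factory-method sketch; _connect stands in for real async setup."""

    def __init__(self):
        self.conn = None           # the constructor does no I/O

    @classmethod
    async def create(cls):
        self = cls()
        self.conn = await self._connect()
        return self                # forgetting this would return None

    async def _connect(self):
        await asyncio.sleep(0)     # simulated I/O
        return "connected"

    async def close(self):
        await asyncio.sleep(0)     # simulated I/O
        self.conn = None

    # Also usable as: async with await Client.create() as client: ...
    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        await self.close()
```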

Resource cleanup


async def fetch(db):
    cursor = await db.execute("SELECT * from tbl")
    ret = []
    async for rec in cursor:
        ret.append(rec)
    return ret  # What happens with the cursor?

Finalizers

  • __del__
  • await obj.close()
  • async with obj: ...
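
For the cursor question above, wrapping the resource in an async context manager guarantees that close() runs however the block exits. A sketch with a hypothetical FakeCursor standing in for a real database cursor:

```python
import asyncio
from contextlib import asynccontextmanager


class FakeCursor:
    """Hypothetical cursor: async-iterable, must be closed explicitly."""

    def __init__(self, rows):
        self._rows = list(rows)
        self.closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._rows:
            raise StopAsyncIteration
        await asyncio.sleep(0)     # simulated network round trip
        return self._rows.pop(0)

    async def close(self):
        self.closed = True


@asynccontextmanager
async def cursor_for(rows):
    cursor = FakeCursor(rows)
    try:
        yield cursor
    finally:
        await cursor.close()       # runs on success, error or cancellation


async def fetch(rows):
    async with cursor_for(rows) as cursor:
        # The cursor is also returned so a caller can check it was closed.
        return [rec async for rec in cursor], cursor
```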

aiohttp client


async with aiohttp.ClientSession() as session:
    async with session.get(url) as resp:
        body = await resp.text()

Hint: reuse one client session for many requests

Hint 2: await session.close() when not using async with

Timeouts


timeout = aiohttp.ClientTimeout(
    total=60,
    connect=15)

async with session.get(url, timeout=timeout): ...

Defaults:
  • total (5 min)
  • connect (None)
  • sock_read (None)
  • sock_connect (None)
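
aiohttp enforces these limits itself. To show how a per-phase limit nests inside the overall one, here is the same idea modelled with plain asyncio.wait_for; connect and read_body are hypothetical steps that only sleep.

```python
import asyncio


async def connect():
    await asyncio.sleep(0.01)      # hypothetical connect phase


async def read_body():
    await asyncio.sleep(0.01)      # hypothetical read phase
    return b"payload"


async def request(connect_timeout: float, total_timeout: float):
    async def steps():
        # Per-phase limit, similar in spirit to ClientTimeout(connect=...)
        await asyncio.wait_for(connect(), timeout=connect_timeout)
        return await read_body()

    # Overall limit, similar in spirit to ClientTimeout(total=...)
    return await asyncio.wait_for(steps(), timeout=total_timeout)
```

Either limit raises asyncio.TimeoutError when exceeded.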

WebSockets

Naive websocket


async def handler(request):
    resp = web.WebSocketResponse()
    await resp.prepare(request)
    await resp.send_str('Welcome!')
    async for msg in resp:
        await process(msg.data)

Disconnection


try:
    async for msg in resp:
        await process(msg.data)
finally:
    # Reached when the peer disconnects, on error or on cancellation
    ...

Ping a peer, dude
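
Without heartbeats, a peer that disappeared without closing the TCP connection can go unnoticed for a long time. aiohttp's web.WebSocketResponse accepts a heartbeat argument that sends pings automatically; the underlying logic is roughly the loop below, a sketch in which send_ping and the pong event are hypothetical hooks.

```python
import asyncio


async def heartbeat(send_ping, pong: asyncio.Event,
                    interval: float, timeout: float) -> str:
    """Ping every `interval` seconds; give up if no pong arrives
    within `timeout`. `send_ping` is a hypothetical coroutine function."""
    while True:
        pong.clear()
        await send_ping()
        try:
            await asyncio.wait_for(pong.wait(), timeout)
        except asyncio.TimeoutError:
            return "peer is dead"  # time to close the connection
        await asyncio.sleep(interval)
```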

Task Cancellation: Naive approach


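async def handler(request):
    # If the client disconnects, the handler may be cancelled
    # and the UPDATE interrupted halfway
    await request.config_dict['db'].execute("UPDATE ...")
    return web.Response(text="OK")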

Shielded execution


async def handler(request):
    # The UPDATE runs to completion even if this handler is cancelled
    await asyncio.shield(request.config_dict['db'].execute("UPDATE ..."))
    return web.Response(text="OK")

  • aiojobs
  • armor
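
What shield does and does not protect can be seen in a small simulation: the outer handler is still cancelled, but the inner operation finishes. update and handler here are hypothetical stand-ins; the asyncio documentation also advises keeping references to tasks so they are not garbage-collected mid-flight.

```python
import asyncio


async def update(log: list):
    await asyncio.sleep(0.05)      # stand-in for a DB "UPDATE ..."
    log.append("committed")


async def handler(log: list):
    await asyncio.shield(update(log))
    log.append("responded")        # never reached if the handler is cancelled


async def main():
    log = []
    task = asyncio.create_task(handler(log))
    await asyncio.sleep(0.01)
    task.cancel()                  # e.g. the client went away
    try:
        await task
    except asyncio.CancelledError:
        log.append("handler cancelled")
    await asyncio.sleep(0.1)       # meanwhile the shielded update completes
    return log
```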

Global storage

Singleton and loop lifecycle


from module import db
await db.execute(...)

Problems: testing, configuration, etc.

app as storage: init


async def init(app):
    app['config'] = await load_config()
    app['db'] = await create_db(app['config'])
    yield
    await app['db'].close()

app = web.Application()
app.cleanup_ctx.append(init)
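
To make the cleanup_ctx lifecycle concrete, here is a simplified model of how such an async generator is driven: everything before yield runs at startup, everything after it at shutdown, in reverse order of registration. This is a sketch, not aiohttp's actual implementation; app is a plain dict and the "db connection" string is a hypothetical resource.

```python
import asyncio


async def init(app: dict):
    # Same shape as a cleanup_ctx entry: setup, yield, teardown.
    app["db"] = "db connection"    # hypothetical resource
    yield
    app["db"] = None               # teardown


async def run_app(app: dict, contexts):
    """Simplified model: startup runs each generator up to `yield`,
    shutdown resumes them in reverse order."""
    gens = []
    for ctx in contexts:
        gen = ctx(app)
        await gen.__anext__()      # startup half
        gens.append(gen)
    snapshot = dict(app)           # requests would be served here
    for gen in reversed(gens):
        try:
            await gen.__anext__()  # cleanup half
        except StopAsyncIteration:
            pass
        else:
            raise RuntimeError("cleanup generator yielded twice")
    return snapshot
```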

Storage usage


async def handler(request):
    await request.config_dict['db'].execute('SELECT 42')
    ...

Stability, Performance and Future

Stability

  • Shrink Public API
  • Deprecate and protect implementation details
  • Forbid wild inheritance and attrs modification
  • Break API again :)

Don't inherit from aiohttp classes


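class Bad(aiohttp.ClientSession):
    async def query(self, url):
        ...

class Good:
    # Composition: wrap the session instead of subclassing it.
    # Instantiate from inside a coroutine.
    def __init__(self):
        self._client = aiohttp.ClientSession()

    async def close(self):
        await self._client.close()

    async def query(self, url):
        async with self._client.get(url) as resp:
            return await self._process(resp)

    async def _process(self, resp):
        # Placeholder for application-specific response handling
        return await resp.text()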

Don't modify an application


async def bad(request):
    request.app.my_attr[key] = value

async def good(request):
    request.config_dict['my_attr'][key] = value
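
request.config_dict is a read-only chain over the current application and its parents (aiohttp's ChainMapProxy). collections.ChainMap behaves similarly for lookups, so it can model why the good variant works: it mutates a dict stored on the root application rather than the application itself. The app dicts below are hypothetical.

```python
from collections import ChainMap

# Hypothetical nested applications modelled as plain dicts.
root_app = {"my_attr": {}, "db": "root db"}
sub_app = {"db": "sub-app db"}

# Lookup order: the current (sub-)app first, then its parents.
config = ChainMap(sub_app, root_app)

# Mutates the dict stored in root_app; neither app mapping gains a key.
config["my_attr"]["user"] = "alice"
```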

Performance: Why?

Single core

  • aiohttp: 20k RPS
  • Sanic: 30k RPS
  • Vibora 70k ???

300k RPS on multicore?

Future plans

  • Speed up aiohttp/yarl/multidict
  • Drop custom router (no traversal, etc.)
  • Support HTTP/2
  • Rewrite client internals
  • Custom proxies: SOCKS
  • Pluggable client auth: OAuth, etc.

Questions?
