Introduction to aiohttp

Andrew Svetlov

andrew.svetlov@gmail.com

Bio

  • Using Python for more than 15 years
  • Python Core Developer since 2012
  • asyncio co-author
  • aiohttp maintainer
  • Maintaining a dozen libraries under the aio-libs umbrella

Poll

  • Who uses Python 3?
  • Who develops web sites with the classic WSGI approach?
  • Who uses async frameworks (asyncio/tornado/twisted)?

Why?

  • 1,000 OS native threads
  • 1,000,000 lightweight tasks
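
To make the contrast concrete, here is a minimal standard-library sketch that runs many concurrent tasks on a single thread. The `worker` and `main` names and the task count of 10,000 are illustrative assumptions, not figures from the talk.

```python
import asyncio


async def worker(n):
    # Each task yields to the event loop while it "waits";
    # no OS thread is blocked in the meantime.
    await asyncio.sleep(0.01)
    return n


async def main(count=10_000):
    # gather() wraps every coroutine in a task and waits for all of them.
    results = await asyncio.gather(*(worker(i) for i in range(count)))
    return len(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

All tasks share one thread and one event loop, which is why the per-task cost stays far below that of an OS thread.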

aiohttp -- asyncio-based web

  • Client API
  • Server
  • Persistent connections
  • Websockets

Three years of history

  • Extracted from asyncio (formerly tulip)
  • 21 releases so far
  • 2800+ commits
  • 100+ contributors

Client API

Requests


import requests

r = requests.get('https://api.github.com/user',
                 auth=('user', 'pass'))
print(r.status_code)
print(r.headers['content-type'])
print(r.text)
                  

aiohttp


import aiohttp

async def coro():
    r = await aiohttp.get('https://api.github.com/user',
                          auth=aiohttp.BasicAuth('user', 'pass'))
    print(r.status)
    print(r.headers['content-type'])
    print(await r.text())
    r.close()
                  

Rule of thumb for coroutines

  1. A coroutine is an async def function
  2. Call a coroutine with await
  3. If a function contains awaits -- make it a coroutine

                    
async def func():
    await asyncio.sleep(1)
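
The three rules can be seen together in a small standard-library sketch. The names `fetch_value` and `caller` are hypothetical and exist only for illustration.

```python
import asyncio


async def fetch_value():
    # Rule 1: defined with "async def", so this is a coroutine.
    await asyncio.sleep(0)
    return 42


async def caller():
    # Rule 2: a coroutine is called with "await".
    # Rule 3: this function contains an await, so it is a coroutine too.
    value = await fetch_value()
    return value + 1


if __name__ == "__main__":
    # Synchronous code hands the outermost coroutine to the event loop.
    print(asyncio.run(caller()))
```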
                    
                  

Requests with session


session = requests.Session()

r = session.get(url)
                  

aiohttp with session


session = aiohttp.ClientSession()

async def coro(session):
    async with session.get(url) as r:
        print(r.status)
        print(await r.text())
                  

Timeouts


async def coro(session):
    with aiohttp.Timeout(1.5):
        async with session.get(url) as r:
            ...
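
The same idea can be tried without aiohttp. The sketch below swaps in a different technique, the standard library's `asyncio.wait_for`, to cancel an operation that runs too long. `slow_operation` is a hypothetical stand-in for a slow HTTP request.

```python
import asyncio


async def slow_operation():
    # Hypothetical stand-in for a network call that takes too long.
    await asyncio.sleep(10)
    return "done"


async def with_timeout(seconds):
    try:
        # wait_for() cancels the inner coroutine once the timeout expires.
        return await asyncio.wait_for(slow_operation(), timeout=seconds)
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(with_timeout(0.05)))
```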
                  

Websockets

                    
async with client.ws_connect(
        'http://websocket-server.org/endpoint') as ws:

    async for msg in ws:
        if msg.data == 'close':
            await ws.close()
            break
        else:
            ws.send_str("Answer on " + msg.data)
                    
                  

Server

Django

                    
from django.conf.urls import url
from django.http import HttpResponse


def index(request):
    return HttpResponse("Hello, world")

urlpatterns = [
    url(r'^$', index),
]
                    
                  

aiohttp

                    
from aiohttp import web

async def index(request):
    return web.Response(text="Hello, world")

app = web.Application()
app.router.add_route('GET', '/', index)
web.run_app(app)
                    
                  

Tornado

                    
import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

app = tornado.web.Application([
    (r"/", MainHandler)])

app.listen(8888)
tornado.ioloop.IOLoop.current().start()
                    
                  

Database init

                    
import sqlalchemy as sa
from aiopg.sa import create_engine

metadata = sa.MetaData()
tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))
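async def init_db(app):
    # await is only valid inside a coroutine, so create the engine on startup
    app['db'] = await create_engine(...)
app.on_startup.append(init_db)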

async def close_db(app):
    app['db'].close()
    await app['db'].wait_closed()
app.on_cleanup.append(close_db)
                    
                  

Database usage

                    
async def handler(request):
    txt = ""
    async with request.app['db'].acquire() as conn:
        await conn.execute(tbl.insert().values(val='abc'))

        async for row in conn.execute(tbl.select()):
            txt += "{}: {}\n".format(row.id, row.val)
    return web.Response(text=txt)
                    
                  

Serverside websockets

                    
async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.data == 'close':
            await ws.close()
            break
        else:
            ws.send_str(msg.data + '/answer')

    return ws
                    
                  

Advanced techniques

Request lifecycle and middlewares

Server-side sessions

                    
from aiohttp_session import get_session

async def handler(request):
    session = await get_session(request)
    session['key'] = 'value'
    return web.Response()
                    
                  

Debug Toolbar

asyncio under the hood

Selector

EVENT_READ/EVENT_WRITE

  • register(fd, events, data=None)
  • unregister(fd)
  • select(timeout=None)

select, poll, epoll, kqueue
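
A minimal sketch of these three calls, using the standard `selectors` module and a local socket pair instead of real network traffic. The function name `selector_demo` and the `"right-side"` payload are illustrative assumptions.

```python
import selectors
import socket


def selector_demo():
    # DefaultSelector picks the best available backend
    # (epoll, kqueue, poll or select).
    sel = selectors.DefaultSelector()
    left, right = socket.socketpair()
    try:
        # Ask to be told when `right` becomes readable; `data` is any payload.
        sel.register(right, selectors.EVENT_READ, data="right-side")
        left.send(b"ping")

        received = []
        for key, mask in sel.select(timeout=1):
            if mask & selectors.EVENT_READ:
                received.append((key.data, key.fileobj.recv(100)))

        sel.unregister(right)
        return received
    finally:
        sel.close()
        left.close()
        right.close()


if __name__ == "__main__":
    print(selector_demo())
```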

Event loop

  • add_reader(fd, callback, *args)
  • remove_reader(fd)
  • add_writer(fd, callback, *args)
  • remove_writer(fd)

  • call_soon(callback, *args)
  • call_later(delay, callback, *args)
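
The calls listed above can be exercised directly on a running asyncio loop. This is a sketch that assumes a selector-based loop (the default on Linux and macOS), since `add_reader` is not available on every loop implementation. `loop_api_demo` is a hypothetical name.

```python
import asyncio
import socket


async def loop_api_demo():
    loop = asyncio.get_running_loop()
    log = []
    done = loop.create_future()
    left, right = socket.socketpair()
    right.setblocking(False)

    def on_readable():
        # Called by the loop when `right` has data to read.
        log.append(right.recv(100))
        loop.remove_reader(right)
        done.set_result(None)

    loop.add_reader(right, on_readable)
    loop.call_soon(log.append, "soon")           # runs on the next iteration
    loop.call_later(0.05, left.send, b"later")   # runs after 50 ms
    try:
        await done
    finally:
        left.close()
        right.close()
    return log


if __name__ == "__main__":
    print(asyncio.run(loop_api_demo()))
```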

Event loop iteration

                    
def run_forever():
    while True:
        events = selector.select(timeout)
        for key, mask in events:
            if mask & EVENT_READ:
                key.reader(key.fileobj)
            if mask & EVENT_WRITE:
                key.writer(key.fileobj)
        for timer in ready:
            timer.run()
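
The pseudocode above can be turned into a small runnable toy. This is a sketch under stated assumptions, not asyncio's actual implementation: `ToyLoop` is a hypothetical class, it handles only read events, and it keeps timers in a heap ordered by deadline.

```python
import heapq
import selectors
import socket
import time


class ToyLoop:
    def __init__(self):
        self.selector = selectors.DefaultSelector()
        self.timers = []      # heap of (deadline, sequence, callback)
        self.sequence = 0     # tie-breaker so callbacks are never compared
        self.running = False

    def add_reader(self, sock, callback):
        self.selector.register(sock, selectors.EVENT_READ, callback)

    def remove_reader(self, sock):
        self.selector.unregister(sock)

    def call_later(self, delay, callback):
        self.sequence += 1
        deadline = time.monotonic() + delay
        heapq.heappush(self.timers, (deadline, self.sequence, callback))

    def stop(self):
        self.running = False

    def run_forever(self):
        self.running = True
        while self.running:
            # Sleep in select() no longer than the nearest timer deadline.
            timeout = None
            if self.timers:
                timeout = max(0.0, self.timers[0][0] - time.monotonic())
            for key, mask in self.selector.select(timeout):
                if mask & selectors.EVENT_READ:
                    key.data(key.fileobj)
            # Run every timer whose deadline has passed.
            now = time.monotonic()
            while self.timers and self.timers[0][0] <= now:
                _, _, callback = heapq.heappop(self.timers)
                callback()


def demo():
    loop = ToyLoop()
    left, right = socket.socketpair()
    log = []

    def on_readable(sock):
        log.append(sock.recv(100))
        loop.remove_reader(sock)
        loop.stop()

    loop.add_reader(right, on_readable)
    loop.call_later(0.01, lambda: log.append("timer"))
    loop.call_later(0.05, lambda: left.send(b"data"))
    loop.run_forever()
    loop.selector.close()
    left.close()
    right.close()
    return log


if __name__ == "__main__":
    print(demo())
```

The key design point is that the select timeout is derived from the earliest timer, so the loop wakes up for whichever comes first: I/O readiness or a due timer.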
                    
                  

Socket buffers

Timers

Tips and tricks

Development cycle

  • Use single process for dev environment
  • Make test run easy
  • Deploy separately on different processes/containers/nodes

PYTHONASYNCIODEBUG=1

                    
async def f():
    fut = asyncio.Future()
    fut.set_exception(RuntimeError())
    del fut
                    
                  
                    
...
ERROR:asyncio:Future exception was never retrieved
future: Future finished exception=RuntimeError()
RuntimeError
                    
                  
                    
$ PYTHONASYNCIODEBUG=x python myapp.py
                    
                  
                    
ERROR:asyncio:Future exception was never retrieved
future: Future finished exception=RuntimeError() created at filename.py:10
source_traceback: Object created at (most recent call last):
...
File "filename.py", line 10, in f
fut = asyncio.Future()
RuntimeError
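
Debug mode can also be switched on from code instead of the environment. A minimal sketch, assuming Python 3.7+ where `asyncio.run` accepts a `debug` argument; `main` is a hypothetical coroutine that only reports the loop's debug flag.

```python
import asyncio


async def main():
    # get_debug() reports whether the running loop is in debug mode.
    return asyncio.get_running_loop().get_debug()


if __name__ == "__main__":
    # Same effect as setting PYTHONASYNCIODEBUG for this one call.
    print(asyncio.run(main(), debug=True))
```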
                    
                  

Explicit loop

                    
async def request(method, url, *, loop=None):
    ...

loop = asyncio.get_event_loop()
await request('GET', 'http://python.org', loop=loop)
                    
                  

Testing

                    
class Test(unittest.TestCase):

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)

    def tearDown(self):
        self.loop.close()

    def test_func(self):
        async def go():
            self.assertEqual(1, await func(loop=self.loop))

        self.loop.run_until_complete(go())
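
A self-contained variant of this pattern that can be run as-is. `func` here is a hypothetical coroutine under test, and `run_tests` is a small helper added only so the suite can be started from code.

```python
import asyncio
import unittest


async def func(*, loop):
    # Hypothetical coroutine under test: resolves a future on the given loop.
    fut = loop.create_future()
    loop.call_soon(fut.set_result, 1)
    return await fut


class Test(unittest.TestCase):

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        # Clear the global loop so the test relies only on self.loop.
        asyncio.set_event_loop(None)

    def tearDown(self):
        self.loop.close()

    def test_func(self):
        async def go():
            self.assertEqual(1, await func(loop=self.loop))

        self.loop.run_until_complete(go())


def run_tests():
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(Test)
    return unittest.TextTestRunner(verbosity=0).run(suite)


if __name__ == "__main__":
    run_tests()
```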
                    
                  

Performance

Benchmark numbers are on par with Tornado and Twisted

Good enough for high-load services

Supports billions of online users, etc.

Questions?


Andrew Svetlov

andrew.svetlov@gmail.com

@andrew_svetlov