Асинхронные сетевые приложения на Python

Вчера, сегодня, завтра

Андрей Светлов

andrew.svetlov@gmail.com
@andrew_svetlov

О себе

  • Использую python 15+ лет
  • Python Core Developer с 2012
  • Соавтор asyncio
  • Главный разработчик aiohttp
  • Дюжина библиотек в сообществе aio-libs

Опрос

  • Кто использует Python 3?
  • Разрабатывает WEB сайты на Django/Flask
  • Понимает что такое асинхронное сетевое программирование
  • Использует асинхронные frameworks (twisted/tornado/gevent)?

Зачем?

  • 1,000 потоков OS
  • 1,000,000 легковесных задач
asyncio is another interesting topic. I was surprised how alive and well that little corner of the Python community is (albeit in mainly Russian speaking countries).

Hynek Schlawack

aiohttp -- асинхронный web

  • Клиент
  • Сервер
  • Перманентные соединения
  • Веб-сокеты

3 года истории

  • Выделилися из asyncio (tulip)
  • 21 releases
  • 2800+ commits
  • 100+ contributors

Клиент

Requests


import requests

r = requests.get('https://api.github.com/user',
                 auth=('user', 'pass'))
print(r.status_code)
print(r.headers['content-type'])
print(r.text)
                  

aiohttp


import aiohttp

async def coro():
    r = await aiohttp.get('https://api.github.com/user',
                          auth=aiohttp.BasicAuth('user', 'pass'))
    print(r.status)
    print(r.headers['content-type'])
    print(await r.text())
    r.close()
                  

Правила для asyncio

  1. Корутина -- это async def
  2. Вызывай корутину через await
  3. Если функция содержит await -- сделай её корутиной

                    
async def func():
    await asyncio.sleep(1)
                    
                  

Requests с сессией


session = requests.Session()

r = session.get(url)
                  

aiohttp с сессией


session = aiohttp.ClientSession()

async def coro(session):
    async with session.get(url) as r:
        print(r.status)
        print(await r.text())
                  

Таймауты


async def coro(session):
    with aiohttp.Timeout(1.5):
        async with session.get(url) as r:
            ...
                  

Веб-сокеты

                    
async with client.ws_connect(
    'http://websocket-server.org/endpoint') as ws:

    async for msg in ws:
        if msg.data == 'close':
            await ws.close()
            break
        else:
            ws.send_str("Answer on " + msg.data)
                    
                  

Сервер

Django

                    
from django.conf.urls import url
from django.http import HttpResponse


def index(request):
    return HttpResponse("Hello, world")

urlpatterns = [
    url(r'^$', index),
]
                    
                  

aiohttp

                    
from aiohttp import web

async def handler(request):
    return web.Response(text="Hello, world")

app = web.Application(loop=loop)
app.router.add_route('GET', '/', handler)
web.run_app(app)
                    
                  

Tornado

                    
import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

app = tornado.web.Application([
    (r"/", MainHandler)])

app.listen(8888)
tornado.ioloop.IOLoop.current().start()
                    
                  

Подключение БД

                    
import sqlalchemy as sa
from aiopg.sa import create_engine

metadata = sa.MetaData()
tbl = sa.Table('tbl', metadata,
               sa.Column('id', sa.Integer, primary_key=True),
               sa.Column('val', sa.String(255)))
app['db'] = await create_engine(...)

async def close_db(app):
    app['db'].close()
    await app['db'].wait_closed()
app.on_cleanup.append(close_db)
                    
                  

Запросы в БД

                    
async def handler(request):
    txt = ""
    async with request.app['db'].acquire() as conn:
        await conn.execute(tbl.insert().values(val='abc'))

        async for row in conn.execute(tbl.select()):
            txt += "{}: {}\n".format(row.id, row.val)
    return web.Response(text=txt)
                    
                  

Серверные веб-сокеты

                    
async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.data == 'close':
            await ws.close()
            break
    else:
        ws.send_str(msg.data + '/answer')

    return ws
                    
                  

Нетривиальные штучки

Жизненный цикл запроса-ответа и middlewares

Серверные сессии

                    
from aiohttp_session import get_session

async def hander(request):
    session = await get_session(request)
    session['key'] = 'value'
    return web.Response()
                    
                  

Debug Toolbar

asyncio под капотом

Selector

EVENT_READ/EVENT_WRITE

  • register(fd, events, data=None)
  • unregister(fd)
  • select(timeout=None)

select, poll, epoll, kqueue

Event loop

  • add_reader(fd, callback, *args)
  • remove_reader(fd)
  • add_writer(fd, callback, *args)
  • remove_writer(fd)
  • ***
  • call_soon(callback, *args)
  • call_later(delay, callback, *args)

Итерация event loop

                    
def run_forever():
    while True:
        events = selector.select(timeout)
        for key, mask in events:
            if mask & EVENT_READ:
                key.reader(key.fileobj)
            if mask & EVENT_WRITE:
                key.writer(key.fileobj)
        for handler in ready:
            handler._run()
                    
                  

Буферы сокета

Таймеры

Полезные советы

Цикл разработки

  • Для разработки -- один процесс
  • Удобство запуска тестов
  • Разворачивать на отдельных процессах-контейнерах-узлах

PYTHONASYNCIODEBUG=1

                    
async def f():
    fut = asyncio.Future()
    fut.set_exception(RuntimeError())
    del fut
                    
                  
                    
...
ERROR:asyncio:Future exception was never retrieved
future: Future finished exception=RuntimeError()
RuntimeError
                    
                  
                    
$ PYTHONASYNCIODEBUG=x python myapp.py
                    
                  
                    
ERROR:asyncio:Future exception was never retrieved
future: Future finished exception=RuntimeError() created at filename.py:10
source_traceback: Object created at (most recent call last):
...
File "filename.py", line 10, in f
fut = asyncio.Future()
RuntimeError
                    
                  

Explicit loop

                    
def request(method, url, *, loop=None):
    ...

loop = asyncio.get_event_loop()
await request('GET', 'http://python.org', loop=loop)
                    
                  

Тестирование

                    
class Test(unittest.TestCase):

    def setUp(self):
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)

    def tearDown(self):
        self.loop.close()

    def test_func(self):
        async def go():
            self.assertEqual(1, await func(loop=loop))

        self.loop.run_until_complete(go())
                    
                  

Производительность

Как у Tornado или Twisted

Достаточно неплохо для highload

Выдерживает миллионы online пользователей и пр.

Вопросы?


Андрей Светлов

andrew.svetlov@gmail.com

@andrew_svetlov