Andrew Svetlov
@andrew_svetlov
andrew.svetlov@gmail.com
https://asvetlov.github.io/ua-pycon-2018/
Microservices
Websockets
Non-HTTP transports
Long-running tasks
async def fetch(session, url):
    """Download *url* through *session* and return the response body as text.

    Args:
        session: Client session whose ``get(url)`` returns an async context
            manager yielding a response (presumably an
            ``aiohttp.ClientSession`` -- confirm against the caller).
        url: Address to request.

    Returns:
        The value of ``await response.text()``.
    """
    # Request the caller-supplied address rather than a fixed one, so the
    # ``url`` argument is actually honoured.
    async with session.get(url) as response:
        return await response.text()
A kind of concurrent programming
asyncio task is a lightweight thread
Apply what you already know from the multithreaded approach
Use locks, events, queues, etc.
No low-level futures
A way to speed up a Django app
def view(request):
    """Fetch every entry of ``urls`` from inside a synchronous view.

    NOTE(review): the surrounding text advises against mixing sync and async
    code; this function is kept as an illustration of that pattern.

    Args:
        request: Incoming request object; not used by the body.

    Returns:
        None. The call blocks until every fetch task has finished.
    """
    loop = asyncio.get_event_loop()
    # loop.create_task() binds each task to ``loop`` directly.
    # asyncio.create_task() would raise RuntimeError here, because no event
    # loop is running yet inside this synchronous function.
    tasks = [loop.create_task(fetch(url)) for url in urls]
    if tasks:
        # asyncio.wait() rejects an empty collection with ValueError.
        loop.run_until_complete(asyncio.wait(tasks))
Don't mix sync and async code
async def main():
    """Top-level coroutine of the program: await ``func()`` and finish."""
    await func()


# asyncio.run() sets up a new event loop, drives main() to completion on it
# and closes the loop afterwards.
asyncio.run(main())
async def f():
    """Placeholder coroutine; its body is elided."""
    ...


# Schedule f() as a task on ``loop``, then keep the loop running until it is
# stopped explicitly.
loop.create_task(f())
loop.run_forever()
def run(main, *, debug=False):
    """Run the coroutine *main* on a new event loop and return its result.

    Args:
        main: Coroutine object to execute.
        debug: Value passed to ``loop.set_debug()``.

    Returns:
        Whatever ``loop.run_until_complete(main)`` returns.

    Raises:
        RuntimeError: If an event loop is already running.
        ValueError: If *main* is not a coroutine object.
    """
    already_running = events._get_running_loop() is not None
    if already_running:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        message = "a coroutine was expected, got {!r}".format(main)
        raise ValueError(message)

    loop = events.new_event_loop()
    try:
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        # Shutdown runs whether or not main() succeeded: cancel leftover
        # tasks and finalize async generators, then always detach and close
        # the loop even if that cleanup itself fails.
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            events.set_event_loop(None)
            loop.close()
def _cancel_all_tasks(loop):
    """Cancel every unfinished task on *loop* and wait for them to stop.

    Tasks that end with an exception other than cancellation are reported
    through ``loop.call_exception_handler()``.

    Args:
        loop: Event loop whose tasks are cancelled. It must not be running,
            because this function drives it with ``run_until_complete()``.
    """
    to_cancel = [task for task in tasks.all_tasks(loop) if not task.done()]
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    # gather() takes the loop from the tasks themselves. Its explicit
    # ``loop=`` argument was deprecated in Python 3.8 and removed in 3.10,
    # where passing it raises TypeError.
    loop.run_until_complete(
        tasks.gather(*to_cancel, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        exc = task.exception()
        if exc is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': exc,
                'task': task,
            })
async def process(url):
    """Handle a single *url*; the body is elided."""
    ...


async def process_all():
    """Run ``process()`` for every entry of ``urls`` concurrently.

    Returns only after every task has finished; the first exception raised
    by a task propagates to the caller.
    """
    # Keep a reference to every task and await them all. The event loop
    # holds only weak references to tasks, so fire-and-forget tasks may be
    # garbage-collected, or cancelled at loop shutdown, before they finish.
    pending = [asyncio.create_task(process(url)) for url in urls]
    await asyncio.gather(*pending)
class A:
    """Object whose ``data`` attribute is assigned by the constructor."""

    def __init__(self):
        # NOTE(review): ``fetch`` is declared ``async def`` in this file, so
        # this call yields a coroutine object that is stored un-awaited
        # rather than the fetched data.
        pending = fetch(url)
        self.data = pending
class A:
    """Object whose ``data`` is loaded asynchronously.

    ``__init__`` cannot await, so it only sets a placeholder; use the
    ``create()`` factory to obtain a populated instance.
    """

    def __init__(self):
        self.data = None

    @classmethod
    async def create(cls):
        """Build an instance, await ``fetch(url)`` into ``data``, return it.

        Returns:
            The new instance of ``cls`` with ``data`` populated.
        """
        self = cls()
        self.data = await fetch(url)
        # Hand the instance back; otherwise ``await A.create()`` is None.
        return self
# Usage example: ``await`` requires this line to run inside a coroutine.
a = await A.create()
async def fetch(db):
    """Return every record of ``tbl`` as a list.

    Args:
        db: Object whose awaitable ``execute(sql)`` returns an asynchronous
            iterable of records.

    Returns:
        A list holding the records in the order the cursor yields them.
    """
    cursor = await db.execute("SELECT * from tbl")
    # An async comprehension drains the cursor without a manual append loop.
    return [rec async for rec in cursor]
__del__
await obj.close()
async with obj: ...
async def handler(request):
    """Run the UPDATE statement on the configured database and reply "OK".

    Args:
        request: Incoming request; the database object is read from
            ``request.config['db']``.

    Returns:
        A ``web.Response`` whose text is "OK".
    """
    db = request.config['db']
    await db.execute("UPDATE ...")
    return web.Response(text="OK")
async def handler(request):
    """Run the UPDATE statement shielded from cancellation and reply "OK".

    Args:
        request: Incoming request; the database object is read from
            ``request.config['db']``.

    Returns:
        A ``web.Response`` whose text is "OK".
    """
    db = request.config['db']
    update = db.execute("UPDATE ...")
    # shield() keeps the statement running even if this handler is cancelled
    # while waiting for it.
    await asyncio.shield(update)
    return web.Response(text="OK")
import contextvars
# ``default`` is keyword-only in the ContextVar constructor; passing it
# positionally raises TypeError.
var = contextvars.ContextVar('var', default='default')
async def inner():
    """Log the value held in the ``var`` context variable at debug level."""
    user_name = var.get()
    log.debug("User name: %s", user_name)
@routes.get('/{name}')
async def handler(request):
    """Store the request's ``name`` in ``var`` and then call ``inner()``.

    Args:
        request: Incoming request; ``request.match_info['name']`` is
            presumably the value matched by the route's ``{name}``
            placeholder -- confirm against the routing library.
    """
    # Each invocation is expected to start from the variable's default.
    assert var.get() == 'default'
    name = request.match_info['name']
    var.set(name)
    await inner()
    ...
async with TaskGroup() as tg:
tg.create_task(task_1())
tg.create_task(task_2())
tg.call_later(5.0, callback)
Andrew Svetlov
@andrew_svetlov
andrew.svetlov@gmail.com
https://asvetlov.github.io/ua-pycon-2018/