andrew.svetlov@gmail.com
import requests
# Issue a blocking GET to the GitHub user endpoint, passing the credentials
# tuple through ``auth``, then print the status code and the response body.
r = requests.get(
    'https://api.github.com/user',
    auth=('user', 'pass'),
)
print(r.status_code)
print(r.text)
# Issue a blocking GET through an explicit requests.Session and print the
# status code, the content-type header and the response body.
# NOTE(review): ``url`` is not defined in this snippet; it has to be provided
# by the surrounding code.
session = requests.Session()
r = session.get(url)
print(r.status_code)
print(r.headers['content-type'])
print(r.text)
async def coro():
    """Fetch ``url`` with an aiohttp client session and print the result.

    Prints the response status, its content-type header and the body text.
    ``url`` is a free name that must be defined in the enclosing scope.
    """
    # Both the session and the response are used as async context managers,
    # so they are released when the respective block exits.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as r:
            print(r.status)
            print(r.headers['content-type'])
            print(await r.text())
async def func():
    """Suspend the calling coroutine for one second via ``asyncio.sleep``."""
    await asyncio.sleep(1)
async def other():
    """Await ``func()``, showing one coroutine delegating to another."""
    await func()
async def fetch(session, url):
    """Return the body text of a GET request to *url* made through *session*.

    Args:
        session: Object whose ``get(url)`` returns an async context manager
            yielding a response with a ``status`` attribute and an awaitable
            ``text()`` method.
        url: Address to request.

    Returns:
        The value produced by awaiting ``r.text()``.

    Raises:
        AssertionError: If the response status is not 200.
    """
    async with session.get(url) as r:
        # Only a plain 200 is accepted. NOTE: ``assert`` is stripped when
        # Python runs with -O, so this check disappears in optimized mode.
        assert r.status == 200
        return await r.text()
tasks = [loop.create_task(fetch(session, url)
for url in ['http://google.com', 'http://python.org']]
res = await asyncio.gather(*tasks)
async def coro(session):
    """Perform a GET through *session* inside an ``aiohttp.Timeout(1.5)`` block.

    The handling of the response is elided in the snippet. ``url`` is a free
    name that must be defined in the enclosing scope.
    """
    # The timeout context wraps the whole request/response block.
    with aiohttp.Timeout(1.5):
        async with session.get(url) as r:
            ...
async with client.ws_connect(
'http://websocket-server.org/endpoint') as ws:
async for msg in ws:
if msg.data == 'close':
await ws.close()
break
else:
ws.send_str("Answer on " + msg.data)
from django.conf.urls import url
from django.http import HttpResponse
def index(request):
    """Django view: answer any request with a fixed "Hello, world" response.

    Args:
        request: The incoming request object (unused).

    Returns:
        An ``HttpResponse`` whose content is ``"Hello, world"``.
    """
    return HttpResponse("Hello, world")
# URL table: the empty path (regex ``^$``) is served by ``index``.
urlpatterns = [url(r'^$', index)]
from aiohttp import web
async def index(request):
    """aiohttp handler: answer any request with a fixed "Hello, world" text.

    Args:
        request: The incoming request object (unused).

    Returns:
        A ``web.Response`` created with ``text="Hello, world"``.
    """
    return web.Response(text="Hello, world")
# Create the aiohttp application, route ``GET /`` to ``index`` and hand the
# application to ``web.run_app`` to serve it.
# NOTE(review): ``loop`` is not defined in this snippet; it has to be
# provided by the surrounding code.
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
web.run_app(app)
import tornado.ioloop
import tornado.web
class MainHandler(tornado.web.RequestHandler):
    """Tornado request handler that answers GET with a fixed greeting."""

    def get(self):
        """Write ``"Hello, world"`` to the response."""
        self.write("Hello, world")
# Map "/" to MainHandler, listen on port 8888 and start the current IOLoop.
app = tornado.web.Application([(r"/", MainHandler)])
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
async def handler(request):
    """WebSocket server handler that answers each message with '/answer'.

    Prepares a ``web.WebSocketResponse`` for *request*, then iterates over the
    incoming messages. A message whose data equals ``'close'`` closes the
    socket and ends the loop; any other message is answered with its data
    followed by ``'/answer'``.

    Args:
        request: The incoming request to upgrade.

    Returns:
        The ``WebSocketResponse`` object.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.data == 'close':
            await ws.close()
            break
        ws.send_str(msg.data + '/answer')

    # The response object is returned once the message loop has ended.
    return ws
async def long_running_operation():
    """Placeholder coroutine; the actual work is elided in the snippet."""
    ...
# Schedule the coroutine on ``loop`` as a task. The returned task object is
# not stored, so nothing here awaits it or inspects its result.
# NOTE(review): ``loop`` must be provided by the surrounding code.
loop.create_task(long_running_operation())
async def f():
    """Create a future, fail it with ``RuntimeError`` and drop it unobserved.

    The exception stored on the future is never retrieved because the only
    reference to the future is deleted right after it is set.
    """
    fut = asyncio.Future()
    fut.set_exception(RuntimeError())
    # Drop the only reference without ever reading the stored exception.
    del fut
    ...
ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=RuntimeError()>
RuntimeError
$ PYTHONASYNCIODEBUG=x python myapp.py
ERROR:asyncio:Future exception was never retrieved
future: <Future finished exception=RuntimeError() created at filename.py:10>
source_traceback: Object created at (most recent call last):
...
File "filename.py", line 10, in f
fut = asyncio.Future()
RuntimeError
async def fetch_all(urls, *, loop):
    """Open an aiohttp client session bound to an explicitly passed *loop*.

    Args:
        urls: URLs to process (the processing itself is elided in the snippet).
        loop: Event loop handed to ``aiohttp.ClientSession``; keyword-only.
    """
    async with aiohttp.ClientSession(loop=loop):
        ...
# Keep a reference to the current event loop, then unset the global default
# loop so the loop has to be passed explicitly (as done for ``fetch_all``).
# NOTE: the ``await`` means this fragment has to live inside a coroutine;
# ``urls`` must be provided by the surrounding code.
loop = asyncio.get_event_loop()
asyncio.set_event_loop(None) # !!!
await fetch_all(urls, loop=loop)
async def fetch_all(urls, *, loop):
    """Run ``fetch`` for every URL concurrently on an explicitly passed *loop*.

    Args:
        urls: Iterable of URLs; one ``fetch(url)`` coroutine is created per item.
        loop: Event loop handed to both ``aiohttp.ClientSession`` and
            ``asyncio.gather``; keyword-only.
    """
    async with aiohttp.ClientSession(loop=loop):
        coros = [fetch(url) for url in urls]
        # The original gathered an undefined ``tasks`` name; the coroutines
        # collected above are what must be awaited. Awaiting inside the
        # ``async with`` keeps the session open while the fetches run.
        # NOTE(review): ``gather(loop=...)`` was removed in Python 3.10.
        await asyncio.gather(*coros, loop=loop)
class Test(unittest.TestCase):
    """Test case that runs coroutines on a private, explicitly passed loop."""

    def setUp(self):
        """Create a fresh event loop and unset the global default loop."""
        self.loop = asyncio.new_event_loop()
        # With no default loop set, code under test has to use the loop it
        # is given explicitly.
        asyncio.set_event_loop(None)

    def tearDown(self):
        """Close the loop created in ``setUp``."""
        self.loop.close()

    def test_func(self):
        """Check that ``func(loop=...)`` resolves to 1."""
        async def go():
            self.assertEqual(1, await func(loop=self.loop))

        # Drive the coroutine to completion on the per-test loop.
        self.loop.run_until_complete(go())
def create_app(loop, path, handler):
    """Build an aiohttp application with a single GET route.

    Args:
        loop: Event loop passed to ``web.Application``.
        path: URL path to register.
        handler: Request handler registered for ``GET`` on *path*.

    Returns:
        The configured ``web.Application``.
    """
    app = web.Application(loop=loop)
    app.router.add_route('GET', path, handler)
    return app
async def test_hello(test_client):
    """Check that ``GET /`` on a one-route app returns 200 and the greeting.

    Args:
        test_client: Awaitable factory called as
            ``test_client(create_app, '/', hello)`` and returning a client
            with an awaitable ``get`` method.
    """
    async def hello(request):
        return web.Response(body=b'Hello, world')

    # The original passed an unrelated ``handler`` name here and never used
    # the ``hello`` handler defined above.
    client = await test_client(create_app, '/', hello)
    resp = await client.get('/')
    assert resp.status == 200
    text = await resp.text()
    assert 'Hello, world' in text
from motor.motor_asyncio import AsyncIOMotorClient
# Module-level database handle: the client is created when this code runs
# and the database named by DBNAME is selected by subscript.
DBNAME = 'testdb'
db = AsyncIOMotorClient()[DBNAME]
async def register(request):
    """Registration handler that looks up the login in the module-level ``db``.

    Reads ``login`` and ``password`` from the posted form data and counts the
    ``users`` documents with the same login. The rest of the handler is
    elided in the snippet.

    Args:
        request: Incoming request providing an awaitable ``post()``.

    Raises:
        KeyError: If ``login`` or ``password`` is missing from the form data
            (assuming the posted data behaves like a mapping).
    """
    post_data = await request.post()
    login, password = post_data['login'], post_data['password']
    matches = await db.users.find({'login': login}).count()
    ...
async def register(request):
    """Registration handler that looks up the login in ``request.app['db']``.

    Reads ``login`` and ``password`` from the posted form data and counts the
    ``users`` documents with the same login, using the database handle stored
    on the application. The rest of the handler is elided in the snippet.

    Args:
        request: Incoming request providing an awaitable ``post()`` and an
            ``app`` mapping that holds the database under ``'db'``.

    Raises:
        KeyError: If ``login`` or ``password`` is missing from the form data
            (assuming the posted data behaves like a mapping).
    """
    post_data = await request.post()
    login, password = post_data['login'], post_data['password']
    matches = await request.app['db'].users.find({'login': login}).count()
    ...
def make_app(loop=None):
    """Build an aiohttp application that owns its MongoDB client.

    The ``testdb`` database is stored under ``app['db']`` and a cleanup
    callback that closes the client is appended to ``app.on_cleanup``.

    Args:
        loop: Event loop passed to both ``web.Application`` and
            ``AsyncIOMotorClient`` (as ``io_loop``). Defaults to ``None``.

    Returns:
        The configured ``web.Application``.
    """
    app = web.Application(loop=loop)
    mongo = AsyncIOMotorClient(io_loop=loop)
    db = mongo['testdb']
    app['db'] = db

    async def cleanup(app):
        # Closes over ``mongo`` so the client is closed with the application.
        mongo.close()

    app.on_cleanup.append(cleanup)
    ...
    return app
from aiohttp_session import get_session
async def hander(request):
    """Store a value in the request's session and return an empty response.

    Args:
        request: Incoming request passed to ``get_session``.

    Returns:
        A ``web.Response`` created with no arguments.
    """
    session = await get_session(request)
    session['key'] = 'value'
    return web.Response()
andrew.svetlov@gmail.com
@andrew_svetlov