最近想找一个 [[Wallabag]] 的 Python 客户端,在 GitHub 简单搜寻了一下之后发现了 wallbag_api 这个仓库,看了一下 Python 代码之后发现库中的代码是用 aiohttp 编写的,所以就来学习一下这个 Python 的异步 HTTP 库 aiohttp。

aiohttp 官方简洁明了,Asynchronous HTTP Client/Server for asyncio and Python. aiohttp 是一个基于 asyncio 模块的异步 HTTP 客户端/服务端框架。

可以 看到 aiohttp 不仅是一个 HTTP 客户端,也可以作为服务端。同时支持服务端的 WebSockets 和客户端 WebSockets。

我们知道 Python 下还有一个比较著名的 HTTP Client 库叫做 requests,这是一个同步的 HTTP 调用客户端,使用 requests 发起 HTTP 调用之后需要同步等待返回结果,而在 aiohttp 可以在发起请求之后将将程序的控制权暂时给别人,等待响应返回结果回来了之后再进行处理,这就可以提升系统的性能。

aiohttp 内部使用了 [[python-asyncio]] 来实现,而 Python 下 asyncio 的核心就是 [[Coroutine 协程]]

Coroutine 协程

Coroutine(协程)是一个更通用的 subroutine(子程序)。

async 方法中使用 await 关键字来表示 coroutine。当使用 await 关键字的时候,coroutine 会将当前程序的控制释放给 Event loop。

import asyncio
async def async_func():
    print('start ...')
    await asyncio.sleep(1)
    print('... end!')

async def main():
    async_func()#this will do nothing because coroutine object is created but not awaited
    await async_func()

asyncio.run(main())

如果直接调用 async_func() 不会有任何作用,需要在前面添加 await

协程适合 IO 密集型任务:

  • 网络请求,比如爬虫 [[aiohttp]]
  • 文件读写,[[aiofile]]
  • Web 框架,[[aiohttp]], [[FastAPI]]
  • 数据库查询,[[asyncpg]], [[databases]], [[aiomysql]]

Installation

安装使用:

pip install aiohttp

作为客户端

aiohttp 作为客户端。

这里可以和 requests 做一个简单的对比,在 requests 中发起同步请求:

import requests 
def hello()    
     return requests.get("http://httpbin.org/get")     

print(hello())

程序在 requests.get 方法调用时会同步等待请求返回。

而如果使用 aiohttp:

#!/usr/local/bin/python3.6
import asyncio 
from aiohttp import ClientSession 

async def hello():     
    async with ClientSession() as session:         
        async with session.get("http://httpbin.org/headers") as response:                
            response = await response.read()                         
            print(response) 
  
loop = asyncio.get_event_loop() 
loop.run_until_complete(hello())

在这个异步代码中我们可以看到很多的异步关键字,async 以及 await 关键字定义函数为异步。

这里使用 ClientSession() 获取一个 Session,session 可以在多个请求之间保留 cookie 和相关信息。因为 session 关闭是一个异步操作,所以需要使用 async with

session.get 发起调用,也是一个异步操作,所以也需要使用 async,而 with 语句保证 response 可以正确关闭。

最后为了让这个异步方法可以执行,需要将其放入一个事件循环。

作为服务端

[[用 aiohttp 写服务器]] 不是本文的重点,所以简单了解一下,先略过。

from aiohttp import web

routes = web.RouteTableDef()

@routes.get('/')
async def hello(request):
    return web.Response(text="Hello, world")

app = web.Application()
app.add_routes(routes)
web.run_app(app)

控制并发

使用 Semaphore 控制

Semaphore 是一个线程计数器,核心是 acquire()release() 方法。

执行 acquire() 方法时,先判断内部值是否小于 0,如果大于 0,获得锁,内部值减一,如果小于 0,阻塞,直到 release() 方法释放锁,内部值加一。

async def fetch(url, semaphore):
    async with semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.text()
                print('data', data + " " + time.ctime())
                return data

async def run():
    url = 'http://127.0.0.1:5000'
    semaphore = asyncio.Semaphore(2)  # 限制并发量为2
    to_get = [fetch(url, semaphore) for _ in range(10)]
    await asyncio.wait(to_get)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    loop.close()

使用 TCPConnector 控制

async def fetch(session, url):
    async with session.get(url) as resp:
        if resp.status != 200:
            resp.raise_for_status()
        data = await resp.text()
        print('data', data + " " + time.ctime())
        return data

async def fetch_multi(session, urls):
    tasks = []
    for url in urls:
        task = asyncio.create_task(fetch(session, url))
        tasks.append(task)
    # gather: 搜集所有future对象,并等待返回
    results = await asyncio.gather(*tasks)
    return results

async def main():
    urls = ["http://127.0.0.1:5000/" for _ in range(10)]
    conn = aiohttp.TCPConnector(limit=3)
    async with aiohttp.ClientSession(connector=conn) as session:
        datas = await fetch_multi(session, urls)
        print(datas)

if __name__ == '__main__':
    asyncio.run(main())