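A Detailed Guide to the Python Async Libraries asyncio and aiohttp
Updated: 2024-06-27 10:38:50 Author: IT.BOB
This article introduces how to use the Python async libraries asyncio and aiohttp. It is meant as a practical reference; corrections and notes on anything that has been overlooked are welcome.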
asyncio
Version support
- The asyncio module was released in Python 3.4.
- The async and await keywords were first introduced in Python 3.5.
- Versions before Python 3.3 are not supported.
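Key concepts
- event_loop (event loop): the program starts an endless loop, and the programmer registers functions (coroutines) on it. When the event a coroutine is waiting for occurs, the loop calls the corresponding coroutine function.
- coroutine: a coroutine object. Calling a function defined with the async keyword does not run the function body immediately; it returns a coroutine object, which has to be registered with the event loop and is then driven by it.
- future: an object that represents the result of a task that has been executed or has not been executed yet. It is not fundamentally different from a task.
- task: a coroutine object is a natively suspendable function, and a task wraps a coroutine one step further and tracks its states. Task is a subclass of Future; it ties a coroutine to a Future by wrapping the coroutine in a Future object.
- async/await keywords: the keywords for defining coroutines, available since Python 3.5. async defines a coroutine, and await suspends the coroutine at a blocking asynchronous call. Their role is to some extent similar to yield.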
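These relationships can be checked directly in an interpreter. The following is a minimal sketch rather than code from the original article: hello is changed here to return a string, and main and outcome are illustrative names. It shows that calling an async function only produces a coroutine object, and that the task wrapping it is also a Future.

```python
import asyncio


async def hello(name):
    # The body runs only once the event loop drives the coroutine.
    return 'Hello, ' + name


async def main():
    coro = hello('World')  # calling the function only builds a coroutine object
    is_coro = asyncio.iscoroutine(coro)
    task = asyncio.ensure_future(coro)  # wrap it in a Task on the running loop
    is_future = isinstance(task, asyncio.Future)  # Task is a subclass of Future
    result = await task  # suspend main() until the task has finished
    return is_coro, is_future, task.done(), result


outcome = asyncio.run(main())
print(outcome)  # (True, True, True, 'Hello, World')
```

asyncio.run() is used here only to keep the sketch short; it creates the event loop, runs main() to completion, and closes the loop again.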
Workflow
- Define/create the coroutine object
- Convert the coroutine into a task
- Define the event loop object
- Hand the task to the event loop to run it
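    import asyncio

    async def hello(name):
        print('Hello,', name)

    # Create the coroutine object
    coroutine = hello("World")
    # Create the event loop object
    # (asyncio.get_event_loop() is deprecated outside a running loop on recent Python versions)
    loop = asyncio.new_event_loop()
    # Convert the coroutine into a task
    # task = asyncio.ensure_future(coroutine, loop=loop)
    task = loop.create_task(coroutine)
    # Hand the task to the event loop and run it
    loop.run_until_complete(task)

Concurrency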
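Concurrency is where the event loop pays off: coroutines that are waiting can overlap instead of running back to back. As a rough illustration, the sketch below uses a hypothetical coroutine nap (a stand-in for an I/O-bound call, not part of the original article) and measures three sleeps run together. The total should be close to the longest sleep rather than the sum of all three.

```python
import asyncio
import time


async def nap(seconds):
    # Hypothetical stand-in for an I/O-bound call
    await asyncio.sleep(seconds)
    return seconds


async def main():
    start = time.perf_counter()
    # The three coroutines wait at the same time
    results = await asyncio.gather(nap(0.1), nap(0.2), nap(0.3))
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results)  # [0.1, 0.2, 0.3]
print('elapsed: {:.2f}s'.format(elapsed))
```

Run sequentially, the same three calls would need about 0.6 seconds in total.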
1. Create a list of tasks, one per coroutine:

    import asyncio

    async def do_some_work(x):
        print('Waiting: ', x)
        await asyncio.sleep(x)
        return 'Done after {}s'.format(x)

    loop = asyncio.new_event_loop()
    # Wrap each coroutine in a task so that its return value can be read in step 3
    tasks = [loop.create_task(do_some_work(x)) for x in (1, 2, 4)]

2. Register the tasks with the event loop:
- Method 1: use asyncio.wait()

    loop.run_until_complete(asyncio.wait(tasks))

- Method 2: use asyncio.gather()

    loop.run_until_complete(asyncio.gather(*tasks))

3. Inspect the return values:

    for task in tasks:
        print('Task ret: ', task.result())

4. Differences between asyncio.wait() and asyncio.gather():
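Accepted arguments differ:
asyncio.wait(): takes a single iterable, typically a list, that holds the task objects.

    loop = asyncio.new_event_loop()
    # Convert the coroutines into task objects with asyncio.ensure_future
    # (factorial is a coroutine function assumed to be defined elsewhere)
    tasks = [
        asyncio.ensure_future(factorial("A", 2), loop=loop),
        asyncio.ensure_future(factorial("B", 3), loop=loop),
        asyncio.ensure_future(factorial("C", 4), loop=loop)
    ]
    # Bare coroutines are no longer accepted here: passing them to asyncio.wait()
    # was deprecated in Python 3.8 and removed in Python 3.11
    # tasks = [
    #     factorial("A", 2),
    #     factorial("B", 3),
    #     factorial("C", 4)
    # ]
    loop.run_until_complete(asyncio.wait(tasks))

asyncio.gather(): more flexible; note that when a list is passed, the * must not be omitted.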
    loop = asyncio.new_event_loop()
    tasks = [
        asyncio.ensure_future(factorial("A", 2), loop=loop),
        asyncio.ensure_future(factorial("B", 3), loop=loop),
        asyncio.ensure_future(factorial("C", 4), loop=loop)
    ]
    # Bare coroutines are accepted too, but then gather() should be
    # called from inside a running coroutine
    # tasks = [
    #     factorial("A", 2),
    #     factorial("B", 3),
    #     factorial("C", 4)
    # ]
    loop.run_until_complete(asyncio.gather(*tasks))

    async def main():
        # gather() calls can be nested to group coroutines
        group1 = asyncio.gather(*[factorial("A", i) for i in range(1, 3)])
        group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
        group3 = asyncio.gather(*[factorial("C", i) for i in range(1, 7)])
        await asyncio.gather(group1, group2, group3)

    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())

Return values differ:
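The difference in return values can be tried end to end with a small self-contained sketch. The coroutine work and the delays are hypothetical stand-ins, not code from the original article. The sketch runs the same workload through both functions and collects what each one hands back.

```python
import asyncio


async def work(x):
    # Hypothetical worker that finishes after x seconds
    await asyncio.sleep(x)
    return 'Done after {}s'.format(x)


async def main():
    delays = (0.03, 0.01, 0.02)

    # gather(): a list of results, in the order the awaitables were passed in
    gathered = await asyncio.gather(*(work(x) for x in delays))

    # wait(): two sets of Task objects; results are read with task.result()
    tasks = [asyncio.create_task(work(x)) for x in delays]
    done, pending = await asyncio.wait(tasks)
    waited = sorted(task.result() for task in done)
    return gathered, waited, len(pending)


gathered, waited, n_pending = asyncio.run(main())
print(gathered)
print(waited, n_pending)
```

Because done is a set, its iteration order is not defined, which is why the sketch sorts the results before returning them. gather() keeps the input order even though the second worker finishes first.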
asyncio.wait(): returns dones (the completed tasks) and pendings (the unfinished tasks)

    # Inside a coroutine
    dones, pendings = await asyncio.wait(tasks)
    for task in dones:
        print('Task ret: ', task.result())

asyncio.gather(): returns the results directly

    # Inside a coroutine
    results = await asyncio.gather(*tasks)
    for result in results:
        print('Task ret: ', result)

aiohttp
ClientSession session management

    import aiohttp
    import asyncio

    async def main():
        async with aiohttp.ClientSession() as session:
            async with session.get('http://httpbin.org/get') as resp:
                print(resp.status)
                print(await resp.text())

    asyncio.run(main())

Other request methods:
    session.post('http://httpbin.org/post', data=b'data')
    session.put('http://httpbin.org/put', data=b'data')
    session.delete('http://httpbin.org/delete')
    session.head('http://httpbin.org/get')
    session.options('http://httpbin.org/get')
    session.patch('http://httpbin.org/patch', data=b'data')

Passing URL parameters
    async def main():
        async with aiohttp.ClientSession() as session:
            params = {'key1': 'value1', 'key2': 'value2'}
            async with session.get('http://httpbin.org/get', params=params) as r:
                expect = 'http://httpbin.org/get?key1=value1&key2=value2'
                assert str(r.url) == expect

    async def main():
        async with aiohttp.ClientSession() as session:
            params = [('key', 'value1'), ('key', 'value2')]
            async with session.get('http://httpbin.org/get', params=params) as r:
                expect = 'http://httpbin.org/get?key=value2&key=value1'
                assert str(r.url) == expect

Reading the response content
    async def main():
        async with aiohttp.ClientSession() as session:
            async with session.get('http://httpbin.org/get') as r:
                # Status code
                print(r.status)
                # Response body as text; the encoding can be set explicitly
                print(await r.text(encoding='utf-8'))
                # Non-text (binary) content
                print(await r.read())
                # JSON content
                print(await r.json())

Custom request headers
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"
    }

    async def main():
        async with aiohttp.ClientSession() as session:
            async with session.get('http://httpbin.org/get', headers=headers) as r:
                print(r.status)

Setting headers for all requests in a session:
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"
    }

    async def main():
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.get('http://httpbin.org/get') as r:
                print(r.status)

Custom cookies
    async def main():
        cookies = {'cookies_are': 'working'}
        async with aiohttp.ClientSession() as session:
            async with session.get('http://httpbin.org/cookies', cookies=cookies) as resp:
                assert await resp.json() == {"cookies": {"cookies_are": "working"}}

Setting cookies for all requests in a session:
    async def main():
        cookies = {'cookies_are': 'working'}
        async with aiohttp.ClientSession(cookies=cookies) as session:
            async with session.get('http://httpbin.org/cookies') as resp:
                assert await resp.json() == {"cookies": {"cookies_are": "working"}}

Setting a proxy
Note: only HTTP proxies are supported.

    async def main():
        async with aiohttp.ClientSession() as session:
            proxy = "http://127.0.0.1:1080"
            async with session.get("http://python.org", proxy=proxy) as r:
                print(r.status)

A proxy that requires a username and password:
    async def main():
        async with aiohttp.ClientSession() as session:
            proxy = "http://127.0.0.1:1080"
            proxy_auth = aiohttp.BasicAuth('username', 'password')
            async with session.get("http://python.org", proxy=proxy, proxy_auth=proxy_auth) as r:
                print(r.status)

The credentials can also be passed directly in the proxy URL:
    async def main():
        async with aiohttp.ClientSession() as session:
            proxy = "http://username:password@127.0.0.1:1080"
            async with session.get("http://python.org", proxy=proxy) as r:
                print(r.status)

Async crawler example
    import asyncio
    import aiohttp
    from lxml import etree
    from datetime import datetime

    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36"}

    async def get_movie_url():
        # Fetch the chart page and extract the link to each movie
        req_url = "https://movie.douban.com/chart"
        async with aiohttp.ClientSession() as session:
            async with session.get(url=req_url, headers=headers) as response:
                result = await response.text()
                result = etree.HTML(result)
        return result.xpath("//*[@id='content']/div/div[1]/div/div/table/tr/td/a/@href")

    async def get_movie_content(movie_url):
        # Fetch one movie page and extract its title and the first info field
        async with aiohttp.ClientSession() as session:
            async with session.get(url=movie_url, headers=headers) as response:
                result = await response.text()
                result = etree.HTML(result)
        movie = dict()
        name = result.xpath('//*[@id="content"]/h1/span[1]//text()')
        author = result.xpath('//*[@id="info"]/span[1]/span[2]//text()')
        movie["name"] = name
        movie["author"] = author
        return movie

    def run():
        start = datetime.now()
        # asyncio.get_event_loop() is deprecated outside a running loop on recent Python versions
        loop = asyncio.new_event_loop()
        movie_url_list = loop.run_until_complete(get_movie_url())
        # One coroutine per movie page, all run concurrently
        tasks = [get_movie_content(url) for url in movie_url_list]
        movies = loop.run_until_complete(asyncio.gather(*tasks))
        print(movies)
        print("Async run took: {}".format(datetime.now() - start))

    if __name__ == '__main__':
        run()

Summary
The above is based on personal experience. I hope it serves as a useful reference, and I hope you will continue to support 腳本之家.

