异步编程进阶
本文将深入探讨 Python 异步编程的高级特性，包括任务管理、信号量控制、异步生成器、异步上下文管理器，以及如何使用 aiohttp 发起异步 HTTP 请求，最后以一个异步爬虫作为综合示例。
创建和管理 Task
import asyncio
async def background_task(name, interval):
    """Print a status line for *name*, then wait *interval* seconds, forever.

    The loop has no exit condition of its own; in this example it is stopped
    by cancelling the task that wraps it, which raises
    ``asyncio.CancelledError`` out of the pending ``await``.
    """
    while True:
        message = f"{name}: 执行任务"
        print(message)
        await asyncio.sleep(interval)
async def main():
    """Run two looping background tasks for 10 seconds, then cancel and reap them."""
    # Start both workers; they keep running until they are cancelled.
    workers = [
        asyncio.create_task(background_task(label, period))
        for label, period in (("任务1", 2), ("任务2", 3))
    ]
    # The main coroutine carries on with its own work meanwhile.
    await asyncio.sleep(10)
    # Ask every worker to stop...
    for worker in workers:
        worker.cancel()
    # ...then wait for each one to actually finish. With return_exceptions=True
    # the CancelledError of each worker is returned instead of raised here.
    await asyncio.gather(*workers, return_exceptions=True)
# Entry point: asyncio.run() creates a new event loop, runs main() to completion, then closes the loop.
asyncio.run(main())
Semaphore 信号量
控制并发数量，避免资源耗尽：
import asyncio
# Module-level limit shared by every fetch() call: at most 5 coroutines can be
# inside `async with sem` at the same time.
# NOTE(review): this semaphore is created at import time, before asyncio.run()
# starts its loop. On Python < 3.10 asyncio primitives captured the loop that
# was current at construction, which is not the loop asyncio.run() creates.
# On 3.10+ they bind to the first loop that has to wait on them. Either way,
# reusing this object across several asyncio.run() calls is fragile. Consider
# creating it inside main() and passing it to fetch(). Confirm this against the
# target Python version.
sem = asyncio.Semaphore(5)  # at most 5 concurrent holders
async def fetch(url):
    """Simulate downloading *url* while holding one slot of the module-level ``sem``.

    Prints a line when work starts and another when it finishes, sleeps for
    one second in between as a stand-in for I/O, and returns a fake payload
    string.
    """
    async with sem:
        print(f"开始获取 {url}")
        await asyncio.sleep(1)
        print(f"完成 {url}")
        payload = f"数据 from {url}"
    return payload
async def main():
    """Fetch 20 fake URLs concurrently, with concurrency bounded by ``sem``, and report the count."""
    pending = (fetch(f"url_{i}") for i in range(20))
    results = await asyncio.gather(*pending)
    print(f"获取了 {len(results)} 条数据")
# Entry point: run the semaphore demo's main() in a fresh event loop.
asyncio.run(main())
异步生成器
import asyncio
async def async_range(start, stop, delay=0.1):
    """Asynchronous counterpart of ``range(start, stop)`` with a step of 1.

    Before producing each value the generator awaits ``asyncio.sleep(delay)``,
    which lets other tasks run between items.

    Args:
        start: first value produced.
        stop: exclusive upper bound; nothing is produced if ``start >= stop``.
        delay: seconds to sleep before each value.
    """
    value = start
    while value < stop:
        await asyncio.sleep(delay)
        yield value
        value = value + 1
async def main():
    """Print every number produced by ``async_range(0, 5)``, one per line."""
    numbers = async_range(0, 5)
    async for value in numbers:
        print(value)
# Entry point: run the async-generator demo's main() in a fresh event loop.
asyncio.run(main())
使用 aiohttp 进行异步 HTTP 请求
import asyncio
import aiohttp
async def fetch_url(session, url):
    """GET *url* through *session* and return the response body decoded as text.

    Args:
        session: an open aiohttp client session.
        url: the address to request.
    """
    request = session.get(url)
    async with request as response:
        body = await response.text()
    return body
async def main():
    """Fetch three httpbin endpoints concurrently and print the first 100 characters of each."""
    endpoints = (
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    )
    async with aiohttp.ClientSession() as session:
        # One shared session for all requests; gather keeps the input order.
        bodies = await asyncio.gather(
            *(fetch_url(session, endpoint) for endpoint in endpoints)
        )
        for body in bodies:
            print(body[:100])
# Entry point: run the aiohttp demo's main() in a fresh event loop.
asyncio.run(main())
使用 aiohttp 时限制并发
import asyncio
import aiohttp
class RateLimiter:
    """Bound how many requests are in flight at the same time.

    Despite the name, this limits *concurrency*, not requests per second.
    At most ``max_concurrent`` calls to :meth:`fetch` can hold the semaphore
    at once; the rest wait for a free slot.
    """

    def __init__(self, max_concurrent):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(self, session, url):
        """Wait for a free slot, GET *url* with *session*, and return the body as text."""
        await self.semaphore.acquire()
        try:
            async with session.get(url) as response:
                return await response.text()
        finally:
            # Release the slot even if the request fails or is cancelled.
            self.semaphore.release()
async def main():
    """Issue 10 httpbin /delay requests with at most 3 in flight, then report how many finished."""
    limiter = RateLimiter(3)
    urls = [f"https://httpbin.org/delay/{i%3}" for i in range(10)]
    async with aiohttp.ClientSession() as session:
        pending = [limiter.fetch(session, link) for link in urls]
        responses = await asyncio.gather(*pending)
        print(f"完成 {len(responses)} 个请求")
# Entry point: run the concurrency-limited aiohttp demo in a fresh event loop.
asyncio.run(main())
异步上下文管理器与资源管理
import asyncio
import aiohttp
class AsyncHTTPClient:
    """Async context manager that owns one aiohttp session for its ``async with`` body.

    Usage::

        async with AsyncHTTPClient() as client:
            data = await client.get(url)
    """

    def __init__(self):
        # No session exists until __aenter__ runs; get() refuses to work before then.
        self.session = None

    async def __aenter__(self):
        """Open a fresh ``aiohttp.ClientSession`` and return this client."""
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close and forget the session; never suppresses exceptions from the body."""
        # Detach first so the client never keeps a reference to a closed
        # session, even if close() itself raises.
        session, self.session = self.session, None
        if session is not None:
            await session.close()
        return False

    async def get(self, url):
        """GET *url* and return the response body parsed as JSON.

        Raises:
            RuntimeError: if called outside ``async with AsyncHTTPClient()``
                (before entering or after exiting the context).
        """
        if self.session is None:
            raise RuntimeError("AsyncHTTPClient.get() called outside 'async with'")
        async with self.session.get(url) as response:
            return await response.json()
async def main():
    """Fetch httpbin's sample JSON document through AsyncHTTPClient and print it."""
    endpoint = "https://httpbin.org/json"
    async with AsyncHTTPClient() as client:
        payload = await client.get(endpoint)
        print(payload)
# Entry point: run the async-context-manager demo in a fresh event loop.
asyncio.run(main())
实战：异步爬虫
import asyncio
import aiohttp
from urllib.parse import urljoin
class AsyncCrawler:
    """Concurrently fetch a batch of URLs with a cap on in-flight requests.

    Each URL is fetched independently. A failed request is reported on stdout
    and yields ``None`` in the results instead of aborting the whole crawl.
    """

    def __init__(self, max_concurrent=5, timeout=10):
        """Create a crawler.

        Args:
            max_concurrent: maximum number of requests allowed in flight at once.
            timeout: total time budget, in seconds, for each individual request.
        """
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout
        # NOTE(review): never read or written by this class; either use it to
        # de-duplicate URLs or drop it.
        self.visited = set()

    async def fetch(self, session, url):
        """Return the text body of *url*, or ``None`` if the request fails.

        Failure includes connection errors, timeouts and HTTP 4xx/5xx responses.

        Args:
            session: the aiohttp client session used to issue the GET.
            url: the URL to fetch.
        """
        async with self.semaphore:
            try:
                # ClientTimeout is aiohttp's documented timeout type; ``total``
                # bounds the whole request (connect, send, read).
                async with session.get(
                    url, timeout=aiohttp.ClientTimeout(total=self.timeout)
                ) as response:
                    # Without this, 4xx/5xx error pages would be returned as
                    # if they were successfully crawled content.
                    response.raise_for_status()
                    return await response.text()
            except Exception as e:
                # Deliberately best-effort: report the failure and keep crawling.
                # asyncio.CancelledError derives from BaseException (3.8+), so
                # cancellation still propagates.
                print(f"错误 {url}: {e}")
                return None

    async def crawl(self, urls):
        """Fetch all *urls* concurrently; results come back in input order."""
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)
# Usage example
async def main():
    """Crawl example.com 20 times with up to 10 concurrent requests and report the successes."""
    crawler = AsyncCrawler(max_concurrent=10)
    pages = await crawler.crawl(["https://example.com"] * 20)
    # Failed fetches come back as None (and empty bodies are falsy), so count truthy results.
    succeeded = sum(1 for page in pages if page)
    print(f"完成 {succeeded} 个页面")
# Entry point: run the crawler demo in a fresh event loop.
asyncio.run(main())
掌握这些进阶技巧，能让你构建高效、可控的异步应用。