← 返回首页
🔥

异步编程进阶

📂 python ⏱ 2 min 392 words

异步编程进阶

本文将深入探讨 Python 异步编程的进阶特性，包括任务（Task）管理、使用信号量（Semaphore）控制并发、异步生成器、使用 aiohttp 发起异步 HTTP 请求、异步上下文管理器，以及一个异步爬虫实战示例。

创建和管理 Task

import asyncio

async def background_task(name, interval):
    """Print a heartbeat line for *name* forever, pausing *interval* seconds between lines.

    The coroutine never returns on its own; it only stops when the task
    running it is cancelled.
    """
    while True:
        line = f"{name}: 执行任务"
        print(line)
        # Hand control back to the event loop so other tasks can run meanwhile.
        await asyncio.sleep(interval)

async def main():
    """Run two periodic background tasks for 10 seconds, then cancel and reap them."""
    # Both tasks are scheduled immediately and start once we next await.
    tasks = [
        asyncio.create_task(background_task("任务1", 2)),
        asyncio.create_task(background_task("任务2", 3)),
    ]

    # The main coroutine carries on with its own (simulated) work meanwhile.
    await asyncio.sleep(10)

    # Ask every background task to stop ...
    for task in tasks:
        task.cancel()

    # ... and wait until they really have; return_exceptions=True collects the
    # resulting CancelledError values instead of raising them here.
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())

Semaphore 信号量

信号量限制了同时进入 `async with sem` 代码块的协程数量，从而控制并发、避免资源耗尽：

import asyncio

# Module-wide limiter: at most 5 fetch() calls may be inside ``async with sem`` at once.
# NOTE(review): this is created at import time, outside any event loop. On Python
# 3.10+ asyncio primitives bind to a loop lazily on first use, so this works; on
# 3.7-3.9 the semaphore binds to a different loop than the one asyncio.run()
# creates and contended acquires fail -- create it inside main() and pass it in
# if older interpreters must be supported.
sem = asyncio.Semaphore(5)

async def fetch(url):
    """Simulate downloading *url* while holding one slot of ``sem``.

    Returns a placeholder payload string for *url*.
    """
    async with sem:
        print(f"开始获取 {url}")
        await asyncio.sleep(1)  # stand-in for real network I/O
        print(f"完成 {url}")
        payload = f"数据 from {url}"
    return payload

async def main():
    """Fetch 20 placeholder URLs concurrently; ``sem`` lets at most 5 run at a time."""
    pending = (fetch(f"url_{i}") for i in range(20))
    results = await asyncio.gather(*pending)
    print(f"获取了 {len(results)} 条数据")

asyncio.run(main())

异步生成器

import asyncio

async def async_range(start, stop, delay=0.1):
    """Asynchronously yield start, start + 1, ... while the value is below *stop*.

    Before producing each value the generator sleeps *delay* seconds, giving
    other tasks a chance to run.
    """
    value = start
    while True:
        if not value < stop:
            return
        await asyncio.sleep(delay)
        yield value
        value += 1

async def main():
    """Print the values 0..4 as the async generator produces them."""
    numbers = async_range(0, 5)
    async for value in numbers:
        print(value)

asyncio.run(main())

使用 aiohttp 进行异步 HTTP 请求

import asyncio
import aiohttp

async def fetch_url(session, url):
    """Issue a GET for *url* on *session* and return the response body as text.

    The response is used as an async context manager, so it is exited as soon
    as the body has been read.
    """
    request = session.get(url)
    async with request as response:
        body = await response.text()
    return body

async def main():
    """GET a few httpbin endpoints concurrently and print the start of each body."""
    endpoints = (
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/user-agent",
    )

    # A single session is shared by every request and closed when the block exits.
    async with aiohttp.ClientSession() as session:
        bodies = await asyncio.gather(
            *(fetch_url(session, endpoint) for endpoint in endpoints)
        )
        for body in bodies:
            print(body[:100])

asyncio.run(main())

使用 aiohttp 时限制并发

import asyncio
import aiohttp

class RateLimiter:
    """Cap how many HTTP GETs run at the same time.

    Despite the name this is a concurrency limit, not a requests-per-second
    limit: each fetch() holds one slot of an asyncio.Semaphore sized
    *max_concurrent* for the whole request.
    """

    def __init__(self, max_concurrent):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch(self, session, url):
        """GET *url* through *session* and return the response body as text."""
        # Take a slot first, then open the request; on the way out the response
        # is exited before the slot is released.
        async with self.semaphore, session.get(url) as response:
            return await response.text()

async def main():
    """Request ten httpbin /delay URLs with at most three in flight at once."""
    # Delays cycle through 0, 1 and 2 seconds.
    urls = [f"https://httpbin.org/delay/{i%3}" for i in range(10)]
    limiter = RateLimiter(3)

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(limiter.fetch(session, url) for url in urls)
        )
        print(f"完成 {len(results)} 个请求")

asyncio.run(main())

异步上下文管理器与资源管理

import asyncio
import aiohttp

class AsyncHTTPClient:
    """Minimal JSON HTTP client whose aiohttp session lives for one ``async with`` block.

    Usage::

        async with AsyncHTTPClient() as client:
            data = await client.get(url)

    The session is opened in ``__aenter__`` and closed in ``__aexit__``.
    Calling :meth:`get` outside that block raises ``RuntimeError``.
    """

    def __init__(self):
        # None until __aenter__ runs, and reset to None by __aexit__.
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Drop the reference before closing so a closed session is never reused:
        # a later get() fails with a clear error, and a later ``async with``
        # opens a fresh session.
        session, self.session = self.session, None
        if session is not None:
            await session.close()
        # Never suppress an exception raised inside the ``async with`` body.
        return False

    async def get(self, url):
        """GET *url* and return the response body decoded as JSON.

        Raises:
            RuntimeError: if called outside ``async with AsyncHTTPClient()``.
        """
        if self.session is None:
            raise RuntimeError(
                "AsyncHTTPClient.get() called outside 'async with'; no open session"
            )
        async with self.session.get(url) as response:
            return await response.json()

async def main():
    """Fetch httpbin's sample JSON document through AsyncHTTPClient and print it."""
    async with AsyncHTTPClient() as client:
        print(await client.get("https://httpbin.org/json"))

asyncio.run(main())

实战：异步爬虫

import asyncio
import aiohttp
from urllib.parse import urljoin

class AsyncCrawler:
    """Fetch many URLs concurrently, with a cap on simultaneous requests.

    Failures are best-effort: a URL that errors is reported on stdout and
    yields ``None`` in the result list instead of aborting the whole crawl.
    """

    def __init__(self, max_concurrent=5, timeout=10):
        """
        Args:
            max_concurrent: maximum number of requests in flight at once.
            timeout: per-request timeout passed to ``session.get``. The default
                of 10 (seconds) matches the previously hard-coded value; an
                ``aiohttp.ClientTimeout`` may be passed instead.
        """
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.timeout = timeout
        # Every URL handed to fetch(). It is recorded so callers can inspect
        # what was requested; fetch() itself does not skip repeats.
        self.visited = set()

    async def fetch(self, session, url):
        """GET *url* via *session* and return its body text, or None on failure.

        One semaphore slot is held for the duration of the request. Any
        ``Exception`` is printed and turned into ``None`` so one bad URL does
        not fail the crawl. On Python 3.8+ cancellation is not an
        ``Exception`` and still propagates.
        """
        self.visited.add(url)
        async with self.semaphore:
            try:
                async with session.get(url, timeout=self.timeout) as response:
                    return await response.text()
            except Exception as e:
                print(f"错误 {url}: {e}")
                return None

    async def crawl(self, urls):
        """Fetch every URL in *urls* concurrently over one shared session.

        Returns a list aligned with *urls*: page text, or None where the
        fetch failed.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url) for url in urls]
            return await asyncio.gather(*tasks)

# Usage example
async def main():
    """Crawl example.com 20 times (up to 10 at once) and report how many pages came back."""
    crawler = AsyncCrawler(max_concurrent=10)
    pages = await crawler.crawl(["https://example.com"] * 20)
    succeeded = sum(1 for page in pages if page)
    print(f"完成 {succeeded} 个页面")

asyncio.run(main())

掌握这些进阶技巧,能让你构建高效、可控的异步应用。