← 返回首页
🧪

E2E测试架构:Selenium、Cypress与Playwright

📂 architecture ⏱ 7 min 1318 words

E2E测试架构:Selenium、Cypress与Playwright

E2E测试框架对比

端到端测试模拟真实用户操作,验证完整业务流程。三大主流框架各有特点:Selenium最成熟、生态最丰富;Cypress开发体验最好、调试最便捷;Playwright性能最佳、支持多浏览器。

# E2E测试框架抽象
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from abc import ABC, abstractmethod
from enum import Enum
import time

class Browser(Enum):
    CHROME = "chrome"
    FIREFOX = "firefox"
    SAFARI = "safari"
    EDGE = "edge"

@dataclass
class TestCase:
    name: str
    steps: List[Dict]
    expected_results: List[Dict]
    timeout: int = 30
    tags: List[str] = field(default_factory=list)

class E2ETestFramework(ABC):
    @abstractmethod
    def setup(self, browser: Browser, headless: bool = True):
        pass
    
    @abstractmethod
    def navigate(self, url: str):
        pass
    
    @abstractmethod
    def click(self, selector: str):
        pass
    
    @abstractmethod
    def type_text(self, selector: str, text: str):
        pass
    
    @abstractmethod
    def wait_for_element(self, selector: str, timeout: int = 10):
        pass
    
    @abstractmethod
    def assert_text(self, selector: str, expected: str):
        pass
    
    @abstractmethod
    def screenshot(self, name: str):
        pass
    
    @abstractmethod
    def teardown(self):
        pass

class E2ETestRunner:
    def __init__(self, framework: E2ETestFramework):
        self.framework = framework
        self.results = []
    
    def run_test(self, test_case: TestCase) -> Dict:
        """运行单个测试用例"""
        start_time = time.time()
        
        try:
            for step in test_case.steps:
                self._execute_step(step)
            
            for expected in test_case.expected_results:
                self._verify_result(expected)
            
            duration = time.time() - start_time
            result = {
                "name": test_case.name,
                "status": "passed",
                "duration": duration
            }
        
        except Exception as e:
            duration = time.time() - start_time
            result = {
                "name": test_case.name,
                "status": "failed",
                "duration": duration,
                "error": str(e)
            }
        
        self.results.append(result)
        return result
    
    def _execute_step(self, step: Dict):
        """执行测试步骤"""
        action = step["action"]
        
        if action == "navigate":
            self.framework.navigate(step["url"])
        elif action == "click":
            self.framework.click(step["selector"])
        elif action == "type":
            self.framework.type_text(step["selector"], step["text"])
        elif action == "wait":
            self.framework.wait_for_element(step["selector"], step.get("timeout", 10))
    
    def _verify_result(self, expected: Dict):
        """验证预期结果"""
        if expected["type"] == "text_visible":
            self.framework.assert_text(expected["selector"], expected["text"])
        elif expected["type"] == "element_exists":
            self.framework.wait_for_element(expected["selector"])
    
    def generate_report(self) -> Dict:
        """生成测试报告"""
        total = len(self.results)
        passed = sum(1 for r in self.results if r["status"] == "passed")
        
        return {
            "summary": {
                "total": total,
                "passed": passed,
                "failed": total - passed,
                "pass_rate": passed / total if total > 0 else 0
            },
            "results": self.results
        }

Selenium架构与最佳实践

Selenium是最成熟的浏览器自动化框架,支持多种语言和浏览器。Page Object模式是Selenium测试的最佳实践,将页面元素和操作封装为可复用的对象。

# Selenium Page Object模式
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

class BasePage:
    def __init__(self, driver):
        self.driver = driver
        self.wait = WebDriverWait(driver, 10)
    
    def find_element(self, locator: tuple):
        return self.wait.until(EC.presence_of_element_located(locator))
    
    def click(self, locator: tuple):
        element = self.wait.until(EC.element_to_be_clickable(locator))
        element.click()
    
    def type_text(self, locator: tuple, text: str):
        element = self.find_element(locator)
        element.clear()
        element.send_keys(text)
    
    def get_text(self, locator: tuple) -> str:
        return self.find_element(locator).text

class LoginPage(BasePage):
    # 页面元素定位器
    USERNAME_INPUT = (By.ID, "username")
    PASSWORD_INPUT = (By.ID, "password")
    LOGIN_BUTTON = (By.ID, "login-btn")
    ERROR_MESSAGE = (By.CLASS_NAME, "error-msg")
    
    def __init__(self, driver):
        super().__init__(driver)
        self.url = "https://example.com/login"
    
    def open(self):
        self.driver.get(self.url)
        return self
    
    def login(self, username: str, password: str):
        self.type_text(self.USERNAME_INPUT, username)
        self.type_text(self.PASSWORD_INPUT, password)
        self.click(self.LOGIN_BUTTON)
        return DashboardPage(self.driver)
    
    def get_error_message(self) -> str:
        return self.get_text(self.ERROR_MESSAGE)

class DashboardPage(BasePage):
    WELCOME_MSG = (By.CLASS_NAME, "welcome-message")
    LOGOUT_BUTTON = (By.ID, "logout")
    USER_PROFILE = (By.ID, "user-profile")
    
    def get_welcome_message(self) -> str:
        return self.get_text(self.WELCOME_MSG)
    
    def logout(self):
        self.click(self.LOGOUT_BUTTON)
        return LoginPage(self.driver)

# 测试用例
class LoginTest:
    def __init__(self):
        chrome_options = Options()
        chrome_options.add_argument("--headless")
        self.driver = webdriver.Chrome(options=chrome_options)
    
    def test_successful_login(self):
        """测试成功登录"""
        login_page = LoginPage(self.driver)
        login_page.open()
        
        dashboard = login_page.login("testuser", "password123")
        
        welcome = dashboard.get_welcome_message()
        assert "Welcome" in welcome
        assert "testuser" in welcome
    
    def test_failed_login(self):
        """测试登录失败"""
        login_page = LoginPage(self.driver)
        login_page.open()
        
        login_page.login("wronguser", "wrongpass")
        
        error = login_page.get_error_message()
        assert "Invalid credentials" in error
    
    def teardown(self):
        self.driver.quit()

# Selenium测试配置
class SeleniumConfig:
    def __init__(self):
        self.browsers = [Browser.CHROME, Browser.FIREFOX]
        self.headless = True
        self.window_size = (1920, 1080)
        self timeouts = {
            "implicit": 10,
            "page_load": 30,
            "script": 30
        }
    
    def create_driver(self, browser: Browser) -> webdriver.Remote:
        if browser == Browser.CHROME:
            options = Options()
            if self.headless:
                options.add_argument("--headless")
            options.add_argument(f"--window-size={self.window_size[0]},{self.window_size[1]}")
            return webdriver.Chrome(options=options)
        
        elif browser == Browser.FIREFOX:
            from selenium.webdriver.firefox.options import Options as FirefoxOptions
            options = FirefoxOptions()
            if self.headless:
                options.add_argument("--headless")
            return webdriver.Firefox(options=options)

Cypress架构与最佳实践

Cypress是专为现代Web应用设计的测试框架,提供时间旅行调试、自动等待和实时重载。其架构将测试运行器嵌入浏览器,实现更可靠的测试。

// Cypress测试示例
describe('Login Flow', () => {
  beforeEach(() => {
    cy.visit('/login');
  });

  it('should login successfully', () => {
    cy.get('#username').type('testuser');
    cy.get('#password').type('password123');
    cy.get('#login-btn').click();
    
    cy.url().should('include', '/dashboard');
    cy.get('.welcome-message').should('contain', 'Welcome');
  });

  it('should show error for invalid credentials', () => {
    cy.get('#username').wronguser();
    cy.get('#password').type('wrongpass');
    cy.get('#login-btn').click();
    
    cy.get('.error-msg').should('contain', 'Invalid credentials');
  });
});

// Cypress自定义命令
Cypress.Commands.add('login', (username, password) => {
  cy.session([username, password], () => {
    cy.visit('/login');
    cy.get('#username').type(username);
    cy.get('#password').type(password);
    cy.get('#login-btn').click();
    cy.url().should('include', '/dashboard');
  });
});

// 使用自定义命令
describe('Dashboard', () => {
  beforeEach(() => {
    cy.login('testuser', 'password123');
  });

  it('should display user info', () => {
    cy.get('.user-profile').should('be.visible');
  });
});
# Cypress Python封装(用于API测试)
class CypressTestHelper:
    def __init__(self, base_url: str):
        self.base_url = base_url
    
    def generate_test(self, scenario: Dict) -> str:
        """生成Cypress测试代码"""
        test_name = scenario["name"]
        steps = scenario["steps"]
        
        code = f"""describe('{test_name}', () => {{
  it('should {test_name.lower()}', () => {{
"""
        
        for step in steps:
            if step["action"] == "visit":
                code += f"    cy.visit('{step['url']}');\n"
            elif step["action"] == "get":
                code += f"    cy.get('{step['selector']}').{step['command']}({step.get('value', '')});\n"
            elif step["action"] == "assert":
                code += f"    cy.get('{step['selector']}').should('{step['assertion']}', '{step.get('value', '')}');\n"
        
        code += """  });
});"""
        
        return code

# 配置生成器
class CypressConfigGenerator:
    def __init__(self):
        self.config = {
            "baseUrl": "http://localhost:3000",
            "viewportWidth": 1280,
            "viewportHeight": 720,
            "video": True,
            "screenshotOnRunFailure": True
        }
    
    def generate_config(self, env: str = "development") -> Dict:
        """生成Cypress配置"""
        config = self.config.copy()
        
        if env == "production":
            config["baseUrl"] = "https://example.com"
            config["video"] = False
        
        return config
    
    def generate_plugins(self) -> str:
        """生成插件配置"""
        return """
const { defineConfig } = require('cypress');

module.exports = defineConfig({
  e2e: {
    setupNodeEvents(on, config) {
      // implement node event listeners here
    },
  },
});
"""

Playwright架构与最佳实践

Playwright由Microsoft开发,提供跨浏览器支持、自动等待、网络拦截和并行执行。其架构基于浏览器原生协议,性能优于Selenium。

# Playwright测试
from playwright.sync_api import sync_playwright, Page, expect
from playwright.async_api import async_playwright
import asyncio

class PlaywrightTestSuite:
    def __init__(self):
        self.playwright = None
        self.browser = None
    
    def setup(self, headless: bool = True):
        self.playwright = sync_playwright().start()
        self.browser = self.playwright.chromium.launch(headless=headless)
    
    def teardown(self):
        if self.browser:
            self.browser.close()
        if self.playwright:
            self.playwright.stop()
    
    def test_login(self):
        """测试登录功能"""
        page = self.browser.new_page()
        
        try:
            page.goto("https://example.com/login")
            
            # 填写表单
            page.fill("#username", "testuser")
            page.fill("#password", "password123")
            page.click("#login-btn")
            
            # 验证跳转
            page.wait_for_url("**/dashboard")
            expect(page.locator(".welcome-message")).to_contain_text("Welcome")
        
        finally:
            page.close()
    
    def test_api_interception(self):
        """测试API拦截"""
        page = self.browser.new_page()
        
        try:
            # 拦截API请求
            def handle_route(route):
                if "/api/users" in route.request.url:
                    route.fulfill(
                        status=200,
                        content_type="application/json",
                        body='[{"id": 1, "name": "Test User"}]'
                    )
                else:
                    route.continue_()
            
            page.route("**/*", handle_route)
            
            page.goto("https://example.com/users")
            
            # 验证mock数据
            expect(page.locator(".user-name")).to_have_text("Test User")
        
        finally:
            page.close()

# 异步测试
class AsyncPlaywrightTest:
    def __init__(self):
        self.playwright = None
        self.browser = None
    
    async def setup(self):
        self.playwright = await async_playwright().start()
        self.browser = await self.playwright.chromium.launch(headless=True)
    
    async def teardown(self):
        if self.browser:
            await self.browser.close()
        if self.playwright:
            await self.playwright.stop()
    
    async def test_concurrent_actions(self):
        """测试并发操作"""
        context = await self.browser.new_context()
        page = await context.new_page()
        
        try:
            await page.goto("https://example.com")
            
            # 并发执行多个操作
            await asyncio.gather(
                page.click("#button1"),
                page.fill("#input1", "text1")
            )
            
            # 验证结果
            await page.wait_for_selector(".result")
        
        finally:
            await context.close()

# Playwright配置
class PlaywrightConfig:
    def __init__(self):
        self.config = {
            "projects": [
                {"name": "chromium", "use": {"browserName": "chromium"}},
                {"name": "firefox", "use": {"browserName": "firefox"}},
                {"name": "webkit", "use": {"browserName": "webkit"}}
            ],
            "use": {
                "baseURL": "http://localhost:3000",
                "trace": "on-first-retry",
                "video": "on-first-retry"
            }
        }
    
    def generate_config(self) -> Dict:
        return self.config

# 测试数据管理
class PlaywrightTestData:
    def __init__(self, page: Page):
        self.page = page
    
    async def login_as(self, username: str, password: str):
        """使用指定用户登录"""
        await self.page.goto("/login")
        await self.page.fill("#username", username)
        await self.page.fill("#password", password)
        await self.page.click("#login-btn")
        await self.page.wait_for_url("**/dashboard")
    
    async def create_test_data(self, data_type: str, count: int = 1):
        """创建测试数据"""
        for i in range(count):
            if data_type == "user":
                await self.page.goto("/users/new")
                await self.page.fill("#name", f"Test User {i}")
                await self.page.fill("#email", f"user{i}@example.com")
                await self.page.click("#save")

E2E测试CI/CD集成

将E2E测试集成到CI/CD流水线中,实现自动化验证。使用Docker运行测试环境,支持并行执行和失败重试。

# E2E测试CI配置
class E2ECIConfig:
    def __init__(self):
        self.config = {
            "pipeline": {
                "stages": ["build", "test", "deploy"],
                "test_stage": {
                    "parallel": True,
                    "retry": 2,
                    "timeout": "30m"
                }
            },
            "docker": {
                "services": {
                    "app": {
                        "image": "myapp:latest",
                        "ports": ["3000:3000"]
                    },
                    "db": {
                        "image": "postgres:14",
                        "env": {"POSTGRES_DB": "test"}
                    }
                }
            },
            "reporting": {
                "screenshots": True,
                "videos": True,
                "reporters": ["html", "junit"]
            }
        }
    
    def generate_github_actions(self) -> str:
        """生成GitHub Actions配置"""
        return """
name: E2E Tests
on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      
      - name: Setup Node.js
        uses: actions/setup-node@v3
        with:
          node-version: '18'
      
      - name: Install dependencies
        run: npm ci
      
      - name: Install Playwright
        run: npx playwright install --with-deps
      
      - name: Run tests
        run: npx playwright test
      
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: playwright-report
          path: playwright-report/
"""
    
    def generate_gitlab_ci(self) -> str:
        """生成GitLab CI配置"""
        return """
e2e-tests:
  stage: test
  image: mcr.microsoft.com/playwright/python:v1.40.0-jammy
  services:
    - postgres:14
  variables:
    POSTGRES_DB: test
  script:
    - pip install -r requirements.txt
    - playwright install --with-deps
    - pytest tests/e2e/ --junitxml=report.xml
  artifacts:
    when: always
    reports:
      junit: report.xml
    paths:
      - test-results/
  retry:
    max: 2
    when: script_failure
"""

# 测试报告生成
class E2ETestReporter:
    def __init__(self, results: List[Dict]):
        self.results = results
    
    def generate_html_report(self) -> str:
        """生成HTML报告"""
        passed = sum(1 for r in self.results if r["status"] == "passed")
        failed = sum(1 for r in self.results if r["status"] == "failed")
        
        html = f"""
<!DOCTYPE html>
<html>
<head><title>E2E Test Report</title></head>
<body>
<h1>E2E Test Report</h1>
<div class="summary">
    <p>Total: {len(self.results)}</p>
    <p>Passed: {passed}</p>
    <p>Failed: {failed}</p>
</div>
<table>
<tr><th>Test</th><th>Status</th><th>Duration</th></tr>
"""
        
        for result in self.results:
            status_class = "passed" if result["status"] == "passed" else "failed"
            html += f"""
<tr class="{status_class}">
    <td>{result['name']}</td>
    <td>{result['status']}</td>
    <td>{result['duration']:.2f}s</td>
</tr>
"""
        
        html += "</table></body></html>"
        return html
    
    def generate_junit_xml(self) -> str:
        """生成JUnit XML报告"""
        import xml.etree.ElementTree as ET
        
        testsuites = ET.Element("testsuites")
        testsuite = ET.SubElement(testsuites, "testsuite", 
                                 name="E2E Tests",
                                 tests=str(len(self.results)))
        
        for result in self.results:
            testcase = ET.SubElement(testsuite, "testcase",
                                    name=result["name"],
                                    time=str(result["duration"]))
            
            if result["status"] == "failed":
                failure = ET.SubElement(testcase, "failure",
                                       message=result.get("error", "Test failed"))
        
        return ET.tostring(testsuites, encoding="unicode")