← 返回首页
🏎️

高并发架构:连接池与线程池

📂 architecture ⏱ 4 min 672 words

高并发架构:连接池与线程池

连接池优化

连接池是管理数据库、HTTP等连接的重要组件,合理的配置能显著提升系统吞吐量。

// 数据库连接池配置
@Configuration
public class DataSourceConfig {
    
    @Bean
    @ConfigurationProperties("spring.datasource.hikari")
    public HikariDataSource dataSource() {
        HikariConfig config = new HikariConfig();
        
        // 基础配置
        config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
        config.setUsername("user");
        config.setPassword("password");
        config.setDriverClassName("com.mysql.cj.jdbc.Driver");
        
        // 连接池配置
        config.setMinimumIdle(10);
        config.setMaximumPoolSize(100);
        config.setConnectionTimeout(30000);
        config.setIdleTimeout(600000);
        config.setMaxLifetime(1800000);
        
        // 连接验证
        config.setConnectionTestQuery("SELECT 1");
        config.setValidationTimeout(5000);
        
        // 性能优化
        config.addDataSourceProperty("cachePrepStmts", "true");
        config.addDataSourceProperty("prepStmtCacheSize", "250");
        config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        config.addDataSourceProperty("useServerPrepStmts", "true");
        config.addDataSourceProperty("useLocalSessionState", "true");
        config.addDataSourceProperty("rewriteBatchedStatements", "true");
        config.addDataSourceProperty("cacheResultSetMetadata", "true");
        config.addDataSourceProperty("cacheServerConfiguration", "true");
        config.addDataSourceProperty("elideSetAutoCommits", "true");
        config.addDataSourceProperty("maintainTimeStats", "false");
        
        return new HikariDataSource(config);
    }
}

// 连接池监控
@Component
public class ConnectionPoolMonitor {
    
    private final HikariDataSource dataSource;
    private final MetricsExporter metrics;
    
    @Scheduled(fixedRate = 5000)
    public void monitorPool() {
        HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
        
        if (poolMXBean != null) {
            // 监控指标
            int activeConnections = poolMXBean.getActiveConnections();
            int idleConnections = poolMXBean.getIdleConnections();
            int totalConnections = poolMXBean.getTotalConnections();
            int threadsAwaiting = poolMXBean.getThreadsAwaitingConnection();
            
            // 导出指标
            metrics.gauge("db.pool.active", activeConnections);
            metrics.gauge("db.pool.idle", idleConnections);
            metrics.gauge("db.pool.total", totalConnections);
            metrics.gauge("db.pool.waiting", threadsAwaiting);
            
            // 告警检测
            if (threadsAwaiting > 10) {
                alertService.send(new Alert(
                    "数据库连接池等待线程过多",
                    "当前等待线程数: " + threadsAwaiting
                ));
            }
            
            if (activeConnections > poolMXBean.getMaximumPoolSize() * 0.9) {
                alertService.send(new Alert(
                    "数据库连接池使用率过高",
                    "当前使用率: " + 
                    (activeConnections * 100.0 / poolMXBean.getMaximumPoolSize()) + "%"
                ));
            }
        }
    }
}

线程池配置

// 线程池配置
@Configuration
public class ThreadPoolConfig {
    
    // IO密集型任务线程池
    @Bean("ioThreadPool")
    public ThreadPoolTaskExecutor ioThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // 核心线程数 = CPU核心数 * 2
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
        
        // 最大线程数 = CPU核心数 * 4
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
        
        // 队列容量
        executor.setQueueCapacity(1000);
        
        // 线程名前缀
        executor.setThreadNamePrefix("io-thread-");
        
        // 拒绝策略:调用者运行
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        
        // 空闲线程存活时间
        executor.setKeepAliveSeconds(60);
        
        executor.initialize();
        
        return executor;
    }
    
    // CPU密集型任务线程池
    @Bean("cpuThreadPool")
    public ThreadPoolTaskExecutor cpuThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // 核心线程数 = CPU核心数 + 1
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1);
        
        // 最大线程数 = CPU核心数 * 2
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        
        // 队列容量
        executor.setQueueCapacity(500);
        
        executor.setThreadNamePrefix("cpu-thread-");
        executor.setRejectedExecutionHandler(new CallerRunsPolicy());
        executor.setKeepAliveSeconds(30);
        
        executor.initialize();
        
        return executor;
    }
    
    // 定时任务线程池
    @Bean("scheduledThreadPool")
    public ThreadPoolTaskScheduler scheduledThreadPool() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        
        scheduler.setPoolSize(10);
        scheduler.setThreadNamePrefix("scheduled-");
        scheduler.setErrorHandler(t -> 
            log.error("定时任务执行异常", t));
        
        return scheduler;
    }
}

// 线程池监控
@Component
public class ThreadPoolMonitor {
    
    private final Map<String, ThreadPoolTaskExecutor> executors;
    private final MetricsExporter metrics;
    
    @Scheduled(fixedRate = 5000)
    public void monitorThreadPools() {
        executors.forEach((name, executor) -> {
            ThreadPoolExecutor threadPoolExecutor = 
                executor.getThreadPoolExecutor();
            
            // 监控指标
            int activeCount = threadPoolExecutor.getActiveCount();
            int poolSize = threadPoolExecutor.getPoolSize();
            int queueSize = threadPoolExecutor.getQueue().size();
            long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
            
            // 导出指标
            metrics.gauge("threadpool." + name + ".active", activeCount);
            metrics.gauge("threadpool." + name + ".pool_size", poolSize);
            metrics.gauge("threadpool." + name + ".queue_size", queueSize);
            metrics.gauge("threadpool." + name + ".completed", completedTaskCount);
            
            // 计算使用率
            double utilization = (double) activeCount / 
                threadPoolExecutor.getMaximumPoolSize() * 100;
            
            // 告警
            if (utilization > 80) {
                alertService.send(new Alert(
                    "线程池" + name + "使用率过高",
                    "当前使用率: " + utilization + "%"
                ));
            }
        });
    }
}

异步化处理

// 异步服务
@Service
public class AsyncProcessingService {
    
    private final TaskExecutor taskExecutor;
    private final TaskScheduler taskScheduler;
    
    // 异步任务执行
    @Async("ioThreadPool")
    public CompletableFuture<Result> processAsync(Request request) {
        try {
            Result result = performHeavyOperation(request);
            return CompletableFuture.completedFuture(result);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }
    
    // 异步批量处理
    public List<CompletableFuture<Result>> processBatch(List<Request> requests) {
        return requests.stream()
            .map(request -> CompletableFuture.supplyAsync(
                () -> performHeavyOperation(request),
                taskExecutor
            ))
            .collect(Collectors.toList());
    }
    
    // 延迟执行
    public void scheduleDelayed(Runnable task, Duration delay) {
        taskScheduler.schedule(task, 
            Instant.now().plus(delay));
    }
    
    // 定时执行
    public void scheduleAtFixedRate(Runnable task, Duration interval) {
        taskScheduler.scheduleAtFixedRate(task, 
            Instant.now(), interval);
    }
}

// 事件驱动异步处理
@Component
public class EventDrivenProcessor {
    
    private final ApplicationEventPublisher eventPublisher;
    
    // 发布异步事件
    @Async
    @EventListener
    public void handleOrderCreated(OrderCreatedEvent event) {
        // 异步处理订单创建后的逻辑
        sendNotification(event.getOrder());
        updateInventory(event.getOrder());
        generateInvoice(event.getOrder());
    }
    
    // 使用CompletableFuture链式处理
    public CompletableFuture<Result> processWithFutures(Request request) {
        return CompletableFuture
            .supplyAsync(() -> validateInput(request))
            .thenApplyAsync(input -> processData(input))
            .thenApplyAsync(data -> transformData(data))
            .thenComposeAsync(data -> saveToDatabase(data))
            .exceptionally(ex -> handleError(ex));
    }
}

并发控制

// 分布式锁
@Component
public class DistributedLockManager {
    
    private final RedissonClient redisson;
    
    public <T> T executeWithLock(String lockKey, Duration timeout, 
                                  Supplier<T> task) {
        RLock lock = redisson.getLock(lockKey);
        
        try {
            // 尝试获取锁
            if (lock.tryLock(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                try {
                    return task.get();
                } finally {
                    lock.unlock();
                }
            } else {
                throw new LockAcquisitionException("获取锁超时: " + lockKey);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new LockAcquisitionException("获取锁被中断: " + lockKey, e);
        }
    }
}

// 信号量控制并发
@Component
public class ConcurrencyLimiter {
    
    private final Semaphore semaphore;
    private final RateLimiter rateLimiter;
    
    public ConcurrencyLimiter() {
        // 限制并发数
        this.semaphore = new Semaphore(100);
        // 限流
        this.rateLimiter = RateLimiter.create(1000); // 1000 QPS
    }
    
    public <T> T execute(Request request, Supplier<T> task) {
        // 限流检查
        if (!rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
            throw new RateLimitExceededException("请求过于频繁");
        }
        
        // 并发控制
        try {
            if (semaphore.tryAcquire(100, TimeUnit.MILLISECONDS)) {
                try {
                    return task.get();
                } finally {
                    semaphore.release();
                }
            } else {
                throw new ConcurrencyLimitExceededException("并发数已满");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConcurrencyLimitExceededException("等待中断", e);
        }
    }
}

高并发架构通过连接池优化、线程池配置、异步化处理和并发控制,确保系统在高负载下的稳定性和性能。