高并发架构:连接池与线程池
连接池优化
连接池是管理数据库、HTTP等连接的重要组件,合理的配置能显著提升系统吞吐量。
/**
 * Database connection pool configuration (HikariCP).
 */
@Configuration
public class DataSourceConfig {

    /**
     * Builds the HikariCP data source with code-level defaults.
     *
     * <p>The data source is created through its no-arg constructor and configured via the
     * setters it inherits from {@code HikariConfig}. Unlike
     * {@code new HikariDataSource(HikariConfig)}, this does not start the pool (and seal its
     * configuration) inside this method, so the {@code spring.datasource.hikari.*} properties
     * bound by {@code @ConfigurationProperties} after the method returns can still override
     * the values below. The pool is started lazily on the first {@code getConnection()} call.
     *
     * @return the configured, not yet started, data source
     */
    @Bean
    @ConfigurationProperties("spring.datasource.hikari")
    public HikariDataSource dataSource() {
        HikariDataSource dataSource = new HikariDataSource();

        // Basic settings.
        // NOTE(review): URL and credentials are sample placeholders; supply real values through
        // spring.datasource.hikari.* (or a secret store) instead of source code.
        dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/mydb");
        dataSource.setUsername("user");
        dataSource.setPassword("password");
        dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");

        // Pool sizing and timeouts (timeouts are in milliseconds).
        dataSource.setMinimumIdle(10);
        dataSource.setMaximumPoolSize(100);
        dataSource.setConnectionTimeout(30000);
        dataSource.setIdleTimeout(600000);
        dataSource.setMaxLifetime(1800000);

        // Connection validation. No connectionTestQuery is set: HikariCP recommends leaving it
        // unset for JDBC4 drivers so that validation goes through Connection.isValid().
        dataSource.setValidationTimeout(5000);

        // Driver-level tuning properties passed through to MySQL Connector/J.
        dataSource.addDataSourceProperty("cachePrepStmts", "true");
        dataSource.addDataSourceProperty("prepStmtCacheSize", "250");
        dataSource.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
        dataSource.addDataSourceProperty("useServerPrepStmts", "true");
        dataSource.addDataSourceProperty("useLocalSessionState", "true");
        dataSource.addDataSourceProperty("rewriteBatchedStatements", "true");
        dataSource.addDataSourceProperty("cacheResultSetMetadata", "true");
        dataSource.addDataSourceProperty("cacheServerConfiguration", "true");
        dataSource.addDataSourceProperty("elideSetAutoCommits", "true");
        dataSource.addDataSourceProperty("maintainTimeStats", "false");

        return dataSource;
    }
}
/**
 * Connection pool monitor: every 5 seconds exports HikariCP pool metrics and sends alerts
 * when threads queue up for connections or the pool is close to its size limit.
 */
@Component
public class ConnectionPoolMonitor {

    // NOTE(review): the final fields are presumably initialized by a constructor that is not
    // shown here (hand-written or generated) — confirm.
    private final HikariDataSource dataSource;
    private final MetricsExporter metrics;
    // NOTE(review): type name AlertService is assumed from the alertService.send(...) calls
    // below — confirm against the project.
    private final AlertService alertService;

    /**
     * Samples the pool, exports the four gauges and raises alerts.
     *
     * <p>Does nothing while the data source has no pool MXBean to report from.
     */
    @Scheduled(fixedRate = 5000)
    public void monitorPool() {
        HikariPoolMXBean poolMXBean = dataSource.getHikariPoolMXBean();
        if (poolMXBean == null) {
            return;
        }

        // Runtime counters.
        int activeConnections = poolMXBean.getActiveConnections();
        int idleConnections = poolMXBean.getIdleConnections();
        int totalConnections = poolMXBean.getTotalConnections();
        int threadsAwaiting = poolMXBean.getThreadsAwaitingConnection();

        // The size limit is a configuration value, not a runtime counter: HikariPoolMXBean does
        // not expose it, so it is read from the data source (which is a HikariConfig).
        int maximumPoolSize = dataSource.getMaximumPoolSize();

        // Export metrics.
        metrics.gauge("db.pool.active", activeConnections);
        metrics.gauge("db.pool.idle", idleConnections);
        metrics.gauge("db.pool.total", totalConnections);
        metrics.gauge("db.pool.waiting", threadsAwaiting);

        // Alert: more than 10 threads are blocked waiting for a connection.
        if (threadsAwaiting > 10) {
            alertService.send(new Alert(
                    "数据库连接池等待线程过多",
                    "当前等待线程数: " + threadsAwaiting
            ));
        }

        // Alert: more than 90% of the maximum pool size is in use.
        if (activeConnections > maximumPoolSize * 0.9) {
            alertService.send(new Alert(
                    "数据库连接池使用率过高",
                    "当前使用率: " + (activeConnections * 100.0 / maximumPoolSize) + "%"
            ));
        }
    }
}
线程池配置
// 线程池配置
@Configuration
public class ThreadPoolConfig {
// IO密集型任务线程池
@Bean("ioThreadPool")
public ThreadPoolTaskExecutor ioThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数 = CPU核心数 * 2
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
// 最大线程数 = CPU核心数 * 4
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 4);
// 队列容量
executor.setQueueCapacity(1000);
// 线程名前缀
executor.setThreadNamePrefix("io-thread-");
// 拒绝策略:调用者运行
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
// 空闲线程存活时间
executor.setKeepAliveSeconds(60);
executor.initialize();
return executor;
}
// CPU密集型任务线程池
@Bean("cpuThreadPool")
public ThreadPoolTaskExecutor cpuThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数 = CPU核心数 + 1
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1);
// 最大线程数 = CPU核心数 * 2
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
// 队列容量
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("cpu-thread-");
executor.setRejectedExecutionHandler(new CallerRunsPolicy());
executor.setKeepAliveSeconds(30);
executor.initialize();
return executor;
}
// 定时任务线程池
@Bean("scheduledThreadPool")
public ThreadPoolTaskScheduler scheduledThreadPool() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(10);
scheduler.setThreadNamePrefix("scheduled-");
scheduler.setErrorHandler(t ->
log.error("定时任务执行异常", t));
return scheduler;
}
}
/**
 * Thread pool monitor: every 5 seconds exports per-executor metrics and sends an alert when
 * an executor's active-thread utilization exceeds 80%.
 */
@Component
public class ThreadPoolMonitor {

    // NOTE(review): the final fields are presumably initialized by a constructor that is not
    // shown here (hand-written or generated) — confirm.
    // Executors keyed by the name used in metric keys and alert titles.
    private final Map<String, ThreadPoolTaskExecutor> executors;
    private final MetricsExporter metrics;
    // NOTE(review): type name AlertService is assumed from the alertService.send(...) call
    // below — confirm against the project.
    private final AlertService alertService;

    /**
     * Samples every registered executor, exports its gauges and raises a utilization alert.
     */
    @Scheduled(fixedRate = 5000)
    public void monitorThreadPools() {
        executors.forEach(this::monitorExecutor);
    }

    /** Exports metrics for one executor and alerts when utilization is above 80%. */
    private void monitorExecutor(String name, ThreadPoolTaskExecutor executor) {
        ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();

        // Runtime counters.
        int activeCount = threadPoolExecutor.getActiveCount();
        int poolSize = threadPoolExecutor.getPoolSize();
        int queueSize = threadPoolExecutor.getQueue().size();
        long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();

        // Export metrics.
        String prefix = "threadpool." + name;
        metrics.gauge(prefix + ".active", activeCount);
        metrics.gauge(prefix + ".pool_size", poolSize);
        metrics.gauge(prefix + ".queue_size", queueSize);
        metrics.gauge(prefix + ".completed", completedTaskCount);

        // Utilization is measured against the maximum pool size.
        // NOTE(review): a ThreadPoolExecutor only grows beyond its core size once its queue is
        // full, so this figure can stay low while the queue backs up; consider alerting on
        // queue_size as well.
        double utilization = (double) activeCount
                / threadPoolExecutor.getMaximumPoolSize() * 100;

        if (utilization > 80) {
            alertService.send(new Alert(
                    "线程池" + name + "使用率过高",
                    "当前使用率: " + utilization + "%"
            ));
        }
    }
}
异步化处理
/**
 * Asynchronous processing service: runs heavy operations off the calling thread and
 * schedules delayed or periodic tasks.
 */
@Service
public class AsyncProcessingService {

    private final TaskExecutor taskExecutor;
    private final TaskScheduler taskScheduler;

    /**
     * Processes a single request asynchronously on the {@code ioThreadPool} executor.
     *
     * @param request the request to process
     * @return a future completed with the result, or completed exceptionally with whatever
     *         exception the operation threw
     */
    @Async("ioThreadPool")
    public CompletableFuture<Result> processAsync(Request request) {
        final Result outcome;
        try {
            outcome = performHeavyOperation(request);
        } catch (Exception failure) {
            return CompletableFuture.failedFuture(failure);
        }
        return CompletableFuture.completedFuture(outcome);
    }

    /**
     * Submits every request to {@code taskExecutor} and returns the pending futures in the
     * same order as the input.
     *
     * @param requests the requests to process
     * @return one future per request
     */
    public List<CompletableFuture<Result>> processBatch(List<Request> requests) {
        return requests.stream()
                .map(this::submitHeavyOperation)
                .collect(Collectors.toList());
    }

    /** Starts the heavy operation for one request on {@code taskExecutor}. */
    private CompletableFuture<Result> submitHeavyOperation(Request request) {
        return CompletableFuture.supplyAsync(
                () -> performHeavyOperation(request),
                taskExecutor);
    }

    /**
     * Schedules a task to run once after the given delay.
     *
     * @param task  the task to run
     * @param delay how long to wait, counted from now
     */
    public void scheduleDelayed(Runnable task, Duration delay) {
        Instant startAt = Instant.now().plus(delay);
        taskScheduler.schedule(task, startAt);
    }

    /**
     * Schedules a task to run repeatedly at a fixed rate, starting now.
     *
     * @param task     the task to run
     * @param interval the period between successive starts
     */
    public void scheduleAtFixedRate(Runnable task, Duration interval) {
        Instant firstRun = Instant.now();
        taskScheduler.scheduleAtFixedRate(task, firstRun, interval);
    }
}
/**
 * Event-driven asynchronous processing: reacts to order events off the publishing thread
 * and offers a CompletableFuture-based processing pipeline.
 */
@Component
public class EventDrivenProcessor {

    private final ApplicationEventPublisher eventPublisher;

    /**
     * Handles an order-created event asynchronously: sends the notification, updates the
     * inventory and generates the invoice, in that order.
     *
     * <p>The executor is named explicitly, matching {@code AsyncProcessingService}. An
     * unqualified {@code @Async} resolves its executor by looking for a unique
     * {@code TaskExecutor} bean or a bean named {@code taskExecutor}; the thread pool
     * configuration in this file defines several executors under other names, so the
     * unqualified form would not run on any of the configured pools.
     *
     * @param event the order-created event
     */
    @Async("ioThreadPool")
    @EventListener
    public void handleOrderCreated(OrderCreatedEvent event) {
        sendNotification(event.getOrder());
        updateInventory(event.getOrder());
        generateInvoice(event.getOrder());
    }

    /**
     * Processes a request as a chain of asynchronous stages: validate, process, transform,
     * save; any failure along the chain is mapped to a result by {@code handleError}.
     *
     * <p>NOTE(review): the stages are started without an explicit executor, so they run on
     * the JDK's default async pool ({@code ForkJoinPool.commonPool()}). If any stage blocks
     * (the save step presumably does), pass a dedicated executor — confirm.
     *
     * @param request the request to process
     * @return a future holding the saved result or the error-handler's fallback
     */
    public CompletableFuture<Result> processWithFutures(Request request) {
        return CompletableFuture
                .supplyAsync(() -> validateInput(request))
                .thenApplyAsync(input -> processData(input))
                .thenApplyAsync(data -> transformData(data))
                .thenComposeAsync(data -> saveToDatabase(data))
                .exceptionally(ex -> handleError(ex));
    }
}
并发控制
/**
 * Distributed lock helper built on Redisson: runs a task while holding a named lock.
 */
@Component
public class DistributedLockManager {

    private final RedissonClient redisson;

    /**
     * Runs {@code task} while holding the lock identified by {@code lockKey}.
     *
     * @param lockKey name of the lock
     * @param timeout maximum time to wait for the lock
     * @param task    work to run under the lock
     * @param <T>     result type of the task
     * @return the value produced by {@code task}
     * @throws LockAcquisitionException if the lock is not obtained within {@code timeout},
     *         or the thread is interrupted while waiting (the interrupt flag is restored)
     */
    public <T> T executeWithLock(String lockKey, Duration timeout,
                                 Supplier<T> task) {
        RLock lock = redisson.getLock(lockKey);

        // Step 1: try to obtain the lock within the timeout.
        boolean acquired;
        try {
            acquired = lock.tryLock(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new LockAcquisitionException("获取锁被中断: " + lockKey, e);
        }

        if (!acquired) {
            throw new LockAcquisitionException("获取锁超时: " + lockKey);
        }

        // Step 2: run the task and always release the lock afterwards.
        try {
            return task.get();
        } finally {
            lock.unlock();
        }
    }
}
/**
 * Limits both the request rate (rate limiter) and the number of tasks running at the same
 * time (semaphore).
 */
@Component
public class ConcurrencyLimiter {

    /** Default number of tasks allowed to run concurrently. */
    private static final int DEFAULT_MAX_CONCURRENCY = 100;
    /** Default rate limit in permits per second (QPS). */
    private static final double DEFAULT_PERMITS_PER_SECOND = 1000;
    /** How long a caller waits for a rate-limiter permit or a semaphore slot, in milliseconds. */
    private static final long ACQUIRE_TIMEOUT_MILLIS = 100;

    private final Semaphore semaphore;
    private final RateLimiter rateLimiter;

    /**
     * Creates a limiter with the default limits: 100 concurrent tasks and 1000 QPS.
     *
     * <p>This no-arg constructor stays the one Spring uses for the component, because no
     * constructor is annotated for autowiring.
     */
    public ConcurrencyLimiter() {
        this(DEFAULT_MAX_CONCURRENCY, DEFAULT_PERMITS_PER_SECOND);
    }

    /**
     * Creates a limiter with explicit limits.
     *
     * @param maxConcurrency   maximum number of tasks running at the same time; must be positive
     * @param permitsPerSecond rate limit in permits per second; must be positive
     * @throws IllegalArgumentException if either limit is not positive
     */
    public ConcurrencyLimiter(int maxConcurrency, double permitsPerSecond) {
        if (maxConcurrency <= 0) {
            throw new IllegalArgumentException(
                    "maxConcurrency must be positive: " + maxConcurrency);
        }
        if (!(permitsPerSecond > 0)) {
            throw new IllegalArgumentException(
                    "permitsPerSecond must be positive: " + permitsPerSecond);
        }
        this.semaphore = new Semaphore(maxConcurrency);
        this.rateLimiter = RateLimiter.create(permitsPerSecond);
    }

    /**
     * Runs {@code task} if both the rate limit and the concurrency limit allow it.
     *
     * @param request the request being processed (not used by the limiter itself)
     * @param task    the work to run
     * @param <T>     result type of the task
     * @return the value produced by {@code task}
     * @throws RateLimitExceededException if no rate-limiter permit is available within 100 ms
     * @throws ConcurrencyLimitExceededException if no concurrency slot frees up within 100 ms,
     *         or the thread is interrupted while waiting (the interrupt flag is restored)
     */
    public <T> T execute(Request request, Supplier<T> task) {
        // Rate limit check.
        if (!rateLimiter.tryAcquire(ACQUIRE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
            throw new RateLimitExceededException("请求过于频繁");
        }

        // Concurrency control.
        boolean slotAcquired;
        try {
            slotAcquired = semaphore.tryAcquire(ACQUIRE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConcurrencyLimitExceededException("等待中断", e);
        }
        if (!slotAcquired) {
            throw new ConcurrencyLimitExceededException("并发数已满");
        }

        try {
            return task.get();
        } finally {
            // Always hand the slot back, even when the task throws.
            semaphore.release();
        }
    }
}
高并发架构通过连接池优化、线程池配置、异步化处理和并发控制,确保系统在高负载下的稳定性和性能。