WebSocket Architecture
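WebSocket Connection Management
Message subscription and publishing run over the STOMP protocol, which supports both topic broadcast and point-to-point messaging.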
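import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // In-memory broker for broadcast (/topic) and per-user (/queue) destinations.
        config.enableSimpleBroker("/topic", "/queue");
        // Messages sent to /app/** are routed to @MessageMapping methods.
        config.setApplicationDestinationPrefixes("/app");
        // Prefix for user-specific destinations (point-to-point delivery).
        config.setUserDestinationPrefix("/user");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // "*" accepts every origin; restrict this pattern in production.
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}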
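The three prefixes configured above decide where an incoming destination ends up. The following is a minimal sketch of that decision as a simplified model; it is not Spring's actual routing code, and `DestinationRoutingSketch`, `Route`, `classify`, and `handlerMapping` are hypothetical names introduced only for illustration.

```java
import java.util.List;

// Simplified model of prefix-based routing; names here are illustrative assumptions.
final class DestinationRoutingSketch {

    enum Route { APPLICATION_HANDLER, USER_DESTINATION, SIMPLE_BROKER, UNROUTED }

    // Values mirror the configuration in WebSocketConfig.
    private static final String APP_PREFIX = "/app";
    private static final String USER_PREFIX = "/user";
    private static final List<String> BROKER_PREFIXES = List.of("/topic", "/queue");

    // Decides which component would handle a message sent to the given destination.
    static Route classify(String destination) {
        if (destination.startsWith(APP_PREFIX + "/")) {
            return Route.APPLICATION_HANDLER;
        }
        if (destination.startsWith(USER_PREFIX + "/")) {
            return Route.USER_DESTINATION;
        }
        for (String prefix : BROKER_PREFIXES) {
            if (destination.startsWith(prefix + "/")) {
                return Route.SIMPLE_BROKER;
            }
        }
        return Route.UNROUTED;
    }

    // Strips the application prefix, leaving the value a @MessageMapping is matched against.
    static String handlerMapping(String destination) {
        if (classify(destination) != Route.APPLICATION_HANDLER) {
            throw new IllegalArgumentException("Not an application destination: " + destination);
        }
        return destination.substring(APP_PREFIX.length());
    }
}
```

Under this model, a client message sent to `/app/chat.send` is matched against the mapping `/chat.send`, while `/topic/messages` goes straight to the simple broker.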
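import java.time.Instant;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

// ChatMessage is the application's message DTO (not shown here).
@Controller
public class ChatController {
    private final SimpMessagingTemplate messagingTemplate;

    public ChatController(SimpMessagingTemplate messagingTemplate) {
        this.messagingTemplate = messagingTemplate;
    }

    // Broadcast: messages sent to /app/chat.send are relayed to all /topic/messages subscribers.
    @MessageMapping("/chat.send")
    @SendTo("/topic/messages")
    public ChatMessage sendMessage(ChatMessage message) {
        message.setTimestamp(Instant.now());
        return message;
    }

    // Point-to-point: delivered only to the recipient's /user/queue/private subscription.
    @MessageMapping("/chat.private")
    public void sendPrivateMessage(ChatMessage message) {
        messagingTemplate.convertAndSendToUser(
                message.getRecipient(), "/queue/private", message);
    }
}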
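For point-to-point delivery it helps to see which destination a user-targeted send is actually addressed to. The sketch below models that composition under the `/user` prefix configured earlier, assuming the recipient value is the name of the authenticated `Principal`. `UserDestinationSketch` and `forUser` are hypothetical names, and the per-session resolution that Spring performs afterwards is deliberately left out.

```java
// Simplified model of composing a user-targeted destination; illustrative only.
final class UserDestinationSketch {

    // Mirrors setUserDestinationPrefix("/user") from the configuration.
    private static final String USER_PREFIX = "/user";

    // Builds the destination for one named user, e.g. /user/alice/queue/private.
    static String forUser(String userName, String destination) {
        if (userName == null || userName.isEmpty()) {
            throw new IllegalArgumentException("User name must not be empty");
        }
        // A slash inside the user name would be mistaken for a path separator.
        String escapedUser = userName.replace("/", "%2F");
        String suffix = destination.startsWith("/") ? destination : "/" + destination;
        return USER_PREFIX + "/" + escapedUser + suffix;
    }
}
```

The client, by contrast, subscribes to `/user/queue/private` without a name; the server maps that subscription to the connected user, so each client only receives its own messages.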
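Heartbeat Mechanism
The client sends a heartbeat message at a fixed interval; the server tracks the last heartbeat per session and closes any connection that has timed out.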
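import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import org.springframework.web.socket.CloseStatus;

// Declared as @Controller so that Spring detects the @MessageMapping method.
// @Scheduled only fires if @EnableScheduling is present on a configuration class.
@Controller
public class WebSocketHeartbeatHandler {
    // Assumption: TrackedSession is a hypothetical application-defined wrapper exposing
    // getLastHeartbeat(), setLastHeartbeat(long) and close(CloseStatus); registering
    // sessions in this map on connect is not shown here.
    private final Map<String, TrackedSession> sessions = new ConcurrentHashMap<>();

    @Scheduled(fixedRate = 30000)
    public void checkConnections() {
        long now = System.currentTimeMillis();
        sessions.entrySet().removeIf(entry -> {
            if (now - entry.getValue().getLastHeartbeat() <= 60000) {
                return false;
            }
            try {
                entry.getValue().close(CloseStatus.GOING_AWAY);
            } catch (Exception e) {
                // The socket may already be gone; the entry is dropped either way.
            }
            return true;
        });
    }

    @MessageMapping("/heartbeat")
    public void handleHeartbeat(SimpMessageHeaderAccessor headerAccessor) {
        // Look up by STOMP session id; Principal.getName() would yield the user name instead.
        TrackedSession session = sessions.get(headerAccessor.getSessionId());
        if (session != null) {
            session.setLastHeartbeat(System.currentTimeMillis());
        }
    }
}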
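The timeout logic can be isolated from Spring and tested on its own. The following is a minimal sketch of that idea, assuming sessions are identified by a string id and that the clock is injected so tests can control time. `HeartbeatRegistrySketch` and its methods are hypothetical names, not part of the handler above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Tracks the last heartbeat per session and reports which sessions have timed out.
final class HeartbeatRegistrySketch {

    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
    private final LongSupplier clock;      // e.g. System::currentTimeMillis in production
    private final long timeoutMillis;

    HeartbeatRegistrySketch(LongSupplier clock, long timeoutMillis) {
        this.clock = clock;
        this.timeoutMillis = timeoutMillis;
    }

    // Called when a session connects; the connection itself counts as a first heartbeat.
    void register(String sessionId) {
        lastHeartbeat.put(sessionId, clock.getAsLong());
    }

    // Called for each heartbeat; returns false if the session is unknown or already evicted.
    boolean touch(String sessionId) {
        return lastHeartbeat.computeIfPresent(sessionId, (id, previous) -> clock.getAsLong()) != null;
    }

    // Removes every session silent for longer than the timeout and returns their ids,
    // so the caller can close the matching connections.
    List<String> evictExpired() {
        long now = clock.getAsLong();
        List<String> expired = new ArrayList<>();
        lastHeartbeat.forEach((id, seen) -> {
            // remove(key, value) only succeeds if no heartbeat arrived in the meantime.
            if (now - seen > timeoutMillis && lastHeartbeat.remove(id, seen)) {
                expired.add(id);
            }
        });
        return expired;
    }
}
```

With a 30-second client interval and a 60-second timeout, as in this section, a session survives one missed heartbeat and is evicted after the second.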
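Clustering
Redis Pub/Sub synchronizes WebSocket messages across the nodes of a cluster, so that a message published on one node reaches clients connected to any other node.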
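import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisClusterConfig {
    // WebSocketClusterListener is the application's MessageListener implementation (not shown here).
    @Bean
    public RedisMessageListenerContainer messageListenerContainer(
            RedisConnectionFactory connectionFactory,
            WebSocketClusterListener listener) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Every node subscribes to the same channel.
        container.addMessageListener(listener, new ChannelTopic("ws-cluster"));
        return container;
    }
}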
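import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class WebSocketClusterService {
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    // Publishes to the channel that every node listens on, including this one.
    public void broadcastToCluster(String message) {
        redisTemplate.convertAndSend("ws-cluster", message);
    }
}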
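The fan-out pattern behind this setup can be sketched without Redis. In the example below an in-memory `Channel` stands in for the `ws-cluster` Pub/Sub channel, and each `Node` plays the role of one application instance whose listener forwards cluster messages to its locally connected clients. All class and method names are hypothetical; a real listener would more likely forward the payload through `SimpMessagingTemplate`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// In-memory model of cluster fan-out; a stand-in for Redis Pub/Sub, not a replacement.
final class ClusterFanOutSketch {

    // Delivers each published message to every subscriber, like a Pub/Sub channel.
    static final class Channel {
        private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            subscribers.forEach(subscriber -> subscriber.accept(message));
        }
    }

    // One application instance with its own set of locally connected clients.
    static final class Node {
        private final String name;
        private final Channel channel;
        final List<String> deliveredToLocalClients = new ArrayList<>();

        Node(String name, Channel channel) {
            this.name = name;
            this.channel = channel;
            channel.subscribe(this::onClusterMessage);
        }

        // Never delivers locally first: the publishing node receives its own
        // message through the channel, which avoids duplicate delivery.
        void broadcastToCluster(String message) {
            channel.publish(message);
        }

        private void onClusterMessage(String message) {
            deliveredToLocalClients.add(name + " <- " + message);
        }
    }
}
```

Routing every broadcast through the channel, including on the originating node, keeps a single delivery path and means no node needs to know which clients are connected elsewhere.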
Client Implementation
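// Assumes the SockJS and STOMP client libraries are already loaded on the page;
// showMessage and showPrivateMessage are application-defined callbacks.
const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);

stompClient.connect({}, (frame) => {
    // Subscribe to the public topic
    stompClient.subscribe('/topic/messages', (message) => {
        showMessage(JSON.parse(message.body));
    });

    // Subscribe to this user's private queue
    stompClient.subscribe('/user/queue/private', (message) => {
        showPrivateMessage(JSON.parse(message.body));
    });

    // Send a heartbeat every 30 seconds
    setInterval(() => {
        stompClient.send("/app/heartbeat", {}, JSON.stringify({}));
    }, 30000);
});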
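For reference, the client's `subscribe` and `send` calls are transmitted as STOMP frames: a command line, header lines, a blank line, an optional body, and a terminating NUL byte. The sketch below hand-encodes two such frames to show the layout. `StompFrameSketch` and its methods are hypothetical names, header escaping is omitted, and in practice the client library and Spring encode frames for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: hand-encodes STOMP frames to show the wire layout.
final class StompFrameSketch {

    // Produces COMMAND, one "name:value" line per header, a blank line, the body, then NUL.
    static String encode(String command, Map<String, String> headers, String body) {
        StringBuilder frame = new StringBuilder(command).append('\n');
        for (Map.Entry<String, String> header : headers.entrySet()) {
            frame.append(header.getKey()).append(':').append(header.getValue()).append('\n');
        }
        frame.append('\n');   // blank line separates headers from body
        frame.append(body);
        frame.append('\0');   // NUL byte terminates the frame
        return frame.toString();
    }

    // SEND frame, as produced by a client-side send to an application destination.
    static String sendFrame(String destination, String jsonBody) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("destination", destination);
        headers.put("content-type", "application/json");
        return encode("SEND", headers, jsonBody);
    }

    // SUBSCRIBE frame; the id lets the client match incoming MESSAGE frames to a subscription.
    static String subscribeFrame(String subscriptionId, String destination) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("id", subscriptionId);
        headers.put("destination", destination);
        return encode("SUBSCRIBE", headers, "");
    }
}
```

Seen this way, the heartbeat sent to `/app/heartbeat` is an ordinary SEND frame with an empty JSON object as its body.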