← 返回首页
🔌

WebSocket架构

📂 architecture ⏱ 2 min 207 words

WebSocket架构

WebSocket连接管理

通过 STOMP 协议实现消息的订阅与发布，支持主题广播（/topic）和点对点通信（/user/queue）。

/**
 * STOMP-over-WebSocket messaging configuration.
 *
 * <p>Enables the simple broker for the {@code /topic} and {@code /queue} destination
 * prefixes, sets {@code /app} as the application destination prefix and {@code /user}
 * as the user destination prefix, and registers a SockJS-enabled endpoint at
 * {@code /ws}.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    /** Destination prefixes handled by the simple broker. */
    private static final String[] BROKER_DESTINATION_PREFIXES = {"/topic", "/queue"};

    /** Prefix of destinations that are dispatched to {@code @MessageMapping} methods. */
    private static final String APPLICATION_DESTINATION_PREFIX = "/app";

    /** Prefix used for per-user destinations. */
    private static final String USER_DESTINATION_PREFIX = "/user";

    /** Path of the STOMP handshake endpoint. */
    private static final String HANDSHAKE_ENDPOINT = "/ws";

    // NOTE(review): "*" matches every origin; confirm this is intended outside development.
    private static final String ALLOWED_ORIGIN_PATTERN = "*";

    /**
     * Configures broker-side and application-side destination prefixes.
     *
     * @param config registry supplied by the framework
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker(BROKER_DESTINATION_PREFIXES);
        config.setApplicationDestinationPrefixes(APPLICATION_DESTINATION_PREFIX);
        config.setUserDestinationPrefix(USER_DESTINATION_PREFIX);
    }

    /**
     * Registers the STOMP endpoint clients connect to, with SockJS fallback enabled.
     *
     * @param registry endpoint registry supplied by the framework
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint(HANDSHAKE_ENDPOINT)
            .setAllowedOriginPatterns(ALLOWED_ORIGIN_PATTERN)
            .withSockJS();
    }
}
/**
 * STOMP message handlers for chat traffic: a broadcast handler and a
 * point-to-point handler.
 */
@Controller
public class ChatController {

    // Required by sendPrivateMessage; without this declaration the class does not compile.
    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Handles messages sent to {@code /chat.send}: stamps the message with the current
     * instant and returns it so it is published to {@code /topic/messages}.
     *
     * @param message the incoming chat message
     * @return the same message instance with its timestamp set
     */
    @MessageMapping("/chat.send")
    @SendTo("/topic/messages")
    public ChatMessage sendMessage(ChatMessage message) {
        message.setTimestamp(Instant.now());
        return message;
    }

    /**
     * Handles messages sent to {@code /chat.private}: forwards the message to the
     * {@code /queue/private} user destination of {@code message.getRecipient()}.
     *
     * <p>NOTE(review): the recipient is taken from the client-supplied payload and the
     * sender is not verified here; confirm authorization is enforced elsewhere.
     *
     * @param message        the incoming chat message; its recipient selects the target user
     * @param headerAccessor message headers; currently unused, kept so the handler
     *                       signature stays unchanged
     */
    @MessageMapping("/chat.private")
    public void sendPrivateMessage(ChatMessage message,
                                   SimpMessageHeaderAccessor headerAccessor) {
        messagingTemplate.convertAndSendToUser(
            message.getRecipient(), "/queue/private", message);
    }
}

心跳机制

客户端每 30 秒发送一次心跳包；服务端每 30 秒检查一次，对超过 60 秒未收到心跳的连接予以断开。

/**
 * Tracks client heartbeats and closes sessions whose heartbeat has gone stale.
 *
 * <p>NOTE(review): {@code sessions} is not declared in this snippet. The code below
 * assumes it is a {@code java.util.Map} keyed by the same identifier that
 * {@code Principal.getName()} yields, whose values expose {@code getLastHeartbeat()},
 * {@code setLastHeartbeat(long)} and {@code close(CloseStatus)} — confirm against the
 * real declaration. Because it is touched from both the scheduler thread and message
 * handling threads, it presumably needs to be a concurrent map.
 */
@Component
public class WebSocketHeartbeatHandler {

    /** A session is considered dead when no heartbeat arrived within this many milliseconds. */
    private static final long HEARTBEAT_TIMEOUT_MILLIS = 60000;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Runs every 30 seconds; closes and forgets every tracked session whose last
     * heartbeat is older than {@link #HEARTBEAT_TIMEOUT_MILLIS}.
     */
    @Scheduled(fixedRate = 30000)
    public void checkConnections() {
        final long now = System.currentTimeMillis();
        // removeIf deletes through the entry-set view, so the map is never structurally
        // modified behind the back of its own iteration (the previous forEach + remove
        // combination could fail with ConcurrentModificationException).
        sessions.entrySet().removeIf(entry -> {
            if (now - entry.getValue().getLastHeartbeat() <= HEARTBEAT_TIMEOUT_MILLIS) {
                return false;
            }
            entry.getValue().close(CloseStatus.GOING_AWAY);
            return true;
        });
    }

    /**
     * Handles messages sent to {@code /heartbeat}: refreshes the last-heartbeat time of
     * the session tracked under the sender's principal name, if there is one.
     *
     * @param principal the authenticated sender; ignored when {@code null}
     */
    @MessageMapping("/heartbeat")
    public void handleHeartbeat(Principal principal) {
        if (principal == null) {
            // No authenticated user is attached to the message, so there is no key to look up.
            return;
        }
        // Principal already exposes getName(); casting to SimpUser (which is not a
        // Principal subtype) throws ClassCastException for ordinary principals.
        // NOTE(review): this is the user name, not a WebSocket session id — verify it
        // matches the keys used when sessions are registered.
        sessions.computeIfPresent(principal.getName(), (key, session) -> {
            session.setLastHeartbeat(System.currentTimeMillis());
            return session;
        });
    }
}

集群方案

使用Redis Pub/Sub实现WebSocket集群消息同步。

/**
 * Redis wiring for cluster-wide WebSocket message distribution.
 */
@Configuration
public class RedisClusterConfig {

    /** Name of the Redis channel the cluster listener is subscribed to. */
    private static final String CLUSTER_CHANNEL = "ws-cluster";

    /**
     * Builds the listener container that subscribes {@code listener} to the
     * {@code ws-cluster} channel.
     *
     * @param connectionFactory Redis connection factory used by the container
     * @param listener          listener invoked for messages on the cluster channel
     * @return the configured listener container
     */
    @Bean
    public RedisMessageListenerContainer messageListenerContainer(
            RedisConnectionFactory connectionFactory,
            WebSocketClusterListener listener) {
        final ChannelTopic clusterTopic = new ChannelTopic(CLUSTER_CHANNEL);
        final RedisMessageListenerContainer listenerContainer =
            new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        listenerContainer.addMessageListener(listener, clusterTopic);
        return listenerContainer;
    }
}

/**
 * Publishes WebSocket messages to the Redis channel shared by the cluster.
 */
@Service
public class WebSocketClusterService {

    /** Name of the Redis channel messages are published to. */
    private static final String CLUSTER_CHANNEL = "ws-cluster";

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Publishes {@code message} on the {@code ws-cluster} channel.
     *
     * @param message payload to publish
     */
    public void broadcastToCluster(String message) {
        this.redisTemplate.convertAndSend(CLUSTER_CHANNEL, message);
    }
}

客户端实现

// Browser client: connects to the /ws endpoint over SockJS, subscribes to the public
// topic and the private user queue, and sends a heartbeat every 30 seconds.
const HEARTBEAT_INTERVAL_MS = 30000;

const socket = new SockJS('/ws');
const stompClient = Stomp.over(socket);

// Handle of the heartbeat timer, kept so it can be cancelled when the connection fails.
let heartbeatTimer = null;

stompClient.connect({}, (frame) => {
    // Subscribe to the public topic
    stompClient.subscribe('/topic/messages', (message) => {
        showMessage(JSON.parse(message.body));
    });

    // Subscribe to the private queue
    stompClient.subscribe('/user/queue/private', (message) => {
        showPrivateMessage(JSON.parse(message.body));
    });

    // Send heartbeats
    heartbeatTimer = setInterval(() => {
        stompClient.send("/app/heartbeat", {}, JSON.stringify({}));
    }, HEARTBEAT_INTERVAL_MS);
}, (error) => {
    // Stop the heartbeat timer; otherwise it keeps firing and calling send()
    // on a connection that is no longer usable.
    clearInterval(heartbeatTimer);
    heartbeatTimer = null;
    console.error('STOMP connection error:', error);
});