← 返回首页
💼

IM架构:消息投递、已读回执与群聊

📂 architecture ⏱ 3 min 508 words

IM架构:消息投递、已读回执与群聊

IM系统架构概览

IM系统是实时性要求最高的系统之一,核心挑战是保证消息的可靠投递、有序性和实时性。主要模块包括:连接管理、消息收发、消息存储、群聊系统、已读回执。

IM系统架构:

接入层:长连接网关(WebSocket/TCP)、负载均衡
连接层:连接管理、心跳检测、断线重连
业务层:
  - 消息服务:消息收发、存储、同步
  - 群组服务:群管理、成员管理、消息分发
  - 已读回执:已读状态同步、未读计数
  - 推送服务:离线推送、消息提醒
数据层:MySQL、Redis、HBase、MongoDB

消息投递流程

IM系统的核心是消息投递,需要保证消息不丢失、不重复、有序。典型的流程是:发送方→服务端→接收方。

// 消息发送服务
@Service
public class MessageService {
    @Autowired
    private MessageRepository messageRepo;
    @Autowired
    private ConnectionManager connectionManager;
    @Autowired
    private OfflineMessageService offlineService;
    
    public SendMessageResult sendMessage(SendMessageRequest request) {
        // 1. 生成消息ID(全局唯一)
        String messageId = generateMessageId();
        
        // 2. 保存消息到存储
        Message message = Message.builder()
            .messageId(messageId)
            .senderId(request.getSenderId())
            .receiverId(request.getReceiverId())
            .content(request.getContent())
            .timestamp(System.currentTimeMillis())
            .build();
        
        messageRepo.save(message);
        
        // 3. 尝试投递
        boolean delivered = connectionManager.deliver(request.getReceiverId(), message);
        
        if (!delivered) {
            // 4. 投递失败,保存离线消息
            offlineService.saveOfflineMessage(request.getReceiverId(), message);
        }
        
        // 5. 返回发送结果
        return SendMessageResult.success(messageId, delivered);
    }
}

// 连接管理器
@Component
public class ConnectionManager {
    private final Map<Long, Channel> connections = new ConcurrentHashMap<>();
    
    public boolean deliver(Long userId, Message message) {
        Channel channel = connections.get(userId);
        if (channel != null && channel.isActive()) {
            try {
                channel.writeAndFlush(message);
                return true;
            } catch (Exception e) {
                log.error("Deliver message failed", e);
                return false;
            }
        }
        return false;
    }
    
    public void addConnection(Long userId, Channel channel) {
        Channel oldChannel = connections.put(userId, channel);
        if (oldChannel != null) {
            oldChannel.close();
        }
    }
    
    public void removeConnection(Long userId) {
        connections.remove(userId);
    }
}

消息同步机制

用户在多设备登录时,需要保证消息同步。常用方案是基于同步点(sync_key)的增量同步。

# 消息同步服务
class MessageSyncService:
    def __init__(self):
        self.mysql = MySQLStorage()
        self.redis = RedisCache()
    
    def get_latest_messages(self, user_id, sync_key):
        # 获取sync_key之后的所有消息
        messages = self.mysql.get_messages_after(user_id, sync_key)
        
        # 更新sync_key
        if messages:
            new_sync_key = messages[-1].sequence
            self.update_sync_key(user_id, new_sync_key)
        
        return messages
    
    def sync_messages(self, user_id, device_id, last_sync_key):
        # 1. 获取未同步消息
        messages = self.get_latest_messages(user_id, last_sync_key)
        
        # 2. 获取未读计数
        unread_counts = self.get_unread_counts(user_id)
        
        # 3. 构建同步响应
        return SyncResponse(
            messages=messages,
            unread_counts=unread_counts,
            has_more=len(messages) >= 100
        )
    
    def update_sync_key(self, user_id, sync_key):
        self.redis.set(f"sync_key:{user_id}", sync_key)

已读回执设计

已读回执用于同步消息的已读状态,支持已读未读计数、批量已读等功能。

// 已读回执服务
@Service
public class ReadReceiptService {
    @Autowired
    private ReadReceiptRepository receiptRepo;
    @Autowired
    private UnreadCountService unreadService;
    
    // 标记单条消息已读
    public void markAsRead(Long userId, String messageId) {
        // 1. 保存已读状态
        ReadReceipt receipt = ReadReceipt.builder()
            .userId(userId)
            .messageId(messageId)
            .readTime(System.currentTimeMillis())
            .build();
        
        receiptRepo.save(receipt);
        
        // 2. 更新未读计数
        unreadService.decrementUnread(userId, getMessageSenderId(messageId));
    }
    
    // 批量标记已读(同步到某条消息)
    public void markAsReadUpTo(Long userId, Long peerId, String messageId) {
        // 1. 获取该会话的所有未读消息
        List<Message> unreadMessages = getUnreadMessages(userId, peerId);
        
        // 2. 批量保存已读状态
        List<ReadReceipt> receipts = unreadMessages.stream()
            .map(msg -> ReadReceipt.builder()
                .userId(userId)
                .messageId(msg.getId())
                .readTime(System.currentTimeMillis())
                .build())
            .collect(Collectors.toList());
        
        receiptRepo.batchSave(receipts);
        
        // 3. 更新未读计数为0
        unreadService.resetUnread(userId, peerId);
    }
    
    // 获取已读状态
    public ReadStatus getMessageReadStatus(String messageId) {
        int readCount = receiptRepo.getReadCount(messageId);
        List<Long> readUsers = receiptRepo.getReadUsers(messageId);
        return new ReadStatus(readCount, readUsers);
    }
}

群聊系统设计

群聊是IM系统的重要功能,需要处理消息广播、群成员管理、群消息同步等问题。

# 群聊消息分发
class GroupMessageService:
    def __init__(self):
        self.group_repo = GroupRepository()
        self.connection_manager = ConnectionManager()
        self.offline_service = OfflineMessageService()
    
    def send_group_message(self, group_id, sender_id, content):
        # 1. 验证发送者是否在群中
        if not self.group_repo.is_member(group_id, sender_id):
            raise PermissionError("Not a group member")
        
        # 2. 生成消息
        message = Message(
            group_id=group_id,
            sender_id=sender_id,
            content=content,
            timestamp=time.time()
        )
        
        # 3. 保存消息
        self.group_repo.save_message(message)
        
        # 4. 分发给在线成员
        members = self.group_repo.get_members(group_id)
        delivered = []
        offline = []
        
        for member_id in members:
            if member_id == sender_id:
                continue
            
            if self.connection_manager.deliver(member_id, message):
                delivered.append(member_id)
            else:
                offline.append(member_id)
        
        # 5. 离线消息推送
        for member_id in offline:
            self.offline_service.save_offline_message(member_id, message)
        
        return {
            "message_id": message.id,
            "delivered_count": len(delivered),
            "offline_count": len(offline)
        }

消息存储设计

IM系统需要高效存储海量消息,同时支持历史消息查询。常用方案是MySQL+HBase的组合存储。

// 消息存储服务
@Service
public class MessageStorageService {
    @Autowired
    private MySQLMessageStore mysqlStore;
    @Autowired
    private HBaseMessageStore hbaseStore;
    
    // 存储消息
    public void storeMessage(Message message) {
        // 1. 写入MySQL(近期消息)
        mysqlStore.save(message);
        
        // 2. 异步写入HBase(历史消息)
        asyncWriteToHBase(message);
        
        // 3. 设置MySQL消息过期清理任务
        scheduleCleanup(message);
    }
    
    // 查询历史消息
    public List<Message> getHistoryMessages(Long userId, Long peerId, 
                                           int limit, String cursor) {
        // 1. 优先从MySQL查询(近7天)
        List<Message> messages = mysqlStore.getMessages(userId, peerId, limit, cursor);
        
        if (messages.size() < limit) {
            // 2. MySQL数据不足,从HBase查询
            List<Message> hbaseMessages = hbaseStore.getMessages(
                userId, peerId, limit - messages.size(), cursor);
            messages.addAll(hbaseMessages);
        }
        
        return messages;
    }
}