IM架构:消息投递、已读回执与群聊
IM架构:消息投递、已读回执与群聊
IM系统架构概览
IM系统是实时性要求最高的系统之一,核心挑战是保证消息的可靠投递、有序性和实时性。主要模块包括:连接管理、消息收发、消息存储、群聊系统、已读回执。
IM系统架构:
接入层:长连接网关(WebSocket/TCP)、负载均衡
连接层:连接管理、心跳检测、断线重连
业务层:
- 消息服务:消息收发、存储、同步
- 群组服务:群管理、成员管理、消息分发
- 已读回执:已读状态同步、未读计数
- 推送服务:离线推送、消息提醒
数据层:MySQL、Redis、HBase、MongoDB
消息投递流程
IM系统的核心是消息投递,需要保证消息不丢失、不重复、有序。典型的流程是:发送方→服务端→接收方。
// 消息发送服务
@Service
public class MessageService {
@Autowired
private MessageRepository messageRepo;
@Autowired
private ConnectionManager connectionManager;
@Autowired
private OfflineMessageService offlineService;
public SendMessageResult sendMessage(SendMessageRequest request) {
// 1. 生成消息ID(全局唯一)
String messageId = generateMessageId();
// 2. 保存消息到存储
Message message = Message.builder()
.messageId(messageId)
.senderId(request.getSenderId())
.receiverId(request.getReceiverId())
.content(request.getContent())
.timestamp(System.currentTimeMillis())
.build();
messageRepo.save(message);
// 3. 尝试投递
boolean delivered = connectionManager.deliver(request.getReceiverId(), message);
if (!delivered) {
// 4. 投递失败,保存离线消息
offlineService.saveOfflineMessage(request.getReceiverId(), message);
}
// 5. 返回发送结果
return SendMessageResult.success(messageId, delivered);
}
}
// 连接管理器
@Component
public class ConnectionManager {
private final Map<Long, Channel> connections = new ConcurrentHashMap<>();
public boolean deliver(Long userId, Message message) {
Channel channel = connections.get(userId);
if (channel != null && channel.isActive()) {
try {
channel.writeAndFlush(message);
return true;
} catch (Exception e) {
log.error("Deliver message failed", e);
return false;
}
}
return false;
}
public void addConnection(Long userId, Channel channel) {
Channel oldChannel = connections.put(userId, channel);
if (oldChannel != null) {
oldChannel.close();
}
}
public void removeConnection(Long userId) {
connections.remove(userId);
}
}
消息同步机制
用户在多设备登录时,需要保证消息同步。常用方案是基于同步点(sync_key)的增量同步。
# 消息同步服务
class MessageSyncService:
def __init__(self):
self.mysql = MySQLStorage()
self.redis = RedisCache()
def get_latest_messages(self, user_id, sync_key):
# 获取sync_key之后的所有消息
messages = self.mysql.get_messages_after(user_id, sync_key)
# 更新sync_key
if messages:
new_sync_key = messages[-1].sequence
self.update_sync_key(user_id, new_sync_key)
return messages
def sync_messages(self, user_id, device_id, last_sync_key):
# 1. 获取未同步消息
messages = self.get_latest_messages(user_id, last_sync_key)
# 2. 获取未读计数
unread_counts = self.get_unread_counts(user_id)
# 3. 构建同步响应
return SyncResponse(
messages=messages,
unread_counts=unread_counts,
has_more=len(messages) >= 100
)
def update_sync_key(self, user_id, sync_key):
self.redis.set(f"sync_key:{user_id}", sync_key)
已读回执设计
已读回执用于同步消息的已读状态,支持已读未读计数、批量已读等功能。
// 已读回执服务
@Service
public class ReadReceiptService {
@Autowired
private ReadReceiptRepository receiptRepo;
@Autowired
private UnreadCountService unreadService;
// 标记单条消息已读
public void markAsRead(Long userId, String messageId) {
// 1. 保存已读状态
ReadReceipt receipt = ReadReceipt.builder()
.userId(userId)
.messageId(messageId)
.readTime(System.currentTimeMillis())
.build();
receiptRepo.save(receipt);
// 2. 更新未读计数
unreadService.decrementUnread(userId, getMessageSenderId(messageId));
}
// 批量标记已读(同步到某条消息)
public void markAsReadUpTo(Long userId, Long peerId, String messageId) {
// 1. 获取该会话的所有未读消息
List<Message> unreadMessages = getUnreadMessages(userId, peerId);
// 2. 批量保存已读状态
List<ReadReceipt> receipts = unreadMessages.stream()
.map(msg -> ReadReceipt.builder()
.userId(userId)
.messageId(msg.getId())
.readTime(System.currentTimeMillis())
.build())
.collect(Collectors.toList());
receiptRepo.batchSave(receipts);
// 3. 更新未读计数为0
unreadService.resetUnread(userId, peerId);
}
// 获取已读状态
public ReadStatus getMessageReadStatus(String messageId) {
int readCount = receiptRepo.getReadCount(messageId);
List<Long> readUsers = receiptRepo.getReadUsers(messageId);
return new ReadStatus(readCount, readUsers);
}
}
群聊系统设计
群聊是IM系统的重要功能,需要处理消息广播、群成员管理、群消息同步等问题。
# 群聊消息分发
class GroupMessageService:
def __init__(self):
self.group_repo = GroupRepository()
self.connection_manager = ConnectionManager()
self.offline_service = OfflineMessageService()
def send_group_message(self, group_id, sender_id, content):
# 1. 验证发送者是否在群中
if not self.group_repo.is_member(group_id, sender_id):
raise PermissionError("Not a group member")
# 2. 生成消息
message = Message(
group_id=group_id,
sender_id=sender_id,
content=content,
timestamp=time.time()
)
# 3. 保存消息
self.group_repo.save_message(message)
# 4. 分发给在线成员
members = self.group_repo.get_members(group_id)
delivered = []
offline = []
for member_id in members:
if member_id == sender_id:
continue
if self.connection_manager.deliver(member_id, message):
delivered.append(member_id)
else:
offline.append(member_id)
# 5. 离线消息推送
for member_id in offline:
self.offline_service.save_offline_message(member_id, message)
return {
"message_id": message.id,
"delivered_count": len(delivered),
"offline_count": len(offline)
}
消息存储设计
IM系统需要高效存储海量消息,同时支持历史消息查询。常用方案是MySQL+HBase的组合存储。
// 消息存储服务
@Service
public class MessageStorageService {
@Autowired
private MySQLMessageStore mysqlStore;
@Autowired
private HBaseMessageStore hbaseStore;
// 存储消息
public void storeMessage(Message message) {
// 1. 写入MySQL(近期消息)
mysqlStore.save(message);
// 2. 异步写入HBase(历史消息)
asyncWriteToHBase(message);
// 3. 设置MySQL消息过期清理任务
scheduleCleanup(message);
}
// 查询历史消息
public List<Message> getHistoryMessages(Long userId, Long peerId,
int limit, String cursor) {
// 1. 优先从MySQL查询(近7天)
List<Message> messages = mysqlStore.getMessages(userId, peerId, limit, cursor);
if (messages.size() < limit) {
// 2. MySQL数据不足,从HBase查询
List<Message> hbaseMessages = hbaseStore.getMessages(
userId, peerId, limit - messages.size(), cursor);
messages.addAll(hbaseMessages);
}
return messages;
}
}