分布式消息队列:Topic/分区与消费组
Topic与分区模型
Topic是消息的逻辑分类,分区(Partition)是Topic的物理分片。分区是并行处理的基本单位:单个分区内的消息严格有序,跨分区则不保证顺序。
// Kafka producer - sending messages
Producer<String, String> producer = new KafkaProducer<>(props);

// Explicit-partition send: all events of one order go to the same partition,
// which preserves per-order ordering.
// - The partition count comes from cluster metadata instead of being hard-coded.
// - Math.floorMod is required because String.hashCode() can be negative, and
//   ProducerRecord rejects a negative partition number.
int partitionCount = producer.partitionsFor("order-events").size();
ProducerRecord<String, String> record = new ProducerRecord<>(
        "order-events",                                     // topic
        Math.floorMod(orderId.hashCode(), partitionCount),  // partition
        orderId,                                            // key
        orderEventJson                                      // value
);
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("Failed to send message", exception);
    } else {
        log.info("Message sent to partition {} offset {}",
                metadata.partition(), metadata.offset());
    }
});

// Key-based partitioning: no partition is given, so the producer's configured
// partitioner routes records with equal keys to the same partition.
// Do not mix this with the hashCode-based scheme above for the same key: the
// two schemes can choose different partitions and break per-order ordering.
ProducerRecord<String, String> keyRecord = new ProducerRecord<>(
        "order-events",
        orderId, // key
        orderEventJson
);
消费组与负载均衡
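消费组实现消息的负载均衡和故障转移。同一消费组内,每个分区在同一时刻只分配给一个消费者,分区在组内消费者之间分摊(消费者数量超过分区数时,多出的消费者将处于空闲状态);消费者故障或增减时会触发再均衡(rebalance),由其他消费者接管其分区。不同消费组相互独立,各自消费全量消息。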
// Kafka consumer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "order-processing-group");
// Key/value deserializers are mandatory consumer configs; without them the
// KafkaConsumer constructor fails with a ConfigException.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// Offsets are committed manually, after the batch has been processed.
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", 100);

// try-with-resources closes the consumer if the poll loop exits via an exception.
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("order-events"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // Process the polled batch
        for (ConsumerRecord<String, String> record : records) {
            processOrder(record.value());
        }
        // Commit only after the whole batch was processed. If the process dies
        // between processing and commit, the batch is redelivered
        // (at-least-once). Empty polls have nothing new to commit.
        if (!records.isEmpty()) {
            consumer.commitSync();
        }
    }
}
消息回溯与重试
消息回溯支持重新消费历史消息,用于数据修复和功能迭代。重试机制通过有限次数的重试(通常配合指数退避)提高消息最终消费成功的概率;超过最大重试次数仍失败的消息一般会转入死信队列,以便人工排查处理。
// Kafka消费者回溯到指定时间点
func (c *Consumer) SeekToTimestamp(topic string, timestamp time.Time) error {
// 获取分区信息
partitions, err := c.client.Partitions(topic)
if err != nil {
return err
}
for _, partition := range partitions {
// 查找指定时间点的偏移量
offset, err := c.client.OffsetManager.OffsetForTime(topic, partition, timestamp)
if err != nil {
return err
}
// 重置偏移量
c.consumer.SeekOffset(topic, partition, offset)
}
return nil
}
// RetryPolicy is the message retry strategy: it controls how often a message
// handler is retried and how long Execute waits between attempts.
type RetryPolicy struct {
	// MaxRetries is the number of retries after the first attempt, so the
	// handler runs at most MaxRetries+1 times. Negative values count as 0.
	MaxRetries int
	// Delay is the wait before the first retry. Values <= 0 mean no wait.
	Delay time.Duration
	// BackoffFactor scales the wait after each retry. The wait before retry
	// k (k >= 1) is Delay * BackoffFactor^(k-1). Values <= 0 count as 1,
	// i.e. a constant Delay.
	BackoffFactor float64
}

// Execute calls handler with msg until it returns nil or the retry budget is
// used up, sleeping between attempts as configured by the policy.
//
// It returns nil on the first success. Otherwise it returns an error wrapping
// the last handler error, which can be inspected with errors.Is / errors.As.
func (r *RetryPolicy) Execute(msg Message, handler func(Message) error) error {
	retries := r.MaxRetries
	if retries < 0 {
		// Without this clamp a negative budget would skip the handler
		// entirely and wrap a nil error.
		retries = 0
	}
	var lastErr error
	for attempt := 0; attempt <= retries; attempt++ {
		if lastErr = handler(msg); lastErr == nil {
			return nil
		}
		if attempt < retries {
			time.Sleep(r.backoffDelay(attempt))
		}
	}
	return fmt.Errorf("max retries exceeded: %w", lastErr)
}

// backoffDelay returns the wait after failed attempt number attempt (0-based).
// It saturates at the largest time.Duration instead of overflowing.
func (r *RetryPolicy) backoffDelay(attempt int) time.Duration {
	if r.Delay <= 0 {
		// Also avoids 0 * +Inf = NaN when the factor overflows.
		return 0
	}
	factor := r.BackoffFactor
	if factor <= 0 {
		factor = 1
	}
	d := float64(r.Delay) * math.Pow(factor, float64(attempt))
	if d >= float64(math.MaxInt64) {
		return time.Duration(math.MaxInt64)
	}
	return time.Duration(d)
}
顺序消息保证
顺序消息需要在生产端按业务Key(如订单ID)将同一实体的消息路由到同一分区/队列,并在消费端对每个分区/队列串行处理(单线程消费或加锁),从而保证同一Key内的消息按发送顺序被消费。
// RocketMQ ordered messages
// Producer side
// Explicit UTF-8 on both sides: the platform default charset may differ
// between producer and consumer hosts.
Message msg = new Message("OrderTopic", "OrderTag", orderId,
        orderData.getBytes(StandardCharsets.UTF_8));
// orderId is passed as the MessageQueueSelector argument, so all messages of
// the same order are sent to the same queue.
producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {
        String key = (String) arg;
        // floorMod: hashCode() can be negative, and a negative index makes
        // mqs.get throw IndexOutOfBoundsException.
        int index = Math.floorMod(key.hashCode(), mqs.size());
        return mqs.get(index);
    }
}, orderId);
// Consumer side - ordered consumption
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt m : msgs) {
            processOrder(new String(m.getBody(), StandardCharsets.UTF_8));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
});
消息队列设计要点
选择消息队列时需要考虑:吞吐量(一般认为Kafka > RocketMQ > RabbitMQ)、延迟(RabbitMQ通常最低)、顺序消息支持(RocketMQ提供原生的顺序消费接口)、事务消息(RocketMQ原生支持)、消息回溯(Kafka和RocketMQ支持按偏移量或时间点回溯)。以上比较为一般经验,实际表现取决于部署规模与配置。