Distributed Message Queues: Topics, Partitions, and Consumer Groups


Topics and the Partition Model

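A topic is a logical category of messages; a partition is a physical shard of a topic. The partition is the unit of parallelism: messages are ordered within a single partition, but there is no ordering guarantee across partitions.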

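// Kafka producer: sending messages
// Minimal config; KafkaProducer requires key and value serializers
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);

// Send to an explicit partition (records within one partition stay in order).
// hashCode() can be negative, so clear the sign bit before taking the modulus;
// 12 must match the topic's actual partition count.
int partition = (orderId.hashCode() & 0x7fffffff) % 12;
ProducerRecord<String, String> record = new ProducerRecord<>(
    "order-events",           // topic
    partition,                // partition
    orderId,                  // key
    orderEventJson            // value
);

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("Failed to send message", exception);
    } else {
        log.info("Message sent to partition {} offset {}",
            metadata.partition(), metadata.offset());
    }
});

// Key-based partitioning: with no explicit partition, the default partitioner
// hashes the key, so records with the same key go to the same partition (ordered).
ProducerRecord<String, String> keyRecord = new ProducerRecord<>(
    "order-events",
    orderId,     // key
    orderEventJson
);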
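To make the key-to-partition idea concrete, here is a minimal Go sketch that routes events to partitions by hashing their key and models each partition as an append-only log. The names `partitionFor` and `event` are hypothetical, and FNV-1a is used as a stand-in hash: Kafka's default partitioner uses murmur2, so the concrete partition numbers will not match a real cluster. What carries over is the property: the same key always maps to the same partition, so events for one key keep their send order.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition index in [0, numPartitions).
// FNV-1a is a stand-in; Kafka's default partitioner uses murmur2.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	// Unsigned arithmetic keeps the result non-negative, unlike Java's
	// hashCode() % n, which can be negative.
	return int(h.Sum32() % uint32(numPartitions))
}

// event is a hypothetical message: a key plus a per-key sequence number.
type event struct {
	key string
	seq int
}

func main() {
	const numPartitions = 4
	// Each partition is modeled as an append-only slice.
	partitions := make([][]event, numPartitions)

	events := []event{
		{"order-A", 1}, {"order-B", 1}, {"order-A", 2},
		{"order-C", 1}, {"order-B", 2}, {"order-A", 3},
	}
	for _, e := range events {
		p := partitionFor(e.key, numPartitions)
		partitions[p] = append(partitions[p], e)
	}

	// All events sharing a key sit in one partition, in send order.
	for p, entries := range partitions {
		fmt.Println("partition", p, entries)
	}
}
```

Note that changing `numPartitions` changes the mapping, which is why adding partitions to an existing topic can break per-key ordering for keys that move.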

Consumer Groups and Load Balancing

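Consumer groups provide load balancing and failover. Within a group, each partition is assigned to exactly one consumer, so the consumers split the partitions between them; consumers beyond the partition count sit idle, and when a consumer leaves, its partitions are reassigned to the remaining members in a rebalance. Different consumer groups consume the same topic independently, each tracking its own offsets.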

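// Kafka consumer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "order-processing-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false");   // offsets are committed manually below
props.put("auto.offset.reset", "earliest");
props.put("max.poll.records", "100");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("order-events"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    // Process the batch
    for (ConsumerRecord<String, String> record : records) {
        processOrder(record.value());
    }

    // Commit offsets only after processing: if the process crashes before
    // commitSync(), the batch is redelivered (at-least-once delivery)
    consumer.commitSync();
}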
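The assignment rule described above can be sketched in a few lines of Go. `assignRoundRobin` is a hypothetical, simplified helper, not Kafka's actual `RangeAssignor`, `RoundRobinAssignor`, or sticky assignors; it only shows the invariant they share: every partition gets exactly one owner within a group, surplus members stay idle, a rebalance after a member leaves redistributes the partitions, and a second group gets all partitions for itself.

```go
package main

import (
	"fmt"
	"sort"
)

// assignRoundRobin distributes partitions across the members of ONE consumer
// group so that each partition has exactly one owner. Simplified illustration,
// not Kafka's real assignor code.
func assignRoundRobin(partitions []int, members []string) map[string][]int {
	sorted := append([]string(nil), members...)
	sort.Strings(sorted) // deterministic order so every member computes the same result

	assignment := make(map[string][]int, len(sorted))
	for _, m := range sorted {
		assignment[m] = nil // members beyond the partition count stay idle
	}
	if len(sorted) == 0 {
		return assignment
	}
	for i, p := range partitions {
		owner := sorted[i%len(sorted)]
		assignment[owner] = append(assignment[owner], p)
	}
	return assignment
}

func main() {
	partitions := []int{0, 1, 2, 3, 4, 5}

	// One group with four consumers: the six partitions are split among them.
	fmt.Println(assignRoundRobin(partitions, []string{"c1", "c2", "c3", "c4"}))

	// Failover: c2 leaves, and a rebalance hands its partitions to the others.
	fmt.Println(assignRoundRobin(partitions, []string{"c1", "c3", "c4"}))

	// An independent second group (one member) receives every partition.
	fmt.Println(assignRoundRobin(partitions, []string{"audit-1"}))
}
```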

Message Replay and Retry

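Replay lets consumers re-read historical messages, for example to repair data or to run a new feature against past events. Retries absorb transient failures so that a message can still be processed successfully; once the retry budget is exhausted, the error is returned to the caller.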

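// Rewind a Kafka consumer to a point in time.
// Assumes c.client is a sarama.Client (github.com/IBM/sarama); SeekOffset is a
// method of this wrapper's own consumer type, not a sarama API.
func (c *Consumer) SeekToTimestamp(topic string, timestamp time.Time) error {
    // List the topic's partitions
    partitions, err := c.client.Partitions(topic)
    if err != nil {
        return err
    }

    for _, partition := range partitions {
        // Ask the broker for the offset corresponding to this time (in milliseconds)
        offset, err := c.client.GetOffset(topic, partition, timestamp.UnixMilli())
        if err != nil {
            return fmt.Errorf("get offset for %s/%d: %w", topic, partition, err)
        }

        // Reset the consumer's position in this partition
        c.consumer.SeekOffset(topic, partition, offset)
    }
    return nil
}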
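The per-partition lookup in `SeekToTimestamp` resolves a time to an offset. Kafka documents this lookup (e.g. `KafkaConsumer.offsetsForTimes`) as returning the earliest offset whose timestamp is greater than or equal to the requested one. The Go sketch below models a partition as a slice of hypothetical `record` values and assumes timestamps never decrease, which lets `sort.Search` find that offset by binary search; real brokers consult a sparse time index and must tolerate out-of-order producer timestamps.

```go
package main

import (
	"fmt"
	"sort"
)

// record is one entry in a simplified partition log (hypothetical type).
type record struct {
	offset    int64
	timestamp int64 // milliseconds since epoch
}

// offsetForTime returns the offset of the first record with timestamp >= ts.
// Assumption: timestamps are non-decreasing within the partition.
// ok is false when no record is at or after ts.
func offsetForTime(partitionLog []record, ts int64) (offset int64, ok bool) {
	i := sort.Search(len(partitionLog), func(i int) bool {
		return partitionLog[i].timestamp >= ts
	})
	if i == len(partitionLog) {
		return 0, false
	}
	return partitionLog[i].offset, true
}

func main() {
	partitionLog := []record{
		{100, 1_000}, {101, 2_000}, {102, 2_000}, {103, 5_000},
	}
	for _, ts := range []int64{0, 2_000, 3_000, 9_000} {
		off, ok := offsetForTime(partitionLog, ts)
		fmt.Println("ts", ts, "-> offset", off, "found", ok)
	}
}
```

A seek to 3,000 ms lands on offset 103, the first record at or after that time; a time past the last record finds nothing, so the caller has to decide whether to start from the end of the log instead.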

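// Message retry policy with exponential backoff
// (uses fmt, math, time; Message is the application's own message type)
type RetryPolicy struct {
    MaxRetries    int           // retries after the first attempt
    Delay         time.Duration // wait before the first retry
    BackoffFactor float64       // multiplier applied to the delay on each retry
}

// Execute calls handler up to MaxRetries+1 times, sleeping
// Delay*BackoffFactor^i between attempts, and returns the last error if all fail.
func (r *RetryPolicy) Execute(msg Message, handler func(Message) error) error {
    var lastErr error
    for i := 0; i <= r.MaxRetries; i++ {
        err := handler(msg)
        if err == nil {
            return nil
        }
        lastErr = err
        if i < r.MaxRetries {
            delay := time.Duration(float64(r.Delay) * math.Pow(r.BackoffFactor, float64(i)))
            time.Sleep(delay)
        }
    }
    return fmt.Errorf("max retries exceeded: %w", lastErr)
}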

Ordered Message Guarantees

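Ordered messaging needs cooperation from both sides: the producer must route all messages with the same key (for example, the order ID) to the same partition or queue, and the consumer must process each queue with a single thread or under a per-queue lock. The guarantee is therefore per key, not global.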

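// RocketMQ ordered messages
// Producer side
Message msg = new Message("OrderTopic", "OrderTag", orderId,
    orderData.getBytes(StandardCharsets.UTF_8));
// Pass orderId as the selector argument so the same order always goes to the same queue
producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        String key = (String) arg;
        // floorMod keeps the index non-negative even when hashCode() is negative
        int index = Math.floorMod(key.hashCode(), mqs.size());
        return mqs.get(index);
    }
}, orderId);

// Consumer side: ordered consumption (each queue is consumed by one thread at a time)
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        try {
            for (MessageExt msg : msgs) {
                processOrder(new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            // Pause this queue and retry later instead of skipping ahead, preserving order
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
});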

Message Queue Design Considerations

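When choosing a message queue, weigh the following: throughput (typically Kafka > RocketMQ > RabbitMQ), latency (RabbitMQ is usually the lowest), ordered-message support (strongest in RocketMQ), transactional messages (RocketMQ supports them natively via half messages), and message replay (supported by Kafka and RocketMQ).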