Message Queues and Asynchronous Communication

📂 devops ⏱ 2 min 294 words

Message Queue Concepts

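Concept       Description
Producer      Application that publishes messages to the broker
Consumer      Application that reads messages and processes them
Broker        Server that receives, stores, and delivers messages
Topic/Queue   Named destination that messages are published to and consumed from
Partition     Ordered subset of a Kafka topic; partitions let consumers in a group read in parallel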
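
To make the Partition row concrete, here is a toy sketch of how a message key can be mapped to a partition so that all messages with the same key stay in order on one partition. The function name `pick_partition` is hypothetical, and `crc32` is only a stand-in chosen for illustration; Kafka's default partitioner uses a different hash (murmur2).

```python
import zlib


def pick_partition(key: str, num_partitions: int) -> int:
    """Map a message key to a partition index (toy stand-in for a real partitioner)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # Assumption for illustration: crc32 instead of Kafka's murmur2 hash.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# The same key always maps to the same partition.
for user_id in ["12345", "67890", "12345"]:
    print(user_id, "->", pick_partition(user_id, 3))
```

Because the mapping depends on the partition count, adding partitions later changes where existing keys land.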

Kafka

Installing Kafka

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
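      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker: the default replication factor of 3 would keep the internal offsets topic from being created
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1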

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092

Producer

from kafka import KafkaProducer
import json

producer = KafkaProducer(
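    # Inside the Compose network use the internal listener kafka:29092;
    # from the host machine use localhost:9092 instead.
    bootstrap_servers=['kafka:29092'],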
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Send a message
producer.send('user-events', {
    'user_id': '12345',
    'event': 'login',
    'timestamp': '2024-01-01T12:00:00Z'
})

producer.flush()
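
The producer above sends without a key and does not check the result. As a hedged sketch, the helpers below key each event by `user_id` (so one user's events stay on one partition) and block until the broker confirms the write. `make_producer` and `send_event` are hypothetical names; `acks`, `retries`, `key_serializer`, and `future.get()` are kafka-python features, and the import is deferred so that `send_event` can be exercised without a broker.

```python
import json
from datetime import datetime, timezone


def make_producer(bootstrap_servers):
    """Build a KafkaProducer that waits for all in-sync replicas to acknowledge."""
    from kafka import KafkaProducer  # kafka-python, imported lazily

    return KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        acks="all",
        retries=5,
        key_serializer=lambda k: k.encode("utf-8"),
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )


def send_event(producer, topic, user_id, event, timeout=10):
    """Send one event keyed by user_id and block until the broker confirms it."""
    payload = {
        "user_id": user_id,
        "event": event,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    future = producer.send(topic, key=user_id, value=payload)
    metadata = future.get(timeout=timeout)  # raises if delivery failed
    return metadata.partition, metadata.offset
```

A call might look like `send_event(make_producer(['kafka:29092']), 'user-events', '12345', 'login')`. Blocking on every send trades throughput for certainty; for bulk traffic, batching and a final `flush()` is usually the better fit.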

Consumer

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'user-events',
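    # Inside the Compose network use the internal listener kafka:29092;
    # from the host machine use localhost:9092 instead.
    bootstrap_servers=['kafka:29092'],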
    group_id='user-service',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    event = message.value
    print(f"Processing event: {event}")
    # Handle the event here
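
The `for message in consumer` loop above never exits on its own. One possible way to stop cleanly is to poll in short intervals, check a stop flag between batches, and commit offsets only after a batch has been handled. This is a sketch, not the only approach: it assumes the consumer was created with `enable_auto_commit=False`, and `consume_until_stopped`, `request_stop`, and `install_signal_handlers` are hypothetical helper names.

```python
import signal
import threading

stop_event = threading.Event()


def request_stop(signum, frame):
    """Signal handler: ask the loop to finish its current batch and exit."""
    stop_event.set()


def install_signal_handlers():
    """Register the handler for Ctrl+C and for `docker stop` (SIGTERM)."""
    signal.signal(signal.SIGINT, request_stop)
    signal.signal(signal.SIGTERM, request_stop)


def consume_until_stopped(consumer, handle, stop, poll_timeout_ms=1000):
    """Poll in short intervals, commit after each handled batch, close on exit."""
    processed = 0
    try:
        while not stop.is_set():
            # poll() returns a dict of {TopicPartition: [records]}
            batches = consumer.poll(timeout_ms=poll_timeout_ms)
            for records in batches.values():
                for record in records:
                    handle(record.value)
                    processed += 1
            if batches:
                consumer.commit()  # commit only after the batch was handled
    finally:
        consumer.close()
    return processed
```

In a service, `install_signal_handlers()` would be called once at startup, followed by `consume_until_stopped(consumer, handler, stop_event)`.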

RabbitMQ

Installing RabbitMQ

version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret

Producer

import pika
import json

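# The Compose file above replaces the default guest user with admin/secret
credentials = pika.PlainCredentials('admin', 'secret')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('rabbitmq', credentials=credentials)
)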
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = {
    'task_id': '12345',
    'action': 'send_email',
    'data': {'to': 'user@example.com'}
}

channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent: the message is written to disk
    )
)

connection.close()

Consumer

import pika
import json
import time

def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"Processing: {message}")
    time.sleep(1)  # simulate work
    ch.basic_ack(delivery_tag=method.delivery_tag)

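# The Compose file above replaces the default guest user with admin/secret
credentials = pika.PlainCredentials('admin', 'secret')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('rabbitmq', credentials=credentials)
)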
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

print('Waiting for messages...')
channel.start_consuming()

In Practice: Event-Driven Architecture

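# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  # The services reach the broker through its internal listener, kafka:29092
  order-service:
    build: ./order-service
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092

  inventory-service:
    build: ./inventory-service
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092

  notification-service:
    build: ./notification-service
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092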

Event Flow

Order Service → order.created → Kafka
                                  ↓
                    Inventory Service (decrement stock)
                    Notification Service (send notification)
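
The flow above can be sketched without a broker to show what each service does with an `order.created` event. Everything here is illustrative: the `OrderCreated` fields, the envelope layout, and the helper names are assumptions, not a schema defined by this article.

```python
import json
from dataclasses import dataclass, asdict


@dataclass
class OrderCreated:
    order_id: str
    sku: str
    quantity: int
    email: str


def encode(event: OrderCreated) -> bytes:
    """What the order service would publish as the message value."""
    return json.dumps({"type": "order.created", "data": asdict(event)}).encode("utf-8")


def decode(raw: bytes) -> OrderCreated:
    """What each downstream service does first with a received message."""
    envelope = json.loads(raw.decode("utf-8"))
    if envelope["type"] != "order.created":
        raise ValueError(f"unexpected event type: {envelope['type']}")
    return OrderCreated(**envelope["data"])


def reserve_stock(stock: dict, event: OrderCreated) -> None:
    """Inventory service: decrement stock for the ordered item."""
    if stock.get(event.sku, 0) < event.quantity:
        raise ValueError(f"insufficient stock for {event.sku}")
    stock[event.sku] -= event.quantity


def build_notification(event: OrderCreated) -> str:
    """Notification service: compose the message to send."""
    return f"To {event.email}: order {event.order_id} received"


raw = encode(OrderCreated("o-1", "sku-42", 2, "user@example.com"))
stock = {"sku-42": 5}
reserve_stock(stock, decode(raw))
print(stock)                            # {'sku-42': 3}
print(build_notification(decode(raw)))  # To user@example.com: order o-1 received
```

Both services decode the same bytes independently, which is what lets them evolve and fail separately from the order service.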

Messaging Patterns

Publish/Subscribe

Producer → Topic → Consumer Group 1
                 → Consumer Group 2

Point-to-Point

Producer → Queue → Consumer 1
                  → Consumer 2 (competing consumers)
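
The difference between the two patterns is easiest to see side by side. The following in-memory sketch is a simplification under stated assumptions: `fan_out` and `compete` are hypothetical names, and strict round-robin stands in for whatever dispatch policy a real broker applies.

```python
from collections import defaultdict
from itertools import cycle


def fan_out(messages, groups):
    """Publish/subscribe: every consumer group receives every message."""
    return {group: list(messages) for group in groups}


def compete(messages, consumers):
    """Point-to-point: each message goes to exactly one consumer (round-robin here)."""
    delivered = defaultdict(list)
    for message, consumer in zip(messages, cycle(consumers)):
        delivered[consumer].append(message)
    return dict(delivered)


messages = ["m1", "m2", "m3", "m4"]
print(fan_out(messages, ["group-1", "group-2"]))
# {'group-1': ['m1', 'm2', 'm3', 'm4'], 'group-2': ['m1', 'm2', 'm3', 'm4']}
print(compete(messages, ["consumer-1", "consumer-2"]))
# {'consumer-1': ['m1', 'm3'], 'consumer-2': ['m2', 'm4']}
```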

Dead-Letter Queues

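# RabbitMQ dead-letter queue
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters('rabbitmq', credentials=pika.PlainCredentials('admin', 'secret'))
)
channel = connection.channel()

# Exchange and queue that collect rejected or expired messages
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue', routing_key='dlx_queue')

# Messages rejected from main_queue are republished to dlx_exchange
channel.queue_declare(
    queue='main_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',
        'x-dead-letter-routing-key': 'dlx_queue'
    }
)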
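
Declaring the queues is only half of the setup: a message reaches the dead-letter exchange when a consumer rejects it without requeueing (or when it expires). A minimal sketch of such a consumer callback follows; `process` is a hypothetical placeholder for the real business logic, while `basic_ack` and `basic_nack` are the pika channel methods.

```python
import json


def process(message):
    """Hypothetical business logic; raises on messages it cannot handle."""
    if "task_id" not in message:
        raise ValueError("missing task_id")


def callback(ch, method, properties, body):
    """Ack on success; reject without requeue on failure so the broker dead-letters it."""
    try:
        process(json.loads(body))
    except Exception:
        # requeue=False hands the message to the queue's x-dead-letter-exchange
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    else:
        ch.basic_ack(delivery_tag=method.delivery_tag)
```

It would be registered with `channel.basic_consume(queue='main_queue', on_message_callback=callback)`. Rejecting with `requeue=True` instead would put a permanently failing message back on the queue and deliver it again indefinitely.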

Best Practices

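  1. Persist messages: declare durable queues and publish with persistent delivery mode
  2. Make consumers idempotent, since at-least-once delivery can redeliver a message
  3. Route messages that fail processing to a dead-letter queue
  4. Monitor message backlog (queue depth in RabbitMQ, consumer lag in Kafka)
  5. Shut down gracefully: finish in-flight messages and acknowledge them before exiting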
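
Of the practices above, consumer idempotency is the one most often left vague, so here is one possible sketch: remember the IDs of handled messages and skip repeats. The `message_id` field and the `IdempotentHandler` class are assumptions for illustration, and the in-memory set would need to be a durable store (a database table, for example) to survive restarts.

```python
class IdempotentHandler:
    """Wrap a handler so that a redelivered message is processed only once."""

    def __init__(self, handle):
        self._handle = handle
        self._seen = set()  # assumption: a durable store in a real service

    def __call__(self, message: dict) -> bool:
        message_id = message["message_id"]  # hypothetical field name
        if message_id in self._seen:
            return False  # duplicate delivery, skip
        self._handle(message)
        self._seen.add(message_id)  # record only after successful handling
        return True


sent = []
handler = IdempotentHandler(sent.append)
print(handler({"message_id": "a1", "action": "send_email"}))  # True
print(handler({"message_id": "a1", "action": "send_email"}))  # False
print(len(sent))  # 1
```

Recording the ID after the handler returns means a crash mid-processing leads to a retry rather than a silently dropped message.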

Summary

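Message queues are a key building block for asynchronous communication and for decoupling services. Kafka and RabbitMQ are both widely used and fit different workloads: Kafka suits high-throughput event streams that several consumer groups read independently, while RabbitMQ suits task queues with per-message acknowledgement and flexible routing.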