Message Queues and Asynchronous Communication
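Message Queue Concepts
| Concept | Description |
|---|---|
| Producer | Application that publishes messages |
| Consumer | Application that reads and processes messages |
| Broker | Server that receives, stores, and delivers messages |
| Topic/Queue | Named channel that messages are published to and consumed from |
| Partition | Ordered shard of a topic; the unit of parallel consumption |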
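To make the Partition row concrete, here is a minimal sketch of how a message key can be mapped to a partition so that all messages for one key stay in order. The function name `choose_partition` is hypothetical, and `zlib.crc32` is only a stand-in hash: Kafka's default partitioner uses murmur2, so real partition numbers will differ.

```python
import zlib


def choose_partition(key: str, num_partitions: int) -> int:
    """Map a message key to a partition index (illustrative only)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # crc32 is an assumption made for this sketch, not Kafka's actual hash.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Messages with the same key always land in the same partition,
# which is what preserves per-key ordering.
partitions = {p: [] for p in range(3)}
for user_id, event in [("u1", "login"), ("u2", "login"), ("u1", "logout")]:
    partitions[choose_partition(user_id, 3)].append((user_id, event))
```

Because the partition depends only on the key, both `u1` events end up in one partition in the order they were sent, while different keys may be spread across partitions and consumed in parallel.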
Kafka
Installing Kafka
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Containers on the Compose network connect to kafka:29092; clients on the host use localhost:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker: the offsets topic cannot use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    depends_on:
      - kafka
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
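Producer
from kafka import KafkaProducer
import json

# Run from the host machine; inside the Compose network use 'kafka:29092' instead
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
# Send a message (send() is asynchronous)
producer.send('user-events', {
    'user_id': '12345',
    'event': 'login',
    'timestamp': '2024-01-01T12:00:00Z'
})
# Block until all buffered messages have been sent
producer.flush()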
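Consumer
from kafka import KafkaConsumer
import json

# Run from the host machine; inside the Compose network use 'kafka:29092' instead
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    group_id='user-service',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# Iterating over the consumer blocks and yields messages as they arrive
for message in consumer:
    event = message.value
    print(f"Processing event: {event}")
    # Handle the event here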
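The producer's `value_serializer` and the consumer's `value_deserializer` have to be exact inverses, otherwise the consumer cannot read what the producer wrote. A small sketch of that contract, using the hypothetical helper names `serialize_event` and `deserialize_event` in place of the lambdas:

```python
import json


def serialize_event(event: dict) -> bytes:
    """Encode an event the same way the producer's value_serializer does."""
    return json.dumps(event).encode("utf-8")


def deserialize_event(raw: bytes) -> dict:
    """Decode bytes the same way the consumer's value_deserializer does."""
    return json.loads(raw.decode("utf-8"))


event = {"user_id": "12345", "event": "login", "timestamp": "2024-01-01T12:00:00Z"}
raw = serialize_event(event)       # what travels through the broker
restored = deserialize_event(raw)  # what the consumer sees in message.value
```

Named functions like these could be passed as `value_serializer=serialize_event` and `value_deserializer=deserialize_event`, which keeps the two sides in one shared module and makes the round trip easy to unit-test without a running broker.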
RabbitMQ
Installing RabbitMQ
# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
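Producer
import pika
import json

# Credentials must match RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS;
# pika otherwise falls back to guest/guest.
# The 'rabbitmq' hostname resolves inside the Compose network; use 'localhost' from the host.
credentials = pika.PlainCredentials('admin', 'secret')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('rabbitmq', credentials=credentials)
)
channel = connection.channel()
# A durable queue survives a broker restart
channel.queue_declare(queue='task_queue', durable=True)
message = {
    'task_id': '12345',
    'action': 'send_email',
    'data': {'to': 'user@example.com'}
}
# The default exchange ('') routes by queue name
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,  # persistent message
    )
)
connection.close()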
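Consumer
import pika
import json
import time

def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"Processing: {message}")
    time.sleep(1)  # simulate work
    # Acknowledge only after the work is done, so an unfinished task is redelivered
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Credentials must match RABBITMQ_DEFAULT_USER / RABBITMQ_DEFAULT_PASS
credentials = pika.PlainCredentials('admin', 'secret')
connection = pika.BlockingConnection(
    pika.ConnectionParameters('rabbitmq', credentials=credentials)
)
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
# Deliver at most one unacknowledged message to this consumer at a time
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print('Waiting for messages...')
channel.start_consuming()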
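`start_consuming()` blocks until the consumer is stopped, so a worker needs a way to finish the message in hand and then exit cleanly. The sketch below shows the general pattern with a plain `queue.Queue` standing in for the broker. `run_worker` and its parameters are hypothetical names; with pika, the corresponding step would be a call to `channel.stop_consuming()`.

```python
import queue
import threading


def run_worker(tasks: queue.Queue, stop: threading.Event, handle) -> int:
    """Process tasks until `stop` is set; return how many were handled."""
    processed = 0
    while not stop.is_set():
        try:
            # A short timeout keeps the loop responsive to the stop flag.
            task = tasks.get(timeout=0.1)
        except queue.Empty:
            continue
        handle(task)  # a task that was picked up always runs to completion
        processed += 1
    return processed


tasks = queue.Queue()
for i in range(3):
    tasks.put(i)
stop = threading.Event()
handled = []


def handle(task):
    handled.append(task)
    if len(handled) == 2:
        stop.set()  # stands in for a shutdown signal arriving mid-run


processed = run_worker(tasks, stop, handle)
```

In a real service the flag would typically be set from a signal handler, for example `signal.signal(signal.SIGTERM, lambda signum, frame: stop.set())`, so that `docker stop` lets the current task finish while the remaining ones stay queued for another worker.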
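Practice: Event-Driven Architecture
# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    ports:
      - "9092:9092"
    # Broker settings (ZooKeeper, listeners) are omitted here; reuse those from the Kafka installation section
  order-service:
    build: ./order-service
    environment:
      # Services on the Compose network use the internal listener (kafka:29092)
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
  inventory-service:
    build: ./inventory-service
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
  notification-service:
    build: ./notification-service
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092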
Event Flow
Order Service → order.created → Kafka
                                  ↓
                  Inventory Service (deducts stock)
                  Notification Service (sends notifications)
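The flow above can be sketched without a broker to show the decoupling: the order service publishes one `order.created` event and knows nothing about who reacts to it. `InMemoryBus`, the handler names, and the event fields (`order_id`, `sku`, `quantity`) are all assumptions made for illustration; in the real setup Kafka plays the role of the bus.

```python
from collections import defaultdict


class InMemoryBus:
    """Toy stand-in for a broker: every subscriber of a topic gets each event."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    def publish(self, topic, event):
        for handler in self._subscribers[topic]:
            handler(event)


stock = {"sku-1": 10}
notifications = []


def reserve_stock(event):
    """Inventory service: deduct the ordered quantity."""
    stock[event["sku"]] -= event["quantity"]


def notify_customer(event):
    """Notification service: record a message for the customer."""
    notifications.append(f"Order {event['order_id']} received")


bus = InMemoryBus()
bus.subscribe("order.created", reserve_stock)
bus.subscribe("order.created", notify_customer)

# Order service: publish the event without calling the other services directly.
bus.publish("order.created", {"order_id": "o-1", "sku": "sku-1", "quantity": 2})
```

Adding a third reaction to new orders would only need another `subscribe` call; the publishing side stays unchanged.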
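Messaging Patterns
Publish/Subscribe
Producer → Topic → Consumer Group 1
                 → Consumer Group 2 (each group receives every message)
Point-to-Point
Producer → Queue → Consumer 1
                 → Consumer 2 (competing consumers: each message goes to only one)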
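The difference between the two patterns comes down to delivery rules: every consumer group sees every message, while consumers inside one group split the messages between them. A minimal simulation of that rule follows. The `deliver` function is hypothetical, and round-robin distribution is an assumption; real brokers distribute by partition assignment (Kafka) or by prefetch and acknowledgement (RabbitMQ).

```python
from itertools import cycle


def deliver(messages, groups):
    """Simulate delivery.

    groups maps a group name to its list of consumer names.
    Returns a dict of consumer name -> messages that consumer received.
    """
    received = {c: [] for members in groups.values() for c in members}
    for members in groups.values():
        # Each group gets the full stream; members take turns within the group.
        turn = cycle(members)
        for message in messages:
            received[next(turn)].append(message)
    return received


messages = ["m1", "m2", "m3", "m4"]

# Publish/subscribe: two groups with one consumer each.
pubsub = deliver(messages, {"group-1": ["a"], "group-2": ["b"]})

# Point-to-point: one group whose two consumers compete for messages.
p2p = deliver(messages, {"workers": ["w1", "w2"]})
```

In the publish/subscribe case both `a` and `b` receive all four messages; in the point-to-point case `w1` and `w2` receive two each, and no message is processed twice.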
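Dead-Letter Queue
# RabbitMQ dead-letter queue
# Exchange and queue that receive dead-lettered messages
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='dlx_queue', durable=True)
channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue', routing_key='dlx_queue')
# Messages in main_queue that are rejected without requeue, or that expire,
# are republished to dlx_exchange with the routing key below
channel.queue_declare(
    queue='main_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx_exchange',
        'x-dead-letter-routing-key': 'dlx_queue'
    }
)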
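Declaring the dead-letter exchange only has an effect once a consumer actually rejects a message. A possible consumer-side sketch: wrap the business handler so that a failure is negatively acknowledged with `requeue=False`, which lets RabbitMQ route the message to the dead-letter exchange configured on the queue. `make_callback` and `handler` are hypothetical names; `basic_ack` and `basic_nack` are the pika channel methods.

```python
import json


def make_callback(handler):
    """Build a pika-style callback that dead-letters messages the handler fails on."""

    def callback(ch, method, properties, body):
        try:
            handler(json.loads(body))
        except Exception:
            # requeue=False: do not put the message back on main_queue;
            # the broker forwards it to the dead-letter exchange instead.
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback
```

It would be registered with `channel.basic_consume(queue='main_queue', on_message_callback=make_callback(handler))`. Malformed JSON is treated like any other failure here, so a message that can never be parsed ends up in `dlx_queue` rather than being redelivered forever.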
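Best Practices
- Persist messages (durable queues and persistent delivery mode)
- Make consumers idempotent
- Route failed messages to a dead-letter queue
- Monitor message backlog (consumer lag)
- Implement graceful shutdown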
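Of the practices listed, consumer idempotency is the one most easily shown in code: brokers may redeliver a message, so processing it twice must not change the result. The sketch below deduplicates on an `event_id` field, which is an assumed field that the producer would have to add. The in-memory set is also an assumption for brevity; a real service would keep processed IDs in a durable store.

```python
class IdempotentHandler:
    """Run a handler at most once per event ID."""

    def __init__(self, handler):
        self._handler = handler
        self._seen = set()  # assumption: in-memory only, lost on restart

    def handle(self, event: dict) -> bool:
        """Return True if the event was processed, False if it was a duplicate."""
        event_id = event["event_id"]  # hypothetical field set by the producer
        if event_id in self._seen:
            return False
        self._handler(event)
        # Record the ID only after the handler succeeded, so a failed
        # attempt can still be retried.
        self._seen.add(event_id)
        return True


balance = {"total": 0}


def apply_payment(event):
    balance["total"] += event["amount"]


payments = IdempotentHandler(apply_payment)
first = payments.handle({"event_id": "e-1", "amount": 50})
duplicate = payments.handle({"event_id": "e-1", "amount": 50})  # redelivery
```

After both calls the balance reflects the payment once: the redelivered message is recognized and skipped.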
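Summary
Message queues are a key building block for asynchronous communication and for decoupling services. Kafka and RabbitMQ are two widely used message queue systems, each suited to different use cases: in this article Kafka carries event streams such as user-events and order.created, while RabbitMQ serves as a task queue.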