支付架构:清算、对账、风控与幂等
支付架构:清算、对账、风控与幂等
支付系统架构概览
支付系统是资金流转的核心,需要保证极高的安全性和可靠性。主要模块包括:收单、支付路由、清算、对账、风控、账户系统。
支付系统架构:
接入层:商户接入、API网关、安全认证
业务层:
- 收单服务:订单创建、支付请求、支付结果
- 支付路由:渠道选择、费率计算、限额控制
- 清算服务:交易清分、资金划拨、结算
- 对账服务:渠道对账、内部对账、差错处理
- 风控服务:交易监控、风险识别、拦截处置
- 账户服务:账户管理、余额、流水
数据层:MySQL、Redis、消息队列
清算对账流程
清算是支付的核心环节,需要保证资金准确无误。典型流程包括:交易清分、资金划拨、结算到账。
// 清算服务
@Service
public class SettlementService {
@Autowired
private TransactionRepository transRepo;
@Autowired
private AccountService accountService;
// 日终清算
@Scheduled(cron = "0 0 2 * * ?")
public void dailySettlement() {
// 1. 获取当日交易
List<Transaction> transactions = transRepo.getTodayTransactions();
// 2. 按商户清分
Map<Long, List<Transaction>> merchantGroups = transactions.stream()
.collect(Collectors.groupingBy(Transaction::getMerchantId));
// 3. 逐商户结算
for (Map.Entry<Long, List<Transaction>> entry : merchantGroups.entrySet()) {
settleMerchant(entry.getKey(), entry.getValue());
}
}
private void settleMerchant(Long merchantId, List<Transaction> transactions) {
// 计算应收金额(扣除手续费)
BigDecimal totalAmount = transactions.stream()
.map(Transaction::getAmount)
.reduce(BigDecimal.ZERO, BigDecimal::add);
BigDecimal fee = calculateFee(merchantId, totalAmount);
BigDecimal settleAmount = totalAmount.subtract(fee);
// 生成结算单
SettlementOrder order = SettlementOrder.builder()
.merchantId(merchantId)
.totalAmount(totalAmount)
.fee(fee)
.settleAmount(settleAmount)
.build();
// 更新账户余额
accountService.credit(merchantId, settleAmount);
}
}
对账系统设计
对账是保证资金准确的关键,需要处理渠道对账、内部对账、差错处理。
# 对账服务
class ReconciliationService:
def __init__(self):
self.channel_repos = {
'alipay': AlipayReconciliation(),
'wechat': WechatReconciliation(),
'bank': BankReconciliation(),
}
def daily_reconciliation(self):
"""日终对账"""
# 1. 下载渠道对账文件
for channel, repo in self.channel_repos.items():
channel_file = repo.download_reconciliation_file()
# 2. 解析渠道数据
channel_records = repo.parse_file(channel_file)
# 3. 获取内部交易记录
internal_records = self.get_internal_records(channel)
# 4. 执行对账
result = self.reconcile(channel_records, internal_records)
# 5. 处理差异
if result.has_differences():
self.handle_differences(channel, result.differences)
def reconcile(self, channel_records, internal_records):
"""执行对账"""
# 按交易号匹配
channel_map = {r.transaction_id: r for r in channel_records}
internal_map = {r.transaction_id: r for r in internal_records}
differences = []
# 检查渠道有、内部无(长款)
for trans_id in channel_map:
if trans_id not in internal_map:
differences.append(Difference(
type='LONG',
transaction_id=trans_id,
description='渠道有记录,内部无记录'
))
# 检查内部有、渠道无(短款)
for trans_id in internal_map:
if trans_id not in channel_map:
differences.append(Difference(
type='SHORT',
transaction_id=trans_id,
description='内部有记录,渠道无记录'
))
# 检查金额不一致
for trans_id in set(channel_map.keys()) & set(internal_map.keys()):
if channel_map[trans_id].amount != internal_map[trans_id].amount:
differences.append(Difference(
type='AMOUNT',
transaction_id=trans_id,
description='金额不一致'
))
return ReconciliationResult(differences)
风控系统设计
风控系统需要实时识别和拦截风险交易,保护资金安全。
// 风控服务
@Service
public class RiskControlService {
@Autowired
private RuleEngine ruleEngine;
@Autowired
private RiskEventRepository eventRepo;
public RiskResult checkTransaction(Transaction transaction) {
// 1. 规则引擎检查
List<RuleResult> ruleResults = ruleEngine.evaluate(transaction);
// 2. 综合评估风险
RiskLevel overallRisk = assessOverallRisk(ruleResults);
// 3. 记录风险事件
if (overallRisk != RiskLevel.LOW) {
RiskEvent event = RiskEvent.builder()
.transactionId(transaction.getId())
.riskLevel(overallRisk)
.ruleResults(ruleResults)
.build();
eventRepo.save(event);
}
// 4. 根据风险级别决定处置
return RiskResult.builder()
.riskLevel(overallRisk)
.action(determineAction(overallRisk))
.build();
}
private RiskAction determineAction(RiskLevel level) {
switch (level) {
case HIGH:
return RiskAction.BLOCK; // 直接拦截
case MEDIUM:
return RiskAction.CHALLENGE; // 要求验证
case LOW:
default:
return RiskAction.ALLOW; // 放行
}
}
}
// 规则引擎
@Component
public class RuleEngine {
private final List<RiskRule> rules = new ArrayList<>();
@PostConstruct
public void init() {
// 金额规则
rules.add(new AmountRule(10000)); // 单笔超过1万
// 频率规则
rules.add(new FrequencyRule(10, 3600)); // 1小时内超过10笔
// 地理位置规则
rules.add(new GeoLocationRule());
// 设备指纹规则
rules.add(new DeviceFingerprintRule());
}
public List<RuleResult> evaluate(Transaction transaction) {
return rules.stream()
.map(rule -> rule.evaluate(transaction))
.filter(RuleResult::isTriggered)
.collect(Collectors.toList());
}
}
幂等性设计
支付系统必须保证幂等性,防止重复支付。核心思路是使用唯一的业务号作为幂等键。
# 支付幂等性设计
class PaymentIdempotent:
def __init__(self):
self.redis = RedisClient()
def create_payment(self, order_id, amount):
"""创建支付(幂等)"""
# 生成幂等键
idempotent_key = f"payment:{order_id}"
# 尝试获取锁
lock = self.redis.acquire_lock(idempotent_key, timeout=30)
if not lock:
# 已有支付进行中
return self.get_existing_payment(order_id)
try:
# 检查是否已存在
existing = self.redis.get(idempotent_key)
if existing:
return json.loads(existing)
# 创建支付记录
payment = Payment(
order_id=order_id,
amount=amount,
status='PENDING',
create_time=time.time()
)
# 保存到Redis
self.redis.set(idempotent_key, json.dumps(payment.to_dict()), ex=3600)
# 异步调用支付渠道
self.call_payment_channel_async(payment)
return payment
finally:
self.redis.release_lock(lock)
def handle_payment_callback(self, payment_id, result):
"""处理支付回调(幂等)"""
# 生成回调幂等键
idempotent_key = f"payment_callback:{payment_id}"
# 尝试获取锁
lock = self.redis.acquire_lock(idempotent_key, timeout=10)
if not lock:
return # 已处理过
try:
# 检查是否已处理
if self.redis.get(idempotent_key):
return # 已处理过
# 更新支付状态
self.update_payment_status(payment_id, result)
# 更新订单状态
self.update_order_status(payment_id, result)
# 标记已处理
self.redis.set(idempotent_key, 'processed', ex=86400)
finally:
self.redis.release_lock(lock)