← 返回首页
💼

支付架构:清算、对账、风控与幂等

📂 architecture ⏱ 3 min 481 words

支付架构:清算、对账、风控与幂等

支付系统架构概览

支付系统是资金流转的核心,需要保证极高的安全性和可靠性。主要模块包括:收单、支付路由、清算、对账、风控、账户系统。

支付系统架构:

接入层:商户接入、API网关、安全认证
业务层:
  - 收单服务:订单创建、支付请求、支付结果
  - 支付路由:渠道选择、费率计算、限额控制
  - 清算服务:交易清分、资金划拨、结算
  - 对账服务:渠道对账、内部对账、差错处理
  - 风控服务:交易监控、风险识别、拦截处置
  - 账户服务:账户管理、余额、流水
数据层:MySQL、Redis、消息队列

清算对账流程

清算是支付的核心环节,需要保证资金准确无误。典型流程包括:交易清分、资金划拨、结算到账。

// 清算服务
@Service
public class SettlementService {
    @Autowired
    private TransactionRepository transRepo;
    @Autowired
    private AccountService accountService;
    
    // 日终清算
    @Scheduled(cron = "0 0 2 * * ?")
    public void dailySettlement() {
        // 1. 获取当日交易
        List<Transaction> transactions = transRepo.getTodayTransactions();
        
        // 2. 按商户清分
        Map<Long, List<Transaction>> merchantGroups = transactions.stream()
            .collect(Collectors.groupingBy(Transaction::getMerchantId));
        
        // 3. 逐商户结算
        for (Map.Entry<Long, List<Transaction>> entry : merchantGroups.entrySet()) {
            settleMerchant(entry.getKey(), entry.getValue());
        }
    }
    
    private void settleMerchant(Long merchantId, List<Transaction> transactions) {
        // 计算应收金额(扣除手续费)
        BigDecimal totalAmount = transactions.stream()
            .map(Transaction::getAmount)
            .reduce(BigDecimal.ZERO, BigDecimal::add);
        
        BigDecimal fee = calculateFee(merchantId, totalAmount);
        BigDecimal settleAmount = totalAmount.subtract(fee);
        
        // 生成结算单
        SettlementOrder order = SettlementOrder.builder()
            .merchantId(merchantId)
            .totalAmount(totalAmount)
            .fee(fee)
            .settleAmount(settleAmount)
            .build();
        
        // 更新账户余额
        accountService.credit(merchantId, settleAmount);
    }
}

对账系统设计

对账是保证资金准确的关键,需要处理渠道对账、内部对账、差错处理。

# 对账服务
class ReconciliationService:
    def __init__(self):
        self.channel_repos = {
            'alipay': AlipayReconciliation(),
            'wechat': WechatReconciliation(),
            'bank': BankReconciliation(),
        }
    
    def daily_reconciliation(self):
        """日终对账"""
        # 1. 下载渠道对账文件
        for channel, repo in self.channel_repos.items():
            channel_file = repo.download_reconciliation_file()
            
            # 2. 解析渠道数据
            channel_records = repo.parse_file(channel_file)
            
            # 3. 获取内部交易记录
            internal_records = self.get_internal_records(channel)
            
            # 4. 执行对账
            result = self.reconcile(channel_records, internal_records)
            
            # 5. 处理差异
            if result.has_differences():
                self.handle_differences(channel, result.differences)
    
    def reconcile(self, channel_records, internal_records):
        """执行对账"""
        # 按交易号匹配
        channel_map = {r.transaction_id: r for r in channel_records}
        internal_map = {r.transaction_id: r for r in internal_records}
        
        differences = []
        
        # 检查渠道有、内部无(长款)
        for trans_id in channel_map:
            if trans_id not in internal_map:
                differences.append(Difference(
                    type='LONG',
                    transaction_id=trans_id,
                    description='渠道有记录,内部无记录'
                ))
        
        # 检查内部有、渠道无(短款)
        for trans_id in internal_map:
            if trans_id not in channel_map:
                differences.append(Difference(
                    type='SHORT',
                    transaction_id=trans_id,
                    description='内部有记录,渠道无记录'
                ))
        
        # 检查金额不一致
        for trans_id in set(channel_map.keys()) & set(internal_map.keys()):
            if channel_map[trans_id].amount != internal_map[trans_id].amount:
                differences.append(Difference(
                    type='AMOUNT',
                    transaction_id=trans_id,
                    description='金额不一致'
                ))
        
        return ReconciliationResult(differences)

风控系统设计

风控系统需要实时识别和拦截风险交易,保护资金安全。

// 风控服务
@Service
public class RiskControlService {
    @Autowired
    private RuleEngine ruleEngine;
    @Autowired
    private RiskEventRepository eventRepo;
    
    public RiskResult checkTransaction(Transaction transaction) {
        // 1. 规则引擎检查
        List<RuleResult> ruleResults = ruleEngine.evaluate(transaction);
        
        // 2. 综合评估风险
        RiskLevel overallRisk = assessOverallRisk(ruleResults);
        
        // 3. 记录风险事件
        if (overallRisk != RiskLevel.LOW) {
            RiskEvent event = RiskEvent.builder()
                .transactionId(transaction.getId())
                .riskLevel(overallRisk)
                .ruleResults(ruleResults)
                .build();
            eventRepo.save(event);
        }
        
        // 4. 根据风险级别决定处置
        return RiskResult.builder()
            .riskLevel(overallRisk)
            .action(determineAction(overallRisk))
            .build();
    }
    
    private RiskAction determineAction(RiskLevel level) {
        switch (level) {
            case HIGH:
                return RiskAction.BLOCK;      // 直接拦截
            case MEDIUM:
                return RiskAction.CHALLENGE;   // 要求验证
            case LOW:
            default:
                return RiskAction.ALLOW;       // 放行
        }
    }
}

// 规则引擎
@Component
public class RuleEngine {
    private final List<RiskRule> rules = new ArrayList<>();
    
    @PostConstruct
    public void init() {
        // 金额规则
        rules.add(new AmountRule(10000));  // 单笔超过1万
        
        // 频率规则
        rules.add(new FrequencyRule(10, 3600));  // 1小时内超过10笔
        
        // 地理位置规则
        rules.add(new GeoLocationRule());
        
        // 设备指纹规则
        rules.add(new DeviceFingerprintRule());
    }
    
    public List<RuleResult> evaluate(Transaction transaction) {
        return rules.stream()
            .map(rule -> rule.evaluate(transaction))
            .filter(RuleResult::isTriggered)
            .collect(Collectors.toList());
    }
}

幂等性设计

支付系统必须保证幂等性,防止重复支付。核心思路是使用唯一的业务号作为幂等键。

# 支付幂等性设计
class PaymentIdempotent:
    def __init__(self):
        self.redis = RedisClient()
    
    def create_payment(self, order_id, amount):
        """创建支付(幂等)"""
        # 生成幂等键
        idempotent_key = f"payment:{order_id}"
        
        # 尝试获取锁
        lock = self.redis.acquire_lock(idempotent_key, timeout=30)
        if not lock:
            # 已有支付进行中
            return self.get_existing_payment(order_id)
        
        try:
            # 检查是否已存在
            existing = self.redis.get(idempotent_key)
            if existing:
                return json.loads(existing)
            
            # 创建支付记录
            payment = Payment(
                order_id=order_id,
                amount=amount,
                status='PENDING',
                create_time=time.time()
            )
            
            # 保存到Redis
            self.redis.set(idempotent_key, json.dumps(payment.to_dict()), ex=3600)
            
            # 异步调用支付渠道
            self.call_payment_channel_async(payment)
            
            return payment
        finally:
            self.redis.release_lock(lock)
    
    def handle_payment_callback(self, payment_id, result):
        """处理支付回调(幂等)"""
        # 生成回调幂等键
        idempotent_key = f"payment_callback:{payment_id}"
        
        # 尝试获取锁
        lock = self.redis.acquire_lock(idempotent_key, timeout=10)
        if not lock:
            return  # 已处理过
        
        try:
            # 检查是否已处理
            if self.redis.get(idempotent_key):
                return  # 已处理过
            
            # 更新支付状态
            self.update_payment_status(payment_id, result)
            
            # 更新订单状态
            self.update_order_status(payment_id, result)
            
            # 标记已处理
            self.redis.set(idempotent_key, 'processed', ex=86400)
        finally:
            self.redis.release_lock(lock)