← 返回首页
🔒

Python安全编程:加密、签名、安全哈希与漏洞防护

📂 python ⏱ 6 min 1169 words

Python安全编程:加密、签名、安全哈希与漏洞防护

安全编程是现代软件开发中不可或缺的技能。本文将介绍Python中的加密技术、安全哈希、数字签名,以及如何防护常见的安全漏洞。

密码学基础

对称加密

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.backends import default_backend
import os

def symmetric_encryption_example():
    """Demonstrate a Fernet encrypt/decrypt round trip.

    Generates a fresh Fernet key, encrypts a fixed UTF-8 message, prints the
    resulting token and the recovered plaintext.

    Returns:
        The key produced by ``Fernet.generate_key()``.
    """
    # Fernet is the high-level interface: one key, one object.
    secret_key = Fernet.generate_key()
    box = Fernet(secret_key)

    plaintext = "这是一条需要加密的消息".encode()

    # Encrypt and show the token.
    token = box.encrypt(plaintext)
    print(f"加密后: {token}")

    # Decrypt the token back into the original text.
    recovered = box.decrypt(token)
    print(f"解密后: {recovered.decode()}")

    return secret_key

def aes_encryption_example():
    """Demonstrate AES-CBC with PKCS7 padding through the low-level API.

    Encrypts a fixed UTF-8 message with a random 32-byte key and 16-byte IV,
    prints the ciphertext (hex) and its length, then decrypts it again and
    prints the recovered text.
    """
    aes_key = os.urandom(32)      # 256-bit key
    init_vector = os.urandom(16)  # 16-byte initialization vector

    plaintext = "需要加密的敏感数据".encode()

    aes_cbc = Cipher(
        algorithms.AES(aes_key),
        modes.CBC(init_vector),
        backend=default_backend(),
    )

    # CBC works on whole blocks, so pad the message to a 128-bit boundary.
    pkcs7 = padding.PKCS7(128)
    pad = pkcs7.padder()
    padded_plaintext = pad.update(plaintext) + pad.finalize()

    enc = aes_cbc.encryptor()
    ciphertext = enc.update(padded_plaintext) + enc.finalize()

    print(f"密文: {ciphertext.hex()}")
    print(f"密文长度: {len(ciphertext)} 字节")

    # Reverse the steps: decrypt first, then strip the padding.
    dec = aes_cbc.decryptor()
    padded_recovered = dec.update(ciphertext) + dec.finalize()

    unpad = pkcs7.unpadder()
    recovered = unpad.update(padded_recovered) + unpad.finalize()

    print(f"解密后: {recovered.decode()}")

class AESCipher:
    """AES-CBC helper that applies PKCS7 padding and a fresh IV per message."""

    def __init__(self, key: bytes = None):
        # Fall back to a random 32-byte (256-bit) key when none is supplied.
        self.key = os.urandom(32) if key is None else key

    def _cbc_cipher(self, iv: bytes):
        """Build the CBC-mode AES cipher for this instance's key and ``iv``."""
        return Cipher(
            algorithms.AES(self.key),
            modes.CBC(iv),
            backend=default_backend(),
        )

    def encrypt(self, data: bytes) -> tuple:
        """Encrypt ``data`` and return ``(iv, ciphertext)``.

        A new random 16-byte IV is generated on every call; the caller must
        keep it alongside the ciphertext to be able to decrypt.
        """
        iv = os.urandom(16)
        aes = self._cbc_cipher(iv)

        pad = padding.PKCS7(128).padder()
        padded = pad.update(data) + pad.finalize()

        enc = aes.encryptor()
        return iv, enc.update(padded) + enc.finalize()

    def decrypt(self, iv: bytes, ciphertext: bytes) -> bytes:
        """Decrypt ``ciphertext`` with ``iv`` and return the unpadded bytes."""
        dec = self._cbc_cipher(iv).decryptor()
        padded = dec.update(ciphertext) + dec.finalize()

        unpad = padding.PKCS7(128).unpadder()
        return unpad.update(padded) + unpad.finalize()

# Usage example: round-trip a short UTF-8 message through AESCipher and
# print the decrypted text.
cipher = AESCipher()
message = "敏感数据".encode()
iv, encrypted = cipher.encrypt(message)
decrypted = cipher.decrypt(iv, encrypted)
print(f"解密结果: {decrypted.decode()}")

非对称加密

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
import json

class RSAEncryption:
    """RSA key pair wrapper offering OAEP encryption and PSS signatures.

    A new private key is generated on construction; the matching public key
    is derived from it. SHA-256 is used for OAEP, MGF1 and PSS throughout.
    """

    def __init__(self, key_size=2048):
        """Generate a key pair with the given modulus size in bits."""
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_size,
            backend=default_backend()
        )
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _oaep():
        """Return the OAEP padding used by both encrypt() and decrypt()."""
        return padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )

    @staticmethod
    def _pss():
        """Return the PSS padding used by both sign() and verify()."""
        return padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH
        )

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt ``data`` with the public key using OAEP."""
        return self.public_key.encrypt(data, self._oaep())

    def decrypt(self, encrypted_data: bytes) -> bytes:
        """Decrypt ``encrypted_data`` with the private key using OAEP."""
        return self.private_key.decrypt(encrypted_data, self._oaep())

    def sign(self, data: bytes) -> bytes:
        """Sign ``data`` with the private key (PSS, SHA-256)."""
        return self.private_key.sign(data, self._pss(), hashes.SHA256())

    def verify(self, data: bytes, signature: bytes) -> bool:
        """Check ``signature`` over ``data`` with the public key.

        Returns:
            True when the signature is valid, False when it is not.

        Only a failed signature check is mapped to False. Other errors
        (for example passing a non-bytes argument) propagate so that
        programming mistakes are not reported as "invalid signature".
        """
        # Imported here so the block stays self-contained.
        from cryptography.exceptions import InvalidSignature

        try:
            self.public_key.verify(
                signature,
                data,
                self._pss(),
                hashes.SHA256()
            )
        except InvalidSignature:
            return False
        return True

    def export_public_key(self) -> bytes:
        """Return the public key as PEM (SubjectPublicKeyInfo)."""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

    def export_private_key(self, password: bytes = None) -> bytes:
        """Return the private key as PEM (PKCS8).

        Args:
            password: When non-empty, the PEM is encrypted with it; when
                None or empty, the key is exported unencrypted.
        """
        encryption = (
            serialization.BestAvailableEncryption(password)
            if password
            else serialization.NoEncryption()
        )
        return self.private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=encryption
        )

# Usage example: generate a key pair and encode a test message.
rsa_enc = RSAEncryption()
message = "RSA加密测试数据".encode()

# Encrypt with the public key, decrypt with the private key.
encrypted = rsa_enc.encrypt(message)
decrypted = rsa_enc.decrypt(encrypted)
print(f"RSA解密: {decrypted.decode()}")

# Sign with the private key, verify with the public key.
signature = rsa_enc.sign(message)
is_valid = rsa_enc.verify(message, signature)
print(f"签名验证: {'有效' if is_valid else '无效'}")

安全哈希算法

import hashlib
import hmac
import secrets
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend

class SecureHash:
    """Static helpers for digests, HMAC, salts and PBKDF2 password hashing."""

    # PBKDF2 parameters shared by password_hash() and verify_password().
    # They must stay in sync, otherwise stored keys stop verifying.
    _PBKDF2_ITERATIONS = 100000
    _PBKDF2_KEY_LENGTH = 32

    @staticmethod
    def sha256(data: bytes) -> str:
        """Return the SHA-256 digest of ``data`` as a hex string."""
        return hashlib.sha256(data).hexdigest()

    @staticmethod
    def sha512(data: bytes) -> str:
        """Return the SHA-512 digest of ``data`` as a hex string."""
        return hashlib.sha512(data).hexdigest()

    @staticmethod
    def _pbkdf2(salt: bytes):
        """Build a single-use PBKDF2-HMAC-SHA256 instance for ``salt``."""
        return PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=SecureHash._PBKDF2_KEY_LENGTH,
            salt=salt,
            iterations=SecureHash._PBKDF2_ITERATIONS,
            backend=default_backend()
        )

    @staticmethod
    def password_hash(password: str, salt: bytes = None) -> tuple:
        """Derive a key from ``password`` with PBKDF2.

        Args:
            password: The password; it is UTF-8 encoded before derivation.
            salt: Salt to use. A random 32-byte salt is generated when None.

        Returns:
            ``(salt, key)`` where ``key`` is the 32-byte derived key.
        """
        if salt is None:
            # Same source of randomness as generate_salt().
            salt = SecureHash.generate_salt()

        key = SecureHash._pbkdf2(salt).derive(password.encode())
        return salt, key

    @staticmethod
    def verify_password(password: str, salt: bytes, stored_key: bytes) -> bool:
        """Check ``password`` against a key produced by password_hash().

        Returns:
            True on a match, False on a mismatch. Errors other than a key
            mismatch (for example wrong argument types) propagate.
        """
        # Imported here so the block stays self-contained.
        from cryptography.exceptions import InvalidKey

        try:
            SecureHash._pbkdf2(salt).verify(password.encode(), stored_key)
        except InvalidKey:
            return False
        return True

    @staticmethod
    def hmac_sha256(key: bytes, message: bytes) -> str:
        """Return the HMAC-SHA256 of ``message`` under ``key`` as hex."""
        return hmac.new(key, message, hashlib.sha256).hexdigest()

    @staticmethod
    def generate_salt(length: int = 32) -> bytes:
        """Return ``length`` random bytes from the ``secrets`` module."""
        return secrets.token_bytes(length)

class PasswordManager:
    """In-memory store of salted password hashes, keyed by service name."""

    def __init__(self):
        # service -> {"username", "salt", "key"}
        self.passwords = {}

    def store_password(self, service: str, username: str, password: str):
        """Hash ``password`` with a fresh salt and record it for ``service``.

        An existing entry for the same service is overwritten. The plaintext
        password itself is not kept.
        """
        fresh_salt = SecureHash.generate_salt()
        derived_key = SecureHash.password_hash(password, fresh_salt)[1]

        self.passwords[service] = dict(
            username=username,
            salt=fresh_salt,
            key=derived_key,
        )

    def verify_password(self, service: str, password: str) -> bool:
        """Return True when ``password`` matches the entry for ``service``.

        Unknown services yield False.
        """
        record = self.passwords.get(service)
        if record is None:
            return False

        return SecureHash.verify_password(password, record["salt"], record["key"])

# Usage example: store one password, then verify the same password.
password_mgr = PasswordManager()
password_mgr.store_password("github", "user123", "secure_password_123")
print(f"密码验证: {password_mgr.verify_password('github', 'secure_password_123')}")

安全令牌生成

import secrets
import base64
from datetime import datetime, timedelta

class SecureTokenGenerator:
    """Static helpers for random credentials and HMAC-signed tokens."""

    @staticmethod
    def generate_api_key(length: int = 32) -> str:
        """Return a URL-safe random API key built from ``length`` random bytes."""
        return secrets.token_urlsafe(length)

    @staticmethod
    def generate_session_token() -> str:
        """Return a session token of 32 random bytes as 64 hex characters."""
        return secrets.token_hex(32)

    @staticmethod
    def generate_otp(length: int = 6) -> str:
        """Return a one-time password of ``length`` random decimal digits."""
        return ''.join(str(secrets.randbelow(10)) for _ in range(length))

    @staticmethod
    def create_jwt_like_token(payload: dict, secret: str, expires_in: int = 3600) -> str:
        """Create a ``<base64 payload>.<hex HMAC-SHA256>`` token.

        Args:
            payload: Claims to embed. The dict is copied, not modified.
            secret: Shared secret used as the HMAC key.
            expires_in: Lifetime in seconds, counted from now.

        Returns:
            The token string. ``exp`` and ``iat`` claims are added as naive
            local-time ISO-8601 strings, overriding any such keys in
            ``payload``.
        """
        # Work on a copy so the caller's dict is not mutated, and take a
        # single timestamp so ``iat`` and ``exp`` differ by exactly
        # ``expires_in``.
        issued_at = datetime.now()
        claims = dict(payload)
        claims['exp'] = (issued_at + timedelta(seconds=expires_in)).isoformat()
        claims['iat'] = issued_at.isoformat()

        # Encode the claims; sorted keys make the serialization deterministic.
        payload_json = json.dumps(claims, sort_keys=True)
        payload_b64 = base64.urlsafe_b64encode(payload_json.encode()).decode()

        # Sign the encoded payload.
        signature = hmac.new(
            secret.encode(),
            payload_b64.encode(),
            hashlib.sha256
        ).hexdigest()

        return f"{payload_b64}.{signature}"

    @staticmethod
    def verify_jwt_like_token(token: str, secret: str) -> tuple:
        """Verify a token produced by create_jwt_like_token().

        Returns:
            ``(True, payload_dict)`` when the signature matches and the token
            has not expired; otherwise ``(False, message)`` describing the
            failure. This method does not raise: any error during
            verification is reported through the ``(False, message)`` form.
        """
        try:
            parts = token.split('.')
            if len(parts) != 2:
                return False, "无效的令牌格式"

            payload_b64, signature = parts

            # Check the signature before trusting anything in the payload.
            expected_signature = hmac.new(
                secret.encode(),
                payload_b64.encode(),
                hashlib.sha256
            ).hexdigest()

            # Constant-time comparison to avoid leaking match position.
            if not hmac.compare_digest(signature, expected_signature):
                return False, "签名验证失败"

            # Extra '=' padding is tolerated by the decoder, so this also
            # accepts payloads whose padding was stripped.
            payload_json = base64.urlsafe_b64decode(payload_b64 + '==')
            payload = json.loads(payload_json)

            # Reject expired tokens.
            exp_time = datetime.fromisoformat(payload['exp'])
            if datetime.now() > exp_time:
                return False, "令牌已过期"

            return True, payload

        except Exception as e:
            # Deliberate catch-all: callers rely on the tuple contract.
            return False, f"令牌验证错误: {str(e)}"

# Usage example: generate one of each credential type and print them.
token_gen = SecureTokenGenerator()
api_key = token_gen.generate_api_key()
session_token = token_gen.generate_session_token()
otp = token_gen.generate_otp()

print(f"API密钥: {api_key}")
print(f"会话令牌: {session_token}")
print(f"OTP: {otp}")

# JWT-like token round trip.
# NOTE(review): the secret is hard-coded for demonstration only; real code
# should load it from configuration or a secret store.
secret = "my_secret_key"
payload = {"user_id": 123, "role": "admin"}
token = token_gen.create_jwt_like_token(payload, secret)
print(f"生成的令牌: {token}")

is_valid, decoded_payload = token_gen.verify_jwt_like_token(token, secret)
print(f"令牌验证: {'有效' if is_valid else '无效'}")

常见漏洞防护

import re
import html
import urllib.parse

class SecurityProtector:
    """Static helpers illustrating defenses against common web vulnerabilities."""

    @staticmethod
    def prevent_sql_injection(query: str, params: tuple) -> str:
        """Rewrite ``%s`` placeholders to ``?`` and print the query with its params.

        This is an illustration only: it does NOT bind ``params`` or escape
        anything. Real protection comes from passing the query and the
        parameters separately to the database driver or an ORM.

        Returns:
            ``query`` with every ``%s`` replaced by ``?``.
        """
        safe_query = query.replace('%s', '?')
        print(f"安全查询: {safe_query}, 参数: {params}")
        return safe_query

    @staticmethod
    def prevent_xss(text: str) -> str:
        """Escape ``text`` for HTML output and strip ``javascript:`` schemes.

        Returns:
            The HTML-escaped text with every (case-insensitive) occurrence of
            ``javascript:`` removed.
        """
        # html.escape() replaces '<', '>', '&' and both quote characters, so
        # no literal tag can survive; a separate <script> filter would never
        # match and is therefore not needed.
        cleaned = html.escape(text)

        # Remove repeatedly: a single pass can be bypassed by nesting, e.g.
        # "javajavascript:script:" collapses to "javascript:" after one pass.
        scheme = re.compile(r'javascript:', flags=re.IGNORECASE)
        while True:
            stripped = scheme.sub('', cleaned)
            if stripped == cleaned:
                return cleaned
            cleaned = stripped

    @staticmethod
    def prevent_path_traversal(path: str) -> str:
        """Normalize a relative path and reject traversal attempts.

        Rejected inputs: paths containing a NUL byte, paths that still
        contain ``..`` after normalization, and absolute paths in POSIX
        (``/x``), Windows (``\\x``, ``C:...``) or UNC (``\\\\host\\share``)
        form. The drive-letter check is deliberately conservative and also
        rejects relative names that start with ``<letter>:``.

        Returns:
            The normalized path.

        Raises:
            ValueError: If the path is rejected.
        """
        # A NUL byte can truncate the path in lower-level APIs.
        if '\x00' in path:
            raise ValueError("路径遍历攻击检测")

        normalized = os.path.normpath(path)

        # Checked textually so the result does not depend on the host OS.
        is_absolute = (
            normalized.startswith(('/', '\\'))
            or re.match(r'[A-Za-z]:', normalized) is not None
        )
        if '..' in normalized or is_absolute:
            raise ValueError("路径遍历攻击检测")
        return normalized

    @staticmethod
    def validate_input(input_data: str, pattern: str) -> bool:
        """Return True when ``pattern`` matches at the start of ``input_data``.

        ``re.match`` only anchors at the beginning; include an explicit
        end anchor such as ``\\Z`` in ``pattern`` to validate the whole string.
        """
        return bool(re.match(pattern, input_data))

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Strip unsafe characters from ``filename`` and cap it at 255 characters.

        Removed: ``< > : " / \\ | ? *`` and ASCII control characters
        (including NUL and newlines). The result may be empty.
        """
        sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '', filename)
        return sanitized[:255]

class InputValidator:
    """Static validators for e-mail addresses, passwords and card numbers."""

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return True when ``email`` has a plausible ``local@domain.tld`` shape.

        The whole string must match; in particular a trailing newline is
        rejected (``$`` combined with ``re.match`` would let it through).
        """
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return re.fullmatch(pattern, email) is not None

    @staticmethod
    def validate_password(password: str) -> tuple:
        """Check password strength.

        Returns:
            ``(is_valid, errors)`` where ``errors`` lists every failed rule,
            in the order: length, uppercase, lowercase, digit, special
            character.
        """
        errors = []

        if len(password) < 8:
            errors.append("密码长度至少8位")

        # Each rule: a pattern that must occur somewhere, and its message.
        required = (
            (r'[A-Z]', "密码必须包含大写字母"),
            (r'[a-z]', "密码必须包含小写字母"),
            (r'\d', "密码必须包含数字"),
            (r'[!@#$%^&*(),.?":{}|<>]', "密码必须包含特殊字符"),
        )
        errors.extend(
            message for regex, message in required
            if not re.search(regex, password)
        )

        return len(errors) == 0, errors

    @staticmethod
    def validate_credit_card(card_number: str) -> bool:
        """Validate a card number with the Luhn checksum.

        Whitespace and hyphens are ignored. Anything other than ASCII digits
        makes the number invalid.
        """
        card_number = re.sub(r'[\s-]', '', card_number)

        # str.isdigit() also accepts characters such as '²' that int()
        # cannot parse, so require ASCII digits explicitly.
        if re.fullmatch(r'[0-9]+', card_number) is None:
            return False

        # Luhn: from the right, double every second digit and subtract 9
        # from results above 9; the total must be a multiple of 10.
        total = 0
        for position, char in enumerate(reversed(card_number)):
            digit = int(char)
            if position % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            total += digit

        return total % 10 == 0

# Usage example.
protector = SecurityProtector()
validator = InputValidator()

# Exercise the input validators.
print(f"邮箱验证: {validator.validate_email('user@example.com')}")
print(f"密码验证: {validator.validate_password('SecureP@ss123')}")

# Exercise the XSS protection with a script tag.
xss_text = '<script>alert("xss")</script>'
safe_text = protector.prevent_xss(xss_text)
print(f"XSS防护: {safe_text}")

安全最佳实践

import logging
from functools import wraps
import time

class SecurityLogger:
    """Writes security events to the ``security`` logger, guarding against log injection."""

    # ASCII control characters (newlines, tabs, NUL, ESC, DEL, ...) that
    # could be used to forge or corrupt log lines.
    _CONTROL_CHARS = re.compile(r'[\x00-\x1f\x7f]')

    # Attribute names the logging module refuses to accept in ``extra``.
    _RESERVED_KEYS = frozenset(vars(logging.makeLogRecord({}))) | {"message", "asctime"}

    def __init__(self):
        """Configure the shared ``security`` logger with one stream handler."""
        self.logger = logging.getLogger('security')
        self.logger.setLevel(logging.INFO)

        # logging.getLogger() returns the same object every time, so attach
        # our handler only once; otherwise each new SecurityLogger would
        # duplicate every log line.
        already_attached = any(
            getattr(existing, '_security_logger_handler', False)
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.StreamHandler()
            handler.setFormatter(logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            ))
            handler._security_logger_handler = True
            self.logger.addHandler(handler)

    @classmethod
    def _scrub(cls, value):
        """Strip control characters from strings; return other values unchanged."""
        if isinstance(value, str):
            return cls._CONTROL_CHARS.sub('', value)
        return value

    def log_security_event(self, event_type: str, details: dict):
        """Log a security event at WARNING level.

        Args:
            event_type: Short event name; control characters are removed
                before it is placed in the log message.
            details: Extra fields attached to the log record as attributes.
                String values are scrubbed of control characters. Keys that
                collide with built-in record attributes (e.g. ``message``,
                ``name``) are stored under ``detail_<key>`` instead, because
                the logging module rejects them.
        """
        safe_details = {}
        for key, value in details.items():
            if key in self._RESERVED_KEYS:
                key = f"detail_{key}"
            safe_details[key] = self._scrub(value)

        self.logger.warning(f"安全事件: {self._scrub(event_type)}", extra=safe_details)

class RateLimiter:
    """Fixed-window rate limiter keyed by client id.

    Each client's window starts at its first request and lasts
    ``window_seconds``; at most ``max_requests`` requests are allowed
    within it.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # client_id -> [window_start_timestamp, request_count]
        self.requests = {}

    def is_allowed(self, client_id: str) -> bool:
        """Record a request from ``client_id`` and report whether it is allowed.

        Returns:
            True when the request fits within the client's current window,
            False when the limit has been reached (or when ``max_requests``
            is zero or negative, in which case nothing is ever allowed).
        """
        now = time.time()

        # Drop every window that has expired, so stale clients do not
        # accumulate and an expired client starts a fresh window below.
        self.requests = {
            client: entry for client, entry in self.requests.items()
            if now - entry[0] < self.window_seconds
        }

        # A non-positive limit must reject even the first request.
        if self.max_requests <= 0:
            return False

        entry = self.requests.get(client_id)
        if entry is None:
            self.requests[client_id] = [now, 1]
            return True

        if entry[1] >= self.max_requests:
            return False

        entry[1] += 1
        return True

# Usage example.
security_logger = SecurityLogger()
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)

# Exercise the rate limiter: 105 requests from one client against a limit
# of 100, printing whether each one is allowed.
client_id = "192.168.1.1"
for i in range(105):
    if rate_limiter.is_allowed(client_id):
        print(f"请求 {i+1}: 允许")
    else:
        print(f"请求 {i+1}: 被拒绝(超过速率限制)")

总结

安全编程需要持续学习和实践。掌握加密技术、输入验证、速率限制等安全措施,可以帮助你构建更加安全可靠的Python应用。记住,安全是一个持续的过程,需要定期审查和更新安全措施。