Python安全编程:加密、签名、安全哈希与漏洞防护
Python安全编程:加密、签名、安全哈希与漏洞防护
安全编程是现代软件开发中不可或缺的技能。本文将介绍Python中的加密技术、安全哈希、数字签名,以及如何防护常见的安全漏洞。
密码学基础
对称加密
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.backends import default_backend
import os
def symmetric_encryption_example():
    """Demonstrate a Fernet encrypt/decrypt round trip.

    Generates a fresh Fernet key, encrypts a fixed UTF-8 message, prints the
    resulting token and the recovered plaintext, and returns the key.

    Returns:
        The newly generated Fernet key.
    """
    # Fernet is the high-level interface: one key, one call each way.
    secret_key = Fernet.generate_key()
    box = Fernet(secret_key)
    plaintext = "这是一条需要加密的消息".encode()

    # Encrypt
    encrypted = box.encrypt(plaintext)
    print(f"加密后: {encrypted}")

    # Decrypt
    decrypted = box.decrypt(encrypted)
    print(f"解密后: {decrypted.decode()}")

    return secret_key
def aes_encryption_example():
    """Demonstrate AES-256-CBC with PKCS7 padding via the low-level API.

    Generates a random key and IV, pads and encrypts a fixed message, prints
    the ciphertext (hex) and its length, then decrypts, unpads and prints the
    recovered plaintext.

    NOTE: CBC provides confidentiality only; the ciphertext is not
    authenticated.
    """
    # Imported locally under an alias: this file later rebinds the
    # module-level name ``padding`` to the asymmetric padding module, which
    # has no PKCS7, so relying on the global would break calls made after
    # that import.
    from cryptography.hazmat.primitives import padding as symmetric_padding

    key = os.urandom(32)  # 256-bit key
    iv = os.urandom(16)   # 16-byte initialization vector (one AES block)

    cipher = Cipher(
        algorithms.AES(key),
        modes.CBC(iv),
        backend=default_backend(),
    )

    message = "需要加密的敏感数据".encode()

    # CBC works on whole blocks, so pad to the 128-bit block size first.
    padder = symmetric_padding.PKCS7(128).padder()
    padded_data = padder.update(message) + padder.finalize()

    # Encrypt
    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(padded_data) + encryptor.finalize()
    print(f"密文: {ciphertext.hex()}")
    print(f"密文长度: {len(ciphertext)} 字节")

    # Decrypt
    decryptor = cipher.decryptor()
    decrypted_padded = decryptor.update(ciphertext) + decryptor.finalize()

    # Strip the PKCS7 padding to recover the original bytes.
    unpadder = symmetric_padding.PKCS7(128).unpadder()
    decrypted = unpadder.update(decrypted_padded) + unpadder.finalize()
    print(f"解密后: {decrypted.decode()}")
class AESCipher:
    """AES-CBC cipher with PKCS7 padding and a fresh random IV per message.

    NOTE: CBC provides confidentiality only; the ciphertext is not
    authenticated.
    """

    def __init__(self, key: bytes = None):
        """Store ``key``, or generate a random 32-byte key when it is None."""
        self.key = os.urandom(32) if key is None else key

    def _cbc_cipher(self, iv: bytes):
        """Build the AES-CBC cipher object for this key and the given IV."""
        return Cipher(
            algorithms.AES(self.key),
            modes.CBC(iv),
            backend=default_backend(),
        )

    def encrypt(self, data: bytes) -> tuple:
        """Encrypt ``data`` and return ``(iv, ciphertext)``.

        A new 16-byte IV is drawn from ``os.urandom`` on every call; the
        caller must keep it alongside the ciphertext to decrypt later.
        """
        # Local aliased import: this file later rebinds the module-level name
        # ``padding`` to the asymmetric padding module, which has no PKCS7.
        from cryptography.hazmat.primitives import padding as symmetric_padding

        iv = os.urandom(16)
        padder = symmetric_padding.PKCS7(128).padder()
        padded_data = padder.update(data) + padder.finalize()

        encryptor = self._cbc_cipher(iv).encryptor()
        ciphertext = encryptor.update(padded_data) + encryptor.finalize()
        return iv, ciphertext

    def decrypt(self, iv: bytes, ciphertext: bytes) -> bytes:
        """Decrypt ``ciphertext`` with ``iv`` and return the unpadded bytes."""
        # Same local aliased import as in ``encrypt`` (see the note there).
        from cryptography.hazmat.primitives import padding as symmetric_padding

        decryptor = self._cbc_cipher(iv).decryptor()
        decrypted_padded = decryptor.update(ciphertext) + decryptor.finalize()

        unpadder = symmetric_padding.PKCS7(128).unpadder()
        return unpadder.update(decrypted_padded) + unpadder.finalize()
# Usage example: round-trip a short message through AESCipher.
# No key is passed, so AESCipher generates a random one.
cipher = AESCipher()
message = "敏感数据".encode()
# encrypt() returns the IV together with the ciphertext; both are needed
# for decryption.
iv, encrypted = cipher.encrypt(message)
decrypted = cipher.decrypt(iv, encrypted)
print(f"解密结果: {decrypted.decode()}")
非对称加密
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.backends import default_backend
import json
class RSAEncryption:
    """RSA key pair offering OAEP encryption and PSS signatures (SHA-256)."""

    def __init__(self, key_size=2048):
        """Generate a new private key (public exponent 65537) of ``key_size`` bits."""
        self.private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=key_size,
            backend=default_backend(),
        )
        self.public_key = self.private_key.public_key()

    @staticmethod
    def _oaep():
        """Return the OAEP padding used for both encryption and decryption."""
        return padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )

    @staticmethod
    def _pss():
        """Return the PSS padding used for both signing and verification."""
        return padding.PSS(
            mgf=padding.MGF1(hashes.SHA256()),
            salt_length=padding.PSS.MAX_LENGTH,
        )

    def encrypt(self, data: bytes) -> bytes:
        """Encrypt ``data`` with the public key."""
        return self.public_key.encrypt(data, self._oaep())

    def decrypt(self, encrypted_data: bytes) -> bytes:
        """Decrypt ``encrypted_data`` with the private key."""
        return self.private_key.decrypt(encrypted_data, self._oaep())

    def sign(self, data: bytes) -> bytes:
        """Sign ``data`` with the private key and return the signature."""
        return self.private_key.sign(data, self._pss(), hashes.SHA256())

    def verify(self, data: bytes, signature: bytes) -> bool:
        """Return True when ``signature`` is a valid signature of ``data``.

        Only a signature mismatch yields False; other errors (for example
        arguments of the wrong type) propagate instead of being reported as
        an invalid signature.
        """
        from cryptography.exceptions import InvalidSignature

        try:
            self.public_key.verify(
                signature,
                data,
                self._pss(),
                hashes.SHA256(),
            )
        except InvalidSignature:
            return False
        return True

    def export_public_key(self) -> bytes:
        """Return the public key as PEM-encoded SubjectPublicKeyInfo."""
        return self.public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )

    def export_private_key(self, password: bytes = None) -> bytes:
        """Return the private key as PEM-encoded PKCS8.

        The key is encrypted with ``password`` when one is given. A falsy
        ``password`` (None or empty bytes) exports the key unencrypted.
        """
        encryption = (
            serialization.BestAvailableEncryption(password)
            if password
            else serialization.NoEncryption()
        )
        return self.private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=encryption,
        )
# Usage example: a key pair is generated when RSAEncryption is constructed.
rsa_enc = RSAEncryption()
message = "RSA加密测试数据".encode()
# Encrypt with the public key, decrypt with the private key.
encrypted = rsa_enc.encrypt(message)
decrypted = rsa_enc.decrypt(encrypted)
print(f"RSA解密: {decrypted.decode()}")
# Sign with the private key, verify with the public key.
signature = rsa_enc.sign(message)
is_valid = rsa_enc.verify(message, signature)
print(f"签名验证: {'有效' if is_valid else '无效'}")
安全哈希算法
import hashlib
import hmac
import secrets
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.backends import default_backend
class SecureHash:
    """Hashing helpers: SHA-2 digests, PBKDF2 password hashing and HMAC."""

    # PBKDF2 parameters shared by password_hash() and verify_password(); the
    # two must always agree or stored hashes stop verifying.
    _PBKDF2_ITERATIONS = 100000
    _PBKDF2_KEY_LENGTH = 32

    @staticmethod
    def sha256(data: bytes) -> str:
        """Return the SHA-256 digest of ``data`` as a hex string."""
        return hashlib.sha256(data).hexdigest()

    @staticmethod
    def sha512(data: bytes) -> str:
        """Return the SHA-512 digest of ``data`` as a hex string."""
        return hashlib.sha512(data).hexdigest()

    @staticmethod
    def _pbkdf2(salt: bytes):
        """Build a single-use PBKDF2-HMAC-SHA256 instance for ``salt``."""
        return PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=SecureHash._PBKDF2_KEY_LENGTH,
            salt=salt,
            iterations=SecureHash._PBKDF2_ITERATIONS,
            backend=default_backend(),
        )

    @staticmethod
    def password_hash(password: str, salt: bytes = None) -> tuple:
        """Derive a key from ``password`` with PBKDF2 and return ``(salt, key)``.

        A random 32-byte salt is generated when ``salt`` is None.
        """
        if salt is None:
            salt = SecureHash.generate_salt()
        key = SecureHash._pbkdf2(salt).derive(password.encode())
        return salt, key

    @staticmethod
    def verify_password(password: str, salt: bytes, stored_key: bytes) -> bool:
        """Return True when ``password`` derives to ``stored_key`` under ``salt``.

        Only a key mismatch yields False; other errors (for example
        arguments of the wrong type) propagate instead of being reported as
        a wrong password.
        """
        from cryptography.exceptions import InvalidKey

        try:
            SecureHash._pbkdf2(salt).verify(password.encode(), stored_key)
        except InvalidKey:
            return False
        return True

    @staticmethod
    def hmac_sha256(key: bytes, message: bytes) -> str:
        """Return the HMAC-SHA256 of ``message`` under ``key`` as a hex string."""
        return hmac.new(key, message, hashlib.sha256).hexdigest()

    @staticmethod
    def generate_salt(length: int = 32) -> bytes:
        """Return ``length`` random bytes from the ``secrets`` module."""
        return secrets.token_bytes(length)
class PasswordManager:
    """In-memory store of salted password hashes, keyed by service name."""

    def __init__(self):
        # Maps service name -> {"username", "salt", "key"}.
        self.passwords = {}

    def store_password(self, service: str, username: str, password: str):
        """Hash ``password`` with a fresh salt and record it under ``service``.

        An existing entry for the same service is replaced.
        """
        fresh_salt = SecureHash.generate_salt()
        derived_key = SecureHash.password_hash(password, fresh_salt)[1]
        record = {
            "username": username,
            "salt": fresh_salt,
            "key": derived_key
        }
        self.passwords[service] = record

    def verify_password(self, service: str, password: str) -> bool:
        """Return True when ``password`` matches the hash stored for ``service``.

        Unknown services yield False.
        """
        try:
            record = self.passwords[service]
        except KeyError:
            return False
        return SecureHash.verify_password(password, record["salt"], record["key"])
# Usage example: store one password, then check the same password against it.
password_mgr = PasswordManager()
password_mgr.store_password("github", "user123", "secure_password_123")
print(f"密码验证: {password_mgr.verify_password('github', 'secure_password_123')}")
安全令牌生成
import secrets
import base64
from datetime import datetime, timedelta
class SecureTokenGenerator:
    """Generators for API keys, session tokens, OTPs and signed tokens."""

    @staticmethod
    def generate_api_key(length: int = 32) -> str:
        """Return a URL-safe random API key built from ``length`` random bytes."""
        return secrets.token_urlsafe(length)

    @staticmethod
    def generate_session_token() -> str:
        """Return a random session token: 32 random bytes as hex."""
        return secrets.token_hex(32)

    @staticmethod
    def generate_otp(length: int = 6) -> str:
        """Return a one-time password of ``length`` random decimal digits."""
        return ''.join(str(secrets.randbelow(10)) for _ in range(length))

    @staticmethod
    def create_jwt_like_token(payload: dict, secret: str, expires_in: int = 3600) -> str:
        """Create a ``<base64 payload>.<hex HMAC-SHA256>`` token.

        ``exp`` and ``iat`` claims (naive local time, ISO format) are added to
        a copy of ``payload``; the caller's dict is left untouched.

        Args:
            payload: JSON-serializable claims.
            secret: HMAC key.
            expires_in: Lifetime in seconds.
        """
        issued_at = datetime.now()
        # Work on a shallow copy so the caller's dict is not mutated.
        claims = dict(payload)
        claims['exp'] = (issued_at + timedelta(seconds=expires_in)).isoformat()
        claims['iat'] = issued_at.isoformat()

        # Encode the claims
        payload_json = json.dumps(claims, sort_keys=True)
        payload_b64 = base64.urlsafe_b64encode(payload_json.encode()).decode()

        # Sign the encoded claims
        signature = hmac.new(
            secret.encode(),
            payload_b64.encode(),
            hashlib.sha256
        ).hexdigest()
        return f"{payload_b64}.{signature}"

    @staticmethod
    def verify_jwt_like_token(token: str, secret: str) -> tuple:
        """Verify a token produced by ``create_jwt_like_token``.

        Returns:
            ``(True, payload_dict)`` on success, otherwise ``(False, reason)``
            where ``reason`` is a message string. Any exception raised while
            verifying is reported through the failure tuple.
        """
        try:
            parts = token.split('.')
            if len(parts) != 2:
                return False, "无效的令牌格式"
            payload_b64, signature = parts

            # Check the signature before trusting any of the payload.
            expected_signature = hmac.new(
                secret.encode(),
                payload_b64.encode(),
                hashlib.sha256
            ).hexdigest()
            # Constant-time comparison to avoid leaking the match position.
            if not hmac.compare_digest(signature, expected_signature):
                return False, "签名验证失败"

            # Decode the claims; the extra '==' tolerates stripped padding.
            payload_json = base64.urlsafe_b64decode(payload_b64 + '==')
            payload = json.loads(payload_json)

            # Reject expired tokens
            exp_time = datetime.fromisoformat(payload['exp'])
            if datetime.now() > exp_time:
                return False, "令牌已过期"
            return True, payload
        except Exception as e:
            return False, f"令牌验证错误: {str(e)}"
# Usage example: generate one token of each kind and print it.
token_gen = SecureTokenGenerator()
api_key = token_gen.generate_api_key()
session_token = token_gen.generate_session_token()
otp = token_gen.generate_otp()
print(f"API密钥: {api_key}")
print(f"会话令牌: {session_token}")
print(f"OTP: {otp}")
# JWT-like token: create one, then verify it with the same secret.
# NOTE(review): the secret is hard-coded for demonstration; real code should
# presumably load it from configuration instead.
secret = "my_secret_key"
payload = {"user_id": 123, "role": "admin"}
token = token_gen.create_jwt_like_token(payload, secret)
print(f"生成的令牌: {token}")
is_valid, decoded_payload = token_gen.verify_jwt_like_token(token, secret)
print(f"令牌验证: {'有效' if is_valid else '无效'}")
常见漏洞防护
import re
import html
import urllib.parse
class SecurityProtector:
    """Helpers illustrating defenses against common injection attacks."""

    @staticmethod
    def prevent_sql_injection(query: str, params: tuple) -> str:
        """Convert ``%s`` placeholders to ``?`` and echo the query and params.

        This only rewrites the placeholder style and prints the result; it
        neither escapes nor executes anything. ``params`` must still be passed
        separately to the database driver (parameterized query or ORM).
        """
        safe_query = query.replace('%s', '?')
        print(f"安全查询: {safe_query}, 参数: {params}")
        return safe_query

    @staticmethod
    def prevent_xss(text: str) -> str:
        """HTML-escape ``text`` and strip every ``javascript:`` occurrence.

        Escaping already neutralizes tags (no literal ``<`` survives), so no
        separate tag stripping is needed. The ``javascript:`` scheme is
        removed repeatedly until none is left, because a single pass can be
        defeated by nesting (``javajavascript:script:``).
        """
        escaped = html.escape(text)
        scheme = re.compile(r'javascript:', re.IGNORECASE)
        cleaned, removed = scheme.subn('', escaped)
        # Each pass shortens the string, so this loop terminates.
        while removed:
            cleaned, removed = scheme.subn('', cleaned)
        return cleaned

    @staticmethod
    def prevent_path_traversal(path: str) -> str:
        """Return the normalized ``path``, rejecting traversal attempts.

        Raises:
            ValueError: if the normalized path contains ``..`` or is rooted
                (leading ``/`` or ``\\``, a drive prefix, or otherwise
                absolute).
        """
        normalized = os.path.normpath(path)
        # normpath may turn '/' into '\\' on Windows, so checking only for a
        # leading '/' is not enough to detect rooted paths there.
        drive, _ = os.path.splitdrive(normalized)
        is_rooted = (
            bool(drive)
            or normalized.startswith(('/', '\\'))
            or os.path.isabs(normalized)
        )
        if '..' in normalized or is_rooted:
            raise ValueError("路径遍历攻击检测")
        return normalized

    @staticmethod
    def validate_input(input_data: str, pattern: str) -> bool:
        """Return True when ``pattern`` matches at the start of ``input_data``.

        Uses ``re.match``, so the pattern must anchor its own end if the
        whole input has to conform.
        """
        return bool(re.match(pattern, input_data))

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        """Remove ``< > : " / \\ | ? *`` from ``filename`` and cap it at 255 chars."""
        sanitized = re.sub(r'[<>:"/\\|?*]', '', filename)
        # Slicing is a no-op for names that are already short enough.
        return sanitized[:255]
class InputValidator:
    """Validators for e-mail addresses, password strength and card numbers."""

    @staticmethod
    def validate_email(email: str) -> bool:
        """Return True when ``email`` has a basic ``local@domain.tld`` form.

        ``fullmatch`` is used so a trailing newline cannot slip past the
        ``$`` anchor.
        """
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return re.fullmatch(pattern, email) is not None

    @staticmethod
    def validate_password(password: str) -> tuple:
        """Check password strength.

        Returns:
            ``(is_valid, errors)`` where ``errors`` lists one message per
            unmet rule: minimum length 8, an uppercase letter, a lowercase
            letter, a digit and a special character.
        """
        errors = []
        if len(password) < 8:
            errors.append("密码长度至少8位")
        # (regex that must be found, message reported when it is missing)
        rules = (
            (r'[A-Z]', "密码必须包含大写字母"),
            (r'[a-z]', "密码必须包含小写字母"),
            (r'\d', "密码必须包含数字"),
            (r'[!@#$%^&*(),.?":{}|<>]', "密码必须包含特殊字符"),
        )
        errors.extend(
            message for regex, message in rules
            if not re.search(regex, password)
        )
        return len(errors) == 0, errors

    @staticmethod
    def validate_credit_card(card_number: str) -> bool:
        """Return True when ``card_number`` passes the Luhn checksum.

        Whitespace and hyphens are ignored. Anything other than ASCII digits
        makes the number invalid.
        """
        card_number = re.sub(r'[\s-]', '', card_number)
        # str.isdigit() alone accepts characters such as '²' that int()
        # cannot parse, so require ASCII as well.
        if not (card_number.isascii() and card_number.isdigit()):
            return False

        # Luhn: from the right, double every second digit and subtract 9
        # from any result above 9.
        total = 0
        for position, char in enumerate(reversed(card_number)):
            digit = int(char)
            if position % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            total += digit
        return total % 10 == 0
# Usage example
protector = SecurityProtector()
validator = InputValidator()
# Exercise input validation
print(f"邮箱验证: {validator.validate_email('user@example.com')}")
print(f"密码验证: {validator.validate_password('SecureP@ss123')}")
# Exercise the XSS protection on a script-tag payload
xss_text = '<script>alert("xss")</script>'
safe_text = protector.prevent_xss(xss_text)
print(f"XSS防护: {safe_text}")
安全最佳实践
import logging
from functools import wraps
import time
class SecurityLogger:
    """Logger for security events that strips control characters from input."""

    # Attribute names the logging module refuses to overwrite through
    # ``extra``; using one of them as a detail key would raise KeyError.
    _RESERVED_KEYS = frozenset(
        vars(logging.LogRecord("", 0, "", 0, "", (), None))
    ) | {"message", "asctime"}

    def __init__(self):
        """Configure the shared ``security`` logger with a stream handler."""
        self.logger = logging.getLogger('security')
        self.logger.setLevel(logging.INFO)
        # getLogger returns the same object on every call, so attach the
        # handler only once; otherwise each new instance duplicates output.
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)

    @staticmethod
    def _strip_control(value):
        """Return ``value`` without control characters; non-strings pass through."""
        if isinstance(value, str):
            return re.sub(r'[\x00-\x1f\x7f]', '', value)
        return value

    def log_security_event(self, event_type: str, details: dict):
        """Log a warning for ``event_type`` with ``details`` attached as extras.

        String values and the event type have control characters (including
        newlines) removed to prevent log injection. Detail keys that clash
        with built-in LogRecord attributes are stored as ``detail_<key>``.
        """
        safe_details = {}
        for key, value in details.items():
            if key in self._RESERVED_KEYS:
                key = f"detail_{key}"
            safe_details[key] = self._strip_control(value)
        self.logger.warning(
            "安全事件: %s", self._strip_control(event_type), extra=safe_details
        )
class RateLimiter:
    """Fixed-window request counter, tracked per client."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Maps client_id -> [window start timestamp, request count].
        self.requests = {}

    def is_allowed(self, client_id: str) -> bool:
        """Return True and count the request if ``client_id`` is under its limit.

        Every call first drops all clients whose window has elapsed. A
        client without a live window starts a new one and is allowed.
        """
        current = time.time()

        # Keep only the windows that are still open.
        live = {}
        for known_id, record in self.requests.items():
            if current - record[0] < self.window_seconds:
                live[known_id] = record
        self.requests = live

        if client_id not in live:
            live[client_id] = [current, 1]
            return True

        record = live[client_id]
        if record[1] >= self.max_requests:
            return False
        record[1] += 1
        return True
# Usage example
security_logger = SecurityLogger()
rate_limiter = RateLimiter(max_requests=100, window_seconds=60)

# Exercise the rate limiter: 105 requests from one client against a limit
# of 100 per 60-second window.
client_id = "192.168.1.1"
for i in range(105):
    if not rate_limiter.is_allowed(client_id):
        print(f"请求 {i+1}: 被拒绝(超过速率限制)")
    else:
        print(f"请求 {i+1}: 允许")
总结
安全编程需要持续学习和实践。掌握加密技术、输入验证、速率限制等安全措施,可以帮助你构建更加安全可靠的Python应用。记住,安全是一个持续的过程,需要定期审查和更新安全措施。