返回博客 · 技巧
验证码API错误处理:生产环境最佳实践
使用正确的超时处理、重试逻辑、熔断器和优雅降级模式构建弹性集成。
reGOTCHA 团队 · 2025年12月12日 · 6 分钟阅读
为什么错误处理重要
验证码解决涉及外部 API、网络请求和时间敏感操作。健壮的错误处理可防止级联故障和资源浪费。
常见错误类型
1. 网络错误
example.py
import time

import httpx
from tenacity import (
    retry,
    retry_if_exception,
    stop_after_attempt,
    wait_exponential,
)
def _is_retryable(exc: BaseException) -> bool:
    """Return True when *exc* is a transient failure worth retrying.

    Timeouts, network-level errors and HTTP 5xx responses are retried.
    HTTP 4xx responses (and anything else) are not: repeating the same
    request cannot fix a client-side error.
    """
    if isinstance(exc, (httpx.TimeoutException, httpx.NetworkError)):
        return True
    if isinstance(exc, httpx.HTTPStatusError):
        return exc.response.status_code >= 500
    return False


# Without an explicit ``retry=`` predicate tenacity retries on *every*
# exception, so client errors would be retried as well.
@retry(
    retry=retry_if_exception(_is_retryable),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
)
async def create_task(client, params):
    """POST *params* to ``/createTask`` and return the decoded JSON body.

    Args:
        client: Object with an awaitable ``post(url, json=...)`` method;
            presumably an ``httpx.AsyncClient`` -- confirm against callers.
        params: JSON-serialisable request payload.

    Returns:
        The value of ``response.json()``.

    Raises:
        httpx.HTTPStatusError: Immediately, for 4xx responses.
        tenacity.RetryError: When a retryable failure persists for all
            three attempts.
    """
    # NOTE(review): retrying a POST after a timeout may create a duplicate
    # task if the first request actually reached the server -- confirm the
    # API tolerates this.
    response = await client.post("/createTask", json=params)
    response.raise_for_status()
    return response.json()


# --- Next article section: "2. API 错误" (2. API errors) ---
example.py
class CaptchaAPIError(Exception):
    """Error reported by the captcha API.

    Attributes:
        error_id: Numeric error identifier passed by the caller.
        description: Human-readable description passed by the caller.
    """

    def __init__(self, error_id: int, description: str):
        message = f"错误 {error_id}: {description}"
        super().__init__(message)
        self.error_id = error_id
        self.description = description
# Known API error ids mapped to the message used when raising.
ERROR_CODES = {
    1: "API 密钥无效",
    2: "余额不足",
    3: "任务未找到",
    10: "Site key 无效",
    12: "任务超时",
}


def handle_api_response(data: dict):
    """Raise ``CaptchaAPIError`` when *data* carries a truthy ``errorId``.

    A missing or falsy ``errorId`` is treated as success and the function
    returns ``None``. For known ids the message comes from ``ERROR_CODES``;
    otherwise the response's ``errorDescription`` is used, falling back to
    a fixed placeholder.
    """
    error_id = data.get("errorId")
    if not error_id:
        return
    fallback = data.get("errorDescription", "未知")
    raise CaptchaAPIError(error_id, ERROR_CODES.get(error_id, fallback))


# --- Next article section: "3. 超时处理" (3. Timeout handling) ---
example.py
import asyncio
async def solve_with_timeout(solver, params, timeout=120):
    """Await ``solver.solve(**params)``, giving up after *timeout* seconds.

    Returns whatever the solver returns. On timeout a warning is logged
    and the ``asyncio.TimeoutError`` is re-raised to the caller.
    """
    try:
        outcome = await asyncio.wait_for(solver.solve(**params), timeout=timeout)
    except asyncio.TimeoutError:
        # Record the timeout before propagating it.
        logger.warning(f"验证码解决在 {timeout} 秒后超时")
        raise
    else:
        return outcome
# 带回退
async def solve_with_fallback(solvers, params):
    """Try each solver in order and return the first successful result.

    Args:
        solvers: Iterable of solver objects accepted by
            ``solve_with_timeout``.
        params: Keyword arguments forwarded to each solver.

    Returns:
        The result of the first solver that succeeds.

    Raises:
        Exception: When every solver fails (or *solvers* is empty). The
            last solver's error is attached as ``__cause__`` so the root
            failure is not lost.
    """
    last_error = None
    for solver in solvers:
        try:
            return await solve_with_timeout(solver, params)
        except Exception as e:
            last_error = e
            # A solver without a ``name`` attribute must not abort the
            # fallback chain with an AttributeError raised while logging.
            solver_name = getattr(solver, "name", repr(solver))
            logger.warning(f"解决器 {solver_name} 失败: {e}")
    raise Exception("所有解决器失败") from last_error


# --- Next article section: "熔断器模式" (Circuit breaker pattern) ---
example.py
from datetime import datetime, timedelta
from enum import Enum
class CircuitState(Enum):
    """States of the circuit breaker."""

    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open.

    Subclasses ``Exception`` so existing ``except Exception`` handlers
    keep working, while new callers can tell a rejection apart from a
    failure of the wrapped function.
    """


class CircuitBreaker:
    """Async circuit breaker guarding calls to an unreliable dependency.

    The breaker opens after ``failure_threshold`` consecutive failures,
    rejects calls until ``recovery_timeout`` seconds have passed since the
    last failure, then lets trial calls through (half-open). After
    ``half_open_max`` successful trial calls it closes again; any failure
    while half-open re-opens it.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        half_open_max: int = 3,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max
        self.state = CircuitState.CLOSED
        self.failures = 0
        # datetime of the most recent failure, or None if none occurred.
        self.last_failure_time = None
        # Successful trial calls since entering HALF_OPEN.
        self.half_open_calls = 0

    async def call(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` through the breaker.

        Returns:
            Whatever *func* returns.

        Raises:
            CircuitOpenError: When the circuit is open and the recovery
                timeout has not elapsed yet.
            Exception: Any exception raised by *func*, re-raised after
                being recorded as a failure.
        """
        if self.state == CircuitState.OPEN:
            if not self._should_try_reset():
                raise CircuitOpenError("熔断器已打开")
            self.state = CircuitState.HALF_OPEN
            self.half_open_calls = 0
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _should_try_reset(self) -> bool:
        """Return True once the recovery timeout has passed."""
        if self.last_failure_time is None:
            return False
        elapsed = datetime.now() - self.last_failure_time
        return elapsed > timedelta(seconds=self.recovery_timeout)

    def _on_success(self):
        """Record a successful call and update the state."""
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max:
                self.state = CircuitState.CLOSED
                self.failures = 0
        elif self.state == CircuitState.CLOSED:
            # Count consecutive failures only; otherwise sporadic errors
            # spread over a long uptime would eventually trip the breaker.
            self.failures = 0

    def _on_failure(self):
        """Record a failed call and open the circuit when warranted."""
        self.failures += 1
        self.last_failure_time = datetime.now()
        if (
            self.state == CircuitState.HALF_OPEN
            or self.failures >= self.failure_threshold
        ):
            self.state = CircuitState.OPEN


# --- Next article section: "优雅降级" (Graceful degradation) ---
example.py
class CaptchaSolverWithFallback:
    """Solver wrapper that falls back to another strategy on failure.

    The primary solver is invoked through a ``CircuitBreaker``; if that
    call raises, the failure is logged and the fallback coroutine function
    is awaited with the same keyword arguments.
    """

    def __init__(self, primary_solver, fallback_strategy):
        self.circuit = CircuitBreaker()
        self.primary = primary_solver
        self.fallback = fallback_strategy

    async def solve(self, **params):
        """Return the primary solver's result, or the fallback's on error."""
        try:
            outcome = await self.circuit.call(self.primary.solve, **params)
        except Exception as err:
            logger.warning(f"主解决器失败: {err}")
            outcome = await self.fallback(**params)
        return outcome
# 回退策略
async def queue_for_manual_review(**params):
    """Queue the task for manual handling and return ``None``."""
    job = {
        "type": "captcha_manual",
        "params": params,
        "created_at": datetime.now(),
    }
    await task_queue.push(job)
    return None
async def skip_captcha_action(**params):
    """Skip the captcha-protected action: log it and return ``None``."""
    message = f"跳过受验证码保护的操作: {params}"
    logger.info(message)
    return None


# --- Next article section: "监控和告警" (Monitoring and alerting) ---
example.py
from dataclasses import dataclass
from collections import deque
@dataclass
class SolverMetrics:
    """Running counters describing solver health."""

    total_attempts: int = 0
    successes: int = 0
    failures: int = 0
    timeouts: int = 0
    avg_solve_time: float = 0.0
    # Bounded buffer of recent error messages; created in __post_init__
    # because a dataclass field cannot use a mutable default directly.
    recent_errors: deque = None

    def __post_init__(self):
        # Only create the default buffer when the caller did not pass one;
        # overwriting unconditionally would silently discard their deque.
        if self.recent_errors is None:
            self.recent_errors = deque(maxlen=100)

    @property
    def success_rate(self):
        """Return the success percentage (0-100); 0.0 with no attempts."""
        if self.total_attempts == 0:
            return 0.0
        return self.successes / self.total_attempts * 100

    def should_alert(self):
        """Return True when the success rate is below 80% over more than 10 attempts."""
        return self.success_rate < 80 and self.total_attempts > 10
# Usage: module-level metrics instance updated by solve_with_metrics.
metrics = SolverMetrics()
async def solve_with_metrics(solver, params):
    """Run ``solver.solve(**params)`` and record the outcome in ``metrics``.

    Every call increments ``total_attempts``. A success increments
    ``successes`` and updates ``avg_solve_time``, an exponential moving
    average with weight 0.1 for the newest sample. A timeout increments
    ``timeouts``. Any other exception increments ``failures`` and appends
    its text to ``recent_errors``. Exceptions are always re-raised.

    Returns:
        Whatever the solver returns.
    """
    # perf_counter is monotonic; wall-clock time can jump (NTP, DST) and
    # yield negative or inflated durations.
    start = time.perf_counter()
    metrics.total_attempts += 1
    try:
        result = await solver.solve(**params)
    except (asyncio.TimeoutError, TimeoutError):
        # Before Python 3.11 these are two distinct classes; count both.
        metrics.timeouts += 1
        raise
    except Exception as e:
        metrics.failures += 1
        metrics.recent_errors.append(str(e))
        raise

    elapsed = time.perf_counter() - start
    metrics.successes += 1
    if metrics.successes == 1:
        # Seed the average with the first sample; starting from 0.0 would
        # under-report solve time until many samples had accumulated.
        metrics.avg_solve_time = elapsed
    else:
        metrics.avg_solve_time = metrics.avg_solve_time * 0.9 + elapsed * 0.1
    return result


# Pro tip from the article ("专业提示"): implement health checks that verify
# the captcha-solving pipeline works before production workflows need it.
API · 错误处理 · 生产环境 · 最佳实践