返回博客技巧

验证码API错误处理:生产环境最佳实践

使用正确的超时处理、重试逻辑、熔断器和优雅降级模式构建弹性集成。

reGOTCHA团队2025年12月12日6分钟 阅读
验证码API错误处理:生产环境最佳实践

为什么错误处理重要

验证码解决涉及外部 API、网络请求和时间敏感操作。 健壮的错误处理可防止级联故障和资源浪费。

常见错误类型

1. 网络错误

example.py
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10)
)
async def create_task(client, params):
    try:
        response = await client.post("/createTask", json=params)
        response.raise_for_status()
        return response.json()
    except httpx.TimeoutException:
        raise  # 让 tenacity 重试
    except httpx.HTTPStatusError as e:
        if e.response.status_code >= 500:
            raise  # 重试服务器错误
        raise e  # 不重试客户端错误

2. API 错误

example.py
class CaptchaAPIError(Exception):
    def __init__(self, error_id: int, description: str):
        self.error_id = error_id
        self.description = description
        super().__init__(f"错误 {error_id}: {description}")

ERROR_CODES = {
    1: "API 密钥无效",
    2: "余额不足",
    3: "任务未找到",
    10: "Site key 无效",
    12: "任务超时",
}

def handle_api_response(data: dict):
    if error_id := data.get("errorId"):
        desc = ERROR_CODES.get(error_id, data.get("errorDescription", "未知"))
        raise CaptchaAPIError(error_id, desc)

3. 超时处理

example.py
import asyncio

async def solve_with_timeout(solver, params, timeout=120):
    try:
        return await asyncio.wait_for(
            solver.solve(**params),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        # 记录指标
        logger.warning(f"验证码解决在 {timeout} 秒后超时")
        raise

# 带回退
async def solve_with_fallback(solvers, params):
    for solver in solvers:
        try:
            return await solve_with_timeout(solver, params)
        except Exception as e:
            logger.warning(f"解决器 {solver.name} 失败: {e}")
            continue
    raise Exception("所有解决器失败")

熔断器模式

example.py
from datetime import datetime, timedelta
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        half_open_max: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max = half_open_max

        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_try_reset():
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise Exception("熔断器已打开")

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _should_try_reset(self):
        return (
            self.last_failure_time and
            datetime.now() - self.last_failure_time >
            timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        if self.state == CircuitState.HALF_OPEN:
            self.half_open_calls += 1
            if self.half_open_calls >= self.half_open_max:
                self.state = CircuitState.CLOSED
                self.failures = 0

    def _on_failure(self):
        self.failures += 1
        self.last_failure_time = datetime.now()

        if self.failures >= self.failure_threshold:
            self.state = CircuitState.OPEN

优雅降级

example.py
class CaptchaSolverWithFallback:
    def __init__(self, primary_solver, fallback_strategy):
        self.primary = primary_solver
        self.fallback = fallback_strategy
        self.circuit = CircuitBreaker()

    async def solve(self, **params):
        try:
            return await self.circuit.call(
                self.primary.solve,
                **params
            )
        except Exception as e:
            logger.warning(f"主解决器失败: {e}")
            return await self.fallback(**params)

# 回退策略
async def queue_for_manual_review(**params):
    """将任务排队等待人工处理"""
    await task_queue.push({
        "type": "captcha_manual",
        "params": params,
        "created_at": datetime.now()
    })
    return None

async def skip_captcha_action(**params):
    """跳过需要验证码的操作"""
    logger.info(f"跳过受验证码保护的操作: {params}")
    return None

监控和告警

example.py
from dataclasses import dataclass
from collections import deque

@dataclass
class SolverMetrics:
    total_attempts: int = 0
    successes: int = 0
    failures: int = 0
    timeouts: int = 0
    avg_solve_time: float = 0.0

    recent_errors: deque = None

    def __post_init__(self):
        self.recent_errors = deque(maxlen=100)

    @property
    def success_rate(self):
        if self.total_attempts == 0:
            return 0.0
        return self.successes / self.total_attempts * 100

    def should_alert(self):
        # 成功率低于 80% 时告警
        return self.success_rate < 80 and self.total_attempts > 10

# 使用
metrics = SolverMetrics()

async def solve_with_metrics(solver, params):
    start = time.time()
    metrics.total_attempts += 1

    try:
        result = await solver.solve(**params)
        metrics.successes += 1
        metrics.avg_solve_time = (
            metrics.avg_solve_time * 0.9 +
            (time.time() - start) * 0.1
        )
        return result
    except asyncio.TimeoutError:
        metrics.timeouts += 1
        raise
    except Exception as e:
        metrics.failures += 1
        metrics.recent_errors.append(str(e))
        raise
专业提示:实施健康检查,在生产工作流需要之前验证您的验证码解决管道正常工作。
API错误处理生产环境最佳实践

准备大规模破解验证码?

免费获取 50 积分开始使用。无需信用卡。