Error handling для AI API — это набор паттернов для корректной обработки сбоев при взаимодействии с языковыми моделями. AI API могут возвращать ошибки по множеству причин: превышение лимитов, таймауты, перегрузка сервера. Правильная обработка ошибок — разница между стабильным продуктом и сервисом, который «падает» при каждом всплеске нагрузки.
Типы ошибок AI API
Основные HTTP-коды ошибок:
| Код | Ошибка | Действие |
|---|---|---|
| 400 | Bad Request | Исправить запрос (невалидный JSON, слишком длинный prompt) |
| 401 | Unauthorized | Проверить API-ключ |
| 429 | Rate Limit | Retry с backoff, подождать |
| 500 | Server Error | Retry с backoff |
| 503 | Service Unavailable | Retry с backoff, возможно failover |
| timeout | Request Timeout | Retry с увеличенным timeout |
Ключевое правило: retry только для transient-ошибок (429, 500, 503, timeout). Ошибки 400, 401, 403 нет смысла ретраить — они не исправятся сами.
Retry с Exponential Backoff
Exponential backoff — это стратегия, при которой пауза между повторами экспоненциально увеличивается:
import time
import random
from openai import OpenAI, APIError, RateLimitError, APITimeoutError
client = OpenAI(
base_url="https://api.modelswitch.ru/v1",
api_key="msk_ваш_ключ"
)
def completion_with_retry(
messages: list,
model: str = "gpt-4o",
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0
) -> str:
"""Запрос к AI API с exponential backoff."""
for attempt in range(max_retries + 1):
try:
response = client.chat.completions.create(
model=model,
messages=messages,
timeout=30.0
)
return response.choices[0].message.content
except RateLimitError as e:
if attempt == max_retries:
raise
# Используем Retry-After если есть
retry_after = getattr(e, "retry_after", None)
delay = retry_after if retry_after else min(base_delay * (2 ** attempt), max_delay)
# Добавляем jitter для избежания thundering herd
delay += random.uniform(0, delay * 0.1)
print(f"Rate limit. Retry через {delay:.1f}с (попытка {attempt + 1}/{max_retries})")
time.sleep(delay)
except (APIError, APITimeoutError) as e:
if attempt == max_retries:
raise
delay = min(base_delay * (2 ** attempt), max_delay)
delay += random.uniform(0, delay * 0.1)
print(f"Ошибка: {e}. Retry через {delay:.1f}с (попытка {attempt + 1}/{max_retries})")
time.sleep(delay)
# Использование
result = completion_with_retry([{"role": "user", "content": "Привет!"}])
print(result)
Retry на TypeScript
import OpenAI from "openai";
const client = new OpenAI({
baseURL: "https://api.modelswitch.ru/v1",
apiKey: "msk_ваш_ключ",
});
async function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
async function completionWithRetry(
messages: OpenAI.ChatCompletionMessageParam[],
model = "gpt-4o",
maxRetries = 5
): Promise<string> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
const response = await client.chat.completions.create({
model,
messages,
});
return response.choices[0].message.content ?? "";
} catch (error) {
if (attempt === maxRetries) throw error;
const isRetryable =
error instanceof OpenAI.RateLimitError ||
error instanceof OpenAI.InternalServerError ||
error instanceof OpenAI.APIConnectionError;
if (!isRetryable) throw error;
const delay = Math.min(1000 * 2 ** attempt, 60000);
const jitter = Math.random() * delay * 0.1;
console.log(`Retry через ${((delay + jitter) / 1000).toFixed(1)}с (попытка ${attempt + 1})`);
await sleep(delay + jitter);
}
}
throw new Error("Max retries exceeded");
}
const result = await completionWithRetry([
{ role: "user", content: "Привет!" },
]);
console.log(result);
Паттерн Circuit Breaker
Circuit Breaker предотвращает каскадные сбои. Если API возвращает ошибки слишком часто, «выключатель» срабатывает и запросы временно прекращаются:
class CircuitBreaker:
def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0.0
self.state = "closed" # closed, open, half-open
def can_execute(self) -> bool:
if self.state == "closed":
return True
if self.state == "open":
if time.time() - self.last_failure_time >= self.recovery_timeout:
self.state = "half-open"
return True
return False
return True # half-open: разрешаем одну попытку
def record_success(self):
self.failure_count = 0
self.state = "closed"
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "open"
print(f"Circuit OPEN: API недоступен, пауза {self.recovery_timeout}с")
Лучшие практики
- Всегда используйте таймауты — установите timeout 30-60 секунд для обычных запросов, 120 секунд для streaming.
- Логируйте ошибки — записывайте код ошибки, модель, timestamp для анализа паттернов сбоев.
- Fallback-модели — при стабильных ошибках одной модели переключайтесь на другую через ModelSwitch.
- Graceful degradation — если AI недоступен, покажите кешированный ответ или сообщение «сервис временно недоступен».
- Мониторинг error rate — настройте алерты, если процент ошибок превышает 5%.
Через ModelSwitch retry и failover упрощаются: один API, одна точка входа. При ошибке конкретной модели можно мгновенно переключиться на альтернативную, изменив один параметр.