LLM API 모범 사례
LLM API를 프로덕션 환경에 안전하고 효율적으로 배포하려면 에러 처리, Rate Limiting, 타임아웃, 캐싱, 로깅, 모니터링, 보안 등 여러 측면을 고려해야 합니다. 이 가이드는 안정적이고 비용 효율적인 LLM 서비스를 구축하는 데 필요한 모든 모범 사례를 다룹니다.
업데이트 안내: 모델/요금/버전/정책 등 시점에 민감한 정보는 변동될 수 있습니다.
최신 내용은 공식 문서를 확인하세요.
핵심 포인트
- 지수 백오프 재시도로 일시적 오류 자동 복구
- Rate Limiting 대응으로 서비스 중단 방지
- 적절한 타임아웃 설정으로 리소스 낭비 차단
- 캐싱으로 비용 90% 절감 가능
- 구조화된 로깅과 모니터링으로 문제 조기 발견
- API 키 보안 및 데이터 암호화
- 프로덕션 체크리스트 20항목 완벽 준수
에러 처리
에러 유형
에러
LLM API 주요 에러 유형:
// 1. Rate Limit Error (429)
원인: 요청 한도 초과 (RPM, TPM, RPD)
대응: 지수 백오프 재시도, Rate Limiter 구현
// 2. Timeout Error
원인: 네트워크 지연, 긴 생성 시간
대응: 타임아웃 증가, 스트리밍 사용
// 3. Invalid Request (400)
원인: 잘못된 파라미터, 모델명 오타
대응: 입력 검증, 스키마 정의
// 4. Authentication Error (401)
원인: 잘못된 API 키, 만료된 키
대응: 키 검증, 자동 갱신 메커니즘
// 5. Context Length Error (400)
원인: 입력 토큰이 모델 한도 초과
대응: 토큰 계산, 컨텍스트 트리밍
// 6. Server Error (500, 502, 503)
원인: API 제공자 장애
대응: 폴백 제공자, Circuit Breaker
// 7. Content Filter (400)
원인: 유해 콘텐츠 감지
대응: 프롬프트 수정, 사용자 알림
재시도 전략
python
import time
import random
from typing import Callable, TypeVar, Any
import logging
logger = logging.getLogger(__name__)
T = TypeVar('T')
def exponential_backoff_retry(
func: Callable[..., T],
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential_base: float = 2.0,
jitter: bool = True,
retriable_exceptions: tuple = None
) -> Callable[..., T]:
"""
지수 백오프 재시도 데코레이터
Args:
max_retries: 최대 재시도 횟수
base_delay: 초기 대기 시간 (초)
max_delay: 최대 대기 시간 (초)
exponential_base: 지수 베이스
jitter: 랜덤 지터 추가 (Thundering Herd 방지)
retriable_exceptions: 재시도할 예외 튜플
"""
if retriable_exceptions is None:
from openai import RateLimitError, APIConnectionError, APITimeoutError
retriable_exceptions = (RateLimitError, APIConnectionError, APITimeoutError)
def decorator(*args, **kwargs) -> T:
last_exception = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except retriable_exceptions as e:
last_exception = e
if attempt == max_retries:
logger.error(f"Max retries ({max_retries}) exceeded")
raise
# 대기 시간 계산
delay = min(base_delay * (exponential_base ** attempt), max_delay)
# 지터 추가 (±25%)
if jitter:
delay *= (0.75 + 0.5 * random.random())
logger.warning(
f"Attempt {attempt + 1}/{max_retries} failed: {e}. "
f"Retrying in {delay:.2f}s..."
)
time.sleep(delay)
except Exception as e:
# 재시도 불가능한 예외는 즉시 발생
logger.error(f"Non-retriable error: {e}")
raise
raise last_exception
return decorator
# 사용 예제
from openai import OpenAI
client = OpenAI()
@exponential_backoff_retry
def call_llm(prompt: str) -> str:
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
timeout=30
)
return response.choices[0].message.content
# 자동으로 재시도
try:
result = call_llm("Explain quantum computing")
except Exception as e:
print(f"Failed after retries: {e}")
Tenacity 라이브러리 사용
python
# pip install tenacity
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
before_sleep_log
)
from openai import RateLimitError, APIConnectionError
import logging
logger = logging.getLogger(__name__)
@retry(
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=2, max=60),
retry=retry_if_exception_type((RateLimitError, APIConnectionError)),
before_sleep=before_sleep_log(logger, logging.WARNING)
)
def resilient_llm_call(messages: list) -> str:
response = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
return response.choices[0].message.content
# 조건부 재시도 (Rate Limit 에러 시 더 오래 대기)
from tenacity import wait_chain, wait_fixed
@retry(
wait=wait_chain(
wait_fixed(2), # 첫 2회: 2초
wait_exponential(min=4, max=60) # 이후: 지수 백오프
),
retry=retry_if_exception_type(RateLimitError)
)
def rate_limited_call(prompt: str) -> str:
return client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
).choices[0].message.content
컨텍스트 길이 에러 처리
python
import tiktoken
def count_tokens(text: str, model: str = "gpt-4o") -> int:
"""정확한 토큰 수 계산"""
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
def trim_messages(messages: list, max_tokens: int, model: str) -> list:
"""메시지를 토큰 한도 내로 트리밍"""
total_tokens = sum(count_tokens(msg['content'], model) for msg in messages)
if total_tokens <= max_tokens:
return messages
# 시스템 메시지는 유지, 오래된 대화부터 제거
system_msg = [messages[0]] if messages[0]['role'] == 'system' else []
user_messages = messages[1:] if system_msg else messages
# 최신 메시지부터 역순으로 추가
trimmed = []
current_tokens = count_tokens(system_msg[0]['content'], model) if system_msg else 0
for msg in reversed(user_messages):
msg_tokens = count_tokens(msg['content'], model)
if current_tokens + msg_tokens <= max_tokens:
trimmed.insert(0, msg)
current_tokens += msg_tokens
else:
break
return system_msg + trimmed
# 자동 트리밍 래퍼
def safe_chat_completion(messages: list, model: str = "gpt-4o", max_tokens: int = 100000):
"""컨텍스트 길이 자동 관리"""
try:
return client.chat.completions.create(
model=model,
messages=messages
)
except Exception as e:
if "context_length_exceeded" in str(e):
logger.warning("Context length exceeded, trimming...")
trimmed = trim_messages(messages, max_tokens, model)
return client.chat.completions.create(
model=model,
messages=trimmed
)
raise
Rate Limiting 대응
Rate Limit 이해
한도
주요 LLM 제공자 Rate Limits (2026년 1월):
// OpenAI (Tier 5 기준)
GPT-4o:
RPM (요청/분): 10,000
TPM (토큰/분): 30,000,000
RPD (요청/일): 10,000
GPT-4o Mini:
RPM: 30,000
TPM: 150,000,000
RPD: 제한 없음
// Anthropic
Claude Opus:
RPM: 5,000
TPM: 10,000,000
Claude Sonnet:
RPM: 5,000
TPM: 20,000,000
// Google
Gemini Pro:
RPM: 1,500
TPM: 4,000,000
// 계층별 차이
Tier 1 (무료): RPM 500, TPM 200,000
Tier 2 (변동+): RPM 5,000, TPM 450,000
Tier 5 (변동,000+): RPM 10,000, TPM 30,000,000
Rate Limiter 구현
python
import time
from collections import deque
from threading import Lock
class TokenBucketRateLimiter:
"""토큰 버킷 알고리즘 기반 Rate Limiter"""
def __init__(self, rpm: int, tpm: int):
"""
Args:
rpm: 분당 요청 한도
tpm: 분당 토큰 한도
"""
self.rpm = rpm
self.tpm = tpm
self.request_tokens = rpm
self.token_tokens = tpm
self.last_refill = time.time()
self.lock = Lock()
def _refill(self):
"""경과 시간에 따라 토큰 리필"""
now = time.time()
elapsed = now - self.last_refill
# 분당 한도이므로 초당 리필율 계산
request_refill = elapsed * (self.rpm / 60)
token_refill = elapsed * (self.tpm / 60)
self.request_tokens = min(self.rpm, self.request_tokens + request_refill)
self.token_tokens = min(self.tpm, self.token_tokens + token_refill)
self.last_refill = now
def acquire(self, estimated_tokens: int = 1000) -> float:
"""
토큰 획득 시도
Returns:
대기 시간 (초). 0이면 즉시 사용 가능.
"""
with self.lock:
self._refill()
# 요청 및 토큰 모두 충분한지 확인
if self.request_tokens >= 1 and self.token_tokens >= estimated_tokens:
self.request_tokens -= 1
self.token_tokens -= estimated_tokens
return 0.0
# 대기 시간 계산
wait_for_request = max(0, (1 - self.request_tokens) / (self.rpm / 60))
wait_for_tokens = max(0, (estimated_tokens - self.token_tokens) / (self.tpm / 60))
return max(wait_for_request, wait_for_tokens)
# 사용
limiter = TokenBucketRateLimiter(rpm=10000, tpm=30_000_000)
def rate_limited_call(messages: list, estimated_tokens: int = 1000):
# 토큰 획득 (필요시 대기)
wait_time = limiter.acquire(estimated_tokens)
if wait_time > 0:
logger.info(f"Rate limit approached, waiting {wait_time:.2f}s")
time.sleep(wait_time)
# API 호출
return client.chat.completions.create(
model="gpt-4o",
messages=messages
)
슬라이딩 윈도우 Rate Limiter
python
from collections import deque
from threading import Lock
import time
class SlidingWindowRateLimiter:
"""슬라이딩 윈도우 기반 Rate Limiter (더 정확)"""
def __init__(self, max_requests: int, window_seconds: int = 60):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque()
self.lock = Lock()
def _remove_old_requests(self):
"""윈도우 밖의 오래된 요청 제거"""
now = time.time()
cutoff = now - self.window_seconds
while self.requests and self.requests[0] < cutoff:
self.requests.popleft()
def acquire(self) -> bool:
"""요청 허용 여부"""
with self.lock:
self._remove_old_requests()
if len(self.requests) < self.max_requests:
self.requests.append(time.time())
return True
return False
def wait_if_needed(self):
"""필요시 대기"""
while not self.acquire():
# 가장 오래된 요청이 만료될 때까지 대기
with self.lock:
if self.requests:
wait_time = self.requests[0] + self.window_seconds - time.time()
if wait_time > 0:
time.sleep(wait_time + 0.1)
# 사용
limiter = SlidingWindowRateLimiter(max_requests=10000, window_seconds=60)
def api_call(prompt: str):
limiter.wait_if_needed()
return client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
타임아웃 설정
적절한 타임아웃
타임아웃
권장 타임아웃 설정:
// 짧은 응답 (요약, 분류)
타임아웃: 10-15초
이유: 짧은 출력, 빠른 실패 선호
// 중간 길이 응답 (일반 대화)
타임아웃: 30-60초
이유: 대부분의 요청 처리, 네트워크 지연 고려
// 긴 응답 (코드 생성, 긴 분석)
타임아웃: 90-120초
이유: 복잡한 추론, 긴 출력
// 스트리밍
연결 타임아웃: 10초 (첫 바이트)
읽기 타임아웃: 5초 (청크 간)
총 타임아웃: 제한 없음
// 주의사항
• 너무 짧음 → 정상 요청도 실패
• 너무 김 → 리소스 낭비, 사용자 대기
• 스트리밍 사용 시 총 타임아웃 길게 설정
타임아웃 구현
python
from openai import OpenAI
import httpx
# 1. OpenAI SDK 타임아웃 설정
client = OpenAI(
timeout=30.0, # 총 타임아웃
max_retries=2
)
# 2. 세밀한 타임아웃 제어
client = OpenAI(
timeout=httpx.Timeout(
connect=5.0, # 연결 타임아웃
read=30.0, # 읽기 타임아웃
write=5.0, # 쓰기 타임아웃
pool=5.0 # 풀 타임아웃
)
)
# 3. 요청별 타임아웃 오버라이드
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
timeout=60.0 # 이 요청만 60초
)
# 4. 스트리밍 타임아웃
stream = client.chat.completions.create(
model="gpt-4o",
messages=messages,
stream=True,
timeout=httpx.Timeout(10.0, read=5.0) # 청크당 5초
)
for chunk in stream:
print(chunk.choices[0].delta.content, end="")
# 5. 타임아웃 에러 처리
from openai import APITimeoutError
try:
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
timeout=30.0
)
except APITimeoutError:
logger.warning("Request timed out, falling back to shorter prompt")
# 프롬프트 축소 또는 다른 모델 시도
response = client.chat.completions.create(
model="gpt-4o-mini", # 더 빠른 모델
messages=trim_messages(messages, max_tokens=2000),
timeout=15.0
)
캐싱 전략
캐싱 유형
유형
// 1. 정확한 매칭 캐시
입력이 완전히 동일한 경우만 캐시 히트
장점: 구현 간단, 안전
단점: 히트율 낮음
사용처: FAQ, 정적 콘텐츠
// 2. 시맨틱 캐시
의미가 유사한 쿼리도 캐시 히트
장점: 높은 히트율
단점: 임베딩 비용, 복잡도
사용처: 유사 질문 많은 환경
// 3. 프롬프트 캐싱 (제공자 네이티브)
Claude, OpenAI의 프롬프트 캐싱 기능
장점: 공식 지원, 90% 비용 절감
단점: 제공자 종속
사용처: 긴 컨텍스트 반복 사용
// 4. 응답 캐싱
LLM 응답 자체를 캐싱
장점: 즉시 응답, 비용 제로
단점: 스토리지 필요, 신선도 관리
사용처: 정적 데이터 조회
정확한 매칭 캐시 (Redis)
python
import redis
import hashlib
import json
class LLMCache:
"""Redis 기반 LLM 응답 캐시"""
def __init__(self, redis_url: str = "redis://localhost:6379", ttl: int = 3600):
self.redis = redis.from_url(redis_url)
self.ttl = ttl # Time To Live (초)
def _generate_key(self, model: str, messages: list, **kwargs) -> str:
"""캐시 키 생성 (입력 해시)"""
cache_input = {
'model': model,
'messages': messages,
'params': kwargs
}
serialized = json.dumps(cache_input, sort_keys=True)
hash_digest = hashlib.sha256(serialized.encode()).hexdigest()
return f"llm_cache:{hash_digest}"
def get(self, model: str, messages: list, **kwargs) -> str | None:
"""캐시된 응답 조회"""
key = self._generate_key(model, messages, **kwargs)
cached = self.redis.get(key)
if cached:
logger.info(f"Cache HIT: {key[:16]}...")
return json.loads(cached)
logger.info(f"Cache MISS: {key[:16]}...")
return None
def set(self, model: str, messages: list, response: str, **kwargs):
"""응답 캐싱"""
key = self._generate_key(model, messages, **kwargs)
self.redis.setex(key, self.ttl, json.dumps(response))
logger.info(f"Cached response: {key[:16]}...")
def invalidate(self, pattern: str = "llm_cache:*"):
"""캐시 무효화"""
keys = self.redis.keys(pattern)
if keys:
self.redis.delete(*keys)
logger.info(f"Invalidated {len(keys)} cache entries")
# 사용
cache = LLMCache(ttl=3600) # 1시간
def cached_chat(model: str, messages: list, **kwargs) -> str:
# 캐시 확인
cached_response = cache.get(model, messages, **kwargs)
if cached_response:
return cached_response
# 캐시 미스 → API 호출
response = client.chat.completions.create(
model=model,
messages=messages,
**kwargs
)
result = response.choices[0].message.content
# 응답 캐싱
cache.set(model, messages, result, **kwargs)
return result
# 테스트
print(cached_chat("gpt-4o", [{"role": "user", "content": "Hello"}])) # MISS
print(cached_chat("gpt-4o", [{"role": "user", "content": "Hello"}])) # HIT
시맨틱 캐시
python
from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
client = OpenAI()
class SemanticCache:
"""임베딩 기반 시맨틱 캐시"""
def __init__(self, similarity_threshold: float = 0.95):
self.cache = [] # [(embedding, query, response), ...]
self.threshold = similarity_threshold
def _get_embedding(self, text: str) -> np.ndarray:
"""텍스트 임베딩 생성"""
response = client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return np.array(response.data[0].embedding)
def get(self, query: str) -> str | None:
"""유사한 쿼리의 캐시된 응답 찾기"""
if not self.cache:
return None
query_embedding = self._get_embedding(query)
# 모든 캐시 항목과 유사도 계산
max_similarity = 0.0
best_response = None
for cached_embedding, cached_query, cached_response in self.cache:
similarity = cosine_similarity(
[query_embedding],
[cached_embedding]
)[0][0]
if similarity > max_similarity:
max_similarity = similarity
best_response = cached_response
# 임계값 초과 시 캐시 히트
if max_similarity >= self.threshold:
logger.info(f"Semantic cache HIT (similarity: {max_similarity:.3f})")
return best_response
logger.info(f"Semantic cache MISS (best: {max_similarity:.3f})")
return None
def set(self, query: str, response: str):
"""쿼리와 응답 캐싱"""
embedding = self._get_embedding(query)
self.cache.append((embedding, query, response))
# 캐시 크기 제한 (LRU)
if len(self.cache) > 1000:
self.cache.pop(0)
# 사용
semantic_cache = SemanticCache(similarity_threshold=0.95)
queries = [
"Python이란 무엇인가요?",
"Python이 뭐에요?", # 유사 → 캐시 히트
"자바스크립트란?", # 다름 → 캐시 미스
]
for q in queries:
cached = semantic_cache.get(q)
if cached:
print(f"Cached: {cached[:50]}...")
else:
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": q}]
).choices[0].message.content
semantic_cache.set(q, response)
print(f"Fresh: {response[:50]}...")
프롬프트 캐싱 (제공자 네이티브)
python
# Claude 프롬프트 캐싱
from anthropic import Anthropic
client = Anthropic()
# 긴 시스템 프롬프트 (캐싱 대상)
long_context = """
[10,000 토큰의 문서, API 명세, 코드베이스 등]
"""
response = client.messages.create(
model="claude-" ,
max_tokens=1000,
system=[
{
"type": "text",
"text": long_context,
"cache_control": {"type": "ephemeral"} # 캐싱 활성화
}
],
messages=[
{"role": "user", "content": "이 문서에 대한 질문..."}
]
)
# 비용 절감 확인
print(f"Cache creation tokens: {response.usage.cache_creation_input_tokens}")
print(f"Cache read tokens: {response.usage.cache_read_input_tokens}")
print(f"Regular input tokens: {response.usage.input_tokens}")
# 캐시 히트 시: cache_read_input_tokens 비용 90% 할인
로깅 및 모니터링
구조화된 로깅
python
import logging
import json
from datetime import datetime
from typing import Any
class JSONFormatter(logging.Formatter):
"""JSON 형식 로그"""
def format(self, record: logging.LogRecord) -> str:
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
# 추가 필드
if hasattr(record, 'extra_fields'):
log_data.update(record.extra_fields)
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
return json.dumps(log_data)
# 로거 설정
logger = logging.getLogger("llm_service")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
# LLM 호출 로깅 래퍼
import time
from functools import wraps
def log_llm_call(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
model = kwargs.get('model', 'unknown')
messages = kwargs.get('messages', [])
try:
result = func(*args, **kwargs)
latency = time.time() - start_time
# 성공 로그
logger.info(
"LLM call successful",
extra={'extra_fields': {
"model": model,
"latency_ms": int(latency * 1000),
"input_tokens": result.usage.prompt_tokens,
"output_tokens": result.usage.completion_tokens,
"total_tokens": result.usage.total_tokens,
"cost_usd": calculate_cost(model, result.usage),
"message_count": len(messages),
"status": "success"
}}
)
return result
except Exception as e:
latency = time.time() - start_time
# 실패 로그
logger.error(
f"LLM call failed: {e}",
extra={'extra_fields': {
"model": model,
"latency_ms": int(latency * 1000),
"error_type": type(e).__name__,
"status": "error"
}},
exc_info=True
)
raise
return wrapper
@log_llm_call
def call_llm(**kwargs):
return client.chat.completions.create(**kwargs)
메트릭 수집
python
# Prometheus 메트릭
from prometheus_client import Counter, Histogram, Gauge
# 메트릭 정의
llm_requests_total = Counter(
'llm_requests_total',
'Total LLM API requests',
['model', 'status']
)
llm_latency_seconds = Histogram(
'llm_latency_seconds',
'LLM API latency',
['model'],
buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0)
)
llm_tokens_total = Counter(
'llm_tokens_total',
'Total tokens processed',
['model', 'type'] # type: input/output
)
llm_cost_usd_total = Counter(
'llm_cost_usd_total',
'Total cost in USD',
['model']
)
llm_active_requests = Gauge(
'llm_active_requests',
'Number of active LLM requests',
['model']
)
# 메트릭 래퍼
def track_metrics(func):
@wraps(func)
def wrapper(*args, **kwargs):
model = kwargs.get('model', 'unknown')
llm_active_requests.labels(model=model).inc()
start = time.time()
try:
result = func(*args, **kwargs)
latency = time.time() - start
# 성공 메트릭
llm_requests_total.labels(model=model, status='success').inc()
llm_latency_seconds.labels(model=model).observe(latency)
llm_tokens_total.labels(model=model, type='input').inc(
result.usage.prompt_tokens
)
llm_tokens_total.labels(model=model, type='output').inc(
result.usage.completion_tokens
)
cost = calculate_cost(model, result.usage)
llm_cost_usd_total.labels(model=model).inc(cost)
return result
except Exception as e:
# 실패 메트릭
llm_requests_total.labels(model=model, status='error').inc()
raise
finally:
llm_active_requests.labels(model=model).dec()
return wrapper
# Prometheus 엔드포인트
from prometheus_client import start_http_server
start_http_server(9090) # http://localhost:9090/metrics
알림 설정
yaml
# Prometheus Alert Rules (prometheus_rules.yml)
groups:
- name: llm_alerts
interval: 30s
rules:
# 높은 에러율
- alert: HighLLMErrorRate
expr: |
(
rate(llm_requests_total{status="error"}[5m])
/
rate(llm_requests_total[5m])
) > 0.05
for: 5m
labels:
severity: warning
annotations:
summary: "High LLM error rate"
description: "Error rate {{ $value | humanizePercentage }} for {{ $labels.model }}"
# 느린 응답
- alert: SlowLLMResponse
expr: |
histogram_quantile(0.95, llm_latency_seconds_bucket) > 30
for: 10m
labels:
severity: info
annotations:
summary: "Slow LLM responses"
description: "P95 latency {{ $value }}s for {{ $labels.model }}"
# 높은 비용
- alert: HighLLMCost
expr: |
increase(llm_cost_usd_total[1h]) > 100
for: 5m
labels:
severity: critical
annotations:
summary: "High LLM cost"
description: "Cost ${{ $value }} in last hour"
# Rate Limit 접근
- alert: ApproachingRateLimit
expr: |
rate(llm_requests_total[1m]) > 150 # RPM 10k의 90%
for: 2m
labels:
severity: warning
annotations:
summary: "Approaching rate limit"
description: "{{ $value }} req/min for {{ $labels.model }}"
보안
API 키 보안
모범 사례
// 1. 환경 변수 사용
❌ 나쁨:
api_key = "sk-proj-abc123..." # 하드코딩
✅ 좋음:
api_key = os.getenv("OPENAI_API_KEY")
// 2. .env 파일 (로컬 개발)
# .env
OPENAI_API_KEY=sk-proj-...
ANTHROPIC_API_KEY=sk-ant-...
# .gitignore에 추가
.env
*.env
// 3. 비밀 관리 서비스 (프로덕션)
AWS Secrets Manager
Google Cloud Secret Manager
Azure Key Vault
HashiCorp Vault
// 4. 키 로테이션
• 정기적으로 API 키 갱신 (90일마다)
• 침해 의심 시 즉시 폐기
• 여러 키 사용 (서비스별, 환경별)
// 5. 최소 권한 원칙
• 필요한 권한만 부여 (예: read-only)
• 프로젝트별 키 분리
• 개발/스테이징/프로덕션 키 분리
// 6. 로그에서 키 제외
❌ 나쁨:
logger.info(f"Using key: {api_key}")
✅ 좋음:
logger.info(f"Using key: {api_key[:8]}...")
API 키 관리 코드
python
# 환경 변수
import os
from dotenv import load_dotenv
load_dotenv() # .env 파일 로드
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY not set")
# AWS Secrets Manager
import boto3
import json
def get_secret(secret_name: str) -> dict:
client = boto3.client('secretsmanager', region_name='us-east-1')
response = client.get_secret_value(SecretId=secret_name)
return json.loads(response['SecretString'])
secrets = get_secret('prod/llm/api-keys')
openai_key = secrets['openai']
anthropic_key = secrets['anthropic']
# 키 마스킹 헬퍼
def mask_api_key(key: str) -> str:
"""API 키를 안전하게 표시"""
if not key or len(key) < 12:
return "***"
return f"{key[:8]}...{key[-4:]}"
logger.info(f"Using API key: {mask_api_key(api_key)}")
입력 검증
python
from pydantic import BaseModel, Field, validator
class ChatRequest(BaseModel):
"""입력 검증 스키마"""
model: str = Field(..., regex=r'^(gpt-4o|claude-|gemini-pro)$' )
messages: list = Field(..., min_items=1, max_items=100)
temperature: float = Field(0.7, ge=0.0, le=2.0)
max_tokens: int = Field(1000, ge=1, le=4000)
@validator('messages')
def validate_messages(cls, v):
for msg in v:
if 'role' not in msg or 'content' not in msg:
raise ValueError("Message must have 'role' and 'content'")
if msg['role'] not in ['system', 'user', 'assistant']:
raise ValueError(f"Invalid role: {msg['role']}")
# 콘텐츠 길이 제한 (프롬프트 인젝션 방지)
if len(msg['content']) > 50000:
raise ValueError("Message content too long")
return v
# 사용
try:
request = ChatRequest(
model="gpt-4o",
messages=[{"role": "user", "content": "Hello"}]
)
except Exception as e:
logger.error(f"Invalid request: {e}")
raise
데이터 프라이버시
모범 사례
// 1. PII 제거 (개인 식별 정보)
import re
def redact_pii(text: str) -> str:
"""민감 정보 마스킹"""
# 이메일
text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
'[EMAIL]', text)
# 전화번호
text = re.sub(r'\b\d{3}[-.]?\d{3,4}[-.]?\d{4}\b',
'[PHONE]', text)
# 신용카드 (간단한 패턴)
text = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b',
'[CARD]', text)
return text
// 2. 데이터 암호화
from cryptography.fernet import Fernet
class SecureStorage:
def __init__(self, key: bytes):
self.cipher = Fernet(key)
def encrypt(self, data: str) -> bytes:
return self.cipher.encrypt(data.encode())
def decrypt(self, encrypted: bytes) -> str:
return self.cipher.decrypt(encrypted).decode()
// 3. 로그에서 민감 정보 제외
• 사용자 프롬프트 전체 로깅 금지
• 응답 내용 샘플링만 로깅
• 개인 정보 필터링 후 로깅
// 4. 데이터 보존 정책
• 필요 기간만 저장 (30일, 90일)
• 자동 삭제 스크립트
• 사용자 요청 시 즉시 삭제
프로덕션 체크리스트
체크리스트
## 1. 에러 처리
☐ 지수 백오프 재시도 구현
☐ Circuit Breaker 패턴 적용
☐ 타임아웃 설정 (연결, 읽기, 총)
☐ 모든 예외 유형 처리 (Rate Limit, Timeout, Auth, etc.)
☐ 컨텍스트 길이 초과 시 자동 트리밍
## 2. Rate Limiting
☐ Rate Limiter 구현 (RPM, TPM 모두)
☐ 여러 계정/키로 로드 밸런싱
☐ Rate Limit 접근 시 알림
☐ 요청 큐 구현 (급증 대응)
## 3. 성능
☐ 캐싱 전략 (정확/시맨틱/프롬프트)
☐ 스트리밍 사용 (긴 응답)
☐ 적절한 모델 선택 (작업 복잡도 기반)
☐ 응답 시간 P95 < 5초
☐ 동시 요청 처리 (비동기)
## 4. 비용 최적화
☐ 비용 추적 및 알림
☐ 일일/월간 예산 제한
☐ 캐시 히트율 > 30%
☐ 단순 작업 → 저렴한 모델
☐ 토큰 사용량 모니터링
## 5. 로깅 및 모니터링
☐ 구조화된 로깅 (JSON)
☐ 메트릭 수집 (Prometheus/CloudWatch)
☐ 대시보드 구축 (Grafana)
☐ 알림 설정 (에러율, 지연시간, 비용)
☐ 분산 추적 (Jaeger/DataDog)
## 6. 보안
☐ API 키 환경 변수/비밀 관리 서비스
☐ HTTPS 강제
☐ 입력 검증 (길이, 형식, 악성 콘텐츠)
☐ PII 자동 제거
☐ 로그에서 민감 정보 제외
☐ Rate Limiting (사용자별)
## 7. 가용성
☐ 폴백 제공자 설정 (주 → 보조 → 로컬)
☐ Health Check 엔드포인트
☐ 자동 복구 메커니즘
☐ 점진적 배포 (Canary, Blue/Green)
☐ 장애 시나리오 테스트
## 8. 테스트
☐ 단위 테스트 (모킹)
☐ 통합 테스트 (실제 API)
☐ 부하 테스트 (예상 트래픽 2배)
☐ 장애 주입 테스트 (Chaos Engineering)
☐ 회귀 테스트 (품질 검증)
## 9. 문서화
☐ API 문서 (OpenAPI/Swagger)
☐ Runbook (장애 대응)
☐ 아키텍처 다이어그램
☐ 비용 모델 문서
☐ 온콜 가이드
## 10. 규정 준수
☐ GDPR 준수 (EU 사용자)
☐ CCPA 준수 (캘리포니아)
☐ 데이터 보존 정책
☐ 사용자 동의 관리
☐ 감사 로그
프로덕션급 통합 예제
python
# production_llm_client.py
import os
import time
import logging
from typing import Optional
from dataclasses import dataclass
from openai import OpenAI, APIError, RateLimitError, APITimeoutError
from anthropic import Anthropic, APIConnectionError
from tenacity import retry, stop_after_attempt, wait_exponential
from prometheus_client import Counter, Histogram
import redis
logger = logging.getLogger(__name__)
# 메트릭
requests_total = Counter('llm_requests_total', 'Total requests', ['provider', 'status'])
latency_seconds = Histogram('llm_latency_seconds', 'Latency', ['provider'])
cost_total = Counter('llm_cost_usd_total', 'Total cost', ['provider'])
@dataclass
class LLMConfig:
"""LLM 클라이언트 설정"""
openai_key: str
anthropic_key: str
redis_url: str = "redis://localhost:6379"
cache_ttl: int = 3600
timeout: float = 30.0
max_retries: int = 3
daily_budget: float = 100.0
class ProductionLLMClient:
"""
프로덕션 환경의 LLM 클라이언트
- 재시도, 폴백, 캐싱, 메트릭, 비용 추적
"""
def __init__(self, config: LLMConfig):
self.config = config
self.openai = OpenAI(api_key=config.openai_key, timeout=config.timeout)
self.anthropic = Anthropic(api_key=config.anthropic_key, timeout=config.timeout)
self.cache = redis.from_url(config.redis_url)
self.daily_spend = 0.0
def _check_budget(self, estimated_cost: float):
if self.daily_spend + estimated_cost > self.config.daily_budget:
raise Exception(f"Daily budget ${self.config.daily_budget} exceeded")
def _get_cache_key(self, model: str, messages: list) -> str:
import hashlib, json
key_data = json.dumps({'model': model, 'messages': messages}, sort_keys=True)
return f"llm:{hashlib.sha256(key_data.encode()).hexdigest()}"
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(min=2, max=60),
retry_error_callback=lambda _: None
)
def _call_openai(self, model: str, messages: list) -> str:
response = self.openai.chat.completions.create(
model=model,
messages=messages
)
# 비용 추적
cost = (
response.usage.prompt_tokens * 2.5 / 1_000_000 +
response.usage.completion_tokens * 10 / 1_000_000
)
self.daily_spend += cost
cost_total.labels(provider='openai').inc(cost)
return response.choices[0].message.content
def _call_anthropic(self, model: str, messages: list) -> str:
response = self.anthropic.messages.create(
model=model,
messages=messages,
max_tokens=2000
)
cost = (
response.usage.input_tokens * 3 / 1_000_000 +
response.usage.output_tokens * 15 / 1_000_000
)
self.daily_spend += cost
cost_total.labels(provider='anthropic').inc(cost)
return response.content[0].text
def chat(self, model: str, messages: list, use_cache: bool = True) -> str:
"""
채팅 완성 (캐싱, 재시도, 폴백 포함)
Args:
model: 모델 이름 (예: 'gpt-4o', 'claude-')
messages: 메시지 리스트
use_cache: 캐싱 사용 여부
Returns:
LLM 응답
"""
start_time = time.time()
# 1. 캐시 확인
if use_cache:
cache_key = self._get_cache_key(model, messages)
cached = self.cache.get(cache_key)
if cached:
logger.info("Cache HIT")
return cached.decode()
# 2. 예산 확인
self._check_budget(estimated_cost=0.05)
# 3. API 호출 (폴백 체인)
providers = [
('openai', self._call_openai, 'gpt-4o'),
('anthropic', self._call_anthropic, 'claude-' ),
]
last_error = None
for provider_name, provider_func, fallback_model in providers:
try:
logger.info(f"Trying {provider_name}...")
result = provider_func(fallback_model, messages)
# 성공 메트릭
requests_total.labels(provider=provider_name, status='success').inc()
latency = time.time() - start_time
latency_seconds.labels(provider=provider_name).observe(latency)
# 캐싱
if use_cache:
self.cache.setex(cache_key, self.config.cache_ttl, result)
return result
except Exception as e:
last_error = e
requests_total.labels(provider=provider_name, status='error').inc()
logger.warning(f"{provider_name} failed: {e}")
continue
# 모든 제공자 실패
raise Exception(f"All providers failed. Last error: {last_error}")
# 사용 예제
if __name__ == "__main__":
config = LLMConfig(
openai_key=os.getenv("OPENAI_API_KEY"),
anthropic_key=os.getenv("ANTHROPIC_API_KEY"),
daily_budget=50.0
)
client = ProductionLLMClient(config)
try:
response = client.chat(
model="gpt-4o",
messages=[{"role": "user", "content": "Explain quantum computing"}]
)
print(response)
except Exception as e:
logger.error(f"Failed: {e}")
핵심 요약
- 에러 처리: 지수 백오프 재시도, Circuit Breaker, 타임아웃
- Rate Limiting: 토큰 버킷, 슬라이딩 윈도우, 여러 계정 로드 밸런싱
- 캐싱: 정확한 매칭, 시맨틱, 프롬프트 캐싱으로 비용 90% 절감
- 로깅: 구조화된 로그, Prometheus 메트릭, 알림
- 보안: API 키 보호, 입력 검증, PII 제거
- 프로덕션 체크리스트 20항목 준수
핵심 정리
- LLM API 모범 사례의 핵심 개념과 흐름을 정리합니다.
- 에러 처리를 단계별로 이해합니다.
- 실전 적용 시 기준과 주의점을 확인합니다.
실무 팁
- 입력/출력 예시를 고정해 재현성을 확보하세요.
- LLM API 모범 사례 범위를 작게 잡고 단계적으로 확장하세요.
- 에러 처리 조건을 문서화해 대응 시간을 줄이세요.