MCP 고급 주제
프로덕션 환경에서 MCP 서버를 안전하고 효율적으로 운영하기 위한 고급 주제를 다룹니다. 보안, 성능 최적화, 디버깅, 테스트, 배포 전략 등을 학습하세요.
업데이트 안내: 모델/요금/버전/정책 등 시점에 민감한 정보는 변동될 수 있습니다.
최신 내용은 공식 문서를 확인하세요.
이 페이지에서 배울 내용
- 보안: 샌드박싱, 권한 관리, 인증
- 성능 최적화: 캐싱, 연결 풀링, 병렬 처리
- 멀티 서버 아키텍처 및 조정
- 효과적인 디버깅 및 로깅 전략
- 테스트 자동화
- 프로덕션 배포 및 모니터링
보안
샌드박싱
MCP 서버는 외부 데이터에 접근하므로 적절한 샌드박싱이 필수입니다.
파일 시스템 샌드박싱
Python
secure_filesystem.py
from pathlib import Path
from typing import List
class SecureFileSystem:
"""보안이 강화된 파일 시스템 접근"""
def __init__(self, allowed_dirs: List[Path]):
self.allowed_dirs = [d.resolve() for d in allowed_dirs]
def validate_path(self, path: Path) -> Path:
"""경로 검증 및 정규화"""
# 1. 경로 정규화 (심볼릭 링크 해결)
resolved = path.resolve()
# 2. 허용된 디렉토리 내에 있는지 확인
if not any(resolved.is_relative_to(d) for d in self.allowed_dirs):
raise PermissionError(f"Access denied: {path}")
# 3. 숨김 파일/디렉토리 차단 (선택사항)
for part in resolved.parts:
if part.startswith(".") and part not in {".", ".."}:
raise PermissionError(f"Hidden files not allowed: {path}")
return resolved
def read_file(self, path: str) -> str:
"""안전한 파일 읽기"""
validated_path = self.validate_path(Path(path))
# 파일 크기 제한
max_size = 10 * 1024 * 1024 # 10MB
size = validated_path.stat().st_size
if size > max_size:
raise ValueError(f"File too large: {size} bytes")
return validated_path.read_text(encoding="utf-8")
def write_file(self, path: str, content: str):
"""안전한 파일 쓰기"""
validated_path = self.validate_path(Path(path))
# 콘텐츠 크기 제한
max_size = 5 * 1024 * 1024 # 5MB
if len(content) > max_size:
raise ValueError(f"Content too large: {len(content)} bytes")
# 임시 파일에 먼저 쓰고 원자적으로 이동
tmp_path = validated_path.with_suffix(".tmp")
try:
tmp_path.write_text(content, encoding="utf-8")
tmp_path.replace(validated_path)
finally:
if tmp_path.exists():
tmp_path.unlink()
SQL 인젝션 방지
Python
secure_sql.py
import sqlparse
from sqlparse.sql import Token, TokenList
from typing import List
class SecureSQL:
"""SQL 쿼리 검증 및 실행"""
ALLOWED_STATEMENTS = {"SELECT"} # 읽기 전용
FORBIDDEN_KEYWORDS = {
"DROP", "DELETE", "UPDATE", "INSERT",
"ALTER", "CREATE", "TRUNCATE", "EXEC"
}
def validate_query(self, sql: str) -> bool:
"""쿼리 안전성 검증"""
# 1. 파싱
parsed = sqlparse.parse(sql)
if not parsed:
raise ValueError("Invalid SQL syntax")
for statement in parsed:
# 2. 문장 타입 확인
stmt_type = statement.get_type()
if stmt_type not in self.ALLOWED_STATEMENTS:
raise ValueError(f"Statement type not allowed: {stmt_type}")
# 3. 금지된 키워드 확인
tokens = [t for t in statement.flatten() if t.ttype is Token.Keyword]
for token in tokens:
if token.value.upper() in self.FORBIDDEN_KEYWORDS:
raise ValueError(f"Forbidden keyword: {token.value}")
return True
async def execute_safe(self, conn, sql: str, params: List = None):
"""안전한 쿼리 실행"""
self.validate_query(sql)
# 매개변수화된 쿼리만 허용
if params is None:
params = []
# 타임아웃 설정
async with asyncio.timeout(30): # 30초
result = await conn.fetch(sql, *params)
return result
인증 및 권한 관리
API 키 인증
Python
auth_server.py
import os
import hashlib
import secrets
from typing import Dict, Optional
class APIKeyAuth:
"""API 키 기반 인증"""
def __init__(self):
# 환경 변수에서 마스터 키 로드
self.master_key = os.getenv("MCP_MASTER_KEY")
if not self.master_key:
raise ValueError("MCP_MASTER_KEY not set")
# API 키 해시 저장 (실제로는 DB 사용)
self.api_keys: Dict[str, dict] = {}
def generate_api_key(self, user_id: str, permissions: list) -> str:
"""새 API 키 생성"""
# 안전한 랜덤 키 생성
api_key = secrets.token_urlsafe(32)
# 해시 저장 (원본은 사용자에게만 표시)
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
self.api_keys[key_hash] = {
"user_id": user_id,
"permissions": permissions,
"created_at": datetime.now()
}
return api_key
def validate_api_key(self, api_key: str) -> Optional[dict]:
"""API 키 검증"""
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
return self.api_keys.get(key_hash)
def check_permission(self, api_key: str, required_permission: str) -> bool:
"""권한 확인"""
key_info = self.validate_api_key(api_key)
if not key_info:
return False
return required_permission in key_info["permissions"]
# 서버에 통합
auth = APIKeyAuth()
@server.call_tool()
async def call_tool(name: str, arguments: dict, meta: dict):
# 메타데이터에서 API 키 추출
api_key = meta.get("api_key")
if not api_key:
raise PermissionError("API key required")
# 도구별 권한 확인
required_permission = f"tool:{name}"
if not auth.check_permission(api_key, required_permission):
raise PermissionError(f"Permission denied: {required_permission}")
# 도구 실행...
Rate Limiting
Python
rate_limiter.py
import time
from collections import defaultdict
from typing import Dict, Tuple
class RateLimiter:
"""토큰 버킷 알고리즘 기반 Rate Limiter"""
def __init__(self, requests_per_minute: int = 60):
self.capacity = requests_per_minute
self.refill_rate = requests_per_minute / 60.0 # 초당 토큰 수
# 사용자별 버킷: {user_id: (tokens, last_update)}
self.buckets: Dict[str, Tuple[float, float]] = defaultdict(
lambda: (self.capacity, time.time())
)
def _refill(self, user_id: str) -> float:
"""버킷 리필"""
tokens, last_update = self.buckets[user_id]
now = time.time()
elapsed = now - last_update
# 경과 시간에 따라 토큰 추가
tokens = min(self.capacity, tokens + elapsed * self.refill_rate)
self.buckets[user_id] = (tokens, now)
return tokens
def allow_request(self, user_id: str) -> bool:
"""요청 허용 여부 확인"""
tokens = self._refill(user_id)
if tokens >= 1.0:
self.buckets[user_id] = (tokens - 1.0, time.time())
return True
return False
def get_retry_after(self, user_id: str) -> float:
"""다음 요청 가능 시간 (초)"""
tokens = self._refill(user_id)
if tokens >= 1.0:
return 0.0
needed = 1.0 - tokens
return needed / self.refill_rate
# 사용 예제
rate_limiter = RateLimiter(requests_per_minute=100)
@server.call_tool()
async def call_tool(name: str, arguments: dict, user_id: str):
if not rate_limiter.allow_request(user_id):
retry_after = rate_limiter.get_retry_after(user_id)
raise Exception(f"Rate limit exceeded. Retry after {retry_after:.2f}s")
# 도구 실행...
성능 최적화
캐싱
Python
cache_decorator.py
import asyncio
import hashlib
import json
from functools import wraps
from typing import Any, Callable
from datetime import datetime, timedelta
class AsyncCache:
"""비동기 메모리 캐시"""
def __init__(self):
self.cache: dict[str, tuple[Any, datetime]] = {}
def _make_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
"""캐시 키 생성"""
key_data = {
"func": func_name,
"args": args,
"kwargs": kwargs
}
key_str = json.dumps(key_data, sort_keys=True)
return hashlib.sha256(key_str.encode()).hexdigest()
def get(self, key: str, ttl: int) -> Any:
"""캐시에서 값 가져오기"""
if key in self.cache:
value, timestamp = self.cache[key]
if datetime.now() - timestamp < timedelta(seconds=ttl):
return value
else:
del self.cache[key]
return None
def set(self, key: str, value: Any):
"""캐시에 값 저장"""
self.cache[key] = (value, datetime.now())
def clear(self):
"""캐시 초기화"""
self.cache.clear()
# 글로벌 캐시 인스턴스
cache = AsyncCache()
def cached(ttl: int = 300):
"""캐싱 데코레이터 (TTL 기본 5분)"""
def decorator(func: Callable):
@wraps(func)
async def wrapper(*args, **kwargs):
# 캐시 키 생성
cache_key = cache._make_key(func.__name__, args, kwargs)
# 캐시 조회
cached_value = cache.get(cache_key, ttl)
if cached_value is not None:
return cached_value
# 실제 함수 실행
result = await func(*args, **kwargs)
# 결과 캐싱
cache.set(cache_key, result)
return result
return wrapper
return decorator
# 사용 예제
@server.tool()
@cached(ttl=600) # 10분
async def fetch_github_issues(repo: str) -> list:
"""GitHub 이슈 조회 (캐싱됨)"""
# 비용이 큰 API 호출...
async with httpx.AsyncClient() as client:
response = await client.get(f"https://api.github.com/repos/{repo}/issues")
return response.json()
연결 풀링
Python
db_pool.py
import asyncpg
from typing import Optional
class DatabasePool:
"""PostgreSQL 연결 풀 관리"""
def __init__(self, dsn: str):
self.dsn = dsn
self.pool: Optional[asyncpg.Pool] = None
async def initialize(self):
"""풀 초기화"""
self.pool = await asyncpg.create_pool(
self.dsn,
min_size=5, # 최소 연결 수
max_size=20, # 최대 연결 수
max_queries=50000, # 연결당 최대 쿼리 수 (재생성 전)
max_inactive_connection_lifetime=300, # 비활성 연결 수명 (초)
command_timeout=60, # 쿼리 타임아웃
)
async def close(self):
"""풀 종료"""
if self.pool:
await self.pool.close()
async def execute(self, query: str, *args):
"""쿼리 실행"""
async with self.pool.acquire() as conn:
return await conn.execute(query, *args)
async def fetch(self, query: str, *args):
"""데이터 조회"""
async with self.pool.acquire() as conn:
return await conn.fetch(query, *args)
# 서버 초기화 시 풀 생성
db_pool = DatabasePool("postgresql://user:pass@localhost/db")
async def startup():
await db_pool.initialize()
async def shutdown():
await db_pool.close()
병렬 처리
Python
parallel_tasks.py
import asyncio
from typing import List, Any
async def process_files_parallel(file_paths: List[str]) -> List[Any]:
"""파일들을 병렬로 처리"""
async def process_single_file(path: str) -> dict:
"""단일 파일 처리"""
# 비동기 파일 읽기 (aiofiles 사용)
import aiofiles
async with aiofiles.open(path, 'r') as f:
content = await f.read()
# 처리 로직...
return {
"path": path,
"size": len(content),
"lines": content.count("\n")
}
# asyncio.gather로 병렬 실행
results = await asyncio.gather(
*[process_single_file(path) for path in file_paths],
return_exceptions=True # 개별 에러를 결과에 포함
)
# 에러 필터링
valid_results = [r for r in results if not isinstance(r, Exception)]
return valid_results
# Semaphore로 동시 실행 수 제한
async def process_with_limit(items: List[Any], max_concurrent: int = 10):
"""동시 실행 수를 제한하여 처리"""
semaphore = asyncio.Semaphore(max_concurrent)
async def process_item(item):
async with semaphore:
# 처리 로직...
await asyncio.sleep(0.1) # 예시
return item
results = await asyncio.gather(*[process_item(item) for item in items])
return results
스트리밍 응답
Python
streaming.py
async def stream_large_file(path: str, chunk_size: int = 8192):
"""대용량 파일을 청크 단위로 스트리밍"""
import aiofiles
async with aiofiles.open(path, 'rb') as f:
while True:
chunk = await f.read(chunk_size)
if not chunk:
break
# 청크 전송 (MCP progress notification 사용)
yield chunk
@server.tool()
async def read_large_file(path: str):
"""대용량 파일 읽기 with progress"""
total_size = Path(path).stat().st_size
processed = 0
chunks = []
async for chunk in stream_large_file(path):
chunks.append(chunk)
processed += len(chunk)
# Progress notification 전송
progress = processed / total_size
await server.send_progress_notification(
token="read_file",
progress=progress,
total=total_size
)
return b"".join(chunks)
멀티 서버 아키텍처
서버 간 조정
여러 MCP 서버를 동시에 사용할 때의 조정 전략:
TypeScript
multi_server_client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
class MultiServerClient {
private servers: Map<string, Client> = new Map();
private toolRegistry: Map<string, string> = new Map();
async addServer(name: string, command: string, args: string[]) {
// 클라이언트 생성 및 연결
const client = new Client({ name: `client-${name}`, version: "1.0.0" }, {
capabilities: { tools: {} }
});
const transport = new StdioClientTransport({ command, args });
await client.connect(transport);
this.servers.set(name, client);
// 도구 목록 조회 및 등록
const tools = await client.listTools();
for (const tool of tools.tools) {
if (this.toolRegistry.has(tool.name)) {
console.warn(`Tool ${tool.name} already registered, skipping`);
} else {
this.toolRegistry.set(tool.name, name);
}
}
}
async callTool(toolName: string, args: any) {
// 도구가 등록된 서버 찾기
const serverName = this.toolRegistry.get(toolName);
if (!serverName) {
throw new Error(`Tool ${toolName} not found`);
}
const client = this.servers.get(serverName);
return await client!.callTool(toolName, args);
}
async closeAll() {
for (const client of this.servers.values()) {
await client.close();
}
}
}
// 사용 예제
const multiClient = new MultiServerClient();
await multiClient.addServer("filesystem", "npx", [
"-y", "@modelcontextprotocol/server-filesystem", "/home/user"
]);
await multiClient.addServer("github", "npx", [
"-y", "@modelcontextprotocol/server-github"
]);
// 통합된 인터페이스로 도구 호출
await multiClient.callTool("read_file", { path: "/home/user/test.txt" });
await multiClient.callTool("list_issues", { owner: "anthropics", repo: "claude" });
도구 네임스페이싱
여러 서버에서 동일한 이름의 도구를 제공하는 경우 충돌을 방지합니다:
Python
namespaced_tools.py
class NamespacedServer:
"""네임스페이스가 적용된 MCP 서버"""
def __init__(self, namespace: str):
self.namespace = namespace
self.server = Server(f"{namespace}-server")
def tool(self, name: str):
"""네임스페이스가 적용된 도구 등록"""
namespaced_name = f"{self.namespace}_{name}"
def decorator(func):
@self.server.tool(name=namespaced_name)
async def wrapper(*args, **kwargs):
return await func(*args, **kwargs)
return wrapper
return decorator
# 사용 예제
local_fs = NamespacedServer("local")
remote_fs = NamespacedServer("remote")
@local_fs.tool("read_file")
async def local_read_file(path: str):
# 로컬 파일 시스템 읽기...
pass
@remote_fs.tool("read_file")
async def remote_read_file(path: str):
# 원격 파일 시스템 읽기...
pass
# 결과: "local_read_file", "remote_read_file" 도구 생성
디버깅
로깅
Python
logging_config.py
import logging
import json
from datetime import datetime
from pathlib import Path
class MCPLogger:
"""구조화된 로깅"""
def __init__(self, name: str, log_dir: Path):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.DEBUG)
# 파일 핸들러 (JSON 형식)
log_file = log_dir / f"{name}.log"
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(self._json_formatter())
self.logger.addHandler(file_handler)
# 콘솔 핸들러 (사람이 읽기 쉬운 형식)
console_handler = logging.StreamHandler()
console_handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
self.logger.addHandler(console_handler)
def _json_formatter(self):
class JSONFormatter(logging.Formatter):
def format(self, record):
log_data = {
"timestamp": datetime.utcnow().isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
if record.exc_info:
log_data["exception"] = self.formatException(record.exc_info)
return json.dumps(log_data)
return JSONFormatter()
def log_tool_call(self, tool_name: str, arguments: dict, result: any = None, error: Exception = None):
"""도구 호출 로깅"""
log_data = {
"event": "tool_call",
"tool": tool_name,
"arguments": arguments,
}
if result is not None:
log_data["result_size"] = len(str(result))
if error:
self.logger.error(f"Tool call failed: {json.dumps(log_data)}", exc_info=error)
else:
self.logger.info(f"Tool call succeeded: {json.dumps(log_data)}")
# 사용 예제
logger = MCPLogger("my-server", Path("/var/log/mcp"))
@server.call_tool()
async def call_tool(name: str, arguments: dict):
try:
result = await execute_tool(name, arguments)
logger.log_tool_call(name, arguments, result=result)
return result
except Exception as e:
logger.log_tool_call(name, arguments, error=e)
raise
분산 추적
Python
tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
# Tracer 설정
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Jaeger 익스포터
jaeger_exporter = JaegerExporter(
agent_host_name="localhost",
agent_port=6831,
)
span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)
# 도구 호출 추적
@server.call_tool()
async def call_tool(name: str, arguments: dict):
with tracer.start_as_current_span(f"tool.{name}") as span:
# 속성 추가
span.set_attribute("tool.name", name)
span.set_attribute("tool.args", str(arguments))
try:
result = await execute_tool(name, arguments)
span.set_attribute("tool.status", "success")
return result
except Exception as e:
span.set_attribute("tool.status", "error")
span.record_exception(e)
raise
디버깅 도구
Bash
# MCP Inspector - 대화형 디버깅 도구
npx @modelcontextprotocol/inspector python server.py
# 로그 수준 설정
MCP_LOG_LEVEL=debug python server.py
# 프로토콜 메시지 덤프
MCP_DUMP_PROTOCOL=1 python server.py 2> protocol.log
테스트
단위 테스트
Python
test_server.py
import pytest
from mcp.server.stdio import stdio_server
from mcp import types
@pytest.mark.asyncio
async def test_read_file_tool():
"""read_file 도구 테스트"""
from my_server import server
# 임시 파일 생성
import tempfile
with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
f.write("test content")
temp_path = f.name
try:
# 도구 호출
result = await server.call_tool("read_file", {"path": temp_path})
# 검증
assert len(result) == 1
assert result[0].type == "text"
assert result[0].text == "test content"
finally:
import os
os.unlink(temp_path)
@pytest.mark.asyncio
async def test_permission_denied():
"""권한 거부 테스트"""
from my_server import server
with pytest.raises(PermissionError):
await server.call_tool("read_file", {"path": "/etc/shadow"})
통합 테스트
Python
test_integration.py
import pytest
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
@pytest.mark.asyncio
async def test_full_workflow():
"""전체 워크플로우 테스트"""
# 서버 시작
server_params = StdioServerParameters(
command="python",
args=["my_server.py"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# 초기화
await session.initialize()
# 도구 목록 조회
tools = await session.list_tools()
assert len(tools.tools) > 0
# 도구 실행
result = await session.call_tool("list_directory", {
"path": "/tmp"
})
assert result is not None
부하 테스트
Python
load_test.py
import asyncio
import time
from statistics import mean, stdev
async def load_test(num_requests: int = 1000, concurrency: int = 50):
"""부하 테스트"""
latencies = []
errors = 0
async def single_request():
nonlocal errors
start = time.time()
try:
# 요청 실행...
result = await session.call_tool("some_tool", {})
latency = time.time() - start
latencies.append(latency)
except Exception:
errors += 1
# 동시 요청 실행
tasks = [single_request() for _ in range(num_requests)]
# Semaphore로 동시성 제한
semaphore = asyncio.Semaphore(concurrency)
async def bounded_task(task):
async with semaphore:
await task
await asyncio.gather(*[bounded_task(t) for t in tasks])
# 결과 분석
print(f"Total requests: {num_requests}")
print(f"Successful: {len(latencies)}")
print(f"Errors: {errors}")
print(f"Mean latency: {mean(latencies):.3f}s")
print(f"Stdev latency: {stdev(latencies):.3f}s")
print(f"Min/Max latency: {min(latencies):.3f}s / {max(latencies):.3f}s")
프로덕션 배포
Docker 컨테이너화
Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 애플리케이션 복사
COPY . .
# 비-root 사용자로 실행
RUN useradd -m mcpuser
USER mcpuser
# 헬스체크
HEALTHCHECK --interval=30s --timeout=3s \
CMD python -c "import sys; sys.exit(0)"
CMD ["python", "server.py"]
YAML
docker-compose.yml
version: '3.8'
services:
mcp-server:
build: .
restart: unless-stopped
environment:
- MCP_LOG_LEVEL=info
- DB_CONNECTION_STRING=${DB_CONNECTION_STRING}
volumes:
- ./data:/app/data:ro
- ./logs:/app/logs
networks:
- mcp-network
deploy:
resources:
limits:
cpus: '2'
memory: 1G
reservations:
cpus: '0.5'
memory: 256M
networks:
mcp-network:
driver: bridge
모니터링
Python
metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# 메트릭 정의
tool_calls_total = Counter(
'mcp_tool_calls_total',
'Total number of tool calls',
['tool_name', 'status']
)
tool_duration = Histogram(
'mcp_tool_duration_seconds',
'Tool execution duration',
['tool_name']
)
active_connections = Gauge(
'mcp_active_connections',
'Number of active connections'
)
# Prometheus 서버 시작 (별도 포트)
start_http_server(9090)
@server.call_tool()
async def call_tool(name: str, arguments: dict):
with tool_duration.labels(tool_name=name).time():
try:
result = await execute_tool(name, arguments)
tool_calls_total.labels(tool_name=name, status='success').inc()
return result
except Exception:
tool_calls_total.labels(tool_name=name, status='error').inc()
raise
CI/CD 파이프라인
YAML
.github/workflows/deploy.yml
name: Deploy MCP Server
on:
push:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.11'
- run: pip install -r requirements.txt
- run: pytest tests/
build:
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: docker/build-push-action@v4
with:
push: true
tags: myregistry/mcp-server:latest
deploy:
needs: build
runs-on: ubuntu-latest
steps:
- run: |
ssh deploy@server 'docker pull myregistry/mcp-server:latest'
ssh deploy@server 'docker-compose up -d'
베스트 프랙티스
보안
- 최소 권한 원칙 적용
- 입력 검증 철저히
- API 키 환경 변수로 관리
- Rate limiting 구현
- 샌드박싱으로 격리
성능
- 적절한 캐싱 전략
- 연결 풀링 사용
- 비동기 I/O 활용
- 타임아웃 설정
- 리소스 제한 (CPU, 메모리)
신뢰성
- 포괄적인 에러 처리
- 재시도 로직 (exponential backoff)
- Circuit breaker 패턴
- 헬스체크 엔드포인트
- Graceful shutdown
관찰성
- 구조화된 로깅
- 메트릭 수집 (Prometheus)
- 분산 추적 (Jaeger, Zipkin)
- 알림 설정 (PagerDuty, Slack)
다음 단계
학습 경로
- 보안 강화: 프로덕션 서버에 인증 및 권한 관리 추가
- 성능 테스트: 부하 테스트로 병목 지점 식별 및 최적화
- 모니터링 구축: Prometheus + Grafana 대시보드 설정
- CI/CD 구축: 자동화된 테스트 및 배포 파이프라인
- 스케일링: 수평 확장 전략 수립
추가 리소스
핵심 정리
- MCP 고급 주제의 핵심 개념과 흐름을 정리합니다.
- 보안를 단계별로 이해합니다.
- 실전 적용 시 기준과 주의점을 확인합니다.
실무 팁
- 입력/출력 예시를 고정해 재현성을 확보하세요.
- MCP 고급 주제 범위를 작게 잡고 단계적으로 확장하세요.
- 보안 조건을 문서화해 대응 시간을 줄이세요.