MCP 고급 주제

프로덕션 환경에서 MCP 서버를 안전하고 효율적으로 운영하기 위한 고급 주제를 다룹니다. 보안, 성능 최적화, 디버깅, 테스트, 배포 전략 등을 학습하세요.

업데이트 안내: 모델/요금/버전/정책 등 시점에 민감한 정보는 변동될 수 있습니다. 최신 내용은 공식 문서를 확인하세요.
이 페이지에서 배울 내용
  • 보안: 샌드박싱, 권한 관리, 인증
  • 성능 최적화: 캐싱, 연결 풀링, 병렬 처리
  • 멀티 서버 아키텍처 및 조정
  • 효과적인 디버깅 및 로깅 전략
  • 테스트 자동화
  • 프로덕션 배포 및 모니터링

보안

샌드박싱

MCP 서버는 외부 데이터에 접근하므로 적절한 샌드박싱이 필수입니다.

파일 시스템 샌드박싱

Python secure_filesystem.py
from pathlib import Path
from typing import List

class SecureFileSystem:
    """보안이 강화된 파일 시스템 접근"""

    def __init__(self, allowed_dirs: List[Path]):
        self.allowed_dirs = [d.resolve() for d in allowed_dirs]

    def validate_path(self, path: Path) -> Path:
        """경로 검증 및 정규화"""
        # 1. 경로 정규화 (심볼릭 링크 해결)
        resolved = path.resolve()

        # 2. 허용된 디렉토리 내에 있는지 확인
        if not any(resolved.is_relative_to(d) for d in self.allowed_dirs):
            raise PermissionError(f"Access denied: {path}")

        # 3. 숨김 파일/디렉토리 차단 (선택사항)
        for part in resolved.parts:
            if part.startswith(".") and part not in {".", ".."}:
                raise PermissionError(f"Hidden files not allowed: {path}")

        return resolved

    def read_file(self, path: str) -> str:
        """안전한 파일 읽기"""
        validated_path = self.validate_path(Path(path))

        # 파일 크기 제한
        max_size = 10 * 1024 * 1024  # 10MB
        size = validated_path.stat().st_size
        if size > max_size:
            raise ValueError(f"File too large: {size} bytes")

        return validated_path.read_text(encoding="utf-8")

    def write_file(self, path: str, content: str):
        """안전한 파일 쓰기"""
        validated_path = self.validate_path(Path(path))

        # 콘텐츠 크기 제한
        max_size = 5 * 1024 * 1024  # 5MB
        if len(content) > max_size:
            raise ValueError(f"Content too large: {len(content)} bytes")

        # 임시 파일에 먼저 쓰고 원자적으로 이동
        tmp_path = validated_path.with_suffix(".tmp")
        try:
            tmp_path.write_text(content, encoding="utf-8")
            tmp_path.replace(validated_path)
        finally:
            if tmp_path.exists():
                tmp_path.unlink()

SQL 인젝션 방지

Python secure_sql.py
import sqlparse
from sqlparse.sql import Token, TokenList
from typing import List

class SecureSQL:
    """SQL 쿼리 검증 및 실행"""

    ALLOWED_STATEMENTS = {"SELECT"}  # 읽기 전용
    FORBIDDEN_KEYWORDS = {
        "DROP", "DELETE", "UPDATE", "INSERT",
        "ALTER", "CREATE", "TRUNCATE", "EXEC"
    }

    def validate_query(self, sql: str) -> bool:
        """쿼리 안전성 검증"""
        # 1. 파싱
        parsed = sqlparse.parse(sql)
        if not parsed:
            raise ValueError("Invalid SQL syntax")

        for statement in parsed:
            # 2. 문장 타입 확인
            stmt_type = statement.get_type()
            if stmt_type not in self.ALLOWED_STATEMENTS:
                raise ValueError(f"Statement type not allowed: {stmt_type}")

            # 3. 금지된 키워드 확인
            tokens = [t for t in statement.flatten() if t.ttype is Token.Keyword]
            for token in tokens:
                if token.value.upper() in self.FORBIDDEN_KEYWORDS:
                    raise ValueError(f"Forbidden keyword: {token.value}")

        return True

    async def execute_safe(self, conn, sql: str, params: List = None):
        """안전한 쿼리 실행"""
        self.validate_query(sql)

        # 매개변수화된 쿼리만 허용
        if params is None:
            params = []

        # 타임아웃 설정
        async with asyncio.timeout(30):  # 30초
            result = await conn.fetch(sql, *params)

        return result

인증 및 권한 관리

API 키 인증

Python auth_server.py
import os
import hashlib
import secrets
from typing import Dict, Optional

class APIKeyAuth:
    """API 키 기반 인증"""

    def __init__(self):
        # 환경 변수에서 마스터 키 로드
        self.master_key = os.getenv("MCP_MASTER_KEY")
        if not self.master_key:
            raise ValueError("MCP_MASTER_KEY not set")

        # API 키 해시 저장 (실제로는 DB 사용)
        self.api_keys: Dict[str, dict] = {}

    def generate_api_key(self, user_id: str, permissions: list) -> str:
        """새 API 키 생성"""
        # 안전한 랜덤 키 생성
        api_key = secrets.token_urlsafe(32)

        # 해시 저장 (원본은 사용자에게만 표시)
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()

        self.api_keys[key_hash] = {
            "user_id": user_id,
            "permissions": permissions,
            "created_at": datetime.now()
        }

        return api_key

    def validate_api_key(self, api_key: str) -> Optional[dict]:
        """API 키 검증"""
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        return self.api_keys.get(key_hash)

    def check_permission(self, api_key: str, required_permission: str) -> bool:
        """권한 확인"""
        key_info = self.validate_api_key(api_key)
        if not key_info:
            return False

        return required_permission in key_info["permissions"]

# 서버에 통합
auth = APIKeyAuth()

@server.call_tool()
async def call_tool(name: str, arguments: dict, meta: dict):
    # 메타데이터에서 API 키 추출
    api_key = meta.get("api_key")
    if not api_key:
        raise PermissionError("API key required")

    # 도구별 권한 확인
    required_permission = f"tool:{name}"
    if not auth.check_permission(api_key, required_permission):
        raise PermissionError(f"Permission denied: {required_permission}")

    # 도구 실행...

Rate Limiting

Python rate_limiter.py
import time
from collections import defaultdict
from typing import Dict, Tuple

class RateLimiter:
    """토큰 버킷 알고리즘 기반 Rate Limiter"""

    def __init__(self, requests_per_minute: int = 60):
        self.capacity = requests_per_minute
        self.refill_rate = requests_per_minute / 60.0  # 초당 토큰 수

        # 사용자별 버킷: {user_id: (tokens, last_update)}
        self.buckets: Dict[str, Tuple[float, float]] = defaultdict(
            lambda: (self.capacity, time.time())
        )

    def _refill(self, user_id: str) -> float:
        """버킷 리필"""
        tokens, last_update = self.buckets[user_id]
        now = time.time()
        elapsed = now - last_update

        # 경과 시간에 따라 토큰 추가
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)

        self.buckets[user_id] = (tokens, now)
        return tokens

    def allow_request(self, user_id: str) -> bool:
        """요청 허용 여부 확인"""
        tokens = self._refill(user_id)

        if tokens >= 1.0:
            self.buckets[user_id] = (tokens - 1.0, time.time())
            return True

        return False

    def get_retry_after(self, user_id: str) -> float:
        """다음 요청 가능 시간 (초)"""
        tokens = self._refill(user_id)
        if tokens >= 1.0:
            return 0.0

        needed = 1.0 - tokens
        return needed / self.refill_rate

# 사용 예제
rate_limiter = RateLimiter(requests_per_minute=100)

@server.call_tool()
async def call_tool(name: str, arguments: dict, user_id: str):
    if not rate_limiter.allow_request(user_id):
        retry_after = rate_limiter.get_retry_after(user_id)
        raise Exception(f"Rate limit exceeded. Retry after {retry_after:.2f}s")

    # 도구 실행...

Rate Limiting 시나리오 예시

시나리오: 분당 100회 제한 (초당 ~1.67회 리필)
# 시간 0초: 사용자가 처음 요청 (토큰 100개 시작)
요청 1-50: ✅ 허용 (토큰 100 → 50 남음)

# 시간 30초: 50개 토큰 소진 상태
- 경과 시간: 30초
- 리필된 토큰: 30 × 1.67 ≈ 50개
- 현재 토큰: 50 + 50 = 100개 (상한 100)

요청 51-100: ✅ 허용 (토큰 100 → 0 남음)

# 시간 30초: 토큰 소진
요청 101: ❌ 거부 (토큰 0개, 0.6초 후 재시도)

# 시간 31초: 1초 대기 (리필 1.67개)
요청 102: ✅ 허용 (토큰 1.67 → 0.67 남음)

# 결과:
# - 짧은 버스트: 최대 100개까지 허용
# - 지속적 사용: 초당 ~1.67개로 제한
# - 공평성: 모든 사용자가 동일한 버킷

성능 최적화

캐싱

Python cache_decorator.py
import asyncio
import hashlib
import json
from functools import wraps
from typing import Any, Callable
from datetime import datetime, timedelta

class AsyncCache:
    """비동기 메모리 캐시"""

    def __init__(self):
        self.cache: dict[str, tuple[Any, datetime]] = {}

    def _make_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """캐시 키 생성"""
        key_data = {
            "func": func_name,
            "args": args,
            "kwargs": kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, key: str, ttl: int) -> Any:
        """캐시에서 값 가져오기"""
        if key in self.cache:
            value, timestamp = self.cache[key]
            if datetime.now() - timestamp < timedelta(seconds=ttl):
                return value
            else:
                del self.cache[key]
        return None

    def set(self, key: str, value: Any):
        """캐시에 값 저장"""
        self.cache[key] = (value, datetime.now())

    def clear(self):
        """캐시 초기화"""
        self.cache.clear()

# 글로벌 캐시 인스턴스
cache = AsyncCache()

def cached(ttl: int = 300):
    """캐싱 데코레이터 (TTL 기본 5분)"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 캐시 키 생성
            cache_key = cache._make_key(func.__name__, args, kwargs)

            # 캐시 조회
            cached_value = cache.get(cache_key, ttl)
            if cached_value is not None:
                return cached_value

            # 실제 함수 실행
            result = await func(*args, **kwargs)

            # 결과 캐싱
            cache.set(cache_key, result)

            return result

        return wrapper
    return decorator

# 사용 예제
@server.tool()
@cached(ttl=600)  # 10분
async def fetch_github_issues(repo: str) -> list:
    """GitHub 이슈 조회 (캐싱됨)"""
    # 비용이 큰 API 호출...
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.github.com/repos/{repo}/issues")
        return response.json()

연결 풀링

Python db_pool.py
import asyncpg
from typing import Optional

class DatabasePool:
    """PostgreSQL 연결 풀 관리"""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def initialize(self):
        """풀 초기화"""
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=5,           # 최소 연결 수
            max_size=20,          # 최대 연결 수
            max_queries=50000,    # 연결당 최대 쿼리 수 (재생성 전)
            max_inactive_connection_lifetime=300,  # 비활성 연결 수명 (초)
            command_timeout=60,    # 쿼리 타임아웃
        )

    async def close(self):
        """풀 종료"""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """쿼리 실행"""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """데이터 조회"""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

# 서버 초기화 시 풀 생성
db_pool = DatabasePool("postgresql://user:pass@localhost/db")

async def startup():
    await db_pool.initialize()

async def shutdown():
    await db_pool.close()

병렬 처리

Python parallel_tasks.py
import asyncio
from typing import List, Any

async def process_files_parallel(file_paths: List[str]) -> List[Any]:
    """파일들을 병렬로 처리"""

    async def process_single_file(path: str) -> dict:
        """단일 파일 처리"""
        # 비동기 파일 읽기 (aiofiles 사용)
        import aiofiles
        async with aiofiles.open(path, 'r') as f:
            content = await f.read()

        # 처리 로직...
        return {
            "path": path,
            "size": len(content),
            "lines": content.count("\n")
        }

    # asyncio.gather로 병렬 실행
    results = await asyncio.gather(
        *[process_single_file(path) for path in file_paths],
        return_exceptions=True  # 개별 에러를 결과에 포함
    )

    # 에러 필터링
    valid_results = [r for r in results if not isinstance(r, Exception)]

    return valid_results

# Semaphore로 동시 실행 수 제한
async def process_with_limit(items: List[Any], max_concurrent: int = 10):
    """동시 실행 수를 제한하여 처리"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_item(item):
        async with semaphore:
            # 처리 로직...
            await asyncio.sleep(0.1)  # 예시
            return item

    results = await asyncio.gather(*[process_item(item) for item in items])
    return results

스트리밍 응답

Python streaming.py
async def stream_large_file(path: str, chunk_size: int = 8192):
    """대용량 파일을 청크 단위로 스트리밍"""
    import aiofiles

    async with aiofiles.open(path, 'rb') as f:
        while True:
            chunk = await f.read(chunk_size)
            if not chunk:
                break

            # 청크 전송 (MCP progress notification 사용)
            yield chunk

@server.tool()
async def read_large_file(path: str):
    """대용량 파일 읽기 with progress"""
    total_size = Path(path).stat().st_size
    processed = 0

    chunks = []
    async for chunk in stream_large_file(path):
        chunks.append(chunk)
        processed += len(chunk)

        # Progress notification 전송
        progress = processed / total_size
        await server.send_progress_notification(
            token="read_file",
            progress=progress,
            total=total_size
        )

    return b"".join(chunks)

멀티 서버 아키텍처

서버 간 조정

여러 MCP 서버를 동시에 사용할 때의 조정 전략:

TypeScript multi_server_client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

class MultiServerClient {
  private servers: Map<string, Client> = new Map();
  private toolRegistry: Map<string, string> = new Map();

  async addServer(name: string, command: string, args: string[]) {
    // 클라이언트 생성 및 연결
    const client = new Client({ name: `client-${name}`, version: "1.0.0" }, {
      capabilities: { tools: {} }
    });

    const transport = new StdioClientTransport({ command, args });
    await client.connect(transport);

    this.servers.set(name, client);

    // 도구 목록 조회 및 등록
    const tools = await client.listTools();
    for (const tool of tools.tools) {
      if (this.toolRegistry.has(tool.name)) {
        console.warn(`Tool ${tool.name} already registered, skipping`);
      } else {
        this.toolRegistry.set(tool.name, name);
      }
    }
  }

  async callTool(toolName: string, args: any) {
    // 도구가 등록된 서버 찾기
    const serverName = this.toolRegistry.get(toolName);
    if (!serverName) {
      throw new Error(`Tool ${toolName} not found`);
    }

    const client = this.servers.get(serverName);
    return await client!.callTool(toolName, args);
  }

  async closeAll() {
    for (const client of this.servers.values()) {
      await client.close();
    }
  }
}

// 사용 예제
const multiClient = new MultiServerClient();

await multiClient.addServer("filesystem", "npx", [
  "-y", "@modelcontextprotocol/server-filesystem", "/home/user"
]);

await multiClient.addServer("github", "npx", [
  "-y", "@modelcontextprotocol/server-github"
]);

// 통합된 인터페이스로 도구 호출
await multiClient.callTool("read_file", { path: "/home/user/test.txt" });
await multiClient.callTool("list_issues", { owner: "anthropics", repo: "claude" });

도구 네임스페이싱

여러 서버에서 동일한 이름의 도구를 제공하는 경우 충돌을 방지합니다:

Python namespaced_tools.py
class NamespacedServer:
    """네임스페이스가 적용된 MCP 서버"""

    def __init__(self, namespace: str):
        self.namespace = namespace
        self.server = Server(f"{namespace}-server")

    def tool(self, name: str):
        """네임스페이스가 적용된 도구 등록"""
        namespaced_name = f"{self.namespace}_{name}"

        def decorator(func):
            @self.server.tool(name=namespaced_name)
            async def wrapper(*args, **kwargs):
                return await func(*args, **kwargs)
            return wrapper

        return decorator

# 사용 예제
local_fs = NamespacedServer("local")
remote_fs = NamespacedServer("remote")

@local_fs.tool("read_file")
async def local_read_file(path: str):
    # 로컬 파일 시스템 읽기...
    pass

@remote_fs.tool("read_file")
async def remote_read_file(path: str):
    # 원격 파일 시스템 읽기...
    pass

# 결과: "local_read_file", "remote_read_file" 도구 생성

디버깅

로깅

Python logging_config.py
import logging
import json
from datetime import datetime
from pathlib import Path

class MCPLogger:
    """구조화된 로깅"""

    def __init__(self, name: str, log_dir: Path):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)

        # 파일 핸들러 (JSON 형식)
        log_file = log_dir / f"{name}.log"
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(self._json_formatter())
        self.logger.addHandler(file_handler)

        # 콘솔 핸들러 (사람이 읽기 쉬운 형식)
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        self.logger.addHandler(console_handler)

    def _json_formatter(self):
        class JSONFormatter(logging.Formatter):
            def format(self, record):
                log_data = {
                    "timestamp": datetime.utcnow().isoformat(),
                    "level": record.levelname,
                    "logger": record.name,
                    "message": record.getMessage(),
                    "module": record.module,
                    "function": record.funcName,
                    "line": record.lineno,
                }

                if record.exc_info:
                    log_data["exception"] = self.formatException(record.exc_info)

                return json.dumps(log_data)

        return JSONFormatter()

    def log_tool_call(self, tool_name: str, arguments: dict, result: any = None, error: Exception = None):
        """도구 호출 로깅"""
        log_data = {
            "event": "tool_call",
            "tool": tool_name,
            "arguments": arguments,
        }

        if result is not None:
            log_data["result_size"] = len(str(result))

        if error:
            self.logger.error(f"Tool call failed: {json.dumps(log_data)}", exc_info=error)
        else:
            self.logger.info(f"Tool call succeeded: {json.dumps(log_data)}")

# 사용 예제
logger = MCPLogger("my-server", Path("/var/log/mcp"))

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    try:
        result = await execute_tool(name, arguments)
        logger.log_tool_call(name, arguments, result=result)
        return result
    except Exception as e:
        logger.log_tool_call(name, arguments, error=e)
        raise

분산 추적

Python tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Tracer 설정
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Jaeger 익스포터
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# 도구 호출 추적
@server.call_tool()
async def call_tool(name: str, arguments: dict):
    with tracer.start_as_current_span(f"tool.{name}") as span:
        # 속성 추가
        span.set_attribute("tool.name", name)
        span.set_attribute("tool.args", str(arguments))

        try:
            result = await execute_tool(name, arguments)
            span.set_attribute("tool.status", "success")
            return result
        except Exception as e:
            span.set_attribute("tool.status", "error")
            span.record_exception(e)
            raise

디버깅 도구

Bash
# MCP Inspector - 대화형 디버깅 도구
npx @modelcontextprotocol/inspector python server.py

# 로그 수준 설정
MCP_LOG_LEVEL=debug python server.py

# 프로토콜 메시지 덤프
MCP_DUMP_PROTOCOL=1 python server.py 2> protocol.log

테스트

단위 테스트

Python test_server.py
import pytest
from mcp.server.stdio import stdio_server
from mcp import types

@pytest.mark.asyncio
async def test_read_file_tool():
    """read_file 도구 테스트"""
    from my_server import server

    # 임시 파일 생성
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
        f.write("test content")
        temp_path = f.name

    try:
        # 도구 호출
        result = await server.call_tool("read_file", {"path": temp_path})

        # 검증
        assert len(result) == 1
        assert result[0].type == "text"
        assert result[0].text == "test content"

    finally:
        import os
        os.unlink(temp_path)

@pytest.mark.asyncio
async def test_permission_denied():
    """권한 거부 테스트"""
    from my_server import server

    with pytest.raises(PermissionError):
        await server.call_tool("read_file", {"path": "/etc/shadow"})

통합 테스트

Python test_integration.py
import pytest
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

@pytest.mark.asyncio
async def test_full_workflow():
    """전체 워크플로우 테스트"""

    # 서버 시작
    server_params = StdioServerParameters(
        command="python",
        args=["my_server.py"]
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # 초기화
            await session.initialize()

            # 도구 목록 조회
            tools = await session.list_tools()
            assert len(tools.tools) > 0

            # 도구 실행
            result = await session.call_tool("list_directory", {
                "path": "/tmp"
            })

            assert result is not None

부하 테스트

Python load_test.py
import asyncio
import time
from statistics import mean, stdev

async def load_test(num_requests: int = 1000, concurrency: int = 50):
    """부하 테스트"""

    latencies = []
    errors = 0

    async def single_request():
        nonlocal errors
        start = time.time()
        try:
            # 요청 실행...
            result = await session.call_tool("some_tool", {})
            latency = time.time() - start
            latencies.append(latency)
        except Exception:
            errors += 1

    # 동시 요청 실행
    tasks = [single_request() for _ in range(num_requests)]

    # Semaphore로 동시성 제한
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded_task(task):
        async with semaphore:
            await task

    await asyncio.gather(*[bounded_task(t) for t in tasks])

    # 결과 분석
    print(f"Total requests: {num_requests}")
    print(f"Successful: {len(latencies)}")
    print(f"Errors: {errors}")
    print(f"Mean latency: {mean(latencies):.3f}s")
    print(f"Stdev latency: {stdev(latencies):.3f}s")
    print(f"Min/Max latency: {min(latencies):.3f}s / {max(latencies):.3f}s")

프로덕션 배포

Docker 컨테이너화

Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 애플리케이션 복사
COPY . .

# 비-root 사용자로 실행
RUN useradd -m mcpuser
USER mcpuser

# 헬스체크
HEALTHCHECK --interval=30s --timeout=3s \
  CMD python -c "import sys; sys.exit(0)"

CMD ["python", "server.py"]
YAML docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: .
    restart: unless-stopped
    environment:
      - MCP_LOG_LEVEL=info
      - DB_CONNECTION_STRING=${DB_CONNECTION_STRING}
    volumes:
      - ./data:/app/data:ro
      - ./logs:/app/logs
    networks:
      - mcp-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 1G
        reservations:
          cpus: '0.5'
          memory: 256M

networks:
  mcp-network:
    driver: bridge

모니터링

Python metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# 메트릭 정의
tool_calls_total = Counter(
    'mcp_tool_calls_total',
    'Total number of tool calls',
    ['tool_name', 'status']
)

tool_duration = Histogram(
    'mcp_tool_duration_seconds',
    'Tool execution duration',
    ['tool_name']
)

active_connections = Gauge(
    'mcp_active_connections',
    'Number of active connections'
)

# Prometheus 서버 시작 (별도 포트)
start_http_server(9090)

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    with tool_duration.labels(tool_name=name).time():
        try:
            result = await execute_tool(name, arguments)
            tool_calls_total.labels(tool_name=name, status='success').inc()
            return result
        except Exception:
            tool_calls_total.labels(tool_name=name, status='error').inc()
            raise

CI/CD 파이프라인

YAML .github/workflows/deploy.yml
name: Deploy MCP Server

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - run: pytest tests/

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: docker/build-push-action@v4
        with:
          push: true
          tags: myregistry/mcp-server:latest

  deploy:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - run: |
          ssh deploy@server 'docker pull myregistry/mcp-server:latest'
          ssh deploy@server 'docker-compose up -d'

베스트 프랙티스

보안 체크리스트
  • ✅ 최소 권한 원칙 적용
    • 각 도구별 권한 정의 (read-only, write, admin)
    • 사용자별 권한 매핑 테이블 구현
    • 예: tool:read_file, tool:write_file 분리
  • ✅ 입력 검증 철저히
    • 경로 검증: Path.resolve()로 심볼릭 링크 해제
    • SQL 검증: 파라미터화된 쿼리만 허용
    • 파일 크기 제한: 10MB 초과 차단
  • ✅ API 키 환경 변수로 관리
    • 절대 코드에 하드코딩 금지
    • os.getenv("MCP_API_KEY") 사용
    • .env 파일 .gitignore 추가
  • ✅ Rate limiting 구현
    • 사용자별: 분당 60회
    • IP별: 분당 100회
    • 전역: 초당 1000회
  • ✅ 샌드박싱으로 격리
    • 파일 접근: 허용 디렉토리 화이트리스트
    • 네트워크: 허용 도메인만 접근 가능
    • 프로세스: Docker 컨테이너 격리
성능 체크리스트
  • ✅ 적절한 캐싱 전략
    • 읽기 전용 데이터: TTL 10분
    • 사용자 세션: TTL 1시간
    • API 응답: LRU 캐시 1000개 엔트리
  • ✅ 연결 풀링 사용
    • DB 풀: min=5, max=20
    • HTTP 클라이언트: keep-alive 활성화
    • 연결 재사용으로 오버헤드 50% 감소
  • ✅ 비동기 I/O 활용
    • async/await 일관되게 사용
    • asyncio.gather()로 병렬 처리
    • 블로킹 호출은 run_in_executor()
  • ✅ 타임아웃 설정
    • 쿼리: 30초
    • API 호출: 10초
    • 파일 I/O: 60초
  • ✅ 리소스 제한 (CPU, 메모리)
    • Docker: --memory=1g --cpus=2
    • 프로세스: ulimit -v 1048576 (1GB)
신뢰성 체크리스트
  • ✅ 포괄적인 에러 처리
    • 모든 외부 호출에 try-except
    • 사용자 친화적 에러 메시지
    • 스택 트레이스는 로그만 (사용자에게 노출 금지)
  • ✅ 재시도 로직 (exponential backoff)
    • 1차: 2초, 2차: 4초, 3차: 8초
    • 최대 3회 재시도 후 실패 처리
    • 일시적 오류만 재시도 (5xx, timeout)
  • ✅ Circuit breaker 패턴
    • 실패율 50% 초과 시 회로 차단
    • 30초 후 half-open으로 테스트
    • 성공 시 회로 복구
  • ✅ 헬스체크 엔드포인트
    • /health: 기본 상태 (200 OK)
    • /health/ready: DB 연결 확인
    • /health/live: 프로세스 살아있는지
  • ✅ Graceful shutdown
    • SIGTERM 시그널 처리
    • 진행 중인 요청 완료 대기 (최대 30초)
    • DB 연결 정리 후 종료
관찰성 체크리스트
  • ✅ 구조화된 로깅
    • JSON 형식으로 출력
    • 타임스탬프, 레벨, 요청 ID 포함
    • 민감 정보 마스킹 (비밀번호, API 키)
  • ✅ 메트릭 수집 (Prometheus)
    • 요청 수: mcp_requests_total
    • 응답 시간: mcp_response_duration_seconds
    • 에러율: mcp_errors_total
  • ✅ 분산 추적 (Jaeger, Zipkin)
    • 각 요청에 trace_id 부여
    • 서비스 간 호출 추적
    • 병목 지점 시각화
  • ✅ 알림 설정 (PagerDuty, Slack)
    • 에러율 5% 초과: WARNING
    • 서비스 다운: CRITICAL
    • 응답 시간 2초 초과: INFO

다음 단계

학습 경로
  1. 보안 강화: 프로덕션 서버에 인증 및 권한 관리 추가
  2. 성능 테스트: 부하 테스트로 병목 지점 식별 및 최적화
  3. 모니터링 구축: Prometheus + Grafana 대시보드 설정
  4. CI/CD 구축: 자동화된 테스트 및 배포 파이프라인
  5. 스케일링: 수평 확장 전략 수립

추가 리소스

보안 아키텍처 전체 다이어그램

MCP 서버의 보안은 단일 계층이 아니라 여러 계층이 순차적으로 동작하는 심층 방어(Defense in Depth) 전략으로 구현해야 합니다. 아래 다이어그램은 클라이언트 요청이 서버 로직에 도달하기까지 거치는 보안 검사 포인트를 보여줍니다.

MCP 보안 아키텍처 - 요청 처리 흐름 클라이언트 (AI Agent) TLS 암호화 mTLS / HTTPS 인증서 검증 인증 레이어 OAuth 2.0 / JWT 토큰 검증 권한 검사 RBAC / ABAC 도구별 ACL Rate Limiter 토큰 버킷 사용자별 제한 입력 검증 및 샌드박싱 경로 검증 / SQL 파싱 / 크기 제한 MCP 서버 로직 도구 실행 / 리소스 접근 감사 로그 모든 요청/응답 기록 / 알림 보안 검사 포인트 요약 1. 전송 계층 TLS 1.3 강제 인증서 피닝 2. 신원 확인 JWT 서명 검증 토큰 만료 확인 3. 접근 제어 역할 기반 권한 리소스별 ACL 4. 트래픽 제어 요청 빈도 제한 악용 탐지 5. 데이터 보호 입력 정제 출력 필터링 모든 계층에서 실패 시 즉시 요청 거부 및 감사 로그 기록

각 계층은 독립적으로 동작하며, 하나의 계층이 우회되더라도 다음 계층에서 차단할 수 있습니다. 이 심층 방어 전략은 프로덕션 MCP 서버의 필수 요소입니다.

OAuth 2.0 / JWT 인증

프로덕션 MCP 서버에서는 단순 API 키보다 OAuth 2.0JWT(JSON Web Token)를 활용한 인증이 더 안전하고 유연합니다. JWT는 토큰 자체에 클레임 정보를 포함하므로 DB 조회 없이 검증할 수 있으며, OAuth 2.0 프레임워크와 결합하면 토큰 발급, 갱신, 폐기를 체계적으로 관리할 수 있습니다.

JWT 토큰 검증 미들웨어

Python jwt_auth.py
import jwt
import time
from typing import Optional, Dict, Any
from dataclasses import dataclass
from functools import wraps

@dataclass
class TokenPayload:
    """JWT 토큰 페이로드"""
    sub: str           # 사용자 ID
    roles: list        # 역할 목록
    permissions: list  # 권한 목록
    exp: int           # 만료 시간 (Unix timestamp)
    iat: int           # 발급 시간
    jti: str           # 토큰 고유 ID (재사용 방지)

class JWTAuthenticator:
    """MCP 서버용 JWT 인증 미들웨어"""

    def __init__(self, secret_key: str, algorithm: str = "HS256"):
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.revoked_tokens: set = set()  # 폐기된 토큰 JTI

    def create_token(self, user_id: str, roles: list,
                     permissions: list, ttl: int = 3600) -> str:
        """JWT 액세스 토큰 생성"""
        import uuid
        now = int(time.time())
        payload = {
            "sub": user_id,
            "roles": roles,
            "permissions": permissions,
            "exp": now + ttl,
            "iat": now,
            "jti": str(uuid.uuid4()),
        }
        return jwt.encode(payload, self.secret_key,
                         algorithm=self.algorithm)

    def create_refresh_token(self, user_id: str,
                              ttl: int = 86400 * 7) -> str:
        """리프레시 토큰 생성 (7일 유효)"""
        import uuid
        now = int(time.time())
        payload = {
            "sub": user_id,
            "type": "refresh",
            "exp": now + ttl,
            "iat": now,
            "jti": str(uuid.uuid4()),
        }
        return jwt.encode(payload, self.secret_key,
                         algorithm=self.algorithm)

    def verify_token(self, token: str) -> Optional[TokenPayload]:
        """토큰 검증 및 디코딩"""
        try:
            payload = jwt.decode(
                token, self.secret_key,
                algorithms=[self.algorithm]
            )

            # 폐기된 토큰 확인
            if payload.get("jti") in self.revoked_tokens:
                return None

            return TokenPayload(
                sub=payload["sub"],
                roles=payload.get("roles", []),
                permissions=payload.get("permissions", []),
                exp=payload["exp"],
                iat=payload["iat"],
                jti=payload["jti"],
            )
        except jwt.ExpiredSignatureError:
            raise PermissionError("토큰이 만료되었습니다")
        except jwt.InvalidTokenError:
            raise PermissionError("유효하지 않은 토큰입니다")

    def refresh_access_token(self, refresh_token: str,
                              roles: list, permissions: list) -> str:
        """리프레시 토큰으로 새 액세스 토큰 발급"""
        try:
            payload = jwt.decode(
                refresh_token, self.secret_key,
                algorithms=[self.algorithm]
            )
            if payload.get("type") != "refresh":
                raise ValueError("리프레시 토큰이 아닙니다")

            # 새 액세스 토큰 발급
            return self.create_token(
                payload["sub"], roles, permissions
            )
        except jwt.ExpiredSignatureError:
            raise PermissionError("리프레시 토큰이 만료되었습니다. 재로그인 필요")

    def revoke_token(self, jti: str):
        """토큰 폐기 (로그아웃 시)"""
        self.revoked_tokens.add(jti)

# MCP 서버에 JWT 인증 통합
import os
auth = JWTAuthenticator(os.getenv("JWT_SECRET_KEY"))

def require_permission(permission: str):
    """권한 확인 데코레이터"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            token = kwargs.get("_token")
            if not token:
                raise PermissionError("인증 토큰이 필요합니다")

            payload = auth.verify_token(token)
            if not payload:
                raise PermissionError("유효하지 않은 토큰")

            if permission not in payload.permissions:
                raise PermissionError(
                    f"권한 부족: {permission} 필요"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator

# 사용 예제
@server.tool()
@require_permission("file:write")
async def write_file(path: str, content: str, **kwargs):
    """파일 쓰기 (file:write 권한 필요)"""
    # 인증된 사용자만 실행 가능...
    pass
토큰 갱신 흐름
  1. 클라이언트가 액세스 토큰(1시간 유효)과 리프레시 토큰(7일 유효)을 발급받음
  2. API 요청 시 액세스 토큰을 포함
  3. 액세스 토큰 만료 시 리프레시 토큰으로 새 액세스 토큰 발급
  4. 리프레시 토큰 만료 시 재인증(로그인) 필요
  5. 로그아웃 시 revoke_token()으로 토큰 폐기

미들웨어 패턴

미들웨어 패턴을 사용하면 로깅, 인증, Rate Limiting, 에러 처리 등의 횡단 관심사(Cross-Cutting Concerns)를 서버 로직과 분리하여 관리할 수 있습니다. 각 미들웨어는 요청을 처리하고 다음 미들웨어로 넘기는 체인 구조로 동작합니다.

미들웨어 체인 구현

Python middleware.py
import time
import logging
import traceback
from typing import Callable, Any, List
from dataclasses import dataclass, field
from abc import ABC, abstractmethod

logger = logging.getLogger("mcp.middleware")

@dataclass
class MCPRequest:
    """MCP 요청 컨텍스트"""
    tool_name: str
    arguments: dict
    user_id: str = ""
    token: str = ""
    metadata: dict = field(default_factory=dict)
    start_time: float = field(default_factory=time.time)

class Middleware(ABC):
    """미들웨어 기본 클래스"""
    @abstractmethod
    async def process(self, request: MCPRequest,
                      next_handler: Callable) -> Any:
        pass

# --- 1. 로깅 미들웨어 ---
class LoggingMiddleware(Middleware):
    """요청/응답 로깅"""
    async def process(self, request: MCPRequest,
                      next_handler: Callable) -> Any:
        logger.info(
            f"[REQ] tool={request.tool_name} "
            f"user={request.user_id}"
        )
        try:
            result = await next_handler(request)
            elapsed = time.time() - request.start_time
            logger.info(
                f"[RES] tool={request.tool_name} "
                f"duration={elapsed:.3f}s status=ok"
            )
            return result
        except Exception as e:
            elapsed = time.time() - request.start_time
            logger.error(
                f"[ERR] tool={request.tool_name} "
                f"duration={elapsed:.3f}s error={e}"
            )
            raise

# --- 2. 인증 미들웨어 ---
class AuthMiddleware(Middleware):
    """JWT 인증 검사"""
    def __init__(self, authenticator):
        self.auth = authenticator

    async def process(self, request: MCPRequest,
                      next_handler: Callable) -> Any:
        if not request.token:
            raise PermissionError("인증 토큰이 필요합니다")

        payload = self.auth.verify_token(request.token)
        if not payload:
            raise PermissionError("유효하지 않은 토큰")

        request.user_id = payload.sub
        request.metadata["permissions"] = payload.permissions
        return await next_handler(request)

# --- 3. Rate Limiting 미들웨어 ---
class RateLimitMiddleware(Middleware):
    """사용자별 요청 제한"""
    def __init__(self, rate_limiter):
        self.limiter = rate_limiter

    async def process(self, request: MCPRequest,
                      next_handler: Callable) -> Any:
        if not self.limiter.allow_request(request.user_id):
            retry = self.limiter.get_retry_after(request.user_id)
            raise Exception(
                f"요청 제한 초과. {retry:.1f}초 후 재시도"
            )
        return await next_handler(request)

# --- 4. 에러 처리 미들웨어 ---
class ErrorHandlerMiddleware(Middleware):
    """전역 에러 처리 및 안전한 응답"""
    async def process(self, request: MCPRequest,
                      next_handler: Callable) -> Any:
        try:
            return await next_handler(request)
        except PermissionError as e:
            return {"error": str(e), "code": "AUTH_ERROR"}
        except ValueError as e:
            return {"error": str(e), "code": "VALIDATION_ERROR"}
        except Exception as e:
            # 내부 오류는 상세 정보 숨김
            logger.error(traceback.format_exc())
            return {"error": "내부 서버 오류",
                    "code": "INTERNAL_ERROR"}

미들웨어 등록 및 실행

Python middleware_runner.py
class MiddlewareRunner:
    """미들웨어 체인 관리 및 실행"""

    def __init__(self):
        self.middlewares: List[Middleware] = []
        self.tool_handlers: dict = {}

    def use(self, middleware: Middleware):
        """미들웨어 등록 (순서 중요!)"""
        self.middlewares.append(middleware)
        return self  # 체이닝 지원

    def register_tool(self, name: str, handler: Callable):
        """도구 핸들러 등록"""
        self.tool_handlers[name] = handler

    async def execute(self, request: MCPRequest) -> Any:
        """미들웨어 체인 실행"""
        # 실제 도구 핸들러를 최종 호출로 설정
        handler = self.tool_handlers.get(request.tool_name)
        if not handler:
            raise ValueError(
                f"알 수 없는 도구: {request.tool_name}"
            )

        # 미들웨어 체인 구성 (역순으로 감싸기)
        async def final_handler(req):
            return await handler(req.arguments)

        chain = final_handler
        for mw in reversed(self.middlewares):
            # 클로저로 현재 미들웨어와 다음 핸들러 캡처
            chain = self._wrap(mw, chain)

        return await chain(request)

    def _wrap(self, mw: Middleware, next_fn: Callable):
        async def wrapped(req: MCPRequest):
            return await mw.process(req, next_fn)
        return wrapped

# === 서버 조립 ===
runner = MiddlewareRunner()

# 미들웨어 등록 (실행 순서: 위에서 아래로)
runner.use(ErrorHandlerMiddleware())    # 1. 에러 처리 (가장 바깥)
runner.use(LoggingMiddleware())          # 2. 로깅
runner.use(AuthMiddleware(auth))          # 3. 인증
runner.use(RateLimitMiddleware(limiter))  # 4. Rate Limiting

# 도구 핸들러 등록
runner.register_tool("read_file", read_file_handler)
runner.register_tool("write_file", write_file_handler)

# 요청 처리
request = MCPRequest(
    tool_name="read_file",
    arguments={"path": "/data/config.json"},
    token="eyJhbGciOiJIUzI1NiIs..."
)
result = await runner.execute(request)
미들웨어 실행 순서

미들웨어는 양파 모델(Onion Model)로 동작합니다. 요청은 바깥 → 안쪽으로, 응답은 안쪽 → 바깥으로 흐릅니다.

# 요청 흐름 (바깥 → 안쪽)
ErrorHandler → Logging → Auth → RateLimit → 도구 실행

# 응답 흐름 (안쪽 → 바깥)
도구 실행 → RateLimit → Auth → Logging → ErrorHandler

# 에러 발생 시 (Auth에서 실패한 경우)
ErrorHandler → Logging → Auth ✗ (PermissionError)
                         ↑ 여기서 중단, 이후 미들웨어 실행 안 됨

수평 확장 아키텍처

단일 MCP 서버 인스턴스로는 트래픽 증가에 대응하기 어렵습니다. 수평 확장(Horizontal Scaling)은 동일한 서버를 여러 개 실행하여 부하를 분산하는 전략입니다. 이 때 상태 관리가 핵심 과제가 됩니다.

수평 확장 아키텍처 Client A Client B Client C Load Balancer Nginx / HAProxy 라운드 로빈 / 최소 연결 MCP Server #1 Stateless MCP Server #2 Stateless MCP Server #N Stateless 공유 상태 Redis (세션/캐시) PostgreSQL (영속 데이터) S3 (파일 저장소) Pub/Sub (이벤트) Stateless 전략 (권장) + 인스턴스 추가/제거 자유로움 + 요청 라우팅에 제약 없음 + 장애 발생 시 자동 복구 - 외부 저장소(Redis) 의존성 Sticky Session 전략 + 외부 저장소 불필요 + 낮은 지연 시간 - 불균형 부하 가능 - 인스턴스 장애 시 세션 유실

Redis 기반 세션 공유

Python shared_state.py
import redis.asyncio as redis
import json
from typing import Optional, Any

class SharedStateManager:
    """Redis 기반 분산 상태 관리"""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(
            redis_url,
            decode_responses=True,
            max_connections=20
        )

    async def get_session(self, session_id: str) -> Optional[dict]:
        """세션 데이터 조회"""
        data = await self.redis.get(f"session:{session_id}")
        return json.loads(data) if data else None

    async def set_session(self, session_id: str,
                          data: dict, ttl: int = 3600):
        """세션 데이터 저장 (기본 1시간 TTL)"""
        await self.redis.setex(
            f"session:{session_id}",
            ttl,
            json.dumps(data)
        )

    async def delete_session(self, session_id: str):
        """세션 삭제"""
        await self.redis.delete(f"session:{session_id}")

    async def distributed_lock(self, key: str,
                               ttl: int = 10) -> bool:
        """분산 락 획득 (동시 접근 제어)"""
        return await self.redis.set(
            f"lock:{key}", "1",
            nx=True, ex=ttl
        )

    async def release_lock(self, key: str):
        """분산 락 해제"""
        await self.redis.delete(f"lock:{key}")

    async def publish_event(self, channel: str, event: dict):
        """이벤트 발행 (서버 간 통신)"""
        await self.redis.publish(channel, json.dumps(event))

# MCP 서버에서 사용
state = SharedStateManager("redis://redis-cluster:6379")

@server.tool()
async def process_with_lock(resource_id: str, data: dict):
    """분산 락을 사용한 안전한 리소스 처리"""
    lock_key = f"resource:{resource_id}"

    if not await state.distributed_lock(lock_key):
        raise Exception("리소스가 다른 인스턴스에서 처리 중")

    try:
        # 리소스 처리 로직...
        result = await process_resource(resource_id, data)
        return result
    finally:
        await state.release_lock(lock_key)

Circuit Breaker 구현

Circuit Breaker 패턴은 외부 서비스 호출 시 연속적인 실패가 발생하면 일정 시간 동안 호출을 차단하여 장애 전파를 방지합니다. 이는 MCP 서버가 DB, API 등 외부 의존성에 문제가 발생했을 때 전체 시스템이 다운되는 것을 막는 핵심 패턴입니다.

상태 전이 다이어그램

Circuit Breaker 상태 전이 CLOSED (정상 동작) 요청 통과 OPEN (차단 상태) 즉시 실패 반환 HALF_OPEN (테스트 상태) 제한적 요청 허용 실패 임계값 초과 (failures >= threshold) 타임아웃 경과 (recovery_timeout) 테스트 요청 성공 테스트 요청 실패 성공

전체 구현

Python circuit_breaker.py
import time
import asyncio
import logging
from enum import Enum
from typing import Callable, Any, Optional
from functools import wraps

logger = logging.getLogger("circuit_breaker")

class CircuitState(Enum):
    CLOSED = "closed"        # 정상 - 요청 통과
    OPEN = "open"            # 차단 - 즉시 실패
    HALF_OPEN = "half_open"  # 테스트 - 제한적 허용

class CircuitBreaker:
    """Circuit Breaker 패턴 구현"""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        half_open_max_calls: int = 3,
        expected_exceptions: tuple = (Exception,),
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_max_calls = half_open_max_calls
        self.expected_exceptions = expected_exceptions

        # 내부 상태
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[float] = None
        self.half_open_calls = 0
        self._lock = asyncio.Lock()

    async def _transition_to(self, new_state: CircuitState):
        """상태 전이"""
        old_state = self.state
        self.state = new_state
        logger.warning(
            f"Circuit Breaker: {old_state.value} → "
            f"{new_state.value}"
        )
        if new_state == CircuitState.CLOSED:
            self.failure_count = 0
            self.success_count = 0
        elif new_state == CircuitState.HALF_OPEN:
            self.half_open_calls = 0

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Circuit Breaker를 통한 함수 호출"""
        async with self._lock:
            await self._check_state()

        try:
            result = await func(*args, **kwargs)
            await self._on_success()
            return result
        except self.expected_exceptions as e:
            await self._on_failure()
            raise

    async def _check_state(self):
        """현재 상태에 따라 요청 허용 여부 결정"""
        if self.state == CircuitState.OPEN:
            # 복구 타임아웃 경과 확인
            if (self.last_failure_time and
                time.time() - self.last_failure_time
                    >= self.recovery_timeout):
                await self._transition_to(CircuitState.HALF_OPEN)
            else:
                raise Exception(
                    "Circuit breaker OPEN: 서비스 일시 중단"
                )
        elif self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise Exception(
                    "Circuit breaker HALF_OPEN: 테스트 한도 초과"
                )
            self.half_open_calls += 1

    async def _on_success(self):
        """성공 시 처리"""
        async with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.half_open_max_calls:
                    await self._transition_to(CircuitState.CLOSED)
            elif self.state == CircuitState.CLOSED:
                self.failure_count = 0  # 성공 시 카운터 리셋

    async def _on_failure(self):
        """실패 시 처리"""
        async with self._lock:
            self.last_failure_time = time.time()

            if self.state == CircuitState.HALF_OPEN:
                await self._transition_to(CircuitState.OPEN)
            elif self.state == CircuitState.CLOSED:
                self.failure_count += 1
                if self.failure_count >= self.failure_threshold:
                    await self._transition_to(CircuitState.OPEN)

# === 데코레이터 버전 ===
def circuit_protected(breaker: CircuitBreaker):
    """Circuit Breaker 데코레이터"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await breaker.call(func, *args, **kwargs)
        return wrapper
    return decorator

# 사용 예제
db_breaker = CircuitBreaker(
    failure_threshold=5,
    recovery_timeout=30.0,
    half_open_max_calls=3
)

@circuit_protected(db_breaker)
async def query_database(sql: str):
    """DB 조회 (Circuit Breaker 보호)"""
    return await db_pool.fetch(sql)

Graceful Shutdown 구현

프로덕션 환경에서 서버를 종료할 때, 진행 중인 요청을 갑자기 중단하면 데이터 손실이나 불일치가 발생할 수 있습니다. Graceful Shutdown은 SIGTERM 신호를 수신하면 새 요청 수락을 중지하고, 진행 중인 요청이 완료될 때까지 대기한 뒤 안전하게 종료하는 패턴입니다.

Python graceful_shutdown.py
import asyncio
import signal
import logging
from typing import Set
from contextlib import asynccontextmanager

logger = logging.getLogger("mcp.shutdown")

class GracefulShutdownManager:
    """Graceful Shutdown 관리자"""

    def __init__(self, shutdown_timeout: float = 30.0):
        self.shutdown_timeout = shutdown_timeout
        self.is_shutting_down = False
        self.active_requests: Set[asyncio.Task] = set()
        self.shutdown_event = asyncio.Event()
        self._cleanup_callbacks = []

    def register_cleanup(self, callback):
        """종료 시 실행할 정리 콜백 등록"""
        self._cleanup_callbacks.append(callback)

    def setup_signal_handlers(self, loop: asyncio.AbstractEventLoop):
        """시그널 핸들러 등록"""
        for sig in (signal.SIGTERM, signal.SIGINT):
            loop.add_signal_handler(
                sig,
                lambda s=sig: asyncio.create_task(
                    self.initiate_shutdown(s)
                )
            )
        logger.info("시그널 핸들러 등록 완료 (SIGTERM, SIGINT)")

    async def initiate_shutdown(self, sig: signal.Signals):
        """종료 프로세스 시작"""
        logger.warning(
            f"시그널 수신: {sig.name}. "
            f"Graceful shutdown 시작..."
        )
        self.is_shutting_down = True

        # 1단계: 진행 중인 요청 대기
        if self.active_requests:
            logger.info(
                f"진행 중인 요청 {len(self.active_requests)}개 "
                f"완료 대기 중..."
            )
            try:
                await asyncio.wait_for(
                    self._wait_for_requests(),
                    timeout=self.shutdown_timeout
                )
                logger.info("모든 요청 완료")
            except asyncio.TimeoutError:
                remaining = len(self.active_requests)
                logger.warning(
                    f"타임아웃! {remaining}개 요청 강제 취소"
                )
                for task in self.active_requests:
                    task.cancel()
                await asyncio.gather(
                    *self.active_requests,
                    return_exceptions=True
                )

        # 2단계: 정리 콜백 실행
        logger.info("리소스 정리 시작...")
        for callback in self._cleanup_callbacks:
            try:
                await callback()
            except Exception as e:
                logger.error(f"정리 콜백 오류: {e}")

        # 3단계: 종료 신호 발행
        self.shutdown_event.set()
        logger.info("Graceful shutdown 완료")

    async def _wait_for_requests(self):
        """모든 활성 요청 완료 대기"""
        while self.active_requests:
            await asyncio.sleep(0.1)

    @asynccontextmanager
    async def track_request(self):
        """요청 추적 컨텍스트 매니저"""
        if self.is_shutting_down:
            raise Exception(
                "서버가 종료 중입니다. 새 요청을 받지 않습니다."
            )

        task = asyncio.current_task()
        self.active_requests.add(task)
        try:
            yield
        finally:
            self.active_requests.discard(task)

# === 서버에 통합 ===
shutdown_mgr = GracefulShutdownManager(shutdown_timeout=30.0)

# 정리 콜백 등록
shutdown_mgr.register_cleanup(db_pool.close)
shutdown_mgr.register_cleanup(state.redis.close)

# 도구 호출에서 요청 추적
@server.call_tool()
async def call_tool(name: str, arguments: dict):
    async with shutdown_mgr.track_request():
        return await execute_tool(name, arguments)

# 서버 시작
async def main():
    loop = asyncio.get_event_loop()
    shutdown_mgr.setup_signal_handlers(loop)
    await run_server()
    await shutdown_mgr.shutdown_event.wait()
Graceful Shutdown 체크포인트
  • 새 요청 거부: is_shutting_down 플래그로 새 요청을 즉시 거부합니다.
  • 진행 중 요청 추적: active_requests 셋으로 현재 처리 중인 요청을 추적합니다.
  • 타임아웃 후 강제 종료: 기본 30초 대기 후 응답하지 않는 요청은 task.cancel()로 강제 취소합니다.
  • 리소스 정리: DB 연결, Redis 연결, 파일 핸들 등을 순서대로 닫습니다.

관찰성 스택

프로덕션 MCP 서버의 안정적인 운영을 위해서는 메트릭(Metrics), 추적(Tracing), 알림(Alerting)을 통합한 관찰성(Observability) 스택이 필수입니다. 아래 다이어그램은 MCP 서버에 최적화된 관찰성 아키텍처를 보여줍니다.

MCP 서버 관찰성 스택 MCP Server /metrics (Prometheus) 구조화 로그 (JSON) OpenTelemetry SDK 트레이스 전송 Health: /health Ready: /health/ready Live: /health/live Prometheus 메트릭 수집/저장 PromQL 쿼리 scrape Jaeger 분산 추적 서비스 맵 traces Loki 로그 수집/인덱싱 LogQL 쿼리 logs Grafana 대시보드 시각화 통합 검색/분석 알림 규칙 관리 AlertManager 알림 라우팅/그룹핑 억제/사일런스 alerts Slack PagerDuty Email Prometheus(메트릭) + Jaeger(추적) + Loki(로그) → Grafana(시각화) + AlertManager(알림) 3가지 관찰성 축(Metrics, Traces, Logs)을 통합하여 MCP 서버의 상태를 완전히 파악

Grafana 대시보드 주요 패널

MCP 서버 대시보드에 포함해야 할 핵심 패널
  • 요청 처리량 (RPS): rate(mcp_tool_calls_total[5m]) - 도구별 초당 요청 수 추이 그래프. 트래픽 패턴을 파악하고 이상 급증을 탐지합니다.
  • 응답 시간 분포: histogram_quantile(0.95, mcp_tool_duration_seconds_bucket) - P50, P95, P99 지연 시간. 95번째 백분위가 SLA 기준을 넘지 않는지 모니터링합니다.
  • 에러율: rate(mcp_tool_calls_total{status="error"}[5m]) / rate(mcp_tool_calls_total[5m]) - 전체 요청 대비 에러 비율. 5% 초과 시 경고 알림을 설정합니다.
  • 활성 연결 수: mcp_active_connections - 실시간 연결 수 게이지. 연결 풀 고갈 전에 스케일링 필요 여부를 판단합니다.
  • Circuit Breaker 상태: 각 외부 서비스별 Circuit Breaker 상태(CLOSED/OPEN/HALF_OPEN)를 상태 타임라인으로 표시합니다.
  • 리소스 사용량: CPU, 메모리, 디스크 I/O, 네트워크 대역폭 등 인프라 메트릭을 함께 표시하여 상관관계를 분석합니다.
알림 규칙 설정 예시
  • CRITICAL (즉시 호출): 서비스 다운 5분 이상, 에러율 20% 초과, Circuit Breaker 전체 OPEN
  • WARNING (Slack 알림): 에러율 5% 초과, P95 응답 시간 2초 초과, 연결 풀 사용률 80% 초과
  • INFO (대시보드 표시): 배포 이벤트, 스케일링 이벤트, 설정 변경 기록

핵심 정리

  • MCP 고급 주제의 핵심 개념과 흐름을 정리합니다.
  • 보안를 단계별로 이해합니다.
  • 실전 적용 시 기준과 주의점을 확인합니다.

실무 팁

  • 입력/출력 예시를 고정해 재현성을 확보하세요.
  • MCP 고급 주제 범위를 작게 잡고 단계적으로 확장하세요.
  • 보안 조건을 문서화해 대응 시간을 줄이세요.