MCP 고급 주제

프로덕션 환경에서 MCP 서버를 안전하고 효율적으로 운영하기 위한 고급 주제를 다룹니다. 보안, 성능 최적화, 디버깅, 테스트, 배포 전략 등을 학습하세요.

업데이트 안내: 모델/요금/버전/정책 등 시점에 민감한 정보는 변동될 수 있습니다. 최신 내용은 공식 문서를 확인하세요.
이 페이지에서 배울 내용
  • 보안: 샌드박싱, 권한 관리, 인증
  • 성능 최적화: 캐싱, 연결 풀링, 병렬 처리
  • 멀티 서버 아키텍처 및 조정
  • 효과적인 디버깅 및 로깅 전략
  • 테스트 자동화
  • 프로덕션 배포 및 모니터링

보안

샌드박싱

MCP 서버는 외부 데이터에 접근하므로 적절한 샌드박싱이 필수입니다.

파일 시스템 샌드박싱

Python secure_filesystem.py
from pathlib import Path
from typing import List

class SecureFileSystem:
    """보안이 강화된 파일 시스템 접근"""

    def __init__(self, allowed_dirs: List[Path]):
        self.allowed_dirs = [d.resolve() for d in allowed_dirs]

    def validate_path(self, path: Path) -> Path:
        """경로 검증 및 정규화"""
        # 1. 경로 정규화 (심볼릭 링크 해결)
        resolved = path.resolve()

        # 2. 허용된 디렉토리 내에 있는지 확인
        if not any(resolved.is_relative_to(d) for d in self.allowed_dirs):
            raise PermissionError(f"Access denied: {path}")

        # 3. 숨김 파일/디렉토리 차단 (선택사항)
        for part in resolved.parts:
            if part.startswith(".") and part not in {".", ".."}:
                raise PermissionError(f"Hidden files not allowed: {path}")

        return resolved

    def read_file(self, path: str) -> str:
        """안전한 파일 읽기"""
        validated_path = self.validate_path(Path(path))

        # 파일 크기 제한
        max_size = 10 * 1024 * 1024  # 10MB
        size = validated_path.stat().st_size
        if size > max_size:
            raise ValueError(f"File too large: {size} bytes")

        return validated_path.read_text(encoding="utf-8")

    def write_file(self, path: str, content: str):
        """안전한 파일 쓰기"""
        validated_path = self.validate_path(Path(path))

        # 콘텐츠 크기 제한
        max_size = 5 * 1024 * 1024  # 5MB
        if len(content) > max_size:
            raise ValueError(f"Content too large: {len(content)} bytes")

        # 임시 파일에 먼저 쓰고 원자적으로 이동
        tmp_path = validated_path.with_suffix(".tmp")
        try:
            tmp_path.write_text(content, encoding="utf-8")
            tmp_path.replace(validated_path)
        finally:
            if tmp_path.exists():
                tmp_path.unlink()

SQL 인젝션 방지

Python secure_sql.py
import sqlparse
from sqlparse.sql import Token, TokenList
from typing import List

class SecureSQL:
    """SQL 쿼리 검증 및 실행"""

    ALLOWED_STATEMENTS = {"SELECT"}  # 읽기 전용
    FORBIDDEN_KEYWORDS = {
        "DROP", "DELETE", "UPDATE", "INSERT",
        "ALTER", "CREATE", "TRUNCATE", "EXEC"
    }

    def validate_query(self, sql: str) -> bool:
        """쿼리 안전성 검증"""
        # 1. 파싱
        parsed = sqlparse.parse(sql)
        if not parsed:
            raise ValueError("Invalid SQL syntax")

        for statement in parsed:
            # 2. 문장 타입 확인
            stmt_type = statement.get_type()
            if stmt_type not in self.ALLOWED_STATEMENTS:
                raise ValueError(f"Statement type not allowed: {stmt_type}")

            # 3. 금지된 키워드 확인
            tokens = [t for t in statement.flatten() if t.ttype is Token.Keyword]
            for token in tokens:
                if token.value.upper() in self.FORBIDDEN_KEYWORDS:
                    raise ValueError(f"Forbidden keyword: {token.value}")

        return True

    async def execute_safe(self, conn, sql: str, params: List = None):
        """안전한 쿼리 실행"""
        self.validate_query(sql)

        # 매개변수화된 쿼리만 허용
        if params is None:
            params = []

        # 타임아웃 설정
        async with asyncio.timeout(30):  # 30초
            result = await conn.fetch(sql, *params)

        return result

인증 및 권한 관리

API 키 인증

Python auth_server.py
import os
import hashlib
import secrets
from typing import Dict, Optional

class APIKeyAuth:
    """API 키 기반 인증"""

    def __init__(self):
        # 환경 변수에서 마스터 키 로드
        self.master_key = os.getenv("MCP_MASTER_KEY")
        if not self.master_key:
            raise ValueError("MCP_MASTER_KEY not set")

        # API 키 해시 저장 (실제로는 DB 사용)
        self.api_keys: Dict[str, dict] = {}

    def generate_api_key(self, user_id: str, permissions: list) -> str:
        """새 API 키 생성"""
        # 안전한 랜덤 키 생성
        api_key = secrets.token_urlsafe(32)

        # 해시 저장 (원본은 사용자에게만 표시)
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()

        self.api_keys[key_hash] = {
            "user_id": user_id,
            "permissions": permissions,
            "created_at": datetime.now()
        }

        return api_key

    def validate_api_key(self, api_key: str) -> Optional[dict]:
        """API 키 검증"""
        key_hash = hashlib.sha256(api_key.encode()).hexdigest()
        return self.api_keys.get(key_hash)

    def check_permission(self, api_key: str, required_permission: str) -> bool:
        """권한 확인"""
        key_info = self.validate_api_key(api_key)
        if not key_info:
            return False

        return required_permission in key_info["permissions"]

# 서버에 통합
auth = APIKeyAuth()

@server.call_tool()
async def call_tool(name: str, arguments: dict, meta: dict):
    # 메타데이터에서 API 키 추출
    api_key = meta.get("api_key")
    if not api_key:
        raise PermissionError("API key required")

    # 도구별 권한 확인
    required_permission = f"tool:{name}"
    if not auth.check_permission(api_key, required_permission):
        raise PermissionError(f"Permission denied: {required_permission}")

    # 도구 실행...

Rate Limiting

Python rate_limiter.py
import time
from collections import defaultdict
from typing import Dict, Tuple

class RateLimiter:
    """토큰 버킷 알고리즘 기반 Rate Limiter"""

    def __init__(self, requests_per_minute: int = 60):
        self.capacity = requests_per_minute
        self.refill_rate = requests_per_minute / 60.0  # 초당 토큰 수

        # 사용자별 버킷: {user_id: (tokens, last_update)}
        self.buckets: Dict[str, Tuple[float, float]] = defaultdict(
            lambda: (self.capacity, time.time())
        )

    def _refill(self, user_id: str) -> float:
        """버킷 리필"""
        tokens, last_update = self.buckets[user_id]
        now = time.time()
        elapsed = now - last_update

        # 경과 시간에 따라 토큰 추가
        tokens = min(self.capacity, tokens + elapsed * self.refill_rate)

        self.buckets[user_id] = (tokens, now)
        return tokens

    def allow_request(self, user_id: str) -> bool:
        """요청 허용 여부 확인"""
        tokens = self._refill(user_id)

        if tokens >= 1.0:
            self.buckets[user_id] = (tokens - 1.0, time.time())
            return True

        return False

    def get_retry_after(self, user_id: str) -> float:
        """다음 요청 가능 시간 (초)"""
        tokens = self._refill(user_id)
        if tokens >= 1.0:
            return 0.0

        needed = 1.0 - tokens
        return needed / self.refill_rate

# 사용 예제
rate_limiter = RateLimiter(requests_per_minute=100)

@server.call_tool()
async def call_tool(name: str, arguments: dict, user_id: str):
    if not rate_limiter.allow_request(user_id):
        retry_after = rate_limiter.get_retry_after(user_id)
        raise Exception(f"Rate limit exceeded. Retry after {retry_after:.2f}s")

    # 도구 실행...

성능 최적화

캐싱

Python cache_decorator.py
import asyncio
import hashlib
import json
from functools import wraps
from typing import Any, Callable
from datetime import datetime, timedelta

class AsyncCache:
    """비동기 메모리 캐시"""

    def __init__(self):
        self.cache: dict[str, tuple[Any, datetime]] = {}

    def _make_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        """캐시 키 생성"""
        key_data = {
            "func": func_name,
            "args": args,
            "kwargs": kwargs
        }
        key_str = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def get(self, key: str, ttl: int) -> Any:
        """캐시에서 값 가져오기"""
        if key in self.cache:
            value, timestamp = self.cache[key]
            if datetime.now() - timestamp < timedelta(seconds=ttl):
                return value
            else:
                del self.cache[key]
        return None

    def set(self, key: str, value: Any):
        """캐시에 값 저장"""
        self.cache[key] = (value, datetime.now())

    def clear(self):
        """캐시 초기화"""
        self.cache.clear()

# 글로벌 캐시 인스턴스
cache = AsyncCache()

def cached(ttl: int = 300):
    """캐싱 데코레이터 (TTL 기본 5분)"""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 캐시 키 생성
            cache_key = cache._make_key(func.__name__, args, kwargs)

            # 캐시 조회
            cached_value = cache.get(cache_key, ttl)
            if cached_value is not None:
                return cached_value

            # 실제 함수 실행
            result = await func(*args, **kwargs)

            # 결과 캐싱
            cache.set(cache_key, result)

            return result

        return wrapper
    return decorator

# 사용 예제
@server.tool()
@cached(ttl=600)  # 10분
async def fetch_github_issues(repo: str) -> list:
    """GitHub 이슈 조회 (캐싱됨)"""
    # 비용이 큰 API 호출...
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.github.com/repos/{repo}/issues")
        return response.json()

연결 풀링

Python db_pool.py
import asyncpg
from typing import Optional

class DatabasePool:
    """PostgreSQL 연결 풀 관리"""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.pool: Optional[asyncpg.Pool] = None

    async def initialize(self):
        """풀 초기화"""
        self.pool = await asyncpg.create_pool(
            self.dsn,
            min_size=5,           # 최소 연결 수
            max_size=20,          # 최대 연결 수
            max_queries=50000,    # 연결당 최대 쿼리 수 (재생성 전)
            max_inactive_connection_lifetime=300,  # 비활성 연결 수명 (초)
            command_timeout=60,    # 쿼리 타임아웃
        )

    async def close(self):
        """풀 종료"""
        if self.pool:
            await self.pool.close()

    async def execute(self, query: str, *args):
        """쿼리 실행"""
        async with self.pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch(self, query: str, *args):
        """데이터 조회"""
        async with self.pool.acquire() as conn:
            return await conn.fetch(query, *args)

# 서버 초기화 시 풀 생성
db_pool = DatabasePool("postgresql://user:pass@localhost/db")

async def startup():
    await db_pool.initialize()

async def shutdown():
    await db_pool.close()

병렬 처리

Python parallel_tasks.py
import asyncio
from typing import List, Any

async def process_files_parallel(file_paths: List[str]) -> List[Any]:
    """파일들을 병렬로 처리"""

    async def process_single_file(path: str) -> dict:
        """단일 파일 처리"""
        # 비동기 파일 읽기 (aiofiles 사용)
        import aiofiles
        async with aiofiles.open(path, 'r') as f:
            content = await f.read()

        # 처리 로직...
        return {
            "path": path,
            "size": len(content),
            "lines": content.count("\n")
        }

    # asyncio.gather로 병렬 실행
    results = await asyncio.gather(
        *[process_single_file(path) for path in file_paths],
        return_exceptions=True  # 개별 에러를 결과에 포함
    )

    # 에러 필터링
    valid_results = [r for r in results if not isinstance(r, Exception)]

    return valid_results

# Semaphore로 동시 실행 수 제한
async def process_with_limit(items: List[Any], max_concurrent: int = 10):
    """동시 실행 수를 제한하여 처리"""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_item(item):
        async with semaphore:
            # 처리 로직...
            await asyncio.sleep(0.1)  # 예시
            return item

    results = await asyncio.gather(*[process_item(item) for item in items])
    return results

스트리밍 응답

Python streaming.py
async def stream_large_file(path: str, chunk_size: int = 8192):
    """대용량 파일을 청크 단위로 스트리밍"""
    import aiofiles

    async with aiofiles.open(path, 'rb') as f:
        while True:
            chunk = await f.read(chunk_size)
            if not chunk:
                break

            # 청크 전송 (MCP progress notification 사용)
            yield chunk

@server.tool()
async def read_large_file(path: str):
    """대용량 파일 읽기 with progress"""
    total_size = Path(path).stat().st_size
    processed = 0

    chunks = []
    async for chunk in stream_large_file(path):
        chunks.append(chunk)
        processed += len(chunk)

        # Progress notification 전송
        progress = processed / total_size
        await server.send_progress_notification(
            token="read_file",
            progress=progress,
            total=total_size
        )

    return b"".join(chunks)

멀티 서버 아키텍처

서버 간 조정

여러 MCP 서버를 동시에 사용할 때의 조정 전략:

TypeScript multi_server_client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

class MultiServerClient {
  private servers: Map<string, Client> = new Map();
  private toolRegistry: Map<string, string> = new Map();

  async addServer(name: string, command: string, args: string[]) {
    // 클라이언트 생성 및 연결
    const client = new Client({ name: `client-${name}`, version: "1.0.0" }, {
      capabilities: { tools: {} }
    });

    const transport = new StdioClientTransport({ command, args });
    await client.connect(transport);

    this.servers.set(name, client);

    // 도구 목록 조회 및 등록
    const tools = await client.listTools();
    for (const tool of tools.tools) {
      if (this.toolRegistry.has(tool.name)) {
        console.warn(`Tool ${tool.name} already registered, skipping`);
      } else {
        this.toolRegistry.set(tool.name, name);
      }
    }
  }

  async callTool(toolName: string, args: any) {
    // 도구가 등록된 서버 찾기
    const serverName = this.toolRegistry.get(toolName);
    if (!serverName) {
      throw new Error(`Tool ${toolName} not found`);
    }

    const client = this.servers.get(serverName);
    return await client!.callTool(toolName, args);
  }

  async closeAll() {
    for (const client of this.servers.values()) {
      await client.close();
    }
  }
}

// 사용 예제
const multiClient = new MultiServerClient();

await multiClient.addServer("filesystem", "npx", [
  "-y", "@modelcontextprotocol/server-filesystem", "/home/user"
]);

await multiClient.addServer("github", "npx", [
  "-y", "@modelcontextprotocol/server-github"
]);

// 통합된 인터페이스로 도구 호출
await multiClient.callTool("read_file", { path: "/home/user/test.txt" });
await multiClient.callTool("list_issues", { owner: "anthropics", repo: "claude" });

도구 네임스페이싱

여러 서버에서 동일한 이름의 도구를 제공하는 경우 충돌을 방지합니다:

Python namespaced_tools.py
class NamespacedServer:
    """네임스페이스가 적용된 MCP 서버"""

    def __init__(self, namespace: str):
        self.namespace = namespace
        self.server = Server(f"{namespace}-server")

    def tool(self, name: str):
        """네임스페이스가 적용된 도구 등록"""
        namespaced_name = f"{self.namespace}_{name}"

        def decorator(func):
            @self.server.tool(name=namespaced_name)
            async def wrapper(*args, **kwargs):
                return await func(*args, **kwargs)
            return wrapper

        return decorator

# 사용 예제
local_fs = NamespacedServer("local")
remote_fs = NamespacedServer("remote")

@local_fs.tool("read_file")
async def local_read_file(path: str):
    # 로컬 파일 시스템 읽기...
    pass

@remote_fs.tool("read_file")
async def remote_read_file(path: str):
    # 원격 파일 시스템 읽기...
    pass

# 결과: "local_read_file", "remote_read_file" 도구 생성

디버깅

로깅

Python logging_config.py
import logging
import json
from datetime import datetime
from pathlib import Path

class MCPLogger:
    """구조화된 로깅"""

    def __init__(self, name: str, log_dir: Path):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)

        # 파일 핸들러 (JSON 형식)
        log_file = log_dir / f"{name}.log"
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(self._json_formatter())
        self.logger.addHandler(file_handler)

        # 콘솔 핸들러 (사람이 읽기 쉬운 형식)
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        self.logger.addHandler(console_handler)

    def _json_formatter(self):
        class JSONFormatter(logging.Formatter):
            def format(self, record):
                log_data = {
                    "timestamp": datetime.utcnow().isoformat(),
                    "level": record.levelname,
                    "logger": record.name,
                    "message": record.getMessage(),
                    "module": record.module,
                    "function": record.funcName,
                    "line": record.lineno,
                }

                if record.exc_info:
                    log_data["exception"] = self.formatException(record.exc_info)

                return json.dumps(log_data)

        return JSONFormatter()

    def log_tool_call(self, tool_name: str, arguments: dict, result: any = None, error: Exception = None):
        """도구 호출 로깅"""
        log_data = {
            "event": "tool_call",
            "tool": tool_name,
            "arguments": arguments,
        }

        if result is not None:
            log_data["result_size"] = len(str(result))

        if error:
            self.logger.error(f"Tool call failed: {json.dumps(log_data)}", exc_info=error)
        else:
            self.logger.info(f"Tool call succeeded: {json.dumps(log_data)}")

# 사용 예제
logger = MCPLogger("my-server", Path("/var/log/mcp"))

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    try:
        result = await execute_tool(name, arguments)
        logger.log_tool_call(name, arguments, result=result)
        return result
    except Exception as e:
        logger.log_tool_call(name, arguments, error=e)
        raise

분산 추적

Python tracing.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter

# Tracer 설정
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Jaeger 익스포터
jaeger_exporter = JaegerExporter(
    agent_host_name="localhost",
    agent_port=6831,
)

span_processor = BatchSpanProcessor(jaeger_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# 도구 호출 추적
@server.call_tool()
async def call_tool(name: str, arguments: dict):
    with tracer.start_as_current_span(f"tool.{name}") as span:
        # 속성 추가
        span.set_attribute("tool.name", name)
        span.set_attribute("tool.args", str(arguments))

        try:
            result = await execute_tool(name, arguments)
            span.set_attribute("tool.status", "success")
            return result
        except Exception as e:
            span.set_attribute("tool.status", "error")
            span.record_exception(e)
            raise

디버깅 도구

Bash
# MCP Inspector - 대화형 디버깅 도구
npx @modelcontextprotocol/inspector python server.py

# 로그 수준 설정
MCP_LOG_LEVEL=debug python server.py

# 프로토콜 메시지 덤프
MCP_DUMP_PROTOCOL=1 python server.py 2> protocol.log

테스트

단위 테스트

Python test_server.py
import pytest
from mcp.server.stdio import stdio_server
from mcp import types

@pytest.mark.asyncio
async def test_read_file_tool():
    """read_file 도구 테스트"""
    from my_server import server

    # 임시 파일 생성
    import tempfile
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as f:
        f.write("test content")
        temp_path = f.name

    try:
        # 도구 호출
        result = await server.call_tool("read_file", {"path": temp_path})

        # 검증
        assert len(result) == 1
        assert result[0].type == "text"
        assert result[0].text == "test content"

    finally:
        import os
        os.unlink(temp_path)

@pytest.mark.asyncio
async def test_permission_denied():
    """권한 거부 테스트"""
    from my_server import server

    with pytest.raises(PermissionError):
        await server.call_tool("read_file", {"path": "/etc/shadow"})

통합 테스트

Python test_integration.py
import pytest
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

@pytest.mark.asyncio
async def test_full_workflow():
    """전체 워크플로우 테스트"""

    # 서버 시작
    server_params = StdioServerParameters(
        command="python",
        args=["my_server.py"]
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # 초기화
            await session.initialize()

            # 도구 목록 조회
            tools = await session.list_tools()
            assert len(tools.tools) > 0

            # 도구 실행
            result = await session.call_tool("list_directory", {
                "path": "/tmp"
            })

            assert result is not None

부하 테스트

Python load_test.py
import asyncio
import time
from statistics import mean, stdev

async def load_test(num_requests: int = 1000, concurrency: int = 50):
    """부하 테스트"""

    latencies = []
    errors = 0

    async def single_request():
        nonlocal errors
        start = time.time()
        try:
            # 요청 실행...
            result = await session.call_tool("some_tool", {})
            latency = time.time() - start
            latencies.append(latency)
        except Exception:
            errors += 1

    # 동시 요청 실행
    tasks = [single_request() for _ in range(num_requests)]

    # Semaphore로 동시성 제한
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded_task(task):
        async with semaphore:
            await task

    await asyncio.gather(*[bounded_task(t) for t in tasks])

    # 결과 분석
    print(f"Total requests: {num_requests}")
    print(f"Successful: {len(latencies)}")
    print(f"Errors: {errors}")
    print(f"Mean latency: {mean(latencies):.3f}s")
    print(f"Stdev latency: {stdev(latencies):.3f}s")
    print(f"Min/Max latency: {min(latencies):.3f}s / {max(latencies):.3f}s")

프로덕션 배포

Docker 컨테이너화

Dockerfile
FROM python:3.11-slim

WORKDIR /app

# 의존성 설치
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 애플리케이션 복사
COPY . .

# 비-root 사용자로 실행
RUN useradd -m mcpuser
USER mcpuser

# 헬스체크
HEALTHCHECK --interval=30s --timeout=3s \
  CMD python -c "import sys; sys.exit(0)"

CMD ["python", "server.py"]
YAML docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: .
    restart: unless-stopped
    environment:
      - MCP_LOG_LEVEL=info
      - DB_CONNECTION_STRING=${DB_CONNECTION_STRING}
    volumes:
      - ./data:/app/data:ro
      - ./logs:/app/logs
    networks:
      - mcp-network
    deploy:
      resources:
        limits:
          cpus: '2'
          memory: 1G
        reservations:
          cpus: '0.5'
          memory: 256M

networks:
  mcp-network:
    driver: bridge

모니터링

Python metrics.py
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# 메트릭 정의
tool_calls_total = Counter(
    'mcp_tool_calls_total',
    'Total number of tool calls',
    ['tool_name', 'status']
)

tool_duration = Histogram(
    'mcp_tool_duration_seconds',
    'Tool execution duration',
    ['tool_name']
)

active_connections = Gauge(
    'mcp_active_connections',
    'Number of active connections'
)

# Prometheus 서버 시작 (별도 포트)
start_http_server(9090)

@server.call_tool()
async def call_tool(name: str, arguments: dict):
    with tool_duration.labels(tool_name=name).time():
        try:
            result = await execute_tool(name, arguments)
            tool_calls_total.labels(tool_name=name, status='success').inc()
            return result
        except Exception:
            tool_calls_total.labels(tool_name=name, status='error').inc()
            raise

CI/CD 파이프라인

YAML .github/workflows/deploy.yml
name: Deploy MCP Server

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - run: pytest tests/

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: docker/build-push-action@v4
        with:
          push: true
          tags: myregistry/mcp-server:latest

  deploy:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - run: |
          ssh deploy@server 'docker pull myregistry/mcp-server:latest'
          ssh deploy@server 'docker-compose up -d'

베스트 프랙티스

보안
  • 최소 권한 원칙 적용
  • 입력 검증 철저히
  • API 키 환경 변수로 관리
  • Rate limiting 구현
  • 샌드박싱으로 격리
성능
  • 적절한 캐싱 전략
  • 연결 풀링 사용
  • 비동기 I/O 활용
  • 타임아웃 설정
  • 리소스 제한 (CPU, 메모리)
신뢰성
  • 포괄적인 에러 처리
  • 재시도 로직 (exponential backoff)
  • Circuit breaker 패턴
  • 헬스체크 엔드포인트
  • Graceful shutdown
관찰성
  • 구조화된 로깅
  • 메트릭 수집 (Prometheus)
  • 분산 추적 (Jaeger, Zipkin)
  • 알림 설정 (PagerDuty, Slack)

다음 단계

학습 경로
  1. 보안 강화: 프로덕션 서버에 인증 및 권한 관리 추가
  2. 성능 테스트: 부하 테스트로 병목 지점 식별 및 최적화
  3. 모니터링 구축: Prometheus + Grafana 대시보드 설정
  4. CI/CD 구축: 자동화된 테스트 및 배포 파이프라인
  5. 스케일링: 수평 확장 전략 수립

추가 리소스

핵심 정리

  • MCP 고급 주제의 핵심 개념과 흐름을 정리합니다.
  • 보안를 단계별로 이해합니다.
  • 실전 적용 시 기준과 주의점을 확인합니다.

실무 팁

  • 입력/출력 예시를 고정해 재현성을 확보하세요.
  • MCP 고급 주제 범위를 작게 잡고 단계적으로 확장하세요.
  • 보안 조건을 문서화해 대응 시간을 줄이세요.