MCP 실전 예제

실무에서 바로 활용할 수 있는 완전한 MCP 서버 구현 예제를 제공합니다. 파일 시스템, 데이터베이스, API 통합, RAG 시스템 등 다양한 사용 사례를 Python과 TypeScript로 구현하는 방법을 학습하세요.

업데이트 안내: 모델/요금/버전/정책 등 시점에 민감한 정보는 변동될 수 있습니다. 최신 내용은 공식 문서를 확인하세요.
이 페이지에서 배울 내용
  • 파일 시스템 서버 완전 구현 (Python/TypeScript)
  • 데이터베이스 서버 (SQLite, PostgreSQL)
  • GitHub API 통합 서버
  • Slack 봇 MCP 서버
  • RAG 시스템 구현
  • 각 예제마다 서버 + 클라이언트 코드 제공

파일 시스템 서버

Python 구현

파일 시스템 MCP 서버는 가장 기본적이면서도 실용적인 예제입니다. 파일 읽기/쓰기, 디렉토리 탐색, 검색 등의 기능을 제공합니다.

Python filesystem_server.py
import asyncio
import os
import json
from pathlib import Path
from typing import Optional, List

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
)

# 서버 인스턴스 생성
server = Server("filesystem-server")

# 허용된 디렉토리 (보안을 위해)
ALLOWED_DIRECTORIES = [
    Path.home() / "Documents",
    Path.home() / "Projects",
]

def is_path_allowed(path: Path) -> bool:
    """경로가 허용된 디렉토리 내에 있는지 확인"""
    path = path.resolve()
    return any(
        path.is_relative_to(allowed_dir)
        for allowed_dir in ALLOWED_DIRECTORIES
    )

@server.list_tools()
async def list_tools() -> list[Tool]:
    """사용 가능한 도구 목록 반환"""
    return [
        Tool(
            name="read_file",
            description="파일 내용을 읽습니다. 텍스트 및 바이너리 파일 지원.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "읽을 파일의 경로"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="write_file",
            description="파일에 내용을 씁니다. 파일이 없으면 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "쓸 파일의 경로"
                    },
                    "content": {
                        "type": "string",
                        "description": "파일에 쓸 내용"
                    }
                },
                "required": ["path", "content"]
            }
        ),
        Tool(
            name="list_directory",
            description="디렉토리의 파일과 하위 디렉토리 목록을 반환합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "나열할 디렉토리 경로"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="search_files",
            description="파일 이름 패턴으로 파일을 검색합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "directory": {
                        "type": "string",
                        "description": "검색할 디렉토리"
                    },
                    "pattern": {
                        "type": "string",
                        "description": "검색 패턴 (예: *.py)"
                    }
                },
                "required": ["directory", "pattern"]
            }
        ),
        Tool(
            name="create_directory",
            description="새 디렉토리를 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "생성할 디렉토리 경로"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="delete_file",
            description="파일 또는 디렉토리를 삭제합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "삭제할 파일/디렉토리 경로"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="get_file_info",
            description="파일의 메타데이터(크기, 수정일 등)를 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "정보를 조회할 파일 경로"
                    }
                },
                "required": ["path"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """도구 실행"""

    if name == "read_file":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")

        try:
            content = path.read_text(encoding="utf-8")
            return [TextContent(
                type="text",
                text=content
            )]
        except UnicodeDecodeError:
            # 바이너리 파일인 경우
            size = path.stat().st_size
            return [TextContent(
                type="text",
                text=f"Binary file ({size} bytes)"
            )]

    elif name == "write_file":
        path = Path(arguments["path"])
        content = arguments["content"]

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding="utf-8")

        return [TextContent(
            type="text",
            text=f"Successfully wrote {len(content)} characters to {path}"
        )]

    elif name == "list_directory":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        if not path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")

        items = []
        for item in sorted(path.iterdir()):
            item_type = "dir" if item.is_dir() else "file"
            size = item.stat().st_size if item.is_file() else 0
            items.append(f"[{item_type}] {item.name} ({size} bytes)")

        return [TextContent(
            type="text",
            text="\n".join(items) if items else "Empty directory"
        )]

    elif name == "search_files":
        directory = Path(arguments["directory"])
        pattern = arguments["pattern"]

        if not is_path_allowed(directory):
            raise ValueError(f"Access denied: {directory}")

        matches = list(directory.rglob(pattern))
        results = [str(match.relative_to(directory)) for match in matches]

        return [TextContent(
            type="text",
            text=f"Found {len(results)} files:\n" + "\n".join(results)
        )]

    elif name == "create_directory":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        path.mkdir(parents=True, exist_ok=True)

        return [TextContent(
            type="text",
            text=f"Created directory: {path}"
        )]

    elif name == "delete_file":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        if path.is_file():
            path.unlink()
            return [TextContent(type="text", text=f"Deleted file: {path}")]
        elif path.is_dir():
            import shutil
            shutil.rmtree(path)
            return [TextContent(type="text", text=f"Deleted directory: {path}")]
        else:
            raise FileNotFoundError(f"Not found: {path}")

    elif name == "get_file_info":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        if not path.exists():
            raise FileNotFoundError(f"Not found: {path}")

        stat = path.stat()
        from datetime import datetime

        info = {
            "name": path.name,
            "path": str(path),
            "type": "directory" if path.is_dir() else "file",
            "size": stat.st_size,
            "created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
        }

        return [TextContent(
            type="text",
            text=json.dumps(info, indent=2)
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    """서버 실행"""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

클라이언트 사용 예제

TypeScript filesystem_client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

async function main() {
  // 클라이언트 생성
  const client = new Client({
    name: "filesystem-client",
    version: "1.0.0"
  }, {
    capabilities: {
      tools: {}
    }
  });

  // 전송 계층 설정 (Python 서버 실행)
  const transport = new StdioClientTransport({
    command: "python",
    args: ["filesystem_server.py"]
  });

  try {
    // 서버 연결
    await client.connect(transport);
    console.log("✓ Connected to filesystem server");

    // 사용 가능한 도구 목록 조회
    const tools = await client.listTools();
    console.log("\nAvailable tools:");
    tools.tools.forEach(tool => {
      console.log(`  - ${tool.name}: ${tool.description}`);
    });

    // 예제 1: 디렉토리 나열
    console.log("\n--- Listing directory ---");
    const listResult = await client.callTool("list_directory", {
      path: process.env.HOME + "/Documents"
    });
    console.log(listResult.content[0].text);

    // 예제 2: 파일 쓰기
    console.log("\n--- Writing file ---");
    const writeResult = await client.callTool("write_file", {
      path: process.env.HOME + "/Documents/test.txt",
      content: "Hello from MCP!\nThis is a test file."
    });
    console.log(writeResult.content[0].text);

    // 예제 3: 파일 읽기
    console.log("\n--- Reading file ---");
    const readResult = await client.callTool("read_file", {
      path: process.env.HOME + "/Documents/test.txt"
    });
    console.log(readResult.content[0].text);

    // 예제 4: 파일 검색
    console.log("\n--- Searching files ---");
    const searchResult = await client.callTool("search_files", {
      directory: process.env.HOME + "/Documents",
      pattern: "*.txt"
    });
    console.log(searchResult.content[0].text);

    // 예제 5: 파일 정보 조회
    console.log("\n--- File info ---");
    const infoResult = await client.callTool("get_file_info", {
      path: process.env.HOME + "/Documents/test.txt"
    });
    console.log(infoResult.content[0].text);

  } catch (error) {
    console.error("Error:", error);
  } finally {
    await client.close();
    console.log("\n✓ Connection closed");
  }
}

main();

TypeScript 서버 구현

TypeScript filesystem-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  Tool,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";
import { glob } from "glob";
import os from "os";

// 허용된 디렉토리
const ALLOWED_DIRECTORIES = [
  path.join(os.homedir(), "Documents"),
  path.join(os.homedir(), "Projects"),
];

function isPathAllowed(filePath: string): boolean {
  const resolved = path.resolve(filePath);
  return ALLOWED_DIRECTORIES.some(dir => resolved.startsWith(dir));
}

// 서버 생성
const server = new Server({
  name: "filesystem-server",
  version: "1.0.0"
}, {
  capabilities: {
    tools: {}
  }
});

// 도구 목록 핸들러
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "read_file",
        description: "파일 내용을 읽습니다",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "읽을 파일 경로"
            }
          },
          required: ["path"]
        }
      },
      {
        name: "write_file",
        description: "파일에 내용을 씁니다",
        inputSchema: {
          type: "object",
          properties: {
            path: { type: "string", description: "파일 경로" },
            content: { type: "string", description: "파일 내용" }
          },
          required: ["path", "content"]
        }
      },
      {
        name: "list_directory",
        description: "디렉토리 내용을 나열합니다",
        inputSchema: {
          type: "object",
          properties: {
            path: { type: "string", description: "디렉토리 경로" }
          },
          required: ["path"]
        }
      },
      {
        name: "search_files",
        description: "패턴으로 파일을 검색합니다",
        inputSchema: {
          type: "object",
          properties: {
            directory: { type: "string" },
            pattern: { type: "string" }
          },
          required: ["directory", "pattern"]
        }
      }
    ] as Tool[]
  };
});

// 도구 호출 핸들러
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  switch (name) {
    case "read_file": {
      const filePath = args.path as string;

      if (!isPathAllowed(filePath)) {
        throw new Error(`Access denied: ${filePath}`);
      }

      const content = await fs.readFile(filePath, "utf-8");

      return {
        content: [{
          type: "text",
          text: content
        }]
      };
    }

    case "write_file": {
      const filePath = args.path as string;
      const content = args.content as string;

      if (!isPathAllowed(filePath)) {
        throw new Error(`Access denied: ${filePath}`);
      }

      await fs.mkdir(path.dirname(filePath), { recursive: true });
      await fs.writeFile(filePath, content, "utf-8");

      return {
        content: [{
          type: "text",
          text: `Successfully wrote ${content.length} characters to ${filePath}`
        }]
      };
    }

    case "list_directory": {
      const dirPath = args.path as string;

      if (!isPathAllowed(dirPath)) {
        throw new Error(`Access denied: ${dirPath}`);
      }

      const entries = await fs.readdir(dirPath, { withFileTypes: true });
      const items = await Promise.all(
        entries.map(async entry => {
          const fullPath = path.join(dirPath, entry.name);
          const stats = await fs.stat(fullPath);
          const type = entry.isDirectory() ? "dir" : "file";
          return `[${type}] ${entry.name} (${stats.size} bytes)`;
        })
      );

      return {
        content: [{
          type: "text",
          text: items.join("\n")
        }]
      };
    }

    case "search_files": {
      const directory = args.directory as string;
      const pattern = args.pattern as string;

      if (!isPathAllowed(directory)) {
        throw new Error(`Access denied: ${directory}`);
      }

      const matches = await glob(pattern, {
        cwd: directory,
        nodir: false
      });

      return {
        content: [{
          type: "text",
          text: `Found ${matches.length} files:\n${matches.join("\n")}`
        }]
      };
    }

    default:
      throw new Error(`Unknown tool: ${name}`);
  }
});

// 서버 시작
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Filesystem MCP Server running on stdio");
}

main().catch(console.error);

데이터베이스 서버

SQLite MCP 서버

SQLite 데이터베이스에 접근하는 MCP 서버 구현입니다. 쿼리 실행, 스키마 조회, 트랜잭션 관리 등을 지원합니다.

Python sqlite_server.py
import asyncio
import sqlite3
import json
from typing import Any, List, Dict
from pathlib import Path

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("sqlite-server")

# 데이터베이스 연결 풀
connections: Dict[str, sqlite3.Connection] = {}

def get_connection(db_path: str) -> sqlite3.Connection:
    """데이터베이스 연결 가져오기 (연결 풀 사용)"""
    if db_path not in connections:
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row  # 딕셔너리 형태로 결과 반환
        connections[db_path] = conn
    return connections[db_path]

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="query",
            description="SQL SELECT 쿼리를 실행합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "db_path": {
                        "type": "string",
                        "description": "데이터베이스 파일 경로"
                    },
                    "sql": {
                        "type": "string",
                        "description": "실행할 SELECT 쿼리"
                    },
                    "params": {
                        "type": "array",
                        "description": "쿼리 매개변수 (선택사항)",
                        "items": {"type": ["string", "number", "null"]}
                    }
                },
                "required": ["db_path", "sql"]
            }
        ),
        Tool(
            name="execute",
            description="SQL INSERT/UPDATE/DELETE 쿼리를 실행합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "db_path": {"type": "string"},
                    "sql": {"type": "string"},
                    "params": {
                        "type": "array",
                        "items": {"type": ["string", "number", "null"]}
                    }
                },
                "required": ["db_path", "sql"]
            }
        ),
        Tool(
            name="get_schema",
            description="데이터베이스 스키마를 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "db_path": {"type": "string"},
                    "table_name": {
                        "type": "string",
                        "description": "특정 테이블만 조회 (선택사항)"
                    }
                },
                "required": ["db_path"]
            }
        ),
        Tool(
            name="create_table",
            description="새 테이블을 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "db_path": {"type": "string"},
                    "table_name": {"type": "string"},
                    "columns": {
                        "type": "object",
                        "description": "컬럼명: 타입 형식"
                    }
                },
                "required": ["db_path", "table_name", "columns"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    db_path = arguments["db_path"]
    conn = get_connection(db_path)
    cursor = conn.cursor()

    if name == "query":
        sql = arguments["sql"]
        params = arguments.get("params", [])

        # SELECT만 허용
        if not sql.strip().upper().startswith("SELECT"):
            raise ValueError("Only SELECT queries are allowed in query()")

        cursor.execute(sql, params)
        rows = cursor.fetchall()

        # Row 객체를 딕셔너리로 변환
        result = [dict(row) for row in rows]

        return [TextContent(
            type="text",
            text=json.dumps(result, indent=2, ensure_ascii=False)
        )]

    elif name == "execute":
        sql = arguments["sql"]
        params = arguments.get("params", [])

        cursor.execute(sql, params)
        conn.commit()

        return [TextContent(
            type="text",
            text=f"Executed successfully. Rows affected: {cursor.rowcount}"
        )]

    elif name == "get_schema":
        table_name = arguments.get("table_name")

        if table_name:
            # 특정 테이블 스키마
            cursor.execute(f"PRAGMA table_info({table_name})")
            columns = cursor.fetchall()
            schema = {
                "table": table_name,
                "columns": [dict(col) for col in columns]
            }
        else:
            # 전체 데이터베이스 스키마
            cursor.execute("""
                SELECT name, type, sql
                FROM sqlite_master
                WHERE type IN ('table', 'view')
                ORDER BY name
            """)
            objects = cursor.fetchall()
            schema = {
                "database": db_path,
                "objects": [dict(obj) for obj in objects]
            }

        return [TextContent(
            type="text",
            text=json.dumps(schema, indent=2)
        )]

    elif name == "create_table":
        table_name = arguments["table_name"]
        columns = arguments["columns"]

        column_defs = [f"{col_name} {col_type}"
                       for col_name, col_type in columns.items()]

        sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                {', '.join(column_defs)}
            )
        """

        cursor.execute(sql)
        conn.commit()

        return [TextContent(
            type="text",
            text=f"Table '{table_name}' created successfully"
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

PostgreSQL MCP 서버

Python postgres_server.py
import asyncio
import json
from typing import Any, List, Dict, Optional

import asyncpg

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("postgres-server")

# 연결 풀
pool: Optional[asyncpg.Pool] = None

async def get_pool(dsn: str) -> asyncpg.Pool:
    """PostgreSQL 연결 풀 가져오기"""
    global pool
    if pool is None:
        pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
    return pool

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="pg_query",
            description="PostgreSQL SELECT 쿼리를 실행합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dsn": {
                        "type": "string",
                        "description": "PostgreSQL 연결 문자열"
                    },
                    "sql": {"type": "string"},
                    "params": {
                        "type": "array",
                        "items": {}
                    }
                },
                "required": ["dsn", "sql"]
            }
        ),
        Tool(
            name="pg_execute",
            description="PostgreSQL INSERT/UPDATE/DELETE를 실행합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dsn": {"type": "string"},
                    "sql": {"type": "string"},
                    "params": {"type": "array"}
                },
                "required": ["dsn", "sql"]
            }
        ),
        Tool(
            name="pg_get_tables",
            description="데이터베이스의 모든 테이블 목록을 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dsn": {"type": "string"},
                    "schema": {
                        "type": "string",
                        "description": "스키마 이름 (기본: public)",
                        "default": "public"
                    }
                },
                "required": ["dsn"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    dsn = arguments["dsn"]
    pool = await get_pool(dsn)

    if name == "pg_query":
        sql = arguments["sql"]
        params = arguments.get("params", [])

        async with pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
            result = [dict(row) for row in rows]

            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2, default=str)
            )]

    elif name == "pg_execute":
        sql = arguments["sql"]
        params = arguments.get("params", [])

        async with pool.acquire() as conn:
            status = await conn.execute(sql, *params)

            return [TextContent(
                type="text",
                text=f"Executed: {status}"
            )]

    elif name == "pg_get_tables":
        schema = arguments.get("schema", "public")

        async with pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT tablename, schemaname
                FROM pg_tables
                WHERE schemaname = 변동
                ORDER BY tablename
            """, schema)

            tables = [dict(row) for row in rows]

            return [TextContent(
                type="text",
                text=json.dumps(tables, indent=2)
            )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

데이터베이스 클라이언트 예제

Python database_client_example.py
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # SQLite 서버에 연결
    server_params = StdioServerParameters(
        command="python",
        args=["sqlite_server.py"]
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            db_path = "./test.db"

            # 테이블 생성
            print("Creating table...")
            result = await session.call_tool("create_table", {
                "db_path": db_path,
                "table_name": "users",
                "columns": {
                    "id": "INTEGER PRIMARY KEY",
                    "name": "TEXT NOT NULL",
                    "email": "TEXT UNIQUE",
                    "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
                }
            })
            print(result.content[0].text)

            # 데이터 삽입
            print("\nInserting data...")
            result = await session.call_tool("execute", {
                "db_path": db_path,
                "sql": "INSERT INTO users (name, email) VALUES (?, ?)",
                "params": ["Alice", "alice@example.com"]
            })
            print(result.content[0].text)

            # 데이터 조회
            print("\nQuerying data...")
            result = await session.call_tool("query", {
                "db_path": db_path,
                "sql": "SELECT * FROM users"
            })
            print(result.content[0].text)

            # 스키마 조회
            print("\nGetting schema...")
            result = await session.call_tool("get_schema", {
                "db_path": db_path,
                "table_name": "users"
            })
            print(result.content[0].text)

if __name__ == "__main__":
    asyncio.run(main())

GitHub API 서버

GitHub API를 MCP 서버로 래핑하여 이슈, PR, 리포지토리 관리 기능을 제공합니다.

Python github_server.py
import asyncio
import os
import json
from typing import Optional

import httpx

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("github-server")

# GitHub API 클라이언트
class GitHubClient:
    def __init__(self, token: str):
        self.token = token
        self.base_url = "https://api.github.com"
        self.client = httpx.AsyncClient(
            headers={
                "Authorization": f"token {token}",
                "Accept": "application/vnd.github.v3+json"
            },
            timeout=30.0
        )

    async def get(self, path: str, params: dict = None) -> dict:
        response = await self.client.get(f"{self.base_url}{path}", params=params)
        response.raise_for_status()
        return response.json()

    async def post(self, path: str, data: dict) -> dict:
        response = await self.client.post(f"{self.base_url}{path}", json=data)
        response.raise_for_status()
        return response.json()

    async def patch(self, path: str, data: dict) -> dict:
        response = await self.client.patch(f"{self.base_url}{path}", json=data)
        response.raise_for_status()
        return response.json()

# 글로벌 클라이언트 인스턴스
github_client: Optional[GitHubClient] = None

def get_github_client() -> GitHubClient:
    global github_client
    if github_client is None:
        token = os.getenv("GITHUB_TOKEN")
        if not token:
            raise ValueError("GITHUB_TOKEN environment variable not set")
        github_client = GitHubClient(token)
    return github_client

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="list_issues",
            description="리포지토리의 이슈 목록을 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string", "description": "리포지토리 소유자"},
                    "repo": {"type": "string", "description": "리포지토리 이름"},
                    "state": {
                        "type": "string",
                        "enum": ["open", "closed", "all"],
                        "default": "open"
                    },
                    "labels": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                },
                "required": ["owner", "repo"]
            }
        ),
        Tool(
            name="create_issue",
            description="새 이슈를 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string"},
                    "repo": {"type": "string"},
                    "title": {"type": "string"},
                    "body": {"type": "string"},
                    "labels": {"type": "array", "items": {"type": "string"}},
                    "assignees": {"type": "array", "items": {"type": "string"}}
                },
                "required": ["owner", "repo", "title"]
            }
        ),
        Tool(
            name="list_pull_requests",
            description="리포지토리의 Pull Request 목록을 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string"},
                    "repo": {"type": "string"},
                    "state": {"type": "string", "enum": ["open", "closed", "all"]}
                },
                "required": ["owner", "repo"]
            }
        ),
        Tool(
            name="get_repository",
            description="리포지토리 정보를 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string"},
                    "repo": {"type": "string"}
                },
                "required": ["owner", "repo"]
            }
        ),
        Tool(
            name="search_code",
            description="코드를 검색합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "검색 쿼리"},
                    "repo": {"type": "string", "description": "owner/repo 형식"}
                },
                "required": ["query"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    client = get_github_client()

    if name == "list_issues":
        owner = arguments["owner"]
        repo = arguments["repo"]
        state = arguments.get("state", "open")
        labels = arguments.get("labels", [])

        params = {"state": state}
        if labels:
            params["labels"] = ",".join(labels)

        issues = await client.get(f"/repos/{owner}/{repo}/issues", params)

        # 간소화된 정보만 추출
        simplified = [{
            "number": issue["number"],
            "title": issue["title"],
            "state": issue["state"],
            "user": issue["user"]["login"],
            "labels": [l["name"] for l in issue["labels"]],
            "created_at": issue["created_at"],
            "url": issue["html_url"]
        } for issue in issues]

        return [TextContent(
            type="text",
            text=json.dumps(simplified, indent=2)
        )]

    elif name == "create_issue":
        owner = arguments["owner"]
        repo = arguments["repo"]

        data = {
            "title": arguments["title"],
            "body": arguments.get("body", ""),
        }

        if "labels" in arguments:
            data["labels"] = arguments["labels"]
        if "assignees" in arguments:
            data["assignees"] = arguments["assignees"]

        issue = await client.post(f"/repos/{owner}/{repo}/issues", data)

        return [TextContent(
            type="text",
            text=f"Created issue #{issue['number']}: {issue['html_url']}"
        )]

    elif name == "list_pull_requests":
        owner = arguments["owner"]
        repo = arguments["repo"]
        state = arguments.get("state", "open")

        prs = await client.get(
            f"/repos/{owner}/{repo}/pulls",
            params={"state": state}
        )

        simplified = [{
            "number": pr["number"],
            "title": pr["title"],
            "state": pr["state"],
            "user": pr["user"]["login"],
            "head": pr["head"]["ref"],
            "base": pr["base"]["ref"],
            "url": pr["html_url"]
        } for pr in prs]

        return [TextContent(
            type="text",
            text=json.dumps(simplified, indent=2)
        )]

    elif name == "get_repository":
        owner = arguments["owner"]
        repo = arguments["repo"]

        repo_data = await client.get(f"/repos/{owner}/{repo}")

        info = {
            "name": repo_data["name"],
            "full_name": repo_data["full_name"],
            "description": repo_data["description"],
            "stars": repo_data["stargazers_count"],
            "forks": repo_data["forks_count"],
            "language": repo_data["language"],
            "default_branch": repo_data["default_branch"],
            "url": repo_data["html_url"]
        }

        return [TextContent(
            type="text",
            text=json.dumps(info, indent=2)
        )]

    elif name == "search_code":
        query = arguments["query"]

        if "repo" in arguments:
            query += f" repo:{arguments['repo']}"

        results = await client.get("/search/code", params={"q": query})

        items = [{
            "name": item["name"],
            "path": item["path"],
            "repository": item["repository"]["full_name"],
            "url": item["html_url"]
        } for item in results["items"][:  10]]  # 최대 10개만

        return [TextContent(
            type="text",
            text=json.dumps({
                "total_count": results["total_count"],
                "items": items
            }, indent=2)
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

MCP 데이터 흐름 다이어그램

MCP 서버는 다양한 데이터 소스(파일시스템, 데이터베이스, 외부 API)에서 데이터를 가져와 LLM 클라이언트에 전달합니다. 아래 다이어그램은 각 서버 유형별 데이터 파이프라인의 전체 흐름을 시각화합니다.

데이터 소스 파일시스템 로컬 파일/디렉토리 데이터베이스 SQLite / PostgreSQL 외부 API GitHub / Slack / Web 벡터 DB ChromaDB 임베딩 웹 콘텐츠 HTML / 스크래핑 MCP 서버 (도구 제공) filesystem-server database-server github / slack-server rag-server web-scraper-server orchestrator 멀티 서버 조합 MCP 클라이언트 / LLM Claude Desktop claude_desktop_config.json 커스텀 클라이언트 Python / TypeScript SDK LLM (Claude) 도구 호출 결정/실행 네이티브 I/O JSON-RPC (stdio/SSE)
데이터 흐름 핵심 포인트: 각 MCP 서버는 특정 데이터 소스에 대한 네이티브 접근을 캡슐화하고, 표준 JSON-RPC 프로토콜을 통해 클라이언트에 도구를 노출합니다. 오케스트레이터는 여러 서버를 조합하여 복잡한 워크플로우를 구현합니다.

Slack Bot MCP 서버

Slack Bot Token을 기반으로 채널 목록 조회, 메시지 전송, 메시지 검색 기능을 제공하는 MCP 서버입니다. httpx 비동기 클라이언트를 사용하여 Slack Web API와 통신합니다.

사전 준비: Slack App을 생성하고 Bot Token(xoxb-...)을 발급받아야 합니다. 필요한 OAuth 스코프: channels:read, chat:write, search:read, groups:read
Python slack_server.py
import asyncio
import os
import json
from typing import Optional

import httpx

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("slack-server")

# Slack API 클라이언트
class SlackClient:
    def __init__(self, token: str):
        self.token = token
        self.base_url = "https://slack.com/api"
        self.client = httpx.AsyncClient(
            headers={
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json; charset=utf-8",
            },
            timeout=30.0
        )

    async def api_call(
        self, method: str, params: dict = None
    ) -> dict:
        """Slack Web API 호출"""
        url = f"{self.base_url}/{method}"
        response = await self.client.post(url, json=params or {})
        response.raise_for_status()
        data = response.json()
        if not data.get("ok"):
            raise ValueError(
                f"Slack API error: {data.get('error', 'unknown')}"
            )
        return data

# 글로벌 클라이언트
slack_client: Optional[SlackClient] = None

def get_slack_client() -> SlackClient:
    global slack_client
    if slack_client is None:
        token = os.getenv("SLACK_BOT_TOKEN")
        if not token:
            raise ValueError(
                "SLACK_BOT_TOKEN 환경변수가 설정되지 않았습니다"
            )
        slack_client = SlackClient(token)
    return slack_client

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="list_channels",
            description="Slack 워크스페이스의 채널 목록을 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "limit": {
                        "type": "integer",
                        "description": "조회할 채널 수 (기본: 100)",
                        "default": 100
                    },
                    "types": {
                        "type": "string",
                        "description": "채널 유형 (public_channel, private_channel)",
                        "default": "public_channel"
                    }
                }
            }
        ),
        Tool(
            name="send_message",
            description="Slack 채널에 메시지를 전송합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "channel": {
                        "type": "string",
                        "description": "채널 ID 또는 이름 (예: C01234567)"
                    },
                    "text": {
                        "type": "string",
                        "description": "전송할 메시지 텍스트"
                    },
                    "thread_ts": {
                        "type": "string",
                        "description": "스레드 타임스탬프 (스레드 답장 시)"
                    }
                },
                "required": ["channel", "text"]
            }
        ),
        Tool(
            name="search_messages",
            description="Slack 메시지를 검색합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "검색 쿼리"
                    },
                    "count": {
                        "type": "integer",
                        "description": "결과 수 (기본: 20)",
                        "default": 20
                    },
                    "sort": {
                        "type": "string",
                        "enum": ["score", "timestamp"],
                        "default": "score"
                    }
                },
                "required": ["query"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    client = get_slack_client()

    if name == "list_channels":
        limit = arguments.get("limit", 100)
        types = arguments.get("types", "public_channel")

        data = await client.api_call("conversations.list", {
            "limit": limit,
            "types": types
        })

        channels = [{
            "id": ch["id"],
            "name": ch["name"],
            "topic": ch.get("topic", {}).get("value", ""),
            "num_members": ch.get("num_members", 0),
            "is_archived": ch.get("is_archived", False)
        } for ch in data["channels"]]

        return [TextContent(
            type="text",
            text=json.dumps(channels, indent=2, ensure_ascii=False)
        )]

    elif name == "send_message":
        channel = arguments["channel"]
        text = arguments["text"]
        thread_ts = arguments.get("thread_ts")

        params = {"channel": channel, "text": text}
        if thread_ts:
            params["thread_ts"] = thread_ts

        data = await client.api_call("chat.postMessage", params)

        return [TextContent(
            type="text",
            text=json.dumps({
                "ok": True,
                "channel": data["channel"],
                "ts": data["ts"],
                "message": data["message"]["text"]
            }, indent=2)
        )]

    elif name == "search_messages":
        query = arguments["query"]
        count = arguments.get("count", 20)
        sort = arguments.get("sort", "score")

        data = await client.api_call("search.messages", {
            "query": query,
            "count": count,
            "sort": sort
        })

        matches = data.get("messages", {}).get("matches", [])
        results = [{
            "text": m["text"][:200],
            "user": m.get("username", "unknown"),
            "channel": m.get("channel", {}).get("name", ""),
            "ts": m["ts"],
            "permalink": m.get("permalink", "")
        } for m in matches[:10]]

        return [TextContent(
            type="text",
            text=json.dumps({
                "total": data.get("messages", {}).get("total", 0),
                "results": results
            }, indent=2, ensure_ascii=False)
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Claude Desktop 설정

JSON claude_desktop_config.json
{
  "mcpServers": {
    "slack": {
      "command": "python",
      "args": ["slack_server.py"],
      "env": {
        "SLACK_BOT_TOKEN": "xoxb-your-bot-token-here"
      }
    }
  }
}
활용 시나리오: Claude Desktop에서 "engineering 채널에 배포 완료 메시지 보내줘" 또는 "지난주 장애 관련 메시지 검색해줘"와 같은 자연어 요청으로 Slack을 제어할 수 있습니다.

RAG 시스템 MCP 서버

벡터 검색 기반 RAG(Retrieval-Augmented Generation) 서버입니다. chromadb를 사용하여 문서를 임베딩하고 저장하며, 유사도 검색을 통해 관련 문서를 검색한 뒤 LLM에 컨텍스트를 제공합니다.

의존성 설치: pip install chromadb mcp 명령으로 필요한 패키지를 설치하세요. ChromaDB는 기본 임베딩 모델(all-MiniLM-L6-v2)을 자동으로 다운로드합니다.
Python rag_server.py
import asyncio
import json
import uuid
from typing import List, Optional

import chromadb
from chromadb.config import Settings

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("rag-server")

# ChromaDB 클라이언트 초기화 (영속 저장소)
chroma_client = chromadb.Client(Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory="./chroma_data",
    anonymized_telemetry=False
))

def get_or_create_collection(name: str = "documents"):
    """컬렉션 가져오기 또는 생성"""
    return chroma_client.get_or_create_collection(
        name=name,
        metadata={"hnsw:space": "cosine"}
    )

def chunk_text(
    text: str,
    chunk_size: int = 500,
    overlap: int = 50
) -> List[str]:
    """텍스트를 청크 단위로 분할 (오버랩 포함)"""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunk = text[start:end]

        # 문장 경계에서 자르기 시도
        if end < len(text):
            # 마지막 마침표/줄바꿈 위치 탐색
            last_period = chunk.rfind(".")
            last_newline = chunk.rfind("\n")
            split_pos = max(last_period, last_newline)

            if split_pos > chunk_size * 0.3:
                chunk = chunk[:split_pos + 1]
                end = start + split_pos + 1

        chunks.append(chunk.strip())
        start = end - overlap

    return [c for c in chunks if c]  # 빈 청크 제거

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="add_document",
            description="문서를 청크로 분할하여 벡터 DB에 추가합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "content": {
                        "type": "string",
                        "description": "문서 내용"
                    },
                    "metadata": {
                        "type": "object",
                        "description": "문서 메타데이터 (title, source 등)"
                    },
                    "collection": {
                        "type": "string",
                        "description": "컬렉션 이름 (기본: documents)",
                        "default": "documents"
                    },
                    "chunk_size": {
                        "type": "integer",
                        "description": "청크 크기 (기본: 500자)",
                        "default": 500
                    }
                },
                "required": ["content"]
            }
        ),
        Tool(
            name="search",
            description="벡터 유사도 검색으로 관련 문서 청크를 찾습니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "검색 쿼리"
                    },
                    "n_results": {
                        "type": "integer",
                        "description": "반환할 결과 수 (기본: 5)",
                        "default": 5
                    },
                    "collection": {
                        "type": "string",
                        "default": "documents"
                    }
                },
                "required": ["query"]
            }
        ),
        Tool(
            name="ask",
            description="검색 결과를 컨텍스트로 포함하여 질문에 답변할 프롬프트를 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "question": {
                        "type": "string",
                        "description": "사용자 질문"
                    },
                    "n_results": {
                        "type": "integer",
                        "default": 5
                    },
                    "collection": {
                        "type": "string",
                        "default": "documents"
                    }
                },
                "required": ["question"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:

    if name == "add_document":
        content = arguments["content"]
        metadata = arguments.get("metadata", {})
        collection_name = arguments.get("collection", "documents")
        chunk_size = arguments.get("chunk_size", 500)

        collection = get_or_create_collection(collection_name)

        # 텍스트 청크 분할
        chunks = chunk_text(content, chunk_size=chunk_size)
        doc_id = str(uuid.uuid4())[:8]

        # 각 청크를 벡터 DB에 추가
        ids = []
        metadatas = []
        for i, chunk in enumerate(chunks):
            chunk_id = f"{doc_id}-{i}"
            ids.append(chunk_id)
            metadatas.append({
                **metadata,
                "chunk_index": i,
                "total_chunks": len(chunks),
                "doc_id": doc_id
            })

        collection.add(
            documents=chunks,
            ids=ids,
            metadatas=metadatas
        )

        return [TextContent(
            type="text",
            text=json.dumps({
                "doc_id": doc_id,
                "chunks_added": len(chunks),
                "collection": collection_name
            }, indent=2)
        )]

    elif name == "search":
        query = arguments["query"]
        n_results = arguments.get("n_results", 5)
        collection_name = arguments.get("collection", "documents")

        collection = get_or_create_collection(collection_name)

        results = collection.query(
            query_texts=[query],
            n_results=n_results
        )

        items = []
        for i in range(len(results["documents"][0])):
            items.append({
                "text": results["documents"][0][i],
                "distance": results["distances"][0][i],
                "metadata": results["metadatas"][0][i],
                "id": results["ids"][0][i]
            })

        return [TextContent(
            type="text",
            text=json.dumps(items, indent=2, ensure_ascii=False)
        )]

    elif name == "ask":
        question = arguments["question"]
        n_results = arguments.get("n_results", 5)
        collection_name = arguments.get("collection", "documents")

        collection = get_or_create_collection(collection_name)

        # 관련 문서 검색
        results = collection.query(
            query_texts=[question],
            n_results=n_results
        )

        # 컨텍스트 구성
        context_parts = []
        for i, doc in enumerate(results["documents"][0]):
            meta = results["metadatas"][0][i]
            source = meta.get("source", "unknown")
            context_parts.append(
                f"[출처: {source}]\n{doc}"
            )

        context = "\n\n---\n\n".join(context_parts)

        # RAG 프롬프트 생성
        prompt = f"""다음 컨텍스트를 기반으로 질문에 답변하세요.

컨텍스트:
{context}

질문: {question}

답변:"""

        return [TextContent(
            type="text",
            text=json.dumps({
                "prompt": prompt,
                "sources_used": len(results["documents"][0]),
                "context_length": len(context)
            }, indent=2, ensure_ascii=False)
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Claude Desktop 설정

JSON claude_desktop_config.json
{
  "mcpServers": {
    "rag": {
      "command": "python",
      "args": ["rag_server.py"]
    }
  }
}
청크 전략: chunk_text 함수는 문장 경계(마침표, 줄바꿈)에서 분할하고 오버랩을 적용합니다. 문서 특성에 따라 chunk_sizeoverlap 값을 조정하세요. 코드 문서는 더 큰 청크(800~1000자), 짧은 FAQ는 작은 청크(200~300자)가 효과적입니다.

웹 스크래핑 MCP 서버

httpxBeautifulSoup를 활용하여 웹 페이지를 가져오고, CSS 셀렉터로 특정 요소를 추출하며, 페이지 내 링크를 수집하는 MCP 서버입니다.

의존성 설치: pip install httpx beautifulsoup4 mcp 명령으로 필요한 패키지를 설치하세요.
Python web_scraper_server.py
import asyncio
import json
from typing import List, Optional
from urllib.parse import urljoin, urlparse

import httpx
from bs4 import BeautifulSoup

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("web-scraper-server")

# 공유 HTTP 클라이언트
http_client = httpx.AsyncClient(
    timeout=30.0,
    follow_redirects=True,
    headers={
        "User-Agent": (
            "Mozilla/5.0 (compatible; MCPBot/1.0)"
        )
    }
)

async def fetch_page(url: str) -> BeautifulSoup:
    """URL에서 HTML을 가져와 파싱합니다."""
    response = await http_client.get(url)
    response.raise_for_status()
    return BeautifulSoup(response.text, "html.parser")

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="fetch_url",
            description="URL의 HTML을 가져와 텍스트를 추출합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "가져올 웹 페이지 URL"
                    },
                    "include_html": {
                        "type": "boolean",
                        "description": "원본 HTML 포함 여부 (기본: false)",
                        "default": false
                    }
                },
                "required": ["url"]
            }
        ),
        Tool(
            name="extract_selector",
            description="CSS 셀렉터로 웹 페이지의 특정 요소를 추출합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "대상 URL"
                    },
                    "selector": {
                        "type": "string",
                        "description": "CSS 셀렉터 (예: h1, .class, #id)"
                    },
                    "attribute": {
                        "type": "string",
                        "description": "추출할 속성 (없으면 텍스트 추출)"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "최대 결과 수 (기본: 50)",
                        "default": 50
                    }
                },
                "required": ["url", "selector"]
            }
        ),
        Tool(
            name="collect_links",
            description="웹 페이지에서 모든 링크를 수집합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "url": {
                        "type": "string",
                        "description": "대상 URL"
                    },
                    "same_domain": {
                        "type": "boolean",
                        "description": "같은 도메인만 필터링 (기본: false)",
                        "default": false
                    },
                    "pattern": {
                        "type": "string",
                        "description": "URL 필터링 패턴 (포함 문자열)"
                    }
                },
                "required": ["url"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:

    if name == "fetch_url":
        url = arguments["url"]
        include_html = arguments.get("include_html", False)

        soup = await fetch_page(url)

        # 불필요한 요소 제거
        for tag in soup.find_all(["script", "style", "nav", "footer"]):
            tag.decompose()

        result = {
            "url": url,
            "title": soup.title.string if soup.title else "",
            "text": soup.get_text(separator="\n", strip=True)[:5000],
            "meta_description": "",
        }

        # 메타 설명 추출
        meta_desc = soup.find("meta", attrs={"name": "description"})
        if meta_desc:
            result["meta_description"] = meta_desc.get("content", "")

        if include_html:
            result["html"] = str(soup)[:10000]

        return [TextContent(
            type="text",
            text=json.dumps(result, indent=2, ensure_ascii=False)
        )]

    elif name == "extract_selector":
        url = arguments["url"]
        selector = arguments["selector"]
        attribute = arguments.get("attribute")
        limit = arguments.get("limit", 50)

        soup = await fetch_page(url)
        elements = soup.select(selector)[:limit]

        results = []
        for el in elements:
            if attribute:
                value = el.get(attribute, "")
            else:
                value = el.get_text(strip=True)
            results.append({
                "tag": el.name,
                "value": value,
                "attributes": dict(el.attrs) if el.attrs else {}
            })

        return [TextContent(
            type="text",
            text=json.dumps({
                "url": url,
                "selector": selector,
                "count": len(results),
                "results": results
            }, indent=2, ensure_ascii=False)
        )]

    elif name == "collect_links":
        url = arguments["url"]
        same_domain = arguments.get("same_domain", False)
        pattern = arguments.get("pattern")

        soup = await fetch_page(url)
        base_domain = urlparse(url).netloc

        links = []
        for a in soup.find_all("a", href=True):
            href = a["href"]
            full_url = urljoin(url, href)
            parsed = urlparse(full_url)

            # 유효한 URL만 수집
            if parsed.scheme not in ("http", "https"):
                continue

            # 같은 도메인 필터
            if same_domain and parsed.netloc != base_domain:
                continue

            # 패턴 필터
            if pattern and pattern not in full_url:
                continue

            links.append({
                "text": a.get_text(strip=True)[:100],
                "url": full_url
            })

        # 중복 URL 제거
        seen = set()
        unique_links = []
        for link in links:
            if link["url"] not in seen:
                seen.add(link["url"])
                unique_links.append(link)

        return [TextContent(
            type="text",
            text=json.dumps({
                "url": url,
                "total_links": len(unique_links),
                "links": unique_links
            }, indent=2, ensure_ascii=False)
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

Claude Desktop 설정

JSON claude_desktop_config.json
{
  "mcpServers": {
    "web-scraper": {
      "command": "python",
      "args": ["web_scraper_server.py"]
    }
  }
}
활용 예시: "Python 공식 문서에서 asyncio 관련 내용 가져와줘", "이 뉴스 사이트에서 h2 제목들만 추출해줘", "이 페이지의 모든 외부 링크를 수집해줘" 등의 요청에 활용됩니다.

멀티 서버 오케스트레이션

여러 MCP 서버를 조합하여 복잡한 워크플로우를 자동화하는 오케스트레이터입니다. 아래 예제는 GitHub PR을 분석하고, 관련 코드 파일을 읽은 뒤, 결과를 데이터베이스에 저장하는 전체 파이프라인을 TypeScript로 구현합니다.

TypeScript orchestrator.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

// 서버별 클라이언트 타입 정의
interface ServerConfig {
  name: string;
  command: string;
  args: string[];
  env?: Record<string, string>;
}

// 여러 MCP 서버에 동시 연결 관리
class MCPOrchestrator {
  private clients: Map<string, Client> = new Map();
  private transports: Map<string, StdioClientTransport> = new Map();

  async connect(config: ServerConfig): Promise<void> {
    const client = new Client(
      { name: "orchestrator", version: "1.0.0" },
      { capabilities: { tools: {} } }
    );

    const transport = new StdioClientTransport({
      command: config.command,
      args: config.args,
      env: config.env
    });

    await client.connect(transport);
    this.clients.set(config.name, client);
    this.transports.set(config.name, transport);

    console.log(`[${config.name}] 서버 연결 완료`);
  }

  // 특정 서버의 도구 호출
  async callTool(
    serverName: string,
    toolName: string,
    args: Record<string, unknown>
  ): Promise<string> {
    const client = this.clients.get(serverName);
    if (!client) {
      throw new Error(`서버 '${serverName}'에 연결되지 않았습니다`);
    }

    const result = await client.callTool(toolName, args);
    return (result.content[0] as any).text;
  }

  // 모든 연결 종료
  async disconnect(): Promise<void> {
    for (const [name, client] of this.clients) {
      await client.close();
      console.log(`[${name}] 연결 종료`);
    }
    this.clients.clear();
    this.transports.clear();
  }
}

// ================================================
// 워크플로우: GitHub PR 분석 → 코드 읽기 → DB 저장
// ================================================

async function analyzePRWorkflow(
  orchestrator: MCPOrchestrator,
  owner: string,
  repo: string
) {
  console.log("=== PR 분석 워크플로우 시작 ===");

  // 1단계: GitHub에서 열린 PR 목록 조회
  console.log("\n[1단계] PR 목록 조회...");
  const prsJson = await orchestrator.callTool(
    "github", "list_pull_requests",
    { owner, repo, state: "open" }
  );
  const prs = JSON.parse(prsJson);
  console.log(`  열린 PR ${prs.length}개 발견`);

  // 2단계: 각 PR에 대해 상세 분석
  for (const pr of prs.slice(0, 5)) {
    console.log(`\n[2단계] PR #${pr.number}: ${pr.title}`);

    // GitHub에서 PR의 변경된 파일 목록 가져오기
    const searchResult = await orchestrator.callTool(
      "github", "search_code",
      { query: `repo:${owner}/${repo} ${pr.head}` }
    );

    // 3단계: 파일시스템에서 관련 파일 읽기
    console.log("[3단계] 관련 파일 읽기...");
    let codeContext = "";
    try {
      const fileContent = await orchestrator.callTool(
        "filesystem", "search_files",
        {
          directory: `/home/user/projects/${repo}`,
          pattern: "*.ts"
        }
      );
      codeContext = fileContent;
    } catch (e) {
      console.log("  로컬 파일 접근 불가, 건너뛰기");
    }

    // 4단계: 분석 결과를 DB에 저장
    console.log("[4단계] DB에 결과 저장...");
    const analysis = {
      pr_number: pr.number,
      title: pr.title,
      author: pr.user,
      branch: pr.head,
      code_context_length: codeContext.length,
      analyzed_at: new Date().toISOString()
    };

    await orchestrator.callTool(
      "database", "execute",
      {
        db_path: "./pr_analysis.db",
        sql: `INSERT INTO pr_analyses
              (pr_number, title, author, branch, context_len, analyzed_at)
              VALUES (?, ?, ?, ?, ?, ?)`,
        params: [
          analysis.pr_number,
          analysis.title,
          analysis.author,
          analysis.branch,
          analysis.code_context_length,
          analysis.analyzed_at
        ]
      }
    );

    console.log(`  PR #${pr.number} 분석 결과 저장 완료`);
  }

  // 5단계: 전체 분석 요약 조회
  console.log("\n[5단계] 분석 요약 조회...");
  const summary = await orchestrator.callTool(
    "database", "query",
    {
      db_path: "./pr_analysis.db",
      sql: "SELECT COUNT(*) as total, MAX(analyzed_at) as last_run FROM pr_analyses"
    }
  );
  console.log("  요약:", summary);

  console.log("\n=== 워크플로우 완료 ===");
}

// 메인 실행
async function main() {
  const orchestrator = new MCPOrchestrator();

  try {
    // 각 MCP 서버에 연결
    await orchestrator.connect({
      name: "github",
      command: "python",
      args: ["github_server.py"],
      env: { GITHUB_TOKEN: process.env.GITHUB_TOKEN! }
    });

    await orchestrator.connect({
      name: "filesystem",
      command: "python",
      args: ["filesystem_server.py"]
    });

    await orchestrator.connect({
      name: "database",
      command: "python",
      args: ["sqlite_server.py"]
    });

    // DB 테이블 생성
    await orchestrator.callTool(
      "database", "create_table",
      {
        db_path: "./pr_analysis.db",
        table_name: "pr_analyses",
        columns: {
          id: "INTEGER PRIMARY KEY AUTOINCREMENT",
          pr_number: "INTEGER NOT NULL",
          title: "TEXT NOT NULL",
          author: "TEXT",
          branch: "TEXT",
          context_len: "INTEGER DEFAULT 0",
          analyzed_at: "TEXT NOT NULL"
        }
      }
    );

    // 워크플로우 실행
    await analyzePRWorkflow(
      orchestrator, "owner", "my-repo"
    );

  } finally {
    await orchestrator.disconnect();
  }
}

main().catch(console.error);

멀티 서버 Claude Desktop 설정

Claude Desktop에서 여러 MCP 서버를 동시에 사용하려면 claude_desktop_config.json에 모든 서버를 등록합니다.

JSON claude_desktop_config.json
{
  "mcpServers": {
    "filesystem": {
      "command": "python",
      "args": ["filesystem_server.py"]
    },
    "database": {
      "command": "python",
      "args": ["sqlite_server.py"]
    },
    "github": {
      "command": "python",
      "args": ["github_server.py"],
      "env": {
        "GITHUB_TOKEN": "ghp_your_token_here"
      }
    },
    "slack": {
      "command": "python",
      "args": ["slack_server.py"],
      "env": {
        "SLACK_BOT_TOKEN": "xoxb-your-token"
      }
    },
    "rag": {
      "command": "python",
      "args": ["rag_server.py"]
    },
    "web-scraper": {
      "command": "python",
      "args": ["web_scraper_server.py"]
    }
  }
}
오케스트레이션 패턴: 멀티 서버 조합 시 각 서버의 독립성을 유지하세요. 오케스트레이터는 서버 간 데이터를 전달하는 "접착제" 역할만 수행하고, 각 서버는 자신의 도메인에 집중합니다. 이 패턴은 마이크로서비스 아키텍처와 유사합니다.

핵심 정리

  • MCP 실전 예제의 핵심 개념과 흐름을 정리합니다.
  • 파일 시스템 서버를 단계별로 이해합니다.
  • 실전 적용 시 기준과 주의점을 확인합니다.

실무 팁

  • 입력/출력 예시를 고정해 재현성을 확보하세요.
  • MCP 실전 예제 범위를 작게 잡고 단계적으로 확장하세요.
  • 파일 시스템 서버 조건을 문서화해 대응 시간을 줄이세요.