MCP 실전 예제

실무에서 바로 활용할 수 있는 완전한 MCP 서버 구현 예제를 제공합니다. 파일 시스템, 데이터베이스, API 통합, RAG 시스템 등 다양한 사용 사례를 Python과 TypeScript로 구현하는 방법을 학습하세요.

업데이트 안내: 모델/요금/버전/정책 등 시점에 민감한 정보는 변동될 수 있습니다. 최신 내용은 공식 문서를 확인하세요.
이 페이지에서 배울 내용
  • 파일 시스템 서버 완전 구현 (Python/TypeScript)
  • 데이터베이스 서버 (SQLite, PostgreSQL)
  • GitHub API 통합 서버
  • Slack 봇 MCP 서버
  • RAG 시스템 구현
  • 각 예제마다 서버 + 클라이언트 코드 제공

파일 시스템 서버

Python 구현

파일 시스템 MCP 서버는 가장 기본적이면서도 실용적인 예제입니다. 파일 읽기/쓰기, 디렉토리 탐색, 검색 등의 기능을 제공합니다.

Python filesystem_server.py
import asyncio
import os
import json
from pathlib import Path
from typing import Optional, List

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
    Tool,
    TextContent,
    ImageContent,
    EmbeddedResource,
)

# 서버 인스턴스 생성
server = Server("filesystem-server")

# 허용된 디렉토리 (보안을 위해)
ALLOWED_DIRECTORIES = [
    Path.home() / "Documents",
    Path.home() / "Projects",
]

def is_path_allowed(path: Path) -> bool:
    """경로가 허용된 디렉토리 내에 있는지 확인"""
    path = path.resolve()
    return any(
        path.is_relative_to(allowed_dir)
        for allowed_dir in ALLOWED_DIRECTORIES
    )

@server.list_tools()
async def list_tools() -> list[Tool]:
    """사용 가능한 도구 목록 반환"""
    return [
        Tool(
            name="read_file",
            description="파일 내용을 읽습니다. 텍스트 및 바이너리 파일 지원.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "읽을 파일의 경로"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="write_file",
            description="파일에 내용을 씁니다. 파일이 없으면 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "쓸 파일의 경로"
                    },
                    "content": {
                        "type": "string",
                        "description": "파일에 쓸 내용"
                    }
                },
                "required": ["path", "content"]
            }
        ),
        Tool(
            name="list_directory",
            description="디렉토리의 파일과 하위 디렉토리 목록을 반환합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "나열할 디렉토리 경로"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="search_files",
            description="파일 이름 패턴으로 파일을 검색합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "directory": {
                        "type": "string",
                        "description": "검색할 디렉토리"
                    },
                    "pattern": {
                        "type": "string",
                        "description": "검색 패턴 (예: *.py)"
                    }
                },
                "required": ["directory", "pattern"]
            }
        ),
        Tool(
            name="create_directory",
            description="새 디렉토리를 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "생성할 디렉토리 경로"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="delete_file",
            description="파일 또는 디렉토리를 삭제합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "삭제할 파일/디렉토리 경로"
                    }
                },
                "required": ["path"]
            }
        ),
        Tool(
            name="get_file_info",
            description="파일의 메타데이터(크기, 수정일 등)를 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "path": {
                        "type": "string",
                        "description": "정보를 조회할 파일 경로"
                    }
                },
                "required": ["path"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """도구 실행"""

    if name == "read_file":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")

        try:
            content = path.read_text(encoding="utf-8")
            return [TextContent(
                type="text",
                text=content
            )]
        except UnicodeDecodeError:
            # 바이너리 파일인 경우
            size = path.stat().st_size
            return [TextContent(
                type="text",
                text=f"Binary file ({size} bytes)"
            )]

    elif name == "write_file":
        path = Path(arguments["path"])
        content = arguments["content"]

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding="utf-8")

        return [TextContent(
            type="text",
            text=f"Successfully wrote {len(content)} characters to {path}"
        )]

    elif name == "list_directory":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        if not path.is_dir():
            raise NotADirectoryError(f"Not a directory: {path}")

        items = []
        for item in sorted(path.iterdir()):
            item_type = "dir" if item.is_dir() else "file"
            size = item.stat().st_size if item.is_file() else 0
            items.append(f"[{item_type}] {item.name} ({size} bytes)")

        return [TextContent(
            type="text",
            text="\n".join(items) if items else "Empty directory"
        )]

    elif name == "search_files":
        directory = Path(arguments["directory"])
        pattern = arguments["pattern"]

        if not is_path_allowed(directory):
            raise ValueError(f"Access denied: {directory}")

        matches = list(directory.rglob(pattern))
        results = [str(match.relative_to(directory)) for match in matches]

        return [TextContent(
            type="text",
            text=f"Found {len(results)} files:\n" + "\n".join(results)
        )]

    elif name == "create_directory":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        path.mkdir(parents=True, exist_ok=True)

        return [TextContent(
            type="text",
            text=f"Created directory: {path}"
        )]

    elif name == "delete_file":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        if path.is_file():
            path.unlink()
            return [TextContent(type="text", text=f"Deleted file: {path}")]
        elif path.is_dir():
            import shutil
            shutil.rmtree(path)
            return [TextContent(type="text", text=f"Deleted directory: {path}")]
        else:
            raise FileNotFoundError(f"Not found: {path}")

    elif name == "get_file_info":
        path = Path(arguments["path"])

        if not is_path_allowed(path):
            raise ValueError(f"Access denied: {path}")

        if not path.exists():
            raise FileNotFoundError(f"Not found: {path}")

        stat = path.stat()
        from datetime import datetime

        info = {
            "name": path.name,
            "path": str(path),
            "type": "directory" if path.is_dir() else "file",
            "size": stat.st_size,
            "created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
        }

        return [TextContent(
            type="text",
            text=json.dumps(info, indent=2)
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    """서버 실행"""
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

클라이언트 사용 예제

TypeScript filesystem_client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

async function main() {
  // 클라이언트 생성
  const client = new Client({
    name: "filesystem-client",
    version: "1.0.0"
  }, {
    capabilities: {
      tools: {}
    }
  });

  // 전송 계층 설정 (Python 서버 실행)
  const transport = new StdioClientTransport({
    command: "python",
    args: ["filesystem_server.py"]
  });

  try {
    // 서버 연결
    await client.connect(transport);
    console.log("✓ Connected to filesystem server");

    // 사용 가능한 도구 목록 조회
    const tools = await client.listTools();
    console.log("\nAvailable tools:");
    tools.tools.forEach(tool => {
      console.log(`  - ${tool.name}: ${tool.description}`);
    });

    // 예제 1: 디렉토리 나열
    console.log("\n--- Listing directory ---");
    const listResult = await client.callTool("list_directory", {
      path: process.env.HOME + "/Documents"
    });
    console.log(listResult.content[0].text);

    // 예제 2: 파일 쓰기
    console.log("\n--- Writing file ---");
    const writeResult = await client.callTool("write_file", {
      path: process.env.HOME + "/Documents/test.txt",
      content: "Hello from MCP!\nThis is a test file."
    });
    console.log(writeResult.content[0].text);

    // 예제 3: 파일 읽기
    console.log("\n--- Reading file ---");
    const readResult = await client.callTool("read_file", {
      path: process.env.HOME + "/Documents/test.txt"
    });
    console.log(readResult.content[0].text);

    // 예제 4: 파일 검색
    console.log("\n--- Searching files ---");
    const searchResult = await client.callTool("search_files", {
      directory: process.env.HOME + "/Documents",
      pattern: "*.txt"
    });
    console.log(searchResult.content[0].text);

    // 예제 5: 파일 정보 조회
    console.log("\n--- File info ---");
    const infoResult = await client.callTool("get_file_info", {
      path: process.env.HOME + "/Documents/test.txt"
    });
    console.log(infoResult.content[0].text);

  } catch (error) {
    console.error("Error:", error);
  } finally {
    await client.close();
    console.log("\n✓ Connection closed");
  }
}

main();

TypeScript 서버 구현

TypeScript filesystem-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
  CallToolRequestSchema,
  ListToolsRequestSchema,
  Tool,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";
import { glob } from "glob";
import os from "os";

// 허용된 디렉토리
const ALLOWED_DIRECTORIES = [
  path.join(os.homedir(), "Documents"),
  path.join(os.homedir(), "Projects"),
];

function isPathAllowed(filePath: string): boolean {
  const resolved = path.resolve(filePath);
  return ALLOWED_DIRECTORIES.some(dir => resolved.startsWith(dir));
}

// 서버 생성
const server = new Server({
  name: "filesystem-server",
  version: "1.0.0"
}, {
  capabilities: {
    tools: {}
  }
});

// 도구 목록 핸들러
server.setRequestHandler(ListToolsRequestSchema, async () => {
  return {
    tools: [
      {
        name: "read_file",
        description: "파일 내용을 읽습니다",
        inputSchema: {
          type: "object",
          properties: {
            path: {
              type: "string",
              description: "읽을 파일 경로"
            }
          },
          required: ["path"]
        }
      },
      {
        name: "write_file",
        description: "파일에 내용을 씁니다",
        inputSchema: {
          type: "object",
          properties: {
            path: { type: "string", description: "파일 경로" },
            content: { type: "string", description: "파일 내용" }
          },
          required: ["path", "content"]
        }
      },
      {
        name: "list_directory",
        description: "디렉토리 내용을 나열합니다",
        inputSchema: {
          type: "object",
          properties: {
            path: { type: "string", description: "디렉토리 경로" }
          },
          required: ["path"]
        }
      },
      {
        name: "search_files",
        description: "패턴으로 파일을 검색합니다",
        inputSchema: {
          type: "object",
          properties: {
            directory: { type: "string" },
            pattern: { type: "string" }
          },
          required: ["directory", "pattern"]
        }
      }
    ] as Tool[]
  };
});

// 도구 호출 핸들러
server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args } = request.params;

  switch (name) {
    case "read_file": {
      const filePath = args.path as string;

      if (!isPathAllowed(filePath)) {
        throw new Error(`Access denied: ${filePath}`);
      }

      const content = await fs.readFile(filePath, "utf-8");

      return {
        content: [{
          type: "text",
          text: content
        }]
      };
    }

    case "write_file": {
      const filePath = args.path as string;
      const content = args.content as string;

      if (!isPathAllowed(filePath)) {
        throw new Error(`Access denied: ${filePath}`);
      }

      await fs.mkdir(path.dirname(filePath), { recursive: true });
      await fs.writeFile(filePath, content, "utf-8");

      return {
        content: [{
          type: "text",
          text: `Successfully wrote ${content.length} characters to ${filePath}`
        }]
      };
    }

    case "list_directory": {
      const dirPath = args.path as string;

      if (!isPathAllowed(dirPath)) {
        throw new Error(`Access denied: ${dirPath}`);
      }

      const entries = await fs.readdir(dirPath, { withFileTypes: true });
      const items = await Promise.all(
        entries.map(async entry => {
          const fullPath = path.join(dirPath, entry.name);
          const stats = await fs.stat(fullPath);
          const type = entry.isDirectory() ? "dir" : "file";
          return `[${type}] ${entry.name} (${stats.size} bytes)`;
        })
      );

      return {
        content: [{
          type: "text",
          text: items.join("\n")
        }]
      };
    }

    case "search_files": {
      const directory = args.directory as string;
      const pattern = args.pattern as string;

      if (!isPathAllowed(directory)) {
        throw new Error(`Access denied: ${directory}`);
      }

      const matches = await glob(pattern, {
        cwd: directory,
        nodir: false
      });

      return {
        content: [{
          type: "text",
          text: `Found ${matches.length} files:\n${matches.join("\n")}`
        }]
      };
    }

    default:
      throw new Error(`Unknown tool: ${name}`);
  }
});

// 서버 시작
async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
  console.error("Filesystem MCP Server running on stdio");
}

main().catch(console.error);

데이터베이스 서버

SQLite MCP 서버

SQLite 데이터베이스에 접근하는 MCP 서버 구현입니다. 쿼리 실행, 스키마 조회, 트랜잭션 관리 등을 지원합니다.

Python sqlite_server.py
import asyncio
import sqlite3
import json
from typing import Any, List, Dict
from pathlib import Path

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("sqlite-server")

# 데이터베이스 연결 풀
connections: Dict[str, sqlite3.Connection] = {}

def get_connection(db_path: str) -> sqlite3.Connection:
    """데이터베이스 연결 가져오기 (연결 풀 사용)"""
    if db_path not in connections:
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row  # 딕셔너리 형태로 결과 반환
        connections[db_path] = conn
    return connections[db_path]

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="query",
            description="SQL SELECT 쿼리를 실행합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "db_path": {
                        "type": "string",
                        "description": "데이터베이스 파일 경로"
                    },
                    "sql": {
                        "type": "string",
                        "description": "실행할 SELECT 쿼리"
                    },
                    "params": {
                        "type": "array",
                        "description": "쿼리 매개변수 (선택사항)",
                        "items": {"type": ["string", "number", "null"]}
                    }
                },
                "required": ["db_path", "sql"]
            }
        ),
        Tool(
            name="execute",
            description="SQL INSERT/UPDATE/DELETE 쿼리를 실행합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "db_path": {"type": "string"},
                    "sql": {"type": "string"},
                    "params": {
                        "type": "array",
                        "items": {"type": ["string", "number", "null"]}
                    }
                },
                "required": ["db_path", "sql"]
            }
        ),
        Tool(
            name="get_schema",
            description="데이터베이스 스키마를 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "db_path": {"type": "string"},
                    "table_name": {
                        "type": "string",
                        "description": "특정 테이블만 조회 (선택사항)"
                    }
                },
                "required": ["db_path"]
            }
        ),
        Tool(
            name="create_table",
            description="새 테이블을 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "db_path": {"type": "string"},
                    "table_name": {"type": "string"},
                    "columns": {
                        "type": "object",
                        "description": "컬럼명: 타입 형식"
                    }
                },
                "required": ["db_path", "table_name", "columns"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    db_path = arguments["db_path"]
    conn = get_connection(db_path)
    cursor = conn.cursor()

    if name == "query":
        sql = arguments["sql"]
        params = arguments.get("params", [])

        # SELECT만 허용
        if not sql.strip().upper().startswith("SELECT"):
            raise ValueError("Only SELECT queries are allowed in query()")

        cursor.execute(sql, params)
        rows = cursor.fetchall()

        # Row 객체를 딕셔너리로 변환
        result = [dict(row) for row in rows]

        return [TextContent(
            type="text",
            text=json.dumps(result, indent=2, ensure_ascii=False)
        )]

    elif name == "execute":
        sql = arguments["sql"]
        params = arguments.get("params", [])

        cursor.execute(sql, params)
        conn.commit()

        return [TextContent(
            type="text",
            text=f"Executed successfully. Rows affected: {cursor.rowcount}"
        )]

    elif name == "get_schema":
        table_name = arguments.get("table_name")

        if table_name:
            # 특정 테이블 스키마
            cursor.execute(f"PRAGMA table_info({table_name})")
            columns = cursor.fetchall()
            schema = {
                "table": table_name,
                "columns": [dict(col) for col in columns]
            }
        else:
            # 전체 데이터베이스 스키마
            cursor.execute("""
                SELECT name, type, sql
                FROM sqlite_master
                WHERE type IN ('table', 'view')
                ORDER BY name
            """)
            objects = cursor.fetchall()
            schema = {
                "database": db_path,
                "objects": [dict(obj) for obj in objects]
            }

        return [TextContent(
            type="text",
            text=json.dumps(schema, indent=2)
        )]

    elif name == "create_table":
        table_name = arguments["table_name"]
        columns = arguments["columns"]

        column_defs = [f"{col_name} {col_type}"
                       for col_name, col_type in columns.items()]

        sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                {', '.join(column_defs)}
            )
        """

        cursor.execute(sql)
        conn.commit()

        return [TextContent(
            type="text",
            text=f"Table '{table_name}' created successfully"
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

PostgreSQL MCP 서버

Python postgres_server.py
import asyncio
import json
from typing import Any, List, Dict, Optional

import asyncpg

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("postgres-server")

# 연결 풀
pool: Optional[asyncpg.Pool] = None

async def get_pool(dsn: str) -> asyncpg.Pool:
    """PostgreSQL 연결 풀 가져오기"""
    global pool
    if pool is None:
        pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
    return pool

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="pg_query",
            description="PostgreSQL SELECT 쿼리를 실행합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dsn": {
                        "type": "string",
                        "description": "PostgreSQL 연결 문자열"
                    },
                    "sql": {"type": "string"},
                    "params": {
                        "type": "array",
                        "items": {}
                    }
                },
                "required": ["dsn", "sql"]
            }
        ),
        Tool(
            name="pg_execute",
            description="PostgreSQL INSERT/UPDATE/DELETE를 실행합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dsn": {"type": "string"},
                    "sql": {"type": "string"},
                    "params": {"type": "array"}
                },
                "required": ["dsn", "sql"]
            }
        ),
        Tool(
            name="pg_get_tables",
            description="데이터베이스의 모든 테이블 목록을 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "dsn": {"type": "string"},
                    "schema": {
                        "type": "string",
                        "description": "스키마 이름 (기본: public)",
                        "default": "public"
                    }
                },
                "required": ["dsn"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    dsn = arguments["dsn"]
    pool = await get_pool(dsn)

    if name == "pg_query":
        sql = arguments["sql"]
        params = arguments.get("params", [])

        async with pool.acquire() as conn:
            rows = await conn.fetch(sql, *params)
            result = [dict(row) for row in rows]

            return [TextContent(
                type="text",
                text=json.dumps(result, indent=2, default=str)
            )]

    elif name == "pg_execute":
        sql = arguments["sql"]
        params = arguments.get("params", [])

        async with pool.acquire() as conn:
            status = await conn.execute(sql, *params)

            return [TextContent(
                type="text",
                text=f"Executed: {status}"
            )]

    elif name == "pg_get_tables":
        schema = arguments.get("schema", "public")

        async with pool.acquire() as conn:
            rows = await conn.fetch("""
                SELECT tablename, schemaname
                FROM pg_tables
                WHERE schemaname = 변동
                ORDER BY tablename
            """, schema)

            tables = [dict(row) for row in rows]

            return [TextContent(
                type="text",
                text=json.dumps(tables, indent=2)
            )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

데이터베이스 클라이언트 예제

Python database_client_example.py
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # SQLite 서버에 연결
    server_params = StdioServerParameters(
        command="python",
        args=["sqlite_server.py"]
    )

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            db_path = "./test.db"

            # 테이블 생성
            print("Creating table...")
            result = await session.call_tool("create_table", {
                "db_path": db_path,
                "table_name": "users",
                "columns": {
                    "id": "INTEGER PRIMARY KEY",
                    "name": "TEXT NOT NULL",
                    "email": "TEXT UNIQUE",
                    "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
                }
            })
            print(result.content[0].text)

            # 데이터 삽입
            print("\nInserting data...")
            result = await session.call_tool("execute", {
                "db_path": db_path,
                "sql": "INSERT INTO users (name, email) VALUES (?, ?)",
                "params": ["Alice", "alice@example.com"]
            })
            print(result.content[0].text)

            # 데이터 조회
            print("\nQuerying data...")
            result = await session.call_tool("query", {
                "db_path": db_path,
                "sql": "SELECT * FROM users"
            })
            print(result.content[0].text)

            # 스키마 조회
            print("\nGetting schema...")
            result = await session.call_tool("get_schema", {
                "db_path": db_path,
                "table_name": "users"
            })
            print(result.content[0].text)

if __name__ == "__main__":
    asyncio.run(main())

GitHub API 서버

GitHub API를 MCP 서버로 래핑하여 이슈, PR, 리포지토리 관리 기능을 제공합니다.

Python github_server.py
import asyncio
import os
import json
from typing import Optional

import httpx

from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent

server = Server("github-server")

# GitHub API 클라이언트
class GitHubClient:
    def __init__(self, token: str):
        self.token = token
        self.base_url = "https://api.github.com"
        self.client = httpx.AsyncClient(
            headers={
                "Authorization": f"token {token}",
                "Accept": "application/vnd.github.v3+json"
            },
            timeout=30.0
        )

    async def get(self, path: str, params: dict = None) -> dict:
        response = await self.client.get(f"{self.base_url}{path}", params=params)
        response.raise_for_status()
        return response.json()

    async def post(self, path: str, data: dict) -> dict:
        response = await self.client.post(f"{self.base_url}{path}", json=data)
        response.raise_for_status()
        return response.json()

    async def patch(self, path: str, data: dict) -> dict:
        response = await self.client.patch(f"{self.base_url}{path}", json=data)
        response.raise_for_status()
        return response.json()

# 글로벌 클라이언트 인스턴스
github_client: Optional[GitHubClient] = None

def get_github_client() -> GitHubClient:
    global github_client
    if github_client is None:
        token = os.getenv("GITHUB_TOKEN")
        if not token:
            raise ValueError("GITHUB_TOKEN environment variable not set")
        github_client = GitHubClient(token)
    return github_client

@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="list_issues",
            description="리포지토리의 이슈 목록을 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string", "description": "리포지토리 소유자"},
                    "repo": {"type": "string", "description": "리포지토리 이름"},
                    "state": {
                        "type": "string",
                        "enum": ["open", "closed", "all"],
                        "default": "open"
                    },
                    "labels": {
                        "type": "array",
                        "items": {"type": "string"}
                    }
                },
                "required": ["owner", "repo"]
            }
        ),
        Tool(
            name="create_issue",
            description="새 이슈를 생성합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string"},
                    "repo": {"type": "string"},
                    "title": {"type": "string"},
                    "body": {"type": "string"},
                    "labels": {"type": "array", "items": {"type": "string"}},
                    "assignees": {"type": "array", "items": {"type": "string"}}
                },
                "required": ["owner", "repo", "title"]
            }
        ),
        Tool(
            name="list_pull_requests",
            description="리포지토리의 Pull Request 목록을 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string"},
                    "repo": {"type": "string"},
                    "state": {"type": "string", "enum": ["open", "closed", "all"]}
                },
                "required": ["owner", "repo"]
            }
        ),
        Tool(
            name="get_repository",
            description="리포지토리 정보를 조회합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "owner": {"type": "string"},
                    "repo": {"type": "string"}
                },
                "required": ["owner", "repo"]
            }
        ),
        Tool(
            name="search_code",
            description="코드를 검색합니다.",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "검색 쿼리"},
                    "repo": {"type": "string", "description": "owner/repo 형식"}
                },
                "required": ["query"]
            }
        ),
    ]

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    client = get_github_client()

    if name == "list_issues":
        owner = arguments["owner"]
        repo = arguments["repo"]
        state = arguments.get("state", "open")
        labels = arguments.get("labels", [])

        params = {"state": state}
        if labels:
            params["labels"] = ",".join(labels)

        issues = await client.get(f"/repos/{owner}/{repo}/issues", params)

        # 간소화된 정보만 추출
        simplified = [{
            "number": issue["number"],
            "title": issue["title"],
            "state": issue["state"],
            "user": issue["user"]["login"],
            "labels": [l["name"] for l in issue["labels"]],
            "created_at": issue["created_at"],
            "url": issue["html_url"]
        } for issue in issues]

        return [TextContent(
            type="text",
            text=json.dumps(simplified, indent=2)
        )]

    elif name == "create_issue":
        owner = arguments["owner"]
        repo = arguments["repo"]

        data = {
            "title": arguments["title"],
            "body": arguments.get("body", ""),
        }

        if "labels" in arguments:
            data["labels"] = arguments["labels"]
        if "assignees" in arguments:
            data["assignees"] = arguments["assignees"]

        issue = await client.post(f"/repos/{owner}/{repo}/issues", data)

        return [TextContent(
            type="text",
            text=f"Created issue #{issue['number']}: {issue['html_url']}"
        )]

    elif name == "list_pull_requests":
        owner = arguments["owner"]
        repo = arguments["repo"]
        state = arguments.get("state", "open")

        prs = await client.get(
            f"/repos/{owner}/{repo}/pulls",
            params={"state": state}
        )

        simplified = [{
            "number": pr["number"],
            "title": pr["title"],
            "state": pr["state"],
            "user": pr["user"]["login"],
            "head": pr["head"]["ref"],
            "base": pr["base"]["ref"],
            "url": pr["html_url"]
        } for pr in prs]

        return [TextContent(
            type="text",
            text=json.dumps(simplified, indent=2)
        )]

    elif name == "get_repository":
        owner = arguments["owner"]
        repo = arguments["repo"]

        repo_data = await client.get(f"/repos/{owner}/{repo}")

        info = {
            "name": repo_data["name"],
            "full_name": repo_data["full_name"],
            "description": repo_data["description"],
            "stars": repo_data["stargazers_count"],
            "forks": repo_data["forks_count"],
            "language": repo_data["language"],
            "default_branch": repo_data["default_branch"],
            "url": repo_data["html_url"]
        }

        return [TextContent(
            type="text",
            text=json.dumps(info, indent=2)
        )]

    elif name == "search_code":
        query = arguments["query"]

        if "repo" in arguments:
            query += f" repo:{arguments['repo']}"

        results = await client.get("/search/code", params={"q": query})

        items = [{
            "name": item["name"],
            "path": item["path"],
            "repository": item["repository"]["full_name"],
            "url": item["html_url"]
        } for item in results["items"][:  10]]  # 최대 10개만

        return [TextContent(
            type="text",
            text=json.dumps({
                "total_count": results["total_count"],
                "items": items
            }, indent=2)
        )]

    raise ValueError(f"Unknown tool: {name}")

async def main():
    async with stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            server.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())

핵심 정리

  • MCP 실전 예제의 핵심 개념과 흐름을 정리합니다.
  • 파일 시스템 서버를 단계별로 이해합니다.
  • 실전 적용 시 기준과 주의점을 확인합니다.

실무 팁

  • 입력/출력 예시를 고정해 재현성을 확보하세요.
  • MCP 실전 예제 범위를 작게 잡고 단계적으로 확장하세요.
  • 파일 시스템 서버 조건을 문서화해 대응 시간을 줄이세요.