MCP 실전 예제
실무에서 바로 활용할 수 있는 완전한 MCP 서버 구현 예제를 제공합니다. 파일 시스템, 데이터베이스, API 통합, RAG 시스템 등 다양한 사용 사례를 Python과 TypeScript로 구현하는 방법을 학습하세요.
업데이트 안내: 모델/요금/버전/정책 등 시점에 민감한 정보는 변동될 수 있습니다.
최신 내용은 공식 문서를 확인하세요.
이 페이지에서 배울 내용
- 파일 시스템 서버 완전 구현 (Python/TypeScript)
- 데이터베이스 서버 (SQLite, PostgreSQL)
- GitHub API 통합 서버
- Slack 봇 MCP 서버
- RAG 시스템 구현
- 각 예제마다 서버 + 클라이언트 코드 제공
파일 시스템 서버
Python 구현
파일 시스템 MCP 서버는 가장 기본적이면서도 실용적인 예제입니다. 파일 읽기/쓰기, 디렉토리 탐색, 검색 등의 기능을 제공합니다.
Python
filesystem_server.py
import asyncio
import os
import json
from pathlib import Path
from typing import Optional, List
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
# 서버 인스턴스 생성
server = Server("filesystem-server")
# 허용된 디렉토리 (보안을 위해)
ALLOWED_DIRECTORIES = [
Path.home() / "Documents",
Path.home() / "Projects",
]
def is_path_allowed(path: Path) -> bool:
"""경로가 허용된 디렉토리 내에 있는지 확인"""
path = path.resolve()
return any(
path.is_relative_to(allowed_dir)
for allowed_dir in ALLOWED_DIRECTORIES
)
@server.list_tools()
async def list_tools() -> list[Tool]:
"""사용 가능한 도구 목록 반환"""
return [
Tool(
name="read_file",
description="파일 내용을 읽습니다. 텍스트 및 바이너리 파일 지원.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "읽을 파일의 경로"
}
},
"required": ["path"]
}
),
Tool(
name="write_file",
description="파일에 내용을 씁니다. 파일이 없으면 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "쓸 파일의 경로"
},
"content": {
"type": "string",
"description": "파일에 쓸 내용"
}
},
"required": ["path", "content"]
}
),
Tool(
name="list_directory",
description="디렉토리의 파일과 하위 디렉토리 목록을 반환합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "나열할 디렉토리 경로"
}
},
"required": ["path"]
}
),
Tool(
name="search_files",
description="파일 이름 패턴으로 파일을 검색합니다.",
inputSchema={
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "검색할 디렉토리"
},
"pattern": {
"type": "string",
"description": "검색 패턴 (예: *.py)"
}
},
"required": ["directory", "pattern"]
}
),
Tool(
name="create_directory",
description="새 디렉토리를 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "생성할 디렉토리 경로"
}
},
"required": ["path"]
}
),
Tool(
name="delete_file",
description="파일 또는 디렉토리를 삭제합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "삭제할 파일/디렉토리 경로"
}
},
"required": ["path"]
}
),
Tool(
name="get_file_info",
description="파일의 메타데이터(크기, 수정일 등)를 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "정보를 조회할 파일 경로"
}
},
"required": ["path"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""도구 실행"""
if name == "read_file":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
if not path.exists():
raise FileNotFoundError(f"File not found: {path}")
try:
content = path.read_text(encoding="utf-8")
return [TextContent(
type="text",
text=content
)]
except UnicodeDecodeError:
# 바이너리 파일인 경우
size = path.stat().st_size
return [TextContent(
type="text",
text=f"Binary file ({size} bytes)"
)]
elif name == "write_file":
path = Path(arguments["path"])
content = arguments["content"]
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return [TextContent(
type="text",
text=f"Successfully wrote {len(content)} characters to {path}"
)]
elif name == "list_directory":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
if not path.is_dir():
raise NotADirectoryError(f"Not a directory: {path}")
items = []
for item in sorted(path.iterdir()):
item_type = "dir" if item.is_dir() else "file"
size = item.stat().st_size if item.is_file() else 0
items.append(f"[{item_type}] {item.name} ({size} bytes)")
return [TextContent(
type="text",
text="\n".join(items) if items else "Empty directory"
)]
elif name == "search_files":
directory = Path(arguments["directory"])
pattern = arguments["pattern"]
if not is_path_allowed(directory):
raise ValueError(f"Access denied: {directory}")
matches = list(directory.rglob(pattern))
results = [str(match.relative_to(directory)) for match in matches]
return [TextContent(
type="text",
text=f"Found {len(results)} files:\n" + "\n".join(results)
)]
elif name == "create_directory":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
path.mkdir(parents=True, exist_ok=True)
return [TextContent(
type="text",
text=f"Created directory: {path}"
)]
elif name == "delete_file":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
if path.is_file():
path.unlink()
return [TextContent(type="text", text=f"Deleted file: {path}")]
elif path.is_dir():
import shutil
shutil.rmtree(path)
return [TextContent(type="text", text=f"Deleted directory: {path}")]
else:
raise FileNotFoundError(f"Not found: {path}")
elif name == "get_file_info":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
if not path.exists():
raise FileNotFoundError(f"Not found: {path}")
stat = path.stat()
from datetime import datetime
info = {
"name": path.name,
"path": str(path),
"type": "directory" if path.is_dir() else "file",
"size": stat.st_size,
"created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
}
return [TextContent(
type="text",
text=json.dumps(info, indent=2)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
"""서버 실행"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
클라이언트 사용 예제
TypeScript
filesystem_client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
async function main() {
// 클라이언트 생성
const client = new Client({
name: "filesystem-client",
version: "1.0.0"
}, {
capabilities: {
tools: {}
}
});
// 전송 계층 설정 (Python 서버 실행)
const transport = new StdioClientTransport({
command: "python",
args: ["filesystem_server.py"]
});
try {
// 서버 연결
await client.connect(transport);
console.log("✓ Connected to filesystem server");
// 사용 가능한 도구 목록 조회
const tools = await client.listTools();
console.log("\nAvailable tools:");
tools.tools.forEach(tool => {
console.log(` - ${tool.name}: ${tool.description}`);
});
// 예제 1: 디렉토리 나열
console.log("\n--- Listing directory ---");
const listResult = await client.callTool("list_directory", {
path: process.env.HOME + "/Documents"
});
console.log(listResult.content[0].text);
// 예제 2: 파일 쓰기
console.log("\n--- Writing file ---");
const writeResult = await client.callTool("write_file", {
path: process.env.HOME + "/Documents/test.txt",
content: "Hello from MCP!\nThis is a test file."
});
console.log(writeResult.content[0].text);
// 예제 3: 파일 읽기
console.log("\n--- Reading file ---");
const readResult = await client.callTool("read_file", {
path: process.env.HOME + "/Documents/test.txt"
});
console.log(readResult.content[0].text);
// 예제 4: 파일 검색
console.log("\n--- Searching files ---");
const searchResult = await client.callTool("search_files", {
directory: process.env.HOME + "/Documents",
pattern: "*.txt"
});
console.log(searchResult.content[0].text);
// 예제 5: 파일 정보 조회
console.log("\n--- File info ---");
const infoResult = await client.callTool("get_file_info", {
path: process.env.HOME + "/Documents/test.txt"
});
console.log(infoResult.content[0].text);
} catch (error) {
console.error("Error:", error);
} finally {
await client.close();
console.log("\n✓ Connection closed");
}
}
main();
TypeScript 서버 구현
TypeScript
filesystem-server.ts
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
Tool,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";
import { glob } from "glob";
import os from "os";
// 허용된 디렉토리
const ALLOWED_DIRECTORIES = [
path.join(os.homedir(), "Documents"),
path.join(os.homedir(), "Projects"),
];
function isPathAllowed(filePath: string): boolean {
const resolved = path.resolve(filePath);
return ALLOWED_DIRECTORIES.some(dir => resolved.startsWith(dir));
}
// 서버 생성
const server = new Server({
name: "filesystem-server",
version: "1.0.0"
}, {
capabilities: {
tools: {}
}
});
// 도구 목록 핸들러
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "read_file",
description: "파일 내용을 읽습니다",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "읽을 파일 경로"
}
},
required: ["path"]
}
},
{
name: "write_file",
description: "파일에 내용을 씁니다",
inputSchema: {
type: "object",
properties: {
path: { type: "string", description: "파일 경로" },
content: { type: "string", description: "파일 내용" }
},
required: ["path", "content"]
}
},
{
name: "list_directory",
description: "디렉토리 내용을 나열합니다",
inputSchema: {
type: "object",
properties: {
path: { type: "string", description: "디렉토리 경로" }
},
required: ["path"]
}
},
{
name: "search_files",
description: "패턴으로 파일을 검색합니다",
inputSchema: {
type: "object",
properties: {
directory: { type: "string" },
pattern: { type: "string" }
},
required: ["directory", "pattern"]
}
}
] as Tool[]
};
});
// 도구 호출 핸들러
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case "read_file": {
const filePath = args.path as string;
if (!isPathAllowed(filePath)) {
throw new Error(`Access denied: ${filePath}`);
}
const content = await fs.readFile(filePath, "utf-8");
return {
content: [{
type: "text",
text: content
}]
};
}
case "write_file": {
const filePath = args.path as string;
const content = args.content as string;
if (!isPathAllowed(filePath)) {
throw new Error(`Access denied: ${filePath}`);
}
await fs.mkdir(path.dirname(filePath), { recursive: true });
await fs.writeFile(filePath, content, "utf-8");
return {
content: [{
type: "text",
text: `Successfully wrote ${content.length} characters to ${filePath}`
}]
};
}
case "list_directory": {
const dirPath = args.path as string;
if (!isPathAllowed(dirPath)) {
throw new Error(`Access denied: ${dirPath}`);
}
const entries = await fs.readdir(dirPath, { withFileTypes: true });
const items = await Promise.all(
entries.map(async entry => {
const fullPath = path.join(dirPath, entry.name);
const stats = await fs.stat(fullPath);
const type = entry.isDirectory() ? "dir" : "file";
return `[${type}] ${entry.name} (${stats.size} bytes)`;
})
);
return {
content: [{
type: "text",
text: items.join("\n")
}]
};
}
case "search_files": {
const directory = args.directory as string;
const pattern = args.pattern as string;
if (!isPathAllowed(directory)) {
throw new Error(`Access denied: ${directory}`);
}
const matches = await glob(pattern, {
cwd: directory,
nodir: false
});
return {
content: [{
type: "text",
text: `Found ${matches.length} files:\n${matches.join("\n")}`
}]
};
}
default:
throw new Error(`Unknown tool: ${name}`);
}
});
// 서버 시작
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Filesystem MCP Server running on stdio");
}
main().catch(console.error);
데이터베이스 서버
SQLite MCP 서버
SQLite 데이터베이스에 접근하는 MCP 서버 구현입니다. 쿼리 실행, 스키마 조회, 트랜잭션 관리 등을 지원합니다.
Python
sqlite_server.py
import asyncio
import sqlite3
import json
from typing import Any, List, Dict
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("sqlite-server")
# 데이터베이스 연결 풀
connections: Dict[str, sqlite3.Connection] = {}
def get_connection(db_path: str) -> sqlite3.Connection:
"""데이터베이스 연결 가져오기 (연결 풀 사용)"""
if db_path not in connections:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row # 딕셔너리 형태로 결과 반환
connections[db_path] = conn
return connections[db_path]
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query",
description="SQL SELECT 쿼리를 실행합니다.",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "데이터베이스 파일 경로"
},
"sql": {
"type": "string",
"description": "실행할 SELECT 쿼리"
},
"params": {
"type": "array",
"description": "쿼리 매개변수 (선택사항)",
"items": {"type": ["string", "number", "null"]}
}
},
"required": ["db_path", "sql"]
}
),
Tool(
name="execute",
description="SQL INSERT/UPDATE/DELETE 쿼리를 실행합니다.",
inputSchema={
"type": "object",
"properties": {
"db_path": {"type": "string"},
"sql": {"type": "string"},
"params": {
"type": "array",
"items": {"type": ["string", "number", "null"]}
}
},
"required": ["db_path", "sql"]
}
),
Tool(
name="get_schema",
description="데이터베이스 스키마를 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"db_path": {"type": "string"},
"table_name": {
"type": "string",
"description": "특정 테이블만 조회 (선택사항)"
}
},
"required": ["db_path"]
}
),
Tool(
name="create_table",
description="새 테이블을 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"db_path": {"type": "string"},
"table_name": {"type": "string"},
"columns": {
"type": "object",
"description": "컬럼명: 타입 형식"
}
},
"required": ["db_path", "table_name", "columns"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
db_path = arguments["db_path"]
conn = get_connection(db_path)
cursor = conn.cursor()
if name == "query":
sql = arguments["sql"]
params = arguments.get("params", [])
# SELECT만 허용
if not sql.strip().upper().startswith("SELECT"):
raise ValueError("Only SELECT queries are allowed in query()")
cursor.execute(sql, params)
rows = cursor.fetchall()
# Row 객체를 딕셔너리로 변환
result = [dict(row) for row in rows]
return [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
elif name == "execute":
sql = arguments["sql"]
params = arguments.get("params", [])
cursor.execute(sql, params)
conn.commit()
return [TextContent(
type="text",
text=f"Executed successfully. Rows affected: {cursor.rowcount}"
)]
elif name == "get_schema":
table_name = arguments.get("table_name")
if table_name:
# 특정 테이블 스키마
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
schema = {
"table": table_name,
"columns": [dict(col) for col in columns]
}
else:
# 전체 데이터베이스 스키마
cursor.execute("""
SELECT name, type, sql
FROM sqlite_master
WHERE type IN ('table', 'view')
ORDER BY name
""")
objects = cursor.fetchall()
schema = {
"database": db_path,
"objects": [dict(obj) for obj in objects]
}
return [TextContent(
type="text",
text=json.dumps(schema, indent=2)
)]
elif name == "create_table":
table_name = arguments["table_name"]
columns = arguments["columns"]
column_defs = [f"{col_name} {col_type}"
for col_name, col_type in columns.items()]
sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
{', '.join(column_defs)}
)
"""
cursor.execute(sql)
conn.commit()
return [TextContent(
type="text",
text=f"Table '{table_name}' created successfully"
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
PostgreSQL MCP 서버
Python
postgres_server.py
import asyncio
import json
from typing import Any, List, Dict, Optional
import asyncpg
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("postgres-server")
# 연결 풀
pool: Optional[asyncpg.Pool] = None
async def get_pool(dsn: str) -> asyncpg.Pool:
"""PostgreSQL 연결 풀 가져오기"""
global pool
if pool is None:
pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
return pool
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="pg_query",
description="PostgreSQL SELECT 쿼리를 실행합니다.",
inputSchema={
"type": "object",
"properties": {
"dsn": {
"type": "string",
"description": "PostgreSQL 연결 문자열"
},
"sql": {"type": "string"},
"params": {
"type": "array",
"items": {}
}
},
"required": ["dsn", "sql"]
}
),
Tool(
name="pg_execute",
description="PostgreSQL INSERT/UPDATE/DELETE를 실행합니다.",
inputSchema={
"type": "object",
"properties": {
"dsn": {"type": "string"},
"sql": {"type": "string"},
"params": {"type": "array"}
},
"required": ["dsn", "sql"]
}
),
Tool(
name="pg_get_tables",
description="데이터베이스의 모든 테이블 목록을 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"dsn": {"type": "string"},
"schema": {
"type": "string",
"description": "스키마 이름 (기본: public)",
"default": "public"
}
},
"required": ["dsn"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
dsn = arguments["dsn"]
pool = await get_pool(dsn)
if name == "pg_query":
sql = arguments["sql"]
params = arguments.get("params", [])
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
result = [dict(row) for row in rows]
return [TextContent(
type="text",
text=json.dumps(result, indent=2, default=str)
)]
elif name == "pg_execute":
sql = arguments["sql"]
params = arguments.get("params", [])
async with pool.acquire() as conn:
status = await conn.execute(sql, *params)
return [TextContent(
type="text",
text=f"Executed: {status}"
)]
elif name == "pg_get_tables":
schema = arguments.get("schema", "public")
async with pool.acquire() as conn:
rows = await conn.fetch("""
SELECT tablename, schemaname
FROM pg_tables
WHERE schemaname = 변동
ORDER BY tablename
""", schema)
tables = [dict(row) for row in rows]
return [TextContent(
type="text",
text=json.dumps(tables, indent=2)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
데이터베이스 클라이언트 예제
Python
database_client_example.py
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
# SQLite 서버에 연결
server_params = StdioServerParameters(
command="python",
args=["sqlite_server.py"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
db_path = "./test.db"
# 테이블 생성
print("Creating table...")
result = await session.call_tool("create_table", {
"db_path": db_path,
"table_name": "users",
"columns": {
"id": "INTEGER PRIMARY KEY",
"name": "TEXT NOT NULL",
"email": "TEXT UNIQUE",
"created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
}
})
print(result.content[0].text)
# 데이터 삽입
print("\nInserting data...")
result = await session.call_tool("execute", {
"db_path": db_path,
"sql": "INSERT INTO users (name, email) VALUES (?, ?)",
"params": ["Alice", "alice@example.com"]
})
print(result.content[0].text)
# 데이터 조회
print("\nQuerying data...")
result = await session.call_tool("query", {
"db_path": db_path,
"sql": "SELECT * FROM users"
})
print(result.content[0].text)
# 스키마 조회
print("\nGetting schema...")
result = await session.call_tool("get_schema", {
"db_path": db_path,
"table_name": "users"
})
print(result.content[0].text)
if __name__ == "__main__":
asyncio.run(main())
GitHub API 서버
GitHub API를 MCP 서버로 래핑하여 이슈, PR, 리포지토리 관리 기능을 제공합니다.
Python
github_server.py
import asyncio
import os
import json
from typing import Optional
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("github-server")
# GitHub API 클라이언트
class GitHubClient:
def __init__(self, token: str):
self.token = token
self.base_url = "https://api.github.com"
self.client = httpx.AsyncClient(
headers={
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
},
timeout=30.0
)
async def get(self, path: str, params: dict = None) -> dict:
response = await self.client.get(f"{self.base_url}{path}", params=params)
response.raise_for_status()
return response.json()
async def post(self, path: str, data: dict) -> dict:
response = await self.client.post(f"{self.base_url}{path}", json=data)
response.raise_for_status()
return response.json()
async def patch(self, path: str, data: dict) -> dict:
response = await self.client.patch(f"{self.base_url}{path}", json=data)
response.raise_for_status()
return response.json()
# 글로벌 클라이언트 인스턴스
github_client: Optional[GitHubClient] = None
def get_github_client() -> GitHubClient:
global github_client
if github_client is None:
token = os.getenv("GITHUB_TOKEN")
if not token:
raise ValueError("GITHUB_TOKEN environment variable not set")
github_client = GitHubClient(token)
return github_client
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="list_issues",
description="리포지토리의 이슈 목록을 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string", "description": "리포지토리 소유자"},
"repo": {"type": "string", "description": "리포지토리 이름"},
"state": {
"type": "string",
"enum": ["open", "closed", "all"],
"default": "open"
},
"labels": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["owner", "repo"]
}
),
Tool(
name="create_issue",
description="새 이슈를 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"},
"title": {"type": "string"},
"body": {"type": "string"},
"labels": {"type": "array", "items": {"type": "string"}},
"assignees": {"type": "array", "items": {"type": "string"}}
},
"required": ["owner", "repo", "title"]
}
),
Tool(
name="list_pull_requests",
description="리포지토리의 Pull Request 목록을 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"},
"state": {"type": "string", "enum": ["open", "closed", "all"]}
},
"required": ["owner", "repo"]
}
),
Tool(
name="get_repository",
description="리포지토리 정보를 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"}
},
"required": ["owner", "repo"]
}
),
Tool(
name="search_code",
description="코드를 검색합니다.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "검색 쿼리"},
"repo": {"type": "string", "description": "owner/repo 형식"}
},
"required": ["query"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
client = get_github_client()
if name == "list_issues":
owner = arguments["owner"]
repo = arguments["repo"]
state = arguments.get("state", "open")
labels = arguments.get("labels", [])
params = {"state": state}
if labels:
params["labels"] = ",".join(labels)
issues = await client.get(f"/repos/{owner}/{repo}/issues", params)
# 간소화된 정보만 추출
simplified = [{
"number": issue["number"],
"title": issue["title"],
"state": issue["state"],
"user": issue["user"]["login"],
"labels": [l["name"] for l in issue["labels"]],
"created_at": issue["created_at"],
"url": issue["html_url"]
} for issue in issues]
return [TextContent(
type="text",
text=json.dumps(simplified, indent=2)
)]
elif name == "create_issue":
owner = arguments["owner"]
repo = arguments["repo"]
data = {
"title": arguments["title"],
"body": arguments.get("body", ""),
}
if "labels" in arguments:
data["labels"] = arguments["labels"]
if "assignees" in arguments:
data["assignees"] = arguments["assignees"]
issue = await client.post(f"/repos/{owner}/{repo}/issues", data)
return [TextContent(
type="text",
text=f"Created issue #{issue['number']}: {issue['html_url']}"
)]
elif name == "list_pull_requests":
owner = arguments["owner"]
repo = arguments["repo"]
state = arguments.get("state", "open")
prs = await client.get(
f"/repos/{owner}/{repo}/pulls",
params={"state": state}
)
simplified = [{
"number": pr["number"],
"title": pr["title"],
"state": pr["state"],
"user": pr["user"]["login"],
"head": pr["head"]["ref"],
"base": pr["base"]["ref"],
"url": pr["html_url"]
} for pr in prs]
return [TextContent(
type="text",
text=json.dumps(simplified, indent=2)
)]
elif name == "get_repository":
owner = arguments["owner"]
repo = arguments["repo"]
repo_data = await client.get(f"/repos/{owner}/{repo}")
info = {
"name": repo_data["name"],
"full_name": repo_data["full_name"],
"description": repo_data["description"],
"stars": repo_data["stargazers_count"],
"forks": repo_data["forks_count"],
"language": repo_data["language"],
"default_branch": repo_data["default_branch"],
"url": repo_data["html_url"]
}
return [TextContent(
type="text",
text=json.dumps(info, indent=2)
)]
elif name == "search_code":
query = arguments["query"]
if "repo" in arguments:
query += f" repo:{arguments['repo']}"
results = await client.get("/search/code", params={"q": query})
items = [{
"name": item["name"],
"path": item["path"],
"repository": item["repository"]["full_name"],
"url": item["html_url"]
} for item in results["items"][: 10]] # 최대 10개만
return [TextContent(
type="text",
text=json.dumps({
"total_count": results["total_count"],
"items": items
}, indent=2)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
핵심 정리
- MCP 실전 예제의 핵심 개념과 흐름을 정리합니다.
- 파일 시스템 서버를 단계별로 이해합니다.
- 실전 적용 시 기준과 주의점을 확인합니다.
실무 팁
- 입력/출력 예시를 고정해 재현성을 확보하세요.
- MCP 실전 예제 범위를 작게 잡고 단계적으로 확장하세요.
- 파일 시스템 서버 조건을 문서화해 대응 시간을 줄이세요.