MCP 실전 예제
실무에서 바로 활용할 수 있는 완전한 MCP 서버 구현 예제를 제공합니다. 파일 시스템, 데이터베이스, API 통합, RAG 시스템 등 다양한 사용 사례를 Python과 TypeScript로 구현하는 방법을 학습하세요.
- 파일 시스템 서버 완전 구현 (Python/TypeScript)
- 데이터베이스 서버 (SQLite, PostgreSQL)
- GitHub API 통합 서버
- Slack 봇 MCP 서버
- RAG 시스템 구현
- 각 예제마다 서버 + 클라이언트 코드 제공
파일 시스템 서버
Python 구현
파일 시스템 MCP 서버는 가장 기본적이면서도 실용적인 예제입니다. 파일 읽기/쓰기, 디렉토리 탐색, 검색 등의 기능을 제공합니다.
import asyncio
import os
import json
from pathlib import Path
from typing import Optional, List
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import (
Tool,
TextContent,
ImageContent,
EmbeddedResource,
)
# 서버 인스턴스 생성
server = Server("filesystem-server")
# 허용된 디렉토리 (보안을 위해)
ALLOWED_DIRECTORIES = [
Path.home() / "Documents",
Path.home() / "Projects",
]
def is_path_allowed(path: Path) -> bool:
"""경로가 허용된 디렉토리 내에 있는지 확인"""
path = path.resolve()
return any(
path.is_relative_to(allowed_dir)
for allowed_dir in ALLOWED_DIRECTORIES
)
@server.list_tools()
async def list_tools() -> list[Tool]:
"""사용 가능한 도구 목록 반환"""
return [
Tool(
name="read_file",
description="파일 내용을 읽습니다. 텍스트 및 바이너리 파일 지원.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "읽을 파일의 경로"
}
},
"required": ["path"]
}
),
Tool(
name="write_file",
description="파일에 내용을 씁니다. 파일이 없으면 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "쓸 파일의 경로"
},
"content": {
"type": "string",
"description": "파일에 쓸 내용"
}
},
"required": ["path", "content"]
}
),
Tool(
name="list_directory",
description="디렉토리의 파일과 하위 디렉토리 목록을 반환합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "나열할 디렉토리 경로"
}
},
"required": ["path"]
}
),
Tool(
name="search_files",
description="파일 이름 패턴으로 파일을 검색합니다.",
inputSchema={
"type": "object",
"properties": {
"directory": {
"type": "string",
"description": "검색할 디렉토리"
},
"pattern": {
"type": "string",
"description": "검색 패턴 (예: *.py)"
}
},
"required": ["directory", "pattern"]
}
),
Tool(
name="create_directory",
description="새 디렉토리를 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "생성할 디렉토리 경로"
}
},
"required": ["path"]
}
),
Tool(
name="delete_file",
description="파일 또는 디렉토리를 삭제합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "삭제할 파일/디렉토리 경로"
}
},
"required": ["path"]
}
),
Tool(
name="get_file_info",
description="파일의 메타데이터(크기, 수정일 등)를 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "정보를 조회할 파일 경로"
}
},
"required": ["path"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
"""도구 실행"""
if name == "read_file":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
if not path.exists():
raise FileNotFoundError(f"File not found: {path}")
try:
content = path.read_text(encoding="utf-8")
return [TextContent(
type="text",
text=content
)]
except UnicodeDecodeError:
# 바이너리 파일인 경우
size = path.stat().st_size
return [TextContent(
type="text",
text=f"Binary file ({size} bytes)"
)]
elif name == "write_file":
path = Path(arguments["path"])
content = arguments["content"]
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
return [TextContent(
type="text",
text=f"Successfully wrote {len(content)} characters to {path}"
)]
elif name == "list_directory":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
if not path.is_dir():
raise NotADirectoryError(f"Not a directory: {path}")
items = []
for item in sorted(path.iterdir()):
item_type = "dir" if item.is_dir() else "file"
size = item.stat().st_size if item.is_file() else 0
items.append(f"[{item_type}] {item.name} ({size} bytes)")
return [TextContent(
type="text",
text="\n".join(items) if items else "Empty directory"
)]
elif name == "search_files":
directory = Path(arguments["directory"])
pattern = arguments["pattern"]
if not is_path_allowed(directory):
raise ValueError(f"Access denied: {directory}")
matches = list(directory.rglob(pattern))
results = [str(match.relative_to(directory)) for match in matches]
return [TextContent(
type="text",
text=f"Found {len(results)} files:\n" + "\n".join(results)
)]
elif name == "create_directory":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
path.mkdir(parents=True, exist_ok=True)
return [TextContent(
type="text",
text=f"Created directory: {path}"
)]
elif name == "delete_file":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
if path.is_file():
path.unlink()
return [TextContent(type="text", text=f"Deleted file: {path}")]
elif path.is_dir():
import shutil
shutil.rmtree(path)
return [TextContent(type="text", text=f"Deleted directory: {path}")]
else:
raise FileNotFoundError(f"Not found: {path}")
elif name == "get_file_info":
path = Path(arguments["path"])
if not is_path_allowed(path):
raise ValueError(f"Access denied: {path}")
if not path.exists():
raise FileNotFoundError(f"Not found: {path}")
stat = path.stat()
from datetime import datetime
info = {
"name": path.name,
"path": str(path),
"type": "directory" if path.is_dir() else "file",
"size": stat.st_size,
"created": datetime.fromtimestamp(stat.st_ctime).isoformat(),
"modified": datetime.fromtimestamp(stat.st_mtime).isoformat(),
}
return [TextContent(
type="text",
text=json.dumps(info, indent=2)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
"""서버 실행"""
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
클라이언트 사용 예제
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
async function main() {
// 클라이언트 생성
const client = new Client({
name: "filesystem-client",
version: "1.0.0"
}, {
capabilities: {
tools: {}
}
});
// 전송 계층 설정 (Python 서버 실행)
const transport = new StdioClientTransport({
command: "python",
args: ["filesystem_server.py"]
});
try {
// 서버 연결
await client.connect(transport);
console.log("✓ Connected to filesystem server");
// 사용 가능한 도구 목록 조회
const tools = await client.listTools();
console.log("\nAvailable tools:");
tools.tools.forEach(tool => {
console.log(` - ${tool.name}: ${tool.description}`);
});
// 예제 1: 디렉토리 나열
console.log("\n--- Listing directory ---");
const listResult = await client.callTool("list_directory", {
path: process.env.HOME + "/Documents"
});
console.log(listResult.content[0].text);
// 예제 2: 파일 쓰기
console.log("\n--- Writing file ---");
const writeResult = await client.callTool("write_file", {
path: process.env.HOME + "/Documents/test.txt",
content: "Hello from MCP!\nThis is a test file."
});
console.log(writeResult.content[0].text);
// 예제 3: 파일 읽기
console.log("\n--- Reading file ---");
const readResult = await client.callTool("read_file", {
path: process.env.HOME + "/Documents/test.txt"
});
console.log(readResult.content[0].text);
// 예제 4: 파일 검색
console.log("\n--- Searching files ---");
const searchResult = await client.callTool("search_files", {
directory: process.env.HOME + "/Documents",
pattern: "*.txt"
});
console.log(searchResult.content[0].text);
// 예제 5: 파일 정보 조회
console.log("\n--- File info ---");
const infoResult = await client.callTool("get_file_info", {
path: process.env.HOME + "/Documents/test.txt"
});
console.log(infoResult.content[0].text);
} catch (error) {
console.error("Error:", error);
} finally {
await client.close();
console.log("\n✓ Connection closed");
}
}
main();
TypeScript 서버 구현
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListToolsRequestSchema,
Tool,
} from "@modelcontextprotocol/sdk/types.js";
import * as fs from "fs/promises";
import * as path from "path";
import { glob } from "glob";
import os from "os";
// 허용된 디렉토리
const ALLOWED_DIRECTORIES = [
path.join(os.homedir(), "Documents"),
path.join(os.homedir(), "Projects"),
];
function isPathAllowed(filePath: string): boolean {
const resolved = path.resolve(filePath);
return ALLOWED_DIRECTORIES.some(dir => resolved.startsWith(dir));
}
// 서버 생성
const server = new Server({
name: "filesystem-server",
version: "1.0.0"
}, {
capabilities: {
tools: {}
}
});
// 도구 목록 핸들러
server.setRequestHandler(ListToolsRequestSchema, async () => {
return {
tools: [
{
name: "read_file",
description: "파일 내용을 읽습니다",
inputSchema: {
type: "object",
properties: {
path: {
type: "string",
description: "읽을 파일 경로"
}
},
required: ["path"]
}
},
{
name: "write_file",
description: "파일에 내용을 씁니다",
inputSchema: {
type: "object",
properties: {
path: { type: "string", description: "파일 경로" },
content: { type: "string", description: "파일 내용" }
},
required: ["path", "content"]
}
},
{
name: "list_directory",
description: "디렉토리 내용을 나열합니다",
inputSchema: {
type: "object",
properties: {
path: { type: "string", description: "디렉토리 경로" }
},
required: ["path"]
}
},
{
name: "search_files",
description: "패턴으로 파일을 검색합니다",
inputSchema: {
type: "object",
properties: {
directory: { type: "string" },
pattern: { type: "string" }
},
required: ["directory", "pattern"]
}
}
] as Tool[]
};
});
// 도구 호출 핸들러
server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
switch (name) {
case "read_file": {
const filePath = args.path as string;
if (!isPathAllowed(filePath)) {
throw new Error(`Access denied: ${filePath}`);
}
const content = await fs.readFile(filePath, "utf-8");
return {
content: [{
type: "text",
text: content
}]
};
}
case "write_file": {
const filePath = args.path as string;
const content = args.content as string;
if (!isPathAllowed(filePath)) {
throw new Error(`Access denied: ${filePath}`);
}
await fs.mkdir(path.dirname(filePath), { recursive: true });
await fs.writeFile(filePath, content, "utf-8");
return {
content: [{
type: "text",
text: `Successfully wrote ${content.length} characters to ${filePath}`
}]
};
}
case "list_directory": {
const dirPath = args.path as string;
if (!isPathAllowed(dirPath)) {
throw new Error(`Access denied: ${dirPath}`);
}
const entries = await fs.readdir(dirPath, { withFileTypes: true });
const items = await Promise.all(
entries.map(async entry => {
const fullPath = path.join(dirPath, entry.name);
const stats = await fs.stat(fullPath);
const type = entry.isDirectory() ? "dir" : "file";
return `[${type}] ${entry.name} (${stats.size} bytes)`;
})
);
return {
content: [{
type: "text",
text: items.join("\n")
}]
};
}
case "search_files": {
const directory = args.directory as string;
const pattern = args.pattern as string;
if (!isPathAllowed(directory)) {
throw new Error(`Access denied: ${directory}`);
}
const matches = await glob(pattern, {
cwd: directory,
nodir: false
});
return {
content: [{
type: "text",
text: `Found ${matches.length} files:\n${matches.join("\n")}`
}]
};
}
default:
throw new Error(`Unknown tool: ${name}`);
}
});
// 서버 시작
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
console.error("Filesystem MCP Server running on stdio");
}
main().catch(console.error);
데이터베이스 서버
SQLite MCP 서버
SQLite 데이터베이스에 접근하는 MCP 서버 구현입니다. 쿼리 실행, 스키마 조회, 트랜잭션 관리 등을 지원합니다.
import asyncio
import sqlite3
import json
from typing import Any, List, Dict
from pathlib import Path
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("sqlite-server")
# 데이터베이스 연결 풀
connections: Dict[str, sqlite3.Connection] = {}
def get_connection(db_path: str) -> sqlite3.Connection:
"""데이터베이스 연결 가져오기 (연결 풀 사용)"""
if db_path not in connections:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row # 딕셔너리 형태로 결과 반환
connections[db_path] = conn
return connections[db_path]
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="query",
description="SQL SELECT 쿼리를 실행합니다.",
inputSchema={
"type": "object",
"properties": {
"db_path": {
"type": "string",
"description": "데이터베이스 파일 경로"
},
"sql": {
"type": "string",
"description": "실행할 SELECT 쿼리"
},
"params": {
"type": "array",
"description": "쿼리 매개변수 (선택사항)",
"items": {"type": ["string", "number", "null"]}
}
},
"required": ["db_path", "sql"]
}
),
Tool(
name="execute",
description="SQL INSERT/UPDATE/DELETE 쿼리를 실행합니다.",
inputSchema={
"type": "object",
"properties": {
"db_path": {"type": "string"},
"sql": {"type": "string"},
"params": {
"type": "array",
"items": {"type": ["string", "number", "null"]}
}
},
"required": ["db_path", "sql"]
}
),
Tool(
name="get_schema",
description="데이터베이스 스키마를 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"db_path": {"type": "string"},
"table_name": {
"type": "string",
"description": "특정 테이블만 조회 (선택사항)"
}
},
"required": ["db_path"]
}
),
Tool(
name="create_table",
description="새 테이블을 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"db_path": {"type": "string"},
"table_name": {"type": "string"},
"columns": {
"type": "object",
"description": "컬럼명: 타입 형식"
}
},
"required": ["db_path", "table_name", "columns"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
db_path = arguments["db_path"]
conn = get_connection(db_path)
cursor = conn.cursor()
if name == "query":
sql = arguments["sql"]
params = arguments.get("params", [])
# SELECT만 허용
if not sql.strip().upper().startswith("SELECT"):
raise ValueError("Only SELECT queries are allowed in query()")
cursor.execute(sql, params)
rows = cursor.fetchall()
# Row 객체를 딕셔너리로 변환
result = [dict(row) for row in rows]
return [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
elif name == "execute":
sql = arguments["sql"]
params = arguments.get("params", [])
cursor.execute(sql, params)
conn.commit()
return [TextContent(
type="text",
text=f"Executed successfully. Rows affected: {cursor.rowcount}"
)]
elif name == "get_schema":
table_name = arguments.get("table_name")
if table_name:
# 특정 테이블 스키마
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
schema = {
"table": table_name,
"columns": [dict(col) for col in columns]
}
else:
# 전체 데이터베이스 스키마
cursor.execute("""
SELECT name, type, sql
FROM sqlite_master
WHERE type IN ('table', 'view')
ORDER BY name
""")
objects = cursor.fetchall()
schema = {
"database": db_path,
"objects": [dict(obj) for obj in objects]
}
return [TextContent(
type="text",
text=json.dumps(schema, indent=2)
)]
elif name == "create_table":
table_name = arguments["table_name"]
columns = arguments["columns"]
column_defs = [f"{col_name} {col_type}"
for col_name, col_type in columns.items()]
sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
{', '.join(column_defs)}
)
"""
cursor.execute(sql)
conn.commit()
return [TextContent(
type="text",
text=f"Table '{table_name}' created successfully"
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
PostgreSQL MCP 서버
import asyncio
import json
from typing import Any, List, Dict, Optional
import asyncpg
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("postgres-server")
# 연결 풀
pool: Optional[asyncpg.Pool] = None
async def get_pool(dsn: str) -> asyncpg.Pool:
"""PostgreSQL 연결 풀 가져오기"""
global pool
if pool is None:
pool = await asyncpg.create_pool(dsn, min_size=2, max_size=10)
return pool
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="pg_query",
description="PostgreSQL SELECT 쿼리를 실행합니다.",
inputSchema={
"type": "object",
"properties": {
"dsn": {
"type": "string",
"description": "PostgreSQL 연결 문자열"
},
"sql": {"type": "string"},
"params": {
"type": "array",
"items": {}
}
},
"required": ["dsn", "sql"]
}
),
Tool(
name="pg_execute",
description="PostgreSQL INSERT/UPDATE/DELETE를 실행합니다.",
inputSchema={
"type": "object",
"properties": {
"dsn": {"type": "string"},
"sql": {"type": "string"},
"params": {"type": "array"}
},
"required": ["dsn", "sql"]
}
),
Tool(
name="pg_get_tables",
description="데이터베이스의 모든 테이블 목록을 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"dsn": {"type": "string"},
"schema": {
"type": "string",
"description": "스키마 이름 (기본: public)",
"default": "public"
}
},
"required": ["dsn"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
dsn = arguments["dsn"]
pool = await get_pool(dsn)
if name == "pg_query":
sql = arguments["sql"]
params = arguments.get("params", [])
async with pool.acquire() as conn:
rows = await conn.fetch(sql, *params)
result = [dict(row) for row in rows]
return [TextContent(
type="text",
text=json.dumps(result, indent=2, default=str)
)]
elif name == "pg_execute":
sql = arguments["sql"]
params = arguments.get("params", [])
async with pool.acquire() as conn:
status = await conn.execute(sql, *params)
return [TextContent(
type="text",
text=f"Executed: {status}"
)]
elif name == "pg_get_tables":
schema = arguments.get("schema", "public")
async with pool.acquire() as conn:
rows = await conn.fetch("""
SELECT tablename, schemaname
FROM pg_tables
WHERE schemaname = 변동
ORDER BY tablename
""", schema)
tables = [dict(row) for row in rows]
return [TextContent(
type="text",
text=json.dumps(tables, indent=2)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
데이터베이스 클라이언트 예제
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def main():
# SQLite 서버에 연결
server_params = StdioServerParameters(
command="python",
args=["sqlite_server.py"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
db_path = "./test.db"
# 테이블 생성
print("Creating table...")
result = await session.call_tool("create_table", {
"db_path": db_path,
"table_name": "users",
"columns": {
"id": "INTEGER PRIMARY KEY",
"name": "TEXT NOT NULL",
"email": "TEXT UNIQUE",
"created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
}
})
print(result.content[0].text)
# 데이터 삽입
print("\nInserting data...")
result = await session.call_tool("execute", {
"db_path": db_path,
"sql": "INSERT INTO users (name, email) VALUES (?, ?)",
"params": ["Alice", "alice@example.com"]
})
print(result.content[0].text)
# 데이터 조회
print("\nQuerying data...")
result = await session.call_tool("query", {
"db_path": db_path,
"sql": "SELECT * FROM users"
})
print(result.content[0].text)
# 스키마 조회
print("\nGetting schema...")
result = await session.call_tool("get_schema", {
"db_path": db_path,
"table_name": "users"
})
print(result.content[0].text)
if __name__ == "__main__":
asyncio.run(main())
GitHub API 서버
GitHub API를 MCP 서버로 래핑하여 이슈, PR, 리포지토리 관리 기능을 제공합니다.
import asyncio
import os
import json
from typing import Optional
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("github-server")
# GitHub API 클라이언트
class GitHubClient:
def __init__(self, token: str):
self.token = token
self.base_url = "https://api.github.com"
self.client = httpx.AsyncClient(
headers={
"Authorization": f"token {token}",
"Accept": "application/vnd.github.v3+json"
},
timeout=30.0
)
async def get(self, path: str, params: dict = None) -> dict:
response = await self.client.get(f"{self.base_url}{path}", params=params)
response.raise_for_status()
return response.json()
async def post(self, path: str, data: dict) -> dict:
response = await self.client.post(f"{self.base_url}{path}", json=data)
response.raise_for_status()
return response.json()
async def patch(self, path: str, data: dict) -> dict:
response = await self.client.patch(f"{self.base_url}{path}", json=data)
response.raise_for_status()
return response.json()
# 글로벌 클라이언트 인스턴스
github_client: Optional[GitHubClient] = None
def get_github_client() -> GitHubClient:
global github_client
if github_client is None:
token = os.getenv("GITHUB_TOKEN")
if not token:
raise ValueError("GITHUB_TOKEN environment variable not set")
github_client = GitHubClient(token)
return github_client
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="list_issues",
description="리포지토리의 이슈 목록을 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string", "description": "리포지토리 소유자"},
"repo": {"type": "string", "description": "리포지토리 이름"},
"state": {
"type": "string",
"enum": ["open", "closed", "all"],
"default": "open"
},
"labels": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["owner", "repo"]
}
),
Tool(
name="create_issue",
description="새 이슈를 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"},
"title": {"type": "string"},
"body": {"type": "string"},
"labels": {"type": "array", "items": {"type": "string"}},
"assignees": {"type": "array", "items": {"type": "string"}}
},
"required": ["owner", "repo", "title"]
}
),
Tool(
name="list_pull_requests",
description="리포지토리의 Pull Request 목록을 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"},
"state": {"type": "string", "enum": ["open", "closed", "all"]}
},
"required": ["owner", "repo"]
}
),
Tool(
name="get_repository",
description="리포지토리 정보를 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"owner": {"type": "string"},
"repo": {"type": "string"}
},
"required": ["owner", "repo"]
}
),
Tool(
name="search_code",
description="코드를 검색합니다.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "검색 쿼리"},
"repo": {"type": "string", "description": "owner/repo 형식"}
},
"required": ["query"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
client = get_github_client()
if name == "list_issues":
owner = arguments["owner"]
repo = arguments["repo"]
state = arguments.get("state", "open")
labels = arguments.get("labels", [])
params = {"state": state}
if labels:
params["labels"] = ",".join(labels)
issues = await client.get(f"/repos/{owner}/{repo}/issues", params)
# 간소화된 정보만 추출
simplified = [{
"number": issue["number"],
"title": issue["title"],
"state": issue["state"],
"user": issue["user"]["login"],
"labels": [l["name"] for l in issue["labels"]],
"created_at": issue["created_at"],
"url": issue["html_url"]
} for issue in issues]
return [TextContent(
type="text",
text=json.dumps(simplified, indent=2)
)]
elif name == "create_issue":
owner = arguments["owner"]
repo = arguments["repo"]
data = {
"title": arguments["title"],
"body": arguments.get("body", ""),
}
if "labels" in arguments:
data["labels"] = arguments["labels"]
if "assignees" in arguments:
data["assignees"] = arguments["assignees"]
issue = await client.post(f"/repos/{owner}/{repo}/issues", data)
return [TextContent(
type="text",
text=f"Created issue #{issue['number']}: {issue['html_url']}"
)]
elif name == "list_pull_requests":
owner = arguments["owner"]
repo = arguments["repo"]
state = arguments.get("state", "open")
prs = await client.get(
f"/repos/{owner}/{repo}/pulls",
params={"state": state}
)
simplified = [{
"number": pr["number"],
"title": pr["title"],
"state": pr["state"],
"user": pr["user"]["login"],
"head": pr["head"]["ref"],
"base": pr["base"]["ref"],
"url": pr["html_url"]
} for pr in prs]
return [TextContent(
type="text",
text=json.dumps(simplified, indent=2)
)]
elif name == "get_repository":
owner = arguments["owner"]
repo = arguments["repo"]
repo_data = await client.get(f"/repos/{owner}/{repo}")
info = {
"name": repo_data["name"],
"full_name": repo_data["full_name"],
"description": repo_data["description"],
"stars": repo_data["stargazers_count"],
"forks": repo_data["forks_count"],
"language": repo_data["language"],
"default_branch": repo_data["default_branch"],
"url": repo_data["html_url"]
}
return [TextContent(
type="text",
text=json.dumps(info, indent=2)
)]
elif name == "search_code":
query = arguments["query"]
if "repo" in arguments:
query += f" repo:{arguments['repo']}"
results = await client.get("/search/code", params={"q": query})
items = [{
"name": item["name"],
"path": item["path"],
"repository": item["repository"]["full_name"],
"url": item["html_url"]
} for item in results["items"][: 10]] # 최대 10개만
return [TextContent(
type="text",
text=json.dumps({
"total_count": results["total_count"],
"items": items
}, indent=2)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
MCP 데이터 흐름 다이어그램
MCP 서버는 다양한 데이터 소스(파일시스템, 데이터베이스, 외부 API)에서 데이터를 가져와 LLM 클라이언트에 전달합니다. 아래 다이어그램은 각 서버 유형별 데이터 파이프라인의 전체 흐름을 시각화합니다.
Slack Bot MCP 서버
Slack Bot Token을 기반으로 채널 목록 조회, 메시지 전송, 메시지 검색 기능을 제공하는 MCP 서버입니다. httpx 비동기 클라이언트를 사용하여 Slack Web API와 통신합니다.
xoxb-...)을 발급받아야 합니다. 필요한 OAuth 스코프: channels:read, chat:write, search:read, groups:read
import asyncio
import os
import json
from typing import Optional
import httpx
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("slack-server")
# Slack API 클라이언트
class SlackClient:
def __init__(self, token: str):
self.token = token
self.base_url = "https://slack.com/api"
self.client = httpx.AsyncClient(
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json; charset=utf-8",
},
timeout=30.0
)
async def api_call(
self, method: str, params: dict = None
) -> dict:
"""Slack Web API 호출"""
url = f"{self.base_url}/{method}"
response = await self.client.post(url, json=params or {})
response.raise_for_status()
data = response.json()
if not data.get("ok"):
raise ValueError(
f"Slack API error: {data.get('error', 'unknown')}"
)
return data
# 글로벌 클라이언트
slack_client: Optional[SlackClient] = None
def get_slack_client() -> SlackClient:
global slack_client
if slack_client is None:
token = os.getenv("SLACK_BOT_TOKEN")
if not token:
raise ValueError(
"SLACK_BOT_TOKEN 환경변수가 설정되지 않았습니다"
)
slack_client = SlackClient(token)
return slack_client
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="list_channels",
description="Slack 워크스페이스의 채널 목록을 조회합니다.",
inputSchema={
"type": "object",
"properties": {
"limit": {
"type": "integer",
"description": "조회할 채널 수 (기본: 100)",
"default": 100
},
"types": {
"type": "string",
"description": "채널 유형 (public_channel, private_channel)",
"default": "public_channel"
}
}
}
),
Tool(
name="send_message",
description="Slack 채널에 메시지를 전송합니다.",
inputSchema={
"type": "object",
"properties": {
"channel": {
"type": "string",
"description": "채널 ID 또는 이름 (예: C01234567)"
},
"text": {
"type": "string",
"description": "전송할 메시지 텍스트"
},
"thread_ts": {
"type": "string",
"description": "스레드 타임스탬프 (스레드 답장 시)"
}
},
"required": ["channel", "text"]
}
),
Tool(
name="search_messages",
description="Slack 메시지를 검색합니다.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "검색 쿼리"
},
"count": {
"type": "integer",
"description": "결과 수 (기본: 20)",
"default": 20
},
"sort": {
"type": "string",
"enum": ["score", "timestamp"],
"default": "score"
}
},
"required": ["query"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
client = get_slack_client()
if name == "list_channels":
limit = arguments.get("limit", 100)
types = arguments.get("types", "public_channel")
data = await client.api_call("conversations.list", {
"limit": limit,
"types": types
})
channels = [{
"id": ch["id"],
"name": ch["name"],
"topic": ch.get("topic", {}).get("value", ""),
"num_members": ch.get("num_members", 0),
"is_archived": ch.get("is_archived", False)
} for ch in data["channels"]]
return [TextContent(
type="text",
text=json.dumps(channels, indent=2, ensure_ascii=False)
)]
elif name == "send_message":
channel = arguments["channel"]
text = arguments["text"]
thread_ts = arguments.get("thread_ts")
params = {"channel": channel, "text": text}
if thread_ts:
params["thread_ts"] = thread_ts
data = await client.api_call("chat.postMessage", params)
return [TextContent(
type="text",
text=json.dumps({
"ok": True,
"channel": data["channel"],
"ts": data["ts"],
"message": data["message"]["text"]
}, indent=2)
)]
elif name == "search_messages":
query = arguments["query"]
count = arguments.get("count", 20)
sort = arguments.get("sort", "score")
data = await client.api_call("search.messages", {
"query": query,
"count": count,
"sort": sort
})
matches = data.get("messages", {}).get("matches", [])
results = [{
"text": m["text"][:200],
"user": m.get("username", "unknown"),
"channel": m.get("channel", {}).get("name", ""),
"ts": m["ts"],
"permalink": m.get("permalink", "")
} for m in matches[:10]]
return [TextContent(
type="text",
text=json.dumps({
"total": data.get("messages", {}).get("total", 0),
"results": results
}, indent=2, ensure_ascii=False)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
Claude Desktop 설정
{
"mcpServers": {
"slack": {
"command": "python",
"args": ["slack_server.py"],
"env": {
"SLACK_BOT_TOKEN": "xoxb-your-bot-token-here"
}
}
}
}
RAG 시스템 MCP 서버
벡터 검색 기반 RAG(Retrieval-Augmented Generation) 서버입니다. chromadb를 사용하여 문서를 임베딩하고 저장하며, 유사도 검색을 통해 관련 문서를 검색한 뒤 LLM에 컨텍스트를 제공합니다.
pip install chromadb mcp 명령으로 필요한 패키지를 설치하세요. ChromaDB는 기본 임베딩 모델(all-MiniLM-L6-v2)을 자동으로 다운로드합니다.
import asyncio
import json
import uuid
from typing import List, Optional
import chromadb
from chromadb.config import Settings
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("rag-server")
# ChromaDB 클라이언트 초기화 (영속 저장소)
chroma_client = chromadb.Client(Settings(
chroma_db_impl="duckdb+parquet",
persist_directory="./chroma_data",
anonymized_telemetry=False
))
def get_or_create_collection(name: str = "documents"):
"""컬렉션 가져오기 또는 생성"""
return chroma_client.get_or_create_collection(
name=name,
metadata={"hnsw:space": "cosine"}
)
def chunk_text(
text: str,
chunk_size: int = 500,
overlap: int = 50
) -> List[str]:
"""텍스트를 청크 단위로 분할 (오버랩 포함)"""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunk = text[start:end]
# 문장 경계에서 자르기 시도
if end < len(text):
# 마지막 마침표/줄바꿈 위치 탐색
last_period = chunk.rfind(".")
last_newline = chunk.rfind("\n")
split_pos = max(last_period, last_newline)
if split_pos > chunk_size * 0.3:
chunk = chunk[:split_pos + 1]
end = start + split_pos + 1
chunks.append(chunk.strip())
start = end - overlap
return [c for c in chunks if c] # 빈 청크 제거
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="add_document",
description="문서를 청크로 분할하여 벡터 DB에 추가합니다.",
inputSchema={
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "문서 내용"
},
"metadata": {
"type": "object",
"description": "문서 메타데이터 (title, source 등)"
},
"collection": {
"type": "string",
"description": "컬렉션 이름 (기본: documents)",
"default": "documents"
},
"chunk_size": {
"type": "integer",
"description": "청크 크기 (기본: 500자)",
"default": 500
}
},
"required": ["content"]
}
),
Tool(
name="search",
description="벡터 유사도 검색으로 관련 문서 청크를 찾습니다.",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "검색 쿼리"
},
"n_results": {
"type": "integer",
"description": "반환할 결과 수 (기본: 5)",
"default": 5
},
"collection": {
"type": "string",
"default": "documents"
}
},
"required": ["query"]
}
),
Tool(
name="ask",
description="검색 결과를 컨텍스트로 포함하여 질문에 답변할 프롬프트를 생성합니다.",
inputSchema={
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "사용자 질문"
},
"n_results": {
"type": "integer",
"default": 5
},
"collection": {
"type": "string",
"default": "documents"
}
},
"required": ["question"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "add_document":
content = arguments["content"]
metadata = arguments.get("metadata", {})
collection_name = arguments.get("collection", "documents")
chunk_size = arguments.get("chunk_size", 500)
collection = get_or_create_collection(collection_name)
# 텍스트 청크 분할
chunks = chunk_text(content, chunk_size=chunk_size)
doc_id = str(uuid.uuid4())[:8]
# 각 청크를 벡터 DB에 추가
ids = []
metadatas = []
for i, chunk in enumerate(chunks):
chunk_id = f"{doc_id}-{i}"
ids.append(chunk_id)
metadatas.append({
**metadata,
"chunk_index": i,
"total_chunks": len(chunks),
"doc_id": doc_id
})
collection.add(
documents=chunks,
ids=ids,
metadatas=metadatas
)
return [TextContent(
type="text",
text=json.dumps({
"doc_id": doc_id,
"chunks_added": len(chunks),
"collection": collection_name
}, indent=2)
)]
elif name == "search":
query = arguments["query"]
n_results = arguments.get("n_results", 5)
collection_name = arguments.get("collection", "documents")
collection = get_or_create_collection(collection_name)
results = collection.query(
query_texts=[query],
n_results=n_results
)
items = []
for i in range(len(results["documents"][0])):
items.append({
"text": results["documents"][0][i],
"distance": results["distances"][0][i],
"metadata": results["metadatas"][0][i],
"id": results["ids"][0][i]
})
return [TextContent(
type="text",
text=json.dumps(items, indent=2, ensure_ascii=False)
)]
elif name == "ask":
question = arguments["question"]
n_results = arguments.get("n_results", 5)
collection_name = arguments.get("collection", "documents")
collection = get_or_create_collection(collection_name)
# 관련 문서 검색
results = collection.query(
query_texts=[question],
n_results=n_results
)
# 컨텍스트 구성
context_parts = []
for i, doc in enumerate(results["documents"][0]):
meta = results["metadatas"][0][i]
source = meta.get("source", "unknown")
context_parts.append(
f"[출처: {source}]\n{doc}"
)
context = "\n\n---\n\n".join(context_parts)
# RAG 프롬프트 생성
prompt = f"""다음 컨텍스트를 기반으로 질문에 답변하세요.
컨텍스트:
{context}
질문: {question}
답변:"""
return [TextContent(
type="text",
text=json.dumps({
"prompt": prompt,
"sources_used": len(results["documents"][0]),
"context_length": len(context)
}, indent=2, ensure_ascii=False)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
Claude Desktop 설정
{
"mcpServers": {
"rag": {
"command": "python",
"args": ["rag_server.py"]
}
}
}
chunk_text 함수는 문장 경계(마침표, 줄바꿈)에서 분할하고 오버랩을 적용합니다. 문서 특성에 따라 chunk_size와 overlap 값을 조정하세요. 코드 문서는 더 큰 청크(800~1000자), 짧은 FAQ는 작은 청크(200~300자)가 효과적입니다.
웹 스크래핑 MCP 서버
httpx와 BeautifulSoup를 활용하여 웹 페이지를 가져오고, CSS 셀렉터로 특정 요소를 추출하며, 페이지 내 링크를 수집하는 MCP 서버입니다.
pip install httpx beautifulsoup4 mcp 명령으로 필요한 패키지를 설치하세요.
import asyncio
import json
from typing import List, Optional
from urllib.parse import urljoin, urlparse
import httpx
from bs4 import BeautifulSoup
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("web-scraper-server")
# 공유 HTTP 클라이언트
http_client = httpx.AsyncClient(
timeout=30.0,
follow_redirects=True,
headers={
"User-Agent": (
"Mozilla/5.0 (compatible; MCPBot/1.0)"
)
}
)
async def fetch_page(url: str) -> BeautifulSoup:
"""URL에서 HTML을 가져와 파싱합니다."""
response = await http_client.get(url)
response.raise_for_status()
return BeautifulSoup(response.text, "html.parser")
@server.list_tools()
async def list_tools() -> list[Tool]:
return [
Tool(
name="fetch_url",
description="URL의 HTML을 가져와 텍스트를 추출합니다.",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "가져올 웹 페이지 URL"
},
"include_html": {
"type": "boolean",
"description": "원본 HTML 포함 여부 (기본: false)",
"default": false
}
},
"required": ["url"]
}
),
Tool(
name="extract_selector",
description="CSS 셀렉터로 웹 페이지의 특정 요소를 추출합니다.",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "대상 URL"
},
"selector": {
"type": "string",
"description": "CSS 셀렉터 (예: h1, .class, #id)"
},
"attribute": {
"type": "string",
"description": "추출할 속성 (없으면 텍스트 추출)"
},
"limit": {
"type": "integer",
"description": "최대 결과 수 (기본: 50)",
"default": 50
}
},
"required": ["url", "selector"]
}
),
Tool(
name="collect_links",
description="웹 페이지에서 모든 링크를 수집합니다.",
inputSchema={
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "대상 URL"
},
"same_domain": {
"type": "boolean",
"description": "같은 도메인만 필터링 (기본: false)",
"default": false
},
"pattern": {
"type": "string",
"description": "URL 필터링 패턴 (포함 문자열)"
}
},
"required": ["url"]
}
),
]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name == "fetch_url":
url = arguments["url"]
include_html = arguments.get("include_html", False)
soup = await fetch_page(url)
# 불필요한 요소 제거
for tag in soup.find_all(["script", "style", "nav", "footer"]):
tag.decompose()
result = {
"url": url,
"title": soup.title.string if soup.title else "",
"text": soup.get_text(separator="\n", strip=True)[:5000],
"meta_description": "",
}
# 메타 설명 추출
meta_desc = soup.find("meta", attrs={"name": "description"})
if meta_desc:
result["meta_description"] = meta_desc.get("content", "")
if include_html:
result["html"] = str(soup)[:10000]
return [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
elif name == "extract_selector":
url = arguments["url"]
selector = arguments["selector"]
attribute = arguments.get("attribute")
limit = arguments.get("limit", 50)
soup = await fetch_page(url)
elements = soup.select(selector)[:limit]
results = []
for el in elements:
if attribute:
value = el.get(attribute, "")
else:
value = el.get_text(strip=True)
results.append({
"tag": el.name,
"value": value,
"attributes": dict(el.attrs) if el.attrs else {}
})
return [TextContent(
type="text",
text=json.dumps({
"url": url,
"selector": selector,
"count": len(results),
"results": results
}, indent=2, ensure_ascii=False)
)]
elif name == "collect_links":
url = arguments["url"]
same_domain = arguments.get("same_domain", False)
pattern = arguments.get("pattern")
soup = await fetch_page(url)
base_domain = urlparse(url).netloc
links = []
for a in soup.find_all("a", href=True):
href = a["href"]
full_url = urljoin(url, href)
parsed = urlparse(full_url)
# 유효한 URL만 수집
if parsed.scheme not in ("http", "https"):
continue
# 같은 도메인 필터
if same_domain and parsed.netloc != base_domain:
continue
# 패턴 필터
if pattern and pattern not in full_url:
continue
links.append({
"text": a.get_text(strip=True)[:100],
"url": full_url
})
# 중복 URL 제거
seen = set()
unique_links = []
for link in links:
if link["url"] not in seen:
seen.add(link["url"])
unique_links.append(link)
return [TextContent(
type="text",
text=json.dumps({
"url": url,
"total_links": len(unique_links),
"links": unique_links
}, indent=2, ensure_ascii=False)
)]
raise ValueError(f"Unknown tool: {name}")
async def main():
async with stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
server.create_initialization_options()
)
if __name__ == "__main__":
asyncio.run(main())
Claude Desktop 설정
{
"mcpServers": {
"web-scraper": {
"command": "python",
"args": ["web_scraper_server.py"]
}
}
}
멀티 서버 오케스트레이션
여러 MCP 서버를 조합하여 복잡한 워크플로우를 자동화하는 오케스트레이터입니다. 아래 예제는 GitHub PR을 분석하고, 관련 코드 파일을 읽은 뒤, 결과를 데이터베이스에 저장하는 전체 파이프라인을 TypeScript로 구현합니다.
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
// 서버별 클라이언트 타입 정의
interface ServerConfig {
name: string;
command: string;
args: string[];
env?: Record<string, string>;
}
// 여러 MCP 서버에 동시 연결 관리
class MCPOrchestrator {
private clients: Map<string, Client> = new Map();
private transports: Map<string, StdioClientTransport> = new Map();
async connect(config: ServerConfig): Promise<void> {
const client = new Client(
{ name: "orchestrator", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
const transport = new StdioClientTransport({
command: config.command,
args: config.args,
env: config.env
});
await client.connect(transport);
this.clients.set(config.name, client);
this.transports.set(config.name, transport);
console.log(`[${config.name}] 서버 연결 완료`);
}
// 특정 서버의 도구 호출
async callTool(
serverName: string,
toolName: string,
args: Record<string, unknown>
): Promise<string> {
const client = this.clients.get(serverName);
if (!client) {
throw new Error(`서버 '${serverName}'에 연결되지 않았습니다`);
}
const result = await client.callTool(toolName, args);
return (result.content[0] as any).text;
}
// 모든 연결 종료
async disconnect(): Promise<void> {
for (const [name, client] of this.clients) {
await client.close();
console.log(`[${name}] 연결 종료`);
}
this.clients.clear();
this.transports.clear();
}
}
// ================================================
// 워크플로우: GitHub PR 분석 → 코드 읽기 → DB 저장
// ================================================
async function analyzePRWorkflow(
orchestrator: MCPOrchestrator,
owner: string,
repo: string
) {
console.log("=== PR 분석 워크플로우 시작 ===");
// 1단계: GitHub에서 열린 PR 목록 조회
console.log("\n[1단계] PR 목록 조회...");
const prsJson = await orchestrator.callTool(
"github", "list_pull_requests",
{ owner, repo, state: "open" }
);
const prs = JSON.parse(prsJson);
console.log(` 열린 PR ${prs.length}개 발견`);
// 2단계: 각 PR에 대해 상세 분석
for (const pr of prs.slice(0, 5)) {
console.log(`\n[2단계] PR #${pr.number}: ${pr.title}`);
// GitHub에서 PR의 변경된 파일 목록 가져오기
const searchResult = await orchestrator.callTool(
"github", "search_code",
{ query: `repo:${owner}/${repo} ${pr.head}` }
);
// 3단계: 파일시스템에서 관련 파일 읽기
console.log("[3단계] 관련 파일 읽기...");
let codeContext = "";
try {
const fileContent = await orchestrator.callTool(
"filesystem", "search_files",
{
directory: `/home/user/projects/${repo}`,
pattern: "*.ts"
}
);
codeContext = fileContent;
} catch (e) {
console.log(" 로컬 파일 접근 불가, 건너뛰기");
}
// 4단계: 분석 결과를 DB에 저장
console.log("[4단계] DB에 결과 저장...");
const analysis = {
pr_number: pr.number,
title: pr.title,
author: pr.user,
branch: pr.head,
code_context_length: codeContext.length,
analyzed_at: new Date().toISOString()
};
await orchestrator.callTool(
"database", "execute",
{
db_path: "./pr_analysis.db",
sql: `INSERT INTO pr_analyses
(pr_number, title, author, branch, context_len, analyzed_at)
VALUES (?, ?, ?, ?, ?, ?)`,
params: [
analysis.pr_number,
analysis.title,
analysis.author,
analysis.branch,
analysis.code_context_length,
analysis.analyzed_at
]
}
);
console.log(` PR #${pr.number} 분석 결과 저장 완료`);
}
// 5단계: 전체 분석 요약 조회
console.log("\n[5단계] 분석 요약 조회...");
const summary = await orchestrator.callTool(
"database", "query",
{
db_path: "./pr_analysis.db",
sql: "SELECT COUNT(*) as total, MAX(analyzed_at) as last_run FROM pr_analyses"
}
);
console.log(" 요약:", summary);
console.log("\n=== 워크플로우 완료 ===");
}
// 메인 실행
async function main() {
const orchestrator = new MCPOrchestrator();
try {
// 각 MCP 서버에 연결
await orchestrator.connect({
name: "github",
command: "python",
args: ["github_server.py"],
env: { GITHUB_TOKEN: process.env.GITHUB_TOKEN! }
});
await orchestrator.connect({
name: "filesystem",
command: "python",
args: ["filesystem_server.py"]
});
await orchestrator.connect({
name: "database",
command: "python",
args: ["sqlite_server.py"]
});
// DB 테이블 생성
await orchestrator.callTool(
"database", "create_table",
{
db_path: "./pr_analysis.db",
table_name: "pr_analyses",
columns: {
id: "INTEGER PRIMARY KEY AUTOINCREMENT",
pr_number: "INTEGER NOT NULL",
title: "TEXT NOT NULL",
author: "TEXT",
branch: "TEXT",
context_len: "INTEGER DEFAULT 0",
analyzed_at: "TEXT NOT NULL"
}
}
);
// 워크플로우 실행
await analyzePRWorkflow(
orchestrator, "owner", "my-repo"
);
} finally {
await orchestrator.disconnect();
}
}
main().catch(console.error);
멀티 서버 Claude Desktop 설정
Claude Desktop에서 여러 MCP 서버를 동시에 사용하려면 claude_desktop_config.json에 모든 서버를 등록합니다.
{
"mcpServers": {
"filesystem": {
"command": "python",
"args": ["filesystem_server.py"]
},
"database": {
"command": "python",
"args": ["sqlite_server.py"]
},
"github": {
"command": "python",
"args": ["github_server.py"],
"env": {
"GITHUB_TOKEN": "ghp_your_token_here"
}
},
"slack": {
"command": "python",
"args": ["slack_server.py"],
"env": {
"SLACK_BOT_TOKEN": "xoxb-your-token"
}
},
"rag": {
"command": "python",
"args": ["rag_server.py"]
},
"web-scraper": {
"command": "python",
"args": ["web_scraper_server.py"]
}
}
}
핵심 정리
- MCP 실전 예제의 핵심 개념과 흐름을 정리합니다.
- 파일 시스템 서버를 단계별로 이해합니다.
- 실전 적용 시 기준과 주의점을 확인합니다.
실무 팁
- 입력/출력 예시를 고정해 재현성을 확보하세요.
- MCP 실전 예제 범위를 작게 잡고 단계적으로 확장하세요.
- 파일 시스템 서버 조건을 문서화해 대응 시간을 줄이세요.