MCP 클라이언트
MCP 클라이언트는 MCP 서버와 통신하여 AI 모델에게 도구와 리소스를 제공합니다. 이 문서에서는 대표 예시로 Claude Desktop, Claude CLI, 그리고 커스텀 클라이언트를 통해 MCP 클라이언트의 구조와 사용법을 설명합니다.
- 데스크톱 앱 예시: Claude Desktop에서 설정 파일로 서버 추가
- CLI 예시: Claude CLI의
mcp명령으로 서버 관리 - 커스텀 클라이언트: SDK로 직접 구현
데스크톱 클라이언트 예시: Claude Desktop
설정
대표적인 데스크톱 클라이언트 예시인 Claude Desktop은 MCP 서버를 설정 파일을 통해 관리합니다.
설정 파일 위치
| OS | 경로 |
|---|---|
| macOS | ~/Library/Application Support/Claude/claude_desktop_config.json |
| Windows | %APPDATA%\Claude\claude_desktop_config.json |
| Linux | ~/.config/Claude/claude_desktop_config.json |
설정 파일 구조
{
"mcpServers": {
"filesystem": {
"command": "python",
"args": ["-m", "mcp_server_filesystem"]
},
"postgres": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-postgres"],
"env": {
"DATABASE_URL": "postgresql://user:password@localhost/mydb"
}
},
"github": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-github"],
"env": {
"GITHUB_TOKEN": "ghp_your_token_here"
}
}
}
}
설정 예제
1. 파일시스템 서버 (Python)
{
"mcpServers": {
"filesystem": {
"command": "uv",
"args": [
"--directory",
"/path/to/mcp-server-filesystem",
"run",
"mcp-server-filesystem"
]
}
}
}
2. SQLite 서버
{
"mcpServers": {
"sqlite": {
"command": "npx",
"args": [
"-y",
"@modelcontextprotocol/server-sqlite",
"/path/to/database.db"
]
}
}
}
3. 여러 서버 동시 사용
{
"mcpServers": {
"filesystem": {
"command": "python",
"args": ["-m", "mcp_server_filesystem"],
"env": {
"ALLOWED_DIRS": "/home/user/Documents,/home/user/Projects"
}
},
"github": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-github"],
"env": {
"GITHUB_TOKEN": "ghp_your_token"
}
},
"postgres": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-postgres"],
"env": {
"DATABASE_URL": "postgresql://localhost/myapp"
}
},
"slack": {
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-slack"],
"env": {
"SLACK_BOT_TOKEN": "xoxb-your-token",
"SLACK_TEAM_ID": "T1234567"
}
}
}
}
사용 방법
- 설정 파일 편집 후 Claude Desktop 재시작
- 새 대화 시작
- 서버가 제공하는 도구를 자연어로 요청
- 클라이언트가 자동으로 적절한 도구 선택 및 실행
예제 대화
사용자: 내 프로젝트 디렉토리에 있는 모든 Python 파일을 찾아줘
AI 앱: [filesystem 서버의 search_files 도구 사용]
다음 Python 파일을 찾았습니다:
- /home/user/Projects/app.py
- /home/user/Projects/utils.py
- /home/user/Projects/tests/test_app.py
사용자: app.py 파일의 내용을 보여줘
AI 앱: [filesystem 서버의 read_file 도구 사용]
app.py 파일의 내용입니다:
[파일 내용 표시]
사용자: GitHub 저장소의 open 이슈를 확인해줘
AI 앱: [github 서버의 list_issues 도구 사용]
현재 open 상태인 이슈 목록입니다:
1. #42: 로그인 버그 수정
2. #43: 성능 개선 필요
...
문제 해결
| 문제 | 원인 | 해결 |
|---|---|---|
| 서버가 연결되지 않음 | 잘못된 명령어/경로 | 명령어와 경로 확인 |
| 환경 변수 인식 안 됨 | env 설정 누락 | env 섹션 추가 |
| 도구가 표시되지 않음 | 서버 초기화 실패 | 로그 확인 (개발자 도구) |
| 권한 오류 | 파일/디렉토리 접근 권한 | 서버 허용 목록 확인 |
- 설정 파일에 API 토큰/패스워드를 직접 저장하지 마세요
- 환경 변수나 시크릿 관리 도구 사용 권장
- 파일시스템 서버는 허용 디렉토리 제한 필수
CLI 클라이언트 예시: Claude CLI
설치
# npm으로 설치
npm install -g @anthropic-ai/claude-cli
# 또는 Homebrew (macOS)
brew install claude-cli
설정
# API 키 설정
claude auth login
# 또는 환경 변수
export ANTHROPIC_API_KEY=sk-ant-your-key-here
MCP 서버 사용
서버 추가
# MCP 서버 추가
claude mcp add filesystem python -m mcp_server_filesystem
# 환경 변수와 함께 추가
claude mcp add github npx -y @modelcontextprotocol/server-github \
--env GITHUB_TOKEN=ghp_your_token
# 서버 목록 확인
claude mcp list
# 서버 제거
claude mcp remove filesystem
대화 시작
# MCP 서버와 함께 대화 시작
claude chat --mcp filesystem,github
# 또는 모든 서버 사용
claude chat --mcp all
# 일회성 질문
claude ask "프로젝트의 README.md 파일 내용을 보여줘" --mcp filesystem
CLI 사용 예제
# 파일 검색 및 분석
claude ask "src/ 디렉토리의 모든 TypeScript 파일을 분석하고 \
주요 함수 목록을 만들어줘" --mcp filesystem
# 데이터베이스 쿼리
claude ask "users 테이블에서 최근 7일간 가입한 사용자 수를 알려줘" \
--mcp postgres
# GitHub 이슈 생성
claude ask "버그 리포트: 로그인 시 세션 만료 문제, \
우선순위 높음으로 이슈 생성해줘" --mcp github
# 여러 서버 조합
claude chat --mcp filesystem,github,slack
커스텀 클라이언트 개발
Python 클라이언트
설치
pip install mcp
# 또는
uv add mcp
기본 클라이언트
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport
async def main():
# 클라이언트 생성
client = Client(
{"name": "my-client", "version": "1.0.0"},
{"capabilities": {"tools": {}}}
)
# 서버 연결 (stdio)
transport = StdioClientTransport({
"command": "python",
"args": ["-m", "mcp_server_filesystem"]
})
await client.connect(transport)
try:
# 도구 목록 조회
tools = await client.list_tools()
print("Available tools:")
for tool in tools:
print(f" - {tool['name']}: {tool['description']}")
# 도구 호출
result = await client.call_tool("read_file", {
"path": "/path/to/file.txt"
})
print("File content:", result)
finally:
await client.close()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
리소스 읽기
# 리소스 목록
resources = await client.list_resources()
for resource in resources:
print(f"Resource: {resource['uri']}")
# 리소스 읽기
content = await client.read_resource("file:///path/to/doc.txt")
print(content)
프롬프트 사용
# 프롬프트 목록
prompts = await client.list_prompts()
for prompt in prompts:
print(f"Prompt: {prompt['name']}")
# 프롬프트 가져오기
messages = await client.get_prompt("code-review", {
"language": "python",
"code": "def hello(): return 'world'"
})
print(messages)
TypeScript 클라이언트
설치
npm install @modelcontextprotocol/sdk
기본 클라이언트
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";
async function main() {
// 클라이언트 생성
const client = new Client(
{
name: "my-client",
version: "1.0.0"
},
{
capabilities: {
tools: {}
}
}
);
// 서버 연결
const transport = new StdioClientTransport({
command: "node",
args: ["dist/server.js"]
});
await client.connect(transport);
try {
// 도구 목록
const { tools } = await client.listTools();
console.log("Available tools:", tools);
// 도구 호출
const result = await client.callTool("read_file", {
path: "/path/to/file.txt"
});
console.log("Result:", result);
} finally {
await client.close();
}
}
main().catch(console.error);
LLM SDK와 통합 예시 (Anthropic SDK)
import anthropic
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport
async def chat_with_mcp():
# MCP 클라이언트 설정
mcp_client = Client(
{"name": "mcp-chat-client", "version": "1.0.0"},
{"capabilities": {"tools": {}}}
)
transport = StdioClientTransport({
"command": "python",
"args": ["-m", "mcp_server_filesystem"]
})
await mcp_client.connect(transport)
# MCP 도구를 Anthropic Tool 형식으로 변환
mcp_tools = await mcp_client.list_tools()
anthropic_tools = []
for tool in mcp_tools:
anthropic_tools.append({
"name": tool["name"],
"description": tool["description"],
"input_schema": tool["inputSchema"]
})
# Anthropic 클라이언트
client = anthropic.Anthropic()
# 대화 시작
messages = [{
"role": "user",
"content": "README.md 파일을 읽어줘"
}]
response = client.messages.create(
model="claude-4-5",
max_tokens=4096,
messages=messages,
tools=anthropic_tools
)
# Tool use 처리
while response.stop_reason == "tool_use":
tool_use = response.content[-1]
# MCP 서버에서 도구 실행
tool_result = await mcp_client.call_tool(
tool_use.name,
tool_use.input
)
# 결과를 메시지에 추가
messages.append({"role": "assistant", "content": response.content})
messages.append({
"role": "user",
"content": [{
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": tool_result
}]
})
# 다음 응답
response = client.messages.create(
model="claude-4-5",
max_tokens=4096,
messages=messages,
tools=anthropic_tools
)
print(response.content)
await mcp_client.close()
import asyncio
asyncio.run(chat_with_mcp())
클라이언트 Capabilities
Capability 선언
client = Client(
{"name": "my-client", "version": "1.0.0"},
{
"capabilities": {
"tools": {}, # 도구 호출 지원
"resources": {
"subscribe": True # 리소스 변경 구독
},
"prompts": {}, # 프롬프트 사용
"sampling": {} # 샘플링 요청 수신
}
}
)
리소스 구독
# 리소스 변경 구독
await client.subscribe_resource("file:///path/to/watched.txt")
# 리소스 변경 알림 수신
@client.on_resource_updated
async def handle_update(uri):
print(f"Resource updated: {uri}")
content = await client.read_resource(uri)
print("New content:", content)
# 구독 해제
await client.unsubscribe_resource("file:///path/to/watched.txt")
에러 처리
from mcp.client import McpError
try:
result = await client.call_tool("read_file", {
"path": "/nonexistent.txt"
})
except McpError as e:
if e.code == -32001:
print("File not found")
elif e.code == -32002:
print("Permission denied")
else:
print(f"Error: {e.message}")
except Exception as e:
print(f"Unexpected error: {e}")
모범 사례
- 연결 관리: try-finally로 확실한 종료 보장
- 에러 처리: 모든 도구 호출에 에러 처리 구현
- 타임아웃: 장시간 실행 작업에 타임아웃 설정
- 재시도: 일시적 오류 시 재시도 로직
- 로깅: 디버깅을 위한 상세 로깅
연결 풀링
class McpClientPool:
def __init__(self, server_config, pool_size=5):
self.server_config = server_config
self.pool_size = pool_size
self.clients = []
async def initialize(self):
for _ in range(self.pool_size):
client = await self._create_client()
self.clients.append(client)
async def _create_client(self):
client = Client(
{"name": "pooled-client", "version": "1.0.0"},
{"capabilities": {"tools": {}}}
)
transport = StdioClientTransport(self.server_config)
await client.connect(transport)
return client
async def get_client(self):
if self.clients:
return self.clients.pop()
return await self._create_client()
async def return_client(self, client):
if len(self.clients) < self.pool_size:
self.clients.append(client)
else:
await client.close()
다음 단계
MCP 클라이언트 아키텍처
MCP 클라이언트는 호스트 애플리케이션 내부에서 동작하며, 트랜스포트 계층을 통해 하나 이상의 MCP 서버와 통신합니다. 아래 다이어그램은 클라이언트의 전체 아키텍처와 1:N 서버 연결 구조를 보여줍니다.
- Host Application: MCP 클라이언트를 내장하는 AI 애플리케이션 (Claude Desktop, IDE 플러그인 등)
- MCP Client: 프로토콜 협상, 메시지 직렬화/역직렬화, Capability 교환을 담당
- Transport Layer: 실제 데이터 전송을 담당하며, stdio, SSE, Streamable HTTP 중 선택
- 1:N 관계: 하나의 클라이언트가 여러 서버에 동시 연결 가능. 각 서버는 독립적 세션 유지
SSE 트랜스포트 클라이언트
SSE(Server-Sent Events) 트랜스포트는 HTTP 기반으로 원격 MCP 서버에 연결할 때 사용합니다. 클라이언트는 HTTP POST로 요청을 보내고, SSE 스트림으로 응답과 알림을 수신합니다.
SSE 기본 연결
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
async function connectSSE() {
const client = new Client(
{ name: "sse-client", version: "1.0.0" },
{ capabilities: { tools: {}, resources: {} } }
);
// SSE 트랜스포트 생성 - 원격 서버 URL 지정
const transport = new SSEClientTransport(
new URL("http://localhost:3001/sse")
);
await client.connect(transport);
console.log("SSE 서버에 연결됨");
// 도구 목록 확인
const { tools } = await client.listTools();
console.log("사용 가능한 도구:", tools.map(t => t.name));
return client;
}
SSE 이벤트 처리
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
async function sseWithEventHandling() {
const client = new Client(
{ name: "sse-event-client", version: "1.0.0" },
{ capabilities: { tools: {}, resources: { subscribe: true } } }
);
// 서버로부터의 알림 처리
client.setNotificationHandler(
"notifications/tools/list_changed",
async () => {
console.log("도구 목록이 변경되었습니다. 갱신 중...");
const { tools } = await client.listTools();
console.log("갱신된 도구:", tools);
}
);
client.setNotificationHandler(
"notifications/resources/updated",
async (notification) => {
const uri = notification.params.uri;
console.log(`리소스 변경 감지: ${uri}`);
const content = await client.readResource({ uri });
console.log("새 내용:", content);
}
);
const transport = new SSEClientTransport(
new URL("http://localhost:3001/sse")
);
await client.connect(transport);
// 리소스 구독
await client.subscribeResource({ uri: "file:///config.json" });
console.log("리소스 구독 완료");
return client;
}
SSE 재연결 로직
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";
class ResilientSSEClient {
private client: Client | null = null;
private serverUrl: string;
private maxRetries: number;
private retryCount = 0;
constructor(serverUrl: string, maxRetries = 5) {
this.serverUrl = serverUrl;
this.maxRetries = maxRetries;
}
async connect(): Promise<Client> {
while (this.retryCount < this.maxRetries) {
try {
this.client = new Client(
{ name: "resilient-sse", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
const transport = new SSEClientTransport(
new URL(this.serverUrl)
);
// 연결 종료 시 자동 재연결
transport.onclose = async () => {
console.log("연결 종료 감지, 재연결 시도...");
await this.reconnect();
};
transport.onerror = async (error) => {
console.error("트랜스포트 오류:", error);
};
await this.client.connect(transport);
this.retryCount = 0; // 성공 시 카운터 리셋
console.log("SSE 서버 연결 성공");
return this.client;
} catch (error) {
this.retryCount++;
const delay = Math.min(
1000 * Math.pow(2, this.retryCount),
30000
);
console.log(
`연결 실패 (${this.retryCount}/${this.maxRetries}). ` +
`${delay}ms 후 재시도...`
);
await new Promise(r => setTimeout(r, delay));
}
}
throw new Error("최대 재연결 횟수 초과");
}
private async reconnect(): Promise<void> {
try {
await this.client?.close();
} catch { /* 무시 */ }
await this.connect();
}
async close(): Promise<void> {
this.maxRetries = 0; // 재연결 방지
await this.client?.close();
}
}
Streamable HTTP 클라이언트
Streamable HTTP는 MCP 프로토콜의 최신 트랜스포트 방식으로, SSE의 한계를 극복하고 양방향 스트리밍과 세션 관리를 지원합니다. 기존 SSE 대비 더 유연한 연결 관리와 상태 추적이 가능합니다.
기본 연결
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from
"@modelcontextprotocol/sdk/client/streamableHttp.js";
async function connectStreamableHTTP() {
const client = new Client(
{ name: "streamable-http-client", version: "1.0.0" },
{
capabilities: {
tools: {},
resources: { subscribe: true },
sampling: {}
}
}
);
// Streamable HTTP 트랜스포트 생성
const transport = new StreamableHTTPClientTransport(
new URL("http://localhost:3001/mcp")
);
await client.connect(transport);
console.log("Streamable HTTP 서버에 연결됨");
// 서버 정보 확인
const serverInfo = client.getServerVersion();
console.log("서버:", serverInfo?.name, serverInfo?.version);
return client;
}
세션 관리
Streamable HTTP 트랜스포트는 서버가 발급하는 세션 ID를 통해 상태를 유지합니다. 클라이언트는 Mcp-Session-Id 헤더를 통해 세션을 추적합니다.
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from
"@modelcontextprotocol/sdk/client/streamableHttp.js";
class SessionManagedClient {
private client: Client;
private transport: StreamableHTTPClientTransport;
private serverUrl: string;
constructor(serverUrl: string) {
this.serverUrl = serverUrl;
this.client = new Client(
{ name: "session-client", version: "1.0.0" },
{ capabilities: { tools: {}, sampling: {} } }
);
this.transport = new StreamableHTTPClientTransport(
new URL(serverUrl)
);
}
async connect(): Promise<void> {
await this.client.connect(this.transport);
console.log("세션 연결 완료");
}
async callToolSafely(
toolName: string,
args: Record<string, unknown>
) {
try {
const result = await this.client.callTool({
name: toolName,
arguments: args
});
return result;
} catch (error: any) {
// 세션 만료 시 재연결
if (error.code === -32001) {
console.log("세션 만료, 재연결 중...");
this.transport = new StreamableHTTPClientTransport(
new URL(this.serverUrl)
);
await this.client.connect(this.transport);
return await this.client.callTool({
name: toolName,
arguments: args
});
}
throw error;
}
}
async disconnect(): Promise<void> {
await this.client.close();
console.log("세션 종료 완료");
}
}
// 사용 예
const client = new SessionManagedClient("http://localhost:3001/mcp");
await client.connect();
const result = await client.callToolSafely("read_file", {
path: "/data/config.json"
});
console.log(result);
await client.disconnect();
| 특성 | SSE | Streamable HTTP |
|---|---|---|
| 세션 관리 | 없음 (연결 기반) | 명시적 세션 ID |
| 재연결 | 클라이언트 구현 필요 | 세션 ID로 복원 가능 |
| 스트리밍 | 서버 -> 클라이언트만 | 양방향 지원 |
| 상태 | Deprecated (권장하지 않음) | 최신 표준 권장 |
클라이언트 생명주기 관리
MCP 클라이언트는 명확한 상태 전이를 따릅니다. 안정적인 운영을 위해 각 상태에서의 동작과 전이 조건을 이해해야 합니다.
상태 머신 다이어그램
재시도 + Exponential Backoff 클라이언트
import asyncio
import logging
from enum import Enum
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport
logger = logging.getLogger(__name__)
class ClientState(Enum):
INIT = "init"
CONNECTING = "connecting"
ACTIVE = "active"
RECONNECTING = "reconnecting"
CLOSED = "closed"
class ResilientMcpClient:
"""재시도 로직과 exponential backoff를 갖춘 MCP 클라이언트 래퍼"""
def __init__(
self,
server_command: str,
server_args: list[str],
max_retries: int = 5,
base_delay: float = 1.0,
max_delay: float = 60.0,
):
self.server_command = server_command
self.server_args = server_args
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.state = ClientState.INIT
self.client: Client | None = None
self._retry_count = 0
async def connect(self) -> None:
"""서버에 연결 (실패 시 exponential backoff 재시도)"""
self.state = ClientState.CONNECTING
while self._retry_count <= self.max_retries:
try:
self.client = Client(
{"name": "resilient-client", "version": "1.0.0"},
{"capabilities": {"tools": {}, "sampling": {}}}
)
transport = StdioClientTransport({
"command": self.server_command,
"args": self.server_args,
})
await self.client.connect(transport)
self.state = ClientState.ACTIVE
self._retry_count = 0
logger.info("MCP 서버 연결 성공")
return
except Exception as e:
self._retry_count += 1
delay = min(
self.base_delay * (2 ** self._retry_count),
self.max_delay
)
logger.warning(
f"연결 실패 ({self._retry_count}/{self.max_retries}): {e}"
)
logger.info(f"{delay:.1f}초 후 재시도...")
await asyncio.sleep(delay)
self.state = ClientState.CLOSED
raise ConnectionError("최대 재시도 횟수 초과")
async def call_tool_safe(
self, name: str, arguments: dict
) -> dict:
"""도구 호출 (연결 끊김 시 자동 재연결)"""
try:
return await self.client.call_tool(name, arguments)
except Exception as e:
logger.warning(f"도구 호출 실패: {e}, 재연결 시도")
self.state = ClientState.RECONNECTING
await self.connect()
return await self.client.call_tool(name, arguments)
async def close(self) -> None:
"""Graceful shutdown"""
if self.client and self.state == ClientState.ACTIVE:
try:
await self.client.close()
logger.info("클라이언트 정상 종료")
except Exception as e:
logger.error(f"종료 중 오류: {e}")
self.state = ClientState.CLOSED
async def __aenter__(self):
await self.connect()
return self
async def __aexit__(self, *args):
await self.close()
# 사용 예
async def main():
async with ResilientMcpClient(
server_command="python",
server_args=["-m", "mcp_server_filesystem"],
max_retries=3,
) as client:
tools = await client.client.list_tools()
print("도구 목록:", tools)
result = await client.call_tool_safe(
"read_file", {"path": "/etc/hostname"}
)
print(result)
멀티 서버 클라이언트
실제 운영 환경에서는 하나의 AI 애플리케이션이 여러 MCP 서버를 동시에 사용합니다. 멀티 서버 클라이언트는 도구 라우팅, 네임스페이스 충돌 해결, 서버 헬스체크 등을 통합적으로 관리합니다.
서버 통합 관리
import asyncio
import logging
from dataclasses import dataclass, field
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport
logger = logging.getLogger(__name__)
@dataclass
class ServerConfig:
"""MCP 서버 설정"""
name: str
command: str
args: list[str] = field(default_factory=list)
env: dict[str, str] = field(default_factory=dict)
namespace: str = "" # 도구 이름 접두사
class MultiServerClient:
"""여러 MCP 서버를 통합 관리하는 클라이언트"""
def __init__(self):
self.servers: dict[str, Client] = {}
self.tool_map: dict[str, str] = {} # 도구명 -> 서버명
self.configs: dict[str, ServerConfig] = {}
self.health: dict[str, bool] = {}
async def add_server(self, config: ServerConfig) -> None:
"""서버를 추가하고 연결"""
client = Client(
{"name": f"multi-client-{config.name}",
"version": "1.0.0"},
{"capabilities": {"tools": {}, "resources": {}}}
)
transport = StdioClientTransport({
"command": config.command,
"args": config.args,
"env": config.env,
})
await client.connect(transport)
self.servers[config.name] = client
self.configs[config.name] = config
self.health[config.name] = True
# 도구 목록 등록 (네임스페이스 적용)
await self._register_tools(config.name, client, config.namespace)
logger.info(f"서버 '{config.name}' 연결 완료")
async def _register_tools(
self, server_name: str, client: Client, namespace: str
) -> None:
"""서버의 도구를 등록하고 충돌 시 네임스페이스 접두사 추가"""
tools = await client.list_tools()
prefix = f"{namespace}_" if namespace else ""
for tool in tools:
tool_name = f"{prefix}{tool['name']}"
# 충돌 감지 및 자동 접두사 추가
if tool_name in self.tool_map:
existing = self.tool_map[tool_name]
logger.warning(
f"도구 이름 충돌: '{tool_name}' "
f"({existing} vs {server_name})"
)
tool_name = f"{server_name}_{tool['name']}"
self.tool_map[tool_name] = server_name
async def call_tool(
self, tool_name: str, arguments: dict
) -> dict:
"""적절한 서버로 도구 호출을 라우팅"""
if tool_name not in self.tool_map:
raise ValueError(f"등록되지 않은 도구: {tool_name}")
server_name = self.tool_map[tool_name]
# 서버 상태 확인
if not self.health.get(server_name, False):
logger.info(f"서버 '{server_name}' 비정상, 재연결 시도")
await self.reconnect_server(server_name)
# 원본 도구 이름 추출 (접두사 제거)
config = self.configs[server_name]
prefix = f"{config.namespace}_" if config.namespace else ""
original_name = tool_name.removeprefix(prefix)
client = self.servers[server_name]
return await client.call_tool(original_name, arguments)
async def list_all_tools(self) -> list[dict]:
"""모든 서버의 도구 목록 통합 반환"""
all_tools = []
for tool_name, server_name in self.tool_map.items():
all_tools.append({
"name": tool_name,
"server": server_name,
})
return all_tools
헬스체크 및 자동 재연결
class MultiServerClient(MultiServerClient):
# (위 클래스에 추가되는 메서드들)
async def health_check(self) -> dict[str, bool]:
"""모든 서버의 상태를 점검"""
results = {}
for name, client in self.servers.items():
try:
await asyncio.wait_for(
client.list_tools(), timeout=5.0
)
results[name] = True
except Exception:
results[name] = False
logger.warning(f"서버 '{name}' 헬스체크 실패")
self.health = results
return results
async def reconnect_server(self, server_name: str) -> None:
"""특정 서버 재연결"""
if server_name not in self.configs:
raise ValueError(f"알 수 없는 서버: {server_name}")
# 기존 연결 정리
try:
await self.servers[server_name].close()
except Exception:
pass
# 재연결
config = self.configs[server_name]
await self.add_server(config)
logger.info(f"서버 '{server_name}' 재연결 성공")
async def start_health_monitor(
self, interval: float = 30.0
) -> None:
"""주기적 헬스체크 및 자동 재연결"""
while True:
await asyncio.sleep(interval)
health = await self.health_check()
for name, is_healthy in health.items():
if not is_healthy:
logger.info(f"서버 '{name}' 비정상, 자동 재연결")
try:
await self.reconnect_server(name)
except Exception as e:
logger.error(
f"서버 '{name}' 재연결 실패: {e}"
)
async def close_all(self) -> None:
"""모든 서버 연결 종료"""
for name, client in self.servers.items():
try:
await client.close()
logger.info(f"서버 '{name}' 종료")
except Exception as e:
logger.error(f"서버 '{name}' 종료 실패: {e}")
self.servers.clear()
self.tool_map.clear()
# 사용 예
async def main():
manager = MultiServerClient()
# 여러 서버 등록
await manager.add_server(ServerConfig(
name="fs",
command="python",
args=["-m", "mcp_server_filesystem"],
namespace="fs",
))
await manager.add_server(ServerConfig(
name="github",
command="npx",
args=["-y", "@modelcontextprotocol/server-github"],
env={"GITHUB_TOKEN": "ghp_..."},
namespace="gh",
))
# 헬스 모니터 백그라운드 실행
monitor = asyncio.create_task(
manager.start_health_monitor(interval=30)
)
# 통합 도구 목록
tools = await manager.list_all_tools()
print("전체 도구:", tools)
# 라우팅된 도구 호출
result = await manager.call_tool(
"fs_read_file", {"path": "/README.md"}
)
print(result)
monitor.cancel()
await manager.close_all()
Sampling 구현
Sampling은 MCP 프로토콜의 고급 기능으로, 서버가 클라이언트에게 LLM 호출을 요청하는 역방향 패턴입니다. 일반적으로 MCP에서는 클라이언트가 서버에게 도구 실행을 요청하지만, Sampling에서는 서버가 "이 프롬프트로 LLM에게 물어봐 달라"고 클라이언트에게 요청합니다.
- 서버 측에서 동적으로 LLM 판단이 필요한 워크플로 (예: 코드 분석 후 리팩토링 제안)
- 에이전트 루프: 서버가 도구 실행 결과를 보고 다음 행동을 LLM에게 질의
- 사용자의 명시적 동의 하에 서버가 AI 기능을 활용하는 경우
Sampling 흐름
Sampling Request Handler 구현
import anthropic
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport
from mcp.types import (
CreateMessageRequest,
CreateMessageResult,
SamplingMessage,
TextContent,
)
class SamplingEnabledClient:
"""Sampling 요청을 처리할 수 있는 MCP 클라이언트"""
def __init__(self, anthropic_api_key: str):
self.anthropic = anthropic.Anthropic(
api_key=anthropic_api_key
)
self.client: Client | None = None
async def _handle_sampling(
self, request: CreateMessageRequest
) -> CreateMessageResult:
"""서버로부터의 sampling 요청을 처리"""
# 1. MCP 메시지를 Anthropic API 형식으로 변환
messages = []
for msg in request.params.messages:
messages.append({
"role": msg.role,
"content": msg.content.text
if hasattr(msg.content, "text")
else str(msg.content),
})
# 2. 사용자에게 승인 요청 (보안을 위해 권장)
print("[Sampling 요청 수신]")
print(f" 모델: {request.params.modelPreferences}")
print(f" 메시지: {messages}")
# 3. LLM API 호출
response = self.anthropic.messages.create(
model=request.params.modelPreferences.hints[0].name
if request.params.modelPreferences
else "claude-4-5-haiku",
max_tokens=request.params.maxTokens or 1024,
system=request.params.systemPrompt or "",
messages=messages,
)
# 4. 결과를 MCP 형식으로 변환하여 반환
result_text = response.content[0].text
return CreateMessageResult(
role="assistant",
content=TextContent(
type="text",
text=result_text,
),
model=response.model,
)
async def connect(
self, command: str, args: list[str]
) -> None:
"""서버 연결 및 sampling 핸들러 등록"""
self.client = Client(
{"name": "sampling-client", "version": "1.0.0"},
{
"capabilities": {
"tools": {},
"sampling": {}, # sampling capability 선언
}
}
)
# Sampling 핸들러 등록
self.client.set_sampling_handler(self._handle_sampling)
transport = StdioClientTransport({
"command": command,
"args": args,
})
await self.client.connect(transport)
print("Sampling 지원 클라이언트 연결 완료")
async def close(self) -> None:
if self.client:
await self.client.close()
# 사용 예
async def main():
client = SamplingEnabledClient(
anthropic_api_key="sk-ant-..."
)
await client.connect(
command="python",
args=["-m", "my_agent_server"]
)
# 서버가 작업 중 LLM 판단이 필요하면
# 자동으로 _handle_sampling이 호출됨
result = await client.client.call_tool(
"analyze_and_suggest",
{"file_path": "/src/main.py"}
)
print("분석 결과:", result)
await client.close()
- 사용자 승인: 프로덕션 환경에서는 반드시 사용자에게 sampling 요청을 승인받아야 합니다. 서버가 임의로 LLM 호출을 유발할 수 있으므로 비용과 보안 위험이 있습니다.
- 모델 제한: 서버가 요청하는 모델과 max_tokens를 클라이언트가 검증하고 제한해야 합니다.
- 프롬프트 인젝션: 서버가 보내는 system prompt와 messages를 검증하여 악의적 프롬프트를 차단해야 합니다.
- 비용 관리: sampling 호출 횟수와 토큰 사용량을 추적하고 한도를 설정하세요.
핵심 정리
- MCP 클라이언트의 핵심 개념과 흐름을 정리합니다.
- 대표적인 데스크톱/CLI MCP 클라이언트의 흐름을 단계별로 이해합니다.
- 실전 적용 시 기준과 주의점을 확인합니다.
실무 팁
- 입력/출력 예시를 고정해 재현성을 확보하세요.
- MCP 클라이언트 범위를 작게 잡고 단계적으로 확장하세요.
- 클라이언트별 설정 조건을 문서화해 대응 시간을 줄이세요.