MCP 클라이언트

MCP 클라이언트는 MCP 서버와 통신하여 AI 모델에게 도구와 리소스를 제공합니다. 이 문서에서는 대표 예시로 Claude Desktop, Claude CLI, 그리고 커스텀 클라이언트를 통해 MCP 클라이언트의 구조와 사용법을 설명합니다.

업데이트 안내: 모델/요금/버전/정책 등 시점에 민감한 정보는 변동될 수 있습니다. 최신 내용은 공식 문서를 확인하세요.
빠른 시작
  • 데스크톱 앱 예시: Claude Desktop에서 설정 파일로 서버 추가
  • CLI 예시: Claude CLI의 mcp 명령으로 서버 관리
  • 커스텀 클라이언트: SDK로 직접 구현

데스크톱 클라이언트 예시: Claude Desktop

설정

대표적인 데스크톱 클라이언트 예시인 Claude Desktop은 MCP 서버를 설정 파일을 통해 관리합니다.

설정 파일 위치

OS 경로
macOS ~/Library/Application Support/Claude/claude_desktop_config.json
Windows %APPDATA%\Claude\claude_desktop_config.json
Linux ~/.config/Claude/claude_desktop_config.json

설정 파일 구조

JSON
{
  "mcpServers": {
    "filesystem": {
      "command": "python",
      "args": ["-m", "mcp_server_filesystem"]
    },
    "postgres": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-postgres"],
      "env": {
        "DATABASE_URL": "postgresql://user:password@localhost/mydb"
      }
    },
    "github": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-github"],
      "env": {
        "GITHUB_TOKEN": "ghp_your_token_here"
      }
    }
  }
}

설정 예제

1. 파일시스템 서버 (Python)

JSON
{
  "mcpServers": {
    "filesystem": {
      "command": "uv",
      "args": [
        "--directory",
        "/path/to/mcp-server-filesystem",
        "run",
        "mcp-server-filesystem"
      ]
    }
  }
}

2. SQLite 서버

JSON
{
  "mcpServers": {
    "sqlite": {
      "command": "npx",
      "args": [
        "-y",
        "@modelcontextprotocol/server-sqlite",
        "/path/to/database.db"
      ]
    }
  }
}

3. 여러 서버 동시 사용

JSON
{
  "mcpServers": {
    "filesystem": {
      "command": "python",
      "args": ["-m", "mcp_server_filesystem"],
      "env": {
        "ALLOWED_DIRS": "/home/user/Documents,/home/user/Projects"
      }
    },
    "github": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-github"],
      "env": {
        "GITHUB_TOKEN": "ghp_your_token"
      }
    },
    "postgres": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-postgres"],
      "env": {
        "DATABASE_URL": "postgresql://localhost/myapp"
      }
    },
    "slack": {
      "command": "npx",
      "args": ["-y", "@modelcontextprotocol/server-slack"],
      "env": {
        "SLACK_BOT_TOKEN": "xoxb-your-token",
        "SLACK_TEAM_ID": "T1234567"
      }
    }
  }
}

사용 방법

예시: Claude Desktop에서 MCP 서버 사용하기
  1. 설정 파일 편집 후 Claude Desktop 재시작
  2. 새 대화 시작
  3. 서버가 제공하는 도구를 자연어로 요청
  4. 클라이언트가 자동으로 적절한 도구 선택 및 실행

예제 대화

대화
사용자: 내 프로젝트 디렉토리에 있는 모든 Python 파일을 찾아줘

AI 앱: [filesystem 서버의 search_files 도구 사용]
다음 Python 파일을 찾았습니다:
- /home/user/Projects/app.py
- /home/user/Projects/utils.py
- /home/user/Projects/tests/test_app.py

사용자: app.py 파일의 내용을 보여줘

AI 앱: [filesystem 서버의 read_file 도구 사용]
app.py 파일의 내용입니다:
[파일 내용 표시]

사용자: GitHub 저장소의 open 이슈를 확인해줘

AI 앱: [github 서버의 list_issues 도구 사용]
현재 open 상태인 이슈 목록입니다:
1. #42: 로그인 버그 수정
2. #43: 성능 개선 필요
...

문제 해결

문제 원인 해결
서버가 연결되지 않음 잘못된 명령어/경로 명령어와 경로 확인
환경 변수 인식 안 됨 env 설정 누락 env 섹션 추가
도구가 표시되지 않음 서버 초기화 실패 로그 확인 (개발자 도구)
권한 오류 파일/디렉토리 접근 권한 서버 허용 목록 확인
보안 주의사항
  • 설정 파일에 API 토큰/패스워드를 직접 저장하지 마세요
  • 환경 변수나 시크릿 관리 도구 사용 권장
  • 파일시스템 서버는 허용 디렉토리 제한 필수

CLI 클라이언트 예시: Claude CLI

설치

Bash
# npm으로 설치
npm install -g @anthropic-ai/claude-cli

# 또는 Homebrew (macOS)
brew install claude-cli

설정

Bash
# API 키 설정
claude auth login

# 또는 환경 변수
export ANTHROPIC_API_KEY=sk-ant-your-key-here

MCP 서버 사용

서버 추가

Bash
# MCP 서버 추가
claude mcp add filesystem python -m mcp_server_filesystem

# 환경 변수와 함께 추가
claude mcp add github npx -y @modelcontextprotocol/server-github \
  --env GITHUB_TOKEN=ghp_your_token

# 서버 목록 확인
claude mcp list

# 서버 제거
claude mcp remove filesystem

대화 시작

Bash
# MCP 서버와 함께 대화 시작
claude chat --mcp filesystem,github

# 또는 모든 서버 사용
claude chat --mcp all

# 일회성 질문
claude ask "프로젝트의 README.md 파일 내용을 보여줘" --mcp filesystem

CLI 사용 예제

Bash
# 파일 검색 및 분석
claude ask "src/ 디렉토리의 모든 TypeScript 파일을 분석하고 \
주요 함수 목록을 만들어줘" --mcp filesystem

# 데이터베이스 쿼리
claude ask "users 테이블에서 최근 7일간 가입한 사용자 수를 알려줘" \
  --mcp postgres

# GitHub 이슈 생성
claude ask "버그 리포트: 로그인 시 세션 만료 문제, \
우선순위 높음으로 이슈 생성해줘" --mcp github

# 여러 서버 조합
claude chat --mcp filesystem,github,slack

커스텀 클라이언트 개발

Python 클라이언트

설치

Bash
pip install mcp
# 또는
uv add mcp

기본 클라이언트

Python
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport

async def main():
    # 클라이언트 생성
    client = Client(
        {"name": "my-client", "version": "1.0.0"},
        {"capabilities": {"tools": {}}}
    )

    # 서버 연결 (stdio)
    transport = StdioClientTransport({
        "command": "python",
        "args": ["-m", "mcp_server_filesystem"]
    })

    await client.connect(transport)

    try:
        # 도구 목록 조회
        tools = await client.list_tools()
        print("Available tools:")
        for tool in tools:
            print(f"  - {tool['name']}: {tool['description']}")

        # 도구 호출
        result = await client.call_tool("read_file", {
            "path": "/path/to/file.txt"
        })
        print("File content:", result)

    finally:
        await client.close()

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

리소스 읽기

Python
# 리소스 목록
resources = await client.list_resources()
for resource in resources:
    print(f"Resource: {resource['uri']}")

# 리소스 읽기
content = await client.read_resource("file:///path/to/doc.txt")
print(content)

프롬프트 사용

Python
# 프롬프트 목록
prompts = await client.list_prompts()
for prompt in prompts:
    print(f"Prompt: {prompt['name']}")

# 프롬프트 가져오기
messages = await client.get_prompt("code-review", {
    "language": "python",
    "code": "def hello(): return 'world'"
})
print(messages)

TypeScript 클라이언트

설치

Bash
npm install @modelcontextprotocol/sdk

기본 클라이언트

TypeScript
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

async function main() {
  // 클라이언트 생성
  const client = new Client(
    {
      name: "my-client",
      version: "1.0.0"
    },
    {
      capabilities: {
        tools: {}
      }
    }
  );

  // 서버 연결
  const transport = new StdioClientTransport({
    command: "node",
    args: ["dist/server.js"]
  });

  await client.connect(transport);

  try {
    // 도구 목록
    const { tools } = await client.listTools();
    console.log("Available tools:", tools);

    // 도구 호출
    const result = await client.callTool("read_file", {
      path: "/path/to/file.txt"
    });
    console.log("Result:", result);

  } finally {
    await client.close();
  }
}

main().catch(console.error);

LLM SDK와 통합 예시 (Anthropic SDK)

Python
import anthropic
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport

async def chat_with_mcp():
    # MCP 클라이언트 설정
    mcp_client = Client(
        {"name": "mcp-chat-client", "version": "1.0.0"},
        {"capabilities": {"tools": {}}}
    )

    transport = StdioClientTransport({
        "command": "python",
        "args": ["-m", "mcp_server_filesystem"]
    })

    await mcp_client.connect(transport)

    # MCP 도구를 Anthropic Tool 형식으로 변환
    mcp_tools = await mcp_client.list_tools()
    anthropic_tools = []

    for tool in mcp_tools:
        anthropic_tools.append({
            "name": tool["name"],
            "description": tool["description"],
            "input_schema": tool["inputSchema"]
        })

    # Anthropic 클라이언트
    client = anthropic.Anthropic()

    # 대화 시작
    messages = [{
        "role": "user",
        "content": "README.md 파일을 읽어줘"
    }]

    response = client.messages.create(
        model="claude-4-5",
        max_tokens=4096,
        messages=messages,
        tools=anthropic_tools
    )

    # Tool use 처리
    while response.stop_reason == "tool_use":
        tool_use = response.content[-1]

        # MCP 서버에서 도구 실행
        tool_result = await mcp_client.call_tool(
            tool_use.name,
            tool_use.input
        )

        # 결과를 메시지에 추가
        messages.append({"role": "assistant", "content": response.content})
        messages.append({
            "role": "user",
            "content": [{
                "type": "tool_result",
                "tool_use_id": tool_use.id,
                "content": tool_result
            }]
        })

        # 다음 응답
        response = client.messages.create(
            model="claude-4-5",
            max_tokens=4096,
            messages=messages,
            tools=anthropic_tools
        )

    print(response.content)

    await mcp_client.close()

import asyncio
asyncio.run(chat_with_mcp())

클라이언트 Capabilities

Capability 선언

Python
client = Client(
    {"name": "my-client", "version": "1.0.0"},
    {
        "capabilities": {
            "tools": {},                    # 도구 호출 지원
            "resources": {
                "subscribe": True          # 리소스 변경 구독
            },
            "prompts": {},                  # 프롬프트 사용
            "sampling": {}                  # 샘플링 요청 수신
        }
    }
)

리소스 구독

Python
# 리소스 변경 구독
await client.subscribe_resource("file:///path/to/watched.txt")

# 리소스 변경 알림 수신
@client.on_resource_updated
async def handle_update(uri):
    print(f"Resource updated: {uri}")
    content = await client.read_resource(uri)
    print("New content:", content)

# 구독 해제
await client.unsubscribe_resource("file:///path/to/watched.txt")

에러 처리

Python
from mcp.client import McpError

try:
    result = await client.call_tool("read_file", {
        "path": "/nonexistent.txt"
    })
except McpError as e:
    if e.code == -32001:
        print("File not found")
    elif e.code == -32002:
        print("Permission denied")
    else:
        print(f"Error: {e.message}")
except Exception as e:
    print(f"Unexpected error: {e}")

모범 사례

클라이언트 개발 권장사항
  1. 연결 관리: try-finally로 확실한 종료 보장
  2. 에러 처리: 모든 도구 호출에 에러 처리 구현
  3. 타임아웃: 장시간 실행 작업에 타임아웃 설정
  4. 재시도: 일시적 오류 시 재시도 로직
  5. 로깅: 디버깅을 위한 상세 로깅

연결 풀링

Python
class McpClientPool:
    def __init__(self, server_config, pool_size=5):
        self.server_config = server_config
        self.pool_size = pool_size
        self.clients = []

    async def initialize(self):
        for _ in range(self.pool_size):
            client = await self._create_client()
            self.clients.append(client)

    async def _create_client(self):
        client = Client(
            {"name": "pooled-client", "version": "1.0.0"},
            {"capabilities": {"tools": {}}}
        )
        transport = StdioClientTransport(self.server_config)
        await client.connect(transport)
        return client

    async def get_client(self):
        if self.clients:
            return self.clients.pop()
        return await self._create_client()

    async def return_client(self, client):
        if len(self.clients) < self.pool_size:
            self.clients.append(client)
        else:
            await client.close()

다음 단계

MCP 클라이언트 아키텍처

MCP 클라이언트는 호스트 애플리케이션 내부에서 동작하며, 트랜스포트 계층을 통해 하나 이상의 MCP 서버와 통신합니다. 아래 다이어그램은 클라이언트의 전체 아키텍처와 1:N 서버 연결 구조를 보여줍니다.

Host Application (AI 앱) LLM / AI 모델 Claude, GPT 등 MCP Client 프로토콜 협상 / 메시지 라우팅 Capability 관리 Transport Layer stdio | SSE | Streamable HTTP MCP Server 1 - Filesystem Tools: read_file, write_file, search Resources: file:// MCP Server 2 - Database Tools: query, insert, update Resources: db://tables MCP Server 3 - GitHub Tools: list_issues, create_pr Resources: github://repos 메시지 흐름: Request / Response Notification 1 : N
핵심 개념
  • Host Application: MCP 클라이언트를 내장하는 AI 애플리케이션 (Claude Desktop, IDE 플러그인 등)
  • MCP Client: 프로토콜 협상, 메시지 직렬화/역직렬화, Capability 교환을 담당
  • Transport Layer: 실제 데이터 전송을 담당하며, stdio, SSE, Streamable HTTP 중 선택
  • 1:N 관계: 하나의 클라이언트가 여러 서버에 동시 연결 가능. 각 서버는 독립적 세션 유지

SSE 트랜스포트 클라이언트

SSE(Server-Sent Events) 트랜스포트는 HTTP 기반으로 원격 MCP 서버에 연결할 때 사용합니다. 클라이언트는 HTTP POST로 요청을 보내고, SSE 스트림으로 응답과 알림을 수신합니다.

SSE 기본 연결

TypeScript
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";

async function connectSSE() {
  const client = new Client(
    { name: "sse-client", version: "1.0.0" },
    { capabilities: { tools: {}, resources: {} } }
  );

  // SSE 트랜스포트 생성 - 원격 서버 URL 지정
  const transport = new SSEClientTransport(
    new URL("http://localhost:3001/sse")
  );

  await client.connect(transport);
  console.log("SSE 서버에 연결됨");

  // 도구 목록 확인
  const { tools } = await client.listTools();
  console.log("사용 가능한 도구:", tools.map(t => t.name));

  return client;
}

SSE 이벤트 처리

TypeScript
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";

async function sseWithEventHandling() {
  const client = new Client(
    { name: "sse-event-client", version: "1.0.0" },
    { capabilities: { tools: {}, resources: { subscribe: true } } }
  );

  // 서버로부터의 알림 처리
  client.setNotificationHandler(
    "notifications/tools/list_changed",
    async () => {
      console.log("도구 목록이 변경되었습니다. 갱신 중...");
      const { tools } = await client.listTools();
      console.log("갱신된 도구:", tools);
    }
  );

  client.setNotificationHandler(
    "notifications/resources/updated",
    async (notification) => {
      const uri = notification.params.uri;
      console.log(`리소스 변경 감지: ${uri}`);
      const content = await client.readResource({ uri });
      console.log("새 내용:", content);
    }
  );

  const transport = new SSEClientTransport(
    new URL("http://localhost:3001/sse")
  );

  await client.connect(transport);

  // 리소스 구독
  await client.subscribeResource({ uri: "file:///config.json" });
  console.log("리소스 구독 완료");

  return client;
}

SSE 재연결 로직

TypeScript
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { SSEClientTransport } from "@modelcontextprotocol/sdk/client/sse.js";

class ResilientSSEClient {
  private client: Client | null = null;
  private serverUrl: string;
  private maxRetries: number;
  private retryCount = 0;

  constructor(serverUrl: string, maxRetries = 5) {
    this.serverUrl = serverUrl;
    this.maxRetries = maxRetries;
  }

  async connect(): Promise<Client> {
    while (this.retryCount < this.maxRetries) {
      try {
        this.client = new Client(
          { name: "resilient-sse", version: "1.0.0" },
          { capabilities: { tools: {} } }
        );

        const transport = new SSEClientTransport(
          new URL(this.serverUrl)
        );

        // 연결 종료 시 자동 재연결
        transport.onclose = async () => {
          console.log("연결 종료 감지, 재연결 시도...");
          await this.reconnect();
        };

        transport.onerror = async (error) => {
          console.error("트랜스포트 오류:", error);
        };

        await this.client.connect(transport);
        this.retryCount = 0; // 성공 시 카운터 리셋
        console.log("SSE 서버 연결 성공");
        return this.client;

      } catch (error) {
        this.retryCount++;
        const delay = Math.min(
          1000 * Math.pow(2, this.retryCount),
          30000
        );
        console.log(
          `연결 실패 (${this.retryCount}/${this.maxRetries}). ` +
          `${delay}ms 후 재시도...`
        );
        await new Promise(r => setTimeout(r, delay));
      }
    }
    throw new Error("최대 재연결 횟수 초과");
  }

  private async reconnect(): Promise<void> {
    try {
      await this.client?.close();
    } catch { /* 무시 */ }
    await this.connect();
  }

  async close(): Promise<void> {
    this.maxRetries = 0; // 재연결 방지
    await this.client?.close();
  }
}

Streamable HTTP 클라이언트

Streamable HTTP는 MCP 프로토콜의 최신 트랜스포트 방식으로, SSE의 한계를 극복하고 양방향 스트리밍과 세션 관리를 지원합니다. 기존 SSE 대비 더 유연한 연결 관리와 상태 추적이 가능합니다.

기본 연결

TypeScript
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from
  "@modelcontextprotocol/sdk/client/streamableHttp.js";

async function connectStreamableHTTP() {
  const client = new Client(
    { name: "streamable-http-client", version: "1.0.0" },
    {
      capabilities: {
        tools: {},
        resources: { subscribe: true },
        sampling: {}
      }
    }
  );

  // Streamable HTTP 트랜스포트 생성
  const transport = new StreamableHTTPClientTransport(
    new URL("http://localhost:3001/mcp")
  );

  await client.connect(transport);
  console.log("Streamable HTTP 서버에 연결됨");

  // 서버 정보 확인
  const serverInfo = client.getServerVersion();
  console.log("서버:", serverInfo?.name, serverInfo?.version);

  return client;
}

세션 관리

Streamable HTTP 트랜스포트는 서버가 발급하는 세션 ID를 통해 상태를 유지합니다. 클라이언트는 Mcp-Session-Id 헤더를 통해 세션을 추적합니다.

TypeScript
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StreamableHTTPClientTransport } from
  "@modelcontextprotocol/sdk/client/streamableHttp.js";

class SessionManagedClient {
  private client: Client;
  private transport: StreamableHTTPClientTransport;
  private serverUrl: string;

  constructor(serverUrl: string) {
    this.serverUrl = serverUrl;
    this.client = new Client(
      { name: "session-client", version: "1.0.0" },
      { capabilities: { tools: {}, sampling: {} } }
    );
    this.transport = new StreamableHTTPClientTransport(
      new URL(serverUrl)
    );
  }

  async connect(): Promise<void> {
    await this.client.connect(this.transport);
    console.log("세션 연결 완료");
  }

  async callToolSafely(
    toolName: string,
    args: Record<string, unknown>
  ) {
    try {
      const result = await this.client.callTool({
        name: toolName,
        arguments: args
      });
      return result;
    } catch (error: any) {
      // 세션 만료 시 재연결
      if (error.code === -32001) {
        console.log("세션 만료, 재연결 중...");
        this.transport = new StreamableHTTPClientTransport(
          new URL(this.serverUrl)
        );
        await this.client.connect(this.transport);
        return await this.client.callTool({
          name: toolName,
          arguments: args
        });
      }
      throw error;
    }
  }

  async disconnect(): Promise<void> {
    await this.client.close();
    console.log("세션 종료 완료");
  }
}

// 사용 예
const client = new SessionManagedClient("http://localhost:3001/mcp");
await client.connect();

const result = await client.callToolSafely("read_file", {
  path: "/data/config.json"
});
console.log(result);

await client.disconnect();
SSE vs Streamable HTTP
특성 SSE Streamable HTTP
세션 관리 없음 (연결 기반) 명시적 세션 ID
재연결 클라이언트 구현 필요 세션 ID로 복원 가능
스트리밍 서버 -> 클라이언트만 양방향 지원
상태 Deprecated (권장하지 않음) 최신 표준 권장

클라이언트 생명주기 관리

MCP 클라이언트는 명확한 상태 전이를 따릅니다. 안정적인 운영을 위해 각 상태에서의 동작과 전이 조건을 이해해야 합니다.

상태 머신 다이어그램

초기화 connect() 연결 중 핸드셰이크 initialized 사용 중 도구 호출 / 리소스 읽기 알림 수신 연결 끊김 재연결 backoff 대기 재시도 close() 최대 재시도 종료 연결 실패

재시도 + Exponential Backoff 클라이언트

Python
import asyncio
import logging
from enum import Enum
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport

logger = logging.getLogger(__name__)

class ClientState(Enum):
    INIT = "init"
    CONNECTING = "connecting"
    ACTIVE = "active"
    RECONNECTING = "reconnecting"
    CLOSED = "closed"

class ResilientMcpClient:
    """재시도 로직과 exponential backoff를 갖춘 MCP 클라이언트 래퍼"""

    def __init__(
        self,
        server_command: str,
        server_args: list[str],
        max_retries: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
    ):
        self.server_command = server_command
        self.server_args = server_args
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.state = ClientState.INIT
        self.client: Client | None = None
        self._retry_count = 0

    async def connect(self) -> None:
        """서버에 연결 (실패 시 exponential backoff 재시도)"""
        self.state = ClientState.CONNECTING

        while self._retry_count <= self.max_retries:
            try:
                self.client = Client(
                    {"name": "resilient-client", "version": "1.0.0"},
                    {"capabilities": {"tools": {}, "sampling": {}}}
                )
                transport = StdioClientTransport({
                    "command": self.server_command,
                    "args": self.server_args,
                })
                await self.client.connect(transport)

                self.state = ClientState.ACTIVE
                self._retry_count = 0
                logger.info("MCP 서버 연결 성공")
                return

            except Exception as e:
                self._retry_count += 1
                delay = min(
                    self.base_delay * (2 ** self._retry_count),
                    self.max_delay
                )
                logger.warning(
                    f"연결 실패 ({self._retry_count}/{self.max_retries}): {e}"
                )
                logger.info(f"{delay:.1f}초 후 재시도...")
                await asyncio.sleep(delay)

        self.state = ClientState.CLOSED
        raise ConnectionError("최대 재시도 횟수 초과")

    async def call_tool_safe(
        self, name: str, arguments: dict
    ) -> dict:
        """도구 호출 (연결 끊김 시 자동 재연결)"""
        try:
            return await self.client.call_tool(name, arguments)
        except Exception as e:
            logger.warning(f"도구 호출 실패: {e}, 재연결 시도")
            self.state = ClientState.RECONNECTING
            await self.connect()
            return await self.client.call_tool(name, arguments)

    async def close(self) -> None:
        """Graceful shutdown"""
        if self.client and self.state == ClientState.ACTIVE:
            try:
                await self.client.close()
                logger.info("클라이언트 정상 종료")
            except Exception as e:
                logger.error(f"종료 중 오류: {e}")
        self.state = ClientState.CLOSED

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, *args):
        await self.close()

# 사용 예
async def main():
    async with ResilientMcpClient(
        server_command="python",
        server_args=["-m", "mcp_server_filesystem"],
        max_retries=3,
    ) as client:
        tools = await client.client.list_tools()
        print("도구 목록:", tools)

        result = await client.call_tool_safe(
            "read_file", {"path": "/etc/hostname"}
        )
        print(result)

멀티 서버 클라이언트

실제 운영 환경에서는 하나의 AI 애플리케이션이 여러 MCP 서버를 동시에 사용합니다. 멀티 서버 클라이언트는 도구 라우팅, 네임스페이스 충돌 해결, 서버 헬스체크 등을 통합적으로 관리합니다.

서버 통합 관리

Python
import asyncio
import logging
from dataclasses import dataclass, field
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport

logger = logging.getLogger(__name__)

@dataclass
class ServerConfig:
    """MCP 서버 설정"""
    name: str
    command: str
    args: list[str] = field(default_factory=list)
    env: dict[str, str] = field(default_factory=dict)
    namespace: str = ""  # 도구 이름 접두사

class MultiServerClient:
    """여러 MCP 서버를 통합 관리하는 클라이언트"""

    def __init__(self):
        self.servers: dict[str, Client] = {}
        self.tool_map: dict[str, str] = {}  # 도구명 -> 서버명
        self.configs: dict[str, ServerConfig] = {}
        self.health: dict[str, bool] = {}

    async def add_server(self, config: ServerConfig) -> None:
        """서버를 추가하고 연결"""
        client = Client(
            {"name": f"multi-client-{config.name}",
             "version": "1.0.0"},
            {"capabilities": {"tools": {}, "resources": {}}}
        )

        transport = StdioClientTransport({
            "command": config.command,
            "args": config.args,
            "env": config.env,
        })

        await client.connect(transport)
        self.servers[config.name] = client
        self.configs[config.name] = config
        self.health[config.name] = True

        # 도구 목록 등록 (네임스페이스 적용)
        await self._register_tools(config.name, client, config.namespace)
        logger.info(f"서버 '{config.name}' 연결 완료")

    async def _register_tools(
        self, server_name: str, client: Client, namespace: str
    ) -> None:
        """서버의 도구를 등록하고 충돌 시 네임스페이스 접두사 추가"""
        tools = await client.list_tools()
        prefix = f"{namespace}_" if namespace else ""

        for tool in tools:
            tool_name = f"{prefix}{tool['name']}"

            # 충돌 감지 및 자동 접두사 추가
            if tool_name in self.tool_map:
                existing = self.tool_map[tool_name]
                logger.warning(
                    f"도구 이름 충돌: '{tool_name}' "
                    f"({existing} vs {server_name})"
                )
                tool_name = f"{server_name}_{tool['name']}"

            self.tool_map[tool_name] = server_name

    async def call_tool(
        self, tool_name: str, arguments: dict
    ) -> dict:
        """적절한 서버로 도구 호출을 라우팅"""
        if tool_name not in self.tool_map:
            raise ValueError(f"등록되지 않은 도구: {tool_name}")

        server_name = self.tool_map[tool_name]

        # 서버 상태 확인
        if not self.health.get(server_name, False):
            logger.info(f"서버 '{server_name}' 비정상, 재연결 시도")
            await self.reconnect_server(server_name)

        # 원본 도구 이름 추출 (접두사 제거)
        config = self.configs[server_name]
        prefix = f"{config.namespace}_" if config.namespace else ""
        original_name = tool_name.removeprefix(prefix)

        client = self.servers[server_name]
        return await client.call_tool(original_name, arguments)

    async def list_all_tools(self) -> list[dict]:
        """모든 서버의 도구 목록 통합 반환"""
        all_tools = []
        for tool_name, server_name in self.tool_map.items():
            all_tools.append({
                "name": tool_name,
                "server": server_name,
            })
        return all_tools

헬스체크 및 자동 재연결

Python
class MultiServerClient(MultiServerClient):
    # (위 클래스에 추가되는 메서드들)

    async def health_check(self) -> dict[str, bool]:
        """모든 서버의 상태를 점검"""
        results = {}
        for name, client in self.servers.items():
            try:
                await asyncio.wait_for(
                    client.list_tools(), timeout=5.0
                )
                results[name] = True
            except Exception:
                results[name] = False
                logger.warning(f"서버 '{name}' 헬스체크 실패")

        self.health = results
        return results

    async def reconnect_server(self, server_name: str) -> None:
        """특정 서버 재연결"""
        if server_name not in self.configs:
            raise ValueError(f"알 수 없는 서버: {server_name}")

        # 기존 연결 정리
        try:
            await self.servers[server_name].close()
        except Exception:
            pass

        # 재연결
        config = self.configs[server_name]
        await self.add_server(config)
        logger.info(f"서버 '{server_name}' 재연결 성공")

    async def start_health_monitor(
        self, interval: float = 30.0
    ) -> None:
        """주기적 헬스체크 및 자동 재연결"""
        while True:
            await asyncio.sleep(interval)
            health = await self.health_check()

            for name, is_healthy in health.items():
                if not is_healthy:
                    logger.info(f"서버 '{name}' 비정상, 자동 재연결")
                    try:
                        await self.reconnect_server(name)
                    except Exception as e:
                        logger.error(
                            f"서버 '{name}' 재연결 실패: {e}"
                        )

    async def close_all(self) -> None:
        """모든 서버 연결 종료"""
        for name, client in self.servers.items():
            try:
                await client.close()
                logger.info(f"서버 '{name}' 종료")
            except Exception as e:
                logger.error(f"서버 '{name}' 종료 실패: {e}")
        self.servers.clear()
        self.tool_map.clear()

# 사용 예
async def main():
    manager = MultiServerClient()

    # 여러 서버 등록
    await manager.add_server(ServerConfig(
        name="fs",
        command="python",
        args=["-m", "mcp_server_filesystem"],
        namespace="fs",
    ))
    await manager.add_server(ServerConfig(
        name="github",
        command="npx",
        args=["-y", "@modelcontextprotocol/server-github"],
        env={"GITHUB_TOKEN": "ghp_..."},
        namespace="gh",
    ))

    # 헬스 모니터 백그라운드 실행
    monitor = asyncio.create_task(
        manager.start_health_monitor(interval=30)
    )

    # 통합 도구 목록
    tools = await manager.list_all_tools()
    print("전체 도구:", tools)

    # 라우팅된 도구 호출
    result = await manager.call_tool(
        "fs_read_file", {"path": "/README.md"}
    )
    print(result)

    monitor.cancel()
    await manager.close_all()

Sampling 구현

Sampling은 MCP 프로토콜의 고급 기능으로, 서버가 클라이언트에게 LLM 호출을 요청하는 역방향 패턴입니다. 일반적으로 MCP에서는 클라이언트가 서버에게 도구 실행을 요청하지만, Sampling에서는 서버가 "이 프롬프트로 LLM에게 물어봐 달라"고 클라이언트에게 요청합니다.

Sampling이 필요한 경우
  • 서버 측에서 동적으로 LLM 판단이 필요한 워크플로 (예: 코드 분석 후 리팩토링 제안)
  • 에이전트 루프: 서버가 도구 실행 결과를 보고 다음 행동을 LLM에게 질의
  • 사용자의 명시적 동의 하에 서버가 AI 기능을 활용하는 경우

Sampling 흐름

MCP Server MCP Client LLM 1. sampling/createMessage 2. LLM API 호출 3. 응답 반환 4. 결과 전달

Sampling Request Handler 구현

Python
import anthropic
from mcp.client import Client
from mcp.client.stdio import StdioClientTransport
from mcp.types import (
    CreateMessageRequest,
    CreateMessageResult,
    SamplingMessage,
    TextContent,
)

class SamplingEnabledClient:
    """Sampling 요청을 처리할 수 있는 MCP 클라이언트"""

    def __init__(self, anthropic_api_key: str):
        self.anthropic = anthropic.Anthropic(
            api_key=anthropic_api_key
        )
        self.client: Client | None = None

    async def _handle_sampling(
        self, request: CreateMessageRequest
    ) -> CreateMessageResult:
        """서버로부터의 sampling 요청을 처리"""
        # 1. MCP 메시지를 Anthropic API 형식으로 변환
        messages = []
        for msg in request.params.messages:
            messages.append({
                "role": msg.role,
                "content": msg.content.text
                    if hasattr(msg.content, "text")
                    else str(msg.content),
            })

        # 2. 사용자에게 승인 요청 (보안을 위해 권장)
        print("[Sampling 요청 수신]")
        print(f"  모델: {request.params.modelPreferences}")
        print(f"  메시지: {messages}")

        # 3. LLM API 호출
        response = self.anthropic.messages.create(
            model=request.params.modelPreferences.hints[0].name
                if request.params.modelPreferences
                else "claude-4-5-haiku",
            max_tokens=request.params.maxTokens or 1024,
            system=request.params.systemPrompt or "",
            messages=messages,
        )

        # 4. 결과를 MCP 형식으로 변환하여 반환
        result_text = response.content[0].text
        return CreateMessageResult(
            role="assistant",
            content=TextContent(
                type="text",
                text=result_text,
            ),
            model=response.model,
        )

    async def connect(
        self, command: str, args: list[str]
    ) -> None:
        """서버 연결 및 sampling 핸들러 등록"""
        self.client = Client(
            {"name": "sampling-client", "version": "1.0.0"},
            {
                "capabilities": {
                    "tools": {},
                    "sampling": {},  # sampling capability 선언
                }
            }
        )

        # Sampling 핸들러 등록
        self.client.set_sampling_handler(self._handle_sampling)

        transport = StdioClientTransport({
            "command": command,
            "args": args,
        })

        await self.client.connect(transport)
        print("Sampling 지원 클라이언트 연결 완료")

    async def close(self) -> None:
        if self.client:
            await self.client.close()

# 사용 예
async def main():
    client = SamplingEnabledClient(
        anthropic_api_key="sk-ant-..."
    )
    await client.connect(
        command="python",
        args=["-m", "my_agent_server"]
    )

    # 서버가 작업 중 LLM 판단이 필요하면
    # 자동으로 _handle_sampling이 호출됨
    result = await client.client.call_tool(
        "analyze_and_suggest",
        {"file_path": "/src/main.py"}
    )
    print("분석 결과:", result)

    await client.close()
Sampling 보안 고려사항
  • 사용자 승인: 프로덕션 환경에서는 반드시 사용자에게 sampling 요청을 승인받아야 합니다. 서버가 임의로 LLM 호출을 유발할 수 있으므로 비용과 보안 위험이 있습니다.
  • 모델 제한: 서버가 요청하는 모델과 max_tokens를 클라이언트가 검증하고 제한해야 합니다.
  • 프롬프트 인젝션: 서버가 보내는 system prompt와 messages를 검증하여 악의적 프롬프트를 차단해야 합니다.
  • 비용 관리: sampling 호출 횟수와 토큰 사용량을 추적하고 한도를 설정하세요.

핵심 정리

  • MCP 클라이언트의 핵심 개념과 흐름을 정리합니다.
  • 대표적인 데스크톱/CLI MCP 클라이언트의 흐름을 단계별로 이해합니다.
  • 실전 적용 시 기준과 주의점을 확인합니다.

실무 팁

  • 입력/출력 예시를 고정해 재현성을 확보하세요.
  • MCP 클라이언트 범위를 작게 잡고 단계적으로 확장하세요.
  • 클라이언트별 설정 조건을 문서화해 대응 시간을 줄이세요.