Model Context Protocol (MCP): Building Extensible AI Tool Ecosystems

The Model Context Protocol (MCP) represents a paradigm shift in how AI systems access and interact with external tools and data sources. Developed by Anthropic, MCP provides a standardized way for AI models to securely and efficiently connect with databases, APIs, file systems, and custom business logic. For production AI systems, MCP offers the promise of building truly extensible ecosystems where new capabilities can be added dynamically without core system modifications.

This comprehensive guide explores the architectural principles, implementation strategies, and production deployment patterns for building robust MCP-based tool ecosystems, drawing from real-world experience implementing MCP servers and clients in enterprise environments.

Understanding MCP Architecture

MCP operates on a client-server model where AI applications (clients) communicate with tool providers (servers) through a standardized protocol. This architecture enables loose coupling, independent scaling, and secure sandboxing of tool execution.

MCP Ecosystem Architecture:

┌─────────────────────────────────────────────────────────────┐
│ AI Application Layer (MCP Clients)                         │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │
│ │ Chat        │ │ Agent       │ │ Workflow    │            │
│ │ Interface   │ │ System      │ │ Engine      │            │
│ └─────────────┘ └─────────────┘ └─────────────┘            │
└─────────────┬───────────────┬─────────────────┬─────────────┘
              │               │                 │
              v               v                 v
┌─────────────────────────────────────────────────────────────┐
│ MCP Transport Layer                                         │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │
│ │ WebSocket   │ │ HTTP/REST   │ │ gRPC        │            │
│ │ Transport   │ │ Transport   │ │ Transport   │            │
│ └─────────────┘ └─────────────┘ └─────────────┘            │
└─────────────────────────────────────────────────────────────┘
              │               │                 │
              v               v                 v
┌─────────────────────────────────────────────────────────────┐
│ MCP Server Layer (Tool Providers)                          │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │
│ │ Database    │ │ File System │ │ API Gateway │            │
│ │ Server      │ │ Server      │ │ Server      │            │
│ └─────────────┘ └─────────────┘ └─────────────┘            │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │
│ │ Git         │ │ Kubernetes  │ │ Custom      │            │
│ │ Server      │ │ Server      │ │ Business    │            │
│ │             │ │             │ │ Logic       │            │
│ └─────────────┘ └─────────────┘ └─────────────┘            │
└─────────────────────────────────────────────────────────────┘
              │               │                 │
              v               v                 v
┌─────────────────────────────────────────────────────────────┐
│ Resource Layer                                              │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │
│ │ PostgreSQL  │ │ Local Files │ │ REST APIs   │            │
│ │ Database    │ │ System      │ │             │            │
│ └─────────────┘ └─────────────┘ └─────────────┘            │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐            │
│ │ Git         │ │ Kubernetes  │ │ Enterprise  │            │
│ │ Repository  │ │ Cluster     │ │ Systems     │            │
│ └─────────────┘ └─────────────┘ └─────────────┘            │
└─────────────────────────────────────────────────────────────┘

Production MCP Server Implementation

Here’s a comprehensive implementation of a production-ready MCP server that handles multiple tool types:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
import asyncio
import json
import logging
from typing import Dict, List, Optional, Any, Union, AsyncGenerator
from dataclasses import dataclass, asdict
from enum import Enum
import uuid
from abc import ABC, abstractmethod
import websockets
import httpx
from contextlib import asynccontextmanager

# MCP Protocol Types
class MessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    NOTIFICATION = "notification"

class MCPMethod(Enum):
    INITIALIZE = "initialize"
    LIST_TOOLS = "tools/list"
    CALL_TOOL = "tools/call"
    LIST_RESOURCES = "resources/list"
    READ_RESOURCE = "resources/read"
    LIST_PROMPTS = "prompts/list"
    GET_PROMPT = "prompts/get"

@dataclass
class MCPMessage:
    """Base MCP message structure"""
    jsonrpc: str = "2.0"
    id: Optional[Union[str, int]] = None
    method: Optional[str] = None
    params: Optional[Dict[str, Any]] = None
    result: Optional[Any] = None
    error: Optional[Dict[str, Any]] = None

@dataclass
class MCPTool:
    """MCP tool definition"""
    name: str
    description: str
    inputSchema: Dict[str, Any]

@dataclass
class MCPResource:
    """MCP resource definition"""
    uri: str
    name: str
    description: Optional[str] = None
    mimeType: Optional[str] = None

@dataclass 
class MCPPrompt:
    """MCP prompt template definition"""
    name: str
    description: str
    arguments: Optional[List[Dict[str, Any]]] = None

class MCPError(Exception):
    """MCP-specific error"""
    def __init__(self, code: int, message: str, data: Optional[Any] = None):
        self.code = code
        self.message = message
        self.data = data
        super().__init__(message)

class MCPToolProvider(ABC):
    """Abstract base class for MCP tool providers"""
    
    @abstractmethod
    async def get_tools(self) -> List[MCPTool]:
        """Return list of available tools"""
        pass
    
    @abstractmethod
    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Execute a tool with given arguments"""
        pass

class DatabaseToolProvider(MCPToolProvider):
    """Database operations tool provider"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.connection_pool = None
        
    async def initialize(self):
        """Initialize database connection pool"""
        import asyncpg
        
        self.connection_pool = await asyncpg.create_pool(
            host=self.config['host'],
            port=self.config['port'],
            user=self.config['user'],
            password=self.config['password'],
            database=self.config['database'],
            min_size=self.config.get('min_connections', 5),
            max_size=self.config.get('max_connections', 20)
        )
    
    async def get_tools(self) -> List[MCPTool]:
        """Return available database tools"""
        return [
            MCPTool(
                name="execute_query",
                description="Execute a SQL query and return results",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "SQL query to execute"
                        },
                        "parameters": {
                            "type": "array",
                            "description": "Query parameters",
                            "items": {"type": "string"}
                        }
                    },
                    "required": ["query"]
                }
            ),
            MCPTool(
                name="get_schema",
                description="Get database schema information",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "table_name": {
                            "type": "string",
                            "description": "Optional table name to get schema for"
                        }
                    }
                }
            ),
            MCPTool(
                name="execute_transaction",
                description="Execute multiple queries in a transaction",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "queries": {
                            "type": "array",
                            "description": "List of SQL queries to execute",
                            "items": {
                                "type": "object",
                                "properties": {
                                    "query": {"type": "string"},
                                    "parameters": {"type": "array"}
                                }
                            }
                        }
                    },
                    "required": ["queries"]
                }
            )
        ]
    
    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Execute database tool"""
        
        if not self.connection_pool:
            raise MCPError(-32603, "Database not initialized")
        
        if name == "execute_query":
            return await self._execute_query(
                arguments.get("query"),
                arguments.get("parameters", [])
            )
        elif name == "get_schema":
            return await self._get_schema(arguments.get("table_name"))
        elif name == "execute_transaction":
            return await self._execute_transaction(arguments.get("queries", []))
        else:
            raise MCPError(-32601, f"Unknown tool: {name}")
    
    async def _execute_query(self, query: str, parameters: List[Any]) -> Dict[str, Any]:
        """Execute a single SQL query"""
        
        # Basic SQL injection protection
        if not self._validate_query(query):
            raise MCPError(-32602, "Query validation failed")
        
        async with self.connection_pool.acquire() as conn:
            try:
                if parameters:
                    result = await conn.fetch(query, *parameters)
                else:
                    result = await conn.fetch(query)
                
                # Convert result to JSON-serializable format
                rows = [dict(record) for record in result]
                
                return {
                    "rows": rows,
                    "row_count": len(rows),
                    "query": query
                }
                
            except Exception as e:
                raise MCPError(-32603, f"Query execution failed: {str(e)}")
    
    async def _get_schema(self, table_name: Optional[str]) -> Dict[str, Any]:
        """Get database schema information"""
        
        async with self.connection_pool.acquire() as conn:
            try:
                if table_name:
                    # Get schema for specific table
                    query = """
                        SELECT column_name, data_type, is_nullable, column_default
                        FROM information_schema.columns
                        WHERE table_name = $1
                        ORDER BY ordinal_position
                    """
                    result = await conn.fetch(query, table_name)
                    
                    return {
                        "table": table_name,
                        "columns": [dict(record) for record in result]
                    }
                else:
                    # Get all tables
                    query = """
                        SELECT table_name, table_type
                        FROM information_schema.tables
                        WHERE table_schema = 'public'
                        ORDER BY table_name
                    """
                    result = await conn.fetch(query)
                    
                    return {
                        "tables": [dict(record) for record in result]
                    }
                    
            except Exception as e:
                raise MCPError(-32603, f"Schema query failed: {str(e)}")
    
    def _validate_query(self, query: str) -> bool:
        """Basic SQL query validation"""
        
        query_lower = query.lower().strip()
        
        # Block potentially dangerous operations in production
        dangerous_keywords = [
            'drop table', 'drop database', 'truncate table',
            'delete from', 'alter table', 'create user',
            'grant', 'revoke', 'shutdown'
        ]
        
        for keyword in dangerous_keywords:
            if keyword in query_lower:
                return False
        
        return True

class FileSystemToolProvider(MCPToolProvider):
    """File system operations tool provider"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.allowed_paths = set(config.get('allowed_paths', []))
        self.max_file_size = config.get('max_file_size', 10 * 1024 * 1024)  # 10MB
    
    async def get_tools(self) -> List[MCPTool]:
        """Return available file system tools"""
        return [
            MCPTool(
                name="read_file",
                description="Read contents of a file",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Path to the file to read"
                        },
                        "encoding": {
                            "type": "string",
                            "description": "File encoding (default: utf-8)",
                            "default": "utf-8"
                        }
                    },
                    "required": ["path"]
                }
            ),
            MCPTool(
                name="write_file",
                description="Write contents to a file",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Path to the file to write"
                        },
                        "content": {
                            "type": "string",
                            "description": "Content to write to the file"
                        },
                        "encoding": {
                            "type": "string",
                            "description": "File encoding (default: utf-8)",
                            "default": "utf-8"
                        }
                    },
                    "required": ["path", "content"]
                }
            ),
            MCPTool(
                name="list_directory",
                description="List contents of a directory",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Path to the directory to list"
                        },
                        "recursive": {
                            "type": "boolean",
                            "description": "Whether to list recursively",
                            "default": False
                        }
                    },
                    "required": ["path"]
                }
            ),
            MCPTool(
                name="create_directory",
                description="Create a new directory",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "path": {
                            "type": "string",
                            "description": "Path of the directory to create"
                        }
                    },
                    "required": ["path"]
                }
            )
        ]
    
    async def call_tool(self, name: str, arguments: Dict[str, Any]) -> Any:
        """Execute file system tool"""
        
        path = arguments.get("path", "")
        
        # Security: Validate path is allowed
        if not self._validate_path(path):
            raise MCPError(-32602, f"Access denied: Path not allowed: {path}")
        
        if name == "read_file":
            return await self._read_file(path, arguments.get("encoding", "utf-8"))
        elif name == "write_file":
            return await self._write_file(
                path, 
                arguments.get("content", ""),
                arguments.get("encoding", "utf-8")
            )
        elif name == "list_directory":
            return await self._list_directory(path, arguments.get("recursive", False))
        elif name == "create_directory":
            return await self._create_directory(path)
        else:
            raise MCPError(-32601, f"Unknown tool: {name}")
    
    def _validate_path(self, path: str) -> bool:
        """Validate that path is within allowed directories"""
        
        import os.path
        
        # Convert to absolute path
        abs_path = os.path.abspath(path)
        
        # Check if path starts with any allowed path
        for allowed_path in self.allowed_paths:
            abs_allowed = os.path.abspath(allowed_path)
            if abs_path.startswith(abs_allowed):
                return True
        
        return False
    
    async def _read_file(self, path: str, encoding: str) -> Dict[str, Any]:
        """Read file contents"""
        
        import aiofiles
        import os
        
        try:
            # Check file size
            file_size = os.path.getsize(path)
            if file_size > self.max_file_size:
                raise MCPError(-32602, f"File too large: {file_size} bytes")
            
            async with aiofiles.open(path, 'r', encoding=encoding) as f:
                content = await f.read()
            
            return {
                "path": path,
                "content": content,
                "size": file_size,
                "encoding": encoding
            }
            
        except FileNotFoundError:
            raise MCPError(-32602, f"File not found: {path}")
        except Exception as e:
            raise MCPError(-32603, f"Failed to read file: {str(e)}")

class ProductionMCPServer:
    """Production-ready MCP server implementation"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.tool_providers: Dict[str, MCPToolProvider] = {}
        self.resource_providers: Dict[str, 'MCPResourceProvider'] = {}
        self.prompt_providers: Dict[str, 'MCPPromptProvider'] = {}
        
        # Server state
        self.server_info = {
            "name": config.get("server_name", "MCP Server"),
            "version": config.get("server_version", "1.0.0")
        }
        
        # Security and rate limiting
        self.rate_limiter = RateLimiter(config.get('rate_limit', {}))
        self.auth_handler = AuthHandler(config.get('auth', {}))
        
        # Metrics and monitoring
        self.metrics = MCPServerMetrics()
        
    def register_tool_provider(self, name: str, provider: MCPToolProvider):
        """Register a tool provider"""
        self.tool_providers[name] = provider
        
    def register_resource_provider(self, name: str, provider: 'MCPResourceProvider'):
        """Register a resource provider"""
        self.resource_providers[name] = provider
        
    def register_prompt_provider(self, name: str, provider: 'MCPPromptProvider'):
        """Register a prompt provider"""
        self.prompt_providers[name] = provider
    
    async def start(self, host: str = "localhost", port: int = 8080):
        """Start the MCP server"""
        
        # Initialize all providers
        await self._initialize_providers()
        
        # Start WebSocket server
        logger.info(f"Starting MCP server on {host}:{port}")
        
        async def handler(websocket, path):
            await self._handle_client_connection(websocket, path)
        
        server = await websockets.serve(
            handler,
            host,
            port,
            ping_interval=30,
            ping_timeout=10,
            close_timeout=10
        )
        
        logger.info("MCP server started successfully")
        return server
    
    async def _initialize_providers(self):
        """Initialize all registered providers"""
        
        for name, provider in self.tool_providers.items():
            try:
                if hasattr(provider, 'initialize'):
                    await provider.initialize()
                logger.info(f"Initialized tool provider: {name}")
            except Exception as e:
                logger.error(f"Failed to initialize tool provider {name}: {e}")
        
        # Initialize resource and prompt providers similarly...
    
    async def _handle_client_connection(self, websocket, path):
        """Handle a client WebSocket connection"""
        
        client_id = str(uuid.uuid4())
        logger.info(f"New client connection: {client_id}")
        
        try:
            # Authentication
            if not await self.auth_handler.authenticate(websocket):
                await websocket.close(code=4001, reason="Authentication failed")
                return
            
            # Handle messages
            async for message in websocket:
                try:
                    await self._handle_message(websocket, message, client_id)
                except Exception as e:
                    logger.error(f"Error handling message from {client_id}: {e}")
                    
                    # Send error response
                    error_response = MCPMessage(
                        error={
                            "code": -32603,
                            "message": "Internal server error",
                            "data": str(e)
                        }
                    )
                    await websocket.send(json.dumps(asdict(error_response)))
                    
        except websockets.exceptions.ConnectionClosedError:
            logger.info(f"Client {client_id} disconnected")
        except Exception as e:
            logger.error(f"Connection error for {client_id}: {e}")
        finally:
            self.metrics.record_client_disconnect(client_id)
    
    async def _handle_message(self, websocket, raw_message: str, client_id: str):
        """Handle a single message from client"""
        
        # Rate limiting
        if not await self.rate_limiter.check_rate_limit(client_id):
            error_response = MCPMessage(
                error={
                    "code": -32603,
                    "message": "Rate limit exceeded"
                }
            )
            await websocket.send(json.dumps(asdict(error_response)))
            return
        
        # Parse message
        try:
            message_data = json.loads(raw_message)
            message = MCPMessage(**message_data)
        except Exception as e:
            error_response = MCPMessage(
                error={
                    "code": -32700,
                    "message": "Parse error",
                    "data": str(e)
                }
            )
            await websocket.send(json.dumps(asdict(error_response)))
            return
        
        # Record metrics
        self.metrics.record_message_received(message.method, client_id)
        
        # Route message
        response = await self._route_message(message, client_id)
        
        # Send response
        if response:
            await websocket.send(json.dumps(asdict(response)))
            self.metrics.record_message_sent(client_id)
    
    async def _route_message(self, message: MCPMessage, client_id: str) -> Optional[MCPMessage]:
        """Route message to appropriate handler"""
        
        if message.method == MCPMethod.INITIALIZE.value:
            return await self._handle_initialize(message)
        elif message.method == MCPMethod.LIST_TOOLS.value:
            return await self._handle_list_tools(message)
        elif message.method == MCPMethod.CALL_TOOL.value:
            return await self._handle_call_tool(message, client_id)
        elif message.method == MCPMethod.LIST_RESOURCES.value:
            return await self._handle_list_resources(message)
        elif message.method == MCPMethod.READ_RESOURCE.value:
            return await self._handle_read_resource(message)
        elif message.method == MCPMethod.LIST_PROMPTS.value:
            return await self._handle_list_prompts(message)
        elif message.method == MCPMethod.GET_PROMPT.value:
            return await self._handle_get_prompt(message)
        else:
            return MCPMessage(
                id=message.id,
                error={
                    "code": -32601,
                    "message": f"Method not found: {message.method}"
                }
            )
    
    async def _handle_initialize(self, message: MCPMessage) -> MCPMessage:
        """Handle initialization request"""
        
        client_info = message.params or {}
        
        result = {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "tools": {"listChanged": True},
                "resources": {"subscribe": True, "listChanged": True},
                "prompts": {"listChanged": True}
            },
            "serverInfo": self.server_info,
            "instructions": "Use this server to access databases, files, and custom business logic."
        }
        
        return MCPMessage(
            id=message.id,
            result=result
        )
    
    async def _handle_list_tools(self, message: MCPMessage) -> MCPMessage:
        """Handle tools listing request"""
        
        all_tools = []
        
        for provider_name, provider in self.tool_providers.items():
            try:
                tools = await provider.get_tools()
                for tool in tools:
                    # Add provider context
                    tool_dict = asdict(tool)
                    tool_dict['provider'] = provider_name
                    all_tools.append(tool_dict)
            except Exception as e:
                logger.error(f"Error getting tools from {provider_name}: {e}")
        
        return MCPMessage(
            id=message.id,
            result={"tools": all_tools}
        )
    
    async def _handle_call_tool(self, message: MCPMessage, client_id: str) -> MCPMessage:
        """Handle tool execution request"""
        
        params = message.params or {}
        tool_name = params.get("name")
        arguments = params.get("arguments", {})
        
        if not tool_name:
            return MCPMessage(
                id=message.id,
                error={
                    "code": -32602,
                    "message": "Missing tool name"
                }
            )
        
        # Find the provider that has this tool
        for provider_name, provider in self.tool_providers.items():
            try:
                tools = await provider.get_tools()
                tool_names = [tool.name for tool in tools]
                
                if tool_name in tool_names:
                    # Record tool execution start
                    self.metrics.record_tool_execution_start(tool_name, client_id)
                    
                    # Execute tool
                    result = await provider.call_tool(tool_name, arguments)
                    
                    # Record successful execution
                    self.metrics.record_tool_execution_success(tool_name, client_id)
                    
                    return MCPMessage(
                        id=message.id,
                        result={
                            "content": [
                                {
                                    "type": "text",
                                    "text": json.dumps(result, indent=2)
                                }
                            ]
                        }
                    )
                    
            except MCPError as e:
                self.metrics.record_tool_execution_error(tool_name, client_id, str(e))
                return MCPMessage(
                    id=message.id,
                    error={
                        "code": e.code,
                        "message": e.message,
                        "data": e.data
                    }
                )
            except Exception as e:
                self.metrics.record_tool_execution_error(tool_name, client_id, str(e))
                return MCPMessage(
                    id=message.id,
                    error={
                        "code": -32603,
                        "message": f"Tool execution failed: {str(e)}"
                    }
                )
        
        return MCPMessage(
            id=message.id,
            error={
                "code": -32601,
                "message": f"Tool not found: {tool_name}"
            }
        )

class MCPServerMetrics:
    """Metrics collection for MCP server"""
    
    def __init__(self):
        self.metrics = {}
        self.client_connections = set()
        self.tool_execution_counts = {}
        self.error_counts = {}
    
    def record_client_connect(self, client_id: str):
        """Record client connection"""
        self.client_connections.add(client_id)
    
    def record_client_disconnect(self, client_id: str):
        """Record client disconnection"""
        self.client_connections.discard(client_id)
    
    def record_message_received(self, method: str, client_id: str):
        """Record incoming message"""
        key = f"messages_received_{method}"
        self.metrics[key] = self.metrics.get(key, 0) + 1
    
    def record_message_sent(self, client_id: str):
        """Record outgoing message"""
        self.metrics["messages_sent"] = self.metrics.get("messages_sent", 0) + 1
    
    def record_tool_execution_start(self, tool_name: str, client_id: str):
        """Record tool execution start"""
        key = f"tool_executions_{tool_name}"
        self.metrics[key] = self.metrics.get(key, 0) + 1
    
    def record_tool_execution_success(self, tool_name: str, client_id: str):
        """Record successful tool execution"""
        key = f"tool_successes_{tool_name}"
        self.metrics[key] = self.metrics.get(key, 0) + 1
    
    def record_tool_execution_error(self, tool_name: str, client_id: str, error: str):
        """Record tool execution error"""
        key = f"tool_errors_{tool_name}"
        self.metrics[key] = self.metrics.get(key, 0) + 1
        
        # Store specific error
        error_key = f"{tool_name}_{error}"
        self.error_counts[error_key] = self.error_counts.get(error_key, 0) + 1
    
    def get_metrics(self) -> Dict[str, Any]:
        """Get current metrics"""
        return {
            "active_connections": len(self.client_connections),
            "metrics": self.metrics.copy(),
            "error_counts": self.error_counts.copy()
        }

class RateLimiter:
    """Rate limiter for MCP server"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.requests_per_minute = config.get('requests_per_minute', 60)
        self.client_requests: Dict[str, List[float]] = {}
    
    async def check_rate_limit(self, client_id: str) -> bool:
        """Check if client is within rate limit"""
        
        import time
        
        current_time = time.time()
        
        if client_id not in self.client_requests:
            self.client_requests[client_id] = []
        
        # Clean old requests (older than 1 minute)
        self.client_requests[client_id] = [
            req_time for req_time in self.client_requests[client_id]
            if current_time - req_time < 60
        ]
        
        # Check rate limit
        if len(self.client_requests[client_id]) >= self.requests_per_minute:
            return False
        
        # Record this request
        self.client_requests[client_id].append(current_time)
        return True

class AuthHandler:
    """Authentication handler for MCP server"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.auth_enabled = config.get('enabled', False)
        self.api_keys = set(config.get('api_keys', []))
        self.jwt_secret = config.get('jwt_secret')
    
    async def authenticate(self, websocket) -> bool:
        """Authenticate client connection"""
        
        if not self.auth_enabled:
            return True
        
        # For WebSocket, authentication is typically done via headers
        # or initial handshake message
        auth_header = websocket.request_headers.get('Authorization')
        
        if not auth_header:
            return False
        
        if auth_header.startswith('Bearer '):
            token = auth_header[7:]
            return await self._validate_jwt_token(token)
        elif auth_header.startswith('ApiKey '):
            api_key = auth_header[7:]
            return api_key in self.api_keys
        
        return False
    
    async def _validate_jwt_token(self, token: str) -> bool:
        """Validate JWT token"""
        
        if not self.jwt_secret:
            return False
        
        try:
            import jwt
            payload = jwt.decode(token, self.jwt_secret, algorithms=['HS256'])
            
            # Additional validation (expiry, permissions, etc.)
            return True
            
        except jwt.InvalidTokenError:
            return False

MCP Client Implementation

Building robust MCP clients requires handling connection management, retry logic, and tool orchestration:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
class ProductionMCPClient:
    """Production-ready MCP client implementation"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.server_url = config['server_url']
        self.auth_token = config.get('auth_token')
        
        # Connection management
        self.websocket: Optional[websockets.WebSocketClientProtocol] = None
        self.connected = False
        self.connection_lock = asyncio.Lock()
        
        # Request/response handling
        self.pending_requests: Dict[str, asyncio.Future] = {}
        self.request_counter = 0
        
        # Tool and resource caches
        self.tool_cache: Optional[List[MCPTool]] = None
        self.resource_cache: Optional[List[MCPResource]] = None
        self.cache_ttl = config.get('cache_ttl', 300)  # 5 minutes
        self.cache_updated: Optional[float] = None
        
        # Error handling and retries
        self.retry_config = config.get('retry', {})
        self.max_retries = self.retry_config.get('max_retries', 3)
        self.retry_delay = self.retry_config.get('initial_delay', 1.0)
        
        # Metrics
        self.metrics = MCPClientMetrics()
        
    async def connect(self) -> bool:
        """Connect to MCP server"""
        
        async with self.connection_lock:
            if self.connected and self.websocket:
                return True
            
            try:
                # Set up connection headers
                headers = {}
                if self.auth_token:
                    headers['Authorization'] = f'Bearer {self.auth_token}'
                
                # Connect to server
                self.websocket = await websockets.connect(
                    self.server_url,
                    extra_headers=headers,
                    ping_interval=30,
                    ping_timeout=10
                )
                
                # Start message handler
                asyncio.create_task(self._message_handler())
                
                # Initialize connection
                await self._initialize_connection()
                
                self.connected = True
                self.metrics.record_connection_success()
                logger.info("Connected to MCP server successfully")
                
                return True
                
            except Exception as e:
                self.metrics.record_connection_error(str(e))
                logger.error(f"Failed to connect to MCP server: {e}")
                return False
    
    async def disconnect(self):
        """Disconnect from MCP server"""
        
        async with self.connection_lock:
            if self.websocket:
                await self.websocket.close()
                self.websocket = None
            
            self.connected = False
            
            # Cancel pending requests
            for future in self.pending_requests.values():
                future.cancel()
            self.pending_requests.clear()
    
    async def _message_handler(self):
        """Handle incoming messages from server"""
        
        try:
            async for raw_message in self.websocket:
                try:
                    message_data = json.loads(raw_message)
                    message = MCPMessage(**message_data)
                    
                    # Handle response to pending request
                    if message.id and str(message.id) in self.pending_requests:
                        future = self.pending_requests.pop(str(message.id))
                        if message.error:
                            error = MCPError(
                                message.error.get('code', -32603),
                                message.error.get('message', 'Unknown error'),
                                message.error.get('data')
                            )
                            future.set_exception(error)
                        else:
                            future.set_result(message.result)
                    
                    # Handle notifications
                    elif message.method and not message.id:
                        await self._handle_notification(message)
                    
                except Exception as e:
                    logger.error(f"Error processing message: {e}")
                    
        except websockets.exceptions.ConnectionClosedError:
            logger.info("Server connection closed")
            self.connected = False
        except Exception as e:
            logger.error(f"Message handler error: {e}")
            self.connected = False
    
    async def _send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Any:
        """Send request to server and wait for response"""
        
        if not self.connected:
            if not await self.connect():
                raise MCPError(-32603, "Not connected to server")
        
        # Generate unique request ID
        self.request_counter += 1
        request_id = str(self.request_counter)
        
        # Create request message
        request = MCPMessage(
            id=request_id,
            method=method,
            params=params
        )
        
        # Create future for response
        future = asyncio.Future()
        self.pending_requests[request_id] = future
        
        try:
            # Send request
            await self.websocket.send(json.dumps(asdict(request)))
            self.metrics.record_request_sent(method)
            
            # Wait for response with timeout
            timeout = self.config.get('request_timeout', 30)
            result = await asyncio.wait_for(future, timeout=timeout)
            
            self.metrics.record_request_success(method)
            return result
            
        except asyncio.TimeoutError:
            self.pending_requests.pop(request_id, None)
            self.metrics.record_request_timeout(method)
            raise MCPError(-32603, f"Request timeout: {method}")
        except Exception as e:
            self.pending_requests.pop(request_id, None)
            self.metrics.record_request_error(method, str(e))
            raise
    
    async def _initialize_connection(self):
        """Initialize connection with server"""
        
        client_info = {
            "name": self.config.get('client_name', 'MCP Client'),
            "version": self.config.get('client_version', '1.0.0')
        }
        
        result = await self._send_request('initialize', client_info)
        logger.info(f"Initialized connection: {result}")
    
    async def list_tools(self, use_cache: bool = True) -> List[MCPTool]:
        """List available tools from server"""
        
        # Check cache
        if use_cache and self._is_cache_valid():
            if self.tool_cache is not None:
                return self.tool_cache
        
        # Request from server
        result = await self._send_request('tools/list')
        tools = [MCPTool(**tool_data) for tool_data in result.get('tools', [])]
        
        # Update cache
        self.tool_cache = tools
        self.cache_updated = time.time()
        
        return tools
    
    async def call_tool(self, 
                       tool_name: str, 
                       arguments: Dict[str, Any],
                       retry_on_failure: bool = True) -> Any:
        """Call a tool on the server"""
        
        params = {
            "name": tool_name,
            "arguments": arguments
        }
        
        if retry_on_failure:
            return await self._call_with_retry(
                lambda: self._send_request('tools/call', params),
                tool_name
            )
        else:
            return await self._send_request('tools/call', params)
    
    async def _call_with_retry(self, 
                              call_func: callable,
                              tool_name: str,
                              current_retry: int = 0) -> Any:
        """Call function with retry logic"""
        
        try:
            result = await call_func()
            if current_retry > 0:
                self.metrics.record_retry_success(tool_name, current_retry)
            return result
            
        except Exception as e:
            if current_retry < self.max_retries:
                # Calculate delay with exponential backoff
                delay = self.retry_delay * (2 ** current_retry)
                
                logger.warning(f"Tool call failed (attempt {current_retry + 1}), retrying in {delay}s: {e}")
                
                await asyncio.sleep(delay)
                return await self._call_with_retry(call_func, tool_name, current_retry + 1)
            else:
                self.metrics.record_retry_exhausted(tool_name, current_retry)
                raise
    
    async def call_tool_with_context(self, 
                                   tool_name: str,
                                   arguments: Dict[str, Any],
                                   context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Call tool with additional context and result processing"""
        
        start_time = time.time()
        
        try:
            # Add context to arguments if provided
            if context:
                arguments = {**arguments, '_context': context}
            
            # Call tool
            result = await self.call_tool(tool_name, arguments)
            
            # Process result
            processed_result = self._process_tool_result(result, tool_name)
            
            execution_time = time.time() - start_time
            self.metrics.record_tool_execution_time(tool_name, execution_time)
            
            return {
                'tool': tool_name,
                'result': processed_result,
                'execution_time': execution_time,
                'success': True
            }
            
        except Exception as e:
            execution_time = time.time() - start_time
            self.metrics.record_tool_execution_error(tool_name, str(e))
            
            return {
                'tool': tool_name,
                'error': str(e),
                'execution_time': execution_time,
                'success': False
            }
    
    def _process_tool_result(self, result: Any, tool_name: str) -> Any:
        """Process and validate tool result"""
        
        # Extract content from MCP response format
        if isinstance(result, dict) and 'content' in result:
            content_items = result['content']
            
            if len(content_items) == 1 and content_items[0].get('type') == 'text':
                # Single text response
                text_content = content_items[0]['text']
                
                # Try to parse as JSON if it looks like JSON
                if text_content.strip().startswith(('{', '[')):
                    try:
                        return json.loads(text_content)
                    except json.JSONDecodeError:
                        return text_content
                
                return text_content
            else:
                # Multiple content items
                return content_items
        
        return result
    
    def _is_cache_valid(self) -> bool:
        """Check if cache is still valid"""
        
        if self.cache_updated is None:
            return False
        
        return (time.time() - self.cache_updated) < self.cache_ttl

class MCPToolOrchestrator:
    """Orchestrate multiple MCP tools for complex workflows"""
    
    def __init__(self, client: ProductionMCPClient):
        self.client = client
        self.workflow_cache: Dict[str, Any] = {}
        
    async def execute_workflow(self, 
                             workflow_definition: Dict[str, Any],
                             context: Dict[str, Any]) -> Dict[str, Any]:
        """Execute a multi-step workflow using MCP tools"""
        
        workflow_id = workflow_definition.get('id', str(uuid.uuid4()))
        steps = workflow_definition.get('steps', [])
        
        workflow_result = {
            'workflow_id': workflow_id,
            'steps': [],
            'success': True,
            'error': None
        }
        
        # Maintain workflow context
        workflow_context = context.copy()
        
        for step_idx, step in enumerate(steps):
            step_result = await self._execute_workflow_step(
                step, 
                workflow_context, 
                step_idx
            )
            
            workflow_result['steps'].append(step_result)
            
            if not step_result['success']:
                # Handle step failure
                if step.get('continue_on_failure', False):
                    logger.warning(f"Step {step_idx} failed but continuing: {step_result['error']}")
                else:
                    workflow_result['success'] = False
                    workflow_result['error'] = f"Step {step_idx} failed: {step_result['error']}"
                    break
            else:
                # Update context with step result
                if step.get('output_variable'):
                    workflow_context[step['output_variable']] = step_result['result']
        
        return workflow_result
    
    async def _execute_workflow_step(self, 
                                   step: Dict[str, Any],
                                   context: Dict[str, Any],
                                   step_idx: int) -> Dict[str, Any]:
        """Execute a single workflow step"""
        
        step_type = step.get('type', 'tool_call')
        
        if step_type == 'tool_call':
            return await self._execute_tool_call_step(step, context, step_idx)
        elif step_type == 'condition':
            return await self._execute_condition_step(step, context, step_idx)
        elif step_type == 'loop':
            return await self._execute_loop_step(step, context, step_idx)
        elif step_type == 'parallel':
            return await self._execute_parallel_step(step, context, step_idx)
        else:
            return {
                'step_index': step_idx,
                'success': False,
                'error': f"Unknown step type: {step_type}"
            }
    
    async def _execute_tool_call_step(self, 
                                    step: Dict[str, Any],
                                    context: Dict[str, Any],
                                    step_idx: int) -> Dict[str, Any]:
        """Execute a tool call step"""
        
        tool_name = step['tool']
        arguments_template = step.get('arguments', {})
        
        # Resolve arguments from context
        arguments = self._resolve_arguments(arguments_template, context)
        
        # Execute tool
        result = await self.client.call_tool_with_context(
            tool_name, 
            arguments, 
            context
        )
        
        return {
            'step_index': step_idx,
            'step_type': 'tool_call',
            'tool': tool_name,
            **result
        }
    
    def _resolve_arguments(self, 
                          arguments_template: Dict[str, Any],
                          context: Dict[str, Any]) -> Dict[str, Any]:
        """Resolve argument templates using workflow context"""
        
        resolved = {}
        
        for key, value in arguments_template.items():
            if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
                # Template variable
                var_name = value[2:-1]
                if var_name in context:
                    resolved[key] = context[var_name]
                else:
                    raise ValueError(f"Context variable not found: {var_name}")
            else:
                resolved[key] = value
        
        return resolved

Security and Compliance

Production MCP deployments require robust security measures:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
class MCPSecurityManager:
    """Security manager for MCP deployments"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.policy_engine = PolicyEngine(config.get('policies', {}))
        self.audit_logger = AuditLogger(config.get('audit', {}))
        
    async def validate_tool_call(self, 
                                tool_name: str,
                                arguments: Dict[str, Any],
                                client_context: Dict[str, Any]) -> ValidationResult:
        """Validate tool call against security policies"""
        
        # Check tool permissions
        tool_permission_result = await self.policy_engine.check_tool_permission(
            tool_name, 
            client_context
        )
        
        if not tool_permission_result.allowed:
            await self.audit_logger.log_security_event(
                event_type="tool_access_denied",
                tool=tool_name,
                client=client_context.get('client_id'),
                reason=tool_permission_result.reason
            )
            return ValidationResult(False, tool_permission_result.reason)
        
        # Validate arguments
        argument_validation = await self.policy_engine.validate_arguments(
            tool_name,
            arguments,
            client_context
        )
        
        if not argument_validation.valid:
            await self.audit_logger.log_security_event(
                event_type="invalid_arguments",
                tool=tool_name,
                client=client_context.get('client_id'),
                arguments=arguments,
                reason=argument_validation.reason
            )
            return ValidationResult(False, argument_validation.reason)
        
        # Check rate limits and quotas
        quota_check = await self.policy_engine.check_quota(
            tool_name,
            client_context
        )
        
        if not quota_check.within_limits:
            await self.audit_logger.log_security_event(
                event_type="quota_exceeded",
                tool=tool_name,
                client=client_context.get('client_id'),
                quota=quota_check.limit,
                current_usage=quota_check.current_usage
            )
            return ValidationResult(False, f"Quota exceeded: {quota_check.limit}")
        
        return ValidationResult(True, "Validation passed")

class PolicyEngine:
    """Policy engine for security enforcement"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.policies = self._load_policies()
        
    def _load_policies(self) -> Dict[str, Any]:
        """Load security policies from configuration"""
        
        return {
            'tool_permissions': self.config.get('tool_permissions', {}),
            'argument_validation': self.config.get('argument_validation', {}),
            'rate_limits': self.config.get('rate_limits', {}),
            'data_access_controls': self.config.get('data_access_controls', {})
        }
    
    async def check_tool_permission(self, 
                                  tool_name: str,
                                  client_context: Dict[str, Any]) -> 'PermissionResult':
        """Check if client has permission to use tool"""
        
        client_role = client_context.get('role', 'default')
        
        tool_permissions = self.policies['tool_permissions']
        
        # Check role-based permissions
        if client_role in tool_permissions:
            allowed_tools = tool_permissions[client_role].get('allowed_tools', [])
            denied_tools = tool_permissions[client_role].get('denied_tools', [])
            
            if tool_name in denied_tools:
                return PermissionResult(False, f"Tool {tool_name} explicitly denied")
            
            if allowed_tools == ['*'] or tool_name in allowed_tools:
                return PermissionResult(True, "Tool allowed")
            else:
                return PermissionResult(False, f"Tool {tool_name} not in allowed list")
        
        # Default deny
        return PermissionResult(False, f"No permissions defined for role {client_role}")

@dataclass
class ValidationResult:
    valid: bool
    reason: str

@dataclass 
class PermissionResult:
    allowed: bool
    reason: str

Monitoring and Observability

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
class MCPObservabilityStack:
    """Comprehensive observability for MCP deployments"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        
        # Metrics collection
        self.metrics_collector = MetricsCollector()
        self.custom_metrics: Dict[str, Any] = {}
        
        # Tracing
        self.tracer = self._setup_tracing()
        
        # Alerting
        self.alert_manager = AlertManager(config.get('alerts', {}))
        
        # Health checks
        self.health_checker = HealthChecker()
        
    def _setup_tracing(self):
        """Set up distributed tracing"""
        
        # Initialize OpenTelemetry or similar
        from opentelemetry import trace
        from opentelemetry.exporter.jaeger.thrift import JaegerExporter
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
        
        trace.set_tracer_provider(TracerProvider())
        tracer = trace.get_tracer(__name__)
        
        # Set up Jaeger exporter
        jaeger_exporter = JaegerExporter(
            agent_host_name=self.config.get('jaeger_host', 'localhost'),
            agent_port=self.config.get('jaeger_port', 6831),
        )
        
        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)
        
        return tracer
    
    async def trace_tool_execution(self, 
                                 tool_name: str,
                                 arguments: Dict[str, Any],
                                 execution_func: callable) -> Any:
        """Trace tool execution with distributed tracing"""
        
        with self.tracer.start_as_current_span(f"mcp.tool.{tool_name}") as span:
            span.set_attribute("tool.name", tool_name)
            span.set_attribute("tool.arguments_count", len(arguments))
            
            try:
                result = await execution_func()
                span.set_attribute("tool.success", True)
                return result
            except Exception as e:
                span.set_attribute("tool.success", False)
                span.set_attribute("tool.error", str(e))
                raise
    
    async def check_system_health(self) -> Dict[str, Any]:
        """Comprehensive system health check"""
        
        health_status = {
            'overall': 'healthy',
            'components': {},
            'timestamp': datetime.now().isoformat()
        }
        
        # Check server connectivity
        server_health = await self.health_checker.check_server_connectivity()
        health_status['components']['server'] = server_health
        
        # Check tool provider health
        for provider_name, provider in self.tool_providers.items():
            provider_health = await self.health_checker.check_provider_health(
                provider_name, provider
            )
            health_status['components'][f'provider_{provider_name}'] = provider_health
        
        # Check resource usage
        resource_health = await self.health_checker.check_resource_usage()
        health_status['components']['resources'] = resource_health
        
        # Determine overall health
        if any(component['status'] == 'unhealthy' 
               for component in health_status['components'].values()):
            health_status['overall'] = 'unhealthy'
        elif any(component['status'] == 'degraded'
                 for component in health_status['components'].values()):
            health_status['overall'] = 'degraded'
        
        return health_status

Conclusion

The Model Context Protocol represents a significant advancement in building extensible AI tool ecosystems. Key implementation considerations for production environments include:

  1. Robust Architecture: Design MCP servers and clients with proper error handling, connection management, and security
  2. Security First: Implement comprehensive authentication, authorization, and audit logging
  3. Performance Optimization: Use caching, connection pooling, and async processing for scale
  4. Comprehensive Monitoring: Track metrics, traces, and health across all components
  5. Tool Orchestration: Build higher-level abstractions for complex multi-tool workflows

The MCP ecosystem enables organizations to build truly modular AI systems where capabilities can be added, updated, and scaled independently. As the protocol continues to evolve, early adopters who invest in robust MCP infrastructure will be well-positioned to leverage the expanding ecosystem of MCP-compatible tools and services.

Success with MCP requires treating it as a foundational infrastructure component rather than just another API. Organizations that approach MCP with the same rigor they apply to database systems, message queues, and other critical infrastructure will unlock the full potential of composable AI systems.