MCP servers are the new API endpoints. And just like API endpoints, they need transport security, authentication, authorization, input validation, rate limiting, and monitoring.
The difference? MCP servers are consumed by AI agents — not browsers or mobile apps. Agents call tools programmatically, pass parameters generated by an LLM, and process structured responses. The security model needs to account for non-deterministic inputs, prompt injection through parameters, and the fact that a compromised agent can call any tool on any connected MCP server.
This post covers the full MCP server security stack.
MCP Transport Security
MCP supports three transports. Each has a different security profile.
Transport Comparison
| Transport | Default Security | Best For | Risk |
|---|---|---|---|
| stdio | Local process isolation | Local development, CLI agents | Low (local only) |
| SSE (Server-Sent Events) | None (HTTP) | Remote MCP servers, web apps | Medium (requires auth) |
| WebSocket | None (WS) | Real-time streaming, bidirectional | Medium (requires auth) |
stdio: Safer by Default
stdio MCP servers run as child processes. The agent starts the server, communicates over stdin/stdout, and kills it when done. No network exposure.
{ "mcpServers": { "local-db": { "command": "node", "args": ["./mcp-servers/db-server.js"], "env": { "DATABASE_URL": "postgres://user:pass@localhost:5432/mydb" } } }}Security properties:
- No network port → no remote attacks
- Process-level isolation → OS enforces permissions
- Environment variables → credentials never in command line
- Process lifetime → credentials exist only while server runs
When to use: Development, single-user agents, agents on the same machine as tools.
SSE: The Remote MCP Challenge
SSE-based MCP servers expose HTTP endpoints. This is necessary for remote agents (Claude Code via Tailscale, multi-user setups) but introduces security requirements.
from mcp.server import Serverfrom mcp.server.sse import SseServerTransport
app = Server("payment-mcp")
# SECURITY: Add middleware layer@app.middleware("http")async def auth_middleware(request, call_next): # Transport-level authentication api_key = request.headers.get("X-API-Key") if not api_key or not validate_api_key(api_key): return JSONResponse( status_code=401, content={"error": "Invalid API key"} )
# Rate limiting client_ip = request.client.host if await rate_limiter.is_rate_limited(client_ip): return JSONResponse( status_code=429, content={"error": "Rate limit exceeded"} )
return await call_next(request)Security requirements for SSE:
- TLS (HTTPS) — mandatory, not optional
- API key or OAuth token in headers
- Rate limiting at the transport level
- Request logging for audit
- CORS configuration if browser-accessible
WebSocket: Bidirectional with State
WebSocket MCP servers maintain persistent connections. The security concern is state management — a compromised WebSocket can maintain an authenticated session.
import websocketsimport jwt
async def handle_connection(websocket): # Authenticate on connection token = await websocket.recv() # First message must be auth token try: payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"]) agent_id = payload["agent_id"] permissions = payload["permissions"] except jwt.InvalidTokenError: await websocket.close(code=4001, reason="Authentication failed") return
# Session-scoped context session = MCPSession(agent_id, permissions)
async for message in websocket: response = await session.handle_message(message) await websocket.send(response)
# Clean up session on disconnect session.revoke_credentials()Authentication Patterns
Pattern 1: API Key Authentication (Simple)
import osfrom fastapi import FastAPI, Header, HTTPException
app = FastAPI()
# Load valid API keys from environment or vaultVALID_API_KEYS = set(os.getenv("MCP_API_KEYS", "").split(","))
@app.post("/mcp/tools/call")async def call_tool( request: ToolCallRequest, x_api_key: str = Header(None),): # Validate API key if not x_api_key or x_api_key not in VALID_API_KEYS: raise HTTPException(status_code=401, detail="Invalid API key")
# Extract permissions from API key identity key_id, permissions = get_key_permissions(x_api_key)
# Log the call audit_log( key_id=key_id, tool=request.tool_name, params=request.params, timestamp=datetime.utcnow(), )
# Execute with scoped permissions return await execute_tool(request.tool_name, request.params, permissions)Security notes for API keys:
- Use environment variables, never hardcode
- Rotate keys regularly (90 days max)
- Each key has a unique identity for auditing
- Keys can be scoped to specific tools/actions
- Include a key prefix for identification:
mcp_prod_xxx,mcp_dev_xxx
Pattern 2: OAuth 2.0 / OAuth 2.1 (Enterprise)
For organizations with identity providers, OAuth is the standard:
from authlib.integrations.starlette_client import OAuth
oauth = OAuth()oauth.register( name="mcp-provider", client_id=os.getenv("OAUTH_CLIENT_ID"), client_secret=os.getenv("OAUTH_CLIENT_SECRET"), authorize_url="https://auth.company.com/oauth/authorize", token_url="https://auth.company.com/oauth/token", client_kwargs={"scope": "mcp:read mcp:tools:execute"},)
@app.get("/mcp/auth")async def mcp_auth(request: Request): # Redirect to OAuth provider redirect = request.url_for("mcp_callback") return await oauth.mcp_provider.authorize_redirect(request, redirect)
@app.get("/mcp/auth/callback")async def mcp_callback(request: Request): token = await oauth.mcp_provider.authorize_access_token(request) # Token contains identity, permissions, expiry # Return to MCP client with access token return {"access_token": token["access_token"], "token_type": "Bearer"}Pattern 3: Short-Lived Session Tokens (Recommended)
Generate time-bound tokens for each agent session:
import jwtfrom datetime import datetime, timedelta
class SessionTokenManager: def __init__(self, secret: str): self.secret = secret
def create_session_token(self, agent_id: str, permissions: list[str], ttl: timedelta = timedelta(hours=1)) -> str: payload = { "agent_id": agent_id, "permissions": permissions, "iat": datetime.utcnow(), "exp": datetime.utcnow() + ttl, "jti": uuid4().hex, # Unique token ID for revocation } return jwt.encode(payload, self.secret, algorithm="HS256")
def validate_session_token(self, token: str) -> dict: try: payload = jwt.decode(token, self.secret, algorithms=["HS256"]) # Check if revoked if self.is_revoked(payload["jti"]): raise SecurityError("Token revoked") return payload except jwt.ExpiredSignatureError: raise SecurityError("Token expired") except jwt.InvalidTokenError: raise SecurityError("Invalid token")
def revoke_token(self, jti: str): self.revoked_tokens.add(jti)Input Validation: The MCP Server’s Last Line of Defense
MCP servers receive inputs from agents. Those inputs may be malformed, malicious, or simply unexpected. Validate everything.
Tool Parameter Validation
from pydantic import BaseModel, Field, validatorfrom typing import Literal
class QueryDatabaseParams(BaseModel): sql: str = Field(..., description="SQL query to execute") params: list = Field(default_factory=list, description="Query parameters") max_rows: int = Field(default=100, le=1000, description="Maximum rows to return")
@validator("sql") def validate_sql(cls, v): # Only allow SELECT queries if not v.strip().upper().startswith("SELECT"): raise ValueError("Only SELECT queries are allowed")
# Block dangerous patterns forbidden = ["pg_sleep", "pg_read_file", "COPY", "CREATE", "DROP", "ALTER", "TRUNCATE"] for pattern in forbidden: if pattern.lower() in v.lower(): raise ValueError(f"Forbidden SQL pattern: {pattern}")
# Prevent SQL injection in dynamic parts if ";" in v.rstrip(";"): raise ValueError("Multiple statements not allowed")
return v
class SendEmailParams(BaseModel): to: list[str] = Field(..., min_items=1, max_items=10) subject: str = Field(..., max_length=200) body: str = Field(..., max_length=10000)
@validator("to", each_item=True) def validate_email(cls, v): if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', v): raise ValueError(f"Invalid email: {v}")
# Check against allowed domains allowed_domains = ["company.com", "client.org"] domain = v.split("@")[1] if domain not in allowed_domains: raise ValueError(f"Domain {domain} not allowed for email")
return v
@mcp.tool()async def query_database(params: QueryDatabaseParams) -> dict: """Query the database with validated parameters.""" # Params are already validated by Pydantic results = await db.execute(params.sql, params.params, limit=params.max_rows) return {"rows": results, "count": len(results)}Preventing Parameter Injection
The most unique MCP security concern: parameters generated by an LLM can contain injection through the parameter itself:
# An agent reads a file and calls:tool_call("send_email", { "to": "admin@company.com", "subject": "Important: password reset", "body": "The attacker-controlled content read from a file said: 'Click this link to reset your password: http://evil.com'"})# The body comes from untrusted content — it's an injection via parameterDefense: Validate parameters as untrusted input
class ToolParameterSanitizer: """Sanitize tool parameters that may contain injected content."""
def sanitize_body(self, body: str, context: str = "") -> str: # Strip potential injection patterns body = re.sub(r'<(script|iframe|object|embed)[^>]*>.*?</\1>', '[blocked]', body, flags=re.DOTALL) body = re.sub(r'javascript:', '', body, flags=re.IGNORECASE) body = re.sub(r'on\w+\s*=', '', body, flags=re.IGNORECASE)
# Truncate to prevent data exfiltration via response length max_length = 10000 if "read" not in context else 50000 return body[:max_length]
def sanitize_filename(self, filename: str) -> str: # Prevent path traversal filename = os.path.basename(filename) # Strip directory components filename = re.sub(r'[<>:"/\\|?*]', '_', filename) # Remove dangerous chars return filenameRate Limiting and Cost Protection
MCP servers can be called many times per second by a misbehaving agent. Rate limiting protects both the server and your budget.
class MCPRateLimiter: """Multi-level rate limiting for MCP servers."""
def __init__(self): self.redis = Redis.from_url(os.getenv("REDIS_URL"))
async def check_rate_limit(self, agent_id: str, tool_name: str, api_key: str) -> bool: checks = [ # Per-agent: max 60 calls/min across all tools self.redis.get(f"ratelimit:agent:{agent_id}") < 60,
# Per-tool: max 10 calls/min for expensive operations self.redis.get(f"ratelimit:tool:{agent_id}:{tool_name}") < 10,
# Per-key: max 100 calls/min per API key self.redis.get(f"ratelimit:key:{api_key}") < 100,
# Global: max 1000 calls/min for the whole server self.redis.get("ratelimit:global") < 1000, ]
return all(checks)
async def increment(self, agent_id: str, tool_name: str, api_key: str): pipeline = self.redis.pipeline() pipeline.incr(f"ratelimit:agent:{agent_id}") pipeline.expire(f"ratelimit:agent:{agent_id}", 60) pipeline.incr(f"ratelimit:tool:{agent_id}:{tool_name}") pipeline.expire(f"ratelimit:tool:{agent_id}:{tool_name}", 60) # ... await pipeline.execute()
async def get_costs(self, agent_id: str) -> dict: """Track costs per agent for billing.""" tool_counts = await self.redis.hgetall(f"costs:{agent_id}") return { tool: count * TOOL_COSTS.get(tool, 0.01) for tool, count in tool_counts.items() }Cost Attack Prevention
class CostAttackPrevention: """Detect and prevent cost attacks on MCP servers."""
# Tools with high per-call cost EXPENSIVE_TOOLS = { "web_search": 0.05, # $0.05 per call "llm_completion": 0.10, # $0.10 per call "code_generation": 0.15, # $0.15 per call "image_generation": 0.05, # $0.05 per call }
BUDGET_LIMITS = { "per_request": 1.00, # Max $1 per request "per_hour": 10.00, # Max $10 per hour per agent "per_day": 50.00, # Max $50 per day per agent }
async def check_budget(self, agent_id: str, tool_name: str) -> bool: if tool_name not in self.EXPENSIVE_TOOLS: return True # Cheap tools always allowed
tool_cost = self.EXPENSIVE_TOOLS[tool_name]
# Check if this call would exceed per-request budget if tool_cost > self.BUDGET_LIMITS["per_request"]: return False
# Check running totals hourly = await self.get_hourly_cost(agent_id) if hourly + tool_cost > self.BUDGET_LIMITS["per_hour"]: return False
daily = await self.get_daily_cost(agent_id) if daily + tool_cost > self.BUDGET_LIMITS["per_day"]: return False
return TrueMCP Server Deployment Security
# Docker Compose for secure MCP server deploymentversion: '3.8'services: mcp-server: build: ./mcp-server ports: - "127.0.0.1:3000:3000" # Localhost only, not 0.0.0.0 environment: - DATABASE_URL=${READ_ONLY_DB_URL} - API_KEY_FILE=/run/secrets/mcp_api_key secrets: - mcp_api_key - db_credentials healthcheck: test: ["CMD", "curl", "-f", "http://localhost:3000/health"] interval: 30s timeout: 10s retries: 3 restart: unless-stopped # Run as non-root user user: "1000:1000" # Read-only filesystem read_only: true # Resource limits deploy: resources: limits: cpus: '0.5' memory: 256M
secrets: mcp_api_key: file: ./secrets/mcp_api_key.txt db_credentials: file: ./secrets/db_credentials.jsonProduction MCP Security Architecture
┌──────────────┐ │ Agent │ │ Process │ └──────┬───────┘ │ MCP Protocol ▼ ┌──────────────┐ │ API Gateway │ ← TLS, Auth, Rate Limiting │ (Envoy/Kong)│ └──────┬───────┘ │ ┌────────────┼────────────┐ ▼ ▼ ▼ ┌──────────────┐ ┌──────────┐ ┌──────────┐ │ MCP Server │ │ MCP │ │ MCP │ │ (Read DB) │ │ Server │ │ Server │ │ read-only │ │ (GitHub) │ │ (Slack) │ │ credentials │ │ PR scope │ │ channel │ └──────────────┘ └──────────┘ └──────────┘Production Checklist
- TLS enabled for all remote MCP connections (SSE, WebSocket)
- stdio MCP for local-only, SSE/WebSocket for remote
- Authentication on every MCP request (API key, OAuth, or JWT)
- Short-lived session tokens with unique IDs
- Token revocation capability
- Pydantic/Zod validation on all tool parameters
- Parameter sanitization for untrusted content
- Multi-level rate limiting (per-agent, per-tool, per-key, global)
- Cost attack prevention (budget limits per request/hour/day)
- Docker: non-root user, read-only fs, resource limits
- Audit logging of every tool call
- Health check endpoints
- Process isolation for stdio MCP servers
Next in the Series
| Post | Topic |
|---|---|
| 1 | Prompt Injection & Defense |
| 2 | Tool Access Control |
| 3 | MCP Server Security (this) |
| 4 | Agent Auditing & Compliance |
| 5 | Production Security Patterns |
Series: Agent Security 2026 — Production Patterns. Post 3: MCP Server Security — transport, authentication, and production hardening.
Advertisement
Advertisement