Building an MCP stdio Server in Python with asyncio
Step-by-step tutorial for building a Model Context Protocol stdio server in Python using asyncio, with JSON-RPC framing, tool registration, and Claude Code integration.
Most MCP tutorials reach for the official mcp SDK and stop there. That works, but it hides the wire protocol behind a decorator and leaves you guessing when something breaks. This walkthrough builds a Model Context Protocol stdio server from raw asyncio primitives, so you understand exactly what Claude Code reads off your stdout when it calls a tool.
The Model Context Protocol is JSON-RPC 2.0 over a transport. The stdio transport is the simplest one to ship: your process reads requests from stdin, writes responses to stdout, and uses stderr for logging. No HTTP server, no port management, no TLS: Claude Code spawns your process as a subprocess and pipes bytes back and forth.
Why stdio over HTTP for local tools
stdio wins for local-only tools that don't need to be shared across machines. The trade-offs:
- stdio: zero auth (process-level trust), instant cold start, no port conflicts, dies when the parent dies. Best for tools that touch the local filesystem or run local subprocesses.
- HTTP/SSE: networked, multi-client, survives parent restart, needs auth and port routing. Best when one server backs multiple clients or runs on a different host.
For a "let Claude read my Aseprite palettes" or "let Claude grep my notes" tool, stdio is the right pick. For a shared team-wide tool, you'll want HTTP/SSE later.
The wire protocol in one paragraph
MCP messages are JSON-RPC 2.0 objects, one per line: requests arrive on stdin and responses leave on stdout. Each object is either a request (has id, expects a response), a response (has id, replies to a request), or a notification (no id, fire-and-forget). The framing is line-delimited: write one JSON object, write \n, flush. Read one line, parse JSON. A message must never contain a raw newline, which json.dumps guarantees with its default settings. That's the whole transport.
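As a rough illustration of the three message kinds and the line framing described above, here is a minimal sketch. The helper names classify and encode_line are made up for this example and are not part of MCP or any SDK.

```python
import json
from typing import Any


def classify(msg: dict[str, Any]) -> str:
    """Return 'request', 'response', or 'notification' for a JSON-RPC 2.0 object."""
    if "method" in msg:
        # A method plus an id expects a reply; a method without an id does not.
        return "request" if "id" in msg else "notification"
    # No method: this is a reply carrying "result" or "error" and the request's id.
    return "response"


def encode_line(msg: dict[str, Any]) -> str:
    """Serialize one message as a single newline-terminated line."""
    return json.dumps(msg) + "\n"


request = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}
response = {"jsonrpc": "2.0", "id": 1, "result": {"tools": []}}
notification = {"jsonrpc": "2.0", "method": "notifications/initialized"}

for message in (request, response, notification):
    print(classify(message), encode_line(message), end="")
```

Because json.dumps escapes newlines inside strings and emits no line breaks unless you pass indent, each encoded message occupies exactly one line.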
The lifecycle: client sends initialize, server replies with its capabilities, client sends the notifications/initialized notification, then the client may send tools/list, tools/call, resources/list, and so on. The server shuts down when stdin reaches EOF.
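To make the lifecycle concrete, the sketch below lists the client-side messages of one short session in order. The client name, the ids, and the echo tool call are illustrative assumptions; the notification method name notifications/initialized is the one the MCP specification uses.

```python
import json

# One short session as the client would send it, in order.
CLIENT_MESSAGES = [
    {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "initialize",
        "params": {
            "protocolVersion": "2024-11-05",
            "capabilities": {},
            "clientInfo": {"name": "example-client", "version": "0.0.0"},  # hypothetical client
        },
    },
    # Notification: no id, so the server must not reply to it.
    {"jsonrpc": "2.0", "method": "notifications/initialized"},
    {"jsonrpc": "2.0", "id": 2, "method": "tools/list"},
    {
        "jsonrpc": "2.0",
        "id": 3,
        "method": "tools/call",
        "params": {"name": "echo", "arguments": {"text": "hi"}},  # hypothetical tool
    },
]


def expects_reply(msg: dict) -> bool:
    """Only messages that carry an id get a response."""
    return "id" in msg


# What actually travels down the pipe: one JSON object per line.
wire = "".join(json.dumps(m) + "\n" for m in CLIENT_MESSAGES)
print(wire, end="")
```

Closing the pipe after the last line is what the server sees as EOF.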
Minimal server skeleton
Here is the smallest useful MCP server. It exposes one tool, echo, which returns whatever string you pass it. Save as server.py:
import asyncio
import json
import sys
from typing import Any

PROTOCOL_VERSION = "2024-11-05"

TOOLS = [
    {
        "name": "echo",
        "description": "Echo a string back to the caller.",
        "inputSchema": {
            "type": "object",
            "properties": {"text": {"type": "string"}},
            "required": ["text"],
        },
    }
]


def log(msg: str) -> None:
    # Logs go to stderr so they never mix with protocol messages on stdout.
    print(msg, file=sys.stderr, flush=True)


async def read_message(reader: asyncio.StreamReader) -> dict[str, Any] | None:
    line = await reader.readline()
    if not line:
        # An empty read means EOF: the client closed our stdin.
        return None
    return json.loads(line.decode("utf-8"))


def write_message(payload: dict[str, Any]) -> None:
    # One JSON object per line, flushed immediately.
    sys.stdout.write(json.dumps(payload) + "\n")
    sys.stdout.flush()


def make_response(req_id: Any, result: dict[str, Any]) -> dict[str, Any]:
    return {"jsonrpc": "2.0", "id": req_id, "result": result}


def make_error(req_id: Any, code: int, message: str) -> dict[str, Any]:
    return {"jsonrpc": "2.0", "id": req_id, "error": {"code": code, "message": message}}
Three primitives so far: read a line, parse it, and write a line. Notice every log goes to stderr: writing to stdout outside of write_message corrupts the wire protocol.
Dispatching requests
Map the JSON-RPC method field to a handler. A flat dict beats nested if/elif:
async def handle_initialize(params: dict[str, Any]) -> dict[str, Any]:
    return {
        "protocolVersion": PROTOCOL_VERSION,
        "capabilities": {"tools": {}},
        "serverInfo": {"name": "echo-server", "version": "0.1.0"},
    }


async def handle_tools_list(params: dict[str, Any]) -> dict[str, Any]:
    return {"tools": TOOLS}


async def handle_tools_call(params: dict[str, Any]) -> dict[str, Any]:
    name = params.get("name")
    arguments = params.get("arguments") or {}
    if name != "echo":
        raise ValueError(f"unknown tool: {name}")
    text = arguments.get("text", "")
    return {"content": [{"type": "text", "text": text}]}


HANDLERS = {
    "initialize": handle_initialize,
    "tools/list": handle_tools_list,
    "tools/call": handle_tools_call,
}


async def dispatch(method: str, params: dict[str, Any]) -> dict[str, Any]:
    handler = HANDLERS.get(method)
    if handler is None:
        raise LookupError(f"method not found: {method}")
    return await handler(params)
The main loop
Wire stdin to an asyncio.StreamReader and run until EOF:
async def main() -> None:
    # Bind stdin to a StreamReader so reads never block the event loop.
    loop = asyncio.get_running_loop()
    reader = asyncio.StreamReader()
    protocol = asyncio.StreamReaderProtocol(reader)
    await loop.connect_read_pipe(lambda: protocol, sys.stdin)
    log("server starting")
    while True:
        try:
            msg = await read_message(reader)
        except json.JSONDecodeError as exc:
            # Malformed input: answer with a JSON-RPC parse error and keep serving.
            write_message(make_error(None, -32700, f"parse error: {exc}"))
            continue
        if msg is None:
            log("EOF, shutting down")
            return
        req_id = msg.get("id")
        method = msg.get("method", "")
        params = msg.get("params") or {}
        if req_id is None:
            # Notifications carry no id and must not receive a response.
            log(f"notification: {method}")
            continue
        try:
            result = await dispatch(method, params)
            write_message(make_response(req_id, result))
        except LookupError as exc:
            # -32601: method not found.
            write_message(make_error(req_id, -32601, str(exc)))
        except Exception as exc:
            # -32603: internal error; details go to stderr.
            log(f"handler error: {exc!r}")
            write_message(make_error(req_id, -32603, str(exc)))


if __name__ == "__main__":
    asyncio.run(main())
connect_read_pipe is the asyncio-native way to bind a file descriptor to a stream reader. The alternative, calling sys.stdin.readline through loop.run_in_executor, works but ties up a worker thread for the life of the server and adds a thread hand-off to every message.
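For comparison, the executor-based alternative could look roughly like the sketch below. The names iter_lines and collect are invented for this example, and the snippet reads from an in-memory io.BytesIO so it runs anywhere; in a real server you would pass sys.stdin.buffer instead. This may be the more portable option on platforms where connect_read_pipe does not accept stdin, though that is an assumption to verify on your own setup.

```python
import asyncio
import io
from typing import AsyncIterator, BinaryIO


async def iter_lines(stream: BinaryIO) -> AsyncIterator[bytes]:
    """Yield lines from a blocking binary stream without blocking the event loop."""
    loop = asyncio.get_running_loop()
    while True:
        # Each readline call runs on a worker thread from the default executor.
        line = await loop.run_in_executor(None, stream.readline)
        if not line:
            return  # EOF
        yield line


async def collect(stream: BinaryIO) -> list[bytes]:
    return [line async for line in iter_lines(stream)]


# In a real server: asyncio.run(collect(sys.stdin.buffer))
lines = asyncio.run(collect(io.BytesIO(b'{"id": 1}\n{"id": 2}\n')))
print(lines)
```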
Registering with Claude Code
Add the server to your project-level .mcp.json:
{
  "mcpServers": {
    "echo": {
      "command": "python3",
      "args": ["/path/to/server.py"]
    }
  }
}
Restart Claude Code, then verify with claude mcp list. If the server fails to register, make its output unbuffered by changing the args to ["-u", "/path/to/server.py"], then watch the Claude Code log file for the server's stderr.
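Before involving Claude Code at all, it can help to drive the server by hand. The sketch below spawns a command, sends one request line, and returns the first reply line. The function name roundtrip is made up for this example, and it deliberately has no read timeout, so a server that never answers will make it hang.

```python
import json
import subprocess
import sys


def roundtrip(cmd: list[str], request: dict) -> dict:
    """Spawn cmd, send one JSON-RPC request line, and return the first reply line, parsed."""
    with subprocess.Popen(
        cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True
    ) as proc:
        proc.stdin.write(json.dumps(request) + "\n")
        proc.stdin.flush()
        line = proc.stdout.readline()  # blocks until the server answers
        proc.stdin.close()  # EOF tells the server to shut down
    return json.loads(line)


INIT = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "initialize",
    "params": {
        "protocolVersion": "2024-11-05",
        "capabilities": {},
        "clientInfo": {"name": "smoke-test", "version": "0.0.0"},  # hypothetical client
    },
}

# Example call, assuming server.py sits in the current directory:
# print(roundtrip([sys.executable, "server.py"], INIT))
```

If the reply carries the same id as the request and a result object, the framing and the initialize handler both work.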
Failure modes you will hit
- Buffered stdout: forgetting to flush (or calling print without flush=True) makes Claude Code hang waiting for a response that is still sitting in Python's stdout buffer, which is block-buffered when stdout is a pipe. Always flush after writing one message.
- Logging to stdout: a stray print("debug") corrupts the JSON stream. Route all logs to stderr. Run python -u server.py during development to disable stdout buffering globally.
- Blocking I/O in handlers: an open(...).read() for a 200MB file freezes the event loop and stalls every concurrent tool call. Wrap blocking work in asyncio.to_thread or use aiofiles.
- Unhandled exceptions: an uncaught exception kills the process and Claude Code shows "server crashed" with no detail. Catch broadly in the dispatch layer (as above) and return a JSON-RPC error so the client gets a structured response.
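The blocking-I/O fix mentioned in the list can be sketched as follows. The handler name handle_read_file and its path argument are hypothetical; they are not part of the echo server above.

```python
import asyncio
from pathlib import Path
from typing import Any


def read_text_blocking(path: str) -> str:
    """Plain blocking read; safe to call from a worker thread."""
    return Path(path).read_text(encoding="utf-8")


async def handle_read_file(arguments: dict[str, Any]) -> dict[str, Any]:
    # asyncio.to_thread (Python 3.9+) runs the blocking call on a worker thread,
    # so the event loop stays free while the file is being read.
    text = await asyncio.to_thread(read_text_blocking, arguments["path"])
    return {"content": [{"type": "text", "text": text}]}
```

Note that the main loop in this tutorial awaits one request at a time, so offloading keeps the loop responsive but only improves throughput once handlers are scheduled as separate tasks.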
Where to go from here
Add a second tool that does real work: read a file, run a subprocess, query a database. The pattern is identical: append a tool definition dict to TOOLS, add an if name == "..." branch in handle_tools_call, return {"content": [{"type": "text", "text": ...}]}. Once you have three or more tools, refactor the branch into a registry dict the same way HANDLERS works.
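One possible shape for that registry refactor is sketched below. TOOL_IMPLS, tool_echo, and tool_upper are names invented for this example, and upper is a hypothetical second tool.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Each tool implementation takes the arguments dict and returns the text to send back.
ToolFn = Callable[[dict[str, Any]], Awaitable[str]]


async def tool_echo(arguments: dict[str, Any]) -> str:
    return arguments.get("text", "")


async def tool_upper(arguments: dict[str, Any]) -> str:
    # Hypothetical second tool, here only to show the pattern.
    return arguments.get("text", "").upper()


TOOL_IMPLS: dict[str, ToolFn] = {
    "echo": tool_echo,
    "upper": tool_upper,
}


async def handle_tools_call(params: dict[str, Any]) -> dict[str, Any]:
    name = params.get("name")
    impl = TOOL_IMPLS.get(name)
    if impl is None:
        raise ValueError(f"unknown tool: {name}")
    text = await impl(params.get("arguments") or {})
    return {"content": [{"type": "text", "text": text}]}


print(asyncio.run(handle_tools_call({"name": "upper", "arguments": {"text": "hi"}})))
```

Each new tool still needs its own entry in TOOLS so that tools/list advertises it.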
For production, consider migrating to the official Python SDK at github.com/modelcontextprotocol/python-sdk: it handles capability negotiation, resource subscriptions, and progress notifications you'd otherwise reinvent. But ship the raw version first. You'll debug protocol issues far faster when you wrote the wire format yourself.