Inside Hypercorn: A Multi-Protocol ASGI Server Implementation
Hypercorn is a Python ASGI web server supporting HTTP/1.1, HTTP/2, HTTP/3 (QUIC), and WebSocket protocols. This deep dive examines its implementation, focusing on event loop abstraction, protocol handling, and concurrency management.
Architecture Overview
Hypercorn's architecture centers on three key abstractions:
- Event Loop Backend - Pluggable async runtime (asyncio or Trio)
- Protocol Layer - Protocol-specific implementations (H11, H2, H3, WebSocket)
- Worker Context - Request lifecycle and resource management
┌─────────────────────────────────────┐
│ Worker Process │
│ ┌───────────────────────────────┐ │
│ │ Event Loop (asyncio/Trio) │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ Protocol Handlers │ │ │
│ │ │ • H11Protocol │ │ │
│ │ │ • H2Protocol │ │ │
│ │ │ • H3Protocol │ │ │
│ │ │ • WSStream │ │ │
│ │ └─────────────────────────┘ │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ ASGI Application │ │ │
│ │ └─────────────────────────┘ │ │
│ └───────────────────────────────┘ │
│ WorkerContext (request tracking) │
└─────────────────────────────────────┘
Event Loop Abstraction
Hypercorn supports both asyncio and Trio through parallel implementations. The dual-backend approach trades code duplication for zero abstraction overhead.
Asyncio Implementation
```python
async def worker_serve(
    app: AppWrapper,
    config: Config,
    *,
    sockets: Optional[Sockets] = None,
    shutdown_trigger: Optional[Callable[..., Awaitable]] = None,
) -> None:
    loop = asyncio.get_event_loop()

    # Signal handling
    signal_event = asyncio.Event()
    for signal_name in {"SIGINT", "SIGTERM", "SIGBREAK"}:
        if hasattr(signal, signal_name):
            loop.add_signal_handler(
                getattr(signal, signal_name), lambda: signal_event.set()
            )

    # Lifespan management
    lifespan = Lifespan(app, config, loop, lifespan_state)
    await lifespan.wait_for_startup()

    # Server setup
    async def _server_callback(reader: StreamReader, writer: StreamWriter) -> None:
        task = asyncio.current_task(loop)
        server_tasks.add(task)
        task.add_done_callback(server_tasks.discard)
        await TCPServer(app, loop, config, context, lifespan_state, reader, writer)

    # TCP servers (HTTP/1.1 and HTTP/2)
    servers = []
    for sock in sockets.secure_sockets:
        servers.append(
            await asyncio.start_server(
                _server_callback,
                backlog=config.backlog,
                ssl=ssl_context,
                sock=sock,
            )
        )

    # UDP endpoint (HTTP/3/QUIC)
    for sock in sockets.quic_sockets:
        _, protocol = await loop.create_datagram_endpoint(
            lambda: UDPServer(app, loop, config, context, lifespan_state),
            sock=sock,
        )
```
Key differences from typical servers:
- ASGI lifespan protocol handling before accepting connections
- Separate TCP/UDP endpoints for HTTP vs QUIC
- Task tracking for graceful shutdown
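The task-tracking idea can be sketched with plain asyncio; `handler` and the timings here are illustrative, not Hypercorn's code:

```python
import asyncio


async def main() -> int:
    tasks: set = set()

    async def handler(n: int) -> None:
        # Stand-in for serving one connection
        await asyncio.sleep(0.01 * n)

    # Track each connection task; discard on completion so the set
    # only ever holds in-flight work (mirrors the server_tasks pattern).
    for n in range(3):
        task = asyncio.get_running_loop().create_task(handler(n))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

    # Graceful shutdown: wait for all in-flight work to drain
    await asyncio.gather(*tasks)
    return len(tasks)


print(asyncio.run(main()))  # 0 - every finished task removed itself
```

Because each task removes itself on completion, shutting down gracefully reduces to waiting for the set to drain.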
Trio Implementation
```python
async def worker_serve(
    app: AppWrapper,
    config: Config,
    *,
    sockets: Optional[Sockets] = None,
    shutdown_trigger: Optional[Callable[..., Awaitable[None]]] = None,
    task_status: trio.TaskStatus = trio.TASK_STATUS_IGNORED,
) -> None:
    async with trio.open_nursery() as lifespan_nursery:
        await lifespan_nursery.start(lifespan.handle_lifespan)
        await lifespan.wait_for_startup()

        async with trio.open_nursery() as server_nursery:
            # SSL wrapping via SSLListener
            listeners = []
            for sock in sockets.secure_sockets:
                listeners.append(
                    trio.SSLListener(
                        trio.SocketListener(trio.socket.from_stdlib_socket(sock)),
                        ssl_context,
                        https_compatible=True,
                    )
                )

            # Serve with structured concurrency; start_soon only takes
            # positional arguments, so keyword arguments go through partial
            server_nursery.start_soon(
                partial(
                    trio.serve_listeners,
                    partial(TCPServer, app, config, context,
                            ConnectionState(lifespan_state.copy())),
                    listeners,
                    handler_nursery=server_nursery,
                )
            )
```
Trio's structured concurrency:
- Nested nurseries for lifecycle management
- Explicit cancellation scopes for graceful shutdown
- No manual task tracking - nursery handles cleanup
Event Abstraction
Both backends implement a unified Event interface through wrappers:
```python
class EventWrapper:
    def __init__(self) -> None:
        self._event = asyncio.Event()

    async def clear(self) -> None:
        self._event.clear()

    async def wait(self) -> None:
        await self._event.wait()

    async def set(self) -> None:
        self._event.set()
```
Trio's version differs only in the underlying event type. This allows protocol implementations to be backend-agnostic.
Protocol Implementations
HTTP/1.1 (H11Protocol)
Built on the h11 state machine library. Handles connection recycling and upgrade detection.
```python
class H11Protocol:
    def __init__(self, app, config, context, task_group,
                 connection_state, ssl, client, server, send):
        self.connection = h11.Connection(
            h11.SERVER,
            max_incomplete_event_size=config.h11_max_incomplete_size,
        )
        self.stream: Optional[Union[HTTPStream, WSStream]] = None
        self.keep_alive_requests = 0
```
Protocol upgrade detection:
```python
async def _check_protocol(self, event: h11.Request) -> None:
    upgrade_value = ""
    for name, value in event.headers:
        if name.decode("latin1").strip().lower() == "upgrade":
            upgrade_value = value.decode("latin1").strip()

    # H2C upgrade (HTTP/2 over cleartext); has_body is derived from the
    # request headers earlier in the method (elided here)
    if upgrade_value.lower() == "h2c" and not has_body:
        await self._send_h11_event(
            h11.InformationalResponse(
                status_code=101,
                headers=[(b"connection", b"upgrade"), (b"upgrade", b"h2c")],
            )
        )
        raise H2CProtocolRequiredError(self.connection.trailing_data[0], event)
    # Direct HTTP/2 connection preface
    elif event.method == b"PRI" and event.target == b"*":
        raise H2ProtocolAssumedError(b"PRI * HTTP/2.0\r\n\r\n" + ...)
```
Connection recycling:
```python
async def _maybe_recycle(self) -> None:
    await self._close_stream()
    if (
        not self.context.terminated.is_set()
        and self.connection.our_state is h11.DONE
        and self.connection.their_state is h11.DONE
    ):
        try:
            self.connection.start_next_cycle()
        except h11.LocalProtocolError:
            await self.send(Closed())
        else:
            await self.can_read.set()
            await self.send(Updated(idle=True))
```
HTTP/2 (H2Protocol)
Uses the h2 library for frame handling, with custom flow control and prioritization.
```python
class H2Protocol:
    def __init__(self, app, config, context, task_group,
                 connection_state, ssl, client, server, send):
        self.connection = h2.connection.H2Connection(
            config=h2.config.H2Configuration(
                client_side=False,
                header_encoding=None,
            )
        )
        self.connection.local_settings = h2.settings.Settings(
            client=False,
            initial_values={
                h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS:
                    config.h2_max_concurrent_streams,
                h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE:
                    config.h2_max_header_list_size,
                h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL: 1,
            },
        )
        self.streams: Dict[int, Union[HTTPStream, WSStream]] = {}
        self.priority = priority.PriorityTree()
        self.stream_buffers: Dict[int, StreamBuffer] = {}
```
Priority-based sending:
```python
async def send_task(self) -> None:
    """Separate task for prioritized data transmission."""
    while not self.closed:
        try:
            stream_id = next(self.priority)  # highest-priority unblocked stream
        except priority.DeadlockError:
            # Every stream is blocked; sleep until new data arrives
            await self.has_data.wait()
            await self.has_data.clear()
        else:
            await self._send_data(stream_id)

async def _send_data(self, stream_id: int) -> None:
    chunk_size = min(
        self.connection.local_flow_control_window(stream_id),
        self.connection.max_outbound_frame_size,
    )
    data = await self.stream_buffers[stream_id].pop(chunk_size)
    if data:
        self.connection.send_data(stream_id, data)
        await self._flush()
    else:
        self.priority.block(stream_id)  # no data available
    if self.stream_buffers[stream_id].complete:
        self.connection.end_stream(stream_id)
        del self.stream_buffers[stream_id]
        self.priority.remove_stream(stream_id)
```
Why a separate send task? HTTP/2 allows multiple concurrent streams but requires fair, priority-based scheduling. The send task continuously pulls from the priority tree, respecting flow control windows.
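The fairness idea can be seen in a toy weighted scheduler. This is a stand-in sketch of proportional scheduling, not the algorithm the `priority` library actually implements:

```python
def weighted_schedule(weights: dict, rounds: int) -> list:
    """Toy credit-based scheduler: each round, every stream earns its
    weight in credit, and the richest stream gets to send."""
    credits = {stream_id: 0 for stream_id in weights}
    order = []
    for _ in range(rounds):
        for stream_id, weight in weights.items():
            credits[stream_id] += weight
        chosen = max(credits, key=credits.get)
        credits[chosen] -= sum(weights.values())  # pay for the send slot
        order.append(chosen)
    return order


# Stream 1 has twice the weight of stream 3, so it sends twice as often.
print(weighted_schedule({1: 2, 3: 1}, 6))  # [1, 3, 1, 1, 3, 1]
```

The real send task does the analogous thing continuously: ask the priority tree for the next stream, send one flow-control-window-limited chunk, repeat.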
Flow control buffering:
```python
class StreamBuffer:
    def __init__(self, event_class: Type[IOEvent]) -> None:
        self.buffer = bytearray()
        self._complete = False
        self._is_empty = event_class()
        self._paused = event_class()

    async def push(self, data: bytes) -> None:
        """Apply backpressure when the buffer is full."""
        if self._complete:
            raise BufferCompleteError()
        self.buffer.extend(data)
        await self._is_empty.clear()
        if len(self.buffer) >= BUFFER_HIGH_WATER:  # 2 * 16KB
            await self._paused.wait()  # block until drained
            await self._paused.clear()

    async def pop(self, max_length: int) -> bytes:
        """Flow-controlled data extraction."""
        length = min(len(self.buffer), max_length)
        data = bytes(self.buffer[:length])
        del self.buffer[:length]
        if len(self.buffer) < BUFFER_LOW_WATER:
            await self._paused.set()  # unpause the producer
        return data
```
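The watermark scheme is easy to observe with a minimal runnable sketch: toy constants, a plain asyncio `Event`, and a simplified buffer that is not Hypercorn's `StreamBuffer`:

```python
import asyncio

HIGH_WATER = 8  # toy thresholds; Hypercorn's are based on frame size
LOW_WATER = 4


class TinyBuffer:
    def __init__(self) -> None:
        self.buffer = bytearray()
        self._unpaused = asyncio.Event()

    async def push(self, data: bytes) -> None:
        self.buffer.extend(data)
        if len(self.buffer) >= HIGH_WATER:
            self._unpaused.clear()
            await self._unpaused.wait()  # block producer until drained

    async def pop(self, max_length: int) -> bytes:
        data = bytes(self.buffer[:max_length])
        del self.buffer[:max_length]
        if len(self.buffer) < LOW_WATER:
            self._unpaused.set()  # resume the producer
        return data


async def main() -> list:
    buf = TinyBuffer()
    sizes = []

    async def producer() -> None:
        for _ in range(4):
            await buf.push(b"xxxx")  # 4 bytes per push

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)
    sizes.append(len(buf.buffer))  # producer is now parked at the high watermark
    while not task.done():
        await buf.pop(4)  # draining below LOW_WATER unblocks the producer
        await asyncio.sleep(0)
    sizes.append(len(buf.buffer))
    return sizes


print(asyncio.run(main()))  # [8, 0]
```

The producer stalls at 8 buffered bytes and only resumes once the consumer drains below the low watermark, which is exactly the backpressure the ASGI application experiences when a slow HTTP/2 client stops opening its flow control window.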
HTTP/3 (H3Protocol)
Delegates to aioquic for QUIC and HTTP/3 framing. This is the simplest of the protocol implementations because the library handles most of the complexity.
```python
class H3Protocol:
    def __init__(self, app, config, context, task_group, state,
                 client, server, quic, send):
        self.connection = H3Connection(quic)
        self.streams: Dict[int, Union[HTTPStream, WSStream]] = {}

    async def handle(self, quic_event: QuicEvent) -> None:
        for event in self.connection.handle_event(quic_event):
            if isinstance(event, HeadersReceived):
                if not self.context.terminated.is_set():
                    await self._create_stream(event)
            elif isinstance(event, DataReceived):
                await self.streams[event.stream_id].handle(
                    Body(stream_id=event.stream_id, data=event.data)
                )
```
QUIC handles flow control, retransmission, and multiplexing natively, so no manual buffering is needed.
WebSocket (WSStream)
Handles the WebSocket handshake, wsproto-based framing, and mapping to ASGI WebSocket events.
```python
class Handshake:
    def is_valid(self) -> bool:
        """Validate a WebSocket upgrade request."""
        if self.http_version < "1.1":
            return False
        elif self.http_version == "1.1":
            if self.key is None:
                return False
            if not any(token.lower() == "upgrade"
                       for token in self.connection_tokens):
                return False
            if self.upgrade.lower() != b"websocket":
                return False
            if self.version != WEBSOCKET_VERSION:  # "13"
                return False
        return True

    def accept(self, subprotocol, additional_headers):
        """Generate the WebSocket accept response."""
        headers = []
        if subprotocol is not None:
            headers.append((b"sec-websocket-protocol", subprotocol.encode()))

        # Extension negotiation
        extensions = [PerMessageDeflate()]
        if self.extensions:
            accepts = server_extensions_handshake(self.extensions, extensions)
            if accepts:
                headers.append((b"sec-websocket-extensions", accepts))

        headers.append((b"sec-websocket-accept",
                        generate_accept_token(self.key)))
        if self.http_version == "1.1":
            headers.extend([(b"upgrade", b"WebSocket"),
                            (b"connection", b"Upgrade")])
        status_code = 101
        return status_code, headers, Connection(ConnectionType.SERVER,
                                                extensions)
```
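The `generate_accept_token` step is defined by RFC 6455: SHA-1 of the client's `Sec-WebSocket-Key` concatenated with a fixed GUID, base64-encoded. A stdlib sketch (illustrative helper name, not wsproto's API):

```python
import base64
import hashlib

# Fixed GUID mandated by RFC 6455, section 1.3
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def accept_token(key: str) -> bytes:
    """Compute the Sec-WebSocket-Accept value for a client key."""
    digest = hashlib.sha1((key + WS_GUID).encode()).digest()
    return base64.b64encode(digest)


# The example key from RFC 6455, section 1.3:
print(accept_token("dGhlIHNhbXBsZSBub25jZQ==").decode())
# s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The token proves to the client that the server actually parsed the WebSocket handshake rather than echoing an HTTP response from a cache.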
Worker Context and Request Lifecycle
The WorkerContext tracks requests and manages graceful shutdown.
```python
class WorkerContext:
    def __init__(self, max_requests: Optional[int]) -> None:
        self.max_requests = max_requests
        self.requests = 0
        self.terminate = self.event_class()
        self.terminated = self.event_class()

    async def mark_request(self) -> None:
        """Called after each request."""
        if self.max_requests is None:
            return
        self.requests += 1
        if self.requests > self.max_requests:
            await self.terminate.set()
```
Graceful shutdown flow:
- terminate event is set (max requests reached or signal received)
- Server stops accepting new connections
- Existing requests complete within graceful_timeout
- terminated event is set
- Worker exits
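The drain step can be sketched with plain asyncio; `worker`, `request`, and the durations are illustrative names, not Hypercorn's:

```python
import asyncio


async def worker(graceful_timeout: float) -> str:
    terminate = asyncio.Event()
    terminated = asyncio.Event()
    active: set = set()

    async def request(duration: float) -> None:
        await asyncio.sleep(duration)  # stand-in for handling a request

    # Two requests are in flight when the terminate signal arrives.
    for duration in (0.01, 0.02):
        task = asyncio.create_task(request(duration))
        active.add(task)
        task.add_done_callback(active.discard)
    terminate.set()  # from here on, no new connections are accepted

    try:
        # Let existing requests drain, but no longer than graceful_timeout.
        await asyncio.wait_for(asyncio.gather(*active), graceful_timeout)
    except asyncio.TimeoutError:
        pass
    terminated.set()
    return "drained" if not active else "timed out"


print(asyncio.run(worker(graceful_timeout=1.0)))  # drained
```

If the in-flight requests outlive the timeout, the worker gives up waiting and sets `terminated` anyway, which is why slow handlers can be cut off during deploys.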
Key Implementation Patterns
1. Protocol Detection and Upgrading
HTTP/1.1 can upgrade to HTTP/2 (h2c) or WebSocket. Detection happens at the H11Protocol layer:
- WebSocket: check for Connection: upgrade + Upgrade: websocket
- H2C: check for an Upgrade: h2c header or the PRI * HTTP/2.0 method
- Direct HTTP/2: TLS ALPN negotiation (outside the protocol layer)
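These rules can be condensed into a small header-inspection function. This is a hypothetical sketch of the decision logic, not Hypercorn's API:

```python
def detect_upgrade(method: bytes, target: bytes, headers: list) -> str:
    """Classify a request per the detection rules above."""
    upgrade = b""
    connection_tokens = []
    for name, value in headers:
        lname = name.strip().lower()
        if lname == b"upgrade":
            upgrade = value.strip().lower()
        elif lname == b"connection":
            connection_tokens = [t.strip().lower() for t in value.split(b",")]
    # Direct HTTP/2 connection preface
    if method == b"PRI" and target == b"*":
        return "h2-prior-knowledge"
    # WebSocket upgrade
    if b"upgrade" in connection_tokens and upgrade == b"websocket":
        return "websocket"
    # Cleartext HTTP/2 upgrade
    if upgrade == b"h2c":
        return "h2c"
    return "http/1.1"


print(detect_upgrade(b"GET", b"/ws",
                     [(b"connection", b"Upgrade"),
                      (b"upgrade", b"websocket")]))  # websocket
```

On a `websocket` or `h2c` result the H11Protocol hands the connection (including any already-buffered bytes) to WSStream or H2Protocol respectively.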
2. Stream vs Connection Lifecycle
| Protocol | Lifecycle |
|---|---|
| HTTP/1.1 | One stream per connection, recycled via keep-alive |
| HTTP/2 | Multiple streams per connection, tracked in dict |
| HTTP/3 | Multiple streams per QUIC connection |
| WebSocket | One long-lived stream per connection |
3. Backpressure Handling
| Protocol | Mechanism |
|---|---|
| HTTP/1.1 | TCP socket backpressure (OS handles) |
| HTTP/2 | StreamBuffer with high/low watermarks (manual) |
| HTTP/3 | QUIC flow control (library handles) |
| WebSocket | Frame size limits + TCP backpressure |
Takeaways
- Event loop abstraction via parallel implementations - duplication beats leaky abstractions when backends differ as much as asyncio and Trio do
- Protocol layering enables upgrade paths - H11Protocol detects upgrades and delegates to H2Protocol/WSStream dynamically
- HTTP/2 requires manual flow control - unlike HTTP/1.1 (TCP handles it) and HTTP/3 (QUIC handles it), HTTP/2 needs application-level buffering and prioritization
- Graceful shutdown needs coordination - lifespan events, connection draining, and timeout enforcement must work together
- ASGI is not just HTTP - supporting WebSocket, HTTP/2 server push, and trailers requires careful ASGI event mapping
- Resource limits prevent DoS - max requests, header size, message size, and concurrent streams all need enforcement