3 from _typeshed import Self
4 from socket import socket as _socket
5 from typing import Any, BinaryIO, Callable, ClassVar, Tuple, Type, TypeVar, Union
# A request is either a socket (see TCPServer.get_request) or a
# (bytes, socket) pair (see UDPServer.get_request).
_RequestType = Union[_socket, Tuple[bytes, _socket]]
# A client address is either a (str, int) pair -- presumably (host, port) --
# or a plain string.
_AddressType = Union[Tuple[str, int], str]
# Base class of the server types declared in this stub (TCPServer, UDPServer
# and the Unix servers all subclass it).  Bodies are elided, as in any stub.
class BaseServer:
    # Callable that produces the BaseRequestHandler for a request.
    RequestHandlerClass: Callable[..., BaseRequestHandler]
    # Typed as a (str, int) pair -- presumably (host, port).
    server_address: tuple[str, int]
    allow_reuse_address: bool
    request_queue_size: int
    # server_address is Any here; the subclasses narrow it in their own __init__.
    def __init__(self, server_address: Any, RequestHandlerClass: Callable[..., BaseRequestHandler]) -> None: ...
    def fileno(self) -> int: ...
    def handle_request(self) -> None: ...
    def serve_forever(self, poll_interval: float = ...) -> None: ...
    def shutdown(self) -> None: ...
    def server_close(self) -> None: ...
    def finish_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
    # Returns a (request, client address) pair; both are left as Any here and
    # the request element is narrowed by TCPServer and UDPServer.
    def get_request(self) -> tuple[Any, Any]: ...
    def handle_error(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def handle_timeout(self) -> None: ...
    def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def server_activate(self) -> None: ...
    def server_bind(self) -> None: ...
    def verify_request(self, request: _RequestType, client_address: _AddressType) -> bool: ...
    # Context-manager protocol: __enter__ returns the server itself, so
    # "with Server(...) as srv" binds an object of the subclass type.
    def __enter__(self: Self) -> Self: ...
    def __exit__(
        self, exc_type: Type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None
    ) -> None: ...
    def service_actions(self) -> None: ...
    def shutdown_request(self, request: _RequestType) -> None: ...  # undocumented
    def close_request(self, request: _RequestType) -> None: ...  # undocumented
# Server variant whose requests are socket objects (see get_request).
class TCPServer(BaseServer):
    # bind_and_activate has a default, so it may be omitted by callers.
    def __init__(
        self,
        server_address: tuple[str, int],
        RequestHandlerClass: Callable[..., BaseRequestHandler],
        bind_and_activate: bool = ...,
    ) -> None: ...
    # Narrows the request element of BaseServer.get_request to a socket.
    def get_request(self) -> tuple[_socket, Any]: ...
    # The hooks below repeat the BaseServer signatures unchanged.
    def finish_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def handle_error(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def verify_request(self, request: _RequestType, client_address: _AddressType) -> bool: ...
    def shutdown_request(self, request: _RequestType) -> None: ...  # undocumented
    def close_request(self, request: _RequestType) -> None: ...  # undocumented
# Server variant whose requests are (bytes, socket) pairs (see get_request).
class UDPServer(BaseServer):
    # bind_and_activate has a default, so it may be omitted by callers.
    def __init__(
        self,
        server_address: tuple[str, int],
        RequestHandlerClass: Callable[..., BaseRequestHandler],
        bind_and_activate: bool = ...,
    ) -> None: ...
    # Narrows the request element of BaseServer.get_request to (bytes, socket).
    def get_request(self) -> tuple[tuple[bytes, _socket], Any]: ...
    # The hooks below repeat the BaseServer signatures unchanged.
    def finish_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def handle_error(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def verify_request(self, request: _RequestType, client_address: _AddressType) -> bool: ...
    def shutdown_request(self, request: _RequestType) -> None: ...  # undocumented
    def close_request(self, request: _RequestType) -> None: ...  # undocumented
# The Unix-domain servers are only declared when the platform is not Windows.
if sys.platform != "win32":
    class UnixStreamServer(BaseServer):
        # server_address is a str or bytes value here (presumably a socket
        # path) rather than the (str, int) pair used by TCPServer/UDPServer.
        def __init__(
            self,
            server_address: str | bytes,
            RequestHandlerClass: Callable[..., BaseRequestHandler],
            bind_and_activate: bool = ...,
        ) -> None: ...

    class UnixDatagramServer(BaseServer):
        # Same constructor shape as UnixStreamServer.
        def __init__(
            self,
            server_address: str | bytes,
            RequestHandlerClass: Callable[..., BaseRequestHandler],
            bind_and_activate: bool = ...,
        ) -> None: ...
# ForkingMixIn is only declared when the platform is not Windows.  It is
# combined with TCPServer / UDPServer to form the Forking* server classes.
if sys.platform != "win32":
    class ForkingMixIn:
        timeout: float | None  # undocumented
        active_children: set[int] | None  # undocumented
        max_children: int  # undocumented
        # block_on_close only exists on Python 3.7+, where it was introduced.
        if sys.version_info >= (3, 7):
            block_on_close: bool
        # blocking is keyword-only.
        def collect_children(self, *, blocking: bool = ...) -> None: ...  # undocumented
        def handle_timeout(self) -> None: ...  # undocumented
        def service_actions(self) -> None: ...  # undocumented
        def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
        def server_close(self) -> None: ...
# Mix-in combined with the server classes to form the Threading* servers
# declared further down in this stub.
class ThreadingMixIn:
    # block_on_close only exists on Python 3.7+, where it was introduced.
    if sys.version_info >= (3, 7):
        block_on_close: bool
    def process_request_thread(self, request: _RequestType, client_address: _AddressType) -> None: ...  # undocumented
    def process_request(self, request: _RequestType, client_address: _AddressType) -> None: ...
    def server_close(self) -> None: ...
# Forking servers are only declared when the platform is not Windows; the
# mix-in is listed first so its overrides take precedence in the MRO.
if sys.platform != "win32":
    class ForkingTCPServer(ForkingMixIn, TCPServer): ...
    class ForkingUDPServer(ForkingMixIn, UDPServer): ...
# TCPServer combined with ThreadingMixIn (mix-in first in the MRO).
class ThreadingTCPServer(ThreadingMixIn, TCPServer): ...
# UDPServer combined with ThreadingMixIn (mix-in first in the MRO).
class ThreadingUDPServer(ThreadingMixIn, UDPServer): ...
# Threaded Unix-domain servers; like their base classes they are only
# declared when the platform is not Windows.
if sys.platform != "win32":
    class ThreadingUnixStreamServer(ThreadingMixIn, UnixStreamServer): ...
    class ThreadingUnixDatagramServer(ThreadingMixIn, UnixDatagramServer): ...
# Base class for request handlers; instances are built from the request, the
# client address and the owning server.
class BaseRequestHandler:
    # request and client_address are technically of types _RequestType and
    # _AddressType respectively.
    # But there are some concerns that having unions here would cause
    # too much inconvenience to people using it (see
    # https://github.com/python/typeshed/pull/384#issuecomment-234649696),
    # so they are declared as Any.
    request: Any
    client_address: Any
    server: BaseServer
    def __init__(self, request: _RequestType, client_address: _AddressType, server: BaseServer) -> None: ...
    def setup(self) -> None: ...
    def handle(self) -> None: ...
    def finish(self) -> None: ...
# Request handler for stream servers; the ClassVar attributes are
# class-level settings, while connection is per-instance.
class StreamRequestHandler(BaseRequestHandler):
    rbufsize: ClassVar[int]  # undocumented
    wbufsize: ClassVar[int]  # undocumented
    timeout: ClassVar[float | None]  # undocumented
    disable_nagle_algorithm: ClassVar[bool]  # undocumented
    connection: _socket  # undocumented
# Request handler for datagram servers, whose request is a (bytes, socket)
# pair (see _RequestType and UDPServer.get_request).
class DatagramRequestHandler(BaseRequestHandler):
    # The datagram payload is the bytes half of the request pair, not a socket.
    packet: bytes  # undocumented
    socket: _socket  # undocumented