4 from collections import defaultdict
5 from typing import Any, Tuple, Type
# Alias for a network address expressed as a (str, int) tuple; it is used
# below as the type of the `peer` parameter of process_message.
7 _Address = Tuple[str, int] # (host, port)
# Type stub for SMTPChannel, a subclass of asynchat.async_chat.
# NOTE(review): the leading number on each line is an embedded source line
# number. The numbers have gaps (e.g. 9 -> 13, 17 -> 26, 31 -> 37), so any
# members declared on the missing lines are not visible in this excerpt.
9 class SMTPChannel(asynchat.async_chat):
# Mapping from str keys to int limits; presumably keyed by command name -- TODO confirm.
13 command_size_limits: defaultdict[str, int]
# The SMTPServer this channel refers to (SMTPServer is declared further down in this file).
14 smtp_server: SMTPServer
# A list of str; presumably the lines received from the client -- TODO confirm.
17 received_lines: list[str]
# Integer size limit for a command; the unit is not stated in this stub.
26 command_size_limit: int
# Returns an int. NOTE(review): no decorator is visible above this line
# (embedded line 30 is missing); presumably this is a property -- confirm.
31 def max_command_size_limit(self) -> int: ...
# NOTE(review): the next four lines are defaulted parameters of a signature
# whose `def` line and closing `) -> ...` are not in this excerpt --
# presumably __init__; confirm against the full file.
37 data_size_limit: int = ...,
38 map: asyncore._maptype | None = ...,
39 enable_SMTPUTF8: bool = ...,
40 decode_data: bool = ...,
42 # The base asynchat.async_chat.push() accepts bytes, whereas this override is annotated as taking str; that mismatch is why the next line carries a type-ignore marker.
43 def push(self, msg: str) -> None: ... # type: ignore
# Takes a chunk of incoming data as bytes; returns None.
44 def collect_incoming_data(self, data: bytes) -> None: ...
# Takes no arguments besides self; returns None.
45 def found_terminator(self) -> None: ...
# The smtp_* methods below all share one signature: a single str argument and
# no return value. Presumably each handles the SMTP command named in its
# suffix, with `arg` being the text that follows the command -- TODO confirm.
46 def smtp_HELO(self, arg: str) -> None: ...
47 def smtp_NOOP(self, arg: str) -> None: ...
48 def smtp_QUIT(self, arg: str) -> None: ...
49 def smtp_MAIL(self, arg: str) -> None: ...
50 def smtp_RCPT(self, arg: str) -> None: ...
51 def smtp_RSET(self, arg: str) -> None: ...
52 def smtp_DATA(self, arg: str) -> None: ...
53 def smtp_EHLO(self, arg: str) -> None: ...
54 def smtp_HELP(self, arg: str) -> None: ...
55 def smtp_VRFY(self, arg: str) -> None: ...
56 def smtp_EXPN(self, arg: str) -> None: ...
# Type stub for SMTPServer, a subclass of asyncore.dispatcher.
# NOTE(review): embedded line numbers jump 59 -> 67 and 72 -> 74, so parts of
# this class (including at least two `def` lines) are missing from this excerpt.
58 class SMTPServer(asyncore.dispatcher):
# The class object (SMTPChannel or a subclass of it) held by the server;
# presumably instantiated per accepted connection -- TODO confirm.
59 channel_class: Type[SMTPChannel]
# NOTE(review): the next four lines are defaulted parameters of a signature
# whose `def` line and closing `) -> ...` are not in this excerpt --
# presumably __init__; confirm against the full file.
67 data_size_limit: int = ...,
68 map: asyncore._maptype | None = ...,
69 enable_SMTPUTF8: bool = ...,
70 decode_data: bool = ...,
# Takes a socket.socket and an address of unspecified type (Any); returns None.
72 def handle_accepted(self, conn: socket.socket, addr: Any) -> None: ...
# NOTE(review): parameter line of a method whose `def` line and return
# annotation are not in this excerpt. The parameters match the
# process_message overrides in PureProxy and MailmanProxy, plus **kwargs, so
# this is presumably SMTPServer.process_message -- confirm.
74 self, peer: _Address, mailfrom: str, rcpttos: list[str], data: bytes | str, **kwargs: Any
# SMTPServer subclass that declares no members of its own in this stub.
77 class DebuggingServer(SMTPServer): ...
# Type stub for PureProxy, a subclass of SMTPServer.
79 class PureProxy(SMTPServer):
# Declares process_message with peer, mailfrom, rcpttos and data parameters.
# The type-ignore marker is presumably needed because this signature differs
# from the base class one (no **kwargs here) -- TODO confirm.
80 def process_message( # type: ignore
81 self, peer: _Address, mailfrom: str, rcpttos: list[str], data: bytes | str
# NOTE(review): the closing `) -> ...` line of this signature is not in this
# excerpt (embedded line numbers jump 81 -> 84); the return type cannot be
# determined from here.
84 class MailmanProxy(PureProxy):
85 def process_message( # type: ignore
86 self, peer: _Address, mailfrom: str, rcpttos: list[str], data: bytes | str