--- /dev/null
+from logging import Logger
+from socket import socket
+from threading import Condition, Event, Lock, Thread
+from types import ModuleType
+from typing import Any, Callable, Iterable, Protocol, Sequence, Tuple, Type
+
+from paramiko.auth_handler import AuthHandler, _InteractiveCallback
+from paramiko.channel import Channel
+from paramiko.message import Message
+from paramiko.packet import Packetizer
+from paramiko.pkey import PKey
+from paramiko.server import ServerInterface, SubsystemHandler
+from paramiko.sftp_client import SFTPClient
+from paramiko.ssh_gss import _SSH_GSSAuth
+from paramiko.util import ClosingContextManager
+
+_Addr = Tuple[str, int]
+
+class _KexEngine(Protocol):
+ def start_kex(self) -> None: ...
+ def parse_next(self, ptype: int, m: Message) -> None: ...
+
+class Transport(Thread, ClosingContextManager):
+ active: bool
+ hostname: str | None
+ sock: socket
+ packetizer: Packetizer
+ local_version: str
+ remote_version: str
+ local_cipher: str
+ local_kex_init: bytes | None
+ local_mac: str | None
+ local_compression: str | None
+ session_id: bytes | None
+ host_key_type: str | None
+ host_key: PKey | None
+ use_gss_kex: bool
+ gss_kex_used: bool
+ kexgss_ctxt: _SSH_GSSAuth | None
+ gss_host: str
+ kex_engine: _KexEngine | None
+ H: bytes | None
+ K: int | None
+ initial_kex_done: bool
+ in_kex: bool
+ authenticated: bool
+ lock: Lock
+ channel_events: dict[int, Event]
+ channels_seen: dict[int, bool]
+ default_max_packet_size: int
+ default_window_size: int
+ saved_exception: Exception | None
+ clear_to_send: Event
+ clear_to_send_lock: Lock
+ clear_to_send_timeout: float
+ log_name: str
+ logger: Logger
+ auth_handler: AuthHandler | None
+ global_response: Message | None
+ completion_event: Event | None
+ banner_timeout: float
+ handshake_timeout: float
+ auth_timeout: float
+ disabled_algorithms: dict[str, Iterable[str]] | None
+ server_mode: bool
+ server_object: ServerInterface | None
+ server_key_dict: dict[str, PKey]
+ server_accepts: list[Channel]
+ server_accept_cv: Condition
+ subsystem_table: dict[str, tuple[Type[SubsystemHandler], Tuple[Any, ...], dict[str, Any]]]
+ sys: ModuleType
+ def __init__(
+ self,
+ sock: str | tuple[str, int] | socket,
+ default_window_size: int = ...,
+ default_max_packet_size: int = ...,
+ gss_kex: bool = ...,
+ gss_deleg_creds: bool = ...,
+ disabled_algorithms: dict[str, Iterable[str]] | None = ...,
+ ) -> None: ...
+ @property
+ def preferred_ciphers(self) -> Sequence[str]: ...
+ @property
+ def preferred_macs(self) -> Sequence[str]: ...
+ @property
+ def preferred_keys(self) -> Sequence[str]: ...
+ @property
+ def preferred_kex(self) -> Sequence[str]: ...
+ @property
+ def preferred_compression(self) -> Sequence[str]: ...
+ def atfork(self) -> None: ...
+ def get_security_options(self) -> SecurityOptions: ...
+ def set_gss_host(self, gss_host: str | None, trust_dns: bool = ..., gssapi_requested: bool = ...) -> None: ...
+ def start_client(self, event: Event | None = ..., timeout: float | None = ...) -> None: ...
+ def start_server(self, event: Event = ..., server: ServerInterface | None = ...) -> None: ...
+ def add_server_key(self, key: PKey) -> None: ...
+ def get_server_key(self) -> PKey | None: ...
+ @staticmethod
+ def load_server_moduli(filename: str | None = ...) -> bool: ...
+ def close(self) -> None: ...
+ def get_remote_server_key(self) -> PKey: ...
+ def is_active(self) -> bool: ...
+ def open_session(
+ self, window_size: int | None = ..., max_packet_size: int | None = ..., timeout: float | None = ...
+ ) -> Channel: ...
+ def open_x11_channel(self, src_addr: _Addr = ...) -> Channel: ...
+ def open_forward_agent_channel(self) -> Channel: ...
+ def open_forwarded_tcpip_channel(self, src_addr: _Addr, dest_addr: _Addr) -> Channel: ...
+ def open_channel(
+ self,
+ kind: str,
+ dest_addr: _Addr | None = ...,
+ src_addr: _Addr | None = ...,
+ window_size: int | None = ...,
+ max_packet_size: int | None = ...,
+ timeout: float | None = ...,
+ ) -> Channel: ...
+ def request_port_forward(
+ self, address: str, port: int, handler: Callable[[Channel, _Addr, _Addr], None] | None = ...
+ ) -> int: ...
+ def cancel_port_forward(self, address: str, port: int) -> None: ...
+ def open_sftp_client(self) -> SFTPClient | None: ...
+ def send_ignore(self, byte_count: int = ...) -> None: ...
+ def renegotiate_keys(self) -> None: ...
+ def set_keepalive(self, interval: int) -> None: ...
+ def global_request(self, kind: str, data: Iterable[Any] | None = ..., wait: bool = ...) -> Message | None: ...
+ def accept(self, timeout: float | None = ...) -> Channel | None: ...
+ def connect(
+ self,
+ hostkey: PKey | None = ...,
+ username: str = ...,
+ password: str | None = ...,
+ pkey: PKey | None = ...,
+ gss_host: str | None = ...,
+ gss_auth: bool = ...,
+ gss_kex: bool = ...,
+ gss_deleg_creds: bool = ...,
+ gss_trust_dns: bool = ...,
+ ) -> None: ...
+ def get_exception(self) -> Exception | None: ...
+ def set_subsystem_handler(self, name: str, handler: Type[SubsystemHandler], *larg: Any, **kwarg: Any) -> None: ...
+ def is_authenticated(self) -> bool: ...
+ def get_username(self) -> str | None: ...
+ def get_banner(self) -> bytes | None: ...
+ def auth_none(self, username: str) -> list[str]: ...
+ def auth_password(self, username: str, password: str, event: Event | None = ..., fallback: bool = ...) -> list[str]: ...
+ def auth_publickey(self, username: str, key: PKey, event: Event | None = ...) -> list[str]: ...
+ def auth_interactive(self, username: str, handler: _InteractiveCallback, submethods: str = ...) -> list[str]: ...
+ def auth_interactive_dumb(
+ self, username: str, handler: _InteractiveCallback | None = ..., submethods: str = ...
+ ) -> list[str]: ...
+ def auth_gssapi_with_mic(self, username: str, gss_host: str, gss_deleg_creds: bool) -> list[str]: ...
+ def auth_gssapi_keyex(self, username: str) -> list[str]: ...
+ def set_log_channel(self, name: str) -> None: ...
+ def get_log_channel(self) -> str: ...
+ def set_hexdump(self, hexdump: bool) -> None: ...
+ def get_hexdump(self) -> bool: ...
+ def use_compression(self, compress: bool = ...) -> None: ...
+ def getpeername(self) -> tuple[str, int]: ...
+ def stop_thread(self) -> None: ...
+ def run(self) -> None: ...
+
+class SecurityOptions:
+ def __init__(self, transport: Transport) -> None: ...
+ @property
+ def ciphers(self) -> Sequence[str]: ...
+ @ciphers.setter
+ def ciphers(self, x: Sequence[str]) -> None: ...
+ @property
+ def digests(self) -> Sequence[str]: ...
+ @digests.setter
+ def digests(self, x: Sequence[str]) -> None: ...
+ @property
+ def key_types(self) -> Sequence[str]: ...
+ @key_types.setter
+ def key_types(self, x: Sequence[str]) -> None: ...
+ @property
+ def kex(self) -> Sequence[str]: ...
+ @kex.setter
+ def kex(self, x: Sequence[str]) -> None: ...
+ @property
+ def compression(self) -> Sequence[str]: ...
+ @compression.setter
+ def compression(self, x: Sequence[str]) -> None: ...
+
+class ChannelMap:
+ def __init__(self) -> None: ...
+ def put(self, chanid: int, chan: Channel) -> None: ...
+ def get(self, chanid: int) -> Channel: ...
+ def delete(self, chanid: int) -> None: ...
+ def values(self) -> list[Channel]: ...
+ def __len__(self) -> int: ...