from logging import Logger
from socket import socket
from threading import Condition, Event, Lock, Thread
from types import ModuleType
from typing import Any, Callable, Iterable, Protocol, Sequence, Tuple, Type

from paramiko.auth_handler import AuthHandler, _InteractiveCallback
from paramiko.channel import Channel
from paramiko.message import Message
from paramiko.packet import Packetizer
from paramiko.pkey import PKey
from paramiko.server import ServerInterface, SubsystemHandler
from paramiko.sftp_client import SFTPClient
from paramiko.ssh_gss import _SSH_GSSAuth
from paramiko.util import ClosingContextManager

# (str, int) pair used by the src_addr / dest_addr parameters below
# (presumably host and port -- confirm against the runtime module).
_Addr = Tuple[str, int]

# Structural type for the object stored in ``Transport.kex_engine``: anything
# that provides ``start_kex`` and ``parse_next`` with these signatures matches.
class _KexEngine(Protocol):
    def start_kex(self) -> None: ...
    def parse_next(self, ptype: int, m: Message) -> None: ...

# Type declarations for ``Transport``, which is both a ``threading.Thread``
# and a ``ClosingContextManager``.  This is a stub: every body is ``...`` and
# every default value is elided as ``...``.
class Transport(Thread, ClosingContextManager):
    # --- Declared attributes ---------------------------------------------
    active: bool
    hostname: str | None
    sock: socket
    packetizer: Packetizer
    local_version: str
    remote_version: str
    local_cipher: str
    local_kex_init: bytes | None
    local_mac: str | None
    local_compression: str | None
    session_id: bytes | None
    host_key_type: str | None
    host_key: PKey | None
    use_gss_kex: bool
    gss_kex_used: bool
    kexgss_ctxt: _SSH_GSSAuth | None
    gss_host: str
    kex_engine: _KexEngine | None
    H: bytes | None
    K: int | None
    initial_kex_done: bool
    in_kex: bool
    authenticated: bool
    lock: Lock
    channel_events: dict[int, Event]
    channels_seen: dict[int, bool]
    default_max_packet_size: int
    default_window_size: int
    saved_exception: Exception | None
    clear_to_send: Event
    clear_to_send_lock: Lock
    clear_to_send_timeout: float
    log_name: str
    logger: Logger
    auth_handler: AuthHandler | None
    global_response: Message | None
    completion_event: Event | None
    banner_timeout: float
    handshake_timeout: float
    auth_timeout: float
    disabled_algorithms: dict[str, Iterable[str]] | None
    server_mode: bool
    server_object: ServerInterface | None
    server_key_dict: dict[str, PKey]
    server_accepts: list[Channel]
    server_accept_cv: Condition
    # Maps a subsystem name to (handler class, positional args, keyword args);
    # the value shape mirrors the parameters of ``set_subsystem_handler``.
    subsystem_table: dict[str, tuple[Type[SubsystemHandler], Tuple[Any, ...], dict[str, Any]]]
    sys: ModuleType

    # ``sock`` may be given as a string, a (str, int) tuple, or a socket object.
    def __init__(
        self,
        sock: str | tuple[str, int] | socket,
        default_window_size: int = ...,
        default_max_packet_size: int = ...,
        gss_kex: bool = ...,
        gss_deleg_creds: bool = ...,
        disabled_algorithms: dict[str, Iterable[str]] | None = ...,
    ) -> None: ...

    # --- Properties (no setters are declared in this stub) -----------------
    @property
    def preferred_ciphers(self) -> Sequence[str]: ...
    @property
    def preferred_macs(self) -> Sequence[str]: ...
    @property
    def preferred_keys(self) -> Sequence[str]: ...
    @property
    def preferred_kex(self) -> Sequence[str]: ...
    @property
    def preferred_compression(self) -> Sequence[str]: ...

    def atfork(self) -> None: ...
    def get_security_options(self) -> SecurityOptions: ...
    def set_gss_host(self, gss_host: str | None, trust_dns: bool = ..., gssapi_requested: bool = ...) -> None: ...
    def start_client(self, event: Event | None = ..., timeout: float | None = ...) -> None: ...
    # ``event`` admits None to match ``start_client`` above; paramiko's runtime
    # signature is ``start_server(event=None, server=None)``.
    def start_server(self, event: Event | None = ..., server: ServerInterface | None = ...) -> None: ...
    def add_server_key(self, key: PKey) -> None: ...
    def get_server_key(self) -> PKey | None: ...
    @staticmethod
    def load_server_moduli(filename: str | None = ...) -> bool: ...
    def close(self) -> None: ...
    def get_remote_server_key(self) -> PKey: ...
    def is_active(self) -> bool: ...

    # --- Channel-returning methods ---------------------------------------
    def open_session(
        self,
        window_size: int | None = ...,
        max_packet_size: int | None = ...,
        timeout: float | None = ...,
    ) -> Channel: ...
    def open_x11_channel(self, src_addr: _Addr = ...) -> Channel: ...
    def open_forward_agent_channel(self) -> Channel: ...
    def open_forwarded_tcpip_channel(self, src_addr: _Addr, dest_addr: _Addr) -> Channel: ...
    def open_channel(
        self,
        kind: str,
        dest_addr: _Addr | None = ...,
        src_addr: _Addr | None = ...,
        window_size: int | None = ...,
        max_packet_size: int | None = ...,
        timeout: float | None = ...,
    ) -> Channel: ...
    # ``handler``, when given, is called with a Channel and two _Addr values.
    def request_port_forward(
        self,
        address: str,
        port: int,
        handler: Callable[[Channel, _Addr, _Addr], None] | None = ...,
    ) -> int: ...
    def cancel_port_forward(self, address: str, port: int) -> None: ...
    def open_sftp_client(self) -> SFTPClient | None: ...
    def send_ignore(self, byte_count: int = ...) -> None: ...
    def renegotiate_keys(self) -> None: ...
    def set_keepalive(self, interval: int) -> None: ...
    def global_request(self, kind: str, data: Iterable[Any] | None = ..., wait: bool = ...) -> Message | None: ...
    def accept(self, timeout: float | None = ...) -> Channel | None: ...
    def connect(
        self,
        hostkey: PKey | None = ...,
        username: str = ...,
        password: str | None = ...,
        pkey: PKey | None = ...,
        gss_host: str | None = ...,
        gss_auth: bool = ...,
        gss_kex: bool = ...,
        gss_deleg_creds: bool = ...,
        gss_trust_dns: bool = ...,
    ) -> None: ...
    def get_exception(self) -> Exception | None: ...
    def set_subsystem_handler(self, name: str, handler: Type[SubsystemHandler], *larg: Any, **kwarg: Any) -> None: ...
    def is_authenticated(self) -> bool: ...
    def get_username(self) -> str | None: ...
    def get_banner(self) -> bytes | None: ...

    # --- auth_* methods; each is declared to return a list of strings ------
    def auth_none(self, username: str) -> list[str]: ...
    def auth_password(self, username: str, password: str, event: Event | None = ..., fallback: bool = ...) -> list[str]: ...
    def auth_publickey(self, username: str, key: PKey, event: Event | None = ...) -> list[str]: ...
    def auth_interactive(self, username: str, handler: _InteractiveCallback, submethods: str = ...) -> list[str]: ...
    def auth_interactive_dumb(
        self,
        username: str,
        handler: _InteractiveCallback | None = ...,
        submethods: str = ...,
    ) -> list[str]: ...
    def auth_gssapi_with_mic(self, username: str, gss_host: str, gss_deleg_creds: bool) -> list[str]: ...
    def auth_gssapi_keyex(self, username: str) -> list[str]: ...

    def set_log_channel(self, name: str) -> None: ...
    def get_log_channel(self) -> str: ...
    def set_hexdump(self, hexdump: bool) -> None: ...
    def get_hexdump(self) -> bool: ...
    def use_compression(self, compress: bool = ...) -> None: ...
    def getpeername(self) -> tuple[str, int]: ...
    def stop_thread(self) -> None: ...
    def run(self) -> None: ...
# Type declarations for ``SecurityOptions``: constructed from a ``Transport``
# and exposing five read/write properties, each typed as a sequence of strings.
class SecurityOptions:
    def __init__(self, transport: Transport) -> None: ...

    @property
    def ciphers(self) -> Sequence[str]: ...
    @ciphers.setter
    def ciphers(self, x: Sequence[str]) -> None: ...

    @property
    def digests(self) -> Sequence[str]: ...
    @digests.setter
    def digests(self, x: Sequence[str]) -> None: ...

    @property
    def key_types(self) -> Sequence[str]: ...
    @key_types.setter
    def key_types(self, x: Sequence[str]) -> None: ...

    @property
    def kex(self) -> Sequence[str]: ...
    @kex.setter
    def kex(self, x: Sequence[str]) -> None: ...

    @property
    def compression(self) -> Sequence[str]: ...
    @compression.setter
    def compression(self, x: Sequence[str]) -> None: ...


# Type declarations for ``ChannelMap``: a container of ``Channel`` objects
# addressed by an integer channel id, with put / get / delete / values / len.
class ChannelMap:
    def __init__(self) -> None: ...
    def put(self, chanid: int, chan: Channel) -> None: ...
    # NOTE(review): declared as always returning a Channel; confirm against the
    # runtime implementation whether a missing id can yield None instead.
    def get(self, chanid: int) -> Channel: ...
    def delete(self, chanid: int) -> None: ...
    def values(self) -> list[Channel]: ...
    def __len__(self) -> int: ...