--- /dev/null
+import datetime
+import logging.handlers
+import subprocess
+import sys
+import time
+from typing import IO, Any, Callable, ContextManager, Dict, Iterable, Mapping, Sequence, Type, TypeVar
+
+import boto.connection
+
+_KT = TypeVar("_KT")
+_VT = TypeVar("_VT")
+
+if sys.version_info >= (3,):
+ # TODO move _StringIO definition into boto.compat once stubs exist and rename to StringIO
+ import io
+
+ _StringIO = io.StringIO
+
+ from hashlib import _Hash
+
+ _HashType = _Hash
+
+ from email.message import Message as _Message
+else:
+ # TODO move _StringIO definition into boto.compat once stubs exist and rename to StringIO
+ import StringIO
+
+ _StringIO = StringIO.StringIO[Any]
+
+ from hashlib import _hash
+
+ _HashType = _hash
+
+ # TODO use email.message.Message once stubs exist
+ _Message = Any
+
+_Provider = Any # TODO replace this with boto.provider.Provider once stubs exist
+_LockType = Any # TODO replace this with _thread.LockType once stubs exist
+
+JSONDecodeError: Type[ValueError]
+qsa_of_interest: list[str]
+
+def unquote_v(nv: str) -> str | tuple[str, str]: ...
+def canonical_string(
+ method: str, path: str, headers: Mapping[str, str | None], expires: int | None = ..., provider: _Provider | None = ...
+) -> str: ...
+def merge_meta(
+ headers: Mapping[str, str], metadata: Mapping[str, str], provider: _Provider | None = ...
+) -> Mapping[str, str]: ...
+def get_aws_metadata(headers: Mapping[str, str], provider: _Provider | None = ...) -> Mapping[str, str]: ...
+def retry_url(url: str, retry_on_404: bool = ..., num_retries: int = ..., timeout: int | None = ...) -> str: ...
+
+class LazyLoadMetadata(Dict[_KT, _VT]):
+ def __init__(self, url: str, num_retries: int, timeout: int | None = ...) -> None: ...
+
+def get_instance_metadata(
+ version: str = ..., url: str = ..., data: str = ..., timeout: int | None = ..., num_retries: int = ...
+) -> LazyLoadMetadata[Any, Any] | None: ...
+def get_instance_identity(
+ version: str = ..., url: str = ..., timeout: int | None = ..., num_retries: int = ...
+) -> Mapping[str, Any] | None: ...
+def get_instance_userdata(
+ version: str = ..., sep: str | None = ..., url: str = ..., timeout: int | None = ..., num_retries: int = ...
+) -> Mapping[str, str]: ...
+
+ISO8601: str
+ISO8601_MS: str
+RFC1123: str
+LOCALE_LOCK: _LockType
+
+def setlocale(name: str | tuple[str, str]) -> ContextManager[str]: ...
+def get_ts(ts: time.struct_time | None = ...) -> str: ...
+def parse_ts(ts: str) -> datetime.datetime: ...
+def find_class(module_name: str, class_name: str | None = ...) -> Type[Any] | None: ...
+def update_dme(username: str, password: str, dme_id: str, ip_address: str) -> str: ...
+def fetch_file(
+ uri: str, file: IO[str] | None = ..., username: str | None = ..., password: str | None = ...
+) -> IO[str] | None: ...
+
+class ShellCommand:
+ exit_code: int
+ command: subprocess._CMD
+ log_fp: _StringIO
+ wait: bool
+ fail_fast: bool
+ def __init__(
+ self, command: subprocess._CMD, wait: bool = ..., fail_fast: bool = ..., cwd: subprocess._TXT | None = ...
+ ) -> None: ...
+ process: subprocess.Popen[Any]
+ def run(self, cwd: subprocess._CMD | None = ...) -> int | None: ...
+ def setReadOnly(self, value) -> None: ...
+ def getStatus(self) -> int | None: ...
+ status: int | None
+ def getOutput(self) -> str: ...
+ output: str
+
+class AuthSMTPHandler(logging.handlers.SMTPHandler):
+ username: str
+ password: str
+ def __init__(
+ self, mailhost: str, username: str, password: str, fromaddr: str, toaddrs: Sequence[str], subject: str
+ ) -> None: ...
+
+class LRUCache(Dict[_KT, _VT]):
+ class _Item:
+ previous: LRUCache._Item | None
+ next: LRUCache._Item | None
+ key = ...
+ value = ...
+ def __init__(self, key, value) -> None: ...
+ _dict: dict[_KT, LRUCache._Item]
+ capacity: int
+ head: LRUCache._Item | None
+ tail: LRUCache._Item | None
+ def __init__(self, capacity: int) -> None: ...
+
+# This exists to work around Password.str's name shadowing the str type
+_str = str
+
+class Password:
+ hashfunc: Callable[[bytes], _HashType]
+ str: _str | None
+ def __init__(self, str: _str | None = ..., hashfunc: Callable[[bytes], _HashType] | None = ...) -> None: ...
+ def set(self, value: bytes | _str) -> None: ...
+ def __eq__(self, other: Any) -> bool: ...
+ def __len__(self) -> int: ...
+
+def notify(
+ subject: str,
+ body: str | None = ...,
+ html_body: Sequence[str] | str | None = ...,
+ to_string: str | None = ...,
+ attachments: Iterable[_Message] | None = ...,
+ append_instance_id: bool = ...,
+) -> None: ...
+def get_utf8_value(value: str) -> bytes: ...
+def mklist(value: Any) -> list[Any]: ...
+def pythonize_name(name: str) -> str: ...
+def write_mime_multipart(
+ content: list[tuple[str, str]], compress: bool = ..., deftype: str = ..., delimiter: str = ...
+) -> str: ...
+def guess_mime_type(content: str, deftype: str) -> str: ...
+def compute_md5(fp: IO[Any], buf_size: int = ..., size: int | None = ...) -> tuple[str, str, int]: ...
+def compute_hash(fp: IO[Any], buf_size: int = ..., size: int | None = ..., hash_algorithm: Any = ...) -> tuple[str, str, int]: ...
+def find_matching_headers(name: str, headers: Mapping[str, str | None]) -> list[str]: ...
+def merge_headers_by_name(name: str, headers: Mapping[str, str | None]) -> str: ...
+
+class RequestHook:
+ def handle_request_data(
+ self, request: boto.connection.HTTPRequest, response: boto.connection.HTTPResponse, error: bool = ...
+ ) -> Any: ...
+
+def host_is_ipv6(hostname: str) -> bool: ...
+def parse_host(hostname: str) -> str: ...