--- /dev/null
+import datetime\r
+import logging.handlers\r
+import subprocess\r
+import sys\r
+import time\r
+\r
+import boto.connection\r
+from typing import (\r
+ Any,\r
+ Callable,\r
+ ContextManager,\r
+ Dict,\r
+ IO,\r
+ Iterable,\r
+ List,\r
+ Mapping,\r
+ Optional,\r
+ Sequence,\r
+ Tuple,\r
+ Type,\r
+ TypeVar,\r
+ Union,\r
+)\r
+\r
+_KT = TypeVar('_KT')\r
+_VT = TypeVar('_VT')\r
+\r
+if sys.version_info[0] >= 3:\r
+ # TODO move _StringIO definition into boto.compat once stubs exist and rename to StringIO\r
+ import io\r
+ _StringIO = io.StringIO\r
+\r
+ from hashlib import _Hash\r
+ _HashType = _Hash\r
+\r
+ from email.message import Message as _Message\r
+else:\r
+ # TODO move _StringIO definition into boto.compat once stubs exist and rename to StringIO\r
+ import StringIO\r
+ _StringIO = StringIO.StringIO\r
+\r
+ from hashlib import _hash\r
+ _HashType = _hash\r
+\r
+ # TODO use email.message.Message once stubs exist\r
+ _Message = Any\r
+\r
+_Provider = Any # TODO replace this with boto.provider.Provider once stubs exist\r
+_LockType = Any # TODO replace this with _thread.LockType once stubs exist\r
+\r
+\r
+JSONDecodeError = ... # type: Type[ValueError]\r
+qsa_of_interest = ... # type: List[str]\r
+\r
+\r
+def unquote_v(nv: str) -> Union[str, Tuple[str, str]]: ...\r
+def canonical_string(\r
+ method: str,\r
+ path: str,\r
+ headers: Mapping[str, Optional[str]],\r
+ expires: Optional[int] = ...,\r
+ provider: Optional[_Provider] = ...,\r
+) -> str: ...\r
+def merge_meta(\r
+ headers: Mapping[str, str],\r
+ metadata: Mapping[str, str],\r
+ provider: Optional[_Provider] = ...,\r
+) -> Mapping[str, str]: ...\r
+def get_aws_metadata(\r
+ headers: Mapping[str, str],\r
+ provider: Optional[_Provider] = ...,\r
+) -> Mapping[str, str]: ...\r
+def retry_url(\r
+ url: str,\r
+ retry_on_404: bool = ...,\r
+ num_retries: int = ...,\r
+ timeout: Optional[int] = ...,\r
+) -> str: ...\r
+\r
+class LazyLoadMetadata(Dict[_KT, _VT]):\r
+ def __init__(\r
+ self,\r
+ url: str,\r
+ num_retries: int,\r
+ timeout: Optional[int] = ...,\r
+ ) -> None: ...\r
+\r
+def get_instance_metadata(\r
+ version: str = ...,\r
+ url: str = ...,\r
+ data: str = ...,\r
+ timeout: Optional[int] = ...,\r
+ num_retries: int = ...,\r
+) -> Optional[LazyLoadMetadata]: ...\r
+def get_instance_identity(\r
+ version: str = ...,\r
+ url: str = ...,\r
+ timeout: Optional[int] = ...,\r
+ num_retries: int = ...,\r
+) -> Optional[Mapping[str, Any]]: ...\r
+def get_instance_userdata(\r
+ version: str = ...,\r
+ sep: Optional[str] = ...,\r
+ url: str = ...,\r
+ timeout: Optional[int] = ...,\r
+ num_retries: int = ...,\r
+) -> Mapping[str, str]: ...\r
+\r
+ISO8601 = ... # type: str\r
+ISO8601_MS = ... # type: str\r
+RFC1123 = ... # type: str\r
+LOCALE_LOCK = ... # type: _LockType\r
+\r
+def setlocale(name: Union[str, Tuple[str, str]]) -> ContextManager[str]: ...\r
+def get_ts(ts: Optional[time.struct_time] = ...) -> str: ...\r
+def parse_ts(ts: str) -> datetime.datetime: ...\r
+def find_class(module_name: str, class_name: Optional[str] = ...) -> Optional[Type[Any]]: ...\r
+def update_dme(username: str, password: str, dme_id: str, ip_address: str) -> str: ...\r
+def fetch_file(\r
+ uri: str,\r
+ file: Optional[IO[str]] = ...,\r
+ username: Optional[str] = ...,\r
+ password: Optional[str] = ...,\r
+) -> Optional[IO[str]]: ...\r
+\r
+class ShellCommand:\r
+ exit_code = ... # type: int\r
+ command = ... # type: subprocess._CMD\r
+ log_fp = ... # type: _StringIO\r
+ wait = ... # type: bool\r
+ fail_fast = ... # type: bool\r
+\r
+ def __init__(\r
+ self,\r
+ command: subprocess._CMD,\r
+ wait: bool = ...,\r
+ fail_fast: bool = ...,\r
+ cwd: Optional[subprocess._TXT] = ...,\r
+ ) -> None: ...\r
+\r
+ process = ... # type: subprocess.Popen\r
+\r
+ def run(self, cwd: Optional[subprocess._CMD] = ...) -> Optional[int]: ...\r
+ def setReadOnly(self, value) -> None: ...\r
+ def getStatus(self) -> Optional[int]: ...\r
+\r
+ status = ... # type: Optional[int]\r
+\r
+ def getOutput(self) -> str: ...\r
+\r
+ output = ... # type: str\r
+\r
+class AuthSMTPHandler(logging.handlers.SMTPHandler):\r
+ username = ... # type: str\r
+ password = ... # type: str\r
+ def __init__(\r
+ self,\r
+ mailhost: str,\r
+ username: str,\r
+ password: str,\r
+ fromaddr: str,\r
+ toaddrs: Sequence[str],\r
+ subject: str,\r
+ ) -> None: ...\r
+\r
+class LRUCache(Dict[_KT, _VT]):\r
+ class _Item:\r
+ previous = ... # type: Optional[LRUCache._Item]\r
+ next = ... # type: Optional[LRUCache._Item]\r
+ key = ...\r
+ value = ...\r
+ def __init__(self, key, value) -> None: ...\r
+\r
+ _dict = ... # type: Dict[_KT, LRUCache._Item]\r
+ capacity = ... # type: int\r
+ head = ... # type: Optional[LRUCache._Item]\r
+ tail = ... # type: Optional[LRUCache._Item]\r
+\r
+ def __init__(self, capacity: int) -> None: ...\r
+\r
+\r
+# This exists to work around Password.str's name shadowing the str type\r
+_str = str\r
+\r
+class Password:\r
+ hashfunc = ... # type: Callable[[bytes], _HashType]\r
+ str = ... # type: Optional[_str]\r
+\r
+ def __init__(\r
+ self,\r
+ str: Optional[_str] = ...,\r
+ hashfunc: Optional[Callable[[bytes], _HashType]] = ...,\r
+ ) -> None: ...\r
+ def set(self, value: Union[bytes, _str]) -> None: ...\r
+ def __eq__(self, other: Any) -> bool: ...\r
+ def __len__(self) -> int: ...\r
+\r
+def notify(\r
+ subject: str,\r
+ body: Optional[str] = ...,\r
+ html_body: Optional[Union[Sequence[str], str]] = ...,\r
+ to_string: Optional[str] = ...,\r
+ attachments: Optional[Iterable[_Message]] = ...,\r
+ append_instance_id: bool = ...,\r
+) -> None: ...\r
+def get_utf8_value(value: str) -> bytes: ...\r
+def mklist(value: Any) -> List: ...\r
+def pythonize_name(name: str) -> str: ...\r
+def write_mime_multipart(\r
+ content: List[Tuple[str, str]],\r
+ compress: bool = ...,\r
+ deftype: str = ...,\r
+ delimiter: str = ...,\r
+) -> str: ...\r
+def guess_mime_type(content: str, deftype: str) -> str: ...\r
+def compute_md5(\r
+ fp: IO[Any],\r
+ buf_size: int = ...,\r
+ size: Optional[int] = ...,\r
+) -> Tuple[str, str, int]: ...\r
+def compute_hash(\r
+ fp: IO[Any],\r
+ buf_size: int = ...,\r
+ size: Optional[int] = ...,\r
+ hash_algorithm: Any = ...,\r
+) -> Tuple[str, str, int]: ...\r
+def find_matching_headers(name: str, headers: Mapping[str, Optional[str]]) -> List[str]: ...\r
+def merge_headers_by_name(name: str, headers: Mapping[str, Optional[str]]) -> str: ...\r
+\r
+class RequestHook:\r
+ def handle_request_data(\r
+ self,\r
+ request: boto.connection.HTTPRequest,\r
+ response: boto.connection.HTTPResponse,\r
+ error: bool = ...,\r
+ ) -> Any: ...\r
+\r
+def host_is_ipv6(hostname: str) -> bool: ...\r
+def parse_host(hostname: str) -> str: ...\r