2 import logging.handlers
\r
7 import boto.connection
\r
# Generic key/value type variables; they parameterize the Dict base of the
# LazyLoadMetadata and LRUCache classes declared further down.
25 _KT = TypeVar('_KT')
\r
26 _VT = TypeVar('_VT')
\r
28 if sys.version_info[0] >= 3:
\r
29 # TODO move _StringIO definition into boto.compat once stubs exist and rename to StringIO
\r
31 _StringIO = io.StringIO
\r
33 from hashlib import _Hash
\r
36 from email.message import Message as _Message
\r
38 # TODO move _StringIO definition into boto.compat once stubs exist and rename to StringIO
\r
40 _StringIO = StringIO.StringIO
\r
42 from hashlib import _hash
\r
45 # TODO use email.message.Message once stubs exist
\r
48 _Provider = Any # TODO replace this with boto.provider.Provider once stubs exist
\r
49 _LockType = Any # TODO replace this with _thread.LockType once stubs exist
\r
# Module-level names whose values are elided in this stub; only their types are
# declared: JSONDecodeError is a ValueError subclass (or ValueError itself) and
# qsa_of_interest is a list of strings.
52 JSONDecodeError = ... # type: Type[ValueError]
\r
53 qsa_of_interest = ... # type: List[str]
\r
# Stub signature: takes a string `nv`; the result is either a plain string or a
# 2-tuple of strings.
56 def unquote_v(nv: str) -> Union[str, Tuple[str, str]]: ...
\r
57 def canonical_string(
\r
60 headers: Mapping[str, Optional[str]],
\r
61 expires: Optional[int] = ...,
\r
62 provider: Optional[_Provider] = ...,
\r
65 headers: Mapping[str, str],
\r
66 metadata: Mapping[str, str],
\r
67 provider: Optional[_Provider] = ...,
\r
68 ) -> Mapping[str, str]: ...
\r
# Stub signature: takes a str-to-str header mapping plus an optional provider
# (typed via the _Provider alias, currently Any) and returns a str-to-str mapping.
69 def get_aws_metadata(
\r
70 headers: Mapping[str, str],
\r
71 provider: Optional[_Provider] = ...,
\r
72 ) -> Mapping[str, str]: ...
\r
75 retry_on_404: bool = ...,
\r
76 num_retries: int = ...,
\r
77 timeout: Optional[int] = ...,
\r
80 class LazyLoadMetadata(Dict[_KT, _VT]):
\r
85 timeout: Optional[int] = ...,
\r
88 def get_instance_metadata(
\r
92 timeout: Optional[int] = ...,
\r
93 num_retries: int = ...,
\r
94 ) -> Optional[LazyLoadMetadata]: ...
\r
95 def get_instance_identity(
\r
98 timeout: Optional[int] = ...,
\r
99 num_retries: int = ...,
\r
100 ) -> Optional[Mapping[str, Any]]: ...
\r
101 def get_instance_userdata(
\r
102 version: str = ...,
\r
103 sep: Optional[str] = ...,
\r
105 timeout: Optional[int] = ...,
\r
106 num_retries: int = ...,
\r
107 ) -> Mapping[str, str]: ...
\r
# Module-level constants; the stub declares only their types, not their values.
# The three format names are strings; LOCALE_LOCK uses the _LockType alias,
# which is currently Any.
109 ISO8601 = ... # type: str
\r
110 ISO8601_MS = ... # type: str
\r
111 RFC1123 = ... # type: str
\r
112 LOCALE_LOCK = ... # type: _LockType
\r
# Stub signature: `name` is a string or a 2-tuple of strings; the call returns a
# context manager whose entered value is a str.
114 def setlocale(name: Union[str, Tuple[str, str]]) -> ContextManager[str]: ...
\r
# Stub signature: `ts` is an optional time.struct_time (it has a default, so it
# may be omitted); returns a str.
115 def get_ts(ts: Optional[time.struct_time] = ...) -> str: ...
\r
# Stub signature: converts a string `ts` into a datetime.datetime.
# NOTE(review): accepted timestamp formats are not visible in the stub.
116 def parse_ts(ts: str) -> datetime.datetime: ...
\r
# Stub signature: takes a module name and an optional class name; returns a
# class object, or None (the return type is Optional).
117 def find_class(module_name: str, class_name: Optional[str] = ...) -> Optional[Type[Any]]: ...
\r
# Stub signature: four required string arguments (username, password, dme_id,
# ip_address); returns a str.
118 def update_dme(username: str, password: str, dme_id: str, ip_address: str) -> str: ...
\r
121 file: Optional[IO[str]] = ...,
\r
122 username: Optional[str] = ...,
\r
123 password: Optional[str] = ...,
\r
124 ) -> Optional[IO[str]]: ...
\r
126 class ShellCommand:
\r
127 exit_code = ... # type: int
\r
128 command = ... # type: subprocess._CMD
\r
129 log_fp = ... # type: _StringIO
\r
130 wait = ... # type: bool
\r
131 fail_fast = ... # type: bool
\r
135 command: subprocess._CMD,
\r
137 fail_fast: bool = ...,
\r
138 cwd: Optional[subprocess._TXT] = ...,
\r
141 process = ... # type: subprocess.Popen
\r
# Stub signature: optional `cwd` argument; returns an int or None.
# NOTE(review): `cwd` is typed Optional[subprocess._CMD] here, while the `cwd`
# parameter a few lines above is typed Optional[subprocess._TXT] — confirm which
# is intended.
143 def run(self, cwd: Optional[subprocess._CMD] = ...) -> Optional[int]: ...
\r
# Stub signature: takes one unannotated `value` argument and returns None.
144 def setReadOnly(self, value) -> None: ...
\r
# Stub signature: no arguments besides self; returns an int or None, the same
# type the `status` attribute is declared with below.
145 def getStatus(self) -> Optional[int]: ...
\r
147 status = ... # type: Optional[int]
\r
# Stub signature: no arguments besides self; returns a str, the same type the
# `output` attribute is declared with below.
149 def getOutput(self) -> str: ...
\r
151 output = ... # type: str
\r
153 class AuthSMTPHandler(logging.handlers.SMTPHandler):
\r
154 username = ... # type: str
\r
155 password = ... # type: str
\r
162 toaddrs: Sequence[str],
\r
166 class LRUCache(Dict[_KT, _VT]):
\r
168 previous = ... # type: Optional[LRUCache._Item]
\r
169 next = ... # type: Optional[LRUCache._Item]
\r
# Constructor taking an unannotated key and value.
# NOTE(review): presumably belongs to the nested LRUCache._Item class referenced
# by the surrounding attributes; its class header is not visible here — confirm.
172 def __init__(self, key, value) -> None: ...
\r
174 _dict = ... # type: Dict[_KT, LRUCache._Item]
\r
175 capacity = ... # type: int
\r
176 head = ... # type: Optional[LRUCache._Item]
\r
177 tail = ... # type: Optional[LRUCache._Item]
\r
# Constructor taking an integer `capacity`, matching the `capacity` attribute
# (type int) declared just above.
179 def __init__(self, capacity: int) -> None: ...
\r
182 # This exists to work around Password.str's name shadowing the str type
\r
186 hashfunc = ... # type: Callable[[bytes], _HashType]
\r
187 str = ... # type: Optional[_str]
\r
191 str: Optional[_str] = ...,
\r
192 hashfunc: Optional[Callable[[bytes], _HashType]] = ...,
\r
# Stub signature: accepts either bytes or text (`_str`, the alias used in this
# class instead of `str`) and returns None.
194 def set(self, value: Union[bytes, _str]) -> None: ...
\r
# Equality comparison against any object; returns a bool.
195 def __eq__(self, other: Any) -> bool: ...
\r
# Length protocol method; returns an int.
196 def __len__(self) -> int: ...
\r
200 body: Optional[str] = ...,
\r
201 html_body: Optional[Union[Sequence[str], str]] = ...,
\r
202 to_string: Optional[str] = ...,
\r
203 attachments: Optional[Iterable[_Message]] = ...,
\r
204 append_instance_id: bool = ...,
\r
# Stub signature: takes a str and returns bytes.
206 def get_utf8_value(value: str) -> bytes: ...
\r
# Stub signature: accepts any value and returns a list.
# NOTE(review): the bare `List` return type is implicitly List[Any]; spelling it
# out would satisfy checkers that disallow unparameterized generics.
207 def mklist(value: Any) -> List: ...
\r
# Stub signature: takes a str `name` and returns a str.
208 def pythonize_name(name: str) -> str: ...
\r
209 def write_mime_multipart(
\r
210 content: List[Tuple[str, str]],
\r
211 compress: bool = ...,
\r
212 deftype: str = ...,
\r
213 delimiter: str = ...,
\r
# Stub signature: takes string content and a required string `deftype` (no
# default here); returns a str.
215 def guess_mime_type(content: str, deftype: str) -> str: ...
\r
218 buf_size: int = ...,
\r
219 size: Optional[int] = ...,
\r
220 ) -> Tuple[str, str, int]: ...
\r
223 buf_size: int = ...,
\r
224 size: Optional[int] = ...,
\r
225 hash_algorithm: Any = ...,
\r
226 ) -> Tuple[str, str, int]: ...
\r
# Stub signature: takes a header name and a mapping of header names to optional
# string values; returns a list of strings.
227 def find_matching_headers(name: str, headers: Mapping[str, Optional[str]]) -> List[str]: ...
\r
# Stub signature: same inputs as find_matching_headers (a header name and a
# mapping with optional string values); returns a single str.
228 def merge_headers_by_name(name: str, headers: Mapping[str, Optional[str]]) -> str: ...
\r
231 def handle_request_data(
\r
233 request: boto.connection.HTTPRequest,
\r
234 response: boto.connection.HTTPResponse,
\r
# Stub signature: takes a hostname string and returns a bool.
238 def host_is_ipv6(hostname: str) -> bool: ...
\r
# Stub signature: takes a hostname string and returns a str.
239 def parse_host(hostname: str) -> str: ...
\r