--- /dev/null
+import os
+import os.path
+import io
+import re
+import sys
+import json
+import traceback
+import platform
+
+jediPreview = False
+
+
+class RedirectStdout(object):
+ def __init__(self, new_stdout=None):
+ """If stdout is None, redirect to /dev/null"""
+ self._new_stdout = new_stdout or open(os.devnull, "w")
+
+ def __enter__(self):
+ sys.stdout.flush()
+ self.oldstdout_fno = os.dup(sys.stdout.fileno())
+ os.dup2(self._new_stdout.fileno(), 1)
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self._new_stdout.flush()
+ os.dup2(self.oldstdout_fno, 1)
+ os.close(self.oldstdout_fno)
+
+
+class JediCompletion(object):
+ basic_types = {
+ "module": "import",
+ "instance": "variable",
+ "statement": "value",
+ "param": "variable",
+ }
+
+ def __init__(self):
+ self.default_sys_path = sys.path
+ self.environment = jedi.api.environment.create_environment(
+ sys.executable, safe=False
+ )
+ self._input = io.open(sys.stdin.fileno(), encoding="utf-8")
+ if (os.path.sep == "/") and (platform.uname()[2].find("Microsoft") > -1):
+ # WSL; does not support UNC paths
+ self.drive_mount = "/mnt/"
+ elif sys.platform == "cygwin":
+ # cygwin
+ self.drive_mount = "/cygdrive/"
+ else:
+ # Do no normalization, e.g. Windows build of Python.
+ # Could add additional test: ((os.path.sep == '/') and os.path.isdir('/mnt/c'))
+ # However, this may have more false positives trying to identify Windows/*nix hybrids
+ self.drive_mount = ""
+
+ def _get_definition_type(self, definition):
+ # if definition.type not in ['import', 'keyword'] and is_built_in():
+ # return 'builtin'
+ try:
+ if definition.type in ["statement"] and definition.name.isupper():
+ return "constant"
+ return self.basic_types.get(definition.type, definition.type)
+ except Exception:
+ return "builtin"
+
+ def _additional_info(self, completion):
+ """Provide additional information about the completion object."""
+ if not hasattr(completion, "_definition") or completion._definition is None:
+ return ""
+ if completion.type == "statement":
+ nodes_to_display = ["InstanceElement", "String", "Node", "Lambda", "Number"]
+ return "".join(
+ c.get_code()
+ for c in completion._definition.children
+ if type(c).__name__ in nodes_to_display
+ ).replace("\n", "")
+ return ""
+
+ @classmethod
+ def _get_top_level_module(cls, path):
+ """Recursively walk through directories looking for top level module.
+
+ Jedi will use current filepath to look for another modules at same
+ path, but it will not be able to see modules **above**, so our goal
+ is to find the higher python module available from filepath.
+ """
+ _path, _ = os.path.split(path)
+ if os.path.isfile(os.path.join(_path, "__init__.py")):
+ return cls._get_top_level_module(_path)
+ return path
+
+ def _generate_signature(self, completion):
+ """Generate signature with function arguments.
+ """
+ if completion.type in ["module"] or not hasattr(completion, "params"):
+ return ""
+ return "%s(%s)" % (
+ completion.name,
+ ", ".join(p.description[6:] for p in completion.params if p),
+ )
+
+ def _get_call_signatures(self, script, line, column):
+ """Extract call signatures from jedi.api.Script object in failsafe way.
+
+ Returns:
+ Tuple with original signature object, name and value.
+ """
+ _signatures = []
+ try:
+ call_signatures = script.get_signatures(line, column)
+ except KeyError:
+ call_signatures = []
+ except:
+ call_signatures = []
+ for signature in call_signatures:
+ for pos, param in enumerate(signature.params):
+ if not param.name:
+ continue
+
+ name = self._get_param_name(param)
+ if param.name == "self" and pos == 0:
+ continue
+ if name.startswith("*"):
+ continue
+
+ value = self._get_param_value(param)
+ _signatures.append((signature, name, value))
+ return _signatures
+
+ def _get_param_name(self, p):
+ if p.name.startswith("param "):
+ return p.name[6:] # drop leading 'param '
+ return p.name
+
+ def _get_param_value(self, p):
+ pair = p.description.split("=")
+ if len(pair) > 1:
+ return pair[1]
+ return None
+
+ def _get_call_signatures_with_args(self, script, line, column):
+ """Extract call signatures from jedi.api.Script object in failsafe way.
+
+ Returns:
+ Array with dictionary
+ """
+ _signatures = []
+ try:
+ call_signatures = script.get_signatures(line, column)
+ except KeyError:
+ call_signatures = []
+ for signature in call_signatures:
+ sig = {
+ "name": "",
+ "description": "",
+ "docstring": "",
+ "paramindex": 0,
+ "params": [],
+ "bracketstart": [],
+ }
+ sig["description"] = signature.description
+ try:
+ sig["docstring"] = signature.docstring()
+ sig["raw_docstring"] = signature.docstring(raw=True)
+ except Exception:
+ sig["docstring"] = ""
+ sig["raw_docstring"] = ""
+
+ sig["name"] = signature.name
+ sig["paramindex"] = signature.index
+ sig["bracketstart"].append(signature.index)
+
+ _signatures.append(sig)
+ for pos, param in enumerate(signature.params):
+ if not param.name:
+ continue
+
+ name = self._get_param_name(param)
+ if param.name == "self" and pos == 0:
+ continue
+
+ value = self._get_param_value(param)
+ paramDocstring = ""
+ try:
+ paramDocstring = param.docstring()
+ except Exception:
+ paramDocstring = ""
+
+ sig["params"].append(
+ {
+ "name": name,
+ "value": value,
+ "docstring": paramDocstring,
+ "description": param.description,
+ }
+ )
+ return _signatures
+
+ def _serialize_completions(self, script, line, column, identifier=None, prefix=""):
+ """Serialize response to be read from VSCode.
+
+ Args:
+ script: Instance of jedi.api.Script object.
+ identifier: Unique completion identifier to pass back to VSCode.
+ prefix: String with prefix to filter function arguments.
+ Used only when fuzzy matcher turned off.
+
+ Returns:
+ Serialized string to send to VSCode.
+ """
+ _completions = []
+
+ for signature, name, value in self._get_call_signatures(script, line, column):
+ if not self.fuzzy_matcher and not name.lower().startswith(prefix.lower()):
+ continue
+ _completion = {
+ "type": "property",
+ "raw_type": "",
+ "rightLabel": self._additional_info(signature),
+ }
+ _completion["description"] = ""
+ _completion["raw_docstring"] = ""
+
+ # we pass 'text' here only for fuzzy matcher
+ if value:
+ _completion["snippet"] = "%s=${1:%s}$0" % (name, value)
+ _completion["text"] = "%s=" % (name)
+ else:
+ _completion["snippet"] = "%s=$1$0" % name
+ _completion["text"] = name
+ _completion["displayText"] = name
+ _completions.append(_completion)
+
+ try:
+ completions = script.complete(line, column)
+ except KeyError:
+ completions = []
+ except:
+ completions = []
+ for completion in completions:
+ try:
+ _completion = {
+ "text": completion.name,
+ "type": self._get_definition_type(completion),
+ "raw_type": completion.type,
+ "rightLabel": self._additional_info(completion),
+ }
+ except Exception:
+ continue
+
+ for c in _completions:
+ if c["text"] == _completion["text"]:
+ c["type"] = _completion["type"]
+ c["raw_type"] = _completion["raw_type"]
+
+ if any(
+ [c["text"].split("=")[0] == _completion["text"] for c in _completions]
+ ):
+ # ignore function arguments we already have
+ continue
+ _completions.append(_completion)
+ return json.dumps({"id": identifier, "results": _completions})
+
+ def _serialize_methods(self, script, line, column, identifier=None, prefix=""):
+ _methods = []
+ try:
+ completions = script.complete(line, column)
+ except KeyError:
+ return []
+
+ for completion in completions:
+ if completion.name == "__autocomplete_python":
+ instance = completion.parent().name
+ break
+ else:
+ instance = "self.__class__"
+
+ for completion in completions:
+ params = []
+ if hasattr(completion, "params"):
+ params = [p.description for p in completion.params if p]
+ if completion.parent().type == "class":
+ _methods.append(
+ {
+ "parent": completion.parent().name,
+ "instance": instance,
+ "name": completion.name,
+ "params": params,
+ "moduleName": completion.module_name,
+ "fileName": completion.module_path,
+ "line": completion.line,
+ "column": completion.column,
+ }
+ )
+ return json.dumps({"id": identifier, "results": _methods})
+
+ def _serialize_arguments(self, script, line, column, identifier=None):
+ """Serialize response to be read from VSCode.
+
+ Args:
+ script: Instance of jedi.api.Script object.
+ identifier: Unique completion identifier to pass back to VSCode.
+
+ Returns:
+ Serialized string to send to VSCode.
+ """
+ return json.dumps(
+ {
+ "id": identifier,
+ "results": self._get_call_signatures_with_args(script, line, column),
+ }
+ )
+
+ def _top_definition(self, definition):
+ for d in definition.goto_assignments():
+ if d == definition:
+ continue
+ if d.type == "import":
+ return self._top_definition(d)
+ else:
+ return d
+ return definition
+
+ def _extract_range_jedi_0_11_1(self, definition):
+ from parso.utils import split_lines
+
+ # get the scope range
+ try:
+ if definition.type in ["class", "function"]:
+ tree_name = definition._name.tree_name
+ scope = tree_name.get_definition()
+ start_line = scope.start_pos[0] - 1
+ start_column = scope.start_pos[1]
+ # get the lines
+ code = scope.get_code(include_prefix=False)
+ lines = split_lines(code)
+ # trim the lines
+ lines = "\n".join(lines).rstrip().split("\n")
+ end_line = start_line + len(lines) - 1
+ end_column = len(lines[-1]) - 1
+ else:
+ symbol = definition._name.tree_name
+ start_line = symbol.start_pos[0] - 1
+ start_column = symbol.start_pos[1]
+ end_line = symbol.end_pos[0] - 1
+ end_column = symbol.end_pos[1]
+ return {
+ "start_line": start_line,
+ "start_column": start_column,
+ "end_line": end_line,
+ "end_column": end_column,
+ }
+ except Exception as e:
+ return {
+ "start_line": definition.line - 1,
+ "start_column": definition.column,
+ "end_line": definition.line - 1,
+ "end_column": definition.column,
+ }
+
+ def _extract_range(self, definition):
+ """Provides the definition range of a given definition
+
+ For regular symbols it returns the start and end location of the
+ characters making up the symbol.
+
+ For scoped containers it will return the entire definition of the
+ scope.
+
+ The scope that jedi provides ends with the first character of the next
+ scope so it's not ideal. For vscode we need the scope to end with the
+ last character of actual code. That's why we extract the lines that
+ make up our scope and trim the trailing whitespace.
+ """
+ return self._extract_range_jedi_0_11_1(definition)
+
+ def _get_definitionsx(self, definitions, identifier=None, ignoreNoModulePath=False):
+ """Serialize response to be read from VSCode.
+
+ Args:
+ definitions: List of jedi.api.classes.Definition objects.
+ identifier: Unique completion identifier to pass back to VSCode.
+
+ Returns:
+ Serialized string to send to VSCode.
+ """
+ _definitions = []
+ for definition in definitions:
+ try:
+ if definition.type == "import":
+ definition = self._top_definition(definition)
+ definitionRange = {
+ "start_line": 0,
+ "start_column": 0,
+ "end_line": 0,
+ "end_column": 0,
+ }
+ module_path = ""
+ if hasattr(definition, "module_path") and definition.module_path:
+ module_path = definition.module_path
+ definitionRange = self._extract_range(definition)
+ else:
+ if not ignoreNoModulePath:
+ continue
+ try:
+ parent = definition.parent()
+ container = parent.name if parent.type != "module" else ""
+ except Exception:
+ container = ""
+
+ try:
+ docstring = definition.docstring()
+ rawdocstring = definition.docstring(raw=True)
+ except Exception:
+ docstring = ""
+ rawdocstring = ""
+ _definition = {
+ "text": definition.name,
+ "type": self._get_definition_type(definition),
+ "raw_type": definition.type,
+ "fileName": module_path,
+ "container": container,
+ "range": definitionRange,
+ "description": definition.description,
+ "docstring": docstring,
+ "raw_docstring": rawdocstring,
+ "signature": self._generate_signature(definition),
+ }
+ _definitions.append(_definition)
+ except Exception as e:
+ pass
+ return _definitions
+
+ def _serialize_definitions(self, definitions, identifier=None):
+ """Serialize response to be read from VSCode.
+
+ Args:
+ definitions: List of jedi.api.classes.Definition objects.
+ identifier: Unique completion identifier to pass back to VSCode.
+
+ Returns:
+ Serialized string to send to VSCode.
+ """
+ _definitions = []
+ for definition in definitions:
+ try:
+ if definition.module_path:
+ if definition.type == "import":
+ definition = self._top_definition(definition)
+ if not definition.module_path:
+ continue
+ try:
+ parent = definition.parent()
+ container = parent.name if parent.type != "module" else ""
+ except Exception:
+ container = ""
+
+ try:
+ docstring = definition.docstring()
+ rawdocstring = definition.docstring(raw=True)
+ except Exception:
+ docstring = ""
+ rawdocstring = ""
+ _definition = {
+ "text": definition.name,
+ "type": self._get_definition_type(definition),
+ "raw_type": definition.type,
+ "fileName": definition.module_path,
+ "container": container,
+ "range": self._extract_range(definition),
+ "description": definition.description,
+ "docstring": docstring,
+ "raw_docstring": rawdocstring,
+ }
+ _definitions.append(_definition)
+ except Exception as e:
+ pass
+ return json.dumps({"id": identifier, "results": _definitions})
+
+ def _serialize_tooltip(self, definitions, identifier=None):
+ _definitions = []
+ for definition in definitions:
+ signature = definition.name
+ description = None
+ if definition.type in ["class", "function"]:
+ signature = self._generate_signature(definition)
+ try:
+ description = definition.docstring(raw=True).strip()
+ except Exception:
+ description = ""
+ if not description and not hasattr(definition, "get_line_code"):
+ # jedi returns an empty string for compiled objects
+ description = definition.docstring().strip()
+ if definition.type == "module":
+ signature = definition.full_name
+ try:
+ description = definition.docstring(raw=True).strip()
+ except Exception:
+ description = ""
+ if not description and hasattr(definition, "get_line_code"):
+ # jedi returns an empty string for compiled objects
+ description = definition.docstring().strip()
+ _definition = {
+ "type": self._get_definition_type(definition),
+ "text": definition.name,
+ "description": description,
+ "docstring": description,
+ "signature": signature,
+ }
+ _definitions.append(_definition)
+ return json.dumps({"id": identifier, "results": _definitions})
+
+ def _serialize_usages(self, usages, identifier=None):
+ _usages = []
+ for usage in usages:
+ _usages.append(
+ {
+ "name": usage.name,
+ "moduleName": usage.module_name,
+ "fileName": usage.module_path,
+ "line": usage.line,
+ "column": usage.column,
+ }
+ )
+ return json.dumps({"id": identifier, "results": _usages})
+
+ def _deserialize(self, request):
+ """Deserialize request from VSCode.
+
+ Args:
+ request: String with raw request from VSCode.
+
+ Returns:
+ Python dictionary with request data.
+ """
+ return json.loads(request)
+
+ def _set_request_config(self, config):
+ """Sets config values for current request.
+
+ This includes sys.path modifications which is getting restored to
+ default value on each request so each project should be isolated
+ from each other.
+
+ Args:
+ config: Dictionary with config values.
+ """
+ sys.path = self.default_sys_path
+ self.use_snippets = config.get("useSnippets")
+ self.show_doc_strings = config.get("showDescriptions", True)
+ self.fuzzy_matcher = config.get("fuzzyMatcher", False)
+ jedi.settings.case_insensitive_completion = config.get(
+ "caseInsensitiveCompletion", True
+ )
+ for path in config.get("extraPaths", []):
+ if path and path not in sys.path:
+ sys.path.insert(0, path)
+
+ def _normalize_request_path(self, request):
+ """Normalize any Windows paths received by a *nix build of
+ Python. Does not alter the reverse os.path.sep=='\\',
+ i.e. *nix paths received by a Windows build of Python.
+ """
+ if "path" in request:
+ if not self.drive_mount:
+ return
+ newPath = request["path"].replace("\\", "/")
+ if newPath[0:1] == "/":
+ # is absolute path with no drive letter
+ request["path"] = newPath
+ elif newPath[1:2] == ":":
+ # is path with drive letter, only absolute can be mapped
+ request["path"] = self.drive_mount + newPath[0:1].lower() + newPath[2:]
+ else:
+ # is relative path
+ request["path"] = newPath
+
+ def _process_request(self, request):
+ """Accept serialized request from VSCode and write response.
+ """
+ request = self._deserialize(request)
+
+ self._set_request_config(request.get("config", {}))
+
+ self._normalize_request_path(request)
+ path = self._get_top_level_module(request.get("path", ""))
+ if len(path) > 0 and path not in sys.path:
+ sys.path.insert(0, path)
+ lookup = request.get("lookup", "completions")
+
+ if lookup == "names":
+ return self._serialize_definitions(
+ jedi.Script(
+ code=request.get("source", None),
+ path=request.get("path", ""),
+ project=jedi.get_default_project(os.path.dirname(path)),
+ environment=self.environment,
+ ).get_names(all_scopes=True),
+ request["id"],
+ )
+
+ line = request["line"] + 1
+ column = request["column"]
+ script = jedi.Script(
+ code=request.get("source", None),
+ path=request.get("path", ""),
+ project=jedi.get_default_project(os.path.dirname(path)),
+ environment=self.environment,
+ )
+
+ if lookup == "definitions":
+ defs = self._get_definitionsx(
+ script.goto(line, column, follow_imports=True), request["id"]
+ )
+ return json.dumps({"id": request["id"], "results": defs})
+ if lookup == "tooltip":
+ if jediPreview:
+ defs = []
+ try:
+ defs = self._get_definitionsx(
+ script.infer(line, column), request["id"], True
+ )
+ except:
+ pass
+ try:
+ if len(defs) == 0:
+ defs = self._get_definitionsx(
+ script.goto(line, column), request["id"], True
+ )
+ except:
+ pass
+ return json.dumps({"id": request["id"], "results": defs})
+ else:
+ try:
+ return self._serialize_tooltip(
+ script.infer(line, column), request["id"]
+ )
+ except:
+ return json.dumps({"id": request["id"], "results": []})
+ elif lookup == "arguments":
+ return self._serialize_arguments(script, line, column, request["id"])
+ elif lookup == "usages":
+ return self._serialize_usages(
+ script.get_references(line, column), request["id"]
+ )
+ elif lookup == "methods":
+ return self._serialize_methods(
+ script, line, column, request["id"], request.get("prefix", "")
+ )
+ else:
+ return self._serialize_completions(
+ script, line, column, request["id"], request.get("prefix", "")
+ )
+
+ def _write_response(self, response):
+ sys.stdout.write(response + "\n")
+ sys.stdout.flush()
+
+ def watch(self):
+ while True:
+ try:
+ rq = self._input.readline()
+ if len(rq) == 0:
+ # Reached EOF - indication our parent process is gone.
+ sys.stderr.write(
+ "Received EOF from the standard input,exiting" + "\n"
+ )
+ sys.stderr.flush()
+ return
+ with RedirectStdout():
+ response = self._process_request(rq)
+ self._write_response(response)
+
+ except Exception:
+ sys.stderr.write(traceback.format_exc() + "\n")
+ sys.stderr.flush()
+
+
+if __name__ == "__main__":
+ cachePrefix = "v"
+ modulesToLoad = ""
+ if len(sys.argv) > 2 and sys.argv[1] == "custom":
+ jediPath = sys.argv[2]
+ jediPreview = True
+ cachePrefix = "custom_v"
+ if len(sys.argv) > 3:
+ modulesToLoad = sys.argv[3]
+ else:
+ # release
+ jediPath = os.path.join(os.path.dirname(__file__), "lib", "python")
+ if len(sys.argv) > 1:
+ modulesToLoad = sys.argv[1]
+
+ sys.path.insert(0, jediPath)
+ import jedi
+
+ digits = jedi.__version__.split(".")
+ if int(digits[0]) == 0 and int(digits[1]) < 17:
+ raise RuntimeError("Jedi version %s too old, requires >= 0.17.0" % (jedi.__version__))
+ else:
+ if jediPreview:
+ jedi.settings.cache_directory = os.path.join(
+ jedi.settings.cache_directory,
+ cachePrefix + jedi.__version__.replace(".", ""),
+ )
+ # remove jedi from path after we import it so it will not be completed
+ sys.path.pop(0)
+ if len(modulesToLoad) > 0:
+ jedi.preload_module(*modulesToLoad.split(","))
+ JediCompletion().watch()