| """CodeImportsAnalyzer uses the ast module from Python's standard library | |
| to get what modules are imported in given python files, then uses networkx to generate imports graph | |
| """ | |
| import ast | |
| import asyncio | |
| import aiohttp | |
| import pybase64 | |
| from .graph_analyzer import GraphAnalyzer | |
def construct_fetch_program_text_api_url(api_url):
    import os

    # embed credentials in the URL to raise the GitHub API rate limit:
    # https://docs.github.com/en/rest/overview/resources-in-the-rest-api#rate-limiting
    USER = os.environ.get("USER", "")
    PERSONAL_ACCESS_TOKEN = os.environ.get("PERSONAL_ACCESS_TOKEN", "")
    if USER and PERSONAL_ACCESS_TOKEN:
        protocol, api_url_components = api_url.split("://")
        new_api_url_components = f"{USER}:{PERSONAL_ACCESS_TOKEN}@{api_url_components}"
        return f"{protocol}://{new_api_url_components}"
    else:
        return api_url
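
# Illustration (hypothetical values): with USER=alice and
# PERSONAL_ACCESS_TOKEN=ghp_xxx set in the environment,
#   construct_fetch_program_text_api_url("https://api.github.com/repos/o/r/git/blobs/<sha>")
# yields "https://alice:ghp_xxx@api.github.com/repos/o/r/git/blobs/<sha>";
# without credentials the URL is returned unchanged.
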
async def get_program_text(session, python_file):
    # about Retry-After:
    # https://docs.github.com/en/rest/guides/best-practices-for-integrators#dealing-with-secondary-rate-limits
    async with session.get(
        construct_fetch_program_text_api_url(python_file["url"]),
        headers={"Accept": "application/vnd.github.v3+json", "Retry-After": "5"},
    ) as response:
        if response.status == 200:
            data = await response.json()
            if data["encoding"] == "base64":
                return data["content"]
            print(
                f"WARNING: {python_file['path']}'s encoding is {data['encoding']}, not base64"
            )
        else:
            print(
                f"WARNING: fetching {python_file['path']} failed with HTTP {response.status}"
            )
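
# Note: the GitHub v3 blob/contents endpoints respond with JSON of roughly the
# shape {"content": "<base64 text>", "encoding": "base64", ...}, which is why
# the encoding field is checked before the content is returned.
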

class CodeImportsAnalyzer:
    class _NodeVisitor(ast.NodeVisitor):
        """Records every import found in a parsed tree into the most
        recently appended file entry of the shared `imports` list."""

        def __init__(self, imports):
            self.imports = imports

        def visit_Import(self, node):
            # a plain `import a.b` has no source module, so level is -1
            for alias in node.names:
                self.imports[-1]["imports"].append(
                    {"module": None, "name": alias.name, "level": -1}
                )
            self.generic_visit(node)

        def visit_ImportFrom(self, node):
            # `node.level` > 0 marks a relative import such as `from . import x`
            for alias in node.names:
                self.imports[-1]["imports"].append(
                    {"module": node.module, "name": alias.name, "level": node.level}
                )
            self.generic_visit(node)
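
    # Sketch of the records _NodeVisitor produces (illustrative values only):
    #   import os            -> {"module": None, "name": "os", "level": -1}
    #   from foo import bar  -> {"module": "foo", "name": "bar", "level": 0}
    #   from . import baz    -> {"module": None, "name": "baz", "level": 1}
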
    def __init__(self, python_files):
        self.python_imports = []
        self.graph_analyzer = GraphAnalyzer(is_directed=True)
        self.python_files = python_files
        self._node_visitor = CodeImportsAnalyzer._NodeVisitor(self.python_imports)

    async def analyze(self):
        async with aiohttp.ClientSession() as session:
            tasks = [
                asyncio.ensure_future(get_program_text(session, python_file))
                for python_file in self.python_files
            ]
            base64_program_texts = await asyncio.gather(*tasks)
        # append each file's entry right before visiting its tree so that the
        # visitor's `imports[-1]` always points at the file being parsed;
        # appending every entry up front would funnel all imports into the
        # last file's record
        for python_file, base64_program_text in zip(
            self.python_files, base64_program_texts
        ):
            self.python_imports.append(
                {
                    "file_name": python_file["path"].split("/")[-1],
                    "file_path": python_file["path"],
                    "imports": [],
                }
            )
            if base64_program_text:
                program = pybase64.b64decode(base64_program_text)
                tree = ast.parse(program)
                self._node_visitor.visit(tree)

    def generate_imports_graph(self):
        # TODO: think about how to improve the graph generation logic,
        # e.g. build a dictionary of lists first and generate the graph
        # from that
        for python_import in self.python_imports:
            # str.split always returns at least one element, so _nodes is
            # never empty
            _nodes = python_import["file_path"].split("/")
            # generate graph edges from the file path; the node/edge
            # relationship mirrors the file/folder structure
            if len(_nodes) > 1:
                # merge the last two path components into one node to avoid
                # collisions between duplicated file names in different folders
                if len(_nodes) >= 3:
                    _nodes[-2] = _nodes[-2] + "/" + _nodes[-1]
                    del _nodes[-1]
                self.graph_analyzer.add_edges_from_nodes(_nodes)
            else:
                self.graph_analyzer.add_node(_nodes[0])

            # generate graph edges from the modules imported by each file
            if python_import["file_name"] != "__init__.py":
                for _import in python_import["imports"]:
                    if _import["module"] is None:
                        # `import a.b` -> chain a -> b -> <file node>
                        _import_names = _import["name"].split(".")
                    else:
                        # `from a.b import c` -> chain a -> b -> c -> <file node>
                        _import_names = _import["module"].split(".") + [
                            _import["name"]
                        ]
                    _new_nodes = _import_names + [_nodes[-1]]
                    self.graph_analyzer.add_edges_from_nodes(_new_nodes)
        return self.graph_analyzer.graph
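
    # Illustrative example (hypothetical path): for a file
    # "pkg/utils/helpers.py" containing `from foo import bar`, this method
    # adds the path chain pkg -> utils/helpers.py and the import chain
    # foo -> bar -> utils/helpers.py to the graph.
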
    def report(self):
        from pprint import pprint

        pprint(self.python_imports)
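

# Minimal usage sketch (an assumption, not part of the original module): each
# entry in `python_files` must carry the "path" and "url" keys used above,
# e.g. blob items from the GitHub git/trees API; the URL below is hypothetical.
# Because of the relative import at the top, run this via
# `python -m <package>.<this_module>` rather than as a loose script.
if __name__ == "__main__":
    sample_python_files = [
        {
            "path": "pkg/main.py",
            "url": "https://api.github.com/repos/owner/repo/git/blobs/<sha>",
        }
    ]
    analyzer = CodeImportsAnalyzer(sample_python_files)
    asyncio.run(analyzer.analyze())
    imports_graph = analyzer.generate_imports_graph()
    analyzer.report()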