Spaces:
Sleeping
Sleeping
| import traceback | |
| import logging | |
| from typing import Tuple, List | |
| import copy | |
| from pathlib import Path | |
| import json | |
| from collections import OrderedDict | |
| import os | |
| import sys | |
| sys.path.append(os.getcwd()) | |
| from cllm.agents import container | |
| from cllm.agents.builtin import BUILTIN_PLANS, load_builtin_plans | |
| from cllm.agents.container import auto_type | |
| from cllm.agents.base import DataType, NON_FILE_TYPES | |
| from .interpretor import Interpretor | |
| from .planner import Planner | |
| from .responser import generate_response | |
| logger = logging.getLogger(__name__) | |
| class Controller: | |
| def __init__(self, stream=True, interpretor_kwargs={}): | |
| self.stream = stream | |
| self.planner = Planner(self.stream) | |
| self.interpretor = Interpretor(**interpretor_kwargs) | |
| self.SHORTCUT = "**Using builtin shortcut solution.**" | |
| BUILTIN_PLANS.update(load_builtin_plans("builtin_plan.json")) | |
| logger.info(BUILTIN_PLANS) | |
| def plan(self, request: str, state: dict): | |
| logger.info(request) | |
| resource_memory = state.get("resources", {}) | |
| raw_solution = None | |
| # shortcut for builtin plan | |
| for trigger_prompt, _ in BUILTIN_PLANS.items(): | |
| if request == trigger_prompt: | |
| return self.SHORTCUT | |
| # dynamic execution | |
| if raw_solution is None: | |
| raw_solution = self.planner.plan(request, resource_memory) | |
| return raw_solution | |
| def parse_solution_from_stream(self, raw_solution): | |
| return self.planner.parse(raw_solution) | |
| def execute(self, raw_solution: str, state: dict): | |
| resource_memory = state.get("resources") | |
| request = state["request"] | |
| solution = None | |
| if raw_solution == self.SHORTCUT: | |
| for trigger_prompt, builtin_plan in BUILTIN_PLANS.items(): | |
| if request == trigger_prompt: | |
| solution = builtin_plan | |
| solution = self._fill_args(solution, resource_memory) | |
| else: | |
| solution = self.planner.parse(raw_solution) | |
| if not solution: | |
| return None | |
| try: | |
| history_msgs = state.get("history_msgs") | |
| return self.interpretor.interpret(solution, history_msgs) | |
| except Exception as e: | |
| traceback.print_exc() | |
| return None | |
| def reply(self, executed_plan: dict, outputs: list, state: dict): | |
| error_response = [ | |
| auto_type( | |
| "response", | |
| DataType.TEXT, | |
| "Sorry, I cannot understand your request due to an internal error.", | |
| ) | |
| ] | |
| state = copy.deepcopy(state) | |
| if ( | |
| executed_plan is None | |
| or len(executed_plan) == 0 | |
| or outputs is None | |
| or len(outputs) == 0 | |
| ): | |
| return error_response, state | |
| resources = state.get("resources", OrderedDict()) | |
| for o in outputs: | |
| if isinstance(o, container.File): | |
| resources[str(o.filename)] = str(o.rtype) | |
| state["resources"] = resources | |
| response = generate_response(state["request"], executed_plan, outputs) | |
| if len(response) == 0: | |
| return error_response, state | |
| logger.info(response) | |
| return response, state | |
| def run(self, task: str, state: dict) -> Tuple[List, str]: | |
| try: | |
| return self._run(task, state) | |
| except: | |
| traceback.print_exc() | |
| logger.info(traceback.format_exc()) | |
| return [ | |
| auto_type( | |
| "response", | |
| DataType.TEXT, | |
| "Sorry, I cannot understand your request due to an internal error.", | |
| ) | |
| ], "Internal Error" | |
| def _run(self, task: str, state: dict) -> Tuple[List, str]: | |
| logger.info(task) | |
| BUILTIN_PLANS.update(load_builtin_plans("builtin_plan.json")) | |
| logger.info(BUILTIN_PLANS) | |
| resource_memory = state.get("resources", OrderedDict()) | |
| history_msgs = state.get("history_msgs", []) | |
| plan = None | |
| # shortcut for builtin plan | |
| for trigger_prompt, builtin_plan in BUILTIN_PLANS.items(): | |
| if task == trigger_prompt: | |
| plan = builtin_plan | |
| plan = self._fill_args(plan, resource_memory) | |
| # dynamic executation | |
| if plan is None: | |
| plan = self.planner.planning(task, resource_memory) | |
| logger.info(plan) | |
| executed_plan, output_files = self.interpretor.interpret( | |
| plan, resource_memory, history_msgs | |
| ) | |
| logger.info(output_files) | |
| for o in output_files: | |
| if isinstance(o, container.File): | |
| resource_memory[o.filename] = str(o.rtype) | |
| outputs = generate_response(task, executed_plan, output_files) | |
| logger.info(outputs) | |
| return outputs, executed_plan | |
| def _fill_args(self, plan, memory): | |
| plan = copy.deepcopy(plan) | |
| latest_resource = OrderedDict() | |
| for key, val in memory.items(): | |
| latest_resource[val] = key | |
| for actions in plan: | |
| for action in actions: | |
| for key, val in action.inputs.items(): | |
| if "<TOOL-GENERATED>" not in val: | |
| action.inputs[key] = latest_resource.get(val, val) | |
| return plan | |