Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import folder_paths | |
| import json | |
| from server import PromptServer | |
| import glob | |
| from aiohttp import web | |
def get_allowed_dirs():
    """Load the configured directory mapping from ``text_file_dirs.json``.

    The file is looked up in the ``user`` directory that sits next to this
    module's own directory (``<module dir>/../user``).

    Returns:
        The decoded JSON document. Callers in this module treat it as a
        dict mapping a directory name to a path pattern.

    Raises:
        OSError: If the config file cannot be opened.
        json.JSONDecodeError: If the file content is not valid JSON.
    """
    user_dir = os.path.abspath(os.path.join(__file__, "../../user"))
    config_path = os.path.join(user_dir, "text_file_dirs.json")
    with open(config_path, "r") as handle:
        raw = handle.read()
    return json.loads(raw)
def get_valid_dirs():
    """Return the names of all configured directories.

    Returns:
        The keys view of the mapping produced by ``get_allowed_dirs``.
    """
    allowed = get_allowed_dirs()
    return allowed.keys()
def get_dir_from_name(name):
    """Resolve a configured directory name to its concrete path pattern.

    The pattern stored in the config may contain the placeholders
    ``$input``, ``$output`` and ``$temp``; each one is replaced with the
    corresponding directory reported by ``folder_paths``.

    Args:
        name: Key of the directory in the allowed-dirs config.

    Returns:
        The path pattern with all placeholders substituted.

    Raises:
        KeyError: If ``name`` is not present in the config.
    """
    allowed = get_allowed_dirs()
    if name not in allowed:
        raise KeyError(name + " dir not found")

    # Substitutions are applied one after another, in this exact order.
    substitutions = (
        ("$input", folder_paths.get_input_directory),
        ("$output", folder_paths.get_output_directory),
        ("$temp", folder_paths.get_temp_directory),
    )
    resolved = allowed[name]
    for token, lookup in substitutions:
        resolved = resolved.replace(token, lookup())
    return resolved
def is_child_dir(parent_path, child_path):
    """Tell whether ``child_path`` lies inside (or equals) ``parent_path``.

    Both paths are made absolute and normalised first, so ``..`` segments
    cannot be used to escape the parent. Symbolic links are not resolved.

    Args:
        parent_path: Directory that must contain the child.
        child_path: Path to check.

    Returns:
        True if the common path of both equals the parent, otherwise False.
        Paths that share no common path at all (e.g. different drives on
        Windows) are reported as not being children.
    """
    parent = os.path.abspath(parent_path)
    child = os.path.abspath(child_path)
    try:
        shared = os.path.commonpath([parent, child])
    except ValueError:
        # commonpath raises when the paths cannot be compared, e.g. when
        # they are on different drives; such a path is never a child.
        return False
    return os.path.commonpath([parent]) == shared
def get_real_path(dir):
    """Return the concrete base directory of a glob-style path pattern.

    A recursive marker (``/**/``) is collapsed to a single separator, the
    result is made absolute, and the final path component (the file
    pattern) is dropped.

    Args:
        dir: Path pattern such as ``/some/dir/**/*.txt``.

    Returns:
        The absolute directory part of the pattern.
    """
    without_recursion = dir.replace("/**/", "/")
    head, _tail = os.path.split(os.path.abspath(without_recursion))
    return head
async def get_files(request):
    """List the files matching a configured directory's pattern as JSON.

    The directory name is taken from the ``name`` entry of the request's
    match info. Matches are reported relative to the pattern's base
    directory; when nothing matches, the single placeholder entry
    ``"[none]"`` is returned instead.

    Args:
        request: Incoming request whose ``match_info`` contains ``name``.

    Returns:
        A JSON response holding the list of relative file paths.
    """
    # NOTE(review): no route registration is visible on this handler here,
    # although the nodes fetch "/pysssss/text-file/{...}" - confirm it is
    # registered with the server.
    pattern = get_dir_from_name(request.match_info["name"])

    # glob returns paths that include the base directory, so they are made
    # relative to it afterwards.
    base = get_real_path(pattern)
    matches = glob.glob(pattern, recursive="/**/" in pattern)
    listing = [os.path.relpath(match, base) for match in matches]
    if not listing:
        listing = ["[none]"]
    return web.json_response(listing)
def get_file(root_dir, file):
    """Build the full path of ``file`` inside a configured directory.

    The base directory is created when it does not exist yet, including
    any missing intermediate directories.

    Args:
        root_dir: Name of a configured directory (see ``get_dir_from_name``).
        file: File name or relative path inside that directory.

    Returns:
        The joined path of the base directory and ``file``.

    Raises:
        ValueError: If ``file`` is empty, blank, or the ``"[none]"``
            placeholder.
        KeyError: If ``root_dir`` is not a configured directory name.
        ReferenceError: If the resulting path would leave the base
            directory (e.g. via ``..`` or an absolute path).
    """
    if file == "[none]" or not file or not file.strip():
        raise ValueError("No file")

    base_dir = get_real_path(get_dir_from_name(root_dir))
    if not os.path.exists(base_dir):
        # makedirs also creates missing parents, and exist_ok avoids a
        # failure when the directory appears between the check and here.
        os.makedirs(base_dir, exist_ok=True)

    full_path = os.path.join(base_dir, file)
    # Reject anything that escapes the configured directory.
    if not is_child_dir(base_dir, full_path):
        raise ReferenceError()
    return full_path
class TextFileNode:
    """Shared base for the nodes that read or write a text file."""

    # Single string output: the content of the file.
    RETURN_TYPES = ("STRING",)
    CATEGORY = "utils"

    @classmethod
    def VALIDATE_INPUTS(cls, root_dir, file, **kwargs):
        """Validate the directory/file selection of a node.

        Declared as a classmethod so the hook can be invoked on the node
        class with the inputs passed as keyword arguments.

        Args:
            root_dir: Name of a configured directory.
            file: Selected file name; empty, blank or ``"[none]"`` means
                nothing is selected and is accepted as-is.
            **kwargs: Remaining node inputs (ignored).

        Returns:
            True when the selection is acceptable.

        Raises:
            Whatever ``get_file`` raises for an invalid selection.
        """
        if file == "[none]" or not file or not file.strip():
            return True
        # Only called for its checks; the resolved path is not needed.
        get_file(root_dir, file)
        return True

    def load_text(self, **kwargs):
        """Read the selected file and return its content.

        Args:
            **kwargs: Node inputs; ``root_dir`` and ``file`` are required.

        Returns:
            A one-element tuple holding the file content.
        """
        # The resolved path is remembered on the instance.
        self.file = get_file(kwargs["root_dir"], kwargs["file"])
        with open(self.file, "r") as f:
            return (f.read(),)
class LoadText(TextFileNode):
    """Node that loads the content of a text file from a configured directory."""

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        """Return the modification time of the selected file.

        The path is resolved from the node inputs rather than from state
        stored by ``load_text``, so the check also works before the node
        has run and when the hook is invoked on the class.

        Args:
            **kwargs: Node inputs; ``root_dir`` and ``file`` are required.

        Returns:
            The file's modification time as reported by ``os.path.getmtime``.

        Raises:
            OSError: If the file does not exist or is inaccessible.
            Whatever ``get_file`` raises for an invalid selection.
        """
        return os.path.getmtime(get_file(kwargs["root_dir"], kwargs["file"]))

    @classmethod
    def INPUT_TYPES(s):
        """Describe the node inputs: a directory combo and a file combo.

        The ``file`` combo starts with only the ``"[none]"`` placeholder.
        Its ``pysssss.binding`` entry describes, for changes of
        ``root_dir``: disable the widget, fetch the file list for the
        selected directory, store the result as the combo's values,
        validate the combo and enable the widget again.
        """
        # NOTE(review): the binding description is presumably interpreted
        # by the pysssss frontend extension - confirm.
        return {
            "required": {
                "root_dir": (list(get_valid_dirs()), {}),
                "file": (["[none]"], {
                    "pysssss.binding": [{
                        "source": "root_dir",
                        "callback": [{
                            "type": "set",
                            "target": "$this.disabled",
                            "value": True
                        }, {
                            "type": "fetch",
                            "url": "/pysssss/text-file/{$source.value}",
                            "then": [{
                                "type": "set",
                                "target": "$this.options.values",
                                "value": "$result"
                            }, {
                                "type": "validate-combo"
                            }, {
                                "type": "set",
                                "target": "$this.disabled",
                                "value": False
                            }]
                        }],
                    }]
                })
            },
        }

    # Name of the method that implements the node.
    FUNCTION = "load_text"
class SaveText(TextFileNode):
    """Node that writes text to a file in a configured directory."""

    OUTPUT_NODE = True

    @classmethod
    def IS_CHANGED(cls, **kwargs):
        """Always report the node as changed.

        NaN never compares equal to itself, so a comparison with any
        previously returned value fails.
        """
        return float("nan")

    @classmethod
    def INPUT_TYPES(s):
        """Describe the node inputs.

        Inputs are the target directory and file name, the write mode
        (``append`` / ``overwrite`` / ``new only``), a toggle for inserting
        a new line before appended text, and the text itself. The
        ``pysssss.binding`` entry on ``insert`` describes enabling the
        toggle only while the mode is ``append``.
        """
        # NOTE(review): the binding description is presumably interpreted
        # by the pysssss frontend extension - confirm.
        return {
            "required": {
                "root_dir": (list(get_valid_dirs()), {}),
                "file": ("STRING", {"default": "file.txt"}),
                "append": (["append", "overwrite", "new only"], {}),
                "insert": ("BOOLEAN", {
                    "default": True, "label_on": "new line", "label_off": "none",
                    "pysssss.binding": [{
                        "source": "append",
                        "callback": [{
                            "type": "if",
                            "condition": [{
                                "left": "$source.value",
                                "op": "eq",
                                "right": '"append"'
                            }],
                            "true": [{
                                "type": "set",
                                "target": "$this.disabled",
                                "value": False
                            }],
                            "false": [{
                                "type": "set",
                                "target": "$this.disabled",
                                "value": True
                            }],
                        }]
                    }]
                }),
                "text": ("STRING", {"forceInput": True, "multiline": True})
            },
        }

    # Name of the method that implements the node.
    FUNCTION = "write_text"

    def write_text(self, **kwargs):
        """Write the text to the target file and return the file content.

        Args:
            **kwargs: Node inputs; ``root_dir``, ``file``, ``append``,
                ``insert`` and ``text`` are required.

        Returns:
            A one-element tuple with the file content after writing, as
            read back by ``load_text``.

        Raises:
            FileExistsError: If the mode is ``new only`` and the file
                already exists.
        """
        self.file = get_file(kwargs["root_dir"], kwargs["file"])
        mode = kwargs["append"]
        if mode == "new only" and os.path.exists(self.file):
            raise FileExistsError(
                self.file + " already exists and 'new only' is selected.")

        with open(self.file, "a+" if mode == "append" else "w") as f:
            # A non-zero position means existing content is being appended
            # to; only then is the separating new line written.
            has_content = f.tell() != 0
            if has_content and kwargs["insert"]:
                f.write("\n")
            f.write(kwargs["text"])
        return super().load_text(**kwargs)
# Node identifier -> implementing class.
NODE_CLASS_MAPPINGS = {
    "LoadText|pysssss": LoadText,
    "SaveText|pysssss": SaveText,
}
# Node identifier -> human-readable title.
NODE_DISPLAY_NAME_MAPPINGS = {
    "LoadText|pysssss": "Load Text 🐍",
    "SaveText|pysssss": "Save Text 🐍",
}