Spaces:
Sleeping
Sleeping
| import configparser | |
| import logging | |
| import os | |
| import ast | |
| import re | |
| from dotenv import load_dotenv | |
| # Local .env file | |
| load_dotenv() | |
def getconfig(configfile_path: str):
    """Read a .cfg file into a ConfigParser.

    Args:
        configfile_path: File path of the .cfg file.

    Returns:
        The populated ``configparser.ConfigParser``, or ``None`` (after
        logging a warning) when the file cannot be opened or parsed.
    """
    config = configparser.ConfigParser()
    try:
        # The context manager closes the handle even if parsing fails.
        with open(configfile_path) as config_file:
            config.read_file(config_file)
    except (configparser.Error, UnicodeDecodeError) as exc:
        # The file exists but is not valid INI-style content.
        logging.warning("config file could not be parsed: %s", exc)
        return None
    except (OSError, TypeError, ValueError):
        # Missing/unreadable file, or a path value open() cannot accept
        # (e.g. None). Kept best-effort: callers get None, not a crash.
        logging.warning("config file not found")
        return None
    return config
def get_auth(provider: str) -> dict:
    """Return the authentication settings for a supported provider.

    Args:
        provider: Provider name, matched case-insensitively. Supported
            values are "huggingface" and "qdrant".

    Returns:
        A dict with a single "api_key" entry holding the value of the
        provider's environment variable, or ``None`` when that variable is
        unset or empty (a warning is logged in that case).

    Raises:
        ValueError: If the provider is not one of the supported names.
    """
    # Environment variable that carries each provider's API key.
    env_var_by_provider = {
        "huggingface": "HF_TOKEN",
        "qdrant": "QDRANT_API_KEY",
    }

    provider = provider.lower()  # Normalize to lowercase
    env_var = env_var_by_provider.get(provider)
    if env_var is None:
        raise ValueError(f"Unsupported provider: {provider}")

    token = os.getenv(env_var)
    if token:
        return {"api_key": token}

    logging.warning(f"No API key found for provider '{provider}'. Please set the appropriate environment variable.")
    return {"api_key": None}
def _extract_text_from_nested(obj) -> str:
    """Flatten a parsed literal into one space-joined string of its text.

    Lists are walked recursively, strings are stripped, and dicts contribute
    "key: value" for each non-blank string value. Anything else yields ''.
    """
    if isinstance(obj, list):
        parts = (_extract_text_from_nested(item) for item in obj)
        return ' '.join(part for part in parts if part and part.strip())
    if isinstance(obj, str):
        return obj.strip()
    if isinstance(obj, dict):
        return ' '.join(
            f"{key}: {value}"
            for key, value in obj.items()
            if isinstance(value, str) and value.strip()
        )
    return ''


def process_content(content: str) -> str:
    """Clean content that may be a stringified (possibly nested) list.

    Args:
        content: Raw content from the vector store.

    Returns:
        Readable text with runs of whitespace collapsed to single spaces.
        Empty/None input is returned unchanged. Content that parses as a
        list but holds no text is returned unmodified (a warning is logged).
        Content that merely looks like a list but cannot be parsed is
        treated as regular text.
    """
    if not content:
        return content

    # Check if content looks like a stringified list/nested structure
    content_stripped = content.strip()
    if content_stripped.startswith('[') and content_stripped.endswith(']'):
        try:
            # Parse as a literal only; literal_eval never executes code.
            parsed_content = ast.literal_eval(content_stripped)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
            # literal_eval can raise any of these on malformed input
            # (e.g. TypeError for an unhashable dict key); fall through
            # and treat the content as regular text.
            logging.debug(f"Content looks like list but failed to parse: {e}")
        else:
            if isinstance(parsed_content, list):
                extracted_text = _extract_text_from_nested(parsed_content)
                if extracted_text.strip():
                    # Clean up extra whitespace and format nicely
                    cleaned_text = re.sub(r'\s+', ' ', extracted_text).strip()
                    logging.debug(f"Successfully processed nested list content: {len(cleaned_text)} chars")
                    return cleaned_text
                logging.warning("Parsed list content but no meaningful text found")
                return content  # Return original if no meaningful text extracted

    # For regular text content, just clean up whitespace
    return re.sub(r'\s+', ' ', content).strip()