Spaces:
Paused
Paused
| from typing import Optional, List, Dict, Any, Union | |
| import logging | |
| from pinecone import Pinecone, ServerlessSpec | |
| from open_webui.retrieval.vector.main import ( | |
| VectorDBBase, | |
| VectorItem, | |
| SearchResult, | |
| GetResult, | |
| ) | |
| from open_webui.config import ( | |
| PINECONE_API_KEY, | |
| PINECONE_ENVIRONMENT, | |
| PINECONE_INDEX_NAME, | |
| PINECONE_DIMENSION, | |
| PINECONE_METRIC, | |
| PINECONE_CLOUD, | |
| ) | |
| from open_webui.env import SRC_LOG_LEVELS | |
# Default cap on result counts when callers pass no (or a non-positive) limit.
NO_LIMIT = 10000  # Reasonable limit to avoid overwhelming the system
BATCH_SIZE = 100  # Recommended batch size for Pinecone operations

# Module logger, scoped to the project's configured RAG log level.
log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])
class PineconeClient(VectorDBBase):
    """Vector store adapter backed by a single shared Pinecone index.

    All logical "collections" live in one physical Pinecone index: every
    vector is tagged with a ``collection_name`` metadata field, and each
    read/delete operation filters on that field to emulate per-collection
    isolation.
    """

    def __init__(self):
        # Prefix that namespaces this application's collections inside
        # the shared index.
        self.collection_prefix = "open-webui"

        # Fail fast if any required configuration value is missing.
        self._validate_config()

        # Store configuration values.
        self.api_key = PINECONE_API_KEY
        self.environment = PINECONE_ENVIRONMENT
        self.index_name = PINECONE_INDEX_NAME
        self.dimension = PINECONE_DIMENSION
        self.metric = PINECONE_METRIC
        self.cloud = PINECONE_CLOUD

        # Initialize Pinecone client.
        self.client = Pinecone(api_key=self.api_key)

        # Create the index if it doesn't exist and connect to it.
        self._initialize_index()

    def _validate_config(self) -> None:
        """Validate that all required configuration variables are set.

        Raises:
            ValueError: listing every missing configuration variable.
        """
        # Data-driven check instead of a repetitive if-chain; the error
        # message is unchanged. PINECONE_METRIC is intentionally optional.
        required = (
            ("PINECONE_API_KEY", PINECONE_API_KEY),
            ("PINECONE_ENVIRONMENT", PINECONE_ENVIRONMENT),
            ("PINECONE_INDEX_NAME", PINECONE_INDEX_NAME),
            ("PINECONE_DIMENSION", PINECONE_DIMENSION),
            ("PINECONE_CLOUD", PINECONE_CLOUD),
        )
        missing_vars = [name for name, value in required if not value]
        if missing_vars:
            raise ValueError(
                f"Required configuration missing: {', '.join(missing_vars)}"
            )

    def _initialize_index(self) -> None:
        """Create the Pinecone index if absent, then connect to it.

        Raises:
            RuntimeError: if index creation or connection fails.
        """
        try:
            # Check if index exists.
            if self.index_name not in self.client.list_indexes().names():
                log.info(f"Creating Pinecone index '{self.index_name}'...")
                self.client.create_index(
                    name=self.index_name,
                    dimension=self.dimension,
                    metric=self.metric,
                    # NOTE(review): PINECONE_ENVIRONMENT is used as the
                    # serverless *region* here — confirm it holds a region
                    # string (e.g. "us-east-1").
                    spec=ServerlessSpec(cloud=self.cloud, region=self.environment),
                )
                log.info(f"Successfully created Pinecone index '{self.index_name}'")
            else:
                log.info(f"Using existing Pinecone index '{self.index_name}'")

            # Connect to the index.
            self.index = self.client.Index(self.index_name)
        except Exception as e:
            log.error(f"Failed to initialize Pinecone index: {e}")
            raise RuntimeError(f"Failed to initialize Pinecone index: {e}")

    def _create_points(
        self, items: List[VectorItem], collection_name_with_prefix: str
    ) -> List[Dict[str, Any]]:
        """Convert VectorItem dicts to Pinecone's upsert point format.

        Each point carries the item's text (if any) and the prefixed
        collection name in its metadata so reads can filter by collection.
        """
        points = []
        for item in items:
            # Start with a copy of any existing metadata so the caller's
            # dict is never mutated.
            metadata = item.get("metadata", {}).copy() if item.get("metadata") else {}

            # Add text to metadata if available.
            if "text" in item:
                metadata["text"] = item["text"]

            # Always tag with the collection name for later filtering.
            metadata["collection_name"] = collection_name_with_prefix

            points.append(
                {
                    "id": item["id"],
                    "values": item["vector"],
                    "metadata": metadata,
                }
            )
        return points

    def _get_collection_name_with_prefix(self, collection_name: str) -> str:
        """Return the collection name namespaced with the app prefix."""
        return f"{self.collection_prefix}_{collection_name}"

    def _normalize_distance(self, score: float) -> float:
        """Normalize a raw similarity score based on the configured metric."""
        if self.metric.lower() == "cosine":
            # Cosine similarity ranges from -1 to 1; map to 0..1.
            return (score + 1.0) / 2.0
        # Euclidean, dot product, and any other metric are returned as-is:
        # they are already usable for ranking.
        return score

    def _result_to_get_result(self, matches: list) -> GetResult:
        """Convert Pinecone query matches to the GetResult format."""
        ids = []
        documents = []
        metadatas = []

        for match in matches:
            metadata = match.get("metadata", {})
            ids.append(match["id"])
            # Document text was stored under the "text" metadata key.
            documents.append(metadata.get("text", ""))
            metadatas.append(metadata)

        # GetResult fields are lists-of-lists (one inner list per query).
        return GetResult(
            **{
                "ids": [ids],
                "documents": [documents],
                "metadatas": [metadatas],
            }
        )

    def _upsert_in_batches(
        self,
        points: List[Dict[str, Any]],
        collection_name_with_prefix: str,
        done_verb: str,
        doing_verb: str,
    ) -> None:
        """Upsert points in BATCH_SIZE chunks, logging with the given verbs.

        Shared by insert() and upsert(), which previously duplicated this
        loop verbatim. Re-raises on the first failed batch.
        """
        for i in range(0, len(points), BATCH_SIZE):
            batch = points[i : i + BATCH_SIZE]
            try:
                self.index.upsert(vectors=batch)
                log.debug(
                    f"{done_verb} batch of {len(batch)} vectors into '{collection_name_with_prefix}'"
                )
            except Exception as e:
                log.error(
                    f"Error {doing_verb} batch into '{collection_name_with_prefix}': {e}"
                )
                raise

    def has_collection(self, collection_name: str) -> bool:
        """Check if a collection exists by searching for at least one item."""
        collection_name_with_prefix = self._get_collection_name_with_prefix(
            collection_name
        )
        try:
            # Probe with a dummy zero vector; we only care whether any
            # vector carries this collection's metadata tag.
            response = self.index.query(
                vector=[0.0] * self.dimension,  # dummy vector
                top_k=1,
                filter={"collection_name": collection_name_with_prefix},
                include_metadata=False,
            )
            return len(response.matches) > 0
        except Exception as e:
            log.exception(
                f"Error checking collection '{collection_name_with_prefix}': {e}"
            )
            return False

    def delete_collection(self, collection_name: str) -> None:
        """Delete a collection by removing all vectors tagged with its name.

        Raises:
            Exception: re-raised from the underlying delete call.
        """
        collection_name_with_prefix = self._get_collection_name_with_prefix(
            collection_name
        )
        try:
            self.index.delete(filter={"collection_name": collection_name_with_prefix})
            log.info(
                f"Collection '{collection_name_with_prefix}' deleted (all vectors removed)."
            )
        except Exception as e:
            log.warning(
                f"Failed to delete collection '{collection_name_with_prefix}': {e}"
            )
            raise

    def insert(self, collection_name: str, items: List[VectorItem]) -> None:
        """Insert vectors into a collection (implemented as an upsert)."""
        if not items:
            log.warning("No items to insert")
            return

        collection_name_with_prefix = self._get_collection_name_with_prefix(
            collection_name
        )
        points = self._create_points(items, collection_name_with_prefix)

        # Batch for better performance and reliability.
        self._upsert_in_batches(
            points, collection_name_with_prefix, "Inserted", "inserting"
        )
        log.info(
            f"Successfully inserted {len(items)} vectors into '{collection_name_with_prefix}'"
        )

    def upsert(self, collection_name: str, items: List[VectorItem]) -> None:
        """Upsert (insert or update) vectors into a collection."""
        if not items:
            log.warning("No items to upsert")
            return

        collection_name_with_prefix = self._get_collection_name_with_prefix(
            collection_name
        )
        points = self._create_points(items, collection_name_with_prefix)

        self._upsert_in_batches(
            points, collection_name_with_prefix, "Upserted", "upserting"
        )
        log.info(
            f"Successfully upserted {len(items)} vectors into '{collection_name_with_prefix}'"
        )

    def search(
        self, collection_name: str, vectors: List[List[Union[float, int]]], limit: int
    ) -> Optional[SearchResult]:
        """Search for similar vectors in a collection.

        Only the first query vector is used. Returns None on error or
        missing input; an empty SearchResult when there are no matches.
        """
        if not vectors or not vectors[0]:
            log.warning("No vectors provided for search")
            return None

        collection_name_with_prefix = self._get_collection_name_with_prefix(
            collection_name
        )

        if limit is None or limit <= 0:
            limit = NO_LIMIT

        try:
            # Search using the first vector (assuming this is the intended behavior).
            query_vector = vectors[0]

            query_response = self.index.query(
                vector=query_vector,
                top_k=limit,
                include_metadata=True,
                filter={"collection_name": collection_name_with_prefix},
            )

            if not query_response.matches:
                # Return empty result if no matches.
                return SearchResult(
                    ids=[[]],
                    documents=[[]],
                    metadatas=[[]],
                    distances=[[]],
                )

            # Convert to GetResult format.
            get_result = self._result_to_get_result(query_response.matches)

            # Normalize scores per the configured metric.
            distances = [
                [
                    self._normalize_distance(match.score)
                    for match in query_response.matches
                ]
            ]

            return SearchResult(
                ids=get_result.ids,
                documents=get_result.documents,
                metadatas=get_result.metadatas,
                distances=distances,
            )
        except Exception as e:
            log.error(f"Error searching in '{collection_name_with_prefix}': {e}")
            return None

    def query(
        self, collection_name: str, filter: Dict, limit: Optional[int] = None
    ) -> Optional[GetResult]:
        """Query vectors by metadata filter. Returns None on error."""
        collection_name_with_prefix = self._get_collection_name_with_prefix(
            collection_name
        )

        if limit is None or limit <= 0:
            limit = NO_LIMIT

        try:
            # Pinecone requires a vector even for metadata-only queries;
            # use a zero vector of the configured dimension.
            zero_vector = [0.0] * self.dimension

            # Combine the user filter with the collection tag.
            pinecone_filter = {"collection_name": collection_name_with_prefix}
            if filter:
                pinecone_filter.update(filter)

            query_response = self.index.query(
                vector=zero_vector,
                filter=pinecone_filter,
                top_k=limit,
                include_metadata=True,
            )
            return self._result_to_get_result(query_response.matches)
        except Exception as e:
            log.error(f"Error querying collection '{collection_name}': {e}")
            return None

    def get(self, collection_name: str) -> Optional[GetResult]:
        """Get all vectors in a collection (capped at NO_LIMIT)."""
        collection_name_with_prefix = self._get_collection_name_with_prefix(
            collection_name
        )
        try:
            # Use a zero vector for fetching all entries.
            zero_vector = [0.0] * self.dimension

            # Filter to only this collection's vectors.
            query_response = self.index.query(
                vector=zero_vector,
                top_k=NO_LIMIT,
                include_metadata=True,
                filter={"collection_name": collection_name_with_prefix},
            )
            return self._result_to_get_result(query_response.matches)
        except Exception as e:
            log.error(f"Error getting collection '{collection_name}': {e}")
            return None

    def delete(
        self,
        collection_name: str,
        ids: Optional[List[str]] = None,
        filter: Optional[Dict] = None,
    ) -> None:
        """Delete vectors by IDs or by metadata filter.

        If both are given, IDs take precedence. Re-raises on failure.
        """
        collection_name_with_prefix = self._get_collection_name_with_prefix(
            collection_name
        )
        try:
            if ids:
                # Delete by IDs (in batches for large deletions).
                for i in range(0, len(ids), BATCH_SIZE):
                    batch_ids = ids[i : i + BATCH_SIZE]
                    # Note: When deleting by ID, we can't filter by collection_name.
                    # This is a limitation of Pinecone - be careful with ID uniqueness.
                    self.index.delete(ids=batch_ids)
                    log.debug(
                        f"Deleted batch of {len(batch_ids)} vectors by ID from '{collection_name_with_prefix}'"
                    )
                log.info(
                    f"Successfully deleted {len(ids)} vectors by ID from '{collection_name_with_prefix}'"
                )
            elif filter:
                # Combine the user filter with the collection tag.
                # (Removed a redundant inner `if filter:` — this branch
                # already guarantees filter is truthy.)
                pinecone_filter = {"collection_name": collection_name_with_prefix}
                pinecone_filter.update(filter)

                self.index.delete(filter=pinecone_filter)
                log.info(
                    f"Successfully deleted vectors by filter from '{collection_name_with_prefix}'"
                )
            else:
                log.warning("No ids or filter provided for delete operation")
        except Exception as e:
            log.error(f"Error deleting from collection '{collection_name}': {e}")
            raise

    def reset(self) -> None:
        """Reset the database by deleting every vector in the index.

        Raises:
            Exception: re-raised from the underlying delete call.
        """
        try:
            self.index.delete(delete_all=True)
            log.info("All vectors successfully deleted from the index.")
        except Exception as e:
            log.error(f"Failed to reset Pinecone index: {e}")
            raise