#
# GGUF file reading/modification support. For API usage information,
# please see the files in scripts/ for some fairly simple examples, or the
# usage sketch at the end of this file.
#
from __future__ import annotations

import logging
import os
from collections import OrderedDict
from typing import Any, Literal, NamedTuple, TypeVar, Union

import numpy as np
import numpy.typing as npt

from .quants import quant_shape_to_byte_shape

if __name__ == "__main__":
    import sys
    from pathlib import Path

    # Allow running file in package as a script.
    sys.path.insert(0, str(Path(__file__).parent.parent))

from gguf.constants import (
    GGML_QUANT_SIZES,
    GGUF_DEFAULT_ALIGNMENT,
    GGUF_MAGIC,
    GGUF_VERSION,
    GGMLQuantizationType,
    GGUFValueType,
)

logger = logging.getLogger(__name__)

READER_SUPPORTED_VERSIONS = [2, GGUF_VERSION]

class ReaderField(NamedTuple):
    # Offset to start of this field.
    offset: int

    # Name of the field (not necessarily from file data).
    name: str

    # Data parts. Some types have multiple components, such as strings
    # that consist of a length followed by the string data.
    parts: list[npt.NDArray[Any]] = []

    # Indexes into parts that we can call the actual data. For example
    # an array of strings will be populated with indexes to the actual
    # string data.
    data: list[int] = [-1]

    types: list[GGUFValueType] = []

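# For illustration of the layout (derived from _build_fields below): a scalar
# UINT32 key/value pair such as `general.alignment` is parsed into
# parts = [key_len, key_data, raw_type, value], with data = [3] pointing at
# the value part and types = [GGUFValueType.UINT32].
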
class ReaderTensor(NamedTuple):
    name: str
    tensor_type: GGMLQuantizationType
    # Dimensions are read as uint64 per the tensor info encoding (see
    # _get_tensor_info_field below).
    shape: npt.NDArray[np.uint64]
    n_elements: int
    n_bytes: int
    data_offset: int
    data: npt.NDArray[Any]
    field: ReaderField

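# Note: ReaderTensor.data is a view into the memory-mapped file rather than a
# copy, so opening the reader with mode = 'r+' makes it possible to modify
# tensor data in place.
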
class GGUFReader:
    # I - same as host, S - swapped
    byte_order: Literal['I', 'S'] = 'I'
    alignment: int = GGUF_DEFAULT_ALIGNMENT
    data_offset: int

    # Note: Internal helper, API may change.
    gguf_scalar_to_np: dict[GGUFValueType, type[np.generic]] = {
        GGUFValueType.UINT8:   np.uint8,
        GGUFValueType.INT8:    np.int8,
        GGUFValueType.UINT16:  np.uint16,
        GGUFValueType.INT16:   np.int16,
        GGUFValueType.UINT32:  np.uint32,
        GGUFValueType.INT32:   np.int32,
        GGUFValueType.FLOAT32: np.float32,
        GGUFValueType.UINT64:  np.uint64,
        GGUFValueType.INT64:   np.int64,
        GGUFValueType.FLOAT64: np.float64,
        GGUFValueType.BOOL:    np.bool_,
    }

    def __init__(self, path: os.PathLike[str] | str, mode: Literal['r', 'r+', 'c'] = 'r'):
        self.data = np.memmap(path, mode = mode)
        offs = 0

        # Check for GGUF magic
        if self._get(offs, np.uint32, override_order = '<')[0] != GGUF_MAGIC:
            raise ValueError('GGUF magic invalid')
        offs += 4

        # Check GGUF version
        temp_version = self._get(offs, np.uint32)
        if temp_version[0] & 65535 == 0:
            # If we get 0 here that means it's (probably) a GGUF file created for
            # the opposite byte order of the machine this script is running on.
            self.byte_order = 'S'
            # ndarray.newbyteorder was removed in NumPy 2.0; byte-swap via a
            # view with a swapped dtype instead (same idiom as in _get below).
            temp_version = temp_version.view(temp_version.dtype.newbyteorder(self.byte_order))
        version = temp_version[0]
        if version not in READER_SUPPORTED_VERSIONS:
            raise ValueError(f'Sorry, file appears to be version {version} which we cannot handle')
        self.fields: OrderedDict[str, ReaderField] = OrderedDict()
        self.tensors: list[ReaderTensor] = []
        offs += self._push_field(ReaderField(offs, 'GGUF.version', [temp_version], [0], [GGUFValueType.UINT32]))

        # Check tensor count and kv count
        temp_counts = self._get(offs, np.uint64, 2)
        offs += self._push_field(ReaderField(offs, 'GGUF.tensor_count', [temp_counts[:1]], [0], [GGUFValueType.UINT64]))
        offs += self._push_field(ReaderField(offs, 'GGUF.kv_count', [temp_counts[1:]], [0], [GGUFValueType.UINT64]))
        tensor_count, kv_count = temp_counts
        offs = self._build_fields(offs, kv_count)

        # Build Tensor Info Fields
        offs, tensors_fields = self._build_tensor_info(offs, tensor_count)

        new_align = self.fields.get('general.alignment')
        if new_align is not None:
            if new_align.types != [GGUFValueType.UINT32]:
                raise ValueError('Bad type for general.alignment field')
            self.alignment = new_align.parts[-1][0]
        padding = offs % self.alignment
        if padding != 0:
            offs += self.alignment - padding
        self.data_offset = offs
        self._build_tensors(offs, tensors_fields)

    _DT = TypeVar('_DT', bound = npt.DTypeLike)

    # Fetch a key/value metadata field by key.
    def get_field(self, key: str) -> Union[ReaderField, None]:
        return self.fields.get(key, None)

    # Fetch a tensor from the list by index.
    def get_tensor(self, idx: int) -> ReaderTensor:
        return self.tensors[idx]

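    # A usage sketch for the getters above, assuming the file defines the
    # standard `general.architecture` key (any string field decodes the same way):
    #
    #   field = reader.get_field('general.architecture')
    #   arch = str(bytes(field.parts[field.data[0]]), encoding = 'utf-8')
    #   first_tensor = reader.get_tensor(0)
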
    def _get(
        self, offset: int, dtype: npt.DTypeLike, count: int = 1, override_order: None | Literal['I', 'S', '<'] = None,
    ) -> npt.NDArray[Any]:
        count = int(count)
        itemsize = int(np.empty([], dtype = dtype).itemsize)
        end_offs = offset + itemsize * count
        arr = self.data[offset:end_offs].view(dtype = dtype)[:count]
        if override_order is None:
            return arr
        return arr.view(arr.dtype.newbyteorder(override_order))

    def _push_field(self, field: ReaderField, skip_sum: bool = False) -> int:
        if field.name in self.fields:
            # TODO: add option to generate error on duplicate keys
            # raise KeyError(f'Duplicate {field.name} already in list at offset {field.offset}')
            logger.warning(f'Duplicate key {field.name} at offset {field.offset}')
            self.fields[field.name + '_{}'.format(field.offset)] = field
        else:
            self.fields[field.name] = field
        return 0 if skip_sum else sum(int(part.nbytes) for part in field.parts)

    def _get_str(self, offset: int) -> tuple[npt.NDArray[np.uint64], npt.NDArray[np.uint8]]:
        slen = self._get(offset, np.uint64)
        return slen, self._get(offset + 8, np.uint8, slen[0])

    def _get_field_parts(
        self, orig_offs: int, raw_type: int,
    ) -> tuple[int, list[npt.NDArray[Any]], list[int], list[GGUFValueType]]:
        offs = orig_offs
        types: list[GGUFValueType] = []
        gtype = GGUFValueType(raw_type)
        types.append(gtype)
        # Handle strings.
        if gtype == GGUFValueType.STRING:
            sparts: list[npt.NDArray[Any]] = list(self._get_str(offs))
            size = sum(int(part.nbytes) for part in sparts)
            return size, sparts, [1], types
        # Check if it's a simple scalar type.
        nptype = self.gguf_scalar_to_np.get(gtype)
        if nptype is not None:
            val = self._get(offs, nptype)
            return int(val.nbytes), [val], [0], types
        # Handle arrays.
        if gtype == GGUFValueType.ARRAY:
            raw_itype = self._get(offs, np.uint32)
            offs += int(raw_itype.nbytes)
            alen = self._get(offs, np.uint64)
            offs += int(alen.nbytes)
            aparts: list[npt.NDArray[Any]] = [raw_itype, alen]
            data_idxs: list[int] = []
            for idx in range(alen[0]):
                curr_size, curr_parts, curr_idxs, curr_types = self._get_field_parts(offs, raw_itype[0])
                if idx == 0:
                    types += curr_types
                idxs_offs = len(aparts)
                aparts += curr_parts
                data_idxs += (i + idxs_offs for i in curr_idxs)
                offs += curr_size
            return offs - orig_offs, aparts, data_idxs, types
        # We can't deal with this one.
        raise ValueError(f'Unknown/unhandled field type {gtype}')

    def _get_tensor_info_field(self, orig_offs: int) -> ReaderField:
        offs = orig_offs

        # Get Tensor Name
        name_len, name_data = self._get_str(offs)
        offs += int(name_len.nbytes + name_data.nbytes)

        # Get Tensor Dimensions Count
        n_dims = self._get(offs, np.uint32)
        offs += int(n_dims.nbytes)

        # Get Tensor Dimension Array
        dims = self._get(offs, np.uint64, n_dims[0])
        offs += int(dims.nbytes)

        # Get Tensor Encoding Scheme Type
        raw_dtype = self._get(offs, np.uint32)
        offs += int(raw_dtype.nbytes)

        # Get Tensor Offset
        offset_tensor = self._get(offs, np.uint64)
        offs += int(offset_tensor.nbytes)

        return ReaderField(
            orig_offs,
            str(bytes(name_data), encoding = 'utf-8'),
            [name_len, name_data, n_dims, dims, raw_dtype, offset_tensor],
            [1, 3, 4, 5],
        )

    def _build_fields(self, offs: int, count: int) -> int:
        for _ in range(count):
            orig_offs = offs
            kv_klen, kv_kdata = self._get_str(offs)
            offs += int(kv_klen.nbytes + kv_kdata.nbytes)
            raw_kv_type = self._get(offs, np.uint32)
            offs += int(raw_kv_type.nbytes)
            parts: list[npt.NDArray[Any]] = [kv_klen, kv_kdata, raw_kv_type]
            idxs_offs = len(parts)
            field_size, field_parts, field_idxs, field_types = self._get_field_parts(offs, raw_kv_type[0])
            parts += field_parts
            self._push_field(ReaderField(
                orig_offs,
                str(bytes(kv_kdata), encoding = 'utf-8'),
                parts,
                [idx + idxs_offs for idx in field_idxs],
                field_types,
            ), skip_sum = True)
            offs += field_size
        return offs

    def _build_tensor_info(self, offs: int, count: int) -> tuple[int, list[ReaderField]]:
        tensor_fields = []
        for _ in range(count):
            field = self._get_tensor_info_field(offs)
            offs += sum(int(part.nbytes) for part in field.parts)
            tensor_fields.append(field)
        return offs, tensor_fields

    def _build_tensors(self, start_offs: int, fields: list[ReaderField]) -> None:
        tensors = []
        tensor_names = set()  # keep track of names to prevent duplicated tensors
        for field in fields:
            _name_len, name_data, _n_dims, dims, raw_dtype, offset_tensor = field.parts
            # check if there's any tensor having same name already in the list
            tensor_name = str(bytes(name_data), encoding = 'utf-8')
            if tensor_name in tensor_names:
                raise ValueError(f'Found duplicated tensor with name {tensor_name}')
            tensor_names.add(tensor_name)
            ggml_type = GGMLQuantizationType(raw_dtype[0])
            n_elems = int(np.prod(dims))
            np_dims = tuple(reversed(dims.tolist()))
            block_size, type_size = GGML_QUANT_SIZES[ggml_type]
            n_bytes = n_elems * type_size // block_size
            data_offs = int(start_offs + offset_tensor[0])
            item_type: npt.DTypeLike
            if ggml_type == GGMLQuantizationType.F16:
                item_count = n_elems
                item_type = np.float16
            elif ggml_type == GGMLQuantizationType.F32:
                item_count = n_elems
                item_type = np.float32
            elif ggml_type == GGMLQuantizationType.F64:
                item_count = n_elems
                item_type = np.float64
            elif ggml_type == GGMLQuantizationType.I8:
                item_count = n_elems
                item_type = np.int8
            elif ggml_type == GGMLQuantizationType.I16:
                item_count = n_elems
                item_type = np.int16
            elif ggml_type == GGMLQuantizationType.I32:
                item_count = n_elems
                item_type = np.int32
            elif ggml_type == GGMLQuantizationType.I64:
                item_count = n_elems
                item_type = np.int64
            else:
                # Quantized types: expose the raw bytes and convert the
                # element shape to the equivalent byte shape.
                item_count = n_bytes
                item_type = np.uint8
                np_dims = quant_shape_to_byte_shape(np_dims, ggml_type)
            tensors.append(ReaderTensor(
                name = tensor_name,
                tensor_type = ggml_type,
                shape = dims,
                n_elements = n_elems,
                n_bytes = n_bytes,
                data_offset = data_offs,
                data = self._get(data_offs, item_type, item_count).reshape(np_dims),
                field = field,
            ))
        self.tensors = tensors
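
# A minimal usage sketch, not part of the reader API: dump field types and
# tensor shapes from a GGUF file given on the command line. Run it as
# `python -m gguf.gguf_reader some-model.gguf` (the module path is an
# assumption based on the package layout implied by the imports above).
if __name__ == "__main__":
    import sys

    logging.basicConfig(level = logging.INFO)
    if len(sys.argv) < 2:
        logger.info('Usage: gguf_reader.py <path_to_gguf_file>')
        sys.exit(1)
    reader = GGUFReader(sys.argv[1], 'r')
    logger.info(f'Opened {sys.argv[1]} with {len(reader.fields)} fields and {len(reader.tensors)} tensors')
    for name, field in reader.fields.items():
        logger.info(f'Field {name}: types = {[t.name for t in field.types]}')
    for tensor in reader.tensors:
        logger.info(f'Tensor {tensor.name}: type = {tensor.tensor_type.name}, shape = {tensor.shape.tolist()}, {tensor.n_bytes} bytes')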