Spaces:
Runtime error
Runtime error
| # Copyright (c) Facebook, Inc. and its affiliates. | |
| # | |
| # This source code is licensed under the MIT license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| import faiss | |
| import time | |
| import numpy as np | |
| import logging | |
| LOG = logging.getLogger(__name__) | |
def knn_ground_truth(xq, db_iterator, k, metric_type=faiss.METRIC_L2):
    """Exact brute-force k-NN ground truth for a database that may not
    fit in RAM: the database is consumed block by block from an iterator
    and partial results are merged into per-query heaps.
    """
    LOG.info("knn_ground_truth queries size %s k=%d" % (xq.shape, k))
    start = time.time()
    nq, d = xq.shape
    # max-heaps for similarity metrics, min-heaps for distances
    heaps = faiss.ResultHeap(
        nq, k, keep_max=faiss.is_similarity_metric(metric_type))
    index = faiss.IndexFlat(d, metric_type)
    if faiss.get_num_gpus():
        LOG.info('running on %d GPUs' % faiss.get_num_gpus())
        index = faiss.index_cpu_to_all_gpus(index)

    # search each database block against all queries, shifting the block-local
    # ids by the number of vectors seen so far
    seen = 0
    for block in db_iterator:
        index.add(block)
        dist_block, idx_block = index.search(xq, k)
        heaps.add_result(dist_block, idx_block + seen)
        index.reset()
        seen += block.shape[0]
        LOG.info("%d db elements, %.3f s" % (seen, time.time() - start))

    heaps.finalize()
    LOG.info("GT time: %.3f s (%d vectors)" % (time.time() - start, seen))
    return heaps.D, heaps.I
# knn function used to be here
# alias kept so existing callers of this module's `knn` keep working
knn = faiss.knn
def range_search_gpu(xq, r2, index_gpu, index_cpu, gpu_k=1024):
    """GPU does not support range search, so we emulate it with
    knn search + fallback to CPU index.

    The index_cpu can either be:
    - a CPU index that supports range search
    - a numpy table, that will be used to construct a Flat index if needed.
    - None. In that case, at most gpu_k results will be returned

    Returns (lims, D, I) in the usual CSR-style range-search format.
    """
    nq, d = xq.shape
    is_binary_index = isinstance(index_gpu, faiss.IndexBinary)
    keep_max = faiss.is_similarity_metric(index_gpu.metric_type)
    # binary indexes use integer (Hamming) radii, float otherwise
    r2 = int(r2) if is_binary_index else float(r2)
    k = min(index_gpu.ntotal, gpu_k)
    LOG.debug(
        f"GPU search {nq} queries with {k=:} {is_binary_index=:} {keep_max=:}")
    t0 = time.time()
    # pass 1: k-NN on the GPU; a query may have results past rank k
    D, I = index_gpu.search(xq, k)
    t1 = time.time() - t0
    if is_binary_index:
        assert d * 8 < 32768  # let's compact the distance matrix
        D = D.astype('int16')
    t2 = 0
    lim_remain = None
    if index_cpu is not None:
        # a query can have more than k results within the radius only if its
        # k-th neighbor is itself within the radius -> redo those on CPU
        if not keep_max:
            mask = D[:, k - 1] < r2
        else:
            mask = D[:, k - 1] > r2
        if mask.sum() > 0:
            LOG.debug("CPU search remain %d" % mask.sum())
            t0 = time.time()
            if isinstance(index_cpu, np.ndarray):
                # then it in fact an array that we have to make flat
                xb = index_cpu
                if is_binary_index:
                    index_cpu = faiss.IndexBinaryFlat(d * 8)
                else:
                    index_cpu = faiss.IndexFlat(d, index_gpu.metric_type)
                index_cpu.add(xb)
            lim_remain, D_remain, I_remain = index_cpu.range_search(xq[mask], r2)
            if is_binary_index:
                D_remain = D_remain.astype('int16')
            t2 = time.time() - t0
    LOG.debug("combine")
    t0 = time.time()
    CombinerRangeKNN = (
        faiss.CombinerRangeKNNint16 if is_binary_index else
        faiss.CombinerRangeKNNfloat
    )
    combiner = CombinerRangeKNN(nq, k, r2, keep_max)
    # the combiner keeps raw pointers: the numpy arrays below must stay
    # alive (they do, as locals) until write_result is done
    sp = faiss.swig_ptr
    combiner.I = sp(I)
    combiner.D = sp(D)
    if lim_remain is not None:
        combiner.mask = sp(mask)
        combiner.D_remain = sp(D_remain)
        combiner.lim_remain = sp(lim_remain.view("int64"))
        combiner.I_remain = sp(I_remain)
    L_res = np.empty(nq + 1, dtype='int64')
    combiner.compute_sizes(sp(L_res))
    nres = L_res[-1]
    D_res = np.empty(nres, dtype=D.dtype)
    I_res = np.empty(nres, dtype='int64')
    combiner.write_result(sp(D_res), sp(I_res))
    t3 = time.time() - t0
    LOG.debug(f"times {t1:.3f}s {t2:.3f}s {t3:.3f}s")
    return L_res, D_res, I_res
def range_ground_truth(xq, db_iterator, threshold, metric_type=faiss.METRIC_L2,
                       shard=False, ngpu=-1):
    """Exact range-search ground truth for a database that may not fit in
    RAM: the database is consumed block by block from an iterator and the
    per-query results are concatenated at the end.
    """
    nq, d = xq.shape
    t_start = time.time()
    xq = np.ascontiguousarray(xq, dtype='float32')
    index = faiss.IndexFlat(d, metric_type)
    if ngpu == -1:
        ngpu = faiss.get_num_gpus()
    if ngpu:
        LOG.info('running on %d GPUs' % ngpu)
        co = faiss.GpuMultipleClonerOptions()
        co.shard = shard
        index_gpu = faiss.index_cpu_to_all_gpus(index, co=co, ngpu=ngpu)

    # accumulate matches per query, block by block
    ndb = 0
    per_query_D = [[] for _ in range(nq)]
    per_query_I = [[] for _ in range(nq)]
    for block in db_iterator:
        nblock = block.shape[0]
        if ngpu > 0:
            index_gpu.add(block)
            # the raw block doubles as the CPU fallback table
            lims_i, Di, Ii = range_search_gpu(xq, threshold, index_gpu, block)
            index_gpu.reset()
        else:
            index.add(block)
            lims_i, Di, Ii = index.range_search(xq, threshold)
            index.reset()
        # shift block-local ids to global ids
        Ii += ndb
        for q in range(nq):
            begin, end = lims_i[q], lims_i[q + 1]
            if end > begin:
                per_query_D[q].append(Di[begin:end])
                per_query_I[q].append(Ii[begin:end])
        ndb += nblock
        LOG.info("%d db elements, %.3f s" % (ndb, time.time() - t_start))

    empty_I = np.zeros(0, dtype='int64')
    empty_D = np.zeros(0, dtype='float32')
    D = [np.hstack(chunks) if chunks else empty_D for chunks in per_query_D]
    I = [np.hstack(chunks) if chunks else empty_I for chunks in per_query_I]
    sizes = [len(res) for res in I]
    assert len(sizes) == nq
    lims = np.zeros(nq + 1, dtype="uint64")
    lims[1:] = np.cumsum(sizes)
    return lims, np.hstack(D), np.hstack(I)
def threshold_radius_nres(nres, dis, ids, thresh, keep_max=False):
    """Select the subset of results that pass a radius threshold.

    Args:
        nres: per-query result counts (segments of `dis`/`ids`)
        dis: flat array of distances, concatenated per query
        ids: flat array of ids, parallel to `dis`
        thresh: radius; keep dis > thresh if keep_max else dis < thresh
        keep_max: True for similarity metrics (larger is better)
    Returns:
        (new_nres, new_dis, new_ids) with the same layout, filtered.
    """
    if keep_max:
        mask = dis > thresh
    else:
        mask = dis < thresh
    # vectorized per-segment count: replaces a Python loop over segments.
    # Prefix sums handle empty segments correctly (unlike np.add.reduceat,
    # which misbehaves on repeated boundaries). Cast to int64 to avoid
    # int64 + uint64 promotion issues.
    nres_arr = np.asarray(nres)
    ends = np.cumsum(nres_arr.astype('int64'))
    starts = ends - nres_arr.astype('int64')
    kept_csum = np.concatenate(([0], np.cumsum(mask)))
    new_nres = (kept_csum[ends] - kept_csum[starts]).astype(nres_arr.dtype)
    return new_nres, dis[mask], ids[mask]
def threshold_radius(lims, dis, ids, thresh, keep_max=False):
    """Restrict range-search results to those within a given radius.

    Args:
        lims: CSR-style boundaries, lims[i]:lims[i+1] delimits query i
        dis: flat array of distances
        ids: flat array of ids, parallel to `dis`
        thresh: radius; keep dis > thresh if keep_max else dis < thresh
        keep_max: True for similarity metrics (larger is better)
    Returns:
        (new_lims, new_dis, new_ids) in the same CSR format, filtered.
    """
    if keep_max:
        mask = dis > thresh
    else:
        mask = dis < thresh
    lims_arr = np.asarray(lims)
    if len(lims_arr) == 0:
        # degenerate input: nothing to re-segment
        return np.zeros_like(lims_arr), dis[mask], ids[mask]
    # vectorized re-segmentation: new_lims[i] is the number of kept results
    # before boundary lims[i]; prefix sums replace the Python loop.
    kept_csum = np.concatenate(([0], np.cumsum(mask)))
    idx = lims_arr.astype('int64')
    new_lims = (kept_csum[idx] - kept_csum[idx[0]]).astype(lims_arr.dtype)
    return new_lims, dis[mask], ids[mask]
def apply_maxres(res_batches, target_nres, keep_max=False):
    """find radius that reduces number of results to target_nres, and
    applies it in-place to the result batches used in
    range_search_max_results"""
    # pool every distance; np.hstack copies, so partitioning is non-destructive
    alldis = np.hstack([batch_dis for _, batch_dis, _ in res_batches])
    assert len(alldis) > target_nres
    # partial sort just far enough to read off the cutoff value
    if keep_max:
        alldis.partition(len(alldis) - target_nres - 1)
        radius = alldis[-1 - target_nres]
    else:
        alldis.partition(target_nres)
        radius = alldis[target_nres]
    # convert the numpy scalar to a plain Python number
    radius = float(radius) if alldis.dtype == 'float32' else int(radius)
    LOG.debug(' setting radius to %s' % radius)

    # re-threshold every batch in place with the tightened radius
    totres = 0
    for bno, (batch_nres, batch_dis, batch_ids) in enumerate(res_batches):
        filtered = threshold_radius_nres(
            batch_nres, batch_dis, batch_ids, radius, keep_max=keep_max)
        totres += len(filtered[1])
        res_batches[bno] = filtered
    LOG.debug(' updated previous results, new nb results %d' % totres)
    return radius, totres
def range_search_max_results(index, query_iterator, radius,
                             max_results=None, min_results=None,
                             shard=False, ngpu=0, clip_to_min=False):
    """Performs a range search with many queries (given by an iterator)
    and adjusts the threshold on-the-fly so that the total results
    table does not grow larger than max_results.
    If ngpu != 0, the function moves the index to this many GPUs to
    speed up search.

    Args:
        index: Faiss index to search (cloned to GPU when ngpu != 0)
        query_iterator: iterator yielding batches of query vectors
        radius: initial search radius; may be tightened during the run
        max_results: tighten the radius when total results exceed this
        min_results: target result count after tightening; at least one of
            max_results / min_results must be given, the other is derived
        shard: shard the index across GPUs instead of replicating
        ngpu: number of GPUs (0 = CPU only, -1 = all available)
        clip_to_min: if True, apply one final tightening at the end so at
            most min_results results remain
    Returns:
        (radius, lims, dis, ids): final radius plus CSR-style results.
    """
    # TODO: all result manipulations are in python, should move to C++ if perf
    # critical
    is_binary_index = isinstance(index, faiss.IndexBinary)
    # derive whichever of the two bounds was not provided
    if min_results is None:
        assert max_results is not None
        min_results = int(0.8 * max_results)
    if max_results is None:
        assert min_results is not None
        max_results = int(min_results * 1.5)
    if ngpu == -1:
        ngpu = faiss.get_num_gpus()
    if ngpu:
        LOG.info('running on %d GPUs' % ngpu)
        co = faiss.GpuMultipleClonerOptions()
        co.shard = shard
        index_gpu = faiss.index_cpu_to_all_gpus(index, co=co, ngpu=ngpu)
    else:
        index_gpu = None
    t_start = time.time()
    t_search = t_post_process = 0
    qtot = totres = raw_totres = 0
    # each entry is (per-query result counts, distances, ids) for one batch;
    # counts (not lims) are stored so batches can be re-thresholded later
    res_batches = []
    for xqi in query_iterator:
        t0 = time.time()
        LOG.debug(f"searching {len(xqi)} vectors")
        if index_gpu:
            # GPU path: range search emulated via knn + CPU fallback
            lims_i, Di, Ii = range_search_gpu(xqi, radius, index_gpu, index)
        else:
            lims_i, Di, Ii = index.range_search(xqi, radius)
        nres_i = lims_i[1:] - lims_i[:-1]
        raw_totres += len(Di)
        qtot += len(xqi)
        t1 = time.time()
        if is_binary_index:
            # weird Faiss quirk that returns floats for Hamming distances
            Di = Di.astype('int16')
        totres += len(Di)
        res_batches.append((nres_i, Di, Ii))
        if max_results is not None and totres > max_results:
            LOG.info('too many results %d > %d, scaling back radius' %
                     (totres, max_results))
            # tighten the radius and re-filter all batches collected so far;
            # inner-product metrics keep results ABOVE the radius
            radius, totres = apply_maxres(
                res_batches, min_results,
                keep_max=index.metric_type == faiss.METRIC_INNER_PRODUCT
            )
        t2 = time.time()
        t_search += t1 - t0
        t_post_process += t2 - t1
        LOG.debug(' [%.3f s] %d queries done, %d results' % (
            time.time() - t_start, qtot, totres))
    LOG.info(
        'search done in %.3f s + %.3f s, total %d results, end threshold %g' % (
            t_search, t_post_process, totres, radius)
    )
    if clip_to_min and totres > min_results:
        # optional final pass so the caller gets at most min_results results
        radius, totres = apply_maxres(
            res_batches, min_results,
            keep_max=index.metric_type == faiss.METRIC_INNER_PRODUCT
        )
    # concatenate the per-batch pieces into one CSR-style result table
    nres = np.hstack([nres_i for nres_i, dis_i, ids_i in res_batches])
    dis = np.hstack([dis_i for nres_i, dis_i, ids_i in res_batches])
    ids = np.hstack([ids_i for nres_i, dis_i, ids_i in res_batches])
    lims = np.zeros(len(nres) + 1, dtype='uint64')
    lims[1:] = np.cumsum(nres)
    return radius, lims, dis, ids
| def exponential_query_iterator(xq, start_bs=32, max_bs=20000): | |
| """ produces batches of progressively increasing sizes. This is useful to | |
| adjust the search radius progressively without overflowing with | |
| intermediate results """ | |
| nq = len(xq) | |
| bs = start_bs | |
| i = 0 | |
| while i < nq: | |
| xqi = xq[i:i + bs] | |
| yield xqi | |
| if bs < max_bs: | |
| bs *= 2 | |
| i += len(xqi) | |