Spaces:
Runtime error
Runtime error
| import asyncio | |
| import os | |
| import torch | |
| from grpc import aio | |
| from loguru import logger | |
| from grpc_reflection.v1alpha import reflection | |
| from pathlib import Path | |
| from typing import List, Optional | |
| from text_generation_server.cache import Cache | |
| from text_generation_server.interceptor import ExceptionInterceptor | |
| from text_generation_server.models import Model, get_model | |
| from text_generation_server.pb import generate_pb2_grpc, generate_pb2 | |
| from text_generation_server.tracing import UDSOpenTelemetryAioServerInterceptor | |
| class TextGenerationService(generate_pb2_grpc.TextGenerationServiceServicer): | |
| def __init__(self, model: Model, cache: Cache, server_urls: List[str]): | |
| self.cache = cache | |
| self.model = model | |
| self.server_urls = server_urls | |
| # For some reason, inference_mode does not work well with GLOO which we use on CPU | |
| if model.device.type == "cuda": | |
| # Force inference mode for the lifetime of TextGenerationService | |
| self._inference_mode_raii_guard = torch._C._InferenceMode(True) | |
| async def Info(self, request, context): | |
| return self.model.info | |
| async def Health(self, request, context): | |
| if self.model.device.type == "cuda": | |
| torch.zeros((2, 2)).cuda() | |
| return generate_pb2.HealthResponse() | |
| async def ServiceDiscovery(self, request, context): | |
| return generate_pb2.ServiceDiscoveryResponse(urls=self.server_urls) | |
| async def ClearCache(self, request, context): | |
| if request.HasField("id"): | |
| self.cache.delete(request.id) | |
| else: | |
| self.cache.clear() | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| return generate_pb2.ClearCacheResponse() | |
| async def FilterBatch(self, request, context): | |
| batch = self.cache.pop(request.batch_id) | |
| if batch is None: | |
| raise ValueError(f"Batch ID {request.batch_id} not found in cache.") | |
| filtered_batch = batch.filter(request.keep_requests) | |
| self.cache.set(filtered_batch) | |
| return generate_pb2.FilterBatchResponse(batch=filtered_batch.to_pb()) | |
| async def Prefill(self, request, context): | |
| batch = self.model.batch_type.from_pb( | |
| request.batch, self.model.tokenizer, self.model.device | |
| ) | |
| generations, next_batch = self.model.generate_token(batch) | |
| self.cache.set(next_batch) | |
| return generate_pb2.PrefillResponse( | |
| generations=[generation.to_pb() for generation in generations], | |
| batch=next_batch.to_pb() if next_batch else None, | |
| ) | |
| async def Decode(self, request, context): | |
| if len(request.batches) == 0: | |
| raise ValueError("Must provide at least one batch") | |
| batches = [] | |
| for batch_pb in request.batches: | |
| batch = self.cache.pop(batch_pb.id) | |
| if batch is None: | |
| raise ValueError(f"Batch ID {batch_pb.id} not found in cache.") | |
| batches.append(batch) | |
| if len(batches) == 0: | |
| raise ValueError("All batches are empty") | |
| if len(batches) > 1: | |
| batch = self.model.batch_type.concatenate(batches) | |
| else: | |
| batch = batches[0] | |
| generations, next_batch = self.model.generate_token(batch) | |
| self.cache.set(next_batch) | |
| return generate_pb2.DecodeResponse( | |
| generations=[generation.to_pb() for generation in generations], | |
| batch=next_batch.to_pb() if next_batch else None, | |
| ) | |
| def serve( | |
| model_id: str, | |
| revision: Optional[str], | |
| sharded: bool, | |
| quantize: bool, | |
| uds_path: Path, | |
| ): | |
| async def serve_inner( | |
| model_id: str, | |
| revision: Optional[str], | |
| sharded: bool = False, | |
| quantize: bool = False, | |
| ): | |
| unix_socket_template = "unix://{}-{}" | |
| if sharded: | |
| server_urls = [ | |
| unix_socket_template.format(uds_path, rank) | |
| for rank in range(int(os.environ["WORLD_SIZE"])) | |
| ] | |
| local_url = server_urls[int(os.environ["RANK"])] | |
| else: | |
| local_url = unix_socket_template.format(uds_path, 0) | |
| server_urls = [local_url] | |
| try: | |
| model = get_model(model_id, revision, sharded, quantize) | |
| except Exception: | |
| logger.exception("Error when initializing model") | |
| raise | |
| server = aio.server( | |
| interceptors=[ | |
| ExceptionInterceptor(), | |
| UDSOpenTelemetryAioServerInterceptor(), | |
| ] | |
| ) | |
| generate_pb2_grpc.add_TextGenerationServiceServicer_to_server( | |
| TextGenerationService(model, Cache(), server_urls), server | |
| ) | |
| SERVICE_NAMES = ( | |
| generate_pb2.DESCRIPTOR.services_by_name["TextGenerationService"].full_name, | |
| reflection.SERVICE_NAME, | |
| ) | |
| reflection.enable_server_reflection(SERVICE_NAMES, server) | |
| server.add_insecure_port(local_url) | |
| await server.start() | |
| logger.info("Server started at {}".format(local_url)) | |
| try: | |
| await server.wait_for_termination() | |
| except KeyboardInterrupt: | |
| logger.info("Signal received. Shutting down") | |
| await server.stop(0) | |
| asyncio.run(serve_inner(model_id, revision, sharded, quantize)) | |