import { Hono } from 'hono';
import { stream } from 'hono/streaming';
import type { StatusCode } from 'hono/utils/http-status';
import { existsSync, mkdirSync, writeFileSync, readFileSync, unlinkSync } from 'fs';
import { join, basename, resolve } from 'path';
import { uploadFiles, checkRepoAccess, createRepo, whoAmI } from '@huggingface/hub';
import type { RepoDesignation } from '@huggingface/hub';

const app = new Hono();

const TARGET_BASE_URL = process.env.TARGET_BASE_URL || 'https://router.huggingface.co';
const PORT = parseInt(process.env.PORT || '4040', 10);
const LOGS_DIR = process.env.LOGS_DIR || './logs';
const HF_ACCESS_TOKEN = process.env.HF_API_KEY || '';
const DATASET_PRIVATE = (process.env.DATASET_PRIVATE || 'false').toLowerCase() === 'true';

/*
  USER_NAME - the Hugging Face username the proxy accepts requests from.
  Requests whose bearer token does not resolve to this user are rejected.
*/
const USER_NAME = process.env.USER_NAME || 'cfahlgren1';

if (!HF_ACCESS_TOKEN) {
  console.error('Please set the HF_API_KEY environment variable');
  process.exit(1);
}

if (!USER_NAME) {
  console.error('Please set the USER_NAME environment variable');
  process.exit(1);
}

/*
  BATCH_SIZE_LIMIT - the maximum number of traces to buffer before pushing to the dataset.
  BATCH_TIME_LIMIT - the maximum time (in minutes) to wait before pushing to the dataset.
  A push happens when whichever limit is reached first.
*/
const BATCH_SIZE_LIMIT = parseInt(process.env.BATCH_SIZE_LIMIT || '100', 10);
const BATCH_TIME_LIMIT = parseInt(process.env.BATCH_TIME_LIMIT || '1', 10); // 1 minute default

if (!existsSync(LOGS_DIR)) {
  mkdirSync(LOGS_DIR, { recursive: true });
}

// Verify that the bearer token on an incoming request belongs to the configured user.
async function checkUserAccess(username: string, accessToken: string): Promise<boolean> {
  try {
    const response = await whoAmI({ accessToken });
    return response.name === username;
  } catch {
    // Treat an invalid or missing token as "no access" rather than throwing a 500.
    return false;
  }
}

interface RequestTrace {
  model?: string;
  timestamp_start: string;
  timestamp_end?: string;
  messages?: any[];
  prompt_tokens?: number;
  completion_tokens?: number;
  response?: string;
  arguments?: any;
  provider?: string;
  duration_ms?: number;
  status_code?: number;
}

const requestTraces: RequestTrace[] = [];
let lastTraceBatchTime = Date.now();

async function checkDatasetExists(datasetName: string): Promise<boolean> {
  try {
    if (!HF_ACCESS_TOKEN) {
      console.warn('HF_ACCESS_TOKEN not set, skipping dataset check');
      return false;
    }
    const repo: RepoDesignation = { type: 'dataset', name: datasetName };
    await checkRepoAccess({ repo, accessToken: HF_ACCESS_TOKEN });
    return true;
  } catch (error) {
    return false;
  }
}

async function createDataset(datasetName: string): Promise<boolean> {
  try {
    if (!HF_ACCESS_TOKEN) {
      console.warn('HF_ACCESS_TOKEN not set, skipping dataset creation');
      return false;
    }
    const repo: RepoDesignation = { type: 'dataset', name: datasetName };
    await createRepo({
      repo,
      accessToken: HF_ACCESS_TOKEN,
      private: DATASET_PRIVATE,
      files: [
        {
          path: 'README.md',
          content: new Blob(['---\ntags:\n- inference-proxy\n---']),
        },
      ],
    });
    return true;
  } catch (error) {
    console.error('Error creating dataset:', error);
    return false;
  }
}

async function uploadTraceFile(filePath: string, datasetName: string): Promise<boolean> {
  try {
    if (!HF_ACCESS_TOKEN) {
      console.warn('HF_ACCESS_TOKEN not set, skipping file upload');
      return false;
    }
    const repo: RepoDesignation = { type: 'dataset', name: datasetName };
    const fileName = basename(filePath);
    const uploadPath = `traces/${fileName}`;
    await uploadFiles({
      repo,
      accessToken: HF_ACCESS_TOKEN,
      files: [
        {
          path: uploadPath,
          content: new Blob([readFileSync(filePath)]),
        },
      ],
    });
    return true;
  } catch (error) {
    console.error('Error uploading trace file:', error);
    return false;
  }
}
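/*
  Each batch file written below is a JSON array of RequestTrace records.
  An illustrative entry (all values hypothetical):

    {
      "model": "meta-llama/Llama-3.3-70B-Instruct",
      "timestamp_start": "2025-01-01T12:00:00.000Z",
      "timestamp_end": "2025-01-01T12:00:02.350Z",
      "messages": [{ "role": "user", "content": "Hello!" }],
      "prompt_tokens": 6,
      "completion_tokens": 42,
      "response": "Hi there! ...",
      "provider": "hf-inference",
      "duration_ms": 2350,
      "status_code": 200
    }
*/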
function writeBatchToFile(traces: RequestTrace[]): string {
  try {
    const timestamp = Date.now();
    const filename = `batch_${timestamp}.json`;
    const filePath = join(LOGS_DIR, filename);
    writeFileSync(filePath, JSON.stringify(traces, null, 2));
    return filePath;
  } catch (error) {
    console.error('Error writing batch to file:', error);
    return '';
  }
}

async function writeBatchedTraces() {
  if (requestTraces.length === 0) {
    return;
  }

  // Snapshot and clear the buffer so new traces can accumulate while we upload.
  const tracesToWrite = [...requestTraces];
  const batchSize = tracesToWrite.length;
  requestTraces.length = 0;
  lastTraceBatchTime = Date.now();

  console.log(`Processing batch of ${batchSize} traces...`);

  // Write all traces to a single batch file
  const filePath = writeBatchToFile(tracesToWrite);
  const filePaths = filePath ? [filePath] : [];

  if (HF_ACCESS_TOKEN) {
    const response = await whoAmI({ accessToken: HF_ACCESS_TOKEN });
    const datasetName = `${response.name}/traces`;

    // Check if the dataset exists, create it if not
    const exists = await checkDatasetExists(datasetName);
    if (!exists) {
      console.log(`Dataset ${datasetName} does not exist, creating...`);
      const created = await createDataset(datasetName);
      if (!created) {
        console.error(`Failed to create dataset ${datasetName}`);
      } else {
        console.log(`Successfully created dataset ${datasetName}`);
      }
    }

    // Upload files to the dataset
    for (const filePath of filePaths) {
      const uploaded = await uploadTraceFile(filePath, datasetName);
      // Clean up the local file if it uploaded successfully
      if (uploaded && existsSync(filePath)) {
        unlinkSync(filePath);
        console.log(`Deleted local file ${filePath} after successful upload`);
      }
    }
  } else {
    console.log(`HF_ACCESS_TOKEN not set, keeping ${filePaths.length} local files`);
  }

  console.log(`Successfully processed ${batchSize} traces.`);
}

setInterval(() => {
  const timeSinceLastBatch = Date.now() - lastTraceBatchTime;
  if (timeSinceLastBatch >= BATCH_TIME_LIMIT * 60 * 1000 && requestTraces.length > 0) {
    console.log(`Time limit reached (${BATCH_TIME_LIMIT} minutes). Flushing ${requestTraces.length} traces.`);
    writeBatchedTraces().catch(err => {
      console.error('Error flushing traces:', err);
    });
  }
}, Math.min(BATCH_TIME_LIMIT * 60 * 1000, 10000)); // Check at least every 10 seconds
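/*
  Flush behaviour with the defaults (BATCH_SIZE_LIMIT=100, BATCH_TIME_LIMIT=1):
  a batch is pushed after 100 traces or after one minute, whichever comes
  first. The timer above only polls, so a time-based flush can lag the
  deadline by up to one polling interval (at most 10 seconds).
*/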
function checkAndFlushTraces() {
  if (requestTraces.length >= BATCH_SIZE_LIMIT) {
    console.log(`Batch size limit reached (${BATCH_SIZE_LIMIT}). Flushing traces.`);
    writeBatchedTraces().catch(err => {
      console.error('Error flushing traces:', err);
    });
    return true;
  }
  return false;
}

app.get('/', (c) => {
  const hostUrl = new URL(c.req.url).origin;
  try {
    let html = readFileSync('index.html', 'utf8');
    // Replace template variables
    html = html
      .replace(/{{TARGET_BASE_URL}}/g, TARGET_BASE_URL)
      .replace(/{{HOST_URL}}/g, hostUrl);
    return c.html(html);
  } catch (error) {
    console.error('Error reading index.html:', error);
    return c.text('Hono forwarding proxy running!', 500);
  }
});

// Stamp the end time, compute the request duration, and flush if the batch is full.
function finalizeTrace(trace: RequestTrace) {
  trace.timestamp_end = new Date().toISOString();
  trace.duration_ms =
    new Date(trace.timestamp_end).getTime() - new Date(trace.timestamp_start).getTime();
  checkAndFlushTraces();
}

async function storeStreamedResponse(
  streamToLog: ReadableStream,
  contentType: string | null,
  trace: RequestTrace,
  statusCode: number
) {
  // Operate on the trace object directly: an array index would be invalidated
  // if the buffer is flushed while this stream is still being consumed.
  const reader = streamToLog.getReader();
  const chunks: Uint8Array[] = [];
  try {
    trace.status_code = statusCode;
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      if (value) chunks.push(value);
    }
    const bodyText = await new Blob(chunks).text();
    const normalizedType = contentType?.toLowerCase() || '';

    // Handle event streams (streaming responses)
    if (normalizedType.includes('text/event-stream')) {
      const lines = bodyText.split('\n');
      let accumulatedContent = '';
      for (const line of lines) {
        if (!line.startsWith('data: ')) continue;
        const jsonData = line.substring(6).trim();
        if (!jsonData || jsonData === '[DONE]') continue;
        try {
          const parsed = JSON.parse(jsonData);
          if (parsed.choices?.[0]?.delta?.content) {
            accumulatedContent += parsed.choices[0].delta.content;
          }
        } catch (parseError) {
          // Skip malformed chunks and continue processing other lines
        }
      }
      if (accumulatedContent) {
        trace.response = accumulatedContent;
        // Streamed responses carry no usage block, so the character count
        // stands in (roughly) for the completion token count.
        trace.completion_tokens = accumulatedContent.length;
      }
    } else {
      try {
        const jsonResponse = JSON.parse(bodyText);
        // Get response content from standard LLM response formats
        trace.response =
          jsonResponse.choices?.[0]?.message?.content ||
          jsonResponse.generated_text ||
          bodyText;
        // Record exact token counts when the provider reports them
        if (jsonResponse.usage?.completion_tokens !== undefined) {
          trace.completion_tokens = jsonResponse.usage.completion_tokens;
        }
        if (jsonResponse.usage?.prompt_tokens !== undefined) {
          trace.prompt_tokens = jsonResponse.usage.prompt_tokens;
        }
      } catch (e) {
        // If the body is not JSON, store it as-is
        trace.response = bodyText;
        trace.completion_tokens = bodyText.length;
      }
    }

    finalizeTrace(trace);
  } catch (error) {
    console.error('Error storing streamed response:', error);
    finalizeTrace(trace);
  } finally {
    try {
      reader.releaseLock();
    } catch {
      // Ignore release errors
    }
  }
}
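/*
  For reference, the SSE parsing above expects OpenAI-style stream chunks.
  An illustrative (hypothetical) event stream:

    data: {"choices":[{"delta":{"content":"Hel"},"index":0}]}
    data: {"choices":[{"delta":{"content":"lo"},"index":0}]}
    data: [DONE]

  Only the delta.content fragments are accumulated into the trace response.
*/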
app.all('*', async (c) => {
  try {
    const authHeader = c.req.header('authorization') || '';
    let requestToken = '';
    if (authHeader.startsWith('Bearer ')) {
      requestToken = authHeader.substring(7);
    }

    // Check that the caller's token belongs to the configured user
    if (USER_NAME && !(await checkUserAccess(USER_NAME, requestToken))) {
      return c.text('Unauthorized', 401);
    }

    const url = new URL(c.req.url);
    const targetPath = url.pathname;
    const targetUrl = `${TARGET_BASE_URL}${targetPath}${url.search}`;

    // Skip trace creation for favicon and other common browser requests
    const skipTracePatterns = ['/favicon.ico', '/robots.txt'];
    const shouldSkipTrace = skipTracePatterns.some(pattern => targetPath.includes(pattern));

    // Extract the provider from the first URL path segment
    const pathParts = targetPath.split('/');
    const provider = pathParts.length > 1 ? pathParts[1] : 'unknown';

    // Only log if we're not skipping the trace
    if (!shouldSkipTrace) {
      console.log(`Forwarding request for ${url.pathname} to ${targetUrl}`);
    }

    const headers = new Headers(c.req.header());
    headers.set('host', new URL(TARGET_BASE_URL).host);
    headers.delete('content-length');
    headers.delete('transfer-encoding');

    let requestBody: BodyInit | null = null;
    let parsedRequestBody: any = null;
    const incomingContentType = c.req.header('content-type') || '';
    const methodNeedsBody = !['GET', 'HEAD'].includes(c.req.method);

    if (methodNeedsBody && c.req.raw.body) {
      if (incomingContentType.includes('application/json')) {
        try {
          const rawBodyText = await c.req.text();
          parsedRequestBody = JSON.parse(rawBodyText);
          requestBody = rawBodyText;
        } catch (e) {
          console.warn('Failed to parse incoming JSON body, forwarding raw body:', e);
          try {
            requestBody = await c.req.blob();
          } catch (blobError) {
            console.error('Could not retrieve request body after JSON parse failure:', blobError);
            requestBody = null;
          }
        }
      } else {
        requestBody = c.req.raw.body;
      }
    }

    let shouldCreateTrace = !shouldSkipTrace;
    const traceEntry: RequestTrace = {
      timestamp_start: new Date().toISOString(),
      provider,
    };

    if (parsedRequestBody) {
      if (parsedRequestBody.model) {
        traceEntry.model = parsedRequestBody.model;
      } else if (targetPath.includes('/models/') || targetPath.includes('/model/')) {
        // Fall back to the path segment after /models/ or /model/
        const pathParts = targetPath.split('/');
        const modelIndex = pathParts.findIndex(part => part === 'models' || part === 'model');
        if (modelIndex >= 0 && pathParts.length > modelIndex + 1) {
          traceEntry.model = pathParts[modelIndex + 1];
        }
      }

      // Skip traces without a valid model
      if (!traceEntry.model || traceEntry.model === 'unknown') {
        shouldCreateTrace = false;
      }

      if (parsedRequestBody.messages) {
        traceEntry.messages = parsedRequestBody.messages;
        // No tokenizer is available here, so the total character count of the
        // messages stands in (roughly) for the prompt token count.
        let promptText = '';
        for (const message of parsedRequestBody.messages) {
          if (message.content) {
            promptText += message.content;
          }
        }
        traceEntry.prompt_tokens = promptText.length;
      }

      if (parsedRequestBody.arguments) {
        traceEntry.arguments = parsedRequestBody.arguments;
      } else if (parsedRequestBody.parameters) {
        traceEntry.arguments = parsedRequestBody.parameters;
      }
    } else {
      // Skip traces without a request body
      shouldCreateTrace = false;
    }
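    /*
      For a typical OpenAI-style chat request (hypothetical example):

        POST /v1/chat/completions
        { "model": "meta-llama/Llama-3.3-70B-Instruct",
          "messages": [{ "role": "user", "content": "Hello!" }],
          "parameters": { "temperature": 0.7 } }

      "model" and "messages" map straight onto the trace, and "parameters"
      (or "arguments") is stored under traceEntry.arguments.
    */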
    if (shouldCreateTrace) {
      requestTraces.push(traceEntry);
      // Flush immediately if the batch size limit was just reached
      checkAndFlushTraces();
    }

    const response = await fetch(targetUrl, {
      method: c.req.method,
      headers: headers,
      body: requestBody,
    });

    if (!shouldSkipTrace) {
      console.log(`Received response status ${response.status} from ${targetUrl}`);
    }

    c.status(response.status as StatusCode);
    response.headers.forEach((value, key) => {
      if (key.toLowerCase() !== 'content-encoding' && key.toLowerCase() !== 'transfer-encoding') {
        c.header(key, value);
      }
    });
    if (!response.headers.has('content-type')) {
      c.header('content-type', 'application/octet-stream');
    }

    if (response.body) {
      // Tee the body: one branch streams to the client, the other is consumed
      // in the background to build the trace.
      const [streamForClient, streamForStorage] = response.body.tee();
      const contentType = response.headers.get('content-type');
      if (shouldCreateTrace) {
        storeStreamedResponse(streamForStorage, contentType, traceEntry, response.status).catch(err => {
          console.error('Error in background stream storage:', err);
        });
      }
      return stream(c, async (streamInstance) => {
        await streamInstance.pipe(streamForClient);
      });
    } else {
      if (!shouldSkipTrace) {
        console.log(`Received response with no body from ${targetUrl}.`);
      }
      if (shouldCreateTrace) {
        traceEntry.status_code = response.status;
        finalizeTrace(traceEntry);
      }
      return c.body(null);
    }
  } catch (error) {
    console.error('Error during proxy request:', error);
    return c.text('Internal Server Error', 500);
  }
});

// Flush any remaining traces when the process is terminating
function flushAndExit() {
  console.log('Process terminating, flushing remaining traces...');
  writeBatchedTraces()
    .then(() => process.exit())
    .catch(err => {
      console.error('Error flushing traces on shutdown:', err);
      process.exit(1);
    });
}
process.on('SIGINT', flushAndExit);
process.on('SIGTERM', flushAndExit);

console.log(`Inference Proxy running on port ${PORT}`);
console.log(`Forwarding to: ${TARGET_BASE_URL}`);
console.log(`Logs directory: ${resolve(LOGS_DIR)}`);
console.log(`Batching: max ${BATCH_SIZE_LIMIT} traces or ${BATCH_TIME_LIMIT} minutes`);

export default {
  port: PORT,
  fetch: app.fetch,
};
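/*
  Usage sketch (assumptions: proxy on localhost:4040, a token for the
  configured USER_NAME in HF_TOKEN, and an OpenAI-compatible chat route
  on the target):

    const res = await fetch('http://localhost:4040/v1/chat/completions', {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${process.env.HF_TOKEN}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        model: 'meta-llama/Llama-3.3-70B-Instruct',
        messages: [{ role: 'user', content: 'Hello!' }],
      }),
    });

  The request is forwarded to TARGET_BASE_URL and the exchange is traced.
*/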