dylanebert's picture
refactor
bc7e9cd
import type { IncomingMessage } from "http";
import type { WebSocket } from "ws";
import { LangGraphAgent } from "./langgraph-agent";
import { HumanMessage, AIMessage, BaseMessage } from "@langchain/core/messages";
import { consoleBuffer } from "./console-buffer";
import { mcpClientManager } from "./mcp-client";
import { virtualFileSystem } from "../services/virtual-fs";
export interface WebSocketMessage {
type:
| "chat"
| "error"
| "status"
| "stream"
| "stream_start"
| "stream_token"
| "stream_end"
| "auth"
| "editor_update"
| "editor_sync"
| "tool_execution"
| "console_sync"
| "abort"
| "clear_conversation";
payload: {
content?: string;
role?: string;
chunk?: string;
token?: string;
error?: string;
processing?: boolean;
connected?: boolean;
message?: string;
toolName?: string;
toolArgs?: Record<string, unknown>;
toolResult?: string;
id?: string;
type?: string;
messageId?: string;
};
timestamp: number;
}
class WebSocketManager {
private connections: Map<
WebSocket,
{
token?: string;
agent?: LangGraphAgent;
messages?: BaseMessage[];
abortController?: AbortController;
}
> = new Map();
handleConnection(ws: WebSocket, _request: IncomingMessage) {
this.connections.set(ws, {});
this.sendMessage(ws, {
type: "status",
payload: { connected: true, message: "Connected to agent server" },
timestamp: Date.now(),
});
ws.on("message", async (data) => {
try {
const message = JSON.parse(data.toString()) as WebSocketMessage;
await this.handleMessage(ws, message);
} catch (error) {
console.error("Error handling WebSocket message:", error);
this.sendError(ws, "Failed to process message");
}
});
ws.on("close", async () => {
await mcpClientManager.cleanup();
this.connections.delete(ws);
});
ws.on("error", (error) => {
console.error("WebSocket error:", error);
this.connections.delete(ws);
});
}
private async handleMessage(ws: WebSocket, message: WebSocketMessage) {
const connectionData = this.connections.get(ws);
switch (message.type) {
case "auth":
if (message.payload.token && connectionData) {
connectionData.token = message.payload.token;
try {
connectionData.agent = new LangGraphAgent();
await connectionData.agent.initialize(message.payload.token, ws);
connectionData.messages = [];
this.sendMessage(ws, {
type: "status",
payload: {
message: "Authenticated successfully",
connected: true,
},
timestamp: Date.now(),
});
} catch (error) {
console.error("Authentication failed:", error);
this.sendError(
ws,
"Authentication failed. Please sign in with Hugging Face.",
);
}
} else {
this.sendError(ws, "No authentication token provided");
}
break;
case "console_sync":
if (
message.payload.id &&
message.payload.type &&
message.payload.message
) {
consoleBuffer.addMessage({
id: message.payload.id,
type: message.payload.type as "log" | "warn" | "error" | "info",
message: message.payload.message,
timestamp: message.timestamp,
});
}
break;
case "abort":
if (connectionData?.abortController) {
connectionData.abortController.abort();
this.sendMessage(ws, {
type: "status",
payload: { processing: false, message: "Conversation stopped" },
timestamp: Date.now(),
});
}
break;
case "editor_sync":
if (message.payload.content) {
virtualFileSystem.updateGameContent(
message.payload.content as string,
);
}
break;
case "clear_conversation":
if (connectionData) {
connectionData.messages = [];
this.sendMessage(ws, {
type: "status",
payload: { message: "Conversation history cleared" },
timestamp: Date.now(),
});
}
break;
case "chat":
try {
if (!connectionData?.agent) {
throw new Error(
"Authentication required. Please sign in with Hugging Face.",
);
}
const userMessage = message.payload.content;
if (!userMessage) {
throw new Error("No message content provided");
}
connectionData.abortController = new AbortController();
this.sendMessage(ws, {
type: "status",
payload: { processing: true },
timestamp: Date.now(),
});
if (!connectionData.messages) {
connectionData.messages = [];
}
connectionData.messages.push(new HumanMessage(userMessage));
const messageId = `msg_${Date.now()}`;
this.sendMessage(ws, {
type: "stream_start",
payload: { messageId },
timestamp: Date.now(),
});
const response = await connectionData.agent.processMessage(
userMessage,
connectionData.messages.slice(0, -1),
(chunk: string) => {
this.sendMessage(ws, {
type: "stream_token",
payload: {
token: chunk,
messageId,
},
timestamp: Date.now(),
});
},
messageId,
connectionData.abortController.signal,
);
connectionData.messages.push(new AIMessage(response));
this.sendMessage(ws, {
type: "stream_end",
payload: {
messageId,
content: response,
},
timestamp: Date.now(),
});
this.sendMessage(ws, {
type: "status",
payload: { processing: false },
timestamp: Date.now(),
});
} catch (error) {
console.error("Error processing chat message:", error);
if (error instanceof Error && error.name === "AbortError") {
this.sendMessage(ws, {
type: "stream_end",
payload: {
messageId: `msg_${Date.now()}`,
content: "",
},
timestamp: Date.now(),
});
} else {
this.sendError(
ws,
error instanceof Error ? error.message : "Unknown error",
);
}
} finally {
if (connectionData) {
connectionData.abortController = undefined;
}
}
break;
default:
this.sendError(ws, `Unknown message type: ${message.type}`);
}
}
private sendMessage(ws: WebSocket, message: WebSocketMessage) {
if (ws.readyState === ws.OPEN) {
ws.send(JSON.stringify(message));
}
}
private sendError(ws: WebSocket, error: string) {
this.sendMessage(ws, {
type: "error",
payload: { error },
timestamp: Date.now(),
});
}
broadcast(message: WebSocketMessage) {
for (const [ws] of this.connections) {
this.sendMessage(ws, message);
}
}
}
export const wsManager = new WebSocketManager();