diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index 36e5284..f2f5b74 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -1,17 +1,20 @@ -import { Hono } from 'hono' -import { logger } from 'hono/logger' -import { contextStorage, getContext } from 'hono/context-storage' -import { prettyJSON } from 'hono/pretty-json' -import { cors } from 'hono/cors' +import { Hono } from 'hono'; +import { logger } from 'hono/logger'; +import { contextStorage, getContext } from 'hono/context-storage'; +import { prettyJSON } from 'hono/pretty-json'; +import { cors } from 'hono/cors'; -import { trpcServer } from '@hono/trpc-server' +import { trpcServer } from '@hono/trpc-server'; +import Redis from 'ioredis'; +import redis from './redis'; +import minioClient from './minio'; +import { Client } from 'minio'; +import oidc from './oidc/provider'; +import { appRouter } from './trpc'; + +import { createBunWebSocket } from 'hono/bun'; +import { wsHandler, wsConfig } from './socket'; -import Redis from 'ioredis' -import redis from './redis' -import minioClient from './minio' -import { Client } from 'minio' -import oidc from './oidc/provider' -import { appRouter } from './trpc' type Env = { Variables: { redis: Redis; @@ -21,10 +24,13 @@ type Env = { const app = new Hono(); -app.use('*', cors({ - origin: 'http://localhost:3001', - credentials: true, -})) +app.use( + '*', + cors({ + origin: 'http://localhost:3001', + credentials: true, + }), +); app.use('*', async (c, next) => { c.set('redis', redis); @@ -51,4 +57,14 @@ app.use('/oidc/*', async (c, next) => { // return void 也可以 return; }); + +// 添加 WebSocket 路由 +app.get('/ws', wsHandler); +const bunServerConfig = { + port: 3000, + fetch: app.fetch, + ...wsConfig, +}; +// 启动 Bun 服务器 +Bun.serve(bunServerConfig); export default app; diff --git a/apps/backend/src/socket.ts b/apps/backend/src/socket.ts new file mode 100644 index 0000000..2ff3db4 --- /dev/null +++ b/apps/backend/src/socket.ts @@ -0,0 +1,122 @@ +import { createBunWebSocket } from 'hono/bun'; +import type { ServerWebSocket } from 'bun'; +import { WSContext } from 'hono/ws'; +import { MessageType } from '../../../packages/client/src/websocket/type'; + +// 定义消息类型接口 +interface WSMessage { + type: 'join' | 'leave' | 'message'; + roomId: string; + data: any; +} + +// 创建 WebSocket 实例并指定泛型类型 +const { upgradeWebSocket, websocket } = createBunWebSocket(); + +// 存储房间和连接的映射关系 +const rooms = new Map>>(); + +// WebSocket 处理器 +const wsHandler = upgradeWebSocket((c) => { + // 存储当前连接加入的房间 + const joinedRooms = new Set(); + + return { + onOpen(_, ws) { + console.log('WebSocket 连接已建立'); + }, + onMessage(message, ws) { + try { + const parsedMessage: WSMessage = JSON.parse(message.data); + console.log('收到消息:', parsedMessage); + + switch (parsedMessage.type) { + case 'join': + // 加入房间 + if (!rooms.has(parsedMessage.roomId)) { + rooms.set(parsedMessage.roomId, new Set()); + } + rooms.get(parsedMessage.roomId)?.add(ws); + joinedRooms.add(parsedMessage.roomId); + + // 发送加入成功消息 + ws.send( + JSON.stringify({ + type: 'system', + data: { + text: `成功加入房间 ${parsedMessage.roomId}`, + type: MessageType.TEXT, + }, + roomId: parsedMessage.roomId, + }), + ); + break; + + case 'leave': + // 离开房间 + rooms.get(parsedMessage.roomId)?.delete(ws); + joinedRooms.delete(parsedMessage.roomId); + if (rooms.get(parsedMessage.roomId)?.size === 0) { + rooms.delete(parsedMessage.roomId); + } + break; + + case 'message': + // 在指定房间内广播消息 + const room = rooms.get(parsedMessage.roomId); + if (room) { + const messageToSend = { + type: 'message', + data: parsedMessage.data, + roomId: parsedMessage.roomId, + }; + + for (const conn of room) { + if (conn.readyState === WebSocket.OPEN) { + conn.send(JSON.stringify(messageToSend)); + } + } + } + break; + } + } catch (error) { + console.error('处理消息时出错:', error); + ws.send( + JSON.stringify({ + type: 'error', + data: { + text: '消息处理失败', + type: MessageType.TEXT, + }, + }), + ); + } + }, + onClose(_, ws) { + console.log('WebSocket 连接已关闭'); + // 清理所有加入的房间 + for (const roomId of joinedRooms) { + rooms.get(roomId)?.delete(ws); + if (rooms.get(roomId)?.size === 0) { + rooms.delete(roomId); + } + } + }, + onError(error, ws) { + console.error('WebSocket 错误:', error); + // 清理所有加入的房间 + for (const roomId of joinedRooms) { + rooms.get(roomId)?.delete(ws); + if (rooms.get(roomId)?.size === 0) { + rooms.delete(roomId); + } + } + }, + }; +}); + +export const wsConfig = { + websocket, +}; + +export { wsHandler, rooms }; diff --git a/apps/web/.env.example b/apps/web/.env.example index 8183f56..72f3759 100644 --- a/apps/web/.env.example +++ b/apps/web/.env.example @@ -1 +1 @@ -DATABASE_URL="postgresql://root:Letusdoit000@localhost:5432/app?schema=public" \ No newline at end of file +DATABASE_URL="postgresql://nice:Letusdoit123@localhost:5432/db?schema=public" \ No newline at end of file diff --git a/apps/web/app/page.tsx b/apps/web/app/page.tsx index 60c4991..e34e682 100644 --- a/apps/web/app/page.tsx +++ b/apps/web/app/page.tsx @@ -1,13 +1,161 @@ 'use client'; -import { useTRPC } from '@repo/client'; +import { useHello, useTRPC, useWebSocket, MessageType } from '@repo/client'; import { useQuery } from '@tanstack/react-query'; -import { useEffect } from 'react'; +import { useRef, useState } from 'react'; export default function Home() { const trpc = useTRPC(); const { data, isLoading } = useQuery(trpc.user.getUser.queryOptions()); - useEffect(() => { - console.log(data); - }, [data]); - return
123
; + const [message, setMessage] = useState(''); + const [roomId, setRoomId] = useState(''); + const messagesEndRef = useRef(null); + + // 使用 WebSocket hook + const { messages, currentRoom, connecting, joinRoom, leaveRoom, sendMessage } = useWebSocket(); + + // 滚动到底部 + const scrollToBottom = () => { + messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); + setTimeout(scrollToBottom, 100); + }; + + const handleJoinRoom = async () => { + const success = await joinRoom(roomId.trim()); + if (success) { + setRoomId(''); + } + }; + + const handleLeaveRoom = async () => { + await leaveRoom(); + }; + + const handleSendMessage = async () => { + const success = await sendMessage({ + text: message, + type: MessageType.TEXT, + }); + if (success) { + setMessage(''); + } + }; + + // 处理按回车发送消息 + const handleKeyPress = (e: React.KeyboardEvent) => { + if (e.key === 'Enter' && !e.shiftKey) { + e.preventDefault(); + handleSendMessage(); + } + }; + + return ( +
+

WebSocket 房间测试

+ + {/* 房间管理 */} +
+

房间管理

+ {!currentRoom ? ( +
+ setRoomId(e.target.value)} + onKeyPress={handleKeyPress} + disabled={connecting} + className="border border-gray-300 rounded px-3 py-2 flex-1" + placeholder="输入房间ID..." + /> + +
+ ) : ( +
+ 当前房间: {currentRoom} + +
+ )} +
+ + {/* 消息显示区域 */} + {currentRoom && ( +
+
+ {messages.map((msg, index) => ( +
+
+ {msg.data.type === MessageType.IMAGE && msg.data.fileUri && ( + 图片消息 + )} + {msg.data.type === MessageType.FILE && msg.data.fileUri && ( + + 下载文件 + + )} + {msg.data.text &&
{msg.data.text}
} +
+
+ ))} +
+
+
+ setMessage(e.target.value)} + onKeyPress={handleKeyPress} + disabled={connecting} + className="border border-gray-300 rounded px-3 py-2 flex-1" + placeholder="输入消息..." + /> + +
+
+ )} + + {!currentRoom && ( +
+

提示: 请先加入一个房间开始聊天

+

打开多个浏览器窗口,输入相同的房间ID来测试房间内通信

+
+ )} +
+ ); } diff --git a/dockers/database-postgres.yml b/dockers/database-postgres.yml index 91c23e9..27d324c 100644 --- a/dockers/database-postgres.yml +++ b/dockers/database-postgres.yml @@ -3,21 +3,18 @@ services: image: postgres:${POSTGRES_VERSION:-16} container_name: postgres ports: - - "5432:5432" + - '5432:5432' environment: - POSTGRES_DB=${POSTGRES_DB:-nice_db} - POSTGRES_USER=${POSTGRES_USER:-nice_user} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-nice_password} volumes: - - type: volume - source: postgres_data - target: /var/lib/postgresql/data + - ./volumes/postgres:/var/lib/postgresql/data + # - type: volume + # source: postgres_data + # target: /var/lib/postgresql/data healthcheck: - test: - [ - "CMD-SHELL", - "sh -c 'pg_isready -U ${POSTGRES_USER:-nice_user} -d ${POSTGRES_DB:-nice_db}'", - ] + test: ['CMD-SHELL', "sh -c 'pg_isready -U ${POSTGRES_USER:-nice_user} -d ${POSTGRES_DB:-nice_db}'"] interval: 10s timeout: 3s retries: 3 diff --git a/packages/client/src/api/hooks/useHello.ts b/packages/client/src/api/hooks/useHello.ts index 5f06611..a3bd83d 100644 --- a/packages/client/src/api/hooks/useHello.ts +++ b/packages/client/src/api/hooks/useHello.ts @@ -3,10 +3,12 @@ import { useTRPC } from '../trpc'; export function useHello() { const trpc = useTRPC(); - trpc.user.getUser.queryOptions() - + // trpc.user.getUser.queryOptions(); + const getUser = useQuery(trpc.user.getUser.queryOptions()); + console.log(getUser); // useQuery; return { + getUser: getUser, // hello: api.hello, }; } diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index b1c13e7..8bac1ff 100755 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -1 +1,3 @@ export * from './api'; +export * from './websocket'; +export { MessageType } from './websocket/type'; diff --git a/packages/client/src/websocket/client.ts b/packages/client/src/websocket/client.ts new file mode 100644 index 0000000..c1427d3 --- /dev/null +++ b/packages/client/src/websocket/client.ts @@ -0,0 +1,192 @@ +// WebSocket 消息接口 + +import { WSMessage, WSMessageParams, MessageType } from './type'; + +export class WebSocketClient { + private ws: WebSocket | null = null; + private url: string; + private currentRoom: string | null = null; + private messageHandler: ((message: WSMessage) => void) | null = null; + private connectionPromise: Promise | null = null; + + constructor(url: string = 'ws://localhost:3000/ws') { + this.url = url; + } + + private async ensureConnected(): Promise { + if (this.ws?.readyState === WebSocket.OPEN) { + return; + } + + if (this.connectionPromise) { + return this.connectionPromise; + } + + this.connectionPromise = new Promise((resolve, reject) => { + this.disconnect(); // 确保先断开旧连接 + + this.ws = new WebSocket(this.url); + + this.ws.onopen = () => { + console.log('WebSocket 连接已建立'); + this.connectionPromise = null; + resolve(); + }; + + this.ws.onmessage = (event) => { + try { + const message = JSON.parse(event.data) as WSMessage; + + // 只处理系统消息、错误消息,或者当前房间的消息 + if ( + message.type === 'system' || + message.type === 'error' || + (message.roomId && message.roomId === this.currentRoom) + ) { + console.log('收到消息:', message); + // 触发消息处理器 + if (this.messageHandler) { + this.messageHandler(message); + } + } + } catch (error) { + console.error('解析消息失败:', error); + } + }; + + this.ws.onclose = () => { + console.log('WebSocket 连接已关闭'); + this.cleanup(); + reject(new Error('WebSocket connection closed')); + }; + + this.ws.onerror = (error) => { + console.error('WebSocket 错误:', error); + this.cleanup(); + reject(error); + }; + + // 5秒超时 + setTimeout(() => { + if (this.ws?.readyState !== WebSocket.OPEN) { + this.cleanup(); + reject(new Error('WebSocket connection timeout')); + } + }, 5000); + }); + + return this.connectionPromise; + } + + async connect() { + return this.ensureConnected(); + } + + private cleanup() { + if (this.ws) { + this.ws.onclose = null; + this.ws.onerror = null; + this.ws.onmessage = null; + this.ws.onopen = null; + this.ws = null; + } + this.currentRoom = null; + this.messageHandler = null; + this.connectionPromise = null; + } + + // 设置消息处理器 + onMessage(handler: (message: WSMessage) => void) { + this.messageHandler = handler; + return () => { + if (this.messageHandler === handler) { + this.messageHandler = null; + } + }; + } + + async joinRoom(roomId: string) { + try { + await this.ensureConnected(); + + // 如果已经在一个房间中,先离开 + if (this.currentRoom) { + await this.leaveRoom(); + } + + const message: WSMessage = { + type: 'join', + roomId, + data: { + text: `加入房间 ${roomId}`, + type: MessageType.TEXT, + }, + }; + this.ws!.send(JSON.stringify(message)); + this.currentRoom = roomId; + } catch (error) { + console.error('加入房间失败:', error); + throw error; + } + } + + async leaveRoom() { + if (this.ws?.readyState === WebSocket.OPEN && this.currentRoom) { + const message: WSMessage = { + type: 'leave', + roomId: this.currentRoom, + data: { + text: `离开房间 ${this.currentRoom}`, + type: MessageType.TEXT, + }, + }; + this.ws.send(JSON.stringify(message)); + this.currentRoom = null; + } + } + + async sendMessage(messageParams: WSMessageParams) { + try { + await this.ensureConnected(); + + if (!this.currentRoom) { + throw new Error('请先加入房间'); + } + + const messageObj: WSMessage = { + type: 'message', + roomId: this.currentRoom, + data: { + text: messageParams.text || '', + type: messageParams.type || MessageType.TEXT, + fileUri: messageParams.fileUri, + }, + }; + this.ws!.send(JSON.stringify(messageObj)); + } catch (error) { + console.error('发送消息失败:', error); + throw error; + } + } + + disconnect() { + if (this.currentRoom) { + this.leaveRoom(); + } + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + this.ws.close(); + } + this.cleanup(); + } + + getCurrentRoom() { + return this.currentRoom; + } + + isConnected() { + return this.ws?.readyState === WebSocket.OPEN; + } +} + +// 导出一个默认的 WebSocket 客户端实例 +export const wsClient = new WebSocketClient(); diff --git a/packages/client/src/websocket/index.ts b/packages/client/src/websocket/index.ts new file mode 100644 index 0000000..9fe5e08 --- /dev/null +++ b/packages/client/src/websocket/index.ts @@ -0,0 +1,3 @@ +export * from './client'; +export * from './useWebSocket'; +export * from './type'; diff --git a/packages/client/src/websocket/type.ts b/packages/client/src/websocket/type.ts new file mode 100644 index 0000000..a8e29b0 --- /dev/null +++ b/packages/client/src/websocket/type.ts @@ -0,0 +1,18 @@ +export enum MessageType { + TEXT = 'text', + IMAGE = 'image', + FILE = 'file', + RETRACT = 'retract', // 用于实现撤回 +} + +export interface WSMessageParams { + text?: string; + type?: MessageType; + fileUri?: string; +} + +export interface WSMessage { + type: 'join' | 'leave' | 'message' | 'system' | 'error'; + roomId?: string; + data: WSMessageParams; +} diff --git a/packages/client/src/websocket/useWebSocket.ts b/packages/client/src/websocket/useWebSocket.ts new file mode 100644 index 0000000..2b8ad2c --- /dev/null +++ b/packages/client/src/websocket/useWebSocket.ts @@ -0,0 +1,189 @@ +import { useCallback, useEffect, useState } from 'react'; +import { wsClient } from './client'; +import { WSMessage, WSMessageParams, MessageType } from './type'; + +interface UseWebSocketReturn { + // 状态 + messages: WSMessage[]; + currentRoom: string | null; + connecting: boolean; + + // 方法 + joinRoom: (roomId: string) => Promise; + leaveRoom: () => Promise; + sendMessage: (messageParams: WSMessageParams) => Promise; + + // 工具方法 + clearMessages: () => void; +} + +export const useWebSocket = (): UseWebSocketReturn => { + const [messages, setMessages] = useState([]); + const [currentRoom, setCurrentRoom] = useState(null); + const [connecting, setConnecting] = useState(false); + + // 消息处理器 + const messageHandler = useCallback((message: WSMessage) => { + setMessages((prev) => [...prev, message]); + }, []); + + // 初始化 WebSocket 连接 + useEffect(() => { + const initConnection = async () => { + try { + setConnecting(true); + await wsClient.connect(); + const unsubscribe = wsClient.onMessage(messageHandler); + return unsubscribe; + } catch (error) { + console.error('连接失败:', error); + setMessages((prev) => [ + ...prev, + { + type: 'error', + data: { + text: '连接失败,请刷新页面重试', + type: MessageType.TEXT, + }, + }, + ]); + } finally { + setConnecting(false); + } + }; + + const unsubscribePromise = initConnection(); + + return () => { + unsubscribePromise.then((unsubscribe) => unsubscribe?.()); + wsClient.disconnect(); + }; + }, [messageHandler]); + + // 加入房间 + const joinRoom = async (roomId: string): Promise => { + // 验证房间ID + if (!roomId.trim()) { + setMessages((prev) => [ + ...prev, + { + type: 'error', + data: { + text: '请输入有效的房间ID', + type: MessageType.TEXT, + }, + }, + ]); + return false; + } + + // 检查是否正在连接 + if (connecting) { + return false; + } + + try { + setConnecting(true); + await wsClient.joinRoom(roomId.trim()); + setCurrentRoom(roomId.trim()); + setMessages([]); // 清空消息历史 + return true; + } catch (error) { + console.error('加入房间失败:', error); + setMessages((prev) => [ + ...prev, + { + type: 'error', + data: { + text: '加入房间失败,请重试', + type: MessageType.TEXT, + }, + }, + ]); + return false; + } finally { + setConnecting(false); + } + }; + + // 离开房间 + const leaveRoom = async (): Promise => { + if (connecting) { + return false; + } + + try { + await wsClient.disconnect(); + setCurrentRoom(null); + setMessages([]); // 清空消息历史 + return true; + } catch (error) { + console.error('离开房间失败:', error); + return false; + } + }; + + // 发送消息 + const sendMessage = async (messageParams: WSMessageParams): Promise => { + // 验证消息内容 + if (!messageParams.text?.trim() && !messageParams.fileUri) { + return false; + } + + // 检查房间状态 + if (!currentRoom) { + setMessages((prev) => [ + ...prev, + { + type: 'error', + data: { + text: '请先加入房间', + type: MessageType.TEXT, + }, + }, + ]); + return false; + } + + // 检查连接状态 + if (connecting) { + return false; + } + + try { + await wsClient.sendMessage(messageParams); + return true; + } catch (error) { + console.error('发送消息失败:', error); + setMessages((prev) => [ + ...prev, + { + type: 'error', + data: { + text: '发送消息失败,请重试', + type: MessageType.TEXT, + }, + }, + ]); + return false; + } + }; + + // 清空消息 + const clearMessages = useCallback(() => { + setMessages([]); + }, []); + + return { + // 状态 + messages, + currentRoom, + connecting, + // 方法 + joinRoom, + leaveRoom, + sendMessage, + // 工具方法 + clearMessages, + }; +}; diff --git a/packages/db/.env.example b/packages/db/.env.example index 8183f56..72f3759 100755 --- a/packages/db/.env.example +++ b/packages/db/.env.example @@ -1 +1 @@ -DATABASE_URL="postgresql://root:Letusdoit000@localhost:5432/app?schema=public" \ No newline at end of file +DATABASE_URL="postgresql://nice:Letusdoit123@localhost:5432/db?schema=public" \ No newline at end of file