This commit is contained in:
ditiqi 2025-05-27 15:37:27 +08:00
parent a0b7f22a25
commit d4554cee03
12 changed files with 724 additions and 35 deletions

View File

@ -1,17 +1,20 @@
import { Hono } from 'hono' import { Hono } from 'hono';
import { logger } from 'hono/logger' import { logger } from 'hono/logger';
import { contextStorage, getContext } from 'hono/context-storage' import { contextStorage, getContext } from 'hono/context-storage';
import { prettyJSON } from 'hono/pretty-json' import { prettyJSON } from 'hono/pretty-json';
import { cors } from 'hono/cors' import { cors } from 'hono/cors';
import { trpcServer } from '@hono/trpc-server' import { trpcServer } from '@hono/trpc-server';
import Redis from 'ioredis';
import redis from './redis';
import minioClient from './minio';
import { Client } from 'minio';
import oidc from './oidc/provider';
import { appRouter } from './trpc';
import { createBunWebSocket } from 'hono/bun';
import { wsHandler, wsConfig } from './socket';
import Redis from 'ioredis'
import redis from './redis'
import minioClient from './minio'
import { Client } from 'minio'
import oidc from './oidc/provider'
import { appRouter } from './trpc'
type Env = { type Env = {
Variables: { Variables: {
redis: Redis; redis: Redis;
@ -21,10 +24,13 @@ type Env = {
const app = new Hono<Env>(); const app = new Hono<Env>();
app.use('*', cors({ app.use(
origin: 'http://localhost:3001', '*',
credentials: true, cors({
})) origin: 'http://localhost:3001',
credentials: true,
}),
);
app.use('*', async (c, next) => { app.use('*', async (c, next) => {
c.set('redis', redis); c.set('redis', redis);
@ -51,4 +57,14 @@ app.use('/oidc/*', async (c, next) => {
// return void 也可以 // return void 也可以
return; return;
}); });
// 添加 WebSocket 路由
app.get('/ws', wsHandler);
const bunServerConfig = {
port: 3000,
fetch: app.fetch,
...wsConfig,
};
// 启动 Bun 服务器
Bun.serve(bunServerConfig);
export default app; export default app;

122
apps/backend/src/socket.ts Normal file
View File

@ -0,0 +1,122 @@
import { createBunWebSocket } from 'hono/bun';
import type { ServerWebSocket } from 'bun';
import { WSContext } from 'hono/ws';
import { MessageType } from '../../../packages/client/src/websocket/type';
// 定义消息类型接口
interface WSMessage {
type: 'join' | 'leave' | 'message';
roomId: string;
data: any;
}
// 创建 WebSocket 实例并指定泛型类型
const { upgradeWebSocket, websocket } = createBunWebSocket<ServerWebSocket>();
// 存储房间和连接的映射关系
const rooms = new Map<string, Set<WSContext<ServerWebSocket>>>();
// WebSocket 处理器
const wsHandler = upgradeWebSocket((c) => {
// 存储当前连接加入的房间
const joinedRooms = new Set<string>();
return {
onOpen(_, ws) {
console.log('WebSocket 连接已建立');
},
onMessage(message, ws) {
try {
const parsedMessage: WSMessage = JSON.parse(message.data);
console.log('收到消息:', parsedMessage);
switch (parsedMessage.type) {
case 'join':
// 加入房间
if (!rooms.has(parsedMessage.roomId)) {
rooms.set(parsedMessage.roomId, new Set());
}
rooms.get(parsedMessage.roomId)?.add(ws);
joinedRooms.add(parsedMessage.roomId);
// 发送加入成功消息
ws.send(
JSON.stringify({
type: 'system',
data: {
text: `成功加入房间 ${parsedMessage.roomId}`,
type: MessageType.TEXT,
},
roomId: parsedMessage.roomId,
}),
);
break;
case 'leave':
// 离开房间
rooms.get(parsedMessage.roomId)?.delete(ws);
joinedRooms.delete(parsedMessage.roomId);
if (rooms.get(parsedMessage.roomId)?.size === 0) {
rooms.delete(parsedMessage.roomId);
}
break;
case 'message':
// 在指定房间内广播消息
const room = rooms.get(parsedMessage.roomId);
if (room) {
const messageToSend = {
type: 'message',
data: parsedMessage.data,
roomId: parsedMessage.roomId,
};
for (const conn of room) {
if (conn.readyState === WebSocket.OPEN) {
conn.send(JSON.stringify(messageToSend));
}
}
}
break;
}
} catch (error) {
console.error('处理消息时出错:', error);
ws.send(
JSON.stringify({
type: 'error',
data: {
text: '消息处理失败',
type: MessageType.TEXT,
},
}),
);
}
},
onClose(_, ws) {
console.log('WebSocket 连接已关闭');
// 清理所有加入的房间
for (const roomId of joinedRooms) {
rooms.get(roomId)?.delete(ws);
if (rooms.get(roomId)?.size === 0) {
rooms.delete(roomId);
}
}
},
onError(error, ws) {
console.error('WebSocket 错误:', error);
// 清理所有加入的房间
for (const roomId of joinedRooms) {
rooms.get(roomId)?.delete(ws);
if (rooms.get(roomId)?.size === 0) {
rooms.delete(roomId);
}
}
},
};
});
export const wsConfig = {
websocket,
};
export { wsHandler, rooms };

View File

@ -1 +1 @@
DATABASE_URL="postgresql://root:Letusdoit000@localhost:5432/app?schema=public" DATABASE_URL="postgresql://nice:Letusdoit123@localhost:5432/db?schema=public"

View File

@ -1,13 +1,161 @@
'use client'; 'use client';
import { useTRPC } from '@repo/client'; import { useHello, useTRPC, useWebSocket, MessageType } from '@repo/client';
import { useQuery } from '@tanstack/react-query'; import { useQuery } from '@tanstack/react-query';
import { useEffect } from 'react'; import { useRef, useState } from 'react';
export default function Home() { export default function Home() {
const trpc = useTRPC(); const trpc = useTRPC();
const { data, isLoading } = useQuery(trpc.user.getUser.queryOptions()); const { data, isLoading } = useQuery(trpc.user.getUser.queryOptions());
useEffect(() => { const [message, setMessage] = useState('');
console.log(data); const [roomId, setRoomId] = useState('');
}, [data]); const messagesEndRef = useRef<HTMLDivElement>(null);
return <div>123</div>;
// 使用 WebSocket hook
const { messages, currentRoom, connecting, joinRoom, leaveRoom, sendMessage } = useWebSocket();
// 滚动到底部
const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' });
setTimeout(scrollToBottom, 100);
};
const handleJoinRoom = async () => {
const success = await joinRoom(roomId.trim());
if (success) {
setRoomId('');
}
};
const handleLeaveRoom = async () => {
await leaveRoom();
};
const handleSendMessage = async () => {
const success = await sendMessage({
text: message,
type: MessageType.TEXT,
});
if (success) {
setMessage('');
}
};
// 处理按回车发送消息
const handleKeyPress = (e: React.KeyboardEvent) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
handleSendMessage();
}
};
return (
<div className="p-4">
<h1 className="text-2xl font-bold mb-4">WebSocket </h1>
{/* 房间管理 */}
<div className="mb-6 p-4 border rounded">
<h2 className="text-xl mb-2"></h2>
{!currentRoom ? (
<div className="flex gap-2">
<input
type="text"
value={roomId}
onChange={(e) => setRoomId(e.target.value)}
onKeyPress={handleKeyPress}
disabled={connecting}
className="border border-gray-300 rounded px-3 py-2 flex-1"
placeholder="输入房间ID..."
/>
<button
onClick={handleJoinRoom}
disabled={connecting}
className={`px-4 py-2 rounded text-white ${
connecting ? 'bg-gray-400 cursor-not-allowed' : 'bg-green-500 hover:bg-green-600'
}`}
>
{connecting ? '连接中...' : '加入房间'}
</button>
</div>
) : (
<div className="flex gap-2 items-center">
<span>: {currentRoom}</span>
<button
onClick={handleLeaveRoom}
disabled={connecting}
className={`px-4 py-2 rounded text-white ${
connecting ? 'bg-gray-400 cursor-not-allowed' : 'bg-red-500 hover:bg-red-600'
}`}
>
</button>
</div>
)}
</div>
{/* 消息显示区域 */}
{currentRoom && (
<div className="mb-4">
<div className="border rounded p-4 h-[400px] overflow-y-auto mb-4 bg-gray-50">
{messages.map((msg, index) => (
<div
key={index}
className={`mb-2 p-2 rounded ${
msg.type === 'system'
? 'bg-gray-200 text-gray-700'
: msg.type === 'error'
? 'bg-red-100 text-red-700'
: 'bg-blue-100 text-blue-700'
}`}
>
<div className="flex items-center gap-2">
{msg.data.type === MessageType.IMAGE && msg.data.fileUri && (
<img src={msg.data.fileUri} alt="图片消息" className="max-w-xs" />
)}
{msg.data.type === MessageType.FILE && msg.data.fileUri && (
<a
href={msg.data.fileUri}
target="_blank"
rel="noopener noreferrer"
className="text-blue-600 hover:underline"
>
</a>
)}
{msg.data.text && <div>{msg.data.text}</div>}
</div>
</div>
))}
<div ref={messagesEndRef} />
</div>
<div className="flex gap-2">
<input
type="text"
value={message}
onChange={(e) => setMessage(e.target.value)}
onKeyPress={handleKeyPress}
disabled={connecting}
className="border border-gray-300 rounded px-3 py-2 flex-1"
placeholder="输入消息..."
/>
<button
onClick={handleSendMessage}
disabled={connecting}
className={`px-4 py-2 rounded text-white ${
connecting ? 'bg-gray-400 cursor-not-allowed' : 'bg-blue-500 hover:bg-blue-600'
}`}
>
</button>
</div>
</div>
)}
{!currentRoom && (
<div>
<p className="text-gray-600">提示: 请先加入一个房间开始聊天</p>
<p className="text-gray-600">ID来测试房间内通信</p>
</div>
)}
</div>
);
} }

View File

@ -3,21 +3,18 @@ services:
image: postgres:${POSTGRES_VERSION:-16} image: postgres:${POSTGRES_VERSION:-16}
container_name: postgres container_name: postgres
ports: ports:
- "5432:5432" - '5432:5432'
environment: environment:
- POSTGRES_DB=${POSTGRES_DB:-nice_db} - POSTGRES_DB=${POSTGRES_DB:-nice_db}
- POSTGRES_USER=${POSTGRES_USER:-nice_user} - POSTGRES_USER=${POSTGRES_USER:-nice_user}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-nice_password} - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-nice_password}
volumes: volumes:
- type: volume - ./volumes/postgres:/var/lib/postgresql/data
source: postgres_data # - type: volume
target: /var/lib/postgresql/data # source: postgres_data
# target: /var/lib/postgresql/data
healthcheck: healthcheck:
test: test: ['CMD-SHELL', "sh -c 'pg_isready -U ${POSTGRES_USER:-nice_user} -d ${POSTGRES_DB:-nice_db}'"]
[
"CMD-SHELL",
"sh -c 'pg_isready -U ${POSTGRES_USER:-nice_user} -d ${POSTGRES_DB:-nice_db}'",
]
interval: 10s interval: 10s
timeout: 3s timeout: 3s
retries: 3 retries: 3

View File

@ -3,10 +3,12 @@ import { useTRPC } from '../trpc';
export function useHello() { export function useHello() {
const trpc = useTRPC(); const trpc = useTRPC();
trpc.user.getUser.queryOptions() // trpc.user.getUser.queryOptions();
const getUser = useQuery(trpc.user.getUser.queryOptions());
console.log(getUser);
// useQuery; // useQuery;
return { return {
getUser: getUser,
// hello: api.hello, // hello: api.hello,
}; };
} }

View File

@ -1 +1,3 @@
export * from './api'; export * from './api';
export * from './websocket';
export { MessageType } from './websocket/type';

View File

@ -0,0 +1,192 @@
// WebSocket 消息接口
import { WSMessage, WSMessageParams, MessageType } from './type';
export class WebSocketClient {
private ws: WebSocket | null = null;
private url: string;
private currentRoom: string | null = null;
private messageHandler: ((message: WSMessage) => void) | null = null;
private connectionPromise: Promise<void> | null = null;
constructor(url: string = 'ws://localhost:3000/ws') {
this.url = url;
}
private async ensureConnected(): Promise<void> {
if (this.ws?.readyState === WebSocket.OPEN) {
return;
}
if (this.connectionPromise) {
return this.connectionPromise;
}
this.connectionPromise = new Promise((resolve, reject) => {
this.disconnect(); // 确保先断开旧连接
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket 连接已建立');
this.connectionPromise = null;
resolve();
};
this.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data) as WSMessage;
// 只处理系统消息、错误消息,或者当前房间的消息
if (
message.type === 'system' ||
message.type === 'error' ||
(message.roomId && message.roomId === this.currentRoom)
) {
console.log('收到消息:', message);
// 触发消息处理器
if (this.messageHandler) {
this.messageHandler(message);
}
}
} catch (error) {
console.error('解析消息失败:', error);
}
};
this.ws.onclose = () => {
console.log('WebSocket 连接已关闭');
this.cleanup();
reject(new Error('WebSocket connection closed'));
};
this.ws.onerror = (error) => {
console.error('WebSocket 错误:', error);
this.cleanup();
reject(error);
};
// 5秒超时
setTimeout(() => {
if (this.ws?.readyState !== WebSocket.OPEN) {
this.cleanup();
reject(new Error('WebSocket connection timeout'));
}
}, 5000);
});
return this.connectionPromise;
}
async connect() {
return this.ensureConnected();
}
private cleanup() {
if (this.ws) {
this.ws.onclose = null;
this.ws.onerror = null;
this.ws.onmessage = null;
this.ws.onopen = null;
this.ws = null;
}
this.currentRoom = null;
this.messageHandler = null;
this.connectionPromise = null;
}
// 设置消息处理器
onMessage(handler: (message: WSMessage) => void) {
this.messageHandler = handler;
return () => {
if (this.messageHandler === handler) {
this.messageHandler = null;
}
};
}
async joinRoom(roomId: string) {
try {
await this.ensureConnected();
// 如果已经在一个房间中,先离开
if (this.currentRoom) {
await this.leaveRoom();
}
const message: WSMessage = {
type: 'join',
roomId,
data: {
text: `加入房间 ${roomId}`,
type: MessageType.TEXT,
},
};
this.ws!.send(JSON.stringify(message));
this.currentRoom = roomId;
} catch (error) {
console.error('加入房间失败:', error);
throw error;
}
}
async leaveRoom() {
if (this.ws?.readyState === WebSocket.OPEN && this.currentRoom) {
const message: WSMessage = {
type: 'leave',
roomId: this.currentRoom,
data: {
text: `离开房间 ${this.currentRoom}`,
type: MessageType.TEXT,
},
};
this.ws.send(JSON.stringify(message));
this.currentRoom = null;
}
}
async sendMessage(messageParams: WSMessageParams) {
try {
await this.ensureConnected();
if (!this.currentRoom) {
throw new Error('请先加入房间');
}
const messageObj: WSMessage = {
type: 'message',
roomId: this.currentRoom,
data: {
text: messageParams.text || '',
type: messageParams.type || MessageType.TEXT,
fileUri: messageParams.fileUri,
},
};
this.ws!.send(JSON.stringify(messageObj));
} catch (error) {
console.error('发送消息失败:', error);
throw error;
}
}
disconnect() {
if (this.currentRoom) {
this.leaveRoom();
}
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.close();
}
this.cleanup();
}
getCurrentRoom() {
return this.currentRoom;
}
isConnected() {
return this.ws?.readyState === WebSocket.OPEN;
}
}
// 导出一个默认的 WebSocket 客户端实例
export const wsClient = new WebSocketClient();

View File

@ -0,0 +1,3 @@
export * from './client';
export * from './useWebSocket';
export * from './type';

View File

@ -0,0 +1,18 @@
export enum MessageType {
TEXT = 'text',
IMAGE = 'image',
FILE = 'file',
RETRACT = 'retract', // 用于实现撤回
}
export interface WSMessageParams {
text?: string;
type?: MessageType;
fileUri?: string;
}
export interface WSMessage {
type: 'join' | 'leave' | 'message' | 'system' | 'error';
roomId?: string;
data: WSMessageParams;
}

View File

@ -0,0 +1,189 @@
import { useCallback, useEffect, useState } from 'react';
import { wsClient } from './client';
import { WSMessage, WSMessageParams, MessageType } from './type';
interface UseWebSocketReturn {
// 状态
messages: WSMessage[];
currentRoom: string | null;
connecting: boolean;
// 方法
joinRoom: (roomId: string) => Promise<boolean>;
leaveRoom: () => Promise<boolean>;
sendMessage: (messageParams: WSMessageParams) => Promise<boolean>;
// 工具方法
clearMessages: () => void;
}
export const useWebSocket = (): UseWebSocketReturn => {
const [messages, setMessages] = useState<WSMessage[]>([]);
const [currentRoom, setCurrentRoom] = useState<string | null>(null);
const [connecting, setConnecting] = useState(false);
// 消息处理器
const messageHandler = useCallback((message: WSMessage) => {
setMessages((prev) => [...prev, message]);
}, []);
// 初始化 WebSocket 连接
useEffect(() => {
const initConnection = async () => {
try {
setConnecting(true);
await wsClient.connect();
const unsubscribe = wsClient.onMessage(messageHandler);
return unsubscribe;
} catch (error) {
console.error('连接失败:', error);
setMessages((prev) => [
...prev,
{
type: 'error',
data: {
text: '连接失败,请刷新页面重试',
type: MessageType.TEXT,
},
},
]);
} finally {
setConnecting(false);
}
};
const unsubscribePromise = initConnection();
return () => {
unsubscribePromise.then((unsubscribe) => unsubscribe?.());
wsClient.disconnect();
};
}, [messageHandler]);
// 加入房间
const joinRoom = async (roomId: string): Promise<boolean> => {
// 验证房间ID
if (!roomId.trim()) {
setMessages((prev) => [
...prev,
{
type: 'error',
data: {
text: '请输入有效的房间ID',
type: MessageType.TEXT,
},
},
]);
return false;
}
// 检查是否正在连接
if (connecting) {
return false;
}
try {
setConnecting(true);
await wsClient.joinRoom(roomId.trim());
setCurrentRoom(roomId.trim());
setMessages([]); // 清空消息历史
return true;
} catch (error) {
console.error('加入房间失败:', error);
setMessages((prev) => [
...prev,
{
type: 'error',
data: {
text: '加入房间失败,请重试',
type: MessageType.TEXT,
},
},
]);
return false;
} finally {
setConnecting(false);
}
};
// 离开房间
const leaveRoom = async (): Promise<boolean> => {
if (connecting) {
return false;
}
try {
await wsClient.disconnect();
setCurrentRoom(null);
setMessages([]); // 清空消息历史
return true;
} catch (error) {
console.error('离开房间失败:', error);
return false;
}
};
// 发送消息
const sendMessage = async (messageParams: WSMessageParams): Promise<boolean> => {
// 验证消息内容
if (!messageParams.text?.trim() && !messageParams.fileUri) {
return false;
}
// 检查房间状态
if (!currentRoom) {
setMessages((prev) => [
...prev,
{
type: 'error',
data: {
text: '请先加入房间',
type: MessageType.TEXT,
},
},
]);
return false;
}
// 检查连接状态
if (connecting) {
return false;
}
try {
await wsClient.sendMessage(messageParams);
return true;
} catch (error) {
console.error('发送消息失败:', error);
setMessages((prev) => [
...prev,
{
type: 'error',
data: {
text: '发送消息失败,请重试',
type: MessageType.TEXT,
},
},
]);
return false;
}
};
// 清空消息
const clearMessages = useCallback(() => {
setMessages([]);
}, []);
return {
// 状态
messages,
currentRoom,
connecting,
// 方法
joinRoom,
leaveRoom,
sendMessage,
// 工具方法
clearMessages,
};
};

View File

@ -1 +1 @@
DATABASE_URL="postgresql://root:Letusdoit000@localhost:5432/app?schema=public" DATABASE_URL="postgresql://nice:Letusdoit123@localhost:5432/db?schema=public"