/** * WebSocket工具类 * 支持STOMP协议和uni.connectSocket API */ // uni-app 类型声明 declare const uni: { connectSocket: (options: any) => any getStorageSync: (key: string) => any } interface StompFrame { command: string headers: Record body: string } interface SubscriptionCallback { (message: any): void } export class WebSocketClient { private socketTask: any | null = null private connected: boolean = false private subscriptions: Map = new Map() private messageQueue: string[] = [] private heartbeatTimer: number | null = null private reconnectTimer: number | null = null private reconnectAttempts: number = 0 private maxReconnectAttempts: number = 5 private url: string = '' private token: string = '' constructor() {} /** * 连接WebSocket */ async connect(url: string, token: string): Promise { // 如果已经连接到相同的URL,直接返回 if (this.connected && this.url === url) { console.log('[WebSocket] 已连接到相同URL,跳过') return Promise.resolve() } // 如果有旧连接,先关闭并等待关闭完成 if (this.socketTask) { console.log('[WebSocket] 关闭旧连接') await new Promise((resolveClose) => { try { this.socketTask.close({ success: () => { console.log('[WebSocket] 旧连接已关闭') this.socketTask = null this.connected = false resolveClose() }, fail: () => { console.warn('[WebSocket] 关闭旧连接失败') this.socketTask = null this.connected = false resolveClose() } }) } catch (e) { console.warn('[WebSocket] 关闭旧连接异常:', e) this.socketTask = null this.connected = false resolveClose() } }) // 等待一小段时间确保旧连接完全关闭 await new Promise(resolve => setTimeout(resolve, 100)) } this.url = url this.token = token console.log('[WebSocket] 开始连接:', url) return new Promise((resolve, reject) => { this.socketTask = uni.connectSocket({ url: url, success: () => { console.log('[WebSocket] 连接请求已发送') }, fail: (err: any) => { console.error('[WebSocket] 连接失败:', err) reject(err) } }) if (!this.socketTask) { reject(new Error('创建WebSocket失败')) return } // 监听打开 this.socketTask.onOpen(() => { console.log('[WebSocket] 连接已建立') this.connected = true this.reconnectAttempts = 0 // 发送STOMP CONNECT帧 this.sendStompFrame({ command: 'CONNECT', headers: { 'accept-version': '1.2', 'heart-beat': '10000,10000', 'Authorization': `Bearer ${this.token}` }, body: '' }) // 启动心跳 this.startHeartbeat() resolve() }) // 监听消息 this.socketTask.onMessage((res: any) => { const data = res.data as string this.handleMessage(data) }) // 监听关闭 this.socketTask.onClose(() => { console.log('[WebSocket] 连接已关闭') this.connected = false this.stopHeartbeat() this.handleReconnect() }) // 监听错误 this.socketTask.onError((err: any) => { console.error('[WebSocket] 连接错误:', err) this.connected = false }) }) } /** * 断开连接 */ disconnect() { console.log('[WebSocket] 主动断开连接') this.stopHeartbeat() this.clearReconnectTimer() this.reconnectAttempts = this.maxReconnectAttempts // 阻止自动重连 if (this.socketTask) { this.socketTask.close({ success: () => { console.log('[WebSocket] 断开成功') } }) this.socketTask = null } this.connected = false this.subscriptions.clear() this.messageQueue = [] } /** * 订阅主题 */ subscribe(destination: string, callback: SubscriptionCallback): string { const id = `sub-${Date.now()}-${Math.random()}` console.log('[WebSocket] 订阅主题:', destination, 'id:', id) this.subscriptions.set(destination, callback) if (this.connected) { this.sendStompFrame({ command: 'SUBSCRIBE', headers: { 'id': id, 'destination': destination }, body: '' }) } else { console.warn('[WebSocket] 未连接,订阅已加入队列') } return id } /** * 取消订阅 */ unsubscribe(destination: string) { console.log('[WebSocket] 取消订阅:', destination) this.subscriptions.delete(destination) if (this.connected) { this.sendStompFrame({ command: 'UNSUBSCRIBE', headers: { 'destination': destination }, body: '' }) } } /** * 发送STOMP帧 */ private sendStompFrame(frame: StompFrame) { let message = frame.command + '\n' for (const key in frame.headers) { message += `${key}:${frame.headers[key]}\n` } message += '\n' + frame.body + '\x00' if (this.connected && this.socketTask) { this.socketTask.send({ data: message, success: () => { console.log('[WebSocket] 发送成功:', frame.command) }, fail: (err: any) => { console.error('[WebSocket] 发送失败:', err) } }) } else { console.warn('[WebSocket] 未连接,消息已加入队列') this.messageQueue.push(message) } } /** * 处理接收到的消息 */ private handleMessage(data: string) { console.log('[WebSocket] 收到消息:', data.substring(0, 200)) const frame = this.parseStompFrame(data) if (frame.command === 'CONNECTED') { console.log('[WebSocket] STOMP连接成功') // 处理队列中的订阅 this.subscriptions.forEach((callback, destination) => { const id = `sub-${Date.now()}-${Math.random()}` this.sendStompFrame({ command: 'SUBSCRIBE', headers: { 'id': id, 'destination': destination }, body: '' }) }) // 发送队列中的消息 while (this.messageQueue.length > 0) { const msg = this.messageQueue.shift() if (msg && this.socketTask) { this.socketTask.send({ data: msg }) } } } else if (frame.command === 'MESSAGE') { const destination = frame.headers['destination'] const callback = this.subscriptions.get(destination) if (callback) { try { const message = JSON.parse(frame.body) callback(message) } catch (e) { console.error('[WebSocket] 解析消息失败:', e) } } } else if (frame.command === 'ERROR') { console.error('[WebSocket] 服务器错误:', frame.body) } } /** * 解析STOMP帧 */ private parseStompFrame(data: string): StompFrame { const lines = data.split('\n') const command = lines[0] const headers: Record = {} let bodyStart = 0 for (let i = 1; i < lines.length; i++) { const line = lines[i] if (line === '') { bodyStart = i + 1 break } const colonIndex = line.indexOf(':') if (colonIndex > 0) { const key = line.substring(0, colonIndex) const value = line.substring(colonIndex + 1) headers[key] = value } } const body = lines.slice(bodyStart).join('\n').replace(/\x00$/, '') return { command, headers, body } } /** * 启动心跳 */ private startHeartbeat() { this.stopHeartbeat() this.heartbeatTimer = setInterval(() => { if (this.connected && this.socketTask) { this.socketTask.send({ data: '\n', fail: () => { console.warn('[WebSocket] 心跳发送失败') } }) } }, 10000) as unknown as number } /** * 停止心跳 */ private stopHeartbeat() { if (this.heartbeatTimer) { clearInterval(this.heartbeatTimer) this.heartbeatTimer = null } } /** * 处理重连 */ private handleReconnect() { if (this.reconnectAttempts >= this.maxReconnectAttempts) { console.log('[WebSocket] 达到最大重连次数,停止重连') return } this.clearReconnectTimer() const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000) console.log(`[WebSocket] ${delay}ms后尝试重连 (${this.reconnectAttempts + 1}/${this.maxReconnectAttempts})`) this.reconnectTimer = setTimeout(() => { this.reconnectAttempts++ this.connect(this.url, this.token).catch((err: any) => { console.error('[WebSocket] 重连失败:', err) }) }, delay) as unknown as number } /** * 清除重连定时器 */ private clearReconnectTimer() { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer) this.reconnectTimer = null } } /** * 检查连接状态 */ isConnected(): boolean { return this.connected } } // 导出单例 export const wsClient = new WebSocketClient()