Files
2025-12-24 16:32:06 +08:00

383 lines
11 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/**
* WebSocket工具类
* 支持STOMP协议和uni.connectSocket API
*/
// uni-app 类型声明
declare const uni: {
connectSocket: (options: any) => any
getStorageSync: (key: string) => any
}
interface StompFrame {
command: string
headers: Record<string, string>
body: string
}
interface SubscriptionCallback {
(message: any): void
}
export class WebSocketClient {
private socketTask: any | null = null
private connected: boolean = false
private subscriptions: Map<string, SubscriptionCallback> = new Map()
private messageQueue: string[] = []
private heartbeatTimer: number | null = null
private reconnectTimer: number | null = null
private reconnectAttempts: number = 0
private maxReconnectAttempts: number = 5
private url: string = ''
private token: string = ''
constructor() {}
/**
* 连接WebSocket
*/
async connect(url: string, token: string): Promise<void> {
// 如果已经连接到相同的URL直接返回
if (this.connected && this.url === url) {
console.log('[WebSocket] 已连接到相同URL跳过')
return Promise.resolve()
}
// 如果有旧连接,先关闭并等待关闭完成
if (this.socketTask) {
console.log('[WebSocket] 关闭旧连接')
await new Promise<void>((resolveClose) => {
try {
this.socketTask.close({
success: () => {
console.log('[WebSocket] 旧连接已关闭')
this.socketTask = null
this.connected = false
resolveClose()
},
fail: () => {
console.warn('[WebSocket] 关闭旧连接失败')
this.socketTask = null
this.connected = false
resolveClose()
}
})
} catch (e) {
console.warn('[WebSocket] 关闭旧连接异常:', e)
this.socketTask = null
this.connected = false
resolveClose()
}
})
// 等待一小段时间确保旧连接完全关闭
await new Promise(resolve => setTimeout(resolve, 100))
}
this.url = url
this.token = token
console.log('[WebSocket] 开始连接:', url)
return new Promise((resolve, reject) => {
this.socketTask = uni.connectSocket({
url: url,
success: () => {
console.log('[WebSocket] 连接请求已发送')
},
fail: (err: any) => {
console.error('[WebSocket] 连接失败:', err)
reject(err)
}
})
if (!this.socketTask) {
reject(new Error('创建WebSocket失败'))
return
}
// 监听打开
this.socketTask.onOpen(() => {
console.log('[WebSocket] 连接已建立')
this.connected = true
this.reconnectAttempts = 0
// 发送STOMP CONNECT帧
this.sendStompFrame({
command: 'CONNECT',
headers: {
'accept-version': '1.2',
'heart-beat': '10000,10000',
'Authorization': `Bearer ${this.token}`
},
body: ''
})
// 启动心跳
this.startHeartbeat()
resolve()
})
// 监听消息
this.socketTask.onMessage((res: any) => {
const data = res.data as string
this.handleMessage(data)
})
// 监听关闭
this.socketTask.onClose(() => {
console.log('[WebSocket] 连接已关闭')
this.connected = false
this.stopHeartbeat()
this.handleReconnect()
})
// 监听错误
this.socketTask.onError((err: any) => {
console.error('[WebSocket] 连接错误:', err)
this.connected = false
})
})
}
/**
* 断开连接
*/
disconnect() {
console.log('[WebSocket] 主动断开连接')
this.stopHeartbeat()
this.clearReconnectTimer()
this.reconnectAttempts = this.maxReconnectAttempts // 阻止自动重连
if (this.socketTask) {
this.socketTask.close({
success: () => {
console.log('[WebSocket] 断开成功')
}
})
this.socketTask = null
}
this.connected = false
this.subscriptions.clear()
this.messageQueue = []
}
/**
* 订阅主题
*/
subscribe(destination: string, callback: SubscriptionCallback): string {
const id = `sub-${Date.now()}-${Math.random()}`
console.log('[WebSocket] 订阅主题:', destination, 'id:', id)
this.subscriptions.set(destination, callback)
if (this.connected) {
this.sendStompFrame({
command: 'SUBSCRIBE',
headers: {
'id': id,
'destination': destination
},
body: ''
})
} else {
console.warn('[WebSocket] 未连接,订阅已加入队列')
}
return id
}
/**
* 取消订阅
*/
unsubscribe(destination: string) {
console.log('[WebSocket] 取消订阅:', destination)
this.subscriptions.delete(destination)
if (this.connected) {
this.sendStompFrame({
command: 'UNSUBSCRIBE',
headers: {
'destination': destination
},
body: ''
})
}
}
/**
* 发送STOMP帧
*/
private sendStompFrame(frame: StompFrame) {
let message = frame.command + '\n'
for (const key in frame.headers) {
message += `${key}:${frame.headers[key]}\n`
}
message += '\n' + frame.body + '\x00'
if (this.connected && this.socketTask) {
this.socketTask.send({
data: message,
success: () => {
console.log('[WebSocket] 发送成功:', frame.command)
},
fail: (err: any) => {
console.error('[WebSocket] 发送失败:', err)
}
})
} else {
console.warn('[WebSocket] 未连接,消息已加入队列')
this.messageQueue.push(message)
}
}
/**
* 处理接收到的消息
*/
private handleMessage(data: string) {
console.log('[WebSocket] 收到消息:', data.substring(0, 200))
const frame = this.parseStompFrame(data)
if (frame.command === 'CONNECTED') {
console.log('[WebSocket] STOMP连接成功')
// 处理队列中的订阅
this.subscriptions.forEach((callback, destination) => {
const id = `sub-${Date.now()}-${Math.random()}`
this.sendStompFrame({
command: 'SUBSCRIBE',
headers: {
'id': id,
'destination': destination
},
body: ''
})
})
// 发送队列中的消息
while (this.messageQueue.length > 0) {
const msg = this.messageQueue.shift()
if (msg && this.socketTask) {
this.socketTask.send({ data: msg })
}
}
} else if (frame.command === 'MESSAGE') {
const destination = frame.headers['destination']
const callback = this.subscriptions.get(destination)
if (callback) {
try {
const message = JSON.parse(frame.body)
callback(message)
} catch (e) {
console.error('[WebSocket] 解析消息失败:', e)
}
}
} else if (frame.command === 'ERROR') {
console.error('[WebSocket] 服务器错误:', frame.body)
}
}
/**
* 解析STOMP帧
*/
private parseStompFrame(data: string): StompFrame {
const lines = data.split('\n')
const command = lines[0]
const headers: Record<string, string> = {}
let bodyStart = 0
for (let i = 1; i < lines.length; i++) {
const line = lines[i]
if (line === '') {
bodyStart = i + 1
break
}
const colonIndex = line.indexOf(':')
if (colonIndex > 0) {
const key = line.substring(0, colonIndex)
const value = line.substring(colonIndex + 1)
headers[key] = value
}
}
const body = lines.slice(bodyStart).join('\n').replace(/\x00$/, '')
return { command, headers, body }
}
/**
* 启动心跳
*/
private startHeartbeat() {
this.stopHeartbeat()
this.heartbeatTimer = setInterval(() => {
if (this.connected && this.socketTask) {
this.socketTask.send({
data: '\n',
fail: () => {
console.warn('[WebSocket] 心跳发送失败')
}
})
}
}, 10000) as unknown as number
}
/**
* 停止心跳
*/
private stopHeartbeat() {
if (this.heartbeatTimer) {
clearInterval(this.heartbeatTimer)
this.heartbeatTimer = null
}
}
/**
* 处理重连
*/
private handleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.log('[WebSocket] 达到最大重连次数,停止重连')
return
}
this.clearReconnectTimer()
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000)
console.log(`[WebSocket] ${delay}ms后尝试重连 (${this.reconnectAttempts + 1}/${this.maxReconnectAttempts})`)
this.reconnectTimer = setTimeout(() => {
this.reconnectAttempts++
this.connect(this.url, this.token).catch((err: any) => {
console.error('[WebSocket] 重连失败:', err)
})
}, delay) as unknown as number
}
/**
* 清除重连定时器
*/
private clearReconnectTimer() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
}
/**
* 检查连接状态
*/
isConnected(): boolean {
return this.connected
}
}
// 导出单例
export const wsClient = new WebSocketClient()