From c9b6d9a10160ca1d18237a0728bac87ec8ff16da Mon Sep 17 00:00:00 2001 From: gerson <1405270578@qq.com> Date: 星期二, 21 一月 2025 16:16:43 +0800 Subject: [PATCH] 屏蔽非当前聊天室chat_start --- src/utils/sse/SSEClient.ts | 91 +++++++++++++++++++++++---------------------- 1 files changed, 46 insertions(+), 45 deletions(-) diff --git a/src/utils/sse/SSEClient.ts b/src/utils/sse/SSEClient.ts index 095a9ec..1a8cfe8 100644 --- a/src/utils/sse/SSEClient.ts +++ b/src/utils/sse/SSEClient.ts @@ -1,16 +1,20 @@ import type { AxiosRequestConfig } from 'axios'; +import { accessSessionKey } from '../request'; +import { SESSION_KEY } from '../request'; +import { Local } from '../storage'; +import { debounce } from 'lodash-es'; +import { Logger } from '/@/model/logger/Logger'; export interface SSEOptions { - /** 閲嶈瘯娆℃暟 */ - retries?: number; /** 閲嶈瘯寤惰繜(ms) */ retryDelay?: number; /** 瓒呮椂鏃堕棿(ms) */ timeout?: number; - /** 璇锋眰閰嶇疆 */ - // requestConfig?: AxiosRequestConfig; /** 鏄惁鑷姩閲嶈繛 */ autoReconnect?: boolean; + + /** 璇锋眰澶� */ + headers?: Record<string, string>; } export interface SSEEventCallbacks { @@ -23,61 +27,62 @@ /** 杩炴帴鍏抽棴鍥炶皟 */ onClose?: () => void; /** 閲嶈瘯鍥炶皟 */ - onRetry?: (retryCount: number) => void; + onRetry?: () => void; } + +export type MessageHandler = (data: any) => void; export class SSEClient { private eventSource: EventSource | null = null; - private retryCount = 0; - private isConnecting = false; private reconnectTimeout: number | null = null; private abortController: AbortController | null = null; + private messageHandlers: Set<MessageHandler> = new Set(); constructor(private url: string, private options: SSEOptions = {}, private callbacks: SSEEventCallbacks = {}) { // 璁剧疆榛樿鍊� this.options = { - retries: 3, retryDelay: 1000, - timeout: 30000, autoReconnect: true, + timeout: 3000, + headers: { + [SESSION_KEY]: Local.get(accessSessionKey), + }, ...options, }; + } + + /** + * 璁㈤槄娑堟伅 + */ + subscribe(handler: MessageHandler): void { + this.messageHandlers.add(handler); + } + + /** + * 鍙栨秷璁㈤槄娑堟伅 + */ + unsubscribe(handler: MessageHandler): void { + this.messageHandlers.delete(handler); } /** * 寤虹珛杩炴帴 */ async connect(params?: Record<string, any>): Promise<void> { - if (this.isConnecting) return; - this.isConnecting = true; - try { // 鏋勫缓 URL 鍜屽弬鏁� - const queryString = params ? `?${new URLSearchParams(params).toString()}` : ''; + const queryString = params && Object.values(params).length ? `?${new URLSearchParams(params).toString()}` : ''; const fullUrl = `${this.url}${queryString}`; // 鍒涘缓 AbortController 鐢ㄤ簬瓒呮椂鎺у埗 - this.abortController = new AbortController(); - - // 璁剧疆瓒呮椂 - const timeoutId = setTimeout(() => { - this.abortController?.abort(); - throw new Error('Connection timeout'); - }, this.options.timeout); - - // 鍒涘缓 EventSource + // this.abortController = new AbortController(); + // 鍒涘缓 EventSource 骞舵坊鍔� headers this.eventSource = new EventSource(fullUrl); - - // 娓呴櫎瓒呮椂 - clearTimeout(timeoutId); // 缁戝畾浜嬩欢澶勭悊 this.bindEvents(); - - this.isConnecting = false; } catch (error) { - this.isConnecting = false; - await this.handleError(error); + console.log('catch error'); } } @@ -88,10 +93,12 @@ if (!this.eventSource) return; this.eventSource.onopen = () => { + Logger.info('onopen锛氳繛鎺ユ垚鍔�'); this.callbacks.onOpen?.(); }; this.eventSource.onmessage = (event) => { + // Logger.info('onmessage锛歕n\n'+ event.data); try { const data = JSON.parse(event.data); // 妫�鏌ユ槸鍚︽槸缁撴潫鏍囪 @@ -99,6 +106,9 @@ this.disconnect(); return; } + // 閫氱煡鎵�鏈夎闃呰�� + this.messageHandlers.forEach(handler => handler(data)); + // 璋冪敤鍘熸湁鐨勫洖璋� this.callbacks.onMessage?.(data); } catch (error) { console.error('Failed to parse SSE data:', error); @@ -107,7 +117,7 @@ }; this.eventSource.onerror = async (error) => { - await this.handleError(error); + Logger.error('onerror锛歕n\n'+ error); }; } @@ -116,18 +126,12 @@ */ private async handleError(error: any): Promise<void> { this.callbacks.onError?.(error); - - if (this.options.autoReconnect && this.retryCount < (this.options.retries || 0)) { - // debugger; - this.retryCount++; - this.callbacks.onRetry?.(this.retryCount); - + if (this.options.autoReconnect) { + this.callbacks.onRetry?.(); // 寤惰繜閲嶈瘯 await new Promise((resolve) => { this.reconnectTimeout = setTimeout(resolve, this.options.retryDelay); }); - - // 閲嶆柊杩炴帴 await this.connect(); } else { this.disconnect(); @@ -142,20 +146,17 @@ clearTimeout(this.reconnectTimeout); this.reconnectTimeout = null; } - if (this.eventSource) { this.eventSource.close(); this.eventSource = null; } - if (this.abortController) { - this.abortController.abort(); - this.abortController = null; - } + // if (this.abortController) { + // this.abortController.abort(); + // this.abortController = null; + // } - this.isConnecting = false; this.callbacks.onClose?.(); - this.retryCount = 0; } /** -- Gitblit v1.9.3