gerson
2025-01-21 c9071fc1d8ae01496f4715adda6989a71b503a3d
src/utils/sse/SSEClient.ts
@@ -1,9 +1,10 @@
import type { AxiosRequestConfig } from 'axios';
import { EventSourcePolyfill } from 'event-source-polyfill';
import { accessSessionKey } from '../request';
import { SESSION_KEY } from '../request';
import { Local } from '../storage';
import { debounce } from 'lodash-es';
import { Logger } from '/@/model/logger/Logger';
export interface SSEOptions {
   /** 重试延迟(ms) */
   retryDelay?: number;
@@ -29,22 +30,39 @@
   onRetry?: () => void;
}
export type MessageHandler = (data: any) => void;
export class SSEClient {
   private eventSource: EventSource | null = null;
   private reconnectTimeout: number | null = null;
   private abortController: AbortController | null = null;
   private messageHandlers: Set<MessageHandler> = new Set();
   constructor(private url: string, private options: SSEOptions = {}, private callbacks: SSEEventCallbacks = {}) {
      // 设置默认值
      this.options = {
         retryDelay: 1000,
         autoReconnect: true,
         timeout: 24 * 60 * 60 * 1000,
         timeout: 3000,
         headers: {
            [SESSION_KEY]: Local.get(accessSessionKey),
         },
         ...options,
      };
   }
   /**
    * 订阅消息
    */
   subscribe(handler: MessageHandler): void {
      this.messageHandlers.add(handler);
   }
   /**
    * 取消订阅消息
    */
   unsubscribe(handler: MessageHandler): void {
      this.messageHandlers.delete(handler);
   }
   /**
@@ -58,19 +76,13 @@
         // 创建 AbortController 用于超时控制
         // this.abortController = new AbortController();
         // 创建 EventSource 并添加 headers
         this.eventSource = new EventSourcePolyfill(fullUrl, {
            headers: this.options.headers,
            heartbeatTimeout: this.options.timeout,
         });
         this.eventSource = new EventSource(fullUrl);
         // 绑定事件处理
         this.bindEvents();
      } catch (error) {
         console.log('catch error');
         await this.debounceHandleError.call(this, error);
      }
   }
@@ -81,11 +93,12 @@
      if (!this.eventSource) return;
      this.eventSource.onopen = () => {
         console.log('连接成功');
         Logger.info('eventSource onopen:连接成功');
         this.callbacks.onOpen?.();
      };
      this.eventSource.onmessage = (event) => {
         Logger.info('eventSource onmessage:\n\n'+ event.data);
         try {
            const data = JSON.parse(event.data);
            // 检查是否是结束标记
@@ -93,6 +106,9 @@
               this.disconnect();
               return;
            }
            // 通知所有订阅者
            this.messageHandlers.forEach(handler => handler(data));
            // 调用原有的回调
            this.callbacks.onMessage?.(data);
         } catch (error) {
            console.error('Failed to parse SSE data:', error);
@@ -101,9 +117,7 @@
      };
      this.eventSource.onerror = async (error) => {
         console.log('on error');
         await this.debounceHandleError.call(this, error);
         Logger.error('eventSource onerror:\n\n'+ error);
      };
   }
@@ -111,7 +125,6 @@
    * 错误处理
    */
   private async handleError(error: any): Promise<void> {
      console.log('🚀 ~ error:', error);
      this.callbacks.onError?.(error);
      if (this.options.autoReconnect) {
         this.callbacks.onRetry?.();
@@ -124,10 +137,6 @@
         this.disconnect();
      }
   }
   private debounceHandleError = debounce((error: any) => {
      this.handleError(error);
   }, 500);
   /**
    * 断开连接