gerson
2025-01-21 f2d4e9aa00543101ebb189ba634f337ef9eb9199
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
import type { AxiosRequestConfig } from 'axios';
import { accessSessionKey } from '../request';
import { SESSION_KEY } from '../request';
import { Local } from '../storage';
import { debounce } from 'lodash-es';
import { Logger } from '/@/model/logger/Logger';
 
/**
 * Tuning knobs accepted by the `SSEClient` constructor.
 *
 * Every field is optional; the client fills in its own defaults for the
 * fields that are left out.
 */
export interface SSEOptions {
    /**
     * Whether the client should try to connect again after a failure.
     * The client defaults this to `true`.
     */
    autoReconnect?: boolean;

    /**
     * Delay, in milliseconds, to wait before a reconnect attempt.
     * The client defaults this to `1000`.
     */
    retryDelay?: number;

    /**
     * Timeout in milliseconds (client default `3000`).
     * NOTE(review): nothing in the visible client reads this value yet.
     */
    timeout?: number;

    /**
     * Request headers keyed by header name.
     * NOTE(review): the visible client opens the stream with
     * `new EventSource(url)`, which takes no headers, so this option does not
     * appear to reach the wire - confirm before relying on it.
     */
    headers?: Record<string, string>;
}
 
/**
 * Optional lifecycle hooks passed to the `SSEClient` constructor.
 *
 * Each hook is invoked with optional chaining by the client, so any subset
 * may be supplied.
 */
export interface SSEEventCallbacks {
    /** Invoked when the underlying connection reports that it is open. */
    onOpen?: () => void;
    /** Invoked with the JSON-decoded payload of each incoming message. */
    onMessage?: (data: any) => void;
    /** Invoked with an `Error` describing a failure. */
    onError?: (error: Error) => void;
    /** Invoked just before the client attempts to reconnect. */
    onRetry?: () => void;
    /** Invoked when the client disconnects. */
    onClose?: () => void;
}
 
/**
 * Signature of a subscriber registered through `SSEClient.subscribe`.
 * It receives the value produced by `JSON.parse` for each incoming message.
 */
export type MessageHandler = (data: any) => void;
 
/**
 * Small wrapper around the browser `EventSource` API.
 *
 * It JSON-decodes incoming messages, fans them out to subscribers and to the
 * `onMessage` callback, closes the stream when the `[DONE]` end marker arrives,
 * and reconnects (or disconnects) after a connection error according to the
 * `autoReconnect` / `retryDelay` options.
 *
 * Notes:
 * - `EventSource` cannot send custom request headers, so `options.headers` is
 *   stored but not applied by this class.
 * - `options.timeout` is stored but not enforced by this class.
 */
export class SSEClient {
    private eventSource: EventSource | null = null;
    // `ReturnType<typeof setTimeout>` keeps the field correct under both DOM and Node typings.
    private reconnectTimeout: ReturnType<typeof setTimeout> | null = null;
    // Reserved for timeout/abort support; nothing in this class assigns it yet.
    private abortController: AbortController | null = null;
    private messageHandlers: Set<MessageHandler> = new Set();
    /** Query parameters of the latest `connect()` call, replayed on every reconnect. */
    private lastParams: Record<string, any> | undefined = undefined;

    /**
     * @param url Endpoint of the event stream.
     * @param options Connection options; missing (or `undefined`) fields fall back to
     *   `retryDelay: 1000`, `autoReconnect: true`, `timeout: 3000`.
     * @param callbacks Optional lifecycle hooks.
     */
    constructor(private url: string, private options: SSEOptions = {}, private callbacks: SSEEventCallbacks = {}) {
        this.options = {
            ...options,
            // `??` so that an explicitly passed `undefined` does not wipe out a default.
            retryDelay: options.retryDelay ?? 1000,
            autoReconnect: options.autoReconnect ?? true,
            timeout: options.timeout ?? 3000,
            // Merge instead of replace: caller headers win on conflicts, but the
            // default session header is kept when the caller adds other headers.
            headers: {
                [SESSION_KEY]: Local.get(accessSessionKey),
                ...options.headers,
            },
        };
    }

    /**
     * Registers a handler that receives every decoded message.
     */
    subscribe(handler: MessageHandler): void {
        this.messageHandlers.add(handler);
    }

    /**
     * Removes a handler previously registered with `subscribe`.
     */
    unsubscribe(handler: MessageHandler): void {
        this.messageHandlers.delete(handler);
    }

    /**
     * Opens the event stream.
     *
     * Any connection that is still open, and any pending reconnect timer, is
     * discarded first so that at most one `EventSource` is alive per client.
     * Failures while opening are reported through `callbacks.onError`; the
     * returned promise never rejects because of them.
     *
     * @param params Optional query parameters appended to the URL. They are
     *   remembered and reused by `reconnect()` and by automatic retries.
     */
    async connect(params?: Record<string, any>): Promise<void> {
        this.lastParams = params;
        try {
            // Respect a query string that is already part of the base URL.
            const separator = this.url.includes('?') ? '&' : '?';
            const queryString =
                params && Object.values(params).length ? `${separator}${new URLSearchParams(params).toString()}` : '';
            const fullUrl = `${this.url}${queryString}`;

            this.clearReconnectTimer();
            this.closeSource();

            this.eventSource = new EventSource(fullUrl);
            this.bindEvents();
        } catch (error) {
            console.error('Failed to open SSE connection:', error);
            this.callbacks.onError?.(error instanceof Error ? error : new Error(String(error)));
        }
    }

    /**
     * Attaches the open / message / error handlers to the current source.
     */
    private bindEvents(): void {
        const source = this.eventSource;
        if (!source) return;

        source.onopen = () => {
            Logger.info('onopen:连接成功');
            this.callbacks.onOpen?.();
        };

        source.onmessage = (event) => {
            Logger.info('onmessage:\n\n'+ event.data);

            // The end marker is commonly sent as the bare text `[DONE]`, which is
            // not valid JSON, so it has to be recognised before parsing.
            if (typeof event.data === 'string' && event.data.trim() === '[DONE]') {
                this.disconnect();
                return;
            }

            let data: unknown;
            try {
                data = JSON.parse(event.data);
            } catch (error) {
                console.error('Failed to parse SSE data:', error);
                this.callbacks.onError?.(new Error('Failed to parse SSE data'));
                return;
            }

            // Also accept the end marker when it arrives JSON-encoded (`"[DONE]"`).
            if (data === '[DONE]') {
                this.disconnect();
                return;
            }

            try {
                // Notify subscribers first, then the constructor-level callback.
                this.messageHandlers.forEach((handler) => handler(data));
                this.callbacks.onMessage?.(data);
            } catch (error) {
                // A throwing consumer is reported as what it is, not as a parse failure.
                console.error('SSE message handler failed:', error);
                this.callbacks.onError?.(error instanceof Error ? error : new Error(String(error)));
            }
        };

        source.onerror = (error) => {
            Logger.error('onerror:\n\n'+ error);
            // Ignore late events from a source that has already been replaced or closed.
            if (this.eventSource !== source) return;
            void this.handleError(new Error('SSE connection error'));
        };
    }

    /**
     * Reports a connection error and then either schedules a reconnect
     * (`autoReconnect` on) or disconnects (`autoReconnect` off).
     */
    private async handleError(error: any): Promise<void> {
        this.callbacks.onError?.(error);

        if (!this.options.autoReconnect) {
            this.disconnect();
            return;
        }

        this.callbacks.onRetry?.();
        // Take over from the browser's built-in retry so only one reconnect
        // mechanism is active and `retryDelay` is honoured.
        this.closeSource();
        this.clearReconnectTimer();

        // If `disconnect()` runs during the delay the timer is cleared, this
        // promise never settles, and no reconnect happens.
        await new Promise<void>((resolve) => {
            this.reconnectTimeout = setTimeout(() => {
                this.reconnectTimeout = null;
                resolve();
            }, this.options.retryDelay);
        });
        await this.connect(this.lastParams);
    }

    /**
     * Cancels a pending reconnect, if any.
     */
    private clearReconnectTimer(): void {
        if (this.reconnectTimeout !== null) {
            clearTimeout(this.reconnectTimeout);
            this.reconnectTimeout = null;
        }
    }

    /**
     * Closes the current source without firing `onClose`.
     */
    private closeSource(): void {
        const source = this.eventSource;
        if (!source) return;
        source.onopen = null;
        source.onmessage = null;
        source.onerror = null;
        source.close();
        this.eventSource = null;
    }

    /**
     * Closes the connection, cancels any pending reconnect and fires `onClose`.
     */
    disconnect(): void {
        this.clearReconnectTimer();
        this.closeSource();
        this.callbacks.onClose?.();
    }

    /**
     * @returns `true` while the underlying source reports the OPEN state.
     */
    isConnected(): boolean {
        return this.eventSource?.readyState === EventSource.OPEN;
    }

    /**
     * Disconnects and connects again with the parameters of the last `connect()` call.
     */
    async reconnect(): Promise<void> {
        this.disconnect();
        await this.connect(this.lastParams);
    }
}