gerson
2025-01-21 c9b6d9a10160ca1d18237a0728bac87ec8ff16da
src/components/chat/hooks/useSyncMsg.ts
@@ -1,85 +1,103 @@
import { onMounted, onUnmounted, type Ref } from 'vue';
import type { ChatMessage } from '../model/types';
import { RoleEnum } from '../model/types';
import { MAIN_URL } from '/@/constants';
import { SSEClient } from '/@/utils/sse/SSEClient';
import { reverse } from 'lodash-es';
import { differenceBy } from 'lodash-es';
import moment from 'moment';
import { onActivated, onDeactivated, unref, type Ref } from 'vue';
import { RoleEnum, type ChatMessage } from '../model/types';
import { QueryHistoryDetail } from '/@/api/ai/chat';
import { sseClient } from '/@/stores/global';
import { Logger } from '/@/model/logger/Logger';
import { ElNotification } from 'element-plus';
type UseSyncMsgOptions = {
   updateLoadIndex: (addCount: number) => void;
   msgList: Ref<ChatMessage[]>;
   historyGroupId: string | Ref<string>;
   checkCanSync: (data: any) => boolean;
   loadReplyData: (data: any) => Promise<ChatMessage[]>;
   scrollToBottom: () => void;
   showTip: (data: any) => void;
};
export const useSyncMsg = (options: UseSyncMsgOptions) => {
   const { updateLoadIndex, msgList } = options;
   // 创建实例
   const sseClient = new SSEClient(
      `${MAIN_URL}chat/connect_broadcast_chat`,
   const { updateLoadIndex, msgList, historyGroupId, checkCanSync, loadReplyData, scrollToBottom, showTip } = options;
      {},
      {
         onMessage: (data) => {
            console.log("🚀 ~ data:\n", data)
            return;
            const recentIds = reverse([
               { id: 'a1b2c3d4', time: '2024-03-27 15:42:33' },
               { id: 'e5f6g7h8', time: '2024-02-15 09:23:45' },
               { id: 'i9j0k1l2', time: '2024-05-08 14:37:21' },
               { id: 'm3n4o5p6', time: '2024-01-30 11:55:16' },
               { id: 'q7r8s9t0', time: '2024-07-12 16:48:59' },
               { id: 'u1v2w3x4', time: '2024-04-03 10:15:27' },
               { id: 'y5z6a7b8', time: '2024-06-21 13:29:44' },
               { id: 'c9d0e1f2', time: '2024-08-09 17:52:38' },
               { id: 'g3h4i5j6', time: '2024-09-14 12:33:51' },
               { id: 'k7l8m9n0', time: '2024-10-25 08:19:07' },
            ]);
            // const userHistoryIds = reverse(msgList.value.filter((item) => item.role === RoleEnum.user).map((item) => item.historyId));
            const userHistoryIds = reverse([
               { id: 'a1b2c3d4', time: '2024-03-27 15:42:33' },
               { id: 'e5f6g7h8', time: '2024-02-15 09:23:45' },
               // {id: 'i9j0k1l2', time: '2024-05-08 14:37:21'},
               // {id: 'm3n4o5p6', time: '2024-01-30 11:55:16'},
               { id: 'q7r8s9t0', time: '2024-07-12 16:48:59' },
               // {id: 'u1v2w3x4', time: '2024-04-03 10:15:27'},
               { id: 'y5z6a7b8', time: '2024-06-21 13:29:44' },
               { id: 'c9d0e1f2', time: '2024-08-09 17:52:38' },
               { id: 'g3h4i5j6', time: '2024-09-14 12:33:51' },
               // {id: 'k7l8m9n0', time: '2024-10-25 08:19:07'},
            ]);
            // 获取未同步的消息
            const unsyncedMessages = findUnsyncedMessages(recentIds, userHistoryIds);
            console.log('未同步的消息:', unsyncedMessages);
         },
         onError: (error) => {
            console.error('SSE error:', error);
         },
         onOpen: () => {
            console.log('SSE connection opened');
         },
         onClose: () => {
            console.log('SSE connection closed');
         },
         onRetry: (retryCount) => {
            console.log(`Retrying connection (${retryCount})`);
         },
   const insertSyncMsg = (replayData: any[]) => {
      const insertResult: { index: number; item: any }[] = [];
      for (let i = replayData.length - 1; i >= 0; i--) {
         const insertItem = replayData[i];
         if (insertItem.role === RoleEnum.assistant) continue;
         for (let j = msgList.value.length - 1; j >= 0; j--) {
            const currentItem = msgList.value[j];
            if (currentItem.role === RoleEnum.assistant) continue;
            if (moment(insertItem.createTime).isAfter(currentItem.createTime)) {
               const insertAssistantItem = replayData[i + 1];
               insertResult.push({
                  index: j + 2,
                  item: [insertItem, insertAssistantItem],
               });
               break;
            }
         }
      }
   );
   sseClient.connect({});
      insertResult.forEach((resultItem) => {
         msgList.value.splice(resultItem.index, 0, ...resultItem.item);
      });
   };
   // onMounted(() => {
   //    sseClient.connect({});
   // });
   const historyUpdate = async (data: any) => {
      if (!checkCanSync(data)) return;
      if (!data) return;
      Logger.info('sync message notification:\n\n' + JSON.stringify(data));
      if (data?.type === 'chat_start') {
         const groupId = unref(historyGroupId);
         const startGroupId = data?.history_group_id;
         if (groupId !== startGroupId) return;
         showTip(data);
      }
      if (data?.type === 'chat_history_id') {
         const groupId = unref(historyGroupId);
         if (!groupId) return;
         const recentIds = data.id_list ?? [];
         const recentGroupHistoryIds = recentIds.filter((item) => item.group_id === groupId);
         if (recentGroupHistoryIds.length === 0) return;
         const userHistoryIds = msgList.value
            .filter((item) => item.role === RoleEnum.user)
            .map((item) => ({ history_id: item.historyId, time: item.createTime }));
         const tmpUnSyncedHistoryIds = differenceBy(recentGroupHistoryIds, userHistoryIds, 'history_id') as any[];
         const latestUserHistory = userHistoryIds[userHistoryIds.length - 1];
         let unSyncedHistoryIds = tmpUnSyncedHistoryIds;
         // 太晚的不需要更新
         if (latestUserHistory) {
            unSyncedHistoryIds = tmpUnSyncedHistoryIds.filter((item) => moment(item.time).isAfter(latestUserHistory.time));
         }
         if (!unSyncedHistoryIds || unSyncedHistoryIds.length === 0) return;
         const res = await QueryHistoryDetail({
            history_group_id: groupId,
            id_list: unSyncedHistoryIds.map((item) => item.history_id).join(','),
         });
         if (!checkCanSync(data)) return;
         const result: ChatMessage[] = res.details ?? [];
         if (!result || result.length === 0) return;
         const replayData = await loadReplyData(res.details);
         if (!checkCanSync(data)) return;
         insertSyncMsg(replayData);
         updateLoadIndex(unSyncedHistoryIds.length);
         scrollToBottom();
      }
   };
   onActivated(() => {
      sseClient?.subscribe(historyUpdate);
   });
   onDeactivated(() => {
      sseClient?.unsubscribe(historyUpdate);
   });
   // onUnmounted(() => {
   //    sseClient.disconnect();
   // });
   return {
      reconnect: () => sseClient.reconnect(),
      disconnect: () => sseClient.disconnect(),
      isConnected: () => sseClient.isConnected(),
   };
};