From cd5d537aadbe302fc8b62f87edd8aea81021930e Mon Sep 17 00:00:00 2001 From: qfrjava <13402782+qfrjava@user.noreply.gitee.com> Date: 星期日, 27 四月 2025 13:38:22 +0800 Subject: [PATCH] feat(LightRAG): 新增 lightrag 服务管理接口并实现相关功能 --- JAVA/SMTAIServer/src/main/java/com/smtaiserver/smtaiserver/javaai/qwen/agent/SMTQwenAgentLightRAG.java | 172 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 172 insertions(+), 0 deletions(-) diff --git a/JAVA/SMTAIServer/src/main/java/com/smtaiserver/smtaiserver/javaai/qwen/agent/SMTQwenAgentLightRAG.java b/JAVA/SMTAIServer/src/main/java/com/smtaiserver/smtaiserver/javaai/qwen/agent/SMTQwenAgentLightRAG.java new file mode 100644 index 0000000..4907fd5 --- /dev/null +++ b/JAVA/SMTAIServer/src/main/java/com/smtaiserver/smtaiserver/javaai/qwen/agent/SMTQwenAgentLightRAG.java @@ -0,0 +1,172 @@ +package com.smtaiserver.smtaiserver.javaai.qwen.agent; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.smtaiserver.smtaiserver.core.SMTAIServerApp; +import com.smtaiserver.smtaiserver.core.SMTAIServerRequest; +import com.smtaiserver.smtaiserver.database.SMTDatabase; +import com.smtaiserver.smtaiserver.database.SMTDatabase.DBRecord; +import com.smtaiserver.smtaiserver.javaai.SMTJavaAIError; +import com.smtaiserver.smtaiserver.javaai.llm.core.SMTLLMConnect; +import com.smtservlet.util.Json; +import com.smtservlet.util.SMTJsonWriter; +import com.smtservlet.util.SMTStatic; +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import javax.servlet.http.HttpServletResponse; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.dom4j.Document; +import org.dom4j.Element; +import org.dom4j.Node; + +public class SMTQwenAgentLightRAG extends SMTQwenAgent +{ + + public String port; + public static class SMTQueryServerInfo { + public String _serverId; + public String _alias; + public String _querySql; + + public SMTQueryServerInfo(Element element) { + _serverId = element.attributeValue("server_id"); + _alias = element.attributeValue("alias"); + + Element queryElement = element.element("QUERY_NAME"); + _querySql = queryElement.getText().trim(); + } + + public DBRecord queryServer(SMTDatabase db) throws Exception { + SMTDatabase.DBRecords recs = db.querySQL(_querySql, null); + if (recs.getRowCount() > 0) { + return recs.getRecord(0); + } else { + throw new Exception("鏈壘鍒� server_id=" + _serverId + " 鐨勮褰�"); + } + } + } + + + @Override + public void initInstance(DBRecord rec) throws Exception + { + super.initInstance(rec); + try { + SMTDatabase db = SMTAIServerApp.getApp().allocDatabase(); + try { + // 1. 鍔犺浇 XML + Document doc = SMTStatic.convStrToXmlDoc("<ROOT>" + rec.getString("clz_arguments") + "</ROOT>"); + + // 2. 瑙f瀽 SERVER_LIST + for (Node node : doc.selectNodes("ROOT/SERVER_LIST/SERVER_SQL")) { + SMTQueryServerInfo server = new SMTQueryServerInfo((Element) node); + DBRecord serverRec = server.queryServer(db); + port= serverRec.getString("server_port"); + System.out.println("Server 鏌ヨ缁撴灉: " + serverRec); + } + } finally { + db.close(); + } + } catch (Exception ex) { + throw new Exception("init server info error : " + this._agentId, ex); + } + } + + @Override + public SMTJavaAIError callAgents(String jsonPath, Json jsonArgs, SMTLLMConnect llm, String question, SMTAIServerRequest tranReq) throws Exception + { + SMTJsonWriter jsonWrResult = tranReq.getResultJsonWr(); + + StringBuilder sbAnswer = new StringBuilder(); + SMTJavaAIError error = callRAGServer(question, tranReq, sbAnswer); + if(error != null) + return error; + jsonWrResult.addKeyValue("json_ok", true); + jsonWrResult.addKeyValue("answer_type", "knowledge"); + jsonWrResult.beginArray("knowledge"); + { + jsonWrResult.beginMap(null); + jsonWrResult.addKeyValue("answer", sbAnswer.toString()); + jsonWrResult.endMap(); + } + jsonWrResult.endArray(); + return null; + } + + + + private SMTJavaAIError callRAGServer(String question, SMTAIServerRequest tranReq, StringBuilder sbAnswer) throws Exception { + System.out.println("杩涘叆浜唋ight"); + OkHttpClient okHttpClient = new OkHttpClient.Builder() + .readTimeout(0, TimeUnit.SECONDS) // 涓嶈秴鏃讹紝鏀寔娴� + .build(); + HttpServletResponse response = tranReq.getResponse(); + response.setContentType("application/json"); + response.setCharacterEncoding("UTF-8"); + + SMTJsonWriter jsonWriter = new SMTJsonWriter(false); + jsonWriter.addKeyValue("query",question); + jsonWriter.addKeyValue("mode","global"); + jsonWriter.addKeyValue("response_type","Multiple Paragraphs"); + jsonWriter.addKeyValue("top_k",10); + jsonWriter.addKeyValue("max_token_for_text_unit",4000); + jsonWriter.addKeyValue("max_token_for_global_context",4000); + jsonWriter.addKeyValue("max_token_for_local_context",4000); + jsonWriter.addKeyValue("only_need_context",false); + jsonWriter.addKeyValue("only_need_prompt",false); + jsonWriter.addKeyValue("stream",true); + jsonWriter.addKeyValue("history_turns",3); + String json = jsonWriter.getRootJson().toString(); + + RequestBody body = RequestBody.create(MediaType.parse("application/json"), json); + + String streamHook ="http://localhost:"+port; + String lightragQueyStream = (String)SMTAIServerApp.getApp().getGlobalConfig("lightrag_quey_stream"); + Request request = new Request.Builder() + .url(streamHook+lightragQueyStream) + .post(body) + .build(); + tranReq.sendChunkedBlock("begin_stream", ""); + ObjectMapper objectMapper = new ObjectMapper(); // 鐢ㄤ簬瑙f瀽 JSON + try (Response lightRagResp = okHttpClient.newCall(request).execute()) { + if (lightRagResp.body() != null) { + BufferedReader reader = new BufferedReader(new InputStreamReader(lightRagResp.body().byteStream(), StandardCharsets.UTF_8)); + String line; + while ((line = reader.readLine()) != null) { + try { + // 姣忚鏄� {"response": "xxx"}锛屾垜浠彧鍙栧嚭 "xxx" + JsonNode node = objectMapper.readTree(line); + String content = node.get("response").asText(); // 鑷姩澶勭悊缂栫爜闂锛岃繑鍥炵殑鏄腑鏂� + + // 閫愬瓧绗︽帹閫� + for (char ch : content.toCharArray()) { + tranReq.sendChunkedStreamBlock(com.smtservlet.util.SMTStatic.toString(ch)); + sbAnswer.append(ch); + Thread.sleep(50); + } + + } catch (Exception e) { + System.err.println("瑙f瀽澶辫触锛�" + line); + } + } + } + } catch (Exception e) { + String errMsg = "璋冪敤 LightRAG 澶辫触: " + e.getMessage(); + for (char ch : errMsg.toCharArray()) { + tranReq.sendChunkedStreamBlock(com.smtservlet.util.SMTStatic.toString(ch)); + sbAnswer.append(ch); + } + } finally { + tranReq.sendChunkedBlock("end_stream", ""); + } + + return null; + } + +} -- Gitblit v1.9.3