package com.smtaiserver.smtaiserver.javaai.metrics;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.dom4j.Document;
import org.dom4j.Element;
import org.duckdb.DuckDBAppender;
import org.duckdb.DuckDBConnection;

import com.smtaiserver.smtaiserver.core.SMTAIServerApp;
import com.smtaiserver.smtaiserver.core.SMTAIServerRequest;
import com.smtaiserver.smtaiserver.database.SMTDatabase;
import com.smtaiserver.smtaiserver.database.SMTDatabase.DBRecord;
import com.smtaiserver.smtaiserver.database.SMTDatabase.DBRecords;
import com.smtaiserver.smtaiserver.javaai.SMTJavaAIError;
import com.smtaiserver.smtaiserver.javaai.ast.ASTDBMap;
import com.smtaiserver.smtaiserver.javaai.ast.ASTDimFilter;
import com.smtaiserver.smtaiserver.javaai.duckdb.DuckCubeRecs;
import com.smtaiserver.smtaiserver.javaai.duckdb.DuckCubeRecs.DuckCubeColTitle;
import com.smtaiserver.smtaiserver.javaai.duckdb.DuckCubeRecs.DuckCubeRecsType;
import com.smtaiserver.smtaiserver.javaai.duckdb.DuckResult;
import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTDimensionDef;
import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricSqlXml;
import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricsDefXmlDim;
import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricSqlXml.SQLXMLQuery;
import com.smtservlet.util.Json;
import com.smtservlet.util.SMTStatic;

/**
 * Metric definition that answers a query by running the SQL configured under {@code ROOT/NAME_SQL}
 * and copying the rows into a freshly created result table through a {@link DuckDBAppender}.
 *
 * <p>On top of the plain record query this class can:
 * <ul>
 *   <li>wrap the query in a distance filter driven by the {@code near_range} entry of the request
 *       JSON (only when position columns are configured);</li>
 *   <li>turn the query into an aggregate (grouped or not) when {@code step_op_key} is present;</li>
 *   <li>replace the device-key column with a JSON list loaded via {@code ROOT/QUOTA_SQL}.</li>
 * </ul>
 */
public class SMTMetricsDefDuckBaseRec extends SMTMetricsDefXmlDim {

    /** Suffix-free part of the wrapper that filters out the reference point itself. */
    private static final String DIST_WHERE = ")T)T WHERE __dist__ > 0.001";

    /** Only plain identifiers may be spliced into SQL as an aggregate function name. */
    private static final String SQL_IDENTIFIER_REGEX = "[A-Za-z_][A-Za-z0-9_]*";

    /** SQL template built from {@code ROOT/NAME_SQL} (or the node its {@code ref} attribute names). */
    private SMTMetricSqlXml _sqlxmlMeticName;
    /** Optional SQL template built from {@code ROOT/QUOTA_SQL}; {@code null} when the node is absent. */
    private SMTMetricSqlXml _sqlxmlQuotaName;
    /** Value of the {@code key_cols} attribute (defaults to 1); copied onto every result cube. */
    private int _colKeyCount;
    /** Value of the {@code dev_key_col} attribute, or {@code null}. */
    private String _colDevKeyName;
    /** Value of the {@code pos_x_col} attribute, or {@code null}. */
    private String _posXColName;
    /** Value of the {@code pos_y_col} attribute, or {@code null}. */
    private String _posYColName;

    /** {@inheritDoc} This implementation always answers {@code false}. */
    @Override
    public boolean isTimeValues() {
        return false;
    }

    /**
     * Reads this metric's configuration from the definition document.
     *
     * @param rec database record handed through to the superclass
     * @param doc definition document; {@code ROOT/NAME_SQL} and the optional {@code ROOT/QUOTA_SQL}
     *            are read from it
     * @throws Exception if exactly one of {@code pos_x_col} / {@code pos_y_col} is configured, or if
     *                   the superclass or the XML helpers fail
     */
    @Override
    protected void initInstanceByDoc(DBRecord rec, Document doc) throws Exception {
        super.initInstanceByDoc(rec, doc);

        Element xmlNameSQL = (Element) doc.selectSingleNode("ROOT/NAME_SQL");
        _colDevKeyName = SMTStatic.getXmlAttr(xmlNameSQL, "dev_key_col", null);
        _posXColName = SMTStatic.getXmlAttr(xmlNameSQL, "pos_x_col", null);
        _posYColName = SMTStatic.getXmlAttr(xmlNameSQL, "pos_y_col", null);

        // The two position columns must be configured together or not at all.
        if ((_posXColName == null) != (_posYColName == null)) {
            throw new Exception("pos_x_col and pos_y_col is different");
        }

        _colKeyCount = SMTStatic.toInt(SMTStatic.getXmlAttr(xmlNameSQL, "key_cols", "1"));

        // A "ref" attribute redirects the SQL definition to another node under ROOT.
        String sNameSQLRef = SMTStatic.getXmlAttr(xmlNameSQL, "ref", null);
        if (!SMTStatic.isNullOrEmpty(sNameSQLRef)) {
            xmlNameSQL = (Element) xmlNameSQL.getDocument().selectSingleNode("ROOT/" + sNameSQLRef);
        }
        _sqlxmlMeticName = new SMTMetricSqlXml(xmlNameSQL);

        Element xmlQuotaSQL = (Element) doc.selectSingleNode("ROOT/QUOTA_SQL");
        if (xmlQuotaSQL != null) {
            _sqlxmlQuotaName = new SMTMetricSqlXml(xmlQuotaSQL);
        }
    }

    /**
     * Builds the argument map handed to {@link SMTMetricSqlXml#parseSQL}.
     *
     * <p>NOTE(review): the map is intentionally raw because its type arguments are dictated by
     * {@code parseSQL}, whose signature is not visible from this file; parameterize once confirmed.
     *
     * @param dimNameFilters value stored under {@code __DIM_NAME_FILTERS__}
     * @param dimNameGroup   value stored under {@code __DIM_NAME_GROUP__}
     * @return a new mutable map that also carries {@code __METRICS_ID__}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private Map buildSqlArgs(String dimNameFilters, String dimNameGroup) {
        Map args = new HashMap<>();
        args.put("__METRICS_ID__", this.getId());
        args.put("__DIM_NAME_FILTERS__", dimNameFilters);
        args.put("__DIM_NAME_GROUP__", dimNameGroup);
        return args;
    }

    /** Copies the bound parameters of {@code query} into a new array of matching size. */
    private static Object[] toParamArray(SQLXMLQuery query) {
        return query._sqlParams.toArray(new Object[0]);
    }

    /** Renders {@code sql} followed by one {@code param : value} line per bound parameter. */
    private static String buildSqlLog(String sql, SQLXMLQuery query) {
        StringBuilder sbLogSQL = new StringBuilder();
        sbLogSQL.append(sql + "\n");
        for (Object param : query._sqlParams) {
            sbLogSQL.append(" param : " + SMTStatic.toString(param) + "\n");
        }
        return sbLogSQL.toString();
    }

    /**
     * Creates the result table for {@code astRS} (storing its generated name in
     * {@code astRS._tableName}) and opens an appender on it.
     */
    private static DuckDBAppender openResultAppender(DuckCubeRecs astRS, SMTAIServerRequest tranReq,
            DBRecords metaInfo, String[] colTypes) throws Exception {
        astRS._tableName = "t_" + SMTStatic.newUUID();
        SMTDatabase dbResult = tranReq.createResultTable(astRS._tableName, metaInfo, colTypes);
        return ((DuckDBConnection) dbResult.getConnection())
                .createAppender(DuckDBConnection.DEFAULT_SCHEMA, astRS._tableName);
    }

    /** Appends one row, writing every non-null value in its string form. */
    private static void appendRow(DuckDBAppender appender, Object[] values) throws Exception {
        appender.beginRow();
        for (Object value : values) {
            appender.append(value == null ? null : SMTStatic.toString(value));
        }
        appender.endRow();
    }

    /**
     * Runs {@code sql} with the parameters of {@code query} and streams every row, unchanged, into
     * a new result table registered on {@code astRS}.
     */
    private static void copyQueryToResultTable(final DuckCubeRecs astRS, String sql, SQLXMLQuery query,
            final SMTAIServerRequest tranReq) throws Exception {
        // Single-element holder: the appender is created inside the callback but closed out here.
        final DuckDBAppender[] appender = new DuckDBAppender[1];
        try {
            query._db.querySQLNotify(sql, toParamArray(query), new SMTDatabase.DBQueryNotifyMeta() {
                @Override
                public boolean onMetaInfo(DBRecords metaInfo, String[] colTypes) throws Exception {
                    appender[0] = openResultAppender(astRS, tranReq, metaInfo, colTypes);
                    return true;
                }

                @Override
                public boolean onNextRecord(DBRecord rec) throws Exception {
                    appendRow(appender[0], rec.getValues());
                    return true;
                }
            });
        } finally {
            if (appender[0] != null) {
                appender[0].close();
            }
        }
    }

    /**
     * Wraps the main query in a distance computation when the request carries a
     * {@code near_range} entry.
     *
     * <p>The reference point is either the caller's current position ({@code cur_pos = "Y"}) or the
     * coordinates of the first row returned by this metric's own SQL restricted by the filters in
     * the range entry. Supported {@code operate} values are {@code 范围} (within {@code value}),
     * {@code 最近} (nearest) and {@code 最远} (farthest). Only the first range entry is honoured.
     *
     * @param jsonPath     unused here; kept for signature stability
     * @param dbMap        database map forwarded to the SQL/filter parsers
     * @param jsonAST      request AST; {@code near_range} may be an object or an array
     * @param extArg       unused here; kept for signature stability
     * @param tranReq      current request (position source and debug trace sink)
     * @param r_queryONAME main query; its SQL text is rewritten in place
     * @return {@code null} on success or when no range applies, otherwise the error
     * @throws Exception if parsing or the coordinate lookup query fails
     */
    private SMTJavaAIError queryNearRangeSQL(String jsonPath, ASTDBMap dbMap, Json jsonAST, Map extArg,
            SMTAIServerRequest tranReq, SQLXMLQuery r_queryONAME) throws Exception {
        // No range configuration: nothing to do.
        Json jsonNearRange = jsonAST.safeGetJson("near_range");
        if (jsonNearRange == null) {
            return null;
        }

        // Normalize object / array forms into a list of range entries.
        List<Json> listNearRange = new ArrayList<>();
        if (jsonNearRange.isObject()) {
            listNearRange.add(jsonNearRange);
        } else if (jsonNearRange.isArray()) {
            for (Json item : jsonNearRange.asJsonList()) {
                listNearRange.add(item);
            }
        } else {
            return new SMTJavaAIError("解析范围错误");
        }

        // NOTE: only the first entry is supported for now; an empty array is silently ignored.
        if (listNearRange.isEmpty()) {
            return null;
        }
        Json jsonRange = listNearRange.get(0);

        if (_posXColName == null) {
            return new SMTJavaAIError("当前指标不支持定位操作");
        }

        Double posX;
        Double posY;
        if ("Y".equalsIgnoreCase(jsonRange.safeGetStr("cur_pos", "N"))) {
            double[] curPos = tranReq.getCurQuestionPos();
            if (curPos == null) {
                return new SMTJavaAIError("当前位置未定位,无法依据当前位置回答问题");
            }
            posX = curPos[0];
            posY = curPos[1];
        } else {
            SMTJavaAIError error;

            // Parse the filters that identify the reference device.
            StringBuilder sbDIM_NAME_FILTERS = new StringBuilder();
            if ((error = parseSQLFilterFromJson(dbMap, jsonRange, "", tranReq, sbDIM_NAME_FILTERS, null)) != null) {
                return error;
            }

            // Build the lookup SQL (no grouping).
            SQLXMLQuery queryRANGE = new SQLXMLQuery();
            if ((error = this._sqlxmlMeticName.parseSQL(dbMap, jsonAST,
                    buildSqlArgs(sbDIM_NAME_FILTERS.toString(), ""), tranReq, queryRANGE)) != null) {
                return error;
            }
            tranReq.traceLLMDebug(queryRANGE.getSqlLog());

            // Execute it and take the coordinates of the first matching row.
            DBRecords recs = queryRANGE._db.querySQL(queryRANGE._sbSQLText.toString(), toParamArray(queryRANGE));
            if (recs.getRowCount() == 0) {
                return new SMTJavaAIError("未找到指定范围的设备");
            }
            DBRecord rec = recs.getRecord(0);
            posX = rec.getDouble(_posXColName);
            posY = rec.getDouble(_posYColName);
            if (posX == null || posY == null) {
                return new SMTJavaAIError("指定范围的设备无坐标");
            }
        }

        // Pick the tail of the wrapper first so an unsupported operation leaves the SQL untouched.
        String rangeOp = jsonRange.getJson("operate").asString();
        String distSuffix;
        if ("范围".equals(rangeOp)) {
            double rangeValue = jsonRange.getJson("value").asDouble();
            distSuffix = DIST_WHERE + " AND __dist__ < " + SMTStatic.toString(rangeValue);
        } else if ("最近".equals(rangeOp)) {
            distSuffix = DIST_WHERE + " ORDER BY __dist__ LIMIT 1 ";
        } else if ("最远".equals(rangeOp)) {
            distSuffix = DIST_WHERE + " ORDER BY __dist__ DESC LIMIT 1 ";
        } else {
            return new SMTJavaAIError("指定范围的设备查询条件[" + rangeOp + "]不支持");
        }

        // Wrap the main query: the inner SELECT adds the Euclidean distance as __dist__.
        r_queryONAME._sbSQLText.insert(0, "SELECT * FROM (SELECT *,sqrt("
                + " pow(" + _posXColName + " - " + SMTStatic.toString(posX) + ", 2)"
                + "+pow(" + _posYColName + " - " + SMTStatic.toString(posY) + ", 2)"
                + ") as __dist__ FROM (");
        r_queryONAME._sbSQLText.append(distSuffix);
        return null;
    }

    /**
     * Runs the main query without aggregation and stores every row in a new result table.
     *
     * <p>While copying, the device-key column (if configured and present) is replaced by a JSON
     * array text assembled from {@code ROOT/QUOTA_SQL} rows, and the position columns (if both are
     * present) are converted through {@code convMapToGisTransform}.
     *
     * @param dbMap      database map forwarded to the quota SQL parser
     * @param astRS      result cube; its type and table name are set here
     * @param stepOpKey  unused here; kept for signature stability
     * @param jsonAST    unused here; kept for signature stability
     * @param queryONAME main query to execute
     * @param tranReq    current request (result-table factory and debug trace sink)
     * @return {@code null} on success, otherwise the error from parsing the quota SQL
     * @throws Exception if a query or the appender fails
     */
    private SMTJavaAIError queryNoGroupRecords(ASTDBMap dbMap, final DuckCubeRecs astRS, String stepOpKey,
            Json jsonAST, SQLXMLQuery queryONAME, final SMTAIServerRequest tranReq) throws Exception {
        astRS._recsType = DuckCubeRecsType.RECORD;

        // If devices carry dependent quota definitions, load them keyed by device key.
        final Map<String, StringBuilder> mapDevKey2QuotaUrl = new HashMap<>();
        if (_sqlxmlQuotaName != null && _colDevKeyName != null) {
            SQLXMLQuery queryQuota = new SQLXMLQuery();
            SMTJavaAIError error = _sqlxmlQuotaName.parseSQL(dbMap, null, null, tranReq, queryQuota);
            if (error != null) {
                return error;
            }

            // The original passed `new Object[] {size}` (an initializer, not a length); size the
            // destination array properly. An empty parameter list is still passed as null.
            Object[] quotaParams = queryQuota._sqlParams.size() == 0 ? null : toParamArray(queryQuota);
            DBRecords recsQuotaName = queryQuota._db.querySQL(queryQuota._sbSQLText.toString(), quotaParams);
            for (DBRecord recQuotaName : recsQuotaName.getRecords()) {
                String devKey = recQuotaName.getString("dev_key");
                String queryJson = recQuotaName.getString("query_json");

                StringBuilder sbJsonList = mapDevKey2QuotaUrl.computeIfAbsent(devKey, k -> new StringBuilder());
                if (sbJsonList.length() > 0) {
                    sbJsonList.append(",");
                }
                sbJsonList.append(queryJson);
            }
        }

        tranReq.traceLLMDebug(queryONAME.getSqlLog());

        // Single-element holders so the anonymous callback can publish state to this method.
        final DuckDBAppender[] appender = new DuckDBAppender[1];
        final int[] colDevKeyPos = new int[] {-1};
        final int[] colXYKeyPos = new int[] {-1, -1};
        try {
            queryONAME._db.querySQLNotify(queryONAME._sbSQLText.toString(), toParamArray(queryONAME),
                    new SMTDatabase.DBQueryNotifyMeta() {
                @Override
                public boolean onMetaInfo(DBRecords metaInfo, String[] sColTypes) throws Exception {
                    appender[0] = openResultAppender(astRS, tranReq, metaInfo, sColTypes);

                    // Locate the device-key column whose value will be replaced.
                    if (_colDevKeyName != null) {
                        colDevKeyPos[0] = metaInfo.getFieldIndex(_colDevKeyName);
                    }

                    // Coordinate conversion only applies when both position columns are present.
                    if (_posXColName != null && _posYColName != null) {
                        colXYKeyPos[0] = metaInfo.getFieldIndex(_posXColName);
                        colXYKeyPos[1] = metaInfo.getFieldIndex(_posYColName);
                        if (colXYKeyPos[0] == -1 || colXYKeyPos[1] == -1) {
                            colXYKeyPos[0] = -1;
                            colXYKeyPos[1] = -1;
                        }
                    }
                    return true;
                }

                @Override
                public boolean onNextRecord(DBRecord rec) throws Exception {
                    Object[] values = rec.getValues();

                    // Replace the device key with its quota JSON array (or null when none exists).
                    // NOTE(review): the lookup uses the raw column value; it only matches when
                    // that value is a String equal to dev_key - confirm for non-text key columns.
                    int devKeyIdx = colDevKeyPos[0];
                    if (devKeyIdx >= 0) {
                        StringBuilder sbValue = mapDevKey2QuotaUrl.get(values[devKeyIdx]);
                        values[devKeyIdx] = (sbValue == null) ? null : "[" + sbValue.toString() + "]";
                    }

                    // Convert map coordinates to GIS coordinates.
                    if (colXYKeyPos[0] >= 0) {
                        Object x = values[colXYKeyPos[0]];
                        Object y = values[colXYKeyPos[1]];
                        if (x != null && y != null) {
                            double[] gisPos = SMTAIServerApp.getApp().convMapToGisTransform(
                                    new double[] {SMTStatic.toDouble(x), SMTStatic.toDouble(y)});
                            values[colXYKeyPos[0]] = gisPos[0];
                            values[colXYKeyPos[1]] = gisPos[1];
                        }
                    }

                    appendRow(appender[0], values);
                    return true;
                }
            });
        } finally {
            if (appender[0] != null) {
                appender[0].close();
            }
        }

        return null;
    }

    /**
     * Runs the main query as an aggregate and stores the outcome in a new result table.
     *
     * <p>{@code COUNT} counts rows; any other operation is applied to the dimension named by
     * {@code step_dim_name}. With grouping dimensions present the result is a {@code RECORD} set
     * grouped by them (and the cube's dimension list is cleared); otherwise it is a single-row
     * {@code SUMMARY}.
     *
     * @param astRS      result cube; title, column titles, type and table name are updated
     * @param stepOpKey  aggregate key taken from {@code step_op_key}
     * @param jsonAST    request AST supplying {@code step_dim_name} / {@code step_op_title}
     * @param queryONAME main query to wrap
     * @param tranReq    current request (result-table factory and debug trace sink)
     * @return {@code null} on success, otherwise a description of the invalid request
     * @throws Exception if the query or the appender fails
     */
    private SMTJavaAIError queryGroupRecords(DuckCubeRecs astRS, String stepOpKey, Json jsonAST,
            SQLXMLQuery queryONAME, SMTAIServerRequest tranReq) throws Exception {
        String groupAggField = "*";
        String groupOperate = "COUNT";
        String opTitle = jsonAST.safeGetStr("step_op_title", "");

        if (!"COUNT".equals(stepOpKey)) {
            // e.g. {"is_output":false,"step_op_key":"SUM","step_op_title":"累计值","step_dim_name":"pipe_length"}
            groupAggField = jsonAST.safeGetStr("step_dim_name", null);
            if (SMTStatic.isNullOrEmpty(groupAggField)) {
                return new SMTJavaAIError("未指定需要统计的字段");
            }
            if (this._mapId2SqlDimDef.get(groupAggField) == null) {
                return new SMTJavaAIError("需要统计的维度[" + groupAggField + "]未定义");
            }

            // The operation name is spliced into SQL as a function name, so accept identifiers only.
            groupOperate = jsonAST.getJson("step_op_key").asString();
            if (groupOperate == null || !groupOperate.matches(SQL_IDENTIFIER_REGEX)) {
                return new SMTJavaAIError("统计操作[" + groupOperate + "]不支持");
            }

            // Title and unit come from the aggregated dimension.
            SMTDimensionDef dimDef = SMTAIServerApp.getApp().getDimensionDef(groupAggField);
            boolean isCount = "SUM".equals(stepOpKey);
            astRS._title += opTitle;
            astRS._chartUnit = dimDef.getUnitName(true);
            astRS._mapCol2Title.put("__CNT__",
                    new DuckCubeColTitle(opTitle + dimDef.getUnitName(true), true, isCount));
        } else {
            astRS._title += opTitle;
            astRS._mapCol2Title.put("__CNT__", new DuckCubeColTitle("个数", true, true));
        }

        String innerSql = queryONAME._sbSQLText.toString();

        if (astRS._dimNames.size() > 0) {
            // Aggregate per group.
            String groupDimField = String.join(",", astRS._dimNames);
            String sql = "SELECT " + groupDimField + ", " + groupOperate + "(" + groupAggField + ") AS __CNT__ FROM (\n"
                    + innerSql
                    + "\n)T \nGROUP BY " + groupDimField;

            tranReq.traceLLMDebug("\n" + buildSqlLog(sql, queryONAME));

            astRS._recsType = DuckCubeRecsType.RECORD;
            // After aggregation the grouping columns are ordinary columns, no longer cube dimensions.
            astRS._dimNames.clear();

            copyQueryToResultTable(astRS, sql, queryONAME, tranReq);
        } else {
            // Aggregate over the whole result.
            // NOTE(review): the alias here is CNT while the title map is keyed by __CNT__ - confirm
            // this is intended for SUMMARY results.
            String sql = "SELECT " + groupOperate + "(" + groupAggField + ") AS CNT FROM (\n"
                    + innerSql
                    + "\n)T";

            tranReq.traceLLMDebug(buildSqlLog(sql, queryONAME));

            astRS._recsType = DuckCubeRecsType.SUMMARY;

            copyQueryToResultTable(astRS, sql, queryONAME, tranReq);
        }

        return null;
    }

    /**
     * Executes the metric and, when that fails, appends sample questions to the response before
     * handing the error back.
     *
     * <p>Example questions: "count flow meters by manufacturer", "summarize the customer table by
     * payment type".
     *
     * @return {@code null} on success, otherwise the error from {@link #executeQueryMetrics}
     * @throws Exception if execution fails unexpectedly
     */
    @Override
    public SMTJavaAIError queryMetrics(String jsonPath, ASTDBMap dbMap, Json jsonAST, Map extArg,
            SMTAIServerRequest tranReq, DuckResult r_result) throws Exception {
        SMTJavaAIError error = this.executeQueryMetrics(jsonPath, dbMap, jsonAST, extArg, tranReq, r_result);
        if (error != null) {
            appendSampleQuestionToResponse(jsonAST, tranReq);
        }
        return error;
    }

    /**
     * Builds the main SQL from the request AST, registers a result cube on {@code r_result} and
     * fills it either with aggregated values or with the raw records.
     *
     * @param jsonPath path prefix used to build each dimension filter's {@code _dimPath}
     * @param dbMap    database map forwarded to the SQL/filter parsers
     * @param jsonAST  request AST (dimensions, filters, {@code near_range}, {@code step_op_key})
     * @param extArg   forwarded to the range wrapper
     * @param tranReq  current request
     * @param r_result receives the new cube; it is added before any query runs, so it stays
     *                 registered even when an error is returned
     * @return {@code null} on success, otherwise the first error encountered
     * @throws Exception if a query or the appender fails
     */
    public SMTJavaAIError executeQueryMetrics(String jsonPath, ASTDBMap dbMap, Json jsonAST, Map extArg,
            SMTAIServerRequest tranReq, DuckResult r_result) throws Exception {
        DuckCubeRecs astRS = new DuckCubeRecs();

        // Expose every dimension that declares a filter type.
        astRS._listDimFilter = new ArrayList<>();
        for (SMTSQLXMLDimDef sqlXmlDimDef : _mapId2SqlDimDef.values()) {
            if (SMTStatic.isNullOrEmpty(sqlXmlDimDef._filterType)) {
                continue;
            }
            ASTDimFilter dimFilter = new ASTDimFilter();
            astRS._listDimFilter.add(dimFilter);
            dimFilter._dimPath = jsonPath + sqlXmlDimDef._dimDef.getId();
            dimFilter._filterType = sqlXmlDimDef._filterType;
            dimFilter._dimDef = sqlXmlDimDef._dimDef;
        }

        astRS._colKeyCount = this._colKeyCount;
        if (this._posXColName != null) {
            astRS._posXName = this._posXColName;
            astRS._posYName = this._posYColName;
        }
        if (this._colDevKeyName != null) {
            astRS._devKeyName = _colDevKeyName;
        }
        r_result._listRecordset.add(astRS);

        SMTJavaAIError error = null;

        // Resolve the grouping dimensions, keeping their declaration order.
        // NOTE(review): raw on purpose - the value type is fixed by parseDimListFromJson in the
        // superclass, which is not visible here. Keys are read as dimension ids (String).
        Map mapIdDimDef = new LinkedHashMap<>();
        if ((error = parseDimListFromJson(jsonAST, tranReq, mapIdDimDef)) != null) {
            return error;
        }
        List<String> groupDimIds = new ArrayList<>();
        StringBuilder sbDIM_NAME_GROUP = new StringBuilder();
        for (Object key : mapIdDimDef.keySet()) {
            String dimId = (String) key;
            groupDimIds.add(dimId);
            sbDIM_NAME_GROUP.append(",");
            sbDIM_NAME_GROUP.append(dimId);
        }

        // Parse the filter conditions.
        // NOTE(review): raw for the same reason as above (type set by parseSQLFilterFromJson).
        Map mapName2EqDimValue = new HashMap<>();
        StringBuilder sbDIM_NAME_FILTERS = new StringBuilder();
        if ((error = parseSQLFilterFromJson(dbMap, jsonAST, "", tranReq, sbDIM_NAME_FILTERS, mapName2EqDimValue)) != null) {
            return error;
        }

        // Generate the main SQL.
        SQLXMLQuery queryONAME = new SQLXMLQuery();
        if ((error = this._sqlxmlMeticName.parseSQL(dbMap, jsonAST,
                buildSqlArgs(sbDIM_NAME_FILTERS.toString(), sbDIM_NAME_GROUP.toString()), tranReq, queryONAME)) != null) {
            return error;
        }

        // Optionally wrap it with the geographic range filter.
        if ((error = queryNearRangeSQL(jsonPath, dbMap, jsonAST, extArg, tranReq, queryONAME)) != null) {
            return error;
        }
        tranReq.traceLLMDebug(queryONAME.getSqlLog());

        // Publish the grouping dimensions on the cube and derive its title.
        astRS._dimNames.addAll(groupDimIds);

        String title;
        if (astRS._dimNames.size() == 0) {
            title = this._title;
        } else {
            StringBuilder sbTitle = new StringBuilder();
            for (String dimName : astRS._dimNames) {
                if (sbTitle.length() > 0) {
                    sbTitle.append(",");
                }
                sbTitle.append(this._mapId2SqlDimDef.get(dimName)._dimDef.getName());
            }
            sbTitle.insert(0, "按照");
            sbTitle.append("分类的" + this._title);
            title = sbTitle.toString();
        }
        astRS._title = title;

        // Tell the client what is being queried.
        StringBuilder sbQueryProcess = new StringBuilder();
        sbQueryProcess.append("查询");
        if (astRS._dimNames.size() > 0) {
            sbQueryProcess.append("以:");
            for (String dimName : astRS._dimNames) {
                sbQueryProcess.append(SMTAIServerApp.getApp().getDimensionDef(dimName).getName() + " ");
            }
            sbQueryProcess.append("为分组");
        } else {
            sbQueryProcess.append("所有");
        }
        sbQueryProcess.append("的数据...");
        tranReq.sendChunkedBlock("begin", sbQueryProcess.toString());

        // An aggregate key turns the query into an aggregation; otherwise fetch all records.
        String stepOpKey = jsonAST.safeGetStr("step_op_key", "");
        if (!SMTStatic.isNullOrEmpty(stepOpKey)) {
            if ((error = this.queryGroupRecords(astRS, stepOpKey, jsonAST, queryONAME, tranReq)) != null) {
                return error;
            }
        } else {
            if ((error = this.queryNoGroupRecords(dbMap, astRS, null, jsonAST, queryONAME, tranReq)) != null) {
                return error;
            }
        }

        return null;
    }
}