package com.smtaiserver.smtaiserver.javaai.metrics; import java.util.ArrayList; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import org.dom4j.Document; import org.dom4j.Element; import org.dom4j.Node; import org.duckdb.DuckDBAppender; import org.duckdb.DuckDBConnection; import com.smtaiserver.smtaiserver.core.SMTAIServerApp; import com.smtaiserver.smtaiserver.core.SMTAIServerRequest; import com.smtaiserver.smtaiserver.database.SMTDatabase; import com.smtaiserver.smtaiserver.database.SMTDatabase.DBRecord; import com.smtaiserver.smtaiserver.database.SMTDatabase.DBRecords; import com.smtaiserver.smtaiserver.javaai.SMTJavaAIError; import com.smtaiserver.smtaiserver.javaai.ast.ASTDBMap; import com.smtaiserver.smtaiserver.javaai.ast.ASTTimeRange; import com.smtaiserver.smtaiserver.javaai.duckdb.DuckCubeRecs; import com.smtaiserver.smtaiserver.javaai.duckdb.DuckResult; import com.smtaiserver.smtaiserver.javaai.duckdb.DuckCubeRecs.DuckCubeColTitle; import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTDuckTimeGroupName; import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricSqlXml; import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricsDefXmlDim; import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricSqlXml.SQLXMLQuery; import com.smtservlet.util.Json; import com.smtservlet.util.SMTStatic; public class SMTMetricsDefDuckTimeAggChart extends SMTMetricsDefXmlDim { private static class TimeBucketTable { public String _simSqlPerfix; public String _chartType; public String _titleField; public String[] _onameFields; public String _timeField; public String _groupField; public char _groupType; public String _timeStepTitle; public int _timeStepUnit; public int _timeStepValue; public SMTMetricSqlXml _sqlxmlMeticName; public SMTMetricSqlXml _sqlxmlMeticValue; public TimeBucketTable(Element xmlTimeBucket) throws Exception { _simSqlPerfix = ""; String sTimeStep = SMTStatic.getXmlAttr(xmlTimeBucket, "time_step"); _timeStepTitle = SMTStatic.getXmlAttr(xmlTimeBucket, "time_title"); _chartType = SMTStatic.getXmlAttr(xmlTimeBucket, "chart_type", ""); int[] aTimeStep = SMTAIServerApp.convStrToTimeStep(sTimeStep); if(aTimeStep == null) throw new Exception("can't parse time_step : " + sTimeStep); _timeStepUnit = aTimeStep[0]; _timeStepValue = aTimeStep[1]; Element xmlNameSQL = (Element)xmlTimeBucket.selectSingleNode("NAME_SQL"); String sNameSQLRef = SMTStatic.getXmlAttr(xmlNameSQL, "ref", null); if(!SMTStatic.isNullOrEmpty(sNameSQLRef)) xmlNameSQL = (Element) xmlNameSQL.getDocument().selectSingleNode("ROOT/" + sNameSQLRef); _onameFields = SMTStatic.getXmlAttr(xmlNameSQL, "oname_fields").toUpperCase().split(","); _titleField = SMTStatic.getXmlAttr(xmlNameSQL, "title_fields").toUpperCase(); _sqlxmlMeticName = new SMTMetricSqlXml(xmlNameSQL); Element xmlValueSQL = (Element)xmlTimeBucket.selectSingleNode("VALUE_SQL"); String sValueSQLRef = SMTStatic.getXmlAttr(xmlValueSQL, "ref", null); if(!SMTStatic.isNullOrEmpty(sValueSQLRef)) xmlValueSQL = (Element) xmlNameSQL.getDocument().selectSingleNode("ROOT/" + sValueSQLRef); _timeField = SMTStatic.getXmlAttr(xmlValueSQL, "time_field").toUpperCase(); _groupField = SMTStatic.getXmlAttr(xmlValueSQL, "group_field").toUpperCase(); _groupType = SMTStatic.getXmlAttr(xmlValueSQL, "group_type").toUpperCase().charAt(0); _sqlxmlMeticValue = new SMTMetricSqlXml(xmlValueSQL); } public int compareTimeStep(int unit, int value) { int subUnit = _timeStepUnit - unit; if(subUnit != 0) return subUnit; int subValue = _timeStepValue - value; if(subValue == 0) return 0; if(subValue < 0) return -1; return 1; } } /////////////////////////////////////////////////////////////////////////////////////// private List _listGroupNameInfo = new ArrayList<>(); private List _listTimeBucketTable = new ArrayList<>(); public boolean isChartValues() { return true; } @Override protected void initInstanceByDoc(DBRecord rec, Document doc) throws Exception { super.initInstanceByDoc(rec, doc); // 读取分组信息 for(Node nodeGroupName : doc.selectNodes("ROOT/GROUP_NAMES/GROUP_NAME")) { SMTDuckTimeGroupName groupNameInfo = new SMTDuckTimeGroupName((Element)nodeGroupName); _listGroupNameInfo.add(groupNameInfo); } // 将时序表 for(Node nodeTimeBucket : doc.selectNodes("ROOT/TIME_BUCKET")) { TimeBucketTable timeBucketTable = new TimeBucketTable((Element)nodeTimeBucket); for(int i = 0; i < _listTimeBucketTable.size(); i ++) { TimeBucketTable curTable = _listTimeBucketTable.get(i); int comp = curTable.compareTimeStep(timeBucketTable._timeStepUnit, timeBucketTable._timeStepValue); if(comp == 0) { throw new Exception("time bucket existed at : " + SMTStatic.toInt(i)); } else if(comp < 0) { _listTimeBucketTable.add(i, timeBucketTable); timeBucketTable = null; break; } } if(timeBucketTable != null) _listTimeBucketTable.add(timeBucketTable); } if(_listTimeBucketTable.size() == 0) throw new Exception("time bucket is empty : " + this._id + " : " + this._title); } private SMTJavaAIError parseSQLValueFilter(Json jsonCondList, String fieldName, char valueType, StringBuilder r_SQL) throws Exception { SMTJavaAIError error = null; if(!jsonCondList.isArray()) return null; List listCond = jsonCondList.asJsonList(); if(listCond.size() == 0) return null; Json jsonCond = listCond.get(0); // 解析[第一个表达式] [逻辑符号] [第二个表达式] if(jsonCond.isArray()) { r_SQL.append("("); if((error = parseSQLValueFilter(jsonCond, fieldName, valueType, r_SQL)) != null) return error; r_SQL.append(")"); if(listCond.size() > 1) { r_SQL.append(listCond.get(1).asString()); r_SQL.append("("); if((error = parseSQLValueFilter(jsonCond, fieldName, valueType, r_SQL)) != null) return error; r_SQL.append(")"); } } // 解析[操作符号] [值] else if(jsonCond.isString()) { r_SQL.append(fieldName); r_SQL.append(" " + jsonCond.asString() + " "); String value = listCond.get(1).asString(); if(valueType == 'S') { r_SQL.append("'" + value.replace("'", "''") + "'"); } else if(valueType == 'T') { r_SQL.append("'" + SMTStatic.toString(SMTStatic.toDate(value)) + "'"); } else { r_SQL.append(SMTStatic.toString(SMTStatic.toDouble(value))); } } else throw new Exception("jsonCond is not array or value:" + jsonCond.toString()); return null; } // 东南厂近五天出厂每两日出厂流量最大值 // 五一广场按照生产厂家分组且查询XXX厂生产的的昨日压力范围在20-30的明细 // 查询五一广场前三天内每天的压力平均值 // 五一广场和光明路昨日压力每小时为单位的压力最大值 // 按照设备类型分组获取出厂日期为2001-01-01的昨日压力明细 // 统计压力指标关联设备个数 @Override public SMTJavaAIError queryMetrics(String jsonPath, ASTDBMap dbMap, Json jsonAST, Map extArg, SMTAIServerRequest tranReq, DuckResult r_result) throws Exception { SMTJavaAIError error = this.executeQueryMetrics(jsonPath, dbMap, jsonAST, extArg, tranReq, r_result); if(error != null) { appendSampleQuestionToResponse(jsonAST, tranReq); return error; } return null; } protected int getLastDimPosByFilter(Json jsonAST, List listGroupNameInfo) { int lastPos = -1; for(String key : jsonAST.asJsonMap().keySet()) { SMTSQLXMLDimDef dimDef = _mapId2SqlDimDef.get(key); if(dimDef == null) continue; String dimName = dimDef._dimDef.getId(); for(int i = 0; i < listGroupNameInfo.size(); i ++) { SMTDuckTimeGroupName groupNameInfo = listGroupNameInfo.get(i); if(groupNameInfo._idCol.equalsIgnoreCase(dimName)) { if(lastPos < i) lastPos = i; } } } return lastPos; } public SMTJavaAIError executeQueryMetrics(String jsonPath, ASTDBMap dbMap, Json jsonAST, Map extArg, SMTAIServerRequest tranReq, DuckResult r_result) throws Exception { tranReq.sendChunkedBlock("begin", "分析要查询指标\"" + this._title + "\"的查询条件..."); SMTJavaAIError error = null; TimeBucketTable timeBucket = null; // 获取步长时间 String stepTime = jsonAST.safeGetStr("step_time", null); // 如果步长时间不为空,则采用最接近步长时间间隔的值 int[] timeStep = null; if(!SMTStatic.isNullOrEmpty(stepTime)) { timeStep = SMTAIServerApp.convStrToTimeStep(stepTime); if(timeStep != null) { for(TimeBucketTable curTimeBucket : this._listTimeBucketTable) { int comp = curTimeBucket.compareTimeStep(timeStep[0], timeStep[1]); if(comp <= 0) { timeBucket = curTimeBucket; break; } } } } // 如果步长时间为空,则采用最小时间间隔的值 if(timeBucket == null) { timeBucket = this._listTimeBucketTable.get(this._listTimeBucketTable.size() - 1); } String chartType = timeBucket._chartType; if(timeStep != null && SMTStatic.isNullOrEmpty(chartType)) { if(timeStep[0] == 1 || timeStep[1] >= 24 * 60) chartType = "bar"; } String startTime = jsonAST.safeGetStr("start_time", null); String endTime = jsonAST.safeGetStr("end_time", null); boolean hasTimeValue = !SMTStatic.isNullOrEmpty(startTime) && !SMTStatic.isNullOrEmpty(endTime); if(!hasTimeValue) { String[] timeRange = new String[2]; if((error = this.promptUserForTimeRange("请输入时间范围", tranReq, timeRange)) != null) return error; hasTimeValue = true; jsonAST.set("start_time", timeRange[0]); jsonAST.set("end_time", timeRange[1]); startTime = timeRange[0]; endTime = timeRange[1]; } // 如果存在查询名称的sqlxml,则首先查询名称 Map mapId2SqlArg = new HashMap<>(); // 解析要分组的维度列表 Map mapIdDimDef = new LinkedHashMap<>(); if((error = parseDimListFromJson(jsonAST, tranReq, mapIdDimDef)) != null) return error; StringBuilder sbDIM_NAME_GROUP = new StringBuilder(); for(String name : mapIdDimDef.keySet()) { sbDIM_NAME_GROUP.append(","); sbDIM_NAME_GROUP.append(name); } // 解析过滤条件 Map mapName2EqDimValue = new HashMap<>(); StringBuilder sbDIM_NAME_FILTERS = new StringBuilder(); if((error = parseSQLFilterFromJson(dbMap, jsonAST, timeBucket._simSqlPerfix, tranReq, sbDIM_NAME_FILTERS, mapName2EqDimValue)) != null) return error; // 查询最后一个维度 int lastDimPos = getLastDimPosByFilter(jsonAST, _listGroupNameInfo); // 设置sql参数 mapId2SqlArg.put("__METRICS_ID__", this.getId()); mapId2SqlArg.put("__DIM_NAME_FILTERS__", sbDIM_NAME_FILTERS.toString()); mapId2SqlArg.put("__DIM_NAME_GROUP__", sbDIM_NAME_GROUP.toString()); mapId2SqlArg.put("__AGG_DIM_GROUP_NAMES__", _listGroupNameInfo); mapId2SqlArg.put("__AGG_DIM_LAST_POS__", lastDimPos); // 生成SQL SQLXMLQuery queryONAME = new SQLXMLQuery(); if((error = timeBucket._sqlxmlMeticName.parseSQL(dbMap, jsonAST, mapId2SqlArg, tranReq, queryONAME)) != null) return error; // 查询结果集 tranReq.traceLLMDebug(queryONAME.getSqlLog()); // 输出查询条件 StringBuilder sbQueryProcess = new StringBuilder(); sbQueryProcess.append("查询从" + startTime + "到" + endTime); if(sbDIM_NAME_FILTERS.length() > 0) sbQueryProcess.append("的查询条件为:[" + sbDIM_NAME_FILTERS.toString() + "]"); if(!SMTStatic.isNullOrEmpty(stepTime)) sbQueryProcess.append("的按照" + stepTime + " 为步长"); sbQueryProcess.append("的数据..."); tranReq.sendChunkedBlock("begin", sbQueryProcess.toString()); // 查询指标名称记录 DBRecords recsONAME = queryONAME._db.querySQL(queryONAME._sbSQLText.toString(), queryONAME._sqlParams.toArray(new Object[queryONAME._sqlParams.size()])); if(recsONAME.getRowCount() == 0) return new SMTJavaAIError("当前设备未匹配指标,请检查设备名是否拼写错误。"); recsONAME.limitRecord(1); // 生成指标值记录 for(DBRecord recONAME : recsONAME.getRecords()) { DuckCubeRecs astRS = new DuckCubeRecs(); r_result._listRecordset.add(astRS); if(hasTimeValue) { astRS._timeRange = new ASTTimeRange(); astRS._timeRange._startTime = SMTStatic.toDate(startTime); astRS._timeRange._pathStartTime = jsonPath + "start_time"; astRS._timeRange._endTime = SMTStatic.toDate(endTime); astRS._timeRange._pathEndTime = jsonPath + "end_time"; astRS._timeRange._timeField = timeBucket._timeField; astRS._timeRange._valueField = timeBucket._groupField; } astRS._chartType = chartType; astRS._title = timeBucket._timeStepTitle + this._title; astRS._chartTitle = recONAME.getString(timeBucket._titleField); astRS._chartUnit = this._unit; astRS._listGroupNameInfo = this._listGroupNameInfo; // 将查询延时数据的值放入参数映射表 for(String onameField : timeBucket._onameFields) { if(recONAME.getColIndex(onameField) < 0) continue; mapId2SqlArg.put(onameField, recONAME.getString(onameField)); } // 获取value的过滤条件 StringBuilder sbSQLFilterValue = new StringBuilder(); Json jsonFilterValue = jsonAST.safeGetJson("value"); if(jsonFilterValue != null) { if((error = parseSQLValueFilter(jsonFilterValue, timeBucket._groupField, timeBucket._groupType, sbSQLFilterValue)) != null) return error; if(sbSQLFilterValue.length() > 0) mapId2SqlArg.put("__VALUE_FILTER__", sbSQLFilterValue.toString()); } // 生成SQL SQLXMLQuery queryValue = new SQLXMLQuery(); if((error = timeBucket._sqlxmlMeticValue.parseSQL(dbMap, jsonAST, mapId2SqlArg, tranReq, queryValue)) != null) return error; // 如果不包含步长,却包含聚合操作,则直接对数据库做聚合操作 String realSQL = queryValue._sbSQLText.toString(); if(SMTStatic.isNullOrEmpty(stepTime) && jsonAST.has("step_op_key")) { String stepOper = jsonAST.safeGetStr("step_op_key", "AVG"); String stepTitle = jsonAST.safeGetStr("step_op_title", "平均"); realSQL = "SELECT " + stepOper + "(" + timeBucket._groupField + ") AS __CNT__ FROM (" + realSQL + ") T"; astRS._title = astRS._title + "的" + stepTitle; astRS._recsType = DuckCubeRecs.DuckCubeRecsType.SUMMARY; astRS._mapCol2Title.put("__CNT__", new DuckCubeColTitle(stepTitle, true, "COUNT".equals(stepOper))); } // 查询结果集 tranReq.traceLLMDebug(queryValue.getSqlLog()); DuckDBAppender[] appender = new DuckDBAppender[1]; try { queryValue._db.querySQLNotify(realSQL, queryValue._sqlParams.toArray(new Object[] {queryValue._sqlParams.size()}), new SMTDatabase.DBQueryNotifyMeta() { @Override public boolean onMetaInfo(DBRecords metaInfo, String[] colTypes) throws Exception { astRS._tableName = "t_" + SMTStatic.newUUID(); SMTDatabase dbResult = tranReq.createResultTable(astRS._tableName, metaInfo, colTypes); appender[0] = ((DuckDBConnection)dbResult.getConnection()).createAppender(DuckDBConnection.DEFAULT_SCHEMA, astRS._tableName); return true; } @Override public boolean onNextRecord(DBRecord rec) throws Exception { Object[] values = rec.getValues(); appender[0].beginRow(); for(Object value : values) { if(value == null) appender[0].append(null); else appender[0].append(SMTStatic.toString(value)); } appender[0].endRow(); return true; } }); } finally { if(appender[0] != null) { appender[0].close(); appender[0] = null; } } // 如果包含步长,则对步长做操作 if(!SMTStatic.isNullOrEmpty(stepTime)) { String stepOper = jsonAST.safeGetStr("step_op_key", "AVG"); String stepTitle = jsonAST.safeGetStr("step_op_title", "平均"); String stepTimePath = jsonAST.safeGetStr("step_time_path", null); astRS._timeRange._timeStep = SMTAIServerApp.convTimeStepToUnitStr(timeStep[0], timeStep[1]); astRS._timeRange._pathTimeSteps = SMTStatic.isNullOrEmpty(stepTimePath) ? new String[] { jsonPath + "step_time"} : new String[] { jsonPath + "step_time", stepTimePath}; tranReq.timeBucketTable(astRS._tableName, "OTIME", stepOper + "(" + timeBucket._groupField + ")" , timeBucket._groupField, timeStep[0], timeStep[1]); astRS._title = astRS._title + "每" + SMTAIServerApp.convTimeStepToStr(timeStep[0], timeStep[1]) + "的" + stepTitle; } } return error; } }