package com.smtaiserver.smtaiserver.javaai.metrics;
|
|
import java.util.ArrayList;
|
import java.util.HashMap;
|
import java.util.LinkedHashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
import org.dom4j.Document;
|
import org.dom4j.Element;
|
import org.dom4j.Node;
|
import org.duckdb.DuckDBAppender;
|
import org.duckdb.DuckDBConnection;
|
|
import com.smtaiserver.smtaiserver.core.SMTAIServerApp;
|
import com.smtaiserver.smtaiserver.core.SMTAIServerRequest;
|
import com.smtaiserver.smtaiserver.database.SMTDatabase;
|
import com.smtaiserver.smtaiserver.database.SMTDatabase.DBRecord;
|
import com.smtaiserver.smtaiserver.database.SMTDatabase.DBRecords;
|
import com.smtaiserver.smtaiserver.javaai.SMTJavaAIError;
|
import com.smtaiserver.smtaiserver.javaai.ast.ASTDBMap;
|
import com.smtaiserver.smtaiserver.javaai.ast.ASTTimeRange;
|
import com.smtaiserver.smtaiserver.javaai.duckdb.DuckCubeRecs;
|
import com.smtaiserver.smtaiserver.javaai.duckdb.DuckResult;
|
import com.smtaiserver.smtaiserver.javaai.duckdb.DuckCubeRecs.DuckCubeColTitle;
|
import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTDuckTimeGroupName;
|
import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricSqlXml;
|
import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricsDefXmlDim;
|
import com.smtaiserver.smtaiserver.javaai.metrics.base.SMTMetricSqlXml.SQLXMLQuery;
|
import com.smtservlet.util.Json;
|
import com.smtservlet.util.SMTStatic;
|
|
public class SMTMetricsDefDuckTimeAggChart extends SMTMetricsDefXmlDim
|
{
|
private static class TimeBucketTable
|
{
|
public String _simSqlPerfix;
|
public String _chartType;
|
public String _titleField;
|
public String[] _onameFields;
|
public String _timeField;
|
public String _groupField;
|
public char _groupType;
|
public String _timeStepTitle;
|
public int _timeStepUnit;
|
public int _timeStepValue;
|
public SMTMetricSqlXml _sqlxmlMeticName;
|
public SMTMetricSqlXml _sqlxmlMeticValue;
|
|
public TimeBucketTable(Element xmlTimeBucket) throws Exception
|
{
|
_simSqlPerfix = "";
|
String sTimeStep = SMTStatic.getXmlAttr(xmlTimeBucket, "time_step");
|
_timeStepTitle = SMTStatic.getXmlAttr(xmlTimeBucket, "time_title");
|
_chartType = SMTStatic.getXmlAttr(xmlTimeBucket, "chart_type", "");
|
|
int[] aTimeStep = SMTAIServerApp.convStrToTimeStep(sTimeStep);
|
if(aTimeStep == null)
|
throw new Exception("can't parse time_step : " + sTimeStep);
|
|
_timeStepUnit = aTimeStep[0];
|
_timeStepValue = aTimeStep[1];
|
|
Element xmlNameSQL = (Element)xmlTimeBucket.selectSingleNode("NAME_SQL");
|
String sNameSQLRef = SMTStatic.getXmlAttr(xmlNameSQL, "ref", null);
|
if(!SMTStatic.isNullOrEmpty(sNameSQLRef))
|
xmlNameSQL = (Element) xmlNameSQL.getDocument().selectSingleNode("ROOT/" + sNameSQLRef);
|
_onameFields = SMTStatic.getXmlAttr(xmlNameSQL, "oname_fields").toUpperCase().split(",");
|
_titleField = SMTStatic.getXmlAttr(xmlNameSQL, "title_fields").toUpperCase();
|
_sqlxmlMeticName = new SMTMetricSqlXml(xmlNameSQL);
|
|
Element xmlValueSQL = (Element)xmlTimeBucket.selectSingleNode("VALUE_SQL");
|
String sValueSQLRef = SMTStatic.getXmlAttr(xmlValueSQL, "ref", null);
|
if(!SMTStatic.isNullOrEmpty(sValueSQLRef))
|
xmlValueSQL = (Element) xmlNameSQL.getDocument().selectSingleNode("ROOT/" + sValueSQLRef);
|
_timeField = SMTStatic.getXmlAttr(xmlValueSQL, "time_field").toUpperCase();
|
_groupField = SMTStatic.getXmlAttr(xmlValueSQL, "group_field").toUpperCase();
|
_groupType = SMTStatic.getXmlAttr(xmlValueSQL, "group_type").toUpperCase().charAt(0);
|
_sqlxmlMeticValue = new SMTMetricSqlXml(xmlValueSQL);
|
|
}
|
|
public int compareTimeStep(int unit, int value)
|
{
|
int subUnit = _timeStepUnit - unit;
|
if(subUnit != 0)
|
return subUnit;
|
|
int subValue = _timeStepValue - value;
|
if(subValue == 0)
|
return 0;
|
if(subValue < 0)
|
return -1;
|
|
return 1;
|
}
|
}
|
|
///////////////////////////////////////////////////////////////////////////////////////
|
private List<SMTDuckTimeGroupName> _listGroupNameInfo = new ArrayList<>();
|
private List<TimeBucketTable> _listTimeBucketTable = new ArrayList<>();
|
|
public boolean isChartValues()
|
{
|
return true;
|
}
|
|
@Override
|
protected void initInstanceByDoc(DBRecord rec, Document doc) throws Exception
|
{
|
super.initInstanceByDoc(rec, doc);
|
|
// 读取分组信息
|
for(Node nodeGroupName : doc.selectNodes("ROOT/GROUP_NAMES/GROUP_NAME"))
|
{
|
SMTDuckTimeGroupName groupNameInfo = new SMTDuckTimeGroupName((Element)nodeGroupName);
|
_listGroupNameInfo.add(groupNameInfo);
|
}
|
|
// 将时序表
|
for(Node nodeTimeBucket : doc.selectNodes("ROOT/TIME_BUCKET"))
|
{
|
TimeBucketTable timeBucketTable = new TimeBucketTable((Element)nodeTimeBucket);
|
for(int i = 0; i < _listTimeBucketTable.size(); i ++)
|
{
|
TimeBucketTable curTable = _listTimeBucketTable.get(i);
|
|
int comp = curTable.compareTimeStep(timeBucketTable._timeStepUnit, timeBucketTable._timeStepValue);
|
|
if(comp == 0)
|
{
|
throw new Exception("time bucket existed at : " + SMTStatic.toInt(i));
|
}
|
else if(comp < 0)
|
{
|
_listTimeBucketTable.add(i, timeBucketTable);
|
timeBucketTable = null;
|
break;
|
}
|
}
|
if(timeBucketTable != null)
|
_listTimeBucketTable.add(timeBucketTable);
|
}
|
|
if(_listTimeBucketTable.size() == 0)
|
throw new Exception("time bucket is empty : " + this._id + " : " + this._title);
|
|
}
|
|
private SMTJavaAIError parseSQLValueFilter(Json jsonCondList, String fieldName, char valueType, StringBuilder r_SQL) throws Exception
|
{
|
SMTJavaAIError error = null;
|
|
if(!jsonCondList.isArray())
|
return null;
|
|
List<Json> listCond = jsonCondList.asJsonList();
|
if(listCond.size() == 0)
|
return null;
|
|
Json jsonCond = listCond.get(0);
|
|
// 解析[第一个表达式] [逻辑符号] [第二个表达式]
|
if(jsonCond.isArray())
|
{
|
r_SQL.append("(");
|
if((error = parseSQLValueFilter(jsonCond, fieldName, valueType, r_SQL)) != null)
|
return error;
|
r_SQL.append(")");
|
|
if(listCond.size() > 1)
|
{
|
r_SQL.append(listCond.get(1).asString());
|
|
r_SQL.append("(");
|
if((error = parseSQLValueFilter(jsonCond, fieldName, valueType, r_SQL)) != null)
|
return error;
|
r_SQL.append(")");
|
}
|
|
}
|
// 解析[操作符号] [值]
|
else if(jsonCond.isString())
|
{
|
r_SQL.append(fieldName);
|
r_SQL.append(" " + jsonCond.asString() + " ");
|
|
String value = listCond.get(1).asString();
|
if(valueType == 'S')
|
{
|
r_SQL.append("'" + value.replace("'", "''") + "'");
|
}
|
else if(valueType == 'T')
|
{
|
r_SQL.append("'" + SMTStatic.toString(SMTStatic.toDate(value)) + "'");
|
}
|
else
|
{
|
r_SQL.append(SMTStatic.toString(SMTStatic.toDouble(value)));
|
}
|
}
|
else
|
throw new Exception("jsonCond is not array or value:" + jsonCond.toString());
|
|
return null;
|
|
}
|
|
// 东南厂近五天出厂每两日出厂流量最大值
|
// 五一广场按照生产厂家分组且查询XXX厂生产的的昨日压力范围在20-30的明细
|
// 查询五一广场前三天内每天的压力平均值
|
// 五一广场和光明路昨日压力每小时为单位的压力最大值
|
// 按照设备类型分组获取出厂日期为2001-01-01的昨日压力明细
|
// 统计压力指标关联设备个数
|
|
@Override
|
public SMTJavaAIError queryMetrics(String jsonPath, ASTDBMap dbMap, Json jsonAST, Map<String, String> extArg, SMTAIServerRequest tranReq, DuckResult r_result) throws Exception
|
{
|
SMTJavaAIError error = this.executeQueryMetrics(jsonPath, dbMap, jsonAST, extArg, tranReq, r_result);
|
if(error != null)
|
{
|
appendSampleQuestionToResponse(jsonAST, tranReq);
|
return error;
|
}
|
|
return null;
|
}
|
|
protected int getLastDimPosByFilter(Json jsonAST, List<SMTDuckTimeGroupName> listGroupNameInfo)
|
{
|
int lastPos = -1;
|
|
for(String key : jsonAST.asJsonMap().keySet())
|
{
|
SMTSQLXMLDimDef dimDef = _mapId2SqlDimDef.get(key);
|
if(dimDef == null)
|
continue;
|
|
String dimName = dimDef._dimDef.getId();
|
|
for(int i = 0; i < listGroupNameInfo.size(); i ++)
|
{
|
SMTDuckTimeGroupName groupNameInfo = listGroupNameInfo.get(i);
|
if(groupNameInfo._idCol.equalsIgnoreCase(dimName))
|
{
|
if(lastPos < i)
|
lastPos = i;
|
}
|
}
|
}
|
return lastPos;
|
}
|
|
public SMTJavaAIError executeQueryMetrics(String jsonPath, ASTDBMap dbMap, Json jsonAST, Map<String, String> extArg, SMTAIServerRequest tranReq, DuckResult r_result) throws Exception
|
{
|
tranReq.sendChunkedBlock("begin", "分析要查询指标\"" + this._title + "\"的查询条件...");
|
|
|
SMTJavaAIError error = null;
|
TimeBucketTable timeBucket = null;
|
|
// 获取步长时间
|
String stepTime = jsonAST.safeGetStr("step_time", null);
|
|
// 如果步长时间不为空,则采用最接近步长时间间隔的值
|
int[] timeStep = null;
|
if(!SMTStatic.isNullOrEmpty(stepTime))
|
{
|
timeStep = SMTAIServerApp.convStrToTimeStep(stepTime);
|
|
if(timeStep != null)
|
{
|
for(TimeBucketTable curTimeBucket : this._listTimeBucketTable)
|
{
|
int comp = curTimeBucket.compareTimeStep(timeStep[0], timeStep[1]);
|
if(comp <= 0)
|
{
|
|
timeBucket = curTimeBucket;
|
break;
|
}
|
}
|
}
|
|
}
|
|
// 如果步长时间为空,则采用最小时间间隔的值
|
if(timeBucket == null)
|
{
|
timeBucket = this._listTimeBucketTable.get(this._listTimeBucketTable.size() - 1);
|
}
|
|
String chartType = timeBucket._chartType;
|
if(timeStep != null && SMTStatic.isNullOrEmpty(chartType))
|
{
|
if(timeStep[0] == 1 || timeStep[1] >= 24 * 60)
|
chartType = "bar";
|
}
|
|
String startTime = jsonAST.safeGetStr("start_time", null);
|
String endTime = jsonAST.safeGetStr("end_time", null);
|
boolean hasTimeValue = !SMTStatic.isNullOrEmpty(startTime) && !SMTStatic.isNullOrEmpty(endTime);
|
|
if(!hasTimeValue)
|
{
|
String[] timeRange = new String[2];
|
if((error = this.promptUserForTimeRange("请输入时间范围", tranReq, timeRange)) != null)
|
return error;
|
|
hasTimeValue = true;
|
jsonAST.set("start_time", timeRange[0]);
|
jsonAST.set("end_time", timeRange[1]);
|
startTime = timeRange[0];
|
endTime = timeRange[1];
|
}
|
|
// 如果存在查询名称的sqlxml,则首先查询名称
|
Map<String, Object> mapId2SqlArg = new HashMap<>();
|
|
// 解析要分组的维度列表
|
Map<String, SMTSQLXMLDimDef> mapIdDimDef = new LinkedHashMap<>();
|
if((error = parseDimListFromJson(jsonAST, tranReq, mapIdDimDef)) != null)
|
return error;
|
|
StringBuilder sbDIM_NAME_GROUP = new StringBuilder();
|
for(String name : mapIdDimDef.keySet())
|
{
|
sbDIM_NAME_GROUP.append(",");
|
sbDIM_NAME_GROUP.append(name);
|
}
|
|
// 解析过滤条件
|
Map<String, String> mapName2EqDimValue = new HashMap<>();
|
StringBuilder sbDIM_NAME_FILTERS = new StringBuilder();
|
if((error = parseSQLFilterFromJson(dbMap, jsonAST, timeBucket._simSqlPerfix, tranReq, sbDIM_NAME_FILTERS, mapName2EqDimValue)) != null)
|
return error;
|
|
// 查询最后一个维度
|
int lastDimPos = getLastDimPosByFilter(jsonAST, _listGroupNameInfo);
|
|
// 设置sql参数
|
mapId2SqlArg.put("__METRICS_ID__", this.getId());
|
mapId2SqlArg.put("__DIM_NAME_FILTERS__", sbDIM_NAME_FILTERS.toString());
|
mapId2SqlArg.put("__DIM_NAME_GROUP__", sbDIM_NAME_GROUP.toString());
|
mapId2SqlArg.put("__AGG_DIM_GROUP_NAMES__", _listGroupNameInfo);
|
mapId2SqlArg.put("__AGG_DIM_LAST_POS__", lastDimPos);
|
|
// 生成SQL
|
SQLXMLQuery queryONAME = new SQLXMLQuery();
|
if((error = timeBucket._sqlxmlMeticName.parseSQL(dbMap, jsonAST, mapId2SqlArg, tranReq, queryONAME)) != null)
|
return error;
|
|
// 查询结果集
|
tranReq.traceLLMDebug(queryONAME.getSqlLog());
|
|
// 输出查询条件
|
StringBuilder sbQueryProcess = new StringBuilder();
|
sbQueryProcess.append("查询从" + startTime + "到" + endTime);
|
if(sbDIM_NAME_FILTERS.length() > 0)
|
sbQueryProcess.append("的查询条件为:[" + sbDIM_NAME_FILTERS.toString() + "]");
|
if(!SMTStatic.isNullOrEmpty(stepTime))
|
sbQueryProcess.append("的按照" + stepTime + " 为步长");
|
|
sbQueryProcess.append("的数据...");
|
tranReq.sendChunkedBlock("begin", sbQueryProcess.toString());
|
|
// 查询指标名称记录
|
DBRecords recsONAME = queryONAME._db.querySQL(queryONAME._sbSQLText.toString(), queryONAME._sqlParams.toArray(new Object[queryONAME._sqlParams.size()]));
|
if(recsONAME.getRowCount() == 0)
|
return new SMTJavaAIError("当前设备未匹配指标,请检查设备名是否拼写错误。");
|
|
recsONAME.limitRecord(1);
|
|
// 生成指标值记录
|
for(DBRecord recONAME : recsONAME.getRecords())
|
{
|
DuckCubeRecs astRS = new DuckCubeRecs();
|
r_result._listRecordset.add(astRS);
|
|
if(hasTimeValue)
|
{
|
astRS._timeRange = new ASTTimeRange();
|
astRS._timeRange._startTime = SMTStatic.toDate(startTime);
|
astRS._timeRange._pathStartTime = jsonPath + "start_time";
|
astRS._timeRange._endTime = SMTStatic.toDate(endTime);
|
astRS._timeRange._pathEndTime = jsonPath + "end_time";
|
astRS._timeRange._timeField = timeBucket._timeField;
|
astRS._timeRange._valueField = timeBucket._groupField;
|
}
|
|
astRS._chartType = chartType;
|
astRS._title = timeBucket._timeStepTitle + this._title;
|
astRS._chartTitle = recONAME.getString(timeBucket._titleField);
|
astRS._chartUnit = this._unit;
|
astRS._listGroupNameInfo = this._listGroupNameInfo;
|
|
// 将查询延时数据的值放入参数映射表
|
for(String onameField : timeBucket._onameFields)
|
{
|
if(recONAME.getColIndex(onameField) < 0)
|
continue;
|
mapId2SqlArg.put(onameField, recONAME.getString(onameField));
|
}
|
|
// 获取value的过滤条件
|
StringBuilder sbSQLFilterValue = new StringBuilder();
|
Json jsonFilterValue = jsonAST.safeGetJson("value");
|
if(jsonFilterValue != null)
|
{
|
if((error = parseSQLValueFilter(jsonFilterValue, timeBucket._groupField, timeBucket._groupType, sbSQLFilterValue)) != null)
|
return error;
|
|
if(sbSQLFilterValue.length() > 0)
|
mapId2SqlArg.put("__VALUE_FILTER__", sbSQLFilterValue.toString());
|
}
|
|
// 生成SQL
|
SQLXMLQuery queryValue = new SQLXMLQuery();
|
if((error = timeBucket._sqlxmlMeticValue.parseSQL(dbMap, jsonAST, mapId2SqlArg, tranReq, queryValue)) != null)
|
return error;
|
|
// 如果不包含步长,却包含聚合操作,则直接对数据库做聚合操作
|
String realSQL = queryValue._sbSQLText.toString();
|
if(SMTStatic.isNullOrEmpty(stepTime) && jsonAST.has("step_op_key"))
|
{
|
String stepOper = jsonAST.safeGetStr("step_op_key", "AVG");
|
String stepTitle = jsonAST.safeGetStr("step_op_title", "平均");
|
|
realSQL = "SELECT " + stepOper + "(" + timeBucket._groupField + ") AS __CNT__ FROM (" + realSQL + ") T";
|
|
astRS._title = astRS._title + "的" + stepTitle;
|
astRS._recsType = DuckCubeRecs.DuckCubeRecsType.SUMMARY;
|
astRS._mapCol2Title.put("__CNT__", new DuckCubeColTitle(stepTitle, true, "COUNT".equals(stepOper)));
|
}
|
|
// 查询结果集
|
tranReq.traceLLMDebug(queryValue.getSqlLog());
|
DuckDBAppender[] appender = new DuckDBAppender[1];
|
try
|
{
|
queryValue._db.querySQLNotify(realSQL, queryValue._sqlParams.toArray(new Object[] {queryValue._sqlParams.size()}), new SMTDatabase.DBQueryNotifyMeta()
|
{
|
@Override
|
public boolean onMetaInfo(DBRecords metaInfo, String[] colTypes) throws Exception
|
{
|
astRS._tableName = "t_" + SMTStatic.newUUID();
|
SMTDatabase dbResult = tranReq.createResultTable(astRS._tableName, metaInfo, colTypes);
|
appender[0] = ((DuckDBConnection)dbResult.getConnection()).createAppender(DuckDBConnection.DEFAULT_SCHEMA, astRS._tableName);
|
return true;
|
}
|
|
@Override
|
public boolean onNextRecord(DBRecord rec) throws Exception
|
{
|
Object[] values = rec.getValues();
|
|
appender[0].beginRow();
|
for(Object value : values)
|
{
|
if(value == null)
|
appender[0].append(null);
|
else
|
appender[0].append(SMTStatic.toString(value));
|
}
|
|
appender[0].endRow();
|
return true;
|
}
|
});
|
}
|
finally
|
{
|
if(appender[0] != null)
|
{
|
appender[0].close();
|
appender[0] = null;
|
}
|
}
|
|
// 如果包含步长,则对步长做操作
|
if(!SMTStatic.isNullOrEmpty(stepTime))
|
{
|
String stepOper = jsonAST.safeGetStr("step_op_key", "AVG");
|
String stepTitle = jsonAST.safeGetStr("step_op_title", "平均");
|
String stepTimePath = jsonAST.safeGetStr("step_time_path", null);
|
|
astRS._timeRange._timeStep = SMTAIServerApp.convTimeStepToUnitStr(timeStep[0], timeStep[1]);
|
astRS._timeRange._pathTimeSteps = SMTStatic.isNullOrEmpty(stepTimePath) ?
|
new String[] { jsonPath + "step_time"} : new String[] { jsonPath + "step_time", stepTimePath};
|
|
tranReq.timeBucketTable(astRS._tableName, "OTIME", stepOper + "(" + timeBucket._groupField + ")" , timeBucket._groupField, timeStep[0], timeStep[1]);
|
astRS._title = astRS._title + "每" + SMTAIServerApp.convTimeStepToStr(timeStep[0], timeStep[1]) + "的" + stepTitle;
|
}
|
|
|
}
|
|
return error;
|
}
|
|
}
|