package com.smtaiserver.smtaiserver.javaai.jsonflow.node; import java.io.BufferedReader; import java.io.File; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import com.smtaiserver.smtaiserver.core.SMTAIServerApp; import com.smtaiserver.smtaiserver.javaai.SMTJavaAIError; import com.smtaiserver.smtaiserver.javaai.jsonflow.core.SMTJsonFlowExecArg; import com.smtaiserver.smtaiserver.javaai.jsonflow.core.SMTJsonFlowManager; import com.smtaiserver.smtaiserver.javaai.jsonflow.core.SMTJsonFlowNodeOnlyOutput; import com.smtaiserver.smtaiserver.javaai.qwen.SMTQwenApp; import com.smtservlet.util.Json; import com.smtservlet.util.SMTStatic; public class SMTJsonFlowNodePython extends SMTJsonFlowNodeOnlyOutput { private String[] _inputArg; private String _outputArg; private String _code; private static Logger _logger = LogManager.getLogger(SMTQwenApp.class); @Override public void initInstane(SMTJsonFlowManager manager, Json jsonNode) throws Exception { super.initInstane(manager, jsonNode); List jsonGroupParams = jsonNode.getJsonPath("data|group_params|", false).asJsonList(); for(Json jsonParams : jsonGroupParams) { String type = jsonParams.getJsonPath("params|0|type", false).asString(); if("code_input".equals(type)) { String sInputArg = jsonParams.getJsonPath("params|0|value|value", false).asString(); if(!SMTStatic.isNullOrEmpty(sInputArg)) _inputArg = sInputArg.split(","); } else if("code_output".equals(type)) { _outputArg = jsonParams.getJsonPath("params|0|value|value", false).asString(); } else if("code".equals(type)) { _code = jsonParams.getJsonPath("params|0|value", false).asString(); } } } @Override public void afterInstance() throws Exception { super.afterInstance(); if(SMTStatic.isNullOrEmpty(_code)) throw new Exception("script function code is not define"); } @Override public SMTJavaAIError executeFlowNode(SMTJsonFlowExecArg execArg) throws Exception { Json jsonPython = (Json)SMTAIServerApp.getApp().getGlobalConfig("python_env"); String pyPath = jsonPython.getJson("path").asString(); String argText = ""; if(_inputArg != null && _inputArg.length > 0) { Json jsonArg = Json.object(); for(String key : _inputArg) { jsonArg.set(key, execArg._jsonArgs.getJson(key)); } argText = jsonArg.toString(); } String id = SMTStatic.newUUID(); File inputFile = new File(pyPath, "py_i_" + id); File outputFile = new File(pyPath, "py_o_" + id); File executeFile = new File(pyPath, "py_x_" + id); outputFile.delete(); SMTStatic.saveTextFile(inputFile , argText); SMTStatic.saveTextFile(executeFile , _code); try { List params = new ArrayList(); params.add(jsonPython.getJson("exec").asString()); params.add(executeFile.getAbsolutePath()); params.add(inputFile.getAbsolutePath()); params.add(outputFile.getAbsolutePath()); _logger.info("python : start : " + executeFile.getAbsolutePath()); ProcessBuilder pbuilder=new ProcessBuilder(SMTStatic.convProcessArg(params)); pbuilder.directory(new File(pyPath)); pbuilder.redirectErrorStream(true); Process process = pbuilder.start(); BufferedReader bufferedStdout = new BufferedReader(new InputStreamReader(process.getInputStream(), SMTStatic.getStdoutEncode())); execArg._tranReq.sendChunkedBlock("begin_stream", ""); try { int size; char[] sBuf = new char[1024]; while((size = bufferedStdout.read(sBuf)) > 0) { String text = new String(sBuf, 0, size); execArg._tranReq.sendChunkedStreamBlock(text); } } finally { execArg._tranReq.sendChunkedBlock("end_stream", ""); } int exitCode = process.waitFor(); _logger.info("python : end : " + exitCode + " : " + executeFile.getAbsolutePath()); if(!SMTStatic.isNullOrEmpty(_outputArg)) { if(outputFile.exists()) { String text = SMTStatic.readAllText(outputFile); execArg._jsonArgs.set(_outputArg, text); } else { execArg._jsonArgs.set(_outputArg, ""); } } } finally { inputFile.delete(); outputFile.delete(); executeFile.delete(); } return super.executeFlowNode(execArg); } }