package com.smtscript.lib.kettle; import java.io.ByteArrayInputStream; import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map.Entry; import org.dom4j.Document; import org.dom4j.Node; import org.dom4j.io.SAXReader; import org.mozilla.javascript.NativeObject; import com.smtscript.lib.JSComment; import com.smtscript.utils.HttpClient; import com.smtscript.utils.SMTStatic; public class ScriptKettleCarte { private boolean _jobRunning; private String _jobState; private String _baseUrl; private String _authHead; private String _repoName; private String _jobId; private HttpClient _web; public ScriptKettleCarte(NativeObject nvConfig) throws Exception { _web = new HttpClient(); _jobRunning = false; _baseUrl = (String) SMTStatic.getJSValue(nvConfig, "base_url"); _repoName = (String) SMTStatic.getJSValue(nvConfig, "repo_name"); String user = (String) SMTStatic.getJSValue(nvConfig, "user"); String pass = (String) SMTStatic.getJSValue(nvConfig, "pass"); if(!SMTStatic.isNullOrEmpty(user) && !SMTStatic.isNullOrEmpty(pass)) { _authHead = "Basic " + Base64.getEncoder().encodeToString(String.format("%s:%s", user, pass).getBytes()); } } @JSComment( "" + "@param nvConfig \n" + " job_name - String : job name\n" + " wait_end - Boolean(true): Wait for job end\n" + " params - Map : params for job\n" + "" ) public boolean executeJob(NativeObject nvConfig) throws Exception { if(_jobRunning) throw new Exception("current job is running"); // 初始化基本参数 Boolean waitEnd = (Boolean)SMTStatic.getJSValue(nvConfig, "wait_end", true); String jobName = (String) SMTStatic.getJSValue(nvConfig, "job_name"); List listParams = new ArrayList(); listParams.add("xml"); listParams.add("Y"); listParams.add("rep"); listParams.add(_repoName); listParams.add("job"); listParams.add(jobName); // 初始化自定义参数 NativeObject nvParams = (NativeObject) SMTStatic.getJSValue(nvConfig, "params", null); if(nvParams != null) { for(Entry entry : nvParams.entrySet()) { if(entry.getValue() == null) throw new Exception("kettlen param is not define : " + entry.getKey()); listParams.add(entry.getKey().toString()); listParams.add(entry.getValue().toString()); } } // 查询服务器 Document doc = queryHttpXml("executeJob", listParams.toArray(new String[listParams.size()])); if(doc == null) { return false; } _jobId = doc.selectSingleNode("webresult/id").getText(); _jobRunning = true; if(waitEnd) { waitForJobEnd(-1); return isJobSucceed(); } return false; } public boolean isJobSucceed() throws Exception { if(_jobRunning) throw new Exception("current job is running"); return "Finished".equals(_jobState); } public boolean waitForJobEnd(int timeout) throws Exception { if(!_jobRunning) throw new Exception("current job is NOT running"); long startTick = System.currentTimeMillis(); while(true) { Document doc = queryHttpXml("jobStatus", new String[]{ "xml", "Y", "id",_jobId, }); if(doc == null) { _jobState = "NULL_XML"; _jobRunning = false; return true; } Node xmlStatus = doc.selectSingleNode("jobstatus/status_desc"); if(xmlStatus == null) { _jobState = "NO_STATUS_DESC : " + doc.getText(); _jobRunning = false; return true; } String status = xmlStatus.getText(); if(status.startsWith("Finished")) { _jobState = status; _jobRunning = false; return true; } if(timeout >= 0) { long curTick = System.currentTimeMillis(); if((curTick - startTick) >= timeout) break; } Thread.sleep(500); } return false; } private Document queryHttpXml(String cmd, String[] params) throws Exception { byte[] xmlBytes = _web.getHttpBytes( _baseUrl + cmd, params, new String[]{"Authorization", _authHead}, "UTF-8", null); ByteArrayInputStream bis = new ByteArrayInputStream(xmlBytes); SAXReader reader = new SAXReader(); Document document = reader.read(bis); return document; } }