using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace IStation.Server { /// /// 监测任务辅助类 /// public class MonitorJobHelper : IJobHelper { private readonly RabbitMqQueueHelper _queueHelper = new RabbitMqQueueHelper(); /// /// 开始任务 /// public void StartJob() { _queueHelper.Receive(ConfigHelper.QueueName, (data) => { try { if (data == null) { LogHelper.Error("监测数据存储服务中,数据序列化失败,自动跳过当前数据!"); return true; } if (data.Records == null || data.Records.Count < 1) { LogHelper.Error($"监测数据存储服务中,客户标识:{data.CorpID},数据记录为空,自动跳过当前数据!"); return true; } #region 写日志 if (ConfigHelper.IsWriteLog) { LogHelper.Info($"监测数据存储服务中,客户标识:{data.CorpID},数据数量-{data.Records.Count}!"); } #endregion var result = new Service.MonitorRealRecord().Insert(data.CorpID, data.Records); if (!result) { LogHelper.Error($"监测数据存储服务中,客户标识:{data.CorpID},当前通道处于阻塞状态 !"); } else { var flow_record = new Model.MonitorStoreFlowRecord(); flow_record.CorpID = data.CorpID; flow_record.StoreTime = DateTime.Now; flow_record.StoreCount = data.Records.Count; flow_record.StoreContent = data.Records.Select(x => x.MonitorPointID).Distinct().ToList(); new RedisCache.MonitorStoreFlowRecordCacheHelper().SetLastRecord(flow_record); } return result; } catch (Exception ex) { LogHelper.Error("监测数据存储服务中,数据消息队列出错,自动跳过该条数据", ex); return true; } }); } /// /// 取消任务 /// public void CancelJob() { if (_queueHelper == null) return; _queueHelper.Close(); } } }