using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using IStation.IDataDockingQueue;

namespace IStation.DataDockingQueue
{
    /// <summary>
    /// RabbitMQ queue helper: publishes JSON-serialized messages to a queue and
    /// consumes them with manual acknowledgement.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the type name T and the bare Func used below have no visible
    /// type-parameter declaration in this file; generic arguments look like they were
    /// lost (on the class/interface or on the methods). The signatures are kept exactly
    /// as found - confirm against IHandleHelper before building.
    /// </remarks>
    public class HandleHelper : IHandleHelper
    {
        // Connection factory shared by every instance, configured from ConfigHelper.
        private static readonly ConnectionFactory Factory = new ConnectionFactory()
        {
            HostName = ConfigHelper.RabbitMq_HostName,
            UserName = ConfigHelper.RabbitMq_UserName,
            Password = ConfigHelper.RabbitMq_Password
        };

        private IConnection _conn_receive = null; // connection owned by the consumer side
        private IModel _channel_receive = null;   // channel owned by the consumer side

        /// <summary>
        /// Pushes one item onto the queue derived from <paramref name="flag"/>.
        /// </summary>
        /// <param name="flag">Value passed to QueueHelper.GetQueueName to resolve the queue name.</param>
        /// <param name="t">Item to publish; it is serialized to JSON and sent as UTF-8 bytes.</param>
        /// <returns>
        /// true when the publish call completed without throwing; false when
        /// <paramref name="t"/> is null or any exception occurred (the exception is logged).
        /// </returns>
        public bool Push(string flag, T t)
        {
            if (t == null)
            {
                return false;
            }

            try
            {
                var queueName = QueueHelper.GetQueueName(flag);

                // A connection and channel are opened per call and released by the using blocks.
                using (var con = Factory.CreateConnection())
                {
                    using (var channel = con.CreateModel())
                    {
                        // durable: true, exclusive: false, autoDelete: false, no extra arguments
                        channel.QueueDeclare(queueName, true, false, false, null);

                        var properties = channel.CreateBasicProperties();
                        properties.Persistent = true; // mark the message as persistent

                        var message = JsonHelper.Object2Json(t);
                        var body = Encoding.UTF8.GetBytes(message);

                        // Empty exchange name: routed through the default exchange by queue name.
                        channel.BasicPublish("", queueName, properties, body);
                    }
                }

                return true;
            }
            catch (Exception ex)
            {
                LogHelper.Error(ex.Message, ex);
                return false;
            }
        }

        /// <summary>
        /// Starts consuming the queue derived from <paramref name="flag"/>, handing each
        /// decoded message to <paramref name="func"/>.
        /// </summary>
        /// <param name="flag">Value passed to QueueHelper.GetQueueName to resolve the queue name.</param>
        /// <param name="func">
        /// Callback invoked per message. A true result acknowledges the message; a false
        /// result, or an exception while decoding or handling, negatively acknowledges it
        /// with requeue.
        /// </param>
        /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
        /// <remarks>
        /// Does nothing when this instance already holds a receive connection or channel.
        /// If setup fails, the partially created connection/channel is released and the
        /// original exception is rethrown, so a later call can try again.
        /// </remarks>
        public void Receive(string flag, Func func)
        {
            if (func == null)
            {
                throw new ArgumentNullException(nameof(func));
            }

            if (_conn_receive != null || _channel_receive != null)
            {
                return;
            }

            var queueName = QueueHelper.GetQueueName(flag);

            try
            {
                _conn_receive = Factory.CreateConnection();
                _channel_receive = _conn_receive.CreateModel();

                // The handler captures this local instead of the field, so Close() clearing
                // the field cannot cause a null dereference for a delivery still in flight.
                var channel = _channel_receive;

                // durable: true, exclusive: false, autoDelete: false, no extra arguments
                channel.QueueDeclare(queueName, true, false, false, null);

                // prefetchSize 0, prefetchCount 1, per-consumer: one unacknowledged message at a time.
                channel.BasicQos(0, 1, false);

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    bool handled;
                    try
                    {
                        byte[] body = ea.Body.ToArray();
                        var message = Encoding.UTF8.GetString(body);
                        var data = JsonHelper.Json2Object(message);
                        handled = func(data);
                    }
                    catch (Exception ex)
                    {
                        // Without this the delivery would stay unacknowledged and, with a
                        // prefetch count of 1, no further message would ever be delivered.
                        LogHelper.Error(ex.Message, ex);
                        handled = false;
                    }

                    if (handled)
                    {
                        channel.BasicAck(ea.DeliveryTag, false);
                    }
                    else
                    {
                        // multiple: false, requeue: true - the message goes back on the queue.
                        channel.BasicNack(ea.DeliveryTag, false, true);
                    }
                };

                // autoAck: false - messages are acknowledged manually in the handler above.
                channel.BasicConsume(queueName, false, consumer);
            }
            catch
            {
                // Roll back so a failed start does not leave a dangling connection behind
                // that would make every later Receive call return immediately.
                try
                {
                    Close();
                }
                catch (Exception closeEx)
                {
                    LogHelper.Error(closeEx.Message, closeEx);
                }

                throw;
            }
        }

        /// <summary>
        /// Closes and releases the receive channel and connection, if any. After this call
        /// <see cref="Receive"/> may be invoked again on the same instance.
        /// </summary>
        public void Close()
        {
            var channel = _channel_receive;
            var connection = _conn_receive;

            // Clear the fields first so the instance is reusable even if closing throws.
            _channel_receive = null;
            _conn_receive = null;

            try
            {
                if (channel != null)
                {
                    if (channel.IsOpen)
                    {
                        channel.Close();
                    }

                    channel.Dispose();
                }
            }
            finally
            {
                // Runs even when closing the channel throws, so the connection is not leaked.
                if (connection != null)
                {
                    if (connection.IsOpen)
                    {
                        connection.Close();
                    }

                    connection.Dispose();
                }
            }
        }
    }
}