using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using IStation.IDataDockingQueue;
namespace IStation.DataDockingQueue
{
/// <summary>
/// RabbitMQ queue helper: publishes JSON-serialized messages to a queue and consumes
/// them through a single receive connection/channel owned by this instance.
/// </summary>
/// <remarks>
/// NOTE(review): <c>T</c> and <c>Func</c> appear without generic type arguments in this
/// file as reviewed; the declarations are kept exactly as found - confirm against
/// <c>IHandleHelper</c>.
/// </remarks>
public class HandleHelper : IHandleHelper
{
    // Connection factory shared by all instances; host and credentials come from ConfigHelper.
    private static readonly ConnectionFactory Factory = new ConnectionFactory()
    {
        HostName = ConfigHelper.RabbitMq_HostName,
        UserName = ConfigHelper.RabbitMq_UserName,
        Password = ConfigHelper.RabbitMq_Password
    };

    private IConnection _conn_receive = null; // connection used for receiving
    private IModel _channel_receive = null;   // channel used for receiving

    /// <summary>
    /// Pushes one item onto the queue resolved from <paramref name="flag"/>.
    /// </summary>
    /// <param name="flag">Key passed to <c>QueueHelper.GetQueueName</c> to resolve the queue name.</param>
    /// <param name="t">Item to publish; it is serialized to JSON and encoded as UTF-8.</param>
    /// <returns>
    /// <c>true</c> when the message was published; <c>false</c> when <paramref name="t"/> is
    /// null or any exception occurred (the exception is logged, not rethrown).
    /// </returns>
    public bool Push(string flag, T t)
    {
        if (t == null)
        {
            return false;
        }

        try
        {
            var queueName = QueueHelper.GetQueueName(flag);

            // A short-lived connection and channel are opened for every push.
            using (var con = Factory.CreateConnection())
            using (var channel = con.CreateModel())
            {
                // durable: true, exclusive: false, autoDelete: false, no extra arguments.
                channel.QueueDeclare(queueName, true, false, false, null);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true; // mark the message as persistent

                var message = JsonHelper.Object2Json(t);
                var body = Encoding.UTF8.GetBytes(message);

                // Publish through the default exchange, routed by queue name.
                channel.BasicPublish("", queueName, properties, body);
            }

            return true;
        }
        catch (Exception ex)
        {
            LogHelper.Error(ex.Message, ex);
            return false;
        }
    }

    /// <summary>
    /// Starts consuming the queue resolved from <paramref name="flag"/>. Does nothing when a
    /// receive connection or channel is already held by this instance.
    /// </summary>
    /// <param name="flag">Key passed to <c>QueueHelper.GetQueueName</c> to resolve the queue name.</param>
    /// <param name="func">
    /// Handler invoked with each deserialized message. A <c>true</c> result acknowledges the
    /// message; a <c>false</c> result, or an exception, negatively acknowledges it with requeue.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public void Receive(string flag, Func func)
    {
        if (func == null)
        {
            throw new ArgumentNullException(nameof(func));
        }

        if (_conn_receive != null || _channel_receive != null)
        {
            return;
        }

        var queueName = QueueHelper.GetQueueName(flag);
        try
        {
            _conn_receive = Factory.CreateConnection();
            _channel_receive = _conn_receive.CreateModel();

            // Capture the channel locally so the handler keeps using the channel the delivery
            // arrived on, even after Close() has cleared the field.
            var channel = _channel_receive;

            // durable: true, exclusive: false, autoDelete: false, no extra arguments.
            channel.QueueDeclare(queueName, true, false, false, null);

            // Prefetch one unacknowledged message at a time (fair dispatch).
            channel.BasicQos(0, 1, false);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                bool handled;
                try
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    var data = JsonHelper.Json2Object(message);
                    handled = func(data);
                }
                catch (Exception ex)
                {
                    // Without this the delivery would stay unacknowledged and, with a prefetch
                    // count of 1, no further messages would be delivered to this consumer.
                    LogHelper.Error(ex.Message, ex);
                    handled = false;
                }

                if (handled)
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
                else
                {
                    // multiple: false, requeue: true - put the message back on the queue.
                    channel.BasicNack(ea.DeliveryTag, false, true);
                }
            };

            // autoAck: false - acknowledgements are sent explicitly by the handler above.
            channel.BasicConsume(queueName, false, consumer);
        }
        catch
        {
            // Setup failed part-way: release whatever was created so a later Receive() call can
            // retry instead of returning early on the stale fields.
            try
            {
                Close();
            }
            catch (Exception closeEx)
            {
                LogHelper.Error(closeEx.Message, closeEx);
            }

            throw;
        }
    }

    /// <summary>
    /// Closes and disposes the receive channel and connection, then clears both fields so
    /// that <see cref="Receive"/> can be called again.
    /// </summary>
    public void Close()
    {
        var channel = _channel_receive;
        var connection = _conn_receive;

        // Clear the fields first so the instance is reusable even if closing throws.
        _channel_receive = null;
        _conn_receive = null;

        try
        {
            if (channel != null)
            {
                if (channel.IsOpen)
                {
                    channel.Close();
                }

                channel.Dispose();
            }
        }
        finally
        {
            // Always attempt to release the connection, even when closing the channel failed.
            if (connection != null)
            {
                if (connection.IsOpen)
                {
                    connection.Close();
                }

                connection.Dispose();
            }
        }
    }
}
}