using System;
|
using System.Collections.Generic;
|
using System.Linq;
|
using System.Text;
|
using System.Threading.Tasks;
|
using IStation.IStoreQueue;
|
using RabbitMQ.Client;
|
using RabbitMQ.Client.Events;
|
|
namespace IStation.StoreQueue
|
{
|
/// <summary>
|
/// 处理辅助类
|
/// </summary>
|
public class HandleHelper : IHandleHelper
|
{
|
//连接工厂
|
private static ConnectionFactory Factory = new ConnectionFactory()
|
{
|
HostName = ConfigHelper.RabbitMq_HostName,
|
UserName = ConfigHelper.RabbitMq_UserName,
|
Password = ConfigHelper.RabbitMq_Password
|
};
|
|
private IConnection _conn_receive = null;//接收连接对象
|
private IModel _channel_receive = null;//接收会话
|
|
/// <summary>
|
/// 推送
|
/// </summary>
|
public bool Push<T>(string flag, T t)
|
{
|
if (t == null)
|
return default;
|
|
try
|
{
|
var queueName = QueueHelper.GetQueueName(flag);
|
using (var con = Factory.CreateConnection())//创建连接对象
|
{
|
using (var channel = con.CreateModel())
|
{
|
channel.QueueDeclare(queueName, true, false, false, null);//定义通道
|
var properties = channel.CreateBasicProperties();
|
properties.Persistent = true;//数据持久化
|
var message = JsonHelper.Object2Json(t);
|
var body = Encoding.UTF8.GetBytes(message);
|
channel.BasicPublish("", queueName, properties, body);//插入数据
|
}
|
}
|
return true;
|
}
|
catch (Exception ex)
|
{
|
LogHelper.Error(ex.Message, ex);
|
return false;
|
}
|
}
|
|
/// <summary>
|
/// 接收
|
/// </summary>
|
public void Receive<T>(string flag, Func<T, bool> func)
|
{
|
if (_conn_receive != null)
|
return;
|
if (_channel_receive != null)
|
return;
|
var queueName = QueueHelper.GetQueueName(flag);
|
_conn_receive = Factory.CreateConnection();//创建连接对象
|
_channel_receive = _conn_receive.CreateModel();
|
_channel_receive.QueueDeclare(queueName, true, false, false, null);//定义通道
|
_channel_receive.BasicQos(0, 1, false);//公平分发
|
var consumer = new EventingBasicConsumer(_channel_receive);//创建消费者
|
consumer.Received += (model, ea) =>
|
{
|
byte[] body = ea.Body.ToArray();
|
var message = Encoding.UTF8.GetString(body);
|
var data = JsonHelper.Json2Object<T>(message);
|
var result = func(data);
|
if (result)
|
{
|
_channel_receive.BasicAck(ea.DeliveryTag, false);
|
}
|
else
|
{
|
_channel_receive.BasicNack(ea.DeliveryTag, false, true);
|
}
|
};
|
|
_channel_receive.BasicConsume(queueName, false, consumer);//开始消费
|
}
|
|
/// <summary>
|
/// 关闭
|
/// </summary>
|
public void Close()
|
{
|
if (_channel_receive != null)
|
{
|
if (_channel_receive.IsOpen)
|
{
|
_channel_receive.Close();
|
}
|
}
|
if (_conn_receive != null)
|
{
|
if (_conn_receive.IsOpen)
|
{
|
_conn_receive.Close();
|
}
|
}
|
}
|
|
}
|
}
|