using IStation.DataDockingMqtt;
|
using MQTTnet.Client.Connecting;
|
using MQTTnet.Client.Options;
|
using MQTTnet.Client.Receiving;
|
using MQTTnet.Extensions.ManagedClient;
|
using MQTTnet;
|
using System;
|
using System.Collections.Generic;
|
using System.Linq;
|
using System.Text;
|
using System.Threading.Tasks;
|
|
namespace IStation.DataDockingMqtt4SQI_DEMO.sub
|
{
|
public static class Mqtt2
|
{
|
// MQTT topic that Connect() subscribes to.
private static readonly string TopicName = "tkg07342A3521633000/msg/up";

// Credentials handed to the broker by Connect().
// NOTE(review): credentials are hard-coded in source; consider loading them from configuration.
private static readonly string MqttUserName = "test";
private static readonly string MqttUserPwd = "123456";

// Broker host and TCP port used by Connect(). The original comment also listed 172.19.0.218 as a host.
private static readonly string MqttHostAdress = "106.14.83.94";
private static readonly int MqttHostPort = 8810;
|
|
/// <summary>
/// Creates a managed MQTT client, attaches the connection and message handlers, starts it
/// against the configured broker and subscribes to <c>TopicName</c>.
/// </summary>
/// <remarks>
/// The method stays <c>async void</c> so existing callers keep compiling. Because a caller
/// cannot observe exceptions from an <c>async void</c> method, failures while building the
/// options, starting the client or subscribing are caught and logged here instead of escaping.
/// </remarks>
public static async void Connect()
{
    IManagedMqttClient mqttClient = new MqttFactory().CreateManagedMqttClient();

    mqttClient.ConnectingFailedHandler = new ClientConnectingFaildHandler(e =>
    {
        Console.WriteLine("链接 MQTT2 服务器失败!");
    });
    mqttClient.UseDisconnectedHandler(ee =>
    {
        Console.WriteLine("链接 MQTT2 服务器断开连接!");
    });
    // Only one connected handler can be active. The original code also assigned
    // ConnectedHandler directly (with a different message) before this call; in MQTTnet 3.x
    // UseConnectedHandler overwrites that property, so the earlier handler never ran and
    // has been removed. The message printed on connect is unchanged.
    mqttClient.UseConnectedHandler(ee =>
    {
        Console.WriteLine("链接 MQTT2 连接到服务!");
    });

    mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceived);

    try
    {
        IMqttClientOptions clientOptions = new MqttClientOptionsBuilder()
            .WithClientId("EventechDemo02")
            .WithTcpServer(MqttHostAdress, MqttHostPort)
            .WithCredentials(MqttUserName, MqttUserPwd)
            // false: receive messages queued while offline; the server must have
            // persistent sessions enabled (WithPersistentSessions).
            .WithCleanSession(false)
            // Will message currently disabled:
            //.WithWillMessage(new MqttApplicationMessage { Topic = "ooo", Payload = Encoding.UTF8.GetBytes("下线通知"), Retain = true })
            .WithWillDelayInterval(1)
            .Build();

        IManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithClientOptions(clientOptions)
            .Build();

        // Start the managed client (connects asynchronously).
        await mqttClient.StartAsync(options);

        // Subscribe to the configured topic.
        MqttTopicFilter topicFilter = new MqttTopicFilterBuilder()
            .WithTopic(TopicName)
            .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
            .Build();

        await mqttClient.SubscribeAsync(topicFilter);
    }
    catch (Exception ex)
    {
        // Nothing upstream can catch this, so record it rather than let it escape.
        LogCustomHelper.Error($"MQTT2 启动或订阅失败:{ex}");
    }
}
|
|
|
|
|
/*
SysId assigned to each measured quantity:
1628644701211070464  voltage
1628644747105144832  current
1628644805007511552  power
1628644845088280576  frequency
1628644902701240320  temperature
1628644990471245824  X acceleration
1628645042027630592  Y acceleration
1628645078274805760  Z acceleration
*/
|
|
/// <summary>
/// Handles a message received on the subscribed topic: parses the JSON payload, decodes the
/// hex string in its <c>msg</c> field and hands one record per measured quantity to
/// <c>HandleDataHelper.Add</c>.
/// </summary>
/// <remarks>
/// Byte layout read from the decoded message (24 bytes are required):
/// 0-3 voltage, 4-7 current, 8-11 power, 12-15 frequency (each a 4-byte float, most
/// significant byte first); 16-17 temperature times 10 (signed 16-bit, most significant byte
/// first); 18-19, 20-21, 22-23 X/Y/Z acceleration (see <c>ReadAcceleration</c>).
/// Messages that cannot be parsed or are too short are logged and dropped.
/// </remarks>
/// <param name="e">Event data carrying the received application message.</param>
public static void Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
{
    if (e == null || e.ApplicationMessage == null)
        return;

    // Convert the payload once and reuse it for both the console trace and JSON parsing.
    string payload = e.ApplicationMessage.ConvertPayloadToString();
    Console.WriteLine(">>> 收到消息:" + payload + ",来自客户端" + e.ClientId + ",主题:" + e.ApplicationMessage.Topic);

    var data = JsonHelper.Json2Object<Mqtt2Msg>(payload);
    if (data == null)
    {
        LogCustomHelper.Error("MQTT2 data 为空");
        return;
    }
    if (string.IsNullOrEmpty(data.msg))
    {
        LogCustomHelper.Error("MQTT2 data.msg 为空");
        return;
    }

    byte[] bytes = StringToHexValuve(data.msg, out string err)?.ToArray();
    if (bytes == null)
    {
        // Must be checked before BitConverter.ToString, which throws on null.
        LogCustomHelper.Error($"MQTT2 msg 解析失败:{err}");
        return;
    }
    LogCustomHelper.Info($"MQTT2 msg:{BitConverter.ToString(bytes)}");

    // 4 floats (16 bytes) + temperature (2 bytes) + 3 accelerations (6 bytes).
    const int requiredLength = 24;
    if (bytes.Length < requiredLength)
    {
        LogCustomHelper.Error($"MQTT2 msg 长度不足:{bytes.Length} < {requiredLength}");
        return;
    }

    var list = new List<Model.MonitorDataDockingReceiveRecord>();

    // Template record; each list entry is a copy taken after SysId/SrcValue are set.
    var record = new Model.MonitorDataDockingReceiveRecord();
    record.SrcTime = DateTime.Now;
    record.RecordType = Model.eMonitorType.General;

    // NOTE(review): values are formatted with the current culture's ToString(), as before.

    // Voltage
    record.SysId = 1628644701211070464;
    record.SrcValue = ReadBigEndianSingle(bytes, 0).ToString();
    list.Add(new Model.MonitorDataDockingReceiveRecord(record));

    // Current
    record.SysId = 1628644747105144832;
    record.SrcValue = ReadBigEndianSingle(bytes, 4).ToString();
    list.Add(new Model.MonitorDataDockingReceiveRecord(record));

    // Power
    record.SysId = 1628644805007511552;
    record.SrcValue = ReadBigEndianSingle(bytes, 8).ToString();
    list.Add(new Model.MonitorDataDockingReceiveRecord(record));

    // Frequency
    record.SysId = 1628644845088280576;
    record.SrcValue = ReadBigEndianSingle(bytes, 12).ToString();
    list.Add(new Model.MonitorDataDockingReceiveRecord(record));

    // Temperature, transmitted multiplied by 10 as a signed 16-bit value.
    short temperatureTimes10 = (short)((bytes[16] << 8) | bytes[17]);
    record.SysId = 1628644902701240320;
    record.SrcValue = (temperatureTimes10 / 10.0).ToString();
    list.Add(new Model.MonitorDataDockingReceiveRecord(record));

    // X acceleration
    record.SysId = 1628644990471245824;
    record.SrcValue = ReadAcceleration(bytes, 18).ToString();
    list.Add(new Model.MonitorDataDockingReceiveRecord(record));

    // Y acceleration
    record.SysId = 1628645042027630592;
    record.SrcValue = ReadAcceleration(bytes, 20).ToString();
    list.Add(new Model.MonitorDataDockingReceiveRecord(record));

    // Z acceleration
    record.SysId = 1628645078274805760;
    record.SrcValue = ReadAcceleration(bytes, 22).ToString();
    list.Add(new Model.MonitorDataDockingReceiveRecord(record));

    HandleDataHelper.Add(list);
}

// Reads the 4 bytes at offset, most significant byte first, as an IEEE-754 single.
private static float ReadBigEndianSingle(byte[] buffer, int offset)
{
    var raw = new byte[4];
    Array.Copy(buffer, offset, raw, 0, 4);
    // BitConverter uses the host byte order, so swap only on little-endian hosts.
    if (BitConverter.IsLittleEndian)
        Array.Reverse(raw);
    return Bytes2Double(raw);
}

// Reads the 2-byte acceleration field at offset exactly as the original code did: the two
// bytes are swapped and then combined high-byte-first by bytesToInt2, so the SECOND byte is
// the most significant one and the result is unsigned (0..65535).
// NOTE(review): every other field of this message is decoded most-significant-byte-first and
// temperature is signed; confirm against the device protocol that acceleration really is
// low-byte-first and unsigned.
private static int ReadAcceleration(byte[] buffer, int offset)
{
    return bytesToInt2(new[] { buffer[offset + 1], buffer[offset] });
}
|
|
/// <summary>
/// Parses a string of hexadecimal byte values into bytes.
/// </summary>
/// <remarks>
/// The text may be contiguous ("0A0B0C"), separated by '-' or whitespace ("0A-0B-0C",
/// "0A 0B 0C"), or a mix of both. It is first cut at the separators, then every remaining
/// run is cut into two-character groups (a trailing single character forms its own group).
/// As before, fewer than three groups is treated as unparsable. Unlike before, invalid
/// input is reported through <paramref name="error_info"/> instead of an exception.
/// </remarks>
/// <param name="content">Text containing hexadecimal digits.</param>
/// <param name="error_info">Set to null on success, otherwise to a description of the failure.</param>
/// <returns>The decoded bytes, or null when <paramref name="content"/> cannot be parsed.</returns>
static List<byte> StringToHexValuve(string content, out string error_info)
{
    const string unparsable = "字符无法解析";

    if (string.IsNullOrWhiteSpace(content))
    {
        error_info = unparsable;
        return null;
    }

    char[] separators = { '-', ' ', '\t', '\r', '\n' };
    var hexPairs = new List<string>();
    foreach (string run in content.Split(separators, StringSplitOptions.RemoveEmptyEntries))
    {
        // Split(2) is the chunking extension declared in this class.
        hexPairs.AddRange(run.Split(2));
    }

    if (hexPairs.Count < 3)
    {
        error_info = unparsable;
        return null;
    }

    var result = new List<byte>(hexPairs.Count);
    foreach (string hexPair in hexPairs)
    {
        if (string.IsNullOrWhiteSpace(hexPair))
            continue;

        // Validate first so malformed input yields an error result rather than an exception.
        if (!hexPair.All(Uri.IsHexDigit))
        {
            error_info = $"{unparsable}:{hexPair}";
            return null;
        }

        result.Add(Convert.ToByte(hexPair, 16));
    }

    error_info = null;
    return result;
}
|
|
/// <summary>
/// Cuts <paramref name="str"/> into consecutive pieces of <paramref name="count"/>
/// characters; the last piece is shorter when the length is not a multiple of count.
/// </summary>
/// <param name="str">Text to cut. An empty string yields an empty array.</param>
/// <param name="count">Piece length; must be greater than zero.</param>
/// <returns>The pieces in order.</returns>
/// <exception cref="ArgumentNullException"><paramref name="str"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is zero or negative.</exception>
static string[] Split(this string str, int count)
{
    if (str == null)
        throw new ArgumentNullException(nameof(str));
    // A non-positive count previously went through a floating-point division and an
    // unspecified infinity-to-int conversion; reject it explicitly instead.
    if (count <= 0)
        throw new ArgumentOutOfRangeException(nameof(count), count, "count must be greater than zero.");

    var pieces = new List<string>(str.Length / count + 1);
    for (int start = 0; start < str.Length; start += count)
    {
        pieces.Add(str.Substring(start, Math.Min(count, str.Length - start)));
    }

    return pieces.ToArray();
}
|
|
/// <summary>
/// Combines two bytes of <paramref name="src"/> into an int, high byte first.
/// </summary>
/// <param name="src">Source buffer.</param>
/// <param name="offset">Index of the high byte; the low byte is read from offset + 1.</param>
/// <returns>A value between 0 and 65535.</returns>
public static int bytesToInt2(byte[] src, int offset = 0)
{
    int high = src[offset];
    int low = src[offset + 1];
    return (high << 8) | low;
}
|
/// <summary>
/// Combines four bytes of <paramref name="src"/> into an int, highest byte first.
/// </summary>
/// <param name="src">Source buffer.</param>
/// <param name="offset">Index of the highest byte; three more bytes are read after it.</param>
/// <returns>The 32-bit value; it is negative when the top bit of the first byte is set.</returns>
public static int bytesToInt4(byte[] src, int offset = 0)
{
    int value = 0;
    for (int i = 0; i < 4; i++)
    {
        // Shift what has been read so far up by one byte and append the next byte.
        value = (value << 8) | src[offset + i];
    }
    return value;
}
|
|
/// <summary>
/// Interprets four bytes of <paramref name="value"/> as an IEEE-754 single-precision number
/// in the host's byte order (as defined by <see cref="BitConverter"/>).
/// </summary>
/// <remarks>Despite the name, the result is a <c>float</c>, not a <c>double</c>.</remarks>
/// <param name="value">Buffer holding the four bytes.</param>
/// <param name="offset">Index of the first byte to read. Previously ignored; now honoured.</param>
/// <returns>The decoded number.</returns>
private static float Bytes2Double(byte[] value, int offset = 0)
{
    return BitConverter.ToSingle(value, offset);
}
|
|
}
|
}
|