using IStation.DataDockingMqtt; using MQTTnet.Client.Connecting; using MQTTnet.Client.Options; using MQTTnet.Client.Receiving; using MQTTnet.Extensions.ManagedClient; using MQTTnet; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace IStation.DataDockingMqtt4SQI_DEMO.sub { public static class Mqtt2 { static string TopicName = "tkg07342A3521633000/msg/up"; static string MqttUserName = "test"; static string MqttUserPwd = "123456"; static string MqttHostAdress = "106.14.83.94";//172.19.0.218 static int MqttHostPort = 8810; public static async void Connect() { //连接到MQTT服务器 IManagedMqttClient _mqttClient = new MqttFactory().CreateManagedMqttClient(); _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => { Console.WriteLine("链接 MQTT2 服务器成功!"); }); _mqttClient.ConnectingFailedHandler = new ClientConnectingFaildHandler(e => { Console.WriteLine("链接 MQTT2 服务器失败!"); }); _mqttClient.UseDisconnectedHandler(ee => { Console.WriteLine("链接 MQTT2 服务器断开连接!"); }); _mqttClient.UseConnectedHandler(ee => { Console.WriteLine("链接 MQTT2 连接到服务!"); }); _mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceived); IMqttClientOptions clientOptions = new MqttClientOptionsBuilder() .WithClientId("EventechDemo02") .WithTcpServer(MqttHostAdress, MqttHostPort) .WithCredentials(MqttUserName, MqttUserPwd) .WithCleanSession(false)//false 接收离线消息 服务端需要启用之久会话 WithPersistentSessions //.WithWillMessage(new MqttApplicationMessage { Topic = "ooo", Payload = Encoding.UTF8.GetBytes("下线通知"), Retain = true }) .WithWillDelayInterval(1) .Build(); IManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) .WithClientOptions(clientOptions) .Build(); await _mqttClient.StartAsync(options);// 异步连接到服务器 // 订阅主题过滤器 MqttTopicFilter topicFilter = new MqttTopicFilterBuilder() .WithTopic(TopicName)// 主题 .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce) .Build(); await _mqttClient.SubscribeAsync(topicFilter); } /* 1628644701211070464 电压 1628644747105144832 电流 1628644805007511552 功率 1628644845088280576 频率 1628644902701240320 温度 1628644990471245824 x加速度 1628645042027630592 Y加速度 1628645078274805760 Z加速度 */ // 响应订阅主题的消息 public static void Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { if (e.ApplicationMessage == null) return; Console.WriteLine(">>> 收到消息:" + e.ApplicationMessage.ConvertPayloadToString() + ",来自客户端" + e.ClientId + ",主题:" + e.ApplicationMessage.Topic); var data = JsonHelper.Json2Object(e.ApplicationMessage.ConvertPayloadToString()); if (data == null) { LogCustomHelper.Error("MQTT2 data 为空"); return; } if (string.IsNullOrEmpty(data.msg)) { LogCustomHelper.Error("MQTT2 data.msg 为空"); return; } var bytes = StringToHexValuve(data.msg, out string err)?.ToArray(); LogCustomHelper.Info($"MQTT2 msg:{BitConverter.ToString(bytes)}"); if (bytes == null || bytes.Count() < 1) return; var list = new List(); var record = new Model.MonitorDataDockingReceiveRecord(); record.SrcTime = DateTime.Now; record.RecordType = Model.eMonitorType.General; var byte4 = new byte[4]; var byte2 = new byte[2]; //电压 Array.Copy(bytes, 0, byte4, 0, 4); var dy = Bytes2Double(byte4.Reverse().ToArray()); record.SysId = 1628644701211070464; record.SrcValue = dy.ToString(); list.Add(new Model.MonitorDataDockingReceiveRecord(record)); //电流 Array.Copy(bytes, 4, byte4, 0, 4); var dl = Bytes2Double(byte4.Reverse().ToArray()); record.SysId = 1628644747105144832; record.SrcValue = dl.ToString(); list.Add(new Model.MonitorDataDockingReceiveRecord(record)); //功率 Array.Copy(bytes, 8, byte4, 0, 4); var gl = Bytes2Double(byte4.Reverse().ToArray()); record.SysId = 1628644805007511552; record.SrcValue = gl.ToString(); list.Add(new Model.MonitorDataDockingReceiveRecord(record)); //频率 Array.Copy(bytes, 12, byte4, 0, 4); var pl = Bytes2Double(byte4.Reverse().ToArray()); record.SysId = 1628644845088280576; record.SrcValue = pl.ToString(); list.Add(new Model.MonitorDataDockingReceiveRecord(record)); //温度*10 Array.Copy(bytes, 16, byte2, 0, 2); var wd = BitConverter.ToInt16(byte2.Reverse().ToArray()); record.SysId = 1628644902701240320; record.SrcValue = (wd / 10.0).ToString(); list.Add(new Model.MonitorDataDockingReceiveRecord(record)); //x加速度 Array.Copy(bytes, 18, byte2, 0, 2); var xjsd = bytesToInt2(byte2.Reverse().ToArray()); record.SysId = 1628644990471245824; record.SrcValue = xjsd.ToString(); list.Add(new Model.MonitorDataDockingReceiveRecord(record)); //Y加速度 Array.Copy(bytes, 20, byte2, 0, 2); var yjsd = bytesToInt2(byte2.Reverse().ToArray()); record.SysId = 1628645042027630592; record.SrcValue = yjsd.ToString(); list.Add(new Model.MonitorDataDockingReceiveRecord(record)); //Z加速度 Array.Copy(bytes, 22, byte2, 0, 2); var zjsd = bytesToInt2(byte2.Reverse().ToArray()); record.SysId = 1628645078274805760; record.SrcValue = zjsd.ToString(); list.Add(new Model.MonitorDataDockingReceiveRecord(record)); HandleDataHelper.Add(list); } /// /// 字符串转16进制字符2 /// static List StringToHexValuve(string content, out string error_info) { //去掉空格 string[] arr = content.Split(2); if (arr.Length < 3) { arr = content.Split('-'); if (arr.Length < 3) { error_info = "字符无法解析"; return null; } } List result = new List(); for (int i = 0; i < arr.Length; i++) { if (string.IsNullOrWhiteSpace(arr[i])) continue; var dddd = Convert.ToByte(arr[i], 16); result.Add(dddd); } error_info = null; return result; } static string[] Split(this string str, int count) { var list = new List(); int length = (int)Math.Ceiling((double)str.Length / count); for (int i = 0; i < length; i++) { int start = count * i; if (str.Length <= start) { break; } if (str.Length < start + count) { list.Add(str.Substring(start)); } else { list.Add(str.Substring(start, count)); } } return list.ToArray(); } /** * byte数组中取int数值,本方法适用于(低位在后,高位在前)的顺序。2个字节 */ public static int bytesToInt2(byte[] src, int offset = 0) { int value; value = (src[offset + 0] & 0xFF) << 8 | src[offset + 1] & 0xFF; return value; } /** * byte数组中取int数值,本方法适用于(低位在后,高位在前)的顺序。4个字节 */ public static int bytesToInt4(byte[] src, int offset = 0) { int value; value = (src[offset] & 0xFF) << 24 | (src[offset + 1] & 0xFF) << 16 | (src[offset + 2] & 0xFF) << 8 | src[offset + 3] & 0xFF; return value; } private static float Bytes2Double(byte[] value, int offset = 0) { return BitConverter.ToSingle(value, 0);//采用了IEEE-754二进制浮点数算术标准 } } }