using IStation.DataDockingMqtt;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IStation.DataDockingMqtt4SQI_DEMO.sub
{
    /// <summary>
    /// Demo MQTT subscriber: connects a managed MQTT client to a fixed broker,
    /// subscribes to one topic and forwards recognised sensor readings to
    /// <c>HandleDataHelper</c>.
    /// </summary>
    public class Mqtt1
    {
        // NOTE(review): broker address and credentials are hard-coded in source;
        // consider moving them to configuration before this leaves demo status.
        private const string TopicName = "MK-S906/up/Data/SHHM22100001";
        private const string MqttUserName = "test";
        private const string MqttUserPwd = "123456";
        private const string MqttHostAdress = "106.14.83.94";
        private const int MqttHostPort = 8810;

        /// <summary>
        /// Maps the sensor ID carried in a message to the SysId written on the
        /// stored record. Messages whose sensor ID is not listed here are ignored.
        /// </summary>
        /// <remarks>
        /// SysId reference list carried over from the original source comments:
        /// 1620681990539972608 inlet pressure,
        /// 1620682095074611200 outlet pressure,
        /// 1620682379477782528 temperature,
        /// 1620682549540032512 outlet instantaneous flow,
        /// 1620684539431096320 noise,
        /// 1628644701211070464 voltage,
        /// 1628644747105144832 current,
        /// 1628644805007511552 power,
        /// 1628644845088280576 frequency,
        /// 1628644902701240320 temperature,
        /// 1628644990471245824 X acceleration,
        /// 1628645042027630592 Y acceleration,
        /// 1628645078274805760 Z acceleration.
        /// </remarks>
        private static readonly Dictionary<string, long> SensorSysIds = new Dictionary<string, long>
        {
            { "22100004", 1620682549540032512 }, // flow
            { "22100002", 1620682095074611200 }, // outlet pressure
            { "22100005", 1620684539431096320 }, // noise
            { "22100003", 1620682379477782528 }, // temperature
            { "23020001", 1620681990539972608 }, // inlet pressure
        };

        // Held in a static field so the client stays reachable after ConnectAsync returns.
        private static IManagedMqttClient _mqttClient;

        /// <summary>
        /// Fire-and-forget entry point kept for existing callers. Starts the
        /// client via <see cref="ConnectAsync"/> and writes any failure to the
        /// console instead of letting it escape from an <c>async void</c> method,
        /// where no caller could catch it.
        /// </summary>
        public static async void Connect()
        {
            try
            {
                await ConnectAsync();
            }
            catch (Exception ex)
            {
                Console.WriteLine("链接 MQTT1 服务器异常:" + ex.Message);
            }
        }

        /// <summary>
        /// Creates the managed MQTT client, registers its handlers, starts it
        /// against the configured broker and subscribes to <c>TopicName</c>.
        /// </summary>
        /// <returns>A task that completes once the subscription request has been issued.</returns>
        public static async Task ConnectAsync()
        {
            // Connect to the MQTT server.
            IManagedMqttClient client = new MqttFactory().CreateManagedMqttClient();

            // NOTE(review): a connected handler is registered twice (here and via
            // UseConnectedHandler below); presumably the later call replaces this
            // one - confirm against the MQTTnet version in use and drop one.
            client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e =>
            {
                Console.WriteLine("链接 MQTT1 服务器成功!");
            });
            client.ConnectingFailedHandler = new ClientConnectingFaildHandler(e =>
            {
                Console.WriteLine("链接 MQTT1 服务器失败!");
            });
            client.UseDisconnectedHandler(ee =>
            {
                Console.WriteLine("链接 MQTT1 服务器断开连接!");
            });
            client.UseConnectedHandler(ee =>
            {
                Console.WriteLine("链接 MQTT1 连接到服务!");
            });
            client.ApplicationMessageReceivedHandler =
                new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceived);

            IMqttClientOptions clientOptions = new MqttClientOptionsBuilder()
                .WithClientId("EventechDemo01")
                .WithTcpServer(MqttHostAdress, MqttHostPort)
                .WithCredentials(MqttUserName, MqttUserPwd)
                // false: receive messages queued while offline; the server must
                // have persistent sessions enabled (WithPersistentSessions).
                .WithCleanSession(false)
                //.WithWillMessage(new MqttApplicationMessage { Topic = "ooo", Payload = Encoding.UTF8.GetBytes("下线通知"), Retain = true })
                .WithWillDelayInterval(1)
                .Build();

            IManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
                .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                .WithClientOptions(clientOptions)
                .Build();

            _mqttClient = client;

            // Start the client; it connects to the server asynchronously.
            await client.StartAsync(options);

            // Subscribe using a topic filter.
            MqttTopicFilter topicFilter = new MqttTopicFilterBuilder()
                .WithTopic(TopicName)
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
                .Build();
            await client.SubscribeAsync(topicFilter);

            // Publishing sample kept from the original for reference:
            //string msg = "Hello Eventech";
            //MqttApplicationMessage message = new MqttApplicationMessageBuilder()
            //    .WithTopic("test")
            //    .WithPayload(Encoding.Default.GetBytes(msg))
            //    .WithRetainFlag(false)
            //    .Build();
            //await client.PublishAsync(message);
        }

        /// <summary>
        /// Handles a message received on the subscribed topic: logs it, parses
        /// the JSON payload and, when the sensor ID is one of the mapped IDs,
        /// queues a record through <c>HandleDataHelper.Add</c>.
        /// </summary>
        /// <param name="e">Event data for the received message; ignored when it carries no message.</param>
        public static void Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            if (e == null || e.ApplicationMessage == null)
            {
                return;
            }

            // Decode once and reuse for both logging and parsing.
            string payload = e.ApplicationMessage.ConvertPayloadToString();
            Console.WriteLine(">>> 收到消息:" + payload + ",来自客户端" + e.ClientId + ",主题:" + e.ApplicationMessage.Topic);

            try
            {
                var data = JsonHelper.Json2Object(payload);
                if (data == null
                    || data.sensor == null
                    || data.sensor.data == null
                    || data.sensor.data.value == null
                    || data.sensor.data.value.Count <= 0)
                {
                    return;
                }

                // TryGetValue rejects a null key, so filter it out first; an
                // unmapped ID simply means the message is not one we store.
                var sensorId = data.sensor.ID;
                long sysId;
                if (sensorId == null || !SensorSysIds.TryGetValue(sensorId, out sysId))
                {
                    return;
                }

                Model.MonitorDataDockingReceiveRecord record = new Model.MonitorDataDockingReceiveRecord();
                record.RecordType = Model.eMonitorType.General;
                record.SrcTime = data.sensor.GetTime();
                record.SrcValue = data.sensor.data.value[0].ToString();
                record.SysId = sysId;
                HandleDataHelper.Add(record);
            }
            catch (Exception ex)
            {
                // Payloads arrive from the network, so a parse or mapping failure
                // is logged here rather than thrown out of the handler.
                Console.WriteLine("处理 MQTT1 消息失败:" + ex.Message);
            }
        }
    }
}