using IStation.DataDockingMqtt; using MQTTnet.Client.Connecting; using MQTTnet.Client.Options; using MQTTnet.Client.Receiving; using MQTTnet.Extensions.ManagedClient; using MQTTnet; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using IStation.DataDockingMqtt4SQI_DEMO.sub; namespace IStation.DataDockingMqtt4SQI_DEMO { public class Mqtt { static string TopicName = "MK-S906/up/Data/SHHM22100001"; static string TopicName2 = "tkg07342A3521633000/msg/up"; static string MqttUserName = "test"; static string MqttUserPwd = "123456"; static string MqttHostAdress = "106.14.83.94"; static int MqttHostPort = 8810; public static async void Connect() { //连接到MQTT服务器 IManagedMqttClient _mqttClient = new MqttFactory().CreateManagedMqttClient(); _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e => { Console.WriteLine("链接 MQTT 服务器成功!"); }); _mqttClient.ConnectingFailedHandler = new ClientConnectingFaildHandler(e => { Console.WriteLine("链接 MQTT 服务器失败!"); }); _mqttClient.UseDisconnectedHandler(ee => { Console.WriteLine("链接 MQTT 服务器断开连接!"); }); _mqttClient.UseConnectedHandler(ee => { Console.WriteLine("链接 MQTT 连接到服务!"); }); //client.DisconnectedHandler _mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceived); IMqttClientOptions clientOptions = new MqttClientOptionsBuilder() .WithClientId("EventechDemo01") .WithTcpServer(MqttHostAdress, MqttHostPort) .WithCredentials(MqttUserName, MqttUserPwd) .WithCleanSession(false)//false 接收离线消息 服务端需要启用之久会话 WithPersistentSessions //.WithWillMessage(new MqttApplicationMessage { Topic = "ooo", Payload = Encoding.UTF8.GetBytes("下线通知"), Retain = true }) .WithWillDelayInterval(1) .Build(); IManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder() .WithAutoReconnectDelay(TimeSpan.FromSeconds(5)) .WithClientOptions(clientOptions) .Build(); await _mqttClient.StartAsync(options);// 异步连接到服务器 // 订阅主题过滤器1 MqttTopicFilter topicFilter = new MqttTopicFilterBuilder() .WithTopic(TopicName)// 主题 .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce) .Build(); await _mqttClient.SubscribeAsync(topicFilter); // 订阅主题过滤器2 MqttTopicFilter topicFilter2 = new MqttTopicFilterBuilder() .WithTopic(TopicName2)// 主题 .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce) .Build(); await _mqttClient.SubscribeAsync(topicFilter2); } static void Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e) { if (e.ApplicationMessage == null) return; if (e.ApplicationMessage.Topic == TopicName) { Mqtt1.Client_ApplicationMessageReceived(e); } else { Mqtt2.Client_ApplicationMessageReceived(e); } } } }