ningshuxia
2023-02-23 036ce5eaab6126560cf9ca19b4a2783d42c2e191
Mqtt/IStation.DataDockingMqtt4SQI/Program.cs
@@ -1,4 +1,5 @@
using MQTTnet;
using IStation.DataDockingMqtt4SQI_DEMO;
using MQTTnet;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
@@ -17,143 +18,13 @@
        static void Main(string[] args)
        {
            Console.WriteLine("启动 上海质检所 MQTT!");
            Connect();
            /*Mqtt1.Connect();
            Mqtt2.Connect();*/
            Mqtt.Connect();
            Console.ReadLine();
        }
        static string TopicName = "MK-S906/up/Data/SHHM22100001";
        static string MqttUserName = "test";
        static string MqttUserPwd = "123456";
        static string MqttHostAdress = "106.14.83.94";
        static int  MqttHostPort = 8810;
        static async void Connect()
        {
            //string info = "{\"GW_MAC\":\"0080E10A4D34\",\"GW_IMEI\":\"867542055972080\",\"ICCID\":\"898608022221C0013505\",\"csq\":31,\"sensor\":{\"ID\":\"22100005\",\"name\":\"22100005\",\"rssi\":-29,\"battery\":100,\"type\":\"N\",\"data\":{\"value\":[32],\"unit\":[\"dB\"]},\"time\":1677072683,\"alarm\":\"N\"}}";
            //IStation.UserDto.MqttMsg data = JsonHelper.Json2Object<IStation.UserDto.MqttMsg>(info);
            //if (data != null && data.sensor != null)
            //{
            //    var t = data.sensor.GetTime();
            //}
            // 连接到MQTT服务器
            IManagedMqttClient _mqttClient = new MqttFactory().CreateManagedMqttClient();
            _mqttClient.ConnectedHandler = new MqttClientConnectedHandlerDelegate(e =>
            {
                Console.WriteLine("链接 MQTT 服务器成功!");
            });
            _mqttClient.ConnectingFailedHandler = new ClientConnectingFaildHandler(e =>
            {
                Console.WriteLine("链接 MQTT 服务器失败!");
            });
            _mqttClient.UseDisconnectedHandler(ee =>
            {
                Console.WriteLine("链接 MQTT 服务器断开连接!");
            });
            _mqttClient.UseConnectedHandler(ee =>
            {
                Console.WriteLine("链接 MQTT 连接到服务!");
            });
            //client.DisconnectedHandler
            _mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceived);
            IMqttClientOptions clientOptions = new MqttClientOptionsBuilder()
                .WithClientId("EventechDemo01")
                .WithTcpServer(MqttHostAdress, MqttHostPort)
                .WithCredentials(MqttUserName, MqttUserPwd)
                .WithCleanSession(false)//false 接收离线消息 服务端需要启用之久会话 WithPersistentSessions
                //.WithWillMessage(new MqttApplicationMessage { Topic = "ooo", Payload = Encoding.UTF8.GetBytes("下线通知"), Retain = true })
                .WithWillDelayInterval(1)
                .Build();
            IManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
                .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                .WithClientOptions(clientOptions)
                .Build();
            await _mqttClient.StartAsync(options);// 异步连接到服务器
            // 订阅主题过滤器
            MqttTopicFilter topicFilter = new MqttTopicFilterBuilder()
                .WithTopic(TopicName)// 主题
                .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
                .Build();
            await _mqttClient.SubscribeAsync(topicFilter);
            //client.UnsubscribeAsync()
            //await Task.Delay(1000);
            //// 发布()     TCP  点对点
            //// 针对某个主题发布   需要指定主题
            //// 针对这个主题的内容(负载)
            //string msg = "Hello Eventech";
            //MqttApplicationMessage message = new MqttApplicationMessageBuilder()
            //    .WithTopic("test")
            //    .WithPayload(Encoding.Default.GetBytes(msg))
            //    .WithRetainFlag(false)
            //    .Build();
            //await client.PublishAsync(message);
        }
        // 响应订阅主题的消息
        static void Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
        {
            if (e.ApplicationMessage == null)
                return;
            Console.WriteLine(">>> 收到消息:" + e.ApplicationMessage.ConvertPayloadToString());// + ",来自客户端" + e.ClientId + ",主题:" + e.ApplicationMessage.Topic);
            //try
            //{
            //    {Encoding.UTF8.GetString(ee.ApplicationMessage.Payload)}");
            //}
            //catch { }
            var data = JsonHelper.Json2Object<IStation.UserDto.MqttMsg>(e.ApplicationMessage.ConvertPayloadToString());
            if(data != null && data.sensor != null && data.sensor.data != null && data.sensor.data.value != null
                && data.sensor.data.value.Count>0)
            {
                //1620681990539972608 进口压力
                //1620682095074611200 出口压力
                //1620682379477782528 温度
                //1620682549540032512 出口瞬时流量
                //1620684539431096320 噪声
                IStation.Model.MonitorDataDockingSrcRecord r = new Model.MonitorDataDockingSrcRecord();
                r.RecordType = Model.eMonitorType.General;
                r.SrcTime = data.sensor.GetTime();
                    //DateTime.Now;
                r.SrcValue = data.sensor.data.value[0].ToString();
                var id = data.sensor.ID;
                if(id == "22100004")
                {//流量
                    r.SignId = "1620682549540032512";
                    HandleDataHelper.Add(r);
                }
                if (id == "22100002")
                {//出口压力
                    r.SignId = "1620682095074611200";
                    HandleDataHelper.Add(r);
                }
                if (id == "22100005")
                {//噪音
                    r.SignId = "1620684539431096320";
                    HandleDataHelper.Add(r);
                }
                if (id == "22100003")
                {//温度
                    r.SignId = "1620682379477782528";
                    HandleDataHelper.Add(r);
                }
            }
        }
    }
    class ClientConnectingFaildHandler : IConnectingFailedHandler
    {
        Action<ManagedProcessFailedEventArgs> action;