using MQTTnet;
|
using MQTTnet.Client.Connecting;
|
using MQTTnet.Client.Options;
|
using MQTTnet.Client.Receiving;
|
using MQTTnet.Extensions.ManagedClient;
|
using MQTTnet.Server;
|
using System;
|
using System.Collections.Generic;
|
using System.Diagnostics.Tracing;
|
using System.Text;
|
using System.Threading.Tasks;
|
|
namespace IStation.DataDockingMqtt
|
{
|
/// <summary>
/// Console entry point: connects a managed MQTT client to the configured broker,
/// subscribes to a single topic and forwards recognised sensor readings to
/// <c>HandleDataHelper</c>.
/// </summary>
internal class Program
{
    // NOTE(review): broker address and credentials are hard-coded in source;
    // consider moving them to configuration.
    private const string TopicName = "MK-S906/up/Data/SHHM22100001";
    private const string MqttUserName = "test";
    private const string MqttUserPwd = "123456";
    private const string MqttHostAddress = "106.14.83.94";
    private const int MqttHostPort = 8810;

    /// <summary>
    /// Starts the MQTT client and keeps the process alive until a line is read from the console.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        Console.WriteLine("启动 上海质检所 MQTT!");

        try
        {
            // Wait for start-up so that a failure is observed and reported here
            // instead of escaping from a fire-and-forget call.
            ConnectAsync().GetAwaiter().GetResult();
        }
        catch (Exception ex)
        {
            Console.WriteLine("启动 MQTT 客户端失败:" + ex.Message);
        }

        Console.ReadLine();
    }

    /// <summary>
    /// Creates the managed MQTT client, wires its event handlers, starts it and
    /// subscribes to <see cref="TopicName"/>.
    /// </summary>
    /// <returns>A task that completes once the client has been started and the subscription requested.</returns>
    private static async Task ConnectAsync()
    {
        IManagedMqttClient mqttClient = new MqttFactory().CreateManagedMqttClient();

        // Only one connected-handler is registered. The original code assigned
        // ConnectedHandler and then called UseConnectedHandler, which replaced it;
        // the handler kept here is the one that actually took effect.
        mqttClient.UseConnectedHandler(e =>
        {
            Console.WriteLine("链接 MQTT 连接到服务!");
        });
        mqttClient.ConnectingFailedHandler = new ClientConnectingFaildHandler(e =>
        {
            Console.WriteLine("链接 MQTT 服务器失败!");
        });
        mqttClient.UseDisconnectedHandler(e =>
        {
            Console.WriteLine("链接 MQTT 服务器断开连接!");
        });
        mqttClient.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceived);

        IMqttClientOptions clientOptions = new MqttClientOptionsBuilder()
            .WithClientId("EventechDemo01")
            .WithTcpServer(MqttHostAddress, MqttHostPort)
            .WithCredentials(MqttUserName, MqttUserPwd)
            // false: receive messages that were queued while offline; the broker must
            // have persistent sessions enabled (WithPersistentSessions).
            .WithCleanSession(false)
            // NOTE(review): no will message is configured here, so this delay
            // presumably has no effect - confirm before removing.
            .WithWillDelayInterval(1)
            .Build();

        IManagedMqttClientOptions options = new ManagedMqttClientOptionsBuilder()
            .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
            .WithClientOptions(clientOptions)
            .Build();

        await mqttClient.StartAsync(options);

        // Subscribe to the single data topic with QoS 0 (at most once).
        MqttTopicFilter topicFilter = new MqttTopicFilterBuilder()
            .WithTopic(TopicName)
            .WithQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce)
            .Build();

        await mqttClient.SubscribeAsync(topicFilter);
    }

    // Sample payload kept from the original author's notes:
    // {"GW_MAC":"0080E10A4D34","GW_IMEI":"867542055972080","ICCID":"898608022221C0013505","csq":31,
    //  "sensor":{"ID":"22100005","name":"22100005","rssi":-29,"battery":100,"type":"N",
    //            "data":{"value":[32],"unit":["dB"]},"time":1677072683,"alarm":"N"}}

    /// <summary>
    /// Handles a message received on the subscribed topic: logs the payload, parses it
    /// and, when the sensor ID is recognised, stores the first value as a source record.
    /// </summary>
    /// <param name="e">Event data carrying the received application message.</param>
    static void Client_ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs e)
    {
        if (e.ApplicationMessage == null)
        {
            return;
        }

        // Decode once and reuse for both logging and parsing.
        string payload = e.ApplicationMessage.ConvertPayloadToString();
        Console.WriteLine(">>> 收到消息:" + payload);

        IStation.UserDto.MqttMsg data;
        try
        {
            data = JsonHelper.Json2Object<IStation.UserDto.MqttMsg>(payload);
        }
        catch (Exception ex)
        {
            // A malformed payload is reported and dropped rather than allowed to
            // escape from the receive callback.
            Console.WriteLine(">>> 消息解析失败:" + ex.Message);
            return;
        }

        if (data == null || data.sensor == null || data.sensor.data == null
            || data.sensor.data.value == null || data.sensor.data.value.Count <= 0)
        {
            return;
        }

        string signId = ResolveSignId(data.sensor.ID);
        if (signId == null)
        {
            // Unknown sensor: nothing to store.
            return;
        }

        IStation.Model.MonitorDataDockingSrcRecord record = new Model.MonitorDataDockingSrcRecord();
        record.RecordType = Model.eMonitorType.General;
        record.SrcTime = data.sensor.GetTime();
        record.SrcValue = data.sensor.data.value[0].ToString();
        record.SignId = signId;

        HandleDataHelper.Add(record);
    }

    /// <summary>
    /// Maps a sensor ID from the payload to the sign ID used when storing the record.
    /// </summary>
    /// <param name="sensorId">Sensor ID taken from the message; may be null.</param>
    /// <returns>The matching sign ID, or <c>null</c> when the sensor is not recognised.</returns>
    private static string ResolveSignId(string sensorId)
    {
        // Sign IDs listed in the original notes:
        //   1620681990539972608  inlet pressure (no sensor ID is mapped to it here)
        //   1620682095074611200  outlet pressure
        //   1620682379477782528  temperature
        //   1620682549540032512  outlet instantaneous flow
        //   1620684539431096320  noise
        switch (sensorId)
        {
            case "22100004": // flow
                return "1620682549540032512";
            case "22100002": // outlet pressure
                return "1620682095074611200";
            case "22100005": // noise
                return "1620684539431096320";
            case "22100003": // temperature
                return "1620682379477782528";
            default:
                return null;
        }
    }
}
|
|
/// <summary>
/// Adapts a plain <see cref="Action{T}"/> callback to the
/// <c>IConnectingFailedHandler</c> interface expected by the managed MQTT client.
/// </summary>
class ClientConnectingFaildHandler : IConnectingFailedHandler
{
    // Callback invoked for every connecting-failed notification; never null.
    private readonly Action<ManagedProcessFailedEventArgs> _action;

    /// <summary>
    /// Creates a handler that forwards connecting-failed notifications to <paramref name="a"/>.
    /// </summary>
    /// <param name="a">Callback to invoke; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="a"/> is null.</exception>
    public ClientConnectingFaildHandler(Action<ManagedProcessFailedEventArgs> a)
    {
        // Fail fast here instead of with a NullReferenceException later,
        // when a connection attempt fails.
        if (a == null)
        {
            throw new ArgumentNullException(nameof(a));
        }

        _action = a;
    }

    /// <summary>
    /// Invokes the wrapped callback synchronously with the supplied event data.
    /// </summary>
    /// <param name="eventArgs">Event data passed through to the callback unchanged.</param>
    /// <returns>An already-completed task.</returns>
    public Task HandleConnectingFailedAsync(ManagedProcessFailedEventArgs eventArgs)
    {
        _action(eventArgs);
        return Task.CompletedTask;
    }
}
|
}
|