EMQ介绍
  EMQ (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平台开发的开源物联网 MQTT
消息服务器。Erlang/OTP 是出色的软实时(Soft-Realtime)、低延时(Low-Latency)、分布式(Distributed)
的语言平台。MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网(IoT)消息协议。   订阅(pub)/发布(sub)模式  
消息队列中的广播(fanout)模式     轻量化:docker镜像都才88.4MB    
一些关于常用EMQTT的快速链接:
官网API地址:http://www.emqtt.com/docs/v2/
<https://yq.aliyun.com/go/articleRenderRedirect?url=http%3A%2F%2Fwww.emqtt.com%2Fdocs%2Fv2%2F>
开源项目地址:https://github.com/emqtt
<https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fgithub.com%2Femqtt>
Docker安装模式:http://www.emqtt.com/docs/v2/install.html#docker
<https://yq.aliyun.com/go/articleRenderRedirect?url=http%3A%2F%2Fwww.emqtt.com%2Fdocs%2Fv2%2Finstall.html%23docker>
MQTT介绍和场景:https://www.mqtt.com/
<https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fwww.mqtt.com%2F>
 
安装过程
假如你的centos上已经安装了Docker,并pull了devicexx/emqttd这个镜像,输入如下命令 docker run -dit
--name=sample_emqtt --restart=always -p18083:18083 -p 1883:1883 -p 8083:8083 -p
8883:8883 998429a869e8
 
  确保映射如下几个端口 1883、8083、8883:这三个是基于EMQTT传输通讯的端口 18083:这个是EMQTT Web控制台的端口  
我的虚拟机IP网段是153.132,直接输入http://192.168.153.132:18083进入Web控制台,默认用户名和密码是admin,
public 至此EMQTT已经顺利安装完成。  
在NET Core中使用EMQ:
emqtt官网没有提供.net的客户端,在nuget和github上找到了实现了MQTT协议的公共组件。   MQTTnet nuget: 
https://www.nuget.org/packages/MQTTnet/
<https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fwww.nuget.org%2Fpackages%2FMQTTnet%2F>
MQTTnet github: https://github.com/chkr1011/MQTTnet
<https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fgithub.com%2Fchkr1011%2FMQTTnet>
 
MQTTnet Client的几个主要事件:

ApplicationMessageReceived:MQTTnet中主要方法事件,当接收到消息的时候,该事件触发。包含Topic,Payload,Qos,Retain主要成员。
Connected:客户端成功连接时触发。 Disconnected:客户端丢失连接时触发。  
创建一个发布者
先创建一个发布者Publisher的配置类,代码简单: public static class PublisherConfig { ///
<summary> /// 创建mqtt的客户端接口实例 /// </summary> public static readonly IMqttClient
Client = new MqttFactory().CreateMqttClient(); /// <summary> /// 创建mqtt配置选项 ///
</summary> public static readonly MqttClientOptions ClientOptions = new
MqttClientOptions {// 通道选项 ChannelOptions = new MqttClientTcpOptions { Server =
"192.168.153.132" }, /* * 客户端ID *
在MQTTnet框架中,当ClientId未赋值,将使用默认的GUID生成默认的ClientId*/ ClientId = "Client_publisher"
,// 客户端认证 Credentials = new MqttClientCredentials { Username = "clientUser_01",
Password= "123123" } }; public const string Topic =
MQTT_Common.Config.Topic.Name; }
 

 为了测试方便,创建了一个全局固定的Topic名称
public static class Topic { public const string Name = "/WorldTopic"; }
 

再创建一个发布者Publisher,代码简单
public static class Publisher { private static readonly IMqttClient Client =
PublisherConfig.Client; private static readonly MqttClientOptions ClientOptions=
PublisherConfig.ClientOptions; private static readonlystring Topic =
PublisherConfig.Topic; public static void RunAsync() { try { Console.WriteLine("
publisher is running"); CreateConnection(); LoopInput().Wait(); } catch
(Exception exception) { Console.WriteLine(exception); } } private static void
CreateConnection() { CommonEventHandler.EventHandler(Client, ClientOptions);
CommonEventHandler.TryConnectionAsync(Client, ClientOptions); } private static
async Task LoopInput() {while (true) { await Client.SubscribeAsync("/World");
Console.WriteLine("输入消息数据:"); var r = Console.ReadLine(); var applicationMessage
= new MqttApplicationMessageBuilder() .WithTopic(Topic) // 设置主题 .WithPayload(r)
// 设置载荷(消息内容) .WithAtLeastOnceQoS() // 设置质量 .WithRetainFlag(false) // 设置持久化
.Build(); await Client.PublishAsync(applicationMessage); } }
 

 创建一个或多个订阅者
public static class SubscriberConfig01 { public static readonly IMqttClient
Client= new MqttFactory().CreateMqttClient(); public static readonly
MqttClientOptions ClientOptions= new MqttClientOptions { ChannelOptions = new
MqttClientTcpOptions { Server= "192.168.153.132" }, // 唯一需要修改的Client唯一ID
ClientId ="Client_01", Credentials = new MqttClientCredentials { //
用户可用多个,也可不启用客户端验证 Username = "clientUser_02", Password = "123123" } }; public
conststring Topic = MQTT_Common.Config.Topic.Name; }
 

MQTTnet中所有连接端都是Client,所以唯一的变化是ClientID这个值,可使用系统GUID自动生成,也可以使用不同的Client限定名。
public static class Subscriber01 { private static readonly IMqttClient Client =
SubscriberConfig01.Client;private static readonly MqttClientOptions
ClientOptions = SubscriberConfig01.ClientOptions; private static readonly string
Topic = SubscriberConfig01.Topic; public static async Task RunAsync() { try {
Console.WriteLine("subscriber 1 is running"); CreateConnection(); //
注册/订阅Topic主题 await Client.SubscribeAsync(Topic); Console.ReadKey(); } catch
(Exception exception) { Console.WriteLine(exception); } }private static void
CreateConnection() { CommonEventHandler.EventHandler(Client, ClientOptions);
CommonEventHandler.TryConnectionAsync(Client, ClientOptions); } }
 

公共函数CommonEventHandler,用于公共事件处理和连接尝试
public static class CommonEventHandler { /// <summary> /// 添加Client相关事件处理函数 ///
</summary> /// <param name="client">IMqttClient客户端</param> /// <param
name="clientOptions">MqttClientOptions配置选项</param> /// <param name="subscribe">
主题Topic名称</param> public static void EventHandler(IMqttClient client,
MqttClientOptions clientOptions,string subscribe = "#") {
client.ApplicationMessageReceived+= (s, e) => { Console.WriteLine("### 收到程序消息
###"); Console.WriteLine($"+ [主题]Topic = {e.ApplicationMessage.Topic}");
Console.WriteLine($"+ [载荷]Payload =
{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); Console.WriteLine($"
+ [质量]QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); Console.WriteLine($"
+ [持久]Retain = {e.ApplicationMessage.Retain}"); Console.WriteLine(); };
client.Connected+= async (s, e) => { Console.WriteLine("### 成功连接MQTT ###"); if
(!subscribe.Equals("#")) return; await client.SubscribeAsync(new
TopicFilterBuilder().WithTopic("#").Build()); Console.WriteLine($"###
成功订阅主题[{subscribe}] ###"); }; client.Disconnected += async (s, e) => {
Console.WriteLine("### 连接丢失 ###"); await Task.Delay(TimeSpan.FromSeconds(5));
try { await client.ConnectAsync(clientOptions); } catch { Console.WriteLine("
### 连接错误 ###"); } }; } /// <summary> /// 尝试Client连接到EMQTT /// </summary> ///
<param name="client">IMqttClient客户端</param> /// <param name="clientOptions">
MqttClientOptions配置选项</param> public static void TryConnectionAsync(IMqttClient
client, MqttClientOptions clientOptions) {try {
client.ConnectAsync(clientOptions).Wait(); }catch (Exception exception) {
Console.WriteLine("### 连接错误 ###" + Environment.NewLine + exception); } } }
 

配置三个客户端,一个作为发布者,两个作为订阅者,通过发布者输入任意数据,运行结果如下:

其中发布者客户端会收到一个Retain=True的消息,是因为之前有发布过一条持久化的消息,该消息已经存储在EMQTT中,每次启动均会主动向订阅过"/World"的订阅者再次推送该条持久化的消息。
根据生产消费的机制,此处消息理当已经被消费和清除,但不清楚为何EMQ仍然存在。  
总结
本篇简单介绍使用EMQTT的PubSub模式,并使用MQTTnet客户端连接并订阅和发布消息内容。    

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:ixiaoyang8@qq.com
QQ群:637538335
关注微信