跳到主要内容
版本:3.1

Mqtt客户端使用

定义

命名空间:TouchSocket.Mqtt
程序集:TouchSocket.Mqtt.dll

一、说明

MqttClient是遵循Mqtt协议的消息客户端,支持连接Mqtt Broker、订阅主题、发布消息等功能,支持QoS 0/1/2三种消息质量等级。

二、特点

  • 轻量级协议支持
  • QoS消息质量保障
  • 遗嘱消息支持
  • 保留消息处理
  • TLS加密通信
  • 主题通配符匹配(+/#)

三、应用场景

  • IoT设备数据上报
  • 跨平台消息推送
  • 低带宽环境通信
  • 设备状态同步

四、可配置项

继承所有TcpClient的配置。除此之外还支持Mqtt的一些专有配置。

可配置项

SetMqttConnectOptions

MqttConnectOptions 类用于配置 Mqtt 客户端连接到 Mqtt 服务端时的参数,支持 Mqtt 3.1.1 及 5.0+ 协议版本。以下是各配置项的详细说明:

基础连接选项(通用)

属性名Mqtt 版本支持说明
CleanSession3.1.1+是否清理会话(断开后删除未完成的遗嘱和离线消息)。
ClientId3.1.1+客户端唯一标识符(服务端用于识别客户端);Mqtt 3.1.1 中必填(长度 1-23 字节,ASCII 字符);5.0+ 允许空字符串(仅当服务端允许时)。
KeepAlive3.1.1+心跳保活时长(单位:秒),表示客户端与服务端保持连接的最长时间;需与服务端配置兼容(服务端可能拒绝过大或过小的值),默认值通常为 60 秒。
Password3.1.1+连接认证密码(与 UserName 配合使用);若 UserName 为空则无效;5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。
ProtocolName3.1.1+Mqtt 协议名称(固定为 "MQTT"),默认值为 "MQTT",通常无需手动设置。
UserName3.1.1+连接认证用户名(与 Password 配合使用);5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。
Version3.1.1+/5.0+Mqtt 协议版本(如 MqttProtocolVersion.V311V500);需根据服务端支持的版本设置,否则可能导致连接失败。

遗嘱(Will)消息配置(可选)

WillFlagtrue 时,服务端会在客户端异常断开时发布遗嘱消息。仅部分属性在 WillFlag=true 时生效。

属性名Mqtt 版本支持说明
WillFlag3.1.1+是否启用遗嘱消息(若为 true,需配置遗嘱相关属性)。
WillQos3.1.1+遗嘱消息的服务质量等级(0/1/2);需与服务端支持的 QoS 等级兼容。
WillRetain3.1.1+是否保留遗嘱消息(服务端保留最新遗嘱消息供新订阅者接收);默认值为 false;若为 true,服务端需支持保留消息功能。
WillMessage3.1.1+遗嘱消息的内容(文本类型,Mqtt 5.0+ 支持二进制数据,需通过 WillContentTypeWillPayloadFormatIndicator 配合)。
WillTopic3.1.1+遗嘱消息发布的主题(需符合 Mqtt 主题命名规则);主题需提前订阅才能接收遗嘱消息。
WillContentType5.0+遗嘱消息内容的 MIME 类型(如 "text/plain"、"application/json");描述 WillMessage 的格式(仅 5.0+ 有效)。
WillPayloadFormatIndicator5.0+遗嘱消息负载格式指示符(如 RawUtf8);更细粒度描述负载格式(仅 5.0+ 有效)。
WillResponseTopic5.0+遗嘱消息触发后,服务端向客户端发送响应的主题;需客户端提前订阅该主题以接收响应。
WillCorrelationData5.0+遗嘱消息关联的上下文数据(用于匹配请求与响应);需与服务端约定格式(仅 5.0+ 有效)。
WillDelayInterval5.0+遗嘱消息延迟发布的时间间隔(单位:秒);遗嘱触发后,延迟指定时间再发布(仅 5.0+ 有效)。
WillMessageExpiryInterval5.0+遗嘱消息的有效期(单位:秒,过期后服务端删除);超过此时间未发布的遗嘱消息将被丢弃(仅 5.0+ 有效)。

Mqtt 5.0+ 扩展配置(可选)

仅当使用 Mqtt 5.0+ 协议时生效,用于更细粒度的连接控制。

属性名Mqtt 版本支持说明
UserProperties5.0+用户自定义属性(键值对,用于传递业务元数据);键值对需符合 Mqtt 5.0 规范(键为 UTF-8 字符串,值支持多种类型)。
AuthenticationMethod5.0+认证方法名称(如 "OAuth2"、"Custom");需与服务端支持的认证方法一致(仅 5.0+ 支持扩展认证)。
AuthenticationData5.0+认证方法的附加数据(如令牌、签名等);配合 AuthenticationMethod 使用(仅 5.0+ 有效)。
MaximumPacketSize5.0+客户端请求的最大数据包大小(单位:字节);服务端会返回其支持的最大值(客户端需遵守)。
ReceiveMaximum5.0+客户端能接收的最大 QoS 1/2 消息数量(防止消息堆积);默认值通常为 65535(需与服务端配置兼容)。
RequestProblemInformation5.0+是否请求服务端在出错时返回详细问题信息(如原因码、原因字符串);默认值为 true(建议开启以便排查问题)。
RequestResponseInformation5.0+是否请求服务端返回连接响应的额外信息(如会话过期时间);默认值为 false(仅当需要时启用)。
SessionExpiryInterval5.0+会话过期时间间隔(单位:秒,0 表示立即过期);客户端断开后,服务端保留会话的时间(仅当 CleanSession=false 时有效)。
TopicAliasMaximum5.0+客户端允许的最大主题别名值(服务站不能使用超过此值的别名);默认值为 0(表示不使用主题别名)。

注意事项

  1. 协议版本兼容性:部分属性(如 UserPropertiesAuthenticationMethod)仅在 Mqtt 5.0+ 中有效,需根据服务端版本选择配置。
  2. 遗嘱消息生效条件WillFlag 必须为 true,且 WillTopicWillMessage 需有效配置,否则遗嘱不会被发送。
  3. 认证扩展:5.0+ 的 AuthenticationData 需配合自定义认证插件使用,具体格式由服务端定义。
  4. 只读属性WillMessageWillTopic 标记为 internal set,通常通过构造函数或专用方法设置(如 MqttApplicationMessage)。

五、支持插件

插件方法功能
IMqttConnectingPlugin当Mqtt客户端正在连接之前调用此方法。
IMqttConnectedPlugin当Mqtt客户端连接成功时调用。
IMqttClosingPlugin当Mqtt客户端正在关闭时调用。
IMqttClosedPlugin当Mqtt客户端断开连接后触发。
IMqttReceivingPlugin在收到Mqtt所有消息时触发,可以通过e.MqttMessage获取到Mqtt的所有消息,包括订阅、订阅确认、发布、发布确认等。
IMqttReceivedPlugin当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage获取到Mqtt的发布消息。

六、创建Mqtt客户端

var client = new MqttTcpClient();
await client.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("tcp://127.0.0.1:7789")
.SetMqttConnectOptions(options =>
{
options.ClientId = "TestClient";
options.UserName = "TestUser";
options.Password = "TestPassword";
options.ProtocolName = "MQTT";
options.Version = MqttProtocolVersion.V311;
options.KeepAlive = 60;
options.CleanSession = true;

// v5可以继续配置其他

// options.UserProperties = new[]
// {
// new MqttUserProperty("key1","value1"),
// new MqttUserProperty("key2","value2")
// };
})
.ConfigurePlugins(a =>
{
a.AddMqttConnectingPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Connecting:{e.ConnectMessage.ClientId}");
await e.InvokeNext();
});

a.AddMqttConnectedPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Connected:{e.ConnectMessage.ClientId}");
await e.InvokeNext();
});

a.AddMqttReceivingPlugin(async (mqttSession, e) =>
{
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
});

a.AddMqttReceivedPlugin(async (mqttSession, e) =>
{
var message = e.Message;
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
});

a.AddMqttClosingPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Closing:{e.MqttMessage.MessageType}");
await e.InvokeNext();
});

a.AddMqttClosedPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Closed:{e.Message}");
await e.InvokeNext();
});

}));//载入配置

await client.ConnectAsync();//连接

七、订阅与取消订阅

7.1 订阅

按照Mqtt协议,客户端可以通过Subscribe指令订阅一个或多个主题,服务端会返回订阅结果。

var topic1 = "topic1";
var topic2 = "topic2";
SubscribeRequest subscribeRequest1=new SubscribeRequest(topic1, QosLevel.AtLeastOnce);//订阅请求
SubscribeRequest subscribeRequest2=new SubscribeRequest(topic2, QosLevel.AtMostOnce);//可以设置不同的Qos级别

//多个订阅请求
var mqttSubscribeMessage = new MqttSubscribeMessage(subscribeRequest1, subscribeRequest2);

//执行订阅
var mqttSubAckMessage = await client.SubscribeAsync(mqttSubscribeMessage);

//输出订阅结果
foreach (var item in mqttSubAckMessage.ReturnCodes)
{
Console.WriteLine($"ReturnCode:{item}");
}

7.2 取消订阅

按照Mqtt协议,客户端可以通过Unsubscribe指令取消订阅一个或多个主题,服务端会返回所有取消订阅结果。

var topic1 = "topic1";
var topic2 = "topic2";

//取消订阅
var mqttUnsubAckMessage = await client.UnsubscribeAsync(new MqttUnsubscribeMessage(topic1,topic2));

八、接收消息

Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin插件和IMqttReceivedPlugin插件来接收消息。

正如插件说明所示,IMqttReceivingPlugin插件可以在收到所有消息时触发,IMqttReceivedPlugin插件可以在收到发布消息时触发。

所以如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可。

你可以直接使用委托实现订阅:

.ConfigurePlugins(a =>
{
a.AddMqttReceivedPlugin(async (client, e) =>
{
var mqttMessage = e.MqttMessage;
Console.WriteLine("Reved:" + mqttMessage);

//订阅消息的主题
var topicName = mqttMessage.TopicName;

//订阅消息的Qos级别
var qosLevel = mqttMessage.QosLevel;

//订阅消息的Payload
var payload = mqttMessage.Payload;
await e.InvokeNext();
});
})

或者使用插件类来继承接口实现:

class MyMqttReceivedPlugin : PluginBase, IMqttReceivedPlugin
{
public async Task OnMqttReceived(IMqttSession client, MqttReceivedEventArgs e)
{
var mqttMessage = e.MqttMessage;

//订阅消息的主题
var topicName = mqttMessage.TopicName;

//订阅消息的Qos级别
var qosLevel = mqttMessage.QosLevel;

//订阅消息的Payload
var payload = mqttMessage.Payload;
await e.InvokeNext();
}
}

然后把这个类添加到插件即可:

.ConfigurePlugins(a =>
{
a.Add<MyMqttReceivedPlugin>();//添加自定义插件
})

九、发布消息

MqttPublishMessage message = new(topic1, false, QosLevel.AtLeastOnce, Encoding.UTF8.GetBytes("Hello World"));

await client.PublishAsync(message);