跳到主要内容
版本:4.2

Mqtt客户端

定义

命名空间:
TouchSocket.Mqtt
安装:
dotnet add package TouchSocket.Mqtt

一、说明

MqttTcpClient 是遵循Mqtt协议的消息客户端,支持连接Mqtt Broker、订阅主题、发布消息等功能,支持QoS 0/1/2三种消息质量等级,兼容 Mqtt 3.1.1 及 5.0+ 协议版本。

二、特点

  • 轻量级协议支持
  • QoS消息质量保障(0/1/2)
  • 遗嘱消息支持
  • 保留消息处理
  • 自动重连机制
  • TLS加密通信
  • 主题通配符匹配(+/#)
  • 插件化扩展体系

三、应用场景

  • IoT设备数据上报
  • 跨平台消息推送
  • 低带宽环境通信
  • 设备状态同步
  • 远程设备控制

四、可配置项

继承所有TcpClient的配置。除此之外还支持Mqtt客户端专有配置。

可配置项

SetMqttConnectOptions

MqttConnectOptions 类用于配置 Mqtt 客户端连接到 Mqtt 服务端时的参数,支持 Mqtt 3.1.1 及 5.0+ 协议版本。以下是各配置项的详细说明:

基础连接选项(通用)

属性名Mqtt 版本支持说明
CleanSession3.1.1+是否清理会话(断开后删除未完成的遗嘱和离线消息)。
ClientId3.1.1+客户端唯一标识符(服务端用于识别客户端);Mqtt 3.1.1 中必填(长度 1-23 字节,ASCII 字符);5.0+ 允许空字符串(仅当服务端允许时)。
KeepAlive3.1.1+心跳保活时长(单位:秒),表示客户端与服务端保持连接的最长时间;需与服务端配置兼容(服务端可能拒绝过大或过小的值),默认值通常为 60 秒。
Password3.1.1+连接认证密码(与 UserName 配合使用);若 UserName 为空则无效;5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。
ProtocolName3.1.1+Mqtt 协议名称(固定为 "MQTT"),默认值为 "MQTT",通常无需手动设置。
UserName3.1.1+连接认证用户名(与 Password 配合使用);5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。
Version3.1.1+/5.0+Mqtt 协议版本(如 MqttProtocolVersion.V311V500);需根据服务端支持的版本设置,否则可能导致连接失败。

遗嘱(Will)消息配置(可选)

WillFlagtrue 时,服务端会在客户端异常断开时发布遗嘱消息。仅部分属性在 WillFlag=true 时生效。

属性名Mqtt 版本支持说明
WillFlag3.1.1+是否启用遗嘱消息(若为 true,需配置遗嘱相关属性)。
WillQos3.1.1+遗嘱消息的服务质量等级(0/1/2);需与服务端支持的 QoS 等级兼容。
WillRetain3.1.1+是否保留遗嘱消息(服务端保留最新遗嘱消息供新订阅者接收);默认值为 false;若为 true,服务端需支持保留消息功能。
WillMessage3.1.1+遗嘱消息的内容(文本类型,Mqtt 5.0+ 支持二进制数据,需通过 WillContentTypeWillPayloadFormatIndicator 配合)。
WillTopic3.1.1+遗嘱消息发布的主题(需符合 Mqtt 主题命名规则);主题需提前订阅才能接收遗嘱消息。
WillContentType5.0+遗嘱消息内容的 MIME 类型(如 "text/plain"、"application/json");描述 WillMessage 的格式(仅 5.0+ 有效)。
WillPayloadFormatIndicator5.0+遗嘱消息负载格式指示符(如 RawUtf8);更细粒度描述负载格式(仅 5.0+ 有效)。
WillResponseTopic5.0+遗嘱消息触发后,服务端向客户端发送响应的主题;需客户端提前订阅该主题以接收响应。
WillCorrelationData5.0+遗嘱消息关联的上下文数据(用于匹配请求与响应);需与服务端约定格式(仅 5.0+ 有效)。
WillDelayInterval5.0+遗嘱消息延迟发布的时间间隔(单位:秒);遗嘱触发后,延迟指定时间再发布(仅 5.0+ 有效)。
WillMessageExpiryInterval5.0+遗嘱消息的有效期(单位:秒,过期后服务端删除);超过此时间未发布的遗嘱消息将被丢弃(仅 5.0+ 有效)。

Mqtt 5.0+ 扩展配置(可选)

仅当使用 Mqtt 5.0+ 协议时生效,用于更细粒度的连接控制。

属性名Mqtt 版本支持说明
UserProperties5.0+用户自定义属性(键值对,用于传递业务元数据);键值对需符合 Mqtt 5.0 规范(键为 UTF-8 字符串,值支持多种类型)。
AuthenticationMethod5.0+认证方法名称(如 "OAuth2"、"Custom");需与服务端支持的认证方法一致(仅 5.0+ 支持扩展认证)。
AuthenticationData5.0+认证方法的附加数据(如令牌、签名等);配合 AuthenticationMethod 使用(仅 5.0+ 有效)。
MaximumPacketSize5.0+客户端请求的最大数据包大小(单位:字节);服务端会返回其支持的最大值(客户端需遵守)。
ReceiveMaximum5.0+客户端能接收的最大 QoS 1/2 消息数量(防止消息堆积);默认值通常为 65535(需与服务端配置兼容)。
RequestProblemInformation5.0+是否请求服务端在出错时返回详细问题信息(如原因码、原因字符串);默认值为 true(建议开启以便排查问题)。
RequestResponseInformation5.0+是否请求服务端返回连接响应的额外信息(如会话过期时间);默认值为 false(仅当需要时启用)。
SessionExpiryInterval5.0+会话过期时间间隔(单位:秒,0 表示立即过期);客户端断开后,服务端保留会话的时间(仅当 CleanSession=false 时有效)。
TopicAliasMaximum5.0+客户端允许的最大主题别名值(服务端不能使用超过此值的别名);默认值为 0(表示不使用主题别名)。

注意
  1. 协议版本兼容性:部分属性(如 UserPropertiesAuthenticationMethod)仅在 Mqtt 5.0+ 中有效,需根据服务端版本选择配置。
  2. 遗嘱消息生效条件WillFlag 必须为 true,且 WillTopicWillMessage 需有效配置,否则遗嘱不会被发送。
  3. 认证扩展:5.0+ 的 AuthenticationData 需配合自定义认证插件使用,具体格式由服务端定义。

五、支持插件

插件方法功能
IMqttConnectingPlugin当Mqtt客户端正在连接之前调用此方法。
IMqttConnectedPlugin当Mqtt客户端连接成功时调用。
IMqttClosingPlugin当Mqtt客户端正在关闭时调用。
IMqttClosedPlugin当Mqtt客户端断开连接后触发。
IMqttReceivingPlugin在收到Mqtt所有消息时触发,可以通过e.MqttMessage获取到Mqtt的所有消息,包括订阅、订阅确认、发布、发布确认等。
IMqttReceivedPlugin当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage获取到Mqtt的发布消息。

UseMqttTopicHandler —— 主题路由处理插件

UseMqttTopicHandler 是基于上述插件封装的高层扩展方法,可将主题订阅与消息处理逻辑集中配置,简化代码结构。其核心特性如下:

  • 自动订阅:客户端连接成功后自动发起 Subscribe 请求,无需手动调用 SubscribeAsync
  • 主题路由:收到消息时自动按主题过滤器(含通配符)匹配并分发到对应处理器。
  • 断线重订阅:重连后再次触发 OnMqttConnected,会自动重新订阅所有注册的主题。
  • 支持通配符:过滤主题支持 +(单级通配符)和 #(多级通配符)。

六、创建Mqtt客户端

6.1 基于Tcp协议创建

直接创建MqttTcpClient实例,配置连接参数和插件:

🔄 正在加载代码...
温馨提示

client.ConnectAsync()方法会连接到Mqtt服务器,如果连接失败会抛出异常。

6.2 基于WebSocket协议创建

🔄 正在加载代码...

6.3 配置连接选项

🔄 正在加载代码...

6.4 配置插件

🔄 正在加载代码...

例如,连接相关插件:

🔄 正在加载代码...

七、订阅与取消订阅

7.1 订阅主题

按照Mqtt协议,客户端可以通过Subscribe指令订阅一个或多个主题,服务端会返回订阅结果。

🔄 正在加载代码...

7.2 取消订阅

按照Mqtt协议,客户端可以通过Unsubscribe指令取消订阅一个或多个主题,服务端会返回所有取消订阅结果。

🔄 正在加载代码...

八、接收消息

在Mqtt客户端连接成功之后,Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin插件和IMqttReceivedPlugin插件来接收消息。

正如插件说明所示,IMqttReceivingPlugin插件可以在收到所有消息时触发,IMqttReceivedPlugin插件可以在收到发布消息时触发。

所以如果你只关心接收(已发布)消息的消息,那么可以只订阅IMqttReceivedPlugin插件即可。

但是对于客户端来说,也可能需要获取到的消息可能是订阅确认、发布、发布确认、取消订阅确认等。所以你可以通过IMqttReceivingPlugine.MqttMessage来获取到具体的消息类型。

下列将简单演示。

8.1 通过插件接收所有消息

🔄 正在加载代码...

8.2 接收发布消息

如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可:

🔄 正在加载代码...

九、发布消息

🔄 正在加载代码...

或者使用高性能的内存池方式发布数据。

🔄 正在加载代码...

十、断开连接

10.1 正常断开

🔄 正在加载代码...

10.2 释放资源

🔄 正在加载代码...

十一、连接管理

11.1 检查连接状态

🔄 正在加载代码...

11.2 自动重连配置

🔄 正在加载代码...

十二、UseMqttTopicHandler —— 主题路由处理

UseMqttTopicHandler 是对 IMqttConnectedPluginIMqttReceivedPlugin 的高层封装,支持在配置阶段声明主题过滤器及对应的消息处理器,客户端连接成功后将自动完成订阅,收到消息后自动按规则路由到匹配的处理器。

12.1 基本用法

调用 ConfigurePlugins 时使用 UseMqttTopicHandler 并通过 AddSubscription 注册主题与处理器:

🔄 正在加载代码...

12.2 单级通配符(+)

主题过滤器中的 + 可匹配该层级任意单个节点,例如 home/+/temperature 能同时匹配 home/room1/temperaturehome/room2/temperature 等:

🔄 正在加载代码...

12.3 多级通配符(#)

主题过滤器末尾的 # 可匹配任意层级的后缀,例如 home/# 能匹配 home/temperaturehome/room1/humidityhome/floor1/room2/light 等:

🔄 正在加载代码...

12.4 同时订阅多个主题

通过链式调用 AddSubscription 为不同主题绑定独立的处理器,还可为指定主题单独设置 QoS 等级:

🔄 正在加载代码...
提示

UseMqttTopicHandler 会在每次连接成功(包括断线重连)时自动重新发起订阅,无需额外处理重订阅逻辑。

十三、性能测试

本节通过 BenchmarkDotNet 对 TouchSocket.Mqtt(TSMqtt)客户端MQTTnet 客户端 在相同服务器环境下发布 1000 条消息进行基准对比,测试涵盖不同数据大小(1KB / 4KB)、不同 QoS 等级(AtMostOnce / AtLeastOnce / ExactlyOnce)以及 Mqtt 协议版本(V311 / V500),重点关注耗时与内存分配两个维度。

MethodDataSizeQosVersionMeanErrorStdDevMedianAllocated
MqttnetPublishMessages1024AtMostOnceV31123.727 ms0.4705 ms1.3115 ms23.766 ms180.29 KB
TouchSocketPublishMessages1024AtMostOnceV3114.442 ms0.2472 ms0.7288 ms4.392 ms3.55 KB
MqttnetPublishMessages1024AtMostOnceV50023.869 ms0.4691 ms0.9476 ms23.841 ms180.12 KB
TouchSocketPublishMessages1024AtMostOnceV5004.375 ms0.2522 ms0.7435 ms4.297 ms3.65 KB
MqttnetPublishMessages1024AtLeastOnceV31192.370 ms8.1923 ms24.1552 ms87.462 ms1726.95 KB
TouchSocketPublishMessages1024AtLeastOnceV31175.053 ms5.1410 ms15.1585 ms75.007 ms360 KB
MqttnetPublishMessages1024AtLeastOnceV50091.927 ms3.4399 ms10.1425 ms91.922 ms1727.13 KB
TouchSocketPublishMessages1024AtLeastOnceV50073.622 ms3.1962 ms9.3740 ms73.184 ms359.92 KB
MqttnetPublishMessages1024ExactlyOnceV311163.781 ms7.8247 ms23.0712 ms159.986 ms3250.45 KB
TouchSocketPublishMessages1024ExactlyOnceV311147.421 ms5.3558 ms15.6233 ms149.457 ms641.25 KB
MqttnetPublishMessages1024ExactlyOnceV500165.352 ms5.3792 ms15.6913 ms164.293 ms3250.65 KB
TouchSocketPublishMessages1024ExactlyOnceV500147.806 ms9.0072 ms26.5578 ms146.130 ms641.32 KB
MqttnetPublishMessages4096AtMostOnceV31125.222 ms0.5034 ms1.1566 ms25.123 ms180.38 KB
TouchSocketPublishMessages4096AtMostOnceV3114.124 ms0.2799 ms0.8165 ms4.061 ms19.13 KB
MqttnetPublishMessages4096AtMostOnceV50023.519 ms0.7153 ms2.1090 ms24.089 ms180.26 KB
TouchSocketPublishMessages4096AtMostOnceV5005.030 ms0.5822 ms1.7167 ms4.797 ms17.85 KB
MqttnetPublishMessages4096AtLeastOnceV31197.740 ms3.7732 ms11.0661 ms96.862 ms1727.06 KB
TouchSocketPublishMessages4096AtLeastOnceV31180.583 ms5.0478 ms14.8834 ms80.928 ms359.94 KB
MqttnetPublishMessages4096AtLeastOnceV50093.996 ms4.3629 ms12.8640 ms94.499 ms1727.18 KB
TouchSocketPublishMessages4096AtLeastOnceV50080.611 ms4.1234 ms12.1580 ms80.232 ms359.92 KB
MqttnetPublishMessages4096ExactlyOnceV311167.821 ms7.5768 ms22.1018 ms167.827 ms3250.61 KB
TouchSocketPublishMessages4096ExactlyOnceV311148.221 ms7.9895 ms23.4319 ms144.520 ms641.32 KB
MqttnetPublishMessages4096ExactlyOnceV500162.248 ms7.7536 ms22.7401 ms163.285 ms3250.22 KB
TouchSocketPublishMessages4096ExactlyOnceV500142.952 ms8.5958 ms25.2100 ms137.929 ms641.32 KB

结论

QoS0(AtMostOnce)——性能差距最为显著:

  • 耗时方面:发送 1000 条 1KB 消息,TouchSocket 仅需约 4.4ms,而 MQTTnet 需要约 23.7ms,TouchSocket 快约 5.3 倍;4KB 数据包的差距更大,约 6.1 倍
  • 内存分配方面:TouchSocket 仅分配约 3.55KB,MQTTnet 则高达 180KB,内存占用约为 MQTTnet 的 1/50。这得益于 TouchSocket 的内存池机制和零拷贝设计,在高频 QoS0 场景下几乎无 GC 压力。

QoS1(AtLeastOnce)——仍有明显优势:

  • 受 PUBACK 握手的延迟影响,两者的绝对耗时均有所上升,但 TouchSocket 仍比 MQTTnet 快约 18%~20%(1KB: 75ms vs 92ms)。
  • 内存分配差距依然显著:TouchSocket 约 360KB,MQTTnet 约 1727KB,节省约 79% 的内存

QoS2(ExactlyOnce)——差距收窄但内存优势保持:

  • 由于需要 4 次握手,耗时较长,两者均在 140~170ms 区间,TouchSocket 约 快 10%(147ms vs 164ms)。
  • 内存分配:TouchSocket 约 641KB,MQTTnet 约 3250KB,仍节省约 80% 的内存

协议版本影响:

V311 与 V500 在同等条件下的性能差异可忽略不计,两者均在测量误差范围内,说明 TouchSocket 对两种协议版本均有良好优化。

综合建议

在追求极低延迟与低内存占用的场景(如大规模设备数据上报、高频遥测)中,TouchSocket.Mqtt 客户端具有显著优势,尤其在 QoS0 下性能领先幅度极为突出。即便在 QoS1/2 场景下,其内存分配优势也能有效降低 GC 停顿对服务稳定性的影响。