跳到主要内容
版本:4.2

Mqtt服务器

定义

命名空间:
TouchSocket.Mqtt
安装:
dotnet add package TouchSocket.Mqtt

一、说明

MqttTcpService 是基于 Mqtt 协议的消息服务端,支持客户端接入管理、订阅关系维护、消息路由转发、遗嘱消息处理等功能,兼容 Mqtt 3.1.1 及 5.0+ 协议版本。

二、特点

  • 多协议版本支持(v3.1.1/v5.0)
  • 高性能异步架构设计
  • 主题树形管理机制
  • 精确的 QoS 保障
  • 遗嘱消息转发
  • 插件化扩展体系
  • TLS 加密通信
  • 客户端黑白名单控制

三、应用场景

  • 工业物联网平台
  • 实时数据监控中心
  • 智慧城市中枢系统
  • 私有化消息总线
  • 设备远程管理服务

四、可配置项

继承所有 TcpService 的配置。除此之外还支持 Mqtt 服务端专有配置。

可配置项

五、支持插件

插件方法功能
IMqttConnectingPlugin当Mqtt客户端正在连接之前调用此方法。
IMqttConnectedPlugin当Mqtt客户端连接成功时调用。
IMqttClosingPlugin当Mqtt客户端正在关闭时调用。
IMqttClosedPlugin当Mqtt客户端断开连接后触发。
IMqttReceivingPlugin在收到Mqtt所有消息时触发,可以通过e.MqttMessage获取到Mqtt的所有消息,包括订阅、订阅确认、发布、发布确认等。
IMqttReceivedPlugin当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage获取到Mqtt的发布消息。

六、创建Mqtt服务端

6.1 基于Tcp协议创建

直接创建MqttTcpService,然后配置基本的监听地址和插件:

🔄 正在加载代码...

6.2 基于WebSocket协议创建

TouchSocket也支持基于WebSocket协议的Mqtt服务器。这种方式允许Web浏览器客户端直接连接到Mqtt服务器,特别适合于需要在浏览器环境中使用Mqtt的场景。

要创建基于WebSocket的Mqtt服务器,需要使用HttpService作为基础服务,然后通过插件启用MqttWebSocket功能:

🔄 正在加载代码...
说明
  • 基于WebSocket的Mqtt服务使用HttpService作为基础服务
  • 需要在容器中添加AddMqttWebSocketService()注册服务
  • 通过UseMqttWebSocket("/mqtt")插件启用WebSocket协议支持,其中/mqtt是WebSocket的路径
  • 客户端连接时使用ws://或wss://协议,例如:ws://127.0.0.1:1883/mqtt
  • 支持所有标准的Mqtt插件和功能

七、接收消息

在Mqtt服务端创建好之后,Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin插件和IMqttReceivedPlugin插件来接收消息。

正如插件说明所示,IMqttReceivingPlugin插件可以在收到所有消息时触发,IMqttReceivedPlugin插件可以在收到发布消息时触发。

所以如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可。

但是对于服务器来说,也可能需要获取到的消息可能是订阅、发布、取消订阅等。所以你可以通过IMqttReceivingPlugine.MqttMessage来获取到具体的消息类型。

下列将简单演示。

7.1 通过插件接收所有消息

🔄 正在加载代码...

7.2 接收发布消息

如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin插件即可:

🔄 正在加载代码...

八、发布消息

Mqtt服务器可以主动向连接的客户端发布消息。

实际上,只要能拿到IMqttService实例,就可以使用MqttBroker属性,调用它的ForwardMessageAsync方法来发布消息。

8.1 向所有客户端发布消息

Tcp协议的Mqtt服务器,其本身实现了IMqttService接口,所以可以直接调用MqttBroker.ForwardMessageAsync方法来发布消息:

🔄 正在加载代码...

如果是基于WebSocket协议的Mqtt服务器,则需要通过依赖注入获取到IMqttService实例:

🔄 正在加载代码...

九、客户端管理

9.1 获取所有连接的客户端

🔄 正在加载代码...

9.2 断开指定客户端

🔄 正在加载代码...

十、订阅管理

MqttBroker 是 Mqtt 服务器的核心订阅路由引擎,负责维护客户端与主题之间的订阅关系,并将发布消息路由转发给所有匹配的订阅者。通过 service.MqttBroker 属性即可获取到当前服务器的 MqttBroker 实例。

10.1 查询所有主题

GetAllTopics 方法返回当前 Broker 中已注册的所有主题名称快照,可用于监控、调试或管理界面展示:

🔄 正在加载代码...

10.2 查询主题的订阅者

GetSubscribers 方法支持通配符匹配,返回与指定主题匹配的所有 Subscription 订阅者对象。每个订阅者包含 ClientId(客户端标识)和 QosLevel(订阅的服务质量等级)属性:

🔄 正在加载代码...

10.3 检查订阅关系

ContainsSubscriber 方法可精确判断某个客户端是否已订阅了指定主题:

🔄 正在加载代码...

10.4 手动添加订阅

除了客户端主动发送 Subscribe 指令之外,服务端也可以通过 RegisterActor 方法为指定客户端主动建立订阅关系,适用于需要服务端控制订阅权限或批量初始化订阅的场景:

🔄 正在加载代码...
说明

RegisterActor 等同于在内部创建一个 Subscription 对象并调用 AddSubscriber,两者均可使用。手动添加的订阅关系与客户端主动订阅的行为一致,消息转发时会按 QoS 等级协商进行路由。

10.5 手动移除订阅

UnregisterActor 方法可从指定主题中移除某个客户端的订阅关系。若需要更精确地控制,也可以使用 RemoveSubscriber 方法直接传入 Subscription 对象:

🔄 正在加载代码...

10.6 清空订阅

ClearTopic 可移除指定主题下的所有订阅者,Clear 方法可一次性清空所有主题及全部订阅关系:

🔄 正在加载代码...

十一、服务器管理

11.1 停止服务器

🔄 正在加载代码...

11.2 释放资源

🔄 正在加载代码...

十二、示例Demo

十三、性能测试

本节通过基准测试对比 TouchSocket.Mqtt(TSMqtt)服务器MQTTnet 服务器 在不同 QoS 等级和发布/订阅规模下的吞吐表现,客户端同时使用 MQTTnet 和 TSMqtt 两种实现进行测试,共覆盖 18 个测试场景,全部零丢包完成。

13.1 使用 MQTTnet 作为服务器

下表展示了在 MQTTnet Broker 托管下,两种客户端在各场景中的发送速率与接收速率(单位:条/秒):

[使用mqttnet作为服务器]

场景名称客户端QoS发布x订阅发送量实际收丢包耗时(s)发送/s接收/s状态
1发布x1订阅MQTTnetQoS01x150,00050,000-1.8726,78626,786OK
1发布x1订阅MQTTnetQoS11x120,00020,000-2.019,9369,936OK
1发布x1订阅MQTTnetQoS21x15,0005,000-0.935,3855,385OK
1发布x50订阅MQTTnetQoS01x5010,000500,000-4.732,112105,636OK
1发布x50订阅MQTTnetQoS11x505,000250,000-3.381,47973,996OK
1发布x50订阅MQTTnetQoS21x501,00050,000-1.1190345,177OK
50发布x50订阅MQTTnetQoS050x5050,0002,500,000-26.821,86493,219OK
50发布x50订阅MQTTnetQoS150x5025,0001,250,000-17.131,45972,963OK
50发布x50订阅MQTTnetQoS250x505,000250,000-5.6887943,987OK
1发布x1订阅TSMqttQoS01x150,00050,000-1.8327,32127,321OK
1发布x1订阅TSMqttQoS11x120,00020,000-1.8310,91210,912OK
1发布x1订阅TSMqttQoS21x15,0005,000-0.875,7615,761OK
1发布x50订阅TSMqttQoS01x5010,000500,000-5.201,92396,175OK
1发布x50订阅TSMqttQoS11x505,000250,000-3.581,39769,870OK
1发布x50订阅TSMqttQoS21x501,00050,000-1.1686543,288OK
50发布x50订阅TSMqttQoS050x5050,0002,500,000-27.071,84792,359OK
50发布x50订阅TSMqttQoS150x5025,0001,250,000-15.751,58779,362OK
50发布x50订阅TSMqttQoS250x505,000250,000-4.481,11555,759OK
合计 / 平均18/18--342,0009,750,000--5,75253,216全部OK

MQTTnet vs TSMqtt 对比

场景QoSMQTTnet 接收/sTSMqtt 接收/s接收/s 差距MQTTnet 耗时(s)TSMqtt 耗时(s)耗时差(s)
1发布x1订阅QoS026,78627,321+5351.871.83-0.04
1发布x1订阅QoS19,93610,912+9762.011.83-0.18
1发布x1订阅QoS25,3855,761+3760.930.87-0.06
1发布x50订阅QoS0105,63696,175-9,4614.735.20+0.47
1发布x50订阅QoS173,99669,870-4,1263.383.58+0.20
1发布x50订阅QoS245,17743,288-1,8891.111.16+0.05
50发布x50订阅QoS093,21992,359-86026.8227.07+0.25
50发布x50订阅QoS172,96379,362+6,39917.1315.75-1.38
50发布x50订阅QoS243,98755,759+11,7725.684.48-1.20

在 MQTTnet 服务器下的结论: 两种客户端的整体表现基本持平,TSMqtt 客户端在 QoS0 单发单订场景下与 MQTTnet 相近,在高 QoS 大规模(50x50)场景中 TSMqtt 接收速率略有优势(QoS1 +6,399/s,QoS2 +11,772/s)。可见客户端实现对服务端吞吐的影响在同一 Broker 下并不显著,性能瓶颈主要在 Broker 侧。

13.2 使用 TouchSocket.Mqtt 作为服务器

下表展示了在 TouchSocket.Mqtt Broker 托管下,两种客户端在各场景中的表现:

[使用TouchSocket.Mqtt作为服务器]

场景名称客户端QoS发布x订阅发送量实际收丢包耗时(s)发送/s接收/s状态
1发布x1订阅MQTTnetQoS01x150,00050,000-1.5831,66731,667OK
1发布x1订阅MQTTnetQoS11x120,00020,000-1.3414,97714,977OK
1发布x1订阅MQTTnetQoS21x15,0005,000-0.598,4038,403OK
1发布x50订阅MQTTnetQoS01x5010,000500,000-1.506,656332,840OK
1发布x50订阅MQTTnetQoS11x505,000250,000-2.142,334116,736OK
1发布x50订阅MQTTnetQoS21x501,00050,000-0.851,17758,850OK
50发布x50订阅MQTTnetQoS050x5050,0002,500,000-2.4320,5481,027,408OK
50发布x50订阅MQTTnetQoS150x5025,0001,250,000-11.182,235111,761OK
50发布x50订阅MQTTnetQoS250x505,000250,000-4.791,04252,138OK
1发布x1订阅TSMqttQoS01x150,00050,000-0.18271,947271,947OK
1发布x1订阅TSMqttQoS11x120,00020,000-1.1717,13017,130OK
1发布x1订阅TSMqttQoS21x15,0005,000-0.578,8458,845OK
1发布x50订阅TSMqttQoS01x5010,000500,000-0.2050,6352,531,772OK
1发布x50订阅TSMqttQoS11x505,000250,000-2.172,308115,438OK
1发布x50订阅TSMqttQoS21x501,00050,000-0.881,13356,684OK
50发布x50订阅TSMqttQoS050x5050,0002,500,000-0.8956,4282,821,412OK
50发布x50订阅TSMqttQoS150x5025,0001,250,000-12.332,026101,346OK
50发布x50订阅TSMqttQoS250x505,000250,000-4.801,04152,097OK
合计 / 平均18/18--342,0009,750,000--27,807429,525全部OK

MQTTnet vs TSMqtt 对比

场景QoSMQTTnet 接收/sTSMqtt 接收/s接收/s 差距MQTTnet 耗时(s)TSMqtt 耗时(s)耗时差(s)
1发布x1订阅QoS031,667271,947+240,2801.580.18-1.40
1发布x1订阅QoS114,97717,130+2,1531.341.17-0.17
1发布x1订阅QoS28,4038,845+4420.590.57-0.03
1发布x50订阅QoS0332,8402,531,772+2,198,9321.500.20-1.30
1发布x50订阅QoS1116,736115,438-1,2982.142.17+0.02
1发布x50订阅QoS258,85056,684-2,1660.850.88+0.03
50发布x50订阅QoS01,027,4082,821,412+1,794,0042.430.89-1.55
50发布x50订阅QoS1111,761101,346-10,41511.1812.33+1.15
50发布x50订阅QoS252,13852,097-414.794.80+0.00

13.3 综合结论

TouchSocket.Mqtt 服务端在 QoS0 场景下具有极为显著的性能优势:

  • QoS0 单发单订(1x1):TSMqtt 客户端连接 TouchSocket 服务器时接收速率高达 271,947/s,是使用 MQTTnet 服务器时(27,321/s)的约 10 倍,耗时从 1.83s 降至 0.18s。这得益于 TouchSocket 在零确认路径上的内存零拷贝与无锁化分发设计。

  • QoS0 一发多订(1x50):TSMqtt + TouchSocket 组合的接收速率达到 253 万/s,而同场景下 MQTTnet 服务器仅能达到 10.5 万/s,提升约 24 倍

  • QoS0 多发多订(50x50):TSMqtt + TouchSocket 组合接收速率高达 282 万/s,是 MQTTnet 服务器同场景的约 30 倍

  • QoS1/QoS2 场景:受协议本身的握手确认机制约束,两者服务器在此类场景下表现接近,不存在量级差异。这说明 QoS1/2 的吞吐上限更多由协议往返延迟决定,而非 Broker 实现效率。

  • 综合平均吞吐:TouchSocket 服务器的平均接收速率为 429,525/s,是 MQTTnet 服务器(53,216/s)的约 8 倍;平均发送速率也从 5,752/s 提升至 27,807/s(约 4.8 倍)。

性能建议

在对 QoS0 吞吐有极高要求的场景(如大规模设备遥测数据上报、实时数据流分发等),推荐优先选用 TouchSocket.Mqtt 作为服务端并配合 TSMqtt 客户端,可获得最优性能表现。QoS1/2 场景下两种 Broker 表现相当,可根据团队技术栈灵活选择。