Mqtt客户端使用
定义
命名空间:TouchSocket.Mqtt
程序集:TouchSocket.Mqtt.dll
一、说明
MqttClient
是遵循Mqtt协议的消息客户端,支持连接Mqtt Broker、订阅主题、发布消息等功能,支持QoS 0/1/2三种消息质量等级。
二、特点
- 轻量级协议支持
- QoS消息质量保障
- 遗嘱消息支持
- 保留消息处理
- TLS加密通信
- 主题通配符匹配(+/#)
三、应用场景
- IoT设备数据上报
- 跨平台消息推送
- 低带宽环境通信
- 设备状态同步
四、可配置项
继承所有TcpClient的配置。除此之外还支持Mqtt的一些专有配置。
可配置项
SetMqttConnectOptions
MqttConnectOptions
类用于配置 Mqtt 客户端连接到 Mqtt 服务端时的参数,支持 Mqtt 3.1.1 及 5.0+ 协议版本。以下是各配置项的详细说明:
基础连接选项(通用)
属性名 | Mqtt 版本支持 | 说明 |
---|---|---|
CleanSession | 3.1.1+ | 是否清理会话(断开后删除未完成的遗嘱和离线消息)。 |
ClientId | 3.1.1+ | 客户端唯一标识符(服务端用于识别客户端);Mqtt 3.1.1 中必填(长度 1-23 字节,ASCII 字符);5.0+ 允许空字符串(仅当服务端允许时)。 |
KeepAlive | 3.1.1+ | 心跳保活时长(单位:秒),表示客户端与服务端保持连接的最长时间;需与 服务端配置兼容(服务端可能拒绝过大或过小的值),默认值通常为 60 秒。 |
Password | 3.1.1+ | 连接认证密码(与 UserName 配合使用);若 UserName 为空则无效;5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。 |
ProtocolName | 3.1.1+ | Mqtt 协议名称(固定为 "MQTT"),默认值为 "MQTT",通常无需手动设置。 |
UserName | 3.1.1+ | 连接认证用户名(与 Password 配合使用);5.0+ 支持二进制数据(需通过 AuthenticationData 传递)。 |
Version | 3.1.1+/5.0+ | Mqtt 协议版本(如 MqttProtocolVersion.V311 或 V500 );需根据服务端支持的版本设置,否则可能导致连接失败。 |
遗嘱(Will)消息配置(可选)
当 WillFlag
为 true
时,服务端会在客户端异常断开时发布遗嘱消息。仅部分属性在 WillFlag=true
时生效。
属性名 | Mqtt 版本支持 | 说明 |
---|---|---|
WillFlag | 3.1.1+ | 是否启用遗嘱消息(若为 true ,需配置遗嘱相关属性)。 |
WillQos | 3.1.1+ | 遗嘱消息的服务质量等级(0/1/2);需与服务端支持的 QoS 等级兼容。 |
WillRetain | 3.1.1+ | 是否保留遗嘱消息(服务端保留最新遗嘱消息供新订阅者接收);默认值为 false ;若为 true ,服务端需支持保留消息功能。 |
WillMessage | 3.1.1+ | 遗嘱消息的内容(文本类型,Mqtt 5.0+ 支持二进制数据,需通过 WillContentType 和 WillPayloadFormatIndicator 配合)。 |
WillTopic | 3.1.1+ | 遗嘱消息发布的主题(需符合 Mqtt 主题命名规则);主题需提前订阅才能接收遗嘱消息。 |
WillContentType | 5.0+ | 遗嘱消息内容的 MIME 类型(如 "text/plain"、"application/json");描述 WillMessage 的格式(仅 5.0+ 有效)。 |
WillPayloadFormatIndicator | 5.0+ | 遗嘱消息负载格式指示符(如 Raw 、Utf8 );更细粒度描述负载格式(仅 5.0+ 有效)。 |
WillResponseTopic | 5.0+ | 遗嘱消息触发后,服务端向客户端发送响应的主题;需客户端提前订阅该主题以接收响应。 |
WillCorrelationData | 5.0+ | 遗嘱消息关联的上下文数据(用于匹配请求与响应);需与服务端约定格式(仅 5.0+ 有效)。 |
WillDelayInterval | 5.0+ | 遗嘱消息延迟发布的时间间隔(单位:秒);遗嘱触发后,延迟指定时间再发布(仅 5.0+ 有效)。 |
WillMessageExpiryInterval | 5.0+ | 遗嘱消息的有效期(单位:秒,过期后服务端删除);超过此时间未发布的遗嘱消息将被丢弃(仅 5.0+ 有效)。 |
Mqtt 5.0+ 扩展配置(可选)
仅当使用 Mqtt 5.0+ 协议时生效,用于更细粒度的连接控制。
属性名 | Mqtt 版本支持 | 说明 |
---|---|---|
UserProperties | 5.0+ | 用户自定义属性(键值对,用于传递业务元数据);键值对需符合 Mqtt 5.0 规范(键为 UTF-8 字符串,值支持多种类型)。 |
AuthenticationMethod | 5.0+ | 认证方法名称(如 "OAuth2"、"Custom");需与服务端支持的认证方法一致(仅 5.0+ 支持扩展认证)。 |
AuthenticationData | 5.0+ | 认证方法的附加数据(如令牌、签名等);配合 AuthenticationMethod 使用(仅 5.0+ 有效)。 |
MaximumPacketSize | 5.0+ | 客户端请求的最大数据包大小(单位:字节);服务端会返回其支持的最大值(客户端需遵守)。 |
ReceiveMaximum | 5.0+ | 客户端能接收的最大 QoS 1/2 消息数量(防止消息堆积);默认值通常为 65535(需与服务端配置兼容)。 |
RequestProblemInformation | 5.0+ | 是否请求服务端在出错时返回详细问题信息(如原因码、原因字符串);默认值为 true (建议开启以便排查问题)。 |
RequestResponseInformation | 5.0+ | 是否请求服务端返回连接响应的额外信息(如会话过期时间);默认值为 false (仅当需要时启用)。 |
SessionExpiryInterval | 5.0+ | 会话过期时间间隔(单位:秒,0 表示立即过期);客户端断开后,服务端保留会话的时间(仅当 CleanSession=false 时有效)。 |
TopicAliasMaximum | 5.0+ | 客户端允许的最大主题别名值(服务站不能使用超过此值的别名);默认值为 0(表示不使用主题别名)。 |
注意事项
- 协议版本兼容性:部分属性(如
UserProperties
、AuthenticationMethod
)仅在 Mqtt 5.0+ 中有效,需根据服务端版本选择配置。 - 遗嘱消息生效条件:
WillFlag
必须为true
,且WillTopic
和WillMessage
需有效配置,否则遗嘱不会被发送。 - 认证扩展:5.0+ 的
AuthenticationData
需配合自定义认证插件使用,具体格式由服务端定义。 - 只读属性:
WillMessage
和WillTopic
标记为internal set
,通常通过构造函数或专用方法设置(如MqttApplicationMessage
)。
五、支持插件
插件方法 | 功能 |
---|---|
IMqttConnectingPlugin | 当Mqtt客户端正在连接之前调用此方法。 |
IMqttConnectedPlugin | 当Mqtt客户端连接成功时调用。 |
IMqttClosingPlugin | 当Mqtt客户端正在关闭时调用。 |
IMqttClosedPlugin | 当Mqtt客户端断开连接后触发。 |
IMqttReceivingPlugin | 在收到Mqtt所有消息时触发,可以通过e.MqttMessage 获取到Mqtt 的所有消息,包括订阅、订阅确认、发布、发布确认等。 |
IMqttReceivedPlugin | 当接收到Mqtt发布消息,且成功接收时触发。可以通过e.MqttMessage 获取到Mqtt 的发布消息。 |
六、创建Mqtt客户端
var client = new MqttTcpClient();
await client.SetupAsync(new TouchSocketConfig()
.SetRemoteIPHost("tcp://127.0.0.1:7789")
.SetMqttConnectOptions(options =>
{
options.ClientId = "TestClient";
options.UserName = "TestUser";
options.Password = "TestPassword";
options.ProtocolName = "MQTT";
options.Version = MqttProtocolVersion.V311;
options.KeepAlive = 60;
options.CleanSession = true;
// v5可以继续配置其他
// options.UserProperties = new[]
// {
// new MqttUserProperty("key1","value1"),
// new MqttUserProperty("key2","value2")
// };
})
.ConfigurePlugins(a =>
{
a.AddMqttConnectingPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Connecting:{e.ConnectMessage.ClientId}");
await e.InvokeNext();
});
a.AddMqttConnectedPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Connected:{e.ConnectMessage.ClientId}");
await e.InvokeNext();
});
a.AddMqttReceivingPlugin(async (mqttSession, e) =>
{
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
});
a.AddMqttReceivedPlugin(async (mqttSession, e) =>
{
var message = e.Message;
await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
});
a.AddMqttClosingPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Closing:{e.MqttMessage.MessageType}");
await e.InvokeNext();
});
a.AddMqttClosedPlugin(async (mqttSession, e) =>
{
Console.WriteLine($"Client Closed:{e.Message}");
await e.InvokeNext();
});
}));//载入配置
await client.ConnectAsync();//连接
七、订阅与取消订阅
7.1 订阅
按照Mqtt协议,客户端可以通过Subscribe
指令订阅一个或多个主题,服务端会返回订阅结果。
var topic1 = "topic1";
var topic2 = "topic2";
SubscribeRequest subscribeRequest1=new SubscribeRequest(topic1, QosLevel.AtLeastOnce);//订阅请求
SubscribeRequest subscribeRequest2=new SubscribeRequest(topic2, QosLevel.AtMostOnce);//可以设置不同的Qos级别
//多个订阅请求
var mqttSubscribeMessage = new MqttSubscribeMessage(subscribeRequest1, subscribeRequest2);
//执行订阅
var mqttSubAckMessage = await client.SubscribeAsync(mqttSubscribeMessage);
//输出订阅结果
foreach (var item in mqttSubAckMessage.ReturnCodes)
{
Console.WriteLine($"ReturnCode:{item}");
}
7.2 取消订阅
按照Mqtt协议,客户端可以通过Unsubscribe
指令取消订阅一个或多个主题,服务端会返回所有取消订阅结果。
var topic1 = "topic1";
var topic2 = "topic2";
//取消订阅
var mqttUnsubAckMessage = await client.UnsubscribeAsync(new MqttUnsubscribeMessage(topic1,topic2));
八、接收消息
Mqtt组件的消息都是通过插件抛出的。你可以订阅IMqttReceivingPlugin
插件和IMqttReceivedPlugin
插件来接收消息。
正如插件说明所示,IMqttReceivingPlugin
插件可以在收到所有消息时触发,IMqttReceivedPlugin
插件可以在收到发布消息时触发。
所以如果你只关心发布成功的消息,那么可以只订阅IMqttReceivedPlugin
插件即可。
你可以直接使用委托实现订阅:
.ConfigurePlugins(a =>
{
a.AddMqttReceivedPlugin(async (client, e) =>
{
var mqttMessage = e.MqttMessage;
Console.WriteLine("Reved:" + mqttMessage);
//订阅消息的主题
var topicName = mqttMessage.TopicName;
//订阅消息的Qos级别
var qosLevel = mqttMessage.QosLevel;
//订阅消息的Payload
var payload = mqttMessage.Payload;
await e.InvokeNext();
});
})
或者使用插件类来继承接口实现:
class MyMqttReceivedPlugin : PluginBase, IMqttReceivedPlugin
{
public async Task OnMqttReceived(IMqttSession client, MqttReceivedEventArgs e)
{
var mqttMessage = e.MqttMessage;
//订阅消息的主题
var topicName = mqttMessage.TopicName;
//订阅消息的Qos级别
var qosLevel = mqttMessage.QosLevel;
//订阅消息的Payload
var payload = mqttMessage.Payload;
await e.InvokeNext();
}
}
然后把这个类添加到插件即可:
.ConfigurePlugins(a =>
{
a.Add<MyMqttReceivedPlugin>();//添加自定义插件
})
九、发布消息
MqttPublishMessage message = new(topic1, false, QosLevel.AtLeastOnce, Encoding.UTF8.GetBytes("Hello World"));
await client.PublishAsync(message);