跳到主要内容
版本:4.2

MqttRpc 使用指南

定义

命名空间:
TouchSocket.Mqtt
安装:
dotnet add package TouchSocket.Mqtt

一、概述

MqttRpc 是基于 Mqtt 协议实现的 RPC 组件,允许 Mqtt 客户端之间进行远程过程调用。它以 Mqtt Broker 为消息中转中心,使用 JSON 对请求和响应进行编解码,具备良好的跨语言兼容性。

与传统的 Tcp/Http RPC 不同,MqttRpc 天然契合物联网场景,客户端既可以作为 RPC 服务端(处理请求),也可以作为 RPC 调用端(发起请求),甚至可以同时承担两种角色

1.1 架构原理

调用流程说明:

  1. 调用端mqttrpc/req 主题发布 JSON 格式的请求报文,报文中携带响应主题地址 mqttrpc/res/{callerId}
  2. Broker 将请求路由到订阅了 mqttrpc/req 的所有服务端客户端
  3. 服务端收到请求后,通过注册的 RpcStore 匹配并执行对应的 RPC 方法
  4. 服务端将方法返回值序列化为 JSON,发布到请求报文中指定的响应主题
  5. 调用端接收响应并将结果返回给上层调用代码

1.2 核心特点

  • 基于 Mqtt 协议,天然支持 QoS 0/1/2 消息可靠性保障
  • 使用 JSON 编码,跨语言、跨平台兼容
  • 支持同步等待响应(WaitInvoke)和仅发送(OnlySend)两种调用模式
  • 支持调用超时与 CancellationToken 取消
  • 支持 RPC 调用上下文(IMqttRpcCallContext),可获取调用方信息
  • 支持 DispatchProxy 强类型代理调用
  • 支持源码生成器自动生成代理扩展方法
  • 服务端与调用端可以是同一个 Mqtt 客户端,双向通信无缝衔接
  • 可与任何标准 Mqtt Broker(EMQX、Mosquitto 等)配合使用

二、定义 RPC 服务

服务端创建一个类,继承 SingletonRpcServer(或实现 ISingletonRpcServer),然后在该类中定义公共方法,并使用 [MqttRpc] 特性标记。

🔄 正在加载代码...

2.1 调用键说明

备注

调用键(InvokeKey)默认格式为:命名空间.类名.方法名(全部小写)。

例如,MqttRpcConsoleApp.MyRpcServer.Add 方法的调用键为:mqttrpcconsoleapp.myrpcserver.add

若在定义代理接口时继承自 ISingletonRpcServer,调用键基于接口的命名空间和接口名生成,而非实现类名。

2.2 调用上下文

[MqttRpc] 标记的方法支持将 IMqttRpcCallContext 作为第一个参数,该参数由框架自动注入,无需客户端传递。通过它可以获取调用方的 Mqtt 会话信息(如客户端 ID、连接状态等)。

🔄 正在加载代码...
提示

更多 RPC 注册方式请参考:注册 Rpc 服务

三、创建 Mqtt Broker

MqttRpc 需要一个 Mqtt Broker 作为消息中转中心。MqttTcpService 本身即具备完整的 Broker(消息路由转发)能力。

🔄 正在加载代码...
提示

在生产环境中,可使用任何标准 Mqtt Broker(如 EMQX、Mosquitto 等)替代,无需修改客户端代码。只需确保服务端和调用端连接到同一个 Broker 即可。

四、创建 RPC 服务端客户端

RPC 服务端是一个普通的 Mqtt 客户端,通过 UseMqttRpc 插件订阅请求主题并处理 RPC 调用。该模式下,客户端既能处理 RPC 请求,也能发起 RPC 调用

🔄 正在加载代码...
核心要点
  • ConfigureContainer 中通过 AddRpcStore 注册 RPC 服务类(支持接口映射和直接注册两种方式)
  • ConfigurePlugins 中通过 UseMqttRpc 启用服务端+客户端双向 RPC 功能
  • UseMqttRpc 会自动从容器中解析 IRpcServerProvider,无需手动传入
  • 连接后插件会自动订阅 RequestTopic(处理传入请求)和 ResponseTopicPrefix/+(接收响应)

五、创建 RPC 调用端客户端

调用端同样是一个 Mqtt 客户端,通过 UseMqttRpcClient 插件以纯客户端模式运行,只发起调用、不注册任何 RPC 服务。

🔄 正在加载代码...
说明

UseMqttRpc(服务端+客户端)与 UseMqttRpcClient(纯客户端)的区别:

插件订阅请求主题订阅响应主题适用场景
UseMqttRpc既要处理请求,也要发起调用
UseMqttRpcClient只发起调用,不处理 RPC 请求

六、配置选项(MqttRpcOption)

属性类型默认值说明
RequestTopicstringmqttrpc/reqRPC 请求主题,服务端订阅此主题接收请求
ResponseTopicPrefixstringmqttrpc/res响应主题前缀,实际响应主题为 {前缀}/{唯一标识}
QosLevelQosLevelAtLeastOnce请求/响应消息的 QoS 等级(AtMostOnce/AtLeastOnce/ExactlyOnce)
SerializerOptionsJsonSerializerOptions默认JSON 序列化选项,可自定义序列化行为
注意

服务端与调用端配置的 RequestTopicResponseTopicPrefix 必须保持一致,否则无法完成 RPC 调用。建议将这两个值集中定义为常量以避免配置不一致。

七、调用 RPC 方法

连接成功后,通过 GetMqttRpcClient() 扩展方法获取 IMqttRpcClient 接口,即可发起调用。

7.1 直接调用(InvokeAsync)

使用 InvokeAsync 方法进行原生调用,需要手动指定调用键返回值类型

🔄 正在加载代码...

InvokeOption 提供以下调用模式:

选项说明
InvokeOption.WaitInvoke等待服务端执行完毕并返回结果(默认超时 5000ms)
InvokeOption.OnlySend仅发送请求,不等待响应(fire-and-forget)

也可以自定义调用选项:

🔄 正在加载代码...

7.2 代理调用(源码生成器)

通过定义接口并使用 [GeneratorRpcProxy] 特性,源码生成器会自动生成强类型代理扩展方法,消除字符串调用键和类型转换的繁琐操作。

第一步: 定义代理接口,方法签名与服务端保持一致(带 Async 后缀的异步版本),并标记 [GeneratorRpcProxy][MqttRpc]

🔄 正在加载代码...

第二步: 直接使用生成的扩展方法调用,类型安全,无需字符串调用键:

🔄 正在加载代码...
提示

MqttRpc 支持 RPC 平台的所有功能,如过滤器调度器以及所有代理调用方式。更多代理调用方式请参考:服务端代理生成

八、序列化配置

MqttRpc 默认使用 System.Text.Json 进行序列化。可以通过 MqttRpcOption.SerializerOptions 自定义序列化行为:

🔄 正在加载代码...
注意

服务端与调用端的 SerializerOptions 必须保持一致(例如属性命名策略、枚举处理方式等),否则反序列化可能失败。

九、本文示例 Demo