RPC路由规则
mqant 每一类模块可以部署到多台服务器中,因此需要一个nodeId对同一类模块进行区分。 在框架加入服务注册和发现功能后,nodeId通过服务发现模块在服务启动时自动生成,无法提前编码指定。
RPC调用方法介绍
1. 通过Call函数调度(推荐)
/*
通用RPC调度函数
ctx context.Context 上下文,可以设置这次请求的超时时间
moduleType string 服务名称 serverId 或 serverId@nodeId
_func string 需要调度的服务方法
param mqrpc.ParamOption 方法传参
opts ...selector.SelectOption 服务发现模块过滤,可以用来选择调用哪个服务节点
*/
Call(ctx context.Context, moduleType, _func string, param mqrpc.ParamOption, opts ...selector.SelectOption) (interface{}, string)
特点
- 支持设置调用超时时间
- 支持自定义的服务节点选择过滤器
超时时间设置
ctx, _ := context.WithTimeout(context.TODO(), time.Second*3) //3s后超时
rstr,err:=mqrpc.String(
self.Call(
ctx,
"helloworld", //要访问的moduleType
"/say/hi", //访问模块中handler路径
mqrpc.Param(r.Form.Get("name")),
),
)
超时时间仅是调用方有效,超时后无法取消被调用方正在执行的任务。
服务节点选择过滤器
伪代码
ctx, _ := context.WithTimeout(context.TODO(), time.Second*3)
rstr,err:=mqrpc.String(
self.Call(
ctx,
"helloworld", //要访问的moduleType
"/say/hi", //访问模块中handler路径
mqrpc.Param(r.Form.Get("name")),
selector.WithStrategy(func(services []*registry.Service) selector.Next {
var nodes []*registry.Node
// Filter the nodes for datacenter
for _, service := range services {
for _, node := range service.Nodes {
if node.Metadata["version"] == "1.0.0" {
nodes = append(nodes, node)
}
}
}
var mtx sync.Mutex
//log.Info("services[0] $v",services[0].Nodes[0])
return func() (*registry.Node, error) {
mtx.Lock()
defer mtx.Unlock()
if len(nodes) == 0 {
return nil, fmt.Errorf("no node")
}
index := rand.Intn(int(len(nodes)))
return nodes[index], nil
}
}),
),
)
moduleType的格式
指定到模块级别
当moduleType为模块名时 func GetType()值一样,rpc将查找模块已启用的所有节点,然后根据【节点选择过滤器】选择一个节点发起调用
指定到节点级别
格式为 moduleType@moduleID 例如 helloworld@1b0073cbbab33247,rpc将直接选择节点1b0073cbbab33247发起调用
2. 通过RpcInvoke函数调度
module.Invoke(moduleType string, _func string, params ...interface{})
特点
- 不支持设置调用超时时间(只能通过配置文件设置全局RPC超时时间)
- 不支持自定义的服务节点选择过滤器
- 支持moduleType过滤
3. 通过InvokeNR函数调度
module.InvokeNR(moduleType string, _func string, params ...interface{})
特点
- 包涵Invoke所有特点
- 本函数无需等待返回结果(不会阻塞),仅投递RPC消息
4. 指定节点调用
查找到节点(module.ServerSession),通过节点结构体提供的方法调用
moduleType 模块名称(类型)
opts 服务节点选择过滤器
func GetRouteServer(moduleType string, opts ...selector.SelectOption) (s module.ServerSession, err error)
SvrSession, err :=self.GetRouteServer("helloworld",
selector.WithStrategy(func(services []*registry.Service) selector.Next {
var nodes []*registry.Node
// Filter the nodes for datacenter
for _, service := range services {
for _, node := range service.Nodes {
if node.Metadata["version"] == "1.0.0" {
nodes = append(nodes, node)
}
}
}
var mtx sync.Mutex
//log.Info("services[0] $v",services[0].Nodes[0])
return func() (*registry.Node, error) {
mtx.Lock()
defer mtx.Unlock()
if len(nodes) == 0 {
return nil, fmt.Errorf("no node")
}
index := rand.Intn(int(len(nodes)))
return nodes[index], nil
}
}), )
if err != nil {
log.Warning("HelloWorld error:%v", err.Error())
return
}
rstr, err :=mqrpc.String(SvrSession.Call(ctx, "/say/hi", r.Form.Get("name")))
if err != nil {
log.Warning("HelloWorld error:%v", err)
return
}
所属
以上的调用方法在module级别和app级别都有对应实现,可灵活选择