RPC路由规则

mqant 每一类模块可以部署到多台服务器中,因此需要一个nodeId对同一类模块进行区分。 在框架加入服务注册和发现功能后,nodeId通过服务发现模块在服务启动时自动生成,无法提前编码指定。

RPC调用方法介绍

1. 通过Call函数调度(推荐)

/*
通用RPC调度函数
ctx         context.Context             上下文,可以设置这次请求的超时时间
moduleType    string                         服务名称 serverId 或 serverId@nodeId
_func        string                        需要调度的服务方法
param         mqrpc.ParamOption            方法传参
opts ...selector.SelectOption            服务发现模块过滤,可以用来选择调用哪个服务节点
 */
Call(ctx context.Context, moduleType, _func string, param mqrpc.ParamOption, opts ...selector.SelectOption) (interface{}, string)

特点

  • 支持设置调用超时时间
  • 支持自定义的服务节点选择过滤器

超时时间设置

ctx, _ := context.WithTimeout(context.TODO(), time.Second*3) //3s后超时
rstr,err:=mqrpc.String(
    self.Call(
        ctx,
    "helloworld",    //要访问的moduleType
    "/say/hi",            //访问模块中handler路径
    mqrpc.Param(r.Form.Get("name")),
    ),
)

超时时间仅是调用方有效,超时后无法取消被调用方正在执行的任务。

服务节点选择过滤器

伪代码

ctx, _ := context.WithTimeout(context.TODO(), time.Second*3)
rstr,err:=mqrpc.String(
    self.Call(
        ctx,
    "helloworld",    //要访问的moduleType
    "/say/hi",            //访问模块中handler路径
    mqrpc.Param(r.Form.Get("name")),
    selector.WithStrategy(func(services []*registry.Service) selector.Next {
        var nodes []*registry.Node

        // Filter the nodes for datacenter
        for _, service := range services {
            for _, node := range service.Nodes {
                if node.Metadata["version"] == "1.0.0" {
                    nodes = append(nodes, node)
                }
            }
        }

        var mtx sync.Mutex
        //log.Info("services[0] $v",services[0].Nodes[0])
        return func() (*registry.Node, error) {
            mtx.Lock()
            defer mtx.Unlock()
            if len(nodes) == 0 {
                return nil, fmt.Errorf("no node")
            }
            index := rand.Intn(int(len(nodes)))
            return nodes[index], nil
        }
    }),
    ),
)

moduleType的格式

  • 指定到模块级别

    当moduleType为模块名时 func GetType()值一样,rpc将查找模块已启用的所有节点,然后根据【节点选择过滤器】选择一个节点发起调用

  • 指定到节点级别

    格式为 moduleType@moduleID 例如 helloworld@1b0073cbbab33247,rpc将直接选择节点1b0073cbbab33247发起调用

2. 通过RpcInvoke函数调度

module.Invoke(moduleType string, _func string, params ...interface{})

特点

  • 不支持设置调用超时时间(只能通过配置文件设置全局RPC超时时间)
  • 不支持自定义的服务节点选择过滤器
  • 支持moduleType过滤

3. 通过InvokeNR函数调度

module.InvokeNR(moduleType string, _func string, params ...interface{})

特点

  • 包涵Invoke所有特点
  • 本函数无需等待返回结果(不会阻塞),仅投递RPC消息

4. 指定节点调用

查找到节点(module.ServerSession),通过节点结构体提供的方法调用

moduleType 模块名称(类型)
opts        服务节点选择过滤器
func GetRouteServer(moduleType string, opts ...selector.SelectOption) (s module.ServerSession, err error)
SvrSession, err :=self.GetRouteServer("helloworld",
selector.WithStrategy(func(services []*registry.Service) selector.Next {
    var nodes []*registry.Node

    // Filter the nodes for datacenter
    for _, service := range services {
        for _, node := range service.Nodes {
            if node.Metadata["version"] == "1.0.0" {
                nodes = append(nodes, node)
            }
        }
    }

    var mtx sync.Mutex
    //log.Info("services[0] $v",services[0].Nodes[0])
    return func() (*registry.Node, error) {
        mtx.Lock()
        defer mtx.Unlock()
        if len(nodes) == 0 {
            return nil, fmt.Errorf("no node")
        }
        index := rand.Intn(int(len(nodes)))
        return nodes[index], nil
    }
}), )
if err != nil {
    log.Warning("HelloWorld error:%v", err.Error())
    return 
}
rstr, err :=mqrpc.String(SvrSession.Call(ctx, "/say/hi", r.Form.Get("name")))
if err != nil {
    log.Warning("HelloWorld error:%v", err)
    return 
}

所属

以上的调用方法在module级别和app级别都有对应实现,可灵活选择

Copyright © 梁大帅 2020 all right reserved,powered by Gitbook该文件修订时间: 2020-06-03 09:38:44

results matching ""

    No results matching ""