代码组织结构

首先我们重新组织了一下代码目录结构,新增了一个gate目录用来存放网关代码,robot目录用来存放访问网关的mqtt客户端代码

    工程目录
        |-bin
            |-conf
                |-server.conf
        |-helloworld
            |-module.go
            |-xxx.go
        |-gate
            |-module.go
        |-robot
            |-test
                |-manager.go
                |-work.go
            |-robot_task.go
        |-main.go

编写第一个网关

package mgate

import (
    "github.com/liangdas/mqant/conf"
    "github.com/liangdas/mqant/gate"
    "github.com/liangdas/mqant/gate/base"
    "github.com/liangdas/mqant/module"
)

var Module = func() module.Module {
    gate := new(Gate)
    return gate
}

type Gate struct {
    basegate.Gate //继承
}

func (this *Gate) GetType() string {
    //很关键,需要与配置文件中的Module配置对应
    return "Gate"
}
func (this *Gate) Version() string {
    //可以在监控时了解代码版本
    return "1.0.0"
}

func (this *Gate) OnInit(app module.App, settings *conf.ModuleSettings) {
    //注意这里一定要用 gate.Gate 而不是 module.BaseModule
    this.Gate.OnInit(this, app, settings,
        gate.WsAddr(":3653"),
        gate.TcpAddr(":3563"),
    )
}

网关监听端口

func (this *Gate) OnInit(app module.App, settings *conf.ModuleSettings) {
    //注意这里一定要用 gate.Gate 而不是 module.BaseModule
    this.Gate.OnInit(this, app, settings,
        gate.WsAddr(":3653"),
        gate.TcpAddr(":3563"),
    )
}

运行

2020-05-05T20:12:32.5603+08:00 [-] [-] [development] [I] [rpc_server.go:142] Registering node: Gate@ba5ccc4ce9feb31c
2020-05-05T20:12:32.568484+08:00 [-] [-] [development] [I] [module.go:45] rpctest模块运行中...
2020-05-05T20:12:32.573808+08:00 [-] [-] [development] [I] [ws_server_x.go:131] WS Listen ::3653
2020-05-05T20:12:32.574043+08:00 [-] [-] [development] [I] [tcp_server.go:39] TCP Listen ::3563

编写第一个客户端

核心逻辑在robot/test/work.go

package test_task

import (
    "encoding/json"
    "fmt"
    MQTT "github.com/eclipse/paho.mqtt.golang"
    "github.com/liangdas/armyant/task"
    "github.com/liangdas/armyant/work"
)

func NewWork(manager *Manager) *Work {
    this := new(Work)
    this.manager = manager
    //opts := this.GetDefaultOptions("tcp://127.0.0.1:3563")
    opts := this.GetDefaultOptions("ws://127.0.0.1:3653")
    opts.SetConnectionLostHandler(func(client MQTT.Client, err error) {
        fmt.Println("ConnectionLost", err.Error())
    })
    opts.SetOnConnectHandler(func(client MQTT.Client) {
        fmt.Println("OnConnectHandler")
    })
    err := this.Connect(opts)
    if err != nil {
        fmt.Println(err.Error())
    }

    this.On("/gate/send/test", func(client MQTT.Client, msg MQTT.Message) {
        fmt.Println(msg.Topic(), string(msg.Payload()))
    })
    return this
}

/**
Work 代表一个协程内具体执行任务工作者
*/
type Work struct {
    work.MqttWork
    manager *Manager
}

func (this *Work) UnmarshalResult(payload []byte) map[string]interface{} {
    rmsg := map[string]interface{}{}
    json.Unmarshal(payload, &rmsg)
    return rmsg["Result"].(map[string]interface{})
}

/**
每一次请求都会调用该函数,在该函数内实现具体请求操作

task:=task.Task{
        N:1000,    //一共请求次数,会被平均分配给每一个并发协程
        C:100,        //并发数
        //QPS:10,        //每一个并发平均每秒请求次数(限流) 不填代表不限流
}

N/C 可计算出每一个Work(协程) RunWorker将要调用的次数
*/
func (this *Work) RunWorker(t task.Task) {
    msg, err := this.Request("helloworld/HD_say", []byte(`{"name":"mqant"}`))
    if err != nil {
        return
    }

    fmt.Println(msg.Topic(), string(msg.Payload()))
}
func (this *Work) Init(t task.Task) {

}
func (this *Work) Close(t task.Task) {
    this.GetClient().Disconnect(0)
}

主动发起请求

给后端helloworld发起一个handler(HD_say)调用

msg, err := this.Request("helloworld/HD_say", []byte(`{"name":"mqant"}`))
if err != nil {
    return
}
fmt.Println(msg.Topic(), string(msg.Payload()))

监听服务器主动下发消息

this.On("/gate/send/test", func(client MQTT.Client, msg MQTT.Message) {
        fmt.Println(msg.Topic(), string(msg.Payload()))
})

编写后端handler

handler实现

func (self *HellWorld) gatesay(session gate.Session,msg map[string]interface{}) (r string, err error) {
    session.Send("/gate/send/test",[]byte(fmt.Sprintf("send hi to %v", msg["name"])))
    return fmt.Sprintf("hi %v 你在网关 %v", msg["name"],session.GetServerId()), nil
}

注册handler

self.GetServer().RegisterGO("HD_say", self.gatesay)

主动给客户端发送消息

session.Send("/gate/send/test",[]byte(fmt.Sprintf("send hi to %v", msg["name"])))

运行客户端(robot)

go run robot_task.go

开始压测请等待
Connect...
OnConnectHandler
/gate/send/test send hi to mqant
helloworld/HD_say/1 {"Trace":"5f7f87ee73f79b7a","Error":"","Result":"hi mqant 你在网关 Gate@1a8a1b29c7496c04"}

后续

下一章我们讲解网关的路由协议如何工作的。

Copyright © 梁大帅 2020 all right reserved,powered by Gitbook该文件修订时间: 2020-05-06 14:22:46

results matching ""

    No results matching ""