From 7caa3084c61fda8f4f5098af1e71099fd47d700a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AF=BB=E6=AC=A2?= Date: Mon, 19 Feb 2024 14:57:36 +0800 Subject: [PATCH] =?UTF-8?q?:new:=20=E6=96=B0=E5=A2=9EMQ=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=80=9A=E9=81=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/callback/hook.go | 3 + config/config.go | 1 + config/mq.go | 31 ++++++++ go.mod | 1 + go.sum | 2 + internal/initialize/datatable.go | 1 + internal/initialize/init.go | 2 + internal/message/handler.go | 56 +++++++++++++++ main.go | 3 +- model/entity/message.go | 26 +++++++ model/robot/chatroom.go | 2 +- model/robot/friend.go | 2 +- model/robot/message.go | 2 +- model/robot/response.go | 2 +- model/robot/userinfo.go | 2 +- mq/rabbitmq.go | 118 +++++++++++++++++++++++++++++++ router/callback/route.go | 2 +- service/message/save.go | 26 +++++++ 18 files changed, 274 insertions(+), 8 deletions(-) create mode 100644 config/mq.go create mode 100644 internal/message/handler.go create mode 100644 model/entity/message.go create mode 100644 mq/rabbitmq.go create mode 100644 service/message/save.go diff --git a/api/callback/hook.go b/api/callback/hook.go index 3d4f689..bcc788b 100644 --- a/api/callback/hook.go +++ b/api/callback/hook.go @@ -1,6 +1,7 @@ package callback import ( + "gitee.ltd/lxh/logger/log" "github.com/gin-gonic/gin" "wechat-robot/model/param/callback" "wechat-robot/pkg/response" @@ -15,4 +16,6 @@ func RobotHookNotify(ctx *gin.Context) { response.New(ctx).SetError(err).Fail() return } + + log.Debugf("机器人已启动,Id: %s", param.Robot) } diff --git a/config/config.go b/config/config.go index 1560277..534a2c5 100644 --- a/config/config.go +++ b/config/config.go @@ -8,4 +8,5 @@ var Conf conf type conf struct { Database db `json:"database" yaml:"database"` // 数据库 配置 Redis redis `json:"redis" yaml:"redis"` // Redis 配置 + Mq mq `json:"mq" yaml:"mq"` // MQ 配置 } diff --git a/config/mq.go b/config/mq.go new file mode 100644 index 0000000..caa7388 --- /dev/null +++ b/config/mq.go @@ -0,0 +1,31 @@ +package config + +import "fmt" + +// mq +// @description: MQ配置 +type mq struct { + RabbitMQ rabbitMq `json:"rabbitmq" yaml:"rabbitmq"` // RabbitMQ配置 +} + +// rabbitMq +// @description: RabbitMQ配置 +type rabbitMq struct { + Host string `json:"host" yaml:"host"` // 主机地址 + Port int `json:"port" yaml:"port"` // 端口 + User string `json:"user" yaml:"user"` // 用户名 + Password string `json:"password" yaml:"password"` // 密码 + VHost string `json:"vhost" yaml:"vhost"` // 虚拟主机 +} + +// GetURL +// @description: 获取MQ连接地址 +// @receiver r +// @return string +func (r rabbitMq) GetURL() string { + port := r.Port + if port == 0 { + port = 5672 + } + return fmt.Sprintf("amqp://%s:%s@%s:%d/%s", r.User, r.Password, r.Host, port, r.VHost) +} diff --git a/go.mod b/go.mod index 0ea4e8c..2487086 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/go-resty/resty/v2 v2.11.0 github.com/google/uuid v1.6.0 github.com/mojocn/base64Captcha v1.3.6 + github.com/rabbitmq/amqp091-go v1.2.0 github.com/spf13/viper v1.18.2 golang.org/x/crypto v0.18.0 gorm.io/driver/mysql v1.5.2 diff --git a/go.sum b/go.sum index 4295b0a..6eb5f5a 100644 --- a/go.sum +++ b/go.sum @@ -770,6 +770,8 @@ github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= github.com/prometheus/prometheus v1.8.2-0.20201028100903-3245b3267b24 h1:V/4Cj2GytqdqK7OMEz6c4LNjey3SNyfw3pg5jPKtJvQ= github.com/prometheus/prometheus v1.8.2-0.20201028100903-3245b3267b24/go.mod h1:MDRkz271loM/PrYN+wUNEaTMDGSP760MQzB0yEjdgSQ= +github.com/rabbitmq/amqp091-go v1.2.0 h1:1pHBxAsQh54R9eX/xo679fUEAfv3loMqi0pvRFOj2nk= +github.com/rabbitmq/amqp091-go v1.2.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0VTJ0kHRghqbM= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= diff --git a/internal/initialize/datatable.go b/internal/initialize/datatable.go index f02ff6f..a6bb49f 100644 --- a/internal/initialize/datatable.go +++ b/internal/initialize/datatable.go @@ -18,6 +18,7 @@ func databaseTable() { new(entity.SystemConfig), // 系统配置表 new(entity.Robot), // 机器人表 new(entity.AiAssistant), // AI助手表 + new(entity.Message), // 微信消息表 } // 同步表结构 diff --git a/internal/initialize/init.go b/internal/initialize/init.go index 738d4f5..106b5eb 100644 --- a/internal/initialize/init.go +++ b/internal/initialize/init.go @@ -2,6 +2,7 @@ package initialize import ( "wechat-robot/internal/tasks" + "wechat-robot/mq" "wechat-robot/pkg/auth" ) @@ -13,4 +14,5 @@ func InitSystem() { initDefaultAdminUser() // 初始化默认管理员用户 auth.InitOAuth2Server() // 初始化OAuth2服务 tasks.StartScheduled() // 启动定时任务 + mq.Init() // 初始化MQ } diff --git a/internal/message/handler.go b/internal/message/handler.go new file mode 100644 index 0000000..816c049 --- /dev/null +++ b/internal/message/handler.go @@ -0,0 +1,56 @@ +package message + +import ( + "encoding/json" + "log" + "strings" + "time" + "wechat-robot/model/entity" + "wechat-robot/model/robot" + "wechat-robot/pkg/types" + "wechat-robot/service/message" +) + +// Message +// @description: 处理消息 +// @param msg +// @return err +func Message(msg []byte) (err error) { + var m robot.Message + if err = json.Unmarshal(msg, &m); err != nil { + log.Printf("消息解析失败: %v", err) + log.Printf("消息内容: %d -> %v", len(msg), string(msg)) + return + } + // 记录原始数据 + m.Raw = string(msg) + // 提取出群成员信息 + // Sys类型的消息正文不包含微信 Id,所以不需要处理 + if m.IsGroup() && m.Type != types.MsgTypeSys { + // 群消息,处理一下消息和发信人 + groupUser := strings.Split(m.Content, "\n")[0] + groupUser = strings.ReplaceAll(groupUser, ":", "") + // 如果两个id一致,说明是系统发的 + if m.FromUser != groupUser { + m.GroupUser = groupUser + } + // 用户的操作单独提出来处理一下 + m.Content = strings.Join(strings.Split(m.Content, "\n")[1:], "\n") + } + log.Printf("收到微信消息\n机器人Id: %s\n消息来源: %s\n群成员: %s\n消息类型: %v\n消息内容: %s", m.ToUser, m.FromUser, m.GroupUser, m.Type, m.Content) + + // 消息入库 + var ent entity.Message + ent.MsgId = m.MsgId + ent.Timestamp = m.CreateTime + ent.MessageTime = time.Unix(int64(m.CreateTime), 0) + ent.Content = m.Content + ent.FromUser = m.FromUser + ent.GroupUser = m.GroupUser + ent.ToUser = m.ToUser + ent.Type = m.Type + ent.DisplayFullContent = m.DisplayFullContent + ent.Raw = m.Raw + err = message.Save(ent) + return +} diff --git a/main.go b/main.go index e6be185..1f099e9 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "wechat-robot/router/admin" "wechat-robot/router/callback" "wechat-robot/router/middleware" - "wechat-robot/tcpserver" ) // init @@ -25,7 +24,7 @@ func init() { // @description: 启动入口 func main() { // 启动TCP服务 - go tcpserver.Start() + //go tcpserver.Start() // 注册参数绑定错误信息翻译器 validator.Init() diff --git a/model/entity/message.go b/model/entity/message.go new file mode 100644 index 0000000..efdf758 --- /dev/null +++ b/model/entity/message.go @@ -0,0 +1,26 @@ +package entity + +import ( + "time" + "wechat-robot/pkg/types" +) + +// Message +// @description: 消息数据库结构体 +type Message struct { + types.BaseDbModelWithReal + MsgId int64 `gorm:"index:idx_msg_id,unique;type:bigint;not null;comment:'微信Id'"` // 消息Id + Timestamp int `gorm:"type:bigint;not null;comment:'消息时间戳'"` // 发送时间戳 + MessageTime time.Time `gorm:"type:datetime;not null;comment:'消息时间'"` // 发送时间 + Type types.MessageType `gorm:"type:int;not null;comment:'消息类型'"` // 消息类型 + Content string `gorm:"type:longtext;not null;comment:'消息内容'"` // 内容 + DisplayFullContent string `gorm:"type:longtext;not null;comment:'显示的完整内容'"` // 显示的完整内容 + FromUser string `gorm:"type:varchar(100);not null;comment:'发送者'"` // 发送者 + GroupUser string `gorm:"type:varchar(100);comment:'群成员'"` // 群成员 + ToUser string `gorm:"index:idx_msg_id,unique;type:varchar(100);not null;comment:'接收者'"` // 接收者 + Raw string `gorm:"type:longtext;not null;comment:'原始通知字符串'"` // 原始通知字符串 +} + +func (Message) TableName() string { + return "t_message" +} diff --git a/model/robot/chatroom.go b/model/robot/chatroom.go index 91ba3bc..7d89038 100644 --- a/model/robot/chatroom.go +++ b/model/robot/chatroom.go @@ -1,4 +1,4 @@ -package model +package robot // ChatRoomDetailInfo // @description: 群聊详情 diff --git a/model/robot/friend.go b/model/robot/friend.go index 3104172..06c0ed7 100644 --- a/model/robot/friend.go +++ b/model/robot/friend.go @@ -1,4 +1,4 @@ -package model +package robot // FriendItem // @description: 好友列表数据 diff --git a/model/robot/message.go b/model/robot/message.go index c49e8e0..9d91381 100644 --- a/model/robot/message.go +++ b/model/robot/message.go @@ -1,4 +1,4 @@ -package model +package robot import ( "encoding/xml" diff --git a/model/robot/response.go b/model/robot/response.go index 4bf376e..c845b73 100644 --- a/model/robot/response.go +++ b/model/robot/response.go @@ -1,4 +1,4 @@ -package model +package robot // Response // @description: 基础返回结构体 diff --git a/model/robot/userinfo.go b/model/robot/userinfo.go index c6449df..a3fb63d 100644 --- a/model/robot/userinfo.go +++ b/model/robot/userinfo.go @@ -1,4 +1,4 @@ -package model +package robot // UserInfo // @description: 机器人用户信息 diff --git a/mq/rabbitmq.go b/mq/rabbitmq.go new file mode 100644 index 0000000..14c7211 --- /dev/null +++ b/mq/rabbitmq.go @@ -0,0 +1,118 @@ +package mq + +import ( + "gitee.ltd/lxh/logger/log" + amqp "github.com/rabbitmq/amqp091-go" + "wechat-robot/config" + "wechat-robot/internal/message" +) + +// MQ连接对象 +var conn *amqp.Connection +var channel *amqp.Channel + +// 交换机名称 +const exchangeName = "wechat-message" + +// Init +// @description: 初始化MQ +func Init() { + // 读取MQ连接配置 + mqUrl := config.Conf.Mq.RabbitMQ.GetURL() + if mqUrl == "" { + log.Panicf("MQ配置异常") + } + + var err error + // 创建MQ连接 + if conn, err = amqp.Dial(mqUrl); err != nil { + log.Panicf("RabbitMQ连接失败: %s", err) + } + + //获取channel + if channel, err = conn.Channel(); err != nil { + log.Panicf("打开Channel失败: %s", err) + } + log.Debug("RabbitMQ连接成功") + go Receive() + log.Debug("开始监听消息") +} + +// Receive +// @description: 接收消息 +func Receive() (err error) { + // 创建交换机 + if err = channel.ExchangeDeclare( + exchangeName, + "fanout", + true, + false, + //true表示这个exchange不可以被client用来推送消息,仅用来进行exchange和exchange之间的绑定 + false, + false, + nil, + ); err != nil { + log.Errorf("声明Exchange失败: %s", err) + return + } + // 创建队列 + var q amqp.Queue + q, err = channel.QueueDeclare( + "", //随机生产队列名称 + false, + false, + true, + false, + nil, + ) + if err != nil { + log.Errorf("无法声明Queue: %s", err) + return + } + // 绑定队列到 exchange 中 + err = channel.QueueBind( + q.Name, + //在pub/sub模式下,这里的key要为空 + "", + exchangeName, + false, + nil) + if err != nil { + log.Errorf("绑定队列失败: %s", err) + return + } + + // 消费消息 + var messages <-chan amqp.Delivery + messages, err = channel.Consume( + q.Name, + "", + false, // 不自动ack,手动处理,这样即使消费者挂掉,消息也不会丢失 + false, + false, + false, + nil, + ) + if err != nil { + log.Errorf("无法使用队列: %s", err) + return + } + for { + msg, ok := <-messages + if !ok { + log.Errorf("获取消息失败") + return Receive() + } + log.Debugf("收到消息: %s", msg.Body) + if err = message.Message(msg.Body); err != nil { + log.Errorf("处理消息失败: %s", err) + continue + } + // ACK消息 + if err = msg.Ack(true); err != nil { + log.Errorf("ACK消息失败: %s", err) + continue + } + } + +} diff --git a/router/callback/route.go b/router/callback/route.go index a2e325e..ae07885 100644 --- a/router/callback/route.go +++ b/router/callback/route.go @@ -9,5 +9,5 @@ import ( // @description: 初始化路由 // @param g func InitRoute(g *gin.RouterGroup) { - g.GET("/hook", callback.RobotHookNotify) + g.POST("/hook", callback.RobotHookNotify) } diff --git a/service/message/save.go b/service/message/save.go new file mode 100644 index 0000000..03d56d3 --- /dev/null +++ b/service/message/save.go @@ -0,0 +1,26 @@ +package message + +import ( + "gitee.ltd/lxh/logger/log" + "wechat-robot/internal/database" + "wechat-robot/model/entity" +) + +// Save +// @description: 保存消息 +// @param ent entity.Message 消息实体 +// @return err error 错误 +func Save(ent entity.Message) (err error) { + // 判断是否存在 + var count int64 + database.Client.Model(&ent).Where("msg_id = ?", ent.MsgId).Where("to_user = ?", ent.ToUser).Count(&count) + if count > 0 { + return + } + // 保存入库 + err = database.Client.Create(&ent).Error + if err != nil { + log.Errorf("消息保存失败: %v", err) + } + return +}