go-wxhelper/mq/rabbitmq.go

128 lines
2.4 KiB
Go
Raw Permalink Normal View History

2024-02-19 14:57:36 +08:00
package mq
import (
"gitee.ltd/lxh/logger/log"
amqp "github.com/rabbitmq/amqp091-go"
"wechat-robot/config"
"wechat-robot/internal/message"
)
// MQ连接对象
var conn *amqp.Connection
var channel *amqp.Channel
// 交换机名称
const exchangeName = "wechat-message"
// Init
// @description: 初始化MQ
func Init() {
// 读取MQ连接配置
mqUrl := config.Conf.Mq.RabbitMQ.GetURL()
if mqUrl == "" {
log.Panicf("MQ配置异常")
}
var err error
// 创建MQ连接
if conn, err = amqp.Dial(mqUrl); err != nil {
log.Panicf("RabbitMQ连接失败: %s", err)
}
//获取channel
if channel, err = conn.Channel(); err != nil {
log.Panicf("打开Channel失败: %s", err)
}
log.Debug("RabbitMQ连接成功")
go Receive()
log.Debug("开始监听消息")
}
// Receive
// @description: 接收消息
2024-02-19 15:04:14 +08:00
func Receive(retry ...int) (err error) {
var retryCount int
if len(retry) > 0 {
retryCount = retry[0]
}
// 重试次数超过100次退出
if retryCount > 100 {
log.Panicf("获取消息失败次数过多")
}
2024-02-19 14:57:36 +08:00
// 创建交换机
if err = channel.ExchangeDeclare(
exchangeName,
"fanout",
true,
false,
//true表示这个exchange不可以被client用来推送消息仅用来进行exchange和exchange之间的绑定
false,
false,
nil,
); err != nil {
log.Errorf("声明Exchange失败: %s", err)
return
}
// 创建队列
var q amqp.Queue
q, err = channel.QueueDeclare(
"", //随机生产队列名称
false,
false,
true,
false,
nil,
)
if err != nil {
log.Errorf("无法声明Queue: %s", err)
return
}
// 绑定队列到 exchange 中
err = channel.QueueBind(
q.Name,
//在pub/sub模式下这里的key要为空
"",
exchangeName,
false,
nil)
if err != nil {
log.Errorf("绑定队列失败: %s", err)
return
}
// 消费消息
var messages <-chan amqp.Delivery
messages, err = channel.Consume(
q.Name,
"",
false, // 不自动ack手动处理这样即使消费者挂掉消息也不会丢失
false,
false,
false,
nil,
)
if err != nil {
log.Errorf("无法使用队列: %s", err)
return
}
for {
msg, ok := <-messages
if !ok {
log.Errorf("获取消息失败")
2024-02-19 15:04:14 +08:00
return Receive(retryCount + 1)
2024-02-19 14:57:36 +08:00
}
log.Debugf("收到消息: %s", msg.Body)
if err = message.Message(msg.Body); err != nil {
log.Errorf("处理消息失败: %s", err)
continue
}
// ACK消息
if err = msg.Ack(true); err != nil {
log.Errorf("ACK消息失败: %s", err)
continue
}
}
}