:feature: 添加WebSocket API以实时获取容器日志,并优化消息结构
All checks were successful
BuildImage / build-image (push) Successful in 7m30s

This commit is contained in:
李寻欢 2025-04-22 20:44:57 +08:00
parent f2c9eed3cf
commit b256d82855
8 changed files with 224 additions and 36 deletions

3
go.mod
View File

@ -27,6 +27,7 @@ require (
github.com/containerd/log v0.1.0 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/fasthttp/websocket v1.5.3 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-jose/go-jose/v4 v4.0.4 // indirect
@ -37,6 +38,7 @@ require (
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/gofiber/template v1.8.3 // indirect
github.com/gofiber/utils v1.1.0 // indirect
github.com/gofiber/websocket/v2 v2.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
@ -61,6 +63,7 @@ require (
github.com/rivo/uniseg v0.4.7 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/sagikazarmark/locafero v0.9.0 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.14.0 // indirect
github.com/spf13/cast v1.7.1 // indirect

6
go.sum
View File

@ -25,6 +25,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek=
github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
@ -57,6 +59,8 @@ github.com/gofiber/template/html/v2 v2.1.3 h1:n1LYBtmr9C0V/k/3qBblXyMxV5B0o/gpb6
github.com/gofiber/template/html/v2 v2.1.3/go.mod h1:U5Fxgc5KpyujU9OqKzy6Kn6Qup6Tm7zdsISR+VpnHRE=
github.com/gofiber/utils v1.1.0 h1:vdEBpn7AzIUJRhe+CiTOJdUcTg4Q9RK+pEa0KPbLdrM=
github.com/gofiber/utils v1.1.0/go.mod h1:poZpsnhBykfnY1Mc0KeEa6mSHrS3dV0+oBWyeQmb2e0=
github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w=
github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
@ -133,6 +137,8 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/sagikazarmark/locafero v0.9.0 h1:GbgQGNtTrEmddYDSAH9QLRyfAHY12md+8YFTqyMTC9k=
github.com/sagikazarmark/locafero v0.9.0/go.mod h1:UBUyz37V+EdMS3hDF3QWIiVr/2dPrx49OMO0Bn0hJqk=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=

View File

@ -0,0 +1,123 @@
package handler
import (
"context"
"io"
"strings"
"time"
"github.com/docker/docker/api/types/container"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
"gitee.ltd/lxh/wechat-robot/internal/docker"
)
// WebSocketContainerLogs 通过WebSocket推送容器日志
func WebSocketContainerLogs(c *websocket.Conn) {
// 获取容器ID
containerID := c.Params("id")
if containerID == "" {
c.Close()
return
}
// 创建上下文,可以通过客户端关闭连接来取消
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 设置客户端关闭时取消上下文
go func() {
defer cancel()
for {
if _, _, err := c.ReadMessage(); err != nil {
// 客户端断开连接
return
}
}
}()
// 获取Docker客户端
cli := docker.GetClient()
// 设置日志选项
options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true, // 持续跟踪日志
Tail: "100", // 初始返回100行
}
// 获取容器日志流
logStream, err := cli.ContainerLogs(ctx, containerID, options)
if err != nil {
// 发送错误消息并关闭连接
c.WriteMessage(websocket.TextMessage, []byte("获取容器日志失败: "+err.Error()))
c.Close()
return
}
defer logStream.Close()
// 创建缓冲区
buffer := make([]byte, 4096)
// 持续读取日志并发送到WebSocket
for {
select {
case <-ctx.Done():
// 上下文被取消,退出循环
return
default:
// 读取日志
n, err := logStream.Read(buffer)
if err != nil {
if err == io.EOF {
// 日志读取完毕,等待新日志
time.Sleep(500 * time.Millisecond)
continue
}
// 其他错误,关闭连接
return
}
if n > 0 {
// 清理Docker日志头部的8个字节控制信息
cleanedLog := cleanDockerLogBuffer(buffer[:n])
// 发送日志到WebSocket
if err := c.WriteMessage(websocket.TextMessage, cleanedLog); err != nil {
// 发送失败,关闭连接
return
}
}
}
}
}
// ContainerLogsWebSocketMiddleware WebSocket中间件用于升级HTTP连接到WebSocket
func ContainerLogsWebSocketMiddleware() fiber.Handler {
return websocket.New(WebSocketContainerLogs, websocket.Config{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
})
}
// cleanDockerLogBuffer 清理Docker日志缓冲区
func cleanDockerLogBuffer(buffer []byte) []byte {
// 如果缓冲区太小,直接返回
if len(buffer) <= 8 {
return buffer
}
// 分割日志为行
lines := strings.Split(string(buffer), "\n")
for i, line := range lines {
if len(line) > 8 {
// 跳过每行前8个字节的头部信息
lines[i] = line[8:]
}
}
// 合并并返回
return []byte(strings.Join(lines, "\n"))
}

View File

@ -21,8 +21,9 @@ const (
// Message 表示微信消息
type Message struct {
BaseModel
RobotId uint // 机器人Id
MsgId int64 // 消息Id
RobotId uint `gorm:"index:deleted,unique"` // 机器人Id
MsgId int64 `gorm:"index:deleted,unique"` // 消息Id
ClientMsgId int `gorm:"index:deleted,unique"` // 客户端消息Id
CreateTime int // 发送时间戳
CreateAt time.Time // 发送时间
Type types.MessageType // 消息类型

View File

@ -132,6 +132,8 @@ func (s *Server) SetupRoutes() {
// 添加容器API接口
s.app.Get("/api/v1/containers/:id/logs", handler.GetContainerLogs)
s.app.Get("/api/v1/containers/:id/stats", handler.GetContainerStats)
// 添加WebSocket API实时获取容器日志
s.app.Get("/api/v1/ws/containers/:id/logs", handler.ContainerLogsWebSocketMiddleware())
// 受保护的路由 - 简化为仅保留必要功能
auth := s.app.Group("/admin", middleware.Authenticate())

View File

@ -43,6 +43,7 @@ func syncMessage(client *xybot.Client, robotId uint) {
var m model.Message
m.RobotId = robotId
m.MsgId = message.NewMsgId
m.ClientMsgId = message.MsgId
m.CreateTime = message.CreateTime
m.CreateAt = time.Unix(int64(message.CreateTime), 0)
m.Type = types.MessageType(message.MsgType)

View File

@ -34,32 +34,7 @@ func Start() {
}
// 遍历机器人,添加任务
for _, robot := range robots {
// 初始化微信客户端
robotCli, err := xybot.NewClient(robot.WechatID, robot.ContainerHost, false)
if err != nil {
log.Panicf("初始化微信客户端失败: %v", err)
}
var job gocron.Job
job, err = scheduler.NewJob(
gocron.CronJob("*/5 * * * * *", true), // 五秒钟同步一次
gocron.NewTask(syncMessage, robotCli, robot.ID),
)
if err != nil {
log.Panicf("添加定时任务失败: %v", err)
}
// 添加到已启动的任务列表
enabledSyncMessageMap.Store(robot.ID, job.ID())
// 添加联系人同步任务
job, err = scheduler.NewJob(
gocron.CronJob("0 */1 * * *", true), // 每小时同步一次
gocron.NewTask(syncContact, robot.ContainerHost, robot.WechatID, robot.ID),
)
if err != nil {
log.Panicf("添加联系人同步任务失败: %v", err)
}
// 添加到已启动的任务列表
enabledSyncContactMap.Store(robot.ID, job.ID())
AddJob(robot) // 添加定时任务
}
// 启动定时任务

View File

@ -17,7 +17,7 @@
</div>
<div>
<!-- 修改删除按钮使用confirmDialog而非直接调用deleteRobot函数 -->
<button onclick="confirmDelete({{.Robot.ID}}, '{{.Robot.Nickname}}')" class="inline-flex items-center justify-center px-4 py-2 border border-transparent rounded-md shadow-sm text-sm font-medium text-white bg-red-600 hover:bg-red-700 transition-colors">
<button onclick="confirmDelete('{{.Robot.ID}}', '{{.Robot.Nickname}}')" class="inline-flex items-center justify-center px-4 py-2 border border-transparent rounded-md shadow-sm text-sm font-medium text-white bg-red-600 hover:bg-red-700 transition-colors">
<i class="fas fa-trash mr-2"></i> 删除
</button>
</div>
@ -190,9 +190,60 @@
// 容器ID可能长度不一致确保使用完整ID
const containerId = "{{.Robot.ContainerID}}";
// WebSocket连接
let logsSocket = null;
// 加载日志
function loadLogs() {
// 连接WebSocket
function connectLogsWebSocket() {
logsLoading.style.display = 'flex';
containerLogs.textContent = '正在连接到WebSocket...';
// 关闭已存在的WebSocket连接
if (logsSocket) {
logsSocket.close();
}
// 创建新的WebSocket连接
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${wsProtocol}//${window.location.host}/api/v1/ws/containers/${containerId}/logs`;
logsSocket = new WebSocket(wsUrl);
// 连接打开时
logsSocket.onopen = function() {
containerLogs.textContent = '已连接,等待日志数据...';
logsLoading.style.display = 'none';
};
// 收到消息时
logsSocket.onmessage = function(event) {
// 如果是第一条消息,清空容器
if (containerLogs.textContent === '已连接,等待日志数据...') {
containerLogs.textContent = '';
}
// 添加新日志
containerLogs.textContent += event.data;
// 自动滚动到底部
containerLogs.scrollTop = containerLogs.scrollHeight;
};
// 连接关闭时
logsSocket.onclose = function() {
containerLogs.textContent += '\n\n连接已关闭。点击刷新按钮重新连接。';
};
// 发生错误时
logsSocket.onerror = function(error) {
containerLogs.textContent = '连接错误: ' + JSON.stringify(error);
logsLoading.style.display = 'none';
};
}
// 加载历史日志初次加载时使用HTTP API获取
function loadInitialLogs() {
logsLoading.style.display = 'flex';
fetch(`/api/v1/containers/${containerId}/logs?lines=${logLines.value}`)
@ -201,6 +252,9 @@
if (data.success && data.logs) {
containerLogs.textContent = data.logs;
containerLogs.scrollTop = containerLogs.scrollHeight;
// 加载历史日志后连接WebSocket获取实时日志
connectLogsWebSocket();
} else {
containerLogs.textContent = '无法获取日志: ' + (data.message || '未知错误');
}
@ -261,12 +315,28 @@
}
// 初始加载日志和状态
loadLogs();
loadInitialLogs();
loadStats();
// 设置事件监听器
reloadLogs.addEventListener('click', loadLogs);
logLines.addEventListener('change', loadLogs);
reloadLogs.addEventListener('click', function() {
// 先断开WebSocket连接
if (logsSocket) {
logsSocket.close();
}
// 重新加载初始日志然后会自动重连WebSocket
loadInitialLogs();
});
logLines.addEventListener('change', function() {
// 先断开WebSocket连接
if (logsSocket) {
logsSocket.close();
}
// 重新加载初始日志然后会自动重连WebSocket
loadInitialLogs();
});
refreshBtn.addEventListener('click', function() {
this.classList.add('animate-spin');
loadStats();
@ -277,13 +347,20 @@
// 定期更新状态
setInterval(loadStats, 5000); // 每5秒更新一次
// 页面关闭时断开WebSocket连接
window.addEventListener('beforeunload', function() {
if (logsSocket) {
logsSocket.close();
}
});
});
// 确认删除函数
async function confirmDelete(id, name) {
if (typeof confirmDialog === 'function') {
const confirmed = await confirmDialog(
`确定要删除机器人"${name}"吗?此操作将永久删除容器及相关数据,无法恢复!`,
"确定要删除机器人\"" + name + "\"吗?此操作将永久删除容器及相关数据,无法恢复!",
{ type: 'danger', title: '删除机器人' }
);
@ -292,7 +369,7 @@
}
} else {
// 降级方案使用原生confirm
if (confirm(`确定要删除机器人"${name}"吗?此操作不可恢复!`)) {
if (confirm("确定要删除机器人\"" + name + "\"吗?此操作不可恢复!")) {
deleteRobot(id);
}
}