From b256d828559305832a33ce837f620375a00ea9ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E5=AF=BB=E6=AC=A2?= Date: Tue, 22 Apr 2025 20:44:57 +0800 Subject: [PATCH] =?UTF-8?q?:feature:=20=E6=B7=BB=E5=8A=A0WebSocket=20API?= =?UTF-8?q?=E4=BB=A5=E5=AE=9E=E6=97=B6=E8=8E=B7=E5=8F=96=E5=AE=B9=E5=99=A8?= =?UTF-8?q?=E6=97=A5=E5=BF=97=EF=BC=8C=E5=B9=B6=E4=BC=98=E5=8C=96=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 3 + go.sum | 6 ++ internal/handler/ws_container_logs.go | 123 ++++++++++++++++++++++++++ internal/model/message.go | 5 +- internal/server/server.go | 2 + internal/tasks/sync.go | 1 + internal/tasks/tasks.go | 27 +----- internal/view/robot/show.html | 93 +++++++++++++++++-- 8 files changed, 224 insertions(+), 36 deletions(-) create mode 100644 internal/handler/ws_container_logs.go diff --git a/go.mod b/go.mod index 2a25ee3..18c90e6 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/containerd/log v0.1.0 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/fasthttp/websocket v1.5.3 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-jose/go-jose/v4 v4.0.4 // indirect @@ -37,6 +38,7 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/gofiber/template v1.8.3 // indirect github.com/gofiber/utils v1.1.0 // indirect + github.com/gofiber/websocket/v2 v2.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect @@ -61,6 +63,7 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sagikazarmark/locafero v0.9.0 // indirect + github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect github.com/sourcegraph/conc v0.3.0 // indirect github.com/spf13/afero v1.14.0 // indirect github.com/spf13/cast v1.7.1 // indirect diff --git a/go.sum b/go.sum index bf076d7..2d33260 100644 --- a/go.sum +++ b/go.sum @@ -25,6 +25,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/fasthttp/websocket v1.5.3 h1:TPpQuLwJYfd4LJPXvHDYPMFWbLjsT91n3GpWtCQtdek= +github.com/fasthttp/websocket v1.5.3/go.mod h1:46gg/UBmTU1kUaTcwQXpUxtRwG2PvIZYeA8oL6vF3Fs= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= @@ -57,6 +59,8 @@ github.com/gofiber/template/html/v2 v2.1.3 h1:n1LYBtmr9C0V/k/3qBblXyMxV5B0o/gpb6 github.com/gofiber/template/html/v2 v2.1.3/go.mod h1:U5Fxgc5KpyujU9OqKzy6Kn6Qup6Tm7zdsISR+VpnHRE= github.com/gofiber/utils v1.1.0 h1:vdEBpn7AzIUJRhe+CiTOJdUcTg4Q9RK+pEa0KPbLdrM= github.com/gofiber/utils v1.1.0/go.mod h1:poZpsnhBykfnY1Mc0KeEa6mSHrS3dV0+oBWyeQmb2e0= +github.com/gofiber/websocket/v2 v2.2.1 h1:C9cjxvloojayOp9AovmpQrk8VqvVnT8Oao3+IUygH7w= +github.com/gofiber/websocket/v2 v2.2.1/go.mod h1:Ao/+nyNnX5u/hIFPuHl28a+NIkrqK7PRimyKaj4JxVU= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= @@ -133,6 +137,8 @@ github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZV github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/sagikazarmark/locafero v0.9.0 h1:GbgQGNtTrEmddYDSAH9QLRyfAHY12md+8YFTqyMTC9k= github.com/sagikazarmark/locafero v0.9.0/go.mod h1:UBUyz37V+EdMS3hDF3QWIiVr/2dPrx49OMO0Bn0hJqk= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk= +github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= diff --git a/internal/handler/ws_container_logs.go b/internal/handler/ws_container_logs.go new file mode 100644 index 0000000..37ae9c7 --- /dev/null +++ b/internal/handler/ws_container_logs.go @@ -0,0 +1,123 @@ +package handler + +import ( + "context" + "io" + "strings" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/websocket/v2" + + "gitee.ltd/lxh/wechat-robot/internal/docker" +) + +// WebSocketContainerLogs 通过WebSocket推送容器日志 +func WebSocketContainerLogs(c *websocket.Conn) { + // 获取容器ID + containerID := c.Params("id") + if containerID == "" { + c.Close() + return + } + + // 创建上下文,可以通过客户端关闭连接来取消 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // 设置客户端关闭时取消上下文 + go func() { + defer cancel() + for { + if _, _, err := c.ReadMessage(); err != nil { + // 客户端断开连接 + return + } + } + }() + + // 获取Docker客户端 + cli := docker.GetClient() + + // 设置日志选项 + options := container.LogsOptions{ + ShowStdout: true, + ShowStderr: true, + Follow: true, // 持续跟踪日志 + Tail: "100", // 初始返回100行 + } + + // 获取容器日志流 + logStream, err := cli.ContainerLogs(ctx, containerID, options) + if err != nil { + // 发送错误消息并关闭连接 + c.WriteMessage(websocket.TextMessage, []byte("获取容器日志失败: "+err.Error())) + c.Close() + return + } + defer logStream.Close() + + // 创建缓冲区 + buffer := make([]byte, 4096) + + // 持续读取日志并发送到WebSocket + for { + select { + case <-ctx.Done(): + // 上下文被取消,退出循环 + return + default: + // 读取日志 + n, err := logStream.Read(buffer) + if err != nil { + if err == io.EOF { + // 日志读取完毕,等待新日志 + time.Sleep(500 * time.Millisecond) + continue + } + // 其他错误,关闭连接 + return + } + + if n > 0 { + // 清理Docker日志头部的8个字节控制信息 + cleanedLog := cleanDockerLogBuffer(buffer[:n]) + + // 发送日志到WebSocket + if err := c.WriteMessage(websocket.TextMessage, cleanedLog); err != nil { + // 发送失败,关闭连接 + return + } + } + } + } +} + +// ContainerLogsWebSocketMiddleware WebSocket中间件,用于升级HTTP连接到WebSocket +func ContainerLogsWebSocketMiddleware() fiber.Handler { + return websocket.New(WebSocketContainerLogs, websocket.Config{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + }) +} + +// cleanDockerLogBuffer 清理Docker日志缓冲区 +func cleanDockerLogBuffer(buffer []byte) []byte { + // 如果缓冲区太小,直接返回 + if len(buffer) <= 8 { + return buffer + } + + // 分割日志为行 + lines := strings.Split(string(buffer), "\n") + for i, line := range lines { + if len(line) > 8 { + // 跳过每行前8个字节的头部信息 + lines[i] = line[8:] + } + } + + // 合并并返回 + return []byte(strings.Join(lines, "\n")) +} diff --git a/internal/model/message.go b/internal/model/message.go index e39f496..812257f 100644 --- a/internal/model/message.go +++ b/internal/model/message.go @@ -21,8 +21,9 @@ const ( // Message 表示微信消息 type Message struct { BaseModel - RobotId uint // 机器人Id - MsgId int64 // 消息Id + RobotId uint `gorm:"index:deleted,unique"` // 机器人Id + MsgId int64 `gorm:"index:deleted,unique"` // 消息Id + ClientMsgId int `gorm:"index:deleted,unique"` // 客户端消息Id CreateTime int // 发送时间戳 CreateAt time.Time // 发送时间 Type types.MessageType // 消息类型 diff --git a/internal/server/server.go b/internal/server/server.go index b8024af..c02c5ec 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -132,6 +132,8 @@ func (s *Server) SetupRoutes() { // 添加容器API接口 s.app.Get("/api/v1/containers/:id/logs", handler.GetContainerLogs) s.app.Get("/api/v1/containers/:id/stats", handler.GetContainerStats) + // 添加WebSocket API,实时获取容器日志 + s.app.Get("/api/v1/ws/containers/:id/logs", handler.ContainerLogsWebSocketMiddleware()) // 受保护的路由 - 简化为仅保留必要功能 auth := s.app.Group("/admin", middleware.Authenticate()) diff --git a/internal/tasks/sync.go b/internal/tasks/sync.go index f3a60ee..e093119 100644 --- a/internal/tasks/sync.go +++ b/internal/tasks/sync.go @@ -43,6 +43,7 @@ func syncMessage(client *xybot.Client, robotId uint) { var m model.Message m.RobotId = robotId m.MsgId = message.NewMsgId + m.ClientMsgId = message.MsgId m.CreateTime = message.CreateTime m.CreateAt = time.Unix(int64(message.CreateTime), 0) m.Type = types.MessageType(message.MsgType) diff --git a/internal/tasks/tasks.go b/internal/tasks/tasks.go index bf3be85..4ba9af0 100644 --- a/internal/tasks/tasks.go +++ b/internal/tasks/tasks.go @@ -34,32 +34,7 @@ func Start() { } // 遍历机器人,添加任务 for _, robot := range robots { - // 初始化微信客户端 - robotCli, err := xybot.NewClient(robot.WechatID, robot.ContainerHost, false) - if err != nil { - log.Panicf("初始化微信客户端失败: %v", err) - } - var job gocron.Job - job, err = scheduler.NewJob( - gocron.CronJob("*/5 * * * * *", true), // 五秒钟同步一次 - gocron.NewTask(syncMessage, robotCli, robot.ID), - ) - if err != nil { - log.Panicf("添加定时任务失败: %v", err) - } - // 添加到已启动的任务列表 - enabledSyncMessageMap.Store(robot.ID, job.ID()) - - // 添加联系人同步任务 - job, err = scheduler.NewJob( - gocron.CronJob("0 */1 * * *", true), // 每小时同步一次 - gocron.NewTask(syncContact, robot.ContainerHost, robot.WechatID, robot.ID), - ) - if err != nil { - log.Panicf("添加联系人同步任务失败: %v", err) - } - // 添加到已启动的任务列表 - enabledSyncContactMap.Store(robot.ID, job.ID()) + AddJob(robot) // 添加定时任务 } // 启动定时任务 diff --git a/internal/view/robot/show.html b/internal/view/robot/show.html index 6d820d0..f6241a3 100644 --- a/internal/view/robot/show.html +++ b/internal/view/robot/show.html @@ -17,7 +17,7 @@
-
@@ -190,9 +190,60 @@ // 容器ID可能长度不一致,确保使用完整ID const containerId = "{{.Robot.ContainerID}}"; + + // WebSocket连接 + let logsSocket = null; - // 加载日志 - function loadLogs() { + // 连接WebSocket + function connectLogsWebSocket() { + logsLoading.style.display = 'flex'; + containerLogs.textContent = '正在连接到WebSocket...'; + + // 关闭已存在的WebSocket连接 + if (logsSocket) { + logsSocket.close(); + } + + // 创建新的WebSocket连接 + const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const wsUrl = `${wsProtocol}//${window.location.host}/api/v1/ws/containers/${containerId}/logs`; + + logsSocket = new WebSocket(wsUrl); + + // 连接打开时 + logsSocket.onopen = function() { + containerLogs.textContent = '已连接,等待日志数据...'; + logsLoading.style.display = 'none'; + }; + + // 收到消息时 + logsSocket.onmessage = function(event) { + // 如果是第一条消息,清空容器 + if (containerLogs.textContent === '已连接,等待日志数据...') { + containerLogs.textContent = ''; + } + + // 添加新日志 + containerLogs.textContent += event.data; + + // 自动滚动到底部 + containerLogs.scrollTop = containerLogs.scrollHeight; + }; + + // 连接关闭时 + logsSocket.onclose = function() { + containerLogs.textContent += '\n\n连接已关闭。点击刷新按钮重新连接。'; + }; + + // 发生错误时 + logsSocket.onerror = function(error) { + containerLogs.textContent = '连接错误: ' + JSON.stringify(error); + logsLoading.style.display = 'none'; + }; + } + + // 加载历史日志(初次加载时使用HTTP API获取) + function loadInitialLogs() { logsLoading.style.display = 'flex'; fetch(`/api/v1/containers/${containerId}/logs?lines=${logLines.value}`) @@ -201,6 +252,9 @@ if (data.success && data.logs) { containerLogs.textContent = data.logs; containerLogs.scrollTop = containerLogs.scrollHeight; + + // 加载历史日志后连接WebSocket获取实时日志 + connectLogsWebSocket(); } else { containerLogs.textContent = '无法获取日志: ' + (data.message || '未知错误'); } @@ -261,12 +315,28 @@ } // 初始加载日志和状态 - loadLogs(); + loadInitialLogs(); loadStats(); // 设置事件监听器 - reloadLogs.addEventListener('click', loadLogs); - logLines.addEventListener('change', loadLogs); + reloadLogs.addEventListener('click', function() { + // 先断开WebSocket连接 + if (logsSocket) { + logsSocket.close(); + } + // 重新加载初始日志,然后会自动重连WebSocket + loadInitialLogs(); + }); + + logLines.addEventListener('change', function() { + // 先断开WebSocket连接 + if (logsSocket) { + logsSocket.close(); + } + // 重新加载初始日志,然后会自动重连WebSocket + loadInitialLogs(); + }); + refreshBtn.addEventListener('click', function() { this.classList.add('animate-spin'); loadStats(); @@ -277,13 +347,20 @@ // 定期更新状态 setInterval(loadStats, 5000); // 每5秒更新一次 + + // 页面关闭时断开WebSocket连接 + window.addEventListener('beforeunload', function() { + if (logsSocket) { + logsSocket.close(); + } + }); }); // 确认删除函数 async function confirmDelete(id, name) { if (typeof confirmDialog === 'function') { const confirmed = await confirmDialog( - `确定要删除机器人"${name}"吗?此操作将永久删除容器及相关数据,无法恢复!`, + "确定要删除机器人\"" + name + "\"吗?此操作将永久删除容器及相关数据,无法恢复!", { type: 'danger', title: '删除机器人' } ); @@ -292,7 +369,7 @@ } } else { // 降级方案:使用原生confirm - if (confirm(`确定要删除机器人"${name}"吗?此操作不可恢复!`)) { + if (confirm("确定要删除机器人\"" + name + "\"吗?此操作不可恢复!")) { deleteRobot(id); } }