diff --git a/src/skopeo_service.go b/src/skopeo_service.go index d048cc9..631fd68 100644 --- a/src/skopeo_service.go +++ b/src/skopeo_service.go @@ -75,6 +75,10 @@ type Client struct { TaskID string Send chan []byte CloseOnce sync.Once + heartbeat *time.Ticker // 心跳定时器 + isActive bool // 连接是否活跃 + lastPing time.Time // 最后一次ping时间 + mu sync.RWMutex // 保护isActive和lastPing } var ( @@ -132,9 +136,11 @@ func handleWebSocket(c *gin.Context) { } client := &Client{ - Conn: conn, - TaskID: taskID, - Send: make(chan []byte, 256), + Conn: conn, + TaskID: taskID, + Send: make(chan []byte, 256), + isActive: true, + lastPing: time.Now(), } // 注册客户端 @@ -142,6 +148,9 @@ func handleWebSocket(c *gin.Context) { clients[taskID] = client clientLock.Unlock() + // 启动心跳保活机制 + client.startHeartbeat() + // 启动goroutine处理消息发送 go client.writePump() @@ -150,34 +159,133 @@ func handleWebSocket(c *gin.Context) { if task, exists := tasks[taskID]; exists { tasksLock.Unlock() taskJSON, _ := json.Marshal(task) - client.Send <- taskJSON + select { + case client.Send <- taskJSON: + default: + // 通道满时不阻塞 + } } else { tasksLock.Unlock() } - // 设置WebSocket超时 - conn.SetReadDeadline(time.Now().Add(120 * time.Second)) - conn.SetWriteDeadline(time.Now().Add(60 * time.Second)) + // 设置更宽松的读取超时,主要用于检测客户端断开 + conn.SetReadDeadline(time.Now().Add(5 * time.Minute)) + + // 不设置写入超时,让心跳机制处理连接活跃性 // 处理WebSocket关闭 conn.SetCloseHandler(func(code int, text string) error { - client.CloseOnce.Do(func() { - close(client.Send) - clientLock.Lock() - delete(clients, taskID) - clientLock.Unlock() - }) + client.close() return nil }) + + // 处理pong消息以确认连接活跃 + conn.SetPongHandler(func(appData string) error { + client.mu.Lock() + client.lastPing = time.Now() + client.mu.Unlock() + conn.SetReadDeadline(time.Now().Add(5 * time.Minute)) + return nil + }) + + // 启动读取循环(主要用于处理pong和检测断开) + go client.readPump() +} + +// 启动心跳保活机制 +func (c *Client) startHeartbeat() { + c.heartbeat = time.NewTicker(30 * time.Second) + go func() { + defer c.heartbeat.Stop() + for { + select { + case <-c.heartbeat.C: + c.mu.RLock() + if !c.isActive { + c.mu.RUnlock() + return + } + c.mu.RUnlock() + + // 发送ping消息保持连接活跃 + if err := c.Conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + fmt.Printf("发送心跳失败: %v\n", err) + c.close() + return + } + + // 检查上次pong响应时间 + c.mu.RLock() + timeSinceLastPong := time.Since(c.lastPing) + c.mu.RUnlock() + + // 如果超过2分钟没有收到pong响应,认为连接已断开 + if timeSinceLastPong > 2*time.Minute { + fmt.Printf("客户端 %s 心跳超时,关闭连接\n", c.TaskID) + c.close() + return + } + } + } + }() +} + +// 读取循环,主要处理pong消息和检测连接断开 +func (c *Client) readPump() { + defer c.close() + + for { + c.mu.RLock() + if !c.isActive { + c.mu.RUnlock() + break + } + c.mu.RUnlock() + + // 读取消息,主要是为了检测连接状态 + _, _, err := c.Conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + fmt.Printf("WebSocket意外关闭: %v\n", err) + } + break + } + } +} + +// 安全关闭客户端连接 +func (c *Client) close() { + c.CloseOnce.Do(func() { + c.mu.Lock() + c.isActive = false + c.mu.Unlock() + + if c.heartbeat != nil { + c.heartbeat.Stop() + } + + close(c.Send) + c.Conn.Close() + + clientLock.Lock() + delete(clients, c.TaskID) + clientLock.Unlock() + }) } // 客户端消息发送loop func (c *Client) writePump() { - defer func() { - c.Conn.Close() - }() + defer c.close() for message := range c.Send { + c.mu.RLock() + if !c.isActive { + c.mu.RUnlock() + break + } + c.mu.RUnlock() + + // 不设置写入超时,依赖心跳机制检测连接状态 err := c.Conn.WriteMessage(websocket.TextMessage, message) if err != nil { fmt.Printf("发送WS消息失败: %v\n", err) @@ -249,6 +357,22 @@ func initTask(task *DownloadTask) { task.done = make(chan struct{}) task.createTime = time.Now() + // 启动定期状态更新,保持WebSocket连接活跃 + go func() { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // 定期发送当前状态保持连接活跃 + sendTaskUpdate(task) + case <-task.done: + return + } + } + }() + // 启动进度处理goroutine go func() { defer func() { @@ -1050,11 +1174,19 @@ func sendTaskUpdate(task *DownloadTask) { clientLock.Unlock() if exists { - select { - case client.Send <- taskJSON: - // 成功发送 - default: - // 通道已满或关闭,忽略 + // 检查客户端是否活跃 + client.mu.RLock() + isActive := client.isActive + client.mu.RUnlock() + + if isActive { + select { + case client.Send <- taskJSON: + // 成功发送 + case <-time.After(5 * time.Second): + // 发送超时,可能客户端处理慢或连接有问题 + fmt.Printf("发送消息到客户端 %s 超时\n", task.ID) + } } } } @@ -1267,7 +1399,7 @@ func checkForCompletionMarkers(output string) bool { // cleanupWebSocketConnections 定期清理无效的WebSocket连接 func cleanupWebSocketConnections() { - ticker := time.NewTicker(5 * time.Minute) + ticker := time.NewTicker(2 * time.Minute) // 增加清理频率 defer ticker.Stop() for range ticker.C { @@ -1275,9 +1407,28 @@ func cleanupWebSocketConnections() { disconnectedClients := make([]string, 0) for taskID, client := range clients { - // 检查连接是否仍然活跃 - if err := client.Conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { - // 连接已断开,标记待清理 + client.mu.RLock() + isActive := client.isActive + lastPing := client.lastPing + client.mu.RUnlock() + + // 检查连接是否还活跃 + shouldRemove := false + + if !isActive { + shouldRemove = true + } else if time.Since(lastPing) > 3*time.Minute { + // 超过3分钟没有心跳响应,认为连接已断开 + shouldRemove = true + fmt.Printf("客户端 %s 心跳超时,标记为断开\n", taskID) + } else { + // 尝试发送ping测试连接 + if err := client.Conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + shouldRemove = true + } + } + + if shouldRemove { disconnectedClients = append(disconnectedClients, taskID) } } @@ -1285,11 +1436,7 @@ func cleanupWebSocketConnections() { // 清理断开的连接 for _, taskID := range disconnectedClients { if client, exists := clients[taskID]; exists { - client.CloseOnce.Do(func() { - close(client.Send) - client.Conn.Close() - }) - delete(clients, taskID) + client.close() } }