From b3cafcadb211f76f981670d5bc8ee8da81c56e28 Mon Sep 17 00:00:00 2001 From: iluobei Date: Fri, 3 Apr 2026 15:51:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=89=E8=A3=85nginx?= =?UTF-8?q?=E4=B8=8Exray=E8=BF=87=E7=A8=8B=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/mmw-agent/main.go | 15 ++-- internal/handler/manage.go | 148 +++++++++++++++++++++++++++++++++++++ 2 files changed, 158 insertions(+), 5 deletions(-) diff --git a/cmd/mmw-agent/main.go b/cmd/mmw-agent/main.go index 6242136..6fa8a97 100644 --- a/cmd/mmw-agent/main.go +++ b/cmd/mmw-agent/main.go @@ -89,6 +89,12 @@ func main() { mux.HandleFunc("/api/child/scan", manageHandler.HandleScan) mux.HandleFunc("/api/child/cert/deploy", manageHandler.HandleCertDeploy) + // SSE streaming install/remove + mux.HandleFunc("/api/child/xray/install-stream", manageHandler.HandleXrayInstallStream) + mux.HandleFunc("/api/child/xray/remove-stream", manageHandler.HandleXrayRemoveStream) + mux.HandleFunc("/api/child/nginx/install-stream", manageHandler.HandleNginxInstallStream) + mux.HandleFunc("/api/child/nginx/remove-stream", manageHandler.HandleNginxRemoveStream) + // Health check mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") @@ -96,12 +102,11 @@ func main() { w.Write([]byte(`{"status":"ok","mode":"` + string(agentClient.GetCurrentMode()) + `"}`)) }) - // Create HTTP server + // Create HTTP server (no WriteTimeout — SSE streaming needs long-lived connections) server := &http.Server{ - Addr: ":" + cfg.ListenPort, - Handler: mux, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, + Addr: ":" + cfg.ListenPort, + Handler: mux, + ReadTimeout: 30 * time.Second, } // Setup graceful shutdown diff --git a/internal/handler/manage.go b/internal/handler/manage.go index 9c6375d..427124b 100644 --- a/internal/handler/manage.go +++ b/internal/handler/manage.go @@ -1,16 +1,19 @@ package handler import ( + "bufio" "bytes" "context" "encoding/json" "fmt" + "io" "log" "net/http" "os" "os/exec" "path/filepath" "strings" + "sync" "sync/atomic" "time" @@ -2498,3 +2501,148 @@ func runCommand(name string, args ...string) error { } return nil } + +// ================== SSE Streaming Install/Remove ================== + +func sseStreamCmd(w http.ResponseWriter, r *http.Request, cmd *exec.Cmd, completeMsg string) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming not supported", http.StatusInternalServerError) + return + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + sseEvent(w, flusher, map[string]string{"type": "error", "message": err.Error()}) + return + } + stderr, err := cmd.StderrPipe() + if err != nil { + sseEvent(w, flusher, map[string]string{"type": "error", "message": err.Error()}) + return + } + + if err := cmd.Start(); err != nil { + sseEvent(w, flusher, map[string]string{"type": "error", "message": err.Error()}) + return + } + + var mu sync.Mutex + var wg sync.WaitGroup + wg.Add(2) + + scanStream := func(rc io.ReadCloser) { + defer wg.Done() + scanner := bufio.NewScanner(rc) + scanner.Buffer(make([]byte, 256*1024), 256*1024) + for scanner.Scan() { + select { + case <-r.Context().Done(): + return + default: + } + mu.Lock() + sseEvent(w, flusher, map[string]string{"type": "output", "data": scanner.Text()}) + mu.Unlock() + } + } + + go scanStream(stdout) + go scanStream(stderr) + + done := make(chan error, 1) + go func() { done <- cmd.Wait() }() + + select { + case err := <-done: + wg.Wait() + if err != nil { + sseEvent(w, flusher, map[string]string{"type": "error", "message": err.Error()}) + } else { + sseEvent(w, flusher, map[string]interface{}{"type": "complete", "success": true, "message": completeMsg}) + } + case <-r.Context().Done(): + cmd.Process.Kill() + sseEvent(w, flusher, map[string]string{"type": "error", "message": "request cancelled"}) + } +} + +func sseEvent(w http.ResponseWriter, flusher http.Flusher, data interface{}) { + b, _ := json.Marshal(data) + fmt.Fprintf(w, "data: %s\n\n", b) + flusher.Flush() +} + +func (h *ManageHandler) HandleXrayInstallStream(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "Method not allowed") + return + } + if !h.authenticate(r) { + writeError(w, http.StatusUnauthorized, "Unauthorized") + return + } + log.Printf("[Manage] Starting Xray install (stream)...") + cmd := exec.CommandContext(r.Context(), "bash", "-c", + `bash -c "$(curl -L https://github.com/XTLS/Xray-install/raw/main/install-release.sh)" @ install`) + cmd.Env = os.Environ() + sseStreamCmd(w, r, cmd, "Xray installed successfully") +} + +func (h *ManageHandler) HandleXrayRemoveStream(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "Method not allowed") + return + } + if !h.authenticate(r) { + writeError(w, http.StatusUnauthorized, "Unauthorized") + return + } + log.Printf("[Manage] Starting Xray remove (stream)...") + cmd := exec.CommandContext(r.Context(), "bash", "-c", + `bash -c "$(curl -L https://github.com/XTLS/Xray-install/raw/main/install-release.sh)" @ remove`) + cmd.Env = os.Environ() + sseStreamCmd(w, r, cmd, "Xray removed successfully") +} + +func (h *ManageHandler) HandleNginxInstallStream(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "Method not allowed") + return + } + if !h.authenticate(r) { + writeError(w, http.StatusUnauthorized, "Unauthorized") + return + } + if nginxInstalling.Load() { + writeError(w, http.StatusConflict, "Nginx installation already in progress") + return + } + nginxInstalling.Store(true) + defer nginxInstalling.Store(false) + log.Printf("[Manage] Starting Nginx install (stream)...") + cmd := exec.CommandContext(r.Context(), "bash", "-c", + `curl -fsSL https://raw.githubusercontent.com/iluobei/miaomiaowuX/main/install-nginx.sh | bash`) + cmd.Env = os.Environ() + sseStreamCmd(w, r, cmd, "Nginx installed successfully") +} + +func (h *ManageHandler) HandleNginxRemoveStream(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "Method not allowed") + return + } + if !h.authenticate(r) { + writeError(w, http.StatusUnauthorized, "Unauthorized") + return + } + log.Printf("[Manage] Starting Nginx remove (stream)...") + cmd := exec.CommandContext(r.Context(), "bash", "-c", + `curl -fsSL https://raw.githubusercontent.com/iluobei/miaomiaowuX/main/uninstall-nginx.sh | bash -s -- -y`) + cmd.Env = os.Environ() + sseStreamCmd(w, r, cmd, "Nginx removed successfully") +}