From b3202edb4990b92d5643f530bacd52edfd0ac9e4 Mon Sep 17 00:00:00 2001 From: iluobei Date: Fri, 10 Apr 2026 15:25:21 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/mmw-agent/main.go | 70 ++--- internal/agent/client.go | 293 +++++++++++------- internal/collector/metrics.go | 43 +-- internal/config/config.go | 50 ++- internal/constants/defaults.go | 109 +++++++ internal/constants/routes.go | 43 +++ ...n_latency.go => domain_latency_handler.go} | 22 +- .../{manage.go => management_handler.go} | 287 +++++++++-------- .../handler/{api.go => pull_api_handler.go} | 30 +- internal/handler/routes.go | 41 +++ ...{embed.go => xray_default_config_embed.go} | 0 internal/xrpc/client.go | 4 +- internal/xrpc/services/handler/inbound.go | 21 +- internal/xrpc/services/handler/outbound.go | 1 - internal/xrpc/services/handler/users.go | 12 +- internal/xrpc/services/logger/logger.go | 7 +- internal/xrpc/services/stats/stats.go | 7 +- 17 files changed, 633 insertions(+), 407 deletions(-) create mode 100644 internal/constants/defaults.go create mode 100644 internal/constants/routes.go rename internal/handler/{domain_latency.go => domain_latency_handler.go} (88%) rename internal/handler/{manage.go => management_handler.go} (91%) rename internal/handler/{api.go => pull_api_handler.go} (69%) create mode 100644 internal/handler/routes.go rename internal/handler/{embed.go => xray_default_config_embed.go} (100%) diff --git a/cmd/mmw-agent/main.go b/cmd/mmw-agent/main.go index 6362d65..a4a026d 100644 --- a/cmd/mmw-agent/main.go +++ b/cmd/mmw-agent/main.go @@ -8,10 +8,10 @@ import ( "os" "os/signal" "syscall" - "time" "mmw-agent/internal/agent" "mmw-agent/internal/config" + "mmw-agent/internal/constants" "mmw-agent/internal/handler" ) @@ -20,19 +20,19 @@ func main() { configPathShort := flag.String("c", "", "Path to config file (shorthand)") flag.Parse() - // -c takes effect if -config is not set + // 仅在 -config 未设置时使用 -c cfgFile := *configPath if cfgFile == "" { cfgFile = *configPathShort } - // Default to config.yaml in working directory + // 默认读取工作目录下的 config.yaml if cfgFile == "" { if _, err := os.Stat("config.yaml"); err == nil { cfgFile = "config.yaml" } } - // Load configuration + // 加载配置 var cfg *config.Config var err error @@ -41,7 +41,7 @@ func main() { if err != nil { log.Fatalf("Failed to load config: %v", err) } - // Merge environment variables (env takes precedence) + // 合并环境变量(环境变量优先) cfg.Merge(config.FromEnv()) } else { cfg = config.FromEnv() @@ -56,72 +56,42 @@ func main() { log.Printf("[Main] Listen port: %s", cfg.ListenPort) log.Printf("[Main] Xray servers: %d configured", len(cfg.XrayServers)) - // Create agent client + // 创建 agent 客户端 agentClient := agent.NewClient(cfg) - // Create handlers + // 创建处理器 apiHandler := handler.NewAPIHandler(agentClient, cfg.Token) manageHandler := handler.NewManageHandler(cfg.Token) - // Setup HTTP routes + // 注册 HTTP 路由 mux := http.NewServeMux() + handler.RegisterChildRoutes(mux, apiHandler, manageHandler) - // Pull mode API - mux.HandleFunc("/api/child/traffic", apiHandler.ServeHTTP) - mux.HandleFunc("/api/child/speed", apiHandler.ServeSpeedHTTP) - - // Management API - mux.HandleFunc("/api/child/services/status", manageHandler.HandleServicesStatus) - mux.HandleFunc("/api/child/services/control", manageHandler.HandleServiceControl) - mux.HandleFunc("/api/child/xray/install", manageHandler.HandleXrayInstall) - mux.HandleFunc("/api/child/xray/remove", manageHandler.HandleXrayRemove) - mux.HandleFunc("/api/child/xray/config", manageHandler.HandleXrayConfig) - mux.HandleFunc("/api/child/xray/system-config", manageHandler.HandleXraySystemConfig) - mux.HandleFunc("/api/child/xray/config-files", manageHandler.HandleXrayConfigFiles) - mux.HandleFunc("/api/child/nginx/install", manageHandler.HandleNginxInstall) - mux.HandleFunc("/api/child/nginx/remove", manageHandler.HandleNginxRemove) - mux.HandleFunc("/api/child/nginx/config", manageHandler.HandleNginxConfig) - mux.HandleFunc("/api/child/nginx/config-files", manageHandler.HandleNginxConfigFiles) - mux.HandleFunc("/api/child/system/info", manageHandler.HandleSystemInfo) - mux.HandleFunc("/api/child/inbounds", manageHandler.HandleInbounds) - mux.HandleFunc("/api/child/outbounds", manageHandler.HandleOutbounds) - mux.HandleFunc("/api/child/routing", manageHandler.HandleRouting) - mux.HandleFunc("/api/child/scan", manageHandler.HandleScan) - mux.HandleFunc("/api/child/cert/deploy", manageHandler.HandleCertDeploy) - mux.HandleFunc("/api/child/nginx/setup-ssl", manageHandler.HandleNginxSetupSSL) - mux.HandleFunc("/api/child/domains/latency", manageHandler.HandleDomainLatencyProbe) - - // SSE streaming install/remove - mux.HandleFunc("/api/child/xray/install-stream", manageHandler.HandleXrayInstallStream) - mux.HandleFunc("/api/child/xray/remove-stream", manageHandler.HandleXrayRemoveStream) - mux.HandleFunc("/api/child/nginx/install-stream", manageHandler.HandleNginxInstallStream) - mux.HandleFunc("/api/child/nginx/remove-stream", manageHandler.HandleNginxRemoveStream) - - // Health check - mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") + // 健康检查 + mux.HandleFunc(constants.PathHealth, func(w http.ResponseWriter, r *http.Request) { + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status":"ok","mode":"` + string(agentClient.GetCurrentMode()) + `"}`)) }) - // Create HTTP server (no WriteTimeout — SSE streaming needs long-lived connections) + // 创建 HTTP 服务(不设置 WriteTimeout,避免影响 SSE 长连接) server := &http.Server{ Addr: ":" + cfg.ListenPort, Handler: mux, - ReadTimeout: 30 * time.Second, + ReadTimeout: constants.DefaultReadTimeout, } - // Setup graceful shutdown + // 配置优雅退出 ctx, cancel := context.WithCancel(context.Background()) defer cancel() sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - // Start agent client + // 启动 agent 客户端 agentClient.Start(ctx) - // Start HTTP server + // 启动 HTTP 服务 go func() { log.Printf("[Main] HTTP server listening on :%s", cfg.ListenPort) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { @@ -129,15 +99,15 @@ func main() { } }() - // Wait for shutdown signal + // 等待退出信号 sig := <-sigCh log.Printf("[Main] Received signal %v, shutting down...", sig) - // Graceful shutdown + // 优雅退出 cancel() agentClient.Stop() - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), constants.DefaultShutdownTimeout) defer shutdownCancel() if err := server.Shutdown(shutdownCtx); err != nil { diff --git a/internal/agent/client.go b/internal/agent/client.go index 7d767c7..a0d09ae 100644 --- a/internal/agent/client.go +++ b/internal/agent/client.go @@ -21,11 +21,12 @@ import ( "mmw-agent/internal/collector" "mmw-agent/internal/config" + "mmw-agent/internal/constants" "github.com/gorilla/websocket" ) -// ConnectionMode represents the current connection mode +// ConnectionMode 表示当前连接模式。 type ConnectionMode string const ( @@ -35,7 +36,7 @@ const ( ModeAuto ConnectionMode = "auto" ) -// Client represents an agent client that connects to a master server +// Client 表示连接主控端的 agent 客户端。 type Client struct { config *config.Config collector *collector.Collector @@ -47,20 +48,20 @@ type Client struct { stopCh chan struct{} wg sync.WaitGroup - // Connection state + // 连接状态 currentMode ConnectionMode httpClient *http.Client httpAvailable bool modeMu sync.RWMutex - // Speed calculation (from system network interface) + // 速率计算(基于系统网卡统计) lastRxBytes int64 lastTxBytes int64 lastSampleTime time.Time speedMu sync.Mutex } -// NewClient creates a new agent client +// 创建 agent 客户端。 func NewClient(cfg *config.Config) *Client { return &Client{ config: cfg, @@ -68,20 +69,20 @@ func NewClient(cfg *config.Config) *Client { xrayServers: cfg.XrayServers, stopCh: make(chan struct{}), httpClient: &http.Client{ - Timeout: 10 * time.Second, + Timeout: constants.DefaultHTTPClientTimeout, }, - currentMode: ModePull, // Default to pull mode + currentMode: ModePull, // 默认使用拉取模式 } } -// wsHeaders returns HTTP headers for WebSocket handshake +// 生成 WebSocket 握手请求头。 func (c *Client) wsHeaders() http.Header { h := http.Header{} - h.Set("User-Agent", config.AgentUserAgent) + h.Set(constants.HeaderUserAgent, constants.AgentUserAgent) return h } -// newRequest creates an HTTP request with standard headers (Content-Type, Authorization, User-Agent) +// 创建带标准请求头的 HTTP 请求。 func (c *Client) newRequest(ctx context.Context, method, urlStr string, body []byte) (*http.Request, error) { var req *http.Request var err error @@ -93,13 +94,13 @@ func (c *Client) newRequest(ctx context.Context, method, urlStr string, body []b if err != nil { return nil, err } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Bearer "+c.config.Token) - req.Header.Set("User-Agent", config.AgentUserAgent) + req.Header.Set(constants.HeaderContentType, constants.ContentTypeJSON) + req.Header.Set(constants.HeaderAuthorization, constants.BearerPrefix+c.config.Token) + req.Header.Set(constants.HeaderUserAgent, constants.AgentUserAgent) return req, nil } -// Start starts the agent client with automatic mode selection +// 按配置启动客户端。 func (c *Client) Start(ctx context.Context) { log.Printf("[Agent] Starting in %s mode", c.config.ConnectionMode) @@ -117,7 +118,7 @@ func (c *Client) Start(ctx context.Context) { case ModePull: c.setCurrentMode(ModePull) log.Printf("[Agent] Pull mode enabled - API will be served at /api/child/traffic and /api/child/speed") - // Report agent info immediately via HTTP heartbeat + // 启动后先通过 HTTP 上报一次心跳信息 if err := c.sendHeartbeatHTTP(ctx); err != nil { log.Printf("[Agent] Failed to send initial heartbeat in pull mode: %v", err) } @@ -130,7 +131,7 @@ func (c *Client) Start(ctx context.Context) { } } -// Stop stops the agent client +// 停止客户端。 func (c *Client) Stop() { close(c.stopCh) c.wg.Wait() @@ -144,33 +145,33 @@ func (c *Client) Stop() { log.Printf("[Agent] Stopped") } -// IsConnected returns whether the WebSocket is connected +// 返回 WebSocket 连接状态。 func (c *Client) IsConnected() bool { c.wsMu.Lock() defer c.wsMu.Unlock() return c.connected } -// GetCurrentMode returns the current connection mode +// 返回当前连接模式。 func (c *Client) GetCurrentMode() ConnectionMode { c.modeMu.RLock() defer c.modeMu.RUnlock() return c.currentMode } -// setCurrentMode sets the current connection mode +// 设置当前连接模式。 func (c *Client) setCurrentMode(mode ConnectionMode) { c.modeMu.Lock() defer c.modeMu.Unlock() c.currentMode = mode } -// runWebSocket manages the WebSocket connection lifecycle with fallback to auto mode +// 维护 WebSocket 连接,并在失败时回退自动模式。 func (c *Client) runWebSocket(ctx context.Context) { defer c.wg.Done() - maxConsecutiveFailures := 5 - maxAuthFailures := 10 + maxConsecutiveFailures := constants.WebSocketMaxConsecutiveFailures + maxAuthFailures := constants.WebSocketMaxAuthFailures consecutiveFailures := 0 authFailures := 0 @@ -190,22 +191,22 @@ func (c *Client) runWebSocket(ctx context.Context) { return } - // Check if this is an authentication error + // 判断是否为鉴权错误 if authErr, ok := err.(*AuthError); ok { authFailures++ if authErr.IsTokenInvalid() { log.Printf("[Agent] Authentication failed (invalid token): %v", err) if authFailures >= maxAuthFailures { log.Printf("[Agent] Too many auth failures (%d), entering sleep mode (30 min backoff)", authFailures) - c.waitWithTrafficReport(ctx, 30*time.Minute) + c.waitWithTrafficReport(ctx, constants.AuthFailureSleepBackoff) authFailures = 0 continue } } - // Use longer backoff for auth errors - backoff := time.Duration(authFailures) * 30 * time.Second - if backoff > 10*time.Minute { - backoff = 10 * time.Minute + // 鉴权错误使用更长退避时间 + backoff := time.Duration(authFailures) * constants.AuthFailureBackoffStep + if backoff > constants.AuthFailureMaxBackoff { + backoff = constants.AuthFailureMaxBackoff } log.Printf("[Agent] Auth error, reconnecting in %v...", backoff) c.waitWithTrafficReport(ctx, backoff) @@ -233,21 +234,21 @@ func (c *Client) runWebSocket(ctx context.Context) { } } -// calculateBackoff calculates the reconnection backoff duration with exponential increase +// 计算重连退避时长。 func (c *Client) calculateBackoff() time.Duration { c.reconnects++ - // Exponential backoff: 5s, 10s, 20s, 40s, 80s, 160s, 300s(cap) - backoff := 5 * time.Second - for i := 1; i < c.reconnects && backoff < 5*time.Minute; i++ { + // 指数退避: 5s, 10s, 20s, 40s, 80s, 160s, 300s(上限) + backoff := constants.ReconnectBaseBackoff + for i := 1; i < c.reconnects && backoff < constants.ReconnectMaxBackoff; i++ { backoff *= 2 } - if backoff > 5*time.Minute { - backoff = 5 * time.Minute + if backoff > constants.ReconnectMaxBackoff { + backoff = constants.ReconnectMaxBackoff } return backoff } -// connectAndRun establishes and maintains a WebSocket connection +// 建立并维持 WebSocket 连接。 func (c *Client) connectAndRun(ctx context.Context) error { masterURL := c.config.MasterURL u, err := url.Parse(masterURL) @@ -262,12 +263,12 @@ func (c *Client) connectAndRun(ctx context.Context) error { u.Scheme = "wss" } - u.Path = "/api/remote/ws" + u.Path = constants.PathRemoteWebSocket log.Printf("[Agent] Connecting to %s", u.String()) dialer := websocket.Dialer{ - HandshakeTimeout: 10 * time.Second, + HandshakeTimeout: constants.WebSocketHandshakeTimeout, } conn, _, err := dialer.DialContext(ctx, u.String(), c.wsHeaders()) @@ -298,18 +299,18 @@ func (c *Client) connectAndRun(ctx context.Context) error { log.Printf("[Agent] Connected and authenticated") - // Report agent info (listen_port) immediately after connection + // 连接成功后立即上报 agent 信息(listen_port) if err := c.sendHeartbeat(conn); err != nil { log.Printf("[Agent] Failed to send initial heartbeat: %v", err) } - // Send scan result to master for auto-sync + // 异步上报扫描结果,供主控端自动同步 go c.sendScanResult(conn) return c.runMessageLoop(ctx, conn) } -// authenticate sends the authentication message +// 发送鉴权消息。 func (c *Client) authenticate(conn *websocket.Conn) error { authPayload, _ := json.Marshal(map[string]string{ "token": c.config.Token, @@ -324,7 +325,7 @@ func (c *Client) authenticate(conn *websocket.Conn) error { return err } - conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + conn.SetReadDeadline(time.Now().Add(constants.WebSocketReadDeadline)) _, message, err := conn.ReadMessage() if err != nil { return err @@ -349,11 +350,11 @@ func (c *Client) authenticate(conn *websocket.Conn) error { return nil } -// runMessageLoop handles sending traffic data, speed data, and heartbeats +// 处理流量、速率和心跳上报。 func (c *Client) runMessageLoop(ctx context.Context, conn *websocket.Conn) error { trafficTicker := time.NewTicker(c.config.TrafficReportInterval) speedTicker := time.NewTicker(c.config.SpeedReportInterval) - heartbeatTicker := time.NewTicker(30 * time.Second) + heartbeatTicker := time.NewTicker(constants.WebSocketHeartbeatInterval) defer trafficTicker.Stop() defer speedTicker.Stop() defer heartbeatTicker.Stop() @@ -362,13 +363,13 @@ func (c *Client) runMessageLoop(ctx context.Context, conn *websocket.Conn) error errCh := make(chan error, 1) go func() { for { - conn.SetReadDeadline(time.Now().Add(5 * time.Minute)) + conn.SetReadDeadline(time.Now().Add(constants.WebSocketIdleDeadline)) _, message, err := conn.ReadMessage() if err != nil { errCh <- err return } - // Send message to processing channel + // 投递到消息处理通道 select { case msgCh <- message: default: @@ -406,7 +407,7 @@ func (c *Client) runMessageLoop(ctx context.Context, conn *websocket.Conn) error } } -// sendTrafficData collects and sends traffic data to the master +// 采集并发送流量数据。 func (c *Client) sendTrafficData(conn *websocket.Conn) error { stats, err := c.collectLocalMetrics() if err != nil { @@ -437,7 +438,7 @@ func (c *Client) sendTrafficData(conn *websocket.Conn) error { return nil } -// sendHeartbeat sends a heartbeat message +// 发送心跳消息。 func (c *Client) sendHeartbeat(conn *websocket.Conn) error { now := time.Now() listenPort, _ := strconv.Atoi(c.config.ListenPort) @@ -458,7 +459,7 @@ func (c *Client) sendHeartbeat(conn *websocket.Conn) error { return err } -// collectLocalMetrics collects traffic metrics from local Xray servers +// 采集本机 Xray 流量指标。 func (c *Client) collectLocalMetrics() (*collector.XrayStats, error) { stats := &collector.XrayStats{ Inbound: make(map[string]collector.TrafficData), @@ -487,23 +488,23 @@ func (c *Client) collectLocalMetrics() (*collector.XrayStats, error) { return stats, nil } -// GetStats returns the current traffic stats (for pull mode) +// 返回当前流量统计(拉取模式)。 func (c *Client) GetStats() (*collector.XrayStats, error) { return c.collectLocalMetrics() } -// GetSpeed returns the current speed data (for pull mode) +// 返回当前速率(拉取模式)。 func (c *Client) GetSpeed() (uploadSpeed, downloadSpeed int64) { return c.collectSpeed() } -// runAutoMode implements the three-tier fallback: WebSocket -> HTTP -> Pull +// 使用三层回退:WebSocket -> HTTP -> Pull。 func (c *Client) runAutoMode(ctx context.Context) { defer c.wg.Done() c.runAutoModeLoop(ctx) } -// runAutoModeLoop is the internal loop for auto mode fallback +// 是自动模式的内部循环。 func (c *Client) runAutoModeLoop(ctx context.Context) { autoRetries := 0 for { @@ -548,14 +549,14 @@ func (c *Client) runAutoModeLoop(ctx context.Context) { log.Printf("[Agent] Falling back to pull mode - API available at /api/child/traffic and /api/child/speed") c.sendHeartbeatHTTP(ctx) - // Exponential backoff for pull mode: 30s, 60s, 120s, 240s, 300s(cap) + // 拉取模式退避: 30s, 60s, 120s, 240s, 300s(上限) autoRetries++ - pullDuration := 30 * time.Second - for i := 1; i < autoRetries && pullDuration < 5*time.Minute; i++ { + pullDuration := constants.AutoModePullFallbackBackoff + for i := 1; i < autoRetries && pullDuration < constants.ReconnectMaxBackoff; i++ { pullDuration *= 2 } - if pullDuration > 5*time.Minute { - pullDuration = 5 * time.Minute + if pullDuration > constants.ReconnectMaxBackoff { + pullDuration = constants.ReconnectMaxBackoff } c.runPullModeWithTrafficReport(ctx, pullDuration) @@ -567,7 +568,7 @@ func (c *Client) runAutoModeLoop(ctx context.Context) { } } -// tryWebSocketOnce attempts a single WebSocket connection test +// 执行一次 WebSocket 可用性探测。 func (c *Client) tryWebSocketOnce(ctx context.Context) error { masterURL := c.config.MasterURL u, err := url.Parse(masterURL) @@ -581,10 +582,10 @@ func (c *Client) tryWebSocketOnce(ctx context.Context) error { case "https": u.Scheme = "wss" } - u.Path = "/api/remote/ws" + u.Path = constants.PathRemoteWebSocket dialer := websocket.Dialer{ - HandshakeTimeout: 10 * time.Second, + HandshakeTimeout: constants.WebSocketHandshakeTimeout, } conn, _, err := dialer.DialContext(ctx, u.String(), c.wsHeaders()) @@ -595,13 +596,13 @@ func (c *Client) tryWebSocketOnce(ctx context.Context) error { return nil } -// tryHTTPOnce tests if HTTP push is available +// 探测 HTTP 推送是否可用。 func (c *Client) tryHTTPOnce(ctx context.Context) bool { u, err := url.Parse(c.config.MasterURL) if err != nil { return false } - u.Path = "/api/remote/heartbeat" + u.Path = constants.PathRemoteHeartbeat req, err := c.newRequest(ctx, http.MethodPost, u.String(), []byte("{}")) if err != nil { @@ -619,18 +620,18 @@ func (c *Client) tryHTTPOnce(ctx context.Context) bool { return c.httpAvailable } -// runHTTPReporter runs the HTTP push reporter +// 运行 HTTP 推送上报器。 func (c *Client) runHTTPReporter(ctx context.Context) { defer c.wg.Done() c.setCurrentMode(ModeHTTP) c.runHTTPReporterLoop(ctx) } -// runHTTPReporterLoop runs the HTTP reporting loop +// 执行 HTTP 上报循环。 func (c *Client) runHTTPReporterLoop(ctx context.Context) { trafficTicker := time.NewTicker(c.config.TrafficReportInterval) speedTicker := time.NewTicker(c.config.SpeedReportInterval) - heartbeatTicker := time.NewTicker(30 * time.Second) + heartbeatTicker := time.NewTicker(constants.WebSocketHeartbeatInterval) defer trafficTicker.Stop() defer speedTicker.Stop() defer heartbeatTicker.Stop() @@ -676,7 +677,7 @@ func (c *Client) runHTTPReporterLoop(ctx context.Context) { } } -// sendTrafficHTTP sends traffic data via HTTP POST +// 通过 HTTP POST 发送流量数据。 func (c *Client) sendTrafficHTTP(ctx context.Context) error { stats, err := c.collectLocalMetrics() if err != nil { @@ -691,7 +692,7 @@ func (c *Client) sendTrafficHTTP(ctx context.Context) error { if err != nil { return err } - u.Path = "/api/remote/traffic" + u.Path = constants.PathRemoteTraffic req, err := c.newRequest(ctx, http.MethodPost, u.String(), payload) if err != nil { @@ -714,7 +715,7 @@ func (c *Client) sendTrafficHTTP(ctx context.Context) error { return nil } -// sendSpeedHTTP sends speed data via HTTP POST +// 通过 HTTP POST 发送速率数据。 func (c *Client) sendSpeedHTTP(ctx context.Context) error { uploadSpeed, downloadSpeed := c.collectSpeed() @@ -727,7 +728,7 @@ func (c *Client) sendSpeedHTTP(ctx context.Context) error { if err != nil { return err } - u.Path = "/api/remote/speed" + u.Path = constants.PathRemoteSpeed req, err := c.newRequest(ctx, http.MethodPost, u.String(), payload) if err != nil { @@ -749,7 +750,7 @@ func (c *Client) sendSpeedHTTP(ctx context.Context) error { return nil } -// sendHeartbeatHTTP sends heartbeat via HTTP POST +// 通过 HTTP POST 发送心跳。 func (c *Client) sendHeartbeatHTTP(ctx context.Context) error { now := time.Now() listenPort, _ := strconv.Atoi(c.config.ListenPort) @@ -762,7 +763,7 @@ func (c *Client) sendHeartbeatHTTP(ctx context.Context) error { if err != nil { return err } - u.Path = "/api/remote/heartbeat" + u.Path = constants.PathRemoteHeartbeat req, err := c.newRequest(ctx, http.MethodPost, u.String(), payload) if err != nil { @@ -783,7 +784,7 @@ func (c *Client) sendHeartbeatHTTP(ctx context.Context) error { return nil } -// runPullModeWithTrafficReport runs pull mode while sending traffic data to keep server online +// 在拉取模式下持续上报流量,保持在线状态。 func (c *Client) runPullModeWithTrafficReport(ctx context.Context, duration time.Duration) { trafficTicker := time.NewTicker(c.config.TrafficReportInterval) defer trafficTicker.Stop() @@ -810,13 +811,13 @@ func (c *Client) runPullModeWithTrafficReport(ctx context.Context, duration time } } -// waitWithTrafficReport waits for the specified duration while sending traffic data +// 在等待期间继续上报流量。 func (c *Client) waitWithTrafficReport(ctx context.Context, duration time.Duration) { if duration <= 0 { return } - if duration > 30*time.Second { + if duration > constants.PullModeTrafficReportThreshold { if err := c.sendTrafficHTTP(ctx); err != nil { log.Printf("[Agent] Traffic report during backoff failed: %v", err) } @@ -843,7 +844,7 @@ func (c *Client) waitWithTrafficReport(ctx context.Context, duration time.Durati } } -// sendSpeedData sends speed data via WebSocket +// 通过 WebSocket 发送速率数据。 func (c *Client) sendSpeedData(conn *websocket.Conn) error { uploadSpeed, downloadSpeed := c.collectSpeed() @@ -869,7 +870,7 @@ func (c *Client) sendSpeedData(conn *websocket.Conn) error { return nil } -// collectSpeed calculates the current upload and download speed from system network interface +// 基于系统网卡统计计算当前上下行速率。 func (c *Client) collectSpeed() (uploadSpeed, downloadSpeed int64) { c.speedMu.Lock() defer c.speedMu.Unlock() @@ -900,7 +901,7 @@ func (c *Client) collectSpeed() (uploadSpeed, downloadSpeed int64) { return uploadSpeed, downloadSpeed } -// getSystemNetworkStats reads network statistics from /proc/net/dev +// 从 /proc/net/dev 读取网卡统计。 func (c *Client) getSystemNetworkStats() (rxBytes, txBytes int64) { data, err := os.ReadFile("/proc/net/dev") if err != nil { @@ -936,7 +937,7 @@ func (c *Client) getSystemNetworkStats() (rxBytes, txBytes int64) { return rxBytes, txBytes } -// AuthError represents an authentication error +// AuthError 表示鉴权失败错误。 type AuthError struct { Message string Code string // "token_expired", "token_invalid", "server_error" @@ -946,12 +947,12 @@ func (e *AuthError) Error() string { return "authentication failed: " + e.Message } -// IsTokenInvalid returns true if the error indicates an invalid token +// 判断是否为 token 无效错误。 func (e *AuthError) IsTokenInvalid() bool { return e.Code == "token_invalid" || e.Message == "Invalid token" } -// WebSocket message types +// WebSocket 消息类型 const ( WSMsgTypeCertDeploy = "cert_deploy" WSMsgTypeTokenUpdate = "token_update" @@ -960,7 +961,7 @@ const ( WSMsgTypeDomainLatencyResult = "domain_latency_result" ) -// WSCertDeployPayload represents a certificate deploy command from master +// WSCertDeployPayload 是主控端下发的证书部署指令。 type WSCertDeployPayload struct { Domain string `json:"domain"` CertPEM string `json:"cert_pem"` @@ -970,20 +971,20 @@ type WSCertDeployPayload struct { Reload string `json:"reload"` } -// WSTokenUpdatePayload represents a token update from master +// WSTokenUpdatePayload 是主控端下发的 token 更新指令。 type WSTokenUpdatePayload struct { ServerToken string `json:"server_token"` ExpiresAt time.Time `json:"expires_at"` } -// WSDomainLatencyProbePayload is received from master +// WSDomainLatencyProbePayload 是主控端下发的域名延迟探测请求。 type WSDomainLatencyProbePayload struct { RequestID string `json:"request_id"` Domains []string `json:"domains"` TimeoutMs int `json:"timeout_ms"` } -// handleMessage processes incoming messages from master +// 处理主控端下发的消息。 func (c *Client) handleMessage(conn *websocket.Conn, message []byte) { var msg struct { Type string `json:"type"` @@ -1018,11 +1019,11 @@ func (c *Client) handleMessage(conn *websocket.Conn, message []byte) { } go c.handleDomainLatencyProbe(conn, payload) default: - // Ignore unknown message types + // 忽略未知消息类型 } } -// handleCertDeploy deploys a certificate received from master +// 处理主控端下发的证书部署。 func (c *Client) handleCertDeploy(payload WSCertDeployPayload) { log.Printf("[Agent] Received cert_deploy for domain: %s, target: %s", payload.Domain, payload.Reload) @@ -1065,7 +1066,7 @@ func deployCert(certPEM, keyPEM, certPath, keyPath, reloadTarget string) error { } func reloadNginxCmd() error { - for _, bin := range []string{"/usr/local/nginx/sbin/nginx", "nginx"} { + for _, bin := range constants.NginxBinarySearchPaths { if path, err := exec.LookPath(bin); err == nil { return runCmd(path, "-s", "reload") } @@ -1080,43 +1081,47 @@ func runCmd(name string, args ...string) error { return nil } -// handleTokenUpdate processes a token update from master +// 处理主控端下发的 token 更新。 func (c *Client) handleTokenUpdate(payload WSTokenUpdatePayload) { log.Printf("[Agent] Received token update from master, new token expires at %s", payload.ExpiresAt.Format(time.RFC3339)) - // Update the token in memory + // 更新内存中的 token c.config.Token = payload.ServerToken log.Printf("[Agent] Token updated successfully in memory") } -// handleDomainLatencyProbe probes domain latency locally and sends results back via WebSocket +// 在本机探测域名延迟并回传结果。 func (c *Client) handleDomainLatencyProbe(conn *websocket.Conn, payload WSDomainLatencyProbePayload) { log.Printf("[Agent] Received domain_latency_probe: %d domains, timeout=%dms", len(payload.Domains), payload.TimeoutMs) timeoutMs := payload.TimeoutMs if timeoutMs <= 0 { - timeoutMs = 2000 + timeoutMs = constants.DomainProbeDefaultTimeoutMS } - if timeoutMs < 200 { - timeoutMs = 200 + if timeoutMs < constants.DomainProbeMinTimeoutMS { + timeoutMs = constants.DomainProbeMinTimeoutMS } - if timeoutMs > 10000 { - timeoutMs = 10000 + if timeoutMs > constants.DomainProbeMaxTimeoutMS { + timeoutMs = constants.DomainProbeMaxTimeoutMS } timeout := time.Duration(timeoutMs) * time.Millisecond type probeResult struct { - Domain string `json:"domain"` - Target string `json:"target"` - Success bool `json:"success"` - LatencyMs int64 `json:"latency_ms,omitempty"` - Error string `json:"error,omitempty"` + Domain string `json:"domain"` + Target string `json:"target"` + Success bool `json:"success"` + LatencyMs int64 `json:"latency_ms,omitempty"` + Error string `json:"error,omitempty"` + NginxSSLPort int `json:"nginx_ssl_port,omitempty"` } + // 读取本机 nginx 配置,构造 domain -> ssl 端口映射 + nginxPortMap := readNginxSSLPorts(payload.Domains) + results := make([]probeResult, 0, len(payload.Domains)) resultCh := make(chan probeResult, len(payload.Domains)) - sem := make(chan struct{}, 16) + sem := make(chan struct{}, constants.DomainProbeConcurrency) var wg sync.WaitGroup for _, domain := range payload.Domains { @@ -1145,7 +1150,7 @@ func (c *Client) handleDomainLatencyProbe(conn *websocket.Conn, payload WSDomain return } _ = tcpConn.Close() - resultCh <- probeResult{Domain: host, Target: target, Success: true, LatencyMs: time.Since(start).Milliseconds()} + resultCh <- probeResult{Domain: host, Target: target, Success: true, LatencyMs: time.Since(start).Milliseconds(), NginxSSLPort: nginxPortMap[host]} }() } @@ -1155,7 +1160,7 @@ func (c *Client) handleDomainLatencyProbe(conn *websocket.Conn, payload WSDomain results = append(results, r) } - // Sort: success first, then by latency + // 排序:成功优先,再按延迟升序 sort.Slice(results, func(i, j int) bool { if results[i].Success != results[j].Success { return results[i].Success @@ -1201,9 +1206,80 @@ func (c *Client) handleDomainLatencyProbe(conn *websocket.Conn, payload WSDomain log.Printf("[Agent] Sent domain_latency_result: %d results", len(results)) } -// sendScanResult scans local xray status and sends results to master +// readNginxSSLPorts 读取 nginx 配置并返回 domain -> SSL 端口映射。 +// 会在常见 nginx 配置目录下查找 servers/{domain}.conf。 +func readNginxSSLPorts(domains []string) map[string]int { + result := make(map[string]int) + if len(domains) == 0 { + return result + } + + confDirs := constants.NginxSSLServerDirPaths + + for _, domain := range domains { + host := domain + if h, _, err := net.SplitHostPort(domain); err == nil && h != "" { + host = h + } + for _, dir := range confDirs { + confPath := filepath.Join(dir, host+".conf") + data, err := os.ReadFile(confPath) + if err != nil { + continue + } + if port := extractSSLListenPort(string(data)); port > 0 { + result[host] = port + break + } + } + } + return result +} + +// 提取 nginx 配置块中第一个 "listen ssl" 端口。 +func extractSSLListenPort(conf string) int { + // 匹配示例: listen 58443 ssl + for _, line := range strings.Split(conf, "\n") { + line = strings.TrimSpace(line) + if !strings.HasPrefix(line, "listen") { + continue + } + // 去掉 "listen" 前缀和结尾分号 + rest := strings.TrimPrefix(line, "listen") + rest = strings.TrimRight(rest, ";") + rest = strings.TrimSpace(rest) + fields := strings.Fields(rest) + if len(fields) < 2 { + continue + } + // 判断字段中是否包含 "ssl" + hasSSL := false + for _, f := range fields[1:] { + if f == "ssl" { + hasSSL = true + break + } + } + if !hasSSL { + continue + } + // 第一个字段是端口(或 [::]:port) + portStr := fields[0] + // 兼容 [::]:port 形式 + if idx := strings.LastIndex(portStr, ":"); idx >= 0 { + portStr = portStr[idx+1:] + } + port, err := strconv.Atoi(portStr) + if err == nil && port > 0 { + return port + } + } + return 0 +} + +// 扫描本机 xray 状态并上报主控端。 func (c *Client) sendScanResult(conn *websocket.Conn) { - // Check xray running status + // 检查 xray 运行状态 xrayRunning := false xrayVersion := "" cmd := exec.Command("xray", "version") @@ -1214,12 +1290,9 @@ func (c *Client) sendScanResult(conn *websocket.Conn) { xrayRunning = true } - // Read inbounds from config + // 从配置读取入站列表 var inbounds []map[string]interface{} - configPaths := []string{ - "/usr/local/etc/xray/config.json", - "/etc/xray/config.json", - } + configPaths := constants.DefaultXrayConfigPaths for _, cfgPath := range configPaths { data, err := os.ReadFile(cfgPath) if err != nil { diff --git a/internal/collector/metrics.go b/internal/collector/metrics.go index 53bcf5f..5949872 100644 --- a/internal/collector/metrics.go +++ b/internal/collector/metrics.go @@ -8,28 +8,29 @@ import ( "os" "strconv" "strings" - "time" + + "mmw-agent/internal/constants" ) -// XrayMetrics represents the metrics response from Xray's /debug/vars endpoint +// XrayMetrics 表示 Xray /debug/vars 的响应结构。 type XrayMetrics struct { Stats *XrayStats `json:"stats,omitempty"` } -// XrayStats contains inbound, outbound, and user traffic stats +// XrayStats 包含入站、出站和用户维度的流量统计。 type XrayStats struct { Inbound map[string]TrafficData `json:"inbound,omitempty"` Outbound map[string]TrafficData `json:"outbound,omitempty"` User map[string]TrafficData `json:"user,omitempty"` } -// TrafficData contains uplink and downlink traffic in bytes +// TrafficData 表示上下行流量(字节)。 type TrafficData struct { Uplink int64 `json:"uplink"` Downlink int64 `json:"downlink"` } -// XrayConfig represents the structure of xray config.json for reading metrics port +// XrayConfig 用于读取 xray config.json 中的 metrics 监听配置。 type XrayConfig struct { Log json.RawMessage `json:"log,omitempty"` DNS json.RawMessage `json:"dns,omitempty"` @@ -42,51 +43,51 @@ type XrayConfig struct { Metrics *MetricsConfig `json:"metrics,omitempty"` } -// MetricsConfig represents the metrics section in xray config +// MetricsConfig 对应 xray 配置中的 metrics 段。 type MetricsConfig struct { Tag string `json:"tag,omitempty"` - Listen string `json:"listen,omitempty"` // Format: "127.0.0.1:38889" + Listen string `json:"listen,omitempty"` // 格式示例: "127.0.0.1:38889" } -// Collector collects traffic metrics from Xray servers +// Collector 负责采集 Xray 流量指标。 type Collector struct { httpClient *http.Client defaultMetricsPort int defaultMetricsHost string } -// NewCollector creates a new metrics collector +// 创建指标采集器。 func NewCollector() *Collector { return &Collector{ - httpClient: &http.Client{Timeout: 10 * time.Second}, - defaultMetricsPort: 38889, - defaultMetricsHost: "127.0.0.1", + httpClient: &http.Client{Timeout: constants.DefaultHTTPClientTimeout}, + defaultMetricsPort: constants.DefaultMetricsPort, + defaultMetricsHost: constants.DefaultMetricsHost, } } -// GetMetricsPortFromConfig reads the metrics port from xray config file +// 从 xray 配置中读取 metrics 监听地址和端口。 func (c *Collector) GetMetricsPortFromConfig(configPath string) (string, int, error) { if configPath == "" { - return "127.0.0.1", c.defaultMetricsPort, nil + return constants.DefaultMetricsHost, c.defaultMetricsPort, nil } data, err := os.ReadFile(configPath) if err != nil { - return "127.0.0.1", c.defaultMetricsPort, fmt.Errorf("read config file: %w", err) + return constants.DefaultMetricsHost, c.defaultMetricsPort, fmt.Errorf("read config file: %w", err) } var config XrayConfig if err := json.Unmarshal(data, &config); err != nil { - return "127.0.0.1", c.defaultMetricsPort, fmt.Errorf("parse config file: %w", err) + return constants.DefaultMetricsHost, c.defaultMetricsPort, fmt.Errorf("parse config file: %w", err) } if config.Metrics == nil || config.Metrics.Listen == "" { return "", 0, fmt.Errorf("metrics not configured in xray config") } - // Parse listen address (format: "127.0.0.1:38889" or ":38889") + // 解析监听地址,支持 "127.0.0.1:38889" 或 ":38889" listen := config.Metrics.Listen - host := "127.0.0.1" + host := constants.DefaultMetricsHost var port int if strings.Contains(listen, ":") { @@ -102,7 +103,7 @@ func (c *Collector) GetMetricsPortFromConfig(configPath string) (string, int, er port = p } } else { - // Try to parse as port only + // 兼容仅填写端口的写法 p, err := strconv.Atoi(listen) if err != nil { return "", 0, fmt.Errorf("invalid metrics listen format: %s", listen) @@ -117,7 +118,7 @@ func (c *Collector) GetMetricsPortFromConfig(configPath string) (string, int, er return host, port, nil } -// FetchMetrics fetches metrics from Xray's /debug/vars endpoint +// 从 Xray 的 /debug/vars 拉取指标。 func (c *Collector) FetchMetrics(host string, port int) (*XrayMetrics, error) { url := fmt.Sprintf("http://%s:%d/debug/vars", host, port) @@ -149,7 +150,7 @@ func (c *Collector) FetchMetrics(host string, port int) (*XrayMetrics, error) { return &metrics, nil } -// MergeStats merges source stats into dest stats +// 将 source 的统计合并到 dest。 func MergeStats(dest, source *XrayStats) { if source == nil { return diff --git a/internal/config/config.go b/internal/config/config.go index eacfb2d..08c4869 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,12 +5,14 @@ import ( "strconv" "time" + "mmw-agent/internal/constants" + "gopkg.in/yaml.v3" ) -const AgentUserAgent = "miaomiaowux/0.1" +const AgentUserAgent = constants.AgentUserAgent -// Config holds the agent configuration +// Config 保存 agent 的运行配置。 type Config struct { MasterURL string `yaml:"master_url"` Token string `yaml:"token"` @@ -21,20 +23,16 @@ type Config struct { SpeedReportInterval time.Duration `yaml:"speed_report_interval"` } -// XrayServer represents a local Xray server configuration +// XrayServer 表示本机 Xray 节点配置。 type XrayServer struct { Name string `yaml:"name"` ConfigPath string `yaml:"config_path"` } -// DefaultXrayConfigPaths are the default paths to search for Xray config -var DefaultXrayConfigPaths = []string{ - "/usr/local/etc/xray/config.json", - "/etc/xray/config.json", - "/opt/xray/config.json", -} +// DefaultXrayConfigPaths 是默认的 Xray 配置搜索路径。 +var DefaultXrayConfigPaths = append([]string(nil), constants.DefaultXrayConfigPaths...) -// Load loads configuration from a YAML file +// 从 YAML 文件加载配置。 func Load(path string) (*Config, error) { data, err := os.ReadFile(path) if err != nil { @@ -46,13 +44,13 @@ func Load(path string) (*Config, error) { return nil, err } - // Apply defaults + // 补齐默认值 config.applyDefaults() return &config, nil } -// FromEnv creates configuration from environment variables +// 从环境变量构造配置。 func FromEnv() *Config { config := &Config{ MasterURL: os.Getenv("MMWX_MASTER_URL"), @@ -61,14 +59,14 @@ func FromEnv() *Config { ListenPort: os.Getenv("MMWX_LISTEN_PORT"), } - // Parse Xray config path from env + // 读取 Xray 配置路径 if xrayConfig := os.Getenv("MMWX_XRAY_CONFIG"); xrayConfig != "" { config.XrayServers = []XrayServer{ {Name: "primary", ConfigPath: xrayConfig}, } } - // Parse intervals + // 读取上报间隔 if interval := os.Getenv("MMWX_TRAFFIC_INTERVAL"); interval != "" { if d, err := time.ParseDuration(interval); err == nil { config.TrafficReportInterval = d @@ -84,7 +82,7 @@ func FromEnv() *Config { return config } -// Merge merges environment config into file config (env takes precedence) +// 合并环境变量配置到文件配置(环境变量优先)。 func (c *Config) Merge(env *Config) { if env.MasterURL != "" { c.MasterURL = env.MasterURL @@ -109,28 +107,28 @@ func (c *Config) Merge(env *Config) { } } -// applyDefaults sets default values for unset fields +// 为空字段填充默认值。 func (c *Config) applyDefaults() { if c.ConnectionMode == "" { - c.ConnectionMode = "auto" + c.ConnectionMode = constants.ConnectionModeAuto } if c.ListenPort == "" { - c.ListenPort = "23889" + c.ListenPort = constants.DefaultListenPort } if c.TrafficReportInterval == 0 { - c.TrafficReportInterval = 1 * time.Minute + c.TrafficReportInterval = constants.DefaultTrafficReportInterval } if c.SpeedReportInterval == 0 { - c.SpeedReportInterval = 3 * time.Second + c.SpeedReportInterval = constants.DefaultSpeedReportInterval } - // Auto-discover Xray servers if not configured + // 未显式配置时自动探测 Xray 配置 if len(c.XrayServers) == 0 { c.XrayServers = c.discoverXrayServers() } } -// discoverXrayServers scans default paths for Xray config files +// 扫描默认路径中的 Xray 配置文件。 func (c *Config) discoverXrayServers() []XrayServer { var servers []XrayServer for i, path := range DefaultXrayConfigPaths { @@ -144,11 +142,11 @@ func (c *Config) discoverXrayServers() []XrayServer { return servers } -// Validate checks if the configuration is valid +// 校验配置是否合法。 func (c *Config) Validate() error { - // Token is required for non-pull modes - if c.ConnectionMode != "pull" && c.Token == "" { - // Allow empty token, will work in pull mode only + // 拉取模式之外通常需要 token + if c.ConnectionMode != constants.ConnectionModePull && c.Token == "" { + // 兼容空 token,实际仅拉取模式可正常工作 } return nil } diff --git a/internal/constants/defaults.go b/internal/constants/defaults.go new file mode 100644 index 0000000..a83918b --- /dev/null +++ b/internal/constants/defaults.go @@ -0,0 +1,109 @@ +package constants + +import "time" + +const ( + AgentUserAgent = "miaomiaowux/0.1" +) + +const ( + HeaderAuthorization = "Authorization" + HeaderContentType = "Content-Type" + HeaderMMRemoteToken = "MM-Remote-Token" + HeaderUserAgent = "User-Agent" + ContentTypeJSON = "application/json" + BearerPrefix = "Bearer " +) + +const ( + ConnectionModeAuto = "auto" + ConnectionModePull = "pull" +) + +const ( + DefaultListenPort = "23889" +) + +const ( + DefaultTrafficReportInterval = 1 * time.Minute + DefaultSpeedReportInterval = 3 * time.Second + DefaultHTTPClientTimeout = 10 * time.Second + DefaultReadTimeout = 30 * time.Second + DefaultShutdownTimeout = 10 * time.Second + DefaultRPCShortTimeout = 5 * time.Second +) + +const ( + DefaultMetricsHost = "127.0.0.1" + DefaultMetricsPort = 38889 + DefaultMetricsListen = "127.0.0.1:38889" + LocalhostIP = "127.0.0.1" +) + +const ( + WebSocketMaxConsecutiveFailures = 5 + WebSocketMaxAuthFailures = 10 + AuthFailureSleepBackoff = 30 * time.Minute + PullModeTrafficReportThreshold = 30 * time.Second +) + +const ( + ReconnectBaseBackoff = 5 * time.Second + ReconnectMaxBackoff = 5 * time.Minute + AuthFailureBackoffStep = 30 * time.Second + AuthFailureMaxBackoff = 10 * time.Minute + AutoModePullFallbackBackoff = 30 * time.Second + WebSocketHandshakeTimeout = 10 * time.Second + WebSocketReadDeadline = 10 * time.Second + WebSocketHeartbeatInterval = 30 * time.Second + WebSocketIdleDeadline = 5 * time.Minute +) + +const ( + DomainProbeDefaultTimeoutMS = 2000 + DomainProbeMinTimeoutMS = 200 + DomainProbeMaxTimeoutMS = 10000 + DomainProbeMaxCount = 200 + DomainProbeConcurrency = 16 +) + +var ( + NginxPrimaryPrefixDir = "/usr/local/nginx" + + DefaultXrayConfigPaths = []string{ + "/usr/local/etc/xray/config.json", + "/etc/xray/config.json", + "/opt/xray/config.json", + } + XrayConfigDirPaths = []string{ + "/usr/local/etc/xray", + "/etc/xray", + "/opt/xray", + } + DefaultNginxConfigPaths = []string{ + "/etc/nginx/nginx.conf", + "/usr/local/nginx/conf/nginx.conf", + } + NginxConfigDirPaths = []string{ + "/etc/nginx", + "/etc/nginx/sites-available", + "/etc/nginx/sites-enabled", + "/etc/nginx/conf.d", + "/usr/local/nginx/conf", + } + NginxSSLServerDirPaths = []string{ + "/usr/local/nginx/servers", + "/usr/local/nginx/conf/servers", + "/etc/nginx/servers", + "/etc/nginx/conf.d", + } + XrayBinarySearchPaths = []string{ + "/usr/local/bin/xray", + "/usr/bin/xray", + "/opt/xray/xray", + } + NginxBinarySearchPaths = []string{ + "/usr/local/nginx/sbin/nginx", + "nginx", + } +) diff --git a/internal/constants/routes.go b/internal/constants/routes.go new file mode 100644 index 0000000..7e73c45 --- /dev/null +++ b/internal/constants/routes.go @@ -0,0 +1,43 @@ +package constants + +const ( + PathHealth = "/health" +) + +const ( + PathChildTraffic = "/api/child/traffic" + PathChildSpeed = "/api/child/speed" + PathChildServiceStats = "/api/child/services/status" + PathChildServiceCtl = "/api/child/services/control" + PathChildXrayInstall = "/api/child/xray/install" + PathChildXrayRemove = "/api/child/xray/remove" + PathChildXrayConfig = "/api/child/xray/config" + PathChildXraySysCfg = "/api/child/xray/system-config" + PathChildXrayCfgFiles = "/api/child/xray/config-files" + PathChildNginxInstall = "/api/child/nginx/install" + PathChildNginxRemove = "/api/child/nginx/remove" + PathChildNginxConfig = "/api/child/nginx/config" + PathChildNginxCfgFile = "/api/child/nginx/config-files" + PathChildSystemInfo = "/api/child/system/info" + PathChildInbounds = "/api/child/inbounds" + PathChildOutbounds = "/api/child/outbounds" + PathChildRouting = "/api/child/routing" + PathChildScan = "/api/child/scan" + PathChildCertDeploy = "/api/child/cert/deploy" + PathChildNginxSetup = "/api/child/nginx/setup-ssl" + PathChildDomainProbe = "/api/child/domains/latency" +) + +const ( + PathChildXrayInstallStream = "/api/child/xray/install-stream" + PathChildXrayRemoveStream = "/api/child/xray/remove-stream" + PathChildNginxInstallSSE = "/api/child/nginx/install-stream" + PathChildNginxRemoveSSE = "/api/child/nginx/remove-stream" +) + +const ( + PathRemoteWebSocket = "/api/remote/ws" + PathRemoteHeartbeat = "/api/remote/heartbeat" + PathRemoteTraffic = "/api/remote/traffic" + PathRemoteSpeed = "/api/remote/speed" +) diff --git a/internal/handler/domain_latency.go b/internal/handler/domain_latency_handler.go similarity index 88% rename from internal/handler/domain_latency.go rename to internal/handler/domain_latency_handler.go index 20aaf3f..e02f1aa 100644 --- a/internal/handler/domain_latency.go +++ b/internal/handler/domain_latency_handler.go @@ -9,6 +9,8 @@ import ( "strings" "sync" "time" + + "mmw-agent/internal/constants" ) type DomainLatencyProbeRequest struct { @@ -24,7 +26,7 @@ type DomainLatencyProbeResult struct { Error string `json:"error,omitempty"` } -// HandleDomainLatencyProbe handles POST /api/child/domains/latency +// 处理 POST /api/child/domains/latency 请求。 func (h *ManageHandler) HandleDomainLatencyProbe(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -49,13 +51,13 @@ func (h *ManageHandler) HandleDomainLatencyProbe(w http.ResponseWriter, r *http. timeoutMs := req.TimeoutMs if timeoutMs <= 0 { - timeoutMs = 2000 + timeoutMs = constants.DomainProbeDefaultTimeoutMS } - if timeoutMs < 200 { - timeoutMs = 200 + if timeoutMs < constants.DomainProbeMinTimeoutMS { + timeoutMs = constants.DomainProbeMinTimeoutMS } - if timeoutMs > 10000 { - timeoutMs = 10000 + if timeoutMs > constants.DomainProbeMaxTimeoutMS { + timeoutMs = constants.DomainProbeMaxTimeoutMS } timeout := time.Duration(timeoutMs) * time.Millisecond @@ -64,13 +66,13 @@ func (h *ManageHandler) HandleDomainLatencyProbe(w http.ResponseWriter, r *http. writeError(w, http.StatusBadRequest, "no valid domain to probe") return } - if len(domains) > 200 { - domains = domains[:200] + if len(domains) > constants.DomainProbeMaxCount { + domains = domains[:constants.DomainProbeMaxCount] } results := make([]DomainLatencyProbeResult, 0, len(domains)) resultCh := make(chan DomainLatencyProbeResult, len(domains)) - sem := make(chan struct{}, 16) + sem := make(chan struct{}, constants.DomainProbeConcurrency) var wg sync.WaitGroup for _, domain := range domains { @@ -206,7 +208,7 @@ func splitHostPortLoose(input string) (host string, port string, ok bool) { } } - // Fallback for "domain:443" without brackets handling. + // 兼容不带方括号的 "domain:443" 写法。 idx := strings.LastIndex(s, ":") if idx <= 0 || idx >= len(s)-1 { return "", "", false diff --git a/internal/handler/manage.go b/internal/handler/management_handler.go similarity index 91% rename from internal/handler/manage.go rename to internal/handler/management_handler.go index ac6046a..ddf6ea3 100644 --- a/internal/handler/manage.go +++ b/internal/handler/management_handler.go @@ -17,7 +17,7 @@ import ( "sync/atomic" "time" - "mmw-agent/internal/config" + "mmw-agent/internal/constants" "mmw-agent/internal/xrpc" "github.com/xtls/xray-core/app/proxyman/command" @@ -26,21 +26,21 @@ import ( var nginxInstalling atomic.Bool -// ManageHandler handles management API requests for child servers +// ManageHandler 处理子端管理接口请求。 type ManageHandler struct { configToken string } -// NewManageHandler creates a new management handler +// 创建管理处理器。 func NewManageHandler(configToken string) *ManageHandler { return &ManageHandler{ configToken: configToken, } } -// authenticate checks if the request is authorized (token + User-Agent) +// 校验请求身份(token + User-Agent)。 func (h *ManageHandler) authenticate(r *http.Request) bool { - if r.Header.Get("User-Agent") != config.AgentUserAgent { + if r.Header.Get(constants.HeaderUserAgent) != constants.AgentUserAgent { return false } @@ -48,30 +48,30 @@ func (h *ManageHandler) authenticate(r *http.Request) bool { return true } - auth := r.Header.Get("Authorization") + auth := r.Header.Get(constants.HeaderAuthorization) if auth == "" { - auth = r.Header.Get("MM-Remote-Token") + auth = r.Header.Get(constants.HeaderMMRemoteToken) } if auth == "" { return false } - if strings.HasPrefix(auth, "Bearer ") { - token := strings.TrimPrefix(auth, "Bearer ") + if strings.HasPrefix(auth, constants.BearerPrefix) { + token := strings.TrimPrefix(auth, constants.BearerPrefix) return token == h.configToken } return auth == h.configToken } -// writeJSON writes JSON response +// 输出 JSON 响应。 func writeJSON(w http.ResponseWriter, statusCode int, data interface{}) { - w.Header().Set("Content-Type", "application/json") + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) w.WriteHeader(statusCode) json.NewEncoder(w).Encode(data) } -// writeError writes error response +// 输出错误响应。 func writeError(w http.ResponseWriter, statusCode int, message string) { writeJSON(w, statusCode, map[string]interface{}{ "success": false, @@ -79,23 +79,23 @@ func writeError(w http.ResponseWriter, statusCode int, message string) { }) } -// ================== System Services Status ================== +// ================== 系统服务状态 ================== -// ServicesStatusResponse represents the response for services status +// ServicesStatusResponse 表示服务状态查询响应。 type ServicesStatusResponse struct { Success bool `json:"success"` Xray *ServiceStatus `json:"xray,omitempty"` Nginx *ServiceStatus `json:"nginx,omitempty"` } -// ServiceStatus represents a service status +// ServiceStatus 表示单个服务状态。 type ServiceStatus struct { Installed bool `json:"installed"` Running bool `json:"running"` Version string `json:"version,omitempty"` } -// HandleServicesStatus handles GET /api/child/services/status +// 处理 GET /api/child/services/status。 func (h *ManageHandler) HandleServicesStatus(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -121,8 +121,7 @@ func (h *ManageHandler) getXrayStatus() *ServiceStatus { xrayPath, err := exec.LookPath("xray") if err != nil { - commonPaths := []string{"/usr/local/bin/xray", "/usr/bin/xray", "/opt/xray/xray"} - for _, p := range commonPaths { + for _, p := range constants.XrayBinarySearchPaths { if _, err := os.Stat(p); err == nil { xrayPath = p break @@ -142,7 +141,7 @@ func (h *ManageHandler) getXrayStatus() *ServiceStatus { } } - // Check systemctl first + // 优先使用 systemctl 检查 cmd := exec.Command("systemctl", "is-active", "xray") output, _ := cmd.Output() if strings.TrimSpace(string(output)) == "active" { @@ -150,14 +149,14 @@ func (h *ManageHandler) getXrayStatus() *ServiceStatus { return status } - // Fallback: check if xray process is running via pgrep + // 兜底:用 pgrep 检查 xray 进程 pgrepCmd := exec.Command("pgrep", "-x", "xray") if err := pgrepCmd.Run(); err == nil { status.Running = true return status } - // Fallback: check via ps for processes containing "xray" + // 兜底:用 ps 检查包含 "xray" 的进程 psCmd := exec.Command("bash", "-c", "ps aux | grep -v grep | grep -E '[x]ray' | head -1") if output, err := psCmd.Output(); err == nil && len(strings.TrimSpace(string(output))) > 0 { status.Running = true @@ -169,12 +168,17 @@ func (h *ManageHandler) getXrayStatus() *ServiceStatus { func (h *ManageHandler) getNginxStatus() *ServiceStatus { status := &ServiceStatus{} - // Check PATH first, then compiled install path + // 先查 PATH,再查编译安装路径 nginxPath, err := exec.LookPath("nginx") if err != nil { - if _, statErr := os.Stat("/usr/local/nginx/sbin/nginx"); statErr == nil { - nginxPath = "/usr/local/nginx/sbin/nginx" - err = nil + for _, candidate := range constants.NginxBinarySearchPaths { + if strings.Contains(candidate, "/") { + if _, statErr := os.Stat(candidate); statErr == nil { + nginxPath = candidate + err = nil + break + } + } } } if err == nil { @@ -190,7 +194,7 @@ func (h *ManageHandler) getNginxStatus() *ServiceStatus { status.Version = "安装中..." } - // Check systemctl first + // 优先使用 systemctl 检查 cmd := exec.Command("systemctl", "is-active", "nginx") output, _ := cmd.Output() if strings.TrimSpace(string(output)) == "active" { @@ -198,14 +202,14 @@ func (h *ManageHandler) getNginxStatus() *ServiceStatus { return status } - // Fallback: check if nginx process is running via pgrep + // 兜底:用 pgrep 检查 nginx 进程 pgrepCmd := exec.Command("pgrep", "-x", "nginx") if err := pgrepCmd.Run(); err == nil { status.Running = true return status } - // Fallback: check via ps for nginx master process + // 兜底:用 ps 检查 nginx master 进程 psCmd := exec.Command("bash", "-c", "ps aux | grep -v grep | grep -E 'nginx: master' | head -1") if output, err := psCmd.Output(); err == nil && len(strings.TrimSpace(string(output))) > 0 { status.Running = true @@ -214,15 +218,15 @@ func (h *ManageHandler) getNginxStatus() *ServiceStatus { return status } -// ================== Service Control ================== +// ================== 服务控制 ================== -// ServiceControlRequest represents a service control request +// ServiceControlRequest 表示服务控制请求。 type ServiceControlRequest struct { Service string `json:"service"` Action string `json:"action"` } -// HandleServiceControl handles POST /api/child/services/control +// 处理 POST /api/child/services/control。 func (h *ManageHandler) HandleServiceControl(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -265,9 +269,9 @@ func (h *ManageHandler) HandleServiceControl(w http.ResponseWriter, r *http.Requ }) } -// ================== Xray Installation ================== +// ================== Xray 安装 ================== -// HandleXrayInstall handles POST /api/child/xray/install +// 处理 POST /api/child/xray/install。 func (h *ManageHandler) HandleXrayInstall(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -297,7 +301,7 @@ func (h *ManageHandler) HandleXrayInstall(w http.ResponseWriter, r *http.Request log.Printf("[Manage] Xray installed successfully") - // Deploy default config if no config exists + // 若无配置则下发默认配置 h.deployDefaultXrayConfig() writeJSON(w, http.StatusOK, map[string]interface{}{ @@ -307,7 +311,7 @@ func (h *ManageHandler) HandleXrayInstall(w http.ResponseWriter, r *http.Request }) } -// HandleXrayRemove handles POST /api/child/xray/remove +// 处理 POST /api/child/xray/remove。 func (h *ManageHandler) HandleXrayRemove(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -344,9 +348,9 @@ func (h *ManageHandler) HandleXrayRemove(w http.ResponseWriter, r *http.Request) }) } -// ================== Xray Configuration ================== +// ================== Xray 配置 ================== -// HandleXrayConfig handles GET/POST /api/child/xray/config +// 处理 GET/POST /api/child/xray/config。 func (h *ManageHandler) HandleXrayConfig(w http.ResponseWriter, r *http.Request) { if !h.authenticate(r) { writeError(w, http.StatusUnauthorized, "Unauthorized") @@ -364,11 +368,7 @@ func (h *ManageHandler) HandleXrayConfig(w http.ResponseWriter, r *http.Request) } func (h *ManageHandler) getXrayConfig(w http.ResponseWriter, r *http.Request) { - configPaths := []string{ - "/usr/local/etc/xray/config.json", - "/etc/xray/config.json", - "/opt/xray/config.json", - } + configPaths := constants.DefaultXrayConfigPaths var configPath string var content []byte @@ -413,7 +413,7 @@ func (h *ManageHandler) setXrayConfig(w http.ResponseWriter, r *http.Request) { configPath := req.Path if configPath == "" { - configPath = "/usr/local/etc/xray/config.json" + configPath = constants.DefaultXrayConfigPaths[0] } dir := filepath.Dir(configPath) @@ -436,9 +436,9 @@ func (h *ManageHandler) setXrayConfig(w http.ResponseWriter, r *http.Request) { }) } -// ================== Xray System Configuration ================== +// ================== Xray 系统配置 ================== -// XraySystemConfig represents the system configuration state +// XraySystemConfig 表示 Xray 系统配置状态。 type XraySystemConfig struct { MetricsEnabled bool `json:"metrics_enabled"` MetricsListen string `json:"metrics_listen"` @@ -447,7 +447,7 @@ type XraySystemConfig struct { GrpcPort int `json:"grpc_port"` } -// HandleXraySystemConfig handles GET/POST /api/child/xray/system-config +// 处理 GET/POST /api/child/xray/system-config。 func (h *ManageHandler) HandleXraySystemConfig(w http.ResponseWriter, r *http.Request) { if !h.authenticate(r) { writeError(w, http.StatusUnauthorized, "Unauthorized") @@ -484,7 +484,7 @@ func (h *ManageHandler) getXraySystemConfig(w http.ResponseWriter, r *http.Reque } sysConfig := &XraySystemConfig{ - MetricsListen: "127.0.0.1:38889", + MetricsListen: constants.DefaultMetricsListen, GrpcPort: 46736, } @@ -605,10 +605,10 @@ func (h *ManageHandler) updateXraySystemConfig(w http.ResponseWriter, r *http.Re apiInbound := map[string]interface{}{ "tag": "api", "port": float64(req.GrpcPort), - "listen": "127.0.0.1", + "listen": constants.LocalhostIP, "protocol": "dokodemo-door", "settings": map[string]interface{}{ - "address": "127.0.0.1", + "address": constants.LocalhostIP, }, } config["inbounds"] = append([]interface{}{apiInbound}, inbounds...) @@ -618,10 +618,10 @@ func (h *ManageHandler) updateXraySystemConfig(w http.ResponseWriter, r *http.Re map[string]interface{}{ "tag": "api", "port": float64(req.GrpcPort), - "listen": "127.0.0.1", + "listen": constants.LocalhostIP, "protocol": "dokodemo-door", "settings": map[string]interface{}{ - "address": "127.0.0.1", + "address": constants.LocalhostIP, }, }, } @@ -722,9 +722,9 @@ func (h *ManageHandler) removeAPIRoutingRule(config map[string]interface{}) { routing["rules"] = newRules } -// ================== Nginx Installation ================== +// ================== Nginx 安装 ================== -// HandleNginxInstall handles POST /api/child/nginx/install +// 处理 POST /api/child/nginx/install。 func (h *ManageHandler) HandleNginxInstall(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -777,7 +777,7 @@ func (h *ManageHandler) HandleNginxInstall(w http.ResponseWriter, r *http.Reques }) } -// HandleNginxRemove handles POST /api/child/nginx/remove +// 处理 POST /api/child/nginx/remove。 func (h *ManageHandler) HandleNginxRemove(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -814,9 +814,9 @@ func (h *ManageHandler) HandleNginxRemove(w http.ResponseWriter, r *http.Request }) } -// ================== Nginx Configuration ================== +// ================== Nginx 配置 ================== -// HandleNginxConfig handles GET/POST /api/child/nginx/config +// 处理 GET/POST /api/child/nginx/config。 func (h *ManageHandler) HandleNginxConfig(w http.ResponseWriter, r *http.Request) { if !h.authenticate(r) { writeError(w, http.StatusUnauthorized, "Unauthorized") @@ -834,10 +834,7 @@ func (h *ManageHandler) HandleNginxConfig(w http.ResponseWriter, r *http.Request } func (h *ManageHandler) getNginxConfig(w http.ResponseWriter, r *http.Request) { - configPaths := []string{ - "/etc/nginx/nginx.conf", - "/usr/local/nginx/conf/nginx.conf", - } + configPaths := constants.DefaultNginxConfigPaths var configPath string var content []byte @@ -876,7 +873,7 @@ func (h *ManageHandler) setNginxConfig(w http.ResponseWriter, r *http.Request) { configPath := req.Path if configPath == "" { - configPath = "/etc/nginx/nginx.conf" + configPath = constants.DefaultNginxConfigPaths[0] } backupPath := configPath + ".bak." + time.Now().Format("20060102150405") @@ -908,9 +905,9 @@ func (h *ManageHandler) setNginxConfig(w http.ResponseWriter, r *http.Request) { }) } -// ================== System Info ================== +// ================== 系统信息 ================== -// HandleSystemInfo handles GET /api/child/system/info +// 处理 GET /api/child/system/info。 func (h *ManageHandler) HandleSystemInfo(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -960,9 +957,9 @@ func (h *ManageHandler) HandleSystemInfo(w http.ResponseWriter, r *http.Request) writeJSON(w, http.StatusOK, info) } -// ================== Config Files Management ================== +// ================== 配置文件管理 ================== -// ConfigFileInfo represents a config file entry +// ConfigFileInfo 表示配置文件条目。 type ConfigFileInfo struct { Name string `json:"name"` Path string `json:"path"` @@ -970,7 +967,7 @@ type ConfigFileInfo struct { ModTime string `json:"mod_time"` } -// HandleXrayConfigFiles handles listing and managing xray config files +// 处理 xray 配置文件的列表与读写。 func (h *ManageHandler) HandleXrayConfigFiles(w http.ResponseWriter, r *http.Request) { if !h.authenticate(r) { writeError(w, http.StatusUnauthorized, "Unauthorized") @@ -993,11 +990,7 @@ func (h *ManageHandler) HandleXrayConfigFiles(w http.ResponseWriter, r *http.Req } func (h *ManageHandler) listXrayConfigFiles(w http.ResponseWriter, r *http.Request) { - configDirs := []string{ - "/usr/local/etc/xray", - "/etc/xray", - "/opt/xray", - } + configDirs := constants.XrayConfigDirPaths var files []ConfigFileInfo var baseDir string @@ -1043,9 +1036,9 @@ func (h *ManageHandler) getXrayConfigFile(w http.ResponseWriter, r *http.Request file = filepath.Clean(file) configDirs := []string{ - "/usr/local/etc/xray", - "/etc/xray", - "/opt/xray", + constants.XrayConfigDirPaths[0], + constants.XrayConfigDirPaths[1], + constants.XrayConfigDirPaths[2], } var filePath string @@ -1100,9 +1093,9 @@ func (h *ManageHandler) saveXrayConfigFile(w http.ResponseWriter, r *http.Reques } configDirs := []string{ - "/usr/local/etc/xray", - "/etc/xray", - "/opt/xray", + constants.XrayConfigDirPaths[0], + constants.XrayConfigDirPaths[1], + constants.XrayConfigDirPaths[2], } var configDir string @@ -1114,7 +1107,7 @@ func (h *ManageHandler) saveXrayConfigFile(w http.ResponseWriter, r *http.Reques } if configDir == "" { - configDir = "/usr/local/etc/xray" + configDir = constants.XrayConfigDirPaths[0] if err := os.MkdirAll(configDir, 0755); err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to create config directory: %v", err)) return @@ -1145,7 +1138,7 @@ func (h *ManageHandler) saveXrayConfigFile(w http.ResponseWriter, r *http.Reques }) } -// HandleNginxConfigFiles handles listing and managing nginx config files +// 处理 nginx 配置文件的列表与读写。 func (h *ManageHandler) HandleNginxConfigFiles(w http.ResponseWriter, r *http.Request) { if !h.authenticate(r) { writeError(w, http.StatusUnauthorized, "Unauthorized") @@ -1172,10 +1165,10 @@ func (h *ManageHandler) listNginxConfigFiles(w http.ResponseWriter, r *http.Requ dir string description string }{ - {"/etc/nginx", "main"}, - {"/etc/nginx/sites-available", "sites-available"}, - {"/etc/nginx/sites-enabled", "sites-enabled"}, - {"/etc/nginx/conf.d", "conf.d"}, + {constants.NginxConfigDirPaths[0], "main"}, + {constants.NginxConfigDirPaths[1], "sites-available"}, + {constants.NginxConfigDirPaths[2], "sites-enabled"}, + {constants.NginxConfigDirPaths[3], "conf.d"}, } result := make(map[string][]ConfigFileInfo) @@ -1218,11 +1211,11 @@ func (h *ManageHandler) getNginxConfigFile(w http.ResponseWriter, r *http.Reques file = filepath.Clean(file) allowedDirs := []string{ - "/etc/nginx", - "/etc/nginx/sites-available", - "/etc/nginx/sites-enabled", - "/etc/nginx/conf.d", - "/usr/local/nginx/conf", + constants.NginxConfigDirPaths[0], + constants.NginxConfigDirPaths[1], + constants.NginxConfigDirPaths[2], + constants.NginxConfigDirPaths[3], + constants.NginxConfigDirPaths[4], } var filePath string @@ -1283,8 +1276,8 @@ func (h *ManageHandler) saveNginxConfigFile(w http.ResponseWriter, r *http.Reque req.Path = filepath.Clean(req.Path) allowedDirs := []string{ - "/etc/nginx", - "/usr/local/nginx/conf", + constants.NginxConfigDirPaths[0], + constants.NginxConfigDirPaths[4], } allowed := false @@ -1338,16 +1331,16 @@ func (h *ManageHandler) saveNginxConfigFile(w http.ResponseWriter, r *http.Reque }) } -// ================== Xray Inbounds Management ================== +// ================== Xray 入站管理 ================== -// InboundRequest represents inbound management request +// InboundRequest 表示入站管理请求。 type InboundRequest struct { Action string `json:"action"` Inbound map[string]interface{} `json:"inbound,omitempty"` Tag string `json:"tag,omitempty"` } -// HandleInbounds handles inbound management +// 处理入站管理请求。 func (h *ManageHandler) HandleInbounds(w http.ResponseWriter, r *http.Request) { if !h.authenticate(r) { writeError(w, http.StatusUnauthorized, "Unauthorized") @@ -1412,7 +1405,7 @@ func (h *ManageHandler) getInboundTagsFromGRPC() []string { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - clients, err := xrpc.New(ctx, "127.0.0.1", uint16(apiPort)) + clients, err := xrpc.New(ctx, constants.LocalhostIP, uint16(apiPort)) if err != nil { log.Printf("[Manage] Failed to connect to Xray gRPC: %v", err) return nil @@ -1517,7 +1510,7 @@ func (h *ManageHandler) manageInbound(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - clients, err := xrpc.New(ctx, "127.0.0.1", uint16(apiPort)) + clients, err := xrpc.New(ctx, constants.LocalhostIP, uint16(apiPort)) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to connect to Xray: %v", err)) return @@ -1557,27 +1550,27 @@ func (h *ManageHandler) manageInbound(w http.ResponseWriter, r *http.Request) { return } - // Try to remove from runtime (ignore error if not running) + // 尝试从运行态移除(未运行时报错可忽略) runtimeErr := h.removeInbound(ctx, clients.Handler, req.Tag) if runtimeErr != nil { log.Printf("[Manage] Warning: Failed to remove inbound from runtime: %v", runtimeErr) } - // Remove from config file (this is the primary operation) + // 从配置文件移除(主流程) configErr := h.removeInboundFromConfig(req.Tag) if configErr != nil { log.Printf("[Manage] Warning: Failed to remove inbound from config: %v", configErr) } - // Success if config operation succeeded (runtime removal is optional) - // The inbound might not exist in runtime if Xray wasn't restarted after config change + // 配置文件操作成功即可视为成功(运行态移除可选) + // 配置改动后若未重启,运行态可能还没有该入站 if configErr != nil { - // Config file operation failed + // 配置文件操作失败 if runtimeErr != nil { - // Both failed - check if it's just "not found" errors + // 两边都失败时,判断是否只是“未找到”错误 if strings.Contains(runtimeErr.Error(), "not enough information") { - // Xray says the inbound doesn't exist in runtime, which is fine - // Just report config error + // Xray 返回运行态不存在该入站,这属于可接受情况 + // 仅返回配置文件错误 writeError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to remove inbound from config: %v", configErr)) } else { writeError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to remove inbound: runtime=%v, config=%v", runtimeErr, configErr)) @@ -1588,7 +1581,7 @@ func (h *ManageHandler) manageInbound(w http.ResponseWriter, r *http.Request) { return } - // Config succeeded, runtime error is acceptable (inbound might not be loaded) + // 配置成功时,运行态报错可接受(可能尚未加载) writeJSON(w, http.StatusOK, map[string]interface{}{ "success": true, "message": "Inbound removed successfully", @@ -1599,16 +1592,16 @@ func (h *ManageHandler) manageInbound(w http.ResponseWriter, r *http.Request) { } } -// ================== Xray Outbounds Management ================== +// ================== Xray 出站管理 ================== -// OutboundRequest represents outbound management request +// OutboundRequest 表示出站管理请求。 type OutboundRequest struct { Action string `json:"action"` Outbound map[string]interface{} `json:"outbound,omitempty"` Tag string `json:"tag,omitempty"` } -// HandleOutbounds handles outbound management +// 处理出站管理请求。 func (h *ManageHandler) HandleOutbounds(w http.ResponseWriter, r *http.Request) { if !h.authenticate(r) { writeError(w, http.StatusUnauthorized, "Unauthorized") @@ -1673,7 +1666,7 @@ func (h *ManageHandler) manageOutbound(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - clients, err := xrpc.New(ctx, "127.0.0.1", uint16(apiPort)) + clients, err := xrpc.New(ctx, constants.LocalhostIP, uint16(apiPort)) if err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to connect to Xray: %v", err)) return @@ -1726,9 +1719,9 @@ func (h *ManageHandler) manageOutbound(w http.ResponseWriter, r *http.Request) { } } -// ================== Xray Routing Management ================== +// ================== Xray 路由管理 ================== -// RoutingRequest represents routing management request +// RoutingRequest 表示路由管理请求。 type RoutingRequest struct { Action string `json:"action"` Routing map[string]interface{} `json:"routing,omitempty"` @@ -1736,7 +1729,7 @@ type RoutingRequest struct { Index int `json:"index,omitempty"` } -// HandleRouting handles routing management +// 处理路由管理请求。 func (h *ManageHandler) HandleRouting(w http.ResponseWriter, r *http.Request) { if !h.authenticate(r) { writeError(w, http.StatusUnauthorized, "Unauthorized") @@ -1871,14 +1864,10 @@ func (h *ManageHandler) manageRouting(w http.ResponseWriter, r *http.Request) { }) } -// ================== Helper Functions ================== +// ================== 辅助函数 ================== func (h *ManageHandler) findXrayConfigPath() string { - configPaths := []string{ - "/usr/local/etc/xray/config.json", - "/etc/xray/config.json", - "/opt/xray/config.json", - } + configPaths := constants.DefaultXrayConfigPaths for _, p := range configPaths { if _, err := os.Stat(p); err == nil { @@ -2124,9 +2113,9 @@ func (h *ManageHandler) removeOutboundFromConfig(tag string) error { return os.WriteFile(configPath, newContent, 0644) } -// ================== Scan ================== +// ================== 扫描 ================== -// ScanResponse represents the response for scan operation +// ScanResponse 表示扫描接口响应。 type ScanResponse struct { Success bool `json:"success"` Message string `json:"message"` @@ -2139,7 +2128,7 @@ type ScanResponse struct { ConfigAddedSections []string `json:"config_added_sections,omitempty"` } -// HandleScan handles POST /api/child/scan +// 处理 POST /api/child/scan。 func (h *ManageHandler) HandleScan(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -2220,9 +2209,9 @@ func (h *ManageHandler) HandleScan(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, response) } -// ================== Xray Config Auto-Complete ================== +// ================== Xray 配置自动补全 ================== -// EnsureXrayConfigResult holds the result of config check +// EnsureXrayConfigResult 表示配置检查结果。 type EnsureXrayConfigResult struct { ConfigPath string `json:"config_path"` Modified bool `json:"modified"` @@ -2230,7 +2219,7 @@ type EnsureXrayConfigResult struct { Error string `json:"error,omitempty"` } -// EnsureXrayConfig checks and completes Xray configuration +// 检查并补全 Xray 配置。 func (h *ManageHandler) EnsureXrayConfig() *EnsureXrayConfigResult { result := &EnsureXrayConfigResult{} @@ -2279,7 +2268,7 @@ func (h *ManageHandler) EnsureXrayConfig() *EnsureXrayConfigResult { if _, ok := config["metrics"]; !ok { config["metrics"] = map[string]interface{}{ "tag": "Metrics", - "listen": "127.0.0.1:38889", + "listen": constants.DefaultMetricsListen, } result.AddedSections = append(result.AddedSections, "metrics") modified = true @@ -2376,10 +2365,10 @@ func (h *ManageHandler) addAPIInbound(config map[string]interface{}) { apiInbound := map[string]interface{}{ "tag": "api", "port": float64(46736), - "listen": "127.0.0.1", + "listen": constants.LocalhostIP, "protocol": "dokodemo-door", "settings": map[string]interface{}{ - "address": "127.0.0.1", + "address": constants.LocalhostIP, }, } @@ -2432,9 +2421,9 @@ func (h *ManageHandler) addAPIRoutingRule(config map[string]interface{}) { routing["rules"] = append([]interface{}{apiRule}, rules...) } -// ================== Certificate Deploy ================== +// ================== 证书部署 ================== -// CertDeployRequest represents a certificate deploy request from master +// CertDeployRequest 表示主控端下发的证书部署请求。 type CertDeployRequest struct { Domain string `json:"domain"` CertPEM string `json:"cert_pem"` @@ -2444,7 +2433,7 @@ type CertDeployRequest struct { Reload string `json:"reload"` // nginx, xray, both, none } -// HandleCertDeploy handles POST /api/child/cert/deploy +// 处理 POST /api/child/cert/deploy。 func (h *ManageHandler) HandleCertDeploy(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "method not allowed") @@ -2513,9 +2502,9 @@ func deployNginxSSLConfig(domain string) { return } - confDir := "/usr/local/nginx/conf" + confDir := constants.NginxConfigDirPaths[4] if _, err := os.Stat(confDir); err != nil { - confDir = "/etc/nginx" + confDir = constants.NginxConfigDirPaths[0] } certDir := filepath.Join(confDir, "cert") @@ -2539,7 +2528,7 @@ func deployNginxSSLConfig(domain string) { } `, domain, domain, domain) - // Write to conf.d or append via include + // 写入 conf.d,或通过 include 挂载 confDDir := filepath.Join(confDir, "conf.d") os.MkdirAll(confDDir, 0755) sslConfPath := filepath.Join(confDDir, "ssl.conf") @@ -2549,7 +2538,7 @@ func deployNginxSSLConfig(domain string) { return } - // Ensure main nginx.conf includes conf.d/*.conf + // 确保主 nginx.conf 包含 conf.d/*.conf mainConf := filepath.Join(confDir, "nginx.conf") content, err := os.ReadFile(mainConf) if err != nil { @@ -2559,7 +2548,7 @@ func deployNginxSSLConfig(domain string) { includeDirective := "include conf.d/*.conf;" if !strings.Contains(string(content), includeDirective) { - // Insert include before the last closing brace of http block + // 在 http 块最后一个右括号前插入 include text := string(content) lastBrace := strings.LastIndex(text, "}") if lastBrace > 0 { @@ -2574,8 +2563,8 @@ func deployNginxSSLConfig(domain string) { log.Printf("[Manage] Nginx SSL config deployed for domain %s at %s", domain, sslConfPath) } -// HandleNginxSetupSSL handles POST /api/child/nginx/setup-ssl -// Deploys nginx.conf + domain server block to servers/{domain}.conf. +// HandleNginxSetupSSL 处理 POST /api/child/nginx/setup-ssl。 +// 部署 nginx.conf 和 servers/{domain}.conf。 func (h *ManageHandler) HandleNginxSetupSSL(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { writeError(w, http.StatusMethodNotAllowed, "Method not allowed") @@ -2598,17 +2587,17 @@ func (h *ManageHandler) HandleNginxSetupSSL(w http.ResponseWriter, r *http.Reque domain := strings.ToLower(strings.TrimSpace(req.Domain)) - confDir := "/usr/local/nginx" + confDir := constants.NginxPrimaryPrefixDir if _, err := os.Stat(confDir); err != nil { - confDir = "/etc/nginx" + confDir = constants.NginxConfigDirPaths[0] } - // Ensure cert and servers directories exist + // 确保证书和 servers 目录存在 os.MkdirAll(filepath.Join(confDir, "cert"), 0755) os.MkdirAll(filepath.Join(confDir, "servers"), 0755) if req.NginxConfig != "" { - // Deploy base nginx.conf + // 下发主 nginx.conf mainConf := filepath.Join(confDir, "nginx.conf") if content, err := os.ReadFile(mainConf); err == nil { os.WriteFile(mainConf+".bak."+time.Now().Format("20060102150405"), content, 0644) @@ -2621,7 +2610,7 @@ func (h *ManageHandler) HandleNginxSetupSSL(w http.ResponseWriter, r *http.Reque } if req.DomainConfig != "" { - // Deploy domain-specific server block to servers/{domain}.conf + // 下发域名 server 配置到 servers/{domain}.conf domainConfPath := filepath.Join(confDir, "servers", domain+".conf") if err := os.WriteFile(domainConfPath, []byte(req.DomainConfig), 0644); err != nil { writeError(w, http.StatusInternalServerError, fmt.Sprintf("Failed to write domain config: %v", err)) @@ -2629,11 +2618,11 @@ func (h *ManageHandler) HandleNginxSetupSSL(w http.ResponseWriter, r *http.Reque } log.Printf("[Manage] Domain config deployed at %s", domainConfPath) } else { - // Fallback: legacy behavior + // 兜底:沿用旧逻辑 deployNginxSSLConfig(domain) } - // Reload nginx to apply + // 重载 nginx 使配置生效 if err := reloadNginx(); err != nil { log.Printf("[Manage] Nginx reload after setup-ssl failed: %v", err) } @@ -2645,7 +2634,7 @@ func (h *ManageHandler) HandleNginxSetupSSL(w http.ResponseWriter, r *http.Reque } func reloadNginx() error { - for _, bin := range []string{"/usr/local/nginx/sbin/nginx", "nginx"} { + for _, bin := range constants.NginxBinarySearchPaths { if path, err := exec.LookPath(bin); err == nil { return runCommand(path, "-s", "reload") } @@ -2660,11 +2649,11 @@ func runCommand(name string, args ...string) error { return nil } -// deployDefaultXrayConfig deploys the embedded default xray config if no config exists. +// 在缺失配置时下发内置默认配置。 func (h *ManageHandler) deployDefaultXrayConfig() { - configPath := "/usr/local/etc/xray/config.json" + configPath := constants.DefaultXrayConfigPaths[0] if _, err := os.Stat(configPath); err == nil { - // Config already exists — run EnsureXrayConfig to add missing sections + // 配置已存在,执行 EnsureXrayConfig 补齐缺失段 result := h.EnsureXrayConfig() if result.Modified { log.Printf("[Manage] Xray config updated after install: added %v", result.AddedSections) @@ -2685,7 +2674,7 @@ func (h *ManageHandler) deployDefaultXrayConfig() { exec.Command("systemctl", "restart", "xray").Run() } -// ================== SSE Streaming Install/Remove ================== +// ================== SSE 流式安装/卸载 ================== func sseStreamCmd(w http.ResponseWriter, r *http.Request, cmd *exec.Cmd, completeMsg string) { w.Header().Set("Content-Type", "text/event-stream") @@ -2775,7 +2764,7 @@ func (h *ManageHandler) HandleXrayInstallStream(w http.ResponseWriter, r *http.R cmd.Env = os.Environ() sseStreamCmd(w, r, cmd, "Xray installed successfully") - // Deploy default config after install + // 安装完成后下发默认配置 h.deployDefaultXrayConfig() } diff --git a/internal/handler/api.go b/internal/handler/pull_api_handler.go similarity index 69% rename from internal/handler/api.go rename to internal/handler/pull_api_handler.go index 4a90950..606fd33 100644 --- a/internal/handler/api.go +++ b/internal/handler/pull_api_handler.go @@ -7,16 +7,16 @@ import ( "strings" "mmw-agent/internal/agent" - "mmw-agent/internal/config" + "mmw-agent/internal/constants" ) -// APIHandler handles API requests from the master server (for pull mode) +// APIHandler 处理来自主控端的请求(拉取模式)。 type APIHandler struct { client *agent.Client configToken string } -// NewAPIHandler creates a new API handler +// 创建 API 处理器。 func NewAPIHandler(client *agent.Client, configToken string) *APIHandler { return &APIHandler{ client: client, @@ -24,7 +24,7 @@ func NewAPIHandler(client *agent.Client, configToken string) *APIHandler { } } -// ServeHTTP handles the HTTP request for traffic data +// 返回流量数据。 func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) @@ -32,7 +32,7 @@ func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if !h.authenticate(r) { - w.Header().Set("Content-Type", "application/json") + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) w.WriteHeader(http.StatusUnauthorized) json.NewEncoder(w).Encode(map[string]interface{}{ "success": false, @@ -44,7 +44,7 @@ func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { stats, err := h.client.GetStats() if err != nil { log.Printf("[API] Failed to get stats: %v", err) - w.Header().Set("Content-Type", "application/json") + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) w.WriteHeader(http.StatusInternalServerError) json.NewEncoder(w).Encode(map[string]interface{}{ "success": false, @@ -53,14 +53,14 @@ func (h *APIHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - w.Header().Set("Content-Type", "application/json") + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) json.NewEncoder(w).Encode(map[string]interface{}{ "success": true, "stats": stats, }) } -// ServeSpeedHTTP handles the HTTP request for speed data +// 返回速率数据。 func (h *APIHandler) ServeSpeedHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) @@ -68,7 +68,7 @@ func (h *APIHandler) ServeSpeedHTTP(w http.ResponseWriter, r *http.Request) { } if !h.authenticate(r) { - w.Header().Set("Content-Type", "application/json") + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) w.WriteHeader(http.StatusUnauthorized) json.NewEncoder(w).Encode(map[string]interface{}{ "success": false, @@ -79,7 +79,7 @@ func (h *APIHandler) ServeSpeedHTTP(w http.ResponseWriter, r *http.Request) { uploadSpeed, downloadSpeed := h.client.GetSpeed() - w.Header().Set("Content-Type", "application/json") + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) json.NewEncoder(w).Encode(map[string]interface{}{ "success": true, "upload_speed": uploadSpeed, @@ -87,9 +87,9 @@ func (h *APIHandler) ServeSpeedHTTP(w http.ResponseWriter, r *http.Request) { }) } -// authenticate checks if the request is authorized (token + User-Agent) +// 校验请求身份(token + User-Agent)。 func (h *APIHandler) authenticate(r *http.Request) bool { - if r.Header.Get("User-Agent") != config.AgentUserAgent { + if r.Header.Get(constants.HeaderUserAgent) != constants.AgentUserAgent { return false } @@ -97,13 +97,13 @@ func (h *APIHandler) authenticate(r *http.Request) bool { return true } - auth := r.Header.Get("Authorization") + auth := r.Header.Get(constants.HeaderAuthorization) if auth == "" { return false } - if strings.HasPrefix(auth, "Bearer ") { - token := strings.TrimPrefix(auth, "Bearer ") + if strings.HasPrefix(auth, constants.BearerPrefix) { + token := strings.TrimPrefix(auth, constants.BearerPrefix) return token == h.configToken } diff --git a/internal/handler/routes.go b/internal/handler/routes.go new file mode 100644 index 0000000..7fa0f89 --- /dev/null +++ b/internal/handler/routes.go @@ -0,0 +1,41 @@ +package handler + +import ( + "net/http" + + "mmw-agent/internal/constants" +) + +// 注册子端 API 路由 +func RegisterChildRoutes(mux *http.ServeMux, apiHandler *APIHandler, manageHandler *ManageHandler) { + // 拉取模式数据接口 + mux.HandleFunc(constants.PathChildTraffic, apiHandler.ServeHTTP) + mux.HandleFunc(constants.PathChildSpeed, apiHandler.ServeSpeedHTTP) + + // 管理接口 + mux.HandleFunc(constants.PathChildServiceStats, manageHandler.HandleServicesStatus) + mux.HandleFunc(constants.PathChildServiceCtl, manageHandler.HandleServiceControl) + mux.HandleFunc(constants.PathChildXrayInstall, manageHandler.HandleXrayInstall) + mux.HandleFunc(constants.PathChildXrayRemove, manageHandler.HandleXrayRemove) + mux.HandleFunc(constants.PathChildXrayConfig, manageHandler.HandleXrayConfig) + mux.HandleFunc(constants.PathChildXraySysCfg, manageHandler.HandleXraySystemConfig) + mux.HandleFunc(constants.PathChildXrayCfgFiles, manageHandler.HandleXrayConfigFiles) + mux.HandleFunc(constants.PathChildNginxInstall, manageHandler.HandleNginxInstall) + mux.HandleFunc(constants.PathChildNginxRemove, manageHandler.HandleNginxRemove) + mux.HandleFunc(constants.PathChildNginxConfig, manageHandler.HandleNginxConfig) + mux.HandleFunc(constants.PathChildNginxCfgFile, manageHandler.HandleNginxConfigFiles) + mux.HandleFunc(constants.PathChildSystemInfo, manageHandler.HandleSystemInfo) + mux.HandleFunc(constants.PathChildInbounds, manageHandler.HandleInbounds) + mux.HandleFunc(constants.PathChildOutbounds, manageHandler.HandleOutbounds) + mux.HandleFunc(constants.PathChildRouting, manageHandler.HandleRouting) + mux.HandleFunc(constants.PathChildScan, manageHandler.HandleScan) + mux.HandleFunc(constants.PathChildCertDeploy, manageHandler.HandleCertDeploy) + mux.HandleFunc(constants.PathChildNginxSetup, manageHandler.HandleNginxSetupSSL) + mux.HandleFunc(constants.PathChildDomainProbe, manageHandler.HandleDomainLatencyProbe) + + // SSE 流式安装和卸载接口 + mux.HandleFunc(constants.PathChildXrayInstallStream, manageHandler.HandleXrayInstallStream) + mux.HandleFunc(constants.PathChildXrayRemoveStream, manageHandler.HandleXrayRemoveStream) + mux.HandleFunc(constants.PathChildNginxInstallSSE, manageHandler.HandleNginxInstallStream) + mux.HandleFunc(constants.PathChildNginxRemoveSSE, manageHandler.HandleNginxRemoveStream) +} diff --git a/internal/handler/embed.go b/internal/handler/xray_default_config_embed.go similarity index 100% rename from internal/handler/embed.go rename to internal/handler/xray_default_config_embed.go diff --git a/internal/xrpc/client.go b/internal/xrpc/client.go index aa3ec0e..6253446 100644 --- a/internal/xrpc/client.go +++ b/internal/xrpc/client.go @@ -11,7 +11,7 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -// Clients groups the gRPC stubs that the samples rely on. +// Clients 汇总当前用到的 gRPC 客户端。 type Clients struct { Connection *grpc.ClientConn Handler handlerpb.HandlerServiceClient @@ -19,7 +19,7 @@ type Clients struct { Stats statspb.StatsServiceClient } -// New establishes an insecure (plaintext) connection against a running Xray API endpoint. +// 连接到运行中的 Xray API,默认使用明文连接。 func New(ctx context.Context, addr string, port uint16, dialOpts ...grpc.DialOption) (*Clients, error) { target := fmt.Sprintf("%s:%d", addr, port) opts := []grpc.DialOption{ diff --git a/internal/xrpc/services/handler/inbound.go b/internal/xrpc/services/handler/inbound.go index f8fc4c9..ff20a4f 100644 --- a/internal/xrpc/services/handler/inbound.go +++ b/internal/xrpc/services/handler/inbound.go @@ -21,7 +21,7 @@ import ( vmessin "github.com/xtls/xray-core/proxy/vmess/inbound" ) -// AddVMessInbound demonstrates HandlerServiceClient.AddInbound for VMess inbound. +// 添加 VMess 入站示例。 func AddVMessInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32) error { inbound := inboundConfig( tag, @@ -46,7 +46,7 @@ func AddVMessInbound(ctx context.Context, client command.HandlerServiceClient, t return err } -// AddVLESSInbound adds a VLESS inbound with Vision style fallbacks. +// 添加带 Vision 回落配置的 VLESS 入站。 func AddVLESSInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32) error { inbound := inboundConfig( tag, @@ -80,7 +80,7 @@ func AddVLESSInbound(ctx context.Context, client command.HandlerServiceClient, t return err } -// AddTrojanInbound registers a Trojan inbound with two users and ALPN fallback. +// 添加带双用户和 ALPN 回落的 Trojan 入站。 func AddTrojanInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32) error { inbound := inboundConfig( tag, @@ -115,7 +115,7 @@ func AddTrojanInbound(ctx context.Context, client command.HandlerServiceClient, return err } -// AddShadowsocksInbound adds an AEAD Shadowsocks inbound supporting both TCP and UDP. +// 添加支持 TCP/UDP 的 AEAD Shadowsocks 入站。 func AddShadowsocksInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32) error { inbound := inboundConfig( tag, @@ -137,7 +137,7 @@ func AddShadowsocksInbound(ctx context.Context, client command.HandlerServiceCli return err } -// AddShadowsocks2022Inbound covers both single user and multi-user deployment. +// 添加 Shadowsocks 2022 入站,兼容单用户和多用户场景。 func AddShadowsocks2022Inbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32) error { server := &ss2022.MultiUserServerConfig{ Method: "2022-blake3-aes-128-gcm", @@ -168,7 +168,7 @@ func AddShadowsocks2022Inbound(ctx context.Context, client command.HandlerServic return err } -// AddSocksInbound exposes a SOCKS5 server with username/password authentication. +// 添加带账号密码认证的 SOCKS5 入站。 func AddSocksInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32) error { inbound := inboundConfig( tag, @@ -184,7 +184,7 @@ func AddSocksInbound(ctx context.Context, client command.HandlerServiceClient, t return err } -// AddHTTPInbound adds an HTTP proxy inbound with basic auth. +// 添加带基础认证的 HTTP 入站。 func AddHTTPInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32) error { inbound := inboundConfig( tag, @@ -199,7 +199,7 @@ func AddHTTPInbound(ctx context.Context, client command.HandlerServiceClient, ta return err } -// AddDokodemoInbound configures a dokodemo-door mirror port. +// 添加 dokodemo-door 入站并转发到目标端口。 func AddDokodemoInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32, targetPort uint32) error { inbound := inboundConfig( tag, @@ -216,7 +216,7 @@ func AddDokodemoInbound(ctx context.Context, client command.HandlerServiceClient return err } -// AddDNSInbound exposes the built-in DNS server on an API port. +// 添加内置 DNS 入站。 func AddDNSInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32) error { inbound := inboundConfig( tag, @@ -235,7 +235,7 @@ func AddDNSInbound(ctx context.Context, client command.HandlerServiceClient, tag return err } -// AddLoopbackInbound ties an inbound to an existing outbound chain. +// 添加 loopback 入站并绑定已有出站链路。 func AddLoopbackInbound(ctx context.Context, client command.HandlerServiceClient, tag string, port uint32, targetInbound string) error { inbound := inboundConfig( tag, @@ -245,4 +245,3 @@ func AddLoopbackInbound(ctx context.Context, client command.HandlerServiceClient _, err := client.AddInbound(ctx, &command.AddInboundRequest{Inbound: inbound}) return err } - diff --git a/internal/xrpc/services/handler/outbound.go b/internal/xrpc/services/handler/outbound.go index d828f15..68490ca 100644 --- a/internal/xrpc/services/handler/outbound.go +++ b/internal/xrpc/services/handler/outbound.go @@ -195,4 +195,3 @@ func AddVMessOutbound(ctx context.Context, client command.HandlerServiceClient, _, err := client.AddOutbound(ctx, &command.AddOutboundRequest{Outbound: cfg}) return err } - diff --git a/internal/xrpc/services/handler/users.go b/internal/xrpc/services/handler/users.go index 8400960..b7aff20 100644 --- a/internal/xrpc/services/handler/users.go +++ b/internal/xrpc/services/handler/users.go @@ -13,7 +13,7 @@ import ( "github.com/xtls/xray-core/proxy/vmess" ) -// AddVMessUser demonstrates AlterInbound(AddUserOperation) for VMess. +// 通过 AlterInbound(AddUserOperation) 为 VMess 入站加用户。 func AddVMessUser(ctx context.Context, client command.HandlerServiceClient, inboundTag, email string) error { req := &command.AlterInboundRequest{ Tag: inboundTag, @@ -34,7 +34,7 @@ func AddVMessUser(ctx context.Context, client command.HandlerServiceClient, inbo return err } -// AddVLESSUser shows how to add VLESS users dynamically. +// 为 VLESS 入站动态加用户。 func AddVLESSUser(ctx context.Context, client command.HandlerServiceClient, inboundTag, email string) error { req := &command.AlterInboundRequest{ Tag: inboundTag, @@ -53,7 +53,7 @@ func AddVLESSUser(ctx context.Context, client command.HandlerServiceClient, inbo return err } -// AddTrojanUser adds a Trojan password to an inbound handler. +// 为 Trojan 入站加密码用户。 func AddTrojanUser(ctx context.Context, client command.HandlerServiceClient, inboundTag, email, password string) error { req := &command.AlterInboundRequest{ Tag: inboundTag, @@ -71,7 +71,7 @@ func AddTrojanUser(ctx context.Context, client command.HandlerServiceClient, inb return err } -// AddShadowsocksUser sets up a Shadowsocks AEAD credential. +// 为 Shadowsocks 入站加 AEAD 凭据。 func AddShadowsocksUser(ctx context.Context, client command.HandlerServiceClient, inboundTag, email, password string) error { req := &command.AlterInboundRequest{ Tag: inboundTag, @@ -90,7 +90,7 @@ func AddShadowsocksUser(ctx context.Context, client command.HandlerServiceClient return err } -// AddShadowsocks2022User covers key rotation for SS2022. +// 为 SS2022 入站新增用户密钥。 func AddShadowsocks2022User(ctx context.Context, client command.HandlerServiceClient, inboundTag, email string) error { req := &command.AlterInboundRequest{ Tag: inboundTag, @@ -107,7 +107,7 @@ func AddShadowsocks2022User(ctx context.Context, client command.HandlerServiceCl return err } -// RemoveUser removes any user (identified by email) from an inbound. +// 按邮箱从入站移除用户。 func RemoveUser(ctx context.Context, client command.HandlerServiceClient, inboundTag, email string) error { req := &command.AlterInboundRequest{ Tag: inboundTag, diff --git a/internal/xrpc/services/logger/logger.go b/internal/xrpc/services/logger/logger.go index 561f974..085da51 100644 --- a/internal/xrpc/services/logger/logger.go +++ b/internal/xrpc/services/logger/logger.go @@ -2,14 +2,15 @@ package logger import ( "context" - "time" + + "mmw-agent/internal/constants" loggerpb "github.com/xtls/xray-core/app/log/command" ) -// RestartLogger triggers the LoggerService restartLogger RPC and waits for completion. +// 调用 LoggerService 的 restartLogger 接口并等待完成。 func RestartLogger(ctx context.Context, client loggerpb.LoggerServiceClient) error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, constants.DefaultRPCShortTimeout) defer cancel() _, err := client.RestartLogger(ctx, &loggerpb.RestartLoggerRequest{}) return err diff --git a/internal/xrpc/services/stats/stats.go b/internal/xrpc/services/stats/stats.go index 3f8830e..c5e9d7d 100644 --- a/internal/xrpc/services/stats/stats.go +++ b/internal/xrpc/services/stats/stats.go @@ -2,13 +2,14 @@ package stats import ( "context" - "time" + + "mmw-agent/internal/constants" statspb "github.com/xtls/xray-core/app/stats/command" ) func QueryTraffic(ctx context.Context, client statspb.StatsServiceClient, pattern string, reset bool) (int64, error) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, constants.DefaultRPCShortTimeout) defer cancel() resp, err := client.QueryStats(ctx, &statspb.QueryStatsRequest{ Pattern: pattern, @@ -24,7 +25,7 @@ func QueryTraffic(ctx context.Context, client statspb.StatsServiceClient, patter } func GetSystemStats(ctx context.Context, client statspb.StatsServiceClient) (*statspb.SysStatsResponse, error) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, constants.DefaultRPCShortTimeout) defer cancel() return client.GetSysStats(ctx, &statspb.SysStatsRequest{}) }