diff --git a/server/gateway-proxy.js b/server/gateway-proxy.js index 1a82081..71e542e 100644 --- a/server/gateway-proxy.js +++ b/server/gateway-proxy.js @@ -6,8 +6,11 @@ const DEFAULT_UPSTREAM_HANDSHAKE_TIMEOUT_MS = 10_000; /** Maximum frame payload size (256 KB). */ const MAX_FRAME_SIZE = 256 * 1024; -/** Maximum frames per connection per second. */ -const MAX_FRAMES_PER_SECOND = 30; +/** Sustained frame rate per connection. */ +const MAX_FRAMES_PER_SECOND = 60; + +/** Allow short startup bursts before rate limiting. */ +const MAX_FRAME_BURST = 120; const buildErrorResponse = (id, code, message) => { return { @@ -28,19 +31,34 @@ const safeJsonParse = (raw) => { } }; -/** Per-connection frame rate limiter. */ -const createFrameRateLimiter = (maxPerSecond = MAX_FRAMES_PER_SECOND) => { - let count = 0; - const interval = setInterval(() => { - count = 0; - }, 1000); - interval.unref(); +/** Per-connection token bucket rate limiter. */ +const createFrameRateLimiter = ( + maxPerSecond = MAX_FRAMES_PER_SECOND, + maxBurst = MAX_FRAME_BURST +) => { + let tokens = maxBurst; + let lastRefillAt = Date.now(); + + const refill = () => { + const now = Date.now(); + const elapsedMs = Math.max(0, now - lastRefillAt); + if (elapsedMs <= 0) return; + const replenished = (elapsedMs / 1000) * maxPerSecond; + tokens = Math.min(maxBurst, tokens + replenished); + lastRefillAt = now; + }; + return { check() { - return ++count <= maxPerSecond; + refill(); + if (tokens < 1) { + return false; + } + tokens -= 1; + return true; }, destroy() { - clearInterval(interval); + // No-op: token bucket has no timers to clean up. }, }; }; @@ -318,6 +336,9 @@ function createGatewayProxy(options) { : Buffer.isBuffer(reasonBuffer) ? reasonBuffer.toString() : ""; + log( + `[gateway-proxy] upstream closed code=${code} reason=${reason || "(none)"} hadConnect=${Boolean(connectRequestId)} responseSent=${connectResponseSent}` + ); if (!connectRequestId) { pendingUpstreamSetupError ||= { code: "studio.upstream_closed", @@ -384,6 +405,13 @@ function createGatewayProxy(options) { // Rate limiting if (!frameRateLimiter.check()) { + log( + "[gateway-proxy] proxy rate limit hit (>" + + MAX_FRAMES_PER_SECOND + + " frames/s sustained, burst " + + MAX_FRAME_BURST + + ")" + ); closeBoth(1008, "rate limit exceeded"); return; } @@ -438,6 +466,7 @@ function createGatewayProxy(options) { }); browserWs.on("close", () => { + log("[gateway-proxy] browser disconnected"); closeBoth(1000, "client closed"); }); diff --git a/src/lib/gateway/GatewayClient.ts b/src/lib/gateway/GatewayClient.ts index 8cdbe5a..06c186a 100644 --- a/src/lib/gateway/GatewayClient.ts +++ b/src/lib/gateway/GatewayClient.ts @@ -265,6 +265,7 @@ export class GatewayClient { private rejectConnect: ((error: Error) => void) | null = null; private manualDisconnect = false; private lastHello: GatewayHelloOk | null = null; + private _lastDisconnectCode: number | null = null; onStatus(handler: StatusHandler) { this.statusHandlers.add(handler); @@ -323,6 +324,7 @@ export class GatewayClient { }, onClose: ({ code, reason }) => { if (this.client !== nextClient) return; + this._lastDisconnectCode = code; const connectFailed = code === CONNECT_FAILED_CLOSE_CODE ? parseConnectFailedCloseReason(reason) : null; const err = connectFailed @@ -414,6 +416,10 @@ export class GatewayClient { return this.lastHello; } + get lastDisconnectCode() { + return this._lastDisconnectCode; + } + private updateStatus(status: GatewayStatus) { this.status = status; this.statusHandlers.forEach((handler) => handler(status)); @@ -654,6 +660,10 @@ const isNonRetryableConnectErrorCode = (code: string | null): boolean => { return NON_RETRYABLE_CONNECT_ERROR_CODES.has(normalized); }; +/** WebSocket close code 1008 = policy violation (rate limit). */ +const WS_CLOSE_POLICY_VIOLATION = 1008; +const RATE_LIMIT_RETRY_DELAY_MS = 15_000; + export const resolveGatewayAutoRetryDelayMs = (params: { status: GatewayStatus; didAutoConnect: boolean; @@ -662,6 +672,7 @@ export const resolveGatewayAutoRetryDelayMs = (params: { gatewayUrl: string; errorMessage: string | null; connectErrorCode: string | null; + lastDisconnectCode: number | null; attempt: number; }): number | null => { if (params.status !== "disconnected") return null; @@ -673,8 +684,13 @@ export const resolveGatewayAutoRetryDelayMs = (params: { if (isNonRetryableConnectErrorCode(params.connectErrorCode)) return null; if (params.connectErrorCode === null && isAuthError(params.errorMessage)) return null; + const baseDelay = + params.lastDisconnectCode === WS_CLOSE_POLICY_VIOLATION + ? Math.max(INITIAL_RETRY_DELAY_MS, RATE_LIMIT_RETRY_DELAY_MS) + : INITIAL_RETRY_DELAY_MS; + return Math.min( - INITIAL_RETRY_DELAY_MS * Math.pow(1.5, params.attempt), + baseDelay * Math.pow(1.5, params.attempt), MAX_RETRY_DELAY_MS ); }; @@ -911,7 +927,6 @@ export const useGatewayConnection = ( setDetectedAdapterType("custom"); setStatus("connected"); setConnectErrorCode(null); - retryAttemptRef.current = 0; gatewayDebugLog("connect:custom-success", { gatewayUrl }); } catch (err) { setStatus("disconnected"); @@ -975,7 +990,6 @@ export const useGatewayConnection = ( ? hello.adapterType : "openclaw"; setDetectedAdapterType(nextDetectedAdapterType); - retryAttemptRef.current = 0; setHasLastKnownGoodState(nextDetectedAdapterType === selectedAdapterType); settingsCoordinator.schedulePatch({ gateway: { @@ -1037,6 +1051,7 @@ export const useGatewayConnection = ( gatewayUrl, errorMessage: error, connectErrorCode, + lastDisconnectCode: client.lastDisconnectCode, attempt, }); if (!isAutoManagedAdapter(selectedAdapterType)) return; @@ -1049,12 +1064,15 @@ export const useGatewayConnection = ( status, }); retryTimerRef.current = setTimeout(() => { + // Call connect first (it synchronously resets retryAttemptRef to 0), + // then override with the correct attempt count so the next auto-retry + // uses proper exponential backoff. + void connect(); retryAttemptRef.current = attempt + 1; gatewayDebugLog("auto-retry-fire", { selectedAdapterType, attempt: retryAttemptRef.current, }); - void connect(); }, delay); return () => { @@ -1065,11 +1083,17 @@ export const useGatewayConnection = ( }; }, [connect, connectErrorCode, error, gatewayUrl, selectedAdapterType, status]); - // Reset retry count on successful connection + // Reset retry count after the connection has been stable for a minimum + // duration. If the upstream drops the connection quickly (e.g. within a + // few seconds), keeping the current attempt count lets exponential backoff + // work properly instead of hammering the gateway every 2 seconds. useEffect(() => { if (status === "connected") { hasConnectedOnceRef.current = true; - retryAttemptRef.current = 0; + const stableTimer = setTimeout(() => { + retryAttemptRef.current = 0; + }, 10_000); + return () => clearTimeout(stableTimer); } }, [status]); diff --git a/tests/unit/gatewayConnectRetryPolicy.test.ts b/tests/unit/gatewayConnectRetryPolicy.test.ts index b5327dd..cf48b47 100644 --- a/tests/unit/gatewayConnectRetryPolicy.test.ts +++ b/tests/unit/gatewayConnectRetryPolicy.test.ts @@ -2,17 +2,24 @@ import { describe, expect, it } from "vitest"; import { resolveGatewayAutoRetryDelayMs } from "@/lib/gateway/GatewayClient"; +const baseParams = { + status: "disconnected" as const, + didAutoConnect: true, + hasConnectedOnce: true, + wasManualDisconnect: false, + gatewayUrl: "wss://remote.example", + errorMessage: null as string | null, + connectErrorCode: null as string | null, + lastDisconnectCode: null as number | null, + attempt: 0, +}; + describe("resolveGatewayAutoRetryDelayMs", () => { it("does not retry when upstream gateway url is missing on Studio host", () => { const delay = resolveGatewayAutoRetryDelayMs({ - status: "disconnected", - didAutoConnect: true, - hasConnectedOnce: true, - wasManualDisconnect: false, - gatewayUrl: "wss://remote.example", + ...baseParams, errorMessage: "Gateway error (studio.gateway_url_missing): Upstream gateway URL is missing.", connectErrorCode: "studio.gateway_url_missing", - attempt: 0, }); expect(delay).toBeNull(); @@ -20,15 +27,10 @@ describe("resolveGatewayAutoRetryDelayMs", () => { it("does not retry when the upstream websocket upgrade fails", () => { const delay = resolveGatewayAutoRetryDelayMs({ - status: "disconnected", - didAutoConnect: true, - hasConnectedOnce: true, - wasManualDisconnect: false, - gatewayUrl: "wss://remote.example", + ...baseParams, errorMessage: "Gateway error (studio.upstream_error): Failed to connect to upstream gateway WebSocket.", connectErrorCode: "studio.upstream_error", - attempt: 0, }); expect(delay).toBeNull(); @@ -36,15 +38,10 @@ describe("resolveGatewayAutoRetryDelayMs", () => { it("does not retry when the upstream websocket handshake times out", () => { const delay = resolveGatewayAutoRetryDelayMs({ - status: "disconnected", - didAutoConnect: true, - hasConnectedOnce: true, - wasManualDisconnect: false, - gatewayUrl: "wss://remote.example", + ...baseParams, errorMessage: "Gateway error (studio.upstream_timeout): Timed out connecting Studio to the upstream gateway WebSocket.", connectErrorCode: "studio.upstream_timeout", - attempt: 0, }); expect(delay).toBeNull(); @@ -52,18 +49,43 @@ describe("resolveGatewayAutoRetryDelayMs", () => { it("does not retry when the upstream gateway explicitly rejects pairing", () => { const delay = resolveGatewayAutoRetryDelayMs({ - status: "disconnected", - didAutoConnect: true, - hasConnectedOnce: true, - wasManualDisconnect: false, - gatewayUrl: "wss://remote.example", + ...baseParams, errorMessage: "Gateway error (studio.upstream_rejected): Upstream gateway rejected connect (1008): pairing required.", connectErrorCode: "studio.upstream_rejected", - attempt: 0, }); expect(delay).toBeNull(); }); + + it("uses a longer base delay when disconnected by rate limiting (code 1008)", () => { + const delay = resolveGatewayAutoRetryDelayMs({ + ...baseParams, + lastDisconnectCode: 1008, + attempt: 0, + }); + + expect(delay).toBe(15_000); + }); + + it("applies exponential backoff on top of rate-limit base delay", () => { + const delay = resolveGatewayAutoRetryDelayMs({ + ...baseParams, + lastDisconnectCode: 1008, + attempt: 1, + }); + + expect(delay).toBe(22_500); + }); + + it("uses standard base delay for normal disconnects", () => { + const delay = resolveGatewayAutoRetryDelayMs({ + ...baseParams, + lastDisconnectCode: 1012, + attempt: 0, + }); + + expect(delay).toBe(2_000); + }); }); diff --git a/tests/unit/gatewayProxy.test.ts b/tests/unit/gatewayProxy.test.ts index 1ff5153..7d2cd8c 100644 --- a/tests/unit/gatewayProxy.test.ts +++ b/tests/unit/gatewayProxy.test.ts @@ -712,4 +712,165 @@ describe("createGatewayProxy", () => { } }); + it("allows short bursts of post-connect traffic without closing the socket", async () => { + const upstream = new WebSocketServer({ port: 0 }); + const address = upstream.address(); + if (!address || typeof address === "string") { + throw new Error("expected upstream server to have a port"); + } + const upstreamUrl = `ws://127.0.0.1:${address.port}`; + + upstream.on("connection", (ws) => { + ws.on("message", (raw) => { + const parsed = JSON.parse(String(raw)); + if (parsed?.method === "connect") { + ws.send( + JSON.stringify({ + type: "res", + id: parsed.id, + ok: true, + payload: { type: "hello-ok", protocol: 3, auth: {} }, + }) + ); + } + }); + }); + + const { createGatewayProxy } = await import("../../server/gateway-proxy"); + + const proxyHttp = await import("node:http").then((m) => m.createServer()); + const proxy = createGatewayProxy({ + loadUpstreamSettings: async () => ({ url: upstreamUrl, token: "host-token-456" }), + allowWs: (req: { url?: string }) => req.url === "/api/gateway/ws", + logError: () => {}, + }); + proxyHttp.on("upgrade", (req, socket, head) => proxy.handleUpgrade(req, socket, head)); + + await new Promise((resolve) => proxyHttp.listen(0, "127.0.0.1", resolve)); + const proxyAddr = proxyHttp.address(); + if (!proxyAddr || typeof proxyAddr === "string") { + throw new Error("expected proxy server to have a port"); + } + + const browser = new WebSocket(`ws://127.0.0.1:${proxyAddr.port}/api/gateway/ws`); + try { + await waitForEvent(browser, "open"); + browser.send( + JSON.stringify({ + type: "req", + id: "connect-burst-ok", + method: "connect", + params: { auth: {} }, + }) + ); + + await waitForEvent(browser, "message"); + + for (let index = 0; index < 80; index += 1) { + browser.send( + JSON.stringify({ + type: "req", + id: `burst-ok-${index}`, + method: "noop", + params: { index }, + }) + ); + } + + await new Promise((resolve) => setTimeout(resolve, 25)); + + expect(browser.readyState).toBe(WebSocket.OPEN); + } finally { + for (const client of upstream.clients) { + client.close(); + } + await Promise.all([ + closeWebSocket(browser), + closeWebSocketServer(upstream), + closeHttpServer(proxyHttp), + ]); + } + }); + + it("still rate limits abusive bursts that exceed the token bucket", async () => { + const upstream = new WebSocketServer({ port: 0 }); + const address = upstream.address(); + if (!address || typeof address === "string") { + throw new Error("expected upstream server to have a port"); + } + const upstreamUrl = `ws://127.0.0.1:${address.port}`; + + upstream.on("connection", (ws) => { + ws.on("message", (raw) => { + const parsed = JSON.parse(String(raw)); + if (parsed?.method === "connect") { + ws.send( + JSON.stringify({ + type: "res", + id: parsed.id, + ok: true, + payload: { type: "hello-ok", protocol: 3, auth: {} }, + }) + ); + } + }); + }); + + const { createGatewayProxy } = await import("../../server/gateway-proxy"); + + const proxyHttp = await import("node:http").then((m) => m.createServer()); + const proxy = createGatewayProxy({ + loadUpstreamSettings: async () => ({ url: upstreamUrl, token: "host-token-456" }), + allowWs: (req: { url?: string }) => req.url === "/api/gateway/ws", + logError: () => {}, + }); + proxyHttp.on("upgrade", (req, socket, head) => proxy.handleUpgrade(req, socket, head)); + + await new Promise((resolve) => proxyHttp.listen(0, "127.0.0.1", resolve)); + const proxyAddr = proxyHttp.address(); + if (!proxyAddr || typeof proxyAddr === "string") { + throw new Error("expected proxy server to have a port"); + } + + const browser = new WebSocket(`ws://127.0.0.1:${proxyAddr.port}/api/gateway/ws`); + try { + await waitForEvent(browser, "open"); + browser.send( + JSON.stringify({ + type: "req", + id: "connect-burst-limit", + method: "connect", + params: { auth: {} }, + }) + ); + + await waitForEvent(browser, "message"); + const closePromise = waitForEvent<[number, Buffer]>(browser, "close"); + + for (let index = 0; index < 200; index += 1) { + browser.send( + JSON.stringify({ + type: "req", + id: `burst-limit-${index}`, + method: "noop", + params: { index }, + }) + ); + } + + const [closeCode, closeReason] = await closePromise; + expect(closeCode).toBe(1008); + expect(closeReason.toString()).toBe("rate limit exceeded"); + } finally { + for (const client of upstream.clients) { + client.close(); + } + await Promise.all([ + closeWebSocket(browser), + closeWebSocketServer(upstream), + closeHttpServer(proxyHttp), + ]); + } + }); + });