fix(gateway): tolerate reconnect bursts without dropping sessions (#100)

Relax the proxy frame limiter to allow normal startup traffic while preserving abuse protection, and slow reconnect retries after policy-violation disconnects so remote gateways can recover cleanly.

Made-with: Cursor

Co-authored-by: iamlukethedev <lucas.guilherme@smartwayslfl.com>
This commit is contained in:
Luke The Dev
2026-04-03 23:19:59 -05:00
committed by GitHub
parent a18c8c630c
commit b573646f2d
4 changed files with 277 additions and 41 deletions
+40 -11
View File
@@ -6,8 +6,11 @@ const DEFAULT_UPSTREAM_HANDSHAKE_TIMEOUT_MS = 10_000;
/** Maximum frame payload size (256 KB). */ /** Maximum frame payload size (256 KB). */
const MAX_FRAME_SIZE = 256 * 1024; const MAX_FRAME_SIZE = 256 * 1024;
/** Maximum frames per connection per second. */ /** Sustained frame rate per connection. */
const MAX_FRAMES_PER_SECOND = 30; const MAX_FRAMES_PER_SECOND = 60;
/** Allow short startup bursts before rate limiting. */
const MAX_FRAME_BURST = 120;
const buildErrorResponse = (id, code, message) => { const buildErrorResponse = (id, code, message) => {
return { return {
@@ -28,19 +31,34 @@ const safeJsonParse = (raw) => {
} }
}; };
/** Per-connection frame rate limiter. */ /** Per-connection token bucket rate limiter. */
const createFrameRateLimiter = (maxPerSecond = MAX_FRAMES_PER_SECOND) => { const createFrameRateLimiter = (
let count = 0; maxPerSecond = MAX_FRAMES_PER_SECOND,
const interval = setInterval(() => { maxBurst = MAX_FRAME_BURST
count = 0; ) => {
}, 1000); let tokens = maxBurst;
interval.unref(); let lastRefillAt = Date.now();
const refill = () => {
const now = Date.now();
const elapsedMs = Math.max(0, now - lastRefillAt);
if (elapsedMs <= 0) return;
const replenished = (elapsedMs / 1000) * maxPerSecond;
tokens = Math.min(maxBurst, tokens + replenished);
lastRefillAt = now;
};
return { return {
check() { check() {
return ++count <= maxPerSecond; refill();
if (tokens < 1) {
return false;
}
tokens -= 1;
return true;
}, },
destroy() { destroy() {
clearInterval(interval); // No-op: token bucket has no timers to clean up.
}, },
}; };
}; };
@@ -318,6 +336,9 @@ function createGatewayProxy(options) {
: Buffer.isBuffer(reasonBuffer) : Buffer.isBuffer(reasonBuffer)
? reasonBuffer.toString() ? reasonBuffer.toString()
: ""; : "";
log(
`[gateway-proxy] upstream closed code=${code} reason=${reason || "(none)"} hadConnect=${Boolean(connectRequestId)} responseSent=${connectResponseSent}`
);
if (!connectRequestId) { if (!connectRequestId) {
pendingUpstreamSetupError ||= { pendingUpstreamSetupError ||= {
code: "studio.upstream_closed", code: "studio.upstream_closed",
@@ -384,6 +405,13 @@ function createGatewayProxy(options) {
// Rate limiting // Rate limiting
if (!frameRateLimiter.check()) { if (!frameRateLimiter.check()) {
log(
"[gateway-proxy] proxy rate limit hit (>" +
MAX_FRAMES_PER_SECOND +
" frames/s sustained, burst " +
MAX_FRAME_BURST +
")"
);
closeBoth(1008, "rate limit exceeded"); closeBoth(1008, "rate limit exceeded");
return; return;
} }
@@ -438,6 +466,7 @@ function createGatewayProxy(options) {
}); });
browserWs.on("close", () => { browserWs.on("close", () => {
log("[gateway-proxy] browser disconnected");
closeBoth(1000, "client closed"); closeBoth(1000, "client closed");
}); });
+29 -5
View File
@@ -265,6 +265,7 @@ export class GatewayClient {
private rejectConnect: ((error: Error) => void) | null = null; private rejectConnect: ((error: Error) => void) | null = null;
private manualDisconnect = false; private manualDisconnect = false;
private lastHello: GatewayHelloOk | null = null; private lastHello: GatewayHelloOk | null = null;
private _lastDisconnectCode: number | null = null;
onStatus(handler: StatusHandler) { onStatus(handler: StatusHandler) {
this.statusHandlers.add(handler); this.statusHandlers.add(handler);
@@ -323,6 +324,7 @@ export class GatewayClient {
}, },
onClose: ({ code, reason }) => { onClose: ({ code, reason }) => {
if (this.client !== nextClient) return; if (this.client !== nextClient) return;
this._lastDisconnectCode = code;
const connectFailed = const connectFailed =
code === CONNECT_FAILED_CLOSE_CODE ? parseConnectFailedCloseReason(reason) : null; code === CONNECT_FAILED_CLOSE_CODE ? parseConnectFailedCloseReason(reason) : null;
const err = connectFailed const err = connectFailed
@@ -414,6 +416,10 @@ export class GatewayClient {
return this.lastHello; return this.lastHello;
} }
get lastDisconnectCode() {
return this._lastDisconnectCode;
}
private updateStatus(status: GatewayStatus) { private updateStatus(status: GatewayStatus) {
this.status = status; this.status = status;
this.statusHandlers.forEach((handler) => handler(status)); this.statusHandlers.forEach((handler) => handler(status));
@@ -654,6 +660,10 @@ const isNonRetryableConnectErrorCode = (code: string | null): boolean => {
return NON_RETRYABLE_CONNECT_ERROR_CODES.has(normalized); return NON_RETRYABLE_CONNECT_ERROR_CODES.has(normalized);
}; };
/** WebSocket close code 1008 = policy violation (rate limit). */
const WS_CLOSE_POLICY_VIOLATION = 1008;
const RATE_LIMIT_RETRY_DELAY_MS = 15_000;
export const resolveGatewayAutoRetryDelayMs = (params: { export const resolveGatewayAutoRetryDelayMs = (params: {
status: GatewayStatus; status: GatewayStatus;
didAutoConnect: boolean; didAutoConnect: boolean;
@@ -662,6 +672,7 @@ export const resolveGatewayAutoRetryDelayMs = (params: {
gatewayUrl: string; gatewayUrl: string;
errorMessage: string | null; errorMessage: string | null;
connectErrorCode: string | null; connectErrorCode: string | null;
lastDisconnectCode: number | null;
attempt: number; attempt: number;
}): number | null => { }): number | null => {
if (params.status !== "disconnected") return null; if (params.status !== "disconnected") return null;
@@ -673,8 +684,13 @@ export const resolveGatewayAutoRetryDelayMs = (params: {
if (isNonRetryableConnectErrorCode(params.connectErrorCode)) return null; if (isNonRetryableConnectErrorCode(params.connectErrorCode)) return null;
if (params.connectErrorCode === null && isAuthError(params.errorMessage)) return null; if (params.connectErrorCode === null && isAuthError(params.errorMessage)) return null;
const baseDelay =
params.lastDisconnectCode === WS_CLOSE_POLICY_VIOLATION
? Math.max(INITIAL_RETRY_DELAY_MS, RATE_LIMIT_RETRY_DELAY_MS)
: INITIAL_RETRY_DELAY_MS;
return Math.min( return Math.min(
INITIAL_RETRY_DELAY_MS * Math.pow(1.5, params.attempt), baseDelay * Math.pow(1.5, params.attempt),
MAX_RETRY_DELAY_MS MAX_RETRY_DELAY_MS
); );
}; };
@@ -911,7 +927,6 @@ export const useGatewayConnection = (
setDetectedAdapterType("custom"); setDetectedAdapterType("custom");
setStatus("connected"); setStatus("connected");
setConnectErrorCode(null); setConnectErrorCode(null);
retryAttemptRef.current = 0;
gatewayDebugLog("connect:custom-success", { gatewayUrl }); gatewayDebugLog("connect:custom-success", { gatewayUrl });
} catch (err) { } catch (err) {
setStatus("disconnected"); setStatus("disconnected");
@@ -975,7 +990,6 @@ export const useGatewayConnection = (
? hello.adapterType ? hello.adapterType
: "openclaw"; : "openclaw";
setDetectedAdapterType(nextDetectedAdapterType); setDetectedAdapterType(nextDetectedAdapterType);
retryAttemptRef.current = 0;
setHasLastKnownGoodState(nextDetectedAdapterType === selectedAdapterType); setHasLastKnownGoodState(nextDetectedAdapterType === selectedAdapterType);
settingsCoordinator.schedulePatch({ settingsCoordinator.schedulePatch({
gateway: { gateway: {
@@ -1037,6 +1051,7 @@ export const useGatewayConnection = (
gatewayUrl, gatewayUrl,
errorMessage: error, errorMessage: error,
connectErrorCode, connectErrorCode,
lastDisconnectCode: client.lastDisconnectCode,
attempt, attempt,
}); });
if (!isAutoManagedAdapter(selectedAdapterType)) return; if (!isAutoManagedAdapter(selectedAdapterType)) return;
@@ -1049,12 +1064,15 @@ export const useGatewayConnection = (
status, status,
}); });
retryTimerRef.current = setTimeout(() => { retryTimerRef.current = setTimeout(() => {
// Call connect first (it synchronously resets retryAttemptRef to 0),
// then override with the correct attempt count so the next auto-retry
// uses proper exponential backoff.
void connect();
retryAttemptRef.current = attempt + 1; retryAttemptRef.current = attempt + 1;
gatewayDebugLog("auto-retry-fire", { gatewayDebugLog("auto-retry-fire", {
selectedAdapterType, selectedAdapterType,
attempt: retryAttemptRef.current, attempt: retryAttemptRef.current,
}); });
void connect();
}, delay); }, delay);
return () => { return () => {
@@ -1065,11 +1083,17 @@ export const useGatewayConnection = (
}; };
}, [connect, connectErrorCode, error, gatewayUrl, selectedAdapterType, status]); }, [connect, connectErrorCode, error, gatewayUrl, selectedAdapterType, status]);
// Reset retry count on successful connection // Reset retry count after the connection has been stable for a minimum
// duration. If the upstream drops the connection quickly (e.g. within a
// few seconds), keeping the current attempt count lets exponential backoff
// work properly instead of hammering the gateway every 2 seconds.
useEffect(() => { useEffect(() => {
if (status === "connected") { if (status === "connected") {
hasConnectedOnceRef.current = true; hasConnectedOnceRef.current = true;
const stableTimer = setTimeout(() => {
retryAttemptRef.current = 0; retryAttemptRef.current = 0;
}, 10_000);
return () => clearTimeout(stableTimer);
} }
}, [status]); }, [status]);
+45 -23
View File
@@ -2,17 +2,24 @@ import { describe, expect, it } from "vitest";
import { resolveGatewayAutoRetryDelayMs } from "@/lib/gateway/GatewayClient"; import { resolveGatewayAutoRetryDelayMs } from "@/lib/gateway/GatewayClient";
describe("resolveGatewayAutoRetryDelayMs", () => { const baseParams = {
it("does not retry when upstream gateway url is missing on Studio host", () => { status: "disconnected" as const,
const delay = resolveGatewayAutoRetryDelayMs({
status: "disconnected",
didAutoConnect: true, didAutoConnect: true,
hasConnectedOnce: true, hasConnectedOnce: true,
wasManualDisconnect: false, wasManualDisconnect: false,
gatewayUrl: "wss://remote.example", gatewayUrl: "wss://remote.example",
errorMessage: null as string | null,
connectErrorCode: null as string | null,
lastDisconnectCode: null as number | null,
attempt: 0,
};
describe("resolveGatewayAutoRetryDelayMs", () => {
it("does not retry when upstream gateway url is missing on Studio host", () => {
const delay = resolveGatewayAutoRetryDelayMs({
...baseParams,
errorMessage: "Gateway error (studio.gateway_url_missing): Upstream gateway URL is missing.", errorMessage: "Gateway error (studio.gateway_url_missing): Upstream gateway URL is missing.",
connectErrorCode: "studio.gateway_url_missing", connectErrorCode: "studio.gateway_url_missing",
attempt: 0,
}); });
expect(delay).toBeNull(); expect(delay).toBeNull();
@@ -20,15 +27,10 @@ describe("resolveGatewayAutoRetryDelayMs", () => {
it("does not retry when the upstream websocket upgrade fails", () => { it("does not retry when the upstream websocket upgrade fails", () => {
const delay = resolveGatewayAutoRetryDelayMs({ const delay = resolveGatewayAutoRetryDelayMs({
status: "disconnected", ...baseParams,
didAutoConnect: true,
hasConnectedOnce: true,
wasManualDisconnect: false,
gatewayUrl: "wss://remote.example",
errorMessage: errorMessage:
"Gateway error (studio.upstream_error): Failed to connect to upstream gateway WebSocket.", "Gateway error (studio.upstream_error): Failed to connect to upstream gateway WebSocket.",
connectErrorCode: "studio.upstream_error", connectErrorCode: "studio.upstream_error",
attempt: 0,
}); });
expect(delay).toBeNull(); expect(delay).toBeNull();
@@ -36,15 +38,10 @@ describe("resolveGatewayAutoRetryDelayMs", () => {
it("does not retry when the upstream websocket handshake times out", () => { it("does not retry when the upstream websocket handshake times out", () => {
const delay = resolveGatewayAutoRetryDelayMs({ const delay = resolveGatewayAutoRetryDelayMs({
status: "disconnected", ...baseParams,
didAutoConnect: true,
hasConnectedOnce: true,
wasManualDisconnect: false,
gatewayUrl: "wss://remote.example",
errorMessage: errorMessage:
"Gateway error (studio.upstream_timeout): Timed out connecting Studio to the upstream gateway WebSocket.", "Gateway error (studio.upstream_timeout): Timed out connecting Studio to the upstream gateway WebSocket.",
connectErrorCode: "studio.upstream_timeout", connectErrorCode: "studio.upstream_timeout",
attempt: 0,
}); });
expect(delay).toBeNull(); expect(delay).toBeNull();
@@ -52,18 +49,43 @@ describe("resolveGatewayAutoRetryDelayMs", () => {
it("does not retry when the upstream gateway explicitly rejects pairing", () => { it("does not retry when the upstream gateway explicitly rejects pairing", () => {
const delay = resolveGatewayAutoRetryDelayMs({ const delay = resolveGatewayAutoRetryDelayMs({
status: "disconnected", ...baseParams,
didAutoConnect: true,
hasConnectedOnce: true,
wasManualDisconnect: false,
gatewayUrl: "wss://remote.example",
errorMessage: errorMessage:
"Gateway error (studio.upstream_rejected): Upstream gateway rejected connect (1008): pairing required.", "Gateway error (studio.upstream_rejected): Upstream gateway rejected connect (1008): pairing required.",
connectErrorCode: "studio.upstream_rejected", connectErrorCode: "studio.upstream_rejected",
attempt: 0,
}); });
expect(delay).toBeNull(); expect(delay).toBeNull();
}); });
it("uses a longer base delay when disconnected by rate limiting (code 1008)", () => {
const delay = resolveGatewayAutoRetryDelayMs({
...baseParams,
lastDisconnectCode: 1008,
attempt: 0,
});
expect(delay).toBe(15_000);
});
it("applies exponential backoff on top of rate-limit base delay", () => {
const delay = resolveGatewayAutoRetryDelayMs({
...baseParams,
lastDisconnectCode: 1008,
attempt: 1,
});
expect(delay).toBe(22_500);
});
it("uses standard base delay for normal disconnects", () => {
const delay = resolveGatewayAutoRetryDelayMs({
...baseParams,
lastDisconnectCode: 1012,
attempt: 0,
});
expect(delay).toBe(2_000);
});
}); });
+161
View File
@@ -712,4 +712,165 @@ describe("createGatewayProxy", () => {
} }
}); });
it("allows short bursts of post-connect traffic without closing the socket", async () => {
const upstream = new WebSocketServer({ port: 0 });
const address = upstream.address();
if (!address || typeof address === "string") {
throw new Error("expected upstream server to have a port");
}
const upstreamUrl = `ws://127.0.0.1:${address.port}`;
upstream.on("connection", (ws) => {
ws.on("message", (raw) => {
const parsed = JSON.parse(String(raw));
if (parsed?.method === "connect") {
ws.send(
JSON.stringify({
type: "res",
id: parsed.id,
ok: true,
payload: { type: "hello-ok", protocol: 3, auth: {} },
})
);
}
});
});
const { createGatewayProxy } = await import("../../server/gateway-proxy");
const proxyHttp = await import("node:http").then((m) => m.createServer());
const proxy = createGatewayProxy({
loadUpstreamSettings: async () => ({ url: upstreamUrl, token: "host-token-456" }),
allowWs: (req: { url?: string }) => req.url === "/api/gateway/ws",
logError: () => {},
});
proxyHttp.on("upgrade", (req, socket, head) => proxy.handleUpgrade(req, socket, head));
await new Promise<void>((resolve) => proxyHttp.listen(0, "127.0.0.1", resolve));
const proxyAddr = proxyHttp.address();
if (!proxyAddr || typeof proxyAddr === "string") {
throw new Error("expected proxy server to have a port");
}
const browser = new WebSocket(`ws://127.0.0.1:${proxyAddr.port}/api/gateway/ws`);
try {
await waitForEvent(browser, "open");
browser.send(
JSON.stringify({
type: "req",
id: "connect-burst-ok",
method: "connect",
params: { auth: {} },
})
);
await waitForEvent(browser, "message");
for (let index = 0; index < 80; index += 1) {
browser.send(
JSON.stringify({
type: "req",
id: `burst-ok-${index}`,
method: "noop",
params: { index },
})
);
}
await new Promise((resolve) => setTimeout(resolve, 25));
expect(browser.readyState).toBe(WebSocket.OPEN);
} finally {
for (const client of upstream.clients) {
client.close();
}
await Promise.all([
closeWebSocket(browser),
closeWebSocketServer(upstream),
closeHttpServer(proxyHttp),
]);
}
});
it("still rate limits abusive bursts that exceed the token bucket", async () => {
const upstream = new WebSocketServer({ port: 0 });
const address = upstream.address();
if (!address || typeof address === "string") {
throw new Error("expected upstream server to have a port");
}
const upstreamUrl = `ws://127.0.0.1:${address.port}`;
upstream.on("connection", (ws) => {
ws.on("message", (raw) => {
const parsed = JSON.parse(String(raw));
if (parsed?.method === "connect") {
ws.send(
JSON.stringify({
type: "res",
id: parsed.id,
ok: true,
payload: { type: "hello-ok", protocol: 3, auth: {} },
})
);
}
});
});
const { createGatewayProxy } = await import("../../server/gateway-proxy");
const proxyHttp = await import("node:http").then((m) => m.createServer());
const proxy = createGatewayProxy({
loadUpstreamSettings: async () => ({ url: upstreamUrl, token: "host-token-456" }),
allowWs: (req: { url?: string }) => req.url === "/api/gateway/ws",
logError: () => {},
});
proxyHttp.on("upgrade", (req, socket, head) => proxy.handleUpgrade(req, socket, head));
await new Promise<void>((resolve) => proxyHttp.listen(0, "127.0.0.1", resolve));
const proxyAddr = proxyHttp.address();
if (!proxyAddr || typeof proxyAddr === "string") {
throw new Error("expected proxy server to have a port");
}
const browser = new WebSocket(`ws://127.0.0.1:${proxyAddr.port}/api/gateway/ws`);
try {
await waitForEvent(browser, "open");
browser.send(
JSON.stringify({
type: "req",
id: "connect-burst-limit",
method: "connect",
params: { auth: {} },
})
);
await waitForEvent(browser, "message");
const closePromise = waitForEvent<[number, Buffer]>(browser, "close");
for (let index = 0; index < 200; index += 1) {
browser.send(
JSON.stringify({
type: "req",
id: `burst-limit-${index}`,
method: "noop",
params: { index },
})
);
}
const [closeCode, closeReason] = await closePromise;
expect(closeCode).toBe(1008);
expect(closeReason.toString()).toBe("rate limit exceeded");
} finally {
for (const client of upstream.clients) {
client.close();
}
await Promise.all([
closeWebSocket(browser),
closeWebSocketServer(upstream),
closeHttpServer(proxyHttp),
]);
}
});
}); });