From 4674c5a2b75172881d528d1812cb78dcdbc932ed Mon Sep 17 00:00:00 2001 From: hzhang Date: Sun, 31 May 2026 20:19:58 +0100 Subject: [PATCH] fix(socketio): remove broken client-initiated ping (engine.io v4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The socket.io client was sending its own EIO ping frames every pingInterval (default 25s). That's wrong for engine.io v4: in v4 the SERVER initiates pings and the CLIENT must respond with pong inside pingTimeout, else the server closes the connection. Client-initiated pings get misinterpreted by Fabric's NestJS socket.io backend, which quietly closes the connection — producing the warn-flap every ~25s: inbound: socket ended; reconnecting err="read: failed to get reader: received close frame: status = StatusNoStatusRcvd and reason = \"\"" Fix: - delete pingLoop() entirely - delete the pingPeriod/pingTimeout struct fields + their assignments in recvOpen (server enforces both anyway; client doesn't need them) - keep the eioPing case in handlePacket (already correct — responds with pong) - drop the now-unused "time" import End-to-end verified on live Fabric: - Restarted Plexum at 20:17:35; watched for 90+ seconds - ZERO "socket ended" events (vs. ~3-4 per 90s before the fix) - Channel inbound still delivers: alice posted seq=20 → gem agent (gemini CLI) replied seq=21 "pong" The plugin no longer flaps. Reconnect backoff machinery (1s→60s) stays in place as a safety net for genuine network drops. Co-Authored-By: Claude Opus 4.7 --- internal/socketio/client.go | 64 ++++++++++++------------------------- 1 file changed, 21 insertions(+), 43 deletions(-) diff --git a/internal/socketio/client.go b/internal/socketio/client.go index aae2e69..8c558a3 100644 --- a/internal/socketio/client.go +++ b/internal/socketio/client.go @@ -24,7 +24,6 @@ import ( "fmt" "net/url" "sync" - "time" "nhooyr.io/websocket" ) @@ -65,11 +64,9 @@ type Client struct { // Read-only after Connect; mutating during a live connection is // undefined. - handlers map[string]Handler - handlerMu sync.RWMutex - conn *websocket.Conn - pingPeriod time.Duration // from server "open" packet - pingTimeout time.Duration + handlers map[string]Handler + handlerMu sync.RWMutex + conn *websocket.Conn // closed-on-disconnect; Connect returns when this fires. disconnected chan struct{} @@ -168,10 +165,12 @@ func (c *Client) Connect(ctx context.Context) error { return fmt.Errorf("CONNECT: %w", err) } - pingCtx, cancelPing := context.WithCancel(ctx) - defer cancelPing() - go c.pingLoop(pingCtx) - + // Engine.IO v4 keepalive contract: the SERVER pings every + // pingInterval; we MUST pong within pingTimeout or it disconnects. + // We do NOT initiate pings ourselves (an earlier version did, which + // caused Fabric guild backend to close the socket every ~25s — the + // client-side ping was misinterpreted as a misbehaving peer). Server + // pings are answered by handlePacket's eioPing case. for { _, data, err := conn.Read(ctx) if err != nil { @@ -197,7 +196,10 @@ func (c *Client) Disconnect() { } } -// recvOpen reads the EIO "open" frame and stashes ping intervals. +// recvOpen reads the EIO "open" frame. The server announces its +// pingInterval + pingTimeout there; we just acknowledge — the +// keepalive cadence is server-driven (we pong on demand, see +// handlePacket's eioPing case). func (c *Client) recvOpen(ctx context.Context) error { _, data, err := c.conn.Read(ctx) if err != nil { @@ -206,19 +208,14 @@ func (c *Client) recvOpen(ctx context.Context) error { if len(data) < 2 || data[0] != eioOpen { return fmt.Errorf("expected EIO open, got %q", string(data)) } + // Parse just to fail fast on malformed open; the values are + // informational (server enforces them). var info struct { - Sid string `json:"sid"` - PingInterval int `json:"pingInterval"` - PingTimeout int `json:"pingTimeout"` + Sid string `json:"sid"` } if err := json.Unmarshal(data[1:], &info); err != nil { return fmt.Errorf("parse open: %w", err) } - c.pingPeriod = time.Duration(info.PingInterval) * time.Millisecond - c.pingTimeout = time.Duration(info.PingTimeout) * time.Millisecond - if c.pingPeriod <= 0 { - c.pingPeriod = 25 * time.Second // EIO default - } return nil } @@ -238,35 +235,16 @@ func (c *Client) sendConnect(ctx context.Context, auth map[string]any) error { return c.conn.Write(ctx, websocket.MessageText, body) } -// pingLoop sends EIO ping frames per server-supplied interval. -func (c *Client) pingLoop(ctx context.Context) { - ticker := time.NewTicker(c.pingPeriod) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - if c.conn == nil { - return - } - writeCtx, cancel := context.WithTimeout(ctx, c.pingTimeout) - err := c.conn.Write(writeCtx, websocket.MessageText, []byte{eioPing}) - cancel() - if err != nil { - return - } - } - } -} - // handlePacket inspects the first byte (EIO type) + dispatches. func (c *Client) handlePacket(data []byte) error { switch data[0] { case eioPong: - return nil // server responding to our ping (or vice versa) + // Server doesn't normally send unsolicited pongs in v4; ignore + // if it does. (Future: could be used to confirm an upgrade ack.) + return nil case eioPing: - // Server-initiated ping; reply with pong. + // Server keepalive ping — MUST respond with pong inside + // server's pingTimeout, else server closes the connection. if c.conn == nil { return nil }