Files
Plexum-fabric-channel-plugin/internal/socketio/client.go
hzhang 4674c5a2b7 fix(socketio): remove broken client-initiated ping (engine.io v4)
The socket.io client was sending its own EIO ping frames every
pingInterval (default 25s). That's wrong for engine.io v4: in v4 the
SERVER initiates pings and the CLIENT must respond with pong inside
pingTimeout, else the server closes the connection. Client-initiated
pings get misinterpreted by Fabric's NestJS socket.io backend, which
quietly closes the connection — producing the warn-flap every ~25s:

  inbound: socket ended; reconnecting
    err="read: failed to get reader: received close frame:
         status = StatusNoStatusRcvd and reason = \"\""

Fix:
- delete pingLoop() entirely
- delete the pingPeriod/pingTimeout struct fields + their assignments
  in recvOpen (server enforces both anyway; client doesn't need them)
- keep the eioPing case in handlePacket (already correct — responds
  with pong)
- drop the now-unused "time" import

End-to-end verified on live Fabric:
- Restarted Plexum at 20:17:35; watched for 90+ seconds
- ZERO "socket ended" events (vs. ~3-4 per 90s before the fix)
- Channel inbound still delivers: alice posted seq=20 → gem agent
  (gemini CLI) replied seq=21 "pong"

The plugin no longer flaps. Reconnect backoff machinery (1s→60s)
stays in place as a safety net for genuine network drops.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-31 20:19:58 +01:00

328 lines
9.5 KiB
Go

// Package socketio is a minimal Engine.IO v4 + Socket.IO v5 client over
// WebSocket. Just what the Fabric plugin needs:
//
// - WebSocket-only transport (no polling upgrade dance)
// - Single namespace (defaults to "/realtime" — caller-supplied)
// - CONNECT with a caller-supplied auth payload, re-evaluated on every
// reconnect via the AuthFunc callback (this is the fix the openclaw
// plugin documented for socket.io-client-js, and which the available
// Go socket.io library does not address)
// - Emit + receive named events with arbitrary JSON arg arrays
// - PING/PONG heartbeat: the server pings at its own interval and the
// client answers each ping with a pong; the client never initiates pings
// - Caller-driven manual reconnect: Connect returns when the socket
// closes; the supervisor loop calls Connect again with a fresh token
//
// This is intentionally narrower than full Socket.IO — no rooms (server
// joins us into rooms by event), no acks, no binary, no namespaces
// other than what the constructor takes.
package socketio
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/url"
"sync"
"nhooyr.io/websocket"
)
// Engine.IO v4 packet types (first char of frame). Each value is the
// ASCII character that prefixes a text frame on the wire.
const (
// eioOpen is the handshake frame that recvOpen expects first.
eioOpen byte = '0'
// eioClose makes handlePacket end the session with io_EOF.
eioClose byte = '1'
// eioPing is answered with an eioPong frame by handlePacket.
eioPing byte = '2'
// eioPong is what we send in reply to a ping; inbound pongs are ignored.
eioPong byte = '3'
// eioMessage wraps a Socket.IO packet; the remainder goes to handleSIO.
eioMessage byte = '4'
// eioNoop is ignored on receipt.
eioNoop byte = '6'
)
// Socket.IO v5 packet types (first char inside an EIO message).
const (
// sioConnect is sent by sendConnect; the server's reply of the same
// type is accepted and otherwise ignored by handleSIO.
sioConnect byte = '0'
// sioDisconnect makes handleSIO end the session with io_EOF.
sioDisconnect byte = '1'
// sioEvent carries a JSON array `[name, args...]`; see dispatchEvent
// (inbound) and buildEventFrame (outbound).
sioEvent byte = '2'
// sioAck is ignored on receipt; this client does not use acks.
sioAck byte = '3'
// sioConnectErr is surfaced by handleSIO as a CONNECT_ERROR error.
sioConnectErr byte = '4'
)
// AuthFunc returns the auth payload to send with CONNECT. Called on
// every (re)connect so the supervisor can plug in a fresh token.
//
// Connect invokes it before dialing and passes it the same ctx; a
// non-nil error aborts that Connect call (wrapped as "auth: ...").
// An empty map results in a CONNECT packet with no JSON body.
type AuthFunc func(ctx context.Context) (map[string]any, error)
// Handler is the per-event callback signature. args is the JSON array
// payload after the event name; len(args) is usually 1 (one object).
// Each element is left undecoded so the handler can unmarshal it into
// whatever type it expects.
//
// Handlers are invoked inline from Connect's read loop, so no further
// frame (including a server ping) is processed until the handler returns.
type Handler func(args []json.RawMessage)
// Client is one Socket.IO connection. NOT safe for concurrent Emit;
// caller serializes if it wants to multi-write.
//
// New fills in URL and the handler table; Connect owns conn and
// disconnected for the duration of one connection attempt.
type Client struct {
URL string // e.g. "ws://localhost:7002/socket.io/?EIO=4&transport=websocket"
Namespace string // e.g. "/realtime"; "" → root namespace
Auth AuthFunc // CONNECT auth payload
// Read-only after Connect; mutating during a live connection is
// undefined.
// handlers maps event name → callback. Guarded by handlerMu: On takes
// the write lock, dispatchEvent takes the read lock.
handlers map[string]Handler
handlerMu sync.RWMutex
// conn is the live WebSocket. Connect stores it after a successful
// dial and resets it to nil when it returns.
conn *websocket.Conn
// disconnected is recreated at the start of every Connect call and
// closed (via defer) when that call returns.
disconnected chan struct{}
}
// New builds a Client for the given endpoint; it does not dial.
//
// host should be `ws://host:port` (or `wss://`); `http`/`https` schemes
// are rewritten to `ws`/`wss`. path replaces whatever path host carried
// and defaults to "/socket.io/" when empty. The Engine.IO query params
// (EIO=4, transport=websocket) are set on top of any query already
// present in host. namespace and auth are stored as-is.
//
// The only error returned is a failure to parse host.
func New(host, path, namespace string, auth AuthFunc) (*Client, error) {
	endpoint, err := url.Parse(host)
	if err != nil {
		return nil, fmt.Errorf("socketio: parse host %q: %w", host, err)
	}

	// Accept HTTP-style schemes and map them to their WebSocket twins;
	// any other scheme is left untouched.
	if endpoint.Scheme == "http" {
		endpoint.Scheme = "ws"
	} else if endpoint.Scheme == "https" {
		endpoint.Scheme = "wss"
	}

	endpoint.Path = path
	if endpoint.Path == "" {
		endpoint.Path = "/socket.io/"
	}

	params := endpoint.Query()
	params.Set("EIO", "4")
	params.Set("transport", "websocket")
	endpoint.RawQuery = params.Encode()

	client := &Client{
		URL:       endpoint.String(),
		Namespace: namespace,
		Auth:      auth,
		handlers:  make(map[string]Handler),
	}
	return client, nil
}
// On registers h as the handler for the named event, replacing any
// handler previously registered under that name. Safe to call before
// Connect, and safe to call concurrently with event dispatch (the
// handler table is guarded by handlerMu).
//
// Registering a nil handler is equivalent to having no subscriber:
// dispatchEvent drops events whose handler is nil.
func (c *Client) On(event string, h Handler) {
	c.handlerMu.Lock()
	defer c.handlerMu.Unlock()
	// Client has exported fields, so it may have been built as a struct
	// literal rather than through New; writing to a nil map would panic.
	if c.handlers == nil {
		c.handlers = make(map[string]Handler)
	}
	c.handlers[event] = h
}
// Emit sends an event with args. Server side receives `[event, args...]`.
//
// The payload is JSON-encoded as a single array and framed for the
// client's namespace by buildEventFrame. Returns an error when there is
// no live connection, when args cannot be marshalled, or when the
// WebSocket write fails. Not safe for concurrent use with other Emit
// calls; the caller must serialize writers.
func (c *Client) Emit(ctx context.Context, event string, args ...any) error {
	// Snapshot the connection once: Connect resets c.conn to nil when it
	// returns, and re-reading the field after the nil check could
	// dereference nil if the connection drops in between.
	conn := c.conn
	if conn == nil {
		return errors.New("socketio: not connected")
	}
	payload := make([]any, 0, len(args)+1)
	payload = append(payload, event)
	payload = append(payload, args...)
	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("socketio: marshal event %q: %w", event, err)
	}
	return conn.Write(ctx, websocket.MessageText, buildEventFrame(c.Namespace, body))
}
// Connect obtains the auth payload, dials the server, completes the
// Engine.IO handshake + Socket.IO CONNECT, then runs the read loop.
// It blocks until the connection ends or ctx is cancelled.
//
// Return value:
//   - nil when the read fails because ctx was cancelled;
//   - io_EOF when the server sends an Engine.IO close or Socket.IO
//     DISCONNECT packet;
//   - otherwise the wrapped auth, dial, handshake, read or packet error.
//
// Caller-driven reconnect: wrap Connect in a loop. Auth is invoked at
// the start of every call, so each attempt gets a fresh payload (token
// refresh). A nil Auth connects without an auth payload.
func (c *Client) Connect(ctx context.Context) error {
	c.disconnected = make(chan struct{})
	defer close(c.disconnected)

	// A nil Auth means "no auth payload" rather than a nil-func panic;
	// sendConnect already omits the JSON body for an empty map.
	var authMap map[string]any
	if c.Auth != nil {
		var err error
		authMap, err = c.Auth(ctx)
		if err != nil {
			return fmt.Errorf("auth: %w", err)
		}
	}

	conn, _, err := websocket.Dial(ctx, c.URL, &websocket.DialOptions{
		HTTPHeader: nil,
	})
	if err != nil {
		return fmt.Errorf("dial %s: %w", c.URL, err)
	}
	c.conn = conn
	// websocket library default read limit is 32 KiB; bump for chunky
	// channel sync payloads.
	conn.SetReadLimit(1 << 20) // 1 MiB
	defer func() {
		// Best-effort close: the connection is usually already dead by
		// the time we get here, so the error carries no information.
		_ = conn.Close(websocket.StatusNormalClosure, "")
		c.conn = nil
	}()

	// Engine.IO handshake: server sends `0{"sid":"...","upgrades":[...],"pingInterval":...,"pingTimeout":...}`
	if err := c.recvOpen(ctx); err != nil {
		return err
	}
	// Send Socket.IO CONNECT with auth.
	if err := c.sendConnect(ctx, authMap); err != nil {
		return fmt.Errorf("CONNECT: %w", err)
	}

	// Engine.IO v4 keepalive contract: the SERVER pings every
	// pingInterval; we MUST pong within pingTimeout or it disconnects.
	// We do NOT initiate pings ourselves (an earlier version did, which
	// caused Fabric guild backend to close the socket every ~25s — the
	// client-side ping was misinterpreted as a misbehaving peer). Server
	// pings are answered by handlePacket's eioPing case.
	for {
		_, data, err := conn.Read(ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				return nil
			}
			return fmt.Errorf("read: %w", err)
		}
		if len(data) == 0 {
			continue
		}
		if err := c.handlePacket(data); err != nil {
			return err
		}
	}
}
// Disconnect closes the underlying socket with a normal-closure status.
// It is a no-op when there is no live connection. The read pending in
// Connect is then expected to fail, which makes Connect return.
//
// Disconnect is meant to be called from a goroutine other than the one
// blocked in Connect.
func (c *Client) Disconnect() {
	// Snapshot the pointer: Connect resets c.conn to nil when it returns,
	// so re-reading the field after the nil check could dereference nil.
	conn := c.conn
	if conn == nil {
		return
	}
	// Best-effort: a close error only means the socket was already gone.
	_ = conn.Close(websocket.StatusNormalClosure, "")
}
// recvOpen consumes the first frame of the connection and verifies that
// it is an Engine.IO "open" packet with a JSON body. The server
// announces its pingInterval + pingTimeout in that body, but nothing
// from it is retained: keepalive is server-driven and we only pong on
// demand (see handlePacket's eioPing case).
//
// Errors: the read failure, a frame that is too short or not of type
// eioOpen, or a body that is not valid JSON.
func (c *Client) recvOpen(ctx context.Context) error {
	_, frame, err := c.conn.Read(ctx)
	if err != nil {
		return fmt.Errorf("read open: %w", err)
	}

	isOpen := len(frame) >= 2 && frame[0] == eioOpen
	if !isOpen {
		return fmt.Errorf("expected EIO open, got %q", string(frame))
	}

	// Decoding exists purely to reject a malformed handshake early; the
	// decoded values are not used.
	var handshake struct {
		Sid string `json:"sid"`
	}
	err = json.Unmarshal(frame[1:], &handshake)
	if err != nil {
		return fmt.Errorf("parse open: %w", err)
	}
	return nil
}
// sendConnect writes the Socket.IO CONNECT packet. Wire layout:
// `4` (EIO message) + `0` (SIO CONNECT) + optional `<namespace>,` +
// optional JSON auth object. The namespace prefix is omitted for the
// root namespace ("" or "/"); the JSON body is omitted when auth is
// empty. Returns the marshal error or the WebSocket write error.
func (c *Client) sendConnect(ctx context.Context, auth map[string]any) error {
	frame := []byte{eioMessage, sioConnect}

	if ns := c.Namespace; ns != "" && ns != "/" {
		frame = append(frame, ns...)
		frame = append(frame, ',')
	}

	if len(auth) != 0 {
		encoded, err := json.Marshal(auth)
		if err != nil {
			return err
		}
		frame = append(frame, encoded...)
	}

	return c.conn.Write(ctx, websocket.MessageText, frame)
}
// handlePacket dispatches one inbound Engine.IO frame on its first byte
// (the EIO packet type):
//
//   - ping    → reply with a pong frame
//   - close   → return io_EOF so the read loop terminates
//   - message → hand the remainder to handleSIO
//   - pong, noop, unknown types, empty frames → ignored
//
// A non-nil return value ends the connection's read loop.
func (c *Client) handlePacket(data []byte) error {
	// Guard locally instead of relying on every caller to pre-filter
	// empty frames: indexing data[0] below would otherwise panic.
	if len(data) == 0 {
		return nil
	}
	switch data[0] {
	case eioPing:
		// Server keepalive ping — MUST respond with pong inside
		// server's pingTimeout, else server closes the connection.
		if c.conn == nil {
			return nil
		}
		// NOTE(review): context.Background() gives this write no
		// deadline or cancellation; consider threading Connect's ctx
		// through if a stuck peer must not block the read loop.
		return c.conn.Write(context.Background(), websocket.MessageText, []byte{eioPong})
	case eioClose:
		return io_EOF
	case eioMessage:
		if len(data) < 2 {
			return nil
		}
		return c.handleSIO(data[1:])
	case eioPong, eioNoop:
		// Server doesn't normally send unsolicited pongs in v4; ignore
		// if it does. (Future: could be used to confirm an upgrade ack.)
		return nil
	}
	return nil
}
// io_EOF is the sentinel returned when the server ends the session at
// the protocol level: handlePacket returns it for an Engine.IO close
// packet and handleSIO returns it for a Socket.IO DISCONNECT packet.
// It propagates out of Connect unwrapped, so the supervisor loop can
// tell a server-initiated close from a network error.
//
// NOTE(review): the name does not follow Go's ErrXxx / MixedCaps
// convention; renaming would touch every use site in this file.
var io_EOF = errors.New("socketio: server initiated close")
// handleSIO processes the Socket.IO packet carried inside an Engine.IO
// message frame. data starts at the SIO type byte.
//
// If the client uses a non-root namespace and the payload begins with
// `<namespace>,`, that prefix is stripped before further handling.
//
//   - EVENT         → decoded and routed by dispatchEvent
//   - DISCONNECT    → io_EOF
//   - CONNECT_ERROR → error carrying the server's payload
//   - CONNECT (server ack), ACK, unknown types, empty input → ignored
func (c *Client) handleSIO(data []byte) error {
	if len(data) == 0 {
		return nil
	}
	kind, payload := data[0], data[1:]

	// Skip namespace prefix if present (e.g. "/realtime,").
	if ns := c.Namespace; ns != "" && ns != "/" {
		prefix := ns + ","
		if len(payload) >= len(prefix) && string(payload[:len(prefix)]) == prefix {
			payload = payload[len(prefix):]
		}
	}

	switch kind {
	case sioEvent:
		return c.dispatchEvent(payload)
	case sioDisconnect:
		return io_EOF
	case sioConnectErr:
		return fmt.Errorf("socketio: CONNECT_ERROR: %s", string(payload))
	default:
		// sioConnect is the server's ack of our CONNECT (body is
		// `{"sid":"..."}`, nothing we need); sioAck is unused by this
		// client; anything else is silently dropped.
		return nil
	}
}
// dispatchEvent decodes a Socket.IO EVENT body — a JSON array whose
// first element is the event name — and invokes the handler registered
// for that name with the remaining elements, still undecoded.
//
// An empty array, or an event with no (or a nil) handler, is dropped
// silently. A body that is not a JSON array, or a first element that is
// not a JSON string, yields an error. The handler runs synchronously on
// the calling goroutine, outside the handler-table lock.
func (c *Client) dispatchEvent(body []byte) error {
	var parts []json.RawMessage
	if err := json.Unmarshal(body, &parts); err != nil {
		return fmt.Errorf("dispatch parse: %w (body=%q)", err, string(body))
	}
	if len(parts) == 0 {
		return nil
	}

	var name string
	if err := json.Unmarshal(parts[0], &name); err != nil {
		return fmt.Errorf("dispatch event-name: %w", err)
	}

	// Hold the read lock only for the lookup so a handler may itself
	// call On without deadlocking.
	c.handlerMu.RLock()
	handler, found := c.handlers[name]
	c.handlerMu.RUnlock()

	if !found || handler == nil {
		return nil // no subscriber, drop silently
	}
	handler(parts[1:])
	return nil
}
// buildEventFrame assembles the wire frame for an outbound Socket.IO
// event: the EIO message byte, the SIO event byte, an optional
// `<namespace>,` prefix (omitted for the root namespace, "" or "/"),
// then body, which must already be the JSON-encoded `[event, args...]`
// array. Kept as a standalone function so the encoder can be unit
// tested without a connection.
func buildEventFrame(namespace string, body []byte) []byte {
	frame := []byte{eioMessage, sioEvent}
	if namespace != "" && namespace != "/" {
		frame = append(frame, namespace...)
		frame = append(frame, ',')
	}
	frame = append(frame, body...)
	return frame
}