// Package socketio is a minimal Engine.IO v4 + Socket.IO v5 client over // WebSocket. Just what the Fabric plugin needs: // // - WebSocket-only transport (no polling upgrade dance) // - Single namespace (defaults to "/realtime" — caller-supplied) // - CONNECT with caller-supplied auth payload (re-evaluated on every // reconnect via the AuthFunc callback — this is the bug fix // openclaw plugin specifically documented for socket.io-client-js // and that the available Go socket.io library doesn't address) // - Emit + receive named events with arbitrary JSON arg arrays // - PING/PONG heartbeat per server-supplied interval // - Caller-driven manual reconnect: connect returns when socket // closes; supervisor loop calls Connect again with a fresh token // // This is intentionally narrower than full Socket.IO — no rooms (server // joins us into rooms by event), no acks, no binary, no namespaces // other than what the constructor takes. package socketio import ( "context" "encoding/json" "errors" "fmt" "net/url" "sync" "nhooyr.io/websocket" ) // Engine.IO v4 packet types (first char of frame). const ( eioOpen byte = '0' eioClose byte = '1' eioPing byte = '2' eioPong byte = '3' eioMessage byte = '4' eioNoop byte = '6' ) // Socket.IO v5 packet types (first char inside an EIO message). const ( sioConnect byte = '0' sioDisconnect byte = '1' sioEvent byte = '2' sioAck byte = '3' sioConnectErr byte = '4' ) // AuthFunc returns the auth payload to send with CONNECT. Called on // every (re)connect so the supervisor can plug in a fresh token. type AuthFunc func(ctx context.Context) (map[string]any, error) // Handler is the per-event callback signature. args is the JSON array // payload after the event name; len(args) is usually 1 (one object). type Handler func(args []json.RawMessage) // Client is one Socket.IO connection. NOT safe for concurrent Emit; // caller serializes if it wants to multi-write. type Client struct { URL string // e.g. "ws://localhost:7002/socket.io/?EIO=4&transport=websocket" Namespace string // e.g. "/realtime"; "" → root namespace Auth AuthFunc // CONNECT auth payload // Read-only after Connect; mutating during a live connection is // undefined. handlers map[string]Handler handlerMu sync.RWMutex conn *websocket.Conn // closed-on-disconnect; Connect returns when this fires. disconnected chan struct{} } // New constructs a Client. host should be `ws://host:port` (or // `wss://`). path is typically "/socket.io/" — the Engine.IO query // params are appended automatically. func New(host, path, namespace string, auth AuthFunc) (*Client, error) { u, err := url.Parse(host) if err != nil { return nil, fmt.Errorf("socketio: parse host %q: %w", host, err) } switch u.Scheme { case "http": u.Scheme = "ws" case "https": u.Scheme = "wss" } if path == "" { path = "/socket.io/" } u.Path = path q := u.Query() q.Set("EIO", "4") q.Set("transport", "websocket") u.RawQuery = q.Encode() return &Client{ URL: u.String(), Namespace: namespace, Auth: auth, handlers: map[string]Handler{}, }, nil } // On registers a handler for an event name. Safe to call before Connect. // Replacing an existing handler is fine. func (c *Client) On(event string, h Handler) { c.handlerMu.Lock() defer c.handlerMu.Unlock() c.handlers[event] = h } // Emit sends an event with args. Server side receives `[event, args...]`. func (c *Client) Emit(ctx context.Context, event string, args ...any) error { if c.conn == nil { return errors.New("socketio: not connected") } payload := append([]any{event}, args...) body, err := json.Marshal(payload) if err != nil { return err } frame := buildEventFrame(c.Namespace, body) return c.conn.Write(ctx, websocket.MessageText, frame) } // Connect dials the server, completes the Engine.IO handshake + // Socket.IO CONNECT, then runs the read+heartbeat loop. Blocks until // the connection closes (either side) or ctx is cancelled. Returns the // terminating error (or nil for clean close). // // Caller-driven reconnect: wrap Connect in a loop that re-evaluates // the auth payload (token refresh) before each Call. func (c *Client) Connect(ctx context.Context) error { c.disconnected = make(chan struct{}) defer close(c.disconnected) authMap, err := c.Auth(ctx) if err != nil { return fmt.Errorf("auth: %w", err) } conn, _, err := websocket.Dial(ctx, c.URL, &websocket.DialOptions{ HTTPHeader: nil, }) if err != nil { return fmt.Errorf("dial %s: %w", c.URL, err) } c.conn = conn // websocket library default read limit is 32 KiB; bump for chunky // channel sync payloads. conn.SetReadLimit(1 << 20) // 1 MiB defer func() { _ = conn.Close(websocket.StatusNormalClosure, "") c.conn = nil }() // Engine.IO handshake: server sends `0{"sid":"...","upgrades":[...],"pingInterval":...,"pingTimeout":...}` if err := c.recvOpen(ctx); err != nil { return err } // Send Socket.IO CONNECT with auth. if err := c.sendConnect(ctx, authMap); err != nil { return fmt.Errorf("CONNECT: %w", err) } // Engine.IO v4 keepalive contract: the SERVER pings every // pingInterval; we MUST pong within pingTimeout or it disconnects. // We do NOT initiate pings ourselves (an earlier version did, which // caused Fabric guild backend to close the socket every ~25s — the // client-side ping was misinterpreted as a misbehaving peer). Server // pings are answered by handlePacket's eioPing case. for { _, data, err := conn.Read(ctx) if err != nil { if errors.Is(err, context.Canceled) { return nil } return fmt.Errorf("read: %w", err) } if len(data) == 0 { continue } if err := c.handlePacket(data); err != nil { return err } } } // Disconnect closes the underlying socket cleanly. Connect's read loop // will see EOF and return. func (c *Client) Disconnect() { if c.conn != nil { _ = c.conn.Close(websocket.StatusNormalClosure, "") } } // recvOpen reads the EIO "open" frame. The server announces its // pingInterval + pingTimeout there; we just acknowledge — the // keepalive cadence is server-driven (we pong on demand, see // handlePacket's eioPing case). func (c *Client) recvOpen(ctx context.Context) error { _, data, err := c.conn.Read(ctx) if err != nil { return fmt.Errorf("read open: %w", err) } if len(data) < 2 || data[0] != eioOpen { return fmt.Errorf("expected EIO open, got %q", string(data)) } // Parse just to fail fast on malformed open; the values are // informational (server enforces them). var info struct { Sid string `json:"sid"` } if err := json.Unmarshal(data[1:], &info); err != nil { return fmt.Errorf("parse open: %w", err) } return nil } // sendConnect: `4` (EIO message) + `0` (SIO CONNECT) + namespace,?json func (c *Client) sendConnect(ctx context.Context, auth map[string]any) error { body := []byte{eioMessage, sioConnect} if c.Namespace != "" && c.Namespace != "/" { body = append(body, []byte(c.Namespace+",")...) } if len(auth) > 0 { raw, err := json.Marshal(auth) if err != nil { return err } body = append(body, raw...) } return c.conn.Write(ctx, websocket.MessageText, body) } // handlePacket inspects the first byte (EIO type) + dispatches. func (c *Client) handlePacket(data []byte) error { switch data[0] { case eioPong: // Server doesn't normally send unsolicited pongs in v4; ignore // if it does. (Future: could be used to confirm an upgrade ack.) return nil case eioPing: // Server keepalive ping — MUST respond with pong inside // server's pingTimeout, else server closes the connection. if c.conn == nil { return nil } return c.conn.Write(context.Background(), websocket.MessageText, []byte{eioPong}) case eioClose: return io_EOF case eioMessage: if len(data) < 2 { return nil } return c.handleSIO(data[1:]) case eioNoop: return nil } return nil } // io_EOF is returned on EIO close packet so the supervisor loop knows // the server cleanly closed (vs network error). var io_EOF = errors.New("socketio: server initiated close") func (c *Client) handleSIO(data []byte) error { if len(data) == 0 { return nil } sioType := data[0] rest := data[1:] // Skip namespace prefix if present (e.g. "/realtime,"). if c.Namespace != "" && c.Namespace != "/" && len(rest) > len(c.Namespace) && string(rest[:len(c.Namespace)]) == c.Namespace && rest[len(c.Namespace)] == ',' { rest = rest[len(c.Namespace)+1:] } switch sioType { case sioConnect: // Server ack of our CONNECT. Body is `{"sid":"..."}`; we don't // need anything from it. return nil case sioDisconnect: return io_EOF case sioConnectErr: return fmt.Errorf("socketio: CONNECT_ERROR: %s", string(rest)) case sioEvent: return c.dispatchEvent(rest) case sioAck: // We don't use acks; ignore. return nil } return nil } func (c *Client) dispatchEvent(body []byte) error { var arr []json.RawMessage if err := json.Unmarshal(body, &arr); err != nil { return fmt.Errorf("dispatch parse: %w (body=%q)", err, string(body)) } if len(arr) == 0 { return nil } var event string if err := json.Unmarshal(arr[0], &event); err != nil { return fmt.Errorf("dispatch event-name: %w", err) } c.handlerMu.RLock() h := c.handlers[event] c.handlerMu.RUnlock() if h == nil { return nil // no subscriber, drop silently } h(arr[1:]) return nil } // buildEventFrame is exposed for the encoder unit test. func buildEventFrame(namespace string, body []byte) []byte { out := []byte{eioMessage, sioEvent} if namespace != "" && namespace != "/" { out = append(out, []byte(namespace+",")...) } return append(out, body...) }