In the previous post, we created RSocket endpoints with different interaction modes. In this post, we'll discuss how to use WebSocket in the browser to work with RSocket messaging endpoints in Spring. We'll use the stringSplit endpoint, which uses the request/stream interaction mode.

Connection

The rsocket-websocket-client library is used to connect to RSocket servers. MessageService, listed below, is the main class that handles RSocket connections and messages. RSocketClient is the RSocket client in JavaScript. In the setup config, we set the following properties:

  • keepAlive: Time interval, in milliseconds, between KEEPALIVE frames.
  • lifetime: Maximum time, in milliseconds, that the connection is still considered alive without receiving a KEEPALIVE frame.
  • dataMimeType: MIME type of data.
  • metadataMimeType: MIME type of metadata. MESSAGE_RSOCKET_COMPOSITE_METADATA.string is the MIME type that allows multiple entries in the metadata.

The transport of RSocketClient is set to RSocketWebSocketClient with the given WebSocket URL. BufferEncoders specifies that Buffer is used to encode data and metadata.

RSocketClient.connect() is used to connect to the server. Once connected, this._socket is a ReactiveSocket object used to send and receive messages.

In the send() function, we need to encode the route using the MIME type MESSAGE_RSOCKET_ROUTING. This is how the Spring RSocket integration finds the route that handles a message. The route is encoded in the encodeRoute() function, following the Routing Metadata Extension specification. The encoded route metadata is then added to a composite metadata object using the encodeAndAddWellKnownMetadata() function. Finally, the message is sent to the server using requestStream() with the data and metadata. Whenever a message is received from the server, messageCallback is invoked.
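The route encoding described above can be tried on its own. The following is a minimal sketch, assuming a Node.js environment where Buffer is a global (in the browser, a Buffer polyfill supplied by the bundler is assumed). decodeRoutes is a hypothetical helper added only to show the layout; it is not part of rsocket-js.

```javascript
// Encode one route as a Routing Metadata entry:
// a single unsigned length byte followed by the UTF-8 bytes of the route.
function encodeRoute(route) {
  const routeBytes = Buffer.from(route, 'utf8');
  if (routeBytes.length > 255) {
    throw new RangeError('Route must be at most 255 bytes when UTF-8 encoded');
  }
  return Buffer.concat([Buffer.from([routeBytes.length]), routeBytes]);
}

// Hypothetical helper (illustration only): read length-prefixed routes back.
function decodeRoutes(buffer) {
  const routes = [];
  let offset = 0;
  while (offset < buffer.length) {
    const length = buffer.readUInt8(offset);
    routes.push(buffer.toString('utf8', offset + 1, offset + 1 + length));
    offset += 1 + length;
  }
  return routes;
}

const encoded = encodeRoute('stringSplit');
console.log(encoded[0]);            // 11, the byte length of 'stringSplit'
console.log(decodeRoutes(encoded)); // [ 'stringSplit' ]
```

The length prefix is a single unsigned byte, so a route cannot be longer than 255 bytes.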

import {
  BufferEncoders,
  MAX_STREAM_ID,
  MESSAGE_RSOCKET_COMPOSITE_METADATA,
  MESSAGE_RSOCKET_ROUTING,
  RSocketClient,
  encodeAndAddWellKnownMetadata,
} from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';

export default class MessageService {
  constructor(connectCallback, messageCallback) {
    this._client = new RSocketClient({
      setup: {
        keepAlive: 5000,
        lifetime: 30000,
        dataMimeType: 'application/octet-stream',
        metadataMimeType: MESSAGE_RSOCKET_COMPOSITE_METADATA.string,
      },
      transport: new RSocketWebSocketClient({
        // eslint-disable-next-line no-restricted-globals
        url: `ws://${location.host}/ws`
      }, BufferEncoders),
    });
    this._connectCallback = connectCallback;
    this._messageCallback = messageCallback;
  }

  connect() {
    this._client.connect().then(
      (socket) => {
        this._socket = socket;
        this._connectCallback(null);
      },
      (error) => this._connectCallback(error),
    );
  }

  send(message) {
    const routeMetadata = this.encodeRoute('stringSplit');
    const metadata = encodeAndAddWellKnownMetadata(
      Buffer.alloc(0),
      MESSAGE_RSOCKET_ROUTING,
      routeMetadata
    );
    this._socket.requestStream({
      data: Buffer.from(message, 'utf8'),
      metadata,
    }).subscribe({
      onNext: (payload) => this._messageCallback(null, payload.data.toString('utf8')),
      onError: (error) => this._messageCallback(error),
      onSubscribe: (_subscription) => _subscription.request(MAX_STREAM_ID),
    });
  }

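  encodeRoute(route) {
    // Routing metadata: one unsigned length byte, then the UTF-8 route bytes.
    const length = Buffer.byteLength(route, 'utf8');
    const buffer = Buffer.alloc(1);
    // The length is unsigned, so use writeUInt8 (writeInt8 fails above 127).
    buffer.writeUInt8(length);
    return Buffer.concat([buffer, Buffer.from(route, 'utf8')]);
  }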
}
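To make the composite metadata step less opaque, here is a hand-rolled sketch of what a single well-known entry looks like on the wire, as I understand the Composite Metadata Extension: one byte with the high bit set plus the well-known MIME ID, a 24-bit big-endian content length, then the content. encodeWellKnownEntry is a hypothetical helper for illustration only, and the ID 0x7e for the routing MIME type is taken from the RSocket well-known MIME types list. In the real client, encodeAndAddWellKnownMetadata() from rsocket-core should be used instead.

```javascript
// Assumed well-known MIME ID of message/x.rsocket.routing.v0.
const ROUTING_MIME_ID = 0x7e;

// Hypothetical helper: build one composite metadata entry by hand.
function encodeWellKnownEntry(mimeId, content) {
  const header = Buffer.alloc(4);
  header.writeUInt8(0x80 | mimeId, 0);      // high bit set: well-known MIME ID
  header.writeUIntBE(content.length, 1, 3); // 24-bit big-endian content length
  return Buffer.concat([header, content]);
}

// Route metadata: unsigned length byte followed by the UTF-8 route.
const route = Buffer.from('stringSplit', 'utf8');
const routeMetadata = Buffer.concat([Buffer.from([route.length]), route]);

const entry = encodeWellKnownEntry(ROUTING_MIME_ID, routeMetadata);
console.log(entry.slice(0, 4).toString('hex')); // fe00000c
```

The four header bytes are followed by the 12 bytes of route metadata, so the whole entry is 16 bytes long.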

UI

App, shown below, is a simple React component that lets the user enter text and send it to the server. Responses from the server are displayed as well.

import React from 'react';
import './App.css';
import ResponseMessage from "./ResponseMessage";
import MessageService from "./MessageService";

class App extends React.Component {
  constructor(props) {
    super(props);
    this.state = {
      connected: false,
      error: null,
      input: '',
      messages: [],
    };

    this.handleChange = this.handleChange.bind(this);
    this.handleSend = this.handleSend.bind(this);
    this._service = new MessageService(
      this.handleConnection.bind(this),
      this.handleResponseMessage.bind(this)
    );
  }

  handleConnection(error) {
    this.setState({
      connected: !error,
      error,
    });
  }

  handleChange(event) {
    this.setState({ input: event.target.value });
  }

  handleSend() {
    this._service.send(this.state.input);
  }

  handleResponseMessage(error, message) {
    if (error) {
      this.setState({
        error,
      });
      console.error(error);
    } else {
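      // Use the updater form so that responses arriving in quick
      // succession from the stream are all appended.
      this.setState((prevState) => ({
        input: '',
        messages: [
          ...prevState.messages,
          {
            message,
            timestamp: new Date().toISOString(),
          },
        ],
      }));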
    }
  }

  componentDidMount() {
    this._service.connect();
  }

  render() {
    const { input, connected, error, messages } = this.state;
    return (
      <div className="App">
        <h1>RSocket WebSocket example</h1>
        {error && <div className="error">{error.message}</div>}
        <div>
          <label>
            Input:
            <input type="text"
              value={input}
              disabled={!connected}
              onChange={this.handleChange} />
            <button disabled={!connected}
              onClick={this.handleSend}>Send</button>
          </label>
        </div>
        <h2>Response messages</h2>
        {messages.map((message, index) => (
          <ResponseMessage key={index} message={message} index={index} />
        ))}
      </div>
    );
  }
}

export default App;
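Because a request/stream call can deliver several responses in quick succession, it helps to derive the next state from the previous state rather than from a snapshot. The sketch below shows the idea without React; appendMessage is a hypothetical helper, and the loop only simulates how an updater function passed to setState would be applied.

```javascript
// Hypothetical helper: returns an updater that appends one response
// to whatever the previous state contains and clears the input.
function appendMessage(message, timestamp) {
  return (prevState) => ({
    input: '',
    messages: [...prevState.messages, { message, timestamp }],
  });
}

// Simulate two stream responses being applied one after the other.
let state = { connected: true, input: 'a,b', messages: [] };
for (const part of ['a', 'b']) {
  const update = appendMessage(part, new Date().toISOString());
  state = { ...state, ...update(state) };
}

console.log(state.messages.map((m) => m.message)); // [ 'a', 'b' ]
```

Each updater sees the result of the previous one, so no response is dropped even when updates are batched.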

Below is a screenshot of the running app.

[Screenshot: WebSocket with RSocket]

Summary

To sum up, the most important thing is to encode the route in the correct format. For the payload data, you are free to use either a text format or a binary format.

Source code

The source code for this series can be found on GitHub.