RSocket is a binary application protocol that provides Reactive Streams semantics. It can run over different underlying transport layers, including TCP, WebSockets and Aeron, so you can choose the transport that best fits client requirements, runtime environments and device capabilities. The application logic doesn't need to change when switching transport layers. Compared with HTTP, RSocket provides four interaction models that can be used in different scenarios.

  • request/response - Requester expects one response for a request.
  • request/stream - Requester expects a stream of responses for a request.
  • fire-and-forget - Requester doesn't expect responses for a request.
  • channel - Create a bidirectional channel between client and server.

RSocket Protocol

RSocket uses framing. An RSocket frame is a single message that contains a request, a response, or protocol processing information. An RSocket frame may be prepended with a 24-bit Frame Length field representing the length of the frame in bytes. Depending on the underlying transport protocol used by RSocket, the Frame Length field may not be required.

  • For WebSocket and Aeron, the Frame Length field is not required. RSocket frames are put into the frames supported by the transport layer.
  • For TCP and HTTP/2 Stream, the Frame Length field is required.
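
The length-prefixed framing needed on a byte-stream transport such as TCP can be sketched with plain JDK code. This is a minimal illustration, not the RSocket library's implementation. It assumes the 24-bit Frame Length is written big-endian and does not count its own three bytes. The class and method names (FrameLengthSketch, prependLength, split) are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: length-prefixed framing for a byte-stream transport such as TCP.
// Assumption: the 24-bit Frame Length is big-endian and excludes its own 3 bytes.
final class FrameLengthSketch {

  static final int MAX_FRAME_LENGTH = 0xFFFFFF; // largest value that fits in 24 bits

  // Prepends the 3-byte Frame Length field to a single frame.
  static byte[] prependLength(byte[] frame) {
    if (frame.length > MAX_FRAME_LENGTH) {
      throw new IllegalArgumentException("frame does not fit in 24 bits: " + frame.length);
    }
    byte[] out = new byte[3 + frame.length];
    out[0] = (byte) (frame.length >>> 16);
    out[1] = (byte) (frame.length >>> 8);
    out[2] = (byte) frame.length;
    System.arraycopy(frame, 0, out, 3, frame.length);
    return out;
  }

  // Splits a buffer that holds zero or more complete length-prefixed frames.
  static List<byte[]> split(byte[] stream) {
    List<byte[]> frames = new ArrayList<>();
    int pos = 0;
    while (pos < stream.length) {
      if (stream.length - pos < 3) {
        throw new IllegalArgumentException("truncated Frame Length field");
      }
      int length = ((stream[pos] & 0xFF) << 16)
          | ((stream[pos + 1] & 0xFF) << 8)
          | (stream[pos + 2] & 0xFF);
      pos += 3;
      if (stream.length - pos < length) {
        throw new IllegalArgumentException("truncated frame body");
      }
      frames.add(Arrays.copyOfRange(stream, pos, pos + length));
      pos += length;
    }
    return frames;
  }

  public static void main(String[] args) {
    byte[] first = prependLength(new byte[] {1, 2, 3});
    byte[] second = prependLength(new byte[] {4, 5});
    // Concatenate both frames as they would appear back to back on a TCP stream.
    byte[] wire = new byte[first.length + second.length];
    System.arraycopy(first, 0, wire, 0, first.length);
    System.arraycopy(second, 0, wire, first.length, second.length);
    for (byte[] frame : split(wire)) {
      System.out.println(Arrays.toString(frame));
    }
  }
}
```

On WebSocket or Aeron this step is unnecessary because the transport already delivers whole messages.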

Header and Payload

RSocket frames begin with an RSocket Frame Header. The Frame Header includes the following fields:

  • Stream ID is an unsigned 31-bit integer representing the stream identifier. A Stream ID of 0 means the frame is a protocol processing message for the connection itself. The client uses odd numbers (1, 3, 5, 7, etc.) as Stream IDs, while the server uses even numbers (2, 4, 6, 8, etc.).
  • Frame Type is a 6-bit field representing the type of frame.
  • Flags is a 10-bit field for different flags. The supported flags depend on the type of frame.
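
The three header fields can be packed and unpacked with a few bit operations. The sketch below is illustrative only. It assumes the 31-bit Stream ID is preceded by one reserved zero bit (giving a 6-byte header), that all fields are big-endian, and that the numeric frame type and flag values match the RSocket specification as I recall it. Verify those constants against the specification before relying on them. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Sketch only: packs Stream ID (31 bits), Frame Type (6 bits) and Flags (10 bits)
// into 6 bytes. ByteBuffer writes big-endian by default.
final class FrameHeaderSketch {

  // Assumed numeric values; check them against the RSocket specification.
  static final int TYPE_REQUEST_RESPONSE = 0x04;
  static final int TYPE_PAYLOAD = 0x0A;
  static final int FLAG_METADATA = 0x100;
  static final int FLAG_COMPLETE = 0x40;
  static final int FLAG_NEXT = 0x20;

  static byte[] encode(int streamId, int frameType, int flags) {
    if (streamId < 0) {
      throw new IllegalArgumentException("Stream ID must fit in 31 bits");
    }
    if ((frameType & ~0x3F) != 0) {
      throw new IllegalArgumentException("Frame Type must fit in 6 bits");
    }
    if ((flags & ~0x3FF) != 0) {
      throw new IllegalArgumentException("Flags must fit in 10 bits");
    }
    return ByteBuffer.allocate(6)
        .putInt(streamId)                               // reserved bit 0 + 31-bit Stream ID
        .putShort((short) ((frameType << 10) | flags))  // 6-bit type, then 10-bit flags
        .array();
  }

  static int streamId(byte[] header) {
    return ByteBuffer.wrap(header).getInt(0) & 0x7FFFFFFF;
  }

  static int frameType(byte[] header) {
    return (typeAndFlags(header) >>> 10) & 0x3F;
  }

  static int flags(byte[] header) {
    return typeAndFlags(header) & 0x3FF;
  }

  // Odd Stream IDs are allocated by the client, even ones by the server; 0 is neither.
  static boolean isClientInitiated(int streamId) {
    return streamId != 0 && (streamId & 1) == 1;
  }

  private static int typeAndFlags(byte[] header) {
    return ByteBuffer.wrap(header).getShort(4) & 0xFFFF;
  }

  public static void main(String[] args) {
    byte[] header = encode(1, TYPE_PAYLOAD, FLAG_NEXT | FLAG_COMPLETE);
    StringBuilder hex = new StringBuilder();
    for (byte b : header) {
      hex.append(String.format("%02x ", b));
    }
    System.out.println(hex.toString().trim()); // prints "00 00 00 01 28 60"
    System.out.println("client initiated: " + isClientInitiated(streamId(header)));
  }
}
```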

RSocket supports two types of payload: Data and Metadata. Data and Metadata can be encoded in different formats. Some types of frames can carry Metadata, and a flag in the Flags field indicates whether Metadata is present. When Metadata is present, the Frame Header is followed by a Metadata Length field representing the length of the Metadata. The Metadata Length field is followed by the Metadata Payload field, which holds the content of the Metadata. The Data payload begins after the Metadata Payload field.
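
The Metadata and Data layout can be sketched as a small encoder and decoder for the bytes that follow the Frame Header. This is a simplified illustration under stated assumptions: the Metadata Length is taken to be a 24-bit big-endian value that excludes its own three bytes, and the frame is assumed to have no other fields between the header and the payload. The names (PayloadLayoutSketch, Body) and the sample metadata string are invented for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch only: the part of a frame that follows the Frame Header.
// Assumption: Metadata Length is 24 bits, big-endian, excluding its own 3 bytes.
final class PayloadLayoutSketch {

  static final class Body {
    final byte[] metadata; // null when the Metadata flag is not set
    final byte[] data;

    Body(byte[] metadata, byte[] data) {
      this.metadata = metadata;
      this.data = data;
    }
  }

  static byte[] encode(Body body) {
    if (body.metadata == null) {
      return body.data.clone(); // no Metadata Length field at all
    }
    int length = body.metadata.length;
    if (length > 0xFFFFFF) {
      throw new IllegalArgumentException("metadata does not fit in 24 bits");
    }
    byte[] out = new byte[3 + length + body.data.length];
    out[0] = (byte) (length >>> 16);
    out[1] = (byte) (length >>> 8);
    out[2] = (byte) length;
    System.arraycopy(body.metadata, 0, out, 3, length);
    System.arraycopy(body.data, 0, out, 3 + length, body.data.length);
    return out;
  }

  // metadataFlag is the value of the Metadata flag read from the Frame Header.
  static Body decode(byte[] bytes, boolean metadataFlag) {
    if (!metadataFlag) {
      return new Body(null, bytes.clone());
    }
    if (bytes.length < 3) {
      throw new IllegalArgumentException("truncated Metadata Length field");
    }
    int length = ((bytes[0] & 0xFF) << 16) | ((bytes[1] & 0xFF) << 8) | (bytes[2] & 0xFF);
    if (bytes.length - 3 < length) {
      throw new IllegalArgumentException("truncated Metadata Payload");
    }
    byte[] metadata = Arrays.copyOfRange(bytes, 3, 3 + length);
    byte[] data = Arrays.copyOfRange(bytes, 3 + length, bytes.length);
    return new Body(metadata, data);
  }

  public static void main(String[] args) {
    // "route=echo" is a made-up metadata value used only for illustration.
    byte[] metadata = "route=echo".getBytes(StandardCharsets.UTF_8);
    byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
    Body decoded = decode(encode(new Body(metadata, data)), true);
    System.out.println(new String(decoded.metadata, StandardCharsets.UTF_8));
    System.out.println(new String(decoded.data, StandardCharsets.UTF_8));
  }
}
```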

Frame Types

There are currently 16 frame types in the RSocket protocol. Some common frame types are listed below:

  • SETUP - Connection setup. Always uses Stream ID 0.
  • REQUEST_RESPONSE - Used in request/response model. Request a single message.
  • REQUEST_STREAM - Used in request/stream model. Request a stream of messages.
  • REQUEST_FNF - Used in fire-and-forget model. Sends a message.
  • REQUEST_CHANNEL - Used in channel model. Request a stream of messages in both directions.
  • REQUEST_N - Request more items. Used for flow control.
  • PAYLOAD - Payload of a message. Uses flags to signal different states.
  • ERROR - Error at connection or application level.
  • CANCEL - Cancel the outstanding request.

Different frames use different flags. For example, the PAYLOAD frame uses the Complete flag to indicate stream completion, and the Next flag to indicate that payload Data and/or Metadata is present.

Frames in Different Interaction Models

In the RSocket protocol, the Requester and the Responder exchange frames based on the interaction model.

  • In the request/response model, the Requester sends a REQUEST_RESPONSE frame, and the Responder sends a PAYLOAD frame with the Complete flag set.
  • In the request/stream model, the Requester sends a REQUEST_STREAM frame, and the Responder sends zero or more PAYLOAD frames. A PAYLOAD frame with the Complete flag set indicates that the stream is closed.
  • In the fire-and-forget model, the Requester sends a REQUEST_FNF frame.
  • In the channel model, the Requester sends a REQUEST_CHANNEL frame. Both the Requester and the Responder can send PAYLOAD frames to their peer. A PAYLOAD frame with the Complete flag set indicates that the stream is closed on that side.

Except in the fire-and-forget model, a stream can also be closed early: the Responder can send an ERROR frame, and the Requester can send a CANCEL frame.
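
To make the request/stream exchange concrete, the sketch below plays the Requester's role over a list of already-received frames: it collects items from PAYLOAD frames with the Next flag, stops at the Complete flag, and fails on an ERROR frame. It is a simplified model, not how the RSocket library processes frames. The Frame class is hypothetical, and the numeric type and flag values are assumptions that should be checked against the specification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: how a Requester could interpret the Responder's frames
// in the request/stream model.
final class RequestStreamSketch {

  // Assumed numeric values; check them against the RSocket specification.
  static final int TYPE_PAYLOAD = 0x0A;
  static final int TYPE_ERROR = 0x0B;
  static final int FLAG_COMPLETE = 0x40;
  static final int FLAG_NEXT = 0x20;

  // Hypothetical, simplified frame: type, flags and UTF-8 data only.
  static final class Frame {
    final int type;
    final int flags;
    final String data;

    Frame(int type, int flags, String data) {
      this.type = type;
      this.flags = flags;
      this.data = data;
    }
  }

  // Returns the items received before the stream completed.
  static List<String> collect(List<Frame> responderFrames) {
    List<String> items = new ArrayList<>();
    for (Frame frame : responderFrames) {
      if (frame.type == TYPE_ERROR) {
        throw new IllegalStateException("stream failed: " + frame.data);
      }
      if (frame.type != TYPE_PAYLOAD) {
        continue; // other frame types are ignored in this sketch
      }
      if ((frame.flags & FLAG_NEXT) != 0) {
        items.add(frame.data); // the frame carries an item
      }
      if ((frame.flags & FLAG_COMPLETE) != 0) {
        return items; // the Responder closed the stream
      }
    }
    throw new IllegalStateException("stream was not completed");
  }

  public static void main(String[] args) {
    List<Frame> frames = Arrays.asList(
        new Frame(TYPE_PAYLOAD, FLAG_NEXT, "h"),
        new Frame(TYPE_PAYLOAD, FLAG_NEXT, "i"),
        new Frame(TYPE_PAYLOAD, FLAG_COMPLETE, ""));
    System.out.println(collect(frames)); // prints [h, i]
  }
}
```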

Standalone Java Implementation

RSocket provides a set of libraries for different programming languages, including Java, Kotlin, JavaScript, Go, .NET and C++. For Java apps, you only need to include the RSocket Maven dependencies. The current version of the RSocket Java libraries is 1.0.0-RC5. rsocket-core is the core library, and rsocket-transport-netty provides the TCP and WebSocket transport layers using Netty. Other transport layers are also available; see the Maven repository.

<dependency>
  <groupId>io.rsocket</groupId>
  <artifactId>rsocket-core</artifactId>
  <version>1.0.0-RC5</version>
</dependency>
<dependency>
  <groupId>io.rsocket</groupId>
  <artifactId>rsocket-transport-netty</artifactId>
  <version>1.0.0-RC5</version>
</dependency>

Request/Response Model

Let's start with the request/response model. In the code below, RSocketFactory.receive() creates a ServerRSocketFactory object to build an RSocket server. ServerRSocketFactory.acceptor() sets the SocketAcceptor object that accepts connections. The AbstractRSocket object only implements the requestResponse() method to handle the request/response model. The return type Mono<Payload> means that at most one Payload object is expected as the response. For each request received, the response is the request payload prefixed with ECHO >>. The DefaultPayload.create() method is an easy way to create Payload objects. ServerRSocketFactory.transport() sets the transport layer used by RSocket. Here TcpServerTransport is used for TCP on port 7000.

RSocketFactory.connect() creates a ClientRSocketFactory object to build an RSocket client. The transport used is TcpClientTransport for TCP on the same port 7000. The created RSocket object is used to send requests to the server and receive responses.

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Mono;

public class RequestResponseExample {

  public static void main(String[] args) {
    // Server: echo each request back with an "ECHO >> " prefix.
    RSocketFactory.receive()
        .acceptor(((setup, sendingSocket) -> Mono.just(
            new AbstractRSocket() {
              @Override
              public Mono<Payload> requestResponse(Payload payload) {
                return Mono.just(DefaultPayload.create("ECHO >> " + payload.getDataUtf8()));
              }
            }
        )))
        .transport(TcpServerTransport.create("localhost", 7000))
        .start()
        // Wait until the server is bound so the client below cannot connect too early.
        .block();

    // Client: connect to the same host and port.
    RSocket socket = RSocketFactory.connect()
        .transport(TcpClientTransport.create("localhost", 7000))
        .start()
        .block();

    // Send one request and block until the single response arrives.
    socket.requestResponse(DefaultPayload.create("hello"))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .block();

    // Close the client connection.
    socket.dispose();
  }
}

Request/Stream Model

Below is an example of the request/stream model. The AbstractRSocket object implements the requestStream() method to transform the input string into a stream of characters. The return type Flux<Payload> means that a stream of Payload objects is expected as the response.

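import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RequestStreamExample {

  public static void main(String[] args) {
    // Server: answer each request with one payload per character of the input.
    RSocketFactory.receive()
        .acceptor(((setup, sendingSocket) -> Mono.just(
            new AbstractRSocket() {
              @Override
              public Flux<Payload> requestStream(Payload payload) {
                return Flux.fromStream(payload.getDataUtf8().codePoints()
                    // Character.toChars also handles code points outside the BMP.
                    .mapToObj(c -> new String(Character.toChars(c)))
                    .map(DefaultPayload::create));
              }
            }
        )))
        .transport(TcpServerTransport.create("localhost", 7000))
        .start()
        // Wait until the server is bound so the client below cannot connect too early.
        .block();

    RSocket socket = RSocketFactory.connect()
        .transport(TcpClientTransport.create("localhost", 7000))
        .start()
        .block();

    // Print every item and block until the stream completes.
    socket.requestStream(DefaultPayload.create("hello"))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .blockLast();

    socket.dispose();
  }
}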
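
The string-to-characters mapping can be tried in isolation with plain JDK streams, without starting a server. The sketch below splits by code point and converts each one with Character.toChars, which also handles characters outside the Basic Multilingual Plane (a plain (char) cast would truncate them). The class name CodePointSplitSketch is made up for this example.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch only: the per-character split used by the stream handler, without RSocket.
final class CodePointSplitSketch {

  // Returns one string per Unicode code point of the input.
  static List<String> split(String input) {
    return input.codePoints()
        .mapToObj(cp -> new String(Character.toChars(cp)))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(split("hello")); // prints [h, e, l, l, o]
  }
}
```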

Fire-and-forget Model

Below is an example of the fire-and-forget model. The AbstractRSocket object implements the fireAndForget() method to process request messages.

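import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Mono;

public class FireAndForgetExample {

  public static void main(String[] args) {
    // Server: log each message; no response is sent back.
    RSocketFactory.receive()
        .acceptor(((setup, sendingSocket) -> Mono.just(
            new AbstractRSocket() {
              @Override
              public Mono<Void> fireAndForget(Payload payload) {
                System.out.println("Receive: " + payload.getDataUtf8());
                return Mono.empty();
              }
            }
        )))
        .transport(TcpServerTransport.create("localhost", 7000))
        .start()
        // Wait until the server is bound so the client below cannot connect too early.
        .block();

    RSocket socket = RSocketFactory.connect()
        .transport(TcpClientTransport.create("localhost", 7000))
        .start()
        .block();

    // block() only waits until the request has been sent, not until it is processed.
    socket.fireAndForget(DefaultPayload.create("hello")).block();
    socket.fireAndForget(DefaultPayload.create("world")).block();

    socket.dispose();
  }
}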

Channel Model

Below is an example of the channel model. The AbstractRSocket object implements the requestChannel() method to process a stream of strings. flatMap is used to map each string to a stream of its characters.

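import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RequestChannelExample {

  public static void main(String[] args) {
    // Server: for every incoming string, emit one payload per character.
    RSocketFactory.receive()
        .acceptor(((setup, sendingSocket) -> Mono.just(
            new AbstractRSocket() {
              @Override
              public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return Flux.from(payloads).flatMap(payload ->
                    Flux.fromStream(
                        payload.getDataUtf8().codePoints()
                            // Character.toChars also handles code points outside the BMP.
                            .mapToObj(c -> new String(Character.toChars(c)))
                            .map(DefaultPayload::create)));
              }
            }
        )))
        .transport(TcpServerTransport.create("localhost", 7000))
        .start()
        // Wait until the server is bound so the client below cannot connect too early.
        .block();

    RSocket socket = RSocketFactory.connect()
        .transport(TcpClientTransport.create("localhost", 7000))
        .start()
        .block();

    // Send a stream of three strings and print every character that comes back.
    socket.requestChannel(Flux.just("hello", "world", "goodbye").map(DefaultPayload::create))
        .map(Payload::getDataUtf8)
        .doOnNext(System.out::println)
        .blockLast();

    socket.dispose();
  }
}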

In the next part, we'll see how to use RSocket with Spring.

Source code

Source code of this series can be found on GitHub.