In the previous post, we talked about RSocket protocol and standalone usage of RSocket in Java. In this post, we'll discuss how to integrate RSocket with Spring framework.

The easiest way to use RSocket with Spring is using Spring Boot. You can create a new Spring Boot project using start.spring.io with Spring Boot 2.2.1 release and RSocket as the dependency. The Spring integration with RSocket is in spring-messaging module.

Request/Response mode

Controller

With Spring integration with RSocket, we can easily create a Controller using request/response interaction mode. In the EchoController below, the echo() method takes a String input and returns a Mono<String> result. The annotation @MessageMapping("echo") indicates this endpoint is available in path echo.

@Controller
public class EchoController {

  @MessageMapping("echo")
  public Mono<String> echo(String input) {
    return Mono.just("ECHO >> " + input);
  }
}

To actually start the RSocket server, you need to add the following line to application.properties. The property spring.rsocket.server.port triggers Spring Boot's auto-configuration of RSocket.

spring.rsocket.server.port=7100

When you start Spring Boot server, the RSocket server will be running on port 7100.

Unit Test

Below is a JUnit 5 test case for testing the EchoController shown above. In the test case, we use a RSocketRequester object to send request to the EchoController. requester.route("echo") is important to route the message to the correct controller. StepVerifier is used to verify the returned Mono<String> object.

@SpringBootTest
class EchoServerTest extends AbstractTest {

  @Test
  @DisplayName("Test echo server")
  void testEcho() {
    RSocketRequester requester = createRSocketRequester();
    Mono<String> response = requester.route("echo")
        .data("hello")
        .retrieveMono(String.class);
    StepVerifier.create(response)
        .expectNext("ECHO >> hello")
        .expectComplete()
        .verify();
  }

}

AbstractTest class shown below is the parent class for all RSocket related test cases. RSocketRequester.Builder class is used to build RSocketRequester objects. Here we use text/plain as the MIME type of the data payload.

abstract class AbstractTest {

  @Value("${spring.rsocket.server.port}")
  private int serverPort;
  @Autowired
  private RSocketRequester.Builder builder;

  RSocketRequester createRSocketRequester() {
    return builder.dataMimeType(MimeTypeUtils.TEXT_PLAIN)
        .connect(TcpClientTransport.create(serverPort)).block();
  }
}

Other Interaction Modes

We can now add controllers for other RSocket interaction modes.

Request/Stream

StringSplitController class shown below is for request/stream mode.

@Controller
public class StringSplitController {
  @MessageMapping("stringSplit")
  public Flux<String> stringSplit(String input) {
    return Flux.fromStream(input.codePoints().mapToObj(c -> String.valueOf((char) c)));
  }
}

Fire-and-Forget

DataCollector class shown below is for fire-and-forget mode.

@Controller
public class DataCollector {

  @MessageMapping("collect")
  public Mono<Void> collect(String data) {
    System.out.println("Received >> " + data);
    return Mono.empty();
  }
}

Channel

StringsSplitController class shown below is for channel mode.

@Controller
public class StringsSplitController {

  @MessageMapping("stringsSplit")
  public Flux<String> stringsSplit(Publisher<String> strings) {
    return Flux.from(strings).flatMap(
        input -> Flux.fromStream(input.codePoints().mapToObj(c -> String.valueOf((char) c))));
  }
}

We can also add test cases for these controllers. StringsSplitTest class below is the test case for StringsSplitController.

@SpringBootTest
class StringsSplitTest extends AbstractTest {
  @Test
  @DisplayName("Test strings split")
  void testStringsSplit() {
    RSocketRequester requester = createRSocketRequester();
    Flux<String> response = requester.route("stringsSplit")
        .data(Flux.fromArray(new String[] {"hello", "world"}))
        .retrieveFlux(String.class);

    StepVerifier.create(response)
        .expectNext("h", "e", "l", "l", "o", "w", "o", "r", "l", "d")
        .expectComplete()
        .verify();
  }
}

As you can see in the code examples, Spring integration makes it very easy to use RSocket.

Debugging

To make debugging easier, you can add the following line to application.properties to enable debugging output. Once enabled, RSocket frames will be displayed.

logging.level.io.rsocket.FrameLogger=DEBUG

Below is a sample output of the debug logging message.

2019-11-12 11:52:49.975 DEBUG 22727 --- [actor-tcp-nio-1] io.rsocket.FrameLogger                   : sending ->
Frame => Stream ID: 1 Type: REQUEST_RESPONSE Flags: 0b100000000 Length: 23
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 05 04 65 63 68 6f                      |.....echo       |
+--------+-------------------------------------------------+----------------+
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 68 65 6c 6c 6f                                  |hello           |
+--------+-------------------------------------------------+----------------+

In the next part, we'll see how to use WebSockets with RSocket.

Source code

Source code of this series can be found on GitHub.