R(eactive)Socket: Bootified

  • request →← response
  • request →←← stream
  • request channel →→←→←
  • fire and forget →!

rsocket-server

  • Select the following dependencies and download the generated project to your local environment.
  • The Maven POM should then contain the following dependencies:
<!-- Dependencies of the rsocket-server project. -->
<dependencies>
    <!-- Spring Boot starter for RSocket. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>

    <!-- Optional dependencies: not passed on to projects that depend on this one. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Test-scoped dependencies; the junit-vintage-engine and junit artifacts are excluded. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
            <exclusion>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
# RSocket server port. The keys must be nested: written flat, they are four unrelated
# top-level keys and the spring.rsocket.server.port property is never set.
spring:
  rsocket:
    server:
      port: 8000

rsocket-client

  • Select the following dependencies and download the generated project to your local environment.
  • The Maven POM should then contain the following dependencies:
<!-- Spring Boot starters used by the rsocket-client project: RSocket and WebFlux. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<!-- Optional dependencies: not passed on to projects that depend on this one. -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <scope>runtime</scope>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
/**
 * Builds the {@link RSocketRequester} bean by connecting over TCP to the host and port
 * read from {@code clientConfigProp}.
 *
 * <p>The connection is re-subscribed on every error ({@code retry()} with no bound), and this
 * method blocks until a requester has been obtained.
 *
 * @param strategies the RSocket strategies handed to the requester builder
 * @return the connected requester
 * @throws URISyntaxException declared by the signature; nothing in this body builds a URI
 */
@Bean
RSocketRequester requester(RSocketStrategies strategies) throws URISyntaxException {
    InetSocketAddress serverAddress =
            new InetSocketAddress(clientConfigProp.getHost(), clientConfigProp.getPort());

    return RSocketRequester.builder()
            .rsocketStrategies(strategies)
            .rsocketFactory(factory ->
                    factory.dataMimeType(MimeTypeUtils.ALL_VALUE)
                            .frameDecoder(PayloadDecoder.ZERO_COPY))
            .connect(TcpClientTransport.create(serverAddress))
            .retry()
            .block();
}
/**
 * Fixed set of tickers together with the base price each one starts from.
 *
 * <p>Every constant carries its ticker symbol and a base price; both are exposed through
 * getters so callers can turn the constants into quote objects.
 */
enum EntQuote {

    PVTL("PVTL", 11.59),
    VMW("VMW", 169.60),
    DELL("DELL", 54.99),
    GOOGL("GOOGL", 1_064.54),
    MSFT("MSFT", 131.58),
    AAPL("AAPL", 199.80),
    FB("FB", 189.50),
    NFLX("NFLX", 370.02),
    AMZN("AMZN", 1_904.28);

    /** Ticker symbol of this entry. */
    private final String ticker;

    /** Base price of this entry. */
    private final double price;

    /**
     * Binds the two values listed with each constant.
     *
     * @param ticker ticker symbol
     * @param price base price
     */
    EntQuote(String ticker, double price) {
        this.ticker = ticker;
        this.price = price;
    }

    /**
     * @return the ticker symbol of this entry
     */
    public String getTicker() {
        return ticker;
    }

    /**
     * @return the base price of this entry
     */
    public double getPrice() {
        return price;
    }
}

QuoteGenerator() {
stream(EntQuote.values()).forEach(e -> this.prices.add(
new Quote(e.getTicker(), new BigDecimal(e.getPrice(), this.mathContext),
Instant.now(), counter.incrementAndGet())));
this.quoteStream = Flux.interval(Duration.ofSeconds(1))
.flatMap((e) -> Flux.fromIterable(prices.stream().map(baseQuote -> {
BigDecimal priceChange = baseQuote.getPrice().multiply(
new BigDecimal(0.05 * this.random.nextDouble()),
this.mathContext);
return new Quote(baseQuote.getTicker(),
baseQuote.getPrice().add(priceChange), Instant.now(),
counter.incrementAndGet());
}).collect(Collectors.toList()))).share();
}

Let’s get into the interaction models:

  1. request →← response
/**
 * HTTP endpoint for a single quote: sends {@code symbol} to the {@code a-quote} RSocket route
 * and retrieves one {@link Quote} in response.
 *
 * @param symbol ticker symbol taken from the request path
 * @return publisher of the retrieved quote
 */
@GetMapping(value = "/v1/quote/{symbol}", produces = MediaType.APPLICATION_JSON_VALUE)
public Publisher<Quote> getAQuote(@PathVariable String symbol) {
    Mono<String> payload = Mono.just(symbol);
    Mono<Quote> reply = requester.route("a-quote")
            .data(payload)
            .retrieveMono(Quote.class);
    return reply;
}
/**
 * Handler for the {@code a-quote} route; delegates to the quote generator.
 *
 * @param symbol ticker symbol received as the request payload
 * @return the quote produced by {@code quoteGenerator.getQuote(symbol)}
 */
@MessageMapping("a-quote")
public Mono<Quote> getAQuote(String symbol) {
    final Mono<Quote> quote = quoteGenerator.getQuote(symbol);
    return quote;
}
/**
 * Returns the next quote from the shared stream whose ticker matches {@code symbol},
 * compared case-insensitively.
 *
 * <p>The stream is filtered rather than cut with {@code takeWhile}: {@code takeWhile} ends the
 * sequence at the first quote that does not match, so any symbol that is not the first one
 * emitted would always resolve to the empty fallback.
 *
 * <p>The search is limited to {@code prices.size()} quotes, i.e. one quote per base price, so
 * a symbol that is not present still completes with the empty {@link Quote} fallback instead
 * of waiting on the stream indefinitely.
 * NOTE(review): this assumes the stream emits one quote per entry of {@code prices} per tick,
 * as the constructor sets it up. Confirm if the stream is built differently elsewhere.
 *
 * @param symbol ticker symbol to look up; a {@code null} or unknown symbol yields the fallback
 * @return the first matching quote, or a default-constructed {@link Quote} when none matches
 */
public Mono<Quote> getQuote(String symbol) {
    return quoteStream
            .take(prices.size())
            .filter(quote -> quote.getTicker().equalsIgnoreCase(symbol))
            .next()
            .switchIfEmpty(Mono.just(new Quote()));
}
/v1/quote/PVTL
/**
 * HTTP endpoint that streams quotes: sends an empty payload to the {@code all-quote-stream}
 * RSocket route and relays the resulting flux of {@link Quote} values.
 *
 * @return publisher of the retrieved quotes
 */
@GetMapping(value = "/v1/stream/quotes", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Publisher<Quote> getAllQuotesStream() {
    Publisher<Quote> quotes = this.requester
            .route("all-quote-stream")
            .data(Mono.empty())
            .retrieveFlux(Quote.class);
    return quotes;
}
/**
 * Handler for the {@code all-quote-stream} route; delegates to the quote generator.
 *
 * @return the stream returned by {@code quoteGenerator.getQuoteStream()}
 */
@MessageMapping("all-quote-stream")
public Flux<Quote> getAllQuotesStream() {
    final Flux<Quote> liveQuotes = quoteGenerator.getQuoteStream();
    return liveQuotes;
}
/v1/stream/quotes
/**
 * HTTP endpoint that streams quotes for a fixed list of five symbols: sends the symbols as a
 * flux to the {@code filtered-quote-stream} RSocket route and relays the resulting quotes.
 *
 * @return publisher of the retrieved quotes
 */
@GetMapping(value = "/v1/stream/quotes/faang", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Publisher<Quote> getFilteredQuotesStream() {
    Flux<String> symbols = Flux.just("FB", "AMZN", "AAPL", "NFLX", "GOOGL");
    return requester
            .route("filtered-quote-stream")
            .data(symbols)
            .retrieveFlux(Quote.class);
}
/**
 * Handler for the {@code filtered-quote-stream} route; delegates to the quote generator.
 *
 * @param symbol incoming stream of ticker symbols
 * @return the stream returned by {@code quoteGenerator.getFilteredQuoteStream(symbol)}
 */
@MessageMapping("filtered-quote-stream")
public Flux<Quote> getFilteredQuotesStream(Flux<String> symbol) {
    final Flux<Quote> filtered = quoteGenerator.getFilteredQuoteStream(symbol);
    return filtered;
}
/**
 * Maps every incoming symbol to its quote stream via {@code getQuoteStream} and merges the
 * results; if the merged stream completes without emitting, a single default-constructed
 * {@link Quote} is emitted instead.
 *
 * @param symbol stream of ticker symbols
 * @return merged quote stream for the given symbols, or the single fallback quote
 */
public Flux<Quote> getFilteredQuoteStream(Flux<String> symbol) {
    Flux<Quote> perSymbolQuotes = symbol.flatMap(this::getQuoteStream);
    Mono<Quote> fallback = Mono.just(new Quote());
    return perSymbolQuotes.switchIfEmpty(fallback);
}
/v1/stream/quotes/faang

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Srinivasa Vasu

Srinivasa Vasu

Aspiring Software Artist | views expressed on this blog are solely mine |