Micronaut Mastery: Consuming Server-Sent Events (SSE)
Normally we would consume server-sent events (SSE) in a web browser, but we can also consume them in our code on the server. Micronaut has a low-level HTTP client with a SseClient
interface that we can use to get server-sent events. The interface has an eventStream
method with different arguments that return a Publisher
type of the Reactive Streams API. We can use the RxSseClient
interface to get back RxJava2 Flowable
return type instead of Publisher
type. We can also use Micronaut’s declarative HTTP client, which we define using the @Client
annotation, that supports server-sent events with the correct annotation attributes.
In our example we first create a controller in Micronaut to send out server-sent events. We must create method that returns a Publisher
type with Event
objects. These Event
objects can contains some attributes like id
and name
, but also the actual object we want to send:
package mrhaki;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.sse.Event;
import io.reactivex.Flowable;
import java.util.concurrent.TimeUnit;
@Controller("/conferences")
public class ConferencesController {
private final ConferenceRepository repository;
public ConferencesController(final ConferenceRepository repository) {
this.repository = repository;
}
/**
* Send each second a random Conference.
*
* @return Server-sent events each second where the event is a randomly
* selected Conference object from the repository.
*/
@Get("/random")
Flowable> events() {
final Flowable tick = Flowable.interval(1, TimeUnit.SECONDS);
final Flowable randomConferences = repository.random().repeat();
return tick.zipWith(randomConferences, this::createEvent);
}
/**
* Create a server-sent event with id, name and the Conference data.
*
* @param counter Counter used as id for event.
* @param conference Conference data as payload for the event.
* @return Event with id, name and Conference object.
*/
private Event createEvent(Long counter, final Conference conference) {
return Event.of(conference)
.id(String.valueOf(counter))
.name("randomEvent");
}
}
Notice how easy it is in Micronaut to use server-sent events. Let’s add a declarative HTTP client that can consume the server-sent events. We must set the processes
attribute of the @Get
annotation with the value text/event-stream
. This way Micronaut can create an implementation of this interface with the correct code to consume server-sent events:
package mrhaki;
import io.micronaut.http.MediaType;
import io.micronaut.http.annotation.Get;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.sse.Event;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@Client("/conferences")
interface ConferencesSseClient {
/**
* Return Publisher with SSE containing Conference data.
* We must set the processes attribute with the value
* text/event-stream so Micronaut can generate an implementation
* to support server-sent events.
* We could also return Publisher implementation class
* like Flowable or Flux, Micronaut will do the conversion.
*
* @return Publisher with Event objects with Conference data.
*/
@Get(value = "/random", processes = MediaType.TEXT_EVENT_STREAM)
Publisher> randomEvents();
/**
* Here we use a Publisher implementation Flux. Also we don't
* add the Event in the return type: Micronaut will leave out
* the event metadata and we get the data that is part of
* the event as object.
*
* @return Flux with Conference data.
*/
@Get(value = "/random", processes = MediaType.TEXT_EVENT_STREAM)
Flux randomConferences();
}
Next we create a Spock specification to test our controller with server-sent events. In the specification we use the low-level HTTP client and the declarative client:
package mrhaki
import io.micronaut.context.ApplicationContext
import io.micronaut.http.client.sse.RxSseClient
import io.micronaut.http.client.sse.SseClient
import io.micronaut.http.sse.Event
import io.micronaut.runtime.server.EmbeddedServer
import io.reactivex.Flowable
import spock.lang.AutoCleanup
import spock.lang.Shared
import spock.lang.Specification
class ConferencesControllerSpec extends Specification {
@Shared
@AutoCleanup
private EmbeddedServer server = ApplicationContext.run(EmbeddedServer)
/**
* Low level client to interact with server
* that returns server side events, that supports
* RxJava2.
*/
@Shared
@AutoCleanup
private RxSseClient sseLowLevelClient =
server.applicationContext
.createBean(RxSseClient, server.getURL())
/**
* Declarative client for interacting
* with server that send server side events.
*/
@Shared
private ConferencesSseClient sseClient =
server.applicationContext
.getBean(ConferencesSseClient)
void "test event stream with low level SSE client"() {
when:
// Use eventStream method of RxSseClient to get SSE
// and convert data in event to Conference objects by
// setting second argument to Conference.class.
final List> result =
sseLowLevelClient.eventStream("/conferences/random", Conference.class)
.take(2)
.toList()
.blockingGet()
then:
result.name.every { name -> name == "randomEvent" }
result.id == ["0", "1"]
result.data.every { conference -> conference instanceof Conference }
}
void "test event stream with declarative SSE client"() {
when:
// Use declarative client (using @Client)
// with SSE support.
List> result =
Flowable.fromPublisher(sseClient.randomEvents())
.take(2)
.toList()
.blockingGet();
then:
result.name.every { name -> name == "randomEvent" }
result.id == ["0", "1"]
result.data.every { conference -> conference instanceof Conference }
}
void "test conference stream with declarative SSE client"() {
when:
// Use declarative client (using @Client)
// with built-in extraction of data in event.
List result =
sseClient.randomConferences()
.take(2)
.collectList()
.block();
then:
result.id.every(Closure.IDENTITY) // Check every id property is set.
result.every { conference -> conference instanceof Conference }
}
}
Written with Micronaut 1.0.0.RC1.