In our Tech Radar we put CloudEvents in the trial phase. We’d like to take you on a journey and show you what CloudEvents are and how they can be used. In our first blog about CloudEvents we introduced you to the specification. In this second blog we will look at how CloudEvents can be used in practice!

The dependencies

We’ll be using the CloudEvents Kafka dependency in combination with Spring Boot to show you how you can use CloudEvents! On top of the Spring Boot starter we add the following dependencies:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-kafka</artifactId>
    <version>4.0.1</version>
</dependency>

We have also added LogCaptor and Awaitility so that we can verify later on that CloudEvents are actually sent and received.

Sending a CloudEvent

The CloudEvent API comes with a builder that lets you set all required values in conformance with the specification. Let’s implement a component that can build and send a CloudEvent:

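import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class CloudEventSender {

  private static final String TOPIC = "blogs";

  private final KafkaTemplate<String, CloudEvent> kafkaTemplate;

  public CloudEventSender(KafkaTemplate<String, CloudEvent> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
  }
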
  public CloudEvent getDemoCloudEvent() {
    // Build an event
    return CloudEventBuilder.v1()
        .withId("2")
        .withSource(URI.create("/jdriven/blog"))
        .withType("com.jdriven.blog.1.0.0")
        .withDataContentType("text/xml")
        .withDataSchema(URI.create("schemas.jdriven.com/blog"))
        .withSubject("Second CloudEvent blog")
        .withTime(OffsetDateTime.of(2024, 7, 15, 9, 0, 0, 0, ZoneOffset.UTC))
        .withData(
            "<blog><title>A first look at CloudEvents</title><author>Laurens Westerlaken</author></blog>"
                .getBytes(StandardCharsets.UTF_8))
        .build();
  }

  public void send(CloudEvent event) {
    kafkaTemplate.send(new ProducerRecord<>(TOPIC, event.getId(), event));
  }
}

You see how easy this was? Now let’s move on to receiving the event!

Receiving a CloudEvent

Receiving the CloudEvent using Spring can be achieved by implementing a KafkaListener. In our case we use the annotation based approach with the @KafkaListener annotation.

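import io.cloudevents.CloudEvent;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class CloudEventReceiver {

  private static final Logger logger = LoggerFactory.getLogger(CloudEventReceiver.class);

  private static final String TOPIC = "blogs";

  @KafkaListener(topics = TOPIC)
  public void receive(ConsumerRecord<String, CloudEvent> event) {
    logger.debug("receiving CloudEvent: {}", event.value());
    // Decode with an explicit charset instead of relying on the platform default
    logger.debug(
        "with data: {}", new String(event.value().getData().toBytes(), StandardCharsets.UTF_8));
  }
}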

The result is that our CloudEvent is received and logged. The data from our CloudEvent is extracted and logged as a separate log line.

All we need to do now is verify it works as intended!

Verifying that it works

To be able to verify that it works, we first have to configure the correct serializer and deserializer. If we don’t add this configuration, an attempt is made to serialize/deserialize the CloudEvent as a String. This boils down to returning the Object reference as a String.

spring.kafka.producer.value-serializer=io.cloudevents.kafka.CloudEventSerializer
spring.kafka.consumer.value-deserializer=io.cloudevents.kafka.CloudEventDeserializer
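
Depending on your setup, the test further down may need a few more properties before the listener can connect and consume. The fragment below is a sketch under assumptions rather than the configuration of the original project: the group id is a placeholder name, and the bootstrap servers line assumes the broker is the one started by @EmbeddedKafka, which publishes its address in the spring.embedded.kafka.brokers property.

```properties
# Point producer and consumer at the embedded broker started by @EmbeddedKafka
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
# @KafkaListener needs a consumer group; this name is a placeholder
spring.kafka.consumer.group-id=cloudevents-demo
# Read from the beginning so an event sent before partition assignment is not missed
spring.kafka.consumer.auto-offset-reset=earliest
```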

After that we can verify sending and receiving the CloudEvent works:

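import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import nl.altindag.log.LogCaptor;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest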
@EmbeddedKafka(partitions = 1, topics = "blogs")
class CloudEventTest {

  @Autowired private CloudEventSender sender;

  @Test
  void testDefaultEventSent() {
    // As our receiver only logs a message we capture the logs
    LogCaptor logCaptor = LogCaptor.forClass(CloudEventReceiver.class);
    sender.send(sender.getDemoCloudEvent());

    // We use awaitility to wait for asynchronous event processing
    await()
        .atMost(3, TimeUnit.SECONDS)
        .untilAsserted(
            () ->
                assertThat(logCaptor.getDebugLogs())
                    .anySatisfy(
                        logEvent ->
                            assertThat(logEvent)
                                .contains(
                                    "<blog><title>A first look at CloudEvents</title><author>Laurens Westerlaken</author></blog>")));
  }
}

And let’s run it:

[Screenshot: CloudEventTest run result]

We see that the test succeeded, and when looking at the logs after the test ran we can see both log messages:

DEBUG 15636 --- [cloudevents-demo] [ntainer#0-0-C-1] c.j.cloudevents.CloudEventReceiver       : receiving CloudEvent: CloudEvent{id='2', source=/jdriven/blog, type='com.jdriven.blog.1.0.0', datacontenttype='text/xml', dataschema=schemas.jdriven.com/blog, subject='Second CloudEvent blog', time=2024-07-14T09:00Z, data=BytesCloudEventData{value=[60, 98, 108, 111, 103, 62, 60, 116, 105, 116, 108, 101, 62, 65, 32, 102, 105, 114, 115, 116, 32, 108, 111, 111, 107, 32, 97, 116, 32, 67, 108, 111, 117, 100, 69, 118, 101, 110, 116, 115, 60, 47, 116, 105, 116, 108, 101, 62, 60, 97, 117, 116, 104, 111, 114, 62, 76, 97, 117, 114, 101, 110, 115, 32, 87, 101, 115, 116, 101, 114, 108, 97, 107, 101, 110, 60, 47, 97, 117, 116, 104, 111, 114, 62, 60, 47, 98, 108, 111, 103, 62]}, extensions={}}
DEBUG 15636 --- [cloudevents-demo] [ntainer#0-0-C-1] c.j.cloudevents.CloudEventReceiver       : with data: <blog><title>A first look at CloudEvents</title><author>Laurens Westerlaken</author></blog>
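
The first log line prints the payload as a list of numbers because the event data is held as raw bytes; the second line shows the same bytes decoded to text. As a small standalone illustration (the class name PayloadBytesDemo is made up for this sketch, and it assumes the payload was encoded as UTF-8, as in our sender), the first six values from the log decode like this:

```java
import java.nio.charset.StandardCharsets;

class PayloadBytesDemo {

  // Turns raw payload bytes back into text, assuming the producer encoded them as UTF-8.
  static String decode(byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // The first six values of the byte array shown in the log line above.
    byte[] firstBytes = {60, 98, 108, 111, 103, 62};
    System.out.println(decode(firstBytes)); // prints "<blog>"
  }
}
```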

Wrapping up

As you can see it’s quite simple to get started with CloudEvents! Some things to take into account:

  • We represented the CloudEvent data as a String, but you’ll obviously be working with more complex objects.

  • Because we represented the data as a String, parsing was easy. When using a schema registry this becomes a bit more complex.
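
To give an idea of what moving beyond a plain String could look like, here is a minimal sketch that reads the title and author out of the XML payload using only the JDK’s built-in DOM parser. The class BlogPayloadParser and its textOf helper are hypothetical names for this illustration and are not part of the CloudEvents SDK; in a real application you would more likely map the payload onto a proper type.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

class BlogPayloadParser {

  // Returns the text of the first element with the given tag name, or null if there is none.
  static String textOf(byte[] payload, String tagName) {
    try {
      DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
      // Reject DOCTYPE declarations so untrusted payloads cannot trigger XXE.
      factory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
      Document document =
          factory.newDocumentBuilder().parse(new ByteArrayInputStream(payload));
      NodeList matches = document.getElementsByTagName(tagName);
      return matches.getLength() == 0 ? null : matches.item(0).getTextContent();
    } catch (Exception e) {
      throw new IllegalArgumentException("Could not parse payload as XML", e);
    }
  }

  public static void main(String[] args) {
    byte[] payload =
        "<blog><title>A first look at CloudEvents</title><author>Laurens Westerlaken</author></blog>"
            .getBytes(StandardCharsets.UTF_8);
    System.out.println(textOf(payload, "title"));
    System.out.println(textOf(payload, "author"));
  }
}
```

In the receiver, the byte array passed to textOf would be the one returned by event.value().getData().toBytes().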

In the next blog we will be looking at implementing an extension of the CloudEvent specification. This allows you to add your own proprietary metadata to CloudEvents!
