In our Tech Radar we put CloudEvents in the trial phase of the platforms quadrant. We’d like to take you on a journey and show you what CloudEvents are and how they can be used. In our Second blog about CloudEvents we showed you how they can be applied. In this third blog we will be looking at CloudEvent extensions, a means to extend the specification with propriatary properties!

Building a CloudEventExtension

We’ll be continuing with the source code from the previous blog.

Within our context let’s say every message on the topic can be related to an employee. This is important meta information that we want to add to every message, so let’s write a CloudEventExtension for it! The CloudEvent SDK provides an interface for extensions, which we’ll be using. This interface defines a few convenience methods for retrieving the key(s) and their respective value(s).

Here is what the implemented extension looks like:

import io.cloudevents.CloudEventExtension;
import io.cloudevents.CloudEventExtensions;
import io.cloudevents.core.extensions.impl.ExtensionUtils;
import java.util.Set;

public class JDrivenCloudEventExtension implements CloudEventExtension {

  public static final String EMPLOYEE = "employee";

  private static final Set<String> KEY_SET = Set.of(EMPLOYEE);

  private String employee;

  public JDrivenCloudExtension() {
  }

  public JDrivenCloudExtension(String employee) {
    this.employee = employee;
  }

  @Override
  public void readFrom(CloudEventExtensions extensions) {
    Object employee = extensions.getExtension(EMPLOYEE);
    if (employee != null) {
      this.employee = employee.toString();
    }
  }

  @Override
  public Object getValue(String key) throws IllegalArgumentException {
    if (key.equals(EMPLOYEE)) {
      return this.employee;
    }
    throw ExtensionUtils.generateInvalidKeyException(this.getClass(), key);
  }

  @Override
  public Set<String> getKeys() {
    return KEY_SET;
  }
}

Now let’s see how we can use it!

Using the CloudEventExtension

Sending

In order to add the extension when sending a CloudEvent, we can use the provided builder method:

public CloudEvent getDemoCloudEvent() {
  // Build an event
  return CloudEventBuilder.v1()
    .withId("3")
    ...
    .withData(
        "<blog><title>CloudEvents in practice</title><author>Laurens Westerlaken</author></blog>"
            .getBytes(StandardCharsets.UTF_8))
    .withExtension(new JDrivenCloudEventExtension("Laurens Westerlaken"))
    .build();
}

Receiving

When receiving CloudEvents we can make use of the ExtensionProvider that is available:

@KafkaListener(topics = TOPIC)
public void receive(ConsumerRecord<String, CloudEvent> event) {
  ...

  JDrivenCloudEventExtension jcee = ExtensionProvider.getInstance()
          .parseExtension(JDrivenCloudEventExtension.class, event.value());
  logger.debug("with employee: {}", jcee.getValue(JDrivenCloudEventExtension.EMPLOYEE));
}

For this to work we have to register our CloudEventExtension with the provider. For now I have added this in a PostConstruct method on the sender class:

@PostConstruct
public void init() {
  ExtensionProvider.getInstance()
      .registerExtension(JDrivenCloudEventExtension.class, JDrivenCloudEventExtension::new);
}

And that’s all there is to it! We’ve created a CloudEventExtension which allows us to easily provide the employee name for every event sent! Let’s quickly verify it works.

Verifying it works

Let’s chain an anySatisfy to our existing testcase as follows:

@Test
void testDefaultEventSent() {
  LogCaptor logCaptor = LogCaptor.forClass(CloudEventReceiver.class);
  sender.send(sender.getDemoCloudEvent());
  await()
      .atMost(15, TimeUnit.SECONDS)
      .untilAsserted(
          () ->
              assertThat(logCaptor.getDebugLogs())
                  .anySatisfy(
                      ...
                  .anySatisfy(
                      logEvent ->
                          assertThat(logEvent)
                              .contains(
                                  "with employee: Laurens Westerlaken")));
}

Running the testcase shows us everything is still in working order, and the new meta information is being transferred!

shadow-left