The December 2019 release of Apache Kafka 2.4.0 saw usability improvements in TopologyTestDriver, with the addition of new TestInputTopic and TestOutputTopic classes. These offer typesafe methods to easily produce and consume messages for your Kafka Streams tests. In this post we’ll explore these new classes in the context of Avro messages, which requires a small trick to get working.

In short we’ll start by defining two Avro contracts; create a Kafka Streams application that consumes one type on an input topic, and conditionally produces a second type on an output topic; then we’ll wire up our tests to verify the stream processing behaviour. The complete example application is available on Github.

Avro contracts

Apache Avro is a data serialization system that provides rich data structures in a compact, fast, binary data format. You define a schema in JSON, which is shared between the producer and consumer. This allows for the efficient serialization, transfer and storage of messages. When applied to Kafka this sees the addition of Confluent Schema Registry, which facilitates the exchange of schemas, as well as schema evolution. You’ll see this reflected in the tests later.

Schema definition

We follow along to Apache Avro Getting Started (Java) to create two contracts, with the Avro Maven plugin configured to generate the associated Java classes.

Listing 1. user.avsc
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number",  "type": ["int", "null"]},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
Listing 2. color.avsc
{"namespace": "example.avro",
 "type": "record",
 "name": "Color",
 "fields": [
     {"name": "name", "type": "string"}
 ]
}

This results in two classes generated in target/generated-sources/avro, which you might need to add to your IDE build path. We’ve added two unit tests to demonstrate basic serialization and deserialization of these Avro contracts in isolation. Note that this differs from typical and actual use in the context of an Kafka Streams application; It serves only as an illustration of two types of serialization.

Kafka Streams Application

Next up we define a Kafka Streams application to consume and produce messages with the above Avro types. Through start.spring.io we bootstrap an application that uses both Spring Kafka and Kafka Streams. While we’re still using Spring Boot 2.2, we have to override the managed Apache Kafka dependency versions, as indicated in the release announcement. However, we diverge slightly from the approach mentioned in Appendix A, opting instead to customize the dependency versions through properties.

Listing 3. Kafka version override through properties
<properties>
  <java.version>11</java.version>
  <kafka.version>2.4.0</kafka.version>
  <spring-kafka.version>2.4.0.RELEASE</spring-kafka.version>
</properties>

Our full Kafka Streams application then only consists of a single class. We convert an input users-topic with users to an output colors-topic with the user’s favorite color, provided that color is not blue. Note that this is a highly simplified Stream topology; You can build, and test, a lot more advanced operations such as joins and aggregations with ease.

Listing 4. TopologyTestDriverAvroApplication.java
@SpringBootApplication
@EnableKafkaStreams
public class TopologyTestDriverAvroApplication {

  public static void main(String[] args) {
    SpringApplication.run(TopologyTestDriverAvroApplication.class, args);
  }

  @Bean
  public KStream<String, User> handleStream(StreamsBuilder builder) {
    KStream<String, User> userStream = builder.stream("users-topic");
    KStream<String, Color> colorStream = userStream
        .filter((username, user) -> !"blue".equals(user.getFavoriteColor()))
        .mapValues((username, user) -> new Color(user.getFavoriteColor()));
    colorStream.to("colors-topic");
    return userStream;
  }

}

We use the following configuration properties to ensure the messages are (de)serialized correctly.

Listing 5. application.properties
spring.application.name=topology-test-driver-avro
spring.kafka.streams.properties.default.key.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.kafka.streams.properties.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Testing Kafka Streams

Now finally we’ve arrived at the target of this blog post: Testing a Kafka Streams application using the new TestInputTopic and TestOutputTopic classes. The basic test structure is already given in reference guide chapter Testing a Streams application, which consists of:

  1. Creating a Topology through a StreamsBuilder;

  2. Defining required application Properties;

  3. Instantiate a new TopologyTestDriver;

  4. Call createInputTopic with proper key/value serializers;

  5. Call createOutputTopic with proper key/value deserializers;

  6. Within unit tests

    • pipe messages into input topics;

    • read messages from output topics.

While this test structure works fine with straightforward Serdes, there’s a slight complication when it comes to Avro Serdes. As mentioned above, the Kafka Avro (de)serializers expect to call an instance of Confluent Schema Registry to exchange the Avro schemas. However, for our tests we do not want to have to run an instance of the Schema Registry as a packaged or external dependency.

Luckily, we can configure our Avro (de)serializers with an alternative SchemaRegistryClient client in the form of MockSchemaRegistryClient, brought in through kafka-schema-registry-client:5.3.2. This is an in memory implementation for tests, which is not persistent nor shared across instances. Careful examination of the call structures around TopologyTestDriver tells us the driver will create it’s own instances of the Properties configured Serdes, while both createInputTopic and createOutputTopic expect to be passed instances as well. This conflicts with the documented MockSchemaRegistryClient behaviour that schemas are not shared across instances! So, in order to share a MockSchemaRegistryClient between all Avro (de)serializer instances used in our tests, we have to use a workaround.

Now originally I had subclassed SpecificAvroSerde to always use a single static MockSchemaRegistryClient instance. But after sharing my approach Matthias J. Sax was kind enough to point out built-in support for a mock:// pseudo-protocol, specifically to support testing. When you configure the same mock:// URL in both the Properties passed into TopologyTestDriver, as well as for the (de)serializer instances passed into createInputTopic and createOutputTopic, all (de)serializers will use the same MockSchemaRegistryClient, with a single in memory schema store.

Support for the mock:// pseudo-protocol is achieved through AbstractKafkaAvroSerDe, which recognizes mocked URLs and through MockSchemaRegistry holds a reusable MockSchemaRegistryClient in a map using the URL suffix as key. After your tests you can remove the mock registry client and associated schemas again through MockSchemaRegistry#dropScope(String).

In full, the test class then comes down to the following.

class TopologyTestDriverAvroApplicationTests {

  private static final String SCHEMA_REGISTRY_SCOPE = TopologyTestDriverAvroApplicationTests.class.getName();
  private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE;

  private TopologyTestDriver testDriver;

  private TestInputTopic<String, User> usersTopic;
  private TestOutputTopic<String, Color> colorsTopic;

  @BeforeEach
  void beforeEach() throws Exception {
    // Create topology to handle stream of users
    StreamsBuilder builder = new StreamsBuilder();
    new TopologyTestDriverAvroApplication().handleStream(builder);
    Topology topology = builder.build();

    // Dummy properties needed for test diver
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);

    // Create test driver
    testDriver = new TopologyTestDriver(topology, props);

    // Create Serdes used for test record keys and values
    Serde<String> stringSerde = Serdes.String();
    Serde<User> avroUserSerde = new SpecificAvroSerde<>();
    Serde<Color> avroColorSerde = new SpecificAvroSerde<>();

    // Configure Serdes to use the same mock schema registry URL
    Map<String, String> config = Map.of(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);
    avroUserSerde.configure(config, false);
    avroColorSerde.configure(config, false);

    // Define input and output topics to use in tests
    usersTopic = testDriver.createInputTopic(
        "users-topic",
        stringSerde.serializer(),
        avroUserSerde.serializer());
    colorsTopic = testDriver.createOutputTopic(
        "colors-topic",
        stringSerde.deserializer(),
        avroColorSerde.deserializer());
  }

  @AfterEach
  void afterEach() {
    testDriver.close();
    MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE);
  }

  @Test
  void shouldPropagateUserWithFavoriteColorRed() throws Exception {
    User user = new User("Alice", 7, "red");
    usersTopic.pipeInput("Alice", user);
    assertEquals(new Color("red"), colorsTopic.readValue());
  }

  @Test
  void shouldNotPropagateUserWithFavoriteColorBlue() throws Exception {
    User user = new User("Bob", 14, "blue");
    usersTopic.pipeInput("Bob", user);
    assertTrue(colorsTopic.isEmpty());
  }

}

Conclusion

Using the above approach we can easily unit test Kafka Streams applications which exchange Avro messages. Advanced testing functionality such as Wall-clock-time punctuations and accessing state stores are similarly available through TopologyTestDriver, allowing you to rapidly and easily test the full scope of your stream processing applications in a type safe manner.

shadow-left