Kafka Streams TopologyTestDriver with Avro
The December 2019 release of Apache Kafka 2.4.0 saw usability improvements in TopologyTestDriver, with the addition of the new TestInputTopic and TestOutputTopic classes.
These offer typesafe methods to easily produce and consume messages for your Kafka Streams tests.
In this post we’ll explore these new classes in the context of Avro messages, which requires a small trick to get working.
In short, we’ll start by defining two Avro contracts; create a Kafka Streams application that consumes one type on an input topic and conditionally produces a second type on an output topic; then we’ll wire up our tests to verify the stream processing behaviour. The complete example application is available on GitHub.
Avro contracts
Apache Avro is a data serialization system that provides rich data structures in a compact, fast, binary data format. You define a schema in JSON, which is shared between the producer and consumer and allows for the efficient serialization, transfer and storage of messages. When applied to Kafka this is complemented by Confluent Schema Registry, which facilitates the exchange of schemas as well as schema evolution. You’ll see this reflected in the tests later.
Schema definition
We follow along with Apache Avro Getting Started (Java) to create two contracts, with the Avro Maven plugin configured to generate the associated Java classes.
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
{"namespace": "example.avro",
"type": "record",
"name": "Color",
"fields": [
{"name": "name", "type": "string"}
]
}
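For reference, the plugin configuration could look roughly as follows. This is a sketch rather than the exact repository configuration; the version is illustrative, and the stringType setting makes the generated classes expose String fields rather than Avro's default CharSequence, which the "blue".equals(...) comparison further below relies on.

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.9.1</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
        <stringType>String</stringType>
      </configuration>
    </execution>
  </executions>
</plugin>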
This results in two classes generated in target/generated-sources/avro, which you might need to add to your IDE build path.
We’ve added two unit tests to demonstrate basic serialization and deserialization of these Avro contracts in isolation. Note that this differs from typical and actual use in the context of a Kafka Streams application; it serves only as an illustration of two types of serialization.
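For example, a binary round trip of the User contract could be verified roughly as follows; this is a minimal sketch using Avro's SpecificDatumWriter and SpecificDatumReader directly, not the exact tests from the repository:

@Test
void userSurvivesBinaryRoundTrip() throws IOException {
    User user = new User("Alice", 7, "red");
    // Serialize the generated specific record to Avro binary
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DatumWriter<User> writer = new SpecificDatumWriter<>(User.class);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(user, encoder);
    encoder.flush();
    // Deserialize the bytes and verify we get an equal record back
    DatumReader<User> reader = new SpecificDatumReader<>(User.class);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    assertEquals(user, reader.read(null, decoder));
}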
Kafka Streams Application
Next up we define a Kafka Streams application to consume and produce messages with the above Avro types. Through start.spring.io we bootstrap an application that uses both Spring Kafka and Kafka Streams. Since we're still on Spring Boot 2.2, we have to override the managed Apache Kafka dependency versions, as indicated in the release announcement. However, we diverge slightly from the approach mentioned in Appendix A, opting instead to customize the dependency versions through properties.
<properties>
  <java.version>11</java.version>
  <kafka.version>2.4.0</kafka.version>
  <spring-kafka.version>2.4.0.RELEASE</spring-kafka.version>
</properties>
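The SpecificAvroSerde used below comes from Confluent's kafka-streams-avro-serde artifact, which also pulls in the schema registry client. A sketch of the dependency, assuming Confluent's Maven repository (https://packages.confluent.io/maven/) is configured:

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-streams-avro-serde</artifactId>
  <version>5.3.2</version>
</dependency>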
Our full Kafka Streams application then consists of just a single class. We convert an input users-topic with users to an output colors-topic with each user's favorite color, provided that color is not blue. Note that this is a highly simplified stream topology; you can build, and test, far more advanced operations such as joins and aggregations with ease.
@SpringBootApplication
@EnableKafkaStreams
public class TopologyTestDriverAvroApplication {

    public static void main(String[] args) {
        SpringApplication.run(TopologyTestDriverAvroApplication.class, args);
    }

    @Bean
    public KStream<String, User> handleStream(StreamsBuilder builder) {
        KStream<String, User> userStream = builder.stream("users-topic");
        KStream<String, Color> colorStream = userStream
                .filter((username, user) -> !"blue".equals(user.getFavoriteColor()))
                .mapValues((username, user) -> new Color(user.getFavoriteColor()));
        colorStream.to("colors-topic");
        return userStream;
    }
}
We use the following configuration properties to ensure the messages are (de)serialized correctly.
spring.application.name=topology-test-driver-avro
spring.kafka.streams.properties.default.key.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.kafka.streams.properties.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
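Note that outside of tests the Avro serdes also need to know where to reach an actual Schema Registry instance; assuming a local registry on the default port, that would be one additional property (the URL is illustrative):

spring.kafka.streams.properties.schema.registry.url=http://localhost:8081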
Testing Kafka Streams
Now finally we’ve arrived at the target of this blog post: testing a Kafka Streams application using the new TestInputTopic and TestOutputTopic classes.
The basic test structure is already given in the reference guide chapter Testing a Streams application, and consists of:
- creating a Topology through a StreamsBuilder;
- defining the required application Properties;
- instantiating a new TopologyTestDriver;
- calling createInputTopic with the proper key/value serializers;
- calling createOutputTopic with the proper key/value deserializers;
- within unit tests:
  - piping messages into input topics;
  - reading messages from output topics.
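With plain serdes that structure comes down to only a handful of lines. As a minimal baseline sketch, assuming a trivial topology that copies an input-topic to an output-topic (both names hypothetical):

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic").to("output-topic");

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props);
TestInputTopic<String, String> input = driver.createInputTopic(
        "input-topic", new StringSerializer(), new StringSerializer());
TestOutputTopic<String, String> output = driver.createOutputTopic(
        "output-topic", new StringDeserializer(), new StringDeserializer());
input.pipeInput("key", "value");
assertEquals("value", output.readValue());
driver.close();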
While this test structure works fine with straightforward Serdes, there’s a slight complication when it comes to Avro Serdes. As mentioned above, the Kafka Avro (de)serializers expect to call an instance of Confluent Schema Registry to exchange the Avro schemas. However, for our tests we do not want to have to run an instance of the Schema Registry as a packaged or external dependency.
Luckily, we can configure our Avro (de)serializers with an alternative SchemaRegistryClient in the form of MockSchemaRegistryClient, brought in through kafka-schema-registry-client:5.3.2. This is an in-memory implementation for tests, which is neither persistent nor shared across instances.
Careful examination of the call structures around TopologyTestDriver tells us the driver will create its own instances of the Serdes configured through Properties, while both createInputTopic and createOutputTopic expect to be passed instances as well. This conflicts with the documented MockSchemaRegistryClient behaviour that schemas are not shared across instances! So, in order to share a MockSchemaRegistryClient between all Avro (de)serializer instances used in our tests, we need a workaround.
Originally I had subclassed SpecificAvroSerde to always use a single static MockSchemaRegistryClient instance. But after I shared my approach, Matthias J. Sax was kind enough to point out the built-in support for a mock:// pseudo-protocol, added specifically to support testing. When you configure the same mock:// URL in the Properties passed into TopologyTestDriver, as well as on the (de)serializer instances passed into createInputTopic and createOutputTopic, all (de)serializers will use the same MockSchemaRegistryClient, with a single in-memory schema store. Support for the mock:// pseudo-protocol is implemented in AbstractKafkaAvroSerDe, which recognizes mocked URLs and, through MockSchemaRegistry, holds a reusable MockSchemaRegistryClient in a map keyed by the URL suffix. After your tests you can remove the mock registry client and its associated schemas again through MockSchemaRegistry#dropScope(String).
In full, the test class then comes down to the following.
class TopologyTestDriverAvroApplicationTests {

    private static final String SCHEMA_REGISTRY_SCOPE = TopologyTestDriverAvroApplicationTests.class.getName();
    private static final String MOCK_SCHEMA_REGISTRY_URL = "mock://" + SCHEMA_REGISTRY_SCOPE;

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, User> usersTopic;
    private TestOutputTopic<String, Color> colorsTopic;

    @BeforeEach
    void beforeEach() throws Exception {
        // Create topology to handle stream of users
        StreamsBuilder builder = new StreamsBuilder();
        new TopologyTestDriverAvroApplication().handleStream(builder);
        Topology topology = builder.build();

        // Dummy properties needed for test driver
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);

        // Create test driver
        testDriver = new TopologyTestDriver(topology, props);

        // Create Serdes used for test record keys and values
        Serde<String> stringSerde = Serdes.String();
        Serde<User> avroUserSerde = new SpecificAvroSerde<>();
        Serde<Color> avroColorSerde = new SpecificAvroSerde<>();

        // Configure Serdes to use the same mock schema registry URL
        Map<String, String> config = Map.of(
                AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, MOCK_SCHEMA_REGISTRY_URL);
        avroUserSerde.configure(config, false);
        avroColorSerde.configure(config, false);

        // Define input and output topics to use in tests
        usersTopic = testDriver.createInputTopic(
                "users-topic",
                stringSerde.serializer(),
                avroUserSerde.serializer());
        colorsTopic = testDriver.createOutputTopic(
                "colors-topic",
                stringSerde.deserializer(),
                avroColorSerde.deserializer());
    }

    @AfterEach
    void afterEach() {
        testDriver.close();
        MockSchemaRegistry.dropScope(SCHEMA_REGISTRY_SCOPE);
    }

    @Test
    void shouldPropagateUserWithFavoriteColorRed() throws Exception {
        User user = new User("Alice", 7, "red");
        usersTopic.pipeInput("Alice", user);
        assertEquals(new Color("red"), colorsTopic.readValue());
    }

    @Test
    void shouldNotPropagateUserWithFavoriteColorBlue() throws Exception {
        User user = new User("Bob", 14, "blue");
        usersTopic.pipeInput("Bob", user);
        assertTrue(colorsTopic.isEmpty());
    }
}
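Should you ever need direct access to the shared mock client, for instance to check which subjects were registered, MockSchemaRegistry also exposes the instance behind a scope. A small sketch; the assertion is illustrative and assumes the default <topic>-value subject naming:

SchemaRegistryClient schemaRegistryClient = MockSchemaRegistry.getClientForScope(SCHEMA_REGISTRY_SCOPE);
// After piping a user message, the User value schema is registered under the users-topic-value subject
assertTrue(schemaRegistryClient.getAllSubjects().contains("users-topic-value"));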
Conclusion
Using the above approach we can easily unit test Kafka Streams applications which exchange Avro messages. Advanced testing functionality, such as wall-clock-time punctuations and access to state stores, is similarly available through TopologyTestDriver, allowing you to rapidly and easily test the full scope of your stream processing applications in a type-safe manner.
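For instance, a quick sketch of both; the counts-store name is hypothetical and assumes a topology that materializes a state store under that name:

// Fire any wall-clock-time punctuators scheduled within the next five minutes
testDriver.advanceWallClockTime(Duration.ofMinutes(5));
// Inspect a materialized state store directly
KeyValueStore<String, Long> store = testDriver.getKeyValueStore("counts-store");
assertEquals(Long.valueOf(42), store.get("some-key"));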