Spring Cloud Messaging using Kafka
“What is this Kafka I’ve been hearing about?”
In short, Kafka is a horizontally scalable streaming platform. In other words, Kafka is a message broker that can run on multiple servers as a cluster. Data streams are called topics. Producers place messages on a topic, and consumers subscribe to topics. Consumers can be grouped into so-called consumer groups: every group subscribed to a topic receives each message, while within a group each message is handled by only one consumer. This makes it possible for multiple consumers to act as one when a message should be processed only once.
But don’t take my word for it. There’s a lot more to Kafka than I can get into in this post and the original documentation is much clearer, so check out the documentation at https://kafka.apache.org/.
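To make the consumer-group idea a bit more concrete, here is a toy in-memory model in plain Java. It is only a sketch and does not use the Kafka client API: the ToyTopic class, the group names and the round-robin choice inside a group are all assumptions made for illustration (real Kafka distributes work by assigning partitions to the consumers in a group).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a single topic; hypothetical, NOT the Kafka client API.
class ToyTopic {
    // group name -> names of the consumers in that group
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered to that group so far
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    void subscribe(String group, String consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
        delivered.putIfAbsent(group, 0);
    }

    // Every group gets the message; inside a group exactly one consumer does.
    // Returns group name -> the consumer that received this message.
    Map<String, String> publish(String message) {
        Map<String, String> receivers = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            int count = delivered.merge(entry.getKey(), 1, Integer::sum);
            // Simplification: rotate through the group members in order.
            receivers.put(entry.getKey(), members.get((count - 1) % members.size()));
        }
        return receivers;
    }
}

class ConsumerGroupDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.subscribe("billing", "billing-1");
        topic.subscribe("billing", "billing-2");
        topic.subscribe("audit", "audit-1");
        for (String message : new String[] {"m1", "m2", "m3"}) {
            System.out.println(message + " -> " + topic.publish(message));
        }
    }
}
```

In this sketch both groups see every message, while the two consumers in the billing group take turns.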
“How do I use Kafka in my Spring applications?”
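Among the abstractions the Spring ecosystem offers is Spring Cloud Stream, a messaging abstraction layer that can use Kafka as its broker. Its API makes it very easy to produce messages to Kafka and to consume them. The examples in this post use its annotation-based programming model (@EnableBinding, @StreamListener), which has been deprecated since Spring Cloud Stream 3.1 in favor of a functional style.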
Producing Strings
The following code (and a running Kafka server) is all that is needed to produce a String to a Kafka topic.
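import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;

@EnableBinding(Source.class)
public class HelloWorldSource {
    // Polled by Spring; each return value is sent to the output channel.
    @InboundChannelAdapter(channel = Source.OUTPUT)
    public String sayHello() {
        return "Hello, World!";
    }
}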
The @EnableBinding annotation tells Spring you want to use Spring Cloud Stream with the Source interface.
Spring uses the method annotated with @InboundChannelAdapter to create a String, which Spring will place on Kafka through the Source.OUTPUT channel.
By default the sayHello method will be called once per second.
To control the Kafka topic the Source.OUTPUT channel points to, you only have to set the spring.cloud.stream.bindings.output.destination property in your application.yaml file to the name of your topic.
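As a sketch, the property above could look like this in application.yaml. The topic name hello-world is an assumption chosen for illustration; use the name of your own topic.

```yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          # "hello-world" is a hypothetical topic name
          destination: hello-world
```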
Consuming Strings
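Consuming from Kafka with Spring Cloud Stream is just as simple as producing:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;

@EnableBinding(Sink.class)
public class HelloWorldSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(HelloWorldSink.class);

    // Called once for every message that arrives on the input channel.
    @ServiceActivator(inputChannel = Sink.INPUT)
    public void printMessage(final String message) {
        LOGGER.info("{}", message);
    }
}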
Here we use the Sink interface and the @ServiceActivator annotation, which makes Spring trigger the printMessage method each time a message is received on the Kafka topic set on the spring.cloud.stream.bindings.input.destination property in your application.yaml.
“What about objects other than Strings?”
Most of the time our data is a bit more complex than plain String data and manual (un)marshalling is not very Spring-like.
The @InboundChannelAdapter handles automatic marshalling just fine, but the @ServiceActivator does not.
Don’t fret, there is a solution! Enter the @StreamListener annotation, which will automatically unmarshal the data based on the content type provided through the spring.cloud.stream.bindings.input.content-type property.
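import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Configuration;
@EnableBinding(Sink.class)
@Configuration
public class MessageHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandler.class);
    @StreamListener(Sink.INPUT)
    public void handle(final ChatMessage message) {
        final DateTimeFormatter df = DateTimeFormatter.ofLocalizedTime(FormatStyle.MEDIUM).withZone(ZoneId.systemDefault());
        final String time = df.format(Instant.ofEpochMilli(message.getTime()));
        LOGGER.info("[{}]: {}", time, message.getContents());
    }
}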
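The content type is set per binding. A possible application.yaml for this consumer is sketched below; the topic name chat-messages and the choice of application/json are assumptions for illustration, not values taken from the example project.

```yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          # "chat-messages" is a hypothetical topic name
          destination: chat-messages
          # assumes the producer sends JSON
          content-type: application/json
```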
Given a simple ChatMessage class this consumer will work together with the following producer.
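import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;

@EnableBinding(Source.class)
public class TimerSource {
    // Polled every 1000 ms; each poll produces at most one ChatMessage.
    @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public ChatMessage timerMessageSource() {
        return new ChatMessage("hello world", System.currentTimeMillis());
    }
}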
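The ChatMessage class itself is not shown in this post. A minimal sketch that fits the constructor and getters used above might look like the following; the field names, the no-argument constructor and the setters are assumptions (they are what a JSON mapper typically needs to rebuild the object on the consuming side).

```java
// Hypothetical ChatMessage; the real class lives in the example project.
class ChatMessage {
    private String contents;
    private long time; // creation time in epoch milliseconds

    // No-argument constructor, assumed to be needed for JSON unmarshalling.
    public ChatMessage() {
    }

    public ChatMessage(String contents, long time) {
        this.contents = contents;
        this.time = time;
    }

    public String getContents() {
        return contents;
    }

    public void setContents(String contents) {
        this.contents = contents;
    }

    public long getTime() {
        return time;
    }

    public void setTime(long time) {
        this.time = time;
    }
}

class ChatMessageDemo {
    public static void main(String[] args) {
        ChatMessage message = new ChatMessage("hello world", System.currentTimeMillis());
        System.out.println(message.getTime() + ": " + message.getContents());
    }
}
```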
“Isn’t processing also a thing with Kafka?!”
There is another annotation, @SendTo, which allows a method to filter Kafka messages or edit them before sending them to another topic.
This concept is shown in the following code.
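import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
@Configuration
public class MessageFilter {
    // Reads from the input channel and sends the return value to the output channel.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public ChatMessage transform(final ChatMessage chatmessage) {
        final String contents = chatmessage.getContents().toUpperCase() + "!";
        return new ChatMessage(contents, chatmessage.getTime());
    }
}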
Here we use both the @StreamListener and the @SendTo annotations to read a ChatMessage from Kafka, fiddle with the contents and send it to another topic, ready for consumption.
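Because the transformation is just a method, its logic can be tried out without a running Kafka server. The sketch below repeats the body of transform in plain Java; the nested ChatMessage is a minimal stand-in with an assumed shape, and the TransformSketch class is made up for this illustration.

```java
class TransformSketch {
    // Minimal stand-in for the post's ChatMessage (assumed shape).
    static final class ChatMessage {
        private final String contents;
        private final long time;

        ChatMessage(String contents, long time) {
            this.contents = contents;
            this.time = time;
        }

        String getContents() {
            return contents;
        }

        long getTime() {
            return time;
        }
    }

    // Same logic as MessageFilter.transform, without the Spring annotations.
    static ChatMessage transform(ChatMessage message) {
        String contents = message.getContents().toUpperCase() + "!";
        return new ChatMessage(contents, message.getTime());
    }

    public static void main(String[] args) {
        ChatMessage out = transform(new ChatMessage("hello world", 1000L));
        System.out.println(out.getContents()); // prints HELLO WORLD!
    }
}
```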
A full example of the code in this post can be found at: https://gitlab.com/Dasyel/kafka_blogpost