Apache Kafka is often used together with Confluent Schema Registry, as it allows you to store and retrieve the Avro, JSON Schema, and Protobuf schemas used for Kafka message (de)serialization. By storing a versioned history of schemas for topic values, with configurable enforced compatibility, you ensure producers and consumers can continue to exchange compact serialized messages even as schemas evolve.

By default, client applications automatically register new schemas. If they produce new messages to a new topic, then they will automatically try to register new schemas. This is very convenient in development environments, but in production environments we recommend that client applications do not automatically register new schemas. Best practice is to register schemas outside of the client application to control when schemas are registered with Schema Registry and how they evolve.
— On-Premises Schema Registry Tutorial
On Auto Schema Registration

The above quote is taken from the Confluent Schema Registry documentation. Unfortunately, the documentation only covers how to disable auto schema registration, how to register a schema manually, and how to register one through a curl command. It stops short of explicitly mentioning a more convenient way to register your schemas:
The Schema Registry Maven Plugin.

In this blog post and accompanying repository I’ll walk you through creating a Maven project for your Avro schemas, and registering those schemas with your Schema Registry instance by deploying a Docker image to production. The associated code can be found on GitHub.

Project structure

We start with a Maven pom.xml file with a dependency on Apache Avro through org.apache.avro:avro:1.10.2. Next we add test dependencies for JUnit 5 and AssertJ, such that we can demonstrate serialization with the generated classes. Then we configure two Maven plugins: the Avro Maven plugin for Java code generation, and the Schema Registry Maven plugin for schema registration, both detailed below.
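For reference, the relevant dependency section might look roughly like this; the version numbers are illustrative and should be kept current:

<dependencies>
  <dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.10.2</version>
  </dependency>
  <!-- Test dependencies, used to demonstrate serialization with the generated classes -->
  <dependency>
    <groupId>org.junit.jupiter</groupId>
    <artifactId>junit-jupiter</artifactId>
    <version>5.7.1</version>
    <scope>test</scope>
  </dependency>
  <dependency>
    <groupId>org.assertj</groupId>
    <artifactId>assertj-core</artifactId>
    <version>3.19.0</version>
    <scope>test</scope>
  </dependency>
</dependencies>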

Define Avro schemas

We define three Avro schemas: one for books and one for orders, both of which reference a shared price schema, just to make things slightly more interesting than the Apache Avro specification samples. We also use a variety of types to showcase the use of logicalTypes, as well as the effect of flags passed to the Avro Maven plugin for Java code generation.

Listing 1. Price.avsc
{
  "namespace": "com.github.timtebeek.avro",
  "type": "record",
  "name": "Price",
  "fields": [
    {
      "name": "value",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 3
      }
    }
  ]
}
Listing 2. Order.avsc
{
  "namespace": "com.github.timtebeek.avro",
  "type": "record",
  "name": "Order",
  "fields": [
    {
      "name": "orderId",
      "type": {
        "type": "string",
        "logicalType": "uuid"
      }
    },
    {
      "name": "price",
      "type": "com.github.timtebeek.avro.Price"
    }
  ]
}
Listing 3. Book.avsc
{
  "namespace": "com.github.timtebeek.avro",
  "type": "record",
  "name": "Book",
  "fields": [
    {
      "name": "bookId",
      "type": {
        "type": "string",
        "logicalType": "uuid"
      }
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "subTitle",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "price",
      "type": "com.github.timtebeek.avro.Price"
    }
  ]
}

As you can see, the Price type references in Book.avsc and Order.avsc use the fully qualified name of the Price record, including its namespace. This way you can define a type once and reuse it across different Avro schemas. Note, however, that type references are not supported by all code generation implementations; a notable exception is AvroGen for C#. ☹️

Generate Java classes (optional)

Avro schemas can be converted into classes for easy reuse between projects. For Java this is most easily achieved through the Avro Maven plugin. The comments in the snippet below explain the configuration overrides applied to influence the generated classes. The import of Price.avsc is what allows us to reuse Price in Book.avsc and Order.avsc.

Listing 4. Avro Maven plugin configuration
<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.2</version>
  <configuration>
    <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
    <!-- Use private visibility for Avro class fields instead of public -->
    <fieldVisibility>PRIVATE</fieldVisibility>
    <!-- Convert Avro logicalType uuid into Java UUID instead of String -->
    <customConversions>org.apache.avro.Conversions$UUIDConversion</customConversions>
    <!-- Convert Avro logicalType decimal into Java BigDecimal instead of ByteBuffer -->
    <enableDecimalLogicalType>true</enableDecimalLogicalType>
    <!-- Convert Avro string into Java String instead of CharSequence -->
    <stringType>String</stringType>
    <imports>
      <!-- Allow Book and Order classes to reference type specified in Price.avsc -->
      <import>src/main/avro/com/github/timtebeek/avro/Price.avsc</import>
    </imports>
  </configuration>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Generated sources will appear in target/generated-sources/avro/, and the compiled classes end up in the jar artifact produced by the project. That jar artifact can then be used as a versioned library in other projects.
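To illustrate the effect of these flags, here’s a minimal sketch of constructing a generated Book instance; the builder methods follow Avro’s generated-code naming conventions, and the values are purely illustrative:

import java.math.BigDecimal;
import java.util.UUID;

import com.github.timtebeek.avro.Book;
import com.github.timtebeek.avro.Price;

class GeneratedClassesDemo {
    public static void main(String[] args) {
        Book book = Book.newBuilder()
                // logicalType uuid maps to java.util.UUID through UUIDConversion
                .setBookId(UUID.randomUUID())
                // stringType=String gives us java.lang.String instead of CharSequence
                .setTitle("An Illustrative Title")
                // subTitle is a nullable union, so null is allowed
                .setSubTitle(null)
                // logicalType decimal maps to BigDecimal through enableDecimalLogicalType;
                // the scale of 3 matches the scale declared in Price.avsc
                .setPrice(Price.newBuilder().setValue(new BigDecimal("9.990")).build())
                .build();
        System.out.println(book.getTitle() + " costs " + book.getPrice().getValue());
    }
}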

Configure Schema Registry Maven plugin

Next we want to tell the Schema Registry Maven plugin which Avro schemas to register with which Schema Registry instance.

Listing 5. Schema Registry Maven plugin configuration
<plugin>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-maven-plugin</artifactId>
  <version>6.1.1</version>
  <configuration>
    <schemaRegistryUrls>
      <param>${schema-registry-url}</param>
    </schemaRegistryUrls>
    <references>
<!-- Reference Price as stored in the schema registry, for the books-topic and orders-topic value schemas -->
      <books-topic-value>
        <reference>
          <name>com.github.timtebeek.avro.Price</name>
          <subject>com.github.timtebeek.avro.Price</subject>
          <version>1</version>
        </reference>
      </books-topic-value>
      <orders-topic-value>
        <reference>
          <name>com.github.timtebeek.avro.Price</name>
          <subject>com.github.timtebeek.avro.Price</subject>
          <version>1</version>
        </reference>
      </orders-topic-value>
    </references>
    <subjects>
      <!-- Register price separately at schema registry, so Book and Order values can reference it -->
      <com.github.timtebeek.avro.Price>src/main/avro/com/github/timtebeek/avro/Price.avsc</com.github.timtebeek.avro.Price>
<!-- Register value serialization schemas for topics books-topic and orders-topic -->
      <books-topic-value>src/main/avro/com/github/timtebeek/avro/Book.avsc</books-topic-value>
      <orders-topic-value>src/main/avro/com/github/timtebeek/avro/Order.avsc</orders-topic-value>
    </subjects>
  </configuration>
</plugin>

Notice how we have to register the Price schema separately with the registry, and add explicit references to it for the books-topic and orders-topic values. If you’re not using references, you can of course leave out <references> completely.
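Once registered, you can verify the stored reference through the Schema Registry REST API; a sketch against the local instance described below, with the response trimmed and reformatted for brevity:

$ curl http://localhost:8085/subjects/books-topic-value/versions/1
{
  "subject": "books-topic-value",
  "version": 1,
  "id": 2,
  "references": [
    {
      "name": "com.github.timtebeek.avro.Price",
      "subject": "com.github.timtebeek.avro.Price",
      "version": 1
    }
  ],
  "schema": "..."
}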

If you’re using JSON Schema or Protobuf, you can add <schemaTypes> with the schema type per subject.
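A sketch of what that could look like, with a hypothetical JSON Schema subject:

<schemaTypes>
  <!-- Subjects default to AVRO when omitted here -->
  <payments-topic-value>JSON</payments-topic-value>
</schemaTypes>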

You’ll want to make sure your subject names match your subject naming strategy. The default TopicNameStrategy derives the subject name from the topic name, so for example the value schema of topic books-topic is registered under subject books-topic-value:

Listing 6. io.confluent.kafka.serializers.subject.TopicNameStrategy.java
@Override
public String subjectName(String topic, boolean isKey, ParsedSchema schema) {
  return isKey ? topic + "-key" : topic + "-value";
}

Local validation and compatibility tests

Before we register our Avro schemas with the production Schema Registry instance, we’ll want to validate our local Avro schemas and test their compatibility with previously registered versions.

To aid in quick local validation, we’ve set up a Docker Compose file that launches an instance of Confluent Schema Registry, along with the required Kafka and Zookeeper instances. Additionally we launch an instance of AKHQ, which allows us to inspect our registered Avro schemas.

Listing 7. docker-compose.yml
version: '3.6'

volumes:
  zookeeper-data:
    driver: local
  zookeeper-log:
    driver: local
  kafka-data:
    driver: local

services:
  akhq:
    image: tchiotludo/akhq
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            docker-kafka-server:
              properties:
                bootstrap.servers: "kafka:9092"
              schema-registry:
                url: "http://schema-registry:8085"
    ports:
      - 8080:8080
    links:
      - kafka
      - schema-registry

  zookeeper:
    image: confluentinc/cp-zookeeper
    volumes:
      - zookeeper-data:/var/lib/zookeeper/data:Z
      - zookeeper-log:/var/lib/zookeeper/log:Z
    environment:
      ZOOKEEPER_CLIENT_PORT: '2181'
      ZOOKEEPER_ADMIN_ENABLE_SERVER: 'false'

  kafka:
    image: confluentinc/cp-kafka
    volumes:
      - kafka-data:/var/lib/kafka/data:Z
    environment:
      KAFKA_BROKER_ID: '0'
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_NUM_PARTITIONS: '12'
      KAFKA_COMPRESSION_TYPE: 'gzip'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: '1'
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: '1'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092'
      KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE: 'false'
      KAFKA_JMX_PORT: '9091'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_AUTHORIZER_CLASS_NAME: 'kafka.security.auth.SimpleAclAuthorizer'
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true'
    links:
      - zookeeper

  schema-registry:
    image: confluentinc/cp-schema-registry
    depends_on:
      - kafka
    ports:
      - 8085:8085
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      SCHEMA_REGISTRY_HOST_NAME: 'schema-registry'
      SCHEMA_REGISTRY_LISTENERS: 'http://0.0.0.0:8085'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: 'INFO'

With Docker Compose in place, these are the steps to validate and test local schema changes:

  1. Start the local Schema Registry, Kafka, Zookeeper & AKHQ

    $ docker-compose up --abort-on-container-exit
  2. Register the original unchanged schemas against the local schema registry instance

    $ ./mvnw schema-registry:register -Plocal-docker-compose
    [INFO] Scanning for projects...
    [INFO]
    [INFO] -------------< com.github.timtebeek:register-avro-schemas >-------------
    [INFO] Building register-avro-schemas 0.0.1-SNAPSHOT
    [INFO] --------------------------------[ jar ]---------------------------------
    [INFO]
    [INFO] --- kafka-schema-registry-maven-plugin:6.1.1:register (default-cli) @ register-avro-schemas ---
    [INFO] Registered subject(com.github.timtebeek.avro.Price) with id 1 version 1
    [INFO] Registered subject(books-topic-value) with id 2 version 1
    [INFO] Registered subject(orders-topic-value) with id 3 version 1
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
  3. Make any desired schema changes

  4. Validate your schema changes are free of syntax errors

    $ ./mvnw schema-registry:validate -Plocal-docker-compose
    [INFO] -------------< com.github.timtebeek:register-avro-schemas >-------------
    [INFO] Building register-avro-schemas 0.0.1-SNAPSHOT
    [INFO] --------------------------------[ jar ]---------------------------------
    [INFO]
    [INFO] --- kafka-schema-registry-maven-plugin:6.1.1:validate (default-cli) @ register-avro-schemas ---
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
  5. Test compatibility of your schema changes

    $ ./mvnw schema-registry:test-compatibility -Plocal-docker-compose
    [INFO] -------------< com.github.timtebeek:register-avro-schemas >-------------
    [INFO] Building register-avro-schemas 0.0.1-SNAPSHOT
    [INFO] --------------------------------[ jar ]---------------------------------
    [INFO]
    [INFO] --- kafka-schema-registry-maven-plugin:6.1.1:test-compatibility (default-cli) @ register-avro-schemas ---
    [INFO] Schema /home/tim/Documents/workspace/register-avro-schemas/src/main/avro/com/github/timtebeek/avro/Price.avsc is compatible with subject(com.github.timtebeek.avro.Price)
    [INFO] Schema /home/tim/Documents/workspace/register-avro-schemas/src/main/avro/com/github/timtebeek/avro/Book.avsc is compatible with subject(books-topic-value)
    [INFO] Schema /home/tim/Documents/workspace/register-avro-schemas/src/main/avro/com/github/timtebeek/avro/Order.avsc is compatible with subject(orders-topic-value)
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
  6. Optionally explore the schemas in AKHQ at http://localhost:8080/ui/docker-kafka-server/schema

  7. Clean up the local Schema Registry and attached volumes

    $ docker-compose down --volumes

Notice how we use the Maven profile local-docker-compose to point the plugin at an alternate schema registry URL. You can define additional profiles to switch between instances in the same way if needed.
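The profile simply overrides the ${schema-registry-url} property referenced in Listing 5; a minimal sketch, where the production URL is a placeholder:

<properties>
  <!-- Default: the production Schema Registry instance -->
  <schema-registry-url>https://schema-registry.example.com</schema-registry-url>
</properties>

<profiles>
  <profile>
    <!-- Point the plugin at the local Docker Compose instance from Listing 7 -->
    <id>local-docker-compose</id>
    <properties>
      <schema-registry-url>http://localhost:8085</schema-registry-url>
    </properties>
  </profile>
</profiles>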

Push to production

Now we’re ready for the final step: registering our Avro schemas with the production Schema Registry instance. Most production environments won’t allow you to connect from your local machine to the production schema registry. We therefore copy the Maven wrapper, our schemas, and any dependencies needed into a Docker image that runs the plugin.

Listing 8. Dockerfile
FROM openjdk:16
WORKDIR /app
# Copy the Maven wrapper and pom.xml first, so the dependency download below is cached as its own layer
COPY mvnw mvnw
COPY .mvn/ /app/.mvn/
COPY pom.xml /app/
RUN ./mvnw dependency:go-offline
# Copy the schemas last, as they change most often
COPY src/main/ /app/src/main/
ENTRYPOINT ./mvnw --batch-mode schema-registry:register

When this Docker image is built and deployed to a production environment, it registers the contained Avro schemas with the configured Schema Registry URL. This gives you a repeatable and safe way to push your Avro schemas to production, without relying on auto registration by your producers at runtime.
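Building and running the image locally is a quick way to verify it behaves as expected; the image tag is illustrative, and the registry URL comes from the pom.xml property discussed earlier:

$ docker build -t register-avro-schemas .
$ docker run --rm register-avro-schemas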
