Implementing The Outbox-Pattern With Kafka – Part 1: Writing In The Outbox-Table

This article is part of a Blog-Series

Based on a very simple example-project
we will implemnt the Outbox-Pattern with Kafka.

TL;DR

In this part, we will implement the outbox (aka: the queueing of the messages in a database-table).

The Outbox Table

The outbox is represented by an additionall table in the database.
This table acts as a queue for messages, that should be send as part of the transaction.
Instead of sending the messages, the application stores them in the outbox-table.
The actual sending of the messages occures outside of the transaction.

Because the messages are read from the table outside of the transaction context, only entries related to sucessfully commited transactions are visible.
Hence, the sending of the message effectively becomes a part of the transaction.
It happens only, if the transaction was successfully completed.
Messages associated to an aborted transaction will not be send.

The Implementation

No special measures need to be taken when writing the messages to the table.
The only thing to be sure of is that the writing takes part in the transaction.

In our implementation, we simply store the serialized message, together with a key, that is needed for the partitioning of your data in Kafka, in case the order of the messages is important.
We also store a timestamp, that we plan to record as Event Time later.

One more thing that is worth noting is that we utilize the database to create an unique record-ID.
The generated unique and monotonically increasing id is required later, for the implementation of Exactly-Once semantics.

The SQL for the table looks like this:

CREATE TABLE outbox (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  key VARCHAR(127),
  value varchar(1023),
  issued timestamp
);

Decoupling The Business Logic

In order to decouple the business logic from the implementation of the messaging mechanism, I have implemented a thin layer, that uses Spring Application Events to publish the messages.

Messages are send as a subclass of ApplicationEvent:

publisher.publishEvent(
  new UserEvent(
    this,
    username,
    CREATED,
    ZonedDateTime.now(clock)));

The event takes a key (username) and an object as value (an instance of an enum in our case).
An EventListener receives the events and writes them in the outbox table:

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void onUserEvent(OutboxEvent event)
{
  try
  {
    repository.save(
        event.getKey(),
        mapper.writeValueAsString(event.getValue()),
        event.getTime());
  }
  catch (JsonProcessingException e)
  {
    throw new RuntimeException(e);
  }
}

The @TransactionalEventListener is not really needed here.
A normal EventListener would also suffice, because spring immediately executes all registered normal event listeners.
Therefore, the registered listeners would run in the same thread, that published the event, and participate in the existing transaction.

But if a @TransactionalEventListener is used, like in our example project, it is crucial, that the phase is switched to BEFORE_COMMIT when the Outbox Pattern is introduced.
This is, because the listener has to be executed in the same transaction context in which the event was published.
Otherwise, the writing of the messages would not be coupled to the success or abortion of the transaction, thus violating the idea of the pattern.

May The Source Be With You!

Since this part of the implementation only stores the messages in a normal database, it can be published as an independent component that does not require any dependencies on Kafka.
To highlight this, the implementation of this step does not use Kafka at all.
In a later step, we will separate the layer, that decouples the business code from our messaging logic in a separate package.

The complete source code of the example-project can be cloned here:

This version only includes the logic, that is needed to fill the outbox-tabel.
Reading the messages from this table and sending them through Kafka will be the topic of the next part of this blog-series.

The sources include a Setup for Docker Compose, that can be run without compiling
the project. And a runnable README.sh, that compiles and run the application and illustrates the example.

Implementing The Outbox-Pattern With Kafka – Part 0: The example

This article is part of a Blog-Series

Based on a very simple example-project
we will implemnt the Outbox-Pattern with Kafka.

TL;DR

In this part, a small example-project is introduced, that features a component, which has to inform another component upon every succsessfully completed operation.

The Plan

In this mini-series I will implement the Outbox-Pattern
as described on Chris Richardson’s fabolous website microservices.io.

The pattern enables you, to send a message as part of a database transaction in a reliable way, effectively turining the writing of the data
to the database and the sending of the message into an atomic operation:
either both operations are successful or neither.

The pattern is well known and implementing it with Kafka looks like an easy straight forward job at first glance.
However, there are many obstacles that easily lead to an incomplete or incorrect implementation.
In this blog-series, we will circumnavigate these obstacles together step by step.

The Example Project

To illustrate our implementation, we will use a simple example-project.
It mimics a part of the registration process for an web application:
a (very!) simplistic service takes registration orders for new users.

  • Successfull registration requests will return a 201 (Created), that carries the URI, under which the data of the newly registered user can be accessed in the Location-header:


    echo peter | http :8080/users
    
    HTTP/1.1 201 
    Content-Length: 0
    Date: Fri, 05 Feb 2021 14:44:51 GMT
    Location: http://localhost:8080/users/peter
    

  • Requests to registrate an already existing user will result in a 400 (Bad Request):

    echo peter | http :8080/users
    
    HTTP/1.1 400 
    Connection: close
    Content-Length: 0
    Date: Fri, 05 Feb 2021 14:44:53 GMT
    

  • Successfully registrated users can be listed:

    http :8080/users
    
    HTTP/1.1 200 
    Content-Type: application/json;charset=UTF-8
    Date: Fri, 05 Feb 2021 14:53:59 GMT
    Transfer-Encoding: chunked
    
    [
        {
            "created": "2021-02-05T10:38:32.301",
            "loggedIn": false,
            "username": "peter"
        },
        ...
    ]
    

The Messaging Use-Case

As our messaging use-case imagine, that there has to happen several processes after a successful registration of a new user.
This may be the generation of an invoice, some business analytics or any other lengthy process that is best carried out asynchronously.
Hence, we have to generate an event, that informs the responsible services about new registrations.

Obviously, these events should only be generated, if the registration is completed successfully.
The event must not be fired, if the registration is rejected, because a duplicate username.

On the other hand, the publication of the event must happen reliably, because otherwise, the new might not be charged for the services, we offer…

The Transaction

The users are stored in a database and the creation of a new user happens in a transaction.
A “brilliant” colleague came up with the idea, to trigger an IncorrectResultSizeDataAccessException to detect duplicate usernames:

User user = new User(username);
repository.save(user);
// Triggers an Exception, if more than one entry is found
repository.findByUsername(username);

The query for the user by its names triggers an IncorrectResultSizeDataAccessException, if more than one entry is found.
The uncaught exception will mark the transaction for rollback, hence, canceling the requested registration.
The 400-response is then generated by a corresponding ExceptionHandler:

@ExceptionHandler
public ResponseEntity incorrectResultSizeDataAccessException(
    IncorrectResultSizeDataAccessException e)
{
  LOG.info("User already exists!");
  return ResponseEntity.badRequest().build();
}

Please do not code this at home…

But his weired implementation perfectly illustrates the requirements for our messaging use-case:
The user is written into the database.
But the registration is not successfully completed until the transaction is commited.
If the transaction is rolled back, no message must be send, because no new user was registered.

Decoupling with Springs EventPublisher

In the example implementation I am using an EventPublisher to decouple the business logic from the implementation of the messaging.
The controller publishes an event, when a new user is registered:

publisher.publishEvent(new UserEvent(this, usernam));

A listener annotated with @TransactionalEventListener receives the events and handles the messaging:

@TransactionalEventListener
public void onUserEvent(UserEvent event)
{
    // Sending the message happens here...
}

In non-critical use-cases, it might be sufficient to actually send the message to Kafka right here.
Spring ensures, that the message of the listener is only called, if the transaction completes successfully.
But in the case of a failure this naive implementation can loose messages.
If the application crashes, after the transaction has completed, but before the message could be send, the event would be lost.

In the following blog posts, we will step by step implement a solution based on the Outbox-Pattern, that can guarantee Exactly-Once semantics for the send messages.

May The Source Be With You!

The complete source code of the example-project can be cloned here:

It includes a Setup for Docker Compose, that can be run without compiling
the project. And a runnable README.sh, that compiles and run the application and illustrates the example.

How To Instantiatiate Multiple Beans Dinamically in Spring-Boot Depending on Configuration-Properties

TL;DR

In this mini-HowTo I will show a way, how to instantiate multiple beans dinamically in Spring-Boot, depending on configuration-properties.
We will:

  • write a ApplicationContextInitializer to add the beans to the context, before it is refreshed
  • write a EnvironmentPostProcessor to access the configured configuration sources
  • register the EnvironmentPostProcessor with Spring-Boot

Write an ApplicationContextInitializer

Additionally Beans can be added programatically very easy with the help of an ApplicationContextInitializer:

@AllArgsConstructor
public class MultipleBeansApplicationContextInitializer
    implements
      ApplicationContextInitializer
{
  private final String[] sites;

  @Override
  public void initialize(ConfigurableApplicationContext context)
  {
    ConfigurableListableBeanFactory factory =
        context.getBeanFactory();
    for (String site : sites)
    {
      SiteController controller =
          new SiteController(site, "Descrition of site " + site);
      factory.registerSingleton("/" + site, controller);
    }
  }
}

This simplified example is configured with a list of strings that should be registered as controllers with the DispatcherServlet.
All “sites” are insances of the same controller SiteController, which are instanciated and registered dynamically.

The instances are registered as beans with the method registerSingleton(String name, Object bean)
of a ConfigurableListableBeanFactory that can be accessed through the provided ConfigurableApplicationContext

The array of strings represents the accessed configuration properties in the simplified example.
The array will most probably hold more complex data-structures in a real-world application.

But how do we get access to the configuration-parameters, that are injected in this array here…?

Accessing the Configured Property-Sources

Instantiating and registering the additionally beans is easy.
The real problem is to access the configuration properties in the early plumbing-stage of the application-context, in that our ApplicationContextInitializer runs in:

The initializer cannot be instantiated and autowired by Spring!

The Bad News: In the early stage we are running in, we cannot use autowiring or access any of the other beans that will be instantiated by spring – especially not any of the beans, that are instantiated via @ConfigurationProperties, we are intrested in.

The Good News: We will present a way, how to access initialized instances of all property sources, that will be presented to your app

Write an EnvironmentPostProcessor

If you write an EnvironmentPostProcessor, you will get access to an instance of ConfigurableEnvironment, that contains a complete list of all PropertySource‘s, that are configured for your Spring-Boot-App.

public class MultipleBeansEnvironmentPostProcessor
    implements
      EnvironmentPostProcessor
{
  @Override
  public void postProcessEnvironment(
      ConfigurableEnvironment environment,
      SpringApplication application)
  {
    String sites =
        environment.getRequiredProperty("juplo.sites", String.class);

    application.addInitializers(
        new MultipleBeansApplicationContextInitializer(
            Arrays
                .stream(sites.split(","))
                .map(site -> site.trim())
                .toArray(size -> new String[size])));
  }
}

The Bad News:
Unfortunately, you have to scan all property-sources for the parameters, that you are interested in.
Also, all values are represented as stings in this early startup-phase of the application-context, because Spring’s convenient conversion mechanisms are not available yet.
So, you have to convert any values by yourself and stuff them in more complex data-structures as needed.

The Good News:
The property names are consistently represented in standard Java-Properties-Notation, regardless of the actual type (.properties / .yml) of the property source.

Register the EnvironmentPostProcessor

Finally, you have to register the EnvironmentPostProcessor with your Spring-Boot-App.
This is done in the META-INF/spring.factories:

org.springframework.boot.env.EnvironmentPostProcessor=\
  de.juplo.demos.multiplebeans.MultipleBeansEnvironmentPostProcessor

That’s it, your done!

Source Code

You can find the whole source code in a working mini-application on juplo.de and GitHub:

Other Blog-Posts On The Topic

  • The blog-post Dynamic Beans in Spring shows a way to register beans dynamically, but does not show how to access the configuration. Also, meanwhile another interface was added to spring, that facilitates this approach: BeanDefinitionRegistryPostProcessor
  • Benjamin shows in How To Create Your Own Dynamic Bean Definitions In Spring, how this interface can be applied and how one can access the configuration. But his example only works with plain Spring in a Servlet Container

Deduplicating Partitioned Data With a Kafka Streams ValueTransformer

Inspired by a current customer project and this article about
deduplicating events with Kafka Streams
I want to share a simple but powerful implementation of a deduplication mechanism, that works well for partitioned data and does not suffer of memory leaks, because a countless number of message-keys has to be stored.

Yet, the presented approach does not work for all use-cases, because it presumes, that a strictly monotonically increasing sequence numbering can be established across all messages – at least concerning all messages, that are routed to the same partition.

The Problem

A source produces messages, with reliably unique ID’s.
From time to time, sending these messages to Kafka may fail.
The order, in which these messages are send, is crucial with respect to the incedent, they belong to.
Resending the messages in correct order after a failure (or downtime) is no problem.
But some of the messages may be send twice (or more often), because the producer does not know exactly, which messages were send successful.

Incident A - { id: 1,  data: "ab583cc8f8" }
Incident B - { id: 2,  data: "83ccc8f8f8" }
Incident C - { id: 3,  data: "115tab5b58" }
Incident C - { id: 4,  data: "83caac564b" }
Incident B - { id: 5,  data: "a583ccc8f8" }
Incident A - { id: 6,  data: "8f8bc8f890" }
Incident A - { id: 7,  data: "07583ab583" }

<< DOWNTIME OR FAILURE >>

Incident C - { id: 4,  data: "83caac564b" }
Incident B - { id: 5,  data: "a583ccc8f8" }
Incident A - { id: 6,  data: "8f8bc8f890" }
Incident A - { id: 7,  data: "07583ab583" }
Incident A - { id: 8,  data: "930fce58f3" }
Incident B - { id: 9,  data: "7583ab93ab" }
Incident C - { id: 10, data: "7583aab583" }
Incident B - { id: 11, data: "b583075830" }

Since eache message has a unique ID, all messages are inherently idempotent:
Deduplication is no problem, if the receiver keeps track of the messages, he has already seen.

Where is the problem?, you may ask. That’s trivial, I just code the deduplication into my consumer!

But this approach has several drawbacks, including:

  • Implementing the trivial algorithm described above is not efficent, since the algorithm in general has to remember the IDs of all messages for an indefinit period of time.
  • Implementing the algorithm over and over again for every consumer is cumbersome and errorprone.

Wouldn’t it be much nicer, if we had an efficient and bulletproof algorithm, that we can simply plug into our Kafka-pipelines?

The Idea

In his blog-article
Jaroslaw Kijanowski describes three deduplication algorithms.
The first does not scale well, because it does only work for single-partition topics.
The third aims at a slightly different problem and might fail deduplicating some messages, if the timing is not tuned correctly.
The looks like a robust solution.
But it also looks a bit hacky and is unnecessary complex in my opinion.

Playing around with his ideas, i have come up with the following algorithm, that combines elements of all three solutions:

  • All messages are keyed by an ID that represents the incident – not the message.
    This guarantees, that all messages concerning a specific incident will be stored in the same partition, so that their ordering is retained.
  • We generate unique strictly monotonically increasing sequence numbers, that are assigned to each message.
    If the IDs of the messages fullfill these requirements and are stored in the value (like above), they can be reused as sequence numbers
  • We keep track of the sequence number last seen for each partition.
  • We drop all messages with sequnce numbers, that are not greater than the last sequence number, that we saw on that partition.

The algorithm uses the well known approach, that TCP/IP uses to detect and drop duplicate packages.
It is efficient, since we never have to store more sequence numbers, than partitions, that we are handling.
The algorithm can be implemented easily based on a ValueTransformer, because Kafka Streams provides the ability to store state locally.

A simplified example-implementation

To clearify the idea, I further simplified the problem for the example implementation:

  • Key and value of the messages are of type String, for easy scripting.
  • In the example implementation, person-names take the part of the ID of the incident, that acts out as message-key.
  • The value of the message solely consists of the sequence number.

    In a real-world use-case, the sequence number would be stored in the message-value and would have to be extracted from there.
    Or it would be stored as a message-header.

That is, our message stream is simply a mapping from names to unique sequence numbers and we want to be able to separate out the contained sequence for a single person, without duplicate entries and without jeopardizing the order of that sequence.

In this simplified setup, the implementation effectively boils down to the following method-override:

@Override
public Iterable<String> transform(String value)
{
  Integer partition = context.partition();
  long sequenceNumber = Long.parseLong(value);

  Long seen = store.get(partition);
  if (seen == null || seen < sequenceNumber)
  {
    store.put(partition, sequenceNumber);
    return Arrays.asList(value);
  }

  return Collections.emptyList();
}

  • We can get the active partition from the ProcessorContext, that is handed to our Instance in the constructor, which is not shown here for brevity.
  • Parsing the String-value of the message as long corresponds to the extraction of the sequence number from the value of the message in our simplified setup.
  • We then check the local state, if a sequence-number was already seen for the active partition.

    Kafka Streams takes care of the initialization and resurection of the local state.
    Take a look at the full source-code see, how we instruct Kafka Streams to do so.
  • If this is the first sequence-number, that we see for this partition, or if the sequence-number is greater (that is: newer) than the stored one, we store it in our local state and return the value of the message, because it was seen for the first time.
  • Otherwise, we instruct Kafka Streams to drop the current (duplicate!) value, by returning an empty array.

We can use our ValueTransformer with flatTransformValues(),
to let Kafka Streams drop the detected duplicate values:

streamsBuilder
    .stream("input")
    .flatTransformValues(
        new ValueTransformerSupplier()
        {
          @Override
          public ValueTransformer get()
          {
            return new DeduplicationTransformer();
          }
        },
        "SequenceNumbers")
    .to("output");

One has to register an appropriate store to the StreamsBuilder under the referenced name.

The full source is available on github.com

Recapping Our Assumptions…

The presented deduplication algorithm presumes some assumptions, that may not fit your use-case.
It is crucial, that these prerequisites are not violated.
Therefor, I will spell them out once more:

  1. We can generate unique strictly monotonically increasing sequence numbers for all messages (of a partition).
  2. We have a strict ordering of all messages (per partition).
  3. And hence, since we want to handle more than one partition:
    The data is partitioned by key.
    That is, all messages for a specific key must always be routed to the same partition.

As a conclusion of this assumptions, we have to note:
We can only deduplicate messages, that are routed to the same partition.
This follows, because we can only guarantee message-order per partition. But it should not be a problem for the same reason:
We assume a use-case, where all messages concerning a specific incident are captured in the same partition.

What is not needed – but also does not hurt

Since we are only deduplicating messages, that are routed to the same partition, we do not need globally unique sequence numbers.
Our sequence numbers only have to be unique per partition, to enable us to detect, that we have seen a specific message before on that partition.
Golbally unique sequence numbers clearly are a stronger condition:
It does not hurt, if the sequence numbers are globally unique, because they are always unique per partition, if they are also globally unique.

We detect unseen messages, by the fact that their sequence number is greater than the last stored hight watermark for the partition, they are routed to.
Hence, we do not rely on a seamless numbering without gaps.
It does not hurt, if the series of sequence numbers does not have any gaps, as long as two different messages on the same partition never are assigned to the same sequence number.

That said, it should be clear, that a globally unique seamless numbering of all messages across all partitions – as in our simple example-implementation – does fit well with our approach, because the numbering is still unique, if one only considers the messages in one partition, and the gaps in the numbering, that are introduced by focusing only on the messages of a single partition, are not violating our assumptions.

Pointless / Contradictorily Usage Of The Presented Approach

Last but not least, I want to point out, that this approach silently assumes, that the sequence number of the message is not identically to the key of the message.
On the contrary: The sequence number is expected to be different from the key of the message!

If one would use the key of the message as its sequence number (provided that it is unique and represents a strictly increasing sequence of numbers), one would indeed assure, that all duplicates can be detected, but he would at once force the implementation to be indifferent, concerning the order of the messages.

That is, because subsequent messages are forced to have different keys, because all messages are required to have unique sequence numbers.
But messages with different keys may be routed to different partitions – and Kafka can only guarantee message ordering for messages, that live on the same partition.
Hence, one has to assume, that the order in which the messages are send is not retained, if he uses the message-keys as sequence numbers – unless, only one partition is utilized, which is contradictory to our primary goal here: enabling scalability through data-sharding.

This is also true, if the key of a message contains an invariant ID and only embeds the changing sequence number.
Because, the default partitioning algorithm always considers the key as a whole, and if any part of it changes, the outcome of the algorithm might change.

In a production-ready implementation of the presented approach, I would advice, to store the sequence number in a message header, or provide a configurable extractor, that can derive the sequence number from the contents of the value of the message.
It would be perfectly o.k., if the IDs of the messages are used as sequence numbers, as long as they are unique and monotonically increasing and are stored in the value of the message – not in / as the key!