Implementing The Outbox-Pattern With Kafka – Part 1: Writing In The Outbox-Table

This article is part of a Blog-Series

Based on a very simple example-project,
we will implement the Outbox-Pattern with Kafka.

TL;DR

In this part, we will implement the outbox (that is, the queueing of the messages in a database table).

The Outbox Table

The outbox is represented by an additional table in the database.
This table acts as a queue for messages that should be sent as part of the transaction.
Instead of sending the messages, the application stores them in the outbox table.
The actual sending of the messages occurs outside of the transaction.

Because the messages are read from the table outside of the transaction context, only entries belonging to successfully committed transactions are visible.
Hence, the sending of a message effectively becomes part of the transaction:
it happens only if the transaction was completed successfully.
Messages associated with an aborted transaction will not be sent.
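To make the visibility argument concrete, here is a deliberately simplified in-memory model. It is not a real database and not the project's code; ToyDatabase and its methods are hypothetical. Writes made inside a transaction are buffered and only become visible to readers outside of it, such as a later message relay, on commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a database, used only to illustrate
// the visibility rules the Outbox-Pattern relies on.
class ToyDatabase {
  final List<String> users = new ArrayList<>();
  final List<String> outbox = new ArrayList<>();

  Tx begin() {
    return new Tx();
  }

  // Only committed outbox entries are visible to a reader outside the transaction
  List<String> readOutbox() {
    return List.copyOf(outbox);
  }

  class Tx {
    private final List<String> pendingUsers = new ArrayList<>();
    private final List<String> pendingOutbox = new ArrayList<>();

    void insertUser(String username) {
      pendingUsers.add(username);
    }

    // Instead of sending the message, queue it in the outbox table
    void insertOutbox(String message) {
      pendingOutbox.add(message);
    }

    void commit() {
      // Business data and queued messages become visible together
      users.addAll(pendingUsers);
      outbox.addAll(pendingOutbox);
    }

    void rollback() {
      // Neither the business data nor the queued messages become visible
      pendingUsers.clear();
      pendingOutbox.clear();
    }
  }
}
```

A rolled-back transaction leaves no trace in the outbox, so a relay that only reads committed entries can never send a message for it.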

The Implementation

No special measures need to be taken when writing the messages to the table.
The only thing to make sure of is that the write takes part in the transaction.

In our implementation, we simply store the serialized message together with a key, which is needed for partitioning the data in Kafka in case the order of the messages is important.
We also store a timestamp, which we plan to record as the event time later.
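Why the key matters can be sketched in a few lines. Kafka only guarantees ordering within a partition, and for records with a key its default partitioner derives the partition from a murmur2 hash of the serialized key. The sketch below substitutes String.hashCode() for murmur2 to stay dependency-free, so its partition numbers will not match Kafka's; partitionFor is a hypothetical helper. It only shows that equal keys always map to the same partition, which keeps all messages for one user in order.

```java
import java.util.List;

// Hypothetical helper, for illustration only: Kafka hashes the serialized key
// with murmur2; String.hashCode() is used here instead to avoid dependencies.
class KeyPartitioningSketch {
  static int partitionFor(String key, int numPartitions) {
    // Clear the sign bit so the modulus is never negative
    return (key.hashCode() & 0x7fffffff) % numPartitions;
  }

  public static void main(String[] args) {
    int numPartitions = 3;
    for (String key : List.of("peter", "klaus", "peter")) {
      // Both "peter" lines print the same partition number
      System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
    }
  }
}
```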

One more thing worth noting is that we use the database to create a unique record ID.
The generated unique and monotonically increasing ID is required later for the implementation of Exactly-Once semantics.

The SQL for the table looks like this:

CREATE TABLE outbox (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  key VARCHAR(127),
  value VARCHAR(1023),
  issued TIMESTAMP
);
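On the Java side, a row of this table and the repository.save(key, value, time) call used by the listener could look roughly like the following in-memory sketch. OutboxRecord and InMemoryOutboxRepository are hypothetical names. A real repository would issue an INSERT and let AUTO_INCREMENT assign the ID, which the AtomicLong only imitates.

```java
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// One row of the outbox table; the components mirror the columns id, key, value, issued
record OutboxRecord(long id, String key, String value, ZonedDateTime issued) {}

// Hypothetical in-memory repository; the AtomicLong imitates AUTO_INCREMENT
class InMemoryOutboxRepository {
  private final AtomicLong sequence = new AtomicLong();
  private final List<OutboxRecord> rows = new ArrayList<>();

  // Same parameter order as the repository.save(...) call in the listener
  synchronized OutboxRecord save(String key, String value, ZonedDateTime issued) {
    OutboxRecord row = new OutboxRecord(sequence.incrementAndGet(), key, value, issued);
    rows.add(row);
    return row;
  }

  synchronized List<OutboxRecord> findAll() {
    return List.copyOf(rows);
  }
}
```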

Decoupling The Business Logic

In order to decouple the business logic from the implementation of the messaging mechanism, I have implemented a thin layer that uses Spring Application Events to publish the messages.

Messages are sent as instances of a subclass of ApplicationEvent:

publisher.publishEvent(
  new UserEvent(
    this,
    username,
    CREATED,
    ZonedDateTime.now(clock)));

The event takes a key (username) and an object as value (an instance of an enum in our case).
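The event classes themselves are not shown here. A minimal sketch, assuming that UserEvent extends the OutboxEvent type the listener below expects, might look like this. In the project the base class would be Spring's ApplicationEvent. Since ApplicationEvent itself extends java.util.EventObject, EventObject is used here to keep the sketch free of Spring. The enum name Type is hypothetical; only its constant CREATED appears in the article.

```java
import java.time.ZonedDateTime;
import java.util.EventObject;

// Sketch of a base class for all messages that end up in the outbox.
// Hypothetical: the project would extend Spring's ApplicationEvent instead.
abstract class OutboxEvent extends EventObject {
  private final String key;
  private final Object value;
  private final ZonedDateTime time;

  protected OutboxEvent(Object source, String key, Object value, ZonedDateTime time) {
    super(source);
    this.key = key;
    this.value = value;
    this.time = time;
  }

  public String getKey() { return key; }          // needed for partitioning in Kafka
  public Object getValue() { return value; }      // serialized by the listener
  public ZonedDateTime getTime() { return time; } // planned to become the event time
}

// The username is the key, the enum constant is the value
class UserEvent extends OutboxEvent {
  // Hypothetical enum; the article only shows the constant CREATED
  enum Type { CREATED }

  UserEvent(Object source, String username, Type type, ZonedDateTime time) {
    super(source, username, type, time);
  }
}
```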
An event listener receives the events and writes them into the outbox table:

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void onUserEvent(OutboxEvent event)
{
  try
  {
    repository.save(
        event.getKey(),
        mapper.writeValueAsString(event.getValue()),
        event.getTime());
  }
  catch (JsonProcessingException e)
  {
    throw new RuntimeException(e);
  }
}

The @TransactionalEventListener is not really needed here.
A normal @EventListener would also suffice, because Spring executes all registered normal event listeners immediately and synchronously.
Therefore, these listeners run in the same thread that published the event and participate in the existing transaction.

But if a @TransactionalEventListener is used, as in our example project, it is crucial to switch its phase from the default AFTER_COMMIT to BEFORE_COMMIT when the Outbox-Pattern is introduced.
This is because the listener has to be executed in the same transaction context in which the event was published.
Otherwise, the writing of the messages would not be coupled to the success or abortion of the transaction, thus violating the idea of the pattern.
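The difference between the phases can be illustrated with a toy transaction that runs callbacks before and after its commit. ToyTransaction is hypothetical and only loosely mirrors Spring. Here a write after completion is simply rejected. In Spring, roughly speaking, data access in an after-commit callback is no longer followed by a commit unless it runs in a separate transaction (see the documentation of TransactionSynchronization.afterCommit). Either way, only a BEFORE_COMMIT write ends up in the same atomic commit as the business data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy transaction with phase callbacks; not Spring's API.
class ToyTransaction {
  private final List<String> table;                 // committed rows
  private final List<String> pending = new ArrayList<>();
  private final List<Consumer<ToyTransaction>> beforeCommit = new ArrayList<>();
  private final List<Runnable> afterCommit = new ArrayList<>();
  private boolean completed;

  ToyTransaction(List<String> table) {
    this.table = table;
  }

  void write(String row) {
    // Simplification: writes after completion are rejected outright
    if (completed) throw new IllegalStateException("transaction already completed");
    pending.add(row);
  }

  void onBeforeCommit(Consumer<ToyTransaction> listener) { beforeCommit.add(listener); }
  void onAfterCommit(Runnable listener) { afterCommit.add(listener); }

  void commit() {
    // BEFORE_COMMIT: still inside the transaction, so these writes are committed too
    beforeCommit.forEach(listener -> listener.accept(this));
    table.addAll(pending);
    completed = true;
    // AFTER_COMMIT: the transaction is already over
    afterCommit.forEach(Runnable::run);
  }

  void rollback() {
    // Nothing becomes visible and no listener is invoked
    pending.clear();
    completed = true;
  }
}
```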

May The Source Be With You!

Since this part of the implementation only stores the messages in a normal database, it can be published as an independent component that does not depend on Kafka.
To highlight this, the implementation of this step does not use Kafka at all.
In a later step, we will move the layer that decouples the business code from our messaging logic into a separate package.

The complete source code of the example-project can be cloned here:

This version only includes the logic that is needed to fill the outbox table.
Reading the messages from this table and sending them through Kafka will be the topic of the next part of this blog-series.

The sources include a setup for Docker Compose that can be run without compiling
the project, and a runnable README.sh that compiles and runs the application and illustrates the example.

Implementing The Outbox-Pattern With Kafka – Part 0: The example

TL;DR

In this part, a small example-project is introduced that features a component which has to inform another component upon every successfully completed operation.

The Plan

In this mini-series, I will implement the Outbox-Pattern
as described on Chris Richardson’s fabulous website microservices.io.

The pattern enables you to send a message as part of a database transaction in a reliable way, effectively turning the writing of the data
to the database and the sending of the message into an atomic operation:
either both operations are successful or neither is.

The pattern is well known, and implementing it with Kafka looks like an easy, straightforward job at first glance.
However, there are many obstacles that easily lead to an incomplete or incorrect implementation.
In this blog-series, we will circumnavigate these obstacles together step by step.

The Example Project

To illustrate our implementation, we will use a simple example-project.
It mimics part of the registration process for a web application:
a (very!) simplistic service takes registration orders for new users.

  • Successful registration requests will return a 201 (Created), whose Location header carries the URI under which the data of the newly registered user can be accessed:


    echo peter | http :8080/users
    
    HTTP/1.1 201 
    Content-Length: 0
    Date: Fri, 05 Feb 2021 14:44:51 GMT
    Location: http://localhost:8080/users/peter
    

  • Requests to register an already existing user will result in a 400 (Bad Request):

    echo peter | http :8080/users
    
    HTTP/1.1 400 
    Connection: close
    Content-Length: 0
    Date: Fri, 05 Feb 2021 14:44:53 GMT
    

  • Successfully registered users can be listed:

    http :8080/users
    
    HTTP/1.1 200 
    Content-Type: application/json;charset=UTF-8
    Date: Fri, 05 Feb 2021 14:53:59 GMT
    Transfer-Encoding: chunked
    
    [
        {
            "created": "2021-02-05T10:38:32.301",
            "loggedIn": false,
            "username": "peter"
        },
        ...
    ]
    

The Messaging Use-Case

As our messaging use-case, imagine that several processes have to run after the successful registration of a new user.
This may be the generation of an invoice, some business analytics, or any other lengthy process that is best carried out asynchronously.
Hence, we have to generate an event that informs the responsible services about new registrations.

Obviously, these events should only be generated if the registration is completed successfully.
The event must not be fired if the registration is rejected because of a duplicate username.

On the other hand, the publication of the event must happen reliably, because otherwise the new user might not be charged for the services we offer…

The Transaction

The users are stored in a database and the creation of a new user happens in a transaction.
A “brilliant” colleague came up with the idea to trigger an IncorrectResultSizeDataAccessException to detect duplicate usernames:

User user = new User(username);
repository.save(user);
// Triggers an Exception, if more than one entry is found
repository.findByUsername(username);

The query for the user by its name triggers an IncorrectResultSizeDataAccessException if more than one entry is found.
The uncaught exception will mark the transaction for rollback, hence, canceling the requested registration.
The 400-response is then generated by a corresponding ExceptionHandler:

@ExceptionHandler
public ResponseEntity<Void> incorrectResultSizeDataAccessException(
    IncorrectResultSizeDataAccessException e)
{
  LOG.info("User already exists!");
  return ResponseEntity.badRequest().build();
}

Please do not code this at home…

But this weird implementation perfectly illustrates the requirements for our messaging use-case:
The user is written into the database,
but the registration is not successfully completed until the transaction is committed.
If the transaction is rolled back, no message may be sent, because no new user was registered.
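These requirements can be replayed with a toy version of the registration flow. ToyRegistration is hypothetical: a simple count check stands in for the IncorrectResultSizeDataAccessException, a private copy of the list plays the role of the transaction, and the returned int is the HTTP status the controller would answer with.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy replay of the registration flow described above.
class ToyRegistration {
  final List<String> users = new ArrayList<>();     // committed rows
  final List<String> messages = new ArrayList<>();  // messages that were sent

  // Returns the HTTP status the controller would answer with
  int register(String username) {
    List<String> tx = new ArrayList<>(users);        // begin: private working copy
    tx.add(username);                                // repository.save(user)
    long matches = tx.stream().filter(username::equals).count();
    if (matches > 1) {
      // findByUsername(...) would throw here; the working copy is discarded (rollback)
      return 400;
    }
    users.clear();                                   // commit: make the copy visible
    users.addAll(tx);
    messages.add(username + ":CREATED");            // only reached after the commit
    return 201;
  }
}
```

Registering "peter" twice yields 201 and then 400, while exactly one user and one message remain.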

Decoupling With Spring’s ApplicationEventPublisher

In the example implementation, I am using an ApplicationEventPublisher to decouple the business logic from the implementation of the messaging.
The controller publishes an event, when a new user is registered:

publisher.publishEvent(new UserEvent(this, username));

A listener annotated with @TransactionalEventListener receives the events and handles the messaging:

@TransactionalEventListener
public void onUserEvent(UserEvent event)
{
    // Sending the message happens here...
}

In non-critical use-cases, it might be sufficient to actually send the message to Kafka right here.
Spring ensures that the listener is only called if the transaction completes successfully, because the default phase of a @TransactionalEventListener is AFTER_COMMIT.
But in the case of a failure, this naive implementation can lose messages.
If the application crashes after the transaction has completed, but before the message could be sent, the event would be lost.
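The crash window can be made visible with another in-memory toy. CrashWindowDemo and its methods are hypothetical, a list stands in for the Kafka topic, and the crash is simulated by returning early. The naive variant loses the message, while the outbox variant keeps it in the committed outbox until a relay delivers it. Note that this simple relay only achieves at-least-once delivery: a crash between sending and clearing would resend entries, which is why the later parts of the series aim for Exactly-Once semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical comparison of "commit, then send" with the outbox approach.
class CrashWindowDemo {
  final List<String> users = new ArrayList<>();   // committed business data
  final List<String> outbox = new ArrayList<>();  // committed outbox entries
  final List<String> topic = new ArrayList<>();   // stands in for a Kafka topic

  void registerNaive(String username, boolean crashAfterCommit) {
    users.add(username);                          // the transaction commits
    if (crashAfterCommit) return;                 // simulated crash: nothing is sent
    topic.add(username + ":CREATED");
  }

  void registerWithOutbox(String username, boolean crashAfterCommit) {
    // User and message are committed in one transaction
    users.add(username);
    outbox.add(username + ":CREATED");
    if (crashAfterCommit) return;                 // simulated crash before relaying
    relay();
  }

  // Run again after a restart: delivers everything committed to the outbox
  void relay() {
    topic.addAll(outbox);
    outbox.clear();
  }
}
```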

In the following blog posts, we will implement, step by step, a solution based on the Outbox-Pattern that can guarantee Exactly-Once semantics for the sent messages.

May The Source Be With You!

The complete source code of the example-project can be cloned here:

It includes a setup for Docker Compose that can be run without compiling
the project, and a runnable README.sh that compiles and runs the application and illustrates the example.