Added a simple implementation of the polling outbox pattern
authorKai Moritz <kai@juplo.de>
Sun, 16 May 2021 15:20:27 +0000 (17:20 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 16 May 2021 15:48:20 +0000 (17:48 +0200)
14 files changed:
README.md
README.sh
docker-compose.yml
outbox/.dockerignore [new file with mode: 0644]
outbox/Dockerfile [new file with mode: 0644]
outbox/pom.xml [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/Application.java [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java [new file with mode: 0644]
outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java [new file with mode: 0644]
outbox/src/main/resources/application.yml [new file with mode: 0644]
outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java [new file with mode: 0644]
pom.xml

index 2748ed9..2c61a4e 100644 (file)
--- a/README.md
+++ b/README.md
@@ -9,25 +9,25 @@ Execute [README.sh](README.sh) in a shell to demonstrate the example:
 
 The script will...
 
-* compile the component,
-* package it as Docker-Images,
-* start up the component and a PostreSQL as containers in a [Compose-Setup](docker-compose.yml),
+* compile the two components,
+* package them as Docker-Images,
+* start up the components and a minimal Kafka Cluster as containers in a [Compose-Setup](docker-compose.yml),
 * execute example-queries (CREATE / DELETE) against the API of [the example-project](https://juplo.de/implementing-the-outbox-pattern-with-kafka-part-0-the-example/) and
-* tail the logs of the containers `jdbc` to show what is going on.
+* tail the logs of the containers `jdbc` and `kafkacat` to show what is going on.
 
 You can verify the expected outcome of the demonstration by running a command like the following:
 
-    $ docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0  outbox | grep peter
-      1 | peter1  | "CREATED" | 2021-05-16 13:20:36.849
-     10 | peter2  | "CREATED" | 2021-05-16 13:20:42.141
-     19 | peter3  | "CREATED" | 2021-05-16 13:20:47.136
-     28 | peter4  | "CREATED" | 2021-05-16 13:20:52.087
-     37 | peter5  | "CREATED" | 2021-05-16 13:20:57.512
-     46 | peter6  | "CREATED" | 2021-05-16 13:21:02.493
-     55 | peter7  | "CREATED" | 2021-05-16 13:21:07.503
+    $ docker-compose logs kafkacat | grep peter
+    kafkacat_1   | peter1:"CREATED"
+    kafkacat_1   | peter2:"CREATED"
+    kafkacat_1   | peter3:"CREATED"
+    kafkacat_1   | peter4:"CREATED"
+    kafkacat_1   | peter5:"CREATED"
+    kafkacat_1   | peter6:"CREATED"
+    kafkacat_1   | peter7:"CREATED"
     $
 
-The example-output shows, that the CREATE-event for users with "peter" in their username are only stored exactly once in the outbox-table, although the script issues several requests for each of these users.
+The example-output shows, that the CREATE-event for users with "peter" in their username are only issued exactly once, although the script issues several requests for each of these users.
 
 Be aware, that the outcome of the script will be different, if you run it several times.
 In order to reproduce the same behaviour, you have to shut down the Compose-Setup before rerunning the script:
index 89aa269..c9f133c 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -5,11 +5,15 @@ then
   docker-compose down -v
   mvn clean
   docker image rm juplo/data-jdbc:polling-outbox-2-SNAPSHOT
+  docker image rm juplo/polling-outbox:polling-outbox-2-SNAPSHOT
   exit
 fi
 
+docker-compose up -d zookeeper kafka
+
 if [[
   $(docker image ls -q juplo/data-jdbc:polling-outbox-2-SNAPSHOT) == "" ||
+  $(docker image ls -q juplo/polling-outbox:polling-outbox-2-SNAPSHOT) == "" ||
   "$1" = "build"
 ]]
 then
@@ -17,9 +21,19 @@ then
 else
   echo "Using image existing images:"
   docker image ls juplo/data-jdbc:polling-outbox-2-SNAPSHOT
+  docker image ls juplo/polling-outbox:polling-outbox-2-SNAPSHOT
 fi
 
-docker-compose up -d jdbc
+while ! [[ $(docker-compose exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
+do
+  echo "Waiting for kafka...";
+  sleep 1;
+done
+
+docker-compose exec kafka kafka-topics --zookeeper zookeeper:2181 --create --if-not-exists --replication-factor 1 --partitions 3 --topic outbox
+
+
+docker-compose up -d jdbc outbox kafkacat
 
 while ! [[ $(http :8080/actuator/health 2>/dev/null | jq -r .status) == "UP" ]];
 do
@@ -28,7 +42,7 @@ do
 done
 
 
-docker-compose logs --tail=0 -f jdbc &
+docker-compose logs --tail=0 -f jdbc kafkacat &
 
 for i in `seq 1 7`;
 do
@@ -49,4 +63,5 @@ do
 done;
 
 docker-compose exec postgres psql -Uoutbox -c'SELECT * FROM outbox;' -Ppager=0  outbox
-docker-compose stop
+# "kill" the executions of "docker-compose logs ..."
+docker-compose stop jdbc kafkacat
index d04c665..3600d5f 100644 (file)
@@ -2,6 +2,33 @@ version: "3"
 
 services:
 
+  zookeeper:
+    image: confluentinc/cp-zookeeper:6.0.1
+    ports:
+      - 2181:2181
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:6.0.1
+    ports:
+      - 9092:9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
+      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
+      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+    depends_on:
+      - zookeeper
+
+  kafkacat:
+    image: confluentinc/cp-kafkacat:6.0.1
+    command: "kafkacat -C -b kafka:9093 -q -t outbox -K:"
+    tty: true
+    depends_on:
+      - kafka
+
   jdbc:
     image: juplo/data-jdbc:polling-outbox-2-SNAPSHOT
     ports:
@@ -11,6 +38,14 @@ services:
     depends_on:
       - postgres
 
+  outbox:
+    image: juplo/polling-outbox:polling-outbox-2-SNAPSHOT
+    environment:
+      spring.profiles.active: prod
+    depends_on:
+      - postgres
+      - kafka
+
 
   postgres:
     image: postgres:13
diff --git a/outbox/.dockerignore b/outbox/.dockerignore
new file mode 100644 (file)
index 0000000..1ad9963
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/outbox/Dockerfile b/outbox/Dockerfile
new file mode 100644 (file)
index 0000000..16ee25e
--- /dev/null
@@ -0,0 +1,5 @@
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
diff --git a/outbox/pom.xml b/outbox/pom.xml
new file mode 100644 (file)
index 0000000..2d57a06
--- /dev/null
@@ -0,0 +1,89 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project
+    xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.3.2.RELEASE</version>
+    <relativePath/> <!-- lookup parent from repository -->
+  </parent>
+
+  <groupId>de.juplo.kafka.outbox</groupId>
+  <artifactId>polling-outbox</artifactId>
+  <version>polling-outbox-2-SNAPSHOT</version>
+  <name>Outbox (Polling)</name>
+  <description>Simple example-implementation of the Outbox-Pattern (polling variant)</description>
+
+  <properties>
+    <java.version>11</java.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-data-jdbc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-json</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.postgresql</groupId>
+      <artifactId>postgresql</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.h2database</groupId>
+      <artifactId>h2</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <version>0.33.0</version>
+        <configuration>
+          <images>
+            <image>
+              <name>juplo/%a:%v</name>
+            </image>
+          </images>
+        </configuration>
+        <executions>
+          <execution>
+             <id>build</id>
+             <phase>package</phase>
+             <goals>
+               <goal>build</goal>
+             </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/Application.java b/outbox/src/main/java/de/juplo/kafka/outbox/Application.java
new file mode 100644 (file)
index 0000000..a63d714
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka.outbox;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+
+import java.util.concurrent.CountDownLatch;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
+public class Application
+{
+  public static void main(String[] args) throws Exception
+  {
+    SpringApplication.run(Application.class, args);
+
+    final CountDownLatch closeLatch = new CountDownLatch(1);
+
+    Runtime
+        .getRuntime()
+        .addShutdownHook(new Thread()
+        {
+          @Override
+          public void run()
+          {
+            log.info("Closing application...");
+            closeLatch.countDown();
+          }
+        });
+
+    closeLatch.await();
+  }
+}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java b/outbox/src/main/java/de/juplo/kafka/outbox/ApplicationProperties.java
new file mode 100644 (file)
index 0000000..1a5dca6
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.outbox;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("de.juplo.kafka.outbox")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+  String bootstrapServers = "localhost:9092";
+  String topic = "outbox";
+}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxItem.java
new file mode 100644 (file)
index 0000000..99deafa
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.outbox;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.Value;
+
+
+@Data
+@Value
+@Builder
+public class OutboxItem
+{
+  private final Long sequenceNumber;
+  private final String key;
+  private final String value;
+}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxProducer.java
new file mode 100644 (file)
index 0000000..627ca05
--- /dev/null
@@ -0,0 +1,107 @@
+package de.juplo.kafka.outbox;
+
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+
+
+@Component
+public class OutboxProducer
+{
+  final static Logger LOG = LoggerFactory.getLogger(OutboxProducer.class);
+
+
+  private final OutboxRepository repository;
+  private final KafkaProducer<String, String> producer;
+  private final String topic;
+
+  private long sequenceNumber = 0l;
+
+  public OutboxProducer(
+      ApplicationProperties properties,
+      OutboxRepository repository)
+  {
+    this.repository = repository;
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.bootstrapServers);
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    this.producer = new KafkaProducer<>(props);
+    this.topic = properties.topic;
+  }
+
+  @Scheduled(fixedDelay = 500)
+  public void poll()
+  {
+    List<OutboxItem> items;
+    do
+    {
+      items = repository.fetch(sequenceNumber);
+      LOG.debug("Polled {} new items", items.size());
+      for (OutboxItem item : items)
+        send(item);
+    }
+    while (items.size() > 0);
+  }
+
+  void send(OutboxItem item)
+  {
+    final ProducerRecord<String, String> record =
+        new ProducerRecord<>(topic, item.getKey(), item.getValue());
+
+    sequenceNumber = item.getSequenceNumber();
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+    buffer.putLong(item.getSequenceNumber());
+    record.headers().add("SEQ#", buffer.array());
+
+    producer.send(record, (metadata, e) ->
+    {
+      if (metadata != null)
+      {
+        int deleted = repository.delete(item.getSequenceNumber());
+        LOG.info(
+            "{}/{}:{} - {}:{}={} - deleted: {}",
+            metadata.topic(),
+            metadata.partition(),
+            metadata.offset(),
+            item.getSequenceNumber(),
+            record.key(),
+            record.value(),
+            deleted);
+      }
+      else
+      {
+        // HANDLE ERROR
+        LOG.error(
+            "{}/{} - {}:{}={} -> ",
+            record.topic(),
+            record.partition(),
+            item.getSequenceNumber(),
+            record.key(),
+            record.value(),
+            e);
+      }
+    });
+  }
+
+
+  @PreDestroy
+  public void close()
+  {
+    producer.close(Duration.ofSeconds(5));
+  }
+}
diff --git a/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java b/outbox/src/main/java/de/juplo/kafka/outbox/OutboxRepository.java
new file mode 100644 (file)
index 0000000..03a68ef
--- /dev/null
@@ -0,0 +1,62 @@
+package de.juplo.kafka.outbox;
+
+import lombok.AllArgsConstructor;
+import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.sql.Timestamp;
+import java.time.ZonedDateTime;
+import java.util.List;
+
+
+@Repository
+@AllArgsConstructor
+public class OutboxRepository
+{
+  private static final String SQL_QUERY =
+      "SELECT id, key, value FROM outbox WHERE id > :sequenceNumber ORDER BY id ASC";
+  private static final String SQL_UPDATE =
+      "INSERT INTO outbox (key, value, issued) VALUES (:key, :value, :issued)";
+  private static final String SQL_DELETE =
+      "DELETE FROM outbox WHERE id = :id";
+
+  private final NamedParameterJdbcTemplate jdbcTemplate;
+
+
+  public void save(String key, String value, ZonedDateTime issued)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("key", key);
+    parameters.addValue("value", value);
+    parameters.addValue("issued", Timestamp.from(issued.toInstant()));
+    jdbcTemplate.update(SQL_UPDATE, parameters);
+  }
+
+  public int delete(Long id)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("id", id);
+    return jdbcTemplate.update(SQL_DELETE, parameters);
+  }
+
+  public List<OutboxItem> fetch(Long sequenceNumber)
+  {
+    MapSqlParameterSource parameters = new MapSqlParameterSource();
+    parameters.addValue("sequenceNumber", sequenceNumber);
+    return
+        jdbcTemplate.query(
+            SQL_QUERY,
+            parameters,
+            (resultSet, rowNumber) ->
+            {
+              return
+                  OutboxItem
+                      .builder()
+                      .sequenceNumber(resultSet.getLong(1))
+                      .key(resultSet.getString(2))
+                      .value(resultSet.getString(3))
+                      .build();
+            });
+  }
+}
diff --git a/outbox/src/main/resources/application.yml b/outbox/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..2a8502a
--- /dev/null
@@ -0,0 +1,53 @@
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
+
+spring:
+  flyway:
+    locations: classpath:db/migration/h2
+
+logging:
+  level:
+    de:
+      juplo:
+        kafka:
+          outbox: DEBUG
+
+---
+
+spring:
+  profiles: prod
+
+  datasource:
+    url: jdbc:postgresql://postgres:5432/outbox
+    username: outbox
+    password: outbox
+  flyway:
+    locations: classpath:db/migration/postgres
+
+de:
+  juplo:
+    kafka:
+      outbox:
+        bootstrap-servers: kafka:9093
+
+---
+
+spring:
+  profiles: dev
+
+  datasource:
+    url: jdbc:postgresql://localhost:5432/outbox
+    username: outbox
+    password: outbox
+  flyway:
+    locations: classpath:db/migration/postgres
+
+de:
+  juplo:
+    kafka:
+      outbox:
+        bootstrap-servers: localhost:9092
+
diff --git a/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java b/outbox/src/test/java/de/juplo/kafka/outbox/ApplicationTests.java
new file mode 100644 (file)
index 0000000..cb5bc43
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.outbox;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class ApplicationTests
+{
+  @Test
+  public void contextLoads()
+  {
+  }
+}
diff --git a/pom.xml b/pom.xml
index 3e46567..8825add 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,6 +15,7 @@
 
   <modules>
     <module>jdbc</module>
+    <module>outbox</module>
   </modules>
 
 </project>